mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
Refine SrsStSocket in time unit
This commit is contained in:
parent
5a45fd98c3
commit
feb34ed225
5 changed files with 22 additions and 24 deletions
|
@ -488,7 +488,7 @@ srs_error_t SrsEdgeForwarder::start()
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_freep(trd);
|
srs_freep(trd);
|
||||||
trd = new SrsSTCoroutine("edge-fwr", this);
|
trd = new SrsSTCoroutine("edge-fwr", this, _srs_context->get_id());
|
||||||
|
|
||||||
if ((err = trd->start()) != srs_success) {
|
if ((err = trd->start()) != srs_success) {
|
||||||
return srs_error_wrap(err, "coroutine");
|
return srs_error_wrap(err, "coroutine");
|
||||||
|
|
|
@ -56,11 +56,12 @@ ISrsMessagePumper::~ISrsMessagePumper()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm)
|
SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm, int parent_cid)
|
||||||
{
|
{
|
||||||
rtmp = r;
|
rtmp = r;
|
||||||
pumper = p;
|
pumper = p;
|
||||||
timeout = tm;
|
timeout = tm;
|
||||||
|
_parent_cid = parent_cid;
|
||||||
trd = new SrsDummyCoroutine();
|
trd = new SrsDummyCoroutine();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,7 +80,7 @@ srs_error_t SrsRecvThread::start()
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
srs_freep(trd);
|
srs_freep(trd);
|
||||||
trd = new SrsSTCoroutine("recv", this);
|
trd = new SrsSTCoroutine("recv", this, _parent_cid);
|
||||||
|
|
||||||
if ((err = trd->start()) != srs_success) {
|
if ((err = trd->start()) != srs_success) {
|
||||||
return srs_error_wrap(err, "recv thread");
|
return srs_error_wrap(err, "recv thread");
|
||||||
|
@ -159,8 +160,8 @@ srs_error_t SrsRecvThread::do_cycle()
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms)
|
SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms, int parent_cid)
|
||||||
: trd(this, rtmp_sdk, timeout_ms)
|
: trd(this, rtmp_sdk, timeout_ms, parent_cid)
|
||||||
{
|
{
|
||||||
_consumer = consumer;
|
_consumer = consumer;
|
||||||
rtmp = rtmp_sdk;
|
rtmp = rtmp_sdk;
|
||||||
|
@ -272,8 +273,9 @@ void SrsQueueRecvThread::on_stop()
|
||||||
rtmp->set_auto_response(true);
|
rtmp->set_auto_response(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, int mr_sock_fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source)
|
SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
|
||||||
: trd(this, rtmp_sdk, timeout_ms)
|
int mr_sock_fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source, int parent_cid)
|
||||||
|
: trd(this, rtmp_sdk, timeout_ms, parent_cid)
|
||||||
{
|
{
|
||||||
rtmp = rtmp_sdk;
|
rtmp = rtmp_sdk;
|
||||||
|
|
||||||
|
|
|
@ -96,12 +96,13 @@ protected:
|
||||||
SrsCoroutine* trd;
|
SrsCoroutine* trd;
|
||||||
ISrsMessagePumper* pumper;
|
ISrsMessagePumper* pumper;
|
||||||
SrsRtmpServer* rtmp;
|
SrsRtmpServer* rtmp;
|
||||||
|
int _parent_cid;
|
||||||
// The recv timeout in ms.
|
// The recv timeout in ms.
|
||||||
int timeout;
|
int timeout;
|
||||||
public:
|
public:
|
||||||
// Constructor.
|
// Constructor.
|
||||||
// @param tm The receive timeout in ms.
|
// @param tm The receive timeout in ms.
|
||||||
SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm);
|
SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm, int parent_cid);
|
||||||
virtual ~SrsRecvThread();
|
virtual ~SrsRecvThread();
|
||||||
public:
|
public:
|
||||||
virtual int cid();
|
virtual int cid();
|
||||||
|
@ -133,7 +134,7 @@ private:
|
||||||
SrsConsumer* _consumer;
|
SrsConsumer* _consumer;
|
||||||
public:
|
public:
|
||||||
// TODO: FIXME: Refine timeout in time unit.
|
// TODO: FIXME: Refine timeout in time unit.
|
||||||
SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms);
|
SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms, int parent_cid);
|
||||||
virtual ~SrsQueueRecvThread();
|
virtual ~SrsQueueRecvThread();
|
||||||
public:
|
public:
|
||||||
virtual srs_error_t start();
|
virtual srs_error_t start();
|
||||||
|
@ -189,7 +190,8 @@ private:
|
||||||
int cid;
|
int cid;
|
||||||
int ncid;
|
int ncid;
|
||||||
public:
|
public:
|
||||||
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, int mr_sock_fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source);
|
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
|
||||||
|
int mr_sock_fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source, int parent_cid);
|
||||||
virtual ~SrsPublishRecvThread();
|
virtual ~SrsPublishRecvThread();
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -646,7 +646,7 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source)
|
||||||
|
|
||||||
// Use receiving thread to receive packets from peer.
|
// Use receiving thread to receive packets from peer.
|
||||||
// @see: https://github.com/ossrs/srs/issues/217
|
// @see: https://github.com/ossrs/srs/issues/217
|
||||||
SrsQueueRecvThread trd(consumer, rtmp, srsu2msi(SRS_PERF_MW_SLEEP));
|
SrsQueueRecvThread trd(consumer, rtmp, srsu2msi(SRS_PERF_MW_SLEEP), _srs_context->get_id());
|
||||||
|
|
||||||
if ((err = trd.start()) != srs_success) {
|
if ((err = trd.start()) != srs_success) {
|
||||||
return srs_error_wrap(err, "rtmp: start receive thread");
|
return srs_error_wrap(err, "rtmp: start receive thread");
|
||||||
|
@ -815,7 +815,7 @@ srs_error_t SrsRtmpConn::publishing(SrsSource* source)
|
||||||
if ((err = acquire_publish(source)) == srs_success) {
|
if ((err = acquire_publish(source)) == srs_success) {
|
||||||
// use isolate thread to recv,
|
// use isolate thread to recv,
|
||||||
// @see: https://github.com/ossrs/srs/issues/237
|
// @see: https://github.com/ossrs/srs/issues/237
|
||||||
SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source);
|
SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
|
||||||
err = do_publishing(source, &rtrd);
|
err = do_publishing(source, &rtrd);
|
||||||
rtrd.stop();
|
rtrd.stop();
|
||||||
}
|
}
|
||||||
|
@ -847,11 +847,6 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread*
|
||||||
return srs_error_wrap(err, "rtmp: receive thread");
|
return srs_error_wrap(err, "rtmp: receive thread");
|
||||||
}
|
}
|
||||||
|
|
||||||
// change the isolate recv thread context id,
|
|
||||||
// merge its log to current thread.
|
|
||||||
int receive_thread_cid = rtrd->get_cid();
|
|
||||||
rtrd->set_cid(_srs_context->get_id());
|
|
||||||
|
|
||||||
// initialize the publish timeout.
|
// initialize the publish timeout.
|
||||||
publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
|
publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
|
||||||
publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost);
|
publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost);
|
||||||
|
@ -862,9 +857,8 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread*
|
||||||
if (true) {
|
if (true) {
|
||||||
bool mr = _srs_config->get_mr_enabled(req->vhost);
|
bool mr = _srs_config->get_mr_enabled(req->vhost);
|
||||||
srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);
|
srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);
|
||||||
srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d, rtcid=%d",
|
srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d",
|
||||||
mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout),
|
mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout), tcp_nodelay);
|
||||||
tcp_nodelay, receive_thread_cid);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t nb_msgs = 0;
|
int64_t nb_msgs = 0;
|
||||||
|
|
|
@ -289,7 +289,7 @@ srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
|
||||||
if (rtm == SRS_UTIME_NO_TIMEOUT) {
|
if (rtm == SRS_UTIME_NO_TIMEOUT) {
|
||||||
nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
|
nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
|
||||||
} else {
|
} else {
|
||||||
nb_read = st_read((st_netfd_t)stfd, buf, size, rtm * 1000);
|
nb_read = st_read((st_netfd_t)stfd, buf, size, rtm);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nread) {
|
if (nread) {
|
||||||
|
@ -325,7 +325,7 @@ srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
|
||||||
if (rtm == SRS_UTIME_NO_TIMEOUT) {
|
if (rtm == SRS_UTIME_NO_TIMEOUT) {
|
||||||
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
|
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
|
||||||
} else {
|
} else {
|
||||||
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm * 1000);
|
nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nread) {
|
if (nread) {
|
||||||
|
@ -361,7 +361,7 @@ srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
|
||||||
if (stm == SRS_UTIME_NO_TIMEOUT) {
|
if (stm == SRS_UTIME_NO_TIMEOUT) {
|
||||||
nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
|
nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);
|
||||||
} else {
|
} else {
|
||||||
nb_write = st_write((st_netfd_t)stfd, buf, size, stm * 1000);
|
nb_write = st_write((st_netfd_t)stfd, buf, size, stm);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nwrite) {
|
if (nwrite) {
|
||||||
|
@ -392,7 +392,7 @@ srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
|
||||||
if (stm == SRS_UTIME_NO_TIMEOUT) {
|
if (stm == SRS_UTIME_NO_TIMEOUT) {
|
||||||
nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
|
nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT);
|
||||||
} else {
|
} else {
|
||||||
nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm * 1000);
|
nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nwrite) {
|
if (nwrite) {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue