diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 1e16dc67e..3f07fd449 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -488,7 +488,7 @@ srs_error_t SrsEdgeForwarder::start() } srs_freep(trd); - trd = new SrsSTCoroutine("edge-fwr", this); + trd = new SrsSTCoroutine("edge-fwr", this, _srs_context->get_id()); if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "coroutine"); diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index e9b5f5f52..56757bbf0 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -56,11 +56,12 @@ ISrsMessagePumper::~ISrsMessagePumper() { } -SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm) +SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm, int parent_cid) { rtmp = r; pumper = p; timeout = tm; + _parent_cid = parent_cid; trd = new SrsDummyCoroutine(); } @@ -79,7 +80,7 @@ srs_error_t SrsRecvThread::start() srs_error_t err = srs_success; srs_freep(trd); - trd = new SrsSTCoroutine("recv", this); + trd = new SrsSTCoroutine("recv", this, _parent_cid); if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "recv thread"); @@ -159,8 +160,8 @@ srs_error_t SrsRecvThread::do_cycle() return err; } -SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms) -: trd(this, rtmp_sdk, timeout_ms) +SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms, int parent_cid) + : trd(this, rtmp_sdk, timeout_ms, parent_cid) { _consumer = consumer; rtmp = rtmp_sdk; @@ -272,8 +273,9 @@ void SrsQueueRecvThread::on_stop() rtmp->set_auto_response(true); } -SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, int mr_sock_fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source) - : trd(this, rtmp_sdk, timeout_ms) +SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, + int mr_sock_fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source, int parent_cid) + : trd(this, rtmp_sdk, timeout_ms, parent_cid) { rtmp = rtmp_sdk; diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 785f779ab..349b39bea 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -96,12 +96,13 @@ protected: SrsCoroutine* trd; ISrsMessagePumper* pumper; SrsRtmpServer* rtmp; + int _parent_cid; // The recv timeout in ms. int timeout; public: // Constructor. // @param tm The receive timeout in ms. - SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm); + SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm, int parent_cid); virtual ~SrsRecvThread(); public: virtual int cid(); @@ -133,7 +134,7 @@ private: SrsConsumer* _consumer; public: // TODO: FIXME: Refine timeout in time unit. - SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms); + SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms, int parent_cid); virtual ~SrsQueueRecvThread(); public: virtual srs_error_t start(); @@ -189,7 +190,8 @@ private: int cid; int ncid; public: - SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, int mr_sock_fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source); + SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, + int mr_sock_fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source, int parent_cid); virtual ~SrsPublishRecvThread(); public: /** diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 61c0f2c40..c4c48b7b7 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -646,7 +646,7 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source) // Use receiving thread to receive packets from peer. // @see: https://github.com/ossrs/srs/issues/217 - SrsQueueRecvThread trd(consumer, rtmp, srsu2msi(SRS_PERF_MW_SLEEP)); + SrsQueueRecvThread trd(consumer, rtmp, srsu2msi(SRS_PERF_MW_SLEEP), _srs_context->get_id()); if ((err = trd.start()) != srs_success) { return srs_error_wrap(err, "rtmp: start receive thread"); @@ -815,7 +815,7 @@ srs_error_t SrsRtmpConn::publishing(SrsSource* source) if ((err = acquire_publish(source)) == srs_success) { // use isolate thread to recv, // @see: https://github.com/ossrs/srs/issues/237 - SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source); + SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id()); err = do_publishing(source, &rtrd); rtrd.stop(); } @@ -847,11 +847,6 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* return srs_error_wrap(err, "rtmp: receive thread"); } - // change the isolate recv thread context id, - // merge its log to current thread. - int receive_thread_cid = rtrd->get_cid(); - rtrd->set_cid(_srs_context->get_id()); - // initialize the publish timeout. publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost); publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost); @@ -862,9 +857,8 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* if (true) { bool mr = _srs_config->get_mr_enabled(req->vhost); srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost); - srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d, rtcid=%d", - mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout), - tcp_nodelay, receive_thread_cid); + srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d", + mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout), tcp_nodelay); } int64_t nb_msgs = 0; diff --git a/trunk/src/service/srs_service_st.cpp b/trunk/src/service/srs_service_st.cpp index 2869f13cb..d99517c68 100644 --- a/trunk/src/service/srs_service_st.cpp +++ b/trunk/src/service/srs_service_st.cpp @@ -289,7 +289,7 @@ srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread) if (rtm == SRS_UTIME_NO_TIMEOUT) { nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { - nb_read = st_read((st_netfd_t)stfd, buf, size, rtm * 1000); + nb_read = st_read((st_netfd_t)stfd, buf, size, rtm); } if (nread) { @@ -325,7 +325,7 @@ srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) if (rtm == SRS_UTIME_NO_TIMEOUT) { nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { - nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm * 1000); + nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm); } if (nread) { @@ -361,7 +361,7 @@ srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite) if (stm == SRS_UTIME_NO_TIMEOUT) { nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { - nb_write = st_write((st_netfd_t)stfd, buf, size, stm * 1000); + nb_write = st_write((st_netfd_t)stfd, buf, size, stm); } if (nwrite) { @@ -392,7 +392,7 @@ srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) if (stm == SRS_UTIME_NO_TIMEOUT) { nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT); } else { - nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm * 1000); + nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm); } if (nwrite) {