diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 56757bbf0..8034da82d 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -56,7 +56,7 @@ ISrsMessagePumper::~ISrsMessagePumper() { } -SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm, int parent_cid) +SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, srs_utime_t tm, int parent_cid) { rtmp = r; pumper = p; @@ -117,7 +117,7 @@ srs_error_t SrsRecvThread::cycle() } // reset the timeout to pulse mode. - rtmp->set_recv_timeout(timeout * SRS_UTIME_MILLISECONDS); + rtmp->set_recv_timeout(timeout); pumper->on_stop(); @@ -135,7 +135,7 @@ srs_error_t SrsRecvThread::do_cycle() // When the pumper is interrupted, wait then retry. if (pumper->interrupted()) { - srs_usleep(timeout * 1000); + srs_usleep(timeout); continue; } @@ -160,8 +160,8 @@ srs_error_t SrsRecvThread::do_cycle() return err; } -SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms, int parent_cid) - : trd(this, rtmp_sdk, timeout_ms, parent_cid) +SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, int parent_cid) + : trd(this, rtmp_sdk, tm, parent_cid) { _consumer = consumer; rtmp = rtmp_sdk; @@ -274,8 +274,8 @@ void SrsQueueRecvThread::on_stop() } SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, - int mr_sock_fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source, int parent_cid) - : trd(this, rtmp_sdk, timeout_ms, parent_cid) + int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, int parent_cid) + : trd(this, rtmp_sdk, tm, parent_cid) { rtmp = rtmp_sdk; @@ -310,14 +310,14 @@ SrsPublishRecvThread::~SrsPublishRecvThread() srs_freep(recv_error); } -srs_error_t SrsPublishRecvThread::wait(uint64_t timeout_ms) +srs_error_t SrsPublishRecvThread::wait(srs_utime_t tm) { if (recv_error != srs_success) { return srs_error_copy(recv_error); } // ignore any return of cond wait. - srs_cond_timedwait(error, timeout_ms * 1000); + srs_cond_timedwait(error, tm); return srs_success; } diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 349b39bea..477b43f29 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -97,12 +97,12 @@ protected: ISrsMessagePumper* pumper; SrsRtmpServer* rtmp; int _parent_cid; - // The recv timeout in ms. - int timeout; + // The recv timeout in srs_utime_t. + srs_utime_t timeout; public: // Constructor. // @param tm The receive timeout in ms. - SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm, int parent_cid); + SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, srs_utime_t tm, int parent_cid); virtual ~SrsRecvThread(); public: virtual int cid(); @@ -134,7 +134,7 @@ private: SrsConsumer* _consumer; public: // TODO: FIXME: Refine timeout in time unit. - SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms, int parent_cid); + SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, int parent_cid); virtual ~SrsQueueRecvThread(); public: virtual srs_error_t start(); @@ -191,13 +191,13 @@ private: int ncid; public: SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, - int mr_sock_fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source, int parent_cid); + int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, int parent_cid); virtual ~SrsPublishRecvThread(); public: /** * wait for error for some timeout. */ - virtual srs_error_t wait(uint64_t timeout_ms); + virtual srs_error_t wait(srs_utime_t tm); virtual int64_t nb_msgs(); virtual uint64_t nb_video_frames(); virtual srs_error_t error_code(); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 275f32ebe..5ddc91411 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -646,7 +646,7 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source) // Use receiving thread to receive packets from peer. // @see: https://github.com/ossrs/srs/issues/217 - SrsQueueRecvThread trd(consumer, rtmp, srsu2msi(SRS_PERF_MW_SLEEP), _srs_context->get_id()); + SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id()); if ((err = trd.start()) != srs_success) { return srs_error_wrap(err, "rtmp: start receive thread"); @@ -874,9 +874,9 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* if (nb_msgs == 0) { // when not got msgs, wait for a larger timeout. // @see https://github.com/ossrs/srs/issues/441 - rtrd->wait(srsu2msi(publish_1stpkt_timeout)); + rtrd->wait(publish_1stpkt_timeout); } else { - rtrd->wait(srsu2msi(publish_normal_timeout)); + rtrd->wait(publish_normal_timeout); } // check the thread error code.