1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

Refine SrsRecvThread timeout in time unit

This commit is contained in:
winlin 2019-04-18 08:42:43 +08:00
parent 6b587c5e0f
commit f6f49c68da
3 changed files with 18 additions and 18 deletions

View file

@ -56,7 +56,7 @@ ISrsMessagePumper::~ISrsMessagePumper()
{ {
} }
SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm, int parent_cid) SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, srs_utime_t tm, int parent_cid)
{ {
rtmp = r; rtmp = r;
pumper = p; pumper = p;
@ -117,7 +117,7 @@ srs_error_t SrsRecvThread::cycle()
} }
// reset the timeout to pulse mode. // reset the timeout to pulse mode.
rtmp->set_recv_timeout(timeout * SRS_UTIME_MILLISECONDS); rtmp->set_recv_timeout(timeout);
pumper->on_stop(); pumper->on_stop();
@ -135,7 +135,7 @@ srs_error_t SrsRecvThread::do_cycle()
// When the pumper is interrupted, wait then retry. // When the pumper is interrupted, wait then retry.
if (pumper->interrupted()) { if (pumper->interrupted()) {
srs_usleep(timeout * 1000); srs_usleep(timeout);
continue; continue;
} }
@ -160,8 +160,8 @@ srs_error_t SrsRecvThread::do_cycle()
return err; return err;
} }
SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms, int parent_cid) SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, int parent_cid)
: trd(this, rtmp_sdk, timeout_ms, parent_cid) : trd(this, rtmp_sdk, tm, parent_cid)
{ {
_consumer = consumer; _consumer = consumer;
rtmp = rtmp_sdk; rtmp = rtmp_sdk;
@ -274,8 +274,8 @@ void SrsQueueRecvThread::on_stop()
} }
SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
int mr_sock_fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source, int parent_cid) int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, int parent_cid)
: trd(this, rtmp_sdk, timeout_ms, parent_cid) : trd(this, rtmp_sdk, tm, parent_cid)
{ {
rtmp = rtmp_sdk; rtmp = rtmp_sdk;
@ -310,14 +310,14 @@ SrsPublishRecvThread::~SrsPublishRecvThread()
srs_freep(recv_error); srs_freep(recv_error);
} }
srs_error_t SrsPublishRecvThread::wait(uint64_t timeout_ms) srs_error_t SrsPublishRecvThread::wait(srs_utime_t tm)
{ {
if (recv_error != srs_success) { if (recv_error != srs_success) {
return srs_error_copy(recv_error); return srs_error_copy(recv_error);
} }
// ignore any return of cond wait. // ignore any return of cond wait.
srs_cond_timedwait(error, timeout_ms * 1000); srs_cond_timedwait(error, tm);
return srs_success; return srs_success;
} }

View file

@ -97,12 +97,12 @@ protected:
ISrsMessagePumper* pumper; ISrsMessagePumper* pumper;
SrsRtmpServer* rtmp; SrsRtmpServer* rtmp;
int _parent_cid; int _parent_cid;
// The recv timeout in ms. // The recv timeout in srs_utime_t.
int timeout; srs_utime_t timeout;
public: public:
// Constructor. // Constructor.
// @param tm The receive timeout in ms. // @param tm The receive timeout in ms.
SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm, int parent_cid); SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, srs_utime_t tm, int parent_cid);
virtual ~SrsRecvThread(); virtual ~SrsRecvThread();
public: public:
virtual int cid(); virtual int cid();
@ -134,7 +134,7 @@ private:
SrsConsumer* _consumer; SrsConsumer* _consumer;
public: public:
// TODO: FIXME: Refine timeout in time unit. // TODO: FIXME: Refine timeout in time unit.
SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms, int parent_cid); SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, srs_utime_t tm, int parent_cid);
virtual ~SrsQueueRecvThread(); virtual ~SrsQueueRecvThread();
public: public:
virtual srs_error_t start(); virtual srs_error_t start();
@ -191,13 +191,13 @@ private:
int ncid; int ncid;
public: public:
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
int mr_sock_fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source, int parent_cid); int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, int parent_cid);
virtual ~SrsPublishRecvThread(); virtual ~SrsPublishRecvThread();
public: public:
/** /**
* wait for error for some timeout. * wait for error for some timeout.
*/ */
virtual srs_error_t wait(uint64_t timeout_ms); virtual srs_error_t wait(srs_utime_t tm);
virtual int64_t nb_msgs(); virtual int64_t nb_msgs();
virtual uint64_t nb_video_frames(); virtual uint64_t nb_video_frames();
virtual srs_error_t error_code(); virtual srs_error_t error_code();

View file

@ -646,7 +646,7 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source)
// Use receiving thread to receive packets from peer. // Use receiving thread to receive packets from peer.
// @see: https://github.com/ossrs/srs/issues/217 // @see: https://github.com/ossrs/srs/issues/217
SrsQueueRecvThread trd(consumer, rtmp, srsu2msi(SRS_PERF_MW_SLEEP), _srs_context->get_id()); SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());
if ((err = trd.start()) != srs_success) { if ((err = trd.start()) != srs_success) {
return srs_error_wrap(err, "rtmp: start receive thread"); return srs_error_wrap(err, "rtmp: start receive thread");
@ -874,9 +874,9 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread*
if (nb_msgs == 0) { if (nb_msgs == 0) {
// when not got msgs, wait for a larger timeout. // when not got msgs, wait for a larger timeout.
// @see https://github.com/ossrs/srs/issues/441 // @see https://github.com/ossrs/srs/issues/441
rtrd->wait(srsu2msi(publish_1stpkt_timeout)); rtrd->wait(publish_1stpkt_timeout);
} else { } else {
rtrd->wait(srsu2msi(publish_normal_timeout)); rtrd->wait(publish_normal_timeout);
} }
// check the thread error code. // check the thread error code.