diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 9dc4d2363..f12b028ef 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -134,9 +134,10 @@ void SrsRecvThread::on_thread_stop() handler->on_thread_stop(); } -SrsQueueRecvThread::SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms) +SrsQueueRecvThread::SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms) : trd(this, rtmp_sdk, timeout_ms) { + _consumer = consumer; rtmp = rtmp_sdk; recv_error_code = ERROR_SUCCESS; _consumer = NULL; @@ -237,11 +238,6 @@ void SrsQueueRecvThread::on_thread_stop() rtmp->set_auto_response(true); } -void SrsQueueRecvThread::set_consumer(SrsConsumer *consumer) -{ - _consumer = consumer; -} - SrsPublishRecvThread::SrsPublishRecvThread( SrsRtmpServer* rtmp_sdk, SrsRequest* _req, int mr_sock_fd, int timeout_ms, diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 223c67944..c3caf2068 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -113,9 +113,9 @@ private: SrsRtmpServer* rtmp; // the recv thread error code. int recv_error_code; - SrsConsumer *_consumer; + SrsConsumer* _consumer; public: - SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms); + SrsQueueRecvThread(SrsConsumer* consumer, SrsRtmpServer* rtmp_sdk, int timeout_ms); virtual ~SrsQueueRecvThread(); public: virtual int start(); @@ -132,8 +132,6 @@ public: public: virtual void on_thread_start(); virtual void on_thread_stop(); -public: - virtual void set_consumer(SrsConsumer *consumer); }; /** diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index b7bb4d815..bce2175be 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -560,9 +560,18 @@ int SrsRtmpConn::playing(SrsSource* source) { int ret = ERROR_SUCCESS; + // create consumer of souce. + SrsConsumer* consumer = NULL; + if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) { + srs_error("create consumer failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsConsumer, consumer); + srs_verbose("consumer created success."); + // use isolate thread to recv, // @see: https://github.com/winlinvip/simple-rtmp-server/issues/217 - SrsQueueRecvThread trd(rtmp, SRS_PERF_MW_SLEEP); + SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP); // start isolate recv thread. if ((ret = trd.start()) != ERROR_SUCCESS) { @@ -571,7 +580,7 @@ int SrsRtmpConn::playing(SrsSource* source) } // delivery messages for clients playing stream. - ret = do_playing(source, &trd); + ret = do_playing(source, consumer, &trd); // stop isolate recv thread trd.stop(); @@ -584,27 +593,18 @@ int SrsRtmpConn::playing(SrsSource* source) return ret; } -int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) +int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd) { int ret = ERROR_SUCCESS; + srs_assert(consumer != NULL); + if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != ERROR_SUCCESS) { srs_error("check play_refer failed. ret=%d", ret); return ret; } srs_verbose("check play_refer success."); - SrsConsumer* consumer = NULL; - if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) { - srs_error("create consumer failed. ret=%d", ret); - return ret; - } - - srs_assert(consumer != NULL); - SrsAutoFree(SrsConsumer, consumer); - trd->set_consumer(consumer); - srs_verbose("consumer created success."); - // initialize other components SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER); SrsMessageArray msgs(SRS_PERF_MW_MSGS); diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index ea3fc7619..bca501ad4 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -103,7 +103,7 @@ private: virtual int stream_service_cycle(); virtual int check_vhost(); virtual int playing(SrsSource* source); - virtual int do_playing(SrsSource* source, SrsQueueRecvThread* trd); + virtual int do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd); virtual int fmle_publishing(SrsSource* source); virtual int flash_publishing(SrsSource* source); virtual int do_publishing(SrsSource* source, SrsPublishRecvThread* trd);