diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp old mode 100644 new mode 100755 index 310c5ebec..d73204de9 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -30,6 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include using namespace std; @@ -138,11 +139,12 @@ SrsQueueRecvThread::SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms) { rtmp = rtmp_sdk; recv_error_code = ERROR_SUCCESS; + _consumer = NULL; } SrsQueueRecvThread::~SrsQueueRecvThread() { - trd.stop(); + stop(); // clear all messages. std::vector::iterator it; @@ -160,6 +162,7 @@ int SrsQueueRecvThread::start() void SrsQueueRecvThread::stop() { + _consumer = NULL; trd.stop(); } @@ -203,13 +206,22 @@ int SrsQueueRecvThread::handle(SrsCommonMessage* msg) // put into queue, the send thread will get and process it, // @see SrsRtmpConn::process_play_control_msg queue.push_back(msg); - +#ifdef SRS_PERF_QUEUE_COND_WAIT + if (_consumer) { + _consumer->on_dispose(); + } +#endif return ERROR_SUCCESS; } void SrsQueueRecvThread::on_recv_error(int ret) { recv_error_code = ret; +#ifdef SRS_PERF_QUEUE_COND_WAIT + if (_consumer) { + _consumer->on_dispose(); + } +#endif } void SrsQueueRecvThread::on_thread_start() @@ -226,6 +238,11 @@ void SrsQueueRecvThread::on_thread_stop() rtmp->set_auto_response(true); } +void SrsQueueRecvThread::set_consumer(SrsConsumer *consumer) +{ + _consumer = consumer; +} + SrsPublishRecvThread::SrsPublishRecvThread( SrsRtmpServer* rtmp_sdk, SrsRequest* _req, int mr_sock_fd, int timeout_ms, diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp old mode 100644 new mode 100755 index e97639d1d..cf5fc39ce --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -42,6 +42,7 @@ class SrsCommonMessage; class SrsRtmpConn; class SrsSource; class SrsRequest; +class SrsConsumer; /** * for the recv thread to handle the message. @@ -112,6 +113,7 @@ private: SrsRtmpServer* rtmp; // the recv thread error code. int recv_error_code; + SrsConsumer *_consumer; public: SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms); virtual ~SrsQueueRecvThread(); @@ -130,6 +132,8 @@ public: public: virtual void on_thread_start(); virtual void on_thread_stop(); +public: + virtual void set_consumer(SrsConsumer *consumer); }; /** diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp old mode 100644 new mode 100755 index 230bb851c..7a742eaa5 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -595,6 +595,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) // when mw_sleep changed, resize the socket send buffer. mw_enabled = true; change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost)); + trd->set_consumer(consumer); while (true) { // to use isolate thread to recv, can improve about 33% performance. diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp old mode 100644 new mode 100755 index efece4516..ee75751cc --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -515,6 +515,14 @@ void SrsConsumer::wait(int nb_msgs, int duration) // use cond block wait for high performance mode. st_cond_wait(mw_wait); } + +void SrsConsumer::on_dispose() +{ + if (mw_waiting) { + st_cond_signal(mw_wait); + mw_waiting = false; + } +} #endif int SrsConsumer::on_play_client_pause(bool is_pause) diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp old mode 100644 new mode 100755 index ad7a060a4..eac2531d2 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -246,6 +246,10 @@ public: * @param duration the messgae duration to wait. */ virtual void wait(int nb_msgs, int duration); + /** + * when waiting, a message incomming, we rouse it + */ + virtual void on_dispose(); #endif /** * when client send the pause message.