From 1e34d2a5cd0cfd15ff510cc7d4831225a006871a Mon Sep 17 00:00:00 2001 From: zhengfl Date: Thu, 2 Oct 2014 19:45:04 +0800 Subject: [PATCH 1/3] client connection no disconnect --- trunk/src/app/srs_app_recv_thread.cpp | 23 +++++++++++++++++++++-- trunk/src/app/srs_app_recv_thread.hpp | 4 ++++ trunk/src/app/srs_app_rtmp_conn.cpp | 1 + trunk/src/app/srs_app_source.cpp | 8 ++++++++ trunk/src/app/srs_app_source.hpp | 4 ++++ 5 files changed, 38 insertions(+), 2 deletions(-) mode change 100644 => 100755 trunk/src/app/srs_app_recv_thread.cpp mode change 100644 => 100755 trunk/src/app/srs_app_recv_thread.hpp mode change 100644 => 100755 trunk/src/app/srs_app_rtmp_conn.cpp mode change 100644 => 100755 trunk/src/app/srs_app_source.cpp mode change 100644 => 100755 trunk/src/app/srs_app_source.hpp diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp old mode 100644 new mode 100755 index 310c5ebec..3c32a8030 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -30,6 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include using namespace std; @@ -138,11 +139,12 @@ SrsQueueRecvThread::SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms) { rtmp = rtmp_sdk; recv_error_code = ERROR_SUCCESS; + _consumer = NULL; } SrsQueueRecvThread::~SrsQueueRecvThread() { - trd.stop(); + stop(); // clear all messages. std::vector::iterator it; @@ -160,6 +162,7 @@ int SrsQueueRecvThread::start() void SrsQueueRecvThread::stop() { + _consumer = NULL; trd.stop(); } @@ -195,7 +198,13 @@ bool SrsQueueRecvThread::can_handle() // for the message may cause the thread to stop, // when stop, the thread is freed, so the messages // are dropped. - return empty(); + bool e = empty(); +#ifdef SRS_PERF_QUEUE_COND_WAIT + if (_consumer && !e) { + _consumer->on_dispose(); + } +#endif + return e; } int SrsQueueRecvThread::handle(SrsCommonMessage* msg) @@ -209,6 +218,11 @@ int SrsQueueRecvThread::handle(SrsCommonMessage* msg) void SrsQueueRecvThread::on_recv_error(int ret) { +#ifdef SRS_PERF_QUEUE_COND_WAIT + if (_consumer) { + _consumer->on_dispose(); + } +#endif recv_error_code = ret; } @@ -226,6 +240,11 @@ void SrsQueueRecvThread::on_thread_stop() rtmp->set_auto_response(true); } +void SrsQueueRecvThread::set_consumer(SrsConsumer *consumer) +{ + _consumer = consumer; +} + SrsPublishRecvThread::SrsPublishRecvThread( SrsRtmpServer* rtmp_sdk, SrsRequest* _req, int mr_sock_fd, int timeout_ms, diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp old mode 100644 new mode 100755 index e97639d1d..cf5fc39ce --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -42,6 +42,7 @@ class SrsCommonMessage; class SrsRtmpConn; class SrsSource; class SrsRequest; +class SrsConsumer; /** * for the recv thread to handle the message. @@ -112,6 +113,7 @@ private: SrsRtmpServer* rtmp; // the recv thread error code. int recv_error_code; + SrsConsumer *_consumer; public: SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms); virtual ~SrsQueueRecvThread(); @@ -130,6 +132,8 @@ public: public: virtual void on_thread_start(); virtual void on_thread_stop(); +public: + virtual void set_consumer(SrsConsumer *consumer); }; /** diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp old mode 100644 new mode 100755 index 230bb851c..7a742eaa5 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -595,6 +595,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) // when mw_sleep changed, resize the socket send buffer. mw_enabled = true; change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost)); + trd->set_consumer(consumer); while (true) { // to use isolate thread to recv, can improve about 33% performance. diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp old mode 100644 new mode 100755 index efece4516..ee75751cc --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -515,6 +515,14 @@ void SrsConsumer::wait(int nb_msgs, int duration) // use cond block wait for high performance mode. st_cond_wait(mw_wait); } + +void SrsConsumer::on_dispose() +{ + if (mw_waiting) { + st_cond_signal(mw_wait); + mw_waiting = false; + } +} #endif int SrsConsumer::on_play_client_pause(bool is_pause) diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp old mode 100644 new mode 100755 index ad7a060a4..eac2531d2 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -246,6 +246,10 @@ public: * @param duration the messgae duration to wait. */ virtual void wait(int nb_msgs, int duration); + /** + * when waiting, a message incomming, we rouse it + */ + virtual void on_dispose(); #endif /** * when client send the pause message. From bafdd8312261a7eb9bd3f0dfde1752563e9c24b6 Mon Sep 17 00:00:00 2001 From: zhengfl Date: Thu, 2 Oct 2014 19:51:01 +0800 Subject: [PATCH 2/3] last --- trunk/src/app/srs_app_recv_thread.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 3c32a8030..afce8a81f 100755 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -218,12 +218,12 @@ int SrsQueueRecvThread::handle(SrsCommonMessage* msg) void SrsQueueRecvThread::on_recv_error(int ret) { + recv_error_code = ret; #ifdef SRS_PERF_QUEUE_COND_WAIT if (_consumer) { _consumer->on_dispose(); } #endif - recv_error_code = ret; } void SrsQueueRecvThread::on_thread_start() From 2317f0e7672276b3f220162401f3d78026e53a38 Mon Sep 17 00:00:00 2001 From: zhengfl Date: Thu, 2 Oct 2014 20:25:36 +0800 Subject: [PATCH 3/3] refine --- trunk/src/app/srs_app_recv_thread.cpp | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index afce8a81f..d73204de9 100755 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -198,13 +198,7 @@ bool SrsQueueRecvThread::can_handle() // for the message may cause the thread to stop, // when stop, the thread is freed, so the messages // are dropped. - bool e = empty(); -#ifdef SRS_PERF_QUEUE_COND_WAIT - if (_consumer && !e) { - _consumer->on_dispose(); - } -#endif - return e; + return empty(); } int SrsQueueRecvThread::handle(SrsCommonMessage* msg) @@ -212,7 +206,11 @@ int SrsQueueRecvThread::handle(SrsCommonMessage* msg) // put into queue, the send thread will get and process it, // @see SrsRtmpConn::process_play_control_msg queue.push_back(msg); - +#ifdef SRS_PERF_QUEUE_COND_WAIT + if (_consumer) { + _consumer->on_dispose(); + } +#endif return ERROR_SUCCESS; }