diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 261578479..ce8fa590a 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -26,20 +26,101 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -SrsQueueRecvThread::SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk) +ISrsMessageHandler::ISrsMessageHandler() { +} + +ISrsMessageHandler::~ISrsMessageHandler() +{ +} + +SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk) +{ + handler = msg_handler; rtmp = rtmp_sdk; trd = new SrsThread(this, 0, true); } -SrsQueueRecvThread::~SrsQueueRecvThread() +SrsRecvThread::~SrsRecvThread() { // stop recv thread. stop(); - + // destroy the thread. srs_freep(trd); - +} + +int SrsRecvThread::start() +{ + return trd->start(); +} + +void SrsRecvThread::stop() +{ + trd->stop(); +} + +int SrsRecvThread::cycle() +{ + int ret = ERROR_SUCCESS; + + if (!handler->can_handle()) { + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + return ret; + } + + SrsMessage* msg = NULL; + + if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("recv client control message failed. ret=%d", ret); + } + + // we use no timeout to recv, should never got any error. + trd->stop_loop(); + + return ret; + } + srs_verbose("play loop recv message. ret=%d", ret); + + handler->handle(msg); + + return ret; +} + +void SrsRecvThread::on_thread_start() +{ + // the multiple messages writev improve performance large, + // but the timeout recv will cause 33% sys call performance, + // to use isolate thread to recv, can improve about 33% performance. + // @see https://github.com/winlinvip/simple-rtmp-server/issues/194 + // @see: https://github.com/winlinvip/simple-rtmp-server/issues/217 + rtmp->set_recv_timeout(ST_UTIME_NO_TIMEOUT); + + // disable the protocol auto response, + // for the isolate recv thread should never send any messages. + rtmp->set_auto_response(false); +} + +void SrsRecvThread::on_thread_stop() +{ + // enable the protocol auto response, + // for the isolate recv thread terminated. + rtmp->set_auto_response(true); + + // reset the timeout to pulse mode. + rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); +} + +SrsQueueRecvThread::SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk) + : SrsRecvThread(this, rtmp_sdk) +{ +} + +SrsQueueRecvThread::~SrsQueueRecvThread() +{ + stop(); + // clear all messages. std::vector::iterator it; for (it = queue.begin(); it != queue.end(); ++it) { @@ -70,71 +151,20 @@ SrsMessage* SrsQueueRecvThread::pump() return msg; } -int SrsQueueRecvThread::start() +bool SrsQueueRecvThread::can_handle() { - return trd->start(); -} - -void SrsQueueRecvThread::stop() -{ - trd->stop(); -} - -int SrsQueueRecvThread::cycle() -{ - int ret = ERROR_SUCCESS; - // we only recv one message and then process it, // for the message may cause the thread to stop, // when stop, the thread is freed, so the messages // are dropped. - if (!queue.empty()) { - st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); - return ret; - } - - SrsMessage* msg = NULL; - - if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("recv client control message failed. ret=%d", ret); - } - - // we use no timeout to recv, should never got any error. - trd->stop_loop(); - - return ret; - } - srs_verbose("play loop recv message. ret=%d", ret); - + return empty(); +} + +int SrsQueueRecvThread::handle(SrsMessage* msg) +{ // put into queue, the send thread will get and process it, // @see SrsRtmpConn::process_play_control_msg queue.push_back(msg); - - return ret; -} -void SrsQueueRecvThread::on_thread_start() -{ - // the multiple messages writev improve performance large, - // but the timeout recv will cause 33% sys call performance, - // to use isolate thread to recv, can improve about 33% performance. - // @see https://github.com/winlinvip/simple-rtmp-server/issues/194 - // @see: https://github.com/winlinvip/simple-rtmp-server/issues/217 - rtmp->set_recv_timeout(ST_UTIME_NO_TIMEOUT); - - // disable the protocol auto response, - // for the isolate recv thread should never send any messages. - rtmp->set_auto_response(false); + return ERROR_SUCCESS; } - -void SrsQueueRecvThread::on_thread_stop() -{ - // enable the protocol auto response, - // for the isolate recv thread terminated. - rtmp->set_auto_response(true); - - // reset the timeout to pulse mode. - rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); -} - diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 29cd74b27..914be09e3 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -38,24 +38,39 @@ class SrsRtmpServer; class SrsMessage; /** -* the recv thread used to replace the timeout recv, -* which hurt performance for the epoll_ctrl is frequently used. -* @see: SrsRtmpConn::playing -* @see: https://github.com/winlinvip/simple-rtmp-server/issues/217 -*/ -class SrsQueueRecvThread : public ISrsThreadHandler + * for the recv thread to handle the message. + */ +class ISrsMessageHandler { -private: +public: + ISrsMessageHandler(); + virtual ~ISrsMessageHandler(); +public: + /** + * whether the handler can handle, + * for example, when queue recv handler got an message, + * it wait the user to process it, then the recv thread + * never recv message util the handler is ok. + */ + virtual bool can_handle() = 0; + /** + * process the received message. + */ + virtual int handle(SrsMessage* msg) = 0; +}; + +/** + * the recv thread, use message handler to handle each received message. + */ +class SrsRecvThread : public ISrsThreadHandler +{ +protected: SrsThread* trd; + ISrsMessageHandler* handler; SrsRtmpServer* rtmp; - std::vector queue; public: - SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk); - virtual ~SrsQueueRecvThread(); -public: - virtual bool empty(); - virtual int size(); - virtual SrsMessage* pump(); + SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk); + virtual ~SrsRecvThread(); public: virtual int start(); virtual void stop(); @@ -65,5 +80,27 @@ public: virtual void on_thread_stop(); }; +/** +* the recv thread used to replace the timeout recv, +* which hurt performance for the epoll_ctrl is frequently used. +* @see: SrsRtmpConn::playing +* @see: https://github.com/winlinvip/simple-rtmp-server/issues/217 +*/ +class SrsQueueRecvThread : virtual public ISrsMessageHandler, virtual public SrsRecvThread +{ +private: + std::vector queue; +public: + SrsQueueRecvThread(SrsRtmpServer* rtmp_sdk); + virtual ~SrsQueueRecvThread(); +public: + virtual bool empty(); + virtual int size(); + virtual SrsMessage* pump(); +public: + virtual bool can_handle(); + virtual int handle(SrsMessage* msg); +}; + #endif