From 472b1742a2cf7b204a213933f8345f3dfcffd888 Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 1 Dec 2014 23:38:51 +0800 Subject: [PATCH] for bug #237, use isolate thread to recv message. 2.0.41 --- trunk/src/app/srs_app_recv_thread.cpp | 75 +++++++++ trunk/src/app/srs_app_recv_thread.hpp | 36 ++++- trunk/src/app/srs_app_rtmp_conn.cpp | 211 ++++++++++++-------------- trunk/src/app/srs_app_rtmp_conn.hpp | 7 +- trunk/src/core/srs_core.hpp | 2 +- 5 files changed, 209 insertions(+), 122 deletions(-) diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 317309627..c59be261f 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include ISrsMessageHandler::ISrsMessageHandler() { @@ -89,6 +90,11 @@ int SrsRecvThread::cycle() return ret; } +void SrsRecvThread::stop_loop() +{ + trd->stop_loop(); +} + void SrsRecvThread::on_thread_start() { // the multiple messages writev improve performance large, @@ -179,3 +185,72 @@ int SrsQueueRecvThread::handle(SrsMessage* msg) return ERROR_SUCCESS; } + +SrsPublishRecvThread::SrsPublishRecvThread( + SrsRtmpServer* rtmp_sdk, int timeout_ms, + SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge +): trd(this, rtmp_sdk, timeout_ms) +{ + _conn = conn; + _source = source; + _is_fmle = is_fmle; + _is_edge = is_edge; + + recv_error_code = ERROR_SUCCESS; + _nb_msgs = 0; +} + +SrsPublishRecvThread::~SrsPublishRecvThread() +{ + trd.stop(); +} + +int64_t SrsPublishRecvThread::nb_msgs() +{ + return _nb_msgs; +} + +int SrsPublishRecvThread::error_code() +{ + return recv_error_code; +} + +int SrsPublishRecvThread::start() +{ + return trd.start(); +} + +void SrsPublishRecvThread::stop() +{ + trd.stop(); +} + +bool SrsPublishRecvThread::can_handle() +{ + // publish thread always can handle message. + return true; +} + +int SrsPublishRecvThread::handle(SrsMessage* msg) +{ + int ret = ERROR_SUCCESS; + + _nb_msgs++; + + // the rtmp connection will handle this message, + // quit the thread loop when error. + recv_error_code = ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge); + + // when error, use stop loop to terminate the thread normally, + // for we are in the thread loop now, and should never use stop() to terminate it. + if (ret != ERROR_SUCCESS) { + trd.stop_loop(); + } + + // must always free it, + // the source will copy it if need to use. + srs_freep(msg); + + // TODO: FIXME: implements it. + return ret; +} diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 31852742e..b52b120a6 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -36,6 +36,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsRtmpServer; class SrsMessage; +class SrsRtmpConn; +class SrsSource; /** * for the recv thread to handle the message. @@ -76,6 +78,7 @@ public: virtual int start(); virtual void stop(); virtual int cycle(); + virtual void stop_loop(); public: virtual void on_thread_start(); virtual void on_thread_stop(); @@ -87,7 +90,7 @@ public: * @see: SrsRtmpConn::playing * @see: https://github.com/winlinvip/simple-rtmp-server/issues/217 */ -class SrsQueueRecvThread : virtual public ISrsMessageHandler +class SrsQueueRecvThread : public ISrsMessageHandler { private: std::vector queue; @@ -107,5 +110,36 @@ public: virtual int handle(SrsMessage* msg); }; +/** + * the publish recv thread got message and callback the source method to process message. +* @see: https://github.com/winlinvip/simple-rtmp-server/issues/237 + */ +class SrsPublishRecvThread : public ISrsMessageHandler +{ +private: + SrsRecvThread trd; + // the msgs already got. + int64_t _nb_msgs; + // the recv thread error code. + int recv_error_code; + SrsRtmpConn* _conn; + SrsSource* _source; + bool _is_fmle; + bool _is_edge; +public: + SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms, + SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge); + virtual ~SrsPublishRecvThread(); +public: + virtual int64_t nb_msgs(); + virtual int error_code(); +public: + virtual int start(); + virtual void stop(); +public: + virtual bool can_handle(); + virtual int handle(SrsMessage* msg); +}; + #endif diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index f1ac4b548..2282ba049 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -644,8 +644,15 @@ int SrsRtmpConn::fmle_publishing(SrsSource* source) return ret; } + // use isolate thread to recv, + // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237 + SrsPublishRecvThread trd(rtmp, SRS_CONSTS_RTMP_RECV_TIMEOUT_US, this, source, true, vhost_is_edge); + srs_info("start to publish stream %s success", req->stream.c_str()); - ret = do_fmle_publishing(source); + ret = do_publishing(source, &trd); + + // stop isolate recv thread + trd.stop(); // when edge, notice edge to change state. // when origin, notice all service to unpublish. @@ -660,95 +667,26 @@ int SrsRtmpConn::fmle_publishing(SrsSource* source) return ret; } -int SrsRtmpConn::do_fmle_publishing(SrsSource* source) -{ - int ret = ERROR_SUCCESS; - - if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { - srs_error("fmle check publish_refer failed. ret=%d", ret); - return ret; - } - srs_verbose("fmle check publish_refer success."); - - SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PUBLISH_USER); - - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); - - // when edge, ignore the publish event, directly proxy it. - if (!vhost_is_edge) { - // notify the hls to prepare when publish start. - if ((ret = source->on_publish()) != ERROR_SUCCESS) { - srs_error("fmle hls on_publish failed. ret=%d", ret); - return ret; - } - srs_verbose("fmle hls on_publish success."); - } - - while (true) { - SrsMessage* msg = NULL; - if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { - srs_error("fmle recv identify client message failed. ret=%d", ret); - return ret; - } - - SrsAutoFree(SrsMessage, msg); - - pithy_print.elapse(); - - // reportable - if (pithy_print.can_print()) { - kbps->sample(); - srs_trace("<- "SRS_CONSTS_LOG_CLIENT_PUBLISH - " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", pithy_print.age(), - kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); - } - - // process UnPublish event. - if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) { - SrsPacket* pkt = NULL; - if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) { - srs_error("fmle decode unpublish message failed. ret=%d", ret); - return ret; - } - - SrsAutoFree(SrsPacket, pkt); - - if (dynamic_cast(pkt)) { - SrsFMLEStartPacket* unpublish = dynamic_cast(pkt); - if ((ret = rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id)) != ERROR_SUCCESS) { - return ret; - } - return ERROR_CONTROL_REPUBLISH; - } - - srs_trace("fmle ignore AMF0/AMF3 command message."); - continue; - } - - // video, audio, data message - if ((ret = process_publish_message(source, msg, vhost_is_edge)) != ERROR_SUCCESS) { - srs_error("fmle process publish message failed. ret=%d", ret); - return ret; - } - } - - return ret; -} - int SrsRtmpConn::flash_publishing(SrsSource* source) { int ret = ERROR_SUCCESS; - + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); - + if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) { srs_error("http hook on_publish failed. ret=%d", ret); return ret; } - - srs_info("flash start to publish stream %s success", req->stream.c_str()); - ret = do_flash_publishing(source); + + // use isolate thread to recv, + // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237 + SrsPublishRecvThread trd(rtmp, SRS_CONSTS_RTMP_RECV_TIMEOUT_US, this, source, false, vhost_is_edge); + + srs_info("start to publish stream %s success", req->stream.c_str()); + ret = do_publishing(source, &trd); + + // stop isolate recv thread + trd.stop(); // when edge, notice edge to change state. // when origin, notice all service to unpublish. @@ -757,80 +695,117 @@ int SrsRtmpConn::flash_publishing(SrsSource* source) } else { source->on_unpublish(); } - + http_hooks_on_unpublish(); - + return ret; } -int SrsRtmpConn::do_flash_publishing(SrsSource* source) +int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) { int ret = ERROR_SUCCESS; - + if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { - srs_error("flash check publish_refer failed. ret=%d", ret); + srs_error("check publish_refer failed. ret=%d", ret); return ret; } - srs_verbose("flash check publish_refer success."); - + srs_verbose("check publish_refer success."); + SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PUBLISH_USER); - + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); - + // when edge, ignore the publish event, directly proxy it. if (!vhost_is_edge) { // notify the hls to prepare when publish start. if ((ret = source->on_publish()) != ERROR_SUCCESS) { - srs_error("flash hls on_publish failed. ret=%d", ret); + srs_error("hls on_publish failed. ret=%d", ret); return ret; } - srs_verbose("flash hls on_publish success."); + srs_verbose("hls on_publish success."); } - + + // start isolate recv thread. + if ((ret = trd->start()) != ERROR_SUCCESS) { + srs_error("start isolate recv thread failed. ret=%d", ret); + return ret; + } + + int64_t nb_msgs = 0; while (true) { - SrsMessage* msg = NULL; - if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("flash recv identify client message failed. ret=%d", ret); + // use small loop to check the error code, interval = 30s/100 = 300ms. + for (int i = 0; i < 100; i++) { + st_usleep(SRS_CONSTS_RTMP_RECV_TIMEOUT_US * 1000 / 100); + + // check the thread error code. + if ((ret = trd->error_code()) != ERROR_SUCCESS) { + return ret; } - return ret; } - SrsAutoFree(SrsMessage, msg); - + // when not got any messages, timeout. + if (trd->nb_msgs() <= nb_msgs) { + ret = ERROR_SOCKET_TIMEOUT; + srs_warn("publish timeout %"PRId64"us, nb_msgs=%"PRId64", ret=%d", + SRS_CONSTS_RTMP_RECV_TIMEOUT_US, nb_msgs, ret); + break; + } + nb_msgs = trd->nb_msgs(); + pithy_print.elapse(); // reportable if (pithy_print.can_print()) { kbps->sample(); - srs_trace("<- "SRS_CONSTS_LOG_WEB_PUBLISH - " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", - pithy_print.age(), + srs_trace("<- "SRS_CONSTS_LOG_CLIENT_PUBLISH + " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", pithy_print.age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); } + } + + return ret; +} + +int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsMessage* msg, bool is_fmle, bool vhost_is_edge) +{ + int ret = ERROR_SUCCESS; - // process UnPublish event. - if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) { - SrsPacket* pkt = NULL; - if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) { - srs_error("flash decode unpublish message failed. ret=%d", ret); - return ret; - } - - SrsAutoFree(SrsPacket, pkt); - + // process publish event. + if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) { + SrsPacket* pkt = NULL; + if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) { + srs_error("fmle decode unpublish message failed. ret=%d", ret); + return ret; + } + + SrsAutoFree(SrsPacket, pkt); + + // for flash, any packet is republish. + if (!is_fmle) { // flash unpublish. // TODO: maybe need to support republish. srs_trace("flash flash publish finished."); return ERROR_CONTROL_REPUBLISH; } - // video, audio, data message - if ((ret = process_publish_message(source, msg, vhost_is_edge)) != ERROR_SUCCESS) { - srs_error("flash process publish message failed. ret=%d", ret); - return ret; + // for fmle, drop others except the fmle start packet. + if (dynamic_cast(pkt)) { + SrsFMLEStartPacket* unpublish = dynamic_cast(pkt); + if ((ret = rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id)) != ERROR_SUCCESS) { + return ret; + } + return ERROR_CONTROL_REPUBLISH; } + + srs_trace("fmle ignore AMF0/AMF3 command message."); + return ret; + } + + // video, audio, data message + if ((ret = process_publish_message(source, msg, vhost_is_edge)) != ERROR_SUCCESS) { + srs_error("fmle process publish message failed. ret=%d", ret); + return ret; } return ret; diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 06dd91707..a10b9ecf6 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -50,12 +50,15 @@ class SrsKbps; class SrsRtmpClient; class SrsSharedPtrMessage; class SrsQueueRecvThread; +class SrsPublishRecvThread; /** * the client provides the main logic control for RTMP clients. */ class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandler { + // for the thread to directly access any field of connection. + friend class SrsPublishRecvThread; private: SrsRequest* req; SrsResponse* res; @@ -91,9 +94,9 @@ private: virtual int playing(SrsSource* source); virtual int do_playing(SrsSource* source, SrsQueueRecvThread* trd); virtual int fmle_publishing(SrsSource* source); - virtual int do_fmle_publishing(SrsSource* source); virtual int flash_publishing(SrsSource* source); - virtual int do_flash_publishing(SrsSource* source); + virtual int do_publishing(SrsSource* source, SrsPublishRecvThread* trd); + virtual int handle_publish_message(SrsSource* source, SrsMessage* msg, bool is_fmle, bool vhost_is_edge); virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge); virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg); private: diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 266a47f9d..cb506c223 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 40 +#define VERSION_REVISION 41 // server info. #define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_ROLE "origin/edge server"