From 2ed2513f08a21953605c900bf9b4bd2434a67485 Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 29 May 2017 19:45:19 +0800 Subject: [PATCH] For #906, #902, use coroutine for one cycle thread --- trunk/src/app/srs_app_conn.cpp | 34 +++---------- trunk/src/app/srs_app_conn.hpp | 21 ++------ trunk/src/app/srs_app_http_api.cpp | 2 +- trunk/src/app/srs_app_http_conn.cpp | 2 +- trunk/src/app/srs_app_recv_thread.cpp | 4 +- trunk/src/app/srs_app_recv_thread.hpp | 4 +- trunk/src/app/srs_app_rtmp_conn.cpp | 46 +++++++++--------- trunk/src/app/srs_app_rtsp.cpp | 4 +- trunk/src/app/srs_app_rtsp.hpp | 4 +- trunk/src/app/srs_app_thread.cpp | 70 --------------------------- trunk/src/app/srs_app_thread.hpp | 65 ------------------------- 11 files changed, 44 insertions(+), 212 deletions(-) diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index fe95274d3..340a3aaa9 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -32,23 +32,16 @@ using namespace std; SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c, string cip) { - id = 0; manager = cm; stfd = c; ip = cip; - disposed = false; - expired = false; create_time = srs_get_system_time_ms(); skt = new SrsStSocket(); kbps = new SrsKbps(); kbps->set_io(skt, skt); - // the client thread should reap itself, - // so we never use joinable. - // TODO: FIXME: maybe other thread need to stop it. - // @see: https://github.com/ossrs/srs/issues/78 - pthread = new SrsOneCycleThread("conn", this); + trd = new SrsCoroutine("conn", this); } SrsConnection::~SrsConnection() @@ -57,7 +50,9 @@ SrsConnection::~SrsConnection() srs_freep(kbps); srs_freep(skt); - srs_freep(pthread); + srs_freep(trd); + + srs_close_stfd(stfd); } void SrsConnection::resample() @@ -82,17 +77,7 @@ void SrsConnection::cleanup() void SrsConnection::dispose() { - if (disposed) { - return; - } - - disposed = true; - - /** - * when delete the connection, stop the connection, - * close the underlayer socket, delete the thread. - */ - srs_close_stfd(stfd); + trd->interrupt(); } int SrsConnection::start() @@ -103,16 +88,13 @@ int SrsConnection::start() return ret; } - return pthread->start(); + return trd->start(); } int SrsConnection::cycle() { int ret = ERROR_SUCCESS; - _srs_context->generate_id(); - id = _srs_context->get_id(); - int oret = ret = do_cycle(); // if socket io error, set to closed. @@ -138,12 +120,12 @@ int SrsConnection::cycle() int SrsConnection::srs_id() { - return id; + return trd->cid(); } void SrsConnection::expire() { - expired = true; + trd->interrupt(); } diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index ae5c729e9..00b447a68 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -39,20 +39,15 @@ * all connections accept from listener must extends from this base class, * server will add the connection to manager, and delete it when remove. */ -class SrsConnection : virtual public ISrsConnection, virtual public ISrsOneCycleThreadHandler +class SrsConnection : virtual public ISrsConnection, virtual public ISrsCoroutineHandler , virtual public IKbpsDelta, virtual public ISrsReloadHandler { -private: +protected: /** * each connection start a green thread, * when thread stop, the connection will be delete by server. */ - SrsOneCycleThread* pthread; - /** - * the id of connection. - */ - int id; -protected: + SrsCoroutine* trd; /** * the manager object to manage the connection. */ @@ -65,16 +60,6 @@ protected: * the ip of client. */ std::string ip; - /** - * whether the connection is disposed, - * when disposed, connection should stop cycle and cleanup itself. - */ - bool disposed; - /** - * whether connection is expired, application definition. - * when expired, the connection must never be served and quit ASAP. - */ - bool expired; /** * the underlayer socket. */ diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 6ca5e5fed..c4a654cb5 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1353,7 +1353,7 @@ int SrsHttpApi::do_cycle() } // process http messages. - while(!disposed) { + while(!trd->pull()) { ISrsHttpMessage* req = NULL; // get a http message diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 70f32898e..89b065206 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -121,7 +121,7 @@ int SrsHttpConn::do_cycle() } // process http messages. - while (!disposed) { + while (!trd->pull()) { ISrsHttpMessage* req = NULL; // get a http message diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index cea1f050c..16e7da8b7 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -536,7 +536,7 @@ SrsHttpRecvThread::SrsHttpRecvThread(SrsResponseOnlyHttpConn* c) { conn = c; error = ERROR_SUCCESS; - trd = new SrsOneCycleThread("http-receive", this); + trd = new SrsCoroutine("http-receive", this, _srs_context->get_id()); } SrsHttpRecvThread::~SrsHttpRecvThread() @@ -558,7 +558,7 @@ int SrsHttpRecvThread::cycle() { int ret = ERROR_SUCCESS; - while (true) { + while (!trd->pull()) { ISrsHttpMessage* req = NULL; SrsAutoFree(ISrsHttpMessage, req); diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 98f7a3bb6..77f35028c 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -229,11 +229,11 @@ private: * when client closed the request, to avoid FD leak. * @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427 */ -class SrsHttpRecvThread : public ISrsOneCycleThreadHandler +class SrsHttpRecvThread : public ISrsCoroutineHandler { private: SrsResponseOnlyHttpConn* conn; - SrsOneCycleThread* trd; + SrsCoroutine* trd; int error; public: SrsHttpRecvThread(SrsResponseOnlyHttpConn* c); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 97c9b7d17..8c827352f 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -450,7 +450,7 @@ int SrsRtmpConn::service_cycle() } srs_verbose("on_bw_done success"); - while (!disposed) { + while (!trd->pull()) { ret = stream_service_cycle(); // stream service must terminated with error, never success. @@ -692,7 +692,7 @@ int SrsRtmpConn::playing(SrsSource* source) return ret; } -int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd) +int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* rtrd) { int ret = ERROR_SUCCESS; @@ -730,12 +730,12 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe srs_trace("start play smi=%.2f, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d", send_min_interval, mw_sleep, mw_enabled, realtime, tcp_nodelay); - while (!disposed) { + while (true) { // collect elapse for pithy print. pprint->elapse(); // when source is set to expired, disconnect it. - if (expired) { + if (trd->pull()) { ret = ERROR_USER_DISCONNECT; srs_error("connection expired. ret=%d", ret); return ret; @@ -744,8 +744,8 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe // to use isolate thread to recv, can improve about 33% performance. // @see: https://github.com/ossrs/srs/issues/196 // @see: https://github.com/ossrs/srs/issues/217 - while (!trd->empty()) { - SrsCommonMessage* msg = trd->pump(); + while (!rtrd->empty()) { + SrsCommonMessage* msg = rtrd->pump(); srs_verbose("pump client message to process."); if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) { @@ -757,7 +757,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe } // quit when recv thread error. - if ((ret = trd->error_code()) != ERROR_SUCCESS) { + if ((ret = rtrd->error_code()) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) { srs_error("recv thread failed. ret=%d", ret); } @@ -893,13 +893,13 @@ int SrsRtmpConn::publishing(SrsSource* source) if ((ret = acquire_publish(source)) == ERROR_SUCCESS) { // use isolate thread to recv, // @see: https://github.com/ossrs/srs/issues/237 - SrsPublishRecvThread trd(rtmp, req, st_netfd_fileno(stfd), 0, this, source); + SrsPublishRecvThread rtrd(rtmp, req, st_netfd_fileno(stfd), 0, this, source); srs_info("start to publish stream %s success", req->stream.c_str()); - ret = do_publishing(source, &trd); + ret = do_publishing(source, &rtrd); // stop isolate recv thread - trd.stop(); + rtrd.stop(); } // whatever the acquire publish, always release publish. @@ -916,7 +916,7 @@ int SrsRtmpConn::publishing(SrsSource* source) return ret; } -int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) +int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* rtrd) { int ret = ERROR_SUCCESS; @@ -925,15 +925,15 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) SrsAutoFree(SrsPithyPrint, pprint); // start isolate recv thread. - if ((ret = trd->start()) != ERROR_SUCCESS) { + if ((ret = rtrd->start()) != ERROR_SUCCESS) { srs_error("start isolate recv thread failed. ret=%d", ret); return ret; } // change the isolate recv thread context id, // merge its log to current thread. - int receive_thread_cid = trd->get_cid(); - trd->set_cid(_srs_context->get_id()); + int receive_thread_cid = rtrd->get_cid(); + rtrd->set_cid(_srs_context->get_id()); // initialize the publish timeout. publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost); @@ -951,11 +951,11 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) int64_t nb_msgs = 0; uint64_t nb_frames = 0; - while (!disposed) { + while (true) { pprint->elapse(); // when source is set to expired, disconnect it. - if (expired) { + if (trd->pull()) { ret = ERROR_USER_DISCONNECT; srs_error("connection expired. ret=%d", ret); return ret; @@ -965,13 +965,13 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) if (nb_msgs == 0) { // when not got msgs, wait for a larger timeout. // @see https://github.com/ossrs/srs/issues/441 - trd->wait(publish_1stpkt_timeout); + rtrd->wait(publish_1stpkt_timeout); } else { - trd->wait(publish_normal_timeout); + rtrd->wait(publish_normal_timeout); } // check the thread error code. - if ((ret = trd->error_code()) != ERROR_SUCCESS) { + if ((ret = rtrd->error_code()) != ERROR_SUCCESS) { if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) { srs_error("recv thread failed. ret=%d", ret); } @@ -979,21 +979,21 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) } // when not got any messages, timeout. - if (trd->nb_msgs() <= nb_msgs) { + if (rtrd->nb_msgs() <= nb_msgs) { ret = ERROR_SOCKET_TIMEOUT; srs_warn("publish timeout %dms, nb_msgs=%" PRId64 ", ret=%d", nb_msgs? publish_normal_timeout : publish_1stpkt_timeout, nb_msgs, ret); break; } - nb_msgs = trd->nb_msgs(); + nb_msgs = rtrd->nb_msgs(); // Update the stat for video fps. // @remark https://github.com/ossrs/srs/issues/851 SrsStatistic* stat = SrsStatistic::instance(); - if ((ret = stat->on_video_frames(req, (int)(trd->nb_video_frames() - nb_frames))) != ERROR_SUCCESS) { + if ((ret = stat->on_video_frames(req, (int)(rtrd->nb_video_frames() - nb_frames))) != ERROR_SUCCESS) { return ret; } - nb_frames = trd->nb_video_frames(); + nb_frames = rtrd->nb_video_frames(); // reportable if (pprint->can_print()) { diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index 82b126432..c2fdfedeb 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -195,7 +195,7 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o) stfd = fd; skt = new SrsStSocket(); rtsp = new SrsRtspStack(skt); - trd = new SrsOneCycleThread("rtsp", this); + trd = new SrsCoroutine("rtsp", this); req = NULL; sdk = NULL; @@ -249,7 +249,7 @@ int SrsRtspConn::do_cycle() srs_trace("rtsp: serve %s", ip.c_str()); // consume all rtsp messages. - for (;;) { + while (!trd->pull()) { SrsRtspRequest* req = NULL; if ((ret = rtsp->recv_message(&req)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp index 035a47397..e40aeee84 100644 --- a/trunk/src/app/srs_app_rtsp.hpp +++ b/trunk/src/app/srs_app_rtsp.hpp @@ -110,7 +110,7 @@ public: /** * the rtsp connection serve the fd. */ -class SrsRtspConn : public ISrsOneCycleThreadHandler +class SrsRtspConn : public ISrsCoroutineHandler { private: std::string output_template; @@ -133,7 +133,7 @@ private: SrsStSocket* skt; SrsRtspStack* rtsp; SrsRtspCaster* caster; - SrsOneCycleThread* trd; + SrsCoroutine* trd; private: SrsRequest* req; SrsSimpleRtmpClient* sdk; diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp index 0782b11ed..1fa5668f6 100755 --- a/trunk/src/app/srs_app_thread.cpp +++ b/trunk/src/app/srs_app_thread.cpp @@ -78,76 +78,6 @@ void SrsCoroutineManager::clear() } } -ISrsOneCycleThreadHandler::ISrsOneCycleThreadHandler() -{ -} - -ISrsOneCycleThreadHandler::~ISrsOneCycleThreadHandler() -{ -} - -void ISrsOneCycleThreadHandler::on_thread_start() -{ -} - -int ISrsOneCycleThreadHandler::on_before_cycle() -{ - return ERROR_SUCCESS; -} - -int ISrsOneCycleThreadHandler::on_end_cycle() -{ - return ERROR_SUCCESS; -} - -void ISrsOneCycleThreadHandler::on_thread_stop() -{ -} - -SrsOneCycleThread::SrsOneCycleThread(const char* n, ISrsOneCycleThreadHandler* h) -{ - handler = h; - pthread = new internal::SrsThread(n, this, 0, false); -} - -SrsOneCycleThread::~SrsOneCycleThread() -{ - pthread->stop(); - srs_freep(pthread); -} - -int SrsOneCycleThread::start() -{ - return pthread->start(); -} - -int SrsOneCycleThread::cycle() -{ - int ret = handler->cycle(); - pthread->stop_loop(); - return ret; -} - -void SrsOneCycleThread::on_thread_start() -{ - handler->on_thread_start(); -} - -int SrsOneCycleThread::on_before_cycle() -{ - return handler->on_before_cycle(); -} - -int SrsOneCycleThread::on_end_cycle() -{ - return handler->on_end_cycle(); -} - -void SrsOneCycleThread::on_thread_stop() -{ - handler->on_thread_stop(); -} - ISrsReusableThreadHandler::ISrsReusableThreadHandler() { } diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp index 44487e9b1..85feabf8f 100644 --- a/trunk/src/app/srs_app_thread.hpp +++ b/trunk/src/app/srs_app_thread.hpp @@ -58,71 +58,6 @@ private: void clear(); }; -/** - * the one cycle thread is a thread do the cycle only one time, - * that is, the thread will quit when return from the cycle. - * user can create thread which stop itself, - * generally only need to provides a start method, - * the object will destroy itself then terminate the thread, @see SrsConnection - * 1. create SrsThread field - * 2. the thread quit when return from cycle. - * for example: - * class SrsConnection : public ISrsOneCycleThreadHandler { - * public: SrsConnection() { pthread = new SrsOneCycleThread("conn", this); } - * public: virtual int start() { return pthread->start(); } - * public: virtual int cycle() { - * // serve client. - * // set loop to stop to quit, stop thread itself. - * pthread->stop_loop(); - * } - * public: virtual void on_thread_stop() { - * // remove the connection in thread itself. - * server->remove(this); - * } - * }; - */ -class ISrsOneCycleThreadHandler -{ -public: - ISrsOneCycleThreadHandler(); - virtual ~ISrsOneCycleThreadHandler(); -public: - /** - * the cycle method for the one cycle thread. - */ - virtual int cycle() = 0; -public: - /** - * other callback for handler. - * @remark all callback is optional, handler can ignore it. - */ - virtual void on_thread_start(); - virtual int on_before_cycle(); - virtual int on_end_cycle(); - virtual void on_thread_stop(); -}; -class SrsOneCycleThread : public internal::ISrsThreadHandler -{ -private: - internal::SrsThread* pthread; - ISrsOneCycleThreadHandler* handler; -public: - SrsOneCycleThread(const char* n, ISrsOneCycleThreadHandler* h); - virtual ~SrsOneCycleThread(); -public: - /** - * for the one cycle thread, quit when cycle return. - */ - virtual int start(); -// interface internal::ISrsThreadHandler -public: - virtual int cycle(); - virtual void on_thread_start(); - virtual int on_before_cycle(); - virtual int on_end_cycle(); - virtual void on_thread_stop(); -}; - /** * the reuse thread is a thread stop and start by other thread. * user can create thread and stop then start again and again,