From e5f449ce36f333f88e440b42bd77233ec37bf35e Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 23 May 2015 09:20:16 +0800 Subject: [PATCH] refine the thread to three category. --- trunk/src/app/srs_app_async_call.cpp | 3 +- trunk/src/app/srs_app_async_call.hpp | 6 +- trunk/src/app/srs_app_conn.cpp | 18 +- trunk/src/app/srs_app_conn.hpp | 12 +- trunk/src/app/srs_app_edge.cpp | 8 +- trunk/src/app/srs_app_edge.hpp | 12 +- trunk/src/app/srs_app_encoder.cpp | 2 +- trunk/src/app/srs_app_encoder.hpp | 6 +- trunk/src/app/srs_app_forward.cpp | 4 +- trunk/src/app/srs_app_forward.hpp | 6 +- trunk/src/app/srs_app_http_conn.cpp | 3 +- trunk/src/app/srs_app_http_conn.hpp | 8 +- trunk/src/app/srs_app_ingest.cpp | 2 +- trunk/src/app/srs_app_ingest.hpp | 6 +- trunk/src/app/srs_app_listener.cpp | 6 +- trunk/src/app/srs_app_listener.hpp | 12 +- trunk/src/app/srs_app_log.hpp | 2 +- trunk/src/app/srs_app_recv_thread.cpp | 8 +- trunk/src/app/srs_app_recv_thread.hpp | 4 +- trunk/src/app/srs_app_rtsp.cpp | 18 +- trunk/src/app/srs_app_rtsp.hpp | 8 +- trunk/src/app/srs_app_server.cpp | 3 +- trunk/src/app/srs_app_server.hpp | 6 +- trunk/src/app/srs_app_thread.cpp | 489 ++++++++++++++++---------- trunk/src/app/srs_app_thread.hpp | 412 ++++++++++++++-------- 25 files changed, 648 insertions(+), 416 deletions(-) diff --git a/trunk/src/app/srs_app_async_call.cpp b/trunk/src/app/srs_app_async_call.cpp index cacc94b9d..28968de71 100644 --- a/trunk/src/app/srs_app_async_call.cpp +++ b/trunk/src/app/srs_app_async_call.cpp @@ -41,12 +41,11 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask() SrsAsyncCallWorker::SrsAsyncCallWorker() { - pthread = new SrsThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US, true); + pthread = new SrsReusableThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US); } SrsAsyncCallWorker::~SrsAsyncCallWorker() { - stop(); srs_freep(pthread); std::vector::iterator it; diff --git a/trunk/src/app/srs_app_async_call.hpp b/trunk/src/app/srs_app_async_call.hpp index 366ccd47c..42f334ff8 100644 --- a/trunk/src/app/srs_app_async_call.hpp +++ b/trunk/src/app/srs_app_async_call.hpp @@ -57,10 +57,10 @@ public: * when worker call with the task, the worker will do it in isolate thread. * that is, the task is execute/call in async mode. */ -class SrsAsyncCallWorker : public ISrsThreadHandler +class SrsAsyncCallWorker : public ISrsReusableThreadHandler { private: - SrsThread* pthread; + SrsReusableThread* pthread; std::vector tasks; public: SrsAsyncCallWorker(); @@ -70,6 +70,8 @@ public: public: virtual int start(); virtual void stop(); +// interface ISrsReusableThreadHandler +public: virtual int cycle(); }; diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 4c483ba1c..a03a17489 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -45,12 +45,17 @@ SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c) // so we never use joinable. // TODO: FIXME: maybe other thread need to stop it. // @see: https://github.com/simple-rtmp-server/srs/issues/78 - pthread = new SrsThread("conn", this, 0, false); + pthread = new SrsOneCycleThread("conn", this); } SrsConnection::~SrsConnection() { - stop(); + /** + * when delete the connection, stop the connection, + * close the underlayer socket, delete the thread. + */ + srs_close_stfd(stfd); + srs_freep(pthread); } int SrsConnection::start() @@ -83,9 +88,6 @@ int SrsConnection::cycle() if (ret == ERROR_SOCKET_CLOSED) { srs_warn("client disconnect peer. ret=%d", ret); } - - // set loop to stop to quit. - pthread->stop_loop(); return ERROR_SUCCESS; } @@ -101,10 +103,4 @@ int SrsConnection::srs_id() return id; } -void SrsConnection::stop() -{ - srs_close_stfd(stfd); - srs_freep(pthread); -} - diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 29a6eace7..e00add8ce 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -58,14 +58,14 @@ public: * all connections accept from listener must extends from this base class, * server will add the connection to manager, and delete it when remove. */ -class SrsConnection : public virtual ISrsThreadHandler, public virtual IKbpsDelta +class SrsConnection : public virtual ISrsOneCycleThreadHandler, public virtual IKbpsDelta { private: /** * each connection start a green thread, * when thread stop, the connection will be delete by server. */ - SrsThread* pthread; + SrsOneCycleThread* pthread; /** * the id of connection. */ @@ -97,6 +97,8 @@ public: * to remove the client by server->remove(this). */ virtual int start(); +// interface ISrsOneCycleThreadHandler +public: /** * the thread cycle function, * when serve connection completed, terminate the loop which will terminate the thread, @@ -119,12 +121,6 @@ protected: * for concrete connection to do the cycle. */ virtual int do_cycle() = 0; -private: - /** - * when delete the connection, stop the connection, - * close the underlayer socket, delete the thread. - */ - virtual void stop(); }; #endif diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 3a8483be9..57dfa5cb6 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -70,7 +70,7 @@ SrsEdgeIngester::SrsEdgeIngester() origin_index = 0; stream_id = 0; stfd = NULL; - pthread = new SrsThread("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US, true); + pthread = new SrsReusableThread("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US); } SrsEdgeIngester::~SrsEdgeIngester() @@ -171,7 +171,7 @@ int SrsEdgeIngester::ingest() SrsPithyPrint* pprint = SrsPithyPrint::create_edge(); SrsAutoFree(SrsPithyPrint, pprint); - while (pthread->can_loop()) { + while (!pthread->interrupted()) { pprint->elapse(); // pithy print @@ -397,7 +397,7 @@ SrsEdgeForwarder::SrsEdgeForwarder() origin_index = 0; stream_id = 0; stfd = NULL; - pthread = new SrsThread("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US, true); + pthread = new SrsReusableThread("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US); queue = new SrsMessageQueue(); send_error_code = ERROR_SUCCESS; } @@ -489,7 +489,7 @@ int SrsEdgeForwarder::cycle() SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS); - while (pthread->can_loop()) { + while (!pthread->interrupted()) { if (send_error_code != ERROR_SUCCESS) { st_usleep(SRS_EDGE_FORWARDER_ERROR_US); continue; diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 99147e62c..a2d4fd7cc 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -75,7 +75,7 @@ enum SrsEdgeUserState /** * edge used to ingest stream from origin. */ -class SrsEdgeIngester : public ISrsThreadHandler +class SrsEdgeIngester : public ISrsReusableThreadHandler { private: int stream_id; @@ -83,7 +83,7 @@ private: SrsSource* _source; SrsPlayEdge* _edge; SrsRequest* _req; - SrsThread* pthread; + SrsReusableThread* pthread; st_netfd_t stfd; ISrsProtocolReaderWriter* io; SrsKbps* kbps; @@ -96,7 +96,7 @@ public: virtual int initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req); virtual int start(); virtual void stop(); -// interface ISrsThreadHandler +// interface ISrsReusableThreadHandler public: virtual int cycle(); private: @@ -110,7 +110,7 @@ private: /** * edge used to forward stream to origin. */ -class SrsEdgeForwarder : public ISrsThreadHandler +class SrsEdgeForwarder : public ISrsReusableThreadHandler { private: int stream_id; @@ -118,7 +118,7 @@ private: SrsSource* _source; SrsPublishEdge* _edge; SrsRequest* _req; - SrsThread* pthread; + SrsReusableThread* pthread; st_netfd_t stfd; ISrsProtocolReaderWriter* io; SrsKbps* kbps; @@ -144,7 +144,7 @@ public: virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req); virtual int start(); virtual void stop(); -// interface ISrsThreadHandler +// interface ISrsReusableThreadHandler public: virtual int cycle(); public: diff --git a/trunk/src/app/srs_app_encoder.cpp b/trunk/src/app/srs_app_encoder.cpp index 3bcbc230c..a64c81c6d 100644 --- a/trunk/src/app/srs_app_encoder.cpp +++ b/trunk/src/app/srs_app_encoder.cpp @@ -44,7 +44,7 @@ static std::vector _transcoded_url; SrsEncoder::SrsEncoder() { - pthread = new SrsThread("encoder", this, SRS_RTMP_ENCODER_SLEEP_US, true); + pthread = new SrsReusableThread("encoder", this, SRS_RTMP_ENCODER_SLEEP_US); pprint = SrsPithyPrint::create_encoder(); } diff --git a/trunk/src/app/srs_app_encoder.hpp b/trunk/src/app/srs_app_encoder.hpp index f728e7579..557c29201 100644 --- a/trunk/src/app/srs_app_encoder.hpp +++ b/trunk/src/app/srs_app_encoder.hpp @@ -45,13 +45,13 @@ class SrsFFMPEG; * the encoder for a stream, * may use multiple ffmpegs to transcode the specified stream. */ -class SrsEncoder : public ISrsThreadHandler +class SrsEncoder : public ISrsReusableThreadHandler { private: std::string input_stream_name; std::vector ffmpegs; private: - SrsThread* pthread; + SrsReusableThread* pthread; SrsPithyPrint* pprint; public: SrsEncoder(); @@ -59,7 +59,7 @@ public: public: virtual int on_publish(SrsRequest* req); virtual void on_unpublish(); -// interface ISrsThreadHandler. +// interface ISrsReusableThreadHandler. public: virtual int cycle(); virtual void on_thread_stop(); diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 1d77d8854..986799057 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -59,7 +59,7 @@ SrsForwarder::SrsForwarder(SrsSource* _source) kbps = new SrsKbps(); stream_id = 0; - pthread = new SrsThread("forward", this, SRS_FORWARDER_SLEEP_US, true); + pthread = new SrsReusableThread("forward", this, SRS_FORWARDER_SLEEP_US); queue = new SrsMessageQueue(); jitter = new SrsRtmpJitter(); @@ -407,7 +407,7 @@ int SrsForwarder::forward() } } - while (pthread->can_loop()) { + while (!pthread->interrupted()) { pprint->elapse(); // read from client. diff --git a/trunk/src/app/srs_app_forward.hpp b/trunk/src/app/srs_app_forward.hpp index dbad1c827..f57060fa4 100644 --- a/trunk/src/app/srs_app_forward.hpp +++ b/trunk/src/app/srs_app_forward.hpp @@ -48,7 +48,7 @@ class SrsKbps; * forward the stream to other servers. */ // TODO: FIXME: refine the error log, comments it. -class SrsForwarder : public ISrsThreadHandler +class SrsForwarder : public ISrsReusableThreadHandler { private: // the ep to forward, server[:port]. @@ -57,7 +57,7 @@ private: int stream_id; private: st_netfd_t stfd; - SrsThread* pthread; + SrsReusableThread* pthread; private: SrsSource* source; ISrsProtocolReaderWriter* io; @@ -95,7 +95,7 @@ public: * @param shared_video, directly ptr, copy it if need to save it. */ virtual int on_video(SrsSharedPtrMessage* shared_video); -// interface ISrsThreadHandler. +// interface ISrsReusableThreadHandler. public: virtual int cycle(); private: diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index bee0e70b3..90dd35fbd 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -1157,12 +1157,11 @@ SrsStreamCache::SrsStreamCache(SrsSource* s, SrsRequest* r) req = r->copy(); source = s; queue = new SrsMessageQueue(true); - pthread = new SrsThread("http-stream", this, 0, false); + pthread = new SrsEndlessThread("http-stream", this); } SrsStreamCache::~SrsStreamCache() { - pthread->stop(); srs_freep(pthread); srs_freep(queue); diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index dbc1a3c01..891c744b3 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -386,20 +386,20 @@ protected: * for example, the audio stream cache to make android(weixin) happy. * we start a thread to shrink the queue. */ -class SrsStreamCache : public ISrsThreadHandler +class SrsStreamCache : public ISrsEndlessThreadHandler { private: SrsMessageQueue* queue; SrsSource* source; SrsRequest* req; - SrsThread* pthread; + SrsEndlessThread* pthread; public: SrsStreamCache(SrsSource* s, SrsRequest* r); virtual ~SrsStreamCache(); public: virtual int start(); virtual int dump_cache(SrsConsumer* consumer); -// interface ISrsThreadHandler. +// interface ISrsEndlessThreadHandler. public: virtual int cycle(); }; @@ -669,7 +669,7 @@ public: virtual int hls_update_m3u8(SrsRequest* r, std::string m3u8); virtual int hls_update_ts(SrsRequest* r, std::string uri, std::string ts); virtual void unmount_hls(SrsRequest* r); -// interface ISrsThreadHandler. +// interface ISrsReloadHandler. public: virtual int on_reload_vhost_http_updated(); virtual int on_reload_vhost_http_remux_updated(); diff --git a/trunk/src/app/srs_app_ingest.cpp b/trunk/src/app/srs_app_ingest.cpp index c378a6195..c2c623439 100644 --- a/trunk/src/app/srs_app_ingest.cpp +++ b/trunk/src/app/srs_app_ingest.cpp @@ -55,7 +55,7 @@ SrsIngester::SrsIngester() { _srs_config->subscribe(this); - pthread = new SrsThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US, true); + pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US); pprint = SrsPithyPrint::create_ingester(); } diff --git a/trunk/src/app/srs_app_ingest.hpp b/trunk/src/app/srs_app_ingest.hpp index acdeb7d8c..7a14a8dac 100644 --- a/trunk/src/app/srs_app_ingest.hpp +++ b/trunk/src/app/srs_app_ingest.hpp @@ -59,12 +59,12 @@ public: * encode with FFMPEG(optional), * push to SRS(or any RTMP server) over RTMP. */ -class SrsIngester : public ISrsThreadHandler, public ISrsReloadHandler +class SrsIngester : public ISrsReusableThreadHandler, public ISrsReloadHandler { private: std::vector ingesters; private: - SrsThread* pthread; + SrsReusableThread* pthread; SrsPithyPrint* pprint; public: SrsIngester(); @@ -72,7 +72,7 @@ public: public: virtual int start(); virtual void stop(); -// interface ISrsThreadHandler. +// interface ISrsReusableThreadHandler. public: virtual int cycle(); virtual void on_thread_stop(); diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 915c95c23..bc6626d11 100644 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -79,7 +79,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p) nb_buf = SRS_UDP_MAX_PACKET_SIZE; buf = new char[nb_buf]; - pthread = new SrsThread("udp", this, 0, true); + pthread = new SrsReusableThread("udp", this); } SrsUdpListener::~SrsUdpListener() @@ -157,7 +157,7 @@ int SrsUdpListener::cycle() { int ret = ERROR_SUCCESS; - while (pthread->can_loop()) { + while (!pthread->interrupted()) { // TODO: FIXME: support ipv6, @see man 7 ipv6 sockaddr_in from; int nb_from = sizeof(sockaddr_in); @@ -190,7 +190,7 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p) _fd = -1; _stfd = NULL; - pthread = new SrsThread("tcp", this, 0, true); + pthread = new SrsReusableThread("tcp", this); } SrsTcpListener::~SrsTcpListener() diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index 371b5acba..d5d97eede 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -82,12 +82,12 @@ public: /** * bind udp port, start thread to recv packet and handler it. */ -class SrsUdpListener : public ISrsThreadHandler +class SrsUdpListener : public ISrsReusableThreadHandler { private: int _fd; st_netfd_t _stfd; - SrsThread* pthread; + SrsReusableThread* pthread; private: char* buf; int nb_buf; @@ -103,7 +103,7 @@ public: virtual st_netfd_t stfd(); public: virtual int listen(); -// interface ISrsThreadHandler. +// interface ISrsReusableThreadHandler. public: virtual int cycle(); }; @@ -111,12 +111,12 @@ public: /** * bind and listen tcp port, use handler to process the client. */ -class SrsTcpListener : public ISrsThreadHandler +class SrsTcpListener : public ISrsReusableThreadHandler { private: int _fd; st_netfd_t _stfd; - SrsThread* pthread; + SrsReusableThread* pthread; private: ISrsTcpHandler* handler; std::string ip; @@ -128,7 +128,7 @@ public: virtual int fd(); public: virtual int listen(); -// interface ISrsThreadHandler. +// interface ISrsReusableThreadHandler. public: virtual int cycle(); }; diff --git a/trunk/src/app/srs_app_log.hpp b/trunk/src/app/srs_app_log.hpp index cdb86c8db..9645079fa 100644 --- a/trunk/src/app/srs_app_log.hpp +++ b/trunk/src/app/srs_app_log.hpp @@ -82,7 +82,7 @@ public: virtual void trace(const char* tag, int context_id, const char* fmt, ...); virtual void warn(const char* tag, int context_id, const char* fmt, ...); virtual void error(const char* tag, int context_id, const char* fmt, ...); -// interface ISrsThreadHandler. +// interface ISrsReloadHandler. public: virtual int on_reload_log_tank(); virtual int on_reload_log_level(); diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 537e3dbf1..05352e16f 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -50,7 +50,7 @@ SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtm timeout = timeout_ms; handler = msg_handler; rtmp = rtmp_sdk; - trd = new SrsThread("recv", this, 0, true); + trd = new SrsReusableThread("recv", this); } SrsRecvThread::~SrsRecvThread() @@ -76,7 +76,7 @@ int SrsRecvThread::cycle() { int ret = ERROR_SUCCESS; - while (trd->can_loop()) { + while (!trd->interrupted()) { if (!handler->can_handle()) { st_usleep(timeout * 1000); continue; @@ -96,7 +96,7 @@ int SrsRecvThread::cycle() } // we use no timeout to recv, should never got any error. - trd->stop_loop(); + trd->interrupt(); // notice the handler got a recv error. handler->on_recv_error(ret); @@ -111,7 +111,7 @@ int SrsRecvThread::cycle() void SrsRecvThread::stop_loop() { - trd->stop_loop(); + trd->interrupt(); } void SrsRecvThread::on_thread_start() diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index f6ba086df..e99ca1666 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -79,10 +79,10 @@ public: /** * the recv thread, use message handler to handle each received message. */ -class SrsRecvThread : public ISrsThreadHandler +class SrsRecvThread : public ISrsReusableThreadHandler { protected: - SrsThread* trd; + SrsReusableThread* trd; ISrsMessageHandler* handler; SrsRtmpServer* rtmp; int timeout; diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index 5fe5fb5d6..bc961bd51 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -192,7 +192,7 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o) stfd = fd; skt = new SrsStSocket(fd); rtsp = new SrsRtspStack(skt); - trd = new SrsThread("rtsp", this, 0, false); + trd = new SrsOneCycleThread("rtsp", this); req = NULL; io = NULL; @@ -210,7 +210,6 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o) SrsRtspConn::~SrsRtspConn() { srs_close_stfd(stfd); - trd->stop(); srs_freep(video_rtp); srs_freep(audio_rtp); @@ -219,7 +218,9 @@ SrsRtspConn::~SrsRtspConn() srs_freep(skt); srs_freep(rtsp); - close(); + srs_freep(client); + srs_freep(io); + srs_freep(req); srs_freep(vjitter); srs_freep(ajitter); @@ -412,9 +413,6 @@ int SrsRtspConn::cycle() srs_warn("client disconnect peer. ret=%d", ret); } - // terminate thread in the thread cycle itself. - trd->stop_loop(); - return ERROR_SUCCESS; } @@ -763,14 +761,6 @@ int SrsRtspConn::connect_app(string ep_server, string ep_port) return ret; } -void SrsRtspConn::close() -{ - srs_freep(client); - srs_freep(io); - srs_freep(req); - srs_close_stfd(stfd); -} - SrsRtspCaster::SrsRtspCaster(SrsConfDirective* c) { // TODO: FIXME: support reload. diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp index 86397c033..cb25dbdc4 100644 --- a/trunk/src/app/srs_app_rtsp.hpp +++ b/trunk/src/app/srs_app_rtsp.hpp @@ -113,7 +113,7 @@ public: /** * the rtsp connection serve the fd. */ -class SrsRtspConn : public ISrsThreadHandler +class SrsRtspConn : public ISrsOneCycleThreadHandler { private: std::string output_template; @@ -136,7 +136,7 @@ private: SrsStSocket* skt; SrsRtspStack* rtsp; SrsRtspCaster* caster; - SrsThread* trd; + SrsOneCycleThread* trd; private: SrsRequest* req; SrsStSocket* io; @@ -163,7 +163,7 @@ private: // internal methods public: virtual int on_rtp_packet(SrsRtpPacket* pkt, int stream_id); -// interface ISrsThreadHandler +// interface ISrsOneCycleThreadHandler public: virtual int cycle(); virtual void on_thread_stop(); @@ -182,8 +182,6 @@ private: // @remark ignore when not connected, reconnect when disconnected. virtual int connect(); virtual int connect_app(std::string ep_server, std::string ep_port); - // close the connected io and rtmp to ready to be re-connect. - virtual void close(); }; /** diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 7e2c0809a..5634db625 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -367,13 +367,12 @@ SrsSignalManager::SrsSignalManager(SrsServer* server) _server = server; sig_pipe[0] = sig_pipe[1] = -1; - pthread = new SrsThread("signal", this, 0, true); + pthread = new SrsEndlessThread("signal", this); signal_read_stfd = NULL; } SrsSignalManager::~SrsSignalManager() { - pthread->stop(); srs_freep(pthread); srs_close_stfd(signal_read_stfd); diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 76b681484..8dbcb47f1 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -179,7 +179,7 @@ public: * convert signal to io, * @see: st-1.9/docs/notes.html */ -class SrsSignalManager : public ISrsThreadHandler +class SrsSignalManager : public ISrsEndlessThreadHandler { private: /* Per-process pipe which is used as a signal queue. */ @@ -188,14 +188,14 @@ private: st_netfd_t signal_read_stfd; private: SrsServer* _server; - SrsThread* pthread; + SrsEndlessThread* pthread; public: SrsSignalManager(SrsServer* server); virtual ~SrsSignalManager(); public: virtual int initialize(); virtual int start(); -// interface ISrsThreadHandler. +// interface ISrsEndlessThreadHandler. public: virtual int cycle(); private: diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp index a7e89c85e..c4a004a96 100644 --- a/trunk/src/app/srs_app_thread.cpp +++ b/trunk/src/app/srs_app_thread.cpp @@ -26,206 +26,337 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -ISrsThreadHandler::ISrsThreadHandler() -{ -} - -ISrsThreadHandler::~ISrsThreadHandler() -{ -} - -void ISrsThreadHandler::on_thread_start() -{ -} - -int ISrsThreadHandler::on_before_cycle() -{ - int ret = ERROR_SUCCESS; - return ret; -} - -int ISrsThreadHandler::on_end_cycle() -{ - int ret = ERROR_SUCCESS; - return ret; -} - -void ISrsThreadHandler::on_thread_stop() -{ -} - -SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable) -{ - _name = name; - handler = thread_handler; - cycle_interval_us = interval_us; +namespace internal { + ISrsThreadHandler::ISrsThreadHandler() + { + } - tid = NULL; - loop = false; - really_terminated = true; - _cid = -1; - _joinable = joinable; + ISrsThreadHandler::~ISrsThreadHandler() + { + } - // in start(), the thread cycle method maybe stop and remove the thread itself, - // and the thread start() is waiting for the _cid, and segment fault then. - // @see https://github.com/simple-rtmp-server/srs/issues/110 - // thread will set _cid, callback on_thread_start(), then wait for the can_run signal. - can_run = false; -} - -SrsThread::~SrsThread() -{ - stop(); -} - -int SrsThread::cid() -{ - return _cid; -} - -int SrsThread::start() -{ - int ret = ERROR_SUCCESS; + void ISrsThreadHandler::on_thread_start() + { + } - if(tid) { - srs_info("thread %s already running.", _name); + int ISrsThreadHandler::on_before_cycle() + { + int ret = ERROR_SUCCESS; return ret; } - if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){ - ret = ERROR_ST_CREATE_CYCLE_THREAD; - srs_error("st_thread_create failed. ret=%d", ret); + int ISrsThreadHandler::on_end_cycle() + { + int ret = ERROR_SUCCESS; return ret; } - // we set to loop to true for thread to run. - loop = true; - - // wait for cid to ready, for parent thread to get the cid. - while (_cid < 0 && loop) { - st_usleep(10 * 1000); + void ISrsThreadHandler::on_thread_stop() + { } - // now, cycle thread can run. - can_run = true; - - return ret; -} - -void SrsThread::stop() -{ - if (tid) { - loop = false; - - // the interrupt will cause the socket to read/write error, - // which will terminate the cycle thread. - st_thread_interrupt(tid); - - // when joinable, wait util quit. - if (_joinable) { - // wait the thread to exit. - int ret = st_thread_join(tid, NULL); - if (ret) { - srs_warn("core: ignore join thread failed."); - } - - // wait the thread actually terminated. - // sometimes the thread join return -1, for example, - // when thread use st_recvfrom, the thread join return -1. - // so here, we use a variable to ensure the thread stopped. - while (!really_terminated) { - st_usleep(10 * 1000); - - if (really_terminated) { - break; - } - srs_warn("core: wait thread to actually terminated"); - } - } + SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable) + { + _name = name; + handler = thread_handler; + cycle_interval_us = interval_us; tid = NULL; - } -} - -bool SrsThread::can_loop() -{ - return loop; -} - -void SrsThread::stop_loop() -{ - loop = false; -} - -void SrsThread::thread_cycle() -{ - int ret = ERROR_SUCCESS; - - _srs_context->generate_id(); - srs_info("thread %s cycle start", _name); - - _cid = _srs_context->get_id(); - - srs_assert(handler); - handler->on_thread_start(); - - // thread is running now. - really_terminated = false; - - // wait for cid to ready, for parent thread to get the cid. - while (!can_run && loop) { - st_usleep(10 * 1000); - } - - while (loop) { - if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) { - srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret); - goto failed; - } - srs_info("thread %s on before cycle success"); + loop = false; + really_terminated = true; + _cid = -1; + _joinable = joinable; - if ((ret = handler->cycle()) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret); + // in start(), the thread cycle method maybe stop and remove the thread itself, + // and the thread start() is waiting for the _cid, and segment fault then. + // @see https://github.com/simple-rtmp-server/srs/issues/110 + // thread will set _cid, callback on_thread_start(), then wait for the can_run signal. + can_run = false; + } + + SrsThread::~SrsThread() + { + stop(); + } + + int SrsThread::cid() + { + return _cid; + } + + int SrsThread::start() + { + int ret = ERROR_SUCCESS; + + if(tid) { + srs_info("thread %s already running.", _name); + return ret; + } + + if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){ + ret = ERROR_ST_CREATE_CYCLE_THREAD; + srs_error("st_thread_create failed. ret=%d", ret); + return ret; + } + + // we set to loop to true for thread to run. + loop = true; + + // wait for cid to ready, for parent thread to get the cid. + while (_cid < 0 && loop) { + st_usleep(10 * 1000); + } + + // now, cycle thread can run. + can_run = true; + + return ret; + } + + void SrsThread::stop() + { + if (tid) { + loop = false; + + // the interrupt will cause the socket to read/write error, + // which will terminate the cycle thread. + st_thread_interrupt(tid); + + // when joinable, wait util quit. + if (_joinable) { + // wait the thread to exit. + int ret = st_thread_join(tid, NULL); + if (ret) { + srs_warn("core: ignore join thread failed."); + } + + // wait the thread actually terminated. + // sometimes the thread join return -1, for example, + // when thread use st_recvfrom, the thread join return -1. + // so here, we use a variable to ensure the thread stopped. + while (!really_terminated) { + st_usleep(10 * 1000); + + if (really_terminated) { + break; + } + srs_warn("core: wait thread to actually terminated"); + } } - goto failed; - } - srs_info("thread %s cycle success", _name); - - if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) { - srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret); - goto failed; - } - srs_info("thread %s on end cycle success", _name); - -failed: - if (!loop) { - break; - } - - // to improve performance, donot sleep when interval is zero. - // @see: https://github.com/simple-rtmp-server/srs/issues/237 - if (cycle_interval_us != 0) { - st_usleep(cycle_interval_us); + + tid = NULL; } } - // readly terminated now. - really_terminated = true; + bool SrsThread::can_loop() + { + return loop; + } - handler->on_thread_stop(); - srs_info("thread %s cycle finished", _name); + void SrsThread::stop_loop() + { + loop = false; + } + + void SrsThread::thread_cycle() + { + int ret = ERROR_SUCCESS; + + _srs_context->generate_id(); + srs_info("thread %s cycle start", _name); + + _cid = _srs_context->get_id(); + + srs_assert(handler); + handler->on_thread_start(); + + // thread is running now. + really_terminated = false; + + // wait for cid to ready, for parent thread to get the cid. + while (!can_run && loop) { + st_usleep(10 * 1000); + } + + while (loop) { + if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) { + srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret); + goto failed; + } + srs_info("thread %s on before cycle success"); + + if ((ret = handler->cycle()) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret); + } + goto failed; + } + srs_info("thread %s cycle success", _name); + + if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) { + srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret); + goto failed; + } + srs_info("thread %s on end cycle success", _name); + + failed: + if (!loop) { + break; + } + + // to improve performance, donot sleep when interval is zero. + // @see: https://github.com/simple-rtmp-server/srs/issues/237 + if (cycle_interval_us != 0) { + st_usleep(cycle_interval_us); + } + } + + // readly terminated now. + really_terminated = true; + + handler->on_thread_stop(); + srs_info("thread %s cycle finished", _name); + } + + void* SrsThread::thread_fun(void* arg) + { + SrsThread* obj = (SrsThread*)arg; + srs_assert(obj); + + obj->thread_cycle(); + + st_thread_exit(NULL); + + return NULL; + } } -void* SrsThread::thread_fun(void* arg) +ISrsEndlessThreadHandler::ISrsEndlessThreadHandler() { - SrsThread* obj = (SrsThread*)arg; - srs_assert(obj); - - obj->thread_cycle(); - - st_thread_exit(NULL); - - return NULL; } +ISrsEndlessThreadHandler::~ISrsEndlessThreadHandler() +{ +} + +SrsEndlessThread::SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h) +{ + handler = h; + pthread = new internal::SrsThread(n, this, 0, false); +} + +SrsEndlessThread::~SrsEndlessThread() +{ + pthread->stop(); + srs_freep(pthread); +} + +int SrsEndlessThread::start() +{ + return pthread->start(); +} + +int SrsEndlessThread::cycle() +{ + return handler->cycle(); +} + +ISrsOneCycleThreadHandler::ISrsOneCycleThreadHandler() +{ +} + +ISrsOneCycleThreadHandler::~ISrsOneCycleThreadHandler() +{ +} + +void ISrsOneCycleThreadHandler::on_thread_stop() +{ +} + +SrsOneCycleThread::SrsOneCycleThread(const char* n, ISrsOneCycleThreadHandler* h) +{ + handler = h; + pthread = new internal::SrsThread(n, this, 0, false); +} + +SrsOneCycleThread::~SrsOneCycleThread() +{ + pthread->stop(); + srs_freep(pthread); +} + +int SrsOneCycleThread::start() +{ + return pthread->start(); +} + +int SrsOneCycleThread::cycle() +{ + int ret = handler->cycle(); + pthread->stop_loop(); + return ret; +} + +void SrsOneCycleThread::on_thread_stop() +{ + handler->on_thread_stop(); +} + +ISrsReusableThreadHandler::ISrsReusableThreadHandler() +{ +} + +ISrsReusableThreadHandler::~ISrsReusableThreadHandler() +{ +} + +void ISrsReusableThreadHandler::on_thread_stop() +{ +} + +SrsReusableThread::SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t interval_us) +{ + handler = h; + pthread = new internal::SrsThread(n, this, interval_us, true); +} + +SrsReusableThread::~SrsReusableThread() +{ + pthread->stop(); + srs_freep(pthread); +} + +int SrsReusableThread::start() +{ + return pthread->start(); +} + +void SrsReusableThread::stop() +{ + pthread->stop(); +} + +int SrsReusableThread::cid() +{ + return pthread->cid(); +} + +void SrsReusableThread::interrupt() +{ + pthread->stop_loop(); +} + +bool SrsReusableThread::interrupted() +{ + return !pthread->can_loop(); +} + +int SrsReusableThread::cycle() +{ + return handler->cycle(); +} + +void SrsReusableThread::on_thread_stop() +{ + handler->on_thread_stop(); +} diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp index db89b8df0..d6e399c10 100644 --- a/trunk/src/app/srs_app_thread.hpp +++ b/trunk/src/app/srs_app_thread.hpp @@ -31,34 +31,238 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +// the internal classes, user should never use it. +// user should use the public classes at the bellow: +// @see SrsEndlessThread, SrsOneCycleThread, SrsReusableThread +namespace internal { + /** + * the handler for the thread, callback interface. + * the thread model defines as: + * handler->on_thread_start() + * while loop: + * handler->on_before_cycle() + * handler->cycle() + * handler->on_end_cycle() + * if !loop then break for user stop thread. + * sleep(CycleIntervalMilliseconds) + * handler->on_thread_stop() + * when stop, the thread will interrupt the st_thread, + * which will cause the socket to return error and + * terminate the cycle thread. + * + * @remark why should check can_loop() in cycle method? + * when thread interrupt, the socket maybe not got EINT, + * espectially on st_usleep(), so the cycle must check the loop, + * when handler->cycle() has loop itself, for example: + * while (true): + * if (read_from_socket(skt) < 0) break; + * if thread stop when read_from_socket, it's ok, the loop will break, + * but when thread stop interrupt the s_usleep(0), then the loop is + * death loop. + * in a word, the handler->cycle() must: + * while (pthread->can_loop()): + * if (read_from_socket(skt) < 0) break; + * check the loop, then it works. + * + * @remark why should use stop_loop() to terminate thread in itself? + * in the thread itself, that is the cycle method, + * if itself want to terminate the thread, should never use stop(), + * but use stop_loop() to set the loop to false and terminate normally. + * + * @remark when should set the interval_us, and when not? + * the cycle will invoke util cannot loop, eventhough the return code of cycle is error, + * so the interval_us used to sleep for each cycle. + */ + class ISrsThreadHandler + { + public: + ISrsThreadHandler(); + virtual ~ISrsThreadHandler(); + public: + virtual void on_thread_start(); + virtual int on_before_cycle(); + virtual int cycle() = 0; + virtual int on_end_cycle(); + virtual void on_thread_stop(); + }; + + /** + * provides servies from st_thread_t, + * for common thread usage. + */ + class SrsThread + { + private: + st_thread_t tid; + int _cid; + bool loop; + bool can_run; + bool really_terminated; + bool _joinable; + const char* _name; + private: + ISrsThreadHandler* handler; + int64_t cycle_interval_us; + public: + /** + * initialize the thread. + * @param name, human readable name for st debug. + * @param thread_handler, the cycle handler for the thread. + * @param interval_us, the sleep interval when cycle finished. + * @param joinable, if joinable, other thread must stop the thread. + * @remark if joinable, thread never quit itself, or memory leak. + * @see: https://github.com/simple-rtmp-server/srs/issues/78 + * @remark about st debug, see st-1.9/README, _st_iterate_threads_flag + */ + /** + * TODO: FIXME: maybe all thread must be reap by others threads, + * @see: https://github.com/simple-rtmp-server/srs/issues/77 + */ + SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable); + virtual ~SrsThread(); + public: + /** + * get the context id. @see: ISrsThreadContext.get_id(). + * used for parent thread to get the id. + * @remark when start thread, parent thread will block and wait for this id ready. + */ + virtual int cid(); + /** + * start the thread, invoke the cycle of handler util + * user stop the thread. + * @remark ignore any error of cycle of handler. + * @remark user can start multiple times, ignore if already started. + * @remark wait for the cid is set by thread pfn. + */ + virtual int start(); + /** + * stop the thread, wait for the thread to terminate. + * @remark user can stop multiple times, ignore if already stopped. + */ + virtual void stop(); + public: + /** + * whether the thread should loop, + * used for handler->cycle() which has a loop method, + * to check this method, break if false. + */ + virtual bool can_loop(); + /** + * for the loop thread to stop the loop. + * other thread can directly use stop() to stop loop and wait for quit. + * this stop loop method only set loop to false. + */ + virtual void stop_loop(); + private: + virtual void thread_cycle(); + static void* thread_fun(void* arg); + }; +} + /** - * the handler for the thread, callback interface. - * the thread model defines as: - * handler->on_thread_start() - * while loop: - * handler->on_before_cycle() - * handler->cycle() - * handler->on_end_cycle() - * if !loop then break for user stop thread. - * sleep(CycleIntervalMilliseconds) - * handler->on_thread_stop() - * when stop, the thread will interrupt the st_thread, - * which will cause the socket to return error and - * terminate the cycle thread. - * - * Usage 1: loop thread never quit. + * the endless thread is a loop thread never quit. * user can create thread always running util server terminate. * the step to create a thread never stop: - * 1. create SrsThread field, with joinable false. + * 1. create SrsEndlessThread field. * for example: - * class SrsStreamCache : public ISrsThreadHandler { - * public: SrsStreamCache() { pthread = new SrsThread("http-stream", this, SRS_AUTO_STREAM_SLEEP_US, false); } + * class SrsStreamCache : public ISrsEndlessThreadHandler { + * public: SrsStreamCache() { pthread = new SrsEndlessThread("http-stream", this); } * public: virtual int cycle() { - * // check status, start ffmpeg when stopped. + * // do some work never end. * } * } - * - * Usage 2: stop by other thread. + * @remark user must use block method in cycle method, for example, sleep or socket io. + */ +class ISrsEndlessThreadHandler +{ +public: + ISrsEndlessThreadHandler(); + virtual ~ISrsEndlessThreadHandler(); +public: + /** + * the cycle method for the common thread. + * @remark user must use block method in cycle method, for example, sleep or socket io. + */ + virtual int cycle() = 0; +}; +class SrsEndlessThread : public internal::ISrsThreadHandler +{ +private: + internal::SrsThread* pthread; + ISrsEndlessThreadHandler* handler; +public: + SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h); + virtual ~SrsEndlessThread(); +public: + /** + * for the endless thread, never quit. + */ + virtual int start(); +// interface internal::ISrsThreadHandler +public: + virtual int cycle(); +}; + +/** + * the one cycle thread is a thread do the cycle only one time, + * that is, the thread will quit when return from the cycle. + * user can create thread which stop itself, + * generally only need to provides a start method, + * the object will destroy itself then terminate the thread, @see SrsConnection + * 1. create SrsThread field + * 2. the thread quit when return from cycle. + * for example: + * class SrsConnection : public ISrsOneCycleThreadHandler { + * public: SrsConnection() { pthread = new SrsOneCycleThread("conn", this); } + * public: virtual int start() { return pthread->start(); } + * public: virtual int cycle() { + * // serve client. + * // set loop to stop to quit, stop thread itself. + * pthread->stop_loop(); + * } + * public: virtual void on_thread_stop() { + * // remove the connection in thread itself. + * server->remove(this); + * } + * }; + */ +class ISrsOneCycleThreadHandler +{ +public: + ISrsOneCycleThreadHandler(); + virtual ~ISrsOneCycleThreadHandler(); +public: + /** + * the cycle method for the one cycle thread. + */ + virtual int cycle() = 0; + /** + * when thread stop, the handler can do cleanup. + * @remark this method is optional, handler can ignore it. + */ + virtual void on_thread_stop(); +}; +class SrsOneCycleThread : public internal::ISrsThreadHandler +{ +private: + internal::SrsThread* pthread; + ISrsOneCycleThreadHandler* handler; +public: + SrsOneCycleThread(const char* n, ISrsOneCycleThreadHandler* h); + virtual ~SrsOneCycleThread(); +public: + /** + * for the one cycle thread, quit when cycle return. + */ + virtual int start(); +// interface internal::ISrsThreadHandler +public: + virtual int cycle(); + virtual void on_thread_stop(); +}; + +/** + * the reuse thread is a thread stop and start by other thread. * user can create thread and stop then start again and again, * generally must provides a start and stop method, @see SrsIngester. * the step to create a thread stop by other thread: @@ -73,147 +277,65 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * // check status, start ffmpeg when stopped. * } * }; - * - * Usage 3: stop by thread itself. - * user can create thread which stop itself, - * generally only need to provides a start method, - * the object will destroy itself then terminate the thread, @see SrsConnection - * 1. create SrsThread field, with joinable false. - * 2. owner stop thread loop, destroy itself when thread stop. - * for example: - * class SrsConnection : public ISrsThreadHandler { - * public: SrsConnection() { pthread = new SrsThread("conn", this, 0, false); } - * public: virtual int start() { return pthread->start(); } - * public: virtual int cycle() { - * // serve client. - * // set loop to stop to quit, stop thread itself. - * pthread->stop_loop(); - * } - * public: virtual int on_thread_stop() { - * // remove the connection in thread itself. - * server->remove(this); - * } - * }; - * - * Usage 4: loop in the cycle method. - * user can use loop code in the cycle method, @see SrsForwarder - * 1. create SrsThread field, with or without joinable is ok. - * 2. loop code in cycle method, check the can_loop() for thread to quit. - * for example: - * class SrsForwarder : public ISrsThreadHandler { - * public: virtual int cycle() { - * while (pthread->can_loop()) { - * // read msgs from queue and forward to server. - * } - * } - * }; - * - * @remark why should check can_loop() in cycle method? - * when thread interrupt, the socket maybe not got EINT, - * espectially on st_usleep(), so the cycle must check the loop, - * when handler->cycle() has loop itself, for example: - * while (true): - * if (read_from_socket(skt) < 0) break; - * if thread stop when read_from_socket, it's ok, the loop will break, - * but when thread stop interrupt the s_usleep(0), then the loop is - * death loop. - * in a word, the handler->cycle() must: - * while (pthread->can_loop()): - * if (read_from_socket(skt) < 0) break; - * check the loop, then it works. - * - * @remark why should use stop_loop() to terminate thread in itself? - * in the thread itself, that is the cycle method, - * if itself want to terminate the thread, should never use stop(), - * but use stop_loop() to set the loop to false and terminate normally. - * - * @remark when should set the interval_us, and when not? - * the cycle will invoke util cannot loop, eventhough the return code of cycle is error, - * so the interval_us used to sleep for each cycle. */ -class ISrsThreadHandler +class ISrsReusableThreadHandler { public: - ISrsThreadHandler(); - virtual ~ISrsThreadHandler(); + ISrsReusableThreadHandler(); + virtual ~ISrsReusableThreadHandler(); public: - virtual void on_thread_start(); - virtual int on_before_cycle(); + /** + * the cycle method for the one cycle thread. + * @remark when the cycle has its inner loop, it must check whether + * the thread is interrupted. + */ virtual int cycle() = 0; - virtual int on_end_cycle(); + /** + * when thread stop, the handler can do cleanup. + * @remark this method is optional, handler can ignore it. + */ virtual void on_thread_stop(); }; - -/** -* provides servies from st_thread_t, -* for common thread usage. -*/ -class SrsThread +class SrsReusableThread : public internal::ISrsThreadHandler { private: - st_thread_t tid; - int _cid; - bool loop; - bool can_run; - bool really_terminated; - bool _joinable; - const char* _name; -private: - ISrsThreadHandler* handler; - int64_t cycle_interval_us; + internal::SrsThread* pthread; + ISrsReusableThreadHandler* handler; +public: + SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t interval_us = 0); + virtual ~SrsReusableThread(); public: /** - * initialize the thread. - * @param name, human readable name for st debug. - * @param thread_handler, the cycle handler for the thread. - * @param interval_us, the sleep interval when cycle finished. - * @param joinable, if joinable, other thread must stop the thread. - * @remark if joinable, thread never quit itself, or memory leak. - * @see: https://github.com/simple-rtmp-server/srs/issues/78 - * @remark about st debug, see st-1.9/README, _st_iterate_threads_flag - */ - /** - * TODO: FIXME: maybe all thread must be reap by others threads, - * @see: https://github.com/simple-rtmp-server/srs/issues/77 - */ - SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable); - virtual ~SrsThread(); -public: - /** - * get the context id. @see: ISrsThreadContext.get_id(). - * used for parent thread to get the id. - * @remark when start thread, parent thread will block and wait for this id ready. - */ - virtual int cid(); - /** - * start the thread, invoke the cycle of handler util - * user stop the thread. - * @remark ignore any error of cycle of handler. - * @remark user can start multiple times, ignore if already started. - * @remark wait for the cid is set by thread pfn. - */ + * for the reusable thread, start and stop by user. + */ virtual int start(); /** - * stop the thread, wait for the thread to terminate. - * @remark user can stop multiple times, ignore if already stopped. - */ + * stop the thread, wait for the thread to terminate. + * @remark user can stop multiple times, ignore if already stopped. + */ virtual void stop(); public: /** - * whether the thread should loop, - * used for handler->cycle() which has a loop method, - * to check this method, break if false. - */ - virtual bool can_loop(); + * get the context id. @see: ISrsThreadContext.get_id(). + * used for parent thread to get the id. + * @remark when start thread, parent thread will block and wait for this id ready. + */ + virtual int cid(); /** - * for the loop thread to stop the loop. - * other thread can directly use stop() to stop loop and wait for quit. - * this stop loop method only set loop to false. - */ - virtual void stop_loop(); -private: - virtual void thread_cycle(); - static void* thread_fun(void* arg); + * interrupt the thread to stop loop. + * we only set the loop flag to false, not really interrupt the thread. + */ + virtual void interrupt(); + /** + * whether the thread is interrupted, + * for the cycle has its loop, the inner loop should quit when thread + * is interrupted. + */ + virtual bool interrupted(); +// interface internal::ISrsThreadHandler +public: + virtual int cycle(); + virtual void on_thread_stop(); }; #endif