diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index a5094bd24..74cbb18ae 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -47,15 +47,9 @@ using namespace std; #include #include -// when error, edge ingester sleep for a while and retry. -#define SRS_EDGE_INGESTER_CIMS (3*1000) - // when edge timeout, retry next. #define SRS_EDGE_INGESTER_TMMS (5*1000) -// when error, edge ingester sleep for a while and retry. -#define SRS_EDGE_FORWARDER_CIMS (3*1000) - // when edge error, wait for quit #define SRS_EDGE_FORWARDER_TMMS (150) @@ -172,7 +166,7 @@ SrsEdgeIngester::SrsEdgeIngester() upstream = new SrsEdgeRtmpUpstream(redirect); lb = new SrsLbRoundRobin(); - pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_CIMS); + trd = NULL; } SrsEdgeIngester::~SrsEdgeIngester() @@ -181,7 +175,7 @@ SrsEdgeIngester::~SrsEdgeIngester() srs_freep(upstream); srs_freep(lb); - srs_freep(pthread); + srs_freep(trd); } int SrsEdgeIngester::initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r) @@ -204,12 +198,14 @@ int SrsEdgeIngester::start() return ret; } - return pthread->start(); + srs_freep(trd); + trd = new SrsCoroutine("edge-igs", this); + return trd->start(); } void SrsEdgeIngester::stop() { - pthread->stop(); + trd->stop(); upstream->close(); // notice to unpublish. @@ -223,11 +219,30 @@ string SrsEdgeIngester::get_curr_origin() return lb->selected(); } +// when error, edge ingester sleep for a while and retry. +#define SRS_EDGE_INGESTER_CIMS (3*1000) + int SrsEdgeIngester::cycle() { int ret = ERROR_SUCCESS; - for (;;) { + while (!trd->pull()) { + if ((ret = do_cycle()) != ERROR_SUCCESS) { + srs_warn("EdgeIngester: Ignore error, ret=%d", ret); + } + + if (!trd->pull()) { + st_usleep(SRS_EDGE_INGESTER_CIMS * 1000); + } + } + return ret; +} + +int SrsEdgeIngester::do_cycle() +{ + int ret = ERROR_SUCCESS; + + while (!trd->pull()) { srs_freep(upstream); upstream = new SrsEdgeRtmpUpstream(redirect); @@ -275,7 +290,7 @@ int SrsEdgeIngester::ingest() // set to larger timeout to read av data from origin. upstream->set_recv_timeout(SRS_EDGE_INGESTER_TMMS); - while (!pthread->interrupted()) { + while (!trd->pull()) { pprint->elapse(); // pithy print @@ -408,7 +423,7 @@ SrsEdgeForwarder::SrsEdgeForwarder() sdk = NULL; lb = new SrsLbRoundRobin(); - pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_CIMS); + trd = NULL; queue = new SrsMessageQueue(); } @@ -417,7 +432,7 @@ SrsEdgeForwarder::~SrsEdgeForwarder() stop(); srs_freep(lb); - srs_freep(pthread); + srs_freep(trd); srs_freep(queue); } @@ -478,22 +493,42 @@ int SrsEdgeForwarder::start() return ret; } - return pthread->start(); + trd = new SrsCoroutine("edge-fwr", this); + return trd->start(); } void SrsEdgeForwarder::stop() { - pthread->stop(); + trd->stop(); queue->clear(); srs_freep(sdk); } -#define SYS_MAX_EDGE_SEND_MSGS 128 +// when error, edge ingester sleep for a while and retry. +#define SRS_EDGE_FORWARDER_CIMS (3*1000) int SrsEdgeForwarder::cycle() { int ret = ERROR_SUCCESS; + while (!trd->pull()) { + if ((ret = do_cycle()) != ERROR_SUCCESS) { + srs_warn("EdgeForwarder: Ignore error, ret=%d", ret); + } + + if (!trd->pull()) { + st_usleep(SRS_EDGE_FORWARDER_CIMS * 1000); + } + } + return ret; +} + +#define SYS_MAX_EDGE_SEND_MSGS 128 + +int SrsEdgeForwarder::do_cycle() +{ + int ret = ERROR_SUCCESS; + sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TMMS); SrsPithyPrint* pprint = SrsPithyPrint::create_edge(); @@ -501,7 +536,7 @@ int SrsEdgeForwarder::cycle() SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS); - while (!pthread->interrupted()) { + while (!trd->pull()) { if (send_error_code != ERROR_SUCCESS) { st_usleep(SRS_EDGE_FORWARDER_TMMS * 1000); continue; diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 49f782d37..99f34bc8d 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -114,13 +114,13 @@ public: /** * edge used to ingest stream from origin. */ -class SrsEdgeIngester : public ISrsReusableThread2Handler +class SrsEdgeIngester : public ISrsCoroutineHandler { private: SrsSource* source; SrsPlayEdge* edge; SrsRequest* req; - SrsReusableThread2* pthread; + SrsCoroutine* trd; SrsLbRoundRobin* lb; SrsEdgeUpstream* upstream; // for RTMP 302 redirect. @@ -136,6 +136,8 @@ public: // interface ISrsReusableThread2Handler public: virtual int cycle(); +private: + virtual int do_cycle(); private: virtual int ingest(); virtual int process_publish_message(SrsCommonMessage* msg); @@ -144,13 +146,13 @@ private: /** * edge used to forward stream to origin. */ -class SrsEdgeForwarder : public ISrsReusableThread2Handler +class SrsEdgeForwarder : public ISrsCoroutineHandler { private: SrsSource* source; SrsPublishEdge* edge; SrsRequest* req; - SrsReusableThread2* pthread; + SrsCoroutine* trd; SrsSimpleRtmpClient* sdk; SrsLbRoundRobin* lb; /** @@ -176,6 +178,8 @@ public: // interface ISrsReusableThread2Handler public: virtual int cycle(); +private: + virtual int do_cycle(); public: virtual int proxy(SrsCommonMessage* msg); }; diff --git a/trunk/src/app/srs_app_encoder.cpp b/trunk/src/app/srs_app_encoder.cpp index d4021b0a3..a2c5be72c 100644 --- a/trunk/src/app/srs_app_encoder.cpp +++ b/trunk/src/app/srs_app_encoder.cpp @@ -100,7 +100,10 @@ int SrsEncoder::cycle() if ((ret = do_cycle()) != ERROR_SUCCESS) { srs_warn("Encoder: Ignore error, ret=%d", ret); } - st_usleep(SRS_RTMP_ENCODER_CIMS * 1000); + + if (!trd->pull()) { + st_usleep(SRS_RTMP_ENCODER_CIMS * 1000); + } } // kill ffmpeg when finished and it alive diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index bd4a0b305..0fe4b0a2c 100755 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -47,9 +47,6 @@ using namespace std; #include #include -// when error, forwarder sleep for a while and retry. -#define SRS_FORWARDER_CIMS (3000) - SrsForwarder::SrsForwarder(SrsOriginHub* h) { hub = h; @@ -58,7 +55,7 @@ SrsForwarder::SrsForwarder(SrsOriginHub* h) sh_video = sh_audio = NULL; sdk = NULL; - pthread = new SrsReusableThread2("forward", this, SRS_FORWARDER_CIMS); + trd = NULL; queue = new SrsMessageQueue(); jitter = new SrsRtmpJitter(); } @@ -66,7 +63,7 @@ SrsForwarder::SrsForwarder(SrsOriginHub* h) SrsForwarder::~SrsForwarder() { srs_freep(sdk); - srs_freep(pthread); + srs_freep(trd); srs_freep(queue); srs_freep(jitter); @@ -138,18 +135,19 @@ int SrsForwarder::on_publish() source_ep.c_str(), dest_ep.c_str(), tcUrl.c_str(), req->stream.c_str()); - if ((ret = pthread->start()) != ERROR_SUCCESS) { + srs_freep(trd); + trd = new SrsCoroutine("forward", this); + if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("start srs thread failed. ret=%d", ret); return ret; } - srs_trace("forward thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id()); return ret; } void SrsForwarder::on_unpublish() { - pthread->stop(); + trd->stop(); sdk->close(); } @@ -220,10 +218,30 @@ int SrsForwarder::on_video(SrsSharedPtrMessage* shared_video) return ret; } +// when error, forwarder sleep for a while and retry. +#define SRS_FORWARDER_CIMS (3000) + int SrsForwarder::cycle() { int ret = ERROR_SUCCESS; + while (!trd->pull()) { + if ((ret = do_cycle()) != ERROR_SUCCESS) { + srs_warn("Forwarder: Ignore error, ret=%d", ret); + } + + if (!trd->pull()) { + st_usleep(SRS_FORWARDER_CIMS * 1000); + } + } + + return ret; +} + +int SrsForwarder::do_cycle() +{ + int ret = ERROR_SUCCESS; + std::string url; if (true) { std::string server; @@ -289,7 +307,7 @@ int SrsForwarder::forward() } } - while (!pthread->interrupted()) { + while (!trd->pull()) { pprint->elapse(); // read from client. diff --git a/trunk/src/app/srs_app_forward.hpp b/trunk/src/app/srs_app_forward.hpp index 01be0da69..29ec907af 100644 --- a/trunk/src/app/srs_app_forward.hpp +++ b/trunk/src/app/srs_app_forward.hpp @@ -47,14 +47,14 @@ class SrsSimpleRtmpClient; * forward the stream to other servers. */ // TODO: FIXME: refine the error log, comments it. -class SrsForwarder : public ISrsReusableThread2Handler +class SrsForwarder : public ISrsCoroutineHandler { private: // the ep to forward, server[:port]. std::string ep_forward; SrsRequest* req; private: - SrsReusableThread2* pthread; + SrsCoroutine* trd; private: SrsOriginHub* hub; SrsSimpleRtmpClient* sdk; @@ -93,6 +93,8 @@ public: // interface ISrsReusableThread2Handler. public: virtual int cycle(); +private: + virtual int do_cycle(); private: virtual int forward(); }; diff --git a/trunk/src/app/srs_app_ingest.cpp b/trunk/src/app/srs_app_ingest.cpp index 562f4a2fb..bbc51b204 100644 --- a/trunk/src/app/srs_app_ingest.cpp +++ b/trunk/src/app/srs_app_ingest.cpp @@ -181,7 +181,10 @@ int SrsIngester::cycle() if ((ret = do_cycle()) != ERROR_SUCCESS) { srs_warn("Ingester: Ignore error, ret=%d", ret); } - st_usleep(SRS_AUTO_INGESTER_CIMS * 1000); + + if (!trd->pull()) { + st_usleep(SRS_AUTO_INGESTER_CIMS * 1000); + } } return ret; diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 21d6f753e..3c33ae6cc 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -501,7 +501,10 @@ int SrsKafkaProducer::cycle() if ((ret = do_cycle()) != ERROR_SUCCESS) { srs_warn("ignore kafka error. ret=%d", ret); } - st_usleep(SRS_KAKFA_CIMS * 1000); + + if (!trd->pull()) { + st_usleep(SRS_KAKFA_CIMS * 1000); + } } return ret; diff --git a/trunk/src/app/srs_app_ng_exec.cpp b/trunk/src/app/srs_app_ng_exec.cpp index 7f5e920f6..8674bc4b5 100644 --- a/trunk/src/app/srs_app_ng_exec.cpp +++ b/trunk/src/app/srs_app_ng_exec.cpp @@ -86,7 +86,10 @@ int SrsNgExec::cycle() if ((ret = do_cycle()) != ERROR_SUCCESS) { srs_warn("EXEC: Ignore error, ret=%d", ret); } - st_usleep(SRS_RTMP_EXEC_CIMS * 1000); + + if (!trd->pull()) { + st_usleep(SRS_RTMP_EXEC_CIMS * 1000); + } } std::vector::iterator it; diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 16e7da8b7..62e87cf0f 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -60,15 +60,11 @@ SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm) rtmp = r; pumper = p; timeout = tm; - trd = new SrsReusableThread2("recv", this); + trd = NULL; } SrsRecvThread::~SrsRecvThread() { - // stop recv thread. - stop(); - - // destroy the thread. srs_freep(trd); } @@ -79,6 +75,8 @@ int SrsRecvThread::cid() int SrsRecvThread::start() { + srs_freep(trd); + trd = new SrsCoroutine("recv", this); return trd->start(); } @@ -119,7 +117,7 @@ int SrsRecvThread::do_cycle() { int ret = ERROR_SUCCESS; - while (!trd->interrupted()) { + while (!trd->pull()) { // When the pumper is interrupted, wait then retry. if (pumper->interrupted()) { st_usleep(timeout * 1000); diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 77f35028c..c84173039 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -90,10 +90,10 @@ public: /** * the recv thread, use message handler to handle each received message. */ -class SrsRecvThread : public ISrsReusableThread2Handler +class SrsRecvThread : public ISrsCoroutineHandler { protected: - SrsReusableThread2* trd; + SrsCoroutine* trd; ISrsMessagePumper* pumper; SrsRtmpServer* rtmp; // The recv timeout in ms. @@ -157,7 +157,7 @@ public: */ class SrsPublishRecvThread : virtual public ISrsMessagePumper, virtual public ISrsReloadHandler #ifdef SRS_PERF_MERGED_READ -, virtual public IMergeReadHandler + , virtual public IMergeReadHandler #endif { private: diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp index 014c69b4b..91bf3a778 100755 --- a/trunk/src/app/srs_app_thread.cpp +++ b/trunk/src/app/srs_app_thread.cpp @@ -78,91 +78,3 @@ void SrsCoroutineManager::clear() } } -ISrsReusableThread2Handler::ISrsReusableThread2Handler() -{ -} - -ISrsReusableThread2Handler::~ISrsReusableThread2Handler() -{ -} - -void ISrsReusableThread2Handler::on_thread_start() -{ -} - -int ISrsReusableThread2Handler::on_before_cycle() -{ - return ERROR_SUCCESS; -} - -int ISrsReusableThread2Handler::on_end_cycle() -{ - return ERROR_SUCCESS; -} - -void ISrsReusableThread2Handler::on_thread_stop() -{ -} - -SrsReusableThread2::SrsReusableThread2(const char* n, ISrsReusableThread2Handler* h, int64_t cims) -{ - handler = h; - pthread = new internal::SrsThread(n, this, cims, true); -} - -SrsReusableThread2::~SrsReusableThread2() -{ - pthread->stop(); - srs_freep(pthread); -} - -int SrsReusableThread2::start() -{ - return pthread->start(); -} - -void SrsReusableThread2::stop() -{ - pthread->stop(); -} - -int SrsReusableThread2::cid() -{ - return pthread->cid(); -} - -void SrsReusableThread2::interrupt() -{ - pthread->stop_loop(); -} - -bool SrsReusableThread2::interrupted() -{ - return !pthread->can_loop(); -} - -int SrsReusableThread2::cycle() -{ - return handler->cycle(); -} - -void SrsReusableThread2::on_thread_start() -{ - handler->on_thread_start(); -} - -int SrsReusableThread2::on_before_cycle() -{ - return handler->on_before_cycle(); -} - -int SrsReusableThread2::on_end_cycle() -{ - return handler->on_end_cycle(); -} - -void SrsReusableThread2::on_thread_stop() -{ - handler->on_thread_stop(); -} - diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp index ee9983493..d1c009118 100644 --- a/trunk/src/app/srs_app_thread.hpp +++ b/trunk/src/app/srs_app_thread.hpp @@ -58,99 +58,5 @@ private: void clear(); }; -/** - * the reuse thread is a thread stop and start by other thread. - * the version 2, is the thread cycle has its inner loop, which should - * check the intterrupt, and should interrupt thread when the inner loop want - * to quit the thread. - * user can create thread and stop then start again and again, - * generally must provides a start and stop method, @see SrsIngester. - * the step to create a thread stop by other thread: - * 1. create SrsReusableThread field. - * 2. must manually stop the thread when started it. - * for example: - * class SrsIngester : public ISrsReusableThreadHandler { - * public: SrsIngester() { pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_CIMS); } - * public: virtual int start() { return pthread->start(); } - * public: virtual void stop() { pthread->stop(); } - * public: virtual int cycle() { - * while (!pthread->interrupted()) { - * // quit thread when error. - * if (ret != ERROR_SUCCESS) { - * pthread->interrupt(); - * } - * - * // do something. - * } - * } - * }; - */ -class ISrsReusableThread2Handler -{ -public: - ISrsReusableThread2Handler(); - virtual ~ISrsReusableThread2Handler(); -public: - /** - * the cycle method for the one cycle thread. - * @remark when the cycle has its inner loop, it must check whether - * the thread is interrupted. - */ - virtual int cycle() = 0; -public: - /** - * other callback for handler. - * @remark all callback is optional, handler can ignore it. - */ - virtual void on_thread_start(); - virtual int on_before_cycle(); - virtual int on_end_cycle(); - virtual void on_thread_stop(); -}; -class SrsReusableThread2 : public internal::ISrsThreadHandler -{ -private: - internal::SrsThread* pthread; - ISrsReusableThread2Handler* handler; -public: - SrsReusableThread2(const char* n, ISrsReusableThread2Handler* h, int64_t cims = 0); - virtual ~SrsReusableThread2(); -public: - /** - * for the reusable thread, start and stop by user. - */ - virtual int start(); - /** - * stop the thread, wait for the thread to terminate. - * @remark user can stop multiple times, ignore if already stopped. - */ - virtual void stop(); -public: - /** - * get the context id. @see: ISrsThreadContext.get_id(). - * used for parent thread to get the id. - * @remark when start thread, parent thread will block and wait for this id ready. - */ - virtual int cid(); - /** - * interrupt the thread to stop loop. - * we only set the loop flag to false, not really interrupt the thread. - */ - virtual void interrupt(); - /** - * whether the thread is interrupted, - * for the cycle has its loop, the inner loop should quit when thread - * is interrupted. - */ - virtual bool interrupted(); -// interface internal::ISrsThreadHandler -public: - virtual int cycle(); - virtual void on_thread_start(); - virtual int on_before_cycle(); - virtual int on_end_cycle(); - virtual void on_thread_stop(); -}; - #endif