diff --git a/trunk/src/app/srs_app_async_call.cpp b/trunk/src/app/srs_app_async_call.cpp index e32b536ff..f7904a041 100644 --- a/trunk/src/app/srs_app_async_call.cpp +++ b/trunk/src/app/srs_app_async_call.cpp @@ -38,7 +38,7 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask() SrsAsyncCallWorker::SrsAsyncCallWorker() { - trd = NULL; + trd = new SrsDummyCoroutine(); wait = srs_cond_new(); } @@ -74,7 +74,7 @@ int SrsAsyncCallWorker::count() int SrsAsyncCallWorker::start() { srs_freep(trd); - trd = new SrsCoroutine("async", this, _srs_context->get_id()); + trd = new SrsSTCoroutine("async", this, _srs_context->get_id()); return trd->start(); } diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index fb215454c..0dbb06d72 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -41,7 +41,7 @@ SrsConnection::SrsConnection(IConnectionManager* cm, srs_netfd_t c, string cip) kbps = new SrsKbps(); kbps->set_io(skt, skt); - trd = new SrsCoroutine("conn", this); + trd = new SrsSTCoroutine("conn", this); } SrsConnection::~SrsConnection() diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index efe649ef9..2d6defb7a 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -166,7 +166,7 @@ SrsEdgeIngester::SrsEdgeIngester() upstream = new SrsEdgeRtmpUpstream(redirect); lb = new SrsLbRoundRobin(); - trd = NULL; + trd = new SrsDummyCoroutine(); } SrsEdgeIngester::~SrsEdgeIngester() @@ -199,7 +199,7 @@ int SrsEdgeIngester::start() } srs_freep(trd); - trd = new SrsCoroutine("edge-igs", this); + trd = new SrsSTCoroutine("edge-igs", this); return trd->start(); } @@ -423,7 +423,7 @@ SrsEdgeForwarder::SrsEdgeForwarder() sdk = NULL; lb = new SrsLbRoundRobin(); - trd = NULL; + trd = new SrsDummyCoroutine(); queue = new SrsMessageQueue(); } @@ -493,7 +493,8 @@ int SrsEdgeForwarder::start() return ret; } - trd = new SrsCoroutine("edge-fwr", this); + srs_freep(trd); + trd = new SrsSTCoroutine("edge-fwr", this); return trd->start(); } diff --git a/trunk/src/app/srs_app_encoder.cpp b/trunk/src/app/srs_app_encoder.cpp index 39ba45876..96da82bd7 100644 --- a/trunk/src/app/srs_app_encoder.cpp +++ b/trunk/src/app/srs_app_encoder.cpp @@ -41,7 +41,7 @@ static std::vector _transcoded_url; SrsEncoder::SrsEncoder() { - trd = NULL; + trd = new SrsDummyCoroutine(); pprint = SrsPithyPrint::create_encoder(); } @@ -74,7 +74,7 @@ int SrsEncoder::on_publish(SrsRequest* req) // start thread to run all encoding engines. srs_freep(trd); - trd = new SrsCoroutine("encoder", this, _srs_context->get_id()); + trd = new SrsSTCoroutine("encoder", this, _srs_context->get_id()); if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("st_thread_create failed. ret=%d", ret); return ret; diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 165c5e36d..05636cea7 100755 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -55,7 +55,7 @@ SrsForwarder::SrsForwarder(SrsOriginHub* h) sh_video = sh_audio = NULL; sdk = NULL; - trd = NULL; + trd = new SrsDummyCoroutine(); queue = new SrsMessageQueue(); jitter = new SrsRtmpJitter(); } @@ -136,7 +136,7 @@ int SrsForwarder::on_publish() req->stream.c_str()); srs_freep(trd); - trd = new SrsCoroutine("forward", this); + trd = new SrsSTCoroutine("forward", this); if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("start srs thread failed. ret=%d", ret); return ret; diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index f85ea1e86..a948a9391 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -60,7 +60,7 @@ SrsBufferCache::SrsBufferCache(SrsSource* s, SrsRequest* r) req = r->copy(); source = s; queue = new SrsMessageQueue(true); - trd = new SrsCoroutine("http-stream", this); + trd = new SrsSTCoroutine("http-stream", this); // TODO: FIXME: support reload. fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); diff --git a/trunk/src/app/srs_app_ingest.cpp b/trunk/src/app/srs_app_ingest.cpp index 0565445f3..dbbe59628 100644 --- a/trunk/src/app/srs_app_ingest.cpp +++ b/trunk/src/app/srs_app_ingest.cpp @@ -105,7 +105,7 @@ SrsIngester::SrsIngester() expired = false; - trd = NULL; + trd = new SrsDummyCoroutine(); pprint = SrsPithyPrint::create_ingester(); } @@ -141,7 +141,7 @@ int SrsIngester::start() // start thread to run all encoding engines. srs_freep(trd); - trd = new SrsCoroutine("ingest", this, _srs_context->get_id()); + trd = new SrsSTCoroutine("ingest", this, _srs_context->get_id()); if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("st_thread_create failed. ret=%d", ret); return ret; diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 6f435cc2b..7d4e103ef 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -365,7 +365,7 @@ SrsKafkaProducer::SrsKafkaProducer() metadata_expired = srs_cond_new(); lock = srs_mutex_new(); - trd = NULL; + trd = new SrsDummyCoroutine(); worker = new SrsAsyncCallWorker(); cache = new SrsKafkaCache(); @@ -410,7 +410,7 @@ int SrsKafkaProducer::start() } srs_freep(trd); - trd = new SrsCoroutine("kafka", this, _srs_context->get_id()); + trd = new SrsSTCoroutine("kafka", this, _srs_context->get_id()); if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("start kafka thread failed. ret=%d", ret); } diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index d2ef6512a..a3e5db8b0 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -80,7 +80,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p) nb_buf = SRS_UDP_MAX_PACKET_SIZE; buf = new char[nb_buf]; - trd = NULL; + trd = new SrsDummyCoroutine(); } SrsUdpListener::~SrsUdpListener() @@ -140,7 +140,7 @@ int SrsUdpListener::listen() srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); srs_freep(trd); - trd = new SrsCoroutine("udp", this); + trd = new SrsSTCoroutine("udp", this); if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret); return ret; @@ -187,7 +187,7 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p) _fd = -1; _stfd = NULL; - trd = NULL; + trd = new SrsDummyCoroutine(); } SrsTcpListener::~SrsTcpListener() @@ -242,7 +242,7 @@ int SrsTcpListener::listen() srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); srs_freep(trd); - trd = new SrsCoroutine("tcp", this); + trd = new SrsSTCoroutine("tcp", this); if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret); return ret; diff --git a/trunk/src/app/srs_app_ng_exec.cpp b/trunk/src/app/srs_app_ng_exec.cpp index e132b2f2f..371ac64d8 100644 --- a/trunk/src/app/srs_app_ng_exec.cpp +++ b/trunk/src/app/srs_app_ng_exec.cpp @@ -38,7 +38,7 @@ using namespace std; SrsNgExec::SrsNgExec() { - trd = NULL; + trd = new SrsDummyCoroutine(); pprint = SrsPithyPrint::create_exec(); } @@ -61,7 +61,7 @@ int SrsNgExec::on_publish(SrsRequest* req) // start thread to run all processes. srs_freep(trd); - trd = new SrsCoroutine("encoder", this, _srs_context->get_id()); + trd = new SrsSTCoroutine("encoder", this, _srs_context->get_id()); if ((ret = trd->start()) != ERROR_SUCCESS) { srs_error("st_thread_create failed. ret=%d", ret); return ret; diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index b24ffd1f9..bc7768902 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -61,7 +61,7 @@ SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm) rtmp = r; pumper = p; timeout = tm; - trd = NULL; + trd = new SrsDummyCoroutine(); } SrsRecvThread::~SrsRecvThread() @@ -77,7 +77,7 @@ int SrsRecvThread::cid() int SrsRecvThread::start() { srs_freep(trd); - trd = new SrsCoroutine("recv", this); + trd = new SrsSTCoroutine("recv", this); return trd->start(); } @@ -535,7 +535,7 @@ SrsHttpRecvThread::SrsHttpRecvThread(SrsResponseOnlyHttpConn* c) { conn = c; error = ERROR_SUCCESS; - trd = new SrsCoroutine("http-receive", this, _srs_context->get_id()); + trd = new SrsSTCoroutine("http-receive", this, _srs_context->get_id()); } SrsHttpRecvThread::~SrsHttpRecvThread() diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index d0f83b2cb..135ca9aa8 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -195,7 +195,7 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, srs_netfd_t fd, std::string o) stfd = fd; skt = new SrsStSocket(); rtsp = new SrsRtspStack(skt); - trd = new SrsCoroutine("rtsp", this); + trd = new SrsSTCoroutine("rtsp", this); req = NULL; sdk = NULL; diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index f54e1e310..5788b2fa1 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -362,7 +362,7 @@ SrsSignalManager::SrsSignalManager(SrsServer* s) server = s; sig_pipe[0] = sig_pipe[1] = -1; - trd = new SrsCoroutine("signal", this); + trd = new SrsSTCoroutine("signal", this); signal_read_stfd = NULL; } diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index c3e34e3cc..f92becc78 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -40,7 +40,46 @@ ISrsCoroutineHandler::~ISrsCoroutineHandler() { } -SrsCoroutine::SrsCoroutine(const string& n, ISrsCoroutineHandler* h, int cid) +SrsCoroutine::SrsCoroutine() +{ +} + +SrsCoroutine::~SrsCoroutine() +{ +} + +SrsDummyCoroutine::SrsDummyCoroutine() +{ +} + +SrsDummyCoroutine::~SrsDummyCoroutine() +{ +} + +int SrsDummyCoroutine::start() +{ + return ERROR_THREAD_DUMMY; +} + +void SrsDummyCoroutine::stop() +{ +} + +void SrsDummyCoroutine::interrupt() +{ +} + +int SrsDummyCoroutine::pull() +{ + return ERROR_THREAD_DUMMY; +} + +int SrsDummyCoroutine::cid() +{ + return 0; +} + +SrsSTCoroutine::SrsSTCoroutine(const string& n, ISrsCoroutineHandler* h, int cid) { name = n; handler = h; @@ -50,12 +89,12 @@ SrsCoroutine::SrsCoroutine(const string& n, ISrsCoroutineHandler* h, int cid) started = interrupted = disposed = false; } -SrsCoroutine::~SrsCoroutine() +SrsSTCoroutine::~SrsSTCoroutine() { stop(); } -int SrsCoroutine::start() +int SrsSTCoroutine::start() { int ret = ERROR_SUCCESS; @@ -77,7 +116,7 @@ int SrsCoroutine::start() return ret; } -void SrsCoroutine::stop() +void SrsSTCoroutine::stop() { if (!started || disposed) { return; @@ -101,7 +140,7 @@ void SrsCoroutine::stop() return; } -void SrsCoroutine::interrupt() +void SrsSTCoroutine::interrupt() { if (!started || interrupted) { return; @@ -113,17 +152,17 @@ void SrsCoroutine::interrupt() st_thread_interrupt((st_thread_t)trd); } -int SrsCoroutine::pull() +int SrsSTCoroutine::pull() { return err; } -int SrsCoroutine::cid() +int SrsSTCoroutine::cid() { return context; } -int SrsCoroutine::cycle() +int SrsSTCoroutine::cycle() { if (_srs_context) { if (context) { @@ -139,9 +178,9 @@ int SrsCoroutine::cycle() return ret; } -void* SrsCoroutine::pfn(void* arg) +void* SrsSTCoroutine::pfn(void* arg) { - SrsCoroutine* p = (SrsCoroutine*)arg; + SrsSTCoroutine* p = (SrsSTCoroutine*)arg; void*res = (void*)(uint64_t)p->cycle(); return res; } diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index 2d151f06e..480349181 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -67,6 +67,39 @@ public: virtual int cycle() = 0; }; +/** + * The corotine object. + */ +class SrsCoroutine +{ +public: + SrsCoroutine(); + virtual ~SrsCoroutine(); +public: + virtual int start() = 0; + virtual void stop() = 0; + virtual void interrupt() = 0; + virtual int pull() = 0; + virtual int cid() = 0; +}; + +/** + * An empty coroutine, user can default to this object before create any real coroutine. + * @see https://github.com/ossrs/srs/pull/908 + */ +class SrsDummyCoroutine : public SrsCoroutine +{ +public: + SrsDummyCoroutine(); + virtual ~SrsDummyCoroutine(); +public: + virtual int start(); + virtual void stop(); + virtual void interrupt(); + virtual int pull(); + virtual int cid(); +}; + /** * A ST-coroutine is a lightweight thread, just like the goroutine. * But the goroutine maybe run on different thread, while ST-coroutine only @@ -81,7 +114,7 @@ public: * @remark We always create joinable thread, so we must join it or memory leak, * Please read https://github.com/ossrs/srs/issues/78 */ -class SrsCoroutine +class SrsSTCoroutine : public SrsCoroutine { private: std::string name; @@ -97,8 +130,8 @@ private: public: // Create a thread with name n and handler h. // @remark User can specify a cid for thread to use, or we will allocate a new one. - SrsCoroutine(const std::string& n, ISrsCoroutineHandler* h, int cid = 0); - virtual ~SrsCoroutine(); + SrsSTCoroutine(const std::string& n, ISrsCoroutineHandler* h, int cid = 0); + virtual ~SrsSTCoroutine(); public: /** * Start the thread. diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp index 4839c3586..1db664707 100755 --- a/trunk/src/app/srs_app_thread.cpp +++ b/trunk/src/app/srs_app_thread.cpp @@ -32,7 +32,7 @@ using namespace std; SrsCoroutineManager::SrsCoroutineManager() { cond = srs_cond_new(); - trd = new SrsCoroutine("manager", this); + trd = new SrsSTCoroutine("manager", this); } SrsCoroutineManager::~SrsCoroutineManager() diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index c3fb41f4c..8149f4343 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -106,6 +106,7 @@ #define ERROR_THREAD_DISPOSED 1069 #define ERROR_THREAD_INTERRUPED 1070 #define ERROR_THREAD_TERMINATED 1071 +#define ERROR_THREAD_DUMMY 1072 /////////////////////////////////////////////////////// // RTMP protocol error.