diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index ceb5342db..7696f2d66 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -60,7 +60,7 @@ SrsBufferCache::SrsBufferCache(SrsSource* s, SrsRequest* r) req = r->copy(); source = s; queue = new SrsMessageQueue(true); - pthread = new SrsEndlessThread("http-stream", this); + trd = new SrsCoroutine("http-stream", this); // TODO: FIXME: support reload. fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); @@ -68,7 +68,7 @@ SrsBufferCache::SrsBufferCache(SrsSource* s, SrsRequest* r) SrsBufferCache::~SrsBufferCache() { - srs_freep(pthread); + srs_freep(trd); srs_freep(queue); srs_freep(req); @@ -87,7 +87,7 @@ int SrsBufferCache::update(SrsSource* s, SrsRequest* r) int SrsBufferCache::start() { - return pthread->start(); + return trd->start(); } int SrsBufferCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) @@ -138,7 +138,7 @@ int SrsBufferCache::cycle() // TODO: FIXME: support reload. queue->set_queue_size(fast_cache); - while (true) { + while (!trd->pull()) { pprint->elapse(); // get messages from consumer. diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index 93c679664..cee0feb71 100755 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -38,7 +38,7 @@ class SrsTsTransmuxer; * for example, the audio stream cache to make android(weixin) happy. * we start a thread to shrink the queue. */ -class SrsBufferCache : public ISrsEndlessThreadHandler +class SrsBufferCache : public ISrsCoroutineHandler { private: double fast_cache; @@ -46,7 +46,7 @@ private: SrsMessageQueue* queue; SrsSource* source; SrsRequest* req; - SrsEndlessThread* pthread; + SrsCoroutine* trd; public: SrsBufferCache(SrsSource* s, SrsRequest* r); virtual ~SrsBufferCache(); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 7c8d7490e..0893536cb 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -355,13 +355,13 @@ SrsUdpCasterListener::~SrsUdpCasterListener() SrsSignalManager* SrsSignalManager::instance = NULL; -SrsSignalManager::SrsSignalManager(SrsServer* server) +SrsSignalManager::SrsSignalManager(SrsServer* s) { SrsSignalManager::instance = this; - _server = server; + server = s; sig_pipe[0] = sig_pipe[1] = -1; - pthread = new SrsEndlessThread("signal", this); + trd = new SrsCoroutine("signal", this); signal_read_stfd = NULL; } @@ -376,7 +376,7 @@ SrsSignalManager::~SrsSignalManager() ::close(sig_pipe[1]); } - srs_freep(pthread); + srs_freep(trd); } int SrsSignalManager::initialize() @@ -432,20 +432,22 @@ int SrsSignalManager::start() srs_trace("signal installed, reload=%d, reopen=%d, grace_quit=%d", SRS_SIGNAL_RELOAD, SRS_SIGNAL_REOPEN_LOG, SRS_SIGNAL_GRACEFULLY_QUIT); - return pthread->start(); + return trd->start(); } int SrsSignalManager::cycle() { int ret = ERROR_SUCCESS; - int signo; - - /* Read the next signal from the pipe */ - st_read(signal_read_stfd, &signo, sizeof(int), ST_UTIME_NO_TIMEOUT); - - /* Process signal synchronously */ - _server->on_signal(signo); + while (!trd->pull()) { + int signo; + + /* Read the next signal from the pipe */ + st_read(signal_read_stfd, &signo, sizeof(int), ST_UTIME_NO_TIMEOUT); + + /* Process signal synchronously */ + server->on_signal(signo); + } return ret; } diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index a1cdfe9cb..c6c049693 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -178,7 +178,7 @@ public: * convert signal to io, * @see: st-1.9/docs/notes.html */ -class SrsSignalManager : public ISrsEndlessThreadHandler +class SrsSignalManager : public ISrsCoroutineHandler { private: /* Per-process pipe which is used as a signal queue. */ @@ -186,10 +186,10 @@ private: int sig_pipe[2]; st_netfd_t signal_read_stfd; private: - SrsServer* _server; - SrsEndlessThread* pthread; + SrsServer* server; + SrsCoroutine* trd; public: - SrsSignalManager(SrsServer* server); + SrsSignalManager(SrsServer* s); virtual ~SrsSignalManager(); public: virtual int initialize(); diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index 832632b27..7080d0251 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -87,7 +87,7 @@ void SrsCoroutine::stop() void* res = NULL; int ret = st_thread_join(trd, &res); - srs_trace("Thread.stop: Terminated, ret=%d, err=%d", ret, err); + srs_info("Thread.stop: Terminated, ret=%d, err=%d", ret, err); srs_assert(!ret); // Always override the error by the worker. @@ -107,7 +107,7 @@ void SrsCoroutine::interrupt() } interrupted = true; - srs_trace("Thread.interrupt: Interrupt thread, err=%d", err); + srs_info("Thread.interrupt: Interrupt thread, err=%d", err); err = (err == ERROR_SUCCESS? ERROR_THREAD_INTERRUPED:err); st_thread_interrupt(trd); } @@ -127,10 +127,10 @@ int SrsCoroutine::cycle() if (!context && _srs_context) { context = _srs_context->generate_id(); } - srs_trace("Thread.cycle: Start with cid=%d, err=%d", context, err); + srs_info("Thread.cycle: Start with cid=%d, err=%d", context, err); int ret = handler->cycle(); - srs_trace("Thread.cycle: Finished with ret=%d, err=%d", ret, err); + srs_info("Thread.cycle: Finished with ret=%d, err=%d", ret, err); return ret; } diff --git a/trunk/src/app/srs_app_thread.cpp b/trunk/src/app/srs_app_thread.cpp index c9866357c..f590b07fb 100755 --- a/trunk/src/app/srs_app_thread.cpp +++ b/trunk/src/app/srs_app_thread.cpp @@ -26,74 +26,6 @@ #include #include -ISrsEndlessThreadHandler::ISrsEndlessThreadHandler() -{ -} - -ISrsEndlessThreadHandler::~ISrsEndlessThreadHandler() -{ -} - -void ISrsEndlessThreadHandler::on_thread_start() -{ -} - -int ISrsEndlessThreadHandler::on_before_cycle() -{ - return ERROR_SUCCESS; -} - -int ISrsEndlessThreadHandler::on_end_cycle() -{ - return ERROR_SUCCESS; -} - -void ISrsEndlessThreadHandler::on_thread_stop() -{ -} - -SrsEndlessThread::SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h) -{ - handler = h; - pthread = new internal::SrsThread(n, this, 0, false); -} - -SrsEndlessThread::~SrsEndlessThread() -{ - pthread->stop(); - srs_freep(pthread); -} - -int SrsEndlessThread::start() -{ - return pthread->start(); -} - -int SrsEndlessThread::cycle() -{ - return handler->cycle(); -} - -void SrsEndlessThread::on_thread_start() -{ - handler->on_thread_start(); -} - -int SrsEndlessThread::on_before_cycle() -{ - return handler->on_before_cycle(); -} - -int SrsEndlessThread::on_end_cycle() -{ - return handler->on_end_cycle(); -} - -void SrsEndlessThread::on_thread_stop() -{ - handler->on_thread_stop(); -} - ISrsOneCycleThreadHandler::ISrsOneCycleThreadHandler() { } diff --git a/trunk/src/app/srs_app_thread.hpp b/trunk/src/app/srs_app_thread.hpp index 2f06253cd..bd7664b59 100644 --- a/trunk/src/app/srs_app_thread.hpp +++ b/trunk/src/app/srs_app_thread.hpp @@ -28,63 +28,6 @@ #include -/** - * the endless thread is a loop thread never quit. - * user can create thread always running util server terminate. - * the step to create a thread never stop: - * 1. create SrsEndlessThread field. - * for example: - * class SrsBufferCache : public ISrsEndlessThreadHandler { - * public: SrsBufferCache() { pthread = new SrsEndlessThread("http-stream", this); } - * public: virtual int cycle() { - * // do some work never end. - * } - * } - * @remark user must use block method in cycle method, for example, sleep or socket io. - */ -class ISrsEndlessThreadHandler -{ -public: - ISrsEndlessThreadHandler(); - virtual ~ISrsEndlessThreadHandler(); -public: - /** - * the cycle method for the common thread. - * @remark user must use block method in cycle method, for example, sleep or socket io. - */ - virtual int cycle() = 0; -public: - /** - * other callback for handler. - * @remark all callback is optional, handler can ignore it. - */ - virtual void on_thread_start(); - virtual int on_before_cycle(); - virtual int on_end_cycle(); - virtual void on_thread_stop(); -}; -class SrsEndlessThread : public internal::ISrsThreadHandler -{ -private: - internal::SrsThread* pthread; - ISrsEndlessThreadHandler* handler; -public: - SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h); - virtual ~SrsEndlessThread(); -public: - /** - * for the endless thread, never quit. - */ - virtual int start(); -// interface internal::ISrsThreadHandler -public: - virtual int cycle(); - virtual void on_thread_start(); - virtual int on_before_cycle(); - virtual int on_end_cycle(); - virtual void on_thread_stop(); -}; - /** * the one cycle thread is a thread do the cycle only one time, * that is, the thread will quit when return from the cycle.