1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

For #906, #902, replace the endless thread with coroutine

This commit is contained in:
winlin 2017-05-29 17:56:26 +08:00
parent fc380fe48d
commit 44f542f77f
7 changed files with 28 additions and 151 deletions

View file

@ -60,7 +60,7 @@ SrsBufferCache::SrsBufferCache(SrsSource* s, SrsRequest* r)
req = r->copy(); req = r->copy();
source = s; source = s;
queue = new SrsMessageQueue(true); queue = new SrsMessageQueue(true);
pthread = new SrsEndlessThread("http-stream", this); trd = new SrsCoroutine("http-stream", this);
// TODO: FIXME: support reload. // TODO: FIXME: support reload.
fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost);
@ -68,7 +68,7 @@ SrsBufferCache::SrsBufferCache(SrsSource* s, SrsRequest* r)
SrsBufferCache::~SrsBufferCache() SrsBufferCache::~SrsBufferCache()
{ {
srs_freep(pthread); srs_freep(trd);
srs_freep(queue); srs_freep(queue);
srs_freep(req); srs_freep(req);
@ -87,7 +87,7 @@ int SrsBufferCache::update(SrsSource* s, SrsRequest* r)
int SrsBufferCache::start() int SrsBufferCache::start()
{ {
return pthread->start(); return trd->start();
} }
int SrsBufferCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) int SrsBufferCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter)
@ -138,7 +138,7 @@ int SrsBufferCache::cycle()
// TODO: FIXME: support reload. // TODO: FIXME: support reload.
queue->set_queue_size(fast_cache); queue->set_queue_size(fast_cache);
while (true) { while (!trd->pull()) {
pprint->elapse(); pprint->elapse();
// get messages from consumer. // get messages from consumer.

View file

@ -38,7 +38,7 @@ class SrsTsTransmuxer;
* for example, the audio stream cache to make android(weixin) happy. * for example, the audio stream cache to make android(weixin) happy.
* we start a thread to shrink the queue. * we start a thread to shrink the queue.
*/ */
class SrsBufferCache : public ISrsEndlessThreadHandler class SrsBufferCache : public ISrsCoroutineHandler
{ {
private: private:
double fast_cache; double fast_cache;
@ -46,7 +46,7 @@ private:
SrsMessageQueue* queue; SrsMessageQueue* queue;
SrsSource* source; SrsSource* source;
SrsRequest* req; SrsRequest* req;
SrsEndlessThread* pthread; SrsCoroutine* trd;
public: public:
SrsBufferCache(SrsSource* s, SrsRequest* r); SrsBufferCache(SrsSource* s, SrsRequest* r);
virtual ~SrsBufferCache(); virtual ~SrsBufferCache();

View file

@ -355,13 +355,13 @@ SrsUdpCasterListener::~SrsUdpCasterListener()
SrsSignalManager* SrsSignalManager::instance = NULL; SrsSignalManager* SrsSignalManager::instance = NULL;
SrsSignalManager::SrsSignalManager(SrsServer* server) SrsSignalManager::SrsSignalManager(SrsServer* s)
{ {
SrsSignalManager::instance = this; SrsSignalManager::instance = this;
_server = server; server = s;
sig_pipe[0] = sig_pipe[1] = -1; sig_pipe[0] = sig_pipe[1] = -1;
pthread = new SrsEndlessThread("signal", this); trd = new SrsCoroutine("signal", this);
signal_read_stfd = NULL; signal_read_stfd = NULL;
} }
@ -376,7 +376,7 @@ SrsSignalManager::~SrsSignalManager()
::close(sig_pipe[1]); ::close(sig_pipe[1]);
} }
srs_freep(pthread); srs_freep(trd);
} }
int SrsSignalManager::initialize() int SrsSignalManager::initialize()
@ -432,20 +432,22 @@ int SrsSignalManager::start()
srs_trace("signal installed, reload=%d, reopen=%d, grace_quit=%d", srs_trace("signal installed, reload=%d, reopen=%d, grace_quit=%d",
SRS_SIGNAL_RELOAD, SRS_SIGNAL_REOPEN_LOG, SRS_SIGNAL_GRACEFULLY_QUIT); SRS_SIGNAL_RELOAD, SRS_SIGNAL_REOPEN_LOG, SRS_SIGNAL_GRACEFULLY_QUIT);
return pthread->start(); return trd->start();
} }
int SrsSignalManager::cycle() int SrsSignalManager::cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
int signo; while (!trd->pull()) {
int signo;
/* Read the next signal from the pipe */
st_read(signal_read_stfd, &signo, sizeof(int), ST_UTIME_NO_TIMEOUT); /* Read the next signal from the pipe */
st_read(signal_read_stfd, &signo, sizeof(int), ST_UTIME_NO_TIMEOUT);
/* Process signal synchronously */
_server->on_signal(signo); /* Process signal synchronously */
server->on_signal(signo);
}
return ret; return ret;
} }

View file

@ -178,7 +178,7 @@ public:
* convert signal to io, * convert signal to io,
* @see: st-1.9/docs/notes.html * @see: st-1.9/docs/notes.html
*/ */
class SrsSignalManager : public ISrsEndlessThreadHandler class SrsSignalManager : public ISrsCoroutineHandler
{ {
private: private:
/* Per-process pipe which is used as a signal queue. */ /* Per-process pipe which is used as a signal queue. */
@ -186,10 +186,10 @@ private:
int sig_pipe[2]; int sig_pipe[2];
st_netfd_t signal_read_stfd; st_netfd_t signal_read_stfd;
private: private:
SrsServer* _server; SrsServer* server;
SrsEndlessThread* pthread; SrsCoroutine* trd;
public: public:
SrsSignalManager(SrsServer* server); SrsSignalManager(SrsServer* s);
virtual ~SrsSignalManager(); virtual ~SrsSignalManager();
public: public:
virtual int initialize(); virtual int initialize();

View file

@ -87,7 +87,7 @@ void SrsCoroutine::stop()
void* res = NULL; void* res = NULL;
int ret = st_thread_join(trd, &res); int ret = st_thread_join(trd, &res);
srs_trace("Thread.stop: Terminated, ret=%d, err=%d", ret, err); srs_info("Thread.stop: Terminated, ret=%d, err=%d", ret, err);
srs_assert(!ret); srs_assert(!ret);
// Always override the error by the worker. // Always override the error by the worker.
@ -107,7 +107,7 @@ void SrsCoroutine::interrupt()
} }
interrupted = true; interrupted = true;
srs_trace("Thread.interrupt: Interrupt thread, err=%d", err); srs_info("Thread.interrupt: Interrupt thread, err=%d", err);
err = (err == ERROR_SUCCESS? ERROR_THREAD_INTERRUPED:err); err = (err == ERROR_SUCCESS? ERROR_THREAD_INTERRUPED:err);
st_thread_interrupt(trd); st_thread_interrupt(trd);
} }
@ -127,10 +127,10 @@ int SrsCoroutine::cycle()
if (!context && _srs_context) { if (!context && _srs_context) {
context = _srs_context->generate_id(); context = _srs_context->generate_id();
} }
srs_trace("Thread.cycle: Start with cid=%d, err=%d", context, err); srs_info("Thread.cycle: Start with cid=%d, err=%d", context, err);
int ret = handler->cycle(); int ret = handler->cycle();
srs_trace("Thread.cycle: Finished with ret=%d, err=%d", ret, err); srs_info("Thread.cycle: Finished with ret=%d, err=%d", ret, err);
return ret; return ret;
} }

View file

@ -26,74 +26,6 @@
#include <srs_kernel_error.hpp> #include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp> #include <srs_kernel_log.hpp>
ISrsEndlessThreadHandler::ISrsEndlessThreadHandler()
{
}
ISrsEndlessThreadHandler::~ISrsEndlessThreadHandler()
{
}
void ISrsEndlessThreadHandler::on_thread_start()
{
}
int ISrsEndlessThreadHandler::on_before_cycle()
{
return ERROR_SUCCESS;
}
int ISrsEndlessThreadHandler::on_end_cycle()
{
return ERROR_SUCCESS;
}
void ISrsEndlessThreadHandler::on_thread_stop()
{
}
SrsEndlessThread::SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h)
{
handler = h;
pthread = new internal::SrsThread(n, this, 0, false);
}
SrsEndlessThread::~SrsEndlessThread()
{
pthread->stop();
srs_freep(pthread);
}
int SrsEndlessThread::start()
{
return pthread->start();
}
int SrsEndlessThread::cycle()
{
return handler->cycle();
}
void SrsEndlessThread::on_thread_start()
{
handler->on_thread_start();
}
int SrsEndlessThread::on_before_cycle()
{
return handler->on_before_cycle();
}
int SrsEndlessThread::on_end_cycle()
{
return handler->on_end_cycle();
}
void SrsEndlessThread::on_thread_stop()
{
handler->on_thread_stop();
}
ISrsOneCycleThreadHandler::ISrsOneCycleThreadHandler() ISrsOneCycleThreadHandler::ISrsOneCycleThreadHandler()
{ {
} }

View file

@ -28,63 +28,6 @@
#include <srs_app_st.hpp> #include <srs_app_st.hpp>
/**
* the endless thread is a loop thread never quit.
* user can create thread always running util server terminate.
* the step to create a thread never stop:
* 1. create SrsEndlessThread field.
* for example:
* class SrsBufferCache : public ISrsEndlessThreadHandler {
* public: SrsBufferCache() { pthread = new SrsEndlessThread("http-stream", this); }
* public: virtual int cycle() {
* // do some work never end.
* }
* }
* @remark user must use block method in cycle method, for example, sleep or socket io.
*/
class ISrsEndlessThreadHandler
{
public:
ISrsEndlessThreadHandler();
virtual ~ISrsEndlessThreadHandler();
public:
/**
* the cycle method for the common thread.
* @remark user must use block method in cycle method, for example, sleep or socket io.
*/
virtual int cycle() = 0;
public:
/**
* other callback for handler.
* @remark all callback is optional, handler can ignore it.
*/
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop();
};
class SrsEndlessThread : public internal::ISrsThreadHandler
{
private:
internal::SrsThread* pthread;
ISrsEndlessThreadHandler* handler;
public:
SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h);
virtual ~SrsEndlessThread();
public:
/**
* for the endless thread, never quit.
*/
virtual int start();
// interface internal::ISrsThreadHandler
public:
virtual int cycle();
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop();
};
/** /**
* the one cycle thread is a thread do the cycle only one time, * the one cycle thread is a thread do the cycle only one time,
* that is, the thread will quit when return from the cycle. * that is, the thread will quit when return from the cycle.