1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 03:41:55 +00:00

refine the thread to three category.

This commit is contained in:
winlin 2015-05-23 09:20:16 +08:00
parent 2f0ef87d6d
commit e5f449ce36
25 changed files with 648 additions and 416 deletions

View file

@ -41,12 +41,11 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask()
SrsAsyncCallWorker::SrsAsyncCallWorker()
{
pthread = new SrsThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US, true);
pthread = new SrsReusableThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US);
}
SrsAsyncCallWorker::~SrsAsyncCallWorker()
{
stop();
srs_freep(pthread);
std::vector<ISrsAsyncCallTask*>::iterator it;

View file

@ -57,10 +57,10 @@ public:
* when worker call with the task, the worker will do it in isolate thread.
* that is, the task is execute/call in async mode.
*/
class SrsAsyncCallWorker : public ISrsThreadHandler
class SrsAsyncCallWorker : public ISrsReusableThreadHandler
{
private:
SrsThread* pthread;
SrsReusableThread* pthread;
std::vector<ISrsAsyncCallTask*> tasks;
public:
SrsAsyncCallWorker();
@ -70,6 +70,8 @@ public:
public:
virtual int start();
virtual void stop();
// interface ISrsReusableThreadHandler
public:
virtual int cycle();
};

View file

@ -45,12 +45,17 @@ SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c)
// so we never use joinable.
// TODO: FIXME: maybe other thread need to stop it.
// @see: https://github.com/simple-rtmp-server/srs/issues/78
pthread = new SrsThread("conn", this, 0, false);
pthread = new SrsOneCycleThread("conn", this);
}
SrsConnection::~SrsConnection()
{
stop();
/**
* when delete the connection, stop the connection,
* close the underlayer socket, delete the thread.
*/
srs_close_stfd(stfd);
srs_freep(pthread);
}
int SrsConnection::start()
@ -83,9 +88,6 @@ int SrsConnection::cycle()
if (ret == ERROR_SOCKET_CLOSED) {
srs_warn("client disconnect peer. ret=%d", ret);
}
// set loop to stop to quit.
pthread->stop_loop();
return ERROR_SUCCESS;
}
@ -101,10 +103,4 @@ int SrsConnection::srs_id()
return id;
}
void SrsConnection::stop()
{
srs_close_stfd(stfd);
srs_freep(pthread);
}

View file

@ -58,14 +58,14 @@ public:
* all connections accept from listener must extends from this base class,
* server will add the connection to manager, and delete it when remove.
*/
class SrsConnection : public virtual ISrsThreadHandler, public virtual IKbpsDelta
class SrsConnection : public virtual ISrsOneCycleThreadHandler, public virtual IKbpsDelta
{
private:
/**
* each connection start a green thread,
* when thread stop, the connection will be delete by server.
*/
SrsThread* pthread;
SrsOneCycleThread* pthread;
/**
* the id of connection.
*/
@ -97,6 +97,8 @@ public:
* to remove the client by server->remove(this).
*/
virtual int start();
// interface ISrsOneCycleThreadHandler
public:
/**
* the thread cycle function,
* when serve connection completed, terminate the loop which will terminate the thread,
@ -119,12 +121,6 @@ protected:
* for concrete connection to do the cycle.
*/
virtual int do_cycle() = 0;
private:
/**
* when delete the connection, stop the connection,
* close the underlayer socket, delete the thread.
*/
virtual void stop();
};
#endif

View file

@ -70,7 +70,7 @@ SrsEdgeIngester::SrsEdgeIngester()
origin_index = 0;
stream_id = 0;
stfd = NULL;
pthread = new SrsThread("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US, true);
pthread = new SrsReusableThread("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US);
}
SrsEdgeIngester::~SrsEdgeIngester()
@ -171,7 +171,7 @@ int SrsEdgeIngester::ingest()
SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
SrsAutoFree(SrsPithyPrint, pprint);
while (pthread->can_loop()) {
while (!pthread->interrupted()) {
pprint->elapse();
// pithy print
@ -397,7 +397,7 @@ SrsEdgeForwarder::SrsEdgeForwarder()
origin_index = 0;
stream_id = 0;
stfd = NULL;
pthread = new SrsThread("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US, true);
pthread = new SrsReusableThread("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US);
queue = new SrsMessageQueue();
send_error_code = ERROR_SUCCESS;
}
@ -489,7 +489,7 @@ int SrsEdgeForwarder::cycle()
SrsMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);
while (pthread->can_loop()) {
while (!pthread->interrupted()) {
if (send_error_code != ERROR_SUCCESS) {
st_usleep(SRS_EDGE_FORWARDER_ERROR_US);
continue;

View file

@ -75,7 +75,7 @@ enum SrsEdgeUserState
/**
* edge used to ingest stream from origin.
*/
class SrsEdgeIngester : public ISrsThreadHandler
class SrsEdgeIngester : public ISrsReusableThreadHandler
{
private:
int stream_id;
@ -83,7 +83,7 @@ private:
SrsSource* _source;
SrsPlayEdge* _edge;
SrsRequest* _req;
SrsThread* pthread;
SrsReusableThread* pthread;
st_netfd_t stfd;
ISrsProtocolReaderWriter* io;
SrsKbps* kbps;
@ -96,7 +96,7 @@ public:
virtual int initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req);
virtual int start();
virtual void stop();
// interface ISrsThreadHandler
// interface ISrsReusableThreadHandler
public:
virtual int cycle();
private:
@ -110,7 +110,7 @@ private:
/**
* edge used to forward stream to origin.
*/
class SrsEdgeForwarder : public ISrsThreadHandler
class SrsEdgeForwarder : public ISrsReusableThreadHandler
{
private:
int stream_id;
@ -118,7 +118,7 @@ private:
SrsSource* _source;
SrsPublishEdge* _edge;
SrsRequest* _req;
SrsThread* pthread;
SrsReusableThread* pthread;
st_netfd_t stfd;
ISrsProtocolReaderWriter* io;
SrsKbps* kbps;
@ -144,7 +144,7 @@ public:
virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req);
virtual int start();
virtual void stop();
// interface ISrsThreadHandler
// interface ISrsReusableThreadHandler
public:
virtual int cycle();
public:

View file

@ -44,7 +44,7 @@ static std::vector<std::string> _transcoded_url;
SrsEncoder::SrsEncoder()
{
pthread = new SrsThread("encoder", this, SRS_RTMP_ENCODER_SLEEP_US, true);
pthread = new SrsReusableThread("encoder", this, SRS_RTMP_ENCODER_SLEEP_US);
pprint = SrsPithyPrint::create_encoder();
}

View file

@ -45,13 +45,13 @@ class SrsFFMPEG;
* the encoder for a stream,
* may use multiple ffmpegs to transcode the specified stream.
*/
class SrsEncoder : public ISrsThreadHandler
class SrsEncoder : public ISrsReusableThreadHandler
{
private:
std::string input_stream_name;
std::vector<SrsFFMPEG*> ffmpegs;
private:
SrsThread* pthread;
SrsReusableThread* pthread;
SrsPithyPrint* pprint;
public:
SrsEncoder();
@ -59,7 +59,7 @@ public:
public:
virtual int on_publish(SrsRequest* req);
virtual void on_unpublish();
// interface ISrsThreadHandler.
// interface ISrsReusableThreadHandler.
public:
virtual int cycle();
virtual void on_thread_stop();

View file

@ -59,7 +59,7 @@ SrsForwarder::SrsForwarder(SrsSource* _source)
kbps = new SrsKbps();
stream_id = 0;
pthread = new SrsThread("forward", this, SRS_FORWARDER_SLEEP_US, true);
pthread = new SrsReusableThread("forward", this, SRS_FORWARDER_SLEEP_US);
queue = new SrsMessageQueue();
jitter = new SrsRtmpJitter();
@ -407,7 +407,7 @@ int SrsForwarder::forward()
}
}
while (pthread->can_loop()) {
while (!pthread->interrupted()) {
pprint->elapse();
// read from client.

View file

@ -48,7 +48,7 @@ class SrsKbps;
* forward the stream to other servers.
*/
// TODO: FIXME: refine the error log, comments it.
class SrsForwarder : public ISrsThreadHandler
class SrsForwarder : public ISrsReusableThreadHandler
{
private:
// the ep to forward, server[:port].
@ -57,7 +57,7 @@ private:
int stream_id;
private:
st_netfd_t stfd;
SrsThread* pthread;
SrsReusableThread* pthread;
private:
SrsSource* source;
ISrsProtocolReaderWriter* io;
@ -95,7 +95,7 @@ public:
* @param shared_video, directly ptr, copy it if need to save it.
*/
virtual int on_video(SrsSharedPtrMessage* shared_video);
// interface ISrsThreadHandler.
// interface ISrsReusableThreadHandler.
public:
virtual int cycle();
private:

View file

@ -1157,12 +1157,11 @@ SrsStreamCache::SrsStreamCache(SrsSource* s, SrsRequest* r)
req = r->copy();
source = s;
queue = new SrsMessageQueue(true);
pthread = new SrsThread("http-stream", this, 0, false);
pthread = new SrsEndlessThread("http-stream", this);
}
SrsStreamCache::~SrsStreamCache()
{
pthread->stop();
srs_freep(pthread);
srs_freep(queue);

View file

@ -386,20 +386,20 @@ protected:
* for example, the audio stream cache to make android(weixin) happy.
* we start a thread to shrink the queue.
*/
class SrsStreamCache : public ISrsThreadHandler
class SrsStreamCache : public ISrsEndlessThreadHandler
{
private:
SrsMessageQueue* queue;
SrsSource* source;
SrsRequest* req;
SrsThread* pthread;
SrsEndlessThread* pthread;
public:
SrsStreamCache(SrsSource* s, SrsRequest* r);
virtual ~SrsStreamCache();
public:
virtual int start();
virtual int dump_cache(SrsConsumer* consumer);
// interface ISrsThreadHandler.
// interface ISrsEndlessThreadHandler.
public:
virtual int cycle();
};
@ -669,7 +669,7 @@ public:
virtual int hls_update_m3u8(SrsRequest* r, std::string m3u8);
virtual int hls_update_ts(SrsRequest* r, std::string uri, std::string ts);
virtual void unmount_hls(SrsRequest* r);
// interface ISrsThreadHandler.
// interface ISrsReloadHandler.
public:
virtual int on_reload_vhost_http_updated();
virtual int on_reload_vhost_http_remux_updated();

View file

@ -55,7 +55,7 @@ SrsIngester::SrsIngester()
{
_srs_config->subscribe(this);
pthread = new SrsThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US, true);
pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US);
pprint = SrsPithyPrint::create_ingester();
}

View file

@ -59,12 +59,12 @@ public:
* encode with FFMPEG(optional),
* push to SRS(or any RTMP server) over RTMP.
*/
class SrsIngester : public ISrsThreadHandler, public ISrsReloadHandler
class SrsIngester : public ISrsReusableThreadHandler, public ISrsReloadHandler
{
private:
std::vector<SrsIngesterFFMPEG*> ingesters;
private:
SrsThread* pthread;
SrsReusableThread* pthread;
SrsPithyPrint* pprint;
public:
SrsIngester();
@ -72,7 +72,7 @@ public:
public:
virtual int start();
virtual void stop();
// interface ISrsThreadHandler.
// interface ISrsReusableThreadHandler.
public:
virtual int cycle();
virtual void on_thread_stop();

View file

@ -79,7 +79,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p)
nb_buf = SRS_UDP_MAX_PACKET_SIZE;
buf = new char[nb_buf];
pthread = new SrsThread("udp", this, 0, true);
pthread = new SrsReusableThread("udp", this);
}
SrsUdpListener::~SrsUdpListener()
@ -157,7 +157,7 @@ int SrsUdpListener::cycle()
{
int ret = ERROR_SUCCESS;
while (pthread->can_loop()) {
while (!pthread->interrupted()) {
// TODO: FIXME: support ipv6, @see man 7 ipv6
sockaddr_in from;
int nb_from = sizeof(sockaddr_in);
@ -190,7 +190,7 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
_fd = -1;
_stfd = NULL;
pthread = new SrsThread("tcp", this, 0, true);
pthread = new SrsReusableThread("tcp", this);
}
SrsTcpListener::~SrsTcpListener()

View file

@ -82,12 +82,12 @@ public:
/**
* bind udp port, start thread to recv packet and handler it.
*/
class SrsUdpListener : public ISrsThreadHandler
class SrsUdpListener : public ISrsReusableThreadHandler
{
private:
int _fd;
st_netfd_t _stfd;
SrsThread* pthread;
SrsReusableThread* pthread;
private:
char* buf;
int nb_buf;
@ -103,7 +103,7 @@ public:
virtual st_netfd_t stfd();
public:
virtual int listen();
// interface ISrsThreadHandler.
// interface ISrsReusableThreadHandler.
public:
virtual int cycle();
};
@ -111,12 +111,12 @@ public:
/**
* bind and listen tcp port, use handler to process the client.
*/
class SrsTcpListener : public ISrsThreadHandler
class SrsTcpListener : public ISrsReusableThreadHandler
{
private:
int _fd;
st_netfd_t _stfd;
SrsThread* pthread;
SrsReusableThread* pthread;
private:
ISrsTcpHandler* handler;
std::string ip;
@ -128,7 +128,7 @@ public:
virtual int fd();
public:
virtual int listen();
// interface ISrsThreadHandler.
// interface ISrsReusableThreadHandler.
public:
virtual int cycle();
};

View file

@ -82,7 +82,7 @@ public:
virtual void trace(const char* tag, int context_id, const char* fmt, ...);
virtual void warn(const char* tag, int context_id, const char* fmt, ...);
virtual void error(const char* tag, int context_id, const char* fmt, ...);
// interface ISrsThreadHandler.
// interface ISrsReloadHandler.
public:
virtual int on_reload_log_tank();
virtual int on_reload_log_level();

View file

@ -50,7 +50,7 @@ SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtm
timeout = timeout_ms;
handler = msg_handler;
rtmp = rtmp_sdk;
trd = new SrsThread("recv", this, 0, true);
trd = new SrsReusableThread("recv", this);
}
SrsRecvThread::~SrsRecvThread()
@ -76,7 +76,7 @@ int SrsRecvThread::cycle()
{
int ret = ERROR_SUCCESS;
while (trd->can_loop()) {
while (!trd->interrupted()) {
if (!handler->can_handle()) {
st_usleep(timeout * 1000);
continue;
@ -96,7 +96,7 @@ int SrsRecvThread::cycle()
}
// we use no timeout to recv, should never got any error.
trd->stop_loop();
trd->interrupt();
// notice the handler got a recv error.
handler->on_recv_error(ret);
@ -111,7 +111,7 @@ int SrsRecvThread::cycle()
void SrsRecvThread::stop_loop()
{
trd->stop_loop();
trd->interrupt();
}
void SrsRecvThread::on_thread_start()

View file

@ -79,10 +79,10 @@ public:
/**
* the recv thread, use message handler to handle each received message.
*/
class SrsRecvThread : public ISrsThreadHandler
class SrsRecvThread : public ISrsReusableThreadHandler
{
protected:
SrsThread* trd;
SrsReusableThread* trd;
ISrsMessageHandler* handler;
SrsRtmpServer* rtmp;
int timeout;

View file

@ -192,7 +192,7 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o)
stfd = fd;
skt = new SrsStSocket(fd);
rtsp = new SrsRtspStack(skt);
trd = new SrsThread("rtsp", this, 0, false);
trd = new SrsOneCycleThread("rtsp", this);
req = NULL;
io = NULL;
@ -210,7 +210,6 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o)
SrsRtspConn::~SrsRtspConn()
{
srs_close_stfd(stfd);
trd->stop();
srs_freep(video_rtp);
srs_freep(audio_rtp);
@ -219,7 +218,9 @@ SrsRtspConn::~SrsRtspConn()
srs_freep(skt);
srs_freep(rtsp);
close();
srs_freep(client);
srs_freep(io);
srs_freep(req);
srs_freep(vjitter);
srs_freep(ajitter);
@ -412,9 +413,6 @@ int SrsRtspConn::cycle()
srs_warn("client disconnect peer. ret=%d", ret);
}
// terminate thread in the thread cycle itself.
trd->stop_loop();
return ERROR_SUCCESS;
}
@ -763,14 +761,6 @@ int SrsRtspConn::connect_app(string ep_server, string ep_port)
return ret;
}
void SrsRtspConn::close()
{
srs_freep(client);
srs_freep(io);
srs_freep(req);
srs_close_stfd(stfd);
}
SrsRtspCaster::SrsRtspCaster(SrsConfDirective* c)
{
// TODO: FIXME: support reload.

View file

@ -113,7 +113,7 @@ public:
/**
* the rtsp connection serve the fd.
*/
class SrsRtspConn : public ISrsThreadHandler
class SrsRtspConn : public ISrsOneCycleThreadHandler
{
private:
std::string output_template;
@ -136,7 +136,7 @@ private:
SrsStSocket* skt;
SrsRtspStack* rtsp;
SrsRtspCaster* caster;
SrsThread* trd;
SrsOneCycleThread* trd;
private:
SrsRequest* req;
SrsStSocket* io;
@ -163,7 +163,7 @@ private:
// internal methods
public:
virtual int on_rtp_packet(SrsRtpPacket* pkt, int stream_id);
// interface ISrsThreadHandler
// interface ISrsOneCycleThreadHandler
public:
virtual int cycle();
virtual void on_thread_stop();
@ -182,8 +182,6 @@ private:
// @remark ignore when not connected, reconnect when disconnected.
virtual int connect();
virtual int connect_app(std::string ep_server, std::string ep_port);
// close the connected io and rtmp to ready to be re-connect.
virtual void close();
};
/**

View file

@ -367,13 +367,12 @@ SrsSignalManager::SrsSignalManager(SrsServer* server)
_server = server;
sig_pipe[0] = sig_pipe[1] = -1;
pthread = new SrsThread("signal", this, 0, true);
pthread = new SrsEndlessThread("signal", this);
signal_read_stfd = NULL;
}
SrsSignalManager::~SrsSignalManager()
{
pthread->stop();
srs_freep(pthread);
srs_close_stfd(signal_read_stfd);

View file

@ -179,7 +179,7 @@ public:
* convert signal to io,
* @see: st-1.9/docs/notes.html
*/
class SrsSignalManager : public ISrsThreadHandler
class SrsSignalManager : public ISrsEndlessThreadHandler
{
private:
/* Per-process pipe which is used as a signal queue. */
@ -188,14 +188,14 @@ private:
st_netfd_t signal_read_stfd;
private:
SrsServer* _server;
SrsThread* pthread;
SrsEndlessThread* pthread;
public:
SrsSignalManager(SrsServer* server);
virtual ~SrsSignalManager();
public:
virtual int initialize();
virtual int start();
// interface ISrsThreadHandler.
// interface ISrsEndlessThreadHandler.
public:
virtual int cycle();
private:

View file

@ -26,206 +26,337 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
ISrsThreadHandler::ISrsThreadHandler()
{
}
ISrsThreadHandler::~ISrsThreadHandler()
{
}
void ISrsThreadHandler::on_thread_start()
{
}
int ISrsThreadHandler::on_before_cycle()
{
int ret = ERROR_SUCCESS;
return ret;
}
int ISrsThreadHandler::on_end_cycle()
{
int ret = ERROR_SUCCESS;
return ret;
}
void ISrsThreadHandler::on_thread_stop()
{
}
SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable)
{
_name = name;
handler = thread_handler;
cycle_interval_us = interval_us;
namespace internal {
ISrsThreadHandler::ISrsThreadHandler()
{
}
tid = NULL;
loop = false;
really_terminated = true;
_cid = -1;
_joinable = joinable;
ISrsThreadHandler::~ISrsThreadHandler()
{
}
// in start(), the thread cycle method maybe stop and remove the thread itself,
// and the thread start() is waiting for the _cid, and segment fault then.
// @see https://github.com/simple-rtmp-server/srs/issues/110
// thread will set _cid, callback on_thread_start(), then wait for the can_run signal.
can_run = false;
}
SrsThread::~SrsThread()
{
stop();
}
int SrsThread::cid()
{
return _cid;
}
int SrsThread::start()
{
int ret = ERROR_SUCCESS;
void ISrsThreadHandler::on_thread_start()
{
}
if(tid) {
srs_info("thread %s already running.", _name);
int ISrsThreadHandler::on_before_cycle()
{
int ret = ERROR_SUCCESS;
return ret;
}
if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){
ret = ERROR_ST_CREATE_CYCLE_THREAD;
srs_error("st_thread_create failed. ret=%d", ret);
int ISrsThreadHandler::on_end_cycle()
{
int ret = ERROR_SUCCESS;
return ret;
}
// we set to loop to true for thread to run.
loop = true;
// wait for cid to ready, for parent thread to get the cid.
while (_cid < 0 && loop) {
st_usleep(10 * 1000);
void ISrsThreadHandler::on_thread_stop()
{
}
// now, cycle thread can run.
can_run = true;
return ret;
}
void SrsThread::stop()
{
if (tid) {
loop = false;
// the interrupt will cause the socket to read/write error,
// which will terminate the cycle thread.
st_thread_interrupt(tid);
// when joinable, wait util quit.
if (_joinable) {
// wait the thread to exit.
int ret = st_thread_join(tid, NULL);
if (ret) {
srs_warn("core: ignore join thread failed.");
}
// wait the thread actually terminated.
// sometimes the thread join return -1, for example,
// when thread use st_recvfrom, the thread join return -1.
// so here, we use a variable to ensure the thread stopped.
while (!really_terminated) {
st_usleep(10 * 1000);
if (really_terminated) {
break;
}
srs_warn("core: wait thread to actually terminated");
}
}
SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable)
{
_name = name;
handler = thread_handler;
cycle_interval_us = interval_us;
tid = NULL;
}
}
bool SrsThread::can_loop()
{
return loop;
}
void SrsThread::stop_loop()
{
loop = false;
}
void SrsThread::thread_cycle()
{
int ret = ERROR_SUCCESS;
_srs_context->generate_id();
srs_info("thread %s cycle start", _name);
_cid = _srs_context->get_id();
srs_assert(handler);
handler->on_thread_start();
// thread is running now.
really_terminated = false;
// wait for cid to ready, for parent thread to get the cid.
while (!can_run && loop) {
st_usleep(10 * 1000);
}
while (loop) {
if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);
goto failed;
}
srs_info("thread %s on before cycle success");
loop = false;
really_terminated = true;
_cid = -1;
_joinable = joinable;
if ((ret = handler->cycle()) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
// in start(), the thread cycle method maybe stop and remove the thread itself,
// and the thread start() is waiting for the _cid, and segment fault then.
// @see https://github.com/simple-rtmp-server/srs/issues/110
// thread will set _cid, callback on_thread_start(), then wait for the can_run signal.
can_run = false;
}
SrsThread::~SrsThread()
{
stop();
}
int SrsThread::cid()
{
return _cid;
}
int SrsThread::start()
{
int ret = ERROR_SUCCESS;
if(tid) {
srs_info("thread %s already running.", _name);
return ret;
}
if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){
ret = ERROR_ST_CREATE_CYCLE_THREAD;
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
}
// we set to loop to true for thread to run.
loop = true;
// wait for cid to ready, for parent thread to get the cid.
while (_cid < 0 && loop) {
st_usleep(10 * 1000);
}
// now, cycle thread can run.
can_run = true;
return ret;
}
void SrsThread::stop()
{
if (tid) {
loop = false;
// the interrupt will cause the socket to read/write error,
// which will terminate the cycle thread.
st_thread_interrupt(tid);
// when joinable, wait util quit.
if (_joinable) {
// wait the thread to exit.
int ret = st_thread_join(tid, NULL);
if (ret) {
srs_warn("core: ignore join thread failed.");
}
// wait the thread actually terminated.
// sometimes the thread join return -1, for example,
// when thread use st_recvfrom, the thread join return -1.
// so here, we use a variable to ensure the thread stopped.
while (!really_terminated) {
st_usleep(10 * 1000);
if (really_terminated) {
break;
}
srs_warn("core: wait thread to actually terminated");
}
}
goto failed;
}
srs_info("thread %s cycle success", _name);
if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret);
goto failed;
}
srs_info("thread %s on end cycle success", _name);
failed:
if (!loop) {
break;
}
// to improve performance, donot sleep when interval is zero.
// @see: https://github.com/simple-rtmp-server/srs/issues/237
if (cycle_interval_us != 0) {
st_usleep(cycle_interval_us);
tid = NULL;
}
}
// readly terminated now.
really_terminated = true;
bool SrsThread::can_loop()
{
return loop;
}
handler->on_thread_stop();
srs_info("thread %s cycle finished", _name);
void SrsThread::stop_loop()
{
loop = false;
}
void SrsThread::thread_cycle()
{
int ret = ERROR_SUCCESS;
_srs_context->generate_id();
srs_info("thread %s cycle start", _name);
_cid = _srs_context->get_id();
srs_assert(handler);
handler->on_thread_start();
// thread is running now.
really_terminated = false;
// wait for cid to ready, for parent thread to get the cid.
while (!can_run && loop) {
st_usleep(10 * 1000);
}
while (loop) {
if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);
goto failed;
}
srs_info("thread %s on before cycle success");
if ((ret = handler->cycle()) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
}
goto failed;
}
srs_info("thread %s cycle success", _name);
if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret);
goto failed;
}
srs_info("thread %s on end cycle success", _name);
failed:
if (!loop) {
break;
}
// to improve performance, donot sleep when interval is zero.
// @see: https://github.com/simple-rtmp-server/srs/issues/237
if (cycle_interval_us != 0) {
st_usleep(cycle_interval_us);
}
}
// readly terminated now.
really_terminated = true;
handler->on_thread_stop();
srs_info("thread %s cycle finished", _name);
}
void* SrsThread::thread_fun(void* arg)
{
SrsThread* obj = (SrsThread*)arg;
srs_assert(obj);
obj->thread_cycle();
st_thread_exit(NULL);
return NULL;
}
}
void* SrsThread::thread_fun(void* arg)
ISrsEndlessThreadHandler::ISrsEndlessThreadHandler()
{
SrsThread* obj = (SrsThread*)arg;
srs_assert(obj);
obj->thread_cycle();
st_thread_exit(NULL);
return NULL;
}
ISrsEndlessThreadHandler::~ISrsEndlessThreadHandler()
{
}
SrsEndlessThread::SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h)
{
handler = h;
pthread = new internal::SrsThread(n, this, 0, false);
}
SrsEndlessThread::~SrsEndlessThread()
{
pthread->stop();
srs_freep(pthread);
}
int SrsEndlessThread::start()
{
return pthread->start();
}
int SrsEndlessThread::cycle()
{
return handler->cycle();
}
ISrsOneCycleThreadHandler::ISrsOneCycleThreadHandler()
{
}
ISrsOneCycleThreadHandler::~ISrsOneCycleThreadHandler()
{
}
void ISrsOneCycleThreadHandler::on_thread_stop()
{
}
SrsOneCycleThread::SrsOneCycleThread(const char* n, ISrsOneCycleThreadHandler* h)
{
handler = h;
pthread = new internal::SrsThread(n, this, 0, false);
}
SrsOneCycleThread::~SrsOneCycleThread()
{
pthread->stop();
srs_freep(pthread);
}
int SrsOneCycleThread::start()
{
return pthread->start();
}
int SrsOneCycleThread::cycle()
{
int ret = handler->cycle();
pthread->stop_loop();
return ret;
}
void SrsOneCycleThread::on_thread_stop()
{
handler->on_thread_stop();
}
ISrsReusableThreadHandler::ISrsReusableThreadHandler()
{
}
ISrsReusableThreadHandler::~ISrsReusableThreadHandler()
{
}
void ISrsReusableThreadHandler::on_thread_stop()
{
}
SrsReusableThread::SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t interval_us)
{
handler = h;
pthread = new internal::SrsThread(n, this, interval_us, true);
}
SrsReusableThread::~SrsReusableThread()
{
pthread->stop();
srs_freep(pthread);
}
int SrsReusableThread::start()
{
return pthread->start();
}
void SrsReusableThread::stop()
{
pthread->stop();
}
int SrsReusableThread::cid()
{
return pthread->cid();
}
void SrsReusableThread::interrupt()
{
pthread->stop_loop();
}
bool SrsReusableThread::interrupted()
{
return !pthread->can_loop();
}
int SrsReusableThread::cycle()
{
return handler->cycle();
}
void SrsReusableThread::on_thread_stop()
{
handler->on_thread_stop();
}

View file

@ -31,34 +31,238 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_st.hpp>
// the internal classes, user should never use it.
// user should use the public classes at the bellow:
// @see SrsEndlessThread, SrsOneCycleThread, SrsReusableThread
namespace internal {
/**
* the handler for the thread, callback interface.
* the thread model defines as:
* handler->on_thread_start()
* while loop:
* handler->on_before_cycle()
* handler->cycle()
* handler->on_end_cycle()
* if !loop then break for user stop thread.
* sleep(CycleIntervalMilliseconds)
* handler->on_thread_stop()
* when stop, the thread will interrupt the st_thread,
* which will cause the socket to return error and
* terminate the cycle thread.
*
* @remark why should check can_loop() in cycle method?
* when thread interrupt, the socket maybe not got EINT,
* espectially on st_usleep(), so the cycle must check the loop,
* when handler->cycle() has loop itself, for example:
* while (true):
* if (read_from_socket(skt) < 0) break;
* if thread stop when read_from_socket, it's ok, the loop will break,
* but when thread stop interrupt the s_usleep(0), then the loop is
* death loop.
* in a word, the handler->cycle() must:
* while (pthread->can_loop()):
* if (read_from_socket(skt) < 0) break;
* check the loop, then it works.
*
* @remark why should use stop_loop() to terminate thread in itself?
* in the thread itself, that is the cycle method,
* if itself want to terminate the thread, should never use stop(),
* but use stop_loop() to set the loop to false and terminate normally.
*
* @remark when should set the interval_us, and when not?
* the cycle will invoke util cannot loop, eventhough the return code of cycle is error,
* so the interval_us used to sleep for each cycle.
*/
class ISrsThreadHandler
{
public:
ISrsThreadHandler();
virtual ~ISrsThreadHandler();
public:
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int cycle() = 0;
virtual int on_end_cycle();
virtual void on_thread_stop();
};
/**
* provides servies from st_thread_t,
* for common thread usage.
*/
class SrsThread
{
private:
st_thread_t tid;
int _cid;
bool loop;
bool can_run;
bool really_terminated;
bool _joinable;
const char* _name;
private:
ISrsThreadHandler* handler;
int64_t cycle_interval_us;
public:
/**
* initialize the thread.
* @param name, human readable name for st debug.
* @param thread_handler, the cycle handler for the thread.
* @param interval_us, the sleep interval when cycle finished.
* @param joinable, if joinable, other thread must stop the thread.
* @remark if joinable, thread never quit itself, or memory leak.
* @see: https://github.com/simple-rtmp-server/srs/issues/78
* @remark about st debug, see st-1.9/README, _st_iterate_threads_flag
*/
/**
* TODO: FIXME: maybe all thread must be reap by others threads,
* @see: https://github.com/simple-rtmp-server/srs/issues/77
*/
SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable);
virtual ~SrsThread();
public:
/**
* get the context id. @see: ISrsThreadContext.get_id().
* used for parent thread to get the id.
* @remark when start thread, parent thread will block and wait for this id ready.
*/
virtual int cid();
/**
* start the thread, invoke the cycle of handler util
* user stop the thread.
* @remark ignore any error of cycle of handler.
* @remark user can start multiple times, ignore if already started.
* @remark wait for the cid is set by thread pfn.
*/
virtual int start();
/**
* stop the thread, wait for the thread to terminate.
* @remark user can stop multiple times, ignore if already stopped.
*/
virtual void stop();
public:
/**
* whether the thread should loop,
* used for handler->cycle() which has a loop method,
* to check this method, break if false.
*/
virtual bool can_loop();
/**
* for the loop thread to stop the loop.
* other thread can directly use stop() to stop loop and wait for quit.
* this stop loop method only set loop to false.
*/
virtual void stop_loop();
private:
virtual void thread_cycle();
static void* thread_fun(void* arg);
};
}
/**
* the handler for the thread, callback interface.
* the thread model defines as:
* handler->on_thread_start()
* while loop:
* handler->on_before_cycle()
* handler->cycle()
* handler->on_end_cycle()
* if !loop then break for user stop thread.
* sleep(CycleIntervalMilliseconds)
* handler->on_thread_stop()
* when stop, the thread will interrupt the st_thread,
* which will cause the socket to return error and
* terminate the cycle thread.
*
* Usage 1: loop thread never quit.
* the endless thread is a loop thread never quit.
* user can create thread always running util server terminate.
* the step to create a thread never stop:
* 1. create SrsThread field, with joinable false.
* 1. create SrsEndlessThread field.
* for example:
* class SrsStreamCache : public ISrsThreadHandler {
* public: SrsStreamCache() { pthread = new SrsThread("http-stream", this, SRS_AUTO_STREAM_SLEEP_US, false); }
* class SrsStreamCache : public ISrsEndlessThreadHandler {
* public: SrsStreamCache() { pthread = new SrsEndlessThread("http-stream", this); }
* public: virtual int cycle() {
* // check status, start ffmpeg when stopped.
* // do some work never end.
* }
* }
*
* Usage 2: stop by other thread.
* @remark user must use block method in cycle method, for example, sleep or socket io.
*/
class ISrsEndlessThreadHandler
{
public:
ISrsEndlessThreadHandler();
virtual ~ISrsEndlessThreadHandler();
public:
/**
* the cycle method for the common thread.
* @remark user must use block method in cycle method, for example, sleep or socket io.
*/
virtual int cycle() = 0;
};
class SrsEndlessThread : public internal::ISrsThreadHandler
{
private:
internal::SrsThread* pthread;
ISrsEndlessThreadHandler* handler;
public:
SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h);
virtual ~SrsEndlessThread();
public:
/**
* for the endless thread, never quit.
*/
virtual int start();
// interface internal::ISrsThreadHandler
public:
virtual int cycle();
};
/**
* the one cycle thread is a thread do the cycle only one time,
* that is, the thread will quit when return from the cycle.
* user can create thread which stop itself,
* generally only need to provides a start method,
* the object will destroy itself then terminate the thread, @see SrsConnection
* 1. create SrsThread field
* 2. the thread quit when return from cycle.
* for example:
* class SrsConnection : public ISrsOneCycleThreadHandler {
* public: SrsConnection() { pthread = new SrsOneCycleThread("conn", this); }
* public: virtual int start() { return pthread->start(); }
* public: virtual int cycle() {
* // serve client.
* // set loop to stop to quit, stop thread itself.
* pthread->stop_loop();
* }
* public: virtual void on_thread_stop() {
* // remove the connection in thread itself.
* server->remove(this);
* }
* };
*/
class ISrsOneCycleThreadHandler
{
public:
ISrsOneCycleThreadHandler();
virtual ~ISrsOneCycleThreadHandler();
public:
/**
* the cycle method for the one cycle thread.
*/
virtual int cycle() = 0;
/**
* when thread stop, the handler can do cleanup.
* @remark this method is optional, handler can ignore it.
*/
virtual void on_thread_stop();
};
class SrsOneCycleThread : public internal::ISrsThreadHandler
{
private:
internal::SrsThread* pthread;
ISrsOneCycleThreadHandler* handler;
public:
SrsOneCycleThread(const char* n, ISrsOneCycleThreadHandler* h);
virtual ~SrsOneCycleThread();
public:
/**
* for the one cycle thread, quit when cycle return.
*/
virtual int start();
// interface internal::ISrsThreadHandler
public:
virtual int cycle();
virtual void on_thread_stop();
};
/**
* the reuse thread is a thread stop and start by other thread.
* user can create thread and stop then start again and again,
* generally must provides a start and stop method, @see SrsIngester.
* the step to create a thread stop by other thread:
@ -73,147 +277,65 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
* // check status, start ffmpeg when stopped.
* }
* };
*
* Usage 3: stop by thread itself.
* user can create thread which stop itself,
* generally only need to provides a start method,
* the object will destroy itself then terminate the thread, @see SrsConnection
* 1. create SrsThread field, with joinable false.
* 2. owner stop thread loop, destroy itself when thread stop.
* for example:
* class SrsConnection : public ISrsThreadHandler {
* public: SrsConnection() { pthread = new SrsThread("conn", this, 0, false); }
* public: virtual int start() { return pthread->start(); }
* public: virtual int cycle() {
* // serve client.
* // set loop to stop to quit, stop thread itself.
* pthread->stop_loop();
* }
* public: virtual int on_thread_stop() {
* // remove the connection in thread itself.
* server->remove(this);
* }
* };
*
* Usage 4: loop in the cycle method.
* user can use loop code in the cycle method, @see SrsForwarder
* 1. create SrsThread field, with or without joinable is ok.
* 2. loop code in cycle method, check the can_loop() for thread to quit.
* for example:
* class SrsForwarder : public ISrsThreadHandler {
* public: virtual int cycle() {
* while (pthread->can_loop()) {
* // read msgs from queue and forward to server.
* }
* }
* };
*
* @remark why should check can_loop() in cycle method?
* when thread interrupt, the socket maybe not got EINT,
* espectially on st_usleep(), so the cycle must check the loop,
* when handler->cycle() has loop itself, for example:
* while (true):
* if (read_from_socket(skt) < 0) break;
* if thread stop when read_from_socket, it's ok, the loop will break,
* but when thread stop interrupt the s_usleep(0), then the loop is
* death loop.
* in a word, the handler->cycle() must:
* while (pthread->can_loop()):
* if (read_from_socket(skt) < 0) break;
* check the loop, then it works.
*
* @remark why should use stop_loop() to terminate thread in itself?
* in the thread itself, that is the cycle method,
* if itself want to terminate the thread, should never use stop(),
* but use stop_loop() to set the loop to false and terminate normally.
*
* @remark when should set the interval_us, and when not?
* the cycle will invoke util cannot loop, eventhough the return code of cycle is error,
* so the interval_us used to sleep for each cycle.
*/
class ISrsThreadHandler
class ISrsReusableThreadHandler
{
public:
ISrsThreadHandler();
virtual ~ISrsThreadHandler();
ISrsReusableThreadHandler();
virtual ~ISrsReusableThreadHandler();
public:
virtual void on_thread_start();
virtual int on_before_cycle();
/**
* the cycle method for the one cycle thread.
* @remark when the cycle has its inner loop, it must check whether
* the thread is interrupted.
*/
virtual int cycle() = 0;
virtual int on_end_cycle();
/**
* when thread stop, the handler can do cleanup.
* @remark this method is optional, handler can ignore it.
*/
virtual void on_thread_stop();
};
/**
* provides servies from st_thread_t,
* for common thread usage.
*/
class SrsThread
class SrsReusableThread : public internal::ISrsThreadHandler
{
private:
st_thread_t tid;
int _cid;
bool loop;
bool can_run;
bool really_terminated;
bool _joinable;
const char* _name;
private:
ISrsThreadHandler* handler;
int64_t cycle_interval_us;
internal::SrsThread* pthread;
ISrsReusableThreadHandler* handler;
public:
SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t interval_us = 0);
virtual ~SrsReusableThread();
public:
/**
* initialize the thread.
* @param name, human readable name for st debug.
* @param thread_handler, the cycle handler for the thread.
* @param interval_us, the sleep interval when cycle finished.
* @param joinable, if joinable, other thread must stop the thread.
* @remark if joinable, thread never quit itself, or memory leak.
* @see: https://github.com/simple-rtmp-server/srs/issues/78
* @remark about st debug, see st-1.9/README, _st_iterate_threads_flag
*/
/**
* TODO: FIXME: maybe all thread must be reap by others threads,
* @see: https://github.com/simple-rtmp-server/srs/issues/77
*/
SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable);
virtual ~SrsThread();
public:
/**
* get the context id. @see: ISrsThreadContext.get_id().
* used for parent thread to get the id.
* @remark when start thread, parent thread will block and wait for this id ready.
*/
virtual int cid();
/**
* start the thread, invoke the cycle of handler util
* user stop the thread.
* @remark ignore any error of cycle of handler.
* @remark user can start multiple times, ignore if already started.
* @remark wait for the cid is set by thread pfn.
*/
* for the reusable thread, start and stop by user.
*/
virtual int start();
/**
* stop the thread, wait for the thread to terminate.
* @remark user can stop multiple times, ignore if already stopped.
*/
* stop the thread, wait for the thread to terminate.
* @remark user can stop multiple times, ignore if already stopped.
*/
virtual void stop();
public:
/**
* whether the thread should loop,
* used for handler->cycle() which has a loop method,
* to check this method, break if false.
*/
virtual bool can_loop();
* get the context id. @see: ISrsThreadContext.get_id().
* used for parent thread to get the id.
* @remark when start thread, parent thread will block and wait for this id ready.
*/
virtual int cid();
/**
* for the loop thread to stop the loop.
* other thread can directly use stop() to stop loop and wait for quit.
* this stop loop method only set loop to false.
*/
virtual void stop_loop();
private:
virtual void thread_cycle();
static void* thread_fun(void* arg);
* interrupt the thread to stop loop.
* we only set the loop flag to false, not really interrupt the thread.
*/
virtual void interrupt();
/**
* whether the thread is interrupted,
* for the cycle has its loop, the inner loop should quit when thread
* is interrupted.
*/
virtual bool interrupted();
// interface internal::ISrsThreadHandler
public:
virtual int cycle();
virtual void on_thread_stop();
};
#endif