1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #906, #902, use coroutine for one cycle thread

This commit is contained in:
winlin 2017-05-29 19:45:19 +08:00
parent b21f92f97a
commit 2ed2513f08
11 changed files with 44 additions and 212 deletions

View file

@ -32,23 +32,16 @@ using namespace std;
SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c, string cip) SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c, string cip)
{ {
id = 0;
manager = cm; manager = cm;
stfd = c; stfd = c;
ip = cip; ip = cip;
disposed = false;
expired = false;
create_time = srs_get_system_time_ms(); create_time = srs_get_system_time_ms();
skt = new SrsStSocket(); skt = new SrsStSocket();
kbps = new SrsKbps(); kbps = new SrsKbps();
kbps->set_io(skt, skt); kbps->set_io(skt, skt);
// the client thread should reap itself, trd = new SrsCoroutine("conn", this);
// so we never use joinable.
// TODO: FIXME: maybe other thread need to stop it.
// @see: https://github.com/ossrs/srs/issues/78
pthread = new SrsOneCycleThread("conn", this);
} }
SrsConnection::~SrsConnection() SrsConnection::~SrsConnection()
@ -57,7 +50,9 @@ SrsConnection::~SrsConnection()
srs_freep(kbps); srs_freep(kbps);
srs_freep(skt); srs_freep(skt);
srs_freep(pthread); srs_freep(trd);
srs_close_stfd(stfd);
} }
void SrsConnection::resample() void SrsConnection::resample()
@ -82,17 +77,7 @@ void SrsConnection::cleanup()
void SrsConnection::dispose() void SrsConnection::dispose()
{ {
if (disposed) { trd->interrupt();
return;
}
disposed = true;
/**
* when delete the connection, stop the connection,
* close the underlayer socket, delete the thread.
*/
srs_close_stfd(stfd);
} }
int SrsConnection::start() int SrsConnection::start()
@ -103,16 +88,13 @@ int SrsConnection::start()
return ret; return ret;
} }
return pthread->start(); return trd->start();
} }
int SrsConnection::cycle() int SrsConnection::cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
_srs_context->generate_id();
id = _srs_context->get_id();
int oret = ret = do_cycle(); int oret = ret = do_cycle();
// if socket io error, set to closed. // if socket io error, set to closed.
@ -138,12 +120,12 @@ int SrsConnection::cycle()
int SrsConnection::srs_id() int SrsConnection::srs_id()
{ {
return id; return trd->cid();
} }
void SrsConnection::expire() void SrsConnection::expire()
{ {
expired = true; trd->interrupt();
} }

View file

@ -39,20 +39,15 @@
* all connections accept from listener must extends from this base class, * all connections accept from listener must extends from this base class,
* server will add the connection to manager, and delete it when remove. * server will add the connection to manager, and delete it when remove.
*/ */
class SrsConnection : virtual public ISrsConnection, virtual public ISrsOneCycleThreadHandler class SrsConnection : virtual public ISrsConnection, virtual public ISrsCoroutineHandler
, virtual public IKbpsDelta, virtual public ISrsReloadHandler , virtual public IKbpsDelta, virtual public ISrsReloadHandler
{ {
private: protected:
/** /**
* each connection start a green thread, * each connection start a green thread,
* when thread stop, the connection will be delete by server. * when thread stop, the connection will be delete by server.
*/ */
SrsOneCycleThread* pthread; SrsCoroutine* trd;
/**
* the id of connection.
*/
int id;
protected:
/** /**
* the manager object to manage the connection. * the manager object to manage the connection.
*/ */
@ -65,16 +60,6 @@ protected:
* the ip of client. * the ip of client.
*/ */
std::string ip; std::string ip;
/**
* whether the connection is disposed,
* when disposed, connection should stop cycle and cleanup itself.
*/
bool disposed;
/**
* whether connection is expired, application definition.
* when expired, the connection must never be served and quit ASAP.
*/
bool expired;
/** /**
* the underlayer socket. * the underlayer socket.
*/ */

View file

@ -1353,7 +1353,7 @@ int SrsHttpApi::do_cycle()
} }
// process http messages. // process http messages.
while(!disposed) { while(!trd->pull()) {
ISrsHttpMessage* req = NULL; ISrsHttpMessage* req = NULL;
// get a http message // get a http message

View file

@ -121,7 +121,7 @@ int SrsHttpConn::do_cycle()
} }
// process http messages. // process http messages.
while (!disposed) { while (!trd->pull()) {
ISrsHttpMessage* req = NULL; ISrsHttpMessage* req = NULL;
// get a http message // get a http message

View file

@ -536,7 +536,7 @@ SrsHttpRecvThread::SrsHttpRecvThread(SrsResponseOnlyHttpConn* c)
{ {
conn = c; conn = c;
error = ERROR_SUCCESS; error = ERROR_SUCCESS;
trd = new SrsOneCycleThread("http-receive", this); trd = new SrsCoroutine("http-receive", this, _srs_context->get_id());
} }
SrsHttpRecvThread::~SrsHttpRecvThread() SrsHttpRecvThread::~SrsHttpRecvThread()
@ -558,7 +558,7 @@ int SrsHttpRecvThread::cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
while (true) { while (!trd->pull()) {
ISrsHttpMessage* req = NULL; ISrsHttpMessage* req = NULL;
SrsAutoFree(ISrsHttpMessage, req); SrsAutoFree(ISrsHttpMessage, req);

View file

@ -229,11 +229,11 @@ private:
* when client closed the request, to avoid FD leak. * when client closed the request, to avoid FD leak.
* @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427 * @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427
*/ */
class SrsHttpRecvThread : public ISrsOneCycleThreadHandler class SrsHttpRecvThread : public ISrsCoroutineHandler
{ {
private: private:
SrsResponseOnlyHttpConn* conn; SrsResponseOnlyHttpConn* conn;
SrsOneCycleThread* trd; SrsCoroutine* trd;
int error; int error;
public: public:
SrsHttpRecvThread(SrsResponseOnlyHttpConn* c); SrsHttpRecvThread(SrsResponseOnlyHttpConn* c);

View file

@ -450,7 +450,7 @@ int SrsRtmpConn::service_cycle()
} }
srs_verbose("on_bw_done success"); srs_verbose("on_bw_done success");
while (!disposed) { while (!trd->pull()) {
ret = stream_service_cycle(); ret = stream_service_cycle();
// stream service must terminated with error, never success. // stream service must terminated with error, never success.
@ -692,7 +692,7 @@ int SrsRtmpConn::playing(SrsSource* source)
return ret; return ret;
} }
int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd) int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* rtrd)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -730,12 +730,12 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
srs_trace("start play smi=%.2f, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d", srs_trace("start play smi=%.2f, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d",
send_min_interval, mw_sleep, mw_enabled, realtime, tcp_nodelay); send_min_interval, mw_sleep, mw_enabled, realtime, tcp_nodelay);
while (!disposed) { while (true) {
// collect elapse for pithy print. // collect elapse for pithy print.
pprint->elapse(); pprint->elapse();
// when source is set to expired, disconnect it. // when source is set to expired, disconnect it.
if (expired) { if (trd->pull()) {
ret = ERROR_USER_DISCONNECT; ret = ERROR_USER_DISCONNECT;
srs_error("connection expired. ret=%d", ret); srs_error("connection expired. ret=%d", ret);
return ret; return ret;
@ -744,8 +744,8 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
// to use isolate thread to recv, can improve about 33% performance. // to use isolate thread to recv, can improve about 33% performance.
// @see: https://github.com/ossrs/srs/issues/196 // @see: https://github.com/ossrs/srs/issues/196
// @see: https://github.com/ossrs/srs/issues/217 // @see: https://github.com/ossrs/srs/issues/217
while (!trd->empty()) { while (!rtrd->empty()) {
SrsCommonMessage* msg = trd->pump(); SrsCommonMessage* msg = rtrd->pump();
srs_verbose("pump client message to process."); srs_verbose("pump client message to process.");
if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) { if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
@ -757,7 +757,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRe
} }
// quit when recv thread error. // quit when recv thread error.
if ((ret = trd->error_code()) != ERROR_SUCCESS) { if ((ret = rtrd->error_code()) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) { if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
srs_error("recv thread failed. ret=%d", ret); srs_error("recv thread failed. ret=%d", ret);
} }
@ -893,13 +893,13 @@ int SrsRtmpConn::publishing(SrsSource* source)
if ((ret = acquire_publish(source)) == ERROR_SUCCESS) { if ((ret = acquire_publish(source)) == ERROR_SUCCESS) {
// use isolate thread to recv, // use isolate thread to recv,
// @see: https://github.com/ossrs/srs/issues/237 // @see: https://github.com/ossrs/srs/issues/237
SrsPublishRecvThread trd(rtmp, req, st_netfd_fileno(stfd), 0, this, source); SrsPublishRecvThread rtrd(rtmp, req, st_netfd_fileno(stfd), 0, this, source);
srs_info("start to publish stream %s success", req->stream.c_str()); srs_info("start to publish stream %s success", req->stream.c_str());
ret = do_publishing(source, &trd); ret = do_publishing(source, &rtrd);
// stop isolate recv thread // stop isolate recv thread
trd.stop(); rtrd.stop();
} }
// whatever the acquire publish, always release publish. // whatever the acquire publish, always release publish.
@ -916,7 +916,7 @@ int SrsRtmpConn::publishing(SrsSource* source)
return ret; return ret;
} }
int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* rtrd)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -925,15 +925,15 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
SrsAutoFree(SrsPithyPrint, pprint); SrsAutoFree(SrsPithyPrint, pprint);
// start isolate recv thread. // start isolate recv thread.
if ((ret = trd->start()) != ERROR_SUCCESS) { if ((ret = rtrd->start()) != ERROR_SUCCESS) {
srs_error("start isolate recv thread failed. ret=%d", ret); srs_error("start isolate recv thread failed. ret=%d", ret);
return ret; return ret;
} }
// change the isolate recv thread context id, // change the isolate recv thread context id,
// merge its log to current thread. // merge its log to current thread.
int receive_thread_cid = trd->get_cid(); int receive_thread_cid = rtrd->get_cid();
trd->set_cid(_srs_context->get_id()); rtrd->set_cid(_srs_context->get_id());
// initialize the publish timeout. // initialize the publish timeout.
publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost); publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
@ -951,11 +951,11 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
int64_t nb_msgs = 0; int64_t nb_msgs = 0;
uint64_t nb_frames = 0; uint64_t nb_frames = 0;
while (!disposed) { while (true) {
pprint->elapse(); pprint->elapse();
// when source is set to expired, disconnect it. // when source is set to expired, disconnect it.
if (expired) { if (trd->pull()) {
ret = ERROR_USER_DISCONNECT; ret = ERROR_USER_DISCONNECT;
srs_error("connection expired. ret=%d", ret); srs_error("connection expired. ret=%d", ret);
return ret; return ret;
@ -965,13 +965,13 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
if (nb_msgs == 0) { if (nb_msgs == 0) {
// when not got msgs, wait for a larger timeout. // when not got msgs, wait for a larger timeout.
// @see https://github.com/ossrs/srs/issues/441 // @see https://github.com/ossrs/srs/issues/441
trd->wait(publish_1stpkt_timeout); rtrd->wait(publish_1stpkt_timeout);
} else { } else {
trd->wait(publish_normal_timeout); rtrd->wait(publish_normal_timeout);
} }
// check the thread error code. // check the thread error code.
if ((ret = trd->error_code()) != ERROR_SUCCESS) { if ((ret = rtrd->error_code()) != ERROR_SUCCESS) {
if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) { if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) {
srs_error("recv thread failed. ret=%d", ret); srs_error("recv thread failed. ret=%d", ret);
} }
@ -979,21 +979,21 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
} }
// when not got any messages, timeout. // when not got any messages, timeout.
if (trd->nb_msgs() <= nb_msgs) { if (rtrd->nb_msgs() <= nb_msgs) {
ret = ERROR_SOCKET_TIMEOUT; ret = ERROR_SOCKET_TIMEOUT;
srs_warn("publish timeout %dms, nb_msgs=%" PRId64 ", ret=%d", srs_warn("publish timeout %dms, nb_msgs=%" PRId64 ", ret=%d",
nb_msgs? publish_normal_timeout : publish_1stpkt_timeout, nb_msgs, ret); nb_msgs? publish_normal_timeout : publish_1stpkt_timeout, nb_msgs, ret);
break; break;
} }
nb_msgs = trd->nb_msgs(); nb_msgs = rtrd->nb_msgs();
// Update the stat for video fps. // Update the stat for video fps.
// @remark https://github.com/ossrs/srs/issues/851 // @remark https://github.com/ossrs/srs/issues/851
SrsStatistic* stat = SrsStatistic::instance(); SrsStatistic* stat = SrsStatistic::instance();
if ((ret = stat->on_video_frames(req, (int)(trd->nb_video_frames() - nb_frames))) != ERROR_SUCCESS) { if ((ret = stat->on_video_frames(req, (int)(rtrd->nb_video_frames() - nb_frames))) != ERROR_SUCCESS) {
return ret; return ret;
} }
nb_frames = trd->nb_video_frames(); nb_frames = rtrd->nb_video_frames();
// reportable // reportable
if (pprint->can_print()) { if (pprint->can_print()) {

View file

@ -195,7 +195,7 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o)
stfd = fd; stfd = fd;
skt = new SrsStSocket(); skt = new SrsStSocket();
rtsp = new SrsRtspStack(skt); rtsp = new SrsRtspStack(skt);
trd = new SrsOneCycleThread("rtsp", this); trd = new SrsCoroutine("rtsp", this);
req = NULL; req = NULL;
sdk = NULL; sdk = NULL;
@ -249,7 +249,7 @@ int SrsRtspConn::do_cycle()
srs_trace("rtsp: serve %s", ip.c_str()); srs_trace("rtsp: serve %s", ip.c_str());
// consume all rtsp messages. // consume all rtsp messages.
for (;;) { while (!trd->pull()) {
SrsRtspRequest* req = NULL; SrsRtspRequest* req = NULL;
if ((ret = rtsp->recv_message(&req)) != ERROR_SUCCESS) { if ((ret = rtsp->recv_message(&req)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) { if (!srs_is_client_gracefully_close(ret)) {

View file

@ -110,7 +110,7 @@ public:
/** /**
* the rtsp connection serve the fd. * the rtsp connection serve the fd.
*/ */
class SrsRtspConn : public ISrsOneCycleThreadHandler class SrsRtspConn : public ISrsCoroutineHandler
{ {
private: private:
std::string output_template; std::string output_template;
@ -133,7 +133,7 @@ private:
SrsStSocket* skt; SrsStSocket* skt;
SrsRtspStack* rtsp; SrsRtspStack* rtsp;
SrsRtspCaster* caster; SrsRtspCaster* caster;
SrsOneCycleThread* trd; SrsCoroutine* trd;
private: private:
SrsRequest* req; SrsRequest* req;
SrsSimpleRtmpClient* sdk; SrsSimpleRtmpClient* sdk;

View file

@ -78,76 +78,6 @@ void SrsCoroutineManager::clear()
} }
} }
ISrsOneCycleThreadHandler::ISrsOneCycleThreadHandler()
{
}
ISrsOneCycleThreadHandler::~ISrsOneCycleThreadHandler()
{
}
void ISrsOneCycleThreadHandler::on_thread_start()
{
}
int ISrsOneCycleThreadHandler::on_before_cycle()
{
return ERROR_SUCCESS;
}
int ISrsOneCycleThreadHandler::on_end_cycle()
{
return ERROR_SUCCESS;
}
void ISrsOneCycleThreadHandler::on_thread_stop()
{
}
SrsOneCycleThread::SrsOneCycleThread(const char* n, ISrsOneCycleThreadHandler* h)
{
handler = h;
pthread = new internal::SrsThread(n, this, 0, false);
}
SrsOneCycleThread::~SrsOneCycleThread()
{
pthread->stop();
srs_freep(pthread);
}
int SrsOneCycleThread::start()
{
return pthread->start();
}
int SrsOneCycleThread::cycle()
{
int ret = handler->cycle();
pthread->stop_loop();
return ret;
}
void SrsOneCycleThread::on_thread_start()
{
handler->on_thread_start();
}
int SrsOneCycleThread::on_before_cycle()
{
return handler->on_before_cycle();
}
int SrsOneCycleThread::on_end_cycle()
{
return handler->on_end_cycle();
}
void SrsOneCycleThread::on_thread_stop()
{
handler->on_thread_stop();
}
ISrsReusableThreadHandler::ISrsReusableThreadHandler() ISrsReusableThreadHandler::ISrsReusableThreadHandler()
{ {
} }

View file

@ -58,71 +58,6 @@ private:
void clear(); void clear();
}; };
/**
* the one cycle thread is a thread do the cycle only one time,
* that is, the thread will quit when return from the cycle.
* user can create thread which stop itself,
* generally only need to provides a start method,
* the object will destroy itself then terminate the thread, @see SrsConnection
* 1. create SrsThread field
* 2. the thread quit when return from cycle.
* for example:
* class SrsConnection : public ISrsOneCycleThreadHandler {
* public: SrsConnection() { pthread = new SrsOneCycleThread("conn", this); }
* public: virtual int start() { return pthread->start(); }
* public: virtual int cycle() {
* // serve client.
* // set loop to stop to quit, stop thread itself.
* pthread->stop_loop();
* }
* public: virtual void on_thread_stop() {
* // remove the connection in thread itself.
* server->remove(this);
* }
* };
*/
class ISrsOneCycleThreadHandler
{
public:
ISrsOneCycleThreadHandler();
virtual ~ISrsOneCycleThreadHandler();
public:
/**
* the cycle method for the one cycle thread.
*/
virtual int cycle() = 0;
public:
/**
* other callback for handler.
* @remark all callback is optional, handler can ignore it.
*/
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop();
};
class SrsOneCycleThread : public internal::ISrsThreadHandler
{
private:
internal::SrsThread* pthread;
ISrsOneCycleThreadHandler* handler;
public:
SrsOneCycleThread(const char* n, ISrsOneCycleThreadHandler* h);
virtual ~SrsOneCycleThread();
public:
/**
* for the one cycle thread, quit when cycle return.
*/
virtual int start();
// interface internal::ISrsThreadHandler
public:
virtual int cycle();
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop();
};
/** /**
* the reuse thread is a thread stop and start by other thread. * the reuse thread is a thread stop and start by other thread.
* user can create thread and stop then start again and again, * user can create thread and stop then start again and again,