1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Fix #908, use empty coroutine to avoid NULL pointer.

This commit is contained in:
winlin 2017-06-04 19:13:56 +08:00
parent 0e9e1792fe
commit 9ca36970aa
17 changed files with 115 additions and 41 deletions

View file

@ -38,7 +38,7 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask()
SrsAsyncCallWorker::SrsAsyncCallWorker()
{
trd = NULL;
trd = new SrsDummyCoroutine();
wait = srs_cond_new();
}
@ -74,7 +74,7 @@ int SrsAsyncCallWorker::count()
int SrsAsyncCallWorker::start()
{
srs_freep(trd);
trd = new SrsCoroutine("async", this, _srs_context->get_id());
trd = new SrsSTCoroutine("async", this, _srs_context->get_id());
return trd->start();
}

View file

@ -41,7 +41,7 @@ SrsConnection::SrsConnection(IConnectionManager* cm, srs_netfd_t c, string cip)
kbps = new SrsKbps();
kbps->set_io(skt, skt);
trd = new SrsCoroutine("conn", this);
trd = new SrsSTCoroutine("conn", this);
}
SrsConnection::~SrsConnection()

View file

@ -166,7 +166,7 @@ SrsEdgeIngester::SrsEdgeIngester()
upstream = new SrsEdgeRtmpUpstream(redirect);
lb = new SrsLbRoundRobin();
trd = NULL;
trd = new SrsDummyCoroutine();
}
SrsEdgeIngester::~SrsEdgeIngester()
@ -199,7 +199,7 @@ int SrsEdgeIngester::start()
}
srs_freep(trd);
trd = new SrsCoroutine("edge-igs", this);
trd = new SrsSTCoroutine("edge-igs", this);
return trd->start();
}
@ -423,7 +423,7 @@ SrsEdgeForwarder::SrsEdgeForwarder()
sdk = NULL;
lb = new SrsLbRoundRobin();
trd = NULL;
trd = new SrsDummyCoroutine();
queue = new SrsMessageQueue();
}
@ -493,7 +493,8 @@ int SrsEdgeForwarder::start()
return ret;
}
trd = new SrsCoroutine("edge-fwr", this);
srs_freep(trd);
trd = new SrsSTCoroutine("edge-fwr", this);
return trd->start();
}

View file

@ -41,7 +41,7 @@ static std::vector<std::string> _transcoded_url;
SrsEncoder::SrsEncoder()
{
trd = NULL;
trd = new SrsDummyCoroutine();
pprint = SrsPithyPrint::create_encoder();
}
@ -74,7 +74,7 @@ int SrsEncoder::on_publish(SrsRequest* req)
// start thread to run all encoding engines.
srs_freep(trd);
trd = new SrsCoroutine("encoder", this, _srs_context->get_id());
trd = new SrsSTCoroutine("encoder", this, _srs_context->get_id());
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create failed. ret=%d", ret);
return ret;

View file

@ -55,7 +55,7 @@ SrsForwarder::SrsForwarder(SrsOriginHub* h)
sh_video = sh_audio = NULL;
sdk = NULL;
trd = NULL;
trd = new SrsDummyCoroutine();
queue = new SrsMessageQueue();
jitter = new SrsRtmpJitter();
}
@ -136,7 +136,7 @@ int SrsForwarder::on_publish()
req->stream.c_str());
srs_freep(trd);
trd = new SrsCoroutine("forward", this);
trd = new SrsSTCoroutine("forward", this);
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("start srs thread failed. ret=%d", ret);
return ret;

View file

@ -60,7 +60,7 @@ SrsBufferCache::SrsBufferCache(SrsSource* s, SrsRequest* r)
req = r->copy();
source = s;
queue = new SrsMessageQueue(true);
trd = new SrsCoroutine("http-stream", this);
trd = new SrsSTCoroutine("http-stream", this);
// TODO: FIXME: support reload.
fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost);

View file

@ -105,7 +105,7 @@ SrsIngester::SrsIngester()
expired = false;
trd = NULL;
trd = new SrsDummyCoroutine();
pprint = SrsPithyPrint::create_ingester();
}
@ -141,7 +141,7 @@ int SrsIngester::start()
// start thread to run all encoding engines.
srs_freep(trd);
trd = new SrsCoroutine("ingest", this, _srs_context->get_id());
trd = new SrsSTCoroutine("ingest", this, _srs_context->get_id());
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create failed. ret=%d", ret);
return ret;

View file

@ -365,7 +365,7 @@ SrsKafkaProducer::SrsKafkaProducer()
metadata_expired = srs_cond_new();
lock = srs_mutex_new();
trd = NULL;
trd = new SrsDummyCoroutine();
worker = new SrsAsyncCallWorker();
cache = new SrsKafkaCache();
@ -410,7 +410,7 @@ int SrsKafkaProducer::start()
}
srs_freep(trd);
trd = new SrsCoroutine("kafka", this, _srs_context->get_id());
trd = new SrsSTCoroutine("kafka", this, _srs_context->get_id());
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("start kafka thread failed. ret=%d", ret);
}

View file

@ -80,7 +80,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p)
nb_buf = SRS_UDP_MAX_PACKET_SIZE;
buf = new char[nb_buf];
trd = NULL;
trd = new SrsDummyCoroutine();
}
SrsUdpListener::~SrsUdpListener()
@ -140,7 +140,7 @@ int SrsUdpListener::listen()
srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
srs_freep(trd);
trd = new SrsCoroutine("udp", this);
trd = new SrsSTCoroutine("udp", this);
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
@ -187,7 +187,7 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
_fd = -1;
_stfd = NULL;
trd = NULL;
trd = new SrsDummyCoroutine();
}
SrsTcpListener::~SrsTcpListener()
@ -242,7 +242,7 @@ int SrsTcpListener::listen()
srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
srs_freep(trd);
trd = new SrsCoroutine("tcp", this);
trd = new SrsSTCoroutine("tcp", this);
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;

View file

@ -38,7 +38,7 @@ using namespace std;
SrsNgExec::SrsNgExec()
{
trd = NULL;
trd = new SrsDummyCoroutine();
pprint = SrsPithyPrint::create_exec();
}
@ -61,7 +61,7 @@ int SrsNgExec::on_publish(SrsRequest* req)
// start thread to run all processes.
srs_freep(trd);
trd = new SrsCoroutine("encoder", this, _srs_context->get_id());
trd = new SrsSTCoroutine("encoder", this, _srs_context->get_id());
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create failed. ret=%d", ret);
return ret;

View file

@ -61,7 +61,7 @@ SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm)
rtmp = r;
pumper = p;
timeout = tm;
trd = NULL;
trd = new SrsDummyCoroutine();
}
SrsRecvThread::~SrsRecvThread()
@ -77,7 +77,7 @@ int SrsRecvThread::cid()
int SrsRecvThread::start()
{
srs_freep(trd);
trd = new SrsCoroutine("recv", this);
trd = new SrsSTCoroutine("recv", this);
return trd->start();
}
@ -535,7 +535,7 @@ SrsHttpRecvThread::SrsHttpRecvThread(SrsResponseOnlyHttpConn* c)
{
conn = c;
error = ERROR_SUCCESS;
trd = new SrsCoroutine("http-receive", this, _srs_context->get_id());
trd = new SrsSTCoroutine("http-receive", this, _srs_context->get_id());
}
SrsHttpRecvThread::~SrsHttpRecvThread()

View file

@ -195,7 +195,7 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, srs_netfd_t fd, std::string o)
stfd = fd;
skt = new SrsStSocket();
rtsp = new SrsRtspStack(skt);
trd = new SrsCoroutine("rtsp", this);
trd = new SrsSTCoroutine("rtsp", this);
req = NULL;
sdk = NULL;

View file

@ -362,7 +362,7 @@ SrsSignalManager::SrsSignalManager(SrsServer* s)
server = s;
sig_pipe[0] = sig_pipe[1] = -1;
trd = new SrsCoroutine("signal", this);
trd = new SrsSTCoroutine("signal", this);
signal_read_stfd = NULL;
}

View file

@ -40,7 +40,46 @@ ISrsCoroutineHandler::~ISrsCoroutineHandler()
{
}
SrsCoroutine::SrsCoroutine(const string& n, ISrsCoroutineHandler* h, int cid)
SrsCoroutine::SrsCoroutine()
{
}
SrsCoroutine::~SrsCoroutine()
{
}
SrsDummyCoroutine::SrsDummyCoroutine()
{
}
SrsDummyCoroutine::~SrsDummyCoroutine()
{
}
int SrsDummyCoroutine::start()
{
return ERROR_THREAD_DUMMY;
}
void SrsDummyCoroutine::stop()
{
}
void SrsDummyCoroutine::interrupt()
{
}
int SrsDummyCoroutine::pull()
{
return ERROR_THREAD_DUMMY;
}
int SrsDummyCoroutine::cid()
{
return 0;
}
SrsSTCoroutine::SrsSTCoroutine(const string& n, ISrsCoroutineHandler* h, int cid)
{
name = n;
handler = h;
@ -50,12 +89,12 @@ SrsCoroutine::SrsCoroutine(const string& n, ISrsCoroutineHandler* h, int cid)
started = interrupted = disposed = false;
}
SrsCoroutine::~SrsCoroutine()
SrsSTCoroutine::~SrsSTCoroutine()
{
stop();
}
int SrsCoroutine::start()
int SrsSTCoroutine::start()
{
int ret = ERROR_SUCCESS;
@ -77,7 +116,7 @@ int SrsCoroutine::start()
return ret;
}
void SrsCoroutine::stop()
void SrsSTCoroutine::stop()
{
if (!started || disposed) {
return;
@ -101,7 +140,7 @@ void SrsCoroutine::stop()
return;
}
void SrsCoroutine::interrupt()
void SrsSTCoroutine::interrupt()
{
if (!started || interrupted) {
return;
@ -113,17 +152,17 @@ void SrsCoroutine::interrupt()
st_thread_interrupt((st_thread_t)trd);
}
int SrsCoroutine::pull()
int SrsSTCoroutine::pull()
{
return err;
}
int SrsCoroutine::cid()
int SrsSTCoroutine::cid()
{
return context;
}
int SrsCoroutine::cycle()
int SrsSTCoroutine::cycle()
{
if (_srs_context) {
if (context) {
@ -139,9 +178,9 @@ int SrsCoroutine::cycle()
return ret;
}
void* SrsCoroutine::pfn(void* arg)
void* SrsSTCoroutine::pfn(void* arg)
{
SrsCoroutine* p = (SrsCoroutine*)arg;
SrsSTCoroutine* p = (SrsSTCoroutine*)arg;
void*res = (void*)(uint64_t)p->cycle();
return res;
}

View file

@ -67,6 +67,39 @@ public:
virtual int cycle() = 0;
};
/**
* The corotine object.
*/
class SrsCoroutine
{
public:
SrsCoroutine();
virtual ~SrsCoroutine();
public:
virtual int start() = 0;
virtual void stop() = 0;
virtual void interrupt() = 0;
virtual int pull() = 0;
virtual int cid() = 0;
};
/**
* An empty coroutine, user can default to this object before create any real coroutine.
* @see https://github.com/ossrs/srs/pull/908
*/
class SrsDummyCoroutine : public SrsCoroutine
{
public:
SrsDummyCoroutine();
virtual ~SrsDummyCoroutine();
public:
virtual int start();
virtual void stop();
virtual void interrupt();
virtual int pull();
virtual int cid();
};
/**
* A ST-coroutine is a lightweight thread, just like the goroutine.
* But the goroutine maybe run on different thread, while ST-coroutine only
@ -81,7 +114,7 @@ public:
* @remark We always create joinable thread, so we must join it or memory leak,
* Please read https://github.com/ossrs/srs/issues/78
*/
class SrsCoroutine
class SrsSTCoroutine : public SrsCoroutine
{
private:
std::string name;
@ -97,8 +130,8 @@ private:
public:
// Create a thread with name n and handler h.
// @remark User can specify a cid for thread to use, or we will allocate a new one.
SrsCoroutine(const std::string& n, ISrsCoroutineHandler* h, int cid = 0);
virtual ~SrsCoroutine();
SrsSTCoroutine(const std::string& n, ISrsCoroutineHandler* h, int cid = 0);
virtual ~SrsSTCoroutine();
public:
/**
* Start the thread.

View file

@ -32,7 +32,7 @@ using namespace std;
SrsCoroutineManager::SrsCoroutineManager()
{
cond = srs_cond_new();
trd = new SrsCoroutine("manager", this);
trd = new SrsSTCoroutine("manager", this);
}
SrsCoroutineManager::~SrsCoroutineManager()

View file

@ -106,6 +106,7 @@
#define ERROR_THREAD_DISPOSED 1069
#define ERROR_THREAD_INTERRUPED 1070
#define ERROR_THREAD_TERMINATED 1071
#define ERROR_THREAD_DUMMY 1072
///////////////////////////////////////////////////////
// RTMP protocol error.