1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Cleanup server for GMC, by WaitGroup to destroy. (#2247). v4.0.206

This commit is contained in:
winlin 2021-11-17 08:23:32 +08:00
parent 73c1392748
commit 63da0dca92
18 changed files with 114 additions and 42 deletions

View file

@ -136,7 +136,7 @@ srs_error_t SrsServerAdapter::initialize()
return err;
}
srs_error_t SrsServerAdapter::run()
srs_error_t SrsServerAdapter::run(SrsWaitGroup* wg)
{
srs_error_t err = srs_success;
@ -173,7 +173,7 @@ srs_error_t SrsServerAdapter::run()
return srs_error_wrap(err, "ingest");
}
if ((err = srs->start()) != srs_success) {
if ((err = srs->start(wg)) != srs_success) {
return srs_error_wrap(err, "start");
}
@ -182,6 +182,7 @@ srs_error_t SrsServerAdapter::run()
void SrsServerAdapter::stop()
{
srs->stop();
}
SrsServer* SrsServerAdapter::instance()
@ -264,17 +265,20 @@ srs_error_t SrsHybridServer::run()
{
srs_error_t err = srs_success;
// Wait for all servers which need to do cleanup.
SrsWaitGroup wg;
vector<ISrsHybridServer*>::iterator it;
for (it = servers.begin(); it != servers.end(); ++it) {
ISrsHybridServer* server = *it;
if ((err = server->run()) != srs_success) {
if ((err = server->run(&wg)) != srs_success) {
return srs_error_wrap(err, "run server");
}
}
// Wait for all server to quit.
srs_usleep(SRS_UTIME_NO_TIMEOUT);
wg.wait();
return err;
}

View file

@ -14,6 +14,7 @@
#include <srs_app_hourglass.hpp>
class SrsServer;
class SrsWaitGroup;
// The hibrid server interfaces, we could register many servers.
class ISrsHybridServer
@ -25,7 +26,7 @@ public:
// Only ST initialized before each server, we could fork processes as such.
virtual srs_error_t initialize() = 0;
// Run each server, should never block except the SRS master server.
virtual srs_error_t run() = 0;
virtual srs_error_t run(SrsWaitGroup* wg) = 0;
// Stop each server, should do cleanup, for example, kill processes forked by server.
virtual void stop() = 0;
};
@ -40,7 +41,7 @@ public:
virtual ~SrsServerAdapter();
public:
virtual srs_error_t initialize();
virtual srs_error_t run();
virtual srs_error_t run(SrsWaitGroup* wg);
virtual void stop();
public:
virtual SrsServer* instance();

View file

@ -295,11 +295,11 @@ void SrsRtcServer::set_handler(ISrsRtcServerHandler* h)
void SrsRtcServer::set_hijacker(ISrsRtcServerHijacker* h)
{
hijacker = h;
}
srs_error_t SrsRtcServer::exec_async_work(ISrsAsyncCallTask * t)
{
return async->execute(t);
}
srs_error_t SrsRtcServer::exec_async_work(ISrsAsyncCallTask * t)
{
return async->execute(t);
}
srs_error_t SrsRtcServer::listen_udp()
@ -698,7 +698,7 @@ srs_error_t RtcServerAdapter::initialize()
return err;
}
srs_error_t RtcServerAdapter::run()
srs_error_t RtcServerAdapter::run(SrsWaitGroup* wg)
{
srs_error_t err = srs_success;

View file

@ -26,6 +26,7 @@ class SrsRequest;
class SrsSdp;
class SrsRtcSource;
class SrsResourceManager;
class SrsWaitGroup;
// The UDP black hole, for developer to use wireshark to catch plaintext packets.
// For example, server receive UDP packets at udp://8000, and forward the plaintext packet to black hole,
@ -137,7 +138,7 @@ public:
virtual ~RtcServerAdapter();
public:
virtual srs_error_t initialize();
virtual srs_error_t run();
virtual srs_error_t run(SrsWaitGroup* wg);
virtual void stop();
};

View file

@ -242,6 +242,8 @@ SrsSignalManager::SrsSignalManager(SrsServer* s)
SrsSignalManager::~SrsSignalManager()
{
srs_freep(trd);
srs_close_stfd(signal_read_stfd);
if (sig_pipe[0] > 0) {
@ -250,8 +252,6 @@ SrsSignalManager::~SrsSignalManager()
if (sig_pipe[1] > 0) {
::close(sig_pipe[1]);
}
srs_freep(trd);
}
srs_error_t SrsSignalManager::initialize()
@ -528,6 +528,7 @@ SrsServer::SrsServer()
ingester = new SrsIngester();
trd_ = new SrsSTCoroutine("srs", this, _srs_context->get_id());
timer_ = NULL;
wg_ = NULL;
}
SrsServer::~SrsServer()
@ -890,7 +891,7 @@ srs_error_t SrsServer::ingest()
return err;
}
srs_error_t SrsServer::start()
srs_error_t SrsServer::start(SrsWaitGroup* wg)
{
srs_error_t err = srs_success;
@ -906,31 +907,25 @@ srs_error_t SrsServer::start()
return srs_error_wrap(err, "tick");
}
// OK, we start SRS server.
wg_ = wg;
wg->add(1);
return err;
}
srs_error_t SrsServer::cycle()
void SrsServer::stop()
{
srs_error_t err = srs_success;
// Start the inotify auto reload by watching config file.
SrsInotifyWorker inotify(this);
if ((err = inotify.start()) != srs_success) {
return srs_error_wrap(err, "start inotify");
}
// Do server main cycle.
err = do_cycle();
#ifdef SRS_GPERF_MC
destroy();
// remark, for gmc, never invoke the exit().
srs_warn("sleep a long time for system st-threads to cleanup.");
srs_usleep(3 * 1000 * 1000);
srs_warn("system quit");
return err;
// For GCM, cleanup done.
return;
#endif
// quit normally.
@ -949,12 +944,27 @@ srs_error_t SrsServer::cycle()
}
srs_trace("srs terminated");
// for valgrind to detect.
srs_freep(_srs_config);
srs_freep(_srs_log);
}
exit(0);
srs_error_t SrsServer::cycle()
{
srs_error_t err = srs_success;
// Start the inotify auto reload by watching config file.
SrsInotifyWorker inotify(this);
if ((err = inotify.start()) != srs_success) {
return srs_error_wrap(err, "start inotify");
}
// Do server main cycle.
err = do_cycle();
// OK, SRS server is done.
wg_->done();
return err;
}

View file

@ -35,6 +35,7 @@ class SrsTcpListener;
class SrsAppCasterFlv;
class SrsResourceManager;
class SrsLatestVersion;
class SrsWaitGroup;
// The listener type for server to identify the connection,
// that is, use different type to process the connection.
@ -204,6 +205,7 @@ private:
SrsResourceManager* conn_manager;
SrsCoroutine* trd_;
SrsHourGlass* timer_;
SrsWaitGroup* wg_;
private:
// The pid file fd, lock the file write when server is running.
// @remark the init.d script should cleanup the pid file, when stop service,
@ -252,7 +254,9 @@ public:
virtual srs_error_t register_signal();
virtual srs_error_t http_handle();
virtual srs_error_t ingest();
virtual srs_error_t start();
public:
virtual srs_error_t start(SrsWaitGroup* wg);
void stop();
// interface ISrsCoroutineHandler
public:
virtual srs_error_t cycle();

View file

@ -297,3 +297,35 @@ void* SrsFastCoroutine::pfn(void* arg)
return (void*)err;
}
SrsWaitGroup::SrsWaitGroup()
{
nn_ = 0;
done_ = srs_cond_new();
}
SrsWaitGroup::~SrsWaitGroup()
{
wait();
srs_cond_destroy(done_);
}
void SrsWaitGroup::add(int n)
{
nn_ += n;
}
void SrsWaitGroup::done()
{
nn_--;
if (nn_ <= 0) {
srs_cond_signal(done_);
}
}
void SrsWaitGroup::wait()
{
if (nn_ > 0) {
srs_cond_wait(done_);
}
}

View file

@ -189,5 +189,23 @@ private:
static void* pfn(void* arg);
};
// Like goroytine sync.WaitGroup.
class SrsWaitGroup
{
private:
int nn_;
srs_cond_t done_;
public:
SrsWaitGroup();
virtual ~SrsWaitGroup();
public:
// When start for n coroutines.
void add(int n);
// When coroutine is done.
void done();
// Wait for all corotine to be done.
void wait();
};
#endif

View file

@ -9,6 +9,6 @@
#define VERSION_MAJOR 4
#define VERSION_MINOR 0
#define VERSION_REVISION 211
#define VERSION_REVISION 212
#endif

View file

@ -323,7 +323,7 @@ srs_error_t SrtServerAdapter::initialize()
return err;
}
srs_error_t SrtServerAdapter::run()
srs_error_t SrtServerAdapter::run(SrsWaitGroup* wg)
{
srs_error_t err = srs_success;

View file

@ -17,6 +17,7 @@
#include <srs_app_hybrid.hpp>
class srt_handle;
class SrsWaitGroup;
class srt_server {
public:
@ -59,7 +60,7 @@ public:
virtual ~SrtServerAdapter();
public:
virtual srs_error_t initialize();
virtual srs_error_t run();
virtual srs_error_t run(SrsWaitGroup* wg);
virtual void stop();
};