From d0dff2d3326a748058f9a36a231bd9eda9a17e29 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 30 Apr 2014 11:26:32 +0800 Subject: [PATCH] fix bug #34: convert signal to io. 0.9.85 --- trunk/src/app/srs_app_server.cpp | 120 ++++++++++++++++++++++++++ trunk/src/app/srs_app_server.hpp | 39 ++++++++- trunk/src/core/srs_core.hpp | 2 +- trunk/src/kernel/srs_kernel_error.hpp | 1 + trunk/src/main/srs_main_server.cpp | 19 ++-- 5 files changed, 166 insertions(+), 15 deletions(-) diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index cc080e7f0..70669ebde 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -70,6 +70,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // SRS_SYS_CYCLE_INTERVAL * SRS_SYS_MEMINFO_RESOLUTION_TIMES #define SRS_SYS_MEMINFO_RESOLUTION_TIMES 60 +#define SRS_SIGNAL_THREAD_INTERVAL (int64_t)(100*1000LL) + SrsListener::SrsListener(SrsServer* server, SrsListenerType type) { fd = -1; @@ -184,12 +186,117 @@ int SrsListener::cycle() return ret; } +SrsSignalManager* SrsSignalManager::instance = NULL; + +SrsSignalManager::SrsSignalManager(SrsServer* server) +{ + SrsSignalManager::instance = this; + + _server = server; + sig_pipe[0] = sig_pipe[1] = -1; + pthread = new SrsThread(this, SRS_SIGNAL_THREAD_INTERVAL); + signal_read_stfd = NULL; +} + +SrsSignalManager::~SrsSignalManager() +{ + pthread->stop(); + srs_freep(pthread); + + srs_close_stfd(signal_read_stfd); + + if (sig_pipe[0] > 0) { + ::close(sig_pipe[0]); + } + if (sig_pipe[1] > 0) { + ::close(sig_pipe[1]); + } +} + +int SrsSignalManager::initialize() +{ + int ret = ERROR_SUCCESS; + return ret; +} + +int SrsSignalManager::start() +{ + int ret = ERROR_SUCCESS; + + /** + * Note that if multiple processes are used (see below), + * the signal pipe should be initialized after the fork(2) call + * so that each process has its own private pipe. + */ + struct sigaction sa; + + /* Create signal pipe */ + if (pipe(sig_pipe) < 0) { + ret = ERROR_SYSTEM_CREATE_PIPE; + srs_error("create signal manager pipe failed. ret=%d", ret); + return ret; + } + + /* Install sig_catcher() as a signal handler */ + sa.sa_handler = SrsSignalManager::sig_catcher; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGNAL_RELOAD, &sa, NULL); + + sa.sa_handler = SrsSignalManager::sig_catcher; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGTERM, &sa, NULL); + + sa.sa_handler = SrsSignalManager::sig_catcher; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGINT, &sa, NULL); + + return pthread->start(); +} + +int SrsSignalManager::cycle() +{ + int ret = ERROR_SUCCESS; + + if (signal_read_stfd == NULL) { + signal_read_stfd = st_netfd_open(sig_pipe[0]); + } + + int signo; + + /* Read the next signal from the pipe */ + st_read(signal_read_stfd, &signo, sizeof(int), ST_UTIME_NO_TIMEOUT); + + /* Process signal synchronously */ + _server->on_signal(signo); + + return ret; +} + +void SrsSignalManager::sig_catcher(int signo) +{ + int err; + + /* Save errno to restore it after the write() */ + err = errno; + + /* write() is reentrant/async-safe */ + int fd = SrsSignalManager::instance->sig_pipe[1]; + write(fd, &signo, sizeof(int)); + + errno = err; +} + SrsServer::SrsServer() { signal_reload = false; signal_gmc_stop = false; pid_fd = -1; + signal_manager = new SrsSignalManager(this); + // donot new object in constructor, // for some global instance is not ready now, // new these objects in initialize instead. @@ -226,6 +333,8 @@ SrsServer::~SrsServer() pid_fd = -1; } + srs_freep(signal_manager); + #ifdef SRS_AUTO_HTTP_API srs_freep(http_api_handler); #endif @@ -276,6 +385,11 @@ int SrsServer::initialize() return ret; } +int SrsServer::initialize_signal() +{ + return signal_manager->initialize(); +} + int SrsServer::acquire_pid_file() { int ret = ERROR_SUCCESS; @@ -397,6 +511,12 @@ int SrsServer::listen() return ret; } +int SrsServer::register_signal() +{ + // start signal process thread. + return signal_manager->start(); +} + int SrsServer::ingest() { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index d0fcdb279..0ac856e18 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -64,7 +64,7 @@ private: SrsServer* _server; SrsThread* pthread; public: - SrsListener(SrsServer* _server, SrsListenerType type); + SrsListener(SrsServer* server, SrsListenerType type); virtual ~SrsListener(); public: virtual SrsListenerType type(); @@ -75,9 +75,39 @@ public: virtual int cycle(); }; +/** +* convert signal to io, +* @see: st-1.9/docs/notes.html +*/ +class SrsSignalManager : public ISrsThreadHandler +{ +private: + /* Per-process pipe which is used as a signal queue. */ + /* Up to PIPE_BUF/sizeof(int) signals can be queued up. */ + int sig_pipe[2]; + st_netfd_t signal_read_stfd; +private: + SrsServer* _server; + SrsThread* pthread; +public: + SrsSignalManager(SrsServer* server); + virtual ~SrsSignalManager(); +public: + virtual int initialize(); + virtual int start(); +// interface ISrsThreadHandler. +public: + virtual int cycle(); +private: + // global singleton instance + static SrsSignalManager* instance; + /* Signal catching function. */ + /* Converts signal event to I/O event. */ + static void sig_catcher(int signo); +}; + class SrsServer : public ISrsReloadHandler { - friend class SrsListener; private: #ifdef SRS_AUTO_HTTP_API SrsHttpHandler* http_api_handler; @@ -92,6 +122,7 @@ private: int pid_fd; std::vector conns; std::vector listeners; + SrsSignalManager* signal_manager; bool signal_reload; bool signal_gmc_stop; public: @@ -99,9 +130,11 @@ public: virtual ~SrsServer(); public: virtual int initialize(); + virtual int initialize_signal(); virtual int acquire_pid_file(); virtual int initialize_st(); virtual int listen(); + virtual int register_signal(); virtual int ingest(); virtual int cycle(); virtual void remove(SrsConnection* conn); @@ -111,6 +144,8 @@ private: virtual int listen_http_api(); virtual int listen_http_stream(); virtual void close_listeners(SrsListenerType type); +// internal only +public: virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd); // interface ISrsThreadHandler. public: diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 6f385f4e3..5b9334613 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "84" +#define VERSION_REVISION "85" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "srs" diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 7d2c9a6a3..fd8c5dbe9 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -114,6 +114,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_SYSTEM_FILE_WRITE 427 #define ERROR_SYSTEM_FILE_EOF 428 #define ERROR_SYSTEM_FILE_RENAME 429 +#define ERROR_SYSTEM_CREATE_PIPE 430 // see librtmp. // failed when open ssl create the dh diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 0f3a30400..ac7dfc5ff 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -24,8 +24,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -#include - #include #include @@ -56,13 +54,6 @@ ISrsThreadContext* _srs_context = new SrsThreadContext(); SrsConfig* _srs_config = new SrsConfig(); SrsServer* _srs_server = new SrsServer(); -// signal handler -void handler(int signo) -{ - srs_trace("get a signal, signo=%d", signo); - _srs_server->on_signal(signo); -} - // main entrance. int main(int argc, char** argv) { @@ -166,9 +157,9 @@ int run_master() { int ret = ERROR_SUCCESS; - signal(SIGNAL_RELOAD, handler); - signal(SIGTERM, handler); - signal(SIGINT, handler); + if ((ret = _srs_server->initialize_signal()) != ERROR_SUCCESS) { + return ret; + } if ((ret = _srs_server->acquire_pid_file()) != ERROR_SUCCESS) { return ret; @@ -182,6 +173,10 @@ int run_master() return ret; } + if ((ret = _srs_server->register_signal()) != ERROR_SUCCESS) { + return ret; + } + if ((ret = _srs_server->ingest()) != ERROR_SUCCESS) { return ret; }