From 0e1861b0848c87632390198944954906b0e5e930 Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 24 Aug 2015 21:51:05 +0800 Subject: [PATCH] for #367: extract the process from ffmpeg to exec programs. --- trunk/configure | 2 +- trunk/ide/srs_upp/srs_upp.upp | 2 + .../srs_xcode.xcodeproj/project.pbxproj | 6 + trunk/src/app/srs_app_ffmpeg.cpp | 211 ++++------------- trunk/src/app/srs_app_ffmpeg.hpp | 27 +-- trunk/src/app/srs_app_process.cpp | 216 ++++++++++++++++++ trunk/src/app/srs_app_process.hpp | 99 ++++++++ trunk/src/libs/srs_librtmp.hpp | 10 +- 8 files changed, 377 insertions(+), 196 deletions(-) create mode 100644 trunk/src/app/srs_app_process.cpp create mode 100644 trunk/src/app/srs_app_process.hpp diff --git a/trunk/configure b/trunk/configure index f479e1272..5ef3bd33d 100755 --- a/trunk/configure +++ b/trunk/configure @@ -176,7 +176,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static" "srs_app_recv_thread" "srs_app_security" "srs_app_statistic" "srs_app_hds" "srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call" - "srs_app_caster_flv") + "srs_app_caster_flv" "srs_app_process") DEFINES="" # add each modules for app for SRS_MODULE in ${SRS_MODULES[*]}; do diff --git a/trunk/ide/srs_upp/srs_upp.upp b/trunk/ide/srs_upp/srs_upp.upp index f3a43e124..116621e69 100755 --- a/trunk/ide/srs_upp/srs_upp.upp +++ b/trunk/ide/srs_upp/srs_upp.upp @@ -117,6 +117,8 @@ file ../../src/app/srs_app_log.cpp, ../../src/app/srs_app_mpegts_udp.hpp, ../../src/app/srs_app_mpegts_udp.cpp, + ../../src/app/srs_app_process.hpp, + ../../src/app/srs_app_process.cpp, ../../src/app/srs_app_recv_thread.hpp, ../../src/app/srs_app_recv_thread.cpp, ../../src/app/srs_app_refer.hpp, diff --git a/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj b/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj index a9b9fd372..0e305b70d 100644 --- a/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj +++ b/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj @@ -79,6 +79,7 @@ 3C36DB5B1ABD1CB90066CCAF /* srs_lib_bandwidth.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C36DB551ABD1CB90066CCAF /* srs_lib_bandwidth.cpp */; }; 3C36DB5C1ABD1CB90066CCAF /* srs_lib_simple_socket.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C36DB571ABD1CB90066CCAF /* srs_lib_simple_socket.cpp */; }; 3C36DB5D1ABD1CB90066CCAF /* srs_librtmp.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C36DB591ABD1CB90066CCAF /* srs_librtmp.cpp */; }; + 3C4F97121B8B466D00FF0E46 /* srs_app_process.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C4F97101B8B466D00FF0E46 /* srs_app_process.cpp */; }; 3C5265B41B241BF0009CA186 /* srs_core_mem_watch.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C5265B21B241BF0009CA186 /* srs_core_mem_watch.cpp */; }; 3C663F0F1AB0155100286D8B /* srs_aac_raw_publish.c in Sources */ = {isa = PBXBuildFile; fileRef = 3C663F021AB0155100286D8B /* srs_aac_raw_publish.c */; }; 3C663F101AB0155100286D8B /* srs_audio_raw_publish.c in Sources */ = {isa = PBXBuildFile; fileRef = 3C663F031AB0155100286D8B /* srs_audio_raw_publish.c */; }; @@ -328,6 +329,8 @@ 3C36DB581ABD1CB90066CCAF /* srs_lib_simple_socket.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_lib_simple_socket.hpp; path = ../../../src/libs/srs_lib_simple_socket.hpp; sourceTree = ""; }; 3C36DB591ABD1CB90066CCAF /* srs_librtmp.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_librtmp.cpp; path = ../../../src/libs/srs_librtmp.cpp; sourceTree = ""; }; 3C36DB5A1ABD1CB90066CCAF /* srs_librtmp.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_librtmp.hpp; path = ../../../src/libs/srs_librtmp.hpp; sourceTree = ""; }; + 3C4F97101B8B466D00FF0E46 /* srs_app_process.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_app_process.cpp; path = ../../../src/app/srs_app_process.cpp; sourceTree = ""; }; + 3C4F97111B8B466D00FF0E46 /* srs_app_process.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_app_process.hpp; path = ../../../src/app/srs_app_process.hpp; sourceTree = ""; }; 3C5265B21B241BF0009CA186 /* srs_core_mem_watch.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_core_mem_watch.cpp; path = ../../../src/core/srs_core_mem_watch.cpp; sourceTree = ""; }; 3C5265B31B241BF0009CA186 /* srs_core_mem_watch.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_core_mem_watch.hpp; path = ../../../src/core/srs_core_mem_watch.hpp; sourceTree = ""; }; 3C663F021AB0155100286D8B /* srs_aac_raw_publish.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; name = srs_aac_raw_publish.c; path = ../../../research/librtmp/srs_aac_raw_publish.c; sourceTree = ""; }; @@ -573,6 +576,8 @@ 3C1232771AAE81D900CE8F6C /* srs_app_mpegts_udp.hpp */, 3C1232781AAE81D900CE8F6C /* srs_app_pithy_print.cpp */, 3C1232791AAE81D900CE8F6C /* srs_app_pithy_print.hpp */, + 3C4F97101B8B466D00FF0E46 /* srs_app_process.cpp */, + 3C4F97111B8B466D00FF0E46 /* srs_app_process.hpp */, 3C12327A1AAE81D900CE8F6C /* srs_app_recv_thread.cpp */, 3C12327B1AAE81D900CE8F6C /* srs_app_recv_thread.hpp */, 3C12327C1AAE81D900CE8F6C /* srs_app_refer.cpp */, @@ -911,6 +916,7 @@ 3C1232B21AAE81D900CE8F6C /* srs_app_source.cpp in Sources */, 3C1231F71AAE652D00CE8F6C /* srs_core_performance.cpp in Sources */, 3CC52DD81ACE4023006FEB01 /* srs_utest_amf0.cpp in Sources */, + 3C4F97121B8B466D00FF0E46 /* srs_app_process.cpp in Sources */, 3C1232981AAE81D900CE8F6C /* srs_app_edge.cpp in Sources */, 3CC52DDB1ACE4023006FEB01 /* srs_utest_kernel.cpp in Sources */, 3C689F9E1AB6AAC800C9CEEE /* md.S in Sources */, diff --git a/trunk/src/app/srs_app_ffmpeg.cpp b/trunk/src/app/srs_app_ffmpeg.cpp index 85b19518c..eacdd04f6 100644 --- a/trunk/src/app/srs_app_ffmpeg.cpp +++ b/trunk/src/app/srs_app_ffmpeg.cpp @@ -30,12 +30,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include using namespace std; #include #include #include #include +#include +#include #ifdef SRS_AUTO_FFMPEG_STUB @@ -52,9 +55,6 @@ using namespace std; SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin) { - started = false; - fast_stopped = false; - pid = -1; ffmpeg = ffmpeg_bin; vbitrate = 0; @@ -65,11 +65,15 @@ SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin) abitrate = 0; asample_rate = 0; achannels = 0; + + process = new SrsProcess(); } SrsFFMPEG::~SrsFFMPEG() { stop(); + + srs_freep(process); } void SrsFFMPEG::set_iparams(string iparams) @@ -236,17 +240,22 @@ int SrsFFMPEG::start() { int ret = ERROR_SUCCESS; - if (started) { + if (process->started()) { return ret; } // prepare exec params - char tmp[256]; - std::vector params; + // @remark we should never use stack variable, use heap to alloc to make lldb happy. + #define SRS_TMP_SIZE 512 + char* tmp = new char[SRS_TMP_SIZE]; + SrsAutoFree(char, tmp); + + // the argv for process. + params.clear(); // argv[0], set to ffmpeg bin. // The execv() and execvp() functions .... - // The first argument, by convention, should point to + // The first argument, by convention, should point to // the filename associated with the file being executed. params.push_back(ffmpeg); @@ -287,32 +296,32 @@ int SrsFFMPEG::start() if (vcodec != SRS_RTMP_ENCODER_COPY && vcodec != SRS_RTMP_ENCODER_NO_VIDEO) { if (vbitrate > 0) { params.push_back("-b:v"); - snprintf(tmp, sizeof(tmp), "%d", vbitrate * 1000); + snprintf(tmp, SRS_TMP_SIZE, "%d", vbitrate * 1000); params.push_back(tmp); } if (vfps > 0) { params.push_back("-r"); - snprintf(tmp, sizeof(tmp), "%.2f", vfps); + snprintf(tmp, SRS_TMP_SIZE, "%.2f", vfps); params.push_back(tmp); } if (vwidth > 0 && vheight > 0) { params.push_back("-s"); - snprintf(tmp, sizeof(tmp), "%dx%d", vwidth, vheight); + snprintf(tmp, SRS_TMP_SIZE, "%dx%d", vwidth, vheight); params.push_back(tmp); } // TODO: add aspect if needed. if (vwidth > 0 && vheight > 0) { params.push_back("-aspect"); - snprintf(tmp, sizeof(tmp), "%d:%d", vwidth, vheight); + snprintf(tmp, SRS_TMP_SIZE, "%d:%d", vwidth, vheight); params.push_back(tmp); } if (vthreads > 0) { params.push_back("-threads"); - snprintf(tmp, sizeof(tmp), "%d", vthreads); + snprintf(tmp, SRS_TMP_SIZE, "%d", vthreads); params.push_back(tmp); } @@ -347,19 +356,19 @@ int SrsFFMPEG::start() if (acodec != SRS_RTMP_ENCODER_COPY) { if (abitrate > 0) { params.push_back("-b:a"); - snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000); + snprintf(tmp, SRS_TMP_SIZE, "%d", abitrate * 1000); params.push_back(tmp); } if (asample_rate > 0) { params.push_back("-ar"); - snprintf(tmp, sizeof(tmp), "%d", asample_rate); + snprintf(tmp, SRS_TMP_SIZE, "%d", asample_rate); params.push_back(tmp); } if (achannels > 0) { params.push_back("-ac"); - snprintf(tmp, sizeof(tmp), "%d", achannels); + snprintf(tmp, SRS_TMP_SIZE, "%d", achannels); params.push_back(tmp); } @@ -387,7 +396,7 @@ int SrsFFMPEG::start() } } } - + // output if (oformat != "off" && !oformat.empty()) { params.push_back("-f"); @@ -396,176 +405,40 @@ int SrsFFMPEG::start() params.push_back("-y"); params.push_back(_output); - - std::string cli; - if (true) { - for (int i = 0; i < (int)params.size(); i++) { - std::string ffp = params[i]; - cli += ffp; - if (i < (int)params.size() - 1) { - cli += " "; - } - } - srs_trace("start ffmpeg, log: %s, params: %s", log_file.c_str(), cli.c_str()); + + // when specified the log file. + if (false && !log_file.empty()) { + // stdout + params.push_back("1"); + params.push_back(">"); + params.push_back(log_file); + // stderr + params.push_back("2"); + params.push_back(">"); + params.push_back(log_file); } - // for log - int cid = _srs_context->get_id(); - - // TODO: fork or vfork? - if ((pid = fork()) < 0) { - ret = ERROR_ENCODER_FORK; - srs_error("vfork process failed. ret=%d", ret); + // initialize the process. + if ((ret = process->initialize(ffmpeg, params)) != ERROR_SUCCESS) { return ret; } - // child process: ffmpeg encoder engine. - if (pid == 0) { - // ignore the SIGINT and SIGTERM - signal(SIGINT, SIG_IGN); - signal(SIGTERM, SIG_IGN); - - // redirect logs to file. - int log_fd = -1; - int flags = O_CREAT|O_WRONLY|O_APPEND; - mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH; - if ((log_fd = ::open(log_file.c_str(), flags, mode)) < 0) { - ret = ERROR_ENCODER_OPEN; - srs_error("open encoder file %s failed. ret=%d", log_file.c_str(), ret); - return ret; - } - - // log basic info - if (true) { - char buf[4096]; - int pos = 0; - pos += snprintf(buf + pos, sizeof(buf) - pos, "\n"); - pos += snprintf(buf + pos, sizeof(buf) - pos, "ffmpeg cid=%d\n", cid); - pos += snprintf(buf + pos, sizeof(buf) - pos, "log=%s\n", log_file.c_str()); - pos += snprintf(buf + pos, sizeof(buf) - pos, "params: %s\n", cli.c_str()); - ::write(log_fd, buf, pos); - } - - // dup to stdout and stderr. - if (dup2(log_fd, STDOUT_FILENO) < 0) { - ret = ERROR_ENCODER_DUP2; - srs_error("dup2 encoder file failed. ret=%d", ret); - return ret; - } - if (dup2(log_fd, STDERR_FILENO) < 0) { - ret = ERROR_ENCODER_DUP2; - srs_error("dup2 encoder file failed. ret=%d", ret); - return ret; - } - - // close log fd - ::close(log_fd); - // close other fds - // TODO: do in right way. - for (int i = 3; i < 1024; i++) { - ::close(i); - } - - // memory leak in child process, it's ok. - char** charpv_params = new char*[params.size() + 1]; - for (int i = 0; i < (int)params.size(); i++) { - std::string& p = params[i]; - charpv_params[i] = (char*)p.data(); - } - // EOF: NULL - charpv_params[params.size()] = NULL; - - // TODO: execv or execvp - ret = execv(ffmpeg.c_str(), charpv_params); - if (ret < 0) { - fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)", - errno, strerror(errno)); - } - exit(ret); - } - - // parent. - if (pid > 0) { - started = true; - srs_trace("vfored ffmpeg encoder engine, pid=%d", pid); - return ret; - } - - return ret; + return process->start(); } int SrsFFMPEG::cycle() { - int ret = ERROR_SUCCESS; - - if (!started) { - return ret; - } - - // ffmpeg is prepare to stop, donot cycle. - if (fast_stopped) { - return ret; - } - - int status = 0; - pid_t p = waitpid(pid, &status, WNOHANG); - - if (p < 0) { - ret = ERROR_SYSTEM_WAITPID; - srs_error("transcode waitpid failed, pid=%d, ret=%d", pid, ret); - return ret; - } - - if (p == 0) { - srs_info("transcode process pid=%d is running.", pid); - return ret; - } - - srs_trace("transcode process pid=%d terminate, restart it.", pid); - started = false; - - return ret; + return process->cycle(); } void SrsFFMPEG::stop() { - if (!started) { - return; - } - - // kill the ffmpeg, - // when rewind, upstream will stop publish(unpublish), - // unpublish event will stop all ffmpeg encoders, - // then publish will start all ffmpeg encoders. - int ret = srs_kill_forced(pid); - if (ret != ERROR_SUCCESS) { - srs_warn("ignore kill the encoder failed, pid=%d. ret=%d", pid, ret); - return; - } - - // terminated, set started to false to stop the cycle. - started = false; + process->stop(); } void SrsFFMPEG::fast_stop() { - int ret = ERROR_SUCCESS; - - if (!started) { - return; - } - - if (pid <= 0) { - return; - } - - if (kill(pid, SIGTERM) < 0) { - ret = ERROR_SYSTEM_KILL; - srs_warn("ignore fast stop ffmpeg failed, pid=%d. ret=%d", pid, ret); - return; - } - - return; + process->fast_stop(); } #endif diff --git a/trunk/src/app/srs_app_ffmpeg.hpp b/trunk/src/app/srs_app_ffmpeg.hpp index 4e9f10130..61a43bedf 100644 --- a/trunk/src/app/srs_app_ffmpeg.hpp +++ b/trunk/src/app/srs_app_ffmpeg.hpp @@ -31,11 +31,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #ifdef SRS_AUTO_FFMPEG_STUB -#include #include +#include class SrsConfDirective; class SrsPithyPrint; +class SrsProcess; /** * a transcode engine: ffmepg, @@ -44,11 +45,8 @@ class SrsPithyPrint; class SrsFFMPEG { private: - bool started; - // whether SIGTERM send but need to wait or SIGKILL. - bool fast_stopped; - pid_t pid; -private: + SrsProcess* process; + std::vector params; std::string log_file; private: std::string ffmpeg; @@ -83,26 +81,11 @@ public: virtual int initialize(std::string in, std::string out, std::string log); virtual int initialize_transcode(SrsConfDirective* engine); virtual int initialize_copy(); +public: virtual int start(); virtual int cycle(); - /** - * send SIGTERM then SIGKILL to ensure the process stopped. - * the stop will wait [0, SRS_PROCESS_QUIT_TIMEOUT_MS] depends on the - * process quit timeout. - * @remark use fast_stop before stop one by one, when got lots of process to quit. - */ virtual void stop(); public: - /** - * the fast stop is to send a SIGTERM. - * for example, the ingesters owner lots of FFMPEG, it will take a long time - * to stop one by one, instead the ingesters can fast_stop all FFMPEG, then - * wait one by one to stop, it's more faster. - * @remark user must use stop() to ensure the ffmpeg to stopped. - * @remark we got N processes to stop, compare the time we spend, - * when use stop without fast_stop, we spend maybe [0, SRS_PROCESS_QUIT_TIMEOUT_MS * N] - * but use fast_stop then stop, the time is almost [0, SRS_PROCESS_QUIT_TIMEOUT_MS]. - */ virtual void fast_stop(); }; diff --git a/trunk/src/app/srs_app_process.cpp b/trunk/src/app/srs_app_process.cpp new file mode 100644 index 000000000..f43d41067 --- /dev/null +++ b/trunk/src/app/srs_app_process.cpp @@ -0,0 +1,216 @@ +/* + The MIT License (MIT) + + Copyright (c) 2013-2015 SRS(simple-rtmp-server) + + Permission is hereby granted, free of charge, to any person obtaining a copy of + this software and associated documentation files (the "Software"), to deal in + the Software without restriction, including without limitation the rights to + use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + the Software, and to permit persons to whom the Software is furnished to do so, + subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include +#include +#include +#include +#include + +using namespace std; + +#include +#include +#include +#include + +SrsProcess::SrsProcess() +{ + is_started = false; + fast_stopped = false; + pid = -1; +} + +SrsProcess::~SrsProcess() +{ +} + +bool SrsProcess::started() +{ + return is_started; +} + +int SrsProcess::initialize(string binary, vector argv) +{ + int ret = ERROR_SUCCESS; + + bin = binary; + params = argv; + + return ret; +} + +int SrsProcess::start() +{ + int ret = ERROR_SUCCESS; + + if (is_started) { + return ret; + } + + std::string cli; + if (true) { + for (int i = 0; i < (int)params.size(); i++) { + std::string ffp = params[i]; + cli += ffp; + if (i < (int)params.size() - 1) { + cli += " "; + } + } + srs_trace("fork process: %s %s", bin.c_str(), cli.c_str()); + } + + // for log + int cid = _srs_context->get_id(); + + // TODO: fork or vfork? + if ((pid = fork()) < 0) { + ret = ERROR_ENCODER_FORK; + srs_error("vfork process failed. ret=%d", ret); + return ret; + } + + // child process: ffmpeg encoder engine. + if (pid == 0) { + // ignore the SIGINT and SIGTERM + signal(SIGINT, SIG_IGN); + signal(SIGTERM, SIG_IGN); + + // log basic info + if (true) { + fprintf(stderr, "\n"); + fprintf(stderr, "process parent cid=%d", cid); + fprintf(stderr, "process binary=%s", bin.c_str()); + fprintf(stderr, "process params: %s %s", bin.c_str(), cli.c_str()); + } + + // close other fds + // TODO: do in right way. + for (int i = 3; i < 1024; i++) { + ::close(i); + } + + // memory leak in child process, it's ok. + char** charpv_params = new char*[params.size() + 1]; + for (int i = 0; i < (int)params.size(); i++) { + std::string& p = params[i]; + charpv_params[i] = (char*)p.data(); + } + // EOF: NULL + charpv_params[params.size()] = NULL; + + // TODO: execv or execvp + ret = execv(bin.c_str(), charpv_params); + if (ret < 0) { + fprintf(stderr, "fork process failed, errno=%d(%s)", errno, strerror(errno)); + } + exit(ret); + } + + // parent. + if (pid > 0) { + is_started = true; + srs_trace("vfored process, pid=%d, cli=%s", pid, bin.c_str()); + return ret; + } + + return ret; +} + +int SrsProcess::cycle() +{ + int ret = ERROR_SUCCESS; + + if (!is_started) { + return ret; + } + + // ffmpeg is prepare to stop, donot cycle. + if (fast_stopped) { + return ret; + } + + int status = 0; + pid_t p = waitpid(pid, &status, WNOHANG); + + if (p < 0) { + ret = ERROR_SYSTEM_WAITPID; + srs_error("process waitpid failed, pid=%d, ret=%d", pid, ret); + return ret; + } + + if (p == 0) { + srs_info("process process pid=%d is running.", pid); + return ret; + } + + srs_trace("process pid=%d terminate, restart it.", pid); + is_started = false; + + return ret; +} + +void SrsProcess::stop() +{ + if (!is_started) { + return; + } + + // kill the ffmpeg, + // when rewind, upstream will stop publish(unpublish), + // unpublish event will stop all ffmpeg encoders, + // then publish will start all ffmpeg encoders. + int ret = srs_kill_forced(pid); + if (ret != ERROR_SUCCESS) { + srs_warn("ignore kill the process failed, pid=%d. ret=%d", pid, ret); + return; + } + + // terminated, set started to false to stop the cycle. + is_started = false; +} + +void SrsProcess::fast_stop() +{ + int ret = ERROR_SUCCESS; + + if (!is_started) { + return; + } + + if (pid <= 0) { + return; + } + + if (kill(pid, SIGTERM) < 0) { + ret = ERROR_SYSTEM_KILL; + srs_warn("ignore fast stop process failed, pid=%d. ret=%d", pid, ret); + return; + } + + return; +} + diff --git a/trunk/src/app/srs_app_process.hpp b/trunk/src/app/srs_app_process.hpp new file mode 100644 index 000000000..9b999b06e --- /dev/null +++ b/trunk/src/app/srs_app_process.hpp @@ -0,0 +1,99 @@ +/* + The MIT License (MIT) + + Copyright (c) 2013-2015 SRS(simple-rtmp-server) + + Permission is hereby granted, free of charge, to any person obtaining a copy of + this software and associated documentation files (the "Software"), to deal in + the Software without restriction, including without limitation the rights to + use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + the Software, and to permit persons to whom the Software is furnished to do so, + subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_APP_PROCESS_HPP +#define SRS_APP_PROCESS_HPP + +/* +#include +*/ +#include + +#include +#include + +/** + * to start and stop a process, cycle to restart the process when terminated. + * the usage: + * process = new SrsProcess(); + * if ((ret = process->initialize(binary, argv)) != ERROR_SUCCESS) { return ret; } + * if ((ret = process->start()) != ERROR_SUCCESS) { return ret; } + * if ((ret = process->cycle()) != ERROR_SUCCESS) { return ret; } + * process->fast_stop(); + * process->stop(); + */ +class SrsProcess +{ +private: + bool is_started; + // whether SIGTERM send but need to wait or SIGKILL. + bool fast_stopped; + pid_t pid; +private: + std::string bin; + std::vector params; +public: + SrsProcess(); + virtual ~SrsProcess(); +public: + /** + * whether process is already started. + */ + virtual bool started(); + /** + * initialize the process with binary and argv. + */ + virtual int initialize(std::string binary, std::vector argv); +public: + /** + * start the process, ignore when already started. + */ + virtual int start(); + /** + * cycle check the process, update the state of process. + * @remark when process terminated(not started), user can restart it again by start(). + */ + virtual int cycle(); + /** + * send SIGTERM then SIGKILL to ensure the process stopped. + * the stop will wait [0, SRS_PROCESS_QUIT_TIMEOUT_MS] depends on the + * process quit timeout. + * @remark use fast_stop before stop one by one, when got lots of process to quit. + */ + virtual void stop(); +public: + /** + * the fast stop is to send a SIGTERM. + * for example, the ingesters owner lots of FFMPEG, it will take a long time + * to stop one by one, instead the ingesters can fast_stop all FFMPEG, then + * wait one by one to stop, it's more faster. + * @remark user must use stop() to ensure the ffmpeg to stopped. + * @remark we got N processes to stop, compare the time we spend, + * when use stop without fast_stop, we spend maybe [0, SRS_PROCESS_QUIT_TIMEOUT_MS * N] + * but use fast_stop then stop, the time is almost [0, SRS_PROCESS_QUIT_TIMEOUT_MS]. + */ + virtual void fast_stop(); +}; + +#endif + diff --git a/trunk/src/libs/srs_librtmp.hpp b/trunk/src/libs/srs_librtmp.hpp index ffa074f0f..a163c378c 100644 --- a/trunk/src/libs/srs_librtmp.hpp +++ b/trunk/src/libs/srs_librtmp.hpp @@ -920,7 +920,11 @@ extern int srs_human_print_rtmp_packet4(char type, u_int32_t timestamp, char* da // log to console, for use srs-librtmp application. extern const char* srs_human_format_time(); - + +#ifndef _WIN32 + // for getpid. + #include +#endif // when disabled log, donot compile it. #ifdef SRS_DISABLE_LOG #define srs_human_trace(msg, ...) (void)0 @@ -936,15 +940,13 @@ extern const char* srs_human_format_time(); ************************************************************** * IO hijack, use your specified io functions. ************************************************************** -*************************************************************/ + *************************************************************/ // the void* will convert to your handler for io hijack. typedef void* srs_hijack_io_t; #ifdef SRS_HIJACK_IO #ifndef _WIN32 // for iovec. #include - // for getpid. - #include #endif /** * get the hijack io object in rtmp protocol sdk.