1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

for #367: extract the process from ffmpeg to exec programs.

This commit is contained in:
winlin 2015-08-24 21:51:05 +08:00
parent 3a8c03a471
commit 0e1861b084
8 changed files with 377 additions and 196 deletions

2
trunk/configure vendored
View file

@ -176,7 +176,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
"srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static"
"srs_app_recv_thread" "srs_app_security" "srs_app_statistic" "srs_app_hds" "srs_app_recv_thread" "srs_app_security" "srs_app_statistic" "srs_app_hds"
"srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call" "srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call"
"srs_app_caster_flv") "srs_app_caster_flv" "srs_app_process")
DEFINES="" DEFINES=""
# add each modules for app # add each modules for app
for SRS_MODULE in ${SRS_MODULES[*]}; do for SRS_MODULE in ${SRS_MODULES[*]}; do

View file

@ -117,6 +117,8 @@ file
../../src/app/srs_app_log.cpp, ../../src/app/srs_app_log.cpp,
../../src/app/srs_app_mpegts_udp.hpp, ../../src/app/srs_app_mpegts_udp.hpp,
../../src/app/srs_app_mpegts_udp.cpp, ../../src/app/srs_app_mpegts_udp.cpp,
../../src/app/srs_app_process.hpp,
../../src/app/srs_app_process.cpp,
../../src/app/srs_app_recv_thread.hpp, ../../src/app/srs_app_recv_thread.hpp,
../../src/app/srs_app_recv_thread.cpp, ../../src/app/srs_app_recv_thread.cpp,
../../src/app/srs_app_refer.hpp, ../../src/app/srs_app_refer.hpp,

View file

@ -79,6 +79,7 @@
3C36DB5B1ABD1CB90066CCAF /* srs_lib_bandwidth.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C36DB551ABD1CB90066CCAF /* srs_lib_bandwidth.cpp */; }; 3C36DB5B1ABD1CB90066CCAF /* srs_lib_bandwidth.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C36DB551ABD1CB90066CCAF /* srs_lib_bandwidth.cpp */; };
3C36DB5C1ABD1CB90066CCAF /* srs_lib_simple_socket.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C36DB571ABD1CB90066CCAF /* srs_lib_simple_socket.cpp */; }; 3C36DB5C1ABD1CB90066CCAF /* srs_lib_simple_socket.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C36DB571ABD1CB90066CCAF /* srs_lib_simple_socket.cpp */; };
3C36DB5D1ABD1CB90066CCAF /* srs_librtmp.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C36DB591ABD1CB90066CCAF /* srs_librtmp.cpp */; }; 3C36DB5D1ABD1CB90066CCAF /* srs_librtmp.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C36DB591ABD1CB90066CCAF /* srs_librtmp.cpp */; };
3C4F97121B8B466D00FF0E46 /* srs_app_process.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C4F97101B8B466D00FF0E46 /* srs_app_process.cpp */; };
3C5265B41B241BF0009CA186 /* srs_core_mem_watch.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C5265B21B241BF0009CA186 /* srs_core_mem_watch.cpp */; }; 3C5265B41B241BF0009CA186 /* srs_core_mem_watch.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C5265B21B241BF0009CA186 /* srs_core_mem_watch.cpp */; };
3C663F0F1AB0155100286D8B /* srs_aac_raw_publish.c in Sources */ = {isa = PBXBuildFile; fileRef = 3C663F021AB0155100286D8B /* srs_aac_raw_publish.c */; }; 3C663F0F1AB0155100286D8B /* srs_aac_raw_publish.c in Sources */ = {isa = PBXBuildFile; fileRef = 3C663F021AB0155100286D8B /* srs_aac_raw_publish.c */; };
3C663F101AB0155100286D8B /* srs_audio_raw_publish.c in Sources */ = {isa = PBXBuildFile; fileRef = 3C663F031AB0155100286D8B /* srs_audio_raw_publish.c */; }; 3C663F101AB0155100286D8B /* srs_audio_raw_publish.c in Sources */ = {isa = PBXBuildFile; fileRef = 3C663F031AB0155100286D8B /* srs_audio_raw_publish.c */; };
@ -328,6 +329,8 @@
3C36DB581ABD1CB90066CCAF /* srs_lib_simple_socket.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_lib_simple_socket.hpp; path = ../../../src/libs/srs_lib_simple_socket.hpp; sourceTree = "<group>"; }; 3C36DB581ABD1CB90066CCAF /* srs_lib_simple_socket.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_lib_simple_socket.hpp; path = ../../../src/libs/srs_lib_simple_socket.hpp; sourceTree = "<group>"; };
3C36DB591ABD1CB90066CCAF /* srs_librtmp.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_librtmp.cpp; path = ../../../src/libs/srs_librtmp.cpp; sourceTree = "<group>"; }; 3C36DB591ABD1CB90066CCAF /* srs_librtmp.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_librtmp.cpp; path = ../../../src/libs/srs_librtmp.cpp; sourceTree = "<group>"; };
3C36DB5A1ABD1CB90066CCAF /* srs_librtmp.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_librtmp.hpp; path = ../../../src/libs/srs_librtmp.hpp; sourceTree = "<group>"; }; 3C36DB5A1ABD1CB90066CCAF /* srs_librtmp.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_librtmp.hpp; path = ../../../src/libs/srs_librtmp.hpp; sourceTree = "<group>"; };
3C4F97101B8B466D00FF0E46 /* srs_app_process.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_app_process.cpp; path = ../../../src/app/srs_app_process.cpp; sourceTree = "<group>"; };
3C4F97111B8B466D00FF0E46 /* srs_app_process.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_app_process.hpp; path = ../../../src/app/srs_app_process.hpp; sourceTree = "<group>"; };
3C5265B21B241BF0009CA186 /* srs_core_mem_watch.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_core_mem_watch.cpp; path = ../../../src/core/srs_core_mem_watch.cpp; sourceTree = "<group>"; }; 3C5265B21B241BF0009CA186 /* srs_core_mem_watch.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_core_mem_watch.cpp; path = ../../../src/core/srs_core_mem_watch.cpp; sourceTree = "<group>"; };
3C5265B31B241BF0009CA186 /* srs_core_mem_watch.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_core_mem_watch.hpp; path = ../../../src/core/srs_core_mem_watch.hpp; sourceTree = "<group>"; }; 3C5265B31B241BF0009CA186 /* srs_core_mem_watch.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_core_mem_watch.hpp; path = ../../../src/core/srs_core_mem_watch.hpp; sourceTree = "<group>"; };
3C663F021AB0155100286D8B /* srs_aac_raw_publish.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; name = srs_aac_raw_publish.c; path = ../../../research/librtmp/srs_aac_raw_publish.c; sourceTree = "<group>"; }; 3C663F021AB0155100286D8B /* srs_aac_raw_publish.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; name = srs_aac_raw_publish.c; path = ../../../research/librtmp/srs_aac_raw_publish.c; sourceTree = "<group>"; };
@ -573,6 +576,8 @@
3C1232771AAE81D900CE8F6C /* srs_app_mpegts_udp.hpp */, 3C1232771AAE81D900CE8F6C /* srs_app_mpegts_udp.hpp */,
3C1232781AAE81D900CE8F6C /* srs_app_pithy_print.cpp */, 3C1232781AAE81D900CE8F6C /* srs_app_pithy_print.cpp */,
3C1232791AAE81D900CE8F6C /* srs_app_pithy_print.hpp */, 3C1232791AAE81D900CE8F6C /* srs_app_pithy_print.hpp */,
3C4F97101B8B466D00FF0E46 /* srs_app_process.cpp */,
3C4F97111B8B466D00FF0E46 /* srs_app_process.hpp */,
3C12327A1AAE81D900CE8F6C /* srs_app_recv_thread.cpp */, 3C12327A1AAE81D900CE8F6C /* srs_app_recv_thread.cpp */,
3C12327B1AAE81D900CE8F6C /* srs_app_recv_thread.hpp */, 3C12327B1AAE81D900CE8F6C /* srs_app_recv_thread.hpp */,
3C12327C1AAE81D900CE8F6C /* srs_app_refer.cpp */, 3C12327C1AAE81D900CE8F6C /* srs_app_refer.cpp */,
@ -911,6 +916,7 @@
3C1232B21AAE81D900CE8F6C /* srs_app_source.cpp in Sources */, 3C1232B21AAE81D900CE8F6C /* srs_app_source.cpp in Sources */,
3C1231F71AAE652D00CE8F6C /* srs_core_performance.cpp in Sources */, 3C1231F71AAE652D00CE8F6C /* srs_core_performance.cpp in Sources */,
3CC52DD81ACE4023006FEB01 /* srs_utest_amf0.cpp in Sources */, 3CC52DD81ACE4023006FEB01 /* srs_utest_amf0.cpp in Sources */,
3C4F97121B8B466D00FF0E46 /* srs_app_process.cpp in Sources */,
3C1232981AAE81D900CE8F6C /* srs_app_edge.cpp in Sources */, 3C1232981AAE81D900CE8F6C /* srs_app_edge.cpp in Sources */,
3CC52DDB1ACE4023006FEB01 /* srs_utest_kernel.cpp in Sources */, 3CC52DDB1ACE4023006FEB01 /* srs_utest_kernel.cpp in Sources */,
3C689F9E1AB6AAC800C9CEEE /* md.S in Sources */, 3C689F9E1AB6AAC800C9CEEE /* md.S in Sources */,

View file

@ -30,12 +30,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <signal.h> #include <signal.h>
#include <sys/types.h> #include <sys/types.h>
#include <vector>
using namespace std; using namespace std;
#include <srs_kernel_error.hpp> #include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp> #include <srs_kernel_log.hpp>
#include <srs_app_config.hpp> #include <srs_app_config.hpp>
#include <srs_app_utility.hpp> #include <srs_app_utility.hpp>
#include <srs_app_process.hpp>
#include <srs_core_autofree.hpp>
#ifdef SRS_AUTO_FFMPEG_STUB #ifdef SRS_AUTO_FFMPEG_STUB
@ -52,9 +55,6 @@ using namespace std;
SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin) SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)
{ {
started = false;
fast_stopped = false;
pid = -1;
ffmpeg = ffmpeg_bin; ffmpeg = ffmpeg_bin;
vbitrate = 0; vbitrate = 0;
@ -65,11 +65,15 @@ SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin)
abitrate = 0; abitrate = 0;
asample_rate = 0; asample_rate = 0;
achannels = 0; achannels = 0;
process = new SrsProcess();
} }
SrsFFMPEG::~SrsFFMPEG() SrsFFMPEG::~SrsFFMPEG()
{ {
stop(); stop();
srs_freep(process);
} }
void SrsFFMPEG::set_iparams(string iparams) void SrsFFMPEG::set_iparams(string iparams)
@ -236,17 +240,22 @@ int SrsFFMPEG::start()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
if (started) { if (process->started()) {
return ret; return ret;
} }
// prepare exec params // prepare exec params
char tmp[256]; // @remark we should never use stack variable, use heap to alloc to make lldb happy.
std::vector<std::string> params; #define SRS_TMP_SIZE 512
char* tmp = new char[SRS_TMP_SIZE];
SrsAutoFree(char, tmp);
// the argv for process.
params.clear();
// argv[0], set to ffmpeg bin. // argv[0], set to ffmpeg bin.
// The execv() and execvp() functions .... // The execv() and execvp() functions ....
// The first argument, by convention, should point to // The first argument, by convention, should point to
// the filename associated with the file being executed. // the filename associated with the file being executed.
params.push_back(ffmpeg); params.push_back(ffmpeg);
@ -287,32 +296,32 @@ int SrsFFMPEG::start()
if (vcodec != SRS_RTMP_ENCODER_COPY && vcodec != SRS_RTMP_ENCODER_NO_VIDEO) { if (vcodec != SRS_RTMP_ENCODER_COPY && vcodec != SRS_RTMP_ENCODER_NO_VIDEO) {
if (vbitrate > 0) { if (vbitrate > 0) {
params.push_back("-b:v"); params.push_back("-b:v");
snprintf(tmp, sizeof(tmp), "%d", vbitrate * 1000); snprintf(tmp, SRS_TMP_SIZE, "%d", vbitrate * 1000);
params.push_back(tmp); params.push_back(tmp);
} }
if (vfps > 0) { if (vfps > 0) {
params.push_back("-r"); params.push_back("-r");
snprintf(tmp, sizeof(tmp), "%.2f", vfps); snprintf(tmp, SRS_TMP_SIZE, "%.2f", vfps);
params.push_back(tmp); params.push_back(tmp);
} }
if (vwidth > 0 && vheight > 0) { if (vwidth > 0 && vheight > 0) {
params.push_back("-s"); params.push_back("-s");
snprintf(tmp, sizeof(tmp), "%dx%d", vwidth, vheight); snprintf(tmp, SRS_TMP_SIZE, "%dx%d", vwidth, vheight);
params.push_back(tmp); params.push_back(tmp);
} }
// TODO: add aspect if needed. // TODO: add aspect if needed.
if (vwidth > 0 && vheight > 0) { if (vwidth > 0 && vheight > 0) {
params.push_back("-aspect"); params.push_back("-aspect");
snprintf(tmp, sizeof(tmp), "%d:%d", vwidth, vheight); snprintf(tmp, SRS_TMP_SIZE, "%d:%d", vwidth, vheight);
params.push_back(tmp); params.push_back(tmp);
} }
if (vthreads > 0) { if (vthreads > 0) {
params.push_back("-threads"); params.push_back("-threads");
snprintf(tmp, sizeof(tmp), "%d", vthreads); snprintf(tmp, SRS_TMP_SIZE, "%d", vthreads);
params.push_back(tmp); params.push_back(tmp);
} }
@ -347,19 +356,19 @@ int SrsFFMPEG::start()
if (acodec != SRS_RTMP_ENCODER_COPY) { if (acodec != SRS_RTMP_ENCODER_COPY) {
if (abitrate > 0) { if (abitrate > 0) {
params.push_back("-b:a"); params.push_back("-b:a");
snprintf(tmp, sizeof(tmp), "%d", abitrate * 1000); snprintf(tmp, SRS_TMP_SIZE, "%d", abitrate * 1000);
params.push_back(tmp); params.push_back(tmp);
} }
if (asample_rate > 0) { if (asample_rate > 0) {
params.push_back("-ar"); params.push_back("-ar");
snprintf(tmp, sizeof(tmp), "%d", asample_rate); snprintf(tmp, SRS_TMP_SIZE, "%d", asample_rate);
params.push_back(tmp); params.push_back(tmp);
} }
if (achannels > 0) { if (achannels > 0) {
params.push_back("-ac"); params.push_back("-ac");
snprintf(tmp, sizeof(tmp), "%d", achannels); snprintf(tmp, SRS_TMP_SIZE, "%d", achannels);
params.push_back(tmp); params.push_back(tmp);
} }
@ -387,7 +396,7 @@ int SrsFFMPEG::start()
} }
} }
} }
// output // output
if (oformat != "off" && !oformat.empty()) { if (oformat != "off" && !oformat.empty()) {
params.push_back("-f"); params.push_back("-f");
@ -396,176 +405,40 @@ int SrsFFMPEG::start()
params.push_back("-y"); params.push_back("-y");
params.push_back(_output); params.push_back(_output);
std::string cli; // when specified the log file.
if (true) { if (false && !log_file.empty()) {
for (int i = 0; i < (int)params.size(); i++) { // stdout
std::string ffp = params[i]; params.push_back("1");
cli += ffp; params.push_back(">");
if (i < (int)params.size() - 1) { params.push_back(log_file);
cli += " "; // stderr
} params.push_back("2");
} params.push_back(">");
srs_trace("start ffmpeg, log: %s, params: %s", log_file.c_str(), cli.c_str()); params.push_back(log_file);
} }
// for log // initialize the process.
int cid = _srs_context->get_id(); if ((ret = process->initialize(ffmpeg, params)) != ERROR_SUCCESS) {
// TODO: fork or vfork?
if ((pid = fork()) < 0) {
ret = ERROR_ENCODER_FORK;
srs_error("vfork process failed. ret=%d", ret);
return ret; return ret;
} }
// child process: ffmpeg encoder engine. return process->start();
if (pid == 0) {
// ignore the SIGINT and SIGTERM
signal(SIGINT, SIG_IGN);
signal(SIGTERM, SIG_IGN);
// redirect logs to file.
int log_fd = -1;
int flags = O_CREAT|O_WRONLY|O_APPEND;
mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH;
if ((log_fd = ::open(log_file.c_str(), flags, mode)) < 0) {
ret = ERROR_ENCODER_OPEN;
srs_error("open encoder file %s failed. ret=%d", log_file.c_str(), ret);
return ret;
}
// log basic info
if (true) {
char buf[4096];
int pos = 0;
pos += snprintf(buf + pos, sizeof(buf) - pos, "\n");
pos += snprintf(buf + pos, sizeof(buf) - pos, "ffmpeg cid=%d\n", cid);
pos += snprintf(buf + pos, sizeof(buf) - pos, "log=%s\n", log_file.c_str());
pos += snprintf(buf + pos, sizeof(buf) - pos, "params: %s\n", cli.c_str());
::write(log_fd, buf, pos);
}
// dup to stdout and stderr.
if (dup2(log_fd, STDOUT_FILENO) < 0) {
ret = ERROR_ENCODER_DUP2;
srs_error("dup2 encoder file failed. ret=%d", ret);
return ret;
}
if (dup2(log_fd, STDERR_FILENO) < 0) {
ret = ERROR_ENCODER_DUP2;
srs_error("dup2 encoder file failed. ret=%d", ret);
return ret;
}
// close log fd
::close(log_fd);
// close other fds
// TODO: do in right way.
for (int i = 3; i < 1024; i++) {
::close(i);
}
// memory leak in child process, it's ok.
char** charpv_params = new char*[params.size() + 1];
for (int i = 0; i < (int)params.size(); i++) {
std::string& p = params[i];
charpv_params[i] = (char*)p.data();
}
// EOF: NULL
charpv_params[params.size()] = NULL;
// TODO: execv or execvp
ret = execv(ffmpeg.c_str(), charpv_params);
if (ret < 0) {
fprintf(stderr, "fork ffmpeg failed, errno=%d(%s)",
errno, strerror(errno));
}
exit(ret);
}
// parent.
if (pid > 0) {
started = true;
srs_trace("vfored ffmpeg encoder engine, pid=%d", pid);
return ret;
}
return ret;
} }
int SrsFFMPEG::cycle() int SrsFFMPEG::cycle()
{ {
int ret = ERROR_SUCCESS; return process->cycle();
if (!started) {
return ret;
}
// ffmpeg is prepare to stop, donot cycle.
if (fast_stopped) {
return ret;
}
int status = 0;
pid_t p = waitpid(pid, &status, WNOHANG);
if (p < 0) {
ret = ERROR_SYSTEM_WAITPID;
srs_error("transcode waitpid failed, pid=%d, ret=%d", pid, ret);
return ret;
}
if (p == 0) {
srs_info("transcode process pid=%d is running.", pid);
return ret;
}
srs_trace("transcode process pid=%d terminate, restart it.", pid);
started = false;
return ret;
} }
void SrsFFMPEG::stop() void SrsFFMPEG::stop()
{ {
if (!started) { process->stop();
return;
}
// kill the ffmpeg,
// when rewind, upstream will stop publish(unpublish),
// unpublish event will stop all ffmpeg encoders,
// then publish will start all ffmpeg encoders.
int ret = srs_kill_forced(pid);
if (ret != ERROR_SUCCESS) {
srs_warn("ignore kill the encoder failed, pid=%d. ret=%d", pid, ret);
return;
}
// terminated, set started to false to stop the cycle.
started = false;
} }
void SrsFFMPEG::fast_stop() void SrsFFMPEG::fast_stop()
{ {
int ret = ERROR_SUCCESS; process->fast_stop();
if (!started) {
return;
}
if (pid <= 0) {
return;
}
if (kill(pid, SIGTERM) < 0) {
ret = ERROR_SYSTEM_KILL;
srs_warn("ignore fast stop ffmpeg failed, pid=%d. ret=%d", pid, ret);
return;
}
return;
} }
#endif #endif

View file

@ -31,11 +31,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#ifdef SRS_AUTO_FFMPEG_STUB #ifdef SRS_AUTO_FFMPEG_STUB
#include <string>
#include <vector> #include <vector>
#include <string>
class SrsConfDirective; class SrsConfDirective;
class SrsPithyPrint; class SrsPithyPrint;
class SrsProcess;
/** /**
* a transcode engine: ffmepg, * a transcode engine: ffmepg,
@ -44,11 +45,8 @@ class SrsPithyPrint;
class SrsFFMPEG class SrsFFMPEG
{ {
private: private:
bool started; SrsProcess* process;
// whether SIGTERM send but need to wait or SIGKILL. std::vector<std::string> params;
bool fast_stopped;
pid_t pid;
private:
std::string log_file; std::string log_file;
private: private:
std::string ffmpeg; std::string ffmpeg;
@ -83,26 +81,11 @@ public:
virtual int initialize(std::string in, std::string out, std::string log); virtual int initialize(std::string in, std::string out, std::string log);
virtual int initialize_transcode(SrsConfDirective* engine); virtual int initialize_transcode(SrsConfDirective* engine);
virtual int initialize_copy(); virtual int initialize_copy();
public:
virtual int start(); virtual int start();
virtual int cycle(); virtual int cycle();
/**
* send SIGTERM then SIGKILL to ensure the process stopped.
* the stop will wait [0, SRS_PROCESS_QUIT_TIMEOUT_MS] depends on the
* process quit timeout.
* @remark use fast_stop before stop one by one, when got lots of process to quit.
*/
virtual void stop(); virtual void stop();
public: public:
/**
* the fast stop is to send a SIGTERM.
* for example, the ingesters owner lots of FFMPEG, it will take a long time
* to stop one by one, instead the ingesters can fast_stop all FFMPEG, then
* wait one by one to stop, it's more faster.
* @remark user must use stop() to ensure the ffmpeg to stopped.
* @remark we got N processes to stop, compare the time we spend,
* when use stop without fast_stop, we spend maybe [0, SRS_PROCESS_QUIT_TIMEOUT_MS * N]
* but use fast_stop then stop, the time is almost [0, SRS_PROCESS_QUIT_TIMEOUT_MS].
*/
virtual void fast_stop(); virtual void fast_stop();
}; };

View file

@ -0,0 +1,216 @@
/*
The MIT License (MIT)
Copyright (c) 2013-2015 SRS(simple-rtmp-server)
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_app_process.hpp>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/types.h>
using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_app_config.hpp>
#include <srs_app_utility.hpp>
SrsProcess::SrsProcess()
{
is_started = false;
fast_stopped = false;
pid = -1;
}
SrsProcess::~SrsProcess()
{
}
bool SrsProcess::started()
{
return is_started;
}
int SrsProcess::initialize(string binary, vector<string> argv)
{
int ret = ERROR_SUCCESS;
bin = binary;
params = argv;
return ret;
}
int SrsProcess::start()
{
int ret = ERROR_SUCCESS;
if (is_started) {
return ret;
}
std::string cli;
if (true) {
for (int i = 0; i < (int)params.size(); i++) {
std::string ffp = params[i];
cli += ffp;
if (i < (int)params.size() - 1) {
cli += " ";
}
}
srs_trace("fork process: %s %s", bin.c_str(), cli.c_str());
}
// for log
int cid = _srs_context->get_id();
// TODO: fork or vfork?
if ((pid = fork()) < 0) {
ret = ERROR_ENCODER_FORK;
srs_error("vfork process failed. ret=%d", ret);
return ret;
}
// child process: ffmpeg encoder engine.
if (pid == 0) {
// ignore the SIGINT and SIGTERM
signal(SIGINT, SIG_IGN);
signal(SIGTERM, SIG_IGN);
// log basic info
if (true) {
fprintf(stderr, "\n");
fprintf(stderr, "process parent cid=%d", cid);
fprintf(stderr, "process binary=%s", bin.c_str());
fprintf(stderr, "process params: %s %s", bin.c_str(), cli.c_str());
}
// close other fds
// TODO: do in right way.
for (int i = 3; i < 1024; i++) {
::close(i);
}
// memory leak in child process, it's ok.
char** charpv_params = new char*[params.size() + 1];
for (int i = 0; i < (int)params.size(); i++) {
std::string& p = params[i];
charpv_params[i] = (char*)p.data();
}
// EOF: NULL
charpv_params[params.size()] = NULL;
// TODO: execv or execvp
ret = execv(bin.c_str(), charpv_params);
if (ret < 0) {
fprintf(stderr, "fork process failed, errno=%d(%s)", errno, strerror(errno));
}
exit(ret);
}
// parent.
if (pid > 0) {
is_started = true;
srs_trace("vfored process, pid=%d, cli=%s", pid, bin.c_str());
return ret;
}
return ret;
}
int SrsProcess::cycle()
{
int ret = ERROR_SUCCESS;
if (!is_started) {
return ret;
}
// ffmpeg is prepare to stop, donot cycle.
if (fast_stopped) {
return ret;
}
int status = 0;
pid_t p = waitpid(pid, &status, WNOHANG);
if (p < 0) {
ret = ERROR_SYSTEM_WAITPID;
srs_error("process waitpid failed, pid=%d, ret=%d", pid, ret);
return ret;
}
if (p == 0) {
srs_info("process process pid=%d is running.", pid);
return ret;
}
srs_trace("process pid=%d terminate, restart it.", pid);
is_started = false;
return ret;
}
void SrsProcess::stop()
{
if (!is_started) {
return;
}
// kill the ffmpeg,
// when rewind, upstream will stop publish(unpublish),
// unpublish event will stop all ffmpeg encoders,
// then publish will start all ffmpeg encoders.
int ret = srs_kill_forced(pid);
if (ret != ERROR_SUCCESS) {
srs_warn("ignore kill the process failed, pid=%d. ret=%d", pid, ret);
return;
}
// terminated, set started to false to stop the cycle.
is_started = false;
}
void SrsProcess::fast_stop()
{
int ret = ERROR_SUCCESS;
if (!is_started) {
return;
}
if (pid <= 0) {
return;
}
if (kill(pid, SIGTERM) < 0) {
ret = ERROR_SYSTEM_KILL;
srs_warn("ignore fast stop process failed, pid=%d. ret=%d", pid, ret);
return;
}
return;
}

View file

@ -0,0 +1,99 @@
/*
The MIT License (MIT)
Copyright (c) 2013-2015 SRS(simple-rtmp-server)
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_APP_PROCESS_HPP
#define SRS_APP_PROCESS_HPP
/*
#include <srs_app_process.hpp>
*/
#include <srs_core.hpp>
#include <string>
#include <vector>
/**
* to start and stop a process, cycle to restart the process when terminated.
* the usage:
* process = new SrsProcess();
* if ((ret = process->initialize(binary, argv)) != ERROR_SUCCESS) { return ret; }
* if ((ret = process->start()) != ERROR_SUCCESS) { return ret; }
* if ((ret = process->cycle()) != ERROR_SUCCESS) { return ret; }
* process->fast_stop();
* process->stop();
*/
class SrsProcess
{
private:
bool is_started;
// whether SIGTERM send but need to wait or SIGKILL.
bool fast_stopped;
pid_t pid;
private:
std::string bin;
std::vector<std::string> params;
public:
SrsProcess();
virtual ~SrsProcess();
public:
/**
* whether process is already started.
*/
virtual bool started();
/**
* initialize the process with binary and argv.
*/
virtual int initialize(std::string binary, std::vector<std::string> argv);
public:
/**
* start the process, ignore when already started.
*/
virtual int start();
/**
* cycle check the process, update the state of process.
* @remark when process terminated(not started), user can restart it again by start().
*/
virtual int cycle();
/**
* send SIGTERM then SIGKILL to ensure the process stopped.
* the stop will wait [0, SRS_PROCESS_QUIT_TIMEOUT_MS] depends on the
* process quit timeout.
* @remark use fast_stop before stop one by one, when got lots of process to quit.
*/
virtual void stop();
public:
/**
* the fast stop is to send a SIGTERM.
* for example, the ingesters owner lots of FFMPEG, it will take a long time
* to stop one by one, instead the ingesters can fast_stop all FFMPEG, then
* wait one by one to stop, it's more faster.
* @remark user must use stop() to ensure the ffmpeg to stopped.
* @remark we got N processes to stop, compare the time we spend,
* when use stop without fast_stop, we spend maybe [0, SRS_PROCESS_QUIT_TIMEOUT_MS * N]
* but use fast_stop then stop, the time is almost [0, SRS_PROCESS_QUIT_TIMEOUT_MS].
*/
virtual void fast_stop();
};
#endif

View file

@ -920,7 +920,11 @@ extern int srs_human_print_rtmp_packet4(char type, u_int32_t timestamp, char* da
// log to console, for use srs-librtmp application. // log to console, for use srs-librtmp application.
extern const char* srs_human_format_time(); extern const char* srs_human_format_time();
#ifndef _WIN32
// for getpid.
#include <unistd.h>
#endif
// when disabled log, donot compile it. // when disabled log, donot compile it.
#ifdef SRS_DISABLE_LOG #ifdef SRS_DISABLE_LOG
#define srs_human_trace(msg, ...) (void)0 #define srs_human_trace(msg, ...) (void)0
@ -936,15 +940,13 @@ extern const char* srs_human_format_time();
************************************************************** **************************************************************
* IO hijack, use your specified io functions. * IO hijack, use your specified io functions.
************************************************************** **************************************************************
*************************************************************/ *************************************************************/
// the void* will convert to your handler for io hijack. // the void* will convert to your handler for io hijack.
typedef void* srs_hijack_io_t; typedef void* srs_hijack_io_t;
#ifdef SRS_HIJACK_IO #ifdef SRS_HIJACK_IO
#ifndef _WIN32 #ifndef _WIN32
// for iovec. // for iovec.
#include <sys/uio.h> #include <sys/uio.h>
// for getpid.
#include <unistd.h>
#endif #endif
/** /**
* get the hijack io object in rtmp protocol sdk. * get the hijack io object in rtmp protocol sdk.