1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #913, refine server utility

This commit is contained in:
winlin 2017-06-10 15:20:48 +08:00
parent ca9f0bdb1e
commit a20e2c3ef6
15 changed files with 222 additions and 244 deletions

View file

@ -380,24 +380,18 @@ SrsSignalManager::~SrsSignalManager()
srs_freep(trd);
}
int SrsSignalManager::initialize()
srs_error_t SrsSignalManager::initialize()
{
int ret = ERROR_SUCCESS;
/* Create signal pipe */
if (pipe(sig_pipe) < 0) {
ret = ERROR_SYSTEM_CREATE_PIPE;
srs_error("create signal manager pipe failed. ret=%d", ret);
return ret;
return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "create pipe");
}
if ((signal_read_stfd = srs_netfd_open(sig_pipe[0])) == NULL) {
ret = ERROR_SYSTEM_CREATE_PIPE;
srs_error("create signal manage st pipe failed. ret=%d", ret);
return ret;
return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "open pipe");
}
return ret;
return srs_success;
}
int SrsSignalManager::start()
@ -588,12 +582,11 @@ srs_error_t SrsServer::initialize(ISrsServerCycle* cycle_handler)
srs_error_t SrsServer::initialize_st()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// init st
if ((ret = srs_st_init()) != ERROR_SUCCESS) {
return srs_error_new(ret, "initialize st failed");
if ((err = srs_st_init()) != srs_success) {
return srs_error_wrap(err, "initialize st failed");
}
// @remark, st alloc segment use mmap, which only support 32757 threads,
@ -613,8 +606,8 @@ srs_error_t SrsServer::initialize_st()
// initialize the conponents that depends on st.
#ifdef SRS_AUTO_KAFKA
if ((ret = srs_initialize_kafka()) != ERROR_SUCCESS) {
return srs_error_new(ret, "initialize kafka");
if ((err = srs_initialize_kafka()) != srs_success) {
return srs_error_wrap(err, "initialize kafka");
}
#endif
@ -630,18 +623,16 @@ srs_error_t SrsServer::initialize_st()
return err;
}
int SrsServer::initialize_signal()
srs_error_t SrsServer::initialize_signal()
{
return signal_manager->initialize();
}
int SrsServer::acquire_pid_file()
srs_error_t SrsServer::acquire_pid_file()
{
int ret = ERROR_SUCCESS;
// when srs in dolphin mode, no need the pid file.
if (_srs_config->is_dolphin()) {
return ret;
return srs_success;
}
std::string pid_file = _srs_config->get_pid_file();
@ -652,10 +643,8 @@ int SrsServer::acquire_pid_file()
int fd;
// open pid file
if ((fd = ::open(pid_file.c_str(), O_WRONLY | O_CREAT, mode)) < 0) {
ret = ERROR_SYSTEM_PID_ACQUIRE;
srs_error("open pid file %s error, ret=%#x", pid_file.c_str(), ret);
return ret;
if ((fd = ::open(pid_file.c_str(), O_WRONLY | O_CREAT, mode)) == -1) {
return srs_error_new(ERROR_SYSTEM_PID_ACQUIRE, "open pid file=%s", pid_file.c_str());
}
// require write lock
@ -666,185 +655,173 @@ int SrsServer::acquire_pid_file()
lock.l_whence = SEEK_SET; // SEEK_SET, SEEK_CUR, SEEK_END
lock.l_len = 0;
if (fcntl(fd, F_SETLK, &lock) < 0) {
if (fcntl(fd, F_SETLK, &lock) == -1) {
if(errno == EACCES || errno == EAGAIN) {
ret = ERROR_SYSTEM_PID_ALREADY_RUNNING;
srs_error("srs is already running! ret=%#x", ret);
return ret;
srs_error("srs is already running!");
return srs_error_new(ERROR_SYSTEM_PID_ALREADY_RUNNING, "srs is already running");
}
ret = ERROR_SYSTEM_PID_LOCK;
srs_error("require lock for file %s error! ret=%#x", pid_file.c_str(), ret);
return ret;
return srs_error_new(ERROR_SYSTEM_PID_LOCK, "access to pid=%s", pid_file.c_str());
}
// truncate file
if (ftruncate(fd, 0) < 0) {
ret = ERROR_SYSTEM_PID_TRUNCATE_FILE;
srs_error("truncate pid file %s error! ret=%#x", pid_file.c_str(), ret);
return ret;
if (ftruncate(fd, 0) != 0) {
return srs_error_new(ERROR_SYSTEM_PID_TRUNCATE_FILE, "truncate pid file=%s", pid_file.c_str());
}
// write the pid
string pid = srs_int2str(getpid());
if (write(fd, pid.c_str(), pid.length()) != (int)pid.length()) {
ret = ERROR_SYSTEM_PID_WRITE_FILE;
srs_error("write our pid error! pid=%s file=%s ret=%#x", pid.c_str(), pid_file.c_str(), ret);
return ret;
return srs_error_new(ERROR_SYSTEM_PID_WRITE_FILE, "write pid=%d to file=%s", pid.c_str(), pid_file.c_str());
}
// auto close when fork child process.
int val;
if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
ret = ERROR_SYSTEM_PID_GET_FILE_INFO;
srs_error("fnctl F_GETFD error! file=%s ret=%#x", pid_file.c_str(), ret);
return ret;
return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fcntl fd=%d", fd);
}
val |= FD_CLOEXEC;
if (fcntl(fd, F_SETFD, val) < 0) {
ret = ERROR_SYSTEM_PID_SET_FILE_INFO;
srs_error("fcntl F_SETFD error! file=%s ret=%#x", pid_file.c_str(), ret);
return ret;
return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "lock file=%s fd=%d", pid_file.c_str(), fd);
}
srs_trace("write pid=%s to %s success!", pid.c_str(), pid_file.c_str());
pid_fd = fd;
return ret;
return srs_success;
}
int SrsServer::listen()
srs_error_t SrsServer::listen()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
return ret;
if ((err = listen_rtmp()) != srs_success) {
return srs_error_wrap(err, "rtmp listen");
}
if ((ret = listen_http_api()) != ERROR_SUCCESS) {
return ret;
if ((err = listen_http_api()) != srs_success) {
return srs_error_wrap(err, "http api listen");
}
if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
return ret;
if ((err = listen_http_stream()) != srs_success) {
return srs_error_wrap(err, "http stream listen");
}
if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {
return ret;
if ((err = listen_stream_caster()) != srs_success) {
return srs_error_wrap(err, "stream caster listen");
}
if ((ret = conn_manager->start()) != ERROR_SUCCESS) {
return ret;
return srs_error_new(ret, "connection manager");
}
return ret;
return err;
}
int SrsServer::register_signal()
srs_error_t SrsServer::register_signal()
{
// start signal process thread.
return signal_manager->start();
int ret = signal_manager->start();
if (ret != ERROR_SUCCESS) {
return srs_error_new(ret, "signal manager start");
}
return srs_success;
}
int SrsServer::http_handle()
srs_error_t SrsServer::http_handle()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
srs_assert(http_api_mux);
if ((ret = http_api_mux->handle("/", new SrsHttpNotFoundHandler())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/", new SrsHttpNotFoundHandler())) != srs_success) {
return srs_error_wrap(err, "handle not found");
}
if ((ret = http_api_mux->handle("/api/", new SrsGoApiApi())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/", new SrsGoApiApi())) != srs_success) {
return srs_error_wrap(err, "handle api");
}
if ((ret = http_api_mux->handle("/api/v1/", new SrsGoApiV1())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/", new SrsGoApiV1())) != srs_success) {
return srs_error_wrap(err, "handle v1");
}
if ((ret = http_api_mux->handle("/api/v1/versions", new SrsGoApiVersion())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/versions", new SrsGoApiVersion())) != srs_success) {
return srs_error_wrap(err, "handle versions");
}
if ((ret = http_api_mux->handle("/api/v1/summaries", new SrsGoApiSummaries())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/summaries", new SrsGoApiSummaries())) != srs_success) {
return srs_error_wrap(err, "handle summaries");
}
if ((ret = http_api_mux->handle("/api/v1/rusages", new SrsGoApiRusages())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/rusages", new SrsGoApiRusages())) != srs_success) {
return srs_error_wrap(err, "handle rusages");
}
if ((ret = http_api_mux->handle("/api/v1/self_proc_stats", new SrsGoApiSelfProcStats())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/self_proc_stats", new SrsGoApiSelfProcStats())) != srs_success) {
return srs_error_wrap(err, "handle self proc stats");
}
if ((ret = http_api_mux->handle("/api/v1/system_proc_stats", new SrsGoApiSystemProcStats())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/system_proc_stats", new SrsGoApiSystemProcStats())) != srs_success) {
return srs_error_wrap(err, "handle system proc stats");
}
if ((ret = http_api_mux->handle("/api/v1/meminfos", new SrsGoApiMemInfos())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/meminfos", new SrsGoApiMemInfos())) != srs_success) {
return srs_error_wrap(err, "handle meminfos");
}
if ((ret = http_api_mux->handle("/api/v1/authors", new SrsGoApiAuthors())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/authors", new SrsGoApiAuthors())) != srs_success) {
return srs_error_wrap(err, "handle authors");
}
if ((ret = http_api_mux->handle("/api/v1/features", new SrsGoApiFeatures())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/features", new SrsGoApiFeatures())) != srs_success) {
return srs_error_wrap(err, "handle features");
}
if ((ret = http_api_mux->handle("/api/v1/vhosts/", new SrsGoApiVhosts())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/vhosts/", new SrsGoApiVhosts())) != srs_success) {
return srs_error_wrap(err, "handle vhosts");
}
if ((ret = http_api_mux->handle("/api/v1/streams/", new SrsGoApiStreams())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/streams/", new SrsGoApiStreams())) != srs_success) {
return srs_error_wrap(err, "handle streams");
}
if ((ret = http_api_mux->handle("/api/v1/clients/", new SrsGoApiClients())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/clients/", new SrsGoApiClients())) != srs_success) {
return srs_error_wrap(err, "handle clients");
}
if ((ret = http_api_mux->handle("/api/v1/raw", new SrsGoApiRaw(this))) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/raw", new SrsGoApiRaw(this))) != srs_success) {
return srs_error_wrap(err, "handle raw");
}
// test the request info.
if ((ret = http_api_mux->handle("/api/v1/tests/requests", new SrsGoApiRequests())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/tests/requests", new SrsGoApiRequests())) != srs_success) {
return srs_error_wrap(err, "handle tests requests");
}
// test the error code response.
if ((ret = http_api_mux->handle("/api/v1/tests/errors", new SrsGoApiError())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/tests/errors", new SrsGoApiError())) != srs_success) {
return srs_error_wrap(err, "handle tests errors");
}
// test the redirect mechenism.
if ((ret = http_api_mux->handle("/api/v1/tests/redirects", new SrsHttpRedirectHandler("/api/v1/tests/errors", SRS_CONSTS_HTTP_MovedPermanently))) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("/api/v1/tests/redirects", new SrsHttpRedirectHandler("/api/v1/tests/errors", SRS_CONSTS_HTTP_MovedPermanently))) != srs_success) {
return srs_error_wrap(err, "handle tests redirects");
}
// test the http vhost.
if ((ret = http_api_mux->handle("error.srs.com/api/v1/tests/errors", new SrsGoApiError())) != ERROR_SUCCESS) {
return ret;
if ((err = http_api_mux->handle("error.srs.com/api/v1/tests/errors", new SrsGoApiError())) != srs_success) {
return srs_error_wrap(err, "handle tests errors for error.srs.com");
}
// TODO: FIXME: for console.
// TODO: FIXME: support reload.
std::string dir = _srs_config->get_http_stream_dir() + "/console";
if ((ret = http_api_mux->handle("/console/", new SrsHttpFileServer(dir))) != ERROR_SUCCESS) {
srs_error("http: mount console dir=%s failed. ret=%d", dir.c_str(), ret);
return ret;
if ((err = http_api_mux->handle("/console/", new SrsHttpFileServer(dir))) != srs_success) {
return srs_error_wrap(err, "handle console at %s", dir.c_str());
}
srs_trace("http: api mount /console to %s", dir.c_str());
return ret;
return err;
}
int SrsServer::ingest()
srs_error_t SrsServer::ingest()
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_INGEST
if ((ret = ingester->start()) != ERROR_SUCCESS) {
srs_error("start ingest streams failed. ret=%d", ret);
return ret;
return srs_error_new(ret, "ingest start");
}
#endif
return ret;
return srs_success;
}
int SrsServer::cycle()
srs_error_t SrsServer::cycle()
{
int ret = ERROR_SUCCESS;
ret = do_cycle();
srs_error_t err = do_cycle();
#ifdef SRS_AUTO_GPERF_MC
destroy();
@ -866,7 +843,7 @@ int SrsServer::cycle()
exit(0);
#endif
return ret;
return err;
}
@ -919,9 +896,10 @@ void SrsServer::on_signal(int signo)
}
}
int SrsServer::do_cycle()
srs_error_t SrsServer::do_cycle()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// find the max loop
int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES);
@ -943,8 +921,7 @@ int SrsServer::do_cycle()
// TODO: FIXME: use SrsHourGlass.
while (true) {
if (handler && (ret = handler->on_cycle()) != ERROR_SUCCESS) {
srs_error("cycle handle failed. ret=%d", ret);
return ret;
return srs_error_new(ret, "handle callback");
}
// the interval in config.
@ -958,14 +935,13 @@ int SrsServer::do_cycle()
// asprocess check.
if (asprocess && ::getppid() != ppid) {
srs_warn("asprocess ppid changed from %d to %d", ppid, ::getppid());
return ret;
return srs_error_new(ERROR_ASPROCESS_PPID, "asprocess ppid changed from %d to %d", ppid, ::getppid());
}
// gracefully quit for SIGINT or SIGTERM.
if (signal_gracefully_quit) {
srs_trace("cleanup for gracefully terminate.");
return ret;
return err;
}
// for gperf heap checker,
@ -976,7 +952,7 @@ int SrsServer::do_cycle()
#ifdef SRS_AUTO_GPERF_MC
if (signal_gmc_stop) {
srs_warn("gmc got singal to stop server.");
return ret;
return err;
}
#endif
@ -986,8 +962,7 @@ int SrsServer::do_cycle()
srs_info("get signal to persistence config to file.");
if ((ret = _srs_config->persistence()) != ERROR_SUCCESS) {
srs_error("persistence config to file failed. ret=%d", ret);
return ret;
return srs_error_new(ret, "config persistence to file");
}
srs_trace("persistence config to file success.");
}
@ -998,15 +973,14 @@ int SrsServer::do_cycle()
srs_info("get signal to reload the config.");
if ((ret = _srs_config->reload()) != ERROR_SUCCESS) {
srs_error("reload config failed. ret=%d", ret);
return ret;
return srs_error_new(ret, "config reload");
}
srs_trace("reload config success.");
}
// notice the stream sources to cycle.
if ((ret = SrsSource::cycle_all()) != ERROR_SUCCESS) {
return ret;
return srs_error_new(ret, "source cycle");
}
// update the cache time
@ -1056,10 +1030,10 @@ int SrsServer::do_cycle()
}
}
return ret;
return err;
}
int SrsServer::listen_rtmp()
srs_error_t SrsServer::listen_rtmp()
{
int ret = ERROR_SUCCESS;
@ -1078,15 +1052,14 @@ int SrsServer::listen_rtmp()
srs_parse_endpoint(ip_ports[i], ip, port);
if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
return ret;
srs_error_new(ret, "rtmp listen %s:%d", ip.c_str(), port);
}
}
return ret;
return srs_success;
}
int SrsServer::listen_http_api()
srs_error_t SrsServer::listen_http_api()
{
int ret = ERROR_SUCCESS;
@ -1102,15 +1075,14 @@ int SrsServer::listen_http_api()
srs_parse_endpoint(ep, ip, port);
if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
srs_error("HTTP api listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_new(ret, "http api listen %s:%d", ip.c_str(), port);
}
}
return ret;
return srs_success;
}
int SrsServer::listen_http_stream()
srs_error_t SrsServer::listen_http_stream()
{
int ret = ERROR_SUCCESS;
@ -1126,15 +1098,14 @@ int SrsServer::listen_http_stream()
srs_parse_endpoint(ep, ip, port);
if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
srs_error("HTTP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
return ret;
return srs_error_new(ret, "http stream listen %s:%d", ip.c_str(), port);
}
}
return ret;
return srs_success;
}
int SrsServer::listen_stream_caster()
srs_error_t SrsServer::listen_stream_caster()
{
int ret = ERROR_SUCCESS;
@ -1160,9 +1131,7 @@ int SrsServer::listen_stream_caster()
} else if (srs_stream_caster_is_flv(caster)) {
listener = new SrsHttpFlvListener(this, SrsListenerFlv, stream_caster);
} else {
ret = ERROR_STREAM_CASTER_ENGINE;
srs_error("unsupported stream caster %s. ret=%d", caster.c_str(), ret);
return ret;
return srs_error_new(ERROR_STREAM_CASTER_ENGINE, "invalid caster %s", caster.c_str());
}
srs_assert(listener != NULL);
@ -1170,20 +1139,17 @@ int SrsServer::listen_stream_caster()
int port = _srs_config->get_stream_caster_listen(stream_caster);
if (port <= 0) {
ret = ERROR_STREAM_CASTER_PORT;
srs_error("invalid stream caster port %d. ret=%d", port, ret);
return ret;
return srs_error_new(ERROR_STREAM_CASTER_PORT, "invalid port=%d", port);
}
// TODO: support listen at <[ip:]port>
if ((ret = listener->listen("0.0.0.0", port)) != ERROR_SUCCESS) {
srs_error("StreamCaster listen at port %d failed. ret=%d", port, ret);
return ret;
return srs_error_new(ret, "listen at %d", port);
}
}
#endif
return ret;
return srs_success;
}
void SrsServer::close_listeners(SrsListenerType type)
@ -1334,7 +1300,11 @@ void SrsServer::remove(ISrsConnection* c)
int SrsServer::on_reload_listen()
{
return listen();
// TODO: FIXME: Use error.
srs_error_t err = listen();
int ret = srs_error_code(err);
srs_freep(err);
return ret;
}
int SrsServer::on_reload_pid()
@ -1344,7 +1314,11 @@ int SrsServer::on_reload_pid()
pid_fd = -1;
}
return acquire_pid_file();
// TODO: FIXME: Use error.
srs_error_t err = acquire_pid_file();
int ret = srs_error_code(err);
srs_freep(err);
return ret;
}
int SrsServer::on_reload_vhost_added(std::string vhost)
@ -1377,7 +1351,11 @@ int SrsServer::on_reload_vhost_removed(std::string /*vhost*/)
int SrsServer::on_reload_http_api_enabled()
{
return listen_http_api();
// TODO: FIXME: Use error.
srs_error_t err = listen_http_api();
int ret = srs_error_code(err);
srs_freep(err);
return ret;
}
int SrsServer::on_reload_http_api_disabled()
@ -1388,7 +1366,11 @@ int SrsServer::on_reload_http_api_disabled()
int SrsServer::on_reload_http_stream_enabled()
{
return listen_http_stream();
// TODO: FIXME: Use error.
srs_error_t err = listen_http_stream();
int ret = srs_error_code(err);
srs_freep(err);
return ret;
}
int SrsServer::on_reload_http_stream_disabled()