1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-24 06:54:22 +00:00
srs/trunk/src/app/srs_app_server.cpp

1429 lines
42 KiB
C++
Raw Normal View History

2017-03-25 09:21:39 +00:00
/**
* The MIT License (MIT)
*
2017-03-25 13:29:29 +00:00
* Copyright (c) 2013-2017 OSSRS(winlin)
2017-03-25 09:21:39 +00:00
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
2013-11-23 03:36:07 +00:00
#include <srs_app_server.hpp>
2013-11-23 03:36:07 +00:00
#include <sys/types.h>
#include <signal.h>
2014-03-21 07:45:34 +00:00
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
2013-11-23 03:36:07 +00:00
#include <algorithm>
using namespace std;
2013-11-23 03:36:07 +00:00
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
2014-03-27 04:14:04 +00:00
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_http_api.hpp>
#include <srs_app_http_conn.hpp>
2014-04-07 01:07:12 +00:00
#include <srs_app_ingest.hpp>
#include <srs_app_source.hpp>
#include <srs_app_utility.hpp>
2014-05-19 09:39:01 +00:00
#include <srs_app_heartbeat.hpp>
#include <srs_app_mpegts_udp.hpp>
#include <srs_app_rtsp.hpp>
#include <srs_app_statistic.hpp>
#include <srs_app_caster_flv.hpp>
2015-06-07 07:13:41 +00:00
#include <srs_core_mem_watch.hpp>
2015-08-27 10:11:50 +00:00
#include <srs_kernel_consts.hpp>
2015-09-22 09:40:05 +00:00
#include <srs_app_kafka.hpp>
#include <srs_app_thread.hpp>
// system interval in ms,
2014-04-19 16:15:26 +00:00
// all resolution times should be multiples of the system interval,
// for example, system-interval is x=1s(1000ms),
// then rusage can be 3*x, for instance, 3*1=3s,
// the meminfo can be 6*x, for instance, 6*1=6s,
2015-11-11 02:37:50 +00:00
// for performance refine, @see: https://github.com/ossrs/srs/issues/194
// @remark, recommended to be 1000ms.
#define SRS_SYS_CYCLE_INTERVAL 1000
2014-04-19 13:23:34 +00:00
// update time interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_TIME_RESOLUTION_MS_TIMES
#define SRS_SYS_TIME_RESOLUTION_MS_TIMES 1
2014-04-19 13:23:34 +00:00
// update rusage interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_RUSAGE_RESOLUTION_TIMES
#define SRS_SYS_RUSAGE_RESOLUTION_TIMES 3
2014-04-19 13:23:34 +00:00
// update the rtmp server network info interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES
#define SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES 3
2014-04-19 13:23:34 +00:00
// update cpu stat interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_CPU_STAT_RESOLUTION_TIMES
#define SRS_SYS_CPU_STAT_RESOLUTION_TIMES 3
2013-11-23 03:36:07 +00:00
// update the disk iops interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_DISK_STAT_RESOLUTION_TIMES
#define SRS_SYS_DISK_STAT_RESOLUTION_TIMES 6
2014-04-19 16:15:26 +00:00
// update meminfo interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_MEMINFO_RESOLUTION_TIMES
#define SRS_SYS_MEMINFO_RESOLUTION_TIMES 6
2014-04-19 16:15:26 +00:00
2014-05-08 07:45:51 +00:00
// update platform info interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES
#define SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES 9
2014-05-08 07:45:51 +00:00
// update network devices info interval:
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES
#define SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES 9
2017-03-25 09:21:39 +00:00
// Translate a listener type into the label printed in the "listen at" log lines.
// Types without a dedicated label fall through to the "UNKONWN" text.
std::string srs_listener_type2string(SrsListenerType type)
{
    if (type == SrsListenerRtmpStream) {
        return "RTMP";
    }
    if (type == SrsListenerHttpApi) {
        return "HTTP-API";
    }
    if (type == SrsListenerHttpStream) {
        return "HTTP-Server";
    }
    if (type == SrsListenerMpegTsOverUdp) {
        return "MPEG-TS over UDP";
    }
    if (type == SrsListenerRtsp) {
        return "RTSP";
    }
    if (type == SrsListenerFlv) {
        return "HTTP-FLV";
    }

    // no label for this type.
    return "UNKONWN";
}
// Remember the owning server and the listener type; the port starts at 0.
SrsListener::SrsListener(SrsServer* svr, SrsListenerType t)
{
    server = svr;
    type = t;
    port = 0;
}
SrsListener::~SrsListener()
{
    // the base listener releases nothing here.
}
// Get the listener type that was passed to the constructor.
SrsListenerType SrsListener::listen_type()
{
    return type;
}
2015-09-22 00:57:31 +00:00
// The TCP listener object is not created here; listen() allocates it.
SrsBufferListener::SrsBufferListener(SrsServer* svr, SrsListenerType t)
    : SrsListener(svr, t)
{
    listener = NULL;
}
2015-09-22 00:57:31 +00:00
// Free the TCP listener allocated by listen(), if any.
SrsBufferListener::~SrsBufferListener()
{
    srs_freep(listener);
}
2015-09-22 00:57:31 +00:00
// Start listening on tcp://i:p.
// Any listener created by a previous call is freed and replaced.
// @param i the ip to bind.
// @param p the port to bind.
// @return ERROR_SUCCESS, or the error code reported by the TCP listener.
int SrsBufferListener::listen(string i, int p)
{
    ip = i;
    port = p;

    // replace the previous listener, if any.
    srs_freep(listener);
    listener = new SrsTcpListener(this, ip, port);

    int ret = listener->listen();
    if (ret != ERROR_SUCCESS) {
        srs_error("tcp listen failed. ret=%d", ret);
        return ret;
    }

    srs_info("listen thread current_cid=%d, "
        "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
        _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);

    srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());

    return ret;
}
// Hand the accepted connection to the server, tagged with this listener's type.
// @return the code returned by SrsServer::accept_client; a failure is logged as a warning.
int SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
{
    int ret = server->accept_client(type, stfd);

    if (ret != ERROR_SUCCESS) {
        srs_warn("accept client error. ret=%d", ret);
    }

    return ret;
}
#ifdef SRS_AUTO_STREAM_CASTER
// Create the listener for the RTSP stream caster described by the config directive c.
// The TCP listener itself is created later by listen().
SrsRtspListener::SrsRtspListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t)
{
    listener = NULL;
    // keep the pointer well-defined, so the destructor never frees garbage
    // when the assertion below is compiled out and the type is unexpected.
    caster = NULL;

    // the caller already ensure the type is ok,
    // we just assert here for unknown stream caster.
    srs_assert(type == SrsListenerRtsp);
    if (type == SrsListenerRtsp) {
        caster = new SrsRtspCaster(c);
    }
}
// Release both the caster created by the constructor and the TCP listener created by listen().
SrsRtspListener::~SrsRtspListener()
{
    srs_freep(caster);
    srs_freep(listener);
}
// Start listening for RTSP clients on tcp://i:p.
// Any listener created by a previous call is freed and replaced.
// @return ERROR_SUCCESS, or the error code reported by the TCP listener.
int SrsRtspListener::listen(string i, int p)
{
    // the type is validated by the caller, the assert only guards
    // against an unknown stream caster.
    srs_assert(type == SrsListenerRtsp);

    ip = i;
    port = p;

    srs_freep(listener);
    listener = new SrsTcpListener(this, ip, port);

    int ret = listener->listen();
    if (ret != ERROR_SUCCESS) {
        srs_error("rtsp caster listen failed. ret=%d", ret);
        return ret;
    }

    srs_info("listen thread listen at port=%d, type=%d, fd=%d started success, ep=%s:%d", port, type, listener->fd(), ip.c_str(), port);

    srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());

    return ret;
}
// Forward the accepted connection to the RTSP caster.
// @return the code returned by the caster; a failure is logged as a warning.
int SrsRtspListener::on_tcp_client(srs_netfd_t stfd)
{
    int ret = caster->on_tcp_client(stfd);

    if (ret != ERROR_SUCCESS) {
        srs_warn("accept client error. ret=%d", ret);
    }

    return ret;
}
// Create the listener for the HTTP-FLV stream caster described by the config directive c.
// The TCP listener itself is created later by listen().
SrsHttpFlvListener::SrsHttpFlvListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t)
{
    listener = NULL;
    // keep the pointer well-defined, so the destructor never frees garbage
    // when the assertion below is compiled out and the type is unexpected.
    caster = NULL;

    // the caller already ensure the type is ok,
    // we just assert here for unknown stream caster.
    srs_assert(type == SrsListenerFlv);
    if (type == SrsListenerFlv) {
        caster = new SrsAppCasterFlv(c);
    }
}
// Release both the caster created by the constructor and the TCP listener created by listen().
SrsHttpFlvListener::~SrsHttpFlvListener()
{
    srs_freep(caster);
    srs_freep(listener);
}
// Initialize the FLV caster, then start listening on tcp://i:p.
// Any listener created by a previous call is freed and replaced.
// @return ERROR_SUCCESS, or the error code of the caster initialization or the TCP listener.
int SrsHttpFlvListener::listen(string i, int p)
{
    // the type is validated by the caller, the assert only guards
    // against an unknown stream caster.
    srs_assert(type == SrsListenerFlv);

    ip = i;
    port = p;

    // the caster must be ready before any client is accepted.
    int ret = caster->initialize();
    if (ret != ERROR_SUCCESS) {
        return ret;
    }

    srs_freep(listener);
    listener = new SrsTcpListener(this, ip, port);

    ret = listener->listen();
    if (ret != ERROR_SUCCESS) {
        srs_error("flv caster listen failed. ret=%d", ret);
        return ret;
    }

    srs_info("listen thread listen at port=%d, type=%d, fd=%d started success, ep=%s:%d", port, type, listener->fd(), ip.c_str(), port);

    srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());

    return ret;
}
// Forward the accepted connection to the FLV caster.
// @return the code returned by the caster; a failure is logged as a warning.
int SrsHttpFlvListener::on_tcp_client(srs_netfd_t stfd)
{
    int ret = caster->on_tcp_client(stfd);

    if (ret != ERROR_SUCCESS) {
        srs_warn("accept client error. ret=%d", ret);
    }

    return ret;
}
#endif
// Keep the UDP handler c as the caster; the UDP listener is created later by listen().
SrsUdpStreamListener::SrsUdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c)
    : SrsListener(svr, t)
{
    caster = c;
    listener = NULL;
}
// Only the UDP listener is freed here; the caster is not released by this class.
SrsUdpStreamListener::~SrsUdpStreamListener()
{
    srs_freep(listener);
}
// Start listening on udp://i:p and tell the caster about the new socket.
// Any listener created by a previous call is freed and replaced.
// @return ERROR_SUCCESS, or the error code of the UDP listener or of the caster notification.
int SrsUdpStreamListener::listen(string i, int p)
{
    // the type is validated by the caller, the assert only guards
    // against an unknown stream caster.
    srs_assert(type == SrsListenerMpegTsOverUdp);

    ip = i;
    port = p;

    srs_freep(listener);
    listener = new SrsUdpListener(caster, ip, port);

    int ret = listener->listen();
    if (ret != ERROR_SUCCESS) {
        srs_error("udp caster listen failed. ret=%d", ret);
        return ret;
    }

    srs_info("listen thread current_cid=%d, "
        "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
        _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);

    // the handler must learn about the fd of the fresh listener.
    ret = caster->on_stfd_change(listener->stfd());
    if (ret != ERROR_SUCCESS) {
        srs_error("notify handler fd changed. ret=%d", ret);
        return ret;
    }

    srs_trace("%s listen at udp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());

    return ret;
}
#ifdef SRS_AUTO_STREAM_CASTER
// Build the MPEG-TS over UDP caster from the config directive c.
// The base class is constructed with a NULL handler, which is replaced here.
SrsUdpCasterListener::SrsUdpCasterListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c)
    : SrsUdpStreamListener(svr, t, NULL)
{
    // the type is validated by the caller, the assert only guards
    // against an unknown stream caster.
    srs_assert(type == SrsListenerMpegTsOverUdp);

    if (type == SrsListenerMpegTsOverUdp) {
        caster = new SrsMpegtsOverUdp(c);
    }
}
// Free the caster allocated by the constructor.
SrsUdpCasterListener::~SrsUdpCasterListener()
{
    srs_freep(caster);
}
#endif
// The manager instance, set by the constructor, so that the static
// signal handler sig_catcher() can reach the write end of the pipe.
SrsSignalManager* SrsSignalManager::instance = NULL;
// Register this object as the global instance and create the "signal" coroutine.
// The pipe is not created here: both ends stay -1 until initialize() is called.
SrsSignalManager::SrsSignalManager(SrsServer* s)
{
    SrsSignalManager::instance = this;
    server = s;

    sig_pipe[0] = -1;
    sig_pipe[1] = -1;
    signal_read_stfd = NULL;

    trd = new SrsSTCoroutine("signal", this);
}
// Close the wrapped read end, both raw pipe descriptors and free the coroutine.
SrsSignalManager::~SrsSignalManager()
{
    srs_close_stfd(signal_read_stfd);

    // the constructor marks an unopened end with -1, and 0 is a valid
    // descriptor, so compare against 0 inclusively.
    // NOTE(review): srs_close_stfd() presumably closes sig_pipe[0] already,
    // which would make the first close redundant - confirm.
    if (sig_pipe[0] >= 0) {
        ::close(sig_pipe[0]);
    }
    if (sig_pipe[1] >= 0) {
        ::close(sig_pipe[1]);
    }

    srs_freep(trd);
}
2017-06-10 07:20:48 +00:00
// Create the signal pipe and wrap its read end for the coroutine.
// @return srs_success, or an ERROR_SYSTEM_CREATE_PIPE error when either step fails.
srs_error_t SrsSignalManager::initialize()
{
    // create the pipe carrying the signal numbers.
    int r0 = pipe(sig_pipe);
    if (r0 < 0) {
        return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "create pipe");
    }

    // wrap the read end, which cycle() reads from.
    signal_read_stfd = srs_netfd_open(sig_pipe[0]);
    if (signal_read_stfd == NULL) {
        return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "open pipe");
    }

    return srs_success;
}
// Install sig_catcher() for the reload, gracefully-quit, SIGINT and reopen-log
// signals, then start the coroutine which consumes the signal pipe.
// @return srs_success, or the wrapped error of the coroutine start.
srs_error_t SrsSignalManager::start()
{
    /**
     * Note that if multiple processes are used (see below),
     * the signal pipe should be initialized after the fork(2) call
     * so that each process has its own private pipe.
     */
    const int signals[] = {
        SRS_SIGNAL_RELOAD,
        SRS_SIGNAL_GRACEFULLY_QUIT,
        SIGINT,
        SRS_SIGNAL_REOPEN_LOG
    };
    const int nb_signals = (int)(sizeof(signals) / sizeof(signals[0]));

    // every signal gets the same handler, an empty mask and no flags.
    struct sigaction sa;
    for (int n = 0; n < nb_signals; n++) {
        sa.sa_handler = SrsSignalManager::sig_catcher;
        sigemptyset(&sa.sa_mask);
        sa.sa_flags = 0;
        sigaction(signals[n], &sa, NULL);
    }

    srs_trace("signal installed, reload=%d, reopen=%d, grace_quit=%d",
        SRS_SIGNAL_RELOAD, SRS_SIGNAL_REOPEN_LOG, SRS_SIGNAL_GRACEFULLY_QUIT);

    srs_error_t err = trd->start();
    if (err != srs_success) {
        return srs_error_wrap(err, "signal manager");
    }

    return err;
}
// The coroutine body: read signal numbers from the pipe one by one
// and deliver each of them to the server.
// @return the wrapped error from trd->pull(), which is the only way out of the loop.
srs_error_t SrsSignalManager::cycle()
{
    srs_error_t err = srs_success;

    for (;;) {
        err = trd->pull();
        if (err != srs_success) {
            return srs_error_wrap(err, "signal manager");
        }

        // wait without timeout for the next signal number written by sig_catcher().
        // NOTE(review): the result of srs_read() is not checked, so signo
        // is indeterminate if the read fails - confirm this cannot happen.
        int signo;
        srs_read(signal_read_stfd, &signo, sizeof(int), SRS_UTIME_NO_TIMEOUT);

        // handle the signal in this coroutine, not in the signal handler.
        server->on_signal(signo);
    }

    return err;
}
// The signal handler: push the signal number into the write end of the pipe,
// where cycle() picks it up. The result of write() is ignored.
void SrsSignalManager::sig_catcher(int signo)
{
    // write() may clobber errno, so keep the interrupted code's value.
    const int saved_errno = errno;

    // write() is reentrant/async-safe.
    write(SrsSignalManager::instance->sig_pipe[1], &signo, sizeof(int));

    errno = saved_errno;
}
2015-03-31 10:06:55 +00:00
ISrsServerCycle::ISrsServerCycle()
{
    // nothing to set up here.
}
ISrsServerCycle::~ISrsServerCycle()
{
    // nothing to release here.
}
2013-11-23 03:36:07 +00:00
// Reset the signal flags and the pid file descriptor, record the parent pid
// and allocate the managers and the http components.
SrsServer::SrsServer()
{
    // no signal is pending at startup.
    signal_gracefully_quit = false;
    signal_gmc_stop = false;
    signal_persistence_config = false;
    signal_reload = false;

    // -1 means the pid file is not acquired yet.
    pid_fd = -1;
    handler = NULL;
    ppid = ::getppid();

    signal_manager = new SrsSignalManager(this);
    conn_manager = new SrsCoroutineManager();

    // these objects are only allocated here, their initialize()
    // is invoked later, for some global instance is not ready now.
    http_api_mux = new SrsHttpServeMux();
    http_server = new SrsHttpServer(this);
    http_heartbeat = new SrsHttpHeartbeat();
#ifdef SRS_AUTO_INGEST
    ingester = new SrsIngester();
#endif
}
// All the cleanup is delegated to destroy().
SrsServer::~SrsServer()
{
    destroy();
}
void SrsServer::destroy()
2013-11-23 03:36:07 +00:00
{
srs_warn("start destroy server");
2014-03-18 03:32:58 +00:00
dispose();
2015-06-07 01:27:47 +00:00
srs_freep(http_api_mux);
srs_freep(http_server);
2014-05-19 09:39:01 +00:00
srs_freep(http_heartbeat);
2017-03-25 09:21:39 +00:00
#ifdef SRS_AUTO_INGEST
srs_freep(ingester);
#endif
if (pid_fd > 0) {
::close(pid_fd);
pid_fd = -1;
}
srs_freep(signal_manager);
srs_freep(conn_manager);
2013-11-23 03:36:07 +00:00
}
void SrsServer::dispose()
{
_srs_config->unsubscribe(this);
// prevent fresh clients.
close_listeners(SrsListenerRtmpStream);
close_listeners(SrsListenerHttpApi);
close_listeners(SrsListenerHttpStream);
close_listeners(SrsListenerMpegTsOverUdp);
close_listeners(SrsListenerRtsp);
close_listeners(SrsListenerFlv);
2015-12-11 02:30:13 +00:00
// @remark don't dispose ingesters, for too slow.
2015-09-22 09:46:07 +00:00
#ifdef SRS_AUTO_KAFKA
2016-12-08 03:44:49 +00:00
srs_dispose_kafka();
2015-09-22 09:46:07 +00:00
#endif
2015-09-22 09:40:05 +00:00
2015-12-11 02:30:13 +00:00
// dispose the source for hls and dvr.
SrsSource::dispose_all();
2015-12-11 02:30:13 +00:00
// @remark don't dispose all connections, for too slow.
2017-03-25 09:21:39 +00:00
#ifdef SRS_AUTO_MEM_WATCH
2015-06-07 07:13:41 +00:00
srs_memory_report();
#endif
}
2017-06-09 03:50:35 +00:00
// Subscribe to the config, then initialize the optional cycle handler,
// the http api mux and the http server, in that order.
// @param cycle_handler the optional callback object, may be NULL.
// @return srs_success, or the wrapped error of the first step which failed.
srs_error_t SrsServer::initialize(ISrsServerCycle* cycle_handler)
{
    srs_error_t err = srs_success;

    // ensure the time is ok.
    srs_update_system_time_ms();

    // the main objects(server, config, log, context) never subscribe
    // in the constructor, they subscribe in the initialize method.
    srs_assert(_srs_config);
    _srs_config->subscribe(this);

    handler = cycle_handler;
    if (handler) {
        err = handler->initialize();
        if (err != srs_success) {
            return srs_error_wrap(err, "handler initialize");
        }
    }

    err = http_api_mux->initialize();
    if (err != srs_success) {
        return srs_error_wrap(err, "http api initialize");
    }

    err = http_server->initialize();
    if (err != srs_success) {
        return srs_error_wrap(err, "http server initialize");
    }

    return err;
}
2017-06-10 06:29:41 +00:00
// Initialize the st library and whatever depends on it: validate the
// configured max connections, generate the log context id, initialize kafka
// (when enabled) and check the asprocess precondition.
// @return srs_success, or an error describing the step which failed.
srs_error_t SrsServer::initialize_st()
{
    // init st
    srs_error_t err = srs_st_init();
    if (err != srs_success) {
        return srs_error_wrap(err, "initialize st failed");
    }

    // @remark, st alloc segment use mmap, which only support 32757 threads,
    // if need to support more, for instance, 100k threads, define the macro MALLOC_STACK.
    // TODO: FIXME: maybe can use "sysctl vm.max_map_count" to refine.
#define __MMAP_MAX_CONNECTIONS 32756
    if (_srs_config->get_max_connections() > __MMAP_MAX_CONNECTIONS) {
        srs_error("st mmap for stack allocation must <= %d threads, "
            "@see Makefile of st for MALLOC_STACK, please build st manually by "
            "\"make EXTRA_CFLAGS=-DMALLOC_STACK linux-debug\"", __MMAP_MAX_CONNECTIONS);
        return srs_error_new(ERROR_ST_EXCEED_THREADS, "%d exceed max %d threads",
            _srs_config->get_max_connections(), __MMAP_MAX_CONNECTIONS);
    }

    // set current log id.
    _srs_context->generate_id();

    // initialize the components that depend on st.
#ifdef SRS_AUTO_KAFKA
    err = srs_initialize_kafka();
    if (err != srs_success) {
        return srs_error_wrap(err, "initialize kafka");
    }
#endif

    // asprocess is rejected when the parent pid recorded by the constructor is 1.
    bool asprocess = _srs_config->get_asprocess();
    if (asprocess && ppid == 1) {
        return srs_error_new(ERROR_SYSTEM_ASSERT_FAILED, "ppid=%d illegal for asprocess", ppid);
    }

    srs_trace("server main cid=%d, pid=%d, ppid=%d, asprocess=%d",
        _srs_context->get_id(), ::getpid(), ppid, asprocess);

    return err;
}
2017-06-10 07:20:48 +00:00
// Let the signal manager create its pipe.
// @return whatever SrsSignalManager::initialize() returns.
srs_error_t SrsServer::initialize_signal()
{
    srs_error_t err = signal_manager->initialize();
    return err;
}
2017-06-10 07:20:48 +00:00
// Lock the opened pid file fd, replace its content with the text pid and
// mark the fd as close-on-exec. The fd is never closed here, the caller owns it.
// @param fd the opened pid file.
// @param pid_file the path of the pid file, only used in error messages.
// @param pid the pid of the current process as text.
// @return srs_success, or an error describing the step which failed.
static srs_error_t srs_pid_file_lock_and_write(int fd, const std::string& pid_file, const std::string& pid)
{
    // require write lock
    struct flock lock;

    lock.l_type = F_WRLCK; // F_RDLCK, F_WRLCK, F_UNLCK
    lock.l_start = 0; // type offset, relative to l_whence
    lock.l_whence = SEEK_SET; // SEEK_SET, SEEK_CUR, SEEK_END
    lock.l_len = 0;

    if (fcntl(fd, F_SETLK, &lock) == -1) {
        // the lock is held by another process.
        if (errno == EACCES || errno == EAGAIN) {
            srs_error("srs is already running!");
            return srs_error_new(ERROR_SYSTEM_PID_ALREADY_RUNNING, "srs is already running");
        }
        return srs_error_new(ERROR_SYSTEM_PID_LOCK, "access to pid=%s", pid_file.c_str());
    }

    // truncate file
    if (ftruncate(fd, 0) != 0) {
        return srs_error_new(ERROR_SYSTEM_PID_TRUNCATE_FILE, "truncate pid file=%s", pid_file.c_str());
    }

    // write the pid
    if (write(fd, pid.c_str(), pid.length()) != (int)pid.length()) {
        // the pid is text here, so it must be formatted by %s.
        return srs_error_new(ERROR_SYSTEM_PID_WRITE_FILE, "write pid=%s to file=%s", pid.c_str(), pid_file.c_str());
    }

    // auto close when fork child process.
    int val;
    if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
        return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fcntl fd=%d", fd);
    }
    val |= FD_CLOEXEC;
    if (fcntl(fd, F_SETFD, val) < 0) {
        return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "lock file=%s fd=%d", pid_file.c_str(), fd);
    }

    return srs_success;
}

// Create the configured pid file if needed, take an exclusive write lock on it
// and write the pid of this process into it. On success the descriptor is kept
// in pid_fd; on any failure after the open it is closed again.
// Nothing is done when the config reports dolphin mode.
// @return srs_success, or an error describing the step which failed.
srs_error_t SrsServer::acquire_pid_file()
{
    // when srs in dolphin mode, no need the pid file.
    if (_srs_config->is_dolphin()) {
        return srs_success;
    }

    std::string pid_file = _srs_config->get_pid_file();

    // -rw-r--r--
    // 644
    int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

    // open pid file
    int fd;
    if ((fd = ::open(pid_file.c_str(), O_WRONLY | O_CREAT, mode)) == -1) {
        return srs_error_new(ERROR_SYSTEM_PID_ACQUIRE, "open pid file=%s", pid_file.c_str());
    }

    string pid = srs_int2str(getpid());

    srs_error_t err = srs_pid_file_lock_and_write(fd, pid_file, pid);
    if (err != srs_success) {
        // the error object is already built, so closing cannot disturb it.
        ::close(fd);
        return err;
    }

    srs_trace("write pid=%s to %s success!", pid.c_str(), pid_file.c_str());
    pid_fd = fd;

    return srs_success;
}
2017-06-10 07:20:48 +00:00
// Start the rtmp, http api, http stream and stream caster listeners,
// then the connection manager, stopping at the first failure.
// @return srs_success, or the wrapped error of the step which failed.
srs_error_t SrsServer::listen()
{
    srs_error_t err = srs_success;

    err = listen_rtmp();
    if (err != srs_success) {
        return srs_error_wrap(err, "rtmp listen");
    }

    err = listen_http_api();
    if (err != srs_success) {
        return srs_error_wrap(err, "http api listen");
    }

    err = listen_http_stream();
    if (err != srs_success) {
        return srs_error_wrap(err, "http stream listen");
    }

    err = listen_stream_caster();
    if (err != srs_success) {
        return srs_error_wrap(err, "stream caster listen");
    }

    err = conn_manager->start();
    if (err != srs_success) {
        return srs_error_wrap(err, "connection manager");
    }

    return err;
}
2017-06-10 07:20:48 +00:00
// Start the signal manager, which installs the handlers and starts its coroutine.
// @return srs_success, or the wrapped error of the signal manager.
srs_error_t SrsServer::register_signal()
{
    srs_error_t err = signal_manager->start();

    if (err != srs_success) {
        return srs_error_wrap(err, "signal manager start");
    }

    return err;
}
2017-06-10 07:20:48 +00:00
// Mount all the handlers of the http api on the mux: the not-found fallback,
// the /api/ tree, the test endpoints and the file server of the console.
// The registration stops at the first pattern which fails.
// @return srs_success, or the wrapped error naming the failed pattern.
srs_error_t SrsServer::http_handle()
{
    srs_error_t err = srs_success;

    err = http_api_mux->handle("/", new SrsHttpNotFoundHandler());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle not found");
    }

    err = http_api_mux->handle("/api/", new SrsGoApiApi());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle api");
    }

    err = http_api_mux->handle("/api/v1/", new SrsGoApiV1());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle v1");
    }

    err = http_api_mux->handle("/api/v1/versions", new SrsGoApiVersion());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle versions");
    }

    err = http_api_mux->handle("/api/v1/summaries", new SrsGoApiSummaries());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle summaries");
    }

    err = http_api_mux->handle("/api/v1/rusages", new SrsGoApiRusages());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle rusages");
    }

    err = http_api_mux->handle("/api/v1/self_proc_stats", new SrsGoApiSelfProcStats());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle self proc stats");
    }

    err = http_api_mux->handle("/api/v1/system_proc_stats", new SrsGoApiSystemProcStats());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle system proc stats");
    }

    err = http_api_mux->handle("/api/v1/meminfos", new SrsGoApiMemInfos());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle meminfos");
    }

    err = http_api_mux->handle("/api/v1/authors", new SrsGoApiAuthors());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle authors");
    }

    err = http_api_mux->handle("/api/v1/features", new SrsGoApiFeatures());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle features");
    }

    err = http_api_mux->handle("/api/v1/vhosts/", new SrsGoApiVhosts());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle vhosts");
    }

    err = http_api_mux->handle("/api/v1/streams/", new SrsGoApiStreams());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle streams");
    }

    err = http_api_mux->handle("/api/v1/clients/", new SrsGoApiClients());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle clients");
    }

    err = http_api_mux->handle("/api/v1/raw", new SrsGoApiRaw(this));
    if (err != srs_success) {
        return srs_error_wrap(err, "handle raw");
    }

    // test the request info.
    err = http_api_mux->handle("/api/v1/tests/requests", new SrsGoApiRequests());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle tests requests");
    }

    // test the error code response.
    err = http_api_mux->handle("/api/v1/tests/errors", new SrsGoApiError());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle tests errors");
    }

    // test the redirect mechanism.
    err = http_api_mux->handle("/api/v1/tests/redirects", new SrsHttpRedirectHandler("/api/v1/tests/errors", SRS_CONSTS_HTTP_MovedPermanently));
    if (err != srs_success) {
        return srs_error_wrap(err, "handle tests redirects");
    }

    // test the http vhost.
    err = http_api_mux->handle("error.srs.com/api/v1/tests/errors", new SrsGoApiError());
    if (err != srs_success) {
        return srs_error_wrap(err, "handle tests errors for error.srs.com");
    }

    // TODO: FIXME: for console.
    // TODO: FIXME: support reload.
    std::string dir = _srs_config->get_http_stream_dir() + "/console";
    err = http_api_mux->handle("/console/", new SrsHttpFileServer(dir));
    if (err != srs_success) {
        return srs_error_wrap(err, "handle console at %s", dir.c_str());
    }
    srs_trace("http: api mount /console to %s", dir.c_str());

    return err;
}
2017-06-10 07:20:48 +00:00
// Start the ingester when the ingest feature is compiled in; do nothing otherwise.
// @return srs_success, or the wrapped error of the ingester.
srs_error_t SrsServer::ingest()
{
    srs_error_t err = srs_success;

#ifdef SRS_AUTO_INGEST
    err = ingester->start();
    if (err != srs_success) {
        return srs_error_wrap(err, "ingest start");
    }
#endif

    return err;
}
2017-06-10 07:20:48 +00:00
// Run the main loop until it returns, then shut down.
// With gperf memory check enabled, the server is destroyed and the result of
// do_cycle() is returned. Otherwise the server is disposed and the process
// exits with code 0, so the result of do_cycle() never reaches the caller.
srs_error_t SrsServer::cycle()
{
    srs_error_t err = do_cycle();

#ifdef SRS_AUTO_GPERF_MC
    destroy();

    // remark, for gmc, never invoke the exit().
    srs_warn("sleep a long time for system st-threads to cleanup.");
    srs_usleep(3 * 1000 * 1000);
    srs_warn("system quit");
#else
    // quit normally, dispose() does the necessary cleanup.
    srs_warn("main cycle terminated, system quit normally.");
    dispose();
    srs_trace("srs terminated");

    // free the globals, for valgrind to detect.
    srs_freep(_srs_config);
    srs_freep(_srs_log);

    exit(0);
#endif

    return err;
}
void SrsServer::on_signal(int signo)
2017-03-25 09:21:39 +00:00
{
if (signo == SRS_SIGNAL_RELOAD) {
signal_reload = true;
return;
}
#ifndef SRS_AUTO_GPERF_MC
if (signo == SRS_SIGNAL_REOPEN_LOG) {
_srs_log->reopen();
srs_warn("reopen log file");
return;
}
#endif
#ifdef SRS_AUTO_GPERF_MC
if (signo == SRS_SIGNAL_REOPEN_LOG) {
signal_gmc_stop = true;
srs_warn("for gmc, the SIGUSR1 used as SIGINT");
return;
}
#endif
if (signo == SRS_SIGNAL_PERSISTENCE_CONFIG) {
signal_persistence_config = true;
return;
}
if (signo == SIGINT) {
#ifdef SRS_AUTO_GPERF_MC
srs_trace("gmc is on, main cycle will terminate normally.");
signal_gmc_stop = true;
#else
srs_trace("user terminate program");
#ifdef SRS_AUTO_MEM_WATCH
2015-06-07 07:13:41 +00:00
srs_memory_report();
#endif
exit(0);
#endif
return;
}
2015-08-27 10:11:50 +00:00
if (signo == SRS_SIGNAL_GRACEFULLY_QUIT && !signal_gracefully_quit) {
srs_trace("user terminate program, gracefully quit.");
signal_gracefully_quit = true;
return;
}
}
2017-06-10 07:20:48 +00:00
srs_error_t SrsServer::do_cycle()
2014-04-07 05:13:57 +00:00
{
2017-06-10 07:20:48 +00:00
srs_error_t err = srs_success;
2014-04-07 00:41:32 +00:00
2014-04-19 13:23:34 +00:00
// find the max loop
int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES);
#ifdef SRS_AUTO_STAT
2014-04-19 13:23:34 +00:00
max = srs_max(max, SRS_SYS_RUSAGE_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_CPU_STAT_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_DISK_STAT_RESOLUTION_TIMES);
2014-04-19 16:15:26 +00:00
max = srs_max(max, SRS_SYS_MEMINFO_RESOLUTION_TIMES);
2014-05-08 07:45:51 +00:00
max = srs_max(max, SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES);
#endif
2014-04-19 13:23:34 +00:00
2016-09-23 07:00:50 +00:00
// for asprocess.
bool asprocess = _srs_config->get_asprocess();
2014-03-18 03:32:58 +00:00
// the deamon thread, update the time cache
// TODO: FIXME: use SrsHourGlass.
2014-03-18 03:32:58 +00:00
while (true) {
2017-06-11 01:40:07 +00:00
if (handler && (err = handler->on_cycle()) != srs_success) {
return srs_error_wrap(err, "handle callback");
2015-03-31 10:06:55 +00:00
}
2017-03-25 09:21:39 +00:00
2014-05-19 09:39:01 +00:00
// the interval in config.
int heartbeat_max_resolution = (int)(_srs_config->get_heartbeat_interval() / SRS_SYS_CYCLE_INTERVAL);
2014-05-19 09:39:01 +00:00
// dynamic fetch the max.
2015-09-23 09:21:57 +00:00
int dynamic_max = srs_max(max, heartbeat_max_resolution);
2014-05-19 09:39:01 +00:00
2015-09-23 09:21:57 +00:00
for (int i = 0; i < dynamic_max; i++) {
srs_usleep(SRS_SYS_CYCLE_INTERVAL * 1000);
2016-09-23 07:00:50 +00:00
// asprocess check.
if (asprocess && ::getppid() != ppid) {
2017-06-10 07:20:48 +00:00
return srs_error_new(ERROR_ASPROCESS_PPID, "asprocess ppid changed from %d to %d", ppid, ::getppid());
2016-09-23 07:00:50 +00:00
}
// gracefully quit for SIGINT or SIGTERM.
if (signal_gracefully_quit) {
srs_trace("cleanup for gracefully terminate.");
2017-06-10 07:20:48 +00:00
return err;
}
2017-03-25 09:21:39 +00:00
// for gperf heap checker,
// @see: research/gperftools/heap-checker/heap_checker.cc
// if user interrupt the program, exit to check mem leak.
// but, if gperf, use reload to ensure main return normally,
// because directly exit will cause core-dump.
#ifdef SRS_AUTO_GPERF_MC
2014-04-19 13:23:34 +00:00
if (signal_gmc_stop) {
srs_warn("gmc got singal to stop server.");
2017-06-10 07:20:48 +00:00
return err;
2014-04-19 13:23:34 +00:00
}
#endif
2014-03-18 03:32:58 +00:00
// do persistence config to file.
if (signal_persistence_config) {
signal_persistence_config = false;
srs_info("get signal to persistence config to file.");
if ((err = _srs_config->persistence()) != srs_success) {
return srs_error_wrap(err, "config persistence to file");
}
srs_trace("persistence config to file success.");
}
2017-03-25 09:21:39 +00:00
// do reload the config.
if (signal_reload) {
signal_reload = false;
srs_info("get signal to reload the config.");
if ((err = _srs_config->reload()) != srs_success) {
return srs_error_wrap(err, "config reload");
}
srs_trace("reload config success.");
}
// notice the stream sources to cycle.
2017-06-11 01:40:07 +00:00
if ((err = SrsSource::cycle_all()) != srs_success) {
return srs_error_wrap(err, "source cycle");
}
// update the cache time
2014-04-19 16:15:26 +00:00
if ((i % SRS_SYS_TIME_RESOLUTION_MS_TIMES) == 0) {
srs_info("update current time cache.");
2014-04-19 13:23:34 +00:00
srs_update_system_time_ms();
}
#ifdef SRS_AUTO_STAT
2014-04-19 16:15:26 +00:00
if ((i % SRS_SYS_RUSAGE_RESOLUTION_TIMES) == 0) {
srs_info("update resource info, rss.");
2014-04-19 13:23:34 +00:00
srs_update_system_rusage();
}
2014-04-19 16:15:26 +00:00
if ((i % SRS_SYS_CPU_STAT_RESOLUTION_TIMES) == 0) {
srs_info("update cpu info, cpu usage.");
2014-04-19 13:43:13 +00:00
srs_update_proc_stat();
2014-03-18 03:32:58 +00:00
}
if ((i % SRS_SYS_DISK_STAT_RESOLUTION_TIMES) == 0) {
srs_info("update disk info, disk iops.");
srs_update_disk_stat();
}
2014-04-19 16:15:26 +00:00
if ((i % SRS_SYS_MEMINFO_RESOLUTION_TIMES) == 0) {
srs_info("update memory info, usage/free.");
2014-04-19 16:15:26 +00:00
srs_update_meminfo();
}
2014-05-08 07:45:51 +00:00
if ((i % SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES) == 0) {
srs_info("update platform info, uptime/load.");
2014-05-08 07:45:51 +00:00
srs_update_platform_info();
}
if ((i % SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES) == 0) {
srs_info("update network devices info.");
srs_update_network_devices();
}
if ((i % SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES) == 0) {
2015-03-08 04:55:40 +00:00
srs_info("update network server kbps info.");
resample_kbps();
}
2014-05-19 09:39:01 +00:00
if (_srs_config->get_heartbeat_enabled()) {
if ((i % heartbeat_max_resolution) == 0) {
srs_info("do http heartbeat, for internal server to report.");
2014-05-19 09:39:01 +00:00
http_heartbeat->heartbeat();
}
}
#endif
srs_info("server main thread loop");
2014-03-18 03:32:58 +00:00
}
}
2017-03-25 09:21:39 +00:00
2017-06-10 07:20:48 +00:00
return err;
2013-11-23 03:36:07 +00:00
}
2017-06-10 07:20:48 +00:00
// Recreate the RTMP stream listeners from the configured listen endpoints.
// Any existing SrsListenerRtmpStream listeners are closed first, then one
// SrsBufferListener is created for each endpoint from _srs_config->get_listens().
// @return srs_success when every endpoint is listening; otherwise an error
//      carrying the code returned by the listener that failed.
srs_error_t SrsServer::listen_rtmp()
{
    int ret = ERROR_SUCCESS;
    
    // stream service port.
    std::vector<std::string> ip_ports = _srs_config->get_listens();
    srs_assert((int)ip_ports.size() > 0);
    
    close_listeners(SrsListenerRtmpStream);
    
    for (int i = 0; i < (int)ip_ports.size(); i++) {
        // register the listener before listen(), so it stays tracked in listeners
        // even when listen() fails.
        SrsListener* listener = new SrsBufferListener(this, SrsListenerRtmpStream);
        listeners.push_back(listener);
        
        std::string ip;
        int port;
        srs_parse_endpoint(ip_ports[i], ip, port);
        
        if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
            // return the error to the caller; building it without returning
            // would drop the failure and report success.
            return srs_error_new(ret, "rtmp listen %s:%d", ip.c_str(), port);
        }
    }
    
    return srs_success;
}
2017-06-10 07:20:48 +00:00
// Rebuild the HTTP API listener.
// Existing SrsListenerHttpApi listeners are always closed; a new one is
// created only when the HTTP API is enabled in the config.
// @return srs_success, or an error carrying the listener's failure code.
srs_error_t SrsServer::listen_http_api()
{
    close_listeners(SrsListenerHttpApi);
    
    // nothing more to do when the api is disabled.
    if (!_srs_config->get_http_api_enabled()) {
        return srs_success;
    }
    
    SrsListener* api_listener = new SrsBufferListener(this, SrsListenerHttpApi);
    listeners.push_back(api_listener);
    
    // split the configured endpoint into host and port.
    std::string endpoint = _srs_config->get_http_api_listen();
    std::string host;
    int api_port;
    srs_parse_endpoint(endpoint, host, api_port);
    
    int code = api_listener->listen(host, api_port);
    if (code != ERROR_SUCCESS) {
        return srs_error_new(code, "http api listen %s:%d", host.c_str(), api_port);
    }
    
    return srs_success;
}
2017-06-10 07:20:48 +00:00
// Rebuild the HTTP stream listener.
// Existing SrsListenerHttpStream listeners are always closed; a new one is
// created only when the HTTP stream server is enabled in the config.
// @return srs_success, or an error carrying the listener's failure code.
srs_error_t SrsServer::listen_http_stream()
{
    close_listeners(SrsListenerHttpStream);
    
    // nothing more to do when the http stream server is disabled.
    if (!_srs_config->get_http_stream_enabled()) {
        return srs_success;
    }
    
    SrsListener* stream_listener = new SrsBufferListener(this, SrsListenerHttpStream);
    listeners.push_back(stream_listener);
    
    // split the configured endpoint into host and port.
    std::string endpoint = _srs_config->get_http_stream_listen();
    std::string host;
    int stream_port;
    srs_parse_endpoint(endpoint, host, stream_port);
    
    int code = stream_listener->listen(host, stream_port);
    if (code != ERROR_SUCCESS) {
        return srs_error_new(code, "http stream listen %s:%d", host.c_str(), stream_port);
    }
    
    return srs_success;
}
2017-06-10 07:20:48 +00:00
// Create one listener for each enabled stream caster in the config.
// Compiled to a no-op returning srs_success unless SRS_AUTO_STREAM_CASTER is defined.
// The caster engine selects the listener class (udp, rtsp or flv); an unknown
// engine or a non-positive port is reported as an error.
// NOTE(review): only SrsListenerMpegTsOverUdp listeners are closed up front,
// although SrsListenerRtsp and SrsListenerFlv listeners are created here too.
// Confirm whether those should be closed as well when this is called again.
// @return srs_success, or an error describing the first caster that failed.
srs_error_t SrsServer::listen_stream_caster()
{
#ifdef SRS_AUTO_STREAM_CASTER
    close_listeners(SrsListenerMpegTsOverUdp);
    
    std::vector<SrsConfDirective*> casters = _srs_config->get_stream_casters();
    
    for (int i = 0; i < (int)casters.size(); i++) {
        SrsConfDirective* conf = casters[i];
        
        // skip the casters which are disabled.
        if (!_srs_config->get_stream_caster_enabled(conf)) {
            continue;
        }
        
        // choose the listener implementation by engine name.
        std::string engine = _srs_config->get_stream_caster_engine(conf);
        
        SrsListener* listener = NULL;
        if (srs_stream_caster_is_udp(engine)) {
            listener = new SrsUdpCasterListener(this, SrsListenerMpegTsOverUdp, conf);
        } else if (srs_stream_caster_is_rtsp(engine)) {
            listener = new SrsRtspListener(this, SrsListenerRtsp, conf);
        } else if (srs_stream_caster_is_flv(engine)) {
            listener = new SrsHttpFlvListener(this, SrsListenerFlv, conf);
        } else {
            return srs_error_new(ERROR_STREAM_CASTER_ENGINE, "invalid caster %s", engine.c_str());
        }
        srs_assert(listener != NULL);
        
        listeners.push_back(listener);
        
        int port = _srs_config->get_stream_caster_listen(conf);
        if (port <= 0) {
            return srs_error_new(ERROR_STREAM_CASTER_PORT, "invalid port=%d", port);
        }
        
        // TODO: support listen at <[ip:]port>
        int code = listener->listen("0.0.0.0", port);
        if (code != ERROR_SUCCESS) {
            return srs_error_new(code, "listen at %d", port);
        }
    }
#endif
    
    return srs_success;
}
// Free and drop every listener whose listen_type() equals the given type.
// Listeners of other types are left untouched.
// @param type the listener type to close.
void SrsServer::close_listeners(SrsListenerType type)
{
    std::vector<SrsListener*>::iterator cursor = listeners.begin();
    
    while (cursor != listeners.end()) {
        SrsListener* current = *cursor;
        
        if (current->listen_type() == type) {
            srs_freep(current);
            // erase() yields the iterator following the removed element.
            cursor = listeners.erase(cursor);
        } else {
            ++cursor;
        }
    }
}
2015-03-08 04:55:40 +00:00
void SrsServer::resample_kbps()
{
SrsStatistic* stat = SrsStatistic::instance();
2015-03-08 04:55:40 +00:00
// collect delta from all clients.
for (std::vector<SrsConnection*>::iterator it = conns.begin(); it != conns.end(); ++it) {
SrsConnection* conn = *it;
2015-03-08 04:55:40 +00:00
// add delta of connection to server kbps.,
// for next sample() of server kbps can get the stat.
stat->kbps_add_delta(conn);
}
2015-03-08 04:55:40 +00:00
// TODO: FXME: support all other connections.
2017-03-25 09:21:39 +00:00
2015-03-08 04:55:40 +00:00
// sample the kbps, get the stat.
SrsKbps* kbps = stat->kbps_sample();
srs_update_rtmp_server((int)conns.size(), kbps);
}
// Accept a client: convert the fd to a connection, track it and start it.
// When fd2conn() yields no connection the stfd is closed here and the call
// still reports ERROR_SUCCESS.
// @param type the type of the listener which accepted the client.
// @param stfd the accepted client fd.
// @return ERROR_SUCCESS, or the code of the error from conn->start().
int SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
{
    int ret = ERROR_SUCCESS;
    
    SrsConnection* conn = fd2conn(type, stfd);
    if (!conn) {
        // no connection for this fd, release it and ignore the client.
        srs_close_stfd(stfd);
        return ERROR_SUCCESS;
    }
    srs_assert(conn);
    
    // directly enqueue, the cycle thread will remove the client.
    conns.push_back(conn);
    srs_verbose("add conn to vector.");
    
    // cycle will start process thread and when finished remove the client.
    // @remark never use the conn, for it maybe destroyed.
    srs_error_t err = conn->start();
    if (err != srs_success) {
        // TODO: FIXME: Use error
        ret = srs_error_code(err);
        srs_freep(err);
        return ret;
    }
    
    srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret);
    return ret;
}
// Create the connection object which serves the accepted fd.
// The client is rejected (NULL is returned) when:
//      the peer ip is empty,
//      the handler refuses the client,
//      the connection limit is reached,
//      setting FD_CLOEXEC on the fd fails,
//      or the listener type has no service handler.
// @param type the type of the listener which accepted the client.
// @param stfd the accepted client fd.
// @return the new connection, or NULL when the client is rejected.
// @remark this function never closes stfd; accept_client() closes it whenever
//      NULL is returned, so closing it here too would close the same handle twice.
SrsConnection* SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd)
{
    int ret = ERROR_SUCCESS;
    
    int fd = srs_netfd_fileno(stfd);
    std::string ip = srs_get_peer_ip(fd);
    
    // for some keep alive application, for example, the keepalived,
    // will send some tcp packet which we cann't got the ip,
    // we just ignore it.
    if (ip.empty()) {
        srs_info("ignore empty ip client, fd=%d.", fd);
        return NULL;
    }
    
    // check connection limitation.
    int max_connections = _srs_config->get_max_connections();
    // the assignment is parenthesized, so ret holds the code from the handler
    // and not the result of the comparison.
    if (handler && (ret = handler->on_accept_client(max_connections, (int)conns.size())) != ERROR_SUCCESS) {
        srs_error("handle accept client failed, drop client: clients=%d, max=%d, fd=%d. ret=%d", (int)conns.size(), max_connections, fd, ret);
        return NULL;
    }
    if ((int)conns.size() >= max_connections) {
        srs_error("exceed the max connections, drop client: clients=%d, max=%d, fd=%d", (int)conns.size(), max_connections, fd);
        return NULL;
    }
    
    // avoid fd leak when fork.
    // @see https://github.com/ossrs/srs/issues/518
    if (true) {
        int val;
        if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
            ret = ERROR_SYSTEM_PID_GET_FILE_INFO;
            srs_error("fnctl F_GETFD error! fd=%d. ret=%#x", fd, ret);
            return NULL;
        }
        val |= FD_CLOEXEC;
        if (fcntl(fd, F_SETFD, val) < 0) {
            ret = ERROR_SYSTEM_PID_SET_FILE_INFO;
            srs_error("fcntl F_SETFD error! fd=%d ret=%#x", fd, ret);
            return NULL;
        }
    }
    
    // create the connection which matches the listener type.
    SrsConnection* conn = NULL;
    
    if (type == SrsListenerRtmpStream) {
        conn = new SrsRtmpConn(this, stfd, ip);
    } else if (type == SrsListenerHttpApi) {
        conn = new SrsHttpApi(this, stfd, http_api_mux, ip);
    } else if (type == SrsListenerHttpStream) {
        conn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip);
    } else {
        // leave stfd to the caller like every other rejected path does;
        // accept_client() closes it when NULL is returned.
        srs_warn("close for no service handler. fd=%d, ip=%s", fd, ip.c_str());
        return NULL;
    }
    
    return conn;
}
2017-03-26 05:40:39 +00:00
// Remove a connection from the server.
// A connection which is not tracked in conns is ignored with a warning.
// Otherwise it is dropped from conns, its final kbps delta and disconnect are
// reported to the statistic, and it is handed to conn_manager for removal.
// @param c the connection to remove.
void SrsServer::remove(ISrsConnection* c)
{
    SrsConnection* target = dynamic_cast<SrsConnection*>(c);
    
    std::vector<SrsConnection*>::iterator pos = std::find(conns.begin(), conns.end(), target);
    
    // removed by destroy, ignore.
    if (pos == conns.end()) {
        srs_warn("server moved connection, ignore.");
        return;
    }
    
    conns.erase(pos);
    srs_info("conn removed. conns=%d", (int)conns.size());
    
    // report the last delta and the disconnect event.
    SrsStatistic* statistic = SrsStatistic::instance();
    statistic->kbps_add_delta(target);
    statistic->on_disconnect(target->srs_id());
    
    // all connections are created by server,
    // so we pass it to the manager to remove it here.
    conn_manager->remove(c);
}
2013-11-23 03:36:07 +00:00
int SrsServer::on_reload_listen()
{
2017-06-10 07:20:48 +00:00
// TODO: FIXME: Use error.
srs_error_t err = listen();
int ret = srs_error_code(err);
srs_freep(err);
return ret;
2013-11-23 03:36:07 +00:00
}
2014-04-12 12:46:32 +00:00
int SrsServer::on_reload_pid()
{
if (pid_fd > 0) {
::close(pid_fd);
pid_fd = -1;
}
2017-06-10 07:20:48 +00:00
// TODO: FIXME: Use error.
srs_error_t err = acquire_pid_file();
int ret = srs_error_code(err);
srs_freep(err);
return ret;
2014-04-12 12:46:32 +00:00
}
2014-04-13 05:08:10 +00:00
2014-04-13 05:27:51 +00:00
int SrsServer::on_reload_vhost_added(std::string vhost)
{
int ret = ERROR_SUCCESS;
if (!_srs_config->get_vhost_http_enabled(vhost)) {
return ret;
}
// TODO: FIXME: should handle the event in SrsHttpStaticServer
2014-04-13 05:27:51 +00:00
if ((ret = on_reload_vhost_http_updated()) != ERROR_SUCCESS) {
return ret;
}
2017-03-25 09:21:39 +00:00
2014-04-13 05:27:51 +00:00
return ret;
}
int SrsServer::on_reload_vhost_removed(std::string /*vhost*/)
2014-04-13 05:27:51 +00:00
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: should handle the event in SrsHttpStaticServer
2014-04-13 05:27:51 +00:00
if ((ret = on_reload_vhost_http_updated()) != ERROR_SUCCESS) {
return ret;
}
2017-03-25 09:21:39 +00:00
2014-04-13 05:27:51 +00:00
return ret;
}
2014-04-13 05:08:10 +00:00
int SrsServer::on_reload_http_api_enabled()
{
2017-06-10 07:20:48 +00:00
// TODO: FIXME: Use error.
srs_error_t err = listen_http_api();
int ret = srs_error_code(err);
srs_freep(err);
return ret;
2014-04-13 05:08:10 +00:00
}
// Reload handler when the http api is disabled: closes the http api listeners.
// @return always ERROR_SUCCESS.
int SrsServer::on_reload_http_api_disabled()
{
    // drop every listener which serves the http api.
    close_listeners(SrsListenerHttpApi);
    
    return ERROR_SUCCESS;
}
int SrsServer::on_reload_http_stream_enabled()
{
2017-06-10 07:20:48 +00:00
// TODO: FIXME: Use error.
srs_error_t err = listen_http_stream();
int ret = srs_error_code(err);
srs_freep(err);
return ret;
2014-04-13 05:27:51 +00:00
}
// Reload handler when the http stream is disabled: closes the http stream listeners.
// @return always ERROR_SUCCESS.
int SrsServer::on_reload_http_stream_disabled()
{
    // drop every listener which serves the http stream.
    close_listeners(SrsListenerHttpStream);
    
    return ERROR_SUCCESS;
}
// TODO: FIXME: rename to http_remux
2014-04-13 05:27:51 +00:00
int SrsServer::on_reload_http_stream_updated()
{
int ret = ERROR_SUCCESS;
if ((ret = on_reload_http_stream_enabled()) != ERROR_SUCCESS) {
return ret;
}
// TODO: FIXME: should handle the event in SrsHttpStaticServer
2014-04-13 05:27:51 +00:00
if ((ret = on_reload_vhost_http_updated()) != ERROR_SUCCESS) {
return ret;
}
2014-04-13 05:08:10 +00:00
return ret;
}
2014-08-02 14:18:39 +00:00
// Publish event: mounts the source on the http server.
// @param s the source which is published.
// @param r the request of the publish.
// @return the code returned by http_server->http_mount().
int SrsServer::on_publish(SrsSource* s, SrsRequest* r)
{
    // the result of the mount is the result of the publish event.
    return http_server->http_mount(s, r);
}
// Unpublish event: unmounts the source from the http server.
// @param s the source which is unpublished.
// @param r the request of the unpublish.
void SrsServer::on_unpublish(SrsSource* s, SrsRequest* r)
{
// forward to the http server, which undoes the mount made on publish.
http_server->http_unmount(s, r);
}