1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

merge from bravo system, add the rtmfp to bms(commercial srs). 2.0.163.

This commit is contained in:
winlin 2015-05-19 18:06:20 +08:00
parent 0f7cafe50b
commit 44bc7976ac
17 changed files with 317 additions and 270 deletions

View file

@ -441,10 +441,22 @@ int SrsConfig::reload_conf(SrsConfig* conf)
// daemon
//
// always support reload without additional code:
// chunk_size, ff_log_dir, max_connections,
// chunk_size, ff_log_dir,
// bandcheck, http_hooks, heartbeat,
// token_traverse, debug_srs_upnode,
// security
// merge config: max_connections
if (!srs_directive_equals(root->get("max_connections"), old_root->get("max_connections"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((ret = subscribe->on_reload_max_conns()) != ERROR_SUCCESS) {
srs_error("notify subscribes reload max_connections failed. ret=%d", ret);
return ret;
}
}
srs_trace("reload max_connections success.");
}
// merge config: listen
if (!srs_directive_equals(root->get("listen"), old_root->get("listen"))) {

View file

@ -40,6 +40,11 @@ int ISrsReloadHandler::on_reload_listen()
return ERROR_SUCCESS;
}
int ISrsReloadHandler::on_reload_max_conns()
{
return ERROR_SUCCESS;
}
int ISrsReloadHandler::on_reload_pid()
{
return ERROR_SUCCESS;

View file

@ -44,6 +44,7 @@ public:
ISrsReloadHandler();
virtual ~ISrsReloadHandler();
public:
virtual int on_reload_max_conns();
virtual int on_reload_listen();
virtual int on_reload_pid();
virtual int on_reload_log_tank();
@ -55,6 +56,7 @@ public:
virtual int on_reload_http_stream_enabled();
virtual int on_reload_http_stream_disabled();
virtual int on_reload_http_stream_updated();
public:
virtual int on_reload_vhost_http_updated();
virtual int on_reload_vhost_http_remux_updated();
virtual int on_reload_vhost_added(std::string vhost);

View file

@ -1068,24 +1068,24 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessag
return ret;
}
// pause or other msg.
// pause
SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(pkt);
if (!pause) {
srs_info("ignore all amf0/amf3 command except pause.");
if (pause) {
if ((ret = rtmp->on_play_client_pause(res->stream_id, pause->is_pause)) != ERROR_SUCCESS) {
srs_error("rtmp process play client pause failed. ret=%d", ret);
return ret;
}
if ((ret = consumer->on_play_client_pause(pause->is_pause)) != ERROR_SUCCESS) {
srs_error("consumer process play client pause failed. ret=%d", ret);
return ret;
}
srs_info("process pause success, is_pause=%d, time=%d.", pause->is_pause, pause->time_ms);
return ret;
}
if ((ret = rtmp->on_play_client_pause(res->stream_id, pause->is_pause)) != ERROR_SUCCESS) {
srs_error("rtmp process play client pause failed. ret=%d", ret);
return ret;
}
if ((ret = consumer->on_play_client_pause(pause->is_pause)) != ERROR_SUCCESS) {
srs_error("consumer process play client pause failed. ret=%d", ret);
return ret;
}
srs_info("process pause success, is_pause=%d, time=%d.", pause->is_pause, pause->time_ms);
// other msg.
srs_info("ignore all amf0/amf3 command except pause and video control.");
return ret;
}

View file

@ -113,23 +113,23 @@ std::string srs_listener_type2string(SrsListenerType type)
}
}
SrsListener::SrsListener(SrsServer* server, SrsListenerType type)
SrsListener::SrsListener(SrsServer* svr, SrsListenerType t)
{
_port = 0;
_server = server;
_type = type;
port = 0;
server = svr;
type = t;
}
SrsListener::~SrsListener()
{
}
SrsListenerType SrsListener::type()
SrsListenerType SrsListener::listen_type()
{
return _type;
return type;
}
SrsStreamListener::SrsStreamListener(SrsServer* server, SrsListenerType type) : SrsListener(server, type)
SrsStreamListener::SrsStreamListener(SrsServer* svr, SrsListenerType t) : SrsListener(svr, t)
{
listener = NULL;
}
@ -139,12 +139,12 @@ SrsStreamListener::~SrsStreamListener()
srs_freep(listener);
}
int SrsStreamListener::listen(string ip, int port)
int SrsStreamListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS;
_ip = ip;
_port = port;
ip = i;
port = p;
srs_freep(listener);
listener = new SrsTcpListener(this, ip, port);
@ -158,7 +158,7 @@ int SrsStreamListener::listen(string ip, int port)
"listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
pthread->cid(), _srs_context->get_id(), _port, _type, fd, ip.c_str(), port);
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(_type).c_str(), ip.c_str(), _port, listener->fd());
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
return ret;
}
@ -167,7 +167,7 @@ int SrsStreamListener::on_tcp_client(st_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
if ((ret = _server->accept_client(_type, stfd)) != ERROR_SUCCESS) {
if ((ret = server->accept_client(type, stfd)) != ERROR_SUCCESS) {
srs_warn("accept client error. ret=%d", ret);
return ret;
}
@ -176,14 +176,14 @@ int SrsStreamListener::on_tcp_client(st_netfd_t stfd)
}
#ifdef SRS_AUTO_STREAM_CASTER
SrsRtspListener::SrsRtspListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c) : SrsListener(server, type)
SrsRtspListener::SrsRtspListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t)
{
listener = NULL;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerRtsp);
if (_type == SrsListenerRtsp) {
srs_assert(type == SrsListenerRtsp);
if (type == SrsListenerRtsp) {
caster = new SrsRtspCaster(c);
}
}
@ -194,16 +194,16 @@ SrsRtspListener::~SrsRtspListener()
srs_freep(listener);
}
int SrsRtspListener::listen(string ip, int port)
int SrsRtspListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerRtsp);
srs_assert(type == SrsListenerRtsp);
_ip = ip;
_port = port;
ip = i;
port = p;
srs_freep(listener);
listener = new SrsTcpListener(this, ip, port);
@ -215,9 +215,9 @@ int SrsRtspListener::listen(string ip, int port)
srs_info("listen thread cid=%d, current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
pthread->cid(), _srs_context->get_id(), _port, _type, fd, ip.c_str(), port);
pthread->cid(), _srs_context->get_id(), port, type, fd, ip.c_str(), port);
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(_type).c_str(), ip.c_str(), _port, listener->fd());
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
return ret;
}
@ -234,14 +234,14 @@ int SrsRtspListener::on_tcp_client(st_netfd_t stfd)
return ret;
}
SrsHttpFlvListener::SrsHttpFlvListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c) : SrsListener(server, type)
SrsHttpFlvListener::SrsHttpFlvListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t)
{
listener = NULL;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerFlv);
if (_type == SrsListenerFlv) {
srs_assert(type == SrsListenerFlv);
if (type == SrsListenerFlv) {
caster = new SrsAppCasterFlv(c);
}
}
@ -252,16 +252,16 @@ SrsHttpFlvListener::~SrsHttpFlvListener()
srs_freep(listener);
}
int SrsHttpFlvListener::listen(string ip, int port)
int SrsHttpFlvListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerFlv);
srs_assert(type == SrsListenerFlv);
_ip = ip;
_port = port;
ip = i;
port = p;
if ((ret = caster->initialize()) != ERROR_SUCCESS) {
return ret;
@ -277,9 +277,9 @@ int SrsHttpFlvListener::listen(string ip, int port)
srs_info("listen thread cid=%d, current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
pthread->cid(), _srs_context->get_id(), _port, _type, fd, ip.c_str(), port);
pthread->cid(), _srs_context->get_id(), port, type, fd, ip.c_str(), port);
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(_type).c_str(), ip.c_str(), _port, listener->fd());
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
return ret;
}
@ -295,36 +295,29 @@ int SrsHttpFlvListener::on_tcp_client(st_netfd_t stfd)
return ret;
}
#endif
SrsUdpCasterListener::SrsUdpCasterListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c) : SrsListener(server, type)
SrsUdpStreamListener::SrsUdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c) : SrsListener(svr, t)
{
_type = type;
listener = NULL;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerMpegTsOverUdp);
if (_type == SrsListenerMpegTsOverUdp) {
caster = new SrsMpegtsOverUdp(c);
}
caster = c;
}
SrsUdpCasterListener::~SrsUdpCasterListener()
SrsUdpStreamListener::~SrsUdpStreamListener()
{
srs_freep(caster);
srs_freep(listener);
}
int SrsUdpCasterListener::listen(string ip, int port)
int SrsUdpStreamListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerMpegTsOverUdp);
srs_assert(type == SrsListenerMpegTsOverUdp);
_ip = ip;
_port = port;
ip = i;
port = p;
srs_freep(listener);
listener = new SrsUdpListener(caster, ip, port);
@ -336,12 +329,28 @@ int SrsUdpCasterListener::listen(string ip, int port)
srs_info("listen thread cid=%d, current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
pthread->cid(), _srs_context->get_id(), _port, _type, fd, ip.c_str(), port);
pthread->cid(), _srs_context->get_id(), port, type, fd, ip.c_str(), port);
srs_trace("%s listen at udp://%s:%d, fd=%d", srs_listener_type2string(_type).c_str(), ip.c_str(), _port, listener->fd());
srs_trace("%s listen at udp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
return ret;
}
#ifdef SRS_AUTO_STREAM_CASTER
SrsUdpCasterListener::SrsUdpCasterListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsUdpStreamListener(svr, t, NULL)
{
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(type == SrsListenerMpegTsOverUdp);
if (type == SrsListenerMpegTsOverUdp) {
caster = new SrsMpegtsOverUdp(c);
}
}
SrsUdpCasterListener::~SrsUdpCasterListener()
{
srs_freep(caster);
}
#endif
SrsSignalManager* SrsSignalManager::instance = NULL;
@ -588,6 +597,34 @@ int SrsServer::initialize(ISrsServerCycle* cycle_handler)
return ret;
}
int SrsServer::initialize_st()
{
int ret = ERROR_SUCCESS;
// init st
if ((ret = srs_init_st()) != ERROR_SUCCESS) {
srs_error("init st failed. ret=%d", ret);
return ret;
}
// @remark, st alloc segment use mmap, which only support 32757 threads,
// if need to support more, for instance, 100k threads, define the macro MALLOC_STACK.
// TODO: FIXME: maybe can use "sysctl vm.max_map_count" to refine.
if (_srs_config->get_max_connections() > 32756) {
ret = ERROR_ST_EXCEED_THREADS;
srs_error("st mmap for stack allocation must <= %d threads, "
"@see Makefile of st for MALLOC_STACK, please build st manually by "
"\"make EXTRA_CFLAGS=-DMALLOC_STACK linux-debug\", ret=%d", ret);
return ret;
}
// set current log id.
_srs_context->generate_id();
srs_trace("server main cid=%d", _srs_context->get_id());
return ret;
}
int SrsServer::initialize_signal()
{
return signal_manager->initialize();
@ -669,34 +706,6 @@ int SrsServer::acquire_pid_file()
return ret;
}
int SrsServer::initialize_st()
{
int ret = ERROR_SUCCESS;
// init st
if ((ret = srs_init_st()) != ERROR_SUCCESS) {
srs_error("init st failed. ret=%d", ret);
return ret;
}
// @remark, st alloc segment use mmap, which only support 32757 threads,
// if need to support more, for instance, 100k threads, define the macro MALLOC_STACK.
// TODO: FIXME: maybe can use "sysctl vm.max_map_count" to refine.
if (_srs_config->get_max_connections() > 32756) {
ret = ERROR_ST_EXCEED_THREADS;
srs_error("st mmap for stack allocation must <= %d threads, "
"@see Makefile of st for MALLOC_STACK, please build st manually by "
"\"make EXTRA_CFLAGS=-DMALLOC_STACK linux-debug\", ret=%d", ret);
return ret;
}
// set current log id.
_srs_context->generate_id();
srs_trace("server main cid=%d", _srs_context->get_id());
return ret;
}
int SrsServer::listen()
{
int ret = ERROR_SUCCESS;
@ -959,6 +968,7 @@ int SrsServer::do_cycle()
}
#endif
#endif
srs_info("server main thread loop");
}
}
@ -1103,7 +1113,7 @@ void SrsServer::close_listeners(SrsListenerType type)
for (it = listeners.begin(); it != listeners.end();) {
SrsListener* listener = *it;
if (listener->type() != type) {
if (listener->listen_type() != type) {
++it;
continue;
}
@ -1264,7 +1274,7 @@ int SrsServer::on_reload_http_stream_enabled()
#ifdef SRS_AUTO_HTTP_SERVER
ret = listen_http_stream();
#endif
return ret;
}

View file

@ -80,17 +80,17 @@ enum SrsListenerType
class SrsListener
{
protected:
SrsListenerType _type;
SrsListenerType type;
protected:
std::string _ip;
int _port;
SrsServer* _server;
std::string ip;
int port;
SrsServer* server;
public:
SrsListener(SrsServer* server, SrsListenerType type);
SrsListener(SrsServer* svr, SrsListenerType t);
virtual ~SrsListener();
public:
virtual SrsListenerType type();
virtual int listen(std::string ip, int port) = 0;
virtual SrsListenerType listen_type();
virtual int listen(std::string i, int p) = 0;
};
/**
@ -120,10 +120,10 @@ private:
SrsTcpListener* listener;
ISrsTcpHandler* caster;
public:
SrsRtspListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c);
SrsRtspListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c);
virtual ~SrsRtspListener();
public:
virtual int listen(std::string ip, int port);
virtual int listen(std::string i, int p);
// ISrsTcpHandler
public:
virtual int on_tcp_client(st_netfd_t stfd);
@ -138,28 +138,40 @@ private:
SrsTcpListener* listener;
SrsAppCasterFlv* caster;
public:
SrsHttpFlvListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c);
SrsHttpFlvListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c);
virtual ~SrsHttpFlvListener();
public:
virtual int listen(std::string ip, int port);
virtual int listen(std::string i, int p);
// ISrsTcpHandler
public:
virtual int on_tcp_client(st_netfd_t stfd);
};
#endif
/**
* the udp listener, for udp server.
*/
class SrsUdpCasterListener : public SrsListener
* the udp listener, for udp server.
*/
class SrsUdpStreamListener : public SrsListener
{
private:
protected:
SrsUdpListener* listener;
ISrsUdpHandler* caster;
public:
SrsUdpCasterListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c);
virtual ~SrsUdpCasterListener();
SrsUdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c);
virtual ~SrsUdpStreamListener();
public:
virtual int listen(std::string ip, int port);
virtual int listen(std::string i, int p);
};
/**
* the udp listener, for udp stream caster server.
*/
#ifdef SRS_AUTO_STREAM_CASTER
class SrsUdpCasterListener : public SrsUdpStreamListener
{
public:
SrsUdpCasterListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c);
virtual ~SrsUdpCasterListener();
};
#endif
@ -337,7 +349,7 @@ public:
* @param client_stfd, the client fd in st boxed, the underlayer fd.
*/
virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd);
// interface ISrsThreadHandler.
// interface ISrsReloadHandler.
public:
virtual int on_reload_listen();
virtual int on_reload_pid();