mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
GB28181: Refine SRS listeners without wrapper.
This commit is contained in:
parent
b452144fb7
commit
173c683566
13 changed files with 601 additions and 762 deletions
|
@ -41,199 +41,6 @@ using namespace std;
|
|||
#include <srs_app_rtc_network.hpp>
|
||||
#endif
|
||||
|
||||
std::string srs_listener_type2string(SrsListenerType type)
|
||||
{
|
||||
switch (type) {
|
||||
case SrsListenerRtmpStream:
|
||||
return "RTMP";
|
||||
case SrsListenerHttpApi:
|
||||
return "HTTP-API";
|
||||
case SrsListenerHttpsApi:
|
||||
return "HTTPS-API";
|
||||
case SrsListenerHttpStream:
|
||||
return "HTTP-Server";
|
||||
case SrsListenerHttpsStream:
|
||||
return "HTTPS-Server";
|
||||
case SrsListenerMpegTsOverUdp:
|
||||
return "MPEG-TS over UDP";
|
||||
case SrsListenerFlv:
|
||||
return "HTTP-FLV";
|
||||
case SrsListenerTcp:
|
||||
return "TCP";
|
||||
default:
|
||||
return "UNKONWN";
|
||||
}
|
||||
}
|
||||
|
||||
SrsListener::SrsListener(SrsServer* svr, SrsListenerType t)
|
||||
{
|
||||
port = 0;
|
||||
server = svr;
|
||||
type = t;
|
||||
}
|
||||
|
||||
SrsListener::~SrsListener()
|
||||
{
|
||||
}
|
||||
|
||||
SrsListenerType SrsListener::listen_type()
|
||||
{
|
||||
return type;
|
||||
}
|
||||
|
||||
SrsBufferListener::SrsBufferListener(SrsServer* svr, SrsListenerType t) : SrsListener(svr, t)
|
||||
{
|
||||
listener = NULL;
|
||||
}
|
||||
|
||||
SrsBufferListener::~SrsBufferListener()
|
||||
{
|
||||
srs_freep(listener);
|
||||
}
|
||||
|
||||
srs_error_t SrsBufferListener::listen(string i, int p)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
ip = i;
|
||||
port = p;
|
||||
|
||||
srs_freep(listener);
|
||||
listener = new SrsTcpListener(this, ip, port);
|
||||
|
||||
if ((err = listener->listen()) != srs_success) {
|
||||
return srs_error_wrap(err, "buffered tcp listen");
|
||||
}
|
||||
|
||||
string v = srs_listener_type2string(type);
|
||||
srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
|
||||
{
|
||||
srs_error_t err = server->accept_client(type, stfd);
|
||||
if (err != srs_success) {
|
||||
srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());
|
||||
srs_freep(err);
|
||||
}
|
||||
|
||||
return srs_success;
|
||||
}
|
||||
|
||||
SrsHttpFlvListener::SrsHttpFlvListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsListener(svr, t)
|
||||
{
|
||||
listener = NULL;
|
||||
|
||||
// the caller already ensure the type is ok,
|
||||
// we just assert here for unknown stream caster.
|
||||
srs_assert(type == SrsListenerFlv);
|
||||
if (type == SrsListenerFlv) {
|
||||
caster = new SrsAppCasterFlv(c);
|
||||
}
|
||||
}
|
||||
|
||||
SrsHttpFlvListener::~SrsHttpFlvListener()
|
||||
{
|
||||
srs_freep(caster);
|
||||
srs_freep(listener);
|
||||
}
|
||||
|
||||
srs_error_t SrsHttpFlvListener::listen(string i, int p)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
// the caller already ensure the type is ok,
|
||||
// we just assert here for unknown stream caster.
|
||||
srs_assert(type == SrsListenerFlv);
|
||||
|
||||
ip = i;
|
||||
port = p;
|
||||
|
||||
if ((err = caster->initialize()) != srs_success) {
|
||||
return srs_error_wrap(err, "init caster %s:%d", ip.c_str(), port);
|
||||
}
|
||||
|
||||
srs_freep(listener);
|
||||
listener = new SrsTcpListener(this, ip, port);
|
||||
|
||||
if ((err = listener->listen()) != srs_success) {
|
||||
return srs_error_wrap(err, "listen");
|
||||
}
|
||||
|
||||
string v = srs_listener_type2string(type);
|
||||
srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsHttpFlvListener::on_tcp_client(srs_netfd_t stfd)
|
||||
{
|
||||
srs_error_t err = caster->on_tcp_client(stfd);
|
||||
if (err != srs_success) {
|
||||
srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());
|
||||
srs_freep(err);
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
SrsUdpStreamListener::SrsUdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c) : SrsListener(svr, t)
|
||||
{
|
||||
listener = NULL;
|
||||
caster = c;
|
||||
}
|
||||
|
||||
SrsUdpStreamListener::~SrsUdpStreamListener()
|
||||
{
|
||||
srs_freep(listener);
|
||||
}
|
||||
|
||||
srs_error_t SrsUdpStreamListener::listen(string i, int p)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
// the caller already ensure the type is ok,
|
||||
// we just assert here for unknown stream caster.
|
||||
srs_assert(type == SrsListenerMpegTsOverUdp);
|
||||
|
||||
ip = i;
|
||||
port = p;
|
||||
|
||||
srs_freep(listener);
|
||||
listener = new SrsUdpListener(caster, ip, port);
|
||||
|
||||
if ((err = listener->listen()) != srs_success) {
|
||||
return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);
|
||||
}
|
||||
|
||||
// notify the handler the fd changed.
|
||||
if ((err = caster->on_stfd_change(listener->stfd())) != srs_success) {
|
||||
return srs_error_wrap(err, "notify fd change failed");
|
||||
}
|
||||
|
||||
string v = srs_listener_type2string(type);
|
||||
srs_trace("%s listen at udp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
SrsUdpCasterListener::SrsUdpCasterListener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsUdpStreamListener(svr, t, NULL)
|
||||
{
|
||||
// the caller already ensure the type is ok,
|
||||
// we just assert here for unknown stream caster.
|
||||
srs_assert(type == SrsListenerMpegTsOverUdp);
|
||||
if (type == SrsListenerMpegTsOverUdp) {
|
||||
caster = new SrsMpegtsOverUdp(c);
|
||||
}
|
||||
}
|
||||
|
||||
SrsUdpCasterListener::~SrsUdpCasterListener()
|
||||
{
|
||||
srs_freep(caster);
|
||||
}
|
||||
|
||||
SrsSignalManager* SrsSignalManager::instance = NULL;
|
||||
|
||||
SrsSignalManager::SrsSignalManager(SrsServer* s)
|
||||
|
@ -506,14 +313,6 @@ srs_error_t SrsInotifyWorker::cycle()
|
|||
return err;
|
||||
}
|
||||
|
||||
ISrsServerCycle::ISrsServerCycle()
|
||||
{
|
||||
}
|
||||
|
||||
ISrsServerCycle::~ISrsServerCycle()
|
||||
{
|
||||
}
|
||||
|
||||
SrsServer::SrsServer()
|
||||
{
|
||||
signal_reload = false;
|
||||
|
@ -526,8 +325,14 @@ SrsServer::SrsServer()
|
|||
signal_manager = new SrsSignalManager(this);
|
||||
conn_manager = new SrsResourceManager("TCP", true);
|
||||
latest_version_ = new SrsLatestVersion();
|
||||
|
||||
handler = NULL;
|
||||
rtmp_listener_ = new SrsMultipleTcpListeners(this);
|
||||
api_listener_ = new SrsTcpListener(this);
|
||||
apis_listener_ = new SrsTcpListener(this);
|
||||
http_listener_ = new SrsTcpListener(this);
|
||||
https_listener_ = new SrsTcpListener(this);
|
||||
webrtc_listener_ = new SrsTcpListener(this);
|
||||
stream_caster_flv_listener_ = new SrsHttpFlvListener();
|
||||
stream_caster_mpegts_ = new SrsUdpCasterListener();
|
||||
ppid = ::getppid();
|
||||
|
||||
// donot new object in constructor,
|
||||
|
@ -576,22 +381,30 @@ void SrsServer::destroy()
|
|||
srs_freep(signal_manager);
|
||||
srs_freep(latest_version_);
|
||||
srs_freep(conn_manager);
|
||||
srs_freep(rtmp_listener_);
|
||||
srs_freep(api_listener_);
|
||||
srs_freep(apis_listener_);
|
||||
srs_freep(http_listener_);
|
||||
srs_freep(https_listener_);
|
||||
srs_freep(webrtc_listener_);
|
||||
srs_freep(stream_caster_flv_listener_);
|
||||
srs_freep(stream_caster_mpegts_);
|
||||
}
|
||||
|
||||
void SrsServer::dispose()
|
||||
{
|
||||
_srs_config->unsubscribe(this);
|
||||
|
||||
// prevent fresh clients.
|
||||
close_listeners(SrsListenerRtmpStream);
|
||||
close_listeners(SrsListenerHttpApi);
|
||||
close_listeners(SrsListenerHttpsApi);
|
||||
close_listeners(SrsListenerHttpStream);
|
||||
close_listeners(SrsListenerHttpsStream);
|
||||
close_listeners(SrsListenerMpegTsOverUdp);
|
||||
close_listeners(SrsListenerFlv);
|
||||
close_listeners(SrsListenerTcp);
|
||||
|
||||
// Destroy all listeners.
|
||||
rtmp_listener_->close();
|
||||
api_listener_->close();
|
||||
apis_listener_->close();
|
||||
http_listener_->close();
|
||||
https_listener_->close();
|
||||
webrtc_listener_->close();
|
||||
stream_caster_flv_listener_->close();
|
||||
stream_caster_mpegts_->close();
|
||||
|
||||
// Fast stop to notify FFMPEG to quit, wait for a while then fast kill.
|
||||
ingester->dispose();
|
||||
|
||||
|
@ -609,15 +422,15 @@ void SrsServer::gracefully_dispose()
|
|||
srs_usleep(_srs_config->get_grace_start_wait());
|
||||
srs_trace("start wait for %dms", srsu2msi(_srs_config->get_grace_start_wait()));
|
||||
|
||||
// prevent fresh clients.
|
||||
close_listeners(SrsListenerRtmpStream);
|
||||
close_listeners(SrsListenerHttpApi);
|
||||
close_listeners(SrsListenerHttpsApi);
|
||||
close_listeners(SrsListenerHttpStream);
|
||||
close_listeners(SrsListenerHttpsStream);
|
||||
close_listeners(SrsListenerMpegTsOverUdp);
|
||||
close_listeners(SrsListenerFlv);
|
||||
close_listeners(SrsListenerTcp);
|
||||
// Destroy all listeners.
|
||||
rtmp_listener_->close();
|
||||
api_listener_->close();
|
||||
apis_listener_->close();
|
||||
http_listener_->close();
|
||||
https_listener_->close();
|
||||
webrtc_listener_->close();
|
||||
stream_caster_flv_listener_->close();
|
||||
stream_caster_mpegts_->close();
|
||||
srs_trace("listeners closed");
|
||||
|
||||
// Fast stop to notify FFMPEG to quit, wait for a while then fast kill.
|
||||
|
@ -644,7 +457,7 @@ void SrsServer::gracefully_dispose()
|
|||
srs_trace("final wait for %dms", srsu2msi(_srs_config->get_grace_final_wait()));
|
||||
}
|
||||
|
||||
srs_error_t SrsServer::initialize(ISrsServerCycle* ch)
|
||||
srs_error_t SrsServer::initialize()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
|
@ -653,11 +466,6 @@ srs_error_t SrsServer::initialize(ISrsServerCycle* ch)
|
|||
// instead, subscribe handler in initialize method.
|
||||
srs_assert(_srs_config);
|
||||
_srs_config->subscribe(this);
|
||||
|
||||
handler = ch;
|
||||
if(handler && (err = handler->initialize()) != srs_success){
|
||||
return srs_error_wrap(err, "handler initialize");
|
||||
}
|
||||
|
||||
bool stream = _srs_config->get_http_stream_enabled();
|
||||
string http_listen = _srs_config->get_http_stream_listen();
|
||||
|
@ -742,51 +550,92 @@ srs_error_t SrsServer::initialize_signal()
|
|||
srs_error_t SrsServer::listen()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
if ((err = listen_rtmp()) != srs_success) {
|
||||
|
||||
// Create RTMP listeners.
|
||||
rtmp_listener_->add(_srs_config->get_listens())->set_label("RTMP");
|
||||
if ((err = rtmp_listener_->listen()) != srs_success) {
|
||||
return srs_error_wrap(err, "rtmp listen");
|
||||
}
|
||||
|
||||
if ((err = listen_http_api()) != srs_success) {
|
||||
return srs_error_wrap(err, "http api listen");
|
||||
}
|
||||
|
||||
if ((err = listen_https_api()) != srs_success) {
|
||||
return srs_error_wrap(err, "https api listen");
|
||||
}
|
||||
|
||||
if ((err = listen_http_stream()) != srs_success) {
|
||||
return srs_error_wrap(err, "http stream listen");
|
||||
}
|
||||
|
||||
if ((err = listen_https_stream()) != srs_success) {
|
||||
return srs_error_wrap(err, "https stream listen");
|
||||
}
|
||||
|
||||
if ((err = listen_stream_caster()) != srs_success) {
|
||||
return srs_error_wrap(err, "stream caster listen");
|
||||
}
|
||||
|
||||
#ifdef SRS_RTC
|
||||
if (!reuse_rtc_over_server_) {
|
||||
// TODO: FIXME: Refine the listeners.
|
||||
close_listeners(SrsListenerTcp);
|
||||
if (_srs_config->get_rtc_server_tcp_enabled()) {
|
||||
SrsListener* listener = new SrsBufferListener(this, SrsListenerTcp);
|
||||
listeners.push_back(listener);
|
||||
|
||||
std::string ep = srs_int2str(_srs_config->get_rtc_server_tcp_listen());
|
||||
|
||||
std::string ip;
|
||||
int port;
|
||||
srs_parse_endpoint(ep, ip, port);
|
||||
|
||||
if ((err = listener->listen(ip, port)) != srs_success) {
|
||||
return srs_error_wrap(err, "tcp listen %s:%d", ip.c_str(), port);
|
||||
// Create HTTP API listener.
|
||||
if (_srs_config->get_http_api_enabled()) {
|
||||
if (reuse_api_over_server_) {
|
||||
srs_trace("HTTP-API: Reuse listen to http server %s", _srs_config->get_http_stream_listen().c_str());
|
||||
} else {
|
||||
api_listener_->set_endpoint(_srs_config->get_http_api_listen())->set_label("HTTP-API");
|
||||
if ((err = api_listener_->listen()) != srs_success) {
|
||||
return srs_error_wrap(err, "http api listen");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create HTTPS API listener.
|
||||
if (_srs_config->get_https_api_enabled()) {
|
||||
if (reuse_api_over_server_) {
|
||||
srs_trace("HTTPS-API: Reuse listen to http server %s", _srs_config->get_http_stream_listen().c_str());
|
||||
} else {
|
||||
apis_listener_->set_endpoint(_srs_config->get_https_api_listen())->set_label("HTTPS-API");
|
||||
if ((err = apis_listener_->listen()) != srs_success) {
|
||||
return srs_error_wrap(err, "https api listen");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create HTTP server listener.
|
||||
if (_srs_config->get_http_stream_enabled()) {
|
||||
http_listener_->set_endpoint(_srs_config->get_http_stream_listen())->set_label("HTTP-Server");
|
||||
if ((err = http_listener_->listen()) != srs_success) {
|
||||
return srs_error_wrap(err, "http server listen");
|
||||
}
|
||||
}
|
||||
|
||||
// Create HTTP server listener.
|
||||
if (_srs_config->get_https_stream_enabled()) {
|
||||
https_listener_->set_endpoint(_srs_config->get_https_stream_listen())->set_label("HTTPS-Server");
|
||||
if ((err = https_listener_->listen()) != srs_success) {
|
||||
return srs_error_wrap(err, "https server listen");
|
||||
}
|
||||
}
|
||||
|
||||
// Start WebRTC over TCP listener.
|
||||
#ifdef SRS_RTC
|
||||
if (!reuse_rtc_over_server_ && _srs_config->get_rtc_server_tcp_enabled()) {
|
||||
webrtc_listener_->set_endpoint(srs_int2str(_srs_config->get_rtc_server_tcp_listen()))->set_label("WebRTC");
|
||||
if ((err = webrtc_listener_->listen()) != srs_success) {
|
||||
return srs_error_wrap(err, "webrtc tcp listen");
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
// Start all listeners for stream caster.
|
||||
std::vector<SrsConfDirective*> confs = _srs_config->get_stream_casters();
|
||||
for (vector<SrsConfDirective*>::iterator it = confs.begin(); it != confs.end(); ++it) {
|
||||
SrsConfDirective* conf = *it;
|
||||
if (!_srs_config->get_stream_caster_enabled(conf)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ISrsListener* listener = NULL;
|
||||
std::string caster = _srs_config->get_stream_caster_engine(conf);
|
||||
if (srs_stream_caster_is_udp(caster)) {
|
||||
listener = stream_caster_mpegts_;
|
||||
if ((err = stream_caster_mpegts_->initialize(conf)) != srs_success) {
|
||||
return srs_error_wrap(err, "initialize");
|
||||
}
|
||||
} else if (srs_stream_caster_is_flv(caster)) {
|
||||
listener = stream_caster_flv_listener_;
|
||||
if ((err = stream_caster_flv_listener_->initialize(conf)) != srs_success) {
|
||||
return srs_error_wrap(err, "initialize");
|
||||
}
|
||||
} else {
|
||||
return srs_error_new(ERROR_STREAM_CASTER_ENGINE, "invalid caster %s", caster.c_str());
|
||||
}
|
||||
|
||||
srs_assert(listener);
|
||||
if ((err = listener->listen()) != srs_success) {
|
||||
return srs_error_wrap(err, "listen");
|
||||
}
|
||||
}
|
||||
|
||||
if ((err = conn_manager->start()) != srs_success) {
|
||||
return srs_error_wrap(err, "connection manager");
|
||||
|
@ -1011,10 +860,6 @@ void SrsServer::on_signal(int signo)
|
|||
if (signo == SRS_SIGNAL_REOPEN_LOG) {
|
||||
_srs_log->reopen();
|
||||
|
||||
if (handler) {
|
||||
handler->on_logrotate();
|
||||
}
|
||||
|
||||
srs_warn("reopen log file, signo=%d", signo);
|
||||
return;
|
||||
}
|
||||
|
@ -1071,10 +916,6 @@ srs_error_t SrsServer::do_cycle()
|
|||
if ((err = trd_->pull()) != srs_success) {
|
||||
return srs_error_wrap(err, "pull");
|
||||
}
|
||||
|
||||
if (handler && (err = handler->on_cycle()) != srs_success) {
|
||||
return srs_error_wrap(err, "handle callback");
|
||||
}
|
||||
|
||||
// asprocess check.
|
||||
if (asprocess && ::getppid() != ppid) {
|
||||
|
@ -1191,202 +1032,6 @@ srs_error_t SrsServer::notify(int event, srs_utime_t interval, srs_utime_t tick)
|
|||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsServer::listen_rtmp()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
// stream service port.
|
||||
std::vector<std::string> ip_ports = _srs_config->get_listens();
|
||||
srs_assert((int)ip_ports.size() > 0);
|
||||
|
||||
close_listeners(SrsListenerRtmpStream);
|
||||
|
||||
for (int i = 0; i < (int)ip_ports.size(); i++) {
|
||||
SrsListener* listener = new SrsBufferListener(this, SrsListenerRtmpStream);
|
||||
listeners.push_back(listener);
|
||||
|
||||
int port; string ip;
|
||||
srs_parse_endpoint(ip_ports[i], ip, port);
|
||||
|
||||
if ((err = listener->listen(ip, port)) != srs_success) {
|
||||
srs_error_wrap(err, "rtmp listen %s:%d", ip.c_str(), port);
|
||||
}
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsServer::listen_http_api()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
close_listeners(SrsListenerHttpApi);
|
||||
|
||||
// Ignore if not enabled.
|
||||
if (!_srs_config->get_http_api_enabled()) {
|
||||
return err;
|
||||
}
|
||||
|
||||
// Ignore if reuse same port to http server.
|
||||
if (reuse_api_over_server_) {
|
||||
srs_trace("HTTP-API: Reuse listen to http server %s", _srs_config->get_http_stream_listen().c_str());
|
||||
return err;
|
||||
}
|
||||
|
||||
// Listen at a dedicated HTTP API endpoint.
|
||||
SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpApi);
|
||||
listeners.push_back(listener);
|
||||
|
||||
std::string ep = _srs_config->get_http_api_listen();
|
||||
|
||||
std::string ip;
|
||||
int port;
|
||||
srs_parse_endpoint(ep, ip, port);
|
||||
|
||||
if ((err = listener->listen(ip, port)) != srs_success) {
|
||||
return srs_error_wrap(err, "http api listen %s:%d", ip.c_str(), port);
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsServer::listen_https_api()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
close_listeners(SrsListenerHttpsApi);
|
||||
|
||||
// Ignore if not enabled.
|
||||
if (!_srs_config->get_https_api_enabled()) {
|
||||
return err;
|
||||
}
|
||||
|
||||
// Ignore if reuse same port to https server.
|
||||
if (reuse_api_over_server_) {
|
||||
srs_trace("HTTPS-API: Reuse listen to https server %s", _srs_config->get_https_stream_listen().c_str());
|
||||
return err;
|
||||
}
|
||||
|
||||
// Listen at a dedicated HTTPS API endpoint.
|
||||
SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpsApi);
|
||||
listeners.push_back(listener);
|
||||
|
||||
std::string ep = _srs_config->get_https_api_listen();
|
||||
|
||||
std::string ip;
|
||||
int port;
|
||||
srs_parse_endpoint(ep, ip, port);
|
||||
|
||||
if ((err = listener->listen(ip, port)) != srs_success) {
|
||||
return srs_error_wrap(err, "https api listen %s:%d", ip.c_str(), port);
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsServer::listen_http_stream()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
close_listeners(SrsListenerHttpStream);
|
||||
if (_srs_config->get_http_stream_enabled()) {
|
||||
SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpStream);
|
||||
listeners.push_back(listener);
|
||||
|
||||
std::string ep = _srs_config->get_http_stream_listen();
|
||||
|
||||
std::string ip;
|
||||
int port;
|
||||
srs_parse_endpoint(ep, ip, port);
|
||||
|
||||
if ((err = listener->listen(ip, port)) != srs_success) {
|
||||
return srs_error_wrap(err, "http stream listen %s:%d", ip.c_str(), port);
|
||||
}
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsServer::listen_https_stream()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
close_listeners(SrsListenerHttpsStream);
|
||||
if (_srs_config->get_https_stream_enabled()) {
|
||||
SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpsStream);
|
||||
listeners.push_back(listener);
|
||||
|
||||
std::string ep = _srs_config->get_https_stream_listen();
|
||||
|
||||
std::string ip;
|
||||
int port;
|
||||
srs_parse_endpoint(ep, ip, port);
|
||||
|
||||
if ((err = listener->listen(ip, port)) != srs_success) {
|
||||
return srs_error_wrap(err, "https stream listen %s:%d", ip.c_str(), port);
|
||||
}
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsServer::listen_stream_caster()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
close_listeners(SrsListenerMpegTsOverUdp);
|
||||
|
||||
std::vector<SrsConfDirective*>::iterator it;
|
||||
std::vector<SrsConfDirective*> stream_casters = _srs_config->get_stream_casters();
|
||||
|
||||
for (it = stream_casters.begin(); it != stream_casters.end(); ++it) {
|
||||
SrsConfDirective* stream_caster = *it;
|
||||
if (!_srs_config->get_stream_caster_enabled(stream_caster)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SrsListener* listener = NULL;
|
||||
|
||||
std::string caster = _srs_config->get_stream_caster_engine(stream_caster);
|
||||
if (srs_stream_caster_is_udp(caster)) {
|
||||
listener = new SrsUdpCasterListener(this, SrsListenerMpegTsOverUdp, stream_caster);
|
||||
} else if (srs_stream_caster_is_flv(caster)) {
|
||||
listener = new SrsHttpFlvListener(this, SrsListenerFlv, stream_caster);
|
||||
} else {
|
||||
return srs_error_new(ERROR_STREAM_CASTER_ENGINE, "invalid caster %s", caster.c_str());
|
||||
}
|
||||
srs_assert(listener != NULL);
|
||||
|
||||
listeners.push_back(listener);
|
||||
int port = _srs_config->get_stream_caster_listen(stream_caster);
|
||||
if (port <= 0) {
|
||||
return srs_error_new(ERROR_STREAM_CASTER_PORT, "invalid port=%d", port);
|
||||
}
|
||||
// TODO: support listen at <[ip:]port>
|
||||
if ((err = listener->listen(srs_any_address_for_listener(), port)) != srs_success) {
|
||||
return srs_error_wrap(err, "listen at %d", port);
|
||||
}
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
void SrsServer::close_listeners(SrsListenerType type)
|
||||
{
|
||||
std::vector<SrsListener*>::iterator it;
|
||||
for (it = listeners.begin(); it != listeners.end();) {
|
||||
SrsListener* listener = *it;
|
||||
|
||||
if (listener->listen_type() != type) {
|
||||
++it;
|
||||
continue;
|
||||
}
|
||||
|
||||
srs_freep(listener);
|
||||
it = listeners.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
void SrsServer::resample_kbps()
|
||||
{
|
||||
SrsStatistic* stat = SrsStatistic::instance();
|
||||
|
@ -1423,94 +1068,57 @@ void SrsServer::resample_kbps()
|
|||
stat->kbps_sample();
|
||||
}
|
||||
|
||||
srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
ISrsResource* resource = NULL;
|
||||
|
||||
if ((err = fd_to_resource(type, stfd, &resource)) != srs_success) {
|
||||
srs_close_stfd(stfd);
|
||||
if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) {
|
||||
srs_error_reset(err);
|
||||
return srs_success;
|
||||
}
|
||||
return srs_error_wrap(err, "fd to resource");
|
||||
}
|
||||
|
||||
// Ignore if no resource found.
|
||||
if (!resource) {
|
||||
return err;
|
||||
}
|
||||
|
||||
// directly enqueue, the cycle thread will remove the client.
|
||||
conn_manager->add(resource);
|
||||
|
||||
ISrsStartable* conn = dynamic_cast<ISrsStartable*>(resource);
|
||||
if ((err = conn->start()) != srs_success) {
|
||||
return srs_error_wrap(err, "start conn coroutine");
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
ISrsHttpServeMux* SrsServer::api_server()
|
||||
{
|
||||
return http_api_mux;
|
||||
}
|
||||
|
||||
srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t& stfd, ISrsResource** pr)
|
||||
srs_error_t SrsServer::on_tcp_client(ISrsListener* listener, srs_netfd_t stfd)
|
||||
{
|
||||
srs_error_t err = do_on_tcp_client(listener, stfd);
|
||||
|
||||
// We always try to close the stfd, because it should be NULL if it has been handled or closed.
|
||||
srs_close_stfd(stfd);
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsServer::do_on_tcp_client(ISrsListener* listener, srs_netfd_t& stfd)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
|
||||
int fd = srs_netfd_fileno(stfd);
|
||||
string ip = srs_get_peer_ip(fd);
|
||||
int port = srs_get_peer_port(fd);
|
||||
|
||||
// for some keep alive application, for example, the keepalived,
|
||||
// will send some tcp packet which we cann't got the ip,
|
||||
// we just ignore it.
|
||||
|
||||
// Ignore if ip is empty, for example, load balancer keepalive.
|
||||
if (ip.empty()) {
|
||||
if (_srs_config->empty_ip_ok()) return err;
|
||||
return srs_error_new(ERROR_SOCKET_GET_PEER_IP, "ignore empty ip, fd=%d", fd);
|
||||
}
|
||||
|
||||
// check connection limitation.
|
||||
int max_connections = _srs_config->get_max_connections();
|
||||
if (handler && (err = handler->on_accept_client(max_connections, (int)conn_manager->size())) != srs_success) {
|
||||
return srs_error_wrap(err, "drop client fd=%d, ip=%s:%d, max=%d, cur=%d for err: %s",
|
||||
fd, ip.c_str(), port, max_connections, (int)conn_manager->size(), srs_error_desc(err).c_str());
|
||||
}
|
||||
if ((int)conn_manager->size() >= max_connections) {
|
||||
return srs_error_new(ERROR_EXCEED_CONNECTIONS, "drop fd=%d, ip=%s:%d, max=%d, cur=%d for exceed connection limits",
|
||||
fd, ip.c_str(), port, max_connections, (int)conn_manager->size());
|
||||
}
|
||||
|
||||
// avoid fd leak when fork.
|
||||
// @see https://github.com/ossrs/srs/issues/518
|
||||
if (true) {
|
||||
int val;
|
||||
if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
|
||||
return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fnctl F_GETFD error! fd=%d", fd);
|
||||
}
|
||||
val |= FD_CLOEXEC;
|
||||
if (fcntl(fd, F_SETFD, val) < 0) {
|
||||
return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "fcntl F_SETFD error! fd=%d", fd);
|
||||
}
|
||||
|
||||
// Security or system flow control check.
|
||||
if ((err = on_before_connection(stfd, ip, port)) != srs_success) {
|
||||
return srs_error_wrap(err, "check");
|
||||
}
|
||||
|
||||
// We will free the stfd from now on.
|
||||
srs_netfd_t fd2 = stfd;
|
||||
stfd = NULL;
|
||||
// Covert handler to resource.
|
||||
ISrsResource* resource = NULL;
|
||||
|
||||
// The context id may change during creating the bellow objects.
|
||||
SrsContextRestore(_srs_context->get_id());
|
||||
|
||||
// From now on, we always handle the stfd, so we set the original one to NULL.
|
||||
srs_netfd_t stfd2 = stfd;
|
||||
stfd = NULL;
|
||||
|
||||
#ifdef SRS_RTC
|
||||
// If reuse HTTP server with WebRTC TCP, peek to detect the client.
|
||||
if (reuse_rtc_over_server_ && (type == SrsListenerHttpStream || type == SrsListenerHttpsStream)) {
|
||||
SrsTcpConnection* skt = new SrsTcpConnection(fd2);
|
||||
if (reuse_rtc_over_server_ && (listener == http_listener_ || listener == https_listener_)) {
|
||||
SrsTcpConnection* skt = new SrsTcpConnection(stfd2);
|
||||
SrsBufferedReadWriter* io = new SrsBufferedReadWriter(skt);
|
||||
|
||||
// Peek first N bytes to finger out the real client type.
|
||||
uint8_t b[10]; int nn = sizeof(b);
|
||||
if ((err = io->peek((char*)b, &nn)) != srs_success) {
|
||||
srs_freep(io); srs_freep(skt);
|
||||
|
@ -1527,30 +1135,71 @@ srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t& stfd, I
|
|||
if (nn == 10 && b[0] == 0 && b[2] == 0 && b[3] == 1 && b[1] - b[5] == 20
|
||||
&& b[6] == 0x21 && b[7] == 0x12 && b[8] == 0xa4 && b[9] == 0x42
|
||||
) {
|
||||
*pr = new SrsRtcTcpConn(io, ip, port, this);
|
||||
resource = new SrsRtcTcpConn(io, ip, port, this);
|
||||
} else {
|
||||
*pr = new SrsHttpxConn(type == SrsListenerHttpsStream, this, io, http_server, ip, port);
|
||||
resource = new SrsHttpxConn(listener == http_listener_, this, io, http_server, ip, port);
|
||||
}
|
||||
return err;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (type == SrsListenerRtmpStream) {
|
||||
*pr = new SrsRtmpConn(this, fd2, ip, port);
|
||||
} else if (type == SrsListenerHttpApi || type == SrsListenerHttpsApi) {
|
||||
*pr = new SrsHttpxConn(type == SrsListenerHttpsApi, this, new SrsTcpConnection(fd2), http_api_mux, ip, port);
|
||||
} else if (type == SrsListenerHttpStream || type == SrsListenerHttpsStream) {
|
||||
*pr = new SrsHttpxConn(type == SrsListenerHttpsStream, this, new SrsTcpConnection(fd2), http_server, ip, port);
|
||||
#ifdef SRS_RTC
|
||||
} else if (type == SrsListenerTcp) {
|
||||
*pr = new SrsRtcTcpConn(new SrsTcpConnection(fd2), ip, port, this);
|
||||
#endif
|
||||
} else {
|
||||
srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port);
|
||||
srs_close_stfd(fd2);
|
||||
return err;
|
||||
|
||||
// Create resource by normal listeners.
|
||||
if (!resource) {
|
||||
if (listener == rtmp_listener_) {
|
||||
resource = new SrsRtmpConn(this, stfd2, ip, port);
|
||||
} else if (listener == api_listener_ || listener == apis_listener_) {
|
||||
bool is_https = listener == apis_listener_;
|
||||
resource = new SrsHttpxConn(is_https, this, new SrsTcpConnection(stfd2), http_api_mux, ip, port);
|
||||
} else if (listener == http_listener_ || listener == https_listener_) {
|
||||
bool is_https = listener == https_listener_;
|
||||
resource = new SrsHttpxConn(is_https, this, new SrsTcpConnection(stfd2), http_server, ip, port);
|
||||
} else if (listener == webrtc_listener_) {
|
||||
resource = new SrsRtcTcpConn(new SrsTcpConnection(stfd2), ip, port, this);
|
||||
} else {
|
||||
srs_close_stfd(stfd2);
|
||||
srs_warn("Close for invalid fd=%d, ip=%s:%d", fd, ip.c_str(), port);
|
||||
return err;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Use connection manager to manage all the resources.
|
||||
conn_manager->add(resource);
|
||||
|
||||
// If connection is a resource to start, start a coroutine to handle it.
|
||||
ISrsStartable* conn = dynamic_cast<ISrsStartable*>(resource);
|
||||
if ((err = conn->start()) != srs_success) {
|
||||
return srs_error_wrap(err, "start conn coroutine");
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsServer::on_before_connection(srs_netfd_t& stfd, const std::string& ip, int port)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
int fd = srs_netfd_fileno(stfd);
|
||||
|
||||
// Failed if exceed the connection limitation.
|
||||
int max_connections = _srs_config->get_max_connections();
|
||||
|
||||
if ((int)conn_manager->size() >= max_connections) {
|
||||
return srs_error_new(ERROR_EXCEED_CONNECTIONS, "drop fd=%d, ip=%s:%d, max=%d, cur=%d for exceed connection limits",
|
||||
fd, ip.c_str(), port, max_connections, (int)conn_manager->size());
|
||||
}
|
||||
|
||||
// Set to close the fd when forking, to avoid fd leak when start a process.
|
||||
// See https://github.com/ossrs/srs/issues/518
|
||||
if (true) {
|
||||
int val;
|
||||
if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
|
||||
return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fnctl F_GETFD error! fd=%d", fd);
|
||||
}
|
||||
val |= FD_CLOEXEC;
|
||||
if (fcntl(fd, F_SETFD, val) < 0) {
|
||||
return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "fcntl F_SETFD error! fd=%d", fd);
|
||||
}
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
|
@ -1616,7 +1265,7 @@ srs_error_t SrsServerAdapter::run(SrsWaitGroup* wg)
|
|||
srs_error_t err = srs_success;
|
||||
|
||||
// Initialize the whole system, set hooks to handle server level events.
|
||||
if ((err = srs->initialize(NULL)) != srs_success) {
|
||||
if ((err = srs->initialize()) != srs_success) {
|
||||
return srs_error_wrap(err, "server initialize");
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue