From 78aad11eeb590a53e77631bad900ea08d53b6f4e Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 12 Apr 2020 08:55:43 +0800 Subject: [PATCH] For #307, enable REUSEPORT to increase UDP buffer --- trunk/conf/full.conf | 5 +++++ trunk/src/app/srs_app_config.cpp | 29 ++++++++++++++++++++++++++++- trunk/src/app/srs_app_config.hpp | 1 + trunk/src/app/srs_app_listener.cpp | 2 ++ trunk/src/app/srs_app_rtc_conn.cpp | 25 +++++++++++++++++-------- trunk/src/app/srs_app_rtc_conn.hpp | 2 +- 6 files changed, 54 insertions(+), 10 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 513a55b87..ff17617d9 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -430,6 +430,11 @@ rtc_server { # @remark Should always turn it on, or Chrome will fail. # default: on encrypt on; + # We listen multiple times at the same port, by REUSEPORT, to increase the UDP queue. + # Note that you can set to 1 and increase the system UDP buffer size by net.core.rmem_max + # or just increase this to get larger UDP recv and send buffer. + # default: 4 + reuseport 4; } vhost rtc.vhost.srs.com { diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 30e987ec7..d3e790426 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3614,7 +3614,7 @@ srs_error_t SrsConfig::check_normal_config() for (int i = 0; conf && i < (int)conf->directives.size(); i++) { string n = conf->at(i)->name; if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa" - && n != "sendmmsg" && n != "encrypt") { + && n != "sendmmsg" && n != "encrypt" && n != "reuseport") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtc_server.%s", n.c_str()); } } @@ -4745,6 +4745,33 @@ int SrsConfig::get_rtc_server_sendmmsg() #endif } +int SrsConfig::get_rtc_server_reuseport() +{ +#if defined(SO_REUSEPORT) + static int DEFAULT = 4; +#else + static int DEFAULT = 1; +#endif + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("reuseport"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + int reuseport = ::atoi(conf->arg0().c_str()); +#if !defined(SO_REUSEPORT) + srs_warn("REUSEPORT not supported, reset %d to %d", reuseport, DEFAULT); + reuseport = DEFAULT +#endif + + return reuseport; +} + SrsConfDirective* SrsConfig::get_rtc(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index c17e26033..ba46984e3 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -527,6 +527,7 @@ public: virtual bool get_rtc_server_ecdsa(); virtual int get_rtc_server_sendmmsg(); virtual bool get_rtc_server_encrypt(); + virtual int get_rtc_server_reuseport(); SrsConfDirective* get_rtc(std::string vhost); bool get_rtc_enabled(std::string vhost); diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 15c4ab773..eee324613 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -391,6 +391,7 @@ srs_error_t SrsUdpMuxListener::listen() void SrsUdpMuxListener::set_socket_buffer() { int default_sndbuf = 0; + // TODO: FIXME: Config it. int expect_sndbuf = 1024*1024*10; // 10M int actual_sndbuf = expect_sndbuf; int r0_sndbuf = 0; @@ -407,6 +408,7 @@ void SrsUdpMuxListener::set_socket_buffer() } int default_rcvbuf = 0; + // TODO: FIXME: Config it. int expect_rcvbuf = 1024*1024*10; // 10M int actual_rcvbuf = expect_rcvbuf; int r0_rcvbuf = 0; diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index d391cfe8f..3c96e69c4 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1332,7 +1332,6 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt) SrsRtcServer::SrsRtcServer() { - listener = NULL; timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS); mmstfd = NULL; @@ -1349,7 +1348,12 @@ SrsRtcServer::~SrsRtcServer() { _srs_config->unsubscribe(this); - srs_freep(listener); + vector::iterator it; + for (it = listeners.begin(); it != listeners.end(); ++it) { + SrsUdpMuxListener* listener = *it; + srs_freep(listener); + } + srs_freep(timer); srs_freep(trd); @@ -1400,16 +1404,21 @@ srs_error_t SrsRtcServer::listen_udp() } string ip = srs_any_address_for_listener(); + srs_assert(listeners.empty()); - srs_freep(listener); - listener = new SrsUdpMuxListener(this, ip, port); + int nn_listeners = _srs_config->get_rtc_server_reuseport(); + for (int i = 0; i < nn_listeners; i++) { + SrsUdpMuxListener* listener = new SrsUdpMuxListener(this, ip, port); - if ((err = listener->listen()) != srs_success) { - return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); + if ((err = listener->listen()) != srs_success) { + srs_freep(listener); + return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); + } + + srs_trace("rtc listen at udp://%s:%d, fd=%d", ip.c_str(), port, listener->fd()); + listeners.push_back(listener); } - srs_trace("rtc listen at udp://%s:%d, fd=%d", ip.c_str(), port, listener->fd()); - return err; } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 16cba0d9f..e63f2109b 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -233,7 +233,7 @@ class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGl virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler { private: - SrsUdpMuxListener* listener; + std::vector listeners; SrsHourGlass* timer; private: SrsCoroutine* trd;