From 14e3ec2fe82c5ee7e6c3c114b73f166ea0b53175 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 17 Apr 2020 12:30:53 +0800 Subject: [PATCH] For #307, drop frame when VBR too high --- trunk/conf/full.conf | 7 +++++++ trunk/scripts/perf_gso.py | 4 ++++ trunk/src/app/srs_app_config.cpp | 19 +++++++++++++++++- trunk/src/app/srs_app_config.hpp | 1 + trunk/src/app/srs_app_http_api.cpp | 11 ++++++++++- trunk/src/app/srs_app_listener.hpp | 2 ++ trunk/src/app/srs_app_rtc_conn.cpp | 30 ++++++++++++++++++++++++----- trunk/src/app/srs_app_rtc_conn.hpp | 7 ++++++- trunk/src/app/srs_app_statistic.cpp | 27 ++++++++++++++++++++++++++ trunk/src/app/srs_app_statistic.hpp | 5 +++++ 10 files changed, 105 insertions(+), 8 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 643c364c8..98a8792dc 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -451,6 +451,13 @@ rtc_server { # Whether enable the perf stat at http://localhost:1985/api/v1/perf # default: on perf_stat on; + # The queue length, in number of mmsghdr, in messages. + # For example, 30 means we will cache 30K messages at most. + # If exceed, we will drop messages. + # @remark Each reuseport use a dedicated queue, if queue is 2000, reuseport is 4, + # then system queue is 2000*4 = 8k, user can incrase reuseport to incrase the queue. + # default: 2000 + queue_length 2000; } vhost rtc.vhost.srs.com { diff --git a/trunk/scripts/perf_gso.py b/trunk/scripts/perf_gso.py index f09014143..7180c7e4b 100755 --- a/trunk/scripts/perf_gso.py +++ b/trunk/scripts/perf_gso.py @@ -41,6 +41,10 @@ print "Repsonse %s"%(s) obj = json.loads(s) +print "" +p = obj['data']['dropped'] +print('RTC Frame Dropped: %s'%(100.0 * p['rtc_dropeed'] / p['rtc_frames'])) + # 2, 3, 5, 9, 16, 32, 64, 128, 256 keys = ['lt_2', 'lt_3', 'lt_5', 'lt_9', 'lt_16', 'lt_32', 'lt_64', 'lt_128', 'lt_256', 'gt_256'] print("\n----------- 1 2 [3,4] [5,8] [9,15] [16,31] [32,63] [64,127] [128,255] [256,+) Packets"), diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index d2fae574d..eef0a325c 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3619,7 +3619,7 @@ srs_error_t SrsConfig::check_normal_config() string n = conf->at(i)->name; if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa" && n != "sendmmsg" && n != "encrypt" && n != "reuseport" && n != "gso" && n != "merge_nalus" - && n != "padding" && n != "perf_stat") { + && n != "padding" && n != "perf_stat" && n != "queue_length") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtc_server.%s", n.c_str()); } } @@ -4875,6 +4875,23 @@ bool SrsConfig::get_rtc_server_perf_stat() return SRS_CONF_PERFER_TRUE(conf->arg0()); } +int SrsConfig::get_rtc_server_queue_length() +{ + static int DEFAULT = 2000; + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("queue_length"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + SrsConfDirective* SrsConfig::get_rtc(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 0d2d0c4cc..bc9417d76 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -538,6 +538,7 @@ private: public: virtual int get_rtc_server_padding(); virtual bool get_rtc_server_perf_stat(); + virtual int get_rtc_server_queue_length(); public: SrsConfDirective* get_rtc(std::string vhost); diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index e994b9982..fa0a7d9f5 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1625,7 +1625,7 @@ srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* p->set("target", SrsJsonAny::str(target.c_str())); p->set("reset", SrsJsonAny::str(reset.c_str())); - p->set("help", SrsJsonAny::str("?target=avframes|rtc|rtp|gso|writev_iovs|sendmmsg|bytes")); + p->set("help", SrsJsonAny::str("?target=avframes|rtc|rtp|gso|writev_iovs|sendmmsg|bytes|dropped")); p->set("help2", SrsJsonAny::str("?reset=all")); } @@ -1697,6 +1697,15 @@ srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* } } + if (target.empty() || target == "dropped") { + SrsJsonObject* p = SrsJsonAny::object(); + data->set("dropped", p); + if ((err = stat->dumps_perf_dropped(p)) != srs_success) { + int code = srs_error_code(err); srs_error_reset(err); + return srs_api_response_code(w, r, code); + } + } + return srs_api_response(w, r, obj->dumps()); } diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index 8ef384dd9..d096e5910 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -141,6 +141,8 @@ public: virtual srs_error_t fetch(mmsghdr** pphdr) = 0; // Notify the sender to send out the msg. virtual srs_error_t sendmmsg(mmsghdr* hdr) = 0; + // Whether sender exceed the max queue, that is, overflow. + virtual bool overflow() = 0; }; class SrsUdpMuxSocket diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 239909c59..ad87e622e 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -473,6 +473,7 @@ SrsRtcPackets::SrsRtcPackets() nn_audios = nn_extras = 0; nn_videos = nn_samples = 0; nn_padding_bytes = nn_paddings = 0; + nn_dropped = 0; cursor = 0; } @@ -505,6 +506,7 @@ void SrsRtcPackets::reset(bool gso, bool merge_nalus) nn_audios = nn_extras = 0; nn_videos = nn_samples = 0; nn_padding_bytes = nn_paddings = 0; + nn_dropped = 0; cursor = 0; } @@ -744,6 +746,8 @@ srs_error_t SrsRtcSenderThread::cycle() stat->perf_on_gso_packets(pkts.nn_rtp_pkts); // Stat the bytes and paddings. stat->perf_on_rtc_bytes(pkts.nn_bytes, pkts.nn_padding_bytes); + // Stat the messages and dropped count. + stat->perf_on_dropped(msg_count, nn_rtc_packets, pkts.nn_dropped); } #if defined(SRS_DEBUG) srs_trace("RTC PLAY perf, msgs %d/%d, rtp %d, gso %d, %d audios, %d extras, %d videos, %d samples, %d/%d bytes", @@ -754,8 +758,8 @@ srs_error_t SrsRtcSenderThread::cycle() pprint->elapse(); if (pprint->can_print()) { // TODO: FIXME: Print stat like frame/s, packet/s, loss_packets. - srs_trace("-> RTC PLAY %d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d/%d bytes, %d pad, %d/%d cache", - msg_count, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, pkts.nn_samples, pkts.nn_bytes, + srs_trace("-> RTC PLAY %d/%d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d/%d bytes, %d pad, %d/%d cache", + msg_count, pkts.nn_dropped, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, pkts.nn_samples, pkts.nn_bytes, pkts.nn_padding_bytes, pkts.nn_paddings, pkts.size(), pkts.capacity()); } } @@ -766,6 +770,7 @@ srs_error_t SrsRtcSenderThread::send_messages( ) { srs_error_t err = srs_success; + // If DTLS is not OK, drop all messages. if (!rtc_session->dtls_session) { return err; } @@ -801,6 +806,12 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( for (int i = 0; i < nb_msgs; i++) { SrsSharedPtrMessage* msg = msgs[i]; + // If overflow, drop all messages. + if (sender->overflow()) { + packets.nn_dropped += nb_msgs - i; + return err; + } + // Update stats. packets.nn_bytes += msg->size; @@ -1802,6 +1813,8 @@ SrsUdpMuxSender::SrsUdpMuxSender(SrsRtcServer* s) trd = new SrsDummyCoroutine(); cache_pos = 0; + max_sendmmsg = 0; + queue_length = 0; _srs_config->subscribe(this); } @@ -1820,7 +1833,7 @@ SrsUdpMuxSender::~SrsUdpMuxSender() cache.clear(); } -srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd) +srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd, int senders) { srs_error_t err = srs_success; @@ -1834,7 +1847,9 @@ srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd) max_sendmmsg = _srs_config->get_rtc_server_sendmmsg(); bool gso = _srs_config->get_rtc_server_gso(); - srs_trace("UDP sender #%d init ok, max_sendmmsg=%d, gso=%d", srs_netfd_fileno(fd), max_sendmmsg, gso); + queue_length = srs_max(128, _srs_config->get_rtc_server_queue_length()); + srs_trace("UDP sender #%d init ok, max_sendmmsg=%d, gso=%d, queue_max=%dx%d", srs_netfd_fileno(fd), + max_sendmmsg, gso, queue_length, senders); return err; } @@ -1890,6 +1905,11 @@ srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr) return srs_success; } +bool SrsUdpMuxSender::overflow() +{ + return cache_pos > queue_length; +} + srs_error_t SrsUdpMuxSender::sendmmsg(mmsghdr* hdr) { if (waiting_msgs) { @@ -2098,7 +2118,7 @@ srs_error_t SrsRtcServer::listen_udp() return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); } - if ((err = sender->initialize(listener->stfd())) != srs_success) { + if ((err = sender->initialize(listener->stfd(), nn_listeners)) != srs_success) { return srs_error_wrap(err, "init sender"); } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index a1fe62514..9f771028f 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -149,6 +149,8 @@ public: int nn_videos; // The number of padded packet. int nn_paddings; + // The number of dropped messages. + int nn_dropped; private: int cursor; std::vector packets; @@ -303,16 +305,19 @@ private: int cache_pos; // The max number of messages for sendmmsg. If 1, we use sendmsg to send. int max_sendmmsg; + // The total queue length, share with all senders. + int queue_length; public: SrsUdpMuxSender(SrsRtcServer* s); virtual ~SrsUdpMuxSender(); public: - virtual srs_error_t initialize(srs_netfd_t fd); + virtual srs_error_t initialize(srs_netfd_t fd, int senders); private: void free_mhdrs(std::vector& mhdrs); public: virtual srs_error_t fetch(mmsghdr** pphdr); virtual srs_error_t sendmmsg(mmsghdr* hdr); + virtual bool overflow(); virtual srs_error_t cycle(); // interface ISrsReloadHandler public: diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index 4bf58aa99..6116c9f50 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -272,6 +272,7 @@ SrsStatistic::SrsStatistic() perf_rtp = new SrsStatisticCategory(); perf_rtc = new SrsStatisticCategory(); perf_bytes = new SrsStatisticCategory(); + perf_dropped = new SrsStatisticCategory(); } SrsStatistic::~SrsStatistic() @@ -313,6 +314,7 @@ SrsStatistic::~SrsStatistic() srs_freep(perf_rtp); srs_freep(perf_rtc); srs_freep(perf_bytes); + srs_freep(perf_dropped); } SrsStatistic* SrsStatistic::instance() @@ -673,6 +675,29 @@ srs_error_t SrsStatistic::dumps_perf_bytes(SrsJsonObject* obj) return srs_success; } +void SrsStatistic::perf_on_dropped(int nn_msgs, int nn_rtc, int nn_dropped) +{ + // a: System AVFrames. + // b: RTC frames. + // c: Dropped frames. + perf_dropped->a += nn_msgs; + perf_dropped->b += nn_rtc; + perf_dropped->c += nn_dropped; + + perf_dropped->nn += nn_dropped; +} + +srs_error_t SrsStatistic::dumps_perf_dropped(SrsJsonObject* obj) +{ + obj->set("avframes", SrsJsonAny::integer(perf_dropped->a)); + obj->set("rtc_frames", SrsJsonAny::integer(perf_dropped->b)); + obj->set("rtc_dropeed", SrsJsonAny::integer(perf_dropped->c)); + + obj->set("nn", SrsJsonAny::integer(perf_dropped->nn)); + + return srs_success; +} + void SrsStatistic::reset_perf() { srs_freep(perf_iovs); @@ -682,6 +707,7 @@ void SrsStatistic::reset_perf() srs_freep(perf_rtp); srs_freep(perf_rtc); srs_freep(perf_bytes); + srs_freep(perf_dropped); perf_iovs = new SrsStatisticCategory(); perf_msgs = new SrsStatisticCategory(); @@ -690,6 +716,7 @@ void SrsStatistic::reset_perf() perf_rtp = new SrsStatisticCategory(); perf_rtc = new SrsStatisticCategory(); perf_bytes = new SrsStatisticCategory(); + perf_dropped = new SrsStatisticCategory(); } void SrsStatistic::perf_on_packets(SrsStatisticCategory* p, int nb_msgs) diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index 50fbb8516..919322098 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -175,6 +175,7 @@ private: SrsStatisticCategory* perf_rtp; SrsStatisticCategory* perf_rtc; SrsStatisticCategory* perf_bytes; + SrsStatisticCategory* perf_dropped; private: SrsStatistic(); virtual ~SrsStatistic(); @@ -264,6 +265,10 @@ public: // Stat for bytes, nn_bytes is the size of bytes, nb_padding is padding bytes. virtual void perf_on_rtc_bytes(int nn_bytes, int nn_padding); virtual srs_error_t dumps_perf_bytes(SrsJsonObject* obj); +public: + // Stat for rtc messages, nn_rtc is rtc messages, nn_dropped is dropped messages. + virtual void perf_on_dropped(int nn_msgs, int nn_rtc, int nn_dropped); + virtual srs_error_t dumps_perf_dropped(SrsJsonObject* obj); public: // Reset all perf stat data. virtual void reset_perf();