From 7cfd2879b0cbc5fe0c480197e35851ab82719be7 Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 6 Apr 2020 00:24:14 +0800 Subject: [PATCH] For #307, config sendmmsg max --- trunk/conf/full.conf | 4 ++ trunk/src/app/srs_app_config.cpp | 21 ++++++++ trunk/src/app/srs_app_config.hpp | 1 + trunk/src/app/srs_app_http_api.cpp | 31 ++++++++++-- trunk/src/app/srs_app_rtc_conn.cpp | 31 ++++++++---- trunk/src/app/srs_app_statistic.cpp | 74 +++++++++++++++++++++++++++- trunk/src/app/srs_app_statistic.hpp | 10 +++- trunk/src/service/srs_service_st.cpp | 8 +++ 8 files changed, 165 insertions(+), 15 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 454df82ab..489939a52 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -403,6 +403,10 @@ rtc_server { # If not, use RSA certificate. # default: on ecdsa on; + # The max UDP messages send by sendmmsg. + # @remark No effect if OS does not support sendmmsg, like OSX. + # default: 256 + sendmmsg 256; } vhost rtc.vhost.srs.com { diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 47bfe7d85..010cbcaaa 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4656,6 +4656,27 @@ bool SrsConfig::get_rtc_server_ecdsa() return SRS_CONF_PERFER_TRUE(conf->arg0()); } +int SrsConfig::get_rtc_server_sendmmsg() +{ +#if !defined(SRS_AUTO_HAS_SENDMMSG) || !defined(SRS_AUTO_SENDMMSG) + static int DEFAULT = 1; +#else + static int DEFAULT = 256; +#endif + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("sendmmsg"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + SrsConfDirective* SrsConfig::get_rtc(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index ad6670de6..ebf34ff3f 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -521,6 +521,7 @@ public: virtual int get_rtc_server_listen(); virtual std::string get_rtc_server_candidates(); virtual bool get_rtc_server_ecdsa(); + virtual int get_rtc_server_sendmmsg(); SrsConfDirective* get_rtc(std::string vhost); bool get_rtc_enabled(std::string vhost); diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index ea070c9eb..a8166f977 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1586,9 +1586,34 @@ srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* obj->set("data", data); SrsStatistic* stat = SrsStatistic::instance(); - if ((err = stat->dumps_perf_mw(data)) != srs_success) { - int code = srs_error_code(err); srs_error_reset(err); - return srs_api_response_code(w, r, code); + + string target = r->query_get("target"); + srs_trace("query target=%s", target.c_str()); + + if (true) { + SrsJsonObject* p = SrsJsonAny::object(); + data->set("query", p); + + p->set("target", SrsJsonAny::str(target.c_str())); + p->set("help", SrsJsonAny::str("?target=writev|sendmmsg")); + } + + if (target.empty() || target == "writev") { + SrsJsonObject* p = SrsJsonAny::object(); + data->set("writev", p); + if ((err = stat->dumps_perf_writev(p)) != srs_success) { + int code = srs_error_code(err); srs_error_reset(err); + return srs_api_response_code(w, r, code); + } + } + + if (target.empty() || target == "sendmmsg") { + SrsJsonObject* p = SrsJsonAny::object(); + data->set("sendmmsg", p); + if ((err = stat->dumps_perf_sendmmsg(p)) != srs_success) { + int code = srs_error_code(err); srs_error_reset(err); + return srs_api_response_code(w, r, code); + } } return srs_api_response(w, r, obj->dumps()); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index f21fd247b..6ab3ed171 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -52,6 +52,7 @@ using namespace std; #include #include #include +#include static bool is_stun(const uint8_t* data, const int size) { @@ -1342,6 +1343,11 @@ srs_error_t SrsRtcServer::cycle() // TODO: FIXME: Use pithy print. uint32_t cnt = 1; + SrsStatistic* stat = SrsStatistic::instance(); + + // TODO: FIXME: Support reload. + int max_sendmmsg = _srs_config->get_rtc_server_sendmmsg(); + while (true) { if ((err = trd->pull()) != srs_success) { return err; @@ -1356,19 +1362,26 @@ srs_error_t SrsRtcServer::cycle() vector mhdrs = mmhdrs; mmhdrs.clear(); - // TODO: FIXME: Use pithy print. - if ((cnt++ % 1000) == 0) { - srs_trace("SEND %d msgs by sendmmsg", mhdrs.size()); - } + mmsghdr* p = &mhdrs[0]; + for (mmsghdr* end = p + mhdrs.size(); p < end; p += max_sendmmsg) { + int vlen = (int)(end - p); + vlen = srs_min(max_sendmmsg, vlen); - if (!mhdrs.empty()) { - mmsghdr* msgvec = &mhdrs[0]; - unsigned int vlen = (unsigned int)mhdrs.size(); - int r0 = srs_sendmmsg(mmstfd, msgvec, vlen, 0, SRS_UTIME_NO_TIMEOUT); - if (r0 != (int)vlen) { + int r0 = srs_sendmmsg(mmstfd, p, (unsigned int)vlen, 0, SRS_UTIME_NO_TIMEOUT); + if (r0 != vlen) { srs_warn("sendmsg %d msgs, %d done", vlen, r0); } + + stat->perf_mw_on_packets(vlen); } + + // TODO: FIXME: Use pithy print. + if ((cnt++ % 100) == 0) { + // TODO: FIXME: Support reload. + max_sendmmsg = _srs_config->get_rtc_server_sendmmsg(); + srs_trace("-> RTC SEND %d msgs, by sendmmsg %d", mhdrs.size(), max_sendmmsg); + } + for (int i = 0; i < (int)mhdrs.size(); i++) { msghdr* hdr = &mhdrs[i].msg_hdr; for (int i = 0; i < (int)hdr->msg_iovlen; i++) { diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index f5bee3b51..be3874ff7 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -266,6 +266,7 @@ SrsStatistic::SrsStatistic() perf_iovs = new SrsStatisticCategory(); perf_msgs = new SrsStatisticCategory(); perf_sys = new SrsStatisticCategory(); + perf_sendmmsg = new SrsStatisticCategory(); } SrsStatistic::~SrsStatistic() @@ -303,6 +304,7 @@ SrsStatistic::~SrsStatistic() srs_freep(perf_iovs); srs_freep(perf_msgs); srs_freep(perf_sys); + srs_freep(perf_sendmmsg); } SrsStatistic* SrsStatistic::instance() @@ -664,7 +666,7 @@ void SrsStatistic::perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs) perf_sys->b++; } -srs_error_t SrsStatistic::dumps_perf_mw(SrsJsonObject* obj) +srs_error_t SrsStatistic::dumps_perf_writev(SrsJsonObject* obj) { srs_error_t err = srs_success; @@ -736,6 +738,76 @@ srs_error_t SrsStatistic::dumps_perf_mw(SrsJsonObject* obj) return err; } +void SrsStatistic::perf_mw_on_packets(int nb_msgs) +{ + // For perf msgs, the nb_msgs stat. + // a: =1 + // b: <10 + // c: <100 + // d: <200 + // e: <300 + // f: <400 + // g: <500 + // h: <600 + // i: <1000 + // j: >=1000 + if (nb_msgs == 1) { + perf_sendmmsg->a++; + } else if (nb_msgs < 10) { + perf_sendmmsg->b++; + } else if (nb_msgs < 100) { + perf_sendmmsg->c++; + } else if (nb_msgs < 200) { + perf_sendmmsg->d++; + } else if (nb_msgs < 300) { + perf_sendmmsg->e++; + } else if (nb_msgs < 400) { + perf_sendmmsg->f++; + } else if (nb_msgs < 500) { + perf_sendmmsg->g++; + } else if (nb_msgs < 600) { + perf_sendmmsg->h++; + } else if (nb_msgs < 1000) { + perf_sendmmsg->i++; + } else { + perf_sendmmsg->j++; + } +} + +srs_error_t SrsStatistic::dumps_perf_sendmmsg(SrsJsonObject* obj) +{ + srs_error_t err = srs_success; + + if (true) { + SrsJsonObject* p = SrsJsonAny::object(); + obj->set("msgs", p); + + // For perf msgs, the nb_msgs stat. + // a: =1 + // b: <10 + // c: <100 + // d: <200 + // e: <300 + // f: <400 + // g: <500 + // h: <600 + // i: <1000 + // j: >=1000 + p->set("lt_2", SrsJsonAny::integer(perf_sendmmsg->a)); + p->set("lt_10", SrsJsonAny::integer(perf_sendmmsg->b)); + p->set("lt_100", SrsJsonAny::integer(perf_sendmmsg->c)); + p->set("lt_200", SrsJsonAny::integer(perf_sendmmsg->d)); + p->set("lt_300", SrsJsonAny::integer(perf_sendmmsg->e)); + p->set("lt_400", SrsJsonAny::integer(perf_sendmmsg->f)); + p->set("lt_500", SrsJsonAny::integer(perf_sendmmsg->g)); + p->set("lt_600", SrsJsonAny::integer(perf_sendmmsg->h)); + p->set("lt_1000", SrsJsonAny::integer(perf_sendmmsg->i)); + p->set("gt_1000", SrsJsonAny::integer(perf_sendmmsg->j)); + } + + return err; +} + SrsStatisticVhost* SrsStatistic::create_vhost(SrsRequest* req) { SrsStatisticVhost* vhost = NULL; diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index ee905673a..339e95b26 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -169,6 +169,7 @@ private: SrsStatisticCategory* perf_iovs; SrsStatisticCategory* perf_msgs; SrsStatisticCategory* perf_sys; + SrsStatisticCategory* perf_sendmmsg; private: SrsStatistic(); virtual ~SrsStatistic(); @@ -233,8 +234,13 @@ public: // Stat for packets merged written, nb_pkts is the number of or chunk packets, // bytes_pkts is the total bytes of or chunk packets, nb_iovs is the total number of iovec. virtual void perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs); - // Dumps the perf statistic data, for performance analysis. - virtual srs_error_t dumps_perf_mw(SrsJsonObject* obj); + // Dumps the perf statistic data for TCP writev, for performance analysis. + virtual srs_error_t dumps_perf_writev(SrsJsonObject* obj); +public: + // Stat for packets UDP sendmmsg, nb_msgs is the vlen for sendmmsg. + virtual void perf_mw_on_packets(int nb_msgs); + // Dumps the perf statistic data for UDP sendmmsg, for performance analysis. + virtual srs_error_t dumps_perf_sendmmsg(SrsJsonObject* obj); private: virtual SrsStatisticVhost* create_vhost(SrsRequest* req); virtual SrsStatisticStream* create_stream(SrsStatisticVhost* vhost, SrsRequest* req); diff --git a/trunk/src/service/srs_service_st.cpp b/trunk/src/service/srs_service_st.cpp index f79220654..18c67de7e 100644 --- a/trunk/src/service/srs_service_st.cpp +++ b/trunk/src/service/srs_service_st.cpp @@ -428,6 +428,14 @@ int srs_sendmmsg(srs_netfd_t stfd, struct mmsghdr *msgvec, unsigned int vlen, in // further sendmmsg() call to send the remaining messages. return vlen; #else + if (vlen == 1) { + int r0 = srs_sendmsg(stfd, &msgvec->msg_hdr, flags, timeout); + if (r0 < 0) { + return r0; + } + msgvec->msg_len = r0; + return 1; + } return st_sendmmsg((st_netfd_t)stfd, msgvec, vlen, flags, (st_utime_t)timeout); #endif }