1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #307, config sendmmsg max

This commit is contained in:
winlin 2020-04-06 00:24:14 +08:00
parent 40c95b04ca
commit 7cfd2879b0
8 changed files with 165 additions and 15 deletions

View file

@ -403,6 +403,10 @@ rtc_server {
# If not, use RSA certificate.
# default: on
ecdsa on;
# The max UDP messages send by sendmmsg.
# @remark No effect if OS does not support sendmmsg, like OSX.
# default: 256
sendmmsg 256;
}
vhost rtc.vhost.srs.com {

View file

@ -4656,6 +4656,27 @@ bool SrsConfig::get_rtc_server_ecdsa()
return SRS_CONF_PERFER_TRUE(conf->arg0());
}
int SrsConfig::get_rtc_server_sendmmsg()
{
#if !defined(SRS_AUTO_HAS_SENDMMSG) || !defined(SRS_AUTO_SENDMMSG)
static int DEFAULT = 1;
#else
static int DEFAULT = 256;
#endif
SrsConfDirective* conf = root->get("rtc_server");
if (!conf) {
return DEFAULT;
}
conf = conf->get("sendmmsg");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return ::atoi(conf->arg0().c_str());
}
SrsConfDirective* SrsConfig::get_rtc(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);

View file

@ -521,6 +521,7 @@ public:
virtual int get_rtc_server_listen();
virtual std::string get_rtc_server_candidates();
virtual bool get_rtc_server_ecdsa();
virtual int get_rtc_server_sendmmsg();
SrsConfDirective* get_rtc(std::string vhost);
bool get_rtc_enabled(std::string vhost);

View file

@ -1586,10 +1586,35 @@ srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage*
obj->set("data", data);
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->dumps_perf_mw(data)) != srs_success) {
string target = r->query_get("target");
srs_trace("query target=%s", target.c_str());
if (true) {
SrsJsonObject* p = SrsJsonAny::object();
data->set("query", p);
p->set("target", SrsJsonAny::str(target.c_str()));
p->set("help", SrsJsonAny::str("?target=writev|sendmmsg"));
}
if (target.empty() || target == "writev") {
SrsJsonObject* p = SrsJsonAny::object();
data->set("writev", p);
if ((err = stat->dumps_perf_writev(p)) != srs_success) {
int code = srs_error_code(err); srs_error_reset(err);
return srs_api_response_code(w, r, code);
}
}
if (target.empty() || target == "sendmmsg") {
SrsJsonObject* p = SrsJsonAny::object();
data->set("sendmmsg", p);
if ((err = stat->dumps_perf_sendmmsg(p)) != srs_success) {
int code = srs_error_code(err); srs_error_reset(err);
return srs_api_response_code(w, r, code);
}
}
return srs_api_response(w, r, obj->dumps());
}

View file

@ -52,6 +52,7 @@ using namespace std;
#include <srs_service_utility.hpp>
#include <srs_http_stack.hpp>
#include <srs_app_http_api.hpp>
#include <srs_app_statistic.hpp>
static bool is_stun(const uint8_t* data, const int size)
{
@ -1342,6 +1343,11 @@ srs_error_t SrsRtcServer::cycle()
// TODO: FIXME: Use pithy print.
uint32_t cnt = 1;
SrsStatistic* stat = SrsStatistic::instance();
// TODO: FIXME: Support reload.
int max_sendmmsg = _srs_config->get_rtc_server_sendmmsg();
while (true) {
if ((err = trd->pull()) != srs_success) {
return err;
@ -1356,19 +1362,26 @@ srs_error_t SrsRtcServer::cycle()
vector<mmsghdr> mhdrs = mmhdrs;
mmhdrs.clear();
// TODO: FIXME: Use pithy print.
if ((cnt++ % 1000) == 0) {
srs_trace("SEND %d msgs by sendmmsg", mhdrs.size());
}
mmsghdr* p = &mhdrs[0];
for (mmsghdr* end = p + mhdrs.size(); p < end; p += max_sendmmsg) {
int vlen = (int)(end - p);
vlen = srs_min(max_sendmmsg, vlen);
if (!mhdrs.empty()) {
mmsghdr* msgvec = &mhdrs[0];
unsigned int vlen = (unsigned int)mhdrs.size();
int r0 = srs_sendmmsg(mmstfd, msgvec, vlen, 0, SRS_UTIME_NO_TIMEOUT);
if (r0 != (int)vlen) {
int r0 = srs_sendmmsg(mmstfd, p, (unsigned int)vlen, 0, SRS_UTIME_NO_TIMEOUT);
if (r0 != vlen) {
srs_warn("sendmsg %d msgs, %d done", vlen, r0);
}
stat->perf_mw_on_packets(vlen);
}
// TODO: FIXME: Use pithy print.
if ((cnt++ % 100) == 0) {
// TODO: FIXME: Support reload.
max_sendmmsg = _srs_config->get_rtc_server_sendmmsg();
srs_trace("-> RTC SEND %d msgs, by sendmmsg %d", mhdrs.size(), max_sendmmsg);
}
for (int i = 0; i < (int)mhdrs.size(); i++) {
msghdr* hdr = &mhdrs[i].msg_hdr;
for (int i = 0; i < (int)hdr->msg_iovlen; i++) {

View file

@ -266,6 +266,7 @@ SrsStatistic::SrsStatistic()
perf_iovs = new SrsStatisticCategory();
perf_msgs = new SrsStatisticCategory();
perf_sys = new SrsStatisticCategory();
perf_sendmmsg = new SrsStatisticCategory();
}
SrsStatistic::~SrsStatistic()
@ -303,6 +304,7 @@ SrsStatistic::~SrsStatistic()
srs_freep(perf_iovs);
srs_freep(perf_msgs);
srs_freep(perf_sys);
srs_freep(perf_sendmmsg);
}
SrsStatistic* SrsStatistic::instance()
@ -664,7 +666,7 @@ void SrsStatistic::perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs)
perf_sys->b++;
}
srs_error_t SrsStatistic::dumps_perf_mw(SrsJsonObject* obj)
srs_error_t SrsStatistic::dumps_perf_writev(SrsJsonObject* obj)
{
srs_error_t err = srs_success;
@ -736,6 +738,76 @@ srs_error_t SrsStatistic::dumps_perf_mw(SrsJsonObject* obj)
return err;
}
void SrsStatistic::perf_mw_on_packets(int nb_msgs)
{
// For perf msgs, the nb_msgs stat.
// a: =1
// b: <10
// c: <100
// d: <200
// e: <300
// f: <400
// g: <500
// h: <600
// i: <1000
// j: >=1000
if (nb_msgs == 1) {
perf_sendmmsg->a++;
} else if (nb_msgs < 10) {
perf_sendmmsg->b++;
} else if (nb_msgs < 100) {
perf_sendmmsg->c++;
} else if (nb_msgs < 200) {
perf_sendmmsg->d++;
} else if (nb_msgs < 300) {
perf_sendmmsg->e++;
} else if (nb_msgs < 400) {
perf_sendmmsg->f++;
} else if (nb_msgs < 500) {
perf_sendmmsg->g++;
} else if (nb_msgs < 600) {
perf_sendmmsg->h++;
} else if (nb_msgs < 1000) {
perf_sendmmsg->i++;
} else {
perf_sendmmsg->j++;
}
}
srs_error_t SrsStatistic::dumps_perf_sendmmsg(SrsJsonObject* obj)
{
srs_error_t err = srs_success;
if (true) {
SrsJsonObject* p = SrsJsonAny::object();
obj->set("msgs", p);
// For perf msgs, the nb_msgs stat.
// a: =1
// b: <10
// c: <100
// d: <200
// e: <300
// f: <400
// g: <500
// h: <600
// i: <1000
// j: >=1000
p->set("lt_2", SrsJsonAny::integer(perf_sendmmsg->a));
p->set("lt_10", SrsJsonAny::integer(perf_sendmmsg->b));
p->set("lt_100", SrsJsonAny::integer(perf_sendmmsg->c));
p->set("lt_200", SrsJsonAny::integer(perf_sendmmsg->d));
p->set("lt_300", SrsJsonAny::integer(perf_sendmmsg->e));
p->set("lt_400", SrsJsonAny::integer(perf_sendmmsg->f));
p->set("lt_500", SrsJsonAny::integer(perf_sendmmsg->g));
p->set("lt_600", SrsJsonAny::integer(perf_sendmmsg->h));
p->set("lt_1000", SrsJsonAny::integer(perf_sendmmsg->i));
p->set("gt_1000", SrsJsonAny::integer(perf_sendmmsg->j));
}
return err;
}
SrsStatisticVhost* SrsStatistic::create_vhost(SrsRequest* req)
{
SrsStatisticVhost* vhost = NULL;

View file

@ -169,6 +169,7 @@ private:
SrsStatisticCategory* perf_iovs;
SrsStatisticCategory* perf_msgs;
SrsStatisticCategory* perf_sys;
SrsStatisticCategory* perf_sendmmsg;
private:
SrsStatistic();
virtual ~SrsStatistic();
@ -233,8 +234,13 @@ public:
// Stat for packets merged written, nb_pkts is the number of or chunk packets,
// bytes_pkts is the total bytes of or chunk packets, nb_iovs is the total number of iovec.
virtual void perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs);
// Dumps the perf statistic data, for performance analysis.
virtual srs_error_t dumps_perf_mw(SrsJsonObject* obj);
// Dumps the perf statistic data for TCP writev, for performance analysis.
virtual srs_error_t dumps_perf_writev(SrsJsonObject* obj);
public:
// Stat for packets UDP sendmmsg, nb_msgs is the vlen for sendmmsg.
virtual void perf_mw_on_packets(int nb_msgs);
// Dumps the perf statistic data for UDP sendmmsg, for performance analysis.
virtual srs_error_t dumps_perf_sendmmsg(SrsJsonObject* obj);
private:
virtual SrsStatisticVhost* create_vhost(SrsRequest* req);
virtual SrsStatisticStream* create_stream(SrsStatisticVhost* vhost, SrsRequest* req);

View file

@ -428,6 +428,14 @@ int srs_sendmmsg(srs_netfd_t stfd, struct mmsghdr *msgvec, unsigned int vlen, in
// further sendmmsg() call to send the remaining messages.
return vlen;
#else
if (vlen == 1) {
int r0 = srs_sendmsg(stfd, &msgvec->msg_hdr, flags, timeout);
if (r0 < 0) {
return r0;
}
msgvec->msg_len = r0;
return 1;
}
return st_sendmmsg((st_netfd_t)stfd, msgvec, vlen, flags, (st_utime_t)timeout);
#endif
}