1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #307, allow dedicated cache for GSO.

This commit is contained in:
winlin 2020-04-14 09:20:21 +08:00
parent 89a247d9bc
commit 8a71ce62db
9 changed files with 303 additions and 48 deletions

View file

@ -866,22 +866,6 @@ srs_error_t SrsRtcSenderThread::send_packets2(SrsUdpMuxSocket* skt, SrsRtcPacket
p->iov_base = new char[kRtpPacketSize];
p->iov_len = kRtpPacketSize;
}
} else {
// Fetch a cached message from queue.
// TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
if ((err = sender->fetch(&mhdr)) != srs_success) {
return srs_error_wrap(err, "fetch msghdr");
}
// Reset the iovec, we should never change the msg_iovlen.
for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) {
iovec* p = mhdr->msg_hdr.msg_iov + j;
p->iov_len = 0;
}
gso_mhdr = mhdr;
gso_size = nn_packet;
gso_cursor = 0;
}
// Change the state according to the next packet.
@ -901,6 +885,32 @@ srs_error_t SrsRtcSenderThread::send_packets2(SrsUdpMuxSocket* skt, SrsRtcPacket
}
}
// Now, we fetch the msg from cache.
if (!mhdr) {
// Fetch a cached message from queue.
// TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
if (use_gso) {
err = sender->gso_fetch(&mhdr);
} else {
err = sender->fetch(&mhdr);
}
if (err != srs_success) {
return srs_error_wrap(err, "fetch msghdr");
}
// Reset the iovec, we should never change the msg_iovlen.
for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) {
iovec* p = mhdr->msg_hdr.msg_iov + j;
p->iov_len = 0;
}
// Now, GSO will use this message and size.
if (use_gso) {
gso_mhdr = mhdr;
gso_size = nn_packet;
}
}
// Marshal packet to bytes.
iovec* iov = mhdr->msg_hdr.msg_iov + gso_cursor;
iov->iov_len = kRtpPacketSize;
@ -946,25 +956,37 @@ srs_error_t SrsRtcSenderThread::send_packets2(SrsUdpMuxSocket* skt, SrsRtcPacket
mhdr->msg_hdr.msg_controllen = 0;
mhdr->msg_len = 0;
if (use_gso) {
#ifndef SRS_AUTO_OSX
if (use_gso) {
mhdr->msg_hdr.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
if (!mhdr->msg_hdr.msg_control) {
mhdr->msg_hdr.msg_control = new char[mhdr->msg_hdr.msg_controllen];
}
mhdr->msg_hdr.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
cmsghdr* cm = CMSG_FIRSTHDR(&mhdr->msg_hdr);
cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
*((uint16_t*)CMSG_DATA(cm)) = gso_encrypt;
#endif
// Private message, use it to store the cursor.
mhdr->msg_len = gso_cursor + 1;
}
#endif
if ((err = sender->sendmmsg(mhdr)) != srs_success) {
return srs_error_wrap(err, "send msghdr");
}
#ifdef SRS_DEBUG
srs_warn("packet SN=%d %d bytes", nn_packet, packet->rtp_header.get_sequence());
for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) {
iovec* iov = mhdr->msg_hdr.msg_iov + j;
srs_warn("%s #%d/%d/%d, %d bytes, size %d/%d", (use_gso? "GSO":"RAW"), j, gso_cursor + 1,
mhdr->msg_hdr.msg_iovlen, iov->iov_len, gso_size, gso_encrypt);
}
#endif
// Reset the GSO flag.
gso_mhdr = NULL; gso_size = 0; gso_encrypt = 0; gso_cursor = 0;
use_gso = gso_final = false;
@ -1662,6 +1684,7 @@ SrsUdpMuxSender::SrsUdpMuxSender(SrsRtcServer* s)
trd = new SrsDummyCoroutine();
cache_pos = 0;
gso_cache_pos = 0;
_srs_config->subscribe(this);
}
@ -1678,6 +1701,12 @@ SrsUdpMuxSender::~SrsUdpMuxSender()
free_mhdrs(cache);
cache.clear();
free_mhdrs(gso_hotspot);
gso_hotspot.clear();
free_mhdrs(gso_cache);
gso_cache.clear();
}
srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd)
@ -1693,7 +1722,10 @@ srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd)
}
max_sendmmsg = _srs_config->get_rtc_server_sendmmsg();
srs_trace("UDP sender #%d init ok, max_sendmmsg=%d", srs_netfd_fileno(fd), max_sendmmsg);
bool gso = _srs_config->get_rtc_server_gso();
gso_dedicated = _srs_config->get_rtc_server_gso_dedicated();
srs_trace("UDP sender #%d init ok, max_sendmmsg=%d, gso=%d, gso-dedicated=%d",
srs_netfd_fileno(fd), max_sendmmsg, gso, gso_dedicated);
return err;
}
@ -1738,6 +1770,34 @@ srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr)
return srs_success;
}
srs_error_t SrsUdpMuxSender::gso_fetch(mmsghdr** pphdr)
{
// When GSO share cache, we use the same cache with non-GSO.
if (!gso_dedicated) {
return fetch(pphdr);
}
// TODO: FIXME: Maybe need to shrink?
if (gso_cache_pos >= (int)gso_cache.size()) {
mmsghdr mhdr;
memset(&mhdr, 0, sizeof(mmsghdr));
mhdr.msg_hdr.msg_iovlen = 1;
mhdr.msg_hdr.msg_iov = new iovec();
mhdr.msg_hdr.msg_iov->iov_base = new char[kRtpPacketSize];
mhdr.msg_hdr.msg_iov->iov_len = kRtpPacketSize;
mhdr.msg_len = 0;
mhdr.msg_hdr.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
mhdr.msg_hdr.msg_control = new char[mhdr.msg_hdr.msg_controllen];
gso_cache.push_back(mhdr);
}
*pphdr = &gso_cache[gso_cache_pos++];
return srs_success;
}
srs_error_t SrsUdpMuxSender::sendmmsg(mmsghdr* hdr)
{
if (waiting_msgs) {
@ -1752,11 +1812,9 @@ srs_error_t SrsUdpMuxSender::cycle()
{
srs_error_t err = srs_success;
uint64_t nn_msgs = 0;
uint64_t nn_msgs_last = 0;
int nn_msgs_max = 0;
int nn_loop = 0;
int nn_wait = 0;
uint64_t nn_msgs = 0; uint64_t nn_msgs_last = 0; int nn_msgs_max = 0;
uint64_t nn_gso_msgs = 0; uint64_t nn_gso_iovs = 0; int nn_gso_msgs_max = 0; int nn_gso_iovs_max = 0;
int nn_loop = 0; int nn_wait = 0;
srs_utime_t time_last = srs_get_system_time();
SrsStatistic* stat = SrsStatistic::instance();
@ -1771,7 +1829,9 @@ srs_error_t SrsUdpMuxSender::cycle()
nn_loop++;
int pos = cache_pos;
if (pos <= 0) {
int gso_pos = gso_cache_pos;
int gso_iovs = 0;
if (pos <= 0 && gso_pos == 0) {
waiting_msgs = true;
nn_wait++;
srs_cond_wait(cond);
@ -1782,22 +1842,73 @@ srs_error_t SrsUdpMuxSender::cycle()
cache.swap(hotspot);
cache_pos = 0;
mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos;
for (; p < end; p += max_sendmmsg) {
int vlen = (int)(end - p);
vlen = srs_min(max_sendmmsg, vlen);
if (gso_dedicated) {
gso_cache.swap(gso_hotspot);
gso_cache_pos = 0;
}
int r0 = srs_sendmmsg(lfd, p, (unsigned int)vlen, 0, SRS_UTIME_NO_TIMEOUT);
if (r0 != vlen) {
srs_warn("sendmsg %d msgs, %d done", vlen, r0);
// Send out GSO in dedicated queue.
if (gso_dedicated && gso_pos > 0) {
mmsghdr* p = &gso_hotspot[0]; mmsghdr* end = p + gso_pos;
for (; p < end; p++) {
// Private message, use it to store the cursor.
int real_iovs = p->msg_len;
p->msg_len = 0;
// Send out GSO message.
int r0 = srs_sendmsg(lfd, &p->msg_hdr, 0, SRS_UTIME_NO_TIMEOUT);
if (r0 < 0) {
srs_warn("sendmsg err, r0=%d", r0);
}
nn_gso_msgs++; nn_gso_iovs += real_iovs; gso_iovs += real_iovs;
stat->perf_gso_on_packets(real_iovs);
}
}
// Send out all messages, may GSO if shared cache.
if (pos > 0) {
mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos;
// For shared GSO cache, stat the messages.
if (!gso_dedicated) {
for (p = &hotspot[0]; p < end; p++) {
if (!p->msg_len) {
continue;
}
// Private message, use it to store the cursor.
int real_iovs = p->msg_len;
p->msg_len = 0;
gso_pos++; nn_gso_msgs++; nn_gso_iovs += real_iovs; gso_iovs += real_iovs;
stat->perf_gso_on_packets(real_iovs);
}
}
stat->perf_mw_on_packets(vlen);
// Send out all messages.
for (p = &hotspot[0]; p < end; p += max_sendmmsg) {
int vlen = (int)(end - p);
vlen = srs_min(max_sendmmsg, vlen);
int r0 = srs_sendmmsg(lfd, p, (unsigned int)vlen, 0, SRS_UTIME_NO_TIMEOUT);
if (r0 != vlen) {
srs_warn("sendmmsg %d msgs, %d done", vlen, r0);
}
stat->perf_sendmmsg_on_packets(vlen);
}
}
// Increase total messages.
nn_msgs += pos;
nn_msgs_max = srs_max(pos, nn_msgs_max);
int nn_pos = pos;
if (gso_dedicated) {
nn_pos = pos + gso_pos;
}
nn_msgs += nn_pos;
nn_msgs_max = srs_max(nn_pos, nn_msgs_max);
nn_gso_msgs_max = srs_max(gso_pos, nn_gso_msgs_max);
nn_gso_iovs_max = srs_max(gso_iovs, nn_gso_iovs_max);
pprint->elapse();
if (pprint->can_print()) {
@ -1819,11 +1930,12 @@ srs_error_t SrsUdpMuxSender::cycle()
pps_unit = "(k)"; pps_last /= 1000; pps_average /= 1000;
}
srs_trace("-> RTC #%d SEND %d/%d/%" PRId64 ", pps %d/%d%s, schedule %d/%d, sessions %d, cache %d/%d by sendmmsg %d",
srs_netfd_fileno(lfd), pos, nn_msgs_max, nn_msgs, pps_average, pps_last, pps_unit.c_str(), nn_loop, nn_wait,
(int)server->nn_sessions(), (int)cache.size(), (int)hotspot.size(), max_sendmmsg);
srs_trace("-> RTC #%d SEND %d/%d/%" PRId64 ", gso %d/%d/%" PRId64 ", gso-iovs %d/%d/%" PRId64 ", pps %d/%d%s, schedule %d/%d, sessions %d, cache %d/%d, sendmmsg %d",
srs_netfd_fileno(lfd), nn_pos, nn_msgs_max, nn_msgs, gso_pos, nn_gso_msgs_max, nn_gso_msgs, gso_iovs, nn_gso_iovs_max, nn_gso_iovs, pps_average, pps_last, pps_unit.c_str(),
nn_loop, nn_wait, (int)server->nn_sessions(), (int)cache.size(), (int)gso_cache.size(), max_sendmmsg);
nn_msgs_last = nn_msgs; time_last = srs_get_system_time();
nn_loop = nn_wait = nn_msgs_max = 0;
nn_gso_msgs_max = 0; nn_gso_iovs_max = 0;
}
}
@ -1832,10 +1944,21 @@ srs_error_t SrsUdpMuxSender::cycle()
srs_error_t SrsUdpMuxSender::on_reload_rtc_server()
{
int v = _srs_config->get_rtc_server_sendmmsg();
if (max_sendmmsg != v) {
srs_trace("Reload max_sendmmsg %d=>%d", max_sendmmsg, v);
max_sendmmsg = v;
if (true) {
int v = _srs_config->get_rtc_server_sendmmsg();
if (max_sendmmsg != v) {
srs_trace("Reload max_sendmmsg %d=>%d", max_sendmmsg, v);
max_sendmmsg = v;
}
}
if (true) {
bool gso = _srs_config->get_rtc_server_gso();
bool v = _srs_config->get_rtc_server_gso_dedicated();
if (gso_dedicated != v) {
srs_trace("Reload gso=%d, gso-dedicated %d=>%d", gso, gso_dedicated, v);
gso_dedicated = v;
}
}
return srs_success;