From 3b7240b8e0a360ce7db09f9c183c7b4a59518dd1 Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 13 Apr 2020 23:40:30 +0800 Subject: [PATCH] For #307, support linux GSO for RTC --- trunk/conf/full.conf | 1 + trunk/src/app/srs_app_config.cpp | 10 +- trunk/src/app/srs_app_rtc_conn.cpp | 185 ++++++++++++++++++++++++++++- trunk/src/app/srs_app_rtc_conn.hpp | 4 +- 4 files changed, 193 insertions(+), 7 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index caadce385..57e64bf1b 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -440,6 +440,7 @@ rtc_server { # default: on merge_nalus on; # Whether enable GSO to send out RTP packets. + # @remark Linux only, for other OS always disabled. # default: off gso off; } diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index f60a1782e..f7f69a381 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4803,7 +4803,15 @@ bool SrsConfig::get_rtc_server_gso() return DEFAULT; } - return SRS_CONF_PERFER_FALSE(conf->arg0()); + bool v = SRS_CONF_PERFER_FALSE(conf->arg0()); + +#ifdef SRS_AUTO_OSX + if (v) { + srs_warn("GSO is for Linux only"); + } + v = false; +#endif + return v; } SrsConfDirective* SrsConfig::get_rtc(string vhost) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index dfc3faf8e..b51cb236c 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -33,6 +33,11 @@ using namespace std; #include #include +#include +#ifndef UDP_SEGMENT +#define UDP_SEGMENT 103 +#endif + #include #include @@ -451,7 +456,7 @@ srs_error_t SrsDtlsSession::unprotect_rtcp(char* out_buf, const char* in_buf, in SrsRtcPackets::SrsRtcPackets(bool gso, bool merge_nalus) { - is_gso = gso; + use_gso = gso; should_merge_nalus = merge_nalus; nn_rtp_pkts = 0; @@ -662,15 +667,25 @@ srs_error_t SrsRtcSenderThread::send_messages( // Covert kernel messages to RTP packets. if ((err = messages_to_packets(source, msgs, nb_msgs, packets)) != srs_success) { - return err; + return srs_error_wrap(err, "messages to packets"); } packets.nn_rtp_pkts = (int)packets.packets.size(); - // Send out RTP packets - if ((err = send_packets(skt, packets)) != srs_success) { +#ifndef SRS_AUTO_OSX + // If enabled GSO, send out some packets in a msghdr. + if (packets.use_gso) { + if ((err = send_packets2(skt, packets)) != srs_success) { + return srs_error_wrap(err, "gso send"); + } return err; } +#endif + + // By default, we send packets by sendmmsg. + if ((err = send_packets(skt, packets)) != srs_success) { + return srs_error_wrap(err, "raw send"); + } return err; } @@ -761,10 +776,13 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets { srs_error_t err = srs_success; + ISrsUdpSender* sender = skt->sender(); + vector::iterator it; for (it = packets.packets.begin(); it != packets.packets.end(); ++it) { SrsRtpPacket2* packet = *it; - ISrsUdpSender* sender = skt->sender(); + + int nn_packet = packet->nb_bytes(); // Fetch a cached message from queue. // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. @@ -807,6 +825,157 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets return err; } +// TODO: FIXME: We can gather and pad audios, because they have similar size. +srs_error_t SrsRtcSenderThread::send_packets2(SrsUdpMuxSocket* skt, SrsRtcPackets& packets) +{ + srs_error_t err = srs_success; + + ISrsUdpSender* sender = skt->sender(); + + // Previous handler, if has the same size, we can use GSO. + mmsghdr* gso_mhdr = NULL; int gso_size = 0; int gso_encrypt = 0; int gso_cursor = 0; + // GSO, N packets has same length, the final one may not. + bool use_gso = false; bool gso_final = false; + + int nn_packets = (int)packets.packets.size(); + for (int i = 0; i < nn_packets; i++) { + SrsRtpPacket2* packet = packets.packets[i]; + + // The handler to send message. + mmsghdr* mhdr = NULL; + + // Check whether we can use GSO to send it. + int nn_packet = packet->nb_bytes(); + + if ((gso_size && gso_size == nn_packet) || (use_gso && !gso_final)) { + use_gso = true; + gso_final = (gso_size && gso_size != nn_packet); + mhdr = gso_mhdr; + + // We need to increase the iov and cursor. + int nb_iovs = mhdr->msg_hdr.msg_iovlen; + if (gso_cursor >= nb_iovs - 1) { + int nn_new_iovs = nb_iovs; + mhdr->msg_hdr.msg_iovlen = nb_iovs + nn_new_iovs; + mhdr->msg_hdr.msg_iov = (iovec*)realloc(mhdr->msg_hdr.msg_iov, sizeof(iovec) * (nb_iovs + nn_new_iovs)); + memset(mhdr->msg_hdr.msg_iov + nb_iovs, 0, sizeof(iovec) * nn_new_iovs); + } + gso_cursor++; + + // Create payload cache for RTP packet. + iovec* p = mhdr->msg_hdr.msg_iov + gso_cursor; + if (!p->iov_base) { + p->iov_base = new char[kRtpPacketSize]; + p->iov_len = kRtpPacketSize; + } + } else { + // Fetch a cached message from queue. + // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. + if ((err = sender->fetch(&mhdr)) != srs_success) { + return srs_error_wrap(err, "fetch msghdr"); + } + + // Reset the iovec, we should never change the msg_iovlen. + for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) { + iovec* p = mhdr->msg_hdr.msg_iov + j; + p->iov_len = 0; + } + + gso_mhdr = mhdr; + gso_size = nn_packet; + gso_cursor = 0; + } + + // Change the state according to the next packet. + if (i < nn_packets - 1) { + SrsRtpPacket2* next_packet = (i < nn_packets - 1)? packets.packets[i + 1]:NULL; + int nn_next_packet = next_packet? next_packet->nb_bytes() : 0; + + // If GSO, but next is bigger than this one, we must enter the final state. + if (use_gso && !gso_final) { + gso_final = (nn_packet < nn_next_packet); + } + + // If not GSO, maybe the first fresh packet, we should see whether the next packet is smaller than this one, + // if smaller, we can still enter GSO. + if (!use_gso) { + use_gso = (nn_packet >= nn_next_packet); + } + } + + // Marshal packet to bytes. + iovec* iov = mhdr->msg_hdr.msg_iov + gso_cursor; + iov->iov_len = kRtpPacketSize; + + if (true) { + SrsBuffer stream((char*)iov->iov_base, iov->iov_len); + if ((err = packet->encode(&stream)) != srs_success) { + return srs_error_wrap(err, "encode packet"); + } + iov->iov_len = stream.pos(); + } + + // Whether encrypt the RTP bytes. + if (rtc_session->encrypt) { + int nn_encrypt = (int)iov->iov_len; + if ((err = rtc_session->dtls_session->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) { + return srs_error_wrap(err, "srtp protect"); + } + iov->iov_len = (size_t)nn_encrypt; + } + + // If GSO, they must has same size, except the final one. + if (use_gso && !gso_final && gso_encrypt && gso_encrypt != (int)iov->iov_len) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "GSO size=%d/%d, encrypt=%d/%d", gso_size, nn_packet, gso_encrypt, iov->iov_len); + } + + if (use_gso && !gso_final) { + gso_encrypt = iov->iov_len; + } + + // If exceed the max GSO size, set to final. + if (use_gso && gso_cursor > 64) { + gso_final = true; + } + + // For last message, or final gso, or determined not using GSO, send it now. + if (i == nn_packets - 1 || gso_final || !use_gso) { + sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); + socklen_t addrlen = (socklen_t)skt->peer_addrlen(); + + mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; + mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; + mhdr->msg_hdr.msg_controllen = 0; + mhdr->msg_len = 0; + + if (use_gso) { +#ifndef SRS_AUTO_OSX + if (!mhdr->msg_hdr.msg_control) { + mhdr->msg_hdr.msg_control = new char[mhdr->msg_hdr.msg_controllen]; + } + mhdr->msg_hdr.msg_controllen = CMSG_SPACE(sizeof(uint16_t)); + + cmsghdr* cm = CMSG_FIRSTHDR(&mhdr->msg_hdr); + cm->cmsg_level = SOL_UDP; + cm->cmsg_type = UDP_SEGMENT; + cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); + *((uint16_t*)CMSG_DATA(cm)) = gso_encrypt; +#endif + } + + if ((err = sender->sendmmsg(mhdr)) != srs_success) { + return srs_error_wrap(err, "send msghdr"); + } + + // Reset the GSO flag. + gso_mhdr = NULL; gso_size = 0; gso_encrypt = 0; gso_cursor = 0; + use_gso = gso_final = false; + } + } + + return err; +} + srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets) { srs_error_t err = srs_success; @@ -1536,6 +1705,11 @@ void SrsUdpMuxSender::free_mhdrs(std::vector& mhdrs) for (int i = 0; i < (int)mhdrs.size(); i++) { mmsghdr* hdr = &mhdrs[i]; + // Free control for GSO. + char* msg_control = (char*)hdr->msg_hdr.msg_control; + srs_freep(msg_control); + + // Free iovec. for (int j = (int)hdr->msg_hdr.msg_iovlen - 1; j >= 0 ; j--) { iovec* iov = hdr->msg_hdr.msg_iov + j; char* data = (char*)iov->iov_base; @@ -1543,6 +1717,7 @@ void SrsUdpMuxSender::free_mhdrs(std::vector& mhdrs) srs_freep(iov); } } + mhdrs.clear(); } srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr) diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index c10cc1305..748374290 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -120,7 +120,7 @@ private: class SrsRtcPackets { public: - bool is_gso; + bool use_gso; bool should_merge_nalus; public: int nn_bytes; @@ -129,6 +129,7 @@ public: int nn_extras; int nn_audios; int nn_videos; +public: std::vector packets; public: SrsRtcPackets(bool gso, bool merge_nalus); @@ -178,6 +179,7 @@ private: srs_error_t send_messages(SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); srs_error_t send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); + srs_error_t send_packets2(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); private: srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket); private: