1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #307, support linux GSO for RTC

This commit is contained in:
winlin 2020-04-13 23:40:30 +08:00
parent b1438bf52c
commit 3b7240b8e0
4 changed files with 193 additions and 7 deletions

View file

@ -440,6 +440,7 @@ rtc_server {
# default: on # default: on
merge_nalus on; merge_nalus on;
# Whether enable GSO to send out RTP packets. # Whether enable GSO to send out RTP packets.
# @remark Linux only, for other OS always disabled.
# default: off # default: off
gso off; gso off;
} }

View file

@ -4803,7 +4803,15 @@ bool SrsConfig::get_rtc_server_gso()
return DEFAULT; return DEFAULT;
} }
return SRS_CONF_PERFER_FALSE(conf->arg0()); bool v = SRS_CONF_PERFER_FALSE(conf->arg0());
#ifdef SRS_AUTO_OSX
if (v) {
srs_warn("GSO is for Linux only");
}
v = false;
#endif
return v;
} }
SrsConfDirective* SrsConfig::get_rtc(string vhost) SrsConfDirective* SrsConfig::get_rtc(string vhost)

View file

@ -33,6 +33,11 @@ using namespace std;
#include <fcntl.h> #include <fcntl.h>
#include <unistd.h> #include <unistd.h>
#include <netinet/udp.h>
#ifndef UDP_SEGMENT
#define UDP_SEGMENT 103
#endif
#include <sstream> #include <sstream>
#include <srs_core_autofree.hpp> #include <srs_core_autofree.hpp>
@ -451,7 +456,7 @@ srs_error_t SrsDtlsSession::unprotect_rtcp(char* out_buf, const char* in_buf, in
SrsRtcPackets::SrsRtcPackets(bool gso, bool merge_nalus) SrsRtcPackets::SrsRtcPackets(bool gso, bool merge_nalus)
{ {
is_gso = gso; use_gso = gso;
should_merge_nalus = merge_nalus; should_merge_nalus = merge_nalus;
nn_rtp_pkts = 0; nn_rtp_pkts = 0;
@ -662,15 +667,25 @@ srs_error_t SrsRtcSenderThread::send_messages(
// Covert kernel messages to RTP packets. // Covert kernel messages to RTP packets.
if ((err = messages_to_packets(source, msgs, nb_msgs, packets)) != srs_success) { if ((err = messages_to_packets(source, msgs, nb_msgs, packets)) != srs_success) {
return err; return srs_error_wrap(err, "messages to packets");
} }
packets.nn_rtp_pkts = (int)packets.packets.size(); packets.nn_rtp_pkts = (int)packets.packets.size();
// Send out RTP packets #ifndef SRS_AUTO_OSX
if ((err = send_packets(skt, packets)) != srs_success) { // If enabled GSO, send out some packets in a msghdr.
if (packets.use_gso) {
if ((err = send_packets2(skt, packets)) != srs_success) {
return srs_error_wrap(err, "gso send");
}
return err; return err;
} }
#endif
// By default, we send packets by sendmmsg.
if ((err = send_packets(skt, packets)) != srs_success) {
return srs_error_wrap(err, "raw send");
}
return err; return err;
} }
@ -761,10 +776,13 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
ISrsUdpSender* sender = skt->sender();
vector<SrsRtpPacket2*>::iterator it; vector<SrsRtpPacket2*>::iterator it;
for (it = packets.packets.begin(); it != packets.packets.end(); ++it) { for (it = packets.packets.begin(); it != packets.packets.end(); ++it) {
SrsRtpPacket2* packet = *it; SrsRtpPacket2* packet = *it;
ISrsUdpSender* sender = skt->sender();
int nn_packet = packet->nb_bytes();
// Fetch a cached message from queue. // Fetch a cached message from queue.
// TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
@ -807,6 +825,157 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets
return err; return err;
} }
// TODO: FIXME: We can gather and pad audios, because they have similar size.
srs_error_t SrsRtcSenderThread::send_packets2(SrsUdpMuxSocket* skt, SrsRtcPackets& packets)
{
srs_error_t err = srs_success;
ISrsUdpSender* sender = skt->sender();
// Previous handler, if has the same size, we can use GSO.
mmsghdr* gso_mhdr = NULL; int gso_size = 0; int gso_encrypt = 0; int gso_cursor = 0;
// GSO, N packets has same length, the final one may not.
bool use_gso = false; bool gso_final = false;
int nn_packets = (int)packets.packets.size();
for (int i = 0; i < nn_packets; i++) {
SrsRtpPacket2* packet = packets.packets[i];
// The handler to send message.
mmsghdr* mhdr = NULL;
// Check whether we can use GSO to send it.
int nn_packet = packet->nb_bytes();
if ((gso_size && gso_size == nn_packet) || (use_gso && !gso_final)) {
use_gso = true;
gso_final = (gso_size && gso_size != nn_packet);
mhdr = gso_mhdr;
// We need to increase the iov and cursor.
int nb_iovs = mhdr->msg_hdr.msg_iovlen;
if (gso_cursor >= nb_iovs - 1) {
int nn_new_iovs = nb_iovs;
mhdr->msg_hdr.msg_iovlen = nb_iovs + nn_new_iovs;
mhdr->msg_hdr.msg_iov = (iovec*)realloc(mhdr->msg_hdr.msg_iov, sizeof(iovec) * (nb_iovs + nn_new_iovs));
memset(mhdr->msg_hdr.msg_iov + nb_iovs, 0, sizeof(iovec) * nn_new_iovs);
}
gso_cursor++;
// Create payload cache for RTP packet.
iovec* p = mhdr->msg_hdr.msg_iov + gso_cursor;
if (!p->iov_base) {
p->iov_base = new char[kRtpPacketSize];
p->iov_len = kRtpPacketSize;
}
} else {
// Fetch a cached message from queue.
// TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
if ((err = sender->fetch(&mhdr)) != srs_success) {
return srs_error_wrap(err, "fetch msghdr");
}
// Reset the iovec, we should never change the msg_iovlen.
for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) {
iovec* p = mhdr->msg_hdr.msg_iov + j;
p->iov_len = 0;
}
gso_mhdr = mhdr;
gso_size = nn_packet;
gso_cursor = 0;
}
// Change the state according to the next packet.
if (i < nn_packets - 1) {
SrsRtpPacket2* next_packet = (i < nn_packets - 1)? packets.packets[i + 1]:NULL;
int nn_next_packet = next_packet? next_packet->nb_bytes() : 0;
// If GSO, but next is bigger than this one, we must enter the final state.
if (use_gso && !gso_final) {
gso_final = (nn_packet < nn_next_packet);
}
// If not GSO, maybe the first fresh packet, we should see whether the next packet is smaller than this one,
// if smaller, we can still enter GSO.
if (!use_gso) {
use_gso = (nn_packet >= nn_next_packet);
}
}
// Marshal packet to bytes.
iovec* iov = mhdr->msg_hdr.msg_iov + gso_cursor;
iov->iov_len = kRtpPacketSize;
if (true) {
SrsBuffer stream((char*)iov->iov_base, iov->iov_len);
if ((err = packet->encode(&stream)) != srs_success) {
return srs_error_wrap(err, "encode packet");
}
iov->iov_len = stream.pos();
}
// Whether encrypt the RTP bytes.
if (rtc_session->encrypt) {
int nn_encrypt = (int)iov->iov_len;
if ((err = rtc_session->dtls_session->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
}
iov->iov_len = (size_t)nn_encrypt;
}
// If GSO, they must has same size, except the final one.
if (use_gso && !gso_final && gso_encrypt && gso_encrypt != (int)iov->iov_len) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "GSO size=%d/%d, encrypt=%d/%d", gso_size, nn_packet, gso_encrypt, iov->iov_len);
}
if (use_gso && !gso_final) {
gso_encrypt = iov->iov_len;
}
// If exceed the max GSO size, set to final.
if (use_gso && gso_cursor > 64) {
gso_final = true;
}
// For last message, or final gso, or determined not using GSO, send it now.
if (i == nn_packets - 1 || gso_final || !use_gso) {
sockaddr_in* addr = (sockaddr_in*)skt->peer_addr();
socklen_t addrlen = (socklen_t)skt->peer_addrlen();
mhdr->msg_hdr.msg_name = (sockaddr_in*)addr;
mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen;
mhdr->msg_hdr.msg_controllen = 0;
mhdr->msg_len = 0;
if (use_gso) {
#ifndef SRS_AUTO_OSX
if (!mhdr->msg_hdr.msg_control) {
mhdr->msg_hdr.msg_control = new char[mhdr->msg_hdr.msg_controllen];
}
mhdr->msg_hdr.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
cmsghdr* cm = CMSG_FIRSTHDR(&mhdr->msg_hdr);
cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
*((uint16_t*)CMSG_DATA(cm)) = gso_encrypt;
#endif
}
if ((err = sender->sendmmsg(mhdr)) != srs_success) {
return srs_error_wrap(err, "send msghdr");
}
// Reset the GSO flag.
gso_mhdr = NULL; gso_size = 0; gso_encrypt = 0; gso_cursor = 0;
use_gso = gso_final = false;
}
}
return err;
}
srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets) srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -1536,6 +1705,11 @@ void SrsUdpMuxSender::free_mhdrs(std::vector<mmsghdr>& mhdrs)
for (int i = 0; i < (int)mhdrs.size(); i++) { for (int i = 0; i < (int)mhdrs.size(); i++) {
mmsghdr* hdr = &mhdrs[i]; mmsghdr* hdr = &mhdrs[i];
// Free control for GSO.
char* msg_control = (char*)hdr->msg_hdr.msg_control;
srs_freep(msg_control);
// Free iovec.
for (int j = (int)hdr->msg_hdr.msg_iovlen - 1; j >= 0 ; j--) { for (int j = (int)hdr->msg_hdr.msg_iovlen - 1; j >= 0 ; j--) {
iovec* iov = hdr->msg_hdr.msg_iov + j; iovec* iov = hdr->msg_hdr.msg_iov + j;
char* data = (char*)iov->iov_base; char* data = (char*)iov->iov_base;
@ -1543,6 +1717,7 @@ void SrsUdpMuxSender::free_mhdrs(std::vector<mmsghdr>& mhdrs)
srs_freep(iov); srs_freep(iov);
} }
} }
mhdrs.clear();
} }
srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr) srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr)

View file

@ -120,7 +120,7 @@ private:
class SrsRtcPackets class SrsRtcPackets
{ {
public: public:
bool is_gso; bool use_gso;
bool should_merge_nalus; bool should_merge_nalus;
public: public:
int nn_bytes; int nn_bytes;
@ -129,6 +129,7 @@ public:
int nn_extras; int nn_extras;
int nn_audios; int nn_audios;
int nn_videos; int nn_videos;
public:
std::vector<SrsRtpPacket2*> packets; std::vector<SrsRtpPacket2*> packets;
public: public:
SrsRtcPackets(bool gso, bool merge_nalus); SrsRtcPackets(bool gso, bool merge_nalus);
@ -178,6 +179,7 @@ private:
srs_error_t send_messages(SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); srs_error_t send_messages(SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets);
srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets);
srs_error_t send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); srs_error_t send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets);
srs_error_t send_packets2(SrsUdpMuxSocket* skt, SrsRtcPackets& packets);
private: private:
srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket); srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket);
private: private: