From 440089639536b75825d217ada0bdac6378d5b6ae Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 13 Apr 2020 16:50:24 +0800 Subject: [PATCH] Refactor code for merge_nalus and gso --- trunk/conf/full.conf | 7 ++ trunk/src/app/srs_app_config.cpp | 36 +++++++++- trunk/src/app/srs_app_config.hpp | 2 + trunk/src/app/srs_app_rtc_conn.cpp | 105 +++++++++++++++++++++-------- trunk/src/app/srs_app_rtc_conn.hpp | 32 +++++++-- 5 files changed, 149 insertions(+), 33 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 7304edd06..64b920f5b 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -435,6 +435,13 @@ rtc_server { # and net.core.rmem_default or just increase this to get larger UDP recv and send buffer. # default: 4 reuseport 4; + # Whether merge multiple NALUs into one. + # @see https://github.com/ossrs/srs/issues/307#issuecomment-612806318 + # default: off + merge_nalus off; + # Whether enable GSO to send out RTP packets. + # default: off + gso off; } vhost rtc.vhost.srs.com { diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index d3e790426..bc6c5c8bb 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3614,7 +3614,7 @@ srs_error_t SrsConfig::check_normal_config() for (int i = 0; conf && i < (int)conf->directives.size(); i++) { string n = conf->at(i)->name; if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa" - && n != "sendmmsg" && n != "encrypt" && n != "reuseport") { + && n != "sendmmsg" && n != "encrypt" && n != "reuseport" && n != "gso" && n != "merge_nalus") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtc_server.%s", n.c_str()); } } @@ -4772,6 +4772,40 @@ int SrsConfig::get_rtc_server_reuseport() return reuseport; } +bool SrsConfig::get_rtc_server_merge_nalus() +{ + static int DEFAULT = false; + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("merge_nalus"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + +bool SrsConfig::get_rtc_server_gso() +{ + static int DEFAULT = false; + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("gso"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + SrsConfDirective* SrsConfig::get_rtc(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index ba46984e3..8093a50a4 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -528,6 +528,8 @@ public: virtual int get_rtc_server_sendmmsg(); virtual bool get_rtc_server_encrypt(); virtual int get_rtc_server_reuseport(); + virtual bool get_rtc_server_merge_nalus(); + virtual bool get_rtc_server_gso(); SrsConfDirective* get_rtc(std::string vhost); bool get_rtc_enabled(std::string vhost); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 922f81c38..bbf9b7990 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -449,6 +449,25 @@ srs_error_t SrsDtlsSession::unprotect_rtcp(char* out_buf, const char* in_buf, in return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtcp unprotect failed"); } +SrsRtcPackets::SrsRtcPackets(bool gso, bool merge_nalus) +{ + is_gso = gso; + should_merge_nalus = merge_nalus; + + nn_rtp_pkts = nn_samples = 0; + nn_audios = nn_videos = 0; +} + +SrsRtcPackets::~SrsRtcPackets() +{ + vector::iterator it; + for (it = packets.begin(); it != packets.end(); ++it ) { + SrsRtpPacket2* packet = *it; + srs_freep(packet); + } + packets.clear(); +} + SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid) : sendonly_ukt(NULL) { @@ -457,15 +476,21 @@ SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int rtc_session = s; sendonly_ukt = u->copy_sendonly(); + gso = false; + merge_nalus = false; audio_timestamp = 0; audio_sequence = 0; video_sequence = 0; + + _srs_config->subscribe(this); } SrsRtcSenderThread::~SrsRtcSenderThread() { + _srs_config->unsubscribe(this); + srs_freep(trd); srs_freep(sendonly_ukt); } @@ -480,9 +505,35 @@ srs_error_t SrsRtcSenderThread::initialize(const uint32_t& vssrc, const uint32_t video_payload_type = v_pt; audio_payload_type = a_pt; + gso = _srs_config->get_rtc_server_gso(); + merge_nalus = _srs_config->get_rtc_server_merge_nalus(); + srs_trace("RTC sender video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), package(gso=%d, merge_nalus=%d)", + video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, merge_nalus); + return err; } +srs_error_t SrsRtcSenderThread::on_reload_rtc_server() +{ + if (true) { + bool v = _srs_config->get_rtc_server_gso(); + if (gso != v) { + srs_trace("Reload gso %d=>%d", gso, v); + gso = v; + } + } + + if (true) { + bool v = _srs_config->get_rtc_server_merge_nalus(); + if (merge_nalus != v) { + srs_trace("Reload merge_nalus %d=>%d", merge_nalus, v); + merge_nalus = v; + } + } + + return srs_success; +} + int SrsRtcSenderThread::cid() { return trd->cid(); @@ -580,28 +631,27 @@ srs_error_t SrsRtcSenderThread::cycle() continue; } - int nn_rtp_pkts = 0; - if ((err = send_messages(sendonly_ukt, source, msgs.msgs, msg_count, &nn_rtp_pkts)) != srs_success) { + SrsRtcPackets pkts(gso, merge_nalus); + if ((err = send_messages(sendonly_ukt, source, msgs.msgs, msg_count, pkts)) != srs_success) { srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err); } - int nn = 0; for (int i = 0; i < msg_count; i++) { SrsSharedPtrMessage* msg = msgs.msgs[i]; - nn += msg->size; srs_freep(msg); } pprint->elapse(); if (pprint->can_print()) { // TODO: FIXME: Print stat like frame/s, packet/s, loss_packets. - srs_trace("-> RTC PLAY %d msgs, %d packets, %d bytes", msg_count, nn_rtp_pkts, nn); + srs_trace("-> RTC PLAY %d msgs, %d samples, %d packets, %d audios, %d videos, %d bytes", + msg_count, pkts.nn_samples, pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_videos, pkts.nn_bytes); } } } srs_error_t SrsRtcSenderThread::send_messages( - SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, int* pnn_rtp_pkts + SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets ) { srs_error_t err = srs_success; @@ -610,53 +660,53 @@ srs_error_t SrsRtcSenderThread::send_messages( } // Covert kernel messages to RTP packets. - vector packets; if ((err = messages_to_packets(source, msgs, nb_msgs, packets)) != srs_success) { - for (int j = 0; j < (int)packets.size(); j++) { - SrsRtpPacket2* packet = packets[j]; - srs_freep(packet); - } return err; } - // Send out RTP packets - *pnn_rtp_pkts += (int)packets.size(); - err = send_packets(skt, packets); + packets.nn_rtp_pkts = (int)packets.packets.size(); - for (int j = 0; j < (int)packets.size(); j++) { - SrsRtpPacket2* packet = packets[j]; - srs_freep(packet); + // Send out RTP packets + if ((err = send_packets(skt, packets)) != srs_success) { + return err; } return err; } srs_error_t SrsRtcSenderThread::messages_to_packets( - SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, vector& packets + SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets ) { srs_error_t err = srs_success; for (int i = 0; i < nb_msgs; i++) { SrsSharedPtrMessage* msg = msgs[i]; + packets.nn_bytes += msg->size; + packets.nn_samples += msg->nn_extra_payloads() + msg->nn_samples(); + SrsRtpPacket2* packet = NULL; if (msg->is_audio()) { + packets.nn_audios++; + for (int i = 0; i < msg->nn_extra_payloads(); i++) { SrsSample* sample = msg->extra_payloads() + i; if ((err = packet_opus(sample, &packet)) != srs_success) { return srs_error_wrap(err, "opus package"); } - packets.push_back(packet); + packets.packets.push_back(packet); } continue; } + packets.nn_videos++; + // Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A. if (msg->has_idr()) { if ((err = packet_stap_a(source, msg, &packet)) != srs_success) { return srs_error_wrap(err, "packet stap-a"); } - packets.push_back(packet); + packets.packets.push_back(packet); } for (int i = 0; i < msg->nn_samples(); i++) { @@ -677,14 +727,14 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( if (i == msg->nn_samples() - 1) { packet->rtp_header.set_marker(true); } - packets.push_back(packet); + packets.packets.push_back(packet); } else { if ((err = packet_fu_a(msg, sample, kRtpMaxPayloadSize, packets)) != srs_success) { return srs_error_wrap(err, "packet fu-a"); } if (i == msg->nn_samples() - 1) { - packets.back()->rtp_header.set_marker(true); + packets.packets.back()->rtp_header.set_marker(true); } } } @@ -693,11 +743,12 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( return err; } -srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, vector& packets) +srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets) { srs_error_t err = srs_success; - for (vector::iterator it = packets.begin(); it != packets.end(); ++it) { + vector::iterator it; + for (it = packets.packets.begin(); it != packets.packets.end(); ++it) { SrsRtpPacket2* packet = *it; ISrsUdpSender* sender = skt->sender(); @@ -766,7 +817,7 @@ srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtpPacket2** p return err; } -srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector& packets) +srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets) { srs_error_t err = srs_success; @@ -780,7 +831,7 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* int packet_size = srs_min(nb_left, fu_payload_size); SrsRtpPacket2* packet = new SrsRtpPacket2(); - packets.push_back(packet); + packets.packets.push_back(packet); packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); @@ -1513,8 +1564,8 @@ srs_error_t SrsUdpMuxSender::on_reload_rtc_server() { int v = _srs_config->get_rtc_server_sendmmsg(); if (max_sendmmsg != v) { + srs_trace("Reload max_sendmmsg %d=>%d", max_sendmmsg, v); max_sendmmsg = v; - srs_trace("Reload max_sendmmsg=%d", max_sendmmsg); } return srs_success; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 180872144..906765888 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -117,7 +117,24 @@ private: srs_error_t srtp_recv_init(); }; -class SrsRtcSenderThread : public ISrsCoroutineHandler +class SrsRtcPackets +{ +public: + bool is_gso; + bool should_merge_nalus; +public: + int nn_bytes; + int nn_rtp_pkts; + int nn_samples; + int nn_audios; + int nn_videos; + std::vector packets; +public: + SrsRtcPackets(bool gso, bool merge_nalus); + virtual ~SrsRtcPackets(); +}; + +class SrsRtcSenderThread : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler { protected: SrsCoroutine* trd; @@ -136,11 +153,16 @@ private: uint16_t video_sequence; public: SrsUdpMuxSocket* sendonly_ukt; + bool merge_nalus; + bool gso; public: SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid); virtual ~SrsRtcSenderThread(); public: srs_error_t initialize(const uint32_t& vssrc, const uint32_t& assrc, const uint16_t& v_pt, const uint16_t& a_pt); +// interface ISrsReloadHandler +public: + virtual srs_error_t on_reload_rtc_server(); public: virtual int cid(); public: @@ -152,13 +174,13 @@ public: public: virtual srs_error_t cycle(); private: - srs_error_t send_messages(SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, int* pnn_rtp_pkts); - srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, std::vector& packets); - srs_error_t send_packets(SrsUdpMuxSocket* skt, std::vector& packets); + srs_error_t send_messages(SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); + srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); + srs_error_t send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); private: srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket); private: - srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector& packets); + srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets); srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket); srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket); };