From fa21df7bb891b77d6d094a732760a84480297e3b Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 13 Apr 2020 13:58:34 +0800 Subject: [PATCH] Refactor RTC package RTP packets. --- trunk/src/app/srs_app_rtc.hpp | 4 +- trunk/src/app/srs_app_rtc_conn.cpp | 145 +++++++++++++---------------- trunk/src/app/srs_app_rtc_conn.hpp | 8 +- 3 files changed, 72 insertions(+), 85 deletions(-) diff --git a/trunk/src/app/srs_app_rtc.hpp b/trunk/src/app/srs_app_rtc.hpp index 9bd794b62..d232ca31f 100644 --- a/trunk/src/app/srs_app_rtc.hpp +++ b/trunk/src/app/srs_app_rtc.hpp @@ -39,9 +39,7 @@ class SrsOriginHub; class SrsAudioRecode; class SrsBuffer; -// Rtp packet max payload size, not include rtp header. -// Must left some bytes to payload header, rtp header, udp header, ip header. -const int kRtpMaxPayloadSize = 1200; +// The RTP packet max size, should never exceed this size. const int kRtpPacketSize = 1500; // Payload type will rewrite in srs_app_rtc_conn.cpp when send to client. diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 757d6b8a7..cd0e8b0b3 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -519,6 +519,14 @@ void SrsRtcSenderThread::stop_loop() trd->interrupt(); } +void SrsRtcSenderThread::update_sendonly_socket(SrsUdpMuxSocket* skt) +{ + srs_trace("session %s address changed, update %s -> %s", + rtc_session->id().c_str(), sendonly_ukt->get_peer_id().c_str(), skt->get_peer_id().c_str()); + + srs_freep(sendonly_ukt); + sendonly_ukt = skt->copy_sendonly(); +} srs_error_t SrsRtcSenderThread::cycle() { @@ -598,15 +606,6 @@ srs_error_t SrsRtcSenderThread::cycle() } } -void SrsRtcSenderThread::update_sendonly_socket(SrsUdpMuxSocket* skt) -{ - srs_trace("session %s address changed, update %s -> %s", - rtc_session->id().c_str(), sendonly_ukt->get_peer_id().c_str(), skt->get_peer_id().c_str()); - - srs_freep(sendonly_ukt); - sendonly_ukt = skt->copy_sendonly(); -} - srs_error_t SrsRtcSenderThread::send_messages( SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* skt, int* pnn, int* pnn_rtp_pkts @@ -617,89 +616,79 @@ srs_error_t SrsRtcSenderThread::send_messages( return err; } + // Covert kernel messages to RTP packets. + vector packets; + for (int i = 0; i < nb_msgs; i++) { SrsSharedPtrMessage* msg = msgs[i]; - bool is_video = msg->is_video(); - bool is_audio = msg->is_audio(); *pnn += msg->size; - if (is_audio) { + SrsRtpPacket2* packet = NULL; + if (msg->is_audio()) { for (int i = 0; i < msg->nn_extra_payloads(); i++) { SrsSample* sample = msg->extra_payloads() + i; - - SrsRtpPacket2* packet = NULL; if ((err = packet_opus(sample, &packet)) != srs_success) { return srs_error_wrap(err, "opus package"); } - - err = send_message2(msg, is_video, is_audio, packet, skt); - srs_freep(packet); - if (err != srs_success) { - return srs_error_wrap(err, "send message"); - } - - *pnn_rtp_pkts += 1; - } - } else { - // Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A. - if (msg->has_idr()) { - SrsRtpPacket2* packet = NULL; - if ((err = packet_stap_a(source, msg, &packet)) != srs_success) { - return srs_error_wrap(err, "packet stap-a"); - } - - err = send_message2(msg, is_video, is_audio, packet, skt); - srs_freep(packet); - if (err != srs_success) { - return srs_error_wrap(err, "send message"); - } - - *pnn_rtp_pkts += 1; + packets.push_back(packet); } - vector packets; - for (int i = 0; i < msg->nn_samples(); i++) { - SrsSample* sample = msg->samples() + i; - - // We always ignore bframe here, if config to discard bframe, - // the bframe flag will not be set. - if (sample->bframe) { - continue; - } - - if (sample->size <= kRtpMaxPayloadSize) { - SrsRtpPacket2* packet = NULL; - if ((err = packet_single_nalu(msg, sample, &packet)) != srs_success) { - return srs_error_wrap(err, "packet single nalu"); - } - packets.push_back(packet); - } else { - if ((err = packet_fu_a(msg, sample, packets)) != srs_success) { - return srs_error_wrap(err, "packet fu-a"); - } - } - } - - if (!packets.empty()) { - packets.back()->rtp_header.set_marker(true); - } - - for (int j = 0; j < (int)packets.size(); j++) { - SrsRtpPacket2* packet = packets[j]; - err = send_message2(msg, is_video, is_audio, packet, skt); - srs_freep(packet); - if (err != srs_success) { - return srs_error_wrap(err, "send message"); - } - } - *pnn_rtp_pkts += (int)packets.size(); + continue; } + + // Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A. + if (msg->has_idr()) { + if ((err = packet_stap_a(source, msg, &packet)) != srs_success) { + return srs_error_wrap(err, "packet stap-a"); + } + packets.push_back(packet); + } + + for (int i = 0; i < msg->nn_samples(); i++) { + SrsSample* sample = msg->samples() + i; + + // We always ignore bframe here, if config to discard bframe, + // the bframe flag will not be set. + if (sample->bframe) { + continue; + } + + const int kRtpMaxPayloadSize = 1200; + if (sample->size <= kRtpMaxPayloadSize) { + if ((err = packet_single_nalu(msg, sample, &packet)) != srs_success) { + return srs_error_wrap(err, "packet single nalu"); + } + + if (i == msg->nn_samples() - 1) { + packet->rtp_header.set_marker(true); + } + packets.push_back(packet); + } else { + if ((err = packet_fu_a(msg, sample, kRtpMaxPayloadSize, packets)) != srs_success) { + return srs_error_wrap(err, "packet fu-a"); + } + + if (i == msg->nn_samples() - 1) { + packets.back()->rtp_header.set_marker(true); + } + } + } + } + + *pnn_rtp_pkts += (int)packets.size(); + + for (int j = 0; j < (int)packets.size(); j++) { + SrsRtpPacket2* packet = packets[j]; + if ((err = send_packet(packet, skt)) != srs_success) { + srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err); + } + srs_freep(packet); } return err; } -srs_error_t SrsRtcSenderThread::send_message2(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt) +srs_error_t SrsRtcSenderThread::send_packet(SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; @@ -766,7 +755,7 @@ srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtpPacket2** p return err; } -srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, vector& packets) +srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector& packets) { srs_error_t err = srs_success; @@ -775,9 +764,9 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* uint8_t header = sample->bytes[0]; uint8_t nal_type = header & kNalTypeMask; - int num_of_packet = 1 + (sample->size - 1) / kRtpMaxPayloadSize; + int num_of_packet = 1 + (sample->size - 1) / fu_payload_size; for (int i = 0; i < num_of_packet; ++i) { - int packet_size = srs_min(nb_left, kRtpMaxPayloadSize); + int packet_size = srs_min(nb_left, fu_payload_size); SrsRtpPacket2* packet = new SrsRtpPacket2(); packets.push_back(packet); diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 1ecfd5998..3fef015c6 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -147,17 +147,17 @@ public: virtual srs_error_t start(); virtual void stop(); virtual void stop_loop(); -public: - virtual srs_error_t cycle(); public: void update_sendonly_socket(SrsUdpMuxSocket* skt); +public: + virtual srs_error_t cycle(); private: srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* skt, int* pnn, int* pnn_rtp_pkts); - srs_error_t send_message2(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt); + srs_error_t send_packet(SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt); private: srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket); private: - srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, std::vector& packets); + srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector& packets); srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket); srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket); };