From 42ee52fa29ab580a1d8676b783c7581a9eee06f4 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 2 May 2020 10:07:55 +0800 Subject: [PATCH] Refactor code, extract nack from queue --- trunk/src/app/srs_app_rtc_conn.cpp | 20 +++++++++------- trunk/src/app/srs_app_rtc_conn.hpp | 5 +++- trunk/src/app/srs_app_rtp_queue.cpp | 36 ++++++++++------------------- trunk/src/app/srs_app_rtp_queue.hpp | 11 ++++----- 4 files changed, 32 insertions(+), 40 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 24a6645cb..63beea781 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1466,7 +1466,9 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) rtc_session = session; video_queue_ = new SrsRtpQueue(1000); + video_nack_ = new SrsRtpNackForReceiver(video_queue_, 1000 * 2 / 3); audio_queue_ = new SrsRtpQueue(100, true); + audio_nack_ = new SrsRtpNackForReceiver(video_queue_, 100 * 2 / 3); source = NULL; } @@ -1481,7 +1483,9 @@ SrsRtcPublisher::~SrsRtcPublisher() } srs_freep(report_timer); + srs_freep(video_nack_); srs_freep(video_queue_); + srs_freep(audio_nack_); srs_freep(audio_queue_); } @@ -1661,9 +1665,9 @@ srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf) ssrc, compact_ntp, lrr, dlrr, rtt); if (ssrc == video_ssrc) { - video_queue_->update_rtt(rtt); + video_nack_->update_rtt(rtt); } else if (ssrc == audio_ssrc) { - audio_queue_->update_rtt(rtt); + audio_nack_->update_rtt(rtt); } } } @@ -1672,7 +1676,7 @@ srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf) return err; } -void SrsRtcPublisher::check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc) +void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc) { // If DTLS is not OK, drop all messages. if (!rtc_session->dtls_session) { @@ -1680,7 +1684,7 @@ void SrsRtcPublisher::check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc) } vector nack_seqs; - rtp_queue->get_nack_seqs(nack_seqs); + nack->get_nack_seqs(nack_seqs); vector::iterator iter = nack_seqs.begin(); while (iter != nack_seqs.end()) { char buf[kRtpPacketSize]; @@ -1915,14 +1919,14 @@ srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt) pkt->is_key_frame = true; // TODO: FIXME: Error check. - audio_queue_->consume(pkt); + audio_queue_->consume(audio_nack_, pkt); if (audio_queue_->should_request_key_frame()) { // TODO: FIXME: Check error. send_rtcp_fb_pli(audio_ssrc); } - check_send_nacks(audio_queue_, audio_ssrc); + check_send_nacks(audio_nack_, audio_ssrc); return collect_audio_frames(); } @@ -2011,14 +2015,14 @@ srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt) } // TODO: FIXME: Error check. - video_queue_->consume(pkt); + video_queue_->consume(video_nack_, pkt); if (video_queue_->should_request_key_frame()) { // TODO: FIXME: Check error. send_rtcp_fb_pli(video_ssrc); } - check_send_nacks(video_queue_, video_ssrc); + check_send_nacks(video_nack_, video_ssrc); return collect_video_frames(); } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 892f0ee37..fd54bd60b 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -57,6 +57,7 @@ class SrsRtpH264Demuxer; class SrsRtpOpusDemuxer; class SrsRtpPacket2; class ISrsCodec; +class SrsRtpNackForReceiver; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -261,7 +262,9 @@ private: uint32_t audio_ssrc; private: SrsRtpQueue* video_queue_; + SrsRtpNackForReceiver* video_nack_; SrsRtpQueue* audio_queue_; + SrsRtpNackForReceiver* audio_nack_; private: SrsRequest* req; SrsSource* source; @@ -278,7 +281,7 @@ public: srs_error_t on_rtcp_sender_report(char* buf, int nb_buf); srs_error_t on_rtcp_xr(char* buf, int nb_buf); private: - void check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc); + void check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc); srs_error_t send_rtcp_rr(uint32_t ssrc, SrsRtpQueue* rtp_queue); srs_error_t send_rtcp_xr_rrtr(uint32_t ssrc); srs_error_t send_rtcp_fb_pli(uint32_t ssrc); diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index c7f1a0473..4b02eb6a9 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -284,7 +284,6 @@ SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame) { nn_collected_frames = 0; queue_ = new SrsRtpRingBuffer(capacity); - nack_ = new SrsRtpNackForReceiver(this, capacity * 2 / 3); jitter_ = 0; last_trans_time_ = -1; @@ -303,10 +302,9 @@ SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame) SrsRtpQueue::~SrsRtpQueue() { srs_freep(queue_); - srs_freep(nack_); } -srs_error_t SrsRtpQueue::consume(SrsRtpPacket2* pkt) +srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) { srs_error_t err = srs_success; @@ -314,13 +312,13 @@ srs_error_t SrsRtpQueue::consume(SrsRtpPacket2* pkt) srs_utime_t now = srs_update_system_time(); uint16_t seq = pkt->rtp_header.get_sequence(); - SrsRtpNackInfo* nack_info = nack_->find(seq); + SrsRtpNackInfo* nack_info = nack->find(seq); if (nack_info) { int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0; (void)nack_rtt; srs_verbose("seq=%u, alive time=%d, nack count=%d, rtx success, resend use %dms", seq, now - nack_info->generate_time_, nack_info->req_nack_count_, nack_rtt); - nack_->remove(seq); + nack->remove(seq); } // Calc jitter time, ignore nack packets. @@ -348,14 +346,14 @@ srs_error_t SrsRtpQueue::consume(SrsRtpPacket2* pkt) queue_->update(seq, !nn_collected_frames, nack_low, nack_high); if (srs_rtp_seq_distance(nack_low, nack_high)) { srs_trace("update nack seq=%u, startup=%d, nack range [%u, %u]", seq, !nn_collected_frames, nack_low, nack_high); - insert_into_nack_list(nack_low, nack_high); + insert_into_nack_list(nack, nack_low, nack_high); } } // When packets overflow, collect frame and move head to next frame start. if (queue_->overflow()) { srs_verbose("try collect packet becuase seq out of range"); - collect_packet(); + collect_packet(nack); uint16_t next = queue_->next_start_of_frame(); @@ -366,7 +364,7 @@ srs_error_t SrsRtpQueue::consume(SrsRtpPacket2* pkt) srs_trace("seqs out of range, seq range [%u, %u]", queue_->low(), next); for (uint16_t s = queue_->low(); s != next; ++s) { - nack_->remove(s); + nack->remove(s); queue_->remove(s); } @@ -382,7 +380,7 @@ srs_error_t SrsRtpQueue::consume(SrsRtpPacket2* pkt) // 2. Queue has lots of packets, the load is heavy. // 3. The frame contains only one packet for each frame. if (pkt->rtp_header.get_marker() || queue_->is_heavy() || one_packet_per_frame_) { - collect_packet(); + collect_packet(nack); } return err; @@ -465,28 +463,18 @@ uint32_t SrsRtpQueue::get_interarrival_jitter() return static_cast(jitter_); } -void SrsRtpQueue::get_nack_seqs(vector& seqs) -{ - nack_->get_nack_seqs(seqs); -} - -void SrsRtpQueue::update_rtt(int rtt) -{ - nack_->update_rtt(rtt); -} - -void SrsRtpQueue::insert_into_nack_list(uint16_t seq_start, uint16_t seq_end) +void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t seq_start, uint16_t seq_end) { for (uint16_t s = seq_start; s != seq_end; ++s) { srs_verbose("loss seq=%u, insert into nack list", s); - nack_->insert(s); + nack->insert(s); ++number_of_packet_lossed_; } - nack_->check_queue_size(); + nack->check_queue_size(); } -void SrsRtpQueue::collect_packet() +void SrsRtpQueue::collect_packet(SrsRtpNackForReceiver* nack) { while (queue_->low() != queue_->high()) { vector frame; @@ -496,7 +484,7 @@ void SrsRtpQueue::collect_packet() SrsRtpPacket2* pkt = queue_->at(s); // In NACK, never collect frame. - if (nack_->find(s) != NULL) { + if (nack->find(s) != NULL) { srs_verbose("seq=%u, found in nack list when collect frame", s); return; } diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp index 523bd9f0f..833107d32 100644 --- a/trunk/src/app/srs_app_rtp_queue.hpp +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -164,7 +164,6 @@ class SrsRtpQueue private: uint64_t nn_collected_frames; SrsRtpRingBuffer* queue_; - SrsRtpNackForReceiver* nack_; bool one_packet_per_frame_; private: double jitter_; @@ -181,7 +180,8 @@ public: SrsRtpQueue(size_t capacity = 1024, bool one_packet_per_frame = false); virtual ~SrsRtpQueue(); public: - srs_error_t consume(SrsRtpPacket2* pkt); + srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); + // TODO: FIXME: Should merge FU-A to RAW, then we can return RAW payloads. void collect_frames(std::vector >& frames); bool should_request_key_frame(); void notify_drop_seq(uint16_t seq); @@ -192,12 +192,9 @@ public: uint8_t get_fraction_lost(); uint32_t get_cumulative_number_of_packets_lost(); uint32_t get_interarrival_jitter(); -public: - void get_nack_seqs(std::vector& seqs); - void update_rtt(int rtt); private: - void insert_into_nack_list(uint16_t seq_start, uint16_t seq_end); - void collect_packet(); + void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t seq_start, uint16_t seq_end); + void collect_packet(SrsRtpNackForReceiver* nack); }; #endif