From 89cdfe2f50f62f6b4d26029beeefb4eab743cff4 Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 27 Apr 2020 13:45:50 +0800 Subject: [PATCH] Refactor RTC publisher code --- trunk/src/app/srs_app_rtc_conn.cpp | 49 ++++++++++++++++------------- trunk/src/app/srs_app_rtc_conn.hpp | 3 +- trunk/src/app/srs_app_rtp_queue.cpp | 15 +++++---- trunk/src/app/srs_app_rtp_queue.hpp | 2 +- 4 files changed, 39 insertions(+), 30 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 83cb0e384..20b003d02 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1935,9 +1935,12 @@ srs_error_t SrsRtcPublisher::on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* return srs_error_wrap(err, "rtp opus demux failed"); } + // TODO: FIXME: Rename it. + // TODO: FIXME: Error check. rtp_audio_queue->insert(rtp_pkt); - if (rtp_audio_queue->get_and_clean_if_needed_rqeuest_key_frame()) { + if (rtp_audio_queue->get_and_clean_if_needed_request_key_frame()) { + // TODO: FIXME: Check error. send_rtcp_fb_pli(skt, audio_ssrc); } @@ -1946,27 +1949,6 @@ srs_error_t SrsRtcPublisher::on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* return collect_audio_frame(); } -srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt) -{ - srs_error_t err = srs_success; - - rtp_pkt->rtp_payload_header = new SrsRtpH264Header(); - - if ((err = rtp_h264_demuxer->parse(rtp_pkt)) != srs_success) { - return srs_error_wrap(err, "rtp h264 demux failed"); - } - - rtp_video_queue->insert(rtp_pkt); - - if (rtp_video_queue->get_and_clean_if_needed_rqeuest_key_frame()) { - send_rtcp_fb_pli(skt, video_ssrc); - } - - check_send_nacks(rtp_video_queue, video_ssrc, skt); - - return collect_video_frame(); -} - srs_error_t SrsRtcPublisher::collect_audio_frame() { srs_error_t err = srs_success; @@ -1980,6 +1962,8 @@ srs_error_t SrsRtcPublisher::collect_audio_frame() frames.size(), frames[i].front()->rtp_header.get_sequence(), frames[i].back()->rtp_header.get_sequence()); } + // TODO: FIXME: Write audio frame to source. + for (size_t n = 0; n < frames[i].size(); ++n) { srs_freep(frames[i][n]); } @@ -1988,6 +1972,27 @@ srs_error_t SrsRtcPublisher::collect_audio_frame() return err; } +srs_error_t SrsRtcPublisher::on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt) +{ + srs_error_t err = srs_success; + + rtp_pkt->rtp_payload_header = new SrsRtpH264Header(); + + if ((err = rtp_h264_demuxer->parse(rtp_pkt)) != srs_success) { + return srs_error_wrap(err, "rtp h264 demux failed"); + } + + rtp_video_queue->insert(rtp_pkt); + + if (rtp_video_queue->get_and_clean_if_needed_request_key_frame()) { + send_rtcp_fb_pli(skt, video_ssrc); + } + + check_send_nacks(rtp_video_queue, video_ssrc, skt); + + return collect_video_frame(); +} + srs_error_t SrsRtcPublisher::collect_video_frame() { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 1793b49a4..793813abb 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -292,10 +292,11 @@ private: srs_error_t send_rtcp_fb_pli(SrsUdpMuxSocket* skt, uint32_t ssrc); private: srs_error_t on_audio(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt); + srs_error_t collect_audio_frame(); +private: srs_error_t on_video(SrsUdpMuxSocket* skt, SrsRtpSharedPacket* rtp_pkt); private: srs_error_t collect_video_frame(); - srs_error_t collect_audio_frame(); public: void update_sendonly_socket(SrsUdpMuxSocket* skt); // interface ISrsHourGlass diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index 942c244aa..bfed41778 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -171,16 +171,18 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt) uint16_t seq = rtp_pkt->rtp_header.get_sequence(); + // TODO: FIXME: Update time for each packet, may hurt performance. srs_utime_t now = srs_update_system_time(); // First packet recv, init head_sequence and highest_sequence. - if (! initialized_) { + if (!initialized_) { initialized_ = true; head_sequence_ = seq; highest_sequence_ = seq; ++num_of_packet_received_; + // TODO: FIXME: Covert time to srs_utime_t. last_trans_time_ = now/1000 - rtp_pkt->rtp_header.get_timestamp()/90; } else { SrsRtpNackInfo* nack_info = NULL; @@ -219,7 +221,7 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt) } else { // Because we don't know the ISN(initiazlie sequence number), the first packet // we received maybe no the first packet client sented. - if (! start_collected_) { + if (!start_collected_) { if (seq_cmp(seq, head_sequence_)) { srs_info("head seq=%u, cur seq=%u, update head seq because recv less than it.", head_sequence_, seq); head_sequence_ = seq; @@ -267,6 +269,7 @@ srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt) delete old_pkt; } + // TODO: FIXME: Change to ptr of ptr. old_pkt = rtp_pkt->copy(); // Marker bit means the last packet of frame received. @@ -295,7 +298,7 @@ void SrsRtpQueue::get_and_clean_collected_frames(std::vectorrtp_payload_size() != 0 && ! pkt->rtp_payload_header->is_first_packet_of_frame) { + if (s == head_sequence_ && pkt->rtp_payload_size() != 0 && !pkt->rtp_payload_header->is_first_packet_of_frame) { break; } frame.push_back(pkt->copy()); if (pkt->rtp_header.get_marker() || one_packet_per_frame_) { - if (! start_collected_) { + if (!start_collected_) { start_collected_ = true; } frames_.push_back(frame); diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp index f8a31d08f..4b2fb8929 100644 --- a/trunk/src/app/srs_app_rtp_queue.hpp +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -142,7 +142,7 @@ public: srs_error_t remove(uint16_t seq); public: void get_and_clean_collected_frames(std::vector >& frames); - bool get_and_clean_if_needed_rqeuest_key_frame(); + bool get_and_clean_if_needed_request_key_frame(); void notify_drop_seq(uint16_t seq); void notify_nack_list_full(); public: