diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 51c5f7ed4..e41c4b572 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -595,7 +595,7 @@ SrsRtpPacket2* SrsRtcPackets::at(int index) return cache + index; } -SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, int parent_cid) +SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid) { _parent_cid = parent_cid; trd = new SrsDummyCoroutine(); @@ -618,14 +618,14 @@ SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, int parent_cid) _srs_config->subscribe(this); } -SrsRtcSenderThread::~SrsRtcSenderThread() +SrsRtcPlayer::~SrsRtcPlayer() { _srs_config->unsubscribe(this); srs_freep(trd); } -srs_error_t SrsRtcSenderThread::initialize(const uint32_t& vssrc, const uint32_t& assrc, const uint16_t& v_pt, const uint16_t& a_pt) +srs_error_t SrsRtcPlayer::initialize(const uint32_t& vssrc, const uint32_t& assrc, const uint16_t& v_pt, const uint16_t& a_pt) { srs_error_t err = srs_success; @@ -644,7 +644,7 @@ srs_error_t SrsRtcSenderThread::initialize(const uint32_t& vssrc, const uint32_t return err; } -srs_error_t SrsRtcSenderThread::on_reload_rtc_server() +srs_error_t SrsRtcPlayer::on_reload_rtc_server() { gso = _srs_config->get_rtc_server_gso(); merge_nalus = _srs_config->get_rtc_server_merge_nalus(); @@ -655,7 +655,7 @@ srs_error_t SrsRtcSenderThread::on_reload_rtc_server() return srs_success; } -srs_error_t SrsRtcSenderThread::on_reload_vhost_play(string vhost) +srs_error_t SrsRtcPlayer::on_reload_vhost_play(string vhost) { SrsRequest* req = rtc_session->req; @@ -672,17 +672,17 @@ srs_error_t SrsRtcSenderThread::on_reload_vhost_play(string vhost) return srs_success; } -srs_error_t SrsRtcSenderThread::on_reload_vhost_realtime(string vhost) +srs_error_t SrsRtcPlayer::on_reload_vhost_realtime(string vhost) { return on_reload_vhost_play(vhost); } -int SrsRtcSenderThread::cid() +int SrsRtcPlayer::cid() { return trd->cid(); } -srs_error_t SrsRtcSenderThread::start() +srs_error_t SrsRtcPlayer::start() { srs_error_t err = srs_success; @@ -696,17 +696,17 @@ srs_error_t SrsRtcSenderThread::start() return err; } -void SrsRtcSenderThread::stop() +void SrsRtcPlayer::stop() { trd->stop(); } -void SrsRtcSenderThread::stop_loop() +void SrsRtcPlayer::stop_loop() { trd->interrupt(); } -srs_error_t SrsRtcSenderThread::cycle() +srs_error_t SrsRtcPlayer::cycle() { srs_error_t err = srs_success; @@ -833,7 +833,7 @@ srs_error_t SrsRtcSenderThread::cycle() } } -srs_error_t SrsRtcSenderThread::send_messages( +srs_error_t SrsRtcPlayer::send_messages( SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets ) { srs_error_t err = srs_success; @@ -866,7 +866,7 @@ srs_error_t SrsRtcSenderThread::send_messages( return err; } -srs_error_t SrsRtcSenderThread::messages_to_packets( +srs_error_t SrsRtcPlayer::messages_to_packets( SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets ) { srs_error_t err = srs_success; @@ -951,7 +951,7 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( return err; } -srs_error_t SrsRtcSenderThread::send_packets(SrsRtcPackets& packets) +srs_error_t SrsRtcPlayer::send_packets(SrsRtcPackets& packets) { srs_error_t err = srs_success; @@ -1019,7 +1019,7 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsRtcPackets& packets) } // TODO: FIXME: We can gather and pad audios, because they have similar size. -srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets) +srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcPackets& packets) { srs_error_t err = srs_success; @@ -1222,7 +1222,7 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets) return err; } -srs_error_t SrsRtcSenderThread::package_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets) +srs_error_t SrsRtcPlayer::package_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets) { srs_error_t err = srs_success; @@ -1312,7 +1312,7 @@ srs_error_t SrsRtcSenderThread::package_nalus(SrsSharedPtrMessage* msg, SrsRtcPa return err; } -srs_error_t SrsRtcSenderThread::package_opus(SrsSample* sample, SrsRtcPackets& packets, int nn_max_payload) +srs_error_t SrsRtcPlayer::package_opus(SrsSample* sample, SrsRtcPackets& packets, int nn_max_payload) { srs_error_t err = srs_success; @@ -1348,7 +1348,7 @@ srs_error_t SrsRtcSenderThread::package_opus(SrsSample* sample, SrsRtcPackets& p return err; } -srs_error_t SrsRtcSenderThread::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets) +srs_error_t SrsRtcPlayer::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets) { srs_error_t err = srs_success; @@ -1389,7 +1389,7 @@ srs_error_t SrsRtcSenderThread::package_fu_a(SrsSharedPtrMessage* msg, SrsSample } // Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6 -srs_error_t SrsRtcSenderThread::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcPackets& packets) +srs_error_t SrsRtcPlayer::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcPackets& packets) { srs_error_t err = srs_success; @@ -1409,7 +1409,7 @@ srs_error_t SrsRtcSenderThread::package_single_nalu(SrsSharedPtrMessage* msg, Sr return err; } -srs_error_t SrsRtcSenderThread::package_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets) +srs_error_t SrsRtcPlayer::package_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets) { srs_error_t err = srs_success; @@ -1937,7 +1937,7 @@ srs_error_t SrsRtcPublisher::collect_audio_frames() srs_error_t err = srs_success; std::vector > frames; - audio_queue_->collect_frames(frames); + audio_queue_->collect_frames(audio_nack_, frames); for (size_t i = 0; i < frames.size(); ++i) { vector& packets = frames[i]; @@ -2031,7 +2031,7 @@ srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt) srs_error_t SrsRtcPublisher::collect_video_frames() { std::vector > frames; - video_queue_->collect_frames(frames); + video_queue_->collect_frames(video_nack_, frames); for (size_t i = 0; i < frames.size(); ++i) { vector& packets = frames[i]; @@ -2252,7 +2252,7 @@ SrsRtcSession::SrsRtcSession(SrsRtcServer* s) source_ = NULL; publisher = NULL; - sender = NULL; + player = NULL; sendonly_skt = NULL; rtc_server = s; dtls_session = new SrsDtlsSession(this); @@ -2268,7 +2268,7 @@ SrsRtcSession::SrsRtcSession(SrsRtcServer* s) SrsRtcSession::~SrsRtcSession() { - srs_freep(sender); + srs_freep(player); srs_freep(publisher); srs_freep(dtls_session); srs_freep(req); @@ -2551,8 +2551,8 @@ srs_error_t SrsRtcSession::start_play() { srs_error_t err = srs_success; - srs_freep(sender); - sender = new SrsRtcSenderThread(this, _srs_context->get_id()); + srs_freep(player); + player = new SrsRtcPlayer(this, _srs_context->get_id()); uint32_t video_ssrc = 0; uint32_t audio_ssrc = 0; @@ -2569,12 +2569,12 @@ srs_error_t SrsRtcSession::start_play() } } - if ((err = sender->initialize(video_ssrc, audio_ssrc, video_payload_type, audio_payload_type)) != srs_success) { - return srs_error_wrap(err, "SrsRtcSenderThread init"); + if ((err = player->initialize(video_ssrc, audio_ssrc, video_payload_type, audio_payload_type)) != srs_success) { + return srs_error_wrap(err, "SrsRtcPlayer init"); } - if ((err = sender->start()) != srs_success) { - return srs_error_wrap(err, "start SrsRtcSenderThread"); + if ((err = player->start()) != srs_success) { + return srs_error_wrap(err, "start SrsRtcPlayer"); } return err; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 35c01b3a6..5522c986f 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -194,8 +194,7 @@ public: SrsRtpPacket2* at(int index); }; -// TODO: FIXME: Rename to RTC player or subscriber. -class SrsRtcSenderThread : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler +class SrsRtcPlayer : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler { protected: SrsCoroutine* trd; @@ -221,8 +220,8 @@ private: int mw_msgs; bool realtime; public: - SrsRtcSenderThread(SrsRtcSession* s, int parent_cid); - virtual ~SrsRtcSenderThread(); + SrsRtcPlayer(SrsRtcSession* s, int parent_cid); + virtual ~SrsRtcPlayer(); public: srs_error_t initialize(const uint32_t& vssrc, const uint32_t& assrc, const uint16_t& v_pt, const uint16_t& a_pt); // interface ISrsReloadHandler @@ -303,13 +302,13 @@ public: class SrsRtcSession { friend class SrsDtlsSession; - friend class SrsRtcSenderThread; + friend class SrsRtcPlayer; friend class SrsRtcPublisher; private: SrsRtcServer* rtc_server; SrsRtcSessionStateType session_state; SrsDtlsSession* dtls_session; - SrsRtcSenderThread* sender; + SrsRtcPlayer* player; SrsRtcPublisher* publisher; bool is_publisher_; private: diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index b685cebce..a3831949f 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -187,11 +187,6 @@ bool SrsRtpRingBuffer::overflow() return high_ - low_ >= capacity_; } -bool SrsRtpRingBuffer::is_heavy() -{ - return high_ - low_ >= capacity_ / 2; -} - uint16_t SrsRtpRingBuffer::next_start_of_frame() { if (low_ == high_) { @@ -269,7 +264,7 @@ SrsRtpPacket2* SrsRtpRingBuffer::at(uint16_t seq) return queue_[seq % capacity_]; } -SrsRtpQueue::SrsRtpQueue(const char* tag, int capacity) +SrsRtpQueue::SrsRtpQueue(int capacity) { nn_collected_frames = 0; queue_ = new SrsRtpRingBuffer(capacity); @@ -282,8 +277,6 @@ SrsRtpQueue::SrsRtpQueue(const char* tag, int capacity) num_of_packet_received_ = 0; number_of_packet_lossed_ = 0; - - tag_ = tag; } SrsRtpQueue::~SrsRtpQueue() @@ -329,27 +322,17 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt uint16_t nack_low = 0, nack_high = 0; queue_->update(seq, !nn_collected_frames, nack_low, nack_high); if (srs_rtp_seq_distance(nack_low, nack_high)) { - srs_trace("%s update nack seq=%u, startup=%d, range [%u, %u]", tag_, seq, !nn_collected_frames, nack_low, nack_high); + srs_trace("update nack seq=%u, startup=%d, range [%u, %u]", seq, !nn_collected_frames, nack_low, nack_high); insert_into_nack_list(nack, nack_low, nack_high); } } - // When packets overflow, collect frame and move head to next frame start. - if (queue_->overflow()) { - on_overflow(nack); - } - // Save packet at the position seq. queue_->set(seq, pkt); return err; } -void SrsRtpQueue::collect_frames(std::vector >& frames) -{ - frames.swap(frames_); -} - void SrsRtpQueue::notify_drop_seq(uint16_t seq) { uint16_t next = queue_->next_start_of_frame(); @@ -360,7 +343,7 @@ void SrsRtpQueue::notify_drop_seq(uint16_t seq) } // When NACK is timeout, move to the next start of frame. - srs_trace("%s nack drop seq=%u, drop range [%u, %u]", tag_, seq, queue_->low(), next + 1); + srs_trace("nack drop seq=%u, drop range [%u, %u]", seq, queue_->low(), next + 1); queue_->advance_to(next + 1); } @@ -374,7 +357,7 @@ void SrsRtpQueue::notify_nack_list_full() } // When NACK is overflow, move to the next keyframe. - srs_trace("%s nack overflow drop range [%u, %u]", tag_, queue_->low(), next + 1); + srs_trace("nack overflow drop range [%u, %u]", queue_->low(), next + 1); queue_->advance_to(next + 1); } @@ -417,7 +400,7 @@ void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t se nack->check_queue_size(); } -SrsRtpAudioQueue::SrsRtpAudioQueue(int capacity) : SrsRtpQueue("audio", capacity) +SrsRtpAudioQueue::SrsRtpAudioQueue(int capacity) : SrsRtpQueue(capacity) { } @@ -425,27 +408,21 @@ SrsRtpAudioQueue::~SrsRtpAudioQueue() { } -srs_error_t SrsRtpAudioQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) +void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector >& frames) { - srs_error_t err = srs_success; + collect_packet(frames, nack); - if ((err = SrsRtpQueue::consume(nack, pkt)) != srs_success) { - return srs_error_wrap(err, "audio queue"); + if (queue_->overflow()) { + on_overflow(nack); } - - // For audio, always try to collect frame, because each packet is a frame. - collect_packet(nack); - - return err; } void SrsRtpAudioQueue::on_overflow(SrsRtpNackForReceiver* nack) { - collect_packet(nack); queue_->advance_to(queue_->high()); } -void SrsRtpAudioQueue::collect_packet(SrsRtpNackForReceiver* nack) +void SrsRtpAudioQueue::collect_packet(vector >& frames, SrsRtpNackForReceiver* nack) { // When done, s point to the next available packet. uint16_t next = queue_->low(); @@ -455,7 +432,7 @@ void SrsRtpAudioQueue::collect_packet(SrsRtpNackForReceiver* nack) // Not found or in NACK, stop collecting frame. if (!pkt || nack->find(next) != NULL) { - srs_trace("%s wait for nack seq=%u", tag_, next); + srs_trace("wait for nack seq=%u", next); break; } @@ -465,19 +442,19 @@ void SrsRtpAudioQueue::collect_packet(SrsRtpNackForReceiver* nack) // Done, we got the last packet of frame. nn_collected_frames++; - frames_.push_back(frame); + frames.push_back(frame); } if (queue_->low() != next) { // Reset the range of packets to NULL in buffer. queue_->reset(queue_->low(), next); - srs_verbose("%s collect on frame, update head seq=%u t %u", tag_, queue_->low(), next); + srs_verbose("collect on frame, update head seq=%u t %u", queue_->low(), next); queue_->advance_to(next); } } -SrsRtpVideoQueue::SrsRtpVideoQueue(int capacity) : SrsRtpQueue("video", capacity) +SrsRtpVideoQueue::SrsRtpVideoQueue(int capacity) : SrsRtpQueue(capacity) { request_key_frame_ = false; } @@ -486,23 +463,13 @@ SrsRtpVideoQueue::~SrsRtpVideoQueue() { } -srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) +void SrsRtpVideoQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector >& frames) { - srs_error_t err = srs_success; + collect_packet(frames, nack); - if ((err = SrsRtpQueue::consume(nack, pkt)) != srs_success) { - return srs_error_wrap(err, "video queue"); + if (queue_->overflow()) { + on_overflow(nack); } - - // Collect packets to frame when: - // 1. Marker bit means the last packet of frame received. - // 2. Queue has lots of packets, the load is heavy. - // TODO: FIMXE: For real-time, we should collect each frame ASAP. - if (pkt->rtp_header.get_marker() || queue_->is_heavy()) { - collect_packet(nack); - } - - return err; } bool SrsRtpVideoQueue::should_request_key_frame() @@ -522,26 +489,24 @@ void SrsRtpVideoQueue::request_keyframe() void SrsRtpVideoQueue::on_overflow(SrsRtpNackForReceiver* nack) { - collect_packet(nack); - uint16_t next = queue_->next_start_of_frame(); // Note that low_ mean not found, clear queue util one packet. if (next == queue_->low()) { next = queue_->high() - 1; } - srs_trace("%s seq out of range [%u, %u]", tag_, queue_->low(), next); + srs_trace("seq out of range [%u, %u]", queue_->low(), next); for (uint16_t s = queue_->low(); s != next; ++s) { nack->remove(s); queue_->remove(s); } - srs_trace("%s force update seq %u to %u", tag_, queue_->low(), next + 1); + srs_trace("force update seq %u to %u", queue_->low(), next + 1); queue_->advance_to(next + 1); } -void SrsRtpVideoQueue::collect_packet(SrsRtpNackForReceiver* nack) +void SrsRtpVideoQueue::collect_packet(vector >& frames, SrsRtpNackForReceiver* nack) { while (queue_->low() != queue_->high()) { vector frame; @@ -553,7 +518,7 @@ void SrsRtpVideoQueue::collect_packet(SrsRtpNackForReceiver* nack) } nn_collected_frames++; - frames_.push_back(frame); + frames.push_back(frame); } } @@ -569,7 +534,7 @@ void SrsRtpVideoQueue::do_collect_packet(SrsRtpNackForReceiver* nack, vectorfind(next) != NULL) { - srs_trace("%s wait for nack seq=%u", tag_, next); + srs_trace("wait for nack seq=%u", next); break; } @@ -599,7 +564,7 @@ void SrsRtpVideoQueue::do_collect_packet(SrsRtpNackForReceiver* nack, vectorreset(queue_->low(), next); - srs_verbose("%s collect on frame, update head seq=%u t %u", tag_, queue_->low(), next); + srs_verbose("collect on frame, update head seq=%u t %u", queue_->low(), next); queue_->advance_to(next); } } diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp index 07472d6de..a605df185 100644 --- a/trunk/src/app/srs_app_rtp_queue.hpp +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -143,7 +143,6 @@ public: void reset(uint16_t low, uint16_t high); // Whether queue overflow or heavy(too many packets and need clear). bool overflow(); - bool is_heavy(); // For video, get the next start packet of frame. // @remark If not found, return the low_, which should never be the "next" one, // because it MAY or NOT current start packet of frame but never be the next. @@ -172,15 +171,11 @@ private: protected: SrsRtpRingBuffer* queue_; uint64_t nn_collected_frames; - std::vector > frames_; - const char* tag_; public: - SrsRtpQueue(const char* tag, int capacity); + SrsRtpQueue(int capacity); virtual ~SrsRtpQueue(); public: virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); - // TODO: FIXME: Should merge FU-A to RAW, then we can return RAW payloads. - void collect_frames(std::vector >& frames); void notify_drop_seq(uint16_t seq); void notify_nack_list_full(); public: @@ -190,8 +185,6 @@ public: uint32_t get_interarrival_jitter(); private: void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t seq_start, uint16_t seq_end); -protected: - virtual void on_overflow(SrsRtpNackForReceiver* nack) = 0; }; class SrsRtpAudioQueue : public SrsRtpQueue @@ -200,11 +193,10 @@ public: SrsRtpAudioQueue(int capacity); virtual ~SrsRtpAudioQueue(); public: - virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); + virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector >& frames); private: virtual void on_overflow(SrsRtpNackForReceiver* nack); -protected: - virtual void collect_packet(SrsRtpNackForReceiver* nack); + virtual void collect_packet(std::vector >& frames, SrsRtpNackForReceiver* nack); }; class SrsRtpVideoQueue : public SrsRtpQueue @@ -215,13 +207,12 @@ public: SrsRtpVideoQueue(int capacity); virtual ~SrsRtpVideoQueue(); public: - virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); + virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector >& frames); bool should_request_key_frame(); void request_keyframe(); -protected: - virtual void on_overflow(SrsRtpNackForReceiver* nack); private: - virtual void collect_packet(SrsRtpNackForReceiver* nack); + virtual void on_overflow(SrsRtpNackForReceiver* nack); + virtual void collect_packet(std::vector >& frames, SrsRtpNackForReceiver* nack); virtual void do_collect_packet(SrsRtpNackForReceiver* nack, std::vector& frame); };