From 8c4b6d3166a357423fba6c41272073be729228e1 Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 4 May 2020 20:42:30 +0800 Subject: [PATCH] Support disable NACK in config --- trunk/conf/full.conf | 6 ++ trunk/src/app/srs_app_config.cpp | 25 ++++++- trunk/src/app/srs_app_config.hpp | 1 + trunk/src/app/srs_app_http_api.cpp | 56 ++++++++------ trunk/src/app/srs_app_http_api.hpp | 5 +- trunk/src/app/srs_app_rtc_conn.cpp | 110 +++++++++++++++++++--------- trunk/src/app/srs_app_rtc_conn.hpp | 8 ++ trunk/src/app/srs_app_rtp_queue.cpp | 69 ++++++++++++++--- trunk/src/app/srs_app_rtp_queue.hpp | 1 + 9 files changed, 212 insertions(+), 69 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 4a8ea1aa1..b10ad8bf9 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -509,6 +509,12 @@ vhost rtc.vhost.srs.com { # default: 1 (For WebRTC, min_latency off) mw_msgs 0; } + # For NACK. + nack { + # Whether support NACK. + # default: on + enabled on; + } } ############################################################################################# diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 3f740346a..9bebe11b6 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3782,7 +3782,7 @@ srs_error_t SrsConfig::check_normal_config() && n != "play" && n != "publish" && n != "cluster" && n != "security" && n != "http_remux" && n != "dash" && n != "http_static" && n != "hds" && n != "exec" - && n != "in_ack_size" && n != "out_ack_size" && n != "rtc") { + && n != "in_ack_size" && n != "out_ack_size" && n != "rtc" && n != "nack") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.%s", n.c_str()); } // for each sub directives of vhost. @@ -5083,6 +5083,29 @@ bool SrsConfig::get_rtc_stun_strict_check(string vhost) return SRS_CONF_PERFER_FALSE(conf->arg0()); } +bool SrsConfig::get_rtc_nack_enabled(string vhost) +{ + static bool DEFAULT = true; + + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("nack"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("enabled"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_TRUE(conf->arg0()); +} + SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost) { srs_assert(root); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 44d4ccb00..212143001 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -547,6 +547,7 @@ public: bool get_rtc_aac_discard(std::string vhost); srs_utime_t get_rtc_stun_timeout(std::string vhost); bool get_rtc_stun_strict_check(std::string vhost); + bool get_rtc_nack_enabled(std::string vhost); // vhost specified section public: diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 51db3bbec..2b5e4a3a8 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -914,11 +914,6 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe return srs_error_wrap(err, "remote sdp check failed"); } - SrsSdp local_sdp; - if ((err = exchange_sdp(app, stream_name, remote_sdp, local_sdp)) != srs_success) { - return srs_error_wrap(err, "remote sdp have error or unsupport attributes"); - } - SrsRequest request; request.app = app; request.stream = stream_name; @@ -930,6 +925,11 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe request.vhost = parsed_vhost->arg0(); } + SrsSdp local_sdp; + if ((err = exchange_sdp(&request, remote_sdp, local_sdp)) != srs_success) { + return srs_error_wrap(err, "remote sdp have error or unsupport attributes"); + } + // Whether enabled. bool server_enabled = _srs_config->get_rtc_server_enabled(); bool rtc_enabled = _srs_config->get_rtc_enabled(request.vhost); @@ -1006,7 +1006,7 @@ srs_error_t SrsGoApiRtcPlay::check_remote_sdp(const SrsSdp& remote_sdp) return err; } -srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp) +srs_error_t SrsGoApiRtcPlay::exchange_sdp(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp) { srs_error_t err = srs_success; local_sdp.version_ = "0"; @@ -1021,10 +1021,12 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str local_sdp.session_name_ = "SRSPlaySession"; local_sdp.msid_semantic_ = "WMS"; - local_sdp.msids_.push_back(app + "/" + stream); + local_sdp.msids_.push_back(req->app + "/" + req->stream); local_sdp.group_policy_ = "BUNDLE"; + bool nack_enabled = _srs_config->get_rtc_nack_enabled(req->vhost); + for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) { const SrsMediaDesc& remote_media_desc = remote_sdp.media_descs_[i]; @@ -1047,8 +1049,10 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str vector rtcp_fb; payload_type.rtcp_fb_.swap(rtcp_fb); for (int j = 0; j < (int)rtcp_fb.size(); j++) { - if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") { - payload_type.rtcp_fb_.push_back(rtcp_fb.at(j)); + if (nack_enabled) { + if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") { + payload_type.rtcp_fb_.push_back(rtcp_fb.at(j)); + } } } @@ -1081,8 +1085,10 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(const std::string& app, const std::str vector rtcp_fb; payload_type.rtcp_fb_.swap(rtcp_fb); for (int j = 0; j < (int)rtcp_fb.size(); j++) { - if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") { - payload_type.rtcp_fb_.push_back(rtcp_fb.at(j)); + if (nack_enabled) { + if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") { + payload_type.rtcp_fb_.push_back(rtcp_fb.at(j)); + } } } @@ -1261,11 +1267,6 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt return srs_error_wrap(err, "remote sdp check failed"); } - SrsSdp local_sdp; - if ((err = exchange_sdp(app, stream_name, remote_sdp, local_sdp)) != srs_success) { - return srs_error_wrap(err, "remote sdp have error or unsupport attributes"); - } - SrsRequest request; request.app = app; request.stream = stream_name; @@ -1277,6 +1278,11 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt request.vhost = parsed_vhost->arg0(); } + SrsSdp local_sdp; + if ((err = exchange_sdp(&request, remote_sdp, local_sdp)) != srs_success) { + return srs_error_wrap(err, "remote sdp have error or unsupport attributes"); + } + // Whether enabled. bool server_enabled = _srs_config->get_rtc_server_enabled(); bool rtc_enabled = _srs_config->get_rtc_enabled(request.vhost); @@ -1348,7 +1354,7 @@ srs_error_t SrsGoApiRtcPublish::check_remote_sdp(const SrsSdp& remote_sdp) return err; } -srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp) +srs_error_t SrsGoApiRtcPublish::exchange_sdp(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp) { srs_error_t err = srs_success; local_sdp.version_ = "0"; @@ -1363,10 +1369,12 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std:: local_sdp.session_name_ = "SRSPublishSession"; local_sdp.msid_semantic_ = "WMS"; - local_sdp.msids_.push_back(app + "/" + stream); + local_sdp.msids_.push_back(req->app + "/" + req->stream); local_sdp.group_policy_ = "BUNDLE"; + bool nack_enabled = _srs_config->get_rtc_nack_enabled(req->vhost); + for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) { const SrsMediaDesc& remote_media_desc = remote_sdp.media_descs_[i]; @@ -1389,8 +1397,10 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std:: vector rtcp_fb; payload_type.rtcp_fb_.swap(rtcp_fb); for (int j = 0; j < (int)rtcp_fb.size(); j++) { - if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") { - payload_type.rtcp_fb_.push_back(rtcp_fb.at(j)); + if (nack_enabled) { + if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") { + payload_type.rtcp_fb_.push_back(rtcp_fb.at(j)); + } } } @@ -1424,8 +1434,10 @@ srs_error_t SrsGoApiRtcPublish::exchange_sdp(const std::string& app, const std:: vector rtcp_fb; payload_type.rtcp_fb_.swap(rtcp_fb); for (int j = 0; j < (int)rtcp_fb.size(); j++) { - if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") { - payload_type.rtcp_fb_.push_back(rtcp_fb.at(j)); + if (nack_enabled) { + if (rtcp_fb.at(j) == "nack" || rtcp_fb.at(j) == "nack pli") { + payload_type.rtcp_fb_.push_back(rtcp_fb.at(j)); + } } } diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index a285e2042..672f8576d 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -34,6 +34,7 @@ class SrsServer; class SrsRtcServer; class SrsJsonObject; class SrsSdp; +class SrsRequest; #include #include @@ -181,7 +182,7 @@ public: virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); private: virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res); - srs_error_t exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp); + srs_error_t exchange_sdp(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp); srs_error_t check_remote_sdp(const SrsSdp& remote_sdp); }; @@ -198,7 +199,7 @@ public: virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); private: virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res); - srs_error_t exchange_sdp(const std::string& app, const std::string& stream, const SrsSdp& remote_sdp, SrsSdp& local_sdp); + srs_error_t exchange_sdp(SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp); srs_error_t check_remote_sdp(const SrsSdp& remote_sdp); }; diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index fef896939..e384be27f 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -620,6 +620,7 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid) video_queue_ = new SrsRtpRingBuffer(1000); nn_simulate_nack_drop = 0; + nack_enabled_ = false; _srs_config->subscribe(this); } @@ -646,8 +647,10 @@ srs_error_t SrsRtcPlayer::initialize(const uint32_t& vssrc, const uint32_t& assr gso = _srs_config->get_rtc_server_gso(); merge_nalus = _srs_config->get_rtc_server_merge_nalus(); max_padding = _srs_config->get_rtc_server_padding(); - srs_trace("RTC sender video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), package(gso=%d, merge_nalus=%d), padding=%d", - video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, merge_nalus, max_padding); + // TODO: FIXME: Support reload. + nack_enabled_ = _srs_config->get_rtc_nack_enabled(session_->req->vhost); + srs_trace("RTC publisher video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), package(gso=%d, merge_nalus=%d), padding=%d, nack=%d", + video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, merge_nalus, max_padding, nack_enabled_); return err; } @@ -1007,7 +1010,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) } // Put final RTP packet to NACK/ARQ queue. - if (true) { + if (nack_enabled_) { SrsRtpPacket2* nack = new SrsRtpPacket2(); nack->rtp_header = packet->rtp_header; nack->padding = packet->padding; @@ -1040,11 +1043,7 @@ srs_error_t SrsRtcPlayer::send_packets(SrsRtcOutgoingPackets& packets) // For NACK simulator, drop packet. if (nn_simulate_nack_drop) { - SrsRtpHeader* h = &packet->rtp_header; - srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop, - h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(), - (int)iov->iov_len); - nn_simulate_nack_drop--; + simulate_drop_packet(&packet->rtp_header, (int)iov->iov_len); iov->iov_len = 0; continue; } @@ -1184,7 +1183,7 @@ srs_error_t SrsRtcPlayer::send_packets_gso(SrsRtcOutgoingPackets& packets) } // Put final RTP packet to NACK/ARQ queue. - if (true) { + if (nack_enabled_) { SrsRtpPacket2* nack = new SrsRtpPacket2(); nack->rtp_header = packet->rtp_header; nack->padding = packet->padding; @@ -1542,6 +1541,15 @@ void SrsRtcPlayer::simulate_nack_drop(int nn) nn_simulate_nack_drop = nn; } +void SrsRtcPlayer::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes) +{ + srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop, + h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(), + nn_bytes); + + nn_simulate_nack_drop--; +} + SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) { report_timer = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS); @@ -1554,6 +1562,7 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) source = NULL; nn_simulate_nack_drop = 0; + nack_enabled_ = false; } SrsRtcPublisher::~SrsRtcPublisher() @@ -1580,7 +1589,11 @@ srs_error_t SrsRtcPublisher::initialize(uint32_t vssrc, uint32_t assrc, SrsReque audio_ssrc = assrc; req = r; - srs_verbose("video_ssrc=%u, audio_ssrc=%u", video_ssrc, audio_ssrc); + // TODO: FIXME: Support reload. + nack_enabled_ = _srs_config->get_rtc_nack_enabled(session_->req->vhost); + + srs_trace("RTC player video(ssrc=%u), audio(ssrc=%u), nack=%d", + video_ssrc, audio_ssrc, nack_enabled_); if ((err = report_timer->tick(0 * SRS_UTIME_MILLISECONDS)) != srs_success) { return srs_error_wrap(err, "hourglass tick"); @@ -1788,6 +1801,12 @@ void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssr stream.write_2bytes(pid); stream.write_2bytes(blp); + if (session_->blackhole && session_->blackhole_addr && session_->blackhole_stfd) { + // Ignore any error for black-hole. + void* p = stream.data(); int len = stream.pos(); SrsRtcSession* s = session_; + srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + } + char protected_buf[kRtpPacketSize]; int nb_protected_buf = stream.pos(); @@ -1970,11 +1989,7 @@ srs_error_t SrsRtcPublisher::on_rtp(char* buf, int nb_buf) // For NACK simulator, drop packet. if (nn_simulate_nack_drop) { - SrsRtpHeader* h = &pkt->rtp_header; - srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop, - h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(), - (int)nb_buf); - nn_simulate_nack_drop--; + simulate_drop_packet(&pkt->rtp_header, nb_buf); srs_freep(pkt); return err; } @@ -2016,14 +2031,23 @@ srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; - // TODO: FIXME: Error check. - audio_queue_->consume(audio_nack_, pkt); - - check_send_nacks(audio_nack_, audio_ssrc); - - // Collect all audio frames. std::vector frames; - audio_queue_->collect_frames(audio_nack_, frames); + + if (nack_enabled_) { + // TODO: FIXME: Error check. + audio_queue_->consume(audio_nack_, pkt); + + check_send_nacks(audio_nack_, audio_ssrc); + + // Collect all audio frames. + audio_queue_->collect_frames(audio_nack_, frames); + } else { + // TODO: FIXME: Error check. + audio_queue_->consume(NULL, pkt); + + // Collect all audio frames. + audio_queue_->collect_frames(NULL, frames); + } for (size_t i = 0; i < frames.size(); ++i) { SrsRtpPacket2* frame = frames[i]; @@ -2075,19 +2099,23 @@ srs_error_t SrsRtcPublisher::on_audio_frame(SrsRtpPacket2* frame) srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt) { - // TODO: FIXME: Error check. - video_queue_->consume(video_nack_, pkt); - - if (video_queue_->should_request_key_frame()) { - // TODO: FIXME: Check error. - send_rtcp_fb_pli(video_ssrc); - } - - check_send_nacks(video_nack_, video_ssrc); - - // Collect video frames. std::vector frames; - video_queue_->collect_frames(video_nack_, frames); + + if (nack_enabled_) { + // TODO: FIXME: Error check. + video_queue_->consume(video_nack_, pkt); + + check_send_nacks(video_nack_, video_ssrc); + + // Collect video frames. + video_queue_->collect_frames(video_nack_, frames); + } else { + // TODO: FIXME: Error check. + video_queue_->consume(NULL, pkt); + + // Collect video frames. + video_queue_->collect_frames(NULL, frames); + } for (size_t i = 0; i < frames.size(); ++i) { SrsRtpPacket2* frame = frames[i]; @@ -2098,6 +2126,11 @@ srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt) srs_freep(frame); } + if (video_queue_->should_request_key_frame()) { + // TODO: FIXME: Check error. + send_rtcp_fb_pli(video_ssrc); + } + return srs_success; } @@ -2232,6 +2265,15 @@ void SrsRtcPublisher::simulate_nack_drop(int nn) nn_simulate_nack_drop = nn; } +void SrsRtcPublisher::simulate_drop_packet(SrsRtpHeader* h, int nn_bytes) +{ + srs_warn("RTC NACK simulator #%d drop seq=%u, ssrc=%u/%s, ts=%u, %d bytes", nn_simulate_nack_drop, + h->get_sequence(), h->get_ssrc(), (h->get_ssrc()==video_ssrc? "Video":"Audio"), h->get_timestamp(), + nn_bytes); + + nn_simulate_nack_drop--; +} + SrsRtcSession::SrsRtcSession(SrsRtcServer* s) { req = NULL; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 073f0f982..959f6990c 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -227,6 +227,8 @@ private: srs_utime_t mw_sleep; int mw_msgs; bool realtime; + // Whether enabled nack. + bool nack_enabled_; public: SrsRtcPlayer(SrsRtcSession* s, int parent_cid); virtual ~SrsRtcPlayer(); @@ -260,6 +262,8 @@ private: public: void nack_fetch(std::vector& pkts, uint32_t ssrc, uint16_t seq); void simulate_nack_drop(int nn); +private: + void simulate_drop_packet(SrsRtpHeader* h, int nn_bytes); }; class SrsRtcPublisher : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler @@ -278,6 +282,8 @@ private: private: SrsRequest* req; SrsSource* source; + // Whether enabled nack. + bool nack_enabled_; // Simulators. int nn_simulate_nack_drop; private: @@ -310,6 +316,8 @@ public: virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); public: void simulate_nack_drop(int nn); +private: + void simulate_drop_packet(SrsRtpHeader* h, int nn_bytes); }; class SrsRtcSession diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index 6cc77bbc0..9dc542f10 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -58,8 +58,7 @@ SrsRtpNackForReceiver::~SrsRtpNackForReceiver() void SrsRtpNackForReceiver::insert(uint16_t seq) { // FIXME: full, drop packet, and request key frame. - SrsRtpNackInfo& nack_info = queue_[seq]; - (void)nack_info; + queue_[seq] = SrsRtpNackInfo(); } void SrsRtpNackForReceiver::remove(uint16_t seq) @@ -88,12 +87,12 @@ void SrsRtpNackForReceiver::check_queue_size() void SrsRtpNackForReceiver::get_nack_seqs(vector& seqs) { srs_utime_t now = srs_update_system_time(); - int interval = now - pre_check_time_; + srs_utime_t interval = now - pre_check_time_; if (interval < opts_.nack_interval / 2) { return; } - pre_check_time_ = now; + std::map::iterator iter = queue_.begin(); while (iter != queue_.end()) { const uint16_t& seq = iter->first; @@ -205,8 +204,8 @@ void SrsRtpRingBuffer::update(uint16_t seq, uint16_t& nack_first, uint16_t& nack // Normal sequence, seq follows high_. if (srs_rtp_seq_distance(end, seq) >= 0) { - nack_first = end + 1; - nack_last = seq + 1; + nack_first = end; + nack_last = seq; // When distance(seq,high_)>0 and seq0 and 1<65535. @@ -260,7 +259,14 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt srs_utime_t now = srs_update_system_time(); uint16_t seq = pkt->rtp_header.get_sequence(); - SrsRtpNackInfo* nack_info = nack->find(seq); + + SrsRtpNackInfo* nack_info = NULL; + + // If no NACK, disable nack. + if (nack) { + nack_info = nack->find(seq); + } + if (nack_info) { int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0; (void)nack_rtt; @@ -289,7 +295,7 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt ++num_of_packet_received_; uint16_t nack_first = 0, nack_last = 0; queue_->update(seq, nack_first, nack_last); - if (srs_rtp_seq_distance(nack_first, nack_last) > 0) { + if (nack && srs_rtp_seq_distance(nack_first, nack_last) > 0) { srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last); insert_into_nack_list(nack, nack_first, nack_last); } @@ -332,6 +338,10 @@ uint32_t SrsRtpQueue::get_interarrival_jitter() void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last) { + if (!nack) { + return; + } + for (uint16_t s = first; s != last; ++s) { nack->insert(s); ++number_of_packet_lossed_; @@ -365,6 +375,11 @@ void SrsRtpAudioQueue::notify_nack_list_full() queue_->advance_to(queue_->end); } +srs_error_t SrsRtpAudioQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) +{ + return SrsRtpQueue::consume(nack, pkt); +} + void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector& frames) { // When done, next point to the next available packet. @@ -372,6 +387,16 @@ void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vectorend; ++next) { SrsRtpPacket2* pkt = queue_->at(next); + // If nack disabled, we ignore any empty packet. + if (!nack) { + if (!pkt) { + continue; + } + + frames.push_back(pkt); + continue; + } + // TODO: FIXME: Should not wait for NACK packets. // Not found or in NACK, stop collecting frame. if (!pkt || nack->find(next) != NULL) { @@ -391,6 +416,7 @@ void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vectoroverflow()) { queue_->advance_to(queue_->end); } @@ -497,7 +523,9 @@ void SrsRtpVideoQueue::on_overflow(SrsRtpNackForReceiver* nack) srs_trace("on overflow, remove range [%u, %u, %u]", queue_->begin, next, queue_->end); for (uint16_t s = queue_->begin; s != next; ++s) { - nack->remove(s); + if (nack) { + nack->remove(s); + } queue_->remove(s); } @@ -515,6 +543,27 @@ void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2* for (; next != queue_->end; ++next) { SrsRtpPacket2* pkt = queue_->at(next); + // TODO: FIXME: We should skip whole packet. + // If nack disabled, we ignore any empty packet. + if (!nack) { + if (!pkt) { + continue; + } + + if (frame.empty() && !pkt->video_is_first_packet) { + continue; + } + + frame.push_back(pkt); + + if (pkt->rtp_header.get_marker() || pkt->video_is_last_packet) { + found = true; + next++; + break; + } + continue; + } + // TODO: FIXME: Should not wait for NACK packets. // Not found or in NACK, stop collecting frame. if (!pkt || nack->find(next) != NULL) { @@ -523,7 +572,7 @@ void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2* } // Ignore when the first packet not the start. - if (next == queue_->begin && !pkt->video_is_first_packet) { + if (frame.empty() && !pkt->video_is_first_packet) { return; } diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp index 7b3d496e9..46fbbdb6a 100644 --- a/trunk/src/app/srs_app_rtp_queue.hpp +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -192,6 +192,7 @@ public: public: virtual void notify_drop_seq(uint16_t seq); virtual void notify_nack_list_full(); + virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector& frames); };