diff --git a/trunk/src/app/srs_app_rtc_queue.cpp b/trunk/src/app/srs_app_rtc_queue.cpp index 9f1f86cb4..3919e385d 100644 --- a/trunk/src/app/srs_app_rtc_queue.cpp +++ b/trunk/src/app/srs_app_rtc_queue.cpp @@ -34,100 +34,6 @@ using namespace std; #include #include -SrsRtpNackInfo::SrsRtpNackInfo() -{ - generate_time_ = srs_update_system_time(); - pre_req_nack_time_ = 0; - req_nack_count_ = 0; -} - -SrsRtpNackForReceiver::SrsRtpNackForReceiver(SrsRtpRingBuffer* rtp, size_t queue_size) -{ - max_queue_size_ = queue_size; - rtp_ = rtp; - pre_check_time_ = 0; - - srs_info("max_queue_size=%u, nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%" PRId64 ", nack_interval=%" PRId64, - max_queue_size_, opts_.max_count, opts_.max_alive_time, opts.first_nack_interval, opts_.nack_interval); -} - -SrsRtpNackForReceiver::~SrsRtpNackForReceiver() -{ -} - -void SrsRtpNackForReceiver::insert(uint16_t first, uint16_t last) -{ - for (uint16_t s = first; s != last; ++s) { - queue_[s] = SrsRtpNackInfo(); - } -} - -void SrsRtpNackForReceiver::remove(uint16_t seq) -{ - queue_.erase(seq); -} - -SrsRtpNackInfo* SrsRtpNackForReceiver::find(uint16_t seq) -{ - std::map::iterator iter = queue_.find(seq); - - if (iter == queue_.end()) { - return NULL; - } - - return &(iter->second); -} - -void SrsRtpNackForReceiver::check_queue_size() -{ - if (queue_.size() >= max_queue_size_) { - rtp_->notify_nack_list_full(); - } -} - -void SrsRtpNackForReceiver::get_nack_seqs(vector& seqs) -{ - srs_utime_t now = srs_update_system_time(); - srs_utime_t interval = now - pre_check_time_; - if (interval < opts_.nack_interval / 2) { - return; - } - pre_check_time_ = now; - - std::map::iterator iter = queue_.begin(); - while (iter != queue_.end()) { - const uint16_t& seq = iter->first; - SrsRtpNackInfo& nack_info = iter->second; - - int alive_time = now - nack_info.generate_time_; - if (alive_time > opts_.max_alive_time || nack_info.req_nack_count_ > opts_.max_count) { - rtp_->notify_drop_seq(seq); - queue_.erase(iter++); - continue; - } - - // TODO:Statistics unorder packet. - if (now - nack_info.generate_time_ < opts_.first_nack_interval) { - break; - } - - if (now - nack_info.pre_req_nack_time_ >= opts_.nack_interval && nack_info.req_nack_count_ <= opts_.max_count) { - ++nack_info.req_nack_count_; - nack_info.pre_req_nack_time_ = now; - seqs.push_back(seq); - } - - ++iter; - } -} - -void SrsRtpNackForReceiver::update_rtt(int rtt) -{ - rtt_ = rtt * SRS_UTIME_MILLISECONDS; - // FIXME: limit min and max value. - opts_.nack_interval = rtt_; -} - SrsRtpRingBuffer::SrsRtpRingBuffer(int capacity) { nn_seq_flip_backs = 0; @@ -235,3 +141,97 @@ void SrsRtpRingBuffer::notify_drop_seq(uint16_t seq) { } +SrsRtpNackInfo::SrsRtpNackInfo() +{ + generate_time_ = srs_update_system_time(); + pre_req_nack_time_ = 0; + req_nack_count_ = 0; +} + +SrsRtpNackForReceiver::SrsRtpNackForReceiver(SrsRtpRingBuffer* rtp, size_t queue_size) +{ + max_queue_size_ = queue_size; + rtp_ = rtp; + pre_check_time_ = 0; + + srs_info("max_queue_size=%u, nack opt: max_count=%d, max_alive_time=%us, first_nack_interval=%" PRId64 ", nack_interval=%" PRId64, + max_queue_size_, opts_.max_count, opts_.max_alive_time, opts.first_nack_interval, opts_.nack_interval); +} + +SrsRtpNackForReceiver::~SrsRtpNackForReceiver() +{ +} + +void SrsRtpNackForReceiver::insert(uint16_t first, uint16_t last) +{ + for (uint16_t s = first; s != last; ++s) { + queue_[s] = SrsRtpNackInfo(); + } +} + +void SrsRtpNackForReceiver::remove(uint16_t seq) +{ + queue_.erase(seq); +} + +SrsRtpNackInfo* SrsRtpNackForReceiver::find(uint16_t seq) +{ + std::map::iterator iter = queue_.find(seq); + + if (iter == queue_.end()) { + return NULL; + } + + return &(iter->second); +} + +void SrsRtpNackForReceiver::check_queue_size() +{ + if (queue_.size() >= max_queue_size_) { + rtp_->notify_nack_list_full(); + } +} + +void SrsRtpNackForReceiver::get_nack_seqs(vector& seqs) +{ + srs_utime_t now = srs_update_system_time(); + srs_utime_t interval = now - pre_check_time_; + if (interval < opts_.nack_interval / 2) { + return; + } + pre_check_time_ = now; + + std::map::iterator iter = queue_.begin(); + while (iter != queue_.end()) { + const uint16_t& seq = iter->first; + SrsRtpNackInfo& nack_info = iter->second; + + int alive_time = now - nack_info.generate_time_; + if (alive_time > opts_.max_alive_time || nack_info.req_nack_count_ > opts_.max_count) { + rtp_->notify_drop_seq(seq); + queue_.erase(iter++); + continue; + } + + // TODO:Statistics unorder packet. + if (now - nack_info.generate_time_ < opts_.first_nack_interval) { + break; + } + + if (now - nack_info.pre_req_nack_time_ >= opts_.nack_interval && nack_info.req_nack_count_ <= opts_.max_count) { + ++nack_info.req_nack_count_; + nack_info.pre_req_nack_time_ = now; + seqs.push_back(seq); + } + + ++iter; + } +} + +void SrsRtpNackForReceiver::update_rtt(int rtt) +{ + rtt_ = rtt * SRS_UTIME_MILLISECONDS; + // FIXME: limit min and max value. + opts_.nack_interval = rtt_; +} + diff --git a/trunk/src/app/srs_app_rtc_queue.hpp b/trunk/src/app/srs_app_rtc_queue.hpp index 436a1998b..09af76158 100644 --- a/trunk/src/app/srs_app_rtc_queue.hpp +++ b/trunk/src/app/srs_app_rtc_queue.hpp @@ -34,34 +34,6 @@ class SrsRtpPacket2; class SrsRtpQueue; class SrsRtpRingBuffer; -struct SrsNackOption -{ - SrsNackOption() - { - // Default nack option. - max_count = 10; - max_alive_time = 2 * SRS_UTIME_SECONDS; - first_nack_interval = 10 * SRS_UTIME_MILLISECONDS; - nack_interval = 400 * SRS_UTIME_MILLISECONDS; - } - int max_count; - srs_utime_t max_alive_time; - int64_t first_nack_interval; - int64_t nack_interval; -}; - -struct SrsRtpNackInfo -{ - SrsRtpNackInfo(); - - // Use to control the time of first nack req and the life of seq. - srs_utime_t generate_time_; - // Use to control nack interval. - srs_utime_t pre_req_nack_time_; - // Use to control nack times. - int req_nack_count_; -}; - // The "distance" between two uint16 number, for example: // distance(low=3, high=5) === (int16_t)(uint16_t)((uint16_t)3-(uint16_t)5) === -2 // distance(low=3, high=65534) === (int16_t)(uint16_t)((uint16_t)3-(uint16_t)65534) === 5 @@ -72,39 +44,6 @@ inline int16_t srs_rtp_seq_distance(const uint16_t& low, const uint16_t& high) return (int16_t)(high - low); } -class SrsRtpNackForReceiver -{ -private: - struct SeqComp { - bool operator()(const uint16_t& low, const uint16_t& high) const { - return srs_rtp_seq_distance(low, high) > 0; - } - }; -private: - // Nack queue, seq order, oldest to newest. - std::map queue_; - // Max nack count. - size_t max_queue_size_; - SrsRtpRingBuffer* rtp_; - SrsNackOption opts_; -private: - srs_utime_t pre_check_time_; -private: - int rtt_; -public: - SrsRtpNackForReceiver(SrsRtpRingBuffer* rtp, size_t queue_size); - virtual ~SrsRtpNackForReceiver(); -public: - void insert(uint16_t first, uint16_t last); - void remove(uint16_t seq); - SrsRtpNackInfo* find(uint16_t seq); - void check_queue_size(); -public: - void get_nack_seqs(std::vector& seqs); -public: - void update_rtt(int rtt); -}; - // For UDP, the packets sequence may present as bellow: // [seq1(done)|seq2|seq3 ... seq10|seq11(lost)|seq12|seq13] // \___(head_sequence_) \ \___(highest_sequence_) @@ -160,4 +99,65 @@ public: void notify_drop_seq(uint16_t seq); }; +struct SrsNackOption +{ + SrsNackOption() + { + // Default nack option. + max_count = 10; + max_alive_time = 2 * SRS_UTIME_SECONDS; + first_nack_interval = 10 * SRS_UTIME_MILLISECONDS; + nack_interval = 400 * SRS_UTIME_MILLISECONDS; + } + int max_count; + srs_utime_t max_alive_time; + int64_t first_nack_interval; + int64_t nack_interval; +}; + +struct SrsRtpNackInfo +{ + SrsRtpNackInfo(); + + // Use to control the time of first nack req and the life of seq. + srs_utime_t generate_time_; + // Use to control nack interval. + srs_utime_t pre_req_nack_time_; + // Use to control nack times. + int req_nack_count_; +}; + +class SrsRtpNackForReceiver +{ +private: + struct SeqComp { + bool operator()(const uint16_t& low, const uint16_t& high) const { + return srs_rtp_seq_distance(low, high) > 0; + } + }; +private: + // Nack queue, seq order, oldest to newest. + std::map queue_; + // Max nack count. + size_t max_queue_size_; + SrsRtpRingBuffer* rtp_; + SrsNackOption opts_; +private: + srs_utime_t pre_check_time_; +private: + int rtt_; +public: + SrsRtpNackForReceiver(SrsRtpRingBuffer* rtp, size_t queue_size); + virtual ~SrsRtpNackForReceiver(); +public: + void insert(uint16_t first, uint16_t last); + void remove(uint16_t seq); + SrsRtpNackInfo* find(uint16_t seq); + void check_queue_size(); +public: + void get_nack_seqs(std::vector& seqs); +public: + void update_rtt(int rtt); +}; + #endif