1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

Refine RTC publisher, no cache. 4.0.25

This commit is contained in:
winlin 2020-05-03 19:09:48 +08:00
parent 128fc9d8c7
commit 365e6bb45a
3 changed files with 103 additions and 113 deletions

View file

@ -131,7 +131,7 @@ void SrsRtpNackForReceiver::update_rtt(int rtt)
SrsRtpRingBuffer::SrsRtpRingBuffer(int capacity) SrsRtpRingBuffer::SrsRtpRingBuffer(int capacity)
{ {
nn_seq_flip_backs = 0; nn_seq_flip_backs = 0;
high_ = low_ = 0; begin = end = 0;
capacity_ = (uint16_t)capacity; capacity_ = (uint16_t)capacity;
initialized_ = false; initialized_ = false;
@ -144,19 +144,21 @@ SrsRtpRingBuffer::~SrsRtpRingBuffer()
srs_freepa(queue_); srs_freepa(queue_);
} }
uint16_t SrsRtpRingBuffer::low() bool SrsRtpRingBuffer::empty()
{ {
return low_; return begin == end;
} }
uint16_t SrsRtpRingBuffer::high() int SrsRtpRingBuffer::size()
{ {
return high_; int size = srs_rtp_seq_distance(begin, end);
srs_assert(size >= 0);
return size;
} }
void SrsRtpRingBuffer::advance_to(uint16_t seq) void SrsRtpRingBuffer::advance_to(uint16_t seq)
{ {
low_ = seq; begin = seq;
} }
void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt) void SrsRtpRingBuffer::set(uint16_t at, SrsRtpPacket2* pkt)
@ -175,52 +177,54 @@ void SrsRtpRingBuffer::remove(uint16_t at)
set(at, NULL); set(at, NULL);
} }
void SrsRtpRingBuffer::reset(uint16_t low, uint16_t high) void SrsRtpRingBuffer::reset(uint16_t first, uint16_t last)
{ {
for (uint16_t s = low; s != high; ++s) { for (uint16_t s = first; s != last; ++s) {
queue_[s % capacity_] = NULL; queue_[s % capacity_] = NULL;
} }
} }
bool SrsRtpRingBuffer::overflow() bool SrsRtpRingBuffer::overflow()
{ {
return high_ - low_ >= capacity_; return srs_rtp_seq_distance(begin, end) >= capacity_;
} }
uint32_t SrsRtpRingBuffer::get_extended_highest_sequence() uint32_t SrsRtpRingBuffer::get_extended_highest_sequence()
{ {
return nn_seq_flip_backs * 65536 + high_; return nn_seq_flip_backs * 65536 + end - 1;
} }
void SrsRtpRingBuffer::update(uint16_t seq, uint16_t& nack_low, uint16_t& nack_high) void SrsRtpRingBuffer::update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last)
{ {
if (!initialized_) { if (!initialized_) {
initialized_ = true; initialized_ = true;
low_ = high_ = seq; begin = seq;
end = seq + 1;
return; return;
} }
// Normal sequence, seq follows high_. // Normal sequence, seq follows high_.
if (srs_rtp_seq_distance(high_, seq)) { if (srs_rtp_seq_distance(end, seq) >= 0) {
nack_low = high_ + 1; nack_first = end + 1;
nack_high = seq; nack_last = seq + 1;
// When distance(seq,high_)>0 and seq<high_, seq must flip back, // When distance(seq,high_)>0 and seq<high_, seq must flip back,
// for example, high_=65535, seq=1, distance(65535,1)>0 and 1<65535. // for example, high_=65535, seq=1, distance(65535,1)>0 and 1<65535.
if (seq < high_) { // TODO: FIXME: The first flip may be dropped.
if (seq < end) {
++nn_seq_flip_backs; ++nn_seq_flip_backs;
} }
high_ = seq; end = seq + 1;
return; return;
} }
// Out-of-order sequence, seq before low_. // Out-of-order sequence, seq before low_.
if (srs_rtp_seq_distance(seq, low_)) { if (srs_rtp_seq_distance(seq, begin) > 0) {
// When startup, we may receive packets in chaos order. // When startup, we may receive packets in chaos order.
// Because we don't know the ISN(initiazlie sequence number), the first packet // Because we don't know the ISN(initiazlie sequence number), the first packet
// we received maybe no the first packet client sent. // we received maybe no the first packet client sent.
// @remark We only log a warning, because it seems ok for publisher. // @remark We only log a warning, because it seems ok for publisher.
srs_warn("too old seq %u, range [%u, %u]", seq, low_, high_); srs_warn("too old seq %u, range [%u, %u]", seq, begin, end);
} }
} }
@ -283,11 +287,11 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt
// OK, we got one new RTP packet, which is not in NACK. // OK, we got one new RTP packet, which is not in NACK.
if (!nack_info) { if (!nack_info) {
++num_of_packet_received_; ++num_of_packet_received_;
uint16_t nack_low = 0, nack_high = 0; uint16_t nack_first = 0, nack_last = 0;
queue_->update(seq, nack_low, nack_high); queue_->update(seq, nack_first, nack_last);
if (srs_rtp_seq_distance(nack_low, nack_high)) { if (srs_rtp_seq_distance(nack_first, nack_last) > 0) {
srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_low, nack_high); srs_trace("update seq=%u, nack range [%u, %u]", seq, nack_first, nack_last);
insert_into_nack_list(nack, nack_low, nack_high); insert_into_nack_list(nack, nack_first, nack_last);
} }
} }
@ -326,9 +330,9 @@ uint32_t SrsRtpQueue::get_interarrival_jitter()
return static_cast<uint32_t>(jitter_); return static_cast<uint32_t>(jitter_);
} }
void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t seq_start, uint16_t seq_end) void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last)
{ {
for (uint16_t s = seq_start; s != seq_end; ++s) { for (uint16_t s = first; s != last; ++s) {
nack->insert(s); nack->insert(s);
++number_of_packet_lossed_; ++number_of_packet_lossed_;
} }
@ -346,22 +350,26 @@ SrsRtpAudioQueue::~SrsRtpAudioQueue()
void SrsRtpAudioQueue::notify_drop_seq(uint16_t seq) void SrsRtpAudioQueue::notify_drop_seq(uint16_t seq)
{ {
// TODO: FIXME: The seq may be greater than high. uint16_t next = seq + 1;
queue_->advance_to(seq + 1); if (srs_rtp_seq_distance(queue_->end, seq) > 0) {
seq = queue_->end;
}
srs_trace("nack drop seq=%u, drop range [%u, %u, %u]", seq, queue_->begin, next, queue_->end);
queue_->advance_to(next);
} }
void SrsRtpAudioQueue::notify_nack_list_full() void SrsRtpAudioQueue::notify_nack_list_full()
{ {
// TODO: FIXME: Maybe we should not drop all packets. // TODO: FIXME: Maybe we should not drop all packets.
queue_->advance_to(queue_->high()); queue_->advance_to(queue_->end);
} }
void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector<SrsRtpPacket2*>& frames) void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector<SrsRtpPacket2*>& frames)
{ {
// When done, s point to the next available packet. // When done, next point to the next available packet.
uint16_t next = queue_->low(); uint16_t next = queue_->begin;
for (; next != queue_->end; ++next) {
for (; next != queue_->high(); ++next) {
SrsRtpPacket2* pkt = queue_->at(next); SrsRtpPacket2* pkt = queue_->at(next);
// Not found or in NACK, stop collecting frame. // Not found or in NACK, stop collecting frame.
@ -373,17 +381,17 @@ void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector<SrsRtp
frames.push_back(pkt); frames.push_back(pkt);
} }
if (queue_->low() != next) { if (next != queue_->begin) {
// Reset the range of packets to NULL in buffer. // Reset the range of packets to NULL in buffer.
queue_->reset(queue_->low(), next); queue_->reset(queue_->begin, next);
srs_verbose("collect on frame, update head seq=%u t %u", queue_->low(), next); srs_verbose("RTC collect audio [%u, %u, %u]", queue_->begin, next, queue_->end);
queue_->advance_to(next); queue_->advance_to(next);
} }
// For audio, if overflow, clear all packets. // For audio, if overflow, clear all packets.
if (queue_->overflow()) { if (queue_->overflow()) {
queue_->advance_to(queue_->high()); queue_->advance_to(queue_->end);
} }
} }
@ -398,30 +406,20 @@ SrsRtpVideoQueue::~SrsRtpVideoQueue()
void SrsRtpVideoQueue::notify_drop_seq(uint16_t seq) void SrsRtpVideoQueue::notify_drop_seq(uint16_t seq)
{ {
uint16_t next = next_start_of_frame(); // If not found start frame, return the end, and we will clear queue.
uint16_t next = next_start_of_frame(seq);
srs_trace("nack drop seq=%u, drop range [%u, %u, %u]", seq, queue_->begin, next, queue_->end);
// Note that low_ mean not found, clear queue util one packet. queue_->advance_to(next);
if (next == queue_->low()) {
next = queue_->high() - 1;
}
// When NACK is timeout, move to the next start of frame.
srs_trace("nack drop seq=%u, drop range [%u, %u]", seq, queue_->low(), next + 1);
queue_->advance_to(next + 1);
} }
void SrsRtpVideoQueue::notify_nack_list_full() void SrsRtpVideoQueue::notify_nack_list_full()
{ {
// If not found start frame, return the end, and we will clear queue.
uint16_t next = next_keyframe(); uint16_t next = next_keyframe();
srs_trace("nack overflow, drop range [%u, %u, %u]", queue_->begin, next, queue_->end);
// Note that low_ mean not found, clear queue util one packet. queue_->advance_to(next);
if (next == queue_->low()) {
next = queue_->high() - 1;
}
// When NACK is overflow, move to the next keyframe.
srs_trace("nack overflow drop range [%u, %u]", queue_->low(), next + 1);
queue_->advance_to(next + 1);
} }
srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
@ -459,13 +457,13 @@ srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2
void SrsRtpVideoQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frames) void SrsRtpVideoQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frames)
{ {
while (queue_->low() != queue_->high()) { while (true) {
SrsRtpPacket2* pkt = NULL; SrsRtpPacket2* pkt = NULL;
collect_packet(nack, &pkt); collect_frame(nack, &pkt);
if (!pkt) { if (!pkt) {
return; break;
} }
frames.push_back(pkt); frames.push_back(pkt);
@ -493,33 +491,27 @@ void SrsRtpVideoQueue::request_keyframe()
void SrsRtpVideoQueue::on_overflow(SrsRtpNackForReceiver* nack) void SrsRtpVideoQueue::on_overflow(SrsRtpNackForReceiver* nack)
{ {
uint16_t next = next_start_of_frame(); // If not found start frame, return the end, and we will clear queue.
uint16_t next = next_start_of_frame(queue_->begin);
srs_trace("on overflow, remove range [%u, %u, %u]", queue_->begin, next, queue_->end);
// Note that low_ mean not found, clear queue util one packet. for (uint16_t s = queue_->begin; s != next; ++s) {
if (next == queue_->low()) {
next = queue_->high() - 1;
}
srs_trace("seq out of range [%u, %u]", queue_->low(), next);
for (uint16_t s = queue_->low(); s != next; ++s) {
nack->remove(s); nack->remove(s);
queue_->remove(s); queue_->remove(s);
} }
srs_trace("force update seq %u to %u", queue_->low(), next + 1); queue_->advance_to(next);
queue_->advance_to(next + 1);
} }
// TODO: FIXME: Should refer to the FU-A original video frame, to avoid finding for each packet. // TODO: FIXME: Should refer to the FU-A original video frame, to avoid finding for each packet.
void SrsRtpVideoQueue::collect_packet(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt) void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt)
{ {
// When done, s point to the next available packet.
uint16_t next = queue_->low();
bool found = false; bool found = false;
vector<SrsRtpPacket2*> frame; vector<SrsRtpPacket2*> frame;
for (; next != queue_->high(); ++next) { // When done, next point to the next available packet.
uint16_t next = queue_->begin;
for (; next != queue_->end; ++next) {
SrsRtpPacket2* pkt = queue_->at(next); SrsRtpPacket2* pkt = queue_->at(next);
// Not found or in NACK, stop collecting frame. // Not found or in NACK, stop collecting frame.
@ -529,7 +521,7 @@ void SrsRtpVideoQueue::collect_packet(SrsRtpNackForReceiver* nack, SrsRtpPacket2
} }
// Ignore when the first packet not the start. // Ignore when the first packet not the start.
if (next == queue_->low() && !pkt->video_is_first_packet) { if (next == queue_->begin && !pkt->video_is_first_packet) {
return; return;
} }
@ -549,21 +541,20 @@ void SrsRtpVideoQueue::collect_packet(SrsRtpNackForReceiver* nack, SrsRtpPacket2
return; return;
} }
uint16_t cur = next - 1; if (next != queue_->begin) {
if (cur != queue_->high()) {
// Reset the range of packets to NULL in buffer. // Reset the range of packets to NULL in buffer.
queue_->reset(queue_->low(), next); queue_->reset(queue_->begin, next);
srs_verbose("collect on frame, update head seq=%u t %u", queue_->low(), next); srs_verbose("RTC collect video [%u, %u, %u]", queue_->begin, next, queue_->end);
queue_->advance_to(next); queue_->advance_to(next);
} }
// Merge packets to one packet. // Merge packets to one packet.
covert_packet(frame, ppkt); covert_frame(frame, ppkt);
return; return;
} }
void SrsRtpVideoQueue::covert_packet(std::vector<SrsRtpPacket2*>& frame, SrsRtpPacket2** ppkt) void SrsRtpVideoQueue::covert_frame(std::vector<SrsRtpPacket2*>& frame, SrsRtpPacket2** ppkt)
{ {
if (frame.size() == 1) { if (frame.size() == 1) {
*ppkt = frame[0]; *ppkt = frame[0];
@ -620,35 +611,34 @@ void SrsRtpVideoQueue::covert_packet(std::vector<SrsRtpPacket2*>& frame, SrsRtpP
*ppkt = pkt; *ppkt = pkt;
} }
uint16_t SrsRtpVideoQueue::next_start_of_frame() uint16_t SrsRtpVideoQueue::next_start_of_frame(uint16_t seq)
{ {
if (queue_->low() == queue_->high()) { uint16_t s = seq;
return queue_->low(); if (srs_rtp_seq_distance(seq, queue_->begin) >= 0) {
s = queue_->begin + 1;
} }
for (uint16_t s = queue_->low() + 1 ; s != queue_->high(); ++s) { for (; s != queue_->end; ++s) {
SrsRtpPacket2* pkt = queue_->at(s); SrsRtpPacket2* pkt = queue_->at(s);
if (pkt && pkt->video_is_first_packet) { if (pkt && pkt->video_is_first_packet) {
return s; return s;
} }
} }
return queue_->low(); return queue_->end;
} }
uint16_t SrsRtpVideoQueue::next_keyframe() uint16_t SrsRtpVideoQueue::next_keyframe()
{ {
if (queue_->low() == queue_->high()) { uint16_t s = queue_->begin + 1;
return queue_->low();
}
for (uint16_t s = queue_->low() + 1 ; s != queue_->high(); ++s) { for (; s != queue_->end; ++s) {
SrsRtpPacket2* pkt = queue_->at(s); SrsRtpPacket2* pkt = queue_->at(s);
if (pkt && pkt->video_is_idr && pkt->video_is_first_packet) { if (pkt && pkt->video_is_idr && pkt->video_is_first_packet) {
return s; return s;
} }
} }
return queue_->low(); return queue_->end;
} }

View file

@ -66,9 +66,9 @@ struct SrsRtpNackInfo
// distance(low=3, high=65534) === (int16_t)(uint16_t)((uint16_t)3-(uint16_t)65534) === 5 // distance(low=3, high=65534) === (int16_t)(uint16_t)((uint16_t)3-(uint16_t)65534) === 5
// distance(low=65532, high=65534) === (int16_t)(uint16_t)((uint16_t)65532-(uint16_t)65534) === -2 // distance(low=65532, high=65534) === (int16_t)(uint16_t)((uint16_t)65532-(uint16_t)65534) === -2
// For RTP sequence, it's only uint16 and may flip back, so 3 maybe 3+0xffff. // For RTP sequence, it's only uint16 and may flip back, so 3 maybe 3+0xffff.
inline bool srs_rtp_seq_distance(const uint16_t& low, const uint16_t& high) inline int16_t srs_rtp_seq_distance(const uint16_t& low, const uint16_t& high)
{ {
return ((int16_t)(high - low)) > 0; return (int16_t)(high - low);
} }
class SrsRtpNackForReceiver class SrsRtpNackForReceiver
@ -76,7 +76,7 @@ class SrsRtpNackForReceiver
private: private:
struct SeqComp { struct SeqComp {
bool operator()(const uint16_t& low, const uint16_t& high) const { bool operator()(const uint16_t& low, const uint16_t& high) const {
return srs_rtp_seq_distance(low, high); return srs_rtp_seq_distance(low, high) > 0;
} }
}; };
private: private:
@ -124,29 +124,34 @@ private:
uint64_t nn_seq_flip_backs; uint64_t nn_seq_flip_backs;
// Whether initialized, because we use uint16 so we can't use -1. // Whether initialized, because we use uint16 so we can't use -1.
bool initialized_; bool initialized_;
private: public:
// Current position we are working at. // The begin iterator for ring buffer.
uint16_t low_; // For example, when got 1 elems, the begin is 0.
uint16_t high_; uint16_t begin;
// The end iterator for ring buffer.
// For example, when got 1 elems, the end is 1.
uint16_t end;
public: public:
SrsRtpRingBuffer(int capacity); SrsRtpRingBuffer(int capacity);
virtual ~SrsRtpRingBuffer(); virtual ~SrsRtpRingBuffer();
public: public:
// Move the position of buffer. // Whether the ring buffer is empty.
uint16_t low(); bool empty();
uint16_t high(); // Get the count of elems in ring buffer.
int size();
// Move the low position of buffer to seq.
void advance_to(uint16_t seq); void advance_to(uint16_t seq);
// Free the packet at position. // Free the packet at position.
void set(uint16_t at, SrsRtpPacket2* pkt); void set(uint16_t at, SrsRtpPacket2* pkt);
void remove(uint16_t at); void remove(uint16_t at);
// Directly reset range [low, high] to NULL. // Directly reset range [first, last) to NULL.
void reset(uint16_t low, uint16_t high); void reset(uint16_t first, uint16_t last);
// Whether queue overflow or heavy(too many packets and need clear). // Whether queue overflow or heavy(too many packets and need clear).
bool overflow(); bool overflow();
// The highest sequence number, calculate the flip back base. // The highest sequence number, calculate the flip back base.
uint32_t get_extended_highest_sequence(); uint32_t get_extended_highest_sequence();
// Update the sequence, got the nack range by [low, high]. // Update the sequence, got the nack range by [first, last).
void update(uint16_t seq, uint16_t& nack_low, uint16_t& nack_high); void update(uint16_t seq, uint16_t& nack_first, uint16_t& nack_last);
// Get the packet by seq. // Get the packet by seq.
SrsRtpPacket2* at(uint16_t seq); SrsRtpPacket2* at(uint16_t seq);
}; };
@ -176,7 +181,7 @@ public:
uint32_t get_cumulative_number_of_packets_lost(); uint32_t get_cumulative_number_of_packets_lost();
uint32_t get_interarrival_jitter(); uint32_t get_interarrival_jitter();
private: private:
void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t seq_start, uint16_t seq_end); void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t first, uint16_t last);
}; };
class SrsRtpAudioQueue : public SrsRtpQueue class SrsRtpAudioQueue : public SrsRtpQueue
@ -206,14 +211,9 @@ public:
void request_keyframe(); void request_keyframe();
private: private:
virtual void on_overflow(SrsRtpNackForReceiver* nack); virtual void on_overflow(SrsRtpNackForReceiver* nack);
virtual void collect_packet(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt); virtual void collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt);
virtual void covert_packet(std::vector<SrsRtpPacket2*>& frame, SrsRtpPacket2** ppkt); virtual void covert_frame(std::vector<SrsRtpPacket2*>& frame, SrsRtpPacket2** ppkt);
// For video, get the next start packet of frame. uint16_t next_start_of_frame(uint16_t seq);
// @remark If not found, return the low_, which should never be the "next" one,
// because it MAY or NOT current start packet of frame but never be the next.
uint16_t next_start_of_frame();
// For video, get the next seq of keyframe.
// @remark Return low_ if not found.
uint16_t next_keyframe(); uint16_t next_keyframe();
}; };

View file

@ -24,6 +24,6 @@
#ifndef SRS_CORE_VERSION4_HPP #ifndef SRS_CORE_VERSION4_HPP
#define SRS_CORE_VERSION4_HPP #define SRS_CORE_VERSION4_HPP
#define SRS_VERSION4_REVISION 24 #define SRS_VERSION4_REVISION 25
#endif #endif