mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
For RTC publish, refine the ring buffer.
This commit is contained in:
parent
5eba90ded9
commit
2fe1874a87
2 changed files with 256 additions and 171 deletions
|
@ -134,20 +134,123 @@ void SrsRtpNackList::update_rtt(int rtt)
|
||||||
opts_.nack_interval = rtt_;
|
opts_.nack_interval = rtt_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SrsRtpRingBuffer::SrsRtpRingBuffer(size_t capacity)
|
||||||
|
{
|
||||||
|
nn_seq_flip_backs = 0;
|
||||||
|
high_ = low_ = 0;
|
||||||
|
capacity_ = capacity;
|
||||||
|
initialized_ = false;
|
||||||
|
|
||||||
|
queue_ = new SrsRtpSharedPacket*[capacity_];
|
||||||
|
memset(queue_, 0, sizeof(SrsRtpSharedPacket*) * capacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
SrsRtpRingBuffer::~SrsRtpRingBuffer()
|
||||||
|
{
|
||||||
|
srs_freepa(queue_);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SrsRtpRingBuffer::set(uint16_t at, SrsRtpSharedPacket* pkt)
|
||||||
|
{
|
||||||
|
SrsRtpSharedPacket* p = queue_[at % capacity_];
|
||||||
|
|
||||||
|
if (p) {
|
||||||
|
srs_freep(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
queue_[at % capacity_] = pkt;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SrsRtpRingBuffer::remove(uint16_t at)
|
||||||
|
{
|
||||||
|
set(at, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint16_t SrsRtpRingBuffer::next_start_of_frame()
|
||||||
|
{
|
||||||
|
if (low_ == high_) {
|
||||||
|
return low_;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (uint16_t s = low_ + 1 ; s != high_; ++s) {
|
||||||
|
SrsRtpSharedPacket*& pkt = queue_[s % capacity_];
|
||||||
|
if (pkt && pkt->rtp_payload_header->is_first_packet_of_frame) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return low_;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint16_t SrsRtpRingBuffer::next_keyframe()
|
||||||
|
{
|
||||||
|
if (low_ == high_) {
|
||||||
|
return low_;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (uint16_t s = low_ + 1 ; s != high_; ++s) {
|
||||||
|
SrsRtpSharedPacket*& pkt = queue_[s % capacity_];
|
||||||
|
if (pkt && pkt->rtp_payload_header->is_key_frame && pkt->rtp_payload_header->is_first_packet_of_frame) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return low_;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t SrsRtpRingBuffer::get_extended_highest_sequence()
|
||||||
|
{
|
||||||
|
return nn_seq_flip_backs * 65536 + high_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SrsRtpRingBuffer::update(uint16_t seq, bool startup, uint16_t& nack_low, uint16_t& nack_high)
|
||||||
|
{
|
||||||
|
if (!initialized_) {
|
||||||
|
initialized_ = true;
|
||||||
|
low_ = high_ = seq;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Normal sequence, seq follows high_.
|
||||||
|
if (srs_rtp_seq_distance(high_, seq)) {
|
||||||
|
nack_low = high_ + 1;
|
||||||
|
nack_high = seq;
|
||||||
|
|
||||||
|
// When distance(seq,high_)>0 and seq<high_, seq must flip back,
|
||||||
|
// for example, high_=65535, seq=1, distance(65535,1)>0 and 1<65535.
|
||||||
|
if (seq < high_) {
|
||||||
|
srs_verbose("warp around, flip_back=%" PRId64, nn_seq_flip_backs);
|
||||||
|
++nn_seq_flip_backs;
|
||||||
|
}
|
||||||
|
high_ = seq;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Out-of-order sequence, seq before low_.
|
||||||
|
if (srs_rtp_seq_distance(seq, low_)) {
|
||||||
|
// When startup, we may receive packets in chaos order.
|
||||||
|
// Because we don't know the ISN(initiazlie sequence number), the first packet
|
||||||
|
// we received maybe no the first packet client sent.
|
||||||
|
if (startup) {
|
||||||
|
nack_low = seq + 1;
|
||||||
|
nack_high = low_;
|
||||||
|
|
||||||
|
srs_info("head seq=%u, cur seq=%u, update head seq because recv less than it.", low_, seq);
|
||||||
|
low_ = seq;
|
||||||
|
} else {
|
||||||
|
srs_verbose("seq=%u, rtx success, too old", seq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame)
|
SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame)
|
||||||
: nack_(this, capacity * 2 / 3)
|
: nack_(this, capacity * 2 / 3)
|
||||||
{
|
{
|
||||||
capacity_ = capacity;
|
nn_collected_frames = 0;
|
||||||
head_sequence_ = 0;
|
queue_ = new SrsRtpRingBuffer(capacity);
|
||||||
highest_sequence_ = 0;
|
|
||||||
initialized_ = false;
|
|
||||||
start_collected_ = false;
|
|
||||||
queue_ = new SrsRtpSharedPacket*[capacity_];
|
|
||||||
memset(queue_, 0, sizeof(SrsRtpSharedPacket*) * capacity);
|
|
||||||
|
|
||||||
cycle_ = 0;
|
|
||||||
jitter_ = 0;
|
jitter_ = 0;
|
||||||
last_trans_time_ = 0;
|
last_trans_time_ = -1;
|
||||||
|
|
||||||
pre_number_of_packet_received_ = 0;
|
pre_number_of_packet_received_ = 0;
|
||||||
pre_number_of_packet_lossed_ = 0;
|
pre_number_of_packet_lossed_ = 0;
|
||||||
|
@ -162,137 +265,91 @@ SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame)
|
||||||
|
|
||||||
SrsRtpQueue::~SrsRtpQueue()
|
SrsRtpQueue::~SrsRtpQueue()
|
||||||
{
|
{
|
||||||
srs_freepa(queue_);
|
srs_freep(queue_);
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt)
|
srs_error_t SrsRtpQueue::insert(SrsRtpSharedPacket* rtp_pkt)
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
uint16_t seq = rtp_pkt->rtp_header.get_sequence();
|
|
||||||
|
|
||||||
// TODO: FIXME: Update time for each packet, may hurt performance.
|
// TODO: FIXME: Update time for each packet, may hurt performance.
|
||||||
srs_utime_t now = srs_update_system_time();
|
srs_utime_t now = srs_update_system_time();
|
||||||
|
|
||||||
// First packet recv, init head_sequence and highest_sequence.
|
uint16_t seq = rtp_pkt->rtp_header.get_sequence();
|
||||||
if (!initialized_) {
|
SrsRtpNackInfo* nack_info = nack_.find(seq);
|
||||||
initialized_ = true;
|
if (nack_info) {
|
||||||
head_sequence_ = seq;
|
int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0;
|
||||||
highest_sequence_ = seq;
|
(void)nack_rtt;
|
||||||
|
srs_verbose("seq=%u, alive time=%d, nack count=%d, rtx success, resend use %dms",
|
||||||
|
seq, now - nack_info->generate_time_, nack_info->req_nack_count_, nack_rtt);
|
||||||
|
nack_.remove(seq);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calc jitter time, ignore nack packets.
|
||||||
|
// TODO: FIXME: Covert time to srs_utime_t.
|
||||||
|
if (last_trans_time_ == -1) {
|
||||||
|
last_trans_time_ = now / 1000 - rtp_pkt->rtp_header.get_timestamp() / 90;
|
||||||
|
} else if (!nack_info) {
|
||||||
|
int trans_time = now / 1000 - rtp_pkt->rtp_header.get_timestamp() / 90;
|
||||||
|
|
||||||
|
int cur_jitter = trans_time - last_trans_time_;
|
||||||
|
if (cur_jitter < 0) {
|
||||||
|
cur_jitter = -cur_jitter;
|
||||||
|
}
|
||||||
|
|
||||||
|
last_trans_time_ = trans_time;
|
||||||
|
|
||||||
|
jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast<double>(cur_jitter) / 16.0);
|
||||||
|
srs_verbose("jitter=%.2f", jitter_);
|
||||||
|
}
|
||||||
|
|
||||||
|
// OK, we got one new RTP packet, which is not in NACK.
|
||||||
|
if (!nack_info) {
|
||||||
++num_of_packet_received_;
|
++num_of_packet_received_;
|
||||||
|
uint16_t nack_low = 0, nack_high = 0;
|
||||||
// TODO: FIXME: Covert time to srs_utime_t.
|
queue_->update(seq, !nn_collected_frames, nack_low, nack_high);
|
||||||
last_trans_time_ = now/1000 - rtp_pkt->rtp_header.get_timestamp()/90;
|
if (srs_rtp_seq_distance(nack_low, nack_high)) {
|
||||||
} else {
|
srs_trace("update nack seq=%u, startup=%d, nack range [%u, %u]", seq, !nn_collected_frames, nack_low, nack_high);
|
||||||
SrsRtpNackInfo* nack_info = NULL;
|
insert_into_nack_list(nack_low, nack_high);
|
||||||
if ((nack_info = nack_.find(seq)) != NULL) {
|
|
||||||
int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0;
|
|
||||||
(void)nack_rtt;
|
|
||||||
srs_verbose("seq=%u, alive time=%d, nack count=%d, rtx success, resend use %dms",
|
|
||||||
seq, now - nack_info->generate_time_, nack_info->req_nack_count_, nack_rtt);
|
|
||||||
nack_.remove(seq);
|
|
||||||
} else {
|
|
||||||
// Calc jitter.
|
|
||||||
{
|
|
||||||
int trans_time = now / 1000 - rtp_pkt->rtp_header.get_timestamp() / 90;
|
|
||||||
|
|
||||||
int cur_jitter = trans_time - last_trans_time_;
|
|
||||||
if (cur_jitter < 0) {
|
|
||||||
cur_jitter = -cur_jitter;
|
|
||||||
}
|
|
||||||
|
|
||||||
last_trans_time_ = trans_time;
|
|
||||||
|
|
||||||
jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast<double>(cur_jitter) / 16.0);
|
|
||||||
srs_verbose("jitter=%.2f", jitter_);
|
|
||||||
}
|
|
||||||
|
|
||||||
++num_of_packet_received_;
|
|
||||||
// seq > highest_sequence_
|
|
||||||
if (seq_cmp(highest_sequence_, seq)) {
|
|
||||||
insert_into_nack_list(highest_sequence_ + 1, seq);
|
|
||||||
|
|
||||||
if (seq < highest_sequence_) {
|
|
||||||
srs_verbose("warp around, cycle=%lu", cycle_);
|
|
||||||
++cycle_;
|
|
||||||
}
|
|
||||||
highest_sequence_ = seq;
|
|
||||||
} else {
|
|
||||||
// Because we don't know the ISN(initiazlie sequence number), the first packet
|
|
||||||
// we received maybe no the first packet client sented.
|
|
||||||
if (!start_collected_) {
|
|
||||||
if (seq_cmp(seq, head_sequence_)) {
|
|
||||||
srs_info("head seq=%u, cur seq=%u, update head seq because recv less than it.", head_sequence_, seq);
|
|
||||||
head_sequence_ = seq;
|
|
||||||
}
|
|
||||||
insert_into_nack_list(seq + 1, highest_sequence_);
|
|
||||||
} else {
|
|
||||||
srs_verbose("seq=%u, rtx success, too old", seq);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check seqs out of range.
|
// When packets overflow, collect frame and move head to next frame start.
|
||||||
if (head_sequence_ + capacity_ < highest_sequence_) {
|
if (queue_->overflow()) {
|
||||||
srs_verbose("try collect packet becuase seq out of range");
|
srs_verbose("try collect packet becuase seq out of range");
|
||||||
collect_packet();
|
collect_packet();
|
||||||
}
|
|
||||||
while (head_sequence_ + capacity_ < highest_sequence_) {
|
|
||||||
srs_trace("seqs out of range, head seq=%u, hightest seq=%u", head_sequence_, highest_sequence_);
|
|
||||||
remove(head_sequence_);
|
|
||||||
uint16_t s = head_sequence_ + 1;
|
|
||||||
for ( ; s != highest_sequence_; ++s) {
|
|
||||||
SrsRtpSharedPacket*& pkt = queue_[s % capacity_];
|
|
||||||
// Choose the new head sequence. Must be the first packet of frame.
|
|
||||||
if (pkt && pkt->rtp_payload_header->is_first_packet_of_frame) {
|
|
||||||
srs_trace("find except, update head seq from %u to %u when seqs out of range", head_sequence_, s);
|
|
||||||
head_sequence_ = s;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Drop the seq.
|
uint16_t next = queue_->next_start_of_frame();
|
||||||
nack_.remove(s);
|
|
||||||
srs_verbose("seqs out of range, drop seq=%u", s);
|
// Note that low_ mean not found, clear queue util one packet.
|
||||||
if (pkt && pkt->rtp_header.get_sequence() == s) {
|
if (next == queue_->low()) {
|
||||||
delete pkt;
|
next = queue_->high() - 1;
|
||||||
pkt = NULL;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
srs_trace("force update, update head seq from %u to %u when seqs out of range", head_sequence_, s);
|
srs_trace("seqs out of range, seq range [%u, %u]", queue_->low(), next);
|
||||||
head_sequence_ = s;
|
|
||||||
}
|
|
||||||
|
|
||||||
SrsRtpSharedPacket*& old_pkt = queue_[seq % capacity_];
|
for (uint16_t s = queue_->low(); s != next; ++s) {
|
||||||
if (old_pkt) {
|
nack_.remove(s);
|
||||||
delete old_pkt;
|
queue_->remove(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
srs_trace("force update, update head seq from %u to %u when seqs out of range", queue_->low(), next + 1);
|
||||||
|
queue_->advance_to(next + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: FIXME: Change to ptr of ptr.
|
// TODO: FIXME: Change to ptr of ptr.
|
||||||
old_pkt = rtp_pkt->copy();
|
queue_->set(seq, rtp_pkt->copy());
|
||||||
|
|
||||||
// Marker bit means the last packet of frame received.
|
// Collect packets to frame when:
|
||||||
if (rtp_pkt->rtp_header.get_marker() || (highest_sequence_ - head_sequence_ >= capacity_ / 2) || one_packet_per_frame_) {
|
// 1. Marker bit means the last packet of frame received.
|
||||||
|
// 2. Queue has lots of packets, the load is heavy.
|
||||||
|
// 3. The frame contains only one packet for each frame.
|
||||||
|
if (rtp_pkt->rtp_header.get_marker() || queue_->is_heavy() || one_packet_per_frame_) {
|
||||||
collect_packet();
|
collect_packet();
|
||||||
}
|
}
|
||||||
|
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsRtpQueue::remove(uint16_t seq)
|
|
||||||
{
|
|
||||||
srs_error_t err = srs_success;
|
|
||||||
|
|
||||||
SrsRtpSharedPacket*& pkt = queue_[seq % capacity_];
|
|
||||||
if (pkt && pkt->rtp_header.get_sequence() == seq) {
|
|
||||||
delete pkt;
|
|
||||||
pkt = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return err;
|
|
||||||
}
|
|
||||||
|
|
||||||
void SrsRtpQueue::get_and_clean_collected_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames)
|
void SrsRtpQueue::get_and_clean_collected_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames)
|
||||||
{
|
{
|
||||||
frames.swap(frames_);
|
frames.swap(frames_);
|
||||||
|
@ -309,44 +366,35 @@ bool SrsRtpQueue::get_and_clean_if_needed_request_key_frame()
|
||||||
|
|
||||||
void SrsRtpQueue::notify_drop_seq(uint16_t seq)
|
void SrsRtpQueue::notify_drop_seq(uint16_t seq)
|
||||||
{
|
{
|
||||||
uint16_t s = seq + 1;
|
uint16_t next = queue_->next_start_of_frame();
|
||||||
for ( ; s != highest_sequence_; ++s) {
|
|
||||||
SrsRtpSharedPacket* pkt = queue_[s % capacity_];
|
// Note that low_ mean not found, clear queue util one packet.
|
||||||
if (pkt && pkt->rtp_payload_header->is_first_packet_of_frame) {
|
if (next == queue_->low()) {
|
||||||
break;
|
next = queue_->high() - 1;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_verbose("drop seq=%u, highest seq=%u, update head seq %u to %u", seq, highest_sequence_, head_sequence_, s);
|
// When NACK is timeout, move to the next start of frame.
|
||||||
head_sequence_ = s;
|
srs_trace("nack drop seq=%u, drop range [%u, %u]", seq, queue_->low(), next + 1);
|
||||||
|
queue_->advance_to(next + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
void SrsRtpQueue::notify_nack_list_full()
|
void SrsRtpQueue::notify_nack_list_full()
|
||||||
{
|
{
|
||||||
bool found_key_frame = false;
|
uint16_t next = queue_->next_keyframe();
|
||||||
while (head_sequence_ <= highest_sequence_) {
|
|
||||||
SrsRtpSharedPacket* pkt = queue_[head_sequence_ % capacity_];
|
|
||||||
if (pkt && pkt->rtp_payload_header->is_key_frame && pkt->rtp_payload_header->is_first_packet_of_frame) {
|
|
||||||
found_key_frame = true;
|
|
||||||
srs_verbose("found firsr packet of key frame, seq=%u", head_sequence_);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
nack_.remove(head_sequence_);
|
// Note that low_ mean not found, clear queue util one packet.
|
||||||
remove(head_sequence_);
|
if (next == queue_->low()) {
|
||||||
++head_sequence_;
|
next = queue_->high() - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!found_key_frame) {
|
// When NACK is overflow, move to the next keyframe.
|
||||||
srs_verbose("no found first packet of key frame, request key frame");
|
srs_trace("nack overflow drop range [%u, %u]", queue_->low(), next + 1);
|
||||||
request_key_frame_ = true;
|
queue_->advance_to(next + 1);
|
||||||
head_sequence_ = highest_sequence_;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t SrsRtpQueue::get_extended_highest_sequence()
|
uint32_t SrsRtpQueue::get_extended_highest_sequence()
|
||||||
{
|
{
|
||||||
return cycle_ * 65536 + highest_sequence_;
|
return queue_->get_extended_highest_sequence();
|
||||||
}
|
}
|
||||||
|
|
||||||
uint8_t SrsRtpQueue::get_fraction_lost()
|
uint8_t SrsRtpQueue::get_fraction_lost()
|
||||||
|
@ -392,8 +440,8 @@ void SrsRtpQueue::insert_into_nack_list(uint16_t seq_start, uint16_t seq_end)
|
||||||
void SrsRtpQueue::collect_packet()
|
void SrsRtpQueue::collect_packet()
|
||||||
{
|
{
|
||||||
vector<SrsRtpSharedPacket*> frame;
|
vector<SrsRtpSharedPacket*> frame;
|
||||||
for (uint16_t s = head_sequence_; s != highest_sequence_; ++s) {
|
for (uint16_t s = queue_->low(); s != queue_->high(); ++s) {
|
||||||
SrsRtpSharedPacket* pkt = queue_[s % capacity_];
|
SrsRtpSharedPacket* pkt = queue_->at(s);
|
||||||
|
|
||||||
if (nack_.find(s) != NULL) {
|
if (nack_.find(s) != NULL) {
|
||||||
srs_verbose("seq=%u, found in nack list when collect frame", s);
|
srs_verbose("seq=%u, found in nack list when collect frame", s);
|
||||||
|
@ -401,20 +449,18 @@ void SrsRtpQueue::collect_packet()
|
||||||
}
|
}
|
||||||
|
|
||||||
// We must collect frame from first packet to last packet.
|
// We must collect frame from first packet to last packet.
|
||||||
if (s == head_sequence_ && pkt->rtp_payload_size() != 0 && !pkt->rtp_payload_header->is_first_packet_of_frame) {
|
if (s == queue_->low() && pkt->rtp_payload_size() != 0 && !pkt->rtp_payload_header->is_first_packet_of_frame) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
frame.push_back(pkt->copy());
|
frame.push_back(pkt->copy());
|
||||||
if (pkt->rtp_header.get_marker() || one_packet_per_frame_) {
|
if (pkt->rtp_header.get_marker() || one_packet_per_frame_) {
|
||||||
if (!start_collected_) {
|
nn_collected_frames++;
|
||||||
start_collected_ = true;
|
|
||||||
}
|
|
||||||
frames_.push_back(frame);
|
frames_.push_back(frame);
|
||||||
frame.clear();
|
frame.clear();
|
||||||
|
|
||||||
srs_verbose("head seq=%u, update to %u because collect one full farme", head_sequence_, s + 1);
|
srs_verbose("head seq=%u, update to %u because collect one full farme", queue_->low(), s + 1);
|
||||||
head_sequence_ = s + 1;
|
queue_->advance_to(s + 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -61,21 +61,24 @@ struct SrsRtpNackInfo
|
||||||
int req_nack_count_;
|
int req_nack_count_;
|
||||||
};
|
};
|
||||||
|
|
||||||
inline bool seq_cmp(const uint16_t& l, const uint16_t& r)
|
// The "distance" between two uint16 number, for example:
|
||||||
|
// distance(low=3, high=5) === (int16_t)(uint16_t)((uint16_t)3-(uint16_t)5) === -2
|
||||||
|
// distance(low=3, high=65534) === (int16_t)(uint16_t)((uint16_t)3-(uint16_t)65534) === 5
|
||||||
|
// distance(low=65532, high=65534) === (int16_t)(uint16_t)((uint16_t)65532-(uint16_t)65534) === -2
|
||||||
|
// For RTP sequence, it's only uint16 and may flip back, so 3 maybe 3+0xffff.
|
||||||
|
inline bool srs_rtp_seq_distance(const uint16_t& low, const uint16_t& high)
|
||||||
{
|
{
|
||||||
return ((int16_t)(r - l)) > 0;
|
return ((int16_t)(high - low)) > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SeqComp
|
|
||||||
{
|
|
||||||
bool operator()(const uint16_t& l, const uint16_t& r) const
|
|
||||||
{
|
|
||||||
return seq_cmp(l, r);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
class SrsRtpNackList
|
class SrsRtpNackList
|
||||||
{
|
{
|
||||||
|
private:
|
||||||
|
struct SeqComp {
|
||||||
|
bool operator()(const uint16_t& low, const uint16_t& high) const {
|
||||||
|
return srs_rtp_seq_distance(low, high);
|
||||||
|
}
|
||||||
|
};
|
||||||
private:
|
private:
|
||||||
// Nack queue, seq order, oldest to newest.
|
// Nack queue, seq order, oldest to newest.
|
||||||
std::map<uint16_t, SrsRtpNackInfo, SeqComp> queue_;
|
std::map<uint16_t, SrsRtpNackInfo, SeqComp> queue_;
|
||||||
|
@ -101,27 +104,64 @@ public:
|
||||||
void update_rtt(int rtt);
|
void update_rtt(int rtt);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// For UDP, the packets sequence may present as bellow:
|
||||||
|
// [seq1(done)|seq2|seq3 ... seq10|seq11(lost)|seq12|seq13]
|
||||||
|
// \___(head_sequence_) \ \___(highest_sequence_)
|
||||||
|
// \___(no received, in nack list)
|
||||||
|
// * seq1: The packet is done, we already got the entire frame and processed it.
|
||||||
|
// * seq2,seq3,...,seq10,seq12,seq13: We are processing theses packets, for example, some FU-A or NALUs,
|
||||||
|
// but not an entire video frame right now.
|
||||||
|
// * seq10: This packet is lost or not received, we put it in the nack list.
|
||||||
|
// We store the received packets in ring buffer.
|
||||||
|
class SrsRtpRingBuffer
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
// Capacity of the ring-buffer.
|
||||||
|
uint16_t capacity_;
|
||||||
|
// Ring bufer.
|
||||||
|
SrsRtpSharedPacket** queue_;
|
||||||
|
// Increase one when uint16 flip back, for get_extended_highest_sequence.
|
||||||
|
uint64_t nn_seq_flip_backs;
|
||||||
|
// Whether initialized, because we use uint16 so we can't use -1.
|
||||||
|
bool initialized_;
|
||||||
|
private:
|
||||||
|
// Current position we are working at.
|
||||||
|
uint16_t low_;
|
||||||
|
uint16_t high_;
|
||||||
|
public:
|
||||||
|
SrsRtpRingBuffer(size_t capacity);
|
||||||
|
virtual ~SrsRtpRingBuffer();
|
||||||
|
public:
|
||||||
|
uint16_t low() { return low_; }
|
||||||
|
uint16_t high() { return high_; }
|
||||||
|
void advance_to(uint16_t seq) { low_ = seq; }
|
||||||
|
void set(uint16_t at, SrsRtpSharedPacket* pkt);
|
||||||
|
void remove(uint16_t at);
|
||||||
|
bool overflow() { return low_ + capacity_ < high_; }
|
||||||
|
bool is_heavy() { return high_ - low_ >= capacity_ / 2; }
|
||||||
|
// Get the next start packet of frame.
|
||||||
|
// @remark If not found, return the low_, which should never be the "next" one,
|
||||||
|
// because it MAY or NOT current start packet of frame but never be the next.
|
||||||
|
uint16_t next_start_of_frame();
|
||||||
|
// Get the next seq of keyframe.
|
||||||
|
// @remark Return low_ if not found.
|
||||||
|
uint16_t next_keyframe();
|
||||||
|
// The highest sequence number, calculate the flip back base.
|
||||||
|
uint32_t get_extended_highest_sequence();
|
||||||
|
// Update the sequence, got the nack range by [low, high].
|
||||||
|
void update(uint16_t seq, bool startup, uint16_t& nack_low, uint16_t& nack_high);
|
||||||
|
// Get the packet by seq.
|
||||||
|
SrsRtpSharedPacket* at(uint16_t seq) { return queue_[seq % capacity_]; }
|
||||||
|
};
|
||||||
|
|
||||||
class SrsRtpQueue
|
class SrsRtpQueue
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
/*
|
uint64_t nn_collected_frames;
|
||||||
*[seq1|seq2|seq3|seq4|seq5 ... seq10|seq11(loss)|seq12(loss)|seq13]
|
SrsRtpRingBuffer* queue_;
|
||||||
* \___(head_sequence_) \ \___(highest_sequence_)
|
|
||||||
* \___(no received, in nack list)
|
|
||||||
*/
|
|
||||||
// Capacity of the ring-buffer.
|
|
||||||
uint16_t capacity_;
|
|
||||||
// Thei highest sequence we have receive.
|
|
||||||
uint16_t highest_sequence_;
|
|
||||||
// The sequence waitting to read.
|
|
||||||
uint16_t head_sequence_;
|
|
||||||
bool initialized_;
|
|
||||||
bool start_collected_;
|
|
||||||
// Ring bufer.
|
|
||||||
SrsRtpSharedPacket** queue_;
|
|
||||||
private:
|
private:
|
||||||
uint64_t cycle_;
|
|
||||||
double jitter_;
|
double jitter_;
|
||||||
|
// TODO: FIXME: Covert time to srs_utime_t.
|
||||||
int64_t last_trans_time_;
|
int64_t last_trans_time_;
|
||||||
uint64_t pre_number_of_packet_received_;
|
uint64_t pre_number_of_packet_received_;
|
||||||
uint64_t pre_number_of_packet_lossed_;
|
uint64_t pre_number_of_packet_lossed_;
|
||||||
|
@ -139,7 +179,6 @@ public:
|
||||||
virtual ~SrsRtpQueue();
|
virtual ~SrsRtpQueue();
|
||||||
public:
|
public:
|
||||||
srs_error_t insert(SrsRtpSharedPacket* rtp_pkt);
|
srs_error_t insert(SrsRtpSharedPacket* rtp_pkt);
|
||||||
srs_error_t remove(uint16_t seq);
|
|
||||||
public:
|
public:
|
||||||
void get_and_clean_collected_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames);
|
void get_and_clean_collected_frames(std::vector<std::vector<SrsRtpSharedPacket*> >& frames);
|
||||||
bool get_and_clean_if_needed_request_key_frame();
|
bool get_and_clean_if_needed_request_key_frame();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue