1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Refine code for RTC publisher

This commit is contained in:
winlin 2020-05-02 20:57:36 +08:00
parent 5c43037190
commit 780753c0ec
5 changed files with 284 additions and 140 deletions

View file

@ -81,7 +81,6 @@ SrsRtpNackInfo* SrsRtpNackForReceiver::find(uint16_t seq)
void SrsRtpNackForReceiver::check_queue_size()
{
if (queue_.size() >= max_queue_size_) {
srs_verbose("NACK list full, queue size=%u, max_queue_size=%u", queue_.size(), max_queue_size_);
rtp_queue_->notify_nack_list_full();
}
}
@ -102,9 +101,6 @@ void SrsRtpNackForReceiver::get_nack_seqs(vector<uint16_t>& seqs)
int alive_time = now - nack_info.generate_time_;
if (alive_time > opts_.max_alive_time || nack_info.req_nack_count_ > opts_.max_count) {
srs_verbose("NACK, drop seq=%u alive time %d bigger than max_alive_time=%d OR nack count %d bigger than %d",
seq, alive_time, opts_.max_alive_time, nack_info.req_nack_count_, opts_.max_count);
rtp_queue_->notify_drop_seq(seq);
queue_.erase(iter++);
continue;
@ -119,7 +115,6 @@ void SrsRtpNackForReceiver::get_nack_seqs(vector<uint16_t>& seqs)
++nack_info.req_nack_count_;
nack_info.pre_req_nack_time_ = now;
seqs.push_back(seq);
srs_verbose("NACK, resend seq=%u, count=%d", seq, nack_info.req_nack_count_);
}
++iter;
@ -129,16 +124,15 @@ void SrsRtpNackForReceiver::get_nack_seqs(vector<uint16_t>& seqs)
void SrsRtpNackForReceiver::update_rtt(int rtt)
{
rtt_ = rtt * SRS_UTIME_MILLISECONDS;
srs_verbose("NACK, update rtt from %ld to %d", opts_.nack_interval, rtt_);
// FIXME: limit min and max value.
opts_.nack_interval = rtt_;
}
SrsRtpRingBuffer::SrsRtpRingBuffer(size_t capacity)
SrsRtpRingBuffer::SrsRtpRingBuffer(int capacity)
{
nn_seq_flip_backs = 0;
high_ = low_ = 0;
capacity_ = capacity;
capacity_ = (uint16_t)capacity;
initialized_ = false;
queue_ = new SrsRtpPacket2*[capacity_];
@ -190,7 +184,7 @@ void SrsRtpRingBuffer::reset(uint16_t low, uint16_t high)
bool SrsRtpRingBuffer::overflow()
{
return high_ - low_ < capacity_;
return high_ - low_ >= capacity_;
}
bool SrsRtpRingBuffer::is_heavy()
@ -251,7 +245,6 @@ void SrsRtpRingBuffer::update(uint16_t seq, bool startup, uint16_t& nack_low, ui
// When distance(seq,high_)>0 and seq<high_, seq must flip back,
// for example, high_=65535, seq=1, distance(65535,1)>0 and 1<65535.
if (seq < high_) {
srs_verbose("warp around, flip_back=%" PRId64, nn_seq_flip_backs);
++nn_seq_flip_backs;
}
high_ = seq;
@ -266,11 +259,7 @@ void SrsRtpRingBuffer::update(uint16_t seq, bool startup, uint16_t& nack_low, ui
if (startup) {
nack_low = seq + 1;
nack_high = low_;
srs_info("head seq=%u, cur seq=%u, update head seq because recv less than it.", low_, seq);
low_ = seq;
} else {
srs_verbose("seq=%u, rtx success, too old", seq);
}
}
}
@ -280,7 +269,7 @@ SrsRtpPacket2* SrsRtpRingBuffer::at(uint16_t seq)
return queue_[seq % capacity_];
}
SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame)
SrsRtpQueue::SrsRtpQueue(const char* tag, int capacity)
{
nn_collected_frames = 0;
queue_ = new SrsRtpRingBuffer(capacity);
@ -294,9 +283,8 @@ SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame)
num_of_packet_received_ = 0;
number_of_packet_lossed_ = 0;
one_packet_per_frame_ = one_packet_per_frame;
request_key_frame_ = false;
tag_ = tag;
}
SrsRtpQueue::~SrsRtpQueue()
@ -316,8 +304,6 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt
if (nack_info) {
int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0;
(void)nack_rtt;
srs_verbose("seq=%u, alive time=%d, nack count=%d, rtx success, resend use %dms",
seq, now - nack_info->generate_time_, nack_info->req_nack_count_, nack_rtt);
nack->remove(seq);
}
@ -336,7 +322,6 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt
last_trans_time_ = trans_time;
jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast<double>(cur_jitter) / 16.0);
srs_verbose("jitter=%.2f", jitter_);
}
// OK, we got one new RTP packet, which is not in NACK.
@ -345,14 +330,13 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt
uint16_t nack_low = 0, nack_high = 0;
queue_->update(seq, !nn_collected_frames, nack_low, nack_high);
if (srs_rtp_seq_distance(nack_low, nack_high)) {
srs_trace("update nack seq=%u, startup=%d, nack range [%u, %u]", seq, !nn_collected_frames, nack_low, nack_high);
srs_trace("%s update nack seq=%u, startup=%d, range [%u, %u]", tag_, seq, !nn_collected_frames, nack_low, nack_high);
insert_into_nack_list(nack, nack_low, nack_high);
}
}
// When packets overflow, collect frame and move head to next frame start.
if (queue_->overflow()) {
srs_verbose("try collect packet becuase seq out of range");
collect_packet(nack);
uint16_t next = queue_->next_start_of_frame();
@ -361,28 +345,20 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt
if (next == queue_->low()) {
next = queue_->high() - 1;
}
srs_trace("seqs out of range, seq range [%u, %u]", queue_->low(), next);
srs_trace("%s seq out of range [%u, %u]", tag_, queue_->low(), next);
for (uint16_t s = queue_->low(); s != next; ++s) {
nack->remove(s);
queue_->remove(s);
}
srs_trace("force update, update head seq from %u to %u when seqs out of range", queue_->low(), next + 1);
srs_trace("%s force update seq %u to %u", tag_, queue_->low(), next + 1);
queue_->advance_to(next + 1);
}
// Save packet at the position seq.
queue_->set(seq, pkt);
// Collect packets to frame when:
// 1. Marker bit means the last packet of frame received.
// 2. Queue has lots of packets, the load is heavy.
// 3. The frame contains only one packet for each frame.
if (pkt->rtp_header.get_marker() || queue_->is_heavy() || one_packet_per_frame_) {
collect_packet(nack);
}
return err;
}
@ -411,7 +387,7 @@ void SrsRtpQueue::notify_drop_seq(uint16_t seq)
}
// When NACK is timeout, move to the next start of frame.
srs_trace("nack drop seq=%u, drop range [%u, %u]", seq, queue_->low(), next + 1);
srs_trace("%s nack drop seq=%u, drop range [%u, %u]", tag_, seq, queue_->low(), next + 1);
queue_->advance_to(next + 1);
}
@ -425,7 +401,7 @@ void SrsRtpQueue::notify_nack_list_full()
}
// When NACK is overflow, move to the next keyframe.
srs_trace("nack overflow drop range [%u, %u]", queue_->low(), next + 1);
srs_trace("%s nack overflow drop range [%u, %u]", tag_, queue_->low(), next + 1);
queue_->advance_to(next + 1);
}
@ -466,7 +442,6 @@ uint32_t SrsRtpQueue::get_interarrival_jitter()
void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t seq_start, uint16_t seq_end)
{
for (uint16_t s = seq_start; s != seq_end; ++s) {
srs_verbose("loss seq=%u, insert into nack list", s);
nack->insert(s);
++number_of_packet_lossed_;
}
@ -474,46 +449,147 @@ void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t se
nack->check_queue_size();
}
void SrsRtpQueue::collect_packet(SrsRtpNackForReceiver* nack)
SrsRtpAudioQueue::SrsRtpAudioQueue(int capacity) : SrsRtpQueue("audio", capacity)
{
}
SrsRtpAudioQueue::~SrsRtpAudioQueue()
{
}
srs_error_t SrsRtpAudioQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
if ((err = SrsRtpQueue::consume(nack, pkt)) != srs_success) {
return srs_error_wrap(err, "audio queue");
}
// For audio, always try to collect frame, because each packet is a frame.
collect_packet(nack);
return err;
}
void SrsRtpAudioQueue::collect_packet(SrsRtpNackForReceiver* nack)
{
// When done, s point to the next available packet.
uint16_t next = queue_->low();
for (; next != queue_->high(); ++next) {
SrsRtpPacket2* pkt = queue_->at(next);
// Not found or in NACK, stop collecting frame.
if (!pkt || nack->find(next) != NULL) {
srs_trace("%s wait for nack seq=%u", tag_, next);
break;
}
// OK, collect packet to frame.
vector<SrsRtpPacket2*> frame;
frame.push_back(pkt);
// Done, we got the last packet of frame.
nn_collected_frames++;
frames_.push_back(frame);
}
if (queue_->low() != next) {
// Reset the range of packets to NULL in buffer.
queue_->reset(queue_->low(), next);
srs_verbose("%s collect on frame, update head seq=%u t %u", tag_, queue_->low(), next);
queue_->advance_to(next);
}
}
SrsRtpVideoQueue::SrsRtpVideoQueue(int capacity) : SrsRtpQueue("video", capacity)
{
}
SrsRtpVideoQueue::~SrsRtpVideoQueue()
{
}
srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
if ((err = SrsRtpQueue::consume(nack, pkt)) != srs_success) {
return srs_error_wrap(err, "video queue");
}
// Collect packets to frame when:
// 1. Marker bit means the last packet of frame received.
// 2. Queue has lots of packets, the load is heavy.
// TODO: FIMXE: For real-time, we should collect each frame ASAP.
if (pkt->rtp_header.get_marker() || queue_->is_heavy()) {
collect_packet(nack);
}
return err;
}
void SrsRtpVideoQueue::collect_packet(SrsRtpNackForReceiver* nack)
{
while (queue_->low() != queue_->high()) {
vector<SrsRtpPacket2*> frame;
uint16_t s = queue_->low();
for (; s != queue_->high(); ++s) {
SrsRtpPacket2* pkt = queue_->at(s);
do_collect_packet(nack, frame);
// In NACK, never collect frame.
if (nack->find(s) != NULL) {
srs_verbose("seq=%u, found in nack list when collect frame", s);
return;
}
if (frame.empty()) {
return;
}
// Ignore when the first packet not the start.
if (s == queue_->low() && pkt->nn_original_payload && !pkt->is_first_packet_of_frame) {
return;
}
nn_collected_frames++;
frames_.push_back(frame);
}
}
// OK, collect packet to frame.
frame.push_back(pkt);
// TODO: FIXME: Should refer to the FU-A original video frame, to avoid finding for each packet.
void SrsRtpVideoQueue::do_collect_packet(SrsRtpNackForReceiver* nack, vector<SrsRtpPacket2*>& frame)
{
// When done, s point to the next available packet.
uint16_t next = queue_->low();
// Not the last packet, continue to process next one.
if (!pkt->rtp_header.get_marker() && !one_packet_per_frame_) {
continue;
}
bool found = false;
for (; next != queue_->high(); ++next) {
SrsRtpPacket2* pkt = queue_->at(next);
// Done, we got the last packet of frame.
nn_collected_frames++;
frames_.push_back(frame);
// Not found or in NACK, stop collecting frame.
if (!pkt || nack->find(next) != NULL) {
srs_trace("%s wait for nack seq=%u", tag_, next);
break;
}
if (queue_->low() != s) {
// Reset the range of packets to NULL in buffer.
queue_->reset(queue_->low(), s);
// Ignore when the first packet not the start.
if (next == queue_->low() && !pkt->is_first_packet_of_frame) {
break;
}
srs_verbose("head seq=%u, update to %u because collect one full farme", queue_->low(), s + 1);
queue_->advance_to(s + 1);
// OK, collect packet to frame.
frame.push_back(pkt);
// Done, we got the last packet of frame.
// @remark Note that the STAP-A is marker false and it's the last packet.
if (pkt->rtp_header.get_marker() || pkt->is_last_packet_of_frame) {
found = true;
next++;
break;
}
}
if (!found) {
frame.clear();
}
uint16_t cur = next - 1;
if (found && cur != queue_->high()) {
// Reset the range of packets to NULL in buffer.
queue_->reset(queue_->low(), next);
srs_verbose("%s collect on frame, update head seq=%u t %u", tag_, queue_->low(), next);
queue_->advance_to(next);
}
}