mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
Refactor code, extract nack from queue
This commit is contained in:
parent
f57e8de3f9
commit
42ee52fa29
4 changed files with 32 additions and 40 deletions
|
@ -1466,7 +1466,9 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session)
|
||||||
|
|
||||||
rtc_session = session;
|
rtc_session = session;
|
||||||
video_queue_ = new SrsRtpQueue(1000);
|
video_queue_ = new SrsRtpQueue(1000);
|
||||||
|
video_nack_ = new SrsRtpNackForReceiver(video_queue_, 1000 * 2 / 3);
|
||||||
audio_queue_ = new SrsRtpQueue(100, true);
|
audio_queue_ = new SrsRtpQueue(100, true);
|
||||||
|
audio_nack_ = new SrsRtpNackForReceiver(video_queue_, 100 * 2 / 3);
|
||||||
|
|
||||||
source = NULL;
|
source = NULL;
|
||||||
}
|
}
|
||||||
|
@ -1481,7 +1483,9 @@ SrsRtcPublisher::~SrsRtcPublisher()
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_freep(report_timer);
|
srs_freep(report_timer);
|
||||||
|
srs_freep(video_nack_);
|
||||||
srs_freep(video_queue_);
|
srs_freep(video_queue_);
|
||||||
|
srs_freep(audio_nack_);
|
||||||
srs_freep(audio_queue_);
|
srs_freep(audio_queue_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1661,9 +1665,9 @@ srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf)
|
||||||
ssrc, compact_ntp, lrr, dlrr, rtt);
|
ssrc, compact_ntp, lrr, dlrr, rtt);
|
||||||
|
|
||||||
if (ssrc == video_ssrc) {
|
if (ssrc == video_ssrc) {
|
||||||
video_queue_->update_rtt(rtt);
|
video_nack_->update_rtt(rtt);
|
||||||
} else if (ssrc == audio_ssrc) {
|
} else if (ssrc == audio_ssrc) {
|
||||||
audio_queue_->update_rtt(rtt);
|
audio_nack_->update_rtt(rtt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1672,7 +1676,7 @@ srs_error_t SrsRtcPublisher::on_rtcp_xr(char* buf, int nb_buf)
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
void SrsRtcPublisher::check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc)
|
void SrsRtcPublisher::check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc)
|
||||||
{
|
{
|
||||||
// If DTLS is not OK, drop all messages.
|
// If DTLS is not OK, drop all messages.
|
||||||
if (!rtc_session->dtls_session) {
|
if (!rtc_session->dtls_session) {
|
||||||
|
@ -1680,7 +1684,7 @@ void SrsRtcPublisher::check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc)
|
||||||
}
|
}
|
||||||
|
|
||||||
vector<uint16_t> nack_seqs;
|
vector<uint16_t> nack_seqs;
|
||||||
rtp_queue->get_nack_seqs(nack_seqs);
|
nack->get_nack_seqs(nack_seqs);
|
||||||
vector<uint16_t>::iterator iter = nack_seqs.begin();
|
vector<uint16_t>::iterator iter = nack_seqs.begin();
|
||||||
while (iter != nack_seqs.end()) {
|
while (iter != nack_seqs.end()) {
|
||||||
char buf[kRtpPacketSize];
|
char buf[kRtpPacketSize];
|
||||||
|
@ -1915,14 +1919,14 @@ srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt)
|
||||||
pkt->is_key_frame = true;
|
pkt->is_key_frame = true;
|
||||||
|
|
||||||
// TODO: FIXME: Error check.
|
// TODO: FIXME: Error check.
|
||||||
audio_queue_->consume(pkt);
|
audio_queue_->consume(audio_nack_, pkt);
|
||||||
|
|
||||||
if (audio_queue_->should_request_key_frame()) {
|
if (audio_queue_->should_request_key_frame()) {
|
||||||
// TODO: FIXME: Check error.
|
// TODO: FIXME: Check error.
|
||||||
send_rtcp_fb_pli(audio_ssrc);
|
send_rtcp_fb_pli(audio_ssrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
check_send_nacks(audio_queue_, audio_ssrc);
|
check_send_nacks(audio_nack_, audio_ssrc);
|
||||||
|
|
||||||
return collect_audio_frames();
|
return collect_audio_frames();
|
||||||
}
|
}
|
||||||
|
@ -2011,14 +2015,14 @@ srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: FIXME: Error check.
|
// TODO: FIXME: Error check.
|
||||||
video_queue_->consume(pkt);
|
video_queue_->consume(video_nack_, pkt);
|
||||||
|
|
||||||
if (video_queue_->should_request_key_frame()) {
|
if (video_queue_->should_request_key_frame()) {
|
||||||
// TODO: FIXME: Check error.
|
// TODO: FIXME: Check error.
|
||||||
send_rtcp_fb_pli(video_ssrc);
|
send_rtcp_fb_pli(video_ssrc);
|
||||||
}
|
}
|
||||||
|
|
||||||
check_send_nacks(video_queue_, video_ssrc);
|
check_send_nacks(video_nack_, video_ssrc);
|
||||||
|
|
||||||
return collect_video_frames();
|
return collect_video_frames();
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,6 +57,7 @@ class SrsRtpH264Demuxer;
|
||||||
class SrsRtpOpusDemuxer;
|
class SrsRtpOpusDemuxer;
|
||||||
class SrsRtpPacket2;
|
class SrsRtpPacket2;
|
||||||
class ISrsCodec;
|
class ISrsCodec;
|
||||||
|
class SrsRtpNackForReceiver;
|
||||||
|
|
||||||
const uint8_t kSR = 200;
|
const uint8_t kSR = 200;
|
||||||
const uint8_t kRR = 201;
|
const uint8_t kRR = 201;
|
||||||
|
@ -261,7 +262,9 @@ private:
|
||||||
uint32_t audio_ssrc;
|
uint32_t audio_ssrc;
|
||||||
private:
|
private:
|
||||||
SrsRtpQueue* video_queue_;
|
SrsRtpQueue* video_queue_;
|
||||||
|
SrsRtpNackForReceiver* video_nack_;
|
||||||
SrsRtpQueue* audio_queue_;
|
SrsRtpQueue* audio_queue_;
|
||||||
|
SrsRtpNackForReceiver* audio_nack_;
|
||||||
private:
|
private:
|
||||||
SrsRequest* req;
|
SrsRequest* req;
|
||||||
SrsSource* source;
|
SrsSource* source;
|
||||||
|
@ -278,7 +281,7 @@ public:
|
||||||
srs_error_t on_rtcp_sender_report(char* buf, int nb_buf);
|
srs_error_t on_rtcp_sender_report(char* buf, int nb_buf);
|
||||||
srs_error_t on_rtcp_xr(char* buf, int nb_buf);
|
srs_error_t on_rtcp_xr(char* buf, int nb_buf);
|
||||||
private:
|
private:
|
||||||
void check_send_nacks(SrsRtpQueue* rtp_queue, uint32_t ssrc);
|
void check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc);
|
||||||
srs_error_t send_rtcp_rr(uint32_t ssrc, SrsRtpQueue* rtp_queue);
|
srs_error_t send_rtcp_rr(uint32_t ssrc, SrsRtpQueue* rtp_queue);
|
||||||
srs_error_t send_rtcp_xr_rrtr(uint32_t ssrc);
|
srs_error_t send_rtcp_xr_rrtr(uint32_t ssrc);
|
||||||
srs_error_t send_rtcp_fb_pli(uint32_t ssrc);
|
srs_error_t send_rtcp_fb_pli(uint32_t ssrc);
|
||||||
|
|
|
@ -284,7 +284,6 @@ SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame)
|
||||||
{
|
{
|
||||||
nn_collected_frames = 0;
|
nn_collected_frames = 0;
|
||||||
queue_ = new SrsRtpRingBuffer(capacity);
|
queue_ = new SrsRtpRingBuffer(capacity);
|
||||||
nack_ = new SrsRtpNackForReceiver(this, capacity * 2 / 3);
|
|
||||||
|
|
||||||
jitter_ = 0;
|
jitter_ = 0;
|
||||||
last_trans_time_ = -1;
|
last_trans_time_ = -1;
|
||||||
|
@ -303,10 +302,9 @@ SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame)
|
||||||
SrsRtpQueue::~SrsRtpQueue()
|
SrsRtpQueue::~SrsRtpQueue()
|
||||||
{
|
{
|
||||||
srs_freep(queue_);
|
srs_freep(queue_);
|
||||||
srs_freep(nack_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsRtpQueue::consume(SrsRtpPacket2* pkt)
|
srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
@ -314,13 +312,13 @@ srs_error_t SrsRtpQueue::consume(SrsRtpPacket2* pkt)
|
||||||
srs_utime_t now = srs_update_system_time();
|
srs_utime_t now = srs_update_system_time();
|
||||||
|
|
||||||
uint16_t seq = pkt->rtp_header.get_sequence();
|
uint16_t seq = pkt->rtp_header.get_sequence();
|
||||||
SrsRtpNackInfo* nack_info = nack_->find(seq);
|
SrsRtpNackInfo* nack_info = nack->find(seq);
|
||||||
if (nack_info) {
|
if (nack_info) {
|
||||||
int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0;
|
int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0;
|
||||||
(void)nack_rtt;
|
(void)nack_rtt;
|
||||||
srs_verbose("seq=%u, alive time=%d, nack count=%d, rtx success, resend use %dms",
|
srs_verbose("seq=%u, alive time=%d, nack count=%d, rtx success, resend use %dms",
|
||||||
seq, now - nack_info->generate_time_, nack_info->req_nack_count_, nack_rtt);
|
seq, now - nack_info->generate_time_, nack_info->req_nack_count_, nack_rtt);
|
||||||
nack_->remove(seq);
|
nack->remove(seq);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calc jitter time, ignore nack packets.
|
// Calc jitter time, ignore nack packets.
|
||||||
|
@ -348,14 +346,14 @@ srs_error_t SrsRtpQueue::consume(SrsRtpPacket2* pkt)
|
||||||
queue_->update(seq, !nn_collected_frames, nack_low, nack_high);
|
queue_->update(seq, !nn_collected_frames, nack_low, nack_high);
|
||||||
if (srs_rtp_seq_distance(nack_low, nack_high)) {
|
if (srs_rtp_seq_distance(nack_low, nack_high)) {
|
||||||
srs_trace("update nack seq=%u, startup=%d, nack range [%u, %u]", seq, !nn_collected_frames, nack_low, nack_high);
|
srs_trace("update nack seq=%u, startup=%d, nack range [%u, %u]", seq, !nn_collected_frames, nack_low, nack_high);
|
||||||
insert_into_nack_list(nack_low, nack_high);
|
insert_into_nack_list(nack, nack_low, nack_high);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// When packets overflow, collect frame and move head to next frame start.
|
// When packets overflow, collect frame and move head to next frame start.
|
||||||
if (queue_->overflow()) {
|
if (queue_->overflow()) {
|
||||||
srs_verbose("try collect packet becuase seq out of range");
|
srs_verbose("try collect packet becuase seq out of range");
|
||||||
collect_packet();
|
collect_packet(nack);
|
||||||
|
|
||||||
uint16_t next = queue_->next_start_of_frame();
|
uint16_t next = queue_->next_start_of_frame();
|
||||||
|
|
||||||
|
@ -366,7 +364,7 @@ srs_error_t SrsRtpQueue::consume(SrsRtpPacket2* pkt)
|
||||||
srs_trace("seqs out of range, seq range [%u, %u]", queue_->low(), next);
|
srs_trace("seqs out of range, seq range [%u, %u]", queue_->low(), next);
|
||||||
|
|
||||||
for (uint16_t s = queue_->low(); s != next; ++s) {
|
for (uint16_t s = queue_->low(); s != next; ++s) {
|
||||||
nack_->remove(s);
|
nack->remove(s);
|
||||||
queue_->remove(s);
|
queue_->remove(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -382,7 +380,7 @@ srs_error_t SrsRtpQueue::consume(SrsRtpPacket2* pkt)
|
||||||
// 2. Queue has lots of packets, the load is heavy.
|
// 2. Queue has lots of packets, the load is heavy.
|
||||||
// 3. The frame contains only one packet for each frame.
|
// 3. The frame contains only one packet for each frame.
|
||||||
if (pkt->rtp_header.get_marker() || queue_->is_heavy() || one_packet_per_frame_) {
|
if (pkt->rtp_header.get_marker() || queue_->is_heavy() || one_packet_per_frame_) {
|
||||||
collect_packet();
|
collect_packet(nack);
|
||||||
}
|
}
|
||||||
|
|
||||||
return err;
|
return err;
|
||||||
|
@ -465,28 +463,18 @@ uint32_t SrsRtpQueue::get_interarrival_jitter()
|
||||||
return static_cast<uint32_t>(jitter_);
|
return static_cast<uint32_t>(jitter_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void SrsRtpQueue::get_nack_seqs(vector<uint16_t>& seqs)
|
void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t seq_start, uint16_t seq_end)
|
||||||
{
|
|
||||||
nack_->get_nack_seqs(seqs);
|
|
||||||
}
|
|
||||||
|
|
||||||
void SrsRtpQueue::update_rtt(int rtt)
|
|
||||||
{
|
|
||||||
nack_->update_rtt(rtt);
|
|
||||||
}
|
|
||||||
|
|
||||||
void SrsRtpQueue::insert_into_nack_list(uint16_t seq_start, uint16_t seq_end)
|
|
||||||
{
|
{
|
||||||
for (uint16_t s = seq_start; s != seq_end; ++s) {
|
for (uint16_t s = seq_start; s != seq_end; ++s) {
|
||||||
srs_verbose("loss seq=%u, insert into nack list", s);
|
srs_verbose("loss seq=%u, insert into nack list", s);
|
||||||
nack_->insert(s);
|
nack->insert(s);
|
||||||
++number_of_packet_lossed_;
|
++number_of_packet_lossed_;
|
||||||
}
|
}
|
||||||
|
|
||||||
nack_->check_queue_size();
|
nack->check_queue_size();
|
||||||
}
|
}
|
||||||
|
|
||||||
void SrsRtpQueue::collect_packet()
|
void SrsRtpQueue::collect_packet(SrsRtpNackForReceiver* nack)
|
||||||
{
|
{
|
||||||
while (queue_->low() != queue_->high()) {
|
while (queue_->low() != queue_->high()) {
|
||||||
vector<SrsRtpPacket2*> frame;
|
vector<SrsRtpPacket2*> frame;
|
||||||
|
@ -496,7 +484,7 @@ void SrsRtpQueue::collect_packet()
|
||||||
SrsRtpPacket2* pkt = queue_->at(s);
|
SrsRtpPacket2* pkt = queue_->at(s);
|
||||||
|
|
||||||
// In NACK, never collect frame.
|
// In NACK, never collect frame.
|
||||||
if (nack_->find(s) != NULL) {
|
if (nack->find(s) != NULL) {
|
||||||
srs_verbose("seq=%u, found in nack list when collect frame", s);
|
srs_verbose("seq=%u, found in nack list when collect frame", s);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,7 +164,6 @@ class SrsRtpQueue
|
||||||
private:
|
private:
|
||||||
uint64_t nn_collected_frames;
|
uint64_t nn_collected_frames;
|
||||||
SrsRtpRingBuffer* queue_;
|
SrsRtpRingBuffer* queue_;
|
||||||
SrsRtpNackForReceiver* nack_;
|
|
||||||
bool one_packet_per_frame_;
|
bool one_packet_per_frame_;
|
||||||
private:
|
private:
|
||||||
double jitter_;
|
double jitter_;
|
||||||
|
@ -181,7 +180,8 @@ public:
|
||||||
SrsRtpQueue(size_t capacity = 1024, bool one_packet_per_frame = false);
|
SrsRtpQueue(size_t capacity = 1024, bool one_packet_per_frame = false);
|
||||||
virtual ~SrsRtpQueue();
|
virtual ~SrsRtpQueue();
|
||||||
public:
|
public:
|
||||||
srs_error_t consume(SrsRtpPacket2* pkt);
|
srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
|
||||||
|
// TODO: FIXME: Should merge FU-A to RAW, then we can return RAW payloads.
|
||||||
void collect_frames(std::vector<std::vector<SrsRtpPacket2*> >& frames);
|
void collect_frames(std::vector<std::vector<SrsRtpPacket2*> >& frames);
|
||||||
bool should_request_key_frame();
|
bool should_request_key_frame();
|
||||||
void notify_drop_seq(uint16_t seq);
|
void notify_drop_seq(uint16_t seq);
|
||||||
|
@ -192,12 +192,9 @@ public:
|
||||||
uint8_t get_fraction_lost();
|
uint8_t get_fraction_lost();
|
||||||
uint32_t get_cumulative_number_of_packets_lost();
|
uint32_t get_cumulative_number_of_packets_lost();
|
||||||
uint32_t get_interarrival_jitter();
|
uint32_t get_interarrival_jitter();
|
||||||
public:
|
|
||||||
void get_nack_seqs(std::vector<uint16_t>& seqs);
|
|
||||||
void update_rtt(int rtt);
|
|
||||||
private:
|
private:
|
||||||
void insert_into_nack_list(uint16_t seq_start, uint16_t seq_end);
|
void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t seq_start, uint16_t seq_end);
|
||||||
void collect_packet();
|
void collect_packet(SrsRtpNackForReceiver* nack);
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue