1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Refactor RTC video queue, extract RTP video packet

This commit is contained in:
winlin 2020-05-05 08:24:49 +08:00
parent 899dddb624
commit c4b93b8a38
4 changed files with 85 additions and 58 deletions

View file

@ -270,7 +270,7 @@ srs_error_t SrsRtpAudioQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2
} }
if ((err = SrsRtpQueue::on_consume(nack, pkt)) != srs_success) { if ((err = SrsRtpQueue::on_consume(nack, pkt)) != srs_success) {
return err; return srs_error_wrap(err, "consume audio");
} }
// OK, we got one new RTP packet, which is not in NACK. // OK, we got one new RTP packet, which is not in NACK.
@ -336,10 +336,31 @@ void SrsRtpAudioQueue::collect_frames(SrsRtpNackForReceiver* nack, vector<SrsRtp
} }
} }
// Construct an empty video-packet wrapper: no owned RTP packet yet,
// and all decoder-helper flags cleared.
SrsRtpVideoPacket::SrsRtpVideoPacket()
    : video_is_first_packet(false), video_is_last_packet(false),
      video_is_idr(false), pkt(NULL)
{
}
// Release the owned RTP packet. If the packet was taken via detach(),
// pkt is NULL and nothing is freed (srs_freep is presumably NULL-safe
// per project convention — confirm against its definition).
SrsRtpVideoPacket::~SrsRtpVideoPacket()
{
    srs_freep(pkt);
}
// Transfer ownership of the wrapped RTP packet to the caller.
// After this call the wrapper holds NULL, so the destructor
// will not free the returned packet.
SrsRtpPacket2* SrsRtpVideoPacket::detach()
{
    SrsRtpPacket2* detached = pkt;
    pkt = NULL;
    return detached;
}
SrsRtpVideoQueue::SrsRtpVideoQueue(int capacity) SrsRtpVideoQueue::SrsRtpVideoQueue(int capacity)
{ {
request_key_frame_ = false; request_key_frame_ = false;
queue_ = new SrsRtpRingBuffer<SrsRtpPacket2*>(capacity); queue_ = new SrsRtpRingBuffer<SrsRtpVideoPacket*>(capacity);
} }
SrsRtpVideoQueue::~SrsRtpVideoQueue() SrsRtpVideoQueue::~SrsRtpVideoQueue()
@ -374,25 +395,28 @@ srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
SrsRtpVideoPacket* vpkt = new SrsRtpVideoPacket();
vpkt->pkt = pkt;
uint8_t v = (uint8_t)pkt->nalu_type; uint8_t v = (uint8_t)pkt->nalu_type;
if (v == kFuA) { if (v == kFuA) {
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload); SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload);
if (!payload) { if (!payload) {
srs_freep(pkt); srs_freep(pkt); srs_freep(vpkt);
return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A payload"); return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A payload");
} }
pkt->video_is_first_packet = payload->start; vpkt->video_is_first_packet = payload->start;
pkt->video_is_last_packet = payload->end; vpkt->video_is_last_packet = payload->end;
pkt->video_is_idr = (payload->nalu_type == SrsAvcNaluTypeIDR); vpkt->video_is_idr = (payload->nalu_type == SrsAvcNaluTypeIDR);
} else { } else {
pkt->video_is_first_packet = true; vpkt->video_is_first_packet = true;
pkt->video_is_last_packet = true; vpkt->video_is_last_packet = true;
if (v == kStapA) { if (v == kStapA) {
pkt->video_is_idr = true; vpkt->video_is_idr = true;
} else { } else {
pkt->video_is_idr = (pkt->nalu_type == SrsAvcNaluTypeIDR); vpkt->video_is_idr = (pkt->nalu_type == SrsAvcNaluTypeIDR);
} }
} }
@ -404,7 +428,8 @@ srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2
} }
if ((err = SrsRtpQueue::on_consume(nack, pkt)) != srs_success) { if ((err = SrsRtpQueue::on_consume(nack, pkt)) != srs_success) {
return err; srs_freep(pkt); srs_freep(vpkt);
return srs_error_wrap(err, "consume video");
} }
// OK, we got one new RTP packet, which is not in NACK. // OK, we got one new RTP packet, which is not in NACK.
@ -421,7 +446,7 @@ srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2
} }
// Save packet at the position seq. // Save packet at the position seq.
queue_->set(seq, pkt); queue_->set(seq, vpkt);
return err; return err;
} }
@ -430,9 +455,7 @@ void SrsRtpVideoQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector<S
{ {
while (true) { while (true) {
SrsRtpPacket2* pkt = NULL; SrsRtpPacket2* pkt = NULL;
collect_frame(nack, &pkt); collect_frame(nack, &pkt);
if (!pkt) { if (!pkt) {
break; break;
} }
@ -480,7 +503,7 @@ void SrsRtpVideoQueue::on_overflow(SrsRtpNackForReceiver* nack)
void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt) void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt)
{ {
bool found = false; bool found = false;
vector<SrsRtpPacket2*> frame; vector<SrsRtpVideoPacket*> frame;
// When done, next point to the next available packet. // When done, next point to the next available packet.
uint16_t next = queue_->begin; uint16_t next = queue_->begin;
@ -488,18 +511,18 @@ void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2*
// If nack disabled, we ignore any empty packet. // If nack disabled, we ignore any empty packet.
if (!nack) { if (!nack) {
for (; next != queue_->end; ++next) { for (; next != queue_->end; ++next) {
SrsRtpPacket2* pkt = queue_->at(next); SrsRtpVideoPacket* vpkt = queue_->at(next);
if (!pkt) { if (!vpkt) {
continue; continue;
} }
if (frame.empty() && !pkt->video_is_first_packet) { if (frame.empty() && !vpkt->video_is_first_packet) {
continue; continue;
} }
frame.push_back(pkt); frame.push_back(vpkt);
if (pkt->rtp_header.get_marker() || pkt->video_is_last_packet) { if (vpkt->pkt->rtp_header.get_marker() || vpkt->video_is_last_packet) {
found = true; found = true;
next++; next++;
break; break;
@ -507,26 +530,26 @@ void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2*
} }
} else { } else {
for (; next != queue_->end; ++next) { for (; next != queue_->end; ++next) {
SrsRtpPacket2* pkt = queue_->at(next); SrsRtpVideoPacket* vpkt = queue_->at(next);
// TODO: FIXME: Should not wait for NACK packets. // TODO: FIXME: Should not wait for NACK packets.
// Not found or in NACK, stop collecting frame. // Not found or in NACK, stop collecting frame.
if (!pkt || nack->find(next) != NULL) { if (!vpkt || nack->find(next) != NULL) {
srs_trace("wait for nack seq=%u", next); srs_trace("wait for nack seq=%u", next);
return; return;
} }
// Ignore when the first packet not the start. // Ignore when the first packet not the start.
if (frame.empty() && !pkt->video_is_first_packet) { if (frame.empty() && !vpkt->video_is_first_packet) {
return; return;
} }
// OK, collect packet to frame. // OK, collect packet to frame.
frame.push_back(pkt); frame.push_back(vpkt);
// Done, we got the last packet of frame. // Done, we got the last packet of frame.
// @remark Note that the STAP-A is marker false and it's the last packet. // @remark Note that the STAP-A is marker false and it's the last packet.
if (pkt->rtp_header.get_marker() || pkt->video_is_last_packet) { if (vpkt->pkt->rtp_header.get_marker() || vpkt->video_is_last_packet) {
found = true; found = true;
next++; next++;
break; break;
@ -548,38 +571,39 @@ void SrsRtpVideoQueue::collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2*
// Merge packets to one packet. // Merge packets to one packet.
covert_frame(frame, ppkt); covert_frame(frame, ppkt);
for (int i = 0; i < (int)frame.size(); i++) {
SrsRtpVideoPacket* pkt = frame[i];
srs_freep(pkt);
}
return; return;
} }
void SrsRtpVideoQueue::covert_frame(std::vector<SrsRtpPacket2*>& frame, SrsRtpPacket2** ppkt) void SrsRtpVideoQueue::covert_frame(std::vector<SrsRtpVideoPacket*>& frame, SrsRtpPacket2** ppkt)
{ {
if (frame.size() == 1) { if (frame.size() == 1) {
*ppkt = frame[0]; *ppkt = frame[0]->detach();
return; return;
} }
// If more than one packet in a frame, it must be FU-A. // If more than one packet in a frame, it must be FU-A.
SrsRtpPacket2* head = frame.at(0); SrsRtpPacket2* head = frame.at(0)->pkt;
SrsAvcNaluType nalu_type = head->nalu_type; SrsAvcNaluType nalu_type = head->nalu_type;
// Covert FU-A to one RAW RTP packet. // Covert FU-A to one RAW RTP packet.
int nn_nalus = 0; int nn_nalus = 0;
for (size_t i = 0; i < frame.size(); ++i) { for (size_t i = 0; i < frame.size(); ++i) {
SrsRtpPacket2* pkt = frame[i]; SrsRtpVideoPacket* vpkt = frame[i];
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload); SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(vpkt->pkt->payload);
if (!payload) { if (!payload) {
nn_nalus = 0; nn_nalus = 0; break;
break;
} }
nn_nalus += payload->size; nn_nalus += payload->size;
} }
// Invalid packets, ignore. // Invalid packets, ignore.
if (nalu_type != (SrsAvcNaluType)kFuA || !nn_nalus) { if (nalu_type != (SrsAvcNaluType)kFuA || !nn_nalus) {
for (int i = 0; i < (int)frame.size(); i++) {
SrsRtpPacket2* pkt = frame[i];
srs_freep(pkt);
}
return; return;
} }
@ -601,8 +625,8 @@ void SrsRtpVideoQueue::covert_frame(std::vector<SrsRtpPacket2*>& frame, SrsRtpPa
buf.write_1bytes(head_payload->nri | head_payload->nalu_type); // NALU header. buf.write_1bytes(head_payload->nri | head_payload->nalu_type); // NALU header.
for (size_t i = 0; i < frame.size(); ++i) { for (size_t i = 0; i < frame.size(); ++i) {
SrsRtpPacket2* pkt = frame[i]; SrsRtpVideoPacket* vpkt = frame[i];
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload); SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(vpkt->pkt->payload);
buf.write_bytes(payload->payload, payload->size); buf.write_bytes(payload->payload, payload->size);
} }
@ -617,8 +641,8 @@ uint16_t SrsRtpVideoQueue::next_start_of_frame(uint16_t seq)
} }
for (; s != queue_->end; ++s) { for (; s != queue_->end; ++s) {
SrsRtpPacket2* pkt = queue_->at(s); SrsRtpVideoPacket* vpkt = queue_->at(s);
if (pkt && pkt->video_is_first_packet) { if (vpkt && vpkt->video_is_first_packet) {
return s; return s;
} }
} }
@ -631,8 +655,8 @@ uint16_t SrsRtpVideoQueue::next_keyframe()
uint16_t s = queue_->begin + 1; uint16_t s = queue_->begin + 1;
for (; s != queue_->end; ++s) { for (; s != queue_->end; ++s) {
SrsRtpPacket2* pkt = queue_->at(s); SrsRtpVideoPacket* vpkt = queue_->at(s);
if (pkt && pkt->video_is_idr && pkt->video_is_first_packet) { if (vpkt && vpkt->video_is_idr && vpkt->video_is_first_packet) {
return s; return s;
} }
} }

View file

@ -270,11 +270,27 @@ public:
virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frames); virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frames);
}; };
// Wraps one received RTP video packet together with the per-packet
// metadata the video queue needs to collect packets into frames.
class SrsRtpVideoPacket
{
public:
    // Helper information for video decoder only.
    // Whether this packet starts a frame (FU-A start bit set, or a
    // non-fragmented NALU).
    bool video_is_first_packet;
    // Whether this packet ends a frame (FU-A end bit set, or a
    // non-fragmented NALU).
    bool video_is_last_packet;
    // Whether this packet carries (part of) an IDR keyframe.
    bool video_is_idr;
public:
    // The owned RTP packet; freed by the destructor unless detach()ed.
    SrsRtpPacket2* pkt;
public:
    SrsRtpVideoPacket();
    virtual ~SrsRtpVideoPacket();
public:
    // Transfer ownership of pkt to the caller, leaving pkt NULL.
    SrsRtpPacket2* detach();
};
class SrsRtpVideoQueue : public SrsRtpQueue class SrsRtpVideoQueue : public SrsRtpQueue
{ {
private: private:
bool request_key_frame_; bool request_key_frame_;
SrsRtpRingBuffer<SrsRtpPacket2*>* queue_; SrsRtpRingBuffer<SrsRtpVideoPacket*>* queue_;
public: public:
SrsRtpVideoQueue(int capacity); SrsRtpVideoQueue(int capacity);
virtual ~SrsRtpVideoQueue(); virtual ~SrsRtpVideoQueue();
@ -289,7 +305,7 @@ public:
private: private:
virtual void on_overflow(SrsRtpNackForReceiver* nack); virtual void on_overflow(SrsRtpNackForReceiver* nack);
virtual void collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt); virtual void collect_frame(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt);
virtual void covert_frame(std::vector<SrsRtpPacket2*>& frame, SrsRtpPacket2** ppkt); virtual void covert_frame(std::vector<SrsRtpVideoPacket*>& frame, SrsRtpPacket2** ppkt);
uint16_t next_start_of_frame(uint16_t seq); uint16_t next_start_of_frame(uint16_t seq);
uint16_t next_keyframe(); uint16_t next_keyframe();
}; };

View file

@ -277,17 +277,12 @@ SrsRtpPacket2::SrsRtpPacket2()
payload = NULL; payload = NULL;
decode_handler = NULL; decode_handler = NULL;
video_is_first_packet = false;
video_is_last_packet = false;
video_is_idr = false;
nalu_type = SrsAvcNaluTypeReserved; nalu_type = SrsAvcNaluTypeReserved;
original_bytes = NULL;
cache_raw = new SrsRtpRawPayload(); cache_raw = new SrsRtpRawPayload();
cache_fua = new SrsRtpFUAPayload2(); cache_fua = new SrsRtpFUAPayload2();
cache_payload = 0; cache_payload = 0;
original_bytes = NULL;
nn_original_payload = 0;
} }
SrsRtpPacket2::~SrsRtpPacket2() SrsRtpPacket2::~SrsRtpPacket2()
@ -408,7 +403,6 @@ srs_error_t SrsRtpPacket2::decode(SrsBuffer* buf)
// If user set the decode handler, call it to set the payload. // If user set the decode handler, call it to set the payload.
if (decode_handler) { if (decode_handler) {
decode_handler->on_before_decode_payload(this, buf, &payload); decode_handler->on_before_decode_payload(this, buf, &payload);
nn_original_payload = buf->left();
} }
// By default, we always use the RAW payload. // By default, we always use the RAW payload.

View file

@ -114,15 +114,8 @@ public:
int padding; int padding;
// Decoder helper. // Decoder helper.
public: public:
// TODO: FIXME: Move to video decoder queue SrsRtpVideoQueue.
// Helper information for video decoder only.
bool video_is_first_packet;
bool video_is_last_packet;
bool video_is_idr;
// The first byte as nalu type, for video decoder only. // The first byte as nalu type, for video decoder only.
SrsAvcNaluType nalu_type; SrsAvcNaluType nalu_type;
// The original payload bytes length.
int nn_original_payload;
// The original bytes for decoder only, we will free it. // The original bytes for decoder only, we will free it.
char* original_bytes; char* original_bytes;
// Fast cache for performance. // Fast cache for performance.