mirror of
https://github.com/ossrs/srs.git
synced 2025-02-13 20:01:56 +00:00
Refactor RTC publisher queue, covert FU-A to RAW.
This commit is contained in:
parent
a812183144
commit
47ed16eda1
4 changed files with 139 additions and 150 deletions
|
@ -1924,18 +1924,14 @@ void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* bu
|
|||
|
||||
srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
// TODO: FIXME: Error check.
|
||||
audio_queue_->consume(audio_nack_, pkt);
|
||||
|
||||
check_send_nacks(audio_nack_, audio_ssrc);
|
||||
|
||||
return collect_audio_frames();
|
||||
}
|
||||
|
||||
srs_error_t SrsRtcPublisher::collect_audio_frames()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
// Collect all audio frames.
|
||||
std::vector<SrsRtpPacket2*> frames;
|
||||
audio_queue_->collect_frames(audio_nack_, frames);
|
||||
|
||||
|
@ -1943,7 +1939,7 @@ srs_error_t SrsRtcPublisher::collect_audio_frames()
|
|||
SrsRtpPacket2* pkt = frames[i];
|
||||
|
||||
// TODO: FIXME: Check error.
|
||||
do_collect_audio_frame(pkt);
|
||||
collect_audio_frame(pkt);
|
||||
|
||||
srs_freep(pkt);
|
||||
}
|
||||
|
@ -1951,7 +1947,7 @@ srs_error_t SrsRtcPublisher::collect_audio_frames()
|
|||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsRtcPublisher::do_collect_audio_frame(SrsRtpPacket2* pkt)
|
||||
srs_error_t SrsRtcPublisher::collect_audio_frame(SrsRtpPacket2* pkt)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
|
@ -1989,28 +1985,6 @@ srs_error_t SrsRtcPublisher::do_collect_audio_frame(SrsRtpPacket2* pkt)
|
|||
|
||||
srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt)
|
||||
{
|
||||
uint8_t v = (uint8_t)pkt->nalu_type;
|
||||
if (v == kFuA) {
|
||||
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload);
|
||||
if (!payload) {
|
||||
srs_freep(pkt);
|
||||
return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A payload");
|
||||
}
|
||||
|
||||
pkt->video_is_first_packet = payload->start;
|
||||
pkt->video_is_last_packet = payload->end;
|
||||
pkt->video_is_idr = (payload->nalu_type == SrsAvcNaluTypeIDR);
|
||||
} else {
|
||||
pkt->video_is_first_packet = true;
|
||||
pkt->video_is_last_packet = true;
|
||||
|
||||
if (v == kStapA) {
|
||||
pkt->video_is_idr = true;
|
||||
} else {
|
||||
pkt->video_is_idr = (pkt->nalu_type == SrsAvcNaluTypeIDR);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: FIXME: Error check.
|
||||
video_queue_->consume(video_nack_, pkt);
|
||||
|
||||
|
@ -2021,110 +1995,35 @@ srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt)
|
|||
|
||||
check_send_nacks(video_nack_, video_ssrc);
|
||||
|
||||
return collect_video_frames();
|
||||
}
|
||||
|
||||
srs_error_t SrsRtcPublisher::collect_video_frames()
|
||||
{
|
||||
std::vector<std::vector<SrsRtpPacket2*> > frames;
|
||||
// Collect video frames.
|
||||
std::vector<SrsRtpPacket2*> frames;
|
||||
video_queue_->collect_frames(video_nack_, frames);
|
||||
|
||||
for (size_t i = 0; i < frames.size(); ++i) {
|
||||
vector<SrsRtpPacket2*>& packets = frames[i];
|
||||
if (packets.empty()) {
|
||||
continue;
|
||||
}
|
||||
SrsRtpPacket2* pkt = frames[i];
|
||||
|
||||
// TODO: FIXME: Check error.
|
||||
do_collect_video_frame(packets);
|
||||
collect_video_frame(pkt);
|
||||
|
||||
for (size_t j = 0; j < packets.size(); ++j) {
|
||||
SrsRtpPacket2* pkt = packets[j];
|
||||
srs_freep(pkt);
|
||||
}
|
||||
srs_freep(pkt);
|
||||
}
|
||||
|
||||
return srs_success;
|
||||
}
|
||||
|
||||
srs_error_t SrsRtcPublisher::do_collect_video_frame(std::vector<SrsRtpPacket2*>& packets)
|
||||
srs_error_t SrsRtcPublisher::collect_video_frame(SrsRtpPacket2* pkt)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
// Although a video frame may contain many packets, they share the same NALU type.
|
||||
SrsRtpPacket2* head = packets.at(0);
|
||||
SrsAvcNaluType nalu_type = head->nalu_type;
|
||||
int64_t timestamp = head->rtp_header.get_timestamp();
|
||||
int64_t timestamp = pkt->rtp_header.get_timestamp();
|
||||
|
||||
// For FU-A or STAP-A, there must be more than one packets.
|
||||
if (nalu_type == (SrsAvcNaluType)kFuA) {
|
||||
if (packets.size() < 2) {
|
||||
return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A/STAP-A %#x %d packets", nalu_type, packets.size());
|
||||
}
|
||||
} else {
|
||||
// For others type, should be one packet for one frame.
|
||||
if (packets.size() != 1) {
|
||||
return srs_error_new(ERROR_RTC_RTP_MUXER, "NonFU-A %d packets", packets.size());
|
||||
}
|
||||
}
|
||||
|
||||
// For FU-A, group packets to one video frame.
|
||||
if (nalu_type == (SrsAvcNaluType)kFuA) {
|
||||
int nn_nalus = 0;
|
||||
for (size_t i = 0; i < packets.size(); ++i) {
|
||||
SrsRtpPacket2* pkt = packets[i];
|
||||
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload);
|
||||
if (!payload) {
|
||||
return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A payload");
|
||||
}
|
||||
nn_nalus += payload->size;
|
||||
}
|
||||
if (!nn_nalus) {
|
||||
return err;
|
||||
}
|
||||
|
||||
// TODO: FIXME: Directly covert to sample for performance.
|
||||
// 5 bytes FLV tag header.
|
||||
// 4 bytes NALU IBMF header, define by sequence header.
|
||||
// 1 byte NALU header.
|
||||
nn_nalus += 1;
|
||||
int nn_payload = nn_nalus + 5 + 4;
|
||||
char* data = new char[nn_payload];
|
||||
SrsBuffer buf(data, nn_payload);
|
||||
|
||||
SrsRtpFUAPayload2* head_payload = dynamic_cast<SrsRtpFUAPayload2*>(head->payload);
|
||||
if (head_payload->nalu_type == SrsAvcNaluTypeIDR) {
|
||||
buf.write_1bytes(0x17); // Keyframe.
|
||||
srs_trace("RTC got IDR %d bytes", nn_nalus);
|
||||
} else {
|
||||
buf.write_1bytes(0x27); // Not Keyframe.
|
||||
}
|
||||
buf.write_1bytes(0x01); // Not Sequence header.
|
||||
buf.write_3bytes(0x00); // CTS.
|
||||
buf.write_4bytes(nn_nalus);
|
||||
|
||||
buf.write_1bytes(head_payload->nri | head_payload->nalu_type); // NALU header.
|
||||
|
||||
for (size_t i = 0; i < packets.size(); ++i) {
|
||||
SrsRtpPacket2* pkt = packets[i];
|
||||
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload);
|
||||
buf.write_bytes(payload->payload, payload->size);
|
||||
}
|
||||
|
||||
SrsMessageHeader header;
|
||||
header.message_type = RTMP_MSG_VideoMessage;
|
||||
// TODO: FIXME: Maybe the tbn is not 90k.
|
||||
header.timestamp = (timestamp / 90) & 0x3fffffff;
|
||||
SrsCommonMessage* shared_video = new SrsCommonMessage();
|
||||
SrsAutoFree(SrsCommonMessage, shared_video);
|
||||
// TODO: FIXME: Check error.
|
||||
shared_video->create(&header, data, nn_payload);
|
||||
return source->on_video(shared_video);
|
||||
// No FU-A, because we convert it to RAW RTP packet.
|
||||
if (pkt->nalu_type == (SrsAvcNaluType)kFuA) {
|
||||
return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid FU-A");
|
||||
}
|
||||
|
||||
// For STAP-A, it must be SPS/PPS, and only one packet.
|
||||
if (nalu_type == (SrsAvcNaluType)kStapA) {
|
||||
SrsRtpPacket2* pkt = head;
|
||||
if (pkt->nalu_type == (SrsAvcNaluType)kStapA) {
|
||||
SrsRtpSTAPPayload* payload = dynamic_cast<SrsRtpSTAPPayload*>(pkt->payload);
|
||||
if (!payload) {
|
||||
return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload");
|
||||
|
@ -2178,7 +2077,6 @@ srs_error_t SrsRtcPublisher::do_collect_video_frame(std::vector<SrsRtpPacket2*>&
|
|||
}
|
||||
|
||||
// For RAW NALU, should be one RAW packet.
|
||||
SrsRtpPacket2* pkt = head;
|
||||
SrsRtpRawPayload* payload = dynamic_cast<SrsRtpRawPayload*>(pkt->payload);
|
||||
if (!payload) {
|
||||
return srs_error_new(ERROR_RTC_RTP_MUXER, "RAW-NALU payload");
|
||||
|
@ -2194,7 +2092,7 @@ srs_error_t SrsRtcPublisher::do_collect_video_frame(std::vector<SrsRtpPacket2*>&
|
|||
char* data = new char[nn_payload];
|
||||
SrsBuffer buf(data, nn_payload);
|
||||
|
||||
if (nalu_type == SrsAvcNaluTypeIDR) {
|
||||
if (pkt->nalu_type == SrsAvcNaluTypeIDR) {
|
||||
buf.write_1bytes(0x17); // Keyframe.
|
||||
srs_trace("RTC got IDR %d bytes", nn_payload);
|
||||
} else {
|
||||
|
|
|
@ -58,6 +58,7 @@ class SrsRtpVideoQueue;
|
|||
class SrsRtpPacket2;
|
||||
class ISrsCodec;
|
||||
class SrsRtpNackForReceiver;
|
||||
class SrsRtpIncommingVideoFrame;
|
||||
|
||||
const uint8_t kSR = 200;
|
||||
const uint8_t kRR = 201;
|
||||
|
@ -287,11 +288,9 @@ public:
|
|||
virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload);
|
||||
private:
|
||||
srs_error_t on_audio(SrsRtpPacket2* pkt);
|
||||
srs_error_t collect_audio_frames();
|
||||
srs_error_t do_collect_audio_frame(SrsRtpPacket2* packet);
|
||||
srs_error_t collect_audio_frame(SrsRtpPacket2* pkt);
|
||||
srs_error_t on_video(SrsRtpPacket2* pkt);
|
||||
srs_error_t collect_video_frames();
|
||||
srs_error_t do_collect_video_frame(std::vector<SrsRtpPacket2*>& packets);
|
||||
srs_error_t collect_video_frame(SrsRtpPacket2* pkt);
|
||||
public:
|
||||
void request_keyframe();
|
||||
// interface ISrsHourGlass
|
||||
|
|
|
@ -450,9 +450,53 @@ SrsRtpVideoQueue::~SrsRtpVideoQueue()
|
|||
{
|
||||
}
|
||||
|
||||
void SrsRtpVideoQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector<std::vector<SrsRtpPacket2*> >& frames)
|
||||
srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt)
|
||||
{
|
||||
collect_packet(frames, nack);
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
uint8_t v = (uint8_t)pkt->nalu_type;
|
||||
if (v == kFuA) {
|
||||
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload);
|
||||
if (!payload) {
|
||||
srs_freep(pkt);
|
||||
return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A payload");
|
||||
}
|
||||
|
||||
pkt->video_is_first_packet = payload->start;
|
||||
pkt->video_is_last_packet = payload->end;
|
||||
pkt->video_is_idr = (payload->nalu_type == SrsAvcNaluTypeIDR);
|
||||
} else {
|
||||
pkt->video_is_first_packet = true;
|
||||
pkt->video_is_last_packet = true;
|
||||
|
||||
if (v == kStapA) {
|
||||
pkt->video_is_idr = true;
|
||||
} else {
|
||||
pkt->video_is_idr = (pkt->nalu_type == SrsAvcNaluTypeIDR);
|
||||
}
|
||||
}
|
||||
|
||||
if ((err = SrsRtpQueue::consume(nack, pkt)) != srs_success) {
|
||||
return srs_error_wrap(err, "video consume");
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
void SrsRtpVideoQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frames)
|
||||
{
|
||||
while (queue_->low() != queue_->high()) {
|
||||
SrsRtpPacket2* pkt = NULL;
|
||||
|
||||
collect_packet(nack, &pkt);
|
||||
|
||||
if (!pkt) {
|
||||
return;
|
||||
}
|
||||
|
||||
nn_collected_frames++;
|
||||
frames.push_back(pkt);
|
||||
}
|
||||
|
||||
if (queue_->overflow()) {
|
||||
on_overflow(nack);
|
||||
|
@ -493,41 +537,27 @@ void SrsRtpVideoQueue::on_overflow(SrsRtpNackForReceiver* nack)
|
|||
queue_->advance_to(next + 1);
|
||||
}
|
||||
|
||||
void SrsRtpVideoQueue::collect_packet(vector<vector<SrsRtpPacket2*> >& frames, SrsRtpNackForReceiver* nack)
|
||||
{
|
||||
while (queue_->low() != queue_->high()) {
|
||||
vector<SrsRtpPacket2*> frame;
|
||||
|
||||
do_collect_packet(nack, frame);
|
||||
|
||||
if (frame.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
nn_collected_frames++;
|
||||
frames.push_back(frame);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: FIXME: Should refer to the FU-A original video frame, to avoid finding for each packet.
|
||||
void SrsRtpVideoQueue::do_collect_packet(SrsRtpNackForReceiver* nack, vector<SrsRtpPacket2*>& frame)
|
||||
void SrsRtpVideoQueue::collect_packet(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt)
|
||||
{
|
||||
// When done, s point to the next available packet.
|
||||
uint16_t next = queue_->low();
|
||||
|
||||
bool found = false;
|
||||
vector<SrsRtpPacket2*> frame;
|
||||
|
||||
for (; next != queue_->high(); ++next) {
|
||||
SrsRtpPacket2* pkt = queue_->at(next);
|
||||
|
||||
// Not found or in NACK, stop collecting frame.
|
||||
if (!pkt || nack->find(next) != NULL) {
|
||||
srs_trace("wait for nack seq=%u", next);
|
||||
break;
|
||||
return;
|
||||
}
|
||||
|
||||
// Ignore when the first packet not the start.
|
||||
if (next == queue_->low() && !pkt->video_is_first_packet) {
|
||||
break;
|
||||
return;
|
||||
}
|
||||
|
||||
// OK, collect packet to frame.
|
||||
|
@ -542,17 +572,78 @@ void SrsRtpVideoQueue::do_collect_packet(SrsRtpNackForReceiver* nack, vector<Srs
|
|||
}
|
||||
}
|
||||
|
||||
if (!found) {
|
||||
frame.clear();
|
||||
if (!found || frame.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
uint16_t cur = next - 1;
|
||||
if (found && cur != queue_->high()) {
|
||||
if (cur != queue_->high()) {
|
||||
// Reset the range of packets to NULL in buffer.
|
||||
queue_->reset(queue_->low(), next);
|
||||
|
||||
srs_verbose("collect on frame, update head seq=%u t %u", queue_->low(), next);
|
||||
queue_->advance_to(next);
|
||||
}
|
||||
|
||||
// Merge packets to one packet.
|
||||
covert_packet(frame, ppkt);
|
||||
return;
|
||||
}
|
||||
|
||||
void SrsRtpVideoQueue::covert_packet(std::vector<SrsRtpPacket2*>& frame, SrsRtpPacket2** ppkt)
|
||||
{
|
||||
if (frame.size() == 1) {
|
||||
*ppkt = frame[0];
|
||||
return;
|
||||
}
|
||||
|
||||
// If more than one packet in a frame, it must be FU-A.
|
||||
SrsRtpPacket2* head = frame.at(0);
|
||||
SrsAvcNaluType nalu_type = head->nalu_type;
|
||||
|
||||
// Covert FU-A to one RAW RTP packet.
|
||||
int nn_nalus = 0;
|
||||
for (size_t i = 0; i < frame.size(); ++i) {
|
||||
SrsRtpPacket2* pkt = frame[i];
|
||||
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload);
|
||||
if (!payload) {
|
||||
nn_nalus = 0;
|
||||
break;
|
||||
}
|
||||
nn_nalus += payload->size;
|
||||
}
|
||||
|
||||
// Invalid packets, ignore.
|
||||
if (nalu_type != (SrsAvcNaluType)kFuA || !nn_nalus) {
|
||||
for (int i = 0; i < (int)frame.size(); i++) {
|
||||
SrsRtpPacket2* pkt = frame[i];
|
||||
srs_freep(pkt);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Merge to one RAW RTP packet.
|
||||
// TODO: FIXME: Should covert to multiple NALU RTP packet to avoid copying.
|
||||
SrsRtpPacket2* pkt = new SrsRtpPacket2();
|
||||
pkt->rtp_header = head->rtp_header;
|
||||
|
||||
SrsRtpFUAPayload2* head_payload = dynamic_cast<SrsRtpFUAPayload2*>(head->payload);
|
||||
pkt->nalu_type = head_payload->nalu_type;
|
||||
|
||||
SrsRtpRawPayload* payload = pkt->reuse_raw();
|
||||
payload->nn_payload = nn_nalus + 1;
|
||||
payload->payload = new char[payload->nn_payload];
|
||||
|
||||
SrsBuffer buf(payload->payload, payload->nn_payload);
|
||||
|
||||
buf.write_1bytes(head_payload->nri | head_payload->nalu_type); // NALU header.
|
||||
|
||||
for (size_t i = 0; i < frame.size(); ++i) {
|
||||
SrsRtpPacket2* pkt = frame[i];
|
||||
SrsRtpFUAPayload2* payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload);
|
||||
buf.write_bytes(payload->payload, payload->size);
|
||||
}
|
||||
|
||||
*ppkt = pkt;
|
||||
}
|
||||
|
||||
|
|
|
@ -204,13 +204,14 @@ public:
|
|||
SrsRtpVideoQueue(int capacity);
|
||||
virtual ~SrsRtpVideoQueue();
|
||||
public:
|
||||
virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<std::vector<SrsRtpPacket2*> >& frames);
|
||||
virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt);
|
||||
virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frame);
|
||||
bool should_request_key_frame();
|
||||
void request_keyframe();
|
||||
private:
|
||||
virtual void on_overflow(SrsRtpNackForReceiver* nack);
|
||||
virtual void collect_packet(std::vector<std::vector<SrsRtpPacket2*> >& frames, SrsRtpNackForReceiver* nack);
|
||||
virtual void do_collect_packet(SrsRtpNackForReceiver* nack, std::vector<SrsRtpPacket2*>& frame);
|
||||
virtual void collect_packet(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt);
|
||||
virtual void covert_packet(std::vector<SrsRtpPacket2*>& frame, SrsRtpPacket2** ppkt);
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Reference in a new issue