1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

RTC: Refine RTMP bridge to RTC, use RTP packets in consumer

This commit is contained in:
winlin 2020-05-14 09:33:00 +08:00
parent 54d8c36905
commit 2b1c4a188a
7 changed files with 340 additions and 286 deletions

View file

@ -35,6 +35,7 @@
#include <srs_kernel_buffer.hpp>
#include <srs_app_rtc_codec.hpp>
#include <srs_kernel_rtc_rtp.hpp>
#include <srs_core_autofree.hpp>
const int kChannel = 2;
const int kSamplerate = 48000;
@ -44,6 +45,12 @@ const int kMaxOpusPackets = 8;
// The max size for each OPUS packet.
const int kMaxOpusPacketSize = 4096;
// The RTP payload max size, reserved some paddings for SRTP as such:
// kRtpPacketSize = kRtpMaxPayloadSize + paddings
// For example, if kRtpPacketSize is 1500, recommend to set kRtpMaxPayloadSize to 1400,
// which reserves 100 bytes for SRTP or paddings.
const int kRtpMaxPayloadSize = kRtpPacketSize - 200;
using namespace std;
// TODO: Add this function into SrsRtpMux class.
@ -413,13 +420,13 @@ void SrsRtcSource::set_rtc_publisher(SrsRtcPublisher* v)
rtc_publisher_ = v;
}
srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg)
srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
for (int i = 0; i < (int)consumers.size(); i++) {
SrsRtcConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue(msg, true, SrsRtmpJitterAlgorithmOFF)) != srs_success) {
if ((err = consumer->enqueue2(pkt)) != srs_success) {
return srs_error_wrap(err, "consume message");
}
}
@ -427,13 +434,13 @@ srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg)
return err;
}
srs_error_t SrsRtcSource::on_audio2(SrsRtpPacket2* pkt)
srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg)
{
srs_error_t err = srs_success;
for (int i = 0; i < (int)consumers.size(); i++) {
SrsRtcConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue2(pkt)) != srs_success) {
if ((err = consumer->enqueue(msg, true, SrsRtmpJitterAlgorithmOFF)) != srs_success) {
return srs_error_wrap(err, "consume message");
}
}
@ -532,6 +539,7 @@ SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcSource* source)
codec = new SrsAudioRecode(kChannel, kSamplerate);
discard_aac = false;
discard_bframe = false;
merge_nalus = false;
}
SrsRtcFromRtmpBridger::~SrsRtcFromRtmpBridger()
@ -554,10 +562,12 @@ srs_error_t SrsRtcFromRtmpBridger::initialize(SrsRequest* r)
return srs_error_wrap(err, "init codec");
}
// TODO: FIXME: Support reload and log it.
// TODO: FIXME: Support reload.
discard_aac = _srs_config->get_rtc_aac_discard(req->vhost);
discard_bframe = _srs_config->get_rtc_bframe_discard(req->vhost);
srs_trace("RTC bridge from RTMP, discard_aac=%d, discard_bframe=%d", discard_aac, discard_bframe);
merge_nalus = _srs_config->get_rtc_server_merge_nalus();
srs_trace("RTC bridge from RTMP, discard_aac=%d, discard_bframe=%d, merge_nalus=%d",
discard_aac, discard_bframe, merge_nalus);
return err;
}
@ -661,25 +671,18 @@ srs_error_t SrsRtcFromRtmpBridger::transcode(char* adts_audio, int nn_adts_audio
int nn_max_extra_payload = 0;
SrsSample samples[nn_opus_packets];
for (int i = 0; i < nn_opus_packets; i++) {
SrsSample* p = samples + i;
p->size = opus_sizes[i];
p->bytes = new char[p->size];
memcpy(p->bytes, opus_payloads[i], p->size);
char* data = (char*)opus_payloads[i];
int size = (int)opus_sizes[i];
nn_max_extra_payload = srs_max(nn_max_extra_payload, p->size);
// TODO: FIXME: Use it to padding audios.
nn_max_extra_payload = srs_max(nn_max_extra_payload, size);
SrsRtpPacket2* packet = new SrsRtpPacket2();
packet->frame_type = SrsFrameTypeAudio;
SrsRtpPacket2* pkt = NULL;
if ((err = package_opus(data, size, &pkt)) != srs_success) {
return srs_error_wrap(err, "package opus");
}
SrsRtpRawPayload* raw = packet->reuse_raw();
raw->payload = new char[p->size];
raw->nn_payload = p->size;
memcpy(raw->payload, opus_payloads[i], p->size);
// When free the RTP packet, should free the bytes allocated here.
packet->original_bytes = raw->payload;
if ((err = source_->on_audio2(packet)) != srs_success) {
if ((err = source_->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "consume opus");
}
}
@ -687,6 +690,27 @@ srs_error_t SrsRtcFromRtmpBridger::transcode(char* adts_audio, int nn_adts_audio
return err;
}
srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPacket2** ppkt)
{
srs_error_t err = srs_success;
SrsRtpPacket2* pkt = new SrsRtpPacket2();
pkt->rtp_header.set_marker(true);
pkt->frame_type = SrsFrameTypeAudio;
SrsRtpRawPayload* raw = pkt->reuse_raw();
raw->payload = new char[size];
raw->nn_payload = size;
memcpy(raw->payload, data, size);
// When free the RTP packet, should free the bytes allocated here.
pkt->original_bytes = raw->payload;
*ppkt = pkt;
return err;
}
srs_error_t SrsRtcFromRtmpBridger::on_video(SrsSharedPtrMessage* msg)
{
srs_error_t err = srs_success;
@ -709,13 +733,13 @@ srs_error_t SrsRtcFromRtmpBridger::on_video(SrsSharedPtrMessage* msg)
return source_->on_video_imp(msg);
}
srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* shared_frame, SrsFormat* format)
srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* msg, SrsFormat* format)
{
srs_error_t err = srs_success;
// If IDR, we will insert SPS/PPS before IDR frame.
if (format->video && format->video->has_idr) {
shared_frame->set_has_idr(true);
msg->set_has_idr(true);
}
// Update samples to shared frame.
@ -738,7 +762,266 @@ srs_error_t SrsRtcFromRtmpBridger::filter(SrsSharedPtrMessage* shared_frame, Srs
return err;
}
shared_frame->set_samples(format->video->samples, format->video->nb_samples);
// TODO: FIXME: Directly covert samples to RTP packets.
msg->set_samples(format->video->samples, format->video->nb_samples);
int nn_samples = format->video->nb_samples;
// Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A.
if (msg->has_idr()) {
SrsRtpPacket2* pkt = NULL;
if ((err = package_stap_a(source_, msg, &pkt)) != srs_success) {
return srs_error_wrap(err, "package stap-a");
}
if ((err = source_->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "consume sps/pps");
}
}
// If merge Nalus, we pcakges all NALUs(samples) as one NALU, in a RTP or FUA packet.
vector<SrsRtpPacket2*> pkts;
if (merge_nalus && nn_samples > 1) {
if ((err = package_nalus(msg, pkts)) != srs_success) {
return srs_error_wrap(err, "package nalus as one");
}
}
// By default, we package each NALU(sample) to a RTP or FUA packet.
for (int i = 0; i < nn_samples; i++) {
SrsSample* sample = msg->samples() + i;
// We always ignore bframe here, if config to discard bframe,
// the bframe flag will not be set.
if (sample->bframe) {
continue;
}
if (sample->size <= kRtpMaxPayloadSize) {
if ((err = package_single_nalu(msg, sample, pkts)) != srs_success) {
return srs_error_wrap(err, "package single nalu");
}
} else {
if ((err = package_fu_a(msg, sample, kRtpMaxPayloadSize, pkts)) != srs_success) {
return srs_error_wrap(err, "package fu-a");
}
}
}
if (pkts.size() > 0) {
pkts.back()->rtp_header.set_marker(true);
}
return consume_packets(pkts);
}
srs_error_t SrsRtcFromRtmpBridger::package_stap_a(SrsRtcSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppkt)
{
srs_error_t err = srs_success;
SrsMetaCache* meta = source->cached_meta();
if (!meta) {
return err;
}
SrsFormat* format = meta->vsh_format();
if (!format || !format->vcodec) {
return err;
}
// Note that the sps/pps may change, so we should copy it.
const vector<char>& sps = format->vcodec->sequenceParameterSetNALUnit;
const vector<char>& pps = format->vcodec->pictureParameterSetNALUnit;
if (sps.empty() || pps.empty()) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty");
}
SrsRtpPacket2* pkt = new SrsRtpPacket2();
pkt->rtp_header.set_marker(false);
pkt->rtp_header.set_timestamp(msg->timestamp * 90);
SrsRtpSTAPPayload* stap = new SrsRtpSTAPPayload();
pkt->payload = stap;
uint8_t header = sps[0];
stap->nri = (SrsAvcNaluType)header;
// Copy the SPS/PPS bytes, because it may change.
char* p = new char[sps.size() + pps.size()];
pkt->original_bytes = p;
if (true) {
SrsSample* sample = new SrsSample();
sample->bytes = p;
sample->size = (int)sps.size();
stap->nalus.push_back(sample);
memcpy(p, (char*)&sps[0], sps.size());
p += (int)sps.size();
}
if (true) {
SrsSample* sample = new SrsSample();
sample->bytes = p;
sample->size = (int)pps.size();
stap->nalus.push_back(sample);
memcpy(p, (char*)&pps[0], pps.size());
p += (int)pps.size();
}
*ppkt = pkt;
srs_trace("RTC STAP-A seq=%u, sps %d, pps %d bytes", pkt->rtp_header.get_sequence(), sps.size(), pps.size());
return err;
}
srs_error_t SrsRtcFromRtmpBridger::package_nalus(SrsSharedPtrMessage* msg, vector<SrsRtpPacket2*>& pkts)
{
srs_error_t err = srs_success;
SrsRtpRawNALUs* raw = new SrsRtpRawNALUs();
for (int i = 0; i < msg->nn_samples(); i++) {
SrsSample* sample = msg->samples() + i;
// We always ignore bframe here, if config to discard bframe,
// the bframe flag will not be set.
if (sample->bframe) {
continue;
}
raw->push_back(sample->copy());
}
// Ignore empty.
int nn_bytes = raw->nb_bytes();
if (nn_bytes <= 0) {
srs_freep(raw);
return err;
}
if (nn_bytes < kRtpMaxPayloadSize) {
// Package NALUs in a single RTP packet.
SrsRtpPacket2* pkt = new SrsRtpPacket2();
pkt->rtp_header.set_timestamp(msg->timestamp * 90);
pkt->payload = raw;
pkt->original_msg = msg->copy();
pkts.push_back(pkt);
} else {
// We must free it, should never use RTP packets to free it,
// because more than one RTP packet will refer to it.
SrsAutoFree(SrsRtpRawNALUs, raw);
// Package NALUs in FU-A RTP packets.
int fu_payload_size = kRtpMaxPayloadSize;
// The first byte is store in FU-A header.
uint8_t header = raw->skip_first_byte();
uint8_t nal_type = header & kNalTypeMask;
int nb_left = nn_bytes - 1;
int num_of_packet = 1 + (nn_bytes - 1) / fu_payload_size;
for (int i = 0; i < num_of_packet; ++i) {
int packet_size = srs_min(nb_left, fu_payload_size);
SrsRtpFUAPayload* fua = new SrsRtpFUAPayload();
if ((err = raw->read_samples(fua->nalus, packet_size)) != srs_success) {
srs_freep(fua);
return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes);
}
SrsRtpPacket2* pkt = new SrsRtpPacket2();
pkt->rtp_header.set_timestamp(msg->timestamp * 90);
fua->nri = (SrsAvcNaluType)header;
fua->nalu_type = (SrsAvcNaluType)nal_type;
fua->start = bool(i == 0);
fua->end = bool(i == num_of_packet - 1);
pkt->payload = fua;
pkt->original_msg = msg->copy();
pkts.push_back(pkt);
nb_left -= packet_size;
}
}
return err;
}
// Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6
srs_error_t SrsRtcFromRtmpBridger::package_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, vector<SrsRtpPacket2*>& pkts)
{
srs_error_t err = srs_success;
SrsRtpPacket2* pkt = new SrsRtpPacket2();
pkt->rtp_header.set_timestamp(msg->timestamp * 90);
SrsRtpRawPayload* raw = pkt->reuse_raw();
raw->payload = sample->bytes;
raw->nn_payload = sample->size;
pkt->original_msg = msg->copy();
pkts.push_back(pkt);
return err;
}
srs_error_t SrsRtcFromRtmpBridger::package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector<SrsRtpPacket2*>& pkts)
{
srs_error_t err = srs_success;
char* p = sample->bytes + 1;
int nb_left = sample->size - 1;
uint8_t header = sample->bytes[0];
uint8_t nal_type = header & kNalTypeMask;
int num_of_packet = 1 + (sample->size - 1) / fu_payload_size;
for (int i = 0; i < num_of_packet; ++i) {
int packet_size = srs_min(nb_left, fu_payload_size);
SrsRtpPacket2* pkt = new SrsRtpPacket2();
pkt->rtp_header.set_timestamp(msg->timestamp * 90);
SrsRtpFUAPayload2* fua = pkt->reuse_fua();
fua->nri = (SrsAvcNaluType)header;
fua->nalu_type = (SrsAvcNaluType)nal_type;
fua->start = bool(i == 0);
fua->end = bool(i == num_of_packet - 1);
fua->payload = p;
fua->size = packet_size;
pkt->original_msg = msg->copy();
pkts.push_back(pkt);
p += packet_size;
nb_left -= packet_size;
}
return err;
}
srs_error_t SrsRtcFromRtmpBridger::consume_packets(vector<SrsRtpPacket2*>& pkts)
{
srs_error_t err = srs_success;
// TODO: FIXME: Consume a range of packets.
int i = 0;
for (; i < (int)pkts.size(); i++) {
SrsRtpPacket2* pkt = pkts[i];
if ((err = source_->on_rtp(pkt)) != srs_success) {
err = srs_error_wrap(err, "consume sps/pps");
break;
}
}
for (; i < (int)pkts.size(); i++) {
SrsRtpPacket2* pkt = pkts[i];
srs_freep(pkt);
}
return err;
}