mirror of https://github.com/ossrs/srs.git

Bridger: Support RTC2RTMP bridger and shared FastTimer. 4.0.95
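
This commit adds an RTC-to-RTMP bridger: SrsRtmpFromRtcBridger transcodes Opus audio to AAC and repacketizes H.264 RTP payloads (STAP-A, FU-A and raw) into RTMP/FLV messages, while SrsRtcStream subscribes to the shared hybrid FastTimer to periodically request keyframes (PLI) from the RTC publisher.

The PLI interval is read per vhost via _srs_config->get_rtc_pli_for_rtmp(). A minimal vhost configuration sketch for this feature (the directive name is inferred from that accessor, so treat it as an assumption):

    vhost __defaultVhost__ {
        rtc {
            enabled on;
            # Assumed directive: seconds between PLI requests while bridging to RTMP.
            pli_for_rtmp 6.0;
        }
    }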

winlin 2021-04-23 10:42:15 +08:00 committed by Winlin
parent c770e6d7bc
commit 3d225973ef
12 changed files with 700 additions and 4 deletions


@@ -328,6 +328,14 @@ ISrsRtcStreamEventHandler::~ISrsRtcStreamEventHandler()
{
}
ISrsRtcSourceBridger::ISrsRtcSourceBridger()
{
}
ISrsRtcSourceBridger::~ISrsRtcSourceBridger()
{
}
SrsRtcStream::SrsRtcStream()
{
is_created_ = false;
@@ -337,6 +345,7 @@ SrsRtcStream::SrsRtcStream()
stream_desc_ = NULL;
req = NULL;
bridger_ = NULL;
}
SrsRtcStream::~SrsRtcStream()
@@ -346,6 +355,7 @@ SrsRtcStream::~SrsRtcStream()
consumers.clear();
srs_freep(req);
srs_freep(bridger_);
srs_freep(stream_desc_);
}
@@ -406,6 +416,12 @@ SrsContextId SrsRtcStream::pre_source_id()
return _pre_source_id;
}
void SrsRtcStream::set_bridger(ISrsRtcSourceBridger *bridger)
{
srs_freep(bridger_);
bridger_ = bridger;
}
srs_error_t SrsRtcStream::create_consumer(SrsRtcConsumer*& consumer)
{
srs_error_t err = srs_success;
@@ -475,6 +491,17 @@ srs_error_t SrsRtcStream::on_publish()
return srs_error_wrap(err, "source id change");
}
// If bridging to another source, handle the event and start a timer to request PLI.
if (bridger_) {
if ((err = bridger_->on_publish()) != srs_success) {
return srs_error_wrap(err, "bridger on publish");
}
// For SrsRtcStream::on_timer()
srs_utime_t pli_for_rtmp = _srs_config->get_rtc_pli_for_rtmp(req->vhost);
_srs_hybrid->timer()->subscribe(pli_for_rtmp, this);
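// on_timer() below will fire at this interval until on_unpublish() unsubscribes.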
}
// TODO: FIXME: Handle by statistic.
return err;
@@ -502,6 +529,15 @@ void SrsRtcStream::on_unpublish()
h->on_unpublish();
}
// Free the bridger resources.
if (bridger_) {
// For SrsRtcStream::on_timer()
_srs_hybrid->timer()->unsubscribe(this);
bridger_->on_unpublish();
srs_freep(bridger_);
}
// Release the stream description on unpublish.
set_stream_desc(NULL);
@@ -545,6 +581,10 @@ srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket2* pkt)
}
}
if (bridger_ && (err = bridger_->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "bridger consume message");
}
return err;
}
@@ -586,6 +626,22 @@ std::vector<SrsRtcTrackDescription*> SrsRtcStream::get_track_desc(std::string ty
return track_descs;
}
srs_error_t SrsRtcStream::on_timer(srs_utime_t interval, srs_utime_t tick)
{
srs_error_t err = srs_success;
if (!publish_stream_) {
return err;
}
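// Request a keyframe (PLI) from the publisher for every video track, so the RTMP
// bridge quickly receives a decodable frame.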
for (int i = 0; i < (int)stream_desc_->video_track_descs_.size(); i++) {
SrsRtcTrackDescription* desc = stream_desc_->video_track_descs_.at(i);
publish_stream_->request_keyframe(desc->ssrc_);
}
return err;
}
SrsRtpPacketCacheHelper::SrsRtpPacketCacheHelper()
{
pkt = _srs_rtp_cache->allocate();
@@ -1172,6 +1228,473 @@ srs_error_t SrsRtcFromRtmpBridger::consume_packets(vector<SrsRtpPacketCacheHelpe
return err;
}
SrsRtmpFromRtcBridger::SrsRtmpFromRtcBridger(SrsSource *src)
{
source_ = src;
codec_ = NULL;
is_first_audio = true;
is_first_video = true;
format = NULL;
key_frame_ts_ = -1;
header_sn_ = 0;
memset(cache_video_pkts_, 0, sizeof(cache_video_pkts_));
}
SrsRtmpFromRtcBridger::~SrsRtmpFromRtcBridger()
{
srs_freep(codec_);
srs_freep(format);
clear_cached_video();
}
srs_error_t SrsRtmpFromRtcBridger::initialize(SrsRequest* r)
{
srs_error_t err = srs_success;
codec_ = new SrsAudioTranscoder();
format = new SrsRtmpFormat();
SrsAudioCodecId from = SrsAudioCodecIdOpus; // TODO: From SDP?
SrsAudioCodecId to = SrsAudioCodecIdAAC; // The output audio codec.
int channels = 2; // The output audio channels.
int sample_rate = 48000; // The output audio sample rate in HZ.
int bitrate = 48000; // The output audio bitrate in bps.
if ((err = codec_->initialize(from, to, channels, sample_rate, bitrate)) != srs_success) {
return srs_error_wrap(err, "bridge initialize");
}
if ((err = format->initialize()) != srs_success) {
return srs_error_wrap(err, "format initialize");
}
return err;
}
srs_error_t SrsRtmpFromRtcBridger::on_publish()
{
srs_error_t err = srs_success;
is_first_audio = true;
is_first_video = true;
// TODO: FIXME: Should sync with bridger?
if ((err = source_->on_publish()) != srs_success) {
return srs_error_wrap(err, "source publish");
}
return err;
}
srs_error_t SrsRtmpFromRtcBridger::on_rtp(SrsRtpPacket2 *pkt)
{
srs_error_t err = srs_success;
if (!pkt->payload()) {
return err;
}
if (pkt->is_audio()) {
err = trancode_audio(pkt);
} else {
err = packet_video(pkt);
}
return err;
}
void SrsRtmpFromRtcBridger::on_unpublish()
{
// TODO: FIXME: Should sync with bridger?
source_->on_unpublish();
}
srs_error_t SrsRtmpFromRtcBridger::trancode_audio(SrsRtpPacket2 *pkt)
{
srs_error_t err = srs_success;
// Convert the RTP timestamp (48kHz Opus clock) to milliseconds for the RTMP common message.
uint32_t ts = pkt->header.get_timestamp()/(48000/1000);
if (is_first_audio) {
int header_len = 0;
uint8_t* header = NULL;
codec_->aac_codec_header(&header, &header_len);
SrsCommonMessage out_rtmp;
packet_aac(&out_rtmp, (char *)header, header_len, ts, is_first_audio);
if ((err = source_->on_audio(&out_rtmp)) != srs_success) {
return srs_error_wrap(err, "source on audio");
}
is_first_audio = false;
}
std::vector<SrsAudioFrame *> out_pkts;
SrsRtpRawPayload *payload = dynamic_cast<SrsRtpRawPayload *>(pkt->payload());
SrsAudioFrame frame;
frame.add_sample(payload->payload, payload->nn_payload);
frame.dts = ts;
frame.cts = 0;
err = codec_->transcode(&frame, out_pkts);
if (err != srs_success) {
return err;
}
for (std::vector<SrsAudioFrame *>::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) {
SrsCommonMessage out_rtmp;
out_rtmp.header.timestamp = (*it)->dts*(48000/1000);
packet_aac(&out_rtmp, (*it)->samples[0].bytes, (*it)->samples[0].size, ts, is_first_audio);
if ((err = source_->on_audio(&out_rtmp)) != srs_success) {
err = srs_error_wrap(err, "source on audio");
break;
}
}
codec_->free_frames(out_pkts);
return err;
}
void SrsRtmpFromRtcBridger::packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool is_header)
{
int rtmp_len = len + 2;
audio->header.initialize_audio(rtmp_len, pts, 1);
audio->create_payload(rtmp_len);
SrsBuffer stream(audio->payload, rtmp_len);
uint8_t aac_flag = (SrsAudioCodecIdAAC << 4) | (SrsAudioSampleRate44100 << 2) | (SrsAudioSampleBits16bit << 1) | SrsAudioChannelsStereo;
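// FLV audio tag flags: sound format (4 bits) | sample rate (2 bits) | sample size (1 bit) | channels (1 bit).
// For AAC these rate/size/channel bits are informational; decoders use the AudioSpecificConfig instead,
// so writing SrsAudioSampleRate44100 here is harmless even though the transcoder outputs 48kHz.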
stream.write_1bytes(aac_flag);
if (is_header) {
stream.write_1bytes(0);
} else {
stream.write_1bytes(1);
}
stream.write_bytes(data, len);
audio->size = rtmp_len;
}
srs_error_t SrsRtmpFromRtcBridger::packet_video(SrsRtpPacket2* src)
{
srs_error_t err = srs_success;
// TODO: Only copy when needed.
SrsRtpPacket2* pkt = src->copy();
if (pkt->is_keyframe()) {
return packet_video_key_frame(pkt);
}
// Store the packet in the sequence-indexed cache.
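// cache_index() maps the RTP sequence number into the fixed-size ring buffer
// cache_video_pkts_ (presumably sn % s_cache_size).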
int index = cache_index(pkt->header.get_sequence());
cache_video_pkts_[index].in_use = true;
cache_video_pkts_[index].pkt = pkt;
cache_video_pkts_[index].sn = pkt->header.get_sequence();
cache_video_pkts_[index].ts = pkt->header.get_timestamp();
// Check whether the lost packet has been recovered and a complete video frame can be constructed.
if (lost_sn_ == pkt->header.get_sequence()) {
uint16_t tail_sn = 0;
int sn = find_next_lost_sn(lost_sn_, tail_sn);
if (-1 == sn) {
if (check_frame_complete(header_sn_, tail_sn)) {
if ((err = packet_video_rtmp(header_sn_, tail_sn)) != srs_success) {
err = srs_error_wrap(err, "fail to pack video frame");
}
}
} else if (-2 == sn) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "video cache overflow");
} else {
lost_sn_ = (uint16_t)sn;
}
}
return err;
}
srs_error_t SrsRtmpFromRtcBridger::packet_video_key_frame(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;
// TODO: handle SPS and PPS split across two RTP packets.
SrsRtpSTAPPayload* stap_payload = dynamic_cast<SrsRtpSTAPPayload*>(pkt->payload());
if (stap_payload) {
SrsSample* sps = stap_payload->get_sps();
SrsSample* pps = stap_payload->get_pps();
if (NULL == sps || NULL == pps) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "no sps or pps in stap-a rtp. sps: %p, pps:%p", sps, pps);
} else {
// type_codec1 + avc_type + composition time + fixed header + SPS count + SPS length + SPS + PPS count + PPS length + PPS
int nb_payload = 1 + 1 + 3 + 5 + 1 + 2 + sps->size + 1 + 2 + pps->size;
SrsCommonMessage rtmp;
rtmp.header.initialize_video(nb_payload, pkt->header.get_timestamp() / 90, 1);
rtmp.create_payload(nb_payload);
rtmp.size = nb_payload;
SrsBuffer payload(rtmp.payload, rtmp.size);
// TODO: call the API instead of hand-writing bytes.
payload.write_1bytes(0x17); // type (4 bits): key frame; codec (4 bits): AVC
payload.write_1bytes(0x0); // avc_type: sequence header
payload.write_1bytes(0x0); // composition time (3 bytes, including the next two)
payload.write_1bytes(0x0);
payload.write_1bytes(0x0);
payload.write_1bytes(0x01); // version
payload.write_1bytes(sps->bytes[1]);
payload.write_1bytes(sps->bytes[2]);
payload.write_1bytes(sps->bytes[3]);
payload.write_1bytes(0xff);
payload.write_1bytes(0xe1);
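// 0xff: 6 reserved bits plus lengthSizeMinusOne = 3, i.e. 4-byte NALU length fields.
// 0xe1: 3 reserved bits plus numOfSequenceParameterSets = 1.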
payload.write_2bytes(sps->size);
payload.write_bytes(sps->bytes, sps->size);
payload.write_1bytes(0x01);
payload.write_2bytes(pps->size);
payload.write_bytes(pps->bytes, pps->size);
if ((err = source_->on_video(&rtmp)) != srs_success) {
return err;
}
}
}
if (-1 == key_frame_ts_) {
key_frame_ts_ = pkt->header.get_timestamp();
header_sn_ = pkt->header.get_sequence();
lost_sn_ = header_sn_ + 1;
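// Mark the next sequence number as the one being waited for; packet_video()
// resumes frame assembly when it arrives.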
// Received a key frame; clear cached packets of old P frames.
clear_cached_video();
srs_trace("set ts=%lld, header=%hu, lost=%hu", key_frame_ts_, header_sn_, lost_sn_);
} else if (key_frame_ts_ != pkt->header.get_timestamp()) {
// A new key frame arrived; clear the cache.
int64_t old_ts = key_frame_ts_;
uint16_t old_header_sn = header_sn_;
uint16_t old_lost_sn = lost_sn_;
key_frame_ts_ = pkt->header.get_timestamp();
header_sn_ = pkt->header.get_sequence();
lost_sn_ = header_sn_ + 1;
clear_cached_video();
srs_trace("drop old ts=%lld, header=%hu, lost=%hu, set new ts=%lld, header=%hu, lost=%hu",
old_ts, old_header_sn, old_lost_sn, key_frame_ts_, header_sn_, lost_sn_);
}
uint16_t index = cache_index(pkt->header.get_sequence());
cache_video_pkts_[index].in_use = true;
cache_video_pkts_[index].pkt = pkt;
cache_video_pkts_[index].sn = pkt->header.get_sequence();
cache_video_pkts_[index].ts = pkt->header.get_timestamp();
int32_t sn = lost_sn_;
uint16_t tail_sn = 0;
if (srs_rtp_seq_distance(header_sn_, pkt->header.get_sequence()) < 0) {
// When an earlier packet of the same frame arrives, update the header sn.
header_sn_ = pkt->header.get_sequence();
sn = find_next_lost_sn(header_sn_, tail_sn);
} else if (lost_sn_ == pkt->header.get_sequence()) {
sn = find_next_lost_sn(lost_sn_, tail_sn);
}
if (-1 == sn) {
if (check_frame_complete(header_sn_, tail_sn)) {
if ((err = packet_video_rtmp(header_sn_, tail_sn)) != srs_success) {
err = srs_error_wrap(err, "fail to packet key frame");
}
}
} else if (-2 == sn) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "video cache overflow");
} else {
lost_sn_ = (uint16_t)sn;
}
return err;
}
srs_error_t SrsRtmpFromRtcBridger::packet_video_rtmp(const uint16_t start, const uint16_t end)
{
srs_error_t err = srs_success;
// type_codec1 + avc_type + composition time + NALU size + NALU
int nb_payload = 1 + 1 + 3;
uint16_t cnt = end - start + 1;
for (uint16_t i = 0; i < cnt; ++i) {
uint16_t sn = start + i;
uint16_t index = cache_index(sn);
SrsRtpPacket2* pkt = cache_video_pkts_[index].pkt;
// calculate nalu len
SrsRtpFUAPayload2* fua_payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload());
if (fua_payload) {
if (fua_payload->start) {
nb_payload += 1 + 4;
}
nb_payload += fua_payload->size;
continue;
}
SrsRtpSTAPPayload* stap_payload = dynamic_cast<SrsRtpSTAPPayload*>(pkt->payload());
if (stap_payload) {
for (int j = 0; j < stap_payload->nalus.size(); ++j) {
SrsSample* sample = stap_payload->nalus.at(j);
nb_payload += 4 + sample->size;
}
continue;
}
SrsRtpRawPayload* raw_payload = dynamic_cast<SrsRtpRawPayload*>(pkt->payload());
if (raw_payload) {
nb_payload += 4 + raw_payload->nn_payload;
continue;
}
}
SrsCommonMessage rtmp;
SrsRtpPacket2* header = cache_video_pkts_[cache_index(start)].pkt;
rtmp.header.initialize_video(nb_payload, header->header.get_timestamp() / 90, 1);
rtmp.create_payload(nb_payload);
rtmp.size = nb_payload;
SrsBuffer payload(rtmp.payload, rtmp.size);
if (header->is_keyframe()) {
payload.write_1bytes(0x17); // type (4 bits): key frame; codec (4 bits): AVC
key_frame_ts_ = -1;
} else {
payload.write_1bytes(0x27); // type (4 bits): inter frame; codec (4 bits): AVC
}
payload.write_1bytes(0x01); // avc_type: nalu
payload.write_1bytes(0x0); // composition time
payload.write_1bytes(0x0);
payload.write_1bytes(0x0);
int nalu_len = 0;
for (uint16_t i = 0; i < cnt; ++i) {
uint16_t index = cache_index((start + i));
SrsRtpPacket2* pkt = cache_video_pkts_[index].pkt;
cache_video_pkts_[index].in_use = false;
cache_video_pkts_[index].pkt = NULL;
cache_video_pkts_[index].ts = 0;
cache_video_pkts_[index].sn = 0;
SrsRtpFUAPayload2* fua_payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload());
if (fua_payload) {
if (fua_payload->start) {
nalu_len = fua_payload->size + 1;
// Reserve 4 bytes; nalu_len is written back here after the last fragment.
payload.skip(4);
payload.write_1bytes(fua_payload->nri | fua_payload->nalu_type);
payload.write_bytes(fua_payload->payload, fua_payload->size);
} else {
nalu_len += fua_payload->size;
payload.write_bytes(fua_payload->payload, fua_payload->size);
if (fua_payload->end) {
// Write nalu_len back into the reserved 4 bytes.
payload.skip(-(4 + nalu_len));
payload.write_4bytes(nalu_len);
payload.skip(nalu_len);
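// The cursor is now back at the end of the NALU body, ready for the next NALU.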
}
}
srs_freep(pkt);
continue;
}
SrsRtpSTAPPayload* stap_payload = dynamic_cast<SrsRtpSTAPPayload*>(pkt->payload());
if (stap_payload) {
for (int j = 0; j < stap_payload->nalus.size(); ++j) {
SrsSample* sample = stap_payload->nalus.at(j);
payload.write_4bytes(sample->size);
payload.write_bytes(sample->bytes, sample->size);
}
srs_freep(pkt);
continue;
}
SrsRtpRawPayload* raw_payload = dynamic_cast<SrsRtpRawPayload*>(pkt->payload());
if (raw_payload) {
payload.write_4bytes(raw_payload->nn_payload);
payload.write_bytes(raw_payload->payload, raw_payload->nn_payload);
srs_freep(pkt);
continue;
}
srs_freep(pkt);
}
if ((err = source_->on_video(&rtmp)) != srs_success) {
srs_warn("fail to pack video frame");
}
header_sn_ = end + 1;
uint16_t tail_sn = 0;
int sn = find_next_lost_sn(header_sn_, tail_sn);
if (-1 == sn) {
if (check_frame_complete(header_sn_, tail_sn)) {
err = packet_video_rtmp(header_sn_, tail_sn);
}
} else if (-2 == sn) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "video cache overflow");
} else {
lost_sn_ = sn;
}
return err;
}
int32_t SrsRtmpFromRtcBridger::find_next_lost_sn(uint16_t current_sn, uint16_t& end_sn)
{
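// Scan forward from current_sn: return the first missing sequence number; return -1 with
// end_sn set to the frame's tail when a frame boundary (timestamp change or RTP marker) is
// reached with no gap; return -2 if the frame spans more than s_cache_size packets.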
uint32_t last_ts = cache_video_pkts_[cache_index(header_sn_)].ts;
for (int i = 0; i < s_cache_size; ++i) {
uint16_t lost_sn = current_sn + i;
int index = cache_index(lost_sn);
if (!cache_video_pkts_[index].in_use) {
return lost_sn;
}
// Check the timestamp first, to avoid decode failures when two small frames are mixed into one.
if (last_ts != cache_video_pkts_[index].ts) {
end_sn = lost_sn - 1;
return -1;
}
if (cache_video_pkts_[index].pkt->header.get_marker()) {
end_sn = lost_sn;
return -1;
}
}
srs_error("the cache is mess. the packet count of video frame is more than %u", s_cache_size);
return -2;
}
void SrsRtmpFromRtcBridger::clear_cached_video()
{
for (size_t i = 0; i < s_cache_size; i++)
{
if (cache_video_pkts_[i].in_use) {
srs_freep(cache_video_pkts_[i].pkt);
cache_video_pkts_[i].sn = 0;
cache_video_pkts_[i].ts = 0;
cache_video_pkts_[i].in_use = false;
}
}
}
bool SrsRtmpFromRtcBridger::check_frame_complete(const uint16_t start, const uint16_t end)
{
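// A frame is treated as complete when every FU-A fragmented NALU that starts also ends,
// i.e. the count of FU-A start flags equals the count of FU-A end flags within [start, end].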
uint16_t cnt = (end - start + 1);
uint16_t fu_s_c = 0;
uint16_t fu_e_c = 0;
for (uint16_t i = 0; i < cnt; ++i) {
int index = cache_index((start + i));
SrsRtpPacket2* pkt = cache_video_pkts_[index].pkt;
SrsRtpFUAPayload2* fua_payload = dynamic_cast<SrsRtpFUAPayload2*>(pkt->payload());
if (fua_payload) {
if (fua_payload->start) {
++fu_s_c;
}
if (fua_payload->end) {
++fu_e_c;
}
}
}
return fu_s_c == fu_e_c;
}
#endif
SrsCodecPayload::SrsCodecPayload()