1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Support composited bridges for 1:N protocols converting. v6.0.41 (#3392)

Co-authored-by: john <hondaxiao@tencent.com>
Co-authored-by: chundonglinlin <chundonglinlin@163.com>
This commit is contained in:
Winlin 2023-04-01 21:34:59 +08:00 committed by GitHub
parent 771ae0a1a6
commit dcd02fe69c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 770 additions and 400 deletions

View file

@ -235,15 +235,7 @@ void SrsSrtConsumer::wait(int nb_msgs, srs_utime_t timeout)
srs_cond_timedwait(mw_wait, timeout);
}
ISrsSrtSourceBridge::ISrsSrtSourceBridge()
{
}
ISrsSrtSourceBridge::~ISrsSrtSourceBridge()
{
}
SrsRtmpFromSrtBridge::SrsRtmpFromSrtBridge(SrsLiveSource* source) : ISrsSrtSourceBridge()
SrsSrtFrameBuilder::SrsSrtFrameBuilder(ISrsStreamBridge* bridge)
{
ts_ctx_ = new SrsTsContext();
@ -252,7 +244,7 @@ SrsRtmpFromSrtBridge::SrsRtmpFromSrtBridge(SrsLiveSource* source) : ISrsSrtSourc
pps_ = "";
req_ = NULL;
live_source_ = source;
bridge_ = bridge;
video_streamid_ = 1;
audio_streamid_ = 2;
@ -260,7 +252,7 @@ SrsRtmpFromSrtBridge::SrsRtmpFromSrtBridge(SrsLiveSource* source) : ISrsSrtSourc
pp_audio_duration_ = new SrsAlonePithyPrint();
}
SrsRtmpFromSrtBridge::~SrsRtmpFromSrtBridge()
SrsSrtFrameBuilder::~SrsSrtFrameBuilder()
{
srs_freep(ts_ctx_);
srs_freep(req_);
@ -268,18 +260,12 @@ SrsRtmpFromSrtBridge::~SrsRtmpFromSrtBridge()
srs_freep(pp_audio_duration_);
}
srs_error_t SrsRtmpFromSrtBridge::on_publish()
srs_error_t SrsSrtFrameBuilder::on_publish()
{
srs_error_t err = srs_success;
if ((err = live_source_->on_publish()) != srs_success) {
return srs_error_wrap(err, "on publish");
}
return err;
return srs_success;
}
srs_error_t SrsRtmpFromSrtBridge::on_packet(SrsSrtPacket *pkt)
srs_error_t SrsSrtFrameBuilder::on_packet(SrsSrtPacket *pkt)
{
srs_error_t err = srs_success;
@ -290,10 +276,10 @@ srs_error_t SrsRtmpFromSrtBridge::on_packet(SrsSrtPacket *pkt)
int nb_packet = nb_buf / SRS_TS_PACKET_SIZE;
for (int i = 0; i < nb_packet; i++) {
char* p = buf + (i * SRS_TS_PACKET_SIZE);
SrsBuffer* stream = new SrsBuffer(p, SRS_TS_PACKET_SIZE);
SrsAutoFree(SrsBuffer, stream);
// Process each ts packet. Note that the jitter of UDP may cause video glitch when packet loss or wrong seq. We
// don't handle it because SRT will, see tlpktdrop at https://ossrs.net/lts/zh-cn/docs/v4/doc/srt-params
if ((err = ts_ctx_->decode(stream, this)) != srs_success) {
@ -302,16 +288,15 @@ srs_error_t SrsRtmpFromSrtBridge::on_packet(SrsSrtPacket *pkt)
continue;
}
}
return err;
}
void SrsRtmpFromSrtBridge::on_unpublish()
void SrsSrtFrameBuilder::on_unpublish()
{
live_source_->on_unpublish();
}
srs_error_t SrsRtmpFromSrtBridge::initialize(SrsRequest* req)
srs_error_t SrsSrtFrameBuilder::initialize(SrsRequest* req)
{
srs_error_t err = srs_success;
@ -321,7 +306,7 @@ srs_error_t SrsRtmpFromSrtBridge::initialize(SrsRequest* req)
return err;
}
srs_error_t SrsRtmpFromSrtBridge::on_ts_message(SrsTsMessage* msg)
srs_error_t SrsSrtFrameBuilder::on_ts_message(SrsTsMessage* msg)
{
srs_error_t err = srs_success;
@ -369,7 +354,7 @@ srs_error_t SrsRtmpFromSrtBridge::on_ts_message(SrsTsMessage* msg)
return err;
}
srs_error_t SrsRtmpFromSrtBridge::on_ts_video_avc(SrsTsMessage* msg, SrsBuffer* avs)
srs_error_t SrsSrtFrameBuilder::on_ts_video_avc(SrsTsMessage* msg, SrsBuffer* avs)
{
srs_error_t err = srs_success;
@ -430,7 +415,7 @@ srs_error_t SrsRtmpFromSrtBridge::on_ts_video_avc(SrsTsMessage* msg, SrsBuffer*
return on_h264_frame(msg, ipb_frames);
}
srs_error_t SrsRtmpFromSrtBridge::check_sps_pps_change(SrsTsMessage* msg)
srs_error_t SrsSrtFrameBuilder::check_sps_pps_change(SrsTsMessage* msg)
{
srs_error_t err = srs_success;
@ -470,14 +455,19 @@ srs_error_t SrsRtmpFromSrtBridge::check_sps_pps_change(SrsTsMessage* msg)
return srs_error_wrap(err, "create rtmp");
}
if ((err = live_source_->on_video(&rtmp)) != srs_success) {
SrsSharedPtrMessage frame;
if ((err = frame.create(&rtmp)) != srs_success) {
return srs_error_wrap(err, "create frame");
}
if ((err = bridge_->on_frame(&frame)) != srs_success) {
return srs_error_wrap(err, "srt to rtmp sps/pps");
}
return err;
}
srs_error_t SrsRtmpFromSrtBridge::on_h264_frame(SrsTsMessage* msg, vector<pair<char*, int> >& ipb_frames)
srs_error_t SrsSrtFrameBuilder::on_h264_frame(SrsTsMessage* msg, vector<pair<char*, int> >& ipb_frames)
{
srs_error_t err = srs_success;
@ -526,7 +516,12 @@ srs_error_t SrsRtmpFromSrtBridge::on_h264_frame(SrsTsMessage* msg, vector<pair<c
payload.write_bytes(nal, nal_size);
}
if ((err = live_source_->on_video(&rtmp)) != srs_success) {
SrsSharedPtrMessage frame;
if ((err = frame.create(&rtmp)) != srs_success) {
return srs_error_wrap(err, "create frame");
}
if ((err = bridge_->on_frame(&frame)) != srs_success) {
return srs_error_wrap(err ,"srt ts video to rtmp");
}
@ -534,7 +529,7 @@ srs_error_t SrsRtmpFromSrtBridge::on_h264_frame(SrsTsMessage* msg, vector<pair<c
}
#ifdef SRS_H265
srs_error_t SrsRtmpFromSrtBridge::on_ts_video_hevc(SrsTsMessage *msg, SrsBuffer *avs)
srs_error_t SrsSrtFrameBuilder::on_ts_video_hevc(SrsTsMessage *msg, SrsBuffer *avs)
{
srs_error_t err = srs_success;
@ -610,7 +605,7 @@ srs_error_t SrsRtmpFromSrtBridge::on_ts_video_hevc(SrsTsMessage *msg, SrsBuffer
return on_hevc_frame(msg, ipb_frames);
}
srs_error_t SrsRtmpFromSrtBridge::check_vps_sps_pps_change(SrsTsMessage* msg)
srs_error_t SrsSrtFrameBuilder::check_vps_sps_pps_change(SrsTsMessage* msg)
{
srs_error_t err = srs_success;
@ -650,14 +645,19 @@ srs_error_t SrsRtmpFromSrtBridge::check_vps_sps_pps_change(SrsTsMessage* msg)
return srs_error_wrap(err, "create rtmp");
}
if ((err = live_source_->on_video(&rtmp)) != srs_success) {
SrsSharedPtrMessage frame;
if ((err = frame.create(&rtmp)) != srs_success) {
return srs_error_wrap(err, "create frame");
}
if ((err = bridge_->on_frame(&frame)) != srs_success) {
return srs_error_wrap(err, "srt to rtmp vps/sps/pps");
}
return err;
}
srs_error_t SrsRtmpFromSrtBridge::on_hevc_frame(SrsTsMessage* msg, vector<pair<char*, int> >& ipb_frames)
srs_error_t SrsSrtFrameBuilder::on_hevc_frame(SrsTsMessage* msg, vector<pair<char*, int> >& ipb_frames)
{
srs_error_t err = srs_success;
@ -713,7 +713,12 @@ srs_error_t SrsRtmpFromSrtBridge::on_hevc_frame(SrsTsMessage* msg, vector<pair<c
payload.write_bytes(nal, nal_size);
}
if ((err = live_source_->on_video(&rtmp)) != srs_success) {
SrsSharedPtrMessage frame;
if ((err = frame.create(&rtmp)) != srs_success) {
return srs_error_wrap(err, "create frame");
}
if ((err = bridge_->on_frame(&frame)) != srs_success) {
return srs_error_wrap(err ,"srt ts hevc video to rtmp");
}
@ -721,7 +726,7 @@ srs_error_t SrsRtmpFromSrtBridge::on_hevc_frame(SrsTsMessage* msg, vector<pair<c
}
#endif
srs_error_t SrsRtmpFromSrtBridge::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs)
srs_error_t SrsSrtFrameBuilder::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs)
{
srs_error_t err = srs_success;
@ -794,7 +799,7 @@ srs_error_t SrsRtmpFromSrtBridge::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs)
return err;
}
srs_error_t SrsRtmpFromSrtBridge::check_audio_sh_change(SrsTsMessage* msg, uint32_t pts)
srs_error_t SrsSrtFrameBuilder::check_audio_sh_change(SrsTsMessage* msg, uint32_t pts)
{
srs_error_t err = srs_success;
@ -817,19 +822,24 @@ srs_error_t SrsRtmpFromSrtBridge::check_audio_sh_change(SrsTsMessage* msg, uint3
stream.write_1bytes(aac_flag);
stream.write_1bytes(0);
stream.write_bytes((char*)audio_sh_.data(), audio_sh_.size());
SrsSharedPtrMessage frame;
if ((err = frame.create(&rtmp)) != srs_success) {
return srs_error_wrap(err, "create frame");
}
if ((err = live_source_->on_audio(&rtmp)) != srs_success) {
if ((err = bridge_->on_frame(&frame)) != srs_success) {
return srs_error_wrap(err, "srt to rtmp audio sh");
}
return err;
}
srs_error_t SrsRtmpFromSrtBridge::on_aac_frame(SrsTsMessage* msg, uint32_t pts, char* frame, int frame_size)
srs_error_t SrsSrtFrameBuilder::on_aac_frame(SrsTsMessage* msg, uint32_t pts, char* data, int data_size)
{
srs_error_t err = srs_success;
int rtmp_len = frame_size + 2/* 2 bytes of flv audio tag header*/;
int rtmp_len = data_size + 2/* 2 bytes of flv audio tag header*/;
SrsCommonMessage rtmp;
rtmp.header.initialize_audio(rtmp_len, pts, audio_streamid_);
@ -842,9 +852,14 @@ srs_error_t SrsRtmpFromSrtBridge::on_aac_frame(SrsTsMessage* msg, uint32_t pts,
stream.write_1bytes(aac_flag);
stream.write_1bytes(1);
// Write audio frame.
stream.write_bytes(frame, frame_size);
stream.write_bytes(data, data_size);
SrsSharedPtrMessage frame;
if ((err = frame.create(&rtmp)) != srs_success) {
return srs_error_wrap(err, "create frame");
}
if ((err = live_source_->on_audio(&rtmp)) != srs_success) {
if ((err = bridge_->on_frame(&frame)) != srs_success) {
return srs_error_wrap(err, "srt to rtmp audio sh");
}
@ -855,6 +870,7 @@ SrsSrtSource::SrsSrtSource()
{
req = NULL;
can_publish_ = true;
frame_builder_ = NULL;
bridge_ = NULL;
}
@ -864,6 +880,7 @@ SrsSrtSource::~SrsSrtSource()
// for all consumers are auto free.
consumers.clear();
srs_freep(frame_builder_);
srs_freep(bridge_);
srs_freep(req);
}
@ -915,10 +932,13 @@ void SrsSrtSource::update_auth(SrsRequest* r)
req->update_auth(r);
}
void SrsSrtSource::set_bridge(ISrsSrtSourceBridge* bridge)
void SrsSrtSource::set_bridge(ISrsStreamBridge* bridge)
{
srs_freep(bridge_);
bridge_ = bridge;
srs_freep(frame_builder_);
frame_builder_ = new SrsSrtFrameBuilder(bridge);
}
srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer*& consumer)
@ -965,8 +985,18 @@ srs_error_t SrsSrtSource::on_publish()
return srs_error_wrap(err, "source id change");
}
if (bridge_ && (err = bridge_->on_publish()) != srs_success) {
return srs_error_wrap(err, "bridge on publish");
if (bridge_) {
if ((err = frame_builder_->initialize(req)) != srs_success) {
return srs_error_wrap(err, "frame builder initialize");
}
if ((err = frame_builder_->on_publish()) != srs_success) {
return srs_error_wrap(err, "frame builder on publish");
}
if ((err = bridge_->on_publish()) != srs_success) {
return srs_error_wrap(err, "bridge on publish");
}
}
SrsStatistic* stat = SrsStatistic::instance();
@ -985,9 +1015,12 @@ void SrsSrtSource::on_unpublish()
can_publish_ = true;
if (bridge_) {
frame_builder_->on_unpublish();
srs_freep(frame_builder_);
bridge_->on_unpublish();
srs_freep(bridge_);
}
srs_freep(bridge_);
}
srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet)
@ -1001,7 +1034,7 @@ srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet)
}
}
if (bridge_ && (err = bridge_->on_packet(packet)) != srs_success) {
if (frame_builder_ && (err = frame_builder_->on_packet(packet)) != srs_success) {
return srs_error_wrap(err, "bridge consume message");
}