1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Bridger: Start RTMP2RTC bridger in RTMP publisher

This commit is contained in:
winlin 2021-04-21 19:17:56 +08:00 committed by Winlin
parent c10232b4e2
commit c770e6d7bc
5 changed files with 50 additions and 139 deletions

View file

@ -337,7 +337,6 @@ SrsRtcStream::SrsRtcStream()
stream_desc_ = NULL; stream_desc_ = NULL;
req = NULL; req = NULL;
bridger_ = new SrsRtcDummyBridger(this);
} }
SrsRtcStream::~SrsRtcStream() SrsRtcStream::~SrsRtcStream()
@ -347,7 +346,6 @@ SrsRtcStream::~SrsRtcStream()
consumers.clear(); consumers.clear();
srs_freep(req); srs_freep(req);
srs_freep(bridger_);
srs_freep(stream_desc_); srs_freep(stream_desc_);
} }
@ -408,11 +406,6 @@ SrsContextId SrsRtcStream::pre_source_id()
return _pre_source_id; return _pre_source_id;
} }
ISrsSourceBridger* SrsRtcStream::bridger()
{
return bridger_;
}
srs_error_t SrsRtcStream::create_consumer(SrsRtcConsumer*& consumer) srs_error_t SrsRtcStream::create_consumer(SrsRtcConsumer*& consumer)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -477,16 +470,6 @@ srs_error_t SrsRtcStream::on_publish()
is_created_ = true; is_created_ = true;
is_delivering_packets_ = true; is_delivering_packets_ = true;
// Create a new bridger, because it's been disposed when unpublish.
#ifdef SRS_FFMPEG_FIT
SrsRtcFromRtmpBridger* impl = new SrsRtcFromRtmpBridger(this);
if ((err = impl->initialize(req)) != srs_success) {
return srs_error_wrap(err, "bridge initialize");
}
bridger_->setup(impl);
#endif
// Notify the consumers about stream change event. // Notify the consumers about stream change event.
if ((err = on_source_changed()) != srs_success) { if ((err = on_source_changed()) != srs_success) {
return srs_error_wrap(err, "source id change"); return srs_error_wrap(err, "source id change");
@ -522,11 +505,6 @@ void SrsRtcStream::on_unpublish()
// release unpublish stream description. // release unpublish stream description.
set_stream_desc(NULL); set_stream_desc(NULL);
// Dispose the impl of bridger, to free memory.
#ifdef SRS_FFMPEG_FIT
bridger_->setup(NULL);
#endif
// TODO: FIXME: Handle by statistic. // TODO: FIXME: Handle by statistic.
} }
@ -1196,56 +1174,6 @@ srs_error_t SrsRtcFromRtmpBridger::consume_packets(vector<SrsRtpPacketCacheHelpe
} }
#endif #endif
SrsRtcDummyBridger::SrsRtcDummyBridger(SrsRtcStream* s)
{
rtc_ = s;
impl_ = NULL;
}
SrsRtcDummyBridger::~SrsRtcDummyBridger()
{
srs_freep(impl_);
}
srs_error_t SrsRtcDummyBridger::on_publish()
{
if (impl_) {
return impl_->on_publish();
}
return rtc_->on_publish();
}
srs_error_t SrsRtcDummyBridger::on_audio(SrsSharedPtrMessage* audio)
{
if (impl_) {
return impl_->on_audio(audio);
}
return srs_success;
}
srs_error_t SrsRtcDummyBridger::on_video(SrsSharedPtrMessage* video)
{
if (impl_) {
return impl_->on_video(video);
}
return srs_success;
}
void SrsRtcDummyBridger::on_unpublish()
{
if (impl_) {
impl_->on_unpublish();
return;
}
rtc_->on_unpublish();
}
void SrsRtcDummyBridger::setup(ISrsSourceBridger* impl)
{
srs_freep(impl_);
impl_ = impl;
}
SrsCodecPayload::SrsCodecPayload() SrsCodecPayload::SrsCodecPayload()
{ {
pt_of_publisher_ = pt_ = 0; pt_of_publisher_ = pt_ = 0;

View file

@ -56,7 +56,6 @@ class SrsRtpRingBuffer;
class SrsRtpNackForReceiver; class SrsRtpNackForReceiver;
class SrsJsonObject; class SrsJsonObject;
class SrsErrorPithyPrint; class SrsErrorPithyPrint;
class SrsRtcDummyBridger;
class SrsNtp class SrsNtp
{ {
@ -177,8 +176,6 @@ private:
SrsContextId _pre_source_id; SrsContextId _pre_source_id;
SrsRequest* req; SrsRequest* req;
ISrsRtcPublishStream* publish_stream_; ISrsRtcPublishStream* publish_stream_;
// Transmux RTMP to RTC.
SrsRtcDummyBridger* bridger_;
// Steam description for this steam. // Steam description for this steam.
SrsRtcStreamDescription* stream_desc_; SrsRtcStreamDescription* stream_desc_;
private: private:
@ -204,8 +201,6 @@ public:
// Get current source id. // Get current source id.
virtual SrsContextId source_id(); virtual SrsContextId source_id();
virtual SrsContextId pre_source_id(); virtual SrsContextId pre_source_id();
// Get the bridger.
ISrsSourceBridger* bridger();
public: public:
// Create consumer // Create consumer
// @param consumer, output the create consumer. // @param consumer, output the create consumer.
@ -293,25 +288,6 @@ private:
}; };
#endif #endif
class SrsRtcDummyBridger : public ISrsSourceBridger
{
private:
SrsRtcStream* rtc_;
// The optional implementation bridger, ignore if NULL.
ISrsSourceBridger* impl_;
public:
SrsRtcDummyBridger(SrsRtcStream* s);
virtual ~SrsRtcDummyBridger();
public:
virtual srs_error_t on_publish();
virtual srs_error_t on_audio(SrsSharedPtrMessage* audio);
virtual srs_error_t on_video(SrsSharedPtrMessage* video);
virtual void on_unpublish();
public:
// Setup a new implementation bridger, which might be NULL to free previous one.
void setup(ISrsSourceBridger* impl);
};
// TODO: FIXME: Rename it. // TODO: FIXME: Rename it.
class SrsCodecPayload class SrsCodecPayload
{ {

View file

@ -55,6 +55,7 @@ using namespace std;
#include <srs_app_statistic.hpp> #include <srs_app_statistic.hpp>
#include <srs_protocol_utility.hpp> #include <srs_protocol_utility.hpp>
#include <srs_protocol_json.hpp> #include <srs_protocol_json.hpp>
#include <srs_app_rtc_source.hpp>
// the timeout in srs_utime_t to wait encoder to republish // the timeout in srs_utime_t to wait encoder to republish
// if timeout, close the connection. // if timeout, close the connection.
@ -959,23 +960,47 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSource* source)
srs_error_t err = srs_success; srs_error_t err = srs_success;
SrsRequest* req = info->req; SrsRequest* req = info->req;
// Check whether RTC stream is busy.
#ifdef SRS_RTC
SrsRtcStream *rtc = NULL;
bool rtc_server_enabled = _srs_config->get_rtc_server_enabled();
bool rtc_enabled = _srs_config->get_rtc_enabled(req->vhost);
if (rtc_server_enabled && rtc_enabled && !info->edge) {
if ((err = _srs_rtc_sources->fetch_or_create(req, &rtc)) != srs_success) {
return srs_error_wrap(err, "create source");
}
if (!rtc->can_publish()) {
return srs_error_new(ERROR_RTC_SOURCE_BUSY, "rtc stream %s busy", req->get_stream_url().c_str());
}
}
#endif
// Check whether RTMP stream is busy.
if (!source->can_publish(info->edge)) { if (!source->can_publish(info->edge)) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str()); return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str());
} }
// when edge, ignore the publish event, directly proxy it. // Bridge to RTC streaming.
if (info->edge) { #ifdef SRS_RTC
if ((err = source->on_edge_start_publish()) != srs_success) { if (rtc) {
return srs_error_wrap(err, "rtmp: edge start publish"); SrsRtcFromRtmpBridger *bridger = new SrsRtcFromRtmpBridger(rtc);
} if ((err = bridger->initialize(req)) != srs_success) {
} else { srs_freep(bridger);
if ((err = source->on_publish()) != srs_success) { return srs_error_wrap(err, "bridger init");
return srs_error_wrap(err, "rtmp: source publish");
} }
source->set_bridger(bridger);
}
#endif
// Start publisher now.
if (info->edge) {
return source->on_edge_start_publish();
} else {
return source->on_publish();
} }
return err;
} }
void SrsRtmpConn::release_publish(SrsSource* source) void SrsRtmpConn::release_publish(SrsSource* source)

View file

@ -1728,19 +1728,6 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler*
// should always not exists for create a source. // should always not exists for create a source.
srs_assert (pool.find(stream_url) == pool.end()); srs_assert (pool.find(stream_url) == pool.end());
#ifdef SRS_RTC
bool rtc_server_enabled = _srs_config->get_rtc_server_enabled();
bool rtc_enabled = _srs_config->get_rtc_enabled(r->vhost);
// Get the RTC source and bridger.
SrsRtcStream* rtc = NULL;
if (rtc_server_enabled && rtc_enabled) {
if ((err = _srs_rtc_sources->fetch_or_create(r, &rtc)) != srs_success) {
err = srs_error_wrap(err, "init rtc %s", r->get_stream_url().c_str());
goto failed;
}
}
#endif
srs_trace("new source, stream_url=%s", stream_url.c_str()); srs_trace("new source, stream_url=%s", stream_url.c_str());
source = new SrsSource(); source = new SrsSource();
@ -1748,14 +1735,6 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler*
err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
goto failed; goto failed;
} }
#ifdef SRS_RTC
// If rtc enabled, bridge RTMP source to RTC,
// all RTMP packets will be forwarded to RTC source.
if (source && rtc) {
source->bridge_to(rtc->bridger());
}
#endif
pool[stream_url] = source; pool[stream_url] = source;
*pps = source; *pps = source;
@ -1883,7 +1862,7 @@ SrsSource::SrsSource()
die_at = 0; die_at = 0;
handler = NULL; handler = NULL;
bridger = NULL; bridger_ = NULL;
play_edge = new SrsPlayEdge(); play_edge = new SrsPlayEdge();
publish_edge = new SrsPublishEdge(); publish_edge = new SrsPublishEdge();
@ -1915,6 +1894,7 @@ SrsSource::~SrsSource()
srs_freep(gop_cache); srs_freep(gop_cache);
srs_freep(req); srs_freep(req);
srs_freep(bridger_);
} }
void SrsSource::dispose() void SrsSource::dispose()
@ -1990,9 +1970,10 @@ srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
return err; return err;
} }
void SrsSource::bridge_to(ISrsSourceBridger* v) void SrsSource::set_bridger(ISrsSourceBridger* v)
{ {
bridger = v; srs_freep(bridger_);
bridger_ = v;
} }
srs_error_t SrsSource::on_reload_vhost_play(string vhost) srs_error_t SrsSource::on_reload_vhost_play(string vhost)
@ -2245,7 +2226,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
} }
// For bridger to consume the message. // For bridger to consume the message.
if (bridger && (err = bridger->on_audio(msg)) != srs_success) { if (bridger_ && (err = bridger_->on_audio(msg)) != srs_success) {
return srs_error_wrap(err, "bridger consume audio"); return srs_error_wrap(err, "bridger consume audio");
} }
@ -2375,7 +2356,7 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
} }
// For bridger to consume the message. // For bridger to consume the message.
if (bridger && (err = bridger->on_video(msg)) != srs_success) { if (bridger_ && (err = bridger_->on_video(msg)) != srs_success) {
return srs_error_wrap(err, "bridger consume video"); return srs_error_wrap(err, "bridger consume video");
} }
@ -2539,7 +2520,7 @@ srs_error_t SrsSource::on_publish()
return srs_error_wrap(err, "handle publish"); return srs_error_wrap(err, "handle publish");
} }
if (bridger && (err = bridger->on_publish()) != srs_success) { if (bridger_ && (err = bridger_->on_publish()) != srs_success) {
return srs_error_wrap(err, "bridger publish"); return srs_error_wrap(err, "bridger publish");
} }
@ -2584,8 +2565,9 @@ void SrsSource::on_unpublish()
handler->on_unpublish(this, req); handler->on_unpublish(this, req);
if (bridger) { if (bridger_) {
bridger->on_unpublish(); bridger_->on_unpublish();
srs_freep(bridger_);
} }
// no consumer, stream is die. // no consumer, stream is die.

View file

@ -535,7 +535,7 @@ private:
// The event handler. // The event handler.
ISrsSourceHandler* handler; ISrsSourceHandler* handler;
// The source bridger for other source. // The source bridger for other source.
ISrsSourceBridger* bridger; ISrsSourceBridger* bridger_;
// The edge control service // The edge control service
SrsPlayEdge* play_edge; SrsPlayEdge* play_edge;
SrsPublishEdge* publish_edge; SrsPublishEdge* publish_edge;
@ -563,7 +563,7 @@ public:
// Initialize the hls with handlers. // Initialize the hls with handlers.
virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h); virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h);
// Bridge to other source, forward packets to it. // Bridge to other source, forward packets to it.
void bridge_to(ISrsSourceBridger* v); void set_bridger(ISrsSourceBridger* v);
// Interface ISrsReloadHandler // Interface ISrsReloadHandler
public: public:
virtual srs_error_t on_reload_vhost_play(std::string vhost); virtual srs_error_t on_reload_vhost_play(std::string vhost);