From 9e417d54167d072bb9bc0aab9f0f8814e66e7e39 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 12 May 2020 19:53:21 +0800 Subject: [PATCH] Refine SrsSource, bridge to RTC. --- trunk/src/app/srs_app_rtc_source.cpp | 61 +++++++++++++++++++---- trunk/src/app/srs_app_rtc_source.hpp | 20 +++++++- trunk/src/app/srs_app_source.cpp | 72 ++++++++++++++++++++++++++-- trunk/src/app/srs_app_source.hpp | 19 +++++++- 4 files changed, 155 insertions(+), 17 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index d94ba9da4..ff526c527 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -152,6 +152,7 @@ SrsRtcSource::SrsRtcSource() _can_publish = true; rtc_publisher_ = NULL; + bridger_ = new SrsRtcFromRtmpBridger(this); } SrsRtcSource::~SrsRtcSource() @@ -165,6 +166,7 @@ SrsRtcSource::~SrsRtcSource() srs_freep(rtc); srs_freep(req); + srs_freep(bridger_); } srs_error_t SrsRtcSource::initialize(SrsRequest* r) @@ -225,6 +227,11 @@ int SrsRtcSource::pre_source_id() return _pre_source_id; } +ISrsSourceBridger* SrsRtcSource::bridger() +{ + return bridger_; +} + srs_error_t SrsRtcSource::create_consumer(SrsConnection* conn, SrsRtcConsumer*& consumer) { srs_error_t err = srs_success; @@ -382,6 +389,19 @@ srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg) } } + // TODO: FIXME: Support parsing OPUS for RTC. + if ((err = format->on_audio(msg)) != srs_success) { + return srs_error_wrap(err, "format consume audio"); + } + + // Parse RTMP message to RTP packets, in FU-A if too large. + if ((err = rtc->on_audio(msg, format)) != srs_success) { + // TODO: We should support more strategies. + srs_warn("rtc: ignore audio error %s", srs_error_desc(err).c_str()); + srs_error_reset(err); + rtc->on_unpublish(); + } + // copy to all consumer if (!drop_for_reduce) { for (int i = 0; i < (int)consumers.size(); i++) { @@ -401,11 +421,6 @@ srs_error_t SrsRtcSource::on_audio_imp(SrsSharedPtrMessage* msg) } } - // when sequence header, donot push to gop cache and adjust the timestamp. - if (is_sequence_header) { - return err; - } - // if atc, update the sequence header to abs time. if (meta->ash()) { meta->ash()->timestamp = msg->timestamp; @@ -465,11 +480,6 @@ srs_error_t SrsRtcSource::on_video_imp(SrsSharedPtrMessage* msg) } } - // when sequence header, donot push to gop cache and adjust the timestamp. - if (is_sequence_header) { - return err; - } - // if atc, update the sequence header to abs time. if (meta->vsh()) { meta->vsh()->timestamp = msg->timestamp; @@ -551,3 +561,34 @@ SrsRtcSource* SrsRtcSourceManager::fetch(SrsRequest* r) SrsRtcSourceManager* _srs_rtc_sources = new SrsRtcSourceManager(); +SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcSource* source) +{ + source_ = source; +} + +SrsRtcFromRtmpBridger::~SrsRtcFromRtmpBridger() +{ +} + +srs_error_t SrsRtcFromRtmpBridger::on_publish() +{ + // TODO: FIXME: Should sync with bridger? + return source_->on_publish(); +} + +srs_error_t SrsRtcFromRtmpBridger::on_audio(SrsSharedPtrMessage* audio) +{ + return source_->on_audio_imp(audio); +} + +srs_error_t SrsRtcFromRtmpBridger::on_video(SrsSharedPtrMessage* video) +{ + return source_->on_video_imp(video); +} + +void SrsRtcFromRtmpBridger::on_unpublish() +{ + // TODO: FIXME: Should sync with bridger? + source_->on_unpublish(); +} + diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index 8c5bd1b48..e04474490 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -40,6 +40,7 @@ class SrsSharedPtrMessage; class SrsCommonMessage; class SrsMessageArray; class SrsRtcSource; +class SrsRtcFromRtmpBridger; class SrsRtcConsumer : public ISrsConsumerQueue { @@ -94,6 +95,8 @@ private: int _pre_source_id; SrsRequest* req; SrsRtcPublisher* rtc_publisher_; + // Transmux RTMP to RTC. + SrsRtcFromRtmpBridger* bridger_; private: // To delivery stream to clients. std::vector consumers; @@ -118,6 +121,8 @@ public: // Get current source id. virtual int source_id(); virtual int pre_source_id(); + // Get the bridger. + ISrsSourceBridger* bridger(); public: // Create consumer // @param consumer, output the create consumer. @@ -144,7 +149,6 @@ public: // TODO: FIXME: Merge with on_audio. srs_error_t on_rtc_audio(SrsSharedPtrMessage* audio); virtual srs_error_t on_video(SrsCommonMessage* video); -private: virtual srs_error_t on_audio_imp(SrsSharedPtrMessage* audio); virtual srs_error_t on_video_imp(SrsSharedPtrMessage* video); }; @@ -171,5 +175,19 @@ private: // Global singleton instance. extern SrsRtcSourceManager* _srs_rtc_sources; +class SrsRtcFromRtmpBridger : public ISrsSourceBridger +{ +private: + SrsRtcSource* source_; +public: + SrsRtcFromRtmpBridger(SrsRtcSource* source); + virtual ~SrsRtcFromRtmpBridger(); +public: + virtual srs_error_t on_publish(); + virtual srs_error_t on_audio(SrsSharedPtrMessage* audio); + virtual srs_error_t on_video(SrsSharedPtrMessage* video); + virtual void on_unpublish(); +}; + #endif diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index caa58a140..9f1ebe83a 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -50,6 +50,7 @@ using namespace std; #include #include #include +#include #define CONST_MAX_JITTER_MS 250 #define CONST_MAX_JITTER_MS_NEG -250 @@ -1715,6 +1716,7 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* // Use lock to protect coroutine switch. // @bug https://github.com/ossrs/srs/issues/1230 + // TODO: FIXME: Use smaller lock. SrsLocker(lock); SrsSource* source = NULL; @@ -1729,17 +1731,41 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* // should always not exists for create a source. srs_assert (pool.find(stream_url) == pool.end()); +#ifdef SRS_RTC + bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); + bool rtc_enabled = _srs_config->get_rtc_enabled(r->vhost); + + // Get the RTC source and bridger. + SrsRtcSource* rtc = NULL; + if (rtc_server_enabled && rtc_enabled) { + if ((err = _srs_rtc_sources->fetch_or_create(r, &rtc)) != srs_success) { + err = srs_error_wrap(err, "init rtc %s", r->get_stream_url().c_str()); + goto failed; + } + } +#endif srs_trace("new source, stream_url=%s", stream_url.c_str()); - + source = new SrsSource(); if ((err = source->initialize(r, h)) != srs_success) { - return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); + err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); + goto failed; } + +#ifdef SRS_RTC + // If rtc enabled, bridge RTMP source to RTC, + // all RTMP packets will be forwarded to RTC source. + if (source && rtc) { + source->bridge_to(rtc->bridger()); + } +#endif pool[stream_url] = source; - *pps = source; - + return err; + +failed: + srs_freep(source); return err; } @@ -1832,6 +1858,14 @@ void SrsSourceManager::destroy() pool.clear(); } +ISrsSourceBridger::ISrsSourceBridger() +{ +} + +ISrsSourceBridger::~ISrsSourceBridger() +{ +} + SrsSource::SrsSource() { req = NULL; @@ -1842,6 +1876,9 @@ SrsSource::SrsSource() _can_publish = true; _pre_source_id = _source_id = -1; die_at = 0; + + handler = NULL; + bridger = NULL; play_edge = new SrsPlayEdge(); publish_edge = new SrsPublishEdge(); @@ -1948,6 +1985,11 @@ srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h) return err; } +void SrsSource::set_bridger(ISrsSourceBridger* v) +{ + bridger = v; +} + srs_error_t SrsSource::on_reload_vhost_play(string vhost) { srs_error_t err = srs_success; @@ -2198,6 +2240,11 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) return srs_error_wrap(err, "consume audio"); } + // For bridger to consume the message. + if (bridger && (err = bridger->on_audio(msg)) != srs_success) { + return srs_error_wrap(err, "bridger consume audio"); + } + // copy to all consumer if (!drop_for_reduce) { for (int i = 0; i < (int)consumers.size(); i++) { @@ -2226,7 +2273,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) if ((err = gop_cache->cache(msg)) != srs_success) { return srs_error_wrap(err, "gop cache consume audio"); } - + // if atc, update the sequence header to abs time. if (atc) { if (meta->ash()) { @@ -2323,6 +2370,11 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg) return srs_error_wrap(err, "hub consume video"); } + // For bridger to consume the message. + if (bridger && (err = bridger->on_video(msg)) != srs_success) { + return srs_error_wrap(err, "bridger consume video"); + } + // copy to all consumer if (!drop_for_reduce) { for (int i = 0; i < (int)consumers.size(); i++) { @@ -2482,6 +2534,11 @@ srs_error_t SrsSource::on_publish() if ((err = handler->on_publish(this, req)) != srs_success) { return srs_error_wrap(err, "handle publish"); } + + if (bridger && (err = bridger->on_publish()) != srs_success) { + return srs_error_wrap(err, "bridger publish"); + } + SrsStatistic* stat = SrsStatistic::instance(); stat->on_stream_publish(req, _source_id); @@ -2517,7 +2574,12 @@ void SrsSource::on_unpublish() srs_assert(handler); SrsStatistic* stat = SrsStatistic::instance(); stat->on_stream_close(req); + handler->on_unpublish(this, req); + + if (bridger) { + bridger->on_unpublish(); + } // no consumer, stream is die. if (consumers.empty()) { diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 4e54666ad..be84b1328 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -488,7 +488,7 @@ public: private: virtual srs_error_t do_cycle(); public: - // when system exit, destroy the sources, + // when system exit, destroy th`e sources, // For gmc to analysis mem leaks. virtual void destroy(); }; @@ -496,6 +496,19 @@ public: // Global singleton instance. extern SrsSourceManager* _srs_sources; +// For two sources to bridge with each other. +class ISrsSourceBridger +{ +public: + ISrsSourceBridger(); + virtual ~ISrsSourceBridger(); +public: + virtual srs_error_t on_publish() = 0; + virtual srs_error_t on_audio(SrsSharedPtrMessage* audio) = 0; + virtual srs_error_t on_video(SrsSharedPtrMessage* video) = 0; + virtual void on_unpublish() = 0; +}; + // live streaming source. class SrsSource : public ISrsReloadHandler { @@ -529,6 +542,8 @@ private: int64_t last_packet_time; // The event handler. ISrsSourceHandler* handler; + // The source bridger for other source. + ISrsSourceBridger* bridger; // The edge control service SrsPlayEdge* play_edge; SrsPublishEdge* publish_edge; @@ -555,6 +570,8 @@ public: public: // Initialize the hls with handlers. virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h); + // Bridge to other source, forward packets to it. + void bridge_to(ISrsSourceBridger* v); // Interface ISrsReloadHandler public: virtual srs_error_t on_reload_vhost_play(std::string vhost);