diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 4e2108d9d..8570252be 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -337,7 +337,6 @@ SrsRtcStream::SrsRtcStream() stream_desc_ = NULL; req = NULL; - bridger_ = new SrsRtcDummyBridger(this); } SrsRtcStream::~SrsRtcStream() @@ -347,7 +346,6 @@ SrsRtcStream::~SrsRtcStream() consumers.clear(); srs_freep(req); - srs_freep(bridger_); srs_freep(stream_desc_); } @@ -408,11 +406,6 @@ SrsContextId SrsRtcStream::pre_source_id() return _pre_source_id; } -ISrsSourceBridger* SrsRtcStream::bridger() -{ - return bridger_; -} - srs_error_t SrsRtcStream::create_consumer(SrsRtcConsumer*& consumer) { srs_error_t err = srs_success; @@ -477,16 +470,6 @@ srs_error_t SrsRtcStream::on_publish() is_created_ = true; is_delivering_packets_ = true; - // Create a new bridger, because it's been disposed when unpublish. -#ifdef SRS_FFMPEG_FIT - SrsRtcFromRtmpBridger* impl = new SrsRtcFromRtmpBridger(this); - if ((err = impl->initialize(req)) != srs_success) { - return srs_error_wrap(err, "bridge initialize"); - } - - bridger_->setup(impl); -#endif - // Notify the consumers about stream change event. if ((err = on_source_changed()) != srs_success) { return srs_error_wrap(err, "source id change"); @@ -522,11 +505,6 @@ void SrsRtcStream::on_unpublish() // release unpublish stream description. set_stream_desc(NULL); - // Dispose the impl of bridger, to free memory. -#ifdef SRS_FFMPEG_FIT - bridger_->setup(NULL); -#endif - // TODO: FIXME: Handle by statistic. } @@ -1196,56 +1174,6 @@ srs_error_t SrsRtcFromRtmpBridger::consume_packets(vectoron_publish(); - } - return rtc_->on_publish(); -} - -srs_error_t SrsRtcDummyBridger::on_audio(SrsSharedPtrMessage* audio) -{ - if (impl_) { - return impl_->on_audio(audio); - } - return srs_success; -} - -srs_error_t SrsRtcDummyBridger::on_video(SrsSharedPtrMessage* video) -{ - if (impl_) { - return impl_->on_video(video); - } - return srs_success; -} - -void SrsRtcDummyBridger::on_unpublish() -{ - if (impl_) { - impl_->on_unpublish(); - return; - } - rtc_->on_unpublish(); -} - -void SrsRtcDummyBridger::setup(ISrsSourceBridger* impl) -{ - srs_freep(impl_); - impl_ = impl; -} - SrsCodecPayload::SrsCodecPayload() { pt_of_publisher_ = pt_ = 0; diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index 1495f3eaf..a7eb4abfc 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -56,7 +56,6 @@ class SrsRtpRingBuffer; class SrsRtpNackForReceiver; class SrsJsonObject; class SrsErrorPithyPrint; -class SrsRtcDummyBridger; class SrsNtp { @@ -177,8 +176,6 @@ private: SrsContextId _pre_source_id; SrsRequest* req; ISrsRtcPublishStream* publish_stream_; - // Transmux RTMP to RTC. - SrsRtcDummyBridger* bridger_; // Steam description for this steam. SrsRtcStreamDescription* stream_desc_; private: @@ -204,8 +201,6 @@ public: // Get current source id. virtual SrsContextId source_id(); virtual SrsContextId pre_source_id(); - // Get the bridger. - ISrsSourceBridger* bridger(); public: // Create consumer // @param consumer, output the create consumer. @@ -293,25 +288,6 @@ private: }; #endif -class SrsRtcDummyBridger : public ISrsSourceBridger -{ -private: - SrsRtcStream* rtc_; - // The optional implementation bridger, ignore if NULL. - ISrsSourceBridger* impl_; -public: - SrsRtcDummyBridger(SrsRtcStream* s); - virtual ~SrsRtcDummyBridger(); -public: - virtual srs_error_t on_publish(); - virtual srs_error_t on_audio(SrsSharedPtrMessage* audio); - virtual srs_error_t on_video(SrsSharedPtrMessage* video); - virtual void on_unpublish(); -public: - // Setup a new implementation bridger, which might be NULL to free previous one. - void setup(ISrsSourceBridger* impl); -}; - // TODO: FIXME: Rename it. class SrsCodecPayload { diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index e82ca3fbb..4cf58610f 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -55,6 +55,7 @@ using namespace std; #include #include #include +#include // the timeout in srs_utime_t to wait encoder to republish // if timeout, close the connection. @@ -959,23 +960,47 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSource* source) srs_error_t err = srs_success; SrsRequest* req = info->req; - + + // Check whether RTC stream is busy. +#ifdef SRS_RTC + SrsRtcStream *rtc = NULL; + bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); + bool rtc_enabled = _srs_config->get_rtc_enabled(req->vhost); + if (rtc_server_enabled && rtc_enabled && !info->edge) { + if ((err = _srs_rtc_sources->fetch_or_create(req, &rtc)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + if (!rtc->can_publish()) { + return srs_error_new(ERROR_RTC_SOURCE_BUSY, "rtc stream %s busy", req->get_stream_url().c_str()); + } + } +#endif + + // Check whether RTMP stream is busy. if (!source->can_publish(info->edge)) { return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str()); } - - // when edge, ignore the publish event, directly proxy it. - if (info->edge) { - if ((err = source->on_edge_start_publish()) != srs_success) { - return srs_error_wrap(err, "rtmp: edge start publish"); - } - } else { - if ((err = source->on_publish()) != srs_success) { - return srs_error_wrap(err, "rtmp: source publish"); + + // Bridge to RTC streaming. +#ifdef SRS_RTC + if (rtc) { + SrsRtcFromRtmpBridger *bridger = new SrsRtcFromRtmpBridger(rtc); + if ((err = bridger->initialize(req)) != srs_success) { + srs_freep(bridger); + return srs_error_wrap(err, "bridger init"); } + + source->set_bridger(bridger); + } +#endif + + // Start publisher now. + if (info->edge) { + return source->on_edge_start_publish(); + } else { + return source->on_publish(); } - - return err; } void SrsRtmpConn::release_publish(SrsSource* source) diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 1a9553d41..ce09e6614 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1728,19 +1728,6 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* // should always not exists for create a source. srs_assert (pool.find(stream_url) == pool.end()); -#ifdef SRS_RTC - bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); - bool rtc_enabled = _srs_config->get_rtc_enabled(r->vhost); - - // Get the RTC source and bridger. - SrsRtcStream* rtc = NULL; - if (rtc_server_enabled && rtc_enabled) { - if ((err = _srs_rtc_sources->fetch_or_create(r, &rtc)) != srs_success) { - err = srs_error_wrap(err, "init rtc %s", r->get_stream_url().c_str()); - goto failed; - } - } -#endif srs_trace("new source, stream_url=%s", stream_url.c_str()); source = new SrsSource(); @@ -1748,14 +1735,6 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); goto failed; } - -#ifdef SRS_RTC - // If rtc enabled, bridge RTMP source to RTC, - // all RTMP packets will be forwarded to RTC source. - if (source && rtc) { - source->bridge_to(rtc->bridger()); - } -#endif pool[stream_url] = source; *pps = source; @@ -1883,7 +1862,7 @@ SrsSource::SrsSource() die_at = 0; handler = NULL; - bridger = NULL; + bridger_ = NULL; play_edge = new SrsPlayEdge(); publish_edge = new SrsPublishEdge(); @@ -1915,6 +1894,7 @@ SrsSource::~SrsSource() srs_freep(gop_cache); srs_freep(req); + srs_freep(bridger_); } void SrsSource::dispose() @@ -1990,9 +1970,10 @@ srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h) return err; } -void SrsSource::bridge_to(ISrsSourceBridger* v) +void SrsSource::set_bridger(ISrsSourceBridger* v) { - bridger = v; + srs_freep(bridger_); + bridger_ = v; } srs_error_t SrsSource::on_reload_vhost_play(string vhost) @@ -2245,7 +2226,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) } // For bridger to consume the message. - if (bridger && (err = bridger->on_audio(msg)) != srs_success) { + if (bridger_ && (err = bridger_->on_audio(msg)) != srs_success) { return srs_error_wrap(err, "bridger consume audio"); } @@ -2375,7 +2356,7 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg) } // For bridger to consume the message. - if (bridger && (err = bridger->on_video(msg)) != srs_success) { + if (bridger_ && (err = bridger_->on_video(msg)) != srs_success) { return srs_error_wrap(err, "bridger consume video"); } @@ -2539,7 +2520,7 @@ srs_error_t SrsSource::on_publish() return srs_error_wrap(err, "handle publish"); } - if (bridger && (err = bridger->on_publish()) != srs_success) { + if (bridger_ && (err = bridger_->on_publish()) != srs_success) { return srs_error_wrap(err, "bridger publish"); } @@ -2584,8 +2565,9 @@ void SrsSource::on_unpublish() handler->on_unpublish(this, req); - if (bridger) { - bridger->on_unpublish(); + if (bridger_) { + bridger_->on_unpublish(); + srs_freep(bridger_); } // no consumer, stream is die. diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 84474fbd8..5244bbbfd 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -535,7 +535,7 @@ private: // The event handler. ISrsSourceHandler* handler; // The source bridger for other source. - ISrsSourceBridger* bridger; + ISrsSourceBridger* bridger_; // The edge control service SrsPlayEdge* play_edge; SrsPublishEdge* publish_edge; @@ -563,7 +563,7 @@ public: // Initialize the hls with handlers. virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h); // Bridge to other source, forward packets to it. - void bridge_to(ISrsSourceBridger* v); + void set_bridger(ISrsSourceBridger* v); // Interface ISrsReloadHandler public: virtual srs_error_t on_reload_vhost_play(std::string vhost);