From f09d4576446b129a9f9f569fde0dc5cf9ba45224 Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 28 Nov 2013 22:59:00 +0800 Subject: [PATCH] add forward framework --- trunk/src/core/srs_core_config.cpp | 11 +++ trunk/src/core/srs_core_config.hpp | 1 + trunk/src/core/srs_core_forward.cpp | 30 ++++++ trunk/src/core/srs_core_forward.hpp | 10 ++ trunk/src/core/srs_core_source.cpp | 137 +++++++++++++++++++++------- 5 files changed, 155 insertions(+), 34 deletions(-) diff --git a/trunk/src/core/srs_core_config.cpp b/trunk/src/core/srs_core_config.cpp index 22c2d57e0..dcce32f02 100644 --- a/trunk/src/core/srs_core_config.cpp +++ b/trunk/src/core/srs_core_config.cpp @@ -573,6 +573,17 @@ SrsConfDirective* SrsConfig::get_gop_cache(std::string vhost) return conf->get("gop_cache"); } +SrsConfDirective* SrsConfig::get_forward(std::string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return NULL; + } + + return conf->get("forward"); +} + SrsConfDirective* SrsConfig::get_hls(std::string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/core/srs_core_config.hpp b/trunk/src/core/srs_core_config.hpp index b02c1efbd..8c27c1c27 100644 --- a/trunk/src/core/srs_core_config.hpp +++ b/trunk/src/core/srs_core_config.hpp @@ -115,6 +115,7 @@ public: virtual SrsConfDirective* get_vhost(std::string vhost); virtual SrsConfDirective* get_vhost_enabled(std::string vhost); virtual SrsConfDirective* get_gop_cache(std::string vhost); + virtual SrsConfDirective* get_forward(std::string vhost); virtual SrsConfDirective* get_hls(std::string vhost); virtual SrsConfDirective* get_hls_path(std::string vhost); virtual SrsConfDirective* get_hls_fragment(std::string vhost); diff --git a/trunk/src/core/srs_core_forward.cpp b/trunk/src/core/srs_core_forward.cpp index 331a3da76..b947f3027 100644 --- a/trunk/src/core/srs_core_forward.cpp +++ b/trunk/src/core/srs_core_forward.cpp @@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#include + SrsForwarder::SrsForwarder() { } @@ -31,3 +33,31 @@ SrsForwarder::~SrsForwarder() { } +int SrsForwarder::on_publish(std::string vhost, std::string app, std::string stream, std::string forward_server) +{ + int ret = ERROR_SUCCESS; + return ret; +} + +void SrsForwarder::on_unpublish() +{ +} + +int SrsForwarder::on_meta_data(SrsOnMetaDataPacket* metadata) +{ + int ret = ERROR_SUCCESS; + return ret; +} + +int SrsForwarder::on_audio(SrsSharedPtrMessage* msg) +{ + int ret = ERROR_SUCCESS; + return ret; +} + +int SrsForwarder::on_video(SrsSharedPtrMessage* msg) +{ + int ret = ERROR_SUCCESS; + return ret; +} + diff --git a/trunk/src/core/srs_core_forward.hpp b/trunk/src/core/srs_core_forward.hpp index 67d6c0b92..0b00f92f4 100644 --- a/trunk/src/core/srs_core_forward.hpp +++ b/trunk/src/core/srs_core_forward.hpp @@ -29,6 +29,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #include +#include + +class SrsSharedPtrMessage; +class SrsOnMetaDataPacket; + /** * forward the stream to other servers. */ @@ -38,6 +43,11 @@ public: SrsForwarder(); virtual ~SrsForwarder(); public: + virtual int on_publish(std::string vhost, std::string app, std::string stream, std::string forward_server); + virtual void on_unpublish(); + virtual int on_meta_data(SrsOnMetaDataPacket* metadata); + virtual int on_audio(SrsSharedPtrMessage* msg); + virtual int on_video(SrsSharedPtrMessage* msg); }; #endif diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp index 4024e5a18..e62b32a13 100644 --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -32,6 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include #define CONST_MAX_JITTER_MS 500 #define DEFAULT_FRAME_TIME_MS 10 @@ -408,6 +409,17 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata return ret; } #endif + + if (true) { + std::vector::iterator it; + for (it = forwarders.begin(); it != forwarders.end(); ++it) { + SrsForwarder* forwarder = *it; + if ((ret = forwarder->on_meta_data(metadata)) != ERROR_SUCCESS) { + srs_error("forwarder process onMetaData message failed. ret=%d", ret); + return ret; + } + } + } metadata->metadata->set("server", new SrsAmf0String( RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")")); @@ -453,15 +465,17 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata srs_verbose("initialize shared ptr metadata success."); // copy to all consumer - std::vector::iterator it; - for (it = consumers.begin(); it != consumers.end(); ++it) { - SrsConsumer* consumer = *it; - if ((ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { - srs_error("dispatch the metadata failed. ret=%d", ret); - return ret; + if (true) { + std::vector::iterator it; + for (it = consumers.begin(); it != consumers.end(); ++it) { + SrsConsumer* consumer = *it; + if ((ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { + srs_error("dispatch the metadata failed. ret=%d", ret); + return ret; + } } + srs_trace("dispatch metadata success."); } - srs_trace("dispatch metadata success."); return ret; } @@ -484,17 +498,30 @@ int SrsSource::on_audio(SrsCommonMessage* audio) return ret; } #endif - - // copy to all consumer - std::vector::iterator it; - for (it = consumers.begin(); it != consumers.end(); ++it) { - SrsConsumer* consumer = *it; - if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { - srs_error("dispatch the audio failed. ret=%d", ret); - return ret; + + if (true) { + std::vector::iterator it; + for (it = forwarders.begin(); it != forwarders.end(); ++it) { + SrsForwarder* forwarder = *it; + if ((ret = forwarder->on_audio(msg->copy())) != ERROR_SUCCESS) { + srs_error("forwarder process audio message failed. ret=%d", ret); + return ret; + } } } - srs_info("dispatch audio success."); + + // copy to all consumer + if (true) { + std::vector::iterator it; + for (it = consumers.begin(); it != consumers.end(); ++it) { + SrsConsumer* consumer = *it; + if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { + srs_error("dispatch the audio failed. ret=%d", ret); + return ret; + } + } + srs_info("dispatch audio success."); + } // cache the sequence header if h264 if (SrsCodec::audio_is_sequence_header(msg->payload, msg->size)) { @@ -532,17 +559,30 @@ int SrsSource::on_video(SrsCommonMessage* video) return ret; } #endif - - // copy to all consumer - std::vector::iterator it; - for (it = consumers.begin(); it != consumers.end(); ++it) { - SrsConsumer* consumer = *it; - if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { - srs_error("dispatch the video failed. ret=%d", ret); - return ret; + + if (true) { + std::vector::iterator it; + for (it = forwarders.begin(); it != forwarders.end(); ++it) { + SrsForwarder* forwarder = *it; + if ((ret = forwarder->on_video(msg->copy())) != ERROR_SUCCESS) { + srs_error("forwarder process video message failed. ret=%d", ret); + return ret; + } } } - srs_info("dispatch video success."); + + // copy to all consumer + if (true) { + std::vector::iterator it; + for (it = consumers.begin(); it != consumers.end(); ++it) { + SrsConsumer* consumer = *it; + if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { + srs_error("dispatch the video failed. ret=%d", ret); + return ret; + } + } + srs_info("dispatch video success."); + } // cache the sequence header if h264 if (SrsCodec::video_is_sequence_header(msg->payload, msg->size)) { @@ -562,26 +602,55 @@ int SrsSource::on_video(SrsCommonMessage* video) return ret; } -#ifdef SRS_HLS int SrsSource::on_publish(std::string vhost, std::string app, std::string stream) { + int ret = ERROR_SUCCESS; + _can_publish = false; - return hls->on_publish(vhost, app, stream); -} -#else -int SrsSource::on_publish(std::string /*vhost*/, std::string /*app*/, std::string /*stream*/) -{ - _can_publish = false; - return ERROR_SUCCESS; -} + +#ifdef SRS_HLS + if ((ret = hls->on_publish(vhost, app, stream)) != ERROR_SUCCESS) { + return ret; + } #endif + // TODO: support reload. + + // create forwarders + SrsConfDirective* conf = config->get_forward(vhost); + for (int i = 0; conf && i < conf->args.size(); i++) { + std::string forward_server = conf->args.at(i); + + SrsForwarder* forwarder = new SrsForwarder(); + forwarders.push_back(forwarder); + + if ((ret = forwarder->on_publish(vhost, app, stream, forward_server)) != ERROR_SUCCESS) { + srs_error("start forwarder failed. " + "vhost=%s, app=%s, stream=%s, forward-to=%s", + vhost.c_str(), app.c_str(), stream.c_str(), + forward_server.c_str()); + return ret; + } + } + + return ret; +} + void SrsSource::on_unpublish() { #ifdef SRS_HLS hls->on_unpublish(); #endif + // close all forwarders + std::vector::iterator it; + for (it = forwarders.begin(); it != forwarders.end(); ++it) { + SrsForwarder* forwarder = *it; + forwarder->on_unpublish(); + srs_freep(forwarder); + } + forwarders.clear(); + gop_cache->clear(); srs_freep(cache_metadata);