From 2f397d0460c9c24ea2afb3cc371094e4e75b5c29 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 15 Dec 2013 12:04:28 +0800 Subject: [PATCH] support reload the gop_cache --- README.md | 2 +- trunk/src/core/srs_core_client.cpp | 17 +++++++++---- trunk/src/core/srs_core_client.hpp | 2 +- trunk/src/core/srs_core_config.cpp | 40 ++++++++++++++++++++++++------ trunk/src/core/srs_core_reload.cpp | 9 ++++++- trunk/src/core/srs_core_reload.hpp | 5 ++-- trunk/src/core/srs_core_source.cpp | 30 +++++++++++++++++++--- trunk/src/core/srs_core_source.hpp | 15 ++++++++--- 8 files changed, 95 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 102eae950..57964586d 100755 --- a/README.md +++ b/README.md @@ -198,7 +198,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw * nginx v1.5.0: 139524 lines
### History -* v0.9, 2013-12-14, support reload the hls/forwarder/transcoder. +* v0.9, 2013-12-15, support reload the hls/forwarder/transcoder. * v0.9, 2013-12-14, refine the thread model for the retry threads. * v0.9, 2013-12-10, auto install depends tools/libs on centos/ubuntu. * v0.8, 2013-12-08, v0.8 released. 19186 lines. diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index ef2a8328e..163b76187 100644 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -26,6 +26,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +using namespace std; + #include #include #include @@ -118,15 +120,20 @@ int SrsClient::do_cycle() return ret; } -int SrsClient::on_reload_vhost_removed(SrsConfDirective* vhost) +int SrsClient::on_reload_vhost_removed(string vhost) { int ret = ERROR_SUCCESS; - // if the vhost connected is removed, disconnect the client. - if (req->vhost == vhost->arg0()) { - srs_close_stfd(stfd); + if (req->vhost != vhost) { + return ret; } + // if the vhost connected is removed, disconnect the client. + srs_trace("vhost %s removed/disabled, close client url=%s", + vhost.c_str(), req->get_stream_url().c_str()); + + srs_close_stfd(stfd); + return ret; } @@ -174,7 +181,7 @@ int SrsClient::service_cycle() srs_trace("set chunk_size=%d success", chunk_size); // find a source to publish. - SrsSource* source = SrsSource::find(req->get_stream_url()); + SrsSource* source = SrsSource::find(req->get_stream_url(), req->vhost); srs_assert(source != NULL); // check publish available. diff --git a/trunk/src/core/srs_core_client.hpp b/trunk/src/core/srs_core_client.hpp index 3273fb93b..1c7c7f044 100644 --- a/trunk/src/core/srs_core_client.hpp +++ b/trunk/src/core/srs_core_client.hpp @@ -65,7 +65,7 @@ protected: virtual int do_cycle(); // interface ISrsReloadHandler public: - virtual int on_reload_vhost_removed(SrsConfDirective* vhost); + virtual int on_reload_vhost_removed(std::string vhost); private: // when valid and connected to vhost/app, service the client. virtual int service_cycle(); diff --git a/trunk/src/core/srs_core_config.cpp b/trunk/src/core/srs_core_config.cpp index f5b6922d9..d29a4d1f4 100644 --- a/trunk/src/core/srs_core_config.cpp +++ b/trunk/src/core/srs_core_config.cpp @@ -500,39 +500,60 @@ int SrsConfig::reload() if (old_vhost->name != "vhost") { continue; } + + std::string vhost = old_vhost->arg0(); - SrsConfDirective* new_vhost = root->get("vhost", old_vhost->arg0()); + SrsConfDirective* new_vhost = root->get("vhost", vhost); // ignore if absolutely equal if (new_vhost && srs_directive_equals(old_vhost, new_vhost)) { + srs_trace("vhost %s absolutely equal, ignore.", vhost.c_str()); continue; } // ignore if enable the new vhost when old vhost is disabled. if (get_vhost_enabled(new_vhost) && !get_vhost_enabled(old_vhost)) { + srs_trace("vhost %s disabled=>enabled, ignore.", vhost.c_str()); continue; } // ignore if both old and new vhost are disabled. if (!get_vhost_enabled(new_vhost) && !get_vhost_enabled(old_vhost)) { + srs_trace("vhost %s disabled=>disabled, ignore.", vhost.c_str()); continue; } // merge config: vhost removed/disabled. if (!get_vhost_enabled(new_vhost) && get_vhost_enabled(old_vhost)) { - srs_trace("vhost %s disabled, reload it.", old_vhost->name.c_str()); + srs_trace("vhost %s disabled, reload it.", vhost.c_str()); for (it = subscribes.begin(); it != subscribes.end(); ++it) { ISrsReloadHandler* subscribe = *it; - if ((ret = subscribe->on_reload_vhost_removed(old_vhost)) != ERROR_SUCCESS) { - srs_error("notify subscribes pithy_print remove vhost failed. ret=%d", ret); + if ((ret = subscribe->on_reload_vhost_removed(vhost)) != ERROR_SUCCESS) { + srs_error("notify subscribes pithy_print remove " + "vhost %s failed. ret=%d", vhost.c_str(), ret); return ret; } } - srs_trace("reload remove vhost success."); + srs_trace("reload remove vhost %s success.", vhost.c_str()); } // merge config: vhost modified. + srs_trace("vhost %s modified, reload its detail.", vhost.c_str()); + if (get_vhost_enabled(new_vhost) && get_vhost_enabled(old_vhost)) { + if (!srs_directive_equals(new_vhost->get("gop_cache"), old_vhost->get("gop_cache"))) { + for (it = subscribes.begin(); it != subscribes.end(); ++it) { + ISrsReloadHandler* subscribe = *it; + if ((ret = subscribe->on_reload_gop_cache(vhost)) != ERROR_SUCCESS) { + srs_error("vhost %s notify subscribes gop_cache failed. ret=%d", vhost.c_str(), ret); + return ret; + } + } + srs_trace("vhost %s reload gop_cache success.", vhost.c_str()); + } + // TODO: suppor reload hls/forward/ffmpeg/http + continue; + } + srs_warn("invalid reload path, enabled old: %d, new: %d", + get_vhost_enabled(old_vhost), get_vhost_enabled(new_vhost)); } - // TODO: suppor reload hls/forward/ffmpeg/http - return ret; } @@ -1440,6 +1461,11 @@ int SrsConfig::get_pithy_print_play() bool srs_directive_equals(SrsConfDirective* a, SrsConfDirective* b) { + // both NULL, equal. + if (!a && !b) { + return true; + } + if (!a || !b) { return false; } diff --git a/trunk/src/core/srs_core_reload.cpp b/trunk/src/core/srs_core_reload.cpp index 54a856309..d27cefac5 100644 --- a/trunk/src/core/srs_core_reload.cpp +++ b/trunk/src/core/srs_core_reload.cpp @@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +using namespace std; + #include ISrsReloadHandler::ISrsReloadHandler() @@ -43,7 +45,12 @@ int ISrsReloadHandler::on_reload_pithy_print() return ERROR_SUCCESS; } -int ISrsReloadHandler::on_reload_vhost_removed(SrsConfDirective* /*vhost*/) +int ISrsReloadHandler::on_reload_vhost_removed(string /*vhost*/) +{ + return ERROR_SUCCESS; +} + +int ISrsReloadHandler::on_reload_gop_cache(string /*vhost*/) { return ERROR_SUCCESS; } diff --git a/trunk/src/core/srs_core_reload.hpp b/trunk/src/core/srs_core_reload.hpp index b92109797..01a31a787 100644 --- a/trunk/src/core/srs_core_reload.hpp +++ b/trunk/src/core/srs_core_reload.hpp @@ -29,7 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #include -class SrsConfDirective; +#include /** * the handler for config reload. @@ -42,7 +42,8 @@ public: public: virtual int on_reload_listen(); virtual int on_reload_pithy_print(); - virtual int on_reload_vhost_removed(SrsConfDirective* vhost); + virtual int on_reload_vhost_removed(std::string vhost); + virtual int on_reload_gop_cache(std::string vhost); }; #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp index e95f81be8..f58c9aa57 100644 --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -24,6 +24,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +using namespace std; #include #include @@ -344,19 +345,21 @@ int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv) std::map SrsSource::pool; -SrsSource* SrsSource::find(std::string stream_url) +SrsSource* SrsSource::find(string stream_url, string vhost) { if (pool.find(stream_url) == pool.end()) { - pool[stream_url] = new SrsSource(stream_url); - srs_verbose("create new source for url=%s", stream_url.c_str()); + pool[stream_url] = new SrsSource(stream_url, vhost); + srs_verbose("create new source for " + "url=%s, vhost=%s", stream_url.c_str(), vhost.c_str()); } return pool[stream_url]; } -SrsSource::SrsSource(std::string _stream_url) +SrsSource::SrsSource(string _stream_url, string _vhost) { stream_url = _stream_url; + vhost = _vhost; #ifdef SRS_HLS hls = new SrsHls(); @@ -407,6 +410,25 @@ SrsSource::~SrsSource() #endif } +int SrsSource::on_reload_gop_cache(string _vhost) +{ + int ret = ERROR_SUCCESS; + + if (vhost != _vhost) { + return ret; + } + + // gop cache changed. + bool enabled_cache = config->get_gop_cache(vhost); + + srs_trace("vhost %s gop_cache changed to %d, source url=%s", + vhost.c_str(), enabled_cache, stream_url.c_str()); + + set_cache(enabled_cache); + + return ret; +} + bool SrsSource::can_publish() { return _can_publish; diff --git a/trunk/src/core/srs_core_source.hpp b/trunk/src/core/srs_core_source.hpp index f951e63ad..f119b6754 100644 --- a/trunk/src/core/srs_core_source.hpp +++ b/trunk/src/core/srs_core_source.hpp @@ -34,6 +34,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include + class SrsSource; class SrsCommonMessage; class SrsOnMetaDataPacket; @@ -158,19 +160,21 @@ public: /** * live streaming source. */ -class SrsSource +class SrsSource : public ISrsReloadHandler { private: static std::map pool; public: /** * find stream by vhost/app/stream. - * @stream_url the stream url, for example, myserver.xxx.com/app/stream + * @param stream_url the stream url, for example, myserver.xxx.com/app/stream + * @param vhost the vhost to constructor the object. * @return the matched source, never be NULL. * @remark stream_url should without port and schema. */ - static SrsSource* find(std::string stream_url); + static SrsSource* find(std::string stream_url, std::string vhost); private: + std::string vhost; std::string stream_url; // to delivery stream to clients. std::vector consumers; @@ -206,8 +210,11 @@ private: // the cached audio sequence header. SrsSharedPtrMessage* cache_sh_audio; public: - SrsSource(std::string _stream_url); + SrsSource(std::string _stream_url, std::string _vhost); virtual ~SrsSource(); +// interface ISrsReloadHandler +public: + virtual int on_reload_gop_cache(std::string _vhost); public: virtual bool can_publish(); virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);