diff --git a/README.md b/README.md index e8208ae70..de3abbbf0 100755 --- a/README.md +++ b/README.md @@ -221,6 +221,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw * nginx v1.5.0: 139524 lines
## History +* v1.0, 2014-04-10, support reload ingesters(added/removed/updated). change to 0.9.57. * v1.0, 2014-04-07, [1.0 mainline(0.9.55)](https://github.com/winlinvip/simple-rtmp-server/releases/tag/1.0.mainline) released. 30000 lines. * v1.0, 2014-04-07, support [ingest](https://github.com/winlinvip/simple-rtmp-server/wiki/SampleIngest) file/stream/device. * v1.0, 2014-04-05, support [http api](https://github.com/winlinvip/simple-rtmp-server/wiki/HTTPApi) and [http server](https://github.com/winlinvip/simple-rtmp-server/wiki/HTTPServer). diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 23556e306..1dd96c279 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -568,7 +568,7 @@ int SrsConfig::reload() // merge config: vhost modified. srs_trace("vhost %s modified, reload its detail.", vhost.c_str()); if (get_vhost_enabled(new_vhost) && get_vhost_enabled(old_vhost)) { - // gop_cache + // gop_cache, only one per vhost if (!srs_directive_equals(new_vhost->get("gop_cache"), old_vhost->get("gop_cache"))) { for (it = subscribes.begin(); it != subscribes.end(); ++it) { ISrsReloadHandler* subscribe = *it; @@ -579,7 +579,7 @@ int SrsConfig::reload() } srs_trace("vhost %s reload gop_cache success.", vhost.c_str()); } - // queue_length + // queue_length, only one per vhost if (!srs_directive_equals(new_vhost->get("queue_length"), old_vhost->get("queue_length"))) { for (it = subscribes.begin(); it != subscribes.end(); ++it) { ISrsReloadHandler* subscribe = *it; @@ -590,7 +590,7 @@ int SrsConfig::reload() } srs_trace("vhost %s reload queue_length success.", vhost.c_str()); } - // forward + // forward, only one per vhost if (!srs_directive_equals(new_vhost->get("forward"), old_vhost->get("forward"))) { for (it = subscribes.begin(); it != subscribes.end(); ++it) { ISrsReloadHandler* subscribe = *it; @@ -601,7 +601,7 @@ int SrsConfig::reload() } srs_trace("vhost %s reload forward success.", vhost.c_str()); } - // hls + // hls, only one per vhost if (!srs_directive_equals(new_vhost->get("hls"), old_vhost->get("hls"))) { for (it = subscribes.begin(); it != subscribes.end(); ++it) { ISrsReloadHandler* subscribe = *it; @@ -612,7 +612,8 @@ int SrsConfig::reload() } srs_trace("vhost %s reload hls success.", vhost.c_str()); } - // transcode + // TODO: FIXME: there might be many transcoders per vhost. + // transcode, only one per vhost if (!srs_directive_equals(new_vhost->get("transcode"), old_vhost->get("transcode"))) { for (it = subscribes.begin(); it != subscribes.end(); ++it) { ISrsReloadHandler* subscribe = *it; @@ -623,6 +624,10 @@ int SrsConfig::reload() } srs_trace("vhost %s reload transcode success.", vhost.c_str()); } + // ingest, many per vhost. + if ((ret = reload_ingest(new_vhost, old_vhost)) != ERROR_SUCCESS) { + return ret; + } // TODO: suppor reload hls/forward/ffmpeg/http continue; } @@ -689,6 +694,102 @@ int SrsConfig::parse_options(int argc, char** argv) return parse_file(config_file.c_str()); } +int SrsConfig::reload_ingest(SrsConfDirective* new_vhost, SrsConfDirective* old_vhost) +{ + int ret = ERROR_SUCCESS; + + std::vector old_ingesters; + for (int i = 0; i < (int)old_vhost->directives.size(); i++) { + SrsConfDirective* conf = old_vhost->at(i); + if (conf->name == "ingest") { + old_ingesters.push_back(conf); + } + } + + std::vector new_ingesters; + for (int i = 0; i < (int)new_vhost->directives.size(); i++) { + SrsConfDirective* conf = new_vhost->at(i); + if (conf->name == "ingest") { + new_ingesters.push_back(conf); + } + } + + std::vector::iterator it; + + std::string vhost = new_vhost->arg0(); + + // for removed ingesters, stop them. + for (int i = 0; i < (int)old_ingesters.size(); i++) { + SrsConfDirective* old_ingester = old_ingesters.at(i); + std::string ingest_id = old_ingester->arg0(); + + // if ingester exists in new vhost, not removed, ignore. + if (new_vhost->get("ingest", ingest_id)) { + continue; + } + + // notice handler ingester removed. + for (it = subscribes.begin(); it != subscribes.end(); ++it) { + ISrsReloadHandler* subscribe = *it; + if ((ret = subscribe->on_reload_ingest_removed(vhost, ingest_id)) != ERROR_SUCCESS) { + srs_error("vhost %s notify subscribes ingest=%s removed failed. ret=%d", + vhost.c_str(), ingest_id.c_str(), ret); + return ret; + } + } + srs_trace("vhost %s reload ingest=%s removed success.", vhost.c_str(), ingest_id.c_str()); + } + + // for added ingesters, start them. + for (int i = 0; i < (int)new_ingesters.size(); i++) { + SrsConfDirective* new_ingester = new_ingesters.at(i); + std::string ingest_id = new_ingester->arg0(); + + // if ingester exists in old vhost, not added, ignore. + if (old_vhost->get("ingest", ingest_id)) { + continue; + } + + // notice handler ingester removed. + for (it = subscribes.begin(); it != subscribes.end(); ++it) { + ISrsReloadHandler* subscribe = *it; + if ((ret = subscribe->on_reload_ingest_added(vhost, ingest_id)) != ERROR_SUCCESS) { + srs_error("vhost %s notify subscribes ingest=%s added failed. ret=%d", + vhost.c_str(), ingest_id.c_str(), ret); + return ret; + } + } + srs_trace("vhost %s reload ingest=%s added success.", vhost.c_str(), ingest_id.c_str()); + } + + // for updated ingesters, restart them. + for (int i = 0; i < (int)new_ingesters.size(); i++) { + SrsConfDirective* new_ingester = new_ingesters.at(i); + std::string ingest_id = new_ingester->arg0(); + SrsConfDirective* old_ingester = old_vhost->get("ingest", ingest_id); + srs_assert(old_ingester); + + if (srs_directive_equals(new_ingester, old_ingester)) { + continue; + } + + // notice handler ingester removed. + for (it = subscribes.begin(); it != subscribes.end(); ++it) { + ISrsReloadHandler* subscribe = *it; + if ((ret = subscribe->on_reload_ingest_updated(vhost, ingest_id)) != ERROR_SUCCESS) { + srs_error("vhost %s notify subscribes ingest=%s updated failed. ret=%d", + vhost.c_str(), ingest_id.c_str(), ret); + return ret; + } + } + srs_trace("vhost %s reload ingest=%s updated success.", vhost.c_str(), ingest_id.c_str()); + } + + srs_warn("invalid reload ingest vhost=%s", vhost.c_str()); + + return ret; +} + int SrsConfig::parse_file(const char* filename) { int ret = ERROR_SUCCESS; @@ -1667,6 +1768,17 @@ void SrsConfig::get_ingesters(std::string vhost, std::vector& return; } +SrsConfDirective* SrsConfig::get_ingest(std::string vhost, std::string ingest_id) +{ + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return NULL; + } + + conf = conf->get("ingest", ingest_id); + return conf; +} + bool SrsConfig::get_ingest_enabled(SrsConfDirective* ingest) { SrsConfDirective* conf = ingest->get("enable"); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 1edbcb94a..174968305 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -125,6 +125,7 @@ public: public: virtual int parse_options(int argc, char** argv); private: + virtual int reload_ingest(SrsConfDirective* new_vhost, SrsConfDirective* old_vhost); virtual int parse_file(const char* filename); virtual int parse_argv(int& i, char** argv); virtual void print_help(char** argv); @@ -193,6 +194,7 @@ public: // ingest section public: virtual void get_ingesters(std::string vhost, std::vector& ingeters); + virtual SrsConfDirective* get_ingest(std::string vhost, std::string ingest_id); virtual bool get_ingest_enabled(SrsConfDirective* ingest); virtual std::string get_ingest_ffmpeg(SrsConfDirective* ingest); virtual std::string get_ingest_input_type(SrsConfDirective* ingest); diff --git a/trunk/src/app/srs_app_ingest.cpp b/trunk/src/app/srs_app_ingest.cpp index 70208366b..6d2d98130 100644 --- a/trunk/src/app/srs_app_ingest.cpp +++ b/trunk/src/app/srs_app_ingest.cpp @@ -97,10 +97,6 @@ int SrsIngester::parse_ingesters(SrsConfDirective* vhost) // create engine for (int i = 0; i < (int)ingesters.size(); i++) { SrsConfDirective* ingest = ingesters[i]; - if (!_srs_config->get_ingest_enabled(ingest)) { - continue; - } - if ((ret = parse_engines(vhost, ingest)) != ERROR_SUCCESS) { return ret; } @@ -112,7 +108,11 @@ int SrsIngester::parse_ingesters(SrsConfDirective* vhost) int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest) { int ret = ERROR_SUCCESS; - + + if (!_srs_config->get_ingest_enabled(ingest)) { + return ret; + } + std::string ffmpeg_bin = _srs_config->get_ingest_ffmpeg(ingest); if (ffmpeg_bin.empty()) { ret = ERROR_ENCODER_PARSE; @@ -360,6 +360,8 @@ int SrsIngester::on_reload_vhost_added(string vhost) if ((ret = parse_ingesters(_vhost)) != ERROR_SUCCESS) { return ret; } + + srs_trace("reload add vhost ingesters, vhost=%s", vhost.c_str()); return ret; } @@ -393,4 +395,68 @@ int SrsIngester::on_reload_vhost_removed(string vhost) return ret; } +int SrsIngester::on_reload_ingest_removed(string vhost, string ingest_id) +{ + int ret = ERROR_SUCCESS; + + std::vector::iterator it; + + for (it = ingesters.begin(); it != ingesters.end();) { + SrsIngesterFFMPEG* ingester = *it; + + if (ingester->vhost != vhost || ingester->id != ingest_id) { + ++it; + continue; + } + + // stop the ffmpeg and free it. + ingester->ffmpeg->stop(); + + srs_trace("reload stop ingester, " + "vhost=%s, id=%s", vhost.c_str(), ingester->id.c_str()); + + srs_freep(ingester); + + // remove the item from ingesters. + it = ingesters.erase(it); + } + + return ret; +} + +int SrsIngester::on_reload_ingest_added(string vhost, string ingest_id) +{ + int ret = ERROR_SUCCESS; + + SrsConfDirective* _vhost = _srs_config->get_vhost(vhost); + SrsConfDirective* _ingester = _srs_config->get_ingest(vhost, ingest_id); + + if ((ret = parse_engines(_vhost, _ingester)) != ERROR_SUCCESS) { + return ret; + } + + srs_trace("reload add ingester, " + "vhost=%s, id=%s", vhost.c_str(), ingest_id.c_str()); + + return ret; +} + +int SrsIngester::on_reload_ingest_updated(string vhost, string ingest_id) +{ + int ret = ERROR_SUCCESS; + + if ((ret = on_reload_ingest_removed(vhost, ingest_id)) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = on_reload_ingest_added(vhost, ingest_id)) != ERROR_SUCCESS) { + return ret; + } + + srs_trace("reload updated ingester, " + "vhost=%s, id=%s", vhost.c_str(), ingest_id.c_str()); + + return ret; +} + #endif diff --git a/trunk/src/app/srs_app_ingest.hpp b/trunk/src/app/srs_app_ingest.hpp index a37ca7956..efc953671 100644 --- a/trunk/src/app/srs_app_ingest.hpp +++ b/trunk/src/app/srs_app_ingest.hpp @@ -86,6 +86,9 @@ private: public: virtual int on_reload_vhost_removed(std::string vhost); virtual int on_reload_vhost_added(std::string vhost); + virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id); + virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id); + virtual int on_reload_ingest_updated(std::string vhost, std::string ingest_id); }; #endif diff --git a/trunk/src/app/srs_app_reload.cpp b/trunk/src/app/srs_app_reload.cpp index 9316ff43c..1cc7bc51a 100644 --- a/trunk/src/app/srs_app_reload.cpp +++ b/trunk/src/app/srs_app_reload.cpp @@ -80,3 +80,18 @@ int ISrsReloadHandler::on_reload_transcode(string /*vhost*/) return ERROR_SUCCESS; } +int ISrsReloadHandler::on_reload_ingest_removed(string /*vhost*/, string /*ingest_id*/) +{ + return ERROR_SUCCESS; +} + +int ISrsReloadHandler::on_reload_ingest_added(string /*vhost*/, string /*ingest_id*/) +{ + return ERROR_SUCCESS; +} + +int ISrsReloadHandler::on_reload_ingest_updated(string /*vhost*/, string /*ingest_id*/) +{ + return ERROR_SUCCESS; +} + diff --git a/trunk/src/app/srs_app_reload.hpp b/trunk/src/app/srs_app_reload.hpp index 427b3128f..547b268a5 100644 --- a/trunk/src/app/srs_app_reload.hpp +++ b/trunk/src/app/srs_app_reload.hpp @@ -50,6 +50,9 @@ public: virtual int on_reload_forward(std::string vhost); virtual int on_reload_hls(std::string vhost); virtual int on_reload_transcode(std::string vhost); + virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id); + virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id); + virtual int on_reload_ingest_updated(std::string vhost, std::string ingest_id); }; #endif \ No newline at end of file