From 0213cc6466fea6983b918bc7de408abf8d32afce Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 27 Feb 2015 20:39:36 +0800 Subject: [PATCH] for #179, refine dvr, support POST create dvr when publish not start. 2.0.126 --- trunk/conf/full.conf | 2 +- trunk/src/app/srs_app_config.cpp | 24 +++ trunk/src/app/srs_app_config.hpp | 3 + trunk/src/app/srs_app_dvr.cpp | 228 +++++++++++++++------------- trunk/src/app/srs_app_dvr.hpp | 36 ++--- trunk/src/app/srs_app_hls.cpp | 35 ++++- trunk/src/app/srs_app_hls.hpp | 12 +- trunk/src/app/srs_app_rtmp_conn.cpp | 8 +- trunk/src/app/srs_app_source.cpp | 71 ++++++--- trunk/src/app/srs_app_source.hpp | 18 ++- trunk/src/core/srs_core.hpp | 2 +- 11 files changed, 265 insertions(+), 174 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index f6de5728f..d4e3f35ae 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -310,7 +310,7 @@ vhost dvr.srs.com { # vhost:"__defaultVhost", app:"live", stream:"livestream", # wait_keyframe:true, callback:"http://127.0.0.1:8085/api/v1/dvrs" # } - # @remark, the app and stream is optional. + # @remark, the app and stream is required for POST. # response in json, where: # {code:0} # method=DELETE, to stop dvr diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index b0f8219c1..2341e21bf 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1983,6 +1983,7 @@ SrsConfDirective* SrsConfig::create_directive(string vhost, string directive, st if (!vhost_conf) { vhost_conf = new SrsConfDirective(); + vhost_conf->name = vhost; root->directives.push_back(vhost_conf); } @@ -1993,6 +1994,7 @@ SrsConfDirective* SrsConfig::create_directive(string vhost, string directive, st SrsConfDirective* dir = vhost_conf->get(directive); if (!dir) { dir = new SrsConfDirective(); + dir->name = directive; vhost_conf->directives.push_back(dir); } @@ -2003,6 +2005,7 @@ SrsConfDirective* SrsConfig::create_directive(string vhost, string directive, st SrsConfDirective* sdir = dir->get(sub_directive); if (!sdir) { sdir = new SrsConfDirective(); + sdir->name = sub_directive; dir->directives.push_back(sdir); } @@ -2352,6 +2355,13 @@ bool SrsConfig::get_vhost_http_hooks_enabled(string vhost) return true; } +void SrsConfig::set_vhost_http_hooks_enabled(string vhost, bool enabled) +{ + SrsConfDirective* conf = create_directive(vhost, "http_hooks", "enabled"); + conf->args.clear(); + conf->args.push_back(enabled? "on":"off"); +} + SrsConfDirective* SrsConfig::get_vhost_on_connect(string vhost) { SrsConfDirective* conf = get_vhost_http_hooks(vhost); @@ -2429,6 +2439,13 @@ SrsConfDirective* SrsConfig::get_vhost_on_dvr(string vhost) return conf->get("on_dvr"); } +void SrsConfig::set_vhost_on_dvr(string vhost, string callback) +{ + SrsConfDirective* conf = create_directive(vhost, "http_hooks", "on_dvr"); + conf->args.clear(); + conf->args.push_back(callback); +} + bool SrsConfig::get_bw_check_enabled(string vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -3390,6 +3407,13 @@ string SrsConfig::get_dvr_plan(string vhost) return conf->arg0(); } +void SrsConfig::set_dvr_plan(string vhost, string plan) +{ + SrsConfDirective* conf = create_directive(vhost, "dvr", "dvr_plan"); + conf->args.clear(); + conf->args.push_back(plan); +} + int SrsConfig::get_dvr_duration(string vhost) { SrsConfDirective* dvr = get_dvr(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index e2e595ef4..6b7fb4982 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -596,6 +596,7 @@ public: * @remark, if not enabled, donot callback all http hooks. */ virtual bool get_vhost_http_hooks_enabled(std::string vhost); + virtual void set_vhost_http_hooks_enabled(std::string vhost, bool enabled); /** * get the on_connect callbacks of vhost. * @return the on_connect callback directive, the args is the url to callback. @@ -631,6 +632,7 @@ public: * @return the on_dvr callback directive, the args is the url to callback. */ virtual SrsConfDirective* get_vhost_on_dvr(std::string vhost); + virtual void set_vhost_on_dvr(std::string vhost, std::string callback); // bwct(bandwidth check tool) section public: /** @@ -933,6 +935,7 @@ public: * get the plan of dvr, how to reap the flv file. */ virtual std::string get_dvr_plan(std::string vhost); + virtual void set_dvr_plan(std::string vhost, std::string plan); /** * get the duration of dvr flv. */ diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index 7a562c894..03c4c3612 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -28,6 +28,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include using namespace std; #include @@ -54,7 +55,6 @@ using namespace std; SrsFlvSegment::SrsFlvSegment(SrsDvrPlan* p) { req = NULL; - source = NULL; jitter = NULL; plan = p; @@ -85,11 +85,10 @@ SrsFlvSegment::~SrsFlvSegment() srs_freep(enc); } -int SrsFlvSegment::initialize(SrsSource* s, SrsRequest* r) +int SrsFlvSegment::initialize(SrsRequest* r) { int ret = ERROR_SUCCESS; - source = s; req = r; jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_dvr_time_jitter(req->vhost); @@ -617,48 +616,6 @@ string SrsDvrAsyncCallOnDvr::to_string() return ss.str(); } -SrsDvrAsyncCallOnSegment::SrsDvrAsyncCallOnSegment(SrsRequest* r, string c, string p) -{ - req = r; - callback = c; - path = p; -} - -SrsDvrAsyncCallOnSegment::~SrsDvrAsyncCallOnSegment() -{ -} - -int SrsDvrAsyncCallOnSegment::call() -{ - int ret = ERROR_SUCCESS; - -#ifdef SRS_AUTO_HTTP_CALLBACK - // HTTP: callback - if (callback.empty()) { - srs_warn("dvr: ignore for callback empty, vhost=%s", req->vhost.c_str()); - return ret; - } - - int connection_id = _srs_context->get_id(); - std::string cwd = _srs_config->cwd(); - std::string file = path; - std::string url = callback; - if ((ret = SrsHttpHooks::on_dvr_reap_segment(url, connection_id, req, cwd, file)) != ERROR_SUCCESS) { - srs_error("hook client on_dvr_reap_segment failed. url=%s, ret=%d", url.c_str(), ret); - return ret; - } -#endif - - return ret; -} - -string SrsDvrAsyncCallOnSegment::to_string() -{ - std::stringstream ss; - ss << "vhost=" << req->vhost << ", file=" << path << "callback=" << callback; - return ss.str(); -} - SrsDvrAsyncCallThread::SrsDvrAsyncCallThread() { pthread = new SrsThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US, true); @@ -717,7 +674,6 @@ int SrsDvrAsyncCallThread::cycle() SrsDvrPlan::SrsDvrPlan() { - source = NULL; req = NULL; dvr_enabled = false; @@ -731,14 +687,13 @@ SrsDvrPlan::~SrsDvrPlan() srs_freep(async); } -int SrsDvrPlan::initialize(SrsSource* s, SrsRequest* r) +int SrsDvrPlan::initialize(SrsRequest* r) { int ret = ERROR_SUCCESS; - - source = s; + req = r; - if ((ret = segment->initialize(s, r)) != ERROR_SUCCESS) { + if ((ret = segment->initialize(r)) != ERROR_SUCCESS) { return ret; } @@ -749,18 +704,6 @@ int SrsDvrPlan::initialize(SrsSource* s, SrsRequest* r) return ret; } -int SrsDvrPlan::on_dvr_request_sh() -{ - int ret = ERROR_SUCCESS; - - // the dvr is enabled, notice the source to push the data. - if ((ret = source->on_dvr_request_sh()) != ERROR_SUCCESS) { - return ret; - } - - return ret; -} - int SrsDvrPlan::on_video_keyframe() { return ERROR_SUCCESS; @@ -833,6 +776,15 @@ SrsDvrPlan* SrsDvrPlan::create_plan(string vhost) } else if (plan == SRS_CONF_DEFAULT_DVR_PLAN_APPEND) { return new SrsDvrAppendPlan(); } else if (plan == SRS_CONF_DEFAULT_DVR_PLAN_API) { + /** + * @remark the api plan maybe create by publish event or http api post create dvr event. + * so when we got from pool first when create it. + */ + SrsApiDvrPool* pool = SrsApiDvrPool::instance(); + SrsDvrApiPlan* plan = pool->get_dvr(vhost); + if (plan) { + return plan; + } return new SrsDvrApiPlan(); } else { srs_error("invalid dvr plan=%s, vhost=%s", plan.c_str(), vhost.c_str()); @@ -900,16 +852,19 @@ SrsDvrApiPlan::SrsDvrApiPlan() SrsDvrApiPlan::~SrsDvrApiPlan() { + SrsApiDvrPool* pool = SrsApiDvrPool::instance(); + pool->detach_dvr(this); + srs_freep(metadata); srs_freep(sh_audio); srs_freep(sh_video); } -int SrsDvrApiPlan::initialize(SrsSource* s, SrsRequest* r) +int SrsDvrApiPlan::initialize(SrsRequest* r) { int ret = ERROR_SUCCESS; - if ((ret = SrsDvrPlan::initialize(s, r)) != ERROR_SUCCESS) { + if ((ret = SrsDvrPlan::initialize(r)) != ERROR_SUCCESS) { return ret; } @@ -1017,6 +972,12 @@ int SrsDvrApiPlan::on_video(SrsSharedPtrMessage* __video) return ret; } +int SrsDvrApiPlan::set_plan() +{ + _srs_config->set_dvr_plan(req->vhost, SRS_CONF_DEFAULT_DVR_PLAN_API); + return ERROR_SUCCESS; +} + int SrsDvrApiPlan::set_path_tmpl(string path_tmpl) { _srs_config->set_dvr_path(req->vhost, path_tmpl); @@ -1025,7 +986,8 @@ int SrsDvrApiPlan::set_path_tmpl(string path_tmpl) int SrsDvrApiPlan::set_callback(string value) { - callback = value; + _srs_config->set_vhost_http_hooks_enabled(req->vhost, true); + _srs_config->set_vhost_on_dvr(req->vhost, value); return ERROR_SUCCESS; } @@ -1072,6 +1034,7 @@ int SrsDvrApiPlan::dumps(stringstream& ss) bool wait_keyframe = _srs_config->get_dvr_wait_keyframe(req->vhost); std::string path_template = _srs_config->get_dvr_path(req->vhost); + SrsConfDirective* callbacks = _srs_config->get_vhost_on_dvr(req->vhost); ss << __SRS_JOBJECT_START << __SRS_JFIELD_STR("path_tmpl", path_template) << __SRS_JFIELD_CONT @@ -1080,7 +1043,7 @@ int SrsDvrApiPlan::dumps(stringstream& ss) << __SRS_JFIELD_STR("vhost", req->vhost) << __SRS_JFIELD_CONT << __SRS_JFIELD_STR("app", req->app) << __SRS_JFIELD_CONT << __SRS_JFIELD_STR("stream", req->stream) << __SRS_JFIELD_CONT - << __SRS_JFIELD_STR("callback", callback) << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("callback", callbacks->arg0()) << __SRS_JFIELD_CONT << __SRS_JFIELD_STR("status", (dvr_enabled? "start":"stop")) << __SRS_JOBJECT_END; @@ -1133,21 +1096,6 @@ int SrsDvrApiPlan::rpc(SrsJsonObject* obj) return ret; } -int SrsDvrApiPlan::on_reap_segment() -{ - int ret = ERROR_SUCCESS; - - if ((ret = SrsDvrPlan::on_reap_segment()) != ERROR_SUCCESS) { - return ret; - } - - if ((ret = async->call(new SrsDvrAsyncCallOnSegment(req, callback, segment->get_path()))) != ERROR_SUCCESS) { - return ret; - } - - return ret; -} - int SrsDvrApiPlan::check_user_actions(SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; @@ -1311,11 +1259,11 @@ SrsDvrSegmentPlan::~SrsDvrSegmentPlan() srs_freep(metadata); } -int SrsDvrSegmentPlan::initialize(SrsSource* source, SrsRequest* req) +int SrsDvrSegmentPlan::initialize(SrsRequest* req) { int ret = ERROR_SUCCESS; - if ((ret = SrsDvrPlan::initialize(source, req)) != ERROR_SUCCESS) { + if ((ret = SrsDvrPlan::initialize(req)) != ERROR_SUCCESS) { return ret; } @@ -1478,12 +1426,35 @@ SrsApiDvrPool::~SrsApiDvrPool() dvrs.clear(); } +SrsDvrApiPlan* SrsApiDvrPool::get_dvr(string vhost) +{ + std::vector::iterator it; + for (it = dvrs.begin(); it != dvrs.end(); ++it) { + SrsDvrApiPlan* plan = *it; + if (plan->req->vhost == vhost) { + return plan; + } + } + + return NULL; +} + int SrsApiDvrPool::add_dvr(SrsDvrApiPlan* dvr) { dvrs.push_back(dvr); return ERROR_SUCCESS; } +void SrsApiDvrPool::detach_dvr(SrsDvrApiPlan* dvr) +{ + std::vector::iterator it; + it = ::find(dvrs.begin(), dvrs.end(), dvr); + + if (it != dvrs.end()) { + dvrs.erase(it); + } +} + int SrsApiDvrPool::dumps(string vhost, string app, string stream, stringstream& ss) { int ret = ERROR_SUCCESS; @@ -1540,39 +1511,69 @@ int SrsApiDvrPool::create(SrsJsonAny* json) srs_error("dvr: api create dvr request requires vhost. ret=%d", ret); return ret; } - std::string vhost = prop->to_str(); - std::string app, stream; - if ((prop = obj->ensure_property_string("app")) != NULL) { - app = prop->to_str(); + + if ((prop = obj->ensure_property_string("app")) == NULL) { + ret = ERROR_HTTP_DVR_CREATE_REQUEST; + srs_error("dvr: api create dvr request requires app. ret=%d", ret); + return ret; } - if ((prop = obj->ensure_property_string("stream")) != NULL) { - stream = prop->to_str(); + std::string app = prop->to_str(); + + if ((prop = obj->ensure_property_string("stream")) == NULL) { + ret = ERROR_HTTP_DVR_CREATE_REQUEST; + srs_error("dvr: api create dvr request requires stream. ret=%d", ret); + return ret; + } + std::string stream = prop->to_str(); + + if (vhost.empty() || app.empty() || stream.empty()) { + ret = ERROR_HTTP_DVR_CREATE_REQUEST; + srs_error("dvr: api create dvr request requires vhost/app/stream. ret=%d", ret); + return ret; } SrsDvrApiPlan* dvr = NULL; for (int i = 0; i < (int)dvrs.size(); i++) { SrsDvrApiPlan* plan = dvrs.at(i); - if (!vhost.empty() && plan->req->vhost != vhost) { - continue; - } - if (!app.empty() && plan->req->app != app) { - continue; - } - if (!stream.empty() && plan->req->stream != stream) { + if (plan->req->vhost != vhost || plan->req->app != app || plan->req->stream != stream) { continue; } dvr = plan; break; } + // mock the client request for dvr. + SrsRequest* req = new SrsRequest(); + SrsAutoFree(SrsRequest, req); + + // should notice the source to reload dvr when already publishing. + SrsSource* source = NULL; + + // create if not exists if (!dvr) { - ret = ERROR_HTTP_DVR_NO_TAEGET; - srs_error("dvr: create not found for url=%s/%s/%s, ret=%d", vhost.c_str(), app.c_str(), stream.c_str(), ret); - return ret; + dvr = new SrsDvrApiPlan(); + + req->vhost = vhost; + req->app = app; + req->stream = stream; + req->tcUrl = "rtmp://" + vhost + "/" + app + "/" + stream; + + // fetch source from pool. + // NULL, create without source, ignore. + // start dvr when already publishing. + source = SrsSource::fetch(req); + + // initialize for dvr pool to create it. + if ((ret = dvr->initialize(req)) != ERROR_SUCCESS) { + return ret; + } } // update optional parameters for plan. + if ((ret = dvr->set_plan()) != ERROR_SUCCESS) { + return ret; + } if ((prop = obj->ensure_property_string("path_tmpl")) != NULL) { if ((ret = dvr->set_path_tmpl(prop->to_str())) != ERROR_SUCCESS) { return ret; @@ -1589,7 +1590,19 @@ int SrsApiDvrPool::create(SrsJsonAny* json) } } - return dvr->start(); + if ((ret = dvr->start()) != ERROR_SUCCESS) { + return ret; + } + + // do reload for source when already publishing. + // when reload, the source will use the request instead. + if (source) { + if ((ret = source->on_reload_vhost_dvr(vhost)) != ERROR_SUCCESS) { + return ret; + } + } + + return ret; } int SrsApiDvrPool::stop(string vhost, string app, string stream) @@ -1681,9 +1694,9 @@ int SrsApiDvrPool::rpc(SrsJsonAny* json) return ret; } -SrsDvr::SrsDvr(SrsSource* s) +SrsDvr::SrsDvr() { - source = s; + source = NULL; plan = NULL; } @@ -1692,14 +1705,20 @@ SrsDvr::~SrsDvr() srs_freep(plan); } -int SrsDvr::initialize(SrsRequest* r) +int SrsDvr::initialize(SrsSource* s, SrsRequest* r) { int ret = ERROR_SUCCESS; + + source = s; srs_freep(plan); plan = SrsDvrPlan::create_plan(r->vhost); - if ((ret = plan->initialize(source, r)) != ERROR_SUCCESS) { + if ((ret = plan->initialize(r)) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = source->on_dvr_request_sh()) != ERROR_SUCCESS) { return ret; } @@ -1722,6 +1741,7 @@ void SrsDvr::on_unpublish() plan->on_unpublish(); } +// TODO: FIXME: source should use shared message instead. int SrsDvr::on_meta_data(SrsOnMetaDataPacket* m) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_dvr.hpp b/trunk/src/app/srs_app_dvr.hpp index 6e68a683c..e6bf33434 100644 --- a/trunk/src/app/srs_app_dvr.hpp +++ b/trunk/src/app/srs_app_dvr.hpp @@ -58,7 +58,6 @@ class SrsThread; class SrsFlvSegment : public ISrsReloadHandler { private: - SrsSource* source; SrsRequest* req; SrsDvrPlan* plan; private: @@ -121,7 +120,7 @@ public: /** * initialize the segment. */ - virtual int initialize(SrsSource* s, SrsRequest* r); + virtual int initialize(SrsRequest* r); /** * whether segment is overflow. */ @@ -200,19 +199,6 @@ public: virtual int call(); virtual std::string to_string(); }; -class SrsDvrAsyncCallOnSegment : public ISrsDvrAsyncCall -{ -private: - std::string callback; - std::string path; - SrsRequest* req; -public: - SrsDvrAsyncCallOnSegment(SrsRequest* r, std::string c, std::string p); - virtual ~SrsDvrAsyncCallOnSegment(); -public: - virtual int call(); - virtual std::string to_string(); -}; /** * the async callback for dvr. @@ -247,7 +233,6 @@ public: public: SrsRequest* req; protected: - SrsSource* source; SrsFlvSegment* segment; SrsDvrAsyncCallThread* async; bool dvr_enabled; @@ -255,7 +240,7 @@ public: SrsDvrPlan(); virtual ~SrsDvrPlan(); public: - virtual int initialize(SrsSource* s, SrsRequest* r); + virtual int initialize(SrsRequest* r); virtual int on_publish() = 0; virtual void on_unpublish() = 0; /** @@ -272,7 +257,6 @@ public: virtual int on_video(SrsSharedPtrMessage* __video); protected: virtual int on_reap_segment(); - virtual int on_dvr_request_sh(); virtual int on_video_keyframe(); virtual int64_t filter_timestamp(int64_t timestamp); public: @@ -294,6 +278,8 @@ public: /** * api plan: reap flv by api. +* @remark the api plan maybe create by publish event or http api post create dvr event. +* so when we got from pool first when create it. */ class SrsDvrApiPlan : public SrsDvrPlan { @@ -303,7 +289,6 @@ private: SrsSharedPtrMessage* sh_video; SrsSharedPtrMessage* metadata; private: - std::string callback; bool autostart; bool started; private: @@ -314,13 +299,14 @@ public: SrsDvrApiPlan(); virtual ~SrsDvrApiPlan(); public: - virtual int initialize(SrsSource* s, SrsRequest* r); + virtual int initialize(SrsRequest* r); virtual int on_publish(); virtual void on_unpublish(); virtual int on_meta_data(SrsSharedPtrMessage* __metadata); virtual int on_audio(SrsSharedPtrMessage* __audio); virtual int on_video(SrsSharedPtrMessage* __video); public: + virtual int set_plan(); virtual int set_path_tmpl(std::string path_tmpl); virtual int set_callback(std::string value); virtual int set_wait_keyframe(bool wait_keyframe); @@ -328,8 +314,6 @@ public: virtual int dumps(std::stringstream& ss); virtual int stop(); virtual int rpc(SrsJsonObject* obj); -protected: - virtual int on_reap_segment(); private: virtual int check_user_actions(SrsSharedPtrMessage* msg); }; @@ -368,7 +352,7 @@ public: SrsDvrSegmentPlan(); virtual ~SrsDvrSegmentPlan(); public: - virtual int initialize(SrsSource* source, SrsRequest* req); + virtual int initialize(SrsRequest* req); virtual int on_publish(); virtual void on_unpublish(); virtual int on_meta_data(SrsSharedPtrMessage* __metadata); @@ -392,7 +376,9 @@ public: static SrsApiDvrPool* instance(); virtual ~SrsApiDvrPool(); public: + virtual SrsDvrApiPlan* get_dvr(std::string vhost); virtual int add_dvr(SrsDvrApiPlan* dvr); + virtual void detach_dvr(SrsDvrApiPlan* dvr); public: virtual int dumps(std::string vhost, std::string app, std::string stream, std::stringstream& ss); virtual int create(SrsJsonAny* json); @@ -411,7 +397,7 @@ private: private: SrsDvrPlan* plan; public: - SrsDvr(SrsSource* s); + SrsDvr(); virtual ~SrsDvr(); public: /** @@ -419,7 +405,7 @@ public: * when system initialize(encoder publish at first time, or reload), * initialize the dvr will reinitialize the plan, the whole dvr framework. */ - virtual int initialize(SrsRequest* r); + virtual int initialize(SrsSource* s, SrsRequest* r); /** * publish stream event, * when encoder start to publish RTMP stream. diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index 7c7e9a764..8cf8dc74c 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -163,10 +163,10 @@ void SrsHlsSegment::update_duration(int64_t current_frame_dts) return; } -SrsHlsMuxer::SrsHlsMuxer(ISrsHlsHandler* h) +SrsHlsMuxer::SrsHlsMuxer() { req = NULL; - handler = h; + handler = NULL; hls_fragment = hls_window = 0; target_duration = 0; _sequence_no = 0; @@ -189,6 +189,15 @@ SrsHlsMuxer::~SrsHlsMuxer() srs_freep(req); } +int SrsHlsMuxer::initialize(ISrsHlsHandler* h) +{ + int ret = ERROR_SUCCESS; + + handler = h; + + return ret; +} + int SrsHlsMuxer::sequence_no() { return _sequence_no; @@ -811,10 +820,10 @@ int SrsHlsCache::reap_segment(string log_desc, SrsHlsMuxer* muxer, int64_t segme return ret; } -SrsHls::SrsHls(SrsSource* s, ISrsHlsHandler* h) +SrsHls::SrsHls() { - source = s; - handler = h; + source = NULL; + handler = NULL; hls_enabled = false; @@ -822,7 +831,7 @@ SrsHls::SrsHls(SrsSource* s, ISrsHlsHandler* h) sample = new SrsCodecSample(); jitter = new SrsRtmpJitter(); - muxer = new SrsHlsMuxer(h); + muxer = new SrsHlsMuxer(); hls_cache = new SrsHlsCache(); pprint = SrsPithyPrint::create_hls(); @@ -841,6 +850,20 @@ SrsHls::~SrsHls() srs_freep(pprint); } +int SrsHls::initialize(SrsSource* s, ISrsHlsHandler* h) +{ + int ret = ERROR_SUCCESS; + + source = s; + handler = h; + + if ((ret = muxer->initialize(h)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + int SrsHls::on_publish(SrsRequest* req) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_hls.hpp b/trunk/src/app/srs_app_hls.hpp index 11a81deb1..62660b3a7 100644 --- a/trunk/src/app/srs_app_hls.hpp +++ b/trunk/src/app/srs_app_hls.hpp @@ -195,11 +195,15 @@ private: */ SrsCodecAudio acodec; public: - SrsHlsMuxer(ISrsHlsHandler* h); + SrsHlsMuxer(); virtual ~SrsHlsMuxer(); public: virtual int sequence_no(); public: + /** + * initialize the hls muxer. + */ + virtual int initialize(ISrsHlsHandler* h); /** * when publish, update the config for muxer. */ @@ -325,9 +329,13 @@ private: */ int64_t stream_dts; public: - SrsHls(SrsSource* s, ISrsHlsHandler* h); + SrsHls(); virtual ~SrsHls(); public: + /** + * initialize the hls by handler and source. + */ + virtual int initialize(SrsSource* s, ISrsHlsHandler* h); /** * publish stream event, continue to write the m3u8, * for the muxer object not destroyed. diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 1008d31e5..956086c44 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -392,9 +392,11 @@ int SrsRtmpConn::stream_service_cycle() bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); // find a source to serve. - SrsSource* source = NULL; - if ((ret = SrsSource::find(req, server, server, &source)) != ERROR_SUCCESS) { - return ret; + SrsSource* source = SrsSource::fetch(req); + if (!source) { + if ((ret = SrsSource::create(req, server, server, &source)) != ERROR_SUCCESS) { + return ret; + } } srs_assert(source != NULL); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index a59f00aff..fa12bda4f 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -713,35 +713,47 @@ ISrsSourceHandler::~ISrsSourceHandler() std::map SrsSource::pool; -int SrsSource::find(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh, SrsSource** pps) +int SrsSource::create(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh, SrsSource** pps) { int ret = ERROR_SUCCESS; string stream_url = r->get_stream_url(); string vhost = r->vhost; - if (pool.find(stream_url) == pool.end()) { - SrsSource* source = new SrsSource(hh); - if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) { - srs_freep(source); - return ret; - } - - pool[stream_url] = source; - srs_info("create new source for url=%s, vhost=%s", - stream_url.c_str(), vhost.c_str()); + // should always not exists for create a source. + srs_assert (pool.find(stream_url) == pool.end()); + + SrsSource* source = new SrsSource(); + if ((ret = source->initialize(r, h, hh)) != ERROR_SUCCESS) { + srs_freep(source); + return ret; } + + pool[stream_url] = source; + srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str()); + *pps = source; + + return ret; +} + +SrsSource* SrsSource::fetch(SrsRequest* r) +{ + SrsSource* source = NULL; + + string stream_url = r->get_stream_url(); + if (pool.find(stream_url) == pool.end()) { + return NULL; + } + + source = pool[stream_url]; + // we always update the request of resource, // for origin auth is on, the token in request maybe invalid, // and we only need to update the token of request, it's simple. - if (true) { - SrsSource* source = pool[stream_url]; - source->_req->update_auth(r); - *pps = source; - } - - return ret; + source->_req->update_auth(r); + + return source; } void SrsSource::destroy() @@ -754,17 +766,16 @@ void SrsSource::destroy() pool.clear(); } -SrsSource::SrsSource(ISrsHlsHandler* hh) +SrsSource::SrsSource() { _req = NULL; jitter_algorithm = SrsRtmpJitterAlgorithmOFF; #ifdef SRS_AUTO_HLS - // TODO: FIXME: refine code, use subscriber pattern. - hls = new SrsHls(this, hh); + hls = new SrsHls(); #endif #ifdef SRS_AUTO_DVR - dvr = new SrsDvr(this); + dvr = new SrsDvr(); #endif #ifdef SRS_AUTO_TRANSCODE encoder = new SrsEncoder(); @@ -824,16 +835,26 @@ SrsSource::~SrsSource() srs_freep(_req); } -int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h) +int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh) { int ret = ERROR_SUCCESS; + srs_assert(h); + srs_assert(hh); + srs_assert(!_req); + handler = h; _req = r->copy(); atc = _srs_config->get_atc(_req->vhost); + +#ifdef SRS_AUTO_HLS + if ((ret = hls->initialize(this, hh)) != ERROR_SUCCESS) { + return ret; + } +#endif #ifdef SRS_AUTO_DVR - if ((ret = dvr->initialize(_req)) != ERROR_SUCCESS) { + if ((ret = dvr->initialize(this, _req)) != ERROR_SUCCESS) { return ret; } #endif @@ -997,7 +1018,7 @@ int SrsSource::on_reload_vhost_dvr(string vhost) dvr->on_unpublish(); // reinitialize the dvr, update plan. - if ((ret = dvr->initialize(_req)) != ERROR_SUCCESS) { + if ((ret = dvr->initialize(this, _req)) != ERROR_SUCCESS) { return ret; } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index eb18082f1..decdc0bae 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -380,7 +380,12 @@ public: * @param hh the event handler for hls. * @param pps the matched source, if success never be NULL. */ - static int find(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh, SrsSource** pps); + static int create(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh, SrsSource** pps); + /** + * get the exists source, NULL when not exists. + * update the request and return the exists source. + */ + static SrsSource* fetch(SrsRequest* r); /** * when system exit, destroy the sources, * for gmc to analysis mem leaks. @@ -449,15 +454,14 @@ private: // the cached audio sequence header. SrsSharedPtrMessage* cache_sh_audio; public: - /** - * @param _req the client request object, - * this object will deep copy it for reload. - */ - SrsSource(ISrsHlsHandler* hh); + SrsSource(); virtual ~SrsSource(); // initialize, get and setter. public: - virtual int initialize(SrsRequest* r, ISrsSourceHandler* h); + /** + * initialize the hls with handlers. + */ + virtual int initialize(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh); // interface ISrsReloadHandler public: virtual int on_reload_vhost_atc(std::string vhost); diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 77ffcf3ef..1ed6ec1ab 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 125 +#define VERSION_REVISION 126 // server info. #define RTMP_SIG_SRS_KEY "SRS"