diff --git a/trunk/conf/srs.conf b/trunk/conf/srs.conf index ad6f26f11..d7477abf6 100755 --- a/trunk/conf/srs.conf +++ b/trunk/conf/srs.conf @@ -83,7 +83,7 @@ vhost __defaultVhost__ { vhost dev { enabled on; gop_cache on; - #forward 127.0.0.1:19350; + forward 127.0.0.1:19350; hls { hls off; hls_path ./objs/nginx/html; diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index 17d7583f8..c41bc62d2 100644 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -181,7 +181,7 @@ int SrsClient::service_cycle() srs_trace("set chunk_size=%d success", chunk_size); // find a source to publish. - SrsSource* source = SrsSource::find(req->get_stream_url(), req->vhost); + SrsSource* source = SrsSource::find(req); srs_assert(source != NULL); // check publish available. diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index b48b0795e..ba70ec643 100644 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -32,6 +32,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +using namespace std; + /** * the signature for packets to client. */ @@ -75,6 +77,23 @@ SrsRequest::~SrsRequest() { } +SrsRequest* SrsRequest::copy() +{ + SrsRequest* cp = new SrsRequest(); + + cp->app = app; + cp->objectEncoding = objectEncoding; + cp->pageUrl = pageUrl; + cp->port = port; + cp->schema = schema; + cp->stream = stream; + cp->swfUrl = swfUrl; + cp->tcUrl = tcUrl; + cp->vhost = vhost; + + return cp; +} + int SrsRequest::discovery_app() { int ret = ERROR_SUCCESS; @@ -128,7 +147,7 @@ int SrsRequest::discovery_app() return ret; } -std::string SrsRequest::get_stream_url() +string SrsRequest::get_stream_url() { std::string url = ""; @@ -148,7 +167,7 @@ void SrsRequest::strip() trim(stream, "/ \n\r\t"); } -std::string& SrsRequest::trim(std::string& str, std::string chs) +std::string& SrsRequest::trim(string& str, string chs) { for (int i = 0; i < (int)chs.length(); i++) { char ch = chs.at(i); @@ -243,7 +262,7 @@ int SrsRtmpClient::handshake() return ret; } -int SrsRtmpClient::connect_app(std::string app, std::string tc_url) +int SrsRtmpClient::connect_app(string app, string tc_url) { int ret = ERROR_SUCCESS; @@ -329,7 +348,7 @@ int SrsRtmpClient::create_stream(int& stream_id) return ret; } -int SrsRtmpClient::play(std::string stream, int stream_id) +int SrsRtmpClient::play(string stream, int stream_id) { int ret = ERROR_SUCCESS; @@ -371,7 +390,7 @@ int SrsRtmpClient::play(std::string stream, int stream_id) return ret; } -int SrsRtmpClient::publish(std::string stream, int stream_id) +int SrsRtmpClient::publish(string stream, int stream_id) { int ret = ERROR_SUCCESS; @@ -1045,7 +1064,7 @@ int SrsRtmp::start_flash_publish(int stream_id) return ret; } -int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name) +int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, string& stream_name) { int ret = ERROR_SUCCESS; @@ -1102,7 +1121,7 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea return ret; } -int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name) +int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, string& stream_name) { int ret = ERROR_SUCCESS; @@ -1126,7 +1145,7 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType return ret; } -int SrsRtmp::identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, std::string& stream_name) +int SrsRtmp::identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, string& stream_name) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index f6e76d1b7..7e17079d6 100644 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -65,6 +65,13 @@ struct SrsRequest SrsRequest(); virtual ~SrsRequest(); + + /** + * deep copy the request, for source to use it to support reload, + * for when initialize the source, the request is valid, + * when reload it, the request maybe invalid, so need to copy it. + */ + virtual SrsRequest* copy(); /** * disconvery vhost/app from tcUrl. diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp index cf405f100..0c5ad90e3 100644 --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -345,21 +345,22 @@ int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv) std::map SrsSource::pool; -SrsSource* SrsSource::find(string stream_url, string vhost) +SrsSource* SrsSource::find(SrsRequest* req) { + string stream_url = req->get_stream_url(); + string vhost = req->vhost; + if (pool.find(stream_url) == pool.end()) { - pool[stream_url] = new SrsSource(stream_url, vhost); - srs_verbose("create new source for " - "url=%s, vhost=%s", stream_url.c_str(), vhost.c_str()); + pool[stream_url] = new SrsSource(req); + srs_verbose("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str()); } return pool[stream_url]; } -SrsSource::SrsSource(string _stream_url, string _vhost) +SrsSource::SrsSource(SrsRequest* _req) { - stream_url = _stream_url; - vhost = _vhost; + req = _req->copy(); #ifdef SRS_HLS hls = new SrsHls(); @@ -412,13 +413,15 @@ SrsSource::~SrsSource() #ifdef SRS_FFMPEG srs_freep(encoder); #endif + + srs_freep(req); } -int SrsSource::on_reload_gop_cache(string _vhost) +int SrsSource::on_reload_gop_cache(string vhost) { int ret = ERROR_SUCCESS; - if (vhost != _vhost) { + if (req->vhost != vhost) { return ret; } @@ -426,13 +429,32 @@ int SrsSource::on_reload_gop_cache(string _vhost) bool enabled_cache = config->get_gop_cache(vhost); srs_trace("vhost %s gop_cache changed to %d, source url=%s", - vhost.c_str(), enabled_cache, stream_url.c_str()); + vhost.c_str(), enabled_cache, req->get_stream_url().c_str()); set_cache(enabled_cache); return ret; } +int SrsSource::on_reload_forward(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (req->vhost != vhost) { + return ret; + } + + // forwarders + destroy_forwarders(); + if ((ret = create_forwarders()) != ERROR_SUCCESS) { + srs_error("create forwarders failed. ret=%d", ret); + return ret; + } + srs_trace("vhost %s forwarders reload success", vhost.c_str()); + + return ret; +} + bool SrsSource::can_publish() { return _can_publish; @@ -656,31 +678,23 @@ int SrsSource::on_video(SrsCommonMessage* video) return ret; } -int SrsSource::on_publish(SrsRequest* req) +int SrsSource::on_publish(SrsRequest* _req) { int ret = ERROR_SUCCESS; + // update the request object. + srs_freep(req); + req = _req->copy(); + srs_assert(req); + _can_publish = false; - - // TODO: support reload. // create forwarders - SrsConfDirective* conf = config->get_forward(req->vhost); - for (int i = 0; conf && i < (int)conf->args.size(); i++) { - std::string forward_server = conf->args.at(i); - - SrsForwarder* forwarder = new SrsForwarder(); - forwarders.push_back(forwarder); - - if ((ret = forwarder->on_publish(req, forward_server)) != ERROR_SUCCESS) { - srs_error("start forwarder failed. " - "vhost=%s, app=%s, stream=%s, forward-to=%s", - req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), - forward_server.c_str()); - return ret; - } + if ((ret = create_forwarders()) != ERROR_SUCCESS) { + srs_error("create forwarders failed. ret=%d", ret); + return ret; } - + #ifdef SRS_FFMPEG if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) { return ret; @@ -698,14 +712,8 @@ int SrsSource::on_publish(SrsRequest* req) void SrsSource::on_unpublish() { - // close all forwarders - std::vector::iterator it; - for (it = forwarders.begin(); it != forwarders.end(); ++it) { - SrsForwarder* forwarder = *it; - forwarder->on_unpublish(); - srs_freep(forwarder); - } - forwarders.clear(); + // destroy all forwarders + destroy_forwarders(); #ifdef SRS_FFMPEG encoder->on_unpublish(); @@ -776,3 +784,37 @@ void SrsSource::set_cache(bool enabled) gop_cache->set(enabled); } +int SrsSource::create_forwarders() +{ + int ret = ERROR_SUCCESS; + + SrsConfDirective* conf = config->get_forward(req->vhost); + for (int i = 0; conf && i < (int)conf->args.size(); i++) { + std::string forward_server = conf->args.at(i); + + SrsForwarder* forwarder = new SrsForwarder(); + forwarders.push_back(forwarder); + + if ((ret = forwarder->on_publish(req, forward_server)) != ERROR_SUCCESS) { + srs_error("start forwarder failed. " + "vhost=%s, app=%s, stream=%s, forward-to=%s", + req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), + forward_server.c_str()); + return ret; + } + } + + return ret; +} + +void SrsSource::destroy_forwarders() +{ + std::vector::iterator it; + for (it = forwarders.begin(); it != forwarders.end(); ++it) { + SrsForwarder* forwarder = *it; + forwarder->on_unpublish(); + srs_freep(forwarder); + } + forwarders.clear(); +} + diff --git a/trunk/src/core/srs_core_source.hpp b/trunk/src/core/srs_core_source.hpp index f119b6754..d3fbc6bef 100644 --- a/trunk/src/core/srs_core_source.hpp +++ b/trunk/src/core/srs_core_source.hpp @@ -167,15 +167,14 @@ private: public: /** * find stream by vhost/app/stream. - * @param stream_url the stream url, for example, myserver.xxx.com/app/stream - * @param vhost the vhost to constructor the object. + * @param req the client request. * @return the matched source, never be NULL. * @remark stream_url should without port and schema. */ - static SrsSource* find(std::string stream_url, std::string vhost); + static SrsSource* find(SrsRequest* req); private: - std::string vhost; - std::string stream_url; + // deep copy of client request. + SrsRequest* req; // to delivery stream to clients. std::vector consumers; // hls handler. @@ -210,22 +209,35 @@ private: // the cached audio sequence header. SrsSharedPtrMessage* cache_sh_audio; public: - SrsSource(std::string _stream_url, std::string _vhost); + /** + * @param _req the client request object, + * this object will deep copy it for reload. + */ + SrsSource(SrsRequest* _req); virtual ~SrsSource(); // interface ISrsReloadHandler public: - virtual int on_reload_gop_cache(std::string _vhost); + virtual int on_reload_gop_cache(std::string vhost); + virtual int on_reload_forward(std::string vhost); public: virtual bool can_publish(); virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata); virtual int on_audio(SrsCommonMessage* audio); virtual int on_video(SrsCommonMessage* video); - virtual int on_publish(SrsRequest* req); + /** + * publish stream event notify. + * @param _req the request from client, the source will deep copy it, + * for when reload the request of client maybe invalid. + */ + virtual int on_publish(SrsRequest* _req); virtual void on_unpublish(); public: virtual int create_consumer(SrsConsumer*& consumer); virtual void on_consumer_destroy(SrsConsumer* consumer); virtual void set_cache(bool enabled); +private: + virtual int create_forwarders(); + virtual void destroy_forwarders(); }; #endif \ No newline at end of file