diff --git a/README.md b/README.md index f665f2384..98cd3764a 100755 --- a/README.md +++ b/README.md @@ -97,13 +97,14 @@ Supported operating systems and hardware: 17. support live stream transcoding by ffmpeg.
18. support ffmpeg filters(logo/overlay/crop), x264 params.
19. [plan] support network based cli and json result.
-20. [plan] support bandwidth test api and flash client.
-21. [plan] support adobe flash refer/token/swf verification.
-22. [plan] support adobe amf3 codec.
-23. [plan] support dvr(record live to vod file)
-24. [plan] support FMS edge protocol
-25. [plan] support encryption: RTMPE/RTMPS, HLS DRM
-26. [plan] support RTMPT, http to tranverse firewalls
+20. [plan] support http callback api hooks.
+21. [plan] support bandwidth test api and flash client.
+22. [plan] support adobe flash refer/token/swf verification.
+23. [plan] support adobe amf3 codec.
+24. [plan] support dvr(record live to vod file)
+25. [plan] support FMS edge protocol
+26. [plan] support encryption: RTMPE/RTMPS, HLS DRM
+27. [plan] support RTMPT, http to tranverse firewalls
### Performance 1. 300 connections, 150Mbps, 500kbps, CPU 18.8%, 5956KB. @@ -150,8 +151,9 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw * nginx v1.5.0: 139524 lines
### History +* v0.7, 2013-12-01, support dead-loop detect for forwarder and transcoder. * v0.7, 2013-12-01, support all ffmpeg filters and params. -* v0.7, 2013-11-30, support live stream transcoding by ffmpeg. +* v0.7, 2013-11-30, support live stream transcoder by ffmpeg. * v0.7, 2013-11-30, support --with/without -ffmpeg, build ffmpeg-2.1. * v0.7, 2013-11-30, add ffmpeg-2.1, x264-core138, lame-3.99.5, libaacplus-2.0.2. * v0.6, 2013-11-29, v0.6 released. 16094 lines. diff --git a/trunk/conf/srs.conf b/trunk/conf/srs.conf index 473493e99..32de6600f 100755 --- a/trunk/conf/srs.conf +++ b/trunk/conf/srs.conf @@ -41,7 +41,7 @@ vhost __defaultVhost__ { achannels 2; aparams { } - output rtmp://[vhost]:[port]/[app]/[stream]_ld; + output rtmp://127.0.0.1:[port]/[app]?vhost=[vhost]/[stream]_[engine]; } engine sd{ enabled on; @@ -64,7 +64,7 @@ vhost __defaultVhost__ { achannels 2; aparams { } - output rtmp://[vhost]:[port]/[app]/[stream]_sd; + output rtmp://127.0.0.1:[port]/[app]?vhost=[vhost]/[stream]_[engine]; } } } @@ -76,7 +76,7 @@ vhost dev { hls_path ./objs/nginx/html; hls_fragment 5; hls_window 30; - #forward dev:19350; + forward 127.0.0.1:19350?vhost=dev; transcode { enabled on; ffmpeg ./objs/ffmpeg/bin/ffmpeg; @@ -100,7 +100,7 @@ vhost dev { achannels 2; aparams { } - output rtmp://127.0.0.1:[port]/[app]/[stream]_dev; + output rtmp://127.0.0.1:[port]/[app]?vhost=[vhost]/[stream]_[engine]; } } } @@ -130,7 +130,7 @@ vhost mirror.transcode.vhost.com { achannels 2; aparams { } - output rtmp://[vhost]:[port]/[app]/[stream]_mirror; + output rtmp://127.0.0.1:[port]/[app]?vhost=[vhost]/[stream]_[engine]; } } } @@ -160,7 +160,7 @@ vhost drawtext.transcode.vhost.com { achannels 2; aparams { } - output rtmp://[vhost]:[port]/[app]/[stream]_drawtext; + output rtmp://127.0.0.1:[port]/[app]?vhost=[vhost]/[stream]_[engine]; } } } @@ -190,7 +190,7 @@ vhost crop.transcode.vhost.com { achannels 2; aparams { } - output rtmp://[vhost]:[port]/[app]/[stream]_crop; + output rtmp://127.0.0.1:[port]/[app]?vhost=[vhost]/[stream]_[engine]; } } } @@ -220,7 +220,7 @@ vhost logo.transcode.vhost.com { achannels 2; aparams { } - output rtmp://[vhost]:[port]/[app]/[stream]_logo; + output rtmp://127.0.0.1:[port]/[app]?vhost=[vhost]/[stream]_[engine]; } } } @@ -297,7 +297,8 @@ vhost all.transcode.vhost.com { # [port] the intput stream port. # [app] the input stream app. # [stream] the input stream name. - output rtmp://[vhost]:[port]/[app]/[stream]_ffsuper; + # [engine] the tanscode engine name. + output rtmp://127.0.0.1:[port]/[app]?vhost=[vhost]/[stream]_[engine]; } engine ffhd{ enabled on; @@ -418,15 +419,14 @@ vhost forward.vhost.com { # this used to split/forward the current stream for cluster active-standby, # active-active for cdn to build high available fault tolerance system. # format: {ip}:{port} {ip_N}:{port_N} + # or specify the vhost by: + # format: {ip}:{port}?vhost={vhost} {ip_N}:{port_N}?vhost={vhost} + # if vhost not specified, use the request vhost instead. forward 127.0.0.1:1936 127.0.0.1:1937; } # the vhost which forward publish streams to other vhosts. vhost forward1.vhost.com { - # forward all publish stream to the specified server. - # this used to split/forward the current stream for cluster active-standby, - # active-active for cdn to build high available fault tolerance system. - # format: {ip}:{port} {ip_N}:{port_N} - forward forward.vhost.com:1936 forward.vhost.com:1937; + forward 127.0.0.1:1936?vhost=forward2.vhost.com 127.0.0.1:1937?vhost=forward3.vhost.com; } # the vhost disabled. vhost removed.vhost.com { diff --git a/trunk/src/core/srs_core.cpp b/trunk/src/core/srs_core.cpp index aeab709e1..62f6a541b 100644 --- a/trunk/src/core/srs_core.cpp +++ b/trunk/src/core/srs_core.cpp @@ -24,6 +24,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include +#include + +#include static int64_t _srs_system_time_us_cache = 0; @@ -60,3 +64,49 @@ std::string srs_replace(std::string str, std::string old_str, std::string new_st return ret; } + +std::string srs_dns_resolve(std::string host) +{ + if (inet_addr(host.c_str()) != INADDR_NONE) { + return host; + } + + hostent* answer = gethostbyname(host.c_str()); + if (answer == NULL) { + srs_error("dns resolve host %s error.", host.c_str()); + return ""; + } + + char ipv4[16]; + memset(ipv4, 0, sizeof(ipv4)); + for (int i = 0; i < answer->h_length; i++) { + inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4)); + srs_info("dns resolve host %s to %s.", host.c_str(), ipv4); + break; + } + + return ipv4; +} + +void srs_vhost_resolve(std::string& vhost, std::string& app) +{ + app = srs_replace(app, "...", "?"); + + if ((pos = app.find("?")) == std::string::npos) { + return; + } + + std::string query = app.substr(pos + 1); + app = app.substr(0, pos); + + if ((pos = query.find("vhost?")) != std::string::npos + || (pos = query.find("vhost=")) != std::string::npos + || (pos = query.find("Vhost?")) != std::string::npos + || (pos = query.find("Vhost=")) != std::string::npos + ) { + query = query.substr(pos + 6); + if (!query.empty()) { + vhost = query; + } + } +} diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 2168dbd53..7e2f92459 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -94,5 +94,12 @@ extern void srs_update_system_time_ms(); #include // replace utility extern std::string srs_replace(std::string str, std::string old_str, std::string new_str); +// dns resolve utility, return the resolved ip address. +extern std::string srs_dns_resolve(std::string host); +// resolve the vhost in query string +// @param app, may contains the vhost in query string format: +// app?vhost=request_vhost +// app...vhost...request_vhost +extern void srs_vhost_resolve(std::string& vhost, std::string& app); #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index 6b8062d9d..d79de8993 100644 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -343,7 +343,7 @@ int SrsClient::publish(SrsSource* source, bool is_fmle) SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER); // notify the hls to prepare when publish start. - if ((ret = source->on_publish(req->vhost, req->port, req->app, req->stream)) != ERROR_SUCCESS) { + if ((ret = source->on_publish(req)) != ERROR_SUCCESS) { srs_error("hls on_publish failed. ret=%d", ret); return ret; } diff --git a/trunk/src/core/srs_core_config.hpp b/trunk/src/core/srs_core_config.hpp index a43c79899..979578934 100644 --- a/trunk/src/core/srs_core_config.hpp +++ b/trunk/src/core/srs_core_config.hpp @@ -37,6 +37,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // default vhost for rtmp #define RTMP_VHOST_DEFAULT "__defaultVhost__" +#define SRS_LOCALHOST "127.0.0.1" +#define RTMP_DEFAULT_PORT 1935 +#define RTMP_DEFAULT_PORTS "1935" + #define SRS_CONF_DEFAULT_HLS_PATH "./objs/nginx/html" #define SRS_CONF_DEFAULT_HLS_FRAGMENT 10 #define SRS_CONF_DEFAULT_HLS_WINDOW 60 diff --git a/trunk/src/core/srs_core_encoder.cpp b/trunk/src/core/srs_core_encoder.cpp index 53d042299..765027142 100644 --- a/trunk/src/core/srs_core_encoder.cpp +++ b/trunk/src/core/srs_core_encoder.cpp @@ -26,15 +26,23 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include + #include #include #include +#include + +#ifdef SRS_FFMPEG #define SRS_ENCODER_SLEEP_MS 2000 #define SRS_ENCODER_VCODEC "libx264" #define SRS_ENCODER_ACODEC "libaacplus" +// for encoder to detect the dead loop +static std::vector _transcoded_url; + SrsFFMPEG::SrsFFMPEG(std::string ffmpeg_bin) { started = false; @@ -56,7 +64,7 @@ SrsFFMPEG::~SrsFFMPEG() stop(); } -int SrsFFMPEG::initialize(std::string vhost, std::string port, std::string app, std::string stream, SrsConfDirective* engine) +int SrsFFMPEG::initialize(SrsRequest* req, SrsConfDirective* engine) { int ret = ERROR_SUCCESS; @@ -84,39 +92,32 @@ int SrsFFMPEG::initialize(std::string vhost, std::string port, std::string app, // input stream, from local. // ie. rtmp://127.0.0.1:1935/live/livestream input = "rtmp://127.0.0.1:"; - input += port; + input += req->port; input += "/"; - input += app; + input += req->app; + input += "?vhost="; + input += req->vhost; input += "/"; - input += stream; + input += req->stream; // output stream, to other/self server // ie. rtmp://127.0.0.1:1935/live/livestream_sd - if (vhost == RTMP_VHOST_DEFAULT) { - output = srs_replace(output, "[vhost]", "127.0.0.1"); - } else { - output = srs_replace(output, "[vhost]", vhost); - } - output = srs_replace(output, "[port]", port); - output = srs_replace(output, "[app]", app); - output = srs_replace(output, "[stream]", stream); + output = srs_replace(output, "[vhost]", req->vhost); + output = srs_replace(output, "[port]", req->port); + output = srs_replace(output, "[app]", req->app); + output = srs_replace(output, "[stream]", req->stream); + output = srs_replace(output, "[engine]", engine->arg0()); // important: loop check, donot transcode again. - // we think the following is loop circle: - // input: rtmp://127.0.0.1:1935/live/livestream_sd - // output: rtmp://127.0.0.1:1935/live/livestream_sd_sd - std::string tail = ""; // tail="_sd" - if (output.length() > input.length()) { - tail = output.substr(input.length()); - } - // TODO: better dead loop check. - // if input also endwiths the tail, loop detected. - if (!tail.empty() && input.rfind(tail) == input.length() - tail.length()) { + std::vector::iterator it; + it = std::find(_transcoded_url.begin(), _transcoded_url.end(), input); + if (it != _transcoded_url.end()) { ret = ERROR_ENCODER_LOOP; srs_info("detect a loop cycle, input=%s, output=%s, ignore it. ret=%d", input.c_str(), output.c_str(), ret); return ret; } + _transcoded_url.push_back(output); if (vcodec != SRS_ENCODER_VCODEC) { ret = ERROR_ENCODER_VCODEC; @@ -303,6 +304,20 @@ int SrsFFMPEG::start() params.push_back("-y"); params.push_back(output); + + if (true) { + int pparam_size = 8 * 1024; + char* pparam = new char[pparam_size]; + char* p = pparam; + char* last = pparam + pparam_size; + for (int i = 0; i < (int)params.size(); i++) { + std::string ffp = params[i]; + snprintf(p, last - p, "%s ", ffp.c_str()); + p += ffp.length() + 1; + } + srs_trace("start transcoder: %s", pparam); + srs_freepa(pparam); + } // TODO: fork or vfork? if ((pid = fork()) < 0) { @@ -346,6 +361,12 @@ void SrsFFMPEG::stop() if (!started) { return; } + + std::vector::iterator it; + it = std::find(_transcoded_url.begin(), _transcoded_url.end(), output); + if (it != _transcoded_url.end()) { + _transcoded_url.erase(it); + } } SrsEncoder::SrsEncoder() @@ -359,7 +380,7 @@ SrsEncoder::~SrsEncoder() on_unpublish(); } -int SrsEncoder::parse_scope_engines() +int SrsEncoder::parse_scope_engines(SrsRequest* req) { int ret = ERROR_SUCCESS; @@ -368,17 +389,17 @@ int SrsEncoder::parse_scope_engines() // parse vhost scope engines std::string scope = ""; - if ((conf = config->get_transcode(vhost, "")) != NULL) { - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) { + if ((conf = config->get_transcode(req->vhost, scope)) != NULL) { + if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) { srs_error("parse vhost scope=%s transcode engines failed. " "ret=%d", scope.c_str(), ret); return ret; } } // parse app scope engines - scope = app; - if ((conf = config->get_transcode(vhost, app)) != NULL) { - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) { + scope = req->app; + if ((conf = config->get_transcode(req->vhost, scope)) != NULL) { + if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) { srs_error("parse app scope=%s transcode engines failed. " "ret=%d", scope.c_str(), ret); return ret; @@ -386,9 +407,9 @@ int SrsEncoder::parse_scope_engines() } // parse stream scope engines scope += "/"; - scope += stream; - if ((conf = config->get_transcode(vhost, app + "/" + stream)) != NULL) { - if ((ret = parse_transcode(conf)) != ERROR_SUCCESS) { + scope += req->stream; + if ((conf = config->get_transcode(req->vhost, scope)) != NULL) { + if ((ret = parse_transcode(req, conf)) != ERROR_SUCCESS) { srs_error("parse stream scope=%s transcode engines failed. " "ret=%d", scope.c_str(), ret); return ret; @@ -398,16 +419,11 @@ int SrsEncoder::parse_scope_engines() return ret; } -int SrsEncoder::on_publish(std::string _vhost, std::string _port, std::string _app, std::string _stream) +int SrsEncoder::on_publish(SrsRequest* req) { int ret = ERROR_SUCCESS; - vhost = _vhost; - port = _port; - app = _app; - stream = _stream; - - ret = parse_scope_engines(); + ret = parse_scope_engines(req); // ignore the loop encoder if (ret == ERROR_ENCODER_LOOP) { @@ -457,7 +473,7 @@ SrsFFMPEG* SrsEncoder::at(int index) return ffmpegs[index]; } -int SrsEncoder::parse_transcode(SrsConfDirective* conf) +int SrsEncoder::parse_transcode(SrsRequest* req, SrsConfDirective* conf) { int ret = ERROR_SUCCESS; @@ -498,7 +514,7 @@ int SrsEncoder::parse_transcode(SrsConfDirective* conf) SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin); - if ((ret = ffmpeg->initialize(vhost, port, app, stream, engine)) != ERROR_SUCCESS) { + if ((ret = ffmpeg->initialize(req, engine)) != ERROR_SUCCESS) { srs_freep(ffmpeg); // if got a loop, donot transcode the whole stream. @@ -577,3 +593,5 @@ void* SrsEncoder::encoder_thread(void* arg) return NULL; } +#endif + diff --git a/trunk/src/core/srs_core_encoder.hpp b/trunk/src/core/srs_core_encoder.hpp index fe3bb362c..5a8ad9eb2 100644 --- a/trunk/src/core/srs_core_encoder.hpp +++ b/trunk/src/core/srs_core_encoder.hpp @@ -35,6 +35,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include class SrsConfDirective; +class SrsRequest; + +#ifdef SRS_FFMPEG /** * a transcode engine: ffmepg, @@ -68,7 +71,7 @@ public: SrsFFMPEG(std::string ffmpeg_bin); virtual ~SrsFFMPEG(); public: - virtual int initialize(std::string vhost, std::string port, std::string app, std::string stream, SrsConfDirective* engine); + virtual int initialize(SrsRequest* req, SrsConfDirective* engine); virtual int start(); virtual void stop(); }; @@ -79,11 +82,6 @@ public: */ class SrsEncoder { -private: - std::string vhost; - std::string port; - std::string app; - std::string stream; private: std::vector ffmpegs; private: @@ -93,16 +91,18 @@ public: SrsEncoder(); virtual ~SrsEncoder(); public: - virtual int on_publish(std::string vhost, std::string port, std::string app, std::string stream); + virtual int on_publish(SrsRequest* req); virtual void on_unpublish(); private: - virtual int parse_scope_engines(); + virtual int parse_scope_engines(SrsRequest* req); virtual void clear_engines(); virtual SrsFFMPEG* at(int index); - virtual int parse_transcode(SrsConfDirective* conf); + virtual int parse_transcode(SrsRequest* req, SrsConfDirective* conf); virtual int cycle(); virtual void encoder_cycle(); static void* encoder_thread(void* arg); }; #endif + +#endif diff --git a/trunk/src/core/srs_core_error.hpp b/trunk/src/core/srs_core_error.hpp index 06b2a331e..eea5f56ad 100644 --- a/trunk/src/core/srs_core_error.hpp +++ b/trunk/src/core/srs_core_error.hpp @@ -85,6 +85,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_SYSTEM_STREAM_BUSY 410 #define ERROR_SYSTEM_IP_INVALID 411 #define ERROR_SYSTEM_CONFIG_TOO_LARGE 412 +#define ERROR_SYSTEM_FORWARD_LOOP 413 // see librtmp. // failed when open ssl create the dh diff --git a/trunk/src/core/srs_core_forward.cpp b/trunk/src/core/srs_core_forward.cpp index f766f13bc..83ee661cb 100644 --- a/trunk/src/core/srs_core_forward.cpp +++ b/trunk/src/core/srs_core_forward.cpp @@ -27,13 +27,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include -#include #include #include #include #include #include +#include +#include #define SRS_PULSE_TIMEOUT_MS 100 #define SRS_FORWARDER_SLEEP_MS 2000 @@ -62,29 +63,54 @@ SrsForwarder::~SrsForwarder() msgs.clear(); } -int SrsForwarder::on_publish(std::string vhost, std::string _app, std::string stream, std::string forward_server) +int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server) { int ret = ERROR_SUCCESS; - app = _app; + // forward app + app = req->app; - tc_url = "rtmp://"; - tc_url += vhost; - tc_url += "/"; - tc_url += app; - - stream_name = stream; + stream_name = req->stream; server = forward_server; - port = 1935; - - // TODO: dead loop check. + std::string s_port = RTMP_DEFAULT_PORTS; + port = RTMP_DEFAULT_PORT; size_t pos = forward_server.find(":"); if (pos != std::string::npos) { - port = ::atoi(forward_server.substr(pos + 1).c_str()); + s_port = forward_server.substr(pos + 1); server = forward_server.substr(0, pos); } + // discovery vhost + std::string vhost = req->vhost; + srs_vhost_resolve(vhost, s_port); + port = ::atoi(s_port.c_str()); + // generate tcUrl + tc_url = "rtmp://"; + tc_url += vhost; + tc_url += "/"; + tc_url += req->app; + + // dead loop check + std::string source_ep = req->vhost; + source_ep += ":"; + source_ep += req->port; + + std::string dest_ep = vhost; + dest_ep += ":"; + dest_ep += s_port; + + if (source_ep == dest_ep) { + ret = ERROR_SYSTEM_FORWARD_LOOP; + srs_warn("farder loop detected. src=%s, dest=%s, ret=%d", + source_ep.c_str(), dest_ep.c_str(), ret); + return ret; + } + srs_trace("start forward %s to %s, stream: %s/%s", + source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(), + stream_name.c_str()); + + // start forward if ((ret = open_socket()) != ERROR_SUCCESS) { return ret; } @@ -179,7 +205,7 @@ int SrsForwarder::connect_server() { int ret = ERROR_SUCCESS; - std::string ip = parse_server(server); + std::string ip = srs_dns_resolve(server); if (ip.empty()) { ret = ERROR_SYSTEM_IP_INVALID; srs_error("dns resolve server error, ip empty. ret=%d", ret); @@ -201,29 +227,6 @@ int SrsForwarder::connect_server() return ret; } -std::string SrsForwarder::parse_server(std::string host) -{ - if (inet_addr(host.c_str()) != INADDR_NONE) { - return host; - } - - hostent* answer = gethostbyname(host.c_str()); - if (answer == NULL) { - srs_error("dns resolve host %s error.", host.c_str()); - return ""; - } - - char ipv4[16]; - memset(ipv4, 0, sizeof(ipv4)); - for (int i = 0; i < answer->h_length; i++) { - inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4)); - srs_info("dns resolve host %s to %s.", host.c_str(), ipv4); - break; - } - - return ipv4; -} - int SrsForwarder::cycle() { int ret = ERROR_SUCCESS; diff --git a/trunk/src/core/srs_core_forward.hpp b/trunk/src/core/srs_core_forward.hpp index 3bb8d4639..405ea94a7 100644 --- a/trunk/src/core/srs_core_forward.hpp +++ b/trunk/src/core/srs_core_forward.hpp @@ -37,6 +37,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsSharedPtrMessage; class SrsOnMetaDataPacket; class SrsRtmpClient; +class SrsRequest; /** * forward the stream to other servers. @@ -61,7 +62,7 @@ public: SrsForwarder(); virtual ~SrsForwarder(); public: - virtual int on_publish(std::string vhost, std::string app, std::string stream, std::string forward_server); + virtual int on_publish(SrsRequest* req, std::string forward_server); virtual void on_unpublish(); virtual int on_meta_data(SrsSharedPtrMessage* metadata); virtual int on_audio(SrsSharedPtrMessage* msg); @@ -69,7 +70,6 @@ public: private: virtual int open_socket(); virtual int connect_server(); - std::string parse_server(std::string host); private: virtual int cycle(); virtual int forward(); diff --git a/trunk/src/core/srs_core_hls.cpp b/trunk/src/core/srs_core_hls.cpp index 8b7f2a4ac..22b901791 100644 --- a/trunk/src/core/srs_core_hls.cpp +++ b/trunk/src/core/srs_core_hls.cpp @@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include // @see: NGX_RTMP_HLS_DELAY, // 63000: 700ms, ts_tbn=90000 @@ -466,13 +467,13 @@ SrsHls::~SrsHls() srs_freep(video_frame); } -int SrsHls::on_publish(std::string _vhost, std::string _app, std::string _stream) +int SrsHls::on_publish(SrsRequest* req) { int ret = ERROR_SUCCESS; - vhost = _vhost; - stream = _stream; - app = _app; + vhost = req->vhost; + stream = req->stream; + app = req->app; // TODO: subscribe the reload event. diff --git a/trunk/src/core/srs_core_hls.hpp b/trunk/src/core/srs_core_hls.hpp index 87e829234..286e1a835 100644 --- a/trunk/src/core/srs_core_hls.hpp +++ b/trunk/src/core/srs_core_hls.hpp @@ -42,6 +42,7 @@ class SrsMpegtsFrame; class SrsRtmpJitter; class SrsTSMuxer; class SrsCodec; +class SrsRequest; /** * 3.3.2. EXTINF @@ -142,7 +143,7 @@ public: SrsHls(); virtual ~SrsHls(); public: - virtual int on_publish(std::string _vhost, std::string _app, std::string _stream); + virtual int on_publish(SrsRequest* req); virtual void on_unpublish(); virtual int on_meta_data(SrsOnMetaDataPacket* metadata); virtual int on_audio(SrsSharedPtrMessage* audio); diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index 7bafbbb49..f79ffbe83 100644 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -30,6 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include /** * the signature for packets to client. @@ -93,7 +94,7 @@ int SrsRequest::discovery_app() srs_verbose("discovery vhost=%s", vhost.c_str()); } - port = "1935"; + port = RTMP_DEFAULT_PORTS; if ((pos = vhost.find(":")) != std::string::npos) { port = vhost.substr(pos + 1); vhost = vhost.substr(0, pos); @@ -101,6 +102,14 @@ int SrsRequest::discovery_app() } app = url; + srs_vhost_resolve(vhost, app); + + // resolve the vhost from config + SrsConfDirective* parsed_vhost = config->get_vhost(vhost); + if (parsed_vhost) { + vhost = parsed_vhost->arg0(); + } + srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s", schema.c_str(), vhost.c_str(), port.c_str(), app.c_str()); diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index 576cbe78c..3c18eb479 100644 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -48,6 +48,12 @@ class SrsOnMetaDataPacket; */ struct SrsRequest { + /** + * tcUrl: rtmp://request_vhost:port/app/stream + * support pass vhost in query string, such as: + * rtmp://ip:port/app?vhost=request_vhost/stream + * rtmp://ip:port/app...vhost...request_vhost/stream + */ std::string tcUrl; std::string pageUrl; std::string swfUrl; diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp index 0055913d1..fab06aa96 100644 --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include #define CONST_MAX_JITTER_MS 500 #define DEFAULT_FRAME_TIME_MS 10 @@ -612,56 +613,48 @@ int SrsSource::on_video(SrsCommonMessage* video) return ret; } -int SrsSource::on_publish(std::string vhost, std::string port, std::string app, std::string stream) +int SrsSource::on_publish(SrsRequest* req) { int ret = ERROR_SUCCESS; _can_publish = false; - -#ifdef SRS_HLS - if ((ret = hls->on_publish(vhost, app, stream)) != ERROR_SUCCESS) { - return ret; - } -#endif - -#ifdef SRS_FFMPEG - if ((ret = encoder->on_publish(vhost, port, app, stream)) != ERROR_SUCCESS) { - return ret; - } -#endif // TODO: support reload. // create forwarders - SrsConfDirective* conf = config->get_forward(vhost); + SrsConfDirective* conf = config->get_forward(req->vhost); for (int i = 0; conf && i < (int)conf->args.size(); i++) { std::string forward_server = conf->args.at(i); SrsForwarder* forwarder = new SrsForwarder(); forwarders.push_back(forwarder); - if ((ret = forwarder->on_publish(vhost, app, stream, forward_server)) != ERROR_SUCCESS) { + if ((ret = forwarder->on_publish(req, forward_server)) != ERROR_SUCCESS) { srs_error("start forwarder failed. " "vhost=%s, app=%s, stream=%s, forward-to=%s", - vhost.c_str(), app.c_str(), stream.c_str(), + req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), forward_server.c_str()); return ret; } } +#ifdef SRS_FFMPEG + if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) { + return ret; + } +#endif + +#ifdef SRS_HLS + if ((ret = hls->on_publish(req)) != ERROR_SUCCESS) { + return ret; + } +#endif + return ret; } void SrsSource::on_unpublish() { -#ifdef SRS_HLS - hls->on_unpublish(); -#endif - -#ifdef SRS_FFMPEG - encoder->on_unpublish(); -#endif - // close all forwarders std::vector::iterator it; for (it = forwarders.begin(); it != forwarders.end(); ++it) { @@ -671,6 +664,14 @@ void SrsSource::on_unpublish() } forwarders.clear(); +#ifdef SRS_FFMPEG + encoder->on_unpublish(); +#endif + +#ifdef SRS_HLS + hls->on_unpublish(); +#endif + gop_cache->clear(); srs_freep(cache_metadata); diff --git a/trunk/src/core/srs_core_source.hpp b/trunk/src/core/srs_core_source.hpp index 6fc41c919..5f5bd822f 100644 --- a/trunk/src/core/srs_core_source.hpp +++ b/trunk/src/core/srs_core_source.hpp @@ -39,6 +39,7 @@ class SrsCommonMessage; class SrsOnMetaDataPacket; class SrsSharedPtrMessage; class SrsForwarder; +class SrsRequest; #ifdef SRS_HLS class SrsHls; #endif @@ -210,7 +211,7 @@ public: virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata); virtual int on_audio(SrsCommonMessage* audio); virtual int on_video(SrsCommonMessage* video); - virtual int on_publish(std::string vhost, std::string port, std::string app, std::string stream); + virtual int on_publish(SrsRequest* req); virtual void on_unpublish(); public: virtual int create_consumer(SrsConsumer*& consumer);