diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 5102909ce..ccde267d7 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -212,6 +212,7 @@ int SrsEdgeIngester::ingest() return ret; } +// TODO: FIXME: refine the connect_app. int SrsEdgeIngester::connect_app(string ep_server, string ep_port) { int ret = ERROR_SUCCESS; @@ -641,6 +642,7 @@ int SrsEdgeForwarder::connect_server(string& ep_server, string& ep_port) return ret; } +// TODO: FIXME: refine the connect_app. int SrsEdgeForwarder::connect_app(string ep_server, string ep_port) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 0994ffdca..c7ffbdda2 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -28,6 +28,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +using namespace std; + #include #include #include @@ -43,6 +45,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include // when error, forwarder sleep for a while and retry. #define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL) @@ -51,6 +54,7 @@ SrsForwarder::SrsForwarder(SrsSource* _source) { source = _source; + _req = NULL; io = NULL; client = NULL; stfd = NULL; @@ -72,37 +76,34 @@ SrsForwarder::~SrsForwarder() srs_freep(kbps); } +int SrsForwarder::initialize(SrsRequest* req, string ep_forward) +{ + int ret = ERROR_SUCCESS; + + // it's ok to use the request object, + // SrsSource already copy it and never delete it. + _req = req; + + // the ep(endpoint) to forward to + _ep_forward = ep_forward; + + return ret; +} + void SrsForwarder::set_queue_size(double queue_size) { queue->set_queue_size(queue_size); } -int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server) +int SrsForwarder::on_publish() { int ret = ERROR_SUCCESS; - // TODO: FIXME: directly use the req object. - // forward app - app = req->app; - vhost = req->vhost; + SrsRequest* req = _req; - stream_name = req->stream; - server = forward_server; - std::string s_port = SRS_CONSTS_RTMP_DEFAULT_PORT; - port = ::atoi(SRS_CONSTS_RTMP_DEFAULT_PORT); - - // TODO: FIXME: parse complex params - size_t pos = forward_server.find(":"); - if (pos != std::string::npos) { - s_port = forward_server.substr(pos + 1); - server = forward_server.substr(0, pos); - } - // discovery vhost - std::string vhost = req->vhost; - port = ::atoi(s_port.c_str()); - - // generate tcUrl - tc_url = srs_generate_tc_url(forward_server, vhost, req->app, s_port, req->param); + // discovery the server port and tcUrl from req and ep_forward. + std::string server, port, tc_url; + discovery_ep(server, port, tc_url); // dead loop check std::string source_ep = "rtmp://"; @@ -113,15 +114,15 @@ int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server) source_ep += req->vhost; std::string dest_ep = "rtmp://"; - if (forward_server == SRS_CONSTS_LOCALHOST) { + if (_ep_forward == SRS_CONSTS_LOCALHOST) { dest_ep += req->host; } else { - dest_ep += forward_server; + dest_ep += _ep_forward; } dest_ep += ":"; - dest_ep += s_port; + dest_ep += port; dest_ep += "?vhost="; - dest_ep += vhost; + dest_ep += req->vhost; if (source_ep == dest_ep) { ret = ERROR_SYSTEM_FORWARD_LOOP; @@ -131,7 +132,7 @@ int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server) } srs_trace("start forward %s to %s, tcUrl=%s, stream=%s", source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(), - stream_name.c_str()); + req->stream.c_str()); if ((ret = pthread->start()) != ERROR_SUCCESS) { srs_error("start srs thread failed. ret=%d", ret); @@ -205,7 +206,8 @@ int SrsForwarder::cycle() { int ret = ERROR_SUCCESS; - if ((ret = connect_server()) != ERROR_SUCCESS) { + std::string ep_server, ep_port; + if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) { return ret; } srs_assert(client); @@ -217,12 +219,8 @@ int SrsForwarder::cycle() srs_error("handshake with server failed. ret=%d", ret); return ret; } - // TODO: FIXME: take debug info for srs, @see SrsEdgeForwarder.connect_server. - // @see https://github.com/winlinvip/simple-rtmp-server/issues/160 - // the debug_srs_upnode is config in vhost and default to true. - bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(vhost); - if ((ret = client->connect_app(app, tc_url, NULL, debug_srs_upnode)) != ERROR_SUCCESS) { - srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret); + if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) { + srs_error("connect with server failed. ret=%d", ret); return ret; } if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { @@ -230,9 +228,9 @@ int SrsForwarder::cycle() return ret; } - if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) { + if ((ret = client->publish(_req->stream, stream_id)) != ERROR_SUCCESS) { srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d", - stream_name.c_str(), stream_id, ret); + _req->stream.c_str(), stream_id, ret); return ret; } @@ -253,18 +251,45 @@ void SrsForwarder::close_underlayer_socket() srs_close_stfd(stfd); } -int SrsForwarder::connect_server() +void SrsForwarder::discovery_ep(string& server, string& port, string& tc_url) +{ + SrsRequest* req = _req; + + server = _ep_forward; + port = SRS_CONSTS_RTMP_DEFAULT_PORT; + + // TODO: FIXME: parse complex params + size_t pos = _ep_forward.find(":"); + if (pos != std::string::npos) { + port = _ep_forward.substr(pos + 1); + server = _ep_forward.substr(0, pos); + } + + // generate tcUrl + tc_url = srs_generate_tc_url(server, req->vhost, req->app, port, req->param); +} + +int SrsForwarder::connect_server(string& ep_server, string& ep_port) { int ret = ERROR_SUCCESS; // reopen close_underlayer_socket(); + // discovery the server port and tcUrl from req and ep_forward. + std::string server, s_port, tc_url; + discovery_ep(server, s_port, tc_url); + int port = ::atoi(s_port.c_str()); + + // output the connected server and port. + ep_server = server; + ep_port = s_port; + // open socket. int64_t timeout = SRS_FORWARDER_SLEEP_US; - if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) { + if ((ret = srs_socket_connect(ep_server, port, timeout, &stfd)) != ERROR_SUCCESS) { srs_warn("forward failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", - stream_name.c_str(), tc_url.c_str(), server.c_str(), port, timeout, ret); + _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port, timeout, ret); return ret; } @@ -278,7 +303,58 @@ int SrsForwarder::connect_server() kbps->set_io(io, io); srs_trace("forward connected, stream=%s, tcUrl=%s to server=%s, port=%d", - stream_name.c_str(), tc_url.c_str(), server.c_str(), port); + _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port); + + return ret; +} + +// TODO: FIXME: refine the connect_app. +int SrsForwarder::connect_app(string ep_server, string ep_port) +{ + int ret = ERROR_SUCCESS; + + SrsRequest* req = _req; + + // args of request takes the srs info. + if (req->args == NULL) { + req->args = SrsAmf0Any::object(); + } + + // notify server the edge identity, + // @see https://github.com/winlinvip/simple-rtmp-server/issues/147 + SrsAmf0Object* data = req->args; + data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY)); + data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")")); + data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE)); + data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE)); + data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL)); + data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION)); + data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB)); + data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL)); + data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT)); + data->set("srs_primary_authors", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY_AUTHROS)); + // for edge to directly get the id of client. + data->set("srs_pid", SrsAmf0Any::number(getpid())); + data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id())); + + // local ip of edge + std::vector ips = srs_get_local_ipv4_ips(); + assert(_srs_config->get_stats_network() < (int)ips.size()); + std::string local_ip = ips[_srs_config->get_stats_network()]; + data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str())); + + // generate the tcUrl + std::string param = ""; + std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param); + + // upnode server identity will show in the connect_app of client. + // @see https://github.com/winlinvip/simple-rtmp-server/issues/160 + // the debug_srs_upnode is config in vhost and default to true. + bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost); + if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) { + srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret); + return ret; + } return ret; } diff --git a/trunk/src/app/srs_app_forward.hpp b/trunk/src/app/srs_app_forward.hpp index aae1b132e..5d8b18012 100644 --- a/trunk/src/app/srs_app_forward.hpp +++ b/trunk/src/app/srs_app_forward.hpp @@ -51,13 +51,10 @@ class SrsKbps; class SrsForwarder : public ISrsThreadHandler { private: - std::string app; - std::string tc_url; - std::string vhost; - std::string stream_name; + // the ep to forward, server[:port]. + std::string _ep_forward; + SrsRequest* _req; int stream_id; - std::string server; - int port; private: st_netfd_t stfd; SrsThread* pthread; @@ -72,9 +69,10 @@ public: SrsForwarder(SrsSource* _source); virtual ~SrsForwarder(); public: + virtual int initialize(SrsRequest* req, std::string ep_forward); virtual void set_queue_size(double queue_size); public: - virtual int on_publish(SrsRequest* req, std::string forward_server); + virtual int on_publish(); virtual void on_unpublish(); virtual int on_meta_data(SrsSharedPtrMessage* metadata); virtual int on_audio(SrsSharedPtrMessage* msg); @@ -84,8 +82,9 @@ public: virtual int cycle(); private: virtual void close_underlayer_socket(); - // TODO: FIXME: take debug info for srs, @see SrsEdgeForwarder.connect_server. - virtual int connect_server(); + virtual void discovery_ep(std::string& server, std::string& port, std::string& tc_url); + virtual int connect_server(std::string& ep_server, std::string& ep_port); + virtual int connect_app(std::string ep_server, std::string ep_port); virtual int forward(); }; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index c34553a65..5b82f0f16 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -1045,7 +1045,9 @@ int SrsRtmpConn::do_token_traverse_auth(SrsStSocket* io, SrsRtmpClient* client) srs_error("handshake with server failed. ret=%d", ret); return ret; } - if ((ret = client->connect_app(req->app, req->tcUrl, req)) != ERROR_SUCCESS) { + + // for token tranverse, always take the debug info(which carries token). + if ((ret = client->connect_app(req->app, req->tcUrl, req, true)) != ERROR_SUCCESS) { srs_error("connect with server failed, tcUrl=%s. ret=%d", req->tcUrl.c_str(), ret); return ret; } diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 7133141b6..290c025c2 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1386,6 +1386,7 @@ int SrsSource::on_publish() return ret; } + // TODO: FIXME: use initialize to set req. #ifdef SRS_AUTO_TRANSCODE if ((ret = encoder->on_publish(_req)) != ERROR_SUCCESS) { srs_error("start encoder failed. ret=%d", ret); @@ -1393,6 +1394,7 @@ int SrsSource::on_publish() } #endif + // TODO: FIXME: use initialize to set req. #ifdef SRS_AUTO_HLS if ((ret = hls->on_publish(_req)) != ERROR_SUCCESS) { srs_error("start hls failed. ret=%d", ret); @@ -1400,6 +1402,7 @@ int SrsSource::on_publish() } #endif + // TODO: FIXME: use initialize to set req. #ifdef SRS_AUTO_DVR if ((ret = dvr->on_publish(_req)) != ERROR_SUCCESS) { srs_error("start dvr failed. ret=%d", ret); @@ -1548,11 +1551,16 @@ int SrsSource::create_forwarders() SrsForwarder* forwarder = new SrsForwarder(this); forwarders.push_back(forwarder); + + // initialize the forwarder with request. + if ((ret = forwarder->initialize(_req, forward_server)) != ERROR_SUCCESS) { + return ret; + } double queue_size = _srs_config->get_queue_length(_req->vhost); forwarder->set_queue_size(queue_size); - if ((ret = forwarder->on_publish(_req, forward_server)) != ERROR_SUCCESS) { + if ((ret = forwarder->on_publish()) != ERROR_SUCCESS) { srs_error("start forwarder failed. " "vhost=%s, app=%s, stream=%s, forward-to=%s", _req->vhost.c_str(), _req->app.c_str(), _req->stream.c_str(), diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 78823f874..bc89b05b1 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "201" +#define VERSION_REVISION "202" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "SRS" diff --git a/trunk/src/libs/srs_librtmp.cpp b/trunk/src/libs/srs_librtmp.cpp index 5a561a3b0..c42056c5d 100644 --- a/trunk/src/libs/srs_librtmp.cpp +++ b/trunk/src/libs/srs_librtmp.cpp @@ -253,7 +253,10 @@ int srs_connect_app(srs_rtmp_t rtmp) context->ip, context->vhost, context->app, context->port, context->param ); - if ((ret = context->rtmp->connect_app(context->app, tcUrl)) != ERROR_SUCCESS) { + + if ((ret = context->rtmp->connect_app( + context->app, tcUrl, NULL, true)) != ERROR_SUCCESS) + { return ret; } diff --git a/trunk/src/rtmp/srs_protocol_rtmp.cpp b/trunk/src/rtmp/srs_protocol_rtmp.cpp index 4a4b4f76d..8430d1a6f 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.cpp @@ -434,7 +434,8 @@ int SrsRtmpClient::complex_handshake() return ret; } -int SrsRtmpClient::connect_app(string app, string tc_url, SrsRequest* req, bool debug_srs_upnode) +int SrsRtmpClient::connect_app(string app, string tc_url, + SrsRequest* req, bool debug_srs_upnode) { std::string srs_server_ip; std::string srs_server; diff --git a/trunk/src/rtmp/srs_protocol_rtmp.hpp b/trunk/src/rtmp/srs_protocol_rtmp.hpp index 2a023d864..fcf224f30 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.hpp @@ -248,7 +248,7 @@ public: * args for edge to origin traverse auth, @see SrsRequest.args */ virtual int connect_app(std::string app, std::string tc_url, - SrsRequest* req=NULL, bool debug_srs_upnode=true); + SrsRequest* req, bool debug_srs_upnode); /** * connect to server, get the debug srs info. *