diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index b58a4b869..c0bd4b288 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -70,8 +70,9 @@ SrsEdgeUpstream::~SrsEdgeUpstream() { } -SrsEdgeRtmpUpstream::SrsEdgeRtmpUpstream() +SrsEdgeRtmpUpstream::SrsEdgeRtmpUpstream(string r) { + redirect = r; sdk = new SrsSimpleRtmpClient(); } @@ -106,6 +107,17 @@ int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb) int port = SRS_CONSTS_RTMP_DEFAULT_PORT; srs_parse_hostport(server, server, port); + // override the origin info by redirect. + if (!redirect.empty()) { + int _port; + string _schema, _vhost, _app, _param, _host; + srs_discovery_tc_url(redirect, _schema, _host, _vhost, _app, _port, _param); + + srs_warn("RTMP redirect %s:%d to %s:%d", server.c_str(), port, _host.c_str(), _port); + server = _host; + port = _port; + } + // support vhost tranform for edge, // @see https://github.com/ossrs/srs/issues/372 std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); @@ -160,7 +172,7 @@ SrsEdgeIngester::SrsEdgeIngester() edge = NULL; req = NULL; - upstream = new SrsEdgeRtmpUpstream(); + upstream = new SrsEdgeRtmpUpstream(redirect); lb = new SrsLbRoundRobin(); pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US); } @@ -215,22 +227,39 @@ int SrsEdgeIngester::cycle() { int ret = ERROR_SUCCESS; - if ((ret = source->on_source_id_changed(_srs_context->get_id())) != ERROR_SUCCESS) { - return ret; - } - - if ((ret = upstream->connect(req, lb)) != ERROR_SUCCESS) { - return ret; - } - - if ((ret = edge->on_ingest_play()) != ERROR_SUCCESS) { - return ret; - } - - ret = ingest(); - if (srs_is_client_gracefully_close(ret)) { - srs_warn("origin disconnected, retry. ret=%d", ret); - ret = ERROR_SUCCESS; + for (;;) { + srs_freep(upstream); + upstream = new SrsEdgeRtmpUpstream(redirect); + + // we only use the redict once. + // reset the redirect to empty, for maybe the origin changed. + redirect = ""; + + if ((ret = source->on_source_id_changed(_srs_context->get_id())) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = upstream->connect(req, lb)) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = edge->on_ingest_play()) != ERROR_SUCCESS) { + return ret; + } + + ret = ingest(); + + // retry for rtmp 302 immediately. + if (ret == ERROR_CONTROL_REDIRECT) { + ret = ERROR_SUCCESS; + continue; + } + + if (srs_is_client_gracefully_close(ret)) { + srs_warn("origin disconnected, retry. ret=%d", ret); + ret = ERROR_SUCCESS; + } + break; } return ret; @@ -327,6 +356,47 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) return ret; } + // call messages, for example, reject, redirect. + if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) { + SrsPacket* pkt = NULL; + if ((ret = upstream->decode_message(msg, &pkt)) != ERROR_SUCCESS) { + srs_error("decode call message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsPacket, pkt); + + // RTMP 302 redirect + if (dynamic_cast(pkt)) { + SrsCallPacket* call = dynamic_cast(pkt); + if (!call->arguments->is_object()) { + return ret; + } + + SrsAmf0Any* prop = NULL; + SrsAmf0Object* evt = call->arguments->to_object(); + + if ((prop = evt->ensure_property_string("level")) == NULL) { + return ret; + } else if (prop->to_str() != StatusLevelError) { + return ret; + } + + if ((prop = evt->get_property("ex")) == NULL || !prop->is_object()) { + return ret; + } + SrsAmf0Object* ex = prop->to_object(); + + if ((prop = ex->ensure_property_string("redirect")) == NULL) { + return ret; + } + redirect = prop->to_str(); + + ret = ERROR_CONTROL_REDIRECT; + srs_info("RTMP 302 redirect to %s, ret=%d", redirect.c_str(), ret); + return ret; + } + } + return ret; } diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 07738f660..d9f27d346 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -97,9 +97,13 @@ public: class SrsEdgeRtmpUpstream : public SrsEdgeUpstream { private: + // for RTMP 302, if not empty, + // use this as upstream. + std::string redirect; SrsSimpleRtmpClient* sdk; public: - SrsEdgeRtmpUpstream(); + // @param rediect, override the server. ignore if empty. + SrsEdgeRtmpUpstream(std::string r); virtual ~SrsEdgeRtmpUpstream(); public: virtual int connect(SrsRequest* r, SrsLbRoundRobin* lb); @@ -123,6 +127,8 @@ private: SrsReusableThread2* pthread; SrsLbRoundRobin* lb; SrsEdgeUpstream* upstream; + // for RTMP 302 redirect. + std::string redirect; public: SrsEdgeIngester(); virtual ~SrsEdgeIngester();