From 41857edaeec3ecbc3f9145f9a260b1d66e123fd1 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 21 Jun 2014 12:39:04 +0800 Subject: [PATCH] support edge token traverse, fix #104. 0.9.129 --- README.md | 1 + trunk/conf/full.conf | 9 +- trunk/src/app/srs_app_config.cpp | 19 +++- trunk/src/app/srs_app_config.hpp | 1 + trunk/src/app/srs_app_rtmp_conn.cpp | 136 +++++++++++++++++++++++++++- trunk/src/app/srs_app_rtmp_conn.hpp | 5 + trunk/src/core/srs_core.hpp | 2 +- 7 files changed, 168 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 97840fc31..514bc2e78 100755 --- a/README.md +++ b/README.md @@ -240,6 +240,7 @@ Supported operating systems and hardware: * 2013-10-17, Created.
## History +* v1.0, 2014-06-21, support edge token traverse, fix [#104](https://github.com/winlinvip/simple-rtmp-server/issues/104). 0.9.129 * v1.0, 2014-06-19, add connections count to api summaries. 0.9.127 * v1.0, 2014-06-19, add srs bytes and kbps to api summaries. 0.9.126 * v1.0, 2014-06-18, add network bytes to api summaries. 0.9.125 diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 43f32c5e1..15b401484 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -124,8 +124,15 @@ vhost same.edge.srs.com { # @remark user can specifies multiple origin for error backup, by space, # for example, 192.168.1.100:1935 192.168.1.101:1935 192.168.1.102:1935 origin 127.0.0.1:1935 localhost:1935; + # for edge, whether open the token traverse mode, + # if token traverse on, all connections of edge will forward to origin to check(auth), + # it's very important for the edge to do the token auth. + # the better way is use http callback to do the token auth by the edge, + # but if user prefer origin check(auth), the token_traverse if better solution. + # default: off + token_traverse off; } -# vhost for edge, chnage vhost. +# vhost for edge, change vhost. vhost change.edge.srs.com { mode remote; # TODO: FIXME: support extra params. diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index fb1c92570..307eb4531 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -509,7 +509,8 @@ int SrsConfig::reload() // // always support reload without additional code: // chunk_size, ff_log_dir, max_connections, - // bandcheck, http_hooks, heartbeat + // bandcheck, http_hooks, heartbeat, + // token_traverse // merge config: listen if (!srs_directive_equals(root->get("listen"), old_root->get("listen"))) { @@ -1987,6 +1988,22 @@ SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost) return conf->get("origin"); } +bool SrsConfig::get_vhost_edge_token_traverse(std::string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return false; + } + + conf = conf->get("token_traverse"); + if (!conf || conf->arg0() != "on") { + return false; + } + + return true; +} + SrsConfDirective* SrsConfig::get_transcode(string vhost, string scope) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index e58cd2c5f..a2d99dd68 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -214,6 +214,7 @@ public: virtual bool get_vhost_is_edge(std::string vhost); virtual bool get_vhost_is_edge(SrsConfDirective* vhost); virtual SrsConfDirective* get_vhost_edge_origin(std::string vhost); + virtual bool get_vhost_edge_token_traverse(std::string vhost); // vhost transcode section public: virtual SrsConfDirective* get_transcode(std::string vhost, std::string scope); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 55412e019..dbb5f867a 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -24,6 +24,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include +#include +#include using namespace std; @@ -45,6 +48,8 @@ using namespace std; #include #include #include +#include +#include // when stream is busy, for example, streaming is already // publishing, when a new client to request to publish, @@ -63,6 +68,9 @@ using namespace std; // if timeout, close the connection. #define SRS_PAUSED_RECV_TIMEOUT_US (int64_t)(30*60*1000*1000LL) +// when edge timeout, retry next. +#define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US (int64_t)(3*1000*1000LL) + SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd) : SrsConnection(srs_server, client_stfd) { @@ -290,6 +298,16 @@ int SrsRtmpConn::stream_service_cycle() } srs_info("set chunk_size=%d success", chunk_size); + // do token traverse before serve it. + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); + bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost); + if (vhost_is_edge && edge_traverse) { + if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) { + srs_warn("token auth failed, ret=%d", ret); + return ret; + } + } + // find a source to serve. SrsSource* source = NULL; if ((ret = SrsSource::find(req, &source)) != ERROR_SUCCESS) { @@ -297,8 +315,6 @@ int SrsRtmpConn::stream_service_cycle() } srs_assert(source != NULL); - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); - // check publish available // for edge, never check it, for edge use proxy mode. if (!vhost_is_edge) { @@ -846,6 +862,122 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg return ret; } +int SrsRtmpConn::check_edge_token_traverse_auth() +{ + int ret = ERROR_SUCCESS; + + srs_assert(req); + + st_netfd_t stsock = NULL; + SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost); + for (int i = 0; i < (int)conf->args.size(); i++) { + if ((ret = connect_server(i, &stsock)) == ERROR_SUCCESS) { + break; + } + } + if (ret != ERROR_SUCCESS) { + srs_warn("token traverse connect failed. ret=%d", ret); + return ret; + } + + srs_assert(stsock); + SrsSocket* io = new SrsSocket(stsock); + SrsRtmpClient* client = new SrsRtmpClient(io); + + ret = do_token_traverse_auth(io, client); + + srs_freep(client); + srs_freep(io); + srs_close_stfd(stsock); + + return ret; +} + +// TODO: FIXME: refine the connect server serials functions. +int SrsRtmpConn::connect_server(int origin_index, st_netfd_t* pstsock) +{ + int ret = ERROR_SUCCESS; + + SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost); + srs_assert(conf); + + // select the origin. + std::string server = conf->args.at(origin_index % conf->args.size()); + origin_index = (origin_index + 1) % conf->args.size(); + + std::string s_port = RTMP_DEFAULT_PORT; + int port = ::atoi(RTMP_DEFAULT_PORT); + size_t pos = server.find(":"); + if (pos != std::string::npos) { + s_port = server.substr(pos + 1); + server = server.substr(0, pos); + port = ::atoi(s_port.c_str()); + } + + // connect to server. + std::string ip = srs_dns_resolve(server); + if (ip.empty()) { + ret = ERROR_SYSTEM_IP_INVALID; + srs_error("dns resolve server error, ip empty. ret=%d", ret); + return ret; + } + + // open socket. + // TODO: FIXME: extract utility method + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1){ + ret = ERROR_SOCKET_CREATE; + srs_error("create socket error. ret=%d", ret); + return ret; + } + + st_netfd_t stsock = st_netfd_open_socket(sock); + if(stsock == NULL){ + ret = ERROR_ST_OPEN_SOCKET; + srs_error("st_netfd_open_socket failed. ret=%d", ret); + return ret; + } + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = inet_addr(ip.c_str()); + + if (st_connect(stsock, (const struct sockaddr*)&addr, sizeof(sockaddr_in), SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US) == -1){ + ret = ERROR_ST_CONNECT; + srs_close_stfd(stsock); + srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); + return ret; + } + srs_info("edge token auth connected, url=%s/%s, server=%s:%d", req->tcUrl.c_str(), req->stream.c_str(), server.c_str(), port); + + *pstsock = stsock; + return ret; +} + +int SrsRtmpConn::do_token_traverse_auth(SrsSocket* io, SrsRtmpClient* client) +{ + int ret = ERROR_SUCCESS; + + srs_assert(client); + + client->set_recv_timeout(SRS_RECV_TIMEOUT_US); + client->set_send_timeout(SRS_SEND_TIMEOUT_US); + + if ((ret = client->handshake()) != ERROR_SUCCESS) { + srs_error("handshake with server failed. ret=%d", ret); + return ret; + } + if ((ret = client->connect_app(req->app, req->tcUrl, req)) != ERROR_SUCCESS) { + srs_error("connect with server failed, tcUrl=%s. ret=%d", req->tcUrl.c_str(), ret); + return ret; + } + + srs_trace("edge token auth ok, tcUrl=%s", req->tcUrl.c_str()); + + return ret; +} + int SrsRtmpConn::http_hooks_on_connect() { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 2566f2b30..3fd77a047 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -47,6 +47,7 @@ class SrsHttpHooks; #endif class SrsBandwidth; class SrsKbps; +class SrsRtmpClient; /** * the client provides the main logic control for RTMP clients. @@ -90,6 +91,10 @@ private: virtual int flash_publish(SrsSource* source); virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge); virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg); +private: + virtual int check_edge_token_traverse_auth(); + virtual int connect_server(int origin_index, st_netfd_t* pstsock); + virtual int do_token_traverse_auth(SrsSocket* io, SrsRtmpClient* client); private: virtual int http_hooks_on_connect(); virtual void http_hooks_on_close(); diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index bd6dcaaa8..7870166f6 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "128" +#define VERSION_REVISION "129" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "SRS"