1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Fix #464, support origin cluster

This commit is contained in:
winlin 2018-02-16 16:39:07 +08:00
parent 2f09ec4353
commit c70421e656
13 changed files with 171 additions and 39 deletions

View file

@ -366,6 +366,7 @@ vhost cluster.srs.com {
debug_srs_upnode on; debug_srs_upnode on;
# For origin(mode local) cluster, turn on the cluster. # For origin(mode local) cluster, turn on the cluster.
# @remark Origin cluster only supports RTMP, use Edge to transmux RTMP to FLV.
# default: off # default: off
# TODO: FIXME: Support reload. # TODO: FIXME: Support reload.
origin_cluster off; origin_cluster off;

View file

@ -0,0 +1,15 @@
# the config for srs origin-edge cluster
# @see https://github.com/ossrs/srs/wiki/v3_EN_OriginCluster
# @see full.conf for detail config.
listen 1935;
max_connections 1000;
pid objs/edge.pid;
daemon off;
srs_log_tank console;
vhost __defaultVhost__ {
cluster {
mode remote;
origin 127.0.0.1:19351;
}
}

View file

@ -14,6 +14,7 @@ http_api {
vhost __defaultVhost__ { vhost __defaultVhost__ {
cluster { cluster {
mode local; mode local;
origin_cluster on;
coworkers 127.0.0.1:9091; coworkers 127.0.0.1:9091;
} }
} }

View file

@ -14,6 +14,7 @@ http_api {
vhost __defaultVhost__ { vhost __defaultVhost__ {
cluster { cluster {
mode local; mode local;
origin_cluster on;
coworkers 127.0.0.1:9090; coworkers 127.0.0.1:9090;
} }
} }

View file

@ -5195,13 +5195,8 @@ vector<string> SrsConfig::get_vhost_coworkers(string vhost)
} }
conf = conf->get("coworkers"); conf = conf->get("coworkers");
for (int i = 0; i < (int)conf->directives.size(); i++) { for (int i = 0; i < (int)conf->args.size(); i++) {
SrsConfDirective* option = conf->directives[i]; coworkers.push_back(conf->args.at(i));
if (!option) {
continue;
}
coworkers.push_back(option->arg0());
} }
return coworkers; return coworkers;

View file

@ -73,9 +73,13 @@ SrsJsonAny* SrsCoWorkers::dumps(string vhost, string app, string stream)
string service_ip = srs_get_public_internet_address(); string service_ip = srs_get_public_internet_address();
string service_hostport = service_ports.at(0); string service_hostport = service_ports.at(0);
string service_host;
int service_port = SRS_CONSTS_RTMP_DEFAULT_PORT; int service_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
if (service_hostport.find(":") != string::npos) {
string service_host;
srs_parse_hostport(service_hostport, service_host, service_port); srs_parse_hostport(service_hostport, service_host, service_port);
} else {
service_port = ::atoi(service_hostport.c_str());
}
string backend = _srs_config->get_http_api_listen(); string backend = _srs_config->get_http_api_listen();
if (backend.find(":") == string::npos) { if (backend.find(":") == string::npos) {

View file

@ -407,6 +407,61 @@ srs_error_t SrsHttpHooks::on_hls_notify(int cid, std::string url, SrsRequest* re
return srs_success; return srs_success;
} }
srs_error_t SrsHttpHooks::discover_co_workers(string url, string& host, int& port)
{
srs_error_t err = srs_success;
std::string res;
int status_code;
SrsHttpClient http;
if ((err = do_post(&http, url, "", status_code, res)) != srs_success) {
return srs_error_wrap(err, "http: post %s, status=%d, res=%s", url.c_str(), status_code, res.c_str());
}
SrsJsonObject* robj = NULL;
SrsAutoFree(SrsJsonObject, robj);
if (true) {
SrsJsonAny* jr = NULL;
if ((jr = SrsJsonAny::loads(res)) == NULL) {
return srs_error_new(ERROR_OCLUSTER_DISCOVER, "load json from %s", res.c_str());
}
if (!jr->is_object()) {
srs_freep(jr);
return srs_error_new(ERROR_OCLUSTER_DISCOVER, "response %s", res.c_str());
}
robj = jr->to_object();
}
SrsJsonAny* prop = NULL;
if ((prop = robj->ensure_property_object("data")) == NULL) {
return srs_error_new(ERROR_OCLUSTER_DISCOVER, "parse data %s", res.c_str());
}
SrsJsonObject* p = prop->to_object();
if ((prop = p->ensure_property_object("origin")) == NULL) {
return srs_error_new(ERROR_OCLUSTER_DISCOVER, "parse data %s", res.c_str());
}
p = prop->to_object();
if ((prop = p->ensure_property_string("ip")) == NULL) {
return srs_error_new(ERROR_OCLUSTER_DISCOVER, "parse data %s", res.c_str());
}
host = prop->to_str();
if ((prop = p->ensure_property_integer("port")) == NULL) {
return srs_error_new(ERROR_OCLUSTER_DISCOVER, "parse data %s", res.c_str());
}
port = (int)prop->to_integer();
srs_trace("http: on_hls ok, url=%s, response=%s", url.c_str(), res.c_str());
return err;
}
srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, string& res) srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, string& res)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -420,8 +475,13 @@ srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::strin
return srs_error_wrap(err, "http: init client"); return srs_error_wrap(err, "http: init client");
} }
string path = uri.get_path();
if (!uri.get_query().empty()) {
path += "?" + uri.get_query();
}
ISrsHttpMessage* msg = NULL; ISrsHttpMessage* msg = NULL;
if ((err = hc->post(uri.get_path(), req, &msg)) != srs_success) { if ((err = hc->post(path, req, &msg)) != srs_success) {
return srs_error_wrap(err, "http: client post"); return srs_error_wrap(err, "http: client post");
} }
SrsAutoFree(ISrsHttpMessage, msg); SrsAutoFree(ISrsHttpMessage, msg);

View file

@ -113,6 +113,10 @@ public:
* @param cid the source connection cid, for the on_dvr is async call. * @param cid the source connection cid, for the on_dvr is async call.
*/ */
static srs_error_t on_hls_notify(int cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify); static srs_error_t on_hls_notify(int cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify);
/**
* Discover co-workers for origin cluster.
*/
static srs_error_t discover_co_workers(std::string url, std::string& host, int& port);
private: private:
static srs_error_t do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, std::string& res); static srs_error_t do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, std::string& res);
}; };

View file

@ -227,6 +227,11 @@ srs_error_t SrsRtmpConn::do_cycle()
srs_freep(r0); srs_freep(r0);
} }
// If client is redirect to other servers, we already logged the event.
if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
srs_error_reset(err);
}
return err; return err;
} }
@ -624,6 +629,31 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source)
} }
} }
// When origin cluster enabled, try to redirect to the origin which is active.
// A active origin is a server which is delivering stream.
if (!info->edge && _srs_config->get_vhost_origin_cluster(req->vhost) && source->inactive()) {
vector<string> coworkers = _srs_config->get_vhost_coworkers(req->vhost);
for (int i = 0; i < (int)coworkers.size(); i++) {
int port;
string host;
string url = "http://" + coworkers.at(i) + "/api/v1/clusters?"
+ "vhost=" + req->vhost + "&ip=" + req->host + "&app=" + req->app + "&stream=" + req->stream;
if ((err = SrsHttpHooks::discover_co_workers(url, host, port)) != srs_success) {
return srs_error_wrap(err, "discover coworkers, url=%s", url.c_str());
}
srs_trace("rtmp: redirect in cluster, url=%s, target=%s:%d", url.c_str(), host.c_str(), port);
bool accepted = false;
if ((err = rtmp->redirect(req, host, port, accepted)) != srs_success) {
srs_error_reset(err);
} else {
return srs_error_new(ERROR_CONTROL_REDIRECT, "redirected");
}
}
return srs_error_new(ERROR_OCLUSTER_REDIRECT, "no origin");
}
// Set the socket options for transport. // Set the socket options for transport.
set_sock_options(); set_sock_options();

View file

@ -2004,6 +2004,11 @@ int SrsSource::pre_source_id()
return _pre_source_id; return _pre_source_id;
} }
bool SrsSource::inactive()
{
return _can_publish;
}
bool SrsSource::can_publish(bool is_edge) bool SrsSource::can_publish(bool is_edge)
{ {
if (is_edge) { if (is_edge) {

View file

@ -606,11 +606,9 @@ private:
// The metadata cache. // The metadata cache.
SrsMetaCache* meta; SrsMetaCache* meta;
private: private:
/** // Whether source is avaiable for publishing.
* can publish, true when is not streaming
*/
bool _can_publish; bool _can_publish;
// last die time, when all consumers quit and no publisher, // The last die time, when all consumers quit and no publisher,
// we will remove the source when source die. // we will remove the source when source die.
int64_t die_at; int64_t die_at;
public: public:
@ -623,9 +621,7 @@ public:
virtual bool expired(); virtual bool expired();
// initialize, get and setter. // initialize, get and setter.
public: public:
/** // initialize the hls with handlers.
* initialize the hls with handlers.
*/
virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h); virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h);
// interface ISrsReloadHandler // interface ISrsReloadHandler
public: public:
@ -637,6 +633,9 @@ public:
// get current source id. // get current source id.
virtual int source_id(); virtual int source_id();
virtual int pre_source_id(); virtual int pre_source_id();
// Whether source is inactive, which means there is no publishing stream source.
// @remark For edge, it's inactive util stream has been pulled from origin.
virtual bool inactive();
// logic data methods // logic data methods
public: public:
virtual bool can_publish(bool is_edge); virtual bool can_publish(bool is_edge);

View file

@ -273,6 +273,8 @@
#define ERROR_DASH_WRITE_FAILED 3087 #define ERROR_DASH_WRITE_FAILED 3087
#define ERROR_TS_CONTEXT_NOT_READY 3088 #define ERROR_TS_CONTEXT_NOT_READY 3088
#define ERROR_MP4_ILLEGAL_MOOF 3089 #define ERROR_MP4_ILLEGAL_MOOF 3089
#define ERROR_OCLUSTER_DISCOVER 3090
#define ERROR_OCLUSTER_REDIRECT 3091
/////////////////////////////////////////////////////// ///////////////////////////////////////////////////////
// HTTP/StreamCaster/KAFKA protocol error. // HTTP/StreamCaster/KAFKA protocol error.

View file

@ -175,6 +175,7 @@ void retrieve_local_ips()
} }
// If empty, disover IPv4 loopback. // If empty, disover IPv4 loopback.
if (ips.empty()) {
for (ifaddrs* p = ifap; p ; p = p->ifa_next) { for (ifaddrs* p = ifap; p ; p = p->ifa_next) {
ifaddrs* cur = p; ifaddrs* cur = p;
@ -187,6 +188,7 @@ void retrieve_local_ips()
discover_network_iface(cur, ips, ss0, ss1, false); discover_network_iface(cur, ips, ss0, ss1, false);
} }
} }
}
srs_trace(ss0.str().c_str()); srs_trace(ss0.str().c_str());
srs_trace(ss1.str().c_str()); srs_trace(ss1.str().c_str());
@ -216,6 +218,11 @@ string srs_get_public_internet_address()
// find the best match public address. // find the best match public address.
for (int i = 0; i < (int)ips.size(); i++) { for (int i = 0; i < (int)ips.size(); i++) {
std::string ip = ips[i]; std::string ip = ips[i];
// TODO: FIXME: Support ipv6.
if (ip.find(".") == string::npos) {
continue;
}
in_addr_t addr = inet_addr(ip.c_str()); in_addr_t addr = inet_addr(ip.c_str());
uint32_t addr_h = ntohl(addr); uint32_t addr_h = ntohl(addr);
// lo, 127.0.0.0-127.0.0.1 // lo, 127.0.0.0-127.0.0.1
@ -247,6 +254,11 @@ string srs_get_public_internet_address()
// no public address, use private address. // no public address, use private address.
for (int i = 0; i < (int)ips.size(); i++) { for (int i = 0; i < (int)ips.size(); i++) {
std::string ip = ips[i]; std::string ip = ips[i];
// TODO: FIXME: Support ipv6.
if (ip.find(".") == string::npos) {
continue;
}
in_addr_t addr = inet_addr(ip.c_str()); in_addr_t addr = inet_addr(ip.c_str());
uint32_t addr_h = ntohl(addr); uint32_t addr_h = ntohl(addr);
// lo, 127.0.0.0-127.0.0.1 // lo, 127.0.0.0-127.0.0.1
@ -261,9 +273,12 @@ string srs_get_public_internet_address()
} }
// Finally, use first whatever kind of address. // Finally, use first whatever kind of address.
if (!ips.empty()) { if (!ips.empty() && _public_internet_address.empty()) {
_public_internet_address = ips.at(0); string ip = ips.at(0);
return _public_internet_address; srs_warn("use first address as ip: %s", ip.c_str());
_public_internet_address = ip;
return ip;
} }
return ""; return "";