mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
Update
This commit is contained in:
commit
f4966f992c
18 changed files with 731 additions and 110 deletions
|
@ -1,6 +1,7 @@
|
|||
Welome to contribute to SRS!
|
||||
|
||||
1. Please start from fixing some [Issues: good first issue](https://github.com/ossrs/srs/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22).
|
||||
1. Please [setup your email](https://github.com/ossrs/srs/wiki/HowToFilePR#setup-your-email) before contributing, this is important.
|
||||
1. Then follow the [guide](https://github.com/ossrs/srs/wiki/HowToFilePR) to file a PR.
|
||||
1. We will review your PR ASAP.
|
||||
|
||||
|
|
|
@ -15,10 +15,10 @@
|
|||
[](https://stackoverflow.com/questions/tagged/simple-realtime-server)
|
||||
[](https://hub.docker.com/r/ossrs/srs/tags)
|
||||
|
||||
SRS/5.0,[Bee](https://github.com/ossrs/srs/wiki/Product#release50) 是一个简单高效的实时视频服务器,支持RTMP/WebRTC/HLS/HTTP-FLV/SRT。
|
||||
|
||||
SRS/5.0 is a simple, high efficiency and realtime video server, supports RTMP/WebRTC/HLS/HTTP-FLV/SRT.
|
||||
|
||||
SRS/5.0,[Bee](https://github.com/ossrs/srs/wiki/Product#release50) 是一个简单高效的实时视频服务器,支持RTMP/WebRTC/HLS/HTTP-FLV/SRT。
|
||||
|
||||
SRS is licenced under [MIT](https://github.com/ossrs/srs/blob/develop/LICENSE) or [MulanPSL-2.0](https://spdx.org/licenses/MulanPSL-2.0.html),
|
||||
and note that [MulanPSL-2.0 is compatible with Apache-2.0](https://www.apache.org/legal/resolved.html#category-a),
|
||||
but some third-party libraries are distributed using their [own licenses](https://github.com/ossrs/srs/wiki/LicenseMixing).
|
||||
|
|
|
@ -115,4 +115,5 @@ CONTRIBUTORS ordered by first contribution.
|
|||
* `pyw<PYW1@users.noreply.github.com>`
|
||||
* `MatheusMacabu<macabu@users.noreply.github.com>`
|
||||
* `Alex.CR<xiaoq_bj@126.com>`
|
||||
* `mapengfei53<52305649+mapengfei53@users.noreply.github.com>`
|
||||
|
||||
|
|
15
trunk/conf/forward.backend.conf
Normal file
15
trunk/conf/forward.backend.conf
Normal file
|
@ -0,0 +1,15 @@
|
|||
# the config for srs to forward to slave service
|
||||
# @see https://github.com/ossrs/srs/wiki/v5_CN_SampleForward
|
||||
# @see full.conf for detail config.
|
||||
|
||||
listen 1935;
|
||||
max_connections 1000;
|
||||
pid ./objs/srs.backend.pid;
|
||||
daemon off;
|
||||
srs_log_tank console;
|
||||
vhost __defaultVhost__ {
|
||||
forward {
|
||||
enabled on;
|
||||
backend http://127.0.0.1:8085/api/v1/forward;
|
||||
}
|
||||
}
|
|
@ -669,6 +669,37 @@ vhost same.vhost.forward.srs.com {
|
|||
# active-active for cdn to build high available fault tolerance system.
|
||||
# format: {ip}:{port} {ip_N}:{port_N}
|
||||
destination 127.0.0.1:1936 127.0.0.1:1937;
|
||||
|
||||
# when client(encoder) publish to vhost/app/stream, call the hook in creating backend forwarder.
|
||||
# the request in the POST data string is a object encode by json:
|
||||
# {
|
||||
# "action": "on_forward",
|
||||
# "server_id": "vid-k21d7y2",
|
||||
# "client_id": "9o7g1330",
|
||||
# "ip": "127.0.0.1",
|
||||
# "vhost": "__defaultVhost__",
|
||||
# "app": "live",
|
||||
# "tcUrl": "rtmp://127.0.0.1:1935/live",
|
||||
# "stream": "livestream",
|
||||
# "param": ""
|
||||
# }
|
||||
# if valid, the hook must return HTTP code 200(Status OK) and response
|
||||
# an int value specifies the error code(0 corresponding to success):
|
||||
# {
|
||||
# "code": 0,
|
||||
# "data": {
|
||||
# "urls":[
|
||||
# "rtmp://127.0.0.1:19350/test/teststream"
|
||||
# ]
|
||||
# }
|
||||
# }
|
||||
# PS: you can transform params to backend service, such as:
|
||||
# { "param": "?forward=rtmp://127.0.0.1:19351/test/livestream" }
|
||||
# then backend return forward's url in response.
|
||||
# if backend return empty urls, destanition is still disabled.
|
||||
# only support one api hook, format:
|
||||
# backend http://xxx/api0
|
||||
backend http://127.0.0.1:8085/api/v1/forward;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,8 @@ The changelog for SRS.
|
|||
|
||||
## SRS 5.0 Changelog
|
||||
|
||||
* v5.0, 2022-02-16, Merge [#2799](https://github.com/ossrs/srs/pull/2799): Forward: Support dynamic forwarding by backend api. (#2799). v5.0.24
|
||||
* v5.0, 2022-02-14, Merge [#2878](https://github.com/ossrs/srs/pull/2878): Support include directive for config file. (#2878). v5.0.23
|
||||
* v5.0, 2022-01-18, Eliminate unused *.as files for Adobe Flash. v5.0.22
|
||||
* v5.0, 2022-01-13, Switch LICENSE from MIT to **MIT or MulanPSL-2.0**. v5.0.21
|
||||
* v5.0, 2021-10-24, For [#2689](https://github.com/ossrs/srs/issues/2689): Support loongarch, loongson CPU. v5.0.19
|
||||
|
|
|
@ -805,6 +805,86 @@ class RESTSnapshots(object):
|
|||
def OPTIONS(self, *args, **kwargs):
|
||||
enable_crossdomain()
|
||||
|
||||
'''
|
||||
handle the forward requests: dynamic forward url.
|
||||
'''
|
||||
class RESTForward(object):
|
||||
exposed = True
|
||||
|
||||
def __init__(self):
|
||||
self.__forwards = []
|
||||
|
||||
def GET(self):
|
||||
enable_crossdomain()
|
||||
|
||||
forwards = {}
|
||||
return json.dumps(forwards)
|
||||
|
||||
'''
|
||||
for SRS hook: on_forward
|
||||
on_forward:
|
||||
when srs reap a dvr file, call the hook,
|
||||
the request in the POST data string is a object encode by json:
|
||||
{
|
||||
"action": "on_forward",
|
||||
"server_id": "server_test",
|
||||
"client_id": 1985,
|
||||
"ip": "192.168.1.10",
|
||||
"vhost": "video.test.com",
|
||||
"app": "live",
|
||||
"tcUrl": "rtmp://video.test.com/live?key=d2fa801d08e3f90ed1e1670e6e52651a",
|
||||
"stream": "livestream",
|
||||
"param":"?token=xxx&salt=yyy"
|
||||
}
|
||||
if valid, the hook must return HTTP code 200(Stauts OK) and response
|
||||
an int value specifies the error code(0 corresponding to success):
|
||||
0
|
||||
'''
|
||||
def POST(self):
|
||||
enable_crossdomain()
|
||||
|
||||
# return the error code in str
|
||||
code = Error.success
|
||||
|
||||
req = cherrypy.request.body.read()
|
||||
trace("post to forwards, req=%s"%(req))
|
||||
try:
|
||||
json_req = json.loads(req)
|
||||
except Exception, ex:
|
||||
code = Error.system_parse_json
|
||||
trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
|
||||
return json.dumps({"code": int(code), "data": None})
|
||||
|
||||
action = json_req["action"]
|
||||
if action == "on_forward":
|
||||
return self.__on_forward(json_req)
|
||||
else:
|
||||
trace("invalid request action: %s"%(json_req["action"]))
|
||||
code = Error.request_invalid_action
|
||||
|
||||
return json.dumps({"code": int(code), "data": None})
|
||||
|
||||
def OPTIONS(self, *args, **kwargs):
|
||||
enable_crossdomain()
|
||||
|
||||
def __on_forward(self, req):
|
||||
code = Error.success
|
||||
|
||||
trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, tcUrl=%s, stream=%s, param=%s"%(
|
||||
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["tcUrl"], req["stream"], req["param"]
|
||||
))
|
||||
|
||||
'''
|
||||
backend service config description:
|
||||
support multiple rtmp urls(custom addresses or third-party cdn service),
|
||||
url's host is slave service.
|
||||
For example:
|
||||
["rtmp://127.0.0.1:19350/test/teststream", "rtmp://127.0.0.1:19350/test/teststream?token=xxxx"]
|
||||
'''
|
||||
forwards = ["rtmp://127.0.0.1:19350/test/teststream"]
|
||||
|
||||
return json.dumps({"code": int(code), "data": {"urls": forwards}})
|
||||
|
||||
# HTTP RESTful path.
|
||||
class Root(object):
|
||||
exposed = True
|
||||
|
@ -846,6 +926,7 @@ class V1(object):
|
|||
self.chats = RESTChats()
|
||||
self.servers = RESTServers()
|
||||
self.snapshots = RESTSnapshots()
|
||||
self.forward = RESTForward()
|
||||
def GET(self):
|
||||
enable_crossdomain();
|
||||
return json.dumps({"code":Error.success, "urls":{
|
||||
|
|
|
@ -842,9 +842,9 @@ bool SrsConfDirective::is_stream_caster()
|
|||
return name == "stream_caster";
|
||||
}
|
||||
|
||||
srs_error_t SrsConfDirective::parse(SrsConfigBuffer* buffer)
|
||||
srs_error_t SrsConfDirective::parse(SrsConfigBuffer* buffer, SrsConfig* conf)
|
||||
{
|
||||
return parse_conf(buffer, parse_file);
|
||||
return parse_conf(buffer, SrsDirectiveContextFile, conf);
|
||||
}
|
||||
|
||||
srs_error_t SrsConfDirective::persistence(SrsFileWriter* writer, int level)
|
||||
|
@ -971,71 +971,78 @@ SrsJsonAny* SrsConfDirective::dumps_arg0_to_boolean()
|
|||
// LCOV_EXCL_STOP
|
||||
|
||||
// see: ngx_conf_parse
|
||||
srs_error_t SrsConfDirective::parse_conf(SrsConfigBuffer* buffer, SrsDirectiveType type)
|
||||
srs_error_t SrsConfDirective::parse_conf(SrsConfigBuffer* buffer, SrsDirectiveContext ctx, SrsConfig* conf)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
while (true) {
|
||||
std::vector<string> args;
|
||||
int line_start = 0;
|
||||
err = read_token(buffer, args, line_start);
|
||||
|
||||
/**
|
||||
* ret maybe:
|
||||
* ERROR_SYSTEM_CONFIG_INVALID error.
|
||||
* ERROR_SYSTEM_CONFIG_DIRECTIVE directive terminated by ';' found
|
||||
* ERROR_SYSTEM_CONFIG_BLOCK_START token terminated by '{' found
|
||||
* ERROR_SYSTEM_CONFIG_BLOCK_END the '}' found
|
||||
* ERROR_SYSTEM_CONFIG_EOF the config file is done
|
||||
*/
|
||||
if (srs_error_code(err) == ERROR_SYSTEM_CONFIG_INVALID) {
|
||||
return err;
|
||||
SrsDirectiveState state = SrsDirectiveStateInit;
|
||||
if ((err = read_token(buffer, args, line_start, state)) != srs_success) {
|
||||
return srs_error_wrap(err, "read token, line=%d, state=%d", line_start, state);
|
||||
}
|
||||
if (srs_error_code(err) == ERROR_SYSTEM_CONFIG_BLOCK_END) {
|
||||
if (type != parse_block) {
|
||||
return srs_error_wrap(err, "line %d: unexpected \"}\"", buffer->line);
|
||||
}
|
||||
|
||||
srs_freep(err);
|
||||
return srs_success;
|
||||
|
||||
if (state == SrsDirectiveStateBlockEnd) {
|
||||
return ctx == SrsDirectiveContextBlock ? srs_success : srs_error_wrap(err, "line %d: unexpected \"}\"", buffer->line);
|
||||
}
|
||||
if (srs_error_code(err) == ERROR_SYSTEM_CONFIG_EOF) {
|
||||
if (type == parse_block) {
|
||||
return srs_error_wrap(err, "line %d: unexpected end of file, expecting \"}\"", conf_line);
|
||||
}
|
||||
|
||||
srs_freep(err);
|
||||
return srs_success;
|
||||
if (state == SrsDirectiveStateEOF) {
|
||||
return ctx != SrsDirectiveContextBlock ? srs_success : srs_error_wrap(err, "line %d: unexpected end of file, expecting \"}\"", conf_line);
|
||||
}
|
||||
|
||||
if (args.empty()) {
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "line %d: empty directive", conf_line);
|
||||
}
|
||||
|
||||
// build directive tree.
|
||||
SrsConfDirective* directive = new SrsConfDirective();
|
||||
|
||||
directive->conf_line = line_start;
|
||||
directive->name = args[0];
|
||||
args.erase(args.begin());
|
||||
directive->args.swap(args);
|
||||
|
||||
directives.push_back(directive);
|
||||
|
||||
if (srs_error_code(err) == ERROR_SYSTEM_CONFIG_BLOCK_START) {
|
||||
srs_freep(err);
|
||||
if ((err = directive->parse_conf(buffer, parse_block)) != srs_success) {
|
||||
return srs_error_wrap(err, "parse dir");
|
||||
// Build normal directive which is not "include".
|
||||
if (args.at(0) != "include") {
|
||||
SrsConfDirective* directive = new SrsConfDirective();
|
||||
|
||||
directive->conf_line = line_start;
|
||||
directive->name = args[0];
|
||||
args.erase(args.begin());
|
||||
directive->args.swap(args);
|
||||
|
||||
directives.push_back(directive);
|
||||
|
||||
if (state == SrsDirectiveStateBlockStart) {
|
||||
if ((err = directive->parse_conf(buffer, SrsDirectiveContextBlock, conf)) != srs_success) {
|
||||
return srs_error_wrap(err, "parse dir");
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Parse including, allow multiple files.
|
||||
vector<string> files(args.begin() + 1, args.end());
|
||||
if (files.empty()) {
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "line %d: include is empty directive", buffer->line);
|
||||
}
|
||||
if (!conf) {
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "line %d: no config", buffer->line);
|
||||
}
|
||||
|
||||
for (int i = 0; i < (int)files.size(); i++) {
|
||||
std::string file = files.at(i);
|
||||
srs_assert(!file.empty());
|
||||
srs_trace("config parse include %s", file.c_str());
|
||||
|
||||
SrsConfigBuffer* include_file_buffer = NULL;
|
||||
SrsAutoFree(SrsConfigBuffer, include_file_buffer);
|
||||
if ((err = conf->build_buffer(file, &include_file_buffer)) != srs_success) {
|
||||
return srs_error_wrap(err, "buffer fullfill %s", file.c_str());
|
||||
}
|
||||
|
||||
if ((err = parse_conf(include_file_buffer, SrsDirectiveContextFile, conf)) != srs_success) {
|
||||
return srs_error_wrap(err, "parse include buffer");
|
||||
}
|
||||
}
|
||||
srs_freep(err);
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
// see: ngx_conf_read_token
|
||||
srs_error_t SrsConfDirective::read_token(SrsConfigBuffer* buffer, vector<string>& args, int& line_start)
|
||||
srs_error_t SrsConfDirective::read_token(SrsConfigBuffer* buffer, vector<string>& args, int& line_start, SrsDirectiveState& state)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
|
@ -1057,8 +1064,9 @@ srs_error_t SrsConfDirective::read_token(SrsConfigBuffer* buffer, vector<string>
|
|||
buffer->line);
|
||||
}
|
||||
srs_trace("config parse complete");
|
||||
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_EOF, "EOF");
|
||||
|
||||
state = SrsDirectiveStateEOF;
|
||||
return err;
|
||||
}
|
||||
|
||||
char ch = *buffer->pos++;
|
||||
|
@ -1079,10 +1087,12 @@ srs_error_t SrsConfDirective::read_token(SrsConfigBuffer* buffer, vector<string>
|
|||
continue;
|
||||
}
|
||||
if (ch == ';') {
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_DIRECTIVE, "dir");
|
||||
state = SrsDirectiveStateEntire;
|
||||
return err;
|
||||
}
|
||||
if (ch == '{') {
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_BLOCK_START, "block");
|
||||
state = SrsDirectiveStateBlockStart;
|
||||
return err;
|
||||
}
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "line %d: unexpected '%c'", buffer->line, ch);
|
||||
}
|
||||
|
@ -1098,17 +1108,20 @@ srs_error_t SrsConfDirective::read_token(SrsConfigBuffer* buffer, vector<string>
|
|||
if (args.size() == 0) {
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "line %d: unexpected ';'", buffer->line);
|
||||
}
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_DIRECTIVE, "dir");
|
||||
state = SrsDirectiveStateEntire;
|
||||
return err;
|
||||
case '{':
|
||||
if (args.size() == 0) {
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "line %d: unexpected '{'", buffer->line);
|
||||
}
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_BLOCK_START, "block");
|
||||
state = SrsDirectiveStateBlockStart;
|
||||
return err;
|
||||
case '}':
|
||||
if (args.size() != 0) {
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "line %d: unexpected '}'", buffer->line);
|
||||
}
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_BLOCK_END, "block");
|
||||
state = SrsDirectiveStateBlockEnd;
|
||||
return err;
|
||||
case '#':
|
||||
sharp_comment = 1;
|
||||
continue;
|
||||
|
@ -1163,10 +1176,12 @@ srs_error_t SrsConfDirective::read_token(SrsConfigBuffer* buffer, vector<string>
|
|||
srs_freepa(aword);
|
||||
|
||||
if (ch == ';') {
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_DIRECTIVE, "dir");
|
||||
state = SrsDirectiveStateEntire;
|
||||
return err;
|
||||
}
|
||||
if (ch == '{') {
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_BLOCK_START, "block");
|
||||
state = SrsDirectiveStateBlockStart;
|
||||
return err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2406,19 +2421,34 @@ srs_error_t SrsConfig::parse_file(const char* filename)
|
|||
if (config_file.empty()) {
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "empty config");
|
||||
}
|
||||
|
||||
SrsConfigBuffer buffer;
|
||||
|
||||
if ((err = buffer.fullfill(config_file.c_str())) != srs_success) {
|
||||
return srs_error_wrap(err, "buffer fullfil");
|
||||
|
||||
SrsConfigBuffer* buffer = NULL;
|
||||
SrsAutoFree(SrsConfigBuffer, buffer);
|
||||
if ((err = build_buffer(config_file, &buffer)) != srs_success) {
|
||||
return srs_error_wrap(err, "buffer fullfill %s", config_file.c_str());
|
||||
}
|
||||
|
||||
if ((err = parse_buffer(&buffer)) != srs_success) {
|
||||
if ((err = parse_buffer(buffer)) != srs_success) {
|
||||
return srs_error_wrap(err, "parse buffer");
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsConfig::build_buffer(string src, SrsConfigBuffer** pbuffer)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
SrsConfigBuffer* buffer = new SrsConfigBuffer();
|
||||
|
||||
if ((err = buffer->fullfill(src.c_str())) != srs_success) {
|
||||
srs_freep(buffer);
|
||||
return srs_error_wrap(err, "read from src %s", src.c_str());
|
||||
}
|
||||
|
||||
*pbuffer = buffer;
|
||||
return err;
|
||||
}
|
||||
// LCOV_EXCL_STOP
|
||||
|
||||
srs_error_t SrsConfig::check_config()
|
||||
|
@ -2795,7 +2825,7 @@ srs_error_t SrsConfig::check_normal_config()
|
|||
} else if (n == "forward") {
|
||||
for (int j = 0; j < (int)conf->directives.size(); j++) {
|
||||
string m = conf->at(j)->name;
|
||||
if (m != "enabled" && m != "destination") {
|
||||
if (m != "enabled" && m != "destination" && m != "backend") {
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.forward.%s of %s", m.c_str(), vhost->arg0().c_str());
|
||||
}
|
||||
}
|
||||
|
@ -2955,7 +2985,7 @@ srs_error_t SrsConfig::parse_buffer(SrsConfigBuffer* buffer)
|
|||
root = new SrsConfDirective();
|
||||
|
||||
// Parse root tree from buffer.
|
||||
if ((err = root->parse(buffer)) != srs_success) {
|
||||
if ((err = root->parse(buffer, this)) != srs_success) {
|
||||
return srs_error_wrap(err, "root parse");
|
||||
}
|
||||
|
||||
|
@ -4605,6 +4635,21 @@ SrsConfDirective* SrsConfig::get_forwards(string vhost)
|
|||
return conf->get("destination");
|
||||
}
|
||||
|
||||
SrsConfDirective* SrsConfig::get_forward_backend(string vhost)
|
||||
{
|
||||
SrsConfDirective* conf = get_vhost(vhost);
|
||||
if (!conf) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
conf = conf->get("forward");
|
||||
if (!conf) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return conf->get("backend");
|
||||
}
|
||||
|
||||
SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost)
|
||||
{
|
||||
SrsConfDirective* conf = get_vhost(vhost);
|
||||
|
|
|
@ -210,7 +210,7 @@ public:
|
|||
// Parse utilities
|
||||
public:
|
||||
// Parse config directive from file buffer.
|
||||
virtual srs_error_t parse(srs_internal::SrsConfigBuffer* buffer);
|
||||
virtual srs_error_t parse(srs_internal::SrsConfigBuffer* buffer, SrsConfig* conf = NULL);
|
||||
// Marshal the directive to writer.
|
||||
// @param level, the root is level0, all its directives are level1, and so on.
|
||||
virtual srs_error_t persistence(SrsFileWriter* writer, int level);
|
||||
|
@ -223,24 +223,36 @@ public:
|
|||
virtual SrsJsonAny* dumps_arg0_to_boolean();
|
||||
// private parse.
|
||||
private:
|
||||
// The directive parsing type.
|
||||
enum SrsDirectiveType {
|
||||
// The directive parsing context.
|
||||
enum SrsDirectiveContext {
|
||||
// The root directives, parsing file.
|
||||
parse_file,
|
||||
// For each direcitve, parsing text block.
|
||||
parse_block
|
||||
SrsDirectiveContextFile,
|
||||
// For each directive, parsing text block.
|
||||
SrsDirectiveContextBlock,
|
||||
};
|
||||
enum SrsDirectiveState {
|
||||
// Init state
|
||||
SrsDirectiveStateInit,
|
||||
// The directive terminated by ';' found
|
||||
SrsDirectiveStateEntire,
|
||||
// The token terminated by '{' found
|
||||
SrsDirectiveStateBlockStart,
|
||||
// The '}' found
|
||||
SrsDirectiveStateBlockEnd,
|
||||
// The config file is done
|
||||
SrsDirectiveStateEOF,
|
||||
};
|
||||
// Parse the conf from buffer. the work flow:
|
||||
// 1. read a token(directive args and a ret flag),
|
||||
// 2. initialize the directive by args, args[0] is name, args[1-N] is args of directive,
|
||||
// 3. if ret flag indicates there are child-directives, read_conf(directive, block) recursively.
|
||||
virtual srs_error_t parse_conf(srs_internal::SrsConfigBuffer* buffer, SrsDirectiveType type);
|
||||
virtual srs_error_t parse_conf(srs_internal::SrsConfigBuffer* buffer, SrsDirectiveContext ctx, SrsConfig* conf);
|
||||
// Read a token from buffer.
|
||||
// A token, is the directive args and a flag indicates whether has child-directives.
|
||||
// @param args, the output directive args, the first is the directive name, left is the args.
|
||||
// @param line_start, the actual start line of directive.
|
||||
// @return, an error code indicates error or has child-directives.
|
||||
virtual srs_error_t read_token(srs_internal::SrsConfigBuffer* buffer, std::vector<std::string>& args, int& line_start);
|
||||
virtual srs_error_t read_token(srs_internal::SrsConfigBuffer* buffer, std::vector<std::string>& args, int& line_start, SrsDirectiveState& state);
|
||||
};
|
||||
|
||||
// The config service provider.
|
||||
|
@ -250,6 +262,7 @@ private:
|
|||
// You could keep it before st-thread switch, or simply never keep it.
|
||||
class SrsConfig
|
||||
{
|
||||
friend class SrsConfDirective;
|
||||
// user command
|
||||
private:
|
||||
// Whether srs is run in dolphin mode.
|
||||
|
@ -356,6 +369,10 @@ private:
|
|||
public:
|
||||
// Parse the config file, which is specified by cli.
|
||||
virtual srs_error_t parse_file(const char* filename);
|
||||
private:
|
||||
// Build a buffer from a src, which is string content or filename.
|
||||
virtual srs_error_t build_buffer(std::string src, srs_internal::SrsConfigBuffer** pbuffer);
|
||||
public:
|
||||
// Check the parsed config.
|
||||
virtual srs_error_t check_config();
|
||||
protected:
|
||||
|
@ -607,6 +624,8 @@ public:
|
|||
virtual bool get_forward_enabled(SrsConfDirective* vhost);
|
||||
// Get the forward directive of vhost.
|
||||
virtual SrsConfDirective* get_forwards(std::string vhost);
|
||||
// Get the forward directive of backend.
|
||||
virtual SrsConfDirective* get_forward_backend(std::string vhost);
|
||||
|
||||
public:
|
||||
// Whether the srt sevice enabled
|
||||
|
|
|
@ -52,6 +52,8 @@ SrsForwarder::~SrsForwarder()
|
|||
|
||||
srs_freep(sh_video);
|
||||
srs_freep(sh_audio);
|
||||
|
||||
srs_freep(req);
|
||||
}
|
||||
|
||||
srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep)
|
||||
|
@ -60,7 +62,7 @@ srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep)
|
|||
|
||||
// it's ok to use the request object,
|
||||
// SrsLiveSource already copy it and never delete it.
|
||||
req = r;
|
||||
req = r->copy();
|
||||
|
||||
// the ep(endpoint) to forward to
|
||||
ep_forward = ep;
|
||||
|
|
|
@ -482,6 +482,76 @@ srs_error_t SrsHttpHooks::discover_co_workers(string url, string& host, int& por
|
|||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsHttpHooks::on_forward_backend(string url, SrsRequest* req, std::vector<std::string>& rtmp_urls)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
SrsContextId cid = _srs_context->get_id();
|
||||
|
||||
SrsStatistic* stat = SrsStatistic::instance();
|
||||
|
||||
SrsJsonObject* obj = SrsJsonAny::object();
|
||||
SrsAutoFree(SrsJsonObject, obj);
|
||||
|
||||
obj->set("action", SrsJsonAny::str("on_forward"));
|
||||
obj->set("server_id", SrsJsonAny::str(stat->server_id().c_str()));
|
||||
obj->set("client_id", SrsJsonAny::str(cid.c_str()));
|
||||
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
|
||||
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
|
||||
obj->set("app", SrsJsonAny::str(req->app.c_str()));
|
||||
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
|
||||
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
|
||||
obj->set("param", SrsJsonAny::str(req->param.c_str()));
|
||||
|
||||
std::string data = obj->dumps();
|
||||
std::string res;
|
||||
int status_code;
|
||||
|
||||
SrsHttpClient http;
|
||||
if ((err = do_post(&http, url, data, status_code, res)) != srs_success) {
|
||||
return srs_error_wrap(err, "http: on_forward_backend failed, client_id=%s, url=%s, request=%s, response=%s, code=%d",
|
||||
cid.c_str(), url.c_str(), data.c_str(), res.c_str(), status_code);
|
||||
}
|
||||
|
||||
// parse string res to json.
|
||||
SrsJsonAny* info = SrsJsonAny::loads(res);
|
||||
if (!info) {
|
||||
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "load json from %s", res.c_str());
|
||||
}
|
||||
SrsAutoFree(SrsJsonAny, info);
|
||||
|
||||
// response error code in string.
|
||||
if (!info->is_object()) {
|
||||
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "response %s", res.c_str());
|
||||
}
|
||||
|
||||
SrsJsonAny* prop = NULL;
|
||||
// response standard object, format in json: {}
|
||||
SrsJsonObject* res_info = info->to_object();
|
||||
if ((prop = res_info->ensure_property_object("data")) == NULL) {
|
||||
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse data %s", res.c_str());
|
||||
}
|
||||
|
||||
SrsJsonObject* p = prop->to_object();
|
||||
if ((prop = p->ensure_property_array("urls")) == NULL) {
|
||||
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "parse urls %s", res.c_str());
|
||||
}
|
||||
|
||||
SrsJsonArray* urls = prop->to_array();
|
||||
for (int i = 0; i < urls->count(); i++) {
|
||||
prop = urls->at(i);
|
||||
string rtmp_url = prop->to_str();
|
||||
if (!rtmp_url.empty()) {
|
||||
rtmp_urls.push_back(rtmp_url);
|
||||
}
|
||||
}
|
||||
|
||||
srs_trace("http: on_forward_backend ok, client_id=%s, url=%s, request=%s, response=%s",
|
||||
cid.c_str(), url.c_str(), data.c_str(), res.c_str());
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, string& res)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
#include <srs_core.hpp>
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
class SrsHttpUri;
|
||||
class SrsStSocket;
|
||||
|
@ -79,6 +80,10 @@ public:
|
|||
static srs_error_t on_hls_notify(SrsContextId cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify);
|
||||
// Discover co-workers for origin cluster.
|
||||
static srs_error_t discover_co_workers(std::string url, std::string& host, int& port);
|
||||
// The on_forward_backend hook, when publish stream start to forward
|
||||
// @param url the api server url, to valid the client.
|
||||
// ignore if empty.
|
||||
static srs_error_t on_forward_backend(std::string url, SrsRequest* req, std::vector<std::string>& rtmp_urls);
|
||||
private:
|
||||
static srs_error_t do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, std::string& res);
|
||||
};
|
||||
|
|
|
@ -34,6 +34,7 @@ using namespace std;
|
|||
#include <srs_app_dash.hpp>
|
||||
#include <srs_protocol_format.hpp>
|
||||
#include <srs_app_rtc_source.hpp>
|
||||
#include <srs_app_http_hooks.hpp>
|
||||
|
||||
#define CONST_MAX_JITTER_MS 250
|
||||
#define CONST_MAX_JITTER_MS_NEG -250
|
||||
|
@ -807,7 +808,7 @@ SrsSharedPtrMessage* SrsMixQueue::pop()
|
|||
SrsOriginHub::SrsOriginHub()
|
||||
{
|
||||
source = NULL;
|
||||
req = NULL;
|
||||
req_ = NULL;
|
||||
is_active = false;
|
||||
|
||||
hls = new SrsHls();
|
||||
|
@ -851,22 +852,22 @@ srs_error_t SrsOriginHub::initialize(SrsLiveSource* s, SrsRequest* r)
|
|||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
req = r;
|
||||
req_ = r;
|
||||
source = s;
|
||||
|
||||
if ((err = format->initialize()) != srs_success) {
|
||||
return srs_error_wrap(err, "format initialize");
|
||||
}
|
||||
|
||||
if ((err = hls->initialize(this, req)) != srs_success) {
|
||||
if ((err = hls->initialize(this, req_)) != srs_success) {
|
||||
return srs_error_wrap(err, "hls initialize");
|
||||
}
|
||||
|
||||
if ((err = dash->initialize(this, req)) != srs_success) {
|
||||
if ((err = dash->initialize(this, req_)) != srs_success) {
|
||||
return srs_error_wrap(err, "dash initialize");
|
||||
}
|
||||
|
||||
if ((err = dvr->initialize(this, req)) != srs_success) {
|
||||
if ((err = dvr->initialize(this, req_)) != srs_success) {
|
||||
return srs_error_wrap(err, "dvr initialize");
|
||||
}
|
||||
|
||||
|
@ -952,7 +953,7 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio)
|
|||
|
||||
// when got audio stream info.
|
||||
SrsStatistic* stat = SrsStatistic::instance();
|
||||
if ((err = stat->on_audio_info(req, SrsAudioCodecIdAAC, c->sound_rate, c->sound_type, c->aac_object)) != srs_success) {
|
||||
if ((err = stat->on_audio_info(req_, SrsAudioCodecIdAAC, c->sound_rate, c->sound_type, c->aac_object)) != srs_success) {
|
||||
return srs_error_wrap(err, "stat audio");
|
||||
}
|
||||
|
||||
|
@ -966,7 +967,7 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio)
|
|||
if ((err = hls->on_audio(msg, format)) != srs_success) {
|
||||
// apply the error strategy for hls.
|
||||
// @see https://github.com/ossrs/srs/issues/264
|
||||
std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost);
|
||||
std::string hls_error_strategy = _srs_config->get_hls_on_error(req_->vhost);
|
||||
if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) {
|
||||
srs_warn("hls: ignore audio error %s", srs_error_desc(err).c_str());
|
||||
hls->on_unpublish();
|
||||
|
@ -1025,7 +1026,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se
|
|||
// user can disable the sps parse to workaround when parse sps failed.
|
||||
// @see https://github.com/ossrs/srs/issues/474
|
||||
if (is_sequence_header) {
|
||||
format->avc_parse_sps = _srs_config->get_parse_sps(req->vhost);
|
||||
format->avc_parse_sps = _srs_config->get_parse_sps(req_->vhost);
|
||||
}
|
||||
|
||||
if ((err = format->on_video(msg)) != srs_success) {
|
||||
|
@ -1046,7 +1047,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se
|
|||
|
||||
// when got video stream info.
|
||||
SrsStatistic* stat = SrsStatistic::instance();
|
||||
if ((err = stat->on_video_info(req, SrsVideoCodecIdAVC, c->avc_profile, c->avc_level, c->width, c->height)) != srs_success) {
|
||||
if ((err = stat->on_video_info(req_, SrsVideoCodecIdAVC, c->avc_profile, c->avc_level, c->width, c->height)) != srs_success) {
|
||||
return srs_error_wrap(err, "stat video");
|
||||
}
|
||||
|
||||
|
@ -1066,7 +1067,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se
|
|||
// TODO: We should support more strategies.
|
||||
// apply the error strategy for hls.
|
||||
// @see https://github.com/ossrs/srs/issues/264
|
||||
std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost);
|
||||
std::string hls_error_strategy = _srs_config->get_hls_on_error(req_->vhost);
|
||||
if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) {
|
||||
srs_warn("hls: ignore video error %s", srs_error_desc(err).c_str());
|
||||
hls->on_unpublish();
|
||||
|
@ -1126,7 +1127,7 @@ srs_error_t SrsOriginHub::on_publish()
|
|||
}
|
||||
|
||||
// TODO: FIXME: use initialize to set req.
|
||||
if ((err = encoder->on_publish(req)) != srs_success) {
|
||||
if ((err = encoder->on_publish(req_)) != srs_success) {
|
||||
return srs_error_wrap(err, "encoder publish");
|
||||
}
|
||||
|
||||
|
@ -1139,7 +1140,7 @@ srs_error_t SrsOriginHub::on_publish()
|
|||
}
|
||||
|
||||
// @see https://github.com/ossrs/srs/issues/1613#issuecomment-961657927
|
||||
if ((err = dvr->on_publish(req)) != srs_success) {
|
||||
if ((err = dvr->on_publish(req_)) != srs_success) {
|
||||
return srs_error_wrap(err, "dvr publish");
|
||||
}
|
||||
|
||||
|
@ -1151,7 +1152,7 @@ srs_error_t SrsOriginHub::on_publish()
|
|||
#endif
|
||||
|
||||
// TODO: FIXME: use initialize to set req.
|
||||
if ((err = ng_exec->on_publish(req)) != srs_success) {
|
||||
if ((err = ng_exec->on_publish(req_)) != srs_success) {
|
||||
return srs_error_wrap(err, "exec publish");
|
||||
}
|
||||
|
||||
|
@ -1236,7 +1237,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_forward(string vhost)
|
|||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
if (req->vhost != vhost) {
|
||||
if (req_->vhost != vhost) {
|
||||
return err;
|
||||
}
|
||||
|
||||
|
@ -1263,7 +1264,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_dash(string vhost)
|
|||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
if (req->vhost != vhost) {
|
||||
if (req_->vhost != vhost) {
|
||||
return err;
|
||||
}
|
||||
|
||||
|
@ -1305,7 +1306,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_hls(string vhost)
|
|||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
if (req->vhost != vhost) {
|
||||
if (req_->vhost != vhost) {
|
||||
return err;
|
||||
}
|
||||
|
||||
|
@ -1355,7 +1356,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_hds(string vhost)
|
|||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
if (req->vhost != vhost) {
|
||||
if (req_->vhost != vhost) {
|
||||
return err;
|
||||
}
|
||||
|
||||
|
@ -1382,7 +1383,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_dvr(string vhost)
|
|||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
if (req->vhost != vhost) {
|
||||
if (req_->vhost != vhost) {
|
||||
return err;
|
||||
}
|
||||
|
||||
|
@ -1397,12 +1398,12 @@ srs_error_t SrsOriginHub::on_reload_vhost_dvr(string vhost)
|
|||
}
|
||||
|
||||
// reinitialize the dvr, update plan.
|
||||
if ((err = dvr->initialize(this, req)) != srs_success) {
|
||||
if ((err = dvr->initialize(this, req_)) != srs_success) {
|
||||
return srs_error_wrap(err, "reload dvr");
|
||||
}
|
||||
|
||||
// start to publish by new plan.
|
||||
if ((err = dvr->on_publish(req)) != srs_success) {
|
||||
if ((err = dvr->on_publish(req_)) != srs_success) {
|
||||
return srs_error_wrap(err, "dvr publish failed");
|
||||
}
|
||||
|
||||
|
@ -1419,7 +1420,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_transcode(string vhost)
|
|||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
if (req->vhost != vhost) {
|
||||
if (req_->vhost != vhost) {
|
||||
return err;
|
||||
}
|
||||
|
||||
|
@ -1432,7 +1433,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_transcode(string vhost)
|
|||
return err;
|
||||
}
|
||||
|
||||
if ((err = encoder->on_publish(req)) != srs_success) {
|
||||
if ((err = encoder->on_publish(req_)) != srs_success) {
|
||||
return srs_error_wrap(err, "start encoder failed");
|
||||
}
|
||||
srs_trace("vhost %s transcode reload success", vhost.c_str());
|
||||
|
@ -1444,7 +1445,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_exec(string vhost)
|
|||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
if (req->vhost != vhost) {
|
||||
if (req_->vhost != vhost) {
|
||||
return err;
|
||||
}
|
||||
|
||||
|
@ -1457,7 +1458,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_exec(string vhost)
|
|||
return err;
|
||||
}
|
||||
|
||||
if ((err = ng_exec->on_publish(req)) != srs_success) {
|
||||
if ((err = ng_exec->on_publish(req_)) != srs_success) {
|
||||
return srs_error_wrap(err, "start exec failed");
|
||||
}
|
||||
srs_trace("vhost %s exec reload success", vhost.c_str());
|
||||
|
@ -1469,11 +1470,24 @@ srs_error_t SrsOriginHub::create_forwarders()
|
|||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
if (!_srs_config->get_forward_enabled(req->vhost)) {
|
||||
if (!_srs_config->get_forward_enabled(req_->vhost)) {
|
||||
return err;
|
||||
}
|
||||
|
||||
SrsConfDirective* conf = _srs_config->get_forwards(req->vhost);
|
||||
|
||||
// For backend config
|
||||
// If backend is enabled and applied, ignore destination.
|
||||
bool applied_backend_server = false;
|
||||
if ((err = create_backend_forwarders(applied_backend_server)) != srs_success) {
|
||||
return srs_error_wrap(err, "create backend applied=%d", applied_backend_server);
|
||||
}
|
||||
|
||||
// Already applied backend server, ignore destination.
|
||||
if (applied_backend_server) {
|
||||
return err;
|
||||
}
|
||||
|
||||
// For destanition config
|
||||
SrsConfDirective* conf = _srs_config->get_forwards(req_->vhost);
|
||||
for (int i = 0; conf && i < (int)conf->args.size(); i++) {
|
||||
std::string forward_server = conf->args.at(i);
|
||||
|
||||
|
@ -1481,22 +1495,81 @@ srs_error_t SrsOriginHub::create_forwarders()
|
|||
forwarders.push_back(forwarder);
|
||||
|
||||
// initialize the forwarder with request.
|
||||
if ((err = forwarder->initialize(req, forward_server)) != srs_success) {
|
||||
if ((err = forwarder->initialize(req_, forward_server)) != srs_success) {
|
||||
return srs_error_wrap(err, "init forwarder");
|
||||
}
|
||||
|
||||
srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
|
||||
srs_utime_t queue_size = _srs_config->get_queue_length(req_->vhost);
|
||||
forwarder->set_queue_size(queue_size);
|
||||
|
||||
if ((err = forwarder->on_publish()) != srs_success) {
|
||||
return srs_error_wrap(err, "start forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s",
|
||||
req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), forward_server.c_str());
|
||||
req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), forward_server.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsOriginHub::create_backend_forwarders(bool& applied)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
// default not configure backend service
|
||||
applied = false;
|
||||
|
||||
SrsConfDirective* conf = _srs_config->get_forward_backend(req_->vhost);
|
||||
if (!conf || conf->arg0().empty()) {
|
||||
return err;
|
||||
}
|
||||
|
||||
// configure backend service
|
||||
applied = true;
|
||||
|
||||
// only get first backend url
|
||||
std::string backend_url = conf->arg0();
|
||||
|
||||
// get urls on forward backend
|
||||
std::vector<std::string> urls;
|
||||
if ((err = SrsHttpHooks::on_forward_backend(backend_url, req_, urls)) != srs_success) {
|
||||
return srs_error_wrap(err, "get forward backend failed, backend=%s", backend_url.c_str());
|
||||
}
|
||||
|
||||
// create forwarders by urls
|
||||
std::vector<std::string>::iterator it;
|
||||
for (it = urls.begin(); it != urls.end(); ++it) {
|
||||
std::string url = *it;
|
||||
|
||||
// create temp Request by url
|
||||
SrsRequest* req = new SrsRequest();
|
||||
SrsAutoFree(SrsRequest, req);
|
||||
srs_parse_rtmp_url(url, req->tcUrl, req->stream);
|
||||
srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param);
|
||||
|
||||
// create forwarder
|
||||
SrsForwarder* forwarder = new SrsForwarder(this);
|
||||
forwarders.push_back(forwarder);
|
||||
|
||||
std::stringstream forward_server;
|
||||
forward_server << req->host << ":" << req->port;
|
||||
|
||||
// initialize the forwarder with request.
|
||||
if ((err = forwarder->initialize(req, forward_server.str())) != srs_success) {
|
||||
return srs_error_wrap(err, "init backend forwarder failed, forward-to=%s", forward_server.str().c_str());
|
||||
}
|
||||
|
||||
srs_utime_t queue_size = _srs_config->get_queue_length(req_->vhost);
|
||||
forwarder->set_queue_size(queue_size);
|
||||
|
||||
if ((err = forwarder->on_publish()) != srs_success) {
|
||||
return srs_error_wrap(err, "start backend forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s",
|
||||
req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), forward_server.str().c_str());
|
||||
}
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
void SrsOriginHub::destroy_forwarders()
|
||||
{
|
||||
std::vector<SrsForwarder*>::iterator it;
|
||||
|
|
|
@ -310,7 +310,7 @@ class SrsOriginHub : public ISrsReloadHandler
|
|||
{
|
||||
private:
|
||||
SrsLiveSource* source;
|
||||
SrsRequest* req;
|
||||
SrsRequest* req_;
|
||||
bool is_active;
|
||||
private:
|
||||
// The format, codec information.
|
||||
|
@ -375,6 +375,7 @@ public:
|
|||
virtual srs_error_t on_reload_vhost_exec(std::string vhost);
|
||||
private:
|
||||
virtual srs_error_t create_forwarders();
|
||||
virtual srs_error_t create_backend_forwarders(bool& applied);
|
||||
virtual void destroy_forwarders();
|
||||
};
|
||||
|
||||
|
|
|
@ -9,6 +9,6 @@
|
|||
|
||||
#define VERSION_MAJOR 5
|
||||
#define VERSION_MINOR 0
|
||||
#define VERSION_REVISION 22
|
||||
#define VERSION_REVISION 24
|
||||
|
||||
#endif
|
||||
|
|
|
@ -43,10 +43,6 @@
|
|||
#define ERROR_SYSTEM_ASSERT_FAILED 1021
|
||||
#define ERROR_READER_BUFFER_OVERFLOW 1022
|
||||
#define ERROR_SYSTEM_CONFIG_INVALID 1023
|
||||
#define ERROR_SYSTEM_CONFIG_DIRECTIVE 1024
|
||||
#define ERROR_SYSTEM_CONFIG_BLOCK_START 1025
|
||||
#define ERROR_SYSTEM_CONFIG_BLOCK_END 1026
|
||||
#define ERROR_SYSTEM_CONFIG_EOF 1027
|
||||
#define ERROR_SYSTEM_STREAM_BUSY 1028
|
||||
#define ERROR_SYSTEM_IP_INVALID 1029
|
||||
#define ERROR_SYSTEM_FORWARD_LOOP 1030
|
||||
|
|
|
@ -71,6 +71,29 @@ srs_error_t MockSrsConfig::parse(string buf)
|
|||
return err;
|
||||
}
|
||||
|
||||
srs_error_t MockSrsConfig::mock_include(const string file_name, const string content)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
included_files[file_name] = content;
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_error_t MockSrsConfig::build_buffer(std::string src, srs_internal::SrsConfigBuffer** pbuffer)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
string content = included_files[src];
|
||||
if(content.empty()) {
|
||||
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "file %s: no found", src.c_str());
|
||||
}
|
||||
|
||||
*pbuffer = new MockSrsConfigBuffer(content);
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
VOID TEST(ConfigTest, CheckMacros)
|
||||
{
|
||||
#ifndef SRS_CONSTS_LOCALHOST
|
||||
|
@ -303,6 +326,63 @@ VOID TEST(ConfigDirectiveTest, ParseNameArg2_Dir0Arg0_Dir0Arg0)
|
|||
EXPECT_EQ(0, (int)ddir0.directives.size());
|
||||
}
|
||||
|
||||
VOID TEST(ConfigDirectiveTest, ParseArgsSpace)
|
||||
{
|
||||
srs_error_t err;
|
||||
|
||||
if (true) {
|
||||
vector <string> usecases;
|
||||
usecases.push_back("include;");
|
||||
usecases.push_back("include ;");
|
||||
usecases.push_back("include ;");
|
||||
usecases.push_back("include ;");;
|
||||
usecases.push_back("include\r;");
|
||||
usecases.push_back("include\n;");
|
||||
usecases.push_back("include \r \n \r\n \n\r;");
|
||||
for (int i = 0; i < (int)usecases.size(); i++) {
|
||||
string usecase = usecases.at(i);
|
||||
|
||||
MockSrsConfigBuffer buf(usecase);
|
||||
SrsConfDirective conf;
|
||||
HELPER_ASSERT_FAILED(conf.parse(&buf));
|
||||
EXPECT_EQ(0, (int) conf.name.length());
|
||||
EXPECT_EQ(0, (int) conf.args.size());
|
||||
EXPECT_EQ(0, (int) conf.directives.size());
|
||||
}
|
||||
}
|
||||
|
||||
if (true) {
|
||||
vector <string> usecases;
|
||||
usecases.push_back("include test;");
|
||||
usecases.push_back("include test;");
|
||||
usecases.push_back("include test;");
|
||||
usecases.push_back("include test;");;
|
||||
usecases.push_back("include\rtest;");
|
||||
usecases.push_back("include\ntest;");
|
||||
usecases.push_back("include \r \n \r\n \n\rtest;");
|
||||
|
||||
MockSrsConfig config;
|
||||
config.mock_include("test", "listen 1935;");
|
||||
|
||||
for (int i = 0; i < (int)usecases.size(); i++) {
|
||||
string usecase = usecases.at(i);
|
||||
|
||||
MockSrsConfigBuffer buf(usecase);
|
||||
SrsConfDirective conf;
|
||||
HELPER_ASSERT_SUCCESS(conf.parse(&buf, &config));
|
||||
EXPECT_EQ(0, (int) conf.name.length());
|
||||
EXPECT_EQ(0, (int) conf.args.size());
|
||||
EXPECT_EQ(1, (int) conf.directives.size());
|
||||
|
||||
SrsConfDirective &dir = *conf.directives.at(0);
|
||||
EXPECT_STREQ("listen", dir.name.c_str());
|
||||
EXPECT_EQ(1, (int) dir.args.size());
|
||||
EXPECT_STREQ("1935", dir.arg0().c_str());
|
||||
EXPECT_EQ(0, (int) dir.directives.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
VOID TEST(ConfigDirectiveTest, Parse2SingleDirs)
|
||||
{
|
||||
srs_error_t err;
|
||||
|
@ -2914,6 +2994,13 @@ VOID TEST(ConfigMainTest, CheckVhostConfig2)
|
|||
EXPECT_EQ(5000000, conf.get_publish_normal_timeout("ossrs.net"));
|
||||
EXPECT_FALSE(conf.get_forward_enabled("ossrs.net"));
|
||||
EXPECT_TRUE(conf.get_forwards("ossrs.net") == NULL);
|
||||
EXPECT_TRUE(conf.get_forward_backend("ossrs.net") == NULL);
|
||||
}
|
||||
|
||||
if (true) {
|
||||
MockSrsConfig conf;
|
||||
HELPER_ASSERT_SUCCESS(conf.parse(_MIN_OK_CONF "vhost ossrs.net{forward {backend xxx;}}"));
|
||||
EXPECT_TRUE(conf.get_forward_backend("ossrs.net") != NULL);
|
||||
}
|
||||
|
||||
if (true) {
|
||||
|
@ -3622,3 +3709,190 @@ VOID TEST(ConfigMainTest, CheckVhostConfig5)
|
|||
}
|
||||
}
|
||||
|
||||
VOID TEST(ConfigMainTest, CheckIncludeConfig)
|
||||
{
|
||||
srs_error_t err;
|
||||
|
||||
if (true) {
|
||||
MockSrsConfig conf;
|
||||
|
||||
conf.mock_include("./conf/include_test/include.conf", "listen 1935;include ./conf/include_test/include_1.conf;");
|
||||
conf.mock_include("./conf/include_test/include_1.conf", "max_connections 1000;daemon off;srs_log_tank console;http_server {enabled on;listen xxx;dir xxx2;}vhost ossrs.net {hls {enabled on;hls_path xxx;hls_m3u8_file xxx1;hls_ts_file xxx2;hls_fragment 10;hls_window 60;}}");
|
||||
|
||||
HELPER_ASSERT_SUCCESS(conf.parse("include ./conf/include_test/include.conf;"));
|
||||
|
||||
vector<string> listens = conf.get_listens();
|
||||
EXPECT_EQ(1, (int)listens.size());
|
||||
EXPECT_STREQ("1935", listens.at(0).c_str());
|
||||
|
||||
EXPECT_FALSE(conf.get_log_tank_file());
|
||||
|
||||
EXPECT_TRUE(conf.get_http_stream_enabled());
|
||||
EXPECT_STREQ("xxx", conf.get_http_stream_listen().c_str());
|
||||
EXPECT_STREQ("xxx2", conf.get_http_stream_dir().c_str());
|
||||
|
||||
EXPECT_TRUE(conf.get_hls_enabled("ossrs.net"));
|
||||
EXPECT_STREQ("xxx", conf.get_hls_path("ossrs.net").c_str());
|
||||
EXPECT_STREQ("xxx1", conf.get_hls_m3u8_file("ossrs.net").c_str());
|
||||
EXPECT_STREQ("xxx2", conf.get_hls_ts_file("ossrs.net").c_str());
|
||||
EXPECT_EQ(10*SRS_UTIME_SECONDS, conf.get_hls_fragment("ossrs.net"));
|
||||
EXPECT_EQ(60*SRS_UTIME_SECONDS, conf.get_hls_window("ossrs.net"));
|
||||
}
|
||||
|
||||
if (true) {
|
||||
MockSrsConfig conf;
|
||||
|
||||
conf.mock_include("./conf/include_test/include_1.conf", "max_connections 1000;daemon off;srs_log_tank console;http_server {enabled on;listen xxx;dir xxx2;}vhost ossrs.net {hls {enabled on;hls_path xxx;hls_m3u8_file xxx1;hls_ts_file xxx2;hls_fragment 10;hls_window 60;}}");
|
||||
|
||||
HELPER_ASSERT_SUCCESS(conf.parse(_MIN_OK_CONF "include ./conf/include_test/include_1.conf;"));
|
||||
|
||||
vector<string> listens = conf.get_listens();
|
||||
EXPECT_EQ(1, (int)listens.size());
|
||||
EXPECT_STREQ("1935", listens.at(0).c_str());
|
||||
|
||||
EXPECT_FALSE(conf.get_log_tank_file());
|
||||
|
||||
EXPECT_TRUE(conf.get_http_stream_enabled());
|
||||
EXPECT_STREQ("xxx", conf.get_http_stream_listen().c_str());
|
||||
EXPECT_STREQ("xxx2", conf.get_http_stream_dir().c_str());
|
||||
|
||||
EXPECT_TRUE(conf.get_hls_enabled("ossrs.net"));
|
||||
EXPECT_STREQ("xxx", conf.get_hls_path("ossrs.net").c_str());
|
||||
EXPECT_STREQ("xxx1", conf.get_hls_m3u8_file("ossrs.net").c_str());
|
||||
EXPECT_STREQ("xxx2", conf.get_hls_ts_file("ossrs.net").c_str());
|
||||
EXPECT_EQ(10*SRS_UTIME_SECONDS, conf.get_hls_fragment("ossrs.net"));
|
||||
EXPECT_EQ(60*SRS_UTIME_SECONDS, conf.get_hls_window("ossrs.net"));
|
||||
}
|
||||
|
||||
if (true) {
|
||||
MockSrsConfig conf;
|
||||
|
||||
conf.mock_include("./conf/include_test/include_2.conf", "listen 1935;max_connections 1000;daemon off;srs_log_tank console;http_server {enabled on;listen xxx;dir xxx2;}vhost ossrs.net {include ./conf/include_test/include_3.conf;}");
|
||||
conf.mock_include("./conf/include_test/include_3.conf", "hls {enabled on;hls_path xxx;hls_m3u8_file xxx1;hls_ts_file xxx2;hls_fragment 10;hls_window 60;}");
|
||||
|
||||
HELPER_ASSERT_SUCCESS(conf.parse("include ./conf/include_test/include_2.conf;"));
|
||||
|
||||
vector<string> listens = conf.get_listens();
|
||||
EXPECT_EQ(1, (int)listens.size());
|
||||
EXPECT_STREQ("1935", listens.at(0).c_str());
|
||||
|
||||
EXPECT_FALSE(conf.get_log_tank_file());
|
||||
|
||||
EXPECT_TRUE(conf.get_http_stream_enabled());
|
||||
EXPECT_STREQ("xxx", conf.get_http_stream_listen().c_str());
|
||||
EXPECT_STREQ("xxx2", conf.get_http_stream_dir().c_str());
|
||||
|
||||
EXPECT_TRUE(conf.get_hls_enabled("ossrs.net"));
|
||||
EXPECT_STREQ("xxx", conf.get_hls_path("ossrs.net").c_str());
|
||||
EXPECT_STREQ("xxx1", conf.get_hls_m3u8_file("ossrs.net").c_str());
|
||||
EXPECT_STREQ("xxx2", conf.get_hls_ts_file("ossrs.net").c_str());
|
||||
EXPECT_EQ(10*SRS_UTIME_SECONDS, conf.get_hls_fragment("ossrs.net"));
|
||||
EXPECT_EQ(60*SRS_UTIME_SECONDS, conf.get_hls_window("ossrs.net"));
|
||||
}
|
||||
|
||||
if (true) {
|
||||
MockSrsConfig conf;
|
||||
|
||||
conf.mock_include("./conf/include_test/include_3.conf", "hls {enabled on;hls_path xxx;hls_m3u8_file xxx1;hls_ts_file xxx2;hls_fragment 10;hls_window 60;}");
|
||||
conf.mock_include("./conf/include_test/include_4.conf", "listen 1935;max_connections 1000;daemon off;srs_log_tank console;include ./conf/include_test/include_5.conf;vhost ossrs.net {include ./conf/include_test/include_3.conf;}include ./conf/include_test/include_6.conf;");
|
||||
conf.mock_include("./conf/include_test/include_5.conf", "http_server {enabled on;listen xxx;dir xxx2;}");
|
||||
conf.mock_include("./conf/include_test/include_6.conf", "http_api {enabled on;listen xxx;}");
|
||||
|
||||
HELPER_ASSERT_SUCCESS(conf.parse("include ./conf/include_test/include_4.conf;"));
|
||||
|
||||
vector<string> listens = conf.get_listens();
|
||||
EXPECT_EQ(1, (int)listens.size());
|
||||
EXPECT_STREQ("1935", listens.at(0).c_str());
|
||||
|
||||
EXPECT_FALSE(conf.get_log_tank_file());
|
||||
|
||||
EXPECT_TRUE(conf.get_http_stream_enabled());
|
||||
EXPECT_STREQ("xxx", conf.get_http_stream_listen().c_str());
|
||||
EXPECT_STREQ("xxx2", conf.get_http_stream_dir().c_str());
|
||||
|
||||
EXPECT_TRUE(conf.get_hls_enabled("ossrs.net"));
|
||||
EXPECT_STREQ("xxx", conf.get_hls_path("ossrs.net").c_str());
|
||||
EXPECT_STREQ("xxx1", conf.get_hls_m3u8_file("ossrs.net").c_str());
|
||||
EXPECT_STREQ("xxx2", conf.get_hls_ts_file("ossrs.net").c_str());
|
||||
EXPECT_EQ(10*SRS_UTIME_SECONDS, conf.get_hls_fragment("ossrs.net"));
|
||||
EXPECT_EQ(60*SRS_UTIME_SECONDS, conf.get_hls_window("ossrs.net"));
|
||||
|
||||
EXPECT_TRUE(conf.get_http_api_enabled());
|
||||
EXPECT_STREQ("xxx", conf.get_http_api_listen().c_str());
|
||||
}
|
||||
|
||||
if (true) {
|
||||
MockSrsConfig conf;
|
||||
|
||||
conf.mock_include("./conf/include_test/include_3.conf", "hls {enabled on;hls_path xxx;hls_m3u8_file xxx1;hls_ts_file xxx2;hls_fragment 10;hls_window 60;}");
|
||||
conf.mock_include("./conf/include_test/include_4.conf", "listen 1935;max_connections 1000;daemon off;srs_log_tank console;include ./conf/include_test/include_5.conf ./conf/include_test/include_6.conf;vhost ossrs.net {include ./conf/include_test/include_3.conf;}");
|
||||
conf.mock_include("./conf/include_test/include_5.conf", "http_server {enabled on;listen xxx;dir xxx2;}");
|
||||
conf.mock_include("./conf/include_test/include_6.conf", "http_api {enabled on;listen xxx;}");
|
||||
|
||||
HELPER_ASSERT_SUCCESS(conf.parse("include ./conf/include_test/include_4.conf;"));
|
||||
|
||||
vector<string> listens = conf.get_listens();
|
||||
EXPECT_EQ(1, (int)listens.size());
|
||||
EXPECT_STREQ("1935", listens.at(0).c_str());
|
||||
|
||||
EXPECT_FALSE(conf.get_log_tank_file());
|
||||
|
||||
EXPECT_TRUE(conf.get_http_stream_enabled());
|
||||
EXPECT_STREQ("xxx", conf.get_http_stream_listen().c_str());
|
||||
EXPECT_STREQ("xxx2", conf.get_http_stream_dir().c_str());
|
||||
|
||||
EXPECT_TRUE(conf.get_http_api_enabled());
|
||||
EXPECT_STREQ("xxx", conf.get_http_api_listen().c_str());
|
||||
|
||||
EXPECT_TRUE(conf.get_hls_enabled("ossrs.net"));
|
||||
EXPECT_STREQ("xxx", conf.get_hls_path("ossrs.net").c_str());
|
||||
EXPECT_STREQ("xxx1", conf.get_hls_m3u8_file("ossrs.net").c_str());
|
||||
EXPECT_STREQ("xxx2", conf.get_hls_ts_file("ossrs.net").c_str());
|
||||
EXPECT_EQ(10*SRS_UTIME_SECONDS, conf.get_hls_fragment("ossrs.net"));
|
||||
EXPECT_EQ(60*SRS_UTIME_SECONDS, conf.get_hls_window("ossrs.net"));
|
||||
}
|
||||
|
||||
if (true) {
|
||||
MockSrsConfig conf;
|
||||
|
||||
conf.mock_include("./conf/include_test/include_3.conf", "hls {enabled on;hls_path xxx;hls_m3u8_file xxx1;hls_ts_file xxx2;hls_fragment 10;hls_window 60;}");
|
||||
conf.mock_include("./conf/include_test/include_4.conf", "listen 1935;max_connections 1000;daemon off;srs_log_tank console;include ./conf/include_test/include_5.conf ./conf/include_test/include_6.conf;vhost ossrs.net {include ./conf/include_test/include_3.conf ./conf/include_test/include_7.conf;}");
|
||||
conf.mock_include("./conf/include_test/include_5.conf", "http_server {enabled on;listen xxx;dir xxx2;}");
|
||||
conf.mock_include("./conf/include_test/include_6.conf", "http_api {enabled on;listen xxx;}");
|
||||
conf.mock_include("./conf/include_test/include_7.conf", "dash{enabled on;dash_fragment 10;dash_update_period 10;dash_timeshift 10;dash_path xxx;dash_mpd_file xxx2;}");
|
||||
|
||||
HELPER_ASSERT_SUCCESS(conf.parse("include ./conf/include_test/include_4.conf;"));
|
||||
|
||||
vector<string> listens = conf.get_listens();
|
||||
EXPECT_EQ(1, (int)listens.size());
|
||||
EXPECT_STREQ("1935", listens.at(0).c_str());
|
||||
|
||||
EXPECT_FALSE(conf.get_log_tank_file());
|
||||
|
||||
EXPECT_TRUE(conf.get_http_stream_enabled());
|
||||
EXPECT_STREQ("xxx", conf.get_http_stream_listen().c_str());
|
||||
EXPECT_STREQ("xxx2", conf.get_http_stream_dir().c_str());
|
||||
|
||||
EXPECT_TRUE(conf.get_http_api_enabled());
|
||||
EXPECT_STREQ("xxx", conf.get_http_api_listen().c_str());
|
||||
|
||||
EXPECT_TRUE(conf.get_hls_enabled("ossrs.net"));
|
||||
EXPECT_STREQ("xxx", conf.get_hls_path("ossrs.net").c_str());
|
||||
EXPECT_STREQ("xxx1", conf.get_hls_m3u8_file("ossrs.net").c_str());
|
||||
EXPECT_STREQ("xxx2", conf.get_hls_ts_file("ossrs.net").c_str());
|
||||
EXPECT_EQ(10*SRS_UTIME_SECONDS, conf.get_hls_fragment("ossrs.net"));
|
||||
EXPECT_EQ(60*SRS_UTIME_SECONDS, conf.get_hls_window("ossrs.net"));
|
||||
|
||||
EXPECT_EQ(10*SRS_UTIME_SECONDS, conf.get_dash_fragment("ossrs.net"));
|
||||
EXPECT_EQ(10*SRS_UTIME_SECONDS, conf.get_dash_update_period("ossrs.net"));
|
||||
EXPECT_EQ(10*SRS_UTIME_SECONDS, conf.get_dash_timeshift("ossrs.net"));
|
||||
EXPECT_STREQ("xxx", conf.get_dash_path("ossrs.net").c_str());
|
||||
EXPECT_STREQ("xxx2", conf.get_dash_mpd_file("ossrs.net").c_str());
|
||||
}
|
||||
|
||||
if (true) {
|
||||
MockSrsConfig conf;
|
||||
|
||||
HELPER_ASSERT_FAILED(conf.parse("include ./conf/include_test/include.conf;"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,8 +32,13 @@ class MockSrsConfig : public SrsConfig
|
|||
public:
|
||||
MockSrsConfig();
|
||||
virtual ~MockSrsConfig();
|
||||
private:
|
||||
std::map<std::string, std::string> included_files;
|
||||
public:
|
||||
virtual srs_error_t parse(std::string buf);
|
||||
virtual srs_error_t mock_include(const std::string file_name, const std::string content);
|
||||
protected:
|
||||
virtual srs_error_t build_buffer(std::string src, srs_internal::SrsConfigBuffer** pbuffer);
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue