1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Forward: support config full rtmp url forward to other server (#2799)

* Forward: add backend config and demo server for dynamic create forwarder to other server.(#1342)

* Forward: if call forward backend failed, then return directly.

* Forward: add API description and change return value format.

* Forward: add backend conf file and wrapper function for backend service.

* Forward: add backend comment in full.conf and update forward.backend.conf.

* Forward: rename backend param and add comment tips.
This commit is contained in:
chundonglinlin 2022-02-16 10:49:16 +08:00 committed by GitHub
parent 9379ebbc2c
commit 03cf93fc2b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 335 additions and 33 deletions

View file

@ -34,6 +34,7 @@ using namespace std;
#include <srs_app_dash.hpp>
#include <srs_protocol_format.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_app_http_hooks.hpp>
#define CONST_MAX_JITTER_MS 250
#define CONST_MAX_JITTER_MS_NEG -250
@ -807,7 +808,7 @@ SrsSharedPtrMessage* SrsMixQueue::pop()
SrsOriginHub::SrsOriginHub()
{
source = NULL;
req = NULL;
req_ = NULL;
is_active = false;
hls = new SrsHls();
@ -851,22 +852,22 @@ srs_error_t SrsOriginHub::initialize(SrsLiveSource* s, SrsRequest* r)
{
srs_error_t err = srs_success;
req = r;
req_ = r;
source = s;
if ((err = format->initialize()) != srs_success) {
return srs_error_wrap(err, "format initialize");
}
if ((err = hls->initialize(this, req)) != srs_success) {
if ((err = hls->initialize(this, req_)) != srs_success) {
return srs_error_wrap(err, "hls initialize");
}
if ((err = dash->initialize(this, req)) != srs_success) {
if ((err = dash->initialize(this, req_)) != srs_success) {
return srs_error_wrap(err, "dash initialize");
}
if ((err = dvr->initialize(this, req)) != srs_success) {
if ((err = dvr->initialize(this, req_)) != srs_success) {
return srs_error_wrap(err, "dvr initialize");
}
@ -952,7 +953,7 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio)
// when got audio stream info.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_audio_info(req, SrsAudioCodecIdAAC, c->sound_rate, c->sound_type, c->aac_object)) != srs_success) {
if ((err = stat->on_audio_info(req_, SrsAudioCodecIdAAC, c->sound_rate, c->sound_type, c->aac_object)) != srs_success) {
return srs_error_wrap(err, "stat audio");
}
@ -966,7 +967,7 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio)
if ((err = hls->on_audio(msg, format)) != srs_success) {
// apply the error strategy for hls.
// @see https://github.com/ossrs/srs/issues/264
std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost);
std::string hls_error_strategy = _srs_config->get_hls_on_error(req_->vhost);
if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) {
srs_warn("hls: ignore audio error %s", srs_error_desc(err).c_str());
hls->on_unpublish();
@ -1025,7 +1026,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se
// user can disable the sps parse to workaround when parse sps failed.
// @see https://github.com/ossrs/srs/issues/474
if (is_sequence_header) {
format->avc_parse_sps = _srs_config->get_parse_sps(req->vhost);
format->avc_parse_sps = _srs_config->get_parse_sps(req_->vhost);
}
if ((err = format->on_video(msg)) != srs_success) {
@ -1046,7 +1047,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se
// when got video stream info.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_video_info(req, SrsVideoCodecIdAVC, c->avc_profile, c->avc_level, c->width, c->height)) != srs_success) {
if ((err = stat->on_video_info(req_, SrsVideoCodecIdAVC, c->avc_profile, c->avc_level, c->width, c->height)) != srs_success) {
return srs_error_wrap(err, "stat video");
}
@ -1066,7 +1067,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se
// TODO: We should support more strategies.
// apply the error strategy for hls.
// @see https://github.com/ossrs/srs/issues/264
std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost);
std::string hls_error_strategy = _srs_config->get_hls_on_error(req_->vhost);
if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) {
srs_warn("hls: ignore video error %s", srs_error_desc(err).c_str());
hls->on_unpublish();
@ -1126,7 +1127,7 @@ srs_error_t SrsOriginHub::on_publish()
}
// TODO: FIXME: use initialize to set req.
if ((err = encoder->on_publish(req)) != srs_success) {
if ((err = encoder->on_publish(req_)) != srs_success) {
return srs_error_wrap(err, "encoder publish");
}
@ -1139,7 +1140,7 @@ srs_error_t SrsOriginHub::on_publish()
}
// @see https://github.com/ossrs/srs/issues/1613#issuecomment-961657927
if ((err = dvr->on_publish(req)) != srs_success) {
if ((err = dvr->on_publish(req_)) != srs_success) {
return srs_error_wrap(err, "dvr publish");
}
@ -1151,7 +1152,7 @@ srs_error_t SrsOriginHub::on_publish()
#endif
// TODO: FIXME: use initialize to set req.
if ((err = ng_exec->on_publish(req)) != srs_success) {
if ((err = ng_exec->on_publish(req_)) != srs_success) {
return srs_error_wrap(err, "exec publish");
}
@ -1236,7 +1237,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_forward(string vhost)
{
srs_error_t err = srs_success;
if (req->vhost != vhost) {
if (req_->vhost != vhost) {
return err;
}
@ -1263,7 +1264,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_dash(string vhost)
{
srs_error_t err = srs_success;
if (req->vhost != vhost) {
if (req_->vhost != vhost) {
return err;
}
@ -1305,7 +1306,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_hls(string vhost)
{
srs_error_t err = srs_success;
if (req->vhost != vhost) {
if (req_->vhost != vhost) {
return err;
}
@ -1355,7 +1356,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_hds(string vhost)
{
srs_error_t err = srs_success;
if (req->vhost != vhost) {
if (req_->vhost != vhost) {
return err;
}
@ -1382,7 +1383,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_dvr(string vhost)
{
srs_error_t err = srs_success;
if (req->vhost != vhost) {
if (req_->vhost != vhost) {
return err;
}
@ -1397,12 +1398,12 @@ srs_error_t SrsOriginHub::on_reload_vhost_dvr(string vhost)
}
// reinitialize the dvr, update plan.
if ((err = dvr->initialize(this, req)) != srs_success) {
if ((err = dvr->initialize(this, req_)) != srs_success) {
return srs_error_wrap(err, "reload dvr");
}
// start to publish by new plan.
if ((err = dvr->on_publish(req)) != srs_success) {
if ((err = dvr->on_publish(req_)) != srs_success) {
return srs_error_wrap(err, "dvr publish failed");
}
@ -1419,7 +1420,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_transcode(string vhost)
{
srs_error_t err = srs_success;
if (req->vhost != vhost) {
if (req_->vhost != vhost) {
return err;
}
@ -1432,7 +1433,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_transcode(string vhost)
return err;
}
if ((err = encoder->on_publish(req)) != srs_success) {
if ((err = encoder->on_publish(req_)) != srs_success) {
return srs_error_wrap(err, "start encoder failed");
}
srs_trace("vhost %s transcode reload success", vhost.c_str());
@ -1444,7 +1445,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_exec(string vhost)
{
srs_error_t err = srs_success;
if (req->vhost != vhost) {
if (req_->vhost != vhost) {
return err;
}
@ -1457,7 +1458,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_exec(string vhost)
return err;
}
if ((err = ng_exec->on_publish(req)) != srs_success) {
if ((err = ng_exec->on_publish(req_)) != srs_success) {
return srs_error_wrap(err, "start exec failed");
}
srs_trace("vhost %s exec reload success", vhost.c_str());
@ -1469,11 +1470,24 @@ srs_error_t SrsOriginHub::create_forwarders()
{
srs_error_t err = srs_success;
if (!_srs_config->get_forward_enabled(req->vhost)) {
if (!_srs_config->get_forward_enabled(req_->vhost)) {
return err;
}
SrsConfDirective* conf = _srs_config->get_forwards(req->vhost);
// For backend config
// If backend is enabled and applied, ignore destination.
bool applied_backend_server = false;
if ((err = create_backend_forwarders(applied_backend_server)) != srs_success) {
return srs_error_wrap(err, "create backend applied=%d", applied_backend_server);
}
// Already applied backend server, ignore destination.
if (applied_backend_server) {
return err;
}
// For destanition config
SrsConfDirective* conf = _srs_config->get_forwards(req_->vhost);
for (int i = 0; conf && i < (int)conf->args.size(); i++) {
std::string forward_server = conf->args.at(i);
@ -1481,22 +1495,81 @@ srs_error_t SrsOriginHub::create_forwarders()
forwarders.push_back(forwarder);
// initialize the forwarder with request.
if ((err = forwarder->initialize(req, forward_server)) != srs_success) {
if ((err = forwarder->initialize(req_, forward_server)) != srs_success) {
return srs_error_wrap(err, "init forwarder");
}
srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
srs_utime_t queue_size = _srs_config->get_queue_length(req_->vhost);
forwarder->set_queue_size(queue_size);
if ((err = forwarder->on_publish()) != srs_success) {
return srs_error_wrap(err, "start forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s",
req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), forward_server.c_str());
req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), forward_server.c_str());
}
}
return err;
}
srs_error_t SrsOriginHub::create_backend_forwarders(bool& applied)
{
srs_error_t err = srs_success;
// default not configure backend service
applied = false;
SrsConfDirective* conf = _srs_config->get_forward_backend(req_->vhost);
if (!conf || conf->arg0().empty()) {
return err;
}
// configure backend service
applied = true;
// only get first backend url
std::string backend_url = conf->arg0();
// get urls on forward backend
std::vector<std::string> urls;
if ((err = SrsHttpHooks::on_forward_backend(backend_url, req_, urls)) != srs_success) {
return srs_error_wrap(err, "get forward backend failed, backend=%s", backend_url.c_str());
}
// create forwarders by urls
std::vector<std::string>::iterator it;
for (it = urls.begin(); it != urls.end(); ++it) {
std::string url = *it;
// create temp Request by url
SrsRequest* req = new SrsRequest();
SrsAutoFree(SrsRequest, req);
srs_parse_rtmp_url(url, req->tcUrl, req->stream);
srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param);
// create forwarder
SrsForwarder* forwarder = new SrsForwarder(this);
forwarders.push_back(forwarder);
std::stringstream forward_server;
forward_server << req->host << ":" << req->port;
// initialize the forwarder with request.
if ((err = forwarder->initialize(req, forward_server.str())) != srs_success) {
return srs_error_wrap(err, "init backend forwarder failed, forward-to=%s", forward_server.str().c_str());
}
srs_utime_t queue_size = _srs_config->get_queue_length(req_->vhost);
forwarder->set_queue_size(queue_size);
if ((err = forwarder->on_publish()) != srs_success) {
return srs_error_wrap(err, "start backend forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s",
req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), forward_server.str().c_str());
}
}
return err;
}
void SrsOriginHub::destroy_forwarders()
{
std::vector<SrsForwarder*>::iterator it;