From 181c9f25ed88dba98f62303c2b911405c0737e43 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 19 Feb 2021 18:16:05 +0800 Subject: [PATCH] Live: Support connect origin by HTTP-FLV/HTTPS-FLV --- trunk/conf/full.conf | 7 + trunk/src/app/srs_app_config.cpp | 24 ++- trunk/src/app/srs_app_config.hpp | 2 + trunk/src/app/srs_app_edge.cpp | 188 +++++++++++++++++- trunk/src/app/srs_app_edge.hpp | 31 +++ .../src/protocol/srs_service_http_client.hpp | 2 +- 6 files changed, 250 insertions(+), 4 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 4e0c76424..37c3b3cb1 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -680,6 +680,13 @@ vhost cluster.srs.com { # please read: https://github.com/ossrs/srs/wiki/v3_EN_OriginCluster # TODO: FIXME: Support reload. coworkers 127.0.0.1:9091 127.0.0.1:9092; + + # The protocol to connect to origin. + # rtmp, Connect origin by RTMP + # flv, Connect origin by HTTP-FLV + # flvs, Connect origin by HTTPS-FLV + # Default: rtmp + protocol rtmp; } } diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index efd7e236b..0df692464 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3829,7 +3829,7 @@ srs_error_t SrsConfig::check_normal_config() for (int j = 0; j < (int)conf->directives.size(); j++) { string m = conf->at(j)->name; if (m != "mode" && m != "origin" && m != "token_traverse" && m != "vhost" && m != "debug_srs_upnode" && m != "coworkers" - && m != "origin_cluster") { + && m != "origin_cluster" && m != "protocol") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.cluster.%s of %s", m.c_str(), vhost->arg0().c_str()); } } @@ -6042,6 +6042,28 @@ SrsConfDirective* SrsConfig::get_vhost_edge_origin(string vhost) return conf->get("origin"); } +string SrsConfig::get_vhost_edge_protocol(string vhost) +{ + static string DEFAULT = "rtmp"; + + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("cluster"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("protocol"); + if (!conf) { + return DEFAULT; + } + + return conf->arg0(); +} + bool SrsConfig::get_vhost_edge_token_traverse(string vhost) { static bool DEFAULT = false; diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 507467073..08cb4f13b 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -759,6 +759,8 @@ public: // Get the origin config of edge, // specifies the origin ip address, port. virtual SrsConfDirective* get_vhost_edge_origin(std::string vhost); + // Get the procotol to connect to origin server. + virtual std::string get_vhost_edge_protocol(std::string vhost); // Whether edge token tranverse is enabled, // If true, edge will send connect origin to verfy the token of client. // For example, we verify all clients on the origin FMS by server-side as, diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 13885d1ac..64fbe61fa 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -46,6 +46,11 @@ using namespace std; #include #include #include +#include +#include +#include +#include +#include // when edge timeout, retry next. #define SRS_EDGE_INGESTER_TIMEOUT (5 * SRS_UTIME_SECONDS) @@ -169,6 +174,180 @@ void SrsEdgeRtmpUpstream::kbps_sample(const char* label, int64_t age) sdk->kbps_sample(label, age); } +SrsEdgeFlvUpstream::SrsEdgeFlvUpstream(std::string schema) +{ + schema_ = schema; + selected_port = 0; + + sdk_ = NULL; + hr_ = NULL; + reader_ = NULL; + decoder_ = NULL; +} + +SrsEdgeFlvUpstream::~SrsEdgeFlvUpstream() +{ + close(); +} + +srs_error_t SrsEdgeFlvUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb) +{ + srs_error_t err = srs_success; + + SrsRequest* req = r; + + if (true) { + SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost); + + // @see https://github.com/ossrs/srs/issues/79 + // when origin is error, for instance, server is shutdown, + // then user remove the vhost then reload, the conf is empty. + if (!conf) { + return srs_error_new(ERROR_EDGE_VHOST_REMOVED, "vhost %s removed", req->vhost.c_str()); + } + + // select the origin. + std::string server = lb->select(conf->args); + int port = SRS_DEFAULT_HTTP_PORT; + if (schema_ == "https") { + port = SRS_DEFAULT_HTTPS_PORT; + } + srs_parse_hostport(server, server, port); + + // Remember the current selected server. + selected_ip = server; + selected_port = port; + } + + srs_freep(sdk_); + sdk_ = new SrsHttpClient(); + + string path = "/" + req->app + "/" + req->stream + ".flv"; + if (!req->param.empty()) { + path += req->param; + } + + string url = schema_ + "://" + selected_ip + ":" + srs_int2str(selected_port); + url += path; + + srs_utime_t cto = SRS_EDGE_INGESTER_TIMEOUT; + if ((err = sdk_->initialize(schema_, selected_ip, selected_port, cto)) != srs_success) { + return srs_error_wrap(err, "edge pull %s failed, cto=%dms.", url.c_str(), srsu2msi(cto)); + } + + srs_freep(hr_); + if ((err = sdk_->get(path, "", &hr_)) != srs_success) { + return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str()); + } + + srs_freep(reader_); + reader_ = new SrsHttpFileReader(hr_->body_reader()); + + srs_freep(decoder_); + decoder_ = new SrsFlvDecoder(); + + if ((err = decoder_->initialize(reader_)) != srs_success) { + return srs_error_wrap(err, "init decoder"); + } + + char header[9]; + if ((err = decoder_->read_header(header)) != srs_success) { + return srs_error_wrap(err, "read header"); + } + + char pps[4]; + if ((err = decoder_->read_previous_tag_size(pps)) != srs_success) { + return srs_error_wrap(err, "read pts"); + } + + return err; +} + +srs_error_t SrsEdgeFlvUpstream::recv_message(SrsCommonMessage** pmsg) +{ + srs_error_t err = srs_success; + + char type; + int32_t size; + uint32_t time; + if ((err = decoder_->read_tag_header(&type, &size, &time)) != srs_success) { + return srs_error_wrap(err, "read tag header"); + } + + char* data = new char[size]; + if ((err = decoder_->read_tag_data(data, size)) != srs_success) { + srs_freepa(data); + return srs_error_wrap(err, "read tag data"); + } + + char pps[4]; + if ((err = decoder_->read_previous_tag_size(pps)) != srs_success) { + return srs_error_wrap(err, "read pts"); + } + + int stream_id = 1; + SrsCommonMessage* msg = NULL; + if ((err = srs_rtmp_create_msg(type, time, data, size, stream_id, &msg)) != srs_success) { + return srs_error_wrap(err, "create message"); + } + + *pmsg = msg; + + return err; +} + +srs_error_t SrsEdgeFlvUpstream::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket) +{ + srs_error_t err = srs_success; + + SrsPacket* packet = NULL; + SrsBuffer stream(msg->payload, msg->size); + SrsMessageHeader& header = msg->header; + + if (header.is_amf0_data() || header.is_amf3_data()) { + std::string command; + if ((err = srs_amf0_read_string(&stream, command)) != srs_success) { + return srs_error_wrap(err, "decode command name"); + } + + stream.skip(-1 * stream.pos()); + + if (command == SRS_CONSTS_RTMP_SET_DATAFRAME) { + *ppacket = packet = new SrsOnMetaDataPacket(); + return packet->decode(&stream); + } else if (command == SRS_CONSTS_RTMP_ON_METADATA) { + *ppacket = packet = new SrsOnMetaDataPacket(); + return packet->decode(&stream); + } + } + + return err; +} + +void SrsEdgeFlvUpstream::close() +{ + srs_freep(sdk_); + srs_freep(hr_); + srs_freep(reader_); + srs_freep(decoder_); +} + +void SrsEdgeFlvUpstream::selected(string& server, int& port) +{ + server = selected_ip; + port = selected_port; +} + +void SrsEdgeFlvUpstream::set_recv_timeout(srs_utime_t tm) +{ + sdk_->set_recv_timeout(tm); +} + +void SrsEdgeFlvUpstream::kbps_sample(const char* label, int64_t age) +{ + sdk_->kbps_sample(label, age); +} + SrsEdgeIngester::SrsEdgeIngester() { source = NULL; @@ -266,9 +445,14 @@ srs_error_t SrsEdgeIngester::do_cycle() if ((err = trd->pull()) != srs_success) { return srs_error_wrap(err, "do cycle pull"); } - + srs_freep(upstream); - upstream = new SrsEdgeRtmpUpstream(redirect); + string edge_protocol = _srs_config->get_vhost_edge_protocol(req->vhost); + if (edge_protocol == "flv" || edge_protocol == "flvs") { + upstream = new SrsEdgeFlvUpstream(edge_protocol == "flv"? "http" : "https"); + } else { + upstream = new SrsEdgeRtmpUpstream(redirect); + } if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) { return srs_error_wrap(err, "on source id changed"); diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 4a673c926..80d9a1b31 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -45,6 +45,10 @@ class SrsLbRoundRobin; class SrsTcpClient; class SrsSimpleRtmpClient; class SrsPacket; +class SrsHttpClient; +class ISrsHttpMessage; +class SrsHttpFileReader; +class SrsFlvDecoder; // The state of edge, auto machine enum SrsEdgeState @@ -113,6 +117,33 @@ public: virtual void kbps_sample(const char* label, int64_t age); }; +class SrsEdgeFlvUpstream : public SrsEdgeUpstream +{ +private: + std::string schema_; + SrsHttpClient* sdk_; + ISrsHttpMessage* hr_; +private: + SrsHttpFileReader* reader_; + SrsFlvDecoder* decoder_; +private: + // Current selected server, the ip:port. + std::string selected_ip; + int selected_port; +public: + SrsEdgeFlvUpstream(std::string schema); + virtual ~SrsEdgeFlvUpstream(); +public: + virtual srs_error_t connect(SrsRequest* r, SrsLbRoundRobin* lb); + virtual srs_error_t recv_message(SrsCommonMessage** pmsg); + virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket); + virtual void close(); +public: + virtual void selected(std::string& server, int& port); + virtual void set_recv_timeout(srs_utime_t tm); + virtual void kbps_sample(const char* label, int64_t age); +}; + // The edge used to ingest stream from origin. class SrsEdgeIngester : public ISrsCoroutineHandler { diff --git a/trunk/src/protocol/srs_service_http_client.hpp b/trunk/src/protocol/srs_service_http_client.hpp index 351735f22..23e052f79 100644 --- a/trunk/src/protocol/srs_service_http_client.hpp +++ b/trunk/src/protocol/srs_service_http_client.hpp @@ -117,7 +117,7 @@ public: // @param ppmsg output the http message to read the response. // @remark user must free the ppmsg if not NULL. virtual srs_error_t get(std::string path, std::string req, ISrsHttpMessage** ppmsg); -private: +public: virtual void set_recv_timeout(srs_utime_t tm); public: virtual void kbps_sample(const char* label, int64_t age);