1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Fix #2881: HTTP: Support merging api to server. v5.0.47

This commit is contained in:
winlin 2022-08-28 09:10:13 +08:00
parent 6508a082e9
commit 457738f6eb
34 changed files with 333 additions and 934 deletions

View file

@ -242,12 +242,6 @@ srs_error_t SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecod
return err;
}
srs_error_t SrsDynamicHttpConn::on_reload_http_stream_crossdomain()
{
bool v = _srs_config->get_http_stream_crossdomain();
return conn->set_crossdomain_enabled(v);
}
srs_error_t SrsDynamicHttpConn::on_start()
{
return srs_success;

View file

@ -76,9 +76,6 @@ public:
private:
virtual srs_error_t do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec);
// Extract APIs from SrsTcpConnection.
// Interface ISrsReloadHandler
public:
virtual srs_error_t on_reload_http_stream_crossdomain();
// Interface ISrsHttpConnOwner.
public:
virtual srs_error_t on_start();

View file

@ -1472,28 +1472,6 @@ srs_error_t SrsConfig::reload_vhost(SrsConfDirective* old_root)
srs_trace("vhost %s reload publish success.", vhost.c_str());
}
// http_static, only one per vhost.
if (!srs_directive_equals(new_vhost->get("http_static"), old_vhost->get("http_static"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((err = subscribe->on_reload_vhost_http_updated()) != srs_success) {
return srs_error_wrap(err, "vhost %s notify subscribes http_static failed", vhost.c_str());
}
}
srs_trace("vhost %s reload http_static success.", vhost.c_str());
}
// http_remux, only one per vhost.
if (!srs_directive_equals(new_vhost->get("http_remux"), old_vhost->get("http_remux"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((err = subscribe->on_reload_vhost_http_remux_updated(vhost)) != srs_success) {
return srs_error_wrap(err, "vhost %s notify subscribes http_remux failed", vhost.c_str());
}
}
srs_trace("vhost %s reload http_remux success.", vhost.c_str());
}
// transcode, many per vhost.
if ((err = reload_transcode(new_vhost, old_vhost)) != srs_success) {
return srs_error_wrap(err, "reload transcode");
@ -1550,16 +1528,6 @@ srs_error_t SrsConfig::reload_conf(SrsConfig* conf)
return srs_error_wrap(err, "pithy print ms");;
}
}
// merge config: http_api
if ((err = reload_http_api(old_root)) != srs_success) {
return srs_error_wrap(err, "http api");;
}
// merge config: http_stream
if ((err = reload_http_stream(old_root)) != srs_success) {
return srs_error_wrap(err, "http steram");;
}
// Merge config: rtc_server
if ((err = reload_rtc_server(old_root)) != srs_success) {
@ -1576,152 +1544,6 @@ srs_error_t SrsConfig::reload_conf(SrsConfig* conf)
return err;
}
srs_error_t SrsConfig::reload_http_api(SrsConfDirective* old_root)
{
srs_error_t err = srs_success;
// merge config.
std::vector<ISrsReloadHandler*>::iterator it;
// state graph
// old_http_api new_http_api
// DISABLED => ENABLED
// ENABLED => DISABLED
// ENABLED => ENABLED (modified)
SrsConfDirective* new_http_api = root->get("http_api");
SrsConfDirective* old_http_api = old_root->get("http_api");
// DISABLED => ENABLED
if (!get_http_api_enabled(old_http_api) && get_http_api_enabled(new_http_api)) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((err = subscribe->on_reload_http_api_enabled()) != srs_success) {
return srs_error_wrap(err, "http api off=>on");
}
}
srs_trace("reload off=>on http_api success.");
return err;
}
// ENABLED => DISABLED
if (get_http_api_enabled(old_http_api) && !get_http_api_enabled(new_http_api)) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((err = subscribe->on_reload_http_api_disabled()) != srs_success) {
return srs_error_wrap(err, "http api on=>off");
}
}
srs_trace("reload http_api on=>off success.");
return err;
}
// ENABLED => ENABLED (modified)
if (get_http_api_enabled(old_http_api) && get_http_api_enabled(new_http_api)
&& !srs_directive_equals(old_http_api, new_http_api)
) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((err = subscribe->on_reload_http_api_enabled()) != srs_success) {
return srs_error_wrap(err, "http api enabled");
}
}
srs_trace("reload http api enabled success.");
if (!srs_directive_equals(old_http_api->get("crossdomain"), new_http_api->get("crossdomain"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((err = subscribe->on_reload_http_api_crossdomain()) != srs_success) {
return srs_error_wrap(err, "http api crossdomain");
}
}
}
srs_trace("reload http api crossdomain success.");
if (!srs_directive_equals(old_http_api->get("raw_api"), new_http_api->get("raw_api"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((err = subscribe->on_reload_http_api_raw_api()) != srs_success) {
return srs_error_wrap(err, "http api raw_api");
}
}
}
srs_trace("reload http api raw_api success.");
return err;
}
srs_trace("reload http_api success, nothing changed.");
return err;
}
srs_error_t SrsConfig::reload_http_stream(SrsConfDirective* old_root)
{
srs_error_t err = srs_success;
// merge config.
std::vector<ISrsReloadHandler*>::iterator it;
// state graph
// old_http_stream new_http_stream
// DISABLED => ENABLED
// ENABLED => DISABLED
// ENABLED => ENABLED (modified)
SrsConfDirective* new_http_stream = root->get("http_server");
SrsConfDirective* old_http_stream = old_root->get("http_server");
// DISABLED => ENABLED
if (!get_http_stream_enabled(old_http_stream) && get_http_stream_enabled(new_http_stream)) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((err = subscribe->on_reload_http_stream_enabled()) != srs_success) {
return srs_error_wrap(err, "http stream off=>on");
}
}
srs_trace("reload http stream off=>on success.");
return err;
}
// ENABLED => DISABLED
if (get_http_stream_enabled(old_http_stream) && !get_http_stream_enabled(new_http_stream)) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((err = subscribe->on_reload_http_stream_disabled()) != srs_success) {
return srs_error_wrap(err, "http stream on=>off");
}
}
srs_trace("reload http stream on=>off success.");
return err;
}
// ENABLED => ENABLED (modified)
if (get_http_stream_enabled(old_http_stream) && get_http_stream_enabled(new_http_stream)
&& !srs_directive_equals(old_http_stream, new_http_stream)
) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((err = subscribe->on_reload_http_stream_updated()) != srs_success) {
return srs_error_wrap(err, "http stream enabled");
}
}
srs_trace("reload http stream enabled success.");
if (!srs_directive_equals(old_http_stream->get("crossdomain"), new_http_stream->get("crossdomain"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((err = subscribe->on_reload_http_stream_crossdomain()) != srs_success) {
return srs_error_wrap(err, "http stream crossdomain");
}
}
}
srs_trace("reload http stream crossdomain success.");
return err;
}
srs_trace("reload http stream success, nothing changed.");
return err;
}
srs_error_t SrsConfig::reload_rtc_server(SrsConfDirective* old_root)
{
srs_error_t err = srs_success;
@ -2539,17 +2361,33 @@ srs_error_t SrsConfig::check_normal_config()
}
////////////////////////////////////////////////////////////////////////
// check http api
// Check HTTP API and server.
////////////////////////////////////////////////////////////////////////
if (get_http_api_listen().empty()) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "http_api.listen requires params");
}
////////////////////////////////////////////////////////////////////////
// check http stream
////////////////////////////////////////////////////////////////////////
if (get_http_stream_listen().empty()) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "http_stream.listen requires params");
if (true) {
string api = get_http_api_listen();
string server = get_http_stream_listen();
if (api.empty()) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "http_api.listen requires params");
}
if (server.empty()) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "http_server.listen requires params");
}
string apis = get_https_api_listen();
string servers = get_https_stream_listen();
if (api == server && apis != servers) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "for same http, https api(%s) != server(%s)", apis.c_str(), servers.c_str());
}
if (apis == servers && api != server) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "for same https, http api(%s) != server(%s)", api.c_str(), server.c_str());
}
if (get_https_api_enabled() && !get_http_api_enabled()) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "https api depends on http");
}
if (get_https_stream_enabled() && !get_http_stream_enabled()) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "https stream depends on http");
}
}
////////////////////////////////////////////////////////////////////////
@ -6917,7 +6755,21 @@ bool SrsConfig::get_https_api_enabled()
string SrsConfig::get_https_api_listen()
{
static string DEFAULT = "1990";
#ifdef SRS_UTEST
// We should not use static default, because we need to reset for different testcase.
string DEFAULT = "";
#else
static string DEFAULT = "";
#endif
// Follow the HTTPS server if config HTTP API as the same of HTTP server.
if (DEFAULT.empty()) {
if (get_http_api_listen() == get_http_stream_listen()) {
DEFAULT = get_https_stream_listen();
} else {
DEFAULT = "1990";
}
}
SrsConfDirective* conf = get_https_api();
if (!conf) {

View file

@ -322,11 +322,6 @@ protected:
// @remark, use protected for the utest to override with mock.
virtual srs_error_t reload_conf(SrsConfig* conf);
private:
// Reload the http_api section of config.
virtual srs_error_t reload_http_api(SrsConfDirective* old_root);
// Reload the http_stream section of config.
// TODO: FIXME: rename to http_server.
virtual srs_error_t reload_http_stream(SrsConfDirective* old_root);
// Reload the rtc_server section of config.
virtual srs_error_t reload_rtc_server(SrsConfDirective* old_root);
// Reload the transcode section of vhost of config.

View file

@ -1062,149 +1062,3 @@ srs_error_t SrsGoApiTcmalloc::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
}
#endif
SrsHttpApi::SrsHttpApi(bool https, ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port)
{
// Create a identify for this client.
_srs_context->set_id(_srs_context->generate_id());
manager = cm;
skt = new SrsTcpConnection(fd);
if (https) {
ssl = new SrsSslConnection(skt);
conn = new SrsHttpConn(this, ssl, m, cip, port);
} else {
ssl = NULL;
conn = new SrsHttpConn(this, skt, m, cip, port);
}
_srs_config->subscribe(this);
}
SrsHttpApi::~SrsHttpApi()
{
_srs_config->unsubscribe(this);
srs_freep(conn);
srs_freep(ssl);
srs_freep(skt);
}
srs_error_t SrsHttpApi::on_start()
{
srs_error_t err = srs_success;
if ((err = conn->set_jsonp(true)) != srs_success) {
return srs_error_wrap(err, "set jsonp");
}
if (ssl) {
srs_utime_t starttime = srs_update_system_time();
string crt_file = _srs_config->get_https_api_ssl_cert();
string key_file = _srs_config->get_https_api_ssl_key();
if ((err = ssl->handshake(key_file, crt_file)) != srs_success) {
return srs_error_wrap(err, "handshake");
}
int cost = srsu2msi(srs_update_system_time() - starttime);
srs_trace("https: api server done, use key %s and cert %s, cost=%dms",
key_file.c_str(), crt_file.c_str(), cost);
}
return err;
}
srs_error_t SrsHttpApi::on_http_message(ISrsHttpMessage* r, SrsHttpResponseWriter* w)
{
srs_error_t err = srs_success;
// After parsed the message, set the schema to https.
if (ssl) {
SrsHttpMessage* hm = dynamic_cast<SrsHttpMessage*>(r);
hm->set_https(true);
}
// TODO: For each API session, we use short-term HTTP connection.
//SrsHttpHeader* hdr = w->header();
//hdr->set("Connection", "Close");
return err;
}
srs_error_t SrsHttpApi::on_message_done(ISrsHttpMessage* r, SrsHttpResponseWriter* w)
{
srs_error_t err = srs_success;
// read all rest bytes in request body.
char buf[SRS_HTTP_READ_CACHE_BYTES];
ISrsHttpResponseReader* br = r->body_reader();
while (!br->eof()) {
if ((err = br->read(buf, SRS_HTTP_READ_CACHE_BYTES, NULL)) != srs_success) {
return srs_error_wrap(err, "read response");
}
}
return err;
}
srs_error_t SrsHttpApi::on_conn_done(srs_error_t r0)
{
// Because we use manager to manage this object,
// not the http connection object, so we must remove it here.
manager->remove(this);
// For HTTP-API timeout, we think it's done successfully,
// because there may be no request or response for HTTP-API.
if (srs_error_code(r0) == ERROR_SOCKET_TIMEOUT) {
srs_freep(r0);
return srs_success;
}
return r0;
}
std::string SrsHttpApi::desc()
{
if (ssl) {
return "HttpsConn";
}
return "HttpConn";
}
void SrsHttpApi::remark(int64_t* in, int64_t* out)
{
conn->remark(in, out);
}
srs_error_t SrsHttpApi::on_reload_http_api_crossdomain()
{
bool v = _srs_config->get_http_api_crossdomain();
return conn->set_crossdomain_enabled(v);
}
srs_error_t SrsHttpApi::start()
{
srs_error_t err = srs_success;
bool v = _srs_config->get_http_api_crossdomain();
if ((err = conn->set_crossdomain_enabled(v)) != srs_success) {
return srs_error_wrap(err, "set cors=%d", v);
}
if ((err = skt->initialize()) != srs_success) {
return srs_error_wrap(err, "init socket");
}
return conn->start();
}
string SrsHttpApi::remote_ip()
{
return conn->remote_ip();
}
const SrsContextId& SrsHttpApi::get_id()
{
return conn->get_id();
}

View file

@ -216,50 +216,5 @@ public:
};
#endif
// Handle the HTTP API request.
class SrsHttpApi : public ISrsStartableConneciton, public ISrsHttpConnOwner
, public ISrsReloadHandler
{
private:
// The manager object to manage the connection.
ISrsResourceManager* manager;
SrsTcpConnection* skt;
SrsSslConnection* ssl;
SrsHttpConn* conn;
public:
SrsHttpApi(bool https, ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsHttpApi();
// Interface ISrsHttpConnOwner.
public:
virtual srs_error_t on_start();
virtual srs_error_t on_http_message(ISrsHttpMessage* r, SrsHttpResponseWriter* w);
virtual srs_error_t on_message_done(ISrsHttpMessage* r, SrsHttpResponseWriter* w);
virtual srs_error_t on_conn_done(srs_error_t r0);
// Interface ISrsResource.
public:
virtual std::string desc();
// Interface ISrsKbpsDelta
public:
virtual void remark(int64_t* in, int64_t* out);
// Interface ISrsReloadHandler
public:
virtual srs_error_t on_reload_http_api_crossdomain();
// Extract APIs from SrsTcpConnection.
// Interface ISrsStartable
public:
// Start the client green thread.
// when server get a client from listener,
// 1. server will create an concrete connection(for instance, RTMP connection),
// 2. then add connection to its connection manager,
// 3. start the client thread by invoke this start()
// when client cycle thread stop, invoke the on_thread_stop(), which will use server
// To remove the client by server->remove(this).
virtual srs_error_t start();
// Interface ISrsConnection.
public:
virtual std::string remote_ip();
virtual const SrsContextId& get_id();
};
#endif

View file

@ -287,7 +287,7 @@ void SrsHttpConn::expire()
trd->interrupt();
}
SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port)
SrsHttpxConn::SrsHttpxConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port)
{
// Create a identify for this client.
_srs_context->set_id(_srs_context->generate_id());
@ -306,7 +306,7 @@ SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(bool https, ISrsResourceManager
_srs_config->subscribe(this);
}
SrsResponseOnlyHttpConn::~SrsResponseOnlyHttpConn()
SrsHttpxConn::~SrsHttpxConn()
{
_srs_config->unsubscribe(this);
@ -315,7 +315,7 @@ SrsResponseOnlyHttpConn::~SrsResponseOnlyHttpConn()
srs_freep(skt);
}
srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq)
srs_error_t SrsHttpxConn::pop_message(ISrsHttpMessage** preq)
{
srs_error_t err = srs_success;
@ -330,13 +330,13 @@ srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq)
// We start a socket to read the stfd, which is writing by conn.
// It's ok, because conn never read it after processing the HTTP request.
// drop all request body.
char body[4096];
static char body[SRS_HTTP_READ_CACHE_BYTES];
while (true) {
if ((err = conn->pull()) != srs_success) {
return srs_error_wrap(err, "timeout");
}
if ((err = io->read(body, 4096, NULL)) != srs_success) {
if ((err = io->read(body, SRS_HTTP_READ_CACHE_BYTES, NULL)) != srs_success) {
// Because we use timeout to check trd state, so we should ignore any timeout.
if (srs_error_code(err) == ERROR_SOCKET_TIMEOUT) {
srs_freep(err);
@ -350,16 +350,16 @@ srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq)
return err;
}
srs_error_t SrsResponseOnlyHttpConn::on_reload_http_stream_crossdomain()
{
bool v = _srs_config->get_http_stream_crossdomain();
return conn->set_crossdomain_enabled(v);
}
srs_error_t SrsResponseOnlyHttpConn::on_start()
srs_error_t SrsHttpxConn::on_start()
{
srs_error_t err = srs_success;
// Enable JSONP for HTTP API.
if ((err = conn->set_jsonp(true)) != srs_success) {
return srs_error_wrap(err, "set jsonp");
}
// Do SSL handshake if HTTPS.
if (ssl) {
srs_utime_t starttime = srs_update_system_time();
string crt_file = _srs_config->get_https_stream_ssl_cert();
@ -376,7 +376,7 @@ srs_error_t SrsResponseOnlyHttpConn::on_start()
return err;
}
srs_error_t SrsResponseOnlyHttpConn::on_http_message(ISrsHttpMessage* r, SrsHttpResponseWriter* w)
srs_error_t SrsHttpxConn::on_http_message(ISrsHttpMessage* r, SrsHttpResponseWriter* w)
{
srs_error_t err = srs_success;
@ -385,32 +385,25 @@ srs_error_t SrsResponseOnlyHttpConn::on_http_message(ISrsHttpMessage* r, SrsHttp
SrsHttpMessage* hm = dynamic_cast<SrsHttpMessage*>(r);
hm->set_https(true);
}
ISrsHttpResponseReader* br = r->body_reader();
// when not specified the content length, ignore.
if (r->content_length() == -1) {
return err;
}
// For each session, we use short-term HTTP connection.
SrsHttpHeader* hdr = w->header();
hdr->set("Connection", "Close");
// Drop all request body.
// TODO: Should we set timeout for max reading?
char body[4096];
while (!br->eof()) {
if ((err = br->read(body, 4096, NULL)) != srs_success) {
return srs_error_wrap(err, "read response");
}
// Not support HTTP request with body.
if (r->content_length() > 0) {
return srs_error_new(ERROR_HTTP_WITH_BODY, "with %d body", r->content_length());
}
return err;
}
srs_error_t SrsResponseOnlyHttpConn::on_message_done(ISrsHttpMessage* r, SrsHttpResponseWriter* w)
srs_error_t SrsHttpxConn::on_message_done(ISrsHttpMessage* r, SrsHttpResponseWriter* w)
{
return srs_success;
}
srs_error_t SrsResponseOnlyHttpConn::on_conn_done(srs_error_t r0)
srs_error_t SrsHttpxConn::on_conn_done(srs_error_t r0)
{
// Update statistic when done.
SrsStatistic* stat = SrsStatistic::instance();
@ -421,38 +414,45 @@ srs_error_t SrsResponseOnlyHttpConn::on_conn_done(srs_error_t r0)
// not the http connection object, so we must remove it here.
manager->remove(this);
// For HTTP-API timeout, we think it's done successfully,
// because there may be no request or response for HTTP-API.
if (srs_error_code(r0) == ERROR_SOCKET_TIMEOUT) {
srs_freep(r0);
return srs_success;
}
return r0;
}
srs_error_t SrsResponseOnlyHttpConn::set_tcp_nodelay(bool v)
srs_error_t SrsHttpxConn::set_tcp_nodelay(bool v)
{
return skt->set_tcp_nodelay(v);
}
srs_error_t SrsResponseOnlyHttpConn::set_socket_buffer(srs_utime_t buffer_v)
srs_error_t SrsHttpxConn::set_socket_buffer(srs_utime_t buffer_v)
{
return skt->set_socket_buffer(buffer_v);
}
std::string SrsResponseOnlyHttpConn::desc()
std::string SrsHttpxConn::desc()
{
if (ssl) {
return "HttpsStream";
return "HttpsConn";
}
return "HttpStream";
return "HttpConn";
}
std::string SrsResponseOnlyHttpConn::remote_ip()
std::string SrsHttpxConn::remote_ip()
{
return conn->remote_ip();
}
const SrsContextId& SrsResponseOnlyHttpConn::get_id()
const SrsContextId& SrsHttpxConn::get_id()
{
return conn->get_id();
}
srs_error_t SrsResponseOnlyHttpConn::start()
srs_error_t SrsHttpxConn::start()
{
srs_error_t err = srs_success;
@ -468,7 +468,7 @@ srs_error_t SrsResponseOnlyHttpConn::start()
return conn->start();
}
void SrsResponseOnlyHttpConn::remark(int64_t* in, int64_t* out)
void SrsHttpxConn::remark(int64_t* in, int64_t* out)
{
conn->remark(in, out);
}
@ -506,11 +506,28 @@ srs_error_t SrsHttpServer::initialize()
return err;
}
srs_error_t SrsHttpServer::handle(std::string pattern, ISrsHttpHandler* handler)
{
return http_static->mux.handle(pattern, handler);
}
srs_error_t SrsHttpServer::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;
string path = r->path();
const char* p = path.data();
// For /api/ or /console/, try static only.
if (path.length() > 4 && p[0] == '/') {
bool is_api = memcmp(p, "/api/", 5) == 0;
bool is_console = path.length() > 8 && memcmp(p, "/console/", 9) == 0;
if (is_api || is_console) {
return http_static->mux.serve_http(w, r);
}
}
// try http stream first.
// Try http stream first, then http static if not found.
ISrsHttpHandler* h = NULL;
if ((err = http_stream->mux.find_handler(r, &h)) != srs_success) {
return srs_error_wrap(err, "find handler");
@ -518,7 +535,8 @@ srs_error_t SrsHttpServer::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
if (!h->is_not_found()) {
return http_stream->mux.serve_http(w, r);
}
// Use http static as default server.
return http_static->mux.serve_http(w, r);
}

View file

@ -127,7 +127,7 @@ public:
};
// Drop body of request, only process the response.
class SrsResponseOnlyHttpConn : public ISrsStartableConneciton, public ISrsHttpConnOwner
class SrsHttpxConn : public ISrsStartableConneciton, public ISrsHttpConnOwner
, public ISrsReloadHandler
{
private:
@ -137,8 +137,8 @@ private:
SrsSslConnection* ssl;
SrsHttpConn* conn;
public:
SrsResponseOnlyHttpConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsResponseOnlyHttpConn();
SrsHttpxConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsHttpxConn();
public:
// Directly read a HTTP request message.
// It's exported for HTTP stream, such as HTTP FLV, only need to write to client when
@ -146,9 +146,6 @@ public:
// @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427
// @remark Should only used in HTTP-FLV streaming connection.
virtual srs_error_t pop_message(ISrsHttpMessage** preq);
// Interface ISrsReloadHandler
public:
virtual srs_error_t on_reload_http_stream_crossdomain();
// Interface ISrsHttpConnOwner.
public:
virtual srs_error_t on_start();
@ -189,6 +186,9 @@ public:
public:
virtual srs_error_t initialize();
// Interface ISrsHttpServeMux
public:
virtual srs_error_t handle(std::string pattern, ISrsHttpHandler* handler);
// Interface ISrsHttpHandler
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
public:

View file

@ -506,9 +506,3 @@ srs_error_t SrsHttpStaticServer::on_reload_vhost_added(string vhost)
return err;
}
srs_error_t SrsHttpStaticServer::on_reload_vhost_http_updated()
{
// TODO: FIXME: implements it.
return srs_success;
}

View file

@ -76,7 +76,6 @@ private:
// Interface ISrsReloadHandler.
public:
virtual srs_error_t on_reload_vhost_added(std::string vhost);
virtual srs_error_t on_reload_vhost_http_updated();
};
#endif

View file

@ -613,7 +613,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
SrsFlvStreamEncoder* ffe = dynamic_cast<SrsFlvStreamEncoder*>(enc);
// Note that the handler of hc now is rohc.
SrsResponseOnlyHttpConn* rohc = dynamic_cast<SrsResponseOnlyHttpConn*>(hc->handler());
SrsHttpxConn* rohc = dynamic_cast<SrsHttpxConn*>(hc->handler());
srs_assert(rohc);
// Set the socket options for transport.
@ -964,59 +964,6 @@ void SrsHttpStreamServer::http_unmount(SrsLiveSource* s, SrsRequest* r)
entry->stream->entry->enabled = false;
}
srs_error_t SrsHttpStreamServer::on_reload_vhost_added(string vhost)
{
srs_error_t err = srs_success;
if ((err = on_reload_vhost_http_remux_updated(vhost)) != srs_success) {
return srs_error_wrap(err, "reload vhost added");
}
return err;
}
srs_error_t SrsHttpStreamServer::on_reload_vhost_http_remux_updated(string vhost)
{
srs_error_t err = srs_success;
// Create new vhost.
if (tflvs.find(vhost) == tflvs.end()) {
if ((err = initialize_flv_entry(vhost)) != srs_success) {
return srs_error_wrap(err, "init flv entry");
}
// http mount need SrsRequest and SrsLiveSource param, only create a mapping template entry
// and do mount automatically on playing http flv if this stream is a new http_remux stream.
return err;
}
// Update all streams for exists vhost.
// TODO: FIMXE: If url changed, needs more things to deal with.
std::map<std::string, SrsLiveEntry*>::iterator it;
for (it = sflvs.begin(); it != sflvs.end(); ++it) {
SrsLiveEntry* entry = it->second;
if (!entry || !entry->req || !entry->source) {
continue;
}
SrsRequest* req = entry->req;
if (!req || req->vhost != vhost) {
continue;
}
SrsLiveSource* source = entry->source;
if (_srs_config->get_vhost_http_remux_enabled(vhost)) {
http_mount(source, req);
} else {
http_unmount(source, req);
}
}
srs_trace("vhost %s http_remux reload success", vhost.c_str());
return err;
}
srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
{
srs_error_t err = srs_success;

View file

@ -234,10 +234,6 @@ public:
// HTTP flv/ts/mp3/aac stream
virtual srs_error_t http_mount(SrsLiveSource* s, SrsRequest* r);
virtual void http_unmount(SrsLiveSource* s, SrsRequest* r);
// Interface ISrsReloadHandler.
public:
virtual srs_error_t on_reload_vhost_added(std::string vhost);
virtual srs_error_t on_reload_vhost_http_remux_updated(std::string vhost);
// Interface ISrsHttpMatchHijacker
public:
virtual srs_error_t hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph);

View file

@ -545,7 +545,7 @@ void SrsPublishRecvThread::set_socket_buffer(srs_utime_t sleep_v)
rtmp->set_recv_buffer(nb_rbuf);
}
SrsHttpRecvThread::SrsHttpRecvThread(SrsResponseOnlyHttpConn* c)
SrsHttpRecvThread::SrsHttpRecvThread(SrsHttpxConn* c)
{
conn = c;
trd = new SrsSTCoroutine("http-receive", this, _srs_context->get_id());

View file

@ -24,7 +24,7 @@ class SrsLiveSource;
class SrsRequest;
class SrsLiveConsumer;
class SrsHttpConn;
class SrsResponseOnlyHttpConn;
class SrsHttpxConn;
// The message consumer which consume a message.
class ISrsMessageConsumer
@ -196,10 +196,10 @@ private:
class SrsHttpRecvThread : public ISrsCoroutineHandler
{
private:
SrsResponseOnlyHttpConn* conn;
SrsHttpxConn* conn;
SrsCoroutine* trd;
public:
SrsHttpRecvThread(SrsResponseOnlyHttpConn* c);
SrsHttpRecvThread(SrsHttpxConn* c);
virtual ~SrsHttpRecvThread();
public:
virtual srs_error_t start();

View file

@ -33,71 +33,16 @@ srs_error_t ISrsReloadHandler::on_reload_pithy_print()
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_http_api_enabled()
{
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_http_api_disabled()
{
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_https_api_enabled()
{
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_https_api_disabled()
{
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_http_api_crossdomain()
{
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_http_api_raw_api()
{
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_http_stream_enabled()
{
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_http_stream_disabled()
{
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_http_stream_updated()
{
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_http_stream_crossdomain()
{
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_rtc_server()
{
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_vhost_http_updated()
{
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_vhost_http_remux_updated(string vhost)
{
return srs_success;
}
srs_error_t ISrsReloadHandler::on_reload_vhost_added(string /*vhost*/)
{
return srs_success;

View file

@ -25,21 +25,9 @@ public:
virtual srs_error_t on_reload_max_conns();
virtual srs_error_t on_reload_listen();
virtual srs_error_t on_reload_pithy_print();
virtual srs_error_t on_reload_http_api_enabled();
virtual srs_error_t on_reload_http_api_disabled();
virtual srs_error_t on_reload_https_api_enabled();
virtual srs_error_t on_reload_https_api_disabled();
virtual srs_error_t on_reload_http_api_crossdomain();
virtual srs_error_t on_reload_http_api_raw_api();
virtual srs_error_t on_reload_http_stream_enabled();
virtual srs_error_t on_reload_http_stream_disabled();
virtual srs_error_t on_reload_http_stream_updated();
virtual srs_error_t on_reload_http_stream_crossdomain();
virtual srs_error_t on_reload_rtc_server();
public:
// TODO: FIXME: should rename to http_static
virtual srs_error_t on_reload_vhost_http_updated();
virtual srs_error_t on_reload_vhost_http_remux_updated(std::string vhost);
virtual srs_error_t on_reload_vhost_added(std::string vhost);
virtual srs_error_t on_reload_vhost_removed(std::string vhost);
virtual srs_error_t on_reload_vhost_play(std::string vhost);

View file

@ -485,7 +485,7 @@ srs_error_t SrsRtcServer::listen_api()
srs_error_t err = srs_success;
// TODO: FIXME: Fetch api from hybrid manager, not from SRS.
SrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server();
ISrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server();
if ((err = http_api_mux->handle("/rtc/v1/play/", new SrsGoApiRtcPlay(this))) != srs_success) {
return srs_error_wrap(err, "handle play");

View file

@ -529,6 +529,8 @@ SrsServer::SrsServer()
// new these objects in initialize instead.
http_api_mux = new SrsHttpServeMux();
http_server = new SrsHttpServer(this);
reuse_api_over_server_ = false;
http_heartbeat = new SrsHttpHeartbeat();
ingester = new SrsIngester();
trd_ = new SrsSTCoroutine("srs", this, _srs_context->get_id());
@ -549,9 +551,13 @@ void SrsServer::destroy()
srs_freep(timer_);
dispose();
srs_freep(http_api_mux);
// If api reuse the same port of server, they're the same object.
if (!reuse_api_over_server_) {
srs_freep(http_api_mux);
}
srs_freep(http_server);
srs_freep(http_heartbeat);
srs_freep(ingester);
@ -643,11 +649,29 @@ srs_error_t SrsServer::initialize(ISrsServerCycle* ch)
if(handler && (err = handler->initialize()) != srs_success){
return srs_error_wrap(err, "handler initialize");
}
if ((err = http_api_mux->initialize()) != srs_success) {
return srs_error_wrap(err, "http api initialize");
// If enabled and the listen is the same value, reuse port.
if (_srs_config->get_http_stream_enabled() && _srs_config->get_http_api_enabled()
&& _srs_config->get_http_api_listen() == _srs_config->get_http_stream_listen()
&& _srs_config->get_https_api_listen() == _srs_config->get_https_stream_listen()
) {
srs_trace("API reuse listen to https server at %s", _srs_config->get_https_stream_listen().c_str());
reuse_api_over_server_ = true;
}
// If reuse port, use the same object as server.
if (!reuse_api_over_server_) {
SrsHttpServeMux *api = dynamic_cast<SrsHttpServeMux*>(http_api_mux);
srs_assert(api);
if ((err = api->initialize()) != srs_success) {
return srs_error_wrap(err, "http api initialize");
}
} else {
srs_freep(http_api_mux);
http_api_mux = http_server;
}
if ((err = http_server->initialize()) != srs_success) {
return srs_error_wrap(err, "http server initialize");
}
@ -736,19 +760,23 @@ srs_error_t SrsServer::register_signal()
srs_error_t SrsServer::http_handle()
{
srs_error_t err = srs_success;
if ((err = http_api_mux->handle("/", new SrsGoApiRoot())) != srs_success) {
return srs_error_wrap(err, "handle /");
// Ignore / and /api/v1/versions for already handled by HTTP server.
if (!reuse_api_over_server_) {
if ((err = http_api_mux->handle("/", new SrsGoApiRoot())) != srs_success) {
return srs_error_wrap(err, "handle /");
}
if ((err = http_api_mux->handle("/api/v1/versions", new SrsGoApiVersion())) != srs_success) {
return srs_error_wrap(err, "handle versions");
}
}
if ((err = http_api_mux->handle("/api/", new SrsGoApiApi())) != srs_success) {
return srs_error_wrap(err, "handle api");
}
if ((err = http_api_mux->handle("/api/v1/", new SrsGoApiV1())) != srs_success) {
return srs_error_wrap(err, "handle v1");
}
if ((err = http_api_mux->handle("/api/v1/versions", new SrsGoApiVersion())) != srs_success) {
return srs_error_wrap(err, "handle versions");
}
if ((err = http_api_mux->handle("/api/v1/summaries", new SrsGoApiSummaries())) != srs_success) {
return srs_error_wrap(err, "handle summaries");
}
@ -1138,23 +1166,34 @@ srs_error_t SrsServer::listen_rtmp()
srs_error_t SrsServer::listen_http_api()
{
srs_error_t err = srs_success;
close_listeners(SrsListenerHttpApi);
if (_srs_config->get_http_api_enabled()) {
SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpApi);
listeners.push_back(listener);
std::string ep = _srs_config->get_http_api_listen();
std::string ip;
int port;
srs_parse_endpoint(ep, ip, port);
if ((err = listener->listen(ip, port)) != srs_success) {
return srs_error_wrap(err, "http api listen %s:%d", ip.c_str(), port);
}
// Ignore if not enabled.
if (!_srs_config->get_http_api_enabled()) {
return err;
}
// Ignore if reuse same port to http server.
if (reuse_api_over_server_) {
srs_trace("HTTP-API: Reuse listen to http server %s", _srs_config->get_http_stream_listen().c_str());
return err;
}
// Listen at a dedicated HTTP API endpoint.
SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpApi);
listeners.push_back(listener);
std::string ep = _srs_config->get_http_api_listen();
std::string ip;
int port;
srs_parse_endpoint(ep, ip, port);
if ((err = listener->listen(ip, port)) != srs_success) {
return srs_error_wrap(err, "http api listen %s:%d", ip.c_str(), port);
}
return err;
}
@ -1163,19 +1202,30 @@ srs_error_t SrsServer::listen_https_api()
srs_error_t err = srs_success;
close_listeners(SrsListenerHttpsApi);
if (_srs_config->get_https_api_enabled()) {
SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpsApi);
listeners.push_back(listener);
std::string ep = _srs_config->get_https_api_listen();
// Ignore if not enabled.
if (!_srs_config->get_https_api_enabled()) {
return err;
}
std::string ip;
int port;
srs_parse_endpoint(ep, ip, port);
// Ignore if reuse same port to https server.
if (reuse_api_over_server_) {
srs_trace("HTTPS-API: Reuse listen to https server %s", _srs_config->get_https_stream_listen().c_str());
return err;
}
if ((err = listener->listen(ip, port)) != srs_success) {
return srs_error_wrap(err, "https api listen %s:%d", ip.c_str(), port);
}
// Listen at a dedicated HTTPS API endpoint.
SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpsApi);
listeners.push_back(listener);
std::string ep = _srs_config->get_https_api_listen();
std::string ip;
int port;
srs_parse_endpoint(ep, ip, port);
if ((err = listener->listen(ip, port)) != srs_success) {
return srs_error_wrap(err, "https api listen %s:%d", ip.c_str(), port);
}
return err;
@ -1329,7 +1379,7 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
return err;
}
SrsHttpServeMux* SrsServer::api_server()
ISrsHttpServeMux* SrsServer::api_server()
{
return http_api_mux;
}
@ -1379,13 +1429,13 @@ srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, IS
if (type == SrsListenerRtmpStream) {
*pr = new SrsRtmpConn(this, stfd, ip, port);
} else if (type == SrsListenerHttpApi) {
*pr = new SrsHttpApi(false, this, stfd, http_api_mux, ip, port);
*pr = new SrsHttpxConn(false, this, stfd, http_api_mux, ip, port);
} else if (type == SrsListenerHttpsApi) {
*pr = new SrsHttpApi(true, this, stfd, http_api_mux, ip, port);
*pr = new SrsHttpxConn(true, this, stfd, http_api_mux, ip, port);
} else if (type == SrsListenerHttpStream) {
*pr = new SrsResponseOnlyHttpConn(false, this, stfd, http_server, ip, port);
*pr = new SrsHttpxConn(false, this, stfd, http_server, ip, port);
} else if (type == SrsListenerHttpsStream) {
*pr = new SrsResponseOnlyHttpConn(true, this, stfd, http_server, ip, port);
*pr = new SrsHttpxConn(true, this, stfd, http_server, ip, port);
} else {
srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port);
srs_close_stfd(stfd);
@ -1412,99 +1462,10 @@ srs_error_t SrsServer::on_reload_listen()
return err;
}
srs_error_t SrsServer::on_reload_vhost_added(std::string vhost)
{
srs_error_t err = srs_success;
if (!_srs_config->get_vhost_http_enabled(vhost)) {
return err;
}
// TODO: FIXME: should handle the event in SrsHttpStaticServer
if ((err = on_reload_vhost_http_updated()) != srs_success) {
return srs_error_wrap(err, "reload vhost added");
}
return err;
}
srs_error_t SrsServer::on_reload_vhost_removed(std::string /*vhost*/)
{
srs_error_t err = srs_success;
// TODO: FIXME: should handle the event in SrsHttpStaticServer
if ((err = on_reload_vhost_http_updated()) != srs_success) {
return srs_error_wrap(err, "reload vhost removed");
}
return err;
}
srs_error_t SrsServer::on_reload_http_api_enabled()
{
srs_error_t err = srs_success;
if ((err = listen_http_api()) != srs_success) {
return srs_error_wrap(err, "reload http_api");
}
if ((err = listen_https_api()) != srs_success) {
return srs_error_wrap(err, "reload https_api");
}
return err;
}
srs_error_t SrsServer::on_reload_http_api_disabled()
{
close_listeners(SrsListenerHttpApi);
close_listeners(SrsListenerHttpsApi);
return srs_success;
}
srs_error_t SrsServer::on_reload_http_stream_enabled()
{
srs_error_t err = srs_success;
if ((err = listen_http_stream()) != srs_success) {
return srs_error_wrap(err, "reload http_stream enabled");
}
if ((err = listen_https_stream()) != srs_success) {
return srs_error_wrap(err, "reload https_stream enabled");
}
return err;
}
srs_error_t SrsServer::on_reload_http_stream_disabled()
{
close_listeners(SrsListenerHttpStream);
close_listeners(SrsListenerHttpsStream);
return srs_success;
}
// TODO: FIXME: rename to http_remux
srs_error_t SrsServer::on_reload_http_stream_updated()
{
srs_error_t err = srs_success;
if ((err = on_reload_http_stream_enabled()) != srs_success) {
return srs_error_wrap(err, "reload http_stream updated");
}
// TODO: FIXME: should handle the event in SrsHttpStaticServer
if ((err = on_reload_vhost_http_updated()) != srs_success) {
return srs_error_wrap(err, "reload http_stream updated");
}
return err;
}
srs_error_t SrsServer::on_publish(SrsLiveSource* s, SrsRequest* r)
{
srs_error_t err = srs_success;
if ((err = http_server->http_mount(s, r)) != srs_success) {
return srs_error_wrap(err, "http mount");
}

View file

@ -23,7 +23,7 @@
#include <srs_app_hybrid.hpp>
class SrsServer;
class SrsHttpServeMux;
class ISrsHttpServeMux;
class SrsHttpServer;
class SrsIngester;
class SrsHttpHeartbeat;
@ -199,8 +199,11 @@ class SrsServer : public ISrsReloadHandler, public ISrsLiveSourceHandler
{
private:
// TODO: FIXME: Extract an HttpApiServer.
SrsHttpServeMux* http_api_mux;
ISrsHttpServeMux* http_api_mux;
SrsHttpServer* http_server;
// If reuse, HTTP API use the same port of HTTP server.
bool reuse_api_over_server_;
private:
SrsHttpHeartbeat* http_heartbeat;
SrsIngester* ingester;
SrsResourceManager* conn_manager;
@ -306,7 +309,7 @@ public:
// @param stfd, the client fd in st boxed, the underlayer fd.
virtual srs_error_t accept_client(SrsListenerType type, srs_netfd_t stfd);
// TODO: FIXME: Fetch from hybrid server manager.
virtual SrsHttpServeMux* api_server();
virtual ISrsHttpServeMux* api_server();
private:
virtual srs_error_t fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsStartableConneciton** pr);
// Interface ISrsResourceManager
@ -318,13 +321,6 @@ public:
// Interface ISrsReloadHandler.
public:
virtual srs_error_t on_reload_listen();
virtual srs_error_t on_reload_vhost_added(std::string vhost);
virtual srs_error_t on_reload_vhost_removed(std::string vhost);
virtual srs_error_t on_reload_http_api_enabled();
virtual srs_error_t on_reload_http_api_disabled();
virtual srs_error_t on_reload_http_stream_enabled();
virtual srs_error_t on_reload_http_stream_disabled();
virtual srs_error_t on_reload_http_stream_updated();
// Interface ISrsLiveSourceHandler
public:
virtual srs_error_t on_publish(SrsLiveSource* s, SrsRequest* r);

View file

@ -316,7 +316,7 @@ srs_error_t SrsSrtServerAdapter::initialize()
{
srs_error_t err = srs_success;
if ((err = srs_srt_log_initialie()) != srs_success) {
if ((err = srs_srt_log_initialize()) != srs_success) {
return srs_error_wrap(err, "srt log initialize");
}