1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

For #1657: Refine connection arch, remove hierachy

This commit is contained in:
winlin 2020-11-05 15:08:36 +08:00
parent 2a14dc0ebd
commit 24125b9770
7 changed files with 213 additions and 52 deletions

View file

@ -85,7 +85,7 @@ srs_error_t SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd)
srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd)); srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd));
} }
SrsHttpConn* conn = new SrsDynamicHttpConn(this, stfd, http_mux, ip, port); ISrsStartableConneciton* conn = new SrsDynamicHttpConn(this, stfd, http_mux, ip, port);
conns.push_back(conn); conns.push_back(conn);
if ((err = conn->start()) != srs_success) { if ((err = conn->start()) != srs_success) {
@ -97,14 +97,14 @@ srs_error_t SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd)
void SrsAppCasterFlv::remove(ISrsResource* c) void SrsAppCasterFlv::remove(ISrsResource* c)
{ {
SrsHttpConn* conn = dynamic_cast<SrsHttpConn*>(c); ISrsStartableConneciton* conn = dynamic_cast<ISrsStartableConneciton*>(c);
std::vector<SrsHttpConn*>::iterator it; std::vector<ISrsStartableConneciton*>::iterator it;
if ((it = std::find(conns.begin(), conns.end(), conn)) != conns.end()) { if ((it = std::find(conns.begin(), conns.end(), conn)) != conns.end()) {
conns.erase(it); conns.erase(it);
} }
// fixbug: SrsHttpConn for CasterFlv is not freed, which could cause memory leak // fixbug: ISrsStartableConneciton for CasterFlv is not freed, which could cause memory leak
// so, free conn which is not managed by SrsServer->conns; // so, free conn which is not managed by SrsServer->conns;
// @see: https://github.com/ossrs/srs/issues/826 // @see: https://github.com/ossrs/srs/issues/826
manager->remove(c); manager->remove(c);
@ -141,23 +141,23 @@ srs_error_t SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa
return err; return err;
} }
SrsDynamicHttpConn::SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port) SrsDynamicHttpConn::SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int cport)
{ {
manager = cm;
sdk = NULL; sdk = NULL;
pprint = SrsPithyPrint::create_caster(); pprint = SrsPithyPrint::create_caster();
conn = new SrsHttpConn(this, fd, m, cip, cport);
ip = cip;
port = cport;
} }
SrsDynamicHttpConn::~SrsDynamicHttpConn() SrsDynamicHttpConn::~SrsDynamicHttpConn()
{ {
srs_freep(conn);
srs_freep(sdk); srs_freep(sdk);
srs_freep(pprint); srs_freep(pprint);
} }
srs_error_t SrsDynamicHttpConn::on_got_http_message(ISrsHttpMessage* msg)
{
return srs_success;
}
srs_error_t SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string o) srs_error_t SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string o)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -249,6 +249,43 @@ srs_error_t SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecod
return err; return err;
} }
srs_error_t SrsDynamicHttpConn::on_http_message(ISrsHttpMessage* msg)
{
return srs_success;
}
void SrsDynamicHttpConn::on_conn_done()
{
// Because we use manager to manage this object,
// not the http connection object, so we must remove it here.
manager->remove(this);
}
std::string SrsDynamicHttpConn::desc()
{
return "DHttpConn";
}
std::string SrsDynamicHttpConn::remote_ip()
{
return conn->remote_ip();
}
const SrsContextId& SrsDynamicHttpConn::get_id()
{
return conn->get_id();
}
srs_error_t SrsDynamicHttpConn::start()
{
return conn->start();
}
void SrsDynamicHttpConn::remark(int64_t* in, int64_t* out)
{
conn->remark(in, out);
}
SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h) SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h)
{ {
http = h; http = h;

View file

@ -53,7 +53,7 @@ class SrsAppCasterFlv : virtual public ISrsTcpHandler
private: private:
std::string output; std::string output;
SrsHttpServeMux* http_mux; SrsHttpServeMux* http_mux;
std::vector<SrsHttpConn*> conns; std::vector<ISrsStartableConneciton*> conns;
SrsResourceManager* manager; SrsResourceManager* manager;
public: public:
SrsAppCasterFlv(SrsConfDirective* c); SrsAppCasterFlv(SrsConfDirective* c);
@ -72,21 +72,44 @@ public:
}; };
// The dynamic http connection, never drop the body. // The dynamic http connection, never drop the body.
class SrsDynamicHttpConn : public SrsHttpConn class SrsDynamicHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsHttpMessageHandler
{ {
private: private:
// The manager object to manage the connection.
ISrsResourceManager* manager;
std::string output; std::string output;
SrsPithyPrint* pprint; SrsPithyPrint* pprint;
SrsSimpleRtmpClient* sdk; SrsSimpleRtmpClient* sdk;
SrsHttpConn* conn;
private:
// The ip and port of client.
std::string ip;
int port;
public: public:
SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port); SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsDynamicHttpConn(); virtual ~SrsDynamicHttpConn();
public:
virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg);
public: public:
virtual srs_error_t proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string o); virtual srs_error_t proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string o);
private: private:
virtual srs_error_t do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec); virtual srs_error_t do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec);
// Extract APIs from SrsTcpConnection.
// Interface ISrsHttpMessageHandler.
public:
virtual srs_error_t on_http_message(ISrsHttpMessage* msg);
virtual void on_conn_done();
// Interface ISrsResource.
public:
virtual std::string desc();
// Interface ISrsConnection.
public:
virtual std::string remote_ip();
virtual const SrsContextId& get_id();
// Interface ISrsStartable
public:
virtual srs_error_t start();
// Interface ISrsKbpsDelta
public:
virtual void remark(int64_t* in, int64_t* out);
}; };
// The http wrapper for file reader, to read http post stream like a file. // The http wrapper for file reader, to read http post stream like a file.

View file

@ -1840,6 +1840,7 @@ srs_error_t SrsHttpApi::cycle()
srs_error_t err = do_cycle(); srs_error_t err = do_cycle();
// Notify manager to remove it. // Notify manager to remove it.
// Note that we create this object, so we use manager to remove it.
manager->remove(this); manager->remove(this);
// success. // success.

View file

@ -59,14 +59,22 @@ using namespace std;
#include <srs_app_utility.hpp> #include <srs_app_utility.hpp>
#include <srs_app_st.hpp> #include <srs_app_st.hpp>
SrsHttpConn::SrsHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int cport) ISrsHttpMessageHandler::ISrsHttpMessageHandler()
{
}
ISrsHttpMessageHandler::~ISrsHttpMessageHandler()
{
}
SrsHttpConn::SrsHttpConn(ISrsHttpMessageHandler* handler, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int cport)
{ {
parser = new SrsHttpParser(); parser = new SrsHttpParser();
cors = new SrsHttpCorsMux(); cors = new SrsHttpCorsMux();
http_mux = m; http_mux = m;
handler_ = handler;
skt = new SrsTcpConnection(fd); skt = new SrsTcpConnection(fd);
manager = cm;
ip = cip; ip = cip;
port = cport; port = cport;
create_time = srsu2ms(srs_get_system_time()); create_time = srsu2ms(srs_get_system_time());
@ -150,7 +158,7 @@ srs_error_t SrsHttpConn::do_cycle()
last_req = hreq->to_request(hreq->host()); last_req = hreq->to_request(hreq->host());
// may should discard the body. // may should discard the body.
if ((err = on_got_http_message(req)) != srs_success) { if ((err = handler_->on_http_message(req)) != srs_success) {
break; break;
} }
@ -176,6 +184,16 @@ srs_error_t SrsHttpConn::do_cycle()
return err; return err;
} }
ISrsHttpMessageHandler* SrsHttpConn::handler()
{
return handler_;
}
srs_error_t SrsHttpConn::pull()
{
return trd->pull();
}
srs_error_t SrsHttpConn::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) srs_error_t SrsHttpConn::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -239,8 +257,8 @@ srs_error_t SrsHttpConn::cycle()
{ {
srs_error_t err = do_cycle(); srs_error_t err = do_cycle();
// Notify manager to remove it. // Notify handler to handle it.
manager->remove(this); handler_->on_conn_done();
// success. // success.
if (err == srs_success) { if (err == srs_success) {
@ -284,29 +302,41 @@ void SrsHttpConn::expire()
trd->interrupt(); trd->interrupt();
} }
SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port) SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port)
{ {
manager = cm;
conn = new SrsHttpConn(this, fd, m, cip, port);
stfd = fd;
} }
SrsResponseOnlyHttpConn::~SrsResponseOnlyHttpConn() SrsResponseOnlyHttpConn::~SrsResponseOnlyHttpConn()
{ {
srs_freep(conn);
} }
srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq) srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
SrsStSocket skt;
// We start a socket to read the stfd, which is writing by conn.
// It's ok, because conn never read it after processing the HTTP request.
if ((err = skt.initialize(stfd)) != srs_success) {
return srs_error_wrap(err, "init socket");
}
// Check user interrupt by interval. // Check user interrupt by interval.
skt->set_recv_timeout(3 * SRS_UTIME_SECONDS); skt.set_recv_timeout(3 * SRS_UTIME_SECONDS);
// drop all request body. // drop all request body.
char body[4096]; char body[4096];
while (true) { while (true) {
if ((err = trd->pull()) != srs_success) { if ((err = conn->pull()) != srs_success) {
return srs_error_wrap(err, "timeout"); return srs_error_wrap(err, "timeout");
} }
if ((err = skt->read(body, 4096, NULL)) != srs_success) { if ((err = skt.read(body, 4096, NULL)) != srs_success) {
// Because we use timeout to check trd state, so we should ignore any timeout. // Because we use timeout to check trd state, so we should ignore any timeout.
if (srs_error_code(err) == ERROR_SOCKET_TIMEOUT) { if (srs_error_code(err) == ERROR_SOCKET_TIMEOUT) {
srs_freep(err); srs_freep(err);
@ -320,7 +350,7 @@ srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq)
return err; return err;
} }
srs_error_t SrsResponseOnlyHttpConn::on_got_http_message(ISrsHttpMessage* msg) srs_error_t SrsResponseOnlyHttpConn::on_http_message(ISrsHttpMessage* msg)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -343,9 +373,46 @@ srs_error_t SrsResponseOnlyHttpConn::on_got_http_message(ISrsHttpMessage* msg)
return err; return err;
} }
void SrsResponseOnlyHttpConn::expire() void SrsResponseOnlyHttpConn::on_conn_done()
{ {
SrsHttpConn::expire(); // Because we use manager to manage this object,
// not the http connection object, so we must remove it here.
manager->remove(this);
}
srs_error_t SrsResponseOnlyHttpConn::set_tcp_nodelay(bool v)
{
return conn->set_tcp_nodelay(v);
}
srs_error_t SrsResponseOnlyHttpConn::set_socket_buffer(srs_utime_t buffer_v)
{
return conn->set_socket_buffer(buffer_v);
}
std::string SrsResponseOnlyHttpConn::desc()
{
return "ROHttpConn";
}
std::string SrsResponseOnlyHttpConn::remote_ip()
{
return conn->remote_ip();
}
const SrsContextId& SrsResponseOnlyHttpConn::get_id()
{
return conn->get_id();
}
srs_error_t SrsResponseOnlyHttpConn::start()
{
return conn->start();
}
void SrsResponseOnlyHttpConn::remark(int64_t* in, int64_t* out)
{
conn->remark(in, out);
} }
SrsHttpServer::SrsHttpServer(SrsServer* svr) SrsHttpServer::SrsHttpServer(SrsServer* svr)

View file

@ -54,6 +54,21 @@ class SrsHttpMessage;
class SrsHttpStreamServer; class SrsHttpStreamServer;
class SrsHttpStaticServer; class SrsHttpStaticServer;
// The handler for HTTP message.
class ISrsHttpMessageHandler
{
public:
ISrsHttpMessageHandler();
virtual ~ISrsHttpMessageHandler();
public:
// Handle the HTTP message msg, which may be parsed partially.
// For the static service or api, discard any body.
// For the stream caster, for instance, http flv streaming, may discard the flv header or not.
virtual srs_error_t on_http_message(ISrsHttpMessage* msg) = 0;
// When connection is destroy, should use manager to dispose it.
virtual void on_conn_done() = 0;
};
// The http connection which request the static or stream content. // The http connection which request the static or stream content.
class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsReloadHandler class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsReloadHandler
, virtual public ISrsCoroutineHandler, virtual public ISrsExpire , virtual public ISrsCoroutineHandler, virtual public ISrsExpire
@ -62,6 +77,7 @@ protected:
SrsHttpParser* parser; SrsHttpParser* parser;
ISrsHttpServeMux* http_mux; ISrsHttpServeMux* http_mux;
SrsHttpCorsMux* cors; SrsHttpCorsMux* cors;
ISrsHttpMessageHandler* handler_;
protected: protected:
SrsTcpConnection* skt; SrsTcpConnection* skt;
// Each connection start a green thread, // Each connection start a green thread,
@ -71,8 +87,6 @@ protected:
std::string ip; std::string ip;
int port; int port;
private: private:
// The manager object to manage the connection.
ISrsResourceManager* manager;
// The connection total kbps. // The connection total kbps.
// not only the rtmp or http connection, all type of connection are // not only the rtmp or http connection, all type of connection are
// need to statistic the kbps of io. // need to statistic the kbps of io.
@ -83,7 +97,7 @@ private:
// for current connection to log self create time and calculate the living time. // for current connection to log self create time and calculate the living time.
int64_t create_time; int64_t create_time;
public: public:
SrsHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port); SrsHttpConn(ISrsHttpMessageHandler* handler, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsHttpConn(); virtual ~SrsHttpConn();
// Interface ISrsResource. // Interface ISrsResource.
public: public:
@ -91,13 +105,13 @@ public:
// Interface ISrsKbpsDelta // Interface ISrsKbpsDelta
public: public:
virtual void remark(int64_t* in, int64_t* out); virtual void remark(int64_t* in, int64_t* out);
protected: private:
virtual srs_error_t do_cycle(); virtual srs_error_t do_cycle();
protected: public:
// When got http message, // Get the HTTP message handler.
// for the static service or api, discard any body. virtual ISrsHttpMessageHandler* handler();
// for the stream caster, for instance, http flv streaming, may discard the flv header or not. // Whether the connection coroutine is error or terminated.
virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg) = 0; virtual srs_error_t pull();
private: private:
virtual srs_error_t process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); virtual srs_error_t process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
// When the connection disconnect, call this method. // When the connection disconnect, call this method.
@ -115,19 +129,9 @@ public:
virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v); virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v);
// Interface ISrsStartable // Interface ISrsStartable
public: public:
// Start the client green thread.
// when server get a client from listener,
// 1. server will create an concrete connection(for instance, RTMP connection),
// 2. then add connection to its connection manager,
// 3. start the client thread by invoke this start()
// when client cycle thread stop, invoke the on_thread_stop(), which will use server
// To remove the client by server->remove(this).
virtual srs_error_t start(); virtual srs_error_t start();
// Interface ISrsOneCycleThreadHandler // Interface ISrsOneCycleThreadHandler
public: public:
// The thread cycle function,
// when serve connection completed, terminate the loop which will terminate the thread,
// thread will invoke the on_thread_stop() when it terminated.
virtual srs_error_t cycle(); virtual srs_error_t cycle();
// Interface ISrsConnection. // Interface ISrsConnection.
public: public:
@ -139,8 +143,13 @@ public:
}; };
// Drop body of request, only process the response. // Drop body of request, only process the response.
class SrsResponseOnlyHttpConn : public SrsHttpConn class SrsResponseOnlyHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsHttpMessageHandler
{ {
private:
// The manager object to manage the connection.
ISrsResourceManager* manager;
SrsHttpConn* conn;
srs_netfd_t stfd;
public: public:
SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port); SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsResponseOnlyHttpConn(); virtual ~SrsResponseOnlyHttpConn();
@ -151,11 +160,29 @@ public:
// @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427 // @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427
// @remark Should only used in HTTP-FLV streaming connection. // @remark Should only used in HTTP-FLV streaming connection.
virtual srs_error_t pop_message(ISrsHttpMessage** preq); virtual srs_error_t pop_message(ISrsHttpMessage** preq);
// Interface ISrsHttpMessageHandler.
public: public:
virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg); virtual srs_error_t on_http_message(ISrsHttpMessage* msg);
virtual void on_conn_done();
// Extract APIs from SrsTcpConnection.
public: public:
// Set connection to expired. // Set socket option TCP_NODELAY.
virtual void expire(); virtual srs_error_t set_tcp_nodelay(bool v);
// Set socket option SO_SNDBUF in srs_utime_t.
virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v);
// Interface ISrsResource.
public:
virtual std::string desc();
// Interface ISrsConnection.
public:
virtual std::string remote_ip();
virtual const SrsContextId& get_id();
// Interface ISrsStartable
public:
virtual srs_error_t start();
// Interface ISrsKbpsDelta
public:
virtual void remark(int64_t* in, int64_t* out);
}; };
// The http server, use http stream or static server to serve requests. // The http server, use http stream or static server to serve requests.

View file

@ -602,7 +602,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
// Use receive thread to accept the close event to avoid FD leak. // Use receive thread to accept the close event to avoid FD leak.
// @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427 // @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427
SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r); SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
SrsResponseOnlyHttpConn* hc = dynamic_cast<SrsResponseOnlyHttpConn*>(hr->connection()); SrsHttpConn* hc = dynamic_cast<SrsHttpConn*>(hr->connection());
// update the statistic when source disconveried. // update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance(); SrsStatistic* stat = SrsStatistic::instance();
@ -623,6 +623,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
} }
} }
// Try to use fast flv encoder, remember that it maybe NULL.
SrsFlvStreamEncoder* ffe = dynamic_cast<SrsFlvStreamEncoder*>(enc); SrsFlvStreamEncoder* ffe = dynamic_cast<SrsFlvStreamEncoder*>(enc);
// Set the socket options for transport. // Set the socket options for transport.
@ -638,7 +639,11 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep); return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep);
} }
SrsHttpRecvThread* trd = new SrsHttpRecvThread(hc); // Note that the handler of hc now is rohc.
SrsResponseOnlyHttpConn* rohc = dynamic_cast<SrsResponseOnlyHttpConn*>(hc->handler());
srs_assert(rohc);
SrsHttpRecvThread* trd = new SrsHttpRecvThread(rohc);
SrsAutoFree(SrsHttpRecvThread, trd); SrsAutoFree(SrsHttpRecvThread, trd);
if ((err = trd->start()) != srs_success) { if ((err = trd->start()) != srs_success) {

View file

@ -1442,6 +1442,7 @@ srs_error_t SrsRtmpConn::cycle()
srs_error_t err = do_cycle(); srs_error_t err = do_cycle();
// Notify manager to remove it. // Notify manager to remove it.
// Note that we create this object, so we use manager to remove it.
manager->remove(this); manager->remove(this);
// success. // success.