1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

Refine connection structure

This commit is contained in:
winlin 2020-11-04 18:15:43 +08:00
parent 7c68487bf7
commit 0a82719bd3
11 changed files with 51 additions and 37 deletions

View file

@ -484,11 +484,6 @@ srs_error_t SrsTcpConnection::cycle()
return srs_success; return srs_success;
} }
SrsContextId SrsTcpConnection::srs_id()
{
return trd->cid();
}
string SrsTcpConnection::remote_ip() string SrsTcpConnection::remote_ip()
{ {
return ip; return ip;

View file

@ -110,7 +110,7 @@ private:
// all connections accept from listener must extends from this base class, // all connections accept from listener must extends from this base class,
// server will add the connection to manager, and delete it when remove. // server will add the connection to manager, and delete it when remove.
class SrsTcpConnection : virtual public ISrsConnection, virtual public ISrsCoroutineHandler class SrsTcpConnection : virtual public ISrsConnection, virtual public ISrsCoroutineHandler
, virtual public ISrsKbpsDelta, virtual public ISrsReloadHandler , virtual public ISrsKbpsDelta, virtual public ISrsReloadHandler, virtual public ISrsStartable
{ {
protected: protected:
// Each connection start a green thread, // Each connection start a green thread,
@ -143,6 +143,8 @@ public:
public: public:
// To dipose the connection. // To dipose the connection.
virtual void dispose(); virtual void dispose();
// Interface ISrsStartable
public:
// Start the client green thread. // Start the client green thread.
// when server get a client from listener, // when server get a client from listener,
// 1. server will create an concrete connection(for instance, RTMP connection), // 1. server will create an concrete connection(for instance, RTMP connection),
@ -151,6 +153,7 @@ public:
// when client cycle thread stop, invoke the on_thread_stop(), which will use server // when client cycle thread stop, invoke the on_thread_stop(), which will use server
// To remove the client by server->remove(this). // To remove the client by server->remove(this).
virtual srs_error_t start(); virtual srs_error_t start();
public:
// Set socket option TCP_NODELAY. // Set socket option TCP_NODELAY.
virtual srs_error_t set_tcp_nodelay(bool v); virtual srs_error_t set_tcp_nodelay(bool v);
// Set socket option SO_SNDBUF in srs_utime_t. // Set socket option SO_SNDBUF in srs_utime_t.
@ -161,10 +164,6 @@ public:
// when serve connection completed, terminate the loop which will terminate the thread, // when serve connection completed, terminate the loop which will terminate the thread,
// thread will invoke the on_thread_stop() when it terminated. // thread will invoke the on_thread_stop() when it terminated.
virtual srs_error_t cycle(); virtual srs_error_t cycle();
public:
// Get the srs id which identify the client.
// TODO: FIXME: Rename to cid.
virtual SrsContextId srs_id();
// Interface ISrsConnection. // Interface ISrsConnection.
public: public:
virtual std::string remote_ip(); virtual std::string remote_ip();

View file

@ -254,6 +254,7 @@ public:
}; };
#endif #endif
// TODO: FIXME: Refine arch, change to use SrsTcpConnection
class SrsHttpApi : virtual public SrsTcpConnection, virtual public ISrsReloadHandler class SrsHttpApi : virtual public SrsTcpConnection, virtual public ISrsReloadHandler
{ {
private: private:

View file

@ -55,6 +55,7 @@ class SrsHttpStreamServer;
class SrsHttpStaticServer; class SrsHttpStaticServer;
// The http connection which request the static or stream content. // The http connection which request the static or stream content.
// TODO: FIXME: Refine arch, change to use SrsTcpConnection
class SrsHttpConn : public SrsTcpConnection class SrsHttpConn : public SrsTcpConnection
{ {
protected: protected:

View file

@ -83,6 +83,7 @@ public:
}; };
// The client provides the main logic control for RTMP clients. // The client provides the main logic control for RTMP clients.
// TODO: FIXME: Refine arch, change to use SrsTcpConnection
class SrsRtmpConn : virtual public SrsTcpConnection, virtual public ISrsReloadHandler class SrsRtmpConn : virtual public SrsTcpConnection, virtual public ISrsReloadHandler
{ {
// For the thread to directly access any field of connection. // For the thread to directly access any field of connection.

View file

@ -1462,11 +1462,12 @@ void SrsServer::resample_kbps()
// collect delta from all clients. // collect delta from all clients.
for (int i = 0; i < (int)conn_manager->size(); i++) { for (int i = 0; i < (int)conn_manager->size(); i++) {
SrsTcpConnection* conn = dynamic_cast<SrsTcpConnection*>(conn_manager->at(i)); ISrsResource* c = conn_manager->at(i);
ISrsKbpsDelta* conn = dynamic_cast<ISrsKbpsDelta*>(conn_manager->at(i));
// add delta of connection to server kbps., // add delta of connection to server kbps.,
// for next sample() of server kbps can get the stat. // for next sample() of server kbps can get the stat.
stat->kbps_add_delta(conn); stat->kbps_add_delta(c->get_id(), conn);
} }
// TODO: FXME: support all other connections. // TODO: FXME: support all other connections.
@ -1481,22 +1482,21 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
SrsTcpConnection* conn = NULL; ISrsResource* r = NULL;
if ((err = fd2conn(type, stfd, &conn)) != srs_success) { if ((err = fd_to_resource(type, stfd, &r)) != srs_success) {
if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) { if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) {
srs_close_stfd(stfd); srs_error_reset(err); srs_close_stfd(stfd); srs_error_reset(err);
return srs_success; return srs_success;
} }
return srs_error_wrap(err, "fd2conn"); return srs_error_wrap(err, "fd to resource");
} }
srs_assert(conn); srs_assert(r);
// directly enqueue, the cycle thread will remove the client. // directly enqueue, the cycle thread will remove the client.
conn_manager->add(conn); conn_manager->add(r);
// cycle will start process thread and when finished remove the client. ISrsStartable* conn = dynamic_cast<ISrsStartable*>(r);
// @remark never use the conn, for it maybe destroyed.
if ((err = conn->start()) != srs_success) { if ((err = conn->start()) != srs_success) {
return srs_error_wrap(err, "start conn coroutine"); return srs_error_wrap(err, "start conn coroutine");
} }
@ -1509,7 +1509,7 @@ SrsHttpServeMux* SrsServer::api_server()
return http_api_mux; return http_api_mux;
} }
srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpConnection** pconn) srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsResource** pr)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -1549,11 +1549,11 @@ srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpCon
} }
if (type == SrsListenerRtmpStream) { if (type == SrsListenerRtmpStream) {
*pconn = new SrsRtmpConn(this, stfd, ip, port); *pr = new SrsRtmpConn(this, stfd, ip, port);
} else if (type == SrsListenerHttpApi) { } else if (type == SrsListenerHttpApi) {
*pconn = new SrsHttpApi(this, stfd, http_api_mux, ip, port); *pr = new SrsHttpApi(this, stfd, http_api_mux, ip, port);
} else if (type == SrsListenerHttpStream) { } else if (type == SrsListenerHttpStream) {
*pconn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip, port); *pr = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip, port);
} else { } else {
srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port); srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port);
srs_close_stfd(stfd); srs_close_stfd(stfd);
@ -1565,11 +1565,11 @@ srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpCon
void SrsServer::remove(ISrsResource* c) void SrsServer::remove(ISrsResource* c)
{ {
SrsTcpConnection* conn = dynamic_cast<SrsTcpConnection*>(c); ISrsKbpsDelta* conn = dynamic_cast<ISrsKbpsDelta*>(c);
SrsStatistic* stat = SrsStatistic::instance(); SrsStatistic* stat = SrsStatistic::instance();
stat->kbps_add_delta(conn); stat->kbps_add_delta(c->get_id(), conn);
stat->on_disconnect(conn->srs_id()); stat->on_disconnect(c->get_id());
// use manager to free it async. // use manager to free it async.
conn_manager->remove(c); conn_manager->remove(c);

View file

@ -342,7 +342,7 @@ public:
// TODO: FIXME: Fetch from hybrid server manager. // TODO: FIXME: Fetch from hybrid server manager.
virtual SrsHttpServeMux* api_server(); virtual SrsHttpServeMux* api_server();
private: private:
virtual srs_error_t fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpConnection** pconn); virtual srs_error_t fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsResource** pr);
// Interface ISrsResourceManager // Interface ISrsResourceManager
public: public:
// A callback for connection to remove itself. // A callback for connection to remove itself.

View file

@ -40,6 +40,14 @@ ISrsCoroutineHandler::~ISrsCoroutineHandler()
{ {
} }
ISrsStartable::ISrsStartable()
{
}
ISrsStartable::~ISrsStartable()
{
}
SrsCoroutine::SrsCoroutine() SrsCoroutine::SrsCoroutine()
{ {
} }

View file

@ -68,14 +68,23 @@ public:
virtual srs_error_t cycle() = 0; virtual srs_error_t cycle() = 0;
}; };
// Start the object, generally a croutine.
class ISrsStartable
{
public:
ISrsStartable();
virtual ~ISrsStartable();
public:
virtual srs_error_t start() = 0;
};
// The corotine object. // The corotine object.
class SrsCoroutine class SrsCoroutine : public ISrsStartable
{ {
public: public:
SrsCoroutine(); SrsCoroutine();
virtual ~SrsCoroutine(); virtual ~SrsCoroutine();
public: public:
virtual srs_error_t start() = 0;
virtual void stop() = 0; virtual void stop() = 0;
virtual void interrupt() = 0; virtual void interrupt() = 0;
// @return a copy of error, which should be freed by user. // @return a copy of error, which should be freed by user.

View file

@ -453,7 +453,7 @@ srs_error_t SrsStatistic::on_client(SrsContextId cid, SrsRequest* req, SrsTcpCon
return err; return err;
} }
void SrsStatistic::on_disconnect(SrsContextId cid) void SrsStatistic::on_disconnect(const SrsContextId& cid)
{ {
// TODO: FIXME: We should use UUID for client ID. // TODO: FIXME: We should use UUID for client ID.
std::string id = cid.c_str(); std::string id = cid.c_str();
@ -474,10 +474,10 @@ void SrsStatistic::on_disconnect(SrsContextId cid)
vhost->nb_clients--; vhost->nb_clients--;
} }
void SrsStatistic::kbps_add_delta(SrsTcpConnection* conn) void SrsStatistic::kbps_add_delta(const SrsContextId& cid, ISrsKbpsDelta* delta)
{ {
// TODO: FIXME: Should not use context id as connection id. // TODO: FIXME: Should not use context id as connection id.
std::string id = conn->srs_id().c_str(); std::string id = cid.c_str();
if (clients.find(id) == clients.end()) { if (clients.find(id) == clients.end()) {
return; return;
} }
@ -486,7 +486,7 @@ void SrsStatistic::kbps_add_delta(SrsTcpConnection* conn)
// resample the kbps to collect the delta. // resample the kbps to collect the delta.
int64_t in, out; int64_t in, out;
conn->remark(&in, &out); delta->remark(&in, &out);
// add delta of connection to kbps. // add delta of connection to kbps.
// for next sample() of server kbps can get the stat. // for next sample() of server kbps can get the stat.

View file

@ -39,6 +39,7 @@ class SrsRequest;
class SrsTcpConnection; class SrsTcpConnection;
class SrsJsonObject; class SrsJsonObject;
class SrsJsonArray; class SrsJsonArray;
class ISrsKbpsDelta;
struct SrsStatisticVhost struct SrsStatisticVhost
{ {
@ -211,11 +212,10 @@ public:
// only got the request object, so the client specified by id maybe not // only got the request object, so the client specified by id maybe not
// exists in stat. // exists in stat.
// TODO: FIXME: We should not use context id as client id. // TODO: FIXME: We should not use context id as client id.
virtual void on_disconnect(SrsContextId id); virtual void on_disconnect(const SrsContextId& id);
// Sample the kbps, add delta bytes of conn. // Sample the kbps, add delta bytes of conn.
// Use kbps_sample() to get all result of kbps stat. // Use kbps_sample() to get all result of kbps stat.
// TODO: FIXME: the add delta must use ISrsKbpsDelta interface instead. virtual void kbps_add_delta(const SrsContextId& cid, ISrsKbpsDelta* delta);
virtual void kbps_add_delta(SrsTcpConnection* conn);
// Calc the result for all kbps. // Calc the result for all kbps.
// @return the server kbps. // @return the server kbps.
virtual SrsKbps* kbps_sample(); virtual SrsKbps* kbps_sample();