1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Refine code, use SrsConnectionManager

This commit is contained in:
winlin 2020-09-11 18:55:18 +08:00
parent 33f1cb87a2
commit 66c680edb1
8 changed files with 154 additions and 110 deletions

View file

@ -59,6 +59,16 @@ srs_error_t SrsConnectionManager::start()
return err; return err;
} }
bool SrsConnectionManager::empty()
{
return conns_.empty();
}
size_t SrsConnectionManager::size()
{
return conns_.size();
}
srs_error_t SrsConnectionManager::cycle() srs_error_t SrsConnectionManager::cycle()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -76,12 +86,48 @@ srs_error_t SrsConnectionManager::cycle()
return err; return err;
} }
void SrsConnectionManager::add(ISrsConnection* conn)
{
if (::find(conns_.begin(), conns_.end(), conn) == conns_.end()) {
conns_.push_back(conn);
}
}
void SrsConnectionManager::add_with_id(const std::string& id, ISrsConnection* conn)
{
add(conn);
conns_id_.insert(make_pair(id, conn));
}
void SrsConnectionManager::add_with_name(const std::string& name, ISrsConnection* conn)
{
add(conn);
conns_name_.insert(make_pair(name, conn));
}
ISrsConnection* SrsConnectionManager::at(int index)
{
return conns_.at(index);
}
ISrsConnection* SrsConnectionManager::find_by_id(std::string id)
{
map<string, ISrsConnection*>::iterator it = conns_id_.find(id);
return (it != conns_id_.end())? it->second : NULL;
}
ISrsConnection* SrsConnectionManager::find_by_name(std::string name)
{
map<string, ISrsConnection*>::iterator it = conns_name_.find(name);
return (it != conns_name_.end())? it->second : NULL;
}
void SrsConnectionManager::remove(ISrsConnection* c) void SrsConnectionManager::remove(ISrsConnection* c)
{ {
if (::find(conns.begin(), conns.end(), c) == conns.end()) { if (::find(zombies_.begin(), zombies_.end(), c) == zombies_.end()) {
conns.push_back(c); zombies_.push_back(c);
srs_cond_signal(cond);
} }
srs_cond_signal(cond);
} }
void SrsConnectionManager::clear() void SrsConnectionManager::clear()
@ -89,15 +135,43 @@ void SrsConnectionManager::clear()
// To prevent thread switch when delete connection, // To prevent thread switch when delete connection,
// we copy all connections then free one by one. // we copy all connections then free one by one.
vector<ISrsConnection*> copy; vector<ISrsConnection*> copy;
copy.swap(conns); copy.swap(zombies_);
vector<ISrsConnection*>::iterator it; vector<ISrsConnection*>::iterator it;
for (it = copy.begin(); it != copy.end(); ++it) { for (it = copy.begin(); it != copy.end(); ++it) {
ISrsConnection* conn = *it; ISrsConnection* conn = *it;
srs_freep(conn); dispose(conn);
} }
} }
void SrsConnectionManager::dispose(ISrsConnection* c)
{
for (map<string, ISrsConnection*>::iterator it = conns_name_.begin(); it != conns_name_.end();) {
if (c != it->second) {
++it;
} else {
// Use C++98 style: https://stackoverflow.com/a/4636230
conns_name_.erase(it++);
}
}
for (map<string, ISrsConnection*>::iterator it = conns_id_.begin(); it != conns_id_.end();) {
if (c != it->second) {
++it;
} else {
// Use C++98 style: https://stackoverflow.com/a/4636230
conns_id_.erase(it++);
}
}
vector<ISrsConnection*>::iterator it = std::find(conns_.begin(), conns_.end(), c);
if (it != conns_.end()) {
conns_.erase(it);
}
srs_freep(c);
}
SrsTcpConnection::SrsTcpConnection(IConnectionManager* cm, srs_netfd_t c, string cip, int cport) SrsTcpConnection::SrsTcpConnection(IConnectionManager* cm, srs_netfd_t c, string cip, int cport)
{ {
manager = cm; manager = cm;

View file

@ -28,6 +28,7 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <map>
#include <srs_app_st.hpp> #include <srs_app_st.hpp>
#include <srs_protocol_kbps.hpp> #include <srs_protocol_kbps.hpp>
@ -41,21 +42,39 @@ class SrsConnectionManager : virtual public ISrsCoroutineHandler, virtual public
{ {
private: private:
SrsCoroutine* trd; SrsCoroutine* trd;
std::vector<ISrsConnection*> conns;
srs_cond_t cond; srs_cond_t cond;
// The zombie connections, we will delete it asynchronously.
std::vector<ISrsConnection*> zombies_;
private:
// The connections without any id.
std::vector<ISrsConnection*> conns_;
// The connections with connection id.
std::map<std::string, ISrsConnection*> conns_id_;
// The connections with connection name.
std::map<std::string, ISrsConnection*> conns_name_;
public: public:
SrsConnectionManager(); SrsConnectionManager();
virtual ~SrsConnectionManager(); virtual ~SrsConnectionManager();
public: public:
srs_error_t start(); srs_error_t start();
bool empty();
size_t size();
// Interface ISrsCoroutineHandler // Interface ISrsCoroutineHandler
public: public:
virtual srs_error_t cycle(); virtual srs_error_t cycle();
public:
void add(ISrsConnection* conn);
void add_with_id(const std::string& id, ISrsConnection* conn);
void add_with_name(const std::string& name, ISrsConnection* conn);
ISrsConnection* at(int index);
ISrsConnection* find_by_id(std::string id);
ISrsConnection* find_by_name(std::string name);
// Interface IConnectionManager // Interface IConnectionManager
public: public:
virtual void remove(ISrsConnection* c); virtual void remove(ISrsConnection* c);
private: private:
void clear(); void clear();
void dispose(ISrsConnection* c);
}; };
// The basic connection of SRS, for TCP based protocols, // The basic connection of SRS, for TCP based protocols,

View file

@ -1678,6 +1678,14 @@ vector<SrsUdpMuxSocket*> SrsRtcConnection::peer_addresses()
return addresses; return addresses;
} }
string SrsRtcConnection::remote_ip()
{
if (sendonly_skt) {
return sendonly_skt->get_peer_ip();
}
return "";
}
void SrsRtcConnection::switch_to_context() void SrsRtcConnection::switch_to_context()
{ {
_srs_context->set_id(cid_); _srs_context->set_id(cid_);

View file

@ -38,6 +38,7 @@
#include <srs_app_rtc_queue.hpp> #include <srs_app_rtc_queue.hpp>
#include <srs_app_rtc_source.hpp> #include <srs_app_rtc_source.hpp>
#include <srs_app_rtc_dtls.hpp> #include <srs_app_rtc_dtls.hpp>
#include <srs_service_conn.hpp>
#include <string> #include <string>
#include <map> #include <map>
@ -412,7 +413,7 @@ public:
}; };
// A RTC Peer Connection, SDP level object. // A RTC Peer Connection, SDP level object.
class SrsRtcConnection : virtual public ISrsHourGlass class SrsRtcConnection : virtual public ISrsHourGlass, virtual public ISrsConnection
{ {
friend class SrsSecurityTransport; friend class SrsSecurityTransport;
friend class SrsRtcPlayStream; friend class SrsRtcPlayStream;
@ -478,6 +479,9 @@ public:
std::string username(); std::string username();
// Get all addresses client used. // Get all addresses client used.
std::vector<SrsUdpMuxSocket*> peer_addresses(); std::vector<SrsUdpMuxSocket*> peer_addresses();
// interface ISrsConnection
public:
virtual std::string remote_ip();
public: public:
void switch_to_context(); void switch_to_context();
const SrsContextId& context_id(); const SrsContextId& context_id();

View file

@ -216,6 +216,7 @@ SrsRtcServer::SrsRtcServer()
{ {
handler = NULL; handler = NULL;
hijacker = NULL; hijacker = NULL;
manager = new SrsConnectionManager();
timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS); timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS);
} }
@ -231,20 +232,14 @@ SrsRtcServer::~SrsRtcServer()
} }
} }
if (true) { srs_freep(manager);
std::vector<SrsRtcConnection*>::iterator it;
for (it = zombies_.begin(); it != zombies_.end(); ++it) {
SrsRtcConnection* session = *it;
srs_freep(session);
}
}
} }
srs_error_t SrsRtcServer::initialize() srs_error_t SrsRtcServer::initialize()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
if ((err = timer->tick(1 * SRS_UTIME_SECONDS)) != srs_success) { if ((err = timer->tick(3 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "hourglass tick"); return srs_error_wrap(err, "hourglass tick");
} }
@ -256,6 +251,10 @@ srs_error_t SrsRtcServer::initialize()
return srs_error_wrap(err, "black hole"); return srs_error_wrap(err, "black hole");
} }
if ((err = manager->start()) != srs_success) {
return srs_error_wrap(err, "start manager");
}
srs_trace("RTC server init ok"); srs_trace("RTC server init ok");
return err; return err;
@ -312,14 +311,11 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
SrsRtcConnection* session = NULL; SrsRtcConnection* session = NULL;
if (true) { if (true) {
map<string, SrsRtcConnection*>::iterator it = map_id_session.find(peer_id); ISrsConnection* conn = manager->find_by_id(peer_id);
if (it != map_id_session.end()) { if (conn) {
session = it->second;
// Switch to the session to write logs to the context. // Switch to the session to write logs to the context.
if (session) { session = dynamic_cast<SrsRtcConnection*>(conn);
session->switch_to_context(); session->switch_to_context();
}
} }
} }
@ -462,7 +458,7 @@ srs_error_t SrsRtcServer::do_create_session(
local_ufrag = srs_random_str(8); local_ufrag = srs_random_str(8);
username = local_ufrag + ":" + remote_sdp.get_ice_ufrag(); username = local_ufrag + ":" + remote_sdp.get_ice_ufrag();
if (!map_username_session.count(username)) { if (!manager->find_by_name(username)) {
break; break;
} }
} }
@ -508,7 +504,7 @@ srs_error_t SrsRtcServer::do_create_session(
} }
// We allows username is optional, but it never empty here. // We allows username is optional, but it never empty here.
map_username_session.insert(make_pair(username, session)); manager->add_with_name(username, session);
return err; return err;
} }
@ -571,7 +567,7 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcConnection* session, SrsRequest*
} }
// We allows username is optional, but it never empty here. // We allows username is optional, but it never empty here.
map_username_session.insert(make_pair(username, session)); manager->add_with_name(username, session);
session->set_remote_sdp(remote_sdp); session->set_remote_sdp(remote_sdp);
session->set_state(WAITING_STUN); session->set_state(WAITING_STUN);
@ -586,60 +582,38 @@ void SrsRtcServer::destroy(SrsRtcConnection* session)
} }
session->disposing_ = true; session->disposing_ = true;
std::map<std::string, SrsRtcConnection*>::iterator it;
// We allows username is optional.
string username = session->username();
if (!username.empty() && (it = map_username_session.find(username)) != map_username_session.end()) {
map_username_session.erase(it);
}
vector<SrsUdpMuxSocket*> addresses = session->peer_addresses();
for (int i = 0; i < (int)addresses.size(); i++) {
SrsUdpMuxSocket* addr = addresses.at(i);
map_id_session.erase(addr->peer_id());
}
SrsContextRestore(_srs_context->get_id()); SrsContextRestore(_srs_context->get_id());
session->switch_to_context(); session->switch_to_context();
string username = session->username();
srs_trace("RTC: session destroy, username=%s, summary: %s", username.c_str(), session->stat_->summary().c_str()); srs_trace("RTC: session destroy, username=%s, summary: %s", username.c_str(), session->stat_->summary().c_str());
zombies_.push_back(session); manager->remove(session);
} }
bool SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcConnection* session) void SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcConnection* session)
{ {
return map_id_session.insert(make_pair(peer_id, session)).second; manager->add_with_id(peer_id, session);
} }
void SrsRtcServer::check_and_clean_timeout_session() void SrsRtcServer::check_and_clean_timeout_session()
{ {
map<string, SrsRtcConnection*>::iterator iter = map_username_session.begin(); for (int i = 0; i < (int)manager->size(); i++) {
while (iter != map_username_session.end()) { SrsRtcConnection* session = dynamic_cast<SrsRtcConnection*>(manager->at(i));
SrsRtcConnection* session = iter->second;
srs_assert(session); srs_assert(session);
if (!session->is_stun_timeout()) { if (!session->is_stun_timeout()) {
++iter;
continue; continue;
} }
// Now, we got the RTC session to cleanup, switch to its context // Now, we got the RTC session to cleanup, switch to its context
// to make all logs write to the "correct" pid+cid. // to make all logs write to the "correct" pid+cid.
session->switch_to_context(); session->switch_to_context();
srs_trace("RTC: session STUN timeout, summary: %s", session->stat_->summary().c_str()); string username = session->username();
srs_trace("RTC: session STUN timeout, username=%s, summary: %s", username.c_str(), session->stat_->summary().c_str());
session->disposing_ = true; session->disposing_ = true;
zombies_.push_back(session); manager->remove(session);
// Use C++98 style: https://stackoverflow.com/a/4636230
map_username_session.erase(iter++);
vector<SrsUdpMuxSocket*> addresses = session->peer_addresses();
for (int i = 0; i < (int)addresses.size(); i++) {
SrsUdpMuxSocket* addr = addresses.at(i);
map_id_session.erase(addr->peer_id());
}
if (handler) { if (handler) {
handler->on_timeout(session); handler->on_timeout(session);
@ -649,35 +623,18 @@ void SrsRtcServer::check_and_clean_timeout_session()
SrsRtcConnection* SrsRtcServer::find_session_by_username(const std::string& username) SrsRtcConnection* SrsRtcServer::find_session_by_username(const std::string& username)
{ {
map<string, SrsRtcConnection*>::iterator iter = map_username_session.find(username); ISrsConnection* conn = manager->find_by_name(username);
if (iter == map_username_session.end()) { return dynamic_cast<SrsRtcConnection*>(conn);
return NULL;
}
return iter->second;
} }
srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tick) srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tick)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
// TODO: FIXME: Merge small functions.
// Check session timeout, put to zombies queue. // Check session timeout, put to zombies queue.
check_and_clean_timeout_session(); check_and_clean_timeout_session();
// Cleanup zombie sessions.
if (zombies_.empty()) {
return err;
}
std::vector<SrsRtcConnection*> zombies;
zombies.swap(zombies_);
std::vector<SrsRtcConnection*>::iterator it;
for (it = zombies.begin(); it != zombies.end(); ++it) {
SrsRtcConnection* session = *it;
srs_freep(session);
}
return err; return err;
} }

View file

@ -40,6 +40,7 @@ class SrsRtcConnection;
class SrsRequest; class SrsRequest;
class SrsSdp; class SrsSdp;
class SrsRtcStream; class SrsRtcStream;
class SrsConnectionManager;
// The UDP black hole, for developer to use wireshark to catch plaintext packets. // The UDP black hole, for developer to use wireshark to catch plaintext packets.
// For example, server receive UDP packets at udp://8000, and forward the plaintext packet to black hole, // For example, server receive UDP packets at udp://8000, and forward the plaintext packet to black hole,
@ -91,13 +92,7 @@ private:
std::vector<SrsUdpMuxListener*> listeners; std::vector<SrsUdpMuxListener*> listeners;
ISrsRtcServerHandler* handler; ISrsRtcServerHandler* handler;
ISrsRtcServerHijacker* hijacker; ISrsRtcServerHijacker* hijacker;
private: SrsConnectionManager* manager;
// TODO: FIXME: Rename it.
std::map<std::string, SrsRtcConnection*> map_username_session; // key: username(local_ufrag + ":" + remote_ufrag)
// TODO: FIXME: Rename it.
std::map<std::string, SrsRtcConnection*> map_id_session; // key: peerip(ip + ":" + port)
// The zombie sessions, we will free them.
std::vector<SrsRtcConnection*> zombies_;
public: public:
SrsRtcServer(); SrsRtcServer();
virtual ~SrsRtcServer(); virtual ~SrsRtcServer();
@ -131,7 +126,7 @@ public:
// Destroy the session from server. // Destroy the session from server.
void destroy(SrsRtcConnection* session); void destroy(SrsRtcConnection* session);
public: public:
bool insert_into_id_sessions(const std::string& peer_id, SrsRtcConnection* session); void insert_into_id_sessions(const std::string& peer_id, SrsRtcConnection* session);
private: private:
void check_and_clean_timeout_session(); void check_and_clean_timeout_session();
public: public:

View file

@ -749,13 +749,13 @@ void SrsServer::gracefully_dispose()
// Wait for connections to quit. // Wait for connections to quit.
// While gracefully quiting, user can requires SRS to fast quit. // While gracefully quiting, user can requires SRS to fast quit.
int wait_step = 1; int wait_step = 1;
while (!conns.empty() && !signal_fast_quit) { while (!conn_manager->empty() && !signal_fast_quit) {
for (int i = 0; i < wait_step && !conns.empty() && !signal_fast_quit; i++) { for (int i = 0; i < wait_step && !conn_manager->empty() && !signal_fast_quit; i++) {
srs_usleep(1000 * SRS_UTIME_MILLISECONDS); srs_usleep(1000 * SRS_UTIME_MILLISECONDS);
} }
wait_step = (wait_step * 2) % 33; wait_step = (wait_step * 2) % 33;
srs_trace("wait for %d conns to quit", conns.size()); srs_trace("wait for %d conns to quit", (int)conn_manager->size());
} }
// dispose the source for hls and dvr. // dispose the source for hls and dvr.
@ -1473,8 +1473,8 @@ void SrsServer::resample_kbps()
SrsStatistic* stat = SrsStatistic::instance(); SrsStatistic* stat = SrsStatistic::instance();
// collect delta from all clients. // collect delta from all clients.
for (std::vector<SrsTcpConnection*>::iterator it = conns.begin(); it != conns.end(); ++it) { for (int i = 0; i < (int)conn_manager->size(); i++) {
SrsTcpConnection* conn = *it; SrsTcpConnection* conn = dynamic_cast<SrsTcpConnection*>(conn_manager->at(i));
// add delta of connection to server kbps., // add delta of connection to server kbps.,
// for next sample() of server kbps can get the stat. // for next sample() of server kbps can get the stat.
@ -1486,7 +1486,7 @@ void SrsServer::resample_kbps()
// sample the kbps, get the stat. // sample the kbps, get the stat.
SrsKbps* kbps = stat->kbps_sample(); SrsKbps* kbps = stat->kbps_sample();
srs_update_rtmp_server((int)conns.size(), kbps); srs_update_rtmp_server((int)conn_manager->size(), kbps);
} }
srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
@ -1505,7 +1505,7 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
srs_assert(conn); srs_assert(conn);
// directly enqueue, the cycle thread will remove the client. // directly enqueue, the cycle thread will remove the client.
conns.push_back(conn); conn_manager->add(conn);
// cycle will start process thread and when finished remove the client. // cycle will start process thread and when finished remove the client.
// @remark never use the conn, for it maybe destroyed. // @remark never use the conn, for it maybe destroyed.
@ -1538,13 +1538,13 @@ srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpCon
// check connection limitation. // check connection limitation.
int max_connections = _srs_config->get_max_connections(); int max_connections = _srs_config->get_max_connections();
if (handler && (err = handler->on_accept_client(max_connections, (int)conns.size())) != srs_success) { if (handler && (err = handler->on_accept_client(max_connections, (int)conn_manager->size())) != srs_success) {
return srs_error_wrap(err, "drop client fd=%d, ip=%s:%d, max=%d, cur=%d for err: %s", return srs_error_wrap(err, "drop client fd=%d, ip=%s:%d, max=%d, cur=%d for err: %s",
fd, ip.c_str(), port, max_connections, (int)conns.size(), srs_error_desc(err).c_str()); fd, ip.c_str(), port, max_connections, (int)conn_manager->size(), srs_error_desc(err).c_str());
} }
if ((int)conns.size() >= max_connections) { if ((int)conn_manager->size() >= max_connections) {
return srs_error_new(ERROR_EXCEED_CONNECTIONS, "drop fd=%d, ip=%s:%d, max=%d, cur=%d for exceed connection limits", return srs_error_new(ERROR_EXCEED_CONNECTIONS, "drop fd=%d, ip=%s:%d, max=%d, cur=%d for exceed connection limits",
fd, ip.c_str(), port, max_connections, (int)conns.size()); fd, ip.c_str(), port, max_connections, (int)conn_manager->size());
} }
// avoid fd leak when fork. // avoid fd leak when fork.
@ -1578,18 +1578,7 @@ srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpCon
void SrsServer::remove(ISrsConnection* c) void SrsServer::remove(ISrsConnection* c)
{ {
SrsTcpConnection* conn = dynamic_cast<SrsTcpConnection*>(c); SrsTcpConnection* conn = dynamic_cast<SrsTcpConnection*>(c);
std::vector<SrsTcpConnection*>::iterator it = std::find(conns.begin(), conns.end(), conn);
// removed by destroy, ignore.
if (it == conns.end()) {
srs_warn("server moved connection, ignore.");
return;
}
conns.erase(it);
srs_info("conn removed. conns=%d", (int)conns.size());
SrsStatistic* stat = SrsStatistic::instance(); SrsStatistic* stat = SrsStatistic::instance();
stat->kbps_add_delta(conn); stat->kbps_add_delta(conn);
stat->on_disconnect(conn->srs_id()); stat->on_disconnect(conn->srs_id());

View file

@ -256,8 +256,6 @@ private:
// for the server never delete the file; when system startup, the pid in pid file // for the server never delete the file; when system startup, the pid in pid file
// maybe valid but the process is not SRS, the init.d script will never start server. // maybe valid but the process is not SRS, the init.d script will never start server.
int pid_fd; int pid_fd;
// All connections, connection manager
std::vector<SrsTcpConnection*> conns;
// All listners, listener manager. // All listners, listener manager.
std::vector<SrsListener*> listeners; std::vector<SrsListener*> listeners;
// Signal manager which convert gignal to io message. // Signal manager which convert gignal to io message.