From 66c680edb1b198f5e36d570b46dc47f7535e3e07 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 11 Sep 2020 18:55:18 +0800 Subject: [PATCH] Refine code, use SrsConnectionManager --- trunk/src/app/srs_app_conn.cpp | 84 ++++++++++++++++++++++-- trunk/src/app/srs_app_conn.hpp | 21 +++++- trunk/src/app/srs_app_rtc_conn.cpp | 8 +++ trunk/src/app/srs_app_rtc_conn.hpp | 6 +- trunk/src/app/srs_app_rtc_server.cpp | 97 ++++++++-------------------- trunk/src/app/srs_app_rtc_server.hpp | 11 +--- trunk/src/app/srs_app_server.cpp | 35 ++++------ trunk/src/app/srs_app_server.hpp | 2 - 8 files changed, 154 insertions(+), 110 deletions(-) diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 025c1480c..6be1ea935 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -59,6 +59,16 @@ srs_error_t SrsConnectionManager::start() return err; } +bool SrsConnectionManager::empty() +{ + return conns_.empty(); +} + +size_t SrsConnectionManager::size() +{ + return conns_.size(); +} + srs_error_t SrsConnectionManager::cycle() { srs_error_t err = srs_success; @@ -76,12 +86,48 @@ srs_error_t SrsConnectionManager::cycle() return err; } +void SrsConnectionManager::add(ISrsConnection* conn) +{ + if (::find(conns_.begin(), conns_.end(), conn) == conns_.end()) { + conns_.push_back(conn); + } +} + +void SrsConnectionManager::add_with_id(const std::string& id, ISrsConnection* conn) +{ + add(conn); + conns_id_.insert(make_pair(id, conn)); +} + +void SrsConnectionManager::add_with_name(const std::string& name, ISrsConnection* conn) +{ + add(conn); + conns_name_.insert(make_pair(name, conn)); +} + +ISrsConnection* SrsConnectionManager::at(int index) +{ + return conns_.at(index); +} + +ISrsConnection* SrsConnectionManager::find_by_id(std::string id) +{ + map::iterator it = conns_id_.find(id); + return (it != conns_id_.end())? it->second : NULL; +} + +ISrsConnection* SrsConnectionManager::find_by_name(std::string name) +{ + map::iterator it = conns_name_.find(name); + return (it != conns_name_.end())? it->second : NULL; +} + void SrsConnectionManager::remove(ISrsConnection* c) { - if (::find(conns.begin(), conns.end(), c) == conns.end()) { - conns.push_back(c); + if (::find(zombies_.begin(), zombies_.end(), c) == zombies_.end()) { + zombies_.push_back(c); + srs_cond_signal(cond); } - srs_cond_signal(cond); } void SrsConnectionManager::clear() @@ -89,15 +135,43 @@ void SrsConnectionManager::clear() // To prevent thread switch when delete connection, // we copy all connections then free one by one. vector copy; - copy.swap(conns); + copy.swap(zombies_); vector::iterator it; for (it = copy.begin(); it != copy.end(); ++it) { ISrsConnection* conn = *it; - srs_freep(conn); + dispose(conn); } } +void SrsConnectionManager::dispose(ISrsConnection* c) +{ + for (map::iterator it = conns_name_.begin(); it != conns_name_.end();) { + if (c != it->second) { + ++it; + } else { + // Use C++98 style: https://stackoverflow.com/a/4636230 + conns_name_.erase(it++); + } + } + + for (map::iterator it = conns_id_.begin(); it != conns_id_.end();) { + if (c != it->second) { + ++it; + } else { + // Use C++98 style: https://stackoverflow.com/a/4636230 + conns_id_.erase(it++); + } + } + + vector::iterator it = std::find(conns_.begin(), conns_.end(), c); + if (it != conns_.end()) { + conns_.erase(it); + } + + srs_freep(c); +} + SrsTcpConnection::SrsTcpConnection(IConnectionManager* cm, srs_netfd_t c, string cip, int cport) { manager = cm; diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index e79d28331..9c0e5167d 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -28,6 +28,7 @@ #include #include +#include #include #include @@ -41,21 +42,39 @@ class SrsConnectionManager : virtual public ISrsCoroutineHandler, virtual public { private: SrsCoroutine* trd; - std::vector conns; srs_cond_t cond; + // The zombie connections, we will delete it asynchronously. + std::vector zombies_; +private: + // The connections without any id. + std::vector conns_; + // The connections with connection id. + std::map conns_id_; + // The connections with connection name. + std::map conns_name_; public: SrsConnectionManager(); virtual ~SrsConnectionManager(); public: srs_error_t start(); + bool empty(); + size_t size(); // Interface ISrsCoroutineHandler public: virtual srs_error_t cycle(); +public: + void add(ISrsConnection* conn); + void add_with_id(const std::string& id, ISrsConnection* conn); + void add_with_name(const std::string& name, ISrsConnection* conn); + ISrsConnection* at(int index); + ISrsConnection* find_by_id(std::string id); + ISrsConnection* find_by_name(std::string name); // Interface IConnectionManager public: virtual void remove(ISrsConnection* c); private: void clear(); + void dispose(ISrsConnection* c); }; // The basic connection of SRS, for TCP based protocols, diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 76cc58de8..2b185b390 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1678,6 +1678,14 @@ vector SrsRtcConnection::peer_addresses() return addresses; } +string SrsRtcConnection::remote_ip() +{ + if (sendonly_skt) { + return sendonly_skt->get_peer_ip(); + } + return ""; +} + void SrsRtcConnection::switch_to_context() { _srs_context->set_id(cid_); diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 3c780ae73..e640de19f 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include @@ -412,7 +413,7 @@ public: }; // A RTC Peer Connection, SDP level object. -class SrsRtcConnection : virtual public ISrsHourGlass +class SrsRtcConnection : virtual public ISrsHourGlass, virtual public ISrsConnection { friend class SrsSecurityTransport; friend class SrsRtcPlayStream; @@ -478,6 +479,9 @@ public: std::string username(); // Get all addresses client used. std::vector peer_addresses(); +// interface ISrsConnection +public: + virtual std::string remote_ip(); public: void switch_to_context(); const SrsContextId& context_id(); diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 1ae4468e8..1cdc9fc16 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -216,6 +216,7 @@ SrsRtcServer::SrsRtcServer() { handler = NULL; hijacker = NULL; + manager = new SrsConnectionManager(); timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS); } @@ -231,20 +232,14 @@ SrsRtcServer::~SrsRtcServer() } } - if (true) { - std::vector::iterator it; - for (it = zombies_.begin(); it != zombies_.end(); ++it) { - SrsRtcConnection* session = *it; - srs_freep(session); - } - } + srs_freep(manager); } srs_error_t SrsRtcServer::initialize() { srs_error_t err = srs_success; - if ((err = timer->tick(1 * SRS_UTIME_SECONDS)) != srs_success) { + if ((err = timer->tick(3 * SRS_UTIME_SECONDS)) != srs_success) { return srs_error_wrap(err, "hourglass tick"); } @@ -256,6 +251,10 @@ srs_error_t SrsRtcServer::initialize() return srs_error_wrap(err, "black hole"); } + if ((err = manager->start()) != srs_success) { + return srs_error_wrap(err, "start manager"); + } + srs_trace("RTC server init ok"); return err; @@ -312,14 +311,11 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) SrsRtcConnection* session = NULL; if (true) { - map::iterator it = map_id_session.find(peer_id); - if (it != map_id_session.end()) { - session = it->second; - + ISrsConnection* conn = manager->find_by_id(peer_id); + if (conn) { // Switch to the session to write logs to the context. - if (session) { - session->switch_to_context(); - } + session = dynamic_cast(conn); + session->switch_to_context(); } } @@ -462,7 +458,7 @@ srs_error_t SrsRtcServer::do_create_session( local_ufrag = srs_random_str(8); username = local_ufrag + ":" + remote_sdp.get_ice_ufrag(); - if (!map_username_session.count(username)) { + if (!manager->find_by_name(username)) { break; } } @@ -508,7 +504,7 @@ srs_error_t SrsRtcServer::do_create_session( } // We allows username is optional, but it never empty here. - map_username_session.insert(make_pair(username, session)); + manager->add_with_name(username, session); return err; } @@ -571,7 +567,7 @@ srs_error_t SrsRtcServer::setup_session2(SrsRtcConnection* session, SrsRequest* } // We allows username is optional, but it never empty here. - map_username_session.insert(make_pair(username, session)); + manager->add_with_name(username, session); session->set_remote_sdp(remote_sdp); session->set_state(WAITING_STUN); @@ -586,60 +582,38 @@ void SrsRtcServer::destroy(SrsRtcConnection* session) } session->disposing_ = true; - std::map::iterator it; - - // We allows username is optional. - string username = session->username(); - if (!username.empty() && (it = map_username_session.find(username)) != map_username_session.end()) { - map_username_session.erase(it); - } - - vector addresses = session->peer_addresses(); - for (int i = 0; i < (int)addresses.size(); i++) { - SrsUdpMuxSocket* addr = addresses.at(i); - map_id_session.erase(addr->peer_id()); - } - SrsContextRestore(_srs_context->get_id()); session->switch_to_context(); + + string username = session->username(); srs_trace("RTC: session destroy, username=%s, summary: %s", username.c_str(), session->stat_->summary().c_str()); - zombies_.push_back(session); + manager->remove(session); } -bool SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcConnection* session) +void SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcConnection* session) { - return map_id_session.insert(make_pair(peer_id, session)).second; + manager->add_with_id(peer_id, session); } void SrsRtcServer::check_and_clean_timeout_session() { - map::iterator iter = map_username_session.begin(); - while (iter != map_username_session.end()) { - SrsRtcConnection* session = iter->second; + for (int i = 0; i < (int)manager->size(); i++) { + SrsRtcConnection* session = dynamic_cast(manager->at(i)); srs_assert(session); if (!session->is_stun_timeout()) { - ++iter; continue; } // Now, we got the RTC session to cleanup, switch to its context // to make all logs write to the "correct" pid+cid. session->switch_to_context(); - srs_trace("RTC: session STUN timeout, summary: %s", session->stat_->summary().c_str()); + string username = session->username(); + srs_trace("RTC: session STUN timeout, username=%s, summary: %s", username.c_str(), session->stat_->summary().c_str()); session->disposing_ = true; - zombies_.push_back(session); - - // Use C++98 style: https://stackoverflow.com/a/4636230 - map_username_session.erase(iter++); - - vector addresses = session->peer_addresses(); - for (int i = 0; i < (int)addresses.size(); i++) { - SrsUdpMuxSocket* addr = addresses.at(i); - map_id_session.erase(addr->peer_id()); - } + manager->remove(session); if (handler) { handler->on_timeout(session); @@ -649,35 +623,18 @@ void SrsRtcServer::check_and_clean_timeout_session() SrsRtcConnection* SrsRtcServer::find_session_by_username(const std::string& username) { - map::iterator iter = map_username_session.find(username); - if (iter == map_username_session.end()) { - return NULL; - } - - return iter->second; + ISrsConnection* conn = manager->find_by_name(username); + return dynamic_cast(conn); } srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tick) { srs_error_t err = srs_success; + // TODO: FIXME: Merge small functions. // Check session timeout, put to zombies queue. check_and_clean_timeout_session(); - // Cleanup zombie sessions. - if (zombies_.empty()) { - return err; - } - - std::vector zombies; - zombies.swap(zombies_); - - std::vector::iterator it; - for (it = zombies.begin(); it != zombies.end(); ++it) { - SrsRtcConnection* session = *it; - srs_freep(session); - } - return err; } diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index fbc50859b..3110c92c3 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -40,6 +40,7 @@ class SrsRtcConnection; class SrsRequest; class SrsSdp; class SrsRtcStream; +class SrsConnectionManager; // The UDP black hole, for developer to use wireshark to catch plaintext packets. // For example, server receive UDP packets at udp://8000, and forward the plaintext packet to black hole, @@ -91,13 +92,7 @@ private: std::vector listeners; ISrsRtcServerHandler* handler; ISrsRtcServerHijacker* hijacker; -private: - // TODO: FIXME: Rename it. - std::map map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) - // TODO: FIXME: Rename it. - std::map map_id_session; // key: peerip(ip + ":" + port) - // The zombie sessions, we will free them. - std::vector zombies_; + SrsConnectionManager* manager; public: SrsRtcServer(); virtual ~SrsRtcServer(); @@ -131,7 +126,7 @@ public: // Destroy the session from server. void destroy(SrsRtcConnection* session); public: - bool insert_into_id_sessions(const std::string& peer_id, SrsRtcConnection* session); + void insert_into_id_sessions(const std::string& peer_id, SrsRtcConnection* session); private: void check_and_clean_timeout_session(); public: diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index be3e37169..761ad6b5c 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -749,13 +749,13 @@ void SrsServer::gracefully_dispose() // Wait for connections to quit. // While gracefully quiting, user can requires SRS to fast quit. int wait_step = 1; - while (!conns.empty() && !signal_fast_quit) { - for (int i = 0; i < wait_step && !conns.empty() && !signal_fast_quit; i++) { + while (!conn_manager->empty() && !signal_fast_quit) { + for (int i = 0; i < wait_step && !conn_manager->empty() && !signal_fast_quit; i++) { srs_usleep(1000 * SRS_UTIME_MILLISECONDS); } wait_step = (wait_step * 2) % 33; - srs_trace("wait for %d conns to quit", conns.size()); + srs_trace("wait for %d conns to quit", (int)conn_manager->size()); } // dispose the source for hls and dvr. @@ -1473,8 +1473,8 @@ void SrsServer::resample_kbps() SrsStatistic* stat = SrsStatistic::instance(); // collect delta from all clients. - for (std::vector::iterator it = conns.begin(); it != conns.end(); ++it) { - SrsTcpConnection* conn = *it; + for (int i = 0; i < (int)conn_manager->size(); i++) { + SrsTcpConnection* conn = dynamic_cast(conn_manager->at(i)); // add delta of connection to server kbps., // for next sample() of server kbps can get the stat. @@ -1486,7 +1486,7 @@ void SrsServer::resample_kbps() // sample the kbps, get the stat. SrsKbps* kbps = stat->kbps_sample(); - srs_update_rtmp_server((int)conns.size(), kbps); + srs_update_rtmp_server((int)conn_manager->size(), kbps); } srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) @@ -1505,7 +1505,7 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) srs_assert(conn); // directly enqueue, the cycle thread will remove the client. - conns.push_back(conn); + conn_manager->add(conn); // cycle will start process thread and when finished remove the client. // @remark never use the conn, for it maybe destroyed. @@ -1538,13 +1538,13 @@ srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpCon // check connection limitation. int max_connections = _srs_config->get_max_connections(); - if (handler && (err = handler->on_accept_client(max_connections, (int)conns.size())) != srs_success) { + if (handler && (err = handler->on_accept_client(max_connections, (int)conn_manager->size())) != srs_success) { return srs_error_wrap(err, "drop client fd=%d, ip=%s:%d, max=%d, cur=%d for err: %s", - fd, ip.c_str(), port, max_connections, (int)conns.size(), srs_error_desc(err).c_str()); + fd, ip.c_str(), port, max_connections, (int)conn_manager->size(), srs_error_desc(err).c_str()); } - if ((int)conns.size() >= max_connections) { + if ((int)conn_manager->size() >= max_connections) { return srs_error_new(ERROR_EXCEED_CONNECTIONS, "drop fd=%d, ip=%s:%d, max=%d, cur=%d for exceed connection limits", - fd, ip.c_str(), port, max_connections, (int)conns.size()); + fd, ip.c_str(), port, max_connections, (int)conn_manager->size()); } // avoid fd leak when fork. @@ -1578,18 +1578,7 @@ srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpCon void SrsServer::remove(ISrsConnection* c) { SrsTcpConnection* conn = dynamic_cast(c); - std::vector::iterator it = std::find(conns.begin(), conns.end(), conn); - - // removed by destroy, ignore. - if (it == conns.end()) { - srs_warn("server moved connection, ignore."); - return; - } - - conns.erase(it); - - srs_info("conn removed. conns=%d", (int)conns.size()); - + SrsStatistic* stat = SrsStatistic::instance(); stat->kbps_add_delta(conn); stat->on_disconnect(conn->srs_id()); diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 1c9fff035..7d8677bc8 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -256,8 +256,6 @@ private: // for the server never delete the file; when system startup, the pid in pid file // maybe valid but the process is not SRS, the init.d script will never start server. int pid_fd; - // All connections, connection manager - std::vector conns; // All listners, listener manager. std::vector listeners; // Signal manager which convert gignal to io message.