From 087559813fcdf1af1e9ad017466ea4270cc5832f Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 13 Apr 2020 09:13:12 +0800 Subject: [PATCH] For #307, use UDP sender binding to FD for RTC --- trunk/src/app/srs_app_listener.cpp | 21 +- trunk/src/app/srs_app_listener.hpp | 19 +- trunk/src/app/srs_app_rtc_conn.cpp | 364 ++++++++++++++++------------- trunk/src/app/srs_app_rtc_conn.hpp | 43 ++-- 4 files changed, 261 insertions(+), 186 deletions(-) diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 2508ae511..3187a3569 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -236,12 +236,21 @@ srs_error_t SrsTcpListener::cycle() return err; } -SrsUdpMuxSocket::SrsUdpMuxSocket(srs_netfd_t fd) +ISrsUdpSender::ISrsUdpSender() +{ +} + +ISrsUdpSender::~ISrsUdpSender() +{ +} + +SrsUdpMuxSocket::SrsUdpMuxSocket(ISrsUdpSender* h, srs_netfd_t fd) { nb_buf = SRS_UDP_MAX_PACKET_SIZE; buf = new char[nb_buf]; nread = 0; + handler = h; lfd = fd; fromlen = 0; @@ -254,7 +263,7 @@ SrsUdpMuxSocket::~SrsUdpMuxSocket() SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly() { - SrsUdpMuxSocket* sendonly = new SrsUdpMuxSocket(lfd); + SrsUdpMuxSocket* sendonly = new SrsUdpMuxSocket(handler, lfd); // Don't copy buffer srs_freepa(sendonly->buf); @@ -340,16 +349,18 @@ std::string SrsUdpMuxSocket::get_peer_id() return string(id_buf, len); } -SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p) +SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, ISrsUdpSender* s, std::string i, int p) { handler = h; + sender = s; + ip = i; port = p; lfd = NULL; nb_buf = SRS_UDP_MAX_PACKET_SIZE; buf = new char[nb_buf]; - + trd = new SrsDummyCoroutine(); } @@ -449,7 +460,7 @@ srs_error_t SrsUdpMuxListener::cycle() nn_loop++; - SrsUdpMuxSocket skt(lfd); + SrsUdpMuxSocket skt(sender, lfd); int nread = skt.recvfrom(SRS_UTIME_NO_TIMEOUT); if (nread <= 0) { diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index 78db998bd..8ef384dd9 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -131,9 +131,22 @@ public: virtual srs_error_t cycle(); }; +class ISrsUdpSender +{ +public: + ISrsUdpSender(); + virtual ~ISrsUdpSender(); +public: + // Fetch a mmsghdr from sender's cache. + virtual srs_error_t fetch(mmsghdr** pphdr) = 0; + // Notify the sender to send out the msg. + virtual srs_error_t sendmmsg(mmsghdr* hdr) = 0; +}; + class SrsUdpMuxSocket { private: + ISrsUdpSender* handler; char* buf; int nb_buf; int nread; @@ -143,7 +156,7 @@ private: std::string peer_ip; int peer_port; public: - SrsUdpMuxSocket(srs_netfd_t fd); + SrsUdpMuxSocket(ISrsUdpSender* h, srs_netfd_t fd); virtual ~SrsUdpMuxSocket(); int recvfrom(srs_utime_t timeout); @@ -160,6 +173,7 @@ public: std::string get_peer_id(); public: SrsUdpMuxSocket* copy_sendonly(); + ISrsUdpSender* sender() { return handler; }; private: // Don't allow copy, user copy_sendonly instead SrsUdpMuxSocket(const SrsUdpMuxSocket& rhs); @@ -170,6 +184,7 @@ class SrsUdpMuxListener : public ISrsCoroutineHandler { protected: srs_netfd_t lfd; + ISrsUdpSender* sender; SrsCoroutine* trd; protected: char* buf; @@ -179,7 +194,7 @@ protected: std::string ip; int port; public: - SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p); + SrsUdpMuxListener(ISrsUdpMuxHandler* h, ISrsUdpSender* s, std::string i, int p); virtual ~SrsUdpMuxListener(); public: virtual int fd(); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 3754ae927..c7c03cfd8 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -703,9 +703,14 @@ srs_error_t SrsRtcSenderThread::send_message2(SrsSharedPtrMessage* msg, bool is_ { srs_error_t err = srs_success; + ISrsUdpSender* sender = skt->sender(); + // Fetch a cached message from queue. // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. - mmsghdr* mhdr = rtc_session->rtc_server->fetch(); + mmsghdr* mhdr = NULL; + if ((err = sender->fetch(&mhdr)) != srs_success) { + return srs_error_wrap(err, "fetch msghdr"); + } char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base; // Length of iov, default size. @@ -731,7 +736,9 @@ srs_error_t SrsRtcSenderThread::send_message2(SrsSharedPtrMessage* msg, bool is_ mhdr->msg_hdr.msg_iov->iov_len = length; mhdr->msg_len = 0; - rtc_session->rtc_server->sendmmsg(skt->stfd(), mhdr); + if ((err = sender->sendmmsg(mhdr)) != srs_success) { + return srs_error_wrap(err, "send msghdr"); + } return err; } @@ -1330,9 +1337,10 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt) return err; } -SrsRtcServer::SrsRtcServer() +SrsUdpMuxSender::SrsUdpMuxSender(SrsRtcServer* s) { - timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS); + lfd = NULL; + server = s; waiting_msgs = false; cond = srs_cond_new(); @@ -1343,18 +1351,10 @@ SrsRtcServer::SrsRtcServer() _srs_config->subscribe(this); } -SrsRtcServer::~SrsRtcServer() +SrsUdpMuxSender::~SrsUdpMuxSender() { _srs_config->unsubscribe(this); - vector::iterator it; - for (it = listeners.begin(); it != listeners.end(); ++it) { - SrsUdpMuxListener* listener = *it; - srs_freep(listener); - } - - srs_freep(timer); - srs_freep(trd); srs_cond_destroy(cond); @@ -1365,6 +1365,187 @@ SrsRtcServer::~SrsRtcServer() cache.clear(); } +srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd) +{ + srs_error_t err = srs_success; + + lfd = fd; + + srs_freep(trd); + trd = new SrsSTCoroutine("udp", this); + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "start coroutine"); + } + + max_sendmmsg = _srs_config->get_rtc_server_sendmmsg(); + srs_trace("UDP sender #%d init ok, max_sendmmsg=%d", srs_netfd_fileno(fd), max_sendmmsg); + + return err; +} + +void SrsUdpMuxSender::free_mhdrs(std::vector& mhdrs) +{ + for (int i = 0; i < (int)mhdrs.size(); i++) { + mmsghdr* hdr = &mhdrs[i]; + + for (int j = (int)hdr->msg_hdr.msg_iovlen - 1; j >= 0 ; j--) { + iovec* iov = hdr->msg_hdr.msg_iov + j; + char* data = (char*)iov->iov_base; + srs_freep(data); + srs_freep(iov); + } + } +} + +srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr) +{ + // TODO: FIXME: Maybe need to shrink? + if (cache_pos >= (int)cache.size()) { + mmsghdr mhdr; + memset(&mhdr, 0, sizeof(mmsghdr)); + + mhdr.msg_hdr.msg_iovlen = 1; + mhdr.msg_hdr.msg_iov = new iovec(); + mhdr.msg_hdr.msg_iov->iov_base = new char[kRtpPacketSize]; + mhdr.msg_hdr.msg_iov->iov_len = kRtpPacketSize; + mhdr.msg_len = 0; + + cache.push_back(mhdr); + } + + *pphdr = &cache[cache_pos++]; + return srs_success; +} + +srs_error_t SrsUdpMuxSender::sendmmsg(mmsghdr* hdr) +{ + if (waiting_msgs) { + waiting_msgs = false; + srs_cond_signal(cond); + } + + return srs_success; +} + +srs_error_t SrsUdpMuxSender::cycle() +{ + srs_error_t err = srs_success; + + uint64_t nn_msgs = 0; + uint64_t nn_msgs_last = 0; + int nn_msgs_max = 0; + int nn_loop = 0; + int nn_wait = 0; + srs_utime_t time_last = srs_get_system_time(); + SrsStatistic* stat = SrsStatistic::instance(); + + SrsPithyPrint* pprint = SrsPithyPrint::create_rtc_send(); + SrsAutoFree(SrsPithyPrint, pprint); + + while (true) { + if ((err = trd->pull()) != srs_success) { + return err; + } + + nn_loop++; + + int pos = cache_pos; + if (pos <= 0) { + waiting_msgs = true; + nn_wait++; + srs_cond_wait(cond); + continue; + } + + // We are working on hotspot now. + cache.swap(hotspot); + cache_pos = 0; + + mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos; + for (; p < end; p += max_sendmmsg) { + int vlen = (int)(end - p); + vlen = srs_min(max_sendmmsg, vlen); + + int r0 = srs_sendmmsg(lfd, p, (unsigned int)vlen, 0, SRS_UTIME_NO_TIMEOUT); + if (r0 != vlen) { + srs_warn("sendmsg %d msgs, %d done", vlen, r0); + } + + stat->perf_mw_on_packets(vlen); + } + + // Increase total messages. + nn_msgs += pos; + nn_msgs_max = srs_max(pos, nn_msgs_max); + + pprint->elapse(); + if (pprint->can_print()) { + // TODO: FIXME: Extract a PPS calculator. + int pps_average = 0; int pps_last = 0; + if (true) { + if (srs_get_system_time() > srs_get_system_startup_time()) { + pps_average = (int)(nn_msgs * SRS_UTIME_SECONDS / (srs_get_system_time() - srs_get_system_startup_time())); + } + if (srs_get_system_time() > time_last) { + pps_last = (int)((nn_msgs - nn_msgs_last) * SRS_UTIME_SECONDS / (srs_get_system_time() - time_last)); + } + } + + string pps_unit = ""; + if (pps_last > 10000 || pps_average > 10000) { + pps_unit = "(w)"; pps_last /= 10000; pps_average /= 10000; + } else if (pps_last > 1000 || pps_average > 1000) { + pps_unit = "(k)"; pps_last /= 1000; pps_average /= 1000; + } + + srs_trace("-> RTC #%d SEND %d/%d/%" PRId64 ", pps %d/%d%s, schedule %d/%d, sessions %d, cache %d/%d by sendmmsg %d", + srs_netfd_fileno(lfd), pos, nn_msgs_max, nn_msgs, pps_average, pps_last, pps_unit.c_str(), nn_loop, nn_wait, + (int)server->nn_sessions(), (int)cache.size(), (int)hotspot.size(), max_sendmmsg); + nn_msgs_last = nn_msgs; time_last = srs_get_system_time(); + nn_loop = nn_wait = nn_msgs_max = 0; + } + } + + return err; +} + +srs_error_t SrsUdpMuxSender::on_reload_rtc_server() +{ + int v = _srs_config->get_rtc_server_sendmmsg(); + if (max_sendmmsg != v) { + max_sendmmsg = v; + srs_trace("Reload max_sendmmsg=%d", max_sendmmsg); + } + + return srs_success; +} + +SrsRtcServer::SrsRtcServer() +{ + timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS); +} + +SrsRtcServer::~SrsRtcServer() +{ + srs_freep(timer); + + if (true) { + vector::iterator it; + for (it = listeners.begin(); it != listeners.end(); ++it) { + SrsUdpMuxListener* listener = *it; + srs_freep(listener); + } + } + + if (true) { + vector::iterator it; + for (it = senders.begin(); it != senders.end(); ++it) { + SrsUdpMuxSender* sender = *it; + srs_freep(sender); + } + } +} + srs_error_t SrsRtcServer::initialize() { srs_error_t err = srs_success; @@ -1377,14 +1558,7 @@ srs_error_t SrsRtcServer::initialize() return srs_error_wrap(err, "start timer"); } - srs_freep(trd); - trd = new SrsSTCoroutine("udp", this); - if ((err = trd->start()) != srs_success) { - return srs_error_wrap(err, "start coroutine"); - } - - max_sendmmsg = _srs_config->get_rtc_server_sendmmsg(); - srs_trace("RTC server init ok, max_sendmmsg=%d", max_sendmmsg); + srs_trace("RTC server init ok"); return err; } @@ -1407,18 +1581,21 @@ srs_error_t SrsRtcServer::listen_udp() int nn_listeners = _srs_config->get_rtc_server_reuseport(); for (int i = 0; i < nn_listeners; i++) { - SrsUdpMuxListener* listener = new SrsUdpMuxListener(this, ip, port); + SrsUdpMuxSender* sender = new SrsUdpMuxSender(this); + SrsUdpMuxListener* listener = new SrsUdpMuxListener(this, sender, ip, port); if ((err = listener->listen()) != srs_success) { srs_freep(listener); return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); } - // We will use all FDs to sendmmsg. - stfds.push_back(listener->stfd()); + if ((err = sender->initialize(listener->stfd())) != srs_success) { + return srs_error_wrap(err, "init sender"); + } srs_trace("rtc listen at udp://%s:%d, fd=%d", ip.c_str(), port, listener->fd()); listeners.push_back(listener); + senders.push_back(sender); } return err; @@ -1612,145 +1789,6 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic return srs_success; } -srs_error_t SrsRtcServer::on_reload_rtc_server() -{ - int v = _srs_config->get_rtc_server_sendmmsg(); - if (max_sendmmsg != v) { - max_sendmmsg = v; - srs_trace("Reload max_sendmmsg=%d", max_sendmmsg); - } - - return srs_success; -} - -mmsghdr* SrsRtcServer::fetch() -{ - // TODO: FIXME: Maybe need to shrink? - if (cache_pos >= (int)cache.size()) { - mmsghdr mhdr; - memset(&mhdr, 0, sizeof(mmsghdr)); - - mhdr.msg_hdr.msg_iovlen = 1; - mhdr.msg_hdr.msg_iov = new iovec(); - mhdr.msg_hdr.msg_iov->iov_base = new char[kRtpPacketSize]; - mhdr.msg_hdr.msg_iov->iov_len = kRtpPacketSize; - mhdr.msg_len = 0; - - cache.push_back(mhdr); - } - - return &cache[cache_pos++]; -} - -void SrsRtcServer::sendmmsg(srs_netfd_t stfd, mmsghdr* /*hdr*/) -{ - if (waiting_msgs) { - waiting_msgs = false; - srs_cond_signal(cond); - } -} - -void SrsRtcServer::free_mhdrs(std::vector& mhdrs) -{ - for (int i = 0; i < (int)mhdrs.size(); i++) { - mmsghdr* hdr = &mhdrs[i]; - - for (int j = (int)hdr->msg_hdr.msg_iovlen - 1; j >= 0 ; j--) { - iovec* iov = hdr->msg_hdr.msg_iov + j; - char* data = (char*)iov->iov_base; - srs_freep(data); - srs_freep(iov); - } - } -} - -srs_error_t SrsRtcServer::cycle() -{ - srs_error_t err = srs_success; - - uint64_t nn_msgs = 0; - uint64_t nn_msgs_last = 0; - int nn_msgs_max = 0; - int nn_loop = 0; - int nn_wait = 0; - srs_utime_t time_last = srs_get_system_time(); - SrsStatistic* stat = SrsStatistic::instance(); - - // We use FDs to send out messages, by round-trip algorithm. - uint32_t fd_index = 0; - - SrsPithyPrint* pprint = SrsPithyPrint::create_rtc_send(); - SrsAutoFree(SrsPithyPrint, pprint); - - while (true) { - if ((err = trd->pull()) != srs_success) { - return err; - } - - nn_loop++; - - int pos = cache_pos; - if (pos <= 0) { - waiting_msgs = true; - nn_wait++; - srs_cond_wait(cond); - continue; - } - - // We are working on hotspot now. - cache.swap(hotspot); - cache_pos = 0; - - srs_netfd_t stfd = NULL; - mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos; - for (; p < end; p += max_sendmmsg) { - int vlen = (int)(end - p); - vlen = srs_min(max_sendmmsg, vlen); - stfd = stfds.at((fd_index++) % stfds.size()); - - int r0 = srs_sendmmsg(stfd, p, (unsigned int)vlen, 0, SRS_UTIME_NO_TIMEOUT); - if (r0 != vlen) { - srs_warn("sendmsg %d msgs, %d done", vlen, r0); - } - - stat->perf_mw_on_packets(vlen); - } - - // Increase total messages. - nn_msgs += pos; - nn_msgs_max = srs_max(pos, nn_msgs_max); - - pprint->elapse(); - if (pprint->can_print()) { - // TODO: FIXME: Extract a PPS calculator. - int pps_average = 0; int pps_last = 0; - if (true) { - if (srs_get_system_time() > srs_get_system_startup_time()) { - pps_average = (int)(nn_msgs * SRS_UTIME_SECONDS / (srs_get_system_time() - srs_get_system_startup_time())); - } - if (srs_get_system_time() > time_last) { - pps_last = (int)((nn_msgs - nn_msgs_last) * SRS_UTIME_SECONDS / (srs_get_system_time() - time_last)); - } - } - - string pps_unit = ""; - if (pps_last > 10000 || pps_average > 10000) { - pps_unit = "(w)"; pps_last /= 10000; pps_average /= 10000; - } else if (pps_last > 1000 || pps_average > 1000) { - pps_unit = "(k)"; pps_last /= 1000; pps_average /= 1000; - } - - srs_trace("-> RTC #%d SEND %d/%d/%" PRId64 ", pps %d/%d%s, schedule %d/%d, sessions %d, cache %d/%d by sendmmsg %d", - srs_netfd_fileno(stfd), pos, nn_msgs_max, nn_msgs, pps_average, pps_last, pps_unit.c_str(), nn_loop, nn_wait, - (int)map_username_session.size(), (int)cache.size(), (int)hotspot.size(), max_sendmmsg); - nn_msgs_last = nn_msgs; time_last = srs_get_system_time(); - nn_loop = nn_wait = nn_msgs_max = 0; - } - } - - return err; -} - RtcServerAdapter::RtcServerAdapter() { rtc = new SrsRtcServer(); diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 9eb518018..1ecfd5998 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -229,18 +229,15 @@ private: srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt); }; -class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass, - virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler +class SrsUdpMuxSender : virtual public ISrsUdpSender, virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler { private: - std::vector listeners; - SrsHourGlass* timer; -private: + srs_netfd_t lfd; + SrsRtcServer* server; SrsCoroutine* trd; +private: srs_cond_t cond; bool waiting_msgs; - // TODO: FIXME: Support multiple stfd. - std::vector stfds; // Hotspot msgs, we are working on it. // @remark We will wait util all messages are ready. std::vector hotspot; @@ -249,6 +246,28 @@ private: int cache_pos; // The max number of messages for sendmmsg. If 1, we use sendmsg to send. int max_sendmmsg; +public: + SrsUdpMuxSender(SrsRtcServer* s); + virtual ~SrsUdpMuxSender(); +public: + virtual srs_error_t initialize(srs_netfd_t fd); +private: + void free_mhdrs(std::vector& mhdrs); +public: + virtual srs_error_t fetch(mmsghdr** pphdr); + virtual srs_error_t sendmmsg(mmsghdr* hdr); + virtual srs_error_t cycle(); +// interface ISrsReloadHandler +public: + virtual srs_error_t on_reload_rtc_server(); +}; + +class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass +{ +private: + SrsHourGlass* timer; + std::vector listeners; + std::vector senders; private: std::map map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) std::map map_id_session; // key: peerip(ip + ":" + port) @@ -267,6 +286,7 @@ public: SrsRtcSession* create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip); bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session); void check_and_clean_timeout_session(); + int nn_sessions() { return (int)map_username_session.size(); } private: srs_error_t on_stun(SrsUdpMuxSocket* skt); srs_error_t on_dtls(SrsUdpMuxSocket* skt); @@ -277,15 +297,6 @@ private: // interface ISrsHourGlass public: virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); -// interface ISrsReloadHandler -public: - virtual srs_error_t on_reload_rtc_server(); -// Internal only. -public: - mmsghdr* fetch(); - void sendmmsg(srs_netfd_t stfd, mmsghdr* hdr); - void free_mhdrs(std::vector& mhdrs); - virtual srs_error_t cycle(); }; // The RTC server adapter.