From f03bf601e42fece1ec650072e357e9d245a57f4d Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 21 May 2020 14:22:45 +0800 Subject: [PATCH] RTC: Directly send without sendmmsg --- trunk/src/app/srs_app_http_api.cpp | 11 +- trunk/src/app/srs_app_listener.cpp | 23 +- trunk/src/app/srs_app_listener.hpp | 24 +-- trunk/src/app/srs_app_rtc_conn.cpp | 52 +---- trunk/src/app/srs_app_rtc_conn.hpp | 5 - trunk/src/app/srs_app_rtc_server.cpp | 268 +----------------------- trunk/src/app/srs_app_rtc_server.hpp | 45 ---- trunk/src/app/srs_app_statistic.cpp | 27 --- trunk/src/app/srs_app_statistic.hpp | 6 - trunk/src/core/srs_core_performance.hpp | 3 - 10 files changed, 15 insertions(+), 449 deletions(-) diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 62ab98791..1a58a9aab 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1335,7 +1335,7 @@ srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* p->set("target", SrsJsonAny::str(target.c_str())); p->set("reset", SrsJsonAny::str(reset.c_str())); - p->set("help", SrsJsonAny::str("?target=avframes|rtc|rtp|writev_iovs|sendmmsg|bytes|dropped")); + p->set("help", SrsJsonAny::str("?target=avframes|rtc|rtp|writev_iovs|sendmmsg|bytes")); p->set("help2", SrsJsonAny::str("?reset=all")); } @@ -1398,15 +1398,6 @@ srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* } } - if (target.empty() || target == "dropped") { - SrsJsonObject* p = SrsJsonAny::object(); - data->set("dropped", p); - if ((err = stat->dumps_perf_dropped(p)) != srs_success) { - int code = srs_error_code(err); srs_error_reset(err); - return srs_api_response_code(w, r, code); - } - } - return srs_api_response(w, r, obj->dumps()); } diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 0fa26609d..3f190418d 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -236,21 +236,12 @@ srs_error_t SrsTcpListener::cycle() return err; } -ISrsUdpSender::ISrsUdpSender() -{ -} - -ISrsUdpSender::~ISrsUdpSender() -{ -} - -SrsUdpMuxSocket::SrsUdpMuxSocket(ISrsUdpSender* h, srs_netfd_t fd) +SrsUdpMuxSocket::SrsUdpMuxSocket(srs_netfd_t fd) { nb_buf = SRS_UDP_MAX_PACKET_SIZE; buf = new char[nb_buf]; nread = 0; - handler = h; lfd = fd; fromlen = 0; @@ -354,7 +345,7 @@ std::string SrsUdpMuxSocket::peer_id() SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly() { - SrsUdpMuxSocket* sendonly = new SrsUdpMuxSocket(handler, lfd); + SrsUdpMuxSocket* sendonly = new SrsUdpMuxSocket(lfd); // Don't copy buffer srs_freepa(sendonly->buf); @@ -369,15 +360,9 @@ SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly() return sendonly; } -ISrsUdpSender* SrsUdpMuxSocket::sender() -{ - return handler; -} - -SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, ISrsUdpSender* s, std::string i, int p) +SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p) { handler = h; - sender = s; ip = i; port = p; @@ -489,7 +474,7 @@ srs_error_t SrsUdpMuxListener::cycle() // Because we have to decrypt the cipher of received packet payload, // and the size is not determined, so we think there is at least one copy, // and we can reuse the plaintext h264/opus with players when got plaintext. - SrsUdpMuxSocket skt(sender, lfd); + SrsUdpMuxSocket skt(lfd); int nread = skt.recvfrom(SRS_UTIME_NO_TIMEOUT); if (nread <= 0) { diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index 30d2058ac..672ed01b8 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -131,27 +131,9 @@ public: virtual srs_error_t cycle(); }; -class ISrsUdpSender -{ -public: - ISrsUdpSender(); - virtual ~ISrsUdpSender(); -public: - // Fetch a mmsghdr from sender's cache. - virtual srs_error_t fetch(srs_mmsghdr** pphdr) = 0; - // Notify the sender to send out the msg. - virtual srs_error_t sendmmsg(srs_mmsghdr* hdr) = 0; - // Whether sender exceed the max queue, that is, overflow. - virtual bool overflow() = 0; - // Set the queue extra ratio, for example, when mw_msgs > 0, we need larger queue. - // For r, 100 means x1, 200 means x2. - virtual void set_extra_ratio(int r) = 0; -}; - class SrsUdpMuxSocket { private: - ISrsUdpSender* handler; char* buf; int nb_buf; int nread; @@ -161,7 +143,7 @@ private: std::string peer_ip; int peer_port; public: - SrsUdpMuxSocket(ISrsUdpSender* h, srs_netfd_t fd); + SrsUdpMuxSocket(srs_netfd_t fd); virtual ~SrsUdpMuxSocket(); public: int recvfrom(srs_utime_t timeout); @@ -175,14 +157,12 @@ public: int get_peer_port() const; std::string peer_id(); SrsUdpMuxSocket* copy_sendonly(); - ISrsUdpSender* sender(); }; class SrsUdpMuxListener : public ISrsCoroutineHandler { protected: srs_netfd_t lfd; - ISrsUdpSender* sender; SrsCoroutine* trd; protected: char* buf; @@ -192,7 +172,7 @@ protected: std::string ip; int port; public: - SrsUdpMuxListener(ISrsUdpMuxHandler* h, ISrsUdpSender* s, std::string i, int p); + SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p); virtual ~SrsUdpMuxListener(); public: virtual int fd(); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index fba949077..918b585d4 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -481,7 +481,6 @@ SrsRtcOutgoingInfo::SrsRtcOutgoingInfo() nn_videos = nn_samples = 0; nn_bytes = nn_rtp_bytes = 0; nn_padding_bytes = nn_paddings = 0; - nn_dropped = 0; } SrsRtcOutgoingInfo::~SrsRtcOutgoingInfo() @@ -627,14 +626,6 @@ srs_error_t SrsRtcPlayer::cycle() realtime = _srs_config->get_realtime_enabled(req->vhost, true); mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true); - // We merged write more messages, so we need larger queue. - ISrsUdpSender* sender = session_->sendonly_skt->sender(); - if (mw_msgs > 2) { - sender->set_extra_ratio(150); - } else if (mw_msgs > 0) { - sender->set_extra_ratio(80); - } - srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_msgs=%d", req->get_stream_url().c_str(), ::getpid(), source->source_id(), session_->encrypt, realtime, mw_msgs); @@ -689,14 +680,12 @@ srs_error_t SrsRtcPlayer::cycle() stat->perf_on_rtp_packets(msg_count); // Stat the bytes and paddings. stat->perf_on_rtc_bytes(info.nn_bytes, info.nn_rtp_bytes, info.nn_padding_bytes); - // Stat the messages and dropped count. - stat->perf_on_dropped(msg_count, nn_rtc_packets, info.nn_dropped); pprint->elapse(); if (pprint->can_print()) { // TODO: FIXME: Print stat like frame/s, packet/s, loss_packets. - srs_trace("-> RTC PLAY %d/%d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d/%d/%d bytes, %d pad, %d/%d cache", - msg_count, info.nn_dropped, msg_count, info.nn_rtp_pkts, info.nn_audios, info.nn_extras, info.nn_videos, info.nn_samples, info.nn_bytes, + srs_trace("-> RTC PLAY %d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d/%d/%d bytes, %d pad, %d/%d cache", + msg_count, msg_count, info.nn_rtp_pkts, info.nn_audios, info.nn_extras, info.nn_videos, info.nn_samples, info.nn_bytes, info.nn_rtp_bytes, info.nn_padding_bytes, info.nn_paddings, msg_count, msg_count); } } @@ -728,17 +717,9 @@ srs_error_t SrsRtcPlayer::messages_to_packets(SrsRtcSource* source, const vector { srs_error_t err = srs_success; - ISrsUdpSender* sender = session_->sendonly_skt->sender(); - for (int i = 0; i < (int)pkts.size(); i++) { SrsRtpPacket2* pkt = pkts[i]; - // If overflow, drop all messages. - if (sender->overflow()) { - info.nn_dropped += (int)pkts.size() - i; - return err; - } - // Update stats. info.nn_bytes += pkt->nb_bytes(); @@ -778,25 +759,15 @@ srs_error_t SrsRtcPlayer::send_packets(const std::vector& pkts, // Cache the encrypt flag and sender. bool encrypt = session_->encrypt; - ISrsUdpSender* sender = session_->sendonly_skt->sender(); for (int i = 0; i < (int)pkts.size(); i++) { SrsRtpPacket2* pkt = pkts.at(i); - // Fetch a cached message from queue. - // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. - srs_mmsghdr* mhdr = NULL; - if ((err = sender->fetch(&mhdr)) != srs_success) { - return srs_error_wrap(err, "fetch msghdr"); - } - // For this message, select the first iovec. - iovec* iov = mhdr->msg_hdr.msg_iov; - mhdr->msg_hdr.msg_iovlen = 1; + iovec* iov = new iovec(); + SrsAutoFree(iovec, iov); - if (!iov->iov_base) { - iov->iov_base = new char[kRtpPacketSize]; - } + iov->iov_base = new char[kRtpPacketSize]; iov->iov_len = kRtpPacketSize; // Marshal packet to bytes in iovec. @@ -839,14 +810,6 @@ srs_error_t SrsRtcPlayer::send_packets(const std::vector& pkts, info.nn_rtp_bytes += (int)iov->iov_len; - // Set the address and control information. - sockaddr_in* addr = (sockaddr_in*)session_->sendonly_skt->peer_addr(); - socklen_t addrlen = (socklen_t)session_->sendonly_skt->peer_addrlen(); - - mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; - mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; - mhdr->msg_hdr.msg_controllen = 0; - // When we send out a packet, increase the stat counter. info.nn_rtp_pkts++; @@ -857,9 +820,8 @@ srs_error_t SrsRtcPlayer::send_packets(const std::vector& pkts, continue; } - if ((err = sender->sendmmsg(mhdr)) != srs_success) { - return srs_error_wrap(err, "send msghdr"); - } + // TODO: FIXME: Handle error. + session_->sendonly_skt->sendto(iov->iov_base, iov->iov_len, 0); } return err; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index ba69ef6c7..3ad21c175 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -53,8 +53,6 @@ class SrsRtcSession; class SrsSharedPtrMessage; class SrsRtcSource; class SrsRtpPacket2; -class ISrsUdpSender; -class SrsRtpPacket2; class ISrsCodec; class SrsRtpNackForReceiver; class SrsRtpIncommingVideoFrame; @@ -181,8 +179,6 @@ public: int nn_videos; // The number of padded packet. int nn_paddings; - // The number of dropped messages. - int nn_dropped; public: SrsRtcOutgoingInfo(); virtual ~SrsRtcOutgoingInfo(); @@ -212,7 +208,6 @@ private: int nn_simulate_nack_drop; private: // For merged-write and GSO. - bool gso; int max_padding; // For merged-write messages. int mw_msgs; diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 89f004a07..3457a2698 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -124,258 +124,6 @@ static std::vector get_candidate_ips() return candidate_ips; } -SrsUdpMuxSender::SrsUdpMuxSender(SrsRtcServer* s) -{ - lfd = NULL; - server = s; - - waiting_msgs = false; - cond = srs_cond_new(); - trd = new SrsDummyCoroutine(); - - cache_pos = 0; - max_sendmmsg = 0; - queue_length = 0; - extra_ratio = 0; - extra_queue = 0; - nn_senders = 0; - - _srs_config->subscribe(this); -} - -SrsUdpMuxSender::~SrsUdpMuxSender() -{ - _srs_config->unsubscribe(this); - - srs_freep(trd); - srs_cond_destroy(cond); - - free_mhdrs(hotspot); - hotspot.clear(); - - free_mhdrs(cache); - cache.clear(); -} - -srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd, int senders) -{ - srs_error_t err = srs_success; - - lfd = fd; - - srs_freep(trd); - trd = new SrsSTCoroutine("udp", this); - if ((err = trd->start()) != srs_success) { - return srs_error_wrap(err, "start coroutine"); - } - - max_sendmmsg = _srs_config->get_rtc_server_sendmmsg(); - queue_length = srs_max(128, _srs_config->get_rtc_server_queue_length()); - nn_senders = senders; - - srs_trace("RTC sender #%d init ok, max_sendmmsg=%d, queue_max=%dx%d, extra_ratio=%d/%d", srs_netfd_fileno(fd), - max_sendmmsg, queue_length, nn_senders, extra_ratio, extra_queue); - - return err; -} - -void SrsUdpMuxSender::free_mhdrs(std::vector& mhdrs) -{ - int nn_mhdrs = (int)mhdrs.size(); - for (int i = 0; i < nn_mhdrs; i++) { - // @see https://linux.die.net/man/2/sendmmsg - // @see https://linux.die.net/man/2/sendmsg - srs_mmsghdr* hdr = &mhdrs[i]; - - // Free control for GSO. - char* msg_control = (char*)hdr->msg_hdr.msg_control; - srs_freepa(msg_control); - - // Free iovec. - for (int j = SRS_PERF_RTC_GSO_MAX - 1; j >= 0 ; j--) { - iovec* iov = hdr->msg_hdr.msg_iov + j; - char* data = (char*)iov->iov_base; - srs_freepa(data); - srs_freepa(iov); - } - } - mhdrs.clear(); -} - -srs_error_t SrsUdpMuxSender::fetch(srs_mmsghdr** pphdr) -{ - // TODO: FIXME: Maybe need to shrink? - if (cache_pos >= (int)cache.size()) { - // @see https://linux.die.net/man/2/sendmmsg - // @see https://linux.die.net/man/2/sendmsg - srs_mmsghdr mhdr; - - mhdr.msg_len = 0; - mhdr.msg_hdr.msg_flags = 0; - mhdr.msg_hdr.msg_control = NULL; - - mhdr.msg_hdr.msg_iovlen = SRS_PERF_RTC_GSO_MAX; - mhdr.msg_hdr.msg_iov = new iovec[mhdr.msg_hdr.msg_iovlen]; - memset((void*)mhdr.msg_hdr.msg_iov, 0, sizeof(iovec) * mhdr.msg_hdr.msg_iovlen); - - for (int i = 0; i < SRS_PERF_RTC_GSO_IOVS; i++) { - iovec* p = mhdr.msg_hdr.msg_iov + i; - p->iov_base = new char[kRtpPacketSize]; - } - - cache.push_back(mhdr); - } - - *pphdr = &cache[cache_pos++]; - return srs_success; -} - -bool SrsUdpMuxSender::overflow() -{ - return cache_pos > queue_length + extra_queue; -} - -void SrsUdpMuxSender::set_extra_ratio(int r) -{ - // We use the larger extra ratio, because all vhosts shares the senders. - if (extra_ratio > r) { - return; - } - - extra_ratio = r; - extra_queue = queue_length * r / 100; - - srs_trace("RTC sender #%d extra queue, max_sendmmsg=%d, queue_max=%dx%d, extra_ratio=%d/%d, cache=%d/%d/%d", srs_netfd_fileno(lfd), - max_sendmmsg, queue_length, nn_senders, extra_ratio, extra_queue, cache_pos, (int)cache.size(), (int)hotspot.size()); -} - -srs_error_t SrsUdpMuxSender::sendmmsg(srs_mmsghdr* hdr) -{ - if (waiting_msgs) { - waiting_msgs = false; - srs_cond_signal(cond); - } - - return srs_success; -} - -srs_error_t SrsUdpMuxSender::cycle() -{ - srs_error_t err = srs_success; - - uint64_t nn_msgs = 0; uint64_t nn_msgs_last = 0; int nn_msgs_max = 0; - uint64_t nn_bytes = 0; int nn_bytes_max = 0; - int nn_loop = 0; int nn_wait = 0; - srs_utime_t time_last = srs_get_system_time(); - - bool stat_enabled = _srs_config->get_rtc_server_perf_stat(); - SrsStatistic* stat = SrsStatistic::instance(); - - SrsPithyPrint* pprint = SrsPithyPrint::create_rtc_send(srs_netfd_fileno(lfd)); - SrsAutoFree(SrsPithyPrint, pprint); - - while (true) { - if ((err = trd->pull()) != srs_success) { - return err; - } - - nn_loop++; - - int pos = cache_pos; - if (pos <= 0) { - waiting_msgs = true; - nn_wait++; - srs_cond_wait(cond); - continue; - } - - // We are working on hotspot now. - cache.swap(hotspot); - cache_pos = 0; - - int nn_writen = 0; - if (pos > 0) { - // Send out all messages. - // @see https://linux.die.net/man/2/sendmmsg - // @see https://linux.die.net/man/2/sendmsg - srs_mmsghdr* p = &hotspot[0]; srs_mmsghdr* end = p + pos; - for (p = &hotspot[0]; p < end; p += max_sendmmsg) { - int vlen = (int)(end - p); - vlen = srs_min(max_sendmmsg, vlen); - - int r0 = srs_sendmmsg(lfd, p, (unsigned int)vlen, 0, SRS_UTIME_NO_TIMEOUT); - if (r0 != vlen) { - srs_warn("sendmmsg %d msgs, %d done", vlen, r0); - } - - if (stat_enabled) { - stat->perf_on_sendmmsg_packets(vlen); - } - } - } - - if (!stat_enabled) { - continue; - } - - // Increase total messages. - nn_msgs += pos; - nn_msgs_max = srs_max(pos, nn_msgs_max); - nn_bytes += nn_writen; - nn_bytes_max = srs_max(nn_bytes_max, nn_writen); - - pprint->elapse(); - if (pprint->can_print()) { - // TODO: FIXME: Extract a PPS calculator. - int pps_average = 0; int pps_last = 0; - if (true) { - if (srs_get_system_time() > srs_get_system_startup_time()) { - pps_average = (int)(nn_msgs * SRS_UTIME_SECONDS / (srs_get_system_time() - srs_get_system_startup_time())); - } - if (srs_get_system_time() > time_last) { - pps_last = (int)((nn_msgs - nn_msgs_last) * SRS_UTIME_SECONDS / (srs_get_system_time() - time_last)); - } - } - - string pps_unit = ""; - if (pps_last > 10000 || pps_average > 10000) { - pps_unit = "(w)"; pps_last /= 10000; pps_average /= 10000; - } else if (pps_last > 1000 || pps_average > 1000) { - pps_unit = "(k)"; pps_last /= 1000; pps_average /= 1000; - } - - int nn_cache = 0; - int nn_hotspot_size = (int)hotspot.size(); - for (int i = 0; i < nn_hotspot_size; i++) { - srs_mmsghdr* hdr = &hotspot[i]; - nn_cache += hdr->msg_hdr.msg_iovlen; - } - - srs_trace("-> RTC SEND #%d, sessions %d, udp %d/%d/%" PRId64 ", pps %d/%d%s, cache %d/%d, bytes %d/%" PRId64, - srs_netfd_fileno(lfd), (int)server->nn_sessions(), pos, nn_msgs_max, nn_msgs, pps_average, pps_last, pps_unit.c_str(), - (int)hotspot.size(), nn_cache, nn_bytes_max, nn_bytes); - nn_msgs_last = nn_msgs; time_last = srs_get_system_time(); - nn_loop = nn_wait = nn_msgs_max = 0; - nn_bytes_max = 0; - } - } - - return err; -} - -srs_error_t SrsUdpMuxSender::on_reload_rtc_server() -{ - if (true) { - int v = _srs_config->get_rtc_server_sendmmsg(); - if (max_sendmmsg != v) { - srs_trace("Reload max_sendmmsg %d=>%d", max_sendmmsg, v); - max_sendmmsg = v; - } - } - - return srs_success; -} - ISrsRtcServerHandler::ISrsRtcServerHandler() { } @@ -402,14 +150,6 @@ SrsRtcServer::~SrsRtcServer() } } - if (true) { - vector::iterator it; - for (it = senders.begin(); it != senders.end(); ++it) { - SrsUdpMuxSender* sender = *it; - srs_freep(sender); - } - } - if (true) { std::vector::iterator it; for (it = zombies_.begin(); it != zombies_.end(); ++it) { @@ -459,21 +199,15 @@ srs_error_t SrsRtcServer::listen_udp() int nn_listeners = _srs_config->get_rtc_server_reuseport(); for (int i = 0; i < nn_listeners; i++) { - SrsUdpMuxSender* sender = new SrsUdpMuxSender(this); - SrsUdpMuxListener* listener = new SrsUdpMuxListener(this, sender, ip, port); + SrsUdpMuxListener* listener = new SrsUdpMuxListener(this, ip, port); if ((err = listener->listen()) != srs_success) { srs_freep(listener); return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); } - if ((err = sender->initialize(listener->stfd(), nn_listeners)) != srs_success) { - return srs_error_wrap(err, "init sender"); - } - srs_trace("rtc listen at udp://%s:%d, fd=%d", ip.c_str(), port, listener->fd()); listeners.push_back(listener); - senders.push_back(sender); } return err; diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index 409755d84..164971710 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -40,50 +40,6 @@ class SrsRtcSession; class SrsRequest; class SrsSdp; -class SrsUdpMuxSender : virtual public ISrsUdpSender, virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler -{ -private: - srs_netfd_t lfd; - SrsRtcServer* server; - SrsCoroutine* trd; -private: - srs_cond_t cond; - bool waiting_msgs; - bool gso; - int nn_senders; -private: - // Hotspot msgs, we are working on it. - // @remark We will wait util all messages are ready. - std::vector hotspot; - // Cache msgs, for other coroutines to fill it. - std::vector cache; - int cache_pos; - // The max number of messages for sendmmsg. If 1, we use sendmsg to send. - int max_sendmmsg; - // The total queue length, for each sender. - int queue_length; - // The extra queue ratio. - int extra_ratio; - int extra_queue; -public: - SrsUdpMuxSender(SrsRtcServer* s); - virtual ~SrsUdpMuxSender(); -public: - virtual srs_error_t initialize(srs_netfd_t fd, int senders); -private: - void free_mhdrs(std::vector& mhdrs); -public: - virtual srs_error_t fetch(srs_mmsghdr** pphdr); - virtual srs_error_t sendmmsg(srs_mmsghdr* hdr); - virtual bool overflow(); - virtual void set_extra_ratio(int r); -public: - virtual srs_error_t cycle(); -// interface ISrsReloadHandler -public: - virtual srs_error_t on_reload_rtc_server(); -}; - class ISrsRtcServerHandler { public: @@ -99,7 +55,6 @@ class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGl private: SrsHourGlass* timer; std::vector listeners; - std::vector senders; ISrsRtcServerHandler* handler; private: std::map map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index 30c54bbc9..35c220166 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -271,7 +271,6 @@ SrsStatistic::SrsStatistic() perf_rtp = new SrsStatisticCategory(); perf_rtc = new SrsStatisticCategory(); perf_bytes = new SrsStatisticCategory(); - perf_dropped = new SrsStatisticCategory(); } SrsStatistic::~SrsStatistic() @@ -312,7 +311,6 @@ SrsStatistic::~SrsStatistic() srs_freep(perf_rtp); srs_freep(perf_rtc); srs_freep(perf_bytes); - srs_freep(perf_dropped); } SrsStatistic* SrsStatistic::instance() @@ -666,29 +664,6 @@ srs_error_t SrsStatistic::dumps_perf_bytes(SrsJsonObject* obj) return srs_success; } -void SrsStatistic::perf_on_dropped(int nn_msgs, int nn_rtc, int nn_dropped) -{ - // a: System AVFrames. - // b: RTC frames. - // c: Dropped frames. - perf_dropped->a += nn_msgs; - perf_dropped->b += nn_rtc; - perf_dropped->c += nn_dropped; - - perf_dropped->nn += nn_dropped; -} - -srs_error_t SrsStatistic::dumps_perf_dropped(SrsJsonObject* obj) -{ - obj->set("avframes", SrsJsonAny::integer(perf_dropped->a)); - obj->set("rtc_frames", SrsJsonAny::integer(perf_dropped->b)); - obj->set("rtc_dropeed", SrsJsonAny::integer(perf_dropped->c)); - - obj->set("nn", SrsJsonAny::integer(perf_dropped->nn)); - - return srs_success; -} - void SrsStatistic::reset_perf() { srs_freep(perf_iovs); @@ -697,7 +672,6 @@ void SrsStatistic::reset_perf() srs_freep(perf_rtp); srs_freep(perf_rtc); srs_freep(perf_bytes); - srs_freep(perf_dropped); perf_iovs = new SrsStatisticCategory(); perf_msgs = new SrsStatisticCategory(); @@ -705,7 +679,6 @@ void SrsStatistic::reset_perf() perf_rtp = new SrsStatisticCategory(); perf_rtc = new SrsStatisticCategory(); perf_bytes = new SrsStatisticCategory(); - perf_dropped = new SrsStatisticCategory(); } void SrsStatistic::perf_on_packets(SrsStatisticCategory* p, int nb_msgs) diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index f6db1bd9f..2b2844a2e 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -171,11 +171,9 @@ private: SrsStatisticCategory* perf_iovs; SrsStatisticCategory* perf_msgs; SrsStatisticCategory* perf_sendmmsg; - SrsStatisticCategory* perf_gso; SrsStatisticCategory* perf_rtp; SrsStatisticCategory* perf_rtc; SrsStatisticCategory* perf_bytes; - SrsStatisticCategory* perf_dropped; private: SrsStatistic(); virtual ~SrsStatistic(); @@ -260,10 +258,6 @@ public: // Stat for bytes, nn_bytes is the size of bytes, nb_padding is padding bytes. virtual void perf_on_rtc_bytes(int nn_bytes, int nn_rtp_bytes, int nn_padding); virtual srs_error_t dumps_perf_bytes(SrsJsonObject* obj); -public: - // Stat for rtc messages, nn_rtc is rtc messages, nn_dropped is dropped messages. - virtual void perf_on_dropped(int nn_msgs, int nn_rtc, int nn_dropped); - virtual srs_error_t dumps_perf_dropped(SrsJsonObject* obj); public: // Reset all perf stat data. virtual void reset_perf(); diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index 3a426c3c7..f57fb4023 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -209,9 +209,6 @@ #define SRS_PERF_RTC_GSO_IOVS 1 #endif -// For RTC, the max iovs in msghdr, the max packets sent in a msghdr. -#define SRS_PERF_RTC_GSO_MAX 64 - // For RTC, the max count of RTP packets we process in one loop. // TODO: FIXME: Remove it. #define SRS_PERF_RTC_RTP_PACKETS 1024