diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index d096e5910..cc20cc756 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -143,6 +143,9 @@ public: virtual srs_error_t sendmmsg(mmsghdr* hdr) = 0; // Whether sender exceed the max queue, that is, overflow. virtual bool overflow() = 0; + // Set the queue extra ratio, for example, when mw_msgs > 0, we need larger queue. + // For r, 100 means x1, 200 means x2. + virtual void set_extra_ratio(int r) = 0; }; class SrsUdpMuxSocket diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 890540c1c..9414741ab 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -674,26 +674,32 @@ srs_error_t SrsRtcSenderThread::cycle() srs_error_t err = srs_success; SrsSource* source = NULL; + SrsRequest* req = &rtc_session->request; // TODO: FIXME: Should refactor it, directly use http server as handler. ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); - if ((err = _srs_sources->fetch_or_create(&rtc_session->request, handler, &source)) != srs_success) { + if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) { return srs_error_wrap(err, "rtc fetch source failed"); } SrsConsumer* consumer = NULL; SrsAutoFree(SrsConsumer, consumer); if ((err = source->create_consumer(NULL, consumer)) != srs_success) { - return srs_error_wrap(err, "rtc create consumer, source url=%s", rtc_session->request.get_stream_url().c_str()); + return srs_error_wrap(err, "rtc create consumer, source url=%s", req->get_stream_url().c_str()); } - // TODO: FIXME: Support reload. - SrsRequest* req = &rtc_session->request; realtime = _srs_config->get_realtime_enabled(req->vhost, true); mw_sleep = _srs_config->get_mw_sleep(req->vhost, true); mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true); - srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_sleep=%dms, mw_msgs=%d", rtc_session->request.get_stream_url().c_str(), + // We merged write more messages, so we need larger queue. + if (mw_msgs > 2) { + sender->set_extra_ratio(150); + } else if (mw_msgs > 0) { + sender->set_extra_ratio(80); + } + + srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_sleep=%dms, mw_msgs=%d", req->get_stream_url().c_str(), ::getpid(), source->source_id(), rtc_session->encrypt, realtime, srsu2msi(mw_sleep), mw_msgs); SrsRtcPackets pkts; @@ -1846,6 +1852,10 @@ SrsUdpMuxSender::SrsUdpMuxSender(SrsRtcServer* s) cache_pos = 0; max_sendmmsg = 0; queue_length = 0; + extra_ratio = 0; + extra_queue = 0; + gso = false; + nn_senders = 0; _srs_config->subscribe(this); } @@ -1877,16 +1887,17 @@ srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd, int senders) } max_sendmmsg = _srs_config->get_rtc_server_sendmmsg(); - bool gso = _srs_config->get_rtc_server_gso(); + gso = _srs_config->get_rtc_server_gso(); queue_length = srs_max(128, _srs_config->get_rtc_server_queue_length()); + nn_senders = senders; // For no GSO, we need larger queue. if (!gso) { queue_length *= 2; } - srs_trace("UDP sender #%d init ok, max_sendmmsg=%d, gso=%d, queue_max=%dx%d", srs_netfd_fileno(fd), - max_sendmmsg, gso, queue_length, senders); + srs_trace("RTC sender #%d init ok, max_sendmmsg=%d, gso=%d, queue_max=%dx%d, extra_ratio=%d/%d", srs_netfd_fileno(fd), + max_sendmmsg, gso, queue_length, nn_senders, extra_ratio, extra_queue); return err; } @@ -1944,7 +1955,21 @@ srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr) bool SrsUdpMuxSender::overflow() { - return cache_pos > queue_length; + return cache_pos > queue_length + extra_queue; +} + +void SrsUdpMuxSender::set_extra_ratio(int r) +{ + // We use the larger extra ratio, because all vhosts shares the senders. + if (extra_ratio > r) { + return; + } + + extra_ratio = r; + extra_queue = queue_length * r / 100; + + srs_trace("RTC sender #%d extra queue, max_sendmmsg=%d, gso=%d, queue_max=%dx%d, extra_ratio=%d/%d, cache=%d/%d/%d", srs_netfd_fileno(lfd), + max_sendmmsg, gso, queue_length, nn_senders, extra_ratio, extra_queue, cache_pos, (int)cache.size(), (int)hotspot.size()); } srs_error_t SrsUdpMuxSender::sendmmsg(mmsghdr* hdr) diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 4cf19d727..3ad6a1072 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -307,6 +307,9 @@ private: private: srs_cond_t cond; bool waiting_msgs; + bool gso; + int nn_senders; +private: // Hotspot msgs, we are working on it. // @remark We will wait util all messages are ready. std::vector hotspot; @@ -317,6 +320,9 @@ private: int max_sendmmsg; // The total queue length, for each sender. int queue_length; + // The extra queue ratio. + int extra_ratio; + int extra_queue; public: SrsUdpMuxSender(SrsRtcServer* s); virtual ~SrsUdpMuxSender(); @@ -328,6 +334,8 @@ public: virtual srs_error_t fetch(mmsghdr** pphdr); virtual srs_error_t sendmmsg(mmsghdr* hdr); virtual bool overflow(); + virtual void set_extra_ratio(int r); +public: virtual srs_error_t cycle(); // interface ISrsReloadHandler public: