mirror of
https://github.com/ossrs/srs.git
synced 2025-02-15 04:42:04 +00:00
Use extra queue when set mw_msgs
This commit is contained in:
parent
08312ddc42
commit
c1464f5aee
3 changed files with 45 additions and 9 deletions
|
@ -143,6 +143,9 @@ public:
|
||||||
virtual srs_error_t sendmmsg(mmsghdr* hdr) = 0;
|
virtual srs_error_t sendmmsg(mmsghdr* hdr) = 0;
|
||||||
// Whether sender exceed the max queue, that is, overflow.
|
// Whether sender exceed the max queue, that is, overflow.
|
||||||
virtual bool overflow() = 0;
|
virtual bool overflow() = 0;
|
||||||
|
// Set the queue extra ratio, for example, when mw_msgs > 0, we need larger queue.
|
||||||
|
// For r, 100 means x1, 200 means x2.
|
||||||
|
virtual void set_extra_ratio(int r) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
class SrsUdpMuxSocket
|
class SrsUdpMuxSocket
|
||||||
|
|
|
@ -674,26 +674,32 @@ srs_error_t SrsRtcSenderThread::cycle()
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
SrsSource* source = NULL;
|
SrsSource* source = NULL;
|
||||||
|
SrsRequest* req = &rtc_session->request;
|
||||||
|
|
||||||
// TODO: FIXME: Should refactor it, directly use http server as handler.
|
// TODO: FIXME: Should refactor it, directly use http server as handler.
|
||||||
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
|
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
|
||||||
if ((err = _srs_sources->fetch_or_create(&rtc_session->request, handler, &source)) != srs_success) {
|
if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) {
|
||||||
return srs_error_wrap(err, "rtc fetch source failed");
|
return srs_error_wrap(err, "rtc fetch source failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsConsumer* consumer = NULL;
|
SrsConsumer* consumer = NULL;
|
||||||
SrsAutoFree(SrsConsumer, consumer);
|
SrsAutoFree(SrsConsumer, consumer);
|
||||||
if ((err = source->create_consumer(NULL, consumer)) != srs_success) {
|
if ((err = source->create_consumer(NULL, consumer)) != srs_success) {
|
||||||
return srs_error_wrap(err, "rtc create consumer, source url=%s", rtc_session->request.get_stream_url().c_str());
|
return srs_error_wrap(err, "rtc create consumer, source url=%s", req->get_stream_url().c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: FIXME: Support reload.
|
|
||||||
SrsRequest* req = &rtc_session->request;
|
|
||||||
realtime = _srs_config->get_realtime_enabled(req->vhost, true);
|
realtime = _srs_config->get_realtime_enabled(req->vhost, true);
|
||||||
mw_sleep = _srs_config->get_mw_sleep(req->vhost, true);
|
mw_sleep = _srs_config->get_mw_sleep(req->vhost, true);
|
||||||
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true);
|
mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true);
|
||||||
|
|
||||||
srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_sleep=%dms, mw_msgs=%d", rtc_session->request.get_stream_url().c_str(),
|
// We merged write more messages, so we need larger queue.
|
||||||
|
if (mw_msgs > 2) {
|
||||||
|
sender->set_extra_ratio(150);
|
||||||
|
} else if (mw_msgs > 0) {
|
||||||
|
sender->set_extra_ratio(80);
|
||||||
|
}
|
||||||
|
|
||||||
|
srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_sleep=%dms, mw_msgs=%d", req->get_stream_url().c_str(),
|
||||||
::getpid(), source->source_id(), rtc_session->encrypt, realtime, srsu2msi(mw_sleep), mw_msgs);
|
::getpid(), source->source_id(), rtc_session->encrypt, realtime, srsu2msi(mw_sleep), mw_msgs);
|
||||||
|
|
||||||
SrsRtcPackets pkts;
|
SrsRtcPackets pkts;
|
||||||
|
@ -1846,6 +1852,10 @@ SrsUdpMuxSender::SrsUdpMuxSender(SrsRtcServer* s)
|
||||||
cache_pos = 0;
|
cache_pos = 0;
|
||||||
max_sendmmsg = 0;
|
max_sendmmsg = 0;
|
||||||
queue_length = 0;
|
queue_length = 0;
|
||||||
|
extra_ratio = 0;
|
||||||
|
extra_queue = 0;
|
||||||
|
gso = false;
|
||||||
|
nn_senders = 0;
|
||||||
|
|
||||||
_srs_config->subscribe(this);
|
_srs_config->subscribe(this);
|
||||||
}
|
}
|
||||||
|
@ -1877,16 +1887,17 @@ srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd, int senders)
|
||||||
}
|
}
|
||||||
|
|
||||||
max_sendmmsg = _srs_config->get_rtc_server_sendmmsg();
|
max_sendmmsg = _srs_config->get_rtc_server_sendmmsg();
|
||||||
bool gso = _srs_config->get_rtc_server_gso();
|
gso = _srs_config->get_rtc_server_gso();
|
||||||
queue_length = srs_max(128, _srs_config->get_rtc_server_queue_length());
|
queue_length = srs_max(128, _srs_config->get_rtc_server_queue_length());
|
||||||
|
nn_senders = senders;
|
||||||
|
|
||||||
// For no GSO, we need larger queue.
|
// For no GSO, we need larger queue.
|
||||||
if (!gso) {
|
if (!gso) {
|
||||||
queue_length *= 2;
|
queue_length *= 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_trace("UDP sender #%d init ok, max_sendmmsg=%d, gso=%d, queue_max=%dx%d", srs_netfd_fileno(fd),
|
srs_trace("RTC sender #%d init ok, max_sendmmsg=%d, gso=%d, queue_max=%dx%d, extra_ratio=%d/%d", srs_netfd_fileno(fd),
|
||||||
max_sendmmsg, gso, queue_length, senders);
|
max_sendmmsg, gso, queue_length, nn_senders, extra_ratio, extra_queue);
|
||||||
|
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
@ -1944,7 +1955,21 @@ srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr)
|
||||||
|
|
||||||
bool SrsUdpMuxSender::overflow()
|
bool SrsUdpMuxSender::overflow()
|
||||||
{
|
{
|
||||||
return cache_pos > queue_length;
|
return cache_pos > queue_length + extra_queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SrsUdpMuxSender::set_extra_ratio(int r)
|
||||||
|
{
|
||||||
|
// We use the larger extra ratio, because all vhosts shares the senders.
|
||||||
|
if (extra_ratio > r) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
extra_ratio = r;
|
||||||
|
extra_queue = queue_length * r / 100;
|
||||||
|
|
||||||
|
srs_trace("RTC sender #%d extra queue, max_sendmmsg=%d, gso=%d, queue_max=%dx%d, extra_ratio=%d/%d, cache=%d/%d/%d", srs_netfd_fileno(lfd),
|
||||||
|
max_sendmmsg, gso, queue_length, nn_senders, extra_ratio, extra_queue, cache_pos, (int)cache.size(), (int)hotspot.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsUdpMuxSender::sendmmsg(mmsghdr* hdr)
|
srs_error_t SrsUdpMuxSender::sendmmsg(mmsghdr* hdr)
|
||||||
|
|
|
@ -307,6 +307,9 @@ private:
|
||||||
private:
|
private:
|
||||||
srs_cond_t cond;
|
srs_cond_t cond;
|
||||||
bool waiting_msgs;
|
bool waiting_msgs;
|
||||||
|
bool gso;
|
||||||
|
int nn_senders;
|
||||||
|
private:
|
||||||
// Hotspot msgs, we are working on it.
|
// Hotspot msgs, we are working on it.
|
||||||
// @remark We will wait util all messages are ready.
|
// @remark We will wait util all messages are ready.
|
||||||
std::vector<mmsghdr> hotspot;
|
std::vector<mmsghdr> hotspot;
|
||||||
|
@ -317,6 +320,9 @@ private:
|
||||||
int max_sendmmsg;
|
int max_sendmmsg;
|
||||||
// The total queue length, for each sender.
|
// The total queue length, for each sender.
|
||||||
int queue_length;
|
int queue_length;
|
||||||
|
// The extra queue ratio.
|
||||||
|
int extra_ratio;
|
||||||
|
int extra_queue;
|
||||||
public:
|
public:
|
||||||
SrsUdpMuxSender(SrsRtcServer* s);
|
SrsUdpMuxSender(SrsRtcServer* s);
|
||||||
virtual ~SrsUdpMuxSender();
|
virtual ~SrsUdpMuxSender();
|
||||||
|
@ -328,6 +334,8 @@ public:
|
||||||
virtual srs_error_t fetch(mmsghdr** pphdr);
|
virtual srs_error_t fetch(mmsghdr** pphdr);
|
||||||
virtual srs_error_t sendmmsg(mmsghdr* hdr);
|
virtual srs_error_t sendmmsg(mmsghdr* hdr);
|
||||||
virtual bool overflow();
|
virtual bool overflow();
|
||||||
|
virtual void set_extra_ratio(int r);
|
||||||
|
public:
|
||||||
virtual srs_error_t cycle();
|
virtual srs_error_t cycle();
|
||||||
// interface ISrsReloadHandler
|
// interface ISrsReloadHandler
|
||||||
public:
|
public:
|
||||||
|
|
Loading…
Reference in a new issue