1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

fix #257, support 0.1s+ latency. 2.0.70

This commit is contained in:
winlin 2014-12-12 21:51:06 +08:00
parent 68ade0a267
commit 10297fab51
19 changed files with 179 additions and 30 deletions

View file

@ -852,6 +852,17 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
}
srs_trace("vhost %s reload mw success.", vhost.c_str());
}
// min_latency, only one per vhost
if (!srs_directive_equals(new_vhost->get("min_latency"), old_vhost->get("min_latency"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
ISrsReloadHandler* subscribe = *it;
if ((ret = subscribe->on_reload_vhost_realtime(vhost)) != ERROR_SUCCESS) {
srs_error("vhost %s notify subscribes min_latency failed. ret=%d", vhost.c_str(), ret);
return ret;
}
}
srs_trace("vhost %s reload min_latency success.", vhost.c_str());
}
// http, only one per vhost.
if (!srs_directive_equals(new_vhost->get("http"), old_vhost->get("http"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
@ -1350,7 +1361,7 @@ int SrsConfig::check_config()
&& n != "time_jitter"
&& n != "atc" && n != "atc_auto"
&& n != "debug_srs_upnode"
&& n != "mr" && n != "mw_latency"
&& n != "mr" && n != "mw_latency" && n != "min_latency"
) {
ret = ERROR_SYSTEM_CONFIG_INVALID;
srs_error("unsupported vhost directive %s, ret=%d", n.c_str(), ret);
@ -2125,7 +2136,6 @@ int SrsConfig::get_chunk_size(string vhost)
bool SrsConfig::get_mr_enabled(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
@ -2147,7 +2157,6 @@ bool SrsConfig::get_mr_enabled(string vhost)
int SrsConfig::get_mr_sleep_ms(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
@ -2169,7 +2178,6 @@ int SrsConfig::get_mr_sleep_ms(string vhost)
int SrsConfig::get_mw_sleep_ms(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
@ -2184,6 +2192,22 @@ int SrsConfig::get_mw_sleep_ms(string vhost)
return ::atoi(conf->arg0().c_str());
}
bool SrsConfig::get_realtime_enabled(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return SRS_PERF_MIN_LATENCY_ENABLED;
}
conf = conf->get("min_latency");
if (!conf || conf->arg0() != "off") {
return SRS_PERF_MIN_LATENCY_ENABLED;
}
return false;
}
int SrsConfig::get_global_chunk_size()
{
SrsConfDirective* conf = root->get("chunk_size");

View file

@ -545,6 +545,12 @@ public:
*/
// TODO: FIXME: add utest for mw config.
virtual int get_mw_sleep_ms(std::string vhost);
/**
* whether min latency mode enabled.
* @param vhost, the vhost to get the min_latency.
*/
// TODO: FIXME: add utest for min_latency.
virtual bool get_realtime_enabled(std::string vhost);
private:
/**
* get the global chunk size.

View file

@ -251,6 +251,8 @@ SrsPublishRecvThread::SrsPublishRecvThread(
mr = _srs_config->get_mr_enabled(req->vhost);
mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
realtime = _srs_config->get_realtime_enabled(req->vhost);
_srs_config->subscribe(this);
}
@ -340,6 +342,10 @@ int SrsPublishRecvThread::handle(SrsCommonMessage* msg)
int ret = ERROR_SUCCESS;
_nb_msgs++;
// log to show the time of recv thread.
srs_verbose("recv thread now=%"PRId64"us, got msg time=%"PRId64"ms, size=%d",
srs_update_system_time_ms(), msg->header.timestamp, msg->size);
// the rtmp connection will handle this message
ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge);
@ -363,7 +369,7 @@ void SrsPublishRecvThread::on_recv_error(int ret)
#ifdef SRS_PERF_MERGED_READ
void SrsPublishRecvThread::on_read(ssize_t nread)
{
if (!mr) {
if (!mr || realtime) {
return;
}
@ -386,6 +392,10 @@ void SrsPublishRecvThread::on_read(ssize_t nread)
int SrsPublishRecvThread::on_reload_vhost_mr(string vhost)
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
}
// the mr settings,
// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
@ -419,6 +429,21 @@ int SrsPublishRecvThread::on_reload_vhost_mr(string vhost)
return ret;
}
int SrsPublishRecvThread::on_reload_vhost_realtime(string vhost)
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
}
bool realtime_enabled = _srs_config->get_realtime_enabled(req->vhost);
srs_trace("realtime changed %d=>%d", realtime, realtime_enabled);
realtime = realtime_enabled;
return ret;
}
void SrsPublishRecvThread::set_socket_buffer(int sleep_ms)
{
// the bytes:
@ -446,8 +471,9 @@ void SrsPublishRecvThread::set_socket_buffer(int sleep_ms)
}
getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size);
srs_trace("mr change sleep %d=>%d, erbuf=%d, rbuf %d=>%d, sbytes=%d",
mr_sleep, sleep_ms, socket_buffer_size, onb_rbuf, nb_rbuf, SRS_MR_SMALL_BYTES);
srs_trace("mr change sleep %d=>%d, erbuf=%d, rbuf %d=>%d, sbytes=%d, realtime=%d",
mr_sleep, sleep_ms, socket_buffer_size, onb_rbuf, nb_rbuf,
SRS_MR_SMALL_BYTES, realtime);
rtmp->set_recv_buffer(nb_rbuf);
}

View file

@ -153,6 +153,9 @@ private:
bool mr;
int mr_fd;
int mr_sleep;
// for realtime
// @see https://github.com/winlinvip/simple-rtmp-server/issues/257
bool realtime;
// the recv thread error code.
int recv_error_code;
SrsRtmpConn* _conn;
@ -193,6 +196,7 @@ public:
// interface ISrsReloadHandler
public:
virtual int on_reload_vhost_mr(std::string vhost);
virtual int on_reload_vhost_realtime(std::string vhost);
private:
virtual void set_socket_buffer(int sleep_ms);
};

View file

@ -150,6 +150,11 @@ int ISrsReloadHandler::on_reload_vhost_mw(string /*vhost*/)
return ERROR_SUCCESS;
}
int ISrsReloadHandler::on_reload_vhost_realtime(string /*vhost*/)
{
return ERROR_SUCCESS;
}
int ISrsReloadHandler::on_reload_vhost_chunk_size(string /*vhost*/)
{
return ERROR_SUCCESS;

View file

@ -67,6 +67,7 @@ public:
virtual int on_reload_vhost_dvr(std::string vhost);
virtual int on_reload_vhost_mr(std::string vhost);
virtual int on_reload_vhost_mw(std::string vhost);
virtual int on_reload_vhost_realtime(std::string vhost);
virtual int on_reload_vhost_chunk_size(std::string vhost);
virtual int on_reload_vhost_transcode(std::string vhost);
virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id);

View file

@ -50,6 +50,7 @@ using namespace std;
#include <srs_protocol_amf0.hpp>
#include <srs_app_recv_thread.hpp>
#include <srs_core_performance.hpp>
#include <srs_kernel_utility.hpp>
// when stream is busy, for example, streaming is already
// publishing, when a new client to request to publish,
@ -86,6 +87,7 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd)
mw_sleep = SRS_PERF_MW_SLEEP;
mw_enabled = false;
realtime = SRS_PERF_MIN_LATENCY_ENABLED;
_srs_config->subscribe(this);
}
@ -212,14 +214,35 @@ int SrsRtmpConn::on_reload_vhost_removed(string vhost)
return ret;
}
int SrsRtmpConn::on_reload_vhost_mw(string /*vhost*/)
int SrsRtmpConn::on_reload_vhost_mw(string vhost)
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
}
int sleep_ms = _srs_config->get_mw_sleep_ms(req->vhost);
// when mw_sleep changed, resize the socket send buffer.
change_mw_sleep(sleep_ms);
return ERROR_SUCCESS;
return ret;
}
int SrsRtmpConn::on_reload_vhost_realtime(string vhost)
{
int ret = ERROR_SUCCESS;
if (req->vhost != vhost) {
return ret;
}
bool realtime_enabled = _srs_config->get_realtime_enabled(req->vhost);
srs_trace("realtime changed %d=>%d", realtime, realtime_enabled);
realtime = realtime_enabled;
return ret;
}
int64_t SrsRtmpConn::get_send_bytes_delta()
@ -570,6 +593,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
// when mw_sleep changed, resize the socket send buffer.
mw_enabled = true;
change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));
realtime = _srs_config->get_realtime_enabled(req->vhost);
while (true) {
// to use isolate thread to recv, can improve about 33% performance.
@ -599,9 +623,15 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
pithy_print.elapse();
#ifdef SRS_PERF_QUEUE_COND_WAIT
// for send wait time debug
srs_verbose("send thread now=%"PRId64"us, wait %dms", srs_update_system_time_ms(), mw_sleep);
// wait for message to incoming.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/251
consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);
consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep, realtime);
// for send wait time debug
srs_verbose("send thread now=%"PRId64"us wakeup", srs_update_system_time_ms());
#endif
// get messages from consumer.
@ -613,10 +643,12 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
// we use wait to get messages, so the count must be positive.
srs_assert(count > 0);
srs_info("mw wait %dms and got %d msgs %"PRId64"-%"PRId64"ms",
mw_sleep, count, msgs.msgs[0]->timestamp, msgs.msgs[count - 1]->timestamp);
// we use wait timeout to get messages,
// for min latency event no message incoming,
// so the count maybe zero.
srs_verbose("mw wait %dms and got %d msgs %"PRId64"-%"PRId64"ms", mw_sleep, count,
(count > 0? msgs.msgs[0]->timestamp : 0),
(count > 0? msgs.msgs[count - 1]->timestamp : 0));
#else
if (count <= 0) {
srs_info("mw sleep %dms for no msg", mw_sleep);
@ -1037,12 +1069,12 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms)
}
getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &nb_sbuf, &sock_buf_size);
srs_trace("mw changed sleep %d=>%d, max_msgs=%d, esbuf=%d, sbuf %d=>%d",
srs_trace("mw changed sleep %d=>%d, max_msgs=%d, esbuf=%d, sbuf %d=>%d, realtime=%d",
mw_sleep, sleep_ms, SRS_PERF_MW_MSGS, socket_buffer_size,
onb_sbuf, nb_sbuf);
onb_sbuf, nb_sbuf, realtime);
#else
srs_trace("mw changed sleep %d=>%d, max_msgs=%d, sbuf %d",
mw_sleep, sleep_ms, SRS_PERF_MW_MSGS, onb_sbuf);
srs_trace("mw changed sleep %d=>%d, max_msgs=%d, sbuf %d, realtime=%d",
mw_sleep, sleep_ms, SRS_PERF_MW_MSGS, onb_sbuf, realtime);
#endif
mw_sleep = sleep_ms;

View file

@ -75,6 +75,9 @@ private:
int mw_sleep;
// the MR(merged-write) only enabled for play.
int mw_enabled;
// for realtime
// @see https://github.com/winlinvip/simple-rtmp-server/issues/257
bool realtime;
public:
SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd);
virtual ~SrsRtmpConn();
@ -86,6 +89,7 @@ protected:
public:
virtual int on_reload_vhost_removed(std::string vhost);
virtual int on_reload_vhost_mw(std::string vhost);
virtual int on_reload_vhost_realtime(std::string vhost);
// interface IKbpsDelta
public:
virtual int64_t get_send_bytes_delta();

View file

@ -493,7 +493,7 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
void SrsConsumer::wait(int nb_msgs, int duration)
void SrsConsumer::wait(int nb_msgs, int duration, bool realtime)
{
mw_min_msgs = nb_msgs;
mw_duration = duration;
@ -508,8 +508,15 @@ void SrsConsumer::wait(int nb_msgs, int duration)
// the enqueue will notify this cond.
mw_waiting = true;
// wait for msgs to incoming.
st_cond_wait(mw_wait);
// use timeout wait for realtime mode.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/257
if (realtime) {
st_cond_timedwait(mw_wait, duration * 1000);
} else {
// use cond block wait for high performance mode.
st_cond_wait(mw_wait);
}
}
#endif

View file

@ -245,8 +245,9 @@ public:
* wait for messages incomming, atleast nb_msgs and in duration.
* @param nb_msgs the messages count to wait.
* @param duration the messgae duration to wait.
* @param realtime whether use realtime mode.
*/
virtual void wait(int nb_msgs, int duration);
virtual void wait(int nb_msgs, int duration, bool realtime);
#endif
/**
* when client send the pause message.

View file

@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 2
#define VERSION_MINOR 0
#define VERSION_REVISION 69
#define VERSION_REVISION 70
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
#define RTMP_SIG_SRS_ROLE "origin/edge server"

View file

@ -128,6 +128,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#ifdef SRS_PERF_QUEUE_COND_WAIT
#define SRS_PERF_MW_MIN_MSGS 8
#endif
/**
* the default value of vhost for
* SRS whether use the min latency mode.
* for min latence mode:
* 1. disable the mr for vhost.
* 2. use timeout for cond wait for consumer queue.
* @see https://github.com/winlinvip/simple-rtmp-server/issues/257
*/
#define SRS_PERF_MIN_LATENCY_ENABLED true
/**
* how many chunk stream to cache, [0, N].

View file

@ -61,13 +61,13 @@ int64_t srs_get_system_startup_time_ms()
return _srs_system_time_startup_time / 1000;
}
void srs_update_system_time_ms()
int64_t srs_update_system_time_ms()
{
timeval now;
if (gettimeofday(&now, NULL) < 0) {
srs_warn("gettimeofday failed, ignore");
return;
return -1;
}
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/35
@ -83,7 +83,7 @@ void srs_update_system_time_ms()
if (_srs_system_time_us_cache <= 0) {
_srs_system_time_us_cache = now_us;
_srs_system_time_startup_time = now_us;
return;
return _srs_system_time_us_cache;
}
// use relative time.
@ -99,6 +99,8 @@ void srs_update_system_time_ms()
_srs_system_time_us_cache = now_us;
srs_info("system time updated, startup=%"PRId64"us, now=%"PRId64"us",
_srs_system_time_startup_time, _srs_system_time_us_cache);
return _srs_system_time_us_cache;
}
string srs_dns_resolve(string host)

View file

@ -40,7 +40,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
extern int64_t srs_get_system_time_ms();
extern int64_t srs_get_system_startup_time_ms();
// the deamon st-thread will update it.
extern void srs_update_system_time_ms();
extern int64_t srs_update_system_time_ms();
// dns resolve utility, return the resolved ip address.
extern std::string srs_dns_resolve(std::string host);