diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 98a8792dc..39f02bd7f 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -484,12 +484,16 @@ vhost rtc.vhost.srs.com { stun_strict_check on; } # whether enable min delay mode for vhost. - # For RTC, we recommend to set to on. + # default: on, for RTC. min_latency on; play { # set the MW(merged-write) latency in ms. - # For RTC, we recommend lower value, such as 0. + # default: 0 (For WebRTC) mw_latency 0; + # Set the MW(merged-write) min messages. + # default: 0 (For Real-Time, min_latency on) + # default: 1 (For WebRTC, min_latency off) + mw_msgs 0; } } @@ -716,10 +720,16 @@ vhost play.srs.com { # SRS always set mw on, so we just set the latency value. # the latency of stream >= mw_latency + mr_latency # the value recomment is [300, 1800] - # default: 350 (for RTMP/HTTP-FLV) - # default: 0 (for WebRTC) + # default: 350 (For RTMP/HTTP-FLV) + # default: 0 (For WebRTC) mw_latency 350; + # Set the MW(merged-write) min messages. + # default: 0 (For Real-Time, min_latency on) + # default: 1 (For WebRTC, min_latency off) + # default: 8 (For RTMP/HTTP-FLV, min_latency off). + mw_msgs 8; + # the minimal packets send interval in ms, # used to control the ndiff of stream by srs_rtmp_dump, # for example, some device can only accept some stream which @@ -776,6 +786,7 @@ vhost mrw.srs.com { # @see play.srs.com play { mw_latency 350; + mw_msgs 8; } # @see publish.srs.com @@ -795,6 +806,7 @@ vhost min.delay.com { # @see play.srs.com play { mw_latency 100; + mw_msgs 4; gop_cache off; queue_length 10; } @@ -823,6 +835,7 @@ vhost stream.control.com { # @see play.srs.com play { mw_latency 100; + mw_msgs 4; queue_length 10; send_min_interval 10.0; reduce_sequence_header on; diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index eef0a325c..e9b89a346 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3785,7 +3785,8 @@ srs_error_t SrsConfig::check_normal_config() for (int j = 0; j < (int)conf->directives.size(); j++) { string m = conf->at(j)->name; if (m != "time_jitter" && m != "mix_correct" && m != "atc" && m != "atc_auto" && m != "mw_latency" - && m != "gop_cache" && m != "queue_length" && m != "send_min_interval" && m != "reduce_sequence_header") { + && m != "gop_cache" && m != "queue_length" && m != "send_min_interval" && m != "reduce_sequence_header" + && m != "mw_msgs") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.play.%s of %s", m.c_str(), vhost->arg0().c_str()); } } @@ -5423,6 +5424,40 @@ srs_utime_t SrsConfig::get_mw_sleep(string vhost, bool is_rtc) return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_MILLISECONDS); } +int SrsConfig::get_mw_msgs(string vhost, bool is_realtime, bool is_rtc) +{ + int DEFAULT = SRS_PERF_MW_MIN_MSGS; + if (is_rtc) { + DEFAULT = SRS_PERF_MW_MIN_MSGS_FOR_RTC; + } + if (is_realtime) { + DEFAULT = SRS_PERF_MW_MIN_MSGS_REALTIME; + } + + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("play"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("mw_msgs"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + int v = ::atoi(conf->arg0().c_str()); + if (v > SRS_PERF_MW_MSGS) { + srs_warn("reset mw_msgs %d to max %d", v, SRS_PERF_MW_MSGS); + v = SRS_PERF_MW_MSGS; + } + + return v; +} + bool SrsConfig::get_realtime_enabled(string vhost, bool is_rtc) { static bool SYS_DEFAULT = SRS_PERF_MIN_LATENCY_ENABLED; diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index bc9417d76..a905949b1 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -630,6 +630,10 @@ public: // @param vhost, the vhost to get the mw sleep time. // TODO: FIXME: add utest for mw config. virtual srs_utime_t get_mw_sleep(std::string vhost, bool is_rtc = false); + // Get the mw_msgs, mw wait time in packets for vhost. + // @param vhost, the vhost to get the mw sleep msgs. + // TODO: FIXME: add utest for mw config. + virtual int get_mw_msgs(std::string vhost, bool is_realtime, bool is_rtc = false); // Whether min latency mode enabled. // @param vhost, the vhost to get the min_latency. // TODO: FIXME: add utest for min_latency. diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index fa0a7d9f5..61552def2 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -869,6 +869,7 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe api = prop->to_str(); } + // TODO: FIXME: Parse vhost. // Parse app and stream from streamurl. string app; string stream_name; @@ -909,6 +910,13 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe request.app = app; request.stream = stream_name; + // TODO: FIXME: Parse vhost. + // discovery vhost, resolve the vhost from config + SrsConfDirective* parsed_vhost = _srs_config->get_vhost(""); + if (parsed_vhost) { + request.vhost = parsed_vhost->arg0(); + } + // TODO: FIXME: Maybe need a better name? // TODO: FIXME: When server enabled, but vhost disabled, should report error. SrsRtcSession* rtc_session = rtc_server->create_rtc_session(request, remote_sdp, local_sdp, eip); diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 95c1f03fd..95a67c21a 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -660,7 +660,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess if ((err = consumer->dump_packets(&msgs, count)) != srs_success) { return srs_error_wrap(err, "consumer dump packets"); } - + + // TODO: FIXME: Support merged-write wait. if (count <= 0) { // Directly use sleep, donot use consumer wait, because we couldn't awake consumer. srs_usleep(mw_sleep); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index a2d7c9c7a..890540c1c 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -563,6 +563,10 @@ SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int video_sequence = 0; + mw_sleep = 0; + mw_msgs = 0; + realtime = true; + _srs_config->subscribe(this); } @@ -595,33 +599,37 @@ srs_error_t SrsRtcSenderThread::initialize(const uint32_t& vssrc, const uint32_t srs_error_t SrsRtcSenderThread::on_reload_rtc_server() { - if (true) { - bool v = _srs_config->get_rtc_server_gso(); - if (gso != v) { - srs_trace("Reload gso %d=>%d", gso, v); - gso = v; - } - } + gso = _srs_config->get_rtc_server_gso(); + merge_nalus = _srs_config->get_rtc_server_merge_nalus(); + max_padding = _srs_config->get_rtc_server_padding(); - if (true) { - bool v = _srs_config->get_rtc_server_merge_nalus(); - if (merge_nalus != v) { - srs_trace("Reload merge_nalus %d=>%d", merge_nalus, v); - merge_nalus = v; - } - } - - if (true) { - bool v = _srs_config->get_rtc_server_padding(); - if (max_padding != v) { - srs_trace("Reload padding %d=>%d", max_padding, v); - max_padding = v; - } - } + srs_trace("Reload rtc_server gso=%d, merge_nalus=%d, max_padding=%d", gso, merge_nalus, max_padding); return srs_success; } +srs_error_t SrsRtcSenderThread::on_reload_vhost_play(string vhost) +{ + SrsRequest* req = &rtc_session->request; + + if (req->vhost != vhost) { + return srs_success; + } + + realtime = _srs_config->get_realtime_enabled(req->vhost, true); + mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true); + mw_sleep = _srs_config->get_mw_sleep(req->vhost, true); + + srs_trace("Reload play realtime=%d, mw_msgs=%d, mw_sleep=%d", realtime, mw_msgs, mw_sleep); + + return srs_success; +} + +srs_error_t SrsRtcSenderThread::on_reload_vhost_realtime(string vhost) +{ + return on_reload_vhost_play(vhost); +} + int SrsRtcSenderThread::cid() { return trd->cid(); @@ -673,9 +681,6 @@ srs_error_t SrsRtcSenderThread::cycle() return srs_error_wrap(err, "rtc fetch source failed"); } - srs_trace("source url=%s, source_id=[%d][%d], encrypt=%d", - rtc_session->request.get_stream_url().c_str(), ::getpid(), source->source_id(), rtc_session->encrypt); - SrsConsumer* consumer = NULL; SrsAutoFree(SrsConsumer, consumer); if ((err = source->create_consumer(NULL, consumer)) != srs_success) { @@ -684,8 +689,12 @@ srs_error_t SrsRtcSenderThread::cycle() // TODO: FIXME: Support reload. SrsRequest* req = &rtc_session->request; - bool realtime = _srs_config->get_realtime_enabled(req->vhost, true); - srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost, true); + realtime = _srs_config->get_realtime_enabled(req->vhost, true); + mw_sleep = _srs_config->get_mw_sleep(req->vhost, true); + mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true); + + srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_sleep=%dms, mw_msgs=%d", rtc_session->request.get_stream_url().c_str(), + ::getpid(), source->source_id(), rtc_session->encrypt, realtime, srsu2msi(mw_sleep), mw_msgs); SrsRtcPackets pkts; SrsMessageArray msgs(SRS_PERF_MW_MSGS); @@ -703,13 +712,7 @@ srs_error_t SrsRtcSenderThread::cycle() #ifdef SRS_PERF_QUEUE_COND_WAIT // Wait for amount of messages or a duration. - if (realtime) { - // for realtime, min required msgs is 0, send when got one+ msgs. - consumer->wait(SRS_PERF_MW_MIN_MSGS_FOR_RTC_REALTIME, mw_sleep); - } else { - // for no-realtime, got some msgs then send. - consumer->wait(SRS_PERF_MW_MIN_MSGS_FOR_RTC, mw_sleep); - } + consumer->wait(mw_msgs, mw_sleep); #endif // Try to read some messages. diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index a563a4e2a..4cf19d727 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -193,6 +193,10 @@ private: bool merge_nalus; bool gso; int max_padding; +private: + srs_utime_t mw_sleep; + int mw_msgs; + bool realtime; public: SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid); virtual ~SrsRtcSenderThread(); @@ -201,6 +205,8 @@ public: // interface ISrsReloadHandler public: virtual srs_error_t on_reload_rtc_server(); + virtual srs_error_t on_reload_vhost_play(std::string vhost); + virtual srs_error_t on_reload_vhost_realtime(std::string vhost); public: virtual int cid(); public: diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index d06d79a42..0f8b1012a 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -116,7 +116,7 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip) : SrsConnect wakable = NULL; mw_sleep = SRS_PERF_MW_SLEEP; - mw_enabled = false; + mw_msgs = 0; realtime = SRS_PERF_MIN_LATENCY_ENABLED; send_min_interval = 0; tcp_nodelay = false; @@ -264,6 +264,10 @@ srs_error_t SrsRtmpConn::on_reload_vhost_play(string vhost) send_min_interval = v; } } + + mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime); + mw_sleep = _srs_config->get_mw_sleep(req->vhost); + set_socket_buffer(mw_sleep); return err; } @@ -298,6 +302,10 @@ srs_error_t SrsRtmpConn::on_reload_vhost_realtime(string vhost) srs_trace("realtime changed %d=>%d", realtime, realtime_enabled); realtime = realtime_enabled; } + + mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime); + mw_sleep = _srs_config->get_mw_sleep(req->vhost); + set_socket_buffer(mw_sleep); return err; } @@ -689,18 +697,19 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr SrsMessageArray msgs(SRS_PERF_MW_MSGS); bool user_specified_duration_to_stop = (req->duration > 0); int64_t starttime = -1; - + // setup the realtime. realtime = _srs_config->get_realtime_enabled(req->vhost); // setup the mw config. // when mw_sleep changed, resize the socket send buffer. - mw_enabled = true; - change_mw_sleep(_srs_config->get_mw_sleep(req->vhost)); + mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime); + mw_sleep = _srs_config->get_mw_sleep(req->vhost); + set_socket_buffer(mw_sleep); // initialize the send_min_interval send_min_interval = _srs_config->get_send_min_interval(req->vhost); - srs_trace("start play smi=%dms, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d", - srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_enabled, realtime, tcp_nodelay); + srs_trace("start play smi=%dms, mw_sleep=%d, mw_msgs=%d, realtime=%d, tcp_nodelay=%d", + srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_msgs, realtime, tcp_nodelay); while (true) { // when source is set to expired, disconnect it. @@ -730,13 +739,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr // wait for message to incoming. // @see https://github.com/ossrs/srs/issues/251 // @see https://github.com/ossrs/srs/issues/257 - if (realtime) { - // for realtime, min required msgs is 0, send when got one+ msgs. - consumer->wait(SRS_PERF_MW_MIN_MSGS_REALTIME, mw_sleep); - } else { - // for no-realtime, got some msgs then send. - consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep); - } + consumer->wait(mw_msgs, mw_sleep); #endif // get messages from consumer. @@ -750,9 +753,9 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr // reportable if (pprint->can_print()) { kbps->sample(); - srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d", + srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d/%d", (int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep)); + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep), mw_msgs); } if (count <= 0) { @@ -1114,16 +1117,6 @@ srs_error_t SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsComm return err; } -void SrsRtmpConn::change_mw_sleep(srs_utime_t sleep_v) -{ - if (!mw_enabled) { - return; - } - - set_socket_buffer(sleep_v); - mw_sleep = sleep_v; -} - void SrsRtmpConn::set_sock_options() { SrsRequest* req = info->req; diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index ac4b7efb9..c58111f04 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -102,8 +102,7 @@ private: srs_utime_t duration; // The MR(merged-write) sleep time in srs_utime_t. srs_utime_t mw_sleep; - // The MR(merged-write) only enabled for play. - int mw_enabled; + int mw_msgs; // For realtime // @see https://github.com/ossrs/srs/issues/257 bool realtime; @@ -149,7 +148,6 @@ private: virtual srs_error_t handle_publish_message(SrsSource* source, SrsCommonMessage* msg); virtual srs_error_t process_publish_message(SrsSource* source, SrsCommonMessage* msg); virtual srs_error_t process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); - virtual void change_mw_sleep(srs_utime_t sleep_v); virtual void set_sock_options(); private: virtual srs_error_t check_edge_token_traverse_auth(); diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index 08b006c10..4d71ccdf8 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -129,10 +129,10 @@ #ifdef SRS_PERF_QUEUE_COND_WAIT // For RTMP, use larger wait queue. #define SRS_PERF_MW_MIN_MSGS 8 - #define SRS_PERF_MW_MIN_MSGS_REALTIME 0 // For RTC, use smaller wait queue. - #define SRS_PERF_MW_MIN_MSGS_FOR_RTC 2 - #define SRS_PERF_MW_MIN_MSGS_FOR_RTC_REALTIME 0 + #define SRS_PERF_MW_MIN_MSGS_FOR_RTC 1 + // For Real-Time, never wait messages. + #define SRS_PERF_MW_MIN_MSGS_REALTIME 0 #endif /** * the default value of vhost for