From b1d7fbe668b9a01a104fd435b6863024e509f3a2 Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 4 Dec 2014 21:35:13 +0800 Subject: [PATCH] fix #241, add mw(merged-write) config. 2.0.53 --- README.md | 1 + trunk/conf/full.conf | 8 +++- trunk/src/app/srs_app_config.cpp | 51 +++++++++++++++++++------ trunk/src/app/srs_app_config.hpp | 8 +++- trunk/src/app/srs_app_reload.cpp | 5 +++ trunk/src/app/srs_app_reload.hpp | 1 + trunk/src/app/srs_app_rtmp_conn.cpp | 25 +++++++++--- trunk/src/app/srs_app_rtmp_conn.hpp | 3 ++ trunk/src/core/srs_core.hpp | 3 +- trunk/src/core/srs_core_performance.hpp | 21 ++++++++-- trunk/src/kernel/srs_kernel_consts.hpp | 4 -- trunk/src/main/srs_main_server.cpp | 23 +++++++++++ 12 files changed, 125 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 19fd4d5a8..3fc55843c 100755 --- a/README.md +++ b/README.md @@ -485,6 +485,7 @@ Supported operating systems and hardware: * 2013-10-17, Created.
## History +* v2.0, 2014-12-04, fix [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241), add mw(merged-write) config. 2.0.53 * v2.0, 2014-12-04, for [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241), support mr(merged-read) config and reload. 2.0.52. * v2.0, 2014-12-04, enable [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241) and [#248](https://github.com/winlinvip/simple-rtmp-server/issues/248), +25% performance, 2.5k publisher. 2.0.50 * v2.0, 2014-12-04, fix [#248](https://github.com/winlinvip/simple-rtmp-server/issues/248), improve about 15% performance for fast buffer. 2.0.49 diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 6edc5a0bb..ddcbcb0ee 100755 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -143,7 +143,8 @@ vhost __defaultVhost__ { } # the MR(merged-read) setting for publisher. -vhost mr.srs.com { +# the MW(merged-write) settings for player. +vhost mrw.srs.com { # about MR, read https://github.com/winlinvip/simple-rtmp-server/issues/241 mr { # whether enable the MR(merged-read) @@ -160,6 +161,11 @@ vhost mr.srs.com { # default: 500 latency 500; } + # set the MW(merged-write) latency in ms. + # SRS always set mw on, so we just set the latency value. + # the latency of stream >= mw_latency + mr_latency + # default: 500 + mw_latency 500; } # vhost for edge, edge and origin is the same vhost diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index b804e3239..cd4603b05 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -45,6 +45,7 @@ using namespace std; #include #include #include +#include using namespace _srs_internal; @@ -829,6 +830,17 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root) } srs_trace("vhost %s reload mr success.", vhost.c_str()); } + // mw, only one per vhost + if (!srs_directive_equals(new_vhost->get("mw_latency"), old_vhost->get("mw_latency"))) { + for (it = subscribes.begin(); it != subscribes.end(); ++it) { + ISrsReloadHandler* subscribe = *it; + if ((ret = subscribe->on_reload_vhost_mw(vhost)) != ERROR_SUCCESS) { + srs_error("vhost %s notify subscribes mw failed. ret=%d", vhost.c_str(), ret); + return ret; + } + } + srs_trace("vhost %s reload mw success.", vhost.c_str()); + } // http, only one per vhost. if (!srs_directive_equals(new_vhost->get("http"), old_vhost->get("http"))) { for (it = subscribes.begin(); it != subscribes.end(); ++it) { @@ -1327,7 +1339,7 @@ int SrsConfig::check_config() && n != "time_jitter" && n != "atc" && n != "atc_auto" && n != "debug_srs_upnode" - && n != "mr" + && n != "mr" && n != "mw_latency" ) { ret = ERROR_SYSTEM_CONFIG_INVALID; srs_error("unsupported vhost directive %s, ret=%d", n.c_str(), ret); @@ -1951,7 +1963,7 @@ bool SrsConfig::get_gop_cache(string vhost) SrsConfDirective* conf = get_vhost(vhost); if (!conf) { - return true; + return SRS_PERF_GOP_CACHE; } conf = conf->get("gop_cache"); @@ -1959,7 +1971,7 @@ bool SrsConfig::get_gop_cache(string vhost) return false; } - return true; + return SRS_PERF_GOP_CACHE; } bool SrsConfig::get_debug_srs_upnode(string vhost) @@ -2032,12 +2044,12 @@ double SrsConfig::get_queue_length(string vhost) SrsConfDirective* conf = get_vhost(vhost); if (!conf) { - return SRS_CONF_DEFAULT_QUEUE_LENGTH; + return SRS_PERF_PLAY_QUEUE; } conf = conf->get("queue_length"); if (!conf || conf->arg0().empty()) { - return SRS_CONF_DEFAULT_QUEUE_LENGTH; + return SRS_PERF_PLAY_QUEUE; } return ::atoi(conf->arg0().c_str()); @@ -2106,17 +2118,17 @@ bool SrsConfig::get_mr_enabled(string vhost) SrsConfDirective* conf = get_vhost(vhost); if (!conf) { - return SRS_CONSTS_RTMP_MR; + return SRS_PERF_MR_ENABLED; } conf = conf->get("mr"); if (!conf) { - return SRS_CONSTS_RTMP_MR; + return SRS_PERF_MR_ENABLED; } conf = conf->get("enabled"); if (!conf || conf->arg0() != "on") { - return SRS_CONSTS_RTMP_MR; + return SRS_PERF_MR_ENABLED; } return true; @@ -2128,17 +2140,34 @@ int SrsConfig::get_mr_sleep_ms(string vhost) SrsConfDirective* conf = get_vhost(vhost); if (!conf) { - return SRS_CONSTS_RTMP_MR_SLEEP; + return SRS_PERF_MR_SLEEP; } conf = conf->get("mr"); if (!conf) { - return SRS_CONSTS_RTMP_MR_SLEEP; + return SRS_PERF_MR_SLEEP; } conf = conf->get("latency"); if (!conf || conf->arg0().empty()) { - return SRS_CONSTS_RTMP_MR_SLEEP; + return SRS_PERF_MR_SLEEP; + } + + return ::atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_mw_sleep_ms(string vhost) +{ + + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return SRS_PERF_MW_SLEEP; + } + + conf = conf->get("mw_latency"); + if (!conf || conf->arg0().empty()) { + return SRS_PERF_MW_SLEEP; } return ::atoi(conf->arg0().c_str()); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 1ec0e2411..2c01d02a4 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -54,8 +54,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SRS_CONF_DEFAULT_DVR_PLAN SRS_CONF_DEFAULT_DVR_PLAN_SESSION #define SRS_CONF_DEFAULT_DVR_DURATION 30 #define SRS_CONF_DEFAULT_TIME_JITTER "full" -// in seconds, the live queue length. -#define SRS_CONF_DEFAULT_QUEUE_LENGTH 30 // in seconds, the paused queue length. #define SRS_CONF_DEFAULT_PAUSED_LENGTH 10 // the interval in seconds for bandwidth check @@ -541,6 +539,12 @@ public: */ // TODO: FIXME: add utest for mr config. virtual int get_mr_sleep_ms(std::string vhost); + /** + * get the mw sleep time in ms for vhost. + * @param vhost, the vhost to get the mw sleep time. + */ + // TODO: FIXME: add utest for mw config. + virtual int get_mw_sleep_ms(std::string vhost); private: /** * get the global chunk size. diff --git a/trunk/src/app/srs_app_reload.cpp b/trunk/src/app/srs_app_reload.cpp index 9581df1d1..cc69a9c44 100644 --- a/trunk/src/app/srs_app_reload.cpp +++ b/trunk/src/app/srs_app_reload.cpp @@ -145,6 +145,11 @@ int ISrsReloadHandler::on_reload_vhost_mr(string /*vhost*/) return ERROR_SUCCESS; } +int ISrsReloadHandler::on_reload_vhost_mw(string /*vhost*/) +{ + return ERROR_SUCCESS; +} + int ISrsReloadHandler::on_reload_vhost_transcode(string /*vhost*/) { return ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_reload.hpp b/trunk/src/app/srs_app_reload.hpp index a133c52d8..878518b09 100644 --- a/trunk/src/app/srs_app_reload.hpp +++ b/trunk/src/app/srs_app_reload.hpp @@ -66,6 +66,7 @@ public: virtual int on_reload_vhost_hls(std::string vhost); virtual int on_reload_vhost_dvr(std::string vhost); virtual int on_reload_vhost_mr(std::string vhost); + virtual int on_reload_vhost_mw(std::string vhost); virtual int on_reload_vhost_transcode(std::string vhost); virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id); virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 958c92396..ff1b8696b 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -83,6 +83,7 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd) duration = 0; kbps = new SrsKbps(); kbps->set_io(skt, skt); + mw_sleep = SRS_PERF_MW_SLEEP; _srs_config->subscribe(this); } @@ -209,6 +210,13 @@ int SrsRtmpConn::on_reload_vhost_removed(string vhost) return ret; } +int SrsRtmpConn::on_reload_vhost_mw(string /*vhost*/) +{ + mw_sleep = _srs_config->get_mw_sleep_ms(req->vhost); + + return ERROR_SUCCESS; +} + int64_t SrsRtmpConn::get_send_bytes_delta() { return kbps->get_send_bytes_delta(); @@ -361,7 +369,7 @@ int SrsRtmpConn::stream_service_cycle() } bool enabled_cache = _srs_config->get_gop_cache(req->vhost); - srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]", + srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]", req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, source->source_id(), source->source_id()); source->set_cache(enabled_cache); @@ -592,17 +600,18 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd) // no message to send, sleep a while. if (count <= 0) { srs_verbose("sleep for no messages to send"); - st_usleep(SRS_PERF_SEND_MSGS_CACHE * 1000); + st_usleep(mw_sleep * 1000); } // reportable if (pithy_print.can_print()) { kbps->sample(); srs_trace("-> "SRS_CONSTS_LOG_PLAY - " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", + " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d", pithy_print.age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m() + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), + mw_sleep ); } @@ -774,10 +783,14 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) // reportable if (pithy_print.can_print()) { kbps->sample(); + bool mr = _srs_config->get_mr_enabled(req->vhost); + int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost); srs_trace("<- "SRS_CONSTS_LOG_CLIENT_PUBLISH - " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", pithy_print.age(), + " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d", pithy_print.age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), + mr, mr_sleep + ); } } diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index a10b9ecf6..a5a8885a2 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -71,6 +71,8 @@ private: // @see https://github.com/winlinvip/simple-rtmp-server/issues/47 int64_t duration; SrsKbps* kbps; + // the MR(merged-write) sleep time in ms. + int mw_sleep; public: SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd); virtual ~SrsRtmpConn(); @@ -81,6 +83,7 @@ protected: // interface ISrsReloadHandler public: virtual int on_reload_vhost_removed(std::string vhost); + virtual int on_reload_vhost_mw(std::string vhost); // interface IKbpsDelta public: virtual int64_t get_send_bytes_delta(); diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index b5926e038..da11a8ed6 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 52 +#define VERSION_REVISION 53 // server info. #define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_ROLE "origin/edge server" @@ -48,6 +48,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define RTMP_SIG_SRS_HANDSHAKE RTMP_SIG_SRS_KEY"("RTMP_SIG_SRS_VERSION")" #define RTMP_SIG_SRS_RELEASE "https://github.com/winlinvip/simple-rtmp-server/tree/1.0release" #define RTMP_SIG_SRS_HTTP_SERVER "https://github.com/winlinvip/simple-rtmp-server/wiki/v1_CN_HTTPServer#feature" +#define RTMP_SIG_SRS_ISSUES(id) "https://github.com/winlinvip/simple-rtmp-server/issues/"#id #define RTMP_SIG_SRS_VERSION __SRS_XSTR(VERSION_MAJOR)"."__SRS_XSTR(VERSION_MINOR)"."__SRS_XSTR(VERSION_REVISION) // internal macros, covert macro values to str, diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index 6407ae8df..d2e148949 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -38,6 +38,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * to improve read performance, merge some packets then read, * when it on and read small bytes, we sleep to wait more data., * that is, we merge some data to read together. +* @see SrsConfig::get_mr_enabled() +* @see SrsConfig::get_mr_sleep_ms() * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 * @example, for the default settings, this algorithm will use: * that is, when got nread bytes smaller than 4KB, sleep(780ms). @@ -55,11 +57,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * For example, sleep 120ms. Then there is, and always 120ms data in buffer. * That is, the latency is 120ms(the sleep time). */ -// to enable merged read. #define SRS_PERF_MERGED_READ +// the default config of mr. +#define SRS_PERF_MR_ENABLED false +#define SRS_PERF_MR_SLEEP 500 /** -* the send cache time in ms. +* the MW(merged-write) send cache time in ms. +* the default value, user can override it in config. * to improve send performance, cache msgs and send in a time. * for example, cache 500ms videos and audios, then convert all these * msgs to iovecs, finally use writev to send. @@ -67,8 +72,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * the latency+ when cache+. * @remark the socket send buffer default to 185KB, it large enough. * @see https://github.com/winlinvip/simple-rtmp-server/issues/194 +* @see SrsConfig::get_mw_sleep_ms() */ -#define SRS_PERF_SEND_MSGS_CACHE 500 +// the default config of mw. +#define SRS_PERF_MW_SLEEP 500 /** * how many chunk stream to cache, [0, N]. @@ -78,5 +85,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #define SRS_PERF_CHUNK_STREAM_CACHE 16 +/** +* the gop cache and play cache queue. +*/ +// whether gop cache is on. +#define SRS_PERF_GOP_CACHE true +// in seconds, the live queue length. +#define SRS_PERF_PLAY_QUEUE 30 + #endif diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp index 194c9f9d2..d30fd07d1 100644 --- a/trunk/src/kernel/srs_kernel_consts.hpp +++ b/trunk/src/kernel/srs_kernel_consts.hpp @@ -50,10 +50,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // 6. Chunking, RTMP protocol default chunk size. #define SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE 128 -// the default setting of mr. -#define SRS_CONSTS_RTMP_MR false -#define SRS_CONSTS_RTMP_MR_SLEEP 500 - /** * 6. Chunking * The chunk size is configurable. It can be set using a control diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index dcb3b5448..26a2d585d 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -39,6 +39,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include // pre-declare int run(); @@ -130,6 +131,24 @@ void show_macro_features() #else srs_warn("check feature compile ffmpeg: off"); #endif + +#ifdef SRS_PERF_MERGED_READ + srs_trace("MR(merged-read): on, @see %s", RTMP_SIG_SRS_ISSUES(241)); +#else + srs_warn("MR(merged-read): off, @see %s", RTMP_SIG_SRS_ISSUES(241)); +#endif + + srs_trace("MR(merged-read) default %d sleep %d", SRS_PERF_MR_ENABLED, SRS_PERF_MR_SLEEP); + srs_trace("MW(merged-write) default sleep %d", SRS_PERF_MW_SLEEP); + srs_trace("read chunk stream cache cid [0, %d)", SRS_PERF_CHUNK_STREAM_CACHE); + srs_trace("default gop cache %d, play queue %ds", SRS_PERF_GOP_CACHE, SRS_PERF_PLAY_QUEUE); + + int possible_mr_latency = 0; +#ifdef SRS_PERF_MERGED_READ + possible_mr_latency = SRS_PERF_MR_SLEEP; +#endif + srs_trace("system default latency in ms: mw(0-%d) + mr(0-%d) + play-queue(0-%d)", + SRS_PERF_MW_SLEEP, possible_mr_latency, SRS_PERF_PLAY_QUEUE*1000); } void check_macro_features() @@ -139,6 +158,10 @@ void check_macro_features() srs_warn("http server is dev feature, @see %s", RTMP_SIG_SRS_HTTP_SERVER); #endif +#ifndef SRS_PERF_MERGED_READ + srs_warn("MR(merged-read) is disabled, hurts read performance. @see %s", RTMP_SIG_SRS_ISSUES(241)); +#endif + #if VERSION_MAJOR > 1 #warning "using develop SRS, please use release instead." srs_warn("SRS %s is develop branch, please use %s instead", RTMP_SIG_SRS_VERSION, RTMP_SIG_SRS_RELEASE);