From c1b64ba24f8a1a41ab8e249dfae812d9c6910c3f Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 16 Apr 2019 07:55:19 +0800 Subject: [PATCH] Refine get_queue_length and set_queue_size in time unit --- trunk/src/app/srs_app_config.cpp | 6 +++--- trunk/src/app/srs_app_config.hpp | 6 +++--- trunk/src/app/srs_app_edge.cpp | 4 ++-- trunk/src/app/srs_app_edge.hpp | 4 ++-- trunk/src/app/srs_app_forward.cpp | 2 +- trunk/src/app/srs_app_forward.hpp | 2 +- trunk/src/app/srs_app_http_stream.cpp | 5 +++-- trunk/src/app/srs_app_http_stream.hpp | 2 +- trunk/src/app/srs_app_source.cpp | 17 ++++++++--------- trunk/src/app/srs_app_source.hpp | 6 +++--- trunk/src/core/srs_core_performance.hpp | 4 ++-- trunk/src/main/srs_main_server.cpp | 4 ++-- trunk/src/utest/srs_utest_config.cpp | 13 ++++++++++++- 13 files changed, 43 insertions(+), 32 deletions(-) diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 644eb0ee5..0b9129e5d 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4459,9 +4459,9 @@ bool SrsConfig::get_mix_correct(string vhost) return SRS_CONF_PERFER_FALSE(conf->arg0()); } -double SrsConfig::get_queue_length(string vhost) +srs_utime_t SrsConfig::get_queue_length(string vhost) { - static double DEFAULT = SRS_PERF_PLAY_QUEUE; + static srs_utime_t DEFAULT = SRS_PERF_PLAY_QUEUE; SrsConfDirective* conf = get_vhost(vhost); if (!conf) { @@ -4478,7 +4478,7 @@ double SrsConfig::get_queue_length(string vhost) return DEFAULT; } - return ::atoi(conf->arg0().c_str()); + return srs_utime_t(::atoi(conf->arg0().c_str()) * SRS_UTIME_SECONDS); } bool SrsConfig::get_refer_enabled(string vhost) diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 4fdeb9c0f..70707f10e 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -729,11 +729,11 @@ public: */ virtual bool get_mix_correct(std::string vhost); /** - * get the cache queue length, in seconds. + * get the cache queue length, in srs_utime_t. * when exceed the queue length, drop packet util I frame. - * @remark, default 10. + * @remark, default 10s. */ - virtual double get_queue_length(std::string vhost); + virtual srs_utime_t get_queue_length(std::string vhost); /** * whether the refer hotlink-denial enabled. */ diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index a6d53508d..3af888282 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -433,7 +433,7 @@ SrsEdgeForwarder::~SrsEdgeForwarder() srs_freep(queue); } -void SrsEdgeForwarder::set_queue_size(double queue_size) +void SrsEdgeForwarder::set_queue_size(srs_utime_t queue_size) { return queue->set_queue_size(queue_size); } @@ -710,7 +710,7 @@ SrsPublishEdge::~SrsPublishEdge() srs_freep(forwarder); } -void SrsPublishEdge::set_queue_size(double queue_size) +void SrsPublishEdge::set_queue_size(srs_utime_t queue_size) { return forwarder->set_queue_size(queue_size); } diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 7deac30a9..03d6b70fc 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -170,7 +170,7 @@ public: SrsEdgeForwarder(); virtual ~SrsEdgeForwarder(); public: - virtual void set_queue_size(double queue_size); + virtual void set_queue_size(srs_utime_t queue_size); public: virtual srs_error_t initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r); virtual srs_error_t start(); @@ -232,7 +232,7 @@ public: SrsPublishEdge(); virtual ~SrsPublishEdge(); public: - virtual void set_queue_size(double queue_size); + virtual void set_queue_size(srs_utime_t queue_size); public: virtual srs_error_t initialize(SrsSource* source, SrsRequest* req); virtual bool can_publish(); diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 11295349b..c4a30a5e1 100755 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -85,7 +85,7 @@ srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep) return err; } -void SrsForwarder::set_queue_size(double queue_size) +void SrsForwarder::set_queue_size(srs_utime_t queue_size) { queue->set_queue_size(queue_size); } diff --git a/trunk/src/app/srs_app_forward.hpp b/trunk/src/app/srs_app_forward.hpp index 88fda46bf..f2edc7067 100644 --- a/trunk/src/app/srs_app_forward.hpp +++ b/trunk/src/app/srs_app_forward.hpp @@ -71,7 +71,7 @@ public: virtual ~SrsForwarder(); public: virtual srs_error_t initialize(SrsRequest* r, std::string ep); - virtual void set_queue_size(double queue_size); + virtual void set_queue_size(srs_utime_t queue_size); public: virtual srs_error_t on_publish(); virtual void on_unpublish(); diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 99ef37494..af8fb1153 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -64,7 +64,7 @@ SrsBufferCache::SrsBufferCache(SrsSource* s, SrsRequest* r) trd = new SrsSTCoroutine("http-stream", this); // TODO: FIXME: support reload. - fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); + fast_cache = srs_utime_t(_srs_config->get_vhost_http_remux_fast_cache(req->vhost) * SRS_UTIME_SECONDS); } SrsBufferCache::~SrsBufferCache() @@ -108,7 +108,8 @@ srs_error_t SrsBufferCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgor return srs_error_wrap(err, "dump packets"); } - srs_trace("http: dump cache %d msgs, duration=%dms, cache=%.2fs", queue->size(), srsu2msi(queue->duration()), fast_cache); + srs_trace("http: dump cache %d msgs, duration=%dms, cache=%dms", + queue->size(), srsu2msi(queue->duration()), srsu2msi(fast_cache)); return err; } diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index 24658d299..bfbe35761 100755 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -39,7 +39,7 @@ class SrsTsTransmuxer; class SrsBufferCache : public ISrsCoroutineHandler { private: - double fast_cache; + srs_utime_t fast_cache; private: SrsMessageQueue* queue; SrsSource* source; diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 8944af271..73483efbc 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -260,9 +260,9 @@ srs_utime_t SrsMessageQueue::duration() return (av_end_time - av_start_time); } -void SrsMessageQueue::set_queue_size(double queue_size) +void SrsMessageQueue::set_queue_size(srs_utime_t queue_size) { - max_queue_size = srs_utime_t(queue_size * SRS_UTIME_SECONDS); + max_queue_size = queue_size; } srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow) @@ -442,7 +442,7 @@ SrsConsumer::~SrsConsumer() #endif } -void SrsConsumer::set_queue_size(double queue_size) +void SrsConsumer::set_queue_size(srs_utime_t queue_size) { queue->set_queue_size(queue_size); } @@ -1468,9 +1468,8 @@ srs_error_t SrsOriginHub::create_forwarders() if ((err = forwarder->initialize(req, forward_server)) != srs_success) { return srs_error_wrap(err, "init forwarder"); } - - // TODO: FIXME: support queue size. - double queue_size = _srs_config->get_queue_length(req->vhost); + + srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); forwarder->set_queue_size(queue_size); if ((err = forwarder->on_publish()) != srs_success) { @@ -1864,7 +1863,7 @@ srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h) return srs_error_wrap(err, "edge(publish)"); } - double queue_size = _srs_config->get_queue_length(req->vhost); + srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); publish_edge->set_queue_size(queue_size); jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(req->vhost); @@ -1919,7 +1918,7 @@ srs_error_t SrsSource::on_reload_vhost_play(string vhost) // queue length if (true) { - double v = _srs_config->get_queue_length(req->vhost); + srs_utime_t v = _srs_config->get_queue_length(req->vhost); if (true) { std::vector::iterator it; @@ -2443,7 +2442,7 @@ srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consum consumer = new SrsConsumer(this, conn); consumers.push_back(consumer); - double queue_size = _srs_config->get_queue_length(req->vhost); + srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); consumer->set_queue_size(queue_size); // if atc, update the sequence header to gop cache time. diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 575508768..0103c98b6 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -161,9 +161,9 @@ public: virtual srs_utime_t duration(); /** * set the queue size - * @param queue_size the queue size in seconds. + * @param queue_size the queue size in srs_utime_t. */ - virtual void set_queue_size(double queue_size); + virtual void set_queue_size(srs_utime_t queue_size); public: /** * enqueue the message, the timestamp always monotonically. @@ -243,7 +243,7 @@ public: /** * set the size of queue. */ - virtual void set_queue_size(double queue_size); + virtual void set_queue_size(srs_utime_t queue_size); /** * when source id changed, notice client to print. */ diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index 4e2f33888..c9441938b 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -148,8 +148,8 @@ */ // whether gop cache is on. #define SRS_PERF_GOP_CACHE true -// in seconds, the live queue length. -#define SRS_PERF_PLAY_QUEUE 30 +// in srs_utime_t, the live queue length. +#define SRS_PERF_PLAY_QUEUE (30 * SRS_UTIME_SECONDS) /** * whether always use complex send algorithm. diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 904f2cd73..336ce0677 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -306,7 +306,7 @@ void show_macro_features() // gc(gop-cache) ss << "gc:" << srs_bool2switch(SRS_PERF_GOP_CACHE); // pq(play-queue) - ss << ", pq:" << SRS_PERF_PLAY_QUEUE << "s"; + ss << ", pq:" << srsu2msi(SRS_PERF_PLAY_QUEUE) << "ms"; // cscc(chunk stream cache cid) ss << ", cscc:[0," << SRS_PERF_CHUNK_STREAM_CACHE << ")"; // csa(complex send algorithm) @@ -342,7 +342,7 @@ void show_macro_features() possible_mr_latency = srsu2msi(SRS_PERF_MR_SLEEP); #endif srs_trace("system default latency in ms: mw(0-%d) + mr(0-%d) + play-queue(0-%d)", - srsu2msi(SRS_PERF_MW_SLEEP), possible_mr_latency, SRS_PERF_PLAY_QUEUE*1000); + srsu2msi(SRS_PERF_MW_SLEEP), possible_mr_latency, srsu2msi(SRS_PERF_PLAY_QUEUE)); #ifdef SRS_AUTO_MEM_WATCH #warning "srs memory watcher will hurts performance. user should kill by SIGTERM or init.d script." diff --git a/trunk/src/utest/srs_utest_config.cpp b/trunk/src/utest/srs_utest_config.cpp index 0779411dc..0a05b854c 100644 --- a/trunk/src/utest/srs_utest_config.cpp +++ b/trunk/src/utest/srs_utest_config.cpp @@ -1806,7 +1806,7 @@ VOID TEST(ConfigMainTest, CheckConf_vhost_ingest_id) EXPECT_TRUE(ERROR_SUCCESS != conf.parse(_MIN_OK_CONF"vhost v{ingest{} ingest{}}")); } -VOID TEST(ConfigUnitTest, CheckDefaultValues) +VOID TEST(ConfigUnitTest, CheckDefaultValuesVhost) { MockSrsConfig conf; @@ -1875,6 +1875,17 @@ VOID TEST(ConfigUnitTest, CheckDefaultValues) EXPECT_EQ(10 * SRS_UTIME_SECONDS, conf.get_hls_dispose("v")); } + if (true) { + EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF)); + EXPECT_EQ(30 * SRS_UTIME_SECONDS, conf.get_queue_length("")); + + EXPECT_TRUE(ERROR_SUCCESS == conf.parse(_MIN_OK_CONF"vhost v{play{queue_length 100;}}")); + EXPECT_EQ(100 * SRS_UTIME_SECONDS, conf.get_queue_length("v")); + } +} + +VOID TEST(ConfigUnitTest, CheckDefaultValuesGlobal) +{ if (true) { srs_utime_t t0 = srs_update_system_time(); srs_usleep(10 * SRS_UTIME_MILLISECONDS);