From 695d430dcf0863c33d669a76118e540f8e1f8d5f Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 11 Apr 2019 08:35:57 +0800 Subject: [PATCH] Refine SrsMessageQueue.duration in time unit. --- trunk/src/app/srs_app_http_stream.cpp | 2 +- trunk/src/app/srs_app_rtmp_conn.cpp | 4 ++-- trunk/src/app/srs_app_source.cpp | 31 ++++++++++++------------- trunk/src/app/srs_app_source.hpp | 18 ++++++++------ trunk/src/core/srs_core_time.hpp | 4 ++-- trunk/src/kernel/srs_kernel_utility.cpp | 1 + 6 files changed, 32 insertions(+), 28 deletions(-) diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 090bfbd49..605d9ce61 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -108,7 +108,7 @@ srs_error_t SrsBufferCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgor return srs_error_wrap(err, "dump packets"); } - srs_trace("http: dump cache %d msgs, duration=%dms, cache=%.2fs", queue->size(), queue->duration(), fast_cache); + srs_trace("http: dump cache %d msgs, duration=%dms, cache=%.2fs", queue->size(), srsu2msi(queue->duration()), fast_cache); return err; } diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 37cfb37de..467fb0d3b 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -725,10 +725,10 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr // @see https://github.com/ossrs/srs/issues/257 if (realtime) { // for realtime, min required msgs is 0, send when got one+ msgs. - consumer->wait(0, srsu2msi(mw_sleep)); + consumer->wait(0, mw_sleep); } else { // for no-realtime, got some msgs then send. - consumer->wait(SRS_PERF_MW_MIN_MSGS, srsu2msi(mw_sleep)); + consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep); } #endif diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index a54f1f430..a48976fad 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -241,7 +241,7 @@ void SrsFastVector::free() SrsMessageQueue::SrsMessageQueue(bool ignore_shrink) { _ignore_shrink = ignore_shrink; - queue_size_ms = 0; + max_queue_size = 0; av_start_time = av_end_time = -1; } @@ -255,14 +255,14 @@ int SrsMessageQueue::size() return (int)msgs.size(); } -int SrsMessageQueue::duration() +srs_utime_t SrsMessageQueue::duration() { - return (int)(av_end_time - av_start_time); + return (av_end_time - av_start_time); } void SrsMessageQueue::set_queue_size(double queue_size) { - queue_size_ms = (int)(queue_size * 1000); + max_queue_size = srs_utime_t(queue_size * SRS_UTIME_SECONDS); } srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow) @@ -271,7 +271,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow if (msg->is_av()) { if (av_start_time == -1) { - av_start_time = msg->timestamp; + av_start_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS); } av_end_time = msg->timestamp; @@ -279,7 +279,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow msgs.push_back(msg); - while (av_end_time - av_start_time > queue_size_ms) { + while (av_end_time - av_start_time > max_queue_size) { // notice the caller queue already overflow and shrinked. if (is_overflow) { *is_overflow = true; @@ -309,7 +309,7 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p } SrsSharedPtrMessage* last = omsgs[count - 1]; - av_start_time = last->timestamp; + av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS); if (count >= nb_msgs) { // the pmsgs is big enough and clear msgs at most time. @@ -384,8 +384,7 @@ void SrsMessageQueue::shrink() } if (!_ignore_shrink) { - srs_trace("shrink the cache queue, size=%d, removed=%d, max=%.2f", - (int)msgs.size(), msgs_size - (int)msgs.size(), queue_size_ms / 1000.0); + srs_trace("shrinking, size=%d, removed=%d, max=%dms", (int)msgs.size(), msgs_size - (int)msgs.size(), srsu2msi(max_queue_size)); } } @@ -477,20 +476,20 @@ srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsR #ifdef SRS_PERF_QUEUE_COND_WAIT // fire the mw when msgs is enough. if (mw_waiting) { - int duration_ms = queue->duration(); + srs_utime_t duration = queue->duration(); bool match_min_msgs = queue->size() > mw_min_msgs; // For ATC, maybe the SH timestamp bigger than A/V packet, // when encoder republish or overflow. // @see https://github.com/ossrs/srs/pull/749 - if (atc && duration_ms < 0) { + if (atc && duration < 0) { srs_cond_signal(mw_wait); mw_waiting = false; return err; } // when duration ok, signal to flush. - if (match_min_msgs && duration_ms > mw_duration) { + if (match_min_msgs && duration > mw_duration) { srs_cond_signal(mw_wait); mw_waiting = false; return err; @@ -534,7 +533,7 @@ srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count) } #ifdef SRS_PERF_QUEUE_COND_WAIT -void SrsConsumer::wait(int nb_msgs, int duration) +void SrsConsumer::wait(int nb_msgs, srs_utime_t msgs_duration) { if (paused) { srs_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000); @@ -542,13 +541,13 @@ void SrsConsumer::wait(int nb_msgs, int duration) } mw_min_msgs = nb_msgs; - mw_duration = duration; + mw_duration = msgs_duration; - int duration_ms = queue->duration(); + srs_utime_t duration = queue->duration(); bool match_min_msgs = queue->size() > mw_min_msgs; // when duration ok, signal to flush. - if (match_min_msgs && duration_ms > mw_duration) { + if (match_min_msgs && duration > mw_duration) { return; } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index d79e041b2..6147f7da6 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -135,10 +135,14 @@ public: class SrsMessageQueue { private: + // The start and end time. + srs_utime_t av_start_time; + srs_utime_t av_end_time; +private: + // Whether do logging when shrinking. bool _ignore_shrink; - int64_t av_start_time; - int64_t av_end_time; - int queue_size_ms; + // The max queue size, shrink if exceed it. + srs_utime_t max_queue_size; #ifdef SRS_PERF_QUEUE_FAST_VECTOR SrsFastVector msgs; #else @@ -155,7 +159,7 @@ public: /** * get the duration of queue. */ - virtual int duration(); + virtual srs_utime_t duration(); /** * set the queue size * @param queue_size the queue size in seconds. @@ -231,7 +235,7 @@ private: srs_cond_t mw_wait; bool mw_waiting; int mw_min_msgs; - int mw_duration; + srs_utime_t mw_duration; #endif public: SrsConsumer(SrsSource* s, SrsConnection* c); @@ -268,9 +272,9 @@ public: /** * wait for messages incomming, atleast nb_msgs and in duration. * @param nb_msgs the messages count to wait. - * @param duration the messgae duration to wait. + * @param msgs_duration the messages duration to wait. */ - virtual void wait(int nb_msgs, int duration); + virtual void wait(int nb_msgs, srs_utime_t msgs_duration); #endif /** * when client send the pause message. diff --git a/trunk/src/core/srs_core_time.hpp b/trunk/src/core/srs_core_time.hpp index 1ed056fc6..f9f7bdd5c 100644 --- a/trunk/src/core/srs_core_time.hpp +++ b/trunk/src/core/srs_core_time.hpp @@ -26,8 +26,8 @@ #include -// Wrap for coroutine. -typedef uint64_t srs_utime_t; +// Time and duration unit, in us. +typedef int64_t srs_utime_t; // The time unit in ms, for example 100 * SRS_UTIME_MILLISECONDS means 100ms. #define SRS_UTIME_MILLISECONDS 1000 diff --git a/trunk/src/kernel/srs_kernel_utility.cpp b/trunk/src/kernel/srs_kernel_utility.cpp index 808370178..085d74331 100644 --- a/trunk/src/kernel/srs_kernel_utility.cpp +++ b/trunk/src/kernel/srs_kernel_utility.cpp @@ -161,6 +161,7 @@ srs_utime_t srs_update_system_time() return _srs_system_time_us_cache; } +// TODO: FIXME: Replace by ST dns resolve. string srs_dns_resolve(string host, int& family) { addrinfo hints;