mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
Refine SrsMessageQueue.duration in time unit.
This commit is contained in:
parent
170cca1f58
commit
695d430dcf
6 changed files with 32 additions and 28 deletions
|
@ -108,7 +108,7 @@ srs_error_t SrsBufferCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgor
|
|||
return srs_error_wrap(err, "dump packets");
|
||||
}
|
||||
|
||||
srs_trace("http: dump cache %d msgs, duration=%dms, cache=%.2fs", queue->size(), queue->duration(), fast_cache);
|
||||
srs_trace("http: dump cache %d msgs, duration=%dms, cache=%.2fs", queue->size(), srsu2msi(queue->duration()), fast_cache);
|
||||
|
||||
return err;
|
||||
}
|
||||
|
|
|
@ -725,10 +725,10 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
|
|||
// @see https://github.com/ossrs/srs/issues/257
|
||||
if (realtime) {
|
||||
// for realtime, min required msgs is 0, send when got one+ msgs.
|
||||
consumer->wait(0, srsu2msi(mw_sleep));
|
||||
consumer->wait(0, mw_sleep);
|
||||
} else {
|
||||
// for no-realtime, got some msgs then send.
|
||||
consumer->wait(SRS_PERF_MW_MIN_MSGS, srsu2msi(mw_sleep));
|
||||
consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
|
|
@ -241,7 +241,7 @@ void SrsFastVector::free()
|
|||
SrsMessageQueue::SrsMessageQueue(bool ignore_shrink)
|
||||
{
|
||||
_ignore_shrink = ignore_shrink;
|
||||
queue_size_ms = 0;
|
||||
max_queue_size = 0;
|
||||
av_start_time = av_end_time = -1;
|
||||
}
|
||||
|
||||
|
@ -255,14 +255,14 @@ int SrsMessageQueue::size()
|
|||
return (int)msgs.size();
|
||||
}
|
||||
|
||||
int SrsMessageQueue::duration()
|
||||
srs_utime_t SrsMessageQueue::duration()
|
||||
{
|
||||
return (int)(av_end_time - av_start_time);
|
||||
return (av_end_time - av_start_time);
|
||||
}
|
||||
|
||||
void SrsMessageQueue::set_queue_size(double queue_size)
|
||||
{
|
||||
queue_size_ms = (int)(queue_size * 1000);
|
||||
max_queue_size = srs_utime_t(queue_size * SRS_UTIME_SECONDS);
|
||||
}
|
||||
|
||||
srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
|
||||
|
@ -271,7 +271,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow
|
|||
|
||||
if (msg->is_av()) {
|
||||
if (av_start_time == -1) {
|
||||
av_start_time = msg->timestamp;
|
||||
av_start_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);
|
||||
}
|
||||
|
||||
av_end_time = msg->timestamp;
|
||||
|
@ -279,7 +279,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow
|
|||
|
||||
msgs.push_back(msg);
|
||||
|
||||
while (av_end_time - av_start_time > queue_size_ms) {
|
||||
while (av_end_time - av_start_time > max_queue_size) {
|
||||
// notice the caller queue already overflow and shrinked.
|
||||
if (is_overflow) {
|
||||
*is_overflow = true;
|
||||
|
@ -309,7 +309,7 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p
|
|||
}
|
||||
|
||||
SrsSharedPtrMessage* last = omsgs[count - 1];
|
||||
av_start_time = last->timestamp;
|
||||
av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS);
|
||||
|
||||
if (count >= nb_msgs) {
|
||||
// the pmsgs is big enough and clear msgs at most time.
|
||||
|
@ -384,8 +384,7 @@ void SrsMessageQueue::shrink()
|
|||
}
|
||||
|
||||
if (!_ignore_shrink) {
|
||||
srs_trace("shrink the cache queue, size=%d, removed=%d, max=%.2f",
|
||||
(int)msgs.size(), msgs_size - (int)msgs.size(), queue_size_ms / 1000.0);
|
||||
srs_trace("shrinking, size=%d, removed=%d, max=%dms", (int)msgs.size(), msgs_size - (int)msgs.size(), srsu2msi(max_queue_size));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -477,20 +476,20 @@ srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsR
|
|||
#ifdef SRS_PERF_QUEUE_COND_WAIT
|
||||
// fire the mw when msgs is enough.
|
||||
if (mw_waiting) {
|
||||
int duration_ms = queue->duration();
|
||||
srs_utime_t duration = queue->duration();
|
||||
bool match_min_msgs = queue->size() > mw_min_msgs;
|
||||
|
||||
// For ATC, maybe the SH timestamp bigger than A/V packet,
|
||||
// when encoder republish or overflow.
|
||||
// @see https://github.com/ossrs/srs/pull/749
|
||||
if (atc && duration_ms < 0) {
|
||||
if (atc && duration < 0) {
|
||||
srs_cond_signal(mw_wait);
|
||||
mw_waiting = false;
|
||||
return err;
|
||||
}
|
||||
|
||||
// when duration ok, signal to flush.
|
||||
if (match_min_msgs && duration_ms > mw_duration) {
|
||||
if (match_min_msgs && duration > mw_duration) {
|
||||
srs_cond_signal(mw_wait);
|
||||
mw_waiting = false;
|
||||
return err;
|
||||
|
@ -534,7 +533,7 @@ srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
|
|||
}
|
||||
|
||||
#ifdef SRS_PERF_QUEUE_COND_WAIT
|
||||
void SrsConsumer::wait(int nb_msgs, int duration)
|
||||
void SrsConsumer::wait(int nb_msgs, srs_utime_t msgs_duration)
|
||||
{
|
||||
if (paused) {
|
||||
srs_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
|
||||
|
@ -542,13 +541,13 @@ void SrsConsumer::wait(int nb_msgs, int duration)
|
|||
}
|
||||
|
||||
mw_min_msgs = nb_msgs;
|
||||
mw_duration = duration;
|
||||
mw_duration = msgs_duration;
|
||||
|
||||
int duration_ms = queue->duration();
|
||||
srs_utime_t duration = queue->duration();
|
||||
bool match_min_msgs = queue->size() > mw_min_msgs;
|
||||
|
||||
// when duration ok, signal to flush.
|
||||
if (match_min_msgs && duration_ms > mw_duration) {
|
||||
if (match_min_msgs && duration > mw_duration) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -135,10 +135,14 @@ public:
|
|||
class SrsMessageQueue
|
||||
{
|
||||
private:
|
||||
// The start and end time.
|
||||
srs_utime_t av_start_time;
|
||||
srs_utime_t av_end_time;
|
||||
private:
|
||||
// Whether do logging when shrinking.
|
||||
bool _ignore_shrink;
|
||||
int64_t av_start_time;
|
||||
int64_t av_end_time;
|
||||
int queue_size_ms;
|
||||
// The max queue size, shrink if exceed it.
|
||||
srs_utime_t max_queue_size;
|
||||
#ifdef SRS_PERF_QUEUE_FAST_VECTOR
|
||||
SrsFastVector msgs;
|
||||
#else
|
||||
|
@ -155,7 +159,7 @@ public:
|
|||
/**
|
||||
* get the duration of queue.
|
||||
*/
|
||||
virtual int duration();
|
||||
virtual srs_utime_t duration();
|
||||
/**
|
||||
* set the queue size
|
||||
* @param queue_size the queue size in seconds.
|
||||
|
@ -231,7 +235,7 @@ private:
|
|||
srs_cond_t mw_wait;
|
||||
bool mw_waiting;
|
||||
int mw_min_msgs;
|
||||
int mw_duration;
|
||||
srs_utime_t mw_duration;
|
||||
#endif
|
||||
public:
|
||||
SrsConsumer(SrsSource* s, SrsConnection* c);
|
||||
|
@ -268,9 +272,9 @@ public:
|
|||
/**
|
||||
* wait for messages incomming, atleast nb_msgs and in duration.
|
||||
* @param nb_msgs the messages count to wait.
|
||||
* @param duration the messgae duration to wait.
|
||||
* @param msgs_duration the messages duration to wait.
|
||||
*/
|
||||
virtual void wait(int nb_msgs, int duration);
|
||||
virtual void wait(int nb_msgs, srs_utime_t msgs_duration);
|
||||
#endif
|
||||
/**
|
||||
* when client send the pause message.
|
||||
|
|
|
@ -26,8 +26,8 @@
|
|||
|
||||
#include <srs_core.hpp>
|
||||
|
||||
// Wrap for coroutine.
|
||||
typedef uint64_t srs_utime_t;
|
||||
// Time and duration unit, in us.
|
||||
typedef int64_t srs_utime_t;
|
||||
|
||||
// The time unit in ms, for example 100 * SRS_UTIME_MILLISECONDS means 100ms.
|
||||
#define SRS_UTIME_MILLISECONDS 1000
|
||||
|
|
|
@ -161,6 +161,7 @@ srs_utime_t srs_update_system_time()
|
|||
return _srs_system_time_us_cache;
|
||||
}
|
||||
|
||||
// TODO: FIXME: Replace by ST dns resolve.
|
||||
string srs_dns_resolve(string host, int& family)
|
||||
{
|
||||
addrinfo hints;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue