mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
For #307, refine performance
This commit is contained in:
parent
9e031c9932
commit
c93cd86ce4
11 changed files with 323 additions and 175 deletions
|
@ -269,9 +269,17 @@ void SrsMessageQueue::set_queue_size(srs_utime_t queue_size)
|
|||
max_queue_size = queue_size;
|
||||
}
|
||||
|
||||
srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
|
||||
srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow, bool pass_timestamp)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
msgs.push_back(msg);
|
||||
|
||||
// For RTC, we never care about the timestamp and duration, so we never shrink queue here,
|
||||
// but we will drop messages in each consumer coroutine.
|
||||
if (pass_timestamp) {
|
||||
return err;
|
||||
}
|
||||
|
||||
if (msg->is_av()) {
|
||||
if (av_start_time == -1) {
|
||||
|
@ -281,8 +289,6 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow
|
|||
av_end_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);
|
||||
}
|
||||
|
||||
msgs.push_back(msg);
|
||||
|
||||
while (av_end_time - av_start_time > max_queue_size) {
|
||||
// notice the caller queue already overflow and shrinked.
|
||||
if (is_overflow) {
|
||||
|
@ -295,7 +301,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow
|
|||
return err;
|
||||
}
|
||||
|
||||
srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
|
||||
srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count, bool pass_timestamp)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
|
@ -308,13 +314,15 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p
|
|||
count = srs_min(max_count, nb_msgs);
|
||||
|
||||
SrsSharedPtrMessage** omsgs = msgs.data();
|
||||
for (int i = 0; i < count; i++) {
|
||||
pmsgs[i] = omsgs[i];
|
||||
memcpy(pmsgs, omsgs, count * sizeof(SrsSharedPtrMessage*));
|
||||
|
||||
// For RTC, we enable pass_timestamp mode, which never care about the timestamp and duration,
|
||||
// so we do not have to update the start time here.
|
||||
if (!pass_timestamp) {
|
||||
SrsSharedPtrMessage* last = omsgs[count - 1];
|
||||
av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS);
|
||||
}
|
||||
|
||||
SrsSharedPtrMessage* last = omsgs[count - 1];
|
||||
av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS);
|
||||
|
||||
if (count >= nb_msgs) {
|
||||
// the pmsgs is big enough and clear msgs at most time.
|
||||
msgs.clear();
|
||||
|
@ -433,6 +441,8 @@ SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
|
|||
mw_duration = 0;
|
||||
mw_waiting = false;
|
||||
#endif
|
||||
|
||||
pass_timestamp = false;
|
||||
}
|
||||
|
||||
SrsConsumer::~SrsConsumer()
|
||||
|
@ -466,20 +476,35 @@ srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsR
|
|||
srs_error_t err = srs_success;
|
||||
|
||||
SrsSharedPtrMessage* msg = shared_msg->copy();
|
||||
|
||||
if (!atc) {
|
||||
|
||||
// For RTC, we enable pass_timestamp mode, which never correct or depends on monotonic increasing of
|
||||
// timestamp. And in RTC, the audio and video timebase can be different, so we ignore time_jitter here.
|
||||
if (!pass_timestamp && !atc) {
|
||||
if ((err = jitter->correct(msg, ag)) != srs_success) {
|
||||
return srs_error_wrap(err, "consume message");
|
||||
}
|
||||
}
|
||||
|
||||
if ((err = queue->enqueue(msg, NULL)) != srs_success) {
|
||||
|
||||
// Put message in queue, here we may enable pass_timestamp mode.
|
||||
if ((err = queue->enqueue(msg, NULL, pass_timestamp)) != srs_success) {
|
||||
return srs_error_wrap(err, "enqueue message");
|
||||
}
|
||||
|
||||
#ifdef SRS_PERF_QUEUE_COND_WAIT
|
||||
// fire the mw when msgs is enough.
|
||||
if (mw_waiting) {
|
||||
// For RTC, we use pass_timestamp mode, we don't care about the timestamp in queue,
|
||||
// so we only check the messages in queue.
|
||||
if (pass_timestamp) {
|
||||
if (queue->size() > mw_min_msgs) {
|
||||
srs_cond_signal(mw_wait);
|
||||
mw_waiting = false;
|
||||
return err;
|
||||
}
|
||||
return err;
|
||||
}
|
||||
|
||||
// For RTMP, we wait for messages and duration.
|
||||
srs_utime_t duration = queue->duration();
|
||||
bool match_min_msgs = queue->size() > mw_min_msgs;
|
||||
|
||||
|
@ -529,7 +554,7 @@ srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
|
|||
}
|
||||
|
||||
// pump msgs from queue.
|
||||
if ((err = queue->dump_packets(max, msgs->msgs, count)) != srs_success) {
|
||||
if ((err = queue->dump_packets(max, msgs->msgs, count, pass_timestamp)) != srs_success) {
|
||||
return srs_error_wrap(err, "dump packets");
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue