1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Merge SRS 4.0

This commit is contained in:
winlin 2020-07-19 10:09:28 +08:00
commit 8ae3ab0ba7
15 changed files with 861 additions and 16 deletions

View file

@ -418,6 +418,34 @@ ISrsWakable::~ISrsWakable()
{
}
#ifdef SRS_LAS
SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c, int64_t lasspts, bool only_audio)
{
source = s;
conn = c;
paused = false;
jitter = new SrsRtmpJitter();
queue = new SrsMessageQueue();
should_update_source_id = false;
#ifdef SRS_PERF_QUEUE_COND_WAIT
mw_wait = srs_cond_new();
mw_min_msgs = 0;
mw_duration = 0;
mw_waiting = false;
#endif
this->lasspts = lasspts;
this->only_audio = only_audio;
have_first_msg = false;
have_keyframe = false;
frames_dropped_by_lasspts = 0;
frames_dropped_by_first_keyframe = 0;
//TODO use config
lasspts_max_wait_time = 3000*1000;
create_time = srs_get_system_time();
}
#else
SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
{
source = s;
@ -434,6 +462,7 @@ SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
mw_waiting = false;
#endif
}
#endif
SrsConsumer::~SrsConsumer()
{
@ -444,6 +473,16 @@ SrsConsumer::~SrsConsumer()
#ifdef SRS_PERF_QUEUE_COND_WAIT
srs_cond_destroy(mw_wait);
#endif
#ifdef SRS_LAS
std::list<SrsSharedPtrMessage*>::iterator it;
for (it = headers_queue.begin(); it != headers_queue.end(); ++it) {
SrsSharedPtrMessage* msg = *it;
srs_freep(msg);
}
headers_queue.clear();
#endif
}
void SrsConsumer::set_queue_size(srs_utime_t queue_size)
@ -464,7 +503,61 @@ int64_t SrsConsumer::get_time()
srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
{
srs_error_t err = srs_success;
#ifdef SRS_LAS
//filter audio frame if only_audio is set
if (only_audio && !shared_msg->is_audio()) {
return err;
}
//cache header and wait data msg to set timestamp
if (shared_msg->is_header) {
headers_queue.push_back(shared_msg->copy());
srs_info("got msg to headers_queue, msg:", shared_msg->to_str());
return err;
}
// filter the begin data msg
if (!have_first_msg && lasspts > 0) {
// spts_max_wait_time < 0 means wait until shared_msg->pts >= expect_start_pts
if(shared_msg->is_audio() || shared_msg->is_keyframe) {
if (shared_msg->pts < lasspts &&
(lasspts_max_wait_time < 0 || srs_get_system_time() - create_time < lasspts_max_wait_time)) {
frames_dropped_by_lasspts++;
return err;
}
}
}
//filter first video data msg must be key frame
if (!have_keyframe && shared_msg->is_video() && !shared_msg->is_keyframe) {
frames_dropped_by_first_keyframe++;
return err;
}
// now got first data frame
if (!have_first_msg) {
srs_trace("got first data msg, droped=%d, msg:%s", frames_dropped_by_lasspts, shared_msg->to_str().c_str());
have_first_msg = true;
}
// now got first key frame
if (!have_keyframe && shared_msg->is_keyframe) {
srs_trace("got first key frame, droped=%d, msg:%s", frames_dropped_by_first_keyframe, shared_msg->to_str().c_str());
have_keyframe = true;
}
// send cached headers, las mast use atc
while (headers_queue.size() > 0) {
SrsSharedPtrMessage* header_msg = headers_queue.front();
headers_queue.pop_front();
header_msg->timestamp = shared_msg->timestamp;
//no need to copy any more, but
//carefull。。。 enqueue may free header_msg, do not use it after enqueue.
if ((err = queue->enqueue(header_msg, NULL)) != ERROR_SUCCESS) {
return err;
}
}
#endif
SrsSharedPtrMessage* msg = shared_msg->copy();
if (!atc) {
@ -720,6 +813,431 @@ bool SrsGopCache::pure_audio()
return cached_video_count == 0;
}
#ifdef SRS_LAS
//-----------------SRS_LAS--------------
SrsLasCache::SrsLasCache(int cache_dur) {
max_cache_dur = cache_dur;
//we use max max_cache_dur to calculate max cache size
//vfr 60fps, afr 45 fps, and double it size
max_cache_size = cache_dur * (60+45) * 2 /1000;
enable_gop_cache = true;
cached_video_header = NULL;
cached_audio_header = NULL;
cached_metadata_header = NULL;
wait_keyframe = true;
last_video_index = last_audio_index = SRS_NO_DTAT_INDEX;
}
SrsLasCache::~SrsLasCache()
{
clear();
}
void SrsLasCache::dispose()
{
clear();
}
void SrsLasCache::set(bool enabled) {
enable_gop_cache = enabled;
if (!enabled) {
//srs_info("disable gop cache, clear %d packets.", (int)gop_cache.size());
clear();
return;
}
srs_info("enable srs cache");
}
bool SrsLasCache::enabled()
{
return enable_gop_cache;
}
srs_error_t SrsLasCache::cache(SrsSharedPtrMessage* shared_msg)
{
srs_error_t err = srs_success;
if (!enable_gop_cache) {
return err;
}
//if wait_keyframe and first video is not key or video sequence header,do not cache it
if (wait_keyframe && shared_msg->is_video() && !shared_msg->is_keyframe && !shared_msg->is_header) {
srs_warn("wait idr frame, first video is not idr and sps");
return err;
}
//update cache_idr_index|cache_header_index|last av index
if (shared_msg->is_video()) {
if (shared_msg->is_keyframe) {
//if idr pts go back clear data,but (keep headers),
//video pts may go back, but not IDR frame
if(cache_idr_index.size() > 0 && shared_msg->pts < cache_vec[cache_idr_index.back()]->pts) {
srs_warn("video key frame pts rollback,erase all cache data, curent pts=%lld, cache_status=%s",shared_msg->pts, to_str().c_str());
erase_data(cache_vec.size());
}
wait_keyframe = false;
//save key frame index in cache_vec
cache_idr_index.push_back(cache_vec.size());
}
if(!shared_msg->is_header) {
//save last video index in cache_vec
last_video_index = cache_vec.size();
} else {
//save video header index in cache_vec
cache_header_index.push_back(cache_vec.size());
}
} else if (shared_msg->is_audio()) {
if (!shared_msg->is_header) {
//if no video, use audio timestamp, clear data if audio timestamp go back
if (last_video_index < 0 && last_audio_index > 0 && shared_msg->pts < cache_vec[last_audio_index]->pts) {
srs_warn("audio frame pts rollback, erase all cache data, curent pts=%lld, cache_status=%s",shared_msg->pts, to_str().c_str());
erase_data(cache_vec.size());
}
//save last audio index in cache_vec
last_audio_index = cache_vec.size();
} else {
//save audio header index in cache_vec
cache_header_index.push_back(cache_vec.size());
}
} else {
//save metadata index in cache_vec
cache_header_index.push_back(cache_vec.size());
}
cache_vec.push_back(shared_msg->copy());
//there is no need to call drop old cache too often
// we call it only when key frame come, or overflow packet size limit
if (shared_msg->is_keyframe || (int)cache_vec.size() > max_cache_size) {
try_drop_old_cache();
}
return err;
}
srs_error_t SrsLasCache::dump(SrsConsumer* consumer, bool ds, bool dm, int64_t lasspts)
{
srs_error_t err = srs_success;
if (empty()) {
return err;
}
//the first msg index should dump in cache_vec, set to end as defult,
int begin_index = cache_vec.size();
//lasspts > 0 means give the buffer form first pts>=lasspts
if (lasspts > 0) {
if (last_video_index > 0) {
assert(cache_idr_index.size() > 0); //we use wait key, so if have video, must have idr
// get first key frame (pts>=lasspts) index timestamp in cache_vec!
for (int i = (int)cache_idr_index.size() - 1; i >= 0; i--) {
if (cache_vec[cache_idr_index[i]]->pts >= lasspts) {
begin_index = cache_idr_index[i];
continue;
}
break;
}
} else if (last_audio_index > 0){
// only audio use audio timestamp
assert(cache_vec.size() > last_audio_index);
for (int i = last_audio_index; i >= 0; i--) {
if (cache_vec[i]->is_header || !cache_vec[i]->is_audio()) {
continue;
}
if (cache_vec[i]->pts >= lasspts) {
begin_index = i;
continue;
}
break;
}
}
} else if (lasspts == 0) { //lasspts == 0 means from the newest keyframe or audio pkt
if (last_video_index > 0) {
assert(cache_idr_index.size() > 0);
begin_index = cache_idr_index.back();
} else if (last_audio_index > 0) {
assert(cache_vec.size() > last_audio_index);
for (int i = last_audio_index; i >= 0; i--) {
if (cache_vec[i]->is_header || !cache_vec[i]->is_audio()) {
continue;
}
begin_index = i;
break;
}
}
} else { // lasspts < 0 means from the nearest frame pts to newest_pts - |lasspts|
if (last_video_index > 0) {
assert(cache_idr_index.size() > 0);
int64_t point_pts = cache_vec[last_video_index]->pts + lasspts;
begin_index = cache_idr_index.back();
int64_t nearest_distance = abs(cache_vec[begin_index]->pts - point_pts);
for (int i = (int)cache_idr_index.size() - 2; i >= 0; i--) {
int64_t distance = abs(cache_vec[cache_idr_index[i]]->pts - point_pts);
if (distance < nearest_distance) {
nearest_distance = distance;
begin_index = cache_idr_index[i];
} else { //timestamp is monotonically increasing, so the here is the nearest_distance.
break;
}
}
} else if (last_audio_index > 0) {
// only audio, find nearest audio frame pts to newest_pts - |lasspts|
int64_t point_pts = cache_vec[last_audio_index]->pts + lasspts;
int64_t nearest_distance = abs(cache_vec[last_audio_index]->pts - point_pts);
for (int i = last_audio_index - 1; i >= 0; i--) {
if (cache_vec[i]->is_header) {
continue; // do not care header.
}
int64_t distance = abs(cache_vec[i]->pts - point_pts);
if (distance < nearest_distance) {
nearest_distance = distance;
begin_index = i;
} else { // timestamp is monotonically increasing, so the here is the nearest_distance.
break;
}
}
}
}
return do_dump(consumer, ds, dm, begin_index);
}
srs_error_t SrsLasCache::do_dump(SrsConsumer* consumer, bool ds, bool dm, int begin_index) {
srs_error_t err = srs_success;
//set oldest header first
SrsSharedPtrMessage* vh = cached_video_header;
SrsSharedPtrMessage* ah = cached_audio_header;
SrsSharedPtrMessage* mh = cached_metadata_header;
//check if have header before begin_index, if do, use new header
if(cache_header_index.size() > 0) {
for (size_t i = 0; i < cache_header_index.size(); i++) {
if (cache_header_index[i] < begin_index) {
if(cache_vec[cache_header_index[i]]->is_video()) {
vh = cache_vec[cache_header_index[i]];
} else if(cache_vec[cache_header_index[i]]->is_audio()) {
ah = cache_vec[cache_header_index[i]];
} else {
mh = cache_vec[cache_header_index[i]];
}
}
break;
}
}
SrsSharedPtrMessage* first_msg = NULL;
for (size_t i = begin_index; i < cache_vec.size(); i++) {
if(first_msg == NULL) {
//set header timestamp same with first data timestamp
first_msg = cache_vec[begin_index];
if (mh && dm) { //LAS must use atc
mh->timestamp = first_msg->timestamp;
consumer->enqueue(mh, true, SrsRtmpJitterAlgorithmOFF);
}
if (vh && ds) {
vh->timestamp = first_msg->timestamp;
consumer->enqueue(vh, true, SrsRtmpJitterAlgorithmOFF);
}
if (ah && ds) {
ah->timestamp = first_msg->timestamp;
consumer->enqueue(ah, true, SrsRtmpJitterAlgorithmOFF);
}
}
if(cache_vec[i]->is_header) {
if ((!ds && cache_vec[i]->is_av()) || (!dm && !cache_vec[i]->is_av())) {
continue;
}
}
consumer->enqueue(cache_vec[i], true, SrsRtmpJitterAlgorithmOFF);
}
if (first_msg != NULL) {
if (first_msg->is_video()) {
srs_trace("dump_cache, dump_dur=%d begin_index=%d, first_msg_pts=%lld cache_status:%s",
cache_vec[last_video_index]->pts - first_msg->pts, begin_index, first_msg->pts, to_str().c_str());
} else if (first_msg->is_audio()) {
srs_trace("dump_cache, dump_dur=%d begin_index=%d, first_msg=%lld cache_status:%s",
cache_vec[last_audio_index]->pts - first_msg->pts, begin_index, first_msg->pts, to_str().c_str());
} else {
srs_trace("dump_cache, but not first from av, begin_index=%d, cache_status:%s",
begin_index, first_msg->pts, begin_index, to_str().c_str());
}
} else { //if no data msg dump, just set headers
if (mh && dm) {
consumer->enqueue(mh, true, SrsRtmpJitterAlgorithmOFF);
}
if (vh && ds) {
consumer->enqueue(vh, true, SrsRtmpJitterAlgorithmOFF);
}
if (ah && ds) {
consumer->enqueue(ah, true, SrsRtmpJitterAlgorithmOFF);
}
srs_trace("dump_cache, no data frame from cache, begin_index=%d cache_status:%s", begin_index, to_str().c_str());
}
return err;
}
void SrsLasCache::clear()
{
// remove old cache and save headers
std::vector<SrsSharedPtrMessage*>::iterator it;
for (it = cache_vec.begin(); it != cache_vec.end(); it++) {
SrsSharedPtrMessage* msg = *it;
srs_freep(msg);
}
cache_vec.clear();
cache_idr_index.clear();
cache_header_index.clear();
wait_keyframe = true;
srs_freep(cached_video_header);
srs_freep(cached_audio_header);
srs_freep(cached_metadata_header);
last_video_index = last_audio_index = SRS_NO_DTAT_INDEX;
}
void SrsLasCache::try_drop_old_cache()
{
int remove_count = 0;
srs_info("before drop, status:%s", to_str().c_str());
if (last_video_index > 0) {
assert(cache_idr_index.size() > 0);
//keep at least one key frame
for (int i = 0; i < (int)cache_idr_index.size() - 1; i++) {
int index = cache_idr_index[i];
if (cache_vec[last_video_index]->pts - cache_vec[index]->pts >= max_cache_dur) {
continue;
}
// find first not overflow key frame
remove_count = index;
break;
}
} else if (last_audio_index > 0) { //only audio drop cache by audio duration
std::vector<SrsSharedPtrMessage*>::iterator it;
for (it = cache_vec.begin(); it != cache_vec.end(); ++it) {
SrsSharedPtrMessage* msg = *it;
if (!msg->is_header && msg->is_audio()) {
if (cache_vec[last_audio_index]->pts - msg->pts <= max_cache_dur) {
break;
}
}
remove_count++;
}
}
if (remove_count > 0) {
erase_data(remove_count);
}
srs_info("after drop count=%d, status:%s", remove_count, to_str().c_str());
//if still overflow packet size limit after shrink by audio or video duration, just clear all buffer
if ((int)cache_vec.size() > max_cache_size) {
erase_data(cache_vec.size());
srs_warn("packet overflow after drop by video or audio duration, clear all data, status:%s", to_str().c_str());
}
}
bool SrsLasCache::empty() {
return (cache_vec.empty() == 0 && cached_metadata_header == NULL &&
cached_video_header == NULL && cached_audio_header == NULL);
}
void SrsLasCache::erase_data(int count) {
// remove old cache and save headers
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = cache_vec[i];
if (msg->is_header) {
if (msg->is_video()) {
srs_freep(cached_video_header);
cached_video_header = msg;
} else if (msg->is_audio()) {
srs_freep(cached_audio_header);
cached_audio_header = msg;
} else {
srs_freep(cached_metadata_header);
cached_metadata_header = msg;
}
} else {
srs_freep(msg);
}
//remove idr index if data removed
if (!cache_idr_index.empty() && cache_idr_index[0] <= i) {
cache_idr_index.erase(cache_idr_index.begin());
}
//remove header index if data removed
if (!cache_header_index.empty() && cache_header_index[0] <= i) {
cache_header_index.erase(cache_header_index.begin());
}
}
cache_vec.erase(cache_vec.begin(), cache_vec.begin() + count);
if(cache_idr_index.empty()) {
wait_keyframe = true;
}
if (cache_vec.empty()) {
assert(cache_idr_index.empty());
assert(cache_header_index.empty());
last_video_index = last_audio_index = SRS_NO_DTAT_INDEX;
} else {
//slid index
for(size_t i = 0; i < cache_idr_index.size(); i++) {
cache_idr_index[i] -= count;
assert(cache_vec[cache_idr_index[i]]->is_keyframe);
}
for(size_t i = 0; i < cache_header_index.size(); i++) {
cache_header_index[i] -= count;
assert(cache_vec[cache_header_index[i]]->is_header);
}
last_video_index -= count;
last_audio_index -= count;
}
}
std::string SrsLasCache::to_str() {
stringstream ss;
ss << "{";
ss << "mcdur=" << max_cache_dur;
ss << ",mcsize=" << max_cache_size;
ss << ",csize=" << cache_vec.size();
ss << ",lvpts=" << (last_video_index >=0? cache_vec[last_video_index]->pts:-1);
ss << ",lvdts=" << (last_video_index >=0? cache_vec[last_video_index]->timestamp:-1);
ss << ",lapts=" << (last_audio_index >=0? cache_vec[last_audio_index]->pts:-1);
ss << ",ladts=" << (last_audio_index >=0? cache_vec[last_audio_index]->timestamp:-1);
ss << ",wait_key=" << wait_keyframe;
ss << ",key_index=[";
for (size_t i = 0; i < cache_idr_index.size(); i++) {
if (i != 0) {
ss << " | ";
}
ss << cache_idr_index[i] << ":" << cache_vec[cache_idr_index[i]]->pts;
}
ss << "],";
ss << "header_index=[";
for (size_t i = 0; i < cache_header_index.size(); i++) {
if (i != 0) {
ss << " | ";
}
ss << cache_header_index[i] << ":" << cache_vec[cache_header_index[i]]->pts;
}
ss << "]";
ss << " }";
return ss.str();
}
//-----------------SRS_LAS--------------
#endif
ISrsSourceHandler::ISrsSourceHandler()
{
}
@ -1873,7 +2391,13 @@ SrsSource::SrsSource()
play_edge = new SrsPlayEdge();
publish_edge = new SrsPublishEdge();
#ifndef SRS_LAS
gop_cache = new SrsGopCache();
#else
gop_cache = new SrsLasCache();
#endif
hub = new SrsOriginHub();
meta = new SrsMetaCache();
@ -2159,6 +2683,13 @@ srs_error_t SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket*
return srs_error_wrap(err, "consume metadata");
}
}
#ifdef SRS_LAS
// LASCache will cache all packets, except drop_for_reduce
if ((err = gop_cache->cache(meta->data())) != srs_success) {
return srs_error_wrap(err, "gop cache consume vdieo");
}
#endif
}
// Copy to hub to all utilities.
@ -2244,6 +2775,12 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
return srs_error_wrap(err, "consume message");
}
}
#ifdef SRS_LAS
//LASCache will cache all packets, except drop_for_reduce
if ((err = gop_cache->cache(msg)) != srs_success) {
return srs_error_wrap(err, "gop cache consume audio");
}
#endif
}
// cache the sequence header of aac, or first packet of mp3.
@ -2254,7 +2791,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
return srs_error_wrap(err, "meta consume audio");
}
}
#ifndef SRS_LAS
// when sequence header, donot push to gop cache and adjust the timestamp.
if (is_sequence_header) {
return err;
@ -2264,7 +2801,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
if ((err = gop_cache->cache(msg)) != srs_success) {
return srs_error_wrap(err, "gop cache consume audio");
}
#endif
// if atc, update the sequence header to abs time.
if (atc) {
if (meta->ash()) {
@ -2374,18 +2911,24 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
return srs_error_wrap(err, "consume video");
}
}
#ifdef SRS_LAS
// LASCache will cache all packets, except drop_for_reduce
if ((err = gop_cache->cache(msg)) != srs_success) {
return srs_error_wrap(err, "gop cache consume vdieo");
}
#endif
}
#ifndef SRS_LAS
// when sequence header, donot push to gop cache and adjust the timestamp.
if (is_sequence_header) {
return err;
}
// cache the last gop packets
if ((err = gop_cache->cache(msg)) != srs_success) {
return srs_error_wrap(err, "gop cache consume vdieo");
}
#endif
// if atc, update the sequence header to abs time.
if (atc) {
if (meta->vsh()) {
@ -2395,7 +2938,7 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
meta->data()->timestamp = msg->timestamp;
}
}
return err;
}
@ -2578,13 +3121,57 @@ void SrsSource::on_unpublish()
}
}
#ifdef SRS_LAS
srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, int64_t lasspts, bool only_audio) {
srs_error_t err = srs_success;
consumer = new SrsConsumer(this, conn, lasspts, only_audio);
consumers.push_back(consumer);
// for edge, when play edge stream, check the state
if (_srs_config->get_vhost_is_edge(req->vhost)) {
// notice edge to start for the first client.
if ((err = play_edge->on_client_play()) != srs_success) {
return srs_error_wrap(err, "play edge");
}
}
return err;
}
srs_error_t SrsSource::consumer_dumps(SrsConsumer* consumer, bool ds, bool dm, bool dg, int64_t lasspts)
{
srs_error_t err = srs_success;
srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
consumer->set_queue_size(queue_size);
//las dump cache headers frome gop cache
if (hub->active()) {
if (!dg || !gop_cache->enabled()) {
if ((err = meta->dumps(consumer, atc, jitter_algorithm, dm, ds)) != srs_success) {
return srs_error_wrap(err, "meta dumps");
}
} else {
// copy gop cache to client.
if ((err = gop_cache->dump(consumer, dm, ds, lasspts)) != srs_success) {
return srs_error_wrap(err, "gop cache dumps");
}
}
}
// print status.
if (dg) {
srs_trace("create consumer, active=%d, queue_size=%.2f, jitter=%d", hub->active(), queue_size, jitter_algorithm);
} else {
srs_trace("create consumer, active=%d, ignore gop cache, jitter=%d", hub->active(), jitter_algorithm);
}
return err;
}
#else
srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer)
{
srs_error_t err = srs_success;
consumer = new SrsConsumer(this, conn);
consumers.push_back(consumer);
// for edge, when play edge stream, check the state
if (_srs_config->get_vhost_is_edge(req->vhost)) {
// notice edge to start for the first client.
@ -2638,6 +3225,7 @@ srs_error_t SrsSource::consumer_dumps(SrsConsumer* consumer, bool ds, bool dm, b
return err;
}
#endif
void SrsSource::on_consumer_destroy(SrsConsumer* consumer)
{