diff --git a/trunk/auto/auto_headers.sh b/trunk/auto/auto_headers.sh index 60c00db9a..3b1af85fb 100755 --- a/trunk/auto/auto_headers.sh +++ b/trunk/auto/auto_headers.sh @@ -63,6 +63,12 @@ else srs_undefine_macro "SRS_HDS" $SRS_AUTO_HEADERS_H fi +if [ $SRS_LAS = YES ]; then + srs_define_macro "SRS_LAS" $SRS_AUTO_HEADERS_H +else + srs_undefine_macro "SRS_LAS" $SRS_AUTO_HEADERS_H +fi + if [ $SRS_SRT = YES ]; then srs_define_macro "SRS_SRT" $SRS_AUTO_HEADERS_H else diff --git a/trunk/auto/options.sh b/trunk/auto/options.sh index 43f0e701a..8f7186542 100755 --- a/trunk/auto/options.sh +++ b/trunk/auto/options.sh @@ -16,6 +16,7 @@ help=no ################################################################ # feature options SRS_HDS=NO +SRS_LAS=NO SRS_SRT=NO SRS_RTC=YES SRS_GB28181=NO @@ -146,6 +147,7 @@ Features: --ssl=on|off Whether build the rtmp complex handshake, requires openssl-devel installed. --hds=on|off Whether build the hds streaming, mux RTMP to F4M/F4V files. + --las=on|off Whether use LAS for http-flv adaptive stream. --stream-caster=on|off Whether build the stream caster to serve other stream over other protocol. --stat=on|off Whether build the the data statistic, for http api. --librtmp=on|off Whether build the srs-librtmp, library for client. @@ -279,6 +281,10 @@ function parse_user_option() { --without-hds) SRS_HDS=NO ;; --hds) if [[ $value == off ]]; then SRS_HDS=NO; else SRS_HDS=YES; fi ;; + --with-las) SRS_LAS=YES ;; + --without-las) SRS_LAS=NO ;; + --las) if [[ $value == off ]]; then SRS_LAS=NO; else SRS_LAS=YES; fi ;; + --with-nginx) SRS_NGINX=YES ;; --without-nginx) SRS_NGINX=NO ;; --nginx) if [[ $value == off ]]; then SRS_NGINX=NO; else SRS_NGINX=YES; fi ;; @@ -531,6 +537,7 @@ function regenerate_options() { SRS_AUTO_CONFIGURE="--prefix=${SRS_PREFIX}" if [ $SRS_HLS = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --hls=on"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --hls=off"; fi if [ $SRS_HDS = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --hds=on"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --hds=off"; fi + if [ $SRS_LAS = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --las=on"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --las=off"; fi if [ $SRS_DVR = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --dvr=on"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --dvr=off"; fi if [ $SRS_SSL = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --ssl=on"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --ssl=off"; fi if [ $SRS_USE_SYS_SSL = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --sys-ssl=on"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --sys-ssl=off"; fi @@ -629,6 +636,7 @@ function check_option_conflicts() { # check variable neccessary if [ $SRS_HDS = RESERVED ]; then echo "you must specifies the hds, see: ./configure --help"; __check_ok=NO; fi + if [ $SRS_LAS = RESERVED ]; then echo "you must specifies the las, see: ./configure --help"; __check_ok=NO; fi if [ $SRS_SSL = RESERVED ]; then echo "you must specifies the ssl, see: ./configure --help"; __check_ok=NO; fi if [ $SRS_STREAM_CASTER = RESERVED ]; then echo "you must specifies the stream-caster, see: ./configure --help"; __check_ok=NO; fi if [ $SRS_UTEST = RESERVED ]; then echo "you must specifies the utest, see: ./configure --help"; __check_ok=NO; fi diff --git a/trunk/configure b/trunk/configure index 605fc2db8..c86c9ba17 100755 --- a/trunk/configure +++ b/trunk/configure @@ -679,6 +679,11 @@ if [ $SRS_HLS = YES ]; then else echo -e "${YELLOW}Warning: HLS is disabled.${BLACK}" fi +if [ $SRS_LAS = YES ]; then + echo -e "${YELLOW}Experiment: LAS is enabled.${BLACK}" +else + echo -e "${GREEN}Warning: LAS is disabled.${BLACK}" +fi if [ $SRS_STREAM_CASTER = YES ]; then echo -e "${YELLOW}Experiment: StreamCaster is enabled.${BLACK}" else diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 9bbb4a6b1..2248f0091 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -587,13 +587,29 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess // create consumer of souce, ignore gop cache, use the audio gop cache. SrsConsumer* consumer = NULL; SrsAutoFree(SrsConsumer, consumer); +#ifdef SRS_LAS + int64_t lasspts = 0; + bool only_audio = false; + if (!r->query_get("startPts").empty()) { + lasspts = atoi(r->query_get("startPts").c_str()); + } + if (r->query_get("onlyAudio") == "true") { + only_audio = true; + } + if ((err = source->create_consumer(NULL, consumer, lasspts, only_audio)) != srs_success) { + return srs_error_wrap(err, "create consumer"); + } + if ((err = source->consumer_dumps(consumer, true, true, !enc->has_cache(), lasspts)) != srs_success) { + return srs_error_wrap(err, "dumps consumer"); + } +#else if ((err = source->create_consumer(NULL, consumer)) != srs_success) { return srs_error_wrap(err, "create consumer"); } if ((err = source->consumer_dumps(consumer, true, true, !enc->has_cache())) != srs_success) { return srs_error_wrap(err, "dumps consumer"); } - +#endif SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream(); SrsAutoFree(SrsPithyPrint, pprint); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 2fe07696c..e02d48fe5 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -418,6 +418,34 @@ ISrsWakable::~ISrsWakable() { } +#ifdef SRS_LAS +SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c, int64_t lasspts, bool only_audio) +{ + source = s; + conn = c; + paused = false; + jitter = new SrsRtmpJitter(); + queue = new SrsMessageQueue(); + should_update_source_id = false; + +#ifdef SRS_PERF_QUEUE_COND_WAIT + mw_wait = srs_cond_new(); + mw_min_msgs = 0; + mw_duration = 0; + mw_waiting = false; +#endif + + this->lasspts = lasspts; + this->only_audio = only_audio; + have_first_msg = false; + have_keyframe = false; + frames_dropped_by_lasspts = 0; + frames_dropped_by_first_keyframe = 0; + //TODO use config + lasspts_max_wait_time = 3000*1000; + create_time = srs_get_system_time(); +} +#else SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c) { source = s; @@ -434,6 +462,7 @@ SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c) mw_waiting = false; #endif } +#endif SrsConsumer::~SrsConsumer() { @@ -444,6 +473,16 @@ SrsConsumer::~SrsConsumer() #ifdef SRS_PERF_QUEUE_COND_WAIT srs_cond_destroy(mw_wait); #endif + +#ifdef SRS_LAS + std::list::iterator it; + for (it = headers_queue.begin(); it != headers_queue.end(); ++it) { + SrsSharedPtrMessage* msg = *it; + srs_freep(msg); + } + headers_queue.clear(); +#endif + } void SrsConsumer::set_queue_size(srs_utime_t queue_size) @@ -464,7 +503,61 @@ int64_t SrsConsumer::get_time() srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag) { srs_error_t err = srs_success; - + +#ifdef SRS_LAS + //filter audio frame if only_audio is set + if (only_audio && !shared_msg->is_audio()) { + return err; + } + + //cache header and wait data msg to set timestamp + if (shared_msg->is_header) { + headers_queue.push_back(shared_msg->copy()); + srs_info("got msg to headers_queue, msg:", shared_msg->to_str()); + return err; + } + + // filter the begin data msg + if (!have_first_msg && lasspts > 0) { + // spts_max_wait_time < 0 means wait until shared_msg->pts >= expect_start_pts + if(shared_msg->is_audio() || shared_msg->is_keyframe) { + if (shared_msg->pts < lasspts && + (lasspts_max_wait_time < 0 || srs_get_system_time() - create_time < lasspts_max_wait_time)) { + frames_dropped_by_lasspts++; + return err; + } + } + } + + //filter first video data msg must be key frame + if (!have_keyframe && shared_msg->is_video() && !shared_msg->is_keyframe) { + frames_dropped_by_first_keyframe++; + return err; + } + // now got first data frame + if (!have_first_msg) { + srs_trace("got first data msg, droped=%d, msg:%s", frames_dropped_by_lasspts, shared_msg->to_str().c_str()); + have_first_msg = true; + } + // now got first key frame + if (!have_keyframe && shared_msg->is_keyframe) { + srs_trace("got first key frame, droped=%d, msg:%s", frames_dropped_by_first_keyframe, shared_msg->to_str().c_str()); + have_keyframe = true; + } + + // send cached headers, las mast use atc + while (headers_queue.size() > 0) { + SrsSharedPtrMessage* header_msg = headers_queue.front(); + headers_queue.pop_front(); + header_msg->timestamp = shared_msg->timestamp; + //no need to copy any more, but + //carefull。。。 enqueue may free header_msg, do not use it after enqueue. + if ((err = queue->enqueue(header_msg, NULL)) != ERROR_SUCCESS) { + return err; + } + } +#endif + SrsSharedPtrMessage* msg = shared_msg->copy(); if (!atc) { @@ -720,6 +813,431 @@ bool SrsGopCache::pure_audio() return cached_video_count == 0; } + +#ifdef SRS_LAS +//-----------------SRS_LAS-------------- + +SrsLasCache::SrsLasCache(int cache_dur) { + max_cache_dur = cache_dur; + //we use max max_cache_dur to calculate max cache size + //vfr 60fps, afr 45 fps, and double it size + max_cache_size = cache_dur * (60+45) * 2 /1000; + enable_gop_cache = true; + cached_video_header = NULL; + cached_audio_header = NULL; + cached_metadata_header = NULL; + wait_keyframe = true; + last_video_index = last_audio_index = SRS_NO_DTAT_INDEX; +} + +SrsLasCache::~SrsLasCache() +{ + clear(); +} + +void SrsLasCache::dispose() +{ + clear(); +} + +void SrsLasCache::set(bool enabled) { + enable_gop_cache = enabled; + + if (!enabled) { + //srs_info("disable gop cache, clear %d packets.", (int)gop_cache.size()); + clear(); + return; + } + + srs_info("enable srs cache"); +} + +bool SrsLasCache::enabled() +{ + return enable_gop_cache; +} + +srs_error_t SrsLasCache::cache(SrsSharedPtrMessage* shared_msg) +{ + srs_error_t err = srs_success; + + if (!enable_gop_cache) { + return err; + } + //if wait_keyframe and first video is not key or video sequence header,do not cache it + if (wait_keyframe && shared_msg->is_video() && !shared_msg->is_keyframe && !shared_msg->is_header) { + srs_warn("wait idr frame, first video is not idr and sps"); + return err; + } + + + //update cache_idr_index|cache_header_index|last av index + if (shared_msg->is_video()) { + if (shared_msg->is_keyframe) { + //if idr pts go back clear data,but (keep headers), + //video pts may go back, but not IDR frame + if(cache_idr_index.size() > 0 && shared_msg->pts < cache_vec[cache_idr_index.back()]->pts) { + srs_warn("video key frame pts rollback,erase all cache data, curent pts=%lld, cache_status=%s",shared_msg->pts, to_str().c_str()); + erase_data(cache_vec.size()); + } + wait_keyframe = false; + //save key frame index in cache_vec + cache_idr_index.push_back(cache_vec.size()); + } + + if(!shared_msg->is_header) { + //save last video index in cache_vec + last_video_index = cache_vec.size(); + } else { + //save video header index in cache_vec + cache_header_index.push_back(cache_vec.size()); + } + + } else if (shared_msg->is_audio()) { + if (!shared_msg->is_header) { + //if no video, use audio timestamp, clear data if audio timestamp go back + if (last_video_index < 0 && last_audio_index > 0 && shared_msg->pts < cache_vec[last_audio_index]->pts) { + srs_warn("audio frame pts rollback, erase all cache data, curent pts=%lld, cache_status=%s",shared_msg->pts, to_str().c_str()); + erase_data(cache_vec.size()); + } + //save last audio index in cache_vec + last_audio_index = cache_vec.size(); + } else { + //save audio header index in cache_vec + cache_header_index.push_back(cache_vec.size()); + } + } else { + //save metadata index in cache_vec + cache_header_index.push_back(cache_vec.size()); + } + cache_vec.push_back(shared_msg->copy()); + + //there is no need to call drop old cache too often + // we call it only when key frame come, or overflow packet size limit + if (shared_msg->is_keyframe || (int)cache_vec.size() > max_cache_size) { + try_drop_old_cache(); + } + + return err; +} + +srs_error_t SrsLasCache::dump(SrsConsumer* consumer, bool ds, bool dm, int64_t lasspts) +{ + srs_error_t err = srs_success; + if (empty()) { + return err; + } + //the first msg index should dump in cache_vec, set to end as defult, + int begin_index = cache_vec.size(); + + //lasspts > 0 means give the buffer form first pts>=lasspts + if (lasspts > 0) { + if (last_video_index > 0) { + assert(cache_idr_index.size() > 0); //we use wait key, so if have video, must have idr + // get first key frame (pts>=lasspts) index timestamp in cache_vec! + for (int i = (int)cache_idr_index.size() - 1; i >= 0; i--) { + if (cache_vec[cache_idr_index[i]]->pts >= lasspts) { + begin_index = cache_idr_index[i]; + continue; + } + break; + } + } else if (last_audio_index > 0){ + // only audio use audio timestamp + assert(cache_vec.size() > last_audio_index); + for (int i = last_audio_index; i >= 0; i--) { + if (cache_vec[i]->is_header || !cache_vec[i]->is_audio()) { + continue; + } + if (cache_vec[i]->pts >= lasspts) { + begin_index = i; + continue; + } + break; + } + } + } else if (lasspts == 0) { //lasspts == 0 means from the newest keyframe or audio pkt + if (last_video_index > 0) { + assert(cache_idr_index.size() > 0); + begin_index = cache_idr_index.back(); + } else if (last_audio_index > 0) { + assert(cache_vec.size() > last_audio_index); + for (int i = last_audio_index; i >= 0; i--) { + if (cache_vec[i]->is_header || !cache_vec[i]->is_audio()) { + continue; + } + begin_index = i; + break; + } + } + } else { // lasspts < 0 means from the nearest frame pts to newest_pts - |lasspts| + if (last_video_index > 0) { + assert(cache_idr_index.size() > 0); + int64_t point_pts = cache_vec[last_video_index]->pts + lasspts; + begin_index = cache_idr_index.back(); + int64_t nearest_distance = abs(cache_vec[begin_index]->pts - point_pts); + + for (int i = (int)cache_idr_index.size() - 2; i >= 0; i--) { + int64_t distance = abs(cache_vec[cache_idr_index[i]]->pts - point_pts); + if (distance < nearest_distance) { + nearest_distance = distance; + begin_index = cache_idr_index[i]; + } else { //timestamp is monotonically increasing, so the here is the nearest_distance. + break; + } + } + } else if (last_audio_index > 0) { + // only audio, find nearest audio frame pts to newest_pts - |lasspts| + int64_t point_pts = cache_vec[last_audio_index]->pts + lasspts; + int64_t nearest_distance = abs(cache_vec[last_audio_index]->pts - point_pts); + for (int i = last_audio_index - 1; i >= 0; i--) { + if (cache_vec[i]->is_header) { + continue; // do not care header. + } + + int64_t distance = abs(cache_vec[i]->pts - point_pts); + if (distance < nearest_distance) { + nearest_distance = distance; + begin_index = i; + } else { // timestamp is monotonically increasing, so the here is the nearest_distance. + break; + } + } + } + } + return do_dump(consumer, ds, dm, begin_index); +} + + +srs_error_t SrsLasCache::do_dump(SrsConsumer* consumer, bool ds, bool dm, int begin_index) { + + srs_error_t err = srs_success; + //set oldest header first + SrsSharedPtrMessage* vh = cached_video_header; + SrsSharedPtrMessage* ah = cached_audio_header; + SrsSharedPtrMessage* mh = cached_metadata_header; + //check if have header before begin_index, if do, use new header + if(cache_header_index.size() > 0) { + for (size_t i = 0; i < cache_header_index.size(); i++) { + if (cache_header_index[i] < begin_index) { + if(cache_vec[cache_header_index[i]]->is_video()) { + vh = cache_vec[cache_header_index[i]]; + } else if(cache_vec[cache_header_index[i]]->is_audio()) { + ah = cache_vec[cache_header_index[i]]; + } else { + mh = cache_vec[cache_header_index[i]]; + } + } + break; + } + } + + SrsSharedPtrMessage* first_msg = NULL; + for (size_t i = begin_index; i < cache_vec.size(); i++) { + if(first_msg == NULL) { + //set header timestamp same with first data timestamp + first_msg = cache_vec[begin_index]; + if (mh && dm) { //LAS must use atc + mh->timestamp = first_msg->timestamp; + consumer->enqueue(mh, true, SrsRtmpJitterAlgorithmOFF); + } + if (vh && ds) { + vh->timestamp = first_msg->timestamp; + consumer->enqueue(vh, true, SrsRtmpJitterAlgorithmOFF); + } + if (ah && ds) { + ah->timestamp = first_msg->timestamp; + consumer->enqueue(ah, true, SrsRtmpJitterAlgorithmOFF); + } + } + if(cache_vec[i]->is_header) { + if ((!ds && cache_vec[i]->is_av()) || (!dm && !cache_vec[i]->is_av())) { + continue; + } + } + consumer->enqueue(cache_vec[i], true, SrsRtmpJitterAlgorithmOFF); + } + + if (first_msg != NULL) { + if (first_msg->is_video()) { + srs_trace("dump_cache, dump_dur=%d begin_index=%d, first_msg_pts=%lld cache_status:%s", + cache_vec[last_video_index]->pts - first_msg->pts, begin_index, first_msg->pts, to_str().c_str()); + } else if (first_msg->is_audio()) { + srs_trace("dump_cache, dump_dur=%d begin_index=%d, first_msg=%lld cache_status:%s", + cache_vec[last_audio_index]->pts - first_msg->pts, begin_index, first_msg->pts, to_str().c_str()); + } else { + srs_trace("dump_cache, but not first from av, begin_index=%d, cache_status:%s", + begin_index, first_msg->pts, begin_index, to_str().c_str()); + } + } else { //if no data msg dump, just set headers + if (mh && dm) { + consumer->enqueue(mh, true, SrsRtmpJitterAlgorithmOFF); + } + if (vh && ds) { + consumer->enqueue(vh, true, SrsRtmpJitterAlgorithmOFF); + } + if (ah && ds) { + consumer->enqueue(ah, true, SrsRtmpJitterAlgorithmOFF); + } + srs_trace("dump_cache, no data frame from cache, begin_index=%d cache_status:%s", begin_index, to_str().c_str()); + } + + return err; +} + +void SrsLasCache::clear() +{ + // remove old cache and save headers + std::vector::iterator it; + for (it = cache_vec.begin(); it != cache_vec.end(); it++) { + SrsSharedPtrMessage* msg = *it; + srs_freep(msg); + } + cache_vec.clear(); + cache_idr_index.clear(); + cache_header_index.clear(); + wait_keyframe = true; + srs_freep(cached_video_header); + srs_freep(cached_audio_header); + srs_freep(cached_metadata_header); + last_video_index = last_audio_index = SRS_NO_DTAT_INDEX; +} + +void SrsLasCache::try_drop_old_cache() +{ + int remove_count = 0; + srs_info("before drop, status:%s", to_str().c_str()); + + if (last_video_index > 0) { + assert(cache_idr_index.size() > 0); + //keep at least one key frame + for (int i = 0; i < (int)cache_idr_index.size() - 1; i++) { + int index = cache_idr_index[i]; + if (cache_vec[last_video_index]->pts - cache_vec[index]->pts >= max_cache_dur) { + continue; + } + // find first not overflow key frame + remove_count = index; + break; + } + } else if (last_audio_index > 0) { //only audio drop cache by audio duration + std::vector::iterator it; + for (it = cache_vec.begin(); it != cache_vec.end(); ++it) { + SrsSharedPtrMessage* msg = *it; + if (!msg->is_header && msg->is_audio()) { + if (cache_vec[last_audio_index]->pts - msg->pts <= max_cache_dur) { + break; + } + } + remove_count++; + } + } + + if (remove_count > 0) { + erase_data(remove_count); + } + srs_info("after drop count=%d, status:%s", remove_count, to_str().c_str()); + + //if still overflow packet size limit after shrink by audio or video duration, just clear all buffer + if ((int)cache_vec.size() > max_cache_size) { + erase_data(cache_vec.size()); + srs_warn("packet overflow after drop by video or audio duration, clear all data, status:%s", to_str().c_str()); + } +} + +bool SrsLasCache::empty() { + return (cache_vec.empty() == 0 && cached_metadata_header == NULL && + cached_video_header == NULL && cached_audio_header == NULL); +} + +void SrsLasCache::erase_data(int count) { + // remove old cache and save headers + for (int i = 0; i < count; i++) { + SrsSharedPtrMessage* msg = cache_vec[i]; + if (msg->is_header) { + if (msg->is_video()) { + srs_freep(cached_video_header); + cached_video_header = msg; + } else if (msg->is_audio()) { + srs_freep(cached_audio_header); + cached_audio_header = msg; + } else { + srs_freep(cached_metadata_header); + cached_metadata_header = msg; + } + } else { + srs_freep(msg); + } + //remove idr index if data removed + if (!cache_idr_index.empty() && cache_idr_index[0] <= i) { + cache_idr_index.erase(cache_idr_index.begin()); + } + //remove header index if data removed + if (!cache_header_index.empty() && cache_header_index[0] <= i) { + cache_header_index.erase(cache_header_index.begin()); + } + } + + cache_vec.erase(cache_vec.begin(), cache_vec.begin() + count); + + if(cache_idr_index.empty()) { + wait_keyframe = true; + } + + if (cache_vec.empty()) { + assert(cache_idr_index.empty()); + assert(cache_header_index.empty()); + last_video_index = last_audio_index = SRS_NO_DTAT_INDEX; + } else { + //slid index + for(size_t i = 0; i < cache_idr_index.size(); i++) { + cache_idr_index[i] -= count; + assert(cache_vec[cache_idr_index[i]]->is_keyframe); + } + for(size_t i = 0; i < cache_header_index.size(); i++) { + cache_header_index[i] -= count; + assert(cache_vec[cache_header_index[i]]->is_header); + } + + last_video_index -= count; + last_audio_index -= count; + } +} + +std::string SrsLasCache::to_str() { + stringstream ss; + ss << "{"; + ss << "mcdur=" << max_cache_dur; + ss << ",mcsize=" << max_cache_size; + ss << ",csize=" << cache_vec.size(); + ss << ",lvpts=" << (last_video_index >=0? cache_vec[last_video_index]->pts:-1); + ss << ",lvdts=" << (last_video_index >=0? cache_vec[last_video_index]->timestamp:-1); + ss << ",lapts=" << (last_audio_index >=0? cache_vec[last_audio_index]->pts:-1); + ss << ",ladts=" << (last_audio_index >=0? cache_vec[last_audio_index]->timestamp:-1); + ss << ",wait_key=" << wait_keyframe; + ss << ",key_index=["; + for (size_t i = 0; i < cache_idr_index.size(); i++) { + if (i != 0) { + ss << " | "; + } + ss << cache_idr_index[i] << ":" << cache_vec[cache_idr_index[i]]->pts; + } + ss << "],"; + ss << "header_index=["; + for (size_t i = 0; i < cache_header_index.size(); i++) { + if (i != 0) { + ss << " | "; + } + ss << cache_header_index[i] << ":" << cache_vec[cache_header_index[i]]->pts; + } + ss << "]"; + ss << " }"; + return ss.str(); +} + +//-----------------SRS_LAS-------------- +#endif + ISrsSourceHandler::ISrsSourceHandler() { } @@ -1874,7 +2392,13 @@ SrsSource::SrsSource() play_edge = new SrsPlayEdge(); publish_edge = new SrsPublishEdge(); + +#ifndef SRS_LAS gop_cache = new SrsGopCache(); +#else + gop_cache = new SrsLasCache(); +#endif + hub = new SrsOriginHub(); meta = new SrsMetaCache(); @@ -2160,6 +2684,13 @@ srs_error_t SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* return srs_error_wrap(err, "consume metadata"); } } +#ifdef SRS_LAS + // LASCache will cache all packets, except drop_for_reduce + if ((err = gop_cache->cache(meta->data())) != srs_success) { + return srs_error_wrap(err, "gop cache consume vdieo"); + } +#endif + } // Copy to hub to all utilities. @@ -2245,6 +2776,12 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) return srs_error_wrap(err, "consume message"); } } +#ifdef SRS_LAS + //LASCache will cache all packets, except drop_for_reduce + if ((err = gop_cache->cache(msg)) != srs_success) { + return srs_error_wrap(err, "gop cache consume audio"); + } +#endif } // cache the sequence header of aac, or first packet of mp3. @@ -2255,7 +2792,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) return srs_error_wrap(err, "meta consume audio"); } } - +#ifndef SRS_LAS // when sequence header, donot push to gop cache and adjust the timestamp. if (is_sequence_header) { return err; @@ -2265,7 +2802,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) if ((err = gop_cache->cache(msg)) != srs_success) { return srs_error_wrap(err, "gop cache consume audio"); } - +#endif // if atc, update the sequence header to abs time. if (atc) { if (meta->ash()) { @@ -2375,18 +2912,24 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg) return srs_error_wrap(err, "consume video"); } } +#ifdef SRS_LAS + // LASCache will cache all packets, except drop_for_reduce + if ((err = gop_cache->cache(msg)) != srs_success) { + return srs_error_wrap(err, "gop cache consume vdieo"); + } +#endif } - +#ifndef SRS_LAS // when sequence header, donot push to gop cache and adjust the timestamp. if (is_sequence_header) { return err; } - + // cache the last gop packets if ((err = gop_cache->cache(msg)) != srs_success) { return srs_error_wrap(err, "gop cache consume vdieo"); } - +#endif // if atc, update the sequence header to abs time. if (atc) { if (meta->vsh()) { @@ -2396,7 +2939,7 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg) meta->data()->timestamp = msg->timestamp; } } - + return err; } @@ -2579,13 +3122,57 @@ void SrsSource::on_unpublish() } } +#ifdef SRS_LAS +srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, int64_t lasspts, bool only_audio) { + srs_error_t err = srs_success; + consumer = new SrsConsumer(this, conn, lasspts, only_audio); + consumers.push_back(consumer); + // for edge, when play edge stream, check the state + if (_srs_config->get_vhost_is_edge(req->vhost)) { + // notice edge to start for the first client. + if ((err = play_edge->on_client_play()) != srs_success) { + return srs_error_wrap(err, "play edge"); + } + } + return err; +} + +srs_error_t SrsSource::consumer_dumps(SrsConsumer* consumer, bool ds, bool dm, bool dg, int64_t lasspts) +{ + srs_error_t err = srs_success; + srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); + consumer->set_queue_size(queue_size); + //las dump cache headers frome gop cache + if (hub->active()) { + if (!dg || !gop_cache->enabled()) { + if ((err = meta->dumps(consumer, atc, jitter_algorithm, dm, ds)) != srs_success) { + return srs_error_wrap(err, "meta dumps"); + } + } else { + // copy gop cache to client. + if ((err = gop_cache->dump(consumer, dm, ds, lasspts)) != srs_success) { + return srs_error_wrap(err, "gop cache dumps"); + } + } + } + + // print status. + if (dg) { + srs_trace("create consumer, active=%d, queue_size=%.2f, jitter=%d", hub->active(), queue_size, jitter_algorithm); + } else { + srs_trace("create consumer, active=%d, ignore gop cache, jitter=%d", hub->active(), jitter_algorithm); + } + + return err; +} +#else srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer) { srs_error_t err = srs_success; - + consumer = new SrsConsumer(this, conn); consumers.push_back(consumer); - + // for edge, when play edge stream, check the state if (_srs_config->get_vhost_is_edge(req->vhost)) { // notice edge to start for the first client. @@ -2639,6 +3226,7 @@ srs_error_t SrsSource::consumer_dumps(SrsConsumer* consumer, bool ds, bool dm, b return err; } +#endif void SrsSource::on_consumer_destroy(SrsConsumer* consumer) { diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 4cdb4be09..6b99be915 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -202,8 +203,39 @@ private: int mw_min_msgs; srs_utime_t mw_duration; #endif + +#ifdef SRS_LAS + //LAS1.0 chapter 4 exeParam @startPts + int64_t lasspts; + //LAS1.0 chapter 4 exeParam @onlyAudio + bool only_audio; + //if have send first data msg + bool have_first_msg; + //if have send first key frame + bool have_keyframe; + //count of droped frames, (when lasspts > 0, if not have_first_msg, droped frame when pts < lasspts) + int64_t frames_dropped_by_lasspts; + //count of deroped frames,(when first video is not key frame drop it) + int64_t frames_dropped_by_first_keyframe; + //cache {metadata,video sequence header, audio header} and send with data timstamp just after them. + std::list headers_queue; + //LAS1.5 chapter 5.3, not use timeoutPts, but wait mode with a timeout + int64_t lasspts_max_wait_time; + //comsumer create time is to check if expire lasspts_max_wait_time + srs_utime_t create_time; +#endif + public: +#ifdef SRS_LAS + //@param s consumer source + //@param c consumer connection + //@param lasspts LAS1.0 chapter 4 exeParam stratPts + //@param only_audio LAS1.0 chapter 4 exeParam onlyAudio + SrsConsumer(SrsSource* s, SrsConnection* c, int64_t lasspts = 0, bool onlyAudio = false); +#else SrsConsumer(SrsSource* s, SrsConnection* c); +#endif + virtual ~SrsConsumer(); public: // Set the size of queue. @@ -292,6 +324,91 @@ public: // when no video in gop cache, the stream is pure audio right now. virtual bool pure_audio(); }; +#ifdef SRS_LAS +#define SRS_NO_DTAT_INDEX -1 +#define SRS_DEFAULT_LAS_MAX_CACHE_DUR 12000 //LAS1.0 chapter 5.2 @cacheLen TODO use config, +class SrsLasCache { +private: + /** + * if disabled the gop cache, + * the client will wait for the next keyframe for h264, + * and will be black-screen. + */ + bool enable_gop_cache; + + /** + * max cached duration. + */ + int max_cache_dur; + int max_cache_size; + + //latest headers before cache_vec + SrsSharedPtrMessage* cached_video_header; + SrsSharedPtrMessage* cached_audio_header; + SrsSharedPtrMessage* cached_metadata_header; + + //if have video must start from keyframe frame + bool wait_keyframe; + + //cache buffer, cache all msg include (data/headers/metadata) + std::vector cache_vec; + //key_frame index in cache_vec + std::vector cache_idr_index; + //header index in cache_vec + std::vector cache_header_index; + + int last_video_index; + int last_audio_index; +public: + SrsLasCache(int cache_dur = SRS_DEFAULT_LAS_MAX_CACHE_DUR); + virtual ~SrsLasCache(); + + /** + * cleanup when system quit. + */ + virtual void dispose(); + /** + * to enable or disable the gop cache. + */ + virtual void set(bool enabled); + virtual bool enabled(); + + /** + * cache msg to buffer + * @param shared_msg, directly ptr, copy it if need to save it. shared_msg may be data/headers/metadata + */ + virtual srs_error_t cache(SrsSharedPtrMessage* shared_msg); + + /** + * dump the cached gop to consumer. + * @param consumer the consumer we should give buffer to. + * @param lasspts: LAS1.0 chapter 5.3, may have the follow value + * 0> : give the buffer form pts>=lasspts + * =0 : from the newest gop + * <0 : from the newest_pts - |lasspts| + */ + virtual srs_error_t dump(SrsConsumer* consumer, bool dm, bool ds, int64_t lasspts = 0); + //@param begin_index the index of first msg to dump in cache_vec + virtual srs_error_t do_dump(SrsConsumer* consumer, bool dm, bool ds, int begin_index); + //set LAS1.0 chapter 5.2 @cacheLen + virtual void set_max_cache_duration(int duration) {max_cache_dur = duration;}; + /** + * clear all cached msg + */ + virtual void clear(); + /** + * is cache empty, if have any of data|header|metadata, it return false + */ + virtual bool empty(); + +private: + //dorp old cache if overflow max_cache_dur or max_cache_size + virtual void try_drop_old_cache(); + virtual std::string to_str(); + //clear cache_vec msg, but keep headers + virtual void erase_data(int count); +}; +#endif // The handler to handle the event of srs source. // For example, the http flv streaming module handle the event and @@ -538,7 +655,11 @@ private: SrsPlayEdge* play_edge; SrsPublishEdge* publish_edge; // The gop cache for client fast startup. +#ifdef SRS_LAS + SrsLasCache* gop_cache; +#else SrsGopCache* gop_cache; +#endif // The hub for origin server. SrsOriginHub* hub; // The metadata cache. @@ -597,6 +718,20 @@ public: virtual srs_error_t on_publish(); virtual void on_unpublish(); public: + +#ifdef SRS_LAS + // Create consumer + // @param consumer, output the create consumer. + // @param lasspts, @startPts in las1.0 + // @param only_audio, whether comsumer want audio only + virtual srs_error_t create_consumer(SrsConnection* conn, SrsConsumer*& consumer, int64_t lasspts = 0, bool only_audio = false); + // Dumps packets in cache to consumer. + // @param ds, whether dumps the sequence header. + // @param dm, whether dumps the metadata. + // @param dg, whether dumps the gop cache. + // @param lasspts, @startPts in las1.0 + virtual srs_error_t consumer_dumps(SrsConsumer* consumer, bool ds = true, bool dm = true, bool dg = true, int64_t lasspts = 0); +#else // Create consumer // @param consumer, output the create consumer. virtual srs_error_t create_consumer(SrsConnection* conn, SrsConsumer*& consumer); @@ -605,6 +740,7 @@ public: // @param dm, whether dumps the metadata. // @param dg, whether dumps the gop cache. virtual srs_error_t consumer_dumps(SrsConsumer* consumer, bool ds = true, bool dm = true, bool dg = true); +#endif virtual void on_consumer_destroy(SrsConsumer* consumer); virtual void set_cache(bool enabled); virtual SrsRtmpJitterAlgorithm jitter(); diff --git a/trunk/src/kernel/srs_kernel_codec.cpp b/trunk/src/kernel/srs_kernel_codec.cpp index e3d3ca4a3..c1dc946c2 100644 --- a/trunk/src/kernel/srs_kernel_codec.cpp +++ b/trunk/src/kernel/srs_kernel_codec.cpp @@ -178,6 +178,22 @@ bool SrsFlvVideo::acceptable(char* data, int size) return true; } +#ifdef SRS_LAS +int64_t SrsFlvVideo::cts(char *data, int size) +{ + if (!h264(data, size) || size < 6) { + return -1; + } else { + int32_t cts = 0x00; + char* pp = (char*)&cts; + pp[2] = data[2]; + pp[1] = data[3]; + pp[0] = data[4]; + return cts; + } +} +#endif + SrsFlvAudio::SrsFlvAudio() { } diff --git a/trunk/src/kernel/srs_kernel_codec.hpp b/trunk/src/kernel/srs_kernel_codec.hpp index a5bb960cb..b851dcfb3 100644 --- a/trunk/src/kernel/srs_kernel_codec.hpp +++ b/trunk/src/kernel/srs_kernel_codec.hpp @@ -268,6 +268,13 @@ public: * @remark all type of audio is possible, no need to check audio. */ static bool acceptable(char* data, int size); + +#ifdef SRS_LAS + /** + * get cts if is 264, if not 264 return 0 + */ + static int64_t cts(char *data, int size); +#endif }; /** diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp index edbcb829c..93b2007d5 100644 --- a/trunk/src/kernel/srs_kernel_flv.cpp +++ b/trunk/src/kernel/srs_kernel_flv.cpp @@ -220,6 +220,11 @@ SrsSharedPtrMessage::SrsSharedPtrPayload::~SrsSharedPtrPayload() SrsSharedPtrMessage::SrsSharedPtrMessage() : timestamp(0), stream_id(0), size(0), payload(NULL) { ptr = NULL; +#ifdef SRS_LAS + is_keyframe = false; + is_header = false; + pts = 0; +#endif } SrsSharedPtrMessage::~SrsSharedPtrMessage() @@ -232,6 +237,23 @@ SrsSharedPtrMessage::~SrsSharedPtrMessage() } } } +#ifdef SRS_LAS +std::string SrsSharedPtrMessage::to_str() { + stringstream ss; + ss << "{"; + ss << "dts=" << timestamp; + ss << ",pts=" << pts; + ss << ",is_k=" << is_keyframe; + ss << ",is_h=" << is_header; + if (ptr) { + ss << ",type=" << (int)ptr->header.message_type; + } else { + ss << ",type=unknow"; + } + ss << "}"; + return ss.str(); +}; +#endif srs_error_t SrsSharedPtrMessage::create(SrsCommonMessage* msg) { @@ -268,6 +290,19 @@ srs_error_t SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload ptr->header.perfer_cid = pheader->perfer_cid; this->timestamp = pheader->timestamp; this->stream_id = pheader->stream_id; +#ifdef SRS_LAS + if (ptr->header.message_type == RTMP_MSG_VideoMessage) { + is_header = SrsFlvVideo::sh(payload, size); + is_keyframe = SrsFlvVideo::keyframe(payload, size) && !is_header; + pts = this->timestamp + SrsFlvVideo::cts(payload, size); + } else if (ptr->header.message_type == RTMP_MSG_AudioMessage) { + is_header = SrsFlvAudio::sh(payload, size); + pts=this->timestamp; + } else { // metadate + is_header = true; + pts=this->timestamp; + } +#endif } ptr->payload = payload; ptr->size = size; @@ -361,6 +396,12 @@ SrsSharedPtrMessage* SrsSharedPtrMessage::copy() copy->payload = ptr->payload; copy->size = ptr->size; +#ifdef SRS_LAS + copy->pts = pts; + copy->is_header = is_header; + copy->is_keyframe = is_keyframe; +#endif + return copy; } diff --git a/trunk/src/kernel/srs_kernel_flv.hpp b/trunk/src/kernel/srs_kernel_flv.hpp index f3be5931b..614b80399 100644 --- a/trunk/src/kernel/srs_kernel_flv.hpp +++ b/trunk/src/kernel/srs_kernel_flv.hpp @@ -290,6 +290,17 @@ public: // video/audio packet use raw bytes, no video/audio packet. char* payload; +#ifdef SRS_LAS + //if is video key frame, not include video sequence header + bool is_keyframe; + // if is {video sequence header|aac header|metadata} + bool is_header; + // for video pts = timstamp + cts, others pts = timestamp + int64_t pts; +public: + std::string to_str(); +#endif + private: class SrsSharedPtrPayload {