diff --git a/README.md b/README.md index 7ce5654e1..0acf0a7cd 100755 --- a/README.md +++ b/README.md @@ -566,6 +566,7 @@ Supported operating systems and hardware: ### SRS 2.0 history +* v2.0, 2015-04-15, for [#383](https://github.com/winlinvip/simple-rtmp-server/issues/383), support mix_correct algorithm. 2.0.161. * v2.0, 2015-04-13, for [#381](https://github.com/winlinvip/simple-rtmp-server/issues/381), support reap hls/ts by gop or not. 2.0.160. * v2.0, 2015-04-10, enhanced on_hls_notify, support HTTP GET when reap ts. * v2.0, 2015-04-10, refine the hls deviation for floor algorithm. diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index b0c883939..8a7729ae2 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -75,7 +75,7 @@ heartbeat { # the id of devide. device_id "my-srs-device"; # whether report with summaries - # if true, put /api/v1/summaries to the request data: + # if on, put /api/v1/summaries to the request data: # { # "summaries": summaries object. # } @@ -1329,7 +1329,7 @@ vhost jitter.srs.com { # about the stream monotonically increasing: # 1. video timestamp is monotonically increasing, # 2. audio timestamp is monotonically increasing, - # 3. video and audio timestamp is interleaved monotonically increasing. + # 3. video and audio timestamp is interleaved/mixed monotonically increasing. # it's specified by RTMP specification, @see 3. Byte Order, Alignment, and Time Format # however, some encoder cannot provides this feature, please set this to off to ignore time jitter. # the time jitter algorithm: @@ -1338,6 +1338,11 @@ vhost jitter.srs.com { # 3. off, disable the time jitter algorithm, like atc. # default: full time_jitter full; + # whether use the interleaved/mixed algorithm to correct the timestamp. + # if on, always ensure the timestamp of audio+video is interleaved/mixed monotonically increase. + # if off, use time_jitter to correct the timestamp if required. + # default: off + mix_correct off; } # vhost for atc. diff --git a/trunk/research/librtmp/srs_ingest_flv.c b/trunk/research/librtmp/srs_ingest_flv.c index 7d8ce132c..ee1cfbdce 100644 --- a/trunk/research/librtmp/srs_ingest_flv.c +++ b/trunk/research/librtmp/srs_ingest_flv.c @@ -171,7 +171,7 @@ int do_proxy(srs_flv_t flv, srs_rtmp_t ortmp, int64_t re, int32_t* pstarttime, u return ret; } - if (*pstarttime < 0) { + if (*pstarttime < 0 && srs_utils_flv_tag_is_av(type)) { *pstarttime = *ptimestamp; } diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index a58620299..4219e8e53 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -807,6 +807,17 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root) } srs_trace("vhost %s reload time_jitter success.", vhost.c_str()); } + // mix_correct, only one per vhost + if (!srs_directive_equals(new_vhost->get("mix_correct"), old_vhost->get("mix_correct"))) { + for (it = subscribes.begin(); it != subscribes.end(); ++it) { + ISrsReloadHandler* subscribe = *it; + if ((ret = subscribe->on_reload_vhost_mix_correct(vhost)) != ERROR_SUCCESS) { + srs_error("vhost %s notify subscribes mix_correct failed. ret=%d", vhost.c_str(), ret); + return ret; + } + } + srs_trace("vhost %s reload mix_correct success.", vhost.c_str()); + } // forward, only one per vhost if (!srs_directive_equals(new_vhost->get("forward"), old_vhost->get("forward"))) { for (it = subscribes.begin(); it != subscribes.end(); ++it) { @@ -1419,7 +1430,7 @@ int SrsConfig::check_config() && n != "gop_cache" && n != "queue_length" && n != "refer" && n != "refer_publish" && n != "refer_play" && n != "forward" && n != "transcode" && n != "bandcheck" - && n != "time_jitter" + && n != "time_jitter" && n != "mix_correct" && n != "atc" && n != "atc_auto" && n != "debug_srs_upnode" && n != "mr" && n != "mw_latency" && n != "min_latency" @@ -2118,12 +2129,12 @@ bool SrsConfig::get_atc_auto(string vhost) SrsConfDirective* conf = get_vhost(vhost); if (!conf) { - return true; + return SRS_CONF_DEFAULT_ATC_AUTO; } conf = conf->get("atc_auto"); if (!conf || conf->arg0().empty()) { - return true; + return SRS_CONF_DEFAULT_ATC_AUTO; } return SRS_CONF_PERFER_TRUE(conf->arg0()); @@ -2131,14 +2142,14 @@ bool SrsConfig::get_atc_auto(string vhost) int SrsConfig::get_time_jitter(string vhost) { - SrsConfDirective* dvr = get_vhost(vhost); + SrsConfDirective* conf = get_vhost(vhost); std::string time_jitter = SRS_CONF_DEFAULT_TIME_JITTER; - if (dvr) { - SrsConfDirective* conf = dvr->get("time_jitter"); + if (conf) { + conf = conf->get("time_jitter"); - if (conf) { + if (conf && !conf->arg0().empty()) { time_jitter = conf->arg0(); } } @@ -2146,6 +2157,22 @@ int SrsConfig::get_time_jitter(string vhost) return _srs_time_jitter_string2int(time_jitter); } +bool SrsConfig::get_mix_correct(string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return SRS_CONF_DEFAULT_MIX_CORRECT; + } + + conf = conf->get("mix_correct"); + if (!conf || conf->arg0().empty()) { + return SRS_CONF_DEFAULT_MIX_CORRECT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + double SrsConfig::get_queue_length(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 66f924854..5f0a3846a 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -72,6 +72,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SRS_CONF_DEFAULT_DVR_PLAN SRS_CONF_DEFAULT_DVR_PLAN_SESSION #define SRS_CONF_DEFAULT_DVR_DURATION 30 #define SRS_CONF_DEFAULT_TIME_JITTER "full" +#define SRS_CONF_DEFAULT_ATC_AUTO true +#define SRS_CONF_DEFAULT_MIX_CORRECT false // in seconds, the paused queue length. #define SRS_CONF_DEFAULT_PAUSED_LENGTH 10 // the interval in seconds for bandwidth check @@ -531,6 +533,11 @@ public: * @remark, default full. */ virtual int get_time_jitter(std::string vhost); + /** + * whether use mix correct algorithm to ensure the timestamp + * monotonically increase. + */ + virtual bool get_mix_correct(std::string vhost); /** * get the cache queue length, in seconds. * when exceed the queue length, drop packet util I frame. diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index 3055da53f..f0490faae 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -358,8 +358,18 @@ int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts return ret; } + std::string path = uri.get_query(); + if (path.empty()) { + path = uri.get_path(); + } else { + path = uri.get_path(); + path += "?"; + path += uri.get_query(); + } + srs_warn("GET %s", path.c_str()); + SrsHttpMessage* msg = NULL; - if ((ret = http.get(uri.get_path(), "", &msg)) != ERROR_SUCCESS) { + if ((ret = http.get(path.c_str(), "", &msg)) != ERROR_SUCCESS) { return ret; } SrsAutoFree(SrsHttpMessage, msg); diff --git a/trunk/src/app/srs_app_reload.cpp b/trunk/src/app/srs_app_reload.cpp index cd43faaeb..86e7da498 100644 --- a/trunk/src/app/srs_app_reload.cpp +++ b/trunk/src/app/srs_app_reload.cpp @@ -130,6 +130,11 @@ int ISrsReloadHandler::on_reload_vhost_time_jitter(string /*vhost*/) return ERROR_SUCCESS; } +int ISrsReloadHandler::on_reload_vhost_mix_correct(string /*vhost*/) +{ + return ERROR_SUCCESS; +} + int ISrsReloadHandler::on_reload_vhost_forward(string /*vhost*/) { return ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_reload.hpp b/trunk/src/app/srs_app_reload.hpp index 01068a7ca..2ac8e3782 100644 --- a/trunk/src/app/srs_app_reload.hpp +++ b/trunk/src/app/srs_app_reload.hpp @@ -63,6 +63,7 @@ public: virtual int on_reload_vhost_gop_cache(std::string vhost); virtual int on_reload_vhost_queue_length(std::string vhost); virtual int on_reload_vhost_time_jitter(std::string vhost); + virtual int on_reload_vhost_mix_correct(std::string vhost); virtual int on_reload_vhost_forward(std::string vhost); virtual int on_reload_vhost_hls(std::string vhost); virtual int on_reload_vhost_hds(std::string vhost); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 6aab17203..2b316f38c 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -44,6 +44,7 @@ using namespace std; #include #include #include +#include #define CONST_MAX_JITTER_MS 500 #define DEFAULT_FRAME_TIME_MS 40 @@ -768,10 +769,62 @@ void SrsSource::destroy() pool.clear(); } +SrsMixQueue::SrsMixQueue() +{ + nb_videos = 0; +} + +SrsMixQueue::~SrsMixQueue() +{ + clear(); +} + +void SrsMixQueue::clear() +{ + std::multimap::iterator it; + for (it = msgs.begin(); it != msgs.end(); ++it) { + SrsSharedPtrMessage* msg = it->second; + srs_freep(msg); + } + msgs.clear(); + + nb_videos = 0; +} + +void SrsMixQueue::push(SrsSharedPtrMessage* msg) +{ + msgs.insert(std::make_pair(msg->timestamp, msg)); + + if (msg->is_video()) { + nb_videos++; + } +} + +SrsSharedPtrMessage* SrsMixQueue::pop() +{ + // always keep 2+ videos + if (nb_videos < 2) { + return NULL; + } + + // pop the first msg. + std::multimap::iterator it = msgs.begin(); + SrsSharedPtrMessage* msg = it->second; + msgs.erase(it); + + if (msg->is_video()) { + nb_videos--; + } + + return msg; +} + SrsSource::SrsSource() { _req = NULL; jitter_algorithm = SrsRtmpJitterAlgorithmOFF; + mix_correct = false; + mix_queue = new SrsMixQueue(); #ifdef SRS_AUTO_HLS hls = new SrsHls(); @@ -818,6 +871,7 @@ SrsSource::~SrsSource() forwarders.clear(); } + srs_freep(mix_queue); srs_freep(cache_metadata); srs_freep(cache_sh_video); srs_freep(cache_sh_audio); @@ -878,6 +932,7 @@ int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* h publish_edge->set_queue_size(queue_size); jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(_req->vhost); + mix_correct = _srs_config->get_mix_correct(_req->vhost); return ret; } @@ -973,6 +1028,25 @@ int SrsSource::on_reload_vhost_time_jitter(string vhost) return ret; } +int SrsSource::on_reload_vhost_mix_correct(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (_req->vhost != vhost) { + return ret; + } + + bool v = _srs_config->get_mix_correct(_req->vhost); + + // when changed, clear the mix queue. + if (v != mix_correct) { + mix_queue->clear(); + } + mix_correct = v; + + return ret; +} + int SrsSource::on_reload_vhost_forward(string vhost) { int ret = ERROR_SUCCESS; @@ -1330,10 +1404,23 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio) srs_error("initialize the audio failed. ret=%d", ret); return ret; } - srs_verbose("initialize shared ptr audio success."); + srs_info("Audio dts=%"PRId64", size=%d", msg.timestamp, msg.size); + + if (!mix_correct) { + return on_audio_imp(&msg); + } + + return do_mix_correct(&msg); +} + +int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) +{ + int ret = ERROR_SUCCESS; + + srs_info("Audio dts=%"PRId64", size=%d", msg->timestamp, msg->size); #ifdef SRS_AUTO_HLS - if ((ret = hls->on_audio(&msg)) != ERROR_SUCCESS) { + if ((ret = hls->on_audio(msg)) != ERROR_SUCCESS) { // apply the error strategy for hls. // @see https://github.com/winlinvip/simple-rtmp-server/issues/264 std::string hls_error_strategy = _srs_config->get_hls_on_error(_req->vhost); @@ -1347,7 +1434,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio) ret = ERROR_SUCCESS; } else if (hls_error_strategy == SRS_CONF_DEFAULT_HLS_ON_ERROR_CONTINUE) { // compare the sequence header with audio, continue when it's actually an sequence header. - if (ret == ERROR_HLS_DECODE_ERROR && cache_sh_audio && cache_sh_audio->size == msg.size) { + if (ret == ERROR_HLS_DECODE_ERROR && cache_sh_audio && cache_sh_audio->size == msg->size) { srs_warn("the audio is actually a sequence header, ignore this packet."); ret = ERROR_SUCCESS; } else { @@ -1362,7 +1449,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio) #endif #ifdef SRS_AUTO_DVR - if ((ret = dvr->on_audio(&msg)) != ERROR_SUCCESS) { + if ((ret = dvr->on_audio(msg)) != ERROR_SUCCESS) { srs_warn("dvr process audio message failed, ignore and disable dvr. ret=%d", ret); // unpublish, ignore ret. @@ -1374,7 +1461,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio) #endif #ifdef SRS_AUTO_HDS - if ((ret = hds->on_audio(&msg)) != ERROR_SUCCESS) { + if ((ret = hds->on_audio(msg)) != ERROR_SUCCESS) { srs_warn("hds process audio message failed, ignore and disable dvr. ret=%d", ret); // unpublish, ignore ret. @@ -1390,7 +1477,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio) SrsConsumer** pconsumer = consumers.data(); for (int i = 0; i < nb_consumers; i++) { SrsConsumer* consumer = pconsumer[i]; - if ((ret = consumer->enqueue(&msg, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) { + if ((ret = consumer->enqueue(msg, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) { srs_error("dispatch the audio failed. ret=%d", ret); return ret; } @@ -1403,7 +1490,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio) std::vector::iterator it; for (it = forwarders.begin(); it != forwarders.end(); ++it) { SrsForwarder* forwarder = *it; - if ((ret = forwarder->on_audio(&msg)) != ERROR_SUCCESS) { + if ((ret = forwarder->on_audio(msg)) != ERROR_SUCCESS) { srs_error("forwarder process audio message failed. ret=%d", ret); return ret; } @@ -1413,10 +1500,10 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio) // cache the sequence header of aac, or first packet of mp3. // for example, the mp3 is used for hls to write the "right" audio codec. // TODO: FIXME: to refine the stream info system. - bool is_aac_sequence_header = SrsFlvCodec::audio_is_sequence_header(msg.payload, msg.size); + bool is_aac_sequence_header = SrsFlvCodec::audio_is_sequence_header(msg->payload, msg->size); if (is_aac_sequence_header || !cache_sh_audio) { srs_freep(cache_sh_audio); - cache_sh_audio = msg.copy(); + cache_sh_audio = msg->copy(); } // cache the sequence header if aac @@ -1425,7 +1512,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio) // parse detail audio codec SrsAvcAacCodec codec; SrsCodecSample sample; - if ((ret = codec.audio_aac_demux(msg.payload, msg.size, &sample)) != ERROR_SUCCESS) { + if ((ret = codec.audio_aac_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) { srs_error("source codec demux audio failed. ret=%d", ret); return ret; } @@ -1442,7 +1529,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio) srs_trace("%dB audio sh, " "codec(%d, profile=%s, %dchannels, %dkbps, %dHZ), " "flv(%dbits, %dchannels, %dHZ)", - msg.size, codec.audio_codec_id, + msg->size, codec.audio_codec_id, srs_codec_aac_object2str(codec.aac_object).c_str(), codec.aac_channels, codec.audio_data_rate / 1000, aac_sample_rates[codec.aac_sample_rate], flv_sample_sizes[sample.sound_size], flv_sound_types[sample.sound_type], @@ -1451,7 +1538,7 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio) } // cache the last gop packets - if ((ret = gop_cache->cache(&msg)) != ERROR_SUCCESS) { + if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) { srs_error("shrink gop cache failed. ret=%d", ret); return ret; } @@ -1460,10 +1547,10 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio) // if atc, update the sequence header to abs time. if (atc) { if (cache_sh_audio) { - cache_sh_audio->timestamp = msg.timestamp; + cache_sh_audio->timestamp = msg->timestamp; } if (cache_metadata) { - cache_metadata->timestamp = msg.timestamp; + cache_metadata->timestamp = msg->timestamp; } } @@ -1481,10 +1568,23 @@ int SrsSource::on_video(SrsCommonMessage* shared_video) srs_error("initialize the video failed. ret=%d", ret); return ret; } - srs_verbose("initialize shared ptr video success."); + srs_info("Video dts=%"PRId64", size=%d", msg.timestamp, msg.size); + + if (!mix_correct) { + return on_video_imp(&msg); + } + + return do_mix_correct(&msg); +} + +int SrsSource::on_video_imp(SrsSharedPtrMessage* msg) +{ + int ret = ERROR_SUCCESS; + + srs_info("Video dts=%"PRId64", size=%d", msg->timestamp, msg->size); #ifdef SRS_AUTO_HLS - if ((ret = hls->on_video(&msg)) != ERROR_SUCCESS) { + if ((ret = hls->on_video(msg)) != ERROR_SUCCESS) { // apply the error strategy for hls. // @see https://github.com/winlinvip/simple-rtmp-server/issues/264 std::string hls_error_strategy = _srs_config->get_hls_on_error(_req->vhost); @@ -1498,7 +1598,7 @@ int SrsSource::on_video(SrsCommonMessage* shared_video) ret = ERROR_SUCCESS; } else if (hls_error_strategy == SRS_CONF_DEFAULT_HLS_ON_ERROR_CONTINUE) { // compare the sequence header with video, continue when it's actually an sequence header. - if (ret == ERROR_HLS_DECODE_ERROR && cache_sh_video && cache_sh_video->size == msg.size) { + if (ret == ERROR_HLS_DECODE_ERROR && cache_sh_video && cache_sh_video->size == msg->size) { srs_warn("the video is actually a sequence header, ignore this packet."); ret = ERROR_SUCCESS; } else { @@ -1513,7 +1613,7 @@ int SrsSource::on_video(SrsCommonMessage* shared_video) #endif #ifdef SRS_AUTO_DVR - if ((ret = dvr->on_video(&msg)) != ERROR_SUCCESS) { + if ((ret = dvr->on_video(msg)) != ERROR_SUCCESS) { srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret); // unpublish, ignore ret. @@ -1525,7 +1625,7 @@ int SrsSource::on_video(SrsCommonMessage* shared_video) #endif #ifdef SRS_AUTO_HDS - if ((ret = hds->on_video(&msg)) != ERROR_SUCCESS) { + if ((ret = hds->on_video(msg)) != ERROR_SUCCESS) { srs_warn("hds process video message failed, ignore and disable dvr. ret=%d", ret); // unpublish, ignore ret. @@ -1539,7 +1639,7 @@ int SrsSource::on_video(SrsCommonMessage* shared_video) if (true) { for (int i = 0; i < (int)consumers.size(); i++) { SrsConsumer* consumer = consumers.at(i); - if ((ret = consumer->enqueue(&msg, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) { + if ((ret = consumer->enqueue(msg, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) { srs_error("dispatch the video failed. ret=%d", ret); return ret; } @@ -1552,7 +1652,7 @@ int SrsSource::on_video(SrsCommonMessage* shared_video) std::vector::iterator it; for (it = forwarders.begin(); it != forwarders.end(); ++it) { SrsForwarder* forwarder = *it; - if ((ret = forwarder->on_video(&msg)) != ERROR_SUCCESS) { + if ((ret = forwarder->on_video(msg)) != ERROR_SUCCESS) { srs_error("forwarder process video message failed. ret=%d", ret); return ret; } @@ -1561,14 +1661,14 @@ int SrsSource::on_video(SrsCommonMessage* shared_video) // cache the sequence header if h264 // donot cache the sequence header to gop_cache, return here. - if (SrsFlvCodec::video_is_sequence_header(msg.payload, msg.size)) { + if (SrsFlvCodec::video_is_sequence_header(msg->payload, msg->size)) { srs_freep(cache_sh_video); - cache_sh_video = msg.copy(); + cache_sh_video = msg->copy(); // parse detail audio codec SrsAvcAacCodec codec; SrsCodecSample sample; - if ((ret = codec.video_avc_demux(msg.payload, msg.size, &sample)) != ERROR_SUCCESS) { + if ((ret = codec.video_avc_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) { srs_error("source codec demux video failed. ret=%d", ret); return ret; } @@ -1581,7 +1681,7 @@ int SrsSource::on_video(SrsCommonMessage* shared_video) srs_trace("%dB video sh, " "codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %dfps, %ds)", - msg.size, codec.video_codec_id, + msg->size, codec.video_codec_id, srs_codec_avc_profile2str(codec.avc_profile).c_str(), srs_codec_avc_level2str(codec.avc_level).c_str(), codec.width, codec.height, codec.video_data_rate / 1000, codec.frame_rate, codec.duration); @@ -1589,7 +1689,7 @@ int SrsSource::on_video(SrsCommonMessage* shared_video) } // cache the last gop packets - if ((ret = gop_cache->cache(&msg)) != ERROR_SUCCESS) { + if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) { srs_error("gop cache msg failed. ret=%d", ret); return ret; } @@ -1598,16 +1698,39 @@ int SrsSource::on_video(SrsCommonMessage* shared_video) // if atc, update the sequence header to abs time. if (atc) { if (cache_sh_video) { - cache_sh_video->timestamp = msg.timestamp; + cache_sh_video->timestamp = msg->timestamp; } if (cache_metadata) { - cache_metadata->timestamp = msg.timestamp; + cache_metadata->timestamp = msg->timestamp; } } return ret; } +int SrsSource::do_mix_correct(SrsSharedPtrMessage* msg) +{ + int ret = ERROR_SUCCESS; + + // insert msg to the queue. + mix_queue->push(msg->copy()); + + // fetch someone from mix queue. + SrsSharedPtrMessage* m = mix_queue->pop(); + if (!m) { + return ret; + } + SrsAutoFree(SrsSharedPtrMessage, m); + + // consume the monotonically increase message. + if (m->is_audio()) { + return on_audio_imp(m); + } + + srs_assert(m->is_video()); + return on_video_imp(m); +} + int SrsSource::on_aggregate(SrsCommonMessage* msg) { int ret = ERROR_SUCCESS; @@ -1748,6 +1871,9 @@ int SrsSource::on_publish() // save its id to srouce id. on_source_id_changed(_srs_context->get_id()); + // reset the mix queue. + mix_queue->clear(); + // create forwarders if ((ret = create_forwarders()) != ERROR_SUCCESS) { srs_error("create forwarders failed. ret=%d", ret); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index a30f7a178..87c863ebf 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -368,6 +368,23 @@ public: virtual void on_unpublish(SrsSource* s, SrsRequest* r) = 0; }; +/** + * the mix queue to correct the timestamp for mix_correct algorithm. + */ +class SrsMixQueue +{ +private: + u_int32_t nb_videos; + std::multimap msgs; +public: + SrsMixQueue(); + virtual ~SrsMixQueue(); +public: + virtual void clear(); + virtual void push(SrsSharedPtrMessage* msg); + virtual SrsSharedPtrMessage* pop(); +}; + /** * live streaming source. */ @@ -407,6 +424,9 @@ private: std::vector consumers; // the time jitter algorithm for vhost. SrsRtmpJitterAlgorithm jitter_algorithm; + // whether use interlaced/mixed algorithm to correct timestamp. + bool mix_correct; + SrsMixQueue* mix_queue; // hls handler. #ifdef SRS_AUTO_HLS SrsHls* hls; @@ -474,6 +494,7 @@ public: virtual int on_reload_vhost_gop_cache(std::string vhost); virtual int on_reload_vhost_queue_length(std::string vhost); virtual int on_reload_vhost_time_jitter(std::string vhost); + virtual int on_reload_vhost_mix_correct(std::string vhost); virtual int on_reload_vhost_forward(std::string vhost); virtual int on_reload_vhost_hls(std::string vhost); virtual int on_reload_vhost_hds(std::string vhost); @@ -495,8 +516,17 @@ public: public: virtual bool can_publish(); virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata); +public: virtual int on_audio(SrsCommonMessage* audio); +private: + virtual int on_audio_imp(SrsSharedPtrMessage* audio); +public: virtual int on_video(SrsCommonMessage* video); +private: + virtual int on_video_imp(SrsSharedPtrMessage* video); +private: + virtual int do_mix_correct(SrsSharedPtrMessage* msg); +public: virtual int on_aggregate(SrsCommonMessage* msg); /** * the pre-publish is we are very sure we are diff --git a/trunk/src/libs/srs_librtmp.cpp b/trunk/src/libs/srs_librtmp.cpp index dffd90149..09dd9385b 100644 --- a/trunk/src/libs/srs_librtmp.cpp +++ b/trunk/src/libs/srs_librtmp.cpp @@ -1979,6 +1979,21 @@ srs_bool srs_utils_flv_tag_is_ok(char type) return type == SRS_RTMP_TYPE_AUDIO || type == SRS_RTMP_TYPE_VIDEO || type == SRS_RTMP_TYPE_SCRIPT; } +srs_bool srs_utils_flv_tag_is_audio(char type) +{ + return type == SRS_RTMP_TYPE_AUDIO; +} + +srs_bool srs_utils_flv_tag_is_video(char type) +{ + return type == SRS_RTMP_TYPE_VIDEO; +} + +srs_bool srs_utils_flv_tag_is_av(char type) +{ + return type == SRS_RTMP_TYPE_AUDIO || type == SRS_RTMP_TYPE_VIDEO; +} + char srs_utils_flv_video_codec_id(char* data, int size) { if (size < 1) { diff --git a/trunk/src/libs/srs_librtmp.hpp b/trunk/src/libs/srs_librtmp.hpp index aee8b7a9f..31afe02dc 100644 --- a/trunk/src/libs/srs_librtmp.hpp +++ b/trunk/src/libs/srs_librtmp.hpp @@ -661,6 +661,9 @@ extern int srs_utils_parse_timestamp( * @return true when tag is video/audio/script-data; otherwise, false. */ extern srs_bool srs_utils_flv_tag_is_ok(char type); +extern srs_bool srs_utils_flv_tag_is_audio(char type); +extern srs_bool srs_utils_flv_tag_is_video(char type); +extern srs_bool srs_utils_flv_tag_is_av(char type); /** * get the CodecID of video tag.