diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index d58d642a0..0fe5ea182 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -196,13 +196,16 @@ int SrsDvrPlan::on_meta_data(SrsOnMetaDataPacket* metadata) return ret; } -int SrsDvrPlan::on_audio(SrsSharedPtrMessage* audio) +int SrsDvrPlan::on_audio(SrsSharedPtrMessage* __audio) { int ret = ERROR_SUCCESS; if (!dvr_enabled) { return ret; } + + SrsSharedPtrMessage* audio = __audio->copy(); + SrsAutoFree(SrsSharedPtrMessage, audio); if ((jitter->correct(audio, 0, 0, jitter_algorithm)) != ERROR_SUCCESS) { return ret; @@ -222,13 +225,16 @@ int SrsDvrPlan::on_audio(SrsSharedPtrMessage* audio) return ret; } -int SrsDvrPlan::on_video(SrsSharedPtrMessage* video) +int SrsDvrPlan::on_video(SrsSharedPtrMessage* __video) { int ret = ERROR_SUCCESS; if (!dvr_enabled) { return ret; } + + SrsSharedPtrMessage* video = __video->copy(); + SrsAutoFree(SrsSharedPtrMessage, video); char* payload = video->payload; int size = video->size; @@ -571,30 +577,14 @@ int SrsDvr::on_meta_data(SrsOnMetaDataPacket* metadata) return ret; } -int SrsDvr::on_audio(SrsSharedPtrMessage* audio) +int SrsDvr::on_audio(SrsSharedPtrMessage* __audio) { - int ret = ERROR_SUCCESS; - - SrsAutoFree(SrsSharedPtrMessage, audio); - - if ((ret = plan->on_audio(audio)) != ERROR_SUCCESS) { - return ret; - } - - return ret; + return plan->on_audio(__audio); } -int SrsDvr::on_video(SrsSharedPtrMessage* video) +int SrsDvr::on_video(SrsSharedPtrMessage* __video) { - int ret = ERROR_SUCCESS; - - SrsAutoFree(SrsSharedPtrMessage, video); - - if ((ret = plan->on_video(video)) != ERROR_SUCCESS) { - return ret; - } - - return ret; + return plan->on_video(__video); } #endif diff --git a/trunk/src/app/srs_app_dvr.hpp b/trunk/src/app/srs_app_dvr.hpp index edea5cc75..9f90923f4 100644 --- a/trunk/src/app/srs_app_dvr.hpp +++ b/trunk/src/app/srs_app_dvr.hpp @@ -119,8 +119,14 @@ public: virtual int on_publish(); virtual void on_unpublish() = 0; virtual int on_meta_data(SrsOnMetaDataPacket* metadata); - virtual int on_audio(SrsSharedPtrMessage* audio); - virtual int on_video(SrsSharedPtrMessage* video); + /** + * @param __audio, directly ptr, copy it if need to save it. + */ + virtual int on_audio(SrsSharedPtrMessage* __audio); + /** + * @param __video, directly ptr, copy it if need to save it. + */ + virtual int on_video(SrsSharedPtrMessage* __video); // interface ISrsReloadHandler public: virtual int on_reload_vhost_dvr(std::string vhost); @@ -166,7 +172,13 @@ public: virtual int initialize(SrsSource* source, SrsRequest* req); virtual int on_publish(); virtual void on_unpublish(); + /** + * @param audio, directly ptr, copy it if need to save it. + */ virtual int on_audio(SrsSharedPtrMessage* audio); + /** + * @param video, directly ptr, copy it if need to save it. + */ virtual int on_video(SrsSharedPtrMessage* video); private: virtual int update_duration(SrsSharedPtrMessage* msg); @@ -208,12 +220,14 @@ public: virtual int on_meta_data(SrsOnMetaDataPacket* metadata); /** * mux the audio packets to dvr. + * @param __audio, directly ptr, copy it if need to save it. */ - virtual int on_audio(SrsSharedPtrMessage* audio); + virtual int on_audio(SrsSharedPtrMessage* __audio); /** * mux the video packets to dvr. + * @param __video, directly ptr, copy it if need to save it. */ - virtual int on_video(SrsSharedPtrMessage* video); + virtual int on_video(SrsSharedPtrMessage* __video); }; #endif diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 134fc083b..d3e20988e 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -156,9 +156,11 @@ void SrsForwarder::on_unpublish() kbps->set_io(NULL, NULL); } -int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata) +int SrsForwarder::on_meta_data(SrsSharedPtrMessage* __metadata) { int ret = ERROR_SUCCESS; + + SrsSharedPtrMessage* metadata = __metadata->copy(); if ((ret = jitter->correct(metadata, 0, 0, SrsRtmpJitterAlgorithmFULL)) != ERROR_SUCCESS) { srs_freep(metadata); @@ -172,10 +174,12 @@ int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata) return ret; } -int SrsForwarder::on_audio(SrsSharedPtrMessage* msg) +int SrsForwarder::on_audio(SrsSharedPtrMessage* __audio) { int ret = ERROR_SUCCESS; + SrsSharedPtrMessage* msg = __audio->copy(); + if ((ret = jitter->correct(msg, 0, 0, SrsRtmpJitterAlgorithmFULL)) != ERROR_SUCCESS) { srs_freep(msg); return ret; @@ -193,9 +197,11 @@ int SrsForwarder::on_audio(SrsSharedPtrMessage* msg) return ret; } -int SrsForwarder::on_video(SrsSharedPtrMessage* msg) +int SrsForwarder::on_video(SrsSharedPtrMessage* __video) { int ret = ERROR_SUCCESS; + + SrsSharedPtrMessage* msg = __video->copy(); if ((ret = jitter->correct(msg, 0, 0, SrsRtmpJitterAlgorithmFULL)) != ERROR_SUCCESS) { srs_freep(msg); diff --git a/trunk/src/app/srs_app_forward.hpp b/trunk/src/app/srs_app_forward.hpp index c2fdb6f57..0c8ca24fd 100644 --- a/trunk/src/app/srs_app_forward.hpp +++ b/trunk/src/app/srs_app_forward.hpp @@ -80,9 +80,21 @@ public: public: virtual int on_publish(); virtual void on_unpublish(); - virtual int on_meta_data(SrsSharedPtrMessage* metadata); - virtual int on_audio(SrsSharedPtrMessage* msg); - virtual int on_video(SrsSharedPtrMessage* msg); + /** + * forward the audio packet. + * @param __metadata, directly ptr, copy it if need to save it. + */ + virtual int on_meta_data(SrsSharedPtrMessage* __metadata); + /** + * forward the audio packet. + * @param __audio, directly ptr, copy it if need to save it. + */ + virtual int on_audio(SrsSharedPtrMessage* __audio); + /** + * forward the video packet. + * @param __video, directly ptr, copy it if need to save it. + */ + virtual int on_video(SrsSharedPtrMessage* __video); // interface ISrsThreadHandler. public: virtual int cycle(); diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index 1f0c6bba0..d36e708c6 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -1426,15 +1426,16 @@ int SrsHls::on_meta_data(SrsAmf0Object* metadata) return ret; } -int SrsHls::on_audio(SrsSharedPtrMessage* audio) +int SrsHls::on_audio(SrsSharedPtrMessage* __audio) { int ret = ERROR_SUCCESS; - SrsAutoFree(SrsSharedPtrMessage, audio); - if (!hls_enabled) { return ret; } + + SrsSharedPtrMessage* audio = __audio->copy(); + SrsAutoFree(SrsSharedPtrMessage, audio); sample->clear(); if ((ret = codec->audio_aac_demux(audio->payload, audio->size, sample)) != ERROR_SUCCESS) { @@ -1470,15 +1471,16 @@ int SrsHls::on_audio(SrsSharedPtrMessage* audio) return ret; } -int SrsHls::on_video(SrsSharedPtrMessage* video) +int SrsHls::on_video(SrsSharedPtrMessage* __video) { int ret = ERROR_SUCCESS; - SrsAutoFree(SrsSharedPtrMessage, video); - if (!hls_enabled) { return ret; } + + SrsSharedPtrMessage* video = __video->copy(); + SrsAutoFree(SrsSharedPtrMessage, video); sample->clear(); if ((ret = codec->video_avc_demux(video->payload, video->size, sample)) != ERROR_SUCCESS) { diff --git a/trunk/src/app/srs_app_hls.hpp b/trunk/src/app/srs_app_hls.hpp index 8b77945b6..067a16822 100644 --- a/trunk/src/app/srs_app_hls.hpp +++ b/trunk/src/app/srs_app_hls.hpp @@ -326,12 +326,14 @@ public: virtual int on_meta_data(SrsAmf0Object* metadata); /** * mux the audio packets to ts. + * @param __audio, directly ptr, copy it if need to save it. */ - virtual int on_audio(SrsSharedPtrMessage* audio); + virtual int on_audio(SrsSharedPtrMessage* __audio); /** * mux the video packets to ts. + * @param __video, directly ptr, copy it if need to save it. */ - virtual int on_video(SrsSharedPtrMessage* video); + virtual int on_video(SrsSharedPtrMessage* __video); private: virtual void hls_mux(); }; diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index cec809e64..21ea77f31 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -332,10 +332,12 @@ int SrsConsumer::get_time() return jitter->get_time(); } -int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, SrsRtmpJitterAlgorithm ag) +int SrsConsumer::enqueue(SrsSharedPtrMessage* __msg, bool atc, int tba, int tbv, SrsRtmpJitterAlgorithm ag) { int ret = ERROR_SUCCESS; + SrsSharedPtrMessage* msg = __msg->copy(); + if (!atc) { if ((ret = jitter->correct(msg, tba, tbv, ag)) != ERROR_SUCCESS) { srs_freep(msg); @@ -409,7 +411,7 @@ void SrsGopCache::set(bool enabled) srs_info("enable gop cache"); } -int SrsGopCache::cache(SrsSharedPtrMessage* msg) +int SrsGopCache::cache(SrsSharedPtrMessage* __msg) { int ret = ERROR_SUCCESS; @@ -418,6 +420,9 @@ int SrsGopCache::cache(SrsSharedPtrMessage* msg) return ret; } + // the gop cache know when to gop it. + SrsSharedPtrMessage* msg = __msg; + // disable gop cache when not h.264 if (!SrsFlvCodec::video_is_h264(msg->payload, msg->size)) { srs_info("gop donot cache video for none h.264"); @@ -840,15 +845,15 @@ int SrsSource::on_forwarder_start(SrsForwarder* forwarder) // feed the forwarder the metadata/sequence header, // when reload to enable the forwarder. - if (cache_metadata && (ret = forwarder->on_meta_data(cache_metadata->copy())) != ERROR_SUCCESS) { + if (cache_metadata && (ret = forwarder->on_meta_data(cache_metadata)) != ERROR_SUCCESS) { srs_error("forwarder process onMetaData message failed. ret=%d", ret); return ret; } - if (cache_sh_video && (ret = forwarder->on_video(cache_sh_video->copy())) != ERROR_SUCCESS) { + if (cache_sh_video && (ret = forwarder->on_video(cache_sh_video)) != ERROR_SUCCESS) { srs_error("forwarder process video sequence header message failed. ret=%d", ret); return ret; } - if (cache_sh_audio && (ret = forwarder->on_audio(cache_sh_audio->copy())) != ERROR_SUCCESS) { + if (cache_sh_audio && (ret = forwarder->on_audio(cache_sh_audio)) != ERROR_SUCCESS) { srs_error("forwarder process audio sequence header message failed. ret=%d", ret); return ret; } @@ -865,11 +870,11 @@ int SrsSource::on_hls_start() // when reload to start hls, hls will never get the sequence header in stream, // use the SrsSource.on_hls_start to push the sequence header to HLS. // TODO: maybe need to decode the metadata? - if (cache_sh_video && (ret = hls->on_video(cache_sh_video->copy())) != ERROR_SUCCESS) { + if (cache_sh_video && (ret = hls->on_video(cache_sh_video)) != ERROR_SUCCESS) { srs_error("hls process video sequence header message failed. ret=%d", ret); return ret; } - if (cache_sh_audio && (ret = hls->on_audio(cache_sh_audio->copy())) != ERROR_SUCCESS) { + if (cache_sh_audio && (ret = hls->on_audio(cache_sh_audio)) != ERROR_SUCCESS) { srs_error("hls process audio sequence header message failed. ret=%d", ret); return ret; } @@ -908,11 +913,11 @@ int SrsSource::on_dvr_request_sh() } } - if (cache_sh_video && (ret = dvr->on_video(cache_sh_video->copy())) != ERROR_SUCCESS) { + if (cache_sh_video && (ret = dvr->on_video(cache_sh_video)) != ERROR_SUCCESS) { srs_error("dvr process video sequence header message failed. ret=%d", ret); return ret; } - if (cache_sh_audio && (ret = dvr->on_audio(cache_sh_audio->copy())) != ERROR_SUCCESS) { + if (cache_sh_audio && (ret = dvr->on_audio(cache_sh_audio)) != ERROR_SUCCESS) { srs_error("dvr process audio sequence header message failed. ret=%d", ret); return ret; } @@ -1048,8 +1053,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata std::vector::iterator it; for (it = consumers.begin(); it != consumers.end(); ++it) { SrsConsumer* consumer = *it; - SrsSharedPtrMessage* copy = cache_metadata->copy(); - if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) { + if ((ret = consumer->enqueue(cache_metadata, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) { srs_error("dispatch the metadata failed. ret=%d", ret); return ret; } @@ -1062,7 +1066,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata std::vector::iterator it; for (it = forwarders.begin(); it != forwarders.end(); ++it) { SrsForwarder* forwarder = *it; - if ((ret = forwarder->on_meta_data(cache_metadata->copy())) != ERROR_SUCCESS) { + if ((ret = forwarder->on_meta_data(cache_metadata)) != ERROR_SUCCESS) { srs_error("forwarder process onMetaData message failed. ret=%d", ret); return ret; } @@ -1086,7 +1090,7 @@ int SrsSource::on_audio(SrsCommonMessage* __audio) srs_verbose("initialize shared ptr audio success."); #ifdef SRS_AUTO_HLS - if ((ret = hls->on_audio(msg.copy())) != ERROR_SUCCESS) { + if ((ret = hls->on_audio(&msg)) != ERROR_SUCCESS) { srs_warn("hls process audio message failed, ignore and disable hls. ret=%d", ret); // unpublish, ignore ret. @@ -1098,7 +1102,7 @@ int SrsSource::on_audio(SrsCommonMessage* __audio) #endif #ifdef SRS_AUTO_DVR - if ((ret = dvr->on_audio(msg.copy())) != ERROR_SUCCESS) { + if ((ret = dvr->on_audio(&msg)) != ERROR_SUCCESS) { srs_warn("dvr process audio message failed, ignore and disable dvr. ret=%d", ret); // unpublish, ignore ret. @@ -1115,8 +1119,7 @@ int SrsSource::on_audio(SrsCommonMessage* __audio) SrsConsumer** pconsumer = consumers.data(); for (int i = 0; i < nb_consumers; i++) { SrsConsumer* consumer = pconsumer[i]; - SrsSharedPtrMessage* copy = msg.copy(); - if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) { + if ((ret = consumer->enqueue(&msg, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) { srs_error("dispatch the audio failed. ret=%d", ret); return ret; } @@ -1197,7 +1200,7 @@ int SrsSource::on_video(SrsCommonMessage* __video) srs_verbose("initialize shared ptr video success."); #ifdef SRS_AUTO_HLS - if ((ret = hls->on_video(msg.copy())) != ERROR_SUCCESS) { + if ((ret = hls->on_video(&msg)) != ERROR_SUCCESS) { srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret); // unpublish, ignore ret. @@ -1209,7 +1212,7 @@ int SrsSource::on_video(SrsCommonMessage* __video) #endif #ifdef SRS_AUTO_DVR - if ((ret = dvr->on_video(msg.copy())) != ERROR_SUCCESS) { + if ((ret = dvr->on_video(&msg)) != ERROR_SUCCESS) { srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret); // unpublish, ignore ret. @@ -1224,8 +1227,7 @@ int SrsSource::on_video(SrsCommonMessage* __video) if (true) { for (int i = 0; i < (int)consumers.size(); i++) { SrsConsumer* consumer = consumers.at(i); - SrsSharedPtrMessage* copy = msg.copy(); - if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) { + if ((ret = consumer->enqueue(&msg, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) { srs_error("dispatch the video failed. ret=%d", ret); return ret; } @@ -1234,11 +1236,11 @@ int SrsSource::on_video(SrsCommonMessage* __video) } // copy to all forwarders. - if (true) { + if (!forwarders.empty()) { std::vector::iterator it; for (it = forwarders.begin(); it != forwarders.end(); ++it) { SrsForwarder* forwarder = *it; - if ((ret = forwarder->on_video(msg.copy())) != ERROR_SUCCESS) { + if ((ret = forwarder->on_video(&msg)) != ERROR_SUCCESS) { srs_error("forwarder process video message failed. ret=%d", ret); return ret; } @@ -1512,20 +1514,20 @@ void SrsSource::on_unpublish() SrsRtmpJitterAlgorithm ag = jitter_algorithm; // copy metadata. - if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), atc, tba, tbv, ag)) != ERROR_SUCCESS) { + if (cache_metadata && (ret = consumer->enqueue(cache_metadata, atc, tba, tbv, ag)) != ERROR_SUCCESS) { srs_error("dispatch metadata failed. ret=%d", ret); return ret; } srs_info("dispatch metadata success"); // copy sequence header - if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), atc, tba, tbv, ag)) != ERROR_SUCCESS) { + if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, tba, tbv, ag)) != ERROR_SUCCESS) { srs_error("dispatch video sequence header failed. ret=%d", ret); return ret; } srs_info("dispatch video sequence header success"); - if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio->copy(), atc, tba, tbv, ag)) != ERROR_SUCCESS) { + if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio, atc, tba, tbv, ag)) != ERROR_SUCCESS) { srs_error("dispatch audio sequence header failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index ea135f683..69da17a7d 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -184,6 +184,7 @@ public: virtual int get_time(); /** * enqueue an shared ptr message. + * @param __msg, directly ptr, copy it if need to save it. * @param whether atc, donot use jitter correct if true. * @param tba timebase of audio. * used to calc the audio time delta if time-jitter detected. @@ -191,7 +192,7 @@ public: * used to calc the video time delta if time-jitter detected. * @param ag the algorithm of time jitter. */ - virtual int enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, SrsRtmpJitterAlgorithm ag); + virtual int enqueue(SrsSharedPtrMessage* __msg, bool atc, int tba, int tbv, SrsRtmpJitterAlgorithm ag); /** * get packets in consumer queue. * @param msgs the msgs array to dump packets to send. @@ -252,8 +253,9 @@ public: * only for h264 codec * 1. cache the gop when got h264 video packet. * 2. clear gop when got keyframe. + * @param __msg, directly ptr, copy it if need to save it. */ - virtual int cache(SrsSharedPtrMessage* msg); + virtual int cache(SrsSharedPtrMessage* __msg); /** * clear the gop cache. */