diff --git a/trunk/auto/options.sh b/trunk/auto/options.sh old mode 100644 new mode 100755 index 4c5168994..eaf8fdbfa --- a/trunk/auto/options.sh +++ b/trunk/auto/options.sh @@ -5,6 +5,10 @@ help=no SRS_HLS=RESERVED SRS_SSL=RESERVED +# TODO: remove the default to yes. +SRS_HLS=YES +SRS_SSL=YES + opt= for option diff --git a/trunk/configure b/trunk/configure index d42c514d9..a7b8d9fb5 100755 --- a/trunk/configure +++ b/trunk/configure @@ -6,14 +6,14 @@ SRS_AUTO_HEADERS_H="${SRS_OBJS}/srs_auto_headers.hpp" mkdir -p ${SRS_OBJS} +# parse user options. +. auto/options.sh + # clean the exists if [[ -f Makefile ]]; then make clean fi -# parse user options. -. auto/options.sh - # generate the audo headers file. echo "#define SRS_CONFIGURE \"${SRS_CONFIGURE}\"" > $SRS_AUTO_HEADERS_H @@ -92,7 +92,7 @@ MODULE_FILES=("srs_core" "srs_core_log" "srs_core_server" "srs_core_stream" "srs_core_source" "srs_core_codec" "srs_core_handshake" "srs_core_pithy_print" "srs_core_config" "srs_core_refer" "srs_core_reload" - "srs_core_hls") + "srs_core_hls" "srs_core_forward") MODULE_DIR="src/core" . auto/modules.sh CORE_OBJS="${MODULE_OBJS[@]}" diff --git a/trunk/src/core/srs_core_forward.cpp b/trunk/src/core/srs_core_forward.cpp new file mode 100644 index 000000000..331a3da76 --- /dev/null +++ b/trunk/src/core/srs_core_forward.cpp @@ -0,0 +1,33 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +SrsForwarder::SrsForwarder() +{ +} + +SrsForwarder::~SrsForwarder() +{ +} + diff --git a/trunk/src/core/srs_core_forward.hpp b/trunk/src/core/srs_core_forward.hpp new file mode 100644 index 000000000..67d6c0b92 --- /dev/null +++ b/trunk/src/core/srs_core_forward.hpp @@ -0,0 +1,43 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SRS_CORE_FORWARD_HPP +#define SRS_CORE_FORWARD_HPP + +/* +#include +*/ +#include + +/** +* forward the stream to other servers. +*/ +class SrsForwarder +{ +public: + SrsForwarder(); + virtual ~SrsForwarder(); +public: +}; + +#endif diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp index cece1cd3c..9137db919 100644 --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -31,6 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include #define CONST_MAX_JITTER_MS 500 #define DEFAULT_FRAME_TIME_MS 10 @@ -49,8 +50,8 @@ int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv) { int ret = ERROR_SUCCESS; - int audio_sample_rate = tba; - int video_frame_rate = tbv; + int sample_rate = tba; + int frame_rate = tbv; /** * we use a very simple time jitter detect/correct algorithm: @@ -68,10 +69,10 @@ int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv) // if jitter detected, reset the delta. if (delta < 0 || delta > CONST_MAX_JITTER_MS) { // calc the right diff by audio sample rate - if (msg->header.is_audio() && audio_sample_rate > 0) { - delta = (int32_t)(delta * 1000.0 / audio_sample_rate); - } else if (msg->header.is_video() && video_frame_rate > 0) { - delta = (int32_t)(delta * 1.0 / video_frame_rate); + if (msg->header.is_audio() && sample_rate > 0) { + delta = (int32_t)(delta * 1000.0 / sample_rate); + } else if (msg->header.is_video() && frame_rate > 0) { + delta = (int32_t)(delta * 1.0 / frame_rate); } else { delta = DEFAULT_FRAME_TIME_MS; } @@ -129,6 +130,7 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv) return ret; } + // TODO: check the queue size and drop packets if overflow. msgs.push_back(msg); return ret; @@ -242,6 +244,96 @@ void SrsConsumer::clear() msgs.clear(); } +SrsGopCache::SrsGopCache() +{ + cached_video_count = 0; + enable_gop_cache = true; +} + +SrsGopCache::~SrsGopCache() +{ + clear(); +} + +void SrsGopCache::set(bool enabled) +{ + enable_gop_cache = enabled; + + if (!enabled) { + srs_info("disable gop cache, clear %d packets.", (int)gop_cache.size()); + clear(); + return; + } + + srs_info("enable gop cache"); +} + +int SrsGopCache::cache(SrsSharedPtrMessage* msg) +{ + int ret = ERROR_SUCCESS; + + if (!enable_gop_cache) { + srs_verbose("gop cache is disabled."); + return ret; + } + + // got video, update the video count if acceptable + if (msg->header.is_video()) { + cached_video_count++; + } + + // no acceptable video or pure audio, disable the cache. + if (cached_video_count == 0) { + srs_verbose("ignore any frame util got a h264 video frame."); + return ret; + } + + // clear gop cache when got key frame + if (msg->header.is_video() && SrsCodec::video_is_keyframe(msg->payload, msg->size)) { + srs_info("clear gop cache when got keyframe. vcount=%d, count=%d", + cached_video_count, (int)gop_cache.size()); + + clear(); + + // curent msg is video frame, so we set to 1. + cached_video_count = 1; + } + + // cache the frame. + gop_cache.push_back(msg->copy()); + + return ret; +} + +void SrsGopCache::clear() +{ + std::vector::iterator it; + for (it = gop_cache.begin(); it != gop_cache.end(); ++it) { + SrsSharedPtrMessage* msg = *it; + srs_freep(msg); + } + gop_cache.clear(); + + cached_video_count = 0; +} + +int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv) +{ + int ret = ERROR_SUCCESS; + + std::vector::iterator it; + for (it = gop_cache.begin(); it != gop_cache.end(); ++it) { + SrsSharedPtrMessage* msg = *it; + if ((ret = consumer->enqueue(msg->copy(), tba, tbv)) != ERROR_SUCCESS) { + srs_error("dispatch cached gop failed. ret=%d", ret); + return ret; + } + } + srs_trace("dispatch cached gop success. count=%d, duration=%d", (int)gop_cache.size(), consumer->get_time()); + + return ret; +} + std::map SrsSource::pool; SrsSource* SrsSource::find(std::string stream_url) @@ -264,11 +356,10 @@ SrsSource::SrsSource(std::string _stream_url) cache_metadata = cache_sh_video = cache_sh_audio = NULL; - cached_video_count = 0; - enable_gop_cache = true; - - video_frame_rate = audio_sample_rate = 0; + frame_rate = sample_rate = 0; _can_publish = true; + + gop_cache = new SrsGopCache(); } SrsSource::~SrsSource() @@ -280,12 +371,12 @@ SrsSource::~SrsSource() } consumers.clear(); - clear_gop_cache(); - srs_freep(cache_metadata); srs_freep(cache_sh_video); srs_freep(cache_sh_audio); + srs_freep(gop_cache); + #ifdef SRS_HLS srs_freep(hls); #endif @@ -313,12 +404,12 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata SrsAmf0Any* prop = NULL; if ((prop = metadata->metadata->get_property("audiosamplerate")) != NULL) { if (prop->is_number()) { - audio_sample_rate = (int)(srs_amf0_convert(prop)->value); + sample_rate = (int)(srs_amf0_convert(prop)->value); } } if ((prop = metadata->metadata->get_property("framerate")) != NULL) { if (prop->is_number()) { - video_frame_rate = (int)(srs_amf0_convert(prop)->value); + frame_rate = (int)(srs_amf0_convert(prop)->value); } } @@ -354,7 +445,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata std::vector::iterator it; for (it = consumers.begin(); it != consumers.end(); ++it) { SrsConsumer* consumer = *it; - if ((ret = consumer->enqueue(cache_metadata->copy(), audio_sample_rate, video_frame_rate)) != ERROR_SUCCESS) { + if ((ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { srs_error("dispatch the metadata failed. ret=%d", ret); return ret; } @@ -387,7 +478,7 @@ int SrsSource::on_audio(SrsCommonMessage* audio) std::vector::iterator it; for (it = consumers.begin(); it != consumers.end(); ++it) { SrsConsumer* consumer = *it; - if ((ret = consumer->enqueue(msg->copy(), audio_sample_rate, video_frame_rate)) != ERROR_SUCCESS) { + if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { srs_error("dispatch the audio failed. ret=%d", ret); return ret; } @@ -403,7 +494,7 @@ int SrsSource::on_audio(SrsCommonMessage* audio) } // cache the last gop packets - if ((ret = cache_last_gop(msg)) != ERROR_SUCCESS) { + if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) { srs_error("shrink gop cache failed. ret=%d", ret); return ret; } @@ -435,7 +526,7 @@ int SrsSource::on_video(SrsCommonMessage* video) std::vector::iterator it; for (it = consumers.begin(); it != consumers.end(); ++it) { SrsConsumer* consumer = *it; - if ((ret = consumer->enqueue(msg->copy(), audio_sample_rate, video_frame_rate)) != ERROR_SUCCESS) { + if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { srs_error("dispatch the video failed. ret=%d", ret); return ret; } @@ -451,7 +542,7 @@ int SrsSource::on_video(SrsCommonMessage* video) } // cache the last gop packets - if ((ret = cache_last_gop(msg)) != ERROR_SUCCESS) { + if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) { srs_error("shrink gop cache failed. ret=%d", ret); return ret; } @@ -480,10 +571,10 @@ void SrsSource::on_unpublish() hls->on_unpublish(); #endif - clear_gop_cache(); + gop_cache->clear(); srs_freep(cache_metadata); - video_frame_rate = audio_sample_rate = 0; + frame_rate = sample_rate = 0; srs_freep(cache_sh_video); srs_freep(cache_sh_audio); @@ -500,33 +591,27 @@ void SrsSource::on_unpublish() consumer = new SrsConsumer(this); consumers.push_back(consumer); - if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), audio_sample_rate, video_frame_rate)) != ERROR_SUCCESS) { + if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { srs_error("dispatch metadata failed. ret=%d", ret); return ret; } srs_info("dispatch metadata success"); - if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), audio_sample_rate, video_frame_rate)) != ERROR_SUCCESS) { + if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { srs_error("dispatch video sequence header failed. ret=%d", ret); return ret; } srs_info("dispatch video sequence header success"); - if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio->copy(), audio_sample_rate, video_frame_rate)) != ERROR_SUCCESS) { + if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { srs_error("dispatch audio sequence header failed. ret=%d", ret); return ret; } srs_info("dispatch audio sequence header success"); - std::vector::iterator it; - for (it = gop_cache.begin(); it != gop_cache.end(); ++it) { - SrsSharedPtrMessage* msg = *it; - if ((ret = consumer->enqueue(msg->copy(), audio_sample_rate, video_frame_rate)) != ERROR_SUCCESS) { - srs_error("dispatch cached gop failed. ret=%d", ret); - return ret; - } + if ((ret = gop_cache->dump(consumer, sample_rate, frame_rate)) != ERROR_SUCCESS) { + return ret; } - srs_trace("dispatch cached gop success. count=%d, duration=%d", (int)gop_cache.size(), consumer->get_time()); return ret; } @@ -543,63 +628,6 @@ void SrsSource::on_consumer_destroy(SrsConsumer* consumer) void SrsSource::set_cache(bool enabled) { - enable_gop_cache = enabled; - - if (!enabled) { - srs_info("disable gop cache, clear %d packets.", (int)gop_cache.size()); - clear_gop_cache(); - return; - } - - srs_info("enable gop cache"); -} - -int SrsSource::cache_last_gop(SrsSharedPtrMessage* msg) -{ - int ret = ERROR_SUCCESS; - - if (!enable_gop_cache) { - srs_verbose("gop cache is disabled."); - return ret; - } - - // got video, update the video count if acceptable - if (msg->header.is_video()) { - cached_video_count++; - } - - // no acceptable video or pure audio, disable the cache. - if (cached_video_count == 0) { - srs_verbose("ignore any frame util got a h264 video frame."); - return ret; - } - - // clear gop cache when got key frame - if (msg->header.is_video() && SrsCodec::video_is_keyframe(msg->payload, msg->size)) { - srs_info("clear gop cache when got keyframe. vcount=%d, count=%d", - cached_video_count, (int)gop_cache.size()); - - clear_gop_cache(); - - // curent msg is video frame, so we set to 1. - cached_video_count = 1; - } - - // cache the frame. - gop_cache.push_back(msg->copy()); - - return ret; -} - -void SrsSource::clear_gop_cache() -{ - std::vector::iterator it; - for (it = gop_cache.begin(); it != gop_cache.end(); ++it) { - SrsSharedPtrMessage* msg = *it; - srs_freep(msg); - } - gop_cache.clear(); - - cached_video_count = 0; + gop_cache->set(enabled); } diff --git a/trunk/src/core/srs_core_source.hpp b/trunk/src/core/srs_core_source.hpp index 2ed1e3fa6..14cfd7eea 100644 --- a/trunk/src/core/srs_core_source.hpp +++ b/trunk/src/core/srs_core_source.hpp @@ -38,6 +38,7 @@ class SrsSource; class SrsCommonMessage; class SrsOnMetaDataPacket; class SrsSharedPtrMessage; +class SrsForwarder; #ifdef SRS_HLS class SrsHls; #endif @@ -112,27 +113,12 @@ private: }; /** -* live streaming source. +* cache a gop of video/audio data, +* delivery at the connect of flash player, +* to enable it to fast startup. */ -class SrsSource +class SrsGopCache { -private: - static std::map pool; -public: - /** - * find stream by vhost/app/stream. - * @stream_url the stream url, for example, myserver.xxx.com/app/stream - * @return the matched source, never be NULL. - * @remark stream_url should without port and schema. - */ - static SrsSource* find(std::string stream_url); -private: -#ifdef SRS_HLS - SrsHls* hls; -#endif - std::string stream_url; - std::vector consumers; -// gop cache for client fast startup. private: /** * if disabled the gop cache, @@ -148,15 +134,55 @@ private: * cached gop. */ std::vector gop_cache; +public: + SrsGopCache(); + virtual ~SrsGopCache(); +public: + virtual void set(bool enabled); + /** + * only for h264 codec + * 1. cache the gop when got h264 video packet. + * 2. clear gop when got keyframe. + */ + virtual int cache(SrsSharedPtrMessage* msg); + virtual void clear(); + virtual int dump(SrsConsumer* consumer, int tba, int tbv); +}; + +/** +* live streaming source. +*/ +class SrsSource +{ +private: + static std::map pool; +public: + /** + * find stream by vhost/app/stream. + * @stream_url the stream url, for example, myserver.xxx.com/app/stream + * @return the matched source, never be NULL. + * @remark stream_url should without port and schema. + */ + static SrsSource* find(std::string stream_url); +private: + std::string stream_url; + std::vector consumers; +private: + // hls handler. +#ifdef SRS_HLS + SrsHls* hls; +#endif + // gop cache for client fast startup. + SrsGopCache* gop_cache; private: /** * the sample rate of audio in metadata. */ - int audio_sample_rate; + int sample_rate; /** * the video frame rate in metadata. */ - int video_frame_rate; + int frame_rate; /** * can publish, true when is not streaming */ @@ -181,14 +207,6 @@ public: virtual int create_consumer(SrsConsumer*& consumer); virtual void on_consumer_destroy(SrsConsumer* consumer); virtual void set_cache(bool enabled); -private: - /** - * only for h264 codec - * 1. cache the gop when got h264 video packet. - * 2. clear gop when got keyframe. - */ - virtual int cache_last_gop(SrsSharedPtrMessage* msg); - virtual void clear_gop_cache(); }; #endif \ No newline at end of file diff --git a/trunk/src/srs/srs.upp b/trunk/src/srs/srs.upp index 08dcbb7a0..614993b14 100755 --- a/trunk/src/srs/srs.upp +++ b/trunk/src/srs/srs.upp @@ -24,6 +24,8 @@ file ..\core\srs_core_client.cpp, ..\core\srs_core_source.hpp, ..\core\srs_core_source.cpp, + ..\core\srs_core_forward.hpp, + ..\core\srs_core_forward.cpp, ..\core\srs_core_hls.hpp, ..\core\srs_core_hls.cpp, ..\core\srs_core_codec.hpp,