diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 0e30be98c..287181244 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -182,7 +182,7 @@ void SrsMessageQueue::set_queue_size(double queue_size) queue_size_ms = (int)(queue_size * 1000); } -int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) +int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow) { int ret = ERROR_SUCCESS; @@ -197,6 +197,11 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) msgs.push_back(msg); while (av_end_time - av_start_time > queue_size_ms) { + // notice the caller queue already overflow and shrinked. + if (is_overflow) { + *is_overflow = true; + } + shrink(); } @@ -310,10 +315,23 @@ SrsConsumer::SrsConsumer(SrsSource* _source) mw_duration = 0; mw_waiting = false; #endif + +#ifdef SRS_PERF_QUEUE_FAST_CACHE + mw_cache = new SrsMessageArray(SRS_PERF_MW_MSGS); + mw_count = 0; + mw_first_pkt = mw_last_pkt = 0; +#endif } SrsConsumer::~SrsConsumer() { +#ifdef SRS_PERF_QUEUE_FAST_CACHE + if (mw_cache) { + mw_cache->free(mw_count); + mw_count = 0; + } + srs_freep(mw_cache); +#endif source->on_consumer_destroy(this); srs_freep(jitter); srs_freep(queue); @@ -351,11 +369,37 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* __msg, bool atc, int tba, int tbv, } } - if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) { +#ifdef SRS_PERF_QUEUE_FAST_CACHE + // use fast cache if available + if (mw_count < mw_cache->max) { + // update fast cache timestamps + if (mw_count == 0) { + mw_first_pkt = msg->timestamp; + } + mw_last_pkt = msg->timestamp; + + mw_cache->msgs[mw_count++] = msg; + } else{ + // fast cache is full, use queue. + bool is_overflow = false; + if ((ret = queue->enqueue(msg, &is_overflow)) != ERROR_SUCCESS) { + return ret; + } + // when overflow, clear cache and refresh the fast cache. + if (is_overflow) { + mw_cache->free(mw_count); + if ((ret = dumps_queue_to_fast_cache()) != ERROR_SUCCESS) { + return ret; + } + } + } +#else + if ((ret = queue->enqueue(msg, NULL)) != ERROR_SUCCESS) { return ret; } +#endif - #ifdef SRS_PERF_QUEUE_COND_WAIT +#ifdef SRS_PERF_QUEUE_COND_WAIT // fire the mw when msgs is enough. if (mw_waiting) { int duration_ms = queue->duration(); @@ -367,7 +411,7 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* __msg, bool atc, int tba, int tbv, mw_waiting = false; } } - #endif +#endif return ret; } @@ -388,12 +432,30 @@ int SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count) return ret; } +#ifdef SRS_PERF_QUEUE_FAST_CACHE + // only dumps an whole array to msgs. + for (int i = 0; i < mw_count; i++) { + msgs->msgs[i] = mw_cache->msgs[i]; + } + count = mw_count; + + // when fast cache is not filled, + // we donot check the queue, direclty zero fast cache. + if (mw_count < mw_cache->max) { + mw_count = 0; + mw_first_pkt = mw_last_pkt = 0; + return ret; + } + + return dumps_queue_to_fast_cache(); +#else // pump msgs from queue. if ((ret = queue->dump_packets(msgs->max, msgs->msgs, count)) != ERROR_SUCCESS) { return ret; } return ret; +#endif } #ifdef SRS_PERF_QUEUE_COND_WAIT @@ -402,6 +464,18 @@ void SrsConsumer::wait(int nb_msgs, int duration) mw_min_msgs = nb_msgs; mw_duration = duration; +#ifdef SRS_PERF_QUEUE_FAST_CACHE + // when fast cache not overflow, always flush. + // so we donot care about the queue. + bool fast_cache_overflow = mw_count >= mw_cache->max; + int duration_ms = (int)(mw_last_pkt - mw_first_pkt); + bool match_min_msgs = mw_count > mw_min_msgs; + + // when fast cache overflow, or duration ok, signal to flush. + if (fast_cache_overflow || (match_min_msgs && duration_ms > mw_duration)) { + return; + } +#else int duration_ms = queue->duration(); bool match_min_msgs = queue->size() > mw_min_msgs; @@ -409,6 +483,7 @@ void SrsConsumer::wait(int nb_msgs, int duration) if (match_min_msgs && duration_ms > mw_duration) { return; } +#endif // the enqueue will notify this cond. mw_waiting = true; @@ -427,6 +502,28 @@ int SrsConsumer::on_play_client_pause(bool is_pause) return ret; } +#ifdef SRS_PERF_QUEUE_FAST_CACHE +int SrsConsumer::dumps_queue_to_fast_cache() +{ + int ret =ERROR_SUCCESS; + + // fill fast cache with queue. + if ((ret = queue->dump_packets(mw_cache->max, mw_cache->msgs, mw_count)) != ERROR_SUCCESS) { + return ret; + } + // set the timestamp when got message. + if (mw_count > 0) { + SrsSharedPtrMessage* first_msg = mw_cache->msgs[0]; + mw_first_pkt = first_msg->timestamp; + + SrsSharedPtrMessage* last_msg = mw_cache->msgs[mw_count - 1]; + mw_last_pkt = last_msg->timestamp; + } + + return ret; +} +#endif + SrsGopCache::SrsGopCache() { cached_video_count = 0; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 68893de62..d1746c0ff 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -134,8 +134,9 @@ public: /** * enqueue the message, the timestamp always monotonically. * @param msg, the msg to enqueue, user never free it whatever the return code. + * @param is_overflow, whether overflow and shrinked. NULL to ignore. */ - virtual int enqueue(SrsSharedPtrMessage* msg); + virtual int enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL); /** * get packets in consumer queue. * @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it. @@ -172,6 +173,16 @@ private: int mw_min_msgs; int mw_duration; #endif +#ifdef SRS_PERF_QUEUE_FAST_CACHE + // use fast cache for msgs + // @see https://github.com/winlinvip/simple-rtmp-server/issues/251 + SrsMessageArray* mw_cache; + // the count of msg in fast cache. + int mw_count; + // the packet time in fast cache. + int64_t mw_first_pkt; + int64_t mw_last_pkt; +#endif public: SrsConsumer(SrsSource* _source); virtual ~SrsConsumer(); @@ -219,6 +230,14 @@ public: * when client send the pause message. */ virtual int on_play_client_pause(bool is_pause); +private: +#ifdef SRS_PERF_QUEUE_FAST_CACHE + /** + * dumps the queue to fast cache, + * when fast cache is clear or queue is overflow. + */ + virtual int dumps_queue_to_fast_cache(); +#endif }; /** diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index ddf9b0d4f..9c6a2092d 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 64 +#define VERSION_REVISION 65 // server info. #define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_ROLE "origin/edge server" diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index 9bb640965..ffb4788d5 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -128,6 +128,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #undef SRS_PERF_MW_SO_RCVBUF /** +* whether enable the fast cache. +* @remark this improve performance for large connectios. +* @remark this also introduce complex, default to disable it. +* @see https://github.com/winlinvip/simple-rtmp-server/issues/251 +*/ +#undef SRS_PERF_QUEUE_FAST_CACHE +/** * whether use cond wait to send messages. * @remark this improve performance for large connectios. * @see https://github.com/winlinvip/simple-rtmp-server/issues/251