From 276bd2223e098d620613b0dc59616cc2009ba734 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 8 May 2021 10:04:44 +0800 Subject: [PATCH] SquashSRS4: Support circuit breaker --- README.md | 1 + trunk/conf/full.conf | 31 ++++++ trunk/configure | 2 +- trunk/src/app/srs_app_config.cpp | 120 +++++++++++++++++++++ trunk/src/app/srs_app_config.hpp | 9 ++ trunk/src/app/srs_app_hourglass.cpp | 3 + trunk/src/app/srs_app_rtc_conn.cpp | 39 ++++++- trunk/src/app/srs_app_rtc_conn.hpp | 2 + trunk/src/app/srs_app_rtc_queue.cpp | 19 ++++ trunk/src/app/srs_app_rtc_server.cpp | 1 + trunk/src/app/srs_app_rtc_source.cpp | 15 ++- trunk/src/app/srs_app_server.cpp | 4 - trunk/src/app/srs_app_threads.cpp | 149 +++++++++++++++++++++++++++ trunk/src/app/srs_app_threads.hpp | 67 ++++++++++++ trunk/src/core/srs_core_version4.hpp | 2 +- trunk/src/main/srs_main_server.cpp | 6 ++ 16 files changed, 458 insertions(+), 12 deletions(-) create mode 100644 trunk/src/app/srs_app_threads.cpp create mode 100644 trunk/src/app/srs_app_threads.hpp diff --git a/README.md b/README.md index a3c105810..14bec14a4 100755 --- a/README.md +++ b/README.md @@ -182,6 +182,7 @@ The ports used by SRS: ## V4 changes +* v4.0, 2021-05-07, RTC: Support circuit breaker. 4.0.103 * v4.0, 2021-05-07, RTC: Refine play stream find track. 4.0.102 * v4.0, 2021-05-07, RTC: Refine FastTimer to fixed interval. 4.0.101 * v4.0, 2021-05-06, RTC: Fix config bug for nack and twcc. 4.0.99 diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 99a2265f4..ed655b423 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -111,6 +111,37 @@ auto_reload_for_docker on; # default: 0.8 tcmalloc_release_rate 0.8; +# For system circuit breaker. +circuit_breaker { + # Whether to enable the circuit breaker. + # Default: on + enabled on; + # The CPU percent(0, 100) every 1s, as system high water-level, which enables the circuit-break + # mechanism, for example, NACK will be disabled if high water-level. 
+ # Default: 90 + high_threshold 90; + # Reset the high water-level after this number of consecutive pulses under high_threshold. + # @remark 0 to disable the high water-level. + # Default: 2 + high_pulse 2; + # The CPU percent(0, 100) every 1s, as system critical water-level, which enables the circuit-break + # mechanism, for example, TWCC will be disabled if critical water-level. + # @note All circuit-break mechanisms of the high water-level are also enabled in critical. + # Default: 95 + critical_threshold 95; + # Reset the critical water-level after this number of consecutive pulses under critical_threshold. + # @remark 0 to disable the critical water-level. + # Default: 1 + critical_pulse 1; + # If dying, also drop packets for players. + # Default: 99 + dying_threshold 99; + # If CPU exceeds dying_threshold for dying_pulse consecutive pulses, enter dying. + # @remark 0 to disable the dying water-level. + # Default: 5 + dying_pulse 5; +} + ############################################################################################# # heartbeat/stats sections ############################################################################################# diff --git a/trunk/configure b/trunk/configure index 604017e28..945359482 100755 --- a/trunk/configure +++ b/trunk/configure @@ -274,7 +274,7 @@ MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_sourc "srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call" "srs_app_caster_flv" "srs_app_process" "srs_app_ng_exec" "srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr" - "srs_app_coworkers" "srs_app_hybrid") + "srs_app_coworkers" "srs_app_hybrid" "srs_app_threads") if [[ $SRS_RTC == YES ]]; then MODULE_FILES+=("srs_app_rtc_conn" "srs_app_rtc_dtls" "srs_app_rtc_sdp" "srs_app_rtc_queue" "srs_app_rtc_server" "srs_app_rtc_source" "srs_app_rtc_api") diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 33edb4ff2..95b40bb40 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ 
-3608,6 +3608,7 @@ srs_error_t SrsConfig::check_normal_config() && n != "ff_log_level" && n != "grace_final_wait" && n != "force_grace_quit" && n != "grace_start_wait" && n != "empty_ip_ok" && n != "disable_daemon_for_docker" && n != "inotify_auto_reload" && n != "auto_reload_for_docker" && n != "tcmalloc_release_rate" + && n != "circuit_breaker" ) { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str()); } @@ -4321,6 +4322,125 @@ double SrsConfig::tcmalloc_release_rate() return trr; } +bool SrsConfig::get_circuit_breaker() +{ + static bool DEFAULT = true; + + SrsConfDirective* conf = root->get("circuit_breaker"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("enabled"); + if (!conf) { + return DEFAULT; + } + + return SRS_CONF_PERFER_TRUE(conf->arg0()); +} + +int SrsConfig::get_high_threshold() +{ + static int DEFAULT = 90; + + SrsConfDirective* conf = root->get("circuit_breaker"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("high_threshold"); + if (!conf) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_high_pulse() +{ + static int DEFAULT = 2; + + SrsConfDirective* conf = root->get("circuit_breaker"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("high_pulse"); + if (!conf) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_critical_threshold() +{ + static int DEFAULT = 95; + + SrsConfDirective* conf = root->get("circuit_breaker"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("critical_threshold"); + if (!conf) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_critical_pulse() +{ + static int DEFAULT = 1; + + SrsConfDirective* conf = root->get("circuit_breaker"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("critical_pulse"); + if (!conf) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_dying_threshold() +{ + 
static int DEFAULT = 99; + + SrsConfDirective* conf = root->get("circuit_breaker"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("dying_threshold"); + if (!conf) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_dying_pulse() +{ + static int DEFAULT = 5; + + SrsConfDirective* conf = root->get("circuit_breaker"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("dying_pulse"); + if (!conf) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + vector SrsConfig::get_stream_casters() { srs_assert(root); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 9e6d84880..bda02696e 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -487,6 +487,15 @@ public: virtual bool auto_reload_for_docker(); // For tcmalloc, get the release rate. virtual double tcmalloc_release_rate(); +// Circuit breaker section. +public: + virtual bool get_circuit_breaker(); + virtual int get_high_threshold(); + virtual int get_high_pulse(); + virtual int get_critical_threshold(); + virtual int get_critical_pulse(); + virtual int get_dying_threshold(); + virtual int get_dying_pulse(); // stream_caster section public: // Get all stream_caster in config file. diff --git a/trunk/src/app/srs_app_hourglass.cpp b/trunk/src/app/srs_app_hourglass.cpp index 16bd200e6..f32424cbc 100644 --- a/trunk/src/app/srs_app_hourglass.cpp +++ b/trunk/src/app/srs_app_hourglass.cpp @@ -194,8 +194,11 @@ srs_error_t SrsFastTimer::cycle() return srs_error_wrap(err, "quit"); } + ++_srs_pps_timer->sugar; + for (int i = 0; i < (int)handlers_.size(); i++) { ISrsFastTimer* timer = handlers_.at(i); + if ((err = timer->on_timer(interval_)) != srs_success) { srs_freep(err); // Ignore any error for shared timer. 
} diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 57ddb76f8..1dc2245a1 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -57,6 +57,7 @@ using namespace std; #include #include #include +#include #include @@ -72,6 +73,8 @@ SrsPps* _srs_pps_conn = new SrsPps(); extern SrsPps* _srs_pps_snack; extern SrsPps* _srs_pps_snack2; +extern SrsPps* _srs_pps_snack3; +extern SrsPps* _srs_pps_snack4; extern SrsPps* _srs_pps_rnack; extern SrsPps* _srs_pps_rnack2; @@ -532,6 +535,7 @@ srs_error_t SrsRtcPlayStream::start() } // The timer for play, process TWCC in the future. + // @see SrsRtcPlayStream::on_timer() _srs_hybrid->timer1s()->subscribe(this); if ((err = pli_worker_->start()) != srs_success) { @@ -1053,6 +1057,7 @@ srs_error_t SrsRtcPublishStream::start() } // For publisher timer, such as TWCC and RR. + // @see SrsRtcPublishStream::on_timer() _srs_hybrid->timer100ms()->subscribe(this); if ((err = source->on_publish()) != srs_success) { @@ -1299,6 +1304,12 @@ srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket2*& pkt, SrsBuf } } + // If circuit-breaker is enabled, disable nack. + if (_srs_circuit_breaker->hybrid_critical_water_level()) { + ++_srs_pps_snack4->sugar; + return err; + } + // For NACK to handle packet. // @remark Note that the pkt might be set to NULL. if (nack_enabled_) { @@ -1547,6 +1558,12 @@ srs_error_t SrsRtcPublishStream::on_timer(srs_utime_t interval) if (twcc_enabled_) { ++_srs_pps_twcc->sugar; + // If circuit-breaker is dropping packet, disable TWCC. + if (_srs_circuit_breaker->hybrid_critical_water_level()) { + ++_srs_pps_snack4->sugar; + return err; + } + // We should not depends on the received packet, // instead we should send feedback every Nms. 
if ((err = send_periodic_twcc()) != srs_success) { @@ -1701,6 +1718,8 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid) pp_address_change = new SrsErrorPithyPrint(); pli_epp = new SrsErrorPithyPrint(); + nack_enabled_ = false; + _srs_rtc_manager->subscribe(this); } @@ -1954,14 +1973,18 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st } // The RTC connection start a timer, handle nacks. + // @see SrsRtcConnection::on_timer() _srs_hybrid->timer20ms()->subscribe(this); // TODO: FIXME: Support reload. session_timeout = _srs_config->get_rtc_stun_timeout(req->vhost); last_stun_time = srs_get_system_time(); - srs_trace("RTC init session, user=%s, url=%s, encrypt=%u/%u, DTLS(role=%s, version=%s), timeout=%dms", username.c_str(), - r->get_stream_url().c_str(), dtls, srtp, cfg->dtls_role.c_str(), cfg->dtls_version.c_str(), srsu2msi(session_timeout)); + nack_enabled_ = _srs_config->get_rtc_nack_enabled(req->vhost); + + srs_trace("RTC init session, user=%s, url=%s, encrypt=%u/%u, DTLS(role=%s, version=%s), timeout=%dms, nack=%d", + username.c_str(), r->get_stream_url().c_str(), dtls, srtp, cfg->dtls_role.c_str(), cfg->dtls_version.c_str(), + srsu2msi(session_timeout), nack_enabled_); return err; } @@ -2332,11 +2355,17 @@ srs_error_t SrsRtcConnection::on_timer(srs_utime_t interval) { srs_error_t err = srs_success; + if (!nack_enabled_) { + return err; + } + ++_srs_pps_conn->sugar; - // For publisher to send NACK. - // TODO: FIXME: Merge with hybrid system clock. - srs_update_system_time(); + // If circuit-breaker is enabled, disable nack. 
+ if (_srs_circuit_breaker->hybrid_critical_water_level()) { + ++_srs_pps_snack4->sugar; + return err; + } std::map::iterator it; for (it = publishers_.begin(); it != publishers_.end(); it++) { diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index ab3d0a4e0..66eaada69 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -454,6 +454,8 @@ private: SrsErrorPithyPrint* pp_address_change; // Pithy print for PLI request. SrsErrorPithyPrint* pli_epp; +private: + bool nack_enabled_; public: SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid); virtual ~SrsRtcConnection(); diff --git a/trunk/src/app/srs_app_rtc_queue.cpp b/trunk/src/app/srs_app_rtc_queue.cpp index 7a19f2b57..c398fcc5f 100644 --- a/trunk/src/app/srs_app_rtc_queue.cpp +++ b/trunk/src/app/srs_app_rtc_queue.cpp @@ -33,6 +33,12 @@ using namespace std; #include #include #include +#include + +#include + +extern SrsPps* _srs_pps_snack3; +extern SrsPps* _srs_pps_snack4; SrsRtpRingBuffer::SrsRtpRingBuffer(int capacity) { @@ -228,6 +234,12 @@ SrsRtpNackForReceiver::~SrsRtpNackForReceiver() void SrsRtpNackForReceiver::insert(uint16_t first, uint16_t last) { + // If circuit-breaker is enabled, disable nack. + if (_srs_circuit_breaker->hybrid_high_water_level()) { + ++_srs_pps_snack4->sugar; + return; + } + for (uint16_t s = first; s != last; ++s) { queue_[s] = SrsRtpNackInfo(); } @@ -259,6 +271,13 @@ void SrsRtpNackForReceiver::check_queue_size() void SrsRtpNackForReceiver::get_nack_seqs(SrsRtcpNack& seqs, uint32_t& timeout_nacks) { + // If circuit-breaker is enabled, disable nack. 
+ if (_srs_circuit_breaker->hybrid_high_water_level()) { + queue_.clear(); + ++_srs_pps_snack4->sugar; + return; + } + srs_utime_t now = srs_get_system_time(); srs_utime_t interval = now - pre_check_time_; diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 1fa107b60..2e9bc9f65 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -281,6 +281,7 @@ srs_error_t SrsRtcServer::initialize() srs_error_t err = srs_success; // The RTC server start a timer, do routines of RTC server. + // @see SrsRtcServer::on_timer() _srs_hybrid->timer5s()->subscribe(this); // Initialize the black hole. diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 8ecabee05..c696cb0da 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -43,6 +43,7 @@ #include #include #include +#include #ifdef SRS_FFMPEG_FIT #include @@ -53,6 +54,8 @@ // The NACK sent by us(SFU). SrsPps* _srs_pps_snack = new SrsPps(); SrsPps* _srs_pps_snack2 = new SrsPps(); +SrsPps* _srs_pps_snack3 = new SrsPps(); +SrsPps* _srs_pps_snack4 = new SrsPps(); SrsPps* _srs_pps_sanack = new SrsPps(); SrsPps* _srs_pps_svnack = new SrsPps(); @@ -61,6 +64,8 @@ SrsPps* _srs_pps_rnack2 = new SrsPps(); SrsPps* _srs_pps_rhnack = new SrsPps(); SrsPps* _srs_pps_rmnack = new SrsPps(); +extern SrsPps* _srs_pps_aloss2; + // Firefox defaults as 109, Chrome is 111. const int kAudioPayloadType = 111; const int kAudioChannel = 2; @@ -499,8 +504,10 @@ srs_error_t SrsRtcStream::on_publish() return srs_error_wrap(err, "bridger on publish"); } - // For SrsRtcStream::on_timer() + // The PLI interval for RTC2RTMP. 
pli_for_rtmp_ = _srs_config->get_rtc_pli_for_rtmp(req->vhost); + + // @see SrsRtcStream::on_timer() _srs_hybrid->timer100ms()->subscribe(this); } @@ -576,6 +583,12 @@ srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; + // If circuit-breaker is dying, drop packet. + if (_srs_circuit_breaker->hybrid_dying_water_level()) { + _srs_pps_aloss2->sugar += (int64_t)consumers.size(); + return err; + } + for (int i = 0; i < (int)consumers.size(); i++) { SrsRtcConsumer* consumer = consumers.at(i); if ((err = consumer->enqueue(pkt->copy())) != srs_success) { diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index da93d1c4f..f33b081e4 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1261,9 +1261,6 @@ srs_error_t SrsServer::setup_ticks() if ((err = timer_->tick(2, 3 * SRS_UTIME_SECONDS)) != srs_success) { return srs_error_wrap(err, "tick"); } - if ((err = timer_->tick(3, 3 * SRS_UTIME_SECONDS)) != srs_success) { - return srs_error_wrap(err, "tick"); - } if ((err = timer_->tick(4, 6 * SRS_UTIME_SECONDS)) != srs_success) { return srs_error_wrap(err, "tick"); } @@ -1305,7 +1302,6 @@ srs_error_t SrsServer::notify(int event, srs_utime_t interval, srs_utime_t tick) switch (event) { case 2: srs_update_system_rusage(); break; - case 3: srs_update_proc_stat(); break; case 4: srs_update_disk_stat(); break; case 5: srs_update_meminfo(); break; case 6: srs_update_platform_info(); break; diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp new file mode 100644 index 000000000..77ee3d1b7 --- /dev/null +++ b/trunk/src/app/srs_app_threads.cpp @@ -0,0 +1,149 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation 
the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include +#include +#include + +#include +using namespace std; + +#include + +extern SrsPps* _srs_pps_snack2; +extern SrsPps* _srs_pps_snack3; +extern SrsPps* _srs_pps_snack4; + +SrsPps* _srs_pps_aloss2 = new SrsPps(); + +SrsCircuitBreaker::SrsCircuitBreaker() +{ + enabled_ = false; + high_threshold_ = 0; + high_pulse_ = 0; + critical_threshold_ = 0; + critical_pulse_ = 0; + dying_threshold_ = 0; + dying_pulse_ = 0; + + hybrid_high_water_level_ = 0; + hybrid_critical_water_level_ = 0; + hybrid_dying_water_level_ = 0; +} + +SrsCircuitBreaker::~SrsCircuitBreaker() +{ +} + +srs_error_t SrsCircuitBreaker::initialize() +{ + srs_error_t err = srs_success; + + enabled_ = _srs_config->get_circuit_breaker(); + high_threshold_ = _srs_config->get_high_threshold(); + high_pulse_ = _srs_config->get_high_pulse(); + critical_threshold_ = _srs_config->get_critical_threshold(); + critical_pulse_ = _srs_config->get_critical_pulse(); + dying_threshold_ = _srs_config->get_dying_threshold(); + dying_pulse_ = _srs_config->get_dying_pulse(); + + // Update the water level for circuit breaker. 
+ // @see SrsCircuitBreaker::on_timer() + _srs_hybrid->timer1s()->subscribe(this); + + srs_trace("CircuitBreaker: enabled=%d, high=%dx%d, critical=%dx%d, dying=%dx%d", enabled_, + high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, + dying_pulse_, dying_threshold_); + + return err; +} + +bool SrsCircuitBreaker::hybrid_high_water_level() +{ + return enabled_ && (hybrid_critical_water_level() || hybrid_high_water_level_); // Gate on enabled_: && binds tighter than ||, so a disabled breaker must not trip on a stale counter. +} + +bool SrsCircuitBreaker::hybrid_critical_water_level() +{ + return enabled_ && (hybrid_dying_water_level() || hybrid_critical_water_level_); // Critical also covers dying; never raised when the breaker is disabled. +} + +bool SrsCircuitBreaker::hybrid_dying_water_level() +{ + return enabled_ && dying_pulse_ && hybrid_dying_water_level_ >= dying_pulse_; +} + +srs_error_t SrsCircuitBreaker::on_timer(srs_utime_t interval) +{ + srs_error_t err = srs_success; + + // Update the CPU usage. + srs_update_proc_stat(); + SrsProcSelfStat* stat = srs_get_self_proc_stat(); + + // Reset the high water-level when CPU is low for N times. + if (stat->percent * 100 > high_threshold_) { + hybrid_high_water_level_ = high_pulse_; + } else if (hybrid_high_water_level_ > 0) { + hybrid_high_water_level_--; + } + + // Reset the critical water-level when CPU is low for N times. + if (stat->percent * 100 > critical_threshold_) { + hybrid_critical_water_level_ = critical_pulse_; + } else if (hybrid_critical_water_level_ > 0) { + hybrid_critical_water_level_--; + } + + // Count consecutive pulses over dying_threshold (capped), and reset at once when CPU drops below it. + if (stat->percent * 100 > dying_threshold_) { + hybrid_dying_water_level_ = srs_min(dying_pulse_ + 1, hybrid_dying_water_level_ + 1); + } else if (hybrid_dying_water_level_ > 0) { + hybrid_dying_water_level_ = 0; + } + + // Show statistics for RTC server. + SrsProcSelfStat* u = srs_get_self_proc_stat(); + // Resident Set Size: number of pages the process has in real memory. + int memory = (int)(u->rss * 4 / 1024); + + // The hybrid thread cpu and memory. 
+ float thread_percent = stat->percent * 100; + + if (enabled_ && hybrid_high_water_level() || hybrid_critical_water_level() || _srs_pps_snack2->r10s()) { + srs_trace("CircuitBreaker: cpu=%.2f%%,%dMB, break=%d,%d,%d, cond=%.2f%%, snk=%d,%d,%d", + u->percent * 100, memory, + hybrid_high_water_level(), hybrid_critical_water_level(), hybrid_dying_water_level(), // Whether Circuit-Break is enable. + thread_percent, // The conditions to enable Circuit-Breaker. + _srs_pps_snack2->r10s(), _srs_pps_snack3->r10s(), _srs_pps_snack4->r10s() // NACK packet,seqs sent. + ); + } + + return err; +} + +SrsCircuitBreaker* _srs_circuit_breaker = new SrsCircuitBreaker(); + diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp new file mode 100644 index 000000000..7e293b6fb --- /dev/null +++ b/trunk/src/app/srs_app_threads.hpp @@ -0,0 +1,67 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_APP_THREADS_HPP +#define SRS_APP_THREADS_HPP + +#include + +#include + +// Protect server in high load. +class SrsCircuitBreaker : public ISrsFastTimer +{ +private: + // The config for high/critical water level. + bool enabled_; + int high_threshold_; + int high_pulse_; + int critical_threshold_; + int critical_pulse_; + int dying_threshold_; + int dying_pulse_; +private: + // Reset the water-level when CPU is low for N times. + // @note To avoid the CPU change rapidly. + int hybrid_high_water_level_; + int hybrid_critical_water_level_; + int hybrid_dying_water_level_; +public: + SrsCircuitBreaker(); + virtual ~SrsCircuitBreaker(); +public: + srs_error_t initialize(); +public: + // Whether hybrid server water-level is high. 
+ bool hybrid_high_water_level(); + bool hybrid_critical_water_level(); + bool hybrid_dying_water_level(); +// interface ISrsFastTimer +private: + srs_error_t on_timer(srs_utime_t interval); +}; + +extern SrsCircuitBreaker* _srs_circuit_breaker; + +#endif + diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index 4c287f878..7d3af0f74 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -26,6 +26,6 @@ #define VERSION_MAJOR 4 #define VERSION_MINOR 0 -#define VERSION_REVISION 102 +#define VERSION_REVISION 103 #endif diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 9325e6e5e..fd0fe5804 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -54,6 +54,7 @@ using namespace std; #include #include #include +#include #ifdef SRS_RTC #include #include @@ -479,6 +480,11 @@ srs_error_t run_hybrid_server() return srs_error_wrap(err, "hybrid initialize"); } + // Circuit breaker to protect server, which depends on hybrid. + if ((err = _srs_circuit_breaker->initialize()) != srs_success) { + return srs_error_wrap(err, "init circuit breaker"); + } + // Should run util hybrid servers all done. if ((err = _srs_hybrid->run()) != srs_success) { return srs_error_wrap(err, "hybrid run");