From 826546d5185e95bc2080894f0fae956640c1c29f Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 6 Feb 2021 18:05:04 +0800 Subject: [PATCH] Refine pps add SrsPps --- trunk/src/protocol/srs_protocol_kbps.cpp | 91 ++++++++++++++++++++---- trunk/src/protocol/srs_protocol_kbps.hpp | 56 +++++++++++---- 2 files changed, 118 insertions(+), 29 deletions(-) diff --git a/trunk/src/protocol/srs_protocol_kbps.cpp b/trunk/src/protocol/srs_protocol_kbps.cpp index 11f119a53..712e684ad 100644 --- a/trunk/src/protocol/srs_protocol_kbps.cpp +++ b/trunk/src/protocol/srs_protocol_kbps.cpp @@ -25,24 +25,83 @@ #include -SrsKbpsSample::SrsKbpsSample() +SrsRateSample::SrsRateSample() { - bytes = time = -1; - kbps = 0; + total = time = -1; + rate = 0; } -SrsKbpsSample::~SrsKbpsSample() +SrsRateSample::~SrsRateSample() { } -SrsKbpsSample* SrsKbpsSample::update(int64_t b, srs_utime_t t, int k) +SrsRateSample* SrsRateSample::update(int64_t nn, srs_utime_t t, int k) { - bytes = b; + total = nn; time = t; - kbps = k; + rate = k; return this; } +SrsPps::SrsPps(SrsWallClock* c) +{ + clk_ = c; + sugar = 0; +} + +SrsPps::~SrsPps() +{ +} + +void SrsPps::update() +{ + update(sugar); +} + +void SrsPps::update(int64_t nn) +{ + srs_utime_t now = clk_->now(); + + if (sample_30s_.time < 0) { + sample_30s_.update(nn, now, 0); + } + if (sample_1m_.time < 0) { + sample_1m_.update(nn, now, 0); + } + if (sample_5m_.time < 0) { + sample_5m_.update(nn, now, 0); + } + if (sample_60m_.time < 0) { + sample_60m_.update(nn, now, 0); + } + + if (now - sample_10s_.time >= 10 * SRS_UTIME_SECONDS) { + int kps = (int)((nn - sample_10s_.total) * 1000 / srsu2ms(now - sample_10s_.time)); + sample_10s_.update(nn, now, kps); + } + if (now - sample_30s_.time >= 30 * SRS_UTIME_SECONDS) { + int kps = (int)((nn - sample_30s_.total) * 1000 / srsu2ms(now - sample_30s_.time)); + sample_30s_.update(nn, now, kps); + } + if (now - sample_1m_.time >= 60 * SRS_UTIME_SECONDS) { + int kps = (int)((nn - sample_1m_.total) * 1000 / srsu2ms(now - sample_1m_.time)); + sample_1m_.update(nn, now, kps); + } + if (now - sample_5m_.time >= 300 * SRS_UTIME_SECONDS) { + int kps = (int)((nn - sample_5m_.total) * 1000 / srsu2ms(now - sample_5m_.time)); + sample_5m_.update(nn, now, kps); + } + if (now - sample_60m_.time >= 3600 * SRS_UTIME_SECONDS) { + int kps = (int)((nn - sample_60m_.total) * 1000 / srsu2ms(now - sample_60m_.time)); + sample_60m_.update(nn, now, kps); + } +} + +int SrsPps::r10s() +{ + return sample_10s_.rate; +} + SrsKbpsSlice::SrsKbpsSlice(SrsWallClock* c) { clk = c; @@ -78,19 +137,19 @@ void SrsKbpsSlice::sample() } if (now - sample_30s.time >= 30 * SRS_UTIME_SECONDS) { - int kbps = (int)((total_bytes - sample_30s.bytes) * 8 / srsu2ms(now - sample_30s.time)); + int kbps = (int)((total_bytes - sample_30s.total) * 8 / srsu2ms(now - sample_30s.time)); sample_30s.update(total_bytes, now, kbps); } if (now - sample_1m.time >= 60 * SRS_UTIME_SECONDS) { - int kbps = (int)((total_bytes - sample_1m.bytes) * 8 / srsu2ms(now - sample_1m.time)); + int kbps = (int)((total_bytes - sample_1m.total) * 8 / srsu2ms(now - sample_1m.time)); sample_1m.update(total_bytes, now, kbps); } if (now - sample_5m.time >= 300 * SRS_UTIME_SECONDS) { - int kbps = (int)((total_bytes - sample_5m.bytes) * 8 / srsu2ms(now - sample_5m.time)); + int kbps = (int)((total_bytes - sample_5m.total) * 8 / srsu2ms(now - sample_5m.time)); sample_5m.update(total_bytes, now, kbps); } if (now - sample_60m.time >= 3600 * SRS_UTIME_SECONDS) { - int kbps = (int)((total_bytes - sample_60m.bytes) * 8 / srsu2ms(now - sample_60m.time)); + int kbps = (int)((total_bytes - sample_60m.total) * 8 / srsu2ms(now - sample_60m.time)); sample_60m.update(total_bytes, now, kbps); } } @@ -188,22 +247,22 @@ int SrsKbps::get_recv_kbps() int SrsKbps::get_send_kbps_30s() { - return os.sample_30s.kbps; + return os.sample_30s.rate; } int SrsKbps::get_recv_kbps_30s() { - return is.sample_30s.kbps; + return is.sample_30s.rate; } int SrsKbps::get_send_kbps_5m() { - return os.sample_5m.kbps; + return os.sample_5m.rate; } int SrsKbps::get_recv_kbps_5m() { - return is.sample_5m.kbps; + return is.sample_5m.rate; } void SrsKbps::add_delta(int64_t in, int64_t out) @@ -297,3 +356,5 @@ int SrsKbps::size_memory() return sizeof(SrsKbps); } +SrsWallClock* _srs_clock = new SrsWallClock(); + diff --git a/trunk/src/protocol/srs_protocol_kbps.hpp b/trunk/src/protocol/srs_protocol_kbps.hpp index 22c3d032b..014038e85 100644 --- a/trunk/src/protocol/srs_protocol_kbps.hpp +++ b/trunk/src/protocol/srs_protocol_kbps.hpp @@ -30,21 +30,46 @@ class SrsWallClock; -/** - * a kbps sample, for example, the kbps at time, - * 10minute kbps sample. - */ -class SrsKbpsSample +// A sample for rate-based stat, such as kbps or kps. +class SrsRateSample { public: - int64_t bytes; + int64_t total; srs_utime_t time; - int kbps; + // kbps or kps + int rate; public: - SrsKbpsSample(); - virtual ~SrsKbpsSample(); + SrsRateSample(); + virtual ~SrsRateSample(); public: - virtual SrsKbpsSample* update(int64_t b, srs_utime_t t, int k); + virtual SrsRateSample* update(int64_t nn, srs_utime_t t, int k); +}; + +// A pps manager every some duration. +class SrsPps +{ +private: + SrsWallClock* clk_; +private: + // samples + SrsRateSample sample_10s_; + SrsRateSample sample_30s_; + SrsRateSample sample_1m_; + SrsRateSample sample_5m_; + SrsRateSample sample_60m_; +public: + // Sugar for target to stat. + int64_t sugar; +public: + SrsPps(SrsWallClock* clk); + virtual ~SrsPps(); +public: + // Update with the nn which is target. + void update(); + // Update with the nn. + void update(int64_t nn); + // Get the 10s average stat. + int r10s(); }; /** @@ -82,10 +107,10 @@ public: // cache for io maybe freed. int64_t last_bytes; // samples - SrsKbpsSample sample_30s; - SrsKbpsSample sample_1m; - SrsKbpsSample sample_5m; - SrsKbpsSample sample_60m; + SrsRateSample sample_30s; + SrsRateSample sample_1m; + SrsRateSample sample_5m; + SrsRateSample sample_60m; public: // for the delta bytes. int64_t delta_bytes; @@ -233,4 +258,7 @@ public: virtual int size_memory(); }; +// The global clock. +extern SrsWallClock* _srs_clock; + #endif