From b175821b625ee16e1d663d5cc3db6ecf09eaae8b Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 14 May 2014 13:56:12 +0800 Subject: [PATCH] refine kbps, provides 30s,1m,5m,60m kbps. 0.9.97 --- trunk/src/app/srs_app_edge.cpp | 14 ++-- trunk/src/app/srs_app_forward.cpp | 7 +- trunk/src/app/srs_app_kbps.cpp | 108 ++++++++++++++++++++++++++-- trunk/src/app/srs_app_kbps.hpp | 58 ++++++++++++++- trunk/src/app/srs_app_rtmp_conn.cpp | 20 ++++-- trunk/src/core/srs_core.hpp | 2 +- 6 files changed, 188 insertions(+), 21 deletions(-) diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index d84c08501..3f9e2f482 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -172,9 +172,12 @@ int SrsEdgeIngester::ingest() // pithy print if (pithy_print.can_print()) { + kbps->sample(); srs_trace("<- "SRS_LOG_ID_EDGE_PLAY - " time=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.age(), kbps->get_send_kbps(), kbps->get_recv_kbps()); + " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", + pithy_print.age(), + kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(), + kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium()); } // read from client. @@ -464,9 +467,12 @@ int SrsEdgeForwarder::cycle() // pithy print if (pithy_print.can_print()) { + kbps->sample(); srs_trace("-> "SRS_LOG_ID_EDGE_PUBLISH - " time=%"PRId64", msgs=%d, okbps=%d, ikbps=%d", - pithy_print.age(), count, kbps->get_send_kbps(), kbps->get_recv_kbps()); + " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", + pithy_print.age(), count, + kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(), + kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium()); } // ignore when no messages. diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 280622493..4690b644e 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -342,9 +342,12 @@ int SrsForwarder::forward() // pithy print if (pithy_print.can_print()) { + kbps->sample(); srs_trace("-> "SRS_LOG_ID_FOWARDER - " time=%"PRId64", msgs=%d, okbps=%d, ikbps=%d", - pithy_print.age(), count, kbps->get_send_kbps(), kbps->get_recv_kbps()); + " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", + pithy_print.age(), count, + kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(), + kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium()); } // ignore when no messages. diff --git a/trunk/src/app/srs_app_kbps.cpp b/trunk/src/app/srs_app_kbps.cpp index d422dda76..07ebeb95a 100644 --- a/trunk/src/app/srs_app_kbps.cpp +++ b/trunk/src/app/srs_app_kbps.cpp @@ -28,6 +28,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +SrsKbpsSample::SrsKbpsSample() +{ + bytes = time = 0; + kbps = 0; +} + SrsKbpsSlice::SrsKbpsSlice() { io.in = NULL; @@ -39,6 +45,59 @@ SrsKbpsSlice::~SrsKbpsSlice() { } +int64_t SrsKbpsSlice::get_total_bytes() +{ + return bytes + last_bytes - io_bytes_base; +} + +void SrsKbpsSlice::sample() +{ + int64_t now = srs_get_system_time_ms(); + int64_t total_bytes = get_total_bytes(); + + if (sample_30s.time <= 0) { + sample_30s.kbps = 0; + sample_30s.time = now; + sample_30s.bytes = total_bytes; + } + if (sample_1m.time <= 0) { + sample_1m.kbps = 0; + sample_1m.time = now; + sample_1m.bytes = total_bytes; + } + if (sample_5m.time <= 0) { + sample_5m.kbps = 0; + sample_5m.time = now; + sample_5m.bytes = total_bytes; + } + if (sample_60m.time <= 0) { + sample_60m.kbps = 0; + sample_60m.time = now; + sample_60m.bytes = total_bytes; + } + + if (now - sample_30s.time > 30 * 1000) { + sample_30s.kbps = (total_bytes - sample_30s.bytes) * 8 / (now - sample_30s.time); + sample_30s.time = now; + sample_30s.bytes = total_bytes; + } + if (now - sample_1m.time > 60 * 1000) { + sample_1m.kbps = (total_bytes - sample_1m.bytes) * 8 / (now - sample_1m.time); + sample_1m.time = now; + sample_1m.bytes = total_bytes; + } + if (now - sample_5m.time > 300 * 1000) { + sample_5m.kbps = (total_bytes - sample_5m.bytes) * 8 / (now - sample_5m.time); + sample_5m.time = now; + sample_5m.bytes = total_bytes; + } + if (now - sample_60m.time > 3600 * 1000) { + sample_60m.kbps = (total_bytes - sample_60m.bytes) * 8 / (now - sample_60m.time); + sample_60m.time = now; + sample_60m.bytes = total_bytes; + } +} + SrsKbps::SrsKbps() { } @@ -64,6 +123,8 @@ void SrsKbps::set_io(ISrsProtocolReader* in, ISrsProtocolWriter* out) if (in) { is.last_bytes = is.io_bytes_base = in->get_recv_bytes(); } + // resample + is.sample(); // set output stream // now, set start time. @@ -80,41 +141,74 @@ void SrsKbps::set_io(ISrsProtocolReader* in, ISrsProtocolWriter* out) if (out) { os.last_bytes = os.io_bytes_base = out->get_send_bytes(); } + // resample + os.sample(); } int SrsKbps::get_send_kbps() { int64_t duration = srs_get_system_time_ms() - is.starttime; - int64_t bytes = get_send_bytes(); if (duration <= 0) { return 0; } + int64_t bytes = get_send_bytes(); return bytes * 8 / duration; } int SrsKbps::get_recv_kbps() { int64_t duration = srs_get_system_time_ms() - os.starttime; - int64_t bytes = get_recv_bytes(); if (duration <= 0) { return 0; } + int64_t bytes = get_recv_bytes(); return bytes * 8 / duration; } +int SrsKbps::get_send_kbps_sample_high() +{ + return os.sample_30s.kbps; +} + +int SrsKbps::get_recv_kbps_sample_high() +{ + return is.sample_30s.kbps; +} + +int SrsKbps::get_send_kbps_sample_medium() +{ + return os.sample_5m.kbps; +} + +int SrsKbps::get_recv_kbps_sample_medium() +{ + return is.sample_5m.kbps; +} + int64_t SrsKbps::get_send_bytes() { - if (os.io.out) { - os.last_bytes = os.io.out->get_send_bytes(); - } - return os.bytes + os.last_bytes - os.io_bytes_base; + return os.get_total_bytes(); } int64_t SrsKbps::get_recv_bytes() { + return is.get_total_bytes(); +} + +void SrsKbps::sample() +{ + if (os.io.out) { + os.last_bytes = os.io.out->get_send_bytes(); + } + + // resample + os.sample(); + if (is.io.in) { is.last_bytes = is.io.in->get_recv_bytes(); } - return is.bytes + is.last_bytes - is.io_bytes_base; + + // resample + is.sample(); } diff --git a/trunk/src/app/srs_app_kbps.hpp b/trunk/src/app/srs_app_kbps.hpp index 63d39ccda..d2cc2d4d2 100644 --- a/trunk/src/app/srs_app_kbps.hpp +++ b/trunk/src/app/srs_app_kbps.hpp @@ -33,8 +33,33 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class ISrsProtocolReader; class ISrsProtocolWriter; +/** +* a kbps sample, for example, 1minute kbps, +* 10minute kbps sample. +*/ +class SrsKbpsSample +{ +public: + int64_t bytes; + int64_t time; + int kbps; +public: + SrsKbpsSample(); +}; + /** * a slice of kbps statistic, for input or output. +* a slice contains a set of sessions, which has a base offset of bytes, +* where a slice is: +* starttime(oldest session startup time) +* bytes(total bytes of previous sessions) +* io_bytes_base(bytes offset of current session) +* last_bytes(bytes of current session) +* so, the total send bytes now is: +* send_bytes = bytes + last_bytes - io_bytes_base +* so, the bytes sent duration current session is: +* send_bytes = last_bytes - io_bytes_base +* @remark user use set_io to start new session. */ class SrsKbpsSlice { @@ -45,17 +70,34 @@ private: }; public: slice_io io; + // session startup bytes + // @remark, use total_bytes() to get the total bytes of slice. int64_t bytes; + // slice starttime, the first time to record bytes. int64_t starttime; - // startup bytes number for io when set it, + // session startup bytes number for io when set it, // the base offset of bytes for io. int64_t io_bytes_base; // last updated bytes number, // cache for io maybe freed. int64_t last_bytes; + // samples + SrsKbpsSample sample_30s; + SrsKbpsSample sample_1m; + SrsKbpsSample sample_5m; + SrsKbpsSample sample_60m; public: SrsKbpsSlice(); virtual ~SrsKbpsSlice(); +public: + /** + * get current total bytes. + */ + virtual int64_t get_total_bytes(); + /** + * resample all samples. + */ + virtual void sample(); }; /** @@ -71,6 +113,7 @@ public: virtual ~SrsKbps(); public: /** + * set io to start new session. * set the underlayer reader/writer, * if the io destroied, for instance, the forwarder reconnect, * user must set the io of SrsKbps to NULL to continue to use the kbps object. @@ -82,15 +125,28 @@ public: public: /** * get total kbps, duration is from the startup of io. + * @remark, use sample() to update data. */ virtual int get_send_kbps(); virtual int get_recv_kbps(); + // 30s + virtual int get_send_kbps_sample_high(); + virtual int get_recv_kbps_sample_high(); + // 5m + virtual int get_send_kbps_sample_medium(); + virtual int get_recv_kbps_sample_medium(); public: /** * get the total send/recv bytes, from the startup of the oldest io. + * @remark, use sample() to update data. */ virtual int64_t get_send_bytes(); virtual int64_t get_recv_bytes(); +public: + /** + * resample all samples. + */ + virtual void sample(); }; #endif \ No newline at end of file diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 5a3628926..de6d6326e 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -504,9 +504,12 @@ int SrsRtmpConn::playing(SrsSource* source) // reportable if (pithy_print.can_print()) { + kbps->sample(); srs_trace("-> "SRS_LOG_ID_PLAY - " time=%"PRId64", duration=%"PRId64", msgs=%d, okbps=%d, ikbps=%d", - pithy_print.age(), duration, count, kbps->get_send_kbps(), kbps->get_recv_kbps()); + " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", + pithy_print.age(), count, + kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(), + kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium()); } if (count <= 0) { @@ -590,9 +593,11 @@ int SrsRtmpConn::fmle_publish(SrsSource* source) // reportable if (pithy_print.can_print()) { + kbps->sample(); srs_trace("<- "SRS_LOG_ID_CLIENT_PUBLISH - " time=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.age(), kbps->get_send_kbps(), kbps->get_recv_kbps()); + " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", pithy_print.age(), + kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(), + kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium()); } // process UnPublish event. @@ -666,9 +671,12 @@ int SrsRtmpConn::flash_publish(SrsSource* source) // reportable if (pithy_print.can_print()) { + kbps->sample(); srs_trace("<- "SRS_LOG_ID_WEB_PUBLISH - " time=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.age(), kbps->get_send_kbps(), kbps->get_recv_kbps()); + " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", + pithy_print.age(), + kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(), + kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium()); } // process UnPublish event. diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 8d3a4870a..334af8f83 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "96" +#define VERSION_REVISION "97" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "srs"