From 6234905532373f009e9c0777d4528764da1f964b Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 1 Jan 2019 17:36:27 +0800 Subject: [PATCH] Refactor ISrsKbpsDelta --- trunk/src/app/srs_app_conn.cpp | 19 +----- trunk/src/app/srs_app_conn.hpp | 9 +-- trunk/src/app/srs_app_http_api.cpp | 19 +----- trunk/src/app/srs_app_http_api.hpp | 7 +- trunk/src/app/srs_app_http_conn.cpp | 19 +----- trunk/src/app/srs_app_http_conn.hpp | 7 +- trunk/src/app/srs_app_rtmp_conn.cpp | 19 +----- trunk/src/app/srs_app_rtmp_conn.hpp | 7 +- trunk/src/app/srs_app_statistic.cpp | 12 ++-- trunk/src/app/srs_app_statistic.hpp | 2 +- trunk/src/protocol/srs_protocol_kbps.cpp | 42 +++++------- trunk/src/protocol/srs_protocol_kbps.hpp | 58 +++++++--------- trunk/src/utest/srs_utest_protocol.cpp | 85 ++++++++++++++++++++++++ 13 files changed, 146 insertions(+), 159 deletions(-) diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 8a02f6bd2..c99ebb2af 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -58,24 +58,9 @@ SrsConnection::~SrsConnection() srs_close_stfd(stfd); } -void SrsConnection::resample() +void SrsConnection::remark(int64_t* in, int64_t* out) { - kbps->resample(); -} - -int64_t SrsConnection::get_send_bytes_delta() -{ - return kbps->get_send_bytes_delta(); -} - -int64_t SrsConnection::get_recv_bytes_delta() -{ - return kbps->get_recv_bytes_delta(); -} - -void SrsConnection::cleanup() -{ - kbps->cleanup(); + kbps->remark(in, out); } void SrsConnection::dispose() diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 621ecb2a4..0c34c0bc4 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -42,7 +42,7 @@ class SrsWallClock; * server will add the connection to manager, and delete it when remove. */ class SrsConnection : virtual public ISrsConnection, virtual public ISrsCoroutineHandler - , virtual public IKbpsDelta, virtual public ISrsReloadHandler + , virtual public ISrsKbpsDelta, virtual public ISrsReloadHandler { protected: /** @@ -82,12 +82,9 @@ protected: public: SrsConnection(IConnectionManager* cm, srs_netfd_t c, std::string cip); virtual ~SrsConnection(); -// interface IKbpsDelta +// interface ISrsKbpsDelta public: - virtual void resample(); - virtual int64_t get_send_bytes_delta(); - virtual int64_t get_recv_bytes_delta(); - virtual void cleanup(); + virtual void remark(int64_t* in, int64_t* out); public: /** * to dipose the connection. diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 1739b49fb..dbf9f9c4b 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1336,24 +1336,7 @@ SrsHttpApi::~SrsHttpApi() _srs_config->unsubscribe(this); } -void SrsHttpApi::resample() -{ - // TODO: FIXME: implements it -} - -int64_t SrsHttpApi::get_send_bytes_delta() -{ - // TODO: FIXME: implements it - return 0; -} - -int64_t SrsHttpApi::get_recv_bytes_delta() -{ - // TODO: FIXME: implements it - return 0; -} - -void SrsHttpApi::cleanup() +void SrsHttpApi::remark(int64_t* in, int64_t* out) { // TODO: FIXME: implements it } diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index ad7331f66..2115db61b 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -219,12 +219,9 @@ private: public: SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip); virtual ~SrsHttpApi(); -// interface IKbpsDelta +// interface ISrsKbpsDelta public: - virtual void resample(); - virtual int64_t get_send_bytes_delta(); - virtual int64_t get_recv_bytes_delta(); - virtual void cleanup(); + virtual void remark(int64_t* in, int64_t* out); protected: virtual srs_error_t do_cycle(); private: diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index ff20f78fc..3fdc6d1d8 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -72,24 +72,7 @@ SrsHttpConn::~SrsHttpConn() srs_freep(cors); } -void SrsHttpConn::resample() -{ - // TODO: FIXME: implements it -} - -int64_t SrsHttpConn::get_send_bytes_delta() -{ - // TODO: FIXME: implements it - return 0; -} - -int64_t SrsHttpConn::get_recv_bytes_delta() -{ - // TODO: FIXME: implements it - return 0; -} - -void SrsHttpConn::cleanup() +void SrsHttpConn::remark(int64_t* in, int64_t* out) { // TODO: FIXME: implements it } diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 617cd12c6..5c9822b71 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -67,12 +67,9 @@ protected: public: SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip); virtual ~SrsHttpConn(); -// interface IKbpsDelta +// interface ISrsKbpsDelta public: - virtual void resample(); - virtual int64_t get_send_bytes_delta(); - virtual int64_t get_recv_bytes_delta(); - virtual void cleanup(); + virtual void remark(int64_t* in, int64_t* out); protected: virtual srs_error_t do_cycle(); protected: diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 4ea213fef..3598ee62e 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -334,24 +334,9 @@ srs_error_t SrsRtmpConn::on_reload_vhost_publish(string vhost) return err; } -void SrsRtmpConn::resample() +void SrsRtmpConn::remark(int64_t* in, int64_t* out) { - kbps->resample(); -} - -int64_t SrsRtmpConn::get_send_bytes_delta() -{ - return kbps->get_send_bytes_delta(); -} - -int64_t SrsRtmpConn::get_recv_bytes_delta() -{ - return kbps->get_recv_bytes_delta(); -} - -void SrsRtmpConn::cleanup() -{ - kbps->cleanup(); + kbps->remark(in, out); } srs_error_t SrsRtmpConn::service_cycle() diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 7d6196351..a6fe8a52a 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -140,12 +140,9 @@ public: virtual srs_error_t on_reload_vhost_tcp_nodelay(std::string vhost); virtual srs_error_t on_reload_vhost_realtime(std::string vhost); virtual srs_error_t on_reload_vhost_publish(std::string vhost); -// interface IKbpsDelta +// interface ISrsKbpsDelta public: - virtual void resample(); - virtual int64_t get_send_bytes_delta(); - virtual int64_t get_recv_bytes_delta(); - virtual void cleanup(); + virtual void remark(int64_t* in, int64_t* out); private: // when valid and connected to vhost/app, service the client. virtual srs_error_t service_cycle(); diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index c20e42222..298b782ea 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -459,16 +459,14 @@ void SrsStatistic::kbps_add_delta(SrsConnection* conn) SrsStatisticClient* client = clients[id]; // resample the kbps to collect the delta. - conn->resample(); + int64_t in, out; + conn->remark(&in, &out); // add delta of connection to kbps. // for next sample() of server kbps can get the stat. - kbps->add_delta(conn); - client->stream->kbps->add_delta(conn); - client->stream->vhost->kbps->add_delta(conn); - - // cleanup the delta. - conn->cleanup(); + kbps->add_delta(in, out); + client->stream->kbps->add_delta(in, out); + client->stream->vhost->kbps->add_delta(in, out); } SrsKbps* SrsStatistic::kbps_sample() diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index 762fa5606..a3eb4e8b8 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -212,7 +212,7 @@ public: * sample the kbps, add delta bytes of conn. * use kbps_sample() to get all result of kbps stat. */ - // TODO: FIXME: the add delta must use IKbpsDelta interface instead. + // TODO: FIXME: the add delta must use ISrsKbpsDelta interface instead. virtual void kbps_add_delta(SrsConnection* conn); /** * calc the result for all kbps. diff --git a/trunk/src/protocol/srs_protocol_kbps.cpp b/trunk/src/protocol/srs_protocol_kbps.cpp index 216ef35b7..484dfa270 100644 --- a/trunk/src/protocol/srs_protocol_kbps.cpp +++ b/trunk/src/protocol/srs_protocol_kbps.cpp @@ -96,11 +96,11 @@ void SrsKbpsSlice::sample() } } -IKbpsDelta::IKbpsDelta() +ISrsKbpsDelta::ISrsKbpsDelta() { } -IKbpsDelta::~IKbpsDelta() +ISrsKbpsDelta::~ISrsKbpsDelta() { } @@ -205,13 +205,11 @@ int SrsKbps::get_recv_kbps_5m() return is.sample_5m.kbps; } -void SrsKbps::add_delta(IKbpsDelta* delta) +void SrsKbps::add_delta(int64_t in, int64_t out) { - srs_assert(delta); - // update the total bytes - is.last_bytes += delta->get_recv_bytes_delta(); - os.last_bytes += delta->get_send_bytes_delta(); + is.last_bytes += in; + os.last_bytes += out; // we donot sample, please use sample() to do resample. } @@ -276,27 +274,21 @@ int64_t SrsKbps::get_recv_bytes() return bytes; } -void SrsKbps::resample() +void SrsKbps::remark(int64_t* in, int64_t* out) { sample(); -} - -int64_t SrsKbps::get_send_bytes_delta() -{ - int64_t delta = os.get_total_bytes() - os.delta_bytes; - return delta; -} - -int64_t SrsKbps::get_recv_bytes_delta() -{ - int64_t delta = is.get_total_bytes() - is.delta_bytes; - return delta; -} - -void SrsKbps::cleanup() -{ - os.delta_bytes = os.get_total_bytes(); + + int64_t inv = is.get_total_bytes() - is.delta_bytes; is.delta_bytes = is.get_total_bytes(); + if (in) { + *in = inv; + } + + int64_t outv = os.get_total_bytes() - os.delta_bytes; + os.delta_bytes = os.get_total_bytes(); + if (out) { + *out = outv; + } } int SrsKbps::size_memory() diff --git a/trunk/src/protocol/srs_protocol_kbps.hpp b/trunk/src/protocol/srs_protocol_kbps.hpp index 8f3e92d69..f0267a2eb 100644 --- a/trunk/src/protocol/srs_protocol_kbps.hpp +++ b/trunk/src/protocol/srs_protocol_kbps.hpp @@ -31,7 +31,7 @@ class SrsWallClock; /** - * a kbps sample, for example, 1minute kbps, + * a kbps sample, for example, the kbps at time, * 10minute kbps sample. */ class SrsKbpsSample @@ -97,7 +97,7 @@ public: SrsKbpsSlice(SrsWallClock* clk); virtual ~SrsKbpsSlice(); public: - // Get current total bytes, not depend on sample(). + // Get current total bytes, it doesn't depend on sample(). virtual int64_t get_total_bytes(); // Resample the slice to calculate the kbps. virtual void sample(); @@ -106,30 +106,21 @@ public: /** * the interface which provices delta of bytes. * for a delta, for example, a live stream connection, we can got the delta by: - * IKbpsDelta* delta = ...; - * delta->resample(); - * kbps->add_delta(delta); - * delta->cleanup(); + * ISrsKbpsDelta* delta = ...; + * int64_t in, out; + * delta->remark(&in, &out); + * kbps->add_delta(in, out); */ -class IKbpsDelta +class ISrsKbpsDelta { public: - IKbpsDelta(); - virtual ~IKbpsDelta(); + ISrsKbpsDelta(); + virtual ~ISrsKbpsDelta(); public: /** * resample to generate the value of delta bytes. */ - virtual void resample() = 0; - /** - * get the send or recv bytes delta. - */ - virtual int64_t get_send_bytes_delta() = 0; - virtual int64_t get_recv_bytes_delta() = 0; - /** - * cleanup the value of delta bytes. - */ - virtual void cleanup() = 0; + virtual void remark(int64_t* in, int64_t* out) = 0; }; /** @@ -161,19 +152,19 @@ public: * SrsKbps* kbps = ...; * kbps->set_io(NULL, NULL) * for each connection in connections: - * IKbpsDelta* delta = connection; // where connection implements IKbpsDelta - * delta->resample() - * kbps->add_delta(delta) - * delta->cleanup() + * ISrsKbpsDelta* delta = connection; // where connection implements ISrsKbpsDelta + * int64_t in, out; + * delta->remark(&in, &out) + * kbps->add_delta(in, out) * kbps->sample() * kbps->get_xxx_kbps(). - * 3. kbps used as IKbpsDelta, to provides delta bytes: + * 3. kbps used as ISrsKbpsDelta, to provides delta bytes: * SrsKbps* kbps = ...; * kbps->set_io(in, out); - * IKbpsDelta* delta = (IKbpsDelta*)kbps; - * delta->resample(); - * printf("delta is %d/%d", delta->get_send_bytes_delta(), delta->get_recv_bytes_delta()); - * delta->cleanup(); + * ISrsKbpsDelta* delta = (ISrsKbpsDelta*)kbps; + * int64_t in, out; + * delta->remark(&in, out); + * printf("delta is %d/%d", in, out); * 4. kbps used as ISrsProtocolStatistic, to provides raw bytes: * SrsKbps* kbps = ...; * kbps->set_io(in, out); @@ -183,7 +174,7 @@ public: * user->set_io(kbps, kbps); * the server never know how many bytes already send/recv, for the connection maybe closed. */ -class SrsKbps : virtual public ISrsProtocolStatistic, virtual public IKbpsDelta +class SrsKbps : virtual public ISrsProtocolStatistic, virtual public ISrsKbpsDelta { private: SrsKbpsSlice is; @@ -226,7 +217,7 @@ public: * @remark user must invoke sample() to calc result after invoke this method. * @param delta, assert should never be NULL. */ - virtual void add_delta(IKbpsDelta* delta); + virtual void add_delta(int64_t in, int64_t out); /** * resample all samples, ignore if in/out is NULL. * used for user to calc the kbps, to sample new kbps value. @@ -238,12 +229,9 @@ public: public: virtual int64_t get_send_bytes(); virtual int64_t get_recv_bytes(); -// interface IKbpsDelta +// interface ISrsKbpsDelta public: - virtual void resample(); - virtual int64_t get_send_bytes_delta(); - virtual int64_t get_recv_bytes_delta(); - virtual void cleanup(); + virtual void remark(int64_t* in, int64_t* out); // interface ISrsMemorySizer public: virtual int size_memory(); diff --git a/trunk/src/utest/srs_utest_protocol.cpp b/trunk/src/utest/srs_utest_protocol.cpp index 2eff05d5a..241a3c82b 100644 --- a/trunk/src/utest/srs_utest_protocol.cpp +++ b/trunk/src/utest/srs_utest_protocol.cpp @@ -5832,5 +5832,90 @@ VOID TEST(ProtocolKbpsTest, Connections) } } +VOID TEST(ProtocolKbpsTest, Delta) +{ + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + + SrsKbps* conn = new SrsKbps(clock->set_clock(0)); + SrsAutoFree(SrsKbps, conn); + conn->set_io(io, io); + + // No data. + ISrsKbpsDelta* delta = (ISrsKbpsDelta*)conn; + int64_t in, out; + delta->remark(&in, &out); + EXPECT_EQ(0, in); + EXPECT_EQ(0, out); + + // 800kb. + io->set_in(100 * 1000)->set_out(100 * 1000); + delta->remark(&in, &out); + EXPECT_EQ(100 * 1000, in); + EXPECT_EQ(100 * 1000, out); + + // No data. + delta->remark(&in, &out); + EXPECT_EQ(0, in); + EXPECT_EQ(0, out); + } + + if (true) { + MockWallClock* clock = new MockWallClock(); + SrsAutoFree(MockWallClock, clock); + MockStatistic* io = new MockStatistic(); + SrsAutoFree(MockStatistic, io); + + SrsKbps* conn = new SrsKbps(clock->set_clock(0)); + SrsAutoFree(SrsKbps, conn); + conn->set_io(io, io); + + // No data. + ISrsKbpsDelta* delta = (ISrsKbpsDelta*)conn; + int64_t in, out; + delta->remark(&in, &out); + EXPECT_EQ(0, in); + EXPECT_EQ(0, out); + + // 800kb. + io->set_in(100 * 1000)->set_out(100 * 1000); + delta->remark(&in, &out); + EXPECT_EQ(100 * 1000, in); + EXPECT_EQ(100 * 1000, out); + + // Kbps without io, gather delta. + SrsKbps* kbps = new SrsKbps(clock->set_clock(0)); + SrsAutoFree(SrsKbps, kbps); + kbps->set_io(NULL, NULL); + + // No data, 0kbps. + kbps->sample(); + + EXPECT_EQ(0, kbps->get_recv_kbps()); + EXPECT_EQ(0, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(0, kbps->get_send_kbps()); + EXPECT_EQ(0, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + + // 800kbps in 30s. + clock->set_clock(30 * 1000); + kbps->add_delta(30 * in, 30 * out); + kbps->sample(); + + EXPECT_EQ(800, kbps->get_recv_kbps()); + EXPECT_EQ(800, kbps->get_recv_kbps_30s()); + EXPECT_EQ(0, kbps->get_recv_kbps_5m()); + + EXPECT_EQ(800, kbps->get_send_kbps()); + EXPECT_EQ(800, kbps->get_send_kbps_30s()); + EXPECT_EQ(0, kbps->get_send_kbps_5m()); + } +} + #endif