1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Refactor ISrsKbpsDelta

This commit is contained in:
winlin 2019-01-01 17:36:27 +08:00
parent dcebf8a31f
commit 6234905532
13 changed files with 146 additions and 159 deletions

View file

@ -58,24 +58,9 @@ SrsConnection::~SrsConnection()
srs_close_stfd(stfd); srs_close_stfd(stfd);
} }
void SrsConnection::resample() void SrsConnection::remark(int64_t* in, int64_t* out)
{ {
kbps->resample(); kbps->remark(in, out);
}
int64_t SrsConnection::get_send_bytes_delta()
{
return kbps->get_send_bytes_delta();
}
int64_t SrsConnection::get_recv_bytes_delta()
{
return kbps->get_recv_bytes_delta();
}
void SrsConnection::cleanup()
{
kbps->cleanup();
} }
void SrsConnection::dispose() void SrsConnection::dispose()

View file

@ -42,7 +42,7 @@ class SrsWallClock;
* server will add the connection to manager, and delete it when remove. * server will add the connection to manager, and delete it when remove.
*/ */
class SrsConnection : virtual public ISrsConnection, virtual public ISrsCoroutineHandler class SrsConnection : virtual public ISrsConnection, virtual public ISrsCoroutineHandler
, virtual public IKbpsDelta, virtual public ISrsReloadHandler , virtual public ISrsKbpsDelta, virtual public ISrsReloadHandler
{ {
protected: protected:
/** /**
@ -82,12 +82,9 @@ protected:
public: public:
SrsConnection(IConnectionManager* cm, srs_netfd_t c, std::string cip); SrsConnection(IConnectionManager* cm, srs_netfd_t c, std::string cip);
virtual ~SrsConnection(); virtual ~SrsConnection();
// interface IKbpsDelta // interface ISrsKbpsDelta
public: public:
virtual void resample(); virtual void remark(int64_t* in, int64_t* out);
virtual int64_t get_send_bytes_delta();
virtual int64_t get_recv_bytes_delta();
virtual void cleanup();
public: public:
/** /**
* to dipose the connection. * to dipose the connection.

View file

@ -1336,24 +1336,7 @@ SrsHttpApi::~SrsHttpApi()
_srs_config->unsubscribe(this); _srs_config->unsubscribe(this);
} }
void SrsHttpApi::resample() void SrsHttpApi::remark(int64_t* in, int64_t* out)
{
// TODO: FIXME: implements it
}
int64_t SrsHttpApi::get_send_bytes_delta()
{
// TODO: FIXME: implements it
return 0;
}
int64_t SrsHttpApi::get_recv_bytes_delta()
{
// TODO: FIXME: implements it
return 0;
}
void SrsHttpApi::cleanup()
{ {
// TODO: FIXME: implements it // TODO: FIXME: implements it
} }

View file

@ -219,12 +219,9 @@ private:
public: public:
SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip); SrsHttpApi(IConnectionManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip);
virtual ~SrsHttpApi(); virtual ~SrsHttpApi();
// interface IKbpsDelta // interface ISrsKbpsDelta
public: public:
virtual void resample(); virtual void remark(int64_t* in, int64_t* out);
virtual int64_t get_send_bytes_delta();
virtual int64_t get_recv_bytes_delta();
virtual void cleanup();
protected: protected:
virtual srs_error_t do_cycle(); virtual srs_error_t do_cycle();
private: private:

View file

@ -72,24 +72,7 @@ SrsHttpConn::~SrsHttpConn()
srs_freep(cors); srs_freep(cors);
} }
void SrsHttpConn::resample() void SrsHttpConn::remark(int64_t* in, int64_t* out)
{
// TODO: FIXME: implements it
}
int64_t SrsHttpConn::get_send_bytes_delta()
{
// TODO: FIXME: implements it
return 0;
}
int64_t SrsHttpConn::get_recv_bytes_delta()
{
// TODO: FIXME: implements it
return 0;
}
void SrsHttpConn::cleanup()
{ {
// TODO: FIXME: implements it // TODO: FIXME: implements it
} }

View file

@ -67,12 +67,9 @@ protected:
public: public:
SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip); SrsHttpConn(IConnectionManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip);
virtual ~SrsHttpConn(); virtual ~SrsHttpConn();
// interface IKbpsDelta // interface ISrsKbpsDelta
public: public:
virtual void resample(); virtual void remark(int64_t* in, int64_t* out);
virtual int64_t get_send_bytes_delta();
virtual int64_t get_recv_bytes_delta();
virtual void cleanup();
protected: protected:
virtual srs_error_t do_cycle(); virtual srs_error_t do_cycle();
protected: protected:

View file

@ -334,24 +334,9 @@ srs_error_t SrsRtmpConn::on_reload_vhost_publish(string vhost)
return err; return err;
} }
void SrsRtmpConn::resample() void SrsRtmpConn::remark(int64_t* in, int64_t* out)
{ {
kbps->resample(); kbps->remark(in, out);
}
int64_t SrsRtmpConn::get_send_bytes_delta()
{
return kbps->get_send_bytes_delta();
}
int64_t SrsRtmpConn::get_recv_bytes_delta()
{
return kbps->get_recv_bytes_delta();
}
void SrsRtmpConn::cleanup()
{
kbps->cleanup();
} }
srs_error_t SrsRtmpConn::service_cycle() srs_error_t SrsRtmpConn::service_cycle()

View file

@ -140,12 +140,9 @@ public:
virtual srs_error_t on_reload_vhost_tcp_nodelay(std::string vhost); virtual srs_error_t on_reload_vhost_tcp_nodelay(std::string vhost);
virtual srs_error_t on_reload_vhost_realtime(std::string vhost); virtual srs_error_t on_reload_vhost_realtime(std::string vhost);
virtual srs_error_t on_reload_vhost_publish(std::string vhost); virtual srs_error_t on_reload_vhost_publish(std::string vhost);
// interface IKbpsDelta // interface ISrsKbpsDelta
public: public:
virtual void resample(); virtual void remark(int64_t* in, int64_t* out);
virtual int64_t get_send_bytes_delta();
virtual int64_t get_recv_bytes_delta();
virtual void cleanup();
private: private:
// when valid and connected to vhost/app, service the client. // when valid and connected to vhost/app, service the client.
virtual srs_error_t service_cycle(); virtual srs_error_t service_cycle();

View file

@ -459,16 +459,14 @@ void SrsStatistic::kbps_add_delta(SrsConnection* conn)
SrsStatisticClient* client = clients[id]; SrsStatisticClient* client = clients[id];
// resample the kbps to collect the delta. // resample the kbps to collect the delta.
conn->resample(); int64_t in, out;
conn->remark(&in, &out);
// add delta of connection to kbps. // add delta of connection to kbps.
// for next sample() of server kbps can get the stat. // for next sample() of server kbps can get the stat.
kbps->add_delta(conn); kbps->add_delta(in, out);
client->stream->kbps->add_delta(conn); client->stream->kbps->add_delta(in, out);
client->stream->vhost->kbps->add_delta(conn); client->stream->vhost->kbps->add_delta(in, out);
// cleanup the delta.
conn->cleanup();
} }
SrsKbps* SrsStatistic::kbps_sample() SrsKbps* SrsStatistic::kbps_sample()

View file

@ -212,7 +212,7 @@ public:
* sample the kbps, add delta bytes of conn. * sample the kbps, add delta bytes of conn.
* use kbps_sample() to get all result of kbps stat. * use kbps_sample() to get all result of kbps stat.
*/ */
// TODO: FIXME: the add delta must use IKbpsDelta interface instead. // TODO: FIXME: the add delta must use ISrsKbpsDelta interface instead.
virtual void kbps_add_delta(SrsConnection* conn); virtual void kbps_add_delta(SrsConnection* conn);
/** /**
* calc the result for all kbps. * calc the result for all kbps.

View file

@ -96,11 +96,11 @@ void SrsKbpsSlice::sample()
} }
} }
IKbpsDelta::IKbpsDelta() ISrsKbpsDelta::ISrsKbpsDelta()
{ {
} }
IKbpsDelta::~IKbpsDelta() ISrsKbpsDelta::~ISrsKbpsDelta()
{ {
} }
@ -205,13 +205,11 @@ int SrsKbps::get_recv_kbps_5m()
return is.sample_5m.kbps; return is.sample_5m.kbps;
} }
void SrsKbps::add_delta(IKbpsDelta* delta) void SrsKbps::add_delta(int64_t in, int64_t out)
{ {
srs_assert(delta);
// update the total bytes // update the total bytes
is.last_bytes += delta->get_recv_bytes_delta(); is.last_bytes += in;
os.last_bytes += delta->get_send_bytes_delta(); os.last_bytes += out;
// we donot sample, please use sample() to do resample. // we donot sample, please use sample() to do resample.
} }
@ -276,27 +274,21 @@ int64_t SrsKbps::get_recv_bytes()
return bytes; return bytes;
} }
void SrsKbps::resample() void SrsKbps::remark(int64_t* in, int64_t* out)
{ {
sample(); sample();
}
int64_t SrsKbps::get_send_bytes_delta() int64_t inv = is.get_total_bytes() - is.delta_bytes;
{
int64_t delta = os.get_total_bytes() - os.delta_bytes;
return delta;
}
int64_t SrsKbps::get_recv_bytes_delta()
{
int64_t delta = is.get_total_bytes() - is.delta_bytes;
return delta;
}
void SrsKbps::cleanup()
{
os.delta_bytes = os.get_total_bytes();
is.delta_bytes = is.get_total_bytes(); is.delta_bytes = is.get_total_bytes();
if (in) {
*in = inv;
}
int64_t outv = os.get_total_bytes() - os.delta_bytes;
os.delta_bytes = os.get_total_bytes();
if (out) {
*out = outv;
}
} }
int SrsKbps::size_memory() int SrsKbps::size_memory()

View file

@ -31,7 +31,7 @@
class SrsWallClock; class SrsWallClock;
/** /**
* a kbps sample, for example, 1minute kbps, * a kbps sample, for example, the kbps at time,
* 10minute kbps sample. * 10minute kbps sample.
*/ */
class SrsKbpsSample class SrsKbpsSample
@ -97,7 +97,7 @@ public:
SrsKbpsSlice(SrsWallClock* clk); SrsKbpsSlice(SrsWallClock* clk);
virtual ~SrsKbpsSlice(); virtual ~SrsKbpsSlice();
public: public:
// Get current total bytes, not depend on sample(). // Get current total bytes, it doesn't depend on sample().
virtual int64_t get_total_bytes(); virtual int64_t get_total_bytes();
// Resample the slice to calculate the kbps. // Resample the slice to calculate the kbps.
virtual void sample(); virtual void sample();
@ -106,30 +106,21 @@ public:
/** /**
* the interface which provices delta of bytes. * the interface which provices delta of bytes.
* for a delta, for example, a live stream connection, we can got the delta by: * for a delta, for example, a live stream connection, we can got the delta by:
* IKbpsDelta* delta = ...; * ISrsKbpsDelta* delta = ...;
* delta->resample(); * int64_t in, out;
* kbps->add_delta(delta); * delta->remark(&in, &out);
* delta->cleanup(); * kbps->add_delta(in, out);
*/ */
class IKbpsDelta class ISrsKbpsDelta
{ {
public: public:
IKbpsDelta(); ISrsKbpsDelta();
virtual ~IKbpsDelta(); virtual ~ISrsKbpsDelta();
public: public:
/** /**
* resample to generate the value of delta bytes. * resample to generate the value of delta bytes.
*/ */
virtual void resample() = 0; virtual void remark(int64_t* in, int64_t* out) = 0;
/**
* get the send or recv bytes delta.
*/
virtual int64_t get_send_bytes_delta() = 0;
virtual int64_t get_recv_bytes_delta() = 0;
/**
* cleanup the value of delta bytes.
*/
virtual void cleanup() = 0;
}; };
/** /**
@ -161,19 +152,19 @@ public:
* SrsKbps* kbps = ...; * SrsKbps* kbps = ...;
* kbps->set_io(NULL, NULL) * kbps->set_io(NULL, NULL)
* for each connection in connections: * for each connection in connections:
* IKbpsDelta* delta = connection; // where connection implements IKbpsDelta * ISrsKbpsDelta* delta = connection; // where connection implements ISrsKbpsDelta
* delta->resample() * int64_t in, out;
* kbps->add_delta(delta) * delta->remark(&in, &out)
* delta->cleanup() * kbps->add_delta(in, out)
* kbps->sample() * kbps->sample()
* kbps->get_xxx_kbps(). * kbps->get_xxx_kbps().
* 3. kbps used as IKbpsDelta, to provides delta bytes: * 3. kbps used as ISrsKbpsDelta, to provides delta bytes:
* SrsKbps* kbps = ...; * SrsKbps* kbps = ...;
* kbps->set_io(in, out); * kbps->set_io(in, out);
* IKbpsDelta* delta = (IKbpsDelta*)kbps; * ISrsKbpsDelta* delta = (ISrsKbpsDelta*)kbps;
* delta->resample(); * int64_t in, out;
* printf("delta is %d/%d", delta->get_send_bytes_delta(), delta->get_recv_bytes_delta()); * delta->remark(&in, out);
* delta->cleanup(); * printf("delta is %d/%d", in, out);
* 4. kbps used as ISrsProtocolStatistic, to provides raw bytes: * 4. kbps used as ISrsProtocolStatistic, to provides raw bytes:
* SrsKbps* kbps = ...; * SrsKbps* kbps = ...;
* kbps->set_io(in, out); * kbps->set_io(in, out);
@ -183,7 +174,7 @@ public:
* user->set_io(kbps, kbps); * user->set_io(kbps, kbps);
* the server never know how many bytes already send/recv, for the connection maybe closed. * the server never know how many bytes already send/recv, for the connection maybe closed.
*/ */
class SrsKbps : virtual public ISrsProtocolStatistic, virtual public IKbpsDelta class SrsKbps : virtual public ISrsProtocolStatistic, virtual public ISrsKbpsDelta
{ {
private: private:
SrsKbpsSlice is; SrsKbpsSlice is;
@ -226,7 +217,7 @@ public:
* @remark user must invoke sample() to calc result after invoke this method. * @remark user must invoke sample() to calc result after invoke this method.
* @param delta, assert should never be NULL. * @param delta, assert should never be NULL.
*/ */
virtual void add_delta(IKbpsDelta* delta); virtual void add_delta(int64_t in, int64_t out);
/** /**
* resample all samples, ignore if in/out is NULL. * resample all samples, ignore if in/out is NULL.
* used for user to calc the kbps, to sample new kbps value. * used for user to calc the kbps, to sample new kbps value.
@ -238,12 +229,9 @@ public:
public: public:
virtual int64_t get_send_bytes(); virtual int64_t get_send_bytes();
virtual int64_t get_recv_bytes(); virtual int64_t get_recv_bytes();
// interface IKbpsDelta // interface ISrsKbpsDelta
public: public:
virtual void resample(); virtual void remark(int64_t* in, int64_t* out);
virtual int64_t get_send_bytes_delta();
virtual int64_t get_recv_bytes_delta();
virtual void cleanup();
// interface ISrsMemorySizer // interface ISrsMemorySizer
public: public:
virtual int size_memory(); virtual int size_memory();

View file

@ -5832,5 +5832,90 @@ VOID TEST(ProtocolKbpsTest, Connections)
} }
} }
VOID TEST(ProtocolKbpsTest, Delta)
{
if (true) {
MockWallClock* clock = new MockWallClock();
SrsAutoFree(MockWallClock, clock);
MockStatistic* io = new MockStatistic();
SrsAutoFree(MockStatistic, io);
SrsKbps* conn = new SrsKbps(clock->set_clock(0));
SrsAutoFree(SrsKbps, conn);
conn->set_io(io, io);
// No data.
ISrsKbpsDelta* delta = (ISrsKbpsDelta*)conn;
int64_t in, out;
delta->remark(&in, &out);
EXPECT_EQ(0, in);
EXPECT_EQ(0, out);
// 800kb.
io->set_in(100 * 1000)->set_out(100 * 1000);
delta->remark(&in, &out);
EXPECT_EQ(100 * 1000, in);
EXPECT_EQ(100 * 1000, out);
// No data.
delta->remark(&in, &out);
EXPECT_EQ(0, in);
EXPECT_EQ(0, out);
}
if (true) {
MockWallClock* clock = new MockWallClock();
SrsAutoFree(MockWallClock, clock);
MockStatistic* io = new MockStatistic();
SrsAutoFree(MockStatistic, io);
SrsKbps* conn = new SrsKbps(clock->set_clock(0));
SrsAutoFree(SrsKbps, conn);
conn->set_io(io, io);
// No data.
ISrsKbpsDelta* delta = (ISrsKbpsDelta*)conn;
int64_t in, out;
delta->remark(&in, &out);
EXPECT_EQ(0, in);
EXPECT_EQ(0, out);
// 800kb.
io->set_in(100 * 1000)->set_out(100 * 1000);
delta->remark(&in, &out);
EXPECT_EQ(100 * 1000, in);
EXPECT_EQ(100 * 1000, out);
// Kbps without io, gather delta.
SrsKbps* kbps = new SrsKbps(clock->set_clock(0));
SrsAutoFree(SrsKbps, kbps);
kbps->set_io(NULL, NULL);
// No data, 0kbps.
kbps->sample();
EXPECT_EQ(0, kbps->get_recv_kbps());
EXPECT_EQ(0, kbps->get_recv_kbps_30s());
EXPECT_EQ(0, kbps->get_recv_kbps_5m());
EXPECT_EQ(0, kbps->get_send_kbps());
EXPECT_EQ(0, kbps->get_send_kbps_30s());
EXPECT_EQ(0, kbps->get_send_kbps_5m());
// 800kbps in 30s.
clock->set_clock(30 * 1000);
kbps->add_delta(30 * in, 30 * out);
kbps->sample();
EXPECT_EQ(800, kbps->get_recv_kbps());
EXPECT_EQ(800, kbps->get_recv_kbps_30s());
EXPECT_EQ(0, kbps->get_recv_kbps_5m());
EXPECT_EQ(800, kbps->get_send_kbps());
EXPECT_EQ(800, kbps->get_send_kbps_30s());
EXPECT_EQ(0, kbps->get_send_kbps_5m());
}
}
#endif #endif