mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
STAT: Extract an ephemeral delta object.
This commit is contained in:
parent
4fe90d4885
commit
db91102e67
5 changed files with 70 additions and 19 deletions
|
@ -1820,14 +1820,11 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid)
|
||||||
nn_simulate_player_nack_drop = 0;
|
nn_simulate_player_nack_drop = 0;
|
||||||
pp_address_change = new SrsErrorPithyPrint();
|
pp_address_change = new SrsErrorPithyPrint();
|
||||||
pli_epp = new SrsErrorPithyPrint();
|
pli_epp = new SrsErrorPithyPrint();
|
||||||
|
delta_ = new SrsEphemeralDelta();
|
||||||
|
|
||||||
nack_enabled_ = false;
|
nack_enabled_ = false;
|
||||||
timer_nack_ = new SrsRtcConnectionNackTimer(this);
|
timer_nack_ = new SrsRtcConnectionNackTimer(this);
|
||||||
|
|
||||||
clock_ = new SrsWallClock();
|
|
||||||
kbps_ = new SrsKbps(clock_);
|
|
||||||
kbps_->set_io(NULL, NULL);
|
|
||||||
|
|
||||||
_srs_rtc_manager->subscribe(this);
|
_srs_rtc_manager->subscribe(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1872,9 +1869,7 @@ SrsRtcConnection::~SrsRtcConnection()
|
||||||
srs_freep(req_);
|
srs_freep(req_);
|
||||||
srs_freep(pp_address_change);
|
srs_freep(pp_address_change);
|
||||||
srs_freep(pli_epp);
|
srs_freep(pli_epp);
|
||||||
|
srs_freep(delta_);
|
||||||
srs_freep(kbps_);
|
|
||||||
srs_freep(clock_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void SrsRtcConnection::on_before_dispose(ISrsResource* c)
|
void SrsRtcConnection::on_before_dispose(ISrsResource* c)
|
||||||
|
@ -1952,7 +1947,7 @@ vector<SrsUdpMuxSocket*> SrsRtcConnection::peer_addresses()
|
||||||
|
|
||||||
void SrsRtcConnection::remark(int64_t* in, int64_t* out)
|
void SrsRtcConnection::remark(int64_t* in, int64_t* out)
|
||||||
{
|
{
|
||||||
kbps_->remark(in, out);
|
delta_->remark(in, out);
|
||||||
}
|
}
|
||||||
|
|
||||||
const SrsContextId& SrsRtcConnection::get_id()
|
const SrsContextId& SrsRtcConnection::get_id()
|
||||||
|
@ -2110,7 +2105,7 @@ srs_error_t SrsRtcConnection::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r)
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
// Update stat when we received data.
|
// Update stat when we received data.
|
||||||
kbps_->add_delta(skt->size(), 0);
|
delta_->add_delta(skt->size(), 0);
|
||||||
|
|
||||||
if (!r->is_binding_request()) {
|
if (!r->is_binding_request()) {
|
||||||
return err;
|
return err;
|
||||||
|
@ -2135,7 +2130,7 @@ srs_error_t SrsRtcConnection::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r)
|
||||||
srs_error_t SrsRtcConnection::on_dtls(char* data, int nb_data)
|
srs_error_t SrsRtcConnection::on_dtls(char* data, int nb_data)
|
||||||
{
|
{
|
||||||
// Update stat when we received data.
|
// Update stat when we received data.
|
||||||
kbps_->add_delta(nb_data, 0);
|
delta_->add_delta(nb_data, 0);
|
||||||
|
|
||||||
return transport_->on_dtls(data, nb_data);
|
return transport_->on_dtls(data, nb_data);
|
||||||
}
|
}
|
||||||
|
@ -2145,7 +2140,7 @@ srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data)
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
// Update stat when we received data.
|
// Update stat when we received data.
|
||||||
kbps_->add_delta(nb_data, 0);
|
delta_->add_delta(nb_data, 0);
|
||||||
|
|
||||||
int nb_unprotected_buf = nb_data;
|
int nb_unprotected_buf = nb_data;
|
||||||
if ((err = transport_->unprotect_rtcp(data, &nb_unprotected_buf)) != srs_success) {
|
if ((err = transport_->unprotect_rtcp(data, &nb_unprotected_buf)) != srs_success) {
|
||||||
|
@ -2286,7 +2281,7 @@ srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data)
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
// Update stat when we received data.
|
// Update stat when we received data.
|
||||||
kbps_->add_delta(nb_data, 0);
|
delta_->add_delta(nb_data, 0);
|
||||||
|
|
||||||
SrsRtcPublishStream* publisher = NULL;
|
SrsRtcPublishStream* publisher = NULL;
|
||||||
if ((err = find_publisher(data, nb_data, &publisher)) != srs_success) {
|
if ((err = find_publisher(data, nb_data, &publisher)) != srs_success) {
|
||||||
|
@ -2485,7 +2480,7 @@ srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data)
|
||||||
++_srs_pps_srtcps->sugar;
|
++_srs_pps_srtcps->sugar;
|
||||||
|
|
||||||
// Update stat when we sending data.
|
// Update stat when we sending data.
|
||||||
kbps_->add_delta(0, nb_data);
|
delta_->add_delta(0, nb_data);
|
||||||
|
|
||||||
int nb_buf = nb_data;
|
int nb_buf = nb_data;
|
||||||
if ((err = transport_->protect_rtcp(data, &nb_buf)) != srs_success) {
|
if ((err = transport_->protect_rtcp(data, &nb_buf)) != srs_success) {
|
||||||
|
@ -2692,7 +2687,7 @@ srs_error_t SrsRtcConnection::do_send_packet(SrsRtpPacket* pkt)
|
||||||
++_srs_pps_srtps->sugar;
|
++_srs_pps_srtps->sugar;
|
||||||
|
|
||||||
// Update stat when we sending data.
|
// Update stat when we sending data.
|
||||||
kbps_->add_delta(0, iov->iov_len);
|
delta_->add_delta(0, iov->iov_len);
|
||||||
|
|
||||||
// TODO: FIXME: Handle error.
|
// TODO: FIXME: Handle error.
|
||||||
sendonly_skt->sendto(iov->iov_base, iov->iov_len, 0);
|
sendonly_skt->sendto(iov->iov_base, iov->iov_len, 0);
|
||||||
|
@ -2766,7 +2761,7 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update stat when we sending data.
|
// Update stat when we sending data.
|
||||||
kbps_->add_delta(0, stream->pos());
|
delta_->add_delta(0, stream->pos());
|
||||||
|
|
||||||
if ((err = sendonly_skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) {
|
if ((err = sendonly_skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) {
|
||||||
return srs_error_wrap(err, "stun binding response send failed");
|
return srs_error_wrap(err, "stun binding response send failed");
|
||||||
|
|
|
@ -51,8 +51,7 @@ class SrsStatistic;
|
||||||
class SrsRtcUserConfig;
|
class SrsRtcUserConfig;
|
||||||
class SrsRtcSendTrack;
|
class SrsRtcSendTrack;
|
||||||
class SrsRtcPublishStream;
|
class SrsRtcPublishStream;
|
||||||
class SrsKbps;
|
class SrsEphemeralDelta;
|
||||||
class SrsWallClock;
|
|
||||||
|
|
||||||
const uint8_t kSR = 200;
|
const uint8_t kSR = 200;
|
||||||
const uint8_t kRR = 201;
|
const uint8_t kRR = 201;
|
||||||
|
@ -493,8 +492,7 @@ private:
|
||||||
private:
|
private:
|
||||||
bool nack_enabled_;
|
bool nack_enabled_;
|
||||||
private:
|
private:
|
||||||
SrsKbps* kbps_;
|
SrsEphemeralDelta* delta_;
|
||||||
SrsWallClock* clock_;
|
|
||||||
public:
|
public:
|
||||||
SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid);
|
SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid);
|
||||||
virtual ~SrsRtcConnection();
|
virtual ~SrsRtcConnection();
|
||||||
|
|
|
@ -68,6 +68,28 @@ ISrsKbpsDelta::~ISrsKbpsDelta()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SrsEphemeralDelta::SrsEphemeralDelta()
|
||||||
|
{
|
||||||
|
in_ = out_ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
SrsEphemeralDelta::~SrsEphemeralDelta()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void SrsEphemeralDelta::add_delta(int64_t in, int64_t out)
|
||||||
|
{
|
||||||
|
in_ += in;
|
||||||
|
out_ += out;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SrsEphemeralDelta::remark(int64_t* in, int64_t* out)
|
||||||
|
{
|
||||||
|
if (in) *in = in_;
|
||||||
|
if (out) *out = out_;
|
||||||
|
in_ = out_ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
SrsKbps::SrsKbps(SrsWallClock* c) : is(c), os(c)
|
SrsKbps::SrsKbps(SrsWallClock* c) : is(c), os(c)
|
||||||
{
|
{
|
||||||
clk = c;
|
clk = c;
|
||||||
|
|
|
@ -84,6 +84,23 @@ public:
|
||||||
virtual void remark(int64_t* in, int64_t* out) = 0;
|
virtual void remark(int64_t* in, int64_t* out) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// A delta data source for SrsKbps, used in ephemeral case, for example, UDP server to increase stat when received or
|
||||||
|
// sent out each UDP packet.
|
||||||
|
class SrsEphemeralDelta : public ISrsKbpsDelta
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
uint64_t in_;
|
||||||
|
uint64_t out_;
|
||||||
|
public:
|
||||||
|
SrsEphemeralDelta();
|
||||||
|
virtual ~SrsEphemeralDelta();
|
||||||
|
public:
|
||||||
|
virtual void add_delta(int64_t in, int64_t out);
|
||||||
|
// Interface ISrsKbpsDelta.
|
||||||
|
public:
|
||||||
|
virtual void remark(int64_t* in, int64_t* out);
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* to statistic the kbps of io.
|
* to statistic the kbps of io.
|
||||||
* itself can be a statistic source, for example, used for SRS bytes stat.
|
* itself can be a statistic source, for example, used for SRS bytes stat.
|
||||||
|
|
|
@ -6225,6 +6225,25 @@ VOID TEST(ProtocolKbpsTest, Connections)
|
||||||
|
|
||||||
VOID TEST(ProtocolKbpsTest, Delta)
|
VOID TEST(ProtocolKbpsTest, Delta)
|
||||||
{
|
{
|
||||||
|
if (true) {
|
||||||
|
SrsEphemeralDelta ed;
|
||||||
|
|
||||||
|
ISrsKbpsDelta* delta = (ISrsKbpsDelta*)&ed;
|
||||||
|
int64_t in, out;
|
||||||
|
delta->remark(&in, &out);
|
||||||
|
EXPECT_EQ(0, in);
|
||||||
|
EXPECT_EQ(0, out);
|
||||||
|
|
||||||
|
ed.add_delta(100 * 1000, 100 * 1000);
|
||||||
|
delta->remark(&in, &out);
|
||||||
|
EXPECT_EQ(100 * 1000, in);
|
||||||
|
EXPECT_EQ(100 * 1000, out);
|
||||||
|
|
||||||
|
delta->remark(&in, &out);
|
||||||
|
EXPECT_EQ(0, in);
|
||||||
|
EXPECT_EQ(0, out);
|
||||||
|
}
|
||||||
|
|
||||||
if (true) {
|
if (true) {
|
||||||
MockWallClock* clock = new MockWallClock();
|
MockWallClock* clock = new MockWallClock();
|
||||||
SrsAutoFree(MockWallClock, clock);
|
SrsAutoFree(MockWallClock, clock);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue