mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
Add api to stat mw iovec
This commit is contained in:
parent
b3b76b0ca6
commit
90a39ec46d
9 changed files with 335 additions and 7 deletions
|
@ -264,7 +264,8 @@ srs_error_t SrsGoApiV1::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r
|
||||||
urls->set("clients", SrsJsonAny::str("manage all clients or specified client, default query top 10 clients"));
|
urls->set("clients", SrsJsonAny::str("manage all clients or specified client, default query top 10 clients"));
|
||||||
urls->set("raw", SrsJsonAny::str("raw api for srs, support CUID srs for instance the config"));
|
urls->set("raw", SrsJsonAny::str("raw api for srs, support CUID srs for instance the config"));
|
||||||
urls->set("clusters", SrsJsonAny::str("origin cluster server API"));
|
urls->set("clusters", SrsJsonAny::str("origin cluster server API"));
|
||||||
|
urls->set("perf", SrsJsonAny::str("System performance stat"));
|
||||||
|
|
||||||
SrsJsonObject* tests = SrsJsonAny::object();
|
SrsJsonObject* tests = SrsJsonAny::object();
|
||||||
obj->set("tests", tests);
|
obj->set("tests", tests);
|
||||||
|
|
||||||
|
@ -1288,6 +1289,34 @@ srs_error_t SrsGoApiClusters::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
|
||||||
return srs_api_response(w, r, obj->dumps());
|
return srs_api_response(w, r, obj->dumps());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SrsGoApiPerf::SrsGoApiPerf()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
SrsGoApiPerf::~SrsGoApiPerf()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
|
||||||
|
{
|
||||||
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
SrsJsonObject* obj = SrsJsonAny::object();
|
||||||
|
SrsAutoFree(SrsJsonObject, obj);
|
||||||
|
|
||||||
|
obj->set("code", SrsJsonAny::integer(ERROR_SUCCESS));
|
||||||
|
SrsJsonObject* data = SrsJsonAny::object();
|
||||||
|
obj->set("data", data);
|
||||||
|
|
||||||
|
SrsStatistic* stat = SrsStatistic::instance();
|
||||||
|
if ((err = stat->dumps_perf_mw(data)) != srs_success) {
|
||||||
|
int code = srs_error_code(err); srs_error_reset(err);
|
||||||
|
return srs_api_response_code(w, r, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
return srs_api_response(w, r, obj->dumps());
|
||||||
|
}
|
||||||
|
|
||||||
SrsGoApiError::SrsGoApiError()
|
SrsGoApiError::SrsGoApiError()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
|
@ -201,6 +201,15 @@ public:
|
||||||
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
|
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class SrsGoApiPerf : public ISrsHttpHandler
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
SrsGoApiPerf();
|
||||||
|
virtual ~SrsGoApiPerf();
|
||||||
|
public:
|
||||||
|
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
|
||||||
|
};
|
||||||
|
|
||||||
class SrsGoApiError : public ISrsHttpHandler
|
class SrsGoApiError : public ISrsHttpHandler
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
|
|
@ -33,6 +33,7 @@
|
||||||
#include <srs_app_source.hpp>
|
#include <srs_app_source.hpp>
|
||||||
#include <srs_app_http_conn.hpp>
|
#include <srs_app_http_conn.hpp>
|
||||||
#include <srs_core_autofree.hpp>
|
#include <srs_core_autofree.hpp>
|
||||||
|
#include <srs_app_statistic.hpp>
|
||||||
|
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
@ -186,6 +187,9 @@ SrsQueueRecvThread::~SrsQueueRecvThread()
|
||||||
srs_error_t SrsQueueRecvThread::start()
|
srs_error_t SrsQueueRecvThread::start()
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
SrsStatistic* stat = SrsStatistic::instance();
|
||||||
|
rtmp->set_perf(stat);
|
||||||
|
|
||||||
if ((err = trd.start()) != srs_success) {
|
if ((err = trd.start()) != srs_success) {
|
||||||
return srs_error_wrap(err, "queue recv thread");
|
return srs_error_wrap(err, "queue recv thread");
|
||||||
|
|
|
@ -746,7 +746,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr
|
||||||
if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {
|
if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {
|
||||||
return srs_error_wrap(err, "rtmp: consumer dump packets");
|
return srs_error_wrap(err, "rtmp: consumer dump packets");
|
||||||
}
|
}
|
||||||
|
|
||||||
// reportable
|
// reportable
|
||||||
if (pprint->can_print()) {
|
if (pprint->can_print()) {
|
||||||
kbps->sample();
|
kbps->sample();
|
||||||
|
|
|
@ -947,7 +947,10 @@ srs_error_t SrsServer::http_handle()
|
||||||
return srs_error_wrap(err, "handle raw");
|
return srs_error_wrap(err, "handle raw");
|
||||||
}
|
}
|
||||||
if ((err = http_api_mux->handle("/api/v1/clusters", new SrsGoApiClusters())) != srs_success) {
|
if ((err = http_api_mux->handle("/api/v1/clusters", new SrsGoApiClusters())) != srs_success) {
|
||||||
return srs_error_wrap(err, "handle raw");
|
return srs_error_wrap(err, "handle clusters");
|
||||||
|
}
|
||||||
|
if ((err = http_api_mux->handle("/api/v1/perf", new SrsGoApiPerf())) != srs_success) {
|
||||||
|
return srs_error_wrap(err, "handle perf");
|
||||||
}
|
}
|
||||||
|
|
||||||
// test the request info.
|
// test the request info.
|
||||||
|
|
|
@ -234,6 +234,25 @@ srs_error_t SrsStatisticClient::dumps(SrsJsonObject* obj)
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SrsStatisticCategory::SrsStatisticCategory()
|
||||||
|
{
|
||||||
|
a = 0;
|
||||||
|
b = 0;
|
||||||
|
c = 0;
|
||||||
|
d = 0;
|
||||||
|
e = 0;
|
||||||
|
|
||||||
|
f = 0;
|
||||||
|
g = 0;
|
||||||
|
h = 0;
|
||||||
|
i = 0;
|
||||||
|
j = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
SrsStatisticCategory::~SrsStatisticCategory()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
SrsStatistic* SrsStatistic::_instance = NULL;
|
SrsStatistic* SrsStatistic::_instance = NULL;
|
||||||
|
|
||||||
SrsStatistic::SrsStatistic()
|
SrsStatistic::SrsStatistic()
|
||||||
|
@ -243,6 +262,10 @@ SrsStatistic::SrsStatistic()
|
||||||
clk = new SrsWallClock();
|
clk = new SrsWallClock();
|
||||||
kbps = new SrsKbps(clk);
|
kbps = new SrsKbps(clk);
|
||||||
kbps->set_io(NULL, NULL);
|
kbps->set_io(NULL, NULL);
|
||||||
|
|
||||||
|
perf_iovs = new SrsStatisticCategory();
|
||||||
|
perf_msgs = new SrsStatisticCategory();
|
||||||
|
perf_sys = new SrsStatisticCategory();
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsStatistic::~SrsStatistic()
|
SrsStatistic::~SrsStatistic()
|
||||||
|
@ -276,6 +299,10 @@ SrsStatistic::~SrsStatistic()
|
||||||
rvhosts.clear();
|
rvhosts.clear();
|
||||||
streams.clear();
|
streams.clear();
|
||||||
rstreams.clear();
|
rstreams.clear();
|
||||||
|
|
||||||
|
srs_freep(perf_iovs);
|
||||||
|
srs_freep(perf_msgs);
|
||||||
|
srs_freep(perf_sys);
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsStatistic* SrsStatistic::instance()
|
SrsStatistic* SrsStatistic::instance()
|
||||||
|
@ -556,6 +583,159 @@ srs_error_t SrsStatistic::dumps_clients(SrsJsonArray* arr, int start, int count)
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void SrsStatistic::perf_mw_on_msgs(int nb_msgs, int bytes_msgs, int nb_iovs)
|
||||||
|
{
|
||||||
|
// For perf msgs, the nb_msgs stat.
|
||||||
|
// a: =1
|
||||||
|
// b: <10
|
||||||
|
// c: <100
|
||||||
|
// d: <200
|
||||||
|
// e: <300
|
||||||
|
// f: <400
|
||||||
|
// g: <500
|
||||||
|
// h: <600
|
||||||
|
// i: <1000
|
||||||
|
// j: >=1000
|
||||||
|
if (nb_msgs == 1) {
|
||||||
|
perf_msgs->a++;
|
||||||
|
} else if (nb_msgs < 10) {
|
||||||
|
perf_msgs->b++;
|
||||||
|
} else if (nb_msgs < 100) {
|
||||||
|
perf_msgs->c++;
|
||||||
|
} else if (nb_msgs < 200) {
|
||||||
|
perf_msgs->d++;
|
||||||
|
} else if (nb_msgs < 300) {
|
||||||
|
perf_msgs->e++;
|
||||||
|
} else if (nb_msgs < 400) {
|
||||||
|
perf_msgs->f++;
|
||||||
|
} else if (nb_msgs < 500) {
|
||||||
|
perf_msgs->g++;
|
||||||
|
} else if (nb_msgs < 600) {
|
||||||
|
perf_msgs->h++;
|
||||||
|
} else if (nb_msgs < 1000) {
|
||||||
|
perf_msgs->i++;
|
||||||
|
} else {
|
||||||
|
perf_msgs->j++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// For perf iovs, the nb_iovs stat.
|
||||||
|
// a: <=2
|
||||||
|
// b: <10
|
||||||
|
// c: <20
|
||||||
|
// d: <200
|
||||||
|
// e: <300
|
||||||
|
// f: <500
|
||||||
|
// g: <700
|
||||||
|
// h: <900
|
||||||
|
// i: <1024
|
||||||
|
// j: >=1024
|
||||||
|
if (nb_iovs <= 2) {
|
||||||
|
perf_iovs->a++;
|
||||||
|
} else if (nb_iovs < 10) {
|
||||||
|
perf_iovs->b++;
|
||||||
|
} else if (nb_iovs < 20) {
|
||||||
|
perf_iovs->c++;
|
||||||
|
} else if (nb_iovs < 200) {
|
||||||
|
perf_iovs->d++;
|
||||||
|
} else if (nb_iovs < 300) {
|
||||||
|
perf_iovs->e++;
|
||||||
|
} else if (nb_iovs < 500) {
|
||||||
|
perf_iovs->f++;
|
||||||
|
} else if (nb_iovs < 700) {
|
||||||
|
perf_iovs->g++;
|
||||||
|
} else if (nb_iovs < 900) {
|
||||||
|
perf_iovs->h++;
|
||||||
|
} else if (nb_iovs < 1024) {
|
||||||
|
perf_iovs->i++;
|
||||||
|
} else {
|
||||||
|
perf_iovs->j++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stat the syscalls.
|
||||||
|
// a: number of syscalls of msgs.
|
||||||
|
perf_sys->a++;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SrsStatistic::perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs)
|
||||||
|
{
|
||||||
|
// Stat the syscalls.
|
||||||
|
// a: number of syscalls of msgs.
|
||||||
|
// b: number of syscalls of pkts.
|
||||||
|
perf_sys->b++;
|
||||||
|
}
|
||||||
|
|
||||||
|
srs_error_t SrsStatistic::dumps_perf_mw(SrsJsonObject* obj)
|
||||||
|
{
|
||||||
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
if (true) {
|
||||||
|
SrsJsonObject* p = SrsJsonAny::object();
|
||||||
|
obj->set("msgs", p);
|
||||||
|
|
||||||
|
// For perf msgs, the nb_msgs stat.
|
||||||
|
// a: =1
|
||||||
|
// b: <10
|
||||||
|
// c: <100
|
||||||
|
// d: <200
|
||||||
|
// e: <300
|
||||||
|
// f: <400
|
||||||
|
// g: <500
|
||||||
|
// h: <600
|
||||||
|
// i: <1000
|
||||||
|
// j: >=1000
|
||||||
|
p->set("lt_2", SrsJsonAny::integer(perf_msgs->a));
|
||||||
|
p->set("lt_10", SrsJsonAny::integer(perf_msgs->b));
|
||||||
|
p->set("lt_100", SrsJsonAny::integer(perf_msgs->c));
|
||||||
|
p->set("lt_200", SrsJsonAny::integer(perf_msgs->d));
|
||||||
|
p->set("lt_300", SrsJsonAny::integer(perf_msgs->e));
|
||||||
|
p->set("lt_400", SrsJsonAny::integer(perf_msgs->f));
|
||||||
|
p->set("lt_500", SrsJsonAny::integer(perf_msgs->g));
|
||||||
|
p->set("lt_600", SrsJsonAny::integer(perf_msgs->h));
|
||||||
|
p->set("lt_1000", SrsJsonAny::integer(perf_msgs->i));
|
||||||
|
p->set("gt_1000", SrsJsonAny::integer(perf_msgs->j));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (true) {
|
||||||
|
SrsJsonObject* p = SrsJsonAny::object();
|
||||||
|
obj->set("iovs", p);
|
||||||
|
|
||||||
|
// For perf iovs, the nb_iovs stat.
|
||||||
|
// a: <=2
|
||||||
|
// b: <10
|
||||||
|
// c: <20
|
||||||
|
// d: <200
|
||||||
|
// e: <300
|
||||||
|
// f: <500
|
||||||
|
// g: <700
|
||||||
|
// h: <900
|
||||||
|
// i: <1024
|
||||||
|
// j: >=1024
|
||||||
|
p->set("lt_3", SrsJsonAny::integer(perf_iovs->a));
|
||||||
|
p->set("lt_10", SrsJsonAny::integer(perf_iovs->b));
|
||||||
|
p->set("lt_20", SrsJsonAny::integer(perf_iovs->c));
|
||||||
|
p->set("lt_200", SrsJsonAny::integer(perf_iovs->d));
|
||||||
|
p->set("lt_300", SrsJsonAny::integer(perf_iovs->e));
|
||||||
|
p->set("lt_500", SrsJsonAny::integer(perf_iovs->f));
|
||||||
|
p->set("lt_700", SrsJsonAny::integer(perf_iovs->g));
|
||||||
|
p->set("lt_900", SrsJsonAny::integer(perf_iovs->h));
|
||||||
|
p->set("lt_1024", SrsJsonAny::integer(perf_iovs->i));
|
||||||
|
p->set("gt_1024", SrsJsonAny::integer(perf_iovs->j));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (true) {
|
||||||
|
SrsJsonObject* p = SrsJsonAny::object();
|
||||||
|
obj->set("sys", p);
|
||||||
|
|
||||||
|
// Stat the syscalls.
|
||||||
|
// a: number of syscalls of msgs.
|
||||||
|
// b: number of syscalls of pkts.
|
||||||
|
p->set("msgs", SrsJsonAny::integer(perf_sys->a));
|
||||||
|
p->set("pkts", SrsJsonAny::integer(perf_sys->b));
|
||||||
|
}
|
||||||
|
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
SrsStatisticVhost* SrsStatistic::create_vhost(SrsRequest* req)
|
SrsStatisticVhost* SrsStatistic::create_vhost(SrsRequest* req)
|
||||||
{
|
{
|
||||||
SrsStatisticVhost* vhost = NULL;
|
SrsStatisticVhost* vhost = NULL;
|
||||||
|
|
|
@ -122,7 +122,26 @@ public:
|
||||||
virtual srs_error_t dumps(SrsJsonObject* obj);
|
virtual srs_error_t dumps(SrsJsonObject* obj);
|
||||||
};
|
};
|
||||||
|
|
||||||
class SrsStatistic
|
class SrsStatisticCategory
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
uint64_t a;
|
||||||
|
uint64_t b;
|
||||||
|
uint64_t c;
|
||||||
|
uint64_t d;
|
||||||
|
uint64_t e;
|
||||||
|
public:
|
||||||
|
uint64_t f;
|
||||||
|
uint64_t g;
|
||||||
|
uint64_t h;
|
||||||
|
uint64_t i;
|
||||||
|
uint64_t j;
|
||||||
|
public:
|
||||||
|
SrsStatisticCategory();
|
||||||
|
virtual ~SrsStatisticCategory();
|
||||||
|
};
|
||||||
|
|
||||||
|
class SrsStatistic : public ISrsProtocolPerf
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
static SrsStatistic *_instance;
|
static SrsStatistic *_instance;
|
||||||
|
@ -146,6 +165,10 @@ private:
|
||||||
// The server total kbps.
|
// The server total kbps.
|
||||||
SrsKbps* kbps;
|
SrsKbps* kbps;
|
||||||
SrsWallClock* clk;
|
SrsWallClock* clk;
|
||||||
|
// The perf stat for mw(merged write).
|
||||||
|
SrsStatisticCategory* perf_iovs;
|
||||||
|
SrsStatisticCategory* perf_msgs;
|
||||||
|
SrsStatisticCategory* perf_sys;
|
||||||
private:
|
private:
|
||||||
SrsStatistic();
|
SrsStatistic();
|
||||||
virtual ~SrsStatistic();
|
virtual ~SrsStatistic();
|
||||||
|
@ -203,6 +226,15 @@ public:
|
||||||
// @param start the start index, from 0.
|
// @param start the start index, from 0.
|
||||||
// @param count the max count of clients to dump.
|
// @param count the max count of clients to dump.
|
||||||
virtual srs_error_t dumps_clients(SrsJsonArray* arr, int start, int count);
|
virtual srs_error_t dumps_clients(SrsJsonArray* arr, int start, int count);
|
||||||
|
public:
|
||||||
|
// Stat for packets merged written, nb_msgs is the number of RTMP messages,
|
||||||
|
// bytes_msgs is the total bytes of RTMP messages, nb_iovs is the total number of iovec.
|
||||||
|
virtual void perf_mw_on_msgs(int nb_msgs, int bytes_msgs, int nb_iovs);
|
||||||
|
// Stat for packets merged written, nb_pkts is the number of or chunk packets,
|
||||||
|
// bytes_pkts is the total bytes of or chunk packets, nb_iovs is the total number of iovec.
|
||||||
|
virtual void perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs);
|
||||||
|
// Dumps the perf statistic data, for performance analysis.
|
||||||
|
virtual srs_error_t dumps_perf_mw(SrsJsonObject* obj);
|
||||||
private:
|
private:
|
||||||
virtual SrsStatisticVhost* create_vhost(SrsRequest* req);
|
virtual SrsStatisticVhost* create_vhost(SrsRequest* req);
|
||||||
virtual SrsStatisticStream* create_stream(SrsStatisticVhost* vhost, SrsRequest* req);
|
virtual SrsStatisticStream* create_stream(SrsStatisticVhost* vhost, SrsRequest* req);
|
||||||
|
|
|
@ -217,6 +217,14 @@ srs_error_t SrsPacket::encode_packet(SrsBuffer* stream)
|
||||||
return srs_error_new(ERROR_SYSTEM_PACKET_INVALID, "encode");
|
return srs_error_new(ERROR_SYSTEM_PACKET_INVALID, "encode");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ISrsProtocolPerf::ISrsProtocolPerf()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
ISrsProtocolPerf::~ISrsProtocolPerf()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
SrsProtocol::AckWindowSize::AckWindowSize()
|
SrsProtocol::AckWindowSize::AckWindowSize()
|
||||||
{
|
{
|
||||||
window = 0;
|
window = 0;
|
||||||
|
@ -256,6 +264,7 @@ SrsProtocol::SrsProtocol(ISrsProtocolReadWriter* io)
|
||||||
}
|
}
|
||||||
|
|
||||||
out_c0c3_caches = new char[SRS_CONSTS_C0C3_HEADERS_MAX];
|
out_c0c3_caches = new char[SRS_CONSTS_C0C3_HEADERS_MAX];
|
||||||
|
perf = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsProtocol::~SrsProtocol()
|
SrsProtocol::~SrsProtocol()
|
||||||
|
@ -303,6 +312,11 @@ void SrsProtocol::set_auto_response(bool v)
|
||||||
auto_response_when_recv = v;
|
auto_response_when_recv = v;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void SrsProtocol::set_perf(ISrsProtocolPerf* v)
|
||||||
|
{
|
||||||
|
perf = v;
|
||||||
|
}
|
||||||
|
|
||||||
srs_error_t SrsProtocol::manual_response_flush()
|
srs_error_t SrsProtocol::manual_response_flush()
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
@ -448,7 +462,12 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg
|
||||||
|
|
||||||
int c0c3_cache_index = 0;
|
int c0c3_cache_index = 0;
|
||||||
char* c0c3_cache = out_c0c3_caches + c0c3_cache_index;
|
char* c0c3_cache = out_c0c3_caches + c0c3_cache_index;
|
||||||
|
|
||||||
|
// How many messages are merged in written.
|
||||||
|
int nb_msgs_merged_written = 0;
|
||||||
|
// How many bytes of messages are merged in written.
|
||||||
|
int bytes_msgs_merged_written = 0;
|
||||||
|
|
||||||
// try to send use the c0c3 header cache,
|
// try to send use the c0c3 header cache,
|
||||||
// if cache is consumed, try another loop.
|
// if cache is consumed, try another loop.
|
||||||
for (int i = 0; i < nb_msgs; i++) {
|
for (int i = 0; i < nb_msgs; i++) {
|
||||||
|
@ -462,6 +481,10 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg
|
||||||
if (!msg->payload || msg->size <= 0) {
|
if (!msg->payload || msg->size <= 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Increase the perf stat data.
|
||||||
|
nb_msgs_merged_written++;
|
||||||
|
bytes_msgs_merged_written += msg->size;
|
||||||
|
|
||||||
// p set to current write position,
|
// p set to current write position,
|
||||||
// it's ok when payload is NULL and size is 0.
|
// it's ok when payload is NULL and size is 0.
|
||||||
|
@ -522,6 +545,12 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg
|
||||||
if ((err = do_iovs_send(out_iovs, iov_index)) != srs_success) {
|
if ((err = do_iovs_send(out_iovs, iov_index)) != srs_success) {
|
||||||
return srs_error_wrap(err, "send iovs");
|
return srs_error_wrap(err, "send iovs");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Notify about perf stat.
|
||||||
|
if (perf) {
|
||||||
|
perf->perf_mw_on_msgs(nb_msgs_merged_written, bytes_msgs_merged_written, iov_index);
|
||||||
|
nb_msgs_merged_written = 0; bytes_msgs_merged_written = 0;
|
||||||
|
}
|
||||||
|
|
||||||
// reset caches, while these cache ensure
|
// reset caches, while these cache ensure
|
||||||
// atleast we can sendout a chunk.
|
// atleast we can sendout a chunk.
|
||||||
|
@ -539,8 +568,19 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg
|
||||||
if (iov_index <= 0) {
|
if (iov_index <= 0) {
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
return do_iovs_send(out_iovs, iov_index);
|
// Send out iovs at a time.
|
||||||
|
if ((err = do_iovs_send(out_iovs, iov_index)) != srs_success) {
|
||||||
|
return srs_error_wrap(err, "send iovs");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify about perf stat.
|
||||||
|
if (perf) {
|
||||||
|
perf->perf_mw_on_msgs(nb_msgs_merged_written, bytes_msgs_merged_written, iov_index);
|
||||||
|
nb_msgs_merged_written = 0; bytes_msgs_merged_written = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return err;
|
||||||
#else
|
#else
|
||||||
// try to send use the c0c3 header cache,
|
// try to send use the c0c3 header cache,
|
||||||
// if cache is consumed, try another loop.
|
// if cache is consumed, try another loop.
|
||||||
|
@ -587,6 +627,11 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg
|
||||||
if ((er = skt->writev(iovs, 2, NULL)) != srs_success) {
|
if ((er = skt->writev(iovs, 2, NULL)) != srs_success) {
|
||||||
return srs_error_wrap(err, "writev");
|
return srs_error_wrap(err, "writev");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Notify about perf stat.
|
||||||
|
if (perf) {
|
||||||
|
perf->perf_mw_on_packets(1, payload_size, 2);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2209,6 +2254,11 @@ void SrsRtmpServer::set_auto_response(bool v)
|
||||||
protocol->set_auto_response(v);
|
protocol->set_auto_response(v);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void SrsRtmpServer::set_perf(ISrsProtocolPerf* v)
|
||||||
|
{
|
||||||
|
protocol->set_perf(v);
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef SRS_PERF_MERGED_READ
|
#ifdef SRS_PERF_MERGED_READ
|
||||||
void SrsRtmpServer::set_merge_read(bool v, IMergeReadHandler* handler)
|
void SrsRtmpServer::set_merge_read(bool v, IMergeReadHandler* handler)
|
||||||
{
|
{
|
||||||
|
|
|
@ -147,6 +147,21 @@ protected:
|
||||||
virtual srs_error_t encode_packet(SrsBuffer* stream);
|
virtual srs_error_t encode_packet(SrsBuffer* stream);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// The performance statistic data collect.
|
||||||
|
class ISrsProtocolPerf
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
ISrsProtocolPerf();
|
||||||
|
virtual ~ISrsProtocolPerf();
|
||||||
|
public:
|
||||||
|
// Stat for packets merged written, nb_msgs is the number of RTMP messages,
|
||||||
|
// bytes_msgs is the total bytes of RTMP messages, nb_iovs is the total number of iovec.
|
||||||
|
virtual void perf_mw_on_msgs(int nb_msgs, int bytes_msgs, int nb_iovs) = 0;
|
||||||
|
// Stat for packets merged written, nb_pkts is the number of or chunk packets,
|
||||||
|
// bytes_pkts is the total bytes of or chunk packets, nb_iovs is the total number of iovec.
|
||||||
|
virtual void perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs) = 0;
|
||||||
|
};
|
||||||
|
|
||||||
// The protocol provides the rtmp-message-protocol services,
|
// The protocol provides the rtmp-message-protocol services,
|
||||||
// To recv RTMP message from RTMP chunk stream,
|
// To recv RTMP message from RTMP chunk stream,
|
||||||
// and to send out RTMP message over RTMP chunk stream.
|
// and to send out RTMP message over RTMP chunk stream.
|
||||||
|
@ -168,6 +183,8 @@ private:
|
||||||
private:
|
private:
|
||||||
// The underlayer socket object, send/recv bytes.
|
// The underlayer socket object, send/recv bytes.
|
||||||
ISrsProtocolReadWriter* skt;
|
ISrsProtocolReadWriter* skt;
|
||||||
|
// The performance stat handler.
|
||||||
|
ISrsProtocolPerf* perf;
|
||||||
// The requests sent out, used to build the response.
|
// The requests sent out, used to build the response.
|
||||||
// key: transactionId
|
// key: transactionId
|
||||||
// value: the request command name
|
// value: the request command name
|
||||||
|
@ -227,6 +244,8 @@ public:
|
||||||
// @param v, whether auto response message when recv message.
|
// @param v, whether auto response message when recv message.
|
||||||
// @see: https://github.com/ossrs/srs/issues/217
|
// @see: https://github.com/ossrs/srs/issues/217
|
||||||
virtual void set_auto_response(bool v);
|
virtual void set_auto_response(bool v);
|
||||||
|
// Set the performance stat handler.
|
||||||
|
virtual void set_perf(ISrsProtocolPerf* v);
|
||||||
// Flush for manual response when the auto response is disabled
|
// Flush for manual response when the auto response is disabled
|
||||||
// by set_auto_response(false), we default use auto response, so donot
|
// by set_auto_response(false), we default use auto response, so donot
|
||||||
// need to call this api(the protocol sdk will auto send message).
|
// need to call this api(the protocol sdk will auto send message).
|
||||||
|
@ -631,6 +650,8 @@ public:
|
||||||
// @param v, whether auto response message when recv message.
|
// @param v, whether auto response message when recv message.
|
||||||
// @see: https://github.com/ossrs/srs/issues/217
|
// @see: https://github.com/ossrs/srs/issues/217
|
||||||
virtual void set_auto_response(bool v);
|
virtual void set_auto_response(bool v);
|
||||||
|
// Set the performance stat handler.
|
||||||
|
virtual void set_perf(ISrsProtocolPerf* v);
|
||||||
#ifdef SRS_PERF_MERGED_READ
|
#ifdef SRS_PERF_MERGED_READ
|
||||||
// To improve read performance, merge some packets then read,
|
// To improve read performance, merge some packets then read,
|
||||||
// When it on and read small bytes, we sleep to wait more data.,
|
// When it on and read small bytes, we sleep to wait more data.,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue