1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 03:41:55 +00:00

CLS: Refine logging to global object.

This commit is contained in:
winlin 2022-08-27 12:28:33 +08:00
parent 8bc7342c3c
commit 6dc86b8a2e
3 changed files with 75 additions and 70 deletions

View file

@ -391,7 +391,7 @@ srs_error_t SrsHybridServer::on_timer(srs_utime_t interval)
);
// Report logs to CLS if enabled.
if ((err = srs_cls_report()) != srs_success) {
if ((err = _srs_cls->report()) != srs_success) {
srs_warn("ignore cls err %s", srs_error_desc(err).c_str());
srs_freep(err);
}

View file

@ -220,6 +220,7 @@ public:
SrsClsLogGroupList();
virtual ~SrsClsLogGroupList();
public:
bool empty();
SrsClsLogGroup* add_log_group();
public:
virtual uint64_t nb_bytes();
@ -424,6 +425,11 @@ SrsClsLogGroupList::~SrsClsLogGroupList()
}
}
bool SrsClsLogGroupList::empty()
{
return groups_.empty();
}
SrsClsLogGroup* SrsClsLogGroupList::add_log_group()
{
SrsClsLogGroup* group = new SrsClsLogGroup();
@ -469,7 +475,7 @@ SrsClsSugar::SrsClsSugar()
log_ = log_group_->add_log();
log_group_->set_source(srs_get_public_internet_address(true));
log_->set_time(srs_get_system_time() / SRS_UTIME_SECONDS);
log_->set_time(srs_get_system_time() / SRS_UTIME_MILLISECONDS);
kv("agent", RTMP_SIG_SRS_SERVER);
string label = _srs_cls->label();
@ -493,6 +499,21 @@ SrsClsSugar::~SrsClsSugar()
srs_freep(log_groups_);
}
uint64_t SrsClsSugar::nb_bytes()
{
return log_groups_->nb_bytes();
}
srs_error_t SrsClsSugar::encode(SrsBuffer* b)
{
return log_groups_->encode(b);
}
bool SrsClsSugar::empty()
{
return log_groups_->empty();
}
SrsClsSugar* SrsClsSugar::kv(std::string k, std::string v)
{
log_->add_content()->set_key(k)->set_value(v);
@ -520,16 +541,6 @@ SrsClsSugar* SrsClsSugar::kvf(std::string k, const char* fmt, ...)
return kv(k, v);
}
uint64_t SrsClsSugar::nb_bytes()
{
return log_groups_->nb_bytes();
}
srs_error_t SrsClsSugar::encode(SrsBuffer* b)
{
return log_groups_->encode(b);
}
SrsClsSugars::SrsClsSugars()
{
}
@ -581,8 +592,16 @@ SrsClsSugars* SrsClsSugars::slice(int max_size)
for (vector<SrsClsSugar*>::iterator it = sugars.begin(); it != sugars.end();) {
SrsClsSugar* sugar = *it;
// Always push at least one elem.
// Always consume it.
it = sugars.erase(it);
// If empty, ignore it.
if (sugar->empty()) {
srs_freep(sugar);
continue;
}
// Not empty, append it, to make sure at least one elem.
v->sugars.push_back(sugar);
// Util exceed the max size.
@ -616,10 +635,12 @@ SrsClsClient::SrsClsClient()
heartbeat_ratio_ = 0;
streams_ratio_ = 0;
nn_logs_ = 0;
sugars_ = new SrsClsSugars();
}
SrsClsClient::~SrsClsClient()
{
srs_freep(sugars_);
}
bool SrsClsClient::enabled()
@ -627,26 +648,6 @@ bool SrsClsClient::enabled()
return enabled_;
}
bool SrsClsClient::stat_heartbeat()
{
return stat_heartbeat_;
}
bool SrsClsClient::stat_streams()
{
return stat_streams_;
}
int SrsClsClient::heartbeat_ratio()
{
return heartbeat_ratio_;
}
int SrsClsClient::streams_ratio()
{
return streams_ratio_;
}
string SrsClsClient::label()
{
return label_;
@ -706,7 +707,34 @@ srs_error_t SrsClsClient::initialize()
return err;
}
srs_error_t SrsClsClient::send_log(ISrsEncoder* sugar, int count, int total)
srs_error_t SrsClsClient::report()
{
srs_error_t err = srs_success;
if ((err = dump_summaries(sugars_)) != srs_success) {
return srs_error_wrap(err, "dump summary");
}
if ((err = dump_streams(sugars_)) != srs_success) {
return srs_error_wrap(err, "dump streams");
}
if (sugars_->empty()) {
return err;
}
SrsClsSugars* sugars = sugars_;
SrsAutoFree(SrsClsSugars, sugars);
sugars_ = new SrsClsSugars();
if ((err = send_logs(sugars)) != srs_success) {
return srs_error_wrap(err, "cls");
}
return err;
}
srs_error_t SrsClsClient::do_send_logs(ISrsEncoder* sugar, int count, int total)
{
srs_error_t err = srs_success;
@ -811,7 +839,7 @@ srs_error_t SrsClsClient::send_logs(SrsClsSugars* sugars)
SrsClsSugars* v = sugars->slice(SRS_CLS_BATCH_MAX_LOG_SIZE);
SrsAutoFree(SrsClsSugars, v);
if ((err = send_log((ISrsEncoder*)v, v->size(), total)) != srs_success) {
if ((err = do_send_logs((ISrsEncoder*)v, v->size(), total)) != srs_success) {
return srs_error_wrap(err, "send %d/%d/%d logs", v->size(), i, total);
}
}
@ -819,18 +847,18 @@ srs_error_t SrsClsClient::send_logs(SrsClsSugars* sugars)
return err;
}
srs_error_t srs_cls_dump_summaries(SrsClsSugars* sugars)
srs_error_t SrsClsClient::dump_summaries(SrsClsSugars* sugars)
{
srs_error_t err = srs_success;
// Ignore if disabled.
if (!_srs_cls->enabled() || !_srs_cls->stat_heartbeat()) {
if (!enabled_ || !stat_heartbeat_) {
return err;
}
// Whether it's time to report heartbeat.
static int nn_heartbeat = -1;
bool interval_ok = nn_heartbeat == -1 || ++nn_heartbeat >= _srs_cls->heartbeat_ratio();
bool interval_ok = nn_heartbeat == -1 || ++nn_heartbeat >= heartbeat_ratio_;
if (interval_ok) {
nn_heartbeat = 0;
}
@ -911,18 +939,18 @@ srs_error_t srs_cls_dump_summaries(SrsClsSugars* sugars)
return err;
}
srs_error_t srs_cls_dump_streams(SrsClsSugars* sugars)
srs_error_t SrsClsClient::dump_streams(SrsClsSugars* sugars)
{
srs_error_t err = srs_success;
// Ignore if disabled.
if (!_srs_cls->enabled() || !_srs_cls->stat_streams()) {
if (!enabled_ || !stat_streams_) {
return err;
}
// Whether it's time to report streams.
static int nn_streams = -1;
bool interval_ok = nn_streams == -1 || ++nn_streams >= _srs_cls->streams_ratio();
bool interval_ok = nn_streams == -1 || ++nn_streams >= streams_ratio_;
if (interval_ok) {
nn_streams = 0;
}
@ -936,25 +964,4 @@ srs_error_t srs_cls_dump_streams(SrsClsSugars* sugars)
return err;
}
srs_error_t srs_cls_report()
{
srs_error_t err = srs_success;
SrsClsSugars sugars;
if ((err = srs_cls_dump_summaries(&sugars)) != srs_success) {
return srs_error_wrap(err, "dump summary");
}
if ((err = srs_cls_dump_streams(&sugars)) != srs_success) {
return srs_error_wrap(err, "dump streams");
}
if ((err = _srs_cls->send_logs(&sugars)) != srs_success) {
return srs_error_wrap(err, "cls");
}
return err;
}

View file

@ -32,6 +32,7 @@ public:
virtual uint64_t nb_bytes();
srs_error_t encode(SrsBuffer* b);
public:
bool empty();
SrsClsSugar* kv(std::string k, std::string v);
SrsClsSugar* kvf(std::string k, const char* fmt, ...);
};
@ -69,30 +70,27 @@ private:
std::string endpoint_;
std::string topic_;
private:
SrsClsSugars* sugars_;
uint64_t nn_logs_;
public:
SrsClsClient();
virtual ~SrsClsClient();
public:
bool enabled();
bool stat_heartbeat();
bool stat_streams();
int heartbeat_ratio();
int streams_ratio();
std::string label();
std::string tag();
uint64_t nn_logs();
public:
srs_error_t initialize();
srs_error_t report();
private:
srs_error_t send_log(ISrsEncoder* sugar, int count, int total);
public:
srs_error_t do_send_logs(ISrsEncoder* sugar, int count, int total);
srs_error_t send_logs(SrsClsSugars* sugars);
srs_error_t dump_summaries(SrsClsSugars* sugars);
srs_error_t dump_streams(SrsClsSugars* sugars);
};
extern SrsClsClient* _srs_cls;
srs_error_t srs_cls_report();
#endif