1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Log: Support write log to tencentcloud CLS. v5.0.44

This commit is contained in:
winlin 2022-08-07 12:11:37 +08:00
parent 3da0b57121
commit 95cd0e84eb
15 changed files with 1631 additions and 7 deletions

View file

@ -59,6 +59,10 @@ const char* _srs_version = "XCORE-" RTMP_SIG_SRS_SERVER;
#define SRS_CR (char)SRS_CONSTS_CR
// Overwrite the config by env.
#define SRS_OVERWRITE_BY_ENV_STRING(key) if (getenv(key)) return getenv(key)
#define SRS_OVERWRITE_BY_ENV_BOOL(key) if (getenv(key)) return SRS_CONF_PERFER_FALSE(string(getenv(key)))
#define SRS_OVERWRITE_BY_ENV_BOOL2(key) if (getenv(key)) return SRS_CONF_PERFER_TRUE(string(getenv(key)))
#define SRS_OVERWRITE_BY_ENV_INT(key) if (getenv(key)) return ::atoi(getenv(key))
#define SRS_OVERWRITE_BY_ENV_SECONDS(key) if (getenv(key)) return ::atoi(getenv(key)) * SRS_UTIME_SECONDS
/**
@ -2393,7 +2397,7 @@ srs_error_t SrsConfig::check_normal_config()
&& n != "grace_start_wait" && n != "empty_ip_ok" && n != "disable_daemon_for_docker"
&& n != "inotify_auto_reload" && n != "auto_reload_for_docker" && n != "tcmalloc_release_rate"
&& n != "query_latest_version" && n != "first_wait_for_qlv" && n != "threads"
&& n != "circuit_breaker" && n != "is_full" && n != "in_docker"
&& n != "circuit_breaker" && n != "is_full" && n != "in_docker" && n != "tencentcloud_cls"
) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str());
}
@ -3371,6 +3375,234 @@ int SrsConfig::get_dying_pulse()
return ::atoi(conf->arg0().c_str());
}
bool SrsConfig::get_tencentcloud_cls_enabled()
{
SRS_OVERWRITE_BY_ENV_BOOL("SRS_TENCENTCLOUD_CLS_ENABLED");
static bool DEFAULT = false;
SrsConfDirective* conf = root->get("tencentcloud_cls");
if (!conf) {
return DEFAULT;
}
conf = conf->get("enabled");
if (!conf) {
return DEFAULT;
}
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
bool SrsConfig::get_tencentcloud_cls_stat_heartbeat()
{
SRS_OVERWRITE_BY_ENV_BOOL2("SRS_TENCENTCLOUD_CLS_STAT_HEARTBEAT");
static bool DEFAULT = true;
SrsConfDirective* conf = root->get("tencentcloud_cls");
if (!conf) {
return DEFAULT;
}
conf = conf->get("stat_heartbeat");
if (!conf) {
return DEFAULT;
}
return SRS_CONF_PERFER_TRUE(conf->arg0());
}
bool SrsConfig::get_tencentcloud_cls_stat_streams()
{
SRS_OVERWRITE_BY_ENV_BOOL2("SRS_TENCENTCLOUD_CLS_STAT_STREAMS");
static bool DEFAULT = true;
SrsConfDirective* conf = root->get("tencentcloud_cls");
if (!conf) {
return DEFAULT;
}
conf = conf->get("stat_streams");
if (!conf) {
return DEFAULT;
}
return SRS_CONF_PERFER_TRUE(conf->arg0());
}
bool SrsConfig::get_tencentcloud_cls_debug_logging()
{
SRS_OVERWRITE_BY_ENV_BOOL("SRS_TENCENTCLOUD_CLS_DEBUG_LOGGING");
static bool DEFAULT = false;
SrsConfDirective* conf = root->get("tencentcloud_cls");
if (!conf) {
return DEFAULT;
}
conf = conf->get("debug_logging");
if (!conf) {
return DEFAULT;
}
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
int SrsConfig::get_tencentcloud_cls_heartbeat_ratio()
{
SRS_OVERWRITE_BY_ENV_INT("SRS_TENCENTCLOUD_CLS_HEARTBEAT_RATIO");
static int DEFAULT = 1;
SrsConfDirective* conf = root->get("tencentcloud_cls");
if (!conf) {
return DEFAULT;
}
conf = conf->get("heartbeat_ratio");
if (!conf) {
return DEFAULT;
}
return ::atoi(conf->arg0().c_str());
}
int SrsConfig::get_tencentcloud_cls_streams_ratio()
{
SRS_OVERWRITE_BY_ENV_INT("SRS_TENCENTCLOUD_CLS_STREAMS_RATIO");
static int DEFAULT = 1;
SrsConfDirective* conf = root->get("tencentcloud_cls");
if (!conf) {
return DEFAULT;
}
conf = conf->get("streams_ratio");
if (!conf) {
return DEFAULT;
}
return ::atoi(conf->arg0().c_str());
}
string SrsConfig::get_tencentcloud_cls_label()
{
SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_CLS_LABEL");
static string DEFAULT = "";
SrsConfDirective* conf = root->get("tencentcloud_cls");
if (!conf) {
return DEFAULT;
}
conf = conf->get("label");
if (!conf) {
return DEFAULT;
}
return conf->arg0();
}
string SrsConfig::get_tencentcloud_cls_tag()
{
SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_CLS_TAG");
static string DEFAULT = "";
SrsConfDirective* conf = root->get("tencentcloud_cls");
if (!conf) {
return DEFAULT;
}
conf = conf->get("tag");
if (!conf) {
return DEFAULT;
}
return conf->arg0();
}
string SrsConfig::get_tencentcloud_cls_secret_id()
{
SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_CLS_SECRET_ID");
static string DEFAULT = "";
SrsConfDirective* conf = root->get("tencentcloud_cls");
if (!conf) {
return DEFAULT;
}
conf = conf->get("secret_id");
if (!conf) {
return DEFAULT;
}
return conf->arg0();
}
string SrsConfig::get_tencentcloud_cls_secret_key()
{
SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_CLS_SECRET_KEY");
static string DEFAULT = "";
SrsConfDirective* conf = root->get("tencentcloud_cls");
if (!conf) {
return DEFAULT;
}
conf = conf->get("secret_key");
if (!conf) {
return DEFAULT;
}
return conf->arg0();
}
string SrsConfig::get_tencentcloud_cls_endpoint()
{
SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_CLS_ENDPOINT");
static string DEFAULT = "";
SrsConfDirective* conf = root->get("tencentcloud_cls");
if (!conf) {
return DEFAULT;
}
conf = conf->get("endpoint");
if (!conf) {
return DEFAULT;
}
return conf->arg0();
}
string SrsConfig::get_tencentcloud_cls_topic_id()
{
SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_CLS_TOPIC_ID");
static string DEFAULT = "";
SrsConfDirective* conf = root->get("tencentcloud_cls");
if (!conf) {
return DEFAULT;
}
conf = conf->get("topic_id");
if (!conf) {
return DEFAULT;
}
return conf->arg0();
}
vector<SrsConfDirective*> SrsConfig::get_stream_casters()
{
srs_assert(root);

View file

@ -460,6 +460,20 @@ public:
virtual int get_critical_pulse();
virtual int get_dying_threshold();
virtual int get_dying_pulse();
// TencentCloud service section.
public:
virtual bool get_tencentcloud_cls_enabled();
virtual bool get_tencentcloud_cls_stat_heartbeat();
virtual bool get_tencentcloud_cls_stat_streams();
virtual bool get_tencentcloud_cls_debug_logging();
virtual int get_tencentcloud_cls_heartbeat_ratio();
virtual int get_tencentcloud_cls_streams_ratio();
virtual std::string get_tencentcloud_cls_label();
virtual std::string get_tencentcloud_cls_tag();
virtual std::string get_tencentcloud_cls_secret_id();
virtual std::string get_tencentcloud_cls_secret_key();
virtual std::string get_tencentcloud_cls_endpoint();
virtual std::string get_tencentcloud_cls_topic_id();
// stream_caster section
public:
// Get all stream_caster in config file.

View file

@ -12,6 +12,7 @@
#include <srs_protocol_st.hpp>
#include <srs_app_utility.hpp>
#include <srs_app_dvr.hpp>
#include <srs_app_tencentcloud.hpp>
using namespace std;
@ -180,6 +181,11 @@ srs_error_t SrsHybridServer::initialize()
return srs_error_wrap(err, "dvr async");
}
// Initialize TencentCloud CLS object.
if ((err = _srs_cls->initialize()) != srs_success) {
return srs_error_wrap(err, "cls client");
}
// Register some timers.
timer20ms_->subscribe(clock_monitor_);
timer5s_->subscribe(this);
@ -384,6 +390,12 @@ srs_error_t SrsHybridServer::on_timer(srs_utime_t interval)
thread_desc.c_str(), free_desc.c_str(), objs_desc.c_str()
);
// Report logs to CLS if enabled.
if ((err = srs_cls_report()) != srs_success) {
srs_warn("ignore cls err %s", srs_error_desc(err).c_str());
srs_freep(err);
}
return err;
}

View file

@ -18,6 +18,8 @@ using namespace std;
#include <srs_kernel_utility.hpp>
#include <srs_protocol_amf0.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_app_tencentcloud.hpp>
#include <srs_kernel_kbps.hpp>
string srs_generate_stat_vid()
{
@ -99,13 +101,14 @@ SrsStatisticStream::SrsStatisticStream()
kbps->set_io(NULL, NULL);
nb_clients = 0;
nb_frames = 0;
frames = new SrsPps();
}
SrsStatisticStream::~SrsStatisticStream()
{
srs_freep(kbps);
srs_freep(clk);
srs_freep(frames);
}
srs_error_t SrsStatisticStream::dumps(SrsJsonObject* obj)
@ -118,7 +121,7 @@ srs_error_t SrsStatisticStream::dumps(SrsJsonObject* obj)
obj->set("app", SrsJsonAny::str(app.c_str()));
obj->set("live_ms", SrsJsonAny::integer(srsu2ms(srs_get_system_time())));
obj->set("clients", SrsJsonAny::integer(nb_clients));
obj->set("frames", SrsJsonAny::integer(nb_frames));
obj->set("frames", SrsJsonAny::integer(frames->sugar));
obj->set("send_bytes", SrsJsonAny::integer(kbps->get_send_bytes()));
obj->set("recv_bytes", SrsJsonAny::integer(kbps->get_recv_bytes()));
@ -369,7 +372,7 @@ srs_error_t SrsStatistic::on_video_frames(SrsRequest* req, int nb_frames)
SrsStatisticVhost* vhost = create_vhost(req);
SrsStatisticStream* stream = create_stream(vhost, req);
stream->nb_frames += nb_frames;
stream->frames->sugar += nb_frames;
return err;
}
@ -508,6 +511,7 @@ SrsKbps* SrsStatistic::kbps_sample()
for (it = streams.begin(); it != streams.end(); it++) {
SrsStatisticStream* stream = it->second;
stream->kbps->sample();
stream->frames->update();
}
}
if (true) {
@ -610,6 +614,65 @@ void SrsStatistic::dumps_hints_kv(std::stringstream & ss)
}
}
void SrsStatistic::dumps_cls_summaries(SrsClsSugar* sugar)
{
if (!vhosts.empty()) {
sugar->kvf("vhosts", "%d", (int) vhosts.size());
}
if (!streams.empty()) {
sugar->kvf("streams", "%d", (int) streams.size());
}
if (!clients.empty()) {
sugar->kvf("clients", "%d", (int) clients.size());
}
}
void SrsStatistic::dumps_cls_streams(SrsClsSugars* sugars)
{
for (std::map<std::string, SrsStatisticStream*>::iterator it = streams.begin(); it != streams.end(); ++it) {
SrsStatisticStream* stream = it->second;
if (!stream->active || !stream->nb_clients) {
continue;
}
SrsClsSugar* sugar = sugars->create();
sugar->kv("hint", "stream");
sugar->kv("version", RTMP_SIG_SRS_VERSION);
sugar->kvf("pid", "%d", getpid());
sugar->kv("sid", stream->id);
sugar->kv("url", stream->url);
if (stream->frames->r30s()) {
sugar->kvf("fps", "%d", stream->frames->r30s());
}
if (stream->width) {
sugar->kvf("width", "%d", stream->width);
}
if (stream->height) {
sugar->kvf("height", "%d", stream->height);
}
SrsStatisticClient* pub = find_client(stream->publisher_id);
if (pub) {
if (pub->kbps->get_recv_kbps_30s()) {
sugar->kvf("recv", "%d", pub->kbps->get_recv_kbps_30s());
}
if (pub->kbps->get_send_kbps_30s()) {
sugar->kvf("send", "%d", pub->kbps->get_send_kbps_30s());
}
}
sugar->kvf("clients", "%d", stream->nb_clients);
if (stream->kbps->get_recv_kbps_30s()) {
sugar->kvf("recv2", "%d", stream->kbps->get_recv_kbps_30s());
}
if (stream->kbps->get_send_kbps_30s()) {
sugar->kvf("send2", "%d", stream->kbps->get_send_kbps_30s());
}
}
}
SrsStatisticVhost* SrsStatistic::create_vhost(SrsRequest* req)
{
SrsStatisticVhost* vhost = NULL;

View file

@ -24,6 +24,9 @@ class ISrsExpire;
class SrsJsonObject;
class SrsJsonArray;
class ISrsKbpsDelta;
class SrsClsSugar;
class SrsClsSugars;
class SrsPps;
struct SrsStatisticVhost
{
@ -55,11 +58,12 @@ public:
// The publisher connection id.
std::string publisher_id;
int nb_clients;
uint64_t nb_frames;
public:
// The stream total kbps.
SrsKbps* kbps;
SrsWallClock* clk;
// The fps of stream.
SrsPps* frames;
public:
bool has_video;
SrsVideoCodecId vcodec;
@ -200,6 +204,10 @@ public:
virtual srs_error_t dumps_clients(SrsJsonArray* arr, int start, int count);
// Dumps the hints about SRS server.
void dumps_hints_kv(std::stringstream & ss);
public:
// Dumps the CLS summary.
void dumps_cls_summaries(SrsClsSugar* sugar);
void dumps_cls_streams(SrsClsSugars* sugars);
private:
virtual SrsStatisticVhost* create_vhost(SrsRequest* req);
virtual SrsStatisticStream* create_stream(SrsStatisticVhost* vhost, SrsRequest* req);

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,95 @@
//
// Copyright (c) 2013-2022 The SRS Authors
//
// SPDX-License-Identifier: MIT or MulanPSL-2.0
//
#ifndef SRS_APP_TENCENTCLOUD_HPP
#define SRS_APP_TENCENTCLOUD_HPP
#include <srs_core.hpp>
#include <srs_kernel_buffer.hpp>
#include <string>
#include <vector>
class SrsBuffer;
class SrsClsLogGroupList;
class SrsClsLogGroup;
class SrsClsLog;
class SrsClsSugar : public ISrsEncoder
{
private:
SrsClsLog* log_;
SrsClsLogGroup* log_group_;
SrsClsLogGroupList* log_groups_;
public:
SrsClsSugar();
virtual ~SrsClsSugar();
public:
virtual uint64_t nb_bytes();
srs_error_t encode(SrsBuffer* b);
public:
SrsClsSugar* kv(std::string k, std::string v);
SrsClsSugar* kvf(std::string k, const char* fmt, ...);
};
class SrsClsSugars : public ISrsEncoder
{
private:
std::vector<SrsClsSugar*> sugars;
public:
SrsClsSugars();
virtual ~SrsClsSugars();
public:
virtual uint64_t nb_bytes();
srs_error_t encode(SrsBuffer* b);
public:
SrsClsSugar* create();
SrsClsSugars* slice(int max_size);
bool empty();
int size();
};
class SrsClsClient
{
private:
bool enabled_;
bool stat_heartbeat_;
bool stat_streams_;
bool debug_logging_;
int heartbeat_ratio_;
int streams_ratio_;
std::string label_;
std::string tag_;
private:
std::string secret_id_;
std::string endpoint_;
std::string topic_;
public:
SrsClsClient();
virtual ~SrsClsClient();
public:
bool enabled();
bool stat_heartbeat();
bool stat_streams();
int heartbeat_ratio();
int streams_ratio();
std::string label();
std::string tag();
public:
srs_error_t initialize();
private:
srs_error_t send_log(ISrsEncoder* sugar, int count, int total);
public:
srs_error_t send_logs(SrsClsSugars* sugars);
};
extern SrsClsClient* _srs_cls;
srs_error_t srs_cls_report();
#endif

View file

@ -16,6 +16,7 @@
#include <srs_app_rtc_server.hpp>
#include <srs_app_log.hpp>
#include <srs_app_async_call.hpp>
#include <srs_app_tencentcloud.hpp>
#ifdef SRS_RTC
#include <srs_app_rtc_dtls.hpp>
@ -438,6 +439,9 @@ srs_error_t srs_global_initialize()
// Create global async worker for DVR.
_srs_dvr_async = new SrsAsyncCallWorker();
// Initialize global TencentCloud CLS object.
_srs_cls = new SrsClsClient();
return err;
}

View file

@ -9,6 +9,6 @@
#define VERSION_MAJOR 5
#define VERSION_MINOR 0
#define VERSION_REVISION 43
#define VERSION_REVISION 44
#endif

View file

@ -99,6 +99,9 @@
#define ERROR_SOCKET_ACCEPT 1081
#define ERROR_THREAD_CREATE 1082
#define ERROR_THREAD_FINISHED 1083
#define ERROR_PB_NO_SPACE 1084
#define ERROR_CLS_INVALID_CONFIG 1085
#define ERROR_CLS_EXCEED_SIZE 1086
///////////////////////////////////////////////////////
// RTMP protocol error.

View file

@ -91,6 +91,11 @@ int SrsPps::r10s()
return sample_10s_.rate;
}
int SrsPps::r30s()
{
return sample_30s_.rate;
}
SrsWallClock::SrsWallClock()
{
}

View file

@ -53,6 +53,8 @@ public:
void update(int64_t nn);
// Get the 10s average stat.
int r10s();
// Get the 30s average stat.
int r30s();
};
/**