mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
APM: Support distributed tracing by Tencent Cloud APM. v5.0.63
This commit is contained in:
parent
736c661808
commit
3e2f8622f8
49 changed files with 4989 additions and 719 deletions
|
|
@ -1826,6 +1826,13 @@ srs_error_t SrsConfig::parse_options(int argc, char** argv)
|
|||
}
|
||||
}
|
||||
|
||||
// Overwrite the config by env SRS_CONFIG_FILE.
|
||||
if (::getenv("SRS_CONFIG_FILE")) {
|
||||
string ov = config_file;
|
||||
config_file = ::getenv("SRS_CONFIG_FILE");
|
||||
srs_trace("ENV: Overwrite config %s to %s", ov.c_str(), config_file.c_str());
|
||||
}
|
||||
|
||||
// Parse the matched config file.
|
||||
err = parse_file(config_file.c_str());
|
||||
|
||||
|
|
@ -2211,7 +2218,7 @@ srs_error_t SrsConfig::check_normal_config()
|
|||
std::string n = conf->name;
|
||||
if (n != "listen" && n != "pid" && n != "chunk_size" && n != "ff_log_dir"
|
||||
&& n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_file"
|
||||
&& n != "max_connections" && n != "daemon" && n != "heartbeat"
|
||||
&& n != "max_connections" && n != "daemon" && n != "heartbeat" && n != "tencentcloud_apm"
|
||||
&& n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms"
|
||||
&& n != "http_server" && n != "stream_caster" && n != "rtc_server" && n != "srt_server"
|
||||
&& n != "utc_time" && n != "work_dir" && n != "asprocess" && n != "server_id"
|
||||
|
|
@ -3442,6 +3449,101 @@ string SrsConfig::get_tencentcloud_cls_topic_id()
|
|||
return conf->arg0();
|
||||
}
|
||||
|
||||
bool SrsConfig::get_tencentcloud_apm_enabled()
|
||||
{
|
||||
SRS_OVERWRITE_BY_ENV_BOOL("SRS_TENCENTCLOUD_APM_ENABLED");
|
||||
|
||||
static bool DEFAULT = false;
|
||||
|
||||
SrsConfDirective* conf = root->get("tencentcloud_apm");
|
||||
if (!conf) {
|
||||
return DEFAULT;
|
||||
}
|
||||
|
||||
conf = conf->get("enabled");
|
||||
if (!conf) {
|
||||
return DEFAULT;
|
||||
}
|
||||
|
||||
return SRS_CONF_PERFER_FALSE(conf->arg0());
|
||||
}
|
||||
|
||||
string SrsConfig::get_tencentcloud_apm_token()
|
||||
{
|
||||
SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_APM_TOKEN");
|
||||
|
||||
static string DEFAULT = "";
|
||||
|
||||
SrsConfDirective* conf = root->get("tencentcloud_apm");
|
||||
if (!conf) {
|
||||
return DEFAULT;
|
||||
}
|
||||
|
||||
conf = conf->get("token");
|
||||
if (!conf) {
|
||||
return DEFAULT;
|
||||
}
|
||||
|
||||
return conf->arg0();
|
||||
}
|
||||
|
||||
string SrsConfig::get_tencentcloud_apm_endpoint()
|
||||
{
|
||||
SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_APM_ENDPOINT");
|
||||
|
||||
static string DEFAULT = "";
|
||||
|
||||
SrsConfDirective* conf = root->get("tencentcloud_apm");
|
||||
if (!conf) {
|
||||
return DEFAULT;
|
||||
}
|
||||
|
||||
conf = conf->get("endpoint");
|
||||
if (!conf) {
|
||||
return DEFAULT;
|
||||
}
|
||||
|
||||
return conf->arg0();
|
||||
}
|
||||
|
||||
string SrsConfig::get_tencentcloud_apm_service_name()
|
||||
{
|
||||
SRS_OVERWRITE_BY_ENV_STRING("SRS_TENCENTCLOUD_APM_SERVICE_NAME");
|
||||
|
||||
static string DEFAULT = "srs-server";
|
||||
|
||||
SrsConfDirective* conf = root->get("tencentcloud_apm");
|
||||
if (!conf) {
|
||||
return DEFAULT;
|
||||
}
|
||||
|
||||
conf = conf->get("service_name");
|
||||
if (!conf) {
|
||||
return DEFAULT;
|
||||
}
|
||||
|
||||
return conf->arg0();
|
||||
}
|
||||
|
||||
bool SrsConfig::get_tencentcloud_apm_debug_logging()
|
||||
{
|
||||
SRS_OVERWRITE_BY_ENV_BOOL("SRS_TENCENTCLOUD_APM_DEBUG_LOGGING");
|
||||
|
||||
static bool DEFAULT = false;
|
||||
|
||||
SrsConfDirective* conf = root->get("tencentcloud_apm");
|
||||
if (!conf) {
|
||||
return DEFAULT;
|
||||
}
|
||||
|
||||
conf = conf->get("debug_logging");
|
||||
if (!conf) {
|
||||
return DEFAULT;
|
||||
}
|
||||
|
||||
return SRS_CONF_PERFER_FALSE(conf->arg0());
|
||||
}
|
||||
|
||||
vector<SrsConfDirective*> SrsConfig::get_stream_casters()
|
||||
{
|
||||
srs_assert(root);
|
||||
|
|
|
|||
|
|
@ -469,6 +469,11 @@ public:
|
|||
virtual std::string get_tencentcloud_cls_secret_key();
|
||||
virtual std::string get_tencentcloud_cls_endpoint();
|
||||
virtual std::string get_tencentcloud_cls_topic_id();
|
||||
virtual bool get_tencentcloud_apm_enabled();
|
||||
virtual std::string get_tencentcloud_apm_token();
|
||||
virtual std::string get_tencentcloud_apm_endpoint();
|
||||
virtual std::string get_tencentcloud_apm_service_name();
|
||||
virtual bool get_tencentcloud_apm_debug_logging();
|
||||
// stream_caster section
|
||||
public:
|
||||
// Get all stream_caster in config file.
|
||||
|
|
|
|||
|
|
@ -614,7 +614,7 @@ srs_error_t SrsBufferedReadWriter::read(void* buf, size_t size, ssize_t* nread)
|
|||
return io_->read(buf, size, nread);
|
||||
}
|
||||
|
||||
int nn = srs_min(buf_->left(), size);
|
||||
int nn = srs_min(buf_->left(), (int)size);
|
||||
*nread = nn;
|
||||
|
||||
if (nn) {
|
||||
|
|
@ -629,7 +629,7 @@ srs_error_t SrsBufferedReadWriter::read_fully(void* buf, size_t size, ssize_t* n
|
|||
return io_->read_fully(buf, size, nread);
|
||||
}
|
||||
|
||||
int nn = srs_min(buf_->left(), size);
|
||||
int nn = srs_min(buf_->left(), (int)size);
|
||||
if (nn) {
|
||||
buf_->read_bytes((char*)buf, nn);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,6 +34,8 @@ using namespace std;
|
|||
#include <srs_kernel_flv.hpp>
|
||||
#include <srs_kernel_buffer.hpp>
|
||||
#include <srs_protocol_amf0.hpp>
|
||||
#include <srs_app_http_client.hpp>
|
||||
#include <srs_app_tencentcloud.hpp>
|
||||
|
||||
// when edge timeout, retry next.
|
||||
#define SRS_EDGE_INGESTER_TIMEOUT (5 * SRS_UTIME_SECONDS)
|
||||
|
|
@ -107,7 +109,11 @@ srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
|
|||
srs_utime_t cto = SRS_EDGE_INGESTER_TIMEOUT;
|
||||
srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
|
||||
sdk = new SrsSimpleRtmpClient(url, cto, sto);
|
||||
|
||||
|
||||
// Create a client span and store it to an AMF0 propagator.
|
||||
ISrsApmSpan* span_client = _srs_apm->inject(_srs_apm->span("edge-pull")->set_kind(SrsApmKindClient)->as_child(_srs_apm->load()), sdk->extra_args());
|
||||
SrsAutoFree(ISrsApmSpan, span_client);
|
||||
|
||||
if ((err = sdk->connect()) != srs_success) {
|
||||
return srs_error_wrap(err, "edge pull %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
|
||||
}
|
||||
|
|
@ -387,6 +393,7 @@ SrsEdgeIngester::SrsEdgeIngester()
|
|||
source = NULL;
|
||||
edge = NULL;
|
||||
req = NULL;
|
||||
span_main_ = NULL;
|
||||
|
||||
upstream = new SrsEdgeRtmpUpstream("");
|
||||
lb = new SrsLbRoundRobin();
|
||||
|
|
@ -396,7 +403,8 @@ SrsEdgeIngester::SrsEdgeIngester()
|
|||
SrsEdgeIngester::~SrsEdgeIngester()
|
||||
{
|
||||
stop();
|
||||
|
||||
|
||||
srs_freep(span_main_);
|
||||
srs_freep(upstream);
|
||||
srs_freep(lb);
|
||||
srs_freep(trd);
|
||||
|
|
@ -407,7 +415,12 @@ srs_error_t SrsEdgeIngester::initialize(SrsLiveSource* s, SrsPlayEdge* e, SrsReq
|
|||
source = s;
|
||||
edge = e;
|
||||
req = r;
|
||||
|
||||
|
||||
// We create a dedicate span for edge ingester, and all players will link to this one.
|
||||
// Note that we use a producer span and end it immediately.
|
||||
srs_assert(!span_main_);
|
||||
span_main_ = _srs_apm->span("edge")->set_kind(SrsApmKindProducer)->end();
|
||||
|
||||
return srs_success;
|
||||
}
|
||||
|
||||
|
|
@ -445,12 +458,23 @@ string SrsEdgeIngester::get_curr_origin()
|
|||
return lb->selected();
|
||||
}
|
||||
|
||||
ISrsApmSpan* SrsEdgeIngester::span()
|
||||
{
|
||||
srs_assert(span_main_);
|
||||
return span_main_;
|
||||
}
|
||||
|
||||
// when error, edge ingester sleep for a while and retry.
|
||||
#define SRS_EDGE_INGESTER_CIMS (3 * SRS_UTIME_SECONDS)
|
||||
|
||||
srs_error_t SrsEdgeIngester::cycle()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
// Save span from parent coroutine to current coroutine context, so that we can load if in this coroutine, for
|
||||
// example to use it in SrsEdgeRtmpUpstream which use RTMP or FLV client to connect to upstream server.
|
||||
_srs_apm->store(span_main_);
|
||||
srs_assert(span_main_);
|
||||
|
||||
while (true) {
|
||||
// We always check status first.
|
||||
|
|
@ -459,11 +483,19 @@ srs_error_t SrsEdgeIngester::cycle()
|
|||
return srs_error_wrap(err, "edge ingester");
|
||||
}
|
||||
|
||||
srs_assert(span_main_);
|
||||
ISrsApmSpan* start = _srs_apm->span("edge-start")->set_kind(SrsApmKindConsumer)->as_child(span_main_)->end();
|
||||
srs_freep(start);
|
||||
|
||||
if ((err = do_cycle()) != srs_success) {
|
||||
srs_warn("EdgeIngester: Ignore error, %s", srs_error_desc(err).c_str());
|
||||
srs_freep(err);
|
||||
}
|
||||
|
||||
srs_assert(span_main_);
|
||||
ISrsApmSpan* stop = _srs_apm->span("edge-stop")->set_kind(SrsApmKindConsumer)->as_child(span_main_)->end();
|
||||
srs_freep(stop);
|
||||
|
||||
srs_usleep(SRS_EDGE_INGESTER_CIMS);
|
||||
}
|
||||
|
||||
|
|
@ -700,7 +732,7 @@ srs_error_t SrsEdgeForwarder::initialize(SrsLiveSource* s, SrsPublishEdge* e, Sr
|
|||
source = s;
|
||||
edge = e;
|
||||
req = r;
|
||||
|
||||
|
||||
return srs_success;
|
||||
}
|
||||
|
||||
|
|
@ -733,6 +765,11 @@ srs_error_t SrsEdgeForwarder::start()
|
|||
srs_utime_t cto = SRS_EDGE_FORWARDER_TIMEOUT;
|
||||
srs_utime_t sto = SRS_CONSTS_RTMP_TIMEOUT;
|
||||
sdk = new SrsSimpleRtmpClient(url, cto, sto);
|
||||
|
||||
// Create a client span and store it to an AMF0 propagator.
|
||||
// Note that we are able to load the span from coroutine context because in the same coroutine.
|
||||
ISrsApmSpan* span_client = _srs_apm->inject(_srs_apm->span("edge-push")->set_kind(SrsApmKindClient)->as_child(_srs_apm->load()), sdk->extra_args());
|
||||
SrsAutoFree(ISrsApmSpan, span_client);
|
||||
|
||||
if ((err = sdk->connect()) != srs_success) {
|
||||
return srs_error_wrap(err, "sdk connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
|
||||
|
|
@ -919,6 +956,14 @@ srs_error_t SrsPlayEdge::on_client_play()
|
|||
} else if (state == SrsEdgeStateIngestStopping) {
|
||||
return srs_error_new(ERROR_RTMP_EDGE_PLAY_STATE, "state is stopping");
|
||||
}
|
||||
|
||||
// APM bind client span to edge span, which fetch stream from upstream server.
|
||||
// We create a new span to link the two span, because these two spans might be ended.
|
||||
if (ingester->span() && _srs_apm->load()) {
|
||||
ISrsApmSpan* from = _srs_apm->span("play-link")->as_child(_srs_apm->load());
|
||||
ISrsApmSpan* to = _srs_apm->span("edge-link")->as_child(ingester->span())->link(from);
|
||||
srs_freep(from); srs_freep(to);
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ class SrsHttpClient;
|
|||
class ISrsHttpMessage;
|
||||
class SrsHttpFileReader;
|
||||
class SrsFlvDecoder;
|
||||
class ISrsApmSpan;
|
||||
|
||||
// The state of edge, auto machine
|
||||
enum SrsEdgeState
|
||||
|
|
@ -142,6 +143,7 @@ private:
|
|||
SrsCoroutine* trd;
|
||||
SrsLbRoundRobin* lb;
|
||||
SrsEdgeUpstream* upstream;
|
||||
ISrsApmSpan* span_main_;
|
||||
public:
|
||||
SrsEdgeIngester();
|
||||
virtual ~SrsEdgeIngester();
|
||||
|
|
@ -150,6 +152,8 @@ public:
|
|||
virtual srs_error_t start();
|
||||
virtual void stop();
|
||||
virtual std::string get_curr_origin();
|
||||
// Get the current main span. Note that it might be NULL.
|
||||
ISrsApmSpan* span();
|
||||
// Interface ISrsReusableThread2Handler
|
||||
public:
|
||||
virtual srs_error_t cycle();
|
||||
|
|
|
|||
|
|
@ -185,6 +185,9 @@ srs_error_t SrsHybridServer::initialize()
|
|||
if ((err = _srs_cls->initialize()) != srs_success) {
|
||||
return srs_error_wrap(err, "cls client");
|
||||
}
|
||||
if ((err = _srs_apm->initialize()) != srs_success) {
|
||||
return srs_error_wrap(err, "apm client");
|
||||
}
|
||||
|
||||
// Register some timers.
|
||||
timer20ms_->subscribe(clock_monitor_);
|
||||
|
|
@ -396,6 +399,12 @@ srs_error_t SrsHybridServer::on_timer(srs_utime_t interval)
|
|||
srs_freep(err);
|
||||
}
|
||||
|
||||
// Report logs to APM if enabled.
|
||||
if ((err = _srs_apm->report()) != srs_success) {
|
||||
srs_warn("ignore apm err %s", srs_error_desc(err).c_str());
|
||||
srs_freep(err);
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ using namespace std;
|
|||
#include <srs_protocol_utility.hpp>
|
||||
#include <srs_protocol_json.hpp>
|
||||
#include <srs_app_rtc_source.hpp>
|
||||
#include <srs_app_tencentcloud.hpp>
|
||||
|
||||
// the timeout in srs_utime_t to wait encoder to republish
|
||||
// if timeout, close the connection.
|
||||
|
|
@ -100,6 +101,9 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip, int cport)
|
|||
ip = cip;
|
||||
port = cport;
|
||||
create_time = srsu2ms(srs_get_system_time());
|
||||
span_main_ = _srs_apm->dummy();
|
||||
span_connect_ = _srs_apm->dummy();
|
||||
span_client_ = _srs_apm->dummy();
|
||||
trd = new SrsSTCoroutine("rtmp", this, _srs_context->get_id());
|
||||
|
||||
kbps = new SrsNetworkKbps();
|
||||
|
|
@ -145,6 +149,9 @@ SrsRtmpConn::~SrsRtmpConn()
|
|||
srs_freep(rtmp);
|
||||
srs_freep(refer);
|
||||
srs_freep(security);
|
||||
srs_freep(span_main_);
|
||||
srs_freep(span_connect_);
|
||||
srs_freep(span_client_);
|
||||
}
|
||||
|
||||
std::string SrsRtmpConn::desc()
|
||||
|
|
@ -152,13 +159,27 @@ std::string SrsRtmpConn::desc()
|
|||
return "RtmpConn";
|
||||
}
|
||||
|
||||
std::string srs_ipv4_string(uint32_t rip)
|
||||
{
|
||||
return srs_fmt("%d.%d.%d.%d", uint8_t(rip>>24), uint8_t(rip>>16), uint8_t(rip>>8), uint8_t(rip));
|
||||
}
|
||||
|
||||
// TODO: return detail message when error for client.
|
||||
srs_error_t SrsRtmpConn::do_cycle()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
// We should keep the root span to alive util connection closed.
|
||||
// Note that we use producer and consumer span because RTMP connection is long polling connection.
|
||||
// Note that we also store this span in coroutine context, so that edge could load it.
|
||||
srs_freep(span_main_);
|
||||
span_main_ = _srs_apm->span("rtmp")->set_kind(SrsApmKindServer)->attr("cip", ip)
|
||||
->attr("cid", _srs_context->get_id().c_str());
|
||||
|
||||
srs_trace("RTMP client ip=%s:%d, fd=%d", ip.c_str(), port, srs_netfd_fileno(stfd));
|
||||
|
||||
srs_trace("RTMP client ip=%s:%d, fd=%d, trace=%s, span=%s", ip.c_str(), port, srs_netfd_fileno(stfd),
|
||||
span_main_->format_trace_id(), span_main_->format_span_id()
|
||||
);
|
||||
|
||||
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
|
||||
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
|
||||
|
||||
|
|
@ -167,16 +188,23 @@ srs_error_t SrsRtmpConn::do_cycle()
|
|||
}
|
||||
|
||||
uint32_t rip = rtmp->proxy_real_ip();
|
||||
std::string rips = srs_ipv4_string(rip);
|
||||
if (rip > 0) {
|
||||
srs_trace("RTMP proxy real client ip=%d.%d.%d.%d",
|
||||
uint8_t(rip>>24), uint8_t(rip>>16), uint8_t(rip>>8), uint8_t(rip));
|
||||
srs_trace("RTMP proxy real client ip=%s", rips.c_str());
|
||||
}
|
||||
|
||||
|
||||
// Update the real IP of client, also set the HTTP fields.
|
||||
span_main_->attr("rip", rip ? rips : ip)->attr("http.client_ip", rip ? rips : ip);
|
||||
|
||||
// The span for RTMP connecting to application.
|
||||
srs_freep(span_connect_);
|
||||
span_connect_ = _srs_apm->span("connect")->as_child(span_main_);
|
||||
|
||||
SrsRequest* req = info->req;
|
||||
if ((err = rtmp->connect_app(req)) != srs_success) {
|
||||
return srs_error_wrap(err, "rtmp connect tcUrl");
|
||||
}
|
||||
|
||||
|
||||
// set client ip to request.
|
||||
req->ip = ip;
|
||||
|
||||
|
|
@ -210,6 +238,10 @@ srs_error_t SrsRtmpConn::do_cycle()
|
|||
srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d",
|
||||
srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
|
||||
}
|
||||
|
||||
// Load the span from the AMF0 object propagator.
|
||||
// Note that we will update the trace id, so please make sure no spans are ended before this.
|
||||
_srs_apm->extract(span_main_, req->args);
|
||||
}
|
||||
|
||||
if ((err = service_cycle()) != srs_success) {
|
||||
|
|
@ -381,6 +413,9 @@ srs_error_t SrsRtmpConn::service_cycle()
|
|||
if ((err = rtmp->response_connect_app(req, local_ip.c_str())) != srs_success) {
|
||||
return srs_error_wrap(err, "rtmp: response connect app");
|
||||
}
|
||||
|
||||
// Must be a connecting application span.
|
||||
span_connect_->end();
|
||||
|
||||
if ((err = rtmp->on_bw_done()) != srs_success) {
|
||||
return srs_error_wrap(err, "rtmp: on bw down");
|
||||
|
|
@ -439,9 +474,8 @@ srs_error_t SrsRtmpConn::service_cycle()
|
|||
srs_error_t SrsRtmpConn::stream_service_cycle()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
|
||||
SrsRequest* req = info->req;
|
||||
|
||||
if ((err = rtmp->identify_client(info->res->stream_id, info->type, req->stream, req->duration)) != srs_success) {
|
||||
return srs_error_wrap(err, "rtmp: identify client");
|
||||
}
|
||||
|
|
@ -458,12 +492,21 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
|
|||
req->strip();
|
||||
srs_trace("client identified, type=%s, vhost=%s, app=%s, stream=%s, param=%s, duration=%dms",
|
||||
srs_client_type_string(info->type).c_str(), req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), req->param.c_str(), srsu2msi(req->duration));
|
||||
|
||||
// Start APM only when client is identified, because it might republish.
|
||||
srs_freep(span_client_);
|
||||
span_client_ = _srs_apm->span("client")->as_child(span_connect_)->attr("type", srs_client_type_string(info->type))
|
||||
->attr("url", req->get_stream_url())->attr("http.url", req->get_stream_url());
|
||||
// We store the span to coroutine context, for edge to load it.
|
||||
_srs_apm->store(span_client_);
|
||||
|
||||
// discovery vhost, resolve the vhost from config
|
||||
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
|
||||
if (parsed_vhost) {
|
||||
req->vhost = parsed_vhost->arg0();
|
||||
}
|
||||
span_client_->attr("vhost", req->vhost)->attr("http.host", req->host)->attr("http.server_name", req->vhost)
|
||||
->attr("http.target", srs_fmt("/%s/%s", req->app.c_str(), req->stream.c_str()));
|
||||
|
||||
if (req->schema.empty() || req->vhost.empty() || req->port == 0 || req->app.empty()) {
|
||||
return srs_error_new(ERROR_RTMP_REQ_TCURL, "discovery tcUrl failed, tcUrl=%s, schema=%s, vhost=%s, port=%d, app=%s",
|
||||
|
|
@ -535,6 +578,11 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
|
|||
if ((err = http_hooks_on_play()) != srs_success) {
|
||||
return srs_error_wrap(err, "rtmp: callback on play");
|
||||
}
|
||||
|
||||
// Must be a client span.
|
||||
span_client_->set_name("play")->end();
|
||||
// We end the connection span because it's a producer and only trace the established.
|
||||
span_main_->end();
|
||||
|
||||
err = playing(source);
|
||||
http_hooks_on_stop();
|
||||
|
|
@ -545,6 +593,11 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
|
|||
if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) {
|
||||
return srs_error_wrap(err, "rtmp: start FMLE publish");
|
||||
}
|
||||
|
||||
// Must be a client span.
|
||||
span_client_->set_name("publish")->end();
|
||||
// We end the connection span because it's a producer and only trace the established.
|
||||
span_main_->end();
|
||||
|
||||
return publishing(source);
|
||||
}
|
||||
|
|
@ -552,6 +605,11 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
|
|||
if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) {
|
||||
return srs_error_wrap(err, "rtmp: start HAIVISION publish");
|
||||
}
|
||||
|
||||
// Must be a client span.
|
||||
span_client_->set_name("publish")->end();
|
||||
// We end the connection span because it's a producer and only trace the established.
|
||||
span_main_->end();
|
||||
|
||||
return publishing(source);
|
||||
}
|
||||
|
|
@ -559,6 +617,11 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
|
|||
if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) {
|
||||
return srs_error_wrap(err, "rtmp: start FLASH publish");
|
||||
}
|
||||
|
||||
// Must be a client span.
|
||||
span_client_->set_name("publish")->end();
|
||||
// We end the connection span because it's a producer and only trace the established.
|
||||
span_main_->end();
|
||||
|
||||
return publishing(source);
|
||||
}
|
||||
|
|
@ -719,6 +782,10 @@ srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* cons
|
|||
|
||||
srs_trace("start play smi=%dms, mw_sleep=%d, mw_msgs=%d, realtime=%d, tcp_nodelay=%d",
|
||||
srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_msgs, realtime, tcp_nodelay);
|
||||
|
||||
ISrsApmSpan* span = _srs_apm->span("play-cycle")->set_kind(SrsApmKindProducer)->as_child(span_client_)
|
||||
->attr("realtime", srs_fmt("%d", realtime))->end();
|
||||
SrsAutoFree(ISrsApmSpan, span);
|
||||
|
||||
while (true) {
|
||||
// when source is set to expired, disconnect it.
|
||||
|
|
@ -762,6 +829,11 @@ srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* cons
|
|||
srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d/%d",
|
||||
(int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
|
||||
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep), mw_msgs);
|
||||
|
||||
// TODO: Do not use pithy print for frame span.
|
||||
ISrsApmSpan* sample = _srs_apm->span("play-frame")->set_kind(SrsApmKindConsumer)->as_child(span)
|
||||
->attr("msgs", srs_fmt("%d", count))->attr("kbps", srs_fmt("%d", kbps->get_send_kbps_30s()));
|
||||
srs_freep(sample);
|
||||
}
|
||||
|
||||
if (count <= 0) {
|
||||
|
|
@ -887,6 +959,10 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre
|
|||
srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);
|
||||
srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d", mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout), tcp_nodelay);
|
||||
}
|
||||
|
||||
ISrsApmSpan* span = _srs_apm->span("publish-cycle")->set_kind(SrsApmKindProducer)->as_child(span_client_)
|
||||
->attr("timeout", srs_fmt("%d", srsu2msi(publish_normal_timeout)))->end();
|
||||
SrsAutoFree(ISrsApmSpan, span);
|
||||
|
||||
int64_t nb_msgs = 0;
|
||||
uint64_t nb_frames = 0;
|
||||
|
|
@ -935,6 +1011,12 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre
|
|||
(int)pprint->age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
|
||||
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), mr, srsu2msi(mr_sleep),
|
||||
srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout));
|
||||
|
||||
// TODO: Do not use pithy print for frame span.
|
||||
ISrsApmSpan* sample = _srs_apm->span("publish-frame")->set_kind(SrsApmKindConsumer)->as_child(span)
|
||||
->attr("msgs", srs_fmt("%" PRId64, nb_frames))->attr("kbps", srs_fmt("%d", kbps->get_recv_kbps_30s()));
|
||||
srs_freep(sample);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1453,7 +1535,18 @@ srs_error_t SrsRtmpConn::start()
|
|||
|
||||
srs_error_t SrsRtmpConn::cycle()
|
||||
{
|
||||
srs_error_t err = do_cycle();
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
// Serve the client.
|
||||
err = do_cycle();
|
||||
|
||||
// Final APM span, parent is the last span, not the root span. Note that only client or server kind will be filtered
|
||||
// for error or exception report.
|
||||
ISrsApmSpan* span_final = _srs_apm->span("final")->set_kind(SrsApmKindServer)->as_child(span_client_);
|
||||
SrsAutoFree(ISrsApmSpan, span_final);
|
||||
if (srs_error_code(err) != 0) {
|
||||
span_final->record_error(err)->set_status(SrsApmStatusError, srs_fmt("fail code=%d", srs_error_code(err)));
|
||||
}
|
||||
|
||||
// Update statistic when done.
|
||||
SrsStatistic* stat = SrsStatistic::instance();
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ class ISrsWakable;
|
|||
class SrsCommonMessage;
|
||||
class SrsPacket;
|
||||
class SrsNetworkDelta;
|
||||
class ISrsApmSpan;
|
||||
|
||||
// The simple rtmp client for SRS.
|
||||
class SrsSimpleRtmpClient : public SrsBasicRtmpClient
|
||||
|
|
@ -117,6 +118,10 @@ private:
|
|||
// The create time in milliseconds.
|
||||
// for current connection to log self create time and calculate the living time.
|
||||
int64_t create_time;
|
||||
// The span for tracing connection establishment.
|
||||
ISrsApmSpan* span_main_;
|
||||
ISrsApmSpan* span_connect_;
|
||||
ISrsApmSpan* span_client_;
|
||||
public:
|
||||
SrsRtmpConn(SrsServer* svr, srs_netfd_t c, std::string cip, int port);
|
||||
virtual ~SrsRtmpConn();
|
||||
|
|
|
|||
|
|
@ -618,13 +618,13 @@ void SrsStatistic::dumps_hints_kv(std::stringstream & ss)
|
|||
void SrsStatistic::dumps_cls_summaries(SrsClsSugar* sugar)
|
||||
{
|
||||
if (!vhosts.empty()) {
|
||||
sugar->kvf("vhosts", "%d", (int) vhosts.size());
|
||||
sugar->kv("vhosts", srs_fmt("%d", (int)vhosts.size()));
|
||||
}
|
||||
if (!streams.empty()) {
|
||||
sugar->kvf("streams", "%d", (int) streams.size());
|
||||
sugar->kv("streams", srs_fmt("%d", (int)streams.size()));
|
||||
}
|
||||
if (!clients.empty()) {
|
||||
sugar->kvf("clients", "%d", (int) clients.size());
|
||||
sugar->kv("clients", srs_fmt("%d", (int)clients.size()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -639,37 +639,37 @@ void SrsStatistic::dumps_cls_streams(SrsClsSugars* sugars)
|
|||
SrsClsSugar* sugar = sugars->create();
|
||||
sugar->kv("hint", "stream");
|
||||
sugar->kv("version", RTMP_SIG_SRS_VERSION);
|
||||
sugar->kvf("pid", "%d", getpid());
|
||||
sugar->kv("pid", srs_fmt("%d", getpid()));
|
||||
|
||||
sugar->kv("sid", stream->id);
|
||||
sugar->kv("url", stream->url);
|
||||
|
||||
if (stream->frames->r30s()) {
|
||||
sugar->kvf("fps", "%d", stream->frames->r30s());
|
||||
sugar->kv("fps", srs_fmt("%d", stream->frames->r30s()));
|
||||
}
|
||||
if (stream->width) {
|
||||
sugar->kvf("width", "%d", stream->width);
|
||||
sugar->kv("width", srs_fmt("%d", stream->width));
|
||||
}
|
||||
if (stream->height) {
|
||||
sugar->kvf("height", "%d", stream->height);
|
||||
sugar->kv("height", srs_fmt("%d", stream->height));
|
||||
}
|
||||
|
||||
SrsStatisticClient* pub = find_client(stream->publisher_id);
|
||||
if (pub) {
|
||||
if (pub->kbps->get_recv_kbps_30s()) {
|
||||
sugar->kvf("recv", "%d", pub->kbps->get_recv_kbps_30s());
|
||||
sugar->kv("recv", srs_fmt("%d", pub->kbps->get_recv_kbps_30s()));
|
||||
}
|
||||
if (pub->kbps->get_send_kbps_30s()) {
|
||||
sugar->kvf("send", "%d", pub->kbps->get_send_kbps_30s());
|
||||
sugar->kv("send", srs_fmt("%d", pub->kbps->get_send_kbps_30s()));
|
||||
}
|
||||
}
|
||||
|
||||
sugar->kvf("clients", "%d", stream->nb_clients);
|
||||
sugar->kv("clients", srs_fmt("%d", stream->nb_clients));
|
||||
if (stream->kbps->get_recv_kbps_30s()) {
|
||||
sugar->kvf("recv2", "%d", stream->kbps->get_recv_kbps_30s());
|
||||
sugar->kv("recv2", srs_fmt("%d", stream->kbps->get_recv_kbps_30s()));
|
||||
}
|
||||
if (stream->kbps->get_send_kbps_30s()) {
|
||||
sugar->kvf("send2", "%d", stream->kbps->get_send_kbps_30s());
|
||||
sugar->kv("send2", srs_fmt("%d", stream->kbps->get_send_kbps_30s()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -18,6 +18,19 @@ class SrsBuffer;
|
|||
class SrsClsLogGroupList;
|
||||
class SrsClsLogGroup;
|
||||
class SrsClsLog;
|
||||
class SrsOtelSpan;
|
||||
class SrsApmContext;
|
||||
class SrsAmf0Object;
|
||||
class SrsApmSpan;
|
||||
class SrsOtelResourceSpans;
|
||||
class SrsOtelResource;
|
||||
class SrsOtelScopeSpans;
|
||||
class SrsOtelAttribute;
|
||||
class SrsOtelAnyValue;
|
||||
class SrsOtelScope;
|
||||
class SrsOtelStatus;
|
||||
class SrsOtelEvent;
|
||||
class SrsOtelLink;
|
||||
|
||||
class SrsClsSugar : public ISrsEncoder
|
||||
{
|
||||
|
|
@ -34,7 +47,6 @@ public:
|
|||
public:
|
||||
bool empty();
|
||||
SrsClsSugar* kv(std::string k, std::string v);
|
||||
SrsClsSugar* kvf(std::string k, const char* fmt, ...);
|
||||
};
|
||||
|
||||
class SrsClsSugars : public ISrsEncoder
|
||||
|
|
@ -92,5 +104,426 @@ private:
|
|||
|
||||
extern SrsClsClient* _srs_cls;
|
||||
|
||||
enum SrsApmKind
|
||||
{
|
||||
SrsApmKindUnspecified = 0,
|
||||
SrsApmKindInternal = 1,
|
||||
SrsApmKindServer = 2,
|
||||
SrsApmKindClient = 3,
|
||||
SrsApmKindProducer = 4,
|
||||
SrsApmKindConsumer = 5,
|
||||
};
|
||||
|
||||
enum SrsApmStatus
|
||||
{
|
||||
SrsApmStatusUnset = 0,
|
||||
SrsApmStatusOk = 1,
|
||||
SrsApmStatusError = 2,
|
||||
};
|
||||
|
||||
class SrsOtelExportTraceServiceRequest : public ISrsEncoder
|
||||
{
|
||||
private:
|
||||
// repeated opentelemetry.proto.trace.v1.ResourceSpans resource_spans = 1;
|
||||
std::vector<SrsOtelResourceSpans*> spans_;
|
||||
public:
|
||||
SrsOtelExportTraceServiceRequest();
|
||||
virtual ~SrsOtelExportTraceServiceRequest();
|
||||
public:
|
||||
SrsOtelResourceSpans* append();
|
||||
public:
|
||||
virtual uint64_t nb_bytes();
|
||||
srs_error_t encode(SrsBuffer* b);
|
||||
};
|
||||
|
||||
class SrsOtelResourceSpans : public ISrsEncoder
|
||||
{
|
||||
private:
|
||||
// opentelemetry.proto.resource.v1.Resource resource = 1;
|
||||
SrsOtelResource* resource_;
|
||||
// repeated ScopeSpans scope_spans = 2;
|
||||
std::vector<SrsOtelScopeSpans*> spans_;
|
||||
public:
|
||||
SrsOtelResourceSpans();
|
||||
virtual ~SrsOtelResourceSpans();
|
||||
public:
|
||||
SrsOtelResource* resource();
|
||||
SrsOtelScopeSpans* append();
|
||||
public:
|
||||
virtual uint64_t nb_bytes();
|
||||
srs_error_t encode(SrsBuffer* b);
|
||||
};
|
||||
|
||||
class SrsOtelResource : public ISrsEncoder
|
||||
{
|
||||
private:
|
||||
// repeated opentelemetry.proto.common.v1.KeyValue attributes = 1;
|
||||
std::vector<SrsOtelAttribute*> attributes_;
|
||||
public:
|
||||
SrsOtelResource();
|
||||
virtual ~SrsOtelResource();
|
||||
public:
|
||||
SrsOtelResource* add_addr(SrsOtelAttribute* v);
|
||||
public:
|
||||
virtual uint64_t nb_bytes();
|
||||
srs_error_t encode(SrsBuffer* b);
|
||||
};
|
||||
|
||||
class SrsOtelAttribute : public ISrsEncoder
|
||||
{
|
||||
private:
|
||||
// string key = 1;
|
||||
std::string key_;
|
||||
// AnyValue value = 2;
|
||||
SrsOtelAnyValue* value_;
|
||||
private:
|
||||
SrsOtelAttribute();
|
||||
public:
|
||||
virtual ~SrsOtelAttribute();
|
||||
public:
|
||||
const std::string& key();
|
||||
public:
|
||||
static SrsOtelAttribute* kv(std::string k, std::string v);
|
||||
static SrsOtelAttribute* kvi(std::string k, int64_t v);
|
||||
public:
|
||||
virtual uint64_t nb_bytes();
|
||||
srs_error_t encode(SrsBuffer* b);
|
||||
};
|
||||
|
||||
class SrsOtelAnyValue : public ISrsEncoder
|
||||
{
|
||||
private:
|
||||
// Default to string(1). Please change it if use other value, such as int(3).
|
||||
uint8_t used_field_id_;
|
||||
private:
|
||||
// string string_value = 1;
|
||||
std::string string_value_;
|
||||
// bool bool_value = 2;
|
||||
// int64 int_value = 3;
|
||||
int64_t int_value_;
|
||||
// double double_value = 4;
|
||||
// ArrayValue array_value = 5;
|
||||
// KeyValueList kvlist_value = 6;
|
||||
// bytes bytes_value = 7;
|
||||
public:
|
||||
SrsOtelAnyValue();
|
||||
virtual ~SrsOtelAnyValue();
|
||||
public:
|
||||
SrsOtelAnyValue* set_string(const std::string& v);
|
||||
SrsOtelAnyValue* set_int(int64_t v);
|
||||
public:
|
||||
virtual uint64_t nb_bytes();
|
||||
srs_error_t encode(SrsBuffer* b);
|
||||
};
|
||||
|
||||
class SrsOtelScopeSpans : public ISrsEncoder
|
||||
{
|
||||
private:
|
||||
// opentelemetry.proto.common.v1.InstrumentationScope scope = 1;
|
||||
SrsOtelScope* scope_;
|
||||
// repeated Span spans = 2;
|
||||
std::vector<SrsOtelSpan*> spans_;
|
||||
public:
|
||||
SrsOtelScopeSpans();
|
||||
virtual ~SrsOtelScopeSpans();
|
||||
public:
|
||||
SrsOtelScope* scope();
|
||||
SrsOtelScopeSpans* swap(std::vector<SrsOtelSpan*>& spans);
|
||||
int size();
|
||||
public:
|
||||
virtual uint64_t nb_bytes();
|
||||
srs_error_t encode(SrsBuffer* b);
|
||||
};
|
||||
|
||||
class SrsOtelScope : public ISrsEncoder
|
||||
{
|
||||
public:
|
||||
//string name = 1;
|
||||
std::string name_;
|
||||
//string version = 2;
|
||||
//repeated KeyValue attributes = 3;
|
||||
//uint32 dropped_attributes_count = 4;
|
||||
public:
|
||||
SrsOtelScope();
|
||||
virtual ~SrsOtelScope();
|
||||
public:
|
||||
virtual uint64_t nb_bytes();
|
||||
srs_error_t encode(SrsBuffer* b);
|
||||
};
|
||||
|
||||
class SrsOtelSpan : public ISrsEncoder
|
||||
{
|
||||
public:
|
||||
//bytes trace_id = 1; // This field is required.
|
||||
std::string trace_id_;
|
||||
//bytes span_id = 2; // This field is required.
|
||||
std::string span_id_;
|
||||
//string trace_state = 3;
|
||||
//bytes parent_span_id = 4;
|
||||
std::string parent_span_id_;
|
||||
//string name = 5; // This field is required.
|
||||
std::string name_;
|
||||
//SpanKind kind = 6;
|
||||
SrsApmKind kind_;
|
||||
//fixed64 start_time_unix_nano = 7;
|
||||
int64_t start_time_unix_nano_;
|
||||
//fixed64 end_time_unix_nano = 8;
|
||||
int64_t end_time_unix_nano_;
|
||||
//repeated opentelemetry.proto.common.v1.KeyValue attributes = 9;
|
||||
std::vector<SrsOtelAttribute*> attributes_;
|
||||
//uint32 dropped_attributes_count = 10;
|
||||
//repeated Event events = 11;
|
||||
std::vector<SrsOtelEvent*> events_;
|
||||
//uint32 dropped_events_count = 12;
|
||||
//repeated Link links = 13;
|
||||
std::vector<SrsOtelLink*> links_;
|
||||
//uint32 dropped_links_count = 14;
|
||||
//Status status = 15;
|
||||
SrsOtelStatus* status_;
|
||||
public:
|
||||
SrsOtelSpan();
|
||||
virtual ~SrsOtelSpan();
|
||||
public:
|
||||
SrsOtelAttribute* attr(const std::string k);
|
||||
public:
|
||||
virtual uint64_t nb_bytes();
|
||||
srs_error_t encode(SrsBuffer* b);
|
||||
};
|
||||
|
||||
class SrsOtelEvent : public ISrsEncoder
|
||||
{
|
||||
public:
|
||||
// fixed64 time_unix_nano = 1;
|
||||
int64_t time_;
|
||||
// string name = 2;
|
||||
std::string name_;
|
||||
// repeated opentelemetry.proto.common.v1.KeyValue attributes = 3;
|
||||
std::vector<SrsOtelAttribute*> attributes_;
|
||||
// uint32 dropped_attributes_count = 4;
|
||||
public:
|
||||
SrsOtelEvent();
|
||||
virtual ~SrsOtelEvent();
|
||||
public:
|
||||
static SrsOtelEvent* create(std::string v);
|
||||
SrsOtelEvent* add_attr(SrsOtelAttribute* v);
|
||||
public:
|
||||
virtual uint64_t nb_bytes();
|
||||
srs_error_t encode(SrsBuffer* b);
|
||||
};
|
||||
|
||||
class SrsOtelLink : public ISrsEncoder
|
||||
{
|
||||
private:
|
||||
// bytes trace_id = 1;
|
||||
std::string trace_id_;
|
||||
// bytes span_id = 2;
|
||||
std::string span_id_;
|
||||
// string trace_state = 3;
|
||||
// repeated opentelemetry.proto.common.v1.KeyValue attributes = 4;
|
||||
// uint32 dropped_attributes_count = 5;
|
||||
private:
|
||||
SrsOtelLink();
|
||||
public:
|
||||
virtual ~SrsOtelLink();
|
||||
public:
|
||||
static SrsOtelLink* create();
|
||||
SrsOtelLink* set_id(const std::string& trace_id, const std::string& span_id);
|
||||
public:
|
||||
virtual uint64_t nb_bytes();
|
||||
srs_error_t encode(SrsBuffer* b);
|
||||
};
|
||||
|
||||
class SrsOtelStatus : public ISrsEncoder
|
||||
{
|
||||
public:
|
||||
//string message = 2;
|
||||
std::string message_;
|
||||
//StatusCode code = 3;
|
||||
SrsApmStatus code_;
|
||||
public:
|
||||
SrsOtelStatus();
|
||||
virtual ~SrsOtelStatus();
|
||||
public:
|
||||
virtual uint64_t nb_bytes();
|
||||
srs_error_t encode(SrsBuffer* b);
|
||||
};
|
||||
|
||||
class SrsApmContext
|
||||
{
|
||||
private:
|
||||
friend class SrsApmSpan;
|
||||
private:
|
||||
SrsApmSpan* span_;
|
||||
SrsApmContext* parent_;
|
||||
std::vector<SrsApmContext*> childs_;
|
||||
// Encode the id in hex string.
|
||||
std::string str_trace_id_;
|
||||
std::string str_span_id_;
|
||||
private:
|
||||
std::string name_;
|
||||
SrsApmKind kind_;
|
||||
std::string trace_id_;
|
||||
std::string span_id_;
|
||||
// Note that parent span id might not be empty, while parent might be NULL, when extract span from propagator.
|
||||
std::string parent_span_id_;
|
||||
srs_utime_t start_time_;
|
||||
SrsApmStatus status_;
|
||||
std::string description_;
|
||||
srs_error_t err_;
|
||||
std::vector<SrsOtelAttribute*> attributes_;
|
||||
std::vector<SrsOtelLink*> links_;
|
||||
private:
|
||||
bool ended_;
|
||||
private:
|
||||
SrsApmContext(const std::string& name);
|
||||
public:
|
||||
virtual ~SrsApmContext();
|
||||
private:
|
||||
// Update the trace id and format it as hex string id.
|
||||
void set_trace_id(std::string v);
|
||||
// Update the span id and format it as hex string id.
|
||||
void set_span_id(std::string v);
|
||||
public:
|
||||
const char* format_trace_id();
|
||||
const char* format_span_id();
|
||||
SrsApmContext* root();
|
||||
void set_parent(SrsApmContext* parent);
|
||||
void set_status(SrsApmStatus status, const std::string& description);
|
||||
bool all_ended();
|
||||
int count_spans();
|
||||
void update_trace_id(std::string v);
|
||||
void link(SrsApmContext* to);
|
||||
void end();
|
||||
};
|
||||
|
||||
class ISrsApmSpan
|
||||
{
|
||||
private:
|
||||
friend class SrsApmClient;
|
||||
public:
|
||||
ISrsApmSpan() {
|
||||
}
|
||||
virtual ~ISrsApmSpan() {
|
||||
}
|
||||
public:
|
||||
// Get the formatted trace ID in hex string.
|
||||
virtual const char* format_trace_id() {
|
||||
return "";
|
||||
}
|
||||
// Get the formatted span ID in hex string.
|
||||
virtual const char* format_span_id() {
|
||||
return "";
|
||||
}
|
||||
public:
|
||||
// Set the name of span.
|
||||
virtual ISrsApmSpan* set_name(const std::string& name) {
|
||||
return this;
|
||||
}
|
||||
// Set the kind of span.
|
||||
virtual ISrsApmSpan* set_kind(SrsApmKind kind) {
|
||||
return this;
|
||||
}
|
||||
// Set span as child span of parent span.
|
||||
virtual ISrsApmSpan* as_child(ISrsApmSpan* parent) {
|
||||
return this;
|
||||
}
|
||||
// Set the status of span, and error description by fmt.
|
||||
// Note that ignore description except for error status.
|
||||
virtual ISrsApmSpan* set_status(SrsApmStatus status, const std::string& description) {
|
||||
return this;
|
||||
}
|
||||
// RecordError will record err as an exception span event for this span. An
|
||||
// additional call to SetStatus is required if the Status of the Span should
|
||||
// be set to Error, as this method does not change the Span status. If this
|
||||
// span is not being recorded or err is nil then this method does nothing.
|
||||
virtual ISrsApmSpan* record_error(srs_error_t err) {
|
||||
return this;
|
||||
}
|
||||
// Add an attribute with all string kv to span.
|
||||
virtual ISrsApmSpan* attr(const std::string& k, const std::string& v) {
|
||||
return this;
|
||||
}
|
||||
// Link with another span.
|
||||
virtual ISrsApmSpan* link(ISrsApmSpan* span) {
|
||||
return this;
|
||||
}
|
||||
// End the span, snapshot and upload(in a while) a otel span to APM server.
|
||||
virtual ISrsApmSpan* end() {
|
||||
return this;
|
||||
}
|
||||
};
|
||||
|
||||
class SrsApmSpan : public ISrsApmSpan
|
||||
{
|
||||
private:
|
||||
friend class SrsApmContext;
|
||||
friend class SrsApmClient;
|
||||
private:
|
||||
bool child_;
|
||||
SrsApmContext* ctx_;
|
||||
private:
|
||||
SrsApmSpan(const std::string& name);
|
||||
public:
|
||||
virtual ~SrsApmSpan();
|
||||
// Span operations.
|
||||
public:
|
||||
const char* format_trace_id();
|
||||
const char* format_span_id();
|
||||
ISrsApmSpan* set_name(const std::string& name);
|
||||
ISrsApmSpan* set_kind(SrsApmKind kind);
|
||||
ISrsApmSpan* as_child(ISrsApmSpan* parent);
|
||||
ISrsApmSpan* set_status(SrsApmStatus status, const std::string& description);
|
||||
ISrsApmSpan* record_error(srs_error_t err);
|
||||
ISrsApmSpan* attr(const std::string& k, const std::string& v);
|
||||
ISrsApmSpan* link(ISrsApmSpan* span);
|
||||
ISrsApmSpan* end();
|
||||
// Inject or extract for propagator.
|
||||
private:
|
||||
ISrsApmSpan* extract(SrsAmf0Object* h);
|
||||
ISrsApmSpan* inject(SrsAmf0Object* h);
|
||||
std::string text_propagator();
|
||||
// Store or load with coroutine context.
|
||||
private:
|
||||
ISrsApmSpan* store();
|
||||
static ISrsApmSpan* load();
|
||||
};
|
||||
|
||||
class SrsApmClient
|
||||
{
|
||||
private:
|
||||
bool enabled_;
|
||||
std::string token_;
|
||||
std::string endpoint_;
|
||||
std::string service_name_;
|
||||
bool debug_logging_;
|
||||
std::vector<SrsOtelSpan*> spans_;
|
||||
public:
|
||||
SrsApmClient();
|
||||
virtual ~SrsApmClient();
|
||||
public:
|
||||
srs_error_t initialize();
|
||||
srs_error_t report();
|
||||
public:
|
||||
// Create a span with specified name.
|
||||
ISrsApmSpan* span(const std::string& name);
|
||||
// Create dummy span for default.
|
||||
ISrsApmSpan* dummy();
|
||||
public:
|
||||
// Extract and inject for propagator.
|
||||
ISrsApmSpan* extract(ISrsApmSpan* v, SrsAmf0Object* h);
|
||||
ISrsApmSpan* inject(ISrsApmSpan* v, SrsAmf0Object* h);
|
||||
// Store the span to coroutine context.
|
||||
void store(ISrsApmSpan* span);
|
||||
// Get or load span from coroutine context.
|
||||
// Note that a dummy span will be returned if no span in coroutine context.
|
||||
// Note that user should never free the returned span.
|
||||
ISrsApmSpan* load();
|
||||
// Internal APIs.
|
||||
public:
|
||||
void snapshot(SrsOtelSpan* span);
|
||||
};
|
||||
|
||||
extern SrsApmClient* _srs_apm;
|
||||
|
||||
#endif
|
||||
|
||||
|
|
|
|||
|
|
@ -441,6 +441,7 @@ srs_error_t srs_global_initialize()
|
|||
|
||||
// Initialize global TencentCloud CLS object.
|
||||
_srs_cls = new SrsClsClient();
|
||||
_srs_apm = new SrsApmClient();
|
||||
|
||||
return err;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue