1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

celldb in-memory mode, stats for actors, perf counters, minor fix in rldp2 (#1164)

* getactorstats query for validator-engine-console

* celldb in-memory mode (--celldb-in-memory option)

* rldp2: bugfix - do not estimate speed while nothing is sent

* add simple ed25519 benchmark

* fix compilation errors of different platforms and move to c++20

* fix some warnings

* turn on TON_USE_ABSEIL for glibc 2.27 nix build

---------

Co-authored-by: birydrad <>
This commit is contained in:
birydrad 2024-09-23 16:34:37 +02:00 committed by GitHub
parent 5f51d3d04f
commit 72020c04c4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
100 changed files with 3407 additions and 359 deletions

View file

@ -3,16 +3,19 @@ cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
#SOURCE SETS
set(TDACTOR_SOURCE
td/actor/core/ActorExecutor.cpp
td/actor/core/ActorTypeStat.cpp
td/actor/core/CpuWorker.cpp
td/actor/core/IoWorker.cpp
td/actor/core/Scheduler.cpp
td/actor/ActorStats.cpp
td/actor/MultiPromise.cpp
td/actor/actor.h
td/actor/ActorId.h
td/actor/ActorOwn.h
td/actor/ActorShared.h
td/actor/ActorStats.h
td/actor/common.h
td/actor/PromiseFuture.h
td/actor/MultiPromise.h
@ -27,6 +30,7 @@ set(TDACTOR_SOURCE
td/actor/core/ActorMessage.h
td/actor/core/ActorSignals.h
td/actor/core/ActorState.h
td/actor/core/ActorTypeStat.h
td/actor/core/CpuWorker.h
td/actor/core/Context.h
td/actor/core/IoWorker.h

View file

@ -0,0 +1,245 @@
#include "ActorStats.h"
#include "td/utils/ThreadSafeCounter.h"
namespace td {
namespace actor {
void td::actor::ActorStats::start_up() {
auto now = td::Time::now();
for (std::size_t i = 0; i < SIZE; i++) {
stat_[i] = td::TimedStat<StatStorer<ActorTypeStats>>(DURATIONS[i], now);
stat_[i].add_event(ActorTypeStats(), now);
}
begin_ts_ = td::Timestamp::now();
begin_ticks_ = Clocks::rdtsc();
loop();
}
double ActorStats::estimate_inv_ticks_per_second() {
auto now = td::Timestamp::now();
auto elapsed_seconds = now.at() - begin_ts_.at();
auto now_ticks = td::Clocks::rdtsc();
auto elapsed_ticks = now_ticks - begin_ticks_;
auto estimated_inv_ticks_per_second =
elapsed_seconds > 0.1 ? elapsed_seconds / double(elapsed_ticks) : Clocks::inv_ticks_per_second();
return estimated_inv_ticks_per_second;
}
std::string ActorStats::prepare_stats() {
auto estimated_inv_ticks_per_second = estimate_inv_ticks_per_second();
auto current_stats = td::actor::ActorTypeStatManager::get_stats(estimated_inv_ticks_per_second);
auto now = td::Timestamp::now();
auto now_ticks = Clocks::rdtsc();
update(now);
// Lets look at recent stats first
auto load_stats = [&](auto &timed_stat) {
auto res = current_stats;
auto &since = timed_stat.get_stat(now.at());
auto duration = since.get_duration(estimated_inv_ticks_per_second);
if (since.first_) {
res -= since.first_.value();
}
res /= duration;
return res.stats;
};
auto stats_10s = load_stats(stat_[0]);
auto stats_10m = load_stats(stat_[1]);
current_stats /= double(now_ticks - begin_ticks_) * estimated_inv_ticks_per_second;
auto stats_forever = current_stats.stats;
std::map<std::string, double> current_perf_map;
std::map<std::string, double> perf_map_10s;
std::map<std::string, double> perf_map_10m;
std::map<std::string, double> perf_values;
td::NamedPerfCounter::get_default().for_each(
[&](td::Slice name, td::int64 value_int64) { perf_values[name.str()] = double(value_int64); });
for (auto &value_it : perf_values) {
const auto &name = value_it.first;
auto value = value_it.second;
auto &perf_stat = pef_stats_[name];
auto load_perf_stats = [&](auto &timed_stat, auto &m) {
double res = double(value);
auto &since = timed_stat.get_stat(now.at());
auto duration = since.get_duration(estimated_inv_ticks_per_second);
if (since.first_) {
res -= since.first_.value();
}
if (td::ends_with(name, ".duration")) {
res *= estimated_inv_ticks_per_second;
}
// m[name + ".raw"] = res;
// m[name + ".range"] = duration;
res /= duration;
return res;
};
perf_map_10s[name] = load_perf_stats(perf_stat.perf_stat_[0], perf_map_10s);
perf_map_10m[name] = load_perf_stats(perf_stat.perf_stat_[1], perf_map_10m);
auto current_duration = (double(now_ticks - begin_ticks_) * estimated_inv_ticks_per_second);
if (td::ends_with(name, ".duration")) {
value *= estimated_inv_ticks_per_second;
}
current_perf_map[name] = double(value) / current_duration;
// current_perf_map[name + ".raw"] = double(value);
// current_perf_map[name + ".range"] = double(now_ticks - begin_ticks_) * estimated_inv_ticks_per_second;
};
td::StringBuilder sb;
sb << "================================= PERF COUNTERS ================================\n";
sb << "ticks_per_second_estimate\t" << 1.0 / estimated_inv_ticks_per_second << "\n";
for (auto &it : perf_map_10s) {
const std::string &name = it.first;
auto dot_at = name.rfind('.');
CHECK(dot_at != std::string::npos);
auto base_name = name.substr(0, dot_at);
auto rest_name = name.substr(dot_at + 1);
td::Slice new_rest_name = rest_name;
if (rest_name == "count") {
new_rest_name = "qps";
}
if (rest_name == "duration") {
new_rest_name = "load";
}
auto rewrite_name = PSTRING() << base_name << "." << new_rest_name;
sb << rewrite_name << "\t" << perf_map_10s[name] << " " << perf_map_10m[name] << " " << current_perf_map[name]
<< "\n";
}
sb << "\n";
sb << "================================= ACTORS STATS =================================\n";
double max_delay = 0;
ActorTypeStat sum_stat_forever;
ActorTypeStat sum_stat_10m;
ActorTypeStat sum_stat_10s;
for (auto &it : stats_forever) {
sum_stat_forever += it.second;
}
for (auto &it : stats_10m) {
sum_stat_10m += it.second;
}
for (auto &it : stats_10s) {
sum_stat_10s += it.second;
}
sb << "\n";
auto do_describe = [&](auto &&sb, const ActorTypeStat &stat_10s, const ActorTypeStat &stat_10m,
const ActorTypeStat &stat_forever) {
sb() << "load_per_second:\t" << stat_10s.seconds << " " << stat_10m.seconds << " " << stat_forever.seconds << "\n";
sb() << "messages_per_second:\t" << stat_10s.messages << " " << stat_10m.messages << " " << stat_forever.messages
<< "\n";
sb() << "max_execute_messages:\t" << stat_forever.max_execute_messages.value_10s << " "
<< stat_forever.max_execute_messages.value_10m << " " << stat_forever.max_execute_messages.value_forever
<< "\n";
sb() << "max_execute_seconds:\t" << stat_forever.max_execute_seconds.value_10s << "s"
<< " " << stat_forever.max_execute_seconds.value_10m << "s"
<< " " << stat_forever.max_execute_seconds.value_forever << "s\n";
sb() << "max_message_seconds:\t" << stat_forever.max_message_seconds.value_10s << " "
<< stat_forever.max_message_seconds.value_10m << " " << stat_forever.max_message_seconds.value_forever << "\n";
sb() << "created_per_second:\t" << stat_10s.created << " " << stat_10m.created << " " << stat_forever.created
<< "\n";
auto executing_for =
stat_forever.executing_start > 1e15
? 0
: double(td::Clocks::rdtsc()) * estimated_inv_ticks_per_second - stat_forever.executing_start;
sb() << "max_delay:\t" << stat_forever.max_delay_seconds.value_10s << "s "
<< stat_forever.max_delay_seconds.value_10m << "s " << stat_forever.max_delay_seconds.value_forever << "s\n";
sb() << ""
<< "alive: " << stat_forever.alive << " executing: " << stat_forever.executing
<< " max_executing_for: " << executing_for << "s\n";
};
auto describe = [&](td::StringBuilder &sb, std::type_index actor_type_index) {
auto stat_10s = stats_10s[actor_type_index];
auto stat_10m = stats_10m[actor_type_index];
auto stat_forever = stats_forever[actor_type_index];
do_describe([&sb]() -> td::StringBuilder & { return sb << "\t\t"; }, stat_10s, stat_10m, stat_forever);
};
sb << "Cummulative stats:\n";
do_describe([&sb]() -> td::StringBuilder & { return sb << "\t"; }, sum_stat_10s, sum_stat_10m, sum_stat_forever);
sb << "\n";
auto top_k_by = [&](auto &stats_map, size_t k, std::string description, auto by) {
auto stats = td::transform(stats_map, [](auto &it) { return std::make_pair(it.first, it.second); });
k = std::min(k, stats.size());
std::partial_sort(stats.begin(), stats.begin() + k, stats.end(), [&](auto &a, auto &b) { return by(a) > by(b); });
bool is_first = true;
for (size_t i = 0; i < k; i++) {
auto value = by(stats[i]);
if (value < 1e-9) {
break;
}
if (is_first) {
sb << "top actors by " << description << "\n";
is_first = false;
}
sb << "\t#" << i << ": " << ActorTypeStatManager::get_class_name(stats[i].first.name()) << "\t" << value << "\n";
}
sb << "\n";
};
using Entry = std::pair<std::type_index, td::actor::ActorTypeStat>;
static auto cutoff = [](auto x, auto min_value) { return x < min_value ? decltype(x){} : x; };
top_k_by(stats_10s, 10, "load_10s", [](auto &x) { return cutoff(x.second.seconds, 0.005); });
top_k_by(stats_10m, 10, "load_10m", [](auto &x) { return cutoff(x.second.seconds, 0.005); });
top_k_by(stats_forever, 10, "max_execute_seconds_10m",
[](Entry &x) { return cutoff(x.second.max_execute_seconds.value_10m, 0.5); });
auto rdtsc_seconds = double(now_ticks) * estimated_inv_ticks_per_second;
top_k_by(stats_forever, 10, "executing_for", [&](Entry &x) {
if (x.second.executing_start > 1e15) {
return 0.0;
}
return rdtsc_seconds - x.second.executing_start;
});
top_k_by(stats_forever, 10, "max_execute_messages_10m",
[](Entry &x) { return cutoff(x.second.max_execute_messages.value_10m, 10u); });
auto stats = td::transform(stats_forever, [](auto &it) { return std::make_pair(it.first, it.second); });
auto main_key = [&](std::type_index actor_type_index) {
auto stat_10s = stats_10s[actor_type_index];
auto stat_10m = stats_10m[actor_type_index];
auto stat_forever = stats_forever[actor_type_index];
return std::make_tuple(cutoff(std::max(stat_10s.seconds, stat_10m.seconds), 0.1),
cutoff(stat_forever.max_execute_seconds.value_10m, 0.5), stat_forever.seconds);
};
std::sort(stats.begin(), stats.end(),
[&](auto &left, auto &right) { return main_key(left.first) > main_key(right.first); });
auto debug = Debug(SchedulerContext::get()->scheduler_group());
debug.dump(sb);
sb << "All actors:\n";
for (auto &it : stats) {
sb << "\t" << ActorTypeStatManager::get_class_name(it.first.name()) << "\n";
auto key = main_key(it.first);
describe(sb, it.first);
}
sb << "\n";
return sb.as_cslice().str();
}
ActorStats::PefStat::PefStat() {
for (std::size_t i = 0; i < SIZE; i++) {
perf_stat_[i] = td::TimedStat<StatStorer<td::int64>>(DURATIONS[i], td::Time::now());
perf_stat_[i].add_event(0, td::Time::now());
}
}
void ActorStats::update(td::Timestamp now) {
auto stat = td::actor::ActorTypeStatManager::get_stats(estimate_inv_ticks_per_second());
for (auto &timed_stat : stat_) {
timed_stat.add_event(stat, now.at());
}
NamedPerfCounter::get_default().for_each([&](td::Slice name, td::int64 value) {
auto &stat = pef_stats_[name.str()].perf_stat_;
for (auto &timed_stat : stat) {
timed_stat.add_event(value, now.at());
}
});
}
constexpr int ActorStats::DURATIONS[SIZE];
constexpr const char *ActorStats::DESCR[SIZE];
} // namespace actor
} // namespace td

View file

@ -0,0 +1,52 @@
#pragma once
#include "td/actor/actor.h"
#include "td/utils/TimedStat.h"
namespace td {
namespace actor {
class ActorStats : public td::actor::Actor {
public:
ActorStats() {
}
void start_up() override;
double estimate_inv_ticks_per_second();
std::string prepare_stats();
private:
template <class T>
struct StatStorer {
void on_event(const T &event) {
if (!first_) {
first_ = event;
first_ts_ = Clocks::rdtsc();
}
}
double get_duration(double inv_ticks_per_second) const {
if (first_) {
return std::max(1.0, (Clocks::rdtsc() - first_ts_) * inv_ticks_per_second);
}
return 1.0;
}
td::optional<T> first_;
td::uint64 first_ts_;
};
static constexpr std::size_t SIZE = 2;
static constexpr const char *DESCR[SIZE] = {"10sec", "10min"};
static constexpr int DURATIONS[SIZE] = {10, 10 * 60};
td::TimedStat<StatStorer<td::actor::ActorTypeStats>> stat_[SIZE];
struct PefStat {
PefStat();
td::TimedStat<StatStorer<td::int64>> perf_stat_[SIZE];
};
std::map<std::string, PefStat> pef_stats_;
td::Timestamp begin_ts_;
td::uint64 begin_ticks_{};
void loop() override {
alarm_timestamp() = td::Timestamp::in(5.0);
update(td::Timestamp::now());
}
void update(td::Timestamp now);
};
} // namespace actor
} // namespace td

View file

@ -19,6 +19,7 @@
#pragma once
#include "td/actor/core/Actor.h"
#include "td/actor/core/ActorSignals.h"
#include "td/actor/core/ActorTypeStat.h"
#include "td/actor/core/SchedulerId.h"
#include "td/actor/core/SchedulerContext.h"
#include "td/actor/core/Scheduler.h"
@ -68,7 +69,7 @@ using core::set_debug;
struct Debug {
public:
Debug() = default;
Debug(std::shared_ptr<core::SchedulerGroupInfo> group_info) : group_info_(std::move(group_info)) {
Debug(core::SchedulerGroupInfo *group_info) : group_info_(group_info) {
}
template <class F>
void for_each(F &&f) {
@ -80,18 +81,29 @@ struct Debug {
}
}
void dump() {
for_each([](core::Debug &debug) {
void dump(td::StringBuilder &sb) {
sb << "list of active actors with names:\n";
for_each([&](core::Debug &debug) {
core::DebugInfo info;
debug.read(info);
if (info.is_active) {
LOG(ERROR) << info.name << " " << td::format::as_time(Time::now() - info.start_at);
sb << "\t\"" << info.name << "\" is active for " << Time::now() - info.start_at << "s\n";
}
});
sb << "\nsizes of cpu local queues:\n";
for (auto &scheduler : group_info_->schedulers) {
for (size_t i = 0; i < scheduler.cpu_threads_count; i++) {
auto size = scheduler.cpu_local_queue[i].size();
if (size != 0) {
sb << "\tcpu#" << i << " queue.size() = " << size << "\n";
}
}
}
sb << "\n";
}
private:
std::shared_ptr<core::SchedulerGroupInfo> group_info_;
core::SchedulerGroupInfo *group_info_;
};
class Scheduler {
@ -142,7 +154,7 @@ class Scheduler {
}
Debug get_debug() {
return Debug{group_info_};
return Debug{group_info_.get()};
}
bool run() {
@ -200,6 +212,10 @@ class Scheduler {
}
};
using core::ActorTypeStat;
using core::ActorTypeStatManager;
using core::ActorTypeStats;
// Some helper functions. Not part of public interface and not part
// of namespace core
namespace detail {
@ -324,7 +340,7 @@ void send_closure_impl(ActorRef actor_ref, ClosureT &&closure) {
}
template <class... ArgsT>
void send_closure(ActorRef actor_ref, ArgsT &&... args) {
void send_closure(ActorRef actor_ref, ArgsT &&...args) {
send_closure_impl(actor_ref, create_immediate_closure(std::forward<ArgsT>(args)...));
}
@ -365,7 +381,7 @@ void send_closure_with_promise_later(ActorRef actor_ref, ClosureT &&closure, Pro
}
template <class... ArgsT>
void send_closure_later(ActorRef actor_ref, ArgsT &&... args) {
void send_closure_later(ActorRef actor_ref, ArgsT &&...args) {
send_closure_later_impl(actor_ref, create_delayed_closure(std::forward<ArgsT>(args)...));
}
@ -396,15 +412,17 @@ inline void send_signals_later(ActorRef actor_ref, ActorSignals signals) {
inline void register_actor_info_ptr(core::ActorInfoPtr actor_info_ptr) {
auto state = actor_info_ptr->state().get_flags_unsafe();
actor_info_ptr->on_add_to_queue();
core::SchedulerContext::get()->add_to_queue(std::move(actor_info_ptr), state.get_scheduler_id(), !state.is_shared());
}
template <class T, class... ArgsT>
core::ActorInfoPtr create_actor(core::ActorOptions &options, ArgsT &&... args) noexcept {
core::ActorInfoPtr create_actor(core::ActorOptions &options, ArgsT &&...args) noexcept {
auto *scheduler_context = core::SchedulerContext::get();
if (!options.has_scheduler()) {
options.on_scheduler(scheduler_context->get_scheduler_id());
}
options.with_actor_stat_id(core::ActorTypeStatImpl::get_unique_id<T>());
auto res =
scheduler_context->get_actor_info_creator().create(std::make_unique<T>(std::forward<ArgsT>(args)...), options);
register_actor_info_ptr(res);

View file

@ -114,6 +114,8 @@ void ActorExecutor::start() noexcept {
actor_execute_context_.set_actor(&actor_info_.actor());
actor_stats_ = actor_info_.actor_type_stat();
auto execute_timer = actor_stats_.create_execute_timer();
while (flush_one_signal(signals)) {
if (actor_execute_context_.has_immediate_flags()) {
return;
@ -175,6 +177,11 @@ void ActorExecutor::finish() noexcept {
if (add_to_queue) {
actor_info_ptr = actor_info_.actor().get_actor_info_ptr();
}
if (!flags().is_closed() && flags().is_in_queue()) {
// Must do it while we are locked, so to it optimistically
// we will add actor to queue after unlock OR we are already in a queue OR we will be closed
actor_info_.on_add_to_queue();
}
if (actor_locker_.try_unlock(flags())) {
if (add_to_queue) {
dispatcher_.add_to_queue(std::move(actor_info_ptr), flags().get_scheduler_id(), !flags().is_shared());
@ -193,23 +200,31 @@ bool ActorExecutor::flush_one_signal(ActorSignals &signals) {
}
switch (signal) {
//NB: Signals will be handled in order of their value.
// For clarity it conincides with order in this switch
// For clarity, it coincides with order in this switch
case ActorSignals::Pause:
actor_execute_context_.set_pause();
break;
case ActorSignals::Kill:
case ActorSignals::Kill: {
auto message_timer = actor_stats_.create_message_timer();
actor_execute_context_.set_stop();
break;
case ActorSignals::StartUp:
}
case ActorSignals::StartUp: {
auto message_timer = actor_stats_.create_message_timer();
actor_stats_.created();
actor_info_.actor().start_up();
break;
case ActorSignals::Wakeup:
}
case ActorSignals::Wakeup: {
auto message_timer = actor_stats_.create_message_timer();
actor_info_.actor().wake_up();
break;
}
case ActorSignals::Alarm:
if (actor_execute_context_.get_alarm_timestamp() && actor_execute_context_.get_alarm_timestamp().is_in_past()) {
actor_execute_context_.alarm_timestamp() = Timestamp::never();
actor_info_.set_alarm_timestamp(Timestamp::never());
auto message_timer = actor_stats_.create_message_timer();
actor_info_.actor().alarm();
}
break;
@ -245,6 +260,7 @@ bool ActorExecutor::flush_one_message() {
}
actor_execute_context_.set_link_token(message.get_link_token());
auto message_timer = actor_stats_.create_message_timer();
message.run();
return true;
}
@ -257,7 +273,9 @@ void ActorExecutor::flush_context_flags() {
}
flags_.set_closed(true);
if (!flags_.get_signals().has_signal(ActorSignals::Signal::StartUp)) {
auto message_timer = actor_stats_.create_message_timer();
actor_info_.actor().tear_down();
actor_stats_.destroyed();
}
actor_info_.destroy_actor();
} else {

View file

@ -24,6 +24,7 @@
#include "td/actor/core/ActorMessage.h"
#include "td/actor/core/ActorSignals.h"
#include "td/actor/core/ActorState.h"
#include "td/actor/core/ActorTypeStat.h"
#include "td/actor/core/SchedulerContext.h"
#include "td/utils/format.h"
@ -95,6 +96,7 @@ class ActorExecutor {
ActorInfo &actor_info_;
SchedulerDispatcher &dispatcher_;
Options options_;
ActorTypeStatRef actor_stats_;
ActorLocker actor_locker_{&actor_info_.state(), ActorLocker::Options()
.with_can_execute_paused(options_.from_queue)
.with_is_shared(!options_.has_poll)

View file

@ -19,6 +19,7 @@
#pragma once
#include "td/actor/core/ActorState.h"
#include "td/actor/core/ActorTypeStat.h"
#include "td/actor/core/ActorMailbox.h"
#include "td/utils/Heap.h"
@ -34,8 +35,8 @@ class ActorInfo;
using ActorInfoPtr = SharedObjectPool<ActorInfo>::Ptr;
class ActorInfo : private HeapNode, private ListNode {
public:
ActorInfo(std::unique_ptr<Actor> actor, ActorState::Flags state_flags, Slice name)
: actor_(std::move(actor)), name_(name.begin(), name.size()) {
ActorInfo(std::unique_ptr<Actor> actor, ActorState::Flags state_flags, Slice name, td::uint32 actor_stat_id)
: actor_(std::move(actor)), name_(name.begin(), name.size()), actor_stat_id_(actor_stat_id) {
state_.set_flags_unsafe(state_flags);
VLOG(actor) << "Create actor [" << name_ << "]";
}
@ -58,6 +59,18 @@ class ActorInfo : private HeapNode, private ListNode {
Actor *actor_ptr() const {
return actor_.get();
}
// NB: must be called only when actor is locked
ActorTypeStatRef actor_type_stat() {
auto res = ActorTypeStatManager::get_actor_type_stat(actor_stat_id_, actor_.get());
if (in_queue_since_) {
res.pop_from_queue(in_queue_since_);
in_queue_since_ = 0;
}
return res;
}
void on_add_to_queue() {
in_queue_since_ = td::Clocks::rdtsc();
}
void destroy_actor() {
actor_.reset();
}
@ -103,6 +116,8 @@ class ActorInfo : private HeapNode, private ListNode {
std::atomic<double> alarm_timestamp_at_{0};
ActorInfoPtr pin_;
td::uint64 in_queue_since_{0};
td::uint32 actor_stat_id_{0};
};
} // namespace core

View file

@ -46,10 +46,16 @@ class ActorInfoCreator {
return *this;
}
Options& with_actor_stat_id(td::uint32 new_id) {
actor_stat_id = new_id;
return *this;
}
private:
friend class ActorInfoCreator;
Slice name;
SchedulerId scheduler_id;
td::uint32 actor_stat_id{0};
bool is_shared{true};
bool in_queue{true};
//TODO: rename
@ -65,7 +71,7 @@ class ActorInfoCreator {
flags.set_in_queue(args.in_queue);
flags.set_signals(ActorSignals::one(ActorSignals::StartUp));
auto actor_info_ptr = pool_.alloc(std::move(actor), flags, args.name);
auto actor_info_ptr = pool_.alloc(std::move(actor), flags, args.name, args.actor_stat_id);
actor_info_ptr->actor().set_actor_info_ptr(actor_info_ptr);
return actor_info_ptr;
}

View file

@ -0,0 +1,124 @@
#include "td/actor/core/Actor.h"
#include "td/actor/core/ActorTypeStat.h"
#include "td/actor/core/Scheduler.h"
#include "td/utils/port/thread_local.h"
#include <set>
#include <map>
#include <mutex>
#include <typeindex>
#include <typeinfo>
#include <optional>
#ifdef __has_include
#if __has_include(<cxxabi.h>)
#include <cxxabi.h>
#define CXXABI_AVAILABLE 1
#else
#define CXXABI_AVAILABLE 0
#endif
#else
#define CXXABI_AVAILABLE 0
#endif
namespace td {
namespace actor {
namespace core {
class ActorTypeStatRef;
struct ActorTypeStatsTlsEntry {
struct Entry {
std::unique_ptr<ActorTypeStatImpl> stat;
std::optional<std::type_index> o_type_index;
};
std::vector<Entry> by_id;
std::mutex mutex;
template <class F>
void foreach_entry(F &&f) {
std::lock_guard<std::mutex> guard(mutex);
for (auto &entry : by_id) {
f(entry);
}
}
ActorTypeStatRef get_actor_type_stat(td::uint32 id, Actor &actor) {
if (id >= by_id.size()) {
std::lock_guard<std::mutex> guard(mutex);
by_id.resize(id + 1);
}
auto &entry = by_id.at(id);
if (!entry.o_type_index) {
std::lock_guard<std::mutex> guard(mutex);
entry.o_type_index = std::type_index(typeid(actor));
entry.stat = std::make_unique<ActorTypeStatImpl>();
}
return ActorTypeStatRef{entry.stat.get()};
}
};
struct ActorTypeStatsRegistry {
std::mutex mutex;
std::vector<std::shared_ptr<ActorTypeStatsTlsEntry>> entries;
void registry_entry(std::shared_ptr<ActorTypeStatsTlsEntry> entry) {
std::lock_guard<std::mutex> guard(mutex);
entries.push_back(std::move(entry));
}
template <class F>
void foreach_entry(F &&f) {
std::lock_guard<std::mutex> guard(mutex);
for (auto &entry : entries) {
f(*entry);
}
}
};
ActorTypeStatsRegistry registry;
struct ActorTypeStatsTlsEntryRef {
ActorTypeStatsTlsEntryRef() {
entry_ = std::make_shared<ActorTypeStatsTlsEntry>();
registry.registry_entry(entry_);
}
std::shared_ptr<ActorTypeStatsTlsEntry> entry_;
};
static TD_THREAD_LOCAL ActorTypeStatsTlsEntryRef *actor_type_stats_tls_entry = nullptr;
ActorTypeStatRef ActorTypeStatManager::get_actor_type_stat(td::uint32 id, Actor *actor) {
if (!actor || !need_debug()) {
return ActorTypeStatRef{nullptr};
}
td::init_thread_local<ActorTypeStatsTlsEntryRef>(actor_type_stats_tls_entry);
ActorTypeStatsTlsEntry &tls_entry = *actor_type_stats_tls_entry->entry_;
return tls_entry.get_actor_type_stat(id, *actor);
}
std::string ActorTypeStatManager::get_class_name(const char *name) {
#if CXXABI_AVAILABLE
int status;
char *real_name = abi::__cxa_demangle(name, nullptr, nullptr, &status);
if (status < 0) {
return name;
}
std::string result = real_name;
std::free(real_name);
return result;
#else
return name;
#endif
}
ActorTypeStats ActorTypeStatManager::get_stats(double inv_ticks_per_second) {
std::map<std::type_index, ActorTypeStat> stats;
registry.foreach_entry([&](ActorTypeStatsTlsEntry &tls_entry) {
tls_entry.foreach_entry([&](ActorTypeStatsTlsEntry::Entry &entry) {
if (entry.o_type_index) {
stats[entry.o_type_index.value()] += entry.stat->to_stat(inv_ticks_per_second);
}
});
});
return ActorTypeStats{.stats = std::move(stats)};
}
} // namespace core
} // namespace actor
} // namespace td

View file

@ -0,0 +1,395 @@
#pragma once
#include "td/utils/int_types.h"
#include "td/utils/port/Clocks.h"
#include <algorithm>
#include <typeindex>
#include <map>
namespace td {
namespace actor {
namespace core {
class Actor;
struct ActorTypeStat {
// diff (speed)
double created{0};
double executions{0};
double messages{0};
double seconds{0};
// current statistics
td::int64 alive{0};
td::int32 executing{0};
double executing_start{1e20};
// max statistics (TODO: recent_max)
template <class T>
struct MaxStatGroup {
T value_forever{};
T value_10s{};
T value_10m{};
MaxStatGroup &operator+=(const MaxStatGroup<T> &other) {
value_forever = std::max(value_forever, other.value_forever);
value_10s = std::max(value_10s, other.value_10s);
value_10m = std::max(value_10m, other.value_10m);
return *this;
}
};
MaxStatGroup<td::uint32> max_execute_messages;
MaxStatGroup<double> max_message_seconds;
MaxStatGroup<double> max_execute_seconds;
MaxStatGroup<double> max_delay_seconds;
ActorTypeStat &operator+=(const ActorTypeStat &other) {
created += other.created;
executions += other.executions;
messages += other.messages;
seconds += other.seconds;
alive += other.alive;
executing += other.executing;
executing_start = std::min(other.executing_start, executing_start);
max_execute_messages += other.max_execute_messages;
max_message_seconds += other.max_message_seconds;
max_execute_seconds += other.max_execute_seconds;
max_delay_seconds += other.max_delay_seconds;
return *this;
}
ActorTypeStat &operator-=(const ActorTypeStat &other) {
created -= other.created;
executions -= other.executions;
messages -= other.messages;
seconds -= other.seconds;
return *this;
}
ActorTypeStat &operator/=(double t) {
if (t > 1e-2) {
created /= t;
executions /= t;
messages /= t;
seconds /= t;
} else {
created = 0;
executions = 0;
messages = 0;
seconds = 0;
}
return *this;
}
};
struct ActorTypeStatImpl {
public:
ActorTypeStatImpl() {
}
class MessageTimer {
public:
MessageTimer(ActorTypeStatImpl *stat, td::uint64 started_at = Clocks::rdtsc())
: stat_(stat), started_at_(started_at) {
}
MessageTimer(const MessageTimer &) = delete;
MessageTimer(MessageTimer &&) = delete;
MessageTimer &operator=(const MessageTimer &) = delete;
MessageTimer &operator=(MessageTimer &&) = delete;
~MessageTimer() {
if (stat_) {
auto ts = td::Clocks::rdtsc();
stat_->message_finish(ts, ts - started_at_);
}
}
private:
ActorTypeStatImpl *stat_;
td::uint64 started_at_;
};
void created() {
inc(total_created_);
inc(alive_);
}
void destroyed() {
dec(alive_);
}
MessageTimer create_run_timer() {
return MessageTimer{this};
}
void message_finish(td::uint64 ts, td::uint64 ticks) {
inc(total_messages_);
inc(execute_messages_);
add(total_ticks_, ticks);
max_message_ticks_.update(ts, ticks);
}
void on_delay(td::uint64 ts, td::uint64 ticks) {
max_delay_ticks_.update(ts, ticks);
}
void execute_start(td::uint64 ts) {
// TODO: this is mostly protection for recursive actor calls, which curretly should be almost impossible
// But too full handle it, one would use one executing_cnt per thread, so only upper level execution is counted
if (inc(executing_) == 1) {
store(execute_start_, ts);
store(execute_messages_, 0);
}
}
void execute_finish(td::uint64 ts) {
CHECK(executing_ > 0);
if (dec(executing_) == 0) {
max_execute_messages_.update(ts, load(execute_messages_));
max_execute_ticks_.update(ts, ts - load(execute_start_));
inc(total_executions_);
store(execute_start_, 0);
store(execute_messages_, 0);
}
}
template <class T>
static td::uint32 get_unique_id() {
static td::uint32 value = get_next_unique_id();
return value;
}
static td::uint32 get_next_unique_id() {
static std::atomic<td::uint32> next_id_{};
return ++next_id_;
}
ActorTypeStat to_stat(double inv_ticks_per_second) const {
auto execute_start_copy = load(execute_start_);
auto actual_total_ticks = load(total_ticks_);
auto ts = Clocks::rdtsc();
if (execute_start_copy != 0) {
actual_total_ticks += ts - execute_start_copy;
}
auto execute_start = ticks_to_seconds(load(execute_start_), inv_ticks_per_second);
return ActorTypeStat{.created = double(load(total_created_)),
.executions = double(load(total_executions_)),
.messages = double(load(total_messages_)),
.seconds = ticks_to_seconds(actual_total_ticks, inv_ticks_per_second),
.alive = load(alive_),
.executing = load(executing_),
.executing_start = execute_start < 1e-9 ? 1e20 : execute_start,
.max_execute_messages = load(max_execute_messages_),
.max_message_seconds = load_seconds(max_message_ticks_, inv_ticks_per_second),
.max_execute_seconds = load_seconds(max_execute_ticks_, inv_ticks_per_second),
.max_delay_seconds = load_seconds(max_delay_ticks_, inv_ticks_per_second)};
}
private:
static double ticks_to_seconds(td::uint64 ticks, double inv_tick_per_second) {
return double(ticks) * inv_tick_per_second;
}
template <class T>
static T load(const std::atomic<T> &a) {
return a.load(std::memory_order_relaxed);
}
template <class T, class S>
static void store(std::atomic<T> &a, S value) {
a.store(value, std::memory_order_relaxed);
}
template <class T, class S>
static T add(std::atomic<T> &a, S value) {
T new_value = load(a) + value;
store(a, new_value);
return new_value;
}
template <class T>
static T inc(std::atomic<T> &a) {
return add(a, 1);
}
template <class T>
static T dec(std::atomic<T> &a) {
return add(a, -1);
}
template <class T>
static void relax_max(std::atomic<T> &a, T value) {
auto old_value = load(a);
if (value > old_value) {
store(a, value);
}
}
template <class ValueT, int Interval>
class MaxCounter {
alignas(64) std::atomic<ValueT> max_values[2] = {0};
std::atomic<td::uint64> last_update_segment_time = 0;
void update_current_segment(uint64 current_segment_time, uint64 segment_difference) {
if (segment_difference >= 2) {
store(max_values[0], 0);
store(max_values[1], 0);
} else if (segment_difference == 1) {
store(max_values[1 - (current_segment_time & 1)], 0);
}
store(last_update_segment_time, current_segment_time);
}
public:
inline void update(td::uint64 rdtsc, ValueT value) {
auto current_segment_time = rdtsc / (Clocks::rdtsc_frequency() * Interval);
auto segment_difference = current_segment_time - last_update_segment_time;
if (unlikely(segment_difference != 0)) {
update_current_segment(current_segment_time, segment_difference);
}
relax_max(max_values[current_segment_time & 1], value);
}
inline ValueT get_max(uint64_t rdtsc) const {
uint64_t current_segment_time = rdtsc / (Clocks::rdtsc_frequency() * Interval);
uint64_t segment_difference = current_segment_time - load(last_update_segment_time);
if (segment_difference >= 2) {
return 0;
} else if (segment_difference == 1) {
return load(max_values[current_segment_time & 1]);
} else {
return std::max(load(max_values[0]), load(max_values[1]));
}
}
};
template <class T>
struct MaxCounterGroup {
std::atomic<T> max_forever{};
MaxCounter<T, 60 * 10> max_10m;
MaxCounter<T, 10> max_10s;
inline void update(td::uint64 rdtsc, T value) {
relax_max(max_forever, value);
max_10m.update(rdtsc, value);
max_10s.update(rdtsc, value);
}
};
template <class T>
static ActorTypeStat::MaxStatGroup<T> load(const MaxCounterGroup<T> &src) {
auto ts = Clocks::rdtsc();
return {.value_forever = load(src.max_forever),
.value_10s = src.max_10s.get_max(ts),
.value_10m = src.max_10m.get_max(ts)};
}
template <class T>
static ActorTypeStat::MaxStatGroup<double> load_seconds(const MaxCounterGroup<T> &src, double inv_ticks_per_second) {
auto ts = Clocks::rdtsc();
return {.value_forever = ticks_to_seconds(load(src.max_forever), inv_ticks_per_second),
.value_10s = ticks_to_seconds(src.max_10s.get_max(ts), inv_ticks_per_second),
.value_10m = ticks_to_seconds(src.max_10m.get_max(ts), inv_ticks_per_second)};
}
// total (increment only statistics)
std::atomic<td::int64> total_created_{0};
std::atomic<td::uint64> total_executions_{0};
std::atomic<td::uint64> total_messages_{0};
std::atomic<td::uint64> total_ticks_{0};
// current statistics
std::atomic<td::int64> alive_{0};
std::atomic<td::int32> executing_{0};
// max statistics (TODO: recent_max)
MaxCounterGroup<td::uint32> max_execute_messages_;
MaxCounterGroup<td::uint64> max_message_ticks_;
MaxCounterGroup<td::uint64> max_execute_ticks_;
MaxCounterGroup<td::uint64> max_delay_ticks_;
// execute state
std::atomic<td::uint64> execute_start_{0};
std::atomic<td::uint32> execute_messages_{0};
};
class ActorTypeStatRef {
public:
ActorTypeStatImpl *ref_{nullptr};
void created() {
if (!ref_) {
return;
}
ref_->created();
}
void destroyed() {
if (!ref_) {
return;
}
ref_->destroyed();
}
void pop_from_queue(td::uint64 in_queue_since) {
if (!ref_) {
return;
}
CHECK(in_queue_since);
auto ts = td::Clocks::rdtsc();
ref_->on_delay(ts, ts - in_queue_since);
}
void start_execute() {
if (!ref_) {
return;
}
ref_->execute_start(td::Clocks::rdtsc());
}
void finish_execute() {
if (!ref_) {
return;
}
ref_->execute_finish(td::Clocks::rdtsc());
}
ActorTypeStatImpl::MessageTimer create_message_timer() {
if (!ref_) {
return ActorTypeStatImpl::MessageTimer{nullptr, 0};
}
return ActorTypeStatImpl::MessageTimer{ref_};
}
struct ExecuteTimer {
ExecuteTimer() = delete;
ExecuteTimer(const ExecuteTimer &) = delete;
ExecuteTimer(ExecuteTimer &&) = delete;
ExecuteTimer &operator=(const ExecuteTimer &) = delete;
ExecuteTimer &operator=(ExecuteTimer &&) = delete;
ExecuteTimer(ActorTypeStatRef *stat) : stat(stat) {
stat->start_execute();
}
ActorTypeStatRef *stat{};
~ExecuteTimer() {
stat->finish_execute();
}
};
ExecuteTimer create_execute_timer() {
return ExecuteTimer(this);
}
};
// TODO: currently it is implemented via TD_THREAD_LOCAL, so the statistics is global across different schedulers
struct ActorTypeStats {
std::map<std::type_index, ActorTypeStat> stats;
ActorTypeStats &operator-=(const ActorTypeStats &other) {
for (auto &it : other.stats) {
stats.at(it.first) -= it.second;
}
return *this;
}
ActorTypeStats &operator/=(double x) {
for (auto &it : stats) {
it.second /= x;
}
return *this;
}
};
class ActorTypeStatManager {
public:
static ActorTypeStatRef get_actor_type_stat(td::uint32 id, Actor *actor);
static ActorTypeStats get_stats(double inv_ticks_per_second);
static std::string get_class_name(const char *name);
};
} // namespace core
} // namespace actor
} // namespace td

View file

@ -130,27 +130,20 @@ struct LocalQueue {
public:
template <class F>
bool push(T value, F &&overflow_f) {
auto res = std::move(next_);
next_ = std::move(value);
if (res) {
queue_.local_push(res.unwrap(), overflow_f);
return true;
}
return false;
}
bool try_pop(T &message) {
if (!next_) {
return queue_.local_pop(message);
}
message = next_.unwrap();
queue_.local_push(std::move(value), overflow_f);
return true;
}
bool steal(T &message, LocalQueue<T> &other) {
bool try_pop(T &message) {
return queue_.local_pop(message);
}
bool steal(T &message, LocalQueue &other) {
return queue_.steal(message, other.queue_);
}
size_t size() const {
return queue_.size();
}
private:
td::optional<T> next_;
StealingQueue<T> queue_;
char pad[TD_CONCURRENCY_PAD - sizeof(optional<T>)];
};
@ -267,11 +260,12 @@ class Scheduler {
bool is_stop_requested() override;
void stop() override;
private:
SchedulerGroupInfo *scheduler_group() const {
SchedulerGroupInfo *scheduler_group() const override {
return scheduler_group_;
}
private:
ActorInfoCreator *creator_;
SchedulerId scheduler_id_;
CpuWorkerId cpu_worker_id_;

View file

@ -38,6 +38,7 @@ class SchedulerDispatcher {
};
struct Debug;
struct SchedulerGroupInfo;
class SchedulerContext : public Context<SchedulerContext>, public SchedulerDispatcher {
public:
virtual ~SchedulerContext() = default;
@ -59,6 +60,7 @@ class SchedulerContext : public Context<SchedulerContext>, public SchedulerDispa
// Debug
virtual Debug &get_debug() = 0;
virtual SchedulerGroupInfo *scheduler_group() const = 0;
};
} // namespace core
} // namespace actor

View file

@ -19,15 +19,20 @@
#include "td/actor/core/ActorLocker.h"
#include "td/actor/actor.h"
#include "td/actor/PromiseFuture.h"
#include "td/actor/ActorStats.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/port/thread.h"
#include "td/utils/port/thread.h"
#include "td/utils/Random.h"
#include "td/utils/Slice.h"
#include "td/utils/StringBuilder.h"
#include "td/utils/tests.h"
#include "td/utils/Time.h"
#include "td/utils/TimedStat.h"
#include "td/utils/port/sleep.h"
#include <array>
#include <atomic>
@ -1102,4 +1107,59 @@ TEST(Actor2, send_vs_close2) {
scheduler.run();
}
}
TEST(Actor2, test_stats) {
Scheduler scheduler({8});
td::actor::set_debug(true);
auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
scheduler.run_in_context([watcher = std::move(watcher)] {
class SleepWorker : public Actor {
void loop() override {
// 0.8 load
td::usleep_for(800000);
alarm_timestamp() = td::Timestamp::in(0.2);
}
};
class QueueWorker : public Actor {
void loop() override {
for (int i = 0; i < 20; i++) {
send_closure(actor_id(this), &QueueWorker::ping);
}
alarm_timestamp() = td::Timestamp::in(1.0);
}
void ping() {
}
};
class Master : public Actor {
public:
Master(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
}
void start_up() override {
alarm_timestamp() = td::Timestamp::in(1);
stats_ = td::actor::create_actor<ActorStats>("actor_stats");
td::actor::create_actor<SleepWorker>("sleep_worker").release();
td::actor::create_actor<QueueWorker>("queue_worker").release();
}
void alarm() override {
td::actor::send_closure(stats_, &ActorStats::prepare_stats, td::promise_send_closure(actor_id(this), &Master::on_stats));
alarm_timestamp() = td::Timestamp::in(5);
}
void on_stats(td::Result<std::string> r_stats) {
LOG(ERROR) << "\n" << r_stats.ok();
if (--cnt_ == 0) {
stop();
}
}
private:
std::shared_ptr<td::Destructor> watcher_;
td::actor::ActorOwn<ActorStats> stats_;
int cnt_={2};
};
td::actor::create_actor<Master>("Master", watcher).release();
});
scheduler.run();
}
#endif //!TD_THREAD_UNSUPPORTED