1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

getactorstats query for validator-engine-console

This commit is contained in:
birydrad 2024-09-03 11:24:59 +02:00
parent e55c132178
commit 420029b056
39 changed files with 1223 additions and 51 deletions

View file

@ -27,6 +27,8 @@
#include "catchain-receiver.hpp"
#include "td/utils/ThreadSafeCounter.h"
namespace ton {
namespace catchain {
@ -685,6 +687,7 @@ void CatChainReceiverImpl::receive_query_from_overlay(adnl::AdnlNodeIdShort src,
promise.set_error(td::Status::Error(ErrorCode::notready, "db not read"));
return;
}
TD_PERF_COUNTER(catchain_query_process);
td::PerfWarningTimer t{"catchain query process", 0.05};
auto F = fetch_tl_object<ton_api::Function>(data.clone(), true);
if (F.is_error()) {

View file

@ -2860,6 +2860,8 @@ td::Status Transaction::check_state_limits(const SizeLimitsConfig& size_limits,
vm::CellStorageStat storage_stat;
storage_stat.limit_cells = size_limits.max_acc_state_cells;
storage_stat.limit_bits = size_limits.max_acc_state_bits;
{
TD_PERF_COUNTER(transaction_storage_stat_a);
td::Timer timer;
auto add_used_storage = [&](const td::Ref<vm::Cell>& cell) -> td::Status {
if (cell.not_null()) {
@ -2876,6 +2878,8 @@ td::Status Transaction::check_state_limits(const SizeLimitsConfig& size_limits,
if (timer.elapsed() > 0.1) {
LOG(INFO) << "Compute used storage took " << timer.elapsed() << "s";
}
}
if (acc_status == Account::acc_active) {
storage_stat.clear_limit();
} else {
@ -3156,6 +3160,7 @@ bool Transaction::compute_state() {
if (new_stats) {
stats = new_stats.unwrap();
} else {
TD_PERF_COUNTER(transaction_storage_stat_b);
td::Timer timer;
stats.add_used_storage(Ref<vm::Cell>(storage)).ensure();
if (timer.elapsed() > 0.1) {

View file

@ -33,6 +33,7 @@ class RefcntCellStorer {
template <class StorerT>
void store(StorerT &storer) const {
TD_PERF_COUNTER(cell_store);
using td::store;
if (as_boc_) {
td::int32 tag = -1;
@ -151,6 +152,7 @@ CellLoader::CellLoader(std::shared_ptr<KeyValueReader> reader, std::function<voi
td::Result<CellLoader::LoadResult> CellLoader::load(td::Slice hash, bool need_data, ExtCellCreator &ext_cell_creator) {
//LOG(ERROR) << "Storage: load cell " << hash.size() << " " << td::base64_encode(hash);
TD_PERF_COUNTER(cell_load);
LoadResult res;
std::string serialized;
TRY_RESULT(get_status, reader_->get(hash, serialized));

View file

@ -65,4 +65,4 @@ if (USE_EMSCRIPTEN)
target_compile_options(emulator-emscripten PRIVATE -fexceptions)
endif()
install(TARGETS emulator LIBRARY DESTINATION lib)
install(TARGETS emulator ARCHIVE DESTINATION lib LIBRARY DESTINATION lib)

View file

@ -3,16 +3,19 @@ cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
#SOURCE SETS
set(TDACTOR_SOURCE
td/actor/core/ActorExecutor.cpp
td/actor/core/ActorTypeStat.cpp
td/actor/core/CpuWorker.cpp
td/actor/core/IoWorker.cpp
td/actor/core/Scheduler.cpp
td/actor/ActorStats.cpp
td/actor/MultiPromise.cpp
td/actor/actor.h
td/actor/ActorId.h
td/actor/ActorOwn.h
td/actor/ActorShared.h
td/actor/ActorStats.h
td/actor/common.h
td/actor/PromiseFuture.h
td/actor/MultiPromise.h
@ -27,6 +30,7 @@ set(TDACTOR_SOURCE
td/actor/core/ActorMessage.h
td/actor/core/ActorSignals.h
td/actor/core/ActorState.h
td/actor/core/ActorTypeStat.h
td/actor/core/CpuWorker.h
td/actor/core/Context.h
td/actor/core/IoWorker.h

View file

@ -0,0 +1,245 @@
#include "ActorStats.h"
#include "td/utils/ThreadSafeCounter.h"
namespace td {
namespace actor {
void td::actor::ActorStats::start_up() {
auto now = td::Time::now();
for (std::size_t i = 0; i < SIZE; i++) {
stat_[i] = td::TimedStat<StatStorer<ActorTypeStats>>(DURATIONS[i], now);
stat_[i].add_event(ActorTypeStats(), now);
}
begin_ts_ = td::Timestamp::now();
begin_ticks_ = Clocks::rdtsc();
loop();
}
double ActorStats::estimate_inv_ticks_per_second() {
auto now = td::Timestamp::now();
auto elapsed_seconds = now.at() - begin_ts_.at();
auto now_ticks = td::Clocks::rdtsc();
auto elapsed_ticks = now_ticks - begin_ticks_;
auto estimated_inv_ticks_per_second =
elapsed_seconds > 0.1 ? elapsed_seconds / double(elapsed_ticks) : Clocks::inv_ticks_per_second();
return estimated_inv_ticks_per_second;
}
std::string ActorStats::prepare_stats() {
auto estimated_inv_ticks_per_second = estimate_inv_ticks_per_second();
auto current_stats = td::actor::ActorTypeStatManager::get_stats(estimated_inv_ticks_per_second);
auto now = td::Timestamp::now();
auto now_ticks = Clocks::rdtsc();
update(now);
// Lets look at recent stats first
auto load_stats = [&](auto &timed_stat) {
auto res = current_stats;
auto &since = timed_stat.get_stat(now.at());
auto duration = since.get_duration(estimated_inv_ticks_per_second);
if (since.first_) {
res -= since.first_.value();
}
res /= duration;
return res.stats;
};
auto stats_10s = load_stats(stat_[0]);
auto stats_10m = load_stats(stat_[1]);
current_stats /= double(now_ticks - begin_ticks_) * estimated_inv_ticks_per_second;
auto stats_forever = current_stats.stats;
std::map<std::string, double> current_perf_map;
std::map<std::string, double> perf_map_10s;
std::map<std::string, double> perf_map_10m;
std::map<std::string, double> perf_values;
td::NamedPerfCounter::get_default().for_each(
[&](td::Slice name, td::int64 value_int64) { perf_values[name.str()] = double(value_int64); });
for (auto &value_it : perf_values) {
const auto &name = value_it.first;
auto value = value_it.second;
auto &perf_stat = pef_stats_[name];
auto load_perf_stats = [&](auto &timed_stat, auto &m) {
double res = double(value);
auto &since = timed_stat.get_stat(now.at());
auto duration = since.get_duration(estimated_inv_ticks_per_second);
if (since.first_) {
res -= since.first_.value();
}
if (td::ends_with(name, ".duration")) {
res *= estimated_inv_ticks_per_second;
}
// m[name + ".raw"] = res;
// m[name + ".range"] = duration;
res /= duration;
return res;
};
perf_map_10s[name] = load_perf_stats(perf_stat.perf_stat_[0], perf_map_10s);
perf_map_10m[name] = load_perf_stats(perf_stat.perf_stat_[1], perf_map_10m);
auto current_duration = (double(now_ticks - begin_ticks_) * estimated_inv_ticks_per_second);
if (td::ends_with(name, ".duration")) {
value *= estimated_inv_ticks_per_second;
}
current_perf_map[name] = double(value) / current_duration;
// current_perf_map[name + ".raw"] = double(value);
// current_perf_map[name + ".range"] = double(now_ticks - begin_ticks_) * estimated_inv_ticks_per_second;
};
td::StringBuilder sb;
sb << "================================= PERF COUNTERS ================================\n";
sb << "ticks_per_second_estimate\t" << 1.0 / estimated_inv_ticks_per_second << "\n";
for (auto &it : perf_map_10s) {
const std::string &name = it.first;
auto dot_at = name.rfind('.');
CHECK(dot_at != std::string::npos);
auto base_name = name.substr(0, dot_at);
auto rest_name = name.substr(dot_at + 1);
td::Slice new_rest_name = rest_name;
if (rest_name == "count") {
new_rest_name = "qps";
}
if (rest_name == "duration") {
new_rest_name = "load";
}
auto rewrite_name = PSTRING() << base_name << "." << new_rest_name;
sb << rewrite_name << "\t" << perf_map_10s[name] << " " << perf_map_10m[name] << " " << current_perf_map[name]
<< "\n";
}
sb << "\n";
sb << "================================= ACTORS STATS =================================\n";
double max_delay = 0;
ActorTypeStat sum_stat_forever;
ActorTypeStat sum_stat_10m;
ActorTypeStat sum_stat_10s;
for (auto &it : stats_forever) {
sum_stat_forever += it.second;
}
for (auto &it : stats_10m) {
sum_stat_10m += it.second;
}
for (auto &it : stats_10s) {
sum_stat_10s += it.second;
}
sb << "\n";
auto do_describe = [&](auto &&sb, const ActorTypeStat &stat_10s, const ActorTypeStat &stat_10m,
const ActorTypeStat &stat_forever) {
sb() << "load_per_second:\t" << stat_10s.seconds << " " << stat_10m.seconds << " " << stat_forever.seconds << "\n";
sb() << "messages_per_second:\t" << stat_10s.messages << " " << stat_10m.messages << " " << stat_forever.messages
<< "\n";
sb() << "max_execute_messages:\t" << stat_forever.max_execute_messages.value_10s << " "
<< stat_forever.max_execute_messages.value_10m << " " << stat_forever.max_execute_messages.value_forever
<< "\n";
sb() << "max_execute_seconds:\t" << stat_forever.max_execute_seconds.value_10s << "s"
<< " " << stat_forever.max_execute_seconds.value_10m << "s"
<< " " << stat_forever.max_execute_seconds.value_forever << "s\n";
sb() << "max_message_seconds:\t" << stat_forever.max_message_seconds.value_10s << " "
<< stat_forever.max_message_seconds.value_10m << " " << stat_forever.max_message_seconds.value_forever << "\n";
sb() << "created_per_second:\t" << stat_10s.created << " " << stat_10m.created << " " << stat_forever.created
<< "\n";
auto executing_for =
stat_forever.executing_start > 1e15
? 0
: double(td::Clocks::rdtsc()) * estimated_inv_ticks_per_second - stat_forever.executing_start;
sb() << "max_delay:\t" << stat_forever.max_delay_seconds.value_10s << "s "
<< stat_forever.max_delay_seconds.value_10m << "s " << stat_forever.max_delay_seconds.value_forever << "s\n";
sb() << ""
<< "alive: " << stat_forever.alive << " executing: " << stat_forever.executing
<< " max_executing_for: " << executing_for << "s\n";
};
auto describe = [&](td::StringBuilder &sb, std::type_index actor_type_index) {
auto stat_10s = stats_10s[actor_type_index];
auto stat_10m = stats_10m[actor_type_index];
auto stat_forever = stats_forever[actor_type_index];
do_describe([&sb]() -> td::StringBuilder & { return sb << "\t\t"; }, stat_10s, stat_10m, stat_forever);
};
sb << "Cummulative stats:\n";
do_describe([&sb]() -> td::StringBuilder & { return sb << "\t"; }, sum_stat_10s, sum_stat_10m, sum_stat_forever);
sb << "\n";
auto top_k_by = [&](auto &stats_map, size_t k, std::string description, auto by) {
auto stats = td::transform(stats_map, [](auto &it) { return std::make_pair(it.first, it.second); });
k = std::min(k, stats.size());
std::partial_sort(stats.begin(), stats.begin() + k, stats.end(), [&](auto &a, auto &b) { return by(a) > by(b); });
bool is_first = true;
for (size_t i = 0; i < k; i++) {
auto value = by(stats[i]);
if (value < 1e-9) {
break;
}
if (is_first) {
sb << "top actors by " << description << "\n";
is_first = false;
}
sb << "\t#" << i << ": " << ActorTypeStatManager::get_class_name(stats[i].first.name()) << "\t" << value << "\n";
}
sb << "\n";
};
using Entry = std::pair<std::type_index, td::actor::ActorTypeStat>;
static auto cutoff = [](auto x, auto min_value) { return x < min_value ? decltype(x){} : x; };
top_k_by(stats_10s, 10, "load_10s", [](auto &x) { return cutoff(x.second.seconds, 0.005); });
top_k_by(stats_10m, 10, "load_10m", [](auto &x) { return cutoff(x.second.seconds, 0.005); });
top_k_by(stats_forever, 10, "max_execute_seconds_10m",
[](Entry &x) { return cutoff(x.second.max_execute_seconds.value_10m, 0.5); });
auto rdtsc_seconds = double(now_ticks) * estimated_inv_ticks_per_second;
top_k_by(stats_forever, 10, "executing_for", [&](Entry &x) {
if (x.second.executing_start > 1e15) {
return 0.0;
}
return rdtsc_seconds - x.second.executing_start;
});
top_k_by(stats_forever, 10, "max_execute_messages_10m",
[](Entry &x) { return cutoff(x.second.max_execute_messages.value_10m, 10u); });
auto stats = td::transform(stats_forever, [](auto &it) { return std::make_pair(it.first, it.second); });
auto main_key = [&](std::type_index actor_type_index) {
auto stat_10s = stats_10s[actor_type_index];
auto stat_10m = stats_10m[actor_type_index];
auto stat_forever = stats_forever[actor_type_index];
return std::make_tuple(cutoff(std::max(stat_10s.seconds, stat_10m.seconds), 0.1),
cutoff(stat_forever.max_execute_seconds.value_10m, 0.5), stat_forever.seconds);
};
std::sort(stats.begin(), stats.end(),
[&](auto &left, auto &right) { return main_key(left.first) > main_key(right.first); });
auto debug = Debug(SchedulerContext::get()->scheduler_group());
debug.dump(sb);
sb << "All actors:\n";
for (auto &it : stats) {
sb << "\t" << ActorTypeStatManager::get_class_name(it.first.name()) << "\n";
auto key = main_key(it.first);
describe(sb, it.first);
}
sb << "\n";
return sb.as_cslice().str();
}
ActorStats::PefStat::PefStat() {
for (std::size_t i = 0; i < SIZE; i++) {
perf_stat_[i] = td::TimedStat<StatStorer<td::int64>>(DURATIONS[i], td::Time::now());
perf_stat_[i].add_event(0, td::Time::now());
}
}
void ActorStats::update(td::Timestamp now) {
auto stat = td::actor::ActorTypeStatManager::get_stats(estimate_inv_ticks_per_second());
for (auto &timed_stat : stat_) {
timed_stat.add_event(stat, now.at());
}
NamedPerfCounter::get_default().for_each([&](td::Slice name, td::int64 value) {
auto &stat = pef_stats_[name.str()].perf_stat_;
for (auto &timed_stat : stat) {
timed_stat.add_event(value, now.at());
}
});
}
constexpr int ActorStats::DURATIONS[SIZE];
constexpr const char *ActorStats::DESCR[SIZE];
} // namespace actor
} // namespace td

View file

@ -0,0 +1,52 @@
#pragma once
#include "td/actor/actor.h"
#include "td/utils/TimedStat.h"
namespace td {
namespace actor {
class ActorStats : public td::actor::Actor {
public:
ActorStats() {
}
void start_up() override;
double estimate_inv_ticks_per_second();
std::string prepare_stats();
private:
template <class T>
struct StatStorer {
void on_event(const T &event) {
if (!first_) {
first_ = event;
first_ts_ = Clocks::rdtsc();
}
}
double get_duration(double inv_ticks_per_second) const {
if (first_) {
return std::max(1.0, (Clocks::rdtsc() - first_ts_) * inv_ticks_per_second);
}
return 1.0;
}
td::optional<T> first_;
td::uint64 first_ts_;
};
static constexpr std::size_t SIZE = 2;
static constexpr const char *DESCR[SIZE] = {"10sec", "10min"};
static constexpr int DURATIONS[SIZE] = {10, 10 * 60};
td::TimedStat<StatStorer<td::actor::ActorTypeStats>> stat_[SIZE];
struct PefStat {
PefStat();
td::TimedStat<StatStorer<td::int64>> perf_stat_[SIZE];
};
std::map<std::string, PefStat> pef_stats_;
td::Timestamp begin_ts_;
td::uint64 begin_ticks_{};
void loop() override {
alarm_timestamp() = td::Timestamp::in(5.0);
update(td::Timestamp::now());
}
void update(td::Timestamp now);
};
} // namespace actor
} // namespace td

View file

@ -19,6 +19,7 @@
#pragma once
#include "td/actor/core/Actor.h"
#include "td/actor/core/ActorSignals.h"
#include "td/actor/core/ActorTypeStat.h"
#include "td/actor/core/SchedulerId.h"
#include "td/actor/core/SchedulerContext.h"
#include "td/actor/core/Scheduler.h"
@ -68,7 +69,7 @@ using core::set_debug;
struct Debug {
public:
Debug() = default;
Debug(std::shared_ptr<core::SchedulerGroupInfo> group_info) : group_info_(std::move(group_info)) {
Debug(core::SchedulerGroupInfo *group_info) : group_info_(group_info) {
}
template <class F>
void for_each(F &&f) {
@ -80,18 +81,29 @@ struct Debug {
}
}
void dump() {
for_each([](core::Debug &debug) {
void dump(td::StringBuilder &sb) {
sb << "list of active actors with names:\n";
for_each([&](core::Debug &debug) {
core::DebugInfo info;
debug.read(info);
if (info.is_active) {
LOG(ERROR) << info.name << " " << td::format::as_time(Time::now() - info.start_at);
sb << "\t\"" << info.name << "\" is active for " << Time::now() - info.start_at << "s\n";
}
});
sb << "\nsizes of cpu local queues:\n";
for (auto &scheduler : group_info_->schedulers) {
for (size_t i = 0; i < scheduler.cpu_threads_count; i++) {
auto size = scheduler.cpu_local_queue[i].size();
if (size != 0) {
sb << "\tcpu#" << i << " queue.size() = " << size << "\n";
}
}
}
sb << "\n";
}
private:
std::shared_ptr<core::SchedulerGroupInfo> group_info_;
core::SchedulerGroupInfo *group_info_;
};
class Scheduler {
@ -142,7 +154,7 @@ class Scheduler {
}
Debug get_debug() {
return Debug{group_info_};
return Debug{group_info_.get()};
}
bool run() {
@ -200,6 +212,10 @@ class Scheduler {
}
};
using core::ActorTypeStat;
using core::ActorTypeStatManager;
using core::ActorTypeStats;
// Some helper functions. Not part of public interface and not part
// of namespace core
namespace detail {
@ -324,7 +340,7 @@ void send_closure_impl(ActorRef actor_ref, ClosureT &&closure) {
}
template <class... ArgsT>
void send_closure(ActorRef actor_ref, ArgsT &&... args) {
void send_closure(ActorRef actor_ref, ArgsT &&...args) {
send_closure_impl(actor_ref, create_immediate_closure(std::forward<ArgsT>(args)...));
}
@ -365,7 +381,7 @@ void send_closure_with_promise_later(ActorRef actor_ref, ClosureT &&closure, Pro
}
template <class... ArgsT>
void send_closure_later(ActorRef actor_ref, ArgsT &&... args) {
void send_closure_later(ActorRef actor_ref, ArgsT &&...args) {
send_closure_later_impl(actor_ref, create_delayed_closure(std::forward<ArgsT>(args)...));
}
@ -396,15 +412,17 @@ inline void send_signals_later(ActorRef actor_ref, ActorSignals signals) {
inline void register_actor_info_ptr(core::ActorInfoPtr actor_info_ptr) {
auto state = actor_info_ptr->state().get_flags_unsafe();
actor_info_ptr->on_add_to_queue();
core::SchedulerContext::get()->add_to_queue(std::move(actor_info_ptr), state.get_scheduler_id(), !state.is_shared());
}
template <class T, class... ArgsT>
core::ActorInfoPtr create_actor(core::ActorOptions &options, ArgsT &&... args) noexcept {
core::ActorInfoPtr create_actor(core::ActorOptions &options, ArgsT &&...args) noexcept {
auto *scheduler_context = core::SchedulerContext::get();
if (!options.has_scheduler()) {
options.on_scheduler(scheduler_context->get_scheduler_id());
}
options.with_actor_stat_id(core::ActorTypeStatImpl::get_unique_id<T>());
auto res =
scheduler_context->get_actor_info_creator().create(std::make_unique<T>(std::forward<ArgsT>(args)...), options);
register_actor_info_ptr(res);

View file

@ -114,6 +114,8 @@ void ActorExecutor::start() noexcept {
actor_execute_context_.set_actor(&actor_info_.actor());
actor_stats_ = actor_info_.actor_type_stat();
auto execute_timer = actor_stats_.create_execute_timer();
while (flush_one_signal(signals)) {
if (actor_execute_context_.has_immediate_flags()) {
return;
@ -175,6 +177,11 @@ void ActorExecutor::finish() noexcept {
if (add_to_queue) {
actor_info_ptr = actor_info_.actor().get_actor_info_ptr();
}
if (!flags().is_closed() && flags().is_in_queue()) {
// Must do it while we are locked, so to it optimistically
// we will add actor to queue after unlock OR we are already in a queue OR we will be closed
actor_info_.on_add_to_queue();
}
if (actor_locker_.try_unlock(flags())) {
if (add_to_queue) {
dispatcher_.add_to_queue(std::move(actor_info_ptr), flags().get_scheduler_id(), !flags().is_shared());
@ -193,23 +200,31 @@ bool ActorExecutor::flush_one_signal(ActorSignals &signals) {
}
switch (signal) {
//NB: Signals will be handled in order of their value.
// For clarity it conincides with order in this switch
// For clarity, it coincides with order in this switch
case ActorSignals::Pause:
actor_execute_context_.set_pause();
break;
case ActorSignals::Kill:
case ActorSignals::Kill: {
auto message_timer = actor_stats_.create_message_timer();
actor_execute_context_.set_stop();
break;
case ActorSignals::StartUp:
}
case ActorSignals::StartUp: {
auto message_timer = actor_stats_.create_message_timer();
actor_stats_.created();
actor_info_.actor().start_up();
break;
case ActorSignals::Wakeup:
}
case ActorSignals::Wakeup: {
auto message_timer = actor_stats_.create_message_timer();
actor_info_.actor().wake_up();
break;
}
case ActorSignals::Alarm:
if (actor_execute_context_.get_alarm_timestamp() && actor_execute_context_.get_alarm_timestamp().is_in_past()) {
actor_execute_context_.alarm_timestamp() = Timestamp::never();
actor_info_.set_alarm_timestamp(Timestamp::never());
auto message_timer = actor_stats_.create_message_timer();
actor_info_.actor().alarm();
}
break;
@ -245,6 +260,7 @@ bool ActorExecutor::flush_one_message() {
}
actor_execute_context_.set_link_token(message.get_link_token());
auto message_timer = actor_stats_.create_message_timer();
message.run();
return true;
}
@ -257,7 +273,9 @@ void ActorExecutor::flush_context_flags() {
}
flags_.set_closed(true);
if (!flags_.get_signals().has_signal(ActorSignals::Signal::StartUp)) {
auto message_timer = actor_stats_.create_message_timer();
actor_info_.actor().tear_down();
actor_stats_.destroyed();
}
actor_info_.destroy_actor();
} else {

View file

@ -24,6 +24,7 @@
#include "td/actor/core/ActorMessage.h"
#include "td/actor/core/ActorSignals.h"
#include "td/actor/core/ActorState.h"
#include "td/actor/core/ActorTypeStat.h"
#include "td/actor/core/SchedulerContext.h"
#include "td/utils/format.h"
@ -95,6 +96,7 @@ class ActorExecutor {
ActorInfo &actor_info_;
SchedulerDispatcher &dispatcher_;
Options options_;
ActorTypeStatRef actor_stats_;
ActorLocker actor_locker_{&actor_info_.state(), ActorLocker::Options()
.with_can_execute_paused(options_.from_queue)
.with_is_shared(!options_.has_poll)

View file

@ -19,6 +19,7 @@
#pragma once
#include "td/actor/core/ActorState.h"
#include "td/actor/core/ActorTypeStat.h"
#include "td/actor/core/ActorMailbox.h"
#include "td/utils/Heap.h"
@ -34,8 +35,8 @@ class ActorInfo;
using ActorInfoPtr = SharedObjectPool<ActorInfo>::Ptr;
class ActorInfo : private HeapNode, private ListNode {
public:
ActorInfo(std::unique_ptr<Actor> actor, ActorState::Flags state_flags, Slice name)
: actor_(std::move(actor)), name_(name.begin(), name.size()) {
ActorInfo(std::unique_ptr<Actor> actor, ActorState::Flags state_flags, Slice name, td::uint32 actor_stat_id)
: actor_(std::move(actor)), name_(name.begin(), name.size()), actor_stat_id_(actor_stat_id) {
state_.set_flags_unsafe(state_flags);
VLOG(actor) << "Create actor [" << name_ << "]";
}
@ -58,6 +59,18 @@ class ActorInfo : private HeapNode, private ListNode {
Actor *actor_ptr() const {
return actor_.get();
}
// NB: must be called only when actor is locked
ActorTypeStatRef actor_type_stat() {
auto res = ActorTypeStatManager::get_actor_type_stat(actor_stat_id_, actor_.get());
if (in_queue_since_) {
res.pop_from_queue(in_queue_since_);
in_queue_since_ = 0;
}
return res;
}
void on_add_to_queue() {
in_queue_since_ = td::Clocks::rdtsc();
}
void destroy_actor() {
actor_.reset();
}
@ -103,6 +116,8 @@ class ActorInfo : private HeapNode, private ListNode {
std::atomic<double> alarm_timestamp_at_{0};
ActorInfoPtr pin_;
td::uint64 in_queue_since_{0};
td::uint32 actor_stat_id_{0};
};
} // namespace core

View file

@ -46,10 +46,16 @@ class ActorInfoCreator {
return *this;
}
Options& with_actor_stat_id(td::uint32 new_id) {
actor_stat_id = new_id;
return *this;
}
private:
friend class ActorInfoCreator;
Slice name;
SchedulerId scheduler_id;
td::uint32 actor_stat_id{0};
bool is_shared{true};
bool in_queue{true};
//TODO: rename
@ -65,7 +71,7 @@ class ActorInfoCreator {
flags.set_in_queue(args.in_queue);
flags.set_signals(ActorSignals::one(ActorSignals::StartUp));
auto actor_info_ptr = pool_.alloc(std::move(actor), flags, args.name);
auto actor_info_ptr = pool_.alloc(std::move(actor), flags, args.name, args.actor_stat_id);
actor_info_ptr->actor().set_actor_info_ptr(actor_info_ptr);
return actor_info_ptr;
}

View file

@ -0,0 +1,109 @@
#include "td/actor/core/Actor.h"
#include "td/actor/core/ActorTypeStat.h"
#include "td/actor/core/Scheduler.h"
#include "td/utils/port/thread_local.h"
#include <cxxabi.h>
#include <set>
#include <map>
#include <mutex>
#include <typeindex>
#include <typeinfo>
namespace td {
namespace actor {
namespace core {
class ActorTypeStatRef;
struct ActorTypeStatsTlsEntry {
struct Entry {
std::unique_ptr<ActorTypeStatImpl> stat;
std::optional<std::type_index> o_type_index;
};
std::vector<Entry> by_id;
std::mutex mutex;
template <class F>
void foreach_entry(F &&f) {
std::lock_guard<std::mutex> guard(mutex);
for (auto &entry : by_id) {
f(entry);
}
}
ActorTypeStatRef get_actor_type_stat(td::uint32 id, Actor &actor) {
if (id >= by_id.size()) {
std::lock_guard<std::mutex> guard(mutex);
by_id.resize(id + 1);
}
auto &entry = by_id.at(id);
if (!entry.o_type_index) {
std::lock_guard<std::mutex> guard(mutex);
entry.o_type_index = std::type_index(typeid(actor));
entry.stat = std::make_unique<ActorTypeStatImpl>();
}
return ActorTypeStatRef{entry.stat.get()};
}
};
struct ActorTypeStatsRegistry {
std::mutex mutex;
std::vector<std::shared_ptr<ActorTypeStatsTlsEntry>> entries;
void registry_entry(std::shared_ptr<ActorTypeStatsTlsEntry> entry) {
std::lock_guard<std::mutex> guard(mutex);
entries.push_back(std::move(entry));
}
template <class F>
void foreach_entry(F &&f) {
std::lock_guard<std::mutex> guard(mutex);
for (auto &entry : entries) {
f(*entry);
}
}
};
ActorTypeStatsRegistry registry;
struct ActorTypeStatsTlsEntryRef {
ActorTypeStatsTlsEntryRef() {
entry_ = std::make_shared<ActorTypeStatsTlsEntry>();
registry.registry_entry(entry_);
}
std::shared_ptr<ActorTypeStatsTlsEntry> entry_;
};
static TD_THREAD_LOCAL ActorTypeStatsTlsEntryRef *actor_type_stats_tls_entry = nullptr;
ActorTypeStatRef ActorTypeStatManager::get_actor_type_stat(td::uint32 id, Actor *actor) {
if (!actor || !need_debug()) {
return ActorTypeStatRef{nullptr};
}
td::init_thread_local<ActorTypeStatsTlsEntryRef>(actor_type_stats_tls_entry);
ActorTypeStatsTlsEntry &tls_entry = *actor_type_stats_tls_entry->entry_;
return tls_entry.get_actor_type_stat(id, *actor);
}
std::string ActorTypeStatManager::get_class_name(const char *name) {
int status;
char *real_name = abi::__cxa_demangle(name, nullptr, nullptr, &status);
if (status < 0) {
return name;
}
std::string result = real_name;
std::free(real_name);
return result;
}
ActorTypeStats ActorTypeStatManager::get_stats(double inv_ticks_per_second) {
std::map<std::type_index, ActorTypeStat> stats;
registry.foreach_entry([&](ActorTypeStatsTlsEntry &tls_entry) {
tls_entry.foreach_entry([&](ActorTypeStatsTlsEntry::Entry &entry) {
if (entry.o_type_index) {
stats[entry.o_type_index.value()] += entry.stat->to_stat(inv_ticks_per_second);
}
});
});
return ActorTypeStats{.stats = std::move(stats)};
}
} // namespace core
} // namespace actor
} // namespace td

View file

@ -0,0 +1,395 @@
#pragma once
#include "td/utils/int_types.h"
#include "td/utils/port/Clocks.h"
#include <algorithm>
#include <typeindex>
#include <map>
namespace td {
namespace actor {
namespace core {
class Actor;
struct ActorTypeStat {
// diff (speed)
double created{0};
double executions{0};
double messages{0};
double seconds{0};
// current statistics
td::int64 alive{0};
td::int32 executing{0};
double executing_start{1e20};
// max statistics (TODO: recent_max)
template <class T>
struct MaxStatGroup {
T value_forever{};
T value_10s{};
T value_10m{};
MaxStatGroup &operator+=(const MaxStatGroup<T> &other) {
value_forever = std::max(value_forever, other.value_forever);
value_10s = std::max(value_10s, other.value_10s);
value_10m = std::max(value_10m, other.value_10m);
return *this;
}
};
MaxStatGroup<td::uint32> max_execute_messages;
MaxStatGroup<double> max_message_seconds;
MaxStatGroup<double> max_execute_seconds;
MaxStatGroup<double> max_delay_seconds;
ActorTypeStat &operator+=(const ActorTypeStat &other) {
created += other.created;
executions += other.executions;
messages += other.messages;
seconds += other.seconds;
alive += other.alive;
executing += other.executing;
executing_start = std::min(other.executing_start, executing_start);
max_execute_messages += other.max_execute_messages;
max_message_seconds += other.max_message_seconds;
max_execute_seconds += other.max_execute_seconds;
max_delay_seconds += other.max_delay_seconds;
return *this;
}
ActorTypeStat &operator-=(const ActorTypeStat &other) {
created -= other.created;
executions -= other.executions;
messages -= other.messages;
seconds -= other.seconds;
return *this;
}
ActorTypeStat &operator/=(double t) {
if (t > 1e-2) {
created /= t;
executions /= t;
messages /= t;
seconds /= t;
} else {
created = 0;
executions = 0;
messages = 0;
seconds = 0;
}
return *this;
}
};
struct ActorTypeStatImpl {
public:
ActorTypeStatImpl() {
}
class MessageTimer {
public:
MessageTimer(ActorTypeStatImpl *stat, td::uint64 started_at = Clocks::rdtsc())
: stat_(stat), started_at_(started_at) {
}
MessageTimer(const MessageTimer &) = delete;
MessageTimer(MessageTimer &&) = delete;
MessageTimer &operator=(const MessageTimer &) = delete;
MessageTimer &operator=(MessageTimer &&) = delete;
~MessageTimer() {
if (stat_) {
auto ts = td::Clocks::rdtsc();
stat_->message_finish(ts, ts - started_at_);
}
}
private:
ActorTypeStatImpl *stat_;
td::uint64 started_at_;
};
void created() {
inc(total_created_);
inc(alive_);
}
void destroyed() {
dec(alive_);
}
MessageTimer create_run_timer() {
return MessageTimer{this};
}
void message_finish(td::uint64 ts, td::uint64 ticks) {
inc(total_messages_);
inc(execute_messages_);
add(total_ticks_, ticks);
max_message_ticks_.update(ts, ticks);
}
void on_delay(td::uint64 ts, td::uint64 ticks) {
max_delay_ticks_.update(ts, ticks);
}
void execute_start(td::uint64 ts) {
// TODO: this is mostly protection for recursive actor calls, which curretly should be almost impossible
// But too full handle it, one would use one executing_cnt per thread, so only upper level execution is counted
if (inc(executing_) == 1) {
store(execute_start_, ts);
store(execute_messages_, 0);
}
}
void execute_finish(td::uint64 ts) {
CHECK(executing_ > 0);
if (dec(executing_) == 0) {
max_execute_messages_.update(ts, load(execute_messages_));
max_execute_ticks_.update(ts, ts - load(execute_start_));
inc(total_executions_);
store(execute_start_, 0);
store(execute_messages_, 0);
}
}
template <class T>
static td::uint32 get_unique_id() {
static td::uint32 value = get_next_unique_id();
return value;
}
static td::uint32 get_next_unique_id() {
static std::atomic<td::uint32> next_id_{};
return ++next_id_;
}
ActorTypeStat to_stat(double inv_ticks_per_second) const {
auto execute_start_copy = load(execute_start_);
auto actual_total_ticks = load(total_ticks_);
auto ts = Clocks::rdtsc();
if (execute_start_copy != 0) {
actual_total_ticks += ts - execute_start_copy;
}
auto execute_start = ticks_to_seconds(load(execute_start_), inv_ticks_per_second);
return ActorTypeStat{.created = double(load(total_created_)),
.executions = double(load(total_executions_)),
.messages = double(load(total_messages_)),
.seconds = ticks_to_seconds(actual_total_ticks, inv_ticks_per_second),
.alive = load(alive_),
.executing = load(executing_),
.executing_start = execute_start < 1e-9 ? 1e20 : execute_start,
.max_execute_messages = load(max_execute_messages_),
.max_message_seconds = load_seconds(max_message_ticks_, inv_ticks_per_second),
.max_execute_seconds = load_seconds(max_execute_ticks_, inv_ticks_per_second),
.max_delay_seconds = load_seconds(max_delay_ticks_, inv_ticks_per_second)};
}
private:
static double ticks_to_seconds(td::uint64 ticks, double inv_tick_per_second) {
return double(ticks) * inv_tick_per_second;
}
template <class T>
static T load(const std::atomic<T> &a) {
return a.load(std::memory_order_relaxed);
}
template <class T, class S>
static void store(std::atomic<T> &a, S value) {
a.store(value, std::memory_order_relaxed);
}
template <class T, class S>
static T add(std::atomic<T> &a, S value) {
T new_value = load(a) + value;
store(a, new_value);
return new_value;
}
template <class T>
static T inc(std::atomic<T> &a) {
return add(a, 1);
}
template <class T>
static T dec(std::atomic<T> &a) {
return add(a, -1);
}
template <class T>
static void relax_max(std::atomic<T> &a, T value) {
auto old_value = load(a);
if (value > old_value) {
store(a, value);
}
}
template <class ValueT, int Interval>
class MaxCounter {
alignas(64) std::atomic<ValueT> max_values[2] = {0};
std::atomic<td::uint64> last_update_segment_time = 0;
void update_current_segment(uint64 current_segment_time, uint64 segment_difference) {
if (segment_difference >= 2) {
store(max_values[0], 0);
store(max_values[1], 0);
} else if (segment_difference == 1) {
store(max_values[1 - (current_segment_time & 1)], 0);
}
store(last_update_segment_time, current_segment_time);
}
public:
inline void update(td::uint64 rdtsc, ValueT value) {
auto current_segment_time = rdtsc / (Clocks::rdtsc_frequency() * Interval);
auto segment_difference = current_segment_time - last_update_segment_time;
if (unlikely(segment_difference != 0)) {
update_current_segment(current_segment_time, segment_difference);
}
relax_max(max_values[current_segment_time & 1], value);
}
inline ValueT get_max(uint64_t rdtsc) const {
uint64_t current_segment_time = rdtsc / (Clocks::rdtsc_frequency() * Interval);
uint64_t segment_difference = current_segment_time - load(last_update_segment_time);
if (segment_difference >= 2) {
return 0;
} else if (segment_difference == 1) {
return load(max_values[current_segment_time & 1]);
} else {
return std::max(load(max_values[0]), load(max_values[1]));
}
}
};
template <class T>
struct MaxCounterGroup {
std::atomic<T> max_forever{};
MaxCounter<T, 60 * 10> max_10m;
MaxCounter<T, 10> max_10s;
inline void update(td::uint64 rdtsc, T value) {
relax_max(max_forever, value);
max_10m.update(rdtsc, value);
max_10s.update(rdtsc, value);
}
};
template <class T>
static ActorTypeStat::MaxStatGroup<T> load(const MaxCounterGroup<T> &src) {
auto ts = Clocks::rdtsc();
return {.value_forever = load(src.max_forever),
.value_10s = src.max_10s.get_max(ts),
.value_10m = src.max_10m.get_max(ts)};
}
template <class T>
static ActorTypeStat::MaxStatGroup<double> load_seconds(const MaxCounterGroup<T> &src, double inv_ticks_per_second) {
auto ts = Clocks::rdtsc();
return {.value_forever = ticks_to_seconds(load(src.max_forever), inv_ticks_per_second),
.value_10s = ticks_to_seconds(src.max_10s.get_max(ts), inv_ticks_per_second),
.value_10m = ticks_to_seconds(src.max_10m.get_max(ts), inv_ticks_per_second)};
}
// total (increment only statistics)
std::atomic<td::int64> total_created_{0};
std::atomic<td::uint64> total_executions_{0};
std::atomic<td::uint64> total_messages_{0};
std::atomic<td::uint64> total_ticks_{0};
// current statistics
std::atomic<td::int64> alive_{0};
std::atomic<td::int32> executing_{0};
// max statistics (TODO: recent_max)
MaxCounterGroup<td::uint32> max_execute_messages_;
MaxCounterGroup<td::uint64> max_message_ticks_;
MaxCounterGroup<td::uint64> max_execute_ticks_;
MaxCounterGroup<td::uint64> max_delay_ticks_;
// execute state
std::atomic<td::uint64> execute_start_{0};
std::atomic<td::uint32> execute_messages_{0};
};
class ActorTypeStatRef {
public:
ActorTypeStatImpl *ref_{nullptr};
void created() {
if (!ref_) {
return;
}
ref_->created();
}
void destroyed() {
if (!ref_) {
return;
}
ref_->destroyed();
}
void pop_from_queue(td::uint64 in_queue_since) {
if (!ref_) {
return;
}
CHECK(in_queue_since);
auto ts = td::Clocks::rdtsc();
ref_->on_delay(ts, ts - in_queue_since);
}
void start_execute() {
if (!ref_) {
return;
}
ref_->execute_start(td::Clocks::rdtsc());
}
void finish_execute() {
if (!ref_) {
return;
}
ref_->execute_finish(td::Clocks::rdtsc());
}
ActorTypeStatImpl::MessageTimer create_message_timer() {
if (!ref_) {
return ActorTypeStatImpl::MessageTimer{nullptr, 0};
}
return ActorTypeStatImpl::MessageTimer{ref_};
}
struct ExecuteTimer {
ExecuteTimer() = delete;
ExecuteTimer(const ExecuteTimer &) = delete;
ExecuteTimer(ExecuteTimer &&) = delete;
ExecuteTimer &operator=(const ExecuteTimer &) = delete;
ExecuteTimer &operator=(ExecuteTimer &&) = delete;
ExecuteTimer(ActorTypeStatRef *stat) : stat(stat) {
stat->start_execute();
}
ActorTypeStatRef *stat{};
~ExecuteTimer() {
stat->finish_execute();
}
};
ExecuteTimer create_execute_timer() {
return ExecuteTimer(this);
}
};
// TODO: currently it is implemented via TD_THREAD_LOCAL, so the statistics is global across different schedulers
struct ActorTypeStats {
std::map<std::type_index, ActorTypeStat> stats;
ActorTypeStats &operator-=(const ActorTypeStats &other) {
for (auto &it : other.stats) {
stats.at(it.first) -= it.second;
}
return *this;
}
ActorTypeStats &operator/=(double x) {
for (auto &it : stats) {
it.second /= x;
}
return *this;
}
};
class ActorTypeStatManager {
public:
static ActorTypeStatRef get_actor_type_stat(td::uint32 id, Actor *actor);
static ActorTypeStats get_stats(double inv_ticks_per_second);
static std::string get_class_name(const char *name);
};
} // namespace core
} // namespace actor
} // namespace td

View file

@ -130,27 +130,20 @@ struct LocalQueue {
public:
template <class F>
bool push(T value, F &&overflow_f) {
auto res = std::move(next_);
next_ = std::move(value);
if (res) {
queue_.local_push(res.unwrap(), overflow_f);
queue_.local_push(std::move(value), overflow_f);
return true;
}
return false;
}
bool try_pop(T &message) {
if (!next_) {
return queue_.local_pop(message);
}
message = next_.unwrap();
return true;
}
bool steal(T &message, LocalQueue<T> &other) {
bool steal(T &message, LocalQueue &other) {
return queue_.steal(message, other.queue_);
}
size_t size() const {
return queue_.size();
}
private:
td::optional<T> next_;
StealingQueue<T> queue_;
char pad[TD_CONCURRENCY_PAD - sizeof(optional<T>)];
};
@ -267,11 +260,12 @@ class Scheduler {
bool is_stop_requested() override;
void stop() override;
private:
SchedulerGroupInfo *scheduler_group() const {
SchedulerGroupInfo *scheduler_group() const override {
return scheduler_group_;
}
private:
ActorInfoCreator *creator_;
SchedulerId scheduler_id_;
CpuWorkerId cpu_worker_id_;

View file

@ -38,6 +38,7 @@ class SchedulerDispatcher {
};
struct Debug;
struct SchedulerGroupInfo;
class SchedulerContext : public Context<SchedulerContext>, public SchedulerDispatcher {
public:
virtual ~SchedulerContext() = default;
@ -59,6 +60,7 @@ class SchedulerContext : public Context<SchedulerContext>, public SchedulerDispa
// Debug
virtual Debug &get_debug() = 0;
virtual SchedulerGroupInfo *scheduler_group() const = 0;
};
} // namespace core
} // namespace actor

View file

@ -19,15 +19,20 @@
#include "td/actor/core/ActorLocker.h"
#include "td/actor/actor.h"
#include "td/actor/PromiseFuture.h"
#include "td/actor/ActorStats.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/port/thread.h"
#include "td/utils/port/thread.h"
#include "td/utils/Random.h"
#include "td/utils/Slice.h"
#include "td/utils/StringBuilder.h"
#include "td/utils/tests.h"
#include "td/utils/Time.h"
#include "td/utils/TimedStat.h"
#include "td/utils/port/sleep.h"
#include <array>
#include <atomic>
@ -1102,4 +1107,59 @@ TEST(Actor2, send_vs_close2) {
scheduler.run();
}
}
TEST(Actor2, test_stats) {
Scheduler scheduler({8});
td::actor::set_debug(true);
auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); });
scheduler.run_in_context([watcher = std::move(watcher)] {
class SleepWorker : public Actor {
void loop() override {
// 0.8 load
td::usleep_for(800000);
alarm_timestamp() = td::Timestamp::in(0.2);
}
};
class QueueWorker : public Actor {
void loop() override {
for (int i = 0; i < 20; i++) {
send_closure(actor_id(this), &QueueWorker::ping);
}
alarm_timestamp() = td::Timestamp::in(1.0);
}
void ping() {
}
};
class Master : public Actor {
public:
Master(std::shared_ptr<td::Destructor> watcher) : watcher_(std::move(watcher)) {
}
void start_up() override {
alarm_timestamp() = td::Timestamp::in(1);
stats_ = td::actor::create_actor<ActorStats>("actor_stats");
td::actor::create_actor<SleepWorker>("sleep_worker").release();
td::actor::create_actor<QueueWorker>("queue_worker").release();
}
void alarm() override {
td::actor::send_closure(stats_, &ActorStats::prepare_stats, td::promise_send_closure(actor_id(this), &Master::on_stats));
alarm_timestamp() = td::Timestamp::in(5);
}
void on_stats(td::Result<std::string> r_stats) {
LOG(ERROR) << "\n" << r_stats.ok();
if (--cnt_ == 0) {
stop();
}
}
private:
std::shared_ptr<td::Destructor> watcher_;
td::actor::ActorOwn<ActorStats> stats_;
int cnt_={2};
};
td::actor::create_actor<Master>("Master", watcher).release();
});
scheduler.run();
}
#endif //!TD_THREAD_UNSUPPORTED

View file

@ -19,6 +19,7 @@
#include "td/fec/raptorq/Solver.h"
#include "td/fec/algebra/GaussianElimination.h"
#include "td/fec/algebra/InactivationDecoding.h"
#include "td/utils/ThreadSafeCounter.h"
#include "td/utils/Timer.h"
#include <map>
@ -70,6 +71,7 @@ Result<MatrixGF256> Solver::run(const Rfc::Parameters &p, Span<SymbolRef> symbol
auto C = GaussianElimination::run(std::move(A), std::move(D));
return C;
}
TD_PERF_COUNTER(raptor_solve);
PerfWarningTimer x("solve");
Timer timer;
auto perf_log = [&](Slice message) {

View file

@ -115,6 +115,22 @@ class StealingQueue {
std::atomic_thread_fence(std::memory_order_seq_cst);
}
size_t size() const {
while (true) {
auto head = head_.load();
auto tail = tail_.load(std::memory_order_acquire);
if (tail < head) {
continue;
}
size_t n = tail - head;
if (n > N) {
continue;
}
return n;
}
}
private:
std::atomic<td::int64> head_{0};
std::atomic<td::int64> tail_{0};

View file

@ -137,4 +137,55 @@ class NamedThreadSafeCounter {
Counter counter_;
};
// another class for simplicity, it
struct NamedPerfCounter {
public:
static NamedPerfCounter &get_default() {
static NamedPerfCounter res;
return res;
}
struct PerfCounterRef {
NamedThreadSafeCounter::CounterRef count;
NamedThreadSafeCounter::CounterRef duration;
};
PerfCounterRef get_counter(Slice name) {
return {.count = counter_.get_counter(PSLICE() << name << ".count"),
.duration = counter_.get_counter(PSLICE() << name << ".duration")};
}
struct ScopedPerfCounterRef : public NoCopyOrMove {
PerfCounterRef perf_counter;
uint64 started_at_ticks{td::Clocks::rdtsc()};
~ScopedPerfCounterRef() {
perf_counter.count.add(1);
perf_counter.duration.add(td::Clocks::rdtsc() - started_at_ticks);
}
};
template <class F>
void for_each(F &&f) const {
counter_.for_each(f);
}
void clear() {
counter_.clear();
}
friend StringBuilder &operator<<(StringBuilder &sb, const NamedPerfCounter &counter) {
return sb << counter.counter_;
}
private:
NamedThreadSafeCounter counter_;
};
} // namespace td
#define TD_PERF_COUNTER(name) \
static auto perf_##name = td::NamedPerfCounter::get_default().get_counter(td::Slice(#name)); \
auto scoped_perf_##name = td::NamedPerfCounter::ScopedPerfCounterRef{.perf_counter = perf_##name};
#define TD_PERF_COUNTER_SINCE(name, since) \
static auto perf_##name = td::NamedPerfCounter::get_default().get_counter(td::Slice(#name)); \
auto scoped_perf_##name = \
td::NamedPerfCounter::ScopedPerfCounterRef{.perf_counter = perf_##name, .started_at_ticks = since};

View file

@ -127,4 +127,12 @@ struct Auto {
}
};
struct NoCopyOrMove {
NoCopyOrMove() = default;
NoCopyOrMove(NoCopyOrMove &&) = delete;
NoCopyOrMove(const NoCopyOrMove &) = delete;
NoCopyOrMove &operator=(NoCopyOrMove &&) = delete;
NoCopyOrMove &operator=(const NoCopyOrMove &) = delete;
};
} // namespace td

View file

@ -18,6 +18,7 @@
*/
#include "td/utils/logging.h"
#include "ThreadSafeCounter.h"
#include "td/utils/port/Clocks.h"
#include "td/utils/port/StdStreams.h"
#include "td/utils/port/thread_local.h"
@ -127,6 +128,9 @@ Logger::~Logger() {
slice = MutableCSlice(slice.begin(), slice.begin() + slice.size() - 1);
}
log_.append(slice, log_level_);
// put stats here to avoid conflict with PSTRING and PSLICE
TD_PERF_COUNTER_SINCE(logger, start_at_);
} else {
log_.append(as_cslice(), log_level_);
}
@ -301,5 +305,4 @@ ScopedDisableLog::~ScopedDisableLog() {
set_verbosity_level(sdl_verbosity);
}
}
} // namespace td

View file

@ -40,6 +40,7 @@
#include "td/utils/Slice.h"
#include "td/utils/StackAllocator.h"
#include "td/utils/StringBuilder.h"
#include "td/utils/port/Clocks.h"
#include <atomic>
#include <type_traits>
@ -251,7 +252,8 @@ class Logger {
, log_(log)
, sb_(buffer_.as_slice())
, options_(options)
, log_level_(log_level) {
, log_level_(log_level)
, start_at_(Clocks::rdtsc()) {
}
Logger(LogInterface &log, const LogOptions &options, int log_level, Slice file_name, int line_num, Slice comment);
@ -283,6 +285,7 @@ class Logger {
StringBuilder sb_;
const LogOptions &options_;
int log_level_;
td::uint64 start_at_;
};
namespace detail {
@ -346,5 +349,4 @@ class TsLog : public LogInterface {
lock_.clear(std::memory_order_release);
}
};
} // namespace td

View file

@ -17,6 +17,7 @@
Copyright 2017-2020 Telegram Systems LLP
*/
#pragma once
#include "td/utils/int_types.h"
namespace td {
@ -26,6 +27,62 @@ struct Clocks {
static double system();
static int tz_offset();
#if defined(__i386__)
static __inline__ td::uint64 rdtsc(void) {
unsigned long long int x;
__asm__ volatile("rdtsc" : "=A"(x));
return x;
}
static constexpr td::uint64 rdtsc_frequency(void) {
return 2000'000'000;
}
static constexpr double ticks_per_second() {
return 2e9;
}
static constexpr double inv_ticks_per_second() {
return 0.5e-9;
}
#elif defined(__x86_64__)
static __inline__ td::uint64 rdtsc(void) {
unsigned hi, lo;
__asm__ __volatile__("rdtsc" : "=a"(lo), "=d"(hi));
return ((unsigned long long)lo) | (((unsigned long long)hi) << 32);
}
static constexpr td::uint64 rdtsc_frequency(void) {
return 2000'000'000;
}
static constexpr double ticks_per_second() {
return 2e9;
}
static constexpr double inv_ticks_per_second() {
return 0.5e-9;
}
#elif defined(__aarch64__)
static __inline__ td::uint64 rdtsc(void) {
unsigned long long val;
asm volatile("mrs %0, cntvct_el0" : "=r"(val));
return val;
}
static __inline__ td::uint64 rdtsc_frequency(void) {
unsigned long long val;
asm volatile("mrs %0, cntfrq_el0" : "=r"(val));
return val;
}
static double ticks_per_second() {
return static_cast<double>(rdtsc_frequency());
}
static double inv_ticks_per_second() {
return 1.0 / static_cast<double>(rdtsc_frequency());
}
#endif
};
} // namespace td

View file

@ -668,6 +668,7 @@ engine.validator.signature signature:bytes = engine.validator.Signature;
engine.validator.oneStat key:string value:string = engine.validator.OneStat;
engine.validator.stats stats:(vector engine.validator.oneStat) = engine.validator.Stats;
engine.validator.textStats data:string = engine.validator.TextStats;
engine.validator.controlQueryError code:int message:string = engine.validator.ControlQueryError;
@ -758,6 +759,7 @@ engine.validator.setCollatorOptionsJson json:string = engine.validator.Success;
engine.validator.getCollatorOptionsJson = engine.validator.JsonConfig;
engine.validator.getAdnlStats = adnl.Stats;
engine.validator.getActorTextStats = engine.validator.TextStats;
---types---

Binary file not shown.

View file

@ -1008,6 +1008,32 @@ td::Status ImportShardOverlayCertificateQuery::receive(td::BufferSlice data) {
td::TerminalIO::out() << "successfully sent certificate to overlay manager\n";
return td::Status::OK();
}
td::Status GetActorStatsQuery::run() {
auto r_file_name = tokenizer_.get_token<std::string>();
if (r_file_name.is_ok()) {
file_name_ = r_file_name.move_as_ok();
}
return td::Status::OK();
}
td::Status GetActorStatsQuery::send() {
auto b = ton::create_serialize_tl_object<ton::ton_api::engine_validator_getActorTextStats>();
td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise());
return td::Status::OK();
}
td::Status GetActorStatsQuery::receive(td::BufferSlice data) {
TRY_RESULT_PREFIX(f, ton::fetch_tl_object<ton::ton_api::engine_validator_textStats>(data.as_slice(), true),
"received incorrect answer: ");
if (file_name_.empty()) {
td::TerminalIO::out() << f->data_;
} else {
std::ofstream sb(file_name_);
sb << f->data_;
sb << std::flush;
td::TerminalIO::output(std::string("wrote stats to " + file_name_ + "\n"));
}
return td::Status::OK();
}
td::Status GetPerfTimerStatsJsonQuery::run() {
TRY_RESULT_ASSIGN(file_name_, tokenizer_.get_token<std::string>());

View file

@ -1076,6 +1076,28 @@ class ImportShardOverlayCertificateQuery : public Query {
std::string in_file_;
};
class GetActorStatsQuery : public Query {
public:
GetActorStatsQuery(td::actor::ActorId<ValidatorEngineConsole> console, Tokenizer tokenizer)
: Query(console, std::move(tokenizer)) {
}
td::Status run() override;
td::Status send() override;
td::Status receive(td::BufferSlice data) override;
static std::string get_name() {
return "getactorstats";
}
static std::string get_help() {
return "getactorstats [<outfile>]\tget actor stats and print it either in stdout or in <outfile>";
}
std::string name() const override {
return get_name();
}
private:
std::string file_name_;
};
class GetPerfTimerStatsJsonQuery : public Query {
public:
GetPerfTimerStatsJsonQuery(td::actor::ActorId<ValidatorEngineConsole> console, Tokenizer tokenizer)

View file

@ -140,6 +140,7 @@ void ValidatorEngineConsole::run() {
add_query_runner(std::make_unique<QueryRunnerImpl<GetOverlaysStatsJsonQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<ImportShardOverlayCertificateQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<SignShardOverlayCertificateQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<GetActorStatsQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<GetPerfTimerStatsJsonQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<GetShardOutQueueSizeQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<SetExtMessagesBroadcastDisabledQuery>>());

View file

@ -3540,6 +3540,31 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getOverla
});
}
void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getActorTextStats &query, td::BufferSlice data,
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise) {
if (!(perm & ValidatorEnginePermissions::vep_default)) {
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized")));
return;
}
if (validator_manager_.empty()) {
promise.set_value(
create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "validator manager not started")));
return;
}
auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<std::string> R) mutable {
if (R.is_error()) {
promise.set_value(create_control_query_error(R.move_as_error()));
} else {
auto r = R.move_as_ok();
promise.set_value(ton::create_serialize_tl_object<ton::ton_api::engine_validator_textStats>(std::move(r)));
}
});
td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManagerInterface::prepare_actor_stats,
std::move(P));
}
void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getPerfTimerStats &query, td::BufferSlice data,
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise) {
if (!(perm & ValidatorEnginePermissions::vep_default)) {
@ -4319,7 +4344,9 @@ int main(int argc, char *argv[]) {
}
if (need_scheduler_status_flag.exchange(false)) {
LOG(ERROR) << "DUMPING SCHEDULER STATISTICS";
scheduler.get_debug().dump();
td::StringBuilder sb;
scheduler.get_debug().dump(sb);
LOG(ERROR) << "GOT SCHEDULER STATISTICS\n" << sb.as_cslice();
}
if (rotate_logs_flags.exchange(false)) {
if (td::log_interface) {

View file

@ -474,6 +474,8 @@ class ValidatorEngine : public td::actor::Actor {
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise);
void run_control_query(ton::ton_api::engine_validator_getOverlaysStats &query, td::BufferSlice data,
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise);
void run_control_query(ton::ton_api::engine_validator_getActorTextStats &query, td::BufferSlice data,
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise);
void run_control_query(ton::ton_api::engine_validator_getPerfTimerStats &query, td::BufferSlice data,
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise);
void run_control_query(ton::ton_api::engine_validator_getShardOutQueueSize &query, td::BufferSlice data,

View file

@ -142,6 +142,7 @@ void CellDbIn::load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promi
}
void CellDbIn::store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promise<td::Ref<vm::DataCell>> promise) {
TD_PERF_COUNTER(celldb_store_cell);
td::PerfWarningTimer timer{"storecell", 0.1};
auto key_hash = get_key_hash(block_id);
auto R = get_block(key_hash);
@ -284,6 +285,7 @@ void CellDbIn::gc_cont(BlockHandle handle) {
}
void CellDbIn::gc_cont2(BlockHandle handle) {
TD_PERF_COUNTER(celldb_gc_cell);
td::PerfWarningTimer timer{"gccell", 0.1};
auto key_hash = get_key_hash(handle->id());

View file

@ -230,6 +230,7 @@ void WaitBlockState::got_block_data(td::Ref<BlockData> data) {
}
void WaitBlockState::apply() {
TD_PERF_COUNTER(apply_block_to_state);
td::PerfWarningTimer t{"applyblocktostate", 0.1};
auto S = prev_state_.write().apply_block(handle_->id(), block_);
if (S.is_error()) {

View file

@ -290,6 +290,7 @@ td::Result<std::pair<td::Ref<ShardState>, td::Ref<ShardState>>> ShardStateQ::spl
}
td::Result<td::BufferSlice> ShardStateQ::serialize() const {
TD_PERF_COUNTER(serialize_state);
td::PerfWarningTimer perf_timer_{"serializestate", 0.1};
if (!data.is_null()) {
return data.clone();
@ -314,6 +315,7 @@ td::Result<td::BufferSlice> ShardStateQ::serialize() const {
}
td::Status ShardStateQ::serialize_to_file(td::FileFd& fd) const {
TD_PERF_COUNTER(serialize_state_to_file);
td::PerfWarningTimer perf_timer_{"serializestate", 0.1};
if (!data.is_null()) {
auto cur_data = data.clone();

View file

@ -370,6 +370,10 @@ class ValidatorManagerImpl : public ValidatorManager {
UNREACHABLE();
}
void prepare_actor_stats(td::Promise<std::string> promise) override {
UNREACHABLE();
}
void prepare_perf_timer_stats(td::Promise<std::vector<PerfTimerStats>> promise) override {
UNREACHABLE();
}

View file

@ -432,6 +432,10 @@ class ValidatorManagerImpl : public ValidatorManager {
UNREACHABLE();
}
void prepare_actor_stats(td::Promise<std::string> promise) override {
UNREACHABLE();
}
void prepare_perf_timer_stats(td::Promise<std::vector<PerfTimerStats>> promise) override {
UNREACHABLE();
}

View file

@ -1622,6 +1622,7 @@ void ValidatorManagerImpl::send_block_broadcast(BlockBroadcast broadcast, bool c
void ValidatorManagerImpl::start_up() {
db_ = create_db_actor(actor_id(this), db_root_, opts_);
actor_stats_ = td::actor::create_actor<td::actor::ActorStats>("actor_stats");
lite_server_cache_ = create_liteserver_cache_actor(actor_id(this), db_root_);
token_manager_ = td::actor::create_actor<TokenManager>("tokenmanager");
td::mkdir(db_root_ + "/tmp/").ensure();
@ -2771,6 +2772,10 @@ void ValidatorManagerImpl::send_peek_key_block_request() {
send_get_next_key_blocks_request(last_known_key_block_handle_->id(), 1, std::move(P));
}
void ValidatorManagerImpl::prepare_actor_stats(td::Promise<std::string> promise) {
send_closure(actor_stats_, &td::actor::ActorStats::prepare_stats, std::move(promise));
}
void ValidatorManagerImpl::prepare_stats(td::Promise<std::vector<std::pair<std::string, std::string>>> promise) {
auto merger = StatsMerger::create(std::move(promise));

View file

@ -21,6 +21,7 @@
#include "common/refcnt.hpp"
#include "interfaces/validator-manager.h"
#include "interfaces/db.h"
#include "td/actor/ActorStats.h"
#include "td/actor/PromiseFuture.h"
#include "td/utils/SharedSlice.h"
#include "td/utils/buffer.h"
@ -582,6 +583,8 @@ class ValidatorManagerImpl : public ValidatorManager {
void prepare_stats(td::Promise<std::vector<std::pair<std::string, std::string>>> promise) override;
void prepare_actor_stats(td::Promise<std::string> promise) override;
void prepare_perf_timer_stats(td::Promise<std::vector<PerfTimerStats>> promise) override;
void add_perf_timer_stat(std::string name, double duration) override;
@ -678,6 +681,7 @@ class ValidatorManagerImpl : public ValidatorManager {
private:
std::unique_ptr<Callback> callback_;
td::actor::ActorOwn<Db> db_;
td::actor::ActorOwn<td::actor::ActorStats> actor_stats_;
bool started_ = false;
bool allow_validate_ = false;

View file

@ -275,6 +275,7 @@ class ValidatorManagerInterface : public td::actor::Actor {
virtual void run_ext_query(td::BufferSlice data, td::Promise<td::BufferSlice> promise) = 0;
virtual void prepare_stats(td::Promise<std::vector<std::pair<std::string, std::string>>> promise) = 0;
virtual void prepare_actor_stats(td::Promise<std::string> promise) = 0;
virtual void prepare_perf_timer_stats(td::Promise<std::vector<PerfTimerStats>> promise) = 0;
virtual void add_perf_timer_stat(std::string name, double duration) = 0;