diff --git a/catchain/catchain-receiver.cpp b/catchain/catchain-receiver.cpp index a6ecf061..d7e19031 100644 --- a/catchain/catchain-receiver.cpp +++ b/catchain/catchain-receiver.cpp @@ -27,6 +27,8 @@ #include "catchain-receiver.hpp" +#include "td/utils/ThreadSafeCounter.h" + namespace ton { namespace catchain { @@ -685,6 +687,7 @@ void CatChainReceiverImpl::receive_query_from_overlay(adnl::AdnlNodeIdShort src, promise.set_error(td::Status::Error(ErrorCode::notready, "db not read")); return; } + TD_PERF_COUNTER(catchain_query_process); td::PerfWarningTimer t{"catchain query process", 0.05}; auto F = fetch_tl_object(data.clone(), true); if (F.is_error()) { diff --git a/crypto/block/transaction.cpp b/crypto/block/transaction.cpp index dbf0199e..2e4cda22 100644 --- a/crypto/block/transaction.cpp +++ b/crypto/block/transaction.cpp @@ -2860,22 +2860,26 @@ td::Status Transaction::check_state_limits(const SizeLimitsConfig& size_limits, vm::CellStorageStat storage_stat; storage_stat.limit_cells = size_limits.max_acc_state_cells; storage_stat.limit_bits = size_limits.max_acc_state_bits; - td::Timer timer; - auto add_used_storage = [&](const td::Ref& cell) -> td::Status { - if (cell.not_null()) { - TRY_RESULT(res, storage_stat.add_used_storage(cell)); - if (res.max_merkle_depth > max_allowed_merkle_depth) { - return td::Status::Error("too big merkle depth"); + { + TD_PERF_COUNTER(transaction_storage_stat_a); + td::Timer timer; + auto add_used_storage = [&](const td::Ref& cell) -> td::Status { + if (cell.not_null()) { + TRY_RESULT(res, storage_stat.add_used_storage(cell)); + if (res.max_merkle_depth > max_allowed_merkle_depth) { + return td::Status::Error("too big merkle depth"); + } } + return td::Status::OK(); + }; + TRY_STATUS(add_used_storage(new_code)); + TRY_STATUS(add_used_storage(new_data)); + TRY_STATUS(add_used_storage(new_library)); + if (timer.elapsed() > 0.1) { + LOG(INFO) << "Compute used storage took " << timer.elapsed() << "s"; } - return td::Status::OK(); - }; - TRY_STATUS(add_used_storage(new_code)); - TRY_STATUS(add_used_storage(new_data)); - TRY_STATUS(add_used_storage(new_library)); - if (timer.elapsed() > 0.1) { - LOG(INFO) << "Compute used storage took " << timer.elapsed() << "s"; } + if (acc_status == Account::acc_active) { storage_stat.clear_limit(); } else { @@ -3156,6 +3160,7 @@ bool Transaction::compute_state() { if (new_stats) { stats = new_stats.unwrap(); } else { + TD_PERF_COUNTER(transaction_storage_stat_b); td::Timer timer; stats.add_used_storage(Ref(storage)).ensure(); if (timer.elapsed() > 0.1) { diff --git a/crypto/vm/db/CellStorage.cpp b/crypto/vm/db/CellStorage.cpp index 303d4650..5dac2cff 100644 --- a/crypto/vm/db/CellStorage.cpp +++ b/crypto/vm/db/CellStorage.cpp @@ -33,6 +33,7 @@ class RefcntCellStorer { template void store(StorerT &storer) const { + TD_PERF_COUNTER(cell_store); using td::store; if (as_boc_) { td::int32 tag = -1; @@ -151,6 +152,7 @@ CellLoader::CellLoader(std::shared_ptr reader, std::function CellLoader::load(td::Slice hash, bool need_data, ExtCellCreator &ext_cell_creator) { //LOG(ERROR) << "Storage: load cell " << hash.size() << " " << td::base64_encode(hash); + TD_PERF_COUNTER(cell_load); LoadResult res; std::string serialized; TRY_RESULT(get_status, reader_->get(hash, serialized)); diff --git a/emulator/CMakeLists.txt b/emulator/CMakeLists.txt index dc8cbf62..23beb158 100644 --- a/emulator/CMakeLists.txt +++ b/emulator/CMakeLists.txt @@ -65,4 +65,4 @@ if (USE_EMSCRIPTEN) target_compile_options(emulator-emscripten PRIVATE -fexceptions) 
endif() -install(TARGETS emulator LIBRARY DESTINATION lib) +install(TARGETS emulator ARCHIVE DESTINATION lib LIBRARY DESTINATION lib) diff --git a/tdactor/CMakeLists.txt b/tdactor/CMakeLists.txt index 46dd0335..98b900a1 100644 --- a/tdactor/CMakeLists.txt +++ b/tdactor/CMakeLists.txt @@ -3,16 +3,19 @@ cmake_minimum_required(VERSION 3.5 FATAL_ERROR) #SOURCE SETS set(TDACTOR_SOURCE td/actor/core/ActorExecutor.cpp + td/actor/core/ActorTypeStat.cpp td/actor/core/CpuWorker.cpp td/actor/core/IoWorker.cpp td/actor/core/Scheduler.cpp + td/actor/ActorStats.cpp td/actor/MultiPromise.cpp td/actor/actor.h td/actor/ActorId.h td/actor/ActorOwn.h td/actor/ActorShared.h + td/actor/ActorStats.h td/actor/common.h td/actor/PromiseFuture.h td/actor/MultiPromise.h @@ -27,6 +30,7 @@ set(TDACTOR_SOURCE td/actor/core/ActorMessage.h td/actor/core/ActorSignals.h td/actor/core/ActorState.h + td/actor/core/ActorTypeStat.h td/actor/core/CpuWorker.h td/actor/core/Context.h td/actor/core/IoWorker.h diff --git a/tdactor/td/actor/ActorStats.cpp b/tdactor/td/actor/ActorStats.cpp new file mode 100644 index 00000000..ad037204 --- /dev/null +++ b/tdactor/td/actor/ActorStats.cpp @@ -0,0 +1,245 @@ +#include "ActorStats.h" + +#include "td/utils/ThreadSafeCounter.h" +namespace td { +namespace actor { +void td::actor::ActorStats::start_up() { + auto now = td::Time::now(); + for (std::size_t i = 0; i < SIZE; i++) { + stat_[i] = td::TimedStat>(DURATIONS[i], now); + stat_[i].add_event(ActorTypeStats(), now); + } + begin_ts_ = td::Timestamp::now(); + begin_ticks_ = Clocks::rdtsc(); + loop(); +} +double ActorStats::estimate_inv_ticks_per_second() { + auto now = td::Timestamp::now(); + auto elapsed_seconds = now.at() - begin_ts_.at(); + auto now_ticks = td::Clocks::rdtsc(); + auto elapsed_ticks = now_ticks - begin_ticks_; + auto estimated_inv_ticks_per_second = + elapsed_seconds > 0.1 ? 
elapsed_seconds / double(elapsed_ticks) : Clocks::inv_ticks_per_second(); + return estimated_inv_ticks_per_second; +} + +std::string ActorStats::prepare_stats() { + auto estimated_inv_ticks_per_second = estimate_inv_ticks_per_second(); + + auto current_stats = td::actor::ActorTypeStatManager::get_stats(estimated_inv_ticks_per_second); + auto now = td::Timestamp::now(); + auto now_ticks = Clocks::rdtsc(); + + update(now); + + // Lets look at recent stats first + auto load_stats = [&](auto &timed_stat) { + auto res = current_stats; + auto &since = timed_stat.get_stat(now.at()); + auto duration = since.get_duration(estimated_inv_ticks_per_second); + if (since.first_) { + res -= since.first_.value(); + } + res /= duration; + return res.stats; + }; + auto stats_10s = load_stats(stat_[0]); + auto stats_10m = load_stats(stat_[1]); + current_stats /= double(now_ticks - begin_ticks_) * estimated_inv_ticks_per_second; + auto stats_forever = current_stats.stats; + + std::map current_perf_map; + std::map perf_map_10s; + std::map perf_map_10m; + std::map perf_values; + td::NamedPerfCounter::get_default().for_each( + [&](td::Slice name, td::int64 value_int64) { perf_values[name.str()] = double(value_int64); }); + for (auto &value_it : perf_values) { + const auto &name = value_it.first; + auto value = value_it.second; + + auto &perf_stat = pef_stats_[name]; + auto load_perf_stats = [&](auto &timed_stat, auto &m) { + double res = double(value); + auto &since = timed_stat.get_stat(now.at()); + auto duration = since.get_duration(estimated_inv_ticks_per_second); + if (since.first_) { + res -= since.first_.value(); + } + if (td::ends_with(name, ".duration")) { + res *= estimated_inv_ticks_per_second; + } + // m[name + ".raw"] = res; + // m[name + ".range"] = duration; + res /= duration; + return res; + }; + perf_map_10s[name] = load_perf_stats(perf_stat.perf_stat_[0], perf_map_10s); + perf_map_10m[name] = load_perf_stats(perf_stat.perf_stat_[1], perf_map_10m); + + auto current_duration = (double(now_ticks - begin_ticks_) * estimated_inv_ticks_per_second); + if (td::ends_with(name, ".duration")) { + value *= estimated_inv_ticks_per_second; + } + current_perf_map[name] = double(value) / current_duration; + // current_perf_map[name + ".raw"] = double(value); + // current_perf_map[name + ".range"] = double(now_ticks - begin_ticks_) * estimated_inv_ticks_per_second; + }; + + td::StringBuilder sb; + sb << "================================= PERF COUNTERS ================================\n"; + sb << "ticks_per_second_estimate\t" << 1.0 / estimated_inv_ticks_per_second << "\n"; + for (auto &it : perf_map_10s) { + const std::string &name = it.first; + auto dot_at = name.rfind('.'); + CHECK(dot_at != std::string::npos); + auto base_name = name.substr(0, dot_at); + auto rest_name = name.substr(dot_at + 1); + td::Slice new_rest_name = rest_name; + if (rest_name == "count") { + new_rest_name = "qps"; + } + if (rest_name == "duration") { + new_rest_name = "load"; + } + auto rewrite_name = PSTRING() << base_name << "." 
<< new_rest_name; + sb << rewrite_name << "\t" << perf_map_10s[name] << " " << perf_map_10m[name] << " " << current_perf_map[name] + << "\n"; + } + sb << "\n"; + sb << "================================= ACTORS STATS =================================\n"; + double max_delay = 0; + ActorTypeStat sum_stat_forever; + ActorTypeStat sum_stat_10m; + ActorTypeStat sum_stat_10s; + for (auto &it : stats_forever) { + sum_stat_forever += it.second; + } + for (auto &it : stats_10m) { + sum_stat_10m += it.second; + } + for (auto &it : stats_10s) { + sum_stat_10s += it.second; + } + sb << "\n"; + + auto do_describe = [&](auto &&sb, const ActorTypeStat &stat_10s, const ActorTypeStat &stat_10m, + const ActorTypeStat &stat_forever) { + sb() << "load_per_second:\t" << stat_10s.seconds << " " << stat_10m.seconds << " " << stat_forever.seconds << "\n"; + sb() << "messages_per_second:\t" << stat_10s.messages << " " << stat_10m.messages << " " << stat_forever.messages + << "\n"; + + sb() << "max_execute_messages:\t" << stat_forever.max_execute_messages.value_10s << " " + << stat_forever.max_execute_messages.value_10m << " " << stat_forever.max_execute_messages.value_forever + << "\n"; + + sb() << "max_execute_seconds:\t" << stat_forever.max_execute_seconds.value_10s << "s" + << " " << stat_forever.max_execute_seconds.value_10m << "s" + << " " << stat_forever.max_execute_seconds.value_forever << "s\n"; + sb() << "max_message_seconds:\t" << stat_forever.max_message_seconds.value_10s << " " + << stat_forever.max_message_seconds.value_10m << " " << stat_forever.max_message_seconds.value_forever << "\n"; + sb() << "created_per_second:\t" << stat_10s.created << " " << stat_10m.created << " " << stat_forever.created + << "\n"; + + auto executing_for = + stat_forever.executing_start > 1e15 + ? 0 + : double(td::Clocks::rdtsc()) * estimated_inv_ticks_per_second - stat_forever.executing_start; + sb() << "max_delay:\t" << stat_forever.max_delay_seconds.value_10s << "s " + << stat_forever.max_delay_seconds.value_10m << "s " << stat_forever.max_delay_seconds.value_forever << "s\n"; + sb() << "" + << "alive: " << stat_forever.alive << " executing: " << stat_forever.executing + << " max_executing_for: " << executing_for << "s\n"; + }; + + auto describe = [&](td::StringBuilder &sb, std::type_index actor_type_index) { + auto stat_10s = stats_10s[actor_type_index]; + auto stat_10m = stats_10m[actor_type_index]; + auto stat_forever = stats_forever[actor_type_index]; + do_describe([&sb]() -> td::StringBuilder & { return sb << "\t\t"; }, stat_10s, stat_10m, stat_forever); + }; + + sb << "Cumulative stats:\n"; + do_describe([&sb]() -> td::StringBuilder & { return sb << "\t"; }, sum_stat_10s, sum_stat_10m, sum_stat_forever); + sb << "\n"; + + auto top_k_by = [&](auto &stats_map, size_t k, std::string description, auto by) { + auto stats = td::transform(stats_map, [](auto &it) { return std::make_pair(it.first, it.second); }); + k = std::min(k, stats.size()); + std::partial_sort(stats.begin(), stats.begin() + k, stats.end(), [&](auto &a, auto &b) { return by(a) > by(b); }); + bool is_first = true; + for (size_t i = 0; i < k; i++) { + auto value = by(stats[i]); + if (value < 1e-9) { + break; + } + if (is_first) { + sb << "top actors by " << description << "\n"; + is_first = false; + } + sb << "\t#" << i << ": " << ActorTypeStatManager::get_class_name(stats[i].first.name()) << "\t" << value << "\n"; + } + sb << "\n"; + }; + using Entry = std::pair; + static auto cutoff = [](auto x, auto min_value) { return x < min_value ? 
decltype(x){} : x; }; + top_k_by(stats_10s, 10, "load_10s", [](auto &x) { return cutoff(x.second.seconds, 0.005); }); + + top_k_by(stats_10m, 10, "load_10m", [](auto &x) { return cutoff(x.second.seconds, 0.005); }); + top_k_by(stats_forever, 10, "max_execute_seconds_10m", + [](Entry &x) { return cutoff(x.second.max_execute_seconds.value_10m, 0.5); }); + auto rdtsc_seconds = double(now_ticks) * estimated_inv_ticks_per_second; + top_k_by(stats_forever, 10, "executing_for", [&](Entry &x) { + if (x.second.executing_start > 1e15) { + return 0.0; + } + return rdtsc_seconds - x.second.executing_start; + }); + top_k_by(stats_forever, 10, "max_execute_messages_10m", + [](Entry &x) { return cutoff(x.second.max_execute_messages.value_10m, 10u); }); + + auto stats = td::transform(stats_forever, [](auto &it) { return std::make_pair(it.first, it.second); }); + + auto main_key = [&](std::type_index actor_type_index) { + auto stat_10s = stats_10s[actor_type_index]; + auto stat_10m = stats_10m[actor_type_index]; + auto stat_forever = stats_forever[actor_type_index]; + return std::make_tuple(cutoff(std::max(stat_10s.seconds, stat_10m.seconds), 0.1), + cutoff(stat_forever.max_execute_seconds.value_10m, 0.5), stat_forever.seconds); + }; + std::sort(stats.begin(), stats.end(), + [&](auto &left, auto &right) { return main_key(left.first) > main_key(right.first); }); + auto debug = Debug(SchedulerContext::get()->scheduler_group()); + debug.dump(sb); + sb << "All actors:\n"; + for (auto &it : stats) { + sb << "\t" << ActorTypeStatManager::get_class_name(it.first.name()) << "\n"; + auto key = main_key(it.first); + describe(sb, it.first); + } + sb << "\n"; + return sb.as_cslice().str(); +} +ActorStats::PefStat::PefStat() { + for (std::size_t i = 0; i < SIZE; i++) { + perf_stat_[i] = td::TimedStat>(DURATIONS[i], td::Time::now()); + perf_stat_[i].add_event(0, td::Time::now()); + } +} + +void ActorStats::update(td::Timestamp now) { + auto stat = td::actor::ActorTypeStatManager::get_stats(estimate_inv_ticks_per_second()); + for (auto &timed_stat : stat_) { + timed_stat.add_event(stat, now.at()); + } + NamedPerfCounter::get_default().for_each([&](td::Slice name, td::int64 value) { + auto &stat = pef_stats_[name.str()].perf_stat_; + for (auto &timed_stat : stat) { + timed_stat.add_event(value, now.at()); + } + }); +} +constexpr int ActorStats::DURATIONS[SIZE]; +constexpr const char *ActorStats::DESCR[SIZE]; +} // namespace actor +} // namespace td diff --git a/tdactor/td/actor/ActorStats.h b/tdactor/td/actor/ActorStats.h new file mode 100644 index 00000000..00f7ebeb --- /dev/null +++ b/tdactor/td/actor/ActorStats.h @@ -0,0 +1,52 @@ +#pragma once +#include "td/actor/actor.h" +#include "td/utils/TimedStat.h" +namespace td { +namespace actor { + +class ActorStats : public td::actor::Actor { + public: + ActorStats() { + } + void start_up() override; + double estimate_inv_ticks_per_second(); + std::string prepare_stats(); + + private: + template + struct StatStorer { + void on_event(const T &event) { + if (!first_) { + first_ = event; + first_ts_ = Clocks::rdtsc(); + } + } + double get_duration(double inv_ticks_per_second) const { + if (first_) { + return std::max(1.0, (Clocks::rdtsc() - first_ts_) * inv_ticks_per_second); + } + return 1.0; + } + td::optional first_; + td::uint64 first_ts_; + }; + static constexpr std::size_t SIZE = 2; + static constexpr const char *DESCR[SIZE] = {"10sec", "10min"}; + static constexpr int DURATIONS[SIZE] = {10, 10 * 60}; + td::TimedStat> stat_[SIZE]; + struct PefStat { + PefStat(); + 
td::TimedStat> perf_stat_[SIZE]; + }; + std::map pef_stats_; + td::Timestamp begin_ts_; + td::uint64 begin_ticks_{}; + void loop() override { + alarm_timestamp() = td::Timestamp::in(5.0); + update(td::Timestamp::now()); + } + void update(td::Timestamp now); +}; + +} // namespace actor +} // namespace td diff --git a/tdactor/td/actor/common.h b/tdactor/td/actor/common.h index d3858974..222db743 100644 --- a/tdactor/td/actor/common.h +++ b/tdactor/td/actor/common.h @@ -19,6 +19,7 @@ #pragma once #include "td/actor/core/Actor.h" #include "td/actor/core/ActorSignals.h" +#include "td/actor/core/ActorTypeStat.h" #include "td/actor/core/SchedulerId.h" #include "td/actor/core/SchedulerContext.h" #include "td/actor/core/Scheduler.h" @@ -68,7 +69,7 @@ using core::set_debug; struct Debug { public: Debug() = default; - Debug(std::shared_ptr group_info) : group_info_(std::move(group_info)) { + Debug(core::SchedulerGroupInfo *group_info) : group_info_(group_info) { } template void for_each(F &&f) { @@ -80,18 +81,29 @@ struct Debug { } } - void dump() { - for_each([](core::Debug &debug) { + void dump(td::StringBuilder &sb) { + sb << "list of active actors with names:\n"; + for_each([&](core::Debug &debug) { core::DebugInfo info; debug.read(info); if (info.is_active) { - LOG(ERROR) << info.name << " " << td::format::as_time(Time::now() - info.start_at); + sb << "\t\"" << info.name << "\" is active for " << Time::now() - info.start_at << "s\n"; } }); + sb << "\nsizes of cpu local queues:\n"; + for (auto &scheduler : group_info_->schedulers) { + for (size_t i = 0; i < scheduler.cpu_threads_count; i++) { + auto size = scheduler.cpu_local_queue[i].size(); + if (size != 0) { + sb << "\tcpu#" << i << " queue.size() = " << size << "\n"; + } + } + } + sb << "\n"; } private: - std::shared_ptr group_info_; + core::SchedulerGroupInfo *group_info_; }; class Scheduler { @@ -142,7 +154,7 @@ class Scheduler { } Debug get_debug() { - return Debug{group_info_}; + return Debug{group_info_.get()}; } bool run() { @@ -200,6 +212,10 @@ class Scheduler { } }; +using core::ActorTypeStat; +using core::ActorTypeStatManager; +using core::ActorTypeStats; + // Some helper functions. Not part of public interface and not part // of namespace core namespace detail { @@ -324,7 +340,7 @@ void send_closure_impl(ActorRef actor_ref, ClosureT &&closure) { } template -void send_closure(ActorRef actor_ref, ArgsT &&... args) { +void send_closure(ActorRef actor_ref, ArgsT &&...args) { send_closure_impl(actor_ref, create_immediate_closure(std::forward(args)...)); } @@ -365,7 +381,7 @@ void send_closure_with_promise_later(ActorRef actor_ref, ClosureT &&closure, Pro } template -void send_closure_later(ActorRef actor_ref, ArgsT &&... args) { +void send_closure_later(ActorRef actor_ref, ArgsT &&...args) { send_closure_later_impl(actor_ref, create_delayed_closure(std::forward(args)...)); } @@ -396,15 +412,17 @@ inline void send_signals_later(ActorRef actor_ref, ActorSignals signals) { inline void register_actor_info_ptr(core::ActorInfoPtr actor_info_ptr) { auto state = actor_info_ptr->state().get_flags_unsafe(); + actor_info_ptr->on_add_to_queue(); core::SchedulerContext::get()->add_to_queue(std::move(actor_info_ptr), state.get_scheduler_id(), !state.is_shared()); } template -core::ActorInfoPtr create_actor(core::ActorOptions &options, ArgsT &&... 
args) noexcept { +core::ActorInfoPtr create_actor(core::ActorOptions &options, ArgsT &&...args) noexcept { auto *scheduler_context = core::SchedulerContext::get(); if (!options.has_scheduler()) { options.on_scheduler(scheduler_context->get_scheduler_id()); } + options.with_actor_stat_id(core::ActorTypeStatImpl::get_unique_id()); auto res = scheduler_context->get_actor_info_creator().create(std::make_unique(std::forward(args)...), options); register_actor_info_ptr(res); diff --git a/tdactor/td/actor/core/ActorExecutor.cpp b/tdactor/td/actor/core/ActorExecutor.cpp index 267758d5..d1542242 100644 --- a/tdactor/td/actor/core/ActorExecutor.cpp +++ b/tdactor/td/actor/core/ActorExecutor.cpp @@ -114,6 +114,8 @@ void ActorExecutor::start() noexcept { actor_execute_context_.set_actor(&actor_info_.actor()); + actor_stats_ = actor_info_.actor_type_stat(); + auto execute_timer = actor_stats_.create_execute_timer(); while (flush_one_signal(signals)) { if (actor_execute_context_.has_immediate_flags()) { return; @@ -175,6 +177,11 @@ void ActorExecutor::finish() noexcept { if (add_to_queue) { actor_info_ptr = actor_info_.actor().get_actor_info_ptr(); } + if (!flags().is_closed() && flags().is_in_queue()) { + // Must do it while we are locked, so to it optimistically + // we will add actor to queue after unlock OR we are already in a queue OR we will be closed + actor_info_.on_add_to_queue(); + } if (actor_locker_.try_unlock(flags())) { if (add_to_queue) { dispatcher_.add_to_queue(std::move(actor_info_ptr), flags().get_scheduler_id(), !flags().is_shared()); @@ -193,23 +200,31 @@ bool ActorExecutor::flush_one_signal(ActorSignals &signals) { } switch (signal) { //NB: Signals will be handled in order of their value. - // For clarity it conincides with order in this switch + // For clarity, it coincides with order in this switch case ActorSignals::Pause: actor_execute_context_.set_pause(); break; - case ActorSignals::Kill: + case ActorSignals::Kill: { + auto message_timer = actor_stats_.create_message_timer(); actor_execute_context_.set_stop(); break; - case ActorSignals::StartUp: + } + case ActorSignals::StartUp: { + auto message_timer = actor_stats_.create_message_timer(); + actor_stats_.created(); actor_info_.actor().start_up(); break; - case ActorSignals::Wakeup: + } + case ActorSignals::Wakeup: { + auto message_timer = actor_stats_.create_message_timer(); actor_info_.actor().wake_up(); break; + } case ActorSignals::Alarm: if (actor_execute_context_.get_alarm_timestamp() && actor_execute_context_.get_alarm_timestamp().is_in_past()) { actor_execute_context_.alarm_timestamp() = Timestamp::never(); actor_info_.set_alarm_timestamp(Timestamp::never()); + auto message_timer = actor_stats_.create_message_timer(); actor_info_.actor().alarm(); } break; @@ -245,6 +260,7 @@ bool ActorExecutor::flush_one_message() { } actor_execute_context_.set_link_token(message.get_link_token()); + auto message_timer = actor_stats_.create_message_timer(); message.run(); return true; } @@ -257,7 +273,9 @@ void ActorExecutor::flush_context_flags() { } flags_.set_closed(true); if (!flags_.get_signals().has_signal(ActorSignals::Signal::StartUp)) { + auto message_timer = actor_stats_.create_message_timer(); actor_info_.actor().tear_down(); + actor_stats_.destroyed(); } actor_info_.destroy_actor(); } else { diff --git a/tdactor/td/actor/core/ActorExecutor.h b/tdactor/td/actor/core/ActorExecutor.h index 366cb754..dd86b5ae 100644 --- a/tdactor/td/actor/core/ActorExecutor.h +++ b/tdactor/td/actor/core/ActorExecutor.h @@ -24,6 +24,7 @@ 
#include "td/actor/core/ActorMessage.h" #include "td/actor/core/ActorSignals.h" #include "td/actor/core/ActorState.h" +#include "td/actor/core/ActorTypeStat.h" #include "td/actor/core/SchedulerContext.h" #include "td/utils/format.h" @@ -95,6 +96,7 @@ class ActorExecutor { ActorInfo &actor_info_; SchedulerDispatcher &dispatcher_; Options options_; + ActorTypeStatRef actor_stats_; ActorLocker actor_locker_{&actor_info_.state(), ActorLocker::Options() .with_can_execute_paused(options_.from_queue) .with_is_shared(!options_.has_poll) diff --git a/tdactor/td/actor/core/ActorInfo.h b/tdactor/td/actor/core/ActorInfo.h index 97419293..690b06d9 100644 --- a/tdactor/td/actor/core/ActorInfo.h +++ b/tdactor/td/actor/core/ActorInfo.h @@ -19,6 +19,7 @@ #pragma once #include "td/actor/core/ActorState.h" +#include "td/actor/core/ActorTypeStat.h" #include "td/actor/core/ActorMailbox.h" #include "td/utils/Heap.h" @@ -34,8 +35,8 @@ class ActorInfo; using ActorInfoPtr = SharedObjectPool::Ptr; class ActorInfo : private HeapNode, private ListNode { public: - ActorInfo(std::unique_ptr actor, ActorState::Flags state_flags, Slice name) - : actor_(std::move(actor)), name_(name.begin(), name.size()) { + ActorInfo(std::unique_ptr actor, ActorState::Flags state_flags, Slice name, td::uint32 actor_stat_id) + : actor_(std::move(actor)), name_(name.begin(), name.size()), actor_stat_id_(actor_stat_id) { state_.set_flags_unsafe(state_flags); VLOG(actor) << "Create actor [" << name_ << "]"; } @@ -58,6 +59,18 @@ class ActorInfo : private HeapNode, private ListNode { Actor *actor_ptr() const { return actor_.get(); } + // NB: must be called only when actor is locked + ActorTypeStatRef actor_type_stat() { + auto res = ActorTypeStatManager::get_actor_type_stat(actor_stat_id_, actor_.get()); + if (in_queue_since_) { + res.pop_from_queue(in_queue_since_); + in_queue_since_ = 0; + } + return res; + } + void on_add_to_queue() { + in_queue_since_ = td::Clocks::rdtsc(); + } void destroy_actor() { actor_.reset(); } @@ -103,6 +116,8 @@ class ActorInfo : private HeapNode, private ListNode { std::atomic alarm_timestamp_at_{0}; ActorInfoPtr pin_; + td::uint64 in_queue_since_{0}; + td::uint32 actor_stat_id_{0}; }; } // namespace core diff --git a/tdactor/td/actor/core/ActorInfoCreator.h b/tdactor/td/actor/core/ActorInfoCreator.h index 49c7611a..d818dc63 100644 --- a/tdactor/td/actor/core/ActorInfoCreator.h +++ b/tdactor/td/actor/core/ActorInfoCreator.h @@ -46,10 +46,16 @@ class ActorInfoCreator { return *this; } + Options& with_actor_stat_id(td::uint32 new_id) { + actor_stat_id = new_id; + return *this; + } + private: friend class ActorInfoCreator; Slice name; SchedulerId scheduler_id; + td::uint32 actor_stat_id{0}; bool is_shared{true}; bool in_queue{true}; //TODO: rename @@ -65,7 +71,7 @@ class ActorInfoCreator { flags.set_in_queue(args.in_queue); flags.set_signals(ActorSignals::one(ActorSignals::StartUp)); - auto actor_info_ptr = pool_.alloc(std::move(actor), flags, args.name); + auto actor_info_ptr = pool_.alloc(std::move(actor), flags, args.name, args.actor_stat_id); actor_info_ptr->actor().set_actor_info_ptr(actor_info_ptr); return actor_info_ptr; } diff --git a/tdactor/td/actor/core/ActorTypeStat.cpp b/tdactor/td/actor/core/ActorTypeStat.cpp new file mode 100644 index 00000000..e241dd56 --- /dev/null +++ b/tdactor/td/actor/core/ActorTypeStat.cpp @@ -0,0 +1,109 @@ +#include "td/actor/core/Actor.h" +#include "td/actor/core/ActorTypeStat.h" +#include "td/actor/core/Scheduler.h" +#include "td/utils/port/thread_local.h" +#include +#include 
+#include +#include +#include +#include + +namespace td { +namespace actor { +namespace core { + +class ActorTypeStatRef; +struct ActorTypeStatsTlsEntry { + struct Entry { + std::unique_ptr stat; + std::optional o_type_index; + }; + std::vector by_id; + std::mutex mutex; + + template + void foreach_entry(F &&f) { + std::lock_guard guard(mutex); + for (auto &entry : by_id) { + f(entry); + } + } + ActorTypeStatRef get_actor_type_stat(td::uint32 id, Actor &actor) { + if (id >= by_id.size()) { + std::lock_guard guard(mutex); + by_id.resize(id + 1); + } + auto &entry = by_id.at(id); + if (!entry.o_type_index) { + std::lock_guard guard(mutex); + entry.o_type_index = std::type_index(typeid(actor)); + entry.stat = std::make_unique(); + } + return ActorTypeStatRef{entry.stat.get()}; + } +}; + +struct ActorTypeStatsRegistry { + std::mutex mutex; + std::vector> entries; + void registry_entry(std::shared_ptr entry) { + std::lock_guard guard(mutex); + entries.push_back(std::move(entry)); + } + template + void foreach_entry(F &&f) { + std::lock_guard guard(mutex); + for (auto &entry : entries) { + f(*entry); + } + } +}; + +ActorTypeStatsRegistry registry; + +struct ActorTypeStatsTlsEntryRef { + ActorTypeStatsTlsEntryRef() { + entry_ = std::make_shared(); + registry.registry_entry(entry_); + } + std::shared_ptr entry_; +}; + +static TD_THREAD_LOCAL ActorTypeStatsTlsEntryRef *actor_type_stats_tls_entry = nullptr; + +ActorTypeStatRef ActorTypeStatManager::get_actor_type_stat(td::uint32 id, Actor *actor) { + if (!actor || !need_debug()) { + return ActorTypeStatRef{nullptr}; + } + td::init_thread_local(actor_type_stats_tls_entry); + ActorTypeStatsTlsEntry &tls_entry = *actor_type_stats_tls_entry->entry_; + return tls_entry.get_actor_type_stat(id, *actor); +} + +std::string ActorTypeStatManager::get_class_name(const char *name) { + int status; + char *real_name = abi::__cxa_demangle(name, nullptr, nullptr, &status); + if (status < 0) { + return name; + } + + std::string result = real_name; + std::free(real_name); + return result; +} + +ActorTypeStats ActorTypeStatManager::get_stats(double inv_ticks_per_second) { + std::map stats; + registry.foreach_entry([&](ActorTypeStatsTlsEntry &tls_entry) { + tls_entry.foreach_entry([&](ActorTypeStatsTlsEntry::Entry &entry) { + if (entry.o_type_index) { + stats[entry.o_type_index.value()] += entry.stat->to_stat(inv_ticks_per_second); + } + }); + }); + return ActorTypeStats{.stats = std::move(stats)}; +} +} // namespace core +} // namespace actor +} // namespace td diff --git a/tdactor/td/actor/core/ActorTypeStat.h b/tdactor/td/actor/core/ActorTypeStat.h new file mode 100644 index 00000000..5b0bd18d --- /dev/null +++ b/tdactor/td/actor/core/ActorTypeStat.h @@ -0,0 +1,395 @@ +#pragma once +#include "td/utils/int_types.h" +#include "td/utils/port/Clocks.h" +#include +#include +#include + +namespace td { +namespace actor { +namespace core { +class Actor; + +struct ActorTypeStat { + // diff (speed) + double created{0}; + double executions{0}; + double messages{0}; + double seconds{0}; + + // current statistics + td::int64 alive{0}; + td::int32 executing{0}; + double executing_start{1e20}; + + // max statistics (TODO: recent_max) + template + struct MaxStatGroup { + T value_forever{}; + T value_10s{}; + T value_10m{}; + MaxStatGroup &operator+=(const MaxStatGroup &other) { + value_forever = std::max(value_forever, other.value_forever); + value_10s = std::max(value_10s, other.value_10s); + value_10m = std::max(value_10m, other.value_10m); + return *this; + } + }; + MaxStatGroup 
max_execute_messages; + MaxStatGroup max_message_seconds; + MaxStatGroup max_execute_seconds; + MaxStatGroup max_delay_seconds; + + ActorTypeStat &operator+=(const ActorTypeStat &other) { + created += other.created; + executions += other.executions; + messages += other.messages; + seconds += other.seconds; + + alive += other.alive; + executing += other.executing; + executing_start = std::min(other.executing_start, executing_start); + + max_execute_messages += other.max_execute_messages; + max_message_seconds += other.max_message_seconds; + max_execute_seconds += other.max_execute_seconds; + max_delay_seconds += other.max_delay_seconds; + return *this; + } + + ActorTypeStat &operator-=(const ActorTypeStat &other) { + created -= other.created; + executions -= other.executions; + messages -= other.messages; + seconds -= other.seconds; + return *this; + } + ActorTypeStat &operator/=(double t) { + if (t > 1e-2) { + created /= t; + executions /= t; + messages /= t; + seconds /= t; + } else { + created = 0; + executions = 0; + messages = 0; + seconds = 0; + } + return *this; + } +}; + +struct ActorTypeStatImpl { + public: + ActorTypeStatImpl() { + } + + class MessageTimer { + public: + MessageTimer(ActorTypeStatImpl *stat, td::uint64 started_at = Clocks::rdtsc()) + : stat_(stat), started_at_(started_at) { + } + MessageTimer(const MessageTimer &) = delete; + MessageTimer(MessageTimer &&) = delete; + MessageTimer &operator=(const MessageTimer &) = delete; + MessageTimer &operator=(MessageTimer &&) = delete; + ~MessageTimer() { + if (stat_) { + auto ts = td::Clocks::rdtsc(); + stat_->message_finish(ts, ts - started_at_); + } + } + + private: + ActorTypeStatImpl *stat_; + td::uint64 started_at_; + }; + void created() { + inc(total_created_); + inc(alive_); + } + void destroyed() { + dec(alive_); + } + MessageTimer create_run_timer() { + return MessageTimer{this}; + } + + void message_finish(td::uint64 ts, td::uint64 ticks) { + inc(total_messages_); + inc(execute_messages_); + add(total_ticks_, ticks); + max_message_ticks_.update(ts, ticks); + } + void on_delay(td::uint64 ts, td::uint64 ticks) { + max_delay_ticks_.update(ts, ticks); + } + + void execute_start(td::uint64 ts) { + // TODO: this is mostly protection for recursive actor calls, which currently should be almost impossible. + // But to fully handle it, one would use one executing_cnt per thread, so only upper level execution is counted + if (inc(executing_) == 1) { + store(execute_start_, ts); + store(execute_messages_, 0); + } + } + void execute_finish(td::uint64 ts) { + CHECK(executing_ > 0); + if (dec(executing_) == 0) { + max_execute_messages_.update(ts, load(execute_messages_)); + max_execute_ticks_.update(ts, ts - load(execute_start_)); + + inc(total_executions_); + store(execute_start_, 0); + store(execute_messages_, 0); + } + } + + template + static td::uint32 get_unique_id() { + static td::uint32 value = get_next_unique_id(); + return value; + } + + static td::uint32 get_next_unique_id() { + static std::atomic next_id_{}; + return ++next_id_; + } + ActorTypeStat to_stat(double inv_ticks_per_second) const { + auto execute_start_copy = load(execute_start_); + auto actual_total_ticks = load(total_ticks_); + auto ts = Clocks::rdtsc(); + if (execute_start_copy != 0) { + actual_total_ticks += ts - execute_start_copy; + } + auto execute_start = ticks_to_seconds(load(execute_start_), inv_ticks_per_second); + return ActorTypeStat{.created = double(load(total_created_)), + .executions = double(load(total_executions_)), + .messages = 
double(load(total_messages_)), + .seconds = ticks_to_seconds(actual_total_ticks, inv_ticks_per_second), + + .alive = load(alive_), + .executing = load(executing_), + .executing_start = execute_start < 1e-9 ? 1e20 : execute_start, + .max_execute_messages = load(max_execute_messages_), + .max_message_seconds = load_seconds(max_message_ticks_, inv_ticks_per_second), + .max_execute_seconds = load_seconds(max_execute_ticks_, inv_ticks_per_second), + .max_delay_seconds = load_seconds(max_delay_ticks_, inv_ticks_per_second)}; + } + + private: + static double ticks_to_seconds(td::uint64 ticks, double inv_tick_per_second) { + return double(ticks) * inv_tick_per_second; + } + + template + static T load(const std::atomic &a) { + return a.load(std::memory_order_relaxed); + } + template + static void store(std::atomic &a, S value) { + a.store(value, std::memory_order_relaxed); + } + template + static T add(std::atomic &a, S value) { + T new_value = load(a) + value; + store(a, new_value); + return new_value; + } + template + static T inc(std::atomic &a) { + return add(a, 1); + } + + template + static T dec(std::atomic &a) { + return add(a, -1); + } + template + static void relax_max(std::atomic &a, T value) { + auto old_value = load(a); + if (value > old_value) { + store(a, value); + } + } + + template + class MaxCounter { + alignas(64) std::atomic max_values[2] = {0}; + std::atomic last_update_segment_time = 0; + + void update_current_segment(uint64 current_segment_time, uint64 segment_difference) { + if (segment_difference >= 2) { + store(max_values[0], 0); + store(max_values[1], 0); + } else if (segment_difference == 1) { + store(max_values[1 - (current_segment_time & 1)], 0); + } + store(last_update_segment_time, current_segment_time); + } + + public: + inline void update(td::uint64 rdtsc, ValueT value) { + auto current_segment_time = rdtsc / (Clocks::rdtsc_frequency() * Interval); + + auto segment_difference = current_segment_time - last_update_segment_time; + + if (unlikely(segment_difference != 0)) { + update_current_segment(current_segment_time, segment_difference); + } + + relax_max(max_values[current_segment_time & 1], value); + } + + inline ValueT get_max(uint64_t rdtsc) const { + uint64_t current_segment_time = rdtsc / (Clocks::rdtsc_frequency() * Interval); + uint64_t segment_difference = current_segment_time - load(last_update_segment_time); + + if (segment_difference >= 2) { + return 0; + } else if (segment_difference == 1) { + return load(max_values[current_segment_time & 1]); + } else { + return std::max(load(max_values[0]), load(max_values[1])); + } + } + }; + + template + struct MaxCounterGroup { + std::atomic max_forever{}; + MaxCounter max_10m; + MaxCounter max_10s; + + inline void update(td::uint64 rdtsc, T value) { + relax_max(max_forever, value); + max_10m.update(rdtsc, value); + max_10s.update(rdtsc, value); + } + }; + template + static ActorTypeStat::MaxStatGroup load(const MaxCounterGroup &src) { + auto ts = Clocks::rdtsc(); + return {.value_forever = load(src.max_forever), + .value_10s = src.max_10s.get_max(ts), + .value_10m = src.max_10m.get_max(ts)}; + } + template + static ActorTypeStat::MaxStatGroup load_seconds(const MaxCounterGroup &src, double inv_ticks_per_second) { + auto ts = Clocks::rdtsc(); + return {.value_forever = ticks_to_seconds(load(src.max_forever), inv_ticks_per_second), + .value_10s = ticks_to_seconds(src.max_10s.get_max(ts), inv_ticks_per_second), + .value_10m = ticks_to_seconds(src.max_10m.get_max(ts), inv_ticks_per_second)}; + } + + // total (increment 
only statistics) + std::atomic total_created_{0}; + std::atomic total_executions_{0}; + std::atomic total_messages_{0}; + std::atomic total_ticks_{0}; + + // current statistics + std::atomic alive_{0}; + std::atomic executing_{0}; + + // max statistics (TODO: recent_max) + MaxCounterGroup max_execute_messages_; + MaxCounterGroup max_message_ticks_; + MaxCounterGroup max_execute_ticks_; + MaxCounterGroup max_delay_ticks_; + + // execute state + std::atomic execute_start_{0}; + std::atomic execute_messages_{0}; +}; + +class ActorTypeStatRef { + public: + ActorTypeStatImpl *ref_{nullptr}; + + void created() { + if (!ref_) { + return; + } + ref_->created(); + } + void destroyed() { + if (!ref_) { + return; + } + ref_->destroyed(); + } + void pop_from_queue(td::uint64 in_queue_since) { + if (!ref_) { + return; + } + CHECK(in_queue_since); + auto ts = td::Clocks::rdtsc(); + ref_->on_delay(ts, ts - in_queue_since); + } + void start_execute() { + if (!ref_) { + return; + } + ref_->execute_start(td::Clocks::rdtsc()); + } + void finish_execute() { + if (!ref_) { + return; + } + ref_->execute_finish(td::Clocks::rdtsc()); + } + ActorTypeStatImpl::MessageTimer create_message_timer() { + if (!ref_) { + return ActorTypeStatImpl::MessageTimer{nullptr, 0}; + } + return ActorTypeStatImpl::MessageTimer{ref_}; + } + + struct ExecuteTimer { + ExecuteTimer() = delete; + ExecuteTimer(const ExecuteTimer &) = delete; + ExecuteTimer(ExecuteTimer &&) = delete; + ExecuteTimer &operator=(const ExecuteTimer &) = delete; + ExecuteTimer &operator=(ExecuteTimer &&) = delete; + + ExecuteTimer(ActorTypeStatRef *stat) : stat(stat) { + stat->start_execute(); + } + ActorTypeStatRef *stat{}; + ~ExecuteTimer() { + stat->finish_execute(); + } + }; + ExecuteTimer create_execute_timer() { + return ExecuteTimer(this); + } +}; + +// TODO: currently it is implemented via TD_THREAD_LOCAL, so the statistics is global across different schedulers +struct ActorTypeStats { + std::map stats; + ActorTypeStats &operator-=(const ActorTypeStats &other) { + for (auto &it : other.stats) { + stats.at(it.first) -= it.second; + } + return *this; + } + ActorTypeStats &operator/=(double x) { + for (auto &it : stats) { + it.second /= x; + } + return *this; + } +}; +class ActorTypeStatManager { + public: + static ActorTypeStatRef get_actor_type_stat(td::uint32 id, Actor *actor); + static ActorTypeStats get_stats(double inv_ticks_per_second); + static std::string get_class_name(const char *name); +}; + +} // namespace core +} // namespace actor +} // namespace td \ No newline at end of file diff --git a/tdactor/td/actor/core/Scheduler.h b/tdactor/td/actor/core/Scheduler.h index 3de519e0..e76b919e 100644 --- a/tdactor/td/actor/core/Scheduler.h +++ b/tdactor/td/actor/core/Scheduler.h @@ -130,27 +130,20 @@ struct LocalQueue { public: template bool push(T value, F &&overflow_f) { - auto res = std::move(next_); - next_ = std::move(value); - if (res) { - queue_.local_push(res.unwrap(), overflow_f); - return true; - } - return false; - } - bool try_pop(T &message) { - if (!next_) { - return queue_.local_pop(message); - } - message = next_.unwrap(); + queue_.local_push(std::move(value), overflow_f); return true; } - bool steal(T &message, LocalQueue &other) { + bool try_pop(T &message) { + return queue_.local_pop(message); + } + bool steal(T &message, LocalQueue &other) { return queue_.steal(message, other.queue_); } + size_t size() const { + return queue_.size(); + } private: - td::optional next_; StealingQueue queue_; char pad[TD_CONCURRENCY_PAD - 
sizeof(optional)]; }; @@ -267,11 +260,12 @@ class Scheduler { bool is_stop_requested() override; void stop() override; - private: - SchedulerGroupInfo *scheduler_group() const { + SchedulerGroupInfo *scheduler_group() const override { return scheduler_group_; } + private: + ActorInfoCreator *creator_; SchedulerId scheduler_id_; CpuWorkerId cpu_worker_id_; diff --git a/tdactor/td/actor/core/SchedulerContext.h b/tdactor/td/actor/core/SchedulerContext.h index e46aef7d..99b1922f 100644 --- a/tdactor/td/actor/core/SchedulerContext.h +++ b/tdactor/td/actor/core/SchedulerContext.h @@ -38,6 +38,7 @@ class SchedulerDispatcher { }; struct Debug; +struct SchedulerGroupInfo; class SchedulerContext : public Context, public SchedulerDispatcher { public: virtual ~SchedulerContext() = default; @@ -59,6 +60,7 @@ class SchedulerContext : public Context, public SchedulerDispa // Debug virtual Debug &get_debug() = 0; + virtual SchedulerGroupInfo *scheduler_group() const = 0; }; } // namespace core } // namespace actor diff --git a/tdactor/test/actors_core.cpp b/tdactor/test/actors_core.cpp index 96cd6239..1c56d1e5 100644 --- a/tdactor/test/actors_core.cpp +++ b/tdactor/test/actors_core.cpp @@ -19,15 +19,20 @@ #include "td/actor/core/ActorLocker.h" #include "td/actor/actor.h" #include "td/actor/PromiseFuture.h" +#include "td/actor/ActorStats.h" #include "td/utils/format.h" #include "td/utils/logging.h" +#include "td/utils/misc.h" +#include "td/utils/port/thread.h" #include "td/utils/port/thread.h" #include "td/utils/Random.h" #include "td/utils/Slice.h" #include "td/utils/StringBuilder.h" #include "td/utils/tests.h" #include "td/utils/Time.h" +#include "td/utils/TimedStat.h" +#include "td/utils/port/sleep.h" #include #include @@ -1102,4 +1107,59 @@ TEST(Actor2, send_vs_close2) { scheduler.run(); } } + +TEST(Actor2, test_stats) { + Scheduler scheduler({8}); + td::actor::set_debug(true); + + auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); }); + scheduler.run_in_context([watcher = std::move(watcher)] { + class SleepWorker : public Actor { + void loop() override { + // 0.8 load + td::usleep_for(800000); + alarm_timestamp() = td::Timestamp::in(0.2); + } + }; + class QueueWorker : public Actor { + void loop() override { + for (int i = 0; i < 20; i++) { + send_closure(actor_id(this), &QueueWorker::ping); + } + alarm_timestamp() = td::Timestamp::in(1.0); + } + void ping() { + } + }; + class Master : public Actor { + public: + Master(std::shared_ptr watcher) : watcher_(std::move(watcher)) { + } + void start_up() override { + alarm_timestamp() = td::Timestamp::in(1); + stats_ = td::actor::create_actor("actor_stats"); + td::actor::create_actor("sleep_worker").release(); + td::actor::create_actor("queue_worker").release(); + } + void alarm() override { + td::actor::send_closure(stats_, &ActorStats::prepare_stats, td::promise_send_closure(actor_id(this), &Master::on_stats)); + alarm_timestamp() = td::Timestamp::in(5); + } + void on_stats(td::Result r_stats) { + LOG(ERROR) << "\n" << r_stats.ok(); + if (--cnt_ == 0) { + stop(); + } + } + + private: + std::shared_ptr watcher_; + td::actor::ActorOwn stats_; + int cnt_={2}; + }; + td::actor::create_actor("Master", watcher).release(); + }); + + scheduler.run(); +} #endif //!TD_THREAD_UNSUPPORTED diff --git a/tdfec/td/fec/raptorq/Solver.cpp b/tdfec/td/fec/raptorq/Solver.cpp index 02d8102f..772271fa 100644 --- a/tdfec/td/fec/raptorq/Solver.cpp +++ b/tdfec/td/fec/raptorq/Solver.cpp @@ -19,6 +19,7 @@ #include "td/fec/raptorq/Solver.h" #include 
"td/fec/algebra/GaussianElimination.h" #include "td/fec/algebra/InactivationDecoding.h" +#include "td/utils/ThreadSafeCounter.h" #include "td/utils/Timer.h" #include @@ -70,6 +71,7 @@ Result Solver::run(const Rfc::Parameters &p, Span symbol auto C = GaussianElimination::run(std::move(A), std::move(D)); return C; } + TD_PERF_COUNTER(raptor_solve); PerfWarningTimer x("solve"); Timer timer; auto perf_log = [&](Slice message) { diff --git a/tdutils/td/utils/StealingQueue.h b/tdutils/td/utils/StealingQueue.h index 20a39dc0..86c2f0ef 100644 --- a/tdutils/td/utils/StealingQueue.h +++ b/tdutils/td/utils/StealingQueue.h @@ -115,6 +115,22 @@ class StealingQueue { std::atomic_thread_fence(std::memory_order_seq_cst); } + size_t size() const { + while (true) { + auto head = head_.load(); + auto tail = tail_.load(std::memory_order_acquire); + + if (tail < head) { + continue; + } + size_t n = tail - head; + if (n > N) { + continue; + } + return n; + } + } + private: std::atomic head_{0}; std::atomic tail_{0}; diff --git a/tdutils/td/utils/ThreadSafeCounter.h b/tdutils/td/utils/ThreadSafeCounter.h index 55bf94b5..aa976b2f 100644 --- a/tdutils/td/utils/ThreadSafeCounter.h +++ b/tdutils/td/utils/ThreadSafeCounter.h @@ -137,4 +137,55 @@ class NamedThreadSafeCounter { Counter counter_; }; +// another class for simplicity, it +struct NamedPerfCounter { + public: + static NamedPerfCounter &get_default() { + static NamedPerfCounter res; + return res; + } + struct PerfCounterRef { + NamedThreadSafeCounter::CounterRef count; + NamedThreadSafeCounter::CounterRef duration; + }; + PerfCounterRef get_counter(Slice name) { + return {.count = counter_.get_counter(PSLICE() << name << ".count"), + .duration = counter_.get_counter(PSLICE() << name << ".duration")}; + } + + struct ScopedPerfCounterRef : public NoCopyOrMove { + PerfCounterRef perf_counter; + uint64 started_at_ticks{td::Clocks::rdtsc()}; + + ~ScopedPerfCounterRef() { + perf_counter.count.add(1); + perf_counter.duration.add(td::Clocks::rdtsc() - started_at_ticks); + } + }; + + template + void for_each(F &&f) const { + counter_.for_each(f); + } + + void clear() { + counter_.clear(); + } + + friend StringBuilder &operator<<(StringBuilder &sb, const NamedPerfCounter &counter) { + return sb << counter.counter_; + } + private: + NamedThreadSafeCounter counter_; +}; + } // namespace td + +#define TD_PERF_COUNTER(name) \ + static auto perf_##name = td::NamedPerfCounter::get_default().get_counter(td::Slice(#name)); \ + auto scoped_perf_##name = td::NamedPerfCounter::ScopedPerfCounterRef{.perf_counter = perf_##name}; + +#define TD_PERF_COUNTER_SINCE(name, since) \ + static auto perf_##name = td::NamedPerfCounter::get_default().get_counter(td::Slice(#name)); \ + auto scoped_perf_##name = \ + td::NamedPerfCounter::ScopedPerfCounterRef{.perf_counter = perf_##name, .started_at_ticks = since}; diff --git a/tdutils/td/utils/common.h b/tdutils/td/utils/common.h index 79d3dc52..f9f86c5c 100644 --- a/tdutils/td/utils/common.h +++ b/tdutils/td/utils/common.h @@ -127,4 +127,12 @@ struct Auto { } }; +struct NoCopyOrMove { + NoCopyOrMove() = default; + NoCopyOrMove(NoCopyOrMove &&) = delete; + NoCopyOrMove(const NoCopyOrMove &) = delete; + NoCopyOrMove &operator=(NoCopyOrMove &&) = delete; + NoCopyOrMove &operator=(const NoCopyOrMove &) = delete; +}; + } // namespace td diff --git a/tdutils/td/utils/logging.cpp b/tdutils/td/utils/logging.cpp index 477823b8..345615f1 100644 --- a/tdutils/td/utils/logging.cpp +++ b/tdutils/td/utils/logging.cpp @@ -18,6 +18,7 @@ */ #include 
"td/utils/logging.h" +#include "ThreadSafeCounter.h" #include "td/utils/port/Clocks.h" #include "td/utils/port/StdStreams.h" #include "td/utils/port/thread_local.h" @@ -127,6 +128,9 @@ Logger::~Logger() { slice = MutableCSlice(slice.begin(), slice.begin() + slice.size() - 1); } log_.append(slice, log_level_); + + // put stats here to avoid conflict with PSTRING and PSLICE + TD_PERF_COUNTER_SINCE(logger, start_at_); } else { log_.append(as_cslice(), log_level_); } @@ -301,5 +305,4 @@ ScopedDisableLog::~ScopedDisableLog() { set_verbosity_level(sdl_verbosity); } } - } // namespace td diff --git a/tdutils/td/utils/logging.h b/tdutils/td/utils/logging.h index e5278b4b..d00fba15 100644 --- a/tdutils/td/utils/logging.h +++ b/tdutils/td/utils/logging.h @@ -40,6 +40,7 @@ #include "td/utils/Slice.h" #include "td/utils/StackAllocator.h" #include "td/utils/StringBuilder.h" +#include "td/utils/port/Clocks.h" #include #include @@ -251,7 +252,8 @@ class Logger { , log_(log) , sb_(buffer_.as_slice()) , options_(options) - , log_level_(log_level) { + , log_level_(log_level) + , start_at_(Clocks::rdtsc()) { } Logger(LogInterface &log, const LogOptions &options, int log_level, Slice file_name, int line_num, Slice comment); @@ -283,6 +285,7 @@ class Logger { StringBuilder sb_; const LogOptions &options_; int log_level_; + td::uint64 start_at_; }; namespace detail { @@ -346,5 +349,4 @@ class TsLog : public LogInterface { lock_.clear(std::memory_order_release); } }; - } // namespace td diff --git a/tdutils/td/utils/port/Clocks.h b/tdutils/td/utils/port/Clocks.h index 2d7d9e0f..7640e216 100644 --- a/tdutils/td/utils/port/Clocks.h +++ b/tdutils/td/utils/port/Clocks.h @@ -17,6 +17,7 @@ Copyright 2017-2020 Telegram Systems LLP */ #pragma once +#include "td/utils/int_types.h" namespace td { @@ -26,6 +27,62 @@ struct Clocks { static double system(); static int tz_offset(); + +#if defined(__i386__) + static __inline__ td::uint64 rdtsc(void) { + unsigned long long int x; + __asm__ volatile("rdtsc" : "=A"(x)); + return x; + } + + static constexpr td::uint64 rdtsc_frequency(void) { + return 2000'000'000; + } + + static constexpr double ticks_per_second() { + return 2e9; + } + + static constexpr double inv_ticks_per_second() { + return 0.5e-9; + } +#elif defined(__x86_64__) + static __inline__ td::uint64 rdtsc(void) { + unsigned hi, lo; + __asm__ __volatile__("rdtsc" : "=a"(lo), "=d"(hi)); + return ((unsigned long long)lo) | (((unsigned long long)hi) << 32); + } + static constexpr td::uint64 rdtsc_frequency(void) { + return 2000'000'000; + } + + static constexpr double ticks_per_second() { + return 2e9; + } + + static constexpr double inv_ticks_per_second() { + return 0.5e-9; + } +#elif defined(__aarch64__) + static __inline__ td::uint64 rdtsc(void) { + unsigned long long val; + asm volatile("mrs %0, cntvct_el0" : "=r"(val)); + return val; + } + static __inline__ td::uint64 rdtsc_frequency(void) { + unsigned long long val; + asm volatile("mrs %0, cntfrq_el0" : "=r"(val)); + return val; + } + + static double ticks_per_second() { + return static_cast(rdtsc_frequency()); + } + + static double inv_ticks_per_second() { + return 1.0 / static_cast(rdtsc_frequency()); + } +#endif }; } // namespace td diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index 89b5763e..65bdf71a 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -668,6 +668,7 @@ engine.validator.signature signature:bytes = engine.validator.Signature; engine.validator.oneStat key:string value:string = 
engine.validator.OneStat; engine.validator.stats stats:(vector engine.validator.oneStat) = engine.validator.Stats; +engine.validator.textStats data:string = engine.validator.TextStats; engine.validator.controlQueryError code:int message:string = engine.validator.ControlQueryError; @@ -758,6 +759,7 @@ engine.validator.setCollatorOptionsJson json:string = engine.validator.Success; engine.validator.getCollatorOptionsJson = engine.validator.JsonConfig; engine.validator.getAdnlStats = adnl.Stats; +engine.validator.getActorTextStats = engine.validator.TextStats; ---types--- diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo index 9efac72a..790d2df2 100644 Binary files a/tl/generate/scheme/ton_api.tlo and b/tl/generate/scheme/ton_api.tlo differ diff --git a/validator-engine-console/validator-engine-console-query.cpp b/validator-engine-console/validator-engine-console-query.cpp index 372fa812..baa39f8e 100644 --- a/validator-engine-console/validator-engine-console-query.cpp +++ b/validator-engine-console/validator-engine-console-query.cpp @@ -1008,6 +1008,32 @@ td::Status ImportShardOverlayCertificateQuery::receive(td::BufferSlice data) { td::TerminalIO::out() << "successfully sent certificate to overlay manager\n"; return td::Status::OK(); } +td::Status GetActorStatsQuery::run() { + auto r_file_name = tokenizer_.get_token(); + if (r_file_name.is_ok()) { + file_name_ = r_file_name.move_as_ok(); + } + return td::Status::OK(); +} +td::Status GetActorStatsQuery::send() { + auto b = ton::create_serialize_tl_object(); + td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise()); + return td::Status::OK(); +} + +td::Status GetActorStatsQuery::receive(td::BufferSlice data) { + TRY_RESULT_PREFIX(f, ton::fetch_tl_object(data.as_slice(), true), + "received incorrect answer: "); + if (file_name_.empty()) { + td::TerminalIO::out() << f->data_; + } else { + std::ofstream sb(file_name_); + sb << f->data_; + sb << std::flush; + td::TerminalIO::output(std::string("wrote stats to " + file_name_ + "\n")); + } + return td::Status::OK(); +} td::Status GetPerfTimerStatsJsonQuery::run() { TRY_RESULT_ASSIGN(file_name_, tokenizer_.get_token()); diff --git a/validator-engine-console/validator-engine-console-query.h b/validator-engine-console/validator-engine-console-query.h index 6314d619..c3c02150 100644 --- a/validator-engine-console/validator-engine-console-query.h +++ b/validator-engine-console/validator-engine-console-query.h @@ -1076,6 +1076,28 @@ class ImportShardOverlayCertificateQuery : public Query { std::string in_file_; }; +class GetActorStatsQuery : public Query { + public: + GetActorStatsQuery(td::actor::ActorId console, Tokenizer tokenizer) + : Query(console, std::move(tokenizer)) { + } + td::Status run() override; + td::Status send() override; + td::Status receive(td::BufferSlice data) override; + static std::string get_name() { + return "getactorstats"; + } + static std::string get_help() { + return "getactorstats []\tget actor stats and print it either in stdout or in "; + } + std::string name() const override { + return get_name(); + } + + private: + std::string file_name_; +}; + class GetPerfTimerStatsJsonQuery : public Query { public: GetPerfTimerStatsJsonQuery(td::actor::ActorId console, Tokenizer tokenizer) diff --git a/validator-engine-console/validator-engine-console.cpp b/validator-engine-console/validator-engine-console.cpp index 1ec0f380..59e2f2e8 100644 --- a/validator-engine-console/validator-engine-console.cpp +++ 
b/validator-engine-console/validator-engine-console.cpp @@ -140,6 +140,7 @@ void ValidatorEngineConsole::run() { add_query_runner(std::make_unique>()); add_query_runner(std::make_unique>()); add_query_runner(std::make_unique>()); + add_query_runner(std::make_unique>()); add_query_runner(std::make_unique>()); add_query_runner(std::make_unique>()); add_query_runner(std::make_unique>()); diff --git a/validator-engine/validator-engine.cpp b/validator-engine/validator-engine.cpp index bb757410..494077d8 100644 --- a/validator-engine/validator-engine.cpp +++ b/validator-engine/validator-engine.cpp @@ -3540,6 +3540,31 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getOverla }); } +void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getActorTextStats &query, td::BufferSlice data, + ton::PublicKeyHash src, td::uint32 perm, td::Promise promise) { + if (!(perm & ValidatorEnginePermissions::vep_default)) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized"))); + return; + } + + if (validator_manager_.empty()) { + promise.set_value( + create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "validator manager not started"))); + return; + } + + auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result R) mutable { + if (R.is_error()) { + promise.set_value(create_control_query_error(R.move_as_error())); + } else { + auto r = R.move_as_ok(); + promise.set_value(ton::create_serialize_tl_object(std::move(r))); + } + }); + td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManagerInterface::prepare_actor_stats, + std::move(P)); +} + void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getPerfTimerStats &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, td::Promise promise) { if (!(perm & ValidatorEnginePermissions::vep_default)) { @@ -4319,7 +4344,9 @@ int main(int argc, char *argv[]) { } if (need_scheduler_status_flag.exchange(false)) { LOG(ERROR) << "DUMPING SCHEDULER STATISTICS"; - scheduler.get_debug().dump(); + td::StringBuilder sb; + scheduler.get_debug().dump(sb); + LOG(ERROR) << "GOT SCHEDULER STATISTICS\n" << sb.as_cslice(); } if (rotate_logs_flags.exchange(false)) { if (td::log_interface) { diff --git a/validator-engine/validator-engine.hpp b/validator-engine/validator-engine.hpp index 2e94dd1e..1bfae12a 100644 --- a/validator-engine/validator-engine.hpp +++ b/validator-engine/validator-engine.hpp @@ -474,6 +474,8 @@ class ValidatorEngine : public td::actor::Actor { ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); void run_control_query(ton::ton_api::engine_validator_getOverlaysStats &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); + void run_control_query(ton::ton_api::engine_validator_getActorTextStats &query, td::BufferSlice data, + ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); void run_control_query(ton::ton_api::engine_validator_getPerfTimerStats &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); void run_control_query(ton::ton_api::engine_validator_getShardOutQueueSize &query, td::BufferSlice data, diff --git a/validator/db/celldb.cpp b/validator/db/celldb.cpp index 1701ae58..5a2466ec 100644 --- a/validator/db/celldb.cpp +++ b/validator/db/celldb.cpp @@ -142,6 +142,7 @@ void CellDbIn::load_cell(RootHash hash, td::Promise> promi } void CellDbIn::store_cell(BlockIdExt block_id, td::Ref cell, 
td::Promise> promise) { + TD_PERF_COUNTER(celldb_store_cell); td::PerfWarningTimer timer{"storecell", 0.1}; auto key_hash = get_key_hash(block_id); auto R = get_block(key_hash); @@ -284,6 +285,7 @@ void CellDbIn::gc_cont(BlockHandle handle) { } void CellDbIn::gc_cont2(BlockHandle handle) { + TD_PERF_COUNTER(celldb_gc_cell); td::PerfWarningTimer timer{"gccell", 0.1}; auto key_hash = get_key_hash(handle->id()); diff --git a/validator/downloaders/wait-block-state.cpp b/validator/downloaders/wait-block-state.cpp index 0ae82bea..f8d2cdcb 100644 --- a/validator/downloaders/wait-block-state.cpp +++ b/validator/downloaders/wait-block-state.cpp @@ -230,6 +230,7 @@ void WaitBlockState::got_block_data(td::Ref data) { } void WaitBlockState::apply() { + TD_PERF_COUNTER(apply_block_to_state); td::PerfWarningTimer t{"applyblocktostate", 0.1}; auto S = prev_state_.write().apply_block(handle_->id(), block_); if (S.is_error()) { diff --git a/validator/impl/shard.cpp b/validator/impl/shard.cpp index 0a710b5f..0ab216f7 100644 --- a/validator/impl/shard.cpp +++ b/validator/impl/shard.cpp @@ -290,6 +290,7 @@ td::Result, td::Ref>> ShardStateQ::spl } td::Result ShardStateQ::serialize() const { + TD_PERF_COUNTER(serialize_state); td::PerfWarningTimer perf_timer_{"serializestate", 0.1}; if (!data.is_null()) { return data.clone(); @@ -314,6 +315,7 @@ td::Result ShardStateQ::serialize() const { } td::Status ShardStateQ::serialize_to_file(td::FileFd& fd) const { + TD_PERF_COUNTER(serialize_state_to_file); td::PerfWarningTimer perf_timer_{"serializestate", 0.1}; if (!data.is_null()) { auto cur_data = data.clone(); diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index 2f80e28c..e15def2a 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -370,6 +370,10 @@ class ValidatorManagerImpl : public ValidatorManager { UNREACHABLE(); } + void prepare_actor_stats(td::Promise promise) override { + UNREACHABLE(); + } + void prepare_perf_timer_stats(td::Promise> promise) override { UNREACHABLE(); } diff --git a/validator/manager-hardfork.hpp b/validator/manager-hardfork.hpp index 648a4a1b..a43d4a70 100644 --- a/validator/manager-hardfork.hpp +++ b/validator/manager-hardfork.hpp @@ -432,6 +432,10 @@ class ValidatorManagerImpl : public ValidatorManager { UNREACHABLE(); } + void prepare_actor_stats(td::Promise promise) override { + UNREACHABLE(); + } + void prepare_perf_timer_stats(td::Promise> promise) override { UNREACHABLE(); } diff --git a/validator/manager.cpp b/validator/manager.cpp index cb53f105..ec6221a0 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -1622,6 +1622,7 @@ void ValidatorManagerImpl::send_block_broadcast(BlockBroadcast broadcast, bool c void ValidatorManagerImpl::start_up() { db_ = create_db_actor(actor_id(this), db_root_, opts_); + actor_stats_ = td::actor::create_actor("actor_stats"); lite_server_cache_ = create_liteserver_cache_actor(actor_id(this), db_root_); token_manager_ = td::actor::create_actor("tokenmanager"); td::mkdir(db_root_ + "/tmp/").ensure(); @@ -2771,6 +2772,10 @@ void ValidatorManagerImpl::send_peek_key_block_request() { send_get_next_key_blocks_request(last_known_key_block_handle_->id(), 1, std::move(P)); } +void ValidatorManagerImpl::prepare_actor_stats(td::Promise promise) { + send_closure(actor_stats_, &td::actor::ActorStats::prepare_stats, std::move(promise)); +} + void ValidatorManagerImpl::prepare_stats(td::Promise>> promise) { auto merger = StatsMerger::create(std::move(promise)); diff --git a/validator/manager.hpp 
b/validator/manager.hpp index 28cba970..28949837 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -21,6 +21,7 @@ #include "common/refcnt.hpp" #include "interfaces/validator-manager.h" #include "interfaces/db.h" +#include "td/actor/ActorStats.h" #include "td/actor/PromiseFuture.h" #include "td/utils/SharedSlice.h" #include "td/utils/buffer.h" @@ -582,6 +583,8 @@ class ValidatorManagerImpl : public ValidatorManager { void prepare_stats(td::Promise>> promise) override; + void prepare_actor_stats(td::Promise promise) override; + void prepare_perf_timer_stats(td::Promise> promise) override; void add_perf_timer_stat(std::string name, double duration) override; @@ -678,6 +681,7 @@ class ValidatorManagerImpl : public ValidatorManager { private: std::unique_ptr callback_; td::actor::ActorOwn db_; + td::actor::ActorOwn actor_stats_; bool started_ = false; bool allow_validate_ = false; diff --git a/validator/validator.h b/validator/validator.h index 1a78d229..e51836bb 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -275,6 +275,7 @@ class ValidatorManagerInterface : public td::actor::Actor { virtual void run_ext_query(td::BufferSlice data, td::Promise promise) = 0; virtual void prepare_stats(td::Promise>> promise) = 0; + virtual void prepare_actor_stats(td::Promise promise) = 0; virtual void prepare_perf_timer_stats(td::Promise> promise) = 0; virtual void add_perf_timer_stat(std::string name, double duration) = 0;
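// Usage sketch for the new perf counters (illustrative only; the function and counter name below are
// hypothetical, not part of the patch). TD_PERF_COUNTER(name) creates a static name.count / name.duration
// counter pair plus a scoped rdtsc timer; ActorStats::prepare_stats() later reports them as name.qps and
// name.load over the 10s / 10m / forever windows.
#include "td/utils/ThreadSafeCounter.h"

void process_request() {             // hypothetical hot path
  TD_PERF_COUNTER(process_request);  // counts one call and the ticks spent until scope exit
  // ... actual work ...
}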