diff --git a/tdutils/td/utils/Time.h b/tdutils/td/utils/Time.h index ece822d4..c7795ae4 100644 --- a/tdutils/td/utils/Time.h +++ b/tdutils/td/utils/Time.h @@ -128,6 +128,10 @@ inline Timestamp &operator+=(Timestamp &a, double b) { return a; } +inline double operator-(const Timestamp &a, const Timestamp &b) { + return a.at() - b.at(); +} + template void store(const Timestamp ×tamp, StorerT &storer) { storer.store_binary(timestamp.at() - Time::now() + Clocks::system()); diff --git a/validator-engine/validator-engine.cpp b/validator-engine/validator-engine.cpp index cc7c57b3..81b8278f 100644 --- a/validator-engine/validator-engine.cpp +++ b/validator-engine/validator-engine.cpp @@ -1957,7 +1957,8 @@ void ValidatorEngine::started_overlays() { void ValidatorEngine::start_validator() { validator_options_.write().set_allow_blockchain_init(config_.validators.size() > 0); - validator_options_.write().set_state_serializer_enabled(config_.state_serializer_enabled); + validator_options_.write().set_state_serializer_enabled(config_.state_serializer_enabled && + !state_serializer_disabled_flag_); load_collator_options(); validator_manager_ = ton::validator::ValidatorManagerFactory::create( @@ -3973,7 +3974,7 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_setStateS promise.set_value(ton::create_serialize_tl_object()); return; } - validator_options_.write().set_state_serializer_enabled(query.enabled_); + validator_options_.write().set_state_serializer_enabled(query.enabled_ && !state_serializer_disabled_flag_); td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManagerInterface::update_options, validator_options_); config_.state_serializer_enabled = query.enabled_; @@ -4556,6 +4557,11 @@ int main(int argc, char *argv[]) { td::actor::send_closure(x, &ValidatorEngine::set_validator_telemetry_filename, s); }); }); + p.add_option( + '\0', "disable-state-serializer", + "disable persistent state serializer (similar to set-state-serializer-enabled 0 in validator console)", [&]() { + acts.push_back([&x]() { td::actor::send_closure(x, &ValidatorEngine::set_state_serializer_disabled_flag); }); + }); auto S = p.run(argc, argv); if (S.is_error()) { LOG(ERROR) << "failed to parse options: " << S.move_as_error(); diff --git a/validator-engine/validator-engine.hpp b/validator-engine/validator-engine.hpp index b7abb0b1..6c2f5c4b 100644 --- a/validator-engine/validator-engine.hpp +++ b/validator-engine/validator-engine.hpp @@ -228,6 +228,7 @@ class ValidatorEngine : public td::actor::Actor { std::string validator_telemetry_filename_; bool not_all_shards_ = false; std::vector add_shard_cmds_; + bool state_serializer_disabled_flag_ = false; std::set unsafe_catchains_; std::map> unsafe_catchain_rotations_; @@ -325,6 +326,9 @@ class ValidatorEngine : public td::actor::Actor { void add_shard_cmd(ton::ShardIdFull shard) { add_shard_cmds_.push_back(shard); } + void set_state_serializer_disabled_flag() { + state_serializer_disabled_flag_ = true; + } void start_up() override; ValidatorEngine() { diff --git a/validator/db/archive-manager.cpp b/validator/db/archive-manager.cpp index d349f9d8..8c7cde17 100644 --- a/validator/db/archive-manager.cpp +++ b/validator/db/archive-manager.cpp @@ -1196,6 +1196,30 @@ void ArchiveManager::set_async_mode(bool mode, td::Promise promise) { } } +void ArchiveManager::prepare_stats(td::Promise>> promise) { + std::vector> stats; + { + std::map states; + for (auto &[key, file] : perm_states_) { + BlockSeqno seqno = key.first; + auto r_stat = td::stat(db_root_ + "/archive/states/" + file.filename_short()); + if (r_stat.is_error()) { + LOG(WARNING) << "Cannot stat persistent state file " << file.filename_short() << " : " << r_stat.move_as_error(); + } else { + states[seqno] += r_stat.move_as_ok().size_; + } + } + td::StringBuilder sb; + for (auto &[seqno, size] : states) { + sb << seqno << ":" << td::format::as_size(size) << " "; + } + if (!sb.as_cslice().empty()) { + stats.emplace_back("persistent_states", sb.as_cslice().str()); + } + } + promise.set_value(std::move(stats)); +} + void ArchiveManager::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise promise) { index_->begin_transaction().ensure(); td::MultiPromise mp; diff --git a/validator/db/archive-manager.hpp b/validator/db/archive-manager.hpp index 90fc6a0b..d919e32e 100644 --- a/validator/db/archive-manager.hpp +++ b/validator/db/archive-manager.hpp @@ -81,6 +81,8 @@ class ArchiveManager : public td::actor::Actor { cur_shard_split_depth_ = value; } + void prepare_stats(td::Promise>> promise); + static constexpr td::uint32 archive_size() { return 20000; } diff --git a/validator/db/celldb.cpp b/validator/db/celldb.cpp index 9dcecdb3..e86a373d 100644 --- a/validator/db/celldb.cpp +++ b/validator/db/celldb.cpp @@ -158,6 +158,17 @@ void CellDbIn::start_up() { }, td::Timestamp::now()); } + + { + std::string key = "stats.last_deleted_mc_seqno", value; + auto R = cell_db_->get(td::as_slice(key), value); + R.ensure(); + if (R.ok() == td::KeyValue::GetStatus::Ok) { + auto r_value = td::to_integer_safe(value); + r_value.ensure(); + last_deleted_mc_state_ = r_value.move_as_ok(); + } + } } void CellDbIn::load_cell(RootHash hash, td::Promise> promise) { @@ -452,6 +463,11 @@ void CellDbIn::gc_cont2(BlockHandle handle) { cell_db_->erase(get_key(key_hash)).ensure(); set_block(F.prev, std::move(P)); set_block(F.next, std::move(N)); + if (handle->id().is_masterchain()) { + last_deleted_mc_state_ = handle->id().seqno(); + std::string key = "stats.last_deleted_mc_seqno", value = td::to_string(last_deleted_mc_state_); + cell_db_->set(td::as_slice(key), td::as_slice(value)); + } cell_db_->commit_write_batch().ensure(); alarm_timestamp() = td::Timestamp::now(); timer_write_batch.reset(); @@ -475,9 +491,6 @@ void CellDbIn::gc_cont2(BlockHandle handle) { if (!opts_->get_disable_rocksdb_stats()) { cell_db_statistics_.gc_cell_time_.insert(timer.elapsed() * 1e6); } - if (handle->id().is_masterchain()) { - last_deleted_mc_state_ = handle->id().seqno(); - } LOG(DEBUG) << "Deleted state " << handle->id().to_str(); timer_finish.reset(); timer_all.reset(); diff --git a/validator/db/rootdb.cpp b/validator/db/rootdb.cpp index e0579d57..8d83e7a7 100644 --- a/validator/db/rootdb.cpp +++ b/validator/db/rootdb.cpp @@ -438,6 +438,7 @@ void RootDb::allow_block_gc(BlockIdExt block_id, td::Promise promise) { void RootDb::prepare_stats(td::Promise>> promise) { auto merger = StatsMerger::create(std::move(promise)); td::actor::send_closure(cell_db_, &CellDb::prepare_stats, merger.make_promise("celldb.")); + td::actor::send_closure(archive_db_, &ArchiveManager::prepare_stats, merger.make_promise("archive.")); } void RootDb::truncate(BlockSeqno seqno, ConstBlockHandle handle, td::Promise promise) { diff --git a/validator/downloaders/download-state.cpp b/validator/downloaders/download-state.cpp index 32978ea5..8473cb22 100644 --- a/validator/downloaders/download-state.cpp +++ b/validator/downloaders/download-state.cpp @@ -38,6 +38,7 @@ DownloadShardState::DownloadShardState(BlockIdExt block_id, BlockIdExt mastercha } void DownloadShardState::start_up() { + status_ = ProcessStatus(manager_, "process.download_state"); alarm_timestamp() = timeout_; auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result R) { @@ -81,6 +82,7 @@ void DownloadShardState::download_state() { }); td::actor::send_closure(manager_, &ValidatorManager::send_get_block_proof_link_request, block_id_, priority_, std::move(P)); + status_.set_status(PSTRING() << block_id_.id.to_str() << " : downloading proof"); } void DownloadShardState::downloaded_proof_link(td::BufferSlice data) { @@ -123,6 +125,7 @@ void DownloadShardState::checked_proof_link() { td::actor::send_closure(manager_, &ValidatorManager::send_get_persistent_state_request, block_id_, masterchain_block_id_, priority_, std::move(P)); } + status_.set_status(PSTRING() << block_id_.id.to_str() << " : downloading state"); } void DownloadShardState::download_zero_state() { @@ -152,6 +155,7 @@ void DownloadShardState::downloaded_zero_state(td::BufferSlice data) { } void DownloadShardState::downloaded_shard_state(td::BufferSlice data) { + status_.set_status(PSTRING() << block_id_.id.to_str() << " : processing downloaded state"); auto S = create_shard_state(block_id_, data.clone()); if (S.is_error()) { fail_handler(actor_id(this), S.move_as_error()); @@ -174,6 +178,7 @@ void DownloadShardState::downloaded_shard_state(td::BufferSlice data) { } void DownloadShardState::checked_shard_state() { + status_.set_status(PSTRING() << block_id_.id.to_str() << " : storing state file"); LOG(WARNING) << "checked shard state " << block_id_.to_str(); auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result R) { R.ensure(); @@ -189,6 +194,7 @@ void DownloadShardState::checked_shard_state() { } void DownloadShardState::written_shard_state_file() { + status_.set_status(PSTRING() << block_id_.id.to_str() << " : storing state to celldb"); LOG(WARNING) << "written shard state file " << block_id_.to_str(); auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { R.ensure(); @@ -198,6 +204,7 @@ void DownloadShardState::written_shard_state_file() { } void DownloadShardState::written_shard_state(td::Ref state) { + status_.set_status(PSTRING() << block_id_.id.to_str() << " : finishing"); state_ = std::move(state); handle_->set_unix_time(state_->get_unix_time()); handle_->set_is_key_block(block_id_.is_masterchain()); diff --git a/validator/downloaders/download-state.hpp b/validator/downloaders/download-state.hpp index 02984c53..bde80aae 100644 --- a/validator/downloaders/download-state.hpp +++ b/validator/downloaders/download-state.hpp @@ -19,6 +19,7 @@ #pragma once #include "validator/interfaces/validator-manager.h" +#include "stats-provider.h" namespace ton { @@ -67,6 +68,8 @@ class DownloadShardState : public td::actor::Actor { td::BufferSlice data_; td::Ref state_; + + ProcessStatus status_; }; } // namespace validator diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h index a781968d..ce21bc5e 100644 --- a/validator/impl/collator-impl.h +++ b/validator/impl/collator-impl.h @@ -50,7 +50,7 @@ class Collator final : public td::actor::Actor { using LtCellRef = block::LtCellRef; using NewOutMsg = block::NewOutMsg; const ShardIdFull shard_; - ton::BlockId new_id; + ton::BlockId new_id{workchainInvalid, 0, 0}; bool busy_{false}; bool before_split_{false}; bool after_split_{false}; diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index d3378cd8..d5c41853 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -352,6 +352,8 @@ bool Collator::fatal_error(td::Status error) { attempt_idx_ + 1); } else { main_promise(std::move(error)); + td::actor::send_closure(manager, &ValidatorManager::record_collate_query_stats, BlockIdExt{new_id, RootHash::zero(), FileHash::zero()}, + work_timer_.elapsed(), cpu_work_timer_.elapsed(), td::optional{}); } busy_ = false; } diff --git a/validator/impl/liteserver.cpp b/validator/impl/liteserver.cpp index 723dbfe9..83f39c45 100644 --- a/validator/impl/liteserver.cpp +++ b/validator/impl/liteserver.cpp @@ -85,19 +85,13 @@ void LiteQuery::abort_query(td::Status reason) { if (acc_state_promise_) { acc_state_promise_.set_error(std::move(reason)); } else if (promise_) { + td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, query_obj_ ? query_obj_->get_id() : 0, + false); promise_.set_error(std::move(reason)); } stop(); } -void LiteQuery::abort_query_ext(td::Status reason, std::string comment) { - LOG(INFO) << "aborted liteserver query: " << comment << " : " << reason.to_string(); - if (promise_) { - promise_.set_error(reason.move_as_error_prefix(comment + " : ")); - } - stop(); -} - bool LiteQuery::fatal_error(td::Status error) { abort_query(std::move(error)); return false; @@ -120,6 +114,8 @@ bool LiteQuery::finish_query(td::BufferSlice result, bool skip_cache_update) { td::actor::send_closure(cache_, &LiteServerCache::update, cache_key_, result.clone()); } if (promise_) { + td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, query_obj_ ? query_obj_->get_id() : 0, + true); promise_.set_result(std::move(result)); stop(); return true; @@ -139,7 +135,6 @@ void LiteQuery::start_up() { auto F = fetch_tl_object(query_, true); if (F.is_error()) { - td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, 0); // unknown abort_query(F.move_as_error()); return; } @@ -192,7 +187,6 @@ bool LiteQuery::use_cache() { } void LiteQuery::perform() { - td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, query_obj_->get_id()); lite_api::downcast_call( *query_obj_, td::overloaded( diff --git a/validator/impl/liteserver.hpp b/validator/impl/liteserver.hpp index 447e1dad..fc873533 100644 --- a/validator/impl/liteserver.hpp +++ b/validator/impl/liteserver.hpp @@ -97,7 +97,6 @@ class LiteQuery : public td::actor::Actor { bool fatal_error(std::string err_msg, int err_code = -400); bool fatal_error(int err_code, std::string err_msg = ""); void abort_query(td::Status reason); - void abort_query_ext(td::Status reason, std::string err_msg); bool finish_query(td::BufferSlice result, bool skip_cache_update = false); void alarm() override; void start_up() override; diff --git a/validator/impl/validate-query.cpp b/validator/impl/validate-query.cpp index 583b1d86..31c30e90 100644 --- a/validator/impl/validate-query.cpp +++ b/validator/impl/validate-query.cpp @@ -115,7 +115,7 @@ bool ValidateQuery::reject_query(std::string error, td::BufferSlice reason) { error = error_ctx() + error; LOG(ERROR) << "REJECT: aborting validation of block candidate for " << shard_.to_str() << " : " << error; if (main_promise) { - record_stats(); + record_stats(false); errorlog::ErrorLog::log(PSTRING() << "REJECT: aborting validation of block candidate for " << shard_.to_str() << " : " << error << ": data=" << block_candidate.id.file_hash.to_hex() << " collated_data=" << block_candidate.collated_file_hash.to_hex()); @@ -153,7 +153,7 @@ bool ValidateQuery::soft_reject_query(std::string error, td::BufferSlice reason) error = error_ctx() + error; LOG(ERROR) << "SOFT REJECT: aborting validation of block candidate for " << shard_.to_str() << " : " << error; if (main_promise) { - record_stats(); + record_stats(false); errorlog::ErrorLog::log(PSTRING() << "SOFT REJECT: aborting validation of block candidate for " << shard_.to_str() << " : " << error << ": data=" << block_candidate.id.file_hash.to_hex() << " collated_data=" << block_candidate.collated_file_hash.to_hex()); @@ -176,7 +176,7 @@ bool ValidateQuery::fatal_error(td::Status error) { error.ensure_error(); LOG(ERROR) << "aborting validation of block candidate for " << shard_.to_str() << " : " << error.to_string(); if (main_promise) { - record_stats(); + record_stats(false); auto c = error.code(); if (c <= -667 && c >= -670) { errorlog::ErrorLog::log(PSTRING() << "FATAL ERROR: aborting validation of block candidate for " << shard_.to_str() @@ -234,7 +234,7 @@ bool ValidateQuery::fatal_error(std::string err_msg, int err_code) { */ void ValidateQuery::finish_query() { if (main_promise) { - record_stats(); + record_stats(true); LOG(WARNING) << "validate query done"; main_promise.set_result(now_); } @@ -6928,13 +6928,13 @@ void ValidateQuery::written_candidate() { /** * Sends validation work time to manager. */ -void ValidateQuery::record_stats() { +void ValidateQuery::record_stats(bool success) { double work_time = work_timer_.elapsed(); double cpu_work_time = cpu_work_timer_.elapsed(); LOG(WARNING) << "validation took " << perf_timer_.elapsed() << "s"; LOG(WARNING) << "Validate query work time = " << work_time << "s, cpu time = " << cpu_work_time << "s"; td::actor::send_closure(manager, &ValidatorManager::record_validate_query_stats, block_candidate.id, work_time, - cpu_work_time); + cpu_work_time, success); } } // namespace validator diff --git a/validator/impl/validate-query.hpp b/validator/impl/validate-query.hpp index 98cd2493..90c368ff 100644 --- a/validator/impl/validate-query.hpp +++ b/validator/impl/validate-query.hpp @@ -400,7 +400,7 @@ class ValidateQuery : public td::actor::Actor { td::Timer work_timer_{true}; td::ThreadCpuTimer cpu_work_timer_{true}; - void record_stats(); + void record_stats(bool success); }; } // namespace validator diff --git a/validator/interfaces/validator-manager.h b/validator/interfaces/validator-manager.h index 20d4bd62..00fb77e1 100644 --- a/validator/interfaces/validator-manager.h +++ b/validator/interfaces/validator-manager.h @@ -205,13 +205,13 @@ class ValidatorManager : public ValidatorManagerInterface { td::optional shard, td::Promise> promise) = 0; - virtual void add_lite_query_stats(int lite_query_id) { + virtual void add_lite_query_stats(int lite_query_id, bool success) { } virtual void record_collate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time, - CollationStats stats) { + td::optional stats) { } - virtual void record_validate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time) { + virtual void record_validate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time, bool success) { } virtual void add_persistent_state_description(td::Ref desc) = 0; diff --git a/validator/manager-init.cpp b/validator/manager-init.cpp index c2944b25..6f304680 100644 --- a/validator/manager-init.cpp +++ b/validator/manager-init.cpp @@ -32,6 +32,8 @@ namespace ton { namespace validator { void ValidatorManagerMasterchainReiniter::start_up() { + status_ = ProcessStatus(manager_, "process.initial_sync"); + status_.set_status(PSTRING() << "starting, init block seqno " << block_id_.seqno()); LOG(INFO) << "init_block_id=" << block_id_; CHECK(block_id_.is_masterchain()); CHECK(block_id_.id.shard == shardIdAll); @@ -58,6 +60,7 @@ void ValidatorManagerMasterchainReiniter::got_masterchain_handle(BlockHandle han key_blocks_.push_back(handle_); if (opts_->initial_sync_disabled()) { + status_.set_status(PSTRING() << "downloading masterchain state " << handle_->id().seqno()); auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { R.ensure(); td::actor::send_closure(SelfId, &ValidatorManagerMasterchainReiniter::download_masterchain_state); @@ -181,6 +184,7 @@ void ValidatorManagerMasterchainReiniter::got_next_key_blocks(std::vector(key_blocks_.size()); key_blocks_.resize(key_blocks_.size() + vec.size(), nullptr); @@ -247,6 +251,7 @@ void ValidatorManagerMasterchainReiniter::choose_masterchain_state() { } void ValidatorManagerMasterchainReiniter::download_masterchain_state() { + status_.set_status(PSTRING() << "downloading masterchain state " << block_id_.seqno()); auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { LOG(WARNING) << "failed to download masterchain state: " << R.move_as_error(); @@ -274,6 +279,7 @@ void ValidatorManagerMasterchainReiniter::downloaded_masterchain_state(td::Ref("shardclient", opts_, handle_, state_, manager_, std::move(P)); + status_.set_status(PSTRING() << "downloading all shard states, mc seqno " << block_id_.seqno()); } void ValidatorManagerMasterchainReiniter::downloaded_all_shards() { diff --git a/validator/manager-init.hpp b/validator/manager-init.hpp index 7dce4e47..901b826b 100644 --- a/validator/manager-init.hpp +++ b/validator/manager-init.hpp @@ -27,6 +27,8 @@ #include "manager-init.h" +#include + namespace ton { namespace validator { @@ -77,6 +79,8 @@ class ValidatorManagerMasterchainReiniter : public td::actor::Actor { td::uint32 pending_ = 0; td::actor::ActorOwn client_; + + ProcessStatus status_; }; class ValidatorManagerMasterchainStarter : public td::actor::Actor { diff --git a/validator/manager.cpp b/validator/manager.cpp index 068ea5eb..8dce764d 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -451,11 +451,9 @@ void ValidatorManagerImpl::check_external_message(td::BufferSlice data, td::Prom promise = [self = this, wc, addr, promise = std::move(promise), SelfId = actor_id(this)](td::Result> R) mutable { - if (R.is_error()) { - promise.set_error(R.move_as_error()); - return; - } - td::actor::send_lambda(SelfId, [=, promise = std::move(promise), message = R.move_as_ok()]() mutable { + td::actor::send_lambda(SelfId, [=, promise = std::move(promise), R = std::move(R)]() mutable { + ++(R.is_ok() ? self->total_check_ext_messages_ok_ : self->total_check_ext_messages_error_); + TRY_RESULT_PROMISE(promise, message, std::move(R)); if (self->checked_ext_msg_counter_.inc_msg_count(wc, addr) > max_ext_msg_per_addr()) { promise.set_error( td::Status::Error(PSTRING() << "too many external messages to address " << wc << ":" << addr.to_hex())); @@ -2131,7 +2129,7 @@ void ValidatorManagerImpl::update_shards() { } } - bool validating_masterchain = false; + active_validator_groups_master_ = active_validator_groups_shard_ = 0; if (allow_validate_) { for (auto &desc : new_shards) { auto shard = desc.first; @@ -2148,9 +2146,7 @@ void ValidatorManagerImpl::update_shards() { auto validator_id = get_validator(shard, val_set); if (!validator_id.is_zero()) { - if (shard.is_masterchain()) { - validating_masterchain = true; - } + ++(shard.is_masterchain() ? active_validator_groups_master_ : active_validator_groups_shard_); auto val_group_id = get_validator_set_id(shard, val_set, opts_hash, key_seqno, opts); if (force_recover) { @@ -2845,8 +2841,8 @@ void ValidatorManagerImpl::prepare_stats(td::Promiseid().to_str()); vec.emplace_back("rotatemasterchainblock", last_rotate_block_id_.to_str()); //vec.emplace_back("shardclientmasterchainseqno", td::to_string(min_confirmed_masterchain_seqno_)); - vec.emplace_back("stateserializermasterchainseqno", td::to_string(state_serializer_masterchain_seqno_)); } + td::NamedThreadSafeCounter::get_default().for_each([&](auto key, auto value) { vec.emplace_back("counter." + key, PSTRING() << value); }); @@ -2864,9 +2860,48 @@ void ValidatorManagerImpl::prepare_stats(td::Promiseget_state_serializer_enabled(); + if (is_validator() && last_masterchain_state_->get_global_id() == -239) { + serializer_enabled = false; + } + vec.emplace_back("stateserializerenabled", serializer_enabled ? "true" : "false"); + merger.make_promise("").set_value(std::move(vec)); + if (!serializer_.empty()) { + td::actor::send_closure(serializer_, &AsyncStateSerializer::prepare_stats, merger.make_promise("")); + } + td::actor::send_closure(db_, &Db::prepare_stats, merger.make_promise("db.")); + for (auto &[_, p] : stats_providers_) { + p.second(merger.make_promise(p.first)); + } } void ValidatorManagerImpl::prepare_perf_timer_stats(td::Promise> promise) { @@ -3353,17 +3388,28 @@ td::actor::ActorOwn ValidatorManagerFactory::create( } void ValidatorManagerImpl::record_collate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time, - CollationStats stats) { + td::optional stats) { + if (!stats) { + ++(block_id.is_masterchain() ? total_collated_blocks_master_error_ : total_collated_blocks_shard_error_); + return; + } auto &record = new_block_stats_record(block_id); record.collator_work_time_ = work_time; record.collator_cpu_work_time_ = cpu_work_time; - record.collator_stats_ = std::move(stats); + record.collator_stats_ = std::move(stats.value()); + ++(block_id.is_masterchain() ? total_collated_blocks_master_ok_ : total_collated_blocks_shard_ok_); } -void ValidatorManagerImpl::record_validate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time) { +void ValidatorManagerImpl::record_validate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time, + bool success) { auto &record = new_block_stats_record(block_id); record.validator_work_time_ = work_time; record.validator_cpu_work_time_ = cpu_work_time; + if (success) { + ++(block_id.is_masterchain() ? total_validated_blocks_master_ok_ : total_validated_blocks_shard_ok_); + } else { + ++(block_id.is_masterchain() ? total_validated_blocks_master_error_ : total_validated_blocks_shard_error_); + } } ValidatorManagerImpl::RecordedBlockStats &ValidatorManagerImpl::new_block_stats_record(BlockIdExt block_id) { @@ -3377,6 +3423,16 @@ ValidatorManagerImpl::RecordedBlockStats &ValidatorManagerImpl::new_block_stats_ return recorded_block_stats_[block_id]; } +void ValidatorManagerImpl::register_stats_provider( + td::uint64 idx, std::string prefix, + std::function>>)> callback) { + stats_providers_[idx] = {std::move(prefix), std::move(callback)}; +} + +void ValidatorManagerImpl::unregister_stats_provider(td::uint64 idx) { + stats_providers_.erase(idx); +} + size_t ValidatorManagerImpl::CheckedExtMsgCounter::get_msg_count(WorkchainId wc, StdSmcAddress addr) { before_query(); auto it1 = counter_cur_.find({wc, addr}); diff --git a/validator/manager.hpp b/validator/manager.hpp index 519cab12..9e54c3f3 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -655,8 +655,9 @@ class ValidatorManagerImpl : public ValidatorManager { td::optional shard, td::Promise> promise) override; - void add_lite_query_stats(int lite_query_id) override { + void add_lite_query_stats(int lite_query_id, bool success) override { ++ls_stats_[lite_query_id]; + ++(success ? total_ls_queries_ok_ : total_ls_queries_error_)[lite_query_id]; } private: @@ -747,6 +748,16 @@ class ValidatorManagerImpl : public ValidatorManager { std::map ls_stats_; // lite_api ID -> count, 0 for unknown td::uint32 ls_stats_check_ext_messages_{0}; + UnixTime started_at_ = (UnixTime)td::Clocks::system(); + std::map total_ls_queries_ok_, total_ls_queries_error_; // lite_api ID -> count, 0 for unknown + td::uint64 total_check_ext_messages_ok_{0}, total_check_ext_messages_error_{0}; + td::uint64 total_collated_blocks_master_ok_{0}, total_collated_blocks_master_error_{0}; + td::uint64 total_validated_blocks_master_ok_{0}, total_validated_blocks_master_error_{0}; + td::uint64 total_collated_blocks_shard_ok_{0}, total_collated_blocks_shard_error_{0}; + td::uint64 total_validated_blocks_shard_ok_{0}, total_validated_blocks_shard_error_{0}; + + size_t active_validator_groups_master_{0}, active_validator_groups_shard_{0}; + td::actor::ActorOwn candidates_buffer_; struct RecordedBlockStats { @@ -760,16 +771,25 @@ class ValidatorManagerImpl : public ValidatorManager { std::queue recorded_block_stats_lru_; void record_collate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time, - CollationStats stats) override; - void record_validate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time) override; + td::optional stats) override; + void record_validate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time, bool success) override; RecordedBlockStats &new_block_stats_record(BlockIdExt block_id); + void register_stats_provider( + td::uint64 idx, std::string prefix, + std::function>>)> callback) override; + void unregister_stats_provider(td::uint64 idx) override; + std::map> validator_telemetry_; void init_validator_telemetry(); std::map> persistent_state_descriptions_; std::map> persistent_state_blocks_; + + std::map>>)>>> + stats_providers_; }; } // namespace validator diff --git a/validator/net/download-state.cpp b/validator/net/download-state.cpp index 2b373ef3..6735a2b5 100644 --- a/validator/net/download-state.cpp +++ b/validator/net/download-state.cpp @@ -70,6 +70,7 @@ void DownloadState::finish_query() { } void DownloadState::start_up() { + status_ = ProcessStatus(validator_manager_, "process.download_state_net"); alarm_timestamp() = timeout_; td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_persistent_state, block_id_, @@ -190,6 +191,7 @@ void DownloadState::got_block_state_description(td::BufferSlice data) { td::Timestamp::in(3.0), std::move(P)); } })); + status_.set_status(PSTRING() << block_id_.id.to_str() << " : 0 bytes, 0B/s"); } void DownloadState::got_block_state_part(td::BufferSlice data, td::uint32 requested_size) { @@ -198,14 +200,18 @@ void DownloadState::got_block_state_part(td::BufferSlice data, td::uint32 reques parts_.push_back(std::move(data)); double elapsed = prev_logged_timer_.elapsed(); - if (elapsed > 10.0) { + if (elapsed > 5.0) { prev_logged_timer_ = td::Timer(); + auto speed = (td::uint64)((double)(sum_ - prev_logged_sum_) / elapsed); LOG(WARNING) << "downloading state " << block_id_.to_str() << ": " << td::format::as_size(sum_) << " (" - << td::format::as_size((td::uint64)(double(sum_ - prev_logged_sum_) / elapsed)) << "/s)"; + << td::format::as_size(speed) << "/s)"; + status_.set_status(PSTRING() << block_id_.id.to_str() << " : " << sum_ << " bytes, " << td::format::as_size(speed) + << "/s"); prev_logged_sum_ = sum_; } if (last_part) { + status_.set_status(PSTRING() << block_id_.id.to_str() << " : " << sum_ << " bytes, finishing"); td::BufferSlice res{td::narrow_cast(sum_)}; auto S = res.as_slice(); for (auto &p : parts_) { diff --git a/validator/net/download-state.hpp b/validator/net/download-state.hpp index 19c44beb..470c5431 100644 --- a/validator/net/download-state.hpp +++ b/validator/net/download-state.hpp @@ -23,6 +23,8 @@ #include "validator/validator.h" #include "adnl/adnl-ext-client.h" +#include + namespace ton { namespace validator { @@ -75,6 +77,8 @@ class DownloadState : public td::actor::Actor { td::uint64 prev_logged_sum_ = 0; td::Timer prev_logged_timer_; + + ProcessStatus status_; }; } // namespace fullnode diff --git a/validator/state-serializer.cpp b/validator/state-serializer.cpp index b693232b..bc3d7b5e 100644 --- a/validator/state-serializer.cpp +++ b/validator/state-serializer.cpp @@ -58,6 +58,12 @@ void AsyncStateSerializer::got_self_state(AsyncSerializerState state) { }); td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, last_block_id_, true, std::move(P)); } + + inited_block_id_ = true; + for (auto& promise : wait_init_block_id_) { + promise.set_value(td::Unit()); + } + wait_init_block_id_.clear(); } void AsyncStateSerializer::got_init_handle(BlockHandle handle) { @@ -186,6 +192,9 @@ void AsyncStateSerializer::next_iteration() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_previous_state_files); }, td::Timestamp::in(delay)); + current_status_ = PSTRING() << "delay before serializing seqno=" << masterchain_handle_->id().seqno() << " " + << (int)delay << "s"; + current_status_ts_ = td::Timestamp::now(); return; } if (next_idx_ < shards_.size()) { @@ -379,9 +388,14 @@ void AsyncStateSerializer::got_masterchain_state(td::Ref state td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file_gen, masterchain_handle_->id(), masterchain_handle_->id(), write_data, std::move(P)); + + current_status_ = PSTRING() << "serializing masterchain state " << state->get_block_id().id.to_str(); + current_status_ts_ = td::Timestamp::now(); } void AsyncStateSerializer::stored_masterchain_state() { + current_status_ = "pending"; + current_status_ts_ = {}; LOG(ERROR) << "finished serializing masterchain state " << masterchain_handle_->id().id.to_str(); running_ = false; next_iteration(); @@ -444,9 +458,14 @@ void AsyncStateSerializer::got_shard_state(BlockHandle handle, td::Refid(), masterchain_handle_->id(), write_data, std::move(P)); + current_status_ = PSTRING() << "serializing shard state " << next_idx_ << "/" << shards_.size() << " " + << state->get_block_id().id.to_str(); + current_status_ts_ = td::Timestamp::now(); } void AsyncStateSerializer::fail_handler(td::Status reason) { + current_status_ = PSTRING() << "pending, " << reason; + current_status_ts_ = {}; VLOG(VALIDATOR_NOTICE) << "failure: " << reason; attempt_++; delay_action( @@ -460,6 +479,8 @@ void AsyncStateSerializer::fail_handler_cont() { } void AsyncStateSerializer::success_handler() { + current_status_ = "pending"; + current_status_ts_ = {}; running_ = false; next_iteration(); } @@ -478,6 +499,29 @@ void AsyncStateSerializer::auto_disable_serializer(bool disabled) { } } +void AsyncStateSerializer::prepare_stats(td::Promise>> promise) { + if (!inited_block_id_) { + wait_init_block_id_.push_back( + [SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable { + TRY_STATUS_PROMISE(promise, R.move_as_status()); + td::actor::send_closure(SelfId, &AsyncStateSerializer::prepare_stats, std::move(promise)); + }); + return; + } + std::vector> vec; + vec.emplace_back("stateserializermasterchainseqno", td::to_string(last_block_id_.seqno())); + td::StringBuilder sb; + sb << current_status_; + if (current_status_ts_) { + sb << " (started " << (int)(td::Timestamp::now() - current_status_ts_) << "s ago)"; + } + if (!opts_->get_state_serializer_enabled() || auto_disabled_) { + sb << " (disabled)"; + } + vec.emplace_back("stateserializerstatus", sb.as_cslice().str()); + promise.set_result(std::move(vec)); +} + bool AsyncStateSerializer::need_serialize(BlockHandle handle) { if (handle->id().id.seqno == 0 || !handle->is_key_block()) { return false; diff --git a/validator/state-serializer.hpp b/validator/state-serializer.hpp index 1e7f5c9c..406ac350 100644 --- a/validator/state-serializer.hpp +++ b/validator/state-serializer.hpp @@ -36,6 +36,9 @@ class AsyncStateSerializer : public td::actor::Actor { UnixTime last_key_block_ts_ = 0; bool saved_to_db_ = true; + bool inited_block_id_ = false; + std::vector> wait_init_block_id_; + td::Ref opts_; bool auto_disabled_ = false; td::CancellationTokenSource cancellation_token_source_; @@ -95,6 +98,8 @@ class AsyncStateSerializer : public td::actor::Actor { promise.set_result(last_block_id_.id.seqno); } + void prepare_stats(td::Promise>> promise); + void update_last_known_key_block_ts(UnixTime ts) { last_known_key_block_ts_ = std::max(last_known_key_block_ts_, ts); } @@ -111,6 +116,9 @@ class AsyncStateSerializer : public td::actor::Actor { void update_options(td::Ref opts); void auto_disable_serializer(bool disabled); + + std::string current_status_ = "pending"; + td::Timestamp current_status_ts_ = td::Timestamp::never(); }; } // namespace validator diff --git a/validator/stats-provider.h b/validator/stats-provider.h new file mode 100644 index 00000000..e0a7f565 --- /dev/null +++ b/validator/stats-provider.h @@ -0,0 +1,105 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ +#pragma once + +#include "validator.h" +#include "common/AtomicRef.h" + +#include + +namespace ton { + +namespace validator { + +class StatsProvider { + public: + StatsProvider() = default; + StatsProvider(td::actor::ActorId manager, std::string prefix, + std::function>>)> callback) + : inited_(true), manager_(std::move(manager)) { + static std::atomic cur_idx{0}; + idx_ = cur_idx.fetch_add(1); + td::actor::send_closure(manager_, &ValidatorManagerInterface::register_stats_provider, idx_, std::move(prefix), + std::move(callback)); + } + StatsProvider(const StatsProvider&) = delete; + StatsProvider(StatsProvider&& other) noexcept + : inited_(other.inited_), idx_(other.idx_), manager_(std::move(other.manager_)) { + other.inited_ = false; + } + ~StatsProvider() { + if (inited_) { + td::actor::send_closure(manager_, &ValidatorManagerInterface::unregister_stats_provider, idx_); + } + } + + StatsProvider& operator=(const StatsProvider&) = delete; + StatsProvider& operator=(StatsProvider&& other) noexcept { + if (this != &other) { + inited_ = other.inited_; + idx_ = other.idx_; + manager_ = std::move(other.manager_); + other.inited_ = false; + } + return *this; + } + + bool inited() const { + return inited_; + } + + private: + bool inited_ = false; + td::uint64 idx_ = 0; + td::actor::ActorId manager_; +}; + +class ProcessStatus { + public: + ProcessStatus() = default; + ProcessStatus(td::actor::ActorId manager, std::string name) + : stats_provider_(std::move(manager), std::move(name), [value = value_](auto promise) { + auto status = value->load(); + if (status.is_null()) { + promise.set_error(td::Status::Error("empty")); + return; + } + std::vector> vec; + vec.emplace_back("", *status); + promise.set_value(std::move(vec)); + }) { + } + ProcessStatus(const ProcessStatus&) = delete; + ProcessStatus(ProcessStatus&& other) noexcept = default; + ProcessStatus& operator=(const ProcessStatus&) = delete; + ProcessStatus& operator=(ProcessStatus&& other) noexcept = default; + + void set_status(std::string s) { + if (!value_) { + return; + } + value_->store(td::Ref>(true, std::move(s))); + } + + private: + std::shared_ptr>> value_ = std::make_shared>>(); + StatsProvider stats_provider_; +}; + +} // namespace validator + +} // namespace ton diff --git a/validator/validator.h b/validator/validator.h index 73065aa9..42b3f69a 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -20,6 +20,7 @@ #include #include +#include #include "td/actor/actor.h" @@ -292,6 +293,13 @@ class ValidatorManagerInterface : public td::actor::Actor { virtual void get_out_msg_queue_size(BlockIdExt block_id, td::Promise promise) = 0; virtual void update_options(td::Ref opts) = 0; + + virtual void register_stats_provider( + td::uint64 idx, std::string prefix, + std::function>>)> callback) { + } + virtual void unregister_stats_provider(td::uint64 idx) { + } }; } // namespace validator