1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Add more stats to validator getstats

1) Liteserver queries count
2) Collated/validated blocks count, number of active sessions
3) Persistent state sizes
4) Initial sync progress
This commit is contained in:
SpyCheese 2025-02-17 10:13:17 +03:00
parent ce6c29941e
commit 9d94e04d20
26 changed files with 365 additions and 45 deletions

View file

@ -128,6 +128,10 @@ inline Timestamp &operator+=(Timestamp &a, double b) {
return a;
}
inline double operator-(const Timestamp &a, const Timestamp &b) {
return a.at() - b.at();
}
template <class StorerT>
void store(const Timestamp &timestamp, StorerT &storer) {
storer.store_binary(timestamp.at() - Time::now() + Clocks::system());

View file

@ -1957,7 +1957,8 @@ void ValidatorEngine::started_overlays() {
void ValidatorEngine::start_validator() {
validator_options_.write().set_allow_blockchain_init(config_.validators.size() > 0);
validator_options_.write().set_state_serializer_enabled(config_.state_serializer_enabled);
validator_options_.write().set_state_serializer_enabled(config_.state_serializer_enabled &&
!state_serializer_disabled_flag_);
load_collator_options();
validator_manager_ = ton::validator::ValidatorManagerFactory::create(
@ -3973,7 +3974,7 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_setStateS
promise.set_value(ton::create_serialize_tl_object<ton::ton_api::engine_validator_success>());
return;
}
validator_options_.write().set_state_serializer_enabled(query.enabled_);
validator_options_.write().set_state_serializer_enabled(query.enabled_ && !state_serializer_disabled_flag_);
td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManagerInterface::update_options,
validator_options_);
config_.state_serializer_enabled = query.enabled_;
@ -4556,6 +4557,11 @@ int main(int argc, char *argv[]) {
td::actor::send_closure(x, &ValidatorEngine::set_validator_telemetry_filename, s);
});
});
p.add_option(
'\0', "disable-state-serializer",
"disable persistent state serializer (similar to set-state-serializer-enabled 0 in validator console)", [&]() {
acts.push_back([&x]() { td::actor::send_closure(x, &ValidatorEngine::set_state_serializer_disabled_flag); });
});
auto S = p.run(argc, argv);
if (S.is_error()) {
LOG(ERROR) << "failed to parse options: " << S.move_as_error();

View file

@ -228,6 +228,7 @@ class ValidatorEngine : public td::actor::Actor {
std::string validator_telemetry_filename_;
bool not_all_shards_ = false;
std::vector<ton::ShardIdFull> add_shard_cmds_;
bool state_serializer_disabled_flag_ = false;
std::set<ton::CatchainSeqno> unsafe_catchains_;
std::map<ton::BlockSeqno, std::pair<ton::CatchainSeqno, td::uint32>> unsafe_catchain_rotations_;
@ -325,6 +326,9 @@ class ValidatorEngine : public td::actor::Actor {
void add_shard_cmd(ton::ShardIdFull shard) {
add_shard_cmds_.push_back(shard);
}
void set_state_serializer_disabled_flag() {
state_serializer_disabled_flag_ = true;
}
void start_up() override;
ValidatorEngine() {

View file

@ -1196,6 +1196,30 @@ void ArchiveManager::set_async_mode(bool mode, td::Promise<td::Unit> promise) {
}
}
void ArchiveManager::prepare_stats(td::Promise<std::vector<std::pair<std::string, std::string>>> promise) {
std::vector<std::pair<std::string, std::string>> stats;
{
std::map<BlockSeqno, td::uint64> states;
for (auto &[key, file] : perm_states_) {
BlockSeqno seqno = key.first;
auto r_stat = td::stat(db_root_ + "/archive/states/" + file.filename_short());
if (r_stat.is_error()) {
LOG(WARNING) << "Cannot stat persistent state file " << file.filename_short() << " : " << r_stat.move_as_error();
} else {
states[seqno] += r_stat.move_as_ok().size_;
}
}
td::StringBuilder sb;
for (auto &[seqno, size] : states) {
sb << seqno << ":" << td::format::as_size(size) << " ";
}
if (!sb.as_cslice().empty()) {
stats.emplace_back("persistent_states", sb.as_cslice().str());
}
}
promise.set_value(std::move(stats));
}
void ArchiveManager::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise<td::Unit> promise) {
index_->begin_transaction().ensure();
td::MultiPromise mp;

View file

@ -81,6 +81,8 @@ class ArchiveManager : public td::actor::Actor {
cur_shard_split_depth_ = value;
}
void prepare_stats(td::Promise<std::vector<std::pair<std::string, std::string>>> promise);
static constexpr td::uint32 archive_size() {
return 20000;
}

View file

@ -158,6 +158,17 @@ void CellDbIn::start_up() {
},
td::Timestamp::now());
}
{
std::string key = "stats.last_deleted_mc_seqno", value;
auto R = cell_db_->get(td::as_slice(key), value);
R.ensure();
if (R.ok() == td::KeyValue::GetStatus::Ok) {
auto r_value = td::to_integer_safe<BlockSeqno>(value);
r_value.ensure();
last_deleted_mc_state_ = r_value.move_as_ok();
}
}
}
void CellDbIn::load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise) {
@ -452,6 +463,11 @@ void CellDbIn::gc_cont2(BlockHandle handle) {
cell_db_->erase(get_key(key_hash)).ensure();
set_block(F.prev, std::move(P));
set_block(F.next, std::move(N));
if (handle->id().is_masterchain()) {
last_deleted_mc_state_ = handle->id().seqno();
std::string key = "stats.last_deleted_mc_seqno", value = td::to_string(last_deleted_mc_state_);
cell_db_->set(td::as_slice(key), td::as_slice(value));
}
cell_db_->commit_write_batch().ensure();
alarm_timestamp() = td::Timestamp::now();
timer_write_batch.reset();
@ -475,9 +491,6 @@ void CellDbIn::gc_cont2(BlockHandle handle) {
if (!opts_->get_disable_rocksdb_stats()) {
cell_db_statistics_.gc_cell_time_.insert(timer.elapsed() * 1e6);
}
if (handle->id().is_masterchain()) {
last_deleted_mc_state_ = handle->id().seqno();
}
LOG(DEBUG) << "Deleted state " << handle->id().to_str();
timer_finish.reset();
timer_all.reset();

View file

@ -438,6 +438,7 @@ void RootDb::allow_block_gc(BlockIdExt block_id, td::Promise<bool> promise) {
void RootDb::prepare_stats(td::Promise<std::vector<std::pair<std::string, std::string>>> promise) {
auto merger = StatsMerger::create(std::move(promise));
td::actor::send_closure(cell_db_, &CellDb::prepare_stats, merger.make_promise("celldb."));
td::actor::send_closure(archive_db_, &ArchiveManager::prepare_stats, merger.make_promise("archive."));
}
void RootDb::truncate(BlockSeqno seqno, ConstBlockHandle handle, td::Promise<td::Unit> promise) {

View file

@ -38,6 +38,7 @@ DownloadShardState::DownloadShardState(BlockIdExt block_id, BlockIdExt mastercha
}
void DownloadShardState::start_up() {
status_ = ProcessStatus(manager_, "process.download_state");
alarm_timestamp() = timeout_;
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<BlockHandle> R) {
@ -81,6 +82,7 @@ void DownloadShardState::download_state() {
});
td::actor::send_closure(manager_, &ValidatorManager::send_get_block_proof_link_request, block_id_, priority_,
std::move(P));
status_.set_status(PSTRING() << block_id_.id.to_str() << " : downloading proof");
}
void DownloadShardState::downloaded_proof_link(td::BufferSlice data) {
@ -123,6 +125,7 @@ void DownloadShardState::checked_proof_link() {
td::actor::send_closure(manager_, &ValidatorManager::send_get_persistent_state_request, block_id_,
masterchain_block_id_, priority_, std::move(P));
}
status_.set_status(PSTRING() << block_id_.id.to_str() << " : downloading state");
}
void DownloadShardState::download_zero_state() {
@ -152,6 +155,7 @@ void DownloadShardState::downloaded_zero_state(td::BufferSlice data) {
}
void DownloadShardState::downloaded_shard_state(td::BufferSlice data) {
status_.set_status(PSTRING() << block_id_.id.to_str() << " : processing downloaded state");
auto S = create_shard_state(block_id_, data.clone());
if (S.is_error()) {
fail_handler(actor_id(this), S.move_as_error());
@ -174,6 +178,7 @@ void DownloadShardState::downloaded_shard_state(td::BufferSlice data) {
}
void DownloadShardState::checked_shard_state() {
status_.set_status(PSTRING() << block_id_.id.to_str() << " : storing state file");
LOG(WARNING) << "checked shard state " << block_id_.to_str();
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
R.ensure();
@ -189,6 +194,7 @@ void DownloadShardState::checked_shard_state() {
}
void DownloadShardState::written_shard_state_file() {
status_.set_status(PSTRING() << block_id_.id.to_str() << " : storing state to celldb");
LOG(WARNING) << "written shard state file " << block_id_.to_str();
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Ref<ShardState>> R) {
R.ensure();
@ -198,6 +204,7 @@ void DownloadShardState::written_shard_state_file() {
}
void DownloadShardState::written_shard_state(td::Ref<ShardState> state) {
status_.set_status(PSTRING() << block_id_.id.to_str() << " : finishing");
state_ = std::move(state);
handle_->set_unix_time(state_->get_unix_time());
handle_->set_is_key_block(block_id_.is_masterchain());

View file

@ -19,6 +19,7 @@
#pragma once
#include "validator/interfaces/validator-manager.h"
#include "stats-provider.h"
namespace ton {
@ -67,6 +68,8 @@ class DownloadShardState : public td::actor::Actor {
td::BufferSlice data_;
td::Ref<ShardState> state_;
ProcessStatus status_;
};
} // namespace validator

View file

@ -50,7 +50,7 @@ class Collator final : public td::actor::Actor {
using LtCellRef = block::LtCellRef;
using NewOutMsg = block::NewOutMsg;
const ShardIdFull shard_;
ton::BlockId new_id;
ton::BlockId new_id{workchainInvalid, 0, 0};
bool busy_{false};
bool before_split_{false};
bool after_split_{false};

View file

@ -352,6 +352,8 @@ bool Collator::fatal_error(td::Status error) {
attempt_idx_ + 1);
} else {
main_promise(std::move(error));
td::actor::send_closure(manager, &ValidatorManager::record_collate_query_stats, BlockIdExt{new_id, RootHash::zero(), FileHash::zero()},
work_timer_.elapsed(), cpu_work_timer_.elapsed(), td::optional<CollationStats>{});
}
busy_ = false;
}

View file

@ -85,19 +85,13 @@ void LiteQuery::abort_query(td::Status reason) {
if (acc_state_promise_) {
acc_state_promise_.set_error(std::move(reason));
} else if (promise_) {
td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, query_obj_ ? query_obj_->get_id() : 0,
false);
promise_.set_error(std::move(reason));
}
stop();
}
void LiteQuery::abort_query_ext(td::Status reason, std::string comment) {
LOG(INFO) << "aborted liteserver query: " << comment << " : " << reason.to_string();
if (promise_) {
promise_.set_error(reason.move_as_error_prefix(comment + " : "));
}
stop();
}
bool LiteQuery::fatal_error(td::Status error) {
abort_query(std::move(error));
return false;
@ -120,6 +114,8 @@ bool LiteQuery::finish_query(td::BufferSlice result, bool skip_cache_update) {
td::actor::send_closure(cache_, &LiteServerCache::update, cache_key_, result.clone());
}
if (promise_) {
td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, query_obj_ ? query_obj_->get_id() : 0,
true);
promise_.set_result(std::move(result));
stop();
return true;
@ -139,7 +135,6 @@ void LiteQuery::start_up() {
auto F = fetch_tl_object<ton::lite_api::Function>(query_, true);
if (F.is_error()) {
td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, 0); // unknown
abort_query(F.move_as_error());
return;
}
@ -192,7 +187,6 @@ bool LiteQuery::use_cache() {
}
void LiteQuery::perform() {
td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, query_obj_->get_id());
lite_api::downcast_call(
*query_obj_,
td::overloaded(

View file

@ -97,7 +97,6 @@ class LiteQuery : public td::actor::Actor {
bool fatal_error(std::string err_msg, int err_code = -400);
bool fatal_error(int err_code, std::string err_msg = "");
void abort_query(td::Status reason);
void abort_query_ext(td::Status reason, std::string err_msg);
bool finish_query(td::BufferSlice result, bool skip_cache_update = false);
void alarm() override;
void start_up() override;

View file

@ -115,7 +115,7 @@ bool ValidateQuery::reject_query(std::string error, td::BufferSlice reason) {
error = error_ctx() + error;
LOG(ERROR) << "REJECT: aborting validation of block candidate for " << shard_.to_str() << " : " << error;
if (main_promise) {
record_stats();
record_stats(false);
errorlog::ErrorLog::log(PSTRING() << "REJECT: aborting validation of block candidate for " << shard_.to_str()
<< " : " << error << ": data=" << block_candidate.id.file_hash.to_hex()
<< " collated_data=" << block_candidate.collated_file_hash.to_hex());
@ -153,7 +153,7 @@ bool ValidateQuery::soft_reject_query(std::string error, td::BufferSlice reason)
error = error_ctx() + error;
LOG(ERROR) << "SOFT REJECT: aborting validation of block candidate for " << shard_.to_str() << " : " << error;
if (main_promise) {
record_stats();
record_stats(false);
errorlog::ErrorLog::log(PSTRING() << "SOFT REJECT: aborting validation of block candidate for " << shard_.to_str()
<< " : " << error << ": data=" << block_candidate.id.file_hash.to_hex()
<< " collated_data=" << block_candidate.collated_file_hash.to_hex());
@ -176,7 +176,7 @@ bool ValidateQuery::fatal_error(td::Status error) {
error.ensure_error();
LOG(ERROR) << "aborting validation of block candidate for " << shard_.to_str() << " : " << error.to_string();
if (main_promise) {
record_stats();
record_stats(false);
auto c = error.code();
if (c <= -667 && c >= -670) {
errorlog::ErrorLog::log(PSTRING() << "FATAL ERROR: aborting validation of block candidate for " << shard_.to_str()
@ -234,7 +234,7 @@ bool ValidateQuery::fatal_error(std::string err_msg, int err_code) {
*/
void ValidateQuery::finish_query() {
if (main_promise) {
record_stats();
record_stats(true);
LOG(WARNING) << "validate query done";
main_promise.set_result(now_);
}
@ -6928,13 +6928,13 @@ void ValidateQuery::written_candidate() {
/**
* Sends validation work time to manager.
*/
void ValidateQuery::record_stats() {
void ValidateQuery::record_stats(bool success) {
double work_time = work_timer_.elapsed();
double cpu_work_time = cpu_work_timer_.elapsed();
LOG(WARNING) << "validation took " << perf_timer_.elapsed() << "s";
LOG(WARNING) << "Validate query work time = " << work_time << "s, cpu time = " << cpu_work_time << "s";
td::actor::send_closure(manager, &ValidatorManager::record_validate_query_stats, block_candidate.id, work_time,
cpu_work_time);
cpu_work_time, success);
}
} // namespace validator

View file

@ -400,7 +400,7 @@ class ValidateQuery : public td::actor::Actor {
td::Timer work_timer_{true};
td::ThreadCpuTimer cpu_work_timer_{true};
void record_stats();
void record_stats(bool success);
};
} // namespace validator

View file

@ -205,13 +205,13 @@ class ValidatorManager : public ValidatorManagerInterface {
td::optional<ShardIdFull> shard,
td::Promise<tl_object_ptr<lite_api::liteServer_nonfinal_validatorGroups>> promise) = 0;
virtual void add_lite_query_stats(int lite_query_id) {
virtual void add_lite_query_stats(int lite_query_id, bool success) {
}
virtual void record_collate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time,
CollationStats stats) {
td::optional<CollationStats> stats) {
}
virtual void record_validate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time) {
virtual void record_validate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time, bool success) {
}
virtual void add_persistent_state_description(td::Ref<PersistentStateDescription> desc) = 0;

View file

@ -32,6 +32,8 @@ namespace ton {
namespace validator {
void ValidatorManagerMasterchainReiniter::start_up() {
status_ = ProcessStatus(manager_, "process.initial_sync");
status_.set_status(PSTRING() << "starting, init block seqno " << block_id_.seqno());
LOG(INFO) << "init_block_id=" << block_id_;
CHECK(block_id_.is_masterchain());
CHECK(block_id_.id.shard == shardIdAll);
@ -58,6 +60,7 @@ void ValidatorManagerMasterchainReiniter::got_masterchain_handle(BlockHandle han
key_blocks_.push_back(handle_);
if (opts_->initial_sync_disabled()) {
status_.set_status(PSTRING() << "downloading masterchain state " << handle_->id().seqno());
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Ref<ShardState>> R) {
R.ensure();
td::actor::send_closure(SelfId, &ValidatorManagerMasterchainReiniter::download_masterchain_state);
@ -181,6 +184,7 @@ void ValidatorManagerMasterchainReiniter::got_next_key_blocks(std::vector<BlockI
}
}
LOG(WARNING) << "last key block is " << vec[vec.size() - 1];
status_.set_status(PSTRING() << "last key block is " << vec.back().seqno());
auto s = static_cast<td::uint32>(key_blocks_.size());
key_blocks_.resize(key_blocks_.size() + vec.size(), nullptr);
@ -247,6 +251,7 @@ void ValidatorManagerMasterchainReiniter::choose_masterchain_state() {
}
void ValidatorManagerMasterchainReiniter::download_masterchain_state() {
status_.set_status(PSTRING() << "downloading masterchain state " << block_id_.seqno());
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Ref<ShardState>> R) {
if (R.is_error()) {
LOG(WARNING) << "failed to download masterchain state: " << R.move_as_error();
@ -274,6 +279,7 @@ void ValidatorManagerMasterchainReiniter::downloaded_masterchain_state(td::Ref<S
td::actor::send_closure(SelfId, &ValidatorManagerMasterchainReiniter::downloaded_all_shards);
});
client_ = td::actor::create_actor<ShardClient>("shardclient", opts_, handle_, state_, manager_, std::move(P));
status_.set_status(PSTRING() << "downloading all shard states, mc seqno " << block_id_.seqno());
}
void ValidatorManagerMasterchainReiniter::downloaded_all_shards() {

View file

@ -27,6 +27,8 @@
#include "manager-init.h"
#include <stats-provider.h>
namespace ton {
namespace validator {
@ -77,6 +79,8 @@ class ValidatorManagerMasterchainReiniter : public td::actor::Actor {
td::uint32 pending_ = 0;
td::actor::ActorOwn<ShardClient> client_;
ProcessStatus status_;
};
class ValidatorManagerMasterchainStarter : public td::actor::Actor {

View file

@ -451,11 +451,9 @@ void ValidatorManagerImpl::check_external_message(td::BufferSlice data, td::Prom
promise = [self = this, wc, addr, promise = std::move(promise),
SelfId = actor_id(this)](td::Result<td::Ref<ExtMessage>> R) mutable {
if (R.is_error()) {
promise.set_error(R.move_as_error());
return;
}
td::actor::send_lambda(SelfId, [=, promise = std::move(promise), message = R.move_as_ok()]() mutable {
td::actor::send_lambda(SelfId, [=, promise = std::move(promise), R = std::move(R)]() mutable {
++(R.is_ok() ? self->total_check_ext_messages_ok_ : self->total_check_ext_messages_error_);
TRY_RESULT_PROMISE(promise, message, std::move(R));
if (self->checked_ext_msg_counter_.inc_msg_count(wc, addr) > max_ext_msg_per_addr()) {
promise.set_error(
td::Status::Error(PSTRING() << "too many external messages to address " << wc << ":" << addr.to_hex()));
@ -2131,7 +2129,7 @@ void ValidatorManagerImpl::update_shards() {
}
}
bool validating_masterchain = false;
active_validator_groups_master_ = active_validator_groups_shard_ = 0;
if (allow_validate_) {
for (auto &desc : new_shards) {
auto shard = desc.first;
@ -2148,9 +2146,7 @@ void ValidatorManagerImpl::update_shards() {
auto validator_id = get_validator(shard, val_set);
if (!validator_id.is_zero()) {
if (shard.is_masterchain()) {
validating_masterchain = true;
}
++(shard.is_masterchain() ? active_validator_groups_master_ : active_validator_groups_shard_);
auto val_group_id = get_validator_set_id(shard, val_set, opts_hash, key_seqno, opts);
if (force_recover) {
@ -2845,8 +2841,8 @@ void ValidatorManagerImpl::prepare_stats(td::Promise<std::vector<std::pair<std::
vec.emplace_back("knownkeymasterchainblock", last_known_key_block_handle_->id().to_str());
vec.emplace_back("rotatemasterchainblock", last_rotate_block_id_.to_str());
//vec.emplace_back("shardclientmasterchainseqno", td::to_string(min_confirmed_masterchain_seqno_));
vec.emplace_back("stateserializermasterchainseqno", td::to_string(state_serializer_masterchain_seqno_));
}
td::NamedThreadSafeCounter::get_default().for_each([&](auto key, auto value) {
vec.emplace_back("counter." + key, PSTRING() << value);
});
@ -2864,9 +2860,48 @@ void ValidatorManagerImpl::prepare_stats(td::Promise<std::vector<std::pair<std::
td::actor::send_closure(shard_client_, &ShardClient::get_processed_masterchain_block, std::move(P));
}
vec.emplace_back("start_time", td::to_string(started_at_));
for (int iter = 0; iter < 2; ++iter) {
td::StringBuilder sb;
td::uint32 total = 0;
for (const auto &p : (iter ? total_ls_queries_error_ : total_ls_queries_ok_)) {
sb << lite_query_name_by_id(p.first) << ":" << p.second << " ";
total += p.second;
}
sb << "TOTAL:" << total;
vec.emplace_back(PSTRING() << "total.ls_queries_" << (iter ? "error" : "ok"), sb.as_cslice().str());
}
vec.emplace_back("total.ext_msg_check",
PSTRING() << "ok:" << total_check_ext_messages_ok_ << " error:" << total_check_ext_messages_error_);
vec.emplace_back("total.collated_blocks.master", PSTRING() << "ok:" << total_collated_blocks_master_ok_
<< " error:" << total_collated_blocks_master_error_);
vec.emplace_back("total.collated_blocks.shard", PSTRING() << "ok:" << total_collated_blocks_shard_ok_
<< " error:" << total_collated_blocks_shard_error_);
vec.emplace_back("total.validated_blocks.master", PSTRING() << "ok:" << total_validated_blocks_master_ok_
<< " error:" << total_validated_blocks_master_error_);
vec.emplace_back("total.validated_blocks.shard", PSTRING() << "ok:" << total_validated_blocks_shard_ok_
<< " error:" << total_validated_blocks_shard_error_);
if (is_validator()) {
vec.emplace_back("active_validator_groups", PSTRING() << "master:" << active_validator_groups_master_
<< " shard:" << active_validator_groups_shard_);
}
bool serializer_enabled = opts_->get_state_serializer_enabled();
if (is_validator() && last_masterchain_state_->get_global_id() == -239) {
serializer_enabled = false;
}
vec.emplace_back("stateserializerenabled", serializer_enabled ? "true" : "false");
merger.make_promise("").set_value(std::move(vec));
if (!serializer_.empty()) {
td::actor::send_closure(serializer_, &AsyncStateSerializer::prepare_stats, merger.make_promise(""));
}
td::actor::send_closure(db_, &Db::prepare_stats, merger.make_promise("db."));
for (auto &[_, p] : stats_providers_) {
p.second(merger.make_promise(p.first));
}
}
void ValidatorManagerImpl::prepare_perf_timer_stats(td::Promise<std::vector<PerfTimerStats>> promise) {
@ -3353,17 +3388,28 @@ td::actor::ActorOwn<ValidatorManagerInterface> ValidatorManagerFactory::create(
}
void ValidatorManagerImpl::record_collate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time,
CollationStats stats) {
td::optional<CollationStats> stats) {
if (!stats) {
++(block_id.is_masterchain() ? total_collated_blocks_master_error_ : total_collated_blocks_shard_error_);
return;
}
auto &record = new_block_stats_record(block_id);
record.collator_work_time_ = work_time;
record.collator_cpu_work_time_ = cpu_work_time;
record.collator_stats_ = std::move(stats);
record.collator_stats_ = std::move(stats.value());
++(block_id.is_masterchain() ? total_collated_blocks_master_ok_ : total_collated_blocks_shard_ok_);
}
void ValidatorManagerImpl::record_validate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time) {
void ValidatorManagerImpl::record_validate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time,
bool success) {
auto &record = new_block_stats_record(block_id);
record.validator_work_time_ = work_time;
record.validator_cpu_work_time_ = cpu_work_time;
if (success) {
++(block_id.is_masterchain() ? total_validated_blocks_master_ok_ : total_validated_blocks_shard_ok_);
} else {
++(block_id.is_masterchain() ? total_validated_blocks_master_error_ : total_validated_blocks_shard_error_);
}
}
ValidatorManagerImpl::RecordedBlockStats &ValidatorManagerImpl::new_block_stats_record(BlockIdExt block_id) {
@ -3377,6 +3423,16 @@ ValidatorManagerImpl::RecordedBlockStats &ValidatorManagerImpl::new_block_stats_
return recorded_block_stats_[block_id];
}
void ValidatorManagerImpl::register_stats_provider(
td::uint64 idx, std::string prefix,
std::function<void(td::Promise<std::vector<std::pair<std::string, std::string>>>)> callback) {
stats_providers_[idx] = {std::move(prefix), std::move(callback)};
}
void ValidatorManagerImpl::unregister_stats_provider(td::uint64 idx) {
stats_providers_.erase(idx);
}
size_t ValidatorManagerImpl::CheckedExtMsgCounter::get_msg_count(WorkchainId wc, StdSmcAddress addr) {
before_query();
auto it1 = counter_cur_.find({wc, addr});

View file

@ -655,8 +655,9 @@ class ValidatorManagerImpl : public ValidatorManager {
td::optional<ShardIdFull> shard,
td::Promise<tl_object_ptr<lite_api::liteServer_nonfinal_validatorGroups>> promise) override;
void add_lite_query_stats(int lite_query_id) override {
void add_lite_query_stats(int lite_query_id, bool success) override {
++ls_stats_[lite_query_id];
++(success ? total_ls_queries_ok_ : total_ls_queries_error_)[lite_query_id];
}
private:
@ -747,6 +748,16 @@ class ValidatorManagerImpl : public ValidatorManager {
std::map<int, td::uint32> ls_stats_; // lite_api ID -> count, 0 for unknown
td::uint32 ls_stats_check_ext_messages_{0};
UnixTime started_at_ = (UnixTime)td::Clocks::system();
std::map<int, td::uint64> total_ls_queries_ok_, total_ls_queries_error_; // lite_api ID -> count, 0 for unknown
td::uint64 total_check_ext_messages_ok_{0}, total_check_ext_messages_error_{0};
td::uint64 total_collated_blocks_master_ok_{0}, total_collated_blocks_master_error_{0};
td::uint64 total_validated_blocks_master_ok_{0}, total_validated_blocks_master_error_{0};
td::uint64 total_collated_blocks_shard_ok_{0}, total_collated_blocks_shard_error_{0};
td::uint64 total_validated_blocks_shard_ok_{0}, total_validated_blocks_shard_error_{0};
size_t active_validator_groups_master_{0}, active_validator_groups_shard_{0};
td::actor::ActorOwn<CandidatesBuffer> candidates_buffer_;
struct RecordedBlockStats {
@ -760,16 +771,25 @@ class ValidatorManagerImpl : public ValidatorManager {
std::queue<BlockIdExt> recorded_block_stats_lru_;
void record_collate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time,
CollationStats stats) override;
void record_validate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time) override;
td::optional<CollationStats> stats) override;
void record_validate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time, bool success) override;
RecordedBlockStats &new_block_stats_record(BlockIdExt block_id);
void register_stats_provider(
td::uint64 idx, std::string prefix,
std::function<void(td::Promise<std::vector<std::pair<std::string, std::string>>>)> callback) override;
void unregister_stats_provider(td::uint64 idx) override;
std::map<PublicKeyHash, td::actor::ActorOwn<ValidatorTelemetry>> validator_telemetry_;
void init_validator_telemetry();
std::map<BlockSeqno, td::Ref<PersistentStateDescription>> persistent_state_descriptions_;
std::map<BlockIdExt, td::Ref<PersistentStateDescription>> persistent_state_blocks_;
std::map<td::uint64,
std::pair<std::string, std::function<void(td::Promise<std::vector<std::pair<std::string, std::string>>>)>>>
stats_providers_;
};
} // namespace validator

View file

@ -70,6 +70,7 @@ void DownloadState::finish_query() {
}
void DownloadState::start_up() {
status_ = ProcessStatus(validator_manager_, "process.download_state_net");
alarm_timestamp() = timeout_;
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_persistent_state, block_id_,
@ -190,6 +191,7 @@ void DownloadState::got_block_state_description(td::BufferSlice data) {
td::Timestamp::in(3.0), std::move(P));
}
}));
status_.set_status(PSTRING() << block_id_.id.to_str() << " : 0 bytes, 0B/s");
}
void DownloadState::got_block_state_part(td::BufferSlice data, td::uint32 requested_size) {
@ -198,14 +200,18 @@ void DownloadState::got_block_state_part(td::BufferSlice data, td::uint32 reques
parts_.push_back(std::move(data));
double elapsed = prev_logged_timer_.elapsed();
if (elapsed > 10.0) {
if (elapsed > 5.0) {
prev_logged_timer_ = td::Timer();
auto speed = (td::uint64)((double)(sum_ - prev_logged_sum_) / elapsed);
LOG(WARNING) << "downloading state " << block_id_.to_str() << ": " << td::format::as_size(sum_) << " ("
<< td::format::as_size((td::uint64)(double(sum_ - prev_logged_sum_) / elapsed)) << "/s)";
<< td::format::as_size(speed) << "/s)";
status_.set_status(PSTRING() << block_id_.id.to_str() << " : " << sum_ << " bytes, " << td::format::as_size(speed)
<< "/s");
prev_logged_sum_ = sum_;
}
if (last_part) {
status_.set_status(PSTRING() << block_id_.id.to_str() << " : " << sum_ << " bytes, finishing");
td::BufferSlice res{td::narrow_cast<std::size_t>(sum_)};
auto S = res.as_slice();
for (auto &p : parts_) {

View file

@ -23,6 +23,8 @@
#include "validator/validator.h"
#include "adnl/adnl-ext-client.h"
#include <stats-provider.h>
namespace ton {
namespace validator {
@ -75,6 +77,8 @@ class DownloadState : public td::actor::Actor {
td::uint64 prev_logged_sum_ = 0;
td::Timer prev_logged_timer_;
ProcessStatus status_;
};
} // namespace fullnode

View file

@ -58,6 +58,12 @@ void AsyncStateSerializer::got_self_state(AsyncSerializerState state) {
});
td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, last_block_id_, true, std::move(P));
}
inited_block_id_ = true;
for (auto& promise : wait_init_block_id_) {
promise.set_value(td::Unit());
}
wait_init_block_id_.clear();
}
void AsyncStateSerializer::got_init_handle(BlockHandle handle) {
@ -186,6 +192,9 @@ void AsyncStateSerializer::next_iteration() {
td::actor::send_closure(SelfId, &AsyncStateSerializer::request_previous_state_files);
},
td::Timestamp::in(delay));
current_status_ = PSTRING() << "delay before serializing seqno=" << masterchain_handle_->id().seqno() << " "
<< (int)delay << "s";
current_status_ts_ = td::Timestamp::now();
return;
}
if (next_idx_ < shards_.size()) {
@ -379,9 +388,14 @@ void AsyncStateSerializer::got_masterchain_state(td::Ref<MasterchainState> state
td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file_gen, masterchain_handle_->id(),
masterchain_handle_->id(), write_data, std::move(P));
current_status_ = PSTRING() << "serializing masterchain state " << state->get_block_id().id.to_str();
current_status_ts_ = td::Timestamp::now();
}
void AsyncStateSerializer::stored_masterchain_state() {
current_status_ = "pending";
current_status_ts_ = {};
LOG(ERROR) << "finished serializing masterchain state " << masterchain_handle_->id().id.to_str();
running_ = false;
next_iteration();
@ -444,9 +458,14 @@ void AsyncStateSerializer::got_shard_state(BlockHandle handle, td::Ref<ShardStat
});
td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file_gen, handle->id(),
masterchain_handle_->id(), write_data, std::move(P));
current_status_ = PSTRING() << "serializing shard state " << next_idx_ << "/" << shards_.size() << " "
<< state->get_block_id().id.to_str();
current_status_ts_ = td::Timestamp::now();
}
void AsyncStateSerializer::fail_handler(td::Status reason) {
current_status_ = PSTRING() << "pending, " << reason;
current_status_ts_ = {};
VLOG(VALIDATOR_NOTICE) << "failure: " << reason;
attempt_++;
delay_action(
@ -460,6 +479,8 @@ void AsyncStateSerializer::fail_handler_cont() {
}
void AsyncStateSerializer::success_handler() {
current_status_ = "pending";
current_status_ts_ = {};
running_ = false;
next_iteration();
}
@ -478,6 +499,29 @@ void AsyncStateSerializer::auto_disable_serializer(bool disabled) {
}
}
void AsyncStateSerializer::prepare_stats(td::Promise<std::vector<std::pair<std::string, std::string>>> promise) {
if (!inited_block_id_) {
wait_init_block_id_.push_back(
[SelfId = actor_id(this), promise = std::move(promise)](td::Result<td::Unit> R) mutable {
TRY_STATUS_PROMISE(promise, R.move_as_status());
td::actor::send_closure(SelfId, &AsyncStateSerializer::prepare_stats, std::move(promise));
});
return;
}
std::vector<std::pair<std::string, std::string>> vec;
vec.emplace_back("stateserializermasterchainseqno", td::to_string(last_block_id_.seqno()));
td::StringBuilder sb;
sb << current_status_;
if (current_status_ts_) {
sb << " (started " << (int)(td::Timestamp::now() - current_status_ts_) << "s ago)";
}
if (!opts_->get_state_serializer_enabled() || auto_disabled_) {
sb << " (disabled)";
}
vec.emplace_back("stateserializerstatus", sb.as_cslice().str());
promise.set_result(std::move(vec));
}
bool AsyncStateSerializer::need_serialize(BlockHandle handle) {
if (handle->id().id.seqno == 0 || !handle->is_key_block()) {
return false;

View file

@ -36,6 +36,9 @@ class AsyncStateSerializer : public td::actor::Actor {
UnixTime last_key_block_ts_ = 0;
bool saved_to_db_ = true;
bool inited_block_id_ = false;
std::vector<td::Promise<td::Unit>> wait_init_block_id_;
td::Ref<ValidatorManagerOptions> opts_;
bool auto_disabled_ = false;
td::CancellationTokenSource cancellation_token_source_;
@ -95,6 +98,8 @@ class AsyncStateSerializer : public td::actor::Actor {
promise.set_result(last_block_id_.id.seqno);
}
void prepare_stats(td::Promise<std::vector<std::pair<std::string, std::string>>> promise);
void update_last_known_key_block_ts(UnixTime ts) {
last_known_key_block_ts_ = std::max(last_known_key_block_ts_, ts);
}
@ -111,6 +116,9 @@ class AsyncStateSerializer : public td::actor::Actor {
void update_options(td::Ref<ValidatorManagerOptions> opts);
void auto_disable_serializer(bool disabled);
std::string current_status_ = "pending";
td::Timestamp current_status_ts_ = td::Timestamp::never();
};
} // namespace validator

105
validator/stats-provider.h Normal file
View file

@ -0,0 +1,105 @@
/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "validator.h"
#include "common/AtomicRef.h"
#include <atomic>
namespace ton {
namespace validator {
class StatsProvider {
public:
StatsProvider() = default;
StatsProvider(td::actor::ActorId<ValidatorManagerInterface> manager, std::string prefix,
std::function<void(td::Promise<std::vector<std::pair<std::string, std::string>>>)> callback)
: inited_(true), manager_(std::move(manager)) {
static std::atomic<td::uint64> cur_idx{0};
idx_ = cur_idx.fetch_add(1);
td::actor::send_closure(manager_, &ValidatorManagerInterface::register_stats_provider, idx_, std::move(prefix),
std::move(callback));
}
StatsProvider(const StatsProvider&) = delete;
StatsProvider(StatsProvider&& other) noexcept
: inited_(other.inited_), idx_(other.idx_), manager_(std::move(other.manager_)) {
other.inited_ = false;
}
~StatsProvider() {
if (inited_) {
td::actor::send_closure(manager_, &ValidatorManagerInterface::unregister_stats_provider, idx_);
}
}
StatsProvider& operator=(const StatsProvider&) = delete;
StatsProvider& operator=(StatsProvider&& other) noexcept {
if (this != &other) {
inited_ = other.inited_;
idx_ = other.idx_;
manager_ = std::move(other.manager_);
other.inited_ = false;
}
return *this;
}
bool inited() const {
return inited_;
}
private:
bool inited_ = false;
td::uint64 idx_ = 0;
td::actor::ActorId<ValidatorManagerInterface> manager_;
};
class ProcessStatus {
public:
ProcessStatus() = default;
ProcessStatus(td::actor::ActorId<ValidatorManagerInterface> manager, std::string name)
: stats_provider_(std::move(manager), std::move(name), [value = value_](auto promise) {
auto status = value->load();
if (status.is_null()) {
promise.set_error(td::Status::Error("empty"));
return;
}
std::vector<std::pair<std::string, std::string>> vec;
vec.emplace_back("", *status);
promise.set_value(std::move(vec));
}) {
}
ProcessStatus(const ProcessStatus&) = delete;
ProcessStatus(ProcessStatus&& other) noexcept = default;
ProcessStatus& operator=(const ProcessStatus&) = delete;
ProcessStatus& operator=(ProcessStatus&& other) noexcept = default;
void set_status(std::string s) {
if (!value_) {
return;
}
value_->store(td::Ref<td::Cnt<std::string>>(true, std::move(s)));
}
private:
std::shared_ptr<td::AtomicRef<td::Cnt<std::string>>> value_ = std::make_shared<td::AtomicRef<td::Cnt<std::string>>>();
StatsProvider stats_provider_;
};
} // namespace validator
} // namespace ton

View file

@ -20,6 +20,7 @@
#include <vector>
#include <deque>
#include <functional>
#include "td/actor/actor.h"
@ -292,6 +293,13 @@ class ValidatorManagerInterface : public td::actor::Actor {
virtual void get_out_msg_queue_size(BlockIdExt block_id, td::Promise<td::uint64> promise) = 0;
virtual void update_options(td::Ref<ValidatorManagerOptions> opts) = 0;
virtual void register_stats_provider(
td::uint64 idx, std::string prefix,
std::function<void(td::Promise<std::vector<std::pair<std::string, std::string>>>)> callback) {
}
virtual void unregister_stats_provider(td::uint64 idx) {
}
};
} // namespace validator