/*
    This file is part of TON Blockchain Library.

    TON Blockchain Library is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 2 of the License, or
    (at your option) any later version.

    TON Blockchain Library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
*/
#include "queue-size-counter.hpp"
#include "block/block-auto.h"
#include "block/block-parse.h"
#include "common/delay.h"
#include "td/actor/MultiPromise.h"
#include "td/utils/Random.h"

// NOTE(review): the template argument lists in this file were reconstructed from usage
// (uint64 sizes, td::Unit completion results, dictionary callback arities). Confirm them
// against queue-size-counter.hpp and the ValidatorManager interface.

namespace ton::validator {

// Counts the entries of the outbound message queue of `state` by visiting every entry of
// the queue dictionary.
// Returns an error if the queue cannot be obtained from the state, if its descriptor
// cannot be unpacked, or if the dictionary traversal fails.
static td::Result<td::uint64> calc_queue_size(const td::Ref<ShardState> &state) {
  td::uint64 size = 0;
  TRY_RESULT(outq_descr, state->message_queue());
  block::gen::OutMsgQueueInfo::Record qinfo;
  if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
    return td::Status::Error("invalid message queue");
  }
  // 352 is the key length handed to the dictionary (presumably the key width in bits - confirm).
  vm::AugmentedDictionary queue{qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
  bool ok = queue.check_for_each([&](td::Ref<vm::CellSlice>, td::ConstBitPtr, int) -> bool {
    ++size;
    return true;
  });
  if (!ok) {
    return td::Status::Error("invalid message queue dict");
  }
  return size;
}

// Derives the queue size of `state` from the already known size `prev_size` of `prev_state`
// by diffing the two queue dictionaries instead of walking the whole new queue.
// Every differing key with a non-null old value counts as removed and every differing key
// with a non-null new value counts as added, so a changed entry contributes to both and
// cancels out.
// Returns an error if either queue cannot be read or unpacked, if the diff fails, or if the
// result would be negative.
static td::Result<td::uint64> recalc_queue_size(const td::Ref<ShardState> &state,
                                                const td::Ref<ShardState> &prev_state, td::uint64 prev_size) {
  TRY_RESULT(outq_descr, state->message_queue());
  block::gen::OutMsgQueueInfo::Record qinfo;
  if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
    return td::Status::Error("invalid message queue");
  }
  vm::AugmentedDictionary queue{qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};

  TRY_RESULT(prev_outq_descr, prev_state->message_queue());
  block::gen::OutMsgQueueInfo::Record prev_qinfo;
  if (!tlb::unpack_cell(prev_outq_descr->root_cell(), prev_qinfo)) {
    return td::Status::Error("invalid message queue");
  }
  vm::AugmentedDictionary prev_queue{prev_qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};

  td::uint64 add = 0, rem = 0;
  bool ok = prev_queue.scan_diff(
      queue, [&](td::ConstBitPtr, int, td::Ref<vm::CellSlice> prev_val, td::Ref<vm::CellSlice> new_val) -> bool {
        if (prev_val.not_null()) {
          ++rem;
        }
        if (new_val.not_null()) {
          ++add;
        }
        return true;
      });
  if (!ok) {
    return td::Status::Error("invalid message queue dict");
  }
  // Sizes are unsigned: reject the inconsistent case explicitly instead of wrapping around.
  if (prev_size + add < rem) {
    return td::Status::Error("negative value");
  }
  return prev_size + add - rem;
}

// Actor start hook. Without an initial masterchain state the counter switches to simple
// mode (every request recomputes the whole queue, see get_queue_size) and does not start
// background processing. Otherwise it starts from the seqno of the initial state and
// schedules the periodic cleanup.
void QueueSizeCounter::start_up() {
  if (init_masterchain_state_.is_null()) {
    // Used in manager-hardfork or manager-disk
    simple_mode_ = true;
    return;
  }
  current_seqno_ = init_masterchain_state_->get_seqno();
  process_top_shard_blocks_cont(init_masterchain_state_, true);
  init_masterchain_state_ = {};  // the initial state is not needed any more
  alarm();
}

// Requests the queue size for `block_id`. The whole queue is counted (rather than derived
// from the previous block) in simple mode or when is_block_too_old(block_id) holds.
void QueueSizeCounter::get_queue_size(BlockIdExt block_id, td::Promise<td::uint64> promise) {
  get_queue_size_ex(block_id, simple_mode_ || is_block_too_old(block_id), std::move(promise));
}

// Requests the queue size for `block_id`, answering from `results_` when it is already known.
// Concurrent requests for the same block are attached to the computation in progress; note
// that `calc_whole` of the first request is the one that gets recorded.
// The computation starts by fetching the block handle and then waiting for the block state.
void QueueSizeCounter::get_queue_size_ex(ton::BlockIdExt block_id, bool calc_whole, td::Promise<td::uint64> promise) {
  Entry &entry = results_[block_id];
  if (entry.done_) {
    promise.set_result(entry.queue_size_);
    return;
  }
  entry.promises_.push_back(std::move(promise));
  if (entry.started_) {
    return;
  }
  entry.started_ = true;
  entry.calc_whole_ = calc_whole;
  td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, block_id, true,
                          [SelfId = actor_id(this), block_id, manager = manager_](td::Result<BlockHandle> R) mutable {
                            if (R.is_error()) {
                              td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, block_id, R.move_as_error());
                              return;
                            }
                            BlockHandle handle = R.move_as_ok();
                            td::actor::send_closure(
                                manager, &ValidatorManager::wait_block_state, handle, 0, td::Timestamp::in(10.0),
                                [SelfId, handle](td::Result<td::Ref<ShardState>> R) mutable {
                                  if (R.is_error()) {
                                    td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, handle->id(),
                                                            R.move_as_error());
                                    return;
                                  }
                                  td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_cont,
                                                          std::move(handle), R.move_as_ok());
                                });
                          });
}

// Second step: the state of the block is available.
// The whole queue is counted when that was requested, for seqno 0, or when the block has
// either a single previous block in a different shard or two previous blocks.
// Otherwise the size of the previous block is requested and the computation continues in
// get_queue_size_cont2 once the previous state has been loaded.
void QueueSizeCounter::get_queue_size_cont(BlockHandle handle, td::Ref<ShardState> state) {
  Entry &entry = results_[handle->id()];
  CHECK(entry.started_);
  bool calc_whole = entry.calc_whole_ || handle->id().seqno() == 0;
  if (!calc_whole) {
    CHECK(handle->inited_prev());
    auto prev_blocks = handle->prev();
    bool after_split = prev_blocks.size() == 1 && handle->id().shard_full() != prev_blocks[0].shard_full();
    bool after_merge = prev_blocks.size() == 2;
    calc_whole = after_split || after_merge;
  }
  if (calc_whole) {
    auto r_size = calc_queue_size(state);
    if (r_size.is_error()) {
      on_error(handle->id(), r_size.move_as_error());
      return;
    }
    entry.done_ = true;
    entry.queue_size_ = r_size.move_as_ok();
    for (auto &promise : entry.promises_) {
      promise.set_result(entry.queue_size_);
    }
    entry.promises_.clear();
    return;
  }

  auto prev_block_id = handle->one_prev(true);
  get_queue_size(prev_block_id, [=, SelfId = actor_id(this), manager = manager_](td::Result<td::uint64> R) {
    if (R.is_error()) {
      td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, state->get_block_id(), R.move_as_error());
      return;
    }
    td::uint64 prev_size = R.move_as_ok();
    td::actor::send_closure(
        manager, &ValidatorManager::wait_block_state_short, prev_block_id, 0, td::Timestamp::in(10.0),
        [=](td::Result<td::Ref<ShardState>> R) {
          if (R.is_error()) {
            td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, state->get_block_id(), R.move_as_error());
            return;
          }
          td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_cont2, state, R.move_as_ok(), prev_size);
        });
  });
}

// Final step of the incremental path: computes the size of `state` from `prev_state` and
// `prev_size`, stores it in the entry and answers all pending promises.
void QueueSizeCounter::get_queue_size_cont2(td::Ref<ShardState> state, td::Ref<ShardState> prev_state,
                                            td::uint64 prev_size) {
  BlockIdExt block_id = state->get_block_id();
  Entry &entry = results_[block_id];
  CHECK(entry.started_);
  auto r_size = recalc_queue_size(state, prev_state, prev_size);
  if (r_size.is_error()) {
    on_error(block_id, r_size.move_as_error());
    return;
  }
  entry.done_ = true;
  entry.queue_size_ = r_size.move_as_ok();
  for (auto &promise : entry.promises_) {
    promise.set_result(entry.queue_size_);
  }
  entry.promises_.clear();
}

// Fails every promise waiting for `block_id` with `error` and removes the entry, so a later
// request for the same block starts a fresh computation. Does nothing if there is no entry.
void QueueSizeCounter::on_error(ton::BlockIdExt block_id, td::Status error) {
  auto it = results_.find(block_id);
  if (it == results_.end()) {
    return;
  }
  Entry &entry = it->second;
  CHECK(!entry.done_);
  for (auto &promise : entry.promises_) {
    promise.set_error(error.clone());
  }
  results_.erase(it);
}

// Background loop, step 1: looks up the masterchain block with seqno `current_seqno_` and
// waits for its state. Either failure is logged and the step is retried after
// td::Timestamp::in(5.0).
void QueueSizeCounter::process_top_shard_blocks() {
  LOG(DEBUG) << "QueueSizeCounter::process_top_shard_blocks seqno=" << current_seqno_;
  td::actor::send_closure(
      manager_, &ValidatorManager::get_block_by_seqno_from_db, AccountIdPrefixFull{masterchainId, 0}, current_seqno_,
      [SelfId = actor_id(this), manager = manager_](td::Result<ConstBlockHandle> R) {
        if (R.is_error()) {
          LOG(WARNING) << "Failed to get masterchain block id: " << R.move_as_error();
          delay_action([=]() { td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); },
                       td::Timestamp::in(5.0));
          return;
        }
        td::actor::send_closure(
            manager, &ValidatorManager::wait_block_state_short, R.ok()->id(), 0, td::Timestamp::in(10.0),
            [=](td::Result<td::Ref<ShardState>> R) {
              if (R.is_error()) {
                LOG(WARNING) << "Failed to get masterchain state: " << R.move_as_error();
                delay_action([=]() { td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); },
                             td::Timestamp::in(5.0));
                return;
              }
              td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks_cont,
                                      td::Ref<MasterchainState>(R.move_as_ok()), false);
            });
      });
}

// Background loop, step 2: rebuilds `last_top_blocks_` from the masterchain block itself plus
// the top block of every shard for which opts_->need_monitor() returns true, then requests
// the queue size of each of them (with retries). When all requests have completed,
// process_top_shard_blocks_finish is scheduled.
// `init` is forwarded as `calc_whole` and additionally records the list in `init_top_blocks_`.
void QueueSizeCounter::process_top_shard_blocks_cont(td::Ref<MasterchainState> state, bool init) {
  LOG(DEBUG) << "QueueSizeCounter::process_top_shard_blocks_cont seqno=" << current_seqno_ << " init=" << init;
  td::MultiPromise mp;
  auto ig = mp.init_guard();
  last_top_blocks_.clear();
  last_top_blocks_.push_back(state->get_block_id());
  for (auto &shard : state->get_shards()) {
    if (opts_->need_monitor(shard->shard(), state)) {
      last_top_blocks_.push_back(shard->top_block_id());
    }
  }
  for (const BlockIdExt &block_id : last_top_blocks_) {
    get_queue_size_ex_retry(block_id, init, ig.get_promise());
  }
  ig.add_promise([SelfId = actor_id(this)](td::Result<td::Unit> R) {
    if (R.is_error()) {
      return;
    }
    td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks_finish);
  });
  if (init) {
    init_top_blocks_ = last_top_blocks_;
  }
}

// Like get_queue_size_ex, but only reports completion. On failure the error is logged and
// the request is repeated after td::Timestamp::in(5.0); `promise` is fulfilled only once a
// size has been computed.
void QueueSizeCounter::get_queue_size_ex_retry(BlockIdExt block_id, bool calc_whole, td::Promise<td::Unit> promise) {
  get_queue_size_ex(block_id, calc_whole,
                    [=, promise = std::move(promise), SelfId = actor_id(this)](td::Result<td::uint64> R) mutable {
                      if (R.is_error()) {
                        LOG(WARNING) << "Failed to calculate queue size for block " << block_id.to_str() << ": "
                                     << R.move_as_error();
                        delay_action(
                            [=, promise = std::move(promise)]() mutable {
                              td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_ex_retry, block_id,
                                                      calc_whole, std::move(promise));
                            },
                            td::Timestamp::in(5.0));
                        return;
                      }
                      promise.set_result(td::Unit());
                    });
}

// Background loop, step 3: advances to the next masterchain seqno and waits for the shard
// client to reach it.
void QueueSizeCounter::process_top_shard_blocks_finish() {
  ++current_seqno_;
  wait_shard_client();
}

// Background loop, step 4: asks the manager to wait for the shard client state at
// `current_seqno_`. On error the wait is retried after td::Timestamp::in(5.0); on success
// the loop continues with process_top_shard_blocks.
void QueueSizeCounter::wait_shard_client() {
  LOG(DEBUG) << "QueueSizeCounter::wait_shard_client seqno=" << current_seqno_;
  td::actor::send_closure(
      manager_, &ValidatorManager::wait_shard_client_state, current_seqno_, td::Timestamp::in(60.0),
      [SelfId = actor_id(this)](td::Result<td::Unit> R) {
        if (R.is_error()) {
          delay_action([=]() mutable { td::actor::send_closure(SelfId, &QueueSizeCounter::wait_shard_client); },
                       td::Timestamp::in(5.0));
          return;
        }
        td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks);
      });
}

// Periodic cleanup: drops finished entries whose block is_block_too_old() and re-arms the
// alarm at a random point between 20.0 and 40.0 from now. Entries still being computed are
// kept because promises are attached to them.
void QueueSizeCounter::alarm() {
  for (auto it = results_.begin(); it != results_.end();) {
    if (it->second.done_ && is_block_too_old(it->first)) {
      it = results_.erase(it);
    } else {
      ++it;
    }
  }
  alarm_timestamp() = td::Timestamp::in(td::Random::fast(20.0, 40.0));
}

}  // namespace ton::validator