/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see .
*/
#include "queue-size-counter.hpp"
#include "block/block-auto.h"
#include "block/block-parse.h"
#include "common/delay.h"
#include "td/actor/MultiPromise.h"
#include "td/utils/Random.h"
namespace ton::validator {
static td::Result calc_queue_size(const td::Ref &state) {
td::uint64 size = 0;
TRY_RESULT(outq_descr, state->message_queue());
block::gen::OutMsgQueueInfo::Record qinfo;
if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
return td::Status::Error("invalid message queue");
}
vm::AugmentedDictionary queue{qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
bool ok = queue.check_for_each([&](td::Ref, td::ConstBitPtr, int) -> bool {
++size;
return true;
});
if (!ok) {
return td::Status::Error("invalid message queue dict");
}
return size;
}
static td::Result recalc_queue_size(const td::Ref &state, const td::Ref &prev_state,
td::uint64 prev_size) {
TRY_RESULT(outq_descr, state->message_queue());
block::gen::OutMsgQueueInfo::Record qinfo;
if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
return td::Status::Error("invalid message queue");
}
vm::AugmentedDictionary queue{qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
TRY_RESULT(prev_outq_descr, prev_state->message_queue());
block::gen::OutMsgQueueInfo::Record prev_qinfo;
if (!tlb::unpack_cell(prev_outq_descr->root_cell(), prev_qinfo)) {
return td::Status::Error("invalid message queue");
}
vm::AugmentedDictionary prev_queue{prev_qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
td::uint64 add = 0, rem = 0;
bool ok = prev_queue.scan_diff(
queue, [&](td::ConstBitPtr, int, td::Ref prev_val, td::Ref new_val) -> bool {
if (prev_val.not_null()) {
++rem;
}
if (new_val.not_null()) {
++add;
}
return true;
});
if (!ok) {
return td::Status::Error("invalid message queue dict");
}
if (prev_size + add < rem) {
return td::Status::Error("negative value");
}
return prev_size + add - rem;
}
void QueueSizeCounter::start_up() {
if (init_masterchain_state_.is_null()) {
// Used in manager-hardfork or manager-disk
simple_mode_ = true;
return;
}
current_seqno_ = init_masterchain_state_->get_seqno();
process_top_shard_blocks_cont(init_masterchain_state_, true);
init_masterchain_state_ = {};
alarm();
}
void QueueSizeCounter::get_queue_size(BlockIdExt block_id, td::Promise promise) {
get_queue_size_ex(block_id, simple_mode_ || is_block_too_old(block_id), std::move(promise));
}
void QueueSizeCounter::get_queue_size_ex(ton::BlockIdExt block_id, bool calc_whole, td::Promise promise) {
Entry &entry = results_[block_id];
if (entry.done_) {
promise.set_result(entry.queue_size_);
return;
}
entry.promises_.push_back(std::move(promise));
if (entry.started_) {
return;
}
entry.started_ = true;
entry.calc_whole_ = calc_whole;
td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, block_id, true,
[SelfId = actor_id(this), block_id, manager = manager_](td::Result R) mutable {
if (R.is_error()) {
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, block_id, R.move_as_error());
return;
}
BlockHandle handle = R.move_as_ok();
td::actor::send_closure(
manager, &ValidatorManager::wait_block_state, handle, 0, td::Timestamp::in(10.0),
[SelfId, handle](td::Result> R) mutable {
if (R.is_error()) {
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, handle->id(),
R.move_as_error());
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_cont,
std::move(handle), R.move_as_ok());
});
});
}
void QueueSizeCounter::get_queue_size_cont(BlockHandle handle, td::Ref state) {
Entry &entry = results_[handle->id()];
CHECK(entry.started_);
bool calc_whole = entry.calc_whole_ || handle->id().seqno() == 0;
if (!calc_whole) {
CHECK(handle->inited_prev());
auto prev_blocks = handle->prev();
bool after_split = prev_blocks.size() == 1 && handle->id().shard_full() != prev_blocks[0].shard_full();
bool after_merge = prev_blocks.size() == 2;
calc_whole = after_split || after_merge;
}
if (calc_whole) {
auto r_size = calc_queue_size(state);
if (r_size.is_error()) {
on_error(handle->id(), r_size.move_as_error());
return;
}
entry.done_ = true;
entry.queue_size_ = r_size.move_as_ok();
for (auto &promise : entry.promises_) {
promise.set_result(entry.queue_size_);
}
entry.promises_.clear();
return;
}
auto prev_block_id = handle->one_prev(true);
get_queue_size(prev_block_id, [=, SelfId = actor_id(this), manager = manager_](td::Result R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, state->get_block_id(), R.move_as_error());
return;
}
td::uint64 prev_size = R.move_as_ok();
td::actor::send_closure(
manager, &ValidatorManager::wait_block_state_short, prev_block_id, 0, td::Timestamp::in(10.0),
[=](td::Result> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, state->get_block_id(), R.move_as_error());
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_cont2, state, R.move_as_ok(), prev_size);
});
});
}
void QueueSizeCounter::get_queue_size_cont2(td::Ref state, td::Ref prev_state,
td::uint64 prev_size) {
BlockIdExt block_id = state->get_block_id();
Entry &entry = results_[block_id];
CHECK(entry.started_);
auto r_size = recalc_queue_size(state, prev_state, prev_size);
if (r_size.is_error()) {
on_error(block_id, r_size.move_as_error());
return;
}
entry.done_ = true;
entry.queue_size_ = r_size.move_as_ok();
for (auto &promise : entry.promises_) {
promise.set_result(entry.queue_size_);
}
entry.promises_.clear();
}
void QueueSizeCounter::on_error(ton::BlockIdExt block_id, td::Status error) {
auto it = results_.find(block_id);
if (it == results_.end()) {
return;
}
Entry &entry = it->second;
CHECK(!entry.done_);
for (auto &promise : entry.promises_) {
promise.set_error(error.clone());
}
results_.erase(it);
}
void QueueSizeCounter::process_top_shard_blocks() {
LOG(DEBUG) << "QueueSizeCounter::process_top_shard_blocks seqno=" << current_seqno_;
td::actor::send_closure(
manager_, &ValidatorManager::get_block_by_seqno_from_db, AccountIdPrefixFull{masterchainId, 0}, current_seqno_,
[SelfId = actor_id(this), manager = manager_](td::Result R) {
if (R.is_error()) {
LOG(WARNING) << "Failed to get masterchain block id: " << R.move_as_error();
delay_action([=]() { td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); },
td::Timestamp::in(5.0));
return;
}
td::actor::send_closure(
manager, &ValidatorManager::wait_block_state_short, R.ok()->id(), 0, td::Timestamp::in(10.0),
[=](td::Result> R) {
if (R.is_error()) {
LOG(WARNING) << "Failed to get masterchain state: " << R.move_as_error();
delay_action([=]() { td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); },
td::Timestamp::in(5.0));
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks_cont,
td::Ref(R.move_as_ok()), false);
});
});
}
void QueueSizeCounter::process_top_shard_blocks_cont(td::Ref state, bool init) {
LOG(DEBUG) << "QueueSizeCounter::process_top_shard_blocks_cont seqno=" << current_seqno_ << " init=" << init;
td::MultiPromise mp;
auto ig = mp.init_guard();
last_top_blocks_.clear();
last_top_blocks_.push_back(state->get_block_id());
for (auto &shard : state->get_shards()) {
if (opts_->need_monitor(shard->shard(), state)) {
last_top_blocks_.push_back(shard->top_block_id());
}
}
for (const BlockIdExt &block_id : last_top_blocks_) {
get_queue_size_ex_retry(block_id, init, ig.get_promise());
}
ig.add_promise([SelfId = actor_id(this)](td::Result R) {
if (R.is_error()) {
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks_finish);
});
if (init) {
init_top_blocks_ = last_top_blocks_;
}
}
void QueueSizeCounter::get_queue_size_ex_retry(BlockIdExt block_id, bool calc_whole, td::Promise promise) {
get_queue_size_ex(block_id, calc_whole,
[=, promise = std::move(promise), SelfId = actor_id(this)](td::Result R) mutable {
if (R.is_error()) {
LOG(WARNING) << "Failed to calculate queue size for block " << block_id.to_str() << ": "
<< R.move_as_error();
delay_action(
[=, promise = std::move(promise)]() mutable {
td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_ex_retry, block_id,
calc_whole, std::move(promise));
},
td::Timestamp::in(5.0));
return;
}
promise.set_result(td::Unit());
});
}
void QueueSizeCounter::process_top_shard_blocks_finish() {
++current_seqno_;
wait_shard_client();
}
void QueueSizeCounter::wait_shard_client() {
LOG(DEBUG) << "QueueSizeCounter::wait_shard_client seqno=" << current_seqno_;
td::actor::send_closure(
manager_, &ValidatorManager::wait_shard_client_state, current_seqno_, td::Timestamp::in(60.0),
[SelfId = actor_id(this)](td::Result R) {
if (R.is_error()) {
delay_action([=]() mutable { td::actor::send_closure(SelfId, &QueueSizeCounter::wait_shard_client); },
td::Timestamp::in(5.0));
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks);
});
}
void QueueSizeCounter::alarm() {
for (auto it = results_.begin(); it != results_.end();) {
if (it->second.done_ && is_block_too_old(it->first)) {
it = results_.erase(it);
} else {
++it;
}
}
alarm_timestamp() = td::Timestamp::in(td::Random::fast(20.0, 40.0));
}
} // namespace ton::validator