
Improve handling outbound message queues (#825)

* Improve handling outbound message queues

* Cleanup queue faster
* Calculate queue sizes in background
* Force or limit split/merge depending on queue size (see the threshold sketch after this list)

* Increase validate_ref limit for transaction

* Add all changes of public libraries to block size estimation

* Don't crash on timeout in GC

* Don't import external messages when queue is too big
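
For reference, the thresholds below are the new constants from collator.cpp in this commit; the helper predicates are only an illustrative sketch of how the collator applies them, not code from the commit:

// Constants are real (see collator.cpp below); the helper functions are a sketch.
static const td::uint32 FORCE_SPLIT_QUEUE_SIZE = 4096;
static const td::uint32 SPLIT_MAX_QUEUE_SIZE = 100000;
static const td::uint32 MERGE_MAX_QUEUE_SIZE = 2047;
static const td::uint32 SKIP_EXTERNALS_QUEUE_SIZE = 8000;

// Underload history (a precondition for merging) is only recorded while the
// queue is small enough to be cleaned in full after the merge.
bool may_record_underload(td::uint32 queue_size) {
  return queue_size <= MERGE_MAX_QUEUE_SIZE;
}
// Overload history (a precondition for splitting) is suppressed for queues too
// big to split, and forced once the queue reaches the force-split threshold.
bool may_record_overload(td::uint32 queue_size) {
  return queue_size <= SPLIT_MAX_QUEUE_SIZE;
}
bool must_force_split(td::uint32 queue_size) {
  return queue_size >= FORCE_SPLIT_QUEUE_SIZE && queue_size <= SPLIT_MAX_QUEUE_SIZE;
}
// External messages are not imported while the queue is above this limit.
bool should_skip_externals(td::uint32 queue_size) {
  return queue_size > SKIP_EXTERNALS_QUEUE_SIZE;
}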

---------

Co-authored-by: SpyCheese <mikle98@yandex.ru>
EmelyanenkoK 2023-12-13 12:57:34 +03:00 committed by GitHub
parent 3a595ce849
commit 5e6b67ae96
16 changed files with 727 additions and 111 deletions


@@ -715,7 +715,7 @@ td::uint64 BlockLimitStatus::estimate_block_size(const vm::NewCellStorageStat::S
sum += *extra;
}
return 2000 + (sum.bits >> 3) + sum.cells * 12 + sum.internal_refs * 3 + sum.external_refs * 40 + accounts * 200 +
transactions * 200 + (extra ? 200 : 0) + extra_out_msgs * 300 + extra_library_diff * 700;
transactions * 200 + (extra ? 200 : 0) + extra_out_msgs * 300 + public_library_diff * 700;
}
int BlockLimitStatus::classify() const {
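
For intuition, a worked example of this size estimate with assumed (hypothetical) inputs:

// Hypothetical inputs, not taken from the commit:
// sum.bits = 80000, sum.cells = 500, internal_refs = 1000, external_refs = 10,
// accounts = 20, transactions = 50, extra != nullptr, extra_out_msgs = 5,
// public_library_diff = 2:
// 2000 + (80000 >> 3) + 500 * 12 + 1000 * 3 + 10 * 40 + 20 * 200 + 50 * 200
//      + 200 + 5 * 300 + 2 * 700
// = 2000 + 10000 + 6000 + 3000 + 400 + 4000 + 10000 + 200 + 1500 + 1400 = 38500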
@@ -1009,8 +1009,8 @@ td::Status ShardState::merge_with(ShardState& sib) {
return td::Status::OK();
}
td::Result<std::unique_ptr<vm::AugmentedDictionary>> ShardState::compute_split_out_msg_queue(
ton::ShardIdFull subshard) {
td::Result<std::unique_ptr<vm::AugmentedDictionary>> ShardState::compute_split_out_msg_queue(ton::ShardIdFull subshard,
td::uint32* queue_size) {
auto shard = id_.shard_full();
if (!ton::shard_is_parent(shard, subshard)) {
return td::Status::Error(-666, "cannot split subshard "s + subshard.to_str() + " from state of " + id_.to_str() +
@@ -1018,7 +1018,7 @@ td::Result<std::unique_ptr<vm::AugmentedDictionary>> ShardState::compute_split_o
}
CHECK(out_msg_queue_);
auto subqueue = std::make_unique<vm::AugmentedDictionary>(*out_msg_queue_);
int res = block::filter_out_msg_queue(*subqueue, shard, subshard);
int res = block::filter_out_msg_queue(*subqueue, shard, subshard, queue_size);
if (res < 0) {
return td::Status::Error(-666, "error splitting OutMsgQueue of "s + id_.to_str());
}
@@ -1040,7 +1040,7 @@ td::Result<std::shared_ptr<block::MsgProcessedUptoCollection>> ShardState::compu
return std::move(sub_processed_upto);
}
td::Status ShardState::split(ton::ShardIdFull subshard) {
td::Status ShardState::split(ton::ShardIdFull subshard, td::uint32* queue_size) {
if (!ton::shard_is_parent(id_.shard_full(), subshard)) {
return td::Status::Error(-666, "cannot split subshard "s + subshard.to_str() + " from state of " + id_.to_str() +
" because it is not a parent");
@@ -1058,7 +1058,7 @@ td::Status ShardState::split(ton::ShardIdFull subshard) {
auto shard1 = id_.shard_full();
CHECK(ton::shard_is_parent(shard1, subshard));
CHECK(out_msg_queue_);
int res1 = block::filter_out_msg_queue(*out_msg_queue_, shard1, subshard);
int res1 = block::filter_out_msg_queue(*out_msg_queue_, shard1, subshard, queue_size);
if (res1 < 0) {
return td::Status::Error(-666, "error splitting OutMsgQueue of "s + id_.to_str());
}
@@ -1098,8 +1098,12 @@ td::Status ShardState::split(ton::ShardIdFull subshard) {
return td::Status::OK();
}
int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard) {
return out_queue.filter([subshard, old_shard](vm::CellSlice& cs, td::ConstBitPtr key, int key_len) -> int {
int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard,
td::uint32* queue_size) {
if (queue_size) {
*queue_size = 0;
}
return out_queue.filter([=](vm::CellSlice& cs, td::ConstBitPtr key, int key_len) -> int {
CHECK(key_len == 352);
LOG(DEBUG) << "scanning OutMsgQueue entry with key " << key.to_hex(key_len);
block::tlb::MsgEnvelope::Record_std env;
@@ -1122,7 +1126,11 @@ int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull ol
<< " does not contain current address belonging to shard " << old_shard.to_str();
return -1;
}
return ton::shard_contains(subshard, cur_prefix);
bool res = ton::shard_contains(subshard, cur_prefix);
if (res && queue_size) {
++*queue_size;
}
return res;
});
}
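
A minimal usage sketch of the extended filter_out_msg_queue (variable names here are assumed):

td::uint32 queue_size = 0;
// Keep only messages whose current address routes into `subshard`; on success
// `queue_size` holds the number of retained entries.
int res = block::filter_out_msg_queue(*out_msg_queue, shard, subshard, &queue_size);
if (res < 0) {
  // some queue entry could not be unpacked or did not belong to `shard`
}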


@@ -262,7 +262,7 @@ struct BlockLimitStatus {
td::uint64 gas_used{};
vm::NewCellStorageStat st_stat;
unsigned accounts{}, transactions{}, extra_out_msgs{};
unsigned extra_library_diff{}; // Number of public libraries in deleted/frozen accounts
unsigned public_library_diff{};
BlockLimitStatus(const BlockLimits& limits_, ton::LogicalTime lt = 0)
: limits(limits_), cur_lt(std::max(limits_.start_lt, lt)) {
}
@@ -272,7 +272,7 @@ struct BlockLimitStatus {
transactions = accounts = 0;
gas_used = 0;
extra_out_msgs = 0;
extra_library_diff = 0;
public_library_diff = 0;
}
td::uint64 estimate_block_size(const vm::NewCellStorageStat::Stat* extra = nullptr) const;
int classify() const;
@@ -433,10 +433,11 @@ struct ShardState {
ton::BlockSeqno prev_mc_block_seqno, bool after_split, bool clear_history,
std::function<bool(ton::BlockSeqno)> for_each_mcseqno);
td::Status merge_with(ShardState& sib);
td::Result<std::unique_ptr<vm::AugmentedDictionary>> compute_split_out_msg_queue(ton::ShardIdFull subshard);
td::Result<std::unique_ptr<vm::AugmentedDictionary>> compute_split_out_msg_queue(ton::ShardIdFull subshard,
td::uint32* queue_size = nullptr);
td::Result<std::shared_ptr<block::MsgProcessedUptoCollection>> compute_split_processed_upto(
ton::ShardIdFull subshard);
td::Status split(ton::ShardIdFull subshard);
td::Status split(ton::ShardIdFull subshard, td::uint32* queue_size = nullptr);
td::Status unpack_out_msg_queue_info(Ref<vm::Cell> out_msg_queue_info);
bool clear_load_history() {
overload_history_ = underload_history_ = 0;
@@ -656,7 +657,8 @@ class MtCarloComputeShare {
void gen_vset();
};
int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard);
int filter_out_msg_queue(vm::AugmentedDictionary& out_queue, ton::ShardIdFull old_shard, ton::ShardIdFull subshard,
td::uint32* queue_size = nullptr);
std::ostream& operator<<(std::ostream& os, const ShardId& shard_id);


@@ -146,22 +146,30 @@ bool OutputQueueMerger::add_root(int src, Ref<vm::Cell> outmsg_root) {
return true;
}
OutputQueueMerger::OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<block::McShardDescr> _neighbors)
OutputQueueMerger::OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<Neighbor> _neighbors)
: queue_for(_queue_for), neighbors(std::move(_neighbors)), eof(false), failed(false) {
init();
}
OutputQueueMerger::OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<block::McShardDescr> _neighbors)
: queue_for(_queue_for), eof(false), failed(false) {
for (auto& nb : _neighbors) {
neighbors.emplace_back(nb.top_block_id(), nb.outmsg_root, nb.is_disabled());
}
init();
}
void OutputQueueMerger::init() {
common_pfx.bits().store_int(queue_for.workchain, 32);
int l = queue_for.pfx_len();
td::bitstring::bits_store_long_top(common_pfx.bits() + 32, queue_for.shard, l);
common_pfx_len = 32 + l;
int i = 0;
for (block::McShardDescr& neighbor : neighbors) {
if (!neighbor.is_disabled()) {
LOG(DEBUG) << "adding " << (neighbor.outmsg_root.is_null() ? "" : "non-") << "empty output queue for neighbor #"
<< i << " (" << neighbor.blk_.to_str() << ")";
add_root(i++, neighbor.outmsg_root);
for (Neighbor& neighbor : neighbors) {
if (!neighbor.disabled_) {
LOG(DEBUG) << "adding " << (neighbor.outmsg_root_.is_null() ? "" : "non-") << "empty output queue for neighbor #"
<< i << " (" << neighbor.block_id_.to_str() << ")";
add_root(i++, neighbor.outmsg_root_);
} else {
LOG(DEBUG) << "skipping output queue for disabled neighbor #" << i;
i++;


@@ -51,12 +51,22 @@ struct OutputQueueMerger {
bool unpack_node(td::ConstBitPtr key_pfx, int key_pfx_len, Ref<vm::Cell> node);
bool split(MsgKeyValue& second);
};
struct Neighbor {
ton::BlockIdExt block_id_;
td::Ref<vm::Cell> outmsg_root_;
bool disabled_;
Neighbor() = default;
Neighbor(ton::BlockIdExt block_id, td::Ref<vm::Cell> outmsg_root, bool disabled = false)
: block_id_(block_id), outmsg_root_(std::move(outmsg_root)), disabled_(disabled) {
}
};
//
ton::ShardIdFull queue_for;
std::vector<std::unique_ptr<MsgKeyValue>> msg_list;
std::vector<block::McShardDescr> neighbors;
std::vector<Neighbor> neighbors;
public:
OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<Neighbor> _neighbors);
OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector<block::McShardDescr> _neighbors);
bool is_eof() const {
return eof;
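
A hedged sketch of the new Neighbor-based constructor in use, mirroring how Collator::out_msg_queue_cleanup builds a per-neighbor view of its own queue (names are illustrative):

// Wrap our own queue root as a single pseudo-neighbor; the block id is used
// only for logging.
block::OutputQueueMerger::Neighbor part{own_block_id, out_msg_queue_root};
block::OutputQueueMerger merger{neighbor_shard, {part}};
while (!merger.is_eof()) {
  auto kv = merger.extract_cur();  // next queue entry in merged key order
  if (!kv) {
    break;
  }
  // ... inspect kv->key / kv->msg ...
  merger.next();
}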


@@ -2536,6 +2536,31 @@ static td::uint32 get_public_libraries_count(const td::Ref<vm::Cell>& libraries)
return count;
}
/**
* Calculates the number of public library changes between the two given dictionaries.
*
* @param old_libraries The dictionary of account libraries before the transaction.
* @param new_libraries The dictionary of account libraries after the transaction.
*
* @returns The number of changed public libraries.
*/
static td::uint32 get_public_libraries_diff_count(const td::Ref<vm::Cell>& old_libraries,
const td::Ref<vm::Cell>& new_libraries) {
td::uint32 count = 0;
vm::Dictionary dict1{old_libraries, 256};
vm::Dictionary dict2{new_libraries, 256};
dict1.scan_diff(dict2, [&](td::ConstBitPtr key, int n, Ref<vm::CellSlice> val1, Ref<vm::CellSlice> val2) -> bool {
CHECK(n == 256);
bool is_public1 = val1.not_null() && block::is_public_library(key, val1);
bool is_public2 = val2.not_null() && block::is_public_library(key, val2);
if (is_public1 != is_public2) {
++count;
}
return true;
});
return count;
}
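A quick worked example of the diff count (hypothetical keys): if the old dictionary holds public libraries {A, B} and the new one holds {B, C}, scan_diff visits A (public -> absent) and C (absent -> public), so the function returns 2; B is unchanged and not counted.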
/**
* Checks that the new account state fits in the limits.
* This function is not called for special accounts.
@@ -2979,14 +3004,14 @@ bool Transaction::serialize() {
vm::load_cell_slice(root).print_rec(std::cerr);
}
if (!block::gen::t_Transaction.validate_ref(root)) {
if (!block::gen::t_Transaction.validate_ref(4096, root)) {
LOG(ERROR) << "newly-generated transaction failed to pass automated validation:";
vm::load_cell_slice(root).print_rec(std::cerr);
block::gen::t_Transaction.print_ref(std::cerr, root);
root.clear();
return false;
}
if (!block::tlb::t_Transaction.validate_ref(root)) {
if (!block::tlb::t_Transaction.validate_ref(4096, root)) {
LOG(ERROR) << "newly-generated transaction failed to pass hand-written validation:";
vm::load_cell_slice(root).print_rec(std::cerr);
block::gen::t_Transaction.print_ref(std::cerr, root);
@@ -3187,8 +3212,12 @@ bool Transaction::update_limits(block::BlockLimitStatus& blimst, bool with_size)
blimst.add_account(is_first))) {
return false;
}
if (account.is_masterchain() && (was_frozen || was_deleted)) {
blimst.extra_library_diff += get_public_libraries_count(account.orig_library);
if (account.is_masterchain()) {
if (was_frozen || was_deleted) {
blimst.public_library_diff += get_public_libraries_count(account.orig_library);
} else {
blimst.public_library_diff += get_public_libraries_diff_count(account.orig_library, new_library);
}
}
}
return true;


@@ -3397,6 +3397,19 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getShardO
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "no such block")));
return;
}
if (!dest) {
td::actor::send_closure(
manager, &ton::validator::ValidatorManagerInterface::get_out_msg_queue_size, handle->id(),
[promise = std::move(promise)](td::Result<td::uint32> R) mutable {
if (R.is_error()) {
promise.set_value(create_control_query_error(R.move_as_error_prefix("failed to get queue size: ")));
} else {
promise.set_value(ton::create_serialize_tl_object<ton::ton_api::engine_validator_shardOutQueueSize>(
R.move_as_ok()));
}
});
return;
}
td::actor::send_closure(
manager, &ton::validator::ValidatorManagerInterface::get_shard_state_from_db, handle,
[=, promise = std::move(promise)](td::Result<td::Ref<ton::validator::ShardState>> R) mutable {


@@ -52,6 +52,7 @@ set(VALIDATOR_HEADERS
invariants.hpp
import-db-slice.hpp
queue-size-counter.hpp
manager-disk.h
manager-disk.hpp
@@ -77,6 +78,7 @@ set(VALIDATOR_SOURCE
validator-full-id.cpp
validator-group.cpp
validator-options.cpp
queue-size-counter.cpp
downloaders/wait-block-data.cpp
downloaders/wait-block-state.cpp


@@ -60,7 +60,6 @@ class Collator final : public td::actor::Actor {
bool preinit_complete{false};
bool is_key_block_{false};
bool block_full_{false};
bool outq_cleanup_partial_{false};
bool inbound_queues_empty_{false};
bool libraries_changed_{false};
bool prev_key_block_exists_{false};
@@ -159,7 +158,6 @@ class Collator final : public td::actor::Actor {
bool report_version_{false};
bool skip_topmsgdescr_{false};
bool skip_extmsg_{false};
bool queue_too_big_{false};
bool short_dequeue_records_{false};
td::uint64 overload_history_{0}, underload_history_{0};
td::uint64 block_size_estimate_{};
@@ -189,6 +187,7 @@ class Collator final : public td::actor::Actor {
std::priority_queue<NewOutMsg, std::vector<NewOutMsg>, std::greater<NewOutMsg>> new_msgs;
std::pair<ton::LogicalTime, ton::Bits256> last_proc_int_msg_, first_unproc_int_msg_;
std::unique_ptr<vm::AugmentedDictionary> in_msg_dict, out_msg_dict, out_msg_queue_, sibling_out_msg_queue_;
td::uint32 out_msg_queue_size_ = 0;
std::unique_ptr<vm::Dictionary> ihr_pending;
std::shared_ptr<block::MsgProcessedUptoCollection> processed_upto_, sibling_processed_upto_;
std::unique_ptr<vm::Dictionary> block_create_stats_;
@@ -227,6 +226,7 @@ class Collator final : public td::actor::Actor {
bool fix_one_processed_upto(block::MsgProcessedUpto& proc, const ton::ShardIdFull& owner);
bool fix_processed_upto(block::MsgProcessedUptoCollection& upto);
void got_neighbor_out_queue(int i, td::Result<Ref<MessageQueue>> res);
void got_out_queue_size(size_t i, td::Result<td::uint32> res);
bool adjust_shard_config();
bool store_shard_fees(ShardIdFull shard, const block::CurrencyCollection& fees,
const block::CurrencyCollection& created);
@@ -260,6 +260,7 @@ class Collator final : public td::actor::Actor {
bool check_prev_block_exact(const BlockIdExt& listed, const BlockIdExt& prev);
bool check_this_shard_mc_info();
bool request_neighbor_msg_queues();
bool request_out_msg_queue_size();
void update_max_lt(ton::LogicalTime lt);
bool is_masterchain() const {
return shard_.is_masterchain();


@@ -44,6 +44,12 @@ namespace validator {
using td::Ref;
using namespace std::literals::string_literals;
// Don't increase MERGE_MAX_QUEUE_SIZE too much: merging requires cleaning the whole queue in out_msg_queue_cleanup
static const td::uint32 FORCE_SPLIT_QUEUE_SIZE = 4096;
static const td::uint32 SPLIT_MAX_QUEUE_SIZE = 100000;
static const td::uint32 MERGE_MAX_QUEUE_SIZE = 2047;
static const td::uint32 SKIP_EXTERNALS_QUEUE_SIZE = 8000;
#define DBG(__n) dbg(__n)&&
#define DSTART int __dcnt = 0;
#define DEB DBG(++__dcnt)
@@ -790,6 +796,26 @@ bool Collator::request_neighbor_msg_queues() {
return true;
}
/**
* Requests the size of the outbound message queue from the previous state(s).
*
* @returns True if the request was successful, false otherwise.
*/
bool Collator::request_out_msg_queue_size() {
if (after_split_) {
// If block is after split, the size is calculated during split (see Collator::split_last_state)
return true;
}
for (size_t i = 0; i < prev_blocks.size(); ++i) {
++pending;
send_closure_later(manager, &ValidatorManager::get_out_msg_queue_size, prev_blocks[i],
[self = get_self(), i](td::Result<td::uint32> res) {
td::actor::send_closure(std::move(self), &Collator::got_out_queue_size, i, std::move(res));
});
}
return true;
}
/**
* Handles the result of obtaining the outbound queue for a neighbor.
*
@@ -854,6 +880,27 @@ void Collator::got_neighbor_out_queue(int i, td::Result<Ref<MessageQueue>> res)
check_pending();
}
/**
* Handles the result of obtaining the size of the outbound message queue.
*
* If the block is after merge then the two sizes are added.
*
* @param i The index of the previous block (0 or 1).
* @param res The result object containing the size of the queue.
*/
void Collator::got_out_queue_size(size_t i, td::Result<td::uint32> res) {
--pending;
if (res.is_error()) {
fatal_error(
res.move_as_error_prefix(PSTRING() << "failed to get message queue size from prev block #" << i << ": "));
return;
}
td::uint32 size = res.move_as_ok();
LOG(DEBUG) << "got outbound queue size from prev block #" << i << ": " << size;
out_msg_queue_size_ += size;
check_pending();
}
/**
* Unpacks and merges the states of two previous blocks.
* Used if the block is after_merge.
@ -972,7 +1019,7 @@ bool Collator::split_last_state(block::ShardState& ss) {
return fatal_error(res2.move_as_error());
}
sibling_processed_upto_ = res2.move_as_ok();
auto res3 = ss.split(shard_);
auto res3 = ss.split(shard_, &out_msg_queue_size_);
if (res3.is_error()) {
return fatal_error(std::move(res3));
}
@@ -1449,6 +1496,9 @@ bool Collator::do_preinit() {
if (!request_neighbor_msg_queues()) {
return false;
}
if (!request_out_msg_queue_size()) {
return false;
}
return true;
}
@@ -1824,7 +1874,6 @@ bool Collator::init_utime() {
// Extend collator timeout if previous block is too old
td::Timestamp new_timeout = td::Timestamp::in(std::min(30.0, (td::Clocks::system() - (double)prev_now_) / 2));
if (timeout < new_timeout) {
double add = new_timeout.at() - timeout.at();
timeout = new_timeout;
alarm_timestamp() = timeout;
}
@@ -2174,95 +2223,144 @@ bool Collator::out_msg_queue_cleanup() {
block::gen::t_OutMsgQueue.print(std::cerr, *rt);
rt->print_rec(std::cerr);
}
for (const auto& nb : neighbors_) {
if (!nb.is_disabled() && (!nb.processed_upto || !nb.processed_upto->can_check_processed())) {
return fatal_error(-667, PSTRING() << "internal error: no info for checking processed messages from neighbor "
<< nb.blk_.to_str());
}
}
auto queue_root = out_msg_queue_->get_root_cell();
if (queue_root.is_null()) {
LOG(DEBUG) << "out_msg_queue is empty";
return true;
}
auto old_out_msg_queue = std::make_unique<vm::AugmentedDictionary>(queue_root, 352, block::tlb::aug_OutMsgQueue);
if (after_merge_) {
// We need to clean the whole queue after merge
// Queue is not too big, see const MERGE_MAX_QUEUE_SIZE
for (const auto& nb : neighbors_) {
if (!nb.is_disabled() && (!nb.processed_upto || !nb.processed_upto->can_check_processed())) {
return fatal_error(-667, PSTRING() << "internal error: no info for checking processed messages from neighbor "
<< nb.blk_.to_str());
}
}
td::uint32 deleted = 0;
auto res = out_msg_queue_->filter([&](vm::CellSlice& cs, td::ConstBitPtr key, int n) -> int {
assert(n == 352);
block::EnqueuedMsgDescr enq_msg_descr;
unsigned long long created_lt;
if (!(cs.fetch_ulong_bool(64, created_lt) // augmentation
&& enq_msg_descr.unpack(cs) // unpack EnqueuedMsg
&& enq_msg_descr.check_key(key) // check key
&& enq_msg_descr.lt_ == created_lt)) {
LOG(ERROR) << "cannot unpack EnqueuedMsg with key " << key.to_hex(n);
return -1;
}
LOG(DEBUG) << "scanning outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << ","
<< enq_msg_descr.hash_.to_hex() << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_;
bool delivered = false;
ton::LogicalTime deliver_lt = 0;
for (const auto& neighbor : neighbors_) {
// could look up neighbor with shard containing enq_msg_descr.next_prefix more efficiently
// (instead of checking all neighbors)
if (!neighbor.is_disabled() && neighbor.processed_upto->already_processed(enq_msg_descr)) {
delivered = true;
deliver_lt = neighbor.end_lt();
break;
}
}
if (delivered) {
++deleted;
CHECK(out_msg_queue_size_ > 0);
--out_msg_queue_size_;
LOG(DEBUG) << "outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," << enq_msg_descr.hash_.to_hex()
<< ") enqueued_lt=" << enq_msg_descr.enqueued_lt_ << " has been already delivered, dequeueing";
if (!dequeue_message(std::move(enq_msg_descr.msg_env_), deliver_lt)) {
fatal_error(PSTRING() << "cannot dequeue outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << ","
<< enq_msg_descr.hash_.to_hex() << ") by inserting a msg_export_deq record");
return -1;
}
register_out_msg_queue_op();
if (!block_limit_status_->fits(block::ParamLimits::cl_normal)) {
block_full_ = true;
}
}
return !delivered;
});
LOG(INFO) << "deleted " << deleted << " messages from out_msg_queue after merge, remaining queue size is "
<< out_msg_queue_size_;
if (res < 0) {
return fatal_error("error scanning/updating OutMsgQueue");
}
} else {
std::vector<std::pair<block::OutputQueueMerger, const block::McShardDescr*>> queue_parts;
int deleted = 0;
int total = 0;
bool fail = false;
old_out_msg_queue->check_for_each([&](Ref<vm::CellSlice> value, td::ConstBitPtr key, int n) -> bool {
++total;
assert(n == 352);
vm::CellSlice& cs = value.write();
// LOG(DEBUG) << "key is " << key.to_hex(n);
if (queue_cleanup_timeout_.is_in_past(td::Timestamp::now())) {
LOG(WARNING) << "cleaning up outbound queue takes too long, ending";
outq_cleanup_partial_ = true;
return false; // retain all remaining outbound queue entries including this one without processing
block::OutputQueueMerger::Neighbor this_queue{BlockIdExt{new_id} /* block id is only used for logs */,
out_msg_queue_->get_root_cell()};
for (const auto& nb : neighbors_) {
if (nb.is_disabled()) {
continue;
}
if (!nb.processed_upto || !nb.processed_upto->can_check_processed()) {
return fatal_error(-667, PSTRING() << "internal error: no info for checking processed messages from neighbor "
<< nb.blk_.to_str());
}
queue_parts.emplace_back(block::OutputQueueMerger{nb.shard(), {this_queue}}, &nb);
}
if (block_full_) {
LOG(WARNING) << "BLOCK FULL while cleaning up outbound queue, cleanup completed only partially";
outq_cleanup_partial_ = true;
return false; // retain all remaining outbound queue entries including this one without processing
}
block::EnqueuedMsgDescr enq_msg_descr;
unsigned long long created_lt;
if (!(cs.fetch_ulong_bool(64, created_lt) // augmentation
&& enq_msg_descr.unpack(cs) // unpack EnqueuedMsg
&& enq_msg_descr.check_key(key) // check key
&& enq_msg_descr.lt_ == created_lt)) {
LOG(ERROR) << "cannot unpack EnqueuedMsg with key " << key.to_hex(n);
fail = true;
return false;
}
LOG(DEBUG) << "scanning outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << ","
<< enq_msg_descr.hash_.to_hex() << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_;
bool delivered = false;
ton::LogicalTime deliver_lt = 0;
for (const auto& neighbor : neighbors_) {
// could look up neighbor with shard containing enq_msg_descr.next_prefix more efficiently
// (instead of checking all neighbors)
if (!neighbor.is_disabled() && neighbor.processed_upto->already_processed(enq_msg_descr)) {
delivered = true;
deliver_lt = neighbor.end_lt();
size_t i = 0;
td::uint32 deleted = 0;
while (!queue_parts.empty()) {
if (block_full_) {
LOG(WARNING) << "BLOCK FULL while cleaning up outbound queue, cleanup completed only partially";
break;
}
}
if (delivered) {
LOG(DEBUG) << "outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," << enq_msg_descr.hash_.to_hex()
<< ") enqueued_lt=" << enq_msg_descr.enqueued_lt_ << " has been already delivered, dequeueing";
++deleted;
out_msg_queue_->lookup_delete_with_extra(key, n);
if (!dequeue_message(std::move(enq_msg_descr.msg_env_), deliver_lt)) {
fatal_error(PSTRING() << "cannot dequeue outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << ","
<< enq_msg_descr.hash_.to_hex() << ") by inserting a msg_export_deq record");
fail = true;
return false;
if (queue_cleanup_timeout_.is_in_past(td::Timestamp::now())) {
LOG(WARNING) << "cleaning up outbound queue takes too long, ending";
break;
}
register_out_msg_queue_op();
if (!block_limit_status_->fits(block::ParamLimits::cl_normal)) {
block_full_ = true;
if (i == queue_parts.size()) {
i = 0;
}
auto& queue = queue_parts.at(i).first;
auto nb = queue_parts.at(i).second;
auto kv = queue.extract_cur();
if (kv) {
block::EnqueuedMsgDescr enq_msg_descr;
if (!(enq_msg_descr.unpack(kv->msg.write()) // unpack EnqueuedMsg
&& enq_msg_descr.check_key(kv->key.cbits()) // check key
)) {
return fatal_error(PSTRING() << "error scanning/updating OutMsgQueue: cannot unpack EnqueuedMsg with key "
<< kv->key.to_hex());
}
if (nb->processed_upto->already_processed(enq_msg_descr)) {
LOG(DEBUG) << "scanning outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << ","
<< enq_msg_descr.hash_.to_hex() << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_
<< ": message has been already delivered, dequeueing";
++deleted;
CHECK(out_msg_queue_size_ > 0);
--out_msg_queue_size_;
out_msg_queue_->lookup_delete_with_extra(kv->key.cbits(), kv->key_len);
if (!dequeue_message(std::move(enq_msg_descr.msg_env_), nb->end_lt())) {
return fatal_error(PSTRING() << "cannot dequeue outbound message with (lt,hash)=(" << enq_msg_descr.lt_
<< "," << enq_msg_descr.hash_.to_hex()
<< ") by inserting a msg_export_deq record");
}
register_out_msg_queue_op();
if (!block_limit_status_->fits(block::ParamLimits::cl_normal)) {
block_full_ = true;
}
queue.next();
++i;
continue;
} else {
LOG(DEBUG) << "scanning outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << ","
<< enq_msg_descr.hash_.to_hex() << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_
<< ": message has not been delivered";
}
}
LOG(DEBUG) << "no more unprocessed messages to shard " << nb->shard().to_str();
std::swap(queue_parts[i], queue_parts.back());
queue_parts.pop_back();
}
return true;
}, false, true /* random order */);
LOG(INFO) << "deleted " << deleted << " messages from out_msg_queue, processed " << total << " messages in total";
if (fail) {
return fatal_error("error scanning/updating OutMsgQueue");
LOG(INFO) << "deleted " << deleted << " messages from out_msg_queue, remaining queue size is "
<< out_msg_queue_size_;
}
if (outq_cleanup_partial_ || total > 8000) {
LOG(INFO) << "out_msg_queue too big, skipping importing external messages";
skip_extmsg_ = true;
queue_too_big_ = true;
}
auto rt = out_msg_queue_->get_root();
if (verbosity >= 2) {
auto rt = out_msg_queue_->get_root();
std::cerr << "new out_msg_queue is ";
block::gen::t_OutMsgQueue.print(std::cerr, *rt);
rt->print_rec(std::cerr);
}
// CHECK(block::gen::t_OutMsgQueue.validate_upto(100000, *rt)); // DEBUG, comment later if SLOW
return register_out_msg_queue_op(true);
}
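
In outline (my reading of the diff, not text from the commit): after a merge the collator still filters the whole queue, which MERGE_MAX_QUEUE_SIZE keeps small; otherwise it builds one OutputQueueMerger per enabled neighbor over its own queue root and round-robins across these parts, dequeuing every message the corresponding neighbor has already processed, until the parts drain, the block fills up, or queue_cleanup_timeout_ expires.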
@@ -3047,6 +3145,7 @@ bool Collator::enqueue_transit_message(Ref<vm::Cell> msg, Ref<vm::Cell> old_msg_
try {
LOG(DEBUG) << "inserting into outbound queue message with (lt,key)=(" << start_lt << "," << key.to_hex() << ")";
ok = out_msg_queue_->set_builder(key.bits(), 352, cb, vm::Dictionary::SetMode::Add);
++out_msg_queue_size_;
} catch (vm::VmError) {
ok = false;
}
@@ -3069,6 +3168,8 @@ bool Collator::delete_out_msg_queue_msg(td::ConstBitPtr key) {
try {
LOG(DEBUG) << "deleting from outbound queue message with key=" << key.to_hex(352);
queue_rec = out_msg_queue_->lookup_delete(key, 352);
CHECK(out_msg_queue_size_ > 0);
--out_msg_queue_size_;
} catch (vm::VmError err) {
LOG(ERROR) << "error deleting from out_msg_queue dictionary: " << err.get_msg();
}
@@ -3309,8 +3410,9 @@ bool Collator::process_inbound_external_messages() {
LOG(INFO) << "skipping processing of inbound external messages";
return true;
}
if (out_msg_queue_->get_root_cell().not_null() && out_msg_queue_->get_root_cell()->get_depth() > 12) {
LOG(INFO) << "skipping processing of inbound external messages: out msg queue is too big";
if (out_msg_queue_size_ > SKIP_EXTERNALS_QUEUE_SIZE) {
LOG(INFO) << "skipping processing of inbound external messages because out_msg_queue is too big ("
<< out_msg_queue_size_ << " > " << SKIP_EXTERNALS_QUEUE_SIZE << ")";
return true;
}
bool full = !block_limit_status_->fits(block::ParamLimits::cl_soft);
@@ -3550,6 +3652,7 @@ bool Collator::enqueue_message(block::NewOutMsg msg, td::RefInt256 fwd_fees_rema
LOG(DEBUG) << "inserting into outbound queue a new message with (lt,key)=(" << start_lt << "," << key.to_hex()
<< ")";
ok = out_msg_queue_->set_builder(key.bits(), 352, cb, vm::Dictionary::SetMode::Add);
++out_msg_queue_size_;
} catch (vm::VmError) {
ok = false;
}
@@ -4158,6 +4261,7 @@ static int history_weight(td::uint64 history) {
* @returns True if the check is successful.
*/
bool Collator::check_block_overload() {
LOG(INFO) << "final out_msg_queue size is " << out_msg_queue_size_;
overload_history_ <<= 1;
underload_history_ <<= 1;
block_size_estimate_ = block_limit_status_->estimate_block_size();
@@ -4166,18 +4270,32 @@ bool Collator::check_block_overload() {
<< " size_estimate=" << block_size_estimate_;
auto cl = block_limit_status_->classify();
if (cl <= block::ParamLimits::cl_underload) {
if (queue_too_big_) {
LOG(INFO) << "block is underloaded, but don't set underload history because out msg queue is big";
if (out_msg_queue_size_ > MERGE_MAX_QUEUE_SIZE) {
LOG(INFO)
<< "block is underloaded, but don't set underload history because out_msg_queue size is too big to merge ("
<< out_msg_queue_size_ << " > " << MERGE_MAX_QUEUE_SIZE << ")";
} else {
underload_history_ |= 1;
LOG(INFO) << "block is underloaded";
}
} else if (cl >= block::ParamLimits::cl_soft) {
overload_history_ |= 1;
LOG(INFO) << "block is overloaded (category " << cl << ")";
if (out_msg_queue_size_ > SPLIT_MAX_QUEUE_SIZE) {
LOG(INFO) << "block is overloaded (category " << cl
<< "), but don't set overload history because out_msg_queue size is too big to split ("
<< out_msg_queue_size_ << " > " << SPLIT_MAX_QUEUE_SIZE << ")";
} else {
overload_history_ |= 1;
LOG(INFO) << "block is overloaded (category " << cl << ")";
}
} else {
LOG(INFO) << "block is loaded normally";
}
if (!(overload_history_ & 1) && out_msg_queue_size_ >= FORCE_SPLIT_QUEUE_SIZE &&
out_msg_queue_size_ <= SPLIT_MAX_QUEUE_SIZE) {
overload_history_ |= 1;
LOG(INFO) << "setting overload history because out_msg_queue reached force split limit (" << out_msg_queue_size_
<< " >= " << FORCE_SPLIT_QUEUE_SIZE << ")";
}
if (collator_settings & 1) {
LOG(INFO) << "want_split manually set";
want_split_ = true;


@@ -23,6 +23,7 @@
#include "validator-group.hpp"
#include "manager-init.h"
#include "manager-disk.h"
#include "queue-size-counter.hpp"
#include <map>
#include <set>
@@ -376,6 +377,13 @@ class ValidatorManagerImpl : public ValidatorManager {
void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) override {
UNREACHABLE();
}
void get_out_msg_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) override {
if (queue_size_counter_.empty()) {
queue_size_counter_ =
td::actor::create_actor<QueueSizeCounter>("queuesizecounter", td::Ref<MasterchainState>{}, actor_id(this));
}
td::actor::send_closure(queue_size_counter_, &QueueSizeCounter::get_queue_size, block_id, std::move(promise));
}
private:
PublicKeyHash local_id_;
@@ -393,6 +401,7 @@ class ValidatorManagerImpl : public ValidatorManager {
int pending_new_shard_block_descr_{0};
std::vector<td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>>> waiting_new_shard_block_descr_;
td::actor::ActorOwn<QueueSizeCounter> queue_size_counter_;
void update_shards();
void update_shard_blocks();


@@ -23,6 +23,7 @@
#include "validator-group.hpp"
#include "manager-init.h"
#include "manager-hardfork.h"
#include "queue-size-counter.hpp"
#include <map>
#include <set>
@@ -437,6 +438,13 @@ class ValidatorManagerImpl : public ValidatorManager {
void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) override {
UNREACHABLE();
}
void get_out_msg_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) override {
if (queue_size_counter_.empty()) {
queue_size_counter_ =
td::actor::create_actor<QueueSizeCounter>("queuesizecounter", td::Ref<MasterchainState>{}, actor_id(this));
}
td::actor::send_closure(queue_size_counter_, &QueueSizeCounter::get_queue_size, block_id, std::move(promise));
}
private:
td::Ref<ValidatorManagerOptions> opts_;
@@ -445,6 +453,7 @@ class ValidatorManagerImpl : public ValidatorManager {
std::string db_root_;
ShardIdFull shard_to_generate_;
BlockIdExt block_to_generate_;
td::actor::ActorOwn<QueueSizeCounter> queue_size_counter_;
};
} // namespace validator


@@ -2277,7 +2277,15 @@ void ValidatorManagerImpl::allow_block_info_gc(BlockIdExt block_id, td::Promise<
void ValidatorManagerImpl::got_next_gc_masterchain_handle(BlockHandle handle) {
CHECK(gc_advancing_);
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Ref<ShardState>> R) {
R.ensure();
if (R.is_error()) {
if (R.error().code() == ErrorCode::timeout) {
LOG(ERROR) << "Failed to get gc masterchain state, retrying: " << R.move_as_error();
td::actor::send_closure(SelfId, &ValidatorManagerImpl::got_next_gc_masterchain_handle, std::move(handle));
} else {
LOG(FATAL) << "Failed to get gc masterchain state: " << R.move_as_error();
}
return;
}
td::actor::send_closure(SelfId, &ValidatorManagerImpl::got_next_gc_masterchain_state, std::move(handle),
td::Ref<MasterchainState>{R.move_as_ok()});
});


@@ -28,6 +28,7 @@
#include "state-serializer.hpp"
#include "rldp/rldp.h"
#include "token-manager.h"
#include "queue-size-counter.hpp"
#include <map>
#include <set>
@@ -548,6 +549,18 @@ class ValidatorManagerImpl : public ValidatorManager {
void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) override;
void get_out_msg_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) override {
if (queue_size_counter_.empty()) {
if (last_masterchain_state_.is_null()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "not ready"));
return;
}
queue_size_counter_ = td::actor::create_actor<QueueSizeCounter>("queuesizecounter",
last_masterchain_state_, actor_id(this));
}
td::actor::send_closure(queue_size_counter_, &QueueSizeCounter::get_queue_size, block_id, std::move(promise));
}
private:
td::Timestamp resend_shard_blocks_at_;
td::Timestamp check_waiters_at_;
@@ -612,6 +625,7 @@ class ValidatorManagerImpl : public ValidatorManager {
private:
std::map<BlockSeqno, WaitList<td::actor::Actor, td::Unit>> shard_client_waiters_;
td::actor::ActorOwn<QueueSizeCounter> queue_size_counter_;
};
} // namespace validator


@@ -0,0 +1,301 @@
/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
*/
#include "queue-size-counter.hpp"
#include "block/block-auto.h"
#include "block/block-parse.h"
#include "common/delay.h"
#include "td/actor/MultiPromise.h"
#include "td/utils/Random.h"
namespace ton::validator {
static td::Result<td::uint32> calc_queue_size(const td::Ref<ShardState> &state) {
td::uint32 size = 0;
TRY_RESULT(outq_descr, state->message_queue());
block::gen::OutMsgQueueInfo::Record qinfo;
if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
return td::Status::Error("invalid message queue");
}
vm::AugmentedDictionary queue{qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
bool ok = queue.check_for_each([&](td::Ref<vm::CellSlice>, td::ConstBitPtr, int) -> bool {
++size;
return true;
});
if (!ok) {
return td::Status::Error("invalid message queue dict");
}
return size;
}
static td::Result<td::uint32> recalc_queue_size(const td::Ref<ShardState> &state, const td::Ref<ShardState> &prev_state,
td::uint32 prev_size) {
TRY_RESULT(outq_descr, state->message_queue());
block::gen::OutMsgQueueInfo::Record qinfo;
if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
return td::Status::Error("invalid message queue");
}
vm::AugmentedDictionary queue{qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
TRY_RESULT(prev_outq_descr, prev_state->message_queue());
block::gen::OutMsgQueueInfo::Record prev_qinfo;
if (!tlb::unpack_cell(prev_outq_descr->root_cell(), prev_qinfo)) {
return td::Status::Error("invalid message queue");
}
vm::AugmentedDictionary prev_queue{prev_qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
td::uint32 add = 0, rem = 0;
bool ok = prev_queue.scan_diff(
queue, [&](td::ConstBitPtr, int, td::Ref<vm::CellSlice> prev_val, td::Ref<vm::CellSlice> new_val) -> bool {
if (prev_val.not_null()) {
++rem;
}
if (new_val.not_null()) {
++add;
}
return true;
});
if (!ok) {
return td::Status::Error("invalid message queue dict");
}
if (prev_size + add < rem) {
return td::Status::Error("negative value");
}
return prev_size + add - rem;
}
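A worked example of the incremental recalculation (numbers assumed): if the previous queue had prev_size = 10 entries and scan_diff reports add = 3 inserted keys and rem = 5 deleted keys, the new size is 10 + 3 - 5 = 8; the "negative value" check guards against an inconsistent pair of states.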
void QueueSizeCounter::start_up() {
if (init_masterchain_state_.is_null()) {
// Used in manager-hardfork or manager-disk
simple_mode_ = true;
return;
}
current_seqno_ = init_masterchain_state_->get_seqno();
process_top_shard_blocks_cont(init_masterchain_state_, true);
init_masterchain_state_ = {};
alarm();
}
void QueueSizeCounter::get_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) {
get_queue_size_ex(block_id, simple_mode_ || is_block_too_old(block_id), std::move(promise));
}
void QueueSizeCounter::get_queue_size_ex(ton::BlockIdExt block_id, bool calc_whole, td::Promise<td::uint32> promise) {
Entry &entry = results_[block_id];
if (entry.done_) {
promise.set_result(entry.queue_size_);
return;
}
entry.promises_.push_back(std::move(promise));
if (entry.started_) {
return;
}
entry.started_ = true;
entry.calc_whole_ = calc_whole;
td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, block_id, true,
[SelfId = actor_id(this), block_id, manager = manager_](td::Result<BlockHandle> R) mutable {
if (R.is_error()) {
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, block_id, R.move_as_error());
return;
}
BlockHandle handle = R.move_as_ok();
td::actor::send_closure(
manager, &ValidatorManager::wait_block_state, handle, 0, td::Timestamp::in(10.0),
[SelfId, handle](td::Result<td::Ref<ShardState>> R) mutable {
if (R.is_error()) {
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, handle->id(),
R.move_as_error());
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_cont,
std::move(handle), R.move_as_ok());
});
});
}
void QueueSizeCounter::get_queue_size_cont(BlockHandle handle, td::Ref<ShardState> state) {
Entry &entry = results_[handle->id()];
CHECK(entry.started_);
bool calc_whole = entry.calc_whole_ || handle->id().seqno() == 0;
if (!calc_whole) {
CHECK(handle->inited_prev());
auto prev_blocks = handle->prev();
bool after_split = prev_blocks.size() == 1 && handle->id().shard_full() != prev_blocks[0].shard_full();
bool after_merge = prev_blocks.size() == 2;
calc_whole = after_split || after_merge;
}
if (calc_whole) {
auto r_size = calc_queue_size(state);
if (r_size.is_error()) {
on_error(handle->id(), r_size.move_as_error());
return;
}
entry.done_ = true;
entry.queue_size_ = r_size.move_as_ok();
for (auto &promise : entry.promises_) {
promise.set_result(entry.queue_size_);
}
entry.promises_.clear();
return;
}
auto prev_block_id = handle->one_prev(true);
get_queue_size(prev_block_id, [=, SelfId = actor_id(this), manager = manager_](td::Result<td::uint32> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, state->get_block_id(), R.move_as_error());
return;
}
td::uint32 prev_size = R.move_as_ok();
td::actor::send_closure(
manager, &ValidatorManager::wait_block_state_short, prev_block_id, 0, td::Timestamp::in(10.0),
[=](td::Result<td::Ref<ShardState>> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, state->get_block_id(), R.move_as_error());
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_cont2, state, R.move_as_ok(), prev_size);
});
});
}
void QueueSizeCounter::get_queue_size_cont2(td::Ref<ShardState> state, td::Ref<ShardState> prev_state,
td::uint32 prev_size) {
BlockIdExt block_id = state->get_block_id();
Entry &entry = results_[block_id];
CHECK(entry.started_);
auto r_size = recalc_queue_size(state, prev_state, prev_size);
if (r_size.is_error()) {
on_error(block_id, r_size.move_as_error());
return;
}
entry.done_ = true;
entry.queue_size_ = r_size.move_as_ok();
for (auto &promise : entry.promises_) {
promise.set_result(entry.queue_size_);
}
entry.promises_.clear();
}
void QueueSizeCounter::on_error(ton::BlockIdExt block_id, td::Status error) {
auto it = results_.find(block_id);
if (it == results_.end()) {
return;
}
Entry &entry = it->second;
CHECK(!entry.done_);
for (auto &promise : entry.promises_) {
promise.set_error(error.clone());
}
results_.erase(it);
}
void QueueSizeCounter::process_top_shard_blocks() {
LOG(DEBUG) << "QueueSizeCounter::process_top_shard_blocks seqno=" << current_seqno_;
td::actor::send_closure(
manager_, &ValidatorManager::get_block_by_seqno_from_db, AccountIdPrefixFull{masterchainId, 0}, current_seqno_,
[SelfId = actor_id(this), manager = manager_](td::Result<ConstBlockHandle> R) {
if (R.is_error()) {
LOG(WARNING) << "Failed to get masterchain block id: " << R.move_as_error();
delay_action([=]() { td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); },
td::Timestamp::in(5.0));
return;
}
td::actor::send_closure(
manager, &ValidatorManager::wait_block_state_short, R.ok()->id(), 0, td::Timestamp::in(10.0),
[=](td::Result<td::Ref<ShardState>> R) {
if (R.is_error()) {
LOG(WARNING) << "Failed to get masterchain state: " << R.move_as_error();
delay_action([=]() { td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); },
td::Timestamp::in(5.0));
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks_cont,
td::Ref<MasterchainState>(R.move_as_ok()), false);
});
});
}
void QueueSizeCounter::process_top_shard_blocks_cont(td::Ref<MasterchainState> state, bool init) {
LOG(DEBUG) << "QueueSizeCounter::process_top_shard_blocks_cont seqno=" << current_seqno_ << " init=" << init;
td::MultiPromise mp;
auto ig = mp.init_guard();
last_top_blocks_.clear();
last_top_blocks_.push_back(state->get_block_id());
for (auto &shard : state->get_shards()) {
last_top_blocks_.push_back(shard->top_block_id());
}
for (const BlockIdExt &block_id : last_top_blocks_) {
get_queue_size_ex_retry(block_id, init, ig.get_promise());
}
ig.add_promise([SelfId = actor_id(this)](td::Result<td::Unit> R) {
if (R.is_error()) {
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks_finish);
});
if (init) {
init_top_blocks_ = last_top_blocks_;
}
}
void QueueSizeCounter::get_queue_size_ex_retry(BlockIdExt block_id, bool calc_whole, td::Promise<td::Unit> promise) {
get_queue_size_ex(block_id, calc_whole,
[=, promise = std::move(promise), SelfId = actor_id(this)](td::Result<td::uint32> R) mutable {
if (R.is_error()) {
LOG(WARNING) << "Failed to calculate queue size for block " << block_id.to_str() << ": "
<< R.move_as_error();
delay_action(
[=, promise = std::move(promise)]() mutable {
td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_ex_retry, block_id,
calc_whole, std::move(promise));
},
td::Timestamp::in(5.0));
return;
}
promise.set_result(td::Unit());
});
}
void QueueSizeCounter::process_top_shard_blocks_finish() {
++current_seqno_;
wait_shard_client();
}
void QueueSizeCounter::wait_shard_client() {
LOG(DEBUG) << "QueueSizeCounter::wait_shard_client seqno=" << current_seqno_;
td::actor::send_closure(
manager_, &ValidatorManager::wait_shard_client_state, current_seqno_, td::Timestamp::in(60.0),
[SelfId = actor_id(this)](td::Result<td::Unit> R) {
if (R.is_error()) {
delay_action([=]() mutable { td::actor::send_closure(SelfId, &QueueSizeCounter::wait_shard_client); },
td::Timestamp::in(5.0));
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks);
});
}
void QueueSizeCounter::alarm() {
for (auto it = results_.begin(); it != results_.end();) {
if (it->second.done_ && is_block_too_old(it->first)) {
it = results_.erase(it);
} else {
++it;
}
}
alarm_timestamp() = td::Timestamp::in(td::Random::fast(20.0, 40.0));
}
} // namespace ton::validator
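
Taken together (my summary, not commit text): QueueSizeCounter caches one Entry per block id, computes a size either from scratch (calc_queue_size) or incrementally against the previous block's state (recalc_queue_size), keeps the cache warm by walking the masterchain and shard top blocks in the background, and in alarm() prunes entries that fall more than 100 seqnos behind the matching top block.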


@@ -0,0 +1,82 @@
/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "interfaces/validator-manager.h"
namespace ton::validator {
class QueueSizeCounter : public td::actor::Actor {
public:
QueueSizeCounter(td::Ref<MasterchainState> last_masterchain_state, td::actor::ActorId<ValidatorManager> manager)
: init_masterchain_state_(last_masterchain_state), manager_(std::move(manager)) {
}
void start_up() override;
void get_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise);
void alarm() override;
private:
td::Ref<MasterchainState> init_masterchain_state_;
td::actor::ActorId<ValidatorManager> manager_;
bool simple_mode_ = false;
BlockSeqno current_seqno_ = 0;
std::vector<BlockIdExt> init_top_blocks_;
std::vector<BlockIdExt> last_top_blocks_;
struct Entry {
bool started_ = false;
bool done_ = false;
bool calc_whole_ = false;
td::uint32 queue_size_ = 0;
std::vector<td::Promise<td::uint32>> promises_;
};
std::map<BlockIdExt, Entry> results_;
void get_queue_size_ex(BlockIdExt block_id, bool calc_whole, td::Promise<td::uint32> promise);
void get_queue_size_cont(BlockHandle handle, td::Ref<ShardState> state);
void get_queue_size_cont2(td::Ref<ShardState> state, td::Ref<ShardState> prev_state, td::uint32 prev_size);
void on_error(BlockIdExt block_id, td::Status error);
void process_top_shard_blocks();
void process_top_shard_blocks_cont(td::Ref<MasterchainState> state, bool init = false);
void get_queue_size_ex_retry(BlockIdExt block_id, bool calc_whole, td::Promise<td::Unit> promise);
void process_top_shard_blocks_finish();
void wait_shard_client();
bool is_block_too_old(const BlockIdExt& block_id) const {
for (const BlockIdExt& top_block : last_top_blocks_) {
if (shard_intersects(block_id.shard_full(), top_block.shard_full())) {
if (block_id.seqno() + 100 < top_block.seqno()) {
return true;
}
break;
}
}
for (const BlockIdExt& init_top_block : init_top_blocks_) {
if (shard_intersects(block_id.shard_full(), init_top_block.shard_full())) {
if (block_id.seqno() < init_top_block.seqno()) {
return true;
}
break;
}
}
return false;
}
};
} // namespace ton::validator


@@ -228,6 +228,8 @@ class ValidatorManagerInterface : public td::actor::Actor {
virtual void prepare_perf_timer_stats(td::Promise<std::vector<PerfTimerStats>> promise) = 0;
virtual void add_perf_timer_stat(std::string name, double duration) = 0;
virtual void get_out_msg_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) = 0;
};
} // namespace validator