From 5dd0c15d072d016b15b8952c9bbd119953a0accf Mon Sep 17 00:00:00 2001 From: SpyCheese Date: Tue, 4 Jul 2023 23:34:34 +0300 Subject: [PATCH] Limit imported msg queue size --- crypto/block/block.tlb | 1 + crypto/block/output-queue-merger.cpp | 42 +++-- crypto/block/output-queue-merger.h | 18 +- tl/generate/scheme/ton_api.tl | 2 +- tl/generate/scheme/ton_api.tlo | Bin 90648 -> 90684 bytes validator/impl/collator-impl.h | 2 + validator/impl/collator.cpp | 29 ++- validator/impl/out-msg-queue-proof.cpp | 210 +++++++++++++-------- validator/impl/out-msg-queue-proof.hpp | 10 +- validator/impl/validate-query.cpp | 21 ++- validator/impl/validate-query.hpp | 1 + validator/interfaces/out-msg-queue-proof.h | 13 +- 12 files changed, 227 insertions(+), 122 deletions(-) diff --git a/crypto/block/block.tlb b/crypto/block/block.tlb index cb786049..b316d1c6 100644 --- a/crypto/block/block.tlb +++ b/crypto/block/block.tlb @@ -829,6 +829,7 @@ top_block_descr#d5 proof_for:BlockIdExt signatures:(Maybe ^BlockSignatures) // COLLATED DATA // top_block_descr_set#4ac789f3 collection:(HashmapE 96 ^TopBlockDescr) = TopBlockDescrSet; +neighbor_msg_queue_limits#7e549333 neighbors:(HashmapE 96 int32) = NeighborMsgQueueLimits; // // VALIDATOR MISBEHAVIOR COMPLAINTS diff --git a/crypto/block/output-queue-merger.cpp b/crypto/block/output-queue-merger.cpp index 1084bb1a..ca7021a8 100644 --- a/crypto/block/output-queue-merger.cpp +++ b/crypto/block/output-queue-merger.cpp @@ -134,34 +134,33 @@ bool OutputQueueMerger::MsgKeyValue::split(MsgKeyValue& second) { return true; } -bool OutputQueueMerger::add_root(int src, Ref outmsg_root) { +void OutputQueueMerger::add_root(int src, Ref outmsg_root, td::int32 msg_limit) { if (outmsg_root.is_null()) { - return true; + return; } //block::gen::HashmapAug{352, block::gen::t_EnqueuedMsg, block::gen::t_uint64}.print_ref(std::cerr, outmsg_root); auto kv = std::make_unique(src, std::move(outmsg_root)); if (kv->replace_by_prefix(common_pfx.cbits(), common_pfx_len)) { heap.push_back(std::move(kv)); } - return true; + if ((int)src_remaining_msgs_.size() < src + 1) { + src_remaining_msgs_.resize(src + 1); + } + src_remaining_msgs_[src] = msg_limit; } -OutputQueueMerger::OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector _neighbors) - : queue_for(_queue_for), neighbors(std::move(_neighbors)), eof(false), failed(false) { - init(); -} - -void OutputQueueMerger::init() { +OutputQueueMerger::OutputQueueMerger(ton::ShardIdFull queue_for, std::vector neighbors) + : eof(false), failed(false) { common_pfx.bits().store_int(queue_for.workchain, 32); int l = queue_for.pfx_len(); td::bitstring::bits_store_long_top(common_pfx.bits() + 32, queue_for.shard, l); common_pfx_len = 32 + l; int i = 0; - for (block::McShardDescr& neighbor : neighbors) { - if (!neighbor.is_disabled()) { - LOG(DEBUG) << "adding " << (neighbor.outmsg_root.is_null() ? "" : "non-") << "empty output queue for neighbor #" - << i << " (" << neighbor.blk_.to_str() << ")"; - add_root(i++, neighbor.outmsg_root); + for (Neighbor& neighbor : neighbors) { + if (!neighbor.disabled_) { + LOG(DEBUG) << "adding " << (neighbor.outmsg_root_.is_null() ? "" : "non-") << "empty output queue for neighbor #" + << i << " (" << neighbor.block_id_.to_str() << ")"; + add_root(i++, neighbor.outmsg_root_, neighbor.msg_limit_); } else { LOG(DEBUG) << "skipping output queue for disabled neighbor #" << i; i++; @@ -200,6 +199,10 @@ bool OutputQueueMerger::load() { unsigned long long lt = heap[0]->lt; std::size_t orig_size = msg_list.size(); do { + if (src_remaining_msgs_[heap[0]->source] == 0) { + std::pop_heap(heap.begin(), heap.end(), MsgKeyValue::greater); + continue; + } while (heap[0]->is_fork()) { auto other = std::make_unique(); if (!heap[0]->split(*other)) { @@ -215,6 +218,17 @@ bool OutputQueueMerger::load() { heap.pop_back(); } while (!heap.empty() && heap[0]->lt <= lt); std::sort(msg_list.begin() + orig_size, msg_list.end(), MsgKeyValue::less); + size_t j = orig_size; + for (size_t i = orig_size; i < msg_list.size(); ++i) { + td::int32 &remaining = src_remaining_msgs_[msg_list[i]->source]; + if (remaining != 0) { + if (remaining > 0) { + --remaining; + } + msg_list[j++] = std::move(msg_list[i]); + } + } + msg_list.resize(j); return true; } diff --git a/crypto/block/output-queue-merger.h b/crypto/block/output-queue-merger.h index bf3d8586..f5c15ab4 100644 --- a/crypto/block/output-queue-merger.h +++ b/crypto/block/output-queue-merger.h @@ -52,12 +52,20 @@ struct OutputQueueMerger { bool split(MsgKeyValue& second); }; // - ton::ShardIdFull queue_for; std::vector> msg_list; - std::vector neighbors; public: - OutputQueueMerger(ton::ShardIdFull _queue_for, std::vector _neighbors); + struct Neighbor { + ton::BlockIdExt block_id_; + td::Ref outmsg_root_; + bool disabled_; + td::int32 msg_limit_; // -1 - unlimited + Neighbor(ton::BlockIdExt block_id, td::Ref outmsg_root, bool disabled = false, td::int32 msg_limit = -1) + : block_id_(block_id), outmsg_root_(std::move(outmsg_root)), disabled_(disabled), msg_limit_(msg_limit) { + } + }; + + OutputQueueMerger(ton::ShardIdFull queue_for, std::vector neighbors); bool is_eof() const { return eof; } @@ -70,10 +78,10 @@ struct OutputQueueMerger { int common_pfx_len; std::vector> heap; std::size_t pos{0}; + std::vector src_remaining_msgs_; bool eof; bool failed; - void init(); - bool add_root(int src, Ref outmsg_root); + void add_root(int src, Ref outmsg_root, td::int32 msg_limit); bool load(); }; diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index 2a118197..6d52b845 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -415,7 +415,7 @@ tonNode.success = tonNode.Success; tonNode.archiveNotFound = tonNode.ArchiveInfo; tonNode.archiveInfo id:long = tonNode.ArchiveInfo; -tonNode.outMsgQueueProof queue_proof:bytes block_state_proof:bytes = tonNode.OutMsgQueueProof; +tonNode.outMsgQueueProof queue_proof:bytes block_state_proof:bytes msg_count:int = tonNode.OutMsgQueueProof; tonNode.outMsgQueueProofEmpty = tonNode.OutMsgQueueProof; tonNode.forgetPeer = tonNode.ForgetPeer; diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo index f85ddc6b49657a9b9477515e82ad3d1775fe4197..1eed452846aa645ab1b76b678dde81080e579487 100644 GIT binary patch delta 138 zcmbPngmupm)(txPEI$+_I5!*UPdKG1rQfttq9i}hFFz$!FTb?Jw>UkpG_^D}peR2- zje&syr1Iu5As^<+FMkG4R`6$G;mj>gpRBN21#HM X_|=mDRyJKBny~@Q+P);3v4jl(pQkWp delta 119 zcmdmUgmuOd)(txPEJ<2%Q#KpuPdLTepmM}SVsh;-2_6Op29WU0V?sVmlVAP}o~+=; z0aiFU=a+#5ObV!qp~#s{hr_n*AlQ^Gzj_ilVY1AV8}qcMmqarLfOT#^5zSb_1^{`V BEK2|Y diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h index dab3f6a9..80a8e9a7 100644 --- a/validator/impl/collator-impl.h +++ b/validator/impl/collator-impl.h @@ -133,6 +133,8 @@ class Collator final : public td::actor::Actor { std::unique_ptr config_; std::unique_ptr shard_conf_; std::map> aux_mc_states_; + std::vector neighbor_queues_; + vm::Dictionary neighbor_msg_queues_limits_{32 + 64}; std::vector neighbors_; std::unique_ptr nb_out_msgs_; std::vector special_smcs; diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index a2f3f7bd..c6ac6f2f 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -77,7 +77,7 @@ Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_mastercha } void Collator::start_up() { - LOG(DEBUG) << "Collator for shard " << shard_.to_str() << " started"; + LOG(INFO) << "Collator for shard " << shard_.to_str() << " started"; LOG(DEBUG) << "Previous block #1 is " << prev_blocks.at(0).to_str(); if (prev_blocks.size() > 1) { LOG(DEBUG) << "Previous block #2 is " << prev_blocks.at(1).to_str(); @@ -690,7 +690,15 @@ void Collator::got_neighbor_msg_queue(unsigned i, td::Resultprefetch_ref(0)); + auto queue_root = qinfo.out_queue->prefetch_ref(0); + descr.set_queue_root(queue_root); + neighbor_queues_.emplace_back(descr.top_block_id(), queue_root, descr.is_disabled(), res->msg_count_); + if (res->msg_count_ != -1) { + td::BitArray<96> key; + key.bits().store_int(block_id.id.workchain, 32); + (key.bits() + 32).store_uint(block_id.id.shard, 64); + neighbor_msg_queues_limits_.set_builder(key, vm::CellBuilder().store_long(res->msg_count_, 32)); + } // comment the next two lines in the future when the output queues become huge // CHECK(block::gen::t_OutMsgQueueInfo.validate_ref(1000000, outq_descr->root_cell())); // CHECK(block::tlb::t_OutMsgQueueInfo.validate_ref(1000000, outq_descr->root_cell())); @@ -1723,7 +1731,7 @@ bool Collator::do_collate() { // 1.3. create OutputQueueMerger from adjusted neighbors CHECK(!nb_out_msgs_); LOG(DEBUG) << "creating OutputQueueMerger"; - nb_out_msgs_ = std::make_unique(shard_, neighbors_); + nb_out_msgs_ = std::make_unique(shard_, neighbor_queues_); // 1.4. compute created / minted / recovered if (!init_value_create()) { return fatal_error("cannot compute the value to be created / minted / recovered"); @@ -4013,18 +4021,25 @@ bool Collator::create_collated_data() { auto cell = collate_shard_block_descr_set(); if (cell.is_null()) { return true; - return fatal_error("cannot collate the collection of used shard block descriptions"); + // return fatal_error("cannot collate the collection of used shard block descriptions"); } collated_roots_.push_back(std::move(cell)); } + // 2. Message count for neighbors' out queues + if (!neighbor_msg_queues_limits_.is_empty()) { + vm::CellBuilder cb; + cb.store_long(block::gen::t_NeighborMsgQueueLimits.cons_tag[0], 32); + cb.store_maybe_ref(neighbor_msg_queues_limits_.get_root_cell()); + collated_roots_.push_back(cb.finalize_novm()); + } if (!full_collated_data_) { return true; } - // 2. Proofs for hashes of states: previous states + neighbors + // 3. Proofs for hashes of states: previous states + neighbors for (const auto& p : block_state_proofs_) { collated_roots_.push_back(p.second); } - // 3. Previous state proof (only shadchains) + // 4. Previous state proof (only shadchains) std::map> proofs; if (!is_masterchain()) { if (!prepare_msg_queue_proof()) { @@ -4038,7 +4053,7 @@ bool Collator::create_collated_data() { } proofs[prev_state_root_->get_hash().bits()] = std::move(state_proof); } - // 4. Proofs for message queues + // 5. Proofs for message queues for (vm::MerkleProofBuilder &mpb : neighbor_proof_builders_) { auto r_proof = mpb.extract_proof(); if (r_proof.is_error()) { diff --git a/validator/impl/out-msg-queue-proof.cpp b/validator/impl/out-msg-queue-proof.cpp index 56a4edd3..329da607 100644 --- a/validator/impl/out-msg-queue-proof.cpp +++ b/validator/impl/out-msg-queue-proof.cpp @@ -22,94 +22,45 @@ #include "interfaces/validator-manager.h" #include "block/block-parse.h" #include "block/block-auto.h" +#include "output-queue-merger.h" namespace ton { namespace validator { -td::Result> OutMsgQueueProof::fetch(BlockIdExt block_id, ShardIdFull dst_shard, - const ton_api::tonNode_outMsgQueueProof& f) { - Ref block_state_proof; - td::Bits256 state_root_hash; - if (block_id.seqno() == 0) { - if (!f.block_state_proof_.empty()) { - return td::Status::Error("expected empty block state proof"); - } - state_root_hash = block_id.root_hash; - } else { - TRY_RESULT_ASSIGN(block_state_proof, vm::std_boc_deserialize(f.block_state_proof_.as_slice())); - TRY_RESULT_ASSIGN(state_root_hash, unpack_block_state_proof(block_id, block_state_proof)); +static td::Status check_no_prunned(const Ref& cell) { + if (cell.is_null()) { + return td::Status::OK(); } - - TRY_RESULT(queue_proof, vm::std_boc_deserialize(f.queue_proof_.as_slice())); - auto virtual_root = vm::MerkleProof::virtualize(queue_proof, 1); - if (virtual_root.is_null()) { - return td::Status::Error("invalid queue proof"); + TRY_RESULT(loaded_cell, cell->load_cell()); + if (loaded_cell.data_cell->get_level() > 0) { + return td::Status::Error("prunned branch"); } - if (virtual_root->get_hash().as_slice() != state_root_hash.as_slice()) { - return td::Status::Error("state root hash mismatch"); - } - - // Validate proof - auto state_root = vm::CellSlice(vm::NoVm(), queue_proof).prefetch_ref(0); - TRY_RESULT_PREFIX(state, ShardStateQ::fetch(block_id, {}, state_root), "invalid proof: "); - TRY_RESULT_PREFIX(outq_descr, state->message_queue(), "invalid proof: "); - - block::gen::OutMsgQueueInfo::Record qinfo; - if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) { - return td::Status::Error("invalid proof: invalid message queue"); - } - td::Ref proc_info = qinfo.proc_info->prefetch_ref(0); - if (proc_info.not_null() && proc_info->get_level() != 0) { - return td::Status::Error("invalid proof: proc_info has prunned branches"); - } - td::Ref ihr_pending = qinfo.ihr_pending->prefetch_ref(0); - if (ihr_pending.not_null() && ihr_pending->get_level() != 0) { - return td::Status::Error("invalid proof: ihr_pending has prunned branches"); - } - auto queue = - std::make_unique(qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue); - td::BitArray<96> prefix; - td::BitPtr ptr = prefix.bits(); - ptr.store_int(dst_shard.workchain, 32); - ptr.advance(32); - ptr.store_uint(dst_shard.shard, 64); - if (!queue->cut_prefix_subdict(prefix.bits(), 32 + dst_shard.pfx_len())) { - return td::Status::Error("invalid proof: failed to cut queue dict"); - } - if (queue->get_root_cell().not_null() && queue->get_root_cell()->get_level() != 0) { - return td::Status::Error("invalid proof: msg queue has prunned branches"); - } - - return Ref(true, std::move(virtual_root), std::move(block_state_proof)); + return td::Status::OK(); } -td::Result> OutMsgQueueProof::serialize(BlockIdExt block_id, - ShardIdFull dst_shard, - Ref state_root, - Ref block_root) { - if (!dst_shard.is_valid_ext()) { - return td::Status::Error("invalid shard"); - } - vm::MerkleProofBuilder mpb{std::move(state_root)}; - TRY_RESULT(state, ShardStateQ::fetch(block_id, {}, mpb.root())); - TRY_RESULT(outq_descr, state->message_queue()); - block::gen::OutMsgQueueInfo::Record qinfo; - if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) { - return td::Status::Error("invalid message queue"); +static td::Status check_no_prunned(const vm::CellSlice& cs) { + for (unsigned i = 0; i < cs.size_refs(); ++i) { + TRY_STATUS(check_no_prunned(cs.prefetch_ref(i))); } + return td::Status::OK(); +} + +static td::Result process_queue(BlockIdExt block_id, ShardIdFull dst_shard, + const block::gen::OutMsgQueueInfo::Record& qinfo) { + td::uint64 estimated_proof_size = 0; td::HashSet visited; - std::function)> dfs = [&](Ref cell) { + std::function dfs_cs; + auto dfs = [&](Ref cell) { if (cell.is_null() || !visited.insert(cell->get_hash()).second) { return; } - vm::CellSlice cs(vm::NoVm(), cell); - for (unsigned i = 0; i < cs.size_refs(); i++) { - dfs(cs.prefetch_ref(i)); - } + dfs_cs(vm::CellSlice(vm::NoVm(), cell)); }; - auto dfs_cs = [&](const vm::CellSlice &cs) { + dfs_cs = [&](const vm::CellSlice& cs) { + // Based on BlockLimitStatus::estimate_block_size + estimated_proof_size += 12 + (cs.size() + 7) / 8 + cs.size_refs() * 3; for (unsigned i = 0; i < cs.size_refs(); i++) { dfs(cs.prefetch_ref(i)); } @@ -117,17 +68,65 @@ td::Result> OutMsgQueueProof::s dfs_cs(*qinfo.proc_info); dfs_cs(*qinfo.ihr_pending); - auto queue = - std::make_unique(qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue); - td::BitArray<96> prefix; - td::BitPtr ptr = prefix.bits(); - ptr.store_int(dst_shard.workchain, 32); - ptr.advance(32); - ptr.store_uint(dst_shard.shard, 64); - if (!queue->cut_prefix_subdict(prefix.bits(), 32 + dst_shard.pfx_len())) { + block::OutputQueueMerger queue_merger{ + dst_shard, {block::OutputQueueMerger::Neighbor{block_id, qinfo.out_queue->prefetch_ref()}}}; + td::int32 msg_count = 0; + bool limit_reached = false; + + while (!queue_merger.is_eof()) { + auto kv = queue_merger.extract_cur(); + queue_merger.next(); + ++msg_count; + + // TODO: Get processed_upto from destination shard (in request?) + /* + // Parse message to check if it was processed (as in Collator::process_inbound_message) + ton::LogicalTime enqueued_lt = kv->msg->prefetch_ulong(64); + auto msg_env = kv->msg->prefetch_ref(); + block::tlb::MsgEnvelope::Record_std env; + if (!tlb::unpack_cell(msg_env, env)) { + return td::Status::Error("cannot unpack MsgEnvelope of an internal message"); + } + vm::CellSlice cs{vm::NoVmOrd{}, env.msg}; + block::gen::CommonMsgInfo::Record_int_msg_info info; + if (!tlb::unpack(cs, info)) { + return td::Status::Error("cannot unpack CommonMsgInfo of an internal message"); + } + auto src_prefix = block::tlb::MsgAddressInt::get_prefix(info.src); + auto dest_prefix = block::tlb::MsgAddressInt::get_prefix(info.dest); + auto cur_prefix = block::interpolate_addr(src_prefix, dest_prefix, env.cur_addr); + auto next_prefix = block::interpolate_addr(src_prefix, dest_prefix, env.next_addr); + block::EnqueuedMsgDescr descr{cur_prefix, next_prefix, kv->lt, enqueued_lt, env.msg->get_hash().bits()}; + if (dst_processed_upto->already_processed(descr)) { + } else { + }*/ + + dfs_cs(*kv->msg); + TRY_STATUS_PREFIX(check_no_prunned(*kv->msg), "invalid message proof: ") + if (estimated_proof_size > OutMsgQueueProof::QUEUE_SIZE_THRESHOLD) { + limit_reached = true; + break; + } + } + return limit_reached ? msg_count : -1; +} + +td::Result> OutMsgQueueProof::build(BlockIdExt block_id, + ShardIdFull dst_shard, + Ref state_root, + Ref block_root) { + if (!dst_shard.is_valid_ext()) { + return td::Status::Error("invalid shard"); + } + + vm::MerkleProofBuilder mpb{std::move(state_root)}; + TRY_RESULT(state, ShardStateQ::fetch(block_id, {}, mpb.root())); + TRY_RESULT(outq_descr, state->message_queue()); + block::gen::OutMsgQueueInfo::Record qinfo; + if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) { return td::Status::Error("invalid message queue"); } - dfs(queue->get_root_cell()); + TRY_RESULT(cnt, process_queue(block_id, dst_shard, qinfo)); TRY_RESULT(queue_proof, mpb.extract_proof_boc()); td::BufferSlice block_state_proof; @@ -135,7 +134,52 @@ td::Result> OutMsgQueueProof::s TRY_RESULT(proof, create_block_state_proof(std::move(block_root))); TRY_RESULT_ASSIGN(block_state_proof, vm::std_boc_serialize(std::move(proof), 31)); } - return create_tl_object(std::move(queue_proof), std::move(block_state_proof)); + return create_tl_object(std::move(queue_proof), std::move(block_state_proof), + cnt); +} + +td::Result> OutMsgQueueProof::fetch(BlockIdExt block_id, ShardIdFull dst_shard, + const ton_api::tonNode_outMsgQueueProof& f) { + try { + Ref block_state_proof; + td::Bits256 state_root_hash; + if (block_id.seqno() == 0) { + if (!f.block_state_proof_.empty()) { + return td::Status::Error("expected empty block state proof"); + } + state_root_hash = block_id.root_hash; + } else { + TRY_RESULT_ASSIGN(block_state_proof, vm::std_boc_deserialize(f.block_state_proof_.as_slice())); + TRY_RESULT_ASSIGN(state_root_hash, unpack_block_state_proof(block_id, block_state_proof)); + } + + TRY_RESULT(queue_proof, vm::std_boc_deserialize(f.queue_proof_.as_slice())); + auto virtual_root = vm::MerkleProof::virtualize(queue_proof, 1); + if (virtual_root.is_null()) { + return td::Status::Error("invalid queue proof"); + } + if (virtual_root->get_hash().as_slice() != state_root_hash.as_slice()) { + return td::Status::Error("state root hash mismatch"); + } + + // Validate proof + TRY_RESULT_PREFIX(state, ShardStateQ::fetch(block_id, {}, virtual_root), "invalid proof: "); + TRY_RESULT_PREFIX(outq_descr, state->message_queue(), "invalid proof: "); + + block::gen::OutMsgQueueInfo::Record qinfo; + if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) { + return td::Status::Error("invalid proof: invalid message queue"); + } + TRY_STATUS_PREFIX(check_no_prunned(qinfo.proc_info->prefetch_ref(0)), "invalid proc_info: ") + TRY_STATUS_PREFIX(check_no_prunned(qinfo.ihr_pending->prefetch_ref(0)), "invalid ihr_pending: ") + TRY_RESULT(cnt, process_queue(block_id, dst_shard, qinfo)); + if (cnt != f.msg_count_) { + return td::Status::Error(PSTRING() << "invalid msg_count: expected=" << f.msg_count_ << ", found=" << cnt); + } + return Ref(true, std::move(virtual_root), std::move(block_state_proof), cnt); + } catch (vm::VmVirtError& err) { + return td::Status::Error(PSTRING() << "invalid proof: " << err.get_msg()); + } } void WaitOutMsgQueueProof::alarm() { @@ -296,7 +340,7 @@ void BuildOutMsgQueueProof::got_block_root(Ref root) { } void BuildOutMsgQueueProof::build_proof() { - auto result = OutMsgQueueProof::serialize(block_id_, dst_shard_, std::move(state_root_), std::move(block_root_)); + auto result = OutMsgQueueProof::build(block_id_, dst_shard_, std::move(state_root_), std::move(block_root_)); if (result.is_error()) { LOG(ERROR) << "Failed to build msg queue proof: " << result.error(); } diff --git a/validator/impl/out-msg-queue-proof.hpp b/validator/impl/out-msg-queue-proof.hpp index 6a18ca6c..e94f805f 100644 --- a/validator/impl/out-msg-queue-proof.hpp +++ b/validator/impl/out-msg-queue-proof.hpp @@ -62,7 +62,6 @@ class WaitOutMsgQueueProof : public td::actor::Actor { void run_net(); - private: BlockIdExt block_id_; ShardIdFull dst_shard_; @@ -80,12 +79,9 @@ class WaitOutMsgQueueProof : public td::actor::Actor { class BuildOutMsgQueueProof : public td::actor::Actor { public: BuildOutMsgQueueProof(BlockIdExt block_id, ShardIdFull dst_shard, - td::actor::ActorId manager, - td::Promise> promise) - : block_id_(std::move(block_id)) - , dst_shard_(dst_shard) - , manager_(manager) - , promise_(std::move(promise)) { + td::actor::ActorId manager, + td::Promise> promise) + : block_id_(std::move(block_id)), dst_shard_(dst_shard), manager_(manager), promise_(std::move(promise)) { } void abort_query(td::Status reason); diff --git a/validator/impl/validate-query.cpp b/validator/impl/validate-query.cpp index f43bad29..3329e4fd 100644 --- a/validator/impl/validate-query.cpp +++ b/validator/impl/validate-query.cpp @@ -522,6 +522,13 @@ bool ValidateQuery::extract_collated_data_from(Ref croot, int idx) { top_shard_descr_dict_ = std::make_unique(cs.prefetch_ref(), 96); return true; } + if (block::gen::t_NeighborMsgQueueLimits.has_valid_tag(cs)) { + LOG(DEBUG) << "collated datum # " << idx << " is a NeighborMsgQueueLimits"; + if (!block::gen::t_NeighborMsgQueueLimits.validate_upto(10000, cs)) { + return reject_query("invalid NeighborMsgQueueLimits"); + } + neighbor_msg_queues_limits_ = vm::Dictionary{cs.prefetch_ref(0), 32 + 64}; + } LOG(WARNING) << "collated datum # " << idx << " has unknown type (magic " << cs.prefetch_ulong(32) << "), ignoring"; return true; } @@ -4106,7 +4113,19 @@ bool ValidateQuery::check_neighbor_outbound_message(Ref enq_msg, } bool ValidateQuery::check_in_queue() { - block::OutputQueueMerger nb_out_msgs(shard_, neighbors_); + std::vector neighbor_queues; + for (const auto& descr : neighbors_) { + td::BitArray<96> key; + key.bits().store_int(descr.workchain(), 32); + (key.bits() + 32).store_uint(descr.shard().shard, 64); + auto r = neighbor_msg_queues_limits_.lookup(key); + td::int32 msg_limit = r.is_null() ? -1 : (td::int32)r->prefetch_long(32); + if (msg_limit < -1) { + return reject_query("invalid value in NeighborMsgQueueLimits"); + } + neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_, msg_limit); + } + block::OutputQueueMerger nb_out_msgs(shard_, std::move(neighbor_queues)); while (!nb_out_msgs.is_eof()) { auto kv = nb_out_msgs.extract_cur(); CHECK(kv && kv->msg.not_null()); diff --git a/validator/impl/validate-query.hpp b/validator/impl/validate-query.hpp index 942b32a5..cd14504e 100644 --- a/validator/impl/validate-query.hpp +++ b/validator/impl/validate-query.hpp @@ -206,6 +206,7 @@ class ValidateQuery : public td::actor::Actor { block::ActionPhaseConfig action_phase_cfg_; td::RefInt256 masterchain_create_fee_, basechain_create_fee_; + vm::Dictionary neighbor_msg_queues_limits_{32 + 64}; std::vector neighbors_; std::map> aux_mc_states_; diff --git a/validator/interfaces/out-msg-queue-proof.h b/validator/interfaces/out-msg-queue-proof.h index 1fece66a..5d792491 100644 --- a/validator/interfaces/out-msg-queue-proof.h +++ b/validator/interfaces/out-msg-queue-proof.h @@ -18,6 +18,7 @@ #include "vm/cells.h" #include "ton/ton-types.h" #include "auto/tl/ton_api.h" +#include "block/block.h" namespace ton { @@ -25,17 +26,21 @@ namespace validator { using td::Ref; struct OutMsgQueueProof : public td::CntObject { - OutMsgQueueProof(Ref state_root, Ref block_state_proof) - : state_root_(std::move(state_root)), block_state_proof_(std::move(block_state_proof)) { + OutMsgQueueProof(Ref state_root, Ref block_state_proof, td::int32 msg_count = -1) + : state_root_(std::move(state_root)), block_state_proof_(std::move(block_state_proof)), msg_count_(msg_count) { } Ref state_root_; Ref block_state_proof_; + td::int32 msg_count_; // -1 - up to end of queue static td::Result> fetch(BlockIdExt block_id, ShardIdFull dst_shard, const ton_api::tonNode_outMsgQueueProof &f); - static td::Result> serialize( - BlockIdExt block_id, ShardIdFull dst_shard, Ref state_root, Ref block_root); + static td::Result> build(BlockIdExt block_id, ShardIdFull dst_shard, + Ref state_root, + Ref block_root); + + static const td::uint64 QUEUE_SIZE_THRESHOLD = 128 * 1024; }; } // namespace validator