From 869c6fe675bb7ee936a5284d229eadb0ae0e40df Mon Sep 17 00:00:00 2001 From: SpyCheese Date: Thu, 20 Jul 2023 17:48:52 +0300 Subject: [PATCH] Rework limiting imported msg queues --- crypto/block/block.tlb | 1 - crypto/block/output-queue-merger.cpp | 14 +++++--------- crypto/block/output-queue-merger.h | 2 ++ validator/impl/collator-impl.h | 1 - validator/impl/collator.cpp | 25 ++++++++++--------------- validator/impl/out-msg-queue-proof.cpp | 13 +++++++++++-- validator/impl/validate-query.cpp | 22 +--------------------- validator/impl/validate-query.hpp | 1 - validator/validator-group.cpp | 2 +- 9 files changed, 30 insertions(+), 51 deletions(-) diff --git a/crypto/block/block.tlb b/crypto/block/block.tlb index f2f11fc7..2176c0ad 100644 --- a/crypto/block/block.tlb +++ b/crypto/block/block.tlb @@ -829,7 +829,6 @@ top_block_descr#d5 proof_for:BlockIdExt signatures:(Maybe ^BlockSignatures) // COLLATED DATA // top_block_descr_set#4ac789f3 collection:(HashmapE 96 ^TopBlockDescr) = TopBlockDescrSet; -neighbor_msg_queue_limits#7e549333 neighbors:(HashmapE 96 int32) = NeighborMsgQueueLimits; // // VALIDATOR MISBEHAVIOR COMPLAINTS diff --git a/crypto/block/output-queue-merger.cpp b/crypto/block/output-queue-merger.cpp index 2955b3e0..f3ec6143 100644 --- a/crypto/block/output-queue-merger.cpp +++ b/crypto/block/output-queue-merger.cpp @@ -199,10 +199,6 @@ bool OutputQueueMerger::load() { unsigned long long lt = heap[0]->lt; std::size_t orig_size = msg_list.size(); do { - if (src_remaining_msgs_[heap[0]->source] == 0) { - std::pop_heap(heap.begin(), heap.end(), MsgKeyValue::greater); - continue; - } while (heap[0]->is_fork()) { auto other = std::make_unique(); if (!heap[0]->split(*other)) { @@ -218,17 +214,17 @@ bool OutputQueueMerger::load() { heap.pop_back(); } while (!heap.empty() && heap[0]->lt <= lt); std::sort(msg_list.begin() + orig_size, msg_list.end(), MsgKeyValue::less); - size_t j = orig_size; for (size_t i = orig_size; i < msg_list.size(); ++i) { td::int32 &remaining = src_remaining_msgs_[msg_list[i]->source]; - if (remaining != 0) { - if (remaining > 0) { + if (remaining != -1) { + if (remaining == 0) { + limit_exceeded = true; + } else { --remaining; } - msg_list[j++] = std::move(msg_list[i]); } + msg_list[i]->limit_exceeded = limit_exceeded; } - msg_list.resize(j); return msg_list.size() > orig_size; } diff --git a/crypto/block/output-queue-merger.h b/crypto/block/output-queue-merger.h index 3eb4b379..26deb7ee 100644 --- a/crypto/block/output-queue-merger.h +++ b/crypto/block/output-queue-merger.h @@ -32,6 +32,7 @@ struct OutputQueueMerger { int source; int key_len{0}; td::BitArray key; + bool limit_exceeded{false}; MsgKeyValue() = default; MsgKeyValue(int src, Ref node); MsgKeyValue(td::ConstBitPtr key_pfx, int key_pfx_len, int src, Ref node); @@ -82,6 +83,7 @@ struct OutputQueueMerger { std::vector src_remaining_msgs_; bool eof; bool failed; + bool limit_exceeded{false}; void add_root(int src, Ref outmsg_root, td::int32 msg_limit); bool load(); }; diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h index 59be8d4b..2839a0e5 100644 --- a/validator/impl/collator-impl.h +++ b/validator/impl/collator-impl.h @@ -134,7 +134,6 @@ class Collator final : public td::actor::Actor { std::unique_ptr shard_conf_; std::map> aux_mc_states_; std::map neighbor_msg_queues_limits_; - vm::Dictionary neighbor_msg_queues_limits_dict_{32 + 64}; std::vector neighbors_; std::unique_ptr nb_out_msgs_; std::vector special_smcs; diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index 2d91489e..52d9c1c4 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -701,11 +701,7 @@ void Collator::got_neighbor_msg_queue(unsigned i, td::Resultprefetch_ref(0); descr.set_queue_root(queue_root); if (res->msg_count_ != -1) { - LOG(DEBUG) << "Neighbor " << descr.shard().to_str() << " has msg_limit=" << res->msg_count_; - td::BitArray<96> key; - key.bits().store_int(block_id.id.workchain, 32); - (key.bits() + 32).store_uint(block_id.id.shard, 64); - neighbor_msg_queues_limits_dict_.set_builder(key, vm::CellBuilder().store_long(res->msg_count_, 32)); + LOG(INFO) << "neighbor " << descr.shard().to_str() << " has msg_limit=" << res->msg_count_; neighbor_msg_queues_limits_[block_id.shard_full()] = res->msg_count_; } // comment the next two lines in the future when the output queues become huge @@ -2833,6 +2829,12 @@ bool Collator::process_inbound_internal_messages() { block_full_ = !block_limit_status_->fits(block::ParamLimits::cl_normal); auto kv = nb_out_msgs_->extract_cur(); CHECK(kv && kv->msg.not_null()); + if (kv->limit_exceeded) { + LOG(INFO) << "limit for imported messages is reached, stop processing inbound internal messages"; + block::EnqueuedMsgDescr enq; + enq.unpack(kv->msg.write()); // Visit cells to include it in proof + break; + } if (!precheck_inbound_message(kv->msg, kv->lt)) { if (verbosity > 1) { std::cerr << "invalid inbound message: lt=" << kv->lt << " from=" << kv->source << " key=" << kv->key.to_hex() @@ -4089,21 +4091,14 @@ bool Collator::create_collated_data() { } collated_roots_.push_back(std::move(cell)); } - // 2. Message count for neighbors' out queues - if (!neighbor_msg_queues_limits_dict_.is_empty()) { - vm::CellBuilder cb; - cb.store_long(block::gen::t_NeighborMsgQueueLimits.cons_tag[0], 32); - cb.store_maybe_ref(neighbor_msg_queues_limits_dict_.get_root_cell()); - collated_roots_.push_back(cb.finalize_novm()); - } if (!full_collated_data_) { return true; } - // 3. Proofs for hashes of states: previous states + neighbors + // 2. Proofs for hashes of states: previous states + neighbors for (const auto& p : block_state_proofs_) { collated_roots_.push_back(p.second); } - // 4. Previous state proof (only shadchains) + // 3. Previous state proof (only shadchains) std::map> proofs; if (!is_masterchain()) { if (!prepare_msg_queue_proof()) { @@ -4117,7 +4112,7 @@ bool Collator::create_collated_data() { } proofs[prev_state_root_->get_hash().bits()] = std::move(state_proof); } - // 5. Proofs for message queues + // 4. Proofs for message queues for (vm::MerkleProofBuilder &mpb : neighbor_proof_builders_) { auto r_proof = mpb.extract_proof(); if (r_proof.is_error()) { diff --git a/validator/impl/out-msg-queue-proof.cpp b/validator/impl/out-msg-queue-proof.cpp index 329da607..59aa8aed 100644 --- a/validator/impl/out-msg-queue-proof.cpp +++ b/validator/impl/out-msg-queue-proof.cpp @@ -52,7 +52,7 @@ static td::Result process_queue(BlockIdExt block_id, ShardIdFull dst_ td::HashSet visited; std::function dfs_cs; - auto dfs = [&](Ref cell) { + auto dfs = [&](const Ref& cell) { if (cell.is_null() || !visited.insert(cell->get_hash()).second) { return; } @@ -65,6 +65,8 @@ static td::Result process_queue(BlockIdExt block_id, ShardIdFull dst_ dfs(cs.prefetch_ref(i)); } }; + TRY_STATUS_PREFIX(check_no_prunned(*qinfo.proc_info), "invalid proc_info proof: ") + TRY_STATUS_PREFIX(check_no_prunned(*qinfo.ihr_pending), "invalid ihr_pending proof: ") dfs_cs(*qinfo.proc_info); dfs_cs(*qinfo.ihr_pending); @@ -76,6 +78,14 @@ static td::Result process_queue(BlockIdExt block_id, ShardIdFull dst_ while (!queue_merger.is_eof()) { auto kv = queue_merger.extract_cur(); queue_merger.next(); + block::EnqueuedMsgDescr enq; + auto msg = kv->msg; + if (!enq.unpack(msg.write())) { + return td::Status::Error("cannot unpack EnqueuedMsgDescr"); + } + if (limit_reached) { + break; + } ++msg_count; // TODO: Get processed_upto from destination shard (in request?) @@ -105,7 +115,6 @@ static td::Result process_queue(BlockIdExt block_id, ShardIdFull dst_ TRY_STATUS_PREFIX(check_no_prunned(*kv->msg), "invalid message proof: ") if (estimated_proof_size > OutMsgQueueProof::QUEUE_SIZE_THRESHOLD) { limit_reached = true; - break; } } return limit_reached ? msg_count : -1; diff --git a/validator/impl/validate-query.cpp b/validator/impl/validate-query.cpp index fcd869b1..4f8c464c 100644 --- a/validator/impl/validate-query.cpp +++ b/validator/impl/validate-query.cpp @@ -526,14 +526,6 @@ bool ValidateQuery::extract_collated_data_from(Ref croot, int idx) { top_shard_descr_dict_ = std::make_unique(cs.prefetch_ref(), 96); return true; } - if (block::gen::t_NeighborMsgQueueLimits.has_valid_tag(cs)) { - LOG(DEBUG) << "collated datum # " << idx << " is a NeighborMsgQueueLimits"; - if (!block::gen::t_NeighborMsgQueueLimits.validate_upto(10000, cs)) { - return reject_query("invalid NeighborMsgQueueLimits"); - } - neighbor_msg_queues_limits_ = vm::Dictionary{cs.prefetch_ref(0), 32 + 64}; - return true; - } LOG(WARNING) << "collated datum # " << idx << " has unknown type (magic " << cs.prefetch_ulong(32) << "), ignoring"; return true; } @@ -4115,19 +4107,7 @@ bool ValidateQuery::check_in_queue() { td::BitArray<96> key; key.bits().store_int(descr.workchain(), 32); (key.bits() + 32).store_uint(descr.shard().shard, 64); - auto r = neighbor_msg_queues_limits_.lookup(key); - td::int32 msg_limit = r.is_null() ? -1 : (td::int32)r->prefetch_long(32); - if (msg_limit < -1) { - return reject_query("invalid value in NeighborMsgQueueLimits"); - } - LOG(DEBUG) << "Neighbor " << descr.shard().to_str() << " has msg_limit=" << msg_limit; - neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_, msg_limit); - if (msg_limit != -1 && descr.shard().is_masterchain()) { - return reject_query("masterchain out message queue cannot be limited"); - } - if (msg_limit != -1 && shard_intersects(descr.shard(), shard_)) { - return reject_query("prev block out message queue cannot be limited"); - } + neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_); } block::OutputQueueMerger nb_out_msgs(shard_, std::move(neighbor_queues)); while (!nb_out_msgs.is_eof()) { diff --git a/validator/impl/validate-query.hpp b/validator/impl/validate-query.hpp index cd14504e..942b32a5 100644 --- a/validator/impl/validate-query.hpp +++ b/validator/impl/validate-query.hpp @@ -206,7 +206,6 @@ class ValidateQuery : public td::actor::Actor { block::ActionPhaseConfig action_phase_cfg_; td::RefInt256 masterchain_create_fee_, basechain_create_fee_; - vm::Dictionary neighbor_msg_queues_limits_{32 + 64}; std::vector neighbors_; std::map> aux_mc_states_; diff --git a/validator/validator-group.cpp b/validator/validator-group.cpp index bc6a9812..5c7bef5b 100644 --- a/validator/validator-group.cpp +++ b/validator/validator-group.cpp @@ -417,7 +417,7 @@ void ValidatorGroup::send_collate_query(td::uint32 round_id, td::Timestamp timeo promise = td::PromiseCreator::lambda([=, SelfId = actor_id(this), promise = std::move(promise), timer = td::Timer()](td::Result R) mutable { if (R.is_ok()) { - LOG(WARNING) << "collate query for " << next_block_id.to_str() << ": success, time=" << timer.elapsed() << "s"; + LOG(INFO) << "collate query for " << next_block_id.to_str() << ": success, time=" << timer.elapsed() << "s"; promise.set_result(R.move_as_ok()); return; }