From 32b3fe748ab1799843b73595e1f9ebad597934ee Mon Sep 17 00:00:00 2001 From: SpyCheese Date: Fri, 21 Jul 2023 19:29:29 +0300 Subject: [PATCH] Fix validating inbound msg queues --- validator/impl/collator.cpp | 3 +++ validator/impl/validate-query.cpp | 43 +++++++------------------------ validator/impl/validate-query.hpp | 2 +- 3 files changed, 14 insertions(+), 34 deletions(-) diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index 52d9c1c4..f11bd116 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -3805,6 +3805,9 @@ bool Collator::store_master_ref(vm::CellBuilder& cb) { bool Collator::update_processed_upto() { auto ref_mc_seqno = is_masterchain() ? new_block_seqno : prev_mc_block_seqno; update_min_mc_seqno(ref_mc_seqno); + if (in_msg_dict->is_empty()) { + return true; + } if (last_proc_int_msg_.first) { if (!processed_upto_->insert(ref_mc_seqno, last_proc_int_msg_.first, last_proc_int_msg_.second.cbits())) { return fatal_error("cannot update our ProcessedUpto to reflect processed inbound message"); diff --git a/validator/impl/validate-query.cpp b/validator/impl/validate-query.cpp index 4f8c464c..8d0b5173 100644 --- a/validator/impl/validate-query.cpp +++ b/validator/impl/validate-query.cpp @@ -3927,6 +3927,7 @@ bool ValidateQuery::check_processed_upto() { if (!ok) { return reject_query("new ProcessedInfo is not obtained from old ProcessedInfo by adding at most one new entry"); } + processed_upto_updated_ = upd; if (upd) { if (upd->shard != shard_.shard) { return reject_query("newly-added ProcessedInfo entry refers to shard "s + @@ -4007,37 +4008,8 @@ bool ValidateQuery::check_neighbor_outbound_message(Ref enq_msg, key.to_hex(352) + " of neighbor " + nb.blk_.to_str()); } if (shard_contains(shard_, enq.cur_prefix_)) { - // if this message comes from our own outbound queue, we must have dequeued it - if (out_entry.is_null()) { - return reject_query("our old outbound queue contains EnqueuedMsg with key "s + key.to_hex(352) + - " already processed by this shard, but there is no ext_message_deq OutMsg record for this " - "message in this block"); - } - int tag = block::gen::t_OutMsg.get_tag(*out_entry); - if (tag == block::gen::OutMsg::msg_export_deq_short) { - block::gen::OutMsg::Record_msg_export_deq_short deq; - if (!tlb::csr_unpack(std::move(out_entry), deq)) { - return reject_query( - "cannot unpack msg_export_deq_short OutMsg record for already processed EnqueuedMsg with key "s + - key.to_hex(352) + " of old outbound queue"); - } - if (deq.msg_env_hash != enq.msg_env_->get_hash().bits()) { - return reject_query("unpack ext_message_deq OutMsg record for already processed EnqueuedMsg with key "s + - key.to_hex(352) + " of old outbound queue refers to MsgEnvelope with different hash " + - deq.msg_env_hash.to_hex()); - } - } else { - block::gen::OutMsg::Record_msg_export_deq deq; - if (!tlb::csr_unpack(std::move(out_entry), deq)) { - return reject_query( - "cannot unpack msg_export_deq OutMsg record for already processed EnqueuedMsg with key "s + - key.to_hex(352) + " of old outbound queue"); - } - if (deq.out_msg->get_hash() != enq.msg_env_->get_hash()) { - return reject_query("unpack ext_message_deq OutMsg record for already processed EnqueuedMsg with key "s + - key.to_hex(352) + " of old outbound queue contains a different MsgEnvelope"); - } - } + // this message couldn't come from our own outbound queue because processed messages from our queue don't stay here + return fatal_error("have an already processed EnqueuedMsg from our shard: "s + key.to_hex(352)); } // next check is incorrect after a merge, when ns_.processed_upto has > 1 entries // we effectively comment it out @@ -4110,6 +4082,13 @@ bool ValidateQuery::check_in_queue() { neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_); } block::OutputQueueMerger nb_out_msgs(shard_, std::move(neighbor_queues)); + if (in_msg_dict_->is_empty()) { + LOG(DEBUG) << "in_msg_dict is empty, skip checking neighbors' message queues"; + if (processed_upto_updated_) { + return reject_query("processed_upto was updated, but no messages were processed"); + } + return true; + } while (!nb_out_msgs.is_eof()) { auto kv = nb_out_msgs.extract_cur(); CHECK(kv && kv->msg.not_null()); @@ -4130,12 +4109,10 @@ bool ValidateQuery::check_in_queue() { neighbors_.at(kv->source).blk_.to_str()); } if (unprocessed) { - inbound_queues_empty_ = false; return true; } nb_out_msgs.next(); } - inbound_queues_empty_ = true; return true; } diff --git a/validator/impl/validate-query.hpp b/validator/impl/validate-query.hpp index 942b32a5..b1d52819 100644 --- a/validator/impl/validate-query.hpp +++ b/validator/impl/validate-query.hpp @@ -210,6 +210,7 @@ class ValidateQuery : public td::actor::Actor { std::map> aux_mc_states_; block::ShardState ps_, ns_; + bool processed_upto_updated_{false}; std::unique_ptr sibling_out_msg_queue_; std::shared_ptr sibling_processed_upto_; @@ -223,7 +224,6 @@ class ValidateQuery : public td::actor::Actor { ton::LogicalTime proc_lt_{0}, claimed_proc_lt_{0}, min_enq_lt_{~0ULL}; ton::Bits256 proc_hash_ = ton::Bits256::zero(), claimed_proc_hash_, min_enq_hash_; - bool inbound_queues_empty_{false}; std::vector> msg_proc_lt_;