From 7155bf5ecad50700bc4d50a6eba98ebd879fca34 Mon Sep 17 00:00:00 2001 From: SpyCheese Date: Sun, 13 Aug 2023 20:37:59 +0300 Subject: [PATCH] Fix processing message queue in collator and validator --- validator/impl/collator.cpp | 3 --- validator/impl/validate-query.cpp | 35 +++++++++++++++++++++++-------- validator/impl/validate-query.hpp | 3 ++- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index d2ace04f..4eada9fd 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -3820,9 +3820,6 @@ bool Collator::store_master_ref(vm::CellBuilder& cb) { bool Collator::update_processed_upto() { auto ref_mc_seqno = is_masterchain() ? new_block_seqno : prev_mc_block_seqno; update_min_mc_seqno(ref_mc_seqno); - if (in_msg_dict->is_empty()) { - return true; - } if (last_proc_int_msg_.first) { if (!processed_upto_->insert(ref_mc_seqno, last_proc_int_msg_.first, last_proc_int_msg_.second.cbits())) { return fatal_error("cannot update our ProcessedUpto to reflect processed inbound message"); diff --git a/validator/impl/validate-query.cpp b/validator/impl/validate-query.cpp index 8d0b5173..ecce370d 100644 --- a/validator/impl/validate-query.cpp +++ b/validator/impl/validate-query.cpp @@ -3974,7 +3974,7 @@ bool ValidateQuery::check_processed_upto() { // similar to Collator::process_inbound_message bool ValidateQuery::check_neighbor_outbound_message(Ref enq_msg, ton::LogicalTime lt, td::ConstBitPtr key, const block::McShardDescr& nb, - bool& unprocessed) { + bool& unprocessed, bool& processed_here, td::Bits256& msg_hash) { unprocessed = false; block::EnqueuedMsgDescr enq; if (!enq.unpack(enq_msg.write())) { // unpack EnqueuedMsg @@ -3995,6 +3995,8 @@ bool ValidateQuery::check_neighbor_outbound_message(Ref enq_msg, auto out_entry = out_msg_dict_->lookup(key + 96, 256); bool f0 = ps_.processed_upto_->already_processed(enq); bool f1 = ns_.processed_upto_->already_processed(enq); + processed_here = f1 && !f0; + msg_hash = enq.hash_; if (f0 && !f1) { return fatal_error( "a previously processed message has been un-processed (impossible situation after the validation of " @@ -4074,6 +4076,18 @@ bool ValidateQuery::check_neighbor_outbound_message(Ref enq_msg, } bool ValidateQuery::check_in_queue() { + int imported_messages_count = 0; + in_msg_dict_->check_for_each_extra([&](Ref value, Ref, td::ConstBitPtr, int) { + int tag = block::gen::t_InMsg.get_tag(*value); + if (tag == block::gen::InMsg::msg_import_fin || tag == block::gen::InMsg::msg_import_tr) { + ++imported_messages_count; + } + return true; + }); + if (imported_messages_count == 0 && claimed_proc_lt_ == 0) { + return true; + } + std::vector neighbor_queues; for (const auto& descr : neighbors_) { td::BitArray<96> key; @@ -4082,13 +4096,6 @@ bool ValidateQuery::check_in_queue() { neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_); } block::OutputQueueMerger nb_out_msgs(shard_, std::move(neighbor_queues)); - if (in_msg_dict_->is_empty()) { - LOG(DEBUG) << "in_msg_dict is empty, skip checking neighbors' message queues"; - if (processed_upto_updated_) { - return reject_query("processed_upto was updated, but no messages were processed"); - } - return true; - } while (!nb_out_msgs.is_eof()) { auto kv = nb_out_msgs.extract_cur(); CHECK(kv && kv->msg.not_null()); @@ -4099,7 +4106,10 @@ bool ValidateQuery::check_in_queue() { block::gen::t_EnqueuedMsg.print(std::cerr, *(kv->msg)); } bool unprocessed = false; - if (!check_neighbor_outbound_message(kv->msg, kv->lt, kv->key.cbits(), neighbors_.at(kv->source), unprocessed)) { + bool processed_here = false; + td::Bits256 msg_hash; + if (!check_neighbor_outbound_message(kv->msg, kv->lt, kv->key.cbits(), neighbors_.at(kv->source), unprocessed, + processed_here, msg_hash)) { if (verbosity > 1) { std::cerr << "invalid neighbor outbound message: lt=" << kv->lt << " from=" << kv->source << " key=" << kv->key.to_hex() << " msg="; @@ -4108,6 +4118,13 @@ bool ValidateQuery::check_in_queue() { return reject_query("error processing outbound internal message "s + kv->key.to_hex() + " of neighbor " + neighbors_.at(kv->source).blk_.to_str()); } + if (processed_here) { + --imported_messages_count; + } + auto msg_lt = kv->lt; + if (imported_messages_count == 0 && msg_lt == claimed_proc_lt_ && msg_hash == claimed_proc_hash_) { + return true; + } if (unprocessed) { return true; } diff --git a/validator/impl/validate-query.hpp b/validator/impl/validate-query.hpp index b1d52819..eb052886 100644 --- a/validator/impl/validate-query.hpp +++ b/validator/impl/validate-query.hpp @@ -340,7 +340,8 @@ class ValidateQuery : public td::actor::Actor { bool check_out_msg_descr(); bool check_processed_upto(); bool check_neighbor_outbound_message(Ref enq_msg, ton::LogicalTime lt, td::ConstBitPtr key, - const block::McShardDescr& src_nb, bool& unprocessed); + const block::McShardDescr& src_nb, bool& unprocessed, bool& processed_here, + td::Bits256& msg_hash); bool check_in_queue(); bool check_delivered_dequeued(); std::unique_ptr make_account_from(td::ConstBitPtr addr, Ref account,