1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Fix validating inbound msg queues

This commit is contained in:
SpyCheese 2023-07-21 19:29:29 +03:00
parent 869c6fe675
commit 32b3fe748a
3 changed files with 14 additions and 34 deletions

View file

@ -3805,6 +3805,9 @@ bool Collator::store_master_ref(vm::CellBuilder& cb) {
bool Collator::update_processed_upto() { bool Collator::update_processed_upto() {
auto ref_mc_seqno = is_masterchain() ? new_block_seqno : prev_mc_block_seqno; auto ref_mc_seqno = is_masterchain() ? new_block_seqno : prev_mc_block_seqno;
update_min_mc_seqno(ref_mc_seqno); update_min_mc_seqno(ref_mc_seqno);
if (in_msg_dict->is_empty()) {
return true;
}
if (last_proc_int_msg_.first) { if (last_proc_int_msg_.first) {
if (!processed_upto_->insert(ref_mc_seqno, last_proc_int_msg_.first, last_proc_int_msg_.second.cbits())) { if (!processed_upto_->insert(ref_mc_seqno, last_proc_int_msg_.first, last_proc_int_msg_.second.cbits())) {
return fatal_error("cannot update our ProcessedUpto to reflect processed inbound message"); return fatal_error("cannot update our ProcessedUpto to reflect processed inbound message");

View file

@ -3927,6 +3927,7 @@ bool ValidateQuery::check_processed_upto() {
if (!ok) { if (!ok) {
return reject_query("new ProcessedInfo is not obtained from old ProcessedInfo by adding at most one new entry"); return reject_query("new ProcessedInfo is not obtained from old ProcessedInfo by adding at most one new entry");
} }
processed_upto_updated_ = upd;
if (upd) { if (upd) {
if (upd->shard != shard_.shard) { if (upd->shard != shard_.shard) {
return reject_query("newly-added ProcessedInfo entry refers to shard "s + return reject_query("newly-added ProcessedInfo entry refers to shard "s +
@ -4007,37 +4008,8 @@ bool ValidateQuery::check_neighbor_outbound_message(Ref<vm::CellSlice> enq_msg,
key.to_hex(352) + " of neighbor " + nb.blk_.to_str()); key.to_hex(352) + " of neighbor " + nb.blk_.to_str());
} }
if (shard_contains(shard_, enq.cur_prefix_)) { if (shard_contains(shard_, enq.cur_prefix_)) {
// if this message comes from our own outbound queue, we must have dequeued it // this message couldn't come from our own outbound queue because processed messages from our queue don't stay here
if (out_entry.is_null()) { return fatal_error("have an already processed EnqueuedMsg from our shard: "s + key.to_hex(352));
return reject_query("our old outbound queue contains EnqueuedMsg with key "s + key.to_hex(352) +
" already processed by this shard, but there is no ext_message_deq OutMsg record for this "
"message in this block");
}
int tag = block::gen::t_OutMsg.get_tag(*out_entry);
if (tag == block::gen::OutMsg::msg_export_deq_short) {
block::gen::OutMsg::Record_msg_export_deq_short deq;
if (!tlb::csr_unpack(std::move(out_entry), deq)) {
return reject_query(
"cannot unpack msg_export_deq_short OutMsg record for already processed EnqueuedMsg with key "s +
key.to_hex(352) + " of old outbound queue");
}
if (deq.msg_env_hash != enq.msg_env_->get_hash().bits()) {
return reject_query("unpack ext_message_deq OutMsg record for already processed EnqueuedMsg with key "s +
key.to_hex(352) + " of old outbound queue refers to MsgEnvelope with different hash " +
deq.msg_env_hash.to_hex());
}
} else {
block::gen::OutMsg::Record_msg_export_deq deq;
if (!tlb::csr_unpack(std::move(out_entry), deq)) {
return reject_query(
"cannot unpack msg_export_deq OutMsg record for already processed EnqueuedMsg with key "s +
key.to_hex(352) + " of old outbound queue");
}
if (deq.out_msg->get_hash() != enq.msg_env_->get_hash()) {
return reject_query("unpack ext_message_deq OutMsg record for already processed EnqueuedMsg with key "s +
key.to_hex(352) + " of old outbound queue contains a different MsgEnvelope");
}
}
} }
// next check is incorrect after a merge, when ns_.processed_upto has > 1 entries // next check is incorrect after a merge, when ns_.processed_upto has > 1 entries
// we effectively comment it out // we effectively comment it out
@ -4110,6 +4082,13 @@ bool ValidateQuery::check_in_queue() {
neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_); neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_);
} }
block::OutputQueueMerger nb_out_msgs(shard_, std::move(neighbor_queues)); block::OutputQueueMerger nb_out_msgs(shard_, std::move(neighbor_queues));
if (in_msg_dict_->is_empty()) {
LOG(DEBUG) << "in_msg_dict is empty, skip checking neighbors' message queues";
if (processed_upto_updated_) {
return reject_query("processed_upto was updated, but no messages were processed");
}
return true;
}
while (!nb_out_msgs.is_eof()) { while (!nb_out_msgs.is_eof()) {
auto kv = nb_out_msgs.extract_cur(); auto kv = nb_out_msgs.extract_cur();
CHECK(kv && kv->msg.not_null()); CHECK(kv && kv->msg.not_null());
@ -4130,12 +4109,10 @@ bool ValidateQuery::check_in_queue() {
neighbors_.at(kv->source).blk_.to_str()); neighbors_.at(kv->source).blk_.to_str());
} }
if (unprocessed) { if (unprocessed) {
inbound_queues_empty_ = false;
return true; return true;
} }
nb_out_msgs.next(); nb_out_msgs.next();
} }
inbound_queues_empty_ = true;
return true; return true;
} }

View file

@ -210,6 +210,7 @@ class ValidateQuery : public td::actor::Actor {
std::map<BlockSeqno, Ref<MasterchainStateQ>> aux_mc_states_; std::map<BlockSeqno, Ref<MasterchainStateQ>> aux_mc_states_;
block::ShardState ps_, ns_; block::ShardState ps_, ns_;
bool processed_upto_updated_{false};
std::unique_ptr<vm::AugmentedDictionary> sibling_out_msg_queue_; std::unique_ptr<vm::AugmentedDictionary> sibling_out_msg_queue_;
std::shared_ptr<block::MsgProcessedUptoCollection> sibling_processed_upto_; std::shared_ptr<block::MsgProcessedUptoCollection> sibling_processed_upto_;
@ -223,7 +224,6 @@ class ValidateQuery : public td::actor::Actor {
ton::LogicalTime proc_lt_{0}, claimed_proc_lt_{0}, min_enq_lt_{~0ULL}; ton::LogicalTime proc_lt_{0}, claimed_proc_lt_{0}, min_enq_lt_{~0ULL};
ton::Bits256 proc_hash_ = ton::Bits256::zero(), claimed_proc_hash_, min_enq_hash_; ton::Bits256 proc_hash_ = ton::Bits256::zero(), claimed_proc_hash_, min_enq_hash_;
bool inbound_queues_empty_{false};
std::vector<std::tuple<Bits256, LogicalTime, LogicalTime>> msg_proc_lt_; std::vector<std::tuple<Bits256, LogicalTime, LogicalTime>> msg_proc_lt_;