mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
Fix processing message queue in collator and validator
This commit is contained in:
parent
9e02853cbb
commit
7155bf5eca
3 changed files with 28 additions and 13 deletions
|
@ -3820,9 +3820,6 @@ bool Collator::store_master_ref(vm::CellBuilder& cb) {
|
||||||
bool Collator::update_processed_upto() {
|
bool Collator::update_processed_upto() {
|
||||||
auto ref_mc_seqno = is_masterchain() ? new_block_seqno : prev_mc_block_seqno;
|
auto ref_mc_seqno = is_masterchain() ? new_block_seqno : prev_mc_block_seqno;
|
||||||
update_min_mc_seqno(ref_mc_seqno);
|
update_min_mc_seqno(ref_mc_seqno);
|
||||||
if (in_msg_dict->is_empty()) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (last_proc_int_msg_.first) {
|
if (last_proc_int_msg_.first) {
|
||||||
if (!processed_upto_->insert(ref_mc_seqno, last_proc_int_msg_.first, last_proc_int_msg_.second.cbits())) {
|
if (!processed_upto_->insert(ref_mc_seqno, last_proc_int_msg_.first, last_proc_int_msg_.second.cbits())) {
|
||||||
return fatal_error("cannot update our ProcessedUpto to reflect processed inbound message");
|
return fatal_error("cannot update our ProcessedUpto to reflect processed inbound message");
|
||||||
|
|
|
@ -3974,7 +3974,7 @@ bool ValidateQuery::check_processed_upto() {
|
||||||
// similar to Collator::process_inbound_message
|
// similar to Collator::process_inbound_message
|
||||||
bool ValidateQuery::check_neighbor_outbound_message(Ref<vm::CellSlice> enq_msg, ton::LogicalTime lt,
|
bool ValidateQuery::check_neighbor_outbound_message(Ref<vm::CellSlice> enq_msg, ton::LogicalTime lt,
|
||||||
td::ConstBitPtr key, const block::McShardDescr& nb,
|
td::ConstBitPtr key, const block::McShardDescr& nb,
|
||||||
bool& unprocessed) {
|
bool& unprocessed, bool& processed_here, td::Bits256& msg_hash) {
|
||||||
unprocessed = false;
|
unprocessed = false;
|
||||||
block::EnqueuedMsgDescr enq;
|
block::EnqueuedMsgDescr enq;
|
||||||
if (!enq.unpack(enq_msg.write())) { // unpack EnqueuedMsg
|
if (!enq.unpack(enq_msg.write())) { // unpack EnqueuedMsg
|
||||||
|
@ -3995,6 +3995,8 @@ bool ValidateQuery::check_neighbor_outbound_message(Ref<vm::CellSlice> enq_msg,
|
||||||
auto out_entry = out_msg_dict_->lookup(key + 96, 256);
|
auto out_entry = out_msg_dict_->lookup(key + 96, 256);
|
||||||
bool f0 = ps_.processed_upto_->already_processed(enq);
|
bool f0 = ps_.processed_upto_->already_processed(enq);
|
||||||
bool f1 = ns_.processed_upto_->already_processed(enq);
|
bool f1 = ns_.processed_upto_->already_processed(enq);
|
||||||
|
processed_here = f1 && !f0;
|
||||||
|
msg_hash = enq.hash_;
|
||||||
if (f0 && !f1) {
|
if (f0 && !f1) {
|
||||||
return fatal_error(
|
return fatal_error(
|
||||||
"a previously processed message has been un-processed (impossible situation after the validation of "
|
"a previously processed message has been un-processed (impossible situation after the validation of "
|
||||||
|
@ -4074,6 +4076,18 @@ bool ValidateQuery::check_neighbor_outbound_message(Ref<vm::CellSlice> enq_msg,
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ValidateQuery::check_in_queue() {
|
bool ValidateQuery::check_in_queue() {
|
||||||
|
int imported_messages_count = 0;
|
||||||
|
in_msg_dict_->check_for_each_extra([&](Ref<vm::CellSlice> value, Ref<vm::CellSlice>, td::ConstBitPtr, int) {
|
||||||
|
int tag = block::gen::t_InMsg.get_tag(*value);
|
||||||
|
if (tag == block::gen::InMsg::msg_import_fin || tag == block::gen::InMsg::msg_import_tr) {
|
||||||
|
++imported_messages_count;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
if (imported_messages_count == 0 && claimed_proc_lt_ == 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
std::vector<block::OutputQueueMerger::Neighbor> neighbor_queues;
|
std::vector<block::OutputQueueMerger::Neighbor> neighbor_queues;
|
||||||
for (const auto& descr : neighbors_) {
|
for (const auto& descr : neighbors_) {
|
||||||
td::BitArray<96> key;
|
td::BitArray<96> key;
|
||||||
|
@ -4082,13 +4096,6 @@ bool ValidateQuery::check_in_queue() {
|
||||||
neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_);
|
neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_);
|
||||||
}
|
}
|
||||||
block::OutputQueueMerger nb_out_msgs(shard_, std::move(neighbor_queues));
|
block::OutputQueueMerger nb_out_msgs(shard_, std::move(neighbor_queues));
|
||||||
if (in_msg_dict_->is_empty()) {
|
|
||||||
LOG(DEBUG) << "in_msg_dict is empty, skip checking neighbors' message queues";
|
|
||||||
if (processed_upto_updated_) {
|
|
||||||
return reject_query("processed_upto was updated, but no messages were processed");
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
while (!nb_out_msgs.is_eof()) {
|
while (!nb_out_msgs.is_eof()) {
|
||||||
auto kv = nb_out_msgs.extract_cur();
|
auto kv = nb_out_msgs.extract_cur();
|
||||||
CHECK(kv && kv->msg.not_null());
|
CHECK(kv && kv->msg.not_null());
|
||||||
|
@ -4099,7 +4106,10 @@ bool ValidateQuery::check_in_queue() {
|
||||||
block::gen::t_EnqueuedMsg.print(std::cerr, *(kv->msg));
|
block::gen::t_EnqueuedMsg.print(std::cerr, *(kv->msg));
|
||||||
}
|
}
|
||||||
bool unprocessed = false;
|
bool unprocessed = false;
|
||||||
if (!check_neighbor_outbound_message(kv->msg, kv->lt, kv->key.cbits(), neighbors_.at(kv->source), unprocessed)) {
|
bool processed_here = false;
|
||||||
|
td::Bits256 msg_hash;
|
||||||
|
if (!check_neighbor_outbound_message(kv->msg, kv->lt, kv->key.cbits(), neighbors_.at(kv->source), unprocessed,
|
||||||
|
processed_here, msg_hash)) {
|
||||||
if (verbosity > 1) {
|
if (verbosity > 1) {
|
||||||
std::cerr << "invalid neighbor outbound message: lt=" << kv->lt << " from=" << kv->source
|
std::cerr << "invalid neighbor outbound message: lt=" << kv->lt << " from=" << kv->source
|
||||||
<< " key=" << kv->key.to_hex() << " msg=";
|
<< " key=" << kv->key.to_hex() << " msg=";
|
||||||
|
@ -4108,6 +4118,13 @@ bool ValidateQuery::check_in_queue() {
|
||||||
return reject_query("error processing outbound internal message "s + kv->key.to_hex() + " of neighbor " +
|
return reject_query("error processing outbound internal message "s + kv->key.to_hex() + " of neighbor " +
|
||||||
neighbors_.at(kv->source).blk_.to_str());
|
neighbors_.at(kv->source).blk_.to_str());
|
||||||
}
|
}
|
||||||
|
if (processed_here) {
|
||||||
|
--imported_messages_count;
|
||||||
|
}
|
||||||
|
auto msg_lt = kv->lt;
|
||||||
|
if (imported_messages_count == 0 && msg_lt == claimed_proc_lt_ && msg_hash == claimed_proc_hash_) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
if (unprocessed) {
|
if (unprocessed) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -340,7 +340,8 @@ class ValidateQuery : public td::actor::Actor {
|
||||||
bool check_out_msg_descr();
|
bool check_out_msg_descr();
|
||||||
bool check_processed_upto();
|
bool check_processed_upto();
|
||||||
bool check_neighbor_outbound_message(Ref<vm::CellSlice> enq_msg, ton::LogicalTime lt, td::ConstBitPtr key,
|
bool check_neighbor_outbound_message(Ref<vm::CellSlice> enq_msg, ton::LogicalTime lt, td::ConstBitPtr key,
|
||||||
const block::McShardDescr& src_nb, bool& unprocessed);
|
const block::McShardDescr& src_nb, bool& unprocessed, bool& processed_here,
|
||||||
|
td::Bits256& msg_hash);
|
||||||
bool check_in_queue();
|
bool check_in_queue();
|
||||||
bool check_delivered_dequeued();
|
bool check_delivered_dequeued();
|
||||||
std::unique_ptr<block::Account> make_account_from(td::ConstBitPtr addr, Ref<vm::CellSlice> account,
|
std::unique_ptr<block::Account> make_account_from(td::ConstBitPtr addr, Ref<vm::CellSlice> account,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue