diff --git a/validator/full-node-shard.cpp b/validator/full-node-shard.cpp index 2101435f..56e0c1db 100644 --- a/validator/full-node-shard.cpp +++ b/validator/full-node-shard.cpp @@ -130,6 +130,9 @@ void FullNodeShardImpl::create_overlay() { } void FullNodeShardImpl::check_broadcast(PublicKeyHash src, td::BufferSlice broadcast, td::Promise promise) { + if (mode_ != FullNodeShardMode::active) { + return promise.set_error(td::Status::Error("cannot check broadcast: shard is not active")); + } auto B = fetch_tl_object(std::move(broadcast), true); if (B.is_error()) { return promise.set_error(B.move_as_error_prefix("failed to parse external message broadcast: ")); diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h index b886173a..58f87fce 100644 --- a/validator/impl/collator-impl.h +++ b/validator/impl/collator-impl.h @@ -197,7 +197,8 @@ class Collator final : public td::actor::Actor { std::vector, ExtMessage::Hash>> ext_msg_list_; std::priority_queue, std::greater> new_msgs; std::pair last_proc_int_msg_, first_unproc_int_msg_; - std::unique_ptr in_msg_dict, out_msg_dict, out_msg_queue_, sibling_out_msg_queue_; + std::unique_ptr in_msg_dict, out_msg_dict, old_out_msg_queue_, out_msg_queue_, + sibling_out_msg_queue_; std::unique_ptr ihr_pending; std::shared_ptr processed_upto_, sibling_processed_upto_; std::unique_ptr block_create_stats_; @@ -292,6 +293,7 @@ class Collator final : public td::actor::Actor { bool process_new_messages(bool enqueue_only = false); int process_one_new_message(block::NewOutMsg msg, bool enqueue_only = false, Ref* is_special = nullptr); bool process_inbound_internal_messages(); + bool precheck_inbound_message(Ref msg, ton::LogicalTime lt); bool process_inbound_message(Ref msg, ton::LogicalTime lt, td::ConstBitPtr key, const block::McShardDescr& src_nb); bool process_inbound_external_messages(); @@ -333,8 +335,11 @@ class Collator final : public td::actor::Actor { bool update_cc); bool create_mc_block_extra(Ref& mc_block_extra); bool create_block(); + Ref collate_shard_block_descr_set(); + bool prepare_msg_queue_proof(); bool create_collated_data(); + bool create_block_candidate(); void return_block_candidate(td::Result saved); bool update_last_proc_int_msg(const std::pair& new_lt_hash); diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index 3468bc5d..98f0799c 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -825,6 +825,7 @@ bool Collator::import_shard_state_data(block::ShardState& ss) { total_validator_fees_ = std::move(ss.total_validator_fees_); old_global_balance_ = std::move(ss.global_balance_); out_msg_queue_ = std::move(ss.out_msg_queue_); + old_out_msg_queue_ = std::make_unique(*out_msg_queue_); processed_upto_ = std::move(ss.processed_upto_); ihr_pending = std::move(ss.ihr_pending_); block_create_stats_ = std::move(ss.block_create_stats_); @@ -2619,8 +2620,7 @@ bool Collator::delete_out_msg_queue_msg(td::ConstBitPtr key) { return register_out_msg_queue_op(); } -bool Collator::process_inbound_message(Ref enq_msg, ton::LogicalTime lt, td::ConstBitPtr key, - const block::McShardDescr& src_nb) { +bool Collator::precheck_inbound_message(Ref enq_msg, ton::LogicalTime lt) { ton::LogicalTime enqueued_lt = 0; if (enq_msg.is_null() || enq_msg->size_ext() != 0x10040 || (enqueued_lt = enq_msg->prefetch_ulong(64)) < /* 0 */ 1 * lt) { // DEBUG @@ -2646,6 +2646,13 @@ bool Collator::process_inbound_message(Ref enq_msg, ton::LogicalT LOG(ERROR) << "inbound internal MsgEnvelope is invalid according to automated checks"; return false; } + return true; +} + +bool Collator::process_inbound_message(Ref enq_msg, ton::LogicalTime lt, td::ConstBitPtr key, + const block::McShardDescr& src_nb) { + ton::LogicalTime enqueued_lt = enq_msg->prefetch_ulong(64); + auto msg_env = enq_msg->prefetch_ref(); // 1. unpack MsgEnvelope block::tlb::MsgEnvelope::Record_std env; if (!tlb::unpack_cell(msg_env, env)) { @@ -2784,14 +2791,22 @@ bool Collator::process_inbound_message(Ref enq_msg, ton::LogicalT } bool Collator::process_inbound_internal_messages() { - while (!block_full_ && !nb_out_msgs_->is_eof()) { + while (!nb_out_msgs_->is_eof()) { block_full_ = !block_limit_status_->fits(block::ParamLimits::cl_normal); + auto kv = nb_out_msgs_->extract_cur(); + CHECK(kv && kv->msg.not_null()); + if (!precheck_inbound_message(kv->msg, kv->lt)) { + if (verbosity > 1) { + std::cerr << "invalid inbound message: lt=" << kv->lt << " from=" << kv->source << " key=" << kv->key.to_hex() + << " msg="; + block::gen::t_EnqueuedMsg.print(std::cerr, *(kv->msg)); + } + return fatal_error("error processing inbound internal message"); + } if (block_full_) { LOG(INFO) << "BLOCK FULL, stop processing inbound internal messages"; break; } - auto kv = nb_out_msgs_->extract_cur(); - CHECK(kv && kv->msg.not_null()); LOG(DEBUG) << "processing inbound message with (lt,hash)=(" << kv->lt << "," << kv->key.to_hex() << ") from neighbor #" << kv->source; if (verbosity > 2) { @@ -3974,6 +3989,34 @@ Ref Collator::collate_shard_block_descr_set() { return cell; } +bool Collator::prepare_msg_queue_proof() { + auto res = old_out_msg_queue_->scan_diff( + *out_msg_queue_, + [this](td::ConstBitPtr key, int key_len, Ref old_value, Ref new_value) { + old_value = old_out_msg_queue_->extract_value(std::move(old_value)); + new_value = out_msg_queue_->extract_value(std::move(new_value)); + if (new_value.not_null()) { + if (!block::gen::t_EnqueuedMsg.validate_csr(new_value)) { + return false; + } + if (!block::tlb::t_EnqueuedMsg.validate_csr(new_value)) { + return false; + } + } + if (old_value.not_null()) { + if (!block::gen::t_EnqueuedMsg.validate_csr(old_value)) { + return false; + } + if (!block::tlb::t_EnqueuedMsg.validate_csr(old_value)) { + return false; + } + } + return true; + }, + 3); + return res; +} + bool Collator::create_collated_data() { // 1. store the set of used shard block descriptions if (!used_shard_block_descr_.empty()) { @@ -3994,6 +4037,10 @@ bool Collator::create_collated_data() { // 3. Previous state proof (only shadchains) std::map> proofs; if (!is_masterchain()) { + if (!prepare_msg_queue_proof()) { + return fatal_error("cannot prepare message queue proof"); + } + state_usage_tree_->set_use_mark_for_is_loaded(false); Ref state_proof = vm::MerkleProof::generate(prev_state_root_, state_usage_tree_.get()); if (state_proof.is_null()) { diff --git a/validator/impl/validate-query.cpp b/validator/impl/validate-query.cpp index eeb2726c..aa342fc4 100644 --- a/validator/impl/validate-query.cpp +++ b/validator/impl/validate-query.cpp @@ -2678,7 +2678,7 @@ bool ValidateQuery::precheck_one_message_queue_update(td::ConstBitPtr out_msg_id } ton::LogicalTime enqueued_lt = old_value->prefetch_ulong(64); if (enqueued_lt >= start_lt_) { - return reject_query(PSTRING() << "new EnqueuedMsg with key "s + out_msg_id.to_hex(352) + " has enqueued_lt=" + return reject_query(PSTRING() << "old EnqueuedMsg with key "s + out_msg_id.to_hex(352) + " has enqueued_lt=" << enqueued_lt << " greater than or equal to this block's start_lt=" << start_lt_); } }