diff --git a/crypto/block/block.h b/crypto/block/block.h index 6a94f0aa..f2c89286 100644 --- a/crypto/block/block.h +++ b/crypto/block/block.h @@ -218,9 +218,12 @@ static inline std::ostream& operator<<(std::ostream& os, const MsgProcessedUptoC struct ImportedMsgQueueLimits { // Default values - td::uint32 max_bytes = 1 << 19; - td::uint32 max_msgs = 500; + td::uint32 max_bytes = 1 << 16; + td::uint32 max_msgs = 30; bool deserialize(vm::CellSlice& cs); + ImportedMsgQueueLimits operator*(td::uint32 x) const { + return {max_bytes * x, max_msgs * x}; + } }; struct ParamLimits { diff --git a/validator/full-node-shard.cpp b/validator/full-node-shard.cpp index 5fcd62c6..3d9d1923 100644 --- a/validator/full-node-shard.cpp +++ b/validator/full-node-shard.cpp @@ -960,26 +960,26 @@ void FullNodeShardImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std: create_tl_shard_id(dst_shard), std::move(blocks_tl), create_tl_object(limits.max_bytes, limits.max_msgs)); - auto P = td::PromiseCreator::lambda([=, promise = create_neighbour_promise(b, std::move(promise), true), - blocks = std::move(blocks)](td::Result R) mutable { - if (R.is_error()) { - promise.set_result(R.move_as_error()); - return; - } - TRY_RESULT_PROMISE(promise, f, fetch_tl_object(R.move_as_ok(), true)); - ton_api::downcast_call( - *f, td::overloaded( - [&](ton_api::tonNode_outMsgQueueProofEmpty &x) { - promise.set_error(td::Status::Error("node doesn't have this block")); - }, - [&](ton_api::tonNode_outMsgQueueProof &x) { - delay_action( - [=, promise = std::move(promise), blocks = std::move(blocks), x = std::move(x)]() mutable { - promise.set_result(OutMsgQueueProof::fetch(dst_shard, blocks, limits, x)); - }, - td::Timestamp::now()); - })); - }); + auto P = td::PromiseCreator::lambda( + [=, promise = std::move(promise), blocks = std::move(blocks)](td::Result R) mutable { + if (R.is_error()) { + promise.set_result(R.move_as_error()); + return; + } + TRY_RESULT_PROMISE(promise, f, fetch_tl_object(R.move_as_ok(), true)); + ton_api::downcast_call( + *f, td::overloaded( + [&](ton_api::tonNode_outMsgQueueProofEmpty &x) { + promise.set_error(td::Status::Error("node doesn't have this block")); + }, + [&](ton_api::tonNode_outMsgQueueProof &x) { + delay_action( + [=, promise = std::move(promise), blocks = std::move(blocks), x = std::move(x)]() mutable { + promise.set_result(OutMsgQueueProof::fetch(dst_shard, blocks, limits, x)); + }, + td::Timestamp::now()); + })); + }); td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, b.adnl_id, adnl_id_, overlay_id_, "get_msg_queue", std::move(P), timeout, std::move(query), 1 << 22, rldp_); } diff --git a/validator/impl/out-msg-queue-proof.cpp b/validator/impl/out-msg-queue-proof.cpp index d0c3be95..3512bf0d 100644 --- a/validator/impl/out-msg-queue-proof.cpp +++ b/validator/impl/out-msg-queue-proof.cpp @@ -285,6 +285,54 @@ void OutMsgQueueImporter::new_masterchain_block_notification(td::Ref blocks, td::Timestamp timeout, td::Promise>> promise) { + if (blocks.empty()) { + promise.set_value({}); + return; + } + if (dst_shard.is_masterchain() && blocks.size() != 1) { + // We spit queries for masterchain {dst_shard, {block_1, ..., block_n}} into separate queries + // {dst_shard, {block_1}}, ..., {dst_shard, {block_n}} + class Worker : public td::actor::Actor { + public: + Worker(size_t pending, td::Promise>> promise) + : pending_(pending), promise_(std::move(promise)) { + CHECK(pending_ > 0); + } + + void on_result(td::Ref res) { + result_[res->block_id_] = res; + if (--pending_ == 0) { + promise_.set_result(std::move(result_)); + stop(); + } + } + + void on_error(td::Status error) { + promise_.set_error(std::move(error)); + stop(); + } + + private: + size_t pending_; + td::Promise>> promise_; + std::map> result_; + }; + auto worker = td::actor::create_actor("queueworker", blocks.size(), std::move(promise)).release(); + for (const BlockIdExt& block : blocks) { + get_neighbor_msg_queue_proofs(dst_shard, {block}, timeout, + [=](td::Result>> R) { + if (R.is_error()) { + td::actor::send_closure(worker, &Worker::on_error, R.move_as_error()); + } else { + auto res = R.move_as_ok(); + CHECK(res.size() == 1); + td::actor::send_closure(worker, &Worker::on_result, res.begin()->second); + } + }); + } + return; + } + std::sort(blocks.begin(), blocks.end()); auto entry = cache_[{dst_shard, blocks}]; if (entry) { @@ -326,7 +374,8 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs( ++entry->pending; for (size_t i = 0; i < p.second.size(); i += 16) { size_t j = std::min(i + 16, p.second.size()); - get_proof_import(entry, std::vector(p.second.begin() + i, p.second.begin() + j), limits); + get_proof_import(entry, std::vector(p.second.begin() + i, p.second.begin() + j), + limits * (td::uint32)(j - i)); } } if (entry->pending == 0) { @@ -341,7 +390,7 @@ void OutMsgQueueImporter::get_proof_local(std::shared_ptr entry, Blo td::actor::send_closure( manager_, &ValidatorManager::wait_block_state_short, block, 0, entry->timeout, [=, SelfId = actor_id(this), manager = manager_, timeout = entry->timeout, - retry_after = td::Timestamp::in(0.5)](td::Result> R) mutable { + retry_after = td::Timestamp::in(0.1)](td::Result> R) mutable { if (R.is_error()) { LOG(DEBUG) << "Failed to get block state for " << block.to_str() << ": " << R.move_as_error(); delay_action([=]() { td::actor::send_closure(SelfId, &OutMsgQueueImporter::get_proof_local, entry, block); }, @@ -380,7 +429,7 @@ void OutMsgQueueImporter::get_proof_import(std::shared_ptr entry, st } td::actor::send_closure( manager_, &ValidatorManager::send_get_out_msg_queue_proof_request, entry->dst_shard, blocks, limits, - [=, SelfId = actor_id(this), retry_after = td::Timestamp::in(0.5), + [=, SelfId = actor_id(this), retry_after = td::Timestamp::in(0.1), dst_shard = entry->dst_shard](td::Result>> R) { if (R.is_error()) { LOG(DEBUG) << "Failed to get out msg queue for " << dst_shard.to_str() << ": " << R.move_as_error(); diff --git a/validator/manager.cpp b/validator/manager.cpp index 955334db..af5a7c37 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -478,6 +478,10 @@ void ValidatorManagerImpl::add_shard_block_description(td::Refblock_id(), 0, td::Timestamp::in(60.0), std::move(P)); } + if (collating_masterchain() && desc->generated_at() > td::Clocks::system() - 20) { + wait_neighbor_msg_queue_proofs(ShardIdFull{masterchainId}, {desc->block_id()}, td::Timestamp::in(15.0), + [](td::Result>>) {}); + } } } @@ -2525,6 +2529,16 @@ bool ValidatorManagerImpl::validating_masterchain() { .is_zero(); } +bool ValidatorManagerImpl::collating_masterchain() { + if (masterchain_collators_) { + return true; + } + if (opts_->validator_mode() == ValidatorManagerOptions::validator_lite_all) { + return false; + } + return validating_masterchain(); +} + PublicKeyHash ValidatorManagerImpl::get_validator(ShardIdFull shard, td::Ref val_set) { for (auto &key : temp_keys_) { if (val_set->is_validator(key.bits256_value())) { @@ -2726,7 +2740,12 @@ void ValidatorManagerImpl::add_collator(adnl::AdnlNodeIdShort id, ShardIdFull sh it = collator_nodes_.emplace(id, Collator()).first; it->second.actor = td::actor::create_actor("collatornode", id, actor_id(this), adnl_, rldp_); } - it->second.shards.insert(shard); + if (!it->second.shards.insert(shard).second) { + return; + } + if (shard.is_masterchain()) { + ++masterchain_collators_; + } td::actor::send_closure(it->second.actor, &CollatorNode::add_shard, shard); } @@ -2735,10 +2754,16 @@ void ValidatorManagerImpl::del_collator(adnl::AdnlNodeIdShort id, ShardIdFull sh if (it == collator_nodes_.end()) { return; } - td::actor::send_closure(it->second.actor, &CollatorNode::del_shard, shard); - it->second.shards.erase(shard); + if (!it->second.shards.erase(shard)) { + return; + } + if (shard.is_masterchain()) { + --masterchain_collators_; + } if (it->second.shards.empty()) { collator_nodes_.erase(it); + } else { + td::actor::send_closure(it->second.actor, &CollatorNode::del_shard, shard); } } diff --git a/validator/manager.hpp b/validator/manager.hpp index ce5254ac..f8ca08f3 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -506,6 +506,7 @@ class ValidatorManagerImpl : public ValidatorManager { bool is_validator(); bool is_collator(); bool validating_masterchain(); + bool collating_masterchain(); PublicKeyHash get_validator(ShardIdFull shard, td::Ref val_set); ValidatorManagerImpl(td::Ref opts, std::string db_root, @@ -650,6 +651,7 @@ class ValidatorManagerImpl : public ValidatorManager { std::set shards; }; std::map collator_nodes_; + size_t masterchain_collators_ = 0; std::set extra_active_shards_; std::map last_validated_blocks_;