diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index 4eada9fd..d9def690 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -236,12 +236,12 @@ void Collator::start_up() { } if (is_masterchain() && !is_hardfork_) { // 5. load shard block info messages - LOG(DEBUG) << "sending get_shard_blocks() query to Manager"; + LOG(DEBUG) << "sending get_shard_blocks_for_collator() query to Manager"; ++pending; td::actor::send_closure_later( - manager, &ValidatorManager::get_shard_blocks, prev_blocks[0], + manager, &ValidatorManager::get_shard_blocks_for_collator, prev_blocks[0], [self = get_self()](td::Result>> res) -> void { - LOG(DEBUG) << "got answer to get_shard_blocks() query"; + LOG(DEBUG) << "got answer to get_shard_blocks_for_collator() query"; td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_blocks, std::move(res)); }); } @@ -1405,8 +1405,8 @@ bool Collator::import_new_shard_top_blocks() { prev_descr.clear(); descr.clear(); } else { - LOG(INFO) << "updated top shard block information with " << sh_bd->block_id().to_str() << " and " - << prev_bd->block_id().to_str(); + LOG(DEBUG) << "updated top shard block information with " << sh_bd->block_id().to_str() << " and " + << prev_bd->block_id().to_str(); CHECK(ures.move_as_ok()); store_shard_fees(std::move(prev_descr)); store_shard_fees(std::move(descr)); @@ -1448,7 +1448,7 @@ bool Collator::import_new_shard_top_blocks() { store_shard_fees(std::move(descr)); register_shard_block_creators(sh_bd->get_creator_list(chain_len)); shards_max_end_lt_ = std::max(shards_max_end_lt_, end_lt); - LOG(INFO) << "updated top shard block information with " << sh_bd->block_id().to_str(); + LOG(DEBUG) << "updated top shard block information with " << sh_bd->block_id().to_str(); CHECK(ures.move_as_ok()); ++tb_act; used_shard_block_descr_.emplace_back(sh_bd); @@ -1456,10 +1456,13 @@ bool Collator::import_new_shard_top_blocks() { if (tb_act) { shard_conf_adjusted_ = true; } - if (tb_act && verbosity >= 0) { // DEBUG - LOG(INFO) << "updated shard block configuration to "; - auto csr = shard_conf_->get_root_csr(); - block::gen::t_ShardHashes.print(std::cerr, csr.write()); + if (tb_act) { + LOG(INFO) << "updated shard block configuration: " << tb_act << " new top shard blocks"; + if (verbosity >= 1) { + LOG(INFO) << "updated shard block configuration to "; + auto csr = shard_conf_->get_root_csr(); + block::gen::t_ShardHashes.print(std::cerr, csr.write()); + } } block::gen::ShardFeeCreated::Record fc; if (!(tlb::csr_unpack(fees_import_dict_->get_root_extra(), diff --git a/validator/impl/out-msg-queue-proof.cpp b/validator/impl/out-msg-queue-proof.cpp index 2ffc52de..d8e02653 100644 --- a/validator/impl/out-msg-queue-proof.cpp +++ b/validator/impl/out-msg-queue-proof.cpp @@ -289,50 +289,6 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs( promise.set_value({}); return; } - if (dst_shard.is_masterchain() && blocks.size() != 1) { - // We spit queries for masterchain {dst_shard, {block_1, ..., block_n}} into separate queries - // {dst_shard, {block_1}}, ..., {dst_shard, {block_n}} - class Worker : public td::actor::Actor { - public: - Worker(size_t pending, td::Promise>> promise) - : pending_(pending), promise_(std::move(promise)) { - CHECK(pending_ > 0); - } - - void on_result(td::Ref res) { - result_[res->block_id_] = res; - if (--pending_ == 0) { - promise_.set_result(std::move(result_)); - stop(); - } - } - - void on_error(td::Status error) { - promise_.set_error(std::move(error)); - stop(); - } - - private: - size_t pending_; - td::Promise>> promise_; - std::map> result_; - }; - auto worker = td::actor::create_actor("queueworker", blocks.size(), std::move(promise)).release(); - for (const BlockIdExt& block : blocks) { - get_neighbor_msg_queue_proofs(dst_shard, {block}, timeout, - [=](td::Result>> R) { - if (R.is_error()) { - td::actor::send_closure(worker, &Worker::on_error, R.move_as_error()); - } else { - auto res = R.move_as_ok(); - CHECK(res.size() == 1); - td::actor::send_closure(worker, &Worker::on_result, res.begin()->second); - } - }); - } - return; - } - std::sort(blocks.begin(), blocks.end()); auto entry = cache_[{dst_shard, blocks}]; if (entry) { diff --git a/validator/impl/out-msg-queue-proof.hpp b/validator/impl/out-msg-queue-proof.hpp index 33f6c399..c115a276 100644 --- a/validator/impl/out-msg-queue-proof.hpp +++ b/validator/impl/out-msg-queue-proof.hpp @@ -72,7 +72,7 @@ class OutMsgQueueImporter : public td::actor::Actor { void finish_query(std::shared_ptr entry); bool check_timeout(std::shared_ptr entry); - constexpr static const double CACHE_TTL = 30.0; + constexpr static const double CACHE_TTL = 60.0; }; class BuildOutMsgQueueProof : public td::actor::Actor { diff --git a/validator/interfaces/validator-manager.h b/validator/interfaces/validator-manager.h index c09d4293..f63e453d 100644 --- a/validator/interfaces/validator-manager.h +++ b/validator/interfaces/validator-manager.h @@ -102,8 +102,8 @@ class ValidatorManager : public ValidatorManagerInterface { td::Promise> promise) = 0; virtual void get_external_messages(ShardIdFull shard, td::Promise>> promise) = 0; virtual void get_ihr_messages(ShardIdFull shard, td::Promise>> promise) = 0; - virtual void get_shard_blocks(BlockIdExt masterchain_block_id, - td::Promise>> promise) = 0; + virtual void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id, + td::Promise>> promise) = 0; virtual void complete_external_messages(std::vector to_delay, std::vector to_delete) = 0; virtual void complete_ihr_messages(std::vector to_delay, diff --git a/validator/manager-disk.cpp b/validator/manager-disk.cpp index 5a314e2f..15954e47 100644 --- a/validator/manager-disk.cpp +++ b/validator/manager-disk.cpp @@ -516,8 +516,8 @@ void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise>> promise) { +void ValidatorManagerImpl::get_shard_blocks_for_collator( + BlockIdExt masterchain_block_id, td::Promise>> promise) { if (!last_masterchain_block_handle_) { promise.set_result(std::vector>{}); return; diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index bad3fc35..53c399ed 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -193,8 +193,8 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void get_external_messages(ShardIdFull shard, td::Promise>> promise) override; void get_ihr_messages(ShardIdFull shard, td::Promise>> promise) override; - void get_shard_blocks(BlockIdExt masterchain_block_id, - td::Promise>> promise) override; + void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id, + td::Promise>> promise) override; void complete_external_messages(std::vector to_delay, std::vector to_delete) override; void complete_ihr_messages(std::vector to_delay, std::vector to_delete) override; diff --git a/validator/manager-hardfork.cpp b/validator/manager-hardfork.cpp index 80a64d25..d1be057f 100644 --- a/validator/manager-hardfork.cpp +++ b/validator/manager-hardfork.cpp @@ -366,8 +366,8 @@ void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise>> promise) { +void ValidatorManagerImpl::get_shard_blocks_for_collator( + BlockIdExt masterchain_block_id, td::Promise>> promise) { } void ValidatorManagerImpl::get_block_data_from_db(ConstBlockHandle handle, td::Promise> promise) { diff --git a/validator/manager-hardfork.hpp b/validator/manager-hardfork.hpp index 33b6a1a7..be8e0824 100644 --- a/validator/manager-hardfork.hpp +++ b/validator/manager-hardfork.hpp @@ -233,7 +233,7 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void get_external_messages(ShardIdFull shard, td::Promise>> promise) override; void get_ihr_messages(ShardIdFull shard, td::Promise>> promise) override; - void get_shard_blocks(BlockIdExt masterchain_block_id, + void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id, td::Promise>> promise) override; void complete_external_messages(std::vector to_delay, std::vector to_delete) override { diff --git a/validator/manager.cpp b/validator/manager.cpp index af5a7c37..5b5ad43e 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -441,7 +441,7 @@ void ValidatorManagerImpl::new_shard_block(BlockIdExt block_id, CatchainSeqno cc return; } auto it = shard_blocks_.find(ShardTopBlockDescriptionId{block_id.shard_full(), cc_seqno}); - if (it != shard_blocks_.end() && block_id.id.seqno <= it->second->block_id().id.seqno) { + if (it != shard_blocks_.end() && block_id.id.seqno <= it->second.latest_desc->block_id().id.seqno) { VLOG(VALIDATOR_DEBUG) << "dropping duplicate shard block broadcast"; return; } @@ -459,11 +459,11 @@ void ValidatorManagerImpl::new_shard_block(BlockIdExt block_id, CatchainSeqno cc void ValidatorManagerImpl::add_shard_block_description(td::Ref desc) { if (desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) { auto it = shard_blocks_.find(ShardTopBlockDescriptionId{desc->shard(), desc->catchain_seqno()}); - if (it != shard_blocks_.end() && desc->block_id().id.seqno <= it->second->block_id().id.seqno) { + if (it != shard_blocks_.end() && desc->block_id().id.seqno <= it->second.latest_desc->block_id().id.seqno) { VLOG(VALIDATOR_DEBUG) << "dropping duplicate shard block broadcast"; return; } - shard_blocks_[ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()}] = desc; + shard_blocks_[ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()}].latest_desc = desc; VLOG(VALIDATOR_DEBUG) << "new shard block descr for " << desc->block_id(); if (need_monitor(desc->block_id().shard_full())) { auto P = td::PromiseCreator::lambda([](td::Result> R) { @@ -478,13 +478,53 @@ void ValidatorManagerImpl::add_shard_block_description(td::Refblock_id(), 0, td::Timestamp::in(60.0), std::move(P)); } - if (collating_masterchain() && desc->generated_at() > td::Clocks::system() - 20) { - wait_neighbor_msg_queue_proofs(ShardIdFull{masterchainId}, {desc->block_id()}, td::Timestamp::in(15.0), - [](td::Result>>) {}); + if (collating_masterchain()) { + preload_msg_queue_to_masterchain(desc); } } } +void ValidatorManagerImpl::preload_msg_queue_to_masterchain(td::Ref desc) { + auto id = ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()}; + auto it = shard_blocks_.find(id); + if (!collating_masterchain() || it == shard_blocks_.end() || it->second.latest_desc->block_id() != desc->block_id()) { + return; + } + wait_neighbor_msg_queue_proofs( + ShardIdFull{masterchainId}, {desc->block_id()}, td::Timestamp::in(10.0), + [=, SelfId = actor_id(this), + retry_at = td::Timestamp::in(1.0)](td::Result>> R) { + if (R.is_error()) { + delay_action( + [=]() { td::actor::send_closure(SelfId, &ValidatorManagerImpl::preload_msg_queue_to_masterchain, desc); }, + retry_at); + return; + } + auto res = R.move_as_ok(); + auto &queue = res[desc->block_id()]; + CHECK(queue.not_null()); + td::actor::send_closure(SelfId, &ValidatorManagerImpl::loaded_msg_queue_to_masterchain, desc, std::move(queue)); + }); +} + +void ValidatorManagerImpl::loaded_msg_queue_to_masterchain(td::Ref desc, + td::Ref res) { + auto id = ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()}; + auto it = shard_blocks_.find(id); + if (it == shard_blocks_.end()) { + return; + } + auto &info = it->second; + if (info.ready_desc.is_null() || info.ready_desc->block_id().seqno() < desc->block_id().seqno()) { + VLOG(VALIDATOR_DEBUG) << "loaded out msg queue to masterchain from " << desc->block_id(); + if (info.ready_desc.not_null()) { + cached_msg_queue_to_masterchain_.erase(info.ready_desc->block_id()); + } + info.ready_desc = desc; + cached_msg_queue_to_masterchain_[desc->block_id()] = std::move(res); + } +} + void ValidatorManagerImpl::add_ext_server_id(adnl::AdnlNodeIdShort id) { class Cb : public adnl::Adnl::Callback { private: @@ -629,6 +669,56 @@ void ValidatorManagerImpl::wait_neighbor_msg_queue_proofs( out_msg_queue_importer_ = td::actor::create_actor("outmsgqueueimporter", actor_id(this), opts_, last_masterchain_state_); } + if (dst_shard.is_masterchain()) { + // We spit queries for masterchain {dst_shard, {block_1, ..., block_n}} into separate queries + // {dst_shard, {block_1}}, ..., {dst_shard, {block_n}} + // Also, use cache + class Worker : public td::actor::Actor { + public: + Worker(size_t pending, td::Promise>> promise) + : pending_(pending), promise_(std::move(promise)) { + CHECK(pending_ > 0); + } + + void on_result(td::Ref res) { + result_[res->block_id_] = res; + if (--pending_ == 0) { + promise_.set_result(std::move(result_)); + stop(); + } + } + + void on_error(td::Status error) { + promise_.set_error(std::move(error)); + stop(); + } + + private: + size_t pending_; + td::Promise>> promise_; + std::map> result_; + }; + auto worker = td::actor::create_actor("queueworker", blocks.size(), std::move(promise)).release(); + for (const BlockIdExt &block : blocks) { + auto it = cached_msg_queue_to_masterchain_.find(block); + if (it != cached_msg_queue_to_masterchain_.end()) { + td::actor::send_closure(worker, &Worker::on_result, it->second); + continue; + } + td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::get_neighbor_msg_queue_proofs, dst_shard, + std::vector{1, block}, timeout, + [=](td::Result>> R) { + if (R.is_error()) { + td::actor::send_closure(worker, &Worker::on_error, R.move_as_error()); + } else { + auto res = R.move_as_ok(); + CHECK(res.size() == 1); + td::actor::send_closure(worker, &Worker::on_result, res.begin()->second); + } + }); + } + return; + } td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::get_neighbor_msg_queue_proofs, dst_shard, std::move(blocks), timeout, std::move(promise)); } @@ -843,11 +933,14 @@ void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise>> promise) { +void ValidatorManagerImpl::get_shard_blocks_for_collator( + BlockIdExt masterchain_block_id, td::Promise>> promise) { std::vector> v; for (auto &b : shard_blocks_) { - v.push_back(b.second); + if (b.second.ready_desc.is_null()) { + continue; + } + v.push_back(b.second.ready_desc); } promise.set_value(std::move(v)); } @@ -1786,7 +1879,7 @@ void ValidatorManagerImpl::new_masterchain_block() { } if (is_collator()) { std::set collating_shards; - if (validating_masterchain()) { + if (collating_masterchain()) { collating_shards.emplace(masterchainId); } for (const auto &collator : collator_nodes_) { @@ -2080,12 +2173,19 @@ void ValidatorManagerImpl::update_shard_blocks() { auto it = shard_blocks_.begin(); while (it != shard_blocks_.end()) { auto &B = it->second; - if (!B->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) { - auto it2 = it++; - shard_blocks_.erase(it2); - } else { - ++it; + if (!B.latest_desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) { + if (B.ready_desc.not_null()) { + cached_msg_queue_to_masterchain_.erase(B.ready_desc->block_id()); + } + it = shard_blocks_.erase(it); + continue; } + if (B.ready_desc.not_null() && + !B.ready_desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) { + cached_msg_queue_to_masterchain_.erase(B.ready_desc->block_id()); + B.ready_desc = {}; + } + ++it; } } diff --git a/validator/manager.hpp b/validator/manager.hpp index f8ca08f3..45a0c1e6 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -220,7 +220,15 @@ class ValidatorManagerImpl : public ValidatorManager { } }; // DATA FOR COLLATOR - std::map> shard_blocks_; + // Shard block will not be used until queue is ready (to avoid too long masterchain collation) + // latest_desc - latest known block + // ready_desc - block with ready msg queue (may be null) + struct ShardTopBlock { + td::Ref latest_desc; + td::Ref ready_desc; + }; + std::map shard_blocks_; + std::map> cached_msg_queue_to_masterchain_; std::map, std::unique_ptr>> ext_messages_; std::map, std::map>> ext_addr_messages_; std::map> ext_messages_hashes_; @@ -410,8 +418,8 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void get_external_messages(ShardIdFull shard, td::Promise>> promise) override; void get_ihr_messages(ShardIdFull shard, td::Promise>> promise) override; - void get_shard_blocks(BlockIdExt masterchain_block_id, - td::Promise>> promise) override; + void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id, + td::Promise>> promise) override; void complete_external_messages(std::vector to_delay, std::vector to_delete) override; void complete_ihr_messages(std::vector to_delay, std::vector to_delete) override; @@ -493,6 +501,8 @@ class ValidatorManagerImpl : public ValidatorManager { } void add_shard_block_description(td::Ref desc); + void preload_msg_queue_to_masterchain(td::Ref desc); + void loaded_msg_queue_to_masterchain(td::Ref desc, td::Ref res); void register_block_handle(BlockHandle handle);