diff --git a/validator/collator-node.cpp b/validator/collator-node.cpp index 84ef9eaa..c11b01c8 100644 --- a/validator/collator-node.cpp +++ b/validator/collator-node.cpp @@ -58,6 +58,40 @@ void CollatorNode::add_shard(ShardIdFull shard) { shards_.push_back(shard); } +void CollatorNode::new_masterchain_block_notification(td::Ref state) { + std::vector top_blocks = {state->get_block_id()}; + std::vector next_shards; + if (collate_shard(ShardIdFull(masterchainId))) { + next_shards.push_back(ShardIdFull(masterchainId)); + } + for (const auto& desc : state->get_shards()) { + top_blocks.push_back(desc->top_block_id()); + ShardIdFull shard = desc->shard(); + if (desc->before_split()) { + if (collate_shard(shard_child(shard, true))) { + next_shards.push_back(shard_child(shard, true)); + } + if (collate_shard(shard_child(shard, false))) { + next_shards.push_back(shard_child(shard, false)); + } + } else if (desc->before_merge()) { + if (is_left_child(shard) && collate_shard(shard_parent(shard))) { + next_shards.push_back(shard_parent(shard)); + } + } else if (collate_shard(shard)) { + next_shards.push_back(shard); + } + } + for (const ShardIdFull& shard : next_shards) { + for (const BlockIdExt& neighbor : top_blocks) { + if (neighbor.shard_full() != shard && block::ShardConfig::is_neighbor(shard, neighbor.shard_full())) { + td::actor::send_closure(manager_, &ValidatorManager::wait_out_msg_queue_proof, neighbor, shard, 0, + td::Timestamp::in(10.0), [](td::Ref) {}); + } + } + } +} + static td::BufferSlice serialize_error(td::Status error) { return create_serialize_tl_object(error.code(), error.message().c_str()); } @@ -71,14 +105,7 @@ void CollatorNode::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data if (!shard.is_valid_ext()) { return td::Status::Error(PSTRING() << "invalid shard " << shard.to_str()); } - bool found = false; - for (ShardIdFull our_shard : shards_) { - if (shard_is_ancestor(shard, our_shard)) { - found = true; - break; - } - } - if (!found) { + if (!collate_shard(shard)) { return td::Status::Error(PSTRING() << "this node doesn't collate shard " << shard.to_str()); } if (f->prev_blocks_.size() != 1 && f->prev_blocks_.size() != 2) { @@ -141,6 +168,15 @@ void CollatorNode::receive_query_cont(adnl::AdnlNodeIdShort src, ShardIdFull sha min_mc_state->get_validator_set(shard), manager_, td::Timestamp::in(10.0), std::move(P)); } +bool CollatorNode::collate_shard(ShardIdFull shard) const { + for (ShardIdFull our_shard : shards_) { + if (shard_is_ancestor(shard, our_shard)) { + return true; + } + } + return false; +} + } // namespace validator } // namespace ton diff --git a/validator/collator-node.hpp b/validator/collator-node.hpp index 854e98ce..b0a2ad90 100644 --- a/validator/collator-node.hpp +++ b/validator/collator-node.hpp @@ -33,12 +33,16 @@ class CollatorNode : public td::actor::Actor { void tear_down() override; void add_shard(ShardIdFull shard); + void new_masterchain_block_notification(td::Ref state); + private: void receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data, td::Promise promise); void receive_query_cont(adnl::AdnlNodeIdShort src, ShardIdFull shard, td::Ref min_mc_state, std::vector prev_blocks, Ed25519_PublicKey creator, td::Promise promise); + bool collate_shard(ShardIdFull shard) const; + adnl::AdnlNodeIdShort local_id_; td::actor::ActorId manager_; td::actor::ActorId adnl_; diff --git a/validator/manager.cpp b/validator/manager.cpp index 7e7e1e84..6323c4f2 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -617,6 +617,9 @@ void ValidatorManagerImpl::wait_out_msg_queue_proof(BlockIdExt block_id, ShardId .release(); wait_out_msg_queue_proof_[key].actor_ = id; it = wait_out_msg_queue_proof_.find(key); + } else if (it->second.done_) { + promise.set_result(it->second.result_); + it->second.remove_at_ = td::Timestamp::in(30.0); } it->second.waiting_.emplace_back(timeout, priority, std::move(promise)); @@ -1079,13 +1082,16 @@ void ValidatorManagerImpl::finished_wait_msg_queue(BlockIdExt block_id, ShardIdF it->second.actor_ = id; return; } + wait_out_msg_queue_proof_.erase(it); } else { auto r = R.move_as_ok(); for (auto &X : it->second.waiting_) { X.promise.set_result(r); } + it->second.done_ = true; + it->second.result_ = std::move(r); + it->second.remove_at_ = td::Timestamp::in(30.0); } - wait_out_msg_queue_proof_.erase(it); } } @@ -1773,6 +1779,9 @@ void ValidatorManagerImpl::new_masterchain_block() { td::actor::send_closure(shard_client_, &ShardClient::new_masterchain_block_notification, last_masterchain_block_handle_, last_masterchain_state_); } + for (auto &c : collator_nodes_) { + td::actor::send_closure(c.second, &CollatorNode::new_masterchain_block_notification, last_masterchain_state_); + } if (last_masterchain_seqno_ % 1024 == 0) { LOG(WARNING) << "applied masterchain block " << last_masterchain_block_id_; @@ -2436,6 +2445,18 @@ void ValidatorManagerImpl::alarm() { } } alarm_timestamp().relax(check_shard_clients_); + if (cleanup_wait_caches_at_.is_in_past()) { + auto it = wait_out_msg_queue_proof_.begin(); + while (it != wait_out_msg_queue_proof_.end()) { + if (it->second.done_ && it->second.remove_at_.is_in_past()) { + it = wait_out_msg_queue_proof_.erase(it); + } else { + ++it; + } + } + cleanup_wait_caches_at_ = td::Timestamp::in(10.0); + } + alarm_timestamp().relax(cleanup_wait_caches_at_); } void ValidatorManagerImpl::update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) { diff --git a/validator/manager.hpp b/validator/manager.hpp index c65914aa..15598f4c 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -181,10 +181,17 @@ class ValidatorManagerImpl : public ValidatorManager { waiting_.resize(j); } }; + template + struct WaitListCaching : public WaitList { + bool done_ = false; + ResType result_; + td::Timestamp remove_at_; + }; std::map>> wait_state_; std::map>> wait_block_data_; - std::map, WaitList>> + std::map, WaitListCaching>> wait_out_msg_queue_proof_; + td::Timestamp cleanup_wait_caches_at_ = td::Timestamp::now(); struct WaitBlockHandle { std::vector> waiting_;