1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Collator nodes preload msg queues

This commit is contained in:
SpyCheese 2022-08-12 16:14:03 +03:00
parent 910398da92
commit 597fd8443d
4 changed files with 78 additions and 10 deletions

View file

@ -58,6 +58,40 @@ void CollatorNode::add_shard(ShardIdFull shard) {
shards_.push_back(shard); shards_.push_back(shard);
} }
void CollatorNode::new_masterchain_block_notification(td::Ref<MasterchainState> state) {
std::vector<BlockIdExt> top_blocks = {state->get_block_id()};
std::vector<ShardIdFull> next_shards;
if (collate_shard(ShardIdFull(masterchainId))) {
next_shards.push_back(ShardIdFull(masterchainId));
}
for (const auto& desc : state->get_shards()) {
top_blocks.push_back(desc->top_block_id());
ShardIdFull shard = desc->shard();
if (desc->before_split()) {
if (collate_shard(shard_child(shard, true))) {
next_shards.push_back(shard_child(shard, true));
}
if (collate_shard(shard_child(shard, false))) {
next_shards.push_back(shard_child(shard, false));
}
} else if (desc->before_merge()) {
if (is_left_child(shard) && collate_shard(shard_parent(shard))) {
next_shards.push_back(shard_parent(shard));
}
} else if (collate_shard(shard)) {
next_shards.push_back(shard);
}
}
for (const ShardIdFull& shard : next_shards) {
for (const BlockIdExt& neighbor : top_blocks) {
if (neighbor.shard_full() != shard && block::ShardConfig::is_neighbor(shard, neighbor.shard_full())) {
td::actor::send_closure(manager_, &ValidatorManager::wait_out_msg_queue_proof, neighbor, shard, 0,
td::Timestamp::in(10.0), [](td::Ref<OutMsgQueueProof>) {});
}
}
}
}
static td::BufferSlice serialize_error(td::Status error) { static td::BufferSlice serialize_error(td::Status error) {
return create_serialize_tl_object<ton_api::collatorNode_generateBlockError>(error.code(), error.message().c_str()); return create_serialize_tl_object<ton_api::collatorNode_generateBlockError>(error.code(), error.message().c_str());
} }
@ -71,14 +105,7 @@ void CollatorNode::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data
if (!shard.is_valid_ext()) { if (!shard.is_valid_ext()) {
return td::Status::Error(PSTRING() << "invalid shard " << shard.to_str()); return td::Status::Error(PSTRING() << "invalid shard " << shard.to_str());
} }
bool found = false; if (!collate_shard(shard)) {
for (ShardIdFull our_shard : shards_) {
if (shard_is_ancestor(shard, our_shard)) {
found = true;
break;
}
}
if (!found) {
return td::Status::Error(PSTRING() << "this node doesn't collate shard " << shard.to_str()); return td::Status::Error(PSTRING() << "this node doesn't collate shard " << shard.to_str());
} }
if (f->prev_blocks_.size() != 1 && f->prev_blocks_.size() != 2) { if (f->prev_blocks_.size() != 1 && f->prev_blocks_.size() != 2) {
@ -141,6 +168,15 @@ void CollatorNode::receive_query_cont(adnl::AdnlNodeIdShort src, ShardIdFull sha
min_mc_state->get_validator_set(shard), manager_, td::Timestamp::in(10.0), std::move(P)); min_mc_state->get_validator_set(shard), manager_, td::Timestamp::in(10.0), std::move(P));
} }
bool CollatorNode::collate_shard(ShardIdFull shard) const {
for (ShardIdFull our_shard : shards_) {
if (shard_is_ancestor(shard, our_shard)) {
return true;
}
}
return false;
}
} // namespace validator } // namespace validator
} // namespace ton } // namespace ton

View file

@ -33,12 +33,16 @@ class CollatorNode : public td::actor::Actor {
void tear_down() override; void tear_down() override;
void add_shard(ShardIdFull shard); void add_shard(ShardIdFull shard);
void new_masterchain_block_notification(td::Ref<MasterchainState> state);
private: private:
void receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data, td::Promise<td::BufferSlice> promise); void receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data, td::Promise<td::BufferSlice> promise);
void receive_query_cont(adnl::AdnlNodeIdShort src, ShardIdFull shard, td::Ref<MasterchainState> min_mc_state, void receive_query_cont(adnl::AdnlNodeIdShort src, ShardIdFull shard, td::Ref<MasterchainState> min_mc_state,
std::vector<BlockIdExt> prev_blocks, Ed25519_PublicKey creator, std::vector<BlockIdExt> prev_blocks, Ed25519_PublicKey creator,
td::Promise<td::BufferSlice> promise); td::Promise<td::BufferSlice> promise);
bool collate_shard(ShardIdFull shard) const;
adnl::AdnlNodeIdShort local_id_; adnl::AdnlNodeIdShort local_id_;
td::actor::ActorId<ValidatorManager> manager_; td::actor::ActorId<ValidatorManager> manager_;
td::actor::ActorId<adnl::Adnl> adnl_; td::actor::ActorId<adnl::Adnl> adnl_;

View file

@ -617,6 +617,9 @@ void ValidatorManagerImpl::wait_out_msg_queue_proof(BlockIdExt block_id, ShardId
.release(); .release();
wait_out_msg_queue_proof_[key].actor_ = id; wait_out_msg_queue_proof_[key].actor_ = id;
it = wait_out_msg_queue_proof_.find(key); it = wait_out_msg_queue_proof_.find(key);
} else if (it->second.done_) {
promise.set_result(it->second.result_);
it->second.remove_at_ = td::Timestamp::in(30.0);
} }
it->second.waiting_.emplace_back(timeout, priority, std::move(promise)); it->second.waiting_.emplace_back(timeout, priority, std::move(promise));
@ -1079,13 +1082,16 @@ void ValidatorManagerImpl::finished_wait_msg_queue(BlockIdExt block_id, ShardIdF
it->second.actor_ = id; it->second.actor_ = id;
return; return;
} }
wait_out_msg_queue_proof_.erase(it);
} else { } else {
auto r = R.move_as_ok(); auto r = R.move_as_ok();
for (auto &X : it->second.waiting_) { for (auto &X : it->second.waiting_) {
X.promise.set_result(r); X.promise.set_result(r);
} }
it->second.done_ = true;
it->second.result_ = std::move(r);
it->second.remove_at_ = td::Timestamp::in(30.0);
} }
wait_out_msg_queue_proof_.erase(it);
} }
} }
@ -1773,6 +1779,9 @@ void ValidatorManagerImpl::new_masterchain_block() {
td::actor::send_closure(shard_client_, &ShardClient::new_masterchain_block_notification, td::actor::send_closure(shard_client_, &ShardClient::new_masterchain_block_notification,
last_masterchain_block_handle_, last_masterchain_state_); last_masterchain_block_handle_, last_masterchain_state_);
} }
for (auto &c : collator_nodes_) {
td::actor::send_closure(c.second, &CollatorNode::new_masterchain_block_notification, last_masterchain_state_);
}
if (last_masterchain_seqno_ % 1024 == 0) { if (last_masterchain_seqno_ % 1024 == 0) {
LOG(WARNING) << "applied masterchain block " << last_masterchain_block_id_; LOG(WARNING) << "applied masterchain block " << last_masterchain_block_id_;
@ -2436,6 +2445,18 @@ void ValidatorManagerImpl::alarm() {
} }
} }
alarm_timestamp().relax(check_shard_clients_); alarm_timestamp().relax(check_shard_clients_);
if (cleanup_wait_caches_at_.is_in_past()) {
auto it = wait_out_msg_queue_proof_.begin();
while (it != wait_out_msg_queue_proof_.end()) {
if (it->second.done_ && it->second.remove_at_.is_in_past()) {
it = wait_out_msg_queue_proof_.erase(it);
} else {
++it;
}
}
cleanup_wait_caches_at_ = td::Timestamp::in(10.0);
}
alarm_timestamp().relax(cleanup_wait_caches_at_);
} }
void ValidatorManagerImpl::update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) { void ValidatorManagerImpl::update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) {

View file

@ -181,10 +181,17 @@ class ValidatorManagerImpl : public ValidatorManager {
waiting_.resize(j); waiting_.resize(j);
} }
}; };
template <typename ActorT, typename ResType>
struct WaitListCaching : public WaitList<ActorT, ResType> {
bool done_ = false;
ResType result_;
td::Timestamp remove_at_;
};
std::map<BlockIdExt, WaitList<WaitBlockState, td::Ref<ShardState>>> wait_state_; std::map<BlockIdExt, WaitList<WaitBlockState, td::Ref<ShardState>>> wait_state_;
std::map<BlockIdExt, WaitList<WaitBlockData, td::Ref<BlockData>>> wait_block_data_; std::map<BlockIdExt, WaitList<WaitBlockData, td::Ref<BlockData>>> wait_block_data_;
std::map<std::pair<BlockIdExt, ShardIdFull>, WaitList<WaitOutMsgQueueProof, td::Ref<OutMsgQueueProof>>> std::map<std::pair<BlockIdExt, ShardIdFull>, WaitListCaching<WaitOutMsgQueueProof, td::Ref<OutMsgQueueProof>>>
wait_out_msg_queue_proof_; wait_out_msg_queue_proof_;
td::Timestamp cleanup_wait_caches_at_ = td::Timestamp::now();
struct WaitBlockHandle { struct WaitBlockHandle {
std::vector<td::Promise<BlockHandle>> waiting_; std::vector<td::Promise<BlockHandle>> waiting_;