1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Optimize importing out queues

This commit is contained in:
SpyCheese 2023-07-31 18:12:09 +03:00
parent 44ba040934
commit 5c02459fd8
22 changed files with 588 additions and 450 deletions

View file

@ -618,33 +618,15 @@ void ValidatorManagerImpl::wait_block_state_short(BlockIdExt block_id, td::uint3
get_block_handle(block_id, true, std::move(P));
}
void ValidatorManagerImpl::wait_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::uint32 priority,
td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise) {
auto key = std::make_pair(block_id, dst_shard);
auto it = wait_out_msg_queue_proof_.find(key);
if (it == wait_out_msg_queue_proof_.end()) {
auto P = td::PromiseCreator::lambda(
[SelfId = actor_id(this), block_id, dst_shard](td::Result<td::Ref<OutMsgQueueProof>> R) {
td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_msg_queue, block_id, dst_shard,
std::move(R));
});
auto id = td::actor::create_actor<WaitOutMsgQueueProof>(
"waitmsgqueue", block_id, dst_shard,
last_masterchain_state_->get_imported_msg_queue_limits(block_id.is_masterchain()),
need_monitor(block_id.shard_full()), priority, actor_id(this), td::Timestamp::at(timeout.at() + 10.0),
std::move(P))
.release();
wait_out_msg_queue_proof_[key].actor_ = id;
it = wait_out_msg_queue_proof_.find(key);
} else if (it->second.done_) {
promise.set_result(it->second.result_);
it->second.remove_at_ = td::Timestamp::in(30.0);
return;
void ValidatorManagerImpl::wait_neighbor_msg_queue_proofs(
ShardIdFull dst_shard, std::vector<BlockIdExt> blocks, td::Timestamp timeout,
td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise) {
if (out_msg_queue_importer_.empty()) {
out_msg_queue_importer_ = td::actor::create_actor<OutMsgQueueImporter>("outmsgqueueimporter", actor_id(this), opts_,
last_masterchain_state_);
}
it->second.waiting_.emplace_back(timeout, priority, std::move(promise));
auto X = it->second.get_timeout();
td::actor::send_closure(it->second.actor_, &WaitOutMsgQueueProof::update_timeout, X.first, X.second);
td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::get_neighbor_msg_queue_proofs, dst_shard,
std::move(blocks), timeout, std::move(promise));
}
void ValidatorManagerImpl::wait_block_data(BlockHandle handle, td::uint32 priority, td::Timestamp timeout,
@ -1082,44 +1064,6 @@ void ValidatorManagerImpl::finished_wait_data(BlockHandle handle, td::Result<td:
}
}
void ValidatorManagerImpl::finished_wait_msg_queue(BlockIdExt block_id, ShardIdFull dst_shard,
td::Result<td::Ref<OutMsgQueueProof>> R) {
auto it = wait_out_msg_queue_proof_.find({block_id, dst_shard});
if (it != wait_out_msg_queue_proof_.end()) {
if (R.is_error()) {
auto S = R.move_as_error();
if (S.code() != ErrorCode::timeout) {
for (auto &X : it->second.waiting_) {
X.promise.set_error(S.clone());
}
} else {
auto X = it->second.get_timeout();
auto P = td::PromiseCreator::lambda(
[SelfId = actor_id(this), block_id, dst_shard](td::Result<td::Ref<OutMsgQueueProof>> R) {
td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_msg_queue, block_id, dst_shard,
std::move(R));
});
auto id = td::actor::create_actor<WaitOutMsgQueueProof>(
"waitmsgqueue", block_id, dst_shard,
last_masterchain_state_->get_imported_msg_queue_limits(block_id.is_masterchain()),
need_monitor(block_id.shard_full()), X.second, actor_id(this), X.first, std::move(P))
.release();
it->second.actor_ = id;
return;
}
wait_out_msg_queue_proof_.erase(it);
} else {
auto r = R.move_as_ok();
for (auto &X : it->second.waiting_) {
X.promise.set_result(r);
}
it->second.done_ = true;
it->second.result_ = std::move(r);
it->second.remove_at_ = td::Timestamp::in(30.0);
}
}
}
void ValidatorManagerImpl::set_block_state(BlockHandle handle, td::Ref<ShardState> state,
td::Promise<td::Ref<ShardState>> promise) {
auto P = td::PromiseCreator::lambda(
@ -1513,11 +1457,11 @@ void ValidatorManagerImpl::send_block_broadcast(BlockBroadcast broadcast) {
callback_->send_broadcast(std::move(broadcast));
}
void ValidatorManagerImpl::send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits,
td::uint32 priority,
td::Promise<td::Ref<OutMsgQueueProof>> promise) {
callback_->download_out_msg_queue_proof(id, dst_shard, limits, td::Timestamp::in(10.0), std::move(promise));
void ValidatorManagerImpl::send_get_out_msg_queue_proof_request(
ShardIdFull dst_shard, std::vector<BlockIdExt> blocks, block::ImportedMsgQueueLimits limits,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) {
callback_->download_out_msg_queue_proof(dst_shard, std::move(blocks), limits, td::Timestamp::in(10.0),
std::move(promise));
}
void ValidatorManagerImpl::start_up() {
@ -1836,18 +1780,26 @@ void ValidatorManagerImpl::new_masterchain_block() {
td::actor::send_closure(shard_client_, &ShardClient::new_masterchain_block_notification,
last_masterchain_block_handle_, last_masterchain_state_);
}
if (is_collator()) {
std::set<ShardIdFull> collating_shards;
if (validating_masterchain()) {
collating_shards.emplace(masterchainId);
}
for (const auto &collator : collator_nodes_) {
collating_shards.insert(collator.second.shards.begin(), collator.second.shards.end());
}
if (!collating_shards.empty()) {
if (out_msg_queue_importer_.empty()) {
out_msg_queue_importer_ = td::actor::create_actor<OutMsgQueueImporter>("outmsgqueueimporter", actor_id(this),
opts_, last_masterchain_state_);
}
td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::new_masterchain_block_notification,
last_masterchain_state_, std::move(collating_shards));
}
}
for (auto &c : collator_nodes_) {
td::actor::send_closure(c.second.actor, &CollatorNode::new_masterchain_block_notification, last_masterchain_state_);
}
if (opts_->validator_mode() == ValidatorManagerOptions::validator_lite_shards && validating_masterchain() &&
last_masterchain_state_->get_unix_time() > (td::uint32)td::Clocks::system() - 20) {
// Prepare neighboours' queues for collating masterchain
for (const auto &desc : last_masterchain_state_->get_shards()) {
wait_out_msg_queue_proof(desc->top_block_id(), ShardIdFull(masterchainId), 0, td::Timestamp::in(10.0),
[](td::Result<td::Ref<OutMsgQueueProof>>) {});
}
}
if (last_masterchain_seqno_ % 1024 == 0) {
LOG(WARNING) << "applied masterchain block " << last_masterchain_block_id_;
}
@ -2514,18 +2466,6 @@ void ValidatorManagerImpl::alarm() {
}
}
alarm_timestamp().relax(check_shard_clients_);
if (cleanup_wait_caches_at_.is_in_past()) {
auto it = wait_out_msg_queue_proof_.begin();
while (it != wait_out_msg_queue_proof_.end()) {
if (it->second.done_ && it->second.remove_at_.is_in_past()) {
it = wait_out_msg_queue_proof_.erase(it);
} else {
++it;
}
}
cleanup_wait_caches_at_ = td::Timestamp::in(10.0);
}
alarm_timestamp().relax(cleanup_wait_caches_at_);
}
void ValidatorManagerImpl::update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) {
@ -2809,6 +2749,9 @@ void ValidatorManagerImpl::update_options(td::Ref<ValidatorManagerOptions> opts)
if (!serializer_.empty()) {
td::actor::send_closure(serializer_, &AsyncStateSerializer::update_options, opts);
}
if (!out_msg_queue_importer_.empty()) {
td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::update_options, opts);
}
opts_ = std::move(opts);
}