1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Get neighbors' msg queues from other nodes

This commit is contained in:
SpyCheese 2022-08-01 17:48:22 +03:00
parent 1869a25062
commit e43e235143
22 changed files with 658 additions and 68 deletions

View file

@ -453,24 +453,18 @@ void ValidatorManagerImpl::add_shard_block_description(td::Ref<ShardTopBlockDesc
}
shard_blocks_[ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()}] = desc;
VLOG(VALIDATOR_DEBUG) << "new shard block descr for " << desc->block_id();
if (opts_->need_monitor(desc->block_id().shard_full()) && last_masterchain_block_handle_ &&
last_masterchain_seqno_ > 0 && desc->generated_at() < last_masterchain_block_handle_->unix_time() + 60) {
delay_action(
[SelfId = actor_id(this), desc]() {
auto P = td::PromiseCreator::lambda([](td::Result<td::Ref<ShardState>> R) {
if (R.is_error()) {
auto S = R.move_as_error();
if (S.code() != ErrorCode::timeout && S.code() != ErrorCode::notready) {
VLOG(VALIDATOR_NOTICE) << "failed to get shard state: " << S;
} else {
VLOG(VALIDATOR_DEBUG) << "failed to get shard state: " << S;
}
}
});
td::actor::send_closure(SelfId, &ValidatorManager::wait_block_state_short, desc->block_id(), 0,
td::Timestamp::in(60.0), std::move(P));
},
td::Timestamp::in(1.0));
if (opts_->need_monitor(desc->block_id().shard_full())) {
auto P = td::PromiseCreator::lambda([](td::Result<td::Ref<ShardState>> R) {
if (R.is_error()) {
auto S = R.move_as_error();
if (S.code() != ErrorCode::timeout && S.code() != ErrorCode::notready) {
VLOG(VALIDATOR_NOTICE) << "failed to get shard state: " << S;
} else {
VLOG(VALIDATOR_DEBUG) << "failed to get shard state: " << S;
}
}
});
wait_block_state_short(desc->block_id(), 0, td::Timestamp::in(60.0), std::move(P));
}
}
}
@ -606,6 +600,30 @@ void ValidatorManagerImpl::wait_block_state_short(BlockIdExt block_id, td::uint3
get_block_handle(block_id, true, std::move(P));
}
void ValidatorManagerImpl::wait_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::uint32 priority,
td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise) {
auto key = std::make_pair(block_id, dst_shard);
auto it = wait_out_msg_queue_proof_.find(key);
if (it == wait_out_msg_queue_proof_.end()) {
auto P = td::PromiseCreator::lambda(
[SelfId = actor_id(this), block_id, dst_shard](td::Result<td::Ref<OutMsgQueueProof>> R) {
td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_msg_queue, block_id, dst_shard,
std::move(R));
});
auto id = td::actor::create_actor<WaitOutMsgQueueProof>("waitmsgqueue", block_id, dst_shard,
opts_->need_monitor(block_id.shard_full()), priority,
actor_id(this), td::Timestamp::in(10.0), std::move(P))
.release();
wait_out_msg_queue_proof_[key].actor_ = id;
it = wait_out_msg_queue_proof_.find(key);
}
it->second.waiting_.emplace_back(timeout, priority, std::move(promise));
auto X = it->second.get_timeout();
td::actor::send_closure(it->second.actor_, &WaitOutMsgQueueProof::update_timeout, X.first, X.second);
}
void ValidatorManagerImpl::wait_block_data(BlockHandle handle, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::Ref<BlockData>> promise) {
auto it = wait_block_data_.find(handle->id());
@ -1037,6 +1055,40 @@ void ValidatorManagerImpl::finished_wait_data(BlockHandle handle, td::Result<td:
}
}
void ValidatorManagerImpl::finished_wait_msg_queue(BlockIdExt block_id, ShardIdFull dst_shard,
td::Result<td::Ref<OutMsgQueueProof>> R) {
auto it = wait_out_msg_queue_proof_.find({block_id, dst_shard});
if (it != wait_out_msg_queue_proof_.end()) {
if (R.is_error()) {
auto S = R.move_as_error();
if (S.code() != ErrorCode::timeout) {
for (auto &X : it->second.waiting_) {
X.promise.set_error(S.clone());
}
} else {
auto X = it->second.get_timeout();
auto P = td::PromiseCreator::lambda(
[SelfId = actor_id(this), block_id, dst_shard](td::Result<td::Ref<OutMsgQueueProof>> R) {
td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_msg_queue, block_id, dst_shard,
std::move(R));
});
auto id = td::actor::create_actor<WaitOutMsgQueueProof>("waitmsgqueue", block_id, dst_shard,
opts_->need_monitor(block_id.shard_full()), X.second,
actor_id(this), X.first, std::move(P))
.release();
it->second.actor_ = id;
return;
}
} else {
auto r = R.move_as_ok();
for (auto &X : it->second.waiting_) {
X.promise.set_result(r);
}
}
wait_out_msg_queue_proof_.erase(it);
}
}
void ValidatorManagerImpl::set_block_state(BlockHandle handle, td::Ref<ShardState> state,
td::Promise<td::Ref<ShardState>> promise) {
auto P = td::PromiseCreator::lambda(
@ -1410,6 +1462,12 @@ void ValidatorManagerImpl::send_block_broadcast(BlockBroadcast broadcast) {
callback_->send_broadcast(std::move(broadcast));
}
void ValidatorManagerImpl::send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard,
td::uint32 priority,
td::Promise<td::Ref<OutMsgQueueProof>> promise) {
callback_->download_out_msg_queue_proof(id, dst_shard, td::Timestamp::in(10.0), std::move(promise));
}
void ValidatorManagerImpl::start_up() {
db_ = create_db_actor(actor_id(this), db_root_);
lite_server_cache_ = create_liteserver_cache_actor(actor_id(this), db_root_);