mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
Optimize masterchain collation
Use only shard blocks with ready msg queues
This commit is contained in:
parent
47c60d8bf0
commit
1e3a12259b
10 changed files with 151 additions and 82 deletions
|
@ -236,12 +236,12 @@ void Collator::start_up() {
|
||||||
}
|
}
|
||||||
if (is_masterchain() && !is_hardfork_) {
|
if (is_masterchain() && !is_hardfork_) {
|
||||||
// 5. load shard block info messages
|
// 5. load shard block info messages
|
||||||
LOG(DEBUG) << "sending get_shard_blocks() query to Manager";
|
LOG(DEBUG) << "sending get_shard_blocks_for_collator() query to Manager";
|
||||||
++pending;
|
++pending;
|
||||||
td::actor::send_closure_later(
|
td::actor::send_closure_later(
|
||||||
manager, &ValidatorManager::get_shard_blocks, prev_blocks[0],
|
manager, &ValidatorManager::get_shard_blocks_for_collator, prev_blocks[0],
|
||||||
[self = get_self()](td::Result<std::vector<Ref<ShardTopBlockDescription>>> res) -> void {
|
[self = get_self()](td::Result<std::vector<Ref<ShardTopBlockDescription>>> res) -> void {
|
||||||
LOG(DEBUG) << "got answer to get_shard_blocks() query";
|
LOG(DEBUG) << "got answer to get_shard_blocks_for_collator() query";
|
||||||
td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_blocks, std::move(res));
|
td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_blocks, std::move(res));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -1405,8 +1405,8 @@ bool Collator::import_new_shard_top_blocks() {
|
||||||
prev_descr.clear();
|
prev_descr.clear();
|
||||||
descr.clear();
|
descr.clear();
|
||||||
} else {
|
} else {
|
||||||
LOG(INFO) << "updated top shard block information with " << sh_bd->block_id().to_str() << " and "
|
LOG(DEBUG) << "updated top shard block information with " << sh_bd->block_id().to_str() << " and "
|
||||||
<< prev_bd->block_id().to_str();
|
<< prev_bd->block_id().to_str();
|
||||||
CHECK(ures.move_as_ok());
|
CHECK(ures.move_as_ok());
|
||||||
store_shard_fees(std::move(prev_descr));
|
store_shard_fees(std::move(prev_descr));
|
||||||
store_shard_fees(std::move(descr));
|
store_shard_fees(std::move(descr));
|
||||||
|
@ -1448,7 +1448,7 @@ bool Collator::import_new_shard_top_blocks() {
|
||||||
store_shard_fees(std::move(descr));
|
store_shard_fees(std::move(descr));
|
||||||
register_shard_block_creators(sh_bd->get_creator_list(chain_len));
|
register_shard_block_creators(sh_bd->get_creator_list(chain_len));
|
||||||
shards_max_end_lt_ = std::max(shards_max_end_lt_, end_lt);
|
shards_max_end_lt_ = std::max(shards_max_end_lt_, end_lt);
|
||||||
LOG(INFO) << "updated top shard block information with " << sh_bd->block_id().to_str();
|
LOG(DEBUG) << "updated top shard block information with " << sh_bd->block_id().to_str();
|
||||||
CHECK(ures.move_as_ok());
|
CHECK(ures.move_as_ok());
|
||||||
++tb_act;
|
++tb_act;
|
||||||
used_shard_block_descr_.emplace_back(sh_bd);
|
used_shard_block_descr_.emplace_back(sh_bd);
|
||||||
|
@ -1456,10 +1456,13 @@ bool Collator::import_new_shard_top_blocks() {
|
||||||
if (tb_act) {
|
if (tb_act) {
|
||||||
shard_conf_adjusted_ = true;
|
shard_conf_adjusted_ = true;
|
||||||
}
|
}
|
||||||
if (tb_act && verbosity >= 0) { // DEBUG
|
if (tb_act) {
|
||||||
LOG(INFO) << "updated shard block configuration to ";
|
LOG(INFO) << "updated shard block configuration: " << tb_act << " new top shard blocks";
|
||||||
auto csr = shard_conf_->get_root_csr();
|
if (verbosity >= 1) {
|
||||||
block::gen::t_ShardHashes.print(std::cerr, csr.write());
|
LOG(INFO) << "updated shard block configuration to ";
|
||||||
|
auto csr = shard_conf_->get_root_csr();
|
||||||
|
block::gen::t_ShardHashes.print(std::cerr, csr.write());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
block::gen::ShardFeeCreated::Record fc;
|
block::gen::ShardFeeCreated::Record fc;
|
||||||
if (!(tlb::csr_unpack(fees_import_dict_->get_root_extra(),
|
if (!(tlb::csr_unpack(fees_import_dict_->get_root_extra(),
|
||||||
|
|
|
@ -289,50 +289,6 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs(
|
||||||
promise.set_value({});
|
promise.set_value({});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (dst_shard.is_masterchain() && blocks.size() != 1) {
|
|
||||||
// We spit queries for masterchain {dst_shard, {block_1, ..., block_n}} into separate queries
|
|
||||||
// {dst_shard, {block_1}}, ..., {dst_shard, {block_n}}
|
|
||||||
class Worker : public td::actor::Actor {
|
|
||||||
public:
|
|
||||||
Worker(size_t pending, td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise)
|
|
||||||
: pending_(pending), promise_(std::move(promise)) {
|
|
||||||
CHECK(pending_ > 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
void on_result(td::Ref<OutMsgQueueProof> res) {
|
|
||||||
result_[res->block_id_] = res;
|
|
||||||
if (--pending_ == 0) {
|
|
||||||
promise_.set_result(std::move(result_));
|
|
||||||
stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void on_error(td::Status error) {
|
|
||||||
promise_.set_error(std::move(error));
|
|
||||||
stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
size_t pending_;
|
|
||||||
td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise_;
|
|
||||||
std::map<BlockIdExt, td::Ref<OutMsgQueueProof>> result_;
|
|
||||||
};
|
|
||||||
auto worker = td::actor::create_actor<Worker>("queueworker", blocks.size(), std::move(promise)).release();
|
|
||||||
for (const BlockIdExt& block : blocks) {
|
|
||||||
get_neighbor_msg_queue_proofs(dst_shard, {block}, timeout,
|
|
||||||
[=](td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> R) {
|
|
||||||
if (R.is_error()) {
|
|
||||||
td::actor::send_closure(worker, &Worker::on_error, R.move_as_error());
|
|
||||||
} else {
|
|
||||||
auto res = R.move_as_ok();
|
|
||||||
CHECK(res.size() == 1);
|
|
||||||
td::actor::send_closure(worker, &Worker::on_result, res.begin()->second);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::sort(blocks.begin(), blocks.end());
|
std::sort(blocks.begin(), blocks.end());
|
||||||
auto entry = cache_[{dst_shard, blocks}];
|
auto entry = cache_[{dst_shard, blocks}];
|
||||||
if (entry) {
|
if (entry) {
|
||||||
|
|
|
@ -72,7 +72,7 @@ class OutMsgQueueImporter : public td::actor::Actor {
|
||||||
void finish_query(std::shared_ptr<CacheEntry> entry);
|
void finish_query(std::shared_ptr<CacheEntry> entry);
|
||||||
bool check_timeout(std::shared_ptr<CacheEntry> entry);
|
bool check_timeout(std::shared_ptr<CacheEntry> entry);
|
||||||
|
|
||||||
constexpr static const double CACHE_TTL = 30.0;
|
constexpr static const double CACHE_TTL = 60.0;
|
||||||
};
|
};
|
||||||
|
|
||||||
class BuildOutMsgQueueProof : public td::actor::Actor {
|
class BuildOutMsgQueueProof : public td::actor::Actor {
|
||||||
|
|
|
@ -102,8 +102,8 @@ class ValidatorManager : public ValidatorManagerInterface {
|
||||||
td::Promise<td::Ref<MessageQueue>> promise) = 0;
|
td::Promise<td::Ref<MessageQueue>> promise) = 0;
|
||||||
virtual void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) = 0;
|
virtual void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) = 0;
|
||||||
virtual void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) = 0;
|
virtual void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) = 0;
|
||||||
virtual void get_shard_blocks(BlockIdExt masterchain_block_id,
|
virtual void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id,
|
||||||
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) = 0;
|
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) = 0;
|
||||||
virtual void complete_external_messages(std::vector<ExtMessage::Hash> to_delay,
|
virtual void complete_external_messages(std::vector<ExtMessage::Hash> to_delay,
|
||||||
std::vector<ExtMessage::Hash> to_delete) = 0;
|
std::vector<ExtMessage::Hash> to_delete) = 0;
|
||||||
virtual void complete_ihr_messages(std::vector<IhrMessage::Hash> to_delay,
|
virtual void complete_ihr_messages(std::vector<IhrMessage::Hash> to_delay,
|
||||||
|
|
|
@ -516,8 +516,8 @@ void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise<std::
|
||||||
promise.set_result(ihr_messages_);
|
promise.set_result(ihr_messages_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ValidatorManagerImpl::get_shard_blocks(BlockIdExt masterchain_block_id,
|
void ValidatorManagerImpl::get_shard_blocks_for_collator(
|
||||||
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) {
|
BlockIdExt masterchain_block_id, td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) {
|
||||||
if (!last_masterchain_block_handle_) {
|
if (!last_masterchain_block_handle_) {
|
||||||
promise.set_result(std::vector<td::Ref<ShardTopBlockDescription>>{});
|
promise.set_result(std::vector<td::Ref<ShardTopBlockDescription>>{});
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -193,8 +193,8 @@ class ValidatorManagerImpl : public ValidatorManager {
|
||||||
td::Promise<td::Ref<MessageQueue>> promise) override;
|
td::Promise<td::Ref<MessageQueue>> promise) override;
|
||||||
void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
|
void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
|
||||||
void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
|
void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
|
||||||
void get_shard_blocks(BlockIdExt masterchain_block_id,
|
void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id,
|
||||||
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;
|
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;
|
||||||
void complete_external_messages(std::vector<ExtMessage::Hash> to_delay,
|
void complete_external_messages(std::vector<ExtMessage::Hash> to_delay,
|
||||||
std::vector<ExtMessage::Hash> to_delete) override;
|
std::vector<ExtMessage::Hash> to_delete) override;
|
||||||
void complete_ihr_messages(std::vector<IhrMessage::Hash> to_delay, std::vector<IhrMessage::Hash> to_delete) override;
|
void complete_ihr_messages(std::vector<IhrMessage::Hash> to_delay, std::vector<IhrMessage::Hash> to_delete) override;
|
||||||
|
|
|
@ -366,8 +366,8 @@ void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise<std::
|
||||||
promise.set_result(ihr_messages_);
|
promise.set_result(ihr_messages_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ValidatorManagerImpl::get_shard_blocks(BlockIdExt masterchain_block_id,
|
void ValidatorManagerImpl::get_shard_blocks_for_collator(
|
||||||
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) {
|
BlockIdExt masterchain_block_id, td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void ValidatorManagerImpl::get_block_data_from_db(ConstBlockHandle handle, td::Promise<td::Ref<BlockData>> promise) {
|
void ValidatorManagerImpl::get_block_data_from_db(ConstBlockHandle handle, td::Promise<td::Ref<BlockData>> promise) {
|
||||||
|
|
|
@ -233,7 +233,7 @@ class ValidatorManagerImpl : public ValidatorManager {
|
||||||
td::Promise<td::Ref<MessageQueue>> promise) override;
|
td::Promise<td::Ref<MessageQueue>> promise) override;
|
||||||
void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
|
void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
|
||||||
void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
|
void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
|
||||||
void get_shard_blocks(BlockIdExt masterchain_block_id,
|
void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id,
|
||||||
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;
|
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;
|
||||||
void complete_external_messages(std::vector<ExtMessage::Hash> to_delay,
|
void complete_external_messages(std::vector<ExtMessage::Hash> to_delay,
|
||||||
std::vector<ExtMessage::Hash> to_delete) override {
|
std::vector<ExtMessage::Hash> to_delete) override {
|
||||||
|
|
|
@ -441,7 +441,7 @@ void ValidatorManagerImpl::new_shard_block(BlockIdExt block_id, CatchainSeqno cc
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto it = shard_blocks_.find(ShardTopBlockDescriptionId{block_id.shard_full(), cc_seqno});
|
auto it = shard_blocks_.find(ShardTopBlockDescriptionId{block_id.shard_full(), cc_seqno});
|
||||||
if (it != shard_blocks_.end() && block_id.id.seqno <= it->second->block_id().id.seqno) {
|
if (it != shard_blocks_.end() && block_id.id.seqno <= it->second.latest_desc->block_id().id.seqno) {
|
||||||
VLOG(VALIDATOR_DEBUG) << "dropping duplicate shard block broadcast";
|
VLOG(VALIDATOR_DEBUG) << "dropping duplicate shard block broadcast";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -459,11 +459,11 @@ void ValidatorManagerImpl::new_shard_block(BlockIdExt block_id, CatchainSeqno cc
|
||||||
void ValidatorManagerImpl::add_shard_block_description(td::Ref<ShardTopBlockDescription> desc) {
|
void ValidatorManagerImpl::add_shard_block_description(td::Ref<ShardTopBlockDescription> desc) {
|
||||||
if (desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
|
if (desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
|
||||||
auto it = shard_blocks_.find(ShardTopBlockDescriptionId{desc->shard(), desc->catchain_seqno()});
|
auto it = shard_blocks_.find(ShardTopBlockDescriptionId{desc->shard(), desc->catchain_seqno()});
|
||||||
if (it != shard_blocks_.end() && desc->block_id().id.seqno <= it->second->block_id().id.seqno) {
|
if (it != shard_blocks_.end() && desc->block_id().id.seqno <= it->second.latest_desc->block_id().id.seqno) {
|
||||||
VLOG(VALIDATOR_DEBUG) << "dropping duplicate shard block broadcast";
|
VLOG(VALIDATOR_DEBUG) << "dropping duplicate shard block broadcast";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
shard_blocks_[ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()}] = desc;
|
shard_blocks_[ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()}].latest_desc = desc;
|
||||||
VLOG(VALIDATOR_DEBUG) << "new shard block descr for " << desc->block_id();
|
VLOG(VALIDATOR_DEBUG) << "new shard block descr for " << desc->block_id();
|
||||||
if (need_monitor(desc->block_id().shard_full())) {
|
if (need_monitor(desc->block_id().shard_full())) {
|
||||||
auto P = td::PromiseCreator::lambda([](td::Result<td::Ref<ShardState>> R) {
|
auto P = td::PromiseCreator::lambda([](td::Result<td::Ref<ShardState>> R) {
|
||||||
|
@ -478,13 +478,53 @@ void ValidatorManagerImpl::add_shard_block_description(td::Ref<ShardTopBlockDesc
|
||||||
});
|
});
|
||||||
wait_block_state_short(desc->block_id(), 0, td::Timestamp::in(60.0), std::move(P));
|
wait_block_state_short(desc->block_id(), 0, td::Timestamp::in(60.0), std::move(P));
|
||||||
}
|
}
|
||||||
if (collating_masterchain() && desc->generated_at() > td::Clocks::system() - 20) {
|
if (collating_masterchain()) {
|
||||||
wait_neighbor_msg_queue_proofs(ShardIdFull{masterchainId}, {desc->block_id()}, td::Timestamp::in(15.0),
|
preload_msg_queue_to_masterchain(desc);
|
||||||
[](td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>>) {});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ValidatorManagerImpl::preload_msg_queue_to_masterchain(td::Ref<ShardTopBlockDescription> desc) {
|
||||||
|
auto id = ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()};
|
||||||
|
auto it = shard_blocks_.find(id);
|
||||||
|
if (!collating_masterchain() || it == shard_blocks_.end() || it->second.latest_desc->block_id() != desc->block_id()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
wait_neighbor_msg_queue_proofs(
|
||||||
|
ShardIdFull{masterchainId}, {desc->block_id()}, td::Timestamp::in(10.0),
|
||||||
|
[=, SelfId = actor_id(this),
|
||||||
|
retry_at = td::Timestamp::in(1.0)](td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> R) {
|
||||||
|
if (R.is_error()) {
|
||||||
|
delay_action(
|
||||||
|
[=]() { td::actor::send_closure(SelfId, &ValidatorManagerImpl::preload_msg_queue_to_masterchain, desc); },
|
||||||
|
retry_at);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
auto res = R.move_as_ok();
|
||||||
|
auto &queue = res[desc->block_id()];
|
||||||
|
CHECK(queue.not_null());
|
||||||
|
td::actor::send_closure(SelfId, &ValidatorManagerImpl::loaded_msg_queue_to_masterchain, desc, std::move(queue));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void ValidatorManagerImpl::loaded_msg_queue_to_masterchain(td::Ref<ShardTopBlockDescription> desc,
|
||||||
|
td::Ref<OutMsgQueueProof> res) {
|
||||||
|
auto id = ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()};
|
||||||
|
auto it = shard_blocks_.find(id);
|
||||||
|
if (it == shard_blocks_.end()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
auto &info = it->second;
|
||||||
|
if (info.ready_desc.is_null() || info.ready_desc->block_id().seqno() < desc->block_id().seqno()) {
|
||||||
|
VLOG(VALIDATOR_DEBUG) << "loaded out msg queue to masterchain from " << desc->block_id();
|
||||||
|
if (info.ready_desc.not_null()) {
|
||||||
|
cached_msg_queue_to_masterchain_.erase(info.ready_desc->block_id());
|
||||||
|
}
|
||||||
|
info.ready_desc = desc;
|
||||||
|
cached_msg_queue_to_masterchain_[desc->block_id()] = std::move(res);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void ValidatorManagerImpl::add_ext_server_id(adnl::AdnlNodeIdShort id) {
|
void ValidatorManagerImpl::add_ext_server_id(adnl::AdnlNodeIdShort id) {
|
||||||
class Cb : public adnl::Adnl::Callback {
|
class Cb : public adnl::Adnl::Callback {
|
||||||
private:
|
private:
|
||||||
|
@ -629,6 +669,56 @@ void ValidatorManagerImpl::wait_neighbor_msg_queue_proofs(
|
||||||
out_msg_queue_importer_ = td::actor::create_actor<OutMsgQueueImporter>("outmsgqueueimporter", actor_id(this), opts_,
|
out_msg_queue_importer_ = td::actor::create_actor<OutMsgQueueImporter>("outmsgqueueimporter", actor_id(this), opts_,
|
||||||
last_masterchain_state_);
|
last_masterchain_state_);
|
||||||
}
|
}
|
||||||
|
if (dst_shard.is_masterchain()) {
|
||||||
|
// We spit queries for masterchain {dst_shard, {block_1, ..., block_n}} into separate queries
|
||||||
|
// {dst_shard, {block_1}}, ..., {dst_shard, {block_n}}
|
||||||
|
// Also, use cache
|
||||||
|
class Worker : public td::actor::Actor {
|
||||||
|
public:
|
||||||
|
Worker(size_t pending, td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise)
|
||||||
|
: pending_(pending), promise_(std::move(promise)) {
|
||||||
|
CHECK(pending_ > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
void on_result(td::Ref<OutMsgQueueProof> res) {
|
||||||
|
result_[res->block_id_] = res;
|
||||||
|
if (--pending_ == 0) {
|
||||||
|
promise_.set_result(std::move(result_));
|
||||||
|
stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void on_error(td::Status error) {
|
||||||
|
promise_.set_error(std::move(error));
|
||||||
|
stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
size_t pending_;
|
||||||
|
td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise_;
|
||||||
|
std::map<BlockIdExt, td::Ref<OutMsgQueueProof>> result_;
|
||||||
|
};
|
||||||
|
auto worker = td::actor::create_actor<Worker>("queueworker", blocks.size(), std::move(promise)).release();
|
||||||
|
for (const BlockIdExt &block : blocks) {
|
||||||
|
auto it = cached_msg_queue_to_masterchain_.find(block);
|
||||||
|
if (it != cached_msg_queue_to_masterchain_.end()) {
|
||||||
|
td::actor::send_closure(worker, &Worker::on_result, it->second);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::get_neighbor_msg_queue_proofs, dst_shard,
|
||||||
|
std::vector<BlockIdExt>{1, block}, timeout,
|
||||||
|
[=](td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> R) {
|
||||||
|
if (R.is_error()) {
|
||||||
|
td::actor::send_closure(worker, &Worker::on_error, R.move_as_error());
|
||||||
|
} else {
|
||||||
|
auto res = R.move_as_ok();
|
||||||
|
CHECK(res.size() == 1);
|
||||||
|
td::actor::send_closure(worker, &Worker::on_result, res.begin()->second);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::get_neighbor_msg_queue_proofs, dst_shard,
|
td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::get_neighbor_msg_queue_proofs, dst_shard,
|
||||||
std::move(blocks), timeout, std::move(promise));
|
std::move(blocks), timeout, std::move(promise));
|
||||||
}
|
}
|
||||||
|
@ -843,11 +933,14 @@ void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise<std::
|
||||||
promise.set_value(std::move(res));
|
promise.set_value(std::move(res));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ValidatorManagerImpl::get_shard_blocks(BlockIdExt masterchain_block_id,
|
void ValidatorManagerImpl::get_shard_blocks_for_collator(
|
||||||
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) {
|
BlockIdExt masterchain_block_id, td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) {
|
||||||
std::vector<td::Ref<ShardTopBlockDescription>> v;
|
std::vector<td::Ref<ShardTopBlockDescription>> v;
|
||||||
for (auto &b : shard_blocks_) {
|
for (auto &b : shard_blocks_) {
|
||||||
v.push_back(b.second);
|
if (b.second.ready_desc.is_null()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
v.push_back(b.second.ready_desc);
|
||||||
}
|
}
|
||||||
promise.set_value(std::move(v));
|
promise.set_value(std::move(v));
|
||||||
}
|
}
|
||||||
|
@ -1786,7 +1879,7 @@ void ValidatorManagerImpl::new_masterchain_block() {
|
||||||
}
|
}
|
||||||
if (is_collator()) {
|
if (is_collator()) {
|
||||||
std::set<ShardIdFull> collating_shards;
|
std::set<ShardIdFull> collating_shards;
|
||||||
if (validating_masterchain()) {
|
if (collating_masterchain()) {
|
||||||
collating_shards.emplace(masterchainId);
|
collating_shards.emplace(masterchainId);
|
||||||
}
|
}
|
||||||
for (const auto &collator : collator_nodes_) {
|
for (const auto &collator : collator_nodes_) {
|
||||||
|
@ -2080,12 +2173,19 @@ void ValidatorManagerImpl::update_shard_blocks() {
|
||||||
auto it = shard_blocks_.begin();
|
auto it = shard_blocks_.begin();
|
||||||
while (it != shard_blocks_.end()) {
|
while (it != shard_blocks_.end()) {
|
||||||
auto &B = it->second;
|
auto &B = it->second;
|
||||||
if (!B->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
|
if (!B.latest_desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
|
||||||
auto it2 = it++;
|
if (B.ready_desc.not_null()) {
|
||||||
shard_blocks_.erase(it2);
|
cached_msg_queue_to_masterchain_.erase(B.ready_desc->block_id());
|
||||||
} else {
|
}
|
||||||
++it;
|
it = shard_blocks_.erase(it);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
if (B.ready_desc.not_null() &&
|
||||||
|
!B.ready_desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
|
||||||
|
cached_msg_queue_to_masterchain_.erase(B.ready_desc->block_id());
|
||||||
|
B.ready_desc = {};
|
||||||
|
}
|
||||||
|
++it;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -220,7 +220,15 @@ class ValidatorManagerImpl : public ValidatorManager {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
// DATA FOR COLLATOR
|
// DATA FOR COLLATOR
|
||||||
std::map<ShardTopBlockDescriptionId, td::Ref<ShardTopBlockDescription>> shard_blocks_;
|
// Shard block will not be used until queue is ready (to avoid too long masterchain collation)
|
||||||
|
// latest_desc - latest known block
|
||||||
|
// ready_desc - block with ready msg queue (may be null)
|
||||||
|
struct ShardTopBlock {
|
||||||
|
td::Ref<ShardTopBlockDescription> latest_desc;
|
||||||
|
td::Ref<ShardTopBlockDescription> ready_desc;
|
||||||
|
};
|
||||||
|
std::map<ShardTopBlockDescriptionId, ShardTopBlock> shard_blocks_;
|
||||||
|
std::map<BlockIdExt, td::Ref<OutMsgQueueProof>> cached_msg_queue_to_masterchain_;
|
||||||
std::map<MessageId<ExtMessage>, std::unique_ptr<MessageExt<ExtMessage>>> ext_messages_;
|
std::map<MessageId<ExtMessage>, std::unique_ptr<MessageExt<ExtMessage>>> ext_messages_;
|
||||||
std::map<std::pair<ton::WorkchainId,ton::StdSmcAddress>, std::map<ExtMessage::Hash, MessageId<ExtMessage>>> ext_addr_messages_;
|
std::map<std::pair<ton::WorkchainId,ton::StdSmcAddress>, std::map<ExtMessage::Hash, MessageId<ExtMessage>>> ext_addr_messages_;
|
||||||
std::map<ExtMessage::Hash, MessageId<ExtMessage>> ext_messages_hashes_;
|
std::map<ExtMessage::Hash, MessageId<ExtMessage>> ext_messages_hashes_;
|
||||||
|
@ -410,8 +418,8 @@ class ValidatorManagerImpl : public ValidatorManager {
|
||||||
td::Promise<td::Ref<MessageQueue>> promise) override;
|
td::Promise<td::Ref<MessageQueue>> promise) override;
|
||||||
void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
|
void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
|
||||||
void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
|
void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
|
||||||
void get_shard_blocks(BlockIdExt masterchain_block_id,
|
void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id,
|
||||||
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;
|
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;
|
||||||
void complete_external_messages(std::vector<ExtMessage::Hash> to_delay,
|
void complete_external_messages(std::vector<ExtMessage::Hash> to_delay,
|
||||||
std::vector<ExtMessage::Hash> to_delete) override;
|
std::vector<ExtMessage::Hash> to_delete) override;
|
||||||
void complete_ihr_messages(std::vector<IhrMessage::Hash> to_delay, std::vector<IhrMessage::Hash> to_delete) override;
|
void complete_ihr_messages(std::vector<IhrMessage::Hash> to_delay, std::vector<IhrMessage::Hash> to_delete) override;
|
||||||
|
@ -493,6 +501,8 @@ class ValidatorManagerImpl : public ValidatorManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
void add_shard_block_description(td::Ref<ShardTopBlockDescription> desc);
|
void add_shard_block_description(td::Ref<ShardTopBlockDescription> desc);
|
||||||
|
void preload_msg_queue_to_masterchain(td::Ref<ShardTopBlockDescription> desc);
|
||||||
|
void loaded_msg_queue_to_masterchain(td::Ref<ShardTopBlockDescription> desc, td::Ref<OutMsgQueueProof> res);
|
||||||
|
|
||||||
void register_block_handle(BlockHandle handle);
|
void register_block_handle(BlockHandle handle);
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue