mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

optimistic out-msg-queue broadcast
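When a block candidate is created, the collator now optimistically builds OutMsgQueueProofs for all neighbor shards and attaches them to the candidate as broadcasts, so neighbors can import the new out-msg-queue without re-downloading it. OutMsgQueueImporter gains a per-block "small cache" that is fed both by these broadcasts and by ordinary queries, and the collator's main steps are now timed with a td::PerfLog.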

This commit is contained in:
birydrad 2024-11-26 14:01:20 +04:00
parent 6df6f182bf
commit bf572f9599
24 changed files with 623 additions and 126 deletions

View file

@@ -237,6 +237,7 @@ class Collator final : public td::actor::Actor {
bool store_out_msg_queue_size_ = false;
td::PerfWarningTimer perf_timer_;
td::PerfLog perf_log_;
//
block::Account* lookup_account(td::ConstBitPtr addr) const;
std::unique_ptr<block::Account> make_account_from(td::ConstBitPtr addr, Ref<vm::CellSlice> account,
@@ -252,18 +253,18 @@ class Collator final : public td::actor::Actor {
bool fatal_error(int err_code, std::string err_msg);
bool fatal_error(std::string err_msg, int err_code = -666);
void check_pending();
void after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res);
void after_get_shard_state(int idx, td::Result<Ref<ShardState>> res);
void after_get_block_data(int idx, td::Result<Ref<BlockData>> res);
void after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res);
void after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res, td::PerfLogAction token);
void after_get_shard_state(int idx, td::Result<Ref<ShardState>> res, td::PerfLogAction token);
void after_get_block_data(int idx, td::Result<Ref<BlockData>> res, td::PerfLogAction token);
void after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res, td::PerfLogAction token);
bool preprocess_prev_mc_state();
bool register_mc_state(Ref<MasterchainStateQ> other_mc_state);
bool request_aux_mc_state(BlockSeqno seqno, Ref<MasterchainStateQ>& state);
Ref<MasterchainStateQ> get_aux_mc_state(BlockSeqno seqno) const;
void after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result<Ref<ShardState>> res);
void after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result<Ref<ShardState>> res, td::PerfLogAction token);
bool fix_one_processed_upto(block::MsgProcessedUpto& proc, const ton::ShardIdFull& owner);
bool fix_processed_upto(block::MsgProcessedUptoCollection& upto);
void got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> R);
void got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> R, td::PerfLogAction token);
void got_neighbor_msg_queue(unsigned i, Ref<OutMsgQueueProof> res);
void got_out_queue_size(size_t i, td::Result<td::uint64> res);
bool adjust_shard_config();
@@ -309,7 +310,7 @@ class Collator final : public td::actor::Actor {
bool is_our_address(Ref<vm::CellSlice> addr_ref) const;
bool is_our_address(ton::AccountIdPrefixFull addr_prefix) const;
bool is_our_address(const ton::StdSmcAddress& addr) const;
void after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res);
void after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res, td::PerfLogAction token);
td::Result<bool> register_external_message_cell(Ref<vm::Cell> ext_msg, const ExtMessage::Hash& ext_hash,
int priority);
// td::Result<bool> register_external_message(td::Slice ext_msg_boc);

View file

@@ -217,25 +217,30 @@ void Collator::start_up() {
// 2. learn latest masterchain state and block id
LOG(DEBUG) << "sending get_top_masterchain_state_block() to Manager";
++pending;
auto token = perf_log_.start_action("get_top_masterchain_state_block");
if (!is_hardfork_) {
td::actor::send_closure_later(manager, &ValidatorManager::get_top_masterchain_state_block,
[self = get_self()](td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res) {
[self = get_self(), token = std::move(token)](
td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res) mutable {
LOG(DEBUG) << "got answer to get_top_masterchain_state_block";
td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state,
std::move(res));
std::move(res), std::move(token));
});
} else {
td::actor::send_closure_later(
manager, &ValidatorManager::get_shard_state_from_db_short, min_mc_block_id,
[self = get_self(), block_id = min_mc_block_id](td::Result<Ref<ShardState>> res) {
LOG(DEBUG) << "got answer to get_top_masterchain_state_block";
if (res.is_error()) {
td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state, res.move_as_error());
} else {
td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state,
std::make_pair(Ref<MasterchainState>(res.move_as_ok()), block_id));
}
});
td::actor::send_closure_later(manager, &ValidatorManager::get_shard_state_from_db_short, min_mc_block_id,
[self = get_self(), block_id = min_mc_block_id,
token = std::move(token)](td::Result<Ref<ShardState>> res) mutable {
LOG(DEBUG) << "got answer to get_top_masterchain_state_block";
if (res.is_error()) {
td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state,
res.move_as_error(), std::move(token));
} else {
td::actor::send_closure_later(
std::move(self), &Collator::after_get_mc_state,
std::make_pair(Ref<MasterchainState>(res.move_as_ok()), block_id),
std::move(token));
}
});
}
}
// 3. load previous block(s) and corresponding state(s)
@@ -245,23 +250,27 @@ void Collator::start_up() {
// 3.1. load state
LOG(DEBUG) << "sending wait_block_state() query #" << i << " for " << prev_blocks[i].to_str() << " to Manager";
++pending;
td::actor::send_closure_later(manager, &ValidatorManager::wait_block_state_short, prev_blocks[i], priority(),
timeout, [self = get_self(), i](td::Result<Ref<ShardState>> res) {
LOG(DEBUG) << "got answer to wait_block_state query #" << i;
td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_state, i,
std::move(res));
});
auto token = perf_log_.start_action(PSTRING() << "wait_block_state #" << i);
td::actor::send_closure_later(
manager, &ValidatorManager::wait_block_state_short, prev_blocks[i], priority(), timeout,
[self = get_self(), i, token = std::move(token)](td::Result<Ref<ShardState>> res) mutable {
LOG(DEBUG) << "got answer to wait_block_state query #" << i;
td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_state, i, std::move(res),
std::move(token));
});
if (prev_blocks[i].seqno()) {
// 3.2. load block
// NB: we need the block itself only for extracting start_lt and end_lt to create correct prev_blk:ExtBlkRef and related Merkle proofs
LOG(DEBUG) << "sending wait_block_data() query #" << i << " for " << prev_blocks[i].to_str() << " to Manager";
++pending;
td::actor::send_closure_later(manager, &ValidatorManager::wait_block_data_short, prev_blocks[i], priority(),
timeout, [self = get_self(), i](td::Result<Ref<BlockData>> res) {
LOG(DEBUG) << "got answer to wait_block_data query #" << i;
td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, i,
std::move(res));
});
auto token = perf_log_.start_action(PSTRING() << "wait_block_data #" << i);
td::actor::send_closure_later(
manager, &ValidatorManager::wait_block_data_short, prev_blocks[i], priority(), timeout,
[self = get_self(), i, token = std::move(token)](td::Result<Ref<BlockData>> res) mutable {
LOG(DEBUG) << "got answer to wait_block_data query #" << i;
td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, i, std::move(res),
std::move(token));
});
}
}
if (is_hardfork_) {
@@ -271,22 +280,28 @@ void Collator::start_up() {
if (!is_hardfork_) {
LOG(DEBUG) << "sending get_external_messages() query to Manager";
++pending;
td::actor::send_closure_later(manager, &ValidatorManager::get_external_messages, shard_,
[self = get_self()](td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res) -> void {
auto token = perf_log_.start_action("get_external_messages");
td::actor::send_closure_later(
manager, &ValidatorManager::get_external_messages, shard_,
[self = get_self(),
token = std::move(token)](td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res) mutable -> void {
LOG(DEBUG) << "got answer to get_external_messages() query";
td::actor::send_closure_later(std::move(self), &Collator::after_get_external_messages, std::move(res));
td::actor::send_closure_later(std::move(self), &Collator::after_get_external_messages, std::move(res),
std::move(token));
});
}
if (is_masterchain() && !is_hardfork_) {
// 5. load shard block info messages
LOG(DEBUG) << "sending get_shard_blocks_for_collator() query to Manager";
++pending;
td::actor::send_closure_later(
manager, &ValidatorManager::get_shard_blocks_for_collator, prev_blocks[0],
[self = get_self()](td::Result<std::vector<Ref<ShardTopBlockDescription>>> res) -> void {
LOG(DEBUG) << "got answer to get_shard_blocks_for_collator() query";
td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_blocks, std::move(res));
});
auto token = perf_log_.start_action("get_shard_blocks_for_collator");
td::actor::send_closure_later(manager, &ValidatorManager::get_shard_blocks_for_collator, prev_blocks[0],
[self = get_self(), token = std::move(token)](
td::Result<std::vector<Ref<ShardTopBlockDescription>>> res) mutable -> void {
LOG(DEBUG) << "got answer to get_shard_blocks_for_collator() query";
td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_blocks,
std::move(res), std::move(token));
});
}
// 6. set timeout
alarm_timestamp() = timeout;
@@ -362,6 +377,8 @@ bool Collator::fatal_error(td::Status error) {
td::Timestamp::in(10.0), std::move(main_promise), std::move(cancellation_token_), mode_,
attempt_idx_ + 1);
} else {
LOG(INFO) << "collation failed in " << perf_timer_.elapsed() << " s " << error;
LOG(INFO) << perf_log_;
main_promise(std::move(error));
}
busy_ = false;
@@ -482,12 +499,14 @@ bool Collator::request_aux_mc_state(BlockSeqno seqno, Ref<MasterchainStateQ>& state) {
CHECK(blkid.is_valid_ext() && blkid.is_masterchain());
LOG(DEBUG) << "sending auxiliary wait_block_state() query for " << blkid.to_str() << " to Manager";
++pending;
td::actor::send_closure_later(manager, &ValidatorManager::wait_block_state_short, blkid, priority(), timeout,
[self = get_self(), blkid](td::Result<Ref<ShardState>> res) {
LOG(DEBUG) << "got answer to wait_block_state query for " << blkid.to_str();
td::actor::send_closure_later(std::move(self), &Collator::after_get_aux_shard_state,
blkid, std::move(res));
});
auto token = perf_log_.start_action(PSTRING() << "auxiliary wait_block_state " << blkid.to_str());
td::actor::send_closure_later(
manager, &ValidatorManager::wait_block_state_short, blkid, priority(), timeout,
[self = get_self(), blkid, token = std::move(token)](td::Result<Ref<ShardState>> res) mutable {
LOG(DEBUG) << "got answer to wait_block_state query for " << blkid.to_str();
td::actor::send_closure_later(std::move(self), &Collator::after_get_aux_shard_state, blkid, std::move(res),
std::move(token));
});
state.clear();
return true;
}
@@ -515,9 +534,11 @@ Ref<MasterchainStateQ> Collator::get_aux_mc_state(BlockSeqno seqno) const {
* @param blkid The BlockIdExt of the shard state.
* @param res The result of retrieving the shard state.
*/
void Collator::after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result<Ref<ShardState>> res) {
void Collator::after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result<Ref<ShardState>> res,
td::PerfLogAction token) {
LOG(DEBUG) << "in Collator::after_get_aux_shard_state(" << blkid.to_str() << ")";
--pending;
token.finish(res);
if (res.is_error()) {
fatal_error("cannot load auxiliary masterchain state for "s + blkid.to_str() + " : " +
res.move_as_error().to_string());
@@ -579,9 +600,11 @@ bool Collator::preprocess_prev_mc_state() {
*
* @param res The retrieved masterchain state.
*/
void Collator::after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res) {
void Collator::after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res,
td::PerfLogAction token) {
LOG(WARNING) << "in Collator::after_get_mc_state()";
--pending;
token.finish(res);
if (res.is_error()) {
fatal_error(res.move_as_error());
return;
@@ -598,12 +621,14 @@ void Collator::after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res) {
// NB. it is needed only for creating a correct ExtBlkRef reference to it, which requires start_lt and end_lt
LOG(DEBUG) << "sending wait_block_data() query #-1 for " << mc_block_id_.to_str() << " to Manager";
++pending;
td::actor::send_closure_later(manager, &ValidatorManager::wait_block_data_short, mc_block_id_, priority(), timeout,
[self = get_self()](td::Result<Ref<BlockData>> res) {
LOG(DEBUG) << "got answer to wait_block_data query #-1";
td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, -1,
std::move(res));
});
auto token = perf_log_.start_action("wait_block_data #-1");
td::actor::send_closure_later(
manager, &ValidatorManager::wait_block_data_short, mc_block_id_, priority(), timeout,
[self = get_self(), token = std::move(token)](td::Result<Ref<BlockData>> res) mutable {
LOG(DEBUG) << "got answer to wait_block_data query #-1";
td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, -1, std::move(res),
std::move(token));
});
}
check_pending();
}
@@ -614,9 +639,10 @@ void Collator::after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res) {
* @param idx The index of the previous shard block (0 or 1).
* @param res The retrieved shard state.
*/
void Collator::after_get_shard_state(int idx, td::Result<Ref<ShardState>> res) {
void Collator::after_get_shard_state(int idx, td::Result<Ref<ShardState>> res, td::PerfLogAction token) {
LOG(WARNING) << "in Collator::after_get_shard_state(" << idx << ")";
--pending;
token.finish(res);
if (res.is_error()) {
fatal_error(res.move_as_error());
return;
@@ -647,9 +673,10 @@ void Collator::after_get_shard_state(int idx, td::Result<Ref<ShardState>> res) {
* @param idx The index of the previous block (0 or 1).
* @param res The retrieved block data.
*/
void Collator::after_get_block_data(int idx, td::Result<Ref<BlockData>> res) {
void Collator::after_get_block_data(int idx, td::Result<Ref<BlockData>> res, td::PerfLogAction token) {
LOG(DEBUG) << "in Collator::after_get_block_data(" << idx << ")";
--pending;
token.finish(res);
if (res.is_error()) {
fatal_error(res.move_as_error());
return;
@@ -691,8 +718,10 @@ void Collator::after_get_block_data(int idx, td::Result<Ref<BlockData>> res) {
*
* @param res The retrieved shard block descriptions.
*/
void Collator::after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res) {
void Collator::after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res,
td::PerfLogAction token) {
--pending;
token.finish(res);
if (res.is_error()) {
fatal_error(res.move_as_error());
return;
@@ -848,10 +877,13 @@ bool Collator::request_neighbor_msg_queues() {
++i;
}
++pending;
auto token = perf_log_.start_action("neighbor_msg_queues");
td::actor::send_closure_later(
manager, &ValidatorManager::wait_neighbor_msg_queue_proofs, shard_, std::move(top_blocks), timeout,
[self = get_self()](td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> res) {
td::actor::send_closure_later(std::move(self), &Collator::got_neighbor_msg_queues, std::move(res));
[self = get_self(),
token = std::move(token)](td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> res) mutable {
td::actor::send_closure_later(std::move(self), &Collator::got_neighbor_msg_queues, std::move(res),
std::move(token));
});
return true;
}
@@ -883,13 +915,15 @@ bool Collator::request_out_msg_queue_size() {
* @param R The obtained outbound queues.
*/
void Collator::got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> R) {
void Collator::got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> R,
td::PerfLogAction token) {
--pending;
double duration = token.finish(R);
if (R.is_error()) {
fatal_error(R.move_as_error_prefix("failed to get neighbor msg queues: "));
return;
}
LOG(INFO) << "neighbor output queues fetched";
LOG(INFO) << "neighbor output queues fetched, took " << duration << "s";
auto res = R.move_as_ok();
unsigned i = 0;
for (block::McShardDescr& descr : neighbors_) {
@@ -2091,12 +2125,9 @@ bool Collator::init_lt() {
* @returns True if the configuration parameters were successfully fetched and initialized, false otherwise.
*/
bool Collator::fetch_config_params() {
auto res = block::FetchConfigParams::fetch_config_params(*config_,
&old_mparams_, &storage_prices_, &storage_phase_cfg_,
&rand_seed_, &compute_phase_cfg_, &action_phase_cfg_,
&masterchain_create_fee_, &basechain_create_fee_,
workchain(), now_
);
auto res = block::FetchConfigParams::fetch_config_params(
*config_, &old_mparams_, &storage_prices_, &storage_phase_cfg_, &rand_seed_, &compute_phase_cfg_,
&action_phase_cfg_, &masterchain_create_fee_, &basechain_create_fee_, workchain(), now_);
if (res.is_error()) {
return fatal_error(res.move_as_error());
}
@@ -2217,6 +2248,11 @@ bool Collator::init_value_create() {
bool Collator::do_collate() {
// After do_collate has started, it will not be interrupted by timeout
alarm_timestamp() = td::Timestamp::never();
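// time the whole collation in the perf log: SCOPE_EXIT records `status`, which stays an error
// on any early return and is set to OK only at the very end of do_collate()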
auto token = perf_log_.start_action("do_collate");
td::Status status = td::Status::Error("some error");
SCOPE_EXIT {
token.finish(status);
};
LOG(WARNING) << "do_collate() : start";
if (!fetch_config_params()) {
@@ -2342,6 +2378,7 @@ bool Collator::do_collate() {
if (!create_block_candidate()) {
return fatal_error("cannot serialize a new Block candidate");
}
status = td::Status::OK();
return true;
}
@@ -5811,12 +5848,43 @@ bool Collator::create_block_candidate() {
<< block_limit_status_->transactions;
LOG(INFO) << "serialized collated data size " << cdata_slice.size() << " bytes (preliminary estimate was "
<< block_limit_status_->collated_data_stat.estimate_proof_size() << ")";
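// compute the new block id once; it is needed both for the BlockCandidate and for the out-msg-queue proof broadcasts below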
auto new_block_id_ext = ton::BlockIdExt{ton::BlockId{shard_, new_block_seqno}, new_block->get_hash().bits(),
block::compute_file_hash(blk_slice.as_slice())};
// 3. create a BlockCandidate
block_candidate = std::make_unique<BlockCandidate>(
created_by_,
ton::BlockIdExt{ton::BlockId{shard_, new_block_seqno}, new_block->get_hash().bits(),
block::compute_file_hash(blk_slice.as_slice())},
block::compute_file_hash(cdata_slice.as_slice()), blk_slice.clone(), cdata_slice.clone());
block_candidate =
std::make_unique<BlockCandidate>(created_by_, new_block_id_ext, block::compute_file_hash(cdata_slice.as_slice()),
blk_slice.clone(), cdata_slice.clone());
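// optimistically build out-msg-queue proofs for the neighbor shards and attach them to the candidate as broadcasts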
const bool need_out_msg_queue_broadcasts = true;
if (need_out_msg_queue_broadcasts) {
// we can't generate two proofs at the same time for the same root (it is not currently supported by cells),
// so we can't reuse the new state and have to regenerate it by applying the Merkle update
auto new_state = vm::MerkleUpdate::apply(prev_state_root_pure_, state_update);
CHECK(new_state.not_null());
CHECK(new_state->get_hash() == state_root->get_hash());
assert(config_ && shard_conf_);
auto neighbor_list = shard_conf_->get_neighbor_shard_hash_ids(shard_);
LOG(INFO) << "Build OutMsgQueueProofs for " << neighbor_list.size() << " neighbours";
for (ton::BlockId blk_id : neighbor_list) {
auto prefix = blk_id.shard_full();
auto limits = mc_state_->get_imported_msg_queue_limits(blk_id.workchain);
// one could use monitor_min_split_depth here to decrease the number of broadcasts,
// but the current implementation of OutMsgQueueImporter doesn't support it
auto r_proof = OutMsgQueueProof::build(
prefix, {OutMsgQueueProof::OneBlock{.id = new_block_id_ext, .state_root = new_state, .block_root = new_block}}, limits);
if (r_proof.is_ok()) {
auto proof = r_proof.move_as_ok();
block_candidate->out_msg_queue_proof_broadcasts.push_back(td::Ref<OutMsgQueueProofBroadcast>(
true, OutMsgQueueProofBroadcast(prefix, new_block_id_ext, limits.max_bytes, limits.max_msgs,
std::move(proof->queue_proofs_), std::move(proof->block_state_proofs_),
std::move(proof->msg_counts_))));
} else {
LOG(ERROR) << "Failed to build OutMsgQueueProof: " << r_proof.error();
}
}
}
// 3.1 check block and collated data size
auto consensus_config = config_->get_consensus_config();
if (block_candidate->data.size() > consensus_config.max_block_size) {
@@ -5896,6 +5964,7 @@ void Collator::return_block_candidate(td::Result<td::Unit> saved) {
CHECK(block_candidate);
LOG(WARNING) << "sending new BlockCandidate to Promise";
LOG(WARNING) << "collation took " << perf_timer_.elapsed() << " s";
LOG(WARNING) << perf_log_;
main_promise(block_candidate->clone());
busy_ = false;
stop();
@@ -5974,9 +6043,11 @@ td::Result<bool> Collator::register_external_message_cell(Ref<vm::Cell> ext_msg,
*
* @param res The result of the external message retrieval operation.
*/
void Collator::after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res) {
void Collator::after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res,
td::PerfLogAction token) {
// res: pair {ext msg, priority}
--pending;
token.finish(res);
if (res.is_error()) {
fatal_error(res.move_as_error());
return;

View file

@@ -320,13 +320,23 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs(
if (prefix.pfx_len() > min_split) {
prefix = shard_prefix(prefix, min_split);
}
new_queries[prefix].push_back(block);
LOG(INFO) << "search for out msg queue proof " << prefix.to_str() << block.to_str();
auto& small_entry = small_cache_[std::make_pair(dst_shard, block)];
if (!small_entry.result.is_null()) {
entry->result[block] = small_entry.result;
entry->from_small_cache++;
alarm_timestamp().relax(small_entry.timeout = td::Timestamp::in(CACHE_TTL));
} else {
small_entry.pending_entries.push_back(entry);
++entry->pending;
new_queries[prefix].push_back(block);
}
}
};
auto limits = last_masterchain_state_->get_imported_msg_queue_limits(dst_shard.workchain);
for (auto& p : new_queries) {
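// request proofs in batches of at most 16 blocks, scaling the import limits by the batch size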
for (size_t i = 0; i < p.second.size(); i += 16) {
++entry->pending;
size_t j = std::min(i + 16, p.second.size());
get_proof_import(entry, std::vector<BlockIdExt>(p.second.begin() + i, p.second.begin() + j),
limits * (td::uint32)(j - i));
@@ -355,7 +365,7 @@ void OutMsgQueueImporter::get_proof_local(std::shared_ptr<CacheEntry> entry, BlockIdExt block) {
if (block.seqno() == 0) {
std::vector<td::Ref<OutMsgQueueProof>> proof = {
td::Ref<OutMsgQueueProof>(true, block, state->root_cell(), td::Ref<vm::Cell>{})};
td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof));
td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof), ProofSource::Local);
return;
}
td::actor::send_closure(
@@ -371,7 +381,8 @@ void OutMsgQueueImporter::get_proof_local(std::shared_ptr<CacheEntry> entry, BlockIdExt block) {
Ref<vm::Cell> block_state_proof = create_block_state_proof(R.ok()->root_cell()).move_as_ok();
std::vector<td::Ref<OutMsgQueueProof>> proof = {
td::Ref<OutMsgQueueProof>(true, block, state->root_cell(), std::move(block_state_proof))};
td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof));
td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof),
ProofSource::Local);
});
});
}
@@ -395,27 +406,70 @@ void OutMsgQueueImporter::get_proof_import(std::shared_ptr<CacheEntry> entry, std::vector<BlockIdExt> blocks,
retry_after);
return;
}
td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, R.move_as_ok());
td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, R.move_as_ok(), ProofSource::Query);
});
}
void OutMsgQueueImporter::got_proof(std::shared_ptr<CacheEntry> entry, std::vector<td::Ref<OutMsgQueueProof>> proofs) {
void OutMsgQueueImporter::got_proof(std::shared_ptr<CacheEntry> entry, std::vector<td::Ref<OutMsgQueueProof>> proofs,
ProofSource proof_source) {
if (!check_timeout(entry)) {
return;
}
// TODO: maybe save proof to small cache? It would allow other queries to reuse this result
for (auto& p : proofs) {
entry->result[p->block_id_] = std::move(p);
}
CHECK(entry->pending > 0);
if (--entry->pending == 0) {
finish_query(entry);
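// a proof for the same block may arrive from several sources; only the first insertion
// decrements pending, and the source is tallied for the summary log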
auto block_id = p->block_id_;
if (entry->result.emplace(block_id, std::move(p)).second) {
CHECK(entry->pending > 0);
switch (proof_source) {
case ProofSource::SmallCache:
entry->from_small_cache++;
break;
case ProofSource::Broadcast:
entry->from_broadcast++;
break;
case ProofSource::Query:
entry->from_query++;
break;
case ProofSource::Local:
entry->from_local++;
break;
}
if (--entry->pending == 0) {
finish_query(entry);
}
}
}
}
void OutMsgQueueImporter::finish_query(std::shared_ptr<CacheEntry> entry) {
LOG(DEBUG) << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", "
<< entry->blocks.size() << " blocks in " << entry->timer.elapsed() << "s";
FLOG(INFO) {
sb << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", " << entry->blocks.size()
<< " blocks in " << entry->timer.elapsed() << "s";
sb << " sources{";
if (entry->from_broadcast) {
sb << " broadcast=" << entry->from_broadcast;
}
if (entry->from_small_cache) {
sb << " small_cache=" << entry->from_small_cache;
}
if (entry->from_local) {
sb << " local=" << entry->from_local;
}
if (entry->from_query) {
sb << " query=" << entry->from_query;
}
sb << "}";
if (!small_cache_.empty()) {
sb << " small_cache_size=" << small_cache_.size();
}
if (!cache_.empty()) {
sb << " cache_size=" << cache_.size();
}
};
entry->done = true;
CHECK(entry->blocks.size() == entry->result.size());
alarm_timestamp().relax(entry->timeout = td::Timestamp::in(CACHE_TTL));
for (auto& p : entry->promises) {
p.first.set_result(entry->result);
@@ -441,8 +495,7 @@ bool OutMsgQueueImporter::check_timeout(std::shared_ptr<CacheEntry> entry) {
}
void OutMsgQueueImporter::alarm() {
auto it = cache_.begin();
while (it != cache_.end()) {
for (auto it = cache_.begin(); it != cache_.end();) {
auto& promises = it->second->promises;
if (it->second->timeout.is_in_past()) {
if (!it->second->done) {
@@ -469,6 +522,36 @@ void OutMsgQueueImporter::alarm() {
promises.resize(j);
++it;
}
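// sweep the small cache: drop waiters that are done or abandoned, erase expired entries
// with no remaining waiters, and re-arm the alarm for the rest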
for (auto it = small_cache_.begin(); it != small_cache_.end();) {
td::remove_if(it->second.pending_entries,
[](const std::shared_ptr<CacheEntry>& entry) { return entry->done || entry->promises.empty(); });
if (it->second.timeout.is_in_past()) {
if (it->second.pending_entries.empty()) {
it = small_cache_.erase(it);
} else {
++it;
}
} else {
alarm_timestamp().relax(it->second.timeout);
++it;
}
}
}
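// accepts a proof delivered out-of-band (an out-msg-queue broadcast), stores it in the
// small cache, and completes any queries already waiting for this block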
void OutMsgQueueImporter::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof) {
LOG(INFO) << "add out msg queue proof " << dst_shard.to_str() << proof->block_id_.to_str();
auto& small_entry = small_cache_[std::make_pair(dst_shard, proof->block_id_)];
if (!small_entry.result.is_null()) {
return;
}
alarm_timestamp().relax(small_entry.timeout = td::Timestamp::in(CACHE_TTL));
small_entry.result = proof;
CHECK(proof.not_null());
auto pending_entries = std::move(small_entry.pending_entries);
for (auto& entry : pending_entries) {
got_proof(entry, {proof}, ProofSource::Broadcast);
}
}
void BuildOutMsgQueueProof::abort_query(td::Status reason) {

View file

@@ -41,6 +41,7 @@ class OutMsgQueueImporter : public td::actor::Actor {
void new_masterchain_block_notification(td::Ref<MasterchainState> state, std::set<ShardIdFull> collating_shards);
void get_neighbor_msg_queue_proofs(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks, td::Timestamp timeout,
td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise);
void add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof);
void update_options(td::Ref<ValidatorManagerOptions> opts) {
opts_ = std::move(opts);
@@ -62,13 +63,28 @@ class OutMsgQueueImporter : public td::actor::Actor {
td::Timestamp timeout = td::Timestamp::never();
td::Timer timer;
size_t pending = 0;
size_t from_small_cache = 0;
size_t from_broadcast = 0;
size_t from_query = 0;
size_t from_local = 0;
};
std::map<std::pair<ShardIdFull, std::vector<BlockIdExt>>, std::shared_ptr<CacheEntry>> cache_;
// This cache has smaller granularity: a proof is stored for each block separately
struct SmallCacheEntry {
td::Ref<OutMsgQueueProof> result;
std::vector<std::shared_ptr<CacheEntry>> pending_entries;
td::Timestamp timeout = td::Timestamp::never();
};
std::map<std::pair<ShardIdFull, BlockIdExt>, SmallCacheEntry> small_cache_;
void get_proof_local(std::shared_ptr<CacheEntry> entry, BlockIdExt block);
void get_proof_import(std::shared_ptr<CacheEntry> entry, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits);
void got_proof(std::shared_ptr<CacheEntry> entry, std::vector<td::Ref<OutMsgQueueProof>> proofs);
enum class ProofSource { SmallCache, Broadcast, Query, Local };
void got_proof(std::shared_ptr<CacheEntry> entry, std::vector<td::Ref<OutMsgQueueProof>> proofs, ProofSource proof_source);
void finish_query(std::shared_ptr<CacheEntry> entry);
bool check_timeout(std::shared_ptr<CacheEntry> entry);