1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Out msg queues: improve logs, various small changes

This commit is contained in:
SpyCheese 2024-11-27 18:12:23 +03:00
parent 3dce9d11d9
commit 5d79855c94
9 changed files with 106 additions and 79 deletions

View file

@ -428,15 +428,14 @@ struct Ed25519_PublicKey {
struct OutMsgQueueProofBroadcast : public td::CntObject {
OutMsgQueueProofBroadcast(ShardIdFull dst_shard, BlockIdExt block_id, td::int32 max_bytes, td::int32 max_msgs,
td::BufferSlice queue_proofs, td::BufferSlice block_state_proofs,
std::vector<std::int32_t> msg_counts)
td::BufferSlice queue_proof, td::BufferSlice block_state_proof, int msg_count)
: dst_shard(std::move(dst_shard))
, block_id(block_id)
, max_bytes(max_bytes)
, max_msgs(max_msgs)
, queue_proofs(std::move(queue_proofs))
, block_state_proofs(std::move(block_state_proofs))
, msg_counts(std::move(msg_counts)) {
, queue_proofs(std::move(queue_proof))
, block_state_proofs(std::move(block_state_proof))
, msg_count(std::move(msg_count)) {
}
ShardIdFull dst_shard;
BlockIdExt block_id;
@ -448,11 +447,11 @@ struct OutMsgQueueProofBroadcast : public td::CntObject {
// outMsgQueueProof
td::BufferSlice queue_proofs;
td::BufferSlice block_state_proofs;
std::vector<std::int32_t> msg_counts;
int msg_count;
virtual OutMsgQueueProofBroadcast* make_copy() const {
OutMsgQueueProofBroadcast* make_copy() const override {
return new OutMsgQueueProofBroadcast(dst_shard, block_id, max_bytes, max_msgs, queue_proofs.clone(),
block_state_proofs.clone(), msg_counts);
block_state_proofs.clone(), msg_count);
}
};

View file

@ -179,9 +179,8 @@ struct EndValidatorGroupStats {
};
struct BlockSourceInfo {
td::uint32 round, first_block_round;
PublicKey source;
td::int32 source_priority;
BlockCandidatePriority priority;
};
} // namespace validatorsession

View file

@ -553,8 +553,8 @@ void ValidatorSessionImpl::check_generate_slot() {
LOG(WARNING) << print_id << ": failed to generate block candidate: " << R.move_as_error();
}
});
callback_->on_generate_slot(
BlockSourceInfo{cur_round_, first_block_round_, description().get_source_public_key(local_idx()), priority},
callback_->on_generate_slot(BlockSourceInfo{description().get_source_public_key(local_idx()),
BlockCandidatePriority{cur_round_, first_block_round_, priority}},
std::move(P));
} else {
alarm_timestamp().relax(t);
@ -634,8 +634,9 @@ void ValidatorSessionImpl::try_approve_block(const SentBlock *block) {
pending_approve_.insert(block_id);
callback_->on_candidate(
BlockSourceInfo{cur_round_, first_block_round_, description().get_source_public_key(block->get_src_idx()),
description().get_node_priority(block->get_src_idx(), cur_round_)},
BlockSourceInfo{description().get_source_public_key(block->get_src_idx()),
BlockCandidatePriority{cur_round_, first_block_round_,
description().get_node_priority(block->get_src_idx(), cur_round_)}},
B->root_hash_, B->data_.clone(), B->collated_data_.clone(), std::move(P));
} else if (T.is_in_past()) {
if (!active_requests_.count(block_id)) {
@ -909,9 +910,10 @@ void ValidatorSessionImpl::on_new_round(td::uint32 round) {
stats.rounds.pop_back();
}
BlockSourceInfo source_info{cur_round_, first_block_round_,
BlockSourceInfo source_info{
description().get_source_public_key(block->get_src_idx()),
description().get_node_priority(block->get_src_idx(), cur_round_)};
BlockCandidatePriority{cur_round_, first_block_round_,
description().get_node_priority(block->get_src_idx(), cur_round_)}};
if (it == blocks_.end()) {
callback_->on_block_committed(std::move(source_info), block->get_root_hash(), block->get_file_hash(),
td::BufferSlice(), std::move(export_sigs), std::move(export_approve_sigs),

View file

@ -47,6 +47,9 @@ void FullNodeFastSyncOverlay::process_block_broadcast(PublicKeyHash src, ton_api
}
void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_outMsgQueueProofBroadcast &query) {
if (src == local_id_.pubkey_hash()) {
return; // dropping broadcast from self
}
BlockIdExt block_id = create_block_id(query.block_);
ShardIdFull shard_id = create_shard_id(query.dst_shard_);
if (query.proof_->get_id() != ton_api::tonNode_outMsgQueueProof::ID) {
@ -68,7 +71,8 @@ void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonN
}
auto proof = std::move(R.move_as_ok()[0]);
LOG(INFO) << "got tonNode.outMsgQueueProofBroadcast " << shard_id.to_str() << " " << block_id.to_str();
LOG(INFO) << "got tonNode.outMsgQueueProofBroadcast to " << shard_id.to_str() << " from " << block_id.to_str()
<< ", msgs=" << proof->msg_count_ << ", size=" << tl_proof->queue_proofs_.size();
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::add_out_msg_queue_proof, shard_id,
std::move(proof));
}
@ -236,9 +240,10 @@ void FullNodeFastSyncOverlay::send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQ
create_tl_object<ton_api::tonNode_importedMsgQueueLimits>(broadcast->max_bytes, broadcast->max_msgs),
create_tl_object<ton_api::tonNode_outMsgQueueProof>(broadcast->queue_proofs.clone(),
broadcast->block_state_proofs.clone(),
std::vector<std::int32_t>(broadcast->msg_counts)));
VLOG(FULL_NODE_DEBUG) << "Sending outMsgQueueProof in fast sync overlay: " << broadcast->dst_shard.to_str() << " "
<< broadcast->block_id.to_str();
std::vector<td::int32>(1, broadcast->msg_count)));
VLOG(FULL_NODE_DEBUG) << "Sending outMsgQueueProof in fast sync overlay to " << broadcast->dst_shard.to_str()
<< " from " << broadcast->block_id.to_str() << ", msgs=" << broadcast->msg_count
<< " bytes=" << broadcast->queue_proofs.size();
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B));
}

View file

@ -703,8 +703,13 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
promise.set_result(serialize_tl_object(R.move_as_ok(), true));
}
});
VLOG(FULL_NODE_DEBUG) << "Got query getOutMsgQueueProof (" << blocks.size() << " blocks) to shard "
<< dst_shard.to_str() << " from " << src;
FLOG(DEBUG) {
sb << "Got query getOutMsgQueueProof to shard " << dst_shard.to_str() << " from blocks";
for (const BlockIdExt &id : blocks) {
sb << " " << id.id.to_str();
}
sb << " from " << src;
};
td::actor::create_actor<BuildOutMsgQueueProof>("buildqueueproof", dst_shard, std::move(blocks), limits,
validator_manager_, std::move(P))
.release();

View file

@ -5854,33 +5854,39 @@ bool Collator::create_block_candidate() {
block_candidate =
std::make_unique<BlockCandidate>(created_by_, new_block_id_ext, block::compute_file_hash(cdata_slice.as_slice()),
blk_slice.clone(), cdata_slice.clone());
const bool need_out_msg_queue_broadcasts = true;
const bool need_out_msg_queue_broadcasts = !is_masterchain();
if (need_out_msg_queue_broadcasts) {
// we can't generate two proofs at the same time for the same root (it is not currently supported by cells)
// so we have can't reuse new state and have to regenerate it with merkle update
auto new_state = vm::MerkleUpdate::apply(prev_state_root_pure_, state_update);
CHECK(new_state.not_null());
CHECK(new_state->get_hash() == state_root->get_hash());
assert(config_ && shard_conf_);
CHECK(shard_conf_);
auto neighbor_list = shard_conf_->get_neighbor_shard_hash_ids(shard_);
LOG(INFO) << "Build OutMsgQueueProofs for " << neighbor_list.size() << " neighbours";
for (ton::BlockId blk_id : neighbor_list) {
for (BlockId blk_id : neighbor_list) {
auto prefix = blk_id.shard_full();
if (shard_intersects(prefix, shard_)) {
continue;
}
auto limits = mc_state_->get_imported_msg_queue_limits(blk_id.workchain);
// one could use monitor_min_split_depth here, to decrease number of broadcasts
// but current implementation OutMsgQueueImporter doesn't support it
auto r_proof = OutMsgQueueProof::build(
prefix, {OutMsgQueueProof::OneBlock{.id = new_block_id_ext, .state_root = new_state, .block_root = new_block}}, limits);
prefix,
{OutMsgQueueProof::OneBlock{.id = new_block_id_ext, .state_root = new_state, .block_root = new_block}},
limits);
if (r_proof.is_ok()) {
auto proof = r_proof.move_as_ok();
CHECK(proof->msg_counts_.size() == 1);
block_candidate->out_msg_queue_proof_broadcasts.push_back(td::Ref<OutMsgQueueProofBroadcast>(
true, OutMsgQueueProofBroadcast(prefix, new_block_id_ext, limits.max_bytes, limits.max_msgs,
std::move(proof->queue_proofs_), std::move(proof->block_state_proofs_),
std::move(proof->msg_counts_))));
proof->msg_counts_[0])));
} else {
LOG(ERROR) << "Failed to build OutMsgQueueProof: " << r_proof.error();
LOG(ERROR) << "Failed to build OutMsgQueueProof to " << prefix.to_str() << ": " << r_proof.error();
}
}
}

View file

@ -92,29 +92,6 @@ static td::Result<std::vector<td::int32>> process_queue(
++msg_count[kv->source];
++msg_count_total;
// TODO: Get processed_upto from destination shard (in request?)
/*
// Parse message to check if it was processed (as in Collator::process_inbound_message)
ton::LogicalTime enqueued_lt = kv->msg->prefetch_ulong(64);
auto msg_env = kv->msg->prefetch_ref();
block::tlb::MsgEnvelope::Record_std env;
if (!tlb::unpack_cell(msg_env, env)) {
return td::Status::Error("cannot unpack MsgEnvelope of an internal message");
}
vm::CellSlice cs{vm::NoVmOrd{}, env.msg};
block::gen::CommonMsgInfo::Record_int_msg_info info;
if (!tlb::unpack(cs, info)) {
return td::Status::Error("cannot unpack CommonMsgInfo of an internal message");
}
auto src_prefix = block::tlb::MsgAddressInt::get_prefix(info.src);
auto dest_prefix = block::tlb::MsgAddressInt::get_prefix(info.dest);
auto cur_prefix = block::interpolate_addr(src_prefix, dest_prefix, env.cur_addr);
auto next_prefix = block::interpolate_addr(src_prefix, dest_prefix, env.next_addr);
block::EnqueuedMsgDescr descr{cur_prefix, next_prefix, kv->lt, enqueued_lt, env.msg->get_hash().bits()};
if (dst_processed_upto->already_processed(descr)) {
} else {
}*/
dfs_cs(*kv->msg);
TRY_STATUS_PREFIX(check_no_prunned(*kv->msg), "invalid message proof: ")
if (estimated_proof_size >= limits.max_bytes || msg_count_total >= (long long)limits.max_msgs) {
@ -301,7 +278,12 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs(
return;
}
LOG(DEBUG) << "Importing neighbor msg queues for shard " << dst_shard.to_str() << ", " << blocks.size() << " blocks";
FLOG(DEBUG) {
sb << "Importing neighbor msg queues for shard " << dst_shard.to_str() << ", " << blocks.size() << " blocks:";
for (const BlockIdExt& block : blocks) {
sb << " " << block.id.to_str();
}
};
cache_[{dst_shard, blocks}] = entry = std::make_shared<CacheEntry>();
entry->dst_shard = dst_shard;
@ -321,7 +303,7 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs(
prefix = shard_prefix(prefix, min_split);
}
LOG(INFO) << "search for out msg queue proof " << prefix.to_str() << block.to_str();
LOG(DEBUG) << "search for out msg queue proof " << prefix.to_str() << " " << block.to_str();
auto& small_entry = small_cache_[std::make_pair(dst_shard, block)];
if (!small_entry.result.is_null()) {
entry->result[block] = small_entry.result;
@ -397,7 +379,13 @@ void OutMsgQueueImporter::get_proof_import(std::shared_ptr<CacheEntry> entry, st
[=, SelfId = actor_id(this), retry_after = td::Timestamp::in(0.1),
dst_shard = entry->dst_shard](td::Result<std::vector<td::Ref<OutMsgQueueProof>>> R) {
if (R.is_error()) {
LOG(DEBUG) << "Failed to get out msg queue for " << dst_shard.to_str() << ": " << R.move_as_error();
FLOG(DEBUG) {
sb << "Failed to get out msg queue for " << dst_shard.to_str() << " from";
for (const BlockIdExt &block : blocks) {
sb << " " << block.id.to_str();
}
sb << ": " << R.move_as_error();
};
delay_action(
[=]() {
td::actor::send_closure(SelfId, &OutMsgQueueImporter::get_proof_import, entry, std::move(blocks),
@ -443,8 +431,11 @@ void OutMsgQueueImporter::got_proof(std::shared_ptr<CacheEntry> entry, std::vect
void OutMsgQueueImporter::finish_query(std::shared_ptr<CacheEntry> entry) {
FLOG(INFO) {
sb << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", " << entry->blocks.size()
<< " blocks in " << entry->timer.elapsed() << "s";
sb << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << " from";
for (const BlockIdExt &block : entry->blocks) {
sb << " " << block.id.to_str();
}
sb << " in " << entry->timer.elapsed() << "s";
sb << " sources{";
if (entry->from_broadcast) {
sb << " broadcast=" << entry->from_broadcast;
@ -479,8 +470,13 @@ void OutMsgQueueImporter::finish_query(std::shared_ptr<CacheEntry> entry) {
bool OutMsgQueueImporter::check_timeout(std::shared_ptr<CacheEntry> entry) {
if (entry->timeout.is_in_past()) {
LOG(DEBUG) << "Aborting importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", "
<< entry->blocks.size() << " blocks: timeout";
FLOG(DEBUG) {
sb << "Aborting importing neighbor msg queues for shard " << entry->dst_shard.to_str() << " from";
for (const BlockIdExt &block : entry->blocks) {
sb << " " << block.id.to_str();
}
sb << ": timeout";
};
for (auto& p : entry->promises) {
p.first.set_error(td::Status::Error(ErrorCode::timeout, "timeout"));
}
@ -499,8 +495,13 @@ void OutMsgQueueImporter::alarm() {
auto& promises = it->second->promises;
if (it->second->timeout.is_in_past()) {
if (!it->second->done) {
LOG(DEBUG) << "Aborting importing neighbor msg queues for shard " << it->second->dst_shard.to_str() << ", "
<< it->second->blocks.size() << " blocks: timeout";
FLOG(DEBUG) {
sb << "Aborting importing neighbor msg queues for shard " << it->second->dst_shard.to_str() << " from";
for (const BlockIdExt &block : it->second->blocks) {
sb << " " << block.id.to_str();
}
sb << ": timeout";
};
for (auto& p : promises) {
p.first.set_error(td::Status::Error(ErrorCode::timeout, "timeout"));
}
@ -540,7 +541,7 @@ void OutMsgQueueImporter::alarm() {
}
void OutMsgQueueImporter::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof) {
LOG(INFO) << "add out msg queue proof " << dst_shard.to_str() << proof->block_id_.to_str();
LOG(INFO) << "add out msg queue proof " << dst_shard.to_str() << " " << proof->block_id_.to_str();
auto& small_entry = small_cache_[std::make_pair(dst_shard, proof->block_id_)];
if (!small_entry.result.is_null()) {
return;
@ -556,7 +557,13 @@ void OutMsgQueueImporter::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref
void BuildOutMsgQueueProof::abort_query(td::Status reason) {
if (promise_) {
LOG(DEBUG) << "failed to build msg queue proof to " << dst_shard_.to_str() << ": " << reason;
FLOG(DEBUG) {
sb << "failed to build msg queue proof to " << dst_shard_.to_str() << " from";
for (const auto& block : blocks_) {
sb << " " << block.id.id.to_str();
}
sb << ": " << reason;
};
promise_.set_error(
reason.move_as_error_prefix(PSTRING() << "failed to build msg queue proof to " << dst_shard_.to_str() << ": "));
}

View file

@ -798,7 +798,9 @@ void ValidatorManagerImpl::wait_neighbor_msg_queue_proofs(
if (dst_shard.is_masterchain()) {
// We spit queries for masterchain {dst_shard, {block_1, ..., block_n}} into separate queries
// {dst_shard, {block_1}}, ..., {dst_shard, {block_n}}
// Also, use cache
// Also, use cache.
// This is performed here and not in OutMsgQueueImporter because it's important to use
// cached_msg_queue_to_masterchain_, which is related to the current list of shard block descriptions
class Worker : public td::actor::Actor {
public:
Worker(size_t pending, td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise)
@ -2958,12 +2960,15 @@ PublicKeyHash ValidatorManagerImpl::get_validator(ShardIdFull shard, td::Ref<Val
}
bool ValidatorManagerImpl::is_shard_collator(ShardIdFull shard) {
if (shard.is_masterchain()) {
return validating_masterchain();
}
for (auto &[_, collator_node] : collator_nodes_) {
if (collator_node.can_collate_shard(shard)) {
return true;
}
}
return false;
return is_validator() && opts_->get_collators_list()->self_collate;
}
bool ValidatorManagerImpl::Collator::can_collate_shard(ShardIdFull shard) const {
@ -3524,7 +3529,7 @@ void ValidatorManagerImpl::del_collator(adnl::AdnlNodeIdShort id, ShardIdFull sh
} else {
td::actor::send_closure(it->second.actor, &CollatorNode::del_shard, shard);
}
};
}
void ValidatorManagerImpl::get_collation_manager_stats(
td::Promise<tl_object_ptr<ton_api::engine_validator_collationManagerStats>> promise) {
@ -3575,15 +3580,18 @@ void ValidatorManagerImpl::get_collation_manager_stats(
}
void ValidatorManagerImpl::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof) {
if (!collator_nodes_.empty()) {
if (is_shard_collator(dst_shard)) {
if (out_msg_queue_importer_.empty()) {
out_msg_queue_importer_ = td::actor::create_actor<OutMsgQueueImporter>("outmsgqueueimporter", actor_id(this),
opts_, last_masterchain_state_);
}
td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::add_out_msg_queue_proof, dst_shard,
std::move(proof));
} else {
VLOG(VALIDATOR_DEBUG) << "Dropping unneeded out msg queue proof to shard " << dst_shard.to_str();
}
}
void ValidatorManagerImpl::add_persistent_state_description(td::Ref<PersistentStateDescription> desc) {
auto now = (UnixTime)td::Clocks::system();
if (desc->end_time <= now) {

View file

@ -32,13 +32,14 @@ namespace ton {
namespace validator {
static bool need_send_candidate_broadcast(const validatorsession::BlockSourceInfo &source_info, bool is_masterchain) {
return source_info.first_block_round == source_info.round && source_info.source_priority == 0 && !is_masterchain;
return source_info.priority.first_block_round == source_info.priority.round && source_info.priority.priority == 0 &&
!is_masterchain;
}
void ValidatorGroup::generate_block_candidate(
validatorsession::BlockSourceInfo source_info,
td::Promise<validatorsession::ValidatorSession::GeneratedCandidate> promise) {
td::uint32 round_id = source_info.round;
td::uint32 round_id = source_info.priority.round;
if (round_id > last_known_round_id_) {
last_known_round_id_ = round_id;
}
@ -66,15 +67,10 @@ void ValidatorGroup::generate_block_candidate(
std::move(R));
};
td::uint64 max_answer_size = config_.max_block_size + config_.max_collated_data_size + 1024;
auto block_candidate_priority = BlockCandidatePriority{
.round = source_info.round,
.first_block_round = source_info.first_block_round,
.priority = source_info.source_priority
};
td::actor::send_closure(collation_manager_, &CollationManager::collate_block, shard_, min_masterchain_block_id_,
prev_block_ids_, Ed25519_PublicKey{local_id_full_.ed25519_value().raw()},
block_candidate_priority, validator_set_,
max_answer_size, cancellation_token_source_.get_cancellation_token(), std::move(P));
source_info.priority, validator_set_, max_answer_size,
cancellation_token_source_.get_cancellation_token(), std::move(P));
}
void ValidatorGroup::generated_block_candidate(validatorsession::BlockSourceInfo source_info,
@ -103,7 +99,7 @@ void ValidatorGroup::generated_block_candidate(validatorsession::BlockSourceInfo
void ValidatorGroup::validate_block_candidate(validatorsession::BlockSourceInfo source_info, BlockCandidate block,
td::Promise<std::pair<UnixTime, bool>> promise) {
td::uint32 round_id = source_info.round;
td::uint32 round_id = source_info.priority.round;
if (round_id > last_known_round_id_) {
last_known_round_id_ = round_id;
}
@ -174,7 +170,7 @@ void ValidatorGroup::accept_block_candidate(validatorsession::BlockSourceInfo so
validatorsession::ValidatorSessionStats stats,
td::Promise<td::Unit> promise) {
stats.cc_seqno = validator_set_->get_catchain_seqno();
td::uint32 round_id = source_info.round;
td::uint32 round_id = source_info.priority.round;
if (round_id >= last_known_round_id_) {
last_known_round_id_ = round_id + 1;
}