diff --git a/tdutils/td/utils/StringBuilder.h b/tdutils/td/utils/StringBuilder.h
index 99e9d517..685416fe 100644
--- a/tdutils/td/utils/StringBuilder.h
+++ b/tdutils/td/utils/StringBuilder.h
@@ -149,4 +149,19 @@ std::enable_if_t<std::is_arithmetic<T>::value, string> to_string(const T &x) {
   return sb.as_cslice().str();
 }
 
+template <class SB>
+struct LambdaPrintHelper {
+  SB& sb;
+};
+template <class SB, class F>
+SB& operator<<(const LambdaPrintHelper<SB>& helper, F&& f) {
+  f(helper.sb);
+  return helper.sb;
+}
+struct LambdaPrint {};
+
+inline LambdaPrintHelper<td::StringBuilder> operator<<(td::StringBuilder& sb, const LambdaPrint&) {
+  return LambdaPrintHelper<td::StringBuilder>{sb};
+}
+
 }  // namespace td
diff --git a/tdutils/td/utils/Timer.cpp b/tdutils/td/utils/Timer.cpp
index 24de099a..c2c67895 100644
--- a/tdutils/td/utils/Timer.cpp
+++ b/tdutils/td/utils/Timer.cpp
@@ -22,6 +22,8 @@
 #include "td/utils/logging.h"
 #include "td/utils/Time.h"
 
+#include <numeric>
+
 namespace td {
 
 Timer::Timer(bool is_paused) : is_paused_(is_paused) {
@@ -60,12 +62,15 @@ StringBuilder &operator<<(StringBuilder &string_builder, const Timer &timer) {
   return string_builder << format::as_time(timer.elapsed());
 }
 
-PerfWarningTimer::PerfWarningTimer(string name, double max_duration, std::function<void()>&& callback)
+PerfWarningTimer::PerfWarningTimer(string name, double max_duration, std::function<void()> &&callback)
     : name_(std::move(name)), start_at_(Time::now()), max_duration_(max_duration), callback_(std::move(callback)) {
 }
 
 PerfWarningTimer::PerfWarningTimer(PerfWarningTimer &&other)
-    : name_(std::move(other.name_)), start_at_(other.start_at_), max_duration_(other.max_duration_), callback_(std::move(other.callback_)) {
+    : name_(std::move(other.name_))
+    , start_at_(other.start_at_)
+    , max_duration_(other.max_duration_)
+    , callback_(std::move(other.callback_)) {
   other.start_at_ = 0;
 }
 
@@ -134,4 +139,34 @@ double ThreadCpuTimer::elapsed() const {
   return res;
 }
 
+PerfLogAction PerfLog::start_action(std::string name) {
+  auto i = entries_.size();
+  entries_.push_back({.name = std::move(name), .begin = td::Timestamp::now().at()});
+  return PerfLogAction{i, std::unique_ptr<PerfLog, EmptyDeleter>(this)};
+}
+
+td::StringBuilder &operator<<(StringBuilder &sb, const PerfLog &log) {
+  sb << "{";
+  std::vector<size_t> ids(log.entries_.size());
+  std::iota(ids.begin(), ids.end(), 0);
+  std::sort(ids.begin(), ids.end(), [&](auto a, auto b) {
+    return log.entries_[a].end - log.entries_[a].begin > log.entries_[b].end - log.entries_[b].begin;
+  });
+  for (size_t i = 0; i < log.entries_.size(); i++) {
+    sb << "\n\t";
+    auto &entry = log.entries_[ids[i]];
+    sb << "{" << entry.name << ":" << entry.begin << "->" << entry.end << "(" << entry.end - entry.begin << ")"
+       << td::format::cond(entry.status.is_error(), entry.status, "") << "}";
+  }
+  sb << "\n}";
+  return sb;
+}
+
+double PerfLog::finish_action(size_t i, td::Status status) {
+  auto &entry = entries_[i];
+  CHECK(entry.end == 0);
+  entry.end = td::Timestamp::now().at();
+  entry.status = std::move(status);
+  return entry.end - entry.begin;
+}
 }  // namespace td
diff --git a/tdutils/td/utils/Timer.h b/tdutils/td/utils/Timer.h
index a27cac8a..d787f1ca 100644
--- a/tdutils/td/utils/Timer.h
+++ b/tdutils/td/utils/Timer.h
@@ -19,6 +19,7 @@
 #pragma once
 
 #include "td/utils/StringBuilder.h"
+#include "td/utils/Status.h"
 
 #include <functional>
 
@@ -46,7 +47,7 @@ class Timer {
 
 class PerfWarningTimer {
  public:
-  explicit PerfWarningTimer(string name, double max_duration = 0.1, std::function<void()>&& callback = {});
+  explicit PerfWarningTimer(string name, double max_duration = 0.1, std::function<void()> &&callback = {});
   PerfWarningTimer(const PerfWarningTimer &) = delete;
   PerfWarningTimer &operator=(const PerfWarningTimer &) = delete;
   PerfWarningTimer(PerfWarningTimer &&other);
@@ -80,4 +81,44 @@ class ThreadCpuTimer {
   bool is_paused_{false};
 };
 
+class PerfLog;
+struct EmptyDeleter {
+  template <class T>
+  void operator()(T *) {
+  }
+};
+class PerfLogAction {
+ public:
+  template <class T>
+  double finish(const T &result);
+  size_t i_{0};
+  std::unique_ptr<PerfLog, EmptyDeleter> perf_log_;
+};
+
+class PerfLog {
+ public:
+  PerfLogAction start_action(std::string name);
+  friend td::StringBuilder &operator<<(td::StringBuilder &sb, const PerfLog &log);
+
+ private:
+  struct Entry {
+    std::string name;
+    double begin{};
+    double end{};
+    td::Status status;
+  };
+  std::vector<Entry> entries_;
+  friend class PerfLogAction;
+
+  double finish_action(size_t i, td::Status status);
+};
+
+template <class T>
+double PerfLogAction::finish(const T &result) {
+  if (result.is_ok()) {
+    return perf_log_->finish_action(i_, td::Status::OK());
+  } else {
+    return perf_log_->finish_action(i_, result.error().clone());
+  }
+}
+
 }  // namespace td
diff --git a/tdutils/td/utils/logging.h b/tdutils/td/utils/logging.h
index d00fba15..5c9a0621 100644
--- a/tdutils/td/utils/logging.h
+++ b/tdutils/td/utils/logging.h
@@ -74,6 +74,7 @@
 
 #define LOG(level) LOG_IMPL(level, level, true, ::td::Slice())
 #define LOG_IF(level, condition) LOG_IMPL(level, level, condition, #condition)
+#define FLOG(level) LOG_IMPL(level, level, true, ::td::Slice()) << td::LambdaPrint{} << [&](auto &sb)
 
 #define VLOG(level) LOG_IMPL(DEBUG, level, true, TD_DEFINE_STR(level))
 #define VLOG_IF(level, condition) LOG_IMPL(DEBUG, level, condition, TD_DEFINE_STR(level) " " #condition)
@@ -95,13 +96,13 @@ inline bool no_return_func() {
 #define DUMMY_LOG_CHECK(condition) LOG_IF(NEVER, !(condition))
 
 #ifdef TD_DEBUG
-  #if TD_MSVC
+#if TD_MSVC
 #define LOG_CHECK(condition) \
   __analysis_assume(!!(condition)); \
   LOG_IMPL(FATAL, FATAL, !(condition), #condition)
-  #else
+#else
 #define LOG_CHECK(condition) LOG_IMPL(FATAL, FATAL, !(condition) && no_return_func(), #condition)
-  #endif
+#endif
 #else
 #define LOG_CHECK DUMMY_LOG_CHECK
 #endif
@@ -263,6 +264,9 @@ class Logger {
     sb_ << other;
     return *this;
   }
+  LambdaPrintHelper<Logger> operator<<(const LambdaPrint &) {
+    return LambdaPrintHelper<Logger>{*this};
+  }
 
   MutableCSlice as_cslice() {
     return sb_.as_cslice();
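Taken together, the tdutils changes above add a lazy logging idiom: FLOG(level) { ... } builds the message with a lambda that runs only when the line is actually emitted, with `sb` bound to the underlying builder through LambdaPrint/LambdaPrintHelper, while PerfLog/PerfLogAction accumulate per-action durations and statuses that can be streamed into any log line. A minimal usage sketch (the function below is hypothetical; FLOG, PerfLog and PerfLogAction are the entities introduced by this diff):

    // Sketch, not repo code: exercising FLOG and PerfLog as added above.
    void timed_step(td::PerfLog &perf_log) {
      auto token = perf_log.start_action("step");  // opens a timed entry
      td::Status status = td::Status::OK();        // outcome of the step
      double elapsed = token.finish(status);       // records end time + status
      FLOG(INFO) {
        sb << "step took " << elapsed << "s, log so far: " << perf_log;
      };  // lambda body runs only if INFO logging is enabled
    }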
diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl
index 5b272f26..720f8d49 100644
--- a/tl/generate/scheme/ton_api.tl
+++ b/tl/generate/scheme/ton_api.tl
@@ -433,6 +433,10 @@ tonNode.newBlockCandidateBroadcast id:tonNode.blockIdExt catchain_seqno:int validator_set_hash:int
 tonNode.newBlockCandidateBroadcastCompressed id:tonNode.blockIdExt catchain_seqno:int validator_set_hash:int collator_signature:tonNode.blockSignature flags:# compressed:bytes = tonNode.Broadcast;
 
+// optimistic broadcast of response to tonNode.getOutMsgQueueProof with dst_shard, block and limits arguments
+tonNode.outMsgQueueProofBroadcast dst_shard:tonNode.shardId block:tonNode.blockIdExt
+    limits:ImportedMsgQueueLimits proof:tonNode.OutMsgQueueProof = tonNode.Broadcast;
+
 tonNode.shardPublicOverlayId workchain:int shard:long zero_state_file_hash:int256 = tonNode.ShardPublicOverlayId;
 
 tonNode.privateBlockOverlayId zero_state_file_hash:int256 nodes:(vector int256) = tonNode.PrivateBlockOverlayId;
@@ -924,7 +928,7 @@ validatorSession.endValidatorGroupStats session_id:int256 timestamp:double
 ---functions---
 
 collatorNode.generateBlock shard:tonNode.shardId cc_seqno:int prev_blocks:(vector tonNode.blockIdExt)
-    creator:int256 = collatorNode.Candidate;
+    creator:int256 round:int first_block_round:int priority:int = collatorNode.Candidate;
 collatorNode.ping flags:# = collatorNode.Pong;
 
 ---types---
diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo
index c984fba0..19a8d131 100644
Binary files a/tl/generate/scheme/ton_api.tlo and b/tl/generate/scheme/ton_api.tlo differ
diff --git a/ton/ton-types.h b/ton/ton-types.h
index 11741c5e..aeb0595a 100644
--- a/ton/ton-types.h
+++ b/ton/ton-types.h
@@ -425,14 +425,47 @@ struct Ed25519_PublicKey {
 };
 
 // represents (the contents of) a block
+
+struct OutMsgQueueProofBroadcast : public td::CntObject {
+  OutMsgQueueProofBroadcast(ShardIdFull dst_shard, BlockIdExt block_id, td::int32 max_bytes, td::int32 max_msgs,
+                            td::BufferSlice queue_proofs, td::BufferSlice block_state_proofs,
+                            std::vector<td::int32> msg_counts)
+      : dst_shard(std::move(dst_shard))
+      , block_id(block_id)
+      , max_bytes(max_bytes)
+      , max_msgs(max_msgs)
+      , queue_proofs(std::move(queue_proofs))
+      , block_state_proofs(std::move(block_state_proofs))
+      , msg_counts(std::move(msg_counts)) {
+  }
+  ShardIdFull dst_shard;
+  BlockIdExt block_id;
+
+  // importedMsgQueueLimits
+  td::uint32 max_bytes;
+  td::uint32 max_msgs;
+
+  // outMsgQueueProof
+  td::BufferSlice queue_proofs;
+  td::BufferSlice block_state_proofs;
+  std::vector<td::int32> msg_counts;
+
+  virtual OutMsgQueueProofBroadcast* make_copy() const {
+    return new OutMsgQueueProofBroadcast(dst_shard, block_id, max_bytes, max_msgs, queue_proofs.clone(),
+                                         block_state_proofs.clone(), msg_counts);
+  }
+};
+
 struct BlockCandidate {
   BlockCandidate(Ed25519_PublicKey pubkey, BlockIdExt id, FileHash collated_file_hash, td::BufferSlice data,
-                 td::BufferSlice collated_data)
+                 td::BufferSlice collated_data,
+                 std::vector<td::Ref<OutMsgQueueProofBroadcast>> out_msg_queue_broadcasts = {})
       : pubkey(pubkey)
       , id(id)
       , collated_file_hash(collated_file_hash)
       , data(std::move(data))
-      , collated_data(std::move(collated_data)) {
+      , collated_data(std::move(collated_data))
+      , out_msg_queue_proof_broadcasts(std::move(out_msg_queue_broadcasts)) {
   }
   Ed25519_PublicKey pubkey;
   BlockIdExt id;
@@ -440,11 +473,21 @@ struct BlockCandidate {
   td::BufferSlice data;
   td::BufferSlice collated_data;
 
+  // used only locally
+  std::vector<td::Ref<OutMsgQueueProofBroadcast>> out_msg_queue_proof_broadcasts;
+
   BlockCandidate clone() const {
-    return BlockCandidate{pubkey, id, collated_file_hash, data.clone(), collated_data.clone()};
+    return BlockCandidate{
+        pubkey, id, collated_file_hash, data.clone(), collated_data.clone(), out_msg_queue_proof_broadcasts};
   }
 };
 
+struct BlockCandidatePriority {
+  td::uint32 round{};
+  td::uint32 first_block_round{};
+  td::int32 priority{};
+};
+
 struct ValidatorDescr {
   /* ton::validator::ValidatorFullId */ Ed25519_PublicKey key;
   ValidatorWeight weight;
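The new BlockCandidatePriority travels from the validator group down to the collator node, and its absence is itself a signal: CollatorNode (below) wraps it in std::optional, where an empty optional marks an internal, speculative generation and a present value marks an external validator query. A self-contained sketch of that convention (the Priority type below is a simplified stand-in, not repo code):

    #include <optional>

    struct Priority {  // stand-in for ton::BlockCandidatePriority
      unsigned round{};
      unsigned first_block_round{};
      int priority{};
    };

    // Mirrors `bool is_external = !o_priority;` in CollatorNode::generate_block.
    bool is_external_query(const std::optional<Priority> &o_priority) {
      return o_priority.has_value();
    }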
diff --git a/validator/collation-manager.cpp b/validator/collation-manager.cpp
index 2ca3a5f1..e8447059 100644
--- a/validator/collation-manager.cpp
+++ b/validator/collation-manager.cpp
@@ -33,6 +33,7 @@ void CollationManager::start_up() {
 
 void CollationManager::collate_block(ShardIdFull shard, BlockIdExt min_masterchain_block_id,
                                      std::vector<BlockIdExt> prev, Ed25519_PublicKey creator,
+                                     BlockCandidatePriority priority,
                                      td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
                                      td::CancellationToken cancellation_token, td::Promise<BlockCandidate> promise) {
   if (shard.is_masterchain()) {
                       std::move(cancellation_token), 0);
     return;
   }
-  collate_shard_block(shard, min_masterchain_block_id, std::move(prev), creator, std::move(validator_set),
+  collate_shard_block(shard, min_masterchain_block_id, std::move(prev), creator, priority, std::move(validator_set),
                       max_answer_size, std::move(cancellation_token), std::move(promise), td::Timestamp::in(10.0));
 }
 
 void CollationManager::collate_shard_block(ShardIdFull shard, BlockIdExt min_masterchain_block_id,
                                            std::vector<BlockIdExt> prev, Ed25519_PublicKey creator,
+                                           BlockCandidatePriority priority,
                                            td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
                                            td::CancellationToken cancellation_token,
                                            td::Promise<BlockCandidate> promise, td::Timestamp timeout) {
@@ -133,8 +135,8 @@
   delay_action(
       [=, promise = std::move(promise)]() mutable {
         td::actor::send_closure(SelfId, &CollationManager::collate_shard_block, shard, min_masterchain_block_id, prev,
-                                creator, validator_set, max_answer_size, cancellation_token, std::move(promise),
-                                timeout);
+                                creator, priority, validator_set, max_answer_size, cancellation_token,
+                                std::move(promise), timeout);
       },
       retry_at);
 };
@@ -145,7 +147,8 @@
   }
 
   td::BufferSlice query = create_serialize_tl_object<ton_api::collatorNode_generateBlock>(
-      create_tl_shard_id(shard), validator_set->get_catchain_seqno(), std::move(prev_blocks), creator.as_bits256());
+      create_tl_shard_id(shard), validator_set->get_catchain_seqno(), std::move(prev_blocks), creator.as_bits256(),
+      priority.round, priority.first_block_round, priority.priority);
 
   LOG(INFO) << "sending collate query for " << next_block_id.to_str() << ": send to #" << selected_idx << "("
             << selected_collator << ")";
diff --git a/validator/collation-manager.hpp b/validator/collation-manager.hpp
index 7ceea1e6..9ca69814 100644
--- a/validator/collation-manager.hpp
+++ b/validator/collation-manager.hpp
@@ -35,7 +35,8 @@ class CollationManager : public td::actor::Actor {
   void alarm() override;
 
   void collate_block(ShardIdFull shard, BlockIdExt min_masterchain_block_id, std::vector<BlockIdExt> prev,
-                     Ed25519_PublicKey creator, td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
+                     Ed25519_PublicKey creator, BlockCandidatePriority priority,
+                     td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
                      td::CancellationToken cancellation_token, td::Promise<BlockCandidate> promise);
   void update_options(td::Ref<ValidatorManagerOptions> opts);
 
@@ -52,7 +53,8 @@ class CollationManager : public td::actor::Actor {
   td::actor::ActorId<rldp::Rldp> rldp_;
 
   void collate_shard_block(ShardIdFull shard, BlockIdExt min_masterchain_block_id, std::vector<BlockIdExt> prev,
-                           Ed25519_PublicKey creator, td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
+                           Ed25519_PublicKey creator, BlockCandidatePriority priority,
+                           td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
                            td::CancellationToken cancellation_token, td::Promise<BlockCandidate> promise,
                            td::Timestamp timeout);
diff --git a/validator/collator-node.cpp b/validator/collator-node.cpp
index cf2770d9..1b8eb79e 100644
--- a/validator/collator-node.cpp
+++ b/validator/collator-node.cpp
@@ -191,17 +191,41 @@ void CollatorNode::update_validator_group_info(ShardIdFull shard, std::vector<BlockIdExt> prev, CatchainSeqno cc_seqno) {
       if (cache_entry->block_seqno < info.next_block_seqno) {
         cache_entry->cancel(td::Status::Error(PSTRING() << "next block seqno " << cache_entry->block_seqno
                                                         << " is too small, expected " << info.next_block_seqno));
+        if (!cache_entry->has_external_query_at && cache_entry->has_internal_query_at) {
+          LOG(INFO) << "generate block query"
+                    << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno
+                    << ", next_block_seqno=" << cache_entry->block_seqno
+                    << ": nobody asked for the block, but we tried to generate it";
+        }
+        if (cache_entry->has_external_query_at && !cache_entry->has_internal_query_at) {
+          LOG(INFO) << "generate block query"
+                    << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno
+                    << ", next_block_seqno=" << cache_entry->block_seqno
+                    << ": somebody asked for a block we didn't even try to generate";
+        }
         cache_it = info.cache.erase(cache_it);
         continue;
       }
       if (cache_entry->block_seqno == info.next_block_seqno && cached_prev != info.prev) {
         cache_entry->cancel(td::Status::Error("invalid prev blocks"));
+        if (!cache_entry->has_external_query_at && cache_entry->has_internal_query_at) {
+          LOG(INFO) << "generate block query"
+                    << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno
+                    << ", next_block_seqno=" << cache_entry->block_seqno
+                    << ": nobody asked for the block, but we tried to generate it";
+        }
+        if (cache_entry->has_external_query_at && !cache_entry->has_internal_query_at) {
+          LOG(INFO) << "generate block query"
+                    << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno
+                    << ", next_block_seqno=" << cache_entry->block_seqno
+                    << ": somebody asked for a block we didn't even try to generate";
+        }
         cache_it = info.cache.erase(cache_it);
         continue;
       }
       ++cache_it;
     }
-    generate_block(shard, cc_seqno, info.prev, td::Timestamp::in(10.0), [](td::Result<BlockCandidate>) {});
+    generate_block(shard, cc_seqno, info.prev, {}, td::Timestamp::in(10.0), [](td::Result<BlockCandidate>) {});
   }
   return;
 }
@@ -285,6 +309,14 @@ static BlockCandidate change_creator(BlockCandidate block, Ed25519_PublicKey creator) {
   cc_seqno = info.gen_catchain_seqno;
   val_set_hash = info.gen_validator_list_hash_short;
+
+  for (auto& broadcast_ref : block.out_msg_queue_proof_broadcasts) {
+    auto block_state_proof = create_block_state_proof(root).move_as_ok();
+
+    auto& broadcast = broadcast_ref.write();
+    broadcast.block_id = block.id;
+    broadcast.block_state_proofs = vm::std_boc_serialize(std::move(block_state_proof), 31).move_as_ok();
+  }
   return block;
 }
@@ -322,6 +354,11 @@ void CollatorNode::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data,
   for (const auto& b : f->prev_blocks_) {
     prev_blocks.push_back(create_block_id(b));
   }
+  auto priority = BlockCandidatePriority{
+      .round = static_cast<td::uint32>(f->round_),
+      .first_block_round = static_cast<td::uint32>(f->first_block_round_),
+      .priority = f->priority_
+  };
   Ed25519_PublicKey creator(f->creator_);
   td::Promise<BlockCandidate> new_promise = [promise = std::move(promise), src,
                                              shard](td::Result<BlockCandidate> R) mutable {
@@ -353,11 +390,13 @@
     return;
   }
   LOG(INFO) << "got adnl query from " << src << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno;
-  generate_block(shard, cc_seqno, std::move(prev_blocks), td::Timestamp::in(10.0), std::move(new_promise));
+  generate_block(shard, cc_seqno, std::move(prev_blocks), priority, td::Timestamp::in(10.0), std::move(new_promise));
 }
 
 void CollatorNode::generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std::vector<BlockIdExt> prev_blocks,
-                                  td::Timestamp timeout, td::Promise<BlockCandidate> promise) {
+                                  std::optional<BlockCandidatePriority> o_priority, td::Timestamp timeout,
+                                  td::Promise<BlockCandidate> promise) {
+  bool is_external = !o_priority;
   if (last_masterchain_state_.is_null()) {
     promise.set_error(td::Status::Error(ErrorCode::notready, "not ready"));
     return;
   }
@@ -379,8 +418,8 @@
       promise.set_error(td::Status::Error(ErrorCode::timeout));
       return;
     }
-    td::actor::send_closure(SelfId, &CollatorNode::generate_block, shard, cc_seqno, std::move(prev_blocks), timeout,
-                            std::move(promise));
+    td::actor::send_closure(SelfId, &CollatorNode::generate_block, shard, cc_seqno, std::move(prev_blocks),
+                            std::move(o_priority), timeout, std::move(promise));
   });
   return;
 }
@@ -399,36 +438,81 @@
     return;
   }
 
+  static auto prefix_inner = [](auto& sb, auto& shard, auto cc_seqno, auto block_seqno,
+                                const std::optional<BlockCandidatePriority>& o_priority) {
+    sb << "generate block query"
+       << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno;
+    if (o_priority) {
+      sb << " external{";
+      sb << "round_offset=" << o_priority->round - o_priority->first_block_round
+         << ",priority=" << o_priority->priority;
+      sb << ",first_block_round=" << o_priority->first_block_round;
+      sb << "}";
+    } else {
+      sb << " internal";
+    }
+  };
+  auto prefix = [&](auto& sb) {
+    prefix_inner(sb, shard, cc_seqno, block_seqno, o_priority);
+  };
+
   auto cache_entry = validator_group_info.cache[prev_blocks];
   if (cache_entry == nullptr) {
     cache_entry = validator_group_info.cache[prev_blocks] = std::make_shared<CacheEntry>();
   }
+  if (is_external && !cache_entry->has_external_query_at) {
+    cache_entry->has_external_query_at = td::Timestamp::now();
+    if (cache_entry->has_internal_query_at && cache_entry->has_external_query_at) {
+      FLOG(INFO) {
+        prefix(sb);
+        sb << ": got external query "
+           << cache_entry->has_external_query_at.at() - cache_entry->has_internal_query_at.at()
+           << "s after internal query [WON]";
+      };
+    }
+  }
+  if (!is_external && !cache_entry->has_internal_query_at) {
+    cache_entry->has_internal_query_at = td::Timestamp::now();
+    if (cache_entry->has_internal_query_at && cache_entry->has_external_query_at) {
+      FLOG(INFO) {
+        prefix(sb);
+        sb << ": got internal query "
+           << cache_entry->has_internal_query_at.at() - cache_entry->has_external_query_at.at()
+           << "s after external query [LOST]";
+      };
+    }
+  }
   if (cache_entry->result) {
-    LOG(INFO) << "generate block query"
-              << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno
-              << ": using cached result";
+    auto has_result_ago = td::Timestamp::now().at() - cache_entry->has_result_at.at();
+    FLOG(INFO) {
+      prefix(sb);
+      sb << ": using cached result generated " << has_result_ago << "s ago";
+      sb << (is_external ? " for external query [WON]" : " for internal query");
+    };
     promise.set_result(cache_entry->result.value().clone());
     return;
   }
   cache_entry->promises.push_back(std::move(promise));
+
   if (cache_entry->started) {
-    LOG(INFO) << "generate block query"
-              << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno
-              << ": collation in progress, waiting";
+    FLOG(INFO) {
+      prefix(sb);
+      sb << ": collation in progress, waiting";
+    };
     return;
   }
-  LOG(INFO) << "generate block query"
-            << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno
-            << ": starting collation";
+  FLOG(INFO) {
+    prefix(sb);
+    sb << ": starting collation";
+  };
   cache_entry->started = true;
   cache_entry->block_seqno = block_seqno;
   run_collate_query(
       shard, last_masterchain_state_->get_block_id(), std::move(prev_blocks), Ed25519_PublicKey{td::Bits256::zero()},
       last_masterchain_state_->get_validator_set(shard), opts_->get_collator_options(), manager_, timeout,
       [=, SelfId = actor_id(this), timer = td::Timer{}](td::Result<BlockCandidate> R) {
-        LOG(INFO) << "generate block result"
-                  << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno
-                  << ", time=" << timer.elapsed() << ": " << (R.is_ok() ? "OK" : R.error().to_string());
+        FLOG(INFO) {
+          prefix_inner(sb, shard, cc_seqno, block_seqno, o_priority);
+          sb << ", time=" << timer.elapsed() << ": " << (R.is_ok() ? "OK" : R.error().to_string());
+        };
         td::actor::send_closure(SelfId, &CollatorNode::process_result, cache_entry, std::move(R));
       },
       cache_entry->cancellation_token_source.get_cancellation_token(),
@@ -443,6 +527,7 @@ void CollatorNode::process_result(std::shared_ptr<CacheEntry> cache_entry, td::Result<BlockCandidate> R) {
     }
   } else {
     cache_entry->result = R.move_as_ok();
+    cache_entry->has_result_at = td::Timestamp::now();
     for (auto& p : cache_entry->promises) {
       p.set_result(cache_entry->result.value().clone());
     }
diff --git a/validator/collator-node.hpp b/validator/collator-node.hpp
index a361ceb3..54876c35 100644
--- a/validator/collator-node.hpp
+++ b/validator/collator-node.hpp
@@ -19,6 +19,7 @@
 #include "interfaces/validator-manager.h"
 #include "rldp/rldp.h"
 #include <map>
+#include <optional>
 
 namespace ton::validator {
@@ -57,6 +58,9 @@ class CollatorNode : public td::actor::Actor {
 
   struct CacheEntry {
     bool started = false;
+    td::Timestamp has_internal_query_at;
+    td::Timestamp has_external_query_at;
+    td::Timestamp has_result_at;
     BlockSeqno block_seqno = 0;
     td::optional<BlockCandidate> result;
     td::CancellationTokenSource cancellation_token_source;
@@ -84,7 +88,8 @@
   td::Result<ValidatorGroupInfo*> get_future_validator_group(ShardIdFull shard, CatchainSeqno cc_seqno);
 
   void generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std::vector<BlockIdExt> prev_blocks,
-                      td::Timestamp timeout, td::Promise<BlockCandidate> promise);
+                      std::optional<BlockCandidatePriority> o_priority, td::Timestamp timeout,
+                      td::Promise<BlockCandidate> promise);
   void process_result(std::shared_ptr<CacheEntry> cache_entry, td::Result<BlockCandidate> R);
 
  public:
diff --git a/validator/full-node-fast-sync-overlays.cpp b/validator/full-node-fast-sync-overlays.cpp
index 270c53f0..4e0d11e4 100644
--- a/validator/full-node-fast-sync-overlays.cpp
+++ b/validator/full-node-fast-sync-overlays.cpp
@@ -46,6 +46,33 @@ void FullNodeFastSyncOverlay::process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) {
   td::actor::send_closure(full_node_, &FullNode::process_block_broadcast, B.move_as_ok());
 }
 
+void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_outMsgQueueProofBroadcast &query) {
+  BlockIdExt block_id = create_block_id(query.block_);
+  ShardIdFull shard_id = create_shard_id(query.dst_shard_);
+  if (query.proof_->get_id() != ton_api::tonNode_outMsgQueueProof::ID) {
+    LOG(ERROR) << "got tonNode.outMsgQueueProofBroadcast with proof not tonNode.outMsgQueueProof";
+    return;
+  }
+  auto tl_proof = move_tl_object_as<ton_api::tonNode_outMsgQueueProof>(query.proof_);
+  auto R = OutMsgQueueProof::fetch(shard_id, {block_id},
+                                   block::ImportedMsgQueueLimits{.max_bytes = td::uint32(query.limits_->max_bytes_),
+                                                                 .max_msgs = td::uint32(query.limits_->max_msgs_)},
+                                   *tl_proof);
+  if (R.is_error()) {
+    LOG(ERROR) << "got tonNode.outMsgQueueProofBroadcast with invalid proof: " << R.error();
+    return;
+  }
+  if (R.ok().size() != 1) {
+    LOG(ERROR) << "got tonNode.outMsgQueueProofBroadcast with invalid proofs count=" << R.ok().size();
+    return;
+  }
+  auto proof = std::move(R.move_as_ok()[0]);
+
+  LOG(INFO) << "got tonNode.outMsgQueueProofBroadcast " << shard_id.to_str() << " " << block_id.to_str();
+  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::add_out_msg_queue_proof, shard_id,
+                          std::move(proof));
+}
+
 void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) {
   BlockIdExt block_id = create_block_id(query.block_->block_);
   VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast in fast sync overlay from " << src << ": "
@@ -200,6 +227,22 @@ void FullNodeFastSyncOverlay::collect_validator_telemetry(std::string filename) {
   }
 }
 
+void FullNodeFastSyncOverlay::send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQueueProofBroadcast> broadcast) {
+  if (!inited_) {
+    return;
+  }
+  auto B = create_serialize_tl_object<ton_api::tonNode_outMsgQueueProofBroadcast>(
+      create_tl_shard_id(broadcast->dst_shard), create_tl_block_id(broadcast->block_id),
+      create_tl_object<ton_api::tonNode_importedMsgQueueLimits>(broadcast->max_bytes, broadcast->max_msgs),
+      create_tl_object<ton_api::tonNode_outMsgQueueProof>(broadcast->queue_proofs.clone(),
+                                                          broadcast->block_state_proofs.clone(),
+                                                          std::vector<td::int32>(broadcast->msg_counts)));
+  VLOG(FULL_NODE_DEBUG) << "Sending outMsgQueueProof in fast sync overlay: " << broadcast->dst_shard.to_str() << " "
+                        << broadcast->block_id.to_str();
+  td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
+                          local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B));
+}
+
 void FullNodeFastSyncOverlay::start_up() {
   auto X = create_hash_tl_object(zero_state_file_hash_, create_tl_shard_id(shard_));
   td::BufferSlice b{32};
diff --git a/validator/full-node-fast-sync-overlays.hpp b/validator/full-node-fast-sync-overlays.hpp
index 05d83071..e0db87b7 100644
--- a/validator/full-node-fast-sync-overlays.hpp
+++ b/validator/full-node-fast-sync-overlays.hpp
@@ -25,6 +25,7 @@ class FullNodeFastSyncOverlay : public td::actor::Actor {
  public:
   void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast& query);
   void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed& query);
+  void process_broadcast(PublicKeyHash src, ton_api::tonNode_outMsgQueueProofBroadcast& query);
   void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast& query);
 
   void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast& query);
@@ -46,6 +47,7 @@ class FullNodeFastSyncOverlay : public td::actor::Actor {
   void send_broadcast(BlockBroadcast broadcast);
   void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
                             td::BufferSlice data);
+  void send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQueueProofBroadcast> broadcast);
   void send_validator_telemetry(tl_object_ptr<ton_api::validator_telemetry> telemetry);
   void collect_validator_telemetry(std::string filename);
diff --git a/validator/full-node-shard.hpp b/validator/full-node-shard.hpp
index 8ac5185d..fd1ef943 100644
--- a/validator/full-node-shard.hpp
+++ b/validator/full-node-shard.hpp
@@ -151,6 +151,9 @@ class FullNodeShardImpl : public FullNodeShard {
   void process_broadcast(PublicKeyHash src, ton_api::tonNode_ihrMessageBroadcast &query);
   void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query);
   void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query);
+  void process_broadcast(PublicKeyHash src, ton_api::tonNode_outMsgQueueProofBroadcast &query) {
+    LOG(ERROR) << "Ignore outMsgQueueProofBroadcast";
+  }
 
   void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query);
   void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query);
diff --git a/validator/full-node.cpp b/validator/full-node.cpp
index 0278c9ae..e7527d76 100644
--- a/validator/full-node.cpp
+++ b/validator/full-node.cpp
@@ -371,6 +371,14 @@ void FullNodeImpl::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, ...) {
   }
 }
 
+void FullNodeImpl::send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQueueProofBroadcast> broadcast) {
+  auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(broadcast->dst_shard).first;
+  if (!fast_sync_overlay.empty()) {
+    td::actor::send_closure(fast_sync_overlay, &FullNodeFastSyncOverlay::send_out_msg_queue_proof_broadcast,
+                            std::move(broadcast));
+  }
+}
+
 void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, int mode) {
   if (mode & broadcast_mode_custom) {
     send_block_broadcast_to_custom_overlays(broadcast);
@@ -713,6 +721,9 @@ void FullNodeImpl::start_up() {
       td::actor::send_closure(id_, &FullNodeImpl::send_block_candidate, block_id, cc_seqno, validator_set_hash,
                               std::move(data));
     }
+    void send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQueueProofBroadcast> broadcast) override {
+      td::actor::send_closure(id_, &FullNodeImpl::send_out_msg_queue_proof_broadcast, std::move(broadcast));
+    }
     void send_broadcast(BlockBroadcast broadcast, int mode) override {
       td::actor::send_closure(id_, &FullNodeImpl::send_broadcast, std::move(broadcast), mode);
     }
diff --git a/validator/full-node.hpp b/validator/full-node.hpp
index 9e254d7d..5533b1f4 100644
--- a/validator/full-node.hpp
+++ b/validator/full-node.hpp
@@ -70,6 +70,7 @@ class FullNodeImpl : public FullNode {
   void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
                             td::BufferSlice data);
   void send_broadcast(BlockBroadcast broadcast, int mode);
+  void send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQueueProofBroadcast> broadcast);
   void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise<ReceivedBlock> promise);
   void download_zero_state(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
                            td::Promise<td::BufferSlice> promise);
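For orientation, the full path assembled by the validator-side files above and the collator-side files below is, in send order: Collator::create_block_candidate builds one proof per neighbor shard; ValidatorManagerImpl::set_block_candidate hands them to Callback::send_out_msg_queue_proof_broadcast; FullNodeImpl forwards each to the fast-sync overlay chosen for dst_shard. On the receiving side FullNodeFastSyncOverlay::process_broadcast re-validates the proof with OutMsgQueueProof::fetch against the declared limits and passes it to ValidatorManagerInterface::add_out_msg_queue_proof, which lands in OutMsgQueueImporter's per-block cache; the regular (non-fast-sync) shard overlay deliberately ignores this broadcast type.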
diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h
index 72154f86..0090e95d 100644
--- a/validator/impl/collator-impl.h
+++ b/validator/impl/collator-impl.h
@@ -237,6 +237,7 @@ class Collator final : public td::actor::Actor {
   bool store_out_msg_queue_size_ = false;
 
   td::PerfWarningTimer perf_timer_;
+  td::PerfLog perf_log_;
   //
   block::Account* lookup_account(td::ConstBitPtr addr) const;
   std::unique_ptr<block::Account> make_account_from(td::ConstBitPtr addr, Ref<vm::CellSlice> account,
@@ -252,18 +253,18 @@ class Collator final : public td::actor::Actor {
   bool fatal_error(int err_code, std::string err_msg);
   bool fatal_error(std::string err_msg, int err_code = -666);
   void check_pending();
-  void after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res);
-  void after_get_shard_state(int idx, td::Result<Ref<ShardState>> res);
-  void after_get_block_data(int idx, td::Result<Ref<BlockData>> res);
-  void after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res);
+  void after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res, td::PerfLogAction token);
+  void after_get_shard_state(int idx, td::Result<Ref<ShardState>> res, td::PerfLogAction token);
+  void after_get_block_data(int idx, td::Result<Ref<BlockData>> res, td::PerfLogAction token);
+  void after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res, td::PerfLogAction token);
   bool preprocess_prev_mc_state();
   bool register_mc_state(Ref<MasterchainStateQ> other_mc_state);
   bool request_aux_mc_state(BlockSeqno seqno, Ref<MasterchainStateQ>& state);
   Ref<MasterchainStateQ> get_aux_mc_state(BlockSeqno seqno) const;
-  void after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result<Ref<ShardState>> res);
+  void after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result<Ref<ShardState>> res, td::PerfLogAction token);
   bool fix_one_processed_upto(block::MsgProcessedUpto& proc, const ton::ShardIdFull& owner);
   bool fix_processed_upto(block::MsgProcessedUptoCollection& upto);
-  void got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> R);
+  void got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> R, td::PerfLogAction token);
   void got_neighbor_msg_queue(unsigned i, Ref<OutMsgQueueProof> res);
   void got_out_queue_size(size_t i, td::Result<td::uint64> res);
   bool adjust_shard_config();
@@ -309,7 +310,7 @@ class Collator final : public td::actor::Actor {
   bool is_our_address(Ref<vm::CellSlice> addr_ref) const;
   bool is_our_address(ton::AccountIdPrefixFull addr_prefix) const;
   bool is_our_address(const ton::StdSmcAddress& addr) const;
-  void after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res);
+  void after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res,
+                                   td::PerfLogAction token);
   td::Result<bool> register_external_message_cell(Ref<vm::Cell> ext_msg, const ExtMessage::Hash& ext_hash,
                                                   int priority);
   // td::Result<bool> register_external_message(td::Slice ext_msg_boc);
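Every asynchronous query in collator.cpp now follows the same pattern: call start_action() before sending, move the token into the completion lambda, and let the after_* handler call token.finish(res) so both the duration and any failure status land in perf_log_. A condensed sketch of the pattern (manager/handler/Answer names are placeholders; the token mechanics come from Timer.h above):

    // Sketch, not repo code: the token-threading pattern used throughout below.
    auto token = perf_log_.start_action("wait_something");
    td::actor::send_closure_later(
        manager, &ValidatorManager::wait_something,
        [self = get_self(), token = std::move(token)](td::Result<Answer> res) mutable {
          // the lambda must be mutable so the token can be moved out again
          td::actor::send_closure_later(std::move(self), &Collator::after_get_something,
                                        std::move(res), std::move(token));
        });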
diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp
index 0054ea98..cd97e3fc 100644
--- a/validator/impl/collator.cpp
+++ b/validator/impl/collator.cpp
@@ -217,25 +217,30 @@ void Collator::start_up() {
   // 2. learn latest masterchain state and block id
   LOG(DEBUG) << "sending get_top_masterchain_state_block() to Manager";
   ++pending;
+  auto token = perf_log_.start_action("get_top_masterchain_state_block");
   if (!is_hardfork_) {
     td::actor::send_closure_later(manager, &ValidatorManager::get_top_masterchain_state_block,
-                                  [self = get_self()](td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res) {
+                                  [self = get_self(), token = std::move(token)](
+                                      td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res) mutable {
                                     LOG(DEBUG) << "got answer to get_top_masterchain_state_block";
                                     td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state,
-                                                                  std::move(res));
+                                                                  std::move(res), std::move(token));
                                   });
   } else {
-    td::actor::send_closure_later(
-        manager, &ValidatorManager::get_shard_state_from_db_short, min_mc_block_id,
-        [self = get_self(), block_id = min_mc_block_id](td::Result<Ref<ShardState>> res) {
-          LOG(DEBUG) << "got answer to get_top_masterchain_state_block";
-          if (res.is_error()) {
-            td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state, res.move_as_error());
-          } else {
-            td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state,
-                                          std::make_pair(Ref<MasterchainState>(res.move_as_ok()), block_id));
-          }
-        });
+    td::actor::send_closure_later(manager, &ValidatorManager::get_shard_state_from_db_short, min_mc_block_id,
+                                  [self = get_self(), block_id = min_mc_block_id,
+                                   token = std::move(token)](td::Result<Ref<ShardState>> res) mutable {
+                                    LOG(DEBUG) << "got answer to get_top_masterchain_state_block";
+                                    if (res.is_error()) {
+                                      td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state,
+                                                                    res.move_as_error(), std::move(token));
+                                    } else {
+                                      td::actor::send_closure_later(
+                                          std::move(self), &Collator::after_get_mc_state,
+                                          std::make_pair(Ref<MasterchainState>(res.move_as_ok()), block_id),
+                                          std::move(token));
+                                    }
+                                  });
   }
 }
 // 3. load previous block(s) and corresponding state(s)
@@ -245,23 +250,27 @@
     // 3.1. load state
     LOG(DEBUG) << "sending wait_block_state() query #" << i << " for " << prev_blocks[i].to_str() << " to Manager";
     ++pending;
-    td::actor::send_closure_later(manager, &ValidatorManager::wait_block_state_short, prev_blocks[i], priority(),
-                                  timeout, [self = get_self(), i](td::Result<Ref<ShardState>> res) {
-                                    LOG(DEBUG) << "got answer to wait_block_state query #" << i;
-                                    td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_state, i,
-                                                                  std::move(res));
-                                  });
+    auto token = perf_log_.start_action(PSTRING() << "wait_block_state #" << i);
+    td::actor::send_closure_later(
+        manager, &ValidatorManager::wait_block_state_short, prev_blocks[i], priority(), timeout,
+        [self = get_self(), i, token = std::move(token)](td::Result<Ref<ShardState>> res) mutable {
+          LOG(DEBUG) << "got answer to wait_block_state query #" << i;
+          td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_state, i, std::move(res),
+                                        std::move(token));
+        });
     if (prev_blocks[i].seqno()) {
       // 3.2. load block
       // NB: we need the block itself only for extracting start_lt and end_lt to create correct prev_blk:ExtBlkRef and related Merkle proofs
       LOG(DEBUG) << "sending wait_block_data() query #" << i << " for " << prev_blocks[i].to_str() << " to Manager";
       ++pending;
-      td::actor::send_closure_later(manager, &ValidatorManager::wait_block_data_short, prev_blocks[i], priority(),
-                                    timeout, [self = get_self(), i](td::Result<Ref<BlockData>> res) {
-                                      LOG(DEBUG) << "got answer to wait_block_data query #" << i;
-                                      td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data,
-                                                                    i, std::move(res));
-                                    });
+      auto token = perf_log_.start_action(PSTRING() << "wait_block_data #" << i);
+      td::actor::send_closure_later(
+          manager, &ValidatorManager::wait_block_data_short, prev_blocks[i], priority(), timeout,
+          [self = get_self(), i, token = std::move(token)](td::Result<Ref<BlockData>> res) mutable {
+            LOG(DEBUG) << "got answer to wait_block_data query #" << i;
+            td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, i, std::move(res),
+                                          std::move(token));
+          });
     }
   }
   if (is_hardfork_) {
@@ -271,22 +280,28 @@
   if (!is_hardfork_) {
     LOG(DEBUG) << "sending get_external_messages() query to Manager";
     ++pending;
-    td::actor::send_closure_later(manager, &ValidatorManager::get_external_messages, shard_,
-                                  [self = get_self()](td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res) -> void {
+    auto token = perf_log_.start_action("get_external_messages");
+    td::actor::send_closure_later(
+        manager, &ValidatorManager::get_external_messages, shard_,
+        [self = get_self(),
+         token = std::move(token)](td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res) mutable -> void {
           LOG(DEBUG) << "got answer to get_external_messages() query";
-          td::actor::send_closure_later(std::move(self), &Collator::after_get_external_messages, std::move(res));
+          td::actor::send_closure_later(std::move(self), &Collator::after_get_external_messages, std::move(res),
+                                        std::move(token));
         });
   }
   if (is_masterchain() && !is_hardfork_) {
     // 5. load shard block info messages
     LOG(DEBUG) << "sending get_shard_blocks_for_collator() query to Manager";
     ++pending;
-    td::actor::send_closure_later(
-        manager, &ValidatorManager::get_shard_blocks_for_collator, prev_blocks[0],
-        [self = get_self()](td::Result<std::vector<Ref<ShardTopBlockDescription>>> res) -> void {
-          LOG(DEBUG) << "got answer to get_shard_blocks_for_collator() query";
-          td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_blocks, std::move(res));
-        });
+    auto token = perf_log_.start_action("get_shard_blocks_for_collator");
+    td::actor::send_closure_later(manager, &ValidatorManager::get_shard_blocks_for_collator, prev_blocks[0],
+                                  [self = get_self(), token = std::move(token)](
+                                      td::Result<std::vector<Ref<ShardTopBlockDescription>>> res) mutable -> void {
+                                    LOG(DEBUG) << "got answer to get_shard_blocks_for_collator() query";
+                                    td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_blocks,
+                                                                  std::move(res), std::move(token));
+                                  });
   }
   // 6. set timeout
   alarm_timestamp() = timeout;
@@ -362,6 +377,8 @@ bool Collator::fatal_error(td::Status error) {
                  td::Timestamp::in(10.0), std::move(main_promise), std::move(cancellation_token_), mode_,
                  attempt_idx_ + 1);
   } else {
+    LOG(INFO) << "collation failed in " << perf_timer_.elapsed() << " s " << error;
+    LOG(INFO) << perf_log_;
     main_promise(std::move(error));
   }
   busy_ = false;
@@ -482,12 +499,14 @@ bool Collator::request_aux_mc_state(BlockSeqno seqno, Ref<MasterchainStateQ>& state) {
   CHECK(blkid.is_valid_ext() && blkid.is_masterchain());
   LOG(DEBUG) << "sending auxiliary wait_block_state() query for " << blkid.to_str() << " to Manager";
   ++pending;
-  td::actor::send_closure_later(manager, &ValidatorManager::wait_block_state_short, blkid, priority(), timeout,
-                                [self = get_self(), blkid](td::Result<Ref<ShardState>> res) {
-                                  LOG(DEBUG) << "got answer to wait_block_state query for " << blkid.to_str();
-                                  td::actor::send_closure_later(std::move(self), &Collator::after_get_aux_shard_state,
-                                                                blkid, std::move(res));
-                                });
+  auto token = perf_log_.start_action(PSTRING() << "auxiliary wait_block_state " << blkid.to_str());
+  td::actor::send_closure_later(
+      manager, &ValidatorManager::wait_block_state_short, blkid, priority(), timeout,
+      [self = get_self(), blkid, token = std::move(token)](td::Result<Ref<ShardState>> res) mutable {
+        LOG(DEBUG) << "got answer to wait_block_state query for " << blkid.to_str();
+        td::actor::send_closure_later(std::move(self), &Collator::after_get_aux_shard_state, blkid, std::move(res),
+                                      std::move(token));
+      });
   state.clear();
   return true;
 }
@@ -515,9 +534,11 @@ Ref<MasterchainStateQ> Collator::get_aux_mc_state(BlockSeqno seqno) const {
  * @param blkid The BlockIdExt of the shard state.
  * @param res The result of retrieving the shard state.
  */
-void Collator::after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result<Ref<ShardState>> res) {
+void Collator::after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result<Ref<ShardState>> res,
+                                         td::PerfLogAction token) {
   LOG(DEBUG) << "in Collator::after_get_aux_shard_state(" << blkid.to_str() << ")";
   --pending;
+  token.finish(res);
   if (res.is_error()) {
     fatal_error("cannot load auxiliary masterchain state for "s + blkid.to_str() + " : " +
                 res.move_as_error().to_string());
@@ -579,9 +600,11 @@ bool Collator::preprocess_prev_mc_state() {
  *
  * @param res The retrieved masterchain state.
  */
-void Collator::after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res) {
+void Collator::after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res,
+                                  td::PerfLogAction token) {
   LOG(WARNING) << "in Collator::after_get_mc_state()";
   --pending;
+  token.finish(res);
   if (res.is_error()) {
     fatal_error(res.move_as_error());
     return;
@@ -598,12 +621,14 @@
     // NB. it is needed only for creating a correct ExtBlkRef reference to it, which requires start_lt and end_lt
     LOG(DEBUG) << "sending wait_block_data() query #-1 for " << mc_block_id_.to_str() << " to Manager";
     ++pending;
-    td::actor::send_closure_later(manager, &ValidatorManager::wait_block_data_short, mc_block_id_, priority(), timeout,
-                                  [self = get_self()](td::Result<Ref<BlockData>> res) {
-                                    LOG(DEBUG) << "got answer to wait_block_data query #-1";
-                                    td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, -1,
-                                                                  std::move(res));
-                                  });
+    auto token = perf_log_.start_action("wait_block_data #-1");
+    td::actor::send_closure_later(
+        manager, &ValidatorManager::wait_block_data_short, mc_block_id_, priority(), timeout,
+        [self = get_self(), token = std::move(token)](td::Result<Ref<BlockData>> res) mutable {
+          LOG(DEBUG) << "got answer to wait_block_data query #-1";
+          td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, -1, std::move(res),
+                                        std::move(token));
+        });
   }
   check_pending();
 }
@@ -614,9 +639,10 @@
  * @param idx The index of the previous shard block (0 or 1).
  * @param res The retrieved shard state.
 */
-void Collator::after_get_shard_state(int idx, td::Result<Ref<ShardState>> res) {
+void Collator::after_get_shard_state(int idx, td::Result<Ref<ShardState>> res, td::PerfLogAction token) {
   LOG(WARNING) << "in Collator::after_get_shard_state(" << idx << ")";
   --pending;
+  token.finish(res);
   if (res.is_error()) {
     fatal_error(res.move_as_error());
     return;
@@ -647,9 +673,10 @@
  * @param idx The index of the previous block (0 or 1).
  * @param res The retrieved block data.
 */
-void Collator::after_get_block_data(int idx, td::Result<Ref<BlockData>> res) {
+void Collator::after_get_block_data(int idx, td::Result<Ref<BlockData>> res, td::PerfLogAction token) {
   LOG(DEBUG) << "in Collator::after_get_block_data(" << idx << ")";
   --pending;
+  token.finish(res);
   if (res.is_error()) {
     fatal_error(res.move_as_error());
     return;
@@ -691,8 +718,10 @@
  *
  * @param res The retrieved shard block descriptions.
 */
-void Collator::after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res) {
+void Collator::after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res,
+                                      td::PerfLogAction token) {
   --pending;
+  token.finish(res);
   if (res.is_error()) {
     fatal_error(res.move_as_error());
     return;
@@ -848,10 +877,13 @@ bool Collator::request_neighbor_msg_queues() {
     ++i;
   }
   ++pending;
+  auto token = perf_log_.start_action("neighbor_msg_queues");
   td::actor::send_closure_later(
       manager, &ValidatorManager::wait_neighbor_msg_queue_proofs, shard_, std::move(top_blocks), timeout,
-      [self = get_self()](td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> res) {
-        td::actor::send_closure_later(std::move(self), &Collator::got_neighbor_msg_queues, std::move(res));
+      [self = get_self(),
+       token = std::move(token)](td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> res) mutable {
+        td::actor::send_closure_later(std::move(self), &Collator::got_neighbor_msg_queues, std::move(res),
+                                      std::move(token));
       });
   return true;
 }
@@ -883,13 +915,15 @@ bool Collator::request_out_msg_queue_size() {
  * @param i The index of the neighbor.
 * @param res The obtained outbound queue.
 */
-void Collator::got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> R) {
+void Collator::got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> R,
+                                       td::PerfLogAction token) {
   --pending;
+  double duration = token.finish(R);
   if (R.is_error()) {
     fatal_error(R.move_as_error_prefix("failed to get neighbor msg queues: "));
     return;
   }
-  LOG(INFO) << "neighbor output queues fetched";
+  LOG(INFO) << "neighbor output queues fetched, took " << duration << "s";
   auto res = R.move_as_ok();
   unsigned i = 0;
   for (block::McShardDescr& descr : neighbors_) {
@@ -2091,12 +2125,9 @@ bool Collator::init_lt() {
 * @returns True if the configuration parameters were successfully fetched and initialized, false otherwise.
 */
 bool Collator::fetch_config_params() {
-  auto res = block::FetchConfigParams::fetch_config_params(*config_,
-                                                           &old_mparams_, &storage_prices_, &storage_phase_cfg_,
-                                                           &rand_seed_, &compute_phase_cfg_, &action_phase_cfg_,
-                                                           &masterchain_create_fee_, &basechain_create_fee_,
-                                                           workchain(), now_
-  );
+  auto res = block::FetchConfigParams::fetch_config_params(
+      *config_, &old_mparams_, &storage_prices_, &storage_phase_cfg_, &rand_seed_, &compute_phase_cfg_,
+      &action_phase_cfg_, &masterchain_create_fee_, &basechain_create_fee_, workchain(), now_);
   if (res.is_error()) {
     return fatal_error(res.move_as_error());
   }
@@ -2217,6 +2248,11 @@ bool Collator::init_value_create() {
 bool Collator::do_collate() {
   // After do_collate started it will not be interrupted by timeout
   alarm_timestamp() = td::Timestamp::never();
+  auto token = perf_log_.start_action("do_collate");
+  td::Status status = td::Status::Error("some error");
+  SCOPE_EXIT {
+    token.finish(status);
+  };
   LOG(WARNING) << "do_collate() : start";
   if (!fetch_config_params()) {
@@ -2342,6 +2378,7 @@
   if (!create_block_candidate()) {
     return fatal_error("cannot serialize a new Block candidate");
   }
+  status = td::Status::OK();
   return true;
 }
@@ -5811,12 +5848,43 @@ bool Collator::create_block_candidate() {
            << block_limit_status_->transactions;
   LOG(INFO) << "serialized collated data size " << cdata_slice.size() << " bytes (preliminary estimate was "
             << block_limit_status_->collated_data_stat.estimate_proof_size() << ")";
+  auto new_block_id_ext = ton::BlockIdExt{ton::BlockId{shard_, new_block_seqno}, new_block->get_hash().bits(),
+                                          block::compute_file_hash(blk_slice.as_slice())};
   // 3. create a BlockCandidate
-  block_candidate = std::make_unique<BlockCandidate>(
-      created_by_,
-      ton::BlockIdExt{ton::BlockId{shard_, new_block_seqno}, new_block->get_hash().bits(),
-                      block::compute_file_hash(blk_slice.as_slice())},
-      block::compute_file_hash(cdata_slice.as_slice()), blk_slice.clone(), cdata_slice.clone());
+  block_candidate =
+      std::make_unique<BlockCandidate>(created_by_, new_block_id_ext, block::compute_file_hash(cdata_slice.as_slice()),
+                                       blk_slice.clone(), cdata_slice.clone());
+  const bool need_out_msg_queue_broadcasts = true;
+  if (need_out_msg_queue_broadcasts) {
+    // we can't generate two proofs at the same time for the same root (it is not currently supported by cells),
+    // so we can't reuse the new state and have to regenerate it with a Merkle update
+    auto new_state = vm::MerkleUpdate::apply(prev_state_root_pure_, state_update);
+    CHECK(new_state.not_null());
+    CHECK(new_state->get_hash() == state_root->get_hash());
+    assert(config_ && shard_conf_);
+    auto neighbor_list = shard_conf_->get_neighbor_shard_hash_ids(shard_);
+    LOG(INFO) << "Build OutMsgQueueProofs for " << neighbor_list.size() << " neighbours";
+    for (ton::BlockId blk_id : neighbor_list) {
+      auto prefix = blk_id.shard_full();
+      auto limits = mc_state_->get_imported_msg_queue_limits(blk_id.workchain);
+
+      // one could use monitor_min_split_depth here to decrease the number of broadcasts,
+      // but the current implementation of OutMsgQueueImporter doesn't support it
+
+      auto r_proof = OutMsgQueueProof::build(
+          prefix,
+          {OutMsgQueueProof::OneBlock{.id = new_block_id_ext, .state_root = new_state, .block_root = new_block}},
+          limits);
+      if (r_proof.is_ok()) {
+        auto proof = r_proof.move_as_ok();
+        block_candidate->out_msg_queue_proof_broadcasts.push_back(td::Ref<OutMsgQueueProofBroadcast>(
+            true, OutMsgQueueProofBroadcast(prefix, new_block_id_ext, limits.max_bytes, limits.max_msgs,
+                                            std::move(proof->queue_proofs_), std::move(proof->block_state_proofs_),
+                                            std::move(proof->msg_counts_))));
+      } else {
+        LOG(ERROR) << "Failed to build OutMsgQueueProof: " << r_proof.error();
+      }
+    }
+  }
+
   // 3.1 check block and collated data size
   auto consensus_config = config_->get_consensus_config();
   if (block_candidate->data.size() > consensus_config.max_block_size) {
@@ -5896,6 +5964,7 @@ void Collator::return_block_candidate(td::Result<td::Unit> saved) {
   CHECK(block_candidate);
   LOG(WARNING) << "sending new BlockCandidate to Promise";
   LOG(WARNING) << "collation took " << perf_timer_.elapsed() << " s";
+  LOG(WARNING) << perf_log_;
   main_promise(block_candidate->clone());
   busy_ = false;
   stop();
@@ -5974,9 +6043,11 @@ td::Result<bool> Collator::register_external_message_cell(Ref<vm::Cell> ext_msg,
 *
 * @param res The result of the external message retrieval operation.
 */
-void Collator::after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res) {
+void Collator::after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res,
+                                           td::PerfLogAction token) {
   // res: pair {ext msg, priority}
   --pending;
+  token.finish(res);
   if (res.is_error()) {
     fatal_error(res.move_as_error());
     return;
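One detail in do_collate() above is easy to miss: the action token is finished from a SCOPE_EXIT block, so every early `return fatal_error(...)` path records a failed "do_collate" entry, and only the final `status = td::Status::OK();` turns it into a success. A minimal sketch of the idiom (do_work and step_one are hypothetical; SCOPE_EXIT is the existing tdutils macro):

    // Sketch: record an action's outcome on every exit path.
    bool do_work(td::PerfLog &perf_log) {
      auto token = perf_log.start_action("do_work");
      td::Status status = td::Status::Error("some error");  // pessimistic default
      SCOPE_EXIT {
        token.finish(status);  // runs on every return
      };
      if (!step_one()) {
        return false;  // recorded as an error
      }
      status = td::Status::OK();  // reached only on success
      return true;
    }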
diff --git a/validator/impl/out-msg-queue-proof.cpp b/validator/impl/out-msg-queue-proof.cpp
index 444d5846..4e8802c7 100644
--- a/validator/impl/out-msg-queue-proof.cpp
+++ b/validator/impl/out-msg-queue-proof.cpp
@@ -320,13 +320,23 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs(
       if (prefix.pfx_len() > min_split) {
         prefix = shard_prefix(prefix, min_split);
       }
-      new_queries[prefix].push_back(block);
+
+      LOG(INFO) << "search for out msg queue proof " << prefix.to_str() << " " << block.to_str();
+      auto& small_entry = small_cache_[std::make_pair(dst_shard, block)];
+      if (!small_entry.result.is_null()) {
+        entry->result[block] = small_entry.result;
+        entry->from_small_cache++;
+        alarm_timestamp().relax(small_entry.timeout = td::Timestamp::in(CACHE_TTL));
+      } else {
+        small_entry.pending_entries.push_back(entry);
+        ++entry->pending;
+        new_queries[prefix].push_back(block);
+      }
     }
   };
   auto limits = last_masterchain_state_->get_imported_msg_queue_limits(dst_shard.workchain);
   for (auto& p : new_queries) {
     for (size_t i = 0; i < p.second.size(); i += 16) {
-      ++entry->pending;
       size_t j = std::min(i + 16, p.second.size());
       get_proof_import(entry, std::vector<BlockIdExt>(p.second.begin() + i, p.second.begin() + j),
                        limits * (td::uint32)(j - i));
@@ -355,7 +365,7 @@ void OutMsgQueueImporter::get_proof_local(std::shared_ptr<CacheEntry> entry, BlockIdExt block) {
   if (block.seqno() == 0) {
     std::vector<td::Ref<OutMsgQueueProof>> proof = {
         td::Ref<OutMsgQueueProof>(true, block, state->root_cell(), td::Ref<vm::Cell>{})};
-    td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof));
+    td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof), ProofSource::Local);
     return;
   }
   td::actor::send_closure(
@@ -371,7 +381,8 @@
           Ref<vm::Cell> block_state_proof = create_block_state_proof(R.ok()->root_cell()).move_as_ok();
           std::vector<td::Ref<OutMsgQueueProof>> proof = {
               td::Ref<OutMsgQueueProof>(true, block, state->root_cell(), std::move(block_state_proof))};
-          td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof));
+          td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof),
+                                  ProofSource::Local);
         });
       });
 }
@@ -395,27 +406,70 @@ void OutMsgQueueImporter::get_proof_import(std::shared_ptr<CacheEntry> entry, std::vector<BlockIdExt> blocks,
                             retry_after);
           return;
         }
-        td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, R.move_as_ok());
+        td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, R.move_as_ok(), ProofSource::Query);
      });
 }
 
-void OutMsgQueueImporter::got_proof(std::shared_ptr<CacheEntry> entry, std::vector<td::Ref<OutMsgQueueProof>> proofs) {
+void OutMsgQueueImporter::got_proof(std::shared_ptr<CacheEntry> entry, std::vector<td::Ref<OutMsgQueueProof>> proofs,
+                                    ProofSource proof_source) {
   if (!check_timeout(entry)) {
     return;
   }
+  // TODO: maybe save proof to small cache? It would allow other queries to reuse this result
   for (auto& p : proofs) {
-    entry->result[p->block_id_] = std::move(p);
-  }
-  CHECK(entry->pending > 0);
-  if (--entry->pending == 0) {
-    finish_query(entry);
+    auto block_id = p->block_id_;
+    if (entry->result.emplace(block_id, std::move(p)).second) {
+      CHECK(entry->pending > 0);
+      switch (proof_source) {
+        case ProofSource::SmallCache:
+          entry->from_small_cache++;
+          break;
+        case ProofSource::Broadcast:
+          entry->from_broadcast++;
+          break;
+        case ProofSource::Query:
+          entry->from_query++;
+          break;
+        case ProofSource::Local:
+          entry->from_local++;
+          break;
+      }
+      if (--entry->pending == 0) {
+        finish_query(entry);
+      }
+    }
   }
 }
 
 void OutMsgQueueImporter::finish_query(std::shared_ptr<CacheEntry> entry) {
-  LOG(DEBUG) << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", "
-             << entry->blocks.size() << " blocks in " << entry->timer.elapsed() << "s";
+  FLOG(INFO) {
+    sb << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", " << entry->blocks.size()
+       << " blocks in " << entry->timer.elapsed() << "s";
+    sb << " sources{";
+    if (entry->from_broadcast) {
+      sb << " broadcast=" << entry->from_broadcast;
+    }
+    if (entry->from_small_cache) {
+      sb << " small_cache=" << entry->from_small_cache;
+    }
+    if (entry->from_local) {
+      sb << " local=" << entry->from_local;
+    }
+    if (entry->from_query) {
+      sb << " query=" << entry->from_query;
+    }
+    sb << "}";
+
+    if (!small_cache_.empty()) {
+      sb << " small_cache_size=" << small_cache_.size();
+    }
+    if (!cache_.empty()) {
+      sb << " cache_size=" << cache_.size();
+    }
+  };
+
   entry->done = true;
+  CHECK(entry->blocks.size() == entry->result.size());
   alarm_timestamp().relax(entry->timeout = td::Timestamp::in(CACHE_TTL));
   for (auto& p : entry->promises) {
     p.first.set_result(entry->result);
@@ -441,8 +495,7 @@ bool OutMsgQueueImporter::check_timeout(std::shared_ptr<CacheEntry> entry) {
 }
 
 void OutMsgQueueImporter::alarm() {
-  auto it = cache_.begin();
-  while (it != cache_.end()) {
+  for (auto it = cache_.begin(); it != cache_.end();) {
     auto& promises = it->second->promises;
     if (it->second->timeout.is_in_past()) {
       if (!it->second->done) {
@@ -469,6 +522,36 @@
     promises.resize(j);
     ++it;
   }
+
+  for (auto it = small_cache_.begin(); it != small_cache_.end();) {
+    td::remove_if(it->second.pending_entries,
+                  [](const std::shared_ptr<CacheEntry>& entry) { return entry->done || entry->promises.empty(); });
+    if (it->second.timeout.is_in_past()) {
+      if (it->second.pending_entries.empty()) {
+        it = small_cache_.erase(it);
+      } else {
+        ++it;
+      }
+    } else {
+      alarm_timestamp().relax(it->second.timeout);
+      ++it;
+    }
+  }
+}
+
+void OutMsgQueueImporter::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof) {
+  LOG(INFO) << "add out msg queue proof " << dst_shard.to_str() << " " << proof->block_id_.to_str();
+  auto& small_entry = small_cache_[std::make_pair(dst_shard, proof->block_id_)];
+  if (!small_entry.result.is_null()) {
+    return;
+  }
+  alarm_timestamp().relax(small_entry.timeout = td::Timestamp::in(CACHE_TTL));
+  small_entry.result = proof;
+  CHECK(proof.not_null());
+  auto pending_entries = std::move(small_entry.pending_entries);
+  for (auto& entry : pending_entries) {
+    got_proof(entry, {proof}, ProofSource::Broadcast);
+  }
 }
 
 void BuildOutMsgQueueProof::abort_query(td::Status reason) {
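The "small" cache above deliberately keys on (dst_shard, block) rather than on a query's whole block set: a broadcast delivers one block's proof for one destination shard, while get_neighbor_msg_queue_proofs asks for many blocks at once, so per-block granularity is what lets a single broadcast satisfy parts of several pending bulk queries. A sketch of the lookup shape (std::map and the simplified key stand in for the real containers; not repo code):

    #include <map>
    #include <utility>

    // Key: destination shard + block whose outbound queue is proven.
    using SmallKey = std::pair<int /*ShardIdFull*/, int /*BlockIdExt*/>;

    template <class Proof>
    const Proof* lookup(const std::map<SmallKey, Proof>& small_cache, SmallKey key) {
      auto it = small_cache.find(key);
      return it == small_cache.end() ? nullptr : &it->second;  // a hit counts as from_small_cache
    }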
diff --git a/validator/impl/out-msg-queue-proof.hpp b/validator/impl/out-msg-queue-proof.hpp
index c115a276..6f4c08d1 100644
--- a/validator/impl/out-msg-queue-proof.hpp
+++ b/validator/impl/out-msg-queue-proof.hpp
@@ -41,6 +41,7 @@ class OutMsgQueueImporter : public td::actor::Actor {
   void new_masterchain_block_notification(td::Ref<MasterchainState> state, std::set<ShardIdFull> collating_shards);
   void get_neighbor_msg_queue_proofs(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks, td::Timestamp timeout,
                                      td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise);
+  void add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof);
 
   void update_options(td::Ref<ValidatorManagerOptions> opts) {
     opts_ = std::move(opts);
@@ -62,13 +63,28 @@ class OutMsgQueueImporter : public td::actor::Actor {
     td::Timestamp timeout = td::Timestamp::never();
     td::Timer timer;
     size_t pending = 0;
+    size_t from_small_cache = 0;
+    size_t from_broadcast = 0;
+    size_t from_query = 0;
+    size_t from_local = 0;
   };
   std::map<std::pair<ShardIdFull, std::vector<BlockIdExt>>, std::shared_ptr<CacheEntry>> cache_;
 
+  // This cache has smaller granularity: a proof is stored for each block separately
+  struct SmallCacheEntry {
+    td::Ref<OutMsgQueueProof> result;
+    std::vector<std::shared_ptr<CacheEntry>> pending_entries;
+    td::Timestamp timeout = td::Timestamp::never();
+  };
+  std::map<std::pair<ShardIdFull, BlockIdExt>, SmallCacheEntry> small_cache_;
+
   void get_proof_local(std::shared_ptr<CacheEntry> entry, BlockIdExt block);
   void get_proof_import(std::shared_ptr<CacheEntry> entry, std::vector<BlockIdExt> blocks,
                         block::ImportedMsgQueueLimits limits);
-  void got_proof(std::shared_ptr<CacheEntry> entry, std::vector<td::Ref<OutMsgQueueProof>> proofs);
+  enum class ProofSource {
+    SmallCache, Broadcast, Query, Local
+  };
+  void got_proof(std::shared_ptr<CacheEntry> entry, std::vector<td::Ref<OutMsgQueueProof>> proofs,
+                 ProofSource proof_source);
   void finish_query(std::shared_ptr<CacheEntry> entry);
   bool check_timeout(std::shared_ptr<CacheEntry> entry);
diff --git a/validator/manager.cpp b/validator/manager.cpp
index d4ea6810..3ff33d23 100644
--- a/validator/manager.cpp
+++ b/validator/manager.cpp
@@ -1439,6 +1439,11 @@ void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate candidate, ...) {
   if (!id.is_masterchain()) {
     add_cached_block_candidate(ReceivedBlock{id, candidate.data.clone()});
   }
+  LOG(INFO) << "Got candidate " << id.to_str() << " with " << candidate.out_msg_queue_proof_broadcasts.size()
+            << " out msg queue proof broadcasts";
+  for (auto broadcast : candidate.out_msg_queue_proof_broadcasts) {
+    callback_->send_out_msg_queue_proof_broadcast(broadcast);
+  }
   td::actor::send_closure(db_, &Db::store_block_candidate, std::move(candidate), std::move(promise));
 }
@@ -3519,7 +3524,7 @@ void ValidatorManagerImpl::del_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) {
   } else {
     td::actor::send_closure(it->second.actor, &CollatorNode::del_shard, shard);
   }
-}
+};
 
 void ValidatorManagerImpl::get_collation_manager_stats(
     td::Promise<tl_object_ptr<ton_api::engine_validator_collationManagerStats>> promise) {
@@ -3569,6 +3574,16 @@
   td::actor::send_closure(callback, &Cb::dec_pending);
 }
 
+void ValidatorManagerImpl::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof) {
+  if (!collator_nodes_.empty()) {
+    if (out_msg_queue_importer_.empty()) {
+      out_msg_queue_importer_ = td::actor::create_actor<OutMsgQueueImporter>("outmsgqueueimporter", actor_id(this),
+                                                                             opts_, last_masterchain_state_);
+    }
+    td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::add_out_msg_queue_proof, dst_shard,
+                            std::move(proof));
+  }
+}
+
 void ValidatorManagerImpl::add_persistent_state_description(td::Ref<PersistentStateDescription> desc) {
   auto now = (UnixTime)td::Clocks::system();
   if (desc->end_time <= now) {
diff --git a/validator/manager.hpp b/validator/manager.hpp
index c0d74456..649d5101 100644
--- a/validator/manager.hpp
+++ b/validator/manager.hpp
@@ -640,6 +640,7 @@ class ValidatorManagerImpl : public ValidatorManager {
   void add_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) override;
   void del_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) override;
+  void add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof) override;
 
   void get_collation_manager_stats(
       td::Promise<tl_object_ptr<ton_api::engine_validator_collationManagerStats>> promise) override;
diff --git a/validator/validator-group.cpp b/validator/validator-group.cpp
index f95266b3..344f5c1c 100644
--- a/validator/validator-group.cpp
+++ b/validator/validator-group.cpp
@@ -66,8 +66,14 @@ void ValidatorGroup::generate_block_candidate(
         std::move(R));
   };
   td::uint64 max_answer_size = config_.max_block_size + config_.max_collated_data_size + 1024;
+  auto block_candidate_priority = BlockCandidatePriority{
+      .round = source_info.round,
+      .first_block_round = source_info.first_block_round,
+      .priority = source_info.source_priority
+  };
   td::actor::send_closure(collation_manager_, &CollationManager::collate_block, shard_, min_masterchain_block_id_,
-                          prev_block_ids_, Ed25519_PublicKey{local_id_full_.ed25519_value().raw()}, validator_set_,
+                          prev_block_ids_, Ed25519_PublicKey{local_id_full_.ed25519_value().raw()},
+                          block_candidate_priority, validator_set_,
                           max_answer_size, cancellation_token_source_.get_cancellation_token(), std::move(P));
 }
diff --git a/validator/validator.h b/validator/validator.h
index 2cd77102..c4a779cf 100644
--- a/validator/validator.h
+++ b/validator/validator.h
@@ -198,6 +198,9 @@ class ValidatorManagerInterface : public td::actor::Actor {
   virtual void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
                                     td::BufferSlice data) = 0;
   virtual void send_broadcast(BlockBroadcast broadcast, int mode) = 0;
+  virtual void send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQueueProofBroadcast> broadcast) {
+    LOG(ERROR) << "Unimplemented send_out_msg_queue_proof_broadcast - ignore broadcast";
+  }
   virtual void download_block(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
                               td::Promise<ReceivedBlock> promise) = 0;
   virtual void download_zero_state(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
@@ -325,6 +328,10 @@
   virtual void add_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) = 0;
   virtual void del_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) = 0;
 
+  virtual void add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof) {
+    LOG(ERROR) << "Unimplemented add_out_msg_queue_proof - ignore broadcast";
+  }
+
   virtual void get_collation_manager_stats(
       td::Promise<tl_object_ptr<ton_api::engine_validator_collationManagerStats>> promise) = 0;
 };
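A closing note on the two interface additions: both ship default bodies that merely log an error instead of being pure virtual, so existing ValidatorManagerInterface implementations keep compiling and nodes that do not run collator nodes simply drop the broadcast. The same staged-rollout shape appears in FullNodeShardImpl above, which explicitly ignores outMsgQueueProofBroadcast on the regular shard overlay; only the fast-sync overlay participates, and ValidatorManagerImpl opts in by overriding add_out_msg_queue_proof and lazily creating its OutMsgQueueImporter.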