mirror of https://github.com/ton-blockchain/ton synced 2025-02-12 19:22:37 +00:00

Merge pull request #1405 from birydrad/accelerator

optimistic out-msg-queue broadcast

With this change the collator builds out-msg-queue proofs for neighbor shards while assembling a block candidate, and the full node broadcasts them in the fast-sync overlay; receiving collator nodes cache these proofs in OutMsgQueueImporter, so neighbor queues can often be imported without waiting for a tonNode.getOutMsgQueueProof query.
Commit 3dce9d11d9 by SpyCheese, 2024-11-27 14:05:49 +04:00, committed via GitHub.
24 changed files with 623 additions and 126 deletions

View file

@ -149,4 +149,19 @@ std::enable_if_t<std::is_arithmetic<T>::value, string> to_string(const T &x) {
return sb.as_cslice().str();
}
template <class SB>
struct LambdaPrintHelper {
SB& sb;
};
template <class SB, class F>
SB& operator<<(const LambdaPrintHelper<SB>& helper, F&& f) {
f(helper.sb);
return helper.sb;
}
struct LambdaPrint {};
inline LambdaPrintHelper<td::StringBuilder> operator<<(td::StringBuilder& sb, const LambdaPrint&) {
return LambdaPrintHelper<td::StringBuilder>{sb};
}
} // namespace td
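
(Editor's note, not part of the diff.) A minimal usage sketch of the LambdaPrint helper added above, assuming these helpers live in td/utils/StringBuilder.h: streaming a LambdaPrint tag switches operator<< into "run this lambda against the builder" mode, so arbitrary formatting logic can sit inline in a streaming expression.

#include "td/utils/StringBuilder.h"

void append_status(td::StringBuilder &sb, bool ok, int code) {
  sb << "status: " << td::LambdaPrint{} << [&](auto &b) {
    // the lambda receives the same StringBuilder and can branch freely
    if (ok) {
      b << "ok";
    } else {
      b << "error(code=" << code << ")";
    }
  };
}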

View file

@ -22,6 +22,8 @@
#include "td/utils/logging.h"
#include "td/utils/Time.h"
#include <numeric>
namespace td {
Timer::Timer(bool is_paused) : is_paused_(is_paused) {
@ -60,12 +62,15 @@ StringBuilder &operator<<(StringBuilder &string_builder, const Timer &timer) {
return string_builder << format::as_time(timer.elapsed());
}
PerfWarningTimer::PerfWarningTimer(string name, double max_duration, std::function<void(double)>&& callback)
PerfWarningTimer::PerfWarningTimer(string name, double max_duration, std::function<void(double)> &&callback)
: name_(std::move(name)), start_at_(Time::now()), max_duration_(max_duration), callback_(std::move(callback)) {
}
PerfWarningTimer::PerfWarningTimer(PerfWarningTimer &&other)
: name_(std::move(other.name_)), start_at_(other.start_at_), max_duration_(other.max_duration_), callback_(std::move(other.callback_)) {
: name_(std::move(other.name_))
, start_at_(other.start_at_)
, max_duration_(other.max_duration_)
, callback_(std::move(other.callback_)) {
other.start_at_ = 0;
}
@ -134,4 +139,34 @@ double ThreadCpuTimer::elapsed() const {
return res;
}
PerfLogAction PerfLog::start_action(std::string name) {
auto i = entries_.size();
entries_.push_back({.name = std::move(name), .begin = td::Timestamp::now().at()});
return PerfLogAction{i, std::unique_ptr<PerfLog, EmptyDeleter>(this)};
}
td::StringBuilder &operator<<(StringBuilder &sb, const PerfLog &log) {
sb << "{";
std::vector<size_t> ids(log.entries_.size());
std::iota(ids.begin(), ids.end(), 0);
std::sort(ids.begin(), ids.end(), [&](auto a, auto b) {
return log.entries_[a].end - log.entries_[a].begin > log.entries_[b].end - log.entries_[b].begin;
});
sb << "{";
for (size_t i = 0; i < log.entries_.size(); i++) {
sb << "\n\t";
auto &entry = log.entries_[ids[i]];
sb << "{" << entry.name << ":" << entry.begin << "->" << entry.end << "(" << entry.end - entry.begin << ")"
<< td::format::cond(entry.status.is_error(), entry.status, "") << "}";
}
sb << "\n}";
return sb;
}
double PerfLog::finish_action(size_t i, td::Status status) {
auto &entry = entries_[i];
CHECK(entry.end == 0);
entry.end = td::Timestamp::now().at();
entry.status = std::move(status);
return entry.end - entry.begin;
}
} // namespace td

View file

@ -19,6 +19,7 @@
#pragma once
#include "td/utils/StringBuilder.h"
#include "td/utils/Status.h"
#include <functional>
@ -46,7 +47,7 @@ class Timer {
class PerfWarningTimer {
public:
explicit PerfWarningTimer(string name, double max_duration = 0.1, std::function<void(double)>&& callback = {});
explicit PerfWarningTimer(string name, double max_duration = 0.1, std::function<void(double)> &&callback = {});
PerfWarningTimer(const PerfWarningTimer &) = delete;
PerfWarningTimer &operator=(const PerfWarningTimer &) = delete;
PerfWarningTimer(PerfWarningTimer &&other);
@ -80,4 +81,44 @@ class ThreadCpuTimer {
bool is_paused_{false};
};
class PerfLog;
struct EmptyDeleter {
template <class T>
void operator()(T *) {
}
};
class PerfLogAction {
public:
template <class T>
double finish(const T &result);
size_t i_{0};
std::unique_ptr<PerfLog, EmptyDeleter> perf_log_;
};
class PerfLog {
public:
PerfLogAction start_action(std::string name);
friend td::StringBuilder &operator<<(td::StringBuilder &sb, const PerfLog &log);
private:
struct Entry {
std::string name;
double begin{};
double end{};
td::Status status;
};
std::vector<Entry> entries_;
friend class PerfLogAction;
double finish_action(size_t i, td::Status status);
};
template <class T>
double PerfLogAction::finish(const T &result) {
if (result.is_ok()) {
return perf_log_->finish_action(i_, td::Status::OK());
} else {
return perf_log_->finish_action(i_, result.error().clone());
}
}
} // namespace td
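
(Editor's note, not part of the diff.) A sketch of the intended PerfLog flow under the declarations above: take a PerfLogAction token before an asynchronous step, finish it with the step's td::Result so the duration and status are recorded, then stream the whole log; entries print sorted by duration, longest first. Header paths are assumptions.

#include "td/utils/Status.h"
#include "td/utils/Timer.h"  // assumed home of PerfLog in this patch

td::Result<int> do_work() {
  return 42;  // stand-in for a real asynchronous step
}

void perf_log_example(td::StringBuilder &sb) {
  td::PerfLog perf_log;
  auto token = perf_log.start_action("do_work");
  td::Result<int> res = do_work();
  double elapsed = token.finish(res);  // stores end time and OK/error status in the log entry
  sb << "do_work took " << elapsed << "s\n";
  sb << perf_log;  // entries sorted by elapsed time, errors shown next to their action
}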

View file

@ -74,6 +74,7 @@
#define LOG(level) LOG_IMPL(level, level, true, ::td::Slice())
#define LOG_IF(level, condition) LOG_IMPL(level, level, condition, #condition)
#define FLOG(level) LOG_IMPL(level, level, true, ::td::Slice()) << td::LambdaPrint{} << [&](auto &sb)
#define VLOG(level) LOG_IMPL(DEBUG, level, true, TD_DEFINE_STR(level))
#define VLOG_IF(level, condition) LOG_IMPL(DEBUG, level, condition, TD_DEFINE_STR(level) " " #condition)
@ -95,13 +96,13 @@ inline bool no_return_func() {
#define DUMMY_LOG_CHECK(condition) LOG_IF(NEVER, !(condition))
#ifdef TD_DEBUG
#if TD_MSVC
#if TD_MSVC
#define LOG_CHECK(condition) \
__analysis_assume(!!(condition)); \
LOG_IMPL(FATAL, FATAL, !(condition), #condition)
#else
#else
#define LOG_CHECK(condition) LOG_IMPL(FATAL, FATAL, !(condition) && no_return_func(), #condition)
#endif
#endif
#else
#define LOG_CHECK DUMMY_LOG_CHECK
#endif
@ -263,6 +264,9 @@ class Logger {
sb_ << other;
return *this;
}
LambdaPrintHelper<td::Logger> operator<<(const LambdaPrint &) {
return LambdaPrintHelper<td::Logger>{*this};
}
MutableCSlice as_cslice() {
return sb_.as_cslice();
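
(Editor's note, not part of the diff.) The new FLOG macro combines LOG_IMPL with LambdaPrint, so the braced body after FLOG(level) becomes a lambda writing into the logger's builder, exactly as later hunks in this PR use it. A minimal sketch, assuming td/utils/logging.h:

#include "td/utils/logging.h"

void report_collation(double elapsed, bool cached) {
  FLOG(INFO) {
    sb << "collation finished in " << elapsed << "s";
    if (cached) {
      sb << " (served from cache)";
    }
  };  // the lambda parameter is named sb by the macro, matching the usages in this PR
}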

View file

@ -433,6 +433,10 @@ tonNode.newBlockCandidateBroadcast id:tonNode.blockIdExt catchain_seqno:int vali
tonNode.newBlockCandidateBroadcastCompressed id:tonNode.blockIdExt catchain_seqno:int validator_set_hash:int
collator_signature:tonNode.blockSignature flags:# compressed:bytes = tonNode.Broadcast;
// optimistic broadcast of response to tonNode.getOutMsgQueueProof with dst_shard, block and limits arguments
tonNode.outMsgQueueProofBroadcast dst_shard:tonNode.shardId block:tonNode.blockIdExt
limits:ImportedMsgQueueLimits proof:tonNode.OutMsgQueueProof = tonNode.Broadcast;
tonNode.shardPublicOverlayId workchain:int shard:long zero_state_file_hash:int256 = tonNode.ShardPublicOverlayId;
tonNode.privateBlockOverlayId zero_state_file_hash:int256 nodes:(vector int256) = tonNode.PrivateBlockOverlayId;
@ -924,7 +928,7 @@ validatorSession.endValidatorGroupStats session_id:int256 timestamp:double
---functions---
collatorNode.generateBlock shard:tonNode.shardId cc_seqno:int prev_blocks:(vector tonNode.blockIdExt)
creator:int256 = collatorNode.Candidate;
creator:int256 round:int first_block_round:int priority:int = collatorNode.Candidate;
collatorNode.ping flags:# = collatorNode.Pong;
---types---

Binary file not shown.

View file

@ -425,14 +425,47 @@ struct Ed25519_PublicKey {
};
// represents (the contents of) a block
struct OutMsgQueueProofBroadcast : public td::CntObject {
OutMsgQueueProofBroadcast(ShardIdFull dst_shard, BlockIdExt block_id, td::int32 max_bytes, td::int32 max_msgs,
td::BufferSlice queue_proofs, td::BufferSlice block_state_proofs,
std::vector<std::int32_t> msg_counts)
: dst_shard(std::move(dst_shard))
, block_id(block_id)
, max_bytes(max_bytes)
, max_msgs(max_msgs)
, queue_proofs(std::move(queue_proofs))
, block_state_proofs(std::move(block_state_proofs))
, msg_counts(std::move(msg_counts)) {
}
ShardIdFull dst_shard;
BlockIdExt block_id;
// importedMsgQueueLimits
td::uint32 max_bytes;
td::uint32 max_msgs;
// outMsgQueueProof
td::BufferSlice queue_proofs;
td::BufferSlice block_state_proofs;
std::vector<std::int32_t> msg_counts;
virtual OutMsgQueueProofBroadcast* make_copy() const {
return new OutMsgQueueProofBroadcast(dst_shard, block_id, max_bytes, max_msgs, queue_proofs.clone(),
block_state_proofs.clone(), msg_counts);
}
};
struct BlockCandidate {
BlockCandidate(Ed25519_PublicKey pubkey, BlockIdExt id, FileHash collated_file_hash, td::BufferSlice data,
td::BufferSlice collated_data)
td::BufferSlice collated_data,
std::vector<td::Ref<OutMsgQueueProofBroadcast>> out_msg_queue_broadcasts = {})
: pubkey(pubkey)
, id(id)
, collated_file_hash(collated_file_hash)
, data(std::move(data))
, collated_data(std::move(collated_data)) {
, collated_data(std::move(collated_data))
, out_msg_queue_proof_broadcasts(std::move(out_msg_queue_broadcasts)) {
}
Ed25519_PublicKey pubkey;
BlockIdExt id;
@ -440,11 +473,21 @@ struct BlockCandidate {
td::BufferSlice data;
td::BufferSlice collated_data;
// used only locally
std::vector<td::Ref<OutMsgQueueProofBroadcast>> out_msg_queue_proof_broadcasts;
BlockCandidate clone() const {
return BlockCandidate{pubkey, id, collated_file_hash, data.clone(), collated_data.clone()};
return BlockCandidate{
pubkey, id, collated_file_hash, data.clone(), collated_data.clone(), out_msg_queue_proof_broadcasts};
}
};
struct BlockCandidatePriority {
td::uint32 round{};
td::uint32 first_block_round{};
td::int32 priority{};
};
struct ValidatorDescr {
/* ton::validator::ValidatorFullId */ Ed25519_PublicKey key;
ValidatorWeight weight;

View file

@ -33,6 +33,7 @@ void CollationManager::start_up() {
void CollationManager::collate_block(ShardIdFull shard, BlockIdExt min_masterchain_block_id,
std::vector<BlockIdExt> prev, Ed25519_PublicKey creator,
BlockCandidatePriority priority,
td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
td::CancellationToken cancellation_token, td::Promise<BlockCandidate> promise) {
if (shard.is_masterchain()) {
@ -41,12 +42,13 @@ void CollationManager::collate_block(ShardIdFull shard, BlockIdExt min_mastercha
std::move(cancellation_token), 0);
return;
}
collate_shard_block(shard, min_masterchain_block_id, std::move(prev), creator, std::move(validator_set),
collate_shard_block(shard, min_masterchain_block_id, std::move(prev), creator, priority, std::move(validator_set),
max_answer_size, std::move(cancellation_token), std::move(promise), td::Timestamp::in(10.0));
}
void CollationManager::collate_shard_block(ShardIdFull shard, BlockIdExt min_masterchain_block_id,
std::vector<BlockIdExt> prev, Ed25519_PublicKey creator,
BlockCandidatePriority priority,
td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
td::CancellationToken cancellation_token,
td::Promise<BlockCandidate> promise, td::Timestamp timeout) {
@ -133,8 +135,8 @@ void CollationManager::collate_shard_block(ShardIdFull shard, BlockIdExt min_mas
delay_action(
[=, promise = std::move(promise)]() mutable {
td::actor::send_closure(SelfId, &CollationManager::collate_shard_block, shard, min_masterchain_block_id, prev,
creator, validator_set, max_answer_size, cancellation_token, std::move(promise),
timeout);
creator, priority, validator_set, max_answer_size, cancellation_token,
std::move(promise), timeout);
},
retry_at);
};
@ -145,7 +147,8 @@ void CollationManager::collate_shard_block(ShardIdFull shard, BlockIdExt min_mas
}
td::BufferSlice query = create_serialize_tl_object<ton_api::collatorNode_generateBlock>(
create_tl_shard_id(shard), validator_set->get_catchain_seqno(), std::move(prev_blocks), creator.as_bits256());
create_tl_shard_id(shard), validator_set->get_catchain_seqno(), std::move(prev_blocks), creator.as_bits256(),
priority.round, priority.first_block_round, priority.priority);
LOG(INFO) << "sending collate query for " << next_block_id.to_str() << ": send to #" << selected_idx << "("
<< selected_collator << ")";

View file

@ -35,7 +35,8 @@ class CollationManager : public td::actor::Actor {
void alarm() override;
void collate_block(ShardIdFull shard, BlockIdExt min_masterchain_block_id, std::vector<BlockIdExt> prev,
Ed25519_PublicKey creator, td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
Ed25519_PublicKey creator, BlockCandidatePriority priority,
td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
td::CancellationToken cancellation_token, td::Promise<BlockCandidate> promise);
void update_options(td::Ref<ValidatorManagerOptions> opts);
@ -52,7 +53,8 @@ class CollationManager : public td::actor::Actor {
td::actor::ActorId<rldp::Rldp> rldp_;
void collate_shard_block(ShardIdFull shard, BlockIdExt min_masterchain_block_id, std::vector<BlockIdExt> prev,
Ed25519_PublicKey creator, td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
Ed25519_PublicKey creator, BlockCandidatePriority priority,
td::Ref<ValidatorSet> validator_set, td::uint64 max_answer_size,
td::CancellationToken cancellation_token, td::Promise<BlockCandidate> promise,
td::Timestamp timeout);

View file

@ -191,17 +191,41 @@ void CollatorNode::update_validator_group_info(ShardIdFull shard, std::vector<Bl
if (cache_entry->block_seqno < info.next_block_seqno) {
cache_entry->cancel(td::Status::Error(PSTRING() << "next block seqno " << cache_entry->block_seqno
<< " is too small, expected " << info.next_block_seqno));
if (!cache_entry->has_external_query_at && cache_entry->has_internal_query_at) {
LOG(INFO) << "generate block query"
<< ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno
<< ", next_block_seqno=" << cache_entry->block_seqno
<< ": nobody asked for block, but we tried to generate it";
}
if (cache_entry->has_external_query_at && !cache_entry->has_internal_query_at) {
LOG(INFO) << "generate block query"
<< ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno
<< ", next_block_seqno=" << cache_entry->block_seqno
<< ": somebody asked for block we didn't even tried to generate";
}
cache_it = info.cache.erase(cache_it);
continue;
}
if (cache_entry->block_seqno == info.next_block_seqno && cached_prev != info.prev) {
cache_entry->cancel(td::Status::Error("invalid prev blocks"));
if (!cache_entry->has_external_query_at && cache_entry->has_internal_query_at) {
LOG(INFO) << "generate block query"
<< ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno
<< ", next_block_seqno=" << cache_entry->block_seqno
<< ": nobody asked for block, but we tried to generate it";
}
if (cache_entry->has_external_query_at && !cache_entry->has_internal_query_at) {
LOG(INFO) << "generate block query"
<< ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno
<< ", next_block_seqno=" << cache_entry->block_seqno
<< ": somebody asked for block we didn't even tried to generate";
}
cache_it = info.cache.erase(cache_it);
continue;
}
++cache_it;
}
generate_block(shard, cc_seqno, info.prev, td::Timestamp::in(10.0), [](td::Result<BlockCandidate>) {});
generate_block(shard, cc_seqno, info.prev, {}, td::Timestamp::in(10.0), [](td::Result<BlockCandidate>) {});
}
return;
}
@ -285,6 +309,14 @@ static BlockCandidate change_creator(BlockCandidate block, Ed25519_PublicKey cre
cc_seqno = info.gen_catchain_seqno;
val_set_hash = info.gen_validator_list_hash_short;
for (auto& broadcast_ref : block.out_msg_queue_proof_broadcasts) {
auto block_state_proof = create_block_state_proof(root).move_as_ok();
auto &broadcast = broadcast_ref.write();
broadcast.block_id = block.id;
broadcast.block_state_proofs = vm::std_boc_serialize(std::move(block_state_proof), 31).move_as_ok();
}
return block;
}
@ -322,6 +354,11 @@ void CollatorNode::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data
for (const auto& b : f->prev_blocks_) {
prev_blocks.push_back(create_block_id(b));
}
auto priority = BlockCandidatePriority {
.round = static_cast<td::uint32>(f->round_),
.first_block_round = static_cast<td::uint32>(f->first_block_round_),
.priority = f->priority_
};
Ed25519_PublicKey creator(f->creator_);
td::Promise<BlockCandidate> new_promise = [promise = std::move(promise), src,
shard](td::Result<BlockCandidate> R) mutable {
@ -353,11 +390,13 @@ void CollatorNode::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data
return;
}
LOG(INFO) << "got adnl query from " << src << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno;
generate_block(shard, cc_seqno, std::move(prev_blocks), td::Timestamp::in(10.0), std::move(new_promise));
generate_block(shard, cc_seqno, std::move(prev_blocks), priority, td::Timestamp::in(10.0), std::move(new_promise));
}
void CollatorNode::generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std::vector<BlockIdExt> prev_blocks,
td::Timestamp timeout, td::Promise<BlockCandidate> promise) {
std::optional<BlockCandidatePriority> o_priority, td::Timestamp timeout,
td::Promise<BlockCandidate> promise) {
bool is_external = !o_priority;
if (last_masterchain_state_.is_null()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "not ready"));
return;
@ -379,8 +418,8 @@ void CollatorNode::generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std
promise.set_error(td::Status::Error(ErrorCode::timeout));
return;
}
td::actor::send_closure(SelfId, &CollatorNode::generate_block, shard, cc_seqno, std::move(prev_blocks), timeout,
std::move(promise));
td::actor::send_closure(SelfId, &CollatorNode::generate_block, shard, cc_seqno, std::move(prev_blocks),
std::move(o_priority), timeout, std::move(promise));
});
return;
}
@ -399,36 +438,81 @@ void CollatorNode::generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std
return;
}
static auto prefix_inner = [] (auto &sb, auto &shard, auto cc_seqno, auto block_seqno,
const std::optional<BlockCandidatePriority> &o_priority) {
sb << "generate block query"
<< ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno;
if (o_priority) {
sb << " external{";
sb << "round_offset=" << o_priority->round - o_priority->first_block_round << ",priority=" << o_priority->priority;
sb << ",first_block_round=" << o_priority->first_block_round;
sb << "}";
} else {
sb << " internal" ;
}
};
auto prefix = [&] (auto &sb) {
prefix_inner(sb, shard, cc_seqno, block_seqno, o_priority);
};
auto cache_entry = validator_group_info.cache[prev_blocks];
if (cache_entry == nullptr) {
cache_entry = validator_group_info.cache[prev_blocks] = std::make_shared<CacheEntry>();
}
if (is_external && !cache_entry->has_external_query_at) {
cache_entry->has_external_query_at = td::Timestamp::now();
if (cache_entry->has_internal_query_at && cache_entry->has_external_query_at) {
FLOG(INFO) {
prefix(sb);
sb << ": got external query " << cache_entry->has_external_query_at.at() - cache_entry->has_internal_query_at.at()
<< "s after internal query [WON]";
};
}
}
if (!is_external && !cache_entry->has_internal_query_at) {
cache_entry->has_internal_query_at = td::Timestamp::now();
if (cache_entry->has_internal_query_at && cache_entry->has_external_query_at) {
FLOG(INFO) {
prefix(sb);
sb << ": got internal query " << cache_entry->has_internal_query_at.at() - cache_entry->has_external_query_at.at()
<< "s after external query [LOST]";
};
}
}
if (cache_entry->result) {
LOG(INFO) << "generate block query"
<< ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno
<< ": using cached result";
auto has_result_ago = td::Timestamp::now().at() - cache_entry->has_result_at.at();
FLOG(INFO) {
prefix(sb);
sb << ": using cached result " << " generated " << has_result_ago << "s ago";
sb << (is_external ? " for external query [WON]" : " for internal query ");
};
promise.set_result(cache_entry->result.value().clone());
return;
}
cache_entry->promises.push_back(std::move(promise));
if (cache_entry->started) {
LOG(INFO) << "generate block query"
<< ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno
<< ": collation in progress, waiting";
FLOG(INFO) {
prefix(sb);
sb << ": collation in progress, waiting";
};
return;
}
LOG(INFO) << "generate block query"
<< ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno
<< ": starting collation";
FLOG(INFO) {
prefix(sb);
sb << ": starting collation";
};
cache_entry->started = true;
cache_entry->block_seqno = block_seqno;
run_collate_query(
shard, last_masterchain_state_->get_block_id(), std::move(prev_blocks), Ed25519_PublicKey{td::Bits256::zero()},
last_masterchain_state_->get_validator_set(shard), opts_->get_collator_options(), manager_, timeout,
[=, SelfId = actor_id(this), timer = td::Timer{}](td::Result<BlockCandidate> R) {
LOG(INFO) << "generate block result"
<< ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno
<< ", time=" << timer.elapsed() << ": " << (R.is_ok() ? "OK" : R.error().to_string());
FLOG(INFO) {
prefix_inner(sb, shard, cc_seqno, block_seqno,o_priority);
sb << timer.elapsed() << ": " << (R.is_ok() ? "OK" : R.error().to_string());
};
td::actor::send_closure(SelfId, &CollatorNode::process_result, cache_entry, std::move(R));
},
cache_entry->cancellation_token_source.get_cancellation_token(),
@ -443,6 +527,7 @@ void CollatorNode::process_result(std::shared_ptr<CacheEntry> cache_entry, td::R
}
} else {
cache_entry->result = R.move_as_ok();
cache_entry->has_result_at = td::Timestamp::now();
for (auto& p : cache_entry->promises) {
p.set_result(cache_entry->result.value().clone());
}

View file

@ -19,6 +19,7 @@
#include "interfaces/validator-manager.h"
#include "rldp/rldp.h"
#include <map>
#include <optional>
namespace ton::validator {
@ -57,6 +58,9 @@ class CollatorNode : public td::actor::Actor {
struct CacheEntry {
bool started = false;
td::Timestamp has_internal_query_at;
td::Timestamp has_external_query_at;
td::Timestamp has_result_at;
BlockSeqno block_seqno = 0;
td::optional<BlockCandidate> result;
td::CancellationTokenSource cancellation_token_source;
@ -84,7 +88,8 @@ class CollatorNode : public td::actor::Actor {
td::Result<FutureValidatorGroup*> get_future_validator_group(ShardIdFull shard, CatchainSeqno cc_seqno);
void generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std::vector<BlockIdExt> prev_blocks,
td::Timestamp timeout, td::Promise<BlockCandidate> promise);
std::optional<BlockCandidatePriority> o_priority, td::Timestamp timeout,
td::Promise<BlockCandidate> promise);
void process_result(std::shared_ptr<CacheEntry> cache_entry, td::Result<BlockCandidate> R);
public:

View file

@ -46,6 +46,33 @@ void FullNodeFastSyncOverlay::process_block_broadcast(PublicKeyHash src, ton_api
td::actor::send_closure(full_node_, &FullNode::process_block_broadcast, B.move_as_ok());
}
void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_outMsgQueueProofBroadcast &query) {
BlockIdExt block_id = create_block_id(query.block_);
ShardIdFull shard_id = create_shard_id(query.dst_shard_);
if (query.proof_->get_id() != ton_api::tonNode_outMsgQueueProof::ID) {
LOG(ERROR) << "got tonNode.outMsgQueueProofBroadcast with proof not tonNode.outMsgQueueProof";
return;
}
auto tl_proof = move_tl_object_as<ton_api::tonNode_outMsgQueueProof>(query.proof_);
auto R = OutMsgQueueProof::fetch(shard_id, {block_id},
block::ImportedMsgQueueLimits{.max_bytes = td::uint32(query.limits_->max_bytes_),
.max_msgs = td::uint32(query.limits_->max_msgs_)},
*tl_proof);
if (R.is_error()) {
LOG(ERROR) << "got tonNode.outMsgQueueProofBroadcast with invalid proof: " << R.error();
return;
}
if (R.ok().size() != 1) {
LOG(ERROR) << "got tonNode.outMsgQueueProofBroadcast with invalid proofs count=" << R.ok().size();
return;
}
auto proof = std::move(R.move_as_ok()[0]);
LOG(INFO) << "got tonNode.outMsgQueueProofBroadcast " << shard_id.to_str() << " " << block_id.to_str();
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::add_out_msg_queue_proof, shard_id,
std::move(proof));
}
void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) {
BlockIdExt block_id = create_block_id(query.block_->block_);
VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast in fast sync overlay from " << src << ": "
@ -200,6 +227,22 @@ void FullNodeFastSyncOverlay::collect_validator_telemetry(std::string filename)
}
}
void FullNodeFastSyncOverlay::send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQueueProofBroadcast> broadcast) {
if (!inited_) {
return;
}
auto B = create_serialize_tl_object<ton_api::tonNode_outMsgQueueProofBroadcast>(
create_tl_shard_id(broadcast->dst_shard), create_tl_block_id(broadcast->block_id),
create_tl_object<ton_api::tonNode_importedMsgQueueLimits>(broadcast->max_bytes, broadcast->max_msgs),
create_tl_object<ton_api::tonNode_outMsgQueueProof>(broadcast->queue_proofs.clone(),
broadcast->block_state_proofs.clone(),
std::vector<std::int32_t>(broadcast->msg_counts)));
VLOG(FULL_NODE_DEBUG) << "Sending outMsgQueueProof in fast sync overlay: " << broadcast->dst_shard.to_str() << " "
<< broadcast->block_id.to_str();
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B));
}
void FullNodeFastSyncOverlay::start_up() {
auto X = create_hash_tl_object<ton_api::tonNode_fastSyncOverlayId>(zero_state_file_hash_, create_tl_shard_id(shard_));
td::BufferSlice b{32};

View file

@ -25,6 +25,7 @@ class FullNodeFastSyncOverlay : public td::actor::Actor {
public:
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast& query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed& query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_outMsgQueueProofBroadcast& query);
void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast& query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast& query);
@ -46,6 +47,7 @@ class FullNodeFastSyncOverlay : public td::actor::Actor {
void send_broadcast(BlockBroadcast broadcast);
void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data);
void send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQueueProofBroadcast> broadcast);
void send_validator_telemetry(tl_object_ptr<ton_api::validator_telemetry> telemetry);
void collect_validator_telemetry(std::string filename);

View file

@ -151,6 +151,9 @@ class FullNodeShardImpl : public FullNodeShard {
void process_broadcast(PublicKeyHash src, ton_api::tonNode_ihrMessageBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_outMsgQueueProofBroadcast &query) {
LOG(ERROR) << "Ignore outMsgQueueProofBroadcast";
}
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query);

View file

@ -371,6 +371,14 @@ void FullNodeImpl::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_se
}
}
void FullNodeImpl::send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQueueProofBroadcast> broadcast) {
auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(broadcast->dst_shard).first;
if (!fast_sync_overlay.empty()) {
td::actor::send_closure(fast_sync_overlay, &FullNodeFastSyncOverlay::send_out_msg_queue_proof_broadcast,
std::move(broadcast));
}
}
void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, int mode) {
if (mode & broadcast_mode_custom) {
send_block_broadcast_to_custom_overlays(broadcast);
@ -713,6 +721,9 @@ void FullNodeImpl::start_up() {
td::actor::send_closure(id_, &FullNodeImpl::send_block_candidate, block_id, cc_seqno, validator_set_hash,
std::move(data));
}
void send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQueueProofBroadcast> broadcast) override {
td::actor::send_closure(id_, &FullNodeImpl::send_out_msg_queue_proof_broadcast, std::move(broadcast));
}
void send_broadcast(BlockBroadcast broadcast, int mode) override {
td::actor::send_closure(id_, &FullNodeImpl::send_broadcast, std::move(broadcast), mode);
}

View file

@ -70,6 +70,7 @@ class FullNodeImpl : public FullNode {
void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data);
void send_broadcast(BlockBroadcast broadcast, int mode);
void send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQueueProofBroadcast> broadcast);
void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise<ReceivedBlock> promise);
void download_zero_state(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::BufferSlice> promise);

View file

@ -237,6 +237,7 @@ class Collator final : public td::actor::Actor {
bool store_out_msg_queue_size_ = false;
td::PerfWarningTimer perf_timer_;
td::PerfLog perf_log_;
//
block::Account* lookup_account(td::ConstBitPtr addr) const;
std::unique_ptr<block::Account> make_account_from(td::ConstBitPtr addr, Ref<vm::CellSlice> account,
@ -252,18 +253,18 @@ class Collator final : public td::actor::Actor {
bool fatal_error(int err_code, std::string err_msg);
bool fatal_error(std::string err_msg, int err_code = -666);
void check_pending();
void after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res);
void after_get_shard_state(int idx, td::Result<Ref<ShardState>> res);
void after_get_block_data(int idx, td::Result<Ref<BlockData>> res);
void after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res);
void after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res, td::PerfLogAction token);
void after_get_shard_state(int idx, td::Result<Ref<ShardState>> res, td::PerfLogAction token);
void after_get_block_data(int idx, td::Result<Ref<BlockData>> res, td::PerfLogAction token);
void after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res, td::PerfLogAction token);
bool preprocess_prev_mc_state();
bool register_mc_state(Ref<MasterchainStateQ> other_mc_state);
bool request_aux_mc_state(BlockSeqno seqno, Ref<MasterchainStateQ>& state);
Ref<MasterchainStateQ> get_aux_mc_state(BlockSeqno seqno) const;
void after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result<Ref<ShardState>> res);
void after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result<Ref<ShardState>> res, td::PerfLogAction token);
bool fix_one_processed_upto(block::MsgProcessedUpto& proc, const ton::ShardIdFull& owner);
bool fix_processed_upto(block::MsgProcessedUptoCollection& upto);
void got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> R);
void got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> R, td::PerfLogAction token);
void got_neighbor_msg_queue(unsigned i, Ref<OutMsgQueueProof> res);
void got_out_queue_size(size_t i, td::Result<td::uint64> res);
bool adjust_shard_config();
@ -309,7 +310,7 @@ class Collator final : public td::actor::Actor {
bool is_our_address(Ref<vm::CellSlice> addr_ref) const;
bool is_our_address(ton::AccountIdPrefixFull addr_prefix) const;
bool is_our_address(const ton::StdSmcAddress& addr) const;
void after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res);
void after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res, td::PerfLogAction token);
td::Result<bool> register_external_message_cell(Ref<vm::Cell> ext_msg, const ExtMessage::Hash& ext_hash,
int priority);
// td::Result<bool> register_external_message(td::Slice ext_msg_boc);

View file

@ -217,25 +217,30 @@ void Collator::start_up() {
// 2. learn latest masterchain state and block id
LOG(DEBUG) << "sending get_top_masterchain_state_block() to Manager";
++pending;
auto token = perf_log_.start_action("get_top_masterchain_state_block");
if (!is_hardfork_) {
td::actor::send_closure_later(manager, &ValidatorManager::get_top_masterchain_state_block,
[self = get_self()](td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res) {
[self = get_self(), token = std::move(token)](
td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res) mutable {
LOG(DEBUG) << "got answer to get_top_masterchain_state_block";
td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state,
std::move(res));
std::move(res), std::move(token));
});
} else {
td::actor::send_closure_later(
manager, &ValidatorManager::get_shard_state_from_db_short, min_mc_block_id,
[self = get_self(), block_id = min_mc_block_id](td::Result<Ref<ShardState>> res) {
LOG(DEBUG) << "got answer to get_top_masterchain_state_block";
if (res.is_error()) {
td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state, res.move_as_error());
} else {
td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state,
std::make_pair(Ref<MasterchainState>(res.move_as_ok()), block_id));
}
});
td::actor::send_closure_later(manager, &ValidatorManager::get_shard_state_from_db_short, min_mc_block_id,
[self = get_self(), block_id = min_mc_block_id,
token = std::move(token)](td::Result<Ref<ShardState>> res) mutable {
LOG(DEBUG) << "got answer to get_top_masterchain_state_block";
if (res.is_error()) {
td::actor::send_closure_later(std::move(self), &Collator::after_get_mc_state,
res.move_as_error(), std::move(token));
} else {
td::actor::send_closure_later(
std::move(self), &Collator::after_get_mc_state,
std::make_pair(Ref<MasterchainState>(res.move_as_ok()), block_id),
std::move(token));
}
});
}
}
// 3. load previous block(s) and corresponding state(s)
@ -245,23 +250,27 @@ void Collator::start_up() {
// 3.1. load state
LOG(DEBUG) << "sending wait_block_state() query #" << i << " for " << prev_blocks[i].to_str() << " to Manager";
++pending;
td::actor::send_closure_later(manager, &ValidatorManager::wait_block_state_short, prev_blocks[i], priority(),
timeout, [self = get_self(), i](td::Result<Ref<ShardState>> res) {
LOG(DEBUG) << "got answer to wait_block_state query #" << i;
td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_state, i,
std::move(res));
});
auto token = perf_log_.start_action(PSTRING() << "wait_block_state #" << i);
td::actor::send_closure_later(
manager, &ValidatorManager::wait_block_state_short, prev_blocks[i], priority(), timeout,
[self = get_self(), i, token = std::move(token)](td::Result<Ref<ShardState>> res) mutable {
LOG(DEBUG) << "got answer to wait_block_state query #" << i;
td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_state, i, std::move(res),
std::move(token));
});
if (prev_blocks[i].seqno()) {
// 3.2. load block
// NB: we need the block itself only for extracting start_lt and end_lt to create correct prev_blk:ExtBlkRef and related Merkle proofs
LOG(DEBUG) << "sending wait_block_data() query #" << i << " for " << prev_blocks[i].to_str() << " to Manager";
++pending;
td::actor::send_closure_later(manager, &ValidatorManager::wait_block_data_short, prev_blocks[i], priority(),
timeout, [self = get_self(), i](td::Result<Ref<BlockData>> res) {
LOG(DEBUG) << "got answer to wait_block_data query #" << i;
td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, i,
std::move(res));
});
auto token = perf_log_.start_action(PSTRING() << "wait_block_data #" << i);
td::actor::send_closure_later(
manager, &ValidatorManager::wait_block_data_short, prev_blocks[i], priority(), timeout,
[self = get_self(), i, token = std::move(token)](td::Result<Ref<BlockData>> res) mutable {
LOG(DEBUG) << "got answer to wait_block_data query #" << i;
td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, i, std::move(res),
std::move(token));
});
}
}
if (is_hardfork_) {
@ -271,22 +280,28 @@ void Collator::start_up() {
if (!is_hardfork_) {
LOG(DEBUG) << "sending get_external_messages() query to Manager";
++pending;
td::actor::send_closure_later(manager, &ValidatorManager::get_external_messages, shard_,
[self = get_self()](td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res) -> void {
auto token = perf_log_.start_action("get_external_messages");
td::actor::send_closure_later(
manager, &ValidatorManager::get_external_messages, shard_,
[self = get_self(),
token = std::move(token)](td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res) mutable -> void {
LOG(DEBUG) << "got answer to get_external_messages() query";
td::actor::send_closure_later(std::move(self), &Collator::after_get_external_messages, std::move(res));
td::actor::send_closure_later(std::move(self), &Collator::after_get_external_messages, std::move(res),
std::move(token));
});
}
if (is_masterchain() && !is_hardfork_) {
// 5. load shard block info messages
LOG(DEBUG) << "sending get_shard_blocks_for_collator() query to Manager";
++pending;
td::actor::send_closure_later(
manager, &ValidatorManager::get_shard_blocks_for_collator, prev_blocks[0],
[self = get_self()](td::Result<std::vector<Ref<ShardTopBlockDescription>>> res) -> void {
LOG(DEBUG) << "got answer to get_shard_blocks_for_collator() query";
td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_blocks, std::move(res));
});
auto token = perf_log_.start_action("get_shard_blocks_for_collator");
td::actor::send_closure_later(manager, &ValidatorManager::get_shard_blocks_for_collator, prev_blocks[0],
[self = get_self(), token = std::move(token)](
td::Result<std::vector<Ref<ShardTopBlockDescription>>> res) mutable -> void {
LOG(DEBUG) << "got answer to get_shard_blocks_for_collator() query";
td::actor::send_closure_later(std::move(self), &Collator::after_get_shard_blocks,
std::move(res), std::move(token));
});
}
// 6. set timeout
alarm_timestamp() = timeout;
@ -362,6 +377,8 @@ bool Collator::fatal_error(td::Status error) {
td::Timestamp::in(10.0), std::move(main_promise), std::move(cancellation_token_), mode_,
attempt_idx_ + 1);
} else {
LOG(INFO) << "collation failed in " << perf_timer_.elapsed() << " s " << error;
LOG(INFO) << perf_log_;
main_promise(std::move(error));
}
busy_ = false;
@ -482,12 +499,14 @@ bool Collator::request_aux_mc_state(BlockSeqno seqno, Ref<MasterchainStateQ>& st
CHECK(blkid.is_valid_ext() && blkid.is_masterchain());
LOG(DEBUG) << "sending auxiliary wait_block_state() query for " << blkid.to_str() << " to Manager";
++pending;
td::actor::send_closure_later(manager, &ValidatorManager::wait_block_state_short, blkid, priority(), timeout,
[self = get_self(), blkid](td::Result<Ref<ShardState>> res) {
LOG(DEBUG) << "got answer to wait_block_state query for " << blkid.to_str();
td::actor::send_closure_later(std::move(self), &Collator::after_get_aux_shard_state,
blkid, std::move(res));
});
auto token = perf_log_.start_action(PSTRING() << "auxiliary wait_block_state " << blkid.to_str());
td::actor::send_closure_later(
manager, &ValidatorManager::wait_block_state_short, blkid, priority(), timeout,
[self = get_self(), blkid, token = std::move(token)](td::Result<Ref<ShardState>> res) mutable {
LOG(DEBUG) << "got answer to wait_block_state query for " << blkid.to_str();
td::actor::send_closure_later(std::move(self), &Collator::after_get_aux_shard_state, blkid, std::move(res),
std::move(token));
});
state.clear();
return true;
}
@ -515,9 +534,11 @@ Ref<MasterchainStateQ> Collator::get_aux_mc_state(BlockSeqno seqno) const {
* @param blkid The BlockIdExt of the shard state.
* @param res The result of retrieving the shard state.
*/
void Collator::after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result<Ref<ShardState>> res) {
void Collator::after_get_aux_shard_state(ton::BlockIdExt blkid, td::Result<Ref<ShardState>> res,
td::PerfLogAction token) {
LOG(DEBUG) << "in Collator::after_get_aux_shard_state(" << blkid.to_str() << ")";
--pending;
token.finish(res);
if (res.is_error()) {
fatal_error("cannot load auxiliary masterchain state for "s + blkid.to_str() + " : " +
res.move_as_error().to_string());
@ -579,9 +600,11 @@ bool Collator::preprocess_prev_mc_state() {
*
* @param res The retrieved masterchain state.
*/
void Collator::after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res) {
void Collator::after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, BlockIdExt>> res,
td::PerfLogAction token) {
LOG(WARNING) << "in Collator::after_get_mc_state()";
--pending;
token.finish(res);
if (res.is_error()) {
fatal_error(res.move_as_error());
return;
@ -598,12 +621,14 @@ void Collator::after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, Bl
// NB. it is needed only for creating a correct ExtBlkRef reference to it, which requires start_lt and end_lt
LOG(DEBUG) << "sending wait_block_data() query #-1 for " << mc_block_id_.to_str() << " to Manager";
++pending;
td::actor::send_closure_later(manager, &ValidatorManager::wait_block_data_short, mc_block_id_, priority(), timeout,
[self = get_self()](td::Result<Ref<BlockData>> res) {
LOG(DEBUG) << "got answer to wait_block_data query #-1";
td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, -1,
std::move(res));
});
auto token = perf_log_.start_action("wait_block_data #-1");
td::actor::send_closure_later(
manager, &ValidatorManager::wait_block_data_short, mc_block_id_, priority(), timeout,
[self = get_self(), token = std::move(token)](td::Result<Ref<BlockData>> res) mutable {
LOG(DEBUG) << "got answer to wait_block_data query #-1";
td::actor::send_closure_later(std::move(self), &Collator::after_get_block_data, -1, std::move(res),
std::move(token));
});
}
check_pending();
}
@ -614,9 +639,10 @@ void Collator::after_get_mc_state(td::Result<std::pair<Ref<MasterchainState>, Bl
* @param idx The index of the previous shard block (0 or 1).
* @param res The retrieved shard state.
*/
void Collator::after_get_shard_state(int idx, td::Result<Ref<ShardState>> res) {
void Collator::after_get_shard_state(int idx, td::Result<Ref<ShardState>> res, td::PerfLogAction token) {
LOG(WARNING) << "in Collator::after_get_shard_state(" << idx << ")";
--pending;
token.finish(res);
if (res.is_error()) {
fatal_error(res.move_as_error());
return;
@ -647,9 +673,10 @@ void Collator::after_get_shard_state(int idx, td::Result<Ref<ShardState>> res) {
* @param idx The index of the previous block (0 or 1).
* @param res The retreved block data.
*/
void Collator::after_get_block_data(int idx, td::Result<Ref<BlockData>> res) {
void Collator::after_get_block_data(int idx, td::Result<Ref<BlockData>> res, td::PerfLogAction token) {
LOG(DEBUG) << "in Collator::after_get_block_data(" << idx << ")";
--pending;
token.finish(res);
if (res.is_error()) {
fatal_error(res.move_as_error());
return;
@ -691,8 +718,10 @@ void Collator::after_get_block_data(int idx, td::Result<Ref<BlockData>> res) {
*
* @param res The retrieved shard block descriptions.
*/
void Collator::after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res) {
void Collator::after_get_shard_blocks(td::Result<std::vector<Ref<ShardTopBlockDescription>>> res,
td::PerfLogAction token) {
--pending;
token.finish(res);
if (res.is_error()) {
fatal_error(res.move_as_error());
return;
@ -848,10 +877,13 @@ bool Collator::request_neighbor_msg_queues() {
++i;
}
++pending;
auto token = perf_log_.start_action("neighbor_msg_queues");
td::actor::send_closure_later(
manager, &ValidatorManager::wait_neighbor_msg_queue_proofs, shard_, std::move(top_blocks), timeout,
[self = get_self()](td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> res) {
td::actor::send_closure_later(std::move(self), &Collator::got_neighbor_msg_queues, std::move(res));
[self = get_self(),
token = std::move(token)](td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> res) mutable {
td::actor::send_closure_later(std::move(self), &Collator::got_neighbor_msg_queues, std::move(res),
std::move(token));
});
return true;
}
@ -883,13 +915,15 @@ bool Collator::request_out_msg_queue_size() {
* @param i The index of the neighbor.
* @param res The obtained outbound queue.
*/
void Collator::got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> R) {
void Collator::got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> R,
td::PerfLogAction token) {
--pending;
double duration = token.finish(R);
if (R.is_error()) {
fatal_error(R.move_as_error_prefix("failed to get neighbor msg queues: "));
return;
}
LOG(INFO) << "neighbor output queues fetched";
LOG(INFO) << "neighbor output queues fetched, took " << duration << "s";
auto res = R.move_as_ok();
unsigned i = 0;
for (block::McShardDescr& descr : neighbors_) {
@ -2091,12 +2125,9 @@ bool Collator::init_lt() {
* @returns True if the configuration parameters were successfully fetched and initialized, false otherwise.
*/
bool Collator::fetch_config_params() {
auto res = block::FetchConfigParams::fetch_config_params(*config_,
&old_mparams_, &storage_prices_, &storage_phase_cfg_,
&rand_seed_, &compute_phase_cfg_, &action_phase_cfg_,
&masterchain_create_fee_, &basechain_create_fee_,
workchain(), now_
);
auto res = block::FetchConfigParams::fetch_config_params(
*config_, &old_mparams_, &storage_prices_, &storage_phase_cfg_, &rand_seed_, &compute_phase_cfg_,
&action_phase_cfg_, &masterchain_create_fee_, &basechain_create_fee_, workchain(), now_);
if (res.is_error()) {
return fatal_error(res.move_as_error());
}
@ -2217,6 +2248,11 @@ bool Collator::init_value_create() {
bool Collator::do_collate() {
// After do_collate started it will not be interrupted by timeout
alarm_timestamp() = td::Timestamp::never();
auto token = perf_log_.start_action("do_collate");
td::Status status = td::Status::Error("some error");
SCOPE_EXIT {
token.finish(status);
};
LOG(WARNING) << "do_collate() : start";
if (!fetch_config_params()) {
@ -2342,6 +2378,7 @@ bool Collator::do_collate() {
if (!create_block_candidate()) {
return fatal_error("cannot serialize a new Block candidate");
}
status = td::Status::OK();
return true;
}
@ -5811,12 +5848,43 @@ bool Collator::create_block_candidate() {
<< block_limit_status_->transactions;
LOG(INFO) << "serialized collated data size " << cdata_slice.size() << " bytes (preliminary estimate was "
<< block_limit_status_->collated_data_stat.estimate_proof_size() << ")";
auto new_block_id_ext = ton::BlockIdExt{ton::BlockId{shard_, new_block_seqno}, new_block->get_hash().bits(),
block::compute_file_hash(blk_slice.as_slice())};
// 3. create a BlockCandidate
block_candidate = std::make_unique<BlockCandidate>(
created_by_,
ton::BlockIdExt{ton::BlockId{shard_, new_block_seqno}, new_block->get_hash().bits(),
block::compute_file_hash(blk_slice.as_slice())},
block::compute_file_hash(cdata_slice.as_slice()), blk_slice.clone(), cdata_slice.clone());
block_candidate =
std::make_unique<BlockCandidate>(created_by_, new_block_id_ext, block::compute_file_hash(cdata_slice.as_slice()),
blk_slice.clone(), cdata_slice.clone());
const bool need_out_msg_queue_broadcasts = true;
if (need_out_msg_queue_broadcasts) {
// we can't generate two proofs at the same time for the same root (it is not currently supported by cells)
// so we can't reuse the new state and have to regenerate it with a merkle update
auto new_state = vm::MerkleUpdate::apply(prev_state_root_pure_, state_update);
CHECK(new_state.not_null());
CHECK(new_state->get_hash() == state_root->get_hash());
assert(config_ && shard_conf_);
auto neighbor_list = shard_conf_->get_neighbor_shard_hash_ids(shard_);
LOG(INFO) << "Build OutMsgQueueProofs for " << neighbor_list.size() << " neighbours";
for (ton::BlockId blk_id : neighbor_list) {
auto prefix = blk_id.shard_full();
auto limits = mc_state_->get_imported_msg_queue_limits(blk_id.workchain);
// one could use monitor_min_split_depth here to decrease the number of broadcasts,
// but the current OutMsgQueueImporter implementation doesn't support it
auto r_proof = OutMsgQueueProof::build(
prefix, {OutMsgQueueProof::OneBlock{.id = new_block_id_ext, .state_root = new_state, .block_root = new_block}}, limits);
if (r_proof.is_ok()) {
auto proof = r_proof.move_as_ok();
block_candidate->out_msg_queue_proof_broadcasts.push_back(td::Ref<OutMsgQueueProofBroadcast>(
true, OutMsgQueueProofBroadcast(prefix, new_block_id_ext, limits.max_bytes, limits.max_msgs,
std::move(proof->queue_proofs_), std::move(proof->block_state_proofs_),
std::move(proof->msg_counts_))));
} else {
LOG(ERROR) << "Failed to build OutMsgQueueProof: " << r_proof.error();
}
}
}
// 3.1 check block and collated data size
auto consensus_config = config_->get_consensus_config();
if (block_candidate->data.size() > consensus_config.max_block_size) {
@ -5896,6 +5964,7 @@ void Collator::return_block_candidate(td::Result<td::Unit> saved) {
CHECK(block_candidate);
LOG(WARNING) << "sending new BlockCandidate to Promise";
LOG(WARNING) << "collation took " << perf_timer_.elapsed() << " s";
LOG(WARNING) << perf_log_;
main_promise(block_candidate->clone());
busy_ = false;
stop();
@ -5974,9 +6043,11 @@ td::Result<bool> Collator::register_external_message_cell(Ref<vm::Cell> ext_msg,
*
* @param res The result of the external message retrieval operation.
*/
void Collator::after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res) {
void Collator::after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res,
td::PerfLogAction token) {
// res: pair {ext msg, priority}
--pending;
token.finish(res);
if (res.is_error()) {
fatal_error(res.move_as_error());
return;

View file

@ -320,13 +320,23 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs(
if (prefix.pfx_len() > min_split) {
prefix = shard_prefix(prefix, min_split);
}
new_queries[prefix].push_back(block);
LOG(INFO) << "search for out msg queue proof " << prefix.to_str() << block.to_str();
auto& small_entry = small_cache_[std::make_pair(dst_shard, block)];
if (!small_entry.result.is_null()) {
entry->result[block] = small_entry.result;
entry->from_small_cache++;
alarm_timestamp().relax(small_entry.timeout = td::Timestamp::in(CACHE_TTL));
} else {
small_entry.pending_entries.push_back(entry);
++entry->pending;
new_queries[prefix].push_back(block);
}
}
};
auto limits = last_masterchain_state_->get_imported_msg_queue_limits(dst_shard.workchain);
for (auto& p : new_queries) {
for (size_t i = 0; i < p.second.size(); i += 16) {
++entry->pending;
size_t j = std::min(i + 16, p.second.size());
get_proof_import(entry, std::vector<BlockIdExt>(p.second.begin() + i, p.second.begin() + j),
limits * (td::uint32)(j - i));
@ -355,7 +365,7 @@ void OutMsgQueueImporter::get_proof_local(std::shared_ptr<CacheEntry> entry, Blo
if (block.seqno() == 0) {
std::vector<td::Ref<OutMsgQueueProof>> proof = {
td::Ref<OutMsgQueueProof>(true, block, state->root_cell(), td::Ref<vm::Cell>{})};
td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof));
td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof), ProofSource::Local);
return;
}
td::actor::send_closure(
@ -371,7 +381,8 @@ void OutMsgQueueImporter::get_proof_local(std::shared_ptr<CacheEntry> entry, Blo
Ref<vm::Cell> block_state_proof = create_block_state_proof(R.ok()->root_cell()).move_as_ok();
std::vector<td::Ref<OutMsgQueueProof>> proof = {
td::Ref<OutMsgQueueProof>(true, block, state->root_cell(), std::move(block_state_proof))};
td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof));
td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, std::move(proof),
ProofSource::Local);
});
});
}
@ -395,27 +406,70 @@ void OutMsgQueueImporter::get_proof_import(std::shared_ptr<CacheEntry> entry, st
retry_after);
return;
}
td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, R.move_as_ok());
td::actor::send_closure(SelfId, &OutMsgQueueImporter::got_proof, entry, R.move_as_ok(), ProofSource::Query);
});
}
void OutMsgQueueImporter::got_proof(std::shared_ptr<CacheEntry> entry, std::vector<td::Ref<OutMsgQueueProof>> proofs) {
void OutMsgQueueImporter::got_proof(std::shared_ptr<CacheEntry> entry, std::vector<td::Ref<OutMsgQueueProof>> proofs,
ProofSource proof_source) {
if (!check_timeout(entry)) {
return;
}
// TODO: maybe save proof to small cache? It would allow other queries to reuse this result
for (auto& p : proofs) {
entry->result[p->block_id_] = std::move(p);
}
CHECK(entry->pending > 0);
if (--entry->pending == 0) {
finish_query(entry);
auto block_id = p->block_id_;
if (entry->result.emplace(block_id, std::move(p)).second) {
CHECK(entry->pending > 0);
switch (proof_source) {
case ProofSource::SmallCache:
entry->from_small_cache++;
break;
case ProofSource::Broadcast:
entry->from_broadcast++;
break;
case ProofSource::Query:
entry->from_query++;
break;
case ProofSource::Local:
entry->from_local++;
break;
}
if (--entry->pending == 0) {
finish_query(entry);
}
}
}
}
void OutMsgQueueImporter::finish_query(std::shared_ptr<CacheEntry> entry) {
LOG(DEBUG) << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", "
<< entry->blocks.size() << " blocks in " << entry->timer.elapsed() << "s";
FLOG(INFO) {
sb << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", " << entry->blocks.size()
<< " blocks in " << entry->timer.elapsed() << "s";
sb << " sources{";
if (entry->from_broadcast) {
sb << " broadcast=" << entry->from_broadcast;
}
if (entry->from_small_cache) {
sb << " small_cache=" << entry->from_small_cache;
}
if (entry->from_local) {
sb << " local=" << entry->from_local;
}
if (entry->from_query) {
sb << " query=" << entry->from_query;
}
sb << "}";
if (!small_cache_.empty()) {
sb << " small_cache_size=" << small_cache_.size();
}
if (!cache_.empty()) {
sb << " cache_size=" << cache_.size();
}
};
entry->done = true;
CHECK(entry->blocks.size() == entry->result.size());
alarm_timestamp().relax(entry->timeout = td::Timestamp::in(CACHE_TTL));
for (auto& p : entry->promises) {
p.first.set_result(entry->result);
@ -441,8 +495,7 @@ bool OutMsgQueueImporter::check_timeout(std::shared_ptr<CacheEntry> entry) {
}
void OutMsgQueueImporter::alarm() {
auto it = cache_.begin();
while (it != cache_.end()) {
for (auto it = cache_.begin(); it != cache_.end();) {
auto& promises = it->second->promises;
if (it->second->timeout.is_in_past()) {
if (!it->second->done) {
@ -469,6 +522,36 @@ void OutMsgQueueImporter::alarm() {
promises.resize(j);
++it;
}
for (auto it = small_cache_.begin(); it != small_cache_.end();) {
td::remove_if(it->second.pending_entries,
[](const std::shared_ptr<CacheEntry>& entry) { return entry->done || entry->promises.empty(); });
if (it->second.timeout.is_in_past()) {
if (it->second.pending_entries.empty()) {
it = small_cache_.erase(it);
} else {
++it;
}
} else {
alarm_timestamp().relax(it->second.timeout);
++it;
}
}
}
void OutMsgQueueImporter::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof) {
LOG(INFO) << "add out msg queue proof " << dst_shard.to_str() << proof->block_id_.to_str();
auto& small_entry = small_cache_[std::make_pair(dst_shard, proof->block_id_)];
if (!small_entry.result.is_null()) {
return;
}
alarm_timestamp().relax(small_entry.timeout = td::Timestamp::in(CACHE_TTL));
small_entry.result = proof;
CHECK(proof.not_null());
auto pending_entries = std::move(small_entry.pending_entries);
for (auto& entry : pending_entries) {
got_proof(entry, {proof}, ProofSource::Broadcast);
}
}
void BuildOutMsgQueueProof::abort_query(td::Status reason) {

View file

@ -41,6 +41,7 @@ class OutMsgQueueImporter : public td::actor::Actor {
void new_masterchain_block_notification(td::Ref<MasterchainState> state, std::set<ShardIdFull> collating_shards);
void get_neighbor_msg_queue_proofs(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks, td::Timestamp timeout,
td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise);
void add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof);
void update_options(td::Ref<ValidatorManagerOptions> opts) {
opts_ = std::move(opts);
@ -62,13 +63,28 @@ class OutMsgQueueImporter : public td::actor::Actor {
td::Timestamp timeout = td::Timestamp::never();
td::Timer timer;
size_t pending = 0;
size_t from_small_cache = 0;
size_t from_broadcast = 0;
size_t from_query = 0;
size_t from_local = 0;
};
std::map<std::pair<ShardIdFull, std::vector<BlockIdExt>>, std::shared_ptr<CacheEntry>> cache_;
// This cache has smaller granularity, proof is stored for each block separately
struct SmallCacheEntry {
td::Ref<OutMsgQueueProof> result;
std::vector<std::shared_ptr<CacheEntry>> pending_entries;
td::Timestamp timeout = td::Timestamp::never();
};
std::map<std::pair<ShardIdFull, BlockIdExt>, SmallCacheEntry> small_cache_;
void get_proof_local(std::shared_ptr<CacheEntry> entry, BlockIdExt block);
void get_proof_import(std::shared_ptr<CacheEntry> entry, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits);
void got_proof(std::shared_ptr<CacheEntry> entry, std::vector<td::Ref<OutMsgQueueProof>> proofs);
enum class ProofSource {
SmallCache, Broadcast, Query, Local
};
void got_proof(std::shared_ptr<CacheEntry> entry, std::vector<td::Ref<OutMsgQueueProof>> proofs, ProofSource proof_source);
void finish_query(std::shared_ptr<CacheEntry> entry);
bool check_timeout(std::shared_ptr<CacheEntry> entry);

View file

@ -1439,6 +1439,11 @@ void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate can
if (!id.is_masterchain()) {
add_cached_block_candidate(ReceivedBlock{id, candidate.data.clone()});
}
LOG(INFO) << "Got candidate " << id.to_str() << " with " << candidate.out_msg_queue_proof_broadcasts.size()
<< " out msg queue proof broadcasts";
for (auto broadcast : candidate.out_msg_queue_proof_broadcasts) {
callback_->send_out_msg_queue_proof_broadcast(broadcast);
}
td::actor::send_closure(db_, &Db::store_block_candidate, std::move(candidate), std::move(promise));
}
@ -3519,7 +3524,7 @@ void ValidatorManagerImpl::del_collator(adnl::AdnlNodeIdShort id, ShardIdFull sh
} else {
td::actor::send_closure(it->second.actor, &CollatorNode::del_shard, shard);
}
}
};
void ValidatorManagerImpl::get_collation_manager_stats(
td::Promise<tl_object_ptr<ton_api::engine_validator_collationManagerStats>> promise) {
@ -3569,6 +3574,16 @@ void ValidatorManagerImpl::get_collation_manager_stats(
td::actor::send_closure(callback, &Cb::dec_pending);
}
void ValidatorManagerImpl::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof) {
if (!collator_nodes_.empty()) {
if (out_msg_queue_importer_.empty()) {
out_msg_queue_importer_ = td::actor::create_actor<OutMsgQueueImporter>("outmsgqueueimporter", actor_id(this),
opts_, last_masterchain_state_);
}
td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::add_out_msg_queue_proof, dst_shard,
std::move(proof));
}
}
void ValidatorManagerImpl::add_persistent_state_description(td::Ref<PersistentStateDescription> desc) {
auto now = (UnixTime)td::Clocks::system();
if (desc->end_time <= now) {

View file

@ -640,6 +640,7 @@ class ValidatorManagerImpl : public ValidatorManager {
void add_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) override;
void del_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) override;
void add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof) override;
void get_collation_manager_stats(
td::Promise<tl_object_ptr<ton_api::engine_validator_collationManagerStats>> promise) override;

View file

@ -66,8 +66,14 @@ void ValidatorGroup::generate_block_candidate(
std::move(R));
};
td::uint64 max_answer_size = config_.max_block_size + config_.max_collated_data_size + 1024;
auto block_candidate_priority = BlockCandidatePriority{
.round = source_info.round,
.first_block_round = source_info.first_block_round,
.priority = source_info.source_priority
};
td::actor::send_closure(collation_manager_, &CollationManager::collate_block, shard_, min_masterchain_block_id_,
prev_block_ids_, Ed25519_PublicKey{local_id_full_.ed25519_value().raw()}, validator_set_,
prev_block_ids_, Ed25519_PublicKey{local_id_full_.ed25519_value().raw()},
block_candidate_priority, validator_set_,
max_answer_size, cancellation_token_source_.get_cancellation_token(), std::move(P));
}

View file

@ -198,6 +198,9 @@ class ValidatorManagerInterface : public td::actor::Actor {
virtual void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data) = 0;
virtual void send_broadcast(BlockBroadcast broadcast, int mode) = 0;
virtual void send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQueueProofBroadcast> broadcast) {
LOG(ERROR) << "Unimplemented send_out_msg_queue_proof_broadcast - ignore broadcast";
}
virtual void download_block(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
td::Promise<ReceivedBlock> promise) = 0;
virtual void download_zero_state(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
@ -325,6 +328,10 @@ class ValidatorManagerInterface : public td::actor::Actor {
virtual void add_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) = 0;
virtual void del_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) = 0;
virtual void add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof) {
LOG(ERROR) << "Unimplemented add_out_msg_queu_proof - ignore broadcast";
}
virtual void get_collation_manager_stats(
td::Promise<tl_object_ptr<ton_api::engine_validator_collationManagerStats>> promise) = 0;
};