1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Improve CollatorNode

* Keep track of validator groups
* Pre-generate shard blocks
This commit is contained in:
SpyCheese 2024-06-25 14:06:15 +03:00
parent 3695bf0797
commit 90d2edf535
12 changed files with 410 additions and 174 deletions

View file

@ -21,6 +21,7 @@
#include "block-db.h"
#include "td/utils/lz4.h"
#include "checksum.h"
#include "impl/shard.hpp"
#include "validator-session/candidate-serializer.h"
namespace ton::validator {
@ -33,7 +34,7 @@ CollatorNode::CollatorNode(adnl::AdnlNodeIdShort local_id, td::actor::ActorId<Va
void CollatorNode::start_up() {
class Cb : public adnl::Adnl::Callback {
public:
Cb(td::actor::ActorId<CollatorNode> id) : id_(std::move(id)) {
explicit Cb(td::actor::ActorId<CollatorNode> id) : id_(std::move(id)) {
}
void receive_message(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data) override {
}
@ -57,54 +58,195 @@ void CollatorNode::tear_down() {
}
void CollatorNode::add_shard(ShardIdFull shard) {
if (std::find(shards_.begin(), shards_.end(), shard) != shards_.end()) {
CHECK(shard.is_valid_ext() && !shard.is_masterchain());
if (std::find(collating_shards_.begin(), collating_shards_.end(), shard) != collating_shards_.end()) {
return;
}
LOG(INFO) << "Collator node: local_id=" << local_id_ << " , shard=" << shard.to_str();
shards_.push_back(shard);
collating_shards_.push_back(shard);
}
void CollatorNode::del_shard(ShardIdFull shard) {
auto it = std::find(shards_.begin(), shards_.end(), shard);
if (it != shards_.end()) {
shards_.erase(it);
auto it = std::find(collating_shards_.begin(), collating_shards_.end(), shard);
if (it != collating_shards_.end()) {
collating_shards_.erase(it);
}
}
void CollatorNode::new_masterchain_block_notification(td::Ref<MasterchainState> state) {
last_masterchain_block_ = state->get_block_id();
last_top_blocks_.clear();
last_top_blocks_[ShardIdFull{masterchainId, shardIdAll}] = last_masterchain_block_;
for (const auto& desc : state->get_shards()) {
last_top_blocks_[desc->shard()] = desc->top_block_id();
}
if (validators_.empty() || state->is_key_state()) {
validators_.clear();
last_masterchain_state_ = state;
if (validator_adnl_ids_.empty() || state->is_key_state()) {
validator_adnl_ids_.clear();
for (int next : {-1, 0, 1}) {
td::Ref<ValidatorSet> vals = state->get_total_validator_set(next);
if (vals.not_null()) {
for (const ValidatorDescr& descr : vals->export_vector()) {
if (descr.addr.is_zero()) {
validators_.insert(
validator_adnl_ids_.insert(
adnl::AdnlNodeIdShort(PublicKey(pubkeys::Ed25519{descr.key.as_bits256()}).compute_short_id()));
} else {
validators_.insert(adnl::AdnlNodeIdShort(descr.addr));
validator_adnl_ids_.insert(adnl::AdnlNodeIdShort(descr.addr));
}
}
}
}
}
// Remove old cache entries
auto it = cache_.begin();
while (it != cache_.end()) {
auto prev_block_id = std::get<2>(it->first)[0];
auto top_block = get_shard_top_block(prev_block_id.shard_full());
if (top_block && top_block.value().seqno() > prev_block_id.seqno()) {
it = cache_.erase(it);
std::map<ShardIdFull, std::vector<BlockIdExt>> new_shards;
for (auto& v : state->get_shards()) {
auto shard = v->shard();
if (v->before_split()) {
CHECK(!v->before_merge());
new_shards.emplace(shard_child(shard, true), std::vector{v->top_block_id()});
new_shards.emplace(shard_child(shard, false), std::vector{v->top_block_id()});
} else if (v->before_merge()) {
ShardIdFull p_shard = shard_parent(shard);
auto it = new_shards.find(p_shard);
if (it == new_shards.end()) {
new_shards.emplace(p_shard, std::vector<BlockIdExt>(2));
}
bool left = shard_child(p_shard.shard, true) == shard.shard;
new_shards[p_shard][left ? 0 : 1] = v->top_block_id();
} else {
++it;
new_shards.emplace(shard, std::vector{v->top_block_id()});
}
}
for (auto& [shard, prev] : new_shards) {
CatchainSeqno cc_seqno = state->get_validator_set(shard)->get_catchain_seqno();
auto it = validator_groups_.emplace(shard, ValidatorGroupInfo{});
ValidatorGroupInfo& info = it.first->second;
if (it.second || info.cc_seqno != cc_seqno) {
info.cleanup();
info.cc_seqno = cc_seqno;
}
}
for (auto it = validator_groups_.begin(); it != validator_groups_.end();) {
if (new_shards.count(it->first)) {
++it;
} else {
it->second.cleanup();
it = validator_groups_.erase(it);
}
}
for (auto& [shard, prev] : new_shards) {
ValidatorGroupInfo& info = validator_groups_[shard];
update_validator_group_info(shard, std::move(prev), info.cc_seqno);
auto it = future_validator_groups_.find({shard, info.cc_seqno});
if (it != future_validator_groups_.end()) {
for (auto& new_prev : it->second.pending_blocks) {
update_validator_group_info(shard, std::move(new_prev), info.cc_seqno);
}
for (auto& promise : it->second.promises) {
promise.set_value(td::Unit());
}
future_validator_groups_.erase(it);
}
}
for (auto it = future_validator_groups_.begin(); it != future_validator_groups_.end(); ++it) {
if (get_future_validator_group(it->first.first, it->first.second).is_ok()) {
++it;
} else {
auto& future_group = it->second;
for (auto& promise : future_group.promises) {
promise.set_error(td::Status::Error("validator group is outdated"));
}
it = future_validator_groups_.erase(it);
}
}
}
void CollatorNode::update_validator_group_info(ShardIdFull shard, std::vector<BlockIdExt> prev,
CatchainSeqno cc_seqno) {
if (!can_collate_shard(shard)) {
return;
}
CHECK(prev.size() == 1 || prev.size() == 2);
BlockSeqno next_block_seqno = prev[0].seqno() + 1;
if (prev.size() == 2) {
next_block_seqno = std::max(next_block_seqno, prev[1].seqno() + 1);
}
auto it = validator_groups_.find(shard);
if (it != validator_groups_.end()) {
ValidatorGroupInfo& info = it->second;
if (info.cc_seqno == cc_seqno) { // block from currently known validator group
if (info.next_block_seqno < next_block_seqno) {
LOG(DEBUG) << "updated validator group info: shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno
<< ", next_block_seqno=" << next_block_seqno;
info.next_block_seqno = next_block_seqno;
info.prev = std::move(prev);
for (auto cache_it = info.cache.begin(); cache_it != info.cache.end();) {
auto& [cached_prev, cache_entry] = *cache_it;
if (cache_entry->block_seqno < info.next_block_seqno) {
cache_entry->cancel(td::Status::Error(PSTRING() << "next block seqno " << cache_entry->block_seqno
<< " is too small, expected " << info.next_block_seqno));
cache_it = info.cache.erase(cache_it);
continue;
}
if (cache_entry->block_seqno == info.next_block_seqno && cached_prev != info.prev) {
cache_entry->cancel(td::Status::Error("invalid prev blocks"));
cache_it = info.cache.erase(cache_it);
continue;
}
++cache_it;
}
if (do_pregen_) {
generate_block(shard, cc_seqno, info.prev, td::Timestamp::in(10.0), [](td::Result<BlockCandidate>) {});
}
}
return;
}
}
auto future_validator_group = get_future_validator_group(shard, cc_seqno);
if (future_validator_group.is_ok()) {
// future validator group, remember for later
future_validator_group.ok()->pending_blocks.push_back(std::move(prev));
}
}
td::Result<CollatorNode::FutureValidatorGroup*> CollatorNode::get_future_validator_group(ShardIdFull shard,
CatchainSeqno cc_seqno) {
auto it = validator_groups_.find(shard);
if (it == validator_groups_.end() && shard.pfx_len() != 0) {
it = validator_groups_.find(shard_parent(shard));
}
if (it == validator_groups_.end() && shard.pfx_len() < max_shard_pfx_len) {
it = validator_groups_.find(shard_child(shard, true));
}
if (it == validator_groups_.end() && shard.pfx_len() < max_shard_pfx_len) {
it = validator_groups_.find(shard_child(shard, false));
}
if (it == validator_groups_.end()) {
return td::Status::Error("no such shard");
}
if (cc_seqno < it->second.cc_seqno) { // past validator group
return td::Status::Error(PSTRING() << "cc_seqno " << cc_seqno << " is outdated (current is" << it->second.cc_seqno
<< ")");
}
if (cc_seqno - it->second.cc_seqno > 1) { // future validator group, cc_seqno too big
return td::Status::Error(PSTRING() << "cc_seqno " << cc_seqno << " is too big (currently known is"
<< it->second.cc_seqno << ")");
}
// future validator group
return &future_validator_groups_[{shard, cc_seqno}];
}
void CollatorNode::ValidatorGroupInfo::cleanup() {
prev.clear();
next_block_seqno = 0;
for (auto& [_, cache_entry] : cache) {
cache_entry->cancel(td::Status::Error("validator group is outdated"));
}
cache.clear();
}
void CollatorNode::CacheEntry::cancel(td::Status reason) {
for (auto& promise : promises) {
promise.set_error(reason.clone());
}
promises.clear();
cancellation_token_source.cancel();
}
static td::BufferSlice serialize_error(td::Status error) {
@ -136,111 +278,121 @@ void CollatorNode::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data
td::Promise<td::BufferSlice> promise) {
td::Promise<BlockCandidate> new_promise = [promise = std::move(promise), src](td::Result<BlockCandidate> R) mutable {
if (R.is_error()) {
LOG(WARNING) << "Query from " << src << ", error: " << R.error();
promise.set_result(serialize_error(R.move_as_error()));
LOG(INFO) << "adnl query from " << src << ", error: " << R.error();
if (R.error().code() == ErrorCode::timeout) {
promise.set_error(R.move_as_error());
} else {
promise.set_result(serialize_error(R.move_as_error()));
}
} else {
LOG(INFO) << "Query from " << src << ", success";
LOG(INFO) << "adnl query from " << src << ", success";
promise.set_result(create_serialize_tl_object<ton_api::collatorNode_generateBlockSuccess>(
serialize_candidate(R.move_as_ok(), true)));
}
};
if (!last_masterchain_block_.is_valid()) {
new_promise.set_error(td::Status::Error("not ready"));
return;
}
if (!validators_.count(src)) {
if (!validator_adnl_ids_.count(src)) {
new_promise.set_error(td::Status::Error("src is not a validator"));
}
TRY_RESULT_PROMISE(new_promise, f, fetch_tl_object<ton_api::collatorNode_generateBlock>(std::move(data), true));
ShardIdFull shard(f->workchain_, f->shard_);
BlockIdExt min_mc_id = create_block_id(f->min_mc_id_);
LOG(INFO) << "Got query from " << src << ": shard=" << shard.to_str() << ", min_mc_seqno=" << min_mc_id.seqno();
ShardIdFull shard = create_shard_id(f->shard_);
CatchainSeqno cc_seqno = f->cc_seqno_;
std::vector<BlockIdExt> prev_blocks;
for (const auto& b : f->prev_blocks_) {
prev_blocks.push_back(create_block_id(b));
}
Ed25519_PublicKey creator(f->creator_);
new_promise = [new_promise = std::move(new_promise), creator](td::Result<BlockCandidate> R) mutable {
TRY_RESULT_PROMISE(new_promise, block, std::move(R));
new_promise.set_value(change_creator(std::move(block), creator));
};
if (!shard.is_valid_ext()) {
new_promise.set_error(td::Status::Error(PSTRING() << "invalid shard " << shard.to_str()));
return;
}
if (!can_collate_shard(shard)) {
new_promise.set_error(td::Status::Error(PSTRING() << "this node doesn't collate shard " << shard.to_str()));
if (prev_blocks.size() != 1 && prev_blocks.size() != 2) {
new_promise.set_error(td::Status::Error(PSTRING() << "invalid size of prev_blocks: " << prev_blocks.size()));
return;
}
if (f->prev_blocks_.size() != 1 && f->prev_blocks_.size() != 2) {
new_promise.set_error(td::Status::Error(PSTRING() << "invalid size of prev_blocks: " << f->prev_blocks_.size()));
return;
}
if (!min_mc_id.is_masterchain_ext()) {
new_promise.set_error(td::Status::Error("min_mc_id is not form masterchain"));
return;
}
std::vector<BlockIdExt> prev_blocks;
for (const auto& b : f->prev_blocks_) {
auto id = create_block_id(b);
if (!id.is_valid_full()) {
new_promise.set_error(td::Status::Error("invalid prev_block"));
return;
}
auto top_block = get_shard_top_block(id.shard_full());
if (top_block && top_block.value().seqno() > id.seqno()) {
new_promise.set_error(td::Status::Error("cannot collate block: already exists in blockchain"));
return;
}
prev_blocks.push_back(id);
}
Ed25519_PublicKey creator(f->creator_);
if (!shard.is_masterchain()) {
// Collation of masterchain cannot be cached because changing "created_by" in masterchain is hard
// It does not really matter because validators can collate masterchain themselves
new_promise = [promise = std::move(new_promise), creator](td::Result<BlockCandidate> R) mutable {
if (R.is_error()) {
promise.set_error(R.move_as_error());
} else {
promise.set_result(change_creator(R.move_as_ok(), creator));
}
};
auto cache_key = std::make_tuple(min_mc_id.seqno(), shard, prev_blocks);
auto cache_entry = cache_[cache_key];
if (cache_entry == nullptr) {
cache_entry = cache_[cache_key] = std::make_shared<CacheEntry>();
}
if (cache_entry->result) {
LOG(INFO) << "Using cached result";
new_promise.set_result(cache_entry->result.value().clone());
return;
}
cache_entry->promises.push_back(std::move(new_promise));
if (cache_entry->started) {
LOG(INFO) << "Collating of this block is already in progress, waiting";
return;
}
cache_entry->started = true;
new_promise = [SelfId = actor_id(this), cache_entry](td::Result<BlockCandidate> R) mutable {
td::actor::send_closure(SelfId, &CollatorNode::process_result, cache_entry, std::move(R));
};
}
auto P = td::PromiseCreator::lambda([=, SelfId = actor_id(this), prev_blocks = std::move(prev_blocks),
promise = std::move(new_promise)](td::Result<td::Ref<ShardState>> R) mutable {
if (R.is_error()) {
promise.set_error(R.move_as_error_prefix("failed to get masterchain state: "));
} else {
td::Ref<MasterchainState> state(R.move_as_ok());
if (state.is_null()) {
promise.set_error(R.move_as_error_prefix("failed to get masterchain state: "));
return;
}
td::actor::send_closure(SelfId, &CollatorNode::receive_query_cont, shard, std::move(state),
std::move(prev_blocks), creator, std::move(promise));
}
});
td::actor::send_closure(manager_, &ValidatorManager::wait_block_state_short, min_mc_id, 1, td::Timestamp::in(5.0),
std::move(P));
LOG(INFO) << "got adnl query from " << src << ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno;
generate_block(shard, cc_seqno, std::move(prev_blocks), td::Timestamp::in(10.0), std::move(new_promise));
}
void CollatorNode::receive_query_cont(ShardIdFull shard, td::Ref<MasterchainState> min_mc_state,
std::vector<BlockIdExt> prev_blocks, Ed25519_PublicKey creator,
td::Promise<BlockCandidate> promise) {
run_collate_query(shard, min_mc_state->get_block_id(), std::move(prev_blocks), creator,
min_mc_state->get_validator_set(shard), manager_, td::Timestamp::in(10.0), std::move(promise),
CollateMode::skip_store_candidate);
void CollatorNode::generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std::vector<BlockIdExt> prev_blocks,
td::Timestamp timeout, td::Promise<BlockCandidate> promise) {
if (last_masterchain_state_.is_null()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "not ready"));
return;
}
if (!can_collate_shard(shard)) {
promise.set_error(td::Status::Error(PSTRING() << "this node can't collate shard " << shard.to_str()));
return;
}
auto it = validator_groups_.find(shard);
if (it == validator_groups_.end() || it->second.cc_seqno != cc_seqno) {
TRY_RESULT_PROMISE(promise, future_validator_group, get_future_validator_group(shard, cc_seqno));
future_validator_group->promises.push_back([=, SelfId = actor_id(this), prev_blocks = std::move(prev_blocks),
promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
promise.set_error(R.move_as_error());
return;
}
if (timeout.is_in_past()) {
promise.set_error(td::Status::Error(ErrorCode::timeout));
return;
}
td::actor::send_closure(SelfId, &CollatorNode::generate_block, shard, cc_seqno, std::move(prev_blocks), timeout,
std::move(promise));
});
return;
}
ValidatorGroupInfo& validator_group_info = it->second;
BlockSeqno block_seqno = prev_blocks.at(0).seqno() + 1;
if (prev_blocks.size() == 2) {
block_seqno = std::max(block_seqno, prev_blocks.at(1).seqno() + 1);
}
if (validator_group_info.next_block_seqno > block_seqno) {
promise.set_error(td::Status::Error(PSTRING() << "next block seqno " << block_seqno << " is too small, expected "
<< validator_group_info.next_block_seqno));
return;
}
if (validator_group_info.next_block_seqno == block_seqno && validator_group_info.prev != prev_blocks) {
promise.set_error(td::Status::Error("invalid prev_blocks"));
return;
}
auto cache_entry = validator_group_info.cache[prev_blocks];
if (cache_entry == nullptr) {
cache_entry = validator_group_info.cache[prev_blocks] = std::make_shared<CacheEntry>();
}
if (cache_entry->result) {
LOG(INFO) << "generate block query"
<< ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno
<< ": using cached result";
promise.set_result(cache_entry->result.value().clone());
return;
}
cache_entry->promises.push_back(std::move(promise));
if (cache_entry->started) {
LOG(INFO) << "generate block query"
<< ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno
<< ": collation in progress, waiting";
return;
}
LOG(INFO) << "generate block query"
<< ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno
<< ": starting collation";
cache_entry->started = true;
cache_entry->block_seqno = block_seqno;
run_collate_query(
shard, last_masterchain_state_->get_block_id(), std::move(prev_blocks), Ed25519_PublicKey{td::Bits256::zero()},
last_masterchain_state_->get_validator_set(shard), manager_, timeout,
[=, SelfId = actor_id(this), timer = td::Timer{}](td::Result<BlockCandidate> R) {
LOG(INFO) << "generate block result"
<< ": shard=" << shard.to_str() << ", cc_seqno=" << cc_seqno << ", next_block_seqno=" << block_seqno
<< ", time=" << timer.elapsed() << ": " << (R.is_ok() ? "OK" : R.error().to_string());
td::actor::send_closure(SelfId, &CollatorNode::process_result, cache_entry, std::move(R));
},
cache_entry->cancellation_token_source.get_cancellation_token(), CollateMode::skip_store_candidate);
}
void CollatorNode::process_result(std::shared_ptr<CacheEntry> cache_entry, td::Result<BlockCandidate> R) {
@ -259,7 +411,7 @@ void CollatorNode::process_result(std::shared_ptr<CacheEntry> cache_entry, td::R
}
bool CollatorNode::can_collate_shard(ShardIdFull shard) const {
return std::any_of(shards_.begin(), shards_.end(),
return std::any_of(collating_shards_.begin(), collating_shards_.end(),
[&](const ShardIdFull& our_shard) { return shard_intersects(shard, our_shard); });
}