1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Merge branch 'testnet' into accelerator

This commit is contained in:
SpyCheese 2024-10-09 21:10:36 +03:00
commit df21424047
44 changed files with 373 additions and 180 deletions

View file

@ -18,6 +18,7 @@
*/
#include "validator-group.hpp"
#include "fabric.h"
#include "full-node-master.hpp"
#include "ton/ton-io.hpp"
#include "td/utils/overloaded.h"
#include "common/delay.h"
@ -30,8 +31,14 @@ namespace ton {
namespace validator {
static bool need_send_candidate_broadcast(const validatorsession::BlockSourceInfo &source_info, bool is_masterchain) {
return source_info.first_block_round == source_info.round && source_info.source_priority == 0 && !is_masterchain;
}
void ValidatorGroup::generate_block_candidate(
td::uint32 round_id, td::Promise<validatorsession::ValidatorSession::GeneratedCandidate> promise) {
validatorsession::BlockSourceInfo source_info,
td::Promise<validatorsession::ValidatorSession::GeneratedCandidate> promise) {
td::uint32 round_id = source_info.round;
if (round_id > last_known_round_id_) {
last_known_round_id_ = round_id;
}
@ -53,14 +60,16 @@ void ValidatorGroup::generate_block_candidate(
cached_collated_block_->promises.push_back(promise.wrap([](BlockCandidate &&res) {
return validatorsession::ValidatorSession::GeneratedCandidate{std::move(res), false};
}));
td::Promise<BlockCandidate> P = [SelfId = actor_id(this),
cache = cached_collated_block_](td::Result<BlockCandidate> R) {
td::actor::send_closure(SelfId, &ValidatorGroup::generated_block_candidate, std::move(cache), std::move(R));
td::Promise<BlockCandidate> P = [SelfId = actor_id(this), cache = cached_collated_block_,
source_info](td::Result<BlockCandidate> R) {
td::actor::send_closure(SelfId, &ValidatorGroup::generated_block_candidate, source_info, std::move(cache),
std::move(R));
};
collate_block(round_id, td::Timestamp::in(10.0), std::move(P));
collate_block(source_info, td::Timestamp::in(10.0), std::move(P));
}
void ValidatorGroup::generated_block_candidate(std::shared_ptr<CachedCollatedBlock> cache,
void ValidatorGroup::generated_block_candidate(validatorsession::BlockSourceInfo source_info,
std::shared_ptr<CachedCollatedBlock> cache,
td::Result<BlockCandidate> R) {
if (R.is_error()) {
for (auto &p : cache->promises) {
@ -72,6 +81,9 @@ void ValidatorGroup::generated_block_candidate(std::shared_ptr<CachedCollatedBlo
} else {
auto candidate = R.move_as_ok();
add_available_block_candidate(candidate.pubkey.as_bits256(), candidate.id, candidate.collated_file_hash);
if (need_send_candidate_broadcast(source_info, shard_.is_masterchain())) {
send_block_candidate_broadcast(candidate.id, candidate.data.clone());
}
cache->result = std::move(candidate);
for (auto &p : cache->promises) {
p.set_value(cache->result.value().clone());
@ -80,8 +92,9 @@ void ValidatorGroup::generated_block_candidate(std::shared_ptr<CachedCollatedBlo
cache->promises.clear();
}
void ValidatorGroup::validate_block_candidate(td::uint32 round_id, BlockCandidate block,
void ValidatorGroup::validate_block_candidate(validatorsession::BlockSourceInfo source_info, BlockCandidate block,
td::Promise<std::pair<UnixTime, bool>> promise) {
td::uint32 round_id = source_info.round;
if (round_id > last_known_round_id_) {
last_known_round_id_ = round_id;
}
@ -100,7 +113,7 @@ void ValidatorGroup::validate_block_candidate(td::uint32 round_id, BlockCandidat
return;
}
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), round_id, block = block.clone(),
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), source_info, block = block.clone(),
promise = std::move(promise)](td::Result<ValidateCandidateResult> R) mutable {
if (R.is_error()) {
auto S = R.move_as_error();
@ -108,19 +121,22 @@ void ValidatorGroup::validate_block_candidate(td::uint32 round_id, BlockCandidat
LOG(ERROR) << "failed to validate candidate: " << S;
}
delay_action(
[SelfId, round_id, block = std::move(block), promise = std::move(promise)]() mutable {
td::actor::send_closure(SelfId, &ValidatorGroup::validate_block_candidate, round_id, std::move(block),
std::move(promise));
[SelfId, source_info, block = std::move(block), promise = std::move(promise)]() mutable {
td::actor::send_closure(SelfId, &ValidatorGroup::validate_block_candidate, std::move(source_info),
std::move(block), std::move(promise));
},
td::Timestamp::in(0.1));
} else {
auto v = R.move_as_ok();
v.visit(td::overloaded(
[&](UnixTime ts) {
td::actor::send_closure(SelfId, &ValidatorGroup::update_approve_cache, block_to_cache_key(block),
ts);
td::actor::send_closure(SelfId, &ValidatorGroup::update_approve_cache, block_to_cache_key(block), ts);
td::actor::send_closure(SelfId, &ValidatorGroup::add_available_block_candidate, block.pubkey.as_bits256(),
block.id, block.collated_file_hash);
if (need_send_candidate_broadcast(source_info, block.id.is_masterchain())) {
td::actor::send_closure(SelfId, &ValidatorGroup::send_block_candidate_broadcast, block.id,
block.data.clone());
}
promise.set_value({ts, false});
},
[&](CandidateReject reject) {
@ -142,13 +158,14 @@ void ValidatorGroup::update_approve_cache(CacheKey key, UnixTime value) {
approved_candidates_cache_[key] = value;
}
void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash src, td::BufferSlice block_data,
void ValidatorGroup::accept_block_candidate(validatorsession::BlockSourceInfo source_info, td::BufferSlice block_data,
RootHash root_hash, FileHash file_hash,
std::vector<BlockSignature> signatures,
std::vector<BlockSignature> approve_signatures,
validatorsession::ValidatorSessionStats stats,
td::Promise<td::Unit> promise) {
stats.cc_seqno = validator_set_->get_catchain_seqno();
td::uint32 round_id = source_info.round;
if (round_id >= last_known_round_id_) {
last_known_round_id_ = round_id + 1;
}
@ -167,8 +184,23 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s
td::actor::send_closure(manager_, &ValidatorManager::log_validator_session_stats, next_block_id, std::move(stats));
auto block =
block_data.size() > 0 ? create_block(next_block_id, std::move(block_data)).move_as_ok() : td::Ref<BlockData>{};
// Creator of the block sends broadcast to public overlays
// Creator of the block sends broadcast to private block overlay unless candidate broadcast was sent
// Any node sends broadcast to custom overlays unless candidate broadcast was sent
int send_broadcast_mode = 0;
bool sent_candidate = sent_candidate_broadcasts_.contains(block->block_id());
if (source_info.source.compute_short_id() == local_id_) {
send_broadcast_mode |= fullnode::FullNode::broadcast_mode_public;
if (!sent_candidate) {
send_broadcast_mode |= fullnode::FullNode::broadcast_mode_private_block;
}
}
if (!sent_candidate) {
send_broadcast_mode |= fullnode::FullNode::broadcast_mode_custom;
}
accept_block_query(next_block_id, std::move(block), std::move(prev_block_ids_), std::move(sig_set),
std::move(approve_sig_set), src == local_id_, std::move(promise));
std::move(approve_sig_set), send_broadcast_mode, std::move(promise));
prev_block_ids_ = std::vector<BlockIdExt>{next_block_id};
cached_collated_block_ = nullptr;
approved_candidates_cache_.clear();
@ -177,7 +209,7 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s
void ValidatorGroup::accept_block_query(BlockIdExt block_id, td::Ref<BlockData> block, std::vector<BlockIdExt> prev,
td::Ref<BlockSignatureSet> sig_set, td::Ref<BlockSignatureSet> approve_sig_set,
bool send_broadcast, td::Promise<td::Unit> promise, bool is_retry) {
int send_broadcast_mode, td::Promise<td::Unit> promise, bool is_retry) {
auto P = td::PromiseCreator::lambda([=, SelfId = actor_id(this),
promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
@ -187,7 +219,7 @@ void ValidatorGroup::accept_block_query(BlockIdExt block_id, td::Ref<BlockData>
}
LOG_CHECK(R.error().code() == ErrorCode::timeout || R.error().code() == ErrorCode::notready) << R.move_as_error();
td::actor::send_closure(SelfId, &ValidatorGroup::accept_block_query, block_id, std::move(block),
std::move(prev), std::move(sig_set), std::move(approve_sig_set), send_broadcast,
std::move(prev), std::move(sig_set), std::move(approve_sig_set), send_broadcast_mode,
std::move(promise), true);
} else {
promise.set_value(R.move_as_ok());
@ -195,7 +227,7 @@ void ValidatorGroup::accept_block_query(BlockIdExt block_id, td::Ref<BlockData>
});
run_accept_block_query(block_id, std::move(block), std::move(prev), validator_set_, std::move(sig_set),
std::move(approve_sig_set), send_broadcast, monitoring_shard_, manager_, std::move(P));
std::move(approve_sig_set), send_broadcast_mode, monitoring_shard_, manager_, std::move(P));
}
void ValidatorGroup::skip_round(td::uint32 round_id) {
@ -231,8 +263,9 @@ std::unique_ptr<validatorsession::ValidatorSession::Callback> ValidatorGroup::ma
public:
Callback(td::actor::ActorId<ValidatorGroup> id) : id_(id) {
}
void on_candidate(td::uint32 round, PublicKey source, validatorsession::ValidatorSessionRootHash root_hash,
td::BufferSlice data, td::BufferSlice collated_data,
void on_candidate(validatorsession::BlockSourceInfo source_info,
validatorsession::ValidatorSessionRootHash root_hash, td::BufferSlice data,
td::BufferSlice collated_data,
td::Promise<validatorsession::ValidatorSession::CandidateDecision> promise) override {
auto P =
td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<std::pair<td::uint32, bool>> R) mutable {
@ -247,18 +280,19 @@ std::unique_ptr<validatorsession::ValidatorSession::Callback> ValidatorGroup::ma
}
});
BlockCandidate candidate{Ed25519_PublicKey{source.ed25519_value().raw()},
BlockCandidate candidate{Ed25519_PublicKey{source_info.source.ed25519_value().raw()},
BlockIdExt{0, 0, 0, root_hash, sha256_bits256(data.as_slice())},
sha256_bits256(collated_data.as_slice()), data.clone(), collated_data.clone()};
td::actor::send_closure(id_, &ValidatorGroup::validate_block_candidate, round, std::move(candidate),
std::move(P));
td::actor::send_closure(id_, &ValidatorGroup::validate_block_candidate, std::move(source_info),
std::move(candidate), std::move(P));
}
void on_generate_slot(td::uint32 round,
void on_generate_slot(validatorsession::BlockSourceInfo source_info,
td::Promise<validatorsession::ValidatorSession::GeneratedCandidate> promise) override {
td::actor::send_closure(id_, &ValidatorGroup::generate_block_candidate, round, std::move(promise));
td::actor::send_closure(id_, &ValidatorGroup::generate_block_candidate, std::move(source_info),
std::move(promise));
}
void on_block_committed(td::uint32 round, PublicKey source, validatorsession::ValidatorSessionRootHash root_hash,
void on_block_committed(validatorsession::BlockSourceInfo source_info, validatorsession::ValidatorSessionRootHash root_hash,
validatorsession::ValidatorSessionFileHash file_hash, td::BufferSlice data,
std::vector<std::pair<PublicKeyHash, td::BufferSlice>> signatures,
std::vector<std::pair<PublicKeyHash, td::BufferSlice>> approve_signatures,
@ -272,9 +306,9 @@ std::unique_ptr<validatorsession::ValidatorSession::Callback> ValidatorGroup::ma
approve_sigs.emplace_back(BlockSignature{sig.first.bits256_value(), std::move(sig.second)});
}
auto P = td::PromiseCreator::lambda([](td::Result<td::Unit>) {});
td::actor::send_closure(id_, &ValidatorGroup::accept_block_candidate, round, source.compute_short_id(),
std::move(data), root_hash, file_hash, std::move(sigs), std::move(approve_sigs),
std::move(stats), std::move(P));
td::actor::send_closure(id_, &ValidatorGroup::accept_block_candidate, std::move(source_info), std::move(data),
root_hash, file_hash, std::move(sigs), std::move(approve_sigs), std::move(stats),
std::move(P));
}
void on_block_skipped(td::uint32 round) override {
td::actor::send_closure(id_, &ValidatorGroup::skip_round, round);
@ -363,7 +397,7 @@ void ValidatorGroup::start(std::vector<BlockIdExt> prev, BlockIdExt min_masterch
auto block =
p.block.size() > 0 ? create_block(next_block_id, std::move(p.block)).move_as_ok() : td::Ref<BlockData>{};
accept_block_query(next_block_id, std::move(block), std::move(prev_block_ids_), std::move(p.sigs),
std::move(p.approve_sigs), false, std::move(p.promise));
std::move(p.approve_sigs), 0, std::move(p.promise));
prev_block_ids_ = std::vector<BlockIdExt>{next_block_id};
}
postponed_accept_.clear();
@ -463,9 +497,9 @@ void ValidatorGroup::get_validator_group_info_for_litequery_cont(
promise.set_result(std::move(result));
}
void ValidatorGroup::collate_block(td::uint32 round_id, td::Timestamp timeout, td::Promise<BlockCandidate> promise,
unsigned max_retries) {
if (round_id < last_known_round_id_) {
void ValidatorGroup::collate_block(validatorsession::BlockSourceInfo source_info, td::Timestamp timeout,
td::Promise<BlockCandidate> promise, unsigned max_retries) {
if (source_info.round < last_known_round_id_) {
promise.set_error(td::Status::Error("too old"));
return;
}
@ -528,7 +562,7 @@ void ValidatorGroup::collate_block(td::uint32 round_id, td::Timestamp timeout, t
LOG(WARNING) << "collate query for " << next_block_id.to_str() << ": " << R.error() << ", time=" << timer.elapsed()
<< "s, " << (retry ? "retrying" : "giving up");
if (retry) {
td::actor::send_closure(SelfId, &ValidatorGroup::collate_block, round_id, timeout, std::move(promise),
td::actor::send_closure(SelfId, &ValidatorGroup::collate_block, source_info, timeout, std::move(promise),
max_retries - 1);
} else {
promise.set_result(td::Status::Error(ErrorCode::timeout, "timeout"));
@ -549,7 +583,7 @@ void ValidatorGroup::collate_block(td::uint32 round_id, td::Timestamp timeout, t
promise.set_error(R.move_as_error_prefix("rldp query failed: "));
return;
}
td::actor::send_closure(SelfId, &ValidatorGroup::receive_collate_query_response, round_id, R.move_as_ok(),
td::actor::send_closure(SelfId, &ValidatorGroup::receive_collate_query_response, source_info, R.move_as_ok(),
trusted_collator, std::move(promise));
});
LOG(INFO) << "sending collate query for " << next_block_id.to_str() << ": send to " << collator_adnl_id;
@ -560,24 +594,24 @@ void ValidatorGroup::collate_block(td::uint32 round_id, td::Timestamp timeout, t
std::move(P), timeout, std::move(query), max_answer_size);
}
void ValidatorGroup::receive_collate_query_response(td::uint32 round_id, td::BufferSlice data, bool trusted_collator,
td::Promise<BlockCandidate> promise) {
if (round_id < last_known_round_id_) {
void ValidatorGroup::receive_collate_query_response(validatorsession::BlockSourceInfo source_info, td::BufferSlice data,
bool trusted_collator, td::Promise<BlockCandidate> promise) {
if (source_info.round < last_known_round_id_) {
promise.set_error(td::Status::Error("too old"));
return;
}
TRY_RESULT_PROMISE(promise, f, fetch_tl_object<ton_api::collatorNode_GenerateBlockResult>(data, true));
td::Result<BlockCandidate> res;
ton_api::downcast_call(*f, td::overloaded(
[&](ton_api::collatorNode_generateBlockError &r) {
td::Status error = td::Status::Error(r.code_, r.message_);
res = error.move_as_error_prefix("collate query: ");
},
[&](ton_api::collatorNode_generateBlockSuccess &r) {
res = CollatorNode::deserialize_candidate(
std::move(r.candidate_),
config_.max_block_size + config_.max_collated_data_size + 1024);
}));
[&](ton_api::collatorNode_generateBlockError &r) {
td::Status error = td::Status::Error(r.code_, r.message_);
res = error.move_as_error_prefix("collate query: ");
},
[&](ton_api::collatorNode_generateBlockSuccess &r) {
res = CollatorNode::deserialize_candidate(
std::move(r.candidate_),
config_.max_block_size + config_.max_collated_data_size + 1024);
}));
TRY_RESULT_PROMISE(promise, candidate, std::move(res));
if (candidate.pubkey.as_bits256() != local_id_full_.ed25519_value().raw()) {
promise.set_error(td::Status::Error("collate query: block candidate source mismatch"));
@ -600,7 +634,7 @@ void ValidatorGroup::receive_collate_query_response(td::uint32 round_id, td::Buf
}
promise.set_result(std::move(candidate));
});
validate_block_candidate(round_id, std::move(candidate), std::move(P));
validate_block_candidate(source_info, std::move(candidate), std::move(P));
}
} // namespace validator