mirror of
https://github.com/ton-blockchain/ton
synced 2025-02-12 11:12:16 +00:00
Send only first block candidate optimistically (#1260)
* Broadcast only the first block candidate * Fix sending block broadcast --------- Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
parent
8364a2425f
commit
1da94e62ad
22 changed files with 164 additions and 82 deletions
|
@ -249,7 +249,7 @@ class HardforkCreator : public td::actor::Actor {
|
|||
void send_block_candidate(ton::BlockIdExt block_id, ton::CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
|
||||
td::BufferSlice data) override {
|
||||
}
|
||||
void send_broadcast(ton::BlockBroadcast broadcast, bool custom_overlays_only) override {
|
||||
void send_broadcast(ton::BlockBroadcast broadcast, int mode) override {
|
||||
}
|
||||
void download_block(ton::BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
|
||||
td::Promise<ton::ReceivedBlock> promise) override {
|
||||
|
|
|
@ -350,7 +350,7 @@ class TestNode : public td::actor::Actor {
|
|||
void send_block_candidate(ton::BlockIdExt block_id, ton::CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
|
||||
td::BufferSlice data) override {
|
||||
}
|
||||
void send_broadcast(ton::BlockBroadcast broadcast, bool custom_overlays_only) override {
|
||||
void send_broadcast(ton::BlockBroadcast broadcast, int mode) override {
|
||||
}
|
||||
void download_block(ton::BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
|
||||
td::Promise<ton::ReceivedBlock> promise) override {
|
||||
|
|
|
@ -178,6 +178,12 @@ struct EndValidatorGroupStats {
|
|||
std::vector<Node> nodes;
|
||||
};
|
||||
|
||||
struct BlockSourceInfo {
|
||||
td::uint32 round, first_block_round;
|
||||
PublicKey source;
|
||||
td::int32 source_priority;
|
||||
};
|
||||
|
||||
} // namespace validatorsession
|
||||
|
||||
} // namespace ton
|
||||
|
|
|
@ -553,7 +553,9 @@ void ValidatorSessionImpl::check_generate_slot() {
|
|||
LOG(WARNING) << print_id << ": failed to generate block candidate: " << R.move_as_error();
|
||||
}
|
||||
});
|
||||
callback_->on_generate_slot(cur_round_, std::move(P));
|
||||
callback_->on_generate_slot(
|
||||
BlockSourceInfo{cur_round_, first_block_round_, description().get_source_public_key(local_idx()), priority},
|
||||
std::move(P));
|
||||
} else {
|
||||
alarm_timestamp().relax(t);
|
||||
}
|
||||
|
@ -631,8 +633,10 @@ void ValidatorSessionImpl::try_approve_block(const SentBlock *block) {
|
|||
});
|
||||
pending_approve_.insert(block_id);
|
||||
|
||||
callback_->on_candidate(cur_round_, description().get_source_public_key(block->get_src_idx()), B->root_hash_,
|
||||
B->data_.clone(), B->collated_data_.clone(), std::move(P));
|
||||
callback_->on_candidate(
|
||||
BlockSourceInfo{cur_round_, first_block_round_, description().get_source_public_key(block->get_src_idx()),
|
||||
description().get_node_priority(block->get_src_idx(), cur_round_)},
|
||||
B->root_hash_, B->data_.clone(), B->collated_data_.clone(), std::move(P));
|
||||
} else if (T.is_in_past()) {
|
||||
if (!active_requests_.count(block_id)) {
|
||||
auto v = virtual_state_->get_block_approvers(description(), block_id);
|
||||
|
@ -905,15 +909,19 @@ void ValidatorSessionImpl::on_new_round(td::uint32 round) {
|
|||
stats.rounds.pop_back();
|
||||
}
|
||||
|
||||
BlockSourceInfo source_info{cur_round_, first_block_round_,
|
||||
description().get_source_public_key(block->get_src_idx()),
|
||||
description().get_node_priority(block->get_src_idx(), cur_round_)};
|
||||
if (it == blocks_.end()) {
|
||||
callback_->on_block_committed(cur_round_, description().get_source_public_key(block->get_src_idx()),
|
||||
block->get_root_hash(), block->get_file_hash(), td::BufferSlice(),
|
||||
std::move(export_sigs), std::move(export_approve_sigs), std::move(stats));
|
||||
callback_->on_block_committed(std::move(source_info), block->get_root_hash(), block->get_file_hash(),
|
||||
td::BufferSlice(), std::move(export_sigs), std::move(export_approve_sigs),
|
||||
std::move(stats));
|
||||
} else {
|
||||
callback_->on_block_committed(cur_round_, description().get_source_public_key(block->get_src_idx()),
|
||||
block->get_root_hash(), block->get_file_hash(), it->second->data_.clone(),
|
||||
std::move(export_sigs), std::move(export_approve_sigs), std::move(stats));
|
||||
callback_->on_block_committed(std::move(source_info), block->get_root_hash(), block->get_file_hash(),
|
||||
it->second->data_.clone(), std::move(export_sigs), std::move(export_approve_sigs),
|
||||
std::move(stats));
|
||||
}
|
||||
first_block_round_ = cur_round_ + 1;
|
||||
}
|
||||
cur_round_++;
|
||||
if (have_block) {
|
||||
|
|
|
@ -85,11 +85,10 @@ class ValidatorSession : public td::actor::Actor {
|
|||
|
||||
class Callback {
|
||||
public:
|
||||
virtual void on_candidate(td::uint32 round, PublicKey source, ValidatorSessionRootHash root_hash,
|
||||
td::BufferSlice data, td::BufferSlice collated_data,
|
||||
td::Promise<CandidateDecision> promise) = 0;
|
||||
virtual void on_generate_slot(td::uint32 round, td::Promise<GeneratedCandidate> promise) = 0;
|
||||
virtual void on_block_committed(td::uint32 round, PublicKey source, ValidatorSessionRootHash root_hash,
|
||||
virtual void on_candidate(BlockSourceInfo source_info, ValidatorSessionRootHash root_hash, td::BufferSlice data,
|
||||
td::BufferSlice collated_data, td::Promise<CandidateDecision> promise) = 0;
|
||||
virtual void on_generate_slot(BlockSourceInfo source_info, td::Promise<GeneratedCandidate> promise) = 0;
|
||||
virtual void on_block_committed(BlockSourceInfo source_info, ValidatorSessionRootHash root_hash,
|
||||
ValidatorSessionFileHash file_hash, td::BufferSlice data,
|
||||
std::vector<std::pair<PublicKeyHash, td::BufferSlice>> signatures,
|
||||
std::vector<std::pair<PublicKeyHash, td::BufferSlice>> approve_signatures,
|
||||
|
|
|
@ -53,7 +53,7 @@ class ValidatorSessionImpl : public ValidatorSession {
|
|||
const ValidatorSessionState *real_state_ = nullptr;
|
||||
const ValidatorSessionState *virtual_state_ = nullptr;
|
||||
|
||||
td::uint32 cur_round_ = 0;
|
||||
td::uint32 cur_round_ = 0, first_block_round_ = 0;
|
||||
td::Timestamp round_started_at_ = td::Timestamp::never();
|
||||
td::Timestamp round_debug_at_ = td::Timestamp::never();
|
||||
std::set<ValidatorSessionCandidateId> pending_approve_;
|
||||
|
|
|
@ -56,7 +56,7 @@ void run_check_external_message(td::Ref<ExtMessage> message, td::actor::ActorId<
|
|||
|
||||
void run_accept_block_query(BlockIdExt id, td::Ref<BlockData> data, std::vector<BlockIdExt> prev,
|
||||
td::Ref<ValidatorSet> validator_set, td::Ref<BlockSignatureSet> signatures,
|
||||
td::Ref<BlockSignatureSet> approve_signatures, bool send_broadcast,
|
||||
td::Ref<BlockSignatureSet> approve_signatures, int send_broadcast_mode,
|
||||
td::actor::ActorId<ValidatorManager> manager, td::Promise<td::Unit> promise);
|
||||
void run_fake_accept_block_query(BlockIdExt id, td::Ref<BlockData> data, std::vector<BlockIdExt> prev,
|
||||
td::Ref<ValidatorSet> validator_set, td::actor::ActorId<ValidatorManager> manager,
|
||||
|
|
|
@ -261,21 +261,22 @@ void FullNodeImpl::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_se
|
|||
}
|
||||
}
|
||||
|
||||
void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) {
|
||||
void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, int mode) {
|
||||
if (mode & broadcast_mode_custom) {
|
||||
send_block_broadcast_to_custom_overlays(broadcast);
|
||||
if (custom_overlays_only) {
|
||||
return;
|
||||
}
|
||||
auto shard = get_shard(ShardIdFull{masterchainId});
|
||||
if (shard.empty()) {
|
||||
VLOG(FULL_NODE_WARNING) << "dropping OUT broadcast to unknown shard";
|
||||
return;
|
||||
}
|
||||
if (broadcast.block_id.is_masterchain() && !private_block_overlays_.empty()) {
|
||||
if (!private_block_overlays_.empty() && (mode & broadcast_mode_private_block)) {
|
||||
td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_broadcast,
|
||||
broadcast.clone());
|
||||
}
|
||||
if (mode & broadcast_mode_public) {
|
||||
td::actor::send_closure(shard, &FullNodeShard::send_broadcast, std::move(broadcast));
|
||||
}
|
||||
}
|
||||
|
||||
void FullNodeImpl::download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
|
||||
|
@ -496,8 +497,8 @@ void FullNodeImpl::start_up() {
|
|||
td::actor::send_closure(id_, &FullNodeImpl::send_block_candidate, block_id, cc_seqno, validator_set_hash,
|
||||
std::move(data));
|
||||
}
|
||||
void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override {
|
||||
td::actor::send_closure(id_, &FullNodeImpl::send_broadcast, std::move(broadcast), custom_overlays_only);
|
||||
void send_broadcast(BlockBroadcast broadcast, int mode) override {
|
||||
td::actor::send_closure(id_, &FullNodeImpl::send_broadcast, std::move(broadcast), mode);
|
||||
}
|
||||
void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
|
||||
td::Promise<ReceivedBlock> promise) override {
|
||||
|
|
|
@ -99,6 +99,7 @@ class FullNode : public td::actor::Actor {
|
|||
static constexpr td::uint64 max_state_size() {
|
||||
return 4ull << 30;
|
||||
}
|
||||
enum { broadcast_mode_public = 1, broadcast_mode_private_block = 2, broadcast_mode_custom = 4 };
|
||||
|
||||
static td::actor::ActorOwn<FullNode> create(ton::PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id,
|
||||
FileHash zero_state_file_hash, FullNodeConfig config,
|
||||
|
|
|
@ -68,7 +68,7 @@ class FullNodeImpl : public FullNode {
|
|||
void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqnp, td::BufferSlice data);
|
||||
void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
|
||||
td::BufferSlice data);
|
||||
void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only);
|
||||
void send_broadcast(BlockBroadcast broadcast, int mode);
|
||||
void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise<ReceivedBlock> promise);
|
||||
void download_zero_state(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
|
||||
td::Promise<td::BufferSlice> promise);
|
||||
|
|
|
@ -41,7 +41,7 @@ using namespace std::literals::string_literals;
|
|||
|
||||
AcceptBlockQuery::AcceptBlockQuery(BlockIdExt id, td::Ref<BlockData> data, std::vector<BlockIdExt> prev,
|
||||
td::Ref<ValidatorSet> validator_set, td::Ref<BlockSignatureSet> signatures,
|
||||
td::Ref<BlockSignatureSet> approve_signatures, bool send_broadcast,
|
||||
td::Ref<BlockSignatureSet> approve_signatures, int send_broadcast_mode,
|
||||
td::actor::ActorId<ValidatorManager> manager, td::Promise<td::Unit> promise)
|
||||
: id_(id)
|
||||
, data_(std::move(data))
|
||||
|
@ -51,7 +51,7 @@ AcceptBlockQuery::AcceptBlockQuery(BlockIdExt id, td::Ref<BlockData> data, std::
|
|||
, approve_signatures_(std::move(approve_signatures))
|
||||
, is_fake_(false)
|
||||
, is_fork_(false)
|
||||
, send_broadcast_(send_broadcast)
|
||||
, send_broadcast_mode_(send_broadcast_mode)
|
||||
, manager_(manager)
|
||||
, promise_(std::move(promise))
|
||||
, perf_timer_("acceptblock", 0.1, [manager](double duration) {
|
||||
|
@ -72,7 +72,6 @@ AcceptBlockQuery::AcceptBlockQuery(AcceptBlockQuery::IsFake fake, BlockIdExt id,
|
|||
, validator_set_(std::move(validator_set))
|
||||
, is_fake_(true)
|
||||
, is_fork_(false)
|
||||
, send_broadcast_(false)
|
||||
, manager_(manager)
|
||||
, promise_(std::move(promise))
|
||||
, perf_timer_("acceptblock", 0.1, [manager](double duration) {
|
||||
|
@ -90,7 +89,6 @@ AcceptBlockQuery::AcceptBlockQuery(ForceFork ffork, BlockIdExt id, td::Ref<Block
|
|||
, data_(std::move(data))
|
||||
, is_fake_(true)
|
||||
, is_fork_(true)
|
||||
, send_broadcast_(false)
|
||||
, manager_(manager)
|
||||
, promise_(std::move(promise))
|
||||
, perf_timer_("acceptblock", 0.1, [manager](double duration) {
|
||||
|
@ -928,6 +926,10 @@ void AcceptBlockQuery::written_block_info_2() {
|
|||
}
|
||||
|
||||
void AcceptBlockQuery::applied() {
|
||||
if (send_broadcast_mode_ == 0) {
|
||||
finish_query();
|
||||
return;
|
||||
}
|
||||
BlockBroadcast b;
|
||||
b.data = data_->data();
|
||||
b.block_id = id_;
|
||||
|
@ -947,8 +949,7 @@ void AcceptBlockQuery::applied() {
|
|||
}
|
||||
|
||||
// do not wait for answer
|
||||
td::actor::send_closure_later(manager_, &ValidatorManager::send_block_broadcast, std::move(b),
|
||||
/* custom_overlays_only = */ !send_broadcast_);
|
||||
td::actor::send_closure_later(manager_, &ValidatorManager::send_block_broadcast, std::move(b), send_broadcast_mode_);
|
||||
|
||||
finish_query();
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ class AcceptBlockQuery : public td::actor::Actor {
|
|||
struct ForceFork {};
|
||||
AcceptBlockQuery(BlockIdExt id, td::Ref<BlockData> data, std::vector<BlockIdExt> prev,
|
||||
td::Ref<ValidatorSet> validator_set, td::Ref<BlockSignatureSet> signatures,
|
||||
td::Ref<BlockSignatureSet> approve_signatures, bool send_broadcast,
|
||||
td::Ref<BlockSignatureSet> approve_signatures, int send_broadcast_mode,
|
||||
td::actor::ActorId<ValidatorManager> manager, td::Promise<td::Unit> promise);
|
||||
AcceptBlockQuery(IsFake fake, BlockIdExt id, td::Ref<BlockData> data, std::vector<BlockIdExt> prev,
|
||||
td::Ref<ValidatorSet> validator_set, td::actor::ActorId<ValidatorManager> manager,
|
||||
|
@ -99,7 +99,7 @@ class AcceptBlockQuery : public td::actor::Actor {
|
|||
Ref<BlockSignatureSetQ> approve_signatures_;
|
||||
bool is_fake_;
|
||||
bool is_fork_;
|
||||
bool send_broadcast_;
|
||||
int send_broadcast_mode_{0};
|
||||
bool ancestors_split_{false}, is_key_block_{false};
|
||||
td::Timestamp timeout_ = td::Timestamp::in(600.0);
|
||||
td::actor::ActorId<ValidatorManager> manager_;
|
||||
|
|
|
@ -131,11 +131,11 @@ td::Result<td::Ref<IhrMessage>> create_ihr_message(td::BufferSlice data) {
|
|||
|
||||
void run_accept_block_query(BlockIdExt id, td::Ref<BlockData> data, std::vector<BlockIdExt> prev,
|
||||
td::Ref<ValidatorSet> validator_set, td::Ref<BlockSignatureSet> signatures,
|
||||
td::Ref<BlockSignatureSet> approve_signatures, bool send_broadcast,
|
||||
td::Ref<BlockSignatureSet> approve_signatures, int send_broadcast_mode,
|
||||
td::actor::ActorId<ValidatorManager> manager, td::Promise<td::Unit> promise) {
|
||||
td::actor::create_actor<AcceptBlockQuery>(PSTRING() << "accept" << id.id.to_str(), id, std::move(data), prev,
|
||||
std::move(validator_set), std::move(signatures),
|
||||
std::move(approve_signatures), send_broadcast, manager, std::move(promise))
|
||||
td::actor::create_actor<AcceptBlockQuery>(
|
||||
PSTRING() << "accept" << id.id.to_str(), id, std::move(data), prev, std::move(validator_set),
|
||||
std::move(signatures), std::move(approve_signatures), send_broadcast_mode, manager, std::move(promise))
|
||||
.release();
|
||||
}
|
||||
|
||||
|
|
|
@ -105,6 +105,8 @@ class ValidatorManager : public ValidatorManagerInterface {
|
|||
|
||||
virtual void set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno,
|
||||
td::uint32 validator_set_hash, td::Promise<td::Unit> promise) = 0;
|
||||
virtual void send_block_candidate_broadcast(BlockIdExt id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
|
||||
td::BufferSlice data) = 0;
|
||||
|
||||
virtual void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority,
|
||||
td::Timestamp timeout, td::Promise<td::Ref<ShardState>> promise) = 0;
|
||||
|
@ -144,7 +146,7 @@ class ValidatorManager : public ValidatorManagerInterface {
|
|||
virtual void send_external_message(td::Ref<ExtMessage> message) = 0;
|
||||
virtual void send_ihr_message(td::Ref<IhrMessage> message) = 0;
|
||||
virtual void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) = 0;
|
||||
virtual void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) = 0;
|
||||
virtual void send_block_broadcast(BlockBroadcast broadcast, int mode) = 0;
|
||||
|
||||
virtual void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) = 0;
|
||||
virtual void get_shard_client_state(bool from_db, td::Promise<BlockIdExt> promise) = 0;
|
||||
|
|
|
@ -785,6 +785,11 @@ void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate can
|
|||
td::actor::send_closure(db_, &Db::store_block_candidate, std::move(candidate), std::move(promise));
|
||||
}
|
||||
|
||||
void ValidatorManagerImpl::send_block_candidate_broadcast(BlockIdExt id, CatchainSeqno cc_seqno,
|
||||
td::uint32 validator_set_hash, td::BufferSlice data) {
|
||||
callback_->send_block_candidate(id, cc_seqno, validator_set_hash, std::move(data));
|
||||
}
|
||||
|
||||
void ValidatorManagerImpl::write_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
|
||||
td::actor::send_closure(db_, &Db::store_block_handle, std::move(handle), std::move(promise));
|
||||
}
|
||||
|
|
|
@ -185,6 +185,8 @@ class ValidatorManagerImpl : public ValidatorManager {
|
|||
|
||||
void set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno,
|
||||
td::uint32 validator_set_hash, td::Promise<td::Unit> promise) override;
|
||||
void send_block_candidate_broadcast(BlockIdExt id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
|
||||
td::BufferSlice data) override;
|
||||
|
||||
void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority, td::Timestamp timeout,
|
||||
td::Promise<td::Ref<ShardState>> promise) override;
|
||||
|
@ -259,7 +261,7 @@ class ValidatorManagerImpl : public ValidatorManager {
|
|||
new_ihr_message(message->serialize());
|
||||
}
|
||||
void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override;
|
||||
void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override {
|
||||
void send_block_broadcast(BlockBroadcast broadcast, int mode) override {
|
||||
}
|
||||
|
||||
void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) override;
|
||||
|
|
|
@ -226,6 +226,10 @@ class ValidatorManagerImpl : public ValidatorManager {
|
|||
td::uint32 validator_set_hash, td::Promise<td::Unit> promise) override {
|
||||
promise.set_value(td::Unit());
|
||||
}
|
||||
void send_block_candidate_broadcast(BlockIdExt id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
|
||||
td::BufferSlice data) {
|
||||
callback_->send_block_candidate(id, cc_seqno, validator_set_hash, std::move(data));
|
||||
}
|
||||
|
||||
void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority, td::Timestamp timeout,
|
||||
td::Promise<td::Ref<ShardState>> promise) override;
|
||||
|
@ -326,7 +330,7 @@ class ValidatorManagerImpl : public ValidatorManager {
|
|||
void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override {
|
||||
UNREACHABLE();
|
||||
}
|
||||
void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override {
|
||||
void send_block_broadcast(BlockBroadcast broadcast, int mode) override {
|
||||
}
|
||||
|
||||
void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) override {
|
||||
|
|
|
@ -1284,11 +1284,15 @@ void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate can
|
|||
}
|
||||
if (!id.is_masterchain()) {
|
||||
add_cached_block_candidate(ReceivedBlock{id, candidate.data.clone()});
|
||||
callback_->send_block_candidate(id, cc_seqno, validator_set_hash, candidate.data.clone());
|
||||
}
|
||||
td::actor::send_closure(db_, &Db::store_block_candidate, std::move(candidate), std::move(promise));
|
||||
}
|
||||
|
||||
void ValidatorManagerImpl::send_block_candidate_broadcast(BlockIdExt id, CatchainSeqno cc_seqno,
|
||||
td::uint32 validator_set_hash, td::BufferSlice data) {
|
||||
callback_->send_block_candidate(id, cc_seqno, validator_set_hash, std::move(data));
|
||||
}
|
||||
|
||||
void ValidatorManagerImpl::write_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
|
||||
auto P = td::PromiseCreator::lambda(
|
||||
[SelfId = actor_id(this), handle, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
|
||||
|
@ -1616,8 +1620,8 @@ void ValidatorManagerImpl::send_top_shard_block_description(td::Ref<ShardTopBloc
|
|||
}
|
||||
}
|
||||
|
||||
void ValidatorManagerImpl::send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) {
|
||||
callback_->send_broadcast(std::move(broadcast), custom_overlays_only);
|
||||
void ValidatorManagerImpl::send_block_broadcast(BlockBroadcast broadcast, int mode) {
|
||||
callback_->send_broadcast(std::move(broadcast), mode);
|
||||
}
|
||||
|
||||
void ValidatorManagerImpl::start_up() {
|
||||
|
|
|
@ -434,6 +434,8 @@ class ValidatorManagerImpl : public ValidatorManager {
|
|||
|
||||
void set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno,
|
||||
td::uint32 validator_set_hash, td::Promise<td::Unit> promise) override;
|
||||
void send_block_candidate_broadcast(BlockIdExt id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
|
||||
td::BufferSlice data) override;
|
||||
|
||||
void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority, td::Timestamp timeout,
|
||||
td::Promise<td::Ref<ShardState>> promise) override;
|
||||
|
@ -498,7 +500,7 @@ class ValidatorManagerImpl : public ValidatorManager {
|
|||
void send_external_message(td::Ref<ExtMessage> message) override;
|
||||
void send_ihr_message(td::Ref<IhrMessage> message) override;
|
||||
void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override;
|
||||
void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override;
|
||||
void send_block_broadcast(BlockBroadcast broadcast, int mode) override;
|
||||
|
||||
void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) override;
|
||||
void get_shard_client_state(bool from_db, td::Promise<BlockIdExt> promise) override;
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
#include "validator-group.hpp"
|
||||
#include "fabric.h"
|
||||
#include "full-node-master.hpp"
|
||||
#include "ton/ton-io.hpp"
|
||||
#include "td/utils/overloaded.h"
|
||||
#include "common/delay.h"
|
||||
|
@ -27,8 +28,14 @@ namespace ton {
|
|||
|
||||
namespace validator {
|
||||
|
||||
static bool need_send_candidate_broadcast(const validatorsession::BlockSourceInfo &source_info, bool is_masterchain) {
|
||||
return source_info.first_block_round == source_info.round && source_info.source_priority == 0 && !is_masterchain;
|
||||
}
|
||||
|
||||
void ValidatorGroup::generate_block_candidate(
|
||||
td::uint32 round_id, td::Promise<validatorsession::ValidatorSession::GeneratedCandidate> promise) {
|
||||
validatorsession::BlockSourceInfo source_info,
|
||||
td::Promise<validatorsession::ValidatorSession::GeneratedCandidate> promise) {
|
||||
td::uint32 round_id = source_info.round;
|
||||
if (round_id > last_known_round_id_) {
|
||||
last_known_round_id_ = round_id;
|
||||
}
|
||||
|
@ -53,12 +60,15 @@ void ValidatorGroup::generate_block_candidate(
|
|||
run_collate_query(
|
||||
shard_, min_masterchain_block_id_, prev_block_ids_, Ed25519_PublicKey{local_id_full_.ed25519_value().raw()},
|
||||
validator_set_, opts_->get_collator_options(), manager_, td::Timestamp::in(10.0),
|
||||
[SelfId = actor_id(this), cache = cached_collated_block_](td::Result<BlockCandidate> R) {
|
||||
td::actor::send_closure(SelfId, &ValidatorGroup::generated_block_candidate, std::move(cache), std::move(R));
|
||||
[SelfId = actor_id(this), cache = cached_collated_block_, source_info](td::Result<BlockCandidate> R) mutable {
|
||||
td::actor::send_closure(SelfId, &ValidatorGroup::generated_block_candidate, std::move(source_info),
|
||||
std::move(cache), std::move(R));
|
||||
}, cancellation_token_source_.get_cancellation_token(), /* mode = */ 0);
|
||||
}
|
||||
|
||||
void ValidatorGroup::generated_block_candidate(std::shared_ptr<CachedCollatedBlock> cache, td::Result<BlockCandidate> R) {
|
||||
void ValidatorGroup::generated_block_candidate(validatorsession::BlockSourceInfo source_info,
|
||||
std::shared_ptr<CachedCollatedBlock> cache,
|
||||
td::Result<BlockCandidate> R) {
|
||||
if (R.is_error()) {
|
||||
for (auto &p : cache->promises) {
|
||||
p.set_error(R.error().clone());
|
||||
|
@ -69,6 +79,9 @@ void ValidatorGroup::generated_block_candidate(std::shared_ptr<CachedCollatedBlo
|
|||
} else {
|
||||
auto candidate = R.move_as_ok();
|
||||
add_available_block_candidate(candidate.pubkey.as_bits256(), candidate.id, candidate.collated_file_hash);
|
||||
if (need_send_candidate_broadcast(source_info, shard_.is_masterchain())) {
|
||||
send_block_candidate_broadcast(candidate.id, candidate.data.clone());
|
||||
}
|
||||
cache->result = std::move(candidate);
|
||||
for (auto &p : cache->promises) {
|
||||
p.set_value(cache->result.value().clone());
|
||||
|
@ -77,8 +90,9 @@ void ValidatorGroup::generated_block_candidate(std::shared_ptr<CachedCollatedBlo
|
|||
cache->promises.clear();
|
||||
}
|
||||
|
||||
void ValidatorGroup::validate_block_candidate(td::uint32 round_id, BlockCandidate block,
|
||||
void ValidatorGroup::validate_block_candidate(validatorsession::BlockSourceInfo source_info, BlockCandidate block,
|
||||
td::Promise<std::pair<UnixTime, bool>> promise) {
|
||||
td::uint32 round_id = source_info.round;
|
||||
if (round_id > last_known_round_id_) {
|
||||
last_known_round_id_ = round_id;
|
||||
}
|
||||
|
@ -97,7 +111,8 @@ void ValidatorGroup::validate_block_candidate(td::uint32 round_id, BlockCandidat
|
|||
return;
|
||||
}
|
||||
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), round_id, block = block.clone(),
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), source_info, block = block.clone(), manager = manager_,
|
||||
validator_set = validator_set_,
|
||||
promise = std::move(promise)](td::Result<ValidateCandidateResult> R) mutable {
|
||||
if (R.is_error()) {
|
||||
auto S = R.move_as_error();
|
||||
|
@ -105,19 +120,22 @@ void ValidatorGroup::validate_block_candidate(td::uint32 round_id, BlockCandidat
|
|||
LOG(ERROR) << "failed to validate candidate: " << S;
|
||||
}
|
||||
delay_action(
|
||||
[SelfId, round_id, block = std::move(block), promise = std::move(promise)]() mutable {
|
||||
td::actor::send_closure(SelfId, &ValidatorGroup::validate_block_candidate, round_id, std::move(block),
|
||||
std::move(promise));
|
||||
[SelfId, source_info, block = std::move(block), promise = std::move(promise)]() mutable {
|
||||
td::actor::send_closure(SelfId, &ValidatorGroup::validate_block_candidate, std::move(source_info),
|
||||
std::move(block), std::move(promise));
|
||||
},
|
||||
td::Timestamp::in(0.1));
|
||||
} else {
|
||||
auto v = R.move_as_ok();
|
||||
v.visit(td::overloaded(
|
||||
[&](UnixTime ts) {
|
||||
td::actor::send_closure(SelfId, &ValidatorGroup::update_approve_cache, block_to_cache_key(block),
|
||||
ts);
|
||||
td::actor::send_closure(SelfId, &ValidatorGroup::update_approve_cache, block_to_cache_key(block), ts);
|
||||
td::actor::send_closure(SelfId, &ValidatorGroup::add_available_block_candidate, block.pubkey.as_bits256(),
|
||||
block.id, block.collated_file_hash);
|
||||
if (need_send_candidate_broadcast(source_info, block.id.is_masterchain())) {
|
||||
td::actor::send_closure(SelfId, &ValidatorGroup::send_block_candidate_broadcast, block.id,
|
||||
block.data.clone());
|
||||
}
|
||||
promise.set_value({ts, false});
|
||||
},
|
||||
[&](CandidateReject reject) {
|
||||
|
@ -140,13 +158,14 @@ void ValidatorGroup::update_approve_cache(CacheKey key, UnixTime value) {
|
|||
approved_candidates_cache_[key] = value;
|
||||
}
|
||||
|
||||
void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash src, td::BufferSlice block_data,
|
||||
void ValidatorGroup::accept_block_candidate(validatorsession::BlockSourceInfo source_info, td::BufferSlice block_data,
|
||||
RootHash root_hash, FileHash file_hash,
|
||||
std::vector<BlockSignature> signatures,
|
||||
std::vector<BlockSignature> approve_signatures,
|
||||
validatorsession::ValidatorSessionStats stats,
|
||||
td::Promise<td::Unit> promise) {
|
||||
stats.cc_seqno = validator_set_->get_catchain_seqno();
|
||||
td::uint32 round_id = source_info.round;
|
||||
if (round_id >= last_known_round_id_) {
|
||||
last_known_round_id_ = round_id + 1;
|
||||
}
|
||||
|
@ -165,7 +184,21 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s
|
|||
td::actor::send_closure(manager_, &ValidatorManager::log_validator_session_stats, next_block_id, std::move(stats));
|
||||
auto block =
|
||||
block_data.size() > 0 ? create_block(next_block_id, std::move(block_data)).move_as_ok() : td::Ref<BlockData>{};
|
||||
bool send_broadcast = src == local_id_;
|
||||
|
||||
// Creator of the block sends broadcast to public overlays
|
||||
// Creator of the block sends broadcast to private block overlay unless candidate broadcast was sent
|
||||
// Any node sends broadcast to custom overlays unless candidate broadcast was sent
|
||||
int send_broadcast_mode = 0;
|
||||
bool sent_candidate = sent_candidate_broadcasts_.contains(block->block_id());
|
||||
if (source_info.source.compute_short_id() == local_id_) {
|
||||
send_broadcast_mode |= fullnode::FullNode::broadcast_mode_public;
|
||||
if (!sent_candidate) {
|
||||
send_broadcast_mode |= fullnode::FullNode::broadcast_mode_private_block;
|
||||
}
|
||||
}
|
||||
if (!sent_candidate) {
|
||||
send_broadcast_mode |= fullnode::FullNode::broadcast_mode_custom;
|
||||
}
|
||||
|
||||
auto P = td::PromiseCreator::lambda([=, SelfId = actor_id(this), block_id = next_block_id, prev = prev_block_ids_,
|
||||
promise = std::move(promise)](td::Result<td::Unit> R) mutable {
|
||||
|
@ -176,7 +209,7 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s
|
|||
}
|
||||
LOG_CHECK(R.error().code() == ErrorCode::timeout || R.error().code() == ErrorCode::notready) << R.move_as_error();
|
||||
td::actor::send_closure(SelfId, &ValidatorGroup::retry_accept_block_query, block_id, std::move(block),
|
||||
std::move(prev), std::move(sig_set), std::move(approve_sig_set), send_broadcast,
|
||||
std::move(prev), std::move(sig_set), std::move(approve_sig_set), send_broadcast_mode,
|
||||
std::move(promise));
|
||||
} else {
|
||||
promise.set_value(R.move_as_ok());
|
||||
|
@ -184,7 +217,8 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s
|
|||
});
|
||||
|
||||
run_accept_block_query(next_block_id, std::move(block), prev_block_ids_, validator_set_, std::move(sig_set),
|
||||
std::move(approve_sig_set), send_broadcast, manager_, std::move(P));
|
||||
std::move(approve_sig_set), send_broadcast_mode, manager_,
|
||||
std::move(P));
|
||||
prev_block_ids_ = std::vector<BlockIdExt>{next_block_id};
|
||||
cached_collated_block_ = nullptr;
|
||||
approved_candidates_cache_.clear();
|
||||
|
@ -193,14 +227,14 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s
|
|||
|
||||
void ValidatorGroup::retry_accept_block_query(BlockIdExt block_id, td::Ref<BlockData> block,
|
||||
std::vector<BlockIdExt> prev, td::Ref<BlockSignatureSet> sig_set,
|
||||
td::Ref<BlockSignatureSet> approve_sig_set, bool send_broadcast,
|
||||
td::Ref<BlockSignatureSet> approve_sig_set, int send_broadcast_mode,
|
||||
td::Promise<td::Unit> promise) {
|
||||
auto P = td::PromiseCreator::lambda(
|
||||
[=, SelfId = actor_id(this), promise = std::move(promise)](td::Result<td::Unit> R) mutable {
|
||||
if (R.is_error()) {
|
||||
LOG_CHECK(R.error().code() == ErrorCode::timeout) << R.move_as_error();
|
||||
td::actor::send_closure(SelfId, &ValidatorGroup::retry_accept_block_query, block_id, std::move(block),
|
||||
std::move(prev), std::move(sig_set), std::move(approve_sig_set), send_broadcast,
|
||||
std::move(prev), std::move(sig_set), std::move(approve_sig_set), send_broadcast_mode,
|
||||
std::move(promise));
|
||||
} else {
|
||||
promise.set_value(R.move_as_ok());
|
||||
|
@ -208,7 +242,7 @@ void ValidatorGroup::retry_accept_block_query(BlockIdExt block_id, td::Ref<Block
|
|||
});
|
||||
|
||||
run_accept_block_query(block_id, std::move(block), prev, validator_set_, std::move(sig_set),
|
||||
std::move(approve_sig_set), send_broadcast, manager_, std::move(P));
|
||||
std::move(approve_sig_set), send_broadcast_mode, manager_, std::move(P));
|
||||
}
|
||||
|
||||
void ValidatorGroup::skip_round(td::uint32 round_id) {
|
||||
|
@ -250,8 +284,9 @@ std::unique_ptr<validatorsession::ValidatorSession::Callback> ValidatorGroup::ma
|
|||
public:
|
||||
Callback(td::actor::ActorId<ValidatorGroup> id) : id_(id) {
|
||||
}
|
||||
void on_candidate(td::uint32 round, PublicKey source, validatorsession::ValidatorSessionRootHash root_hash,
|
||||
td::BufferSlice data, td::BufferSlice collated_data,
|
||||
void on_candidate(validatorsession::BlockSourceInfo source_info,
|
||||
validatorsession::ValidatorSessionRootHash root_hash, td::BufferSlice data,
|
||||
td::BufferSlice collated_data,
|
||||
td::Promise<validatorsession::ValidatorSession::CandidateDecision> promise) override {
|
||||
auto P =
|
||||
td::PromiseCreator::lambda([promise = std::move(promise)](td::Result<std::pair<td::uint32, bool>> R) mutable {
|
||||
|
@ -266,18 +301,19 @@ std::unique_ptr<validatorsession::ValidatorSession::Callback> ValidatorGroup::ma
|
|||
}
|
||||
});
|
||||
|
||||
BlockCandidate candidate{Ed25519_PublicKey{source.ed25519_value().raw()},
|
||||
BlockCandidate candidate{Ed25519_PublicKey{source_info.source.ed25519_value().raw()},
|
||||
BlockIdExt{0, 0, 0, root_hash, sha256_bits256(data.as_slice())},
|
||||
sha256_bits256(collated_data.as_slice()), data.clone(), collated_data.clone()};
|
||||
|
||||
td::actor::send_closure(id_, &ValidatorGroup::validate_block_candidate, round, std::move(candidate),
|
||||
std::move(P));
|
||||
td::actor::send_closure(id_, &ValidatorGroup::validate_block_candidate, std::move(source_info),
|
||||
std::move(candidate), std::move(P));
|
||||
}
|
||||
void on_generate_slot(td::uint32 round,
|
||||
void on_generate_slot(validatorsession::BlockSourceInfo source_info,
|
||||
td::Promise<validatorsession::ValidatorSession::GeneratedCandidate> promise) override {
|
||||
td::actor::send_closure(id_, &ValidatorGroup::generate_block_candidate, round, std::move(promise));
|
||||
td::actor::send_closure(id_, &ValidatorGroup::generate_block_candidate, std::move(source_info),
|
||||
std::move(promise));
|
||||
}
|
||||
void on_block_committed(td::uint32 round, PublicKey source, validatorsession::ValidatorSessionRootHash root_hash,
|
||||
void on_block_committed(validatorsession::BlockSourceInfo source_info, validatorsession::ValidatorSessionRootHash root_hash,
|
||||
validatorsession::ValidatorSessionFileHash file_hash, td::BufferSlice data,
|
||||
std::vector<std::pair<PublicKeyHash, td::BufferSlice>> signatures,
|
||||
std::vector<std::pair<PublicKeyHash, td::BufferSlice>> approve_signatures,
|
||||
|
@ -291,9 +327,9 @@ std::unique_ptr<validatorsession::ValidatorSession::Callback> ValidatorGroup::ma
|
|||
approve_sigs.emplace_back(BlockSignature{sig.first.bits256_value(), std::move(sig.second)});
|
||||
}
|
||||
auto P = td::PromiseCreator::lambda([](td::Result<td::Unit>) {});
|
||||
td::actor::send_closure(id_, &ValidatorGroup::accept_block_candidate, round, source.compute_short_id(),
|
||||
std::move(data), root_hash, file_hash, std::move(sigs), std::move(approve_sigs),
|
||||
std::move(stats), std::move(P));
|
||||
td::actor::send_closure(id_, &ValidatorGroup::accept_block_candidate, std::move(source_info), std::move(data),
|
||||
root_hash, file_hash, std::move(sigs), std::move(approve_sigs), std::move(stats),
|
||||
std::move(P));
|
||||
}
|
||||
void on_block_skipped(td::uint32 round) override {
|
||||
td::actor::send_closure(id_, &ValidatorGroup::skip_round, round);
|
||||
|
@ -379,7 +415,7 @@ void ValidatorGroup::start(std::vector<BlockIdExt> prev, BlockIdExt min_masterch
|
|||
auto block =
|
||||
p.block.size() > 0 ? create_block(next_block_id, std::move(p.block)).move_as_ok() : td::Ref<BlockData>{};
|
||||
retry_accept_block_query(next_block_id, std::move(block), prev_block_ids_, std::move(p.sigs),
|
||||
std::move(p.approve_sigs), false, std::move(p.promise));
|
||||
std::move(p.approve_sigs), 0, std::move(p.promise));
|
||||
prev_block_ids_ = std::vector<BlockIdExt>{next_block_id};
|
||||
}
|
||||
postponed_accept_.clear();
|
||||
|
|
|
@ -34,18 +34,18 @@ class ValidatorManager;
|
|||
|
||||
class ValidatorGroup : public td::actor::Actor {
|
||||
public:
|
||||
void generate_block_candidate(td::uint32 round_id,
|
||||
void generate_block_candidate(validatorsession::BlockSourceInfo source_info,
|
||||
td::Promise<validatorsession::ValidatorSession::GeneratedCandidate> promise);
|
||||
void validate_block_candidate(td::uint32 round_id, BlockCandidate block,
|
||||
void validate_block_candidate(validatorsession::BlockSourceInfo source_info, BlockCandidate block,
|
||||
td::Promise<std::pair<UnixTime, bool>> promise);
|
||||
void accept_block_candidate(td::uint32 round_id, PublicKeyHash src, td::BufferSlice block, RootHash root_hash,
|
||||
void accept_block_candidate(validatorsession::BlockSourceInfo source_info, td::BufferSlice block, RootHash root_hash,
|
||||
FileHash file_hash, std::vector<BlockSignature> signatures,
|
||||
std::vector<BlockSignature> approve_signatures,
|
||||
validatorsession::ValidatorSessionStats stats, td::Promise<td::Unit> promise);
|
||||
void skip_round(td::uint32 round);
|
||||
void retry_accept_block_query(BlockIdExt block_id, td::Ref<BlockData> block, std::vector<BlockIdExt> prev,
|
||||
td::Ref<BlockSignatureSet> sigs, td::Ref<BlockSignatureSet> approve_sigs,
|
||||
bool send_broadcast, td::Promise<td::Unit> promise);
|
||||
int send_broadcast_mode, td::Promise<td::Unit> promise);
|
||||
void get_approved_candidate(PublicKey source, RootHash root_hash, FileHash file_hash,
|
||||
FileHash collated_data_file_hash, td::Promise<BlockCandidate> promise);
|
||||
BlockIdExt create_next_block_id(RootHash root_hash, FileHash file_hash) const;
|
||||
|
@ -140,7 +140,8 @@ class ValidatorGroup : public td::actor::Actor {
|
|||
std::shared_ptr<CachedCollatedBlock> cached_collated_block_;
|
||||
td::CancellationTokenSource cancellation_token_source_;
|
||||
|
||||
void generated_block_candidate(std::shared_ptr<CachedCollatedBlock> cache, td::Result<BlockCandidate> R);
|
||||
void generated_block_candidate(validatorsession::BlockSourceInfo source_info,
|
||||
std::shared_ptr<CachedCollatedBlock> cache, td::Result<BlockCandidate> R);
|
||||
|
||||
using CacheKey = std::tuple<td::Bits256, BlockIdExt, FileHash, FileHash>;
|
||||
std::map<CacheKey, UnixTime> approved_candidates_cache_;
|
||||
|
@ -161,6 +162,16 @@ class ValidatorGroup : public td::actor::Actor {
|
|||
void add_available_block_candidate(td::Bits256 source, BlockIdExt id, FileHash collated_data_hash) {
|
||||
available_block_candidates_.emplace(source, id, collated_data_hash);
|
||||
}
|
||||
|
||||
std::set<BlockIdExt> sent_candidate_broadcasts_;
|
||||
|
||||
void send_block_candidate_broadcast(BlockIdExt id, td::BufferSlice data) {
|
||||
if (sent_candidate_broadcasts_.insert(id).second) {
|
||||
td::actor::send_closure(manager_, &ValidatorManager::send_block_candidate_broadcast, id,
|
||||
validator_set_->get_catchain_seqno(), validator_set_->get_validator_set_hash(),
|
||||
std::move(data));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace validator
|
||||
|
|
|
@ -174,7 +174,7 @@ class ValidatorManagerInterface : public td::actor::Actor {
|
|||
virtual void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) = 0;
|
||||
virtual void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
|
||||
td::BufferSlice data) = 0;
|
||||
virtual void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only = false) = 0;
|
||||
virtual void send_broadcast(BlockBroadcast broadcast, int mode) = 0;
|
||||
virtual void download_block(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
|
||||
td::Promise<ReceivedBlock> promise) = 0;
|
||||
virtual void download_zero_state(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
|
||||
|
|
Loading…
Reference in a new issue