diff --git a/create-hardfork/create-hardfork.cpp b/create-hardfork/create-hardfork.cpp index 3110b1a8..501ce3b9 100644 --- a/create-hardfork/create-hardfork.cpp +++ b/create-hardfork/create-hardfork.cpp @@ -249,7 +249,7 @@ class HardforkCreator : public td::actor::Actor { void send_block_candidate(ton::BlockIdExt block_id, ton::CatchainSeqno cc_seqno, td::uint32 validator_set_hash, td::BufferSlice data) override { } - void send_broadcast(ton::BlockBroadcast broadcast, bool custom_overlays_only) override { + void send_broadcast(ton::BlockBroadcast broadcast, int mode) override { } void download_block(ton::BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout, td::Promise promise) override { diff --git a/test/test-ton-collator.cpp b/test/test-ton-collator.cpp index 60476cf8..7c13870e 100644 --- a/test/test-ton-collator.cpp +++ b/test/test-ton-collator.cpp @@ -350,7 +350,7 @@ class TestNode : public td::actor::Actor { void send_block_candidate(ton::BlockIdExt block_id, ton::CatchainSeqno cc_seqno, td::uint32 validator_set_hash, td::BufferSlice data) override { } - void send_broadcast(ton::BlockBroadcast broadcast, bool custom_overlays_only) override { + void send_broadcast(ton::BlockBroadcast broadcast, int mode) override { } void download_block(ton::BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout, td::Promise promise) override { diff --git a/validator-session/validator-session-types.h b/validator-session/validator-session-types.h index 78a9b246..7147bf2d 100644 --- a/validator-session/validator-session-types.h +++ b/validator-session/validator-session-types.h @@ -178,6 +178,12 @@ struct EndValidatorGroupStats { std::vector nodes; }; +struct BlockSourceInfo { + td::uint32 round, first_block_round; + PublicKey source; + td::int32 source_priority; +}; + } // namespace validatorsession } // namespace ton diff --git a/validator-session/validator-session.cpp b/validator-session/validator-session.cpp index 33b78e84..3a913990 100644 --- a/validator-session/validator-session.cpp +++ b/validator-session/validator-session.cpp @@ -553,7 +553,9 @@ void ValidatorSessionImpl::check_generate_slot() { LOG(WARNING) << print_id << ": failed to generate block candidate: " << R.move_as_error(); } }); - callback_->on_generate_slot(cur_round_, std::move(P)); + callback_->on_generate_slot( + BlockSourceInfo{cur_round_, first_block_round_, description().get_source_public_key(local_idx()), priority}, + std::move(P)); } else { alarm_timestamp().relax(t); } @@ -631,8 +633,10 @@ void ValidatorSessionImpl::try_approve_block(const SentBlock *block) { }); pending_approve_.insert(block_id); - callback_->on_candidate(cur_round_, description().get_source_public_key(block->get_src_idx()), B->root_hash_, - B->data_.clone(), B->collated_data_.clone(), std::move(P)); + callback_->on_candidate( + BlockSourceInfo{cur_round_, first_block_round_, description().get_source_public_key(block->get_src_idx()), + description().get_node_priority(block->get_src_idx(), cur_round_)}, + B->root_hash_, B->data_.clone(), B->collated_data_.clone(), std::move(P)); } else if (T.is_in_past()) { if (!active_requests_.count(block_id)) { auto v = virtual_state_->get_block_approvers(description(), block_id); @@ -905,15 +909,19 @@ void ValidatorSessionImpl::on_new_round(td::uint32 round) { stats.rounds.pop_back(); } + BlockSourceInfo source_info{cur_round_, first_block_round_, + description().get_source_public_key(block->get_src_idx()), + description().get_node_priority(block->get_src_idx(), cur_round_)}; if (it == blocks_.end()) { - callback_->on_block_committed(cur_round_, description().get_source_public_key(block->get_src_idx()), - block->get_root_hash(), block->get_file_hash(), td::BufferSlice(), - std::move(export_sigs), std::move(export_approve_sigs), std::move(stats)); + callback_->on_block_committed(std::move(source_info), block->get_root_hash(), block->get_file_hash(), + td::BufferSlice(), std::move(export_sigs), std::move(export_approve_sigs), + std::move(stats)); } else { - callback_->on_block_committed(cur_round_, description().get_source_public_key(block->get_src_idx()), - block->get_root_hash(), block->get_file_hash(), it->second->data_.clone(), - std::move(export_sigs), std::move(export_approve_sigs), std::move(stats)); + callback_->on_block_committed(std::move(source_info), block->get_root_hash(), block->get_file_hash(), + it->second->data_.clone(), std::move(export_sigs), std::move(export_approve_sigs), + std::move(stats)); } + first_block_round_ = cur_round_ + 1; } cur_round_++; if (have_block) { diff --git a/validator-session/validator-session.h b/validator-session/validator-session.h index e60330b0..b099d65e 100644 --- a/validator-session/validator-session.h +++ b/validator-session/validator-session.h @@ -85,11 +85,10 @@ class ValidatorSession : public td::actor::Actor { class Callback { public: - virtual void on_candidate(td::uint32 round, PublicKey source, ValidatorSessionRootHash root_hash, - td::BufferSlice data, td::BufferSlice collated_data, - td::Promise promise) = 0; - virtual void on_generate_slot(td::uint32 round, td::Promise promise) = 0; - virtual void on_block_committed(td::uint32 round, PublicKey source, ValidatorSessionRootHash root_hash, + virtual void on_candidate(BlockSourceInfo source_info, ValidatorSessionRootHash root_hash, td::BufferSlice data, + td::BufferSlice collated_data, td::Promise promise) = 0; + virtual void on_generate_slot(BlockSourceInfo source_info, td::Promise promise) = 0; + virtual void on_block_committed(BlockSourceInfo source_info, ValidatorSessionRootHash root_hash, ValidatorSessionFileHash file_hash, td::BufferSlice data, std::vector> signatures, std::vector> approve_signatures, diff --git a/validator-session/validator-session.hpp b/validator-session/validator-session.hpp index 39f196d8..690346f7 100644 --- a/validator-session/validator-session.hpp +++ b/validator-session/validator-session.hpp @@ -53,7 +53,7 @@ class ValidatorSessionImpl : public ValidatorSession { const ValidatorSessionState *real_state_ = nullptr; const ValidatorSessionState *virtual_state_ = nullptr; - td::uint32 cur_round_ = 0; + td::uint32 cur_round_ = 0, first_block_round_ = 0; td::Timestamp round_started_at_ = td::Timestamp::never(); td::Timestamp round_debug_at_ = td::Timestamp::never(); std::set pending_approve_; diff --git a/validator/fabric.h b/validator/fabric.h index 47418047..2c39aceb 100644 --- a/validator/fabric.h +++ b/validator/fabric.h @@ -56,7 +56,7 @@ void run_check_external_message(td::Ref message, td::actor::ActorId< void run_accept_block_query(BlockIdExt id, td::Ref data, std::vector prev, td::Ref validator_set, td::Ref signatures, - td::Ref approve_signatures, bool send_broadcast, + td::Ref approve_signatures, int send_broadcast_mode, td::actor::ActorId manager, td::Promise promise); void run_fake_accept_block_query(BlockIdExt id, td::Ref data, std::vector prev, td::Ref validator_set, td::actor::ActorId manager, diff --git a/validator/full-node.cpp b/validator/full-node.cpp index a72be3ff..bb6da2c8 100644 --- a/validator/full-node.cpp +++ b/validator/full-node.cpp @@ -261,21 +261,22 @@ void FullNodeImpl::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_se } } -void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) { - send_block_broadcast_to_custom_overlays(broadcast); - if (custom_overlays_only) { - return; +void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, int mode) { + if (mode & broadcast_mode_custom) { + send_block_broadcast_to_custom_overlays(broadcast); } auto shard = get_shard(ShardIdFull{masterchainId}); if (shard.empty()) { VLOG(FULL_NODE_WARNING) << "dropping OUT broadcast to unknown shard"; return; } - if (broadcast.block_id.is_masterchain() && !private_block_overlays_.empty()) { + if (!private_block_overlays_.empty() && (mode & broadcast_mode_private_block)) { td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_broadcast, broadcast.clone()); } - td::actor::send_closure(shard, &FullNodeShard::send_broadcast, std::move(broadcast)); + if (mode & broadcast_mode_public) { + td::actor::send_closure(shard, &FullNodeShard::send_broadcast, std::move(broadcast)); + } } void FullNodeImpl::download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, @@ -496,8 +497,8 @@ void FullNodeImpl::start_up() { td::actor::send_closure(id_, &FullNodeImpl::send_block_candidate, block_id, cc_seqno, validator_set_hash, std::move(data)); } - void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override { - td::actor::send_closure(id_, &FullNodeImpl::send_broadcast, std::move(broadcast), custom_overlays_only); + void send_broadcast(BlockBroadcast broadcast, int mode) override { + td::actor::send_closure(id_, &FullNodeImpl::send_broadcast, std::move(broadcast), mode); } void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise promise) override { diff --git a/validator/full-node.h b/validator/full-node.h index c3719f67..621cdac0 100644 --- a/validator/full-node.h +++ b/validator/full-node.h @@ -99,6 +99,7 @@ class FullNode : public td::actor::Actor { static constexpr td::uint64 max_state_size() { return 4ull << 30; } + enum { broadcast_mode_public = 1, broadcast_mode_private_block = 2, broadcast_mode_custom = 4 }; static td::actor::ActorOwn create(ton::PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash, FullNodeConfig config, diff --git a/validator/full-node.hpp b/validator/full-node.hpp index 3dfa17fd..584be5ee 100644 --- a/validator/full-node.hpp +++ b/validator/full-node.hpp @@ -68,7 +68,7 @@ class FullNodeImpl : public FullNode { void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqnp, td::BufferSlice data); void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, td::BufferSlice data); - void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only); + void send_broadcast(BlockBroadcast broadcast, int mode); void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise promise); void download_zero_state(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise promise); diff --git a/validator/impl/accept-block.cpp b/validator/impl/accept-block.cpp index cfdeb052..a9dd7fe2 100644 --- a/validator/impl/accept-block.cpp +++ b/validator/impl/accept-block.cpp @@ -41,7 +41,7 @@ using namespace std::literals::string_literals; AcceptBlockQuery::AcceptBlockQuery(BlockIdExt id, td::Ref data, std::vector prev, td::Ref validator_set, td::Ref signatures, - td::Ref approve_signatures, bool send_broadcast, + td::Ref approve_signatures, int send_broadcast_mode, td::actor::ActorId manager, td::Promise promise) : id_(id) , data_(std::move(data)) @@ -51,7 +51,7 @@ AcceptBlockQuery::AcceptBlockQuery(BlockIdExt id, td::Ref data, std:: , approve_signatures_(std::move(approve_signatures)) , is_fake_(false) , is_fork_(false) - , send_broadcast_(send_broadcast) + , send_broadcast_mode_(send_broadcast_mode) , manager_(manager) , promise_(std::move(promise)) , perf_timer_("acceptblock", 0.1, [manager](double duration) { @@ -72,7 +72,6 @@ AcceptBlockQuery::AcceptBlockQuery(AcceptBlockQuery::IsFake fake, BlockIdExt id, , validator_set_(std::move(validator_set)) , is_fake_(true) , is_fork_(false) - , send_broadcast_(false) , manager_(manager) , promise_(std::move(promise)) , perf_timer_("acceptblock", 0.1, [manager](double duration) { @@ -90,7 +89,6 @@ AcceptBlockQuery::AcceptBlockQuery(ForceFork ffork, BlockIdExt id, td::Refdata(); b.block_id = id_; @@ -947,8 +949,7 @@ void AcceptBlockQuery::applied() { } // do not wait for answer - td::actor::send_closure_later(manager_, &ValidatorManager::send_block_broadcast, std::move(b), - /* custom_overlays_only = */ !send_broadcast_); + td::actor::send_closure_later(manager_, &ValidatorManager::send_block_broadcast, std::move(b), send_broadcast_mode_); finish_query(); } diff --git a/validator/impl/accept-block.hpp b/validator/impl/accept-block.hpp index 95b5f281..d1c0baa6 100644 --- a/validator/impl/accept-block.hpp +++ b/validator/impl/accept-block.hpp @@ -50,7 +50,7 @@ class AcceptBlockQuery : public td::actor::Actor { struct ForceFork {}; AcceptBlockQuery(BlockIdExt id, td::Ref data, std::vector prev, td::Ref validator_set, td::Ref signatures, - td::Ref approve_signatures, bool send_broadcast, + td::Ref approve_signatures, int send_broadcast_mode, td::actor::ActorId manager, td::Promise promise); AcceptBlockQuery(IsFake fake, BlockIdExt id, td::Ref data, std::vector prev, td::Ref validator_set, td::actor::ActorId manager, @@ -99,7 +99,7 @@ class AcceptBlockQuery : public td::actor::Actor { Ref approve_signatures_; bool is_fake_; bool is_fork_; - bool send_broadcast_; + int send_broadcast_mode_{0}; bool ancestors_split_{false}, is_key_block_{false}; td::Timestamp timeout_ = td::Timestamp::in(600.0); td::actor::ActorId manager_; diff --git a/validator/impl/fabric.cpp b/validator/impl/fabric.cpp index 36fa052a..65b92262 100644 --- a/validator/impl/fabric.cpp +++ b/validator/impl/fabric.cpp @@ -131,11 +131,11 @@ td::Result> create_ihr_message(td::BufferSlice data) { void run_accept_block_query(BlockIdExt id, td::Ref data, std::vector prev, td::Ref validator_set, td::Ref signatures, - td::Ref approve_signatures, bool send_broadcast, + td::Ref approve_signatures, int send_broadcast_mode, td::actor::ActorId manager, td::Promise promise) { - td::actor::create_actor(PSTRING() << "accept" << id.id.to_str(), id, std::move(data), prev, - std::move(validator_set), std::move(signatures), - std::move(approve_signatures), send_broadcast, manager, std::move(promise)) + td::actor::create_actor( + PSTRING() << "accept" << id.id.to_str(), id, std::move(data), prev, std::move(validator_set), + std::move(signatures), std::move(approve_signatures), send_broadcast_mode, manager, std::move(promise)) .release(); } diff --git a/validator/interfaces/validator-manager.h b/validator/interfaces/validator-manager.h index b6016bc2..ce0c27e1 100644 --- a/validator/interfaces/validator-manager.h +++ b/validator/interfaces/validator-manager.h @@ -105,6 +105,8 @@ class ValidatorManager : public ValidatorManagerInterface { virtual void set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, td::Promise promise) = 0; + virtual void send_block_candidate_broadcast(BlockIdExt id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data) = 0; virtual void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) = 0; @@ -144,7 +146,7 @@ class ValidatorManager : public ValidatorManagerInterface { virtual void send_external_message(td::Ref message) = 0; virtual void send_ihr_message(td::Ref message) = 0; virtual void send_top_shard_block_description(td::Ref desc) = 0; - virtual void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) = 0; + virtual void send_block_broadcast(BlockBroadcast broadcast, int mode) = 0; virtual void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) = 0; virtual void get_shard_client_state(bool from_db, td::Promise promise) = 0; diff --git a/validator/manager-disk.cpp b/validator/manager-disk.cpp index 3418608d..62fdc4b4 100644 --- a/validator/manager-disk.cpp +++ b/validator/manager-disk.cpp @@ -785,6 +785,11 @@ void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate can td::actor::send_closure(db_, &Db::store_block_candidate, std::move(candidate), std::move(promise)); } +void ValidatorManagerImpl::send_block_candidate_broadcast(BlockIdExt id, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::BufferSlice data) { + callback_->send_block_candidate(id, cc_seqno, validator_set_hash, std::move(data)); +} + void ValidatorManagerImpl::write_handle(BlockHandle handle, td::Promise promise) { td::actor::send_closure(db_, &Db::store_block_handle, std::move(handle), std::move(promise)); } diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index e15def2a..5287a387 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -185,6 +185,8 @@ class ValidatorManagerImpl : public ValidatorManager { void set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, td::Promise promise) override; + void send_block_candidate_broadcast(BlockIdExt id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data) override; void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; @@ -259,7 +261,7 @@ class ValidatorManagerImpl : public ValidatorManager { new_ihr_message(message->serialize()); } void send_top_shard_block_description(td::Ref desc) override; - void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override { + void send_block_broadcast(BlockBroadcast broadcast, int mode) override { } void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) override; diff --git a/validator/manager-hardfork.hpp b/validator/manager-hardfork.hpp index a43d4a70..2e703faf 100644 --- a/validator/manager-hardfork.hpp +++ b/validator/manager-hardfork.hpp @@ -226,6 +226,10 @@ class ValidatorManagerImpl : public ValidatorManager { td::uint32 validator_set_hash, td::Promise promise) override { promise.set_value(td::Unit()); } + void send_block_candidate_broadcast(BlockIdExt id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data) { + callback_->send_block_candidate(id, cc_seqno, validator_set_hash, std::move(data)); + } void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; @@ -326,7 +330,7 @@ class ValidatorManagerImpl : public ValidatorManager { void send_top_shard_block_description(td::Ref desc) override { UNREACHABLE(); } - void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override { + void send_block_broadcast(BlockBroadcast broadcast, int mode) override { } void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) override { diff --git a/validator/manager.cpp b/validator/manager.cpp index 03c4f795..67cd60a7 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -1284,11 +1284,15 @@ void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate can } if (!id.is_masterchain()) { add_cached_block_candidate(ReceivedBlock{id, candidate.data.clone()}); - callback_->send_block_candidate(id, cc_seqno, validator_set_hash, candidate.data.clone()); } td::actor::send_closure(db_, &Db::store_block_candidate, std::move(candidate), std::move(promise)); } +void ValidatorManagerImpl::send_block_candidate_broadcast(BlockIdExt id, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::BufferSlice data) { + callback_->send_block_candidate(id, cc_seqno, validator_set_hash, std::move(data)); +} + void ValidatorManagerImpl::write_handle(BlockHandle handle, td::Promise promise) { auto P = td::PromiseCreator::lambda( [SelfId = actor_id(this), handle, promise = std::move(promise)](td::Result R) mutable { @@ -1616,8 +1620,8 @@ void ValidatorManagerImpl::send_top_shard_block_description(td::Refsend_broadcast(std::move(broadcast), custom_overlays_only); +void ValidatorManagerImpl::send_block_broadcast(BlockBroadcast broadcast, int mode) { + callback_->send_broadcast(std::move(broadcast), mode); } void ValidatorManagerImpl::start_up() { diff --git a/validator/manager.hpp b/validator/manager.hpp index 28949837..7410f75f 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -434,6 +434,8 @@ class ValidatorManagerImpl : public ValidatorManager { void set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, td::Promise promise) override; + void send_block_candidate_broadcast(BlockIdExt id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data) override; void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; @@ -498,7 +500,7 @@ class ValidatorManagerImpl : public ValidatorManager { void send_external_message(td::Ref message) override; void send_ihr_message(td::Ref message) override; void send_top_shard_block_description(td::Ref desc) override; - void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override; + void send_block_broadcast(BlockBroadcast broadcast, int mode) override; void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) override; void get_shard_client_state(bool from_db, td::Promise promise) override; diff --git a/validator/validator-group.cpp b/validator/validator-group.cpp index 2f43b3b9..1c6a9d0e 100644 --- a/validator/validator-group.cpp +++ b/validator/validator-group.cpp @@ -18,6 +18,7 @@ */ #include "validator-group.hpp" #include "fabric.h" +#include "full-node-master.hpp" #include "ton/ton-io.hpp" #include "td/utils/overloaded.h" #include "common/delay.h" @@ -27,8 +28,14 @@ namespace ton { namespace validator { +static bool need_send_candidate_broadcast(const validatorsession::BlockSourceInfo &source_info, bool is_masterchain) { + return source_info.first_block_round == source_info.round && source_info.source_priority == 0 && !is_masterchain; +} + void ValidatorGroup::generate_block_candidate( - td::uint32 round_id, td::Promise promise) { + validatorsession::BlockSourceInfo source_info, + td::Promise promise) { + td::uint32 round_id = source_info.round; if (round_id > last_known_round_id_) { last_known_round_id_ = round_id; } @@ -53,12 +60,15 @@ void ValidatorGroup::generate_block_candidate( run_collate_query( shard_, min_masterchain_block_id_, prev_block_ids_, Ed25519_PublicKey{local_id_full_.ed25519_value().raw()}, validator_set_, opts_->get_collator_options(), manager_, td::Timestamp::in(10.0), - [SelfId = actor_id(this), cache = cached_collated_block_](td::Result R) { - td::actor::send_closure(SelfId, &ValidatorGroup::generated_block_candidate, std::move(cache), std::move(R)); + [SelfId = actor_id(this), cache = cached_collated_block_, source_info](td::Result R) mutable { + td::actor::send_closure(SelfId, &ValidatorGroup::generated_block_candidate, std::move(source_info), + std::move(cache), std::move(R)); }, cancellation_token_source_.get_cancellation_token(), /* mode = */ 0); } -void ValidatorGroup::generated_block_candidate(std::shared_ptr cache, td::Result R) { +void ValidatorGroup::generated_block_candidate(validatorsession::BlockSourceInfo source_info, + std::shared_ptr cache, + td::Result R) { if (R.is_error()) { for (auto &p : cache->promises) { p.set_error(R.error().clone()); @@ -69,6 +79,9 @@ void ValidatorGroup::generated_block_candidate(std::shared_ptrresult = std::move(candidate); for (auto &p : cache->promises) { p.set_value(cache->result.value().clone()); @@ -77,8 +90,9 @@ void ValidatorGroup::generated_block_candidate(std::shared_ptrpromises.clear(); } -void ValidatorGroup::validate_block_candidate(td::uint32 round_id, BlockCandidate block, +void ValidatorGroup::validate_block_candidate(validatorsession::BlockSourceInfo source_info, BlockCandidate block, td::Promise> promise) { + td::uint32 round_id = source_info.round; if (round_id > last_known_round_id_) { last_known_round_id_ = round_id; } @@ -97,7 +111,8 @@ void ValidatorGroup::validate_block_candidate(td::uint32 round_id, BlockCandidat return; } - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), round_id, block = block.clone(), + auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), source_info, block = block.clone(), manager = manager_, + validator_set = validator_set_, promise = std::move(promise)](td::Result R) mutable { if (R.is_error()) { auto S = R.move_as_error(); @@ -105,19 +120,22 @@ void ValidatorGroup::validate_block_candidate(td::uint32 round_id, BlockCandidat LOG(ERROR) << "failed to validate candidate: " << S; } delay_action( - [SelfId, round_id, block = std::move(block), promise = std::move(promise)]() mutable { - td::actor::send_closure(SelfId, &ValidatorGroup::validate_block_candidate, round_id, std::move(block), - std::move(promise)); + [SelfId, source_info, block = std::move(block), promise = std::move(promise)]() mutable { + td::actor::send_closure(SelfId, &ValidatorGroup::validate_block_candidate, std::move(source_info), + std::move(block), std::move(promise)); }, td::Timestamp::in(0.1)); } else { auto v = R.move_as_ok(); v.visit(td::overloaded( [&](UnixTime ts) { - td::actor::send_closure(SelfId, &ValidatorGroup::update_approve_cache, block_to_cache_key(block), - ts); + td::actor::send_closure(SelfId, &ValidatorGroup::update_approve_cache, block_to_cache_key(block), ts); td::actor::send_closure(SelfId, &ValidatorGroup::add_available_block_candidate, block.pubkey.as_bits256(), block.id, block.collated_file_hash); + if (need_send_candidate_broadcast(source_info, block.id.is_masterchain())) { + td::actor::send_closure(SelfId, &ValidatorGroup::send_block_candidate_broadcast, block.id, + block.data.clone()); + } promise.set_value({ts, false}); }, [&](CandidateReject reject) { @@ -140,13 +158,14 @@ void ValidatorGroup::update_approve_cache(CacheKey key, UnixTime value) { approved_candidates_cache_[key] = value; } -void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash src, td::BufferSlice block_data, +void ValidatorGroup::accept_block_candidate(validatorsession::BlockSourceInfo source_info, td::BufferSlice block_data, RootHash root_hash, FileHash file_hash, std::vector signatures, std::vector approve_signatures, validatorsession::ValidatorSessionStats stats, td::Promise promise) { stats.cc_seqno = validator_set_->get_catchain_seqno(); + td::uint32 round_id = source_info.round; if (round_id >= last_known_round_id_) { last_known_round_id_ = round_id + 1; } @@ -165,7 +184,21 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s td::actor::send_closure(manager_, &ValidatorManager::log_validator_session_stats, next_block_id, std::move(stats)); auto block = block_data.size() > 0 ? create_block(next_block_id, std::move(block_data)).move_as_ok() : td::Ref{}; - bool send_broadcast = src == local_id_; + + // Creator of the block sends broadcast to public overlays + // Creator of the block sends broadcast to private block overlay unless candidate broadcast was sent + // Any node sends broadcast to custom overlays unless candidate broadcast was sent + int send_broadcast_mode = 0; + bool sent_candidate = sent_candidate_broadcasts_.contains(block->block_id()); + if (source_info.source.compute_short_id() == local_id_) { + send_broadcast_mode |= fullnode::FullNode::broadcast_mode_public; + if (!sent_candidate) { + send_broadcast_mode |= fullnode::FullNode::broadcast_mode_private_block; + } + } + if (!sent_candidate) { + send_broadcast_mode |= fullnode::FullNode::broadcast_mode_custom; + } auto P = td::PromiseCreator::lambda([=, SelfId = actor_id(this), block_id = next_block_id, prev = prev_block_ids_, promise = std::move(promise)](td::Result R) mutable { @@ -176,7 +209,7 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s } LOG_CHECK(R.error().code() == ErrorCode::timeout || R.error().code() == ErrorCode::notready) << R.move_as_error(); td::actor::send_closure(SelfId, &ValidatorGroup::retry_accept_block_query, block_id, std::move(block), - std::move(prev), std::move(sig_set), std::move(approve_sig_set), send_broadcast, + std::move(prev), std::move(sig_set), std::move(approve_sig_set), send_broadcast_mode, std::move(promise)); } else { promise.set_value(R.move_as_ok()); @@ -184,7 +217,8 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s }); run_accept_block_query(next_block_id, std::move(block), prev_block_ids_, validator_set_, std::move(sig_set), - std::move(approve_sig_set), send_broadcast, manager_, std::move(P)); + std::move(approve_sig_set), send_broadcast_mode, manager_, + std::move(P)); prev_block_ids_ = std::vector{next_block_id}; cached_collated_block_ = nullptr; approved_candidates_cache_.clear(); @@ -193,14 +227,14 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s void ValidatorGroup::retry_accept_block_query(BlockIdExt block_id, td::Ref block, std::vector prev, td::Ref sig_set, - td::Ref approve_sig_set, bool send_broadcast, + td::Ref approve_sig_set, int send_broadcast_mode, td::Promise promise) { auto P = td::PromiseCreator::lambda( [=, SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable { if (R.is_error()) { LOG_CHECK(R.error().code() == ErrorCode::timeout) << R.move_as_error(); td::actor::send_closure(SelfId, &ValidatorGroup::retry_accept_block_query, block_id, std::move(block), - std::move(prev), std::move(sig_set), std::move(approve_sig_set), send_broadcast, + std::move(prev), std::move(sig_set), std::move(approve_sig_set), send_broadcast_mode, std::move(promise)); } else { promise.set_value(R.move_as_ok()); @@ -208,7 +242,7 @@ void ValidatorGroup::retry_accept_block_query(BlockIdExt block_id, td::Ref ValidatorGroup::ma public: Callback(td::actor::ActorId id) : id_(id) { } - void on_candidate(td::uint32 round, PublicKey source, validatorsession::ValidatorSessionRootHash root_hash, - td::BufferSlice data, td::BufferSlice collated_data, + void on_candidate(validatorsession::BlockSourceInfo source_info, + validatorsession::ValidatorSessionRootHash root_hash, td::BufferSlice data, + td::BufferSlice collated_data, td::Promise promise) override { auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result> R) mutable { @@ -266,18 +301,19 @@ std::unique_ptr ValidatorGroup::ma } }); - BlockCandidate candidate{Ed25519_PublicKey{source.ed25519_value().raw()}, + BlockCandidate candidate{Ed25519_PublicKey{source_info.source.ed25519_value().raw()}, BlockIdExt{0, 0, 0, root_hash, sha256_bits256(data.as_slice())}, sha256_bits256(collated_data.as_slice()), data.clone(), collated_data.clone()}; - td::actor::send_closure(id_, &ValidatorGroup::validate_block_candidate, round, std::move(candidate), - std::move(P)); + td::actor::send_closure(id_, &ValidatorGroup::validate_block_candidate, std::move(source_info), + std::move(candidate), std::move(P)); } - void on_generate_slot(td::uint32 round, + void on_generate_slot(validatorsession::BlockSourceInfo source_info, td::Promise promise) override { - td::actor::send_closure(id_, &ValidatorGroup::generate_block_candidate, round, std::move(promise)); + td::actor::send_closure(id_, &ValidatorGroup::generate_block_candidate, std::move(source_info), + std::move(promise)); } - void on_block_committed(td::uint32 round, PublicKey source, validatorsession::ValidatorSessionRootHash root_hash, + void on_block_committed(validatorsession::BlockSourceInfo source_info, validatorsession::ValidatorSessionRootHash root_hash, validatorsession::ValidatorSessionFileHash file_hash, td::BufferSlice data, std::vector> signatures, std::vector> approve_signatures, @@ -291,9 +327,9 @@ std::unique_ptr ValidatorGroup::ma approve_sigs.emplace_back(BlockSignature{sig.first.bits256_value(), std::move(sig.second)}); } auto P = td::PromiseCreator::lambda([](td::Result) {}); - td::actor::send_closure(id_, &ValidatorGroup::accept_block_candidate, round, source.compute_short_id(), - std::move(data), root_hash, file_hash, std::move(sigs), std::move(approve_sigs), - std::move(stats), std::move(P)); + td::actor::send_closure(id_, &ValidatorGroup::accept_block_candidate, std::move(source_info), std::move(data), + root_hash, file_hash, std::move(sigs), std::move(approve_sigs), std::move(stats), + std::move(P)); } void on_block_skipped(td::uint32 round) override { td::actor::send_closure(id_, &ValidatorGroup::skip_round, round); @@ -379,7 +415,7 @@ void ValidatorGroup::start(std::vector prev, BlockIdExt min_masterch auto block = p.block.size() > 0 ? create_block(next_block_id, std::move(p.block)).move_as_ok() : td::Ref{}; retry_accept_block_query(next_block_id, std::move(block), prev_block_ids_, std::move(p.sigs), - std::move(p.approve_sigs), false, std::move(p.promise)); + std::move(p.approve_sigs), 0, std::move(p.promise)); prev_block_ids_ = std::vector{next_block_id}; } postponed_accept_.clear(); diff --git a/validator/validator-group.hpp b/validator/validator-group.hpp index 35d574ac..db55614f 100644 --- a/validator/validator-group.hpp +++ b/validator/validator-group.hpp @@ -34,18 +34,18 @@ class ValidatorManager; class ValidatorGroup : public td::actor::Actor { public: - void generate_block_candidate(td::uint32 round_id, + void generate_block_candidate(validatorsession::BlockSourceInfo source_info, td::Promise promise); - void validate_block_candidate(td::uint32 round_id, BlockCandidate block, + void validate_block_candidate(validatorsession::BlockSourceInfo source_info, BlockCandidate block, td::Promise> promise); - void accept_block_candidate(td::uint32 round_id, PublicKeyHash src, td::BufferSlice block, RootHash root_hash, + void accept_block_candidate(validatorsession::BlockSourceInfo source_info, td::BufferSlice block, RootHash root_hash, FileHash file_hash, std::vector signatures, std::vector approve_signatures, validatorsession::ValidatorSessionStats stats, td::Promise promise); void skip_round(td::uint32 round); void retry_accept_block_query(BlockIdExt block_id, td::Ref block, std::vector prev, td::Ref sigs, td::Ref approve_sigs, - bool send_broadcast, td::Promise promise); + int send_broadcast_mode, td::Promise promise); void get_approved_candidate(PublicKey source, RootHash root_hash, FileHash file_hash, FileHash collated_data_file_hash, td::Promise promise); BlockIdExt create_next_block_id(RootHash root_hash, FileHash file_hash) const; @@ -140,7 +140,8 @@ class ValidatorGroup : public td::actor::Actor { std::shared_ptr cached_collated_block_; td::CancellationTokenSource cancellation_token_source_; - void generated_block_candidate(std::shared_ptr cache, td::Result R); + void generated_block_candidate(validatorsession::BlockSourceInfo source_info, + std::shared_ptr cache, td::Result R); using CacheKey = std::tuple; std::map approved_candidates_cache_; @@ -161,6 +162,16 @@ class ValidatorGroup : public td::actor::Actor { void add_available_block_candidate(td::Bits256 source, BlockIdExt id, FileHash collated_data_hash) { available_block_candidates_.emplace(source, id, collated_data_hash); } + + std::set sent_candidate_broadcasts_; + + void send_block_candidate_broadcast(BlockIdExt id, td::BufferSlice data) { + if (sent_candidate_broadcasts_.insert(id).second) { + td::actor::send_closure(manager_, &ValidatorManager::send_block_candidate_broadcast, id, + validator_set_->get_catchain_seqno(), validator_set_->get_validator_set_hash(), + std::move(data)); + } + } }; } // namespace validator diff --git a/validator/validator.h b/validator/validator.h index 45e617f9..9dbaa185 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -174,7 +174,7 @@ class ValidatorManagerInterface : public td::actor::Actor { virtual void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) = 0; virtual void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, td::BufferSlice data) = 0; - virtual void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only = false) = 0; + virtual void send_broadcast(BlockBroadcast broadcast, int mode) = 0; virtual void download_block(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout, td::Promise promise) = 0; virtual void download_zero_state(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,