From d5c09936cf019f57d5a3bb7246bc5445006db8ae Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Fri, 10 May 2024 17:04:49 +0300 Subject: [PATCH] Block broadcasts in custom overlays (#986) Co-authored-by: SpyCheese --- create-hardfork/create-hardfork.cpp | 2 +- test/test-ton-collator.cpp | 2 +- tl/generate/scheme/ton_api.tl | 2 +- tl/generate/scheme/ton_api.tlo | Bin 89348 -> 89388 bytes .../validator-engine-console-query.cpp | 7 +- validator-engine/validator-engine.cpp | 22 +-- validator/full-node-private-overlay.cpp | 79 ++++++++--- validator/full-node-private-overlay.hpp | 34 +++-- validator/full-node-shard.cpp | 21 +-- validator/full-node-shard.h | 2 +- validator/full-node-shard.hpp | 3 +- validator/full-node.cpp | 130 ++++++++++++------ validator/full-node.h | 17 ++- validator/full-node.hpp | 25 ++-- validator/impl/accept-block.cpp | 8 +- validator/interfaces/validator-manager.h | 2 +- validator/manager-disk.hpp | 2 +- validator/manager-hardfork.hpp | 2 +- validator/manager.cpp | 4 +- validator/manager.hpp | 2 +- validator/validator.h | 2 +- 21 files changed, 234 insertions(+), 134 deletions(-) diff --git a/create-hardfork/create-hardfork.cpp b/create-hardfork/create-hardfork.cpp index c5f1add8..a9e7403a 100644 --- a/create-hardfork/create-hardfork.cpp +++ b/create-hardfork/create-hardfork.cpp @@ -246,7 +246,7 @@ class HardforkCreator : public td::actor::Actor { } void send_shard_block_info(ton::BlockIdExt block_id, ton::CatchainSeqno cc_seqno, td::BufferSlice data) override { } - void send_broadcast(ton::BlockBroadcast broadcast) override { + void send_broadcast(ton::BlockBroadcast broadcast, bool custom_overlays_only) override { } void download_block(ton::BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout, td::Promise promise) override { diff --git a/test/test-ton-collator.cpp b/test/test-ton-collator.cpp index 286a5fea..d36db5d2 100644 --- a/test/test-ton-collator.cpp +++ b/test/test-ton-collator.cpp @@ -347,7 +347,7 @@ class TestNode : public td::actor::Actor { } } } - void send_broadcast(ton::BlockBroadcast broadcast) override { + void send_broadcast(ton::BlockBroadcast broadcast, bool custom_overlays_only) override { } void download_block(ton::BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout, td::Promise promise) override { diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index 3db2ace9..d0a063f7 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -595,7 +595,7 @@ engine.validator.config out_port:int addrs:(vector engine.Addr) adnl:(vector eng liteservers:(vector engine.liteServer) control:(vector engine.controlInterface) gc:engine.gc = engine.validator.Config; -engine.validator.customOverlayNode adnl_id:int256 msg_sender:Bool msg_sender_priority:int = engine.validator.CustomOverlayNode; +engine.validator.customOverlayNode adnl_id:int256 msg_sender:Bool msg_sender_priority:int block_sender:Bool = engine.validator.CustomOverlayNode; engine.validator.customOverlay name:string nodes:(vector engine.validator.customOverlayNode) = engine.validator.CustomOverlay; engine.validator.customOverlaysConfig overlays:(vector engine.validator.customOverlay) = engine.validator.CustomOverlaysConfig; diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo index b0d5f315b0603c43be7a61d0f9290c55a69a744a..fb7f219765fea2ff1aa4dee29f67166b388d01f9 100644 GIT binary patch delta 91 zcmZqK#kyt}>jnW4*8lh1HI*jc6qeg;EMnTg0^&~2nW6w;Oqim=2@<(^Ovs02^29mn mlb=lSnY_S~l_x1DKRG+TI5jUNwP^Cge0i|?%{Eg{90vgF<0sYt delta 78 zcmZ3pi?wAJ>jnW4)<<2JLzE`p6qeg;EMnTg0^&~2nW6w;Oqim=2@<(^Ovs0Ma^p(z Z$rt7bOnx%OXL5oS2UyAGl&L3<0{~H}Bq9I+ diff --git a/validator-engine-console/validator-engine-console-query.cpp b/validator-engine-console/validator-engine-console-query.cpp index 956c23aa..38f524fc 100644 --- a/validator-engine-console/validator-engine-console-query.cpp +++ b/validator-engine-console/validator-engine-console-query.cpp @@ -1171,9 +1171,10 @@ td::Status ShowCustomOverlaysQuery::receive(td::BufferSlice data) { td::TerminalIO::out() << "Overlay \"" << overlay->name_ << "\": " << overlay->nodes_.size() << " nodes\n"; for (const auto &node : overlay->nodes_) { td::TerminalIO::out() << " " << node->adnl_id_ - << (node->msg_sender_ ? (PSTRING() << " (sender, p=" << node->msg_sender_priority_ << ")") - : "") - << "\n"; + << (node->msg_sender_ + ? (PSTRING() << " (msg sender, p=" << node->msg_sender_priority_ << ")") + : "") + << (node->block_sender_ ? " (block sender)" : "") << "\n"; } td::TerminalIO::out() << "\n"; } diff --git a/validator-engine/validator-engine.cpp b/validator-engine/validator-engine.cpp index 8ba99178..3c98a4ac 100644 --- a/validator-engine/validator-engine.cpp +++ b/validator-engine/validator-engine.cpp @@ -2357,16 +2357,9 @@ void ValidatorEngine::load_custom_overlays_config() { } for (auto &overlay : custom_overlays_config_->overlays_) { - std::vector nodes; - std::map senders; - for (const auto &node : overlay->nodes_) { - nodes.emplace_back(node->adnl_id_); - if (node->msg_sender_) { - senders[ton::adnl::AdnlNodeIdShort{node->adnl_id_}] = node->msg_sender_priority_; - } - } - td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::add_ext_msg_overlay, std::move(nodes), - std::move(senders), overlay->name_, [](td::Result R) { R.ensure(); }); + td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::add_custom_overlay, + ton::validator::fullnode::CustomOverlayParams::fetch(*overlay), + [](td::Result R) { R.ensure(); }); } } @@ -3571,11 +3564,10 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_addCustom senders[ton::adnl::AdnlNodeIdShort{node->adnl_id_}] = node->msg_sender_priority_; } } - std::string name = overlay->name_; + auto params = ton::validator::fullnode::CustomOverlayParams::fetch(*query.overlay_); td::actor::send_closure( - full_node_, &ton::validator::fullnode::FullNode::add_ext_msg_overlay, std::move(nodes), std::move(senders), - std::move(name), - [SelfId = actor_id(this), overlay = std::move(overlay), + full_node_, &ton::validator::fullnode::FullNode::add_custom_overlay, std::move(params), + [SelfId = actor_id(this), overlay = std::move(query.overlay_), promise = std::move(promise)](td::Result R) mutable { if (R.is_error()) { promise.set_value(create_control_query_error(R.move_as_error())); @@ -3605,7 +3597,7 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_delCustom return; } td::actor::send_closure( - full_node_, &ton::validator::fullnode::FullNode::del_ext_msg_overlay, query.name_, + full_node_, &ton::validator::fullnode::FullNode::del_custom_overlay, query.name_, [SelfId = actor_id(this), name = query.name_, promise = std::move(promise)](td::Result R) mutable { if (R.is_error()) { promise.set_value(create_control_query_error(R.move_as_error())); diff --git a/validator/full-node-private-overlay.cpp b/validator/full-node-private-overlay.cpp index a64c0e9b..74bb75c5 100644 --- a/validator/full-node-private-overlay.cpp +++ b/validator/full-node-private-overlay.cpp @@ -38,17 +38,7 @@ void FullNodePrivateBlockOverlay::process_block_broadcast(PublicKeyHash src, ton } VLOG(FULL_NODE_DEBUG) << "Received block broadcast in private overlay from " << src << ": " << B.ok().block_id.to_str(); - auto P = td::PromiseCreator::lambda([](td::Result R) { - if (R.is_error()) { - if (R.error().code() == ErrorCode::notready) { - LOG(DEBUG) << "dropped broadcast: " << R.move_as_error(); - } else { - LOG(INFO) << "dropped broadcast: " << R.move_as_error(); - } - } - }); - td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, B.move_as_ok(), - std::move(P)); + td::actor::send_closure(full_node_, &FullNode::process_block_broadcast, B.move_as_ok()); } void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) { @@ -60,6 +50,9 @@ void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api:: } void FullNodePrivateBlockOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) { + if (adnl::AdnlNodeIdShort{src} == local_id_) { + return; + } auto B = fetch_tl_object(std::move(broadcast), true); if (B.is_error()) { return; @@ -169,18 +162,47 @@ void FullNodePrivateBlockOverlay::tear_down() { } } -void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query) { - auto it = senders_.find(adnl::AdnlNodeIdShort{src}); - if (it == senders_.end()) { +void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) { + process_block_broadcast(src, query); +} + +void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query) { + process_block_broadcast(src, query); +} + +void FullNodeCustomOverlay::process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) { + if (!block_senders_.count(adnl::AdnlNodeIdShort(src))) { + VLOG(FULL_NODE_DEBUG) << "Dropping block broadcast in private overlay \"" << name_ << "\" from unauthorized sender " + << src; return; } - LOG(FULL_NODE_DEBUG) << "Got external message in private overlay \"" << name_ << "\" from " << src - << " (priority=" << it->second << ")"; + auto B = deserialize_block_broadcast(query, overlay::Overlays::max_fec_broadcast_size()); + if (B.is_error()) { + LOG(DEBUG) << "dropped broadcast: " << B.move_as_error(); + return; + } + VLOG(FULL_NODE_DEBUG) << "Received block broadcast in custom overlay \"" << name_ << "\" from " << src << ": " + << B.ok().block_id.to_str(); + td::actor::send_closure(full_node_, &FullNode::process_block_broadcast, B.move_as_ok()); +} + +void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query) { + auto it = msg_senders_.find(adnl::AdnlNodeIdShort{src}); + if (it == msg_senders_.end()) { + VLOG(FULL_NODE_DEBUG) << "Dropping external message broadcast in custom overlay \"" << name_ + << "\" from unauthorized sender " << src; + return; + } + VLOG(FULL_NODE_DEBUG) << "Got external message in custom overlay \"" << name_ << "\" from " << src + << " (priority=" << it->second << ")"; td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_external_message, std::move(query.message_->data_), it->second); } void FullNodeCustomOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) { + if (adnl::AdnlNodeIdShort{src} == local_id_) { + return; + } auto B = fetch_tl_object(std::move(broadcast), true); if (B.is_error()) { return; @@ -192,7 +214,7 @@ void FullNodeCustomOverlay::send_external_message(td::BufferSlice data) { if (!inited_ || config_.ext_messages_broadcast_disabled_) { return; } - LOG(FULL_NODE_DEBUG) << "Sending external message to private overlay \"" << name_ << "\""; + VLOG(FULL_NODE_DEBUG) << "Sending external message to custom overlay \"" << name_ << "\""; auto B = create_serialize_tl_object( create_tl_object(std::move(data))); if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) { @@ -204,6 +226,21 @@ void FullNodeCustomOverlay::send_external_message(td::BufferSlice data) { } } +void FullNodeCustomOverlay::send_broadcast(BlockBroadcast broadcast) { + if (!inited_) { + return; + } + VLOG(FULL_NODE_DEBUG) << "Sending block broadcast to custom overlay \"" << name_ + << "\": " << broadcast.block_id.to_str(); + auto B = serialize_block_broadcast(broadcast, true); // compression_enabled = true + if (B.is_error()) { + VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error(); + return; + } + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_, + local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok()); +} + void FullNodeCustomOverlay::start_up() { std::sort(nodes_.begin(), nodes_.end()); nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end()); @@ -234,7 +271,8 @@ void FullNodeCustomOverlay::try_init() { void FullNodeCustomOverlay::init() { LOG(FULL_NODE_WARNING) << "Creating custom overlay \"" << name_ << "\" for adnl id " << local_id_ << " : " - << nodes_.size() << " nodes, overlay_id=" << overlay_id_; + << nodes_.size() << " nodes, " << msg_senders_.size() << " msg senders, " + << block_senders_.size() << " block senders, overlay_id=" << overlay_id_; class Callback : public overlay::Overlays::Callback { public: void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override { @@ -256,9 +294,12 @@ void FullNodeCustomOverlay::init() { }; std::map authorized_keys; - for (const auto &sender : senders_) { + for (const auto &sender : msg_senders_) { authorized_keys[sender.first.pubkey_hash()] = overlay::Overlays::max_fec_broadcast_size(); } + for (const auto &sender : block_senders_) { + authorized_keys[sender.pubkey_hash()] = overlay::Overlays::max_fec_broadcast_size(); + } overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(), 0, std::move(authorized_keys)}; td::actor::send_closure( overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(), nodes_, diff --git a/validator/full-node-private-overlay.hpp b/validator/full-node-private-overlay.hpp index c651acef..196d6da6 100644 --- a/validator/full-node-private-overlay.hpp +++ b/validator/full-node-private-overlay.hpp @@ -52,7 +52,8 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor { td::actor::ActorId keyring, td::actor::ActorId adnl, td::actor::ActorId rldp, td::actor::ActorId rldp2, td::actor::ActorId overlays, - td::actor::ActorId validator_manager) + td::actor::ActorId validator_manager, + td::actor::ActorId full_node) : local_id_(local_id) , nodes_(std::move(nodes)) , zero_state_file_hash_(zero_state_file_hash) @@ -63,7 +64,8 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor { , rldp_(rldp) , rldp2_(rldp2) , overlays_(overlays) - , validator_manager_(validator_manager) { + , validator_manager_(validator_manager) + , full_node_(full_node) { } private: @@ -79,6 +81,7 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor { td::actor::ActorId rldp2_; td::actor::ActorId overlays_; td::actor::ActorId validator_manager_; + td::actor::ActorId full_node_; bool inited_ = false; overlay::OverlayIdFull overlay_id_full_; @@ -90,6 +93,10 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor { class FullNodeCustomOverlay : public td::actor::Actor { public: + void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query); + void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query); + void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query); + void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query); template void process_broadcast(PublicKeyHash, T &) { @@ -98,6 +105,7 @@ class FullNodeCustomOverlay : public td::actor::Actor { void receive_broadcast(PublicKeyHash src, td::BufferSlice query); void send_external_message(td::BufferSlice data); + void send_broadcast(BlockBroadcast broadcast); void set_config(FullNodeConfig config) { config_ = std::move(config); @@ -106,16 +114,17 @@ class FullNodeCustomOverlay : public td::actor::Actor { void start_up() override; void tear_down() override; - FullNodeCustomOverlay(adnl::AdnlNodeIdShort local_id, std::vector nodes, - std::map senders, std::string name, FileHash zero_state_file_hash, + FullNodeCustomOverlay(adnl::AdnlNodeIdShort local_id, CustomOverlayParams params, FileHash zero_state_file_hash, FullNodeConfig config, td::actor::ActorId keyring, td::actor::ActorId adnl, td::actor::ActorId rldp, td::actor::ActorId rldp2, td::actor::ActorId overlays, - td::actor::ActorId validator_manager) + td::actor::ActorId validator_manager, + td::actor::ActorId full_node) : local_id_(local_id) - , nodes_(std::move(nodes)) - , senders_(std::move(senders)) - , name_(std::move(name)) + , name_(std::move(params.name_)) + , nodes_(std::move(params.nodes_)) + , msg_senders_(std::move(params.msg_senders_)) + , block_senders_(std::move(params.block_senders_)) , zero_state_file_hash_(zero_state_file_hash) , config_(config) , keyring_(keyring) @@ -123,14 +132,16 @@ class FullNodeCustomOverlay : public td::actor::Actor { , rldp_(rldp) , rldp2_(rldp2) , overlays_(overlays) - , validator_manager_(validator_manager) { + , validator_manager_(validator_manager) + , full_node_(full_node) { } private: adnl::AdnlNodeIdShort local_id_; - std::vector nodes_; - std::map senders_; std::string name_; + std::vector nodes_; + std::map msg_senders_; + std::set block_senders_; FileHash zero_state_file_hash_; FullNodeConfig config_; @@ -140,6 +151,7 @@ class FullNodeCustomOverlay : public td::actor::Actor { td::actor::ActorId rldp2_; td::actor::ActorId overlays_; td::actor::ActorId validator_manager_; + td::actor::ActorId full_node_; bool inited_ = false; overlay::OverlayIdFull overlay_id_full_; diff --git a/validator/full-node-shard.cpp b/validator/full-node-shard.cpp index 433d9469..6e265cbc 100644 --- a/validator/full-node-shard.cpp +++ b/validator/full-node-shard.cpp @@ -661,17 +661,7 @@ void FullNodeShardImpl::process_block_broadcast(PublicKeyHash src, ton_api::tonN return; } VLOG(FULL_NODE_DEBUG) << "Received block broadcast from " << src << ": " << B.ok().block_id.to_str(); - auto P = td::PromiseCreator::lambda([](td::Result R) { - if (R.is_error()) { - if (R.error().code() == ErrorCode::notready) { - LOG(DEBUG) << "dropped broadcast: " << R.move_as_error(); - } else { - LOG(INFO) << "dropped broadcast: " << R.move_as_error(); - } - } - }); - td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, B.move_as_ok(), - std::move(P)); + td::actor::send_closure(full_node_, &FullNode::process_block_broadcast, B.move_as_ok()); } void FullNodeShardImpl::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) { @@ -1137,7 +1127,8 @@ FullNodeShardImpl::FullNodeShardImpl(ShardIdFull shard, PublicKeyHash local_id, td::actor::ActorId rldp, td::actor::ActorId rldp2, td::actor::ActorId overlays, td::actor::ActorId validator_manager, - td::actor::ActorId client) + td::actor::ActorId client, + td::actor::ActorId full_node) : shard_(shard) , local_id_(local_id) , adnl_id_(adnl_id) @@ -1149,6 +1140,7 @@ FullNodeShardImpl::FullNodeShardImpl(ShardIdFull shard, PublicKeyHash local_id, , overlays_(overlays) , validator_manager_(validator_manager) , client_(client) + , full_node_(full_node) , config_(config) { } @@ -1157,9 +1149,10 @@ td::actor::ActorOwn FullNodeShard::create( FullNodeConfig config, td::actor::ActorId keyring, td::actor::ActorId adnl, td::actor::ActorId rldp, td::actor::ActorId rldp2, td::actor::ActorId overlays, td::actor::ActorId validator_manager, - td::actor::ActorId client) { + td::actor::ActorId client, td::actor::ActorId full_node) { return td::actor::create_actor("tonnode", shard, local_id, adnl_id, zero_state_file_hash, config, - keyring, adnl, rldp, rldp2, overlays, validator_manager, client); + keyring, adnl, rldp, rldp2, overlays, validator_manager, client, + full_node); } } // namespace fullnode diff --git a/validator/full-node-shard.h b/validator/full-node-shard.h index 1b742fb9..2ed13d66 100644 --- a/validator/full-node-shard.h +++ b/validator/full-node-shard.h @@ -72,7 +72,7 @@ class FullNodeShard : public td::actor::Actor { FullNodeConfig config, td::actor::ActorId keyring, td::actor::ActorId adnl, td::actor::ActorId rldp, td::actor::ActorId rldp2, td::actor::ActorId overlays, td::actor::ActorId validator_manager, - td::actor::ActorId client); + td::actor::ActorId client, td::actor::ActorId full_node); }; } // namespace fullnode diff --git a/validator/full-node-shard.hpp b/validator/full-node-shard.hpp index c8b5301a..1fdbce1c 100644 --- a/validator/full-node-shard.hpp +++ b/validator/full-node-shard.hpp @@ -214,7 +214,7 @@ class FullNodeShardImpl : public FullNodeShard { td::actor::ActorId adnl, td::actor::ActorId rldp, td::actor::ActorId rldp2, td::actor::ActorId overlays, td::actor::ActorId validator_manager, - td::actor::ActorId client); + td::actor::ActorId client, td::actor::ActorId full_node); private: bool use_new_download() const { @@ -236,6 +236,7 @@ class FullNodeShardImpl : public FullNodeShard { td::actor::ActorId overlays_; td::actor::ActorId validator_manager_; td::actor::ActorId client_; + td::actor::ActorId full_node_; td::uint32 attempt_ = 0; diff --git a/validator/full-node.cpp b/validator/full-node.cpp index affb8bcc..8d0f26bc 100644 --- a/validator/full-node.cpp +++ b/validator/full-node.cpp @@ -36,8 +36,8 @@ void FullNodeImpl::add_permanent_key(PublicKeyHash key, td::Promise pr local_keys_.insert(key); create_private_block_overlay(key); - for (auto &p : private_custom_overlays_) { - update_ext_msg_overlay(p.first, p.second); + for (auto &p : custom_overlays_) { + update_custom_overlay(p.second); } if (!sign_cert_by_.is_zero()) { @@ -64,8 +64,8 @@ void FullNodeImpl::del_permanent_key(PublicKeyHash key, td::Promise pr } local_keys_.erase(key); private_block_overlays_.erase(key); - for (auto &p : private_custom_overlays_) { - update_ext_msg_overlay(p.first, p.second); + for (auto &p : custom_overlays_) { + update_custom_overlay(p.second); } if (sign_cert_by_ != key) { @@ -119,8 +119,8 @@ void FullNodeImpl::update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise nodes, - std::map senders, std::string name, - td::Promise promise) { - if (nodes.empty()) { +void FullNodeImpl::add_custom_overlay(CustomOverlayParams params, td::Promise promise) { + if (params.nodes_.empty()) { promise.set_error(td::Status::Error("list of nodes is empty")); return; } - if (private_custom_overlays_.count(name)) { - promise.set_error(td::Status::Error(PSTRING() << "duplicate overlay name \"" << name << "\"")); + std::string name = params.name_; + if (custom_overlays_.count(name)) { + promise.set_error(td::Status::Error(PSTRING() << "duplicate custom overlay name \"" << name << "\"")); return; } - VLOG(FULL_NODE_WARNING) << "Adding private overlay for external messages \"" << name << "\", " << nodes.size() - << " nodes"; - auto &p = private_custom_overlays_[name]; - p.nodes_ = nodes; - p.senders_ = senders; - update_ext_msg_overlay(name, p); + VLOG(FULL_NODE_WARNING) << "Adding custom overlay \"" << name << "\", " << params.nodes_.size() << " nodes"; + auto &p = custom_overlays_[name]; + p.params_ = std::move(params); + update_custom_overlay(p); promise.set_result(td::Unit()); } -void FullNodeImpl::del_ext_msg_overlay(std::string name, td::Promise promise) { - auto it = private_custom_overlays_.find(name); - if (it == private_custom_overlays_.end()) { +void FullNodeImpl::del_custom_overlay(std::string name, td::Promise promise) { + auto it = custom_overlays_.find(name); + if (it == custom_overlays_.end()) { promise.set_error(td::Status::Error(PSTRING() << "no such overlay \"" << name << "\"")); return; } - private_custom_overlays_.erase(it); + custom_overlays_.erase(it); promise.set_result(td::Unit()); } @@ -182,8 +179,9 @@ void FullNodeImpl::initial_read_complete(BlockHandle top_handle) { void FullNodeImpl::add_shard(ShardIdFull shard) { while (true) { if (shards_.count(shard) == 0) { - shards_.emplace(shard, FullNodeShard::create(shard, local_id_, adnl_id_, zero_state_file_hash_, config_, keyring_, - adnl_, rldp_, rldp2_, overlays_, validator_manager_, client_)); + shards_.emplace(shard, + FullNodeShard::create(shard, local_id_, adnl_id_, zero_state_file_hash_, config_, keyring_, adnl_, + rldp_, rldp2_, overlays_, validator_manager_, client_, actor_id(this))); if (all_validators_.size() > 0) { td::actor::send_closure(shards_[shard], &FullNodeShard::update_validators, all_validators_, sign_cert_by_); } @@ -221,10 +219,10 @@ void FullNodeImpl::send_ext_message(AccountIdPrefixFull dst, td::BufferSlice dat VLOG(FULL_NODE_WARNING) << "dropping OUT ext message to unknown shard"; return; } - for (auto &private_overlay : private_custom_overlays_) { + for (auto &private_overlay : custom_overlays_) { for (auto &actor : private_overlay.second.actors_) { auto local_id = actor.first; - if (private_overlay.second.senders_.count(local_id)) { + if (private_overlay.second.params_.msg_senders_.count(local_id)) { td::actor::send_closure(actor.second, &FullNodeCustomOverlay::send_external_message, data.clone()); } } @@ -245,7 +243,11 @@ void FullNodeImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_s td::actor::send_closure(shard, &FullNodeShard::send_shard_block_info, block_id, cc_seqno, std::move(data)); } -void FullNodeImpl::send_broadcast(BlockBroadcast broadcast) { +void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) { + send_block_broadcast_to_custom_overlays(broadcast); + if (custom_overlays_only) { + return; + } auto shard = get_shard(ShardIdFull{masterchainId}); if (shard.empty()) { VLOG(FULL_NODE_WARNING) << "dropping OUT broadcast to unknown shard"; @@ -404,7 +406,22 @@ void FullNodeImpl::new_key_block(BlockHandle handle) { std::move(P)); } +void FullNodeImpl::process_block_broadcast(BlockBroadcast broadcast) { + send_block_broadcast_to_custom_overlays(broadcast); + td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, std::move(broadcast), + [](td::Result R) { + if (R.is_error()) { + if (R.error().code() == ErrorCode::notready) { + LOG(DEBUG) << "dropped broadcast: " << R.move_as_error(); + } else { + LOG(INFO) << "dropped broadcast: " << R.move_as_error(); + } + } + }); +} + void FullNodeImpl::start_up() { + add_shard(ShardIdFull{masterchainId}); if (local_id_.is_zero()) { if(adnl_id_.is_zero()) { auto pk = ton::PrivateKey{ton::privkeys::Ed25519::random()}; @@ -435,8 +452,8 @@ void FullNodeImpl::start_up() { void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) override { td::actor::send_closure(id_, &FullNodeImpl::send_shard_block_info, block_id, cc_seqno, std::move(data)); } - void send_broadcast(BlockBroadcast broadcast) override { - td::actor::send_closure(id_, &FullNodeImpl::send_broadcast, std::move(broadcast)); + void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override { + td::actor::send_closure(id_, &FullNodeImpl::send_broadcast, std::move(broadcast), custom_overlays_only); } void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise promise) override { @@ -488,8 +505,8 @@ void FullNodeImpl::start_up() { } void FullNodeImpl::update_private_overlays() { - for (auto &p : private_custom_overlays_) { - update_ext_msg_overlay(p.first, p.second); + for (auto &p : custom_overlays_) { + update_custom_overlay(p.second); } private_block_overlays_.clear(); @@ -520,28 +537,30 @@ void FullNodeImpl::create_private_block_overlay(PublicKeyHash key) { } private_block_overlays_[key] = td::actor::create_actor( "BlocksPrivateOverlay", current_validators_[key], std::move(nodes), zero_state_file_hash_, config_, - private_block_overlays_enable_compression_, keyring_, adnl_, rldp_, rldp2_, overlays_, validator_manager_); + private_block_overlays_enable_compression_, keyring_, adnl_, rldp_, rldp2_, overlays_, validator_manager_, + actor_id(this)); } } -void FullNodeImpl::update_ext_msg_overlay(const std::string &name, ExtMsgOverlayInfo &overlay) { +void FullNodeImpl::update_custom_overlay(CustomOverlayInfo &overlay) { auto old_actors = std::move(overlay.actors_); overlay.actors_.clear(); + CustomOverlayParams ¶ms = overlay.params_; auto try_local_id = [&](const adnl::AdnlNodeIdShort &local_id) { - if (std::find(overlay.nodes_.begin(), overlay.nodes_.end(), local_id) != overlay.nodes_.end()) { + if (std::find(params.nodes_.begin(), params.nodes_.end(), local_id) != params.nodes_.end()) { auto it = old_actors.find(local_id); if (it != old_actors.end()) { overlay.actors_[local_id] = std::move(it->second); old_actors.erase(it); } else { overlay.actors_[local_id] = td::actor::create_actor( - "CustomOverlay", local_id, overlay.nodes_, overlay.senders_, name, zero_state_file_hash_, config_, - keyring_, adnl_, rldp_, rldp2_, overlays_, validator_manager_); + "CustomOverlay", local_id, params, zero_state_file_hash_, config_, keyring_, adnl_, rldp_, rldp2_, + overlays_, validator_manager_, actor_id(this)); } } }; try_local_id(adnl_id_); - for (const PublicKeyHash &local_key: local_keys_) { + for (const PublicKeyHash &local_key : local_keys_) { auto it = current_validators_.find(local_key); if (it != current_validators_.end()) { try_local_id(it->second); @@ -549,6 +568,25 @@ void FullNodeImpl::update_ext_msg_overlay(const std::string &name, ExtMsgOverlay } } +void FullNodeImpl::send_block_broadcast_to_custom_overlays(const BlockBroadcast& broadcast) { + if (!custom_overlays_sent_broadcasts_.insert(broadcast.block_id).second) { + return; + } + custom_overlays_sent_broadcasts_lru_.push(broadcast.block_id); + if (custom_overlays_sent_broadcasts_lru_.size() > 256) { + custom_overlays_sent_broadcasts_.erase(custom_overlays_sent_broadcasts_lru_.front()); + custom_overlays_sent_broadcasts_lru_.pop(); + } + for (auto &private_overlay : custom_overlays_) { + for (auto &actor : private_overlay.second.actors_) { + auto local_id = actor.first; + if (private_overlay.second.params_.block_senders_.count(local_id)) { + td::actor::send_closure(actor.second, &FullNodeCustomOverlay::send_broadcast, broadcast.clone()); + } + } + } +} + FullNodeImpl::FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash, FullNodeConfig config, td::actor::ActorId keyring, td::actor::ActorId adnl, td::actor::ActorId rldp, @@ -569,7 +607,6 @@ FullNodeImpl::FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id , client_(client) , db_root_(db_root) , config_(config) { - add_shard(ShardIdFull{masterchainId}); } td::actor::ActorOwn FullNode::create(ton::PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, @@ -598,6 +635,21 @@ bool FullNodeConfig::operator!=(const FullNodeConfig &rhs) const { return !(*this == rhs); } +CustomOverlayParams CustomOverlayParams::fetch(const ton_api::engine_validator_customOverlay& f) { + CustomOverlayParams c; + c.name_ = f.name_; + for (const auto &node : f.nodes_) { + c.nodes_.emplace_back(node->adnl_id_); + if (node->msg_sender_) { + c.msg_senders_[ton::adnl::AdnlNodeIdShort{node->adnl_id_}] = node->msg_sender_priority_; + } + if (node->block_sender_) { + c.block_senders_.emplace(node->adnl_id_); + } + } + return c; +} + } // namespace fullnode } // namespace validator diff --git a/validator/full-node.h b/validator/full-node.h index 82e0dd5c..ac260dd7 100644 --- a/validator/full-node.h +++ b/validator/full-node.h @@ -55,6 +55,15 @@ struct FullNodeConfig { bool ext_messages_broadcast_disabled_ = false; }; +struct CustomOverlayParams { + std::string name_; + std::vector nodes_; + std::map msg_senders_; + std::set block_senders_; + + static CustomOverlayParams fetch(const ton_api::engine_validator_customOverlay& f); +}; + class FullNode : public td::actor::Actor { public: virtual ~FullNode() = default; @@ -74,10 +83,10 @@ class FullNode : public td::actor::Actor { virtual void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise promise) = 0; virtual void set_config(FullNodeConfig config) = 0; - virtual void add_ext_msg_overlay(std::vector nodes, - std::map senders, std::string name, - td::Promise promise) = 0; - virtual void del_ext_msg_overlay(std::string name, td::Promise promise) = 0; + virtual void add_custom_overlay(CustomOverlayParams params, td::Promise promise) = 0; + virtual void del_custom_overlay(std::string name, td::Promise promise) = 0; + + virtual void process_block_broadcast(BlockBroadcast broadcast) = 0; static constexpr td::uint32 max_block_size() { return 4 << 20; diff --git a/validator/full-node.hpp b/validator/full-node.hpp index 86664a3b..946a1705 100644 --- a/validator/full-node.hpp +++ b/validator/full-node.hpp @@ -27,6 +27,7 @@ #include #include +#include namespace ton { @@ -53,9 +54,8 @@ class FullNodeImpl : public FullNode { void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise promise) override; void set_config(FullNodeConfig config) override; - void add_ext_msg_overlay(std::vector nodes, std::map senders, - std::string name, td::Promise promise) override; - void del_ext_msg_overlay(std::string name, td::Promise promise) override; + void add_custom_overlay(CustomOverlayParams params, td::Promise promise) override; + void del_custom_overlay(std::string name, td::Promise promise) override; void add_shard(ShardIdFull shard); void del_shard(ShardIdFull shard); @@ -66,7 +66,7 @@ class FullNodeImpl : public FullNode { void send_ihr_message(AccountIdPrefixFull dst, td::BufferSlice data); void send_ext_message(AccountIdPrefixFull dst, td::BufferSlice data); void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqnp, td::BufferSlice data); - void send_broadcast(BlockBroadcast broadcast); + void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only); void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise promise); void download_zero_state(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise promise); @@ -83,6 +83,8 @@ class FullNodeImpl : public FullNode { void got_key_block_state(td::Ref state); void new_key_block(BlockHandle handle); + void process_block_broadcast(BlockBroadcast broadcast) override; + void start_up() override; FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash, @@ -123,18 +125,19 @@ class FullNodeImpl : public FullNode { std::map> private_block_overlays_; bool private_block_overlays_enable_compression_ = false; - struct ExtMsgOverlayInfo { - std::vector nodes_; - std::map senders_; - std::map> - actors_; // our local id -> actor + struct CustomOverlayInfo { + CustomOverlayParams params_; + std::map> actors_; // our local id -> actor }; - std::map private_custom_overlays_; + std::map custom_overlays_; + std::set custom_overlays_sent_broadcasts_; + std::queue custom_overlays_sent_broadcasts_lru_; void update_private_overlays(); void set_private_block_overlays_enable_compression(bool value); void create_private_block_overlay(PublicKeyHash key); - void update_ext_msg_overlay(const std::string& name, ExtMsgOverlayInfo& overlay); + void update_custom_overlay(CustomOverlayInfo& overlay); + void send_block_broadcast_to_custom_overlays(const BlockBroadcast& broadcast); }; } // namespace fullnode diff --git a/validator/impl/accept-block.cpp b/validator/impl/accept-block.cpp index cda1c787..3da1167a 100644 --- a/validator/impl/accept-block.cpp +++ b/validator/impl/accept-block.cpp @@ -899,11 +899,6 @@ void AcceptBlockQuery::written_block_info_2() { } void AcceptBlockQuery::applied() { - if (!send_broadcast_) { - finish_query(); - return; - } - BlockBroadcast b; b.data = data_->data(); b.block_id = id_; @@ -923,7 +918,8 @@ void AcceptBlockQuery::applied() { } // do not wait for answer - td::actor::send_closure_later(manager_, &ValidatorManager::send_block_broadcast, std::move(b)); + td::actor::send_closure_later(manager_, &ValidatorManager::send_block_broadcast, std::move(b), + /* custom_overlays_only = */ !send_broadcast_); finish_query(); } diff --git a/validator/interfaces/validator-manager.h b/validator/interfaces/validator-manager.h index 278915f1..d435b756 100644 --- a/validator/interfaces/validator-manager.h +++ b/validator/interfaces/validator-manager.h @@ -133,7 +133,7 @@ class ValidatorManager : public ValidatorManagerInterface { virtual void send_external_message(td::Ref message) = 0; virtual void send_ihr_message(td::Ref message) = 0; virtual void send_top_shard_block_description(td::Ref desc) = 0; - virtual void send_block_broadcast(BlockBroadcast broadcast) = 0; + virtual void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) = 0; virtual void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) = 0; virtual void get_shard_client_state(bool from_db, td::Promise promise) = 0; diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index eca6a741..5cf93884 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -251,7 +251,7 @@ class ValidatorManagerImpl : public ValidatorManager { new_ihr_message(message->serialize()); } void send_top_shard_block_description(td::Ref desc) override; - void send_block_broadcast(BlockBroadcast broadcast) override { + void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override { } void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) override; diff --git a/validator/manager-hardfork.hpp b/validator/manager-hardfork.hpp index f0cf83de..fb5237d6 100644 --- a/validator/manager-hardfork.hpp +++ b/validator/manager-hardfork.hpp @@ -317,7 +317,7 @@ class ValidatorManagerImpl : public ValidatorManager { void send_top_shard_block_description(td::Ref desc) override { UNREACHABLE(); } - void send_block_broadcast(BlockBroadcast broadcast) override { + void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override { } void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) override { diff --git a/validator/manager.cpp b/validator/manager.cpp index ddc58225..82cc10c2 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -1503,8 +1503,8 @@ void ValidatorManagerImpl::send_top_shard_block_description(td::Refsend_broadcast(std::move(broadcast)); +void ValidatorManagerImpl::send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) { + callback_->send_broadcast(std::move(broadcast), custom_overlays_only); } void ValidatorManagerImpl::start_up() { diff --git a/validator/manager.hpp b/validator/manager.hpp index d6d0307b..65636a94 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -473,7 +473,7 @@ class ValidatorManagerImpl : public ValidatorManager { void send_external_message(td::Ref message) override; void send_ihr_message(td::Ref message) override; void send_top_shard_block_description(td::Ref desc) override; - void send_block_broadcast(BlockBroadcast broadcast) override; + void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override; void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) override; void get_shard_client_state(bool from_db, td::Promise promise) override; diff --git a/validator/validator.h b/validator/validator.h index 9ede0711..17e55c70 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -134,7 +134,7 @@ class ValidatorManagerInterface : public td::actor::Actor { virtual void send_ihr_message(AccountIdPrefixFull dst, td::BufferSlice data) = 0; virtual void send_ext_message(AccountIdPrefixFull dst, td::BufferSlice data) = 0; virtual void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) = 0; - virtual void send_broadcast(BlockBroadcast broadcast) = 0; + virtual void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only = false) = 0; virtual void download_block(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout, td::Promise promise) = 0; virtual void download_zero_state(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,