From d9aeab07db471d314b6f9a5ce7641daeb91fd4eb Mon Sep 17 00:00:00 2001 From: SpyCheese Date: Tue, 26 Nov 2024 10:53:55 +0300 Subject: [PATCH] Send telemetry broadcasts to fast sync overlays --- validator-engine/validator-engine.cpp | 2 +- validator/full-node-fast-sync-overlays.cpp | 79 ++++++++++++++++++++-- validator/full-node-fast-sync-overlays.hpp | 13 +++- validator/full-node.cpp | 68 ++++++++++++++----- validator/full-node.hpp | 2 +- validator/impl/validator-set.hpp | 1 - validator/interfaces/validator-set.h | 1 - validator/manager.cpp | 2 +- 8 files changed, 141 insertions(+), 27 deletions(-) diff --git a/validator-engine/validator-engine.cpp b/validator-engine/validator-engine.cpp index 6b91d3eb..16aabde5 100644 --- a/validator-engine/validator-engine.cpp +++ b/validator-engine/validator-engine.cpp @@ -4985,7 +4985,7 @@ int main(int argc, char *argv[]) { }); p.add_option( '\0', "collect-validator-telemetry", - "store validator telemetry from private block overlay to a given file (json format)", + "store validator telemetry from fast sync overlay to a given file (json format)", [&](td::Slice s) { acts.push_back( [&x, s = s.str()]() { diff --git a/validator/full-node-fast-sync-overlays.cpp b/validator/full-node-fast-sync-overlays.cpp index a5789771..270c53f0 100644 --- a/validator/full-node-fast-sync-overlays.cpp +++ b/validator/full-node-fast-sync-overlays.cpp @@ -87,9 +87,42 @@ void FullNodeFastSyncOverlay::process_block_candidate_broadcast(PublicKeyHash sr validator_set_hash, std::move(data)); } +void FullNodeFastSyncOverlay::process_telemetry_broadcast( + adnl::AdnlNodeIdShort src, const tl_object_ptr &telemetry) { + if (telemetry->adnl_id_ != src.bits256_value()) { + VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": adnl_id mismatch"; + return; + } + auto now = (td::int32)td::Clocks::system(); + if (telemetry->timestamp_ < now - 60) { + VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": too old (" + << now - telemetry->timestamp_ << "s ago)"; + return; + } + if (telemetry->timestamp_ > now + 60) { + VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": too new (" + << telemetry->timestamp_ - now << "s in the future)"; + return; + } + VLOG(FULL_NODE_DEBUG) << "Got telemetry broadcast from " << src; + auto s = td::json_encode(td::ToJson(*telemetry), false); + std::erase_if(s, [](char c) { return c == '\n' || c == '\r'; }); + telemetry_file_ << s << "\n"; + telemetry_file_.flush(); + if (telemetry_file_.fail()) { + VLOG(FULL_NODE_WARNING) << "Failed to write telemetry to file"; + } +} + void FullNodeFastSyncOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) { auto B = fetch_tl_object(std::move(broadcast), true); if (B.is_error()) { + if (collect_telemetry_ && src != local_id_.pubkey_hash()) { + auto R = fetch_tl_prefix(broadcast, true); + if (R.is_ok()) { + process_telemetry_broadcast(adnl::AdnlNodeIdShort{src}, R.ok()); + } + } return; } @@ -143,6 +176,30 @@ void FullNodeFastSyncOverlay::send_block_candidate(BlockIdExt block_id, Catchain local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok()); } +void FullNodeFastSyncOverlay::send_validator_telemetry(tl_object_ptr telemetry) { + process_telemetry_broadcast(local_id_, telemetry); + auto data = serialize_tl_object(telemetry, true); + if (data.size() <= overlay::Overlays::max_simple_broadcast_size()) { + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_, + local_id_.pubkey_hash(), 0, std::move(data)); + } else { + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_, + local_id_.pubkey_hash(), 0, std::move(data)); + } +} + +void FullNodeFastSyncOverlay::collect_validator_telemetry(std::string filename) { + if (collect_telemetry_) { + telemetry_file_.close(); + } + collect_telemetry_ = true; + LOG(FULL_NODE_WARNING) << "Collecting validator telemetry to " << filename << " (local id: " << local_id_ << ")"; + telemetry_file_.open(filename, std::ios_base::app); + if (!telemetry_file_.is_open()) { + LOG(WARNING) << "Cannot open file " << filename << " for validator telemetry"; + } +} + void FullNodeFastSyncOverlay::start_up() { auto X = create_hash_tl_object(zero_state_file_hash_, create_tl_shard_id(shard_)); td::BufferSlice b{32}; @@ -261,14 +318,15 @@ void FullNodeFastSyncOverlay::get_stats_extra(td::Promise promise) promise.set_result(td::json_encode(td::ToJson(*res), true)); } -td::actor::ActorId FullNodeFastSyncOverlays::choose_overlay(ShardIdFull shard) { +std::pair, adnl::AdnlNodeIdShort> FullNodeFastSyncOverlays::choose_overlay( + ShardIdFull shard) { for (auto &p : id_to_overlays_) { auto &overlays = p.second.overlays_; ShardIdFull cur_shard = shard; while (true) { auto it = overlays.find(cur_shard); if (it != overlays.end()) { - return it->second.get(); + return {it->second.get(), p.first}; } if (cur_shard.pfx_len() == 0) { break; @@ -276,7 +334,20 @@ td::actor::ActorId FullNodeFastSyncOverlays::choose_ove cur_shard = shard_parent(cur_shard); } } - return {}; + return {td::actor::ActorId{}, adnl::AdnlNodeIdShort::zero()}; +} + +td::actor::ActorId FullNodeFastSyncOverlays::get_masterchain_overlay_for( + adnl::AdnlNodeIdShort adnl_id) { + auto it = id_to_overlays_.find(adnl_id); + if (it == id_to_overlays_.end()) { + return {}; + } + auto it2 = it->second.overlays_.find(ShardIdFull{masterchainId}); + if (it2 == it->second.overlays_.end()) { + return {}; + } + return it2->second.get(); } void FullNodeFastSyncOverlays::update_overlays(td::Ref state, @@ -291,7 +362,7 @@ void FullNodeFastSyncOverlays::update_overlays(td::Ref state, monitoring_shards.insert(ShardIdFull{masterchainId}); std::set all_shards; all_shards.insert(ShardIdFull{masterchainId}); - for (const auto& desc : state->get_shards()) { + for (const auto &desc : state->get_shards()) { ShardIdFull shard = desc->shard(); td::uint32 monitor_min_split = state->monitor_min_split_depth(shard.workchain); if (shard.pfx_len() > monitor_min_split) { diff --git a/validator/full-node-fast-sync-overlays.hpp b/validator/full-node-fast-sync-overlays.hpp index b89f8802..05d83071 100644 --- a/validator/full-node-fast-sync-overlays.hpp +++ b/validator/full-node-fast-sync-overlays.hpp @@ -17,6 +17,7 @@ #pragma once #include "full-node.h" +#include namespace ton::validator::fullnode { @@ -32,6 +33,9 @@ class FullNodeFastSyncOverlay : public td::actor::Actor { void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed& query); void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast& query); + void process_telemetry_broadcast(adnl::AdnlNodeIdShort src, + const tl_object_ptr& telemetry); + template void process_broadcast(PublicKeyHash, T&) { VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast"; @@ -42,6 +46,9 @@ class FullNodeFastSyncOverlay : public td::actor::Actor { void send_broadcast(BlockBroadcast broadcast); void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, td::BufferSlice data); + void send_validator_telemetry(tl_object_ptr telemetry); + + void collect_validator_telemetry(std::string filename); void start_up() override; void tear_down() override; @@ -96,11 +103,15 @@ class FullNodeFastSyncOverlay : public td::actor::Actor { void try_init(); void init(); void get_stats_extra(td::Promise promise); + + bool collect_telemetry_ = false; + std::ofstream telemetry_file_; }; class FullNodeFastSyncOverlays { public: - td::actor::ActorId choose_overlay(ShardIdFull shard); + std::pair, adnl::AdnlNodeIdShort> choose_overlay(ShardIdFull shard); + td::actor::ActorId get_masterchain_overlay_for(adnl::AdnlNodeIdShort adnl_id); void update_overlays(td::Ref state, std::set my_adnl_ids, std::set monitoring_shards, const FileHash& zero_state_file_hash, const td::actor::ActorId& keyring, const td::actor::ActorId& adnl, diff --git a/validator/full-node.cpp b/validator/full-node.cpp index ed3b5bcb..d927eef2 100644 --- a/validator/full-node.cpp +++ b/validator/full-node.cpp @@ -279,6 +279,7 @@ void FullNodeImpl::on_new_masterchain_block(td::Ref state, std fast_sync_overlays_.update_overlays(state, std::move(my_adnl_ids), std::move(monitoring_shards), zero_state_file_hash_, keyring_, adnl_, overlays_, validator_manager_, actor_id(this)); + update_validator_telemetry_collector(); } } @@ -338,7 +339,7 @@ void FullNodeImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_s td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_shard_block_info, block_id, cc_seqno, data.clone()); } - auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(ShardIdFull(masterchainId)); + auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(ShardIdFull(masterchainId)).first; if (!fast_sync_overlay.empty()) { td::actor::send_closure(fast_sync_overlay, &FullNodeFastSyncOverlay::send_shard_block_info, block_id, cc_seqno, data.clone()); @@ -358,7 +359,7 @@ void FullNodeImpl::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_se td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_block_candidate, block_id, cc_seqno, validator_set_hash, data.clone()); } - auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(block_id.shard_full()); + auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(block_id.shard_full()).first; if (!fast_sync_overlay.empty()) { td::actor::send_closure(fast_sync_overlay, &FullNodeFastSyncOverlay::send_block_candidate, block_id, cc_seqno, validator_set_hash, data.clone()); @@ -383,7 +384,7 @@ void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, int mode) { td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_broadcast, broadcast.clone()); } - auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(broadcast.block_id.shard_full()); + auto fast_sync_overlay = fast_sync_overlays_.choose_overlay(broadcast.block_id.shard_full()).first; if (!fast_sync_overlay.empty()) { td::actor::send_closure(fast_sync_overlay, &FullNodeFastSyncOverlay::send_broadcast, broadcast.clone()); } @@ -595,12 +596,21 @@ void FullNodeImpl::new_key_block(BlockHandle handle) { } void FullNodeImpl::send_validator_telemetry(PublicKeyHash key, tl_object_ptr telemetry) { - auto it = private_block_overlays_.find(key); - if (it == private_block_overlays_.end()) { - VLOG(FULL_NODE_INFO) << "Cannot send validator telemetry for " << key << " : no private block overlay"; - return; + if (use_old_private_overlays_) { + auto it = private_block_overlays_.find(key); + if (it == private_block_overlays_.end()) { + VLOG(FULL_NODE_INFO) << "Cannot send validator telemetry for " << key << " : no private block overlay"; + return; + } + td::actor::send_closure(it->second, &FullNodePrivateBlockOverlay::send_validator_telemetry, std::move(telemetry)); + } else { + auto overlay = fast_sync_overlays_.get_masterchain_overlay_for(adnl::AdnlNodeIdShort{telemetry->adnl_id_}); + if (overlay.empty()) { + VLOG(FULL_NODE_INFO) << "Cannot send validator telemetry for adnl id " << key << " : no fast sync overlay"; + return; + } + td::actor::send_closure(overlay, &FullNodeFastSyncOverlay::send_validator_telemetry, std::move(telemetry)); } - td::actor::send_closure(it->second, &FullNodePrivateBlockOverlay::send_validator_telemetry, std::move(telemetry)); } void FullNodeImpl::process_block_broadcast(BlockBroadcast broadcast) { @@ -631,19 +641,43 @@ void FullNodeImpl::set_validator_telemetry_filename(std::string value) { } void FullNodeImpl::update_validator_telemetry_collector() { - if (validator_telemetry_filename_.empty() || private_block_overlays_.empty()) { - validator_telemetry_collector_key_ = PublicKeyHash::zero(); - return; - } - if (!private_block_overlays_.contains(validator_telemetry_collector_key_)) { - auto it = private_block_overlays_.begin(); - validator_telemetry_collector_key_ = it->first; - td::actor::send_closure(it->second, &FullNodePrivateBlockOverlay::collect_validator_telemetry, - validator_telemetry_filename_); + if (use_old_private_overlays_) { + if (validator_telemetry_filename_.empty() || private_block_overlays_.empty()) { + validator_telemetry_collector_key_ = PublicKeyHash::zero(); + return; + } + if (!private_block_overlays_.contains(validator_telemetry_collector_key_)) { + auto it = private_block_overlays_.begin(); + validator_telemetry_collector_key_ = it->first; + td::actor::send_closure(it->second, &FullNodePrivateBlockOverlay::collect_validator_telemetry, + validator_telemetry_filename_); + } + } else { + if (validator_telemetry_filename_.empty()) { + validator_telemetry_collector_key_ = PublicKeyHash::zero(); + return; + } + if (fast_sync_overlays_.get_masterchain_overlay_for(adnl::AdnlNodeIdShort{validator_telemetry_collector_key_}) + .empty()) { + auto [actor, adnl_id] = fast_sync_overlays_.choose_overlay(ShardIdFull{masterchainId}); + validator_telemetry_collector_key_ = adnl_id.pubkey_hash(); + if (!actor.empty()) { + td::actor::send_closure(actor, &FullNodeFastSyncOverlay::collect_validator_telemetry, + validator_telemetry_filename_); + } + } } } void FullNodeImpl::start_up() { + // TODO: enable fast sync overlays by other means (e.g. some config param) + // TODO: in the future - remove the old private overlay entirely + // This env var is for testing + auto fast_sync_env = getenv("TON_FAST_SYNC_OVERLAYS"); + if (fast_sync_env && !strcmp(fast_sync_env, "1")) { + use_old_private_overlays_ = false; + } + update_shard_actor(ShardIdFull{masterchainId}, true); if (local_id_.is_zero()) { if (adnl_id_.is_zero()) { diff --git a/validator/full-node.hpp b/validator/full-node.hpp index 4194407d..9e254d7d 100644 --- a/validator/full-node.hpp +++ b/validator/full-node.hpp @@ -154,7 +154,7 @@ class FullNodeImpl : public FullNode { // Old overlays - one private overlay for all validators // New overlays (fast sync overlays) - semiprivate overlay per shard (monitor_min_split depth) // for validators and authorized nodes - bool use_old_private_overlays_ = false; // TODO: set from config or something + bool use_old_private_overlays_ = true; std::map> private_block_overlays_; bool broadcast_block_candidates_in_public_overlay_ = false; FullNodeFastSyncOverlays fast_sync_overlays_; diff --git a/validator/impl/validator-set.hpp b/validator/impl/validator-set.hpp index 67fd9cd2..951ca4b7 100644 --- a/validator/impl/validator-set.hpp +++ b/validator/impl/validator-set.hpp @@ -51,7 +51,6 @@ class ValidatorSetQ : public ValidatorSet { td::Ref signatures) const override; td::Result check_approve_signatures(RootHash root_hash, FileHash file_hash, td::Ref signatures) const override; - const ValidatorDescr* find_validator(const NodeIdShort& id) const override; ValidatorSetQ* make_copy() const override; diff --git a/validator/interfaces/validator-set.h b/validator/interfaces/validator-set.h index d690937e..ad7fb9b5 100644 --- a/validator/interfaces/validator-set.h +++ b/validator/interfaces/validator-set.h @@ -36,7 +36,6 @@ class ValidatorSet : public td::CntObject { virtual td::uint32 get_validator_set_hash() const = 0; virtual ShardId get_validator_set_from() const = 0; virtual std::vector export_vector() const = 0; - virtual const ValidatorDescr* find_validator(const NodeIdShort& id) const = 0; virtual td::Result check_signatures(RootHash root_hash, FileHash file_hash, td::Ref signatures) const = 0; virtual td::Result check_approve_signatures(RootHash root_hash, FileHash file_hash, diff --git a/validator/manager.cpp b/validator/manager.cpp index c928090f..4c8e6ecc 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -2475,7 +2475,7 @@ td::actor::ActorOwn ValidatorManagerImpl::create_validator_group auto validator_id = get_validator(shard, validator_set); CHECK(!validator_id.is_zero()); - auto descr = validator_set->find_validator(validator_id.bits256_value()); + auto descr = validator_set->get_validator(validator_id.bits256_value()); CHECK(descr); auto adnl_id = adnl::AdnlNodeIdShort{ descr->addr.is_zero() ? ValidatorFullId{descr->key}.compute_short_id().bits256_value() : descr->addr};