1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Send telemetry broadcasts to fast sync overlays

This commit is contained in:
SpyCheese 2024-11-26 10:53:55 +03:00
parent e6aac0b143
commit d9aeab07db
8 changed files with 141 additions and 27 deletions

View file

@ -87,9 +87,42 @@ void FullNodeFastSyncOverlay::process_block_candidate_broadcast(PublicKeyHash sr
validator_set_hash, std::move(data));
}
void FullNodeFastSyncOverlay::process_telemetry_broadcast(
adnl::AdnlNodeIdShort src, const tl_object_ptr<ton_api::validator_telemetry> &telemetry) {
if (telemetry->adnl_id_ != src.bits256_value()) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": adnl_id mismatch";
return;
}
auto now = (td::int32)td::Clocks::system();
if (telemetry->timestamp_ < now - 60) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": too old ("
<< now - telemetry->timestamp_ << "s ago)";
return;
}
if (telemetry->timestamp_ > now + 60) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": too new ("
<< telemetry->timestamp_ - now << "s in the future)";
return;
}
VLOG(FULL_NODE_DEBUG) << "Got telemetry broadcast from " << src;
auto s = td::json_encode<std::string>(td::ToJson(*telemetry), false);
std::erase_if(s, [](char c) { return c == '\n' || c == '\r'; });
telemetry_file_ << s << "\n";
telemetry_file_.flush();
if (telemetry_file_.fail()) {
VLOG(FULL_NODE_WARNING) << "Failed to write telemetry to file";
}
}
void FullNodeFastSyncOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
if (collect_telemetry_ && src != local_id_.pubkey_hash()) {
auto R = fetch_tl_prefix<ton_api::validator_telemetry>(broadcast, true);
if (R.is_ok()) {
process_telemetry_broadcast(adnl::AdnlNodeIdShort{src}, R.ok());
}
}
return;
}
@ -143,6 +176,30 @@ void FullNodeFastSyncOverlay::send_block_candidate(BlockIdExt block_id, Catchain
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}
void FullNodeFastSyncOverlay::send_validator_telemetry(tl_object_ptr<ton_api::validator_telemetry> telemetry) {
process_telemetry_broadcast(local_id_, telemetry);
auto data = serialize_tl_object(telemetry, true);
if (data.size() <= overlay::Overlays::max_simple_broadcast_size()) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(data));
} else {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(data));
}
}
void FullNodeFastSyncOverlay::collect_validator_telemetry(std::string filename) {
if (collect_telemetry_) {
telemetry_file_.close();
}
collect_telemetry_ = true;
LOG(FULL_NODE_WARNING) << "Collecting validator telemetry to " << filename << " (local id: " << local_id_ << ")";
telemetry_file_.open(filename, std::ios_base::app);
if (!telemetry_file_.is_open()) {
LOG(WARNING) << "Cannot open file " << filename << " for validator telemetry";
}
}
void FullNodeFastSyncOverlay::start_up() {
auto X = create_hash_tl_object<ton_api::tonNode_fastSyncOverlayId>(zero_state_file_hash_, create_tl_shard_id(shard_));
td::BufferSlice b{32};
@ -261,14 +318,15 @@ void FullNodeFastSyncOverlay::get_stats_extra(td::Promise<std::string> promise)
promise.set_result(td::json_encode<std::string>(td::ToJson(*res), true));
}
td::actor::ActorId<FullNodeFastSyncOverlay> FullNodeFastSyncOverlays::choose_overlay(ShardIdFull shard) {
std::pair<td::actor::ActorId<FullNodeFastSyncOverlay>, adnl::AdnlNodeIdShort> FullNodeFastSyncOverlays::choose_overlay(
ShardIdFull shard) {
for (auto &p : id_to_overlays_) {
auto &overlays = p.second.overlays_;
ShardIdFull cur_shard = shard;
while (true) {
auto it = overlays.find(cur_shard);
if (it != overlays.end()) {
return it->second.get();
return {it->second.get(), p.first};
}
if (cur_shard.pfx_len() == 0) {
break;
@ -276,7 +334,20 @@ td::actor::ActorId<FullNodeFastSyncOverlay> FullNodeFastSyncOverlays::choose_ove
cur_shard = shard_parent(cur_shard);
}
}
return {};
return {td::actor::ActorId<FullNodeFastSyncOverlay>{}, adnl::AdnlNodeIdShort::zero()};
}
td::actor::ActorId<FullNodeFastSyncOverlay> FullNodeFastSyncOverlays::get_masterchain_overlay_for(
adnl::AdnlNodeIdShort adnl_id) {
auto it = id_to_overlays_.find(adnl_id);
if (it == id_to_overlays_.end()) {
return {};
}
auto it2 = it->second.overlays_.find(ShardIdFull{masterchainId});
if (it2 == it->second.overlays_.end()) {
return {};
}
return it2->second.get();
}
void FullNodeFastSyncOverlays::update_overlays(td::Ref<MasterchainState> state,
@ -291,7 +362,7 @@ void FullNodeFastSyncOverlays::update_overlays(td::Ref<MasterchainState> state,
monitoring_shards.insert(ShardIdFull{masterchainId});
std::set<ShardIdFull> all_shards;
all_shards.insert(ShardIdFull{masterchainId});
for (const auto& desc : state->get_shards()) {
for (const auto &desc : state->get_shards()) {
ShardIdFull shard = desc->shard();
td::uint32 monitor_min_split = state->monitor_min_split_depth(shard.workchain);
if (shard.pfx_len() > monitor_min_split) {