1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-02-12 11:12:16 +00:00

Merge branch 'testnet' into accelerator

This commit is contained in:
SpyCheese 2024-11-26 09:28:07 +03:00
commit e6aac0b143
30 changed files with 426 additions and 11 deletions

View file

@ -279,6 +279,9 @@ class HardforkCreator : public td::actor::Actor {
void new_key_block(ton::validator::BlockHandle handle) override {
}
void send_validator_telemetry(ton::PublicKeyHash key,
ton::tl_object_ptr<ton::ton_api::validator_telemetry> telemetry) override {
}
};
td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManagerInterface::install_callback,

View file

@ -68,6 +68,9 @@ void OverlayManager::register_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdS
}
overlays_[local_id][overlay_id] = OverlayDescription{std::move(overlay), std::move(cert)};
if (!with_db_) {
return;
}
auto P =
td::PromiseCreator::lambda([id = overlays_[local_id][overlay_id].overlay.get()](td::Result<DbType::GetResult> R) {
R.ensure();
@ -417,13 +420,19 @@ OverlayManager::OverlayManager(std::string db_root, td::actor::ActorId<keyring::
}
void OverlayManager::start_up() {
std::shared_ptr<td::KeyValue> kv =
std::make_shared<td::RocksDb>(td::RocksDb::open(PSTRING() << db_root_ << "/overlays").move_as_ok());
db_ = DbType{std::move(kv)};
if (!db_root_.empty()) {
with_db_ = true;
std::shared_ptr<td::KeyValue> kv =
std::make_shared<td::RocksDb>(td::RocksDb::open(PSTRING() << db_root_ << "/overlays").move_as_ok());
db_ = DbType{std::move(kv)};
}
}
void OverlayManager::save_to_db(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
std::vector<OverlayNode> nodes) {
if (!with_db_) {
return;
}
std::vector<tl_object_ptr<ton_api::overlay_node>> nodes_vec;
for (auto &n : nodes) {
nodes_vec.push_back(n.tl());

View file

@ -131,6 +131,7 @@ class OverlayManager : public Overlays {
td::actor::ActorId<dht::Dht> dht_node_;
using DbType = td::KeyValueAsync<td::Bits256, td::BufferSlice>;
bool with_db_ = false;
DbType db_;
class AdnlCallback : public adnl::Adnl::Callback {

View file

@ -213,7 +213,7 @@ void OverlayImpl::add_peer(OverlayNode node) {
peer_list_.peers_.insert(id, OverlayPeer(std::move(node)));
del_some_peers();
auto X = peer_list_.peers_.get(id);
if (X != nullptr && peer_list_.neighbours_.size() < max_neighbours() &&
if (X != nullptr && !X->is_neighbour() && peer_list_.neighbours_.size() < max_neighbours() &&
!(X->get_node()->flags() & OverlayMemberFlags::DoNotReceiveBroadcasts) && X->get_id() != local_id_) {
peer_list_.neighbours_.push_back(X->get_id());
X->set_neighbour(true);
@ -440,7 +440,7 @@ void OverlayImpl::update_neighbours(td::uint32 nodes_to_change) {
VLOG(OVERLAY_INFO) << this << ": adding new neighbour " << X->get_id();
peer_list_.neighbours_.push_back(X->get_id());
X->set_neighbour(true);
} else {
} else if (X->is_alive()) {
CHECK(nodes_to_change > 0);
auto i = td::Random::fast(0, static_cast<td::uint32>(peer_list_.neighbours_.size()) - 1);
auto Y = peer_list_.peers_.get(peer_list_.neighbours_[i]);

View file

@ -347,7 +347,12 @@ void OverlayImpl::alarm() {
update_db_at_ = td::Timestamp::in(60.0);
}
update_neighbours(0);
if (update_neighbours_at_.is_in_past()) {
update_neighbours(2);
update_neighbours_at_ = td::Timestamp::in(td::Random::fast(30.0, 120.0));
} else {
update_neighbours(0);
}
alarm_timestamp() = td::Timestamp::in(1.0);
} else {
update_neighbours(0);

View file

@ -391,6 +391,7 @@ class OverlayImpl : public Overlay {
td::Timestamp next_dht_store_query_ = td::Timestamp::in(1.0);
td::Timestamp update_db_at_;
td::Timestamp update_throughput_at_;
td::Timestamp update_neighbours_at_;
td::Timestamp last_throughput_update_;
std::unique_ptr<Overlays::Callback> callback_;

View file

@ -472,4 +472,45 @@ Result<TotalMemStat> get_total_mem_stat() {
#endif
}
Result<uint32> get_cpu_cores() {
#if TD_LINUX
uint32 result = 0;
TRY_RESULT(fd, FileFd::open("/proc/cpuinfo", FileFd::Read));
SCOPE_EXIT {
fd.close();
};
std::string data;
char buf[10000];
while (true) {
TRY_RESULT(size, fd.read(MutableSlice{buf, sizeof(buf) - 1}));
if (size == 0) {
break;
}
buf[size] = '\0';
data += buf;
}
size_t i = 0;
while (i < data.size()) {
const char *line_begin = data.data() + i;
while (i < data.size() && data[i] != '\n') {
++i;
}
auto line_end = data.data() + i;
++i;
Slice line{line_begin, line_end};
size_t j = 0;
while (j < line.size() && line[j] != ' ' && line[j] != '\t' && line[j] != ':') {
++j;
}
Slice name = line.substr(0, j);
if (name == "processor") {
++result;
}
}
return result;
#else
return Status::Error("Not supported");
#endif
}
} // namespace td

View file

@ -70,4 +70,6 @@ struct TotalMemStat {
};
Result<TotalMemStat> get_total_mem_stat() TD_WARN_UNUSED_RESULT;
Result<uint32> get_cpu_cores() TD_WARN_UNUSED_RESULT;
} // namespace td

View file

@ -380,6 +380,9 @@ class TestNode : public td::actor::Actor {
void new_key_block(ton::validator::BlockHandle handle) override {
}
void send_validator_telemetry(ton::PublicKeyHash key,
ton::tl_object_ptr<ton::ton_api::validator_telemetry> telemetry) override {
}
};
td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManagerInterface::install_callback,

View file

@ -595,6 +595,10 @@ validator.group workchain:int shard:long catchain_seqno:int config_hash:int256 m
validator.groupEx workchain:int shard:long vertical_seqno:int catchain_seqno:int config_hash:int256 members:(vector validator.groupMember) = validator.Group;
validator.groupNew workchain:int shard:long vertical_seqno:int last_key_block_seqno:int catchain_seqno:int config_hash:int256 members:(vector validator.groupMember) = validator.Group;
validator.telemetry flags:# timestamp:double adnl_id:int256
node_version:string os_version:string node_started_at:int
ram_size:long cpu_cores:int node_threads:int = validator.Telemetry;
---functions---

Binary file not shown.

View file

@ -2138,6 +2138,10 @@ void ValidatorEngine::start_full_node() {
td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::import_fast_sync_member_certificate,
x.first, x.second);
}
if (!validator_telemetry_filename_.empty()) {
td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::set_validator_telemetry_filename,
validator_telemetry_filename_);
}
load_custom_overlays_config();
} else {
started_full_node();
@ -4979,6 +4983,15 @@ int main(int argc, char *argv[]) {
acts.push_back(
[&x]() { td::actor::send_closure(x, &ValidatorEngine::set_fast_state_serializer_enabled, true); });
});
p.add_option(
'\0', "collect-validator-telemetry",
"store validator telemetry from private block overlay to a given file (json format)",
[&](td::Slice s) {
acts.push_back(
[&x, s = s.str()]() {
td::actor::send_closure(x, &ValidatorEngine::set_validator_telemetry_filename, s);
});
});
auto S = p.run(argc, argv);
if (S.is_error()) {
LOG(ERROR) << "failed to parse options: " << S.move_as_error();

View file

@ -233,6 +233,7 @@ class ValidatorEngine : public td::actor::Actor {
ton::BlockSeqno truncate_seqno_{0};
std::string session_logs_file_;
bool fast_state_serializer_enabled_ = false;
std::string validator_telemetry_filename_;
bool not_all_shards_ = false;
std::vector<ton::ShardIdFull> add_shard_cmds_;
@ -323,6 +324,9 @@ class ValidatorEngine : public td::actor::Actor {
void set_fast_state_serializer_enabled(bool value) {
fast_state_serializer_enabled_ = value;
}
void set_validator_telemetry_filename(std::string value) {
validator_telemetry_filename_ = std::move(value);
}
void set_not_all_shards() {
not_all_shards_ = true;
}

View file

@ -57,6 +57,7 @@ set(VALIDATOR_HEADERS
import-db-slice.hpp
queue-size-counter.hpp
validator-telemetry.hpp
collation-manager.hpp
collator-node.hpp
@ -87,6 +88,7 @@ set(VALIDATOR_SOURCE
validator-group.cpp
validator-options.cpp
queue-size-counter.cpp
validator-telemetry.cpp
downloaders/wait-block-data.cpp
downloaders/wait-block-state.cpp

View file

@ -19,6 +19,9 @@
#include "common/delay.h"
#include "common/checksum.h"
#include "full-node-serializer.hpp"
#include "auto/tl/ton_api_json.h"
#include "td/utils/JsonBuilder.h"
#include "tl/tl_json.h"
namespace ton::validator::fullnode {
@ -85,15 +88,52 @@ void FullNodePrivateBlockOverlay::process_block_candidate_broadcast(PublicKeyHas
validator_set_hash, std::move(data));
}
void FullNodePrivateBlockOverlay::process_telemetry_broadcast(
PublicKeyHash src, const tl_object_ptr<ton_api::validator_telemetry>& telemetry) {
if (telemetry->adnl_id_ != src.bits256_value()) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": adnl_id mismatch";
return;
}
auto now = (td::int32)td::Clocks::system();
if (telemetry->timestamp_ < now - 60) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": too old ("
<< now - telemetry->timestamp_ << "s ago)";
return;
}
if (telemetry->timestamp_ > now + 60) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": too new ("
<< telemetry->timestamp_ - now << "s in the future)";
return;
}
VLOG(FULL_NODE_DEBUG) << "Got telemetry broadcast from " << src;
auto s = td::json_encode<std::string>(td::ToJson(*telemetry), false);
std::erase_if(s, [](char c) {
return c == '\n' || c == '\r';
});
telemetry_file_ << s << "\n";
telemetry_file_.flush();
if (telemetry_file_.fail()) {
VLOG(FULL_NODE_WARNING) << "Failed to write telemetry to file";
}
}
void FullNodePrivateBlockOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
if (adnl::AdnlNodeIdShort{src} == local_id_) {
return;
}
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
if (collect_telemetry_ && src != local_id_.pubkey_hash()) {
auto R = fetch_tl_prefix<ton_api::validator_telemetry>(broadcast, true);
if (R.is_ok()) {
process_telemetry_broadcast(src, R.ok());
}
}
return;
}
ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); });
ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto& obj) {
Self->process_broadcast(src, obj);
});
}
void FullNodePrivateBlockOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno,
@ -144,6 +184,30 @@ void FullNodePrivateBlockOverlay::send_broadcast(BlockBroadcast broadcast) {
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}
void FullNodePrivateBlockOverlay::send_validator_telemetry(tl_object_ptr<ton_api::validator_telemetry> telemetry) {
process_telemetry_broadcast(local_id_.pubkey_hash(), telemetry);
auto data = serialize_tl_object(telemetry, true);
if (data.size() <= overlay::Overlays::max_simple_broadcast_size()) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(data));
} else {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(data));
}
}
void FullNodePrivateBlockOverlay::collect_validator_telemetry(std::string filename) {
if (collect_telemetry_) {
telemetry_file_.close();
}
collect_telemetry_ = true;
LOG(FULL_NODE_WARNING) << "Collecting validator telemetry to " << filename << " (local id: " << local_id_ << ")";
telemetry_file_.open(filename, std::ios_base::app);
if (!telemetry_file_.is_open()) {
LOG(WARNING) << "Cannot open file " << filename << " for validator telemetry";
}
}
void FullNodePrivateBlockOverlay::start_up() {
std::sort(nodes_.begin(), nodes_.end());
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());

View file

@ -17,6 +17,7 @@
#pragma once
#include "full-node.h"
#include <fstream>
namespace ton::validator::fullnode {
@ -32,6 +33,8 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query);
void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);
void process_telemetry_broadcast(PublicKeyHash src, const tl_object_ptr<ton_api::validator_telemetry>& telemetry);
template <class T>
void process_broadcast(PublicKeyHash, T &) {
VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast";
@ -42,6 +45,9 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data);
void send_broadcast(BlockBroadcast broadcast);
void send_validator_telemetry(tl_object_ptr<ton_api::validator_telemetry> telemetry);
void collect_validator_telemetry(std::string filename);
void set_config(FullNodeConfig config) {
config_ = std::move(config);
@ -91,6 +97,9 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
void try_init();
void init();
bool collect_telemetry_ = false;
std::ofstream telemetry_file_;
};
class FullNodeCustomOverlay : public td::actor::Actor {

View file

@ -69,6 +69,7 @@ void FullNodeImpl::del_permanent_key(PublicKeyHash key, td::Promise<td::Unit> pr
}
local_keys_.erase(key);
private_block_overlays_.erase(key);
update_validator_telemetry_collector();
for (auto &p : custom_overlays_) {
update_custom_overlay(p.second);
}
@ -593,6 +594,15 @@ void FullNodeImpl::new_key_block(BlockHandle handle) {
}
}
void FullNodeImpl::send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) {
auto it = private_block_overlays_.find(key);
if (it == private_block_overlays_.end()) {
VLOG(FULL_NODE_INFO) << "Cannot send validator telemetry for " << key << " : no private block overlay";
return;
}
td::actor::send_closure(it->second, &FullNodePrivateBlockOverlay::send_validator_telemetry, std::move(telemetry));
}
void FullNodeImpl::process_block_broadcast(BlockBroadcast broadcast) {
send_block_broadcast_to_custom_overlays(broadcast);
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, std::move(broadcast),
@ -615,6 +625,24 @@ void FullNodeImpl::process_block_candidate_broadcast(BlockIdExt block_id, Catcha
std::move(data));
}
void FullNodeImpl::set_validator_telemetry_filename(std::string value) {
validator_telemetry_filename_ = std::move(value);
update_validator_telemetry_collector();
}
void FullNodeImpl::update_validator_telemetry_collector() {
if (validator_telemetry_filename_.empty() || private_block_overlays_.empty()) {
validator_telemetry_collector_key_ = PublicKeyHash::zero();
return;
}
if (!private_block_overlays_.contains(validator_telemetry_collector_key_)) {
auto it = private_block_overlays_.begin();
validator_telemetry_collector_key_ = it->first;
td::actor::send_closure(it->second, &FullNodePrivateBlockOverlay::collect_validator_telemetry,
validator_telemetry_filename_);
}
}
void FullNodeImpl::start_up() {
update_shard_actor(ShardIdFull{masterchainId}, true);
if (local_id_.is_zero()) {
@ -695,6 +723,9 @@ void FullNodeImpl::start_up() {
void new_key_block(BlockHandle handle) override {
td::actor::send_closure(id_, &FullNodeImpl::new_key_block, std::move(handle));
}
void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) override {
td::actor::send_closure(id_, &FullNodeImpl::send_validator_telemetry, key, std::move(telemetry));
}
explicit Callback(td::actor::ActorId<FullNodeImpl> id) : id_(id) {
}
@ -713,6 +744,7 @@ void FullNodeImpl::update_private_overlays() {
}
private_block_overlays_.clear();
update_validator_telemetry_collector();
if (local_keys_.empty()) {
return;
}
@ -734,6 +766,7 @@ void FullNodeImpl::create_private_block_overlay(PublicKeyHash key) {
private_block_overlays_[key] = td::actor::create_actor<FullNodePrivateBlockOverlay>(
"BlocksPrivateOverlay", current_validators_[key], std::move(nodes), zero_state_file_hash_, config_, keyring_,
adnl_, rldp_, rldp2_, overlays_, validator_manager_, actor_id(this));
update_validator_telemetry_collector();
}
}

View file

@ -94,6 +94,8 @@ class FullNode : public td::actor::Actor {
virtual void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::BufferSlice data) = 0;
virtual void set_validator_telemetry_filename(std::string value) = 0;
virtual void import_fast_sync_member_certificate(adnl::AdnlNodeIdShort local_id,
overlay::OverlayMemberCertificate cert) = 0;

View file

@ -88,11 +88,14 @@ class FullNodeImpl : public FullNode {
void got_key_block_config(td::Ref<ConfigHolder> config);
void new_key_block(BlockHandle handle);
void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry);
void process_block_broadcast(BlockBroadcast broadcast) override;
void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data) override;
void set_validator_telemetry_filename(std::string value) override;
void import_fast_sync_member_certificate(adnl::AdnlNodeIdShort local_id,
overlay::OverlayMemberCertificate cert) override {
fast_sync_overlays_.add_member_certificate(local_id, std::move(cert));
@ -170,6 +173,11 @@ class FullNodeImpl : public FullNode {
void send_block_broadcast_to_custom_overlays(const BlockBroadcast& broadcast);
void send_block_candidate_broadcast_to_custom_overlays(const BlockIdExt& block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, const td::BufferSlice& data);
std::string validator_telemetry_filename_;
PublicKeyHash validator_telemetry_collector_key_ = PublicKeyHash::zero();
void update_validator_telemetry_collector();
};
} // namespace fullnode

View file

@ -28,14 +28,14 @@ namespace ton {
namespace validator {
using td::Ref;
const ValidatorDescr *ValidatorSetQ::find_validator(const NodeIdShort &id) const {
const ValidatorDescr *ValidatorSetQ::get_validator(const NodeIdShort &id) const {
auto it =
std::lower_bound(ids_map_.begin(), ids_map_.end(), id, [](const auto &p, const auto &x) { return p.first < x; });
return it < ids_map_.end() && it->first == id ? &ids_[it->second] : nullptr;
}
bool ValidatorSetQ::is_validator(NodeIdShort id) const {
return find_validator(id);
return get_validator(id);
}
td::Result<ValidatorWeight> ValidatorSetQ::check_signatures(RootHash root_hash, FileHash file_hash,
@ -53,7 +53,7 @@ td::Result<ValidatorWeight> ValidatorSetQ::check_signatures(RootHash root_hash,
}
nodes.insert(sig.node);
auto vdescr = find_validator(sig.node);
auto vdescr = get_validator(sig.node);
if (!vdescr) {
return td::Status::Error(ErrorCode::protoviolation, "unknown node to sign");
}
@ -84,7 +84,7 @@ td::Result<ValidatorWeight> ValidatorSetQ::check_approve_signatures(RootHash roo
}
nodes.insert(sig.node);
auto vdescr = find_validator(sig.node);
auto vdescr = get_validator(sig.node);
if (!vdescr) {
return td::Status::Error(ErrorCode::protoviolation, "unknown node to sign");
}

View file

@ -32,6 +32,7 @@ namespace validator {
class ValidatorSetQ : public ValidatorSet {
public:
const ValidatorDescr* get_validator(const NodeIdShort& id) const override;
bool is_validator(NodeIdShort id) const override;
CatchainSeqno get_catchain_seqno() const override {
return cc_seqno_;

View file

@ -157,6 +157,7 @@ class ValidatorManager : public ValidatorManagerInterface {
virtual void send_ihr_message(td::Ref<IhrMessage> message) = 0;
virtual void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) = 0;
virtual void send_block_broadcast(BlockBroadcast broadcast, int mode) = 0;
virtual void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) = 0;
virtual void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) = 0;

View file

@ -30,6 +30,7 @@ namespace validator {
class ValidatorSet : public td::CntObject {
public:
virtual ~ValidatorSet() = default;
virtual const ValidatorDescr* get_validator(const NodeIdShort& id) const = 0;
virtual bool is_validator(NodeIdShort id) const = 0;
virtual CatchainSeqno get_catchain_seqno() const = 0;
virtual td::uint32 get_validator_set_hash() const = 0;

View file

@ -267,6 +267,8 @@ class ValidatorManagerImpl : public ValidatorManager {
void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override;
void send_block_broadcast(BlockBroadcast broadcast, int mode) override {
}
void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) override {
}
void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override {

View file

@ -337,6 +337,8 @@ class ValidatorManagerImpl : public ValidatorManager {
}
void send_block_broadcast(BlockBroadcast broadcast, int mode) override {
}
void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) override {
}
void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override {

View file

@ -1775,6 +1775,11 @@ void ValidatorManagerImpl::send_block_broadcast(BlockBroadcast broadcast, int mo
callback_->send_broadcast(std::move(broadcast), mode);
}
void ValidatorManagerImpl::send_validator_telemetry(PublicKeyHash key,
tl_object_ptr<ton_api::validator_telemetry> telemetry) {
callback_->send_validator_telemetry(key, std::move(telemetry));
}
void ValidatorManagerImpl::send_get_out_msg_queue_proof_request(
ShardIdFull dst_shard, std::vector<BlockIdExt> blocks, block::ImportedMsgQueueLimits limits,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) {
@ -1900,6 +1905,7 @@ void ValidatorManagerImpl::started(ValidatorManagerInitResult R) {
if (opts_->nonfinal_ls_queries_enabled()) {
candidates_buffer_ = td::actor::create_actor<CandidatesBuffer>("candidates-buffer", actor_id(this));
}
init_validator_telemetry();
auto Q = td::PromiseCreator::lambda(
[SelfId = actor_id(this)](td::Result<std::vector<td::Ref<PersistentStateDescription>>> R) {
@ -2091,6 +2097,7 @@ void ValidatorManagerImpl::new_masterchain_block() {
td::actor::send_closure(serializer_, &AsyncStateSerializer::update_last_known_key_block_ts,
last_key_block_handle_->unix_time());
}
init_validator_telemetry();
}
update_shard_overlays();
@ -3677,6 +3684,41 @@ void ValidatorManagerImpl::CheckedExtMsgCounter::before_query() {
}
}
void ValidatorManagerImpl::init_validator_telemetry() {
if (last_masterchain_state_.is_null()) {
return;
}
td::Ref<ValidatorSet> validator_set = last_masterchain_state_->get_total_validator_set(0);
if (validator_set.is_null()) {
validator_telemetry_.clear();
return;
}
std::set<PublicKeyHash> processed;
for (auto& key : temp_keys_) {
if (const ValidatorDescr* desc = validator_set->get_validator(key.bits256_value())) {
processed.insert(key);
adnl::AdnlNodeIdShort adnl_id;
if (desc->addr.is_zero()) {
adnl_id = adnl::AdnlNodeIdShort{ValidatorFullId{desc->key}.compute_short_id()};
} else {
adnl_id = adnl::AdnlNodeIdShort{desc->addr};
}
auto& telemetry = validator_telemetry_[key];
if (telemetry.empty()) {
telemetry = td::actor::create_actor<ValidatorTelemetry>(
"telemetry", key, adnl_id, opts_->zero_block_id().file_hash, actor_id(this));
}
}
}
for (auto it = validator_telemetry_.begin(); it != validator_telemetry_.end();) {
if (processed.contains(it->first)) {
++it;
} else {
it = validator_telemetry_.erase(it);
}
}
}
} // namespace validator
} // namespace ton

View file

@ -34,6 +34,7 @@
#include "rldp/rldp.h"
#include "token-manager.h"
#include "queue-size-counter.hpp"
#include "validator-telemetry.hpp"
#include "impl/candidates-buffer.hpp"
#include "collator-node.hpp"
@ -359,6 +360,7 @@ class ValidatorManagerImpl : public ValidatorManager {
}
void add_temp_key(PublicKeyHash key, td::Promise<td::Unit> promise) override {
temp_keys_.insert(key);
init_validator_telemetry();
promise.set_value(td::Unit());
}
void del_permanent_key(PublicKeyHash key, td::Promise<td::Unit> promise) override {
@ -367,6 +369,7 @@ class ValidatorManagerImpl : public ValidatorManager {
}
void del_temp_key(PublicKeyHash key, td::Promise<td::Unit> promise) override {
temp_keys_.erase(key);
init_validator_telemetry();
promise.set_value(td::Unit());
}
@ -524,6 +527,7 @@ class ValidatorManagerImpl : public ValidatorManager {
void send_ihr_message(td::Ref<IhrMessage> message) override;
void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override;
void send_block_broadcast(BlockBroadcast broadcast, int mode) override;
void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) override;
void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override;
@ -784,6 +788,10 @@ class ValidatorManagerImpl : public ValidatorManager {
void record_validate_query_stats(BlockIdExt block_id, double work_time, double cpu_work_time) override;
RecordedBlockStats &new_block_stats_record(BlockIdExt block_id);
std::map<PublicKeyHash, td::actor::ActorOwn<ValidatorTelemetry>> validator_telemetry_;
void init_validator_telemetry();
struct Collator {
td::actor::ActorOwn<CollatorNode> actor;
std::set<ShardIdFull> shards;

View file

@ -0,0 +1,87 @@
/*
This file is part of TON Blockchain source code.
TON Blockchain is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
TON Blockchain is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with TON Blockchain. If not, see <http://www.gnu.org/licenses/>.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
You must obey the GNU General Public License in all respects for all
of the code used other than OpenSSL. If you modify file(s) with this
exception, you may extend this exception to your version of the file(s),
but you are not obligated to do so. If you do not wish to do so, delete this
exception statement from your version. If you delete this exception statement
from all source files in the program, then also delete it here.
*/
#include "validator-telemetry.hpp"
#include "git.h"
#include "td/utils/Random.h"
#include "td/utils/port/uname.h"
#include "interfaces/validator-manager.h"
namespace ton::validator {
void ValidatorTelemetry::start_up() {
node_version_ = PSTRING() << "validator-engine, Commit: " << GitMetadata::CommitSHA1()
<< ", Date: " << GitMetadata::CommitDate();
os_version_ = td::get_operating_system_version().str();
auto r_total_mem_stat = td::get_total_mem_stat();
if (r_total_mem_stat.is_error()) {
LOG(WARNING) << "Cannot get RAM size: " << r_total_mem_stat.move_as_error();
} else {
ram_size_ = r_total_mem_stat.ok().total_ram;
}
auto r_cpu_cores = td::get_cpu_cores();
if (r_cpu_cores.is_error()) {
LOG(WARNING) << "Cannot get CPU info: " << r_cpu_cores.move_as_error();
} else {
cpu_cores_ = r_cpu_cores.move_as_ok();
}
LOG(DEBUG) << "Initializing validator telemetry, key = " << key_ << ", adnl_id = " << local_id_;
alarm_timestamp().relax(send_telemetry_at_ = td::Timestamp::in(td::Random::fast(30.0, 60.0)));
}
void ValidatorTelemetry::alarm() {
if (send_telemetry_at_.is_in_past()) {
send_telemetry_at_ = td::Timestamp::never();
send_telemetry();
}
alarm_timestamp().relax(send_telemetry_at_);
}
void ValidatorTelemetry::send_telemetry() {
send_telemetry_at_ = td::Timestamp::in(PERIOD);
auto telemetry = create_tl_object<ton_api::validator_telemetry>();
telemetry->flags_ = 0;
telemetry->timestamp_ = td::Clocks::system();
telemetry->adnl_id_ = local_id_.bits256_value();
telemetry->node_version_ = node_version_;
telemetry->os_version_ = os_version_;
telemetry->node_started_at_ = adnl::Adnl::adnl_start_time();
telemetry->ram_size_ = ram_size_;
telemetry->cpu_cores_ = cpu_cores_;
telemetry->node_threads_ = (td::int32)td::actor::SchedulerContext::get()
->scheduler_group()
->schedulers.at(td::actor::SchedulerContext::get()->get_scheduler_id().value())
.cpu_threads_count;
LOG(DEBUG) << "Sending validator telemetry for adnl id " << local_id_;
td::actor::send_closure(manager_, &ValidatorManager::send_validator_telemetry, key_, std::move(telemetry));
}
} // namespace ton::validator

View file

@ -0,0 +1,66 @@
/*
This file is part of TON Blockchain source code.
TON Blockchain is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
TON Blockchain is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with TON Blockchain. If not, see <http://www.gnu.org/licenses/>.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
You must obey the GNU General Public License in all respects for all
of the code used other than OpenSSL. If you modify file(s) with this
exception, you may extend this exception to your version of the file(s),
but you are not obligated to do so. If you do not wish to do so, delete this
exception statement from your version. If you delete this exception statement
from all source files in the program, then also delete it here.
*/
#pragma once
#include "overlay.h"
#include "td/actor/actor.h"
#include "adnl/adnl.h"
#include "interfaces/shard.h"
namespace ton::validator {
class ValidatorManager;
class ValidatorTelemetry : public td::actor::Actor {
public:
ValidatorTelemetry(PublicKeyHash key, adnl::AdnlNodeIdShort local_id, td::Bits256 zero_state_file_hash,
td::actor::ActorId<ValidatorManager> manager)
: key_(key)
, local_id_(local_id)
, zero_state_file_hash_(zero_state_file_hash)
, manager_(std::move(manager)) {
}
void start_up() override;
void alarm() override;
private:
PublicKeyHash key_;
adnl::AdnlNodeIdShort local_id_;
td::Bits256 zero_state_file_hash_;
td::actor::ActorId<ValidatorManager> manager_;
std::string node_version_;
std::string os_version_;
td::uint32 cpu_cores_ = 0;
td::uint64 ram_size_ = 0;
td::Timestamp send_telemetry_at_ = td::Timestamp::never();
void send_telemetry();
static constexpr double PERIOD = 600.0;
static constexpr td::uint32 MAX_SIZE = 8192;
};
} // namespace ton::validator

View file

@ -217,6 +217,7 @@ class ValidatorManagerInterface : public td::actor::Actor {
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) = 0;
virtual void new_key_block(BlockHandle handle) = 0;
virtual void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) = 0;
};
virtual ~ValidatorManagerInterface() = default;