1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-02-12 11:12:16 +00:00

Enhance overlay stats output (#386)

* Expand overlay stats

* Add scope and peer broadcast errors to stats

* Add json output format
Co-authored-by: EmelyanenkoK <emelyanenko.kirill@gmail.com>
This commit is contained in:
Tsenilov Oleg 2022-06-07 16:24:26 +03:00 committed by GitHub
parent fea912e05c
commit e30049930a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
30 changed files with 396 additions and 38 deletions

View file

@ -39,6 +39,9 @@ class AdnlNetworkConnectionUdp : public AdnlNetworkConnection {
void start_up() override { void start_up() override {
callback_->on_change_state(true); callback_->on_change_state(true);
} }
void get_ip_str(td::Promise<td::string> promise) override {
promise.set_value(PSTRING() << addr_.get_ip_str().str() << ":" << addr_.get_port());
}
AdnlNetworkConnectionUdp(td::actor::ActorId<AdnlNetworkManager> network_manager, td::uint32 ip, td::uint16 port, AdnlNetworkConnectionUdp(td::actor::ActorId<AdnlNetworkManager> network_manager, td::uint32 ip, td::uint16 port,
std::unique_ptr<AdnlNetworkConnection::Callback> callback); std::unique_ptr<AdnlNetworkConnection::Callback> callback);
@ -88,6 +91,9 @@ class AdnlNetworkConnectionTunnel : public AdnlNetworkConnection {
pub_key_hash_ = pub_key_.compute_short_id(); pub_key_hash_ = pub_key_.compute_short_id();
//ready_.store(true, std::memory_order_release); //ready_.store(true, std::memory_order_release);
} }
void get_ip_str(td::Promise<td::string> promise) override {
promise.set_value("tunnel");
}
AdnlNetworkConnectionTunnel(td::actor::ActorId<AdnlNetworkManager> network_manager, td::actor::ActorId<Adnl> adnl, AdnlNetworkConnectionTunnel(td::actor::ActorId<AdnlNetworkManager> network_manager, td::actor::ActorId<Adnl> adnl,
adnl::AdnlNodeIdShort adnl_id, PublicKey pubkey, adnl::AdnlNodeIdShort adnl_id, PublicKey pubkey,

View file

@ -50,6 +50,8 @@ class AdnlNetworkConnection : public td::actor::Actor {
virtual void send(AdnlNodeIdShort src, AdnlNodeIdShort dst, td::uint32 priority, td::BufferSlice message) = 0; virtual void send(AdnlNodeIdShort src, AdnlNodeIdShort dst, td::uint32 priority, td::BufferSlice message) = 0;
virtual bool is_alive() const = 0; virtual bool is_alive() const = 0;
virtual bool is_active() const = 0; virtual bool is_active() const = 0;
virtual void get_ip_str(td::Promise<td::string> promise) = 0;
virtual ~AdnlNetworkConnection() = default; virtual ~AdnlNetworkConnection() = default;
}; };

View file

@ -376,6 +376,15 @@ void AdnlPeerTableImpl::create_tunnel(AdnlNodeIdShort dst, td::uint32 size,
td::Promise<std::pair<td::actor::ActorOwn<AdnlTunnel>, AdnlAddress>> promise) { td::Promise<std::pair<td::actor::ActorOwn<AdnlTunnel>, AdnlAddress>> promise) {
} }
void AdnlPeerTableImpl::get_conn_ip_str(AdnlNodeIdShort l_id, AdnlNodeIdShort p_id, td::Promise<td::string> promise) {
auto it = peers_.find(p_id);
if (it == peers_.end()) {
promise.set_value("undefined");
return;
}
td::actor::send_closure(it->second, &AdnlPeer::get_conn_ip_str, l_id, std::move(promise));
}
} // namespace adnl } // namespace adnl
} // namespace ton } // namespace ton

View file

@ -110,6 +110,7 @@ class AdnlPeerTable : public Adnl {
virtual void deliver_query(AdnlNodeIdShort src, AdnlNodeIdShort dst, td::BufferSlice data, virtual void deliver_query(AdnlNodeIdShort src, AdnlNodeIdShort dst, td::BufferSlice data,
td::Promise<td::BufferSlice> promise) = 0; td::Promise<td::BufferSlice> promise) = 0;
virtual void decrypt_message(AdnlNodeIdShort dst, td::BufferSlice data, td::Promise<td::BufferSlice> promise) = 0; virtual void decrypt_message(AdnlNodeIdShort dst, td::BufferSlice data, td::Promise<td::BufferSlice> promise) = 0;
virtual void get_conn_ip_str(AdnlNodeIdShort l_id, AdnlNodeIdShort p_id, td::Promise<td::string> promise) = 0;
}; };
} // namespace adnl } // namespace adnl

View file

@ -102,6 +102,7 @@ class AdnlPeerTableImpl : public AdnlPeerTable {
void create_tunnel(AdnlNodeIdShort dst, td::uint32 size, void create_tunnel(AdnlNodeIdShort dst, td::uint32 size,
td::Promise<std::pair<td::actor::ActorOwn<AdnlTunnel>, AdnlAddress>> promise) override; td::Promise<std::pair<td::actor::ActorOwn<AdnlTunnel>, AdnlAddress>> promise) override;
void get_conn_ip_str(AdnlNodeIdShort l_id, AdnlNodeIdShort p_id, td::Promise<td::string> promise) override;
struct PrintId {}; struct PrintId {};
PrintId print_id() const { PrintId print_id() const {

View file

@ -725,6 +725,28 @@ void AdnlPeerPairImpl::update_addr_list(AdnlAddressList addr_list) {
(priority ? priority_addr_list_ : addr_list_) = addr_list; (priority ? priority_addr_list_ : addr_list_) = addr_list;
} }
void AdnlPeerPairImpl::get_conn_ip_str(td::Promise<td::string> promise) {
if (conns_.size() == 0 && priority_conns_.size() == 0) {
promise.set_value("undefined");
return;
}
for (auto &conn : priority_conns_) {
if (conn.ready()) {
td::actor::send_closure(conn.conn, &AdnlNetworkConnection::get_ip_str, std::move(promise));
return;
}
}
for (auto &conn : conns_) {
if (conn.ready()) {
td::actor::send_closure(conn.conn, &AdnlNetworkConnection::get_ip_str, std::move(promise));
return;
}
}
promise.set_value("undefined");
}
void AdnlPeerImpl::update_id(AdnlNodeIdFull id) { void AdnlPeerImpl::update_id(AdnlNodeIdFull id) {
CHECK(id.compute_short_id() == peer_id_short_); CHECK(id.compute_short_id() == peer_id_short_);
if (!peer_id_.empty()) { if (!peer_id_.empty()) {
@ -841,6 +863,16 @@ void AdnlPeerImpl::update_dht_node(td::actor::ActorId<dht::Dht> dht_node) {
} }
} }
void AdnlPeerImpl::get_conn_ip_str(AdnlNodeIdShort l_id, td::Promise<td::string> promise) {
auto it = peer_pairs_.find(l_id);
if (it == peer_pairs_.end()) {
promise.set_value("undefined");
return;
}
td::actor::send_closure(it->second, &AdnlPeerPair::get_conn_ip_str, std::move(promise));
}
void AdnlPeerImpl::update_addr_list(AdnlNodeIdShort local_id, td::uint32 local_mode, void AdnlPeerImpl::update_addr_list(AdnlNodeIdShort local_id, td::uint32 local_mode,
td::actor::ActorId<AdnlLocalId> local_actor, AdnlAddressList addr_list) { td::actor::ActorId<AdnlLocalId> local_actor, AdnlAddressList addr_list) {
auto it = peer_pairs_.find(local_id); auto it = peer_pairs_.find(local_id);

View file

@ -58,6 +58,7 @@ class AdnlPeerPair : public td::actor::Actor {
virtual void update_dht_node(td::actor::ActorId<dht::Dht> dht_node) = 0; virtual void update_dht_node(td::actor::ActorId<dht::Dht> dht_node) = 0;
virtual void update_peer_id(AdnlNodeIdFull id) = 0; virtual void update_peer_id(AdnlNodeIdFull id) = 0;
virtual void update_addr_list(AdnlAddressList addr_list) = 0; virtual void update_addr_list(AdnlAddressList addr_list) = 0;
virtual void get_conn_ip_str(td::Promise<td::string> promise) = 0;
static td::actor::ActorOwn<AdnlPeerPair> create(td::actor::ActorId<AdnlNetworkManager> network_manager, static td::actor::ActorOwn<AdnlPeerPair> create(td::actor::ActorId<AdnlNetworkManager> network_manager,
td::actor::ActorId<AdnlPeerTable> peer_table, td::uint32 local_mode, td::actor::ActorId<AdnlPeerTable> peer_table, td::uint32 local_mode,
@ -98,6 +99,7 @@ class AdnlPeer : public td::actor::Actor {
virtual void update_addr_list(AdnlNodeIdShort local_id, td::uint32 local_mode, virtual void update_addr_list(AdnlNodeIdShort local_id, td::uint32 local_mode,
td::actor::ActorId<AdnlLocalId> local_actor, AdnlAddressList addr_list) = 0; td::actor::ActorId<AdnlLocalId> local_actor, AdnlAddressList addr_list) = 0;
virtual void update_dht_node(td::actor::ActorId<dht::Dht> dht_node) = 0; virtual void update_dht_node(td::actor::ActorId<dht::Dht> dht_node) = 0;
virtual void get_conn_ip_str(AdnlNodeIdShort l_id, td::Promise<td::string> promise) = 0;
}; };
} // namespace adnl } // namespace adnl

View file

@ -88,6 +88,8 @@ class AdnlPeerPairImpl : public AdnlPeerPair {
void update_addr_list(AdnlAddressList addr_list) override; void update_addr_list(AdnlAddressList addr_list) override;
void update_peer_id(AdnlNodeIdFull id) override; void update_peer_id(AdnlNodeIdFull id) override;
void get_conn_ip_str(td::Promise<td::string> promise) override;
void got_data_from_db(td::Result<AdnlDbItem> R); void got_data_from_db(td::Result<AdnlDbItem> R);
void got_data_from_static_nodes(td::Result<AdnlNode> R); void got_data_from_static_nodes(td::Result<AdnlNode> R);
void got_data_from_dht(td::Result<AdnlNode> R); void got_data_from_dht(td::Result<AdnlNode> R);
@ -265,6 +267,7 @@ class AdnlPeerImpl : public AdnlPeer {
void update_addr_list(AdnlNodeIdShort local_id, td::uint32 local_mode, td::actor::ActorId<AdnlLocalId> local_actor, void update_addr_list(AdnlNodeIdShort local_id, td::uint32 local_mode, td::actor::ActorId<AdnlLocalId> local_actor,
AdnlAddressList addr_list) override; AdnlAddressList addr_list) override;
void update_dht_node(td::actor::ActorId<dht::Dht> dht_node) override; void update_dht_node(td::actor::ActorId<dht::Dht> dht_node) override;
void get_conn_ip_str(AdnlNodeIdShort l_id, td::Promise<td::string> promise) override;
//void check_signature(td::BufferSlice data, td::BufferSlice signature, td::Promise<td::Unit> promise) override; //void check_signature(td::BufferSlice data, td::BufferSlice signature, td::Promise<td::Unit> promise) override;
AdnlPeerImpl(td::actor::ActorId<AdnlNetworkManager> network_manager, td::actor::ActorId<AdnlPeerTable> peer_table, AdnlPeerImpl(td::actor::ActorId<AdnlNetworkManager> network_manager, td::actor::ActorId<AdnlPeerTable> peer_table,

View file

@ -56,6 +56,7 @@ class AdnlSenderInterface : public td::actor::Actor {
virtual void send_query_ex(AdnlNodeIdShort src, AdnlNodeIdShort dst, std::string name, virtual void send_query_ex(AdnlNodeIdShort src, AdnlNodeIdShort dst, std::string name,
td::Promise<td::BufferSlice> promise, td::Timestamp timeout, td::BufferSlice data, td::Promise<td::BufferSlice> promise, td::Timestamp timeout, td::BufferSlice data,
td::uint64 max_answer_size) = 0; td::uint64 max_answer_size) = 0;
virtual void get_conn_ip_str(AdnlNodeIdShort l_id, AdnlNodeIdShort p_id, td::Promise<td::string> promise) = 0;
}; };
class AdnlTunnel : public td::actor::Actor {}; class AdnlTunnel : public td::actor::Actor {};

View file

@ -80,6 +80,7 @@ td::Status BroadcastSimple::distribute() {
void BroadcastSimple::broadcast_checked(td::Result<td::Unit> R) { void BroadcastSimple::broadcast_checked(td::Result<td::Unit> R) {
if (R.is_error()) { if (R.is_error()) {
td::actor::send_closure(actor_id(overlay_), &OverlayImpl::update_peer_err_ctr, src_peer_id_, false);
return; return;
} }
is_valid_ = true; is_valid_ = true;
@ -114,7 +115,7 @@ td::Status BroadcastSimple::run() {
return run_continue(); return run_continue();
} }
td::Status BroadcastSimple::create(OverlayImpl *overlay, tl_object_ptr<ton_api::overlay_broadcast> broadcast) { td::Status BroadcastSimple::create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id, tl_object_ptr<ton_api::overlay_broadcast> broadcast) {
auto src = PublicKey{broadcast->src_}; auto src = PublicKey{broadcast->src_};
auto data_hash = sha256_bits256(broadcast->data_.as_slice()); auto data_hash = sha256_bits256(broadcast->data_.as_slice());
auto broadcast_hash = compute_broadcast_id(src, data_hash, broadcast->flags_); auto broadcast_hash = compute_broadcast_id(src, data_hash, broadcast->flags_);
@ -125,7 +126,7 @@ td::Status BroadcastSimple::create(OverlayImpl *overlay, tl_object_ptr<ton_api::
auto B = std::make_unique<BroadcastSimple>(broadcast_hash, src, std::move(cert), broadcast->flags_, auto B = std::make_unique<BroadcastSimple>(broadcast_hash, src, std::move(cert), broadcast->flags_,
std::move(broadcast->data_), broadcast->date_, std::move(broadcast->data_), broadcast->date_,
std::move(broadcast->signature_), false, overlay); std::move(broadcast->signature_), false, overlay, src_peer_id);
TRY_STATUS(B->run()); TRY_STATUS(B->run());
overlay->register_simple_broadcast(std::move(B)); overlay->register_simple_broadcast(std::move(B));
return td::Status::OK(); return td::Status::OK();
@ -139,7 +140,7 @@ td::Status BroadcastSimple::create_new(td::actor::ActorId<OverlayImpl> overlay,
auto date = static_cast<td::uint32>(td::Clocks::system()); auto date = static_cast<td::uint32>(td::Clocks::system());
auto B = std::make_unique<BroadcastSimple>(broadcast_hash, PublicKey{}, nullptr, flags, std::move(data), date, auto B = std::make_unique<BroadcastSimple>(broadcast_hash, PublicKey{}, nullptr, flags, std::move(data), date,
td::BufferSlice{}, false, nullptr); td::BufferSlice{}, false, nullptr, adnl::AdnlNodeIdShort::zero());
auto to_sign = B->to_sign(); auto to_sign = B->to_sign();
auto P = td::PromiseCreator::lambda( auto P = td::PromiseCreator::lambda(

View file

@ -49,6 +49,8 @@ class BroadcastSimple : public td::ListNode {
OverlayImpl *overlay_; OverlayImpl *overlay_;
adnl::AdnlNodeIdShort src_peer_id_ = adnl::AdnlNodeIdShort::zero();
td::Status check_time(); td::Status check_time();
td::Status check_duplicate(); td::Status check_duplicate();
td::Status check_source(); td::Status check_source();
@ -61,7 +63,7 @@ class BroadcastSimple : public td::ListNode {
public: public:
BroadcastSimple(Overlay::BroadcastHash broadcast_hash, PublicKey source, std::shared_ptr<Certificate> cert, BroadcastSimple(Overlay::BroadcastHash broadcast_hash, PublicKey source, std::shared_ptr<Certificate> cert,
td::uint32 flags, td::BufferSlice data, td::uint32 date, td::BufferSlice signature, bool is_valid, td::uint32 flags, td::BufferSlice data, td::uint32 date, td::BufferSlice signature, bool is_valid,
OverlayImpl *overlay) OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id)
: broadcast_hash_(broadcast_hash) : broadcast_hash_(broadcast_hash)
, source_(std::move(source)) , source_(std::move(source))
, cert_(std::move(cert)) , cert_(std::move(cert))
@ -70,7 +72,8 @@ class BroadcastSimple : public td::ListNode {
, date_(date) , date_(date)
, signature_(std::move(signature)) , signature_(std::move(signature))
, is_valid_(is_valid) , is_valid_(is_valid)
, overlay_(overlay) { , overlay_(overlay)
, src_peer_id_(src_peer_id) {
} }
Overlay::BroadcastHash get_hash() const { Overlay::BroadcastHash get_hash() const {
@ -98,7 +101,7 @@ class BroadcastSimple : public td::ListNode {
void update_overlay(OverlayImpl *overlay); void update_overlay(OverlayImpl *overlay);
void broadcast_checked(td::Result<td::Unit> R); void broadcast_checked(td::Result<td::Unit> R);
static td::Status create(OverlayImpl *overlay, tl_object_ptr<ton_api::overlay_broadcast> broadcast); static td::Status create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id, tl_object_ptr<ton_api::overlay_broadcast> broadcast);
static td::Status create_new(td::actor::ActorId<OverlayImpl> overlay, td::actor::ActorId<keyring::Keyring> keyring, static td::Status create_new(td::actor::ActorId<OverlayImpl> overlay, td::actor::ActorId<keyring::Keyring> keyring,
PublicKeyHash local_id, td::BufferSlice data, td::uint32 flags); PublicKeyHash local_id, td::BufferSlice data, td::uint32 flags);

View file

@ -88,6 +88,7 @@ td::Status OverlayFecBroadcastPart::run_checks() {
void BroadcastFec::broadcast_checked(td::Result<td::Unit> R) { void BroadcastFec::broadcast_checked(td::Result<td::Unit> R) {
if (R.is_error()) { if (R.is_error()) {
td::actor::send_closure(actor_id(overlay_), &OverlayImpl::update_peer_err_ctr, src_peer_id_, true);
return; return;
} }
overlay_->deliver_broadcast(get_source().compute_short_id(), data_.clone()); overlay_->deliver_broadcast(get_source().compute_short_id(), data_.clone());
@ -155,6 +156,7 @@ td::Status OverlayFecBroadcastPart::apply() {
if (!bcast_->finalized()) { if (!bcast_->finalized()) {
bcast_->set_overlay(overlay_); bcast_->set_overlay(overlay_);
bcast_->set_src_peer_id(src_peer_id_);
TRY_STATUS(bcast_->add_part(seqno_, data_.clone(), export_serialized_short(), export_serialized())); TRY_STATUS(bcast_->add_part(seqno_, data_.clone(), export_serialized_short(), export_serialized()));
auto R = bcast_->finish(); auto R = bcast_->finish();
if (R.is_error()) { if (R.is_error()) {
@ -211,7 +213,7 @@ td::BufferSlice OverlayFecBroadcastPart::to_sign() {
return serialize_tl_object(obj, true); return serialize_tl_object(obj, true);
} }
td::Status OverlayFecBroadcastPart::create(OverlayImpl *overlay, td::Status OverlayFecBroadcastPart::create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id,
tl_object_ptr<ton_api::overlay_broadcastFec> broadcast) { tl_object_ptr<ton_api::overlay_broadcastFec> broadcast) {
TRY_STATUS(overlay->check_date(broadcast->date_)); TRY_STATUS(overlay->check_date(broadcast->date_));
auto source = PublicKey{broadcast->src_}; auto source = PublicKey{broadcast->src_};
@ -244,12 +246,13 @@ td::Status OverlayFecBroadcastPart::create(OverlayImpl *overlay,
std::move(broadcast->signature_), std::move(broadcast->signature_),
false, false,
overlay->get_fec_broadcast(broadcast_hash), overlay->get_fec_broadcast(broadcast_hash),
overlay}; overlay,
src_peer_id};
TRY_STATUS(B.run()); TRY_STATUS(B.run());
return td::Status::OK(); return td::Status::OK();
} }
td::Status OverlayFecBroadcastPart::create(OverlayImpl *overlay, td::Status OverlayFecBroadcastPart::create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id,
tl_object_ptr<ton_api::overlay_broadcastFecShort> broadcast) { tl_object_ptr<ton_api::overlay_broadcastFecShort> broadcast) {
auto bcast = overlay->get_fec_broadcast(broadcast->broadcast_hash_); auto bcast = overlay->get_fec_broadcast(broadcast->broadcast_hash_);
if (!bcast) { if (!bcast) {
@ -285,7 +288,8 @@ td::Status OverlayFecBroadcastPart::create(OverlayImpl *overlay,
std::move(broadcast->signature_), std::move(broadcast->signature_),
true, true,
bcast, bcast,
overlay}; overlay,
src_peer_id};
TRY_STATUS(B.run()); TRY_STATUS(B.run());
return td::Status::OK(); return td::Status::OK();
} }
@ -300,7 +304,7 @@ td::Status OverlayFecBroadcastPart::create_new(OverlayImpl *overlay, td::actor::
auto B = std::make_unique<OverlayFecBroadcastPart>( auto B = std::make_unique<OverlayFecBroadcastPart>(
broadcast_hash, part_hash, PublicKey{}, overlay->get_certificate(local_id), data_hash, size, flags, broadcast_hash, part_hash, PublicKey{}, overlay->get_certificate(local_id), data_hash, size, flags,
part_data_hash, std::move(part), seqno, std::move(fec_type), date, td::BufferSlice{}, false, nullptr, overlay); part_data_hash, std::move(part), seqno, std::move(fec_type), date, td::BufferSlice{}, false, nullptr, overlay, adnl::AdnlNodeIdShort::zero());
auto to_sign = B->to_sign(); auto to_sign = B->to_sign();
auto P = td::PromiseCreator::lambda( auto P = td::PromiseCreator::lambda(

View file

@ -194,6 +194,9 @@ class BroadcastFec : public td::ListNode {
void set_overlay(OverlayImpl *overlay) { void set_overlay(OverlayImpl *overlay) {
overlay_ = overlay; overlay_ = overlay;
} }
void set_src_peer_id(adnl::AdnlNodeIdShort src_peer_id) {
src_peer_id_ = src_peer_id;
}
td::Status distribute_part(td::uint32 seqno); td::Status distribute_part(td::uint32 seqno);
@ -220,6 +223,7 @@ class BroadcastFec : public td::ListNode {
std::map<td::uint32, std::pair<td::BufferSlice, td::BufferSlice>> parts_; std::map<td::uint32, std::pair<td::BufferSlice, td::BufferSlice>> parts_;
OverlayImpl *overlay_; OverlayImpl *overlay_;
adnl::AdnlNodeIdShort src_peer_id_ = adnl::AdnlNodeIdShort::zero();
td::BufferSlice data_; td::BufferSlice data_;
}; };
@ -245,6 +249,7 @@ class OverlayFecBroadcastPart : public td::ListNode {
BroadcastFec *bcast_; BroadcastFec *bcast_;
OverlayImpl *overlay_; OverlayImpl *overlay_;
adnl::AdnlNodeIdShort src_peer_id_ = adnl::AdnlNodeIdShort::zero();
td::Status check_time(); td::Status check_time();
td::Status check_duplicate(); td::Status check_duplicate();
@ -260,7 +265,7 @@ class OverlayFecBroadcastPart : public td::ListNode {
std::shared_ptr<Certificate> cert, Overlay::BroadcastDataHash data_hash, td::uint32 data_size, std::shared_ptr<Certificate> cert, Overlay::BroadcastDataHash data_hash, td::uint32 data_size,
td::uint32 flags, Overlay::BroadcastDataHash part_data_hash, td::BufferSlice data, td::uint32 flags, Overlay::BroadcastDataHash part_data_hash, td::BufferSlice data,
td::uint32 seqno, fec::FecType fec_type, td::uint32 date, td::BufferSlice signature, td::uint32 seqno, fec::FecType fec_type, td::uint32 date, td::BufferSlice signature,
bool is_short, BroadcastFec *bcast, OverlayImpl *overlay) bool is_short, BroadcastFec *bcast, OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id)
: broadcast_hash_(broadcast_hash) : broadcast_hash_(broadcast_hash)
, part_hash_(part_hash) , part_hash_(part_hash)
, source_(std::move(source)) , source_(std::move(source))
@ -276,7 +281,8 @@ class OverlayFecBroadcastPart : public td::ListNode {
, signature_(std::move(signature)) , signature_(std::move(signature))
, is_short_(is_short) , is_short_(is_short)
, bcast_(bcast) , bcast_(bcast)
, overlay_(overlay) { , overlay_(overlay)
, src_peer_id_(src_peer_id) {
} }
td::uint32 data_size() const { td::uint32 data_size() const {
@ -310,8 +316,8 @@ class OverlayFecBroadcastPart : public td::ListNode {
return td::Status::OK(); return td::Status::OK();
} }
static td::Status create(OverlayImpl *overlay, tl_object_ptr<ton_api::overlay_broadcastFec> broadcast); static td::Status create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id, tl_object_ptr<ton_api::overlay_broadcastFec> broadcast);
static td::Status create(OverlayImpl *overlay, tl_object_ptr<ton_api::overlay_broadcastFecShort> broadcast); static td::Status create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id, tl_object_ptr<ton_api::overlay_broadcastFecShort> broadcast);
static td::Status create_new(OverlayImpl *overlay, td::actor::ActorId<OverlayImpl> overlay_actor_id, static td::Status create_new(OverlayImpl *overlay, td::actor::ActorId<OverlayImpl> overlay_actor_id,
PublicKeyHash local_id, Overlay::BroadcastDataHash data_hash, td::uint32 size, PublicKeyHash local_id, Overlay::BroadcastDataHash data_hash, td::uint32 size,
td::uint32 flags, td::BufferSlice part, td::uint32 seqno, fec::FecType fec_type, td::uint32 flags, td::BufferSlice part, td::uint32 seqno, fec::FecType fec_type,

View file

@ -90,12 +90,12 @@ void OverlayManager::delete_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdSho
} }
void OverlayManager::create_public_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, void OverlayManager::create_public_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules) { std::unique_ptr<Callback> callback, OverlayPrivacyRules rules, td::string scope) {
CHECK(!dht_node_.empty()); CHECK(!dht_node_.empty());
auto id = overlay_id.compute_short_id(); auto id = overlay_id.compute_short_id();
register_overlay(local_id, id, register_overlay(local_id, id,
Overlay::create(keyring_, adnl_, actor_id(this), dht_node_, local_id, std::move(overlay_id), Overlay::create(keyring_, adnl_, actor_id(this), dht_node_, local_id, std::move(overlay_id),
std::move(callback), std::move(rules))); std::move(callback), std::move(rules), scope));
} }
void OverlayManager::create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, void OverlayManager::create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
@ -128,6 +128,7 @@ void OverlayManager::receive_message(adnl::AdnlNodeIdShort src, adnl::AdnlNodeId
return; return;
} }
td::actor::send_closure(it2->second, &Overlay::update_throughput_in_ctr, src, (td::uint32)data.size(), false);
td::actor::send_closure(it2->second, &Overlay::receive_message, src, std::move(data)); td::actor::send_closure(it2->second, &Overlay::receive_message, src, std::move(data));
} }
@ -152,12 +153,12 @@ void OverlayManager::receive_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdSh
} }
auto it2 = it->second.find(OverlayIdShort{M->overlay_}); auto it2 = it->second.find(OverlayIdShort{M->overlay_});
if (it2 == it->second.end()) { if (it2 == it->second.end()) {
VLOG(OVERLAY_NOTICE) << this << ": query to localid not in overlay " << M->overlay_ << "@" << dst << " from " VLOG(OVERLAY_NOTICE) << this << ": query to localid not in overlay " << M->overlay_ << "@" << dst << " from " << src;
<< src;
promise.set_error(td::Status::Error(ErrorCode::protoviolation, PSTRING() << "bad overlay_id " << M->overlay_)); promise.set_error(td::Status::Error(ErrorCode::protoviolation, PSTRING() << "bad overlay_id " << M->overlay_));
return; return;
} }
td::actor::send_closure(it2->second, &Overlay::update_throughput_in_ctr, src, (td::uint32)data.size(), true);
td::actor::send_closure(it2->second, &Overlay::receive_query, src, std::move(data), std::move(promise)); td::actor::send_closure(it2->second, &Overlay::receive_query, src, std::move(data), std::move(promise));
} }
@ -166,6 +167,15 @@ void OverlayManager::send_query_via(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdS
td::BufferSlice query, td::uint64 max_answer_size, td::BufferSlice query, td::uint64 max_answer_size,
td::actor::ActorId<adnl::AdnlSenderInterface> via) { td::actor::ActorId<adnl::AdnlSenderInterface> via) {
CHECK(query.size() <= adnl::Adnl::huge_packet_max_size()); CHECK(query.size() <= adnl::Adnl::huge_packet_max_size());
auto it = overlays_.find(src);
if (it != overlays_.end()) {
auto it2 = it->second.find(overlay_id);
if (it2 != it->second.end()) {
td::actor::send_closure(it2->second, &Overlay::update_throughput_out_ctr, dst, (td::uint32)query.size(), true);
}
}
td::actor::send_closure( td::actor::send_closure(
via, &adnl::AdnlSenderInterface::send_query_ex, src, dst, std::move(name), std::move(promise), timeout, via, &adnl::AdnlSenderInterface::send_query_ex, src, dst, std::move(name), std::move(promise), timeout,
create_serialize_tl_object_suffix<ton_api::overlay_query>(query.as_slice(), overlay_id.tl()), max_answer_size); create_serialize_tl_object_suffix<ton_api::overlay_query>(query.as_slice(), overlay_id.tl()), max_answer_size);
@ -174,6 +184,15 @@ void OverlayManager::send_query_via(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdS
void OverlayManager::send_message_via(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdShort src, OverlayIdShort overlay_id, void OverlayManager::send_message_via(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdShort src, OverlayIdShort overlay_id,
td::BufferSlice object, td::actor::ActorId<adnl::AdnlSenderInterface> via) { td::BufferSlice object, td::actor::ActorId<adnl::AdnlSenderInterface> via) {
CHECK(object.size() <= adnl::Adnl::huge_packet_max_size()); CHECK(object.size() <= adnl::Adnl::huge_packet_max_size());
auto it = overlays_.find(src);
if (it != overlays_.end()) {
auto it2 = it->second.find(overlay_id);
if (it2 != it->second.end()) {
td::actor::send_closure(it2->second, &Overlay::update_throughput_out_ctr, dst, (td::uint32)object.size(), false);
}
}
td::actor::send_closure( td::actor::send_closure(
via, &adnl::AdnlSenderInterface::send_message, src, dst, via, &adnl::AdnlSenderInterface::send_message, src, dst,
create_serialize_tl_object_suffix<ton_api::overlay_message>(object.as_slice(), overlay_id.tl())); create_serialize_tl_object_suffix<ton_api::overlay_message>(object.as_slice(), overlay_id.tl()));

View file

@ -51,7 +51,7 @@ class OverlayManager : public Overlays {
void update_dht_node(td::actor::ActorId<dht::Dht> dht) override; void update_dht_node(td::actor::ActorId<dht::Dht> dht) override;
void create_public_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, void create_public_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules) override; std::unique_ptr<Callback> callback, OverlayPrivacyRules rules, td::string scope) override;
void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Callback> callback, std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Callback> callback,
OverlayPrivacyRules rules) override; OverlayPrivacyRules rules) override;

View file

@ -37,10 +37,10 @@ td::actor::ActorOwn<Overlay> Overlay::create(td::actor::ActorId<keyring::Keyring
td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<OverlayManager> manager,
td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id, td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
OverlayIdFull overlay_id, std::unique_ptr<Overlays::Callback> callback, OverlayIdFull overlay_id, std::unique_ptr<Overlays::Callback> callback,
OverlayPrivacyRules rules) { OverlayPrivacyRules rules, td::string scope) {
auto R = td::actor::create_actor<OverlayImpl>("overlay", keyring, adnl, manager, dht_node, local_id, auto R = td::actor::create_actor<OverlayImpl>("overlay", keyring, adnl, manager, dht_node, local_id,
std::move(overlay_id), true, std::vector<adnl::AdnlNodeIdShort>(), std::move(overlay_id), true, std::vector<adnl::AdnlNodeIdShort>(),
std::move(callback), std::move(rules)); std::move(callback), std::move(rules), scope);
return td::actor::ActorOwn<Overlay>(std::move(R)); return td::actor::ActorOwn<Overlay>(std::move(R));
} }
@ -60,7 +60,7 @@ OverlayImpl::OverlayImpl(td::actor::ActorId<keyring::Keyring> keyring, td::actor
td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<dht::Dht> dht_node, td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<dht::Dht> dht_node,
adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, bool pub, adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, bool pub,
std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Overlays::Callback> callback, std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Overlays::Callback> callback,
OverlayPrivacyRules rules) OverlayPrivacyRules rules, td::string scope)
: keyring_(keyring) : keyring_(keyring)
, adnl_(adnl) , adnl_(adnl)
, manager_(manager) , manager_(manager)
@ -69,7 +69,8 @@ OverlayImpl::OverlayImpl(td::actor::ActorId<keyring::Keyring> keyring, td::actor
, id_full_(std::move(overlay_id)) , id_full_(std::move(overlay_id))
, callback_(std::move(callback)) , callback_(std::move(callback))
, public_(pub) , public_(pub)
, rules_(std::move(rules)) { , rules_(std::move(rules))
, scope_(scope) {
overlay_id_ = id_full_.compute_short_id(); overlay_id_ = id_full_.compute_short_id();
VLOG(OVERLAY_INFO) << this << ": creating " << (public_ ? "public" : "private"); VLOG(OVERLAY_INFO) << this << ": creating " << (public_ ? "public" : "private");
@ -161,17 +162,17 @@ void OverlayImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data,
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from, td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_broadcast> bcast) { tl_object_ptr<ton_api::overlay_broadcast> bcast) {
return BroadcastSimple::create(this, std::move(bcast)); return BroadcastSimple::create(this, message_from, std::move(bcast));
} }
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from, td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_broadcastFec> b) { tl_object_ptr<ton_api::overlay_broadcastFec> b) {
return OverlayFecBroadcastPart::create(this, std::move(b)); return OverlayFecBroadcastPart::create(this, message_from, std::move(b));
} }
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from, td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_broadcastFecShort> b) { tl_object_ptr<ton_api::overlay_broadcastFecShort> b) {
return OverlayFecBroadcastPart::create(this, std::move(b)); return OverlayFecBroadcastPart::create(this, message_from, std::move(b));
} }
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from, td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
@ -236,6 +237,35 @@ void OverlayImpl::receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice dat
void OverlayImpl::alarm() { void OverlayImpl::alarm() {
bcast_gc(); bcast_gc();
if(update_throughput_at_.is_in_past()) {
double t_elapsed = td::Time::now() - last_throughput_update_.at();
peers_.iterate([&](const adnl::AdnlNodeIdShort &key, OverlayPeer &peer) {
peer.throughput_out_bytes = static_cast<td::uint32>(peer.throughput_out_bytes_ctr / t_elapsed);
peer.throughput_in_bytes = static_cast<td::uint32>(peer.throughput_in_bytes_ctr / t_elapsed);
peer.throughput_out_packets = static_cast<td::uint32>(peer.throughput_out_packets_ctr / t_elapsed);
peer.throughput_in_packets = static_cast<td::uint32>(peer.throughput_in_packets_ctr / t_elapsed);
peer.throughput_out_bytes_ctr = 0;
peer.throughput_in_bytes_ctr = 0;
peer.throughput_out_packets_ctr = 0;
peer.throughput_in_packets_ctr = 0;
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), peer_id = key](td::Result<td::string> result) {
result.ensure();
td::actor::send_closure(SelfId, &Overlay::update_peer_ip_str, peer_id, result.move_as_ok());
});
td::actor::send_closure(adnl_, &adnl::AdnlSenderInterface::get_conn_ip_str, local_id_, key, std::move(P));
});
update_throughput_at_ = td::Timestamp::in(50.0);
last_throughput_update_ = td::Timestamp::now();
}
if (public_) { if (public_) {
if (peers_.size() > 0) { if (peers_.size() > 0) {
auto P = get_random_peer(); auto P = get_random_peer();
@ -541,6 +571,17 @@ void OverlayImpl::check_broadcast(PublicKeyHash src, td::BufferSlice data, td::P
callback_->check_broadcast(src, overlay_id_, std::move(data), std::move(promise)); callback_->check_broadcast(src, overlay_id_, std::move(data), std::move(promise));
} }
void OverlayImpl::update_peer_err_ctr(adnl::AdnlNodeIdShort peer_id, bool is_fec) {
auto src_peer = peers_.get(peer_id);
if(src_peer) {
if(is_fec) {
src_peer->fec_broadcast_errors++;
} else {
src_peer->broadcast_errors++;
}
}
}
void OverlayImpl::broadcast_checked(Overlay::BroadcastHash hash, td::Result<td::Unit> R) { void OverlayImpl::broadcast_checked(Overlay::BroadcastHash hash, td::Result<td::Unit> R) {
{ {
auto it = broadcasts_.find(hash); auto it = broadcasts_.find(hash);
@ -561,7 +602,26 @@ void OverlayImpl::get_stats(td::Promise<tl_object_ptr<ton_api::engine_validator_
res->adnl_id_ = local_id_.bits256_value(); res->adnl_id_ = local_id_.bits256_value();
res->overlay_id_ = overlay_id_.bits256_value(); res->overlay_id_ = overlay_id_.bits256_value();
res->overlay_id_full_ = id_full_.pubkey().tl(); res->overlay_id_full_ = id_full_.pubkey().tl();
peers_.iterate([&](const adnl::AdnlNodeIdShort &key, const OverlayPeer &peer) { res->nodes_.push_back(key.tl()); }); res->scope_ = scope_;
peers_.iterate([&](const adnl::AdnlNodeIdShort &key, const OverlayPeer &peer) {
auto node_obj = create_tl_object<ton_api::engine_validator_overlayStatsNode>();
node_obj->adnl_id_ = key.bits256_value();
node_obj->t_out_bytes_ = peer.throughput_out_bytes;
node_obj->t_in_bytes_ = peer.throughput_in_bytes;
node_obj->t_out_pckts_ = peer.throughput_out_packets;
node_obj->t_in_pckts_ = peer.throughput_in_packets;
node_obj->ip_addr_ = peer.ip_addr_str;
node_obj->last_in_query_ = static_cast<td::uint32>(peer.last_in_query_at.at_unix());
node_obj->last_out_query_ = static_cast<td::uint32>(peer.last_out_query_at.at_unix());
node_obj->bdcst_errors_ = peer.broadcast_errors;
node_obj->fec_bdcst_errors_ = peer.fec_broadcast_errors;
res->nodes_.push_back(std::move(node_obj));
});
res->stats_.push_back( res->stats_.push_back(
create_tl_object<ton_api::engine_validator_oneStat>("neighbours_cnt", PSTRING() << neighbours_.size())); create_tl_object<ton_api::engine_validator_oneStat>("neighbours_cnt", PSTRING() << neighbours_.size()));

View file

@ -42,7 +42,7 @@ class Overlay : public td::actor::Actor {
td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<OverlayManager> manager,
td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id, td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
OverlayIdFull overlay_id, std::unique_ptr<Overlays::Callback> callback, OverlayIdFull overlay_id, std::unique_ptr<Overlays::Callback> callback,
OverlayPrivacyRules rules); OverlayPrivacyRules rules, td::string scope);
static td::actor::ActorOwn<Overlay> create(td::actor::ActorId<keyring::Keyring> keyring, static td::actor::ActorOwn<Overlay> create(td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<OverlayManager> manager,
@ -64,6 +64,9 @@ class Overlay : public td::actor::Actor {
virtual void set_privacy_rules(OverlayPrivacyRules rules) = 0; virtual void set_privacy_rules(OverlayPrivacyRules rules) = 0;
virtual void receive_nodes_from_db(tl_object_ptr<ton_api::overlay_nodes> nodes) = 0; virtual void receive_nodes_from_db(tl_object_ptr<ton_api::overlay_nodes> nodes) = 0;
virtual void get_stats(td::Promise<tl_object_ptr<ton_api::engine_validator_overlayStats>> promise) = 0; virtual void get_stats(td::Promise<tl_object_ptr<ton_api::engine_validator_overlayStats>> promise) = 0;
virtual void update_throughput_out_ctr(adnl::AdnlNodeIdShort peer_id, td::uint32 msg_size, bool is_query) = 0;
virtual void update_throughput_in_ctr(adnl::AdnlNodeIdShort peer_id, td::uint32 msg_size, bool is_query) = 0;
virtual void update_peer_ip_str(adnl::AdnlNodeIdShort peer_id, td::string ip_str) = 0;
//virtual void receive_broadcast(td::BufferSlice data) = 0; //virtual void receive_broadcast(td::BufferSlice data) = 0;
//virtual void subscribe(std::unique_ptr<Overlays::Callback> callback) = 0; //virtual void subscribe(std::unique_ptr<Overlays::Callback> callback) = 0;
}; };

View file

@ -79,6 +79,26 @@ class OverlayPeer {
td::int32 get_version() const { td::int32 get_version() const {
return node_.version(); return node_.version();
} }
td::uint32 throughput_out_bytes = 0;
td::uint32 throughput_in_bytes = 0;
td::uint32 throughput_out_packets = 0;
td::uint32 throughput_in_packets = 0;
td::uint32 throughput_out_bytes_ctr = 0;
td::uint32 throughput_in_bytes_ctr = 0;
td::uint32 throughput_out_packets_ctr = 0;
td::uint32 throughput_in_packets_ctr = 0;
td::uint32 broadcast_errors = 0;
td::uint32 fec_broadcast_errors = 0;
td::Timestamp last_in_query_at = td::Timestamp::now();
td::Timestamp last_out_query_at = td::Timestamp::now();
td::string ip_addr_str = "undefined";
private: private:
OverlayNode node_; OverlayNode node_;
@ -93,7 +113,7 @@ class OverlayImpl : public Overlay {
td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<dht::Dht> dht_node, td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<dht::Dht> dht_node,
adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, bool pub, adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, bool pub,
std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Overlays::Callback> callback, std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Overlays::Callback> callback,
OverlayPrivacyRules rules); OverlayPrivacyRules rules, td::string scope = "{ \"type\": \"undefined\" }");
void update_dht_node(td::actor::ActorId<dht::Dht> dht) override { void update_dht_node(td::actor::ActorId<dht::Dht> dht) override {
dht_node_ = dht; dht_node_ = dht;
} }
@ -109,6 +129,9 @@ class OverlayImpl : public Overlay {
void alarm() override; void alarm() override;
void start_up() override { void start_up() override {
update_throughput_at_ = td::Timestamp::in(50.0);
last_throughput_update_ = td::Timestamp::now();
if (public_) { if (public_) {
update_db_at_ = td::Timestamp::in(60.0); update_db_at_ = td::Timestamp::in(60.0);
} }
@ -150,6 +173,8 @@ class OverlayImpl : public Overlay {
void broadcast_checked(Overlay::BroadcastHash hash, td::Result<td::Unit> R); void broadcast_checked(Overlay::BroadcastHash hash, td::Result<td::Unit> R);
void check_broadcast(PublicKeyHash src, td::BufferSlice data, td::Promise<td::Unit> promise); void check_broadcast(PublicKeyHash src, td::BufferSlice data, td::Promise<td::Unit> promise);
void update_peer_err_ctr(adnl::AdnlNodeIdShort peer_id, bool is_fec);
BroadcastFec *get_fec_broadcast(BroadcastHash hash); BroadcastFec *get_fec_broadcast(BroadcastHash hash);
void register_fec_broadcast(std::unique_ptr<BroadcastFec> bcast); void register_fec_broadcast(std::unique_ptr<BroadcastFec> bcast);
void register_simple_broadcast(std::unique_ptr<BroadcastSimple> bcast); void register_simple_broadcast(std::unique_ptr<BroadcastSimple> bcast);
@ -191,6 +216,39 @@ class OverlayImpl : public Overlay {
td::Result<Encryptor *> get_encryptor(PublicKey source); td::Result<Encryptor *> get_encryptor(PublicKey source);
void get_stats(td::Promise<tl_object_ptr<ton_api::engine_validator_overlayStats>> promise) override; void get_stats(td::Promise<tl_object_ptr<ton_api::engine_validator_overlayStats>> promise) override;
void update_throughput_out_ctr(adnl::AdnlNodeIdShort peer_id, td::uint32 msg_size, bool is_query) override {
auto out_peer = peers_.get(peer_id);
if(out_peer) {
out_peer->throughput_out_bytes_ctr += msg_size;
out_peer->throughput_out_packets_ctr++;
if(is_query)
{
out_peer->last_out_query_at = td::Timestamp::now();
}
}
}
void update_throughput_in_ctr(adnl::AdnlNodeIdShort peer_id, td::uint32 msg_size, bool is_query) override {
auto in_peer = peers_.get(peer_id);
if(in_peer) {
in_peer->throughput_in_bytes_ctr += msg_size;
in_peer->throughput_in_packets_ctr++;
if(is_query)
{
in_peer->last_in_query_at = td::Timestamp::now();
}
}
}
void update_peer_ip_str(adnl::AdnlNodeIdShort peer_id, td::string ip_str) override {
auto fpeer = peers_.get(peer_id);
if(fpeer) {
fpeer->ip_addr_str = ip_str;
}
}
private: private:
template <class T> template <class T>
@ -236,6 +294,8 @@ class OverlayImpl : public Overlay {
td::DecTree<adnl::AdnlNodeIdShort, OverlayPeer> peers_; td::DecTree<adnl::AdnlNodeIdShort, OverlayPeer> peers_;
td::Timestamp next_dht_query_ = td::Timestamp::in(1.0); td::Timestamp next_dht_query_ = td::Timestamp::in(1.0);
td::Timestamp update_db_at_; td::Timestamp update_db_at_;
td::Timestamp update_throughput_at_;
td::Timestamp last_throughput_update_;
std::unique_ptr<Overlays::Callback> callback_; std::unique_ptr<Overlays::Callback> callback_;
@ -291,6 +351,7 @@ class OverlayImpl : public Overlay {
bool public_; bool public_;
bool semi_public_ = false; bool semi_public_ = false;
OverlayPrivacyRules rules_; OverlayPrivacyRules rules_;
td::string scope_;
std::map<PublicKeyHash, std::shared_ptr<Certificate>> certs_; std::map<PublicKeyHash, std::shared_ptr<Certificate>> certs_;
class CachedEncryptor : public td::ListNode { class CachedEncryptor : public td::ListNode {

View file

@ -193,7 +193,7 @@ class Overlays : public td::actor::Actor {
virtual void update_dht_node(td::actor::ActorId<dht::Dht> dht) = 0; virtual void update_dht_node(td::actor::ActorId<dht::Dht> dht) = 0;
virtual void create_public_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, virtual void create_public_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules) = 0; std::unique_ptr<Callback> callback, OverlayPrivacyRules rules, td::string scope) = 0;
virtual void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, virtual void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Callback> callback, std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Callback> callback,
OverlayPrivacyRules rules) = 0; OverlayPrivacyRules rules) = 0;

View file

@ -99,6 +99,7 @@ class RldpIn : public RldpImpl {
void in_transfer_completed(TransferId transfer_id); void in_transfer_completed(TransferId transfer_id);
void add_id(adnl::AdnlNodeIdShort local_id) override; void add_id(adnl::AdnlNodeIdShort local_id) override;
void get_conn_ip_str(adnl::AdnlNodeIdShort l_id, adnl::AdnlNodeIdShort p_id, td::Promise<td::string> promise) override;
RldpIn(td::actor::ActorId<adnl::AdnlPeerTable> adnl) : adnl_(adnl) { RldpIn(td::actor::ActorId<adnl::AdnlPeerTable> adnl) : adnl_(adnl) {
} }

View file

@ -259,6 +259,10 @@ void RldpIn::add_id(adnl::AdnlNodeIdShort local_id) {
local_ids_.insert(local_id); local_ids_.insert(local_id);
} }
void RldpIn::get_conn_ip_str(adnl::AdnlNodeIdShort l_id, adnl::AdnlNodeIdShort p_id, td::Promise<td::string> promise) {
td::actor::send_closure(adnl_, &adnl::AdnlPeerTable::get_conn_ip_str, l_id, p_id, std::move(promise));
}
std::unique_ptr<adnl::Adnl::Callback> RldpIn::make_adnl_callback() { std::unique_ptr<adnl::Adnl::Callback> RldpIn::make_adnl_callback() {
class Callback : public adnl::Adnl::Callback { class Callback : public adnl::Adnl::Callback {
private: private:

View file

@ -90,6 +90,8 @@ class RldpIn : public RldpImpl {
void add_id(adnl::AdnlNodeIdShort local_id) override; void add_id(adnl::AdnlNodeIdShort local_id) override;
void get_conn_ip_str(adnl::AdnlNodeIdShort l_id, adnl::AdnlNodeIdShort p_id, td::Promise<td::string> promise) override;
RldpIn(td::actor::ActorId<adnl::AdnlPeerTable> adnl) : adnl_(adnl) { RldpIn(td::actor::ActorId<adnl::AdnlPeerTable> adnl) : adnl_(adnl) {
} }

View file

@ -217,6 +217,10 @@ void RldpIn::add_id(adnl::AdnlNodeIdShort local_id) {
local_ids_.insert(local_id); local_ids_.insert(local_id);
} }
void RldpIn::get_conn_ip_str(adnl::AdnlNodeIdShort l_id, adnl::AdnlNodeIdShort p_id, td::Promise<td::string> promise) {
td::actor::send_closure(adnl_, &adnl::AdnlPeerTable::get_conn_ip_str, l_id, p_id, std::move(promise));
}
std::unique_ptr<adnl::Adnl::Callback> RldpIn::make_adnl_callback() { std::unique_ptr<adnl::Adnl::Callback> RldpIn::make_adnl_callback() {
class Callback : public adnl::Adnl::Callback { class Callback : public adnl::Adnl::Callback {
private: private:

View file

@ -95,7 +95,7 @@ class PeerManager : public td::actor::Actor {
} }
}; };
send_closure(overlays_, &ton::overlay::Overlays::create_public_overlay, adnl_id_, overlay_id_.clone(), send_closure(overlays_, &ton::overlay::Overlays::create_public_overlay, adnl_id_, overlay_id_.clone(),
std::make_unique<Callback>(), rules); std::make_unique<Callback>(), rules, "{ \"type\": \"storage\" }");
} }
void tear_down() override { void tear_down() override {
send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, adnl_id_, overlay_id_.compute_short_id()); send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, adnl_id_, overlay_id_.compute_short_id());

View file

@ -620,8 +620,9 @@ engine.validator.proposalVote perm_key:int256 to_send:bytes = engine.validator.P
engine.validator.dhtServerStatus id:int256 status:int = engine.validator.DhtServerStatus; engine.validator.dhtServerStatus id:int256 status:int = engine.validator.DhtServerStatus;
engine.validator.dhtServersStatus servers:(vector engine.validator.dhtServerStatus) = engine.validator.DhtServersStatus; engine.validator.dhtServersStatus servers:(vector engine.validator.dhtServerStatus) = engine.validator.DhtServersStatus;
engine.validator.overlayStats overlay_id:int256 overlay_id_full:PublicKey adnl_id:int256 nodes:(vector adnl.id.short) engine.validator.overlayStatsNode adnl_id:int256 ip_addr:string bdcst_errors:int fec_bdcst_errors:int last_in_query:int last_out_query:int t_out_bytes:int t_in_bytes:int t_out_pckts:int t_in_pckts:int = engine.validator.OverlayStatsNode;
stats:(vector engine.validator.oneStat) = engine.validator.OverlayStats;
engine.validator.overlayStats overlay_id:int256 overlay_id_full:PublicKey adnl_id:int256 scope:string nodes:(vector engine.validator.overlayStatsNode) stats:(vector engine.validator.oneStat) = engine.validator.OverlayStats;
engine.validator.overlaysStats overlays:(vector engine.validator.overlayStats) = engine.validator.OverlaysStats; engine.validator.overlaysStats overlays:(vector engine.validator.overlayStats) = engine.validator.OverlaysStats;

Binary file not shown.

View file

@ -34,6 +34,7 @@
#include "overlay/overlays.h" #include "overlay/overlays.h"
#include <cctype> #include <cctype>
#include <fstream>
Tokenizer::Tokenizer(td::BufferSlice data) : data_(std::move(data)) { Tokenizer::Tokenizer(td::BufferSlice data) : data_(std::move(data)) {
remaining_ = data_.as_slice(); remaining_ = data_.as_slice();
@ -835,11 +836,25 @@ td::Status GetOverlaysStatsQuery::receive(td::BufferSlice data) {
"received incorrect answer: "); "received incorrect answer: ");
for (auto &s : f->overlays_) { for (auto &s : f->overlays_) {
td::StringBuilder sb; td::StringBuilder sb;
sb << "overlay_id=" << s->overlay_id_ << " adnl_id=" << s->adnl_id_ << "\n"; sb << "overlay_id: " << s->overlay_id_ << " adnl_id: " << s->adnl_id_ << " scope: " << s->scope_ << "\n";
sb << " nodes:"; sb << " nodes:\n";
td::uint32 overlay_t_out_bytes = 0;
td::uint32 overlay_t_out_pckts = 0;
td::uint32 overlay_t_in_bytes = 0;
td::uint32 overlay_t_in_pckts = 0;
for (auto &n : s->nodes_) { for (auto &n : s->nodes_) {
sb << " " << n->id_ << "\n"; sb << " adnl_id: " << n->adnl_id_ << " ip_addr: " << n->ip_addr_ << " broadcast_errors: " << n->bdcst_errors_ << " fec_broadcast_errors: " << n->fec_bdcst_errors_ << " last_in_query: " << n->last_in_query_ << " (" << time_to_human(n->last_in_query_) << ")" << " last_out_query: " << n->last_out_query_ << " (" << time_to_human(n->last_out_query_) << ")" << "\n throughput:\n out: " << n->t_out_bytes_ << " bytes/sec, " << n->t_out_pckts_ << " pckts/sec\n in: " << n->t_in_bytes_ << " bytes/sec, " << n->t_in_pckts_ << " pckts/sec\n";
overlay_t_out_bytes += n->t_out_bytes_;
overlay_t_out_pckts += n->t_out_pckts_;
overlay_t_in_bytes += n->t_in_bytes_;
overlay_t_in_pckts += n->t_in_pckts_;
} }
sb << " total_throughput:\n out: " << overlay_t_out_bytes << " bytes/sec, " << overlay_t_out_pckts << " pckts/sec\n in: " << overlay_t_in_bytes << " bytes/sec, " << overlay_t_in_pckts << " pckts/sec\n";
sb << " stats:\n"; sb << " stats:\n";
for (auto &t : s->stats_) { for (auto &t : s->stats_) {
sb << " " << t->key_ << "\t" << t->value_ << "\n"; sb << " " << t->key_ << "\t" << t->value_ << "\n";
@ -849,6 +864,82 @@ td::Status GetOverlaysStatsQuery::receive(td::BufferSlice data) {
return td::Status::OK(); return td::Status::OK();
} }
td::Status GetOverlaysStatsJsonQuery::run() {
TRY_RESULT_ASSIGN(file_name_, tokenizer_.get_token<std::string>());
TRY_STATUS(tokenizer_.check_endl());
return td::Status::OK();
}
td::Status GetOverlaysStatsJsonQuery::send() {
auto b = ton::create_serialize_tl_object<ton::ton_api::engine_validator_getOverlaysStats>();
td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise());
return td::Status::OK();
}
td::Status GetOverlaysStatsJsonQuery::receive(td::BufferSlice data) {
TRY_RESULT_PREFIX(f, ton::fetch_tl_object<ton::ton_api::engine_validator_overlaysStats>(data.as_slice(), true),
"received incorrect answer: ");
std::ofstream sb(file_name_);
sb << "[\n";
bool rtail = false;
for (auto &s : f->overlays_) {
if(rtail) {
sb << ",\n";
} else {
rtail = true;
}
sb << "{\n \"overlay_id\": \"" << s->overlay_id_ << "\",\n \"adnl_id\": \"" << s->adnl_id_ << "\",\n \"scope\": " << s->scope_ << ",\n";
sb << " \"nodes\": [\n";
td::uint32 overlay_t_out_bytes = 0;
td::uint32 overlay_t_out_pckts = 0;
td::uint32 overlay_t_in_bytes = 0;
td::uint32 overlay_t_in_pckts = 0;
bool tail = false;
for (auto &n : s->nodes_) {
if(tail) {
sb << ",\n";
} else {
tail = true;
}
sb << " {\n \"adnl_id\": \"" << n->adnl_id_ << "\",\n \"ip_addr\": \"" << n->ip_addr_ << "\",\n \"broadcast_errors\": " << n->bdcst_errors_ << ",\n \"fec_broadcast_errors\": " << n->fec_bdcst_errors_ << ",\n \"last_in_query_unix\": " << n->last_in_query_ << ",\n \"last_in_query_human\": \"" << time_to_human(n->last_in_query_) << "\",\n" << " \"last_out_query_unix\": " << n->last_out_query_ << ",\n \"last_out_query_human\": \"" << time_to_human(n->last_out_query_) << "\",\n" << "\n \"throughput\": { \"out_bytes_sec\": " << n->t_out_bytes_ << ", \"out_pckts_sec\": " << n->t_out_pckts_ << ", \"in_bytes_sec\": " << n->t_in_bytes_ << ", \"in_pckts_sec\": " << n->t_in_pckts_ << " }\n }";
overlay_t_out_bytes += n->t_out_bytes_;
overlay_t_out_pckts += n->t_out_pckts_;
overlay_t_in_bytes += n->t_in_bytes_;
overlay_t_in_pckts += n->t_in_pckts_;
}
sb << " ],\n";
sb << " \"total_throughput\": { \"out_bytes_sec\": " << overlay_t_out_bytes << ", \"out_pckts_sec\": " << overlay_t_out_pckts << ", \"in_bytes_sec\": " << overlay_t_in_bytes << ", \"in_pckts_sec\": " << overlay_t_in_pckts << " },\n";
sb << " \"stats\": {\n";
tail = false;
for (auto &t : s->stats_) {
if(tail) {
sb << ",\n";
} else {
tail = true;
}
sb << " \"" << t->key_ << "\": \"" << t->value_ << "\"";
}
sb << "\n }\n";
sb << "}\n";
}
sb << "]\n";
sb << std::flush;
td::TerminalIO::output(std::string("wrote stats to " + file_name_ + "\n"));
return td::Status::OK();
}
td::Status ImportCertificateQuery::receive(td::BufferSlice data) { td::Status ImportCertificateQuery::receive(td::BufferSlice data) {
TRY_RESULT_PREFIX(f, ton::fetch_tl_object<ton::ton_api::engine_validator_success>(data.as_slice(), true), TRY_RESULT_PREFIX(f, ton::fetch_tl_object<ton::ton_api::engine_validator_success>(data.as_slice(), true),

View file

@ -918,11 +918,51 @@ class GetOverlaysStatsQuery : public Query {
static std::string get_help() { static std::string get_help() {
return "getoverlaysstats\tgets stats for all overlays"; return "getoverlaysstats\tgets stats for all overlays";
} }
static std::string time_to_human(int unixtime) {
char time_buffer[80];
time_t rawtime = unixtime;
struct tm tInfo;
struct tm* timeinfo = localtime_r(&rawtime, &tInfo);
assert(timeinfo == &tInfo);
strftime(time_buffer, 80, "%c", timeinfo);
return std::string(time_buffer);
}
std::string name() const override { std::string name() const override {
return get_name(); return get_name();
} }
}; };
class GetOverlaysStatsJsonQuery : public Query {
public:
GetOverlaysStatsJsonQuery(td::actor::ActorId<ValidatorEngineConsole> console, Tokenizer tokenizer)
: Query(console, std::move(tokenizer)) {
}
td::Status run() override;
td::Status send() override;
td::Status receive(td::BufferSlice data) override;
static std::string get_name() {
return "getoverlaysstatsjson";
}
static std::string get_help() {
return "getoverlaysstatsjson <outfile>\tgets stats for all overlays and writes to json file";
}
static std::string time_to_human(int unixtime) {
char time_buffer[80];
time_t rawtime = unixtime;
struct tm tInfo;
struct tm* timeinfo = localtime_r(&rawtime, &tInfo);
assert(timeinfo == &tInfo);
strftime(time_buffer, 80, "%c", timeinfo);
return std::string(time_buffer);
}
std::string name() const override {
return get_name();
}
private:
std::string file_name_;
};
class SignCertificateQuery : public Query { class SignCertificateQuery : public Query {
public: public:
SignCertificateQuery(td::actor::ActorId<ValidatorEngineConsole> console, Tokenizer tokenizer) SignCertificateQuery(td::actor::ActorId<ValidatorEngineConsole> console, Tokenizer tokenizer)

View file

@ -137,6 +137,7 @@ void ValidatorEngineConsole::run() {
add_query_runner(std::make_unique<QueryRunnerImpl<SignCertificateQuery>>()); add_query_runner(std::make_unique<QueryRunnerImpl<SignCertificateQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<ImportCertificateQuery>>()); add_query_runner(std::make_unique<QueryRunnerImpl<ImportCertificateQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<GetOverlaysStatsQuery>>()); add_query_runner(std::make_unique<QueryRunnerImpl<GetOverlaysStatsQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<GetOverlaysStatsJsonQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<ImportShardOverlayCertificateQuery>>()); add_query_runner(std::make_unique<QueryRunnerImpl<ImportShardOverlayCertificateQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<SignShardOverlayCertificateQuery>>()); add_query_runner(std::make_unique<QueryRunnerImpl<SignShardOverlayCertificateQuery>>());
} }

View file

@ -93,7 +93,7 @@ void FullNodeShardImpl::create_overlay() {
}; };
td::actor::send_closure(overlays_, &overlay::Overlays::create_public_overlay, adnl_id_, overlay_id_full_.clone(), td::actor::send_closure(overlays_, &overlay::Overlays::create_public_overlay, adnl_id_, overlay_id_full_.clone(),
std::make_unique<Callback>(actor_id(this)), rules_); std::make_unique<Callback>(actor_id(this)), rules_, PSTRING() << "{ \"type\": \"shard\", \"shard_id\": " << get_shard() << ", \"workchain_id\": " << get_workchain() << " }");
td::actor::send_closure(rldp_, &rldp::Rldp::add_id, adnl_id_); td::actor::send_closure(rldp_, &rldp::Rldp::add_id, adnl_id_);
if (cert_) { if (cert_) {