1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Accelerator, part 1 (#1119)

This commit contains some parts of https://github.com/ton-blockchain/ton/tree/accelerator
This is auxiliary code that mostly does not change node behavior.

1) Semiprivate overlays and other improvements in overlays code
2) Rename actual_min_split -> monitor_min_split, fix building shard overlays
3) Loading block candidates by block id from DB, fix accept_block after validator restart
4) Cells: ProofStorageStat and changes in CellUsageTree
5) Remove some unused code, other minor changes
This commit is contained in:
SpyCheese 2024-08-23 11:46:40 +03:00 committed by GitHub
parent 9a10f79fba
commit 908415d00b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
66 changed files with 2221 additions and 638 deletions

View file

@ -68,7 +68,7 @@ td::Status BroadcastSimple::run_checks() {
td::Status BroadcastSimple::distribute() {
auto B = serialize();
auto nodes = overlay_->get_neighbours(3);
auto nodes = overlay_->get_neighbours(overlay_->propagate_broadcast_to());
auto manager = overlay_->overlay_manager();
for (auto &n : nodes) {
@ -115,7 +115,8 @@ td::Status BroadcastSimple::run() {
return run_continue();
}
td::Status BroadcastSimple::create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id, tl_object_ptr<ton_api::overlay_broadcast> broadcast) {
td::Status BroadcastSimple::create(OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id,
tl_object_ptr<ton_api::overlay_broadcast> broadcast) {
auto src = PublicKey{broadcast->src_};
auto data_hash = sha256_bits256(broadcast->data_.as_slice());
auto broadcast_hash = compute_broadcast_id(src, data_hash, broadcast->flags_);

View file

@ -112,7 +112,7 @@ td::Status BroadcastFec::distribute_part(td::uint32 seqno) {
td::BufferSlice data_short = std::move(tls.first);
td::BufferSlice data = std::move(tls.second);
auto nodes = overlay_->get_neighbours(5);
auto nodes = overlay_->get_neighbours(overlay_->propagate_broadcast_to());
auto manager = overlay_->overlay_manager();
for (auto &n : nodes) {

View file

@ -21,8 +21,14 @@
#include "auto/tl/ton_api.h"
#include "adnl/adnl-node-id.hpp"
#include "overlay/overlays.h"
#include "td/utils/SharedSlice.h"
#include "td/utils/buffer.h"
#include "td/utils/overloaded.h"
#include "keys/encryptor.h"
#include "td/utils/port/StdStreams.h"
#include "td/utils/unique_ptr.h"
#include <limits>
#include <memory>
namespace ton {
@ -30,18 +36,30 @@ namespace overlay {
class OverlayNode {
public:
explicit OverlayNode(adnl::AdnlNodeIdShort self_id, OverlayIdShort overlay) {
explicit OverlayNode(adnl::AdnlNodeIdShort self_id, OverlayIdShort overlay, td::uint32 flags) {
source_ = self_id;
overlay_ = overlay;
flags_ = flags;
version_ = static_cast<td::int32>(td::Clocks::system());
}
static td::Result<OverlayNode> create(const tl_object_ptr<ton_api::overlay_node> &node) {
TRY_RESULT(source, adnl::AdnlNodeIdFull::create(node->id_));
return OverlayNode{source, OverlayIdShort{node->overlay_}, node->version_, node->signature_.as_slice()};
return OverlayNode{source, OverlayIdShort{node->overlay_}, 0, node->version_, node->signature_.as_slice()};
}
OverlayNode(td::Variant<adnl::AdnlNodeIdFull, adnl::AdnlNodeIdShort> source, OverlayIdShort overlay,
static td::Result<OverlayNode> create(const tl_object_ptr<ton_api::overlay_nodeV2> &node) {
TRY_RESULT(source, adnl::AdnlNodeIdFull::create(node->id_));
auto res = OverlayNode{source, OverlayIdShort{node->overlay_}, (td::uint32)node->flags_, node->version_,
node->signature_.as_slice()};
res.update_certificate(OverlayMemberCertificate(node->certificate_.get()));
return res;
}
OverlayNode(td::Variant<adnl::AdnlNodeIdFull, adnl::AdnlNodeIdShort> source, OverlayIdShort overlay, td::uint32 flags,
td::int32 version, td::Slice signature)
: source_(std::move(source)), overlay_(overlay), version_(version), signature_(td::SharedSlice(signature)) {
: source_(std::move(source))
, overlay_(overlay)
, flags_(flags)
, version_(version)
, signature_(td::SharedSlice(signature)) {
}
OverlayNode(td::Variant<adnl::AdnlNodeIdFull, adnl::AdnlNodeIdShort> source, OverlayIdShort overlay,
td::int32 version, td::SharedSlice signature)
@ -64,10 +82,17 @@ class OverlayNode {
}
td::BufferSlice to_sign() const {
auto obj = create_tl_object<ton_api::overlay_node_toSign>(nullptr, overlay_.tl(), version_);
source_.visit(td::overloaded([&](const adnl::AdnlNodeIdShort &id) { obj->id_ = id.tl(); },
[&](const adnl::AdnlNodeIdFull &id) { obj->id_ = id.compute_short_id().tl(); }));
return serialize_tl_object(obj, true);
if (flags_ == 0) {
auto obj = create_tl_object<ton_api::overlay_node_toSign>(nullptr, overlay_.tl(), version_);
source_.visit(td::overloaded([&](const adnl::AdnlNodeIdShort &id) { obj->id_ = id.tl(); },
[&](const adnl::AdnlNodeIdFull &id) { obj->id_ = id.compute_short_id().tl(); }));
return serialize_tl_object(obj, true);
} else {
auto obj = create_tl_object<ton_api::overlay_node_toSignEx>(nullptr, overlay_.tl(), flags_, version_);
source_.visit(td::overloaded([&](const adnl::AdnlNodeIdShort &id) { obj->id_ = id.tl(); },
[&](const adnl::AdnlNodeIdFull &id) { obj->id_ = id.compute_short_id().tl(); }));
return serialize_tl_object(obj, true);
}
}
void update_adnl_id(adnl::AdnlNodeIdFull node_id) {
source_ = node_id;
@ -81,6 +106,9 @@ class OverlayNode {
td::int32 version() const {
return version_;
}
td::uint32 flags() const {
return flags_;
}
td::BufferSlice signature() const {
return signature_.clone_as_buffer_slice();
}
@ -103,15 +131,69 @@ class OverlayNode {
[&](const adnl::AdnlNodeIdFull &id) { obj->id_ = id.tl(); }));
return obj;
}
tl_object_ptr<ton_api::overlay_nodeV2> tl_v2() const {
tl_object_ptr<ton_api::overlay_MemberCertificate> cert;
if (cert_ && !cert_->empty()) {
cert = cert_->tl();
} else {
cert = create_tl_object<ton_api::overlay_emptyMemberCertificate>();
}
auto obj = create_tl_object<ton_api::overlay_nodeV2>(nullptr, overlay_.tl(), flags_, version_,
signature_.clone_as_buffer_slice(), std::move(cert));
source_.visit(td::overloaded([&](const adnl::AdnlNodeIdShort &id) { UNREACHABLE(); },
[&](const adnl::AdnlNodeIdFull &id) { obj->id_ = id.tl(); }));
return obj;
}
OverlayNode clone() const {
return OverlayNode{source_, overlay_, version_, signature_.clone()};
auto res = OverlayNode{source_, overlay_, version_, signature_.clone()};
if (cert_) {
res.cert_ = td::make_unique<OverlayMemberCertificate>(*cert_);
}
return res;
}
const OverlayMemberCertificate *certificate() const {
if (cert_) {
return cert_.get();
}
return &empty_certificate_;
}
void update_certificate(OverlayMemberCertificate cert) {
if (!cert_ || cert_->empty() || cert_->is_expired() || cert.is_newer(*cert_)) {
cert_ = td::make_unique<OverlayMemberCertificate>(std::move(cert));
}
}
void update(OverlayNode from) {
if (version_ < from.version_) {
source_ = from.source_;
overlay_ = from.overlay_;
flags_ = from.flags_;
version_ = from.version_;
signature_ = from.signature_.clone();
}
if (from.cert_ && !from.cert_->empty()) {
update_certificate(std::move(*from.cert_));
}
}
void clear_certificate() {
cert_ = nullptr;
}
bool has_full_id() const {
return source_.get_offset() == source_.offset<adnl::AdnlNodeIdFull>();
}
private:
td::Variant<adnl::AdnlNodeIdFull, adnl::AdnlNodeIdShort> source_;
OverlayIdShort overlay_;
td::uint32 flags_;
td::int32 version_;
td::unique_ptr<OverlayMemberCertificate> cert_;
td::SharedSlice signature_;
static const OverlayMemberCertificate empty_certificate_;
};
} // namespace overlay

View file

@ -18,6 +18,7 @@
*/
#include "overlay-manager.h"
#include "auto/tl/ton_api.h"
#include "auto/tl/ton_api.hpp"
#include "overlay.h"
#include "adnl/utils.hpp"
@ -28,9 +29,9 @@
#include "td/db/RocksDb.h"
#include "td/utils/Status.h"
#include "td/utils/buffer.h"
#include "td/utils/overloaded.h"
#include "keys/encryptor.h"
#include "td/utils/port/Poll.h"
#include <vector>
@ -42,13 +43,13 @@ void OverlayManager::update_dht_node(td::actor::ActorId<dht::Dht> dht) {
dht_node_ = dht;
for (auto &X : overlays_) {
for (auto &Y : X.second) {
td::actor::send_closure(Y.second, &Overlay::update_dht_node, dht);
td::actor::send_closure(Y.second.overlay, &Overlay::update_dht_node, dht);
}
}
}
void OverlayManager::register_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
td::actor::ActorOwn<Overlay> overlay) {
OverlayMemberCertificate cert, td::actor::ActorOwn<Overlay> overlay) {
auto it = overlays_.find(local_id);
VLOG(OVERLAY_INFO) << this << ": registering overlay " << overlay_id << "@" << local_id;
if (it == overlays_.end()) {
@ -58,19 +59,34 @@ void OverlayManager::register_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdS
td::actor::send_closure(adnl_, &adnl::Adnl::subscribe, local_id,
adnl::Adnl::int_to_bytestring(ton_api::overlay_query::ID),
std::make_unique<AdnlCallback>(actor_id(this)));
td::actor::send_closure(adnl_, &adnl::Adnl::subscribe, local_id,
adnl::Adnl::int_to_bytestring(ton_api::overlay_messageWithExtra::ID),
std::make_unique<AdnlCallback>(actor_id(this)));
td::actor::send_closure(adnl_, &adnl::Adnl::subscribe, local_id,
adnl::Adnl::int_to_bytestring(ton_api::overlay_queryWithExtra::ID),
std::make_unique<AdnlCallback>(actor_id(this)));
}
overlays_[local_id][overlay_id] = std::move(overlay);
overlays_[local_id][overlay_id] = OverlayDescription{std::move(overlay), std::move(cert)};
auto P = td::PromiseCreator::lambda([id = overlays_[local_id][overlay_id].get()](td::Result<DbType::GetResult> R) {
R.ensure();
auto value = R.move_as_ok();
if (value.status == td::KeyValue::GetStatus::Ok) {
auto F = fetch_tl_object<ton_api::overlay_db_nodes>(std::move(value.value), true);
F.ensure();
auto nodes = std::move(F.move_as_ok()->nodes_);
td::actor::send_closure(id, &Overlay::receive_nodes_from_db, std::move(nodes));
}
});
auto P =
td::PromiseCreator::lambda([id = overlays_[local_id][overlay_id].overlay.get()](td::Result<DbType::GetResult> R) {
R.ensure();
auto value = R.move_as_ok();
if (value.status == td::KeyValue::GetStatus::Ok) {
auto F = fetch_tl_object<ton_api::overlay_db_Nodes>(std::move(value.value), true);
F.ensure();
ton_api::downcast_call(
*F.move_as_ok(), td::overloaded(
[&](ton_api::overlay_db_nodes &V) {
auto nodes = std::move(V.nodes_);
td::actor::send_closure(id, &Overlay::receive_nodes_from_db, std::move(nodes));
},
[&](ton_api::overlay_db_nodesV2 &V) {
auto nodes = std::move(V.nodes_);
td::actor::send_closure(id, &Overlay::receive_nodes_from_db_v2, std::move(nodes));
}));
}
});
auto key = create_hash_tl_object<ton_api::overlay_db_key_nodes>(local_id.bits256_value(), overlay_id.bits256_value());
db_.get(key, std::move(P));
}
@ -84,6 +100,10 @@ void OverlayManager::delete_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdSho
adnl::Adnl::int_to_bytestring(ton_api::overlay_message::ID));
td::actor::send_closure(adnl_, &adnl::Adnl::unsubscribe, local_id,
adnl::Adnl::int_to_bytestring(ton_api::overlay_query::ID));
td::actor::send_closure(adnl_, &adnl::Adnl::unsubscribe, local_id,
adnl::Adnl::int_to_bytestring(ton_api::overlay_messageWithExtra::ID));
td::actor::send_closure(adnl_, &adnl::Adnl::unsubscribe, local_id,
adnl::Adnl::int_to_bytestring(ton_api::overlay_queryWithExtra::ID));
overlays_.erase(it);
}
}
@ -101,74 +121,113 @@ void OverlayManager::create_public_overlay_ex(adnl::AdnlNodeIdShort local_id, Ov
td::string scope, OverlayOptions opts) {
CHECK(!dht_node_.empty());
auto id = overlay_id.compute_short_id();
register_overlay(local_id, id,
Overlay::create(keyring_, adnl_, actor_id(this), dht_node_, local_id, std::move(overlay_id),
std::move(callback), std::move(rules), scope, std::move(opts)));
register_overlay(local_id, id, OverlayMemberCertificate{},
Overlay::create_public(keyring_, adnl_, actor_id(this), dht_node_, local_id, std::move(overlay_id),
std::move(callback), std::move(rules), scope, std::move(opts)));
}
void OverlayManager::create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes,
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules,
std::string scope) {
create_private_overlay_ex(local_id, std::move(overlay_id), std::move(nodes), std::move(callback), std::move(rules),
std::move(scope), {});
}
void OverlayManager::create_private_overlay_ex(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes,
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules,
std::string scope, OverlayOptions opts) {
auto id = overlay_id.compute_short_id();
register_overlay(local_id, id,
Overlay::create(keyring_, adnl_, actor_id(this), dht_node_, local_id, std::move(overlay_id),
std::move(nodes), std::move(callback), std::move(rules), std::move(scope)));
register_overlay(local_id, id, OverlayMemberCertificate{},
Overlay::create_private(keyring_, adnl_, actor_id(this), dht_node_, local_id, std::move(overlay_id),
std::move(nodes), std::move(callback), std::move(rules), std::move(scope),
std::move(opts)));
}
void OverlayManager::create_semiprivate_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes,
std::vector<PublicKeyHash> root_public_keys,
OverlayMemberCertificate certificate,
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules,
td::string scope, OverlayOptions opts) {
auto id = overlay_id.compute_short_id();
register_overlay(
local_id, id, certificate,
Overlay::create_semiprivate(keyring_, adnl_, actor_id(this), dht_node_, local_id, std::move(overlay_id),
std::move(nodes), std::move(root_public_keys), certificate, std::move(callback),
std::move(rules), std::move(scope), std::move(opts)));
}
void OverlayManager::receive_message(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data) {
auto R = fetch_tl_prefix<ton_api::overlay_message>(data, true);
if (R.is_error()) {
VLOG(OVERLAY_WARNING) << this << ": can not parse overlay message: " << R.move_as_error();
return;
OverlayIdShort overlay_id;
tl_object_ptr<ton_api::overlay_messageExtra> extra;
auto R = fetch_tl_prefix<ton_api::overlay_messageWithExtra>(data, true);
if (R.is_ok()) {
overlay_id = OverlayIdShort{R.ok()->overlay_};
extra = std::move(R.ok()->extra_);
} else {
auto R2 = fetch_tl_prefix<ton_api::overlay_message>(data, true);
if (R2.is_ok()) {
overlay_id = OverlayIdShort{R2.ok()->overlay_};
} else {
VLOG(OVERLAY_WARNING) << this << ": can not parse overlay message [" << src << "->" << dst
<< "]: " << R2.move_as_error();
return;
}
}
auto M = R.move_as_ok();
auto it = overlays_.find(dst);
if (it == overlays_.end()) {
VLOG(OVERLAY_NOTICE) << this << ": message to unknown overlay " << M->overlay_ << "@" << dst;
VLOG(OVERLAY_NOTICE) << this << ": message to unknown overlay " << overlay_id << "@" << dst;
return;
}
auto it2 = it->second.find(OverlayIdShort{M->overlay_});
auto it2 = it->second.find(overlay_id);
if (it2 == it->second.end()) {
VLOG(OVERLAY_NOTICE) << this << ": message to localid is not in overlay " << M->overlay_ << "@" << dst;
VLOG(OVERLAY_NOTICE) << this << ": message to localid is not in overlay " << overlay_id << "@" << dst;
return;
}
td::actor::send_closure(it2->second, &Overlay::update_throughput_in_ctr, src, (td::uint32)data.size(), false);
td::actor::send_closure(it2->second, &Overlay::receive_message, src, std::move(data));
td::actor::send_closure(it2->second.overlay, &Overlay::update_throughput_in_ctr, src, (td::uint32)data.size(), false);
td::actor::send_closure(it2->second.overlay, &Overlay::receive_message, src, std::move(extra), std::move(data));
}
void OverlayManager::receive_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data,
td::Promise<td::BufferSlice> promise) {
auto R = fetch_tl_prefix<ton_api::overlay_query>(data, true);
if (R.is_error()) {
VLOG(OVERLAY_WARNING) << this << ": can not parse overlay query [" << src << "->" << dst
<< "]: " << R.move_as_error();
promise.set_error(td::Status::Error(ErrorCode::protoviolation, "bad overlay query header"));
return;
OverlayIdShort overlay_id;
tl_object_ptr<ton_api::overlay_messageExtra> extra;
auto R = fetch_tl_prefix<ton_api::overlay_queryWithExtra>(data, true);
if (R.is_ok()) {
overlay_id = OverlayIdShort{R.ok()->overlay_};
extra = std::move(R.ok()->extra_);
} else {
auto R2 = fetch_tl_prefix<ton_api::overlay_query>(data, true);
if (R2.is_ok()) {
overlay_id = OverlayIdShort{R2.ok()->overlay_};
} else {
VLOG(OVERLAY_WARNING) << this << ": can not parse overlay query [" << src << "->" << dst
<< "]: " << R2.move_as_error();
promise.set_error(td::Status::Error(ErrorCode::protoviolation, "bad overlay query header"));
return;
}
}
auto M = R.move_as_ok();
auto it = overlays_.find(dst);
if (it == overlays_.end()) {
VLOG(OVERLAY_NOTICE) << this << ": query to unknown overlay " << M->overlay_ << "@" << dst << " from " << src;
VLOG(OVERLAY_NOTICE) << this << ": query to unknown overlay " << overlay_id << "@" << dst << " from " << src;
promise.set_error(td::Status::Error(ErrorCode::protoviolation, PSTRING() << "bad local_id " << dst));
return;
}
auto it2 = it->second.find(OverlayIdShort{M->overlay_});
auto it2 = it->second.find(overlay_id);
if (it2 == it->second.end()) {
VLOG(OVERLAY_NOTICE) << this << ": query to localid not in overlay " << M->overlay_ << "@" << dst << " from " << src;
promise.set_error(td::Status::Error(ErrorCode::protoviolation, PSTRING() << "bad overlay_id " << M->overlay_));
VLOG(OVERLAY_NOTICE) << this << ": query to localid not in overlay " << overlay_id << "@" << dst << " from " << src;
promise.set_error(td::Status::Error(ErrorCode::protoviolation, PSTRING() << "bad overlay_id " << overlay_id));
return;
}
td::actor::send_closure(it2->second, &Overlay::update_throughput_in_ctr, src, (td::uint32)data.size(), true);
td::actor::send_closure(it2->second, &Overlay::receive_query, src, std::move(data), std::move(promise));
td::actor::send_closure(it2->second.overlay, &Overlay::update_throughput_in_ctr, src, (td::uint32)data.size(), true);
td::actor::send_closure(it2->second.overlay, &Overlay::receive_query, src, std::move(extra), std::move(data),
std::move(promise));
}
void OverlayManager::send_query_via(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdShort src, OverlayIdShort overlay_id,
@ -176,35 +235,64 @@ void OverlayManager::send_query_via(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdS
td::BufferSlice query, td::uint64 max_answer_size,
td::actor::ActorId<adnl::AdnlSenderInterface> via) {
CHECK(query.size() <= adnl::Adnl::huge_packet_max_size());
auto extra = create_tl_object<ton_api::overlay_messageExtra>();
extra->flags_ = 0;
auto it = overlays_.find(src);
if (it != overlays_.end()) {
auto it2 = it->second.find(overlay_id);
if (it2 != it->second.end()) {
td::actor::send_closure(it2->second, &Overlay::update_throughput_out_ctr, dst, (td::uint32)query.size(), true);
td::actor::send_closure(it2->second.overlay, &Overlay::update_throughput_out_ctr, dst, (td::uint32)query.size(),
true);
if (!it2->second.member_certificate.empty()) {
extra->flags_ |= 1;
extra->certificate_ = it2->second.member_certificate.tl();
}
}
}
td::actor::send_closure(
via, &adnl::AdnlSenderInterface::send_query_ex, src, dst, std::move(name), std::move(promise), timeout,
create_serialize_tl_object_suffix<ton_api::overlay_query>(query.as_slice(), overlay_id.tl()), max_answer_size);
auto extra_flags = extra->flags_;
td::BufferSlice serialized_query =
(extra_flags ? create_serialize_tl_object_suffix<ton_api::overlay_queryWithExtra>(
query.as_slice(), overlay_id.tl(), std::move(extra))
: create_serialize_tl_object_suffix<ton_api::overlay_query>(query.as_slice(), overlay_id.tl()));
td::actor::send_closure(via, &adnl::AdnlSenderInterface::send_query_ex, src, dst, std::move(name), std::move(promise),
timeout, std::move(serialized_query), max_answer_size);
}
void OverlayManager::send_message_via(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdShort src, OverlayIdShort overlay_id,
td::BufferSlice object, td::actor::ActorId<adnl::AdnlSenderInterface> via) {
CHECK(object.size() <= adnl::Adnl::huge_packet_max_size());
auto extra = create_tl_object<ton_api::overlay_messageExtra>();
extra->flags_ = 0;
auto it = overlays_.find(src);
if (it != overlays_.end()) {
auto it2 = it->second.find(overlay_id);
if (it2 != it->second.end()) {
td::actor::send_closure(it2->second, &Overlay::update_throughput_out_ctr, dst, (td::uint32)object.size(), false);
td::actor::send_closure(it2->second.overlay, &Overlay::update_throughput_out_ctr, dst, (td::uint32)object.size(),
false);
if (!it2->second.member_certificate.empty()) {
// do not send certificate here, we hope that all our neighbours already know of out certificate
// we send it every second to some random nodes. Here we don't want to increase the size of the message
if (false) {
extra->flags_ |= 1;
extra->certificate_ = it2->second.member_certificate.tl();
}
}
}
}
td::actor::send_closure(
via, &adnl::AdnlSenderInterface::send_message, src, dst,
create_serialize_tl_object_suffix<ton_api::overlay_message>(object.as_slice(), overlay_id.tl()));
auto extra_flags = extra->flags_;
td::BufferSlice serialized_message =
(extra_flags ? create_serialize_tl_object_suffix<ton_api::overlay_messageWithExtra>(
object.as_slice(), overlay_id.tl(), std::move(extra))
: create_serialize_tl_object_suffix<ton_api::overlay_message>(object.as_slice(), overlay_id.tl()));
td::actor::send_closure(via, &adnl::AdnlSenderInterface::send_message, src, dst, std::move(serialized_message));
}
void OverlayManager::send_broadcast(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id, td::BufferSlice object) {
@ -218,7 +306,7 @@ void OverlayManager::send_broadcast_ex(adnl::AdnlNodeIdShort local_id, OverlayId
if (it != overlays_.end()) {
auto it2 = it->second.find(overlay_id);
if (it2 != it->second.end()) {
td::actor::send_closure(it2->second, &Overlay::send_broadcast, send_as, flags, std::move(object));
td::actor::send_closure(it2->second.overlay, &Overlay::send_broadcast, send_as, flags, std::move(object));
}
}
}
@ -235,7 +323,7 @@ void OverlayManager::send_broadcast_fec_ex(adnl::AdnlNodeIdShort local_id, Overl
if (it != overlays_.end()) {
auto it2 = it->second.find(overlay_id);
if (it2 != it->second.end()) {
td::actor::send_closure(it2->second, &Overlay::send_broadcast_fec, send_as, flags, std::move(object));
td::actor::send_closure(it2->second.overlay, &Overlay::send_broadcast_fec, send_as, flags, std::move(object));
}
}
}
@ -246,7 +334,7 @@ void OverlayManager::set_privacy_rules(adnl::AdnlNodeIdShort local_id, OverlayId
if (it != overlays_.end()) {
auto it2 = it->second.find(overlay_id);
if (it2 != it->second.end()) {
td::actor::send_closure(it2->second, &Overlay::set_privacy_rules, std::move(rules));
td::actor::send_closure(it2->second.overlay, &Overlay::set_privacy_rules, std::move(rules));
}
}
}
@ -257,7 +345,34 @@ void OverlayManager::update_certificate(adnl::AdnlNodeIdShort local_id, OverlayI
if (it != overlays_.end()) {
auto it2 = it->second.find(overlay_id);
if (it2 != it->second.end()) {
td::actor::send_closure(it2->second, &Overlay::add_certificate, key, std::move(cert));
td::actor::send_closure(it2->second.overlay, &Overlay::add_certificate, key, std::move(cert));
}
}
}
void OverlayManager::update_member_certificate(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
OverlayMemberCertificate certificate) {
auto it = overlays_.find(local_id);
if (it != overlays_.end()) {
auto it2 = it->second.find(overlay_id);
if (it2 != it->second.end()) {
it2->second.member_certificate = certificate;
td::actor::send_closure(it2->second.overlay, &Overlay::update_member_certificate, certificate);
}
}
}
void OverlayManager::update_root_member_list(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes,
std::vector<PublicKeyHash> root_public_keys,
OverlayMemberCertificate certificate) {
auto it = overlays_.find(local_id);
if (it != overlays_.end()) {
auto it2 = it->second.find(overlay_id);
if (it2 != it->second.end()) {
it2->second.member_certificate = certificate;
td::actor::send_closure(it2->second.overlay, &Overlay::update_root_member_list, std::move(nodes),
std::move(root_public_keys), std::move(certificate));
}
}
}
@ -266,12 +381,16 @@ void OverlayManager::get_overlay_random_peers(adnl::AdnlNodeIdShort local_id, Ov
td::uint32 max_peers,
td::Promise<std::vector<adnl::AdnlNodeIdShort>> promise) {
auto it = overlays_.find(local_id);
if (it != overlays_.end()) {
auto it2 = it->second.find(overlay_id);
if (it2 != it->second.end()) {
td::actor::send_closure(it2->second, &Overlay::get_overlay_random_peers, max_peers, std::move(promise));
}
if (it == overlays_.end()) {
promise.set_error(td::Status::Error(PSTRING() << "no such local id " << local_id));
return;
}
auto it2 = it->second.find(overlay_id);
if (it2 == it->second.end()) {
promise.set_error(td::Status::Error(PSTRING() << "no such overlay " << overlay_id));
return;
}
td::actor::send_closure(it2->second.overlay, &Overlay::get_overlay_random_peers, max_peers, std::move(promise));
}
td::actor::ActorOwn<Overlays> Overlays::create(std::string db_root, td::actor::ActorId<keyring::Keyring> keyring,
@ -334,7 +453,7 @@ void OverlayManager::get_stats(td::Promise<tl_object_ptr<ton_api::engine_validat
for (auto &a : overlays_) {
for (auto &b : a.second) {
td::actor::send_closure(act, &Cb::incr_pending);
td::actor::send_closure(b.second, &Overlay::get_stats,
td::actor::send_closure(b.second.overlay, &Overlay::get_stats,
[act](td::Result<tl_object_ptr<ton_api::engine_validator_overlayStats>> R) {
if (R.is_ok()) {
td::actor::send_closure(act, &Cb::receive_answer, R.move_as_ok());
@ -348,6 +467,19 @@ void OverlayManager::get_stats(td::Promise<tl_object_ptr<ton_api::engine_validat
td::actor::send_closure(act, &Cb::decr_pending);
}
void OverlayManager::forget_peer(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay,
adnl::AdnlNodeIdShort peer_id) {
auto it = overlays_.find(local_id);
if (it == overlays_.end()) {
return;
}
auto it2 = it->second.find(overlay);
if (it2 == it->second.end()) {
return;
}
td::actor::send_closure(it2->second.overlay, &Overlay::forget_peer, peer_id);
}
Certificate::Certificate(PublicKey issued_by, td::int32 expire_at, td::uint32 max_size, td::uint32 flags,
td::BufferSlice signature)
: issued_by_(issued_by)
@ -454,6 +586,35 @@ tl_object_ptr<ton_api::overlay_Certificate> Certificate::empty_tl() {
return create_tl_object<ton_api::overlay_emptyCertificate>();
}
OverlayMemberCertificate::OverlayMemberCertificate(const ton_api::overlay_MemberCertificate *cert) {
if (!cert) {
expire_at_ = std::numeric_limits<td::int32>::max();
return;
}
if (cert->get_id() == ton_api::overlay_emptyMemberCertificate::ID) {
expire_at_ = std::numeric_limits<td::int32>::max();
return;
}
CHECK(cert->get_id() == ton_api::overlay_memberCertificate::ID);
const auto *real_cert = static_cast<const ton_api::overlay_memberCertificate *>(cert);
signed_by_ = PublicKey(real_cert->issued_by_);
flags_ = real_cert->flags_;
slot_ = real_cert->slot_;
expire_at_ = real_cert->expire_at_;
signature_ = td::SharedSlice(real_cert->signature_.as_slice());
}
td::Status OverlayMemberCertificate::check_signature(const adnl::AdnlNodeIdShort &node) {
if (is_expired()) {
return td::Status::Error(ErrorCode::notready, "certificate is expired");
}
td::BufferSlice data_to_sign = to_sign_data(node);
TRY_RESULT(encryptor, signed_by_.create_encryptor());
TRY_STATUS(encryptor->check_signature(data_to_sign.as_slice(), signature_.as_slice()));
return td::Status::OK();
}
} // namespace overlay
} // namespace ton

View file

@ -53,11 +53,19 @@ class OverlayManager : public Overlays {
void create_public_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules, td::string scope) override;
void create_public_overlay_ex(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules, td::string scope,
OverlayOptions opts) override;
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules, td::string scope,
OverlayOptions opts) override;
void create_semiprivate_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes, std::vector<PublicKeyHash> root_public_keys,
OverlayMemberCertificate certificate,
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules, td::string scope,
OverlayOptions opts) override;
void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Callback> callback,
OverlayPrivacyRules rules, std::string scope) override;
void create_private_overlay_ex(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Callback> callback,
OverlayPrivacyRules rules, std::string scope, OverlayOptions opts) override;
void delete_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id) override;
void send_query(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdShort src, OverlayIdShort overlay_id, std::string name,
td::Promise<td::BufferSlice> promise, td::Timestamp timeout, td::BufferSlice query) override {
@ -84,6 +92,11 @@ class OverlayManager : public Overlays {
void set_privacy_rules(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id, OverlayPrivacyRules rules) override;
void update_certificate(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id, PublicKeyHash key,
std::shared_ptr<Certificate> cert) override;
void update_member_certificate(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
OverlayMemberCertificate certificate) override;
void update_root_member_list(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes, std::vector<PublicKeyHash> root_public_keys,
OverlayMemberCertificate certificate) override;
void get_overlay_random_peers(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay, td::uint32 max_peers,
td::Promise<std::vector<adnl::AdnlNodeIdShort>> promise) override;
@ -92,10 +105,12 @@ class OverlayManager : public Overlays {
td::Promise<td::BufferSlice> promise);
void receive_message(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data);
void register_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
void register_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id, OverlayMemberCertificate cert,
td::actor::ActorOwn<Overlay> overlay);
void get_stats(td::Promise<tl_object_ptr<ton_api::engine_validator_overlaysStats>> promise) override;
void forget_peer(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay, adnl::AdnlNodeIdShort peer_id) override;
struct PrintId {};
PrintId print_id() const {
@ -103,7 +118,11 @@ class OverlayManager : public Overlays {
}
private:
std::map<adnl::AdnlNodeIdShort, std::map<OverlayIdShort, td::actor::ActorOwn<Overlay>>> overlays_;
struct OverlayDescription {
td::actor::ActorOwn<Overlay> overlay;
OverlayMemberCertificate member_certificate;
};
std::map<adnl::AdnlNodeIdShort, std::map<OverlayIdShort, OverlayDescription>> overlays_;
std::string db_root_;

View file

@ -16,89 +16,165 @@
Copyright 2017-2020 Telegram Systems LLP
*/
#include "adnl/adnl-node-id.hpp"
#include "adnl/adnl-node.h"
#include "auto/tl/ton_api.h"
#include "overlay.hpp"
#include "td/utils/Status.h"
#include "td/utils/Time.h"
#include "td/utils/port/signals.h"
#include <algorithm>
#include <vector>
namespace ton {
namespace overlay {
void OverlayImpl::del_peer(adnl::AdnlNodeIdShort id) {
auto P = peers_.get(id);
CHECK(P != nullptr);
void OverlayImpl::del_peer(const adnl::AdnlNodeIdShort &id) {
auto P = peer_list_.peers_.get(id);
if (P == nullptr) {
return;
}
if (P->is_permanent_member()) {
VLOG(OVERLAY_DEBUG) << this << ": not deleting peer " << id << ": a permanent member";
return;
}
VLOG(OVERLAY_DEBUG) << this << ": deleting peer " << id;
if (P->is_neighbour()) {
VLOG(OVERLAY_INFO) << this << ": deleting neighbour " << id;
bool deleted = false;
for (auto &n : neighbours_) {
if (n == id) {
n = neighbours_[neighbours_.size() - 1];
neighbours_.resize(neighbours_.size() - 1);
deleted = true;
break;
}
}
CHECK(deleted);
P->set_neighbour(false);
del_from_neighbour_list(P);
}
peers_.remove(id);
bad_peers_.erase(id);
update_neighbours(0);
peer_list_.peers_.remove(id);
peer_list_.bad_peers_.erase(id);
}
void OverlayImpl::del_from_neighbour_list(OverlayPeer *P) {
CHECK(P);
if (!P->is_neighbour()) {
return;
}
auto id = P->get_id();
bool deleted = false;
auto &neighbours = peer_list_.neighbours_;
for (auto &n : neighbours) {
if (n == id) {
n = neighbours[neighbours.size() - 1];
neighbours.resize(neighbours.size() - 1);
deleted = true;
break;
}
}
CHECK(deleted);
P->set_neighbour(false);
}
void OverlayImpl::del_from_neighbour_list(const adnl::AdnlNodeIdShort &id) {
auto P = peer_list_.peers_.get(id);
CHECK(P != nullptr);
return del_from_neighbour_list(P);
}
void OverlayImpl::del_some_peers() {
if (!public_) {
if (overlay_type_ == OverlayType::FixedMemberList) {
return;
}
while (peers_.size() > max_peers()) {
const size_t max_iterations = 10;
size_t iteration_seqno = 0;
while (peer_list_.peers_.size() > max_peers() && iteration_seqno++ < max_iterations) {
OverlayPeer *P;
if (bad_peers_.empty()) {
if (peer_list_.bad_peers_.empty()) {
P = get_random_peer();
} else {
auto it = bad_peers_.upper_bound(next_bad_peer_);
if (it == bad_peers_.end()) {
it = bad_peers_.begin();
auto it = peer_list_.bad_peers_.upper_bound(peer_list_.next_bad_peer_);
if (it == peer_list_.bad_peers_.end()) {
it = peer_list_.bad_peers_.begin();
}
P = peers_.get(next_bad_peer_ = *it);
P = peer_list_.peers_.get(peer_list_.next_bad_peer_ = *it);
}
if (P) {
if (P && !P->is_permanent_member()) {
auto id = P->get_id();
del_peer(id);
}
}
update_neighbours(0);
}
void OverlayImpl::do_add_peer(OverlayNode node) {
auto id = node.adnl_id_short();
auto V = peers_.get(id);
if (V) {
VLOG(OVERLAY_DEBUG) << this << ": updating peer " << id << " up to version " << node.version();
V->update(std::move(node));
} else {
VLOG(OVERLAY_DEBUG) << this << ": adding peer " << id << " of version " << node.version();
peers_.insert(id, OverlayPeer(std::move(node)));
del_some_peers();
update_neighbours(0);
td::Status OverlayImpl::validate_peer_certificate(const adnl::AdnlNodeIdShort &node,
const OverlayMemberCertificate &cert) {
if (cert.empty()) {
if (is_persistent_node(node) || overlay_type_ == OverlayType::Public) {
return td::Status::OK();
}
return td::Status::Error(ErrorCode::protoviolation, "no member certificate found");
}
if (cert.is_expired()) {
return td::Status::Error(ErrorCode::timeout, "member certificate is expired");
}
if (cert.slot() < 0 || cert.slot() >= opts_.max_slaves_in_semiprivate_overlay_) {
return td::Status::Error(ErrorCode::timeout, "member certificate has invalid slot");
}
const auto &issued_by = cert.issued_by();
auto it = peer_list_.root_public_keys_.find(issued_by.compute_short_id());
if (it == peer_list_.root_public_keys_.end()) {
return td::Status::Error(ErrorCode::protoviolation, "member certificate is signed by unknown public key");
}
if (it->second.size() > (size_t)cert.slot()) {
auto &el = it->second[cert.slot()];
if (cert.expire_at() < el.expire_at) {
return td::Status::Error(ErrorCode::protoviolation,
"member certificate rejected, because we know of newer certificate at the same slot");
} else if (cert.expire_at() == el.expire_at) {
if (node < el.node) {
return td::Status::Error(ErrorCode::protoviolation,
"member certificate rejected, because we know of newer certificate at the same slot");
} else if (el.node == node) {
// we could return OK here, but we must make sure, that the unchecked signature will not be used for updating PeerNode.
}
}
}
auto R = get_encryptor(issued_by);
if (R.is_error()) {
return R.move_as_error_prefix("failed to check member certificate: failed to create encryptor: ");
}
auto enc = R.move_as_ok();
auto S = enc->check_signature(cert.to_sign_data(node).as_slice(), cert.signature());
if (S.is_error()) {
return S.move_as_error_prefix("failed to check member certificate: bad signature: ");
}
if (it->second.size() <= (size_t)cert.slot()) {
it->second.resize((size_t)cert.slot() + 1);
}
it->second[cert.slot()].expire_at = cert.expire_at();
it->second[cert.slot()].node = node;
return td::Status::OK();
}
void OverlayImpl::add_peer_in_cont(OverlayNode node) {
CHECK(public_);
do_add_peer(std::move(node));
td::Status OverlayImpl::validate_peer_certificate(const adnl::AdnlNodeIdShort &node,
ton_api::overlay_MemberCertificate *cert) {
OverlayMemberCertificate ncert(cert);
return validate_peer_certificate(node, ncert);
}
void OverlayImpl::add_peer_in(OverlayNode node) {
CHECK(public_);
td::Status OverlayImpl::validate_peer_certificate(const adnl::AdnlNodeIdShort &node,
const OverlayMemberCertificate *cert) {
if (!cert) {
if (is_persistent_node(node) || overlay_type_ == OverlayType::Public) {
return td::Status::OK();
}
return td::Status::Error(ErrorCode::protoviolation, "no member certificate found");
}
return validate_peer_certificate(node, *cert);
}
void OverlayImpl::add_peer(OverlayNode node) {
CHECK(overlay_type_ != OverlayType::FixedMemberList);
if (node.overlay_id() != overlay_id_) {
VLOG(OVERLAY_WARNING) << this << ": received node with bad overlay";
return;
}
auto t = td::Clocks::system();
if (node.version() + 600 < t || node.version() > t + 60) {
if (node.version() + Overlays::overlay_peer_ttl() < t || node.version() > t + 60) {
VLOG(OVERLAY_INFO) << this << ": ignoring node of too old version " << node.version();
return;
}
@ -115,35 +191,80 @@ void OverlayImpl::add_peer_in(OverlayNode node) {
return;
}
add_peer_in_cont(std::move(node));
if (overlay_type_ == OverlayType::CertificatedMembers) {
auto R = validate_peer_certificate(node.adnl_id_short(), *node.certificate());
if (R.is_error()) {
VLOG(OVERLAY_WARNING) << this << ": bad peer certificate node=" << node.adnl_id_short() << ": "
<< R.move_as_error();
UNREACHABLE();
return;
}
}
auto id = node.adnl_id_short();
auto V = peer_list_.peers_.get(id);
if (V) {
VLOG(OVERLAY_DEBUG) << this << ": updating peer " << id << " up to version " << node.version();
V->update(std::move(node));
} else {
VLOG(OVERLAY_DEBUG) << this << ": adding peer " << id << " of version " << node.version();
CHECK(overlay_type_ != OverlayType::CertificatedMembers || (node.certificate() && !node.certificate()->empty()));
peer_list_.peers_.insert(id, OverlayPeer(std::move(node)));
del_some_peers();
auto X = peer_list_.peers_.get(id);
CHECK(X);
if (peer_list_.neighbours_.size() < max_neighbours() &&
!(X->get_node()->flags() & OverlayMemberFlags::DoNotReceiveBroadcasts) && X->get_id() != local_id_) {
peer_list_.neighbours_.push_back(X->get_id());
X->set_neighbour(true);
}
update_neighbours(0);
}
}
void OverlayImpl::add_peers(std::vector<OverlayNode> peers) {
for (auto &node : peers) {
add_peer_in(std::move(node));
add_peer(std::move(node));
}
}
void OverlayImpl::add_peer(OverlayNode P) {
add_peer_in(std::move(P));
void OverlayImpl::add_peers(const tl_object_ptr<ton_api::overlay_nodes> &nodes) {
for (auto &n : nodes->nodes_) {
auto N = OverlayNode::create(n);
if (N.is_ok()) {
add_peer(N.move_as_ok());
}
}
}
void OverlayImpl::add_peers(const tl_object_ptr<ton_api::overlay_nodesV2> &nodes) {
for (auto &n : nodes->nodes_) {
auto N = OverlayNode::create(n);
if (N.is_ok()) {
add_peer(N.move_as_ok());
}
}
}
void OverlayImpl::on_ping_result(adnl::AdnlNodeIdShort peer, bool success) {
if (!public_) {
if (overlay_type_ == OverlayType::FixedMemberList) {
return;
}
if (OverlayPeer *p = peers_.get(peer)) {
if (OverlayPeer *p = peer_list_.peers_.get(peer)) {
p->on_ping_result(success);
if (p->is_alive()) {
bad_peers_.erase(peer);
peer_list_.bad_peers_.erase(peer);
} else {
bad_peers_.insert(peer);
peer_list_.bad_peers_.insert(peer);
}
}
}
void OverlayImpl::receive_random_peers(adnl::AdnlNodeIdShort src, td::Result<td::BufferSlice> R) {
CHECK(public_);
CHECK(overlay_type_ != OverlayType::FixedMemberList);
on_ping_result(src, R.is_ok());
if (R.is_error()) {
VLOG(OVERLAY_NOTICE) << this << ": failed getRandomPeers query: " << R.move_as_error();
@ -156,16 +277,24 @@ void OverlayImpl::receive_random_peers(adnl::AdnlNodeIdShort src, td::Result<td:
return;
}
auto res = R2.move_as_ok();
add_peers(R2.move_as_ok());
}
std::vector<OverlayNode> nodes;
for (auto &n : res->nodes_) {
auto N = OverlayNode::create(n);
if (N.is_ok()) {
nodes.emplace_back(N.move_as_ok());
}
void OverlayImpl::receive_random_peers_v2(adnl::AdnlNodeIdShort src, td::Result<td::BufferSlice> R) {
CHECK(overlay_type_ != OverlayType::FixedMemberList);
on_ping_result(src, R.is_ok());
if (R.is_error()) {
VLOG(OVERLAY_NOTICE) << this << ": failed getRandomPeersV2 query: " << R.move_as_error();
return;
}
add_peers(std::move(nodes));
auto R2 = fetch_tl_object<ton_api::overlay_nodesV2>(R.move_as_ok(), true);
if (R2.is_error()) {
VLOG(OVERLAY_WARNING) << this << ": dropping incorrect answer to overlay.getRandomPeers query from " << src << ": "
<< R2.move_as_error();
return;
}
add_peers(R2.move_as_ok());
}
void OverlayImpl::send_random_peers_cont(adnl::AdnlNodeIdShort src, OverlayNode node,
@ -175,10 +304,13 @@ void OverlayImpl::send_random_peers_cont(adnl::AdnlNodeIdShort src, OverlayNode
vec.emplace_back(node.tl());
}
for (td::uint32 i = 0; i < nodes_to_send(); i++) {
td::uint32 max_iterations = nodes_to_send() + 16;
for (td::uint32 i = 0; i < max_iterations && vec.size() < nodes_to_send(); i++) {
auto P = get_random_peer(true);
if (P) {
vec.emplace_back(P->get().tl());
if (P->has_full_id()) {
vec.emplace_back(P->get_node()->tl());
}
} else {
break;
}
@ -213,58 +345,110 @@ void OverlayImpl::send_random_peers(adnl::AdnlNodeIdShort src, td::Promise<td::B
get_self_node(std::move(P));
}
void OverlayImpl::send_random_peers_v2_cont(adnl::AdnlNodeIdShort src, OverlayNode node,
td::Promise<td::BufferSlice> promise) {
std::vector<tl_object_ptr<ton_api::overlay_nodeV2>> vec;
if (announce_self_) {
CHECK(is_persistent_node(node.adnl_id_short()) || !node.certificate()->empty());
vec.emplace_back(node.tl_v2());
}
td::uint32 max_iterations = nodes_to_send() + 16;
for (td::uint32 i = 0; i < max_iterations && vec.size() < nodes_to_send(); i++) {
auto P = get_random_peer(true);
if (P) {
if (P->has_full_id() && !P->is_permanent_member()) {
vec.emplace_back(P->get_node()->tl_v2());
}
} else {
break;
}
}
if (promise) {
auto Q = create_tl_object<ton_api::overlay_nodesV2>(std::move(vec));
promise.set_value(serialize_tl_object(Q, true));
} else {
auto P =
td::PromiseCreator::lambda([SelfId = actor_id(this), src, oid = print_id()](td::Result<td::BufferSlice> res) {
td::actor::send_closure(SelfId, &OverlayImpl::receive_random_peers_v2, src, std::move(res));
});
auto Q =
create_tl_object<ton_api::overlay_getRandomPeersV2>(create_tl_object<ton_api::overlay_nodesV2>(std::move(vec)));
td::actor::send_closure(manager_, &OverlayManager::send_query, src, local_id_, overlay_id_,
"overlay getRandomPeers", std::move(P),
td::Timestamp::in(5.0 + td::Random::fast(0, 50) * 0.1), serialize_tl_object(Q, true));
}
}
void OverlayImpl::send_random_peers_v2(adnl::AdnlNodeIdShort src, td::Promise<td::BufferSlice> promise) {
auto P = td::PromiseCreator::lambda([src, promise = std::move(promise),
SelfId = actor_id(this)](td::Result<OverlayNode> res) mutable {
if (res.is_error()) {
promise.set_error(td::Status::Error(ErrorCode::error, "cannot get self node"));
return;
}
td::actor::send_closure(SelfId, &OverlayImpl::send_random_peers_v2_cont, src, res.move_as_ok(), std::move(promise));
});
get_self_node(std::move(P));
}
void OverlayImpl::update_neighbours(td::uint32 nodes_to_change) {
if (peers_.size() == 0) {
if (peer_list_.peers_.size() == 0) {
return;
}
td::uint32 iter = 0;
while (iter < 10 && (nodes_to_change > 0 || neighbours_.size() < max_neighbours())) {
auto X = peers_.get_random();
while (iter++ < 10 && (nodes_to_change > 0 || peer_list_.neighbours_.size() < max_neighbours())) {
auto X = peer_list_.peers_.get_random();
if (!X) {
break;
}
if (X->get_id() == local_id_) {
iter++;
continue;
}
if (public_ && X->get_version() <= td::Clocks::system() - 600) {
if (X->is_neighbour()) {
bool found = false;
for (auto &n : neighbours_) {
if (n == X->get_id()) {
n = *neighbours_.rbegin();
found = true;
break;
}
}
CHECK(found);
neighbours_.pop_back();
X->set_neighbour(false);
if (X->get_version() <= td::Clocks::system() - Overlays::overlay_peer_ttl()) {
if (X->is_permanent_member()) {
del_from_neighbour_list(X);
} else {
auto id = X->get_id();
del_peer(id);
}
continue;
}
if (overlay_type_ == OverlayType::CertificatedMembers && !X->is_permanent_member() &&
X->certificate()->is_expired()) {
auto id = X->get_id();
del_peer(id);
continue;
}
if (X->get_node()->flags() & OverlayMemberFlags::DoNotReceiveBroadcasts) {
if (X->is_neighbour()) {
del_from_neighbour_list(X);
}
bad_peers_.erase(X->get_id());
peers_.remove(X->get_id());
continue;
}
if (X->is_neighbour()) {
iter++;
continue;
}
if (neighbours_.size() < max_neighbours()) {
if (peer_list_.neighbours_.size() < max_neighbours()) {
VLOG(OVERLAY_INFO) << this << ": adding new neighbour " << X->get_id();
neighbours_.push_back(X->get_id());
peer_list_.neighbours_.push_back(X->get_id());
X->set_neighbour(true);
} else {
CHECK(nodes_to_change > 0);
auto i = td::Random::fast(0, static_cast<td::uint32>(neighbours_.size()) - 1);
auto Y = peers_.get(neighbours_[i]);
auto i = td::Random::fast(0, static_cast<td::uint32>(peer_list_.neighbours_.size()) - 1);
auto Y = peer_list_.peers_.get(peer_list_.neighbours_[i]);
CHECK(Y != nullptr);
CHECK(Y->is_neighbour());
Y->set_neighbour(false);
neighbours_[i] = X->get_id();
peer_list_.neighbours_[i] = X->get_id();
X->set_neighbour(true);
nodes_to_change--;
VLOG(OVERLAY_INFO) << this << ": changing neighbour " << Y->get_id() << " -> " << X->get_id();
@ -274,9 +458,11 @@ void OverlayImpl::update_neighbours(td::uint32 nodes_to_change) {
OverlayPeer *OverlayImpl::get_random_peer(bool only_alive) {
size_t skip_bad = 3;
while (peers_.size() > (only_alive ? bad_peers_.size() : 0)) {
auto P = peers_.get_random();
if (public_ && P->get_version() + 3600 < td::Clocks::system()) {
OverlayPeer *res = nullptr;
while (!res && peer_list_.peers_.size() > (only_alive ? peer_list_.bad_peers_.size() : 0)) {
auto P = peer_list_.peers_.get_random();
if (!P->is_permanent_member() &&
(P->get_version() + 3600 < td::Clocks::system() || P->certificate()->is_expired())) {
VLOG(OVERLAY_INFO) << this << ": deleting outdated peer " << P->get_id();
del_peer(P->get_id());
continue;
@ -290,18 +476,19 @@ OverlayPeer *OverlayImpl::get_random_peer(bool only_alive) {
continue;
}
}
return P;
res = P;
}
return nullptr;
update_neighbours(0);
return res;
}
void OverlayImpl::get_overlay_random_peers(td::uint32 max_peers,
td::Promise<std::vector<adnl::AdnlNodeIdShort>> promise) {
std::vector<adnl::AdnlNodeIdShort> v;
auto t = td::Clocks::system();
while (v.size() < max_peers && v.size() < peers_.size() - bad_peers_.size()) {
auto P = peers_.get_random();
if (public_ && P->get_version() + 3600 < t) {
while (v.size() < max_peers && v.size() < peer_list_.peers_.size() - peer_list_.bad_peers_.size()) {
auto P = peer_list_.peers_.get_random();
if (!P->is_permanent_member() && (P->get_version() + 3600 < t || P->certificate()->is_expired(t))) {
VLOG(OVERLAY_INFO) << this << ": deleting outdated peer " << P->get_id();
del_peer(P->get_id());
} else if (P->is_alive()) {
@ -317,22 +504,227 @@ void OverlayImpl::get_overlay_random_peers(td::uint32 max_peers,
}
}
}
update_neighbours(0);
promise.set_result(std::move(v));
}
void OverlayImpl::receive_nodes_from_db(tl_object_ptr<ton_api::overlay_nodes> tl_nodes) {
if (public_) {
std::vector<OverlayNode> nodes;
for (auto &n : tl_nodes->nodes_) {
auto N = OverlayNode::create(n);
if (N.is_ok()) {
nodes.emplace_back(N.move_as_ok());
if (overlay_type_ != OverlayType::FixedMemberList) {
add_peers(tl_nodes);
}
}
void OverlayImpl::receive_nodes_from_db_v2(tl_object_ptr<ton_api::overlay_nodesV2> tl_nodes) {
if (overlay_type_ != OverlayType::FixedMemberList) {
add_peers(tl_nodes);
}
}
bool OverlayImpl::is_persistent_node(const adnl::AdnlNodeIdShort &id) {
auto P = peer_list_.peers_.get(id);
if (!P) {
return false;
}
return P->is_permanent_member();
}
bool OverlayImpl::is_valid_peer(const adnl::AdnlNodeIdShort &src,
const ton_api::overlay_MemberCertificate *certificate) {
if (overlay_type_ == OverlayType::Public) {
on_ping_result(src, true);
return true;
} else if (overlay_type_ == OverlayType::FixedMemberList) {
return peer_list_.peers_.get(src);
} else {
OverlayMemberCertificate cert(certificate);
if (cert.empty()) {
auto P = peer_list_.peers_.get(src);
if (P && !P->is_permanent_member()) {
auto C = P->certificate();
if (C) {
cert = *C;
}
}
}
add_peers(std::move(nodes));
auto S = validate_peer_certificate(src, cert);
if (S.is_error()) {
VLOG(OVERLAY_WARNING) << "adnl=" << src << ": certificate is invalid: " << S;
return false;
}
auto P = peer_list_.peers_.get(src);
if (P) {
CHECK(P->is_permanent_member() || !cert.empty());
P->update_certificate(std::move(cert));
}
return true;
}
}
void OverlayImpl::iterate_all_peers(std::function<void(const adnl::AdnlNodeIdShort &key, OverlayPeer &peer)> cb) {
peer_list_.peers_.iterate([&](const adnl::AdnlNodeIdShort &key, OverlayPeer &peer) { cb(key, peer); });
}
void OverlayImpl::update_peer_err_ctr(adnl::AdnlNodeIdShort peer_id, bool is_fec) {
auto src_peer = peer_list_.peers_.get(peer_id);
if (src_peer) {
if (is_fec) {
src_peer->fec_broadcast_errors++;
} else {
src_peer->broadcast_errors++;
}
}
}
void OverlayImpl::update_throughput_out_ctr(adnl::AdnlNodeIdShort peer_id, td::uint32 msg_size, bool is_query) {
auto out_peer = peer_list_.peers_.get(peer_id);
if (out_peer) {
out_peer->throughput_out_bytes_ctr += msg_size;
out_peer->throughput_out_packets_ctr++;
if (is_query) {
out_peer->last_out_query_at = td::Timestamp::now();
}
}
}
void OverlayImpl::update_throughput_in_ctr(adnl::AdnlNodeIdShort peer_id, td::uint32 msg_size, bool is_query) {
auto in_peer = peer_list_.peers_.get(peer_id);
if (in_peer) {
in_peer->throughput_in_bytes_ctr += msg_size;
in_peer->throughput_in_packets_ctr++;
if (is_query) {
in_peer->last_in_query_at = td::Timestamp::now();
}
}
}
void OverlayImpl::update_peer_ip_str(adnl::AdnlNodeIdShort peer_id, td::string ip_str) {
auto fpeer = peer_list_.peers_.get(peer_id);
if (fpeer) {
fpeer->ip_addr_str = ip_str;
}
}
bool OverlayImpl::has_good_peers() const {
return peer_list_.peers_.size() > peer_list_.bad_peers_.size();
}
bool OverlayImpl::is_root_public_key(const PublicKeyHash &key) const {
return peer_list_.root_public_keys_.count(key) > 0;
}
std::vector<adnl::AdnlNodeIdShort> OverlayImpl::get_neighbours(td::uint32 max_size) const {
if (max_size == 0 || max_size >= peer_list_.neighbours_.size()) {
return peer_list_.neighbours_;
} else {
std::vector<adnl::AdnlNodeIdShort> vec;
std::vector<td::uint32> ul;
for (td::uint32 i = 0; i < max_size; i++) {
td::uint32 t = td::Random::fast(0, static_cast<td::int32>(peer_list_.neighbours_.size()) - 1 - i);
td::uint32 j;
for (j = 0; j < i && ul[j] <= t; j++) {
t++;
}
ul.emplace(ul.begin() + j, t);
vec.push_back(peer_list_.neighbours_[t]);
}
return vec;
}
}
void OverlayImpl::send_message_to_neighbours(td::BufferSlice data) {
wait_neighbours_not_empty([this, data = std::move(data)](td::Result<td::Unit> R) {
if (R.is_error()) {
return;
}
for (auto &n : peer_list_.neighbours_) {
td::actor::send_closure(manager_, &OverlayManager::send_message, n, local_id_, overlay_id_, data.clone());
}
});
}
size_t OverlayImpl::neighbours_cnt() const {
return peer_list_.neighbours_.size();
}
void OverlayImpl::update_root_member_list(std::vector<adnl::AdnlNodeIdShort> ids,
std::vector<PublicKeyHash> root_public_keys, OverlayMemberCertificate cert) {
td::uint32 expectd_size =
(td::uint32)(ids.size() + root_public_keys.size() * opts_.max_slaves_in_semiprivate_overlay_);
if (expectd_size > opts_.max_peers_) {
opts_.max_peers_ = expectd_size;
}
if (expectd_size > opts_.max_neighbours_) {
opts_.max_neighbours_ = expectd_size;
}
std::sort(ids.begin(), ids.end());
auto old_root_public_keys = std::move(peer_list_.root_public_keys_);
for (const auto &pub_key : root_public_keys) {
auto it = old_root_public_keys.find(pub_key);
if (it != old_root_public_keys.end()) {
peer_list_.root_public_keys_.emplace(it->first, std::move(it->second));
} else {
peer_list_.root_public_keys_.emplace(pub_key, PeerList::SlaveKeys{});
}
}
std::vector<adnl::AdnlNodeIdShort> to_del;
peer_list_.peers_.iterate([&](const adnl::AdnlNodeIdShort &key, OverlayPeer &peer) {
peer.set_permanent(std::binary_search(ids.begin(), ids.end(), key));
if (peer.is_permanent_member()) {
peer.clear_certificate();
} else {
auto S = validate_peer_certificate(peer.get_id(), peer.certificate());
if (S.is_error()) {
to_del.push_back(peer.get_id());
}
}
});
for (const auto &id : to_del) {
del_peer(id);
}
for (const auto &id : ids) {
if (!peer_list_.peers_.exists(id)) {
OverlayNode node(id, overlay_id_, opts_.default_permanent_members_flags_);
OverlayPeer peer(std::move(node));
peer.set_permanent(true);
CHECK(peer.is_permanent_member());
peer_list_.peers_.insert(std::move(id), std::move(peer));
}
}
update_member_certificate(std::move(cert));
update_neighbours(0);
}
void OverlayImpl::update_member_certificate(OverlayMemberCertificate cert) {
peer_list_.cert_ = std::move(cert);
if (is_persistent_node(local_id_)) {
peer_list_.local_cert_is_valid_until_ = td::Timestamp::in(86400.0 * 365 * 100); /* 100 years */
} else {
auto R = validate_peer_certificate(local_id_, &peer_list_.cert_);
if (R.is_ok()) {
peer_list_.local_cert_is_valid_until_ = td::Timestamp::at_unix(cert.expire_at());
} else {
peer_list_.local_cert_is_valid_until_ = td::Timestamp::never();
}
}
}
bool OverlayImpl::has_valid_membership_certificate() {
if (overlay_type_ != OverlayType::CertificatedMembers) {
return true;
}
if (!peer_list_.local_cert_is_valid_until_) {
return false;
}
return !peer_list_.local_cert_is_valid_until_.is_in_past();
}
} // namespace overlay
} // namespace ton

View file

@ -18,6 +18,7 @@
*/
#include "auto/tl/ton_api.h"
#include "td/utils/Random.h"
#include "common/delay.h"
#include "adnl/utils.hpp"
#include "dht/dht.h"
@ -26,41 +27,61 @@
#include "auto/tl/ton_api.hpp"
#include "keys/encryptor.h"
#include "td/utils/Status.h"
#include "td/utils/StringBuilder.h"
#include "td/utils/port/signals.h"
#include <limits>
namespace ton {
namespace overlay {
td::actor::ActorOwn<Overlay> Overlay::create(td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<OverlayManager> manager,
td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
OverlayIdFull overlay_id, std::unique_ptr<Overlays::Callback> callback,
OverlayPrivacyRules rules, td::string scope, OverlayOptions opts) {
auto R = td::actor::create_actor<OverlayImpl>("overlay", keyring, adnl, manager, dht_node, local_id,
std::move(overlay_id), true, std::vector<adnl::AdnlNodeIdShort>(),
std::move(callback), std::move(rules), scope, opts);
const OverlayMemberCertificate OverlayNode::empty_certificate_{};
td::actor::ActorOwn<Overlay> Overlay::create_public(td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<OverlayManager> manager,
td::actor::ActorId<dht::Dht> dht_node,
adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::unique_ptr<Overlays::Callback> callback,
OverlayPrivacyRules rules, td::string scope, OverlayOptions opts) {
auto R = td::actor::create_actor<OverlayImpl>(
"overlay", keyring, adnl, manager, dht_node, local_id, std::move(overlay_id), OverlayType::Public,
std::vector<adnl::AdnlNodeIdShort>(), std::vector<PublicKeyHash>(), OverlayMemberCertificate{}, std::move(callback),
std::move(rules), std::move(scope), std::move(opts));
return td::actor::ActorOwn<Overlay>(std::move(R));
}
td::actor::ActorOwn<Overlay> Overlay::create(td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<OverlayManager> manager,
td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
OverlayIdFull overlay_id, std::vector<adnl::AdnlNodeIdShort> nodes,
std::unique_ptr<Overlays::Callback> callback, OverlayPrivacyRules rules,
std::string scope) {
td::actor::ActorOwn<Overlay> Overlay::create_private(
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
OverlayIdFull overlay_id, std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Overlays::Callback> callback,
OverlayPrivacyRules rules, std::string scope, OverlayOptions opts) {
auto R = td::actor::create_actor<OverlayImpl>("overlay", keyring, adnl, manager, dht_node, local_id,
std::move(overlay_id), false, std::move(nodes), std::move(callback),
std::move(rules), std::move(scope));
std::move(overlay_id), OverlayType::FixedMemberList, std::move(nodes),
std::vector<PublicKeyHash>(), OverlayMemberCertificate{},
std::move(callback), std::move(rules), std::move(scope));
return td::actor::ActorOwn<Overlay>(std::move(R));
}
td::actor::ActorOwn<Overlay> Overlay::create_semiprivate(
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
OverlayIdFull overlay_id, std::vector<adnl::AdnlNodeIdShort> nodes, std::vector<PublicKeyHash> root_public_keys,
OverlayMemberCertificate cert, std::unique_ptr<Overlays::Callback> callback, OverlayPrivacyRules rules,
std::string scope, OverlayOptions opts) {
auto R = td::actor::create_actor<OverlayImpl>(
"overlay", keyring, adnl, manager, dht_node, local_id, std::move(overlay_id), OverlayType::CertificatedMembers,
std::move(nodes), std::move(root_public_keys), std::move(cert), std::move(callback), std::move(rules),
std::move(scope), std::move(opts));
return td::actor::ActorOwn<Overlay>(std::move(R));
}
OverlayImpl::OverlayImpl(td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<dht::Dht> dht_node,
adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, bool pub,
std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Overlays::Callback> callback,
adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, OverlayType overlay_type,
std::vector<adnl::AdnlNodeIdShort> nodes, std::vector<PublicKeyHash> root_public_keys,
OverlayMemberCertificate cert, std::unique_ptr<Overlays::Callback> callback,
OverlayPrivacyRules rules, td::string scope, OverlayOptions opts)
: keyring_(keyring)
, adnl_(adnl)
@ -69,37 +90,28 @@ OverlayImpl::OverlayImpl(td::actor::ActorId<keyring::Keyring> keyring, td::actor
, local_id_(local_id)
, id_full_(std::move(overlay_id))
, callback_(std::move(callback))
, public_(pub)
, overlay_type_(overlay_type)
, rules_(std::move(rules))
, scope_(scope)
, announce_self_(opts.announce_self_)
, frequent_dht_lookup_(opts.frequent_dht_lookup_) {
, opts_(std::move(opts)) {
overlay_id_ = id_full_.compute_short_id();
frequent_dht_lookup_ = opts_.frequent_dht_lookup_;
peer_list_.local_member_flags_ = opts_.local_overlay_member_flags_;
VLOG(OVERLAY_INFO) << this << ": creating " << (public_ ? "public" : "private");
VLOG(OVERLAY_INFO) << this << ": creating";
for (auto &node : nodes) {
CHECK(!public_);
auto X = OverlayNode{node, overlay_id_};
do_add_peer(std::move(X));
}
update_root_member_list(std::move(nodes), std::move(root_public_keys), std::move(cert));
update_neighbours(static_cast<td::uint32>(nodes.size()));
}
void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getRandomPeers &query,
td::Promise<td::BufferSlice> promise) {
if (public_) {
if (overlay_type_ != OverlayType::FixedMemberList) {
VLOG(OVERLAY_DEBUG) << this << ": received " << query.peers_->nodes_.size() << " nodes from " << src
<< " in getRandomPeers query";
std::vector<OverlayNode> nodes;
for (auto &n : query.peers_->nodes_) {
auto N = OverlayNode::create(n);
if (N.is_ok()) {
nodes.emplace_back(N.move_as_ok());
}
}
add_peers(std::move(nodes));
add_peers(query.peers_);
send_random_peers(src, std::move(promise));
} else {
VLOG(OVERLAY_WARNING) << this << ": DROPPING getRandomPeers query from " << src << " in private overlay";
@ -107,6 +119,19 @@ void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getR
}
}
void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getRandomPeersV2 &query,
td::Promise<td::BufferSlice> promise) {
if (overlay_type_ != OverlayType::FixedMemberList) {
VLOG(OVERLAY_DEBUG) << this << ": received " << query.peers_->nodes_.size() << " nodes from " << src
<< " in getRandomPeers query";
add_peers(query.peers_);
send_random_peers_v2(src, std::move(promise));
} else {
VLOG(OVERLAY_WARNING) << this << ": DROPPING getRandomPeers query from " << src << " in private overlay";
promise.set_error(td::Status::Error(ErrorCode::protoviolation, "overlay is private"));
}
}
void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getBroadcast &query,
td::Promise<td::BufferSlice> promise) {
auto it = broadcasts_.find(query.hash_);
@ -139,17 +164,14 @@ void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getB
}
*/
void OverlayImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
if (!public_) {
auto P = peers_.get(src);
if (P == nullptr) {
VLOG(OVERLAY_WARNING) << this << ": received query in private overlay from unknown source " << src;
promise.set_error(td::Status::Error(ErrorCode::protoviolation, "overlay is private"));
return;
}
} else {
on_ping_result(src, true);
void OverlayImpl::receive_query(adnl::AdnlNodeIdShort src, tl_object_ptr<ton_api::overlay_messageExtra> extra,
td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
if (!is_valid_peer(src, extra ? extra->certificate_.get() : nullptr)) {
VLOG(OVERLAY_WARNING) << this << ": received query in private overlay from unknown source " << src;
promise.set_error(td::Status::Error(ErrorCode::protoviolation, "overlay is not public"));
return;
}
auto R = fetch_tl_object<ton_api::Function>(data.clone(), true);
if (R.is_error()) {
@ -167,16 +189,25 @@ void OverlayImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data,
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_broadcast> bcast) {
if (peer_list_.local_member_flags_ & OverlayMemberFlags::DoNotReceiveBroadcasts) {
return td::Status::OK();
}
return BroadcastSimple::create(this, message_from, std::move(bcast));
}
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_broadcastFec> b) {
if (peer_list_.local_member_flags_ & OverlayMemberFlags::DoNotReceiveBroadcasts) {
return td::Status::OK();
}
return OverlayFecBroadcastPart::create(this, message_from, std::move(b));
}
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_broadcastFecShort> b) {
if (peer_list_.local_member_flags_ & OverlayMemberFlags::DoNotReceiveBroadcasts) {
return td::Status::OK();
}
return OverlayFecBroadcastPart::create(this, message_from, std::move(b));
}
@ -188,6 +219,7 @@ td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_fec_received> msg) {
return td::Status::OK(); // disable this logic for now
auto it = fec_broadcasts_.find(msg->hash_);
if (it != fec_broadcasts_.end()) {
VLOG(OVERLAY_DEBUG) << this << ": received fec opt-out message from " << message_from << " for broadcast "
@ -202,6 +234,7 @@ td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_fec_completed> msg) {
return td::Status::OK(); // disable this logic for now
auto it = fec_broadcasts_.find(msg->hash_);
if (it != fec_broadcasts_.end()) {
VLOG(OVERLAY_DEBUG) << this << ": received fec completed message from " << message_from << " for broadcast "
@ -221,15 +254,13 @@ td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
return td::Status::OK();
}
void OverlayImpl::receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data) {
if (!public_) {
if (peers_.get(src) == nullptr) {
VLOG(OVERLAY_WARNING) << this << ": received query in private overlay from unknown source " << src;
return;
}
} else {
on_ping_result(src, true);
void OverlayImpl::receive_message(adnl::AdnlNodeIdShort src, tl_object_ptr<ton_api::overlay_messageExtra> extra,
td::BufferSlice data) {
if (!is_valid_peer(src, extra ? extra->certificate_.get() : nullptr)) {
VLOG(OVERLAY_WARNING) << this << ": received message in private overlay from unknown source " << src;
return;
}
auto X = fetch_tl_object<ton_api::overlay_Broadcast>(data.clone(), true);
if (X.is_error()) {
VLOG(OVERLAY_DEBUG) << this << ": received custom message";
@ -244,44 +275,51 @@ void OverlayImpl::receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice dat
void OverlayImpl::alarm() {
bcast_gc();
if(update_throughput_at_.is_in_past()) {
if (update_throughput_at_.is_in_past()) {
double t_elapsed = td::Time::now() - last_throughput_update_.at();
auto SelfId = actor_id(this);
peers_.iterate([&](const adnl::AdnlNodeIdShort &key, OverlayPeer &peer) {
iterate_all_peers([&](const adnl::AdnlNodeIdShort &key, OverlayPeer &peer) {
peer.throughput_out_bytes = static_cast<td::uint32>(peer.throughput_out_bytes_ctr / t_elapsed);
peer.throughput_in_bytes = static_cast<td::uint32>(peer.throughput_in_bytes_ctr / t_elapsed);
peer.throughput_out_packets = static_cast<td::uint32>(peer.throughput_out_packets_ctr / t_elapsed);
peer.throughput_in_packets = static_cast<td::uint32>(peer.throughput_in_packets_ctr / t_elapsed);
peer.throughput_out_bytes_ctr = 0;
peer.throughput_in_bytes_ctr = 0;
peer.throughput_out_packets_ctr = 0;
peer.throughput_in_packets_ctr = 0;
auto P = td::PromiseCreator::lambda([SelfId, peer_id = key](td::Result<td::string> result) {
result.ensure();
td::actor::send_closure(SelfId, &Overlay::update_peer_ip_str, peer_id, result.move_as_ok());
});
td::actor::send_closure(adnl_, &adnl::AdnlSenderInterface::get_conn_ip_str, local_id_, key, std::move(P));
});
update_throughput_at_ = td::Timestamp::in(50.0);
last_throughput_update_ = td::Timestamp::now();
}
if (public_) {
if (peers_.size() > 0) {
if (overlay_type_ != OverlayType::FixedMemberList) {
if (has_valid_membership_certificate()) {
auto P = get_random_peer();
if (P) {
send_random_peers(P->get_id(), {});
if (overlay_type_ == OverlayType::Public) {
send_random_peers(P->get_id(), {});
} else {
send_random_peers_v2(P->get_id(), {});
}
}
} else {
VLOG(OVERLAY_WARNING) << "meber certificate ist invalid, valid_until="
<< peer_list_.local_cert_is_valid_until_.at_unix();
}
if (next_dht_query_ && next_dht_query_.is_in_past()) {
if (next_dht_query_ && next_dht_query_.is_in_past() && overlay_type_ == OverlayType::Public) {
next_dht_query_ = td::Timestamp::never();
std::function<void(dht::DhtValue)> callback = [SelfId = actor_id(this)](dht::DhtValue value) {
td::actor::send_closure(SelfId, &OverlayImpl::receive_dht_nodes, std::move(value));
@ -292,21 +330,22 @@ void OverlayImpl::alarm() {
td::actor::send_closure(dht_node_, &dht::Dht::get_value_many, dht::DhtKey{overlay_id_.pubkey_hash(), "nodes", 0},
std::move(callback), std::move(on_finish));
}
if (update_db_at_.is_in_past()) {
if (peers_.size() > 0) {
std::vector<OverlayNode> vec;
for (td::uint32 i = 0; i < 20; i++) {
auto P = get_random_peer();
if (!P) {
break;
}
vec.push_back(P->get());
if (update_db_at_.is_in_past() && overlay_type_ == OverlayType::Public) {
std::vector<OverlayNode> vec;
for (td::uint32 i = 0; i < 20; i++) {
auto P = get_random_peer();
if (!P) {
break;
}
vec.push_back(P->get_node()->clone());
}
if (vec.size() > 0) {
td::actor::send_closure(manager_, &OverlayManager::save_to_db, local_id_, overlay_id_, std::move(vec));
}
update_db_at_ = td::Timestamp::in(60.0);
}
update_neighbours(0);
alarm_timestamp() = td::Timestamp::in(1.0);
} else {
update_neighbours(0);
@ -315,7 +354,7 @@ void OverlayImpl::alarm() {
}
void OverlayImpl::receive_dht_nodes(dht::DhtValue v) {
CHECK(public_);
CHECK(overlay_type_ == OverlayType::Public);
auto R = fetch_tl_object<ton_api::overlay_nodes>(v.value().clone(), true);
if (R.is_ok()) {
auto r = R.move_as_ok();
@ -361,7 +400,7 @@ void OverlayImpl::dht_lookup_finished(td::Status S) {
}
void OverlayImpl::update_dht_nodes(OverlayNode node) {
if (!public_) {
if (overlay_type_ != OverlayType::Public) {
return;
}
@ -418,21 +457,57 @@ void OverlayImpl::bcast_gc() {
CHECK(delivered_broadcasts_.size() == bcast_lru_.size());
}
void OverlayImpl::send_message_to_neighbours(td::BufferSlice data) {
for (auto &n : neighbours_) {
td::actor::send_closure(manager_, &OverlayManager::send_message, n, local_id_, overlay_id_, data.clone());
void OverlayImpl::wait_neighbours_not_empty(td::Promise<td::Unit> promise, int max_retries) {
if (!peer_list_.neighbours_.empty()) {
promise.set_result(td::Unit());
} else if (max_retries > 0) {
delay_action(
[SelfId = actor_id(this), promise = std::move(promise), max_retries]() mutable {
td::actor::send_closure(SelfId, &OverlayImpl::wait_neighbours_not_empty, std::move(promise), max_retries - 1);
},
td::Timestamp::in(0.5));
} else {
promise.set_error(td::Status::Error(ErrorCode::timeout));
}
}
void OverlayImpl::send_broadcast(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) {
auto S = BroadcastSimple::create_new(actor_id(this), keyring_, send_as, std::move(data), flags);
if (S.is_error()) {
LOG(WARNING) << "failed to send broadcast: " << S;
if (!has_valid_membership_certificate()) {
VLOG(OVERLAY_WARNING) << "member certificate is invalid, valid_until="
<< peer_list_.local_cert_is_valid_until_.at_unix();
return;
}
if (!has_valid_broadcast_certificate(send_as, data.size(), false)) {
VLOG(OVERLAY_WARNING) << "broadcast source certificate is invalid";
return;
}
wait_neighbours_not_empty([this, send_as, flags, data = std::move(data)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
return;
}
auto S = BroadcastSimple::create_new(actor_id(this), keyring_, send_as, std::move(data), flags);
if (S.is_error()) {
LOG(WARNING) << "failed to send broadcast: " << S;
}
});
}
void OverlayImpl::send_broadcast_fec(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) {
OverlayOutboundFecBroadcast::create(std::move(data), flags, actor_id(this), send_as);
if (!has_valid_membership_certificate()) {
VLOG(OVERLAY_WARNING) << "meber certificate ist invalid, valid_until="
<< peer_list_.local_cert_is_valid_until_.at_unix();
return;
}
if (!has_valid_broadcast_certificate(send_as, data.size(), true)) {
VLOG(OVERLAY_WARNING) << "broadcast source certificate is invalid";
return;
}
wait_neighbours_not_empty([this, send_as, flags, data = std::move(data)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
return;
}
OverlayOutboundFecBroadcast::create(std::move(data), flags, actor_id(this), send_as);
});
}
void OverlayImpl::print(td::StringBuilder &sb) {
@ -450,6 +525,22 @@ td::Status OverlayImpl::check_date(td::uint32 date) {
return td::Status::OK();
}
BroadcastCheckResult OverlayImpl::check_source_eligible(const PublicKeyHash &source, const Certificate *cert,
td::uint32 size, bool is_fec) {
if (size == 0) {
return BroadcastCheckResult::Forbidden;
}
auto r = rules_.check_rules(source, size, is_fec);
if (!cert || r == BroadcastCheckResult::Allowed) {
return r;
}
auto r2 = cert->check(source, overlay_id_, static_cast<td::int32>(td::Clocks::system()), size, is_fec);
r2 = broadcast_check_result_min(r2, rules_.check_rules(cert->issuer_hash(), size, is_fec));
return broadcast_check_result_max(r, r2);
}
BroadcastCheckResult OverlayImpl::check_source_eligible(PublicKey source, const Certificate *cert, td::uint32 size,
bool is_fec) {
if (size == 0) {
@ -457,7 +548,7 @@ BroadcastCheckResult OverlayImpl::check_source_eligible(PublicKey source, const
}
auto short_id = source.compute_short_id();
auto r = rules_.check_rules(source.compute_short_id(), size, is_fec);
auto r = rules_.check_rules(short_id, size, is_fec);
if (!cert || r == BroadcastCheckResult::Allowed) {
return r;
}
@ -492,21 +583,23 @@ void OverlayImpl::register_fec_broadcast(std::unique_ptr<BroadcastFec> bcast) {
}
void OverlayImpl::get_self_node(td::Promise<OverlayNode> promise) {
OverlayNode s{local_id_, overlay_id_};
OverlayNode s{local_id_, overlay_id_, peer_list_.local_member_flags_};
auto to_sign = s.to_sign();
auto P = td::PromiseCreator::lambda([oid = print_id(), s = std::move(s), promise = std::move(promise)](
td::Result<std::pair<td::BufferSlice, PublicKey>> R) mutable {
if (R.is_error()) {
auto S = R.move_as_error();
LOG(ERROR) << oid << ": failed to get self node: " << S;
promise.set_error(std::move(S));
return;
}
auto V = R.move_as_ok();
s.update_signature(std::move(V.first));
s.update_adnl_id(adnl::AdnlNodeIdFull{V.second});
promise.set_value(std::move(s));
});
auto P = td::PromiseCreator::lambda(
[oid = print_id(), s = std::move(s), cert = peer_list_.cert_,
promise = std::move(promise)](td::Result<std::pair<td::BufferSlice, PublicKey>> R) mutable {
if (R.is_error()) {
auto S = R.move_as_error();
LOG(ERROR) << oid << ": failed to get self node: " << S;
promise.set_error(std::move(S));
return;
}
auto V = R.move_as_ok();
s.update_signature(std::move(V.first));
s.update_adnl_id(adnl::AdnlNodeIdFull{V.second});
s.update_certificate(std::move(cert));
promise.set_value(std::move(s));
});
td::actor::send_closure(keyring_, &keyring::Keyring::sign_add_get_public_key, local_id_.pubkey_hash(),
std::move(to_sign), std::move(P));
@ -598,17 +691,6 @@ void OverlayImpl::check_broadcast(PublicKeyHash src, td::BufferSlice data, td::P
callback_->check_broadcast(src, overlay_id_, std::move(data), std::move(promise));
}
void OverlayImpl::update_peer_err_ctr(adnl::AdnlNodeIdShort peer_id, bool is_fec) {
auto src_peer = peers_.get(peer_id);
if(src_peer) {
if(is_fec) {
src_peer->fec_broadcast_errors++;
} else {
src_peer->broadcast_errors++;
}
}
}
void OverlayImpl::broadcast_checked(Overlay::BroadcastHash hash, td::Result<td::Unit> R) {
{
auto it = broadcasts_.find(hash);
@ -630,30 +712,47 @@ void OverlayImpl::get_stats(td::Promise<tl_object_ptr<ton_api::engine_validator_
res->overlay_id_ = overlay_id_.bits256_value();
res->overlay_id_full_ = id_full_.pubkey().tl();
res->scope_ = scope_;
peers_.iterate([&](const adnl::AdnlNodeIdShort &key, const OverlayPeer &peer) {
iterate_all_peers([&](const adnl::AdnlNodeIdShort &key, const OverlayPeer &peer) {
auto node_obj = create_tl_object<ton_api::engine_validator_overlayStatsNode>();
node_obj->adnl_id_ = key.bits256_value();
node_obj->t_out_bytes_ = peer.throughput_out_bytes;
node_obj->t_in_bytes_ = peer.throughput_in_bytes;
node_obj->t_out_pckts_ = peer.throughput_out_packets;
node_obj->t_in_pckts_ = peer.throughput_in_packets;
node_obj->ip_addr_ = peer.ip_addr_str;
node_obj->last_in_query_ = static_cast<td::uint32>(peer.last_in_query_at.at_unix());
node_obj->last_out_query_ = static_cast<td::uint32>(peer.last_out_query_at.at_unix());
node_obj->bdcst_errors_ = peer.broadcast_errors;
node_obj->fec_bdcst_errors_ = peer.fec_broadcast_errors;
node_obj->is_neighbour_ = peer.is_neighbour();
node_obj->is_alive_ = peer.is_alive();
node_obj->node_flags_ = peer.get_node()->flags();
res->nodes_.push_back(std::move(node_obj));
});
res->stats_.push_back(
create_tl_object<ton_api::engine_validator_oneStat>("neighbours_cnt", PSTRING() << neighbours_.size()));
create_tl_object<ton_api::engine_validator_oneStat>("neighbours_cnt", PSTRING() << neighbours_cnt()));
promise.set_value(std::move(res));
callback_->get_stats_extra([promise = std::move(promise), res = std::move(res)](td::Result<std::string> R) mutable {
if (R.is_ok()) {
res->extra_ = R.move_as_ok();
}
promise.set_value(std::move(res));
});
}
bool OverlayImpl::has_valid_broadcast_certificate(const PublicKeyHash &source, size_t size, bool is_fec) {
if (size > std::numeric_limits<td::uint32>::max()) {
return false;
}
auto it = certs_.find(source);
return check_source_eligible(source, it == certs_.end() ? nullptr : it->second.get(), (td::uint32)size, is_fec);
}
} // namespace overlay

View file

@ -18,6 +18,7 @@
*/
#pragma once
#include "auto/tl/ton_api.h"
#include "td/utils/buffer.h"
#include "td/utils/int_types.h"
@ -37,24 +38,29 @@ class Overlay : public td::actor::Actor {
using BroadcastDataHash = td::Bits256;
using BroadcastPartHash = td::Bits256;
static td::actor::ActorOwn<Overlay> create(td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<OverlayManager> manager,
td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
OverlayIdFull overlay_id, std::unique_ptr<Overlays::Callback> callback,
OverlayPrivacyRules rules, td::string scope, OverlayOptions opts = {});
static td::actor::ActorOwn<Overlay> create(td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<OverlayManager> manager,
td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
OverlayIdFull overlay_id, std::vector<adnl::AdnlNodeIdShort> nodes,
std::unique_ptr<Overlays::Callback> callback, OverlayPrivacyRules rules,
std::string scope);
static td::actor::ActorOwn<Overlay> create_public(
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
OverlayIdFull overlay_id, std::unique_ptr<Overlays::Callback> callback, OverlayPrivacyRules rules,
td::string scope, OverlayOptions opts = {});
static td::actor::ActorOwn<Overlay> create_private(
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
OverlayIdFull overlay_id, std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Overlays::Callback> callback,
OverlayPrivacyRules rules, std::string scope, OverlayOptions opts = {});
static td::actor::ActorOwn<Overlay> create_semiprivate(
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
OverlayIdFull overlay_id, std::vector<adnl::AdnlNodeIdShort> nodes, std::vector<PublicKeyHash> root_public_keys,
OverlayMemberCertificate cert, std::unique_ptr<Overlays::Callback> callback, OverlayPrivacyRules rules,
std::string scope, OverlayOptions opts = {});
virtual void update_dht_node(td::actor::ActorId<dht::Dht> dht) = 0;
virtual void receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data) = 0;
virtual void receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data, td::Promise<td::BufferSlice> promise) = 0;
virtual void receive_message(adnl::AdnlNodeIdShort src, tl_object_ptr<ton_api::overlay_messageExtra> extra,
td::BufferSlice data) = 0;
virtual void receive_query(adnl::AdnlNodeIdShort src, tl_object_ptr<ton_api::overlay_messageExtra> extra,
td::BufferSlice data, td::Promise<td::BufferSlice> promise) = 0;
virtual void send_message_to_neighbours(td::BufferSlice data) = 0;
virtual void send_broadcast(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) = 0;
virtual void send_broadcast_fec(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) = 0;
@ -64,12 +70,17 @@ class Overlay : public td::actor::Actor {
virtual void add_certificate(PublicKeyHash key, std::shared_ptr<Certificate>) = 0;
virtual void set_privacy_rules(OverlayPrivacyRules rules) = 0;
virtual void receive_nodes_from_db(tl_object_ptr<ton_api::overlay_nodes> nodes) = 0;
virtual void receive_nodes_from_db_v2(tl_object_ptr<ton_api::overlay_nodesV2> nodes) = 0;
virtual void get_stats(td::Promise<tl_object_ptr<ton_api::engine_validator_overlayStats>> promise) = 0;
virtual void update_throughput_out_ctr(adnl::AdnlNodeIdShort peer_id, td::uint32 msg_size, bool is_query) = 0;
virtual void update_throughput_in_ctr(adnl::AdnlNodeIdShort peer_id, td::uint32 msg_size, bool is_query) = 0;
virtual void update_peer_ip_str(adnl::AdnlNodeIdShort peer_id, td::string ip_str) = 0;
virtual void update_member_certificate(OverlayMemberCertificate cert) = 0;
virtual void update_root_member_list(std::vector<adnl::AdnlNodeIdShort> nodes,
std::vector<PublicKeyHash> root_public_keys, OverlayMemberCertificate cert) = 0;
//virtual void receive_broadcast(td::BufferSlice data) = 0;
//virtual void subscribe(std::unique_ptr<Overlays::Callback> callback) = 0;
virtual void forget_peer(adnl::AdnlNodeIdShort peer_id) = 0;
};
} // namespace overlay

View file

@ -18,11 +18,15 @@
*/
#pragma once
#include <any>
#include <memory>
#include <vector>
#include <map>
#include <set>
#include <unordered_set>
#include <queue>
#include "adnl/adnl-node-id.hpp"
#include "overlay.h"
#include "overlay-manager.h"
#include "overlay-fec.hpp"
@ -32,6 +36,9 @@
#include "td/utils/DecTree.h"
#include "td/utils/List.h"
#include "td/utils/Status.h"
#include "td/utils/Time.h"
#include "td/utils/buffer.h"
#include "td/utils/overloaded.h"
#include "fec/fec.h"
@ -40,6 +47,8 @@
#include "auto/tl/ton_api.h"
#include "auto/tl/ton_api.hpp"
#include "td/utils/port/signals.h"
#include "tl-utils/common-utils.hpp"
namespace ton {
@ -58,15 +67,17 @@ class OverlayPeer {
adnl::AdnlNodeIdFull get_full_id() const {
return node_.adnl_id_full();
}
OverlayNode get() const {
return node_.clone();
const OverlayNode *get_node() const {
return &node_;
}
void update(OverlayNode node) {
CHECK(get_id() == node.adnl_id_short());
if (node.version() > node_.version()) {
node_ = std::move(node);
}
node_.update(std::move(node));
}
void update_certificate(OverlayMemberCertificate cert) {
node_.update_certificate(std::move(cert));
}
OverlayPeer(OverlayNode node) : node_(std::move(node)) {
id_ = node_.adnl_id_short();
}
@ -95,24 +106,44 @@ class OverlayPeer {
return is_alive_;
}
bool is_permanent_member() const {
return is_permanent_member_;
}
void set_permanent(bool value) {
is_permanent_member_ = value;
}
void clear_certificate() {
node_.clear_certificate();
}
auto certificate() const {
return node_.certificate();
}
bool has_full_id() const {
return node_.has_full_id();
}
td::uint32 throughput_out_bytes = 0;
td::uint32 throughput_in_bytes = 0;
td::uint32 throughput_out_packets = 0;
td::uint32 throughput_in_packets = 0;
td::uint32 throughput_out_bytes_ctr = 0;
td::uint32 throughput_in_bytes_ctr = 0;
td::uint32 throughput_out_packets_ctr = 0;
td::uint32 throughput_in_packets_ctr = 0;
td::uint32 broadcast_errors = 0;
td::uint32 fec_broadcast_errors = 0;
td::Timestamp last_in_query_at = td::Timestamp::now();
td::Timestamp last_out_query_at = td::Timestamp::now();
td::string ip_addr_str = "undefined";
private:
@ -122,6 +153,7 @@ class OverlayPeer {
bool is_neighbour_ = false;
size_t missed_pings_ = 0;
bool is_alive_ = true;
bool is_permanent_member_ = false;
td::Timestamp last_ping_at_ = td::Timestamp::now();
};
@ -129,19 +161,23 @@ class OverlayImpl : public Overlay {
public:
OverlayImpl(td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<OverlayManager> manager, td::actor::ActorId<dht::Dht> dht_node,
adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, bool pub,
std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Overlays::Callback> callback,
OverlayPrivacyRules rules, td::string scope = "{ \"type\": \"undefined\" }", OverlayOptions opts = {});
adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, OverlayType overlay_type,
std::vector<adnl::AdnlNodeIdShort> nodes, std::vector<PublicKeyHash> root_public_keys,
OverlayMemberCertificate cert, std::unique_ptr<Overlays::Callback> callback, OverlayPrivacyRules rules,
td::string scope = "{ \"type\": \"undefined\" }", OverlayOptions opts = {});
void update_dht_node(td::actor::ActorId<dht::Dht> dht) override {
dht_node_ = dht;
}
void receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data) override;
void receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data, td::Promise<td::BufferSlice> promise) override;
void receive_message(adnl::AdnlNodeIdShort src, tl_object_ptr<ton_api::overlay_messageExtra> extra,
td::BufferSlice data) override;
void receive_query(adnl::AdnlNodeIdShort src, tl_object_ptr<ton_api::overlay_messageExtra> extra,
td::BufferSlice data, td::Promise<td::BufferSlice> promise) override;
void send_message_to_neighbours(td::BufferSlice data) override;
void send_broadcast(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) override;
void send_broadcast_fec(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) override;
void receive_nodes_from_db(tl_object_ptr<ton_api::overlay_nodes> nodes) override;
void receive_nodes_from_db_v2(tl_object_ptr<ton_api::overlay_nodesV2> nodes) override;
void get_self_node(td::Promise<OverlayNode> promise);
@ -149,8 +185,8 @@ class OverlayImpl : public Overlay {
void start_up() override {
update_throughput_at_ = td::Timestamp::in(50.0);
last_throughput_update_ = td::Timestamp::now();
if (public_) {
if (overlay_type_ == OverlayType::Public) {
update_db_at_ = td::Timestamp::in(60.0);
}
alarm_timestamp() = td::Timestamp::in(1);
@ -158,13 +194,17 @@ class OverlayImpl : public Overlay {
void on_ping_result(adnl::AdnlNodeIdShort peer, bool success);
void receive_random_peers(adnl::AdnlNodeIdShort src, td::Result<td::BufferSlice> R);
void receive_random_peers_v2(adnl::AdnlNodeIdShort src, td::Result<td::BufferSlice> R);
void send_random_peers(adnl::AdnlNodeIdShort dst, td::Promise<td::BufferSlice> promise);
void send_random_peers_v2(adnl::AdnlNodeIdShort dst, td::Promise<td::BufferSlice> promise);
void send_random_peers_cont(adnl::AdnlNodeIdShort dst, OverlayNode node, td::Promise<td::BufferSlice> promise);
void send_random_peers_v2_cont(adnl::AdnlNodeIdShort dst, OverlayNode node, td::Promise<td::BufferSlice> promise);
void get_overlay_random_peers(td::uint32 max_peers, td::Promise<std::vector<adnl::AdnlNodeIdShort>> promise) override;
void set_privacy_rules(OverlayPrivacyRules rules) override;
void add_certificate(PublicKeyHash key, std::shared_ptr<Certificate> cert) override {
certs_[key] = std::move(cert);
}
void update_member_certificate(OverlayMemberCertificate cert) override;
void receive_dht_nodes(dht::DhtValue v);
void dht_lookup_finished(td::Status S);
@ -188,6 +228,8 @@ class OverlayImpl : public Overlay {
td::Status check_date(td::uint32 date);
BroadcastCheckResult check_source_eligible(PublicKey source, const Certificate *cert, td::uint32 size, bool is_fec);
BroadcastCheckResult check_source_eligible(const PublicKeyHash &source, const Certificate *cert, td::uint32 size,
bool is_fec);
td::Status check_delivered(BroadcastHash hash);
void broadcast_checked(Overlay::BroadcastHash hash, td::Result<td::Unit> R);
@ -206,17 +248,7 @@ class OverlayImpl : public Overlay {
void send_new_fec_broadcast_part(PublicKeyHash local_id, Overlay::BroadcastDataHash data_hash, td::uint32 size,
td::uint32 flags, td::BufferSlice part, td::uint32 seqno, fec::FecType fec_type,
td::uint32 date);
std::vector<adnl::AdnlNodeIdShort> get_neighbours(td::uint32 max_size = 0) const {
if (max_size == 0 || max_size >= neighbours_.size()) {
return neighbours_;
} else {
std::vector<adnl::AdnlNodeIdShort> vec;
for (td::uint32 i = 0; i < max_size; i++) {
vec.push_back(neighbours_[td::Random::fast(0, static_cast<td::int32>(neighbours_.size()) - 1)]);
}
return vec;
}
}
std::vector<adnl::AdnlNodeIdShort> get_neighbours(td::uint32 max_size = 0) const;
td::actor::ActorId<OverlayManager> overlay_manager() const {
return manager_;
}
@ -236,39 +268,59 @@ class OverlayImpl : public Overlay {
td::Result<Encryptor *> get_encryptor(PublicKey source);
void get_stats(td::Promise<tl_object_ptr<ton_api::engine_validator_overlayStats>> promise) override;
void update_throughput_out_ctr(adnl::AdnlNodeIdShort peer_id, td::uint32 msg_size, bool is_query) override {
auto out_peer = peers_.get(peer_id);
if(out_peer) {
out_peer->throughput_out_bytes_ctr += msg_size;
out_peer->throughput_out_packets_ctr++;
if(is_query)
{
out_peer->last_out_query_at = td::Timestamp::now();
}
}
void update_throughput_out_ctr(adnl::AdnlNodeIdShort peer_id, td::uint32 msg_size, bool is_query) override;
void update_throughput_in_ctr(adnl::AdnlNodeIdShort peer_id, td::uint32 msg_size, bool is_query) override;
void update_peer_ip_str(adnl::AdnlNodeIdShort peer_id, td::string ip_str) override;
void update_root_member_list(std::vector<adnl::AdnlNodeIdShort> nodes, std::vector<PublicKeyHash> root_public_keys,
OverlayMemberCertificate cert) override;
bool is_valid_peer(const adnl::AdnlNodeIdShort &id, const ton_api::overlay_MemberCertificate *certificate);
bool is_persistent_node(const adnl::AdnlNodeIdShort &id);
td::uint32 max_data_bcasts() const {
return 100;
}
void update_throughput_in_ctr(adnl::AdnlNodeIdShort peer_id, td::uint32 msg_size, bool is_query) override {
auto in_peer = peers_.get(peer_id);
if(in_peer) {
in_peer->throughput_in_bytes_ctr += msg_size;
in_peer->throughput_in_packets_ctr++;
if(is_query)
{
in_peer->last_in_query_at = td::Timestamp::now();
}
}
td::uint32 max_bcasts() const {
return 1000;
}
void update_peer_ip_str(adnl::AdnlNodeIdShort peer_id, td::string ip_str) override {
auto fpeer = peers_.get(peer_id);
if(fpeer) {
fpeer->ip_addr_str = ip_str;
}
td::uint32 max_fec_bcasts() const {
return 20;
}
td::uint32 max_sources() const {
return 10;
}
td::uint32 max_encryptors() const {
return 16;
}
td::uint32 max_neighbours() const {
return opts_.max_neighbours_;
}
td::uint32 max_peers() const {
return opts_.max_peers_;
}
td::uint32 nodes_to_send() const {
return opts_.nodes_to_send_;
}
td::uint32 propagate_broadcast_to() const {
return opts_.propagate_broadcast_to_;
}
bool has_valid_membership_certificate();
bool has_valid_broadcast_certificate(const PublicKeyHash &source, size_t size, bool is_fec);
void forget_peer(adnl::AdnlNodeIdShort peer_id) override {
del_peer(peer_id);
}
void wait_neighbours_not_empty(td::Promise<td::Unit> promise, int max_retries = 20);
private:
template <class T>
@ -278,6 +330,8 @@ class OverlayImpl : public Overlay {
void process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getRandomPeers &query,
td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getRandomPeersV2 &query,
td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getBroadcast &query,
td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getBroadcastList &query,
@ -294,20 +348,28 @@ class OverlayImpl : public Overlay {
td::Status process_broadcast(adnl::AdnlNodeIdShort message_from, tl_object_ptr<ton_api::overlay_fec_completed> msg);
td::Status process_broadcast(adnl::AdnlNodeIdShort message_from, tl_object_ptr<ton_api::overlay_unicast> msg);
void do_add_peer(OverlayNode node);
void add_peer_in_cont(OverlayNode node);
void add_peer_in(OverlayNode node);
td::Status validate_peer_certificate(const adnl::AdnlNodeIdShort &node, const OverlayMemberCertificate &cert);
td::Status validate_peer_certificate(const adnl::AdnlNodeIdShort &node, const OverlayMemberCertificate *cert);
td::Status validate_peer_certificate(const adnl::AdnlNodeIdShort &node, ton_api::overlay_MemberCertificate *cert);
void add_peer(OverlayNode node);
void add_peers(std::vector<OverlayNode> nodes);
void add_peers(const tl_object_ptr<ton_api::overlay_nodes> &nodes);
void add_peers(const tl_object_ptr<ton_api::overlay_nodesV2> &nodes);
void del_some_peers();
void del_peer(adnl::AdnlNodeIdShort id);
void del_peer(const adnl::AdnlNodeIdShort &id);
void del_from_neighbour_list(OverlayPeer *P);
void del_from_neighbour_list(const adnl::AdnlNodeIdShort &id);
void iterate_all_peers(std::function<void(const adnl::AdnlNodeIdShort &key, OverlayPeer &peer)> cb);
OverlayPeer *get_random_peer(bool only_alive = false);
bool is_root_public_key(const PublicKeyHash &key) const;
bool has_good_peers() const;
size_t neighbours_cnt() const;
void finish_dht_query() {
if (!next_dht_store_query_) {
next_dht_store_query_ = td::Timestamp::in(td::Random::fast(60.0, 100.0));
}
if (frequent_dht_lookup_ && peers_.size() == bad_peers_.size()) {
if (frequent_dht_lookup_ && !has_good_peers()) {
next_dht_query_ = td::Timestamp::in(td::Random::fast(6.0, 10.0));
} else {
next_dht_query_ = next_dht_store_query_;
@ -322,14 +384,11 @@ class OverlayImpl : public Overlay {
OverlayIdFull id_full_;
OverlayIdShort overlay_id_;
td::DecTree<adnl::AdnlNodeIdShort, OverlayPeer> peers_;
td::Timestamp next_dht_query_ = td::Timestamp::in(1.0);
td::Timestamp next_dht_store_query_ = td::Timestamp::in(1.0);
td::Timestamp update_db_at_;
td::Timestamp update_throughput_at_;
td::Timestamp last_throughput_update_;
std::set<adnl::AdnlNodeIdShort> bad_peers_;
adnl::AdnlNodeIdShort next_bad_peer_ = adnl::AdnlNodeIdShort::zero();
std::unique_ptr<Overlays::Callback> callback_;
@ -337,7 +396,6 @@ class OverlayImpl : public Overlay {
std::map<BroadcastHash, std::unique_ptr<BroadcastFec>> fec_broadcasts_;
std::set<BroadcastHash> delivered_broadcasts_;
std::vector<adnl::AdnlNodeIdShort> neighbours_;
td::ListNode bcast_data_lru_;
td::ListNode bcast_fec_lru_;
std::queue<BroadcastHash> bcast_lru_;
@ -346,33 +404,6 @@ class OverlayImpl : public Overlay {
void bcast_gc();
static td::uint32 max_data_bcasts() {
return 100;
}
static td::uint32 max_bcasts() {
return 1000;
}
static td::uint32 max_fec_bcasts() {
return 20;
}
static td::uint32 max_sources() {
return 10;
}
static td::uint32 max_neighbours() {
return 5;
}
static td::uint32 max_encryptors() {
return 16;
}
static td::uint32 max_peers() {
return 20;
}
static td::uint32 nodes_to_send() {
return 4;
}
static BroadcastHash get_broadcast_hash(adnl::AdnlNodeIdShort &src, td::Bits256 &data_hash) {
td::uint8 buf[64];
td::MutableSlice m{buf, 64};
@ -382,8 +413,7 @@ class OverlayImpl : public Overlay {
return td::sha256_bits256(td::Slice(buf, 64));
}
bool public_;
bool semi_public_ = false;
OverlayType overlay_type_;
OverlayPrivacyRules rules_;
td::string scope_;
bool announce_self_ = true;
@ -412,6 +442,25 @@ class OverlayImpl : public Overlay {
td::ListNode encryptor_lru_;
std::map<PublicKeyHash, std::unique_ptr<CachedEncryptor>> encryptor_map_;
struct PeerList {
struct SlaveKey {
td::int32 expire_at{0};
adnl::AdnlNodeIdShort node{};
};
using SlaveKeys = std::vector<SlaveKey>;
std::map<PublicKeyHash, SlaveKeys> root_public_keys_;
OverlayMemberCertificate cert_;
std::set<adnl::AdnlNodeIdShort> bad_peers_;
adnl::AdnlNodeIdShort next_bad_peer_ = adnl::AdnlNodeIdShort::zero();
td::DecTree<adnl::AdnlNodeIdShort, OverlayPeer> peers_;
std::vector<adnl::AdnlNodeIdShort> neighbours_;
td::Timestamp local_cert_is_valid_until_;
td::uint32 local_member_flags_{0};
} peer_list_;
OverlayOptions opts_;
};
} // namespace overlay

View file

@ -18,7 +18,9 @@
*/
#pragma once
#include "adnl/adnl-node-id.hpp"
#include "adnl/adnl.h"
#include "auto/tl/ton_api.h"
#include "dht/dht.h"
#include "td/actor/PromiseFuture.h"
@ -33,6 +35,8 @@ namespace ton {
namespace overlay {
enum class OverlayType { Public, FixedMemberList, CertificatedMembers };
class OverlayIdShort {
public:
OverlayIdShort() {
@ -88,6 +92,10 @@ struct CertificateFlags {
enum Values : td::uint32 { AllowFec = 1, Trusted = 2 };
};
struct OverlayMemberFlags {
enum Values : td::uint32 { DoNotReceiveBroadcasts = 1 };
};
enum BroadcastCheckResult { Forbidden = 1, NeedCheck = 2, Allowed = 3 };
inline BroadcastCheckResult broadcast_check_result_max(BroadcastCheckResult l, BroadcastCheckResult r) {
@ -108,7 +116,6 @@ class OverlayPrivacyRules {
}
BroadcastCheckResult check_rules(PublicKeyHash hash, td::uint32 size, bool is_fec) {
auto it = authorized_keys_.find(hash);
if (it == authorized_keys_.end()) {
if (size > max_unath_size_) {
@ -158,9 +165,110 @@ class Certificate {
td::SharedSlice signature_;
};
class OverlayMemberCertificate {
public:
OverlayMemberCertificate() {
expire_at_ = std::numeric_limits<td::int32>::max();
}
OverlayMemberCertificate(PublicKey signed_by, td::uint32 flags, td::int32 slot, td::int32 expire_at,
td::BufferSlice signature)
: signed_by_(std::move(signed_by))
, flags_(flags)
, slot_(slot)
, expire_at_(expire_at)
, signature_(std::move(signature)) {
}
OverlayMemberCertificate(const OverlayMemberCertificate &other)
: signed_by_(other.signed_by_)
, flags_(other.flags_)
, slot_(other.slot_)
, expire_at_(other.expire_at_)
, signature_(other.signature_.clone()) {
}
OverlayMemberCertificate(OverlayMemberCertificate &&) = default;
OverlayMemberCertificate &operator=(OverlayMemberCertificate &&) = default;
OverlayMemberCertificate &operator=(const OverlayMemberCertificate &other) {
signed_by_ = other.signed_by_;
flags_ = other.flags_;
slot_ = other.slot_;
expire_at_ = other.expire_at_;
signature_ = other.signature_.clone();
return *this;
}
explicit OverlayMemberCertificate(const ton_api::overlay_MemberCertificate *cert);
td::Status check_signature(const adnl::AdnlNodeIdShort &node);
bool is_expired() const {
return expire_at_ < td::Clocks::system() - 3;
}
bool is_expired(double cur_time) const {
return expire_at_ < cur_time - 3;
}
tl_object_ptr<ton_api::overlay_MemberCertificate> tl() const {
if (empty()) {
return create_tl_object<ton_api::overlay_emptyMemberCertificate>();
}
return create_tl_object<ton_api::overlay_memberCertificate>(signed_by_.tl(), flags_, slot_, expire_at_,
signature_.clone_as_buffer_slice());
}
const auto &issued_by() const {
return signed_by_;
}
td::Slice signature() const {
return signature_.as_slice();
}
td::BufferSlice to_sign_data(const adnl::AdnlNodeIdShort &node) const {
return ton::create_serialize_tl_object<ton::ton_api::overlay_memberCertificateId>(node.tl(), flags_, slot_,
expire_at_);
}
bool empty() const {
return signed_by_.empty();
}
bool is_newer(const OverlayMemberCertificate &other) const {
return !empty() && expire_at_ > other.expire_at_;
}
auto slot() const {
return slot_;
}
auto expire_at() const {
return expire_at_;
}
void set_signature(td::Slice signature) {
signature_ = td::SharedSlice(signature);
}
void set_signature(td::SharedSlice signature) {
signature_ = std::move(signature);
}
private:
PublicKey signed_by_;
td::uint32 flags_;
td::int32 slot_;
td::int32 expire_at_ = std::numeric_limits<td::int32>::max();
td::SharedSlice signature_;
};
struct OverlayOptions {
bool announce_self_ = true;
bool frequent_dht_lookup_ = false;
td::uint32 local_overlay_member_flags_ = 0;
td::int32 max_slaves_in_semiprivate_overlay_ = 5;
td::uint32 max_peers_ = 20;
td::uint32 max_neighbours_ = 5;
td::uint32 nodes_to_send_ = 4;
td::uint32 propagate_broadcast_to_ = 5;
td::uint32 default_permanent_members_flags_ = 0;
};
class Overlays : public td::actor::Actor {
@ -175,6 +283,9 @@ class Overlays : public td::actor::Actor {
td::Promise<td::Unit> promise) {
promise.set_value(td::Unit());
}
virtual void get_stats_extra(td::Promise<std::string> promise) {
promise.set_result("");
}
virtual ~Callback() = default;
};
@ -192,6 +303,10 @@ class Overlays : public td::actor::Actor {
return 1;
}
static constexpr td::uint32 overlay_peer_ttl() {
return 600;
}
static td::actor::ActorOwn<Overlays> create(std::string db_root, td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<dht::Dht> dht);
@ -201,11 +316,20 @@ class Overlays : public td::actor::Actor {
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules,
td::string scope) = 0;
virtual void create_public_overlay_ex(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules,
td::string scope, OverlayOptions opts) = 0;
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules, td::string scope,
OverlayOptions opts) = 0;
virtual void create_semiprivate_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes,
std::vector<PublicKeyHash> root_public_keys,
OverlayMemberCertificate certificate,
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules,
td::string scope, OverlayOptions opts) = 0;
virtual void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Callback> callback,
OverlayPrivacyRules rules, std::string scope) = 0;
virtual void create_private_overlay_ex(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Callback> callback,
OverlayPrivacyRules rules, std::string scope, OverlayOptions opts) = 0;
virtual void delete_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id) = 0;
virtual void send_query(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdShort src, OverlayIdShort overlay_id,
@ -239,9 +363,18 @@ class Overlays : public td::actor::Actor {
virtual void update_certificate(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id, PublicKeyHash key,
std::shared_ptr<Certificate> cert) = 0;
virtual void update_member_certificate(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
OverlayMemberCertificate certificate) = 0;
virtual void update_root_member_list(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes,
std::vector<PublicKeyHash> root_public_keys,
OverlayMemberCertificate certificate) = 0;
virtual void get_overlay_random_peers(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay, td::uint32 max_peers,
td::Promise<std::vector<adnl::AdnlNodeIdShort>> promise) = 0;
virtual void get_stats(td::Promise<tl_object_ptr<ton_api::engine_validator_overlaysStats>> promise) = 0;
virtual void forget_peer(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay, adnl::AdnlNodeIdShort peer_id) = 0;
};
} // namespace overlay