1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

New shard overlays

This commit is contained in:
SpyCheese 2022-07-29 10:39:02 +03:00
parent 53270a00e6
commit 7ac60bea7d
30 changed files with 411 additions and 271 deletions

View file

@ -18,6 +18,7 @@
*/
#include "auto/tl/ton_api.h"
#include "td/utils/Random.h"
#include "common/delay.h"
#include "adnl/utils.hpp"
#include "dht/dht.h"
@ -73,7 +74,12 @@ OverlayImpl::OverlayImpl(td::actor::ActorId<keyring::Keyring> keyring, td::actor
, scope_(scope) {
overlay_id_ = id_full_.compute_short_id();
VLOG(OVERLAY_INFO) << this << ": creating " << (public_ ? "public" : "private");
if (is_external()) {
CHECK(public_);
VLOG(OVERLAY_INFO) << this << ": creating public external";
} else {
VLOG(OVERLAY_INFO) << this << ": creating " << (public_ ? "public" : "private");
}
for (auto &node : nodes) {
CHECK(!public_);
@ -86,6 +92,7 @@ OverlayImpl::OverlayImpl(td::actor::ActorId<keyring::Keyring> keyring, td::actor
void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getRandomPeers &query,
td::Promise<td::BufferSlice> promise) {
CHECK(!is_external());
if (public_) {
VLOG(OVERLAY_DEBUG) << this << ": received " << query.peers_->nodes_.size() << " nodes from " << src
<< " in getRandomPeers query";
@ -106,6 +113,7 @@ void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getR
void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getBroadcast &query,
td::Promise<td::BufferSlice> promise) {
CHECK(!is_external());
auto it = broadcasts_.find(query.hash_);
if (it == broadcasts_.end()) {
VLOG(OVERLAY_NOTICE) << this << ": received getBroadcastQuery(" << query.hash_ << ") from " << src
@ -127,16 +135,17 @@ void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getB
void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getBroadcastList &query,
td::Promise<td::BufferSlice> promise) {
CHECK(!is_external());
VLOG(OVERLAY_WARNING) << this << ": DROPPING getBroadcastList query";
promise.set_error(td::Status::Error(ErrorCode::protoviolation, "dropping get broadcast list query"));
}
/*void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, adnl::AdnlQueryId query_id, ton_api::overlay_customQuery &query) {
callback_->receive_query(src, query_id, id_, std::move(query.data_));
}
*/
void OverlayImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
if (is_external()) {
LOG(OVERLAY_WARNING) << "dropping query in external overlay " << overlay_id_;
promise.set_error(td::Status::Error("overlay is external"));
return;
}
if (!public_) {
auto P = peers_.get(src);
if (P == nullptr) {
@ -149,9 +158,6 @@ void OverlayImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data,
if (R.is_error()) {
// allow custom query to be here
if (!subscribed()) {
return;
}
callback_->receive_query(src, overlay_id_, std::move(data), std::move(promise));
return;
}
@ -165,27 +171,32 @@ void OverlayImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data,
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_broadcast> bcast) {
CHECK(!is_external());
return BroadcastSimple::create(this, message_from, std::move(bcast));
}
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_broadcastFec> b) {
CHECK(!is_external());
return OverlayFecBroadcastPart::create(this, message_from, std::move(b));
}
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_broadcastFecShort> b) {
CHECK(!is_external());
return OverlayFecBroadcastPart::create(this, message_from, std::move(b));
}
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_broadcastNotFound> bcast) {
CHECK(!is_external());
return td::Status::Error(ErrorCode::protoviolation,
PSTRING() << "received strange message broadcastNotFound from " << message_from);
}
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_fec_received> msg) {
CHECK(!is_external());
auto it = fec_broadcasts_.find(msg->hash_);
if (it != fec_broadcasts_.end()) {
VLOG(OVERLAY_DEBUG) << this << ": received fec opt-out message from " << message_from << " for broadcast "
@ -200,6 +211,7 @@ td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_fec_completed> msg) {
CHECK(!is_external());
auto it = fec_broadcasts_.find(msg->hash_);
if (it != fec_broadcasts_.end()) {
VLOG(OVERLAY_DEBUG) << this << ": received fec completed message from " << message_from << " for broadcast "
@ -214,12 +226,17 @@ td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
td::Status OverlayImpl::process_broadcast(adnl::AdnlNodeIdShort message_from,
tl_object_ptr<ton_api::overlay_unicast> msg) {
CHECK(!is_external());
VLOG(OVERLAY_DEBUG) << this << ": received unicast from " << message_from;
callback_->receive_message(message_from, overlay_id_, std::move(msg->data_));
return td::Status::OK();
}
void OverlayImpl::receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data) {
if (is_external()) {
LOG(OVERLAY_WARNING) << "dropping message in external overlay " << overlay_id_;
return;
}
if (!public_) {
if (peers_.get(src) == nullptr) {
VLOG(OVERLAY_WARNING) << this << ": received query in private overlay from unknown source " << src;
@ -228,9 +245,6 @@ void OverlayImpl::receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice dat
}
auto X = fetch_tl_object<ton_api::overlay_Broadcast>(data.clone(), true);
if (X.is_error()) {
if (!subscribed()) {
return;
}
VLOG(OVERLAY_DEBUG) << this << ": received custom message";
callback_->receive_message(src, overlay_id_, std::move(data));
return;
@ -274,7 +288,7 @@ void OverlayImpl::alarm() {
}
if (public_) {
if (peers_.size() > 0 && subscribed()) {
if (peers_.size() > 0) {
auto P = get_random_peer();
if (P) {
send_random_peers(P->get_id(), {});
@ -330,6 +344,10 @@ void OverlayImpl::receive_dht_nodes(td::Result<dht::DhtValue> res, bool dummy) {
VLOG(OVERLAY_NOTICE) << this << ": can not get value from DHT: " << res.move_as_error();
}
if (is_external()) {
return;
}
VLOG(OVERLAY_INFO) << this << ": adding self node to DHT overlay's nodes";
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), oid = print_id()](td::Result<OverlayNode> R) {
if (R.is_error()) {
@ -342,7 +360,7 @@ void OverlayImpl::receive_dht_nodes(td::Result<dht::DhtValue> res, bool dummy) {
}
void OverlayImpl::update_dht_nodes(OverlayNode node) {
if (!public_ || !subscribed()) {
if (!public_) {
return;
}
@ -399,12 +417,30 @@ void OverlayImpl::bcast_gc() {
}
void OverlayImpl::send_message_to_neighbours(td::BufferSlice data) {
if (neighbours_.empty()) {
// TODO: limit retries
delay_action(
[SelfId = actor_id(this), data = std::move(data)]() mutable {
td::actor::send_closure(SelfId, &OverlayImpl::send_message_to_neighbours, std::move(data));
},
td::Timestamp::in(0.5));
return;
}
for (auto &n : neighbours_) {
td::actor::send_closure(manager_, &OverlayManager::send_message, n, local_id_, overlay_id_, data.clone());
}
}
void OverlayImpl::send_broadcast(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) {
if (neighbours_.empty()) {
// TODO: limit retries
delay_action(
[SelfId = actor_id(this), send_as, flags, data = std::move(data)]() mutable {
td::actor::send_closure(SelfId, &OverlayImpl::send_broadcast, send_as, flags, std::move(data));
},
td::Timestamp::in(0.5));
return;
}
auto S = BroadcastSimple::create_new(actor_id(this), keyring_, send_as, std::move(data), flags);
if (S.is_error()) {
LOG(WARNING) << "failed to send broadcast: " << S;
@ -412,6 +448,15 @@ void OverlayImpl::send_broadcast(PublicKeyHash send_as, td::uint32 flags, td::Bu
}
void OverlayImpl::send_broadcast_fec(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) {
if (neighbours_.empty()) {
// TODO: limit retries
delay_action(
[SelfId = actor_id(this), send_as, flags, data = std::move(data)]() mutable {
td::actor::send_closure(SelfId, &OverlayImpl::send_broadcast_fec, send_as, flags, std::move(data));
},
td::Timestamp::in(0.5));
return;
}
OverlayOutboundFecBroadcast::create(std::move(data), flags, actor_id(this), send_as);
}
@ -503,7 +548,7 @@ void OverlayImpl::send_new_fec_broadcast_part(PublicKeyHash local_id, Overlay::B
}
void OverlayImpl::deliver_broadcast(PublicKeyHash source, td::BufferSlice data) {
if (!subscribed()) {
if (is_external()) {
return;
}
callback_->receive_broadcast(source, overlay_id_, std::move(data));
@ -578,7 +623,8 @@ void OverlayImpl::set_privacy_rules(OverlayPrivacyRules rules) {
}
void OverlayImpl::check_broadcast(PublicKeyHash src, td::BufferSlice data, td::Promise<td::Unit> promise) {
if (!subscribed()) {
if (is_external()) {
promise.set_result(td::Unit());
return;
}
callback_->check_broadcast(src, overlay_id_, std::move(data), std::move(promise));