1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Reverse connections in adnl (#545)

Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
EmelyanenkoK 2022-12-06 17:06:54 +03:00 committed by GitHub
parent 7754b3615e
commit fcf59b4eb5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 489 additions and 12 deletions

View file

@ -66,6 +66,15 @@ class DhtMemberImpl : public DhtMember {
DhtKeyId last_republish_key_ = DhtKeyId::zero();
DhtKeyId last_check_key_ = DhtKeyId::zero();
adnl::AdnlNodeIdShort last_check_reverse_conn_ = adnl::AdnlNodeIdShort::zero();
struct ReverseConnection {
adnl::AdnlNodeIdShort dht_node_;
DhtKeyId key_id_;
td::Timestamp ttl_;
};
std::map<adnl::AdnlNodeIdShort, ReverseConnection> reverse_connections_;
std::set<adnl::AdnlNodeIdShort> our_reverse_connections_;
class Callback : public adnl::Adnl::Callback {
public:
@ -122,6 +131,10 @@ class DhtMemberImpl : public DhtMember {
void process_query(adnl::AdnlNodeIdShort src, ton_api::dht_store &query, td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::dht_getSignedAddressList &query,
td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::dht_registerReverseConnection &query,
td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::dht_requestReversePing &query,
td::Promise<td::BufferSlice> promise);
public:
DhtMemberImpl(adnl::AdnlNodeIdShort id, std::string db_root, td::actor::ActorId<keyring::Keyring> keyring,
@ -143,6 +156,12 @@ class DhtMemberImpl : public DhtMember {
void set_value(DhtValue key_value, td::Promise<td::Unit> result) override;
td::uint32 distance(DhtKeyId key_id, td::uint32 max_value);
void register_reverse_connection(adnl::AdnlNodeIdFull client, td::Promise<td::Unit> promise) override;
void request_reverse_ping(adnl::AdnlNode target, adnl::AdnlNodeIdShort client,
td::Promise<td::Unit> promise) override;
void request_reverse_ping_cont(adnl::AdnlNode target, td::BufferSlice signature, adnl::AdnlNodeIdShort client,
td::Promise<td::Unit> promise);
td::Status store_in(DhtValue value) override;
void send_store(DhtValue value, td::Promise<td::Unit> promise);

View file

@ -279,6 +279,131 @@ void DhtQueryStore::store_ready(td::Result<td::BufferSlice> R) {
}
}
DhtQueryRegisterReverseConnection::DhtQueryRegisterReverseConnection(
DhtKeyId key_id, adnl::AdnlNodeIdFull client, td::uint32 ttl, td::BufferSlice signature,
DhtMember::PrintId print_id, adnl::AdnlNodeIdShort src, DhtNodesList list, td::uint32 k, td::uint32 a, DhtNode self,
bool client_only, td::actor::ActorId<DhtMember> node, td::actor::ActorId<adnl::Adnl> adnl,
td::Promise<td::Unit> promise)
: print_id_(print_id)
, k_(k)
, a_(a)
, promise_(std::move(promise))
, key_id_(key_id)
, list_(std::move(list))
, self_(std::move(self))
, client_only_(client_only) {
node_ = node;
adnl_ = adnl;
src_ = src;
query_ = create_serialize_tl_object<ton_api::dht_registerReverseConnection>(client.tl(), ttl, std::move(signature));
}
void DhtQueryRegisterReverseConnection::start_up() {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<DhtNodesList> res) {
td::actor::send_closure(SelfId, &DhtQueryRegisterReverseConnection::send_queries, std::move(res));
});
auto A = td::actor::create_actor<DhtQueryFindNodes>("FindNodesQuery", key_id_, print_id_, src_, std::move(list_), k_,
a_, self_.clone(), client_only_, node_, adnl_, std::move(P));
A.release();
}
void DhtQueryRegisterReverseConnection::send_queries(td::Result<DhtNodesList> R) {
if (R.is_error()) {
auto S = R.move_as_error();
VLOG(DHT_NOTICE) << this << ": failed to get nearest nodes to " << key_id_ << ": " << S;
promise_.set_error(std::move(S));
stop();
return;
}
auto list = R.move_as_ok();
remaining_ = static_cast<td::uint32>(list.size());
if (remaining_ == 0) {
VLOG(DHT_NOTICE) << this << ": failed to get nearest nodes to " << key_id_ << ": no nodes";
promise_.set_error(td::Status::Error("no dht nodes"));
stop();
return;
}
for (auto &node : list.list()) {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::BufferSlice> R) {
td::actor::send_closure(SelfId, &DhtQueryRegisterReverseConnection::ready, std::move(R));
});
td::actor::send_closure(adnl_, &adnl::Adnl::send_query, src_, node.adnl_id().compute_short_id(), "dht regrevcon",
std::move(P), td::Timestamp::in(2.0 + td::Random::fast(0, 20) * 0.1), query_.clone());
}
}
void DhtQueryRegisterReverseConnection::ready(td::Result<td::BufferSlice> R) {
if (R.is_error()) {
fail_++;
VLOG(DHT_INFO) << this << ": failed register reverse connection query: " << R.move_as_error();
} else {
auto R2 = fetch_tl_object<ton_api::dht_stored>(R.move_as_ok(), true);
if (R2.is_error()) {
fail_++;
VLOG(DHT_WARNING) << this << ": can not parse answer (expected dht.stored): " << R2.move_as_error();
} else {
success_++;
}
}
CHECK(remaining_ > 0);
remaining_--;
if (remaining_ == 0) {
if (success_ > 0) {
promise_.set_value(td::Unit());
} else {
promise_.set_result(td::Status::Error("failed to make actual query"));
}
stop();
}
}
void DhtQueryRequestReversePing::send_one_query(adnl::AdnlNodeIdShort id) {
td::BufferSlice B;
if (client_only_) {
B = query_.clone();
} else {
B = create_serialize_tl_object_suffix<ton_api::dht_query>(query_.as_slice(), self_.tl());
}
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), dst = id](td::Result<td::BufferSlice> R) {
td::actor::send_closure(SelfId, &DhtQueryRequestReversePing::on_result, std::move(R), dst);
});
td::actor::send_closure(adnl_, &adnl::Adnl::send_query, get_src(), id, "dht requestReversePing", std::move(P),
td::Timestamp::in(2.0 + td::Random::fast(0, 20) * 0.1), std::move(B));
}
void DhtQueryRequestReversePing::on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeIdShort dst) {
if (R.is_error()) {
VLOG(DHT_INFO) << this << ": failed reverse ping query " << get_src() << "->" << dst << ": " << R.move_as_error();
finish_query();
return;
}
auto Res = fetch_tl_object<ton_api::dht_ReversePingResult>(R.move_as_ok(), true);
if (Res.is_error()) {
VLOG(DHT_WARNING) << this << ": dropping incorrect answer on dht.requestReversePing query from " << dst << ": "
<< Res.move_as_error();
finish_query();
return;
}
auto A = Res.move_as_ok();
ton_api::downcast_call(*A, td::overloaded(
[&](ton_api::dht_reversePingOk &v) {
promise_.set_value(td::Unit());
stop();
},
[&](ton_api::dht_clientNotFound &v) {
add_nodes(DhtNodesList{std::move(v.nodes_)});
finish_query();
}));
}
void DhtQueryRequestReversePing::finish(DhtNodesList list) {
promise_.set_error(td::Status::Error(ErrorCode::notready, "dht key not found"));
}
} // namespace dht
} // namespace ton

View file

@ -165,6 +165,62 @@ class DhtQueryStore : public td::actor::Actor {
}
};
class DhtQueryRegisterReverseConnection : public td::actor::Actor {
private:
DhtMember::PrintId print_id_;
td::uint32 k_;
td::uint32 a_;
td::Promise<td::Unit> promise_;
td::actor::ActorId<DhtMember> node_;
td::actor::ActorId<adnl::Adnl> adnl_;
adnl::AdnlNodeIdShort src_;
DhtKeyId key_id_;
td::BufferSlice query_;
td::uint32 success_ = 0;
td::uint32 fail_ = 0;
td::uint32 remaining_;
DhtNodesList list_;
DhtNode self_;
bool client_only_;
public:
DhtQueryRegisterReverseConnection(DhtKeyId key_id, adnl::AdnlNodeIdFull client, td::uint32 ttl,
td::BufferSlice signature, DhtMember::PrintId print_id, adnl::AdnlNodeIdShort src,
DhtNodesList list, td::uint32 k, td::uint32 a, DhtNode self, bool client_only,
td::actor::ActorId<DhtMember> node, td::actor::ActorId<adnl::Adnl> adnl,
td::Promise<td::Unit> promise);
void send_queries(td::Result<DhtNodesList> R);
void ready(td::Result<td::BufferSlice> R);
void start_up() override;
DhtMember::PrintId print_id() const {
return print_id_;
}
};
class DhtQueryRequestReversePing : public DhtQuery {
private:
td::Promise<td::Unit> promise_;
td::BufferSlice query_;
public:
DhtQueryRequestReversePing(adnl::AdnlNodeIdShort client, adnl::AdnlNode target, td::BufferSlice signature,
DhtMember::PrintId print_id, adnl::AdnlNodeIdShort src, DhtNodesList list, td::uint32 k,
td::uint32 a, DhtNode self, bool client_only, td::actor::ActorId<DhtMember> node,
td::actor::ActorId<adnl::Adnl> adnl, td::Promise<td::Unit> promise)
: DhtQuery(DhtMember::get_reverse_connection_key(client).compute_key_id(), print_id, src, std::move(list), k, a,
std::move(self), client_only, node, adnl)
, promise_(std::move(promise))
, query_(create_serialize_tl_object<ton_api::dht_requestReversePing>(target.tl(), std::move(signature),
client.bits256_value(), k)) {
}
void send_one_query(adnl::AdnlNodeIdShort id) override;
void on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeIdShort dst);
void finish(DhtNodesList list) override;
std::string get_name() const override {
return "request remote ping";
}
};
inline td::StringBuilder &operator<<(td::StringBuilder &sb, const DhtQuery &dht) {
sb << dht.print_id();
return sb;

View file

@ -90,8 +90,11 @@ void DhtMemberImpl::start_up() {
ton_api::dht_findValue::ID,
ton_api::dht_store::ID,
ton_api::dht_ping::ID,
ton_api::dht_registerReverseConnection::ID,
ton_api::dht_requestReversePing::ID,
ton_api::dht_query::ID,
ton_api::dht_message::ID};
ton_api::dht_message::ID,
ton_api::dht_requestReversePingCont::ID};
for (auto it : methods) {
td::actor::send_closure(adnl_, &adnl::Adnl::subscribe, id_, adnl::Adnl::int_to_bytestring(it),
@ -131,8 +134,11 @@ void DhtMemberImpl::tear_down() {
ton_api::dht_findValue::ID,
ton_api::dht_store::ID,
ton_api::dht_ping::ID,
ton_api::dht_registerReverseConnection::ID,
ton_api::dht_requestReversePing::ID,
ton_api::dht_query::ID,
ton_api::dht_message::ID};
ton_api::dht_message::ID,
ton_api::dht_requestReversePingCont::ID};
for (auto it : methods) {
td::actor::send_closure(adnl_, &adnl::Adnl::unsubscribe, id_, adnl::Adnl::int_to_bytestring(it));
@ -299,6 +305,61 @@ void DhtMemberImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::dht_getSig
get_self_node(std::move(P));
}
static td::BufferSlice register_reverse_connection_to_sign(adnl::AdnlNodeIdShort client, adnl::AdnlNodeIdShort dht_id,
td::uint32 ttl) {
td::BufferSlice result(32 + 32 + 4);
td::MutableSlice s = result.as_slice();
s.copy_from(client.as_slice());
s.remove_prefix(32);
s.copy_from(dht_id.as_slice());
s.remove_prefix(32);
s.copy_from(std::string(reinterpret_cast<char *>(&ttl), 4));
return result;
}
void DhtMemberImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::dht_registerReverseConnection &query,
td::Promise<td::BufferSlice> promise) {
td::uint32 ttl = query.ttl_, now = (td::uint32)td::Clocks::system();
if (ttl <= now) {
return;
}
PublicKey pub{query.node_};
adnl::AdnlNodeIdShort client_id{pub.compute_short_id()};
td::BufferSlice to_sign = register_reverse_connection_to_sign(client_id, src, ttl);
TRY_RESULT_PROMISE(promise, encryptor, pub.create_encryptor());
TRY_STATUS_PROMISE(promise, encryptor->check_signature(to_sign, query.signature_));
DhtKeyId key_id = get_reverse_connection_key(client_id).compute_key_id();
reverse_connections_[client_id] = ReverseConnection{src, key_id, td::Timestamp::at_unix(std::min(ttl, now + 300))};
promise.set_value(create_serialize_tl_object<ton_api::dht_stored>());
}
void DhtMemberImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::dht_requestReversePing &query,
td::Promise<td::BufferSlice> promise) {
adnl::AdnlNodeIdShort client{query.client_};
auto it = reverse_connections_.find(client);
if (it != reverse_connections_.end()) {
if (it->second.ttl_.is_in_past()) {
reverse_connections_.erase(it);
} else {
PublicKey pub{query.target_->id_};
TRY_RESULT_PROMISE(promise, encryptor, pub.create_encryptor());
TRY_STATUS_PROMISE(promise,
encryptor->check_signature(serialize_tl_object(query.target_, true), query.signature_));
td::actor::send_closure(adnl_, &adnl::Adnl::send_message, id_, it->second.dht_node_,
create_serialize_tl_object<ton_api::dht_requestReversePingCont>(
std::move(query.target_), std::move(query.signature_), query.client_));
promise.set_result(create_serialize_tl_object<ton_api::dht_reversePingOk>());
return;
}
}
auto k = static_cast<td::uint32>(query.k_);
if (k > max_k()) {
k = max_k();
}
auto R = get_nearest_nodes(get_reverse_connection_key(client).compute_key_id(), k);
promise.set_value(create_serialize_tl_object<ton_api::dht_clientNotFound>(R.tl()));
}
void DhtMemberImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data,
td::Promise<td::BufferSlice> promise) {
if (client_only_) {
@ -369,6 +430,27 @@ void DhtMemberImpl::receive_ping(DhtKeyId key, DhtNode result) {
}
void DhtMemberImpl::receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data) {
auto F = fetch_tl_object<ton_api::dht_requestReversePingCont>(data, true);
if (F.is_ok()) {
auto S = [&]() -> td::Status {
auto f = F.move_as_ok();
adnl::AdnlNodeIdShort client{f->client_};
if (!our_reverse_connections_.count(client)) {
return td::Status::Error(PSTRING() << ": unknown id for reverse ping: " << client);
}
TRY_RESULT_PREFIX(node, adnl::AdnlNode::create(f->target_), "failed to parse node: ");
TRY_RESULT_PREFIX(encryptor, node.pub_id().pubkey().create_encryptor(), "failed to create encryptor: ");
TRY_STATUS_PREFIX(encryptor->check_signature(serialize_tl_object(f->target_, true), f->signature_),
"invalid signature: ");
VLOG(DHT_INFO) << this << ": sending reverse ping to " << node.compute_short_id();
td::actor::send_closure(adnl_, &adnl::Adnl::add_peer, client, node.pub_id(), node.addr_list());
td::actor::send_closure(adnl_, &adnl::Adnl::send_message, client, node.compute_short_id(), td::BufferSlice());
return td::Status::OK();
}();
if (S.is_error()) {
VLOG(DHT_INFO) << this << ": " << S;
}
}
}
void DhtMemberImpl::set_value(DhtValue value, td::Promise<td::Unit> promise) {
@ -396,6 +478,70 @@ void DhtMemberImpl::get_value_in(DhtKeyId key, td::Promise<DhtValue> result) {
get_self_node(std::move(P));
}
void DhtMemberImpl::register_reverse_connection(adnl::AdnlNodeIdFull client, td::Promise<td::Unit> promise) {
auto client_short = client.compute_short_id();
td::uint32 ttl = (td::uint32)td::Clocks::system() + 300;
our_reverse_connections_.insert(client_short);
auto key_id = get_reverse_connection_key(client_short).compute_key_id();
td::actor::send_closure(keyring_, &keyring::Keyring::sign_message, client_short.pubkey_hash(),
register_reverse_connection_to_sign(client_short, id_, ttl),
[=, print_id = print_id(), list = get_nearest_nodes(key_id, k_), SelfId = actor_id(this),
promise = std::move(promise)](td::Result<td::BufferSlice> R) mutable {
TRY_RESULT_PROMISE_PREFIX(promise, signature, std::move(R), "Failed to sign: ");
td::actor::send_closure(SelfId, &DhtMemberImpl::get_self_node,
[=, list = std::move(list), signature = std::move(signature),
promise = std::move(promise)](td::Result<DhtNode> R) mutable {
R.ensure();
td::actor::create_actor<DhtQueryRegisterReverseConnection>(
"RegisterReverseQuery", key_id, std::move(client), ttl,
std::move(signature), print_id, id_, std::move(list), k_, a_,
R.move_as_ok(), client_only_, SelfId, adnl_,
std::move(promise))
.release();
});
});
}
void DhtMemberImpl::request_reverse_ping(adnl::AdnlNode target, adnl::AdnlNodeIdShort client,
td::Promise<td::Unit> promise) {
auto pubkey_hash = target.compute_short_id().pubkey_hash();
td::BufferSlice to_sign = serialize_tl_object(target.tl(), true);
td::actor::send_closure(keyring_, &keyring::Keyring::sign_message, pubkey_hash, std::move(to_sign),
[SelfId = actor_id(this), promise = std::move(promise), target = std::move(target),
client](td::Result<td::BufferSlice> R) mutable {
TRY_RESULT_PROMISE(promise, signature, std::move(R));
td::actor::send_closure(SelfId, &DhtMemberImpl::request_reverse_ping_cont,
std::move(target), std::move(signature), client,
std::move(promise));
});
}
void DhtMemberImpl::request_reverse_ping_cont(adnl::AdnlNode target, td::BufferSlice signature,
adnl::AdnlNodeIdShort client, td::Promise<td::Unit> promise) {
auto it = reverse_connections_.find(client);
if (it != reverse_connections_.end()) {
if (it->second.ttl_.is_in_past()) {
reverse_connections_.erase(it);
} else {
td::actor::send_closure(adnl_, &adnl::Adnl::send_message, id_, it->second.dht_node_,
create_serialize_tl_object<ton_api::dht_requestReversePingCont>(
target.tl(), std::move(signature), client.bits256_value()));
promise.set_result(td::Unit());
return;
}
}
auto key_id = get_reverse_connection_key(client).compute_key_id();
get_self_node([=, target = std::move(target), signature = std::move(signature), promise = std::move(promise),
SelfId = actor_id(this), print_id = print_id(), list = get_nearest_nodes(key_id, k_),
client_only = client_only_](td::Result<DhtNode> R) mutable {
R.ensure();
td::actor::create_actor<DhtQueryRequestReversePing>("RequestReversePing", client, std::move(target),
std::move(signature), print_id, id_, std::move(list), k_, a_,
R.move_as_ok(), client_only, SelfId, adnl_, std::move(promise))
.release();
});
}
void DhtMemberImpl::check() {
VLOG(DHT_INFO) << this << ": ping=" << ping_queries_ << " fnode=" << find_node_queries_
<< " fvalue=" << find_value_queries_ << " store=" << store_queries_
@ -454,6 +600,16 @@ void DhtMemberImpl::check() {
}
}
}
if (reverse_connections_.size() > 0) {
auto it = reverse_connections_.upper_bound(last_check_reverse_conn_);
if (it == reverse_connections_.end()) {
it = reverse_connections_.begin();
}
last_check_reverse_conn_ = it->first;
if (it->second.ttl_.is_in_past()) {
reverse_connections_.erase(it);
}
}
if (republish_att_.is_in_past()) {
auto it = our_values_.lower_bound(last_republish_key_);

View file

@ -54,6 +54,10 @@ class Dht : public td::actor::Actor {
virtual void set_value(DhtValue key_value, td::Promise<td::Unit> result) = 0;
virtual void get_value(DhtKey key, td::Promise<DhtValue> result) = 0;
virtual void register_reverse_connection(adnl::AdnlNodeIdFull client, td::Promise<td::Unit> promise) = 0;
virtual void request_reverse_ping(adnl::AdnlNode target, adnl::AdnlNodeIdShort client,
td::Promise<td::Unit> promise) = 0;
virtual void dump(td::StringBuilder &sb) const = 0;
virtual ~Dht() = default;

View file

@ -101,6 +101,10 @@ class DhtMember : public Dht {
virtual void get_self_node(td::Promise<DhtNode> promise) = 0;
virtual PrintId print_id() const = 0;
static DhtKey get_reverse_connection_key(adnl::AdnlNodeIdShort node) {
return DhtKey{node.pubkey_hash(), "address", 0};
}
};
inline td::StringBuilder &operator<<(td::StringBuilder &sb, const DhtMember::PrintId &id) {