mirror of
https://github.com/ton-blockchain/ton
synced 2025-02-12 11:12:16 +00:00
Improve dht lookup in overlays (#1104)
Continue dht lookup even if value was found
This commit is contained in:
parent
77a816e461
commit
9661676646
8 changed files with 127 additions and 35 deletions
|
@ -179,6 +179,7 @@ class DhtMemberImpl : public DhtMember {
|
||||||
void get_value(DhtKey key, td::Promise<DhtValue> result) override {
|
void get_value(DhtKey key, td::Promise<DhtValue> result) override {
|
||||||
get_value_in(key.compute_key_id(), std::move(result));
|
get_value_in(key.compute_key_id(), std::move(result));
|
||||||
}
|
}
|
||||||
|
void get_value_many(DhtKey key, std::function<void(DhtValue)> callback, td::Promise<td::Unit> promise) override;
|
||||||
|
|
||||||
void alarm() override {
|
void alarm() override {
|
||||||
alarm_timestamp() = td::Timestamp::in(1.0);
|
alarm_timestamp() = td::Timestamp::in(1.0);
|
||||||
|
|
|
@ -210,8 +210,11 @@ void DhtQueryFindValue::on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeI
|
||||||
send_get_nodes = true;
|
send_get_nodes = true;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
promise_.set_value(std::move(value));
|
if (on_value_found(std::move(value))) {
|
||||||
need_stop = true;
|
send_get_nodes = true;
|
||||||
|
} else {
|
||||||
|
need_stop = true;
|
||||||
|
}
|
||||||
},
|
},
|
||||||
[&](ton_api::dht_valueNotFound &v) {
|
[&](ton_api::dht_valueNotFound &v) {
|
||||||
add_nodes(DhtNodesList{std::move(v.nodes_), our_network_id()});
|
add_nodes(DhtNodesList{std::move(v.nodes_), our_network_id()});
|
||||||
|
@ -244,7 +247,32 @@ void DhtQueryFindValue::on_result_nodes(td::Result<td::BufferSlice> R, adnl::Adn
|
||||||
}
|
}
|
||||||
|
|
||||||
void DhtQueryFindValue::finish(DhtNodesList list) {
|
void DhtQueryFindValue::finish(DhtNodesList list) {
|
||||||
promise_.set_error(td::Status::Error(ErrorCode::notready, "dht key not found"));
|
}
|
||||||
|
|
||||||
|
bool DhtQueryFindValueSingle::on_value_found(DhtValue value) {
|
||||||
|
promise_.set_value(std::move(value));
|
||||||
|
found_ = true;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void DhtQueryFindValueSingle::tear_down() {
|
||||||
|
if (!found_) {
|
||||||
|
promise_.set_error(td::Status::Error(ErrorCode::notready, "dht key not found"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool DhtQueryFindValueMany::on_value_found(DhtValue value) {
|
||||||
|
callback_(std::move(value));
|
||||||
|
found_ = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void DhtQueryFindValueMany::tear_down() {
|
||||||
|
if (found_) {
|
||||||
|
promise_.set_value(td::Unit());
|
||||||
|
} else {
|
||||||
|
promise_.set_error(td::Status::Error(ErrorCode::notready, "dht key not found"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
DhtQueryStore::DhtQueryStore(DhtValue key_value, DhtMember::PrintId print_id, adnl::AdnlNodeIdShort src,
|
DhtQueryStore::DhtQueryStore(DhtValue key_value, DhtMember::PrintId print_id, adnl::AdnlNodeIdShort src,
|
||||||
|
|
|
@ -126,16 +126,11 @@ class DhtQueryFindNodes : public DhtQuery {
|
||||||
};
|
};
|
||||||
|
|
||||||
class DhtQueryFindValue : public DhtQuery {
|
class DhtQueryFindValue : public DhtQuery {
|
||||||
private:
|
|
||||||
td::Promise<DhtValue> promise_;
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
DhtQueryFindValue(DhtKeyId key, DhtMember::PrintId print_id, adnl::AdnlNodeIdShort src, DhtNodesList list,
|
DhtQueryFindValue(DhtKeyId key, DhtMember::PrintId print_id, adnl::AdnlNodeIdShort src, DhtNodesList list,
|
||||||
td::uint32 k, td::uint32 a, td::int32 our_network_id, DhtNode self, bool client_only,
|
td::uint32 k, td::uint32 a, td::int32 our_network_id, DhtNode self, bool client_only,
|
||||||
td::actor::ActorId<DhtMember> node, td::actor::ActorId<adnl::Adnl> adnl,
|
td::actor::ActorId<DhtMember> node, td::actor::ActorId<adnl::Adnl> adnl)
|
||||||
td::Promise<DhtValue> promise)
|
: DhtQuery(key, print_id, src, k, a, our_network_id, std::move(self), client_only, node, adnl) {
|
||||||
: DhtQuery(key, print_id, src, k, a, our_network_id, std::move(self), client_only, node, adnl)
|
|
||||||
, promise_(std::move(promise)) {
|
|
||||||
add_nodes(std::move(list));
|
add_nodes(std::move(list));
|
||||||
}
|
}
|
||||||
void send_one_query(adnl::AdnlNodeIdShort id) override;
|
void send_one_query(adnl::AdnlNodeIdShort id) override;
|
||||||
|
@ -146,6 +141,48 @@ class DhtQueryFindValue : public DhtQuery {
|
||||||
std::string get_name() const override {
|
std::string get_name() const override {
|
||||||
return "find value";
|
return "find value";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
virtual bool on_value_found(DhtValue value) = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
class DhtQueryFindValueSingle : public DhtQueryFindValue {
|
||||||
|
public:
|
||||||
|
DhtQueryFindValueSingle(DhtKeyId key, DhtMember::PrintId print_id, adnl::AdnlNodeIdShort src, DhtNodesList list,
|
||||||
|
td::uint32 k, td::uint32 a, td::int32 our_network_id, DhtNode self, bool client_only,
|
||||||
|
td::actor::ActorId<DhtMember> node, td::actor::ActorId<adnl::Adnl> adnl,
|
||||||
|
td::Promise<DhtValue> promise)
|
||||||
|
: DhtQueryFindValue(key, print_id, src, std::move(list), k, a, our_network_id, std::move(self), client_only, node,
|
||||||
|
adnl)
|
||||||
|
, promise_(std::move(promise)) {
|
||||||
|
add_nodes(std::move(list));
|
||||||
|
}
|
||||||
|
bool on_value_found(DhtValue value) override;
|
||||||
|
void tear_down() override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
td::Promise<DhtValue> promise_;
|
||||||
|
bool found_ = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
class DhtQueryFindValueMany : public DhtQueryFindValue {
|
||||||
|
public:
|
||||||
|
DhtQueryFindValueMany(DhtKeyId key, DhtMember::PrintId print_id, adnl::AdnlNodeIdShort src, DhtNodesList list,
|
||||||
|
td::uint32 k, td::uint32 a, td::int32 our_network_id, DhtNode self, bool client_only,
|
||||||
|
td::actor::ActorId<DhtMember> node, td::actor::ActorId<adnl::Adnl> adnl,
|
||||||
|
std::function<void(DhtValue)> callback, td::Promise<td::Unit> promise)
|
||||||
|
: DhtQueryFindValue(key, print_id, src, std::move(list), k, a, our_network_id, std::move(self), client_only, node,
|
||||||
|
adnl)
|
||||||
|
, callback_(std::move(callback))
|
||||||
|
, promise_(std::move(promise)) {
|
||||||
|
add_nodes(std::move(list));
|
||||||
|
}
|
||||||
|
bool on_value_found(DhtValue value) override;
|
||||||
|
void tear_down() override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::function<void(DhtValue)> callback_;
|
||||||
|
td::Promise<td::Unit> promise_;
|
||||||
|
bool found_ = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
class DhtQueryStore : public td::actor::Actor {
|
class DhtQueryStore : public td::actor::Actor {
|
||||||
|
|
17
dht/dht.cpp
17
dht/dht.cpp
|
@ -470,7 +470,7 @@ void DhtMemberImpl::get_value_in(DhtKeyId key, td::Promise<DhtValue> result) {
|
||||||
network_id = network_id_, id = id_,
|
network_id = network_id_, id = id_,
|
||||||
client_only = client_only_](td::Result<DhtNode> R) mutable {
|
client_only = client_only_](td::Result<DhtNode> R) mutable {
|
||||||
R.ensure();
|
R.ensure();
|
||||||
td::actor::create_actor<DhtQueryFindValue>("FindValueQuery", key, print_id, id, std::move(list), k, a, network_id,
|
td::actor::create_actor<DhtQueryFindValueSingle>("FindValueQuery", key, print_id, id, std::move(list), k, a, network_id,
|
||||||
R.move_as_ok(), client_only, SelfId, adnl, std::move(promise))
|
R.move_as_ok(), client_only, SelfId, adnl, std::move(promise))
|
||||||
.release();
|
.release();
|
||||||
});
|
});
|
||||||
|
@ -478,6 +478,21 @@ void DhtMemberImpl::get_value_in(DhtKeyId key, td::Promise<DhtValue> result) {
|
||||||
get_self_node(std::move(P));
|
get_self_node(std::move(P));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DhtMemberImpl::get_value_many(DhtKey key, std::function<void(DhtValue)> callback, td::Promise<td::Unit> promise) {
|
||||||
|
DhtKeyId key_id = key.compute_key_id();
|
||||||
|
auto P = td::PromiseCreator::lambda(
|
||||||
|
[key = key_id, callback = std::move(callback), promise = std::move(promise), SelfId = actor_id(this),
|
||||||
|
print_id = print_id(), adnl = adnl_, list = get_nearest_nodes(key_id, k_ * 2), k = k_, a = a_,
|
||||||
|
network_id = network_id_, id = id_, client_only = client_only_](td::Result<DhtNode> R) mutable {
|
||||||
|
R.ensure();
|
||||||
|
td::actor::create_actor<DhtQueryFindValueMany>("FindValueManyQuery", key, print_id, id, std::move(list), k, a,
|
||||||
|
network_id, R.move_as_ok(), client_only, SelfId, adnl,
|
||||||
|
std::move(callback), std::move(promise))
|
||||||
|
.release();
|
||||||
|
});
|
||||||
|
get_self_node(std::move(P));
|
||||||
|
}
|
||||||
|
|
||||||
void DhtMemberImpl::register_reverse_connection(adnl::AdnlNodeIdFull client, td::Promise<td::Unit> promise) {
|
void DhtMemberImpl::register_reverse_connection(adnl::AdnlNodeIdFull client, td::Promise<td::Unit> promise) {
|
||||||
auto client_short = client.compute_short_id();
|
auto client_short = client.compute_short_id();
|
||||||
td::uint32 ttl = (td::uint32)td::Clocks::system() + 300;
|
td::uint32 ttl = (td::uint32)td::Clocks::system() + 300;
|
||||||
|
|
|
@ -53,6 +53,7 @@ class Dht : public td::actor::Actor {
|
||||||
|
|
||||||
virtual void set_value(DhtValue key_value, td::Promise<td::Unit> result) = 0;
|
virtual void set_value(DhtValue key_value, td::Promise<td::Unit> result) = 0;
|
||||||
virtual void get_value(DhtKey key, td::Promise<DhtValue> result) = 0;
|
virtual void get_value(DhtKey key, td::Promise<DhtValue> result) = 0;
|
||||||
|
virtual void get_value_many(DhtKey key, std::function<void(DhtValue)> callback, td::Promise<td::Unit> promise) = 0;
|
||||||
|
|
||||||
virtual void register_reverse_connection(adnl::AdnlNodeIdFull client, td::Promise<td::Unit> promise) = 0;
|
virtual void register_reverse_connection(adnl::AdnlNodeIdFull client, td::Promise<td::Unit> promise) = 0;
|
||||||
virtual void request_reverse_ping(adnl::AdnlNode target, adnl::AdnlNodeIdShort client,
|
virtual void request_reverse_ping(adnl::AdnlNode target, adnl::AdnlNodeIdShort client,
|
||||||
|
|
|
@ -283,11 +283,14 @@ void OverlayImpl::alarm() {
|
||||||
}
|
}
|
||||||
if (next_dht_query_ && next_dht_query_.is_in_past()) {
|
if (next_dht_query_ && next_dht_query_.is_in_past()) {
|
||||||
next_dht_query_ = td::Timestamp::never();
|
next_dht_query_ = td::Timestamp::never();
|
||||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<dht::DhtValue> res) {
|
std::function<void(dht::DhtValue)> callback = [SelfId = actor_id(this)](dht::DhtValue value) {
|
||||||
td::actor::send_closure(SelfId, &OverlayImpl::receive_dht_nodes, std::move(res), true);
|
td::actor::send_closure(SelfId, &OverlayImpl::receive_dht_nodes, std::move(value));
|
||||||
});
|
};
|
||||||
td::actor::send_closure(dht_node_, &dht::Dht::get_value, dht::DhtKey{overlay_id_.pubkey_hash(), "nodes", 0},
|
td::Promise<td::Unit> on_finish = [SelfId = actor_id(this)](td::Result<td::Unit> R) {
|
||||||
std::move(P));
|
td::actor::send_closure(SelfId, &OverlayImpl::dht_lookup_finished, R.move_as_status());
|
||||||
|
};
|
||||||
|
td::actor::send_closure(dht_node_, &dht::Dht::get_value_many, dht::DhtKey{overlay_id_.pubkey_hash(), "nodes", 0},
|
||||||
|
std::move(callback), std::move(on_finish));
|
||||||
}
|
}
|
||||||
if (update_db_at_.is_in_past()) {
|
if (update_db_at_.is_in_past()) {
|
||||||
if (peers_.size() > 0) {
|
if (peers_.size() > 0) {
|
||||||
|
@ -311,30 +314,30 @@ void OverlayImpl::alarm() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void OverlayImpl::receive_dht_nodes(td::Result<dht::DhtValue> res, bool dummy) {
|
void OverlayImpl::receive_dht_nodes(dht::DhtValue v) {
|
||||||
CHECK(public_);
|
CHECK(public_);
|
||||||
if (res.is_ok()) {
|
auto R = fetch_tl_object<ton_api::overlay_nodes>(v.value().clone(), true);
|
||||||
auto v = res.move_as_ok();
|
if (R.is_ok()) {
|
||||||
auto R = fetch_tl_object<ton_api::overlay_nodes>(v.value().clone(), true);
|
auto r = R.move_as_ok();
|
||||||
if (R.is_ok()) {
|
VLOG(OVERLAY_INFO) << this << ": received " << r->nodes_.size() << " nodes from overlay";
|
||||||
auto r = R.move_as_ok();
|
VLOG(OVERLAY_EXTRA_DEBUG) << this << ": nodes: " << ton_api::to_string(r);
|
||||||
VLOG(OVERLAY_INFO) << this << ": received " << r->nodes_.size() << " nodes from overlay";
|
std::vector<OverlayNode> nodes;
|
||||||
VLOG(OVERLAY_EXTRA_DEBUG) << this << ": nodes: " << ton_api::to_string(r);
|
for (auto &n : r->nodes_) {
|
||||||
std::vector<OverlayNode> nodes;
|
auto N = OverlayNode::create(n);
|
||||||
for (auto &n : r->nodes_) {
|
if (N.is_ok()) {
|
||||||
auto N = OverlayNode::create(n);
|
nodes.emplace_back(N.move_as_ok());
|
||||||
if (N.is_ok()) {
|
|
||||||
nodes.emplace_back(N.move_as_ok());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
add_peers(std::move(nodes));
|
|
||||||
} else {
|
|
||||||
VLOG(OVERLAY_WARNING) << this << ": incorrect value in DHT for overlay nodes: " << R.move_as_error();
|
|
||||||
}
|
}
|
||||||
|
add_peers(std::move(nodes));
|
||||||
} else {
|
} else {
|
||||||
VLOG(OVERLAY_NOTICE) << this << ": can not get value from DHT: " << res.move_as_error();
|
VLOG(OVERLAY_WARNING) << this << ": incorrect value in DHT for overlay nodes: " << R.move_as_error();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void OverlayImpl::dht_lookup_finished(td::Status S) {
|
||||||
|
if (S.is_error()) {
|
||||||
|
VLOG(OVERLAY_NOTICE) << this << ": can not get value from DHT: " << S;
|
||||||
|
}
|
||||||
if (!(next_dht_store_query_ && next_dht_store_query_.is_in_past())) {
|
if (!(next_dht_store_query_ && next_dht_store_query_.is_in_past())) {
|
||||||
finish_dht_query();
|
finish_dht_query();
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -166,7 +166,8 @@ class OverlayImpl : public Overlay {
|
||||||
certs_[key] = std::move(cert);
|
certs_[key] = std::move(cert);
|
||||||
}
|
}
|
||||||
|
|
||||||
void receive_dht_nodes(td::Result<dht::DhtValue> res, bool dummy);
|
void receive_dht_nodes(dht::DhtValue v);
|
||||||
|
void dht_lookup_finished(td::Status S);
|
||||||
void update_dht_nodes(OverlayNode node);
|
void update_dht_nodes(OverlayNode node);
|
||||||
|
|
||||||
void update_neighbours(td::uint32 nodes_to_change);
|
void update_neighbours(td::uint32 nodes_to_change);
|
||||||
|
|
|
@ -555,6 +555,12 @@ class Result {
|
||||||
};
|
};
|
||||||
return status_.move_as_error_suffix(suffix);
|
return status_.move_as_error_suffix(suffix);
|
||||||
}
|
}
|
||||||
|
Status move_as_status() TD_WARN_UNUSED_RESULT {
|
||||||
|
if (status_.is_error()) {
|
||||||
|
return move_as_error();
|
||||||
|
}
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
const T &ok() const {
|
const T &ok() const {
|
||||||
LOG_CHECK(status_.is_ok()) << status_;
|
LOG_CHECK(status_.is_ok()) << status_;
|
||||||
return value_;
|
return value_;
|
||||||
|
|
Loading…
Reference in a new issue