mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
updated liteserver
- new methods for liteserver/liteclient - added ADNL/DHT client-only work mode - fixed crash in ADNL
This commit is contained in:
parent
acf16718e6
commit
53ec9684bd
70 changed files with 816 additions and 322 deletions
|
@ -145,13 +145,13 @@ void DhtBucket::promote_node(size_t idx) {
|
|||
}
|
||||
}
|
||||
|
||||
void DhtBucket::check(td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> dht,
|
||||
void DhtBucket::check(bool client_only, td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> dht,
|
||||
adnl::AdnlNodeIdShort src) {
|
||||
size_t have_space = 0;
|
||||
for (size_t i = 0; i < active_nodes_.size(); i++) {
|
||||
auto &node = active_nodes_[i];
|
||||
if (node && td::Time::now_cached() - node->last_ping_at() > ping_timeout_) {
|
||||
node->send_ping(adnl, dht, src);
|
||||
node->send_ping(client_only, adnl, dht, src);
|
||||
if (node->ready_from() == 0) {
|
||||
demote_node(i);
|
||||
}
|
||||
|
@ -163,7 +163,7 @@ void DhtBucket::check(td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<Dh
|
|||
for (size_t i = 0; i < backup_nodes_.size(); i++) {
|
||||
auto &node = backup_nodes_[i];
|
||||
if (node && td::Time::now_cached() - node->last_ping_at() > ping_timeout_) {
|
||||
node->send_ping(adnl, dht, src);
|
||||
node->send_ping(client_only, adnl, dht, src);
|
||||
}
|
||||
if (node && have_space > 0 && node->is_ready()) {
|
||||
promote_node(i);
|
||||
|
|
|
@ -39,8 +39,8 @@ class DhtBucket {
|
|||
|
||||
//std::map<td::UInt256, std::unique_ptr<DhtRemoteNode>> pending_nodes_;
|
||||
td::uint32 k_;
|
||||
bool check_one(td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> node, adnl::AdnlNodeIdShort src,
|
||||
const DhtMember::PrintId &print_id);
|
||||
//bool check_one(td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> node, adnl::AdnlNodeIdShort src,
|
||||
// const DhtMember::PrintId &print_id);
|
||||
void demote_node(size_t idx);
|
||||
void promote_node(size_t idx);
|
||||
|
||||
|
@ -52,7 +52,8 @@ class DhtBucket {
|
|||
td::uint32 active_cnt();
|
||||
td::Status add_full_node(DhtKeyId id, DhtNode node, td::actor::ActorId<adnl::Adnl> adnl,
|
||||
adnl::AdnlNodeIdShort self_id);
|
||||
void check(td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> node, adnl::AdnlNodeIdShort src);
|
||||
void check(bool client_only, td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> node,
|
||||
adnl::AdnlNodeIdShort src);
|
||||
void receive_ping(DhtKeyId id, DhtNode result, td::actor::ActorId<adnl::Adnl> adnl, adnl::AdnlNodeIdShort self_id);
|
||||
void get_nearest_nodes(DhtKeyId id, td::uint32 bit, DhtNodesList &vec, td::uint32 k);
|
||||
void dump(td::StringBuilder &sb) const;
|
||||
|
|
|
@ -94,6 +94,8 @@ class DhtMemberImpl : public DhtMember {
|
|||
td::actor::ActorId<keyring::Keyring> keyring_;
|
||||
td::actor::ActorId<adnl::Adnl> adnl_;
|
||||
|
||||
bool client_only_{false};
|
||||
|
||||
td::uint64 ping_queries_{0};
|
||||
td::uint64 find_node_queries_{0};
|
||||
td::uint64 find_value_queries_{0};
|
||||
|
@ -123,8 +125,8 @@ class DhtMemberImpl : public DhtMember {
|
|||
|
||||
public:
|
||||
DhtMemberImpl(adnl::AdnlNodeIdShort id, std::string db_root, td::actor::ActorId<keyring::Keyring> keyring,
|
||||
td::actor::ActorId<adnl::Adnl> adnl, td::uint32 k, td::uint32 a = 3)
|
||||
: id_(id), key_{id_}, k_(k), a_(a), db_root_(db_root), keyring_(keyring), adnl_(adnl) {
|
||||
td::actor::ActorId<adnl::Adnl> adnl, td::uint32 k, td::uint32 a = 3, bool client_only = false)
|
||||
: id_(id), key_{id_}, k_(k), a_(a), db_root_(db_root), keyring_(keyring), adnl_(adnl), client_only_(client_only) {
|
||||
for (size_t i = 0; i < 256; i++) {
|
||||
buckets_.emplace_back(k_);
|
||||
}
|
||||
|
|
|
@ -96,7 +96,12 @@ void DhtQuery::add_nodes(DhtNodesList list) {
|
|||
|
||||
void DhtQueryFindNodes::send_one_query(adnl::AdnlNodeIdShort id) {
|
||||
auto P = create_serialize_tl_object<ton_api::dht_findNode>(get_key().tl(), get_k());
|
||||
auto B = create_serialize_tl_object_suffix<ton_api::dht_query>(P.as_slice(), self_.tl());
|
||||
td::BufferSlice B;
|
||||
if (client_only_) {
|
||||
B = std::move(P);
|
||||
} else {
|
||||
B = create_serialize_tl_object_suffix<ton_api::dht_query>(P.as_slice(), self_.tl());
|
||||
}
|
||||
|
||||
auto Pr = td::PromiseCreator::lambda([SelfId = actor_id(this), dst = id](td::Result<td::BufferSlice> R) {
|
||||
td::actor::send_closure(SelfId, &DhtQueryFindNodes::on_result, std::move(R), dst);
|
||||
|
@ -129,7 +134,12 @@ void DhtQueryFindNodes::finish(DhtNodesList list) {
|
|||
|
||||
void DhtQueryFindValue::send_one_query(adnl::AdnlNodeIdShort id) {
|
||||
auto P = create_serialize_tl_object<ton_api::dht_findValue>(get_key().tl(), get_k());
|
||||
auto B = create_serialize_tl_object_suffix<ton_api::dht_query>(P.as_slice(), self_.tl());
|
||||
td::BufferSlice B;
|
||||
if (client_only_) {
|
||||
B = std::move(P);
|
||||
} else {
|
||||
B = create_serialize_tl_object_suffix<ton_api::dht_query>(P.as_slice(), self_.tl());
|
||||
}
|
||||
|
||||
auto Pr = td::PromiseCreator::lambda([SelfId = actor_id(this), dst = id](td::Result<td::BufferSlice> R) {
|
||||
td::actor::send_closure(SelfId, &DhtQueryFindValue::on_result, std::move(R), dst);
|
||||
|
@ -186,7 +196,7 @@ void DhtQueryFindValue::finish(DhtNodesList list) {
|
|||
}
|
||||
|
||||
DhtQueryStore::DhtQueryStore(DhtValue key_value, DhtMember::PrintId print_id, adnl::AdnlNodeIdShort src,
|
||||
DhtNodesList list, td::uint32 k, td::uint32 a, DhtNode self,
|
||||
DhtNodesList list, td::uint32 k, td::uint32 a, DhtNode self, bool client_only,
|
||||
td::actor::ActorId<DhtMember> node, td::actor::ActorId<adnl::Adnl> adnl,
|
||||
td::Promise<td::Unit> promise)
|
||||
: print_id_(print_id)
|
||||
|
@ -195,7 +205,8 @@ DhtQueryStore::DhtQueryStore(DhtValue key_value, DhtMember::PrintId print_id, ad
|
|||
, promise_(std::move(promise))
|
||||
, value_(std::move(key_value))
|
||||
, list_(std::move(list))
|
||||
, self_(std::move(self)) {
|
||||
, self_(std::move(self))
|
||||
, client_only_(client_only) {
|
||||
node_ = node;
|
||||
adnl_ = adnl;
|
||||
src_ = src;
|
||||
|
@ -208,7 +219,7 @@ void DhtQueryStore::start_up() {
|
|||
|
||||
auto key = value_.key_id();
|
||||
auto A = td::actor::create_actor<DhtQueryFindNodes>("FindNodesQuery", key, print_id_, src_, std::move(list_), k_, a_,
|
||||
self_.clone(), node_, adnl_, std::move(P));
|
||||
self_.clone(), client_only_, node_, adnl_, std::move(P));
|
||||
A.release();
|
||||
}
|
||||
|
||||
|
|
|
@ -41,11 +41,21 @@ class DhtQuery : public td::actor::Actor {
|
|||
protected:
|
||||
DhtKeyId key_;
|
||||
DhtNode self_;
|
||||
bool client_only_;
|
||||
|
||||
public:
|
||||
DhtQuery(DhtKeyId key, DhtMember::PrintId print_id, adnl::AdnlNodeIdShort src, DhtNodesList list, td::uint32 k,
|
||||
td::uint32 a, DhtNode self, td::actor::ActorId<DhtMember> node, td::actor::ActorId<adnl::Adnl> adnl)
|
||||
: key_(key), self_(std::move(self)), print_id_(print_id), src_(src), k_(k), a_(a), node_(node), adnl_(adnl) {
|
||||
td::uint32 a, DhtNode self, bool client_only, td::actor::ActorId<DhtMember> node,
|
||||
td::actor::ActorId<adnl::Adnl> adnl)
|
||||
: key_(key)
|
||||
, self_(std::move(self))
|
||||
, client_only_(client_only)
|
||||
, print_id_(print_id)
|
||||
, src_(src)
|
||||
, k_(k)
|
||||
, a_(a)
|
||||
, node_(node)
|
||||
, adnl_(adnl) {
|
||||
add_nodes(std::move(list));
|
||||
}
|
||||
DhtMember::PrintId print_id() const {
|
||||
|
@ -94,9 +104,10 @@ class DhtQueryFindNodes : public DhtQuery {
|
|||
|
||||
public:
|
||||
DhtQueryFindNodes(DhtKeyId key, DhtMember::PrintId print_id, adnl::AdnlNodeIdShort src, DhtNodesList list,
|
||||
td::uint32 k, td::uint32 a, DhtNode self, td::actor::ActorId<DhtMember> node,
|
||||
td::uint32 k, td::uint32 a, DhtNode self, bool client_only, td::actor::ActorId<DhtMember> node,
|
||||
td::actor::ActorId<adnl::Adnl> adnl, td::Promise<DhtNodesList> promise)
|
||||
: DhtQuery(key, print_id, src, std::move(list), k, a, std::move(self), node, adnl), promise_(std::move(promise)) {
|
||||
: DhtQuery(key, print_id, src, std::move(list), k, a, std::move(self), client_only, node, adnl)
|
||||
, promise_(std::move(promise)) {
|
||||
}
|
||||
void send_one_query(adnl::AdnlNodeIdShort id) override;
|
||||
void on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeIdShort dst);
|
||||
|
@ -112,9 +123,10 @@ class DhtQueryFindValue : public DhtQuery {
|
|||
|
||||
public:
|
||||
DhtQueryFindValue(DhtKeyId key, DhtMember::PrintId print_id, adnl::AdnlNodeIdShort src, DhtNodesList list,
|
||||
td::uint32 k, td::uint32 a, DhtNode self, td::actor::ActorId<DhtMember> node,
|
||||
td::uint32 k, td::uint32 a, DhtNode self, bool client_only, td::actor::ActorId<DhtMember> node,
|
||||
td::actor::ActorId<adnl::Adnl> adnl, td::Promise<DhtValue> promise)
|
||||
: DhtQuery(key, print_id, src, std::move(list), k, a, std::move(self), node, adnl), promise_(std::move(promise)) {
|
||||
: DhtQuery(key, print_id, src, std::move(list), k, a, std::move(self), client_only, node, adnl)
|
||||
, promise_(std::move(promise)) {
|
||||
}
|
||||
void send_one_query(adnl::AdnlNodeIdShort id) override;
|
||||
void on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeIdShort dst);
|
||||
|
@ -139,10 +151,11 @@ class DhtQueryStore : public td::actor::Actor {
|
|||
td::uint32 remaining_;
|
||||
DhtNodesList list_;
|
||||
DhtNode self_;
|
||||
bool client_only_;
|
||||
|
||||
public:
|
||||
DhtQueryStore(DhtValue key_value, DhtMember::PrintId print_id, adnl::AdnlNodeIdShort src, DhtNodesList list,
|
||||
td::uint32 k, td::uint32 a, DhtNode self, td::actor::ActorId<DhtMember> node,
|
||||
td::uint32 k, td::uint32 a, DhtNode self, bool client_only, td::actor::ActorId<DhtMember> node,
|
||||
td::actor::ActorId<adnl::Adnl> adnl, td::Promise<td::Unit> promise);
|
||||
void send_stores(td::Result<DhtNodesList> res);
|
||||
void store_ready(td::Result<td::BufferSlice> res);
|
||||
|
|
|
@ -63,7 +63,7 @@ td::Status DhtRemoteNode::update_value(DhtNode node, td::actor::ActorId<adnl::Ad
|
|||
return td::Status::OK();
|
||||
}
|
||||
|
||||
void DhtRemoteNode::send_ping(td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> node,
|
||||
void DhtRemoteNode::send_ping(bool client_only, td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> node,
|
||||
adnl::AdnlNodeIdShort src) {
|
||||
missed_pings_++;
|
||||
if (missed_pings_ > max_missed_pings_ && ready_from_ > 0) {
|
||||
|
@ -75,37 +75,42 @@ void DhtRemoteNode::send_ping(td::actor::ActorId<adnl::Adnl> adnl, td::actor::Ac
|
|||
|
||||
td::actor::send_closure(adnl, &adnl::Adnl::add_peer, src, node_.adnl_id(), node_.addr_list());
|
||||
|
||||
auto P = td::PromiseCreator::lambda(
|
||||
[key = id_, id = node_.adnl_id().compute_short_id(), node, src, adnl](td::Result<DhtNode> R) mutable {
|
||||
if (R.is_error()) {
|
||||
LOG(ERROR) << "[dht]: failed to get self node";
|
||||
return;
|
||||
}
|
||||
auto P = td::PromiseCreator::lambda([key, node, adnl](td::Result<td::BufferSlice> R) {
|
||||
if (R.is_error()) {
|
||||
VLOG(DHT_INFO) << "[dht]: received error for query to " << key << ": " << R.move_as_error();
|
||||
return;
|
||||
}
|
||||
auto F = fetch_tl_object<ton_api::dht_node>(R.move_as_ok(), true);
|
||||
auto P = td::PromiseCreator::lambda([key = id_, id = node_.adnl_id().compute_short_id(), client_only, node, src,
|
||||
adnl](td::Result<DhtNode> R) mutable {
|
||||
if (R.is_error()) {
|
||||
LOG(ERROR) << "[dht]: failed to get self node";
|
||||
return;
|
||||
}
|
||||
auto P = td::PromiseCreator::lambda([key, node, adnl](td::Result<td::BufferSlice> R) {
|
||||
if (R.is_error()) {
|
||||
VLOG(DHT_INFO) << "[dht]: received error for query to " << key << ": " << R.move_as_error();
|
||||
return;
|
||||
}
|
||||
auto F = fetch_tl_object<ton_api::dht_node>(R.move_as_ok(), true);
|
||||
|
||||
if (F.is_ok()) {
|
||||
auto N = DhtNode::create(F.move_as_ok());
|
||||
if (N.is_ok()) {
|
||||
td::actor::send_closure(node, &DhtMember::receive_ping, key, N.move_as_ok());
|
||||
} else {
|
||||
VLOG(DHT_WARNING) << "[dht]: bad answer from " << key
|
||||
<< ": dropping bad getSignedAddressList() query answer: " << N.move_as_error();
|
||||
}
|
||||
} else {
|
||||
VLOG(DHT_WARNING) << "[dht]: bad answer from " << key
|
||||
<< ": dropping invalid getSignedAddressList() query answer: " << F.move_as_error();
|
||||
}
|
||||
});
|
||||
auto Q = create_serialize_tl_object<ton_api::dht_getSignedAddressList>();
|
||||
auto B = create_serialize_tl_object_suffix<ton_api::dht_query>(Q.as_slice(), R.move_as_ok().tl());
|
||||
td::actor::send_closure(adnl, &adnl::Adnl::send_query, src, id, "dht ping", std::move(P),
|
||||
td::Timestamp::in(10.0 + td::Random::fast(0, 100) * 0.1), std::move(B));
|
||||
});
|
||||
if (F.is_ok()) {
|
||||
auto N = DhtNode::create(F.move_as_ok());
|
||||
if (N.is_ok()) {
|
||||
td::actor::send_closure(node, &DhtMember::receive_ping, key, N.move_as_ok());
|
||||
} else {
|
||||
VLOG(DHT_WARNING) << "[dht]: bad answer from " << key
|
||||
<< ": dropping bad getSignedAddressList() query answer: " << N.move_as_error();
|
||||
}
|
||||
} else {
|
||||
VLOG(DHT_WARNING) << "[dht]: bad answer from " << key
|
||||
<< ": dropping invalid getSignedAddressList() query answer: " << F.move_as_error();
|
||||
}
|
||||
});
|
||||
auto Q = create_serialize_tl_object<ton_api::dht_getSignedAddressList>();
|
||||
td::BufferSlice B;
|
||||
if (client_only) {
|
||||
B = std::move(Q);
|
||||
} else {
|
||||
B = create_serialize_tl_object_suffix<ton_api::dht_query>(Q.as_slice(), R.move_as_ok().tl());
|
||||
}
|
||||
td::actor::send_closure(adnl, &adnl::Adnl::send_query, src, id, "dht ping", std::move(P),
|
||||
td::Timestamp::in(10.0 + td::Random::fast(0, 100) * 0.1), std::move(B));
|
||||
});
|
||||
|
||||
td::actor::send_closure(node, &DhtMember::get_self_node, std::move(P));
|
||||
}
|
||||
|
|
|
@ -76,7 +76,8 @@ class DhtRemoteNode {
|
|||
double last_ping_at() const {
|
||||
return last_ping_at_;
|
||||
}
|
||||
void send_ping(td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> node, adnl::AdnlNodeIdShort src);
|
||||
void send_ping(bool client_only, td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> node,
|
||||
adnl::AdnlNodeIdShort src);
|
||||
td::Status receive_ping(DhtNode node, td::actor::ActorId<adnl::Adnl> adnl, adnl::AdnlNodeIdShort self_id);
|
||||
td::Status update_value(DhtNode node, td::actor::ActorId<adnl::Adnl> adnl, adnl::AdnlNodeIdShort self_id);
|
||||
};
|
||||
|
|
135
dht/dht.cpp
135
dht/dht.cpp
|
@ -14,7 +14,7 @@
|
|||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
Copyright 2017-2019 Telegram Systems LLP
|
||||
Copyright 2017-2020 Telegram Systems LLP
|
||||
*/
|
||||
#include "dht.hpp"
|
||||
|
||||
|
@ -44,9 +44,10 @@ namespace dht {
|
|||
|
||||
td::actor::ActorOwn<DhtMember> DhtMember::create(adnl::AdnlNodeIdShort id, std::string db_root,
|
||||
td::actor::ActorId<keyring::Keyring> keyring,
|
||||
td::actor::ActorId<adnl::Adnl> adnl, td::uint32 k, td::uint32 a) {
|
||||
td::actor::ActorId<adnl::Adnl> adnl, td::uint32 k, td::uint32 a,
|
||||
bool client_only) {
|
||||
return td::actor::ActorOwn<DhtMember>(
|
||||
td::actor::create_actor<DhtMemberImpl>("dht", id, db_root, keyring, adnl, k, a));
|
||||
td::actor::create_actor<DhtMemberImpl>("dht", id, db_root, keyring, adnl, k, a, client_only));
|
||||
}
|
||||
|
||||
td::Result<td::actor::ActorOwn<Dht>> Dht::create(adnl::AdnlNodeIdShort id, std::string db_root,
|
||||
|
@ -66,9 +67,24 @@ td::Result<td::actor::ActorOwn<Dht>> Dht::create(adnl::AdnlNodeIdShort id, std::
|
|||
return std::move(D);
|
||||
}
|
||||
|
||||
td::Result<td::actor::ActorOwn<Dht>> Dht::create_client(adnl::AdnlNodeIdShort id, std::string db_root,
|
||||
std::shared_ptr<DhtGlobalConfig> conf,
|
||||
td::actor::ActorId<keyring::Keyring> keyring,
|
||||
td::actor::ActorId<adnl::Adnl> adnl) {
|
||||
CHECK(conf->get_k() > 0);
|
||||
CHECK(conf->get_a() > 0);
|
||||
|
||||
auto D = DhtMember::create(id, db_root, keyring, adnl, conf->get_k(), conf->get_a(), true);
|
||||
auto &nodes = conf->nodes();
|
||||
|
||||
for (auto &node : nodes.list()) {
|
||||
auto key = node.get_key();
|
||||
td::actor::send_closure(D, &DhtMember::add_full_node, key, node.clone());
|
||||
}
|
||||
return std::move(D);
|
||||
}
|
||||
|
||||
void DhtMemberImpl::start_up() {
|
||||
std::shared_ptr<td::KeyValue> kv = std::make_shared<td::RocksDb>(
|
||||
td::RocksDb::open(PSTRING() << db_root_ << "/dht-" << td::base64url_encode(id_.as_slice())).move_as_ok());
|
||||
std::vector<td::int32> methods = {ton_api::dht_getSignedAddressList::ID,
|
||||
ton_api::dht_findNode::ID,
|
||||
ton_api::dht_findValue::ID,
|
||||
|
@ -82,26 +98,31 @@ void DhtMemberImpl::start_up() {
|
|||
std::make_unique<Callback>(actor_id(this), id_));
|
||||
}
|
||||
alarm_timestamp() = td::Timestamp::in(1.0);
|
||||
for (td::uint32 bit = 0; bit < 256; bit++) {
|
||||
auto key = create_hash_tl_object<ton_api::dht_db_key_bucket>(bit);
|
||||
std::string value;
|
||||
auto R = kv->get(key.as_slice(), value);
|
||||
R.ensure();
|
||||
if (R.move_as_ok() == td::KeyValue::GetStatus::Ok) {
|
||||
auto V = fetch_tl_object<ton_api::dht_db_bucket>(td::BufferSlice{value}, true);
|
||||
V.ensure();
|
||||
auto nodes = std::move(V.move_as_ok()->nodes_);
|
||||
auto s = nodes->nodes_.size();
|
||||
DhtNodesList list{std::move(nodes)};
|
||||
CHECK(list.size() == s);
|
||||
auto &B = buckets_[bit];
|
||||
for (auto &node : list.list()) {
|
||||
auto key = node.get_key();
|
||||
B.add_full_node(key, std::move(node), adnl_, id_);
|
||||
|
||||
if (!db_root_.empty()) {
|
||||
std::shared_ptr<td::KeyValue> kv = std::make_shared<td::RocksDb>(
|
||||
td::RocksDb::open(PSTRING() << db_root_ << "/dht-" << td::base64url_encode(id_.as_slice())).move_as_ok());
|
||||
for (td::uint32 bit = 0; bit < 256; bit++) {
|
||||
auto key = create_hash_tl_object<ton_api::dht_db_key_bucket>(bit);
|
||||
std::string value;
|
||||
auto R = kv->get(key.as_slice(), value);
|
||||
R.ensure();
|
||||
if (R.move_as_ok() == td::KeyValue::GetStatus::Ok) {
|
||||
auto V = fetch_tl_object<ton_api::dht_db_bucket>(td::BufferSlice{value}, true);
|
||||
V.ensure();
|
||||
auto nodes = std::move(V.move_as_ok()->nodes_);
|
||||
auto s = nodes->nodes_.size();
|
||||
DhtNodesList list{std::move(nodes)};
|
||||
CHECK(list.size() == s);
|
||||
auto &B = buckets_[bit];
|
||||
for (auto &node : list.list()) {
|
||||
auto key = node.get_key();
|
||||
B.add_full_node(key, std::move(node), adnl_, id_);
|
||||
}
|
||||
}
|
||||
}
|
||||
db_ = DbType{std::move(kv)};
|
||||
}
|
||||
db_ = DbType{std::move(kv)};
|
||||
}
|
||||
|
||||
void DhtMemberImpl::tear_down() {
|
||||
|
@ -119,6 +140,9 @@ void DhtMemberImpl::tear_down() {
|
|||
}
|
||||
|
||||
void DhtMemberImpl::save_to_db() {
|
||||
if (db_root_.empty()) {
|
||||
return;
|
||||
}
|
||||
next_save_to_db_at_ = td::Timestamp::in(10.0);
|
||||
alarm_timestamp().relax(next_save_to_db_at_);
|
||||
|
||||
|
@ -277,6 +301,9 @@ void DhtMemberImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::dht_getSig
|
|||
|
||||
void DhtMemberImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data,
|
||||
td::Promise<td::BufferSlice> promise) {
|
||||
if (client_only_) {
|
||||
return;
|
||||
}
|
||||
{
|
||||
auto R = fetch_tl_prefix<ton_api::dht_query>(data, true);
|
||||
if (R.is_ok()) {
|
||||
|
@ -358,11 +385,11 @@ void DhtMemberImpl::set_value(DhtValue value, td::Promise<td::Unit> promise) {
|
|||
|
||||
void DhtMemberImpl::get_value_in(DhtKeyId key, td::Promise<DhtValue> result) {
|
||||
auto P = td::PromiseCreator::lambda([key, promise = std::move(result), SelfId = actor_id(this), print_id = print_id(),
|
||||
adnl = adnl_, list = get_nearest_nodes(key, k_), k = k_, a = a_,
|
||||
id = id_](td::Result<DhtNode> R) mutable {
|
||||
adnl = adnl_, list = get_nearest_nodes(key, k_), k = k_, a = a_, id = id_,
|
||||
client_only = client_only_](td::Result<DhtNode> R) mutable {
|
||||
R.ensure();
|
||||
td::actor::create_actor<DhtQueryFindValue>("FindValueQuery", key, print_id, id, std::move(list), k, a,
|
||||
R.move_as_ok(), SelfId, adnl, std::move(promise))
|
||||
R.move_as_ok(), client_only, SelfId, adnl, std::move(promise))
|
||||
.release();
|
||||
});
|
||||
|
||||
|
@ -374,7 +401,7 @@ void DhtMemberImpl::check() {
|
|||
<< " fvalue=" << find_value_queries_ << " store=" << store_queries_
|
||||
<< " addrlist=" << get_addr_list_queries_;
|
||||
for (auto &bucket : buckets_) {
|
||||
bucket.check(adnl_, actor_id(this), id_);
|
||||
bucket.check(client_only_, adnl_, actor_id(this), id_);
|
||||
}
|
||||
if (next_save_to_db_at_.is_in_past()) {
|
||||
save_to_db();
|
||||
|
@ -469,10 +496,10 @@ void DhtMemberImpl::check() {
|
|||
DhtKeyId key{x};
|
||||
auto P = td::PromiseCreator::lambda([key, promise = std::move(promise), SelfId = actor_id(this),
|
||||
print_id = print_id(), adnl = adnl_, list = get_nearest_nodes(key, k_), k = k_,
|
||||
a = a_, id = id_](td::Result<DhtNode> R) mutable {
|
||||
a = a_, id = id_, client_only = client_only_](td::Result<DhtNode> R) mutable {
|
||||
R.ensure();
|
||||
td::actor::create_actor<DhtQueryFindNodes>("FindNodesQuery", key, print_id, id, std::move(list), k, a,
|
||||
R.move_as_ok(), SelfId, adnl, std::move(promise))
|
||||
R.move_as_ok(), client_only, SelfId, adnl, std::move(promise))
|
||||
.release();
|
||||
});
|
||||
|
||||
|
@ -492,35 +519,39 @@ void DhtMemberImpl::send_store(DhtValue value, td::Promise<td::Unit> promise) {
|
|||
value.check().ensure();
|
||||
auto key_id = value.key_id();
|
||||
|
||||
auto P = td::PromiseCreator::lambda([value = std::move(value), print_id = print_id(), id = id_,
|
||||
list = get_nearest_nodes(key_id, k_), k = k_, a = a_, SelfId = actor_id(this),
|
||||
adnl = adnl_, promise = std::move(promise)](td::Result<DhtNode> R) mutable {
|
||||
R.ensure();
|
||||
td::actor::create_actor<DhtQueryStore>("StoreQuery", std::move(value), print_id, id, std::move(list), k, a,
|
||||
R.move_as_ok(), SelfId, adnl, std::move(promise))
|
||||
.release();
|
||||
});
|
||||
auto P =
|
||||
td::PromiseCreator::lambda([value = std::move(value), print_id = print_id(), id = id_, client_only = client_only_,
|
||||
list = get_nearest_nodes(key_id, k_), k = k_, a = a_, SelfId = actor_id(this),
|
||||
adnl = adnl_, promise = std::move(promise)](td::Result<DhtNode> R) mutable {
|
||||
R.ensure();
|
||||
td::actor::create_actor<DhtQueryStore>("StoreQuery", std::move(value), print_id, id, std::move(list), k, a,
|
||||
R.move_as_ok(), client_only, SelfId, adnl, std::move(promise))
|
||||
.release();
|
||||
});
|
||||
|
||||
get_self_node(std::move(P));
|
||||
}
|
||||
|
||||
void DhtMemberImpl::get_self_node(td::Promise<DhtNode> promise) {
|
||||
auto P = td::PromiseCreator::lambda([promise = std::move(promise), print_id = print_id(), id = id_,
|
||||
keyring = keyring_](td::Result<adnl::AdnlNode> R) mutable {
|
||||
R.ensure();
|
||||
auto node = R.move_as_ok();
|
||||
auto version = static_cast<td::int32>(td::Clocks::system());
|
||||
auto B = create_serialize_tl_object<ton_api::dht_node>(node.pub_id().tl(), node.addr_list().tl(), version,
|
||||
td::BufferSlice());
|
||||
CHECK(node.addr_list().size() > 0);
|
||||
auto P = td::PromiseCreator::lambda(
|
||||
[promise = std::move(promise), node = std::move(node), version](td::Result<td::BufferSlice> R) mutable {
|
||||
R.ensure();
|
||||
DhtNode n{node.pub_id(), node.addr_list(), version, R.move_as_ok()};
|
||||
promise.set_result(std::move(n));
|
||||
});
|
||||
td::actor::send_closure(keyring, &keyring::Keyring::sign_message, id.pubkey_hash(), std::move(B), std::move(P));
|
||||
});
|
||||
auto P =
|
||||
td::PromiseCreator::lambda([promise = std::move(promise), print_id = print_id(), id = id_, keyring = keyring_,
|
||||
client_only = client_only_](td::Result<adnl::AdnlNode> R) mutable {
|
||||
R.ensure();
|
||||
auto node = R.move_as_ok();
|
||||
auto version = static_cast<td::int32>(td::Clocks::system());
|
||||
auto B = create_serialize_tl_object<ton_api::dht_node>(node.pub_id().tl(), node.addr_list().tl(), version,
|
||||
td::BufferSlice());
|
||||
if (!client_only) {
|
||||
CHECK(node.addr_list().size() > 0);
|
||||
}
|
||||
auto P = td::PromiseCreator::lambda(
|
||||
[promise = std::move(promise), node = std::move(node), version](td::Result<td::BufferSlice> R) mutable {
|
||||
R.ensure();
|
||||
DhtNode n{node.pub_id(), node.addr_list(), version, R.move_as_ok()};
|
||||
promise.set_result(std::move(n));
|
||||
});
|
||||
td::actor::send_closure(keyring, &keyring::Keyring::sign_message, id.pubkey_hash(), std::move(B), std::move(P));
|
||||
});
|
||||
td::actor::send_closure(adnl_, &adnl::Adnl::get_self_node, id_, std::move(P));
|
||||
}
|
||||
|
||||
|
|
|
@ -42,6 +42,10 @@ class Dht : public td::actor::Actor {
|
|||
std::shared_ptr<DhtGlobalConfig> conf,
|
||||
td::actor::ActorId<keyring::Keyring> keyring,
|
||||
td::actor::ActorId<adnl::Adnl> adnl);
|
||||
static td::Result<td::actor::ActorOwn<Dht>> create_client(adnl::AdnlNodeIdShort id, std::string db_root,
|
||||
std::shared_ptr<DhtGlobalConfig> conf,
|
||||
td::actor::ActorId<keyring::Keyring> keyring,
|
||||
td::actor::ActorId<adnl::Adnl> adnl);
|
||||
static td::Result<std::shared_ptr<DhtGlobalConfig>> create_global_config(
|
||||
tl_object_ptr<ton_api::dht_config_global> conf);
|
||||
|
||||
|
|
|
@ -85,8 +85,8 @@ class DhtMember : public Dht {
|
|||
|
||||
static td::actor::ActorOwn<DhtMember> create(adnl::AdnlNodeIdShort id, std::string db_root,
|
||||
td::actor::ActorId<keyring::Keyring> keyring,
|
||||
td::actor::ActorId<adnl::Adnl> adnl, td::uint32 k = 10,
|
||||
td::uint32 a = 3);
|
||||
td::actor::ActorId<adnl::Adnl> adnl, td::uint32 k = 10, td::uint32 a = 3,
|
||||
bool client_only = false);
|
||||
|
||||
//virtual void update_addr_list(tl_object_ptr<ton_api::adnl_addressList> addr_list) = 0;
|
||||
//virtual void add_node(adnl::AdnlNodeIdShort id) = 0;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue