diff --git a/dht/dht-in.hpp b/dht/dht-in.hpp index 59ce2184..c2d20455 100644 --- a/dht/dht-in.hpp +++ b/dht/dht-in.hpp @@ -155,10 +155,7 @@ class DhtMemberImpl : public DhtMember { } } - void add_full_node(DhtKeyId id, DhtNode node) override { - add_full_node_impl(id, std::move(node)); - } - void add_full_node_impl(DhtKeyId id, DhtNode node, bool set_active = false); + void add_full_node(DhtKeyId id, DhtNode node, bool set_active) override; adnl::AdnlNodeIdShort get_id() const override { return id_; diff --git a/dht/dht-query.cpp b/dht/dht-query.cpp index bc61242d..b84ef8c3 100644 --- a/dht/dht-query.cpp +++ b/dht/dht-query.cpp @@ -34,24 +34,33 @@ namespace ton { namespace dht { void DhtQuery::send_queries() { + while (pending_queries_.size() > k_ * 2) { + pending_queries_.erase(--pending_queries_.end()); + } VLOG(DHT_EXTRA_DEBUG) << this << ": sending new queries. active=" << active_queries_ << " max_active=" << a_; - while (pending_ids_.size() > 0 && active_queries_ < a_) { + while (pending_queries_.size() > 0 && active_queries_ < a_) { + auto id_xor = *pending_queries_.begin(); + if (result_list_.size() == k_ && *result_list_.rbegin() < id_xor) { + break; + } active_queries_++; - auto id_xor = *pending_ids_.begin(); auto id = id_xor ^ key_; VLOG(DHT_EXTRA_DEBUG) << this << ": sending " << get_name() << " query to " << id; - pending_ids_.erase(id_xor); + pending_queries_.erase(id_xor); - auto it = list_.find(id_xor); - CHECK(it != list_.end()); - td::actor::send_closure(adnl_, &adnl::Adnl::add_peer, get_src(), it->second.adnl_id(), it->second.addr_list()); + auto it = nodes_.find(id_xor); + CHECK(it != nodes_.end()); + td::actor::send_closure(adnl_, &adnl::Adnl::add_peer, get_src(), it->second.node.adnl_id(), + it->second.node.addr_list()); send_one_query(id.to_adnl()); } if (active_queries_ == 0) { - CHECK(pending_ids_.size() == 0); + pending_queries_.clear(); DhtNodesList list; - for (auto &node : list_) { - list.push_back(std::move(node.second)); + for (auto id_xor : result_list_) { + auto it = nodes_.find(id_xor); + CHECK(it != nodes_.end()); + list.push_back(it->second.node.clone()); } CHECK(list.size() <= k_); VLOG(DHT_EXTRA_DEBUG) << this << ": finalizing " << get_name() << " query. List size=" << list.size(); @@ -65,30 +74,32 @@ void DhtQuery::add_nodes(DhtNodesList list) { for (auto &node : list.list()) { auto id = node.get_key(); auto id_xor = key_ ^ id; - if (list_.find(id_xor) != list_.end()) { + if (nodes_.find(id_xor) != nodes_.end()) { continue; } - td::actor::send_closure(node_, &DhtMember::add_full_node, id, node.clone()); + VLOG(DHT_EXTRA_DEBUG) << this << ": " << get_name() << " query: adding " << id << " key"; + td::actor::send_closure(node_, &DhtMember::add_full_node, id, node.clone(), false); + nodes_[id_xor].node = std::move(node); + pending_queries_.insert(id_xor); + } +} - DhtKeyId last_id_xor; - if (list_.size() > 0) { - last_id_xor = list_.rbegin()->first; +void DhtQuery::finish_query(adnl::AdnlNodeIdShort id, bool success) { + active_queries_--; + CHECK(active_queries_ <= k_); + auto id_xor = key_ ^ DhtKeyId(id); + if (success) { + result_list_.insert(id_xor); + if (result_list_.size() > k_) { + result_list_.erase(--result_list_.end()); } - - if (list_.size() < k_ || id_xor < last_id_xor) { - list_[id_xor] = std::move(node); - pending_ids_.insert(id_xor); - if (list_.size() > k_) { - CHECK(id_xor != last_id_xor); - VLOG(DHT_EXTRA_DEBUG) << this << ": " << get_name() << " query: replacing " << (last_id_xor ^ key_) - << " key with " << id; - pending_ids_.erase(last_id_xor); - list_.erase(last_id_xor); - } else { - VLOG(DHT_EXTRA_DEBUG) << this << ": " << get_name() << " query: adding " << id << " key"; - } + } else { + NodeInfo &info = nodes_[id_xor]; + if (++info.failed_attempts < MAX_ATTEMPTS) { + pending_queries_.insert(id_xor); } } + send_queries(); } void DhtQueryFindNodes::send_one_query(adnl::AdnlNodeIdShort id) { @@ -111,7 +122,7 @@ void DhtQueryFindNodes::send_one_query(adnl::AdnlNodeIdShort id) { void DhtQueryFindNodes::on_result(td::Result R, adnl::AdnlNodeIdShort dst) { if (R.is_error()) { VLOG(DHT_INFO) << this << ": failed find nodes query " << get_src() << "->" << dst << ": " << R.move_as_error(); - finish_query(); + finish_query(dst, false); return; } @@ -122,7 +133,7 @@ void DhtQueryFindNodes::on_result(td::Result R, adnl::AdnlNodeI } else { add_nodes(DhtNodesList{Res.move_as_ok(), our_network_id()}); } - finish_query(); + finish_query(dst); } void DhtQueryFindNodes::finish(DhtNodesList list) { @@ -166,14 +177,14 @@ void DhtQueryFindValue::send_one_query_nodes(adnl::AdnlNodeIdShort id) { void DhtQueryFindValue::on_result(td::Result R, adnl::AdnlNodeIdShort dst) { if (R.is_error()) { VLOG(DHT_INFO) << this << ": failed find value query " << get_src() << "->" << dst << ": " << R.move_as_error(); - finish_query(); + finish_query(dst, false); return; } auto Res = fetch_tl_object(R.move_as_ok(), true); if (Res.is_error()) { VLOG(DHT_WARNING) << this << ": dropping incorrect answer on dht.findValue query from " << dst << ": " << Res.move_as_error(); - finish_query(); + finish_query(dst, false); return; } @@ -210,26 +221,26 @@ void DhtQueryFindValue::on_result(td::Result R, adnl::AdnlNodeI } else if (send_get_nodes) { send_one_query_nodes(dst); } else { - finish_query(); + finish_query(dst); } } void DhtQueryFindValue::on_result_nodes(td::Result R, adnl::AdnlNodeIdShort dst) { if (R.is_error()) { VLOG(DHT_INFO) << this << ": failed find nodes query " << get_src() << "->" << dst << ": " << R.move_as_error(); - finish_query(); + finish_query(dst, false); return; } auto Res = fetch_tl_object(R.move_as_ok(), true); if (Res.is_error()) { VLOG(DHT_WARNING) << this << ": dropping incorrect answer on dht.findNodes query from " << dst << ": " << Res.move_as_error(); - finish_query(); + finish_query(dst, false); return; } auto r = Res.move_as_ok(); add_nodes(DhtNodesList{create_tl_object(std::move(r->nodes_)), our_network_id()}); - finish_query(); + finish_query(dst); } void DhtQueryFindValue::finish(DhtNodesList list) { @@ -422,14 +433,14 @@ void DhtQueryRequestReversePing::send_one_query(adnl::AdnlNodeIdShort id) { void DhtQueryRequestReversePing::on_result(td::Result R, adnl::AdnlNodeIdShort dst) { if (R.is_error()) { VLOG(DHT_INFO) << this << ": failed reverse ping query " << get_src() << "->" << dst << ": " << R.move_as_error(); - finish_query(); + finish_query(dst, false); return; } auto Res = fetch_tl_object(R.move_as_ok(), true); if (Res.is_error()) { VLOG(DHT_WARNING) << this << ": dropping incorrect answer on dht.requestReversePing query from " << dst << ": " << Res.move_as_error(); - finish_query(); + finish_query(dst, false); return; } @@ -441,7 +452,7 @@ void DhtQueryRequestReversePing::on_result(td::Result R, adnl:: }, [&](ton_api::dht_clientNotFound &v) { add_nodes(DhtNodesList{std::move(v.nodes_), our_network_id()}); - finish_query(); + finish_query(dst); })); } diff --git a/dht/dht-query.hpp b/dht/dht-query.hpp index c1db0a0e..e4740361 100644 --- a/dht/dht-query.hpp +++ b/dht/dht-query.hpp @@ -63,11 +63,7 @@ class DhtQuery : public td::actor::Actor { } void send_queries(); void add_nodes(DhtNodesList list); - void finish_query() { - active_queries_--; - CHECK(active_queries_ <= k_); - send_queries(); - } + void finish_query(adnl::AdnlNodeIdShort id, bool success = true); DhtKeyId get_key() const { return key_; } @@ -88,16 +84,22 @@ class DhtQuery : public td::actor::Actor { virtual std::string get_name() const = 0; private: + struct NodeInfo { + DhtNode node; + int failed_attempts = 0; + }; DhtMember::PrintId print_id_; adnl::AdnlNodeIdShort src_; - std::map list_; - std::set pending_ids_; + std::map nodes_; + std::set result_list_, pending_queries_; td::uint32 k_; td::uint32 a_; td::int32 our_network_id_; td::actor::ActorId node_; td::uint32 active_queries_ = 0; + static const int MAX_ATTEMPTS = 1; + protected: td::actor::ActorId adnl_; }; diff --git a/dht/dht.cpp b/dht/dht.cpp index e1e20d45..8d7b02b7 100644 --- a/dht/dht.cpp +++ b/dht/dht.cpp @@ -57,7 +57,7 @@ td::Result> Dht::create(adnl::AdnlNodeIdShort id, std:: for (auto &node : nodes.list()) { auto key = node.get_key(); - td::actor::send_closure(D, &DhtMember::add_full_node, key, node.clone()); + td::actor::send_closure(D, &DhtMember::add_full_node, key, node.clone(), true); } return std::move(D); } @@ -74,7 +74,7 @@ td::Result> Dht::create_client(adnl::AdnlNodeIdShort id for (auto &node : nodes.list()) { auto key = node.get_key(); - td::actor::send_closure(D, &DhtMember::add_full_node, key, node.clone()); + td::actor::send_closure(D, &DhtMember::add_full_node, key, node.clone(), true); } return std::move(D); } @@ -368,7 +368,7 @@ void DhtMemberImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice dat auto node = N.move_as_ok(); if (node.adnl_id().compute_short_id() == src) { auto key = node.get_key(); - add_full_node_impl(key, std::move(node), true); + add_full_node(key, std::move(node), true); } else { VLOG(DHT_WARNING) << this << ": dropping bad node: unexpected adnl id"; } @@ -398,7 +398,7 @@ void DhtMemberImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice dat ton_api::downcast_call(*Q, [&](auto &object) { this->process_query(src, object, std::move(promise)); }); } -void DhtMemberImpl::add_full_node_impl(DhtKeyId key, DhtNode node, bool set_active) { +void DhtMemberImpl::add_full_node(DhtKeyId key, DhtNode node, bool set_active) { VLOG(DHT_EXTRA_DEBUG) << this << ": adding full node " << key; auto eid = key ^ key_; @@ -466,7 +466,7 @@ void DhtMemberImpl::set_value(DhtValue value, td::Promise promise) { void DhtMemberImpl::get_value_in(DhtKeyId key, td::Promise result) { auto P = td::PromiseCreator::lambda([key, promise = std::move(result), SelfId = actor_id(this), print_id = print_id(), - adnl = adnl_, list = get_nearest_nodes(key, k_), k = k_, a = a_, + adnl = adnl_, list = get_nearest_nodes(key, k_ * 2), k = k_, a = a_, network_id = network_id_, id = id_, client_only = client_only_](td::Result R) mutable { R.ensure(); @@ -485,7 +485,7 @@ void DhtMemberImpl::register_reverse_connection(adnl::AdnlNodeIdFull client, td: auto key_id = get_reverse_connection_key(client_short).compute_key_id(); td::actor::send_closure(keyring_, &keyring::Keyring::sign_message, client_short.pubkey_hash(), register_reverse_connection_to_sign(client_short, id_, ttl), - [=, print_id = print_id(), list = get_nearest_nodes(key_id, k_), SelfId = actor_id(this), + [=, print_id = print_id(), list = get_nearest_nodes(key_id, k_ * 2), SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable { TRY_RESULT_PROMISE_PREFIX(promise, signature, std::move(R), "Failed to sign: "); td::actor::send_closure(SelfId, &DhtMemberImpl::get_self_node, @@ -532,7 +532,7 @@ void DhtMemberImpl::request_reverse_ping_cont(adnl::AdnlNode target, td::BufferS } auto key_id = get_reverse_connection_key(client).compute_key_id(); get_self_node([=, target = std::move(target), signature = std::move(signature), promise = std::move(promise), - SelfId = actor_id(this), print_id = print_id(), list = get_nearest_nodes(key_id, k_), + SelfId = actor_id(this), print_id = print_id(), list = get_nearest_nodes(key_id, k_ * 2), client_only = client_only_](td::Result R) mutable { R.ensure(); td::actor::create_actor( @@ -651,8 +651,8 @@ void DhtMemberImpl::check() { DhtKeyId key{x}; auto P = td::PromiseCreator::lambda([key, promise = std::move(promise), SelfId = actor_id(this), - print_id = print_id(), adnl = adnl_, list = get_nearest_nodes(key, k_), k = k_, - a = a_, network_id = network_id_, id = id_, + print_id = print_id(), adnl = adnl_, list = get_nearest_nodes(key, k_ * 2), + k = k_, a = a_, network_id = network_id_, id = id_, client_only = client_only_](td::Result R) mutable { R.ensure(); td::actor::create_actor("FindNodesQuery", key, print_id, id, std::move(list), k, a, network_id, @@ -677,8 +677,8 @@ void DhtMemberImpl::send_store(DhtValue value, td::Promise promise) { auto key_id = value.key_id(); auto P = td::PromiseCreator::lambda([value = std::move(value), print_id = print_id(), id = id_, - client_only = client_only_, list = get_nearest_nodes(key_id, k_), k = k_, a = a_, - network_id = network_id_, SelfId = actor_id(this), adnl = adnl_, + client_only = client_only_, list = get_nearest_nodes(key_id, k_ * 2), k = k_, + a = a_, network_id = network_id_, SelfId = actor_id(this), adnl = adnl_, promise = std::move(promise)](td::Result R) mutable { R.ensure(); td::actor::create_actor("StoreQuery", std::move(value), print_id, id, std::move(list), k, a, diff --git a/dht/dht.hpp b/dht/dht.hpp index 0b46d635..9fb05e08 100644 --- a/dht/dht.hpp +++ b/dht/dht.hpp @@ -95,7 +95,7 @@ class DhtMember : public Dht { //virtual void update_addr_list(tl_object_ptr addr_list) = 0; //virtual void add_node(adnl::AdnlNodeIdShort id) = 0; - virtual void add_full_node(DhtKeyId id, DhtNode node) = 0; + virtual void add_full_node(DhtKeyId id, DhtNode node, bool set_active) = 0; virtual void receive_ping(DhtKeyId id, DhtNode result) = 0; diff --git a/overlay/overlay-manager.cpp b/overlay/overlay-manager.cpp index 43192190..3c5f5eab 100644 --- a/overlay/overlay-manager.cpp +++ b/overlay/overlay-manager.cpp @@ -93,17 +93,17 @@ void OverlayManager::create_public_overlay(adnl::AdnlNodeIdShort local_id, Overl std::unique_ptr callback, OverlayPrivacyRules rules, td::string scope) { create_public_overlay_ex(local_id, std::move(overlay_id), std::move(callback), std::move(rules), std::move(scope), - true); + {}); } void OverlayManager::create_public_overlay_ex(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::unique_ptr callback, OverlayPrivacyRules rules, - td::string scope, bool announce_self) { + td::string scope, OverlayOptions opts) { CHECK(!dht_node_.empty()); auto id = overlay_id.compute_short_id(); register_overlay(local_id, id, Overlay::create(keyring_, adnl_, actor_id(this), dht_node_, local_id, std::move(overlay_id), - std::move(callback), std::move(rules), scope, announce_self)); + std::move(callback), std::move(rules), scope, std::move(opts))); } void OverlayManager::create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, diff --git a/overlay/overlay-manager.h b/overlay/overlay-manager.h index fe1166ac..035ef3e8 100644 --- a/overlay/overlay-manager.h +++ b/overlay/overlay-manager.h @@ -54,7 +54,7 @@ class OverlayManager : public Overlays { std::unique_ptr callback, OverlayPrivacyRules rules, td::string scope) override; void create_public_overlay_ex(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::unique_ptr callback, OverlayPrivacyRules rules, td::string scope, - bool announce_self) override; + OverlayOptions opts) override; void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::vector nodes, std::unique_ptr callback, OverlayPrivacyRules rules) override; diff --git a/overlay/overlay.cpp b/overlay/overlay.cpp index fcf766fe..af01e045 100644 --- a/overlay/overlay.cpp +++ b/overlay/overlay.cpp @@ -37,10 +37,10 @@ td::actor::ActorOwn Overlay::create(td::actor::ActorId manager, td::actor::ActorId dht_node, adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::unique_ptr callback, - OverlayPrivacyRules rules, td::string scope, bool announce_self) { + OverlayPrivacyRules rules, td::string scope, OverlayOptions opts) { auto R = td::actor::create_actor("overlay", keyring, adnl, manager, dht_node, local_id, std::move(overlay_id), true, std::vector(), - std::move(callback), std::move(rules), scope, announce_self); + std::move(callback), std::move(rules), scope, opts); return td::actor::ActorOwn(std::move(R)); } @@ -60,7 +60,7 @@ OverlayImpl::OverlayImpl(td::actor::ActorId keyring, td::actor td::actor::ActorId manager, td::actor::ActorId dht_node, adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, bool pub, std::vector nodes, std::unique_ptr callback, - OverlayPrivacyRules rules, td::string scope, bool announce_self) + OverlayPrivacyRules rules, td::string scope, OverlayOptions opts) : keyring_(keyring) , adnl_(adnl) , manager_(manager) @@ -71,7 +71,8 @@ OverlayImpl::OverlayImpl(td::actor::ActorId keyring, td::actor , public_(pub) , rules_(std::move(rules)) , scope_(scope) - , announce_self_(announce_self) { + , announce_self_(opts.announce_self_) + , frequent_dht_lookup_(opts.frequent_dht_lookup_) { overlay_id_ = id_full_.compute_short_id(); VLOG(OVERLAY_INFO) << this << ": creating " << (public_ ? "public" : "private"); @@ -279,13 +280,13 @@ void OverlayImpl::alarm() { send_random_peers(P->get_id(), {}); } } - if (next_dht_query_.is_in_past()) { + if (next_dht_query_ && next_dht_query_.is_in_past()) { + next_dht_query_ = td::Timestamp::never(); auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result res) { td::actor::send_closure(SelfId, &OverlayImpl::receive_dht_nodes, std::move(res), true); }); td::actor::send_closure(dht_node_, &dht::Dht::get_value, dht::DhtKey{overlay_id_.pubkey_hash(), "nodes", 0}, std::move(P)); - next_dht_query_ = td::Timestamp::in(td::Random::fast(60.0, 100.0)); } if (update_db_at_.is_in_past()) { if (peers_.size() > 0) { @@ -333,7 +334,13 @@ void OverlayImpl::receive_dht_nodes(td::Result res, bool dummy) { VLOG(OVERLAY_NOTICE) << this << ": can not get value from DHT: " << res.move_as_error(); } + if (!(next_dht_store_query_ && next_dht_store_query_.is_in_past())) { + finish_dht_query(); + return; + } + next_dht_store_query_ = td::Timestamp::never(); if (!announce_self_) { + finish_dht_query(); return; } @@ -341,6 +348,7 @@ void OverlayImpl::receive_dht_nodes(td::Result res, bool dummy) { auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), oid = print_id()](td::Result R) { if (R.is_error()) { LOG(ERROR) << oid << "cannot get self node"; + td::actor::send_closure(SelfId, &OverlayImpl::finish_dht_query); return; } td::actor::send_closure(SelfId, &OverlayImpl::update_dht_nodes, R.move_as_ok()); @@ -365,10 +373,11 @@ void OverlayImpl::update_dht_nodes(OverlayNode node) { static_cast(td::Clocks::system() + 3600), td::BufferSlice()}; value.check().ensure(); - auto P = td::PromiseCreator::lambda([oid = print_id()](td::Result res) { + auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), oid = print_id()](td::Result res) { if (res.is_error()) { VLOG(OVERLAY_NOTICE) << oid << ": error storing to DHT: " << res.move_as_error(); } + td::actor::send_closure(SelfId, &OverlayImpl::finish_dht_query); }); td::actor::send_closure(dht_node_, &dht::Dht::set_value, std::move(value), std::move(P)); diff --git a/overlay/overlay.h b/overlay/overlay.h index a5f7b3a4..da41a247 100644 --- a/overlay/overlay.h +++ b/overlay/overlay.h @@ -42,7 +42,7 @@ class Overlay : public td::actor::Actor { td::actor::ActorId manager, td::actor::ActorId dht_node, adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::unique_ptr callback, - OverlayPrivacyRules rules, td::string scope, bool announce_self = true); + OverlayPrivacyRules rules, td::string scope, OverlayOptions opts = {}); static td::actor::ActorOwn create(td::actor::ActorId keyring, td::actor::ActorId adnl, td::actor::ActorId manager, diff --git a/overlay/overlay.hpp b/overlay/overlay.hpp index 86d37d5b..90fcc43d 100644 --- a/overlay/overlay.hpp +++ b/overlay/overlay.hpp @@ -82,12 +82,17 @@ class OverlayPeer { void on_ping_result(bool success) { if (success) { missed_pings_ = 0; + last_ping_at_ = td::Timestamp::now(); + is_alive_ = true; } else { ++missed_pings_; + if (missed_pings_ >= 3 && last_ping_at_.is_in_past(td::Timestamp::in(-15.0))) { + is_alive_ = false; + } } } bool is_alive() const { - return missed_pings_ < 3; + return is_alive_; } td::uint32 throughput_out_bytes = 0; @@ -116,6 +121,8 @@ class OverlayPeer { bool is_neighbour_ = false; size_t missed_pings_ = 0; + bool is_alive_ = true; + td::Timestamp last_ping_at_ = td::Timestamp::now(); }; class OverlayImpl : public Overlay { @@ -124,7 +131,7 @@ class OverlayImpl : public Overlay { td::actor::ActorId manager, td::actor::ActorId dht_node, adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, bool pub, std::vector nodes, std::unique_ptr callback, - OverlayPrivacyRules rules, td::string scope = "{ \"type\": \"undefined\" }", bool announce_self = true); + OverlayPrivacyRules rules, td::string scope = "{ \"type\": \"undefined\" }", OverlayOptions opts = {}); void update_dht_node(td::actor::ActorId dht) override { dht_node_ = dht; } @@ -295,6 +302,17 @@ class OverlayImpl : public Overlay { void del_peer(adnl::AdnlNodeIdShort id); OverlayPeer *get_random_peer(bool only_alive = false); + void finish_dht_query() { + if (!next_dht_store_query_) { + next_dht_store_query_ = td::Timestamp::in(td::Random::fast(60.0, 100.0)); + } + if (frequent_dht_lookup_ && peers_.size() == bad_peers_.size()) { + next_dht_query_ = td::Timestamp::in(td::Random::fast(6.0, 10.0)); + } else { + next_dht_query_ = next_dht_store_query_; + } + } + td::actor::ActorId keyring_; td::actor::ActorId adnl_; td::actor::ActorId manager_; @@ -305,6 +323,7 @@ class OverlayImpl : public Overlay { td::DecTree peers_; td::Timestamp next_dht_query_ = td::Timestamp::in(1.0); + td::Timestamp next_dht_store_query_ = td::Timestamp::in(1.0); td::Timestamp update_db_at_; td::Timestamp update_throughput_at_; td::Timestamp last_throughput_update_; @@ -367,6 +386,7 @@ class OverlayImpl : public Overlay { OverlayPrivacyRules rules_; td::string scope_; bool announce_self_ = true; + bool frequent_dht_lookup_ = false; std::map> certs_; class CachedEncryptor : public td::ListNode { diff --git a/overlay/overlays.h b/overlay/overlays.h index e12bbbdb..79551e05 100644 --- a/overlay/overlays.h +++ b/overlay/overlays.h @@ -158,6 +158,11 @@ class Certificate { td::SharedSlice signature_; }; +struct OverlayOptions { + bool announce_self_ = true; + bool frequent_dht_lookup_ = false; +}; + class Overlays : public td::actor::Actor { public: class Callback { @@ -197,7 +202,7 @@ class Overlays : public td::actor::Actor { td::string scope) = 0; virtual void create_public_overlay_ex(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::unique_ptr callback, OverlayPrivacyRules rules, - td::string scope, bool announce_self) = 0; + td::string scope, OverlayOptions opts) = 0; virtual void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::vector nodes, std::unique_ptr callback, OverlayPrivacyRules rules) = 0; diff --git a/rldp-http-proxy/rldp-http-proxy.cpp b/rldp-http-proxy/rldp-http-proxy.cpp index e9186822..0d518d6d 100644 --- a/rldp-http-proxy/rldp-http-proxy.cpp +++ b/rldp-http-proxy/rldp-http-proxy.cpp @@ -54,6 +54,7 @@ #include "git.h" #include "td/utils/BufferedFd.h" #include "common/delay.h" +#include "td/utils/port/path.h" #include "tonlib/tonlib/TonlibClientWrapper.h" #include "DNSResolver.h" @@ -920,6 +921,12 @@ class RldpHttpProxy : public td::actor::Actor { } void run() { + if (!db_root_.empty()) { + td::mkpath(db_root_ + "/").ensure(); + } else if (!is_client_) { + LOG(ERROR) << "DB root is required for server proxy"; + std::_Exit(2); + } keyring_ = ton::keyring::Keyring::create(is_client_ ? std::string("") : (db_root_ + "/keyring")); { auto S = load_global_config(); @@ -955,9 +962,16 @@ class RldpHttpProxy : public td::actor::Actor { auto conf_dataR = td::read_file(global_config_); conf_dataR.ensure(); + ton::tl_object_ptr key_store; + if (db_root_.empty()) { + key_store = tonlib_api::make_object(); + } else { + td::mkpath(db_root_ + "/tonlib-cache/").ensure(); + key_store = tonlib_api::make_object(db_root_ + "/tonlib-cache/"); + } auto tonlib_options = tonlib_api::make_object( tonlib_api::make_object(conf_dataR.move_as_ok().as_slice().str(), "", false, false), - tonlib_api::make_object()); + std::move(key_store)); tonlib_client_ = td::actor::create_actor("tonlibclient", std::move(tonlib_options)); dns_resolver_ = td::actor::create_actor("dnsresolver", tonlib_client_.get()); } diff --git a/storage/PeerActor.cpp b/storage/PeerActor.cpp index 0cb21c0a..48d45626 100644 --- a/storage/PeerActor.cpp +++ b/storage/PeerActor.cpp @@ -25,6 +25,7 @@ #include "td/utils/overloaded.h" #include "td/utils/Random.h" #include "vm/boc.h" +#include "common/delay.h" namespace ton { @@ -119,9 +120,9 @@ void PeerActor::on_get_piece_result(PartId piece_id, td::Result return std::move(res); }(); if (res.is_error()) { - LOG(DEBUG) << "getPiece " << piece_id << "query: " << res.error(); + LOG(DEBUG) << "getPiece " << piece_id << " query: " << res.error(); } else { - LOG(DEBUG) << "getPiece " << piece_id << "query: OK"; + LOG(DEBUG) << "getPiece " << piece_id << " query: OK"; } state_->node_queries_results_.add_element(std::make_pair(piece_id, std::move(res))); notify_node(); @@ -343,11 +344,20 @@ void PeerActor::loop_node_get_piece() { } auto piece_size = std::min(torrent_info_->piece_size, torrent_info_->file_size - part * torrent_info_->piece_size); - td::actor::send_closure(state_->speed_limiters_.download, &SpeedLimiter::enqueue, (double)piece_size, - td::Timestamp::in(3.0), [part, SelfId = actor_id(this)](td::Result R) { - td::actor::send_closure(SelfId, &PeerActor::node_get_piece_query_ready, part, - std::move(R)); - }); + td::Timestamp timeout = td::Timestamp::in(3.0); + td::actor::send_closure( + state_->speed_limiters_.download, &SpeedLimiter::enqueue, (double)piece_size, timeout, + [=, SelfId = actor_id(this)](td::Result R) { + if (R.is_ok()) { + td::actor::send_closure(SelfId, &PeerActor::node_get_piece_query_ready, part, std::move(R)); + } else { + delay_action( + [=, R = std::move(R)]() mutable { + td::actor::send_closure(SelfId, &PeerActor::node_get_piece_query_ready, part, std::move(R)); + }, + timeout); + } + }); } } diff --git a/storage/PeerManager.h b/storage/PeerManager.h index 52297ac5..38d1494b 100644 --- a/storage/PeerManager.h +++ b/storage/PeerManager.h @@ -143,9 +143,11 @@ class PeerManager : public td::actor::Actor { td::actor::ActorId peer_manager_; ton::adnl::AdnlNodeIdShort dst_; }; + ton::overlay::OverlayOptions opts; + opts.announce_self_ = !client_mode_; + opts.frequent_dht_lookup_ = true; send_closure(overlays_, &ton::overlay::Overlays::create_public_overlay_ex, src_id, overlay_id_.clone(), - std::make_unique(actor_id(this), src_id), rules, R"({ "type": "storage" })", - !client_mode_); + std::make_unique(actor_id(this), src_id), rules, R"({ "type": "storage" })", opts); } promise.set_value({}); } diff --git a/storage/SpeedLimiter.cpp b/storage/SpeedLimiter.cpp index 952005fe..704c7402 100644 --- a/storage/SpeedLimiter.cpp +++ b/storage/SpeedLimiter.cpp @@ -16,6 +16,7 @@ */ #include "SpeedLimiter.h" +#include "common/errorcode.h" namespace ton { @@ -41,11 +42,11 @@ void SpeedLimiter::enqueue(double size, td::Timestamp timeout, td::Promise( tonlib_api::make_object(r_conf_data.move_as_ok().as_slice().str(), "", false, false), - tonlib_api::make_object()); + tonlib_api::make_object(key_store)); tonlib_client_ = td::actor::create_actor("tonlibclient", std::move(tonlib_options)); }