diff --git a/dht/dht-bucket.cpp b/dht/dht-bucket.cpp index 8c1b4dda..f60f9961 100644 --- a/dht/dht-bucket.cpp +++ b/dht/dht-bucket.cpp @@ -66,39 +66,66 @@ td::uint32 DhtBucket::active_cnt() { } td::Status DhtBucket::add_full_node(DhtKeyId id, DhtNode newnode, td::actor::ActorId adnl, - adnl::AdnlNodeIdShort self_id, td::int32 our_network_id) { + adnl::AdnlNodeIdShort self_id, td::int32 our_network_id, bool set_active) { for (auto &node : active_nodes_) { if (node && node->get_key() == id) { - return node->update_value(std::move(newnode), adnl, self_id); + if (set_active) { + return node->receive_ping(std::move(newnode), adnl, self_id); + } else { + return node->update_value(std::move(newnode), adnl, self_id); + } } } - for (auto &node : backup_nodes_) { + for (size_t i = 0; i < backup_nodes_.size(); ++i) { + auto &node = backup_nodes_[i]; if (node && node->get_key() == id) { - return node->update_value(std::move(newnode), adnl, self_id); + if (set_active) { + TRY_STATUS(node->receive_ping(std::move(newnode), adnl, self_id)); + if (node->is_ready()) { + promote_node(i); + } + return td::Status::OK(); + } else { + return node->update_value(std::move(newnode), adnl, self_id); + } } } TRY_RESULT_PREFIX(N, DhtRemoteNode::create(std::move(newnode), max_missed_pings_, our_network_id), "failed to add new node: "); - - for (auto &node : backup_nodes_) { - if (node == nullptr) { - node = std::move(N); - return td::Status::OK(); + if (set_active) { + for (auto &node : active_nodes_) { + if (node == nullptr) { + node = std::move(N); + node->receive_ping(); + return td::Status::OK(); + } } } - for (auto &node : backup_nodes_) { - CHECK(node); - if (node->ready_from() == 0 && node->failed_from() + 60 < td::Time::now_cached()) { - node = std::move(N); - return td::Status::OK(); - } + size_t idx = select_backup_node_to_drop(); + if (idx < backup_nodes_.size()) { + backup_nodes_[idx] = std::move(N); } - return td::Status::OK(); } +size_t DhtBucket::select_backup_node_to_drop() const { + size_t result = backup_nodes_.size(); + for (size_t idx = 0; idx < backup_nodes_.size(); ++idx) { + const auto &node = backup_nodes_[idx]; + if (node == nullptr) { + return idx; + } + if (node->ready_from() == 0 && node->failed_from() + 60 < td::Time::now_cached()) { + if (result == backup_nodes_.size() || node->failed_from() < backup_nodes_[result]->failed_from()) { + result = idx; + } + } + } + return result; +} + void DhtBucket::receive_ping(DhtKeyId id, DhtNode result, td::actor::ActorId adnl, adnl::AdnlNodeIdShort self_id) { for (auto &node : active_nodes_) { @@ -120,17 +147,9 @@ void DhtBucket::receive_ping(DhtKeyId id, DhtNode result, td::actor::ActorIdready_from() == 0 && node->failed_from() + 60 < td::Time::now_cached()) { - node = std::move(active_nodes_[idx]); - return; - } + size_t new_idx = select_backup_node_to_drop(); + if (new_idx < backup_nodes_.size()) { + backup_nodes_[new_idx] = std::move(active_nodes_[idx]); } active_nodes_[idx] = nullptr; } @@ -151,7 +170,7 @@ void DhtBucket::check(bool client_only, td::actor::ActorId adnl, td: size_t have_space = 0; for (size_t i = 0; i < active_nodes_.size(); i++) { auto &node = active_nodes_[i]; - if (node && td::Time::now_cached() - node->last_ping_at() > ping_timeout_) { + if (node && td::Time::now_cached() - node->last_ping_at() > node->ping_interval()) { node->send_ping(client_only, adnl, dht, src); if (node->ready_from() == 0) { demote_node(i); @@ -163,7 +182,7 @@ void DhtBucket::check(bool client_only, td::actor::ActorId adnl, td: } for (size_t i = 0; i < backup_nodes_.size(); i++) { auto &node = backup_nodes_[i]; - if (node && td::Time::now_cached() - node->last_ping_at() > ping_timeout_) { + if (node && td::Time::now_cached() - node->last_ping_at() > node->ping_interval()) { node->send_ping(client_only, adnl, dht, src); } if (node && have_space > 0 && node->is_ready()) { @@ -201,6 +220,9 @@ DhtNodesList DhtBucket::export_nodes() const { list.push_back(node->get_node()); } } + if (list.size() > k_) { + list.list().resize(k_); + } return list; } diff --git a/dht/dht-bucket.hpp b/dht/dht-bucket.hpp index 9c5d82b1..e12fe6a4 100644 --- a/dht/dht-bucket.hpp +++ b/dht/dht-bucket.hpp @@ -31,7 +31,6 @@ class DhtMember; class DhtBucket { private: - double ping_timeout_ = 60; td::uint32 max_missed_pings_ = 3; std::vector> active_nodes_; @@ -43,6 +42,7 @@ class DhtBucket { // const DhtMember::PrintId &print_id); void demote_node(size_t idx); void promote_node(size_t idx); + size_t select_backup_node_to_drop() const; public: DhtBucket(td::uint32 k) : k_(k) { @@ -51,7 +51,7 @@ class DhtBucket { } td::uint32 active_cnt(); td::Status add_full_node(DhtKeyId id, DhtNode node, td::actor::ActorId adnl, - adnl::AdnlNodeIdShort self_id, td::int32 our_network_id); + adnl::AdnlNodeIdShort self_id, td::int32 our_network_id, bool set_active = false); void check(bool client_only, td::actor::ActorId adnl, td::actor::ActorId node, adnl::AdnlNodeIdShort src); void receive_ping(DhtKeyId id, DhtNode result, td::actor::ActorId adnl, adnl::AdnlNodeIdShort self_id); diff --git a/dht/dht-in.hpp b/dht/dht-in.hpp index d81f9a71..59ce2184 100644 --- a/dht/dht-in.hpp +++ b/dht/dht-in.hpp @@ -155,7 +155,10 @@ class DhtMemberImpl : public DhtMember { } } - void add_full_node(DhtKeyId id, DhtNode node) override; + void add_full_node(DhtKeyId id, DhtNode node) override { + add_full_node_impl(id, std::move(node)); + } + void add_full_node_impl(DhtKeyId id, DhtNode node, bool set_active = false); adnl::AdnlNodeIdShort get_id() const override { return id_; diff --git a/dht/dht-remote-node.cpp b/dht/dht-remote-node.cpp index e010bbdc..653de256 100644 --- a/dht/dht-remote-node.cpp +++ b/dht/dht-remote-node.cpp @@ -32,19 +32,39 @@ namespace ton { namespace dht { +static const double PING_INTERVAL_DEFAULT = 60.0; +static const double PING_INTERVAL_MULTIPLIER = 1.1; +static const double PING_INTERVAL_MAX = 3600.0 * 4; + +DhtRemoteNode::DhtRemoteNode(DhtNode node, td::uint32 max_missed_pings, td::int32 our_network_id) + : node_(std::move(node)) + , max_missed_pings_(max_missed_pings) + , our_network_id_(our_network_id) + , ping_interval_(PING_INTERVAL_DEFAULT) { + failed_from_ = td::Time::now_cached(); + id_ = node_.get_key(); +} + td::Status DhtRemoteNode::receive_ping(DhtNode node, td::actor::ActorId adnl, adnl::AdnlNodeIdShort self_id) { TRY_STATUS(update_value(std::move(node), adnl, self_id)); + receive_ping(); + return td::Status::OK(); +} + +void DhtRemoteNode::receive_ping() { missed_pings_ = 0; + ping_interval_ = PING_INTERVAL_DEFAULT; if (ready_from_ == 0) { ready_from_ = td::Time::now_cached(); } - return td::Status::OK(); } td::Status DhtRemoteNode::update_value(DhtNode node, td::actor::ActorId adnl, adnl::AdnlNodeIdShort self_id) { - CHECK(node.adnl_id() == node_.adnl_id()); + if (node.adnl_id() != node_.adnl_id()) { + return td::Status::Error("Wrong adnl id"); + } if (node.version() <= node_.version()) { return td::Status::OK(); } @@ -58,9 +78,12 @@ td::Status DhtRemoteNode::update_value(DhtNode node, td::actor::ActorId adnl, td::actor::ActorId node, adnl::AdnlNodeIdShort src) { missed_pings_++; - if (missed_pings_ > max_missed_pings_ && ready_from_ > 0) { - ready_from_ = 0; - failed_from_ = td::Time::now_cached(); + if (missed_pings_ > max_missed_pings_) { + ping_interval_ = std::min(ping_interval_ * PING_INTERVAL_MULTIPLIER, PING_INTERVAL_MAX); + if (ready_from_ > 0) { + ready_from_ = 0; + failed_from_ = td::Time::now_cached(); + } } last_ping_at_ = td::Time::now_cached(); diff --git a/dht/dht-remote-node.hpp b/dht/dht-remote-node.hpp index f0a67e32..8e7db489 100644 --- a/dht/dht-remote-node.hpp +++ b/dht/dht-remote-node.hpp @@ -45,14 +45,11 @@ class DhtRemoteNode { double last_ping_at_ = 0; double ready_from_ = 0; double failed_from_ = 0; + double ping_interval_; td::int32 version_; public: - DhtRemoteNode(DhtNode node, td::uint32 max_missed_pings, td::int32 our_network_id) - : node_(std::move(node)), max_missed_pings_(max_missed_pings), our_network_id_(our_network_id) { - failed_from_ = td::Time::now_cached(); - id_ = node_.get_key(); - } + DhtRemoteNode(DhtNode node, td::uint32 max_missed_pings, td::int32 our_network_id); static td::Result> create(DhtNode node, td::uint32 max_missed_pings, td::int32 our_network_id); DhtNode get_node() const { @@ -78,9 +75,13 @@ class DhtRemoteNode { double last_ping_at() const { return last_ping_at_; } + double ping_interval() const { + return ping_interval_; + } void send_ping(bool client_only, td::actor::ActorId adnl, td::actor::ActorId node, adnl::AdnlNodeIdShort src); td::Status receive_ping(DhtNode node, td::actor::ActorId adnl, adnl::AdnlNodeIdShort self_id); + void receive_ping(); td::Status update_value(DhtNode node, td::actor::ActorId adnl, adnl::AdnlNodeIdShort self_id); }; diff --git a/dht/dht.cpp b/dht/dht.cpp index a6367a35..e1e20d45 100644 --- a/dht/dht.cpp +++ b/dht/dht.cpp @@ -111,7 +111,7 @@ void DhtMemberImpl::start_up() { auto nodes = std::move(V.move_as_ok()->nodes_); auto s = nodes->nodes_.size(); DhtNodesList list{std::move(nodes), network_id_}; - CHECK(list.size() == s); + CHECK(list.size() <= s); // Some nodes can be dropped due to a wrong network id auto &B = buckets_[bit]; for (auto &node : list.list()) { auto key = node.get_key(); @@ -366,8 +366,12 @@ void DhtMemberImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice dat auto N = DhtNode::create(std::move(R.move_as_ok()->node_), network_id_); if (N.is_ok()) { auto node = N.move_as_ok(); - auto key = node.get_key(); - add_full_node(key, std::move(node)); + if (node.adnl_id().compute_short_id() == src) { + auto key = node.get_key(); + add_full_node_impl(key, std::move(node), true); + } else { + VLOG(DHT_WARNING) << this << ": dropping bad node: unexpected adnl id"; + } } else { VLOG(DHT_WARNING) << this << ": dropping bad node " << N.move_as_error(); } @@ -394,7 +398,7 @@ void DhtMemberImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice dat ton_api::downcast_call(*Q, [&](auto &object) { this->process_query(src, object, std::move(promise)); }); } -void DhtMemberImpl::add_full_node(DhtKeyId key, DhtNode node) { +void DhtMemberImpl::add_full_node_impl(DhtKeyId key, DhtNode node, bool set_active) { VLOG(DHT_EXTRA_DEBUG) << this << ": adding full node " << key; auto eid = key ^ key_; @@ -406,7 +410,7 @@ void DhtMemberImpl::add_full_node(DhtKeyId key, DhtNode node) { #endif if (bit < 256) { CHECK(key.get_bit(bit) != key_.get_bit(bit)); - buckets_[bit].add_full_node(key, std::move(node), adnl_, id_, network_id_); + buckets_[bit].add_full_node(key, std::move(node), adnl_, id_, network_id_, set_active); } else { CHECK(key == key_); }