mirror of
https://github.com/ton-blockchain/ton
synced 2025-02-14 12:12:21 +00:00
Improve handling backup nodes in dht (#562)
This commit is contained in:
parent
360ef54e6b
commit
dd9cdba587
6 changed files with 100 additions and 47 deletions
|
@ -66,39 +66,66 @@ td::uint32 DhtBucket::active_cnt() {
|
||||||
}
|
}
|
||||||
|
|
||||||
td::Status DhtBucket::add_full_node(DhtKeyId id, DhtNode newnode, td::actor::ActorId<adnl::Adnl> adnl,
|
td::Status DhtBucket::add_full_node(DhtKeyId id, DhtNode newnode, td::actor::ActorId<adnl::Adnl> adnl,
|
||||||
adnl::AdnlNodeIdShort self_id, td::int32 our_network_id) {
|
adnl::AdnlNodeIdShort self_id, td::int32 our_network_id, bool set_active) {
|
||||||
for (auto &node : active_nodes_) {
|
for (auto &node : active_nodes_) {
|
||||||
if (node && node->get_key() == id) {
|
if (node && node->get_key() == id) {
|
||||||
return node->update_value(std::move(newnode), adnl, self_id);
|
if (set_active) {
|
||||||
|
return node->receive_ping(std::move(newnode), adnl, self_id);
|
||||||
|
} else {
|
||||||
|
return node->update_value(std::move(newnode), adnl, self_id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (auto &node : backup_nodes_) {
|
for (size_t i = 0; i < backup_nodes_.size(); ++i) {
|
||||||
|
auto &node = backup_nodes_[i];
|
||||||
if (node && node->get_key() == id) {
|
if (node && node->get_key() == id) {
|
||||||
return node->update_value(std::move(newnode), adnl, self_id);
|
if (set_active) {
|
||||||
|
TRY_STATUS(node->receive_ping(std::move(newnode), adnl, self_id));
|
||||||
|
if (node->is_ready()) {
|
||||||
|
promote_node(i);
|
||||||
|
}
|
||||||
|
return td::Status::OK();
|
||||||
|
} else {
|
||||||
|
return node->update_value(std::move(newnode), adnl, self_id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TRY_RESULT_PREFIX(N, DhtRemoteNode::create(std::move(newnode), max_missed_pings_, our_network_id),
|
TRY_RESULT_PREFIX(N, DhtRemoteNode::create(std::move(newnode), max_missed_pings_, our_network_id),
|
||||||
"failed to add new node: ");
|
"failed to add new node: ");
|
||||||
|
if (set_active) {
|
||||||
for (auto &node : backup_nodes_) {
|
for (auto &node : active_nodes_) {
|
||||||
if (node == nullptr) {
|
if (node == nullptr) {
|
||||||
node = std::move(N);
|
node = std::move(N);
|
||||||
return td::Status::OK();
|
node->receive_ping();
|
||||||
|
return td::Status::OK();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto &node : backup_nodes_) {
|
size_t idx = select_backup_node_to_drop();
|
||||||
CHECK(node);
|
if (idx < backup_nodes_.size()) {
|
||||||
if (node->ready_from() == 0 && node->failed_from() + 60 < td::Time::now_cached()) {
|
backup_nodes_[idx] = std::move(N);
|
||||||
node = std::move(N);
|
|
||||||
return td::Status::OK();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return td::Status::OK();
|
return td::Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t DhtBucket::select_backup_node_to_drop() const {
|
||||||
|
size_t result = backup_nodes_.size();
|
||||||
|
for (size_t idx = 0; idx < backup_nodes_.size(); ++idx) {
|
||||||
|
const auto &node = backup_nodes_[idx];
|
||||||
|
if (node == nullptr) {
|
||||||
|
return idx;
|
||||||
|
}
|
||||||
|
if (node->ready_from() == 0 && node->failed_from() + 60 < td::Time::now_cached()) {
|
||||||
|
if (result == backup_nodes_.size() || node->failed_from() < backup_nodes_[result]->failed_from()) {
|
||||||
|
result = idx;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
void DhtBucket::receive_ping(DhtKeyId id, DhtNode result, td::actor::ActorId<adnl::Adnl> adnl,
|
void DhtBucket::receive_ping(DhtKeyId id, DhtNode result, td::actor::ActorId<adnl::Adnl> adnl,
|
||||||
adnl::AdnlNodeIdShort self_id) {
|
adnl::AdnlNodeIdShort self_id) {
|
||||||
for (auto &node : active_nodes_) {
|
for (auto &node : active_nodes_) {
|
||||||
|
@ -120,17 +147,9 @@ void DhtBucket::receive_ping(DhtKeyId id, DhtNode result, td::actor::ActorId<adn
|
||||||
}
|
}
|
||||||
|
|
||||||
void DhtBucket::demote_node(size_t idx) {
|
void DhtBucket::demote_node(size_t idx) {
|
||||||
for (auto &node : backup_nodes_) {
|
size_t new_idx = select_backup_node_to_drop();
|
||||||
if (node == nullptr) {
|
if (new_idx < backup_nodes_.size()) {
|
||||||
node = std::move(active_nodes_[idx]);
|
backup_nodes_[new_idx] = std::move(active_nodes_[idx]);
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (auto &node : backup_nodes_) {
|
|
||||||
if (node->ready_from() == 0 && node->failed_from() + 60 < td::Time::now_cached()) {
|
|
||||||
node = std::move(active_nodes_[idx]);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
active_nodes_[idx] = nullptr;
|
active_nodes_[idx] = nullptr;
|
||||||
}
|
}
|
||||||
|
@ -151,7 +170,7 @@ void DhtBucket::check(bool client_only, td::actor::ActorId<adnl::Adnl> adnl, td:
|
||||||
size_t have_space = 0;
|
size_t have_space = 0;
|
||||||
for (size_t i = 0; i < active_nodes_.size(); i++) {
|
for (size_t i = 0; i < active_nodes_.size(); i++) {
|
||||||
auto &node = active_nodes_[i];
|
auto &node = active_nodes_[i];
|
||||||
if (node && td::Time::now_cached() - node->last_ping_at() > ping_timeout_) {
|
if (node && td::Time::now_cached() - node->last_ping_at() > node->ping_interval()) {
|
||||||
node->send_ping(client_only, adnl, dht, src);
|
node->send_ping(client_only, adnl, dht, src);
|
||||||
if (node->ready_from() == 0) {
|
if (node->ready_from() == 0) {
|
||||||
demote_node(i);
|
demote_node(i);
|
||||||
|
@ -163,7 +182,7 @@ void DhtBucket::check(bool client_only, td::actor::ActorId<adnl::Adnl> adnl, td:
|
||||||
}
|
}
|
||||||
for (size_t i = 0; i < backup_nodes_.size(); i++) {
|
for (size_t i = 0; i < backup_nodes_.size(); i++) {
|
||||||
auto &node = backup_nodes_[i];
|
auto &node = backup_nodes_[i];
|
||||||
if (node && td::Time::now_cached() - node->last_ping_at() > ping_timeout_) {
|
if (node && td::Time::now_cached() - node->last_ping_at() > node->ping_interval()) {
|
||||||
node->send_ping(client_only, adnl, dht, src);
|
node->send_ping(client_only, adnl, dht, src);
|
||||||
}
|
}
|
||||||
if (node && have_space > 0 && node->is_ready()) {
|
if (node && have_space > 0 && node->is_ready()) {
|
||||||
|
@ -201,6 +220,9 @@ DhtNodesList DhtBucket::export_nodes() const {
|
||||||
list.push_back(node->get_node());
|
list.push_back(node->get_node());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (list.size() > k_) {
|
||||||
|
list.list().resize(k_);
|
||||||
|
}
|
||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,6 @@ class DhtMember;
|
||||||
|
|
||||||
class DhtBucket {
|
class DhtBucket {
|
||||||
private:
|
private:
|
||||||
double ping_timeout_ = 60;
|
|
||||||
td::uint32 max_missed_pings_ = 3;
|
td::uint32 max_missed_pings_ = 3;
|
||||||
|
|
||||||
std::vector<std::unique_ptr<DhtRemoteNode>> active_nodes_;
|
std::vector<std::unique_ptr<DhtRemoteNode>> active_nodes_;
|
||||||
|
@ -43,6 +42,7 @@ class DhtBucket {
|
||||||
// const DhtMember::PrintId &print_id);
|
// const DhtMember::PrintId &print_id);
|
||||||
void demote_node(size_t idx);
|
void demote_node(size_t idx);
|
||||||
void promote_node(size_t idx);
|
void promote_node(size_t idx);
|
||||||
|
size_t select_backup_node_to_drop() const;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
DhtBucket(td::uint32 k) : k_(k) {
|
DhtBucket(td::uint32 k) : k_(k) {
|
||||||
|
@ -51,7 +51,7 @@ class DhtBucket {
|
||||||
}
|
}
|
||||||
td::uint32 active_cnt();
|
td::uint32 active_cnt();
|
||||||
td::Status add_full_node(DhtKeyId id, DhtNode node, td::actor::ActorId<adnl::Adnl> adnl,
|
td::Status add_full_node(DhtKeyId id, DhtNode node, td::actor::ActorId<adnl::Adnl> adnl,
|
||||||
adnl::AdnlNodeIdShort self_id, td::int32 our_network_id);
|
adnl::AdnlNodeIdShort self_id, td::int32 our_network_id, bool set_active = false);
|
||||||
void check(bool client_only, td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> node,
|
void check(bool client_only, td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> node,
|
||||||
adnl::AdnlNodeIdShort src);
|
adnl::AdnlNodeIdShort src);
|
||||||
void receive_ping(DhtKeyId id, DhtNode result, td::actor::ActorId<adnl::Adnl> adnl, adnl::AdnlNodeIdShort self_id);
|
void receive_ping(DhtKeyId id, DhtNode result, td::actor::ActorId<adnl::Adnl> adnl, adnl::AdnlNodeIdShort self_id);
|
||||||
|
|
|
@ -155,7 +155,10 @@ class DhtMemberImpl : public DhtMember {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void add_full_node(DhtKeyId id, DhtNode node) override;
|
void add_full_node(DhtKeyId id, DhtNode node) override {
|
||||||
|
add_full_node_impl(id, std::move(node));
|
||||||
|
}
|
||||||
|
void add_full_node_impl(DhtKeyId id, DhtNode node, bool set_active = false);
|
||||||
|
|
||||||
adnl::AdnlNodeIdShort get_id() const override {
|
adnl::AdnlNodeIdShort get_id() const override {
|
||||||
return id_;
|
return id_;
|
||||||
|
|
|
@ -32,19 +32,39 @@ namespace ton {
|
||||||
|
|
||||||
namespace dht {
|
namespace dht {
|
||||||
|
|
||||||
|
static const double PING_INTERVAL_DEFAULT = 60.0;
|
||||||
|
static const double PING_INTERVAL_MULTIPLIER = 1.1;
|
||||||
|
static const double PING_INTERVAL_MAX = 3600.0 * 4;
|
||||||
|
|
||||||
|
DhtRemoteNode::DhtRemoteNode(DhtNode node, td::uint32 max_missed_pings, td::int32 our_network_id)
|
||||||
|
: node_(std::move(node))
|
||||||
|
, max_missed_pings_(max_missed_pings)
|
||||||
|
, our_network_id_(our_network_id)
|
||||||
|
, ping_interval_(PING_INTERVAL_DEFAULT) {
|
||||||
|
failed_from_ = td::Time::now_cached();
|
||||||
|
id_ = node_.get_key();
|
||||||
|
}
|
||||||
|
|
||||||
td::Status DhtRemoteNode::receive_ping(DhtNode node, td::actor::ActorId<adnl::Adnl> adnl,
|
td::Status DhtRemoteNode::receive_ping(DhtNode node, td::actor::ActorId<adnl::Adnl> adnl,
|
||||||
adnl::AdnlNodeIdShort self_id) {
|
adnl::AdnlNodeIdShort self_id) {
|
||||||
TRY_STATUS(update_value(std::move(node), adnl, self_id));
|
TRY_STATUS(update_value(std::move(node), adnl, self_id));
|
||||||
|
receive_ping();
|
||||||
|
return td::Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
void DhtRemoteNode::receive_ping() {
|
||||||
missed_pings_ = 0;
|
missed_pings_ = 0;
|
||||||
|
ping_interval_ = PING_INTERVAL_DEFAULT;
|
||||||
if (ready_from_ == 0) {
|
if (ready_from_ == 0) {
|
||||||
ready_from_ = td::Time::now_cached();
|
ready_from_ = td::Time::now_cached();
|
||||||
}
|
}
|
||||||
return td::Status::OK();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
td::Status DhtRemoteNode::update_value(DhtNode node, td::actor::ActorId<adnl::Adnl> adnl,
|
td::Status DhtRemoteNode::update_value(DhtNode node, td::actor::ActorId<adnl::Adnl> adnl,
|
||||||
adnl::AdnlNodeIdShort self_id) {
|
adnl::AdnlNodeIdShort self_id) {
|
||||||
CHECK(node.adnl_id() == node_.adnl_id());
|
if (node.adnl_id() != node_.adnl_id()) {
|
||||||
|
return td::Status::Error("Wrong adnl id");
|
||||||
|
}
|
||||||
if (node.version() <= node_.version()) {
|
if (node.version() <= node_.version()) {
|
||||||
return td::Status::OK();
|
return td::Status::OK();
|
||||||
}
|
}
|
||||||
|
@ -58,9 +78,12 @@ td::Status DhtRemoteNode::update_value(DhtNode node, td::actor::ActorId<adnl::Ad
|
||||||
void DhtRemoteNode::send_ping(bool client_only, td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> node,
|
void DhtRemoteNode::send_ping(bool client_only, td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> node,
|
||||||
adnl::AdnlNodeIdShort src) {
|
adnl::AdnlNodeIdShort src) {
|
||||||
missed_pings_++;
|
missed_pings_++;
|
||||||
if (missed_pings_ > max_missed_pings_ && ready_from_ > 0) {
|
if (missed_pings_ > max_missed_pings_) {
|
||||||
ready_from_ = 0;
|
ping_interval_ = std::min(ping_interval_ * PING_INTERVAL_MULTIPLIER, PING_INTERVAL_MAX);
|
||||||
failed_from_ = td::Time::now_cached();
|
if (ready_from_ > 0) {
|
||||||
|
ready_from_ = 0;
|
||||||
|
failed_from_ = td::Time::now_cached();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
last_ping_at_ = td::Time::now_cached();
|
last_ping_at_ = td::Time::now_cached();
|
||||||
|
|
|
@ -45,14 +45,11 @@ class DhtRemoteNode {
|
||||||
double last_ping_at_ = 0;
|
double last_ping_at_ = 0;
|
||||||
double ready_from_ = 0;
|
double ready_from_ = 0;
|
||||||
double failed_from_ = 0;
|
double failed_from_ = 0;
|
||||||
|
double ping_interval_;
|
||||||
td::int32 version_;
|
td::int32 version_;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
DhtRemoteNode(DhtNode node, td::uint32 max_missed_pings, td::int32 our_network_id)
|
DhtRemoteNode(DhtNode node, td::uint32 max_missed_pings, td::int32 our_network_id);
|
||||||
: node_(std::move(node)), max_missed_pings_(max_missed_pings), our_network_id_(our_network_id) {
|
|
||||||
failed_from_ = td::Time::now_cached();
|
|
||||||
id_ = node_.get_key();
|
|
||||||
}
|
|
||||||
static td::Result<std::unique_ptr<DhtRemoteNode>> create(DhtNode node, td::uint32 max_missed_pings,
|
static td::Result<std::unique_ptr<DhtRemoteNode>> create(DhtNode node, td::uint32 max_missed_pings,
|
||||||
td::int32 our_network_id);
|
td::int32 our_network_id);
|
||||||
DhtNode get_node() const {
|
DhtNode get_node() const {
|
||||||
|
@ -78,9 +75,13 @@ class DhtRemoteNode {
|
||||||
double last_ping_at() const {
|
double last_ping_at() const {
|
||||||
return last_ping_at_;
|
return last_ping_at_;
|
||||||
}
|
}
|
||||||
|
double ping_interval() const {
|
||||||
|
return ping_interval_;
|
||||||
|
}
|
||||||
void send_ping(bool client_only, td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> node,
|
void send_ping(bool client_only, td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<DhtMember> node,
|
||||||
adnl::AdnlNodeIdShort src);
|
adnl::AdnlNodeIdShort src);
|
||||||
td::Status receive_ping(DhtNode node, td::actor::ActorId<adnl::Adnl> adnl, adnl::AdnlNodeIdShort self_id);
|
td::Status receive_ping(DhtNode node, td::actor::ActorId<adnl::Adnl> adnl, adnl::AdnlNodeIdShort self_id);
|
||||||
|
void receive_ping();
|
||||||
td::Status update_value(DhtNode node, td::actor::ActorId<adnl::Adnl> adnl, adnl::AdnlNodeIdShort self_id);
|
td::Status update_value(DhtNode node, td::actor::ActorId<adnl::Adnl> adnl, adnl::AdnlNodeIdShort self_id);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
14
dht/dht.cpp
14
dht/dht.cpp
|
@ -111,7 +111,7 @@ void DhtMemberImpl::start_up() {
|
||||||
auto nodes = std::move(V.move_as_ok()->nodes_);
|
auto nodes = std::move(V.move_as_ok()->nodes_);
|
||||||
auto s = nodes->nodes_.size();
|
auto s = nodes->nodes_.size();
|
||||||
DhtNodesList list{std::move(nodes), network_id_};
|
DhtNodesList list{std::move(nodes), network_id_};
|
||||||
CHECK(list.size() == s);
|
CHECK(list.size() <= s); // Some nodes can be dropped due to a wrong network id
|
||||||
auto &B = buckets_[bit];
|
auto &B = buckets_[bit];
|
||||||
for (auto &node : list.list()) {
|
for (auto &node : list.list()) {
|
||||||
auto key = node.get_key();
|
auto key = node.get_key();
|
||||||
|
@ -366,8 +366,12 @@ void DhtMemberImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice dat
|
||||||
auto N = DhtNode::create(std::move(R.move_as_ok()->node_), network_id_);
|
auto N = DhtNode::create(std::move(R.move_as_ok()->node_), network_id_);
|
||||||
if (N.is_ok()) {
|
if (N.is_ok()) {
|
||||||
auto node = N.move_as_ok();
|
auto node = N.move_as_ok();
|
||||||
auto key = node.get_key();
|
if (node.adnl_id().compute_short_id() == src) {
|
||||||
add_full_node(key, std::move(node));
|
auto key = node.get_key();
|
||||||
|
add_full_node_impl(key, std::move(node), true);
|
||||||
|
} else {
|
||||||
|
VLOG(DHT_WARNING) << this << ": dropping bad node: unexpected adnl id";
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
VLOG(DHT_WARNING) << this << ": dropping bad node " << N.move_as_error();
|
VLOG(DHT_WARNING) << this << ": dropping bad node " << N.move_as_error();
|
||||||
}
|
}
|
||||||
|
@ -394,7 +398,7 @@ void DhtMemberImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice dat
|
||||||
ton_api::downcast_call(*Q, [&](auto &object) { this->process_query(src, object, std::move(promise)); });
|
ton_api::downcast_call(*Q, [&](auto &object) { this->process_query(src, object, std::move(promise)); });
|
||||||
}
|
}
|
||||||
|
|
||||||
void DhtMemberImpl::add_full_node(DhtKeyId key, DhtNode node) {
|
void DhtMemberImpl::add_full_node_impl(DhtKeyId key, DhtNode node, bool set_active) {
|
||||||
VLOG(DHT_EXTRA_DEBUG) << this << ": adding full node " << key;
|
VLOG(DHT_EXTRA_DEBUG) << this << ": adding full node " << key;
|
||||||
|
|
||||||
auto eid = key ^ key_;
|
auto eid = key ^ key_;
|
||||||
|
@ -406,7 +410,7 @@ void DhtMemberImpl::add_full_node(DhtKeyId key, DhtNode node) {
|
||||||
#endif
|
#endif
|
||||||
if (bit < 256) {
|
if (bit < 256) {
|
||||||
CHECK(key.get_bit(bit) != key_.get_bit(bit));
|
CHECK(key.get_bit(bit) != key_.get_bit(bit));
|
||||||
buckets_[bit].add_full_node(key, std::move(node), adnl_, id_, network_id_);
|
buckets_[bit].add_full_node(key, std::move(node), adnl_, id_, network_id_, set_active);
|
||||||
} else {
|
} else {
|
||||||
CHECK(key == key_);
|
CHECK(key == key_);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue