1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Improve DHT store/load, pinging overlay peers (#840)

* Improve DHT store/load, pinging overlay peers

* Fix speed limits in storage

* Use keyStoreTypeDirectory in rldp-http-proxy and storage-daemon

Mainly for caching synced block in tonlib.

---------

Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
EmelyanenkoK 2023-12-28 09:43:10 +03:00 committed by GitHub
parent c8918f0c02
commit 550c28d7db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 162 additions and 89 deletions

View file

@ -34,24 +34,33 @@ namespace ton {
namespace dht {
void DhtQuery::send_queries() {
while (pending_queries_.size() > k_ * 2) {
pending_queries_.erase(--pending_queries_.end());
}
VLOG(DHT_EXTRA_DEBUG) << this << ": sending new queries. active=" << active_queries_ << " max_active=" << a_;
while (pending_ids_.size() > 0 && active_queries_ < a_) {
while (pending_queries_.size() > 0 && active_queries_ < a_) {
auto id_xor = *pending_queries_.begin();
if (result_list_.size() == k_ && *result_list_.rbegin() < id_xor) {
break;
}
active_queries_++;
auto id_xor = *pending_ids_.begin();
auto id = id_xor ^ key_;
VLOG(DHT_EXTRA_DEBUG) << this << ": sending " << get_name() << " query to " << id;
pending_ids_.erase(id_xor);
pending_queries_.erase(id_xor);
auto it = list_.find(id_xor);
CHECK(it != list_.end());
td::actor::send_closure(adnl_, &adnl::Adnl::add_peer, get_src(), it->second.adnl_id(), it->second.addr_list());
auto it = nodes_.find(id_xor);
CHECK(it != nodes_.end());
td::actor::send_closure(adnl_, &adnl::Adnl::add_peer, get_src(), it->second.node.adnl_id(),
it->second.node.addr_list());
send_one_query(id.to_adnl());
}
if (active_queries_ == 0) {
CHECK(pending_ids_.size() == 0);
pending_queries_.clear();
DhtNodesList list;
for (auto &node : list_) {
list.push_back(std::move(node.second));
for (auto id_xor : result_list_) {
auto it = nodes_.find(id_xor);
CHECK(it != nodes_.end());
list.push_back(it->second.node.clone());
}
CHECK(list.size() <= k_);
VLOG(DHT_EXTRA_DEBUG) << this << ": finalizing " << get_name() << " query. List size=" << list.size();
@ -65,30 +74,32 @@ void DhtQuery::add_nodes(DhtNodesList list) {
for (auto &node : list.list()) {
auto id = node.get_key();
auto id_xor = key_ ^ id;
if (list_.find(id_xor) != list_.end()) {
if (nodes_.find(id_xor) != nodes_.end()) {
continue;
}
td::actor::send_closure(node_, &DhtMember::add_full_node, id, node.clone());
VLOG(DHT_EXTRA_DEBUG) << this << ": " << get_name() << " query: adding " << id << " key";
td::actor::send_closure(node_, &DhtMember::add_full_node, id, node.clone(), false);
nodes_[id_xor].node = std::move(node);
pending_queries_.insert(id_xor);
}
}
DhtKeyId last_id_xor;
if (list_.size() > 0) {
last_id_xor = list_.rbegin()->first;
void DhtQuery::finish_query(adnl::AdnlNodeIdShort id, bool success) {
active_queries_--;
CHECK(active_queries_ <= k_);
auto id_xor = key_ ^ DhtKeyId(id);
if (success) {
result_list_.insert(id_xor);
if (result_list_.size() > k_) {
result_list_.erase(--result_list_.end());
}
if (list_.size() < k_ || id_xor < last_id_xor) {
list_[id_xor] = std::move(node);
pending_ids_.insert(id_xor);
if (list_.size() > k_) {
CHECK(id_xor != last_id_xor);
VLOG(DHT_EXTRA_DEBUG) << this << ": " << get_name() << " query: replacing " << (last_id_xor ^ key_)
<< " key with " << id;
pending_ids_.erase(last_id_xor);
list_.erase(last_id_xor);
} else {
VLOG(DHT_EXTRA_DEBUG) << this << ": " << get_name() << " query: adding " << id << " key";
}
} else {
NodeInfo &info = nodes_[id_xor];
if (++info.failed_attempts < MAX_ATTEMPTS) {
pending_queries_.insert(id_xor);
}
}
send_queries();
}
void DhtQueryFindNodes::send_one_query(adnl::AdnlNodeIdShort id) {
@ -111,7 +122,7 @@ void DhtQueryFindNodes::send_one_query(adnl::AdnlNodeIdShort id) {
void DhtQueryFindNodes::on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeIdShort dst) {
if (R.is_error()) {
VLOG(DHT_INFO) << this << ": failed find nodes query " << get_src() << "->" << dst << ": " << R.move_as_error();
finish_query();
finish_query(dst, false);
return;
}
@ -122,7 +133,7 @@ void DhtQueryFindNodes::on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeI
} else {
add_nodes(DhtNodesList{Res.move_as_ok(), our_network_id()});
}
finish_query();
finish_query(dst);
}
void DhtQueryFindNodes::finish(DhtNodesList list) {
@ -166,14 +177,14 @@ void DhtQueryFindValue::send_one_query_nodes(adnl::AdnlNodeIdShort id) {
void DhtQueryFindValue::on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeIdShort dst) {
if (R.is_error()) {
VLOG(DHT_INFO) << this << ": failed find value query " << get_src() << "->" << dst << ": " << R.move_as_error();
finish_query();
finish_query(dst, false);
return;
}
auto Res = fetch_tl_object<ton_api::dht_ValueResult>(R.move_as_ok(), true);
if (Res.is_error()) {
VLOG(DHT_WARNING) << this << ": dropping incorrect answer on dht.findValue query from " << dst << ": "
<< Res.move_as_error();
finish_query();
finish_query(dst, false);
return;
}
@ -210,26 +221,26 @@ void DhtQueryFindValue::on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeI
} else if (send_get_nodes) {
send_one_query_nodes(dst);
} else {
finish_query();
finish_query(dst);
}
}
void DhtQueryFindValue::on_result_nodes(td::Result<td::BufferSlice> R, adnl::AdnlNodeIdShort dst) {
if (R.is_error()) {
VLOG(DHT_INFO) << this << ": failed find nodes query " << get_src() << "->" << dst << ": " << R.move_as_error();
finish_query();
finish_query(dst, false);
return;
}
auto Res = fetch_tl_object<ton_api::dht_nodes>(R.move_as_ok(), true);
if (Res.is_error()) {
VLOG(DHT_WARNING) << this << ": dropping incorrect answer on dht.findNodes query from " << dst << ": "
<< Res.move_as_error();
finish_query();
finish_query(dst, false);
return;
}
auto r = Res.move_as_ok();
add_nodes(DhtNodesList{create_tl_object<ton_api::dht_nodes>(std::move(r->nodes_)), our_network_id()});
finish_query();
finish_query(dst);
}
void DhtQueryFindValue::finish(DhtNodesList list) {
@ -422,14 +433,14 @@ void DhtQueryRequestReversePing::send_one_query(adnl::AdnlNodeIdShort id) {
void DhtQueryRequestReversePing::on_result(td::Result<td::BufferSlice> R, adnl::AdnlNodeIdShort dst) {
if (R.is_error()) {
VLOG(DHT_INFO) << this << ": failed reverse ping query " << get_src() << "->" << dst << ": " << R.move_as_error();
finish_query();
finish_query(dst, false);
return;
}
auto Res = fetch_tl_object<ton_api::dht_ReversePingResult>(R.move_as_ok(), true);
if (Res.is_error()) {
VLOG(DHT_WARNING) << this << ": dropping incorrect answer on dht.requestReversePing query from " << dst << ": "
<< Res.move_as_error();
finish_query();
finish_query(dst, false);
return;
}
@ -441,7 +452,7 @@ void DhtQueryRequestReversePing::on_result(td::Result<td::BufferSlice> R, adnl::
},
[&](ton_api::dht_clientNotFound &v) {
add_nodes(DhtNodesList{std::move(v.nodes_), our_network_id()});
finish_query();
finish_query(dst);
}));
}