mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
Recent updates in storage (#667)
* Fix error handling in Torrent.cpp, improve choosing peers for upload * Various improvements in storage daemon "get-pieces-info" Store "added at" Improve calculating up/down speed Improve TL protocol for future compatibility Remove empty directories on "--remove-files" Better windows support Debug logs in PeerActor More restrictions on TorrentInfo Bugfixes * Global speed limits for download and upload +bugfix * Reset download/upload speed on changing settings or completion * Exclude some system files in TorrentCreator
This commit is contained in:
parent
e3af63e6c0
commit
bb21f732fd
21 changed files with 974 additions and 213 deletions
|
@ -24,6 +24,7 @@
|
|||
|
||||
#include "td/utils/overloaded.h"
|
||||
#include "td/utils/Random.h"
|
||||
#include "vm/boc.h"
|
||||
|
||||
namespace ton {
|
||||
|
||||
|
@ -44,7 +45,7 @@ PeerActor::PeerActor(td::unique_ptr<Callback> callback, std::shared_ptr<PeerStat
|
|||
}
|
||||
|
||||
template <class T, class... ArgsT>
|
||||
td::uint64 PeerActor::create_and_send_query(ArgsT &&... args) {
|
||||
td::uint64 PeerActor::create_and_send_query(ArgsT &&...args) {
|
||||
return send_query(ton::create_serialize_tl_object<T>(std::forward<ArgsT>(args)...));
|
||||
}
|
||||
|
||||
|
@ -86,7 +87,9 @@ void PeerActor::on_ping_result(td::Result<td::BufferSlice> r_answer) {
|
|||
|
||||
void PeerActor::on_pong() {
|
||||
wait_pong_till_ = td::Timestamp::in(10);
|
||||
state_->peer_online_ = true;
|
||||
if (!state_->peer_online_.exchange(true)) {
|
||||
state_->peer_online_set_ = true;
|
||||
}
|
||||
notify_node();
|
||||
}
|
||||
|
||||
|
@ -115,6 +118,11 @@ void PeerActor::on_get_piece_result(PartId piece_id, td::Result<td::BufferSlice>
|
|||
res.proof = std::move(piece->proof_);
|
||||
return std::move(res);
|
||||
}();
|
||||
if (res.is_error()) {
|
||||
LOG(DEBUG) << "getPiece " << piece_id << "query: " << res.error();
|
||||
} else {
|
||||
LOG(DEBUG) << "getPiece " << piece_id << "query: OK";
|
||||
}
|
||||
state_->node_queries_results_.add_element(std::make_pair(piece_id, std::move(res)));
|
||||
notify_node();
|
||||
}
|
||||
|
@ -130,13 +138,16 @@ void PeerActor::on_get_info_result(td::Result<td::BufferSlice> r_answer) {
|
|||
next_get_info_at_ = td::Timestamp::in(5.0);
|
||||
alarm_timestamp().relax(next_get_info_at_);
|
||||
if (r_answer.is_error()) {
|
||||
LOG(DEBUG) << "getTorrentInfo query: " << r_answer.move_as_error();
|
||||
return;
|
||||
}
|
||||
auto R = fetch_tl_object<ton::ton_api::storage_torrentInfo>(r_answer.move_as_ok(), true);
|
||||
if (R.is_error()) {
|
||||
LOG(DEBUG) << "getTorrentInfo query: " << R.move_as_error();
|
||||
return;
|
||||
}
|
||||
td::BufferSlice data = std::move(R.ok_ref()->data_);
|
||||
LOG(DEBUG) << "getTorrentInfo query: got result (" << data.size() << " bytes)";
|
||||
if (!data.empty() && !state_->torrent_info_ready_) {
|
||||
state_->torrent_info_response_callback_(std::move(data));
|
||||
}
|
||||
|
@ -245,6 +256,8 @@ void PeerActor::loop_update_init() {
|
|||
update_state_query_.state = node_state;
|
||||
update_state_query_.query_id = 0;
|
||||
|
||||
LOG(DEBUG) << "Sending updateInit query (" << update_state_query_.state.want_download << ", "
|
||||
<< update_state_query_.state.will_upload << ", offset=" << peer_init_offset_ * 8 << ")";
|
||||
update_query_id_ = send_query(std::move(query));
|
||||
}
|
||||
|
||||
|
@ -265,13 +278,15 @@ void PeerActor::loop_update_state() {
|
|||
|
||||
auto query = create_update_query(
|
||||
ton::create_tl_object<ton::ton_api::storage_updateState>(to_ton_api(update_state_query_.state)));
|
||||
LOG(DEBUG) << "Sending updateState query (" << update_state_query_.state.want_download << ", "
|
||||
<< update_state_query_.state.will_upload << ")";
|
||||
update_state_query_.query_id = send_query(std::move(query));
|
||||
}
|
||||
|
||||
void PeerActor::update_have_pieces() {
|
||||
auto node_ready_parts = state_->node_ready_parts_.read();
|
||||
for (auto piece_id : node_ready_parts) {
|
||||
if (piece_id < peer_init_offset_ + UPDATE_INIT_BLOCK_SIZE) {
|
||||
if (piece_id < (peer_init_offset_ + UPDATE_INIT_BLOCK_SIZE) * 8 && !have_pieces_.get(piece_id)) {
|
||||
have_pieces_list_.push_back(piece_id);
|
||||
}
|
||||
have_pieces_.set_one(piece_id);
|
||||
|
@ -291,23 +306,49 @@ void PeerActor::loop_update_pieces() {
|
|||
have_pieces_list_.erase(have_pieces_list_.end() - count, have_pieces_list_.end());
|
||||
auto query = create_update_query(ton::create_tl_object<ton::ton_api::storage_updateHavePieces>(
|
||||
td::transform(sent_have_pieces_list_, [](auto x) { return static_cast<td::int32>(x); })));
|
||||
LOG(DEBUG) << "Sending updateHavePieces query (" << sent_have_pieces_list_.size() << " pieces)";
|
||||
update_query_id_ = send_query(std::move(query));
|
||||
}
|
||||
}
|
||||
|
||||
void PeerActor::loop_get_torrent_info() {
|
||||
if (get_info_query_id_ || state_->torrent_info_ready_) {
|
||||
if (state_->torrent_info_ready_) {
|
||||
if (!torrent_info_) {
|
||||
torrent_info_ = state_->torrent_info_;
|
||||
for (const auto &u : pending_update_peer_parts_) {
|
||||
process_update_peer_parts(u);
|
||||
}
|
||||
pending_update_peer_parts_.clear();
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (get_info_query_id_) {
|
||||
return;
|
||||
}
|
||||
if (next_get_info_at_ && !next_get_info_at_.is_in_past()) {
|
||||
return;
|
||||
}
|
||||
LOG(DEBUG) << "Sending getTorrentInfo query";
|
||||
get_info_query_id_ = create_and_send_query<ton::ton_api::storage_getTorrentInfo>();
|
||||
}
|
||||
|
||||
void PeerActor::loop_node_get_piece() {
|
||||
for (auto part : state_->node_queries_.read()) {
|
||||
node_get_piece_.emplace(part, NodePieceQuery{});
|
||||
if (state_->speed_limiters_.download.empty()) {
|
||||
node_get_piece_.emplace(part, NodePieceQuery{});
|
||||
} else {
|
||||
if (!torrent_info_) {
|
||||
CHECK(state_->torrent_info_ready_);
|
||||
loop_get_torrent_info();
|
||||
}
|
||||
auto piece_size =
|
||||
std::min<td::uint64>(torrent_info_->piece_size, torrent_info_->file_size - part * torrent_info_->piece_size);
|
||||
td::actor::send_closure(state_->speed_limiters_.download, &SpeedLimiter::enqueue, (double)piece_size,
|
||||
td::Timestamp::in(3.0), [part, SelfId = actor_id(this)](td::Result<td::Unit> R) {
|
||||
td::actor::send_closure(SelfId, &PeerActor::node_get_piece_query_ready, part,
|
||||
std::move(R));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
for (auto &query_it : node_get_piece_) {
|
||||
|
@ -315,11 +356,21 @@ void PeerActor::loop_node_get_piece() {
|
|||
continue;
|
||||
}
|
||||
|
||||
LOG(DEBUG) << "Sending getPiece " << query_it.first << " query";
|
||||
query_it.second.query_id =
|
||||
create_and_send_query<ton::ton_api::storage_getPiece>(static_cast<td::int32>(query_it.first));
|
||||
}
|
||||
}
|
||||
|
||||
void PeerActor::node_get_piece_query_ready(PartId part, td::Result<td::Unit> R) {
|
||||
if (R.is_error()) {
|
||||
on_get_piece_result(part, R.move_as_error());
|
||||
} else {
|
||||
node_get_piece_.emplace(part, NodePieceQuery{});
|
||||
}
|
||||
schedule_loop();
|
||||
}
|
||||
|
||||
void PeerActor::loop_peer_get_piece() {
|
||||
// process answers
|
||||
for (auto &p : state_->peer_queries_results_.read()) {
|
||||
|
@ -328,9 +379,26 @@ void PeerActor::loop_peer_get_piece() {
|
|||
if (promise_it == peer_get_piece_.end()) {
|
||||
continue;
|
||||
}
|
||||
promise_it->second.promise.set_result(p.second.move_map([](PeerState::Part part) {
|
||||
return ton::create_serialize_tl_object<ton::ton_api::storage_piece>(std::move(part.proof), std::move(part.data));
|
||||
}));
|
||||
td::Promise<PeerState::Part> promise =
|
||||
[i = p.first, promise = std::move(promise_it->second.promise)](td::Result<PeerState::Part> R) mutable {
|
||||
LOG(DEBUG) << "Responding to getPiece " << i << ": " << (R.is_ok() ? "OK" : R.error().to_string());
|
||||
promise.set_result(R.move_map([](PeerState::Part part) {
|
||||
return create_serialize_tl_object<ton_api::storage_piece>(std::move(part.proof), std::move(part.data));
|
||||
}));
|
||||
};
|
||||
if (p.second.is_error()) {
|
||||
promise.set_error(p.second.move_as_error());
|
||||
} else {
|
||||
auto part = p.second.move_as_ok();
|
||||
auto size = (double)part.data.size();
|
||||
td::Promise<td::Unit> P = promise.wrap([part = std::move(part)](td::Unit) mutable { return std::move(part); });
|
||||
if (state_->speed_limiters_.upload.empty()) {
|
||||
P.set_result(td::Unit());
|
||||
} else {
|
||||
td::actor::send_closure(state_->speed_limiters_.upload, &SpeedLimiter::enqueue, size, td::Timestamp::in(3.0),
|
||||
std::move(P));
|
||||
}
|
||||
}
|
||||
peer_get_piece_.erase(promise_it);
|
||||
notify_node();
|
||||
}
|
||||
|
@ -373,18 +441,8 @@ void PeerActor::execute_add_update(ton::ton_api::storage_addUpdate &add_update,
|
|||
promise.set_error(td::Status::Error(404, "INVALID_SESSION"));
|
||||
return;
|
||||
}
|
||||
|
||||
promise.set_value(ton::create_serialize_tl_object<ton::ton_api::storage_ok>());
|
||||
|
||||
std::vector<td::uint32> new_peer_ready_parts;
|
||||
auto add_piece = [&](PartId id) {
|
||||
if (!peer_have_pieces_.get(id)) {
|
||||
peer_have_pieces_.set_one(id);
|
||||
new_peer_ready_parts.push_back(id);
|
||||
notify_node();
|
||||
}
|
||||
};
|
||||
|
||||
auto seqno = static_cast<td::uint32>(add_update.seqno_);
|
||||
auto update_peer_state = [&](PeerState::State peer_state) {
|
||||
if (peer_seqno_ >= seqno) {
|
||||
|
@ -393,22 +451,48 @@ void PeerActor::execute_add_update(ton::ton_api::storage_addUpdate &add_update,
|
|||
if (state_->peer_state_ready_ && state_->peer_state_.load() == peer_state) {
|
||||
return;
|
||||
}
|
||||
LOG(DEBUG) << "Processing update peer state query (" << peer_state.want_download << ", " << peer_state.will_upload
|
||||
<< ")";
|
||||
peer_seqno_ = seqno;
|
||||
state_->peer_state_.exchange(peer_state);
|
||||
state_->peer_state_ready_ = true;
|
||||
notify_node();
|
||||
};
|
||||
downcast_call(
|
||||
*add_update.update_,
|
||||
td::overloaded(
|
||||
[&](const ton::ton_api::storage_updateHavePieces &have_pieces) {},
|
||||
[&](const ton::ton_api::storage_updateState &state) { update_peer_state(from_ton_api(*state.state_)); },
|
||||
[&](const ton::ton_api::storage_updateInit &init) { update_peer_state(from_ton_api(*init.state_)); }));
|
||||
if (torrent_info_) {
|
||||
process_update_peer_parts(add_update.update_);
|
||||
} else {
|
||||
pending_update_peer_parts_.push_back(std::move(add_update.update_));
|
||||
}
|
||||
}
|
||||
|
||||
downcast_call(*add_update.update_,
|
||||
void PeerActor::process_update_peer_parts(const tl_object_ptr<ton_api::storage_Update> &update) {
|
||||
CHECK(torrent_info_);
|
||||
td::uint64 pieces_count = torrent_info_->pieces_count();
|
||||
std::vector<td::uint32> new_peer_ready_parts;
|
||||
auto add_piece = [&](PartId id) {
|
||||
if (id < pieces_count && !peer_have_pieces_.get(id)) {
|
||||
peer_have_pieces_.set_one(id);
|
||||
new_peer_ready_parts.push_back(id);
|
||||
notify_node();
|
||||
}
|
||||
};
|
||||
downcast_call(*update,
|
||||
td::overloaded(
|
||||
[&](ton::ton_api::storage_updateHavePieces &have_pieces) {
|
||||
[&](const ton::ton_api::storage_updateHavePieces &have_pieces) {
|
||||
LOG(DEBUG) << "Processing updateHavePieces query (" << have_pieces.piece_id_ << " pieces)";
|
||||
for (auto id : have_pieces.piece_id_) {
|
||||
add_piece(id);
|
||||
}
|
||||
},
|
||||
[&](ton::ton_api::storage_updateState &state) { update_peer_state(from_ton_api(*state.state_)); },
|
||||
[&](ton::ton_api::storage_updateInit &init) {
|
||||
update_peer_state(from_ton_api(*init.state_));
|
||||
[&](const ton::ton_api::storage_updateState &state) {},
|
||||
[&](const ton::ton_api::storage_updateInit &init) {
|
||||
LOG(DEBUG) << "Processing updateInit query (offset=" << init.have_pieces_offset_ * 8 << ")";
|
||||
td::Bitset new_bitset;
|
||||
new_bitset.set_raw(init.have_pieces_.as_slice().str());
|
||||
size_t offset = init.have_pieces_offset_ * 8;
|
||||
|
@ -428,7 +512,13 @@ void PeerActor::execute_get_piece(ton::ton_api::storage_getPiece &get_piece, td:
|
|||
|
||||
void PeerActor::execute_get_torrent_info(td::Promise<td::BufferSlice> promise) {
|
||||
td::BufferSlice result = create_serialize_tl_object<ton_api::storage_torrentInfo>(
|
||||
state_->torrent_info_ready_ ? state_->torrent_info_str_->clone() : td::BufferSlice());
|
||||
state_->torrent_info_ready_ ? vm::std_boc_serialize(state_->torrent_info_->as_cell()).move_as_ok()
|
||||
: td::BufferSlice());
|
||||
if (state_->torrent_info_ready_) {
|
||||
LOG(DEBUG) << "Responding to getTorrentInfo: " << result.size() << " bytes";
|
||||
} else {
|
||||
LOG(DEBUG) << "Responding to getTorrentInfo: no info";
|
||||
}
|
||||
promise.set_result(std::move(result));
|
||||
}
|
||||
} // namespace ton
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue