From bb21f732fd65411ef949dcda9f7c56962216742a Mon Sep 17 00:00:00 2001 From: SpyCheese Date: Fri, 7 Apr 2023 12:50:07 +0000 Subject: [PATCH] Recent updates in storage (#667) * Fix error handling in Torrent.cpp, improve choosing peers for upload * Various improvements in storage daemon "get-pieces-info" Store "added at" Improve calculating up/down speed Improve TL protocol for future compatibility Remove empty directories on "--remove-files" Better windows support Debug logs in PeerActor More restrictions on TorrentInfo Bugfixes * Global speed limits for download and upload +bugfix * Reset download/upload speed on changing settings or completion * Exclude some system files in TorrentCreator --- storage/CMakeLists.txt | 5 +- storage/LoadSpeed.cpp | 6 +- storage/LoadSpeed.h | 1 + storage/NodeActor.cpp | 109 ++-- storage/NodeActor.h | 20 +- storage/PeerActor.cpp | 138 ++++- storage/PeerActor.h | 5 + storage/PeerState.h | 6 +- storage/SpeedLimiter.cpp | 82 +++ storage/SpeedLimiter.h | 52 ++ storage/Torrent.cpp | 12 +- storage/TorrentCreator.cpp | 25 +- storage/TorrentInfo.cpp | 6 + storage/storage-cli.cpp | 6 +- storage/storage-daemon/StorageManager.cpp | 106 +++- storage/storage-daemon/StorageManager.h | 14 + storage/storage-daemon/storage-daemon-cli.cpp | 476 +++++++++++++----- storage/storage-daemon/storage-daemon.cpp | 70 +++ storage/test/storage.cpp | 4 +- tl/generate/scheme/ton_api.tl | 44 +- tl/generate/scheme/ton_api.tlo | Bin 84224 -> 86540 bytes 21 files changed, 974 insertions(+), 213 deletions(-) create mode 100644 storage/SpeedLimiter.cpp create mode 100644 storage/SpeedLimiter.h diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index 840dfd46..a5f36ff2 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -8,9 +8,11 @@ endif() set(STORAGE_SOURCE LoadSpeed.cpp MerkleTree.cpp + MicrochunkTree.cpp NodeActor.cpp PeerActor.cpp PeerState.cpp + SpeedLimiter.cpp Torrent.cpp TorrentCreator.cpp TorrentHeader.cpp @@ -25,13 +27,14 @@ set(STORAGE_SOURCE PartsHelper.h PeerActor.h PeerState.h + SpeedLimiter.h Torrent.h TorrentCreator.h TorrentHeader.h TorrentInfo.h TorrentMeta.h PeerManager.h - MicrochunkTree.h MicrochunkTree.cpp) + MicrochunkTree.h) set(STORAGE_CLI_SOURCE storage-cli.cpp ) diff --git a/storage/LoadSpeed.cpp b/storage/LoadSpeed.cpp index b47f4120..46aac2ff 100644 --- a/storage/LoadSpeed.cpp +++ b/storage/LoadSpeed.cpp @@ -31,13 +31,17 @@ double LoadSpeed::speed(td::Timestamp now) const { update(now); return (double)total_size_ / duration(now); } +void LoadSpeed::reset() { + events_ = {}; + total_size_ = 0; +} td::StringBuilder &operator<<(td::StringBuilder &sb, const LoadSpeed &speed) { return sb << td::format::as_size(static_cast(speed.speed())) << "/s"; } void LoadSpeed::update(td::Timestamp now) const { - while (duration(now) > 30) { + while (duration(now) > 10) { total_size_ -= events_.front().size; events_.pop(); } diff --git a/storage/LoadSpeed.h b/storage/LoadSpeed.h index 92d947f7..37115b88 100644 --- a/storage/LoadSpeed.h +++ b/storage/LoadSpeed.h @@ -28,6 +28,7 @@ class LoadSpeed { public: void add(td::uint64 size, td::Timestamp now = td::Timestamp::now()); double speed(td::Timestamp now = td::Timestamp::now()) const; + void reset(); friend td::StringBuilder &operator<<(td::StringBuilder &sb, const LoadSpeed &speed); private: diff --git a/storage/NodeActor.cpp b/storage/NodeActor.cpp index 041d74cd..e24fffff 100644 --- a/storage/NodeActor.cpp +++ b/storage/NodeActor.cpp @@ -25,27 +25,14 @@ #include "td/utils/Enumerator.h" #include "td/utils/tests.h" #include "td/utils/overloaded.h" -#include "tl-utils/common-utils.hpp" #include "tl-utils/tl-utils.hpp" #include "auto/tl/ton_api.hpp" #include "td/actor/MultiPromise.h" namespace ton { NodeActor::NodeActor(PeerId self_id, Torrent torrent, td::unique_ptr callback, - td::unique_ptr node_callback, std::shared_ptr db, bool should_download, - bool should_upload) - : self_id_(self_id) - , torrent_(std::move(torrent)) - , callback_(std::move(callback)) - , node_callback_(std::move(node_callback)) - , db_(std::move(db)) - , should_download_(should_download) - , should_upload_(should_upload) { -} - -NodeActor::NodeActor(PeerId self_id, ton::Torrent torrent, td::unique_ptr callback, - td::unique_ptr node_callback, std::shared_ptr db, bool should_download, - bool should_upload, DbInitialData db_initial_data) + td::unique_ptr node_callback, std::shared_ptr db, + SpeedLimiters speed_limiters, bool should_download, bool should_upload) : self_id_(self_id) , torrent_(std::move(torrent)) , callback_(std::move(callback)) @@ -53,6 +40,23 @@ NodeActor::NodeActor(PeerId self_id, ton::Torrent torrent, td::unique_ptr callback, + td::unique_ptr node_callback, std::shared_ptr db, + SpeedLimiters speed_limiters, bool should_download, bool should_upload, + DbInitialData db_initial_data) + : self_id_(self_id) + , torrent_(std::move(torrent)) + , callback_(std::move(callback)) + , node_callback_(std::move(node_callback)) + , db_(std::move(db)) + , should_download_(should_download) + , should_upload_(should_upload) + , added_at_(db_initial_data.added_at) + , speed_limiters_(std::move(speed_limiters)) , pending_set_file_priority_(std::move(db_initial_data.priorities)) , pieces_in_db_(std::move(db_initial_data.pieces_in_db)) { } @@ -100,12 +104,12 @@ void NodeActor::init_torrent() { } } - torrent_info_str_ = - std::make_shared(vm::std_boc_serialize(torrent_.get_info().as_cell()).move_as_ok()); + torrent_info_shared_ = std::make_shared(torrent_.get_info()); for (auto &p : peers_) { auto &state = p.second.state; - state->torrent_info_str_ = torrent_info_str_; + state->torrent_info_ = torrent_info_shared_; CHECK(!state->torrent_info_ready_.exchange(true)); + state->notify_peer(); } LOG(INFO) << "Inited torrent info for " << torrent_.get_hash().to_hex() << ": size=" << torrent_.get_info().file_size << ", pieces=" << torrent_.get_info().pieces_count(); @@ -185,7 +189,7 @@ void NodeActor::loop_will_upload() { auto &state = it.second.state; bool needed = false; if (state->peer_state_ready_) { - needed = state->peer_state_.load().want_download; + needed = state->peer_state_.load().want_download && state->peer_online_; } peers.emplace_back(!needed, !state->node_state_.load().want_download, -it.second.download_speed.speed(), it.first); } @@ -247,6 +251,10 @@ void NodeActor::loop() { } wait_for_completion_.clear(); is_completed_ = true; + download_speed_.reset(); + for (auto &peer : peers_) { + peer.second.download_speed.reset(); + } callback_->on_completed(); } } @@ -398,6 +406,12 @@ void NodeActor::set_should_download(bool should_download) { return; } should_download_ = should_download; + if (!should_download_) { + download_speed_.reset(); + for (auto &peer : peers_) { + peer.second.download_speed.reset(); + } + } db_store_torrent(); yield(); } @@ -407,7 +421,14 @@ void NodeActor::set_should_upload(bool should_upload) { return; } should_upload_ = should_upload; + if (!should_upload_) { + upload_speed_.reset(); + for (auto &peer : peers_) { + peer.second.upload_speed.reset(); + } + } db_store_torrent(); + will_upload_at_ = td::Timestamp::now(); yield(); } @@ -478,6 +499,7 @@ void NodeActor::loop_start_stop_peers() { if (peer.actor.empty()) { auto &state = peer.state = std::make_shared(peer.notifier.get()); + state->speed_limiters_ = speed_limiters_; if (torrent_.inited_info()) { std::vector node_ready_parts; for (td::uint32 i = 0; i < parts_.parts.size(); i++) { @@ -486,7 +508,7 @@ void NodeActor::loop_start_stop_peers() { } } state->node_ready_parts_.add_elements(std::move(node_ready_parts)); - state->torrent_info_str_ = torrent_info_str_; + state->torrent_info_ = torrent_info_shared_; state->torrent_info_ready_ = true; } else { state->torrent_info_response_callback_ = [SelfId = actor_id(this)](td::BufferSlice data) { @@ -575,6 +597,11 @@ void NodeActor::loop_peer(const PeerId &peer_id, Peer &peer) { for (auto part_id : state->peer_ready_parts_.read()) { parts_helper_.on_peer_part_ready(peer.peer_token, part_id); } + if (state->peer_online_set_.load()) { + state->peer_online_set_ = false; + will_upload_at_ = td::Timestamp::now(); + loop_will_upload(); + } // Answer queries from peer bool should_notify_peer = false; @@ -600,7 +627,7 @@ void NodeActor::loop_peer(const PeerId &peer_id, Peer &peer) { TRY_RESULT(proof_serialized, vm::std_boc_serialize(std::move(proof))); res.proof = std::move(proof_serialized); res.data = td::BufferSlice(std::move(data)); - td::uint64 size = res.data.size() + res.proof.size(); + td::uint64 size = res.data.size(); upload_speed_.add(size); peer.upload_speed.add(size); return std::move(res); @@ -701,10 +728,11 @@ void NodeActor::db_store_torrent() { if (!db_) { return; } - auto obj = create_tl_object(); + auto obj = create_tl_object(); obj->active_download_ = should_download_; obj->active_upload_ = should_upload_; obj->root_dir_ = torrent_.get_root_dir(); + obj->added_at_ = added_at_; db_->set(create_hash_tl_object(torrent_.get_hash()), serialize_tl_object(obj, true), [](td::Result R) { if (R.is_error()) { @@ -823,16 +851,18 @@ void NodeActor::db_update_pieces_list() { } void NodeActor::load_from_db(std::shared_ptr db, td::Bits256 hash, td::unique_ptr callback, - td::unique_ptr node_callback, + td::unique_ptr node_callback, SpeedLimiters speed_limiters, td::Promise> promise) { class Loader : public td::actor::Actor { public: Loader(std::shared_ptr db, td::Bits256 hash, td::unique_ptr callback, - td::unique_ptr node_callback, td::Promise> promise) + td::unique_ptr node_callback, SpeedLimiters speed_limiters, + td::Promise> promise) : db_(std::move(db)) , hash_(hash) , callback_(std::move(callback)) , node_callback_(std::move(node_callback)) + , speed_limiters_(std::move(speed_limiters)) , promise_(std::move(promise)) { } @@ -842,9 +872,9 @@ void NodeActor::load_from_db(std::shared_ptr db, td::Bits256 hash, t } void start_up() override { - db::db_get( + db::db_get( *db_, create_hash_tl_object(hash_), false, - [SelfId = actor_id(this)](td::Result> R) { + [SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { td::actor::send_closure(SelfId, &Loader::finish, R.move_as_error_prefix("Torrent: ")); } else { @@ -853,10 +883,20 @@ void NodeActor::load_from_db(std::shared_ptr db, td::Bits256 hash, t }); } - void got_torrent(tl_object_ptr obj) { - root_dir_ = std::move(obj->root_dir_); - active_download_ = obj->active_download_; - active_upload_ = obj->active_upload_; + void got_torrent(tl_object_ptr obj) { + ton_api::downcast_call(*obj, td::overloaded( + [&](ton_api::storage_db_torrent &t) { + root_dir_ = std::move(t.root_dir_); + active_download_ = t.active_download_; + active_upload_ = t.active_upload_; + added_at_ = (td::uint32)td::Clocks::system(); + }, + [&](ton_api::storage_db_torrentV2 &t) { + root_dir_ = std::move(t.root_dir_); + active_download_ = t.active_download_; + active_upload_ = t.active_upload_; + added_at_ = t.added_at_; + })); db_->get(create_hash_tl_object(hash_), [SelfId = actor_id(this)](td::Result R) { if (R.is_error()) { @@ -980,9 +1020,10 @@ void NodeActor::load_from_db(std::shared_ptr db, td::Bits256 hash, t DbInitialData data; data.priorities = std::move(priorities_); data.pieces_in_db = std::move(pieces_in_db_); + data.added_at = added_at_; finish(td::actor::create_actor("Node", 1, torrent_.unwrap(), std::move(callback_), - std::move(node_callback_), std::move(db_), active_download_, - active_upload_, std::move(data))); + std::move(node_callback_), std::move(db_), std::move(speed_limiters_), + active_download_, active_upload_, std::move(data))); } private: @@ -990,18 +1031,20 @@ void NodeActor::load_from_db(std::shared_ptr db, td::Bits256 hash, t td::Bits256 hash_; td::unique_ptr callback_; td::unique_ptr node_callback_; + SpeedLimiters speed_limiters_; td::Promise> promise_; std::string root_dir_; bool active_download_{false}; bool active_upload_{false}; + td::uint32 added_at_; td::optional torrent_; std::vector priorities_; std::set pieces_in_db_; size_t remaining_pieces_in_db_ = 0; }; td::actor::create_actor("loader", std::move(db), hash, std::move(callback), std::move(node_callback), - std::move(promise)) + std::move(speed_limiters), std::move(promise)) .release(); } diff --git a/storage/NodeActor.h b/storage/NodeActor.h index f8cadce2..2f0a3232 100644 --- a/storage/NodeActor.h +++ b/storage/NodeActor.h @@ -23,6 +23,7 @@ #include "PartsHelper.h" #include "PeerActor.h" #include "Torrent.h" +#include "SpeedLimiter.h" #include "td/utils/Random.h" #include "td/utils/Variant.h" @@ -31,6 +32,7 @@ #include "db.h" namespace ton { + class NodeActor : public td::actor::Actor { public: class NodeCallback { @@ -60,14 +62,15 @@ class NodeActor : public td::actor::Actor { struct DbInitialData { std::vector priorities; std::set pieces_in_db; + td::uint32 added_at; }; NodeActor(PeerId self_id, ton::Torrent torrent, td::unique_ptr callback, - td::unique_ptr node_callback, std::shared_ptr db, bool should_download = true, - bool should_upload = true); + td::unique_ptr node_callback, std::shared_ptr db, SpeedLimiters speed_limiters, + bool should_download = true, bool should_upload = true); NodeActor(PeerId self_id, ton::Torrent torrent, td::unique_ptr callback, - td::unique_ptr node_callback, std::shared_ptr db, bool should_download, - bool should_upload, DbInitialData db_initial_data); + td::unique_ptr node_callback, std::shared_ptr db, SpeedLimiters speed_limiters, + bool should_download, bool should_upload, DbInitialData db_initial_data); void start_peer(PeerId peer_id, td::Promise> promise); struct NodeState { @@ -76,11 +79,12 @@ class NodeActor : public td::actor::Actor { bool active_upload; double download_speed; double upload_speed; + td::uint32 added_at; const std::vector &file_priority; }; void with_torrent(td::Promise promise) { promise.set_value(NodeState{torrent_, should_download_, should_upload_, download_speed_.speed(), - upload_speed_.speed(), file_priority_}); + upload_speed_.speed(), added_at_, file_priority_}); } std::string get_stats_str(); @@ -98,20 +102,22 @@ class NodeActor : public td::actor::Actor { void get_peers_info(td::Promise> promise); static void load_from_db(std::shared_ptr db, td::Bits256 hash, td::unique_ptr callback, - td::unique_ptr node_callback, + td::unique_ptr node_callback, SpeedLimiters speed_limiters, td::Promise> promise); static void cleanup_db(std::shared_ptr db, td::Bits256 hash, td::Promise promise); private: PeerId self_id_; ton::Torrent torrent_; - std::shared_ptr torrent_info_str_; + std::shared_ptr torrent_info_shared_; std::vector file_priority_; td::unique_ptr callback_; td::unique_ptr node_callback_; std::shared_ptr db_; bool should_download_{false}; bool should_upload_{false}; + td::uint32 added_at_{0}; + SpeedLimiters speed_limiters_; class Notifier : public td::actor::Actor { public: diff --git a/storage/PeerActor.cpp b/storage/PeerActor.cpp index b2bf5a89..0cb21c0a 100644 --- a/storage/PeerActor.cpp +++ b/storage/PeerActor.cpp @@ -24,6 +24,7 @@ #include "td/utils/overloaded.h" #include "td/utils/Random.h" +#include "vm/boc.h" namespace ton { @@ -44,7 +45,7 @@ PeerActor::PeerActor(td::unique_ptr callback, std::shared_ptr -td::uint64 PeerActor::create_and_send_query(ArgsT &&... args) { +td::uint64 PeerActor::create_and_send_query(ArgsT &&...args) { return send_query(ton::create_serialize_tl_object(std::forward(args)...)); } @@ -86,7 +87,9 @@ void PeerActor::on_ping_result(td::Result r_answer) { void PeerActor::on_pong() { wait_pong_till_ = td::Timestamp::in(10); - state_->peer_online_ = true; + if (!state_->peer_online_.exchange(true)) { + state_->peer_online_set_ = true; + } notify_node(); } @@ -115,6 +118,11 @@ void PeerActor::on_get_piece_result(PartId piece_id, td::Result res.proof = std::move(piece->proof_); return std::move(res); }(); + if (res.is_error()) { + LOG(DEBUG) << "getPiece " << piece_id << "query: " << res.error(); + } else { + LOG(DEBUG) << "getPiece " << piece_id << "query: OK"; + } state_->node_queries_results_.add_element(std::make_pair(piece_id, std::move(res))); notify_node(); } @@ -130,13 +138,16 @@ void PeerActor::on_get_info_result(td::Result r_answer) { next_get_info_at_ = td::Timestamp::in(5.0); alarm_timestamp().relax(next_get_info_at_); if (r_answer.is_error()) { + LOG(DEBUG) << "getTorrentInfo query: " << r_answer.move_as_error(); return; } auto R = fetch_tl_object(r_answer.move_as_ok(), true); if (R.is_error()) { + LOG(DEBUG) << "getTorrentInfo query: " << R.move_as_error(); return; } td::BufferSlice data = std::move(R.ok_ref()->data_); + LOG(DEBUG) << "getTorrentInfo query: got result (" << data.size() << " bytes)"; if (!data.empty() && !state_->torrent_info_ready_) { state_->torrent_info_response_callback_(std::move(data)); } @@ -245,6 +256,8 @@ void PeerActor::loop_update_init() { update_state_query_.state = node_state; update_state_query_.query_id = 0; + LOG(DEBUG) << "Sending updateInit query (" << update_state_query_.state.want_download << ", " + << update_state_query_.state.will_upload << ", offset=" << peer_init_offset_ * 8 << ")"; update_query_id_ = send_query(std::move(query)); } @@ -265,13 +278,15 @@ void PeerActor::loop_update_state() { auto query = create_update_query( ton::create_tl_object(to_ton_api(update_state_query_.state))); + LOG(DEBUG) << "Sending updateState query (" << update_state_query_.state.want_download << ", " + << update_state_query_.state.will_upload << ")"; update_state_query_.query_id = send_query(std::move(query)); } void PeerActor::update_have_pieces() { auto node_ready_parts = state_->node_ready_parts_.read(); for (auto piece_id : node_ready_parts) { - if (piece_id < peer_init_offset_ + UPDATE_INIT_BLOCK_SIZE) { + if (piece_id < (peer_init_offset_ + UPDATE_INIT_BLOCK_SIZE) * 8 && !have_pieces_.get(piece_id)) { have_pieces_list_.push_back(piece_id); } have_pieces_.set_one(piece_id); @@ -291,23 +306,49 @@ void PeerActor::loop_update_pieces() { have_pieces_list_.erase(have_pieces_list_.end() - count, have_pieces_list_.end()); auto query = create_update_query(ton::create_tl_object( td::transform(sent_have_pieces_list_, [](auto x) { return static_cast(x); }))); + LOG(DEBUG) << "Sending updateHavePieces query (" << sent_have_pieces_list_.size() << " pieces)"; update_query_id_ = send_query(std::move(query)); } } void PeerActor::loop_get_torrent_info() { - if (get_info_query_id_ || state_->torrent_info_ready_) { + if (state_->torrent_info_ready_) { + if (!torrent_info_) { + torrent_info_ = state_->torrent_info_; + for (const auto &u : pending_update_peer_parts_) { + process_update_peer_parts(u); + } + pending_update_peer_parts_.clear(); + } + return; + } + if (get_info_query_id_) { return; } if (next_get_info_at_ && !next_get_info_at_.is_in_past()) { return; } + LOG(DEBUG) << "Sending getTorrentInfo query"; get_info_query_id_ = create_and_send_query(); } void PeerActor::loop_node_get_piece() { for (auto part : state_->node_queries_.read()) { - node_get_piece_.emplace(part, NodePieceQuery{}); + if (state_->speed_limiters_.download.empty()) { + node_get_piece_.emplace(part, NodePieceQuery{}); + } else { + if (!torrent_info_) { + CHECK(state_->torrent_info_ready_); + loop_get_torrent_info(); + } + auto piece_size = + std::min(torrent_info_->piece_size, torrent_info_->file_size - part * torrent_info_->piece_size); + td::actor::send_closure(state_->speed_limiters_.download, &SpeedLimiter::enqueue, (double)piece_size, + td::Timestamp::in(3.0), [part, SelfId = actor_id(this)](td::Result R) { + td::actor::send_closure(SelfId, &PeerActor::node_get_piece_query_ready, part, + std::move(R)); + }); + } } for (auto &query_it : node_get_piece_) { @@ -315,11 +356,21 @@ void PeerActor::loop_node_get_piece() { continue; } + LOG(DEBUG) << "Sending getPiece " << query_it.first << " query"; query_it.second.query_id = create_and_send_query(static_cast(query_it.first)); } } +void PeerActor::node_get_piece_query_ready(PartId part, td::Result R) { + if (R.is_error()) { + on_get_piece_result(part, R.move_as_error()); + } else { + node_get_piece_.emplace(part, NodePieceQuery{}); + } + schedule_loop(); +} + void PeerActor::loop_peer_get_piece() { // process answers for (auto &p : state_->peer_queries_results_.read()) { @@ -328,9 +379,26 @@ void PeerActor::loop_peer_get_piece() { if (promise_it == peer_get_piece_.end()) { continue; } - promise_it->second.promise.set_result(p.second.move_map([](PeerState::Part part) { - return ton::create_serialize_tl_object(std::move(part.proof), std::move(part.data)); - })); + td::Promise promise = + [i = p.first, promise = std::move(promise_it->second.promise)](td::Result R) mutable { + LOG(DEBUG) << "Responding to getPiece " << i << ": " << (R.is_ok() ? "OK" : R.error().to_string()); + promise.set_result(R.move_map([](PeerState::Part part) { + return create_serialize_tl_object(std::move(part.proof), std::move(part.data)); + })); + }; + if (p.second.is_error()) { + promise.set_error(p.second.move_as_error()); + } else { + auto part = p.second.move_as_ok(); + auto size = (double)part.data.size(); + td::Promise P = promise.wrap([part = std::move(part)](td::Unit) mutable { return std::move(part); }); + if (state_->speed_limiters_.upload.empty()) { + P.set_result(td::Unit()); + } else { + td::actor::send_closure(state_->speed_limiters_.upload, &SpeedLimiter::enqueue, size, td::Timestamp::in(3.0), + std::move(P)); + } + } peer_get_piece_.erase(promise_it); notify_node(); } @@ -373,18 +441,8 @@ void PeerActor::execute_add_update(ton::ton_api::storage_addUpdate &add_update, promise.set_error(td::Status::Error(404, "INVALID_SESSION")); return; } - promise.set_value(ton::create_serialize_tl_object()); - std::vector new_peer_ready_parts; - auto add_piece = [&](PartId id) { - if (!peer_have_pieces_.get(id)) { - peer_have_pieces_.set_one(id); - new_peer_ready_parts.push_back(id); - notify_node(); - } - }; - auto seqno = static_cast(add_update.seqno_); auto update_peer_state = [&](PeerState::State peer_state) { if (peer_seqno_ >= seqno) { @@ -393,22 +451,48 @@ void PeerActor::execute_add_update(ton::ton_api::storage_addUpdate &add_update, if (state_->peer_state_ready_ && state_->peer_state_.load() == peer_state) { return; } + LOG(DEBUG) << "Processing update peer state query (" << peer_state.want_download << ", " << peer_state.will_upload + << ")"; peer_seqno_ = seqno; state_->peer_state_.exchange(peer_state); state_->peer_state_ready_ = true; notify_node(); }; + downcast_call( + *add_update.update_, + td::overloaded( + [&](const ton::ton_api::storage_updateHavePieces &have_pieces) {}, + [&](const ton::ton_api::storage_updateState &state) { update_peer_state(from_ton_api(*state.state_)); }, + [&](const ton::ton_api::storage_updateInit &init) { update_peer_state(from_ton_api(*init.state_)); })); + if (torrent_info_) { + process_update_peer_parts(add_update.update_); + } else { + pending_update_peer_parts_.push_back(std::move(add_update.update_)); + } +} - downcast_call(*add_update.update_, +void PeerActor::process_update_peer_parts(const tl_object_ptr &update) { + CHECK(torrent_info_); + td::uint64 pieces_count = torrent_info_->pieces_count(); + std::vector new_peer_ready_parts; + auto add_piece = [&](PartId id) { + if (id < pieces_count && !peer_have_pieces_.get(id)) { + peer_have_pieces_.set_one(id); + new_peer_ready_parts.push_back(id); + notify_node(); + } + }; + downcast_call(*update, td::overloaded( - [&](ton::ton_api::storage_updateHavePieces &have_pieces) { + [&](const ton::ton_api::storage_updateHavePieces &have_pieces) { + LOG(DEBUG) << "Processing updateHavePieces query (" << have_pieces.piece_id_ << " pieces)"; for (auto id : have_pieces.piece_id_) { add_piece(id); } }, - [&](ton::ton_api::storage_updateState &state) { update_peer_state(from_ton_api(*state.state_)); }, - [&](ton::ton_api::storage_updateInit &init) { - update_peer_state(from_ton_api(*init.state_)); + [&](const ton::ton_api::storage_updateState &state) {}, + [&](const ton::ton_api::storage_updateInit &init) { + LOG(DEBUG) << "Processing updateInit query (offset=" << init.have_pieces_offset_ * 8 << ")"; td::Bitset new_bitset; new_bitset.set_raw(init.have_pieces_.as_slice().str()); size_t offset = init.have_pieces_offset_ * 8; @@ -428,7 +512,13 @@ void PeerActor::execute_get_piece(ton::ton_api::storage_getPiece &get_piece, td: void PeerActor::execute_get_torrent_info(td::Promise promise) { td::BufferSlice result = create_serialize_tl_object( - state_->torrent_info_ready_ ? state_->torrent_info_str_->clone() : td::BufferSlice()); + state_->torrent_info_ready_ ? vm::std_boc_serialize(state_->torrent_info_->as_cell()).move_as_ok() + : td::BufferSlice()); + if (state_->torrent_info_ready_) { + LOG(DEBUG) << "Responding to getTorrentInfo: " << result.size() << " bytes"; + } else { + LOG(DEBUG) << "Responding to getTorrentInfo: no info"; + } promise.set_result(std::move(result)); } } // namespace ton diff --git a/storage/PeerActor.h b/storage/PeerActor.h index fd34bc88..a4229e16 100644 --- a/storage/PeerActor.h +++ b/storage/PeerActor.h @@ -59,6 +59,10 @@ class PeerActor : public td::actor::Actor { // startSession td::uint64 node_session_id_; td::Bitset peer_have_pieces_; + std::shared_ptr torrent_info_; + std::vector> pending_update_peer_parts_; + + void process_update_peer_parts(const tl_object_ptr &update); // update td::optional peer_session_id_; @@ -112,6 +116,7 @@ class PeerActor : public td::actor::Actor { td::BufferSlice create_update_query(ton::tl_object_ptr update); void loop_node_get_piece(); + void node_get_piece_query_ready(PartId part, td::Result R); void loop_peer_get_piece(); diff --git a/storage/PeerState.h b/storage/PeerState.h index c578b544..29c080a2 100644 --- a/storage/PeerState.h +++ b/storage/PeerState.h @@ -23,6 +23,8 @@ #include "td/utils/optional.h" #include "td/actor/actor.h" +#include "TorrentInfo.h" +#include "SpeedLimiter.h" #include #include @@ -100,6 +102,8 @@ struct PeerState { std::atomic_bool peer_state_ready_{false}; std::atomic peer_state_{State{false, false}}; std::atomic_bool peer_online_{false}; + std::atomic_bool peer_online_set_{false}; + SpeedLimiters speed_limiters_; struct Part { td::BufferSlice proof; @@ -121,7 +125,7 @@ struct PeerState { // Node -> Peer std::atomic_bool torrent_info_ready_{false}; - std::shared_ptr torrent_info_str_; + std::shared_ptr torrent_info_; std::function torrent_info_response_callback_; const td::actor::ActorId<> node; diff --git a/storage/SpeedLimiter.cpp b/storage/SpeedLimiter.cpp new file mode 100644 index 00000000..952005fe --- /dev/null +++ b/storage/SpeedLimiter.cpp @@ -0,0 +1,82 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ + +#include "SpeedLimiter.h" + +namespace ton { + +SpeedLimiter::SpeedLimiter(double max_speed) : max_speed_(max_speed) { +} + +void SpeedLimiter::set_max_speed(double max_speed) { + max_speed_ = max_speed; + auto old_queue = std::move(queue_); + unlock_at_ = (queue_.empty() ? td::Timestamp::now() : queue_.front().execute_at_); + queue_ = {}; + while (!old_queue.empty()) { + auto &e = old_queue.front(); + enqueue(e.size_, e.timeout_, std::move(e.promise_)); + old_queue.pop(); + } + process_queue(); +} + +void SpeedLimiter::enqueue(double size, td::Timestamp timeout, td::Promise promise) { + if (max_speed_ < 0.0) { + promise.set_result(td::Unit()); + return; + } + if (max_speed_ == 0.0) { + promise.set_error(td::Status::Error("Speed limit is 0")); + return; + } + if (timeout < unlock_at_) { + promise.set_error(td::Status::Error("Timeout caused by speed limit")); + return; + } + if (queue_.empty() && unlock_at_.is_in_past()) { + unlock_at_ = td::Timestamp::now(); + promise.set_result(td::Unit()); + } else { + queue_.push({unlock_at_, size, timeout, std::move(promise)}); + } + unlock_at_ = td::Timestamp::in(size / max_speed_, unlock_at_); + if (!queue_.empty()) { + alarm_timestamp() = queue_.front().execute_at_; + } +} + +void SpeedLimiter::alarm() { + process_queue(); +} + +void SpeedLimiter::process_queue() { + while (!queue_.empty()) { + auto &e = queue_.front(); + if (e.execute_at_.is_in_past()) { + e.promise_.set_result(td::Unit()); + queue_.pop(); + } else { + break; + } + } + if (!queue_.empty()) { + alarm_timestamp() = queue_.front().execute_at_; + } +} + +} // namespace ton diff --git a/storage/SpeedLimiter.h b/storage/SpeedLimiter.h new file mode 100644 index 00000000..b6230732 --- /dev/null +++ b/storage/SpeedLimiter.h @@ -0,0 +1,52 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ + +#pragma once +#include "td/actor/actor.h" +#include + +namespace ton { + +class SpeedLimiter : public td::actor::Actor { + public: + explicit SpeedLimiter(double max_speed); + + void set_max_speed(double max_speed); // Negative = unlimited + void enqueue(double size, td::Timestamp timeout, td::Promise promise); + + void alarm() override; + + private: + double max_speed_ = -1.0; + td::Timestamp unlock_at_ = td::Timestamp::never(); + + struct Event { + td::Timestamp execute_at_; + double size_; + td::Timestamp timeout_; + td::Promise promise_; + }; + std::queue queue_; + + void process_queue(); +}; + +struct SpeedLimiters { + td::actor::ActorId download, upload; +}; + +} // namespace ton diff --git a/storage/Torrent.cpp b/storage/Torrent.cpp index 1dafb999..6807519c 100644 --- a/storage/Torrent.cpp +++ b/storage/Torrent.cpp @@ -266,7 +266,9 @@ td::Result Torrent::get_piece_data(td::uint64 piece_i) { if (!inited_info_) { return td::Status::Error("Torrent info not inited"); } - CHECK(piece_i < info_.pieces_count()); + if (piece_i >= info_.pieces_count()) { + return td::Status::Error("Piece idx is too big"); + } if (!piece_is_ready_[piece_i]) { return td::Status::Error("Piece is not ready"); } @@ -291,7 +293,9 @@ td::Result> Torrent::get_piece_proof(td::uint64 piece_i) { if (!inited_info_) { return td::Status::Error("Torrent info not inited"); } - CHECK(piece_i < info_.pieces_count()); + if (piece_i >= info_.pieces_count()) { + return td::Status::Error("Piece idx is too big"); + } return merkle_tree_.gen_proof(piece_i, piece_i); } @@ -305,7 +309,9 @@ td::Status Torrent::add_piece(td::uint64 piece_i, td::Slice data, td::Ref= info_.pieces_count()) { + return td::Status::Error("Piece idx is too big"); + } if (piece_is_ready_[piece_i]) { return td::Status::OK(); } diff --git a/storage/TorrentCreator.cpp b/storage/TorrentCreator.cpp index 35fb8212..ab060023 100644 --- a/storage/TorrentCreator.cpp +++ b/storage/TorrentCreator.cpp @@ -29,18 +29,22 @@ #include "TorrentHeader.hpp" namespace ton { +static bool is_dir_slash(char c) { + return (c == TD_DIR_SLASH) | (c == '/'); +} + td::Result Torrent::Creator::create_from_path(Options options, td::CSlice raw_path) { TRY_RESULT(path, td::realpath(raw_path)); TRY_RESULT(stat, td::stat(path)); std::string root_dir = path; - while (!root_dir.empty() && root_dir.back() == TD_DIR_SLASH) { + while (!root_dir.empty() && is_dir_slash(root_dir.back())) { root_dir.pop_back(); } - while (!root_dir.empty() && root_dir.back() != TD_DIR_SLASH) { + while (!root_dir.empty() && !is_dir_slash(root_dir.back())) { root_dir.pop_back(); } if (stat.is_dir_) { - if (!path.empty() && path.back() != TD_DIR_SLASH) { + if (!path.empty() && !is_dir_slash(path.back())) { path += TD_DIR_SLASH; } if (!options.dir_name) { @@ -50,7 +54,20 @@ td::Result Torrent::Creator::create_from_path(Options options, td::CSli td::Status status; auto walk_status = td::WalkPath::run(path, [&](td::CSlice name, td::WalkPath::Type type) { if (type == td::WalkPath::Type::NotDir) { - status = creator.add_file(td::PathView::relative(name, path), name); + std::string rel_name = td::PathView::relative(name, path).str(); + td::Slice file_name = rel_name; + for (size_t i = 0; i < rel_name.size(); ++i) { + if (is_dir_slash(rel_name[i])) { + rel_name[i] = '/'; + file_name = td::Slice(rel_name.c_str() + i + 1, rel_name.c_str() + rel_name.size()); + } + } + // Exclude OS-created files that can be modified automatically and thus break some torrent pieces + if (file_name == ".DS_Store" || td::to_lower(file_name) == "desktop.ini" || + td::to_lower(file_name) == "thumbs.db") { + return td::WalkPath::Action::Continue; + } + status = creator.add_file(rel_name, name); if (status.is_error()) { return td::WalkPath::Action::Abort; } diff --git a/storage/TorrentInfo.cpp b/storage/TorrentInfo.cpp index 48b0d592..26aa092e 100644 --- a/storage/TorrentInfo.cpp +++ b/storage/TorrentInfo.cpp @@ -73,6 +73,12 @@ td::Status TorrentInfo::validate() const { if (description.size() > 1024) { return td::Status::Error("Description is too long"); } + if (piece_size > (1 << 23)) { + return td::Status::Error("Piece size is too big"); + } + if (pieces_count() >= (1ULL << 31)) { + return td::Status::Error("Too many pieces"); + } return td::Status::OK(); } } // namespace ton diff --git a/storage/storage-cli.cpp b/storage/storage-cli.cpp index 43ddbeaf..858433d1 100644 --- a/storage/storage-cli.cpp +++ b/storage/storage-cli.cpp @@ -458,9 +458,9 @@ class StorageCli : public td::actor::Actor { } auto callback = td::make_unique(actor_id(this), ptr->id, std::move(on_completed)); auto context = PeerManager::create_callback(ptr->peer_manager.get()); - ptr->node = - td::actor::create_actor(PSLICE() << "Node#" << self_id, self_id, ptr->torrent.unwrap(), - std::move(callback), std::move(context), nullptr, should_download); + ptr->node = td::actor::create_actor(PSLICE() << "Node#" << self_id, self_id, ptr->torrent.unwrap(), + std::move(callback), std::move(context), nullptr, + ton::SpeedLimiters{}, should_download); td::TerminalIO::out() << "Torrent #" << ptr->id << " started\n"; promise.release().release(); if (promise) { diff --git a/storage/storage-daemon/StorageManager.cpp b/storage/storage-daemon/StorageManager.cpp index 8edd5224..cf93f735 100644 --- a/storage/storage-daemon/StorageManager.cpp +++ b/storage/storage-daemon/StorageManager.cpp @@ -50,6 +50,29 @@ void StorageManager::start_up() { db_ = std::make_shared( std::make_shared(td::RocksDb::open(db_root_ + "/torrent-db").move_as_ok())); + + db::db_get( + *db_, create_hash_tl_object(), true, + [SelfId = actor_id(this)](td::Result> R) { + if (R.is_error()) { + LOG(ERROR) << "Failed to load config from db: " << R.move_as_error(); + td::actor::send_closure(SelfId, &StorageManager::loaded_config_from_db, nullptr); + } else { + td::actor::send_closure(SelfId, &StorageManager::loaded_config_from_db, R.move_as_ok()); + } + }); +} + +void StorageManager::loaded_config_from_db(tl_object_ptr config) { + if (config) { + LOG(INFO) << "Loaded config from DB. Speed limits: download=" << config->download_speed_limit_ + << ", upload=" << config->upload_speed_limit_; + download_speed_limit_ = config->download_speed_limit_; + upload_speed_limit_ = config->upload_speed_limit_; + td::actor::send_closure(download_speed_limiter_, &SpeedLimiter::set_max_speed, download_speed_limit_); + td::actor::send_closure(upload_speed_limiter_, &SpeedLimiter::set_max_speed, upload_speed_limit_); + } + db::db_get( *db_, create_hash_tl_object(), true, [SelfId = actor_id(this)](td::Result> R) { @@ -79,6 +102,7 @@ void StorageManager::load_torrents_from_db(std::vector torrents) { client_mode_, overlays_, adnl_, rldp_); NodeActor::load_from_db( db_, hash, create_callback(hash, entry.closing_state), PeerManager::create_callback(entry.peer_manager.get()), + SpeedLimiters{download_speed_limiter_.get(), upload_speed_limiter_.get()}, [SelfId = actor_id(this), hash, promise = ig.get_promise()](td::Result> R) mutable { td::actor::send_closure(SelfId, &StorageManager::loaded_torrent_from_db, hash, std::move(R)); @@ -162,9 +186,9 @@ td::Status StorageManager::add_torrent_impl(Torrent torrent, bool start_download client_mode_, overlays_, adnl_, rldp_); auto context = PeerManager::create_callback(entry.peer_manager.get()); LOG(INFO) << "Added torrent " << hash.to_hex() << " , root_dir = " << torrent.get_root_dir(); - entry.actor = - td::actor::create_actor("Node", 1, std::move(torrent), create_callback(hash, entry.closing_state), - std::move(context), db_, start_download, allow_upload); + entry.actor = td::actor::create_actor( + "Node", 1, std::move(torrent), create_callback(hash, entry.closing_state), std::move(context), db_, + SpeedLimiters{download_speed_limiter_.get(), upload_speed_limiter_.get()}, start_download, allow_upload); return td::Status::OK(); } @@ -210,6 +234,18 @@ void StorageManager::get_all_torrents(td::Promise> prom promise.set_result(std::move(result)); } +void StorageManager::db_store_config() { + auto config = create_tl_object(); + config->download_speed_limit_ = download_speed_limit_; + config->upload_speed_limit_ = upload_speed_limit_; + db_->set(create_hash_tl_object(), serialize_tl_object(config, true), + [](td::Result R) { + if (R.is_error()) { + LOG(ERROR) << "Failed to save config to db: " << R.move_as_error(); + } + }); +} + void StorageManager::db_store_torrent_list() { std::vector torrents; for (const auto& p : torrents_) { @@ -259,19 +295,55 @@ void StorageManager::load_from(td::Bits256 hash, td::optional meta, std::move(promise)); } +static bool try_rm_empty_dir(const std::string& path) { + auto stat = td::stat(path); + if (stat.is_error() || !stat.ok().is_dir_) { + return true; + } + size_t cnt = 0; + td::WalkPath::run(path, [&](td::CSlice name, td::WalkPath::Type type) { + if (type != td::WalkPath::Type::ExitDir) { + ++cnt; + } + if (cnt < 2) { + return td::WalkPath::Action::Continue; + } else { + return td::WalkPath::Action::Abort; + } + }).ignore(); + if (cnt == 1) { + td::rmdir(path).ignore(); + return true; + } + return false; +} + void StorageManager::on_torrent_closed(Torrent torrent, std::shared_ptr closing_state) { if (!closing_state->removing) { return; } if (closing_state->remove_files && torrent.inited_header()) { + // Ignore all errors: files may just not exist size_t files_count = torrent.get_files_count().unwrap(); for (size_t i = 0; i < files_count; ++i) { std::string path = torrent.get_file_path(i); td::unlink(path).ignore(); - // TODO: Check errors, remove empty directories + std::string name = torrent.get_file_name(i).str(); + for (int j = (int)name.size() - 1; j >= 0; --j) { + if (name[j] == '/') { + name.resize(j + 1); + if (!try_rm_empty_dir(torrent.get_root_dir() + '/' + torrent.get_header().dir_name + '/' + name)) { + break; + } + } + } + if (!torrent.get_header().dir_name.empty()) { + try_rm_empty_dir(torrent.get_root_dir() + '/' + torrent.get_header().dir_name); + } } } - td::rmrf(db_root_ + "/torrent-files/" + torrent.get_hash().to_hex()).ignore(); + std::string path = db_root_ + "/torrent-files/" + torrent.get_hash().to_hex(); + td::rmrf(path).ignore(); NodeActor::cleanup_db(db_, torrent.get_hash(), [promise = std::move(closing_state->promise)](td::Result R) mutable { if (R.is_error()) { @@ -292,4 +364,28 @@ void StorageManager::get_peers_info(td::Bits256 hash, td::actor::send_closure(entry->actor, &NodeActor::get_peers_info, std::move(promise)); } +void StorageManager::get_speed_limits(td::Promise> promise) { + promise.set_result(std::make_pair(download_speed_limit_, upload_speed_limit_)); +} + +void StorageManager::set_download_speed_limit(double max_speed) { + if (max_speed < 0.0) { + max_speed = -1.0; + } + LOG(INFO) << "Set download speed limit to " << max_speed; + download_speed_limit_ = max_speed; + td::actor::send_closure(download_speed_limiter_, &SpeedLimiter::set_max_speed, max_speed); + db_store_config(); +} + +void StorageManager::set_upload_speed_limit(double max_speed) { + if (max_speed < 0.0) { + max_speed = -1.0; + } + LOG(INFO) << "Set upload speed limit to " << max_speed; + upload_speed_limit_ = max_speed; + td::actor::send_closure(upload_speed_limiter_, &SpeedLimiter::set_max_speed, max_speed); + db_store_config(); +} + } // namespace ton \ No newline at end of file diff --git a/storage/storage-daemon/StorageManager.h b/storage/storage-daemon/StorageManager.h index 102e1b9a..fbfd68eb 100644 --- a/storage/storage-daemon/StorageManager.h +++ b/storage/storage-daemon/StorageManager.h @@ -22,6 +22,7 @@ #include "overlay/overlays.h" #include "storage/PeerManager.h" #include "storage/db.h" +#include "SpeedLimiter.h" namespace ton { @@ -63,6 +64,10 @@ class StorageManager : public td::actor::Actor { void wait_for_completion(td::Bits256 hash, td::Promise promise); void get_peers_info(td::Bits256 hash, td::Promise> promise); + void get_speed_limits(td::Promise> promise); // Download, upload + void set_download_speed_limit(double max_speed); + void set_upload_speed_limit(double max_speed); + private: adnl::AdnlNodeIdShort local_id_; std::string db_root_; @@ -89,6 +94,13 @@ class StorageManager : public td::actor::Actor { std::map torrents_; + double download_speed_limit_ = -1.0; + double upload_speed_limit_ = -1.0; + td::actor::ActorOwn download_speed_limiter_ = + td::actor::create_actor("DownloadRateLimitrer", -1.0); + td::actor::ActorOwn upload_speed_limiter_ = + td::actor::create_actor("DownloadRateLimitrer", -1.0); + td::Status add_torrent_impl(Torrent torrent, bool start_download, bool allow_upload); td::Result get_torrent(td::Bits256 hash) { @@ -102,9 +114,11 @@ class StorageManager : public td::actor::Actor { td::unique_ptr create_callback(td::Bits256 hash, std::shared_ptr closing_state); + void loaded_config_from_db(tl_object_ptr config); void load_torrents_from_db(std::vector torrents); void loaded_torrent_from_db(td::Bits256 hash, td::Result> R); void after_load_torrents_from_db(); + void db_store_config(); void db_store_torrent_list(); void on_torrent_closed(Torrent torrent, std::shared_ptr closing_state); diff --git a/storage/storage-daemon/storage-daemon-cli.cpp b/storage/storage-daemon/storage-daemon-cli.cpp index 4d9345a8..9cb1d417 100644 --- a/storage/storage-daemon/storage-daemon-cli.cpp +++ b/storage/storage-daemon/storage-daemon-cli.cpp @@ -107,6 +107,29 @@ std::string size_to_str(td::uint64 size) { return s.as_cslice().str(); } +td::Result str_to_size(std::string s) { + if (!s.empty() && std::tolower(s.back()) == 'b') { + s.pop_back(); + } + int shift = 0; + if (!s.empty()) { + auto c = std::tolower(s.back()); + static std::string suffixes = "kmgtpe"; + for (size_t i = 0; i < suffixes.size(); ++i) { + if (c == suffixes[i]) { + shift = 10 * (int)(i + 1); + s.pop_back(); + break; + } + } + } + TRY_RESULT(x, td::to_integer_safe(s)); + if (td::count_leading_zeroes64(x) < shift) { + return td::Status::Error("Number is too big"); + } + return x << shift; +} + std::string time_to_str(td::uint32 time) { char time_buffer[80]; time_t rawtime = time; @@ -285,10 +308,13 @@ class StorageDaemonCli : public td::actor::Actor { } std::_Exit(0); } else if (tokens[0] == "help") { - if (tokens.size() != 1) { + std::string category; + if (tokens.size() == 2) { + category = tokens[1]; + } else if (tokens.size() != 1) { return td::Status::Error("Unexpected tokens"); } - return execute_help(); + return execute_help(category); } else if (tokens[0] == "setverbosity") { if (tokens.size() != 2) { return td::Status::Error("Expected level"); @@ -460,6 +486,46 @@ class StorageDaemonCli : public td::actor::Actor { return td::Status::Error("Unexpected EOLN"); } return execute_get_peers(hash, json); + } else if (tokens[0] == "get-pieces-info") { + td::Bits256 hash; + bool found_hash = false; + bool json = false; + bool files = false; + td::uint64 offset = 0; + td::optional max_pieces; + for (size_t i = 1; i < tokens.size(); ++i) { + if (!tokens[i].empty() && tokens[i][0] == '-') { + if (tokens[i] == "--json") { + json = true; + continue; + } + if (tokens[i] == "--files") { + files = true; + continue; + } + if (tokens[i] == "--offset") { + TRY_RESULT_PREFIX_ASSIGN(offset, td::to_integer_safe(tokens[i + 1]), "Invalid offset: "); + ++i; + continue; + } + if (tokens[i] == "--max-pieces") { + TRY_RESULT_PREFIX_ASSIGN(max_pieces, td::to_integer_safe(tokens[i + 1]), "Invalid offset: "); + max_pieces.value() = std::min(max_pieces.value(), ((td::uint64)1 << 62)); + ++i; + continue; + } + return td::Status::Error(PSTRING() << "Unknown flag " << tokens[i]); + } + if (found_hash) { + return td::Status::Error("Unexpected token"); + } + TRY_RESULT_ASSIGN(hash, parse_torrent(tokens[i])); + found_hash = true; + } + if (!found_hash) { + return td::Status::Error("Unexpected EOLN"); + } + return execute_get_pieces_info(hash, files, offset, max_pieces, json); } else if (tokens[0] == "download-pause" || tokens[0] == "download-resume") { if (tokens.size() != 2) { return td::Status::Error("Expected bag"); @@ -544,6 +610,48 @@ class StorageDaemonCli : public td::actor::Actor { return td::Status::Error("Unexpected EOLN"); } return execute_load_from(hash, std::move(meta), std::move(path)); + } else if (tokens[0] == "get-speed-limits") { + bool json = false; + for (size_t i = 1; i < tokens.size(); ++i) { + if (!tokens[i].empty() && tokens[i][0] == '-') { + if (tokens[i] == "--json") { + json = true; + continue; + } + return td::Status::Error(PSTRING() << "Unknown flag " << tokens[i]); + } + return td::Status::Error("Unexpected token"); + } + return execute_get_speed_limits(json); + } else if (tokens[0] == "set-speed-limits") { + td::optional download, upload; + for (size_t i = 1; i < tokens.size(); ++i) { + if (!tokens[i].empty() && tokens[i][0] == '-') { + if (tokens[i] == "--download") { + ++i; + if (tokens[i] == "unlimited") { + download = -1.0; + } else { + TRY_RESULT_PREFIX(x, str_to_size(tokens[i]), "Invalid download speed: "); + download = (double)x; + } + continue; + } + if (tokens[i] == "--upload") { + ++i; + if (tokens[i] == "unlimited") { + upload = -1.0; + } else { + TRY_RESULT_PREFIX(x, str_to_size(tokens[i]), "Invalid upload speed: "); + upload = (double)x; + } + continue; + } + return td::Status::Error(PSTRING() << "Unknown flag " << tokens[i]); + } + return td::Status::Error("Unexpected token"); + } + return execute_set_speed_limits(download, upload); } else if (tokens[0] == "new-contract-message") { td::Bits256 hash; std::string file; @@ -659,12 +767,12 @@ class StorageDaemonCli : public td::actor::Actor { continue; } if (tokens[i] == "--min-file-size") { - TRY_RESULT_PREFIX(x, td::to_integer_safe(tokens[i + 1]), "Invalid value for --min-file-size: "); + TRY_RESULT_PREFIX(x, str_to_size(tokens[i + 1]), "Invalid value for --min-file-size: "); new_params.minimal_file_size = x; continue; } if (tokens[i] == "--max-file-size") { - TRY_RESULT_PREFIX(x, td::to_integer_safe(tokens[i + 1]), "Invalid value for --max-file-size: "); + TRY_RESULT_PREFIX(x, str_to_size(tokens[i + 1]), "Invalid value for --max-file-size: "); new_params.maximal_file_size = x; continue; } @@ -708,7 +816,7 @@ class StorageDaemonCli : public td::actor::Actor { continue; } if (tokens[i] == "--max-total-size") { - TRY_RESULT_PREFIX(x, td::to_integer_safe(tokens[i + 1]), "Invalid value for --max-total-size: "); + TRY_RESULT_PREFIX(x, str_to_size(tokens[i + 1]), "Invalid value for --max-total-size: "); new_config.max_total_size = x; continue; } @@ -765,93 +873,111 @@ class StorageDaemonCli : public td::actor::Actor { } } - td::Status execute_help() { - td::TerminalIO::out() << "help\tPrint this help\n"; - td::TerminalIO::out() - << "create [-d description] [--no-upload] [--copy] [--json] \tCreate bag of files from \n"; - td::TerminalIO::out() << "\t-d\tDescription will be stored in torrent info\n"; - td::TerminalIO::out() << "\t--no-upload\tDon't share bag with peers\n"; - td::TerminalIO::out() << "\t--copy\tFiles will be copied to an internal directory of storage-daemon\n"; - td::TerminalIO::out() << "\t--json\tOutput in json\n"; - td::TerminalIO::out() << "add-by-hash [-d root_dir] [--paused] [--no-upload] [--json] [--partial file1 " - "file2 ...]\tAdd bag with given BagID (in hex)\n"; - td::TerminalIO::out() << "\t-d\tTarget directory, default is an internal directory of storage-daemon\n"; - td::TerminalIO::out() << "\t--paused\tDon't start download immediately\n"; - td::TerminalIO::out() << "\t--no-upload\tDon't share bag with peers\n"; - td::TerminalIO::out() - << "\t--partial\tEverything after this flag is a list of filenames. Only these files will be downloaded.\n"; - td::TerminalIO::out() << "\t--json\tOutput in json\n"; - td::TerminalIO::out() << "add-by-meta [-d root_dir] [--paused] [--no-upload] [--json] [--partial file1 " - "file2 ...]\tLoad meta from file and add bag\n"; - td::TerminalIO::out() << "\tFlags are the same as in add-by-hash\n"; - td::TerminalIO::out() << "list [--hashes] [--json]\tPrint list of bags\n"; - td::TerminalIO::out() << "\t--hashes\tPrint full BagID\n"; - td::TerminalIO::out() << "\t--json\tOutput in json\n"; - td::TerminalIO::out() << "get [--json]\tPrint information about \n"; - td::TerminalIO::out() << "\t--json\tOutput in json\n"; - td::TerminalIO::out() << "\tHere and below bags are identified by BagID (in hex) or index (see bag list)\n"; - td::TerminalIO::out() << "get-meta \tSave bag meta of to \n"; - td::TerminalIO::out() << "get-peers [--json]\tPrint a list of peers\n"; - td::TerminalIO::out() << "\t--json\tOutput in json\n"; - td::TerminalIO::out() << "download-pause \tPause download of \n"; - td::TerminalIO::out() << "download-resume \tResume download of \n"; - td::TerminalIO::out() << "upload-pause \tPause upload of \n"; - td::TerminalIO::out() << "upload-resume \tResume upload of \n"; - td::TerminalIO::out() << "priority-all

\tSet priority of all files in to

\n"; - td::TerminalIO::out() << "\tPriority is in [0..255], 0 - don't download\n"; - td::TerminalIO::out() << "priority-idx

\tSet priority of file # in to

\n"; - td::TerminalIO::out() << "\tPriority is in [0..255], 0 - don't download\n"; - td::TerminalIO::out() << "priority-name

\tSet priority of file in to

\n"; - td::TerminalIO::out() << "\tPriority is in [0..255], 0 - don't download\n"; - td::TerminalIO::out() << "remove [--remove-files]\tRemove \n"; - td::TerminalIO::out() << "\t--remove-files - also remove all files\n"; - td::TerminalIO::out() << "load-from [--meta meta] [--files path]\tProvide meta and data for an existing " - "incomplete bag.\n"; - td::TerminalIO::out() << "\t--meta meta\ttorrent info and header will be inited (if not ready) from meta file\n"; - td::TerminalIO::out() << "\t--files path\tdata for files will be taken from here\n"; - td::TerminalIO::out() << "new-contract-message [--query-id id] --provider \tCreate " - "\"new contract message\" for storage provider. Saves message body to .\n"; - td::TerminalIO::out() << "\t\tAddress of storage provider account to take parameters from.\n"; - td::TerminalIO::out() << "new-contract-message [--query-id id] --rate --max-span " - "\tSame thing, but parameters are not fetched automatically.\n"; - td::TerminalIO::out() << "exit\tExit\n"; - td::TerminalIO::out() << "quit\tExit\n"; - td::TerminalIO::out() << "setverbosity \tSet vetbosity to in [0..10]\n"; - td::TerminalIO::out() << "\nStorage provider control:\n"; - td::TerminalIO::out() << "import-pk \tImport private key from \n"; - td::TerminalIO::out() << "deploy-provider\tInit storage provider by deploying a new provider smart contract\n"; - td::TerminalIO::out() - << "init-provider \tInit storage provider using the existing provider smart contract\n"; - td::TerminalIO::out() << "remove-storage-provider\tRemove storage provider\n"; - td::TerminalIO::out() - << "\tSmart contracts in blockchain and bags will remain intact, but they will not be managed anymore\n"; - td::TerminalIO::out() << "get-provider-params [address] [--json]\tPrint parameters of the smart contract\n"; - td::TerminalIO::out() - << "\taddress\tAddress of a smart contract. Default is the provider managed by this daemon.\n"; - td::TerminalIO::out() << "\t--json\tOutput in json\n"; - td::TerminalIO::out() << "set-provider-params [--accept x] [--rate x] [--max-span x] [--min-file-size x] " - "[--max-file-size x]\tSet parameters of the smart contract\n"; - td::TerminalIO::out() << "\t--accept\tAccept new contracts: 0 (no) or 1 (yes)\n"; - td::TerminalIO::out() << "\t--rate\tPrice of storage, nanoTON per MB*day\n"; - td::TerminalIO::out() << "\t--max-span\n"; - td::TerminalIO::out() << "\t--min-file-size\tMinimal total size of a bag of files (bytes)\n"; - td::TerminalIO::out() << "\t--max-file-size\tMaximal total size of a bag of files (bytes)\n"; - td::TerminalIO::out() - << "get-provider-info [--balances] [--contracts] [--json]\tPrint information about storage provider\n"; - td::TerminalIO::out() << "\t--contracts\tPrint list of storage contracts\n"; - td::TerminalIO::out() << "\t--balances\tPrint balances of the main contract and storage contracts\n"; - td::TerminalIO::out() << "\t--json\tOutput in json\n"; - td::TerminalIO::out() - << "set-provider-config [--max-contracts x] [--max-total-size x]\tSet configuration parameters\n"; - td::TerminalIO::out() << "\t--max-contracts\tMaximal number of storage contracts\n"; - td::TerminalIO::out() << "\t--max-total-size\tMaximal total size storage contracts (in bytes)\n"; - td::TerminalIO::out() << "withdraw

\tSend bounty from storage contract
to the main contract\n"; - td::TerminalIO::out() << "withdraw-all\tSend bounty from all storage contracts (where at least 1 TON is available) " - "to the main contract\n"; - td::TerminalIO::out() - << "send-coins
[--message msg]\tSend nanoTON to
from the main contract\n"; - td::TerminalIO::out() - << "close-contract
\tClose storage contract
and delete bag (if possible)\n"; + td::Status execute_help(std::string category) { + if (category == "") { + td::TerminalIO::out() << "create [-d description] [--no-upload] [--copy] [--json] \tCreate bag of " + "files from \n"; + td::TerminalIO::out() << "\t-d\tDescription will be stored in torrent info\n"; + td::TerminalIO::out() << "\t--no-upload\tDon't share bag with peers\n"; + td::TerminalIO::out() << "\t--copy\tFiles will be copied to an internal directory of storage-daemon\n"; + td::TerminalIO::out() << "\t--json\tOutput in json\n"; + td::TerminalIO::out() << "add-by-hash [-d root_dir] [--paused] [--no-upload] [--json] [--partial file1 " + "file2 ...]\tAdd bag with given BagID (in hex)\n"; + td::TerminalIO::out() << "\t-d\tTarget directory, default is an internal directory of storage-daemon\n"; + td::TerminalIO::out() << "\t--paused\tDon't start download immediately\n"; + td::TerminalIO::out() << "\t--no-upload\tDon't share bag with peers\n"; + td::TerminalIO::out() + << "\t--partial\tEverything after this flag is a list of filenames. Only these files will be downloaded.\n"; + td::TerminalIO::out() << "\t--json\tOutput in json\n"; + td::TerminalIO::out() << "add-by-meta [-d root_dir] [--paused] [--no-upload] [--json] [--partial file1 " + "file2 ...]\tLoad meta from file and add bag\n"; + td::TerminalIO::out() << "\tFlags are the same as in add-by-hash\n"; + td::TerminalIO::out() << "list [--hashes] [--json]\tPrint list of bags\n"; + td::TerminalIO::out() << "\t--hashes\tPrint full BagID\n"; + td::TerminalIO::out() << "\t--json\tOutput in json\n"; + td::TerminalIO::out() << "get [--json]\tPrint information about \n"; + td::TerminalIO::out() << "\t--json\tOutput in json\n"; + td::TerminalIO::out() << "\tHere and below bags are identified by BagID (in hex) or index (see bag list)\n"; + td::TerminalIO::out() << "get-meta \tSave bag meta of to \n"; + td::TerminalIO::out() << "get-peers [--json]\tPrint a list of peers\n"; + td::TerminalIO::out() << "get-pieces-info [--files] [--offset l] [--max-pieces m] [--json]\tPrint " + "information about ready pieces\n"; + td::TerminalIO::out() << "\t--files\tShow piece ranges for each file\n"; + td::TerminalIO::out() << "\t--offset l\tShow pieces starting from l (deafault: 0)\n"; + td::TerminalIO::out() << "\t--max-pieces m\tShow no more than m pieces (deafault: unlimited)\n"; + td::TerminalIO::out() << "\t--json\tOutput in json\n"; + td::TerminalIO::out() << "download-pause \tPause download of \n"; + td::TerminalIO::out() << "download-resume \tResume download of \n"; + td::TerminalIO::out() << "upload-pause \tPause upload of \n"; + td::TerminalIO::out() << "upload-resume \tResume upload of \n"; + td::TerminalIO::out() << "priority-all

\tSet priority of all files in to

\n"; + td::TerminalIO::out() << "\tPriority is in [0..255], 0 - don't download\n"; + td::TerminalIO::out() << "priority-idx

\tSet priority of file # in to

\n"; + td::TerminalIO::out() << "\tPriority is in [0..255], 0 - don't download\n"; + td::TerminalIO::out() << "priority-name

\tSet priority of file in to

\n"; + td::TerminalIO::out() << "\tPriority is in [0..255], 0 - don't download\n"; + td::TerminalIO::out() << "remove [--remove-files]\tRemove \n"; + td::TerminalIO::out() << "\t--remove-files - also remove all files\n"; + td::TerminalIO::out() << "load-from [--meta meta] [--files path]\tProvide meta and data for an existing " + "incomplete bag.\n"; + td::TerminalIO::out() << "\t--meta meta\ttorrent info and header will be inited (if not ready) from meta file\n"; + td::TerminalIO::out() << "\t--files path\tdata for files will be taken from here\n"; + td::TerminalIO::out() << "get-speed-limits [--json]\tShow global limits for download and upload speed\n"; + td::TerminalIO::out() << "\t--json\tOutput in json\n"; + td::TerminalIO::out() + << "set-speed-limits [--download x] [--upload x]\tSet global limits for download and upload speed\n"; + td::TerminalIO::out() << "\t--download x\tDownload speed limit in bytes/s, or \"unlimited\"\n"; + td::TerminalIO::out() << "\t--upload x\tUpload speed limit in bytes/s, or \"unlimited\"\n"; + td::TerminalIO::out() << "new-contract-message [--query-id id] --provider \tCreate " + "\"new contract message\" for storage provider. Saves message body to .\n"; + td::TerminalIO::out() << "\t\tAddress of storage provider account to take parameters from.\n"; + td::TerminalIO::out() << "new-contract-message [--query-id id] --rate --max-span " + "\tSame thing, but parameters are not fetched automatically.\n"; + td::TerminalIO::out() << "exit\tExit\n"; + td::TerminalIO::out() << "quit\tExit\n"; + td::TerminalIO::out() << "setverbosity \tSet vetbosity to in [0..10]\n"; + td::TerminalIO::out() << "help\tPrint this help\n"; + td::TerminalIO::out() << "help provider\tcommands for deploying and controling storage provider\n"; + } else if (category == "provider") { + td::TerminalIO::out() << "\nStorage provider control:\n"; + td::TerminalIO::out() << "import-pk \tImport private key from \n"; + td::TerminalIO::out() << "deploy-provider\tInit storage provider by deploying a new provider smart contract\n"; + td::TerminalIO::out() + << "init-provider \tInit storage provider using the existing provider smart contract\n"; + td::TerminalIO::out() << "remove-storage-provider\tRemove storage provider\n"; + td::TerminalIO::out() + << "\tSmart contracts in blockchain and bags will remain intact, but they will not be managed anymore\n"; + td::TerminalIO::out() << "get-provider-params [address] [--json]\tPrint parameters of the smart contract\n"; + td::TerminalIO::out() + << "\taddress\tAddress of a smart contract. Default is the provider managed by this daemon.\n"; + td::TerminalIO::out() << "\t--json\tOutput in json\n"; + td::TerminalIO::out() << "set-provider-params [--accept x] [--rate x] [--max-span x] [--min-file-size x] " + "[--max-file-size x]\tSet parameters of the smart contract\n"; + td::TerminalIO::out() << "\t--accept\tAccept new contracts: 0 (no) or 1 (yes)\n"; + td::TerminalIO::out() << "\t--rate\tPrice of storage, nanoTON per MB*day\n"; + td::TerminalIO::out() << "\t--max-span\n"; + td::TerminalIO::out() << "\t--min-file-size\tMinimal total size of a bag of files (bytes)\n"; + td::TerminalIO::out() << "\t--max-file-size\tMaximal total size of a bag of files (bytes)\n"; + td::TerminalIO::out() + << "get-provider-info [--balances] [--contracts] [--json]\tPrint information about storage provider\n"; + td::TerminalIO::out() << "\t--contracts\tPrint list of storage contracts\n"; + td::TerminalIO::out() << "\t--balances\tPrint balances of the main contract and storage contracts\n"; + td::TerminalIO::out() << "\t--json\tOutput in json\n"; + td::TerminalIO::out() + << "set-provider-config [--max-contracts x] [--max-total-size x]\tSet configuration parameters\n"; + td::TerminalIO::out() << "\t--max-contracts\tMaximal number of storage contracts\n"; + td::TerminalIO::out() << "\t--max-total-size\tMaximal total size storage contracts (in bytes)\n"; + td::TerminalIO::out() << "withdraw

\tSend bounty from storage contract
to the main contract\n"; + td::TerminalIO::out() + << "withdraw-all\tSend bounty from all storage contracts (where at least 1 TON is available) " + "to the main contract\n"; + td::TerminalIO::out() << "send-coins
[--message msg]\tSend nanoTON to
from " + "the main contract\n"; + td::TerminalIO::out() + << "close-contract
\tClose storage contract
and delete bag (if possible)\n"; + } else { + td::TerminalIO::out() << "Unknown command 'help " << category << "'\n"; + } command_finished(td::Status::OK()); return td::Status::OK(); } @@ -871,7 +997,7 @@ class StorageDaemonCli : public td::actor::Actor { td::Status execute_create(std::string path, std::string description, bool upload, bool copy, bool json) { TRY_RESULT_PREFIX_ASSIGN(path, td::realpath(path), "Invalid path: "); - auto query = create_tl_object(path, description, upload, copy); + auto query = create_tl_object(path, description, upload, copy, 0); send_query(std::move(query), [=, SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { @@ -904,7 +1030,7 @@ class StorageDaemonCli : public td::actor::Actor { } } auto query = create_tl_object(hash, std::move(root_dir), !paused, upload, - std::move(priorities)); + std::move(priorities), 0); send_query(std::move(query), [=, SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { @@ -938,7 +1064,7 @@ class StorageDaemonCli : public td::actor::Actor { } } auto query = create_tl_object(std::move(meta), std::move(root_dir), !paused, - upload, std::move(priorities)); + upload, std::move(priorities), 0); send_query(std::move(query), [=, SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { @@ -957,7 +1083,7 @@ class StorageDaemonCli : public td::actor::Actor { } td::Status execute_list(bool with_hashes, bool json) { - auto query = create_tl_object(); + auto query = create_tl_object(0); send_query(std::move(query), [=, SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { @@ -975,7 +1101,7 @@ class StorageDaemonCli : public td::actor::Actor { } td::Status execute_get(td::Bits256 hash, bool json) { - auto query = create_tl_object(hash); + auto query = create_tl_object(hash, 0); send_query(std::move(query), [=, SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { @@ -993,7 +1119,7 @@ class StorageDaemonCli : public td::actor::Actor { } td::Status execute_get_meta(td::Bits256 hash, std::string meta_file) { - auto query = create_tl_object(hash); + auto query = create_tl_object(hash, 0); send_query(std::move(query), [SelfId = actor_id(this), meta_file](td::Result> R) { if (R.is_error()) { @@ -1014,7 +1140,7 @@ class StorageDaemonCli : public td::actor::Actor { } td::Status execute_get_peers(td::Bits256 hash, bool json) { - auto query = create_tl_object(hash); + auto query = create_tl_object(hash, 0); send_query( std::move(query), [=, SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { @@ -1054,6 +1180,63 @@ class StorageDaemonCli : public td::actor::Actor { return td::Status::OK(); } + td::Status execute_get_pieces_info(td::Bits256 hash, bool files, td::uint64 offset, + td::optional max_pieces, bool json) { + auto query = create_tl_object(hash, files ? 1 : 0, offset, + max_pieces ? max_pieces.value() : -1); + send_query(std::move(query), + [=, SelfId = actor_id(this)](td::Result> R) { + if (R.is_error()) { + return; + } + if (json) { + print_json(R.ok()); + td::actor::send_closure(SelfId, &StorageDaemonCli::command_finished, td::Status::OK()); + return; + } + auto obj = R.move_as_ok(); + td::TerminalIO::out() << "BagID " << hash.to_hex() << "\n"; + td::TerminalIO::out() << "Total pieces: " << obj->total_pieces_ << ", piece size: " << obj->piece_size_ + << "\n"; + if (files) { + if (obj->flags_ & 1) { + td::TerminalIO::out() << "Files:\n"; + std::vector> table; + table.push_back({"#####", "Piece range", "Name"}); + size_t i = 0; + for (const auto& f : obj->files_) { + table.push_back({i == 0 ? "" : td::to_string(i - 1), + PSTRING() << "[" << f->range_l_ << ".." << f->range_r_ << ")", + f->name_.empty() ? "[HEADER]" : f->name_}); + ++i; + } + print_table(table, {1, 2}); + } else { + td::TerminalIO::out() << "Cannot show files: torrent header is not available\n"; + } + } + td::uint64 l = obj->range_l_, r = obj->range_r_; + td::TerminalIO::out() << "Pieces [" << l << ".." << r << ")\n"; + if (obj->range_l_ != obj->range_r_) { + std::vector> table; + td::uint64 i = l; + while (i < r) { + td::uint64 ir = std::min(i + 100, r); + std::string s = "["; + for (td::uint64 j = i; j < ir; ++j) { + s += (obj->piece_ready_bitset_[(j - l) / 8] & (1 << ((j - l) % 8)) ? '#' : '-'); + } + s += ']'; + table.push_back({std::to_string(i), s}); + i = ir; + } + print_table(table, {1}); + } + td::actor::send_closure(SelfId, &StorageDaemonCli::command_finished, td::Status::OK()); + }); + return td::Status::OK(); + } + td::Status execute_set_active_download(td::Bits256 hash, bool active) { auto query = create_tl_object(hash, active); send_query(std::move(query), @@ -1156,7 +1339,7 @@ class StorageDaemonCli : public td::actor::Actor { if (!path.empty()) { TRY_RESULT_PREFIX_ASSIGN(path, td::realpath(path), "Invalid path: "); } - auto query = create_tl_object(hash, std::move(meta_data), std::move(path)); + auto query = create_tl_object(hash, std::move(meta_data), std::move(path), 0); send_query(std::move(query), [SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { @@ -1183,6 +1366,59 @@ class StorageDaemonCli : public td::actor::Actor { return td::Status::OK(); } + td::Status execute_get_speed_limits(bool json) { + auto query = create_tl_object(0); + send_query(std::move(query), [=, SelfId = actor_id(this)]( + td::Result> R) { + if (R.is_error()) { + return; + } + if (json) { + print_json(R.ok()); + td::actor::send_closure(SelfId, &StorageDaemonCli::command_finished, td::Status::OK()); + return; + } + auto obj = R.move_as_ok(); + if (obj->download_ < 0.0) { + td::TerminalIO::out() << "Download speed limit: unlimited\n"; + } else { + td::TerminalIO::out() << "Download speed limit: " << td::format::as_size((td::uint64)obj->download_) << "/s\n"; + } + if (obj->upload_ < 0.0) { + td::TerminalIO::out() << "Upload speed limit: unlimited\n"; + } else { + td::TerminalIO::out() << "Upload speed limit: " << td::format::as_size((td::uint64)obj->upload_) << "/s\n"; + } + td::actor::send_closure(SelfId, &StorageDaemonCli::command_finished, td::Status::OK()); + }); + return td::Status::OK(); + } + + td::Status execute_set_speed_limits(td::optional download, td::optional upload) { + if (!download && !upload) { + return td::Status::Error("No parameters are set"); + } + auto query = create_tl_object(); + query->flags_ = 0; + if (download) { + query->flags_ |= 1; + query->download_ = download.value(); + } + if (upload) { + query->flags_ |= 2; + query->upload_ = upload.value(); + } + send_query(std::move(query), + [SelfId = actor_id(this)](td::Result> R) { + if (R.is_error()) { + return; + } + td::TerminalIO::out() << "Speed limits were set\n"; + td::actor::send_closure(SelfId, &StorageDaemonCli::command_finished, td::Status::OK()); + }); + return td::Status::OK(); + } + td::Status execute_new_contract_message(td::Bits256 hash, std::string file, td::uint64 query_id, td::optional provider_address, td::optional rate, td::optional max_span) { @@ -1282,25 +1518,26 @@ class StorageDaemonCli : public td::actor::Actor { td::Status execute_get_provider_params(std::string address, bool json) { auto query = create_tl_object(address); - send_query(std::move(query), - [=, SelfId = actor_id(this)](td::Result> R) { - if (R.is_error()) { - return; - } - if (json) { - print_json(R.ok()); - td::actor::send_closure(SelfId, &StorageDaemonCli::command_finished, td::Status::OK()); - return; - } - auto params = R.move_as_ok(); - td::TerminalIO::out() << "Storage provider parameters:\n"; - td::TerminalIO::out() << "Accept new contracts: " << params->accept_new_contracts_ << "\n"; - td::TerminalIO::out() << "Rate (nanoTON per day*MB): " << params->rate_per_mb_day_ << "\n"; - td::TerminalIO::out() << "Max span: " << (td::uint32)params->max_span_ << "\n"; - td::TerminalIO::out() << "Min file size: " << (td::uint64)params->minimal_file_size_ << "\n"; - td::TerminalIO::out() << "Max file size: " << (td::uint64)params->maximal_file_size_ << "\n"; - td::actor::send_closure(SelfId, &StorageDaemonCli::command_finished, td::Status::OK()); - }); + send_query(std::move(query), [=, SelfId = actor_id(this)]( + td::Result> R) { + if (R.is_error()) { + return; + } + if (json) { + print_json(R.ok()); + td::actor::send_closure(SelfId, &StorageDaemonCli::command_finished, td::Status::OK()); + return; + } + auto params = R.move_as_ok(); + td::TerminalIO::out() << "Storage provider parameters:\n"; + td::TerminalIO::out() << "Accept new contracts: " << params->accept_new_contracts_ << "\n"; + td::TerminalIO::out() << "Rate (nanoTON per day*MB): " << params->rate_per_mb_day_ << "\n"; + td::TerminalIO::out() << "Max span: " << (td::uint32)params->max_span_ << "\n"; + auto min_size = (td::uint64)params->minimal_file_size_, max_size = (td::uint64)params->maximal_file_size_; + td::TerminalIO::out() << "Min file size: " << td::format::as_size(min_size) << " (" << min_size << ")\n"; + td::TerminalIO::out() << "Max file size: " << td::format::as_size(max_size) << " (" << max_size << ")\n"; + td::actor::send_closure(SelfId, &StorageDaemonCli::command_finished, td::Status::OK()); + }); return td::Status::OK(); } @@ -1632,6 +1869,7 @@ class StorageDaemonCli : public td::actor::Actor { add_id(obj.torrent_->hash_); td::TerminalIO::out() << "BagID = " << obj.torrent_->hash_.to_hex() << "\n"; td::TerminalIO::out() << "Index = " << hash_to_id_[obj.torrent_->hash_] << "\n"; + td::TerminalIO::out() << "Added: " << time_to_str(obj.torrent_->added_at_) << "\n"; if (obj.torrent_->flags_ & 4) { // fatal error td::TerminalIO::out() << "FATAL ERROR: " << obj.torrent_->fatal_error_ << "\n"; } diff --git a/storage/storage-daemon/storage-daemon.cpp b/storage/storage-daemon/storage-daemon.cpp index 7563332a..372829f4 100644 --- a/storage/storage-daemon/storage-daemon.cpp +++ b/storage/storage-daemon/storage-daemon.cpp @@ -180,6 +180,9 @@ class StorageDaemon : public td::actor::Actor { dht_id_ = dht_id_full.compute_short_id(); td::actor::send_closure(adnl_, &adnl::Adnl::add_id, dht_id_full, addr_list, static_cast(0)); + LOG(INFO) << "Storage daemon ADNL id is " << local_id_; + LOG(INFO) << "DHT id is " << dht_id_; + if (client_mode_) { auto D = dht::Dht::create_client(dht_id_, db_root_, dht_config_, keyring_.get(), adnl_.get()); D.ensure(); @@ -431,6 +434,52 @@ class StorageDaemon : public td::actor::Actor { })); } + void run_control_query(ton_api::storage_daemon_getTorrentPiecesInfo &query, td::Promise promise) { + td::Bits256 hash = query.hash_; + td::actor::send_closure( + manager_, &StorageManager::with_torrent, hash, + promise.wrap([query = std::move(query)](NodeActor::NodeState state) -> td::Result { + Torrent &torrent = state.torrent; + if (!torrent.inited_info()) { + return td::Status::Error("Torrent info is not available"); + } + td::uint64 total_pieces = torrent.get_info().pieces_count(); + td::uint64 range_l = std::min(total_pieces, query.offset_); + td::uint64 size = query.max_pieces_ != -1 ? std::min(total_pieces - range_l, query.max_pieces_) + : total_pieces - range_l; + td::BufferSlice piece_ready((size + 7) / 8); + std::fill(piece_ready.data(), piece_ready.data() + piece_ready.size(), 0); + for (td::uint64 i = range_l; i < range_l + size; ++i) { + if (torrent.is_piece_ready(i)) { + piece_ready.data()[(i - range_l) / 8] |= (1 << ((i - range_l) % 8)); + } + } + + auto result = create_tl_object(); + result->total_pieces_ = total_pieces; + result->piece_size_ = torrent.get_info().piece_size; + result->range_l_ = range_l; + result->range_r_ = range_l + size; + result->piece_ready_bitset_ = std::move(piece_ready); + + if ((query.flags_ & 1) && torrent.inited_header()) { + result->flags_ = 1; + auto range = torrent.get_header_parts_range(); + result->files_.push_back( + create_tl_object("", range.begin, range.end)); + for (size_t i = 0; i < torrent.get_files_count().value(); ++i) { + auto range = torrent.get_file_parts_range(i); + result->files_.push_back(create_tl_object( + torrent.get_file_name(i).str(), range.begin, range.end)); + } + } else { + result->flags_ = 0; + } + + return serialize_tl_object(result, true); + })); + } + void run_control_query(ton_api::storage_daemon_setFilePriorityAll &query, td::Promise promise) { TRY_RESULT_PROMISE(promise, priority, td::narrow_cast_safe(query.priority_)); td::actor::send_closure(manager_, &StorageManager::set_all_files_priority, query.hash_, priority, @@ -491,6 +540,24 @@ class StorageDaemon : public td::actor::Actor { }); } + void run_control_query(ton_api::storage_daemon_getSpeedLimits &query, td::Promise promise) { + td::actor::send_closure(manager_, &StorageManager::get_speed_limits, + promise.wrap([](std::pair limits) -> td::BufferSlice { + return create_serialize_tl_object(limits.first, + limits.second); + })); + } + + void run_control_query(ton_api::storage_daemon_setSpeedLimits &query, td::Promise promise) { + if (query.flags_ & 1) { + td::actor::send_closure(manager_, &StorageManager::set_download_speed_limit, query.download_); + } + if (query.flags_ & 2) { + td::actor::send_closure(manager_, &StorageManager::set_upload_speed_limit, query.upload_); + } + promise.set_result(create_serialize_tl_object()); + } + void run_control_query(ton_api::storage_daemon_getNewContractMessage &query, td::Promise promise) { td::Promise> P = [promise = std::move(promise), hash = query.hash_, query_id = query.query_id_, @@ -779,6 +846,7 @@ class StorageDaemon : public td::actor::Actor { file->name_ = torrent.get_file_name(i).str(); file->size_ = torrent.get_file_size(i); file->downloaded_size_ = torrent.get_file_ready_size(i); + file->flags_ = 0; obj.files_.push_back(std::move(file)); } } @@ -798,6 +866,7 @@ class StorageDaemon : public td::actor::Actor { obj->active_upload_ = state.active_upload; obj->download_speed_ = state.download_speed; obj->upload_speed_ = state.upload_speed; + obj->added_at_ = state.added_at; promise.set_result(std::move(obj)); }); } @@ -816,6 +885,7 @@ class StorageDaemon : public td::actor::Actor { obj->torrent_->active_upload_ = state.active_upload; obj->torrent_->download_speed_ = state.download_speed; obj->torrent_->upload_speed_ = state.upload_speed; + obj->torrent_->added_at_ = state.added_at; for (size_t i = 0; i < obj->files_.size(); ++i) { obj->files_[i]->priority_ = (i < state.file_priority.size() ? state.file_priority[i] : 1); diff --git a/storage/test/storage.cpp b/storage/test/storage.cpp index b4f67b9b..e7a97352 100644 --- a/storage/test/storage.cpp +++ b/storage/test/storage.cpp @@ -1342,7 +1342,7 @@ TEST(Torrent, Peer) { guard->push_back(td::actor::create_actor( "Node#1", 1, std::move(torrent), td::make_unique(stop_watcher, complete_watcher), - td::make_unique(peer_manager.get(), 1, gen_peers(1, 2)), nullptr)); + td::make_unique(peer_manager.get(), 1, gen_peers(1, 2)), nullptr, ton::SpeedLimiters{})); for (size_t i = 2; i <= peers_n; i++) { ton::Torrent::Options options; options.in_memory = true; @@ -1351,7 +1351,7 @@ TEST(Torrent, Peer) { PSLICE() << "Node#" << i, i, std::move(other_torrent), td::make_unique(stop_watcher, complete_watcher), td::make_unique(peer_manager.get(), i, gen_peers(i, 2)), - nullptr); + nullptr, ton::SpeedLimiters{}); if (i == 3) { td::actor::create_actor("StatsActor", node_actor.get()).release(); diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index e9b41182..66e6f4a6 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -764,9 +764,12 @@ storage.db.key.torrentMeta hash:int256 = storage.db.key.TorrentMeta; storage.db.key.priorities hash:int256 = storage.db.key.Priorities; storage.db.key.piecesInDb hash:int256 = storage.db.key.PiecesInDb; storage.db.key.pieceInDb hash:int256 idx:long = storage.db.key.PieceInDb; +storage.db.key.config = storage.db.key.Config; +storage.db.config flags:# download_speed_limit:double upload_speed_limit:double = storage.db.Config; storage.db.torrentList torrents:(vector int256) = storage.db.TorrentList; storage.db.torrent root_dir:string active_download:Bool active_upload:Bool = storage.db.TorrentShort; +storage.db.torrentV2 flags:# root_dir:string added_at:int active_download:Bool active_upload:Bool = storage.db.TorrentShort; storage.db.priorities actions:(vector storage.PriorityAction) = storage.db.Priorities; storage.db.piecesInDb pieces:(vector long) = storage.db.PiecesInDb; @@ -794,6 +797,7 @@ storage.provider.db.microchunkTree data:bytes = storage.provider.db.MicrochunkTr storage.daemon.queryError message:string = storage.daemon.QueryError; storage.daemon.success = storage.daemon.Success; + storage.daemon.torrent hash:int256 flags:# // 0 - info ready @@ -802,19 +806,29 @@ storage.daemon.torrent total_size:flags.0?long description:flags.0?string files_count:flags.1?long included_size:flags.1?long dir_name:flags.1?string downloaded_size:long - root_dir:string active_download:Bool active_upload:Bool completed:Bool + added_at:int root_dir:string active_download:Bool active_upload:Bool completed:Bool download_speed:double upload_speed:double fatal_error:flags.2?string = storage.daemon.Torrent; + storage.daemon.fileInfo - name:string size:long + name:string size:long flags:# priority:int downloaded_size:long = storage.daemon.FileInfo; + storage.daemon.torrentFull torrent:storage.daemon.torrent files:(vector storage.daemon.fileInfo) = storage.daemon.TorrentFull; storage.daemon.torrentList torrents:(vector storage.daemon.torrent) = storage.daemon.TorrentList; storage.daemon.torrentMeta meta:bytes = storage.daemon.TorrentMeta; +storage.daemon.filePiecesInfo name:string range_l:long range_r:long = storage.daemon.FilePiecesInfo; +storage.daemon.torrentPiecesInfo + flags:# // 0 - with file ranges + total_pieces:long piece_size:int + range_l:long range_r:long piece_ready_bitset:bytes + files:flags.0?(vector storage.daemon.filePiecesInfo) // files[0] is header + = storage.daemon.TorrentPiecesInfo; + storage.daemon.newContractParams rate:string max_span:int = storage.daemon.NewContractParams; storage.daemon.newContractParamsAuto provider_address:string = storage.daemon.NewContractParams; storage.daemon.newContractMessage body:bytes rate:string max_span:int = storage.daemon.NewContractMessage; @@ -827,6 +841,8 @@ storage.daemon.priorityPending = storage.daemon.SetPriorityStatus; storage.daemon.keyHash key_hash:int256 = storage.daemon.KeyHash; +storage.daemon.speedLimits download:double upload:double = storage.daemon.SpeedLimits; + storage.daemon.providerConfig max_contracts:int max_total_size:long = storage.daemon.ProviderConfig; storage.daemon.contractInfo address:string state:int torrent:int256 created_time:int file_size:long downloaded_size:long rate:string max_span:int client_balance:string contract_balance:string = storage.daemon.ContractInfo; @@ -837,24 +853,32 @@ storage.daemon.providerAddress address:string = storage.daemon.ProviderAddress; ---functions--- storage.daemon.setVerbosity verbosity:int = storage.daemon.Success; -storage.daemon.createTorrent path:string description:string allow_upload:Bool copy_inside:Bool = storage.daemon.TorrentFull; -storage.daemon.addByHash hash:int256 root_dir:string start_download:Bool allow_upload:Bool priorities:(vector storage.PriorityAction) = storage.daemon.TorrentFull; -storage.daemon.addByMeta meta:bytes root_dir:string start_download:Bool allow_upload:Bool priorities:(vector storage.PriorityAction) = storage.daemon.TorrentFull; +storage.daemon.createTorrent path:string description:string allow_upload:Bool copy_inside:Bool flags:# = storage.daemon.TorrentFull; +storage.daemon.addByHash hash:int256 root_dir:string start_download:Bool allow_upload:Bool priorities:(vector storage.PriorityAction) flags:# = storage.daemon.TorrentFull; +storage.daemon.addByMeta meta:bytes root_dir:string start_download:Bool allow_upload:Bool priorities:(vector storage.PriorityAction) flags:# = storage.daemon.TorrentFull; storage.daemon.setActiveDownload hash:int256 active:Bool = storage.daemon.Success; storage.daemon.setActiveUpload hash:int256 active:Bool = storage.daemon.Success; -storage.daemon.getTorrents = storage.daemon.TorrentList; -storage.daemon.getTorrentFull hash:int256 = storage.daemon.TorrentFull; -storage.daemon.getTorrentMeta hash:int256 = storage.daemon.TorrentMeta; +storage.daemon.getTorrents flags:# = storage.daemon.TorrentList; +storage.daemon.getTorrentFull hash:int256 flags:# = storage.daemon.TorrentFull; +storage.daemon.getTorrentMeta hash:int256 flags:# = storage.daemon.TorrentMeta; storage.daemon.getNewContractMessage hash:int256 query_id:long params:storage.daemon.NewContractParams = storage.daemon.NewContractMessage; -storage.daemon.getTorrentPeers hash:int256 = storage.daemon.PeerList; +storage.daemon.getTorrentPeers hash:int256 flags:# = storage.daemon.PeerList; +storage.daemon.getTorrentPiecesInfo hash:int256 + flags:# // 0 - with file ranges + offset:long max_pieces:long + = storage.daemon.TorrentPiecesInfo; storage.daemon.setFilePriorityAll hash:int256 priority:int = storage.daemon.SetPriorityStatus; storage.daemon.setFilePriorityByIdx hash:int256 idx:long priority:int = storage.daemon.SetPriorityStatus; storage.daemon.setFilePriorityByName hash:int256 name:string priority:int = storage.daemon.SetPriorityStatus; storage.daemon.removeTorrent hash:int256 remove_files:Bool = storage.daemon.Success; -storage.daemon.loadFrom hash:int256 meta:bytes path:string = storage.daemon.Torrent; +storage.daemon.loadFrom hash:int256 meta:bytes path:string flags:# = storage.daemon.Torrent; + +storage.daemon.getSpeedLimits flags:# = storage.daemon.SpeedLimits; +storage.daemon.setSpeedLimits flags:# download:flags.0?double upload:flags.1?double = storage.daemon.Success; + storage.daemon.importPrivateKey key:PrivateKey = storage.daemon.KeyHash; storage.daemon.initProvider account_address:string = storage.daemon.Success; diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo index 702fbd6a0c352460d7c5afd18bac916f9a57acda..8ce7a57915b72831287ea6490d8590216472546b 100644 GIT binary patch delta 1608 zcmZvce`p(J7{~9;?wX`Owlw{9OD^`>rZ!#LB(1B6I0y+rPQlTZF7>+}3*yj5&fQTq_zua* zq?Ay?GjdYN1lbnz-o@|RD(pw4lloz{H{F1qR^JRfbguBiTc5I4^3-qE>>&n@1v|iY z(FO-rdAK#|hUKd~1jg#%`PDu9h?jxr>pSWvYjK&rjSEE)9};a=t_3cKIW_%-6`2K5l*EXDukA!Gbt&J0 zXyOd-eHkh&==Fe(5vP5Khl%xxqJ90$kY3l?H%!I_&$X7&P5Fw2dSqC0GEOrD7cA!8 zR!gx+3S1v|!PWdIiFu$pW{rj@J&xE&MwLj&o;tb7-i1zQkDHO)MPHg3c^}{OAUCSh zg`(Y!C;HJD*cx@?MIU16C)ywV0B`jZ5A525HcA#udr_SBezf!wR=miA8~c$JzwSjZ zz>PeuH5#v8Y7ziOAUZD2z{JUMI!b}jMKsMlQ`r<9!PI` zin?YCnf^U^I7LhjyzWCgEA7BXOYP8e!_dDzuPlvbJJWB-oyIPjPc?Z%hA*qGlb&I;P1a)@m3?%+T%8)+@C9IN#IsvqKJ?a fTj1=Pt=SkWdLwBsK+uJq_(?CaWna7-#qIwCIdMMe delta 532 zcmeBK!rCy2mG{wXeJchi__dMuvOLQaC!fg4Pvi|Y2Pil&O+KMy!s5nvE@hJPjmZ)! z6`K#Ja4@pWGS^)@`HZT>eogC<;K3zedamDlwBSs09H;b)HrgIp8WTrD(GOAD3IIl9j!ho>_EIr-A zkTHiPi-&i@^c9AT8jLK{FB&pV23fLwf)S$t6N}4|cXuLLqmOfJY2nGTX* z;a@3e1rnEsm{>c+g z3b2Om-_|MxQBus{Bq8Gu^5FCf=8SJx&J?wEO+R44hzLsN=?^U!lP7OD&H)a