1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-02-13 11:42:18 +00:00
ton/storage/storage-daemon/StorageManager.cpp
SpyCheese bb21f732fd
Recent updates in storage (#667)
* Fix error handling in Torrent.cpp, improve choosing peers for upload

* Various improvements in storage daemon

"get-pieces-info"
Store "added at"
Improve calculating up/down speed
Improve TL protocol for future compatibility
Remove empty directories on "--remove-files"
Better windows support
Debug logs in PeerActor
More restrictions on TorrentInfo
Bugfixes

* Global speed limits for download and upload

+bugfix

* Reset download/upload speed on changing settings or completion

* Exclude some system files in TorrentCreator
2023-04-07 15:50:07 +03:00

391 lines
No EOL
17 KiB
C++

/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
*/
#include "StorageManager.h"
#include "td/utils/filesystem.h"
#include "td/utils/port/path.h"
#include "td/db/RocksDb.h"
#include "td/actor/MultiPromise.h"
namespace ton {
// Builds the full overlay id for a torrent from the raw bytes of its hash.
static overlay::OverlayIdFull get_overlay_id(td::Bits256 hash) {
  td::BufferSlice overlay_name(hash.as_slice());
  return overlay::OverlayIdFull(std::move(overlay_name));
}
// Stores the identity, database root, readiness callback, mode flag and network actor ids.
// The constructor body is empty; directories and the database are set up later in start_up().
StorageManager::StorageManager(adnl::AdnlNodeIdShort local_id, std::string db_root, td::unique_ptr<Callback> callback,
                               bool client_mode, td::actor::ActorId<adnl::Adnl> adnl,
                               td::actor::ActorId<ton_rldp::Rldp> rldp, td::actor::ActorId<overlay::Overlays> overlays)
    : local_id_(local_id),
      db_root_(std::move(db_root)),
      callback_(std::move(callback)),
      client_mode_(client_mode),
      adnl_(std::move(adnl)),
      rldp_(std::move(rldp)),
      overlays_(std::move(overlays)) {
}
// Prepares the on-disk layout under db_root_ ("torrent-db" and "torrent-files"), opens the
// RocksDb-backed database and asynchronously requests the stored config.
// Directory creation, path resolution and opening the database are all treated as fatal on failure
// (ensure() / move_as_ok()).
void StorageManager::start_up() {
  CHECK(db_root_ != "");
  td::mkdir(db_root_).ensure();
  // Resolve the root only after it exists, then build the sub-directories from the resolved path.
  db_root_ = td::realpath(db_root_).move_as_ok();
  td::mkdir(db_root_ + "/torrent-db").ensure();
  td::mkdir(db_root_ + "/torrent-files").ensure();
  LOG(INFO) << "Starting Storage manager. DB = " << db_root_;

  auto rocksdb = std::make_shared<td::RocksDb>(td::RocksDb::open(db_root_ + "/torrent-db").move_as_ok());
  db_ = std::make_shared<db::DbType>(std::move(rocksdb));

  db::db_get<ton_api::storage_db_config>(
      *db_, create_hash_tl_object<ton_api::storage_db_key_config>(), true,
      [SelfId = actor_id(this)](td::Result<tl_object_ptr<ton_api::storage_db_config>> R) {
        if (R.is_error()) {
          // A failed read is logged and startup continues with no stored config.
          LOG(ERROR) << "Failed to load config from db: " << R.move_as_error();
          td::actor::send_closure(SelfId, &StorageManager::loaded_config_from_db, nullptr);
          return;
        }
        td::actor::send_closure(SelfId, &StorageManager::loaded_config_from_db, R.move_as_ok());
      });
}
// Second startup step: applies the stored speed limits (when a config object was loaded) and then
// requests the stored list of torrent hashes.
// `config` may be null, in which case the current limits are left untouched.
void StorageManager::loaded_config_from_db(tl_object_ptr<ton_api::storage_db_config> config) {
  if (config) {
    LOG(INFO) << "Loaded config from DB. Speed limits: download=" << config->download_speed_limit_
              << ", upload=" << config->upload_speed_limit_;
    download_speed_limit_ = config->download_speed_limit_;
    upload_speed_limit_ = config->upload_speed_limit_;
    td::actor::send_closure(download_speed_limiter_, &SpeedLimiter::set_max_speed, download_speed_limit_);
    td::actor::send_closure(upload_speed_limiter_, &SpeedLimiter::set_max_speed, upload_speed_limit_);
  }

  auto on_list_loaded = [SelfId = actor_id(this)](td::Result<tl_object_ptr<ton_api::storage_db_torrentList>> R) {
    // Both a read error and a null list result in an empty hash list being forwarded.
    std::vector<td::Bits256> hashes;
    if (!R.is_error()) {
      auto stored_list = R.move_as_ok();
      if (stored_list != nullptr) {
        hashes = std::move(stored_list->torrents_);
      }
    } else {
      LOG(ERROR) << "Failed to load torrent list from db: " << R.move_as_error();
    }
    td::actor::send_closure(SelfId, &StorageManager::load_torrents_from_db, std::move(hashes));
  };
  db::db_get<ton_api::storage_db_torrentList>(*db_, create_hash_tl_object<ton_api::storage_db_key_torrentList>(), true,
                                              std::move(on_list_loaded));
}
// Third startup step: for every stored hash, creates the torrent entry with its PeerManager and
// starts loading the corresponding NodeActor from the database.
//
// `torrents` is the list of hashes read from the database; a duplicate hash is a fatal error (CHECK).
// Each load reports to loaded_torrent_from_db() and then resolves one promise taken from the
// MultiPromise guard; after_load_torrents_from_db() is sent from the promise attached to that guard
// (presumably once all per-torrent promises are resolved and the guard is released - see td::MultiPromise).
void StorageManager::load_torrents_from_db(std::vector<td::Bits256> torrents) {
  td::MultiPromise mp;
  auto ig = mp.init_guard();
  ig.add_promise([SelfId = actor_id(this)](td::Result<td::Unit> R) {
    td::actor::send_closure(SelfId, &StorageManager::after_load_torrents_from_db);
  });
  for (auto hash : torrents) {
    CHECK(!torrents_.count(hash));
    auto& entry = torrents_[hash];
    // Record the hash in the entry, exactly as add_torrent_impl() does for newly added torrents;
    // otherwise entries restored from the database would carry an unset hash field.
    entry.hash = hash;
    entry.peer_manager = td::actor::create_actor<PeerManager>("PeerManager", local_id_, get_overlay_id(hash),
                                                              client_mode_, overlays_, adnl_, rldp_);
    NodeActor::load_from_db(
        db_, hash, create_callback(hash, entry.closing_state), PeerManager::create_callback(entry.peer_manager.get()),
        SpeedLimiters{download_speed_limiter_.get(), upload_speed_limiter_.get()},
        [SelfId = actor_id(this), hash,
         promise = ig.get_promise()](td::Result<td::actor::ActorOwn<NodeActor>> R) mutable {
          // Deliver the result first, then mark this torrent as processed.
          td::actor::send_closure(SelfId, &StorageManager::loaded_torrent_from_db, hash, std::move(R));
          promise.set_result(td::Unit());
        });
  }
}
// Receives the outcome of loading one torrent from the database.
// On failure the pre-created entry is dropped; on success the loaded actor is attached to it.
void StorageManager::loaded_torrent_from_db(td::Bits256 hash, td::Result<td::actor::ActorOwn<NodeActor>> R) {
  if (R.is_error()) {
    LOG(ERROR) << "Failed to load torrent " << hash.to_hex() << " from db: " << R.move_as_error();
    torrents_.erase(hash);
    return;
  }
  // The entry was created in load_torrents_from_db() before the load was started.
  auto it = torrents_.find(hash);
  CHECK(it != torrents_.end());
  it->second.actor = R.move_as_ok();
  LOG(INFO) << "Loaded torrent " << hash.to_hex() << " from db";
}
// Final startup step: logs how many torrents are present, rewrites the stored torrent list
// (entries that failed to load have already been erased) and notifies the owner via on_ready().
void StorageManager::after_load_torrents_from_db() {
  const auto loaded_count = torrents_.size();
  LOG(INFO) << "Finished loading torrents from db (" << loaded_count << " torrents)";
  db_store_torrent_list();
  callback_->on_ready();
}
// Creates the NodeActor callback for the torrent `hash`.
// The callback ignores completion and, when the node is closed, forwards the torrent together with
// the shared `closing_state` to StorageManager::on_torrent_closed().
td::unique_ptr<NodeActor::Callback> StorageManager::create_callback(
    td::Bits256 hash, std::shared_ptr<TorrentEntry::ClosingState> closing_state) {
  class NodeCallback : public NodeActor::Callback {
   public:
    NodeCallback(td::actor::ActorId<StorageManager> manager, td::Bits256 torrent_hash,
                 std::shared_ptr<TorrentEntry::ClosingState> state)
        : id_(std::move(manager)), hash_(torrent_hash), closing_state_(std::move(state)) {
    }

    // Nothing to do on completion.
    void on_completed() override {
    }

    void on_closed(Torrent torrent) override {
      // The closed torrent must be the one this callback was created for.
      CHECK(torrent.get_hash() == hash_);
      td::actor::send_closure(id_, &StorageManager::on_torrent_closed, std::move(torrent), closing_state_);
    }

   private:
    td::actor::ActorId<StorageManager> id_;
    td::Bits256 hash_;
    std::shared_ptr<TorrentEntry::ClosingState> closing_state_;
  };

  return td::make_unique<NodeCallback>(actor_id(this), hash, std::move(closing_state));
}
// Registers `torrent`, persists the torrent list and optionally copies the torrent's files into
// "<db_root_>/torrent-files/<hash hex>".
//
// Without `copy_inside` the promise is resolved right after registration. With `copy_inside` the
// promise receives the result of NodeActor::copy_to_new_root_dir; if the copy fails, removal of
// the torrent (without removing files) is requested before the error is passed on.
void StorageManager::add_torrent(Torrent torrent, bool start_download, bool allow_upload, bool copy_inside,
                                 td::Promise<td::Unit> promise) {
  td::Bits256 hash = torrent.get_hash();
  TRY_STATUS_PROMISE(promise, add_torrent_impl(std::move(torrent), start_download, allow_upload));
  db_store_torrent_list();

  if (copy_inside) {
    TorrentEntry& entry = torrents_[hash];
    std::string new_dir = db_root_ + "/torrent-files/" + hash.to_hex();
    LOG(INFO) << "Copy torrent to " << new_dir;
    auto on_copied = [SelfId = actor_id(this), hash, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
      if (R.is_error()) {
        LOG(WARNING) << "Copy torrent: " << R.error();
        // The result of the removal itself is intentionally ignored.
        td::actor::send_closure(SelfId, &StorageManager::remove_torrent, hash, false, [](td::Result<td::Unit> R) {});
      }
      promise.set_result(std::move(R));
    };
    td::actor::send_closure(entry.actor, &NodeActor::copy_to_new_root_dir, std::move(new_dir), std::move(on_copied));
    return;
  }

  promise.set_result(td::Unit());
}
// Creates the entry, PeerManager and NodeActor for a new torrent.
// Returns an error status if a torrent with the same hash is already registered, OK otherwise.
td::Status StorageManager::add_torrent_impl(Torrent torrent, bool start_download, bool allow_upload) {
  td::Bits256 hash = torrent.get_hash();
  if (torrents_.find(hash) != torrents_.end()) {
    return td::Status::Error(PSTRING() << "Cannot add torrent: duplicate hash " << hash.to_hex());
  }

  TorrentEntry& entry = torrents_[hash];
  entry.hash = hash;
  entry.peer_manager = td::actor::create_actor<PeerManager>("PeerManager", local_id_, get_overlay_id(hash),
                                                            client_mode_, overlays_, adnl_, rldp_);
  auto context = PeerManager::create_callback(entry.peer_manager.get());

  // Log before `torrent` is moved into the actor below.
  LOG(INFO) << "Added torrent " << hash.to_hex() << " , root_dir = " << torrent.get_root_dir();

  entry.actor = td::actor::create_actor<NodeActor>(
      "Node", 1, std::move(torrent), create_callback(hash, entry.closing_state), std::move(context), db_,
      SpeedLimiters{download_speed_limiter_.get(), upload_speed_limiter_.get()}, start_download, allow_upload);
  return td::Status::OK();
}
// Opens a torrent from its meta and registers it (files are not copied).
// An empty `root_dir` selects the default "<db_root_>/torrent-files/<hash hex>".
void StorageManager::add_torrent_by_meta(TorrentMeta meta, std::string root_dir, bool start_download, bool allow_upload,
                                         td::Promise<td::Unit> promise) {
  td::Bits256 hash(meta.info.get_hash());
  Torrent::Options options;
  if (root_dir.empty()) {
    options.root_dir = db_root_ + "/torrent-files/" + hash.to_hex();
  } else {
    options.root_dir = root_dir;
  }
  TRY_RESULT_PROMISE(promise, torrent, Torrent::open(std::move(options), std::move(meta)));
  add_torrent(std::move(torrent), start_download, allow_upload, false, std::move(promise));
}
// Opens a torrent known only by its hash and registers it (files are not copied).
// An empty `root_dir` selects the default "<db_root_>/torrent-files/<hash hex>".
void StorageManager::add_torrent_by_hash(td::Bits256 hash, std::string root_dir, bool start_download, bool allow_upload,
                                         td::Promise<td::Unit> promise) {
  Torrent::Options options;
  if (root_dir.empty()) {
    options.root_dir = db_root_ + "/torrent-files/" + hash.to_hex();
  } else {
    options.root_dir = root_dir;
  }
  TRY_RESULT_PROMISE(promise, torrent, Torrent::open(std::move(options), hash));
  add_torrent(std::move(torrent), start_download, allow_upload, false, std::move(promise));
}
// Asks the torrent's NodeActor to enable or disable downloading.
// The promise is resolved as soon as the request has been sent, without waiting for the actor.
void StorageManager::set_active_download(td::Bits256 hash, bool active, td::Promise<td::Unit> promise) {
  TRY_RESULT_PROMISE(promise, torrent_entry, get_torrent(hash));
  td::actor::send_closure(torrent_entry->actor, &NodeActor::set_should_download, active);
  promise.set_result(td::Unit());
}
// Asks the torrent's NodeActor to enable or disable uploading.
// The promise is resolved as soon as the request has been sent, without waiting for the actor.
void StorageManager::set_active_upload(td::Bits256 hash, bool active, td::Promise<td::Unit> promise) {
  TRY_RESULT_PROMISE(promise, torrent_entry, get_torrent(hash));
  td::actor::send_closure(torrent_entry->actor, &NodeActor::set_should_upload, active);
  promise.set_result(td::Unit());
}
// Forwards the request to the NodeActor of torrent `hash`; the actor is handed the promise.
void StorageManager::with_torrent(td::Bits256 hash, td::Promise<NodeActor::NodeState> promise) {
  TRY_RESULT_PROMISE(promise, torrent_entry, get_torrent(hash));
  td::actor::send_closure(torrent_entry->actor, &NodeActor::with_torrent, std::move(promise));
}
// Resolves the promise with the hashes of all currently registered torrents.
void StorageManager::get_all_torrents(td::Promise<std::vector<td::Bits256>> promise) {
  std::vector<td::Bits256> hashes;
  for (auto it = torrents_.begin(); it != torrents_.end(); ++it) {
    hashes.push_back(it->first);
  }
  promise.set_result(std::move(hashes));
}
// Writes the current download/upload speed limits to the database under the config key.
// A failed write is only logged.
void StorageManager::db_store_config() {
  auto stored_config = create_tl_object<ton_api::storage_db_config>();
  stored_config->download_speed_limit_ = download_speed_limit_;
  stored_config->upload_speed_limit_ = upload_speed_limit_;

  auto on_saved = [](td::Result<td::Unit> R) {
    if (R.is_error()) {
      LOG(ERROR) << "Failed to save config to db: " << R.move_as_error();
    }
  };
  db_->set(create_hash_tl_object<ton_api::storage_db_key_config>(), serialize_tl_object(stored_config, true),
           std::move(on_saved));
}
void StorageManager::db_store_torrent_list() {
std::vector<td::Bits256> torrents;
for (const auto& p : torrents_) {
torrents.push_back(p.first);
}
db_->set(create_hash_tl_object<ton_api::storage_db_key_torrentList>(),
create_serialize_tl_object<ton_api::storage_db_torrentList>(std::move(torrents)),
[](td::Result<td::Unit> R) {
if (R.is_error()) {
LOG(ERROR) << "Failed to save torrent list to db: " << R.move_as_error();
}
});
}
// Forwards a "set priority for all files" request to the NodeActor of torrent `hash`;
// the actor is handed the promise.
void StorageManager::set_all_files_priority(td::Bits256 hash, td::uint8 priority, td::Promise<bool> promise) {
  TRY_RESULT_PROMISE(promise, torrent_entry, get_torrent(hash));
  td::actor::send_closure(torrent_entry->actor, &NodeActor::set_all_files_priority, priority, std::move(promise));
}
// Forwards a priority change for the file at index `idx` to the NodeActor of torrent `hash`;
// the actor is handed the promise.
void StorageManager::set_file_priority_by_idx(td::Bits256 hash, size_t idx, td::uint8 priority,
                                              td::Promise<bool> promise) {
  TRY_RESULT_PROMISE(promise, torrent_entry, get_torrent(hash));
  td::actor::send_closure(torrent_entry->actor, &NodeActor::set_file_priority_by_idx, idx, priority,
                          std::move(promise));
}
// Forwards a priority change for the file called `name` to the NodeActor of torrent `hash`;
// the actor is handed the promise.
void StorageManager::set_file_priority_by_name(td::Bits256 hash, std::string name, td::uint8 priority,
                                               td::Promise<bool> promise) {
  TRY_RESULT_PROMISE(promise, torrent_entry, get_torrent(hash));
  td::actor::send_closure(torrent_entry->actor, &NodeActor::set_file_priority_by_name, std::move(name), priority,
                          std::move(promise));
}
// Starts removal of torrent `hash`.
// The removal request (including `remove_files` and the promise) is recorded in the entry's shared
// closing state, which on_torrent_closed() later reads; the entry is then erased and the torrent
// list is persisted again.
void StorageManager::remove_torrent(td::Bits256 hash, bool remove_files, td::Promise<td::Unit> promise) {
  TRY_RESULT_PROMISE(promise, torrent_entry, get_torrent(hash));
  LOG(INFO) << "Removing torrent " << hash.to_hex();

  // Fill in the shared state before the entry is erased.
  auto& closing = *torrent_entry->closing_state;
  closing.removing = true;
  closing.remove_files = remove_files;
  closing.promise = std::move(promise);

  torrents_.erase(hash);
  db_store_torrent_list();
}
// Forwards a "load from" request (optional meta and a path to files) to the NodeActor of torrent
// `hash`; the actor is handed the promise.
void StorageManager::load_from(td::Bits256 hash, td::optional<TorrentMeta> meta, std::string files_path,
                               td::Promise<td::Unit> promise) {
  TRY_RESULT_PROMISE(promise, torrent_entry, get_torrent(hash));
  td::actor::send_closure(torrent_entry->actor, &NodeActor::load_from, std::move(meta), std::move(files_path),
                          std::move(promise));
}
// Removes `path` if it is a directory with nothing inside it.
// Returns true when `path` cannot be stat'ed or is not a directory, or when the walk saw only one
// entry (rmdir is then attempted, its result ignored). Returns false otherwise, i.e. when the
// walk found at least two entries, which is treated as "directory not empty".
static bool try_rm_empty_dir(const std::string& path) {
  auto stat = td::stat(path);
  if (stat.is_error() || !stat.ok().is_dir_) {
    return true;
  }

  // Count walked entries other than directory exits; two are enough to stop the walk.
  size_t seen = 0;
  td::WalkPath::run(path, [&seen](td::CSlice name, td::WalkPath::Type type) {
    if (type != td::WalkPath::Type::ExitDir) {
      ++seen;
    }
    return seen < 2 ? td::WalkPath::Action::Continue : td::WalkPath::Action::Abort;
  }).ignore();

  if (seen != 1) {
    return false;
  }
  td::rmdir(path).ignore();
  return true;
}
// Called once the NodeActor of a torrent has closed.
// Does nothing unless removal was requested through the closing state. Otherwise it optionally
// deletes the torrent's files (and directories left empty), removes the manager's own
// "<db_root_>/torrent-files/<hash hex>" directory, cleans the torrent's records from the database
// and finally resolves the removal promise (with success even if the database cleanup failed).
void StorageManager::on_torrent_closed(Torrent torrent, std::shared_ptr<TorrentEntry::ClosingState> closing_state) {
  if (!closing_state->removing) {
    return;
  }

  if (closing_state->remove_files && torrent.inited_header()) {
    // Ignore all errors: files may just not exist
    const std::string torrent_dir = torrent.get_root_dir() + '/' + torrent.get_header().dir_name;
    const size_t total_files = torrent.get_files_count().unwrap();
    for (size_t file_idx = 0; file_idx < total_files; ++file_idx) {
      std::string file_path = torrent.get_file_path(file_idx);
      td::unlink(file_path).ignore();

      // Walk the file's relative name from its deepest directory upwards, removing each parent
      // directory that is now empty; stop at the first one that still has content.
      std::string rel_name = torrent.get_file_name(file_idx).str();
      std::string::size_type slash = rel_name.rfind('/');
      while (slash != std::string::npos) {
        rel_name.resize(slash + 1);
        if (!try_rm_empty_dir(torrent_dir + '/' + rel_name)) {
          break;
        }
        if (slash == 0) {
          break;
        }
        slash = rel_name.rfind('/', slash - 1);
      }

      if (!torrent.get_header().dir_name.empty()) {
        try_rm_empty_dir(torrent_dir);
      }
    }
  }

  std::string internal_dir = db_root_ + "/torrent-files/" + torrent.get_hash().to_hex();
  td::rmrf(internal_dir).ignore();

  NodeActor::cleanup_db(db_, torrent.get_hash(),
                        [promise = std::move(closing_state->promise)](td::Result<td::Unit> R) mutable {
                          if (R.is_error()) {
                            LOG(ERROR) << "Failed to cleanup database: " << R.move_as_error();
                          }
                          promise.set_result(td::Unit());
                        });
}
// Forwards a "wait for completion" request to the NodeActor of torrent `hash`;
// the actor is handed the promise.
void StorageManager::wait_for_completion(td::Bits256 hash, td::Promise<td::Unit> promise) {
  TRY_RESULT_PROMISE(promise, torrent_entry, get_torrent(hash));
  td::actor::send_closure(torrent_entry->actor, &NodeActor::wait_for_completion, std::move(promise));
}
// Forwards a peer-list request to the NodeActor of torrent `hash`; the actor is handed the promise.
void StorageManager::get_peers_info(td::Bits256 hash,
                                    td::Promise<tl_object_ptr<ton_api::storage_daemon_peerList>> promise) {
  TRY_RESULT_PROMISE(promise, torrent_entry, get_torrent(hash));
  td::actor::send_closure(torrent_entry->actor, &NodeActor::get_peers_info, std::move(promise));
}
// Resolves the promise with the current limits as (download, upload).
void StorageManager::get_speed_limits(td::Promise<std::pair<double, double>> promise) {
  auto limits = std::make_pair(download_speed_limit_, upload_speed_limit_);
  promise.set_result(std::move(limits));
}
// Sets the global download speed limit, pushes it to the download limiter and persists the config.
// Any negative value is normalized to -1.0 before being stored.
void StorageManager::set_download_speed_limit(double max_speed) {
  const double limit = max_speed < 0.0 ? -1.0 : max_speed;
  LOG(INFO) << "Set download speed limit to " << limit;
  download_speed_limit_ = limit;
  td::actor::send_closure(download_speed_limiter_, &SpeedLimiter::set_max_speed, limit);
  db_store_config();
}
// Sets the global upload speed limit, pushes it to the upload limiter and persists the config.
// Any negative value is normalized to -1.0 before being stored.
void StorageManager::set_upload_speed_limit(double max_speed) {
  const double limit = max_speed < 0.0 ? -1.0 : max_speed;
  LOG(INFO) << "Set upload speed limit to " << limit;
  upload_speed_limit_ = limit;
  td::actor::send_closure(upload_speed_limiter_, &SpeedLimiter::set_max_speed, limit);
  db_store_config();
}
} // namespace ton