mirror of
				https://github.com/ton-blockchain/ton
				synced 2025-03-09 15:40:10 +00:00 
			
		
		
		
	* Fix error handling in Torrent.cpp, improve choosing peers for upload * Various improvements in storage daemon "get-pieces-info" Store "added at" Improve calculating up/down speed Improve TL protocol for future compatibility Remove empty directories on "--remove-files" Better windows support Debug logs in PeerActor More restrictions on TorrentInfo Bugfixes * Global speed limits for download and upload +bugfix * Reset download/upload speed on changing settings or completion * Exclude some system files in TorrentCreator
		
			
				
	
	
		
			1117 lines
		
	
	
	
		
			40 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1117 lines
		
	
	
	
		
			40 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
    This file is part of TON Blockchain Library.

    TON Blockchain Library is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 2 of the License, or
    (at your option) any later version.

    TON Blockchain Library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.

    Copyright 2017-2020 Telegram Systems LLP
*/
 | 
						|
 | 
						|
#include "NodeActor.h"
 | 
						|
 | 
						|
#include "vm/boc.h"
 | 
						|
#include "vm/cellslice.h"
 | 
						|
 | 
						|
#include "td/utils/Enumerator.h"
 | 
						|
#include "td/utils/tests.h"
 | 
						|
#include "td/utils/overloaded.h"
 | 
						|
#include "tl-utils/tl-utils.hpp"
 | 
						|
#include "auto/tl/ton_api.hpp"
 | 
						|
#include "td/actor/MultiPromise.h"
 | 
						|
 | 
						|
namespace ton {
 | 
						|
NodeActor::NodeActor(PeerId self_id, Torrent torrent, td::unique_ptr<Callback> callback,
 | 
						|
                     td::unique_ptr<NodeCallback> node_callback, std::shared_ptr<db::DbType> db,
 | 
						|
                     SpeedLimiters speed_limiters, bool should_download, bool should_upload)
 | 
						|
    : self_id_(self_id)
 | 
						|
    , torrent_(std::move(torrent))
 | 
						|
    , callback_(std::move(callback))
 | 
						|
    , node_callback_(std::move(node_callback))
 | 
						|
    , db_(std::move(db))
 | 
						|
    , should_download_(should_download)
 | 
						|
    , should_upload_(should_upload)
 | 
						|
    , added_at_((td::uint32)td::Clocks::system())
 | 
						|
    , speed_limiters_(std::move(speed_limiters)) {
 | 
						|
}
 | 
						|
 | 
						|
NodeActor::NodeActor(PeerId self_id, ton::Torrent torrent, td::unique_ptr<Callback> callback,
 | 
						|
                     td::unique_ptr<NodeCallback> node_callback, std::shared_ptr<db::DbType> db,
 | 
						|
                     SpeedLimiters speed_limiters, bool should_download, bool should_upload,
 | 
						|
                     DbInitialData db_initial_data)
 | 
						|
    : self_id_(self_id)
 | 
						|
    , torrent_(std::move(torrent))
 | 
						|
    , callback_(std::move(callback))
 | 
						|
    , node_callback_(std::move(node_callback))
 | 
						|
    , db_(std::move(db))
 | 
						|
    , should_download_(should_download)
 | 
						|
    , should_upload_(should_upload)
 | 
						|
    , added_at_(db_initial_data.added_at)
 | 
						|
    , speed_limiters_(std::move(speed_limiters))
 | 
						|
    , pending_set_file_priority_(std::move(db_initial_data.priorities))
 | 
						|
    , pieces_in_db_(std::move(db_initial_data.pieces_in_db)) {
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::start_peer(PeerId peer_id, td::Promise<td::actor::ActorId<PeerActor>> promise) {
 | 
						|
  peers_[peer_id];
 | 
						|
  loop();
 | 
						|
  auto it = peers_.find(peer_id);
 | 
						|
  if (it == peers_.end() || it->second.actor.empty()) {
 | 
						|
    promise.set_error(td::Status::Error("Won't start peer now"));
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  promise.set_value(it->second.actor.get());
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::on_signal_from_peer(PeerId peer_id) {
 | 
						|
  auto it = peers_.find(peer_id);
 | 
						|
  if (it == peers_.end()) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  loop_peer(peer_id, it->second);
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::start_up() {
 | 
						|
  node_callback_->register_self(actor_id(this));
 | 
						|
  db_store_torrent();
 | 
						|
  if (torrent_.inited_info()) {
 | 
						|
    init_torrent();
 | 
						|
  }
 | 
						|
  loop();
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::init_torrent() {
 | 
						|
  auto pieces_count = torrent_.get_info().pieces_count();
 | 
						|
  parts_helper_.init_parts_count(pieces_count);
 | 
						|
  parts_.parts.resize(pieces_count);
 | 
						|
 | 
						|
  auto header = torrent_.get_header_parts_range();
 | 
						|
  for (auto i = static_cast<td::uint32>(header.begin); i < header.end; i++) {
 | 
						|
    parts_helper_.set_part_priority(i, 255);
 | 
						|
  }
 | 
						|
  for (td::uint32 i = 0; i < pieces_count; i++) {
 | 
						|
    if (torrent_.is_piece_ready(i)) {
 | 
						|
      on_part_ready(i);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  torrent_info_shared_ = std::make_shared<TorrentInfo>(torrent_.get_info());
 | 
						|
  for (auto &p : peers_) {
 | 
						|
    auto &state = p.second.state;
 | 
						|
    state->torrent_info_ = torrent_info_shared_;
 | 
						|
    CHECK(!state->torrent_info_ready_.exchange(true));
 | 
						|
    state->notify_peer();
 | 
						|
  }
 | 
						|
  LOG(INFO) << "Inited torrent info for " << torrent_.get_hash().to_hex() << ": size=" << torrent_.get_info().file_size
 | 
						|
            << ", pieces=" << torrent_.get_info().pieces_count();
 | 
						|
  if (torrent_.inited_header()) {
 | 
						|
    init_torrent_header();
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::init_torrent_header() {
 | 
						|
  if (header_ready_) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  header_ready_ = true;
 | 
						|
  size_t files_count = torrent_.get_files_count().unwrap();
 | 
						|
  for (size_t i = 0; i < files_count; ++i) {
 | 
						|
    file_name_to_idx_[torrent_.get_file_name(i).str()] = i;
 | 
						|
  }
 | 
						|
  db_store_priorities_paused_ = true;
 | 
						|
  file_priority_.resize(files_count, 1);
 | 
						|
  for (auto &s : pending_set_file_priority_) {
 | 
						|
    td::Promise<bool> P = [](td::Result<bool>) {};
 | 
						|
    s.file.visit(
 | 
						|
        td::overloaded([&](const PendingSetFilePriority::All &) { set_all_files_priority(s.priority, std::move(P)); },
 | 
						|
                       [&](const size_t &i) { set_file_priority_by_idx(i, s.priority, std::move(P)); },
 | 
						|
                       [&](const std::string &name) { set_file_priority_by_name(name, s.priority, std::move(P)); }));
 | 
						|
  }
 | 
						|
  pending_set_file_priority_.clear();
 | 
						|
  torrent_.enable_write_to_files();
 | 
						|
  db_store_priorities_paused_ = false;
 | 
						|
  db_store_priorities();
 | 
						|
 | 
						|
  auto pieces = pieces_in_db_;
 | 
						|
  for (td::uint64 p : pieces) {
 | 
						|
    if (!torrent_.is_piece_in_memory(p)) {
 | 
						|
      db_erase_piece(p);
 | 
						|
    }
 | 
						|
  }
 | 
						|
  for (td::uint64 p : torrent_.get_pieces_in_memory()) {
 | 
						|
    if (!pieces_in_db_.count(p)) {
 | 
						|
      db_store_piece(p, torrent_.get_piece_data(p).move_as_ok());
 | 
						|
    }
 | 
						|
  }
 | 
						|
  db_update_pieces_list();
 | 
						|
  recheck_parts(Torrent::PartsRange{0, torrent_.get_info().pieces_count()});
 | 
						|
  db_store_torrent_meta();
 | 
						|
 | 
						|
  LOG(INFO) << "Inited torrent header for " << torrent_.get_hash().to_hex()
 | 
						|
            << ": files=" << torrent_.get_files_count().value() << ", included_size=" << torrent_.get_included_size();
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::recheck_parts(Torrent::PartsRange range) {
 | 
						|
  CHECK(torrent_.inited_info());
 | 
						|
  for (size_t i = range.begin; i < range.end; ++i) {
 | 
						|
    if (parts_.parts[i].ready && !torrent_.is_piece_ready(i)) {
 | 
						|
      parts_helper_.on_self_part_not_ready(i);
 | 
						|
      parts_.parts[i].ready = false;
 | 
						|
    } else if (!parts_.parts[i].ready && torrent_.is_piece_ready(i)) {
 | 
						|
      on_part_ready((PartId)i);
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::loop_will_upload() {
 | 
						|
  if (peers_.empty()) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  if (!will_upload_at_.is_in_past()) {
 | 
						|
    alarm_timestamp().relax(will_upload_at_);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  will_upload_at_ = td::Timestamp::in(5);
 | 
						|
  alarm_timestamp().relax(will_upload_at_);
 | 
						|
  std::vector<std::tuple<bool, bool, double, PeerId>> peers;
 | 
						|
  for (auto &it : peers_) {
 | 
						|
    auto &state = it.second.state;
 | 
						|
    bool needed = false;
 | 
						|
    if (state->peer_state_ready_) {
 | 
						|
      needed = state->peer_state_.load().want_download && state->peer_online_;
 | 
						|
    }
 | 
						|
    peers.emplace_back(!needed, !state->node_state_.load().want_download, -it.second.download_speed.speed(), it.first);
 | 
						|
  }
 | 
						|
  std::sort(peers.begin(), peers.end());
 | 
						|
 | 
						|
  if (peers.size() > 5) {
 | 
						|
    std::swap(peers[4], peers[td::Random::fast(5, (int)peers.size() - 1)]);
 | 
						|
    peers.resize(5);
 | 
						|
  }
 | 
						|
 | 
						|
  std::set<PeerId> peers_set;
 | 
						|
  for (auto id : peers) {
 | 
						|
    peers_set.insert(std::get<PeerId>(id));
 | 
						|
  }
 | 
						|
 | 
						|
  for (auto &it : peers_) {
 | 
						|
    auto will_upload = peers_set.count(it.first) > 0 && should_upload_;
 | 
						|
    auto &state = it.second.state;
 | 
						|
    auto node_state = state->node_state_.load();
 | 
						|
    if (node_state.will_upload != will_upload) {
 | 
						|
      node_state.will_upload = will_upload;
 | 
						|
      state->node_state_.exchange(node_state);
 | 
						|
      state->notify_peer();
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::loop() {
 | 
						|
  loop_get_peers();
 | 
						|
  loop_start_stop_peers();
 | 
						|
  if (torrent_.inited_info()) {
 | 
						|
    loop_queries();
 | 
						|
    loop_will_upload();
 | 
						|
  }
 | 
						|
 | 
						|
  if (!ready_parts_.empty()) {
 | 
						|
    for (auto &it : peers_) {
 | 
						|
      auto &state = it.second.state;
 | 
						|
      state->node_ready_parts_.add_elements(ready_parts_);
 | 
						|
      state->notify_peer();
 | 
						|
    }
 | 
						|
    ready_parts_.clear();
 | 
						|
  }
 | 
						|
 | 
						|
  if (next_db_store_meta_at_ && next_db_store_meta_at_.is_in_past()) {
 | 
						|
    db_store_torrent_meta();
 | 
						|
  }
 | 
						|
 | 
						|
  if (torrent_.get_fatal_error().is_error()) {
 | 
						|
    for (auto &promise : wait_for_completion_) {
 | 
						|
      promise.set_error(torrent_.get_fatal_error().clone());
 | 
						|
    }
 | 
						|
    wait_for_completion_.clear();
 | 
						|
  } else if (torrent_.is_completed()) {
 | 
						|
    db_store_torrent_meta();
 | 
						|
    if (!is_completed_) {
 | 
						|
      for (auto &promise : wait_for_completion_) {
 | 
						|
        promise.set_result(td::Unit());
 | 
						|
      }
 | 
						|
      wait_for_completion_.clear();
 | 
						|
      is_completed_ = true;
 | 
						|
      download_speed_.reset();
 | 
						|
      for (auto &peer : peers_) {
 | 
						|
        peer.second.download_speed.reset();
 | 
						|
      }
 | 
						|
      callback_->on_completed();
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
std::string NodeActor::get_stats_str() {
 | 
						|
  td::StringBuilder sb;
 | 
						|
  sb << "Node " << self_id_ << " " << torrent_.get_ready_parts_count() << "\t" << download_speed_;
 | 
						|
  sb << "\toutq " << parts_.total_queries;
 | 
						|
  sb << "\n";
 | 
						|
  for (auto &it : peers_) {
 | 
						|
    auto &state = it.second.state;
 | 
						|
    sb << "\tPeer " << it.first;
 | 
						|
    if (torrent_.inited_info()) {
 | 
						|
      sb << "\t" << parts_helper_.get_ready_parts(it.second.peer_token).ones_count();
 | 
						|
    }
 | 
						|
    sb << "\t" << it.second.download_speed;
 | 
						|
    if (state->peer_state_ready_) {
 | 
						|
      auto peer_state = state->peer_state_.load();
 | 
						|
      sb << "\t  up:" << peer_state.will_upload;
 | 
						|
      sb << "\tdown:" << peer_state.want_download;
 | 
						|
      if (torrent_.inited_info()) {
 | 
						|
        sb << "\tcnt:" << parts_helper_.get_want_download_count(it.second.peer_token);
 | 
						|
      }
 | 
						|
    }
 | 
						|
    sb << "\toutq:" << state->node_queries_active_.size();
 | 
						|
    auto node_state = state->node_state_.load();
 | 
						|
    sb << "\tNup:" << node_state.will_upload;
 | 
						|
    sb << "\tNdown:" << node_state.want_download;
 | 
						|
    sb << "\n";
 | 
						|
  }
 | 
						|
 | 
						|
  auto o_n = torrent_.get_files_count();
 | 
						|
  if (o_n) {
 | 
						|
    // by default all parts priority == 1
 | 
						|
    auto n = o_n.unwrap();
 | 
						|
    file_priority_.resize(n, 1);
 | 
						|
    for (size_t i = 0; i < n; i++) {
 | 
						|
      auto size = torrent_.get_file_size(i);
 | 
						|
      auto ready_size = torrent_.get_file_ready_size(i);
 | 
						|
      sb << "#" << i << " " << torrent_.get_file_name(i) << "\t" << 100 * ready_size / size << "%%  "
 | 
						|
         << td::format::as_size(ready_size) << "/" << td::format::as_size(size) << "\t priority=" << file_priority_[i]
 | 
						|
         << "\n";
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return sb.as_cslice().str();
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::set_all_files_priority(td::uint8 priority, td::Promise<bool> promise) {
 | 
						|
  if (!header_ready_) {
 | 
						|
    pending_set_file_priority_.clear();
 | 
						|
    pending_set_file_priority_.push_back(PendingSetFilePriority{PendingSetFilePriority::All(), priority});
 | 
						|
    db_store_priorities();
 | 
						|
    promise.set_result(false);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  auto header_range = torrent_.get_header_parts_range();
 | 
						|
  for (td::uint32 i = 0; i < torrent_.get_info().pieces_count(); i++) {
 | 
						|
    if (!header_range.contains(i)) {
 | 
						|
      parts_helper_.set_part_priority(i, priority);
 | 
						|
    }
 | 
						|
  }
 | 
						|
  for (size_t i = 0; i < file_priority_.size(); ++i) {
 | 
						|
    file_priority_[i] = priority;
 | 
						|
    torrent_.set_file_excluded(i, priority == 0);
 | 
						|
  }
 | 
						|
  recheck_parts(Torrent::PartsRange{0, torrent_.get_info().pieces_count()});
 | 
						|
  db_store_priorities();
 | 
						|
  update_pieces_in_db(0, torrent_.get_info().pieces_count());
 | 
						|
  if (!torrent_.is_completed()) {
 | 
						|
    is_completed_ = false;
 | 
						|
  }
 | 
						|
  promise.set_result(true);
 | 
						|
  yield();
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::set_file_priority_by_idx(size_t i, td::uint8 priority, td::Promise<bool> promise) {
 | 
						|
  if (!header_ready_) {
 | 
						|
    pending_set_file_priority_.push_back(PendingSetFilePriority{i, priority});
 | 
						|
    db_store_priorities();
 | 
						|
    promise.set_result(false);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  auto files_count = torrent_.get_files_count().unwrap();
 | 
						|
  if (i >= files_count) {
 | 
						|
    promise.set_error(td::Status::Error("File index is too big"));
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  if (file_priority_[i] == priority) {
 | 
						|
    promise.set_result(true);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  file_priority_[i] = priority;
 | 
						|
  torrent_.set_file_excluded(i, priority == 0);
 | 
						|
  auto range = torrent_.get_file_parts_range(i);
 | 
						|
  recheck_parts(range);
 | 
						|
  update_pieces_in_db(range.begin, range.end);
 | 
						|
  for (auto i = range.begin; i < range.end; i++) {
 | 
						|
    if (i == range.begin || i + 1 == range.end) {
 | 
						|
      auto chunks = torrent_.chunks_by_piece(i);
 | 
						|
      td::uint8 max_priority = 0;
 | 
						|
      for (auto chunk_id : chunks) {
 | 
						|
        if (chunk_id == 0) {
 | 
						|
          max_priority = 255;
 | 
						|
        } else {
 | 
						|
          max_priority = td::max(max_priority, file_priority_[chunk_id - 1]);
 | 
						|
        }
 | 
						|
      }
 | 
						|
      parts_helper_.set_part_priority(i, max_priority);
 | 
						|
    } else {
 | 
						|
      parts_helper_.set_part_priority(i, priority);
 | 
						|
    }
 | 
						|
  }
 | 
						|
  db_store_priorities();
 | 
						|
  if (!torrent_.is_completed()) {
 | 
						|
    is_completed_ = false;
 | 
						|
  }
 | 
						|
  promise.set_result(true);
 | 
						|
  yield();
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::set_file_priority_by_name(std::string name, td::uint8 priority, td::Promise<bool> promise) {
 | 
						|
  if (!header_ready_) {
 | 
						|
    pending_set_file_priority_.push_back(PendingSetFilePriority{name, priority});
 | 
						|
    db_store_priorities();
 | 
						|
    promise.set_result(false);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  auto it = file_name_to_idx_.find(name);
 | 
						|
  if (it == file_name_to_idx_.end()) {
 | 
						|
    promise.set_error(td::Status::Error("No such file"));
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  set_file_priority_by_idx(it->second, priority, std::move(promise));
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::wait_for_completion(td::Promise<td::Unit> promise) {
 | 
						|
  if (torrent_.get_fatal_error().is_error()) {
 | 
						|
    promise.set_error(torrent_.get_fatal_error().clone());
 | 
						|
  } else if (is_completed_) {
 | 
						|
    promise.set_result(td::Unit());
 | 
						|
  } else {
 | 
						|
    wait_for_completion_.push_back(std::move(promise));
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::set_should_download(bool should_download) {
 | 
						|
  if (should_download == should_download_) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  should_download_ = should_download;
 | 
						|
  if (!should_download_) {
 | 
						|
    download_speed_.reset();
 | 
						|
    for (auto &peer : peers_) {
 | 
						|
      peer.second.download_speed.reset();
 | 
						|
    }
 | 
						|
  }
 | 
						|
  db_store_torrent();
 | 
						|
  yield();
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::set_should_upload(bool should_upload) {
 | 
						|
  if (should_upload == should_upload_) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  should_upload_ = should_upload;
 | 
						|
  if (!should_upload_) {
 | 
						|
    upload_speed_.reset();
 | 
						|
    for (auto &peer : peers_) {
 | 
						|
      peer.second.upload_speed.reset();
 | 
						|
    }
 | 
						|
  }
 | 
						|
  db_store_torrent();
 | 
						|
  will_upload_at_ = td::Timestamp::now();
 | 
						|
  yield();
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::load_from(td::optional<TorrentMeta> meta, std::string files_path, td::Promise<td::Unit> promise) {
 | 
						|
  auto S = [&]() -> td::Status {
 | 
						|
    if (meta) {
 | 
						|
      TorrentInfo &info = meta.value().info;
 | 
						|
      if (info.get_hash() != torrent_.get_hash()) {
 | 
						|
        return td::Status::Error("Incorrect hash in meta");
 | 
						|
      }
 | 
						|
      if (!torrent_.inited_info()) {
 | 
						|
        LOG(INFO) << "Loading torrent info for " << torrent_.get_hash().to_hex();
 | 
						|
        TRY_STATUS(torrent_.init_info(std::move(info)));
 | 
						|
        init_torrent();
 | 
						|
      }
 | 
						|
      auto &header = meta.value().header;
 | 
						|
      if (header && !torrent_.inited_header()) {
 | 
						|
        LOG(INFO) << "Loading torrent header for " << torrent_.get_hash().to_hex();
 | 
						|
        TRY_STATUS(torrent_.set_header(header.unwrap()));
 | 
						|
        init_torrent_header();
 | 
						|
      }
 | 
						|
      auto proof = std::move(meta.value().root_proof);
 | 
						|
      if (!proof.is_null()) {
 | 
						|
        LOG(INFO) << "Loading proof for " << torrent_.get_hash().to_hex();
 | 
						|
        TRY_STATUS(torrent_.add_proof(std::move(proof)));
 | 
						|
      }
 | 
						|
    }
 | 
						|
    TRY_STATUS_PREFIX(torrent_.get_fatal_error().clone(), "Fatal error: ");
 | 
						|
    if (torrent_.inited_header() && !files_path.empty()) {
 | 
						|
      torrent_.load_from_files(std::move(files_path));
 | 
						|
    }
 | 
						|
    TRY_STATUS_PREFIX(torrent_.get_fatal_error().clone(), "Fatal error: ");
 | 
						|
    return td::Status::OK();
 | 
						|
  }();
 | 
						|
  if (S.is_error()) {
 | 
						|
    LOG(WARNING) << "Load from failed: " << S;
 | 
						|
    promise.set_error(std::move(S));
 | 
						|
  } else {
 | 
						|
    promise.set_result(td::Unit());
 | 
						|
  }
 | 
						|
  if (torrent_.inited_header()) {
 | 
						|
    recheck_parts(Torrent::PartsRange{0, torrent_.get_info().pieces_count()});
 | 
						|
  }
 | 
						|
  loop();
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::copy_to_new_root_dir(std::string new_root_dir, td::Promise<td::Unit> promise) {
 | 
						|
  TRY_STATUS_PROMISE(promise, torrent_.copy_to(new_root_dir));
 | 
						|
  db_store_torrent();
 | 
						|
  promise.set_result(td::Unit());
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::tear_down() {
 | 
						|
  for (auto &promise : wait_for_completion_) {
 | 
						|
    promise.set_error(td::Status::Error("Torrent closed"));
 | 
						|
  }
 | 
						|
  callback_->on_closed(std::move(torrent_));
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::loop_start_stop_peers() {
 | 
						|
  for (auto &it : peers_) {
 | 
						|
    auto &peer = it.second;
 | 
						|
    auto peer_id = it.first;
 | 
						|
 | 
						|
    if (peer.notifier.empty()) {
 | 
						|
      peer.notifier = td::actor::create_actor<Notifier>("Notifier", actor_id(this), peer_id);
 | 
						|
    }
 | 
						|
 | 
						|
    if (peer.actor.empty()) {
 | 
						|
      auto &state = peer.state = std::make_shared<PeerState>(peer.notifier.get());
 | 
						|
      state->speed_limiters_ = speed_limiters_;
 | 
						|
      if (torrent_.inited_info()) {
 | 
						|
        std::vector<td::uint32> node_ready_parts;
 | 
						|
        for (td::uint32 i = 0; i < parts_.parts.size(); i++) {
 | 
						|
          if (parts_.parts[i].ready) {
 | 
						|
            node_ready_parts.push_back(i);
 | 
						|
          }
 | 
						|
        }
 | 
						|
        state->node_ready_parts_.add_elements(std::move(node_ready_parts));
 | 
						|
        state->torrent_info_ = torrent_info_shared_;
 | 
						|
        state->torrent_info_ready_ = true;
 | 
						|
      } else {
 | 
						|
        state->torrent_info_response_callback_ = [SelfId = actor_id(this)](td::BufferSlice data) {
 | 
						|
          td::actor::send_closure(SelfId, &NodeActor::got_torrent_info_str, std::move(data));
 | 
						|
        };
 | 
						|
      }
 | 
						|
      peer.peer_token = parts_helper_.register_peer(peer_id);
 | 
						|
      peer.actor = node_callback_->create_peer(self_id_, peer_id, peer.state);
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::loop_queries() {
 | 
						|
  if (!should_download_) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  for (auto &it : peers_) {
 | 
						|
    auto peer_token = it.second.peer_token;
 | 
						|
    auto &state = it.second.state;
 | 
						|
    if (!state->peer_state_ready_) {
 | 
						|
      parts_helper_.set_peer_limit(peer_token, 0);
 | 
						|
      continue;
 | 
						|
    }
 | 
						|
    if (!state->peer_state_.load().will_upload) {
 | 
						|
      parts_helper_.set_peer_limit(peer_token, 0);
 | 
						|
      continue;
 | 
						|
    }
 | 
						|
    parts_helper_.set_peer_limit(
 | 
						|
        peer_token, td::narrow_cast<td::uint32>(MAX_PEER_TOTAL_QUERIES - state->node_queries_active_.size()));
 | 
						|
  }
 | 
						|
 | 
						|
  auto parts = parts_helper_.get_rarest_parts(MAX_TOTAL_QUERIES);
 | 
						|
  for (auto &part : parts) {
 | 
						|
    auto it = peers_.find(part.peer_id);
 | 
						|
    CHECK(it != peers_.end());
 | 
						|
    auto &state = it->second.state;
 | 
						|
    CHECK(state->peer_state_ready_);
 | 
						|
    CHECK(state->peer_state_.load().will_upload);
 | 
						|
    CHECK(state->node_queries_active_.size() < MAX_PEER_TOTAL_QUERIES);
 | 
						|
    auto part_id = part.part_id;
 | 
						|
    if (state->node_queries_active_.insert(static_cast<td::uint32>(part_id)).second) {
 | 
						|
      state->node_queries_.add_element(static_cast<td::uint32>(part_id));
 | 
						|
    }
 | 
						|
    parts_helper_.lock_part(part_id);
 | 
						|
    parts_.total_queries++;
 | 
						|
    parts_.parts[part_id].query_to_peer = part.peer_id;
 | 
						|
    state->notify_peer();
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::loop_get_peers() {
 | 
						|
  if (has_get_peers_) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  if (next_get_peers_at_.is_in_past()) {
 | 
						|
    node_callback_->get_peers(self_id_, promise_send_closure(td::actor::actor_id(this), &NodeActor::got_peers));
 | 
						|
    has_get_peers_ = true;
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  alarm_timestamp().relax(next_get_peers_at_);
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::got_peers(td::Result<std::vector<PeerId>> r_peers) {
 | 
						|
  if (r_peers.is_error()) {
 | 
						|
    next_get_peers_at_ = td::Timestamp::in(GET_PEER_RETRY_TIMEOUT);
 | 
						|
  } else {
 | 
						|
    auto peers = r_peers.move_as_ok();
 | 
						|
    for (auto &peer : peers) {
 | 
						|
      if (peer == self_id_) {
 | 
						|
        continue;
 | 
						|
      }
 | 
						|
      peers_[peer];
 | 
						|
    }
 | 
						|
    next_get_peers_at_ = td::Timestamp::in(GET_PEER_EACH);
 | 
						|
  }
 | 
						|
  has_get_peers_ = false;
 | 
						|
  loop();
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::loop_peer(const PeerId &peer_id, Peer &peer) {
 | 
						|
  auto &state = peer.state;
 | 
						|
  if (!state->peer_ready_ || !torrent_.inited_info()) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  for (auto part_id : state->peer_ready_parts_.read()) {
 | 
						|
    parts_helper_.on_peer_part_ready(peer.peer_token, part_id);
 | 
						|
  }
 | 
						|
  if (state->peer_online_set_.load()) {
 | 
						|
    state->peer_online_set_ = false;
 | 
						|
    will_upload_at_ = td::Timestamp::now();
 | 
						|
    loop_will_upload();
 | 
						|
  }
 | 
						|
 | 
						|
  // Answer queries from peer
 | 
						|
  bool should_notify_peer = false;
 | 
						|
 | 
						|
  auto want_download = parts_helper_.get_want_download_count(peer.peer_token) > 0;
 | 
						|
  auto node_state = state->node_state_.load();
 | 
						|
  if (node_state.want_download != want_download) {
 | 
						|
    node_state.want_download = want_download;
 | 
						|
    state->node_state_.exchange(node_state);
 | 
						|
    should_notify_peer = true;
 | 
						|
  }
 | 
						|
 | 
						|
  std::vector<std::pair<td::uint32, td::Result<PeerState::Part>>> results;
 | 
						|
  for (td::uint32 part_id : state->peer_queries_.read()) {
 | 
						|
    should_notify_peer = true;
 | 
						|
    auto res = [&]() -> td::Result<PeerState::Part> {
 | 
						|
      if (!node_state.will_upload || !should_upload_) {
 | 
						|
        return td::Status::Error("Won't upload");
 | 
						|
      }
 | 
						|
      TRY_RESULT(proof, torrent_.get_piece_proof(part_id));
 | 
						|
      TRY_RESULT(data, torrent_.get_piece_data(part_id));
 | 
						|
      PeerState::Part res;
 | 
						|
      TRY_RESULT(proof_serialized, vm::std_boc_serialize(std::move(proof)));
 | 
						|
      res.proof = std::move(proof_serialized);
 | 
						|
      res.data = td::BufferSlice(std::move(data));
 | 
						|
      td::uint64 size = res.data.size();
 | 
						|
      upload_speed_.add(size);
 | 
						|
      peer.upload_speed.add(size);
 | 
						|
      return std::move(res);
 | 
						|
    }();
 | 
						|
    results.emplace_back(part_id, std::move(res));
 | 
						|
  }
 | 
						|
  state->peer_queries_results_.add_elements(std::move(results));
 | 
						|
 | 
						|
  // Handle results from peer
 | 
						|
  for (auto &p : state->node_queries_results_.read()) {
 | 
						|
    auto part_id = p.first;
 | 
						|
    if (!state->node_queries_active_.count(part_id)) {
 | 
						|
      continue;
 | 
						|
    }
 | 
						|
    auto r_unit = p.second.move_fmap([&](PeerState::Part part) -> td::Result<td::Unit> {
 | 
						|
      TRY_RESULT(proof, vm::std_boc_deserialize(part.proof));
 | 
						|
      TRY_STATUS(torrent_.add_piece(part_id, part.data.as_slice(), std::move(proof)));
 | 
						|
      update_pieces_in_db(part_id, part_id + 1);
 | 
						|
      download_speed_.add(part.data.size());
 | 
						|
      peer.download_speed.add(part.data.size());
 | 
						|
      return td::Unit();
 | 
						|
    });
 | 
						|
 | 
						|
    parts_.parts[part_id].query_to_peer = {};
 | 
						|
    parts_.total_queries--;
 | 
						|
    state->node_queries_active_.erase(part_id);
 | 
						|
    parts_helper_.unlock_part(part_id);
 | 
						|
 | 
						|
    if (r_unit.is_ok()) {
 | 
						|
      on_part_ready(part_id);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  if (!header_ready_ && torrent_.inited_info() && torrent_.inited_header()) {
 | 
						|
    init_torrent_header();
 | 
						|
  }
 | 
						|
 | 
						|
  if (should_notify_peer) {
 | 
						|
    state->notify_peer();
 | 
						|
  }
 | 
						|
 | 
						|
  yield();
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::on_part_ready(PartId part_id) {
 | 
						|
  parts_helper_.on_self_part_ready(part_id);
 | 
						|
  CHECK(!parts_.parts[part_id].ready);
 | 
						|
  parts_.parts[part_id].ready = true;
 | 
						|
  for (auto &peer : peers_) {
 | 
						|
    // TODO: notify only peer want_download_count == 0
 | 
						|
    peer.second.state->notify_peer();
 | 
						|
  }
 | 
						|
  ready_parts_.push_back(part_id);
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::got_torrent_info_str(td::BufferSlice data) {
 | 
						|
  if (torrent_.inited_info()) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  auto r_info_cell = vm::std_boc_deserialize(data.as_slice());
 | 
						|
  if (r_info_cell.is_error()) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  TorrentInfo info;
 | 
						|
  vm::CellSlice cs = vm::load_cell_slice(r_info_cell.move_as_ok());
 | 
						|
  if (!info.unpack(cs)) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  info.init_cell();
 | 
						|
  if (torrent_.init_info(std::move(info)).is_error()) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  init_torrent();
 | 
						|
  loop();
 | 
						|
}
 | 
						|
 | 
						|
// Synchronizes the db copies of pieces [begin, end) with the torrent's in-memory state:
// a piece that is in memory but not in the db is stored, a piece that is in the db but no longer
// in memory is erased. The stored list of pieces is rewritten only if something changed.
void NodeActor::update_pieces_in_db(td::uint64 begin, td::uint64 end) {
  bool list_dirty = false;

  for (td::uint64 piece = begin; piece < end; ++piece) {
    const bool in_db = pieces_in_db_.count(piece) != 0;
    const bool in_memory = torrent_.is_piece_in_memory(piece);

    if (in_memory && !in_db) {
      db_store_piece(piece, torrent_.get_piece_data(piece).move_as_ok());
      list_dirty = true;
    } else if (!in_memory && in_db) {
      db_erase_piece(piece);
      list_dirty = true;
    }
  }

  if (list_dirty) {
    db_update_pieces_list();
  }
}
 | 
						|
 | 
						|
void NodeActor::db_store_torrent() {
 | 
						|
  if (!db_) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  auto obj = create_tl_object<ton_api::storage_db_torrentV2>();
 | 
						|
  obj->active_download_ = should_download_;
 | 
						|
  obj->active_upload_ = should_upload_;
 | 
						|
  obj->root_dir_ = torrent_.get_root_dir();
 | 
						|
  obj->added_at_ = added_at_;
 | 
						|
  db_->set(create_hash_tl_object<ton_api::storage_db_key_torrent>(torrent_.get_hash()), serialize_tl_object(obj, true),
 | 
						|
           [](td::Result<td::Unit> R) {
 | 
						|
             if (R.is_error()) {
 | 
						|
               LOG(ERROR) << "Failed to save torrent to db: " << R.move_as_error();
 | 
						|
             }
 | 
						|
           });
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::db_store_priorities() {
 | 
						|
  if (!db_ || db_store_priorities_paused_) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  auto obj = create_tl_object<ton_api::storage_db_priorities>();
 | 
						|
  if (file_priority_.empty()) {
 | 
						|
    for (auto &s : pending_set_file_priority_) {
 | 
						|
      s.file.visit(td::overloaded(
 | 
						|
          [&](const PendingSetFilePriority::All &) {
 | 
						|
            obj->actions_.push_back(create_tl_object<ton_api::storage_priorityAction_all>(s.priority));
 | 
						|
          },
 | 
						|
          [&](const size_t &i) {
 | 
						|
            obj->actions_.push_back(create_tl_object<ton_api::storage_priorityAction_idx>(i, s.priority));
 | 
						|
          },
 | 
						|
          [&](const std::string &name) {
 | 
						|
            obj->actions_.push_back(create_tl_object<ton_api::storage_priorityAction_name>(name, s.priority));
 | 
						|
          }));
 | 
						|
    }
 | 
						|
  } else {
 | 
						|
    size_t prior_cnt[256];
 | 
						|
    std::fill(prior_cnt, prior_cnt + 256, 0);
 | 
						|
    for (td::uint8 p : file_priority_) {
 | 
						|
      ++prior_cnt[p];
 | 
						|
    }
 | 
						|
    auto base_priority = (td::uint8)(std::max_element(prior_cnt, prior_cnt + 256) - prior_cnt);
 | 
						|
    obj->actions_.push_back(create_tl_object<ton_api::storage_priorityAction_all>(base_priority));
 | 
						|
    for (size_t i = 0; i < file_priority_.size(); ++i) {
 | 
						|
      if (file_priority_[i] != base_priority) {
 | 
						|
        obj->actions_.push_back(create_tl_object<ton_api::storage_priorityAction_idx>(i, file_priority_[i]));
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  db_->set(create_hash_tl_object<ton_api::storage_db_key_priorities>(torrent_.get_hash()),
 | 
						|
           serialize_tl_object(obj, true), [](td::Result<td::Unit> R) {
 | 
						|
             if (R.is_error()) {
 | 
						|
               LOG(ERROR) << "Failed to save torrent priorities to db: " << R.move_as_error();
 | 
						|
             }
 | 
						|
           });
 | 
						|
}
 | 
						|
 | 
						|
void NodeActor::db_store_torrent_meta() {
 | 
						|
  if (!db_ || !torrent_.inited_info() || (td::int64)torrent_.get_ready_parts_count() == last_stored_meta_count_) {
 | 
						|
    after_db_store_torrent_meta(last_stored_meta_count_);
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  next_db_store_meta_at_ = td::Timestamp::never();
 | 
						|
  auto meta = torrent_.get_meta_str();
 | 
						|
  db_->set(create_hash_tl_object<ton_api::storage_db_key_torrentMeta>(torrent_.get_hash()), td::BufferSlice(meta),
 | 
						|
           [new_count = (td::int64)torrent_.get_ready_parts_count(), SelfId = actor_id(this)](td::Result<td::Unit> R) {
 | 
						|
             if (R.is_error()) {
 | 
						|
               td::actor::send_closure(SelfId, &NodeActor::after_db_store_torrent_meta, R.move_as_error());
 | 
						|
             } else {
 | 
						|
               td::actor::send_closure(SelfId, &NodeActor::after_db_store_torrent_meta, new_count);
 | 
						|
             }
 | 
						|
           });
 | 
						|
}
 | 
						|
 | 
						|
// Completion handler for db_store_torrent_meta: remembers the stored ready-parts count on success
// (or logs the error), then schedules the next store 10 to 20 seconds from now.
void NodeActor::after_db_store_torrent_meta(td::Result<td::int64> R) {
  if (R.is_ok()) {
    last_stored_meta_count_ = R.move_as_ok();
  } else {
    LOG(ERROR) << "Failed to save torrent meta to db: " << R.move_as_error();
  }

  const double delay = td::Random::fast(10.0, 20.0);
  next_db_store_meta_at_ = td::Timestamp::in(delay);
  alarm_timestamp().relax(next_db_store_meta_at_);
}
 | 
						|
 | 
						|
// Registers piece `i` in pieces_in_db_ and, if a db is attached, writes its data `s` there.
// A failed write is only logged.
void NodeActor::db_store_piece(td::uint64 i, std::string s) {
  pieces_in_db_.insert(i);

  if (db_) {
    auto key = create_hash_tl_object<ton_api::storage_db_key_pieceInDb>(torrent_.get_hash(), i);
    db_->set(std::move(key), td::BufferSlice(s), [](td::Result<td::Unit> res) {
      if (res.is_error()) {
        LOG(ERROR) << "Failed to store piece to db: " << res.move_as_error();
      }
    });
  }
}
 | 
						|
 | 
						|
// Removes piece `i` from pieces_in_db_ and, if a db is attached, erases its data from the db.
// A failed erase is only logged.
void NodeActor::db_erase_piece(td::uint64 i) {
  pieces_in_db_.erase(i);
  if (!db_) {
    return;
  }
  db_->erase(create_hash_tl_object<ton_api::storage_db_key_pieceInDb>(torrent_.get_hash(), i),
             [](td::Result<td::Unit> R) {
               if (R.is_error()) {
                 // The message names the erase operation so it is not confused with db_store_piece failures.
                 LOG(ERROR) << "Failed to erase piece from db: " << R.move_as_error();
               }
             });
}
 | 
						|
 | 
						|
void NodeActor::db_update_pieces_list() {
 | 
						|
  if (!db_) {
 | 
						|
    return;
 | 
						|
  }
 | 
						|
  auto obj = create_tl_object<ton_api::storage_db_piecesInDb>();
 | 
						|
  for (td::uint64 p : pieces_in_db_) {
 | 
						|
    obj->pieces_.push_back(p);
 | 
						|
  }
 | 
						|
  db_->set(create_hash_tl_object<ton_api::storage_db_key_piecesInDb>(torrent_.get_hash()),
 | 
						|
           serialize_tl_object(obj, true), [](td::Result<td::Unit> R) {
 | 
						|
             if (R.is_error()) {
 | 
						|
               LOG(ERROR) << "Failed to store list of pieces to db: " << R.move_as_error();
 | 
						|
             }
 | 
						|
           });
 | 
						|
}
 | 
						|
 | 
						|
// Restores a NodeActor for the torrent `hash` from the database.
//
// The work is done by a helper actor (Loader) that reads, one step after another: the short
// torrent record, the serialized meta, the stored priorities, the list of pieces kept in the db
// and finally each of those pieces. Once everything is read it creates the NodeActor and passes
// it to `promise`. A failed db read or a failed Torrent::open is passed to `promise` as an error
// instead. In both cases the Loader stops itself afterwards.
void NodeActor::load_from_db(std::shared_ptr<db::DbType> db, td::Bits256 hash, td::unique_ptr<Callback> callback,
                             td::unique_ptr<NodeCallback> node_callback, SpeedLimiters speed_limiters,
                             td::Promise<td::actor::ActorOwn<NodeActor>> promise) {
  class Loader : public td::actor::Actor {
   public:
    Loader(std::shared_ptr<db::DbType> db, td::Bits256 hash, td::unique_ptr<Callback> callback,
           td::unique_ptr<NodeCallback> node_callback, SpeedLimiters speed_limiters,
           td::Promise<td::actor::ActorOwn<NodeActor>> promise)
        : db_(std::move(db))
        , hash_(hash)
        , callback_(std::move(callback))
        , node_callback_(std::move(node_callback))
        , speed_limiters_(std::move(speed_limiters))
        , promise_(std::move(promise)) {
    }

    // Delivers the final result (actor or error) to the caller and terminates the Loader.
    void finish(td::Result<td::actor::ActorOwn<NodeActor>> R) {
      promise_.set_result(std::move(R));
      stop();
    }

    // Step 1: read the short torrent record.
    void start_up() override {
      db::db_get<ton_api::storage_db_TorrentShort>(
          *db_, create_hash_tl_object<ton_api::storage_db_key_torrent>(hash_), false,
          [SelfId = actor_id(this)](td::Result<tl_object_ptr<ton_api::storage_db_TorrentShort>> R) {
            if (R.is_error()) {
              td::actor::send_closure(SelfId, &Loader::finish, R.move_as_error_prefix("Torrent: "));
            } else {
              td::actor::send_closure(SelfId, &Loader::got_torrent, R.move_as_ok());
            }
          });
    }

    // Step 2: remember the settings from the torrent record, then read the serialized meta.
    void got_torrent(tl_object_ptr<ton_api::storage_db_TorrentShort> obj) {
      ton_api::downcast_call(*obj, td::overloaded(
                                       [&](ton_api::storage_db_torrent &t) {
                                         root_dir_ = std::move(t.root_dir_);
                                         active_download_ = t.active_download_;
                                         active_upload_ = t.active_upload_;
                                         // For this record type the added-at time is taken from the
                                         // current system clock rather than from the record.
                                         added_at_ = (td::uint32)td::Clocks::system();
                                       },
                                       [&](ton_api::storage_db_torrentV2 &t) {
                                         root_dir_ = std::move(t.root_dir_);
                                         active_download_ = t.active_download_;
                                         active_upload_ = t.active_upload_;
                                         added_at_ = t.added_at_;
                                       }));
      db_->get(create_hash_tl_object<ton_api::storage_db_key_torrentMeta>(hash_),
               [SelfId = actor_id(this)](td::Result<db::DbType::GetResult> R) {
                 if (R.is_error()) {
                   td::actor::send_closure(SelfId, &Loader::finish, R.move_as_error_prefix("Meta: "));
                   return;
                 }
                 auto r = R.move_as_ok();
                 if (r.status == td::KeyValueReader::GetStatus::NotFound) {
                   // A missing meta is not an error: continue with an empty optional.
                   td::actor::send_closure(SelfId, &Loader::got_meta_str, td::optional<td::BufferSlice>());
                 } else {
                   td::actor::send_closure(SelfId, &Loader::got_meta_str, std::move(r.value));
                 }
               });
    }

    // Step 3: open the torrent (from the stored meta if there is one, otherwise from the hash
    // alone), then read the stored priorities.
    void got_meta_str(td::optional<td::BufferSlice> meta_str) {
      auto r_torrent = [&]() -> td::Result<Torrent> {
        Torrent::Options options;
        options.root_dir = std::move(root_dir_);
        options.in_memory = false;
        options.validate = false;
        if (meta_str) {
          TRY_RESULT(meta, TorrentMeta::deserialize(meta_str.value().as_slice()));
          // Validation is requested only when a meta is available.
          options.validate = true;
          return Torrent::open(std::move(options), std::move(meta));
        } else {
          return Torrent::open(std::move(options), hash_);
        }
      }();
      if (r_torrent.is_error()) {
        finish(r_torrent.move_as_error());
        return;
      }
      torrent_ = r_torrent.move_as_ok();

      db::db_get<ton_api::storage_db_priorities>(
          *db_, create_hash_tl_object<ton_api::storage_db_key_priorities>(hash_), true,
          [SelfId = actor_id(this)](td::Result<tl_object_ptr<ton_api::storage_db_priorities>> R) {
            if (R.is_error()) {
              td::actor::send_closure(SelfId, &Loader::finish, R.move_as_error_prefix("Priorities: "));
            } else {
              td::actor::send_closure(SelfId, &Loader::got_priorities, R.move_as_ok());
            }
          });
    }

    // Step 4: convert the stored priority actions into PendingSetFilePriority entries, then read
    // the list of pieces kept in the db. `priorities` may be null, in which case no priorities
    // are restored.
    void got_priorities(tl_object_ptr<ton_api::storage_db_priorities> priorities) {
      if (priorities != nullptr) {
        for (auto &p : priorities->actions_) {
          td::Variant<PendingSetFilePriority::All, size_t, std::string> file;
          int priority = 0;
          ton_api::downcast_call(*p, td::overloaded(
                                         [&](ton_api::storage_priorityAction_all &obj) {
                                           file = PendingSetFilePriority::All();
                                           priority = obj.priority_;
                                         },
                                         [&](ton_api::storage_priorityAction_idx &obj) {
                                           file = (size_t)obj.idx_;
                                           priority = obj.priority_;
                                         },
                                         [&](ton_api::storage_priorityAction_name &obj) {
                                           file = std::move(obj.name_);
                                           priority = obj.priority_;
                                         }));
          // Priorities are kept as td::uint8; an out-of-range stored value is logged and skipped.
          auto R = td::narrow_cast_safe<td::uint8>(priority);
          if (R.is_error()) {
            LOG(ERROR) << "Invalid priority in db: " << R.move_as_error();
            continue;
          }
          priorities_.push_back(PendingSetFilePriority{std::move(file), R.move_as_ok()});
        }
      }

      db::db_get<ton_api::storage_db_piecesInDb>(
          *db_, create_hash_tl_object<ton_api::storage_db_key_piecesInDb>(hash_), true,
          [SelfId = actor_id(this)](td::Result<tl_object_ptr<ton_api::storage_db_piecesInDb>> R) {
            if (R.is_error()) {
              td::actor::send_closure(SelfId, &Loader::finish, R.move_as_error_prefix("Pieces in db: "));
            } else {
              td::actor::send_closure(SelfId, &Loader::got_pieces_in_db, R.move_as_ok());
            }
          });
    }

    // Step 5: request every piece named in the list. `list` may be null; with no pieces to read
    // the load completes immediately.
    void got_pieces_in_db(tl_object_ptr<ton_api::storage_db_piecesInDb> list) {
      if (list != nullptr) {
        // Iterate the stored list in place; no copy of the vector is needed.
        for (auto idx : list->pieces_) {
          ++remaining_pieces_in_db_;
          db_->get(create_hash_tl_object<ton_api::storage_db_key_pieceInDb>(hash_, idx),
                   [SelfId = actor_id(this), idx](td::Result<db::DbType::GetResult> R) {
                     if (R.is_error()) {
                       td::actor::send_closure(SelfId, &Loader::finish, R.move_as_error_prefix("Piece in db: "));
                       return;
                     }
                     auto r = R.move_as_ok();
                     td::optional<td::BufferSlice> piece;
                     if (r.status == td::KeyValueReader::GetStatus::Ok) {
                       piece = std::move(r.value);
                     }
                     td::actor::send_closure(SelfId, &Loader::got_piece_in_db, idx, std::move(piece));
                   });
        }
      }
      if (remaining_pieces_in_db_ == 0) {
        finished_db_read();
      }
    }

    // Handles one piece read. A piece that was found is added to the torrent when its proof is
    // available, and its index is recorded in pieces_in_db_ either way. The load completes when
    // the last outstanding piece has been handled.
    void got_piece_in_db(size_t idx, td::optional<td::BufferSlice> data) {
      if (data) {
        auto r_proof = torrent_.value().get_piece_proof(idx);
        if (r_proof.is_ok()) {
          torrent_.value().add_piece(idx, data.unwrap(), r_proof.move_as_ok());
        }
        pieces_in_db_.insert(idx);
      }
      if (--remaining_pieces_in_db_ == 0) {
        finished_db_read();
      }
    }

    // Final step: create the NodeActor from everything that was read and finish.
    void finished_db_read() {
      DbInitialData data;
      data.priorities = std::move(priorities_);
      data.pieces_in_db = std::move(pieces_in_db_);
      data.added_at = added_at_;
      finish(td::actor::create_actor<NodeActor>("Node", 1, torrent_.unwrap(), std::move(callback_),
                                                std::move(node_callback_), std::move(db_), std::move(speed_limiters_),
                                                active_download_, active_upload_, std::move(data)));
    }

   private:
    // Inputs forwarded to the NodeActor that is eventually created.
    std::shared_ptr<db::DbType> db_;
    td::Bits256 hash_;
    td::unique_ptr<Callback> callback_;
    td::unique_ptr<NodeCallback> node_callback_;
    SpeedLimiters speed_limiters_;
    td::Promise<td::actor::ActorOwn<NodeActor>> promise_;

    // State accumulated while reading the db.
    std::string root_dir_;
    bool active_download_{false};
    bool active_upload_{false};
    td::uint32 added_at_{0};  // overwritten in got_torrent; initialized so it is never indeterminate
    td::optional<Torrent> torrent_;
    std::vector<PendingSetFilePriority> priorities_;
    std::set<td::uint64> pieces_in_db_;
    size_t remaining_pieces_in_db_ = 0;  // piece reads issued but not yet handled
  };
  // The Loader's ownership handle is released right away; the Loader stops itself in finish().
  td::actor::create_actor<Loader>("loader", std::move(db), hash, std::move(callback), std::move(node_callback),
                                  std::move(speed_limiters), std::move(promise))
      .release();
}
 | 
						|
 | 
						|
// Erases all db records belonging to torrent `hash`: the torrent record, the meta, the
// priorities, the list of pieces and every piece named in that list.
// `promise` is attached to a td::MultiPromise that collects the individual erase operations
// (presumably it is fulfilled once all of them have reported back).
void NodeActor::cleanup_db(std::shared_ptr<db::DbType> db, td::Bits256 hash, td::Promise<td::Unit> promise) {
  td::MultiPromise mp;
  auto ig = mp.init_guard();
  ig.add_promise(std::move(promise));

  db->erase(create_hash_tl_object<ton_api::storage_db_key_torrent>(hash), ig.get_promise());
  db->erase(create_hash_tl_object<ton_api::storage_db_key_torrentMeta>(hash), ig.get_promise());
  db->erase(create_hash_tl_object<ton_api::storage_db_key_priorities>(hash), ig.get_promise());

  // The per-piece records can only be erased after the list naming them has been read.
  db::db_get<ton_api::storage_db_piecesInDb>(
      *db, create_hash_tl_object<ton_api::storage_db_key_piecesInDb>(hash), true,
      [db, hash, done = ig.get_promise()](td::Result<tl_object_ptr<ton_api::storage_db_piecesInDb>> r_list) mutable {
        if (r_list.is_error()) {
          done.set_error(r_list.move_as_error());
          return;
        }
        auto list = r_list.move_as_ok();
        if (list == nullptr) {
          // No list stored: nothing more to erase.
          done.set_result(td::Unit());
          return;
        }

        td::MultiPromise pieces_mp;
        auto pieces_guard = pieces_mp.init_guard();
        pieces_guard.add_promise(std::move(done));

        db->erase(create_hash_tl_object<ton_api::storage_db_key_piecesInDb>(hash), pieces_guard.get_promise());
        for (auto piece_idx : list->pieces_) {
          db->erase(create_hash_tl_object<ton_api::storage_db_key_pieceInDb>(hash, piece_idx),
                    pieces_guard.get_promise());
        }
      });
}
 | 
						|
 | 
						|
// Builds a storage_daemon_peerList describing every online peer together with the node's total
// download/upload speed and the number of parts.
// Speeds and ready-part counts are filled in immediately; the ADNL id and IP string of each peer
// are requested from node_callback_. If any of those requests fails, its error is reported to
// the td::MultiPromise that `promise` is attached to.
void NodeActor::get_peers_info(td::Promise<tl_object_ptr<ton_api::storage_daemon_peerList>> promise) {
  auto result = std::make_shared<std::vector<tl_object_ptr<ton_api::storage_daemon_peer>>>();

  td::MultiPromise mp;
  auto ig = mp.init_guard();
  ig.add_promise([result, promise = std::move(promise), download_speed = download_speed_.speed(),
                  upload_speed = upload_speed_.speed(), parts = parts_.parts.size()](td::Result<td::Unit> R) mutable {
    if (R.is_ok()) {
      promise.set_result(
          create_tl_object<ton_api::storage_daemon_peerList>(std::move(*result), download_speed, upload_speed, parts));
    } else {
      promise.set_error(R.move_as_error());
    }
  });

  result->reserve(peers_.size());
  for (auto &peer : peers_) {
    auto &peer_data = peer.second;
    if (!peer_data.state->peer_online_) {
      continue;  // offline peers are not reported
    }

    // Position the new entry will occupy; the asynchronous reply fills it in by index.
    const size_t slot = result->size();
    result->push_back(create_tl_object<ton_api::storage_daemon_peer>());

    auto &entry = *result->back();
    entry.download_speed_ = peer_data.download_speed.speed();
    entry.upload_speed_ = peer_data.upload_speed.speed();
    entry.ready_parts_ = parts_helper_.get_ready_parts(peer_data.peer_token).ones_count();

    node_callback_->get_peer_info(
        self_id_, peer.first,
        [result, slot, promise = ig.get_promise()](td::Result<std::pair<td::Bits256, std::string>> R) mutable {
          if (R.is_error()) {
            promise.set_error(R.move_as_error());
            return;
          }
          auto info = R.move_as_ok();
          auto &target = *result->at(slot);
          target.adnl_id_ = info.first;
          target.ip_str_ = info.second;
          promise.set_result(td::Unit());
        });
  }
}
 | 
						|
 | 
						|
}  // namespace ton
 |