mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
TON Storage utilities (#564)
* Rename chunk to piece in MerkleTree for consistency * Refactor PeerManager * Make PeerState thread-safe * Download torrent by hash * First version of storage daemon * Download torrents partially * Improve storing and loading torrent state in DB * Rewrite MerkleTree * "Remove torrent" in storage daemon * Process errors, fix bugs in storage * Move TonlibClientWrapper from rldp-http-proxy to tonlib * Initial version of storage provider * Move interaction with contracts to smc-util * Improve TonlibClientWrapper interface * Various improvements in storage provider * Fix TorrentCreator.cpp * Improve interface for partial download * Client mode in storage-daemon * Improve interface of storage-daemon-cli * Fix calculating speed, show peers in storage-daemon * Use permanent adnl id in storage daemon * Fix sending large "storage.addUpdate" messages * Improve printing torrents in cli * Update tlo * Fix RldpSender::on_ack * Update storage provider * Add "address" parameter to get-provider-params * Allow client to close storage contract * Limit torrent description * Add more logs to storage provider * smc.forget tonlib method * Use smc.forget in storage daemon * Optimize sending messages in smc-util.cpp * Fix verbosity, remove excessive logs * Json output in storage-daemon-cli * Update storage provider contracts * Fix rldp2 acks * Change verbosity of logs in rldp2 * Update help and output of commands and in storage-daemon-cli Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
parent
434dc487a4
commit
360ef54e6b
75 changed files with 8872 additions and 1148 deletions
|
@ -38,7 +38,7 @@ ton::ton_api::object_ptr<ton::ton_api::storage_state> to_ton_api(const PeerState
|
|||
return ton::ton_api::make_object<ton::ton_api::storage_state>(state.will_upload, state.want_download);
|
||||
}
|
||||
|
||||
PeerActor::PeerActor(td::unique_ptr<Callback> callback, td::SharedState<PeerState> state)
|
||||
PeerActor::PeerActor(td::unique_ptr<Callback> callback, std::shared_ptr<PeerState> state)
|
||||
: callback_(std::move(callback)), state_(std::move(state)) {
|
||||
CHECK(callback_);
|
||||
}
|
||||
|
@ -50,7 +50,6 @@ td::uint64 PeerActor::create_and_send_query(ArgsT &&... args) {
|
|||
|
||||
td::uint64 PeerActor::send_query(td::BufferSlice query) {
|
||||
auto query_id = next_query_id_++;
|
||||
//LOG(ERROR) << "send_query " << to_string(ton::fetch_tl_object<ton::ton_api::Function>(std::move(query), true).ok());
|
||||
callback_->send_query(query_id, std::move(query));
|
||||
return query_id;
|
||||
}
|
||||
|
@ -64,8 +63,8 @@ void PeerActor::notify_node() {
|
|||
}
|
||||
|
||||
void PeerActor::execute_query(td::BufferSlice query, td::Promise<td::BufferSlice> promise) {
|
||||
on_pong();
|
||||
TRY_RESULT_PROMISE(promise, f, ton::fetch_tl_object<ton::ton_api::Function>(std::move(query), true));
|
||||
//LOG(ERROR) << "execute_query " << to_string(f);
|
||||
ton::ton_api::downcast_call(
|
||||
*f, td::overloaded(
|
||||
[&](ton::ton_api::storage_ping &ping) {
|
||||
|
@ -73,6 +72,7 @@ void PeerActor::execute_query(td::BufferSlice query, td::Promise<td::BufferSlice
|
|||
},
|
||||
[&](ton::ton_api::storage_addUpdate &add_update) { execute_add_update(add_update, std::move(promise)); },
|
||||
[&](ton::ton_api::storage_getPiece &get_piece) { execute_get_piece(get_piece, std::move(promise)); },
|
||||
[&](ton::ton_api::storage_getTorrentInfo &) { execute_get_torrent_info(std::move(promise)); },
|
||||
[&](auto &other) { promise.set_error(td::Status::Error("Unknown function")); }));
|
||||
schedule_loop();
|
||||
}
|
||||
|
@ -85,28 +85,29 @@ void PeerActor::on_ping_result(td::Result<td::BufferSlice> r_answer) {
|
|||
}
|
||||
|
||||
void PeerActor::on_pong() {
|
||||
wait_pong_till_ = td::Timestamp::in(4);
|
||||
state_.lock()->peer_online_ = true;
|
||||
wait_pong_till_ = td::Timestamp::in(10);
|
||||
state_->peer_online_ = true;
|
||||
notify_node();
|
||||
}
|
||||
|
||||
void PeerActor::on_update_result(td::Result<td::BufferSlice> r_answer) {
|
||||
update_query_id_ = {};
|
||||
if (r_answer.is_ok()) {
|
||||
peer_is_inited_ = true;
|
||||
have_pieces_list_.clear();
|
||||
if (!peer_is_inited_) {
|
||||
peer_init_offset_ += UPDATE_INIT_BLOCK_SIZE;
|
||||
if (peer_init_offset_ >= have_pieces_.as_slice().size()) {
|
||||
peer_is_inited_ = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
have_pieces_list_.insert(have_pieces_list_.end(), sent_have_pieces_list_.begin(), sent_have_pieces_list_.end());
|
||||
}
|
||||
sent_have_pieces_list_.clear();
|
||||
}
|
||||
|
||||
void PeerActor::on_get_piece_result(PartId piece_id, td::Result<td::BufferSlice> r_answer) {
|
||||
auto state = state_.lock();
|
||||
auto it = state->node_queries_.find(piece_id);
|
||||
if (it == state->node_queries_.end()) {
|
||||
LOG(ERROR) << "???";
|
||||
return;
|
||||
}
|
||||
//TODO: handle errors ???
|
||||
it->second = [&]() -> td::Result<PeerState::Part> {
|
||||
auto res = [&]() -> td::Result<PeerState::Part> {
|
||||
TRY_RESULT(slice, std::move(r_answer));
|
||||
TRY_RESULT(piece, ton::fetch_result<ton::ton_api::storage_getPiece>(slice.as_slice()));
|
||||
PeerState::Part res;
|
||||
|
@ -114,6 +115,7 @@ void PeerActor::on_get_piece_result(PartId piece_id, td::Result<td::BufferSlice>
|
|||
res.proof = std::move(piece->proof_);
|
||||
return std::move(res);
|
||||
}();
|
||||
state_->node_queries_results_.add_element(std::make_pair(piece_id, std::move(res)));
|
||||
notify_node();
|
||||
}
|
||||
|
||||
|
@ -123,10 +125,26 @@ void PeerActor::on_update_state_result(td::Result<td::BufferSlice> r_answer) {
|
|||
}
|
||||
}
|
||||
|
||||
void PeerActor::on_get_info_result(td::Result<td::BufferSlice> r_answer) {
|
||||
get_info_query_id_ = {};
|
||||
next_get_info_at_ = td::Timestamp::in(5.0);
|
||||
alarm_timestamp().relax(next_get_info_at_);
|
||||
if (r_answer.is_error()) {
|
||||
return;
|
||||
}
|
||||
auto R = fetch_tl_object<ton::ton_api::storage_torrentInfo>(r_answer.move_as_ok(), true);
|
||||
if (R.is_error()) {
|
||||
return;
|
||||
}
|
||||
td::BufferSlice data = std::move(R.ok_ref()->data_);
|
||||
if (!data.empty() && !state_->torrent_info_ready_) {
|
||||
state_->torrent_info_response_callback_(std::move(data));
|
||||
}
|
||||
}
|
||||
|
||||
void PeerActor::on_query_result(td::uint64 query_id, td::Result<td::BufferSlice> r_answer) {
|
||||
if (r_answer.is_ok()) {
|
||||
on_pong();
|
||||
state_.lock()->download.add(r_answer.ok().size(), td::Timestamp::now());
|
||||
}
|
||||
if (ping_query_id_ && ping_query_id_.value() == query_id) {
|
||||
on_ping_result(std::move(r_answer));
|
||||
|
@ -134,11 +152,14 @@ void PeerActor::on_query_result(td::uint64 query_id, td::Result<td::BufferSlice>
|
|||
on_update_result(std::move(r_answer));
|
||||
} else if (update_state_query_.query_id && update_state_query_.query_id.value() == query_id) {
|
||||
on_update_state_result(std::move(r_answer));
|
||||
} else if (get_info_query_id_ && get_info_query_id_.value() == query_id) {
|
||||
on_get_info_result(std::move(r_answer));
|
||||
} else {
|
||||
for (auto &query_it : node_get_piece_) {
|
||||
if (query_it.second.query_id && query_it.second.query_id.value() == query_id) {
|
||||
on_get_piece_result(query_it.first, std::move(r_answer));
|
||||
query_it.second.query_id = {};
|
||||
node_get_piece_.erase(query_it.first);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -151,8 +172,8 @@ void PeerActor::start_up() {
|
|||
|
||||
node_session_id_ = td::Random::secure_uint64();
|
||||
|
||||
auto state = state_.lock();
|
||||
state->peer = actor_id(this);
|
||||
state_->peer = actor_id(this);
|
||||
state_->peer_ready_ = true;
|
||||
|
||||
notify_node();
|
||||
schedule_loop();
|
||||
|
@ -165,6 +186,7 @@ void PeerActor::loop() {
|
|||
loop_update_init();
|
||||
loop_update_state();
|
||||
loop_update_pieces();
|
||||
loop_get_torrent_info();
|
||||
|
||||
loop_node_get_piece();
|
||||
loop_peer_get_piece();
|
||||
|
@ -175,8 +197,8 @@ void PeerActor::loop() {
|
|||
void PeerActor::loop_pong() {
|
||||
if (wait_pong_till_ && wait_pong_till_.is_in_past()) {
|
||||
wait_pong_till_ = {};
|
||||
LOG(INFO) << "Disconnected";
|
||||
state_.lock()->peer_online_ = false;
|
||||
LOG(DEBUG) << "Disconnected from peer";
|
||||
state_->peer_online_ = false;
|
||||
notify_node();
|
||||
}
|
||||
alarm_timestamp().relax(wait_pong_till_);
|
||||
|
@ -203,25 +225,24 @@ td::BufferSlice PeerActor::create_update_query(ton::tl_object_ptr<ton::ton_api::
|
|||
}
|
||||
|
||||
void PeerActor::loop_update_init() {
|
||||
if (!peer_session_id_) {
|
||||
return;
|
||||
}
|
||||
if (update_query_id_) {
|
||||
return;
|
||||
}
|
||||
if (peer_is_inited_) {
|
||||
if (!peer_session_id_ || update_query_id_ || peer_is_inited_) {
|
||||
return;
|
||||
}
|
||||
|
||||
update_have_pieces();
|
||||
have_pieces_list_.clear();
|
||||
|
||||
auto state = state_.lock();
|
||||
auto node_state = state_->node_state_.load();
|
||||
auto s = have_pieces_.as_slice();
|
||||
if (s.size() <= peer_init_offset_) {
|
||||
peer_is_inited_ = true;
|
||||
return;
|
||||
}
|
||||
s = s.substr(peer_init_offset_, UPDATE_INIT_BLOCK_SIZE);
|
||||
auto query = create_update_query(ton::create_tl_object<ton::ton_api::storage_updateInit>(
|
||||
td::BufferSlice(have_pieces_.as_slice()), to_ton_api(state->node_state_)));
|
||||
td::BufferSlice(s), (int)peer_init_offset_, to_ton_api(node_state)));
|
||||
|
||||
// take care about update_state_query initial state
|
||||
update_state_query_.state = state->node_state_;
|
||||
update_state_query_.state = node_state;
|
||||
update_state_query_.query_id = 0;
|
||||
|
||||
update_query_id_ = send_query(std::move(query));
|
||||
|
@ -232,9 +253,9 @@ void PeerActor::loop_update_state() {
|
|||
return;
|
||||
}
|
||||
|
||||
auto state = state_.lock();
|
||||
if (!(update_state_query_.state == state->node_state_)) {
|
||||
update_state_query_.state = state->node_state_;
|
||||
auto node_state = state_->node_state_.load();
|
||||
if (!(update_state_query_.state == node_state)) {
|
||||
update_state_query_.state = node_state;
|
||||
update_state_query_.query_id = {};
|
||||
}
|
||||
|
||||
|
@ -248,49 +269,45 @@ void PeerActor::loop_update_state() {
|
|||
}
|
||||
|
||||
void PeerActor::update_have_pieces() {
|
||||
auto state = state_.lock();
|
||||
have_pieces_list_.insert(have_pieces_list_.end(), state->node_ready_parts_.begin(), state->node_ready_parts_.end());
|
||||
for (auto piece_id : state->node_ready_parts_) {
|
||||
auto node_ready_parts = state_->node_ready_parts_.read();
|
||||
for (auto piece_id : node_ready_parts) {
|
||||
if (piece_id < peer_init_offset_ + UPDATE_INIT_BLOCK_SIZE) {
|
||||
have_pieces_list_.push_back(piece_id);
|
||||
}
|
||||
have_pieces_.set_one(piece_id);
|
||||
}
|
||||
state->node_ready_parts_.clear();
|
||||
}
|
||||
|
||||
void PeerActor::loop_update_pieces() {
|
||||
if (update_query_id_) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!peer_is_inited_) {
|
||||
if (update_query_id_ || !peer_is_inited_) {
|
||||
return;
|
||||
}
|
||||
|
||||
update_have_pieces();
|
||||
|
||||
if (!have_pieces_list_.empty()) {
|
||||
size_t count = std::min<size_t>(have_pieces_list_.size(), 1500);
|
||||
sent_have_pieces_list_.assign(have_pieces_list_.end() - count, have_pieces_list_.end());
|
||||
have_pieces_list_.erase(have_pieces_list_.end() - count, have_pieces_list_.end());
|
||||
auto query = create_update_query(ton::create_tl_object<ton::ton_api::storage_updateHavePieces>(
|
||||
td::transform(have_pieces_list_, [](auto x) { return static_cast<td::int32>(x); })));
|
||||
td::transform(sent_have_pieces_list_, [](auto x) { return static_cast<td::int32>(x); })));
|
||||
update_query_id_ = send_query(std::move(query));
|
||||
}
|
||||
}
|
||||
|
||||
void PeerActor::loop_node_get_piece() {
|
||||
auto state = state_.lock();
|
||||
|
||||
for (auto it = node_get_piece_.begin(); it != node_get_piece_.end();) {
|
||||
auto other_it = state->node_queries_.find(it->first);
|
||||
if (other_it == state->node_queries_.end() || other_it->second) {
|
||||
it = node_get_piece_.erase(it);
|
||||
} else {
|
||||
it++;
|
||||
}
|
||||
void PeerActor::loop_get_torrent_info() {
|
||||
if (get_info_query_id_ || state_->torrent_info_ready_) {
|
||||
return;
|
||||
}
|
||||
if (next_get_info_at_ && !next_get_info_at_.is_in_past()) {
|
||||
return;
|
||||
}
|
||||
get_info_query_id_ = create_and_send_query<ton::ton_api::storage_getTorrentInfo>();
|
||||
}
|
||||
|
||||
for (auto &query_it : state->node_queries_) {
|
||||
if (query_it.second) {
|
||||
continue;
|
||||
}
|
||||
node_get_piece_.emplace(query_it.first, NodePieceQuery{});
|
||||
void PeerActor::loop_node_get_piece() {
|
||||
for (auto part : state_->node_queries_.read()) {
|
||||
node_get_piece_.emplace(part, NodePieceQuery{});
|
||||
}
|
||||
|
||||
for (auto &query_it : node_get_piece_) {
|
||||
|
@ -304,40 +321,29 @@ void PeerActor::loop_node_get_piece() {
|
|||
}
|
||||
|
||||
void PeerActor::loop_peer_get_piece() {
|
||||
auto state = state_.lock();
|
||||
|
||||
// process answers
|
||||
for (auto &it : state->peer_queries_) {
|
||||
if (!it.second) {
|
||||
continue;
|
||||
}
|
||||
auto promise_it = peer_get_piece_.find(it.first);
|
||||
for (auto &p : state_->peer_queries_results_.read()) {
|
||||
state_->peer_queries_active_.erase(p.first);
|
||||
auto promise_it = peer_get_piece_.find(p.first);
|
||||
if (promise_it == peer_get_piece_.end()) {
|
||||
continue;
|
||||
}
|
||||
promise_it->second.promise.set_result(it.second.unwrap().move_map([](PeerState::Part part) {
|
||||
promise_it->second.promise.set_result(p.second.move_map([](PeerState::Part part) {
|
||||
return ton::create_serialize_tl_object<ton::ton_api::storage_piece>(std::move(part.proof), std::move(part.data));
|
||||
}));
|
||||
peer_get_piece_.erase(promise_it);
|
||||
}
|
||||
|
||||
// erase unneeded queries
|
||||
for (auto it = state->peer_queries_.begin(); it != state->peer_queries_.end();) {
|
||||
if (peer_get_piece_.count(it->first) == 0) {
|
||||
it = state->peer_queries_.erase(it);
|
||||
notify_node();
|
||||
} else {
|
||||
it++;
|
||||
}
|
||||
notify_node();
|
||||
}
|
||||
|
||||
// create queries
|
||||
std::vector<td::uint32> new_peer_queries;
|
||||
for (auto &query_it : peer_get_piece_) {
|
||||
auto res = state->peer_queries_.emplace(query_it.first, td::optional<td::Result<PeerState::Part>>());
|
||||
if (res.second) {
|
||||
if (state_->peer_queries_active_.insert(query_it.first).second) {
|
||||
new_peer_queries.push_back(query_it.first);
|
||||
notify_node();
|
||||
}
|
||||
}
|
||||
state_->peer_queries_.add_elements(std::move(new_peer_queries));
|
||||
}
|
||||
|
||||
void PeerActor::loop_notify_node() {
|
||||
|
@ -345,13 +351,14 @@ void PeerActor::loop_notify_node() {
|
|||
return;
|
||||
}
|
||||
need_notify_node_ = false;
|
||||
state_.lock()->notify_node();
|
||||
state_->notify_node();
|
||||
}
|
||||
|
||||
void PeerActor::execute_ping(td::uint64 session_id, td::Promise<td::BufferSlice> promise) {
|
||||
if (!peer_session_id_ || peer_session_id_.value() != session_id) {
|
||||
peer_session_id_ = session_id;
|
||||
peer_is_inited_ = false;
|
||||
peer_init_offset_ = 0;
|
||||
|
||||
update_query_id_ = {};
|
||||
update_state_query_.query_id = {};
|
||||
|
@ -369,11 +376,11 @@ void PeerActor::execute_add_update(ton::ton_api::storage_addUpdate &add_update,
|
|||
|
||||
promise.set_value(ton::create_serialize_tl_object<ton::ton_api::storage_ok>());
|
||||
|
||||
auto state = state_.lock();
|
||||
std::vector<td::uint32> new_peer_ready_parts;
|
||||
auto add_piece = [&](PartId id) {
|
||||
if (!peer_have_pieces_.get(id)) {
|
||||
peer_have_pieces_.set_one(id);
|
||||
state->peer_ready_parts_.push_back(id);
|
||||
new_peer_ready_parts.push_back(id);
|
||||
notify_node();
|
||||
}
|
||||
};
|
||||
|
@ -383,15 +390,15 @@ void PeerActor::execute_add_update(ton::ton_api::storage_addUpdate &add_update,
|
|||
if (peer_seqno_ >= seqno) {
|
||||
return;
|
||||
}
|
||||
if (state->peer_state_ && state->peer_state_.value() == peer_state) {
|
||||
if (state_->peer_state_ready_ && state_->peer_state_.load() == peer_state) {
|
||||
return;
|
||||
}
|
||||
peer_seqno_ = seqno;
|
||||
state->peer_state_ = peer_state;
|
||||
state_->peer_state_.exchange(peer_state);
|
||||
state_->peer_state_ready_ = true;
|
||||
notify_node();
|
||||
};
|
||||
|
||||
//LOG(ERROR) << "Got " << to_string(add_update);
|
||||
downcast_call(*add_update.update_,
|
||||
td::overloaded(
|
||||
[&](ton::ton_api::storage_updateHavePieces &have_pieces) {
|
||||
|
@ -404,16 +411,24 @@ void PeerActor::execute_add_update(ton::ton_api::storage_addUpdate &add_update,
|
|||
update_peer_state(from_ton_api(*init.state_));
|
||||
td::Bitset new_bitset;
|
||||
new_bitset.set_raw(init.have_pieces_.as_slice().str());
|
||||
size_t offset = init.have_pieces_offset_ * 8;
|
||||
for (auto size = new_bitset.size(), i = size_t(0); i < size; i++) {
|
||||
if (new_bitset.get(i)) {
|
||||
add_piece(static_cast<PartId>(i));
|
||||
add_piece(static_cast<PartId>(offset + i));
|
||||
}
|
||||
}
|
||||
}));
|
||||
state_->peer_ready_parts_.add_elements(std::move(new_peer_ready_parts));
|
||||
}
|
||||
|
||||
void PeerActor::execute_get_piece(ton::ton_api::storage_getPiece &get_piece, td::Promise<td::BufferSlice> promise) {
|
||||
PartId piece_id = get_piece.piece_id_;
|
||||
peer_get_piece_[piece_id] = {std::move(promise)};
|
||||
}
|
||||
|
||||
void PeerActor::execute_get_torrent_info(td::Promise<td::BufferSlice> promise) {
|
||||
td::BufferSlice result = create_serialize_tl_object<ton_api::storage_torrentInfo>(
|
||||
state_->torrent_info_ready_ ? state_->torrent_info_str_->clone() : td::BufferSlice());
|
||||
promise.set_result(std::move(result));
|
||||
}
|
||||
} // namespace ton
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue