1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-02-12 11:12:16 +00:00
ton/rldp2/RldpConnection.cpp

382 lines
12 KiB
C++
Raw Permalink Normal View History

2020-07-06 14:07:20 +00:00
/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
Copyright 2017-2020 Telegram Systems LLP
*/
#include "RldpConnection.h"
TON Storage utilities (#564) * Rename chunk to piece in MerkleTree for consistency * Refactor PeerManager * Make PeerState thread-safe * Download torrent by hash * First version of storage daemon * Download torrents partially * Improve storing and loading torrent state in DB * Rewrite MerkleTree * "Remove torrent" in storage daemon * Process errors, fix bugs in storage * Move TonlibClientWrapper from rldp-http-proxy to tonlib * Initial version of storage provider * Move interaction with contracts to smc-util * Improve TonlibClientWrapper interface * Various improvements in storage provider * Fix TorrentCreator.cpp * Improve interface for partial download * Client mode in storage-daemon * Improve interface of storage-daemon-cli * Fix calculating speed, show peers in storage-daemon * Use permanent adnl id in storage daemon * Fix sending large "storage.addUpdate" messages * Improve printing torrents in cli * Update tlo * Fix RldpSender::on_ack * Update storage provider * Add "address" parameter to get-provider-params * Allow client to close storage contract * Limit torrent description * Add more logs to storage provider * smc.forget tonlib method * Use smc.forget in storage daemon * Optimize sending messages in smc-util.cpp * Fix verbosity, remove excessive logs * Json output in storage-daemon-cli * Update storage provider contracts * Fix rldp2 acks * Change verbosity of logs in rldp2 * Update help and output of commands and in storage-daemon-cli Co-authored-by: SpyCheese <mikle98@yandex.ru>
2022-12-22 09:24:13 +00:00
#include "rldp.hpp"
#include "td/utils/overloaded.h"
#include "td/utils/Random.h"
#include "td/utils/tl_helpers.h"
#include "tl-utils/tl-utils.hpp"
#include "auto/tl/ton_api.h"
#include "auto/tl/ton_api.hpp"
#include "common/errorcode.h"
#include "td/actor//actor.h"
namespace ton {
namespace rldp2 {
void RldpConnection::add_limit(td::Timestamp timeout, Limit limit) {
CHECK(timeout);
auto p = limits_set_.insert(limit);
LOG_CHECK(p.second) << limit.transfer_id.to_hex();
limits_heap_.insert(timeout.at(), const_cast<Limit *>(&*p.first));
}
td::Timestamp RldpConnection::next_limit_expires_at() {
if (limits_heap_.empty()) {
return td::Timestamp::never();
}
return td::Timestamp::at(limits_heap_.top_key());
}
void RldpConnection::drop_limits(TransferId id) {
Limit limit;
limit.transfer_id = id;
auto it = limits_set_.find(limit);
if (it == limits_set_.end()) {
return;
}
limits_heap_.erase(const_cast<td::HeapNode *>(static_cast<const td::HeapNode *>(&*it)));
limits_set_.erase(it);
}
void RldpConnection::on_inbound_completed(TransferId transfer_id, td::Timestamp now) {
inbound_transfers_.erase(transfer_id);
completed_set_.insert(transfer_id);
completed_queue_.push(CompletedId{transfer_id, now.in(20)});
while (completed_queue_.size() > 128 && completed_queue_.front().timeout.is_in_past(now)) {
completed_set_.erase(completed_queue_.pop().transfer_id);
}
}
td::Timestamp RldpConnection::loop_limits(td::Timestamp now) {
while (!limits_heap_.empty() && td::Timestamp::at(limits_heap_.top_key()).is_in_past(now)) {
auto *limit = static_cast<Limit *>(limits_heap_.pop());
auto error = td::Status::Error(ErrorCode::timeout, "timeout");
if (limit->is_inbound) {
on_inbound_completed(limit->transfer_id, now);
to_receive_.emplace_back(limit->transfer_id, std::move(error));
} else {
auto it = outbound_transfers_.find(limit->transfer_id);
if (it != outbound_transfers_.end()) {
for (auto &part : it->second.parts(RldpSender::Config{})) {
in_flight_count_ -= part.second.sender.get_inflight_symbols_count();
}
outbound_transfers_.erase(it);
to_on_sent_.emplace_back(limit->transfer_id, std::move(error));
} else {
TON Storage utilities (#564) * Rename chunk to piece in MerkleTree for consistency * Refactor PeerManager * Make PeerState thread-safe * Download torrent by hash * First version of storage daemon * Download torrents partially * Improve storing and loading torrent state in DB * Rewrite MerkleTree * "Remove torrent" in storage daemon * Process errors, fix bugs in storage * Move TonlibClientWrapper from rldp-http-proxy to tonlib * Initial version of storage provider * Move interaction with contracts to smc-util * Improve TonlibClientWrapper interface * Various improvements in storage provider * Fix TorrentCreator.cpp * Improve interface for partial download * Client mode in storage-daemon * Improve interface of storage-daemon-cli * Fix calculating speed, show peers in storage-daemon * Use permanent adnl id in storage daemon * Fix sending large "storage.addUpdate" messages * Improve printing torrents in cli * Update tlo * Fix RldpSender::on_ack * Update storage provider * Add "address" parameter to get-provider-params * Allow client to close storage contract * Limit torrent description * Add more logs to storage provider * smc.forget tonlib method * Use smc.forget in storage daemon * Optimize sending messages in smc-util.cpp * Fix verbosity, remove excessive logs * Json output in storage-daemon-cli * Update storage provider contracts * Fix rldp2 acks * Change verbosity of logs in rldp2 * Update help and output of commands and in storage-daemon-cli Co-authored-by: SpyCheese <mikle98@yandex.ru>
2022-12-22 09:24:13 +00:00
VLOG(RLDP_WARNING) << "Timeout on unknown transfer " << limit->transfer_id.to_hex();
}
}
limits_set_.erase(*limit);
}
return next_limit_expires_at();
}
void RldpConnection::set_receive_limits(TransferId transfer_id, td::Timestamp timeout, td::uint64 max_size) {
CHECK(timeout);
Limit limit;
limit.transfer_id = transfer_id;
limit.max_size = max_size;
limit.is_inbound = true;
add_limit(timeout, limit);
}
RldpConnection::RldpConnection() {
bdw_stats_.on_update(td::Timestamp::now(), 0);
rtt_stats_.windowed_min_rtt = 0.5;
bdw_stats_.windowed_max_bdw = 10;
}
void RldpConnection::send(TransferId transfer_id, td::BufferSlice data, td::Timestamp timeout) {
if (transfer_id.is_zero()) {
td::Random::secure_bytes(transfer_id.as_slice());
} else {
if (outbound_transfers_.find(transfer_id) != outbound_transfers_.end()) {
TON Storage utilities (#564) * Rename chunk to piece in MerkleTree for consistency * Refactor PeerManager * Make PeerState thread-safe * Download torrent by hash * First version of storage daemon * Download torrents partially * Improve storing and loading torrent state in DB * Rewrite MerkleTree * "Remove torrent" in storage daemon * Process errors, fix bugs in storage * Move TonlibClientWrapper from rldp-http-proxy to tonlib * Initial version of storage provider * Move interaction with contracts to smc-util * Improve TonlibClientWrapper interface * Various improvements in storage provider * Fix TorrentCreator.cpp * Improve interface for partial download * Client mode in storage-daemon * Improve interface of storage-daemon-cli * Fix calculating speed, show peers in storage-daemon * Use permanent adnl id in storage daemon * Fix sending large "storage.addUpdate" messages * Improve printing torrents in cli * Update tlo * Fix RldpSender::on_ack * Update storage provider * Add "address" parameter to get-provider-params * Allow client to close storage contract * Limit torrent description * Add more logs to storage provider * smc.forget tonlib method * Use smc.forget in storage daemon * Optimize sending messages in smc-util.cpp * Fix verbosity, remove excessive logs * Json output in storage-daemon-cli * Update storage provider contracts * Fix rldp2 acks * Change verbosity of logs in rldp2 * Update help and output of commands and in storage-daemon-cli Co-authored-by: SpyCheese <mikle98@yandex.ru>
2022-12-22 09:24:13 +00:00
VLOG(RLDP_WARNING) << "Skip resend of " << transfer_id.to_hex();
return;
}
}
if (timeout) {
Limit limit;
limit.transfer_id = transfer_id;
limit.max_size = 0;
limit.is_inbound = false;
add_limit(timeout, limit);
}
outbound_transfers_.emplace(transfer_id, OutboundTransfer{std::move(data)});
}
void RldpConnection::receive_raw(td::BufferSlice packet) {
auto F = ton::fetch_tl_object<ton::ton_api::rldp2_MessagePart>(std::move(packet), true);
if (F.is_error()) {
return;
}
downcast_call(*F.move_as_ok(), [&](auto &obj) { this->receive_raw_obj(obj); });
}
void RldpConnection::loop_bbr(td::Timestamp now) {
bbr_.step(rtt_stats_, bdw_stats_, in_flight_count_, td::Timestamp::now());
//LOG(ERROR) << td::format::as_time(rtt_stats_.windowed_min_rtt) << " "
//<< td::format::as_size((td::int64)bdw_stats_.windowed_max_bdw * 768) << " " << rtt_stats_.rtt_round;
double speed = bbr_.get_rate();
td::uint32 congestion_window = bbr_.get_window_size();
pacer_.set_speed(speed);
congestion_window_ = congestion_window;
}
td::Timestamp RldpConnection::run(ConnectionCallback &callback) {
auto now = td::Timestamp::now();
loop_bbr(now);
td::Timestamp alarm_timestamp;
td::VectorQueue<std::pair<const TransferId, OutboundTransfer> *> queue;
for (auto &outbound : outbound_transfers_) {
queue.push(&outbound);
}
while (!queue.empty()) {
auto outbound = queue.pop();
auto o_timeout = step(outbound->first, outbound->second, now);
if (o_timeout) {
alarm_timestamp.relax(o_timeout.unwrap());
} else {
queue.push(outbound);
}
}
if (in_flight_count_ > congestion_window_) {
bdw_stats_.on_pause(now);
}
if (in_flight_count_ == 0) {
bdw_stats_.on_pause(now);
}
for (auto &inbound : inbound_transfers_) {
alarm_timestamp.relax(run(inbound.first, inbound.second));
}
alarm_timestamp.relax(loop_limits(td::Timestamp::now()));
for (auto &data : to_receive_) {
callback.receive(data.first, std::move(data.second));
}
for (auto &raw : to_send_raw_) {
callback.send_raw(std::move(raw));
}
to_send_raw_.clear();
to_receive_.clear();
for (auto &res : to_on_sent_) {
callback.on_sent(res.first, std::move(res.second));
}
to_on_sent_.clear();
return alarm_timestamp;
}
td::Timestamp RldpConnection::run(const TransferId &transfer_id, InboundTransfer &inbound) {
td::Timestamp wakeup_at;
bool has_actions = true;
while (has_actions) {
has_actions = false;
for (auto &it : inbound.parts()) {
auto &inbound = it.second;
inbound.receiver.next_action(td::Timestamp::now())
.visit(td::overloaded([&](const RldpReceiver::ActionWait &wait) { wakeup_at.relax(wait.wait_till); },
[&](const RldpReceiver::ActionSendAck &send) {
send_packet(ton::create_serialize_tl_object<ton::ton_api::rldp2_confirm>(
transfer_id, it.first, send.ack.max_seqno, send.ack.received_mask,
send.ack.received_count));
inbound.receiver.on_ack_sent(td::Timestamp::now());
has_actions = true;
}));
}
}
return wakeup_at;
}
td::optional<td::Timestamp> RldpConnection::step(const TransferId &transfer_id, OutboundTransfer &outbound,
td::Timestamp now) {
bool only_probe = in_flight_count_ > congestion_window_;
td::Timestamp wakeup_at;
if (!pacer_.wakeup_at().is_in_past(now)) {
wakeup_at = pacer_.wakeup_at();
only_probe = true;
}
for (auto &it : outbound.parts(RldpSender::Config{})) {
auto &part = it.second;
Guard guard(in_flight_count_, part.sender);
auto action = part.sender.next_action(now, only_probe);
bool was_send = false;
action.visit(td::overloaded(
[&](const RldpSender::ActionSend &send) {
auto seqno = send.seqno - 1;
if (part.encoder->get_info().ready_symbol_count <= seqno) {
part.encoder->prepare_more_symbols();
}
auto symbol = part.encoder->gen_symbol(seqno).data;
send_packet(ton::create_serialize_tl_object<ton::ton_api::rldp2_messagePart>(
transfer_id, part.fec_type.tl(), it.first, outbound.total_size(), seqno, std::move(symbol)));
if (!send.is_probe) {
pacer_.send(1, now);
}
part.sender.on_send(send.seqno, now, send.is_probe, rtt_stats_, bdw_stats_);
if (send.is_probe) {
//LOG(ERROR) << "PROBE " << it.first << " " << send.seqno;
}
//LOG(ERROR) << "SEND";
was_send = true;
},
[&](const RldpSender::ActionWait &wait) {
//LOG(ERROR) << "WAIT";
wakeup_at.relax(wait.wait_till);
}));
if (was_send) {
return {};
}
}
return wakeup_at;
}
void RldpConnection::receive_raw_obj(ton::ton_api::rldp2_messagePart &part) {
if (completed_set_.count(part.transfer_id_) > 0) {
send_packet(ton::create_serialize_tl_object<ton::ton_api::rldp2_complete>(part.transfer_id_, part.part_));
return;
}
auto r_total_size = td::narrow_cast_safe<std::size_t>(part.total_size_);
if (r_total_size.is_error()) {
return;
}
auto r_fec_type = ton::fec::FecType::create(std::move(part.fec_type_));
if (r_fec_type.is_error()) {
return;
}
auto total_size = r_total_size.move_as_ok();
auto transfer_id = part.transfer_id_;
// check total_size limits
td::uint64 max_size = default_mtu();
Limit key;
key.transfer_id = transfer_id;
auto limit_it = limits_set_.find(key);
bool has_limit = limit_it != limits_set_.end();
if (has_limit && limit_it->max_size != 0) {
max_size = limit_it->max_size;
}
if (total_size > max_size) {
TON Storage utilities (#564) * Rename chunk to piece in MerkleTree for consistency * Refactor PeerManager * Make PeerState thread-safe * Download torrent by hash * First version of storage daemon * Download torrents partially * Improve storing and loading torrent state in DB * Rewrite MerkleTree * "Remove torrent" in storage daemon * Process errors, fix bugs in storage * Move TonlibClientWrapper from rldp-http-proxy to tonlib * Initial version of storage provider * Move interaction with contracts to smc-util * Improve TonlibClientWrapper interface * Various improvements in storage provider * Fix TorrentCreator.cpp * Improve interface for partial download * Client mode in storage-daemon * Improve interface of storage-daemon-cli * Fix calculating speed, show peers in storage-daemon * Use permanent adnl id in storage daemon * Fix sending large "storage.addUpdate" messages * Improve printing torrents in cli * Update tlo * Fix RldpSender::on_ack * Update storage provider * Add "address" parameter to get-provider-params * Allow client to close storage contract * Limit torrent description * Add more logs to storage provider * smc.forget tonlib method * Use smc.forget in storage daemon * Optimize sending messages in smc-util.cpp * Fix verbosity, remove excessive logs * Json output in storage-daemon-cli * Update storage provider contracts * Fix rldp2 acks * Change verbosity of logs in rldp2 * Update help and output of commands and in storage-daemon-cli Co-authored-by: SpyCheese <mikle98@yandex.ru>
2022-12-22 09:24:13 +00:00
VLOG(RLDP_INFO) << "Drop too big rldp query " << part.total_size_ << " > " << max_size;
return;
}
auto it = inbound_transfers_.find(transfer_id);
if (it == inbound_transfers_.end()) {
if (!has_limit) {
// set timeout even for small inbound queries
// TODO: other party stil may ddos us with small transfers
set_receive_limits(transfer_id, td::Timestamp::in(10), max_size);
}
it = inbound_transfers_.emplace(transfer_id, InboundTransfer{total_size}).first;
}
auto &inbound = it->second;
auto o_res = [&]() -> td::optional<td::Result<td::BufferSlice>> {
TRY_RESULT(in_part, inbound.get_part(part.part_, r_fec_type.move_as_ok()));
if (!in_part) {
if (inbound.is_part_completed(part.part_)) {
send_packet(ton::create_serialize_tl_object<ton::ton_api::rldp2_complete>(transfer_id, part.part_));
}
return {};
}
TON Storage utilities (#564) * Rename chunk to piece in MerkleTree for consistency * Refactor PeerManager * Make PeerState thread-safe * Download torrent by hash * First version of storage daemon * Download torrents partially * Improve storing and loading torrent state in DB * Rewrite MerkleTree * "Remove torrent" in storage daemon * Process errors, fix bugs in storage * Move TonlibClientWrapper from rldp-http-proxy to tonlib * Initial version of storage provider * Move interaction with contracts to smc-util * Improve TonlibClientWrapper interface * Various improvements in storage provider * Fix TorrentCreator.cpp * Improve interface for partial download * Client mode in storage-daemon * Improve interface of storage-daemon-cli * Fix calculating speed, show peers in storage-daemon * Use permanent adnl id in storage daemon * Fix sending large "storage.addUpdate" messages * Improve printing torrents in cli * Update tlo * Fix RldpSender::on_ack * Update storage provider * Add "address" parameter to get-provider-params * Allow client to close storage contract * Limit torrent description * Add more logs to storage provider * smc.forget tonlib method * Use smc.forget in storage daemon * Optimize sending messages in smc-util.cpp * Fix verbosity, remove excessive logs * Json output in storage-daemon-cli * Update storage provider contracts * Fix rldp2 acks * Change verbosity of logs in rldp2 * Update help and output of commands and in storage-daemon-cli Co-authored-by: SpyCheese <mikle98@yandex.ru>
2022-12-22 09:24:13 +00:00
if (in_part->receiver.on_received(part.seqno_ + 1, td::Timestamp::now())) {
TRY_STATUS_PREFIX(in_part->decoder->add_symbol({static_cast<td::uint32>(part.seqno_), std::move(part.data_)}),
td::Status::Error(ErrorCode::protoviolation, "invalid symbol"));
if (in_part->decoder->may_try_decode()) {
auto r_data = in_part->decoder->try_decode(false);
if (r_data.is_ok()) {
inbound.finish_part(part.part_, r_data.move_as_ok().data);
}
}
}
return inbound.try_finish();
}();
if (o_res) {
drop_limits(transfer_id);
on_inbound_completed(transfer_id, td::Timestamp::now());
to_receive_.emplace_back(transfer_id, o_res.unwrap());
}
}
void RldpConnection::receive_raw_obj(ton::ton_api::rldp2_complete &complete) {
auto transfer_id = complete.transfer_id_;
auto it = outbound_transfers_.find(transfer_id);
if (it == outbound_transfers_.end()) {
return;
}
auto *part = it->second.get_part(complete.part_);
if (part) {
in_flight_count_ -= part->sender.get_inflight_symbols_count();
it->second.drop_part(complete.part_);
}
if (it->second.is_done()) {
drop_limits(it->first);
to_on_sent_.emplace_back(it->first, td::Unit());
outbound_transfers_.erase(it);
}
}
void RldpConnection::receive_raw_obj(ton::ton_api::rldp2_confirm &confirm) {
auto transfer_id = confirm.transfer_id_;
auto it = outbound_transfers_.find(transfer_id);
if (it == outbound_transfers_.end()) {
return;
}
auto *part = it->second.get_part(confirm.part_);
if (!part) {
return;
}
Guard guard(in_flight_count_, part->sender);
Ack ack;
ack.max_seqno = confirm.max_seqno_;
ack.received_count = confirm.received_count_;
ack.received_mask = confirm.received_mask_;
auto update = part->sender.on_ack(ack, 0, td::Timestamp::now(), rtt_stats_, bdw_stats_, loss_stats_);
// update.new_received event
// update.o_loss_at event
}
} // namespace rldp2
} // namespace ton