1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Ratelimit nochannel ADNL packets (#1147)

* Get ADNL stats in validator console

* Add timestamp to stats

* Limit nochannel adnl packets

---------

Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
EmelyanenkoK 2024-09-03 13:34:31 +03:00 committed by GitHub
parent e08111159f
commit b2b79fead1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 838 additions and 175 deletions

View file

@ -26,6 +26,7 @@
#include "td/utils/base64.h"
#include "td/utils/Random.h"
#include "auto/tl/ton_api.h"
#include "td/utils/overloaded.h"
namespace ton {
@ -50,9 +51,13 @@ void AdnlPeerPairImpl::start_up() {
}
void AdnlPeerPairImpl::alarm() {
if (next_dht_query_at_ && next_dht_query_at_.is_in_past()) {
next_dht_query_at_ = td::Timestamp::never();
discover();
if (!disable_dht_query_) {
disable_dht_query_ = true;
if (next_dht_query_at_ && next_dht_query_at_.is_in_past()) {
next_dht_query_at_ = td::Timestamp::never();
discover();
}
alarm_timestamp().relax(next_dht_query_at_);
}
if (next_db_update_at_ && next_db_update_at_.is_in_past()) {
if (received_from_db_ && received_from_static_nodes_ && !peer_id_.empty()) {
@ -68,11 +73,8 @@ void AdnlPeerPairImpl::alarm() {
}
if (retry_send_at_ && retry_send_at_.is_in_past()) {
retry_send_at_ = td::Timestamp::never();
auto messages = std::move(pending_messages_);
pending_messages_.clear();
send_messages_in(std::move(messages), false);
send_messages_from_queue();
}
alarm_timestamp().relax(next_dht_query_at_);
alarm_timestamp().relax(next_db_update_at_);
alarm_timestamp().relax(retry_send_at_);
}
@ -207,18 +209,24 @@ void AdnlPeerPairImpl::receive_packet_checked(AdnlPacket packet) {
}
}
void AdnlPeerPairImpl::receive_packet_from_channel(AdnlChannelIdShort id, AdnlPacket packet) {
void AdnlPeerPairImpl::receive_packet_from_channel(AdnlChannelIdShort id, AdnlPacket packet,
td::uint64 serialized_size) {
add_packet_stats(serialized_size, /* in = */ true, /* channel = */ true);
if (id != channel_in_id_) {
VLOG(ADNL_NOTICE) << this << ": dropping IN message: outdated channel id" << id;
return;
}
if (channel_inited_) {
if (channel_inited_ && !channel_ready_) {
channel_ready_ = true;
if (!out_messages_queue_.empty()) {
td::actor::send_closure(actor_id(this), &AdnlPeerPairImpl::send_messages_from_queue);
}
}
receive_packet_checked(std::move(packet));
}
void AdnlPeerPairImpl::receive_packet(AdnlPacket packet) {
void AdnlPeerPairImpl::receive_packet(AdnlPacket packet, td::uint64 serialized_size) {
add_packet_stats(serialized_size, /* in = */ true, /* channel = */ false);
packet.run_basic_checks().ensure();
if (!encryptor_) {
@ -239,132 +247,132 @@ void AdnlPeerPairImpl::deliver_message(AdnlMessage message) {
message.visit([&](const auto &obj) { this->process_message(obj); });
}
void AdnlPeerPairImpl::send_messages_in(std::vector<OutboundAdnlMessage> messages, bool allow_postpone) {
for (td::int32 idx = 0; idx < 2; idx++) {
std::vector<OutboundAdnlMessage> not_sent;
void AdnlPeerPairImpl::send_messages_from_queue() {
while (!out_messages_queue_.empty() && out_messages_queue_.front().second.is_in_past()) {
out_messages_queue_total_size_ -= out_messages_queue_.front().first.size();
add_expired_msg_stats(out_messages_queue_.front().first.size());
out_messages_queue_.pop();
VLOG(ADNL_NOTICE) << this << ": dropping OUT message: message in queue expired";
}
if (out_messages_queue_.empty()) {
return;
}
auto connR = get_conn(idx == 1);
if (connR.is_error()) {
if (!allow_postpone) {
VLOG(ADNL_NOTICE) << this << ": dropping OUT messages: cannot get conn: " << connR.move_as_error();
return;
}
VLOG(ADNL_INFO) << this << ": delaying OUT messages: cannot get conn: " << connR.move_as_error();
if (!retry_send_at_) {
retry_send_at_.relax(td::Timestamp::in(10.0));
alarm_timestamp().relax(retry_send_at_);
}
for (auto &m : messages) {
pending_messages_.push_back(std::move(m));
}
auto connR = get_conn();
if (connR.is_error()) {
disable_dht_query_ = false;
retry_send_at_.relax(td::Timestamp::in(message_in_queue_ttl_ - 1.0));
alarm_timestamp().relax(retry_send_at_);
VLOG(ADNL_INFO) << this << ": delaying OUT messages: cannot get conn: " << connR.move_as_error();
return;
}
disable_dht_query_ = true;
auto C = connR.move_as_ok();
auto conn = std::move(C.first);
bool is_direct = C.second;
bool first = !skip_init_packet_;
while (!out_messages_queue_.empty()) {
bool try_reinit = try_reinit_at_ && try_reinit_at_.is_in_past();
bool via_channel = channel_ready_ && !try_reinit;
if (!via_channel && !nochannel_rate_limiter_.take()) {
alarm_timestamp().relax(retry_send_at_ = nochannel_rate_limiter_.ready_at());
return;
}
auto C = connR.move_as_ok();
bool is_direct = C.second;
auto conn = std::move(C.first);
if (idx == 1) {
CHECK(is_direct);
if (try_reinit) {
try_reinit_at_ = td::Timestamp::in(td::Random::fast(0.5, 1.5));
}
respond_with_nop_after_ = td::Timestamp::in(td::Random::fast(1.0, 2.0));
size_t s = (via_channel ? channel_packet_header_max_size() : packet_header_max_size());
if (first) {
s += 2 * addr_list_max_size();
}
size_t ptr = 0;
bool first = true;
do {
respond_with_nop_after_ = td::Timestamp::in(td::Random::fast(1.0, 2.0));
bool try_reinit = try_reinit_at_ && try_reinit_at_.is_in_past();
if (try_reinit) {
try_reinit_at_ = td::Timestamp::in(td::Random::fast(0.5, 1.5));
}
bool via_channel = channel_ready_ && !try_reinit;
size_t s = (via_channel ? channel_packet_header_max_size() : packet_header_max_size());
if (first) {
s += 2 * addr_list_max_size();
}
AdnlPacket packet;
packet.set_seqno(++out_seqno_);
packet.set_confirm_seqno(in_seqno_);
AdnlPacket packet;
packet.set_seqno(++out_seqno_);
packet.set_confirm_seqno(in_seqno_);
if (first) {
if (!channel_inited_) {
auto M = adnlmessage::AdnlMessageCreateChannel{channel_pub_, channel_pk_date_};
s += M.size();
packet.add_message(std::move(M));
} else if (!channel_ready_) {
auto M = adnlmessage::AdnlMessageConfirmChannel{channel_pub_, peer_channel_pub_, channel_pk_date_};
s += M.size();
packet.add_message(std::move(M));
}
if (first) {
if (!channel_inited_) {
auto M = adnlmessage::AdnlMessageCreateChannel{channel_pub_, channel_pk_date_};
s += M.size();
packet.add_message(std::move(M));
} else if (!channel_ready_) {
auto M = adnlmessage::AdnlMessageConfirmChannel{channel_pub_, peer_channel_pub_, channel_pk_date_};
s += M.size();
packet.add_message(std::move(M));
}
}
if (!addr_list_.empty()) {
packet.set_received_addr_list_version(addr_list_.version());
}
if (!priority_addr_list_.empty()) {
packet.set_received_priority_addr_list_version(priority_addr_list_.version());
}
if (!addr_list_.empty()) {
packet.set_received_addr_list_version(addr_list_.version());
}
if (!priority_addr_list_.empty()) {
packet.set_received_priority_addr_list_version(priority_addr_list_.version());
}
while (ptr < messages.size()) {
auto &M = messages[ptr];
if (!is_direct && (M.flags() & Adnl::SendFlags::direct_only)) {
not_sent.push_back(std::move(M));
continue;
}
CHECK(M.size() <= get_mtu());
skip_init_packet_ = true;
while (!out_messages_queue_.empty()) {
auto &M = out_messages_queue_.front().first;
if (!is_direct && (M.flags() & Adnl::SendFlags::direct_only)) {
out_messages_queue_total_size_ -= M.size();
out_messages_queue_.pop();
continue;
}
CHECK(M.size() <= get_mtu());
if (s + M.size() <= AdnlNetworkManager::get_mtu()) {
s += M.size();
out_messages_queue_total_size_ -= M.size();
packet.add_message(M.release());
out_messages_queue_.pop();
skip_init_packet_ = false;
} else {
break;
}
}
if (!via_channel) {
packet.set_reinit_date(Adnl::adnl_start_time(), reinit_date_);
packet.set_source(local_id_);
}
if (!first) {
if (!channel_inited_) {
auto M = adnlmessage::AdnlMessageCreateChannel{channel_pub_, channel_pk_date_};
if (s + M.size() <= AdnlNetworkManager::get_mtu()) {
s += M.size();
packet.add_message(M.release());
ptr++;
} else {
break;
packet.add_message(std::move(M));
}
} else if (!channel_ready_) {
auto M = adnlmessage::AdnlMessageConfirmChannel{channel_pub_, peer_channel_pub_, channel_pk_date_};
if (s + M.size() <= AdnlNetworkManager::get_mtu()) {
s += M.size();
packet.add_message(std::move(M));
}
}
if (!via_channel) {
packet.set_reinit_date(Adnl::adnl_start_time(), reinit_date_);
packet.set_source(local_id_);
}
if (!first) {
if (!channel_inited_) {
auto M = adnlmessage::AdnlMessageCreateChannel{channel_pub_, channel_pk_date_};
if (s + M.size() <= AdnlNetworkManager::get_mtu()) {
s += M.size();
packet.add_message(std::move(M));
}
} else if (!channel_ready_) {
auto M = adnlmessage::AdnlMessageConfirmChannel{channel_pub_, peer_channel_pub_, channel_pk_date_};
if (s + M.size() <= AdnlNetworkManager::get_mtu()) {
s += M.size();
packet.add_message(std::move(M));
}
}
}
packet.run_basic_checks().ensure();
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), conn, id = print_id(),
via_channel](td::Result<AdnlPacket> res) {
if (res.is_error()) {
LOG(ERROR) << id << ": dropping OUT message: error while creating packet: " << res.move_as_error();
} else {
td::actor::send_closure(SelfId, &AdnlPeerPairImpl::send_packet_continue, res.move_as_ok(), conn, via_channel);
}
});
td::actor::send_closure(local_actor_, &AdnlLocalId::update_packet, std::move(packet),
(!channel_ready_ && ack_seqno_ == 0 && in_seqno_ == 0) || try_reinit, !via_channel,
(first || s + addr_list_max_size() <= AdnlNetworkManager::get_mtu())
? (try_reinit ? 0 : peer_recv_addr_list_version_)
: 0x7fffffff,
(first || s + 2 * addr_list_max_size() <= AdnlNetworkManager::get_mtu())
? peer_recv_priority_addr_list_version_
: 0x7fffffff,
std::move(P));
first = false;
} while (ptr < messages.size());
messages = std::move(not_sent);
if (!messages.size()) {
break;
}
packet.run_basic_checks().ensure();
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), conn, id = print_id(),
via_channel](td::Result<AdnlPacket> res) {
if (res.is_error()) {
LOG(ERROR) << id << ": dropping OUT message: error while creating packet: " << res.move_as_error();
} else {
td::actor::send_closure(SelfId, &AdnlPeerPairImpl::send_packet_continue, res.move_as_ok(), conn, via_channel);
}
});
td::actor::send_closure(local_actor_, &AdnlLocalId::update_packet, std::move(packet),
(!channel_ready_ && ack_seqno_ == 0 && in_seqno_ == 0) || try_reinit, !via_channel,
(first || s + addr_list_max_size() <= AdnlNetworkManager::get_mtu())
? (try_reinit ? 0 : peer_recv_addr_list_version_)
: 0x7fffffff,
(first || s + 2 * addr_list_max_size() <= AdnlNetworkManager::get_mtu())
? peer_recv_priority_addr_list_version_
: 0x7fffffff,
std::move(P));
first = false;
}
}
@ -395,7 +403,11 @@ void AdnlPeerPairImpl::send_messages(std::vector<OutboundAdnlMessage> messages)
}
}
}
send_messages_in(std::move(new_vec), true);
for (auto &m : new_vec) {
out_messages_queue_total_size_ += m.size();
out_messages_queue_.emplace(std::move(m), td::Timestamp::in(message_in_queue_ttl_));
}
send_messages_from_queue();
}
void AdnlPeerPairImpl::send_packet_continue(AdnlPacket packet, td::actor::ActorId<AdnlNetworkConnection> conn,
@ -407,6 +419,7 @@ void AdnlPeerPairImpl::send_packet_continue(AdnlPacket packet, td::actor::ActorI
auto B = serialize_tl_object(packet.tl(), true);
if (via_channel) {
if (channel_ready_) {
add_packet_stats(B.size(), /* in = */ false, /* channel = */ true);
td::actor::send_closure(channel_, &AdnlChannel::send_message, priority_, conn, std::move(B));
} else {
VLOG(ADNL_WARNING) << this << ": dropping OUT message [" << local_id_ << "->" << peer_id_short_
@ -434,6 +447,7 @@ void AdnlPeerPairImpl::send_packet_continue(AdnlPacket packet, td::actor::ActorI
S.remove_prefix(32);
S.copy_from(X.as_slice());
add_packet_stats(B.size(), /* in = */ false, /* channel = */ false);
td::actor::send_closure(conn, &AdnlNetworkConnection::send, local_id_, peer_id_short_, priority_, std::move(enc));
}
@ -520,7 +534,10 @@ void AdnlPeerPairImpl::process_message(const adnlmessage::AdnlMessageConfirmChan
VLOG(ADNL_NOTICE) << this << ": received adnl.message.confirmChannel with old key";
return;
}
channel_ready_ = true;
if (!channel_ready_) {
channel_ready_ = true;
send_messages_from_queue();
}
}
void AdnlPeerPairImpl::process_message(const adnlmessage::AdnlMessageCustom &message) {
@ -674,7 +691,7 @@ void AdnlPeerPairImpl::reinit(td::int32 date) {
}
}
td::Result<std::pair<td::actor::ActorId<AdnlNetworkConnection>, bool>> AdnlPeerPairImpl::get_conn(bool direct_only) {
td::Result<std::pair<td::actor::ActorId<AdnlNetworkConnection>, bool>> AdnlPeerPairImpl::get_conn() {
if (!priority_addr_list_.empty() && priority_addr_list_.expire_at() < td::Clocks::system()) {
priority_addr_list_ = AdnlAddressList{};
priority_conns_.clear();
@ -692,14 +709,18 @@ td::Result<std::pair<td::actor::ActorId<AdnlNetworkConnection>, bool>> AdnlPeerP
}
}
for (auto &conn : priority_conns_) {
if (conn.ready() && (!direct_only || conn.is_direct())) {
return std::make_pair(conn.conn.get(), conn.is_direct());
for (int direct_only = 1; direct_only >= 0; --direct_only) {
for (auto &conn : priority_conns_) {
if (conn.ready() && (!direct_only || conn.is_direct())) {
return std::make_pair(conn.conn.get(), conn.is_direct());
}
}
}
for (auto &conn : conns_) {
if (conn.ready() && (!direct_only || conn.is_direct())) {
return std::make_pair(conn.conn.get(), conn.is_direct());
for (int direct_only = 1; direct_only >= 0; --direct_only) {
for (auto &conn : conns_) {
if (conn.ready() && (!direct_only || conn.is_direct())) {
return std::make_pair(conn.conn.get(), conn.is_direct());
}
}
}
return td::Status::Error(ErrorCode::notready, "no active connections");
@ -787,6 +808,47 @@ void AdnlPeerPairImpl::get_conn_ip_str(td::Promise<td::string> promise) {
promise.set_value("undefined");
}
void AdnlPeerPairImpl::get_stats(td::Promise<tl_object_ptr<ton_api::adnl_stats_peerPair>> promise) {
auto stats = create_tl_object<ton_api::adnl_stats_peerPair>();
stats->local_id_ = local_id_.bits256_value();
stats->peer_id_ = peer_id_short_.bits256_value();
for (const AdnlAddress &addr : addr_list_.addrs()) {
ton_api::downcast_call(*addr->tl(), td::overloaded(
[&](const ton_api::adnl_address_udp &obj) {
stats->ip_str_ = PSTRING() << td::IPAddress::ipv4_to_str(obj.ip_) << ":"
<< obj.port_;
},
[&](const auto &) {}));
if (!stats->ip_str_.empty()) {
break;
}
}
prepare_packet_stats();
stats->last_in_packet_ts_ = last_in_packet_ts_;
stats->last_out_packet_ts_ = last_out_packet_ts_;
stats->packets_total_ = packet_stats_total_.tl();
stats->packets_total_->ts_start_ = started_ts_;
stats->packets_total_->ts_end_ = td::Clocks::system();
stats->packets_recent_ = packet_stats_prev_.tl();
if (channel_ready_) {
stats->channel_status_ = 2;
} else if (channel_inited_) {
stats->channel_status_ = 1;
} else {
stats->channel_status_ = 0;
}
stats->try_reinit_at_ = (try_reinit_at_ ? try_reinit_at_.at_unix() : 0.0);
stats->connection_ready_ =
std::any_of(conns_.begin(), conns_.end(), [](const Conn &conn) { return conn.ready(); }) ||
std::any_of(priority_conns_.begin(), priority_conns_.end(), [](const Conn &conn) { return conn.ready(); });
stats->out_queue_messages_ = out_messages_queue_.size();
stats->out_queue_bytes_ = out_messages_queue_total_size_;
promise.set_result(std::move(stats));
}
void AdnlPeerImpl::update_id(AdnlNodeIdFull id) {
CHECK(id.compute_short_id() == peer_id_short_);
if (!peer_id_.empty()) {
@ -810,10 +872,8 @@ void AdnlPeerPairImpl::Conn::create_conn(td::actor::ActorId<AdnlPeerPairImpl> pe
void AdnlPeerPairImpl::conn_change_state(AdnlConnectionIdShort id, bool ready) {
if (ready) {
if (pending_messages_.size() > 0) {
auto messages = std::move(pending_messages_);
pending_messages_.clear();
send_messages_in(std::move(messages), true);
if (out_messages_queue_.empty()) {
send_messages_from_queue();
}
}
}
@ -835,7 +895,7 @@ td::actor::ActorOwn<AdnlPeer> AdnlPeer::create(td::actor::ActorId<AdnlNetworkMan
}
void AdnlPeerImpl::receive_packet(AdnlNodeIdShort dst, td::uint32 dst_mode, td::actor::ActorId<AdnlLocalId> dst_actor,
AdnlPacket packet) {
AdnlPacket packet, td::uint64 serialized_size) {
if (packet.inited_from()) {
update_id(packet.from());
}
@ -853,7 +913,7 @@ void AdnlPeerImpl::receive_packet(AdnlNodeIdShort dst, td::uint32 dst_mode, td::
}
}
td::actor::send_closure(it->second.get(), &AdnlPeerPair::receive_packet, std::move(packet));
td::actor::send_closure(it->second.get(), &AdnlPeerPair::receive_packet, std::move(packet), serialized_size);
}
void AdnlPeerImpl::send_messages(AdnlNodeIdShort src, td::uint32 src_mode, td::actor::ActorId<AdnlLocalId> src_actor,
@ -933,6 +993,56 @@ void AdnlPeerImpl::update_addr_list(AdnlNodeIdShort local_id, td::uint32 local_m
td::actor::send_closure(it->second, &AdnlPeerPair::update_addr_list, std::move(addr_list));
}
void AdnlPeerImpl::get_stats(td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise) {
class Cb : public td::actor::Actor {
public:
explicit Cb(td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise)
: promise_(std::move(promise)) {
}
void got_peer_pair_stats(tl_object_ptr<ton_api::adnl_stats_peerPair> peer_pair) {
result_.push_back(std::move(peer_pair));
dec_pending();
}
void inc_pending() {
++pending_;
}
void dec_pending() {
CHECK(pending_ > 0);
--pending_;
if (pending_ == 0) {
promise_.set_result(std::move(result_));
stop();
}
}
private:
td::Promise<std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>>> promise_;
size_t pending_ = 1;
std::vector<tl_object_ptr<ton_api::adnl_stats_peerPair>> result_;
};
auto callback = td::actor::create_actor<Cb>("adnlpeerstats", std::move(promise)).release();
for (auto &[local_id, peer_pair] : peer_pairs_) {
td::actor::send_closure(callback, &Cb::inc_pending);
td::actor::send_closure(peer_pair, &AdnlPeerPair::get_stats,
[local_id = local_id, peer_id = peer_id_short_,
callback](td::Result<tl_object_ptr<ton_api::adnl_stats_peerPair>> R) {
if (R.is_error()) {
VLOG(ADNL_NOTICE) << "failed to get stats for peer pair " << peer_id << "->" << local_id
<< " : " << R.move_as_error();
td::actor::send_closure(callback, &Cb::dec_pending);
} else {
td::actor::send_closure(callback, &Cb::got_peer_pair_stats, R.move_as_ok());
}
});
}
td::actor::send_closure(callback, &Cb::dec_pending);
}
void AdnlPeerPairImpl::got_data_from_db(td::Result<AdnlDbItem> R) {
received_from_db_ = false;
if (R.is_error()) {
@ -1016,6 +1126,66 @@ void AdnlPeerPairImpl::request_reverse_ping_result(td::Result<td::Unit> R) {
}
}
void AdnlPeerPairImpl::add_packet_stats(td::uint64 bytes, bool in, bool channel) {
prepare_packet_stats();
auto add_stats = [&](PacketStats &stats) {
if (in) {
++stats.in_packets;
stats.in_bytes += bytes;
if (channel) {
++stats.in_packets_channel;
stats.in_bytes_channel += bytes;
}
} else {
++stats.out_packets;
stats.out_bytes += bytes;
if (channel) {
++stats.out_packets_channel;
stats.out_bytes_channel += bytes;
}
}
};
add_stats(packet_stats_cur_);
add_stats(packet_stats_total_);
if (in) {
last_in_packet_ts_ = td::Clocks::system();
} else {
last_out_packet_ts_ = td::Clocks::system();
}
}
void AdnlPeerPairImpl::add_expired_msg_stats(td::uint64 bytes) {
prepare_packet_stats();
auto add_stats = [&](PacketStats &stats) {
++stats.out_expired_messages;
stats.out_expired_bytes += bytes;
};
add_stats(packet_stats_cur_);
add_stats(packet_stats_total_);
}
void AdnlPeerPairImpl::prepare_packet_stats() {
double now = td::Clocks::system();
if (now >= packet_stats_cur_.ts_end) {
packet_stats_prev_ = std::move(packet_stats_cur_);
packet_stats_cur_ = {};
auto now_int = (int)now;
packet_stats_cur_.ts_start = (double)(now_int / 60 * 60);
packet_stats_cur_.ts_end = packet_stats_cur_.ts_start + 60.0;
if (packet_stats_prev_.ts_end < now - 60.0) {
packet_stats_prev_ = {};
packet_stats_prev_.ts_end = packet_stats_cur_.ts_start;
packet_stats_prev_.ts_start = packet_stats_prev_.ts_end - 60.0;
}
}
}
tl_object_ptr<ton_api::adnl_stats_packets> AdnlPeerPairImpl::PacketStats::tl() const {
return create_tl_object<ton_api::adnl_stats_packets>(ts_start, ts_end, in_packets, in_bytes, in_packets_channel,
in_bytes_channel, out_packets, out_bytes, out_packets_channel,
out_bytes_channel, out_expired_messages, out_expired_bytes);
}
} // namespace adnl
} // namespace ton