mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
Limit checked external messages per address (#1005)
* Limit checked external messages per address * Change max_ext_msg_per_addr_time_window; cleanup mempool by timer --------- Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
parent
d80ce8d3eb
commit
ceefac74cf
6 changed files with 92 additions and 23 deletions
|
@ -49,8 +49,8 @@ td::Result<std::vector<td::Ref<ShardTopBlockDescription>>> create_new_shard_bloc
|
|||
|
||||
td::Ref<BlockSignatureSet> create_signature_set(std::vector<BlockSignature> sig_set);
|
||||
|
||||
void run_check_external_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits,
|
||||
td::actor::ActorId<ValidatorManager> manager, td::Promise<td::Ref<ExtMessage>> promise);
|
||||
void run_check_external_message(td::Ref<ExtMessage> message, td::actor::ActorId<ValidatorManager> manager,
|
||||
td::Promise<td::Ref<ExtMessage>> promise);
|
||||
|
||||
void run_accept_block_query(BlockIdExt id, td::Ref<BlockData> data, std::vector<BlockIdExt> prev,
|
||||
td::Ref<ValidatorSet> validator_set, td::Ref<BlockSignatureSet> signatures,
|
||||
|
|
|
@ -86,24 +86,18 @@ td::Result<Ref<ExtMessageQ>> ExtMessageQ::create_ext_message(td::BufferSlice dat
|
|||
return Ref<ExtMessageQ>{true, std::move(data), std::move(ext_msg), dest_prefix, wc, addr};
|
||||
}
|
||||
|
||||
void ExtMessageQ::run_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits,
|
||||
td::actor::ActorId<ton::validator::ValidatorManager> manager,
|
||||
void ExtMessageQ::run_message(td::Ref<ExtMessage> message, td::actor::ActorId<ton::validator::ValidatorManager> manager,
|
||||
td::Promise<td::Ref<ExtMessage>> promise) {
|
||||
auto R = create_ext_message(std::move(data), limits);
|
||||
if (R.is_error()) {
|
||||
return promise.set_error(R.move_as_error_prefix("failed to parse external message "));
|
||||
}
|
||||
auto M = R.move_as_ok();
|
||||
auto root = M->root_cell();
|
||||
auto root = message->root_cell();
|
||||
block::gen::CommonMsgInfo::Record_ext_in_msg_info info;
|
||||
tlb::unpack_cell_inexact(root, info); // checked in create message
|
||||
ton::StdSmcAddress addr = M->addr();
|
||||
ton::WorkchainId wc = M->wc();
|
||||
ton::StdSmcAddress addr = message->addr();
|
||||
ton::WorkchainId wc = message->wc();
|
||||
|
||||
run_fetch_account_state(
|
||||
wc, addr, manager,
|
||||
[promise = std::move(promise), msg_root = root, wc, addr,
|
||||
M](td::Result<std::tuple<td::Ref<vm::CellSlice>, UnixTime, LogicalTime, std::unique_ptr<block::ConfigInfo>>>
|
||||
[promise = std::move(promise), msg_root = root, wc, addr, message](
|
||||
td::Result<std::tuple<td::Ref<vm::CellSlice>, UnixTime, LogicalTime, std::unique_ptr<block::ConfigInfo>>>
|
||||
res) mutable {
|
||||
if (res.is_error()) {
|
||||
promise.set_error(td::Status::Error(PSLICE() << "Failed to get account state"));
|
||||
|
@ -120,7 +114,7 @@ void ExtMessageQ::run_message(td::BufferSlice data, block::SizeLimitsConfig::Ext
|
|||
} else {
|
||||
auto status = run_message_on_account(wc, &acc, utime, lt + 1, msg_root, std::move(config));
|
||||
if (status.is_ok()) {
|
||||
promise.set_value(std::move(M));
|
||||
promise.set_value(std::move(message));
|
||||
} else {
|
||||
promise.set_error(td::Status::Error(PSLICE() << "External message was not accepted\n"
|
||||
<< status.message()));
|
||||
|
|
|
@ -61,8 +61,7 @@ class ExtMessageQ : public ExtMessage {
|
|||
ton::StdSmcAddress addr);
|
||||
static td::Result<td::Ref<ExtMessageQ>> create_ext_message(td::BufferSlice data,
|
||||
block::SizeLimitsConfig::ExtMsgLimits limits);
|
||||
static void run_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits,
|
||||
td::actor::ActorId<ton::validator::ValidatorManager> manager,
|
||||
static void run_message(td::Ref<ExtMessage> message, td::actor::ActorId<ton::validator::ValidatorManager> manager,
|
||||
td::Promise<td::Ref<ExtMessage>> promise);
|
||||
static td::Status run_message_on_account(ton::WorkchainId wc,
|
||||
block::Account* acc,
|
||||
|
|
|
@ -119,10 +119,9 @@ td::Result<td::Ref<ExtMessage>> create_ext_message(td::BufferSlice data,
|
|||
return std::move(res);
|
||||
}
|
||||
|
||||
void run_check_external_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits,
|
||||
td::actor::ActorId<ValidatorManager> manager,
|
||||
void run_check_external_message(Ref<ExtMessage> message, td::actor::ActorId<ValidatorManager> manager,
|
||||
td::Promise<td::Ref<ExtMessage>> promise) {
|
||||
ExtMessageQ::run_message(std::move(data), limits, std::move(manager), std::move(promise));
|
||||
ExtMessageQ::run_message(std::move(message), std::move(manager), std::move(promise));
|
||||
}
|
||||
|
||||
td::Result<td::Ref<IhrMessage>> create_ihr_message(td::BufferSlice data) {
|
||||
|
|
|
@ -412,14 +412,42 @@ void ValidatorManagerImpl::add_external_message(td::Ref<ExtMessage> msg, int pri
|
|||
ext_messages_hashes_[id.hash] = {priority, id};
|
||||
}
|
||||
void ValidatorManagerImpl::check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) {
|
||||
++ls_stats_check_ext_messages_;
|
||||
auto state = do_get_last_liteserver_state();
|
||||
if (state.is_null()) {
|
||||
promise.set_error(td::Status::Error(ErrorCode::notready, "not ready"));
|
||||
return;
|
||||
}
|
||||
run_check_external_message(std::move(data), state->get_ext_msg_limits(), actor_id(this),
|
||||
std::move(promise));
|
||||
auto R = create_ext_message(std::move(data), state->get_ext_msg_limits());
|
||||
if (R.is_error()) {
|
||||
promise.set_error(R.move_as_error_prefix("failed to parse external message: "));
|
||||
return;
|
||||
}
|
||||
auto message = R.move_as_ok();
|
||||
WorkchainId wc = message->wc();
|
||||
StdSmcAddress addr = message->addr();
|
||||
if (checked_ext_msg_counter_.get_msg_count(wc, addr) >= max_ext_msg_per_addr()) {
|
||||
promise.set_error(
|
||||
td::Status::Error(PSTRING() << "too many external messages to address " << wc << ":" << addr.to_hex()));
|
||||
return;
|
||||
}
|
||||
|
||||
promise = [self = this, wc, addr, promise = std::move(promise),
|
||||
SelfId = actor_id(this)](td::Result<td::Ref<ExtMessage>> R) mutable {
|
||||
if (R.is_error()) {
|
||||
promise.set_error(R.move_as_error());
|
||||
return;
|
||||
}
|
||||
td::actor::send_lambda(SelfId, [=, promise = std::move(promise), message = R.move_as_ok()]() mutable {
|
||||
if (self->checked_ext_msg_counter_.inc_msg_count(wc, addr) > max_ext_msg_per_addr()) {
|
||||
promise.set_error(
|
||||
td::Status::Error(PSTRING() << "too many external messages to address " << wc << ":" << addr.to_hex()));
|
||||
return;
|
||||
}
|
||||
promise.set_result(std::move(message));
|
||||
});
|
||||
};
|
||||
++ls_stats_check_ext_messages_;
|
||||
run_check_external_message(std::move(message), actor_id(this), std::move(promise));
|
||||
}
|
||||
|
||||
void ValidatorManagerImpl::new_ihr_message(td::BufferSlice data) {
|
||||
|
@ -2592,6 +2620,16 @@ void ValidatorManagerImpl::alarm() {
|
|||
log_ls_stats_at_ = td::Timestamp::in(60.0);
|
||||
}
|
||||
alarm_timestamp().relax(log_ls_stats_at_);
|
||||
if (cleanup_mempool_at_.is_in_past()) {
|
||||
if (is_validator()) {
|
||||
get_external_messages(ShardIdFull{masterchainId, shardIdAll},
|
||||
[](td::Result<std::vector<std::pair<td::Ref<ExtMessage>, int>>>) {});
|
||||
get_external_messages(ShardIdFull{basechainId, shardIdAll},
|
||||
[](td::Result<std::vector<std::pair<td::Ref<ExtMessage>, int>>>) {});
|
||||
}
|
||||
cleanup_mempool_at_ = td::Timestamp::in(250.0);
|
||||
}
|
||||
alarm_timestamp().relax(cleanup_mempool_at_);
|
||||
}
|
||||
|
||||
void ValidatorManagerImpl::update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) {
|
||||
|
@ -3102,6 +3140,29 @@ td::actor::ActorOwn<ValidatorManagerInterface> ValidatorManagerFactory::create(
|
|||
rldp, overlays);
|
||||
}
|
||||
|
||||
size_t ValidatorManagerImpl::CheckedExtMsgCounter::get_msg_count(WorkchainId wc, StdSmcAddress addr) {
|
||||
before_query();
|
||||
auto it1 = counter_cur_.find({wc, addr});
|
||||
auto it2 = counter_prev_.find({wc, addr});
|
||||
return (it1 == counter_cur_.end() ? 0 : it1->second) + (it2 == counter_prev_.end() ? 0 : it2->second);
|
||||
}
|
||||
size_t ValidatorManagerImpl::CheckedExtMsgCounter::inc_msg_count(WorkchainId wc, StdSmcAddress addr) {
|
||||
before_query();
|
||||
auto it2 = counter_prev_.find({wc, addr});
|
||||
return (it2 == counter_prev_.end() ? 0 : it2->second) + ++counter_cur_[{wc, addr}];
|
||||
}
|
||||
void ValidatorManagerImpl::CheckedExtMsgCounter::before_query() {
|
||||
while (cleanup_at_.is_in_past()) {
|
||||
counter_prev_ = std::move(counter_cur_);
|
||||
counter_cur_.clear();
|
||||
if (counter_prev_.empty()) {
|
||||
cleanup_at_ = td::Timestamp::in(max_ext_msg_per_addr_time_window() / 2.0);
|
||||
break;
|
||||
}
|
||||
cleanup_at_ += max_ext_msg_per_addr_time_window() / 2.0;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace validator
|
||||
|
||||
} // namespace ton
|
||||
|
|
|
@ -241,10 +241,20 @@ class ValidatorManagerImpl : public ValidatorManager {
|
|||
};
|
||||
std::map<int, ExtMessages> ext_msgs_; // priority -> messages
|
||||
std::map<ExtMessage::Hash, std::pair<int, MessageId<ExtMessage>>> ext_messages_hashes_; // hash -> priority
|
||||
td::Timestamp cleanup_mempool_at_;
|
||||
// IHR ?
|
||||
std::map<MessageId<IhrMessage>, std::unique_ptr<MessageExt<IhrMessage>>> ihr_messages_;
|
||||
std::map<IhrMessage::Hash, MessageId<IhrMessage>> ihr_messages_hashes_;
|
||||
|
||||
struct CheckedExtMsgCounter {
|
||||
std::map<std::pair<WorkchainId, StdSmcAddress>, size_t> counter_cur_, counter_prev_;
|
||||
td::Timestamp cleanup_at_ = td::Timestamp::now();
|
||||
|
||||
size_t get_msg_count(WorkchainId wc, StdSmcAddress addr);
|
||||
size_t inc_msg_count(WorkchainId wc, StdSmcAddress addr);
|
||||
void before_query();
|
||||
} checked_ext_msg_counter_;
|
||||
|
||||
private:
|
||||
// VALIDATOR GROUPS
|
||||
ValidatorSessionId get_validator_set_id(ShardIdFull shard, td::Ref<ValidatorSet> val_set, td::Bits256 opts_hash,
|
||||
|
@ -678,6 +688,12 @@ class ValidatorManagerImpl : public ValidatorManager {
|
|||
size_t max_cached_candidates() const {
|
||||
return 128;
|
||||
}
|
||||
static double max_ext_msg_per_addr_time_window() {
|
||||
return 10.0;
|
||||
}
|
||||
static size_t max_ext_msg_per_addr() {
|
||||
return 3 * 10;
|
||||
}
|
||||
|
||||
private:
|
||||
std::map<BlockSeqno, WaitList<td::actor::Actor, td::Unit>> shard_client_waiters_;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue