mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
Add collator options (#1052)
* Set collator options from validator console * Fix compilation error in manager-disk * Defer all messages if out msg queue is too big * Fix checking queue size in collator --------- Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
parent
c54f095c1b
commit
57f95cc282
22 changed files with 294 additions and 27 deletions
|
@ -71,6 +71,7 @@ class Collator final : public td::actor::Actor {
|
|||
std::vector<Ref<ShardState>> prev_states;
|
||||
std::vector<Ref<BlockData>> prev_block_data;
|
||||
Ed25519_PublicKey created_by_;
|
||||
Ref<CollatorOptions> collator_opts_;
|
||||
Ref<ValidatorSet> validator_set_;
|
||||
td::actor::ActorId<ValidatorManager> manager;
|
||||
td::Timestamp timeout;
|
||||
|
@ -90,7 +91,8 @@ class Collator final : public td::actor::Actor {
|
|||
public:
|
||||
Collator(ShardIdFull shard, bool is_hardfork, td::uint32 min_ts, BlockIdExt min_masterchain_block_id,
|
||||
std::vector<BlockIdExt> prev, Ref<ValidatorSet> validator_set, Ed25519_PublicKey collator_id,
|
||||
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout, td::Promise<BlockCandidate> promise);
|
||||
Ref<CollatorOptions> collator_opts, td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout,
|
||||
td::Promise<BlockCandidate> promise);
|
||||
~Collator() override = default;
|
||||
bool is_busy() const {
|
||||
return busy_;
|
||||
|
@ -195,6 +197,7 @@ class Collator final : public td::actor::Actor {
|
|||
std::unique_ptr<vm::AugmentedDictionary> in_msg_dict, out_msg_dict, out_msg_queue_, sibling_out_msg_queue_;
|
||||
std::map<StdSmcAddress, size_t> unprocessed_deferred_messages_; // number of messages from dispatch queue in new_msgs
|
||||
td::uint64 out_msg_queue_size_ = 0;
|
||||
td::uint64 old_out_msg_queue_size_ = 0;
|
||||
bool have_out_msg_queue_size_in_state_ = false;
|
||||
std::unique_ptr<vm::Dictionary> ihr_pending;
|
||||
std::shared_ptr<block::MsgProcessedUptoCollection> processed_upto_, sibling_processed_upto_;
|
||||
|
@ -211,6 +214,8 @@ class Collator final : public td::actor::Actor {
|
|||
unsigned dispatch_queue_ops_{0};
|
||||
std::map<StdSmcAddress, LogicalTime> last_dispatch_queue_emitted_lt_;
|
||||
bool have_unprocessed_account_dispatch_queue_ = true;
|
||||
td::uint64 defer_out_queue_size_limit_;
|
||||
td::uint64 hard_defer_out_queue_size_limit_;
|
||||
|
||||
bool msg_metadata_enabled_ = false;
|
||||
bool deferring_messages_enabled_ = false;
|
||||
|
|
|
@ -50,7 +50,6 @@ static const td::uint32 SPLIT_MAX_QUEUE_SIZE = 100000;
|
|||
static const td::uint32 MERGE_MAX_QUEUE_SIZE = 2047;
|
||||
static const td::uint32 SKIP_EXTERNALS_QUEUE_SIZE = 8000;
|
||||
static const int HIGH_PRIORITY_EXTERNAL = 10; // don't skip high priority externals when queue is big
|
||||
static const int DEFER_MESSAGES_AFTER = 10; // 10'th and later messages from address will be deferred
|
||||
|
||||
#define DBG(__n) dbg(__n)&&
|
||||
#define DSTART int __dcnt = 0;
|
||||
|
@ -72,20 +71,22 @@ static inline bool dbg(int c) {
|
|||
* @param prev A vector of BlockIdExt representing the previous blocks.
|
||||
* @param validator_set A reference to the ValidatorSet.
|
||||
* @param collator_id The public key of the block creator.
|
||||
* @param collator_opts A reference to CollatorOptions.
|
||||
* @param manager The ActorId of the ValidatorManager.
|
||||
* @param timeout The timeout for the collator.
|
||||
* @param promise The promise to return the result.
|
||||
*/
|
||||
Collator::Collator(ShardIdFull shard, bool is_hardfork, UnixTime min_ts, BlockIdExt min_masterchain_block_id,
|
||||
std::vector<BlockIdExt> prev, td::Ref<ValidatorSet> validator_set, Ed25519_PublicKey collator_id,
|
||||
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout,
|
||||
td::Promise<BlockCandidate> promise)
|
||||
std::vector<BlockIdExt> prev, Ref<ValidatorSet> validator_set, Ed25519_PublicKey collator_id,
|
||||
Ref<CollatorOptions> collator_opts, td::actor::ActorId<ValidatorManager> manager,
|
||||
td::Timestamp timeout, td::Promise<BlockCandidate> promise)
|
||||
: shard_(shard)
|
||||
, is_hardfork_(is_hardfork)
|
||||
, min_ts(min_ts)
|
||||
, min_mc_block_id{min_masterchain_block_id}
|
||||
, prev_blocks(std::move(prev))
|
||||
, created_by_(collator_id)
|
||||
, collator_opts_(collator_opts)
|
||||
, validator_set_(std::move(validator_set))
|
||||
, manager(manager)
|
||||
, timeout(timeout)
|
||||
|
@ -1786,6 +1787,7 @@ bool Collator::try_collate() {
|
|||
last_proc_int_msg_.second.set_zero();
|
||||
first_unproc_int_msg_.first = ~0ULL;
|
||||
first_unproc_int_msg_.second.set_ones();
|
||||
old_out_msg_queue_size_ = out_msg_queue_size_;
|
||||
if (is_masterchain()) {
|
||||
LOG(DEBUG) << "getting the list of special smart contracts";
|
||||
auto res = config_->get_special_smartcontracts();
|
||||
|
@ -1970,6 +1972,10 @@ bool Collator::fetch_config_params() {
|
|||
return fatal_error(res.move_as_error());
|
||||
}
|
||||
compute_phase_cfg_.libraries = std::make_unique<vm::Dictionary>(config_->get_libraries_root(), 256);
|
||||
defer_out_queue_size_limit_ = std::max<td::uint64>(collator_opts_->defer_out_queue_size_limit,
|
||||
compute_phase_cfg_.size_limits.defer_out_queue_size_limit);
|
||||
// This one is checked in validate-query
|
||||
hard_defer_out_queue_size_limit_ = compute_phase_cfg_.size_limits.defer_out_queue_size_limit;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -3053,8 +3059,10 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R
|
|||
bool is_special_account = is_masterchain() && config_->is_special_smartcontract(src_addr);
|
||||
bool defer = false;
|
||||
if (!from_dispatch_queue) {
|
||||
if (deferring_messages_enabled_ && !is_special && !is_special_account && msg.msg_idx != 0) {
|
||||
if (++sender_generated_messages_count_[src_addr] >= DEFER_MESSAGES_AFTER) {
|
||||
if (deferring_messages_enabled_ && collator_opts_->deferring_enabled && !is_special && !is_special_account &&
|
||||
msg.msg_idx != 0) {
|
||||
if (++sender_generated_messages_count_[src_addr] >= collator_opts_->defer_messages_after ||
|
||||
out_msg_queue_size_ > defer_out_queue_size_limit_) {
|
||||
defer = true;
|
||||
}
|
||||
}
|
||||
|
@ -3626,18 +3634,24 @@ int Collator::process_external_message(Ref<vm::Cell> msg) {
|
|||
* @returns True if the processing was successful, false otherwise.
|
||||
*/
|
||||
bool Collator::process_dispatch_queue() {
|
||||
if (out_msg_queue_size_ > defer_out_queue_size_limit_ && old_out_msg_queue_size_ > hard_defer_out_queue_size_limit_) {
|
||||
return true;
|
||||
}
|
||||
have_unprocessed_account_dispatch_queue_ = true;
|
||||
size_t max_total_count[3] = {1 << 30, 150, 150};
|
||||
size_t max_per_initiator[3] = {1 << 30, 20, 0};
|
||||
if (out_msg_queue_size_ <= 256) {
|
||||
size_t max_total_count[3] = {1 << 30, collator_opts_->dispatch_phase_2_max_total,
|
||||
collator_opts_->dispatch_phase_3_max_total};
|
||||
size_t max_per_initiator[3] = {1 << 30, collator_opts_->dispatch_phase_2_max_per_initiator, 0};
|
||||
if (collator_opts_->dispatch_phase_3_max_per_initiator) {
|
||||
max_per_initiator[2] = collator_opts_->dispatch_phase_3_max_per_initiator.value();
|
||||
} else if (out_msg_queue_size_ <= 256) {
|
||||
max_per_initiator[2] = 10;
|
||||
} else if (out_msg_queue_size_ <= 512) {
|
||||
max_per_initiator[2] = 2;
|
||||
} else if (out_msg_queue_size_ <= 2048) {
|
||||
} else if (out_msg_queue_size_ <= 1500) {
|
||||
max_per_initiator[2] = 1;
|
||||
}
|
||||
for (int iter = 0; iter < 3; ++iter) {
|
||||
if (max_per_initiator[iter] == 0) {
|
||||
if (max_per_initiator[iter] == 0 || max_total_count[iter] == 0) {
|
||||
continue;
|
||||
}
|
||||
vm::AugmentedDictionary cur_dispatch_queue{dispatch_queue_->get_root(), 256, block::tlb::aug_DispatchQueue};
|
||||
|
@ -3676,7 +3690,8 @@ bool Collator::process_dispatch_queue() {
|
|||
|
||||
// Remove message from DispatchQueue
|
||||
bool ok;
|
||||
if (iter == 0 || (iter == 1 && sender_generated_messages_count_[src_addr] >= DEFER_MESSAGES_AFTER)) {
|
||||
if (iter == 0 ||
|
||||
(iter == 1 && sender_generated_messages_count_[src_addr] >= collator_opts_->defer_messages_after)) {
|
||||
ok = cur_dispatch_queue.lookup_delete(src_addr).not_null();
|
||||
} else {
|
||||
dict.lookup_delete(key);
|
||||
|
|
|
@ -213,8 +213,8 @@ void run_validate_query(ShardIdFull shard, UnixTime min_ts, BlockIdExt min_maste
|
|||
|
||||
void run_collate_query(ShardIdFull shard, td::uint32 min_ts, const BlockIdExt& min_masterchain_block_id,
|
||||
std::vector<BlockIdExt> prev, Ed25519_PublicKey collator_id, td::Ref<ValidatorSet> validator_set,
|
||||
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout,
|
||||
td::Promise<BlockCandidate> promise) {
|
||||
td::Ref<CollatorOptions> collator_opts, td::actor::ActorId<ValidatorManager> manager,
|
||||
td::Timestamp timeout, td::Promise<BlockCandidate> promise) {
|
||||
BlockSeqno seqno = 0;
|
||||
for (auto& p : prev) {
|
||||
if (p.seqno() > seqno) {
|
||||
|
@ -223,7 +223,8 @@ void run_collate_query(ShardIdFull shard, td::uint32 min_ts, const BlockIdExt& m
|
|||
}
|
||||
td::actor::create_actor<Collator>(PSTRING() << "collate" << shard.to_str() << ":" << (seqno + 1), shard, false,
|
||||
min_ts, min_masterchain_block_id, std::move(prev), std::move(validator_set),
|
||||
collator_id, std::move(manager), timeout, std::move(promise))
|
||||
collator_id, std::move(collator_opts), std::move(manager), timeout,
|
||||
std::move(promise))
|
||||
.release();
|
||||
}
|
||||
|
||||
|
@ -238,7 +239,8 @@ void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_b
|
|||
}
|
||||
td::actor::create_actor<Collator>(PSTRING() << "collate" << shard.to_str() << ":" << (seqno + 1), shard, true, 0,
|
||||
min_masterchain_block_id, std::move(prev), td::Ref<ValidatorSet>{},
|
||||
Ed25519_PublicKey{Bits256::zero()}, std::move(manager), timeout, std::move(promise))
|
||||
Ed25519_PublicKey{Bits256::zero()}, td::Ref<CollatorOptions>{true},
|
||||
std::move(manager), timeout, std::move(promise))
|
||||
.release();
|
||||
}
|
||||
|
||||
|
|
|
@ -3405,7 +3405,7 @@ bool ValidateQuery::check_account_dispatch_queue_update(td::Bits256 addr, Ref<vm
|
|||
}
|
||||
}
|
||||
if (old_dict_size > 0 && max_removed_lt == 0) {
|
||||
have_unprocessed_account_dispatch_queue_ = true;
|
||||
++processed_account_dispatch_queues_;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -3432,6 +3432,27 @@ bool ValidateQuery::unpack_dispatch_queue_update() {
|
|||
if (!res) {
|
||||
return reject_query("invalid DispatchQueue dictionary in the new state");
|
||||
}
|
||||
|
||||
if (old_out_msg_queue_size_ <= compute_phase_cfg_.size_limits.defer_out_queue_size_limit) {
|
||||
// Check that at least one message was taken from each AccountDispatchQueue
|
||||
try {
|
||||
have_unprocessed_account_dispatch_queue_ = false;
|
||||
td::uint64 total_account_dispatch_queues = 0;
|
||||
ps_.dispatch_queue_->check_for_each([&](Ref<vm::CellSlice>, td::ConstBitPtr, int n) -> bool {
|
||||
assert(n == 352);
|
||||
++total_account_dispatch_queues;
|
||||
if (total_account_dispatch_queues > processed_account_dispatch_queues_) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
have_unprocessed_account_dispatch_queue_ =
|
||||
(total_account_dispatch_queues != processed_account_dispatch_queues_);
|
||||
} catch (vm::VmVirtError&) {
|
||||
// VmVirtError can happen if we have only a proof of ShardState
|
||||
have_unprocessed_account_dispatch_queue_ = true;
|
||||
}
|
||||
}
|
||||
} catch (vm::VmError& err) {
|
||||
return reject_query("invalid DispatchQueue dictionary difference between the old and the new state: "s +
|
||||
err.get_msg());
|
||||
|
@ -3694,7 +3715,8 @@ bool ValidateQuery::check_in_msg(td::ConstBitPtr key, Ref<vm::CellSlice> in_msg)
|
|||
}
|
||||
if (have_unprocessed_account_dispatch_queue_ && tag != block::gen::InMsg::msg_import_ext &&
|
||||
tag != block::gen::InMsg::msg_import_deferred_tr && tag != block::gen::InMsg::msg_import_deferred_fin) {
|
||||
// Collator is requeired to take at least one message from each AccountDispatchQueue (unless the block is full)
|
||||
// Collator is requeired to take at least one message from each AccountDispatchQueue
|
||||
// (unless the block is full or unless out_msg_queue_size is big)
|
||||
// If some AccountDispatchQueue is unporcessed then it's not allowed to import other messages except for externals
|
||||
return reject_query("required DispatchQueue processing is not done, but some other internal messages are imported");
|
||||
}
|
||||
|
|
|
@ -241,6 +241,7 @@ class ValidateQuery : public td::actor::Actor {
|
|||
bool deferring_messages_enabled_ = false;
|
||||
bool store_out_msg_queue_size_ = false;
|
||||
|
||||
td::uint64 processed_account_dispatch_queues_ = 0;
|
||||
bool have_unprocessed_account_dispatch_queue_ = false;
|
||||
|
||||
td::PerfWarningTimer perf_timer_;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue