mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
Fix estimating block size, repeat collation on error (#1178)
* Fix extimating block size, repeat collation on error * Cancel collation when it is non needed
This commit is contained in:
parent
fc5e71fc15
commit
257cd8cd9c
8 changed files with 150 additions and 44 deletions
|
@ -45,11 +45,13 @@ using td::Ref;
|
|||
using namespace std::literals::string_literals;
|
||||
|
||||
// Don't increase MERGE_MAX_QUEUE_LIMIT too much: merging requires cleaning the whole queue in out_msg_queue_cleanup
|
||||
static const td::uint32 FORCE_SPLIT_QUEUE_SIZE = 4096;
|
||||
static const td::uint32 SPLIT_MAX_QUEUE_SIZE = 100000;
|
||||
static const td::uint32 MERGE_MAX_QUEUE_SIZE = 2047;
|
||||
static const td::uint32 SKIP_EXTERNALS_QUEUE_SIZE = 8000;
|
||||
static const int HIGH_PRIORITY_EXTERNAL = 10; // don't skip high priority externals when queue is big
|
||||
static constexpr td::uint32 FORCE_SPLIT_QUEUE_SIZE = 4096;
|
||||
static constexpr td::uint32 SPLIT_MAX_QUEUE_SIZE = 100000;
|
||||
static constexpr td::uint32 MERGE_MAX_QUEUE_SIZE = 2047;
|
||||
static constexpr td::uint32 SKIP_EXTERNALS_QUEUE_SIZE = 8000;
|
||||
static constexpr int HIGH_PRIORITY_EXTERNAL = 10; // don't skip high priority externals when queue is big
|
||||
|
||||
static constexpr int MAX_ATTEMPTS = 5;
|
||||
|
||||
#define DBG(__n) dbg(__n)&&
|
||||
#define DSTART int __dcnt = 0;
|
||||
|
@ -74,11 +76,15 @@ static inline bool dbg(int c) {
|
|||
* @param manager The ActorId of the ValidatorManager.
|
||||
* @param timeout The timeout for the collator.
|
||||
* @param promise The promise to return the result.
|
||||
* @param cancellation_token Token to cancel collation.
|
||||
* @param mode +1 - skip storing candidate to disk.
|
||||
* @param attempt_idx The index of the attempt, starting from 0. On later attempts collator decreases block limits and skips some steps.
|
||||
*/
|
||||
Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_masterchain_block_id,
|
||||
std::vector<BlockIdExt> prev, td::Ref<ValidatorSet> validator_set, Ed25519_PublicKey collator_id,
|
||||
Ref<CollatorOptions> collator_opts, td::actor::ActorId<ValidatorManager> manager,
|
||||
td::Timestamp timeout, td::Promise<BlockCandidate> promise)
|
||||
td::Timestamp timeout, td::Promise<BlockCandidate> promise, td::CancellationToken cancellation_token,
|
||||
unsigned mode, int attempt_idx)
|
||||
: shard_(shard)
|
||||
, is_hardfork_(is_hardfork)
|
||||
, min_mc_block_id{min_masterchain_block_id}
|
||||
|
@ -93,9 +99,13 @@ Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_mastercha
|
|||
, soft_timeout_(td::Timestamp::at(timeout.at() - 3.0))
|
||||
, medium_timeout_(td::Timestamp::at(timeout.at() - 1.5))
|
||||
, main_promise(std::move(promise))
|
||||
, perf_timer_("collate", 0.1, [manager](double duration) {
|
||||
send_closure(manager, &ValidatorManager::add_perf_timer_stat, "collate", duration);
|
||||
}) {
|
||||
, mode_(mode)
|
||||
, attempt_idx_(attempt_idx)
|
||||
, perf_timer_("collate", 0.1,
|
||||
[manager](double duration) {
|
||||
send_closure(manager, &ValidatorManager::add_perf_timer_stat, "collate", duration);
|
||||
})
|
||||
, cancellation_token_(std::move(cancellation_token)) {
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -107,7 +117,11 @@ Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_mastercha
|
|||
* The results of these queries are handled by corresponding callback functions.
|
||||
*/
|
||||
void Collator::start_up() {
|
||||
LOG(WARNING) << "Collator for shard " << shard_.to_str() << " started";
|
||||
LOG(WARNING) << "Collator for shard " << shard_.to_str() << " started"
|
||||
<< (attempt_idx_ ? PSTRING() << " (attempt #" << attempt_idx_ << ")" : "");
|
||||
if (!check_cancelled()) {
|
||||
return;
|
||||
}
|
||||
LOG(DEBUG) << "Previous block #1 is " << prev_blocks.at(0).to_str();
|
||||
if (prev_blocks.size() > 1) {
|
||||
LOG(DEBUG) << "Previous block #2 is " << prev_blocks.at(1).to_str();
|
||||
|
@ -340,7 +354,15 @@ bool Collator::fatal_error(td::Status error) {
|
|||
error.ensure_error();
|
||||
LOG(ERROR) << "cannot generate block candidate for " << show_shard(shard_) << " : " << error.to_string();
|
||||
if (busy_) {
|
||||
main_promise(std::move(error));
|
||||
if (allow_repeat_collation_ && error.code() != ErrorCode::cancelled && attempt_idx_ + 1 < MAX_ATTEMPTS &&
|
||||
!is_hardfork_ && !timeout.is_in_past()) {
|
||||
LOG(WARNING) << "Repeating collation (attempt #" << attempt_idx_ + 1 << ")";
|
||||
run_collate_query(shard_, min_mc_block_id, prev_blocks, created_by_, validator_set_, collator_opts_, manager,
|
||||
td::Timestamp::in(10.0), std::move(main_promise), std::move(cancellation_token_), mode_,
|
||||
attempt_idx_ + 1);
|
||||
} else {
|
||||
main_promise(std::move(error));
|
||||
}
|
||||
busy_ = false;
|
||||
}
|
||||
stop();
|
||||
|
@ -382,6 +404,9 @@ bool Collator::fatal_error(std::string err_msg, int err_code) {
|
|||
*/
|
||||
void Collator::check_pending() {
|
||||
// LOG(DEBUG) << "pending = " << pending;
|
||||
if (!check_cancelled()) {
|
||||
return;
|
||||
}
|
||||
if (!pending) {
|
||||
step = 2;
|
||||
try {
|
||||
|
@ -712,6 +737,15 @@ bool Collator::unpack_last_mc_state() {
|
|||
return fatal_error(limits.move_as_error());
|
||||
}
|
||||
block_limits_ = limits.move_as_ok();
|
||||
if (attempt_idx_ == 3) {
|
||||
LOG(INFO) << "Attempt #3: bytes, gas limits /= 2";
|
||||
block_limits_->bytes.multiply_by(0.5);
|
||||
block_limits_->gas.multiply_by(0.5);
|
||||
} else if (attempt_idx_ == 4) {
|
||||
LOG(INFO) << "Attempt #4: bytes, gas limits /= 4";
|
||||
block_limits_->bytes.multiply_by(0.25);
|
||||
block_limits_->gas.multiply_by(0.25);
|
||||
}
|
||||
LOG(DEBUG) << "block limits: bytes [" << block_limits_->bytes.underload() << ", " << block_limits_->bytes.soft()
|
||||
<< ", " << block_limits_->bytes.hard() << "]";
|
||||
LOG(DEBUG) << "block limits: gas [" << block_limits_->gas.underload() << ", " << block_limits_->gas.soft() << ", "
|
||||
|
@ -2093,6 +2127,7 @@ bool Collator::do_collate() {
|
|||
if (max_lt == start_lt) {
|
||||
++max_lt;
|
||||
}
|
||||
allow_repeat_collation_ = true;
|
||||
// NB: interchanged 1.2 and 1.1 (is this always correct?)
|
||||
// 1.1. re-adjust neighbors' out_msg_queues (for oneself)
|
||||
if (!add_trivial_neighbor()) {
|
||||
|
@ -2333,6 +2368,9 @@ bool Collator::out_msg_queue_cleanup() {
|
|||
LOG(WARNING) << "cleaning up outbound queue takes too long, ending";
|
||||
break;
|
||||
}
|
||||
if (!check_cancelled()) {
|
||||
return false;
|
||||
}
|
||||
if (i == queue_parts.size()) {
|
||||
i = 0;
|
||||
}
|
||||
|
@ -3532,6 +3570,9 @@ bool Collator::process_inbound_internal_messages() {
|
|||
stats_.limits_log += PSTRING() << "INBOUND_INT_MESSAGES: timeout\n";
|
||||
break;
|
||||
}
|
||||
if (!check_cancelled()) {
|
||||
return false;
|
||||
}
|
||||
auto kv = nb_out_msgs_->extract_cur();
|
||||
CHECK(kv && kv->msg.not_null());
|
||||
LOG(DEBUG) << "processing inbound message with (lt,hash)=(" << kv->lt << "," << kv->key.to_hex()
|
||||
|
@ -3565,6 +3606,10 @@ bool Collator::process_inbound_external_messages() {
|
|||
LOG(INFO) << "skipping processing of inbound external messages";
|
||||
return true;
|
||||
}
|
||||
if (attempt_idx_ >= 2) {
|
||||
LOG(INFO) << "Attempt #" << attempt_idx_ << ": skip external messages";
|
||||
return true;
|
||||
}
|
||||
if (out_msg_queue_size_ > SKIP_EXTERNALS_QUEUE_SIZE) {
|
||||
LOG(INFO) << "skipping processing of inbound external messages (except for high-priority) because out_msg_queue is "
|
||||
"too big ("
|
||||
|
@ -3586,6 +3631,9 @@ bool Collator::process_inbound_external_messages() {
|
|||
stats_.limits_log += PSTRING() << "INBOUND_EXT_MESSAGES: timeout\n";
|
||||
break;
|
||||
}
|
||||
if (!check_cancelled()) {
|
||||
return false;
|
||||
}
|
||||
auto ext_msg = ext_msg_struct.cell;
|
||||
ton::Bits256 hash{ext_msg->get_hash().bits()};
|
||||
int r = process_external_message(std::move(ext_msg));
|
||||
|
@ -3692,6 +3740,10 @@ bool Collator::process_dispatch_queue() {
|
|||
if (max_per_initiator[iter] == 0 || max_total_count[iter] == 0) {
|
||||
continue;
|
||||
}
|
||||
if (iter > 0 && attempt_idx_ >= 1) {
|
||||
LOG(INFO) << "Attempt #" << attempt_idx_ << ": skip process_dispatch_queue";
|
||||
break;
|
||||
}
|
||||
vm::AugmentedDictionary cur_dispatch_queue{dispatch_queue_->get_root(), 256, block::tlb::aug_DispatchQueue};
|
||||
std::map<std::tuple<WorkchainId, StdSmcAddress, LogicalTime>, size_t> count_per_initiator;
|
||||
size_t total_count = 0;
|
||||
|
@ -3704,13 +3756,13 @@ bool Collator::process_dispatch_queue() {
|
|||
stats_.limits_log += PSTRING() << "DISPATCH_QUEUE_STAGE_" << iter << ": "
|
||||
<< block_full_comment(*block_limit_status_, block::ParamLimits::cl_normal)
|
||||
<< "\n";
|
||||
return true;
|
||||
return register_dispatch_queue_op(true);
|
||||
}
|
||||
if (soft_timeout_.is_in_past(td::Timestamp::now())) {
|
||||
block_full_ = true;
|
||||
LOG(WARNING) << "soft timeout reached, stop processing dispatch queue";
|
||||
stats_.limits_log += PSTRING() << "DISPATCH_QUEUE_STAGE_" << iter << ": timeout\n";
|
||||
return true;
|
||||
return register_dispatch_queue_op(true);
|
||||
}
|
||||
StdSmcAddress src_addr;
|
||||
td::Ref<vm::CellSlice> account_dispatch_queue;
|
||||
|
@ -3788,6 +3840,7 @@ bool Collator::process_dispatch_queue() {
|
|||
if (iter == 0) {
|
||||
have_unprocessed_account_dispatch_queue_ = false;
|
||||
}
|
||||
register_dispatch_queue_op(true);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -3811,12 +3864,7 @@ bool Collator::process_deferred_message(Ref<vm::CellSlice> enq_msg, StdSmcAddres
|
|||
return fatal_error(PSTRING() << "failed to delete message from DispatchQueue: address=" << src_addr.to_hex()
|
||||
<< ", lt=" << lt);
|
||||
}
|
||||
++dispatch_queue_ops_;
|
||||
if (!(dispatch_queue_ops_ & 63)) {
|
||||
if (!block_limit_status_->add_proof(dispatch_queue_->get_root_cell())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
register_dispatch_queue_op();
|
||||
++sender_generated_messages_count_[src_addr];
|
||||
|
||||
LogicalTime enqueued_lt = 0;
|
||||
|
@ -3909,6 +3957,7 @@ bool Collator::process_deferred_message(Ref<vm::CellSlice> enq_msg, StdSmcAddres
|
|||
++unprocessed_deferred_messages_[src_addr];
|
||||
LOG(INFO) << "delivering deferred message from account " << src_addr.to_hex() << ", lt=" << lt
|
||||
<< ", emitted_lt=" << emitted_lt;
|
||||
block_limit_status_->add_cell(msg_env);
|
||||
register_new_msg(std::move(new_msg));
|
||||
msg_metadata = std::move(env.metadata);
|
||||
return true;
|
||||
|
@ -4088,11 +4137,7 @@ bool Collator::enqueue_message(block::NewOutMsg msg, td::RefInt256 fwd_fees_rema
|
|||
}
|
||||
++dispatch_dict_size;
|
||||
dispatch_queue_->set(src_addr, block::pack_account_dispatch_queue(dispatch_dict, dispatch_dict_size));
|
||||
++dispatch_queue_ops_;
|
||||
if (!(dispatch_queue_ops_ & 63)) {
|
||||
return block_limit_status_->add_proof(dispatch_queue_->get_root_cell());
|
||||
}
|
||||
return true;
|
||||
return register_dispatch_queue_op();
|
||||
}
|
||||
|
||||
auto next_hop = block::interpolate_addr(src_prefix, dest_prefix, route_info.second);
|
||||
|
@ -4134,6 +4179,9 @@ bool Collator::process_new_messages(bool enqueue_only) {
|
|||
stats_.limits_log += PSTRING() << "NEW_MESSAGES: "
|
||||
<< block_full_comment(*block_limit_status_, block::ParamLimits::cl_normal) << "\n";
|
||||
}
|
||||
if (!check_cancelled()) {
|
||||
return false;
|
||||
}
|
||||
LOG(DEBUG) << "have message with lt=" << msg.lt;
|
||||
int res = process_one_new_message(std::move(msg), enqueue_only);
|
||||
if (res < 0) {
|
||||
|
@ -4973,6 +5021,23 @@ bool Collator::register_out_msg_queue_op(bool force) {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a dispatch queue message queue operation.
|
||||
* Adds the proof to the block limit status every 64 operations.
|
||||
*
|
||||
* @param force If true, the proof will always be added to the block limit status.
|
||||
*
|
||||
* @returns True if the operation was successfully registered, false otherwise.
|
||||
*/
|
||||
bool Collator::register_dispatch_queue_op(bool force) {
|
||||
++dispatch_queue_ops_;
|
||||
if (force || !(dispatch_queue_ops_ & 63)) {
|
||||
return block_limit_status_->add_proof(dispatch_queue_->get_root_cell());
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new shard state and the Merkle update.
|
||||
*
|
||||
|
@ -5098,9 +5163,10 @@ bool Collator::compute_out_msg_queue_info(Ref<vm::Cell>& out_msg_queue_info) {
|
|||
vm::CellSlice maybe_extra = cb.as_cellslice();
|
||||
cb.reset();
|
||||
|
||||
return register_out_msg_queue_op(true) && out_msg_queue_->append_dict_to_bool(cb) // _ out_queue:OutMsgQueue
|
||||
&& processed_upto_->pack(cb) // proc_info:ProcessedInfo
|
||||
&& cb.append_cellslice_bool(maybe_extra) // extra:(Maybe OutMsgQueueExtra)
|
||||
return register_out_msg_queue_op(true) && register_dispatch_queue_op(true) &&
|
||||
out_msg_queue_->append_dict_to_bool(cb) // _ out_queue:OutMsgQueue
|
||||
&& processed_upto_->pack(cb) // proc_info:ProcessedInfo
|
||||
&& cb.append_cellslice_bool(maybe_extra) // extra:(Maybe OutMsgQueueExtra)
|
||||
&& cb.finalize_to(out_msg_queue_info);
|
||||
}
|
||||
|
||||
|
@ -5492,14 +5558,18 @@ bool Collator::create_block_candidate() {
|
|||
<< consensus_config.max_collated_data_size << ")");
|
||||
}
|
||||
// 4. save block candidate
|
||||
LOG(INFO) << "saving new BlockCandidate";
|
||||
td::actor::send_closure_later(
|
||||
manager, &ValidatorManager::set_block_candidate, block_candidate->id, block_candidate->clone(),
|
||||
validator_set_->get_catchain_seqno(), validator_set_->get_validator_set_hash(),
|
||||
[self = get_self()](td::Result<td::Unit> saved) -> void {
|
||||
LOG(DEBUG) << "got answer to set_block_candidate";
|
||||
td::actor::send_closure_later(std::move(self), &Collator::return_block_candidate, std::move(saved));
|
||||
});
|
||||
if (mode_ & CollateMode::skip_store_candidate) {
|
||||
td::actor::send_closure_later(actor_id(this), &Collator::return_block_candidate, td::Unit());
|
||||
} else {
|
||||
LOG(INFO) << "saving new BlockCandidate";
|
||||
td::actor::send_closure_later(
|
||||
manager, &ValidatorManager::set_block_candidate, block_candidate->id, block_candidate->clone(),
|
||||
validator_set_->get_catchain_seqno(), validator_set_->get_validator_set_hash(),
|
||||
[self = get_self()](td::Result<td::Unit> saved) -> void {
|
||||
LOG(DEBUG) << "got answer to set_block_candidate";
|
||||
td::actor::send_closure_later(std::move(self), &Collator::return_block_candidate, std::move(saved));
|
||||
});
|
||||
}
|
||||
// 5. communicate about bad and delayed external messages
|
||||
if (!bad_ext_msgs_.empty() || !delay_ext_msgs_.empty()) {
|
||||
LOG(INFO) << "sending complete_external_messages() to Manager";
|
||||
|
@ -5643,6 +5713,18 @@ void Collator::after_get_external_messages(td::Result<std::vector<std::pair<Ref<
|
|||
check_pending();
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if collation was cancelled via cancellation token
|
||||
*
|
||||
* @returns false if the collation was cancelled, true otherwise
|
||||
*/
|
||||
bool Collator::check_cancelled() {
|
||||
if (cancellation_token_) {
|
||||
return fatal_error(td::Status::Error(ErrorCode::cancelled, "cancelled"));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
td::uint32 Collator::get_skip_externals_queue_size() {
|
||||
return SKIP_EXTERNALS_QUEUE_SIZE;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue