Improve large OutMsgQueue clearance (#822)

* Improve Collator::out_msg_queue_cleanup, increase collator timeout

* Disable importing ext msgs if queue is too big

* Extend timeout in collator if previous block is too old

---------

Co-authored-by: SpyCheese <mikle98@yandex.ru>
EmelyanenkoK 2023-12-06 19:34:01 +03:00 committed by GitHub
parent 51baec48a0
commit 7fcf267717
6 changed files with 105 additions and 25 deletions
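
Note: the central change in the diff below replaces an in-order filter() traversal of the outbound queue with a budget-limited scan in random order over a snapshot of the queue. A minimal standalone sketch of that strategy follows (plain C++ containers; Entry, delivered, and the map are hypothetical stand-ins for TON's vm::AugmentedDictionary and EnqueuedMsgDescr machinery, not the actual API):

    // Standalone model of budget-limited, random-order queue cleanup.
    #include <algorithm>
    #include <chrono>
    #include <cstdint>
    #include <map>
    #include <random>
    #include <vector>

    struct Entry {
      std::uint64_t lt;
      bool delivered;  // in TON, derived from neighbors' processed_upto info
    };

    struct CleanupStats {
      int total = 0;
      int deleted = 0;
      bool partial = false;
    };

    CleanupStats cleanup(std::map<std::uint64_t, Entry>& queue,
                         std::chrono::steady_clock::time_point deadline) {
      CleanupStats st;
      // Snapshot and shuffle the keys: under a fixed time budget, successive
      // blocks then make progress on different parts of a huge queue instead
      // of rescanning the same leading prefix every time.
      std::vector<std::uint64_t> keys;
      keys.reserve(queue.size());
      for (const auto& [k, v] : queue) {
        keys.push_back(k);
      }
      std::mt19937_64 rng{std::random_device{}()};
      std::shuffle(keys.begin(), keys.end(), rng);
      for (auto k : keys) {
        if (std::chrono::steady_clock::now() >= deadline) {
          st.partial = true;  // retain all remaining entries without processing
          break;
        }
        ++st.total;
        auto it = queue.find(k);
        if (it != queue.end() && it->second.delivered) {
          queue.erase(it);  // delivered elsewhere: safe to drop
          ++st.deleted;
        }
      }
      return st;
    }
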


@@ -703,6 +703,11 @@ bool Collator::unpack_last_mc_state() {
     return fatal_error(limits.move_as_error());
   }
   block_limits_ = limits.move_as_ok();
+  if (!is_masterchain()) {
+    // block_limits_->bytes = {131072 / 3, 524288 / 3, 1048576 / 3};
+    // block_limits_->gas = {2000000 / 3, 10000000 / 3, 20000000 / 3};
+    block_limits_->lt_delta = {20, 180, 200};
+  }
   LOG(DEBUG) << "block limits: bytes [" << block_limits_->bytes.underload() << ", " << block_limits_->bytes.soft()
              << ", " << block_limits_->bytes.hard() << "]";
   LOG(DEBUG) << "block limits: gas [" << block_limits_->gas.underload() << ", " << block_limits_->gas.soft() << ", "
@@ -1818,6 +1823,17 @@ bool Collator::init_utime() {
   CHECK(config_);
   // consider unixtime and lt from previous block(s) of the same shardchain
   prev_now_ = prev_state_utime_;
+  // Extend collator timeout if previous block is too old
+  td::Timestamp new_timeout = td::Timestamp::in(std::min(30.0, (td::Clocks::system() - (double)prev_now_) / 2));
+  if (timeout < new_timeout) {
+    double add = new_timeout.at() - timeout.at();
+    timeout = new_timeout;
+    queue_cleanup_timeout_ += add;
+    soft_timeout_ += add;
+    medium_timeout_ += add;
+    alarm_timestamp() = timeout;
+  }
   auto prev = std::max<td::uint32>(config_->utime, prev_now_);
   now_ = std::max<td::uint32>(prev + 1, (unsigned)std::time(nullptr));
   if (now_ > now_upper_limit_) {
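
Note: the hunk above gives the collator extra time when the previous block is old. The hard deadline is pushed out to min(30 s, age / 2) from now, and each secondary deadline is shifted by the same delta so their relative ordering is preserved. A worked example with plain doubles standing in for td::Timestamp:

    #include <algorithm>
    #include <cstdio>

    int main() {
      double now = 1000.0;              // wall clock, seconds
      double prev_block_utime = 960.0;  // previous block is 40 s old
      double timeout = now + 10.0;      // current hard deadline
      double queue_cleanup_timeout = now + 2.0;
      double soft_timeout = now + 3.0;

      // Previous block is 40 s old, so the target is min(30, 40 / 2) = 20 s
      // from now, i.e. 1020.
      double new_timeout = now + std::min(30.0, (now - prev_block_utime) / 2);
      if (timeout < new_timeout) {
        double add = new_timeout - timeout;  // 10 s
        timeout = new_timeout;
        queue_cleanup_timeout += add;  // shifted by the same delta
        soft_timeout += add;
      }
      std::printf("timeout=%.0f cleanup=%.0f soft=%.0f\n",
                  timeout, queue_cleanup_timeout, soft_timeout);
      return 0;
    }
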
@@ -2170,18 +2186,30 @@ bool Collator::out_msg_queue_cleanup() {
     }
   }
-  auto res = out_msg_queue_->filter([&](vm::CellSlice& cs, td::ConstBitPtr key, int n) -> int {
+  auto queue_root = out_msg_queue_->get_root_cell();
+  if (queue_root.is_null()) {
+    LOG(DEBUG) << "out_msg_queue is empty";
+    return true;
+  }
+  auto old_out_msg_queue = std::make_unique<vm::AugmentedDictionary>(queue_root, 352, block::tlb::aug_OutMsgQueue);
+  int deleted = 0;
+  int total = 0;
+  bool fail = false;
+  old_out_msg_queue->check_for_each([&](Ref<vm::CellSlice> value, td::ConstBitPtr key, int n) -> bool {
+    ++total;
+    assert(n == 352);
+    vm::CellSlice& cs = value.write();
     // LOG(DEBUG) << "key is " << key.to_hex(n);
     if (queue_cleanup_timeout_.is_in_past(td::Timestamp::now())) {
       LOG(WARNING) << "cleaning up outbound queue takes too long, ending";
       outq_cleanup_partial_ = true;
-      return (1 << 30) + 1;  // retain all remaining outbound queue entries including this one without processing
+      return false;  // retain all remaining outbound queue entries including this one without processing
     }
     if (block_full_) {
       LOG(WARNING) << "BLOCK FULL while cleaning up outbound queue, cleanup completed only partially";
       outq_cleanup_partial_ = true;
-      return (1 << 30) + 1;  // retain all remaining outbound queue entries including this one without processing
+      return false;  // retain all remaining outbound queue entries including this one without processing
     }
     block::EnqueuedMsgDescr enq_msg_descr;
     unsigned long long created_lt;
@@ -2190,7 +2218,8 @@ bool Collator::out_msg_queue_cleanup() {
           && enq_msg_descr.check_key(key)  // check key
           && enq_msg_descr.lt_ == created_lt)) {
       LOG(ERROR) << "cannot unpack EnqueuedMsg with key " << key.to_hex(n);
-      return -1;
+      fail = true;
+      return false;
     }
     LOG(DEBUG) << "scanning outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << ","
                << enq_msg_descr.hash_.to_hex() << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_;
@@ -2208,22 +2237,29 @@ bool Collator::out_msg_queue_cleanup() {
     if (delivered) {
       LOG(DEBUG) << "outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," << enq_msg_descr.hash_.to_hex()
                  << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_ << " has been already delivered, dequeueing";
+      ++deleted;
+      out_msg_queue_->lookup_delete_with_extra(key, n);
       if (!dequeue_message(std::move(enq_msg_descr.msg_env_), deliver_lt)) {
         fatal_error(PSTRING() << "cannot dequeue outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << ","
                               << enq_msg_descr.hash_.to_hex() << ") by inserting a msg_export_deq record");
-        return -1;
+        fail = true;
+        return false;
       }
       register_out_msg_queue_op();
       if (!block_limit_status_->fits(block::ParamLimits::cl_normal)) {
         block_full_ = true;
       }
     }
-    return !delivered;
-  });
-  LOG(DEBUG) << "deleted " << res << " messages from out_msg_queue";
-  if (res < 0) {
+    return true;
+  }, false, true /* random order */);
+  LOG(INFO) << "deleted " << deleted << " messages from out_msg_queue, processed " << total << " messages in total";
+  if (fail) {
     return fatal_error("error scanning/updating OutMsgQueue");
   }
+  if (outq_cleanup_partial_ || total > 8000) {
+    LOG(INFO) << "out_msg_queue too big, skipping importing external messages";
+    skip_extmsg_ = true;
+  }
   auto rt = out_msg_queue_->get_root();
   if (verbosity >= 2) {
     std::cerr << "new out_msg_queue is ";
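
Note: the new check_for_each traversal counts visited and deleted entries. If the scan ran out of budget (outq_cleanup_partial_) or visited more than 8000 entries, the collator sets skip_extmsg_ so no external messages are imported into this block, letting the queue drain before new load is admitted. The decision rule as a standalone sketch (field names mirror the diff, but this is not the Collator class):

    struct CleanupOutcome {
      bool partial;  // cleanup stopped early (time budget or block full)
      int total;     // queue entries visited during the scan
    };

    // Back-pressure: a queue that is too large to even scan in one block's
    // budget should not be fed more external messages.
    bool should_skip_ext_msgs(const CleanupOutcome& o) {
      return o.partial || o.total > 8000;
    }
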
@@ -3277,6 +3313,10 @@ bool Collator::process_inbound_external_messages() {
     LOG(INFO) << "skipping processing of inbound external messages";
     return true;
   }
+  if (out_msg_queue_->get_root_cell().not_null() && out_msg_queue_->get_root_cell()->get_depth() > 12) {
+    LOG(INFO) << "skipping processing of inbound external messages: out msg queue is too big";
+    return true;
+  }
   bool full = !block_limit_status_->fits(block::ParamLimits::cl_soft);
   for (auto& ext_msg_pair : ext_msg_list_) {
     if (full) {
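
Note: the depth check above is a cheap size proxy. The outbound queue is a Patricia-tree dictionary stored in cells, and a (near-)balanced binary tree whose root has depth d can sit above up to about 2^d leaves, so a root deeper than 12 suggests on the order of thousands of queued messages. The exact relation depends on tree shape and on the serialization depth of each entry, so this is only a rough illustration:

    #include <cstdint>
    #include <cstdio>

    // Upper bound on leaves under a balanced binary fork of the given depth;
    // real cell depth also includes levels inside each serialized entry.
    std::uint64_t max_leaves_for_depth(unsigned depth) {
      return depth >= 63 ? ~0ull : (1ull << depth);
    }

    int main() {
      std::printf("depth 12 -> up to ~%llu entries\n",
                  (unsigned long long)max_leaves_for_depth(12));  // 4096
      return 0;
    }
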