mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
Bugfix in processing message queues; improve out_msg_queue_cleanup
This commit is contained in:
parent
5dd0c15d07
commit
f10c7f54a8
5 changed files with 68 additions and 17 deletions
|
@ -229,7 +229,7 @@ bool OutputQueueMerger::load() {
|
|||
}
|
||||
}
|
||||
msg_list.resize(j);
|
||||
return true;
|
||||
return msg_list.size() > orig_size;
|
||||
}
|
||||
|
||||
} // namespace block
|
||||
|
|
|
@ -60,6 +60,7 @@ struct OutputQueueMerger {
|
|||
td::Ref<vm::Cell> outmsg_root_;
|
||||
bool disabled_;
|
||||
td::int32 msg_limit_; // -1 - unlimited
|
||||
Neighbor() = default;
|
||||
Neighbor(ton::BlockIdExt block_id, td::Ref<vm::Cell> outmsg_root, bool disabled = false, td::int32 msg_limit = -1)
|
||||
: block_id_(block_id), outmsg_root_(std::move(outmsg_root)), disabled_(disabled), msg_limit_(msg_limit) {
|
||||
}
|
||||
|
|
|
@ -133,8 +133,8 @@ class Collator final : public td::actor::Actor {
|
|||
std::unique_ptr<block::ConfigInfo> config_;
|
||||
std::unique_ptr<block::ShardConfig> shard_conf_;
|
||||
std::map<BlockSeqno, Ref<MasterchainStateQ>> aux_mc_states_;
|
||||
std::vector<block::OutputQueueMerger::Neighbor> neighbor_queues_;
|
||||
vm::Dictionary neighbor_msg_queues_limits_{32 + 64};
|
||||
std::map<ShardIdFull, td::int32> neighbor_msg_queues_limits_;
|
||||
vm::Dictionary neighbor_msg_queues_limits_dict_{32 + 64};
|
||||
std::vector<block::McShardDescr> neighbors_;
|
||||
std::unique_ptr<block::OutputQueueMerger> nb_out_msgs_;
|
||||
std::vector<ton::StdSmcAddress> special_smcs;
|
||||
|
@ -241,6 +241,7 @@ class Collator final : public td::actor::Actor {
|
|||
bool register_shard_block_creators(std::vector<td::Bits256> creator_list);
|
||||
bool init_block_limits();
|
||||
bool compute_minted_amount(block::CurrencyCollection& to_mint);
|
||||
bool create_output_queue_merger();
|
||||
bool init_value_create();
|
||||
bool try_collate();
|
||||
bool do_preinit();
|
||||
|
|
|
@ -672,9 +672,9 @@ void Collator::got_neighbor_msg_queue(unsigned i, td::Result<Ref<OutMsgQueueProo
|
|||
fatal_error(outq_descr_res.move_as_error());
|
||||
return;
|
||||
}
|
||||
LOG(DEBUG) << "obtained outbound queue for neighbor #" << i;
|
||||
Ref<MessageQueue> outq_descr = outq_descr_res.move_as_ok();
|
||||
block::McShardDescr& descr = neighbors_.at(i);
|
||||
LOG(DEBUG) << "obtained outbound queue for neighbor #" << i << "(" << descr.shard().to_str() << ")";
|
||||
if (outq_descr->get_block_id() != descr.blk_) {
|
||||
LOG(DEBUG) << "outq_descr->id = " << outq_descr->get_block_id().to_str() << " ; descr.id = " << descr.blk_.to_str();
|
||||
fatal_error(
|
||||
|
@ -692,12 +692,13 @@ void Collator::got_neighbor_msg_queue(unsigned i, td::Result<Ref<OutMsgQueueProo
|
|||
}
|
||||
auto queue_root = qinfo.out_queue->prefetch_ref(0);
|
||||
descr.set_queue_root(queue_root);
|
||||
neighbor_queues_.emplace_back(descr.top_block_id(), queue_root, descr.is_disabled(), res->msg_count_);
|
||||
if (res->msg_count_ != -1) {
|
||||
LOG(DEBUG) << "Neighbor " << descr.shard().to_str() << " has msg_limit=" << res->msg_count_;
|
||||
td::BitArray<96> key;
|
||||
key.bits().store_int(block_id.id.workchain, 32);
|
||||
(key.bits() + 32).store_uint(block_id.id.shard, 64);
|
||||
neighbor_msg_queues_limits_.set_builder(key, vm::CellBuilder().store_long(res->msg_count_, 32));
|
||||
neighbor_msg_queues_limits_dict_.set_builder(key, vm::CellBuilder().store_long(res->msg_count_, 32));
|
||||
neighbor_msg_queues_limits_[block_id.shard_full()] = res->msg_count_;
|
||||
}
|
||||
// comment the next two lines in the future when the output queues become huge
|
||||
// CHECK(block::gen::t_OutMsgQueueInfo.validate_ref(1000000, outq_descr->root_cell()));
|
||||
|
@ -1674,6 +1675,17 @@ bool Collator::compute_minted_amount(block::CurrencyCollection& to_mint) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Collator::create_output_queue_merger() {
|
||||
std::vector<block::OutputQueueMerger::Neighbor> neighbor_queues;
|
||||
for (const auto& descr : neighbors_) {
|
||||
auto it = neighbor_msg_queues_limits_.find(descr.shard());
|
||||
td::int32 msg_limit = it == neighbor_msg_queues_limits_.end() ? -1 : it->second;
|
||||
neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_, msg_limit);
|
||||
}
|
||||
nb_out_msgs_ = std::make_unique<block::OutputQueueMerger>(shard_, neighbor_queues);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Collator::init_value_create() {
|
||||
value_flow_.created.set_zero();
|
||||
value_flow_.minted.set_zero();
|
||||
|
@ -1731,7 +1743,9 @@ bool Collator::do_collate() {
|
|||
// 1.3. create OutputQueueMerger from adjusted neighbors
|
||||
CHECK(!nb_out_msgs_);
|
||||
LOG(DEBUG) << "creating OutputQueueMerger";
|
||||
nb_out_msgs_ = std::make_unique<block::OutputQueueMerger>(shard_, neighbor_queues_);
|
||||
if (!create_output_queue_merger()) {
|
||||
return fatal_error("cannot compute the value to be created / minted / recovered");
|
||||
}
|
||||
// 1.4. compute created / minted / recovered
|
||||
if (!init_value_create()) {
|
||||
return fatal_error("cannot compute the value to be created / minted / recovered");
|
||||
|
@ -1861,18 +1875,34 @@ bool Collator::out_msg_queue_cleanup() {
|
|||
}
|
||||
}
|
||||
|
||||
auto res = out_msg_queue_->filter([&](vm::CellSlice& cs, td::ConstBitPtr key, int n) -> int {
|
||||
auto queue_root = out_msg_queue_->get_root_cell();
|
||||
if (queue_root.is_null()) {
|
||||
LOG(DEBUG) << "out_msg_queue is empty";
|
||||
return true;
|
||||
}
|
||||
// Unwrap UsageCell: don't build proof for visiting output queue (unless something is deleted)
|
||||
auto r_cell = queue_root->load_cell();
|
||||
if (r_cell.is_error()) {
|
||||
return fatal_error(r_cell.move_as_error());
|
||||
}
|
||||
auto pure_out_msg_queue =
|
||||
std::make_unique<vm::AugmentedDictionary>(r_cell.move_as_ok().data_cell, 352, block::tlb::aug_OutMsgQueue);
|
||||
|
||||
int deleted = 0;
|
||||
bool fail = false;
|
||||
pure_out_msg_queue->check_for_each([&](Ref<vm::CellSlice> value, td::ConstBitPtr key, int n) -> bool {
|
||||
assert(n == 352);
|
||||
vm::CellSlice& cs = value.write();
|
||||
// LOG(DEBUG) << "key is " << key.to_hex(n);
|
||||
if (queue_cleanup_timeout_.is_in_past(td::Timestamp::now())) {
|
||||
LOG(WARNING) << "cleaning up outbound queue takes too long, ending";
|
||||
outq_cleanup_partial_ = true;
|
||||
return (1 << 30) + 1; // retain all remaining outbound queue entries including this one without processing
|
||||
return false; // retain all remaining outbound queue entries including this one without processing
|
||||
}
|
||||
if (block_full_) {
|
||||
LOG(WARNING) << "BLOCK FULL while cleaning up outbound queue, cleanup completed only partially";
|
||||
outq_cleanup_partial_ = true;
|
||||
return (1 << 30) + 1; // retain all remaining outbound queue entries including this one without processing
|
||||
return false; // retain all remaining outbound queue entries including this one without processing
|
||||
}
|
||||
block::EnqueuedMsgDescr enq_msg_descr;
|
||||
unsigned long long created_lt;
|
||||
|
@ -1881,7 +1911,8 @@ bool Collator::out_msg_queue_cleanup() {
|
|||
&& enq_msg_descr.check_key(key) // check key
|
||||
&& enq_msg_descr.lt_ == created_lt)) {
|
||||
LOG(ERROR) << "cannot unpack EnqueuedMsg with key " << key.to_hex(n);
|
||||
return -1;
|
||||
fail = true;
|
||||
return false;
|
||||
}
|
||||
LOG(DEBUG) << "scanning outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << ","
|
||||
<< enq_msg_descr.hash_.to_hex() << ") enqueued_lt=" << enq_msg_descr.enqueued_lt_;
|
||||
|
@ -1899,20 +1930,30 @@ bool Collator::out_msg_queue_cleanup() {
|
|||
if (delivered) {
|
||||
LOG(DEBUG) << "outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << "," << enq_msg_descr.hash_.to_hex()
|
||||
<< ") enqueued_lt=" << enq_msg_descr.enqueued_lt_ << " has been already delivered, dequeueing";
|
||||
// Get value from out_msg_queue_ instead of pure_out_msg_queue (for proof)
|
||||
auto value2 = out_msg_queue_->lookup_delete_with_extra(key, n);
|
||||
CHECK(value2.not_null());
|
||||
vm::CellSlice& cs2 = value2.write();
|
||||
CHECK(cs2.fetch_ulong_bool(64, created_lt) // augmentation
|
||||
&& enq_msg_descr.unpack(cs2) // unpack EnqueuedMsg
|
||||
&& enq_msg_descr.check_key(key) // check key
|
||||
&& enq_msg_descr.lt_ == created_lt);
|
||||
|
||||
if (!dequeue_message(std::move(enq_msg_descr.msg_env_), deliver_lt)) {
|
||||
fatal_error(PSTRING() << "cannot dequeue outbound message with (lt,hash)=(" << enq_msg_descr.lt_ << ","
|
||||
<< enq_msg_descr.hash_.to_hex() << ") by inserting a msg_export_deq record");
|
||||
return -1;
|
||||
fail = true;
|
||||
return false;
|
||||
}
|
||||
register_out_msg_queue_op();
|
||||
if (!block_limit_status_->fits(block::ParamLimits::cl_normal)) {
|
||||
block_full_ = true;
|
||||
}
|
||||
}
|
||||
return !delivered;
|
||||
return true;
|
||||
});
|
||||
LOG(DEBUG) << "deleted " << res << " messages from out_msg_queue";
|
||||
if (res < 0) {
|
||||
LOG(DEBUG) << "deleted " << deleted << " messages from out_msg_queue";
|
||||
if (fail) {
|
||||
return fatal_error("error scanning/updating OutMsgQueue");
|
||||
}
|
||||
auto rt = out_msg_queue_->get_root();
|
||||
|
@ -4026,10 +4067,10 @@ bool Collator::create_collated_data() {
|
|||
collated_roots_.push_back(std::move(cell));
|
||||
}
|
||||
// 2. Message count for neighbors' out queues
|
||||
if (!neighbor_msg_queues_limits_.is_empty()) {
|
||||
if (!neighbor_msg_queues_limits_dict_.is_empty()) {
|
||||
vm::CellBuilder cb;
|
||||
cb.store_long(block::gen::t_NeighborMsgQueueLimits.cons_tag[0], 32);
|
||||
cb.store_maybe_ref(neighbor_msg_queues_limits_.get_root_cell());
|
||||
cb.store_maybe_ref(neighbor_msg_queues_limits_dict_.get_root_cell());
|
||||
collated_roots_.push_back(cb.finalize_novm());
|
||||
}
|
||||
if (!full_collated_data_) {
|
||||
|
|
|
@ -528,6 +528,7 @@ bool ValidateQuery::extract_collated_data_from(Ref<vm::Cell> croot, int idx) {
|
|||
return reject_query("invalid NeighborMsgQueueLimits");
|
||||
}
|
||||
neighbor_msg_queues_limits_ = vm::Dictionary{cs.prefetch_ref(0), 32 + 64};
|
||||
return true;
|
||||
}
|
||||
LOG(WARNING) << "collated datum # " << idx << " has unknown type (magic " << cs.prefetch_ulong(32) << "), ignoring";
|
||||
return true;
|
||||
|
@ -4123,7 +4124,14 @@ bool ValidateQuery::check_in_queue() {
|
|||
if (msg_limit < -1) {
|
||||
return reject_query("invalid value in NeighborMsgQueueLimits");
|
||||
}
|
||||
LOG(DEBUG) << "Neighbor " << descr.shard().to_str() << " has msg_limit=" << msg_limit;
|
||||
neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_, msg_limit);
|
||||
if (msg_limit != -1 && descr.shard().is_masterchain()) {
|
||||
return reject_query("masterchain out message queue cannot be limited");
|
||||
}
|
||||
if (msg_limit != -1 && shard_intersects(descr.shard(), shard_)) {
|
||||
return reject_query("prev block out message queue cannot be limited");
|
||||
}
|
||||
}
|
||||
block::OutputQueueMerger nb_out_msgs(shard_, std::move(neighbor_queues));
|
||||
while (!nb_out_msgs.is_eof()) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue