mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
Merge message dispatch queue (#1030)
* Deferred messages and msg metadata * Store out msg queue size in state * Add checks for queue processing 1. Collator must process at least one message from AccountDispatchQueue (unless block is full) 2. The first message from a transaction is not counted, it cannot be deferred (unless AccountDispatchQueue is not empty) * Return msg metadata from LS in listBlockTransactions[Ext] * Enable new features by capabilities * Changes in deferred messages * Process deferred messages via new_msgs in collator * Rework setting deferred_lt, bring back check_message_processing_order, check order of deferred_lt in validator * Use have_unprocessed_account_dispatch_queue_ in collator * Fix setting transaction lt for deferred messages * Fix lite-client compilation error * Changes in process_dispatch_queue, rename deferred_lt -> emitted_lt * Fix compilation error * Use uint64 for msg queue size * Add liteServer.getBlockOutMsgQueueSize * Fix compilation error * Fix typos in comments --------- Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
parent
38fc1d5456
commit
0daee1d887
29 changed files with 1889 additions and 318 deletions
|
@ -1,7 +1,7 @@
|
|||
/*
|
||||
This file is part of TON Blockchain Library.
|
||||
This file is part of TON Blockchain Library.
|
||||
|
||||
TON Blockchain Library is free software: you can redistribute it and/or modify
|
||||
TON Blockchain Library is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation, either version 2 of the License, or
|
||||
(at your option) any later version.
|
||||
|
@ -50,6 +50,7 @@ static const td::uint32 SPLIT_MAX_QUEUE_SIZE = 100000;
|
|||
static const td::uint32 MERGE_MAX_QUEUE_SIZE = 2047;
|
||||
static const td::uint32 SKIP_EXTERNALS_QUEUE_SIZE = 8000;
|
||||
static const int HIGH_PRIORITY_EXTERNAL = 10; // don't skip high priority externals when queue is big
|
||||
static const int DEFER_MESSAGES_AFTER = 10; // 10'th and later messages from address will be deferred
|
||||
|
||||
#define DBG(__n) dbg(__n)&&
|
||||
#define DSTART int __dcnt = 0;
|
||||
|
@ -694,6 +695,9 @@ bool Collator::unpack_last_mc_state() {
|
|||
create_stats_enabled_ = config_->create_stats_enabled();
|
||||
report_version_ = config_->has_capability(ton::capReportVersion);
|
||||
short_dequeue_records_ = config_->has_capability(ton::capShortDequeue);
|
||||
store_out_msg_queue_size_ = config_->has_capability(ton::capStoreOutMsgQueueSize);
|
||||
msg_metadata_enabled_ = config_->has_capability(ton::capMsgMetadata);
|
||||
deferring_messages_enabled_ = config_->has_capability(ton::capDeferMessages);
|
||||
shard_conf_ = std::make_unique<block::ShardConfig>(*config_);
|
||||
prev_key_block_exists_ = config_->get_last_key_block(prev_key_block_, prev_key_block_lt_);
|
||||
if (prev_key_block_exists_) {
|
||||
|
@ -794,19 +798,20 @@ bool Collator::request_neighbor_msg_queues() {
|
|||
}
|
||||
|
||||
/**
|
||||
* Requests the size of the outbound message queue from the previous state(s).
|
||||
* Requests the size of the outbound message queue from the previous state(s) if needed.
|
||||
*
|
||||
* @returns True if the request was successful, false otherwise.
|
||||
*/
|
||||
bool Collator::request_out_msg_queue_size() {
|
||||
if (after_split_) {
|
||||
// If block is after split, the size is calculated during split (see Collator::split_last_state)
|
||||
if (have_out_msg_queue_size_in_state_) {
|
||||
// if after_split then have_out_msg_queue_size_in_state_ is always true, since the size is calculated during split
|
||||
return true;
|
||||
}
|
||||
out_msg_queue_size_ = 0;
|
||||
for (size_t i = 0; i < prev_blocks.size(); ++i) {
|
||||
++pending;
|
||||
send_closure_later(manager, &ValidatorManager::get_out_msg_queue_size, prev_blocks[i],
|
||||
[self = get_self(), i](td::Result<td::uint32> res) {
|
||||
[self = get_self(), i](td::Result<td::uint64> res) {
|
||||
td::actor::send_closure(std::move(self), &Collator::got_out_queue_size, i, std::move(res));
|
||||
});
|
||||
}
|
||||
|
@ -885,14 +890,14 @@ void Collator::got_neighbor_out_queue(int i, td::Result<Ref<MessageQueue>> res)
|
|||
* @param i The index of the previous block (0 or 1).
|
||||
* @param res The result object containing the size of the queue.
|
||||
*/
|
||||
void Collator::got_out_queue_size(size_t i, td::Result<td::uint32> res) {
|
||||
void Collator::got_out_queue_size(size_t i, td::Result<td::uint64> res) {
|
||||
--pending;
|
||||
if (res.is_error()) {
|
||||
fatal_error(
|
||||
res.move_as_error_prefix(PSTRING() << "failed to get message queue size from prev block #" << i << ": "));
|
||||
return;
|
||||
}
|
||||
td::uint32 size = res.move_as_ok();
|
||||
td::uint64 size = res.move_as_ok();
|
||||
LOG(WARNING) << "got outbound queue size from prev block #" << i << ": " << size;
|
||||
out_msg_queue_size_ += size;
|
||||
check_pending();
|
||||
|
@ -1016,7 +1021,7 @@ bool Collator::split_last_state(block::ShardState& ss) {
|
|||
return fatal_error(res2.move_as_error());
|
||||
}
|
||||
sibling_processed_upto_ = res2.move_as_ok();
|
||||
auto res3 = ss.split(shard_, &out_msg_queue_size_);
|
||||
auto res3 = ss.split(shard_);
|
||||
if (res3.is_error()) {
|
||||
return fatal_error(std::move(res3));
|
||||
}
|
||||
|
@ -1052,7 +1057,12 @@ bool Collator::import_shard_state_data(block::ShardState& ss) {
|
|||
out_msg_queue_ = std::move(ss.out_msg_queue_);
|
||||
processed_upto_ = std::move(ss.processed_upto_);
|
||||
ihr_pending = std::move(ss.ihr_pending_);
|
||||
dispatch_queue_ = std::move(ss.dispatch_queue_);
|
||||
block_create_stats_ = std::move(ss.block_create_stats_);
|
||||
if (ss.out_msg_queue_size_) {
|
||||
have_out_msg_queue_size_in_state_ = true;
|
||||
out_msg_queue_size_ = ss.out_msg_queue_size_.value();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -2090,6 +2100,11 @@ bool Collator::do_collate() {
|
|||
if (!init_value_create()) {
|
||||
return fatal_error("cannot compute the value to be created / minted / recovered");
|
||||
}
|
||||
// 2-. take messages from dispatch queue
|
||||
LOG(INFO) << "process dispatch queue";
|
||||
if (!process_dispatch_queue()) {
|
||||
return fatal_error("cannot process dispatch queue");
|
||||
}
|
||||
// 2. tick transactions
|
||||
LOG(INFO) << "create tick transactions";
|
||||
if (!create_ticktock_transactions(2)) {
|
||||
|
@ -2597,7 +2612,7 @@ bool Collator::create_special_transaction(block::CurrencyCollection amount, Ref<
|
|||
}
|
||||
CHECK(block::gen::t_Message_Any.validate_ref(msg));
|
||||
CHECK(block::tlb::t_Message.validate_ref(msg));
|
||||
if (process_one_new_message(block::NewOutMsg{lt, msg, Ref<vm::Cell>{}}, false, &in_msg) != 1) {
|
||||
if (process_one_new_message(block::NewOutMsg{lt, msg, Ref<vm::Cell>{}, 0}, false, &in_msg) != 1) {
|
||||
return fatal_error("cannot generate special transaction for recovering "s + amount.to_str() + " to account " +
|
||||
addr.to_hex());
|
||||
}
|
||||
|
@ -2639,13 +2654,18 @@ bool Collator::create_ticktock_transaction(const ton::StdSmcAddress& smc_addr, t
|
|||
return true;
|
||||
}
|
||||
req_start_lt = std::max(req_start_lt, start_lt + 1);
|
||||
auto it = last_dispatch_queue_emitted_lt_.find(acc->addr);
|
||||
if (it != last_dispatch_queue_emitted_lt_.end()) {
|
||||
req_start_lt = std::max(req_start_lt, it->second + 1);
|
||||
}
|
||||
if (acc->last_trans_end_lt_ >= start_lt && acc->transactions.empty()) {
|
||||
return fatal_error(td::Status::Error(-666, PSTRING()
|
||||
<< "last transaction time in the state of account " << workchain()
|
||||
<< ":" << smc_addr.to_hex() << " is too large"));
|
||||
}
|
||||
std::unique_ptr<block::transaction::Transaction> trans = std::make_unique<block::transaction::Transaction>(
|
||||
*acc, mask == 2 ? block::transaction::Transaction::tr_tick : block::transaction::Transaction::tr_tock, req_start_lt, now_);
|
||||
*acc, mask == 2 ? block::transaction::Transaction::tr_tick : block::transaction::Transaction::tr_tock,
|
||||
req_start_lt, now_);
|
||||
if (!trans->prepare_storage_phase(storage_phase_cfg_, true)) {
|
||||
return fatal_error(td::Status::Error(
|
||||
-666, std::string{"cannot create storage phase of a new transaction for smart contract "} + smc_addr.to_hex()));
|
||||
|
@ -2675,7 +2695,8 @@ bool Collator::create_ticktock_transaction(const ton::StdSmcAddress& smc_addr, t
|
|||
td::Status::Error(-666, std::string{"cannot commit new transaction for smart contract "} + smc_addr.to_hex()));
|
||||
}
|
||||
update_max_lt(acc->last_trans_end_lt_);
|
||||
register_new_msgs(*trans);
|
||||
block::MsgMetadata new_msg_metadata{0, acc->workchain, acc->addr, trans->start_lt};
|
||||
register_new_msgs(*trans, std::move(new_msg_metadata));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -2683,11 +2704,15 @@ bool Collator::create_ticktock_transaction(const ton::StdSmcAddress& smc_addr, t
|
|||
* Creates an ordinary transaction using a given message.
|
||||
*
|
||||
* @param msg_root The root of the message to be processed serialized using Message TLB-scheme.
|
||||
* @param msg_metadata Metadata of the inbound message.
|
||||
* @param after_lt Transaction lt will be grater than after_lt. Used for deferred messages.
|
||||
* @param is_special_tx True if creating a special transaction (mint/recover), false otherwise.
|
||||
*
|
||||
* @returns The root of the serialized transaction, or an empty reference if the transaction creation fails.
|
||||
*/
|
||||
Ref<vm::Cell> Collator::create_ordinary_transaction(Ref<vm::Cell> msg_root, bool is_special_tx) {
|
||||
Ref<vm::Cell> Collator::create_ordinary_transaction(Ref<vm::Cell> msg_root,
|
||||
td::optional<block::MsgMetadata> msg_metadata, LogicalTime after_lt,
|
||||
bool is_special_tx) {
|
||||
ton::StdSmcAddress addr;
|
||||
auto cs = vm::load_cell_slice(msg_root);
|
||||
bool external;
|
||||
|
@ -2731,8 +2756,15 @@ Ref<vm::Cell> Collator::create_ordinary_transaction(Ref<vm::Cell> msg_root, bool
|
|||
block::Account* acc = acc_res.move_as_ok();
|
||||
assert(acc);
|
||||
|
||||
if (external) {
|
||||
after_lt = std::max(after_lt, last_proc_int_msg_.first);
|
||||
}
|
||||
auto it = last_dispatch_queue_emitted_lt_.find(acc->addr);
|
||||
if (it != last_dispatch_queue_emitted_lt_.end()) {
|
||||
after_lt = std::max(after_lt, it->second);
|
||||
}
|
||||
auto res = impl_create_ordinary_transaction(msg_root, acc, now_, start_lt, &storage_phase_cfg_, &compute_phase_cfg_,
|
||||
&action_phase_cfg_, external, last_proc_int_msg_.first);
|
||||
&action_phase_cfg_, external, after_lt);
|
||||
if (res.is_error()) {
|
||||
auto error = res.move_as_error();
|
||||
if (error.code() == -701) {
|
||||
|
@ -2756,7 +2788,14 @@ Ref<vm::Cell> Collator::create_ordinary_transaction(Ref<vm::Cell> msg_root, bool
|
|||
return {};
|
||||
}
|
||||
|
||||
register_new_msgs(*trans);
|
||||
td::optional<block::MsgMetadata> new_msg_metadata;
|
||||
if (external || is_special_tx) {
|
||||
new_msg_metadata = block::MsgMetadata{0, acc->workchain, acc->addr, trans->start_lt};
|
||||
} else if (msg_metadata) {
|
||||
new_msg_metadata = std::move(msg_metadata);
|
||||
++new_msg_metadata.value().depth;
|
||||
}
|
||||
register_new_msgs(*trans, std::move(new_msg_metadata));
|
||||
update_max_lt(acc->last_trans_end_lt_);
|
||||
value_flow_.burned += trans->blackhole_burned;
|
||||
return trans_root;
|
||||
|
@ -2791,13 +2830,12 @@ td::Result<std::unique_ptr<block::transaction::Transaction>> Collator::impl_crea
|
|||
<< ":" << acc->addr.to_hex() << " is too large");
|
||||
}
|
||||
auto trans_min_lt = lt;
|
||||
if (external) {
|
||||
// transactions processing external messages must have lt larger than all processed internal messages
|
||||
trans_min_lt = std::max(trans_min_lt, after_lt);
|
||||
}
|
||||
// transactions processing external messages must have lt larger than all processed internal messages
|
||||
// if account has deferred message processed in this block, the next transaction should have lt > emitted_lt
|
||||
trans_min_lt = std::max(trans_min_lt, after_lt);
|
||||
|
||||
std::unique_ptr<block::transaction::Transaction> trans =
|
||||
std::make_unique<block::transaction::Transaction>(*acc, block::transaction::Transaction::tr_ord, trans_min_lt + 1, utime, msg_root);
|
||||
std::unique_ptr<block::transaction::Transaction> trans = std::make_unique<block::transaction::Transaction>(
|
||||
*acc, block::transaction::Transaction::tr_ord, trans_min_lt + 1, utime, msg_root);
|
||||
bool ihr_delivered = false; // FIXME
|
||||
if (!trans->unpack_input_msg(ihr_delivered, action_phase_cfg)) {
|
||||
if (external) {
|
||||
|
@ -2948,7 +2986,7 @@ bool Collator::is_our_address(const ton::StdSmcAddress& addr) const {
|
|||
}
|
||||
|
||||
/**
|
||||
* Processes a message generated in this block.
|
||||
* Processes a message generated in this block or a message from DispatchQueue.
|
||||
*
|
||||
* @param msg The new message to be processed.
|
||||
* @param enqueue_only Flag indicating whether the message should only be enqueued.
|
||||
|
@ -2961,6 +2999,7 @@ bool Collator::is_our_address(const ton::StdSmcAddress& addr) const {
|
|||
* -1 - error occured.
|
||||
*/
|
||||
int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, Ref<vm::Cell>* is_special) {
|
||||
bool from_dispatch_queue = msg.msg_env_from_dispatch_queue.not_null();
|
||||
Ref<vm::CellSlice> src, dest;
|
||||
bool enqueue, external;
|
||||
auto cs = load_cell_slice(msg.msg);
|
||||
|
@ -2972,7 +3011,7 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R
|
|||
if (!tlb::unpack(cs, info)) {
|
||||
return -1;
|
||||
}
|
||||
CHECK(info.created_lt == msg.lt && info.created_at == now_);
|
||||
CHECK(info.created_lt == msg.lt && info.created_at == now_ && !from_dispatch_queue);
|
||||
src = std::move(info.src);
|
||||
enqueue = external = true;
|
||||
break;
|
||||
|
@ -2982,7 +3021,7 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R
|
|||
if (!tlb::unpack(cs, info)) {
|
||||
return -1;
|
||||
}
|
||||
CHECK(info.created_lt == msg.lt && info.created_at == now_);
|
||||
CHECK(from_dispatch_queue || (info.created_lt == msg.lt && info.created_at == now_));
|
||||
src = std::move(info.src);
|
||||
dest = std::move(info.dest);
|
||||
fwd_fees = block::tlb::t_Grams.as_integer(info.fwd_fee);
|
||||
|
@ -2994,7 +3033,7 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R
|
|||
default:
|
||||
return -1;
|
||||
}
|
||||
CHECK(is_our_address(std::move(src)));
|
||||
CHECK(is_our_address(src));
|
||||
if (external) {
|
||||
// 1. construct a msg_export_ext OutMsg
|
||||
vm::CellBuilder cb;
|
||||
|
@ -3006,9 +3045,44 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R
|
|||
// (if ever a structure in the block for listing all external outbound messages appears, insert this message there as well)
|
||||
return 0;
|
||||
}
|
||||
if (enqueue) {
|
||||
auto lt = msg.lt;
|
||||
bool ok = enqueue_message(std::move(msg), std::move(fwd_fees), lt);
|
||||
|
||||
WorkchainId src_wc;
|
||||
StdSmcAddress src_addr;
|
||||
CHECK(block::tlb::t_MsgAddressInt.extract_std_address(src, src_wc, src_addr));
|
||||
CHECK(src_wc == workchain());
|
||||
bool is_special_account = is_masterchain() && config_->is_special_smartcontract(src_addr);
|
||||
bool defer = false;
|
||||
if (!from_dispatch_queue) {
|
||||
if (deferring_messages_enabled_ && !is_special && !is_special_account && msg.msg_idx != 0) {
|
||||
if (++sender_generated_messages_count_[src_addr] >= DEFER_MESSAGES_AFTER) {
|
||||
defer = true;
|
||||
}
|
||||
}
|
||||
if (dispatch_queue_->lookup(src_addr).not_null() || unprocessed_deferred_messages_.count(src_addr)) {
|
||||
defer = true;
|
||||
}
|
||||
} else {
|
||||
auto &x = unprocessed_deferred_messages_[src_addr];
|
||||
CHECK(x > 0);
|
||||
if (--x == 0) {
|
||||
unprocessed_deferred_messages_.erase(src_addr);
|
||||
}
|
||||
}
|
||||
|
||||
if (enqueue || defer) {
|
||||
bool ok;
|
||||
if (from_dispatch_queue) {
|
||||
auto msg_env = msg.msg_env_from_dispatch_queue;
|
||||
block::tlb::MsgEnvelope::Record_std env;
|
||||
CHECK(block::tlb::unpack_cell(msg_env, env));
|
||||
auto src_prefix = block::tlb::MsgAddressInt::get_prefix(src);
|
||||
auto dest_prefix = block::tlb::MsgAddressInt::get_prefix(dest);
|
||||
CHECK(env.emitted_lt && env.emitted_lt.value() == msg.lt);
|
||||
ok = enqueue_transit_message(std::move(msg.msg), std::move(msg_env), src_prefix, src_prefix, dest_prefix,
|
||||
std::move(env.fwd_fee_remaining), std::move(env.metadata), msg.lt);
|
||||
} else {
|
||||
ok = enqueue_message(std::move(msg), std::move(fwd_fees), src_addr, defer);
|
||||
}
|
||||
return ok ? 0 : -1;
|
||||
}
|
||||
// process message by a transaction in this block:
|
||||
|
@ -3019,26 +3093,36 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R
|
|||
return -1;
|
||||
}
|
||||
// 1. create a Transaction processing this Message
|
||||
auto trans_root = create_ordinary_transaction(msg.msg, is_special != nullptr);
|
||||
auto trans_root = create_ordinary_transaction(msg.msg, msg.metadata, msg.lt, is_special != nullptr);
|
||||
if (trans_root.is_null()) {
|
||||
fatal_error("cannot create transaction for re-processing output message");
|
||||
return -1;
|
||||
}
|
||||
// 2. create a MsgEnvelope enveloping this Message
|
||||
vm::CellBuilder cb;
|
||||
CHECK(cb.store_long_bool(0x46060, 20) // msg_envelope#4 cur_addr:.. next_addr:..
|
||||
&& block::tlb::t_Grams.store_integer_ref(cb, fwd_fees) // fwd_fee_remaining:t_Grams
|
||||
&& cb.store_ref_bool(msg.msg)); // msg:^(Message Any)
|
||||
Ref<vm::Cell> msg_env = cb.finalize();
|
||||
block::tlb::MsgEnvelope::Record_std msg_env_rec{0x60, 0x60, fwd_fees, msg.msg, {}, msg.metadata};
|
||||
Ref<vm::Cell> msg_env;
|
||||
CHECK(block::tlb::pack_cell(msg_env, msg_env_rec));
|
||||
if (verbosity > 2) {
|
||||
std::cerr << "new (processed outbound) message envelope: ";
|
||||
block::gen::t_MsgEnvelope.print_ref(std::cerr, msg_env);
|
||||
}
|
||||
// 3. create InMsg, referring to this MsgEnvelope and this Transaction
|
||||
CHECK(cb.store_long_bool(3, 3) // msg_import_imm$011
|
||||
&& cb.store_ref_bool(msg_env) // in_msg:^MsgEnvelope
|
||||
&& cb.store_ref_bool(trans_root) // transaction:^Transaction
|
||||
&& block::tlb::t_Grams.store_integer_ref(cb, fwd_fees)); // fwd_fee:Grams
|
||||
vm::CellBuilder cb;
|
||||
if (from_dispatch_queue) {
|
||||
auto msg_env = msg.msg_env_from_dispatch_queue;
|
||||
block::tlb::MsgEnvelope::Record_std env;
|
||||
CHECK(block::tlb::unpack_cell(msg_env, env));
|
||||
CHECK(env.emitted_lt && env.emitted_lt.value() == msg.lt);
|
||||
CHECK(cb.store_long_bool(0b00100, 5) // msg_import_deferred_fin$00100
|
||||
&& cb.store_ref_bool(msg_env) // in_msg:^MsgEnvelope
|
||||
&& cb.store_ref_bool(trans_root) // transaction:^Transaction
|
||||
&& block::tlb::t_Grams.store_integer_ref(cb, env.fwd_fee_remaining)); // fwd_fee:Grams
|
||||
} else {
|
||||
CHECK(cb.store_long_bool(3, 3) // msg_import_imm$011
|
||||
&& cb.store_ref_bool(msg_env) // in_msg:^MsgEnvelope
|
||||
&& cb.store_ref_bool(trans_root) // transaction:^Transaction
|
||||
&& block::tlb::t_Grams.store_integer_ref(cb, fwd_fees)); // fwd_fee:Grams
|
||||
}
|
||||
// 4. insert InMsg into InMsgDescr
|
||||
Ref<vm::Cell> in_msg = cb.finalize();
|
||||
if (!insert_in_msg(in_msg)) {
|
||||
|
@ -3049,14 +3133,16 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R
|
|||
*is_special = in_msg;
|
||||
return 1;
|
||||
}
|
||||
// 5. create OutMsg, referring to this MsgEnvelope and InMsg
|
||||
CHECK(cb.store_long_bool(2, 3) // msg_export_imm$010
|
||||
&& cb.store_ref_bool(msg_env) // out_msg:^MsgEnvelope
|
||||
&& cb.store_ref_bool(msg.trans) // transaction:^Transaction
|
||||
&& cb.store_ref_bool(in_msg)); // reimport:^InMsg
|
||||
// 6. insert OutMsg into OutMsgDescr
|
||||
if (!insert_out_msg(cb.finalize())) {
|
||||
return -1;
|
||||
if (!from_dispatch_queue) {
|
||||
// 5. create OutMsg, referring to this MsgEnvelope and InMsg
|
||||
CHECK(cb.store_long_bool(2, 3) // msg_export_imm$010
|
||||
&& cb.store_ref_bool(msg_env) // out_msg:^MsgEnvelope
|
||||
&& cb.store_ref_bool(msg.trans) // transaction:^Transaction
|
||||
&& cb.store_ref_bool(in_msg)); // reimport:^InMsg
|
||||
// 6. insert OutMsg into OutMsgDescr
|
||||
if (!insert_out_msg(cb.finalize())) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
// 7. check whether the block is full now
|
||||
if (!block_limit_status_->fits(block::ParamLimits::cl_normal)) {
|
||||
|
@ -3081,41 +3167,61 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R
|
|||
* @param cur_prefix The account ID prefix for the next hop.
|
||||
* @param dest_prefix The prefix of the destination account ID.
|
||||
* @param fwd_fee_remaining The remaining forward fee.
|
||||
* @param msg_metadata Metadata of the message.
|
||||
* @param emitted_lt If present - the message was taken from DispatchQueue, and msg_env will have this emitted_lt.
|
||||
*
|
||||
* @returns True if the transit message is successfully enqueued, false otherwise.
|
||||
*/
|
||||
bool Collator::enqueue_transit_message(Ref<vm::Cell> msg, Ref<vm::Cell> old_msg_env,
|
||||
ton::AccountIdPrefixFull prev_prefix, ton::AccountIdPrefixFull cur_prefix,
|
||||
ton::AccountIdPrefixFull dest_prefix, td::RefInt256 fwd_fee_remaining) {
|
||||
LOG(DEBUG) << "enqueueing transit message " << msg->get_hash().bits().to_hex(256);
|
||||
bool requeue = is_our_address(prev_prefix);
|
||||
ton::AccountIdPrefixFull dest_prefix, td::RefInt256 fwd_fee_remaining,
|
||||
td::optional<block::MsgMetadata> msg_metadata,
|
||||
td::optional<LogicalTime> emitted_lt) {
|
||||
bool from_dispatch_queue = (bool)emitted_lt;
|
||||
if (from_dispatch_queue) {
|
||||
LOG(DEBUG) << "enqueueing message from dispatch queue " << msg->get_hash().bits().to_hex(256)
|
||||
<< ", emitted_lt=" << emitted_lt.value();
|
||||
} else {
|
||||
LOG(DEBUG) << "enqueueing transit message " << msg->get_hash().bits().to_hex(256);
|
||||
}
|
||||
bool requeue = !from_dispatch_queue && is_our_address(prev_prefix) && !from_dispatch_queue;
|
||||
// 1. perform hypercube routing
|
||||
auto route_info = block::perform_hypercube_routing(cur_prefix, dest_prefix, shard_);
|
||||
if ((unsigned)route_info.first > 96 || (unsigned)route_info.second > 96) {
|
||||
return fatal_error("cannot perform hypercube routing for a transit message");
|
||||
}
|
||||
// 2. compute our part of transit fees
|
||||
td::RefInt256 transit_fee = action_phase_cfg_.fwd_std.get_next_part(fwd_fee_remaining);
|
||||
td::RefInt256 transit_fee =
|
||||
from_dispatch_queue ? td::zero_refint() : action_phase_cfg_.fwd_std.get_next_part(fwd_fee_remaining);
|
||||
fwd_fee_remaining -= transit_fee;
|
||||
CHECK(td::sgn(transit_fee) >= 0 && td::sgn(fwd_fee_remaining) >= 0);
|
||||
// 3. create a new MsgEnvelope
|
||||
vm::CellBuilder cb;
|
||||
CHECK(cb.store_long_bool(4, 4) // msg_envelope#4 cur_addr:.. next_addr:..
|
||||
&& cb.store_long_bool(route_info.first, 8) // cur_addr:IntermediateAddress
|
||||
&& cb.store_long_bool(route_info.second, 8) // next_addr:IntermediateAddress
|
||||
&& block::tlb::t_Grams.store_integer_ref(cb, fwd_fee_remaining) // fwd_fee_remaining:t_Grams
|
||||
&& cb.store_ref_bool(msg)); // msg:^(Message Any)
|
||||
Ref<vm::Cell> msg_env = cb.finalize();
|
||||
block::tlb::MsgEnvelope::Record_std msg_env_rec{route_info.first, route_info.second, fwd_fee_remaining, msg,
|
||||
emitted_lt, std::move(msg_metadata)};
|
||||
Ref<vm::Cell> msg_env;
|
||||
CHECK(block::tlb::t_MsgEnvelope.pack_cell(msg_env, msg_env_rec));
|
||||
// 4. create InMsg
|
||||
CHECK(cb.store_long_bool(5, 3) // msg_import_tr$101
|
||||
&& cb.store_ref_bool(old_msg_env) // in_msg:^MsgEnvelope
|
||||
&& cb.store_ref_bool(msg_env) // out_msg:^MsgEnvelope
|
||||
&& block::tlb::t_Grams.store_integer_ref(cb, transit_fee)); // transit_fee:Grams
|
||||
vm::CellBuilder cb;
|
||||
if (from_dispatch_queue) {
|
||||
CHECK(cb.store_long_bool(0b00101, 5) // msg_import_deferred_tr$00101
|
||||
&& cb.store_ref_bool(old_msg_env) // in_msg:^MsgEnvelope
|
||||
&& cb.store_ref_bool(msg_env)); // out_msg:^MsgEnvelope
|
||||
} else {
|
||||
CHECK(cb.store_long_bool(5, 3) // msg_import_tr$101
|
||||
&& cb.store_ref_bool(old_msg_env) // in_msg:^MsgEnvelope
|
||||
&& cb.store_ref_bool(msg_env) // out_msg:^MsgEnvelope
|
||||
&& block::tlb::t_Grams.store_integer_ref(cb, transit_fee)); // transit_fee:Grams
|
||||
}
|
||||
Ref<vm::Cell> in_msg = cb.finalize();
|
||||
// 5. create a new OutMsg
|
||||
CHECK(cb.store_long_bool(requeue ? 7 : 3, 3) // msg_export_tr$011 or msg_export_tr_req$111
|
||||
&& cb.store_ref_bool(msg_env) // out_msg:^MsgEnvelope
|
||||
&& cb.store_ref_bool(in_msg)); // imported:^InMsg
|
||||
// msg_export_tr$011 / msg_export_tr_req$111 / msg_export_deferred_tr$10101
|
||||
if (from_dispatch_queue) {
|
||||
CHECK(cb.store_long_bool(0b10101, 5));
|
||||
} else {
|
||||
CHECK(cb.store_long_bool(requeue ? 7 : 3, 3));
|
||||
}
|
||||
CHECK(cb.store_ref_bool(msg_env) // out_msg:^MsgEnvelope
|
||||
&& cb.store_ref_bool(in_msg)); // imported:^InMsg
|
||||
Ref<vm::Cell> out_msg = cb.finalize();
|
||||
// 4.1. insert OutMsg into OutMsgDescr
|
||||
if (verbosity > 2) {
|
||||
|
@ -3134,8 +3240,8 @@ bool Collator::enqueue_transit_message(Ref<vm::Cell> msg, Ref<vm::Cell> old_msg_
|
|||
return fatal_error("cannot insert a new InMsg into InMsgDescr");
|
||||
}
|
||||
// 5. create EnqueuedMsg
|
||||
CHECK(cb.store_long_bool(start_lt) // _ enqueued_lt:uint64
|
||||
&& cb.store_ref_bool(msg_env)); // out_msg:^MsgEnvelope = EnqueuedMsg;
|
||||
CHECK(cb.store_long_bool(from_dispatch_queue ? emitted_lt.value() : start_lt) // _ enqueued_lt:uint64
|
||||
&& cb.store_ref_bool(msg_env)); // out_msg:^MsgEnvelope = EnqueuedMsg;
|
||||
// 6. insert EnqueuedMsg into OutMsgQueue
|
||||
// NB: we use here cur_prefix instead of src_prefix; should we check that route_info.first >= next_addr.use_dest_bits of the old envelope?
|
||||
auto next_hop = block::interpolate_addr(cur_prefix, dest_prefix, route_info.second);
|
||||
|
@ -3237,9 +3343,14 @@ bool Collator::process_inbound_message(Ref<vm::CellSlice> enq_msg, ton::LogicalT
|
|||
LOG(ERROR) << "cannot unpack CommonMsgInfo of an inbound internal message";
|
||||
return false;
|
||||
}
|
||||
if (info.created_lt != lt) {
|
||||
if (!env.emitted_lt && info.created_lt != lt) {
|
||||
LOG(ERROR) << "inbound internal message has an augmentation value in source OutMsgQueue distinct from the one in "
|
||||
"its contents";
|
||||
"its contents (CommonMsgInfo)";
|
||||
return false;
|
||||
}
|
||||
if (env.emitted_lt && env.emitted_lt.value() != lt) {
|
||||
LOG(ERROR) << "inbound internal message has an augmentation value in source OutMsgQueue distinct from the one in "
|
||||
"its contents (deferred_it in MsgEnvelope)";
|
||||
return false;
|
||||
}
|
||||
if (!block::tlb::validate_message_libs(env.msg)) {
|
||||
|
@ -3302,7 +3413,8 @@ bool Collator::process_inbound_message(Ref<vm::CellSlice> enq_msg, ton::LogicalT
|
|||
bool our = ton::shard_contains(shard_, cur_prefix);
|
||||
bool to_us = ton::shard_contains(shard_, dest_prefix);
|
||||
|
||||
block::EnqueuedMsgDescr enq_msg_descr{cur_prefix, next_prefix, info.created_lt, enqueued_lt,
|
||||
block::EnqueuedMsgDescr enq_msg_descr{cur_prefix, next_prefix,
|
||||
env.emitted_lt ? env.emitted_lt.value() : info.created_lt, enqueued_lt,
|
||||
env.msg->get_hash().bits()};
|
||||
if (processed_upto_->already_processed(enq_msg_descr)) {
|
||||
LOG(DEBUG) << "inbound internal message with lt=" << enq_msg_descr.lt_ << " hash=" << enq_msg_descr.hash_.to_hex()
|
||||
|
@ -3319,7 +3431,7 @@ bool Collator::process_inbound_message(Ref<vm::CellSlice> enq_msg, ton::LogicalT
|
|||
// destination is outside our shard, relay transit message
|
||||
// (very similar to enqueue_message())
|
||||
if (!enqueue_transit_message(std::move(env.msg), std::move(msg_env), cur_prefix, next_prefix, dest_prefix,
|
||||
std::move(env.fwd_fee_remaining))) {
|
||||
std::move(env.fwd_fee_remaining), std::move(env.metadata))) {
|
||||
return fatal_error("cannot enqueue transit internal message with key "s + key.to_hex(352));
|
||||
}
|
||||
return !our || delete_out_msg_queue_msg(key);
|
||||
|
@ -3328,7 +3440,7 @@ bool Collator::process_inbound_message(Ref<vm::CellSlice> enq_msg, ton::LogicalT
|
|||
// process the message by an ordinary transaction similarly to process_one_new_message()
|
||||
//
|
||||
// 8. create a Transaction processing this Message
|
||||
auto trans_root = create_ordinary_transaction(env.msg);
|
||||
auto trans_root = create_ordinary_transaction(env.msg, env.metadata, 0);
|
||||
if (trans_root.is_null()) {
|
||||
return fatal_error("cannot create transaction for processing inbound message");
|
||||
}
|
||||
|
@ -3368,6 +3480,9 @@ bool Collator::process_inbound_message(Ref<vm::CellSlice> enq_msg, ton::LogicalT
|
|||
* @returns True if the processing was successful, false otherwise.
|
||||
*/
|
||||
bool Collator::process_inbound_internal_messages() {
|
||||
if (have_unprocessed_account_dispatch_queue_) {
|
||||
return true;
|
||||
}
|
||||
while (!block_full_ && !nb_out_msgs_->is_eof()) {
|
||||
block_full_ = !block_limit_status_->fits(block::ParamLimits::cl_normal);
|
||||
if (block_full_) {
|
||||
|
@ -3476,7 +3591,7 @@ int Collator::process_external_message(Ref<vm::Cell> msg) {
|
|||
}
|
||||
// process message by a transaction in this block:
|
||||
// 1. create a Transaction processing this Message
|
||||
auto trans_root = create_ordinary_transaction(msg);
|
||||
auto trans_root = create_ordinary_transaction(msg, /* metadata = */ {}, 0);
|
||||
if (trans_root.is_null()) {
|
||||
if (busy_) {
|
||||
// transaction rejected by account
|
||||
|
@ -3500,6 +3615,222 @@ int Collator::process_external_message(Ref<vm::Cell> msg) {
|
|||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes messages from dispatch queue
|
||||
*
|
||||
* Messages from dispatch queue are taken in three steps:
|
||||
* 1. Take one message from each account (in the order of lt)
|
||||
* 2. Take up to 10 per account (including from p.1), up to 20 per initiator, up to 150 in total
|
||||
* 3. Take up to X messages per initiator, up to 150 in total. X depends on out msg queue size
|
||||
*
|
||||
* @returns True if the processing was successful, false otherwise.
|
||||
*/
|
||||
bool Collator::process_dispatch_queue() {
|
||||
have_unprocessed_account_dispatch_queue_ = true;
|
||||
size_t max_total_count[3] = {1 << 30, 150, 150};
|
||||
size_t max_per_initiator[3] = {1 << 30, 20, 0};
|
||||
if (out_msg_queue_size_ <= 256) {
|
||||
max_per_initiator[2] = 10;
|
||||
} else if (out_msg_queue_size_ <= 512) {
|
||||
max_per_initiator[2] = 2;
|
||||
} else if (out_msg_queue_size_ <= 2048) {
|
||||
max_per_initiator[2] = 1;
|
||||
}
|
||||
for (int iter = 0; iter < 3; ++iter) {
|
||||
if (max_per_initiator[iter] == 0) {
|
||||
continue;
|
||||
}
|
||||
vm::AugmentedDictionary cur_dispatch_queue{dispatch_queue_->get_root(), 256, block::tlb::aug_DispatchQueue};
|
||||
std::map<std::tuple<WorkchainId, StdSmcAddress, LogicalTime>, size_t> count_per_initiator;
|
||||
size_t total_count = 0;
|
||||
while (!cur_dispatch_queue.is_empty()) {
|
||||
block_full_ = !block_limit_status_->fits(block::ParamLimits::cl_normal);
|
||||
if (block_full_) {
|
||||
LOG(INFO) << "BLOCK FULL, stop processing dispatch queue";
|
||||
return true;
|
||||
}
|
||||
if (soft_timeout_.is_in_past(td::Timestamp::now())) {
|
||||
block_full_ = true;
|
||||
LOG(WARNING) << "soft timeout reached, stop processing dispatch queue";
|
||||
return true;
|
||||
}
|
||||
StdSmcAddress src_addr;
|
||||
auto account_dispatch_queue = block::get_dispatch_queue_min_lt_account(cur_dispatch_queue, src_addr);
|
||||
if (account_dispatch_queue.is_null()) {
|
||||
return fatal_error("invalid dispatch queue in shard state");
|
||||
}
|
||||
vm::Dictionary dict{64};
|
||||
td::uint64 dict_size;
|
||||
if (!block::unpack_account_dispatch_queue(account_dispatch_queue, dict, dict_size)) {
|
||||
return fatal_error(PSTRING() << "invalid account dispatch queue for account " << src_addr.to_hex());
|
||||
}
|
||||
td::BitArray<64> key;
|
||||
Ref<vm::CellSlice> enqueued_msg = dict.extract_minmax_key(key.bits(), 64, false, false);
|
||||
LogicalTime lt = key.to_ulong();
|
||||
|
||||
td::optional<block::MsgMetadata> msg_metadata;
|
||||
if (!process_deferred_message(std::move(enqueued_msg), src_addr, lt, msg_metadata)) {
|
||||
return fatal_error(PSTRING() << "error processing internal message from dispatch queue: account="
|
||||
<< src_addr.to_hex() << ", lt=" << lt);
|
||||
}
|
||||
|
||||
// Remove message from DispatchQueue
|
||||
bool ok;
|
||||
if (iter == 0 || (iter == 1 && sender_generated_messages_count_[src_addr] >= DEFER_MESSAGES_AFTER)) {
|
||||
ok = cur_dispatch_queue.lookup_delete(src_addr).not_null();
|
||||
} else {
|
||||
dict.lookup_delete(key);
|
||||
--dict_size;
|
||||
account_dispatch_queue = block::pack_account_dispatch_queue(dict, dict_size);
|
||||
ok = account_dispatch_queue.not_null() ? cur_dispatch_queue.set(src_addr, account_dispatch_queue)
|
||||
: cur_dispatch_queue.lookup_delete(src_addr).not_null();
|
||||
}
|
||||
if (!ok) {
|
||||
return fatal_error(PSTRING() << "error processing internal message from dispatch queue: account="
|
||||
<< src_addr.to_hex() << ", lt=" << lt);
|
||||
}
|
||||
if (msg_metadata) {
|
||||
auto initiator = std::make_tuple(msg_metadata.value().initiator_wc, msg_metadata.value().initiator_addr,
|
||||
msg_metadata.value().initiator_lt);
|
||||
size_t initiator_count = ++count_per_initiator[initiator];
|
||||
if (initiator_count >= max_per_initiator[iter]) {
|
||||
cur_dispatch_queue.lookup_delete(src_addr);
|
||||
}
|
||||
}
|
||||
++total_count;
|
||||
if (total_count >= max_total_count[iter]) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (iter == 0) {
|
||||
have_unprocessed_account_dispatch_queue_ = false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes an internal message from DispatchQueue.
|
||||
* The message may create a transaction or be enqueued.
|
||||
*
|
||||
* Similar to Collator::process_inbound_message.
|
||||
*
|
||||
* @param enq_msg The internal message serialized using EnqueuedMsg TLB-scheme.
|
||||
* @param src_addr 256-bit address of the sender.
|
||||
* @param lt The logical time of the message.
|
||||
* @param msg_metadata Reference to store msg_metadata
|
||||
*
|
||||
* @returns True if the message was processed successfully, false otherwise.
|
||||
*/
|
||||
bool Collator::process_deferred_message(Ref<vm::CellSlice> enq_msg, StdSmcAddress src_addr, LogicalTime lt,
|
||||
td::optional<block::MsgMetadata>& msg_metadata) {
|
||||
if (!block::remove_dispatch_queue_entry(*dispatch_queue_, src_addr, lt)) {
|
||||
return fatal_error(PSTRING() << "failed to delete message from DispatchQueue: address=" << src_addr.to_hex()
|
||||
<< ", lt=" << lt);
|
||||
}
|
||||
++dispatch_queue_ops_;
|
||||
if (!(dispatch_queue_ops_ & 63)) {
|
||||
if (!block_limit_status_->add_proof(dispatch_queue_->get_root_cell())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
++sender_generated_messages_count_[src_addr];
|
||||
|
||||
LogicalTime enqueued_lt = 0;
|
||||
if (enq_msg.is_null() || enq_msg->size_ext() != 0x10040 || (enqueued_lt = enq_msg->prefetch_ulong(64)) != lt) {
|
||||
if (enq_msg.not_null()) {
|
||||
block::gen::t_EnqueuedMsg.print(std::cerr, *enq_msg);
|
||||
}
|
||||
LOG(ERROR) << "internal message in DispatchQueue is not a valid EnqueuedMsg (created lt " << lt << ", enqueued "
|
||||
<< enqueued_lt << ")";
|
||||
return false;
|
||||
}
|
||||
auto msg_env = enq_msg->prefetch_ref();
|
||||
CHECK(msg_env.not_null());
|
||||
// 0. check MsgEnvelope
|
||||
if (msg_env->get_level() != 0) {
|
||||
LOG(ERROR) << "cannot import a message with non-zero level!";
|
||||
return false;
|
||||
}
|
||||
if (!block::gen::t_MsgEnvelope.validate_ref(msg_env)) {
|
||||
LOG(ERROR) << "MsgEnvelope from DispatchQueue is invalid according to automated checks";
|
||||
return false;
|
||||
}
|
||||
if (!block::tlb::t_MsgEnvelope.validate_ref(msg_env)) {
|
||||
LOG(ERROR) << "MsgEnvelope from DispatchQueue is invalid according to hand-written checks";
|
||||
return false;
|
||||
}
|
||||
// 1. unpack MsgEnvelope
|
||||
block::tlb::MsgEnvelope::Record_std env;
|
||||
if (!tlb::unpack_cell(msg_env, env)) {
|
||||
LOG(ERROR) << "cannot unpack MsgEnvelope from DispatchQueue";
|
||||
return false;
|
||||
}
|
||||
// 2. unpack CommonMsgInfo of the message
|
||||
vm::CellSlice cs{vm::NoVmOrd{}, env.msg};
|
||||
if (block::gen::t_CommonMsgInfo.get_tag(cs) != block::gen::CommonMsgInfo::int_msg_info) {
|
||||
LOG(ERROR) << "internal message from DispatchQueue is not in fact internal!";
|
||||
return false;
|
||||
}
|
||||
block::gen::CommonMsgInfo::Record_int_msg_info info;
|
||||
if (!tlb::unpack(cs, info)) {
|
||||
LOG(ERROR) << "cannot unpack CommonMsgInfo of an internal message from DispatchQueue";
|
||||
return false;
|
||||
}
|
||||
if (info.created_lt != lt) {
|
||||
LOG(ERROR) << "internal message has lt in DispatchQueue distinct from the one in "
|
||||
"its contents";
|
||||
return false;
|
||||
}
|
||||
if (!block::tlb::validate_message_libs(env.msg)) {
|
||||
LOG(ERROR) << "internal message in DispatchQueue has invalid StateInit";
|
||||
return false;
|
||||
}
|
||||
// 2.1. check fwd_fee and fwd_fee_remaining
|
||||
td::RefInt256 orig_fwd_fee = block::tlb::t_Grams.as_integer(info.fwd_fee);
|
||||
if (env.fwd_fee_remaining > orig_fwd_fee) {
|
||||
LOG(ERROR) << "internal message if DispatchQueue has fwd_fee_remaining=" << td::dec_string(env.fwd_fee_remaining)
|
||||
<< " larger than original fwd_fee=" << td::dec_string(orig_fwd_fee);
|
||||
return false;
|
||||
}
|
||||
// 3. extract source and destination shards
|
||||
auto src_prefix = block::tlb::t_MsgAddressInt.get_prefix(info.src);
|
||||
auto dest_prefix = block::tlb::t_MsgAddressInt.get_prefix(info.dest);
|
||||
if (!(src_prefix.is_valid() && dest_prefix.is_valid())) {
|
||||
LOG(ERROR) << "internal message in DispatchQueue has invalid source or destination address";
|
||||
return false;
|
||||
}
|
||||
// 4. chech current and next hop shards
|
||||
if (env.cur_addr != 0 || env.next_addr != 0) {
|
||||
LOG(ERROR) << "internal message in DispatchQueue is expected to have zero cur_addr and next_addr";
|
||||
return false;
|
||||
}
|
||||
// 5. calculate emitted_lt
|
||||
LogicalTime emitted_lt = std::max(start_lt, last_dispatch_queue_emitted_lt_[src_addr]) + 1;
|
||||
auto it = accounts.find(src_addr);
|
||||
if (it != accounts.end()) {
|
||||
emitted_lt = std::max(emitted_lt, it->second->last_trans_end_lt_ + 1);
|
||||
}
|
||||
last_dispatch_queue_emitted_lt_[src_addr] = emitted_lt;
|
||||
update_max_lt(emitted_lt + 1);
|
||||
|
||||
env.emitted_lt = emitted_lt;
|
||||
if (!block::tlb::pack_cell(msg_env, env)) {
|
||||
return fatal_error("cannot pack msg envelope");
|
||||
}
|
||||
|
||||
// 6. create NewOutMsg
|
||||
block::NewOutMsg new_msg{emitted_lt, env.msg, {}, 0};
|
||||
new_msg.metadata = env.metadata;
|
||||
new_msg.msg_env_from_dispatch_queue = msg_env;
|
||||
++unprocessed_deferred_messages_[src_addr];
|
||||
LOG(INFO) << "delivering deferred message from account " << src_addr.to_hex() << ", lt=" << lt
|
||||
<< ", emitted_lt=" << emitted_lt;
|
||||
register_new_msg(std::move(new_msg));
|
||||
msg_metadata = std::move(env.metadata);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts an InMsg into the block's InMsgDescr.
|
||||
*
|
||||
|
@ -3517,8 +3848,9 @@ bool Collator::insert_in_msg(Ref<vm::Cell> in_msg) {
|
|||
return false;
|
||||
}
|
||||
Ref<vm::Cell> msg = cs.prefetch_ref();
|
||||
int tag = (int)cs.prefetch_ulong(3);
|
||||
if (!(tag == 0 || tag == 2)) { // msg_import_ext$000 or msg_import_ihr$010 contain (Message Any) directly
|
||||
int tag = block::gen::t_InMsg.get_tag(cs);
|
||||
// msg_import_ext$000 or msg_import_ihr$010 contain (Message Any) directly
|
||||
if (!(tag == block::gen::InMsg::msg_import_ext || tag == block::gen::InMsg::msg_import_ihr)) {
|
||||
// extract Message Any from MsgEnvelope to compute correct key
|
||||
auto cs2 = load_cell_slice(std::move(msg));
|
||||
if (!cs2.size_refs()) {
|
||||
|
@ -3599,11 +3931,15 @@ bool Collator::insert_out_msg(Ref<vm::Cell> out_msg, td::ConstBitPtr msg_hash) {
|
|||
*
|
||||
* @param msg The new outbound message to enqueue.
|
||||
* @param fwd_fees_remaining The remaining forward fees for the message.
|
||||
* @param enqueued_lt The logical time at which the message is enqueued.
|
||||
* @param src_addr 256-bit address of the sender
|
||||
* @param defer Put the message to DispatchQueue
|
||||
*
|
||||
* @returns True if the message was successfully enqueued, false otherwise.
|
||||
*/
|
||||
bool Collator::enqueue_message(block::NewOutMsg msg, td::RefInt256 fwd_fees_remaining, ton::LogicalTime enqueued_lt) {
|
||||
bool Collator::enqueue_message(block::NewOutMsg msg, td::RefInt256 fwd_fees_remaining, StdSmcAddress src_addr,
|
||||
bool defer) {
|
||||
LogicalTime enqueued_lt = msg.lt;
|
||||
CHECK(msg.msg_env_from_dispatch_queue.is_null());
|
||||
// 0. unpack src_addr and dest_addr
|
||||
block::gen::CommonMsgInfo::Record_int_msg_info info;
|
||||
if (!tlb::unpack_cell_inexact(msg.msg, info)) {
|
||||
|
@ -3623,18 +3959,24 @@ bool Collator::enqueue_message(block::NewOutMsg msg, td::RefInt256 fwd_fees_rema
|
|||
return fatal_error("cannot perform hypercube routing for a new outbound message");
|
||||
}
|
||||
// 2. create a new MsgEnvelope
|
||||
vm::CellBuilder cb;
|
||||
CHECK(cb.store_long_bool(4, 4) // msg_envelope#4 cur_addr:.. next_addr:..
|
||||
&& cb.store_long_bool(route_info.first, 8) // cur_addr:IntermediateAddress
|
||||
&& cb.store_long_bool(route_info.second, 8) // next_addr:IntermediateAddress
|
||||
&& block::tlb::t_Grams.store_integer_ref(cb, fwd_fees_remaining) // fwd_fee_remaining:t_Grams
|
||||
&& cb.store_ref_bool(msg.msg)); // msg:^(Message Any)
|
||||
Ref<vm::Cell> msg_env = cb.finalize();
|
||||
block::tlb::MsgEnvelope::Record_std msg_env_rec{
|
||||
defer ? 0 : route_info.first, defer ? 0 : route_info.second, fwd_fees_remaining, msg.msg, {}, msg.metadata};
|
||||
Ref<vm::Cell> msg_env;
|
||||
CHECK(block::tlb::pack_cell(msg_env, msg_env_rec));
|
||||
// 3. create a new OutMsg
|
||||
CHECK(cb.store_long_bool(1, 3) // msg_export_new$001
|
||||
&& cb.store_ref_bool(msg_env) // out_msg:^MsgEnvelope
|
||||
&& cb.store_ref_bool(msg.trans)); // transaction:^Transaction
|
||||
Ref<vm::Cell> out_msg = cb.finalize();
|
||||
vm::CellBuilder cb;
|
||||
Ref<vm::Cell> out_msg;
|
||||
if (defer) {
|
||||
CHECK(cb.store_long_bool(0b10100, 5) // msg_export_new_defer$10100
|
||||
&& cb.store_ref_bool(msg_env) // out_msg:^MsgEnvelope
|
||||
&& cb.store_ref_bool(msg.trans)); // transaction:^Transaction
|
||||
out_msg = cb.finalize();
|
||||
} else {
|
||||
CHECK(cb.store_long_bool(1, 3) // msg_export_new$001
|
||||
&& cb.store_ref_bool(msg_env) // out_msg:^MsgEnvelope
|
||||
&& cb.store_ref_bool(msg.trans)); // transaction:^Transaction
|
||||
out_msg = cb.finalize();
|
||||
}
|
||||
// 4. insert OutMsg into OutMsgDescr
|
||||
if (verbosity > 2) {
|
||||
std::cerr << "OutMsg for a newly-generated message: ";
|
||||
|
@ -3646,7 +3988,30 @@ bool Collator::enqueue_message(block::NewOutMsg msg, td::RefInt256 fwd_fees_rema
|
|||
// 5. create EnqueuedMsg
|
||||
CHECK(cb.store_long_bool(enqueued_lt) // _ enqueued_lt:uint64
|
||||
&& cb.store_ref_bool(msg_env)); // out_msg:^MsgEnvelope = EnqueuedMsg;
|
||||
// 6. insert EnqueuedMsg into OutMsgQueue
|
||||
|
||||
// 6. insert EnqueuedMsg into OutMsgQueue (or DispatchQueue)
|
||||
if (defer) {
|
||||
LOG(INFO) << "deferring new message from account " << workchain() << ":" << src_addr.to_hex() << ", lt=" << msg.lt;
|
||||
vm::Dictionary dispatch_dict{64};
|
||||
td::uint64 dispatch_dict_size;
|
||||
if (!block::unpack_account_dispatch_queue(dispatch_queue_->lookup(src_addr), dispatch_dict, dispatch_dict_size)) {
|
||||
return fatal_error(PSTRING() << "cannot unpack AccountDispatchQueue for account " << src_addr.to_hex());
|
||||
}
|
||||
td::BitArray<64> key;
|
||||
key.store_ulong(msg.lt);
|
||||
if (!dispatch_dict.set_builder(key, cb, vm::Dictionary::SetMode::Add)) {
|
||||
return fatal_error(PSTRING() << "cannot add message to AccountDispatchQueue for account " << src_addr.to_hex()
|
||||
<< ", lt=" << msg.lt);
|
||||
}
|
||||
++dispatch_dict_size;
|
||||
dispatch_queue_->set(src_addr, block::pack_account_dispatch_queue(dispatch_dict, dispatch_dict_size));
|
||||
++dispatch_queue_ops_;
|
||||
if (!(dispatch_queue_ops_ & 63)) {
|
||||
return block_limit_status_->add_proof(dispatch_queue_->get_root_cell());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
auto next_hop = block::interpolate_addr(src_prefix, dest_prefix, route_info.second);
|
||||
td::BitArray<32 + 64 + 256> key;
|
||||
key.bits().store_int(next_hop.workchain, 32);
|
||||
|
@ -3680,7 +4045,7 @@ bool Collator::process_new_messages(bool enqueue_only) {
|
|||
block::NewOutMsg msg = new_msgs.top();
|
||||
new_msgs.pop();
|
||||
block_limit_status_->extra_out_msgs--;
|
||||
if (block_full_ && !enqueue_only) {
|
||||
if ((block_full_ || have_unprocessed_account_dispatch_queue_) && !enqueue_only) {
|
||||
LOG(INFO) << "BLOCK FULL, enqueue all remaining new messages";
|
||||
enqueue_only = true;
|
||||
}
|
||||
|
@ -3713,11 +4078,17 @@ void Collator::register_new_msg(block::NewOutMsg new_msg) {
|
|||
* Registers new messages that were created in the transaction.
|
||||
*
|
||||
* @param trans The transaction containing the messages.
|
||||
* @param msg_metadata Metadata of the new messages.
|
||||
*/
|
||||
void Collator::register_new_msgs(block::transaction::Transaction& trans) {
|
||||
void Collator::register_new_msgs(block::transaction::Transaction& trans,
|
||||
td::optional<block::MsgMetadata> msg_metadata) {
|
||||
CHECK(trans.root.not_null());
|
||||
for (unsigned i = 0; i < trans.out_msgs.size(); i++) {
|
||||
register_new_msg(trans.extract_out_msg_ext(i));
|
||||
block::NewOutMsg msg = trans.extract_out_msg_ext(i);
|
||||
if (msg_metadata_enabled_) {
|
||||
msg.metadata = msg_metadata;
|
||||
}
|
||||
register_new_msg(std::move(msg));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4617,9 +4988,27 @@ bool Collator::compute_out_msg_queue_info(Ref<vm::Cell>& out_msg_queue_info) {
|
|||
rt->print_rec(std::cerr);
|
||||
}
|
||||
vm::CellBuilder cb;
|
||||
// out_msg_queue_extra#0 dispatch_queue:DispatchQueue out_queue_size:(Maybe uint48) = OutMsgQueueExtra;
|
||||
// ... extra:(Maybe OutMsgQueueExtra)
|
||||
if (!dispatch_queue_->is_empty() || store_out_msg_queue_size_) {
|
||||
if (!(cb.store_long_bool(1, 1) && cb.store_long_bool(0, 4) && dispatch_queue_->append_dict_to_bool(cb))) {
|
||||
return false;
|
||||
}
|
||||
if (!(cb.store_bool_bool(store_out_msg_queue_size_) &&
|
||||
(!store_out_msg_queue_size_ || cb.store_long_bool(out_msg_queue_size_, 48)))) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
if (!cb.store_long_bool(0, 1)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
vm::CellSlice maybe_extra = cb.as_cellslice();
|
||||
cb.reset();
|
||||
|
||||
return register_out_msg_queue_op(true) && out_msg_queue_->append_dict_to_bool(cb) // _ out_queue:OutMsgQueue
|
||||
&& processed_upto_->pack(cb) // proc_info:ProcessedInfo
|
||||
&& ihr_pending->append_dict_to_bool(cb) // ihr_pending:IhrPendingInfo
|
||||
&& cb.append_cellslice_bool(maybe_extra) // extra:(Maybe OutMsgQueueExtra)
|
||||
&& cb.finalize_to(out_msg_queue_info);
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue