mirror of
https://github.com/ton-blockchain/ton
synced 2025-02-13 11:42:18 +00:00
Rework limiting imported msg queues
This commit is contained in:
parent
e6b77ef71d
commit
869c6fe675
9 changed files with 30 additions and 51 deletions
|
@ -829,7 +829,6 @@ top_block_descr#d5 proof_for:BlockIdExt signatures:(Maybe ^BlockSignatures)
|
|||
// COLLATED DATA
|
||||
//
|
||||
top_block_descr_set#4ac789f3 collection:(HashmapE 96 ^TopBlockDescr) = TopBlockDescrSet;
|
||||
neighbor_msg_queue_limits#7e549333 neighbors:(HashmapE 96 int32) = NeighborMsgQueueLimits;
|
||||
|
||||
//
|
||||
// VALIDATOR MISBEHAVIOR COMPLAINTS
|
||||
|
|
|
@ -199,10 +199,6 @@ bool OutputQueueMerger::load() {
|
|||
unsigned long long lt = heap[0]->lt;
|
||||
std::size_t orig_size = msg_list.size();
|
||||
do {
|
||||
if (src_remaining_msgs_[heap[0]->source] == 0) {
|
||||
std::pop_heap(heap.begin(), heap.end(), MsgKeyValue::greater);
|
||||
continue;
|
||||
}
|
||||
while (heap[0]->is_fork()) {
|
||||
auto other = std::make_unique<MsgKeyValue>();
|
||||
if (!heap[0]->split(*other)) {
|
||||
|
@ -218,17 +214,17 @@ bool OutputQueueMerger::load() {
|
|||
heap.pop_back();
|
||||
} while (!heap.empty() && heap[0]->lt <= lt);
|
||||
std::sort(msg_list.begin() + orig_size, msg_list.end(), MsgKeyValue::less);
|
||||
size_t j = orig_size;
|
||||
for (size_t i = orig_size; i < msg_list.size(); ++i) {
|
||||
td::int32 &remaining = src_remaining_msgs_[msg_list[i]->source];
|
||||
if (remaining != 0) {
|
||||
if (remaining > 0) {
|
||||
if (remaining != -1) {
|
||||
if (remaining == 0) {
|
||||
limit_exceeded = true;
|
||||
} else {
|
||||
--remaining;
|
||||
}
|
||||
msg_list[j++] = std::move(msg_list[i]);
|
||||
}
|
||||
msg_list[i]->limit_exceeded = limit_exceeded;
|
||||
}
|
||||
msg_list.resize(j);
|
||||
return msg_list.size() > orig_size;
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ struct OutputQueueMerger {
|
|||
int source;
|
||||
int key_len{0};
|
||||
td::BitArray<max_key_len> key;
|
||||
bool limit_exceeded{false};
|
||||
MsgKeyValue() = default;
|
||||
MsgKeyValue(int src, Ref<vm::Cell> node);
|
||||
MsgKeyValue(td::ConstBitPtr key_pfx, int key_pfx_len, int src, Ref<vm::Cell> node);
|
||||
|
@ -82,6 +83,7 @@ struct OutputQueueMerger {
|
|||
std::vector<td::int32> src_remaining_msgs_;
|
||||
bool eof;
|
||||
bool failed;
|
||||
bool limit_exceeded{false};
|
||||
void add_root(int src, Ref<vm::Cell> outmsg_root, td::int32 msg_limit);
|
||||
bool load();
|
||||
};
|
||||
|
|
|
@ -134,7 +134,6 @@ class Collator final : public td::actor::Actor {
|
|||
std::unique_ptr<block::ShardConfig> shard_conf_;
|
||||
std::map<BlockSeqno, Ref<MasterchainStateQ>> aux_mc_states_;
|
||||
std::map<ShardIdFull, td::int32> neighbor_msg_queues_limits_;
|
||||
vm::Dictionary neighbor_msg_queues_limits_dict_{32 + 64};
|
||||
std::vector<block::McShardDescr> neighbors_;
|
||||
std::unique_ptr<block::OutputQueueMerger> nb_out_msgs_;
|
||||
std::vector<ton::StdSmcAddress> special_smcs;
|
||||
|
|
|
@ -701,11 +701,7 @@ void Collator::got_neighbor_msg_queue(unsigned i, td::Result<Ref<OutMsgQueueProo
|
|||
auto queue_root = qinfo.out_queue->prefetch_ref(0);
|
||||
descr.set_queue_root(queue_root);
|
||||
if (res->msg_count_ != -1) {
|
||||
LOG(DEBUG) << "Neighbor " << descr.shard().to_str() << " has msg_limit=" << res->msg_count_;
|
||||
td::BitArray<96> key;
|
||||
key.bits().store_int(block_id.id.workchain, 32);
|
||||
(key.bits() + 32).store_uint(block_id.id.shard, 64);
|
||||
neighbor_msg_queues_limits_dict_.set_builder(key, vm::CellBuilder().store_long(res->msg_count_, 32));
|
||||
LOG(INFO) << "neighbor " << descr.shard().to_str() << " has msg_limit=" << res->msg_count_;
|
||||
neighbor_msg_queues_limits_[block_id.shard_full()] = res->msg_count_;
|
||||
}
|
||||
// comment the next two lines in the future when the output queues become huge
|
||||
|
@ -2833,6 +2829,12 @@ bool Collator::process_inbound_internal_messages() {
|
|||
block_full_ = !block_limit_status_->fits(block::ParamLimits::cl_normal);
|
||||
auto kv = nb_out_msgs_->extract_cur();
|
||||
CHECK(kv && kv->msg.not_null());
|
||||
if (kv->limit_exceeded) {
|
||||
LOG(INFO) << "limit for imported messages is reached, stop processing inbound internal messages";
|
||||
block::EnqueuedMsgDescr enq;
|
||||
enq.unpack(kv->msg.write()); // Visit cells to include it in proof
|
||||
break;
|
||||
}
|
||||
if (!precheck_inbound_message(kv->msg, kv->lt)) {
|
||||
if (verbosity > 1) {
|
||||
std::cerr << "invalid inbound message: lt=" << kv->lt << " from=" << kv->source << " key=" << kv->key.to_hex()
|
||||
|
@ -4089,21 +4091,14 @@ bool Collator::create_collated_data() {
|
|||
}
|
||||
collated_roots_.push_back(std::move(cell));
|
||||
}
|
||||
// 2. Message count for neighbors' out queues
|
||||
if (!neighbor_msg_queues_limits_dict_.is_empty()) {
|
||||
vm::CellBuilder cb;
|
||||
cb.store_long(block::gen::t_NeighborMsgQueueLimits.cons_tag[0], 32);
|
||||
cb.store_maybe_ref(neighbor_msg_queues_limits_dict_.get_root_cell());
|
||||
collated_roots_.push_back(cb.finalize_novm());
|
||||
}
|
||||
if (!full_collated_data_) {
|
||||
return true;
|
||||
}
|
||||
// 3. Proofs for hashes of states: previous states + neighbors
|
||||
// 2. Proofs for hashes of states: previous states + neighbors
|
||||
for (const auto& p : block_state_proofs_) {
|
||||
collated_roots_.push_back(p.second);
|
||||
}
|
||||
// 4. Previous state proof (only shadchains)
|
||||
// 3. Previous state proof (only shadchains)
|
||||
std::map<td::Bits256, Ref<vm::Cell>> proofs;
|
||||
if (!is_masterchain()) {
|
||||
if (!prepare_msg_queue_proof()) {
|
||||
|
@ -4117,7 +4112,7 @@ bool Collator::create_collated_data() {
|
|||
}
|
||||
proofs[prev_state_root_->get_hash().bits()] = std::move(state_proof);
|
||||
}
|
||||
// 5. Proofs for message queues
|
||||
// 4. Proofs for message queues
|
||||
for (vm::MerkleProofBuilder &mpb : neighbor_proof_builders_) {
|
||||
auto r_proof = mpb.extract_proof();
|
||||
if (r_proof.is_error()) {
|
||||
|
|
|
@ -52,7 +52,7 @@ static td::Result<td::int32> process_queue(BlockIdExt block_id, ShardIdFull dst_
|
|||
|
||||
td::HashSet<vm::Cell::Hash> visited;
|
||||
std::function<void(const vm::CellSlice&)> dfs_cs;
|
||||
auto dfs = [&](Ref<vm::Cell> cell) {
|
||||
auto dfs = [&](const Ref<vm::Cell>& cell) {
|
||||
if (cell.is_null() || !visited.insert(cell->get_hash()).second) {
|
||||
return;
|
||||
}
|
||||
|
@ -65,6 +65,8 @@ static td::Result<td::int32> process_queue(BlockIdExt block_id, ShardIdFull dst_
|
|||
dfs(cs.prefetch_ref(i));
|
||||
}
|
||||
};
|
||||
TRY_STATUS_PREFIX(check_no_prunned(*qinfo.proc_info), "invalid proc_info proof: ")
|
||||
TRY_STATUS_PREFIX(check_no_prunned(*qinfo.ihr_pending), "invalid ihr_pending proof: ")
|
||||
dfs_cs(*qinfo.proc_info);
|
||||
dfs_cs(*qinfo.ihr_pending);
|
||||
|
||||
|
@ -76,6 +78,14 @@ static td::Result<td::int32> process_queue(BlockIdExt block_id, ShardIdFull dst_
|
|||
while (!queue_merger.is_eof()) {
|
||||
auto kv = queue_merger.extract_cur();
|
||||
queue_merger.next();
|
||||
block::EnqueuedMsgDescr enq;
|
||||
auto msg = kv->msg;
|
||||
if (!enq.unpack(msg.write())) {
|
||||
return td::Status::Error("cannot unpack EnqueuedMsgDescr");
|
||||
}
|
||||
if (limit_reached) {
|
||||
break;
|
||||
}
|
||||
++msg_count;
|
||||
|
||||
// TODO: Get processed_upto from destination shard (in request?)
|
||||
|
@ -105,7 +115,6 @@ static td::Result<td::int32> process_queue(BlockIdExt block_id, ShardIdFull dst_
|
|||
TRY_STATUS_PREFIX(check_no_prunned(*kv->msg), "invalid message proof: ")
|
||||
if (estimated_proof_size > OutMsgQueueProof::QUEUE_SIZE_THRESHOLD) {
|
||||
limit_reached = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return limit_reached ? msg_count : -1;
|
||||
|
|
|
@ -526,14 +526,6 @@ bool ValidateQuery::extract_collated_data_from(Ref<vm::Cell> croot, int idx) {
|
|||
top_shard_descr_dict_ = std::make_unique<vm::Dictionary>(cs.prefetch_ref(), 96);
|
||||
return true;
|
||||
}
|
||||
if (block::gen::t_NeighborMsgQueueLimits.has_valid_tag(cs)) {
|
||||
LOG(DEBUG) << "collated datum # " << idx << " is a NeighborMsgQueueLimits";
|
||||
if (!block::gen::t_NeighborMsgQueueLimits.validate_upto(10000, cs)) {
|
||||
return reject_query("invalid NeighborMsgQueueLimits");
|
||||
}
|
||||
neighbor_msg_queues_limits_ = vm::Dictionary{cs.prefetch_ref(0), 32 + 64};
|
||||
return true;
|
||||
}
|
||||
LOG(WARNING) << "collated datum # " << idx << " has unknown type (magic " << cs.prefetch_ulong(32) << "), ignoring";
|
||||
return true;
|
||||
}
|
||||
|
@ -4115,19 +4107,7 @@ bool ValidateQuery::check_in_queue() {
|
|||
td::BitArray<96> key;
|
||||
key.bits().store_int(descr.workchain(), 32);
|
||||
(key.bits() + 32).store_uint(descr.shard().shard, 64);
|
||||
auto r = neighbor_msg_queues_limits_.lookup(key);
|
||||
td::int32 msg_limit = r.is_null() ? -1 : (td::int32)r->prefetch_long(32);
|
||||
if (msg_limit < -1) {
|
||||
return reject_query("invalid value in NeighborMsgQueueLimits");
|
||||
}
|
||||
LOG(DEBUG) << "Neighbor " << descr.shard().to_str() << " has msg_limit=" << msg_limit;
|
||||
neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_, msg_limit);
|
||||
if (msg_limit != -1 && descr.shard().is_masterchain()) {
|
||||
return reject_query("masterchain out message queue cannot be limited");
|
||||
}
|
||||
if (msg_limit != -1 && shard_intersects(descr.shard(), shard_)) {
|
||||
return reject_query("prev block out message queue cannot be limited");
|
||||
}
|
||||
neighbor_queues.emplace_back(descr.top_block_id(), descr.outmsg_root, descr.disabled_);
|
||||
}
|
||||
block::OutputQueueMerger nb_out_msgs(shard_, std::move(neighbor_queues));
|
||||
while (!nb_out_msgs.is_eof()) {
|
||||
|
|
|
@ -206,7 +206,6 @@ class ValidateQuery : public td::actor::Actor {
|
|||
block::ActionPhaseConfig action_phase_cfg_;
|
||||
td::RefInt256 masterchain_create_fee_, basechain_create_fee_;
|
||||
|
||||
vm::Dictionary neighbor_msg_queues_limits_{32 + 64};
|
||||
std::vector<block::McShardDescr> neighbors_;
|
||||
std::map<BlockSeqno, Ref<MasterchainStateQ>> aux_mc_states_;
|
||||
|
||||
|
|
|
@ -417,7 +417,7 @@ void ValidatorGroup::send_collate_query(td::uint32 round_id, td::Timestamp timeo
|
|||
promise = td::PromiseCreator::lambda([=, SelfId = actor_id(this), promise = std::move(promise),
|
||||
timer = td::Timer()](td::Result<BlockCandidate> R) mutable {
|
||||
if (R.is_ok()) {
|
||||
LOG(WARNING) << "collate query for " << next_block_id.to_str() << ": success, time=" << timer.elapsed() << "s";
|
||||
LOG(INFO) << "collate query for " << next_block_id.to_str() << ": success, time=" << timer.elapsed() << "s";
|
||||
promise.set_result(R.move_as_ok());
|
||||
return;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue