1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Move msg queue limits to config

This commit is contained in:
SpyCheese 2023-07-24 15:29:55 +03:00
parent 66b98b6d6a
commit f1e62d0075
21 changed files with 109 additions and 48 deletions

View file

@ -655,6 +655,12 @@ bool EnqueuedMsgDescr::check_key(td::ConstBitPtr key) const {
hash_ == key + 96; hash_ == key + 96;
} }
bool ImportedMsgQueueLimits::deserialize(vm::CellSlice& cs) {
return cs.fetch_ulong(8) == 0xd3 // imported_msg_queue_limits#d3
&& cs.fetch_uint_to(32, max_bytes) // max_bytes:#
&& cs.fetch_uint_to(32, max_msgs); // max_msgs:#
}
bool ParamLimits::deserialize(vm::CellSlice& cs) { bool ParamLimits::deserialize(vm::CellSlice& cs) {
return cs.fetch_ulong(8) == 0xc3 // param_limits#c3 return cs.fetch_ulong(8) == 0xc3 // param_limits#c3
&& cs.fetch_uint_to(32, limits_[0]) // underload:uint32 && cs.fetch_uint_to(32, limits_[0]) // underload:uint32
@ -666,10 +672,16 @@ bool ParamLimits::deserialize(vm::CellSlice& cs) {
} }
bool BlockLimits::deserialize(vm::CellSlice& cs) { bool BlockLimits::deserialize(vm::CellSlice& cs) {
return cs.fetch_ulong(8) == 0x5d // block_limits#5d auto tag = cs.fetch_ulong(8);
&& bytes.deserialize(cs) // bytes:ParamLimits if (tag != 0x5d && tag != 0x5e) {
&& gas.deserialize(cs) // gas:ParamLimits return false;
&& lt_delta.deserialize(cs); // lt_delta:ParamLimits }
// block_limits#5d
// block_limits_v2#5e
return bytes.deserialize(cs) // bytes:ParamLimits
&& gas.deserialize(cs) // gas:ParamLimits
&& lt_delta.deserialize(cs) // lt_delta:ParamLimits
&& (tag == 0x5d || imported_msg_queue.deserialize(cs)); // imported_msg_queue:ImportedMsgQueueLimits
} }
int ParamLimits::classify(td::uint64 value) const { int ParamLimits::classify(td::uint64 value) const {

View file

@ -216,6 +216,13 @@ static inline std::ostream& operator<<(std::ostream& os, const MsgProcessedUptoC
return proc_coll.print(os); return proc_coll.print(os);
} }
struct ImportedMsgQueueLimits {
// Default values
td::uint32 max_bytes = 1 << 18;
td::uint32 max_msgs = 40;
bool deserialize(vm::CellSlice& cs);
};
struct ParamLimits { struct ParamLimits {
enum { limits_cnt = 4 }; enum { limits_cnt = 4 };
enum { cl_underload = 0, cl_normal = 1, cl_soft = 2, cl_medium = 3, cl_hard = 4 }; enum { cl_underload = 0, cl_normal = 1, cl_soft = 2, cl_medium = 3, cl_hard = 4 };
@ -247,6 +254,7 @@ struct ParamLimits {
struct BlockLimits { struct BlockLimits {
ParamLimits bytes, gas, lt_delta; ParamLimits bytes, gas, lt_delta;
ton::LogicalTime start_lt{0}; ton::LogicalTime start_lt{0};
ImportedMsgQueueLimits imported_msg_queue;
const vm::CellUsageTree* usage_tree{nullptr}; const vm::CellUsageTree* usage_tree{nullptr};
bool deserialize(vm::CellSlice& cs); bool deserialize(vm::CellSlice& cs);
int classify_size(td::uint64 size) const; int classify_size(td::uint64 size) const;

View file

@ -704,9 +704,13 @@ config_gas_prices#_ GasLimitsPrices = ConfigParam 21;
param_limits#c3 underload:# soft_limit:# { underload <= soft_limit } param_limits#c3 underload:# soft_limit:# { underload <= soft_limit }
hard_limit:# { soft_limit <= hard_limit } = ParamLimits; hard_limit:# { soft_limit <= hard_limit } = ParamLimits;
imported_msg_queue_limits#d3 max_bytes:# max_msgs:# = ImportedMsgQueueLimits;
block_limits#5d bytes:ParamLimits gas:ParamLimits lt_delta:ParamLimits block_limits#5d bytes:ParamLimits gas:ParamLimits lt_delta:ParamLimits
= BlockLimits; = BlockLimits;
block_limits_v2#5e bytes:ParamLimits gas:ParamLimits lt_delta:ParamLimits
imported_msg_queue:ImportedMsgQueueLimits
= BlockLimits;
config_mc_block_limits#_ BlockLimits = ConfigParam 22; config_mc_block_limits#_ BlockLimits = ConfigParam 22;
config_block_limits#_ BlockLimits = ConfigParam 23; config_block_limits#_ BlockLimits = ConfigParam 23;

View file

@ -415,6 +415,7 @@ tonNode.success = tonNode.Success;
tonNode.archiveNotFound = tonNode.ArchiveInfo; tonNode.archiveNotFound = tonNode.ArchiveInfo;
tonNode.archiveInfo id:long = tonNode.ArchiveInfo; tonNode.archiveInfo id:long = tonNode.ArchiveInfo;
tonNode.importedMsgQueueLimits max_bytes:int max_msgs:int = ImportedMsgQueueLimits;
tonNode.outMsgQueueProof queue_proof:bytes block_state_proof:bytes msg_count:int = tonNode.OutMsgQueueProof; tonNode.outMsgQueueProof queue_proof:bytes block_state_proof:bytes msg_count:int = tonNode.OutMsgQueueProof;
tonNode.outMsgQueueProofEmpty = tonNode.OutMsgQueueProof; tonNode.outMsgQueueProofEmpty = tonNode.OutMsgQueueProof;
@ -451,7 +452,8 @@ tonNode.downloadBlockProofLinks blocks:(vector tonNode.blockIdExt) = tonNode.Dat
tonNode.downloadKeyBlockProofLinks blocks:(vector tonNode.blockIdExt) = tonNode.DataList; tonNode.downloadKeyBlockProofLinks blocks:(vector tonNode.blockIdExt) = tonNode.DataList;
tonNode.getArchiveInfo masterchain_seqno:int = tonNode.ArchiveInfo; tonNode.getArchiveInfo masterchain_seqno:int = tonNode.ArchiveInfo;
tonNode.getArchiveSlice archive_id:long offset:long max_size:int = tonNode.Data; tonNode.getArchiveSlice archive_id:long offset:long max_size:int = tonNode.Data;
tonNode.getOutMsgQueueProof block_id:tonNode.blockIdExt dst_workchain:int dst_shard:long = tonNode.OutMsgQueueProof; tonNode.getOutMsgQueueProof block_id:tonNode.blockIdExt dst_workchain:int dst_shard:long
limits:tonNode.importedMsgQueueLimits = tonNode.OutMsgQueueProof;
tonNode.getCapabilities = tonNode.Capabilities; tonNode.getCapabilities = tonNode.Capabilities;
tonNode.getCapabilitiesV2 = tonNode.Capabilities; tonNode.getCapabilitiesV2 = tonNode.Capabilities;

Binary file not shown.

View file

@ -666,6 +666,7 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
td::Promise<td::BufferSlice> promise) { td::Promise<td::BufferSlice> promise) {
BlockIdExt block_id = create_block_id(query.block_id_); BlockIdExt block_id = create_block_id(query.block_id_);
ShardIdFull dst_shard(query.dst_workchain_, query.dst_shard_); ShardIdFull dst_shard(query.dst_workchain_, query.dst_shard_);
block::ImportedMsgQueueLimits limits{(td::uint32)query.limits_->max_bytes_, (td::uint32)query.limits_->max_msgs_};
if (!block_id.is_valid_ext()) { if (!block_id.is_valid_ext()) {
promise.set_error(td::Status::Error("invalid block_id")); promise.set_error(td::Status::Error("invalid block_id"));
return; return;
@ -674,6 +675,10 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
promise.set_error(td::Status::Error("invalid shard")); promise.set_error(td::Status::Error("invalid shard"));
return; return;
} }
if (limits.max_bytes > (1 << 21)) {
promise.set_error(td::Status::Error("max_bytes is too big"));
return;
}
auto P = td::PromiseCreator::lambda( auto P = td::PromiseCreator::lambda(
[promise = std::move(promise)](td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> R) mutable { [promise = std::move(promise)](td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> R) mutable {
if (R.is_error()) { if (R.is_error()) {
@ -684,7 +689,7 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
}); });
VLOG(FULL_NODE_DEBUG) << "Got query getOutMsgQueueProof " << block_id.to_str() << " " << dst_shard.to_str() VLOG(FULL_NODE_DEBUG) << "Got query getOutMsgQueueProof " << block_id.to_str() << " " << dst_shard.to_str()
<< " from " << src; << " from " << src;
td::actor::create_actor<BuildOutMsgQueueProof>("buildqueueproof", block_id, dst_shard, validator_manager_, td::actor::create_actor<BuildOutMsgQueueProof>("buildqueueproof", block_id, dst_shard, limits, validator_manager_,
std::move(P)) std::move(P))
.release(); .release();
} }
@ -924,7 +929,8 @@ void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, std::stri
.release(); .release();
} }
void FullNodeShardImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::Timestamp timeout, void FullNodeShardImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise) { td::Promise<td::Ref<OutMsgQueueProof>> promise) {
// TODO: maybe more complex download (like other requests here) // TODO: maybe more complex download (like other requests here)
auto &b = choose_neighbour(true); auto &b = choose_neighbour(true);
@ -944,13 +950,14 @@ void FullNodeShardImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardI
promise.set_error(td::Status::Error("node doesn't have this block")); promise.set_error(td::Status::Error("node doesn't have this block"));
}, },
[&](ton_api::tonNode_outMsgQueueProof &x) { [&](ton_api::tonNode_outMsgQueueProof &x) {
promise.set_result(OutMsgQueueProof::fetch(block_id, dst_shard, x)); promise.set_result(OutMsgQueueProof::fetch(block_id, dst_shard, limits, x));
})); }));
}); });
td::BufferSlice query = create_serialize_tl_object<ton_api::tonNode_getOutMsgQueueProof>( td::BufferSlice query = create_serialize_tl_object<ton_api::tonNode_getOutMsgQueueProof>(
create_tl_block_id(block_id), dst_shard.workchain, dst_shard.shard); create_tl_block_id(block_id), dst_shard.workchain, dst_shard.shard,
create_tl_object<ton_api::tonNode_importedMsgQueueLimits>(limits.max_bytes, limits.max_msgs));
td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, b.adnl_id, adnl_id_, overlay_id_, td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, b.adnl_id, adnl_id_, overlay_id_,
"get_msg_queue", std::move(P), timeout, std::move(query), 1 << 20, rldp_); "get_msg_queue", std::move(P), timeout, std::move(query), 1 << 22, rldp_);
} }
void FullNodeShardImpl::set_handle(BlockHandle handle, td::Promise<td::Unit> promise) { void FullNodeShardImpl::set_handle(BlockHandle handle, td::Promise<td::Unit> promise) {

View file

@ -70,7 +70,8 @@ class FullNodeShard : public td::actor::Actor {
td::Promise<std::vector<BlockIdExt>> promise) = 0; td::Promise<std::vector<BlockIdExt>> promise) = 0;
virtual void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, virtual void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout,
td::Promise<std::string> promise) = 0; td::Promise<std::string> promise) = 0;
virtual void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::Timestamp timeout, virtual void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise) = 0; td::Promise<td::Ref<OutMsgQueueProof>> promise) = 0;
virtual void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) = 0; virtual void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) = 0;

View file

@ -186,8 +186,8 @@ class FullNodeShardImpl : public FullNodeShard {
td::Promise<std::vector<BlockIdExt>> promise) override; td::Promise<std::vector<BlockIdExt>> promise) override;
void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout,
td::Promise<std::string> promise) override; td::Promise<std::string> promise) override;
void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::Timestamp timeout, void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits,
td::Promise<td::Ref<OutMsgQueueProof>> promise) override; td::Timestamp timeout, td::Promise<td::Ref<OutMsgQueueProof>> promise) override;
void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) override; void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) override;

View file

@ -378,7 +378,8 @@ void FullNodeImpl::download_archive(BlockSeqno masterchain_seqno, std::string tm
std::move(promise)); std::move(promise));
} }
void FullNodeImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::Timestamp timeout, void FullNodeImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise) { td::Promise<td::Ref<OutMsgQueueProof>> promise) {
auto shard = get_shard(block_id.shard_full()); auto shard = get_shard(block_id.shard_full());
if (shard.empty()) { if (shard.empty()) {
@ -386,7 +387,7 @@ void FullNodeImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull
promise.set_error(td::Status::Error(ErrorCode::notready, "shard not ready")); promise.set_error(td::Status::Error(ErrorCode::notready, "shard not ready"));
return; return;
} }
td::actor::send_closure(shard, &FullNodeShard::download_out_msg_queue_proof, block_id, dst_shard, timeout, td::actor::send_closure(shard, &FullNodeShard::download_out_msg_queue_proof, block_id, dst_shard, limits, timeout,
std::move(promise)); std::move(promise));
} }
@ -587,9 +588,9 @@ void FullNodeImpl::start_up() {
td::actor::send_closure(id_, &FullNodeImpl::download_archive, masterchain_seqno, std::move(tmp_dir), timeout, td::actor::send_closure(id_, &FullNodeImpl::download_archive, masterchain_seqno, std::move(tmp_dir), timeout,
std::move(promise)); std::move(promise));
} }
void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::Timestamp timeout, void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits,
td::Promise<td::Ref<OutMsgQueueProof>> promise) override { td::Timestamp timeout, td::Promise<td::Ref<OutMsgQueueProof>> promise) override {
td::actor::send_closure(id_, &FullNodeImpl::download_out_msg_queue_proof, block_id, dst_shard, timeout, td::actor::send_closure(id_, &FullNodeImpl::download_out_msg_queue_proof, block_id, dst_shard, limits, timeout,
std::move(promise)); std::move(promise));
} }

View file

@ -74,8 +74,8 @@ class FullNodeImpl : public FullNode {
void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise<std::vector<BlockIdExt>> promise); void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise<std::vector<BlockIdExt>> promise);
void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout,
td::Promise<std::string> promise); td::Promise<std::string> promise);
void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::Timestamp timeout, void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits,
td::Promise<td::Ref<OutMsgQueueProof>> promise); td::Timestamp timeout, td::Promise<td::Ref<OutMsgQueueProof>> promise);
void got_key_block_proof(td::Ref<ProofLink> proof); void got_key_block_proof(td::Ref<ProofLink> proof);
void got_zero_block_state(td::Ref<ShardState> state); void got_zero_block_state(td::Ref<ShardState> state);

View file

@ -47,6 +47,7 @@ static td::Status check_no_prunned(const vm::CellSlice& cs) {
} }
static td::Result<td::int32> process_queue(BlockIdExt block_id, ShardIdFull dst_shard, static td::Result<td::int32> process_queue(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits,
const block::gen::OutMsgQueueInfo::Record& qinfo) { const block::gen::OutMsgQueueInfo::Record& qinfo) {
td::uint64 estimated_proof_size = 0; td::uint64 estimated_proof_size = 0;
@ -113,17 +114,16 @@ static td::Result<td::int32> process_queue(BlockIdExt block_id, ShardIdFull dst_
dfs_cs(*kv->msg); dfs_cs(*kv->msg);
TRY_STATUS_PREFIX(check_no_prunned(*kv->msg), "invalid message proof: ") TRY_STATUS_PREFIX(check_no_prunned(*kv->msg), "invalid message proof: ")
if (estimated_proof_size > OutMsgQueueProof::QUEUE_SIZE_THRESHOLD) { if (estimated_proof_size >= limits.max_bytes || msg_count >= (long long)limits.max_msgs) {
limit_reached = true; limit_reached = true;
} }
} }
return limit_reached ? msg_count : -1; return limit_reached ? msg_count : -1;
} }
td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> OutMsgQueueProof::build(BlockIdExt block_id, td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> OutMsgQueueProof::build(
ShardIdFull dst_shard, BlockIdExt block_id, ShardIdFull dst_shard,
Ref<vm::Cell> state_root, block::ImportedMsgQueueLimits limits, Ref<vm::Cell> state_root, Ref<vm::Cell> block_root) {
Ref<vm::Cell> block_root) {
if (!dst_shard.is_valid_ext()) { if (!dst_shard.is_valid_ext()) {
return td::Status::Error("invalid shard"); return td::Status::Error("invalid shard");
} }
@ -135,7 +135,7 @@ td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> OutMsgQueueProof::b
if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) { if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
return td::Status::Error("invalid message queue"); return td::Status::Error("invalid message queue");
} }
TRY_RESULT(cnt, process_queue(block_id, dst_shard, qinfo)); TRY_RESULT(cnt, process_queue(block_id, dst_shard, limits, qinfo));
TRY_RESULT(queue_proof, mpb.extract_proof_boc()); TRY_RESULT(queue_proof, mpb.extract_proof_boc());
td::BufferSlice block_state_proof; td::BufferSlice block_state_proof;
@ -148,6 +148,7 @@ td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> OutMsgQueueProof::b
} }
td::Result<td::Ref<OutMsgQueueProof>> OutMsgQueueProof::fetch(BlockIdExt block_id, ShardIdFull dst_shard, td::Result<td::Ref<OutMsgQueueProof>> OutMsgQueueProof::fetch(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits,
const ton_api::tonNode_outMsgQueueProof& f) { const ton_api::tonNode_outMsgQueueProof& f) {
try { try {
Ref<vm::Cell> block_state_proof; Ref<vm::Cell> block_state_proof;
@ -181,7 +182,7 @@ td::Result<td::Ref<OutMsgQueueProof>> OutMsgQueueProof::fetch(BlockIdExt block_i
} }
TRY_STATUS_PREFIX(check_no_prunned(qinfo.proc_info->prefetch_ref(0)), "invalid proc_info: ") TRY_STATUS_PREFIX(check_no_prunned(qinfo.proc_info->prefetch_ref(0)), "invalid proc_info: ")
TRY_STATUS_PREFIX(check_no_prunned(qinfo.ihr_pending->prefetch_ref(0)), "invalid ihr_pending: ") TRY_STATUS_PREFIX(check_no_prunned(qinfo.ihr_pending->prefetch_ref(0)), "invalid ihr_pending: ")
TRY_RESULT(cnt, process_queue(block_id, dst_shard, qinfo)); TRY_RESULT(cnt, process_queue(block_id, dst_shard, limits, qinfo));
if (cnt != f.msg_count_) { if (cnt != f.msg_count_) {
return td::Status::Error(PSTRING() << "invalid msg_count: expected=" << f.msg_count_ << ", found=" << cnt); return td::Status::Error(PSTRING() << "invalid msg_count: expected=" << f.msg_count_ << ", found=" << cnt);
} }
@ -295,7 +296,7 @@ void WaitOutMsgQueueProof::run_net() {
}); });
td::actor::send_closure(manager_, &ValidatorManager::send_get_out_msg_queue_proof_request, block_id_, dst_shard_, td::actor::send_closure(manager_, &ValidatorManager::send_get_out_msg_queue_proof_request, block_id_, dst_shard_,
priority_, std::move(P)); limits_, priority_, std::move(P));
} }
void BuildOutMsgQueueProof::abort_query(td::Status reason) { void BuildOutMsgQueueProof::abort_query(td::Status reason) {
@ -349,7 +350,7 @@ void BuildOutMsgQueueProof::got_block_root(Ref<vm::Cell> root) {
} }
void BuildOutMsgQueueProof::build_proof() { void BuildOutMsgQueueProof::build_proof() {
auto result = OutMsgQueueProof::build(block_id_, dst_shard_, std::move(state_root_), std::move(block_root_)); auto result = OutMsgQueueProof::build(block_id_, dst_shard_, limits_, std::move(state_root_), std::move(block_root_));
if (result.is_error()) { if (result.is_error()) {
LOG(ERROR) << "Failed to build msg queue proof: " << result.error(); LOG(ERROR) << "Failed to build msg queue proof: " << result.error();
} }

View file

@ -31,11 +31,12 @@ class ValidatorManagerInterface;
class WaitOutMsgQueueProof : public td::actor::Actor { class WaitOutMsgQueueProof : public td::actor::Actor {
public: public:
WaitOutMsgQueueProof(BlockIdExt block_id, ShardIdFull dst_shard, bool local, td::uint32 priority, WaitOutMsgQueueProof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits, bool local,
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout, td::uint32 priority, td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout,
td::Promise<Ref<OutMsgQueueProof>> promise) td::Promise<Ref<OutMsgQueueProof>> promise)
: block_id_(std::move(block_id)) : block_id_(std::move(block_id))
, dst_shard_(dst_shard) , dst_shard_(dst_shard)
, limits_(limits)
, local_(local) , local_(local)
, priority_(priority) , priority_(priority)
, manager_(manager) , manager_(manager)
@ -65,6 +66,7 @@ class WaitOutMsgQueueProof : public td::actor::Actor {
private: private:
BlockIdExt block_id_; BlockIdExt block_id_;
ShardIdFull dst_shard_; ShardIdFull dst_shard_;
block::ImportedMsgQueueLimits limits_;
bool local_; bool local_;
td::uint32 priority_; td::uint32 priority_;
@ -78,10 +80,14 @@ class WaitOutMsgQueueProof : public td::actor::Actor {
class BuildOutMsgQueueProof : public td::actor::Actor { class BuildOutMsgQueueProof : public td::actor::Actor {
public: public:
BuildOutMsgQueueProof(BlockIdExt block_id, ShardIdFull dst_shard, BuildOutMsgQueueProof(BlockIdExt block_id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits,
td::actor::ActorId<ValidatorManagerInterface> manager, td::actor::ActorId<ValidatorManagerInterface> manager,
td::Promise<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> promise) td::Promise<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> promise)
: block_id_(std::move(block_id)), dst_shard_(dst_shard), manager_(manager), promise_(std::move(promise)) { : block_id_(std::move(block_id))
, dst_shard_(dst_shard)
, limits_(limits)
, manager_(manager)
, promise_(std::move(promise)) {
} }
void abort_query(td::Status reason); void abort_query(td::Status reason);
@ -93,6 +99,7 @@ class BuildOutMsgQueueProof : public td::actor::Actor {
private: private:
BlockIdExt block_id_; BlockIdExt block_id_;
ShardIdFull dst_shard_; ShardIdFull dst_shard_;
block::ImportedMsgQueueLimits limits_;
td::actor::ActorId<ValidatorManagerInterface> manager_; td::actor::ActorId<ValidatorManagerInterface> manager_;
td::Promise<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> promise_; td::Promise<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> promise_;

View file

@ -130,6 +130,13 @@ class MasterchainStateQ : public MasterchainState, public ShardStateQ {
auto R = config_->get_size_limits_config(); auto R = config_->get_size_limits_config();
return R.is_error() ? block::SizeLimitsConfig::ExtMsgLimits() : R.ok_ref().ext_msg_limits; return R.is_error() ? block::SizeLimitsConfig::ExtMsgLimits() : R.ok_ref().ext_msg_limits;
} }
block::ImportedMsgQueueLimits get_imported_msg_queue_limits(bool is_masterchain) const override {
auto R = config_->get_block_limits(is_masterchain);
if (R.is_ok() && R.ok()) {
return R.ok()->imported_msg_queue;
}
return {};
}
BlockIdExt last_key_block_id() const override; BlockIdExt last_key_block_id() const override;
BlockIdExt next_key_block_id(BlockSeqno seqno) const override; BlockIdExt next_key_block_id(BlockSeqno seqno) const override;
BlockIdExt prev_key_block_id(BlockSeqno seqno) const override; BlockIdExt prev_key_block_id(BlockSeqno seqno) const override;

View file

@ -35,12 +35,13 @@ struct OutMsgQueueProof : public td::CntObject {
td::int32 msg_count_; // -1 - up to end of queue td::int32 msg_count_; // -1 - up to end of queue
static td::Result<td::Ref<OutMsgQueueProof>> fetch(BlockIdExt block_id, ShardIdFull dst_shard, static td::Result<td::Ref<OutMsgQueueProof>> fetch(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits,
const ton_api::tonNode_outMsgQueueProof &f); const ton_api::tonNode_outMsgQueueProof &f);
static td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> build(BlockIdExt block_id, ShardIdFull dst_shard, static td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> build(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits,
Ref<vm::Cell> state_root, Ref<vm::Cell> state_root,
Ref<vm::Cell> block_root); Ref<vm::Cell> block_root);
static const td::uint64 QUEUE_SIZE_THRESHOLD = 128 * 1024;
}; };
} // namespace validator } // namespace validator

View file

@ -88,6 +88,7 @@ class MasterchainState : virtual public ShardState {
return td::Status::OK(); return td::Status::OK();
} }
virtual block::SizeLimitsConfig::ExtMsgLimits get_ext_msg_limits() const = 0; virtual block::SizeLimitsConfig::ExtMsgLimits get_ext_msg_limits() const = 0;
virtual block::ImportedMsgQueueLimits get_imported_msg_queue_limits(bool is_masterchain) const = 0;
}; };
} // namespace validator } // namespace validator

View file

@ -129,7 +129,8 @@ class ValidatorManager : public ValidatorManagerInterface {
virtual void send_ihr_message(td::Ref<IhrMessage> message) = 0; virtual void send_ihr_message(td::Ref<IhrMessage> message) = 0;
virtual void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) = 0; virtual void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) = 0;
virtual void send_block_broadcast(BlockBroadcast broadcast) = 0; virtual void send_block_broadcast(BlockBroadcast broadcast) = 0;
virtual void send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard, td::uint32 priority, virtual void send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits, td::uint32 priority,
td::Promise<td::Ref<OutMsgQueueProof>> promise) = 0; td::Promise<td::Ref<OutMsgQueueProof>> promise) = 0;
virtual void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) = 0; virtual void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) = 0;

View file

@ -255,7 +255,8 @@ class ValidatorManagerImpl : public ValidatorManager {
void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override; void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override;
void send_block_broadcast(BlockBroadcast broadcast) override { void send_block_broadcast(BlockBroadcast broadcast) override {
} }
void send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard, td::uint32 priority, void send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits,
td::uint32 priority,
td::Promise<td::Ref<OutMsgQueueProof>> promise) override { td::Promise<td::Ref<OutMsgQueueProof>> promise) override {
UNREACHABLE(); UNREACHABLE();
} }

View file

@ -321,7 +321,8 @@ class ValidatorManagerImpl : public ValidatorManager {
} }
void send_block_broadcast(BlockBroadcast broadcast) override { void send_block_broadcast(BlockBroadcast broadcast) override {
} }
void send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard, td::uint32 priority, void send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits,
td::uint32 priority,
td::Promise<td::Ref<OutMsgQueueProof>> promise) override { td::Promise<td::Ref<OutMsgQueueProof>> promise) override {
UNREACHABLE(); UNREACHABLE();
} }

View file

@ -630,16 +630,18 @@ void ValidatorManagerImpl::wait_out_msg_queue_proof(BlockIdExt block_id, ShardId
std::move(R)); std::move(R));
}); });
auto id = td::actor::create_actor<WaitOutMsgQueueProof>( auto id = td::actor::create_actor<WaitOutMsgQueueProof>(
"waitmsgqueue", block_id, dst_shard, need_monitor(block_id.shard_full()), priority, actor_id(this), "waitmsgqueue", block_id, dst_shard,
td::Timestamp::at(timeout.at() + 10.0), std::move(P)) last_masterchain_state_->get_imported_msg_queue_limits(block_id.is_masterchain()),
need_monitor(block_id.shard_full()), priority, actor_id(this), td::Timestamp::at(timeout.at() + 10.0),
std::move(P))
.release(); .release();
wait_out_msg_queue_proof_[key].actor_ = id; wait_out_msg_queue_proof_[key].actor_ = id;
it = wait_out_msg_queue_proof_.find(key); it = wait_out_msg_queue_proof_.find(key);
} else if (it->second.done_) { } else if (it->second.done_) {
promise.set_result(it->second.result_); promise.set_result(it->second.result_);
it->second.remove_at_ = td::Timestamp::in(30.0); it->second.remove_at_ = td::Timestamp::in(30.0);
return;
} }
it->second.waiting_.emplace_back(timeout, priority, std::move(promise)); it->second.waiting_.emplace_back(timeout, priority, std::move(promise));
auto X = it->second.get_timeout(); auto X = it->second.get_timeout();
td::actor::send_closure(it->second.actor_, &WaitOutMsgQueueProof::update_timeout, X.first, X.second); td::actor::send_closure(it->second.actor_, &WaitOutMsgQueueProof::update_timeout, X.first, X.second);
@ -1097,9 +1099,10 @@ void ValidatorManagerImpl::finished_wait_msg_queue(BlockIdExt block_id, ShardIdF
td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_msg_queue, block_id, dst_shard, td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_msg_queue, block_id, dst_shard,
std::move(R)); std::move(R));
}); });
auto id = td::actor::create_actor<WaitOutMsgQueueProof>("waitmsgqueue", block_id, dst_shard, auto id = td::actor::create_actor<WaitOutMsgQueueProof>(
need_monitor(block_id.shard_full()), X.second, "waitmsgqueue", block_id, dst_shard,
actor_id(this), X.first, std::move(P)) last_masterchain_state_->get_imported_msg_queue_limits(block_id.is_masterchain()),
need_monitor(block_id.shard_full()), X.second, actor_id(this), X.first, std::move(P))
.release(); .release();
it->second.actor_ = id; it->second.actor_ = id;
return; return;
@ -1511,9 +1514,10 @@ void ValidatorManagerImpl::send_block_broadcast(BlockBroadcast broadcast) {
} }
void ValidatorManagerImpl::send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard, void ValidatorManagerImpl::send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits,
td::uint32 priority, td::uint32 priority,
td::Promise<td::Ref<OutMsgQueueProof>> promise) { td::Promise<td::Ref<OutMsgQueueProof>> promise) {
callback_->download_out_msg_queue_proof(id, dst_shard, td::Timestamp::in(10.0), std::move(promise)); callback_->download_out_msg_queue_proof(id, dst_shard, limits, td::Timestamp::in(10.0), std::move(promise));
} }
void ValidatorManagerImpl::start_up() { void ValidatorManagerImpl::start_up() {
@ -1839,7 +1843,7 @@ void ValidatorManagerImpl::new_masterchain_block() {
// Prepare neighboours' queues for collating masterchain // Prepare neighboours' queues for collating masterchain
for (const auto &desc : last_masterchain_state_->get_shards()) { for (const auto &desc : last_masterchain_state_->get_shards()) {
wait_out_msg_queue_proof(desc->top_block_id(), ShardIdFull(masterchainId), 0, td::Timestamp::in(10.0), wait_out_msg_queue_proof(desc->top_block_id(), ShardIdFull(masterchainId), 0, td::Timestamp::in(10.0),
[](td::Ref<OutMsgQueueProof>) {}); [](td::Result<td::Ref<OutMsgQueueProof>>) {});
} }
} }

View file

@ -462,7 +462,8 @@ class ValidatorManagerImpl : public ValidatorManager {
void send_ihr_message(td::Ref<IhrMessage> message) override; void send_ihr_message(td::Ref<IhrMessage> message) override;
void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override; void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override;
void send_block_broadcast(BlockBroadcast broadcast) override; void send_block_broadcast(BlockBroadcast broadcast) override;
void send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard, td::uint32 priority, void send_get_out_msg_queue_proof_request(BlockIdExt id, ShardIdFull dst_shard, block::ImportedMsgQueueLimits limits,
td::uint32 priority,
td::Promise<td::Ref<OutMsgQueueProof>> promise) override; td::Promise<td::Ref<OutMsgQueueProof>> promise) override;
void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) override; void update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) override;

View file

@ -140,7 +140,8 @@ class ValidatorManagerInterface : public td::actor::Actor {
td::Promise<std::vector<BlockIdExt>> promise) = 0; td::Promise<std::vector<BlockIdExt>> promise) = 0;
virtual void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, virtual void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout,
td::Promise<std::string> promise) = 0; td::Promise<std::string> promise) = 0;
virtual void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::Timestamp timeout, virtual void download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<td::Ref<OutMsgQueueProof>> promise) = 0; td::Promise<td::Ref<OutMsgQueueProof>> promise) = 0;
virtual void new_key_block(BlockHandle handle) = 0; virtual void new_key_block(BlockHandle handle) = 0;