From 7bc50e63d723fc5af4395413b84679f94d3e03cf Mon Sep 17 00:00:00 2001 From: SpyCheese Date: Wed, 4 Dec 2024 14:38:57 +0300 Subject: [PATCH] tonNode.getOutMsgQueueProof query in public shard overlays (#1413) * tonNode.getOutMsgQueueProof query in public shard overlays * Allow responding to getOutMsgQueueProof requests one at a time only --- create-hardfork/create-hardfork.cpp | 4 + crypto/block/block.cpp | 6 + crypto/block/block.h | 10 + tdutils/td/utils/StringBuilder.h | 15 ++ tdutils/td/utils/logging.h | 10 +- test/test-ton-collator.cpp | 4 + tl/generate/scheme/ton_api.tl | 6 + tl/generate/scheme/ton_api.tlo | Bin 101764 -> 102484 bytes validator/CMakeLists.txt | 1 + validator/full-node-shard.cpp | 98 +++++++ validator/full-node-shard.h | 3 + validator/full-node-shard.hpp | 7 +- validator/full-node.cpp | 30 +++ validator/full-node.h | 1 + validator/full-node.hpp | 8 + validator/impl/CMakeLists.txt | 2 + validator/impl/out-msg-queue-proof.cpp | 294 +++++++++++++++++++++ validator/impl/out-msg-queue-proof.hpp | 64 +++++ validator/impl/proof.cpp | 35 +++ validator/interfaces/out-msg-queue-proof.h | 57 ++++ validator/interfaces/proof.h | 3 + validator/interfaces/validator-manager.h | 3 + validator/manager-disk.hpp | 7 +- validator/manager-hardfork.hpp | 7 +- validator/manager.cpp | 7 + validator/manager.hpp | 7 +- validator/net/download-block-new.cpp | 4 +- validator/net/download-block-new.hpp | 4 +- validator/net/download-block.cpp | 4 +- validator/net/download-block.hpp | 4 +- validator/net/download-proof.cpp | 4 +- validator/net/download-proof.hpp | 4 +- validator/net/get-next-key-blocks.cpp | 4 +- validator/net/get-next-key-blocks.hpp | 4 +- validator/token-manager.cpp | 34 +-- validator/token-manager.h | 15 +- validator/validator.h | 10 +- 37 files changed, 729 insertions(+), 51 deletions(-) create mode 100644 validator/impl/out-msg-queue-proof.cpp create mode 100644 validator/impl/out-msg-queue-proof.hpp create mode 100644 validator/interfaces/out-msg-queue-proof.h diff --git a/create-hardfork/create-hardfork.cpp b/create-hardfork/create-hardfork.cpp index 31a60b56..72bffae5 100644 --- a/create-hardfork/create-hardfork.cpp +++ b/create-hardfork/create-hardfork.cpp @@ -272,6 +272,10 @@ class HardforkCreator : public td::actor::Actor { void download_archive(ton::BlockSeqno masterchain_seqno, ton::ShardIdFull shard_prefix, std::string tmp_dir, td::Timestamp timeout, td::Promise promise) override { } + void download_out_msg_queue_proof( + ton::ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, + td::Timestamp timeout, td::Promise>> promise) override { + } void new_key_block(ton::validator::BlockHandle handle) override { } diff --git a/crypto/block/block.cpp b/crypto/block/block.cpp index 6b9ca8bf..98546a2d 100644 --- a/crypto/block/block.cpp +++ b/crypto/block/block.cpp @@ -660,6 +660,12 @@ bool EnqueuedMsgDescr::check_key(td::ConstBitPtr key) const { hash_ == key + 96; } +bool ImportedMsgQueueLimits::deserialize(vm::CellSlice& cs) { + return cs.fetch_ulong(8) == 0xd3 // imported_msg_queue_limits#d3 + && cs.fetch_uint_to(32, max_bytes) // max_bytes:# + && cs.fetch_uint_to(32, max_msgs); // max_msgs:# +} + bool ParamLimits::deserialize(vm::CellSlice& cs) { return cs.fetch_ulong(8) == 0xc3 // param_limits#c3 && cs.fetch_uint_to(32, limits_[0]) // underload:uint32 diff --git a/crypto/block/block.h b/crypto/block/block.h index 0247d79c..56e6dd38 100644 --- a/crypto/block/block.h +++ b/crypto/block/block.h @@ -216,6 +216,16 @@ static inline std::ostream& operator<<(std::ostream& os, const MsgProcessedUptoC return proc_coll.print(os); } +struct ImportedMsgQueueLimits { + // Default values + td::uint32 max_bytes = 1 << 16; + td::uint32 max_msgs = 30; + bool deserialize(vm::CellSlice& cs); + ImportedMsgQueueLimits operator*(td::uint32 x) const { + return {max_bytes * x, max_msgs * x}; + } +}; + struct ParamLimits { enum { limits_cnt = 4 }; enum { cl_underload = 0, cl_normal = 1, cl_soft = 2, cl_medium = 3, cl_hard = 4 }; diff --git a/tdutils/td/utils/StringBuilder.h b/tdutils/td/utils/StringBuilder.h index 99e9d517..685416fe 100644 --- a/tdutils/td/utils/StringBuilder.h +++ b/tdutils/td/utils/StringBuilder.h @@ -149,4 +149,19 @@ std::enable_if_t::value, string> to_string(const T &x) { return sb.as_cslice().str(); } +template +struct LambdaPrintHelper { + SB& sb; +}; +template +SB& operator<<(const LambdaPrintHelper& helper, F&& f) { + f(helper.sb); + return helper.sb; +} +struct LambdaPrint {}; + +inline LambdaPrintHelper operator<<(td::StringBuilder& sb, const LambdaPrint&) { + return LambdaPrintHelper{sb}; +} + } // namespace td diff --git a/tdutils/td/utils/logging.h b/tdutils/td/utils/logging.h index d00fba15..5c9a0621 100644 --- a/tdutils/td/utils/logging.h +++ b/tdutils/td/utils/logging.h @@ -74,6 +74,7 @@ #define LOG(level) LOG_IMPL(level, level, true, ::td::Slice()) #define LOG_IF(level, condition) LOG_IMPL(level, level, condition, #condition) +#define FLOG(level) LOG_IMPL(level, level, true, ::td::Slice()) << td::LambdaPrint{} << [&](auto &sb) #define VLOG(level) LOG_IMPL(DEBUG, level, true, TD_DEFINE_STR(level)) #define VLOG_IF(level, condition) LOG_IMPL(DEBUG, level, condition, TD_DEFINE_STR(level) " " #condition) @@ -95,13 +96,13 @@ inline bool no_return_func() { #define DUMMY_LOG_CHECK(condition) LOG_IF(NEVER, !(condition)) #ifdef TD_DEBUG - #if TD_MSVC +#if TD_MSVC #define LOG_CHECK(condition) \ __analysis_assume(!!(condition)); \ LOG_IMPL(FATAL, FATAL, !(condition), #condition) - #else +#else #define LOG_CHECK(condition) LOG_IMPL(FATAL, FATAL, !(condition) && no_return_func(), #condition) - #endif +#endif #else #define LOG_CHECK DUMMY_LOG_CHECK #endif @@ -263,6 +264,9 @@ class Logger { sb_ << other; return *this; } + LambdaPrintHelper operator<<(const LambdaPrint &) { + return LambdaPrintHelper{*this}; + } MutableCSlice as_cslice() { return sb_.as_cslice(); diff --git a/test/test-ton-collator.cpp b/test/test-ton-collator.cpp index 11824851..0fde1e68 100644 --- a/test/test-ton-collator.cpp +++ b/test/test-ton-collator.cpp @@ -373,6 +373,10 @@ class TestNode : public td::actor::Actor { void download_archive(ton::BlockSeqno masterchain_seqno, ton::ShardIdFull shard_prefix, std::string tmp_dir, td::Timestamp timeout, td::Promise promise) override { } + void download_out_msg_queue_proof( + ton::ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, + td::Timestamp timeout, td::Promise>> promise) override { + } void new_key_block(ton::validator::BlockHandle handle) override { } diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index 940dbe0a..cfc9f3a1 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -454,6 +454,10 @@ tonNode.success = tonNode.Success; tonNode.archiveNotFound = tonNode.ArchiveInfo; tonNode.archiveInfo id:long = tonNode.ArchiveInfo; +tonNode.importedMsgQueueLimits max_bytes:int max_msgs:int = ImportedMsgQueueLimits; +tonNode.outMsgQueueProof queue_proofs:bytes block_state_proofs:bytes msg_counts:(vector int) = tonNode.OutMsgQueueProof; +tonNode.outMsgQueueProofEmpty = tonNode.OutMsgQueueProof; + tonNode.forgetPeer = tonNode.ForgetPeer; ---functions--- @@ -483,6 +487,8 @@ tonNode.downloadKeyBlockProofLink block:tonNode.blockIdExt = tonNode.Data; tonNode.getArchiveInfo masterchain_seqno:int = tonNode.ArchiveInfo; tonNode.getShardArchiveInfo masterchain_seqno:int shard_prefix:tonNode.shardId = tonNode.ArchiveInfo; tonNode.getArchiveSlice archive_id:long offset:long max_size:int = tonNode.Data; +tonNode.getOutMsgQueueProof dst_shard:tonNode.shardId blocks:(vector tonNode.blockIdExt) + limits:tonNode.importedMsgQueueLimits = tonNode.OutMsgQueueProof; tonNode.getCapabilities = tonNode.Capabilities; diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo index f150221b4de8e0ff867bec49634aeb50fa65a2d4..96ecb7751b1ad3a31830cba56037d0037988cde3 100644 GIT binary patch delta 498 zcmZpf&30u08}Fmp`c@23pf-{Bu{8JNv){!$a|`l|N>WpNi_-&3Q%h5QGIKLaiYKm@ z-1x!ANY+4f#vh52{5-$>lvF+c(h``efTH~Tw8??S(wl9JMc5fPZ4R{h&TRoQP!6U# z6RW`tAT>9S3HdMqT_XMLz9wgGV#V|aa*PU-4KA=U=N6|=HgHs60jb(-<0f#)_L4MH z4BQ3zXf6Pn0dgV8JZ7kQJcS@%#20`ZSUh=RzqF`OQciwyc6@P3VhK!~fnl;@xHvZ# z(7O2K{L;LV;>r2-hCU#(paH~iRgNp>5#Ou@vM~Rk+wGcLP*RDoAEd9ynN5eow(TIu zob3)0j0t*-o!b|>F+R38RgCm$k%k$So?3z)mgwH#OerpjpKcq-C_Q~j7o&v+TMjs0 Wz^(&X0SX{Ss9RC2t!QK9VFv)pf3af# delta 64 zcmcbzfURXV8}Fmp`c@23pfZv7@n#i9AEV7Hj78WP*Kgiy^__dOi#yAu?FmwhA$p7* Q+qZf!KDJ*T(#gmU0F}}eM*si- diff --git a/validator/CMakeLists.txt b/validator/CMakeLists.txt index b7f3787c..5f5544b2 100644 --- a/validator/CMakeLists.txt +++ b/validator/CMakeLists.txt @@ -46,6 +46,7 @@ set(VALIDATOR_HEADERS interfaces/db.h interfaces/external-message.h interfaces/liteserver.h + interfaces/out-msg-queue-proof.h interfaces/proof.h interfaces/shard.h interfaces/signature-set.h diff --git a/validator/full-node-shard.cpp b/validator/full-node-shard.cpp index 9c1ba3c3..7d33a195 100644 --- a/validator/full-node-shard.cpp +++ b/validator/full-node-shard.cpp @@ -37,6 +37,7 @@ #include "net/download-proof.hpp" #include "net/get-next-key-blocks.hpp" #include "net/download-archive-slice.hpp" +#include "impl/out-msg-queue-proof.hpp" #include "td/utils/Random.h" @@ -669,6 +670,62 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod query.offset_, query.max_size_, std::move(promise)); } +void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query, + td::Promise promise) { + std::vector blocks; + for (const auto &x : query.blocks_) { + BlockIdExt id = create_block_id(x); + if (!id.is_valid_ext()) { + promise.set_error(td::Status::Error("invalid block_id")); + return; + } + if (!shard_is_ancestor(shard_, id.shard_full())) { + promise.set_error(td::Status::Error("query in wrong overlay")); + return; + } + blocks.push_back(create_block_id(x)); + } + ShardIdFull dst_shard = create_shard_id(query.dst_shard_); + if (!dst_shard.is_valid_ext()) { + promise.set_error(td::Status::Error("invalid shard")); + return; + } + block::ImportedMsgQueueLimits limits{(td::uint32)query.limits_->max_bytes_, (td::uint32)query.limits_->max_msgs_}; + if (limits.max_msgs > 512) { + promise.set_error(td::Status::Error("max_msgs is too big")); + return; + } + if (limits.max_bytes > (1 << 21)) { + promise.set_error(td::Status::Error("max_bytes is too big")); + return; + } + FLOG(DEBUG) { + sb << "Got query getOutMsgQueueProof to shard " << dst_shard.to_str() << " from blocks"; + for (const BlockIdExt &id : blocks) { + sb << " " << id.id.to_str(); + } + sb << " from " << src; + }; + td::actor::send_closure( + full_node_, &FullNode::get_out_msg_queue_query_token, + [=, manager = validator_manager_, blocks = std::move(blocks), + promise = std::move(promise)](td::Result> R) mutable { + TRY_RESULT_PROMISE(promise, token, std::move(R)); + auto P = + td::PromiseCreator::lambda([promise = std::move(promise), token = std::move(token)]( + td::Result> R) mutable { + if (R.is_error()) { + promise.set_result(create_serialize_tl_object()); + } else { + promise.set_result(serialize_tl_object(R.move_as_ok(), true)); + } + }); + td::actor::create_actor("buildqueueproof", dst_shard, std::move(blocks), limits, manager, + std::move(P)) + .release(); + }); +} + void FullNodeShardImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query, td::Promise promise) { if (!active_) { @@ -944,6 +1001,47 @@ void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, ShardIdFu .release(); } +void FullNodeShardImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, td::Timestamp timeout, + td::Promise>> promise) { + // TODO: maybe more complex download (like other requests here) + auto &b = choose_neighbour(); + if (b.adnl_id == adnl::AdnlNodeIdShort::zero()) { + promise.set_error(td::Status::Error(ErrorCode::notready, "no nodes")); + return; + } + std::vector> blocks_tl; + for (const BlockIdExt &id : blocks) { + blocks_tl.push_back(create_tl_block_id(id)); + } + td::BufferSlice query = create_serialize_tl_object( + create_tl_shard_id(dst_shard), std::move(blocks_tl), + create_tl_object(limits.max_bytes, limits.max_msgs)); + + auto P = td::PromiseCreator::lambda( + [=, promise = std::move(promise), blocks = std::move(blocks)](td::Result R) mutable { + if (R.is_error()) { + promise.set_result(R.move_as_error()); + return; + } + TRY_RESULT_PROMISE(promise, f, fetch_tl_object(R.move_as_ok(), true)); + ton_api::downcast_call( + *f, td::overloaded( + [&](ton_api::tonNode_outMsgQueueProofEmpty &x) { + promise.set_error(td::Status::Error("node doesn't have this block")); + }, + [&](ton_api::tonNode_outMsgQueueProof &x) { + delay_action( + [=, promise = std::move(promise), blocks = std::move(blocks), x = std::move(x)]() mutable { + promise.set_result(OutMsgQueueProof::fetch(dst_shard, blocks, limits, x)); + }, + td::Timestamp::now()); + })); + }); + td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, b.adnl_id, adnl_id_, overlay_id_, + "get_msg_queue", std::move(P), timeout, std::move(query), 1 << 22, rldp_); +} + void FullNodeShardImpl::set_handle(BlockHandle handle, td::Promise promise) { CHECK(!handle_); handle_ = std::move(handle); diff --git a/validator/full-node-shard.h b/validator/full-node-shard.h index f8f500f3..16945325 100644 --- a/validator/full-node-shard.h +++ b/validator/full-node-shard.h @@ -66,6 +66,9 @@ class FullNodeShard : public td::actor::Actor { td::Promise> promise) = 0; virtual void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, td::Timestamp timeout, td::Promise promise) = 0; + virtual void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, td::Timestamp timeout, + td::Promise>> promise) = 0; virtual void set_handle(BlockHandle handle, td::Promise promise) = 0; diff --git a/validator/full-node-shard.hpp b/validator/full-node-shard.hpp index 0ae26a7c..86748134 100644 --- a/validator/full-node-shard.hpp +++ b/validator/full-node-shard.hpp @@ -139,8 +139,8 @@ class FullNodeShardImpl : public FullNodeShard { td::Promise promise); void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getArchiveSlice &query, td::Promise promise); - // void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_prepareNextKeyBlockProof &query, - // td::Promise promise); + void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query, + td::Promise promise); void receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query, td::Promise promise); void receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data); @@ -183,6 +183,9 @@ class FullNodeShardImpl : public FullNodeShard { td::Promise> promise) override; void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, td::Timestamp timeout, td::Promise promise) override; + void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, td::Timestamp timeout, + td::Promise>> promise) override; void set_handle(BlockHandle handle, td::Promise promise) override; diff --git a/validator/full-node.cpp b/validator/full-node.cpp index c1173e44..658cb34e 100644 --- a/validator/full-node.cpp +++ b/validator/full-node.cpp @@ -21,6 +21,7 @@ #include "td/actor/MultiPromise.h" #include "full-node.h" #include "common/delay.h" +#include "impl/out-msg-queue-proof.hpp" #include "td/utils/Random.h" #include "ton/ton-tl.hpp" @@ -430,6 +431,24 @@ void FullNodeImpl::download_archive(BlockSeqno masterchain_seqno, ShardIdFull sh timeout, std::move(promise)); } +void FullNodeImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, td::Timestamp timeout, + td::Promise>> promise) { + if (blocks.empty()) { + promise.set_value({}); + return; + } + // All blocks are expected to have the same minsplit shard prefix + auto shard = get_shard(blocks[0].shard_full()); + if (shard.empty()) { + VLOG(FULL_NODE_WARNING) << "dropping download msg queue query to unknown shard"; + promise.set_error(td::Status::Error(ErrorCode::notready, "shard not ready")); + return; + } + td::actor::send_closure(shard, &FullNodeShard::download_out_msg_queue_proof, dst_shard, std::move(blocks), limits, + timeout, std::move(promise)); +} + td::actor::ActorId FullNodeImpl::get_shard(ShardIdFull shard) { if (shard.is_masterchain()) { return shards_[ShardIdFull{masterchainId}].actor.get(); @@ -557,6 +576,11 @@ void FullNodeImpl::process_block_candidate_broadcast(BlockIdExt block_id, Catcha std::move(data)); } +void FullNodeImpl::get_out_msg_queue_query_token(td::Promise> promise) { + td::actor::send_closure(out_msg_queue_query_token_manager_, &TokenManager::get_token, 1, 0, td::Timestamp::in(10.0), + std::move(promise)); +} + void FullNodeImpl::set_validator_telemetry_filename(std::string value) { validator_telemetry_filename_ = std::move(value); update_validator_telemetry_collector(); @@ -645,6 +669,12 @@ void FullNodeImpl::start_up() { td::actor::send_closure(id_, &FullNodeImpl::download_archive, masterchain_seqno, shard_prefix, std::move(tmp_dir), timeout, std::move(promise)); } + void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, td::Timestamp timeout, + td::Promise>> promise) override { + td::actor::send_closure(id_, &FullNodeImpl::download_out_msg_queue_proof, dst_shard, std::move(blocks), limits, + timeout, std::move(promise)); + } void new_key_block(BlockHandle handle) override { td::actor::send_closure(id_, &FullNodeImpl::new_key_block, std::move(handle)); diff --git a/validator/full-node.h b/validator/full-node.h index 73ecbd72..fdb1bf3b 100644 --- a/validator/full-node.h +++ b/validator/full-node.h @@ -91,6 +91,7 @@ class FullNode : public td::actor::Actor { virtual void process_block_broadcast(BlockBroadcast broadcast) = 0; virtual void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, td::BufferSlice data) = 0; + virtual void get_out_msg_queue_query_token(td::Promise> promise) = 0; virtual void set_validator_telemetry_filename(std::string value) = 0; diff --git a/validator/full-node.hpp b/validator/full-node.hpp index b82dd473..0ea6fa0b 100644 --- a/validator/full-node.hpp +++ b/validator/full-node.hpp @@ -28,6 +28,7 @@ #include #include #include +#include namespace ton { @@ -79,6 +80,9 @@ class FullNodeImpl : public FullNode { void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise> promise); void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, td::Timestamp timeout, td::Promise promise); + void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, td::Timestamp timeout, + td::Promise>> promise); void got_key_block_config(td::Ref config); void new_key_block(BlockHandle handle); @@ -87,6 +91,7 @@ class FullNodeImpl : public FullNode { void process_block_broadcast(BlockBroadcast broadcast) override; void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, td::BufferSlice data) override; + void get_out_msg_queue_query_token(td::Promise> promise) override; void set_validator_telemetry_filename(std::string value) override; @@ -160,6 +165,9 @@ class FullNodeImpl : public FullNode { PublicKeyHash validator_telemetry_collector_key_ = PublicKeyHash::zero(); void update_validator_telemetry_collector(); + + td::actor::ActorOwn out_msg_queue_query_token_manager_ = + td::actor::create_actor("tokens", /* max_tokens = */ 1); }; } // namespace fullnode diff --git a/validator/impl/CMakeLists.txt b/validator/impl/CMakeLists.txt index b8f2edfb..978cf859 100644 --- a/validator/impl/CMakeLists.txt +++ b/validator/impl/CMakeLists.txt @@ -16,6 +16,7 @@ set(TON_VALIDATOR_SOURCE ihr-message.cpp liteserver.cpp message-queue.cpp + out-msg-queue-proof.cpp proof.cpp shard.cpp signature-set.cpp @@ -35,6 +36,7 @@ set(TON_VALIDATOR_SOURCE liteserver.hpp liteserver-cache.hpp message-queue.hpp + out-msg-queue-proof.hpp proof.hpp shard.hpp signature-set.hpp diff --git a/validator/impl/out-msg-queue-proof.cpp b/validator/impl/out-msg-queue-proof.cpp new file mode 100644 index 00000000..95ad4a41 --- /dev/null +++ b/validator/impl/out-msg-queue-proof.cpp @@ -0,0 +1,294 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ +#include "out-msg-queue-proof.hpp" +#include "interfaces/proof.h" +#include "shard.hpp" +#include "vm/cells/MerkleProof.h" +#include "common/delay.h" +#include "interfaces/validator-manager.h" +#include "block/block-parse.h" +#include "block/block-auto.h" +#include "output-queue-merger.h" + +namespace ton { + +namespace validator { + +static td::Status check_no_prunned(const Ref& cell) { + if (cell.is_null()) { + return td::Status::OK(); + } + TRY_RESULT(loaded_cell, cell->load_cell()); + if (loaded_cell.data_cell->get_level() > 0) { + return td::Status::Error("prunned branch"); + } + return td::Status::OK(); +} + +static td::Status check_no_prunned(const vm::CellSlice& cs) { + for (unsigned i = 0; i < cs.size_refs(); ++i) { + TRY_STATUS(check_no_prunned(cs.prefetch_ref(i))); + } + return td::Status::OK(); +} + +static td::Result> process_queue( + ShardIdFull dst_shard, std::vector> blocks, + block::ImportedMsgQueueLimits limits) { + td::uint64 estimated_proof_size = 0; + + td::HashSet visited; + std::function dfs_cs; + auto dfs = [&](const Ref& cell) { + if (cell.is_null() || !visited.insert(cell->get_hash()).second) { + return; + } + dfs_cs(vm::CellSlice(vm::NoVm(), cell)); + }; + dfs_cs = [&](const vm::CellSlice& cs) { + // Based on BlockLimitStatus::estimate_block_size + estimated_proof_size += 12 + (cs.size() + 7) / 8 + cs.size_refs() * 3; + for (unsigned i = 0; i < cs.size_refs(); i++) { + dfs(cs.prefetch_ref(i)); + } + }; + std::vector neighbors; + for (auto& b : blocks) { + TRY_STATUS_PREFIX(check_no_prunned(*b.second.proc_info), "invalid proc_info proof: ") + dfs_cs(*b.second.proc_info); + neighbors.emplace_back(b.first, b.second.out_queue->prefetch_ref()); + } + + block::OutputQueueMerger queue_merger{dst_shard, std::move(neighbors)}; + std::vector msg_count(blocks.size()); + td::int32 msg_count_total = 0; + bool limit_reached = false; + + while (!queue_merger.is_eof()) { + auto kv = queue_merger.extract_cur(); + queue_merger.next(); + block::EnqueuedMsgDescr enq; + auto msg = kv->msg; + if (!enq.unpack(msg.write())) { + return td::Status::Error("cannot unpack EnqueuedMsgDescr"); + } + if (limit_reached) { + break; + } + ++msg_count[kv->source]; + ++msg_count_total; + + dfs_cs(*kv->msg); + TRY_STATUS_PREFIX(check_no_prunned(*kv->msg), "invalid message proof: ") + if (estimated_proof_size >= limits.max_bytes || msg_count_total >= (long long)limits.max_msgs) { + limit_reached = true; + } + } + if (!limit_reached) { + std::fill(msg_count.begin(), msg_count.end(), -1); + } + return msg_count; +} + +td::Result> OutMsgQueueProof::build( + ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits) { + if (!dst_shard.is_valid_ext()) { + return td::Status::Error("invalid shard"); + } + if (blocks.empty()) { + return create_tl_object(td::BufferSlice{}, td::BufferSlice{}, + std::vector{}); + } + + std::vector> block_state_proofs; + for (auto& block : blocks) { + if (block.id.seqno() != 0) { + if (block.block_root.is_null()) { + return td::Status::Error("block is null"); + } + TRY_RESULT(proof, create_block_state_proof(block.block_root)); + block_state_proofs.push_back(std::move(proof)); + } + if (!block::ShardConfig::is_neighbor(dst_shard, block.id.shard_full())) { + return td::Status::Error("shards are not neighbors"); + } + } + TRY_RESULT(block_state_proof, vm::std_boc_serialize_multi(block_state_proofs)); + + vm::Dictionary states_dict_pure{32}; + for (size_t i = 0; i < blocks.size(); ++i) { + if (blocks[i].state_root.is_null()) { + return td::Status::Error("state is null"); + } + states_dict_pure.set_ref(td::BitArray<32>{(long long)i}, blocks[i].state_root); + } + + vm::MerkleProofBuilder mpb{states_dict_pure.get_root_cell()}; + vm::Dictionary states_dict{mpb.root(), 32}; + std::vector> data(blocks.size()); + for (size_t i = 0; i < blocks.size(); ++i) { + data[i].first = blocks[i].id; + TRY_RESULT(state, ShardStateQ::fetch(blocks[i].id, {}, states_dict.lookup_ref(td::BitArray<32>{(long long)i}))); + TRY_RESULT(outq_descr, state->message_queue()); + block::gen::OutMsgQueueInfo::Record qinfo; + if (!tlb::unpack_cell(outq_descr->root_cell(), data[i].second)) { + return td::Status::Error("invalid message queue"); + } + } + TRY_RESULT(msg_count, process_queue(dst_shard, std::move(data), limits)); + + TRY_RESULT(proof, mpb.extract_proof()); + vm::Dictionary states_dict_proof{vm::CellSlice{vm::NoVm(), proof}.prefetch_ref(), 32}; + std::vector> state_proofs; + for (size_t i = 0; i < blocks.size(); ++i) { + td::Ref proof_raw = states_dict_proof.lookup_ref(td::BitArray<32>{(long long)i}); + CHECK(proof_raw.not_null()); + state_proofs.push_back(vm::CellBuilder::create_merkle_proof(proof_raw)); + } + TRY_RESULT(queue_proof, vm::std_boc_serialize_multi(state_proofs)); + return create_tl_object(std::move(queue_proof), std::move(block_state_proof), + std::move(msg_count)); +} + +td::Result>> OutMsgQueueProof::fetch(ShardIdFull dst_shard, + std::vector blocks, + block::ImportedMsgQueueLimits limits, + const ton_api::tonNode_outMsgQueueProof& f) { + try { + std::vector> res; + TRY_RESULT(queue_proofs, vm::std_boc_deserialize_multi(f.queue_proofs_, (int)blocks.size())); + TRY_RESULT(block_state_proofs, vm::std_boc_deserialize_multi(f.block_state_proofs_, (int)blocks.size())); + if (queue_proofs.size() != blocks.size()) { + return td::Status::Error("invalid size of queue_proofs"); + } + if (f.msg_counts_.size() != blocks.size()) { + return td::Status::Error("invalid size of msg_counts"); + } + size_t j = 0; + std::vector> data(blocks.size()); + for (size_t i = 0; i < blocks.size(); ++i) { + td::Bits256 state_root_hash; + Ref block_state_proof = {}; + if (blocks[i].seqno() == 0) { + state_root_hash = blocks[i].root_hash; + } else { + if (j == block_state_proofs.size()) { + return td::Status::Error("invalid size of block_state_proofs"); + } + block_state_proof = block_state_proofs[j++]; + TRY_RESULT_ASSIGN(state_root_hash, unpack_block_state_proof(blocks[i], block_state_proof)); + } + auto state_root = vm::MerkleProof::virtualize(queue_proofs[i], 1); + if (state_root->get_hash().as_slice() != state_root_hash.as_slice()) { + return td::Status::Error("state root hash mismatch"); + } + res.emplace_back(true, blocks[i], state_root, block_state_proof, f.msg_counts_[i]); + + data[i].first = blocks[i]; + TRY_RESULT(state, ShardStateQ::fetch(blocks[i], {}, state_root)); + TRY_RESULT(outq_descr, state->message_queue()); + block::gen::OutMsgQueueInfo::Record qinfo; + if (!tlb::unpack_cell(outq_descr->root_cell(), data[i].second)) { + return td::Status::Error("invalid message queue"); + } + } + if (j != block_state_proofs.size()) { + return td::Status::Error("invalid size of block_state_proofs"); + } + TRY_RESULT(msg_count, process_queue(dst_shard, std::move(data), limits)); + if (msg_count != f.msg_counts_) { + return td::Status::Error("incorrect msg_count"); + } + return res; + } catch (vm::VmVirtError& err) { + return td::Status::Error(PSTRING() << "invalid proof: " << err.get_msg()); + } +} + +void BuildOutMsgQueueProof::abort_query(td::Status reason) { + if (promise_) { + FLOG(DEBUG) { + sb << "failed to build msg queue proof to " << dst_shard_.to_str() << " from"; + for (const auto& block : blocks_) { + sb << " " << block.id.id.to_str(); + } + sb << ": " << reason; + }; + promise_.set_error( + reason.move_as_error_prefix(PSTRING() << "failed to build msg queue proof to " << dst_shard_.to_str() << ": ")); + } + stop(); +} + +void BuildOutMsgQueueProof::start_up() { + for (size_t i = 0; i < blocks_.size(); ++i) { + BlockIdExt id = blocks_[i].id; + ++pending; + td::actor::send_closure(manager_, &ValidatorManagerInterface::get_shard_state_from_db_short, id, + [SelfId = actor_id(this), i](td::Result> R) { + if (R.is_error()) { + td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::abort_query, + R.move_as_error_prefix("failed to get shard state: ")); + } else { + td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::got_state_root, i, + R.move_as_ok()->root_cell()); + } + }); + if (id.seqno() != 0) { + ++pending; + td::actor::send_closure(manager_, &ValidatorManagerInterface::get_block_data_from_db_short, id, + [SelfId = actor_id(this), i](td::Result> R) { + if (R.is_error()) { + td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::abort_query, + R.move_as_error_prefix("failed to get block data: ")); + } else { + td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::got_block_root, i, + R.move_as_ok()->root_cell()); + } + }); + } + } + if (pending == 0) { + build_proof(); + } +} + +void BuildOutMsgQueueProof::got_state_root(size_t i, Ref root) { + blocks_[i].state_root = std::move(root); + if (--pending == 0) { + build_proof(); + } +} + +void BuildOutMsgQueueProof::got_block_root(size_t i, Ref root) { + blocks_[i].block_root = std::move(root); + if (--pending == 0) { + build_proof(); + } +} + +void BuildOutMsgQueueProof::build_proof() { + auto result = OutMsgQueueProof::build(dst_shard_, std::move(blocks_), limits_); + if (result.is_error()) { + LOG(ERROR) << "Failed to build msg queue proof: " << result.error(); + } + promise_.set_result(std::move(result)); + stop(); +} + +} // namespace validator +} // namespace ton diff --git a/validator/impl/out-msg-queue-proof.hpp b/validator/impl/out-msg-queue-proof.hpp new file mode 100644 index 00000000..e28561e2 --- /dev/null +++ b/validator/impl/out-msg-queue-proof.hpp @@ -0,0 +1,64 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ +#pragma once +#include "vm/cells.h" +#include "ton/ton-types.h" +#include "auto/tl/ton_api.h" +#include "interfaces/out-msg-queue-proof.h" +#include "td/actor/actor.h" +#include "interfaces/shard.h" +#include "validator.h" + +namespace ton { + +namespace validator { +using td::Ref; + +class ValidatorManager; +class ValidatorManagerInterface; + +class BuildOutMsgQueueProof : public td::actor::Actor { + public: + BuildOutMsgQueueProof(ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, + td::actor::ActorId manager, + td::Promise> promise) + : dst_shard_(dst_shard), limits_(limits), manager_(manager), promise_(std::move(promise)) { + blocks_.resize(blocks.size()); + for (size_t i = 0; i < blocks_.size(); ++i) { + blocks_[i].id = blocks[i]; + } + } + + void abort_query(td::Status reason); + void start_up() override; + void got_state_root(size_t i, Ref root); + void got_block_root(size_t i, Ref root); + void build_proof(); + + private: + ShardIdFull dst_shard_; + std::vector blocks_; + block::ImportedMsgQueueLimits limits_; + + td::actor::ActorId manager_; + td::Promise> promise_; + + size_t pending = 0; +}; + +} // namespace validator +} // namespace ton diff --git a/validator/impl/proof.cpp b/validator/impl/proof.cpp index 033a1ab1..d7222211 100644 --- a/validator/impl/proof.cpp +++ b/validator/impl/proof.cpp @@ -162,5 +162,40 @@ td::Result> ProofQ::get_signatures_root() const { return proof.signatures->prefetch_ref(); } +td::Result> create_block_state_proof(td::Ref root) { + if (root.is_null()) { + return td::Status::Error("root is null"); + } + vm::MerkleProofBuilder mpb{std::move(root)}; + block::gen::Block::Record block; + if (!tlb::unpack_cell(mpb.root(), block) || block.state_update->load_cell().is_error()) { + return td::Status::Error("invalid block"); + } + TRY_RESULT(proof, mpb.extract_proof()); + if (proof.is_null()) { + return td::Status::Error("failed to create proof"); + } + return proof; +} + +td::Result unpack_block_state_proof(BlockIdExt block_id, td::Ref proof) { + auto virt_root = vm::MerkleProof::virtualize(proof, 1); + if (virt_root.is_null()) { + return td::Status::Error("invalid Merkle proof"); + } + if (virt_root->get_hash().as_slice() != block_id.root_hash.as_slice()) { + return td::Status::Error("hash mismatch"); + } + block::gen::Block::Record block; + if (!tlb::unpack_cell(virt_root, block)) { + return td::Status::Error("invalid block"); + } + vm::CellSlice upd_cs{vm::NoVmSpec(), block.state_update}; + if (!(upd_cs.is_special() && upd_cs.prefetch_long(8) == 4 && upd_cs.size_ext() == 0x20228)) { + return td::Status::Error("invalid Merkle update"); + } + return upd_cs.prefetch_ref(1)->get_hash(0).bits(); +} + } // namespace validator } // namespace ton diff --git a/validator/interfaces/out-msg-queue-proof.h b/validator/interfaces/out-msg-queue-proof.h new file mode 100644 index 00000000..c0aa5610 --- /dev/null +++ b/validator/interfaces/out-msg-queue-proof.h @@ -0,0 +1,57 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ +#pragma once +#include "vm/cells.h" +#include "ton/ton-types.h" +#include "auto/tl/ton_api.h" +#include "block/block.h" + +namespace ton { + +namespace validator { +using td::Ref; + +struct OutMsgQueueProof : public td::CntObject { + OutMsgQueueProof(BlockIdExt block_id, Ref state_root, Ref block_state_proof, + td::int32 msg_count = -1) + : block_id_(block_id) + , state_root_(std::move(state_root)) + , block_state_proof_(std::move(block_state_proof)) + , msg_count_(msg_count) { + } + + BlockIdExt block_id_; + Ref state_root_; + Ref block_state_proof_; + td::int32 msg_count_; // -1 - no limit + + static td::Result>> fetch(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, + const ton_api::tonNode_outMsgQueueProof &f); + + struct OneBlock { + BlockIdExt id; + Ref state_root; + Ref block_root; + }; + static td::Result> build(ShardIdFull dst_shard, + std::vector blocks, + block::ImportedMsgQueueLimits limits); +}; + +} // namespace validator +} // namespace ton diff --git a/validator/interfaces/proof.h b/validator/interfaces/proof.h index 99471a1f..6665ad08 100644 --- a/validator/interfaces/proof.h +++ b/validator/interfaces/proof.h @@ -48,6 +48,9 @@ class Proof : virtual public ProofLink { virtual td::Result> export_as_proof_link() const = 0; }; +td::Result> create_block_state_proof(td::Ref root); +td::Result unpack_block_state_proof(BlockIdExt block_id, td::Ref proof); + } // namespace validator } // namespace ton diff --git a/validator/interfaces/validator-manager.h b/validator/interfaces/validator-manager.h index 3a568ba4..20d4bd62 100644 --- a/validator/interfaces/validator-manager.h +++ b/validator/interfaces/validator-manager.h @@ -146,6 +146,9 @@ class ValidatorManager : public ValidatorManagerInterface { virtual void send_top_shard_block_description(td::Ref desc) = 0; virtual void send_block_broadcast(BlockBroadcast broadcast, int mode) = 0; virtual void send_validator_telemetry(PublicKeyHash key, tl_object_ptr telemetry) = 0; + virtual void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, + td::Promise>> promise) = 0; virtual void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir, td::Timestamp timeout, td::Promise promise) = 0; diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index 18649ba8..cd06bf55 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -265,6 +265,11 @@ class ValidatorManagerImpl : public ValidatorManager { } void send_validator_telemetry(PublicKeyHash key, tl_object_ptr telemetry) override { } + void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, + td::Promise>> promise) override { + UNREACHABLE(); + } void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir, td::Timestamp timeout, td::Promise promise) override { UNREACHABLE(); @@ -283,7 +288,7 @@ class ValidatorManagerImpl : public ValidatorManager { void try_get_static_file(FileHash file_hash, td::Promise promise) override; void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout, - td::Promise> promise) override { + td::Promise> promise) override { promise.set_error(td::Status::Error(ErrorCode::error, "download disabled")); } diff --git a/validator/manager-hardfork.hpp b/validator/manager-hardfork.hpp index 7cd8c705..0b8b9e73 100644 --- a/validator/manager-hardfork.hpp +++ b/validator/manager-hardfork.hpp @@ -335,6 +335,11 @@ class ValidatorManagerImpl : public ValidatorManager { } void send_validator_telemetry(PublicKeyHash key, tl_object_ptr telemetry) override { } + void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, + td::Promise>> promise) override { + UNREACHABLE(); + } void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir, td::Timestamp timeout, td::Promise promise) override { UNREACHABLE(); @@ -357,7 +362,7 @@ class ValidatorManagerImpl : public ValidatorManager { void try_get_static_file(FileHash file_hash, td::Promise promise) override; void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout, - td::Promise> promise) override { + td::Promise> promise) override { promise.set_error(td::Status::Error(ErrorCode::error, "download disabled")); } diff --git a/validator/manager.cpp b/validator/manager.cpp index a75de0a9..a631bd09 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -1652,6 +1652,13 @@ void ValidatorManagerImpl::send_validator_telemetry(PublicKeyHash key, callback_->send_validator_telemetry(key, std::move(telemetry)); } +void ValidatorManagerImpl::send_get_out_msg_queue_proof_request( + ShardIdFull dst_shard, std::vector blocks, block::ImportedMsgQueueLimits limits, + td::Promise>> promise) { + callback_->download_out_msg_queue_proof(dst_shard, std::move(blocks), limits, td::Timestamp::in(10.0), + std::move(promise)); +} + void ValidatorManagerImpl::send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir, td::Timestamp timeout, td::Promise promise) { diff --git a/validator/manager.hpp b/validator/manager.hpp index d55e3442..519cab12 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -512,6 +512,9 @@ class ValidatorManagerImpl : public ValidatorManager { void send_top_shard_block_description(td::Ref desc) override; void send_block_broadcast(BlockBroadcast broadcast, int mode) override; void send_validator_telemetry(PublicKeyHash key, tl_object_ptr telemetry) override; + void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, + td::Promise>> promise) override; void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir, td::Timestamp timeout, td::Promise promise) override; @@ -524,8 +527,8 @@ class ValidatorManagerImpl : public ValidatorManager { void try_get_static_file(FileHash file_hash, td::Promise promise) override; void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout, - td::Promise> promise) override { - td::actor::send_closure(token_manager_, &TokenManager::get_download_token, download_size, priority, timeout, + td::Promise> promise) override { + td::actor::send_closure(token_manager_, &TokenManager::get_token, download_size, priority, timeout, std::move(promise)); } diff --git a/validator/net/download-block-new.cpp b/validator/net/download-block-new.cpp index e9a193b4..37580cef 100644 --- a/validator/net/download-block-new.cpp +++ b/validator/net/download-block-new.cpp @@ -144,7 +144,7 @@ void DownloadBlockNew::got_block_handle(BlockHandle handle) { return; } - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { + auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { td::actor::send_closure(SelfId, &DownloadBlockNew::abort_query, R.move_as_error_prefix("failed to get download token: ")); @@ -156,7 +156,7 @@ void DownloadBlockNew::got_block_handle(BlockHandle handle) { std::move(P)); } -void DownloadBlockNew::got_download_token(std::unique_ptr token) { +void DownloadBlockNew::got_download_token(std::unique_ptr token) { token_ = std::move(token); if (download_from_.is_zero() && client_.empty()) { diff --git a/validator/net/download-block-new.hpp b/validator/net/download-block-new.hpp index d2a0e136..ecd062ee 100644 --- a/validator/net/download-block-new.hpp +++ b/validator/net/download-block-new.hpp @@ -49,7 +49,7 @@ class DownloadBlockNew : public td::actor::Actor { void start_up() override; void got_block_handle(BlockHandle handle); - void got_download_token(std::unique_ptr token); + void got_download_token(std::unique_ptr token); void got_node_to_download(adnl::AdnlNodeIdShort node); void got_data(td::BufferSlice data); void got_data_from_db(td::BufferSlice data); @@ -79,7 +79,7 @@ class DownloadBlockNew : public td::actor::Actor { bool allow_partial_proof_ = false; - std::unique_ptr token_; + std::unique_ptr token_; }; } // namespace fullnode diff --git a/validator/net/download-block.cpp b/validator/net/download-block.cpp index 9ca84be2..c60955ed 100644 --- a/validator/net/download-block.cpp +++ b/validator/net/download-block.cpp @@ -128,7 +128,7 @@ void DownloadBlock::got_block_handle(BlockHandle handle) { return; } - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { + auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { td::actor::send_closure(SelfId, &DownloadBlock::abort_query, R.move_as_error_prefix("failed to get download token: ")); @@ -140,7 +140,7 @@ void DownloadBlock::got_block_handle(BlockHandle handle) { std::move(P)); } -void DownloadBlock::got_download_token(std::unique_ptr token) { +void DownloadBlock::got_download_token(std::unique_ptr token) { token_ = std::move(token); if (download_from_.is_zero() && !short_ && client_.empty()) { diff --git a/validator/net/download-block.hpp b/validator/net/download-block.hpp index b1847d58..2e2a715b 100644 --- a/validator/net/download-block.hpp +++ b/validator/net/download-block.hpp @@ -49,7 +49,7 @@ class DownloadBlock : public td::actor::Actor { void start_up() override; void got_block_handle(BlockHandle handle); - void got_download_token(std::unique_ptr token); + void got_download_token(std::unique_ptr token); void got_node_to_download(adnl::AdnlNodeIdShort node); void got_block_proof_description(td::BufferSlice proof_description); void got_block_proof(td::BufferSlice data); @@ -86,7 +86,7 @@ class DownloadBlock : public td::actor::Actor { bool allow_partial_proof_ = false; - std::unique_ptr token_; + std::unique_ptr token_; }; } // namespace fullnode diff --git a/validator/net/download-proof.cpp b/validator/net/download-proof.cpp index 2ff95b88..784ecac2 100644 --- a/validator/net/download-proof.cpp +++ b/validator/net/download-proof.cpp @@ -107,7 +107,7 @@ void DownloadProof::start_up() { } void DownloadProof::checked_db() { - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { + auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { td::actor::send_closure(SelfId, &DownloadProof::abort_query, R.move_as_error_prefix("failed to get download token: ")); @@ -119,7 +119,7 @@ void DownloadProof::checked_db() { std::move(P)); } -void DownloadProof::got_download_token(std::unique_ptr token) { +void DownloadProof::got_download_token(std::unique_ptr token) { token_ = std::move(token); if (download_from_.is_zero() && client_.empty()) { diff --git a/validator/net/download-proof.hpp b/validator/net/download-proof.hpp index 0739dcaf..9caf9c3a 100644 --- a/validator/net/download-proof.hpp +++ b/validator/net/download-proof.hpp @@ -45,7 +45,7 @@ class DownloadProof : public td::actor::Actor { void start_up() override; void checked_db(); - void got_download_token(std::unique_ptr token); + void got_download_token(std::unique_ptr token); void got_node_to_download(adnl::AdnlNodeIdShort node); void got_block_proof_description(td::BufferSlice proof_description); void got_block_proof(td::BufferSlice data); @@ -72,7 +72,7 @@ class DownloadProof : public td::actor::Actor { td::BufferSlice data_; - std::unique_ptr token_; + std::unique_ptr token_; }; } // namespace fullnode diff --git a/validator/net/get-next-key-blocks.cpp b/validator/net/get-next-key-blocks.cpp index 3354b005..2c12e495 100644 --- a/validator/net/get-next-key-blocks.cpp +++ b/validator/net/get-next-key-blocks.cpp @@ -84,7 +84,7 @@ void GetNextKeyBlocks::finish_query() { void GetNextKeyBlocks::start_up() { alarm_timestamp() = timeout_; - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { + auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { td::actor::send_closure(SelfId, &GetNextKeyBlocks::abort_query, R.move_as_error_prefix("failed to get download token: ")); @@ -96,7 +96,7 @@ void GetNextKeyBlocks::start_up() { std::move(P)); } -void GetNextKeyBlocks::got_download_token(std::unique_ptr token) { +void GetNextKeyBlocks::got_download_token(std::unique_ptr token) { token_ = std::move(token); if (download_from_.is_zero() && client_.empty()) { diff --git a/validator/net/get-next-key-blocks.hpp b/validator/net/get-next-key-blocks.hpp index 074289e2..14c040bd 100644 --- a/validator/net/get-next-key-blocks.hpp +++ b/validator/net/get-next-key-blocks.hpp @@ -44,7 +44,7 @@ class GetNextKeyBlocks : public td::actor::Actor { void finish_query(); void start_up() override; - void got_download_token(std::unique_ptr token); + void got_download_token(std::unique_ptr token); void got_node_to_download(adnl::AdnlNodeIdShort node); void send_request(); void got_result(td::BufferSlice res); @@ -75,7 +75,7 @@ class GetNextKeyBlocks : public td::actor::Actor { std::vector pending_; std::vector res_; - std::unique_ptr token_; + std::unique_ptr token_; }; } // namespace fullnode diff --git a/validator/token-manager.cpp b/validator/token-manager.cpp index 0bc4a9c6..8242f921 100644 --- a/validator/token-manager.cpp +++ b/validator/token-manager.cpp @@ -22,23 +22,23 @@ namespace ton { namespace validator { -void TokenManager::get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout, - td::Promise> promise) { +void TokenManager::get_token(size_t size, td::uint32 priority, td::Timestamp timeout, + td::Promise> promise) { if (free_priority_tokens_ > 0 && priority > 0) { --free_priority_tokens_; - promise.set_value(gen_token(download_size, priority)); + promise.set_value(gen_token(size, priority)); return; } if (free_tokens_ > 0) { --free_tokens_; - promise.set_value(gen_token(download_size, priority)); + promise.set_value(gen_token(size, priority)); return; } - pending_.emplace(PendingPromiseKey{download_size, priority, seqno_++}, PendingPromise{timeout, std::move(promise)}); + pending_.emplace(PendingPromiseKey{size, priority, seqno_++}, PendingPromise{timeout, std::move(promise)}); } -void TokenManager::download_token_cleared(size_t download_size, td::uint32 priority) { +void TokenManager::token_cleared(size_t size, td::uint32 priority) { (priority ? free_priority_tokens_ : free_tokens_)++; if (free_priority_tokens_ > max_priority_tokens_) { free_priority_tokens_--; @@ -47,7 +47,7 @@ void TokenManager::download_token_cleared(size_t download_size, td::uint32 prior for (auto it = pending_.begin(); it != pending_.end();) { if (it->first.priority && (free_tokens_ || free_priority_tokens_)) { - it->second.promise.set_value(gen_token(download_size, priority)); + it->second.promise.set_value(gen_token(size, priority)); auto it2 = it++; pending_.erase(it2); if (free_priority_tokens_ > 0) { @@ -56,7 +56,7 @@ void TokenManager::download_token_cleared(size_t download_size, td::uint32 prior free_tokens_--; } } else if (!it->first.priority && free_tokens_) { - it->second.promise.set_value(gen_token(download_size, priority)); + it->second.promise.set_value(gen_token(size, priority)); auto it2 = it++; pending_.erase(it2); free_tokens_--; @@ -69,7 +69,7 @@ void TokenManager::download_token_cleared(size_t download_size, td::uint32 prior void TokenManager::alarm() { for (auto it = pending_.begin(); it != pending_.end();) { if (it->second.timeout.is_in_past()) { - it->second.promise.set_error(td::Status::Error(ErrorCode::timeout, "timeout in wait download token")); + it->second.promise.set_error(td::Status::Error(ErrorCode::timeout, "timeout in wait token")); it = pending_.erase(it); } else { it++; @@ -77,23 +77,23 @@ void TokenManager::alarm() { } } -std::unique_ptr TokenManager::gen_token(size_t download_size, td::uint32 priority) { - class Token : public DownloadToken { +std::unique_ptr TokenManager::gen_token(size_t size, td::uint32 priority) { + class TokenImpl : public ActionToken { public: - Token(size_t download_size, td::uint32 priority, td::actor::ActorId manager) - : download_size_(download_size), priority_(priority), manager_(manager) { + TokenImpl(size_t size, td::uint32 priority, td::actor::ActorId manager) + : size_(size), priority_(priority), manager_(manager) { } - ~Token() override { - td::actor::send_closure(manager_, &TokenManager::download_token_cleared, download_size_, priority_); + ~TokenImpl() override { + td::actor::send_closure(manager_, &TokenManager::token_cleared, size_, priority_); } private: - size_t download_size_; + size_t size_; td::uint32 priority_; td::actor::ActorId manager_; }; - return std::make_unique(download_size, priority, actor_id(this)); + return std::make_unique(size, priority, actor_id(this)); } } // namespace validator diff --git a/validator/token-manager.h b/validator/token-manager.h index 0d75710f..0fd0126a 100644 --- a/validator/token-manager.h +++ b/validator/token-manager.h @@ -31,16 +31,19 @@ class TokenManager : public td::actor::Actor { public: TokenManager() { } + explicit TokenManager(td::uint32 max_tokens) + : free_tokens_(max_tokens), free_priority_tokens_(max_tokens), max_priority_tokens_(max_tokens) { + } void alarm() override; - void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout, - td::Promise> promise); - void download_token_cleared(size_t download_size, td::uint32 priority); + void get_token(size_t size, td::uint32 priority, td::Timestamp timeout, + td::Promise> promise); + void token_cleared(size_t size, td::uint32 priority); private: - std::unique_ptr gen_token(size_t download_size, td::uint32 priority); + std::unique_ptr gen_token(size_t size, td::uint32 priority); struct PendingPromiseKey { - size_t download_size; + size_t size; td::uint32 priority; td::uint64 seqno; @@ -50,7 +53,7 @@ class TokenManager : public td::actor::Actor { }; struct PendingPromise { td::Timestamp timeout; - td::Promise> promise; + td::Promise> promise; }; td::uint64 seqno_ = 0; std::map pending_; diff --git a/validator/validator.h b/validator/validator.h index cc7cbe62..73065aa9 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -35,15 +35,16 @@ #include "interfaces/proof.h" #include "interfaces/shard.h" #include "catchain/catchain-types.h" +#include "interfaces/out-msg-queue-proof.h" #include "interfaces/external-message.h" namespace ton { namespace validator { -class DownloadToken { +class ActionToken { public: - virtual ~DownloadToken() = default; + virtual ~ActionToken() = default; }; struct PerfTimerStats { @@ -186,6 +187,9 @@ class ValidatorManagerInterface : public td::actor::Actor { td::Promise> promise) = 0; virtual void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, td::Timestamp timeout, td::Promise promise) = 0; + virtual void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector blocks, + block::ImportedMsgQueueLimits limits, td::Timestamp timeout, + td::Promise>> promise) = 0; virtual void new_key_block(BlockHandle handle) = 0; virtual void send_validator_telemetry(PublicKeyHash key, tl_object_ptr telemetry) = 0; @@ -248,7 +252,7 @@ class ValidatorManagerInterface : public td::actor::Actor { virtual void add_ext_server_port(td::uint16 port) = 0; virtual void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout, - td::Promise> promise) = 0; + td::Promise> promise) = 0; virtual void get_block_data_from_db(ConstBlockHandle handle, td::Promise> promise) = 0; virtual void get_block_data_from_db_short(BlockIdExt block_id, td::Promise> promise) = 0;