1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-02-12 11:12:16 +00:00

tonNode.getOutMsgQueueProof query in public shard overlays (#1413)

* tonNode.getOutMsgQueueProof query in public shard overlays

* Allow responding to getOutMsgQueueProof requests one at a time only
This commit is contained in:
SpyCheese 2024-12-04 14:38:57 +03:00 committed by GitHub
parent 9ae88d87e3
commit 7bc50e63d7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
37 changed files with 729 additions and 51 deletions

View file

@ -272,6 +272,10 @@ class HardforkCreator : public td::actor::Actor {
void download_archive(ton::BlockSeqno masterchain_seqno, ton::ShardIdFull shard_prefix, std::string tmp_dir, void download_archive(ton::BlockSeqno masterchain_seqno, ton::ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) override { td::Timestamp timeout, td::Promise<std::string> promise) override {
} }
void download_out_msg_queue_proof(
ton::ShardIdFull dst_shard, std::vector<ton::BlockIdExt> blocks, block::ImportedMsgQueueLimits limits,
td::Timestamp timeout, td::Promise<std::vector<td::Ref<ton::validator::OutMsgQueueProof>>> promise) override {
}
void new_key_block(ton::validator::BlockHandle handle) override { void new_key_block(ton::validator::BlockHandle handle) override {
} }

View file

@ -660,6 +660,12 @@ bool EnqueuedMsgDescr::check_key(td::ConstBitPtr key) const {
hash_ == key + 96; hash_ == key + 96;
} }
bool ImportedMsgQueueLimits::deserialize(vm::CellSlice& cs) {
return cs.fetch_ulong(8) == 0xd3 // imported_msg_queue_limits#d3
&& cs.fetch_uint_to(32, max_bytes) // max_bytes:#
&& cs.fetch_uint_to(32, max_msgs); // max_msgs:#
}
bool ParamLimits::deserialize(vm::CellSlice& cs) { bool ParamLimits::deserialize(vm::CellSlice& cs) {
return cs.fetch_ulong(8) == 0xc3 // param_limits#c3 return cs.fetch_ulong(8) == 0xc3 // param_limits#c3
&& cs.fetch_uint_to(32, limits_[0]) // underload:uint32 && cs.fetch_uint_to(32, limits_[0]) // underload:uint32

View file

@ -216,6 +216,16 @@ static inline std::ostream& operator<<(std::ostream& os, const MsgProcessedUptoC
return proc_coll.print(os); return proc_coll.print(os);
} }
struct ImportedMsgQueueLimits {
// Default values
td::uint32 max_bytes = 1 << 16;
td::uint32 max_msgs = 30;
bool deserialize(vm::CellSlice& cs);
ImportedMsgQueueLimits operator*(td::uint32 x) const {
return {max_bytes * x, max_msgs * x};
}
};
struct ParamLimits { struct ParamLimits {
enum { limits_cnt = 4 }; enum { limits_cnt = 4 };
enum { cl_underload = 0, cl_normal = 1, cl_soft = 2, cl_medium = 3, cl_hard = 4 }; enum { cl_underload = 0, cl_normal = 1, cl_soft = 2, cl_medium = 3, cl_hard = 4 };

View file

@ -149,4 +149,19 @@ std::enable_if_t<std::is_arithmetic<T>::value, string> to_string(const T &x) {
return sb.as_cslice().str(); return sb.as_cslice().str();
} }
template <class SB>
struct LambdaPrintHelper {
SB& sb;
};
template <class SB, class F>
SB& operator<<(const LambdaPrintHelper<SB>& helper, F&& f) {
f(helper.sb);
return helper.sb;
}
struct LambdaPrint {};
inline LambdaPrintHelper<td::StringBuilder> operator<<(td::StringBuilder& sb, const LambdaPrint&) {
return LambdaPrintHelper<td::StringBuilder>{sb};
}
} // namespace td } // namespace td

View file

@ -74,6 +74,7 @@
#define LOG(level) LOG_IMPL(level, level, true, ::td::Slice()) #define LOG(level) LOG_IMPL(level, level, true, ::td::Slice())
#define LOG_IF(level, condition) LOG_IMPL(level, level, condition, #condition) #define LOG_IF(level, condition) LOG_IMPL(level, level, condition, #condition)
#define FLOG(level) LOG_IMPL(level, level, true, ::td::Slice()) << td::LambdaPrint{} << [&](auto &sb)
#define VLOG(level) LOG_IMPL(DEBUG, level, true, TD_DEFINE_STR(level)) #define VLOG(level) LOG_IMPL(DEBUG, level, true, TD_DEFINE_STR(level))
#define VLOG_IF(level, condition) LOG_IMPL(DEBUG, level, condition, TD_DEFINE_STR(level) " " #condition) #define VLOG_IF(level, condition) LOG_IMPL(DEBUG, level, condition, TD_DEFINE_STR(level) " " #condition)
@ -95,13 +96,13 @@ inline bool no_return_func() {
#define DUMMY_LOG_CHECK(condition) LOG_IF(NEVER, !(condition)) #define DUMMY_LOG_CHECK(condition) LOG_IF(NEVER, !(condition))
#ifdef TD_DEBUG #ifdef TD_DEBUG
#if TD_MSVC #if TD_MSVC
#define LOG_CHECK(condition) \ #define LOG_CHECK(condition) \
__analysis_assume(!!(condition)); \ __analysis_assume(!!(condition)); \
LOG_IMPL(FATAL, FATAL, !(condition), #condition) LOG_IMPL(FATAL, FATAL, !(condition), #condition)
#else #else
#define LOG_CHECK(condition) LOG_IMPL(FATAL, FATAL, !(condition) && no_return_func(), #condition) #define LOG_CHECK(condition) LOG_IMPL(FATAL, FATAL, !(condition) && no_return_func(), #condition)
#endif #endif
#else #else
#define LOG_CHECK DUMMY_LOG_CHECK #define LOG_CHECK DUMMY_LOG_CHECK
#endif #endif
@ -263,6 +264,9 @@ class Logger {
sb_ << other; sb_ << other;
return *this; return *this;
} }
LambdaPrintHelper<td::Logger> operator<<(const LambdaPrint &) {
return LambdaPrintHelper<td::Logger>{*this};
}
MutableCSlice as_cslice() { MutableCSlice as_cslice() {
return sb_.as_cslice(); return sb_.as_cslice();

View file

@ -373,6 +373,10 @@ class TestNode : public td::actor::Actor {
void download_archive(ton::BlockSeqno masterchain_seqno, ton::ShardIdFull shard_prefix, std::string tmp_dir, void download_archive(ton::BlockSeqno masterchain_seqno, ton::ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) override { td::Timestamp timeout, td::Promise<std::string> promise) override {
} }
void download_out_msg_queue_proof(
ton::ShardIdFull dst_shard, std::vector<ton::BlockIdExt> blocks, block::ImportedMsgQueueLimits limits,
td::Timestamp timeout, td::Promise<std::vector<td::Ref<ton::validator::OutMsgQueueProof>>> promise) override {
}
void new_key_block(ton::validator::BlockHandle handle) override { void new_key_block(ton::validator::BlockHandle handle) override {
} }

View file

@ -454,6 +454,10 @@ tonNode.success = tonNode.Success;
tonNode.archiveNotFound = tonNode.ArchiveInfo; tonNode.archiveNotFound = tonNode.ArchiveInfo;
tonNode.archiveInfo id:long = tonNode.ArchiveInfo; tonNode.archiveInfo id:long = tonNode.ArchiveInfo;
tonNode.importedMsgQueueLimits max_bytes:int max_msgs:int = ImportedMsgQueueLimits;
tonNode.outMsgQueueProof queue_proofs:bytes block_state_proofs:bytes msg_counts:(vector int) = tonNode.OutMsgQueueProof;
tonNode.outMsgQueueProofEmpty = tonNode.OutMsgQueueProof;
tonNode.forgetPeer = tonNode.ForgetPeer; tonNode.forgetPeer = tonNode.ForgetPeer;
---functions--- ---functions---
@ -483,6 +487,8 @@ tonNode.downloadKeyBlockProofLink block:tonNode.blockIdExt = tonNode.Data;
tonNode.getArchiveInfo masterchain_seqno:int = tonNode.ArchiveInfo; tonNode.getArchiveInfo masterchain_seqno:int = tonNode.ArchiveInfo;
tonNode.getShardArchiveInfo masterchain_seqno:int shard_prefix:tonNode.shardId = tonNode.ArchiveInfo; tonNode.getShardArchiveInfo masterchain_seqno:int shard_prefix:tonNode.shardId = tonNode.ArchiveInfo;
tonNode.getArchiveSlice archive_id:long offset:long max_size:int = tonNode.Data; tonNode.getArchiveSlice archive_id:long offset:long max_size:int = tonNode.Data;
tonNode.getOutMsgQueueProof dst_shard:tonNode.shardId blocks:(vector tonNode.blockIdExt)
limits:tonNode.importedMsgQueueLimits = tonNode.OutMsgQueueProof;
tonNode.getCapabilities = tonNode.Capabilities; tonNode.getCapabilities = tonNode.Capabilities;

Binary file not shown.

View file

@ -46,6 +46,7 @@ set(VALIDATOR_HEADERS
interfaces/db.h interfaces/db.h
interfaces/external-message.h interfaces/external-message.h
interfaces/liteserver.h interfaces/liteserver.h
interfaces/out-msg-queue-proof.h
interfaces/proof.h interfaces/proof.h
interfaces/shard.h interfaces/shard.h
interfaces/signature-set.h interfaces/signature-set.h

View file

@ -37,6 +37,7 @@
#include "net/download-proof.hpp" #include "net/download-proof.hpp"
#include "net/get-next-key-blocks.hpp" #include "net/get-next-key-blocks.hpp"
#include "net/download-archive-slice.hpp" #include "net/download-archive-slice.hpp"
#include "impl/out-msg-queue-proof.hpp"
#include "td/utils/Random.h" #include "td/utils/Random.h"
@ -669,6 +670,62 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
query.offset_, query.max_size_, std::move(promise)); query.offset_, query.max_size_, std::move(promise));
} }
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query,
td::Promise<td::BufferSlice> promise) {
std::vector<BlockIdExt> blocks;
for (const auto &x : query.blocks_) {
BlockIdExt id = create_block_id(x);
if (!id.is_valid_ext()) {
promise.set_error(td::Status::Error("invalid block_id"));
return;
}
if (!shard_is_ancestor(shard_, id.shard_full())) {
promise.set_error(td::Status::Error("query in wrong overlay"));
return;
}
blocks.push_back(create_block_id(x));
}
ShardIdFull dst_shard = create_shard_id(query.dst_shard_);
if (!dst_shard.is_valid_ext()) {
promise.set_error(td::Status::Error("invalid shard"));
return;
}
block::ImportedMsgQueueLimits limits{(td::uint32)query.limits_->max_bytes_, (td::uint32)query.limits_->max_msgs_};
if (limits.max_msgs > 512) {
promise.set_error(td::Status::Error("max_msgs is too big"));
return;
}
if (limits.max_bytes > (1 << 21)) {
promise.set_error(td::Status::Error("max_bytes is too big"));
return;
}
FLOG(DEBUG) {
sb << "Got query getOutMsgQueueProof to shard " << dst_shard.to_str() << " from blocks";
for (const BlockIdExt &id : blocks) {
sb << " " << id.id.to_str();
}
sb << " from " << src;
};
td::actor::send_closure(
full_node_, &FullNode::get_out_msg_queue_query_token,
[=, manager = validator_manager_, blocks = std::move(blocks),
promise = std::move(promise)](td::Result<std::unique_ptr<ActionToken>> R) mutable {
TRY_RESULT_PROMISE(promise, token, std::move(R));
auto P =
td::PromiseCreator::lambda([promise = std::move(promise), token = std::move(token)](
td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> R) mutable {
if (R.is_error()) {
promise.set_result(create_serialize_tl_object<ton_api::tonNode_outMsgQueueProofEmpty>());
} else {
promise.set_result(serialize_tl_object(R.move_as_ok(), true));
}
});
td::actor::create_actor<BuildOutMsgQueueProof>("buildqueueproof", dst_shard, std::move(blocks), limits, manager,
std::move(P))
.release();
});
}
void FullNodeShardImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query, void FullNodeShardImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query,
td::Promise<td::BufferSlice> promise) { td::Promise<td::BufferSlice> promise) {
if (!active_) { if (!active_) {
@ -944,6 +1001,47 @@ void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, ShardIdFu
.release(); .release();
} }
void FullNodeShardImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) {
// TODO: maybe more complex download (like other requests here)
auto &b = choose_neighbour();
if (b.adnl_id == adnl::AdnlNodeIdShort::zero()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "no nodes"));
return;
}
std::vector<tl_object_ptr<ton_api::tonNode_blockIdExt>> blocks_tl;
for (const BlockIdExt &id : blocks) {
blocks_tl.push_back(create_tl_block_id(id));
}
td::BufferSlice query = create_serialize_tl_object<ton_api::tonNode_getOutMsgQueueProof>(
create_tl_shard_id(dst_shard), std::move(blocks_tl),
create_tl_object<ton_api::tonNode_importedMsgQueueLimits>(limits.max_bytes, limits.max_msgs));
auto P = td::PromiseCreator::lambda(
[=, promise = std::move(promise), blocks = std::move(blocks)](td::Result<td::BufferSlice> R) mutable {
if (R.is_error()) {
promise.set_result(R.move_as_error());
return;
}
TRY_RESULT_PROMISE(promise, f, fetch_tl_object<ton_api::tonNode_OutMsgQueueProof>(R.move_as_ok(), true));
ton_api::downcast_call(
*f, td::overloaded(
[&](ton_api::tonNode_outMsgQueueProofEmpty &x) {
promise.set_error(td::Status::Error("node doesn't have this block"));
},
[&](ton_api::tonNode_outMsgQueueProof &x) {
delay_action(
[=, promise = std::move(promise), blocks = std::move(blocks), x = std::move(x)]() mutable {
promise.set_result(OutMsgQueueProof::fetch(dst_shard, blocks, limits, x));
},
td::Timestamp::now());
}));
});
td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, b.adnl_id, adnl_id_, overlay_id_,
"get_msg_queue", std::move(P), timeout, std::move(query), 1 << 22, rldp_);
}
void FullNodeShardImpl::set_handle(BlockHandle handle, td::Promise<td::Unit> promise) { void FullNodeShardImpl::set_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
CHECK(!handle_); CHECK(!handle_);
handle_ = std::move(handle); handle_ = std::move(handle);

View file

@ -66,6 +66,9 @@ class FullNodeShard : public td::actor::Actor {
td::Promise<std::vector<BlockIdExt>> promise) = 0; td::Promise<std::vector<BlockIdExt>> promise) = 0;
virtual void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, virtual void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) = 0; td::Timestamp timeout, td::Promise<std::string> promise) = 0;
virtual void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) = 0;
virtual void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) = 0; virtual void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) = 0;

View file

@ -139,8 +139,8 @@ class FullNodeShardImpl : public FullNodeShard {
td::Promise<td::BufferSlice> promise); td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getArchiveSlice &query, void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getArchiveSlice &query,
td::Promise<td::BufferSlice> promise); td::Promise<td::BufferSlice> promise);
// void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_prepareNextKeyBlockProof &query, void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query,
// td::Promise<td::BufferSlice> promise); td::Promise<td::BufferSlice> promise);
void receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query, td::Promise<td::BufferSlice> promise); void receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query, td::Promise<td::BufferSlice> promise);
void receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data); void receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data);
@ -183,6 +183,9 @@ class FullNodeShardImpl : public FullNodeShard {
td::Promise<std::vector<BlockIdExt>> promise) override; td::Promise<std::vector<BlockIdExt>> promise) override;
void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) override; td::Timestamp timeout, td::Promise<std::string> promise) override;
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override;
void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) override; void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) override;

View file

@ -21,6 +21,7 @@
#include "td/actor/MultiPromise.h" #include "td/actor/MultiPromise.h"
#include "full-node.h" #include "full-node.h"
#include "common/delay.h" #include "common/delay.h"
#include "impl/out-msg-queue-proof.hpp"
#include "td/utils/Random.h" #include "td/utils/Random.h"
#include "ton/ton-tl.hpp" #include "ton/ton-tl.hpp"
@ -430,6 +431,24 @@ void FullNodeImpl::download_archive(BlockSeqno masterchain_seqno, ShardIdFull sh
timeout, std::move(promise)); timeout, std::move(promise));
} }
void FullNodeImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) {
if (blocks.empty()) {
promise.set_value({});
return;
}
// All blocks are expected to have the same minsplit shard prefix
auto shard = get_shard(blocks[0].shard_full());
if (shard.empty()) {
VLOG(FULL_NODE_WARNING) << "dropping download msg queue query to unknown shard";
promise.set_error(td::Status::Error(ErrorCode::notready, "shard not ready"));
return;
}
td::actor::send_closure(shard, &FullNodeShard::download_out_msg_queue_proof, dst_shard, std::move(blocks), limits,
timeout, std::move(promise));
}
td::actor::ActorId<FullNodeShard> FullNodeImpl::get_shard(ShardIdFull shard) { td::actor::ActorId<FullNodeShard> FullNodeImpl::get_shard(ShardIdFull shard) {
if (shard.is_masterchain()) { if (shard.is_masterchain()) {
return shards_[ShardIdFull{masterchainId}].actor.get(); return shards_[ShardIdFull{masterchainId}].actor.get();
@ -557,6 +576,11 @@ void FullNodeImpl::process_block_candidate_broadcast(BlockIdExt block_id, Catcha
std::move(data)); std::move(data));
} }
void FullNodeImpl::get_out_msg_queue_query_token(td::Promise<std::unique_ptr<ActionToken>> promise) {
td::actor::send_closure(out_msg_queue_query_token_manager_, &TokenManager::get_token, 1, 0, td::Timestamp::in(10.0),
std::move(promise));
}
void FullNodeImpl::set_validator_telemetry_filename(std::string value) { void FullNodeImpl::set_validator_telemetry_filename(std::string value) {
validator_telemetry_filename_ = std::move(value); validator_telemetry_filename_ = std::move(value);
update_validator_telemetry_collector(); update_validator_telemetry_collector();
@ -645,6 +669,12 @@ void FullNodeImpl::start_up() {
td::actor::send_closure(id_, &FullNodeImpl::download_archive, masterchain_seqno, shard_prefix, std::move(tmp_dir), td::actor::send_closure(id_, &FullNodeImpl::download_archive, masterchain_seqno, shard_prefix, std::move(tmp_dir),
timeout, std::move(promise)); timeout, std::move(promise));
} }
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override {
td::actor::send_closure(id_, &FullNodeImpl::download_out_msg_queue_proof, dst_shard, std::move(blocks), limits,
timeout, std::move(promise));
}
void new_key_block(BlockHandle handle) override { void new_key_block(BlockHandle handle) override {
td::actor::send_closure(id_, &FullNodeImpl::new_key_block, std::move(handle)); td::actor::send_closure(id_, &FullNodeImpl::new_key_block, std::move(handle));

View file

@ -91,6 +91,7 @@ class FullNode : public td::actor::Actor {
virtual void process_block_broadcast(BlockBroadcast broadcast) = 0; virtual void process_block_broadcast(BlockBroadcast broadcast) = 0;
virtual void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno, virtual void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::BufferSlice data) = 0; td::uint32 validator_set_hash, td::BufferSlice data) = 0;
virtual void get_out_msg_queue_query_token(td::Promise<std::unique_ptr<ActionToken>> promise) = 0;
virtual void set_validator_telemetry_filename(std::string value) = 0; virtual void set_validator_telemetry_filename(std::string value) = 0;

View file

@ -28,6 +28,7 @@
#include <map> #include <map>
#include <set> #include <set>
#include <queue> #include <queue>
#include <token-manager.h>
namespace ton { namespace ton {
@ -79,6 +80,9 @@ class FullNodeImpl : public FullNode {
void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise<std::vector<BlockIdExt>> promise); void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise<std::vector<BlockIdExt>> promise);
void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise); td::Timestamp timeout, td::Promise<std::string> promise);
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise);
void got_key_block_config(td::Ref<ConfigHolder> config); void got_key_block_config(td::Ref<ConfigHolder> config);
void new_key_block(BlockHandle handle); void new_key_block(BlockHandle handle);
@ -87,6 +91,7 @@ class FullNodeImpl : public FullNode {
void process_block_broadcast(BlockBroadcast broadcast) override; void process_block_broadcast(BlockBroadcast broadcast) override;
void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data) override; td::BufferSlice data) override;
void get_out_msg_queue_query_token(td::Promise<std::unique_ptr<ActionToken>> promise) override;
void set_validator_telemetry_filename(std::string value) override; void set_validator_telemetry_filename(std::string value) override;
@ -160,6 +165,9 @@ class FullNodeImpl : public FullNode {
PublicKeyHash validator_telemetry_collector_key_ = PublicKeyHash::zero(); PublicKeyHash validator_telemetry_collector_key_ = PublicKeyHash::zero();
void update_validator_telemetry_collector(); void update_validator_telemetry_collector();
td::actor::ActorOwn<TokenManager> out_msg_queue_query_token_manager_ =
td::actor::create_actor<TokenManager>("tokens", /* max_tokens = */ 1);
}; };
} // namespace fullnode } // namespace fullnode

View file

@ -16,6 +16,7 @@ set(TON_VALIDATOR_SOURCE
ihr-message.cpp ihr-message.cpp
liteserver.cpp liteserver.cpp
message-queue.cpp message-queue.cpp
out-msg-queue-proof.cpp
proof.cpp proof.cpp
shard.cpp shard.cpp
signature-set.cpp signature-set.cpp
@ -35,6 +36,7 @@ set(TON_VALIDATOR_SOURCE
liteserver.hpp liteserver.hpp
liteserver-cache.hpp liteserver-cache.hpp
message-queue.hpp message-queue.hpp
out-msg-queue-proof.hpp
proof.hpp proof.hpp
shard.hpp shard.hpp
signature-set.hpp signature-set.hpp

View file

@ -0,0 +1,294 @@
/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
*/
#include "out-msg-queue-proof.hpp"
#include "interfaces/proof.h"
#include "shard.hpp"
#include "vm/cells/MerkleProof.h"
#include "common/delay.h"
#include "interfaces/validator-manager.h"
#include "block/block-parse.h"
#include "block/block-auto.h"
#include "output-queue-merger.h"
namespace ton {
namespace validator {
static td::Status check_no_prunned(const Ref<vm::Cell>& cell) {
if (cell.is_null()) {
return td::Status::OK();
}
TRY_RESULT(loaded_cell, cell->load_cell());
if (loaded_cell.data_cell->get_level() > 0) {
return td::Status::Error("prunned branch");
}
return td::Status::OK();
}
static td::Status check_no_prunned(const vm::CellSlice& cs) {
for (unsigned i = 0; i < cs.size_refs(); ++i) {
TRY_STATUS(check_no_prunned(cs.prefetch_ref(i)));
}
return td::Status::OK();
}
static td::Result<std::vector<td::int32>> process_queue(
ShardIdFull dst_shard, std::vector<std::pair<BlockIdExt, block::gen::OutMsgQueueInfo::Record>> blocks,
block::ImportedMsgQueueLimits limits) {
td::uint64 estimated_proof_size = 0;
td::HashSet<vm::Cell::Hash> visited;
std::function<void(const vm::CellSlice&)> dfs_cs;
auto dfs = [&](const Ref<vm::Cell>& cell) {
if (cell.is_null() || !visited.insert(cell->get_hash()).second) {
return;
}
dfs_cs(vm::CellSlice(vm::NoVm(), cell));
};
dfs_cs = [&](const vm::CellSlice& cs) {
// Based on BlockLimitStatus::estimate_block_size
estimated_proof_size += 12 + (cs.size() + 7) / 8 + cs.size_refs() * 3;
for (unsigned i = 0; i < cs.size_refs(); i++) {
dfs(cs.prefetch_ref(i));
}
};
std::vector<block::OutputQueueMerger::Neighbor> neighbors;
for (auto& b : blocks) {
TRY_STATUS_PREFIX(check_no_prunned(*b.second.proc_info), "invalid proc_info proof: ")
dfs_cs(*b.second.proc_info);
neighbors.emplace_back(b.first, b.second.out_queue->prefetch_ref());
}
block::OutputQueueMerger queue_merger{dst_shard, std::move(neighbors)};
std::vector<td::int32> msg_count(blocks.size());
td::int32 msg_count_total = 0;
bool limit_reached = false;
while (!queue_merger.is_eof()) {
auto kv = queue_merger.extract_cur();
queue_merger.next();
block::EnqueuedMsgDescr enq;
auto msg = kv->msg;
if (!enq.unpack(msg.write())) {
return td::Status::Error("cannot unpack EnqueuedMsgDescr");
}
if (limit_reached) {
break;
}
++msg_count[kv->source];
++msg_count_total;
dfs_cs(*kv->msg);
TRY_STATUS_PREFIX(check_no_prunned(*kv->msg), "invalid message proof: ")
if (estimated_proof_size >= limits.max_bytes || msg_count_total >= (long long)limits.max_msgs) {
limit_reached = true;
}
}
if (!limit_reached) {
std::fill(msg_count.begin(), msg_count.end(), -1);
}
return msg_count;
}
td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> OutMsgQueueProof::build(
ShardIdFull dst_shard, std::vector<OneBlock> blocks, block::ImportedMsgQueueLimits limits) {
if (!dst_shard.is_valid_ext()) {
return td::Status::Error("invalid shard");
}
if (blocks.empty()) {
return create_tl_object<ton_api::tonNode_outMsgQueueProof>(td::BufferSlice{}, td::BufferSlice{},
std::vector<td::int32>{});
}
std::vector<td::Ref<vm::Cell>> block_state_proofs;
for (auto& block : blocks) {
if (block.id.seqno() != 0) {
if (block.block_root.is_null()) {
return td::Status::Error("block is null");
}
TRY_RESULT(proof, create_block_state_proof(block.block_root));
block_state_proofs.push_back(std::move(proof));
}
if (!block::ShardConfig::is_neighbor(dst_shard, block.id.shard_full())) {
return td::Status::Error("shards are not neighbors");
}
}
TRY_RESULT(block_state_proof, vm::std_boc_serialize_multi(block_state_proofs));
vm::Dictionary states_dict_pure{32};
for (size_t i = 0; i < blocks.size(); ++i) {
if (blocks[i].state_root.is_null()) {
return td::Status::Error("state is null");
}
states_dict_pure.set_ref(td::BitArray<32>{(long long)i}, blocks[i].state_root);
}
vm::MerkleProofBuilder mpb{states_dict_pure.get_root_cell()};
vm::Dictionary states_dict{mpb.root(), 32};
std::vector<std::pair<BlockIdExt, block::gen::OutMsgQueueInfo::Record>> data(blocks.size());
for (size_t i = 0; i < blocks.size(); ++i) {
data[i].first = blocks[i].id;
TRY_RESULT(state, ShardStateQ::fetch(blocks[i].id, {}, states_dict.lookup_ref(td::BitArray<32>{(long long)i})));
TRY_RESULT(outq_descr, state->message_queue());
block::gen::OutMsgQueueInfo::Record qinfo;
if (!tlb::unpack_cell(outq_descr->root_cell(), data[i].second)) {
return td::Status::Error("invalid message queue");
}
}
TRY_RESULT(msg_count, process_queue(dst_shard, std::move(data), limits));
TRY_RESULT(proof, mpb.extract_proof());
vm::Dictionary states_dict_proof{vm::CellSlice{vm::NoVm(), proof}.prefetch_ref(), 32};
std::vector<td::Ref<vm::Cell>> state_proofs;
for (size_t i = 0; i < blocks.size(); ++i) {
td::Ref<vm::Cell> proof_raw = states_dict_proof.lookup_ref(td::BitArray<32>{(long long)i});
CHECK(proof_raw.not_null());
state_proofs.push_back(vm::CellBuilder::create_merkle_proof(proof_raw));
}
TRY_RESULT(queue_proof, vm::std_boc_serialize_multi(state_proofs));
return create_tl_object<ton_api::tonNode_outMsgQueueProof>(std::move(queue_proof), std::move(block_state_proof),
std::move(msg_count));
}
td::Result<std::vector<td::Ref<OutMsgQueueProof>>> OutMsgQueueProof::fetch(ShardIdFull dst_shard,
std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits,
const ton_api::tonNode_outMsgQueueProof& f) {
try {
std::vector<td::Ref<OutMsgQueueProof>> res;
TRY_RESULT(queue_proofs, vm::std_boc_deserialize_multi(f.queue_proofs_, (int)blocks.size()));
TRY_RESULT(block_state_proofs, vm::std_boc_deserialize_multi(f.block_state_proofs_, (int)blocks.size()));
if (queue_proofs.size() != blocks.size()) {
return td::Status::Error("invalid size of queue_proofs");
}
if (f.msg_counts_.size() != blocks.size()) {
return td::Status::Error("invalid size of msg_counts");
}
size_t j = 0;
std::vector<std::pair<BlockIdExt, block::gen::OutMsgQueueInfo::Record>> data(blocks.size());
for (size_t i = 0; i < blocks.size(); ++i) {
td::Bits256 state_root_hash;
Ref<vm::Cell> block_state_proof = {};
if (blocks[i].seqno() == 0) {
state_root_hash = blocks[i].root_hash;
} else {
if (j == block_state_proofs.size()) {
return td::Status::Error("invalid size of block_state_proofs");
}
block_state_proof = block_state_proofs[j++];
TRY_RESULT_ASSIGN(state_root_hash, unpack_block_state_proof(blocks[i], block_state_proof));
}
auto state_root = vm::MerkleProof::virtualize(queue_proofs[i], 1);
if (state_root->get_hash().as_slice() != state_root_hash.as_slice()) {
return td::Status::Error("state root hash mismatch");
}
res.emplace_back(true, blocks[i], state_root, block_state_proof, f.msg_counts_[i]);
data[i].first = blocks[i];
TRY_RESULT(state, ShardStateQ::fetch(blocks[i], {}, state_root));
TRY_RESULT(outq_descr, state->message_queue());
block::gen::OutMsgQueueInfo::Record qinfo;
if (!tlb::unpack_cell(outq_descr->root_cell(), data[i].second)) {
return td::Status::Error("invalid message queue");
}
}
if (j != block_state_proofs.size()) {
return td::Status::Error("invalid size of block_state_proofs");
}
TRY_RESULT(msg_count, process_queue(dst_shard, std::move(data), limits));
if (msg_count != f.msg_counts_) {
return td::Status::Error("incorrect msg_count");
}
return res;
} catch (vm::VmVirtError& err) {
return td::Status::Error(PSTRING() << "invalid proof: " << err.get_msg());
}
}
void BuildOutMsgQueueProof::abort_query(td::Status reason) {
if (promise_) {
FLOG(DEBUG) {
sb << "failed to build msg queue proof to " << dst_shard_.to_str() << " from";
for (const auto& block : blocks_) {
sb << " " << block.id.id.to_str();
}
sb << ": " << reason;
};
promise_.set_error(
reason.move_as_error_prefix(PSTRING() << "failed to build msg queue proof to " << dst_shard_.to_str() << ": "));
}
stop();
}
void BuildOutMsgQueueProof::start_up() {
for (size_t i = 0; i < blocks_.size(); ++i) {
BlockIdExt id = blocks_[i].id;
++pending;
td::actor::send_closure(manager_, &ValidatorManagerInterface::get_shard_state_from_db_short, id,
[SelfId = actor_id(this), i](td::Result<Ref<ShardState>> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::abort_query,
R.move_as_error_prefix("failed to get shard state: "));
} else {
td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::got_state_root, i,
R.move_as_ok()->root_cell());
}
});
if (id.seqno() != 0) {
++pending;
td::actor::send_closure(manager_, &ValidatorManagerInterface::get_block_data_from_db_short, id,
[SelfId = actor_id(this), i](td::Result<Ref<BlockData>> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::abort_query,
R.move_as_error_prefix("failed to get block data: "));
} else {
td::actor::send_closure(SelfId, &BuildOutMsgQueueProof::got_block_root, i,
R.move_as_ok()->root_cell());
}
});
}
}
if (pending == 0) {
build_proof();
}
}
void BuildOutMsgQueueProof::got_state_root(size_t i, Ref<vm::Cell> root) {
blocks_[i].state_root = std::move(root);
if (--pending == 0) {
build_proof();
}
}
void BuildOutMsgQueueProof::got_block_root(size_t i, Ref<vm::Cell> root) {
blocks_[i].block_root = std::move(root);
if (--pending == 0) {
build_proof();
}
}
void BuildOutMsgQueueProof::build_proof() {
auto result = OutMsgQueueProof::build(dst_shard_, std::move(blocks_), limits_);
if (result.is_error()) {
LOG(ERROR) << "Failed to build msg queue proof: " << result.error();
}
promise_.set_result(std::move(result));
stop();
}
} // namespace validator
} // namespace ton

View file

@ -0,0 +1,64 @@
/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "vm/cells.h"
#include "ton/ton-types.h"
#include "auto/tl/ton_api.h"
#include "interfaces/out-msg-queue-proof.h"
#include "td/actor/actor.h"
#include "interfaces/shard.h"
#include "validator.h"
namespace ton {
namespace validator {
using td::Ref;
class ValidatorManager;
class ValidatorManagerInterface;
class BuildOutMsgQueueProof : public td::actor::Actor {
public:
BuildOutMsgQueueProof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks, block::ImportedMsgQueueLimits limits,
td::actor::ActorId<ValidatorManagerInterface> manager,
td::Promise<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> promise)
: dst_shard_(dst_shard), limits_(limits), manager_(manager), promise_(std::move(promise)) {
blocks_.resize(blocks.size());
for (size_t i = 0; i < blocks_.size(); ++i) {
blocks_[i].id = blocks[i];
}
}
void abort_query(td::Status reason);
void start_up() override;
void got_state_root(size_t i, Ref<vm::Cell> root);
void got_block_root(size_t i, Ref<vm::Cell> root);
void build_proof();
private:
ShardIdFull dst_shard_;
std::vector<OutMsgQueueProof::OneBlock> blocks_;
block::ImportedMsgQueueLimits limits_;
td::actor::ActorId<ValidatorManagerInterface> manager_;
td::Promise<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> promise_;
size_t pending = 0;
};
} // namespace validator
} // namespace ton

View file

@ -162,5 +162,40 @@ td::Result<Ref<vm::Cell>> ProofQ::get_signatures_root() const {
return proof.signatures->prefetch_ref(); return proof.signatures->prefetch_ref();
} }
td::Result<td::Ref<vm::Cell>> create_block_state_proof(td::Ref<vm::Cell> root) {
if (root.is_null()) {
return td::Status::Error("root is null");
}
vm::MerkleProofBuilder mpb{std::move(root)};
block::gen::Block::Record block;
if (!tlb::unpack_cell(mpb.root(), block) || block.state_update->load_cell().is_error()) {
return td::Status::Error("invalid block");
}
TRY_RESULT(proof, mpb.extract_proof());
if (proof.is_null()) {
return td::Status::Error("failed to create proof");
}
return proof;
}
td::Result<RootHash> unpack_block_state_proof(BlockIdExt block_id, td::Ref<vm::Cell> proof) {
auto virt_root = vm::MerkleProof::virtualize(proof, 1);
if (virt_root.is_null()) {
return td::Status::Error("invalid Merkle proof");
}
if (virt_root->get_hash().as_slice() != block_id.root_hash.as_slice()) {
return td::Status::Error("hash mismatch");
}
block::gen::Block::Record block;
if (!tlb::unpack_cell(virt_root, block)) {
return td::Status::Error("invalid block");
}
vm::CellSlice upd_cs{vm::NoVmSpec(), block.state_update};
if (!(upd_cs.is_special() && upd_cs.prefetch_long(8) == 4 && upd_cs.size_ext() == 0x20228)) {
return td::Status::Error("invalid Merkle update");
}
return upd_cs.prefetch_ref(1)->get_hash(0).bits();
}
} // namespace validator } // namespace validator
} // namespace ton } // namespace ton

View file

@ -0,0 +1,57 @@
/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "vm/cells.h"
#include "ton/ton-types.h"
#include "auto/tl/ton_api.h"
#include "block/block.h"
namespace ton {
namespace validator {
using td::Ref;
struct OutMsgQueueProof : public td::CntObject {
OutMsgQueueProof(BlockIdExt block_id, Ref<vm::Cell> state_root, Ref<vm::Cell> block_state_proof,
td::int32 msg_count = -1)
: block_id_(block_id)
, state_root_(std::move(state_root))
, block_state_proof_(std::move(block_state_proof))
, msg_count_(msg_count) {
}
BlockIdExt block_id_;
Ref<vm::Cell> state_root_;
Ref<vm::Cell> block_state_proof_;
td::int32 msg_count_; // -1 - no limit
static td::Result<std::vector<td::Ref<OutMsgQueueProof>>> fetch(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits,
const ton_api::tonNode_outMsgQueueProof &f);
struct OneBlock {
BlockIdExt id;
Ref<vm::Cell> state_root;
Ref<vm::Cell> block_root;
};
static td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> build(ShardIdFull dst_shard,
std::vector<OneBlock> blocks,
block::ImportedMsgQueueLimits limits);
};
} // namespace validator
} // namespace ton

View file

@ -48,6 +48,9 @@ class Proof : virtual public ProofLink {
virtual td::Result<td::Ref<ProofLink>> export_as_proof_link() const = 0; virtual td::Result<td::Ref<ProofLink>> export_as_proof_link() const = 0;
}; };
td::Result<td::Ref<vm::Cell>> create_block_state_proof(td::Ref<vm::Cell> root);
td::Result<RootHash> unpack_block_state_proof(BlockIdExt block_id, td::Ref<vm::Cell> proof);
} // namespace validator } // namespace validator
} // namespace ton } // namespace ton

View file

@ -146,6 +146,9 @@ class ValidatorManager : public ValidatorManagerInterface {
virtual void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) = 0; virtual void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) = 0;
virtual void send_block_broadcast(BlockBroadcast broadcast, int mode) = 0; virtual void send_block_broadcast(BlockBroadcast broadcast, int mode) = 0;
virtual void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) = 0; virtual void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) = 0;
virtual void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) = 0;
virtual void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir, virtual void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) = 0; td::Timestamp timeout, td::Promise<std::string> promise) = 0;

View file

@ -265,6 +265,11 @@ class ValidatorManagerImpl : public ValidatorManager {
} }
void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) override { void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) override {
} }
void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override {
UNREACHABLE();
}
void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir, void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) override { td::Timestamp timeout, td::Promise<std::string> promise) override {
UNREACHABLE(); UNREACHABLE();
@ -283,7 +288,7 @@ class ValidatorManagerImpl : public ValidatorManager {
void try_get_static_file(FileHash file_hash, td::Promise<td::BufferSlice> promise) override; void try_get_static_file(FileHash file_hash, td::Promise<td::BufferSlice> promise) override;
void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout, void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout,
td::Promise<std::unique_ptr<DownloadToken>> promise) override { td::Promise<std::unique_ptr<ActionToken>> promise) override {
promise.set_error(td::Status::Error(ErrorCode::error, "download disabled")); promise.set_error(td::Status::Error(ErrorCode::error, "download disabled"));
} }

View file

@ -335,6 +335,11 @@ class ValidatorManagerImpl : public ValidatorManager {
} }
void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) override { void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) override {
} }
void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override {
UNREACHABLE();
}
void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir, void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) override { td::Timestamp timeout, td::Promise<std::string> promise) override {
UNREACHABLE(); UNREACHABLE();
@ -357,7 +362,7 @@ class ValidatorManagerImpl : public ValidatorManager {
void try_get_static_file(FileHash file_hash, td::Promise<td::BufferSlice> promise) override; void try_get_static_file(FileHash file_hash, td::Promise<td::BufferSlice> promise) override;
void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout, void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout,
td::Promise<std::unique_ptr<DownloadToken>> promise) override { td::Promise<std::unique_ptr<ActionToken>> promise) override {
promise.set_error(td::Status::Error(ErrorCode::error, "download disabled")); promise.set_error(td::Status::Error(ErrorCode::error, "download disabled"));
} }

View file

@ -1652,6 +1652,13 @@ void ValidatorManagerImpl::send_validator_telemetry(PublicKeyHash key,
callback_->send_validator_telemetry(key, std::move(telemetry)); callback_->send_validator_telemetry(key, std::move(telemetry));
} }
void ValidatorManagerImpl::send_get_out_msg_queue_proof_request(
ShardIdFull dst_shard, std::vector<BlockIdExt> blocks, block::ImportedMsgQueueLimits limits,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) {
callback_->download_out_msg_queue_proof(dst_shard, std::move(blocks), limits, td::Timestamp::in(10.0),
std::move(promise));
}
void ValidatorManagerImpl::send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, void ValidatorManagerImpl::send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix,
std::string tmp_dir, td::Timestamp timeout, std::string tmp_dir, td::Timestamp timeout,
td::Promise<std::string> promise) { td::Promise<std::string> promise) {

View file

@ -512,6 +512,9 @@ class ValidatorManagerImpl : public ValidatorManager {
void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override; void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override;
void send_block_broadcast(BlockBroadcast broadcast, int mode) override; void send_block_broadcast(BlockBroadcast broadcast, int mode) override;
void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) override; void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) override;
void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override;
void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir, void send_download_archive_request(BlockSeqno mc_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) override; td::Timestamp timeout, td::Promise<std::string> promise) override;
@ -524,8 +527,8 @@ class ValidatorManagerImpl : public ValidatorManager {
void try_get_static_file(FileHash file_hash, td::Promise<td::BufferSlice> promise) override; void try_get_static_file(FileHash file_hash, td::Promise<td::BufferSlice> promise) override;
void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout, void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout,
td::Promise<std::unique_ptr<DownloadToken>> promise) override { td::Promise<std::unique_ptr<ActionToken>> promise) override {
td::actor::send_closure(token_manager_, &TokenManager::get_download_token, download_size, priority, timeout, td::actor::send_closure(token_manager_, &TokenManager::get_token, download_size, priority, timeout,
std::move(promise)); std::move(promise));
} }

View file

@ -144,7 +144,7 @@ void DownloadBlockNew::got_block_handle(BlockHandle handle) {
return; return;
} }
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::unique_ptr<DownloadToken>> R) { auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::unique_ptr<ActionToken>> R) {
if (R.is_error()) { if (R.is_error()) {
td::actor::send_closure(SelfId, &DownloadBlockNew::abort_query, td::actor::send_closure(SelfId, &DownloadBlockNew::abort_query,
R.move_as_error_prefix("failed to get download token: ")); R.move_as_error_prefix("failed to get download token: "));
@ -156,7 +156,7 @@ void DownloadBlockNew::got_block_handle(BlockHandle handle) {
std::move(P)); std::move(P));
} }
void DownloadBlockNew::got_download_token(std::unique_ptr<DownloadToken> token) { void DownloadBlockNew::got_download_token(std::unique_ptr<ActionToken> token) {
token_ = std::move(token); token_ = std::move(token);
if (download_from_.is_zero() && client_.empty()) { if (download_from_.is_zero() && client_.empty()) {

View file

@ -49,7 +49,7 @@ class DownloadBlockNew : public td::actor::Actor {
void start_up() override; void start_up() override;
void got_block_handle(BlockHandle handle); void got_block_handle(BlockHandle handle);
void got_download_token(std::unique_ptr<DownloadToken> token); void got_download_token(std::unique_ptr<ActionToken> token);
void got_node_to_download(adnl::AdnlNodeIdShort node); void got_node_to_download(adnl::AdnlNodeIdShort node);
void got_data(td::BufferSlice data); void got_data(td::BufferSlice data);
void got_data_from_db(td::BufferSlice data); void got_data_from_db(td::BufferSlice data);
@ -79,7 +79,7 @@ class DownloadBlockNew : public td::actor::Actor {
bool allow_partial_proof_ = false; bool allow_partial_proof_ = false;
std::unique_ptr<DownloadToken> token_; std::unique_ptr<ActionToken> token_;
}; };
} // namespace fullnode } // namespace fullnode

View file

@ -128,7 +128,7 @@ void DownloadBlock::got_block_handle(BlockHandle handle) {
return; return;
} }
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::unique_ptr<DownloadToken>> R) { auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::unique_ptr<ActionToken>> R) {
if (R.is_error()) { if (R.is_error()) {
td::actor::send_closure(SelfId, &DownloadBlock::abort_query, td::actor::send_closure(SelfId, &DownloadBlock::abort_query,
R.move_as_error_prefix("failed to get download token: ")); R.move_as_error_prefix("failed to get download token: "));
@ -140,7 +140,7 @@ void DownloadBlock::got_block_handle(BlockHandle handle) {
std::move(P)); std::move(P));
} }
void DownloadBlock::got_download_token(std::unique_ptr<DownloadToken> token) { void DownloadBlock::got_download_token(std::unique_ptr<ActionToken> token) {
token_ = std::move(token); token_ = std::move(token);
if (download_from_.is_zero() && !short_ && client_.empty()) { if (download_from_.is_zero() && !short_ && client_.empty()) {

View file

@ -49,7 +49,7 @@ class DownloadBlock : public td::actor::Actor {
void start_up() override; void start_up() override;
void got_block_handle(BlockHandle handle); void got_block_handle(BlockHandle handle);
void got_download_token(std::unique_ptr<DownloadToken> token); void got_download_token(std::unique_ptr<ActionToken> token);
void got_node_to_download(adnl::AdnlNodeIdShort node); void got_node_to_download(adnl::AdnlNodeIdShort node);
void got_block_proof_description(td::BufferSlice proof_description); void got_block_proof_description(td::BufferSlice proof_description);
void got_block_proof(td::BufferSlice data); void got_block_proof(td::BufferSlice data);
@ -86,7 +86,7 @@ class DownloadBlock : public td::actor::Actor {
bool allow_partial_proof_ = false; bool allow_partial_proof_ = false;
std::unique_ptr<DownloadToken> token_; std::unique_ptr<ActionToken> token_;
}; };
} // namespace fullnode } // namespace fullnode

View file

@ -107,7 +107,7 @@ void DownloadProof::start_up() {
} }
void DownloadProof::checked_db() { void DownloadProof::checked_db() {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::unique_ptr<DownloadToken>> R) { auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::unique_ptr<ActionToken>> R) {
if (R.is_error()) { if (R.is_error()) {
td::actor::send_closure(SelfId, &DownloadProof::abort_query, td::actor::send_closure(SelfId, &DownloadProof::abort_query,
R.move_as_error_prefix("failed to get download token: ")); R.move_as_error_prefix("failed to get download token: "));
@ -119,7 +119,7 @@ void DownloadProof::checked_db() {
std::move(P)); std::move(P));
} }
void DownloadProof::got_download_token(std::unique_ptr<DownloadToken> token) { void DownloadProof::got_download_token(std::unique_ptr<ActionToken> token) {
token_ = std::move(token); token_ = std::move(token);
if (download_from_.is_zero() && client_.empty()) { if (download_from_.is_zero() && client_.empty()) {

View file

@ -45,7 +45,7 @@ class DownloadProof : public td::actor::Actor {
void start_up() override; void start_up() override;
void checked_db(); void checked_db();
void got_download_token(std::unique_ptr<DownloadToken> token); void got_download_token(std::unique_ptr<ActionToken> token);
void got_node_to_download(adnl::AdnlNodeIdShort node); void got_node_to_download(adnl::AdnlNodeIdShort node);
void got_block_proof_description(td::BufferSlice proof_description); void got_block_proof_description(td::BufferSlice proof_description);
void got_block_proof(td::BufferSlice data); void got_block_proof(td::BufferSlice data);
@ -72,7 +72,7 @@ class DownloadProof : public td::actor::Actor {
td::BufferSlice data_; td::BufferSlice data_;
std::unique_ptr<DownloadToken> token_; std::unique_ptr<ActionToken> token_;
}; };
} // namespace fullnode } // namespace fullnode

View file

@ -84,7 +84,7 @@ void GetNextKeyBlocks::finish_query() {
void GetNextKeyBlocks::start_up() { void GetNextKeyBlocks::start_up() {
alarm_timestamp() = timeout_; alarm_timestamp() = timeout_;
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::unique_ptr<DownloadToken>> R) { auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::unique_ptr<ActionToken>> R) {
if (R.is_error()) { if (R.is_error()) {
td::actor::send_closure(SelfId, &GetNextKeyBlocks::abort_query, td::actor::send_closure(SelfId, &GetNextKeyBlocks::abort_query,
R.move_as_error_prefix("failed to get download token: ")); R.move_as_error_prefix("failed to get download token: "));
@ -96,7 +96,7 @@ void GetNextKeyBlocks::start_up() {
std::move(P)); std::move(P));
} }
void GetNextKeyBlocks::got_download_token(std::unique_ptr<DownloadToken> token) { void GetNextKeyBlocks::got_download_token(std::unique_ptr<ActionToken> token) {
token_ = std::move(token); token_ = std::move(token);
if (download_from_.is_zero() && client_.empty()) { if (download_from_.is_zero() && client_.empty()) {

View file

@ -44,7 +44,7 @@ class GetNextKeyBlocks : public td::actor::Actor {
void finish_query(); void finish_query();
void start_up() override; void start_up() override;
void got_download_token(std::unique_ptr<DownloadToken> token); void got_download_token(std::unique_ptr<ActionToken> token);
void got_node_to_download(adnl::AdnlNodeIdShort node); void got_node_to_download(adnl::AdnlNodeIdShort node);
void send_request(); void send_request();
void got_result(td::BufferSlice res); void got_result(td::BufferSlice res);
@ -75,7 +75,7 @@ class GetNextKeyBlocks : public td::actor::Actor {
std::vector<BlockIdExt> pending_; std::vector<BlockIdExt> pending_;
std::vector<BlockIdExt> res_; std::vector<BlockIdExt> res_;
std::unique_ptr<DownloadToken> token_; std::unique_ptr<ActionToken> token_;
}; };
} // namespace fullnode } // namespace fullnode

View file

@ -22,23 +22,23 @@ namespace ton {
namespace validator { namespace validator {
void TokenManager::get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout, void TokenManager::get_token(size_t size, td::uint32 priority, td::Timestamp timeout,
td::Promise<std::unique_ptr<DownloadToken>> promise) { td::Promise<std::unique_ptr<ActionToken>> promise) {
if (free_priority_tokens_ > 0 && priority > 0) { if (free_priority_tokens_ > 0 && priority > 0) {
--free_priority_tokens_; --free_priority_tokens_;
promise.set_value(gen_token(download_size, priority)); promise.set_value(gen_token(size, priority));
return; return;
} }
if (free_tokens_ > 0) { if (free_tokens_ > 0) {
--free_tokens_; --free_tokens_;
promise.set_value(gen_token(download_size, priority)); promise.set_value(gen_token(size, priority));
return; return;
} }
pending_.emplace(PendingPromiseKey{download_size, priority, seqno_++}, PendingPromise{timeout, std::move(promise)}); pending_.emplace(PendingPromiseKey{size, priority, seqno_++}, PendingPromise{timeout, std::move(promise)});
} }
void TokenManager::download_token_cleared(size_t download_size, td::uint32 priority) { void TokenManager::token_cleared(size_t size, td::uint32 priority) {
(priority ? free_priority_tokens_ : free_tokens_)++; (priority ? free_priority_tokens_ : free_tokens_)++;
if (free_priority_tokens_ > max_priority_tokens_) { if (free_priority_tokens_ > max_priority_tokens_) {
free_priority_tokens_--; free_priority_tokens_--;
@ -47,7 +47,7 @@ void TokenManager::download_token_cleared(size_t download_size, td::uint32 prior
for (auto it = pending_.begin(); it != pending_.end();) { for (auto it = pending_.begin(); it != pending_.end();) {
if (it->first.priority && (free_tokens_ || free_priority_tokens_)) { if (it->first.priority && (free_tokens_ || free_priority_tokens_)) {
it->second.promise.set_value(gen_token(download_size, priority)); it->second.promise.set_value(gen_token(size, priority));
auto it2 = it++; auto it2 = it++;
pending_.erase(it2); pending_.erase(it2);
if (free_priority_tokens_ > 0) { if (free_priority_tokens_ > 0) {
@ -56,7 +56,7 @@ void TokenManager::download_token_cleared(size_t download_size, td::uint32 prior
free_tokens_--; free_tokens_--;
} }
} else if (!it->first.priority && free_tokens_) { } else if (!it->first.priority && free_tokens_) {
it->second.promise.set_value(gen_token(download_size, priority)); it->second.promise.set_value(gen_token(size, priority));
auto it2 = it++; auto it2 = it++;
pending_.erase(it2); pending_.erase(it2);
free_tokens_--; free_tokens_--;
@ -69,7 +69,7 @@ void TokenManager::download_token_cleared(size_t download_size, td::uint32 prior
void TokenManager::alarm() { void TokenManager::alarm() {
for (auto it = pending_.begin(); it != pending_.end();) { for (auto it = pending_.begin(); it != pending_.end();) {
if (it->second.timeout.is_in_past()) { if (it->second.timeout.is_in_past()) {
it->second.promise.set_error(td::Status::Error(ErrorCode::timeout, "timeout in wait download token")); it->second.promise.set_error(td::Status::Error(ErrorCode::timeout, "timeout in wait token"));
it = pending_.erase(it); it = pending_.erase(it);
} else { } else {
it++; it++;
@ -77,23 +77,23 @@ void TokenManager::alarm() {
} }
} }
std::unique_ptr<DownloadToken> TokenManager::gen_token(size_t download_size, td::uint32 priority) { std::unique_ptr<ActionToken> TokenManager::gen_token(size_t size, td::uint32 priority) {
class Token : public DownloadToken { class TokenImpl : public ActionToken {
public: public:
Token(size_t download_size, td::uint32 priority, td::actor::ActorId<TokenManager> manager) TokenImpl(size_t size, td::uint32 priority, td::actor::ActorId<TokenManager> manager)
: download_size_(download_size), priority_(priority), manager_(manager) { : size_(size), priority_(priority), manager_(manager) {
} }
~Token() override { ~TokenImpl() override {
td::actor::send_closure(manager_, &TokenManager::download_token_cleared, download_size_, priority_); td::actor::send_closure(manager_, &TokenManager::token_cleared, size_, priority_);
} }
private: private:
size_t download_size_; size_t size_;
td::uint32 priority_; td::uint32 priority_;
td::actor::ActorId<TokenManager> manager_; td::actor::ActorId<TokenManager> manager_;
}; };
return std::make_unique<Token>(download_size, priority, actor_id(this)); return std::make_unique<TokenImpl>(size, priority, actor_id(this));
} }
} // namespace validator } // namespace validator

View file

@ -31,16 +31,19 @@ class TokenManager : public td::actor::Actor {
public: public:
TokenManager() { TokenManager() {
} }
explicit TokenManager(td::uint32 max_tokens)
: free_tokens_(max_tokens), free_priority_tokens_(max_tokens), max_priority_tokens_(max_tokens) {
}
void alarm() override; void alarm() override;
void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout, void get_token(size_t size, td::uint32 priority, td::Timestamp timeout,
td::Promise<std::unique_ptr<DownloadToken>> promise); td::Promise<std::unique_ptr<ActionToken>> promise);
void download_token_cleared(size_t download_size, td::uint32 priority); void token_cleared(size_t size, td::uint32 priority);
private: private:
std::unique_ptr<DownloadToken> gen_token(size_t download_size, td::uint32 priority); std::unique_ptr<ActionToken> gen_token(size_t size, td::uint32 priority);
struct PendingPromiseKey { struct PendingPromiseKey {
size_t download_size; size_t size;
td::uint32 priority; td::uint32 priority;
td::uint64 seqno; td::uint64 seqno;
@ -50,7 +53,7 @@ class TokenManager : public td::actor::Actor {
}; };
struct PendingPromise { struct PendingPromise {
td::Timestamp timeout; td::Timestamp timeout;
td::Promise<std::unique_ptr<DownloadToken>> promise; td::Promise<std::unique_ptr<ActionToken>> promise;
}; };
td::uint64 seqno_ = 0; td::uint64 seqno_ = 0;
std::map<PendingPromiseKey, PendingPromise> pending_; std::map<PendingPromiseKey, PendingPromise> pending_;

View file

@ -35,15 +35,16 @@
#include "interfaces/proof.h" #include "interfaces/proof.h"
#include "interfaces/shard.h" #include "interfaces/shard.h"
#include "catchain/catchain-types.h" #include "catchain/catchain-types.h"
#include "interfaces/out-msg-queue-proof.h"
#include "interfaces/external-message.h" #include "interfaces/external-message.h"
namespace ton { namespace ton {
namespace validator { namespace validator {
class DownloadToken { class ActionToken {
public: public:
virtual ~DownloadToken() = default; virtual ~ActionToken() = default;
}; };
struct PerfTimerStats { struct PerfTimerStats {
@ -186,6 +187,9 @@ class ValidatorManagerInterface : public td::actor::Actor {
td::Promise<std::vector<BlockIdExt>> promise) = 0; td::Promise<std::vector<BlockIdExt>> promise) = 0;
virtual void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir, virtual void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) = 0; td::Timestamp timeout, td::Promise<std::string> promise) = 0;
virtual void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) = 0;
virtual void new_key_block(BlockHandle handle) = 0; virtual void new_key_block(BlockHandle handle) = 0;
virtual void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) = 0; virtual void send_validator_telemetry(PublicKeyHash key, tl_object_ptr<ton_api::validator_telemetry> telemetry) = 0;
@ -248,7 +252,7 @@ class ValidatorManagerInterface : public td::actor::Actor {
virtual void add_ext_server_port(td::uint16 port) = 0; virtual void add_ext_server_port(td::uint16 port) = 0;
virtual void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout, virtual void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout,
td::Promise<std::unique_ptr<DownloadToken>> promise) = 0; td::Promise<std::unique_ptr<ActionToken>> promise) = 0;
virtual void get_block_data_from_db(ConstBlockHandle handle, td::Promise<td::Ref<BlockData>> promise) = 0; virtual void get_block_data_from_db(ConstBlockHandle handle, td::Promise<td::Ref<BlockData>> promise) = 0;
virtual void get_block_data_from_db_short(BlockIdExt block_id, td::Promise<td::Ref<BlockData>> promise) = 0; virtual void get_block_data_from_db_short(BlockIdExt block_id, td::Promise<td::Ref<BlockData>> promise) = 0;