From 59927ba534094b5fa0d8abdbbeadc3e04cd71b9c Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Thu, 1 Feb 2024 20:20:45 +0300 Subject: [PATCH 01/10] Improve block broadcasts processing; add special overlay for blocks for validators (#885) * Improve block broadcast processing * ValidatorManagerImpl::written_handle * Retry sending broadcasts in ValidatorGroup * Fix setting channel_ready in AdnlPeerPair * Add special overlay for validators for block broadcasting (#842) * Private overlay for broadcasting blocks --------- Co-authored-by: SpyCheese (cherry picked from commit a52045bd91004bccabf10fd159371ffbbf60ffad) --------- Co-authored-by: SpyCheese --- adnl/adnl-peer-table.hpp | 4 + adnl/adnl-peer.cpp | 4 +- adnl/adnl.h | 2 + tl/generate/scheme/ton_api.tl | 2 + tl/generate/scheme/ton_api.tlo | Bin 86292 -> 86524 bytes ton/ton-types.h | 8 + validator/CMakeLists.txt | 4 +- validator/downloaders/wait-block-state.cpp | 22 ++- validator/downloaders/wait-block-state.hpp | 20 ++- validator/full-node-private-overlay.cpp | 175 +++++++++++++++++++++ validator/full-node-private-overlay.hpp | 86 ++++++++++ validator/full-node.cpp | 51 ++++++ validator/full-node.hpp | 7 + validator/manager.cpp | 17 +- validator/validator-group.cpp | 36 +++-- validator/validator-group.hpp | 2 +- 16 files changed, 410 insertions(+), 30 deletions(-) create mode 100644 validator/full-node-private-overlay.cpp create mode 100644 validator/full-node-private-overlay.hpp diff --git a/adnl/adnl-peer-table.hpp b/adnl/adnl-peer-table.hpp index 2a27a802..1c30b84c 100644 --- a/adnl/adnl-peer-table.hpp +++ b/adnl/adnl-peer-table.hpp @@ -77,6 +77,10 @@ class AdnlPeerTableImpl : public AdnlPeerTable { td::actor::ActorId channel) override; void unregister_channel(AdnlChannelIdShort id) override; + void check_id_exists(AdnlNodeIdShort id, td::Promise promise) override { + promise.set_value(local_ids_.count(id)); + } + void write_new_addr_list_to_db(AdnlNodeIdShort local_id, AdnlNodeIdShort peer_id, AdnlDbItem node, td::Promise promise) override; void get_addr_list_from_db(AdnlNodeIdShort local_id, AdnlNodeIdShort peer_id, diff --git a/adnl/adnl-peer.cpp b/adnl/adnl-peer.cpp index 44979885..3e21a7f5 100644 --- a/adnl/adnl-peer.cpp +++ b/adnl/adnl-peer.cpp @@ -212,7 +212,9 @@ void AdnlPeerPairImpl::receive_packet_from_channel(AdnlChannelIdShort id, AdnlPa VLOG(ADNL_NOTICE) << this << ": dropping IN message: outdated channel id" << id; return; } - channel_ready_ = true; + if (channel_inited_) { + channel_ready_ = true; + } receive_packet_checked(std::move(packet)); } diff --git a/adnl/adnl.h b/adnl/adnl.h index b7dad216..a1c39d5e 100644 --- a/adnl/adnl.h +++ b/adnl/adnl.h @@ -97,6 +97,8 @@ class Adnl : public AdnlSenderInterface { virtual void add_id_ex(AdnlNodeIdFull id, AdnlAddressList addr_list, td::uint8 cat, td::uint32 mode) = 0; virtual void del_id(AdnlNodeIdShort id, td::Promise promise) = 0; + virtual void check_id_exists(AdnlNodeIdShort id, td::Promise promise) = 0; + // subscribe to (some) messages(+queries) to this local id virtual void subscribe(AdnlNodeIdShort dst, std::string prefix, std::unique_ptr callback) = 0; virtual void unsubscribe(AdnlNodeIdShort dst, std::string prefix) = 0; diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index 21e8318f..0f94e7f1 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -394,6 +394,8 @@ tonNode.newShardBlockBroadcast block:tonNode.newShardBlock = tonNode.Broadcast; tonNode.shardPublicOverlayId workchain:int shard:long zero_state_file_hash:int256 = tonNode.ShardPublicOverlayId; +tonNode.privateBlockOverlayId zero_state_file_hash:int256 nodes:(vector int256) = tonNode.PrivateBlockOverlayId; + tonNode.keyBlocks blocks:(vector tonNode.blockIdExt) incomplete:Bool error:Bool = tonNode.KeyBlocks; ton.blockId root_cell_hash:int256 file_hash:int256 = ton.BlockId; diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo index 9b74f1c3bbef41b10dba639833177fe7f6b6df74..e71fb0f726456b06fbbbad3dc63d41f59a226c3d 100644 GIT binary patch delta 139 zcmbQTi1p86R^CUm^{p77;QvNmXI-VF8()^mmgML8<)@_T1r%kLC6=T*<>V)4`mrcA!aEVVg6*M)_#cJoxj?R#}mv=m^|!oUDhbMu&x57Xq<55|)(MA@)_L^eCT Hzmf<5bmlp1 delta 41 scmeyfn03k`R^CUm^{p77;NM1GXWh*`x-Kk?HJdLQZr{7v new_signatures; + for (const BlockSignature& s : signatures) { + new_signatures.emplace_back(s.node, s.signature.clone()); + } + return {block_id, std::move(new_signatures), catchain_seqno, validator_set_hash, data.clone(), proof.clone()}; + } }; struct Ed25519_PrivateKey { diff --git a/validator/CMakeLists.txt b/validator/CMakeLists.txt index 4ecc865c..8de60081 100644 --- a/validator/CMakeLists.txt +++ b/validator/CMakeLists.txt @@ -143,7 +143,9 @@ set(FULL_NODE_SOURCE full-node-master.h full-node-master.hpp full-node-master.cpp - + full-node-private-overlay.hpp + full-node-private-overlay.cpp + net/download-block.hpp net/download-block.cpp net/download-block-new.hpp diff --git a/validator/downloaders/wait-block-state.cpp b/validator/downloaders/wait-block-state.cpp index 56137fc3..42f5c791 100644 --- a/validator/downloaders/wait-block-state.cpp +++ b/validator/downloaders/wait-block-state.cpp @@ -109,12 +109,14 @@ void WaitBlockState::start() { } else if (!handle_->inited_prev() || (!handle_->inited_proof() && !handle_->inited_proof_link())) { auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle = handle_](td::Result R) { if (R.is_error()) { - delay_action([SelfId]() { td::actor::send_closure(SelfId, &WaitBlockState::start); }, td::Timestamp::in(0.1)); + delay_action([SelfId]() { td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link); }, + td::Timestamp::in(0.1)); } else { td::actor::send_closure(SelfId, &WaitBlockState::got_proof_link, R.move_as_ok()); } }); + waiting_proof_link_ = true; td::actor::send_closure(manager_, &ValidatorManager::send_get_block_proof_link_request, handle_->id(), priority_, std::move(P)); } else if (prev_state_.is_null()) { @@ -133,12 +135,14 @@ void WaitBlockState::start() { } else if (handle_->id().is_masterchain() && !handle_->inited_proof()) { auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle = handle_](td::Result R) { if (R.is_error()) { - delay_action([SelfId]() { td::actor::send_closure(SelfId, &WaitBlockState::start); }, td::Timestamp::in(0.1)); + delay_action([SelfId]() { td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof); }, + td::Timestamp::in(0.1)); } else { td::actor::send_closure(SelfId, &WaitBlockState::got_proof, R.move_as_ok()); } }); + waiting_proof_ = true; td::actor::send_closure(manager_, &ValidatorManager::send_get_block_proof_request, handle_->id(), priority_, std::move(P)); } else if (block_.is_null()) { @@ -172,6 +176,9 @@ void WaitBlockState::got_prev_state(td::Ref state) { } void WaitBlockState::got_proof_link(td::BufferSlice data) { + if (!waiting_proof_link_) { + return; + } auto R = create_proof_link(handle_->id(), std::move(data)); if (R.is_error()) { LOG(INFO) << "received bad proof link: " << R.move_as_error(); @@ -182,22 +189,25 @@ void WaitBlockState::got_proof_link(td::BufferSlice data) { if (R.is_ok()) { auto h = R.move_as_ok(); CHECK(h->inited_prev()); - td::actor::send_closure(SelfId, &WaitBlockState::start); + td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link); } else { LOG(INFO) << "received bad proof link: " << R.move_as_error(); - td::actor::send_closure(SelfId, &WaitBlockState::start); + td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link); } }); run_check_proof_link_query(handle_->id(), R.move_as_ok(), manager_, timeout_, std::move(P)); } void WaitBlockState::got_proof(td::BufferSlice data) { + if (!waiting_proof_) { + return; + } auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result R) { if (R.is_ok()) { - td::actor::send_closure(SelfId, &WaitBlockState::start); + td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof); } else { LOG(INFO) << "received bad proof link: " << R.move_as_error(); - td::actor::send_closure(SelfId, &WaitBlockState::start); + td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof); } }); td::actor::send_closure(manager_, &ValidatorManager::validate_block_proof, handle_->id(), std::move(data), diff --git a/validator/downloaders/wait-block-state.hpp b/validator/downloaders/wait-block-state.hpp index 7cdc0699..4b484ca8 100644 --- a/validator/downloaders/wait-block-state.hpp +++ b/validator/downloaders/wait-block-state.hpp @@ -45,11 +45,9 @@ class WaitBlockState : public td::actor::Actor { void force_read_from_db(); void start_up() override; - void got_block_handle(BlockHandle handle); void start(); void got_state_from_db(td::Ref data); void got_state_from_static_file(td::Ref state, td::BufferSlice data); - void failed_to_get_state_from_db(td::Status reason); void got_prev_state(td::Ref state); void failed_to_get_prev_state(td::Status reason); void got_block_data(td::Ref data); @@ -68,6 +66,22 @@ class WaitBlockState : public td::actor::Actor { priority_ = priority; } + // These two methods can be called from ValidatorManagerImpl::written_handle + void after_get_proof_link() { + if (!waiting_proof_link_) { + return; + } + waiting_proof_link_ = false; + start(); + } + void after_get_proof() { + if (!waiting_proof_) { + return; + } + waiting_proof_ = false; + start(); + } + private: BlockHandle handle_; @@ -81,6 +95,8 @@ class WaitBlockState : public td::actor::Actor { td::Ref block_; bool reading_from_db_ = false; + bool waiting_proof_link_ = false; + bool waiting_proof_ = false; td::Timestamp next_static_file_attempt_; td::PerfWarningTimer perf_timer_; diff --git a/validator/full-node-private-overlay.cpp b/validator/full-node-private-overlay.cpp new file mode 100644 index 00000000..ea72230b --- /dev/null +++ b/validator/full-node-private-overlay.cpp @@ -0,0 +1,175 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ +#pragma once + +#include "full-node-private-overlay.hpp" +#include "ton/ton-tl.hpp" +#include "common/delay.h" + +namespace ton { + +namespace validator { + +namespace fullnode { + +void FullNodePrivateOverlay::process_broadcast(PublicKeyHash, ton_api::tonNode_blockBroadcast &query) { + std::vector signatures; + for (auto &sig : query.signatures_) { + signatures.emplace_back(BlockSignature{sig->who_, std::move(sig->signature_)}); + } + + BlockIdExt block_id = create_block_id(query.id_); + BlockBroadcast B{block_id, + std::move(signatures), + static_cast(query.catchain_seqno_), + static_cast(query.validator_set_hash_), + std::move(query.data_), + std::move(query.proof_)}; + + auto P = td::PromiseCreator::lambda([](td::Result R) { + if (R.is_error()) { + if (R.error().code() == ErrorCode::notready) { + LOG(DEBUG) << "dropped broadcast: " << R.move_as_error(); + } else { + LOG(INFO) << "dropped broadcast: " << R.move_as_error(); + } + } + }); + td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, std::move(B), + std::move(P)); +} + +void FullNodePrivateOverlay::process_broadcast(PublicKeyHash, ton_api::tonNode_newShardBlockBroadcast &query) { + td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, + create_block_id(query.block_->block_), query.block_->cc_seqno_, + std::move(query.block_->data_)); +} + +void FullNodePrivateOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) { + auto B = fetch_tl_object(std::move(broadcast), true); + if (B.is_error()) { + return; + } + + ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); }); +} + +void FullNodePrivateOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) { + if (!inited_) { + return; + } + auto B = create_serialize_tl_object( + create_tl_object(create_tl_block_id(block_id), cc_seqno, std::move(data))); + if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) { + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_, + local_id_.pubkey_hash(), 0, std::move(B)); + } else { + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_, + local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B)); + } +} + +void FullNodePrivateOverlay::send_broadcast(BlockBroadcast broadcast) { + if (!inited_) { + return; + } + std::vector> sigs; + for (auto &sig : broadcast.signatures) { + sigs.emplace_back(create_tl_object(sig.node, sig.signature.clone())); + } + auto B = create_serialize_tl_object( + create_tl_block_id(broadcast.block_id), broadcast.catchain_seqno, broadcast.validator_set_hash, std::move(sigs), + broadcast.proof.clone(), broadcast.data.clone()); + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_, + local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B)); +} + +void FullNodePrivateOverlay::start_up() { + std::sort(nodes_.begin(), nodes_.end()); + nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end()); + + std::vector nodes; + for (const adnl::AdnlNodeIdShort &id : nodes_) { + nodes.push_back(id.bits256_value()); + } + auto X = create_hash_tl_object(zero_state_file_hash_, std::move(nodes)); + td::BufferSlice b{32}; + b.as_slice().copy_from(as_slice(X)); + overlay_id_full_ = overlay::OverlayIdFull{std::move(b)}; + overlay_id_ = overlay_id_full_.compute_short_id(); + + try_init(); +} + +void FullNodePrivateOverlay::try_init() { + // Sometimes adnl id is added to validator engine later (or not at all) + td::actor::send_closure( + adnl_, &adnl::Adnl::check_id_exists, local_id_, [SelfId = actor_id(this)](td::Result R) { + if (R.is_ok() && R.ok()) { + td::actor::send_closure(SelfId, &FullNodePrivateOverlay::init); + } else { + delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateOverlay::try_init); }, + td::Timestamp::in(30.0)); + } + }); +} + +void FullNodePrivateOverlay::init() { + LOG(FULL_NODE_INFO) << "Creating private block overlay for adnl id " << local_id_ << " : " << nodes_.size() + << " nodes"; + class Callback : public overlay::Overlays::Callback { + public: + void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override { + } + void receive_query(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data, + td::Promise promise) override { + } + void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override { + td::actor::send_closure(node_, &FullNodePrivateOverlay::receive_broadcast, src, std::move(data)); + } + void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data, + td::Promise promise) override { + } + Callback(td::actor::ActorId node) : node_(node) { + } + + private: + td::actor::ActorId node_; + }; + + overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(), + overlay::CertificateFlags::AllowFec | overlay::CertificateFlags::Trusted, + {}}; + td::actor::send_closure(overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(), + nodes_, std::make_unique(actor_id(this)), rules); + + td::actor::send_closure(rldp_, &rldp::Rldp::add_id, local_id_); + td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, local_id_); + inited_ = true; +} + +void FullNodePrivateOverlay::tear_down() { + if (inited_) { + td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, local_id_, overlay_id_); + } +} + +} // namespace fullnode + +} // namespace validator + +} // namespace ton diff --git a/validator/full-node-private-overlay.hpp b/validator/full-node-private-overlay.hpp new file mode 100644 index 00000000..6463fda2 --- /dev/null +++ b/validator/full-node-private-overlay.hpp @@ -0,0 +1,86 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ +#pragma once + +#include "full-node.h" + +namespace ton { + +namespace validator { + +namespace fullnode { + +class FullNodePrivateOverlay : public td::actor::Actor { + public: + void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query); + void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query); + template + void process_broadcast(PublicKeyHash, T &) { + VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast"; + } + void receive_broadcast(PublicKeyHash src, td::BufferSlice query); + + void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data); + void send_broadcast(BlockBroadcast broadcast); + + void start_up() override; + void tear_down() override; + + FullNodePrivateOverlay(adnl::AdnlNodeIdShort local_id, std::vector nodes, + FileHash zero_state_file_hash, FullNodeConfig config, + td::actor::ActorId keyring, td::actor::ActorId adnl, + td::actor::ActorId rldp, td::actor::ActorId rldp2, + td::actor::ActorId overlays, + td::actor::ActorId validator_manager) + : local_id_(local_id) + , nodes_(std::move(nodes)) + , zero_state_file_hash_(zero_state_file_hash) + , config_(config) + , keyring_(keyring) + , adnl_(adnl) + , rldp_(rldp) + , rldp2_(rldp2) + , overlays_(overlays) + , validator_manager_(validator_manager) { + } + + private: + adnl::AdnlNodeIdShort local_id_; + std::vector nodes_; + FileHash zero_state_file_hash_; + FullNodeConfig config_; + + td::actor::ActorId keyring_; + td::actor::ActorId adnl_; + td::actor::ActorId rldp_; + td::actor::ActorId rldp2_; + td::actor::ActorId overlays_; + td::actor::ActorId validator_manager_; + + bool inited_ = false; + overlay::OverlayIdFull overlay_id_full_; + overlay::OverlayIdShort overlay_id_; + + void try_init(); + void init(); +}; + +} // namespace fullnode + +} // namespace validator + +} // namespace ton diff --git a/validator/full-node.cpp b/validator/full-node.cpp index ebba50a0..5a822b26 100644 --- a/validator/full-node.cpp +++ b/validator/full-node.cpp @@ -50,6 +50,7 @@ void FullNodeImpl::add_permanent_key(PublicKeyHash key, td::Promise pr for (auto &shard : shards_) { td::actor::send_closure(shard.second, &FullNodeShard::update_validators, all_validators_, sign_cert_by_); } + create_private_block_overlay(key); promise.set_value(td::Unit()); } @@ -74,6 +75,7 @@ void FullNodeImpl::del_permanent_key(PublicKeyHash key, td::Promise pr for (auto &shard : shards_) { td::actor::send_closure(shard.second, &FullNodeShard::update_validators, all_validators_, sign_cert_by_); } + private_block_overlays_.erase(key); promise.set_value(td::Unit()); } @@ -179,6 +181,10 @@ void FullNodeImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_s VLOG(FULL_NODE_WARNING) << "dropping OUT shard block info message to unknown shard"; return; } + if (!private_block_overlays_.empty()) { + td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateOverlay::send_shard_block_info, + block_id, cc_seqno, data.clone()); + } td::actor::send_closure(shard, &FullNodeShard::send_shard_block_info, block_id, cc_seqno, std::move(data)); } @@ -188,6 +194,10 @@ void FullNodeImpl::send_broadcast(BlockBroadcast broadcast) { VLOG(FULL_NODE_WARNING) << "dropping OUT broadcast to unknown shard"; return; } + if (!private_block_overlays_.empty()) { + td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateOverlay::send_broadcast, + broadcast.clone()); + } td::actor::send_closure(shard, &FullNodeShard::send_broadcast, std::move(broadcast)); } @@ -289,6 +299,7 @@ void FullNodeImpl::got_key_block_proof(td::Ref proof) { PublicKeyHash l = PublicKeyHash::zero(); std::vector keys; + std::map current_validators; for (td::int32 i = -1; i <= 1; i++) { auto r = config->get_total_validator_set(i < 0 ? i : 1 - i); if (r.not_null()) { @@ -299,10 +310,18 @@ void FullNodeImpl::got_key_block_proof(td::Ref proof) { if (local_keys_.count(key)) { l = key; } + if (i == 1) { + current_validators[key] = adnl::AdnlNodeIdShort{el.addr.is_zero() ? key.bits256_value() : el.addr}; + } } } } + if (current_validators != current_validators_) { + current_validators_ = std::move(current_validators); + update_private_block_overlays(); + } + if (keys == all_validators_) { return; } @@ -321,6 +340,7 @@ void FullNodeImpl::got_zero_block_state(td::Ref state) { PublicKeyHash l = PublicKeyHash::zero(); std::vector keys; + std::map current_validators; for (td::int32 i = -1; i <= 1; i++) { auto r = m->get_total_validator_set(i < 0 ? i : 1 - i); if (r.not_null()) { @@ -331,10 +351,18 @@ void FullNodeImpl::got_zero_block_state(td::Ref state) { if (local_keys_.count(key)) { l = key; } + if (i == 1) { + current_validators[key] = adnl::AdnlNodeIdShort{el.addr.is_zero() ? key.bits256_value() : el.addr}; + } } } } + if (current_validators != current_validators_) { + current_validators_ = std::move(current_validators); + update_private_block_overlays(); + } + if (keys == all_validators_) { return; } @@ -456,6 +484,29 @@ void FullNodeImpl::start_up() { std::make_unique(actor_id(this)), std::move(P)); } +void FullNodeImpl::update_private_block_overlays() { + private_block_overlays_.clear(); + if (local_keys_.empty()) { + return; + } + for (const auto &key : local_keys_) { + create_private_block_overlay(key); + } +} + +void FullNodeImpl::create_private_block_overlay(PublicKeyHash key) { + CHECK(local_keys_.count(key)); + if (current_validators_.count(key)) { + std::vector nodes; + for (const auto &p : current_validators_) { + nodes.push_back(p.second); + } + private_block_overlays_[key] = td::actor::create_actor( + "BlocksPrivateOverlay", current_validators_[key], std::move(nodes), zero_state_file_hash_, config_, keyring_, + adnl_, rldp_, rldp2_, overlays_, validator_manager_); + } +} + FullNodeImpl::FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash, FullNodeConfig config, td::actor::ActorId keyring, td::actor::ActorId adnl, td::actor::ActorId rldp, diff --git a/validator/full-node.hpp b/validator/full-node.hpp index fc2dd75c..838700b5 100644 --- a/validator/full-node.hpp +++ b/validator/full-node.hpp @@ -23,6 +23,7 @@ //#include "ton-node-slave.h" #include "interfaces/proof.h" #include "interfaces/shard.h" +#include "full-node-private-overlay.hpp" #include #include @@ -111,9 +112,15 @@ class FullNodeImpl : public FullNode { PublicKeyHash sign_cert_by_; std::vector all_validators_; + std::map current_validators_; std::set local_keys_; FullNodeConfig config_; + + std::map> private_block_overlays_; + + void update_private_block_overlays(); + void create_private_block_overlay(PublicKeyHash key); }; } // namespace fullnode diff --git a/validator/manager.cpp b/validator/manager.cpp index af01ff8a..97c5192f 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -1185,7 +1185,7 @@ void ValidatorManagerImpl::write_handle(BlockHandle handle, td::Promise promise) { bool received = handle->received(); bool inited_state = handle->received_state(); - bool inited_proof = handle->id().is_masterchain() ? handle->inited_proof() : handle->inited_proof(); + bool inited_proof = handle->id().is_masterchain() ? handle->inited_proof() : handle->inited_proof_link(); if (handle->need_flush()) { handle->flush(actor_id(this), handle, std::move(promise)); @@ -1198,11 +1198,24 @@ void ValidatorManagerImpl::written_handle(BlockHandle handle, td::Promisesecond.actor_, &WaitBlockData::force_read_from_db); } } - if (inited_state && inited_proof) { + if (inited_state) { auto it = wait_state_.find(handle->id()); if (it != wait_state_.end()) { td::actor::send_closure(it->second.actor_, &WaitBlockState::force_read_from_db); } + } else { + if (handle->inited_proof_link()) { + auto it = wait_state_.find(handle->id()); + if (it != wait_state_.end()) { + td::actor::send_closure(it->second.actor_, &WaitBlockState::after_get_proof_link); + } + } + if (handle->id().is_masterchain() && handle->inited_proof()) { + auto it = wait_state_.find(handle->id()); + if (it != wait_state_.end()) { + td::actor::send_closure(it->second.actor_, &WaitBlockState::after_get_proof); + } + } } promise.set_value(td::Unit()); diff --git a/validator/validator-group.cpp b/validator/validator-group.cpp index 51217bf9..73ca22a6 100644 --- a/validator/validator-group.cpp +++ b/validator/validator-group.cpp @@ -155,9 +155,9 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s td::actor::send_closure(manager_, &ValidatorManager::log_validator_session_stats, next_block_id, std::move(stats)); auto block = block_data.size() > 0 ? create_block(next_block_id, std::move(block_data)).move_as_ok() : td::Ref{}; + bool send_broadcast = src == local_id_; - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), block_id = next_block_id, block, prev = prev_block_ids_, - sig_set, approve_sig_set, + auto P = td::PromiseCreator::lambda([=, SelfId = actor_id(this), block_id = next_block_id, prev = prev_block_ids_, promise = std::move(promise)](td::Result R) mutable { if (R.is_error()) { if (R.error().code() == ErrorCode::cancelled) { @@ -166,14 +166,15 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s } LOG_CHECK(R.error().code() == ErrorCode::timeout || R.error().code() == ErrorCode::notready) << R.move_as_error(); td::actor::send_closure(SelfId, &ValidatorGroup::retry_accept_block_query, block_id, std::move(block), - std::move(prev), std::move(sig_set), std::move(approve_sig_set), std::move(promise)); + std::move(prev), std::move(sig_set), std::move(approve_sig_set), send_broadcast, + std::move(promise)); } else { promise.set_value(R.move_as_ok()); } }); run_accept_block_query(next_block_id, std::move(block), prev_block_ids_, validator_set_, std::move(sig_set), - std::move(approve_sig_set), src == local_id_, manager_, std::move(P)); + std::move(approve_sig_set), send_broadcast, manager_, std::move(P)); prev_block_ids_ = std::vector{next_block_id}; cached_collated_block_ = nullptr; approved_candidates_cache_.clear(); @@ -181,21 +182,22 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s void ValidatorGroup::retry_accept_block_query(BlockIdExt block_id, td::Ref block, std::vector prev, td::Ref sig_set, - td::Ref approve_sig_set, + td::Ref approve_sig_set, bool send_broadcast, td::Promise promise) { - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), block_id, block, prev, sig_set, approve_sig_set, - promise = std::move(promise)](td::Result R) mutable { - if (R.is_error()) { - LOG_CHECK(R.error().code() == ErrorCode::timeout) << R.move_as_error(); - td::actor::send_closure(SelfId, &ValidatorGroup::retry_accept_block_query, block_id, std::move(block), - std::move(prev), std::move(sig_set), std::move(approve_sig_set), std::move(promise)); - } else { - promise.set_value(R.move_as_ok()); - } - }); + auto P = td::PromiseCreator::lambda( + [=, SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable { + if (R.is_error()) { + LOG_CHECK(R.error().code() == ErrorCode::timeout) << R.move_as_error(); + td::actor::send_closure(SelfId, &ValidatorGroup::retry_accept_block_query, block_id, std::move(block), + std::move(prev), std::move(sig_set), std::move(approve_sig_set), send_broadcast, + std::move(promise)); + } else { + promise.set_value(R.move_as_ok()); + } + }); run_accept_block_query(block_id, std::move(block), prev, validator_set_, std::move(sig_set), - std::move(approve_sig_set), false, manager_, std::move(P)); + std::move(approve_sig_set), send_broadcast, manager_, std::move(P)); } void ValidatorGroup::skip_round(td::uint32 round_id) { @@ -347,7 +349,7 @@ void ValidatorGroup::start(std::vector prev, BlockIdExt min_masterch auto block = p.block.size() > 0 ? create_block(next_block_id, std::move(p.block)).move_as_ok() : td::Ref{}; retry_accept_block_query(next_block_id, std::move(block), prev_block_ids_, std::move(p.sigs), - std::move(p.approve_sigs), std::move(p.promise)); + std::move(p.approve_sigs), false, std::move(p.promise)); prev_block_ids_ = std::vector{next_block_id}; } postponed_accept_.clear(); diff --git a/validator/validator-group.hpp b/validator/validator-group.hpp index a158bc43..26818d43 100644 --- a/validator/validator-group.hpp +++ b/validator/validator-group.hpp @@ -43,7 +43,7 @@ class ValidatorGroup : public td::actor::Actor { void skip_round(td::uint32 round); void retry_accept_block_query(BlockIdExt block_id, td::Ref block, std::vector prev, td::Ref sigs, td::Ref approve_sigs, - td::Promise promise); + bool send_broadcast, td::Promise promise); void get_approved_candidate(PublicKey source, RootHash root_hash, FileHash file_hash, FileHash collated_data_file_hash, td::Promise promise); BlockIdExt create_next_block_id(RootHash root_hash, FileHash file_hash) const; From 957ecd723157c1ac806a6c5eb930c5857a6cfc6e Mon Sep 17 00:00:00 2001 From: Maksim Kurbatov <94808996+yungwine@users.noreply.github.com> Date: Fri, 2 Feb 2024 20:59:07 +0700 Subject: [PATCH 02/10] fix lite-client logging (#883) * fix lite-client logging * fix logging in continue_check_validator_load2 --- lite-client/lite-client.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lite-client/lite-client.cpp b/lite-client/lite-client.cpp index dd6df40f..c9be170c 100644 --- a/lite-client/lite-client.cpp +++ b/lite-client/lite-client.cpp @@ -3529,7 +3529,7 @@ void TestNode::continue_check_validator_load2(std::unique_ptr info2, int mode, std::string file_pfx) { LOG(INFO) << "continue_check_validator_load2 for blocks " << info1->blk_id.to_str() << " and " - << info1->blk_id.to_str() << " : requesting block creators data"; + << info2->blk_id.to_str() << " : requesting block creators data"; td::Status st = info1->unpack_vset(); if (st.is_error()) { LOG(ERROR) << "cannot unpack validator set from block " << info1->blk_id.to_str() << " :" << st.move_as_error(); @@ -3627,7 +3627,7 @@ void TestNode::continue_check_validator_load3(std::unique_ptrcreated[i].first - info1->created[i].first; auto y1 = info2->created[i].second - info1->created[i].second; if (x1 < 0 || y1 < 0 || (x1 | y1) >= (1u << 31)) { - LOG(ERROR) << "impossible situation: validator #i created a negative amount of blocks: " << x1 + LOG(ERROR) << "impossible situation: validator #" << i << " created a negative amount of blocks: " << x1 << " masterchain blocks, " << y1 << " shardchain blocks"; return; } From c38b2928ec0aad978fb46f39c0a450aca184f2f6 Mon Sep 17 00:00:00 2001 From: SpyCheese Date: Fri, 2 Feb 2024 17:02:38 +0300 Subject: [PATCH 03/10] Fix typo in archive-slice.cpp (#850) (#879) --- validator/db/archive-slice.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/db/archive-slice.cpp b/validator/db/archive-slice.cpp index 5f1bc708..d3d63360 100644 --- a/validator/db/archive-slice.cpp +++ b/validator/db/archive-slice.cpp @@ -532,7 +532,7 @@ void ArchiveSlice::set_async_mode(bool mode, td::Promise promise) { ig.add_promise(std::move(promise)); for (auto &p : packages_) { - td::actor::send_closure(p.writer, &PackageWriter::set_async_mode, mode, std::move(promise)); + td::actor::send_closure(p.writer, &PackageWriter::set_async_mode, mode, ig.get_promise()); } } From e723213d5c6dc0fd6cd5fb5ced0f086d79f2a1c5 Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Tue, 6 Feb 2024 16:52:12 +0300 Subject: [PATCH 04/10] Log number of LS queries by type (#891) Co-authored-by: SpyCheese --- tl-utils/lite-utils.cpp | 34 ++++++++++++++++++++++++ tl-utils/lite-utils.hpp | 2 ++ validator/impl/liteserver.cpp | 2 ++ validator/interfaces/validator-manager.h | 3 +++ validator/manager.cpp | 24 +++++++++++++++++ validator/manager.hpp | 8 ++++++ 6 files changed, 73 insertions(+) diff --git a/tl-utils/lite-utils.cpp b/tl-utils/lite-utils.cpp index cd2ace0a..427852cb 100644 --- a/tl-utils/lite-utils.cpp +++ b/tl-utils/lite-utils.cpp @@ -22,6 +22,7 @@ #include "td/utils/tl_storers.h" #include "td/utils/crypto.h" #include "crypto/common/bitstring.h" +#include namespace ton { @@ -129,4 +130,37 @@ td::Bits256 get_tl_object_sha_bits256(const lite_api::Object *T) { return id256; } +std::string lite_query_name_by_id(int id) { + static std::map names = { + {lite_api::liteServer_getMasterchainInfo::ID, "getMasterchainInfo"}, + {lite_api::liteServer_getMasterchainInfoExt::ID, "getMasterchainInfoExt"}, + {lite_api::liteServer_getTime::ID, "getTime"}, + {lite_api::liteServer_getVersion::ID, "getVersion"}, + {lite_api::liteServer_getBlock::ID, "getBlock"}, + {lite_api::liteServer_getState::ID, "getState"}, + {lite_api::liteServer_getBlockHeader::ID, "getBlockHeader"}, + {lite_api::liteServer_sendMessage::ID, "sendMessage"}, + {lite_api::liteServer_getAccountState::ID, "getAccountState"}, + {lite_api::liteServer_getAccountStatePrunned::ID, "getAccountStatePrunned"}, + {lite_api::liteServer_runSmcMethod::ID, "runSmcMethod"}, + {lite_api::liteServer_getShardInfo::ID, "getShardInfo"}, + {lite_api::liteServer_getAllShardsInfo::ID, "getAllShardsInfo"}, + {lite_api::liteServer_getOneTransaction::ID, "getOneTransaction"}, + {lite_api::liteServer_getTransactions::ID, "getTransactions"}, + {lite_api::liteServer_lookupBlock::ID, "lookupBlock"}, + {lite_api::liteServer_listBlockTransactions::ID, "listBlockTransactions"}, + {lite_api::liteServer_listBlockTransactionsExt::ID, "listBlockTransactionsExt"}, + {lite_api::liteServer_getBlockProof::ID, "getBlockProof"}, + {lite_api::liteServer_getConfigAll::ID, "getConfigAll"}, + {lite_api::liteServer_getConfigParams::ID, "getConfigParams"}, + {lite_api::liteServer_getValidatorStats::ID, "getValidatorStats"}, + {lite_api::liteServer_getLibraries::ID, "getLibraries"}, + {lite_api::liteServer_getShardBlockProof::ID, "getShardBlockProof"}}; + auto it = names.find(id); + if (it == names.end()) { + return "unknown"; + } + return it->second; +} + } // namespace ton diff --git a/tl-utils/lite-utils.hpp b/tl-utils/lite-utils.hpp index 520ff710..fb3c1dc8 100644 --- a/tl-utils/lite-utils.hpp +++ b/tl-utils/lite-utils.hpp @@ -46,4 +46,6 @@ template ::valu td::Bits256 get_tl_object_sha_bits256(const Tp &T) { return get_tl_object_sha_bits256(static_cast(&T)); } + +std::string lite_query_name_by_id(int id); } // namespace ton diff --git a/validator/impl/liteserver.cpp b/validator/impl/liteserver.cpp index 34b9f1f1..f2be6ef9 100644 --- a/validator/impl/liteserver.cpp +++ b/validator/impl/liteserver.cpp @@ -131,9 +131,11 @@ void LiteQuery::start_up() { auto F = fetch_tl_object(std::move(query_), true); if (F.is_error()) { + td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, 0); // unknown abort_query(F.move_as_error()); return; } + td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, F.ok()->get_id()); lite_api::downcast_call( *F.move_as_ok().get(), diff --git a/validator/interfaces/validator-manager.h b/validator/interfaces/validator-manager.h index 6b375bac..f0e1bb18 100644 --- a/validator/interfaces/validator-manager.h +++ b/validator/interfaces/validator-manager.h @@ -178,6 +178,9 @@ class ValidatorManager : public ValidatorManagerInterface { virtual void get_block_by_seqno_from_db_for_litequery(AccountIdPrefixFull account, BlockSeqno seqno, td::Promise promise) = 0; + virtual void add_lite_query_stats(int lite_query_id) { + } + static bool is_persistent_state(UnixTime ts, UnixTime prev_ts) { return ts / (1 << 17) != prev_ts / (1 << 17); } diff --git a/validator/manager.cpp b/validator/manager.cpp index 97c5192f..baaa78eb 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -401,6 +401,7 @@ void ValidatorManagerImpl::add_external_message(td::Ref msg) { } } void ValidatorManagerImpl::check_external_message(td::BufferSlice data, td::Promise> promise) { + ++ls_stats_check_ext_messages_; auto state = do_get_last_liteserver_state(); if (state.is_null()) { promise.set_error(td::Status::Error(ErrorCode::notready, "not ready")); @@ -2466,6 +2467,29 @@ void ValidatorManagerImpl::alarm() { } } alarm_timestamp().relax(check_shard_clients_); + + if (log_ls_stats_at_.is_in_past()) { + if (!ls_stats_.empty() || ls_stats_check_ext_messages_ != 0) { + td::StringBuilder sb; + sb << "Liteserver stats (1 minute):"; + td::uint32 total = 0; + for (const auto &p : ls_stats_) { + sb << " " << lite_query_name_by_id(p.first) << ":" << p.second; + total += p.second; + } + if (total > 0) { + sb << " TOTAL:" << total; + } + if (ls_stats_check_ext_messages_ > 0) { + sb << " checkExtMessage:" << ls_stats_check_ext_messages_; + } + LOG(WARNING) << sb.as_cslice(); + } + ls_stats_.clear(); + ls_stats_check_ext_messages_ = 0; + log_ls_stats_at_ = td::Timestamp::in(60.0); + } + alarm_timestamp().relax(log_ls_stats_at_); } void ValidatorManagerImpl::update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) { diff --git a/validator/manager.hpp b/validator/manager.hpp index bdf0155e..5a1fbc60 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -575,6 +575,10 @@ class ValidatorManagerImpl : public ValidatorManager { td::Result r_handle, td::Promise promise); + void add_lite_query_stats(int lite_query_id) override { + ++ls_stats_[lite_query_id]; + } + private: td::Timestamp resend_shard_blocks_at_; td::Timestamp check_waiters_at_; @@ -640,6 +644,10 @@ class ValidatorManagerImpl : public ValidatorManager { private: std::map> shard_client_waiters_; td::actor::ActorOwn queue_size_counter_; + + td::Timestamp log_ls_stats_at_; + std::map ls_stats_; // lite_api ID -> count, 0 for unknown + td::uint32 ls_stats_check_ext_messages_{0}; }; } // namespace validator From 12c1b1a2e6c14206b6cfb9c9cfb5fbfa9bedf5bb Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Wed, 7 Feb 2024 14:56:37 +0300 Subject: [PATCH 05/10] Limit file descriptors num by adding archive slice lru (#892) * --max-archive-fd option limits open files in archive manager * Don't close the latest archives + bugfix * Delete temp packages early --------- Co-authored-by: SpyCheese --- validator-engine/validator-engine.cpp | 8 + validator-engine/validator-engine.hpp | 4 + validator/db/archive-manager.cpp | 39 +++- validator/db/archive-manager.hpp | 13 +- validator/db/archive-slice.cpp | 244 +++++++++++++++++++------- validator/db/archive-slice.hpp | 70 ++++++-- validator/db/rootdb.cpp | 6 +- validator/db/rootdb.hpp | 3 +- validator/interfaces/db.h | 2 +- validator/manager.cpp | 6 +- validator/validator-options.hpp | 7 + validator/validator.h | 2 + 12 files changed, 316 insertions(+), 88 deletions(-) diff --git a/validator-engine/validator-engine.cpp b/validator-engine/validator-engine.cpp index e488504f..c7e4787c 100644 --- a/validator-engine/validator-engine.cpp +++ b/validator-engine/validator-engine.cpp @@ -1364,6 +1364,7 @@ td::Status ValidatorEngine::load_global_config() { validator_options_.write().set_session_logs_file(session_logs_file_); } validator_options_.write().set_celldb_compress_depth(celldb_compress_depth_); + validator_options_.write().set_max_open_archive_files(max_open_archive_files_); std::vector h; for (auto &x : conf.validator_->hardforks_) { @@ -3793,6 +3794,13 @@ int main(int argc, char *argv[]) { }); return td::Status::OK(); }); + p.add_checked_option( + '\0', "max-archive-fd", "limit for a number of open file descriptirs in archive manager. 0 is unlimited (default)", + [&](td::Slice s) -> td::Status { + TRY_RESULT(v, td::to_integer_safe(s)); + acts.push_back([&x, v]() { td::actor::send_closure(x, &ValidatorEngine::set_max_open_archive_files, v); }); + return td::Status::OK(); + }); auto S = p.run(argc, argv); if (S.is_error()) { LOG(ERROR) << "failed to parse options: " << S.move_as_error(); diff --git a/validator-engine/validator-engine.hpp b/validator-engine/validator-engine.hpp index e59bb418..76b6134b 100644 --- a/validator-engine/validator-engine.hpp +++ b/validator-engine/validator-engine.hpp @@ -204,6 +204,7 @@ class ValidatorEngine : public td::actor::Actor { double archive_ttl_ = 0; double key_proof_ttl_ = 0; td::uint32 celldb_compress_depth_ = 0; + size_t max_open_archive_files_ = 0; bool read_config_ = false; bool started_keyring_ = false; bool started_ = false; @@ -264,6 +265,9 @@ class ValidatorEngine : public td::actor::Actor { void set_celldb_compress_depth(td::uint32 value) { celldb_compress_depth_ = value; } + void set_max_open_archive_files(size_t value) { + max_open_archive_files_ = value; + } void start_up() override; ValidatorEngine() { } diff --git a/validator/db/archive-manager.cpp b/validator/db/archive-manager.cpp index 91b7b880..25d686bf 100644 --- a/validator/db/archive-manager.cpp +++ b/validator/db/archive-manager.cpp @@ -55,7 +55,9 @@ std::string PackageId::name() const { } } -ArchiveManager::ArchiveManager(td::actor::ActorId root, std::string db_root) : db_root_(db_root) { +ArchiveManager::ArchiveManager(td::actor::ActorId root, std::string db_root, + td::Ref opts) + : db_root_(db_root), opts_(opts) { } void ArchiveManager::add_handle(BlockHandle handle, td::Promise promise) { @@ -598,9 +600,11 @@ void ArchiveManager::load_package(PackageId id) { } } - desc.file = td::actor::create_actor("slice", id.id, id.key, id.temp, false, db_root_); + desc.file = + td::actor::create_actor("slice", id.id, id.key, id.temp, false, db_root_, archive_lru_.get()); m.emplace(id, std::move(desc)); + update_permanent_slices(); } const ArchiveManager::FileDescription *ArchiveManager::get_file_desc(ShardIdFull shard, PackageId id, BlockSeqno seqno, @@ -631,7 +635,8 @@ const ArchiveManager::FileDescription *ArchiveManager::add_file_desc(ShardIdFull FileDescription new_desc{id, false}; td::mkdir(db_root_ + id.path()).ensure(); std::string prefix = PSTRING() << db_root_ << id.path() << id.name(); - new_desc.file = td::actor::create_actor("slice", id.id, id.key, id.temp, false, db_root_); + new_desc.file = + td::actor::create_actor("slice", id.id, id.key, id.temp, false, db_root_, archive_lru_.get()); const FileDescription &desc = f.emplace(id, std::move(new_desc)); if (!id.temp) { update_desc(f, desc, shard, seqno, ts, lt); @@ -673,6 +678,7 @@ const ArchiveManager::FileDescription *ArchiveManager::add_file_desc(ShardIdFull .ensure(); } index_->commit_transaction().ensure(); + update_permanent_slices(); return &desc; } @@ -820,6 +826,9 @@ void ArchiveManager::start_up() { td::mkdir(db_root_ + "/archive/states/").ensure(); td::mkdir(db_root_ + "/files/").ensure(); td::mkdir(db_root_ + "/files/packages/").ensure(); + if (opts_->get_max_open_archive_files() > 0) { + archive_lru_ = td::actor::create_actor("archive_lru", opts_->get_max_open_archive_files()); + } index_ = std::make_shared(td::RocksDb::open(db_root_ + "/files/globalindex").move_as_ok()); std::string value; auto v = index_->get(create_serialize_tl_object().as_slice(), value); @@ -878,8 +887,8 @@ void ArchiveManager::start_up() { persistent_state_gc(FileHash::zero()); } -void ArchiveManager::run_gc(UnixTime ts, UnixTime archive_ttl) { - auto p = get_temp_package_id_by_unixtime(ts); +void ArchiveManager::run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) { + auto p = get_temp_package_id_by_unixtime(std::max(gc_ts, mc_ts - TEMP_PACKAGES_TTL)); std::vector vec; for (auto &x : temp_files_) { if (x.first < p) { @@ -907,7 +916,7 @@ void ArchiveManager::run_gc(UnixTime ts, UnixTime archive_ttl) { if (it == desc.first_blocks.end()) { continue; } - if (it->second.ts < ts - archive_ttl) { + if (it->second.ts < gc_ts - archive_ttl) { vec.push_back(f.first); } } @@ -1200,6 +1209,7 @@ void ArchiveManager::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle han } } } + update_permanent_slices(); } void ArchiveManager::FileMap::shard_index_add(const FileDescription &desc) { @@ -1298,6 +1308,23 @@ const ArchiveManager::FileDescription *ArchiveManager::FileMap::get_next_file_de return it2->second->deleted ? nullptr : it2->second; } +void ArchiveManager::update_permanent_slices() { + if (archive_lru_.empty()) { + return; + } + std::vector ids; + if (!files_.empty()) { + ids.push_back(files_.rbegin()->first); + } + if (!key_files_.empty()) { + ids.push_back(key_files_.rbegin()->first); + } + if (!temp_files_.empty()) { + ids.push_back(temp_files_.rbegin()->first); + } + td::actor::send_closure(archive_lru_, &ArchiveLru::set_permanent_slices, std::move(ids)); +} + } // namespace validator } // namespace ton diff --git a/validator/db/archive-manager.hpp b/validator/db/archive-manager.hpp index e5008764..1c5deaf8 100644 --- a/validator/db/archive-manager.hpp +++ b/validator/db/archive-manager.hpp @@ -28,7 +28,7 @@ class RootDb; class ArchiveManager : public td::actor::Actor { public: - ArchiveManager(td::actor::ActorId root, std::string db_root); + ArchiveManager(td::actor::ActorId root, std::string db_root, td::Ref opts); void add_handle(BlockHandle handle, td::Promise promise); void update_handle(BlockHandle handle, td::Promise promise); @@ -58,7 +58,7 @@ class ArchiveManager : public td::actor::Actor { void truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise promise); //void truncate_continue(BlockSeqno masterchain_seqno, td::Promise promise); - void run_gc(UnixTime ts, UnixTime archive_ttl); + void run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl); /* from LTDB */ void get_block_by_unix_time(AccountIdPrefixFull account_id, UnixTime ts, td::Promise promise); @@ -123,6 +123,9 @@ class ArchiveManager : public td::actor::Actor { size_t size() const { return files_.size(); } + bool empty() const { + return files_.empty(); + } std::map::const_iterator lower_bound(const PackageId &x) const { return files_.lower_bound(x); } @@ -164,6 +167,7 @@ class ArchiveManager : public td::actor::Actor { void shard_index_del(const FileDescription &desc); }; FileMap files_, key_files_, temp_files_; + td::actor::ActorOwn archive_lru_; BlockSeqno finalized_up_to_{0}; bool async_mode_ = false; bool huge_transaction_started_ = false; @@ -206,6 +210,7 @@ class ArchiveManager : public td::actor::Actor { void got_gc_masterchain_handle(ConstBlockHandle handle, FileHash hash); std::string db_root_; + td::Ref opts_; std::shared_ptr index_; @@ -215,6 +220,10 @@ class ArchiveManager : public td::actor::Actor { PackageId get_temp_package_id() const; PackageId get_key_package_id(BlockSeqno seqno) const; PackageId get_temp_package_id_by_unixtime(UnixTime ts) const; + + void update_permanent_slices(); + + static const td::uint32 TEMP_PACKAGES_TTL = 86400 * 7; }; } // namespace validator diff --git a/validator/db/archive-slice.cpp b/validator/db/archive-slice.cpp index d3d63360..4a7c419a 100644 --- a/validator/db/archive-slice.cpp +++ b/validator/db/archive-slice.cpp @@ -21,7 +21,6 @@ #include "td/actor/MultiPromise.h" #include "validator/fabric.h" #include "td/db/RocksDb.h" -#include "ton/ton-io.hpp" #include "td/utils/port/path.h" #include "common/delay.h" #include "files-async.hpp" @@ -32,9 +31,16 @@ namespace validator { void PackageWriter::append(std::string filename, td::BufferSlice data, td::Promise> promise) { - auto offset = package_->append(std::move(filename), std::move(data), !async_mode_); - auto size = package_->size(); - + td::uint64 offset, size; + { + auto p = package_.lock(); + if (!p) { + promise.set_error(td::Status::Error("Package is closed")); + return; + } + offset = p->append(std::move(filename), std::move(data), !async_mode_); + size = p->size(); + } promise.set_value(std::pair{offset, size}); } @@ -44,8 +50,10 @@ class PackageReader : public td::actor::Actor { td::Promise> promise) : package_(std::move(package)), offset_(offset), promise_(std::move(promise)) { } - void start_up() { - promise_.set_result(package_->read(offset_)); + void start_up() override { + auto result = package_->read(offset_); + package_ = {}; + promise_.set_result(std::move(result)); stop(); } @@ -64,6 +72,7 @@ void ArchiveSlice::add_handle(BlockHandle handle, td::Promise promise) update_handle(std::move(handle), std::move(promise)); return; } + before_query(); CHECK(!key_blocks_only_); CHECK(!temp_); CHECK(handle->inited_unix_time()); @@ -146,6 +155,7 @@ void ArchiveSlice::update_handle(BlockHandle handle, td::Promise promi promise.set_value(td::Unit()); return; } + before_query(); CHECK(!key_blocks_only_); begin_transaction(); @@ -168,6 +178,7 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd")); return; } + before_query(); TRY_RESULT_PROMISE( promise, p, choose_package( @@ -179,6 +190,7 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer promise.set_value(td::Unit()); return; } + promise = begin_async_query(std::move(promise)); auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), idx = p->idx, ref_id, promise = std::move(promise)]( td::Result> R) mutable { if (R.is_error()) { @@ -217,6 +229,7 @@ void ArchiveSlice::get_handle(BlockIdExt block_id, td::Promise prom promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd")); return; } + before_query(); CHECK(!key_blocks_only_); std::string value; auto R = kv_->get(get_db_key_block_info(block_id), value); @@ -239,6 +252,7 @@ void ArchiveSlice::get_temp_handle(BlockIdExt block_id, td::Promiseget(get_db_key_block_info(block_id), value); @@ -261,6 +275,7 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd")); return; } + before_query(); std::string value; auto R = kv_->get(ref_id.hash().to_hex(), value); R.ensure(); @@ -273,6 +288,7 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P promise, p, choose_package( handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0, false)); + promise = begin_async_query(std::move(promise)); auto P = td::PromiseCreator::lambda( [promise = std::move(promise)](td::Result> R) mutable { if (R.is_error()) { @@ -292,6 +308,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id, promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd")); return; } + before_query(); bool f = false; BlockIdExt block_id; td::uint32 ls = 0; @@ -312,7 +329,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id, auto G = fetch_tl_object(value, true); G.ensure(); auto g = G.move_as_ok(); - if (compare_desc(*g.get()) > 0) { + if (compare_desc(*g) > 0) { continue; } td::uint32 l = g->first_idx_ - 1; @@ -328,7 +345,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id, auto E = fetch_tl_object(td::BufferSlice{value}, true); E.ensure(); auto e = E.move_as_ok(); - int cmp_val = compare(*e.get()); + int cmp_val = compare(*e); if (cmp_val < 0) { rseq = create_block_id(e->id_); @@ -342,9 +359,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id, } } if (rseq.is_valid()) { - if (!block_id.is_valid()) { - block_id = rseq; - } else if (block_id.id.seqno > rseq.id.seqno) { + if (!block_id.is_valid() || block_id.id.seqno > rseq.id.seqno) { block_id = rseq; } } @@ -430,12 +445,15 @@ void ArchiveSlice::get_slice(td::uint64 archive_id, td::uint64 offset, td::uint3 promise.set_error(td::Status::Error(ErrorCode::error, "bad archive id")); return; } + before_query(); auto value = static_cast(archive_id >> 32); TRY_RESULT_PROMISE(promise, p, choose_package(value, false)); + promise = begin_async_query(std::move(promise)); td::actor::create_actor("readfile", p->path, offset, limit, 0, std::move(promise)).release(); } void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise) { + before_query(); if (!sliced_mode_) { promise.set_result(archive_id_); } else { @@ -444,62 +462,107 @@ void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise(td::RocksDb::open(db_path).move_as_ok()); +void ArchiveSlice::before_query() { + if (status_ == st_closed) { + LOG(DEBUG) << "Opening archive slice " << db_path_; + kv_ = std::make_unique(td::RocksDb::open(db_path_).move_as_ok()); + std::string value; + auto R2 = kv_->get("status", value); + R2.ensure(); + sliced_mode_ = false; + slice_size_ = 100; - std::string value; - auto R2 = kv_->get("status", value); - R2.ensure(); - - if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) { - if (value == "sliced") { - sliced_mode_ = true; - R2 = kv_->get("slices", value); - R2.ensure(); - auto tot = td::to_integer(value); - R2 = kv_->get("slice_size", value); - R2.ensure(); - slice_size_ = td::to_integer(value); - CHECK(slice_size_ > 0); - for (td::uint32 i = 0; i < tot; i++) { - R2 = kv_->get(PSTRING() << "status." << i, value); + if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) { + if (value == "sliced") { + sliced_mode_ = true; + R2 = kv_->get("slices", value); R2.ensure(); - auto len = td::to_integer(value); - R2 = kv_->get(PSTRING() << "version." << i, value); + auto tot = td::to_integer(value); + R2 = kv_->get("slice_size", value); R2.ensure(); - td::uint32 ver = 0; - if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) { - ver = td::to_integer(value); + slice_size_ = td::to_integer(value); + CHECK(slice_size_ > 0); + for (td::uint32 i = 0; i < tot; i++) { + R2 = kv_->get(PSTRING() << "status." << i, value); + R2.ensure(); + auto len = td::to_integer(value); + R2 = kv_->get(PSTRING() << "version." << i, value); + R2.ensure(); + td::uint32 ver = 0; + if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) { + ver = td::to_integer(value); + } + auto v = archive_id_ + slice_size_ * i; + add_package(v, len, ver); } - auto v = archive_id_ + slice_size_ * i; - add_package(v, len, ver); + } else { + auto len = td::to_integer(value); + add_package(archive_id_, len, 0); } } else { - auto len = td::to_integer(value); - add_package(archive_id_, len, 0); + if (!temp_ && !key_blocks_only_) { + sliced_mode_ = true; + kv_->begin_transaction().ensure(); + kv_->set("status", "sliced").ensure(); + kv_->set("slices", "1").ensure(); + kv_->set("slice_size", td::to_string(slice_size_)).ensure(); + kv_->set("status.0", "0").ensure(); + kv_->set("version.0", td::to_string(default_package_version())).ensure(); + kv_->commit_transaction().ensure(); + add_package(archive_id_, 0, default_package_version()); + } else { + kv_->begin_transaction().ensure(); + kv_->set("status", "0").ensure(); + kv_->commit_transaction().ensure(); + add_package(archive_id_, 0, 0); + } } - } else { - if (!temp_ && !key_blocks_only_) { - sliced_mode_ = true; - kv_->begin_transaction().ensure(); - kv_->set("status", "sliced").ensure(); - kv_->set("slices", "1").ensure(); - kv_->set("slice_size", td::to_string(slice_size_)).ensure(); - kv_->set("status.0", "0").ensure(); - kv_->set("version.0", td::to_string(default_package_version())).ensure(); - kv_->commit_transaction().ensure(); - add_package(archive_id_, 0, default_package_version()); + } + status_ = st_open; + if (!archive_lru_.empty()) { + td::actor::send_closure(archive_lru_, &ArchiveLru::on_query, actor_id(this), p_id_, + packages_.size() + ESTIMATED_DB_OPEN_FILES); + } +} + +void ArchiveSlice::close_files() { + if (status_ == st_open) { + if (active_queries_ == 0) { + do_close(); } else { - kv_->begin_transaction().ensure(); - kv_->set("status", "0").ensure(); - kv_->commit_transaction().ensure(); - add_package(archive_id_, 0, 0); + status_ = st_want_close; } } } +void ArchiveSlice::do_close() { + if (destroyed_) { + return; + } + CHECK(status_ != st_closed && active_queries_ == 0); + LOG(DEBUG) << "Closing archive slice " << db_path_; + status_ = st_closed; + kv_ = {}; + packages_.clear(); +} + +template +td::Promise ArchiveSlice::begin_async_query(td::Promise promise) { + ++active_queries_; + return [SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable { + td::actor::send_closure(SelfId, &ArchiveSlice::end_async_query); + promise.set_result(std::move(R)); + }; +} + +void ArchiveSlice::end_async_query() { + CHECK(active_queries_ > 0); + --active_queries_; + if (active_queries_ == 0 && status_ == st_want_close) { + do_close(); + } +} + void ArchiveSlice::begin_transaction() { if (!async_mode_ || !huge_transaction_started_) { kv_->begin_transaction().ensure(); @@ -521,7 +584,7 @@ void ArchiveSlice::commit_transaction() { void ArchiveSlice::set_async_mode(bool mode, td::Promise promise) { async_mode_ = mode; - if (!async_mode_ && huge_transaction_started_) { + if (!async_mode_ && huge_transaction_started_ && kv_) { kv_->commit_transaction().ensure(); huge_transaction_size_ = 0; huge_transaction_started_ = false; @@ -536,12 +599,16 @@ void ArchiveSlice::set_async_mode(bool mode, td::Promise promise) { } } -ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root) +ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root, + td::actor::ActorId archive_lru) : archive_id_(archive_id) , key_blocks_only_(key_blocks_only) , temp_(temp) , finalized_(finalized) - , db_root_(std::move(db_root)) { + , p_id_(archive_id_, key_blocks_only_, temp_) + , db_root_(std::move(db_root)) + , archive_lru_(std::move(archive_lru)) { + db_path_ = PSTRING() << db_root_ << p_id_.path() << p_id_.name() << ".index"; } td::Result ArchiveSlice::choose_package(BlockSeqno masterchain_seqno, bool force) { @@ -587,7 +654,7 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver if (version >= 1) { pack->truncate(size).ensure(); } - auto writer = td::actor::create_actor("writer", pack); + auto writer = td::actor::create_actor("writer", pack, async_mode_); packages_.emplace_back(std::move(pack), std::move(writer), seqno, path, idx, version); } @@ -611,6 +678,7 @@ void destroy_db(std::string name, td::uint32 attempt, td::Promise prom } // namespace void ArchiveSlice::destroy(td::Promise promise) { + before_query(); td::MultiPromise mp; auto ig = mp.init_guard(); ig.add_promise(std::move(promise)); @@ -763,6 +831,7 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl destroy(std::move(promise)); return; } + before_query(); LOG(INFO) << "TRUNCATE: slice " << archive_id_ << " maxseqno= " << max_masterchain_seqno() << " truncate_upto=" << masterchain_seqno; if (max_masterchain_seqno() <= masterchain_seqno) { @@ -819,7 +888,7 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl pack->writer.reset(); td::unlink(pack->path).ensure(); td::rename(pack->path + ".new", pack->path).ensure(); - pack->writer = td::actor::create_actor("writer", new_package); + pack->writer = td::actor::create_actor("writer", new_package, async_mode_); for (auto idx = pack->idx + 1; idx < packages_.size(); idx++) { td::unlink(packages_[idx].path).ensure(); @@ -831,6 +900,61 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl promise.set_value(td::Unit()); } +static std::tuple to_tuple(const PackageId &id) { + return {id.id, id.temp, id.key}; +} + +void ArchiveLru::on_query(td::actor::ActorId slice, PackageId id, size_t files_count) { + SliceInfo &info = slices_[to_tuple(id)]; + if (info.opened_idx != 0) { + total_files_ -= info.files_count; + lru_.erase(info.opened_idx); + } + info.actor = std::move(slice); + total_files_ += (info.files_count = files_count); + info.opened_idx = current_idx_++; + if (!info.is_permanent) { + lru_.emplace(info.opened_idx, id); + } + enforce_limit(); +} + +void ArchiveLru::set_permanent_slices(std::vector ids) { + for (auto id : permanent_slices_) { + SliceInfo &info = slices_[to_tuple(id)]; + if (!info.is_permanent) { + continue; + } + info.is_permanent = false; + if (info.opened_idx) { + lru_.emplace(info.opened_idx, id); + } + } + permanent_slices_ = std::move(ids); + for (auto id : permanent_slices_) { + SliceInfo &info = slices_[to_tuple(id)]; + if (info.is_permanent) { + continue; + } + info.is_permanent = true; + if (info.opened_idx) { + lru_.erase(info.opened_idx); + } + } + enforce_limit(); +} + +void ArchiveLru::enforce_limit() { + while (total_files_ > max_total_files_ && lru_.size() > 1) { + auto it = lru_.begin(); + auto it2 = slices_.find(to_tuple(it->second)); + lru_.erase(it); + total_files_ -= it2->second.files_count; + td::actor::send_closure(it2->second.actor, &ArchiveSlice::close_files); + it2->second.opened_idx = 0; + } +} + } // namespace validator } // namespace ton diff --git a/validator/db/archive-slice.hpp b/validator/db/archive-slice.hpp index 487115fe..9f775d2e 100644 --- a/validator/db/archive-slice.hpp +++ b/validator/db/archive-slice.hpp @@ -21,6 +21,7 @@ #include "validator/interfaces/db.h" #include "package.hpp" #include "fileref.hpp" +#include namespace ton { @@ -44,7 +45,7 @@ struct PackageId { std::string path() const; std::string name() const; - bool is_empty() { + bool is_empty() const { return id == std::numeric_limits::max(); } static PackageId empty(bool key, bool temp) { @@ -54,26 +55,33 @@ struct PackageId { class PackageWriter : public td::actor::Actor { public: - PackageWriter(std::shared_ptr package) : package_(std::move(package)) { + PackageWriter(std::weak_ptr package, bool async_mode = false) + : package_(std::move(package)), async_mode_(async_mode) { } void append(std::string filename, td::BufferSlice data, td::Promise> promise); void set_async_mode(bool mode, td::Promise promise) { async_mode_ = mode; if (!async_mode_) { - package_->sync(); + auto p = package_.lock(); + if (p) { + p->sync(); + } } promise.set_value(td::Unit()); } private: - std::shared_ptr package_; + std::weak_ptr package_; bool async_mode_ = false; }; +class ArchiveLru; + class ArchiveSlice : public td::actor::Actor { public: - ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root); + ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root, + td::actor::ActorId archive_lru); void get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise); @@ -95,16 +103,23 @@ class ArchiveSlice : public td::actor::Actor { void get_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, td::Promise promise); - void start_up() override; void destroy(td::Promise promise); void truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise promise); - void begin_transaction(); - void commit_transaction(); void set_async_mode(bool mode, td::Promise promise); + void close_files(); + private: - void written_data(BlockHandle handle, td::Promise promise); + void before_query(); + void do_close(); + template + td::Promise begin_async_query(td::Promise promise); + void end_async_query(); + + void begin_transaction(); + void commit_transaction(); + void add_file_cont(size_t idx, FileReference ref_id, td::uint64 offset, td::uint64 size, td::Promise promise); @@ -112,13 +127,14 @@ class ArchiveSlice : public td::actor::Actor { td::BufferSlice get_db_key_lt_desc(ShardIdFull shard); td::BufferSlice get_db_key_lt_el(ShardIdFull shard, td::uint32 idx); td::BufferSlice get_db_key_block_info(BlockIdExt block_id); - td::BufferSlice get_lt_from_db(ShardIdFull shard, td::uint32 idx); td::uint32 archive_id_; bool key_blocks_only_; bool temp_; bool finalized_; + PackageId p_id_; + std::string db_path_; bool destroyed_ = false; bool async_mode_ = false; @@ -127,8 +143,14 @@ class ArchiveSlice : public td::actor::Actor { td::uint32 huge_transaction_size_ = 0; td::uint32 slice_size_{100}; + enum Status { + st_closed, st_open, st_want_close + } status_ = st_closed; + size_t active_queries_ = 0; + std::string db_root_; - std::shared_ptr kv_; + td::actor::ActorId archive_lru_; + std::unique_ptr kv_; struct PackageInfo { PackageInfo(std::shared_ptr package, td::actor::ActorOwn writer, BlockSeqno id, @@ -164,6 +186,32 @@ class ArchiveSlice : public td::actor::Actor { static constexpr td::uint32 default_package_version() { return 1; } + + static const size_t ESTIMATED_DB_OPEN_FILES = 5; +}; + +class ArchiveLru : public td::actor::Actor { + public: + explicit ArchiveLru(size_t max_total_files) : max_total_files_(max_total_files) { + CHECK(max_total_files_ > 0); + } + void on_query(td::actor::ActorId slice, PackageId id, size_t files_count); + void set_permanent_slices(std::vector ids); + private: + size_t current_idx_ = 1; + struct SliceInfo { + td::actor::ActorId actor; + size_t files_count = 0; + size_t opened_idx = 0; // 0 - not opened + bool is_permanent = false; + }; + std::map, SliceInfo> slices_; + std::map lru_; + size_t total_files_ = 0; + size_t max_total_files_ = 0; + std::vector permanent_slices_; + + void enforce_limit(); }; } // namespace validator diff --git a/validator/db/rootdb.cpp b/validator/db/rootdb.cpp index 601b07c1..ff9abae6 100644 --- a/validator/db/rootdb.cpp +++ b/validator/db/rootdb.cpp @@ -400,7 +400,7 @@ void RootDb::start_up() { cell_db_ = td::actor::create_actor("celldb", actor_id(this), root_path_ + "/celldb/", opts_); state_db_ = td::actor::create_actor("statedb", actor_id(this), root_path_ + "/state/"); static_files_db_ = td::actor::create_actor("staticfilesdb", actor_id(this), root_path_ + "/static/"); - archive_db_ = td::actor::create_actor("archive", actor_id(this), root_path_); + archive_db_ = td::actor::create_actor("archive", actor_id(this), root_path_, opts_); } void RootDb::archive(BlockHandle handle, td::Promise promise) { @@ -497,8 +497,8 @@ void RootDb::set_async_mode(bool mode, td::Promise promise) { td::actor::send_closure(archive_db_, &ArchiveManager::set_async_mode, mode, std::move(promise)); } -void RootDb::run_gc(UnixTime ts, UnixTime archive_ttl) { - td::actor::send_closure(archive_db_, &ArchiveManager::run_gc, ts, archive_ttl); +void RootDb::run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) { + td::actor::send_closure(archive_db_, &ArchiveManager::run_gc, mc_ts, gc_ts, archive_ttl); } } // namespace validator diff --git a/validator/db/rootdb.hpp b/validator/db/rootdb.hpp index 598defcb..97b9550b 100644 --- a/validator/db/rootdb.hpp +++ b/validator/db/rootdb.hpp @@ -134,11 +134,10 @@ class RootDb : public Db { td::Promise promise) override; void set_async_mode(bool mode, td::Promise promise) override; - void run_gc(UnixTime ts, UnixTime archive_ttl) override; + void run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) override; private: td::actor::ActorId validator_manager_; - std::string root_path_; td::Ref opts_; diff --git a/validator/interfaces/db.h b/validator/interfaces/db.h index ba4d9dda..84ea2b36 100644 --- a/validator/interfaces/db.h +++ b/validator/interfaces/db.h @@ -119,7 +119,7 @@ class Db : public td::actor::Actor { td::Promise promise) = 0; virtual void set_async_mode(bool mode, td::Promise promise) = 0; - virtual void run_gc(UnixTime ts, UnixTime archive_ttl) = 0; + virtual void run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) = 0; }; } // namespace validator diff --git a/validator/manager.cpp b/validator/manager.cpp index baaa78eb..55891ead 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -2385,9 +2385,9 @@ void ValidatorManagerImpl::state_serializer_update(BlockSeqno seqno) { void ValidatorManagerImpl::alarm() { try_advance_gc_masterchain_block(); alarm_timestamp() = td::Timestamp::in(1.0); - if (gc_masterchain_handle_) { - td::actor::send_closure(db_, &Db::run_gc, gc_masterchain_handle_->unix_time(), - static_cast(opts_->archive_ttl())); + if (last_masterchain_block_handle_ && gc_masterchain_handle_) { + td::actor::send_closure(db_, &Db::run_gc, last_masterchain_block_handle_->unix_time(), + gc_masterchain_handle_->unix_time(), static_cast(opts_->archive_ttl())); } if (log_status_at_.is_in_past()) { if (last_masterchain_block_handle_) { diff --git a/validator/validator-options.hpp b/validator/validator-options.hpp index 3a7e5ba7..3980b6c6 100644 --- a/validator/validator-options.hpp +++ b/validator/validator-options.hpp @@ -117,6 +117,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { td::uint32 get_celldb_compress_depth() const override { return celldb_compress_depth_; } + size_t get_max_open_archive_files() const override { + return max_open_archive_files_; + } void set_zero_block_id(BlockIdExt block_id) override { zero_block_id_ = block_id; @@ -173,6 +176,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { void set_celldb_compress_depth(td::uint32 value) override { celldb_compress_depth_ = value; } + void set_max_open_archive_files(size_t value) override { + max_open_archive_files_ = value; + } ValidatorManagerOptionsImpl *make_copy() const override { return new ValidatorManagerOptionsImpl(*this); @@ -216,6 +222,7 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { BlockSeqno sync_upto_{0}; std::string session_logs_file_; td::uint32 celldb_compress_depth_{0}; + size_t max_open_archive_files_ = 0; }; } // namespace validator diff --git a/validator/validator.h b/validator/validator.h index 2fefb064..d24467a7 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -82,6 +82,7 @@ struct ValidatorManagerOptions : public td::CntObject { virtual BlockSeqno sync_upto() const = 0; virtual std::string get_session_logs_file() const = 0; virtual td::uint32 get_celldb_compress_depth() const = 0; + virtual size_t get_max_open_archive_files() const = 0; virtual void set_zero_block_id(BlockIdExt block_id) = 0; virtual void set_init_block_id(BlockIdExt block_id) = 0; @@ -102,6 +103,7 @@ struct ValidatorManagerOptions : public td::CntObject { virtual void set_sync_upto(BlockSeqno seqno) = 0; virtual void set_session_logs_file(std::string f) = 0; virtual void set_celldb_compress_depth(td::uint32 value) = 0; + virtual void set_max_open_archive_files(size_t value) = 0; static td::Ref create( BlockIdExt zero_block_id, BlockIdExt init_block_id, From 79c48ebbba5724ce2817b340c6dbe8f0799572d9 Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Wed, 7 Feb 2024 15:45:51 +0300 Subject: [PATCH 06/10] Add cache for some LS requests (#893) * Cache runSmcMethod queries to LS * Drop duplicate sendMessage * Drop sendMessage cache once a minute --------- Co-authored-by: SpyCheese --- validator/CMakeLists.txt | 1 + validator/impl/CMakeLists.txt | 1 + validator/impl/fabric.cpp | 5 +- validator/impl/liteserver-cache.hpp | 112 ++++++++++++++++++++++++++++ validator/impl/liteserver.cpp | 58 +++++++++++--- validator/impl/liteserver.hpp | 14 +++- validator/interfaces/liteserver.h | 17 +++-- 7 files changed, 185 insertions(+), 23 deletions(-) create mode 100644 validator/impl/liteserver-cache.hpp diff --git a/validator/CMakeLists.txt b/validator/CMakeLists.txt index 8de60081..832374c6 100644 --- a/validator/CMakeLists.txt +++ b/validator/CMakeLists.txt @@ -43,6 +43,7 @@ set(VALIDATOR_HEADERS fabric.h interfaces/db.h interfaces/external-message.h + interfaces/liteserver.h interfaces/proof.h interfaces/shard.h interfaces/signature-set.h diff --git a/validator/impl/CMakeLists.txt b/validator/impl/CMakeLists.txt index f4b967a8..7cd273f2 100644 --- a/validator/impl/CMakeLists.txt +++ b/validator/impl/CMakeLists.txt @@ -32,6 +32,7 @@ set(TON_VALIDATOR_SOURCE external-message.hpp ihr-message.hpp liteserver.hpp + liteserver-cache.hpp message-queue.hpp proof.hpp shard.hpp diff --git a/validator/impl/fabric.cpp b/validator/impl/fabric.cpp index ede8d36d..e3478594 100644 --- a/validator/impl/fabric.cpp +++ b/validator/impl/fabric.cpp @@ -34,6 +34,7 @@ #include "ton/ton-io.hpp" #include "liteserver.hpp" #include "validator/fabric.h" +#include "liteserver-cache.hpp" namespace ton { @@ -46,7 +47,7 @@ td::actor::ActorOwn create_db_actor(td::actor::ActorId man td::actor::ActorOwn create_liteserver_cache_actor(td::actor::ActorId manager, std::string db_root) { - return td::actor::create_actor("cache"); + return td::actor::create_actor("cache"); } td::Result> create_block(BlockIdExt block_id, td::BufferSlice data) { @@ -244,7 +245,7 @@ void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_b void run_liteserver_query(td::BufferSlice data, td::actor::ActorId manager, td::actor::ActorId cache, td::Promise promise) { - LiteQuery::run_query(std::move(data), std::move(manager), std::move(promise)); + LiteQuery::run_query(std::move(data), std::move(manager), std::move(cache), std::move(promise)); } void run_fetch_account_state(WorkchainId wc, StdSmcAddress addr, td::actor::ActorId manager, diff --git a/validator/impl/liteserver-cache.hpp b/validator/impl/liteserver-cache.hpp new file mode 100644 index 00000000..0e58e051 --- /dev/null +++ b/validator/impl/liteserver-cache.hpp @@ -0,0 +1,112 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ +#pragma once + +#include "interfaces/liteserver.h" +#include + +namespace ton::validator { + +class LiteServerCacheImpl : public LiteServerCache { + public: + void start_up() override { + alarm(); + } + + void alarm() override { + alarm_timestamp() = td::Timestamp::in(60.0); + if (queries_cnt_ > 0 || !send_message_cache_.empty()) { + LOG(WARNING) << "LS Cache stats: " << queries_cnt_ << " queries, " << queries_hit_cnt_ << " hits; " + << cache_.size() << " entries, size=" << total_size_ << "/" << MAX_CACHE_SIZE << "; " + << send_message_cache_.size() << " different sendMessage queries, " << send_message_error_cnt_ + << " duplicates"; + queries_cnt_ = 0; + queries_hit_cnt_ = 0; + send_message_cache_.clear(); + send_message_error_cnt_ = 0; + } + } + + void lookup(td::Bits256 key, td::Promise promise) override { + ++queries_cnt_; + auto it = cache_.find(key); + if (it == cache_.end()) { + promise.set_error(td::Status::Error("not found")); + return; + } + ++queries_hit_cnt_; + auto entry = it->second.get(); + entry->remove(); + lru_.put(entry); + promise.set_value(entry->value_.clone()); + } + + void update(td::Bits256 key, td::BufferSlice value) override { + std::unique_ptr &entry = cache_[key]; + if (entry == nullptr) { + entry = std::make_unique(key, std::move(value)); + } else { + total_size_ -= entry->size(); + entry->value_ = std::move(value); + entry->remove(); + } + lru_.put(entry.get()); + total_size_ += entry->size(); + + while (total_size_ > MAX_CACHE_SIZE) { + auto to_remove = (CacheEntry *)lru_.get(); + CHECK(to_remove); + total_size_ -= to_remove->size(); + to_remove->remove(); + cache_.erase(to_remove->key_); + } + } + + void process_send_message(td::Bits256 key, td::Promise promise) override { + if (send_message_cache_.insert(key).second) { + promise.set_result(td::Unit()); + } else { + ++send_message_error_cnt_; + promise.set_error(td::Status::Error("duplicate message")); + } + } + + private: + struct CacheEntry : public td::ListNode { + explicit CacheEntry(td::Bits256 key, td::BufferSlice value) : key_(key), value_(std::move(value)) { + } + td::Bits256 key_; + td::BufferSlice value_; + + size_t size() const { + return value_.size() + 32 * 2; + } + }; + + std::map> cache_; + td::ListNode lru_; + size_t total_size_ = 0; + + size_t queries_cnt_ = 0, queries_hit_cnt_ = 0; + + std::set send_message_cache_; + size_t send_message_error_cnt_ = 0; + + static const size_t MAX_CACHE_SIZE = 64 << 20; +}; + +} // namespace ton::validator diff --git a/validator/impl/liteserver.cpp b/validator/impl/liteserver.cpp index f2be6ef9..7d25b408 100644 --- a/validator/impl/liteserver.cpp +++ b/validator/impl/liteserver.cpp @@ -54,8 +54,11 @@ td::int32 get_tl_tag(td::Slice slice) { } void LiteQuery::run_query(td::BufferSlice data, td::actor::ActorId manager, + td::actor::ActorId cache, td::Promise promise) { - td::actor::create_actor("litequery", std::move(data), std::move(manager), std::move(promise)).release(); + td::actor::create_actor("litequery", std::move(data), std::move(manager), std::move(cache), + std::move(promise)) + .release(); } void LiteQuery::fetch_account_state(WorkchainId wc, StdSmcAddress acc_addr, td::actor::ActorId manager, @@ -64,8 +67,8 @@ void LiteQuery::fetch_account_state(WorkchainId wc, StdSmcAddress acc_addr, td: } LiteQuery::LiteQuery(td::BufferSlice data, td::actor::ActorId manager, - td::Promise promise) - : query_(std::move(data)), manager_(std::move(manager)), promise_(std::move(promise)) { + td::actor::ActorId cache, td::Promise promise) + : query_(std::move(data)), manager_(std::move(manager)), cache_(std::move(cache)), promise_(std::move(promise)) { timeout_ = td::Timestamp::in(default_timeout_msec * 0.001); } @@ -110,7 +113,10 @@ void LiteQuery::alarm() { fatal_error(-503, "timeout"); } -bool LiteQuery::finish_query(td::BufferSlice result) { +bool LiteQuery::finish_query(td::BufferSlice result, bool skip_cache_update) { + if (use_cache_ && !skip_cache_update) { + td::actor::send_closure(cache_, &LiteServerCache::update, cache_key_, result.clone()); + } if (promise_) { promise_.set_result(std::move(result)); stop(); @@ -124,21 +130,53 @@ bool LiteQuery::finish_query(td::BufferSlice result) { void LiteQuery::start_up() { alarm_timestamp() = timeout_; - if(acc_state_promise_) { - td::actor::send_closure_later(actor_id(this),&LiteQuery::perform_fetchAccountState); + if (acc_state_promise_) { + td::actor::send_closure_later(actor_id(this), &LiteQuery::perform_fetchAccountState); return; } - auto F = fetch_tl_object(std::move(query_), true); + auto F = fetch_tl_object(query_, true); if (F.is_error()) { td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, 0); // unknown abort_query(F.move_as_error()); return; } - td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, F.ok()->get_id()); + query_obj_ = F.move_as_ok(); + if (!cache_.empty() && query_obj_->get_id() == lite_api::liteServer_sendMessage::ID) { + // Dropping duplicate "sendMessage" + cache_key_ = td::sha256_bits256(query_); + td::actor::send_closure(cache_, &LiteServerCache::process_send_message, cache_key_, + [SelfId = actor_id(this)](td::Result R) { + if (R.is_ok()) { + td::actor::send_closure(SelfId, &LiteQuery::perform); + } else { + td::actor::send_closure(SelfId, &LiteQuery::abort_query, + R.move_as_error_prefix("cannot send external message : ")); + } + }); + return; + } + use_cache_ = !cache_.empty() && query_obj_->get_id() == lite_api::liteServer_runSmcMethod::ID; + if (use_cache_) { + cache_key_ = td::sha256_bits256(query_); + td::actor::send_closure( + cache_, &LiteServerCache::lookup, cache_key_, [SelfId = actor_id(this)](td::Result R) { + if (R.is_error()) { + td::actor::send_closure(SelfId, &LiteQuery::perform); + } else { + td::actor::send_closure(SelfId, &LiteQuery::finish_query, R.move_as_ok(), true); + } + }); + } else { + perform(); + } +} + +void LiteQuery::perform() { + td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, query_obj_->get_id()); lite_api::downcast_call( - *F.move_as_ok().get(), + *query_obj_, td::overloaded( [&](lite_api::liteServer_getTime& q) { this->perform_getTime(); }, [&](lite_api::liteServer_getVersion& q) { this->perform_getVersion(); }, @@ -501,7 +539,7 @@ void LiteQuery::perform_sendMessage(td::BufferSlice data) { LOG(INFO) << "sending an external message to validator manager"; td::actor::send_closure_later(manager, &ValidatorManager::send_external_message, res.move_as_ok()); auto b = ton::create_serialize_tl_object(1); - td::actor::send_closure(Self, &LiteQuery::finish_query, std::move(b)); + td::actor::send_closure(Self, &LiteQuery::finish_query, std::move(b), false); } }); } diff --git a/validator/impl/liteserver.hpp b/validator/impl/liteserver.hpp index 2707fdfe..57ec7c3c 100644 --- a/validator/impl/liteserver.hpp +++ b/validator/impl/liteserver.hpp @@ -27,7 +27,7 @@ #include "shard.hpp" #include "proof.hpp" #include "block/block-auto.h" - +#include "auto/tl/lite_api.h" namespace ton { @@ -37,11 +37,16 @@ using td::Ref; class LiteQuery : public td::actor::Actor { td::BufferSlice query_; td::actor::ActorId manager_; + td::actor::ActorId cache_; td::Timestamp timeout_; td::Promise promise_; td::Promise,UnixTime,LogicalTime,std::unique_ptr>> acc_state_promise_; + tl_object_ptr query_obj_; + bool use_cache_{false}; + td::Bits256 cache_key_; + int pending_{0}; int mode_{0}; WorkchainId acc_workchain_; @@ -75,11 +80,11 @@ class LiteQuery : public td::actor::Actor { ls_capabilities = 7 }; // version 1.1; +1 = build block proof chains, +2 = masterchainInfoExt, +4 = runSmcMethod LiteQuery(td::BufferSlice data, td::actor::ActorId manager, - td::Promise promise); + td::actor::ActorId cache, td::Promise promise); LiteQuery(WorkchainId wc, StdSmcAddress acc_addr, td::actor::ActorId manager, td::Promise,UnixTime,LogicalTime,std::unique_ptr>> promise); static void run_query(td::BufferSlice data, td::actor::ActorId manager, - td::Promise promise); + td::actor::ActorId cache, td::Promise promise); static void fetch_account_state(WorkchainId wc, StdSmcAddress acc_addr, td::actor::ActorId manager, td::Promise,UnixTime,LogicalTime,std::unique_ptr>> promise); @@ -90,9 +95,10 @@ class LiteQuery : public td::actor::Actor { bool fatal_error(int err_code, std::string err_msg = ""); void abort_query(td::Status reason); void abort_query_ext(td::Status reason, std::string err_msg); - bool finish_query(td::BufferSlice result); + bool finish_query(td::BufferSlice result, bool skip_cache_update = false); void alarm() override; void start_up() override; + void perform(); void perform_getTime(); void perform_getVersion(); void perform_getMasterchainInfo(int mode); diff --git a/validator/interfaces/liteserver.h b/validator/interfaces/liteserver.h index 3e803029..0920c11f 100644 --- a/validator/interfaces/liteserver.h +++ b/validator/interfaces/liteserver.h @@ -19,16 +19,19 @@ #pragma once #include "td/actor/actor.h" +#include "td/utils/buffer.h" +#include "common/bitstring.h" -namespace ton { - -namespace validator { +namespace ton::validator { class LiteServerCache : public td::actor::Actor { public: - virtual ~LiteServerCache() = default; + ~LiteServerCache() override = default; + + virtual void lookup(td::Bits256 key, td::Promise promise) = 0; + virtual void update(td::Bits256 key, td::BufferSlice value) = 0; + + virtual void process_send_message(td::Bits256 key, td::Promise promise) = 0; }; -} // namespace validator - -} // namespace ton +} // namespace ton::validator \ No newline at end of file From f344aa46c3b4274e53362d12be50e212f6ffa7ca Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Fri, 9 Feb 2024 15:28:23 +0300 Subject: [PATCH 07/10] Drop duplicate ext msg broadcasts (#894) Co-authored-by: SpyCheese --- validator/full-node-shard.cpp | 15 +++++++++++++-- validator/full-node-shard.hpp | 4 ++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/validator/full-node-shard.cpp b/validator/full-node-shard.cpp index 7c59a79c..53c086e3 100644 --- a/validator/full-node-shard.cpp +++ b/validator/full-node-shard.cpp @@ -109,6 +109,9 @@ void FullNodeShardImpl::check_broadcast(PublicKeyHash src, td::BufferSlice broad } auto q = B.move_as_ok(); + if (!processed_ext_msg_broadcasts_.insert(td::sha256_bits256(q->message_->data_)).second) { + return promise.set_error(td::Status::Error("duplicate external message broadcast")); + } if (config_.ext_messages_broadcast_disabled_) { promise.set_error(td::Status::Error("rebroadcasting external messages is disabled")); promise = [manager = validator_manager_, message = q->message_->data_.clone()](td::Result R) mutable { @@ -703,6 +706,9 @@ void FullNodeShardImpl::send_external_message(td::BufferSlice data) { }); return; } + if (!processed_ext_msg_broadcasts_.insert(td::sha256_bits256(data)).second) { + return; + } auto B = create_serialize_tl_object( create_tl_object(std::move(data))); if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) { @@ -852,10 +858,15 @@ void FullNodeShardImpl::alarm() { update_certificate_at_ = td::Timestamp::never(); } } + if (cleanup_processed_ext_msg_at_ && cleanup_processed_ext_msg_at_.is_in_past()) { + processed_ext_msg_broadcasts_.clear(); + cleanup_processed_ext_msg_at_ = td::Timestamp::in(60.0); + } alarm_timestamp().relax(sync_completed_at_); alarm_timestamp().relax(update_certificate_at_); alarm_timestamp().relax(reload_neighbours_at_); alarm_timestamp().relax(ping_neighbours_at_); + alarm_timestamp().relax(cleanup_processed_ext_msg_at_); } void FullNodeShardImpl::start_up() { @@ -872,8 +883,8 @@ void FullNodeShardImpl::start_up() { reload_neighbours_at_ = td::Timestamp::now(); ping_neighbours_at_ = td::Timestamp::now(); - alarm_timestamp().relax(reload_neighbours_at_); - alarm_timestamp().relax(ping_neighbours_at_); + cleanup_processed_ext_msg_at_ = td::Timestamp::now(); + alarm_timestamp().relax(td::Timestamp::now()); } } diff --git a/validator/full-node-shard.hpp b/validator/full-node-shard.hpp index dcf4c649..faf49598 100644 --- a/validator/full-node-shard.hpp +++ b/validator/full-node-shard.hpp @@ -21,6 +21,7 @@ #include "full-node-shard.h" #include "td/actor/PromiseFuture.h" #include "td/utils/port/Poll.h" +#include namespace ton { @@ -250,6 +251,9 @@ class FullNodeShardImpl : public FullNodeShard { adnl::AdnlNodeIdShort last_pinged_neighbour_ = adnl::AdnlNodeIdShort::zero(); FullNodeConfig config_; + + std::set processed_ext_msg_broadcasts_; + td::Timestamp cleanup_processed_ext_msg_at_; }; } // namespace fullnode From 4d39772e4011788e16d2eef2a5167e090620fac8 Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Mon, 12 Feb 2024 10:42:02 +0300 Subject: [PATCH 08/10] Drop only accepted duplicate ext messages, don't cache errors (#902) Co-authored-by: SpyCheese --- validator/impl/liteserver-cache.hpp | 4 ++++ validator/impl/liteserver.cpp | 11 +++++++---- validator/interfaces/liteserver.h | 1 + 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/validator/impl/liteserver-cache.hpp b/validator/impl/liteserver-cache.hpp index 0e58e051..5318371f 100644 --- a/validator/impl/liteserver-cache.hpp +++ b/validator/impl/liteserver-cache.hpp @@ -85,6 +85,10 @@ class LiteServerCacheImpl : public LiteServerCache { } } + void drop_send_message_from_cache(td::Bits256 key) override { + send_message_cache_.erase(key); + } + private: struct CacheEntry : public td::ListNode { explicit CacheEntry(td::Bits256 key, td::BufferSlice value) : key_(key), value_(std::move(value)) { diff --git a/validator/impl/liteserver.cpp b/validator/impl/liteserver.cpp index 7d25b408..a7219b72 100644 --- a/validator/impl/liteserver.cpp +++ b/validator/impl/liteserver.cpp @@ -531,10 +531,13 @@ void LiteQuery::perform_sendMessage(td::BufferSlice data) { auto copy = data.clone(); td::actor::send_closure_later( manager_, &ValidatorManager::check_external_message, std::move(copy), - [Self = actor_id(this), data = std::move(data), manager = manager_](td::Result> res) mutable { - if(res.is_error()) { - td::actor::send_closure(Self, &LiteQuery::abort_query, - res.move_as_error_prefix("cannot apply external message to current state : "s)); + [Self = actor_id(this), data = std::move(data), manager = manager_, cache = cache_, + cache_key = cache_key_](td::Result> res) mutable { + if (res.is_error()) { + // Don't cache errors + td::actor::send_closure(cache, &LiteServerCache::drop_send_message_from_cache, cache_key); + td::actor::send_closure(Self, &LiteQuery::abort_query, + res.move_as_error_prefix("cannot apply external message to current state : "s)); } else { LOG(INFO) << "sending an external message to validator manager"; td::actor::send_closure_later(manager, &ValidatorManager::send_external_message, res.move_as_ok()); diff --git a/validator/interfaces/liteserver.h b/validator/interfaces/liteserver.h index 0920c11f..f2f78d41 100644 --- a/validator/interfaces/liteserver.h +++ b/validator/interfaces/liteserver.h @@ -32,6 +32,7 @@ class LiteServerCache : public td::actor::Actor { virtual void update(td::Bits256 key, td::BufferSlice value) = 0; virtual void process_send_message(td::Bits256 key, td::Promise promise) = 0; + virtual void drop_send_message_from_cache(td::Bits256 key) = 0; }; } // namespace ton::validator \ No newline at end of file From eb4831d7d637e5b8991b9c72340bdf38836ec38a Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Thu, 15 Feb 2024 12:03:57 +0300 Subject: [PATCH 09/10] Add --archive-preload-period (#904) Co-authored-by: SpyCheese --- validator-engine/validator-engine.cpp | 11 +++++++++++ validator-engine/validator-engine.hpp | 4 ++++ validator/db/archive-manager.cpp | 18 ++++++++++++++++++ validator/db/archive-slice.cpp | 4 ++++ validator/db/archive-slice.hpp | 1 + validator/validator-options.hpp | 7 +++++++ validator/validator.h | 2 ++ 7 files changed, 47 insertions(+) diff --git a/validator-engine/validator-engine.cpp b/validator-engine/validator-engine.cpp index c7e4787c..b8d428f3 100644 --- a/validator-engine/validator-engine.cpp +++ b/validator-engine/validator-engine.cpp @@ -1365,6 +1365,7 @@ td::Status ValidatorEngine::load_global_config() { } validator_options_.write().set_celldb_compress_depth(celldb_compress_depth_); validator_options_.write().set_max_open_archive_files(max_open_archive_files_); + validator_options_.write().set_archive_preload_period(archive_preload_period_); std::vector h; for (auto &x : conf.validator_->hardforks_) { @@ -3801,6 +3802,16 @@ int main(int argc, char *argv[]) { acts.push_back([&x, v]() { td::actor::send_closure(x, &ValidatorEngine::set_max_open_archive_files, v); }); return td::Status::OK(); }); + p.add_checked_option( + '\0', "archive-preload-period", "open archive slices for the past X second on startup (default: 0)", + [&](td::Slice s) -> td::Status { + auto v = td::to_double(s); + if (v < 0) { + return td::Status::Error("sync-before should be non-negative"); + } + acts.push_back([&x, v]() { td::actor::send_closure(x, &ValidatorEngine::set_archive_preload_period, v); }); + return td::Status::OK(); + }); auto S = p.run(argc, argv); if (S.is_error()) { LOG(ERROR) << "failed to parse options: " << S.move_as_error(); diff --git a/validator-engine/validator-engine.hpp b/validator-engine/validator-engine.hpp index 76b6134b..da50c71f 100644 --- a/validator-engine/validator-engine.hpp +++ b/validator-engine/validator-engine.hpp @@ -205,6 +205,7 @@ class ValidatorEngine : public td::actor::Actor { double key_proof_ttl_ = 0; td::uint32 celldb_compress_depth_ = 0; size_t max_open_archive_files_ = 0; + double archive_preload_period_ = 0.0; bool read_config_ = false; bool started_keyring_ = false; bool started_ = false; @@ -268,6 +269,9 @@ class ValidatorEngine : public td::actor::Actor { void set_max_open_archive_files(size_t value) { max_open_archive_files_ = value; } + void set_archive_preload_period(double value) { + archive_preload_period_ = value; + } void start_up() override; ValidatorEngine() { } diff --git a/validator/db/archive-manager.cpp b/validator/db/archive-manager.cpp index 25d686bf..2c0c82e5 100644 --- a/validator/db/archive-manager.cpp +++ b/validator/db/archive-manager.cpp @@ -885,6 +885,24 @@ void ArchiveManager::start_up() { }).ensure(); persistent_state_gc(FileHash::zero()); + + double open_since = td::Clocks::system() - opts_->get_archive_preload_period(); + for (auto it = files_.rbegin(); it != files_.rend(); ++it) { + if (it->second.file_actor_id().empty()) { + continue; + } + td::actor::send_closure(it->second.file_actor_id(), &ArchiveSlice::open_files); + bool stop = true; + for (const auto &first_block : it->second.first_blocks) { + if ((double)first_block.second.ts >= open_since) { + stop = false; + break; + } + } + if (stop) { + break; + } + } } void ArchiveManager::run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) { diff --git a/validator/db/archive-slice.cpp b/validator/db/archive-slice.cpp index 4a7c419a..52abc008 100644 --- a/validator/db/archive-slice.cpp +++ b/validator/db/archive-slice.cpp @@ -525,6 +525,10 @@ void ArchiveSlice::before_query() { } } +void ArchiveSlice::open_files() { + before_query(); +} + void ArchiveSlice::close_files() { if (status_ == st_open) { if (active_queries_ == 0) { diff --git a/validator/db/archive-slice.hpp b/validator/db/archive-slice.hpp index 9f775d2e..f178a9b8 100644 --- a/validator/db/archive-slice.hpp +++ b/validator/db/archive-slice.hpp @@ -108,6 +108,7 @@ class ArchiveSlice : public td::actor::Actor { void set_async_mode(bool mode, td::Promise promise); + void open_files(); void close_files(); private: diff --git a/validator/validator-options.hpp b/validator/validator-options.hpp index 3980b6c6..41019fa6 100644 --- a/validator/validator-options.hpp +++ b/validator/validator-options.hpp @@ -120,6 +120,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { size_t get_max_open_archive_files() const override { return max_open_archive_files_; } + double get_archive_preload_period() const override { + return archive_preload_period_; + } void set_zero_block_id(BlockIdExt block_id) override { zero_block_id_ = block_id; @@ -179,6 +182,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { void set_max_open_archive_files(size_t value) override { max_open_archive_files_ = value; } + void set_archive_preload_period(double value) override { + archive_preload_period_ = value; + } ValidatorManagerOptionsImpl *make_copy() const override { return new ValidatorManagerOptionsImpl(*this); @@ -223,6 +229,7 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { std::string session_logs_file_; td::uint32 celldb_compress_depth_{0}; size_t max_open_archive_files_ = 0; + double archive_preload_period_ = 0.0; }; } // namespace validator diff --git a/validator/validator.h b/validator/validator.h index d24467a7..f2dc56c4 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -83,6 +83,7 @@ struct ValidatorManagerOptions : public td::CntObject { virtual std::string get_session_logs_file() const = 0; virtual td::uint32 get_celldb_compress_depth() const = 0; virtual size_t get_max_open_archive_files() const = 0; + virtual double get_archive_preload_period() const = 0; virtual void set_zero_block_id(BlockIdExt block_id) = 0; virtual void set_init_block_id(BlockIdExt block_id) = 0; @@ -104,6 +105,7 @@ struct ValidatorManagerOptions : public td::CntObject { virtual void set_session_logs_file(std::string f) = 0; virtual void set_celldb_compress_depth(td::uint32 value) = 0; virtual void set_max_open_archive_files(size_t value) = 0; + virtual void set_archive_preload_period(double value) = 0; static td::Ref create( BlockIdExt zero_block_id, BlockIdExt init_block_id, From a4d618b0fc51003cb571c903a5974f7efb72fa4e Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Fri, 16 Feb 2024 11:51:43 +0300 Subject: [PATCH 10/10] Patch funcfiftlib and emulator-emscripten builds (#905) Co-authored-by: SpyCheese --- crypto/CMakeLists.txt | 1 + crypto/funcfiftlib/funcfiftlib-prejs.js | 1 + crypto/funcfiftlib/funcfiftlib.cpp | 25 +------------- emulator/CMakeLists.txt | 3 +- emulator/emulator-emscripten.cpp | 43 ++++++++++++++++++++++--- 5 files changed, 43 insertions(+), 30 deletions(-) create mode 100644 crypto/funcfiftlib/funcfiftlib-prejs.js diff --git a/crypto/CMakeLists.txt b/crypto/CMakeLists.txt index 0871d250..827c903b 100644 --- a/crypto/CMakeLists.txt +++ b/crypto/CMakeLists.txt @@ -401,6 +401,7 @@ if (USE_EMSCRIPTEN) target_link_options(funcfiftlib PRIVATE -sALLOW_MEMORY_GROWTH=1) target_link_options(funcfiftlib PRIVATE -sALLOW_TABLE_GROWTH=1) target_link_options(funcfiftlib PRIVATE --embed-file ${CMAKE_CURRENT_SOURCE_DIR}/fift/lib@/fiftlib) + target_link_options(funcfiftlib PRIVATE --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/funcfiftlib/funcfiftlib-prejs.js) target_link_options(funcfiftlib PRIVATE -fexceptions) target_compile_options(funcfiftlib PRIVATE -fexceptions -fno-stack-protector) endif() diff --git a/crypto/funcfiftlib/funcfiftlib-prejs.js b/crypto/funcfiftlib/funcfiftlib-prejs.js new file mode 100644 index 00000000..38326c38 --- /dev/null +++ b/crypto/funcfiftlib/funcfiftlib-prejs.js @@ -0,0 +1 @@ +var crypto = { getRandomValues: function(array) { for (var i = 0; i < array.length; i++) array[i] = (Math.random()*256)|0 } }; \ No newline at end of file diff --git a/crypto/funcfiftlib/funcfiftlib.cpp b/crypto/funcfiftlib/funcfiftlib.cpp index c8bf4fc5..a041c25d 100644 --- a/crypto/funcfiftlib/funcfiftlib.cpp +++ b/crypto/funcfiftlib/funcfiftlib.cpp @@ -34,29 +34,6 @@ #include #include -std::string escape_json(const std::string &s) { - std::ostringstream o; - for (auto c = s.cbegin(); c != s.cend(); c++) { - switch (*c) { - case '"': o << "\\\""; break; - case '\\': o << "\\\\"; break; - case '\b': o << "\\b"; break; - case '\f': o << "\\f"; break; - case '\n': o << "\\n"; break; - case '\r': o << "\\r"; break; - case '\t': o << "\\t"; break; - default: - if ('\x00' <= *c && *c <= '\x1f') { - o << "\\u" - << std::hex << std::setw(4) << std::setfill('0') << static_cast(*c); - } else { - o << *c; - } - } - } - return o.str(); -} - td::Result compile_internal(char *config_json) { TRY_RESULT(input_json, td::json_decode(td::MutableSlice(config_json))) auto &obj = input_json.get_object(); @@ -91,7 +68,7 @@ td::Result compile_internal(char *config_json) { auto result_obj = result_json.enter_object(); result_obj("status", "ok"); result_obj("codeBoc", td::base64_encode(boc)); - result_obj("fiftCode", escape_json(outs.str())); + result_obj("fiftCode", outs.str()); result_obj("codeHashHex", code_cell->get_hash().to_hex()); result_obj.leave(); diff --git a/emulator/CMakeLists.txt b/emulator/CMakeLists.txt index 969f9a88..7a4b7676 100644 --- a/emulator/CMakeLists.txt +++ b/emulator/CMakeLists.txt @@ -48,7 +48,7 @@ if (USE_EMSCRIPTEN) add_executable(emulator-emscripten ${EMULATOR_EMSCRIPTEN_SOURCE}) target_link_libraries(emulator-emscripten PUBLIC emulator) target_link_options(emulator-emscripten PRIVATE -sEXPORTED_RUNTIME_METHODS=_malloc,free,UTF8ToString,stringToUTF8,allocate,ALLOC_NORMAL,lengthBytesUTF8) - target_link_options(emulator-emscripten PRIVATE -sEXPORTED_FUNCTIONS=_emulate,_free,_run_get_method) + target_link_options(emulator-emscripten PRIVATE -sEXPORTED_FUNCTIONS=_emulate,_free,_run_get_method,_create_emulator,_destroy_emulator,_emulate_with_emulator) target_link_options(emulator-emscripten PRIVATE -sEXPORT_NAME=EmulatorModule) target_link_options(emulator-emscripten PRIVATE -sERROR_ON_UNDEFINED_SYMBOLS=0) target_link_options(emulator-emscripten PRIVATE -Oz) @@ -57,6 +57,7 @@ if (USE_EMSCRIPTEN) target_link_options(emulator-emscripten PRIVATE -sMODULARIZE=1) target_link_options(emulator-emscripten PRIVATE -sENVIRONMENT=web) target_link_options(emulator-emscripten PRIVATE -sFILESYSTEM=0) + target_link_options(emulator-emscripten PRIVATE -sALLOW_MEMORY_GROWTH=1) target_link_options(emulator-emscripten PRIVATE -fexceptions) if (USE_EMSCRIPTEN_NO_WASM) target_link_options(emulator-emscripten PRIVATE -sWASM=0) diff --git a/emulator/emulator-emscripten.cpp b/emulator/emulator-emscripten.cpp index dbd1c0d5..e76607c6 100644 --- a/emulator/emulator-emscripten.cpp +++ b/emulator/emulator-emscripten.cpp @@ -124,9 +124,39 @@ td::Result decode_get_method_params(const char* json) { return params; } +class NoopLog : public td::LogInterface { + public: + NoopLog() { + } + + void append(td::CSlice new_slice, int log_level) override { + } + + void rotate() override { + } +}; + extern "C" { -const char *emulate(const char *config, const char* libs, int verbosity, const char* account, const char* message, const char* params) { +void* create_emulator(const char *config, int verbosity) { + NoopLog logger; + + td::log_interface = &logger; + + SET_VERBOSITY_LEVEL(verbosity_NEVER); + return transaction_emulator_create(config, verbosity); +} + +void destroy_emulator(void* em) { + NoopLog logger; + + td::log_interface = &logger; + + SET_VERBOSITY_LEVEL(verbosity_NEVER); + transaction_emulator_destroy(em); +} + +const char *emulate_with_emulator(void* em, const char* libs, const char* account, const char* message, const char* params) { StringLog logger; td::log_interface = &logger; @@ -138,8 +168,6 @@ const char *emulate(const char *config, const char* libs, int verbosity, const c } auto decoded_params = decoded_params_res.move_as_ok(); - auto em = transaction_emulator_create(config, verbosity); - bool rand_seed_set = true; if (decoded_params.rand_seed_hex) { rand_seed_set = transaction_emulator_set_rand_seed(em, decoded_params.rand_seed_hex.unwrap().c_str()); @@ -162,8 +190,6 @@ const char *emulate(const char *config, const char* libs, int verbosity, const c result = transaction_emulator_emulate_transaction(em, account, message); } - transaction_emulator_destroy(em); - const char* output = nullptr; { td::JsonBuilder jb; @@ -178,6 +204,13 @@ const char *emulate(const char *config, const char* libs, int verbosity, const c return output; } +const char *emulate(const char *config, const char* libs, int verbosity, const char* account, const char* message, const char* params) { + auto em = transaction_emulator_create(config, verbosity); + auto result = emulate_with_emulator(em, libs, account, message, params); + transaction_emulator_destroy(em); + return result; +} + const char *run_get_method(const char *params, const char* stack, const char* config) { StringLog logger;