1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Merge branch 'testnet' into block-generation

This commit is contained in:
SpyCheese 2024-05-13 16:38:48 +03:00
commit 172c16ca2e
43 changed files with 545 additions and 248 deletions

View file

@ -19,7 +19,7 @@ jobs:
- name: Install system libraries
run: |
sudo apt-get update
sudo apt-get install -y build-essential git cmake ninja-build zlib1g-dev libsecp256k1-dev libmicrohttpd-dev libsodium-dev liblz4-dev
sudo apt-get install -y build-essential git cmake ninja-build zlib1g-dev libsecp256k1-dev libmicrohttpd-dev libsodium-dev liblz4-dev libjemalloc-dev
- name: Install clang-16
run: |

View file

@ -234,7 +234,7 @@ if (THREADS_HAVE_PTHREAD_ARG)
endif()
if (TON_USE_JEMALLOC)
find_package(JeMalloc REQUIRED)
find_package(jemalloc REQUIRED)
endif()
set(MEMPROF "" CACHE STRING "Use one of \"ON\", \"FAST\" or \"SAFE\" to enable memory profiling. \

View file

@ -1,6 +1,6 @@
FROM ubuntu:22.04 as builder
RUN apt-get update && \
DEBIAN_FRONTEND=noninteractive apt-get install -y build-essential cmake clang openssl libssl-dev zlib1g-dev gperf wget git ninja-build libsecp256k1-dev libsodium-dev libmicrohttpd-dev liblz4-dev pkg-config autoconf automake libtool && \
DEBIAN_FRONTEND=noninteractive apt-get install -y build-essential cmake clang openssl libssl-dev zlib1g-dev gperf wget git ninja-build libsecp256k1-dev libsodium-dev libmicrohttpd-dev liblz4-dev pkg-config autoconf automake libtool libjemalloc-dev && \
rm -rf /var/lib/apt/lists/*
ENV CC clang
ENV CXX clang++
@ -14,12 +14,12 @@ COPY ./ ./
RUN mkdir build && \
cd build && \
cmake -GNinja -DCMAKE_BUILD_TYPE=Release -DPORTABLE=1 -DTON_ARCH= .. && \
cmake -GNinja -DCMAKE_BUILD_TYPE=Release -DPORTABLE=1 -DTON_ARCH= -DTON_USE_JEMALLOC=ON .. && \
ninja storage-daemon storage-daemon-cli tonlibjson fift func validator-engine validator-engine-console generate-random-id dht-server lite-client
FROM ubuntu:22.04
RUN apt-get update && \
apt-get install -y wget libatomic1 openssl libsecp256k1-dev libsodium-dev libmicrohttpd-dev liblz4-dev && \
apt-get install -y wget libatomic1 openssl libsecp256k1-dev libsodium-dev libmicrohttpd-dev liblz4-dev libjemalloc-dev && \
rm -rf /var/lib/apt/lists/*
RUN mkdir -p /var/ton-work/db && \

View file

@ -1,7 +1,7 @@
#/bin/bash
#sudo apt-get update
#sudo apt-get install -y build-essential git cmake ninja-build zlib1g-dev libsecp256k1-dev libmicrohttpd-dev libsodium-dev liblz4-dev
#sudo apt-get install -y build-essential git cmake ninja-build zlib1g-dev libsecp256k1-dev libmicrohttpd-dev libsodium-dev liblz4-dev libjemalloc-dev
with_tests=false
with_artifacts=false
@ -42,7 +42,7 @@ else
echo "Using compiled openssl_3"
fi
cmake -GNinja .. \
cmake -GNinja -DTON_USE_JEMALLOC=ON .. \
-DCMAKE_BUILD_TYPE=Release \
-DOPENSSL_ROOT_DIR=$opensslPath \
-DOPENSSL_INCLUDE_DIR=$opensslPath/include \
@ -96,6 +96,8 @@ test $? -eq 0 || { echo "Can't strip final binaries"; exit 1; }
./lite-client/lite-client -V || exit 1
./crypto/fift -V || exit 1
ldd ./validator-engine/validator-engine || exit 1
cd ..
if [ "$with_artifacts" = true ]; then

View file

@ -246,7 +246,7 @@ class HardforkCreator : public td::actor::Actor {
}
void send_shard_block_info(ton::BlockIdExt block_id, ton::CatchainSeqno cc_seqno, td::BufferSlice data) override {
}
void send_broadcast(ton::BlockBroadcast broadcast) override {
void send_broadcast(ton::BlockBroadcast broadcast, bool custom_overlays_only) override {
}
void download_block(ton::BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
td::Promise<ton::ReceivedBlock> promise) override {

View file

@ -14,6 +14,9 @@
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
*/
#include "td/utils/Time.h"
#include "td/utils/Timer.h"
#include <map>
#include "vm/boc.h"
#include "vm/boc-writers.h"
@ -30,7 +33,8 @@ class LargeBocSerializer {
public:
using Hash = Cell::Hash;
explicit LargeBocSerializer(std::shared_ptr<CellDbReader> reader) : reader(std::move(reader)) {}
explicit LargeBocSerializer(std::shared_ptr<CellDbReader> reader) : reader(std::move(reader)) {
}
void add_root(Hash root);
td::Status import_cells();
@ -65,7 +69,8 @@ class LargeBocSerializer {
std::map<Hash, CellInfo> cells;
std::vector<std::pair<const Hash, CellInfo>*> cell_list;
struct RootInfo {
RootInfo(Hash hash, int idx) : hash(hash), idx(idx) {}
RootInfo(Hash hash, int idx) : hash(hash), idx(idx) {
}
Hash hash;
int idx;
};
@ -78,6 +83,10 @@ class LargeBocSerializer {
void reorder_cells();
int revisit(int cell_idx, int force = 0);
td::uint64 compute_sizes(int mode, int& r_size, int& o_size);
td::Timestamp log_speed_at_;
size_t processed_cells_ = 0;
static constexpr double LOG_SPEED_PERIOD = 120.0;
};
void LargeBocSerializer::add_root(Hash root) {
@ -85,12 +94,16 @@ void LargeBocSerializer::add_root(Hash root) {
}
td::Status LargeBocSerializer::import_cells() {
td::Timer timer;
log_speed_at_ = td::Timestamp::in(LOG_SPEED_PERIOD);
processed_cells_ = 0;
for (auto& root : roots) {
TRY_RESULT(idx, import_cell(root.hash));
root.idx = idx;
}
reorder_cells();
CHECK(!cell_list.empty());
LOG(ERROR) << "serializer: import_cells took " << timer.elapsed() << "s, " << cell_count << " cells";
return td::Status::OK();
}
@ -98,6 +111,12 @@ td::Result<int> LargeBocSerializer::import_cell(Hash hash, int depth) {
if (depth > Cell::max_depth) {
return td::Status::Error("error while importing a cell into a bag of cells: cell depth too large");
}
++processed_cells_;
if (log_speed_at_.is_in_past()) {
log_speed_at_ += LOG_SPEED_PERIOD;
LOG(WARNING) << "serializer: import_cells " << (double)processed_cells_ / LOG_SPEED_PERIOD << " cells/s";
processed_cells_ = 0;
}
auto it = cells.find(hash);
if (it != cells.end()) {
it->second.should_cache = true;
@ -282,6 +301,7 @@ td::uint64 LargeBocSerializer::compute_sizes(int mode, int& r_size, int& o_size)
}
td::Status LargeBocSerializer::serialize(td::FileFd& fd, int mode) {
td::Timer timer;
using Mode = BagOfCells::Mode;
BagOfCells::Info info;
if ((mode & Mode::WithCacheBits) && !(mode & Mode::WithIndex)) {
@ -313,13 +333,9 @@ td::Status LargeBocSerializer::serialize(td::FileFd& fd, int mode) {
return td::Status::Error("bag of cells is too large");
}
boc_writers::FileWriter writer{fd, (size_t) info.total_size};
auto store_ref = [&](unsigned long long value) {
writer.store_uint(value, info.ref_byte_size);
};
auto store_offset = [&](unsigned long long value) {
writer.store_uint(value, info.offset_byte_size);
};
boc_writers::FileWriter writer{fd, (size_t)info.total_size};
auto store_ref = [&](unsigned long long value) { writer.store_uint(value, info.ref_byte_size); };
auto store_offset = [&](unsigned long long value) { writer.store_uint(value, info.offset_byte_size); };
writer.store_uint(info.magic, 4);
@ -371,6 +387,8 @@ td::Status LargeBocSerializer::serialize(td::FileFd& fd, int mode) {
}
DCHECK(writer.position() == info.data_offset);
size_t keep_position = writer.position();
log_speed_at_ = td::Timestamp::in(LOG_SPEED_PERIOD);
processed_cells_ = 0;
for (int i = 0; i < cell_count; ++i) {
auto hash = cell_list[cell_count - 1 - i]->first;
const auto& dc_info = cell_list[cell_count - 1 - i]->second;
@ -389,6 +407,12 @@ td::Status LargeBocSerializer::serialize(td::FileFd& fd, int mode) {
DCHECK(k > i && k < cell_count);
store_ref(k);
}
++processed_cells_;
if (log_speed_at_.is_in_past()) {
log_speed_at_ += LOG_SPEED_PERIOD;
LOG(WARNING) << "serializer: serialize " << (double)processed_cells_ / LOG_SPEED_PERIOD << " cells/s";
processed_cells_ = 0;
}
}
DCHECK(writer.position() - keep_position == info.data_size);
if (info.has_crc32c) {
@ -396,17 +420,23 @@ td::Status LargeBocSerializer::serialize(td::FileFd& fd, int mode) {
writer.store_uint(td::bswap32(crc), 4);
}
DCHECK(writer.empty());
return writer.finalize();
}
TRY_STATUS(writer.finalize());
LOG(ERROR) << "serializer: serialize took " << timer.elapsed() << "s, " << cell_count << " cells, "
<< writer.position() << " bytes";
return td::Status::OK();
}
} // namespace
td::Status std_boc_serialize_to_file_large(std::shared_ptr<CellDbReader> reader, Cell::Hash root_hash,
td::FileFd& fd, int mode) {
td::Status std_boc_serialize_to_file_large(std::shared_ptr<CellDbReader> reader, Cell::Hash root_hash, td::FileFd& fd,
int mode) {
td::Timer timer;
CHECK(reader != nullptr)
LargeBocSerializer serializer(reader);
serializer.add_root(root_hash);
TRY_STATUS(serializer.import_cells());
return serializer.serialize(fd, mode);
TRY_STATUS(serializer.serialize(fd, mode));
LOG(ERROR) << "serialization took " << timer.elapsed() << "s";
return td::Status::OK();
}
}
} // namespace vm

View file

@ -59,38 +59,42 @@ RocksDb RocksDb::clone() const {
return RocksDb{db_, statistics_};
}
Result<RocksDb> RocksDb::open(std::string path, std::shared_ptr<rocksdb::Statistics> statistics) {
Result<RocksDb> RocksDb::open(std::string path, RocksDbOptions options) {
rocksdb::OptimisticTransactionDB *db;
{
rocksdb::Options options;
rocksdb::Options db_options;
static auto cache = rocksdb::NewLRUCache(1 << 30);
rocksdb::BlockBasedTableOptions table_options;
if (options.block_cache_size) {
table_options.block_cache = rocksdb::NewLRUCache(options.block_cache_size.value());
} else {
table_options.block_cache = cache;
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
}
db_options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
options.manual_wal_flush = true;
options.create_if_missing = true;
options.max_background_compactions = 4;
options.max_background_flushes = 2;
options.bytes_per_sync = 1 << 20;
options.writable_file_max_buffer_size = 2 << 14;
options.statistics = statistics;
db_options.manual_wal_flush = true;
db_options.create_if_missing = true;
db_options.max_background_compactions = 4;
db_options.max_background_flushes = 2;
db_options.bytes_per_sync = 1 << 20;
db_options.writable_file_max_buffer_size = 2 << 14;
db_options.statistics = options.statistics;
rocksdb::OptimisticTransactionDBOptions occ_options;
occ_options.validate_policy = rocksdb::OccValidationPolicy::kValidateSerial;
rocksdb::ColumnFamilyOptions cf_options(options);
rocksdb::ColumnFamilyOptions cf_options(db_options);
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
column_families.push_back(rocksdb::ColumnFamilyDescriptor(rocksdb::kDefaultColumnFamilyName, cf_options));
std::vector<rocksdb::ColumnFamilyHandle *> handles;
TRY_STATUS(from_rocksdb(
rocksdb::OptimisticTransactionDB::Open(options, occ_options, std::move(path), column_families, &handles, &db)));
TRY_STATUS(from_rocksdb(rocksdb::OptimisticTransactionDB::Open(db_options, occ_options, std::move(path),
column_families, &handles, &db)));
CHECK(handles.size() == 1);
// i can delete the handle since DBImpl is always holding a reference to
// default column family
delete handles[0];
}
return RocksDb(std::shared_ptr<rocksdb::OptimisticTransactionDB>(db), std::move(statistics));
return RocksDb(std::shared_ptr<rocksdb::OptimisticTransactionDB>(db), std::move(options.statistics));
}
std::shared_ptr<rocksdb::Statistics> RocksDb::create_statistics() {

View file

@ -24,6 +24,7 @@
#include "td/db/KeyValue.h"
#include "td/utils/Status.h"
#include "td/utils/optional.h"
namespace rocksdb {
class OptimisticTransactionDB;
@ -34,11 +35,17 @@ class Statistics;
} // namespace rocksdb
namespace td {
struct RocksDbOptions {
std::shared_ptr<rocksdb::Statistics> statistics = nullptr;
optional<uint64> block_cache_size; // Default - one 1G cache for all RocksDb
};
class RocksDb : public KeyValue {
public:
static Status destroy(Slice path);
RocksDb clone() const;
static Result<RocksDb> open(std::string path, std::shared_ptr<rocksdb::Statistics> statistics = nullptr);
static Result<RocksDb> open(std::string path, RocksDbOptions options = {});
Result<GetStatus> get(Slice key, std::string &value) override;
Status set(Slice key, Slice value) override;

View file

@ -347,7 +347,7 @@ class TestNode : public td::actor::Actor {
}
}
}
void send_broadcast(ton::BlockBroadcast broadcast) override {
void send_broadcast(ton::BlockBroadcast broadcast, bool custom_overlays_only) override {
}
void download_block(ton::BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
td::Promise<ton::ReceivedBlock> promise) override {

View file

@ -615,7 +615,7 @@ engine.validator.config out_port:int addrs:(vector engine.Addr) adnl:(vector eng
shards_to_monitor:(vector tonNode.shardId)
gc:engine.gc = engine.validator.Config;
engine.validator.customOverlayNode adnl_id:int256 msg_sender:Bool msg_sender_priority:int = engine.validator.CustomOverlayNode;
engine.validator.customOverlayNode adnl_id:int256 msg_sender:Bool msg_sender_priority:int block_sender:Bool = engine.validator.CustomOverlayNode;
engine.validator.customOverlay name:string nodes:(vector engine.validator.customOverlayNode) = engine.validator.CustomOverlay;
engine.validator.customOverlaysConfig overlays:(vector engine.validator.customOverlay) = engine.validator.CustomOverlaysConfig;
@ -798,9 +798,9 @@ validatorSession.statsProducer id:int256 candidate_id:int256 block_status:int co
signed_weight:long signed_33pct_at:double signed_66pct_at:double
serialize_time:double deserialize_time:double serialized_size:int = validatorSession.StatsProducer;
validatorSession.statsRound timestamp:long producers:(vector validatorSession.statsProducer) = validatorSession.StatsRound;
validatorSession.statsRound timestamp:double producers:(vector validatorSession.statsProducer) = validatorSession.StatsRound;
validatorSession.stats success:Bool id:tonNode.blockIdExt timestamp:long self:int256 session_id:int256 cc_seqno:int
validatorSession.stats success:Bool id:tonNode.blockIdExt timestamp:double self:int256 session_id:int256 cc_seqno:int
creator:int256 total_validators:int total_weight:long
signatures:int signatures_weight:long approve_signatures:int approve_signatures_weight:long
first_round:int rounds:(vector validatorSession.statsRound) = validatorSession.Stats;
@ -810,6 +810,10 @@ collatorNode.compressedCandidate flags:# source:PublicKey id:tonNode.blockIdExt
collatorNode.generateBlockSuccess candidate:collatorNode.Candidate = collatorNode.GenerateBlockResult;
collatorNode.generateBlockError code:int message:string = collatorNode.GenerateBlockResult;
validatorSession.newValidatorGroupStats.node id:int256 weight:long = validatorSession.newValidatorGroupStats.Node;
validatorSession.newValidatorGroupStats session_id:int256 workchain:int shard:long cc_seqno:int timestamp:double
self_idx:int nodes:(vector validatorSession.newValidatorGroupStats.node) = validatorSession.NewValidatorGroupStats;
---functions---
collatorNode.generateBlock workchain:int shard:long min_mc_id:tonNode.blockIdExt prev_blocks:(vector tonNode.blockIdExt)
creator:int256 = collatorNode.GenerateBlockResult;

Binary file not shown.

View file

@ -1182,9 +1182,10 @@ td::Status ShowCustomOverlaysQuery::receive(td::BufferSlice data) {
td::TerminalIO::out() << "Overlay \"" << overlay->name_ << "\": " << overlay->nodes_.size() << " nodes\n";
for (const auto &node : overlay->nodes_) {
td::TerminalIO::out() << " " << node->adnl_id_
<< (node->msg_sender_ ? (PSTRING() << " (sender, p=" << node->msg_sender_priority_ << ")")
<< (node->msg_sender_
? (PSTRING() << " (msg sender, p=" << node->msg_sender_priority_ << ")")
: "")
<< "\n";
<< (node->block_sender_ ? " (block sender)" : "") << "\n";
}
td::TerminalIO::out() << "\n";
}

View file

@ -1429,6 +1429,12 @@ td::Status ValidatorEngine::load_global_config() {
validator_options_.write().set_archive_preload_period(archive_preload_period_);
validator_options_.write().set_disable_rocksdb_stats(disable_rocksdb_stats_);
validator_options_.write().set_nonfinal_ls_queries_enabled(nonfinal_ls_queries_enabled_);
if (celldb_cache_size_) {
validator_options_.write().set_celldb_cache_size(celldb_cache_size_.value());
}
if (catchain_max_block_delay_) {
validator_options_.write().set_catchain_max_block_delay(catchain_max_block_delay_.value());
}
std::vector<ton::BlockIdExt> h;
for (auto &x : conf.validator_->hardforks_) {
@ -2466,16 +2472,9 @@ void ValidatorEngine::load_custom_overlays_config() {
}
for (auto &overlay : custom_overlays_config_->overlays_) {
std::vector<ton::adnl::AdnlNodeIdShort> nodes;
std::map<ton::adnl::AdnlNodeIdShort, int> senders;
for (const auto &node : overlay->nodes_) {
nodes.emplace_back(node->adnl_id_);
if (node->msg_sender_) {
senders[ton::adnl::AdnlNodeIdShort{node->adnl_id_}] = node->msg_sender_priority_;
}
}
td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::add_ext_msg_overlay, std::move(nodes),
std::move(senders), overlay->name_, [](td::Result<td::Unit> R) { R.ensure(); });
td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::add_custom_overlay,
ton::validator::fullnode::CustomOverlayParams::fetch(*overlay),
[](td::Result<td::Unit> R) { R.ensure(); });
}
}
@ -3680,11 +3679,10 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_addCustom
senders[ton::adnl::AdnlNodeIdShort{node->adnl_id_}] = node->msg_sender_priority_;
}
}
std::string name = overlay->name_;
auto params = ton::validator::fullnode::CustomOverlayParams::fetch(*query.overlay_);
td::actor::send_closure(
full_node_, &ton::validator::fullnode::FullNode::add_ext_msg_overlay, std::move(nodes), std::move(senders),
std::move(name),
[SelfId = actor_id(this), overlay = std::move(overlay),
full_node_, &ton::validator::fullnode::FullNode::add_custom_overlay, std::move(params),
[SelfId = actor_id(this), overlay = std::move(query.overlay_),
promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
promise.set_value(create_control_query_error(R.move_as_error()));
@ -3714,7 +3712,7 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_delCustom
return;
}
td::actor::send_closure(
full_node_, &ton::validator::fullnode::FullNode::del_ext_msg_overlay, query.name_,
full_node_, &ton::validator::fullnode::FullNode::del_custom_overlay, query.name_,
[SelfId = actor_id(this), name = query.name_, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
promise.set_value(create_control_query_error(R.move_as_error()));
@ -4286,6 +4284,27 @@ int main(int argc, char *argv[]) {
p.add_option('\0', "nonfinal-ls", "enable special LS queries to non-finalized blocks", [&]() {
acts.push_back([&x]() { td::actor::send_closure(x, &ValidatorEngine::set_nonfinal_ls_queries_enabled); });
});
p.add_checked_option(
'\0', "celldb-cache-size",
"block cache size for RocksDb in CellDb, in bytes (default: 1G cache shared by archive DB)",
[&](td::Slice s) -> td::Status {
TRY_RESULT(v, td::to_integer_safe<td::uint64>(s));
if (v == 0) {
return td::Status::Error("celldb-cache-size should be positive");
}
acts.push_back([&x, v]() { td::actor::send_closure(x, &ValidatorEngine::set_celldb_cache_size, v); });
return td::Status::OK();
});
p.add_checked_option(
'\0', "catchain-max-block-delay", "delay before creating a new catchain block, in seconds (default: 0.5)",
[&](td::Slice s) -> td::Status {
auto v = td::to_double(s);
if (v < 0) {
return td::Status::Error("catchain-max-block-delay should be non-negative");
}
acts.push_back([&x, v]() { td::actor::send_closure(x, &ValidatorEngine::set_catchain_max_block_delay, v); });
return td::Status::OK();
});
auto S = p.run(argc, argv);
if (S.is_error()) {
LOG(ERROR) << "failed to parse options: " << S.move_as_error();

View file

@ -223,6 +223,8 @@ class ValidatorEngine : public td::actor::Actor {
double archive_preload_period_ = 0.0;
bool disable_rocksdb_stats_ = false;
bool nonfinal_ls_queries_enabled_ = false;
td::optional<td::uint64> celldb_cache_size_;
td::optional<double> catchain_max_block_delay_;
bool read_config_ = false;
bool started_keyring_ = false;
bool started_ = false;
@ -298,6 +300,12 @@ class ValidatorEngine : public td::actor::Actor {
void set_nonfinal_ls_queries_enabled() {
nonfinal_ls_queries_enabled_ = true;
}
void set_celldb_cache_size(td::uint64 value) {
celldb_cache_size_ = value;
}
void set_catchain_max_block_delay(double value) {
catchain_max_block_delay_ = value;
}
void set_not_all_shards() {
not_all_shards_ = true;
}

View file

@ -129,7 +129,7 @@ struct ValidatorSessionStats {
}
};
struct Round {
td::uint64 timestamp = 0;
double timestamp = -1.0;
std::vector<Producer> producers;
};
@ -139,7 +139,7 @@ struct ValidatorSessionStats {
bool success = false;
ValidatorSessionId session_id = ValidatorSessionId::zero();
CatchainSeqno cc_seqno = 0;
td::uint64 timestamp = 0;
double timestamp = -1.0;
PublicKeyHash self = PublicKeyHash::zero();
PublicKeyHash creator = PublicKeyHash::zero();
td::uint32 total_validators = 0;
@ -150,6 +150,20 @@ struct ValidatorSessionStats {
ValidatorWeight approve_signatures_weight = 0;
};
struct NewValidatorGroupStats {
struct Node {
PublicKeyHash id = PublicKeyHash::zero();
ValidatorWeight weight = 0;
};
ValidatorSessionId session_id = ValidatorSessionId::zero();
ShardIdFull shard{masterchainId};
CatchainSeqno cc_seqno = 0;
double timestamp = -1.0;
td::uint32 self_idx = 0;
std::vector<Node> nodes;
};
} // namespace validatorsession
} // namespace ton

View file

@ -808,8 +808,8 @@ void ValidatorSessionImpl::request_new_block(bool now) {
} else {
double lambda = 10.0 / description().get_total_nodes();
double x = -1 / lambda * log(td::Random::fast(1, 999) * 0.001);
if (x > 0.5) {
x = 0.5;
if (x > catchain_max_block_delay_) { // default = 0.5
x = catchain_max_block_delay_;
}
td::actor::send_closure(catchain_, &catchain::CatChain::need_new_block, td::Timestamp::in(x));
}
@ -872,7 +872,7 @@ void ValidatorSessionImpl::on_new_round(td::uint32 round) {
callback_->on_block_skipped(cur_round_);
} else {
cur_stats_.success = true;
cur_stats_.timestamp = (td::uint64)td::Clocks::system();
cur_stats_.timestamp = td::Clocks::system();
cur_stats_.signatures = (td::uint32)export_sigs.size();
cur_stats_.signatures_weight = signatures_weight;
cur_stats_.approve_signatures = (td::uint32)export_approve_sigs.size();
@ -900,6 +900,12 @@ void ValidatorSessionImpl::on_new_round(td::uint32 round) {
cur_round_++;
if (have_block) {
stats_init();
} else {
size_t round_idx = cur_round_ - cur_stats_.first_round;
while (round_idx >= cur_stats_.rounds.size()) {
stats_add_round();
}
cur_stats_.rounds[round_idx].timestamp = td::Clocks::system();
}
auto it2 = blocks_.begin();
while (it2 != blocks_.end()) {
@ -989,9 +995,7 @@ void ValidatorSessionImpl::destroy() {
}
void ValidatorSessionImpl::get_current_stats(td::Promise<ValidatorSessionStats> promise) {
ValidatorSessionStats stats = cur_stats_;
stats.timestamp = (td::uint64)td::Clocks::system();
promise.set_result(std::move(stats));
promise.set_result(cur_stats_);
}
void ValidatorSessionImpl::get_validator_group_info_for_litequery(
@ -1085,26 +1089,31 @@ void ValidatorSessionImpl::stats_init() {
++it;
}
}
if (cur_stats_.rounds.empty()) {
stats_add_round();
}
cur_stats_.rounds[0].timestamp = td::Clocks::system();
stats_inited_ = true;
}
void ValidatorSessionImpl::stats_add_round() {
td::uint32 round = cur_stats_.first_round + cur_stats_.rounds.size();
cur_stats_.rounds.emplace_back();
auto& round = cur_stats_.rounds.back();
round.timestamp = (td::uint64)td::Clocks::system();
round.producers.resize(description().get_max_priority() + 1);
auto& stat = cur_stats_.rounds.back();
stat.producers.resize(description().get_max_priority() + 1);
for (td::uint32 i = 0; i < description().get_total_nodes(); i++) {
td::int32 priority = description().get_node_priority(i, cur_round_);
td::int32 priority = description().get_node_priority(i, round);
if (priority >= 0) {
CHECK((size_t)priority < round.producers.size());
round.producers[priority].id = description().get_source_id(i);
round.producers[priority].is_ours = (local_idx() == i);
round.producers[priority].approvers.resize(description().get_total_nodes(), false);
round.producers[priority].signers.resize(description().get_total_nodes(), false);
CHECK((size_t)priority < stat.producers.size());
stat.producers[priority].id = description().get_source_id(i);
stat.producers[priority].is_ours = (local_idx() == i);
stat.producers[priority].approvers.resize(description().get_total_nodes(), false);
stat.producers[priority].signers.resize(description().get_total_nodes(), false);
}
}
while (!round.producers.empty() && round.producers.back().id.is_zero()) {
round.producers.pop_back();
while (!stat.producers.empty() && stat.producers.back().id.is_zero()) {
stat.producers.pop_back();
}
}

View file

@ -108,6 +108,7 @@ class ValidatorSession : public td::actor::Actor {
virtual void get_validator_group_info_for_litequery(
td::uint32 cur_round,
td::Promise<std::vector<tl_object_ptr<lite_api::liteServer_nonfinal_candidateInfo>>> promise) = 0;
virtual void set_catchain_max_block_delay(double value) = 0;
virtual void get_session_info(td::Promise<tl_object_ptr<ton_api::engine_validator_validatorSessionInfo>> promise) = 0;

View file

@ -90,6 +90,8 @@ class ValidatorSessionImpl : public ValidatorSession {
td::actor::ActorOwn<catchain::CatChain> catchain_;
std::unique_ptr<ValidatorSessionDescription> description_;
double catchain_max_block_delay_ = 0.5;
void on_new_round(td::uint32 round);
void on_catchain_started();
void check_vote_for_slot(td::uint32 att);
@ -190,6 +192,9 @@ class ValidatorSessionImpl : public ValidatorSession {
void get_validator_group_info_for_litequery(
td::uint32 cur_round,
td::Promise<std::vector<tl_object_ptr<lite_api::liteServer_nonfinal_candidateInfo>>> promise) override;
void set_catchain_max_block_delay(double value) override {
catchain_max_block_delay_ = value;
}
void process_blocks(std::vector<catchain::CatChainBlock *> blocks);
void finished_processing();

View file

@ -832,7 +832,10 @@ void ArchiveManager::start_up() {
if (!opts_->get_disable_rocksdb_stats()) {
statistics_.init();
}
index_ = std::make_shared<td::RocksDb>(td::RocksDb::open(db_root_ + "/files/globalindex", statistics_.rocksdb_statistics).move_as_ok());
td::RocksDbOptions db_options;
db_options.statistics = statistics_.rocksdb_statistics;
index_ = std::make_shared<td::RocksDb>(
td::RocksDb::open(db_root_ + "/files/globalindex", std::move(db_options)).move_as_ok());
std::string value;
auto v = index_->get(create_serialize_tl_object<ton_api::db_files_index_key>().as_slice(), value);
v.ensure();

View file

@ -554,7 +554,9 @@ void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::
void ArchiveSlice::before_query() {
if (status_ == st_closed) {
LOG(DEBUG) << "Opening archive slice " << db_path_;
kv_ = std::make_unique<td::RocksDb>(td::RocksDb::open(db_path_, statistics_.rocksdb_statistics).move_as_ok());
td::RocksDbOptions db_options;
db_options.statistics = statistics_.rocksdb_statistics;
kv_ = std::make_unique<td::RocksDb>(td::RocksDb::open(db_path_, std::move(db_options)).move_as_ok());
std::string value;
auto R2 = kv_->get("status", value);
R2.ensure();

View file

@ -88,7 +88,13 @@ void CellDbIn::start_up() {
statistics_ = td::RocksDb::create_statistics();
statistics_flush_at_ = td::Timestamp::in(60.0);
}
cell_db_ = std::make_shared<td::RocksDb>(td::RocksDb::open(path_, statistics_).move_as_ok());
td::RocksDbOptions db_options;
db_options.statistics = statistics_;
if (opts_->get_celldb_cache_size()) {
db_options.block_cache_size = opts_->get_celldb_cache_size().value();
LOG(WARNING) << "Set CellDb block cache size to " << td::format::as_size(db_options.block_cache_size.value());
}
cell_db_ = std::make_shared<td::RocksDb>(td::RocksDb::open(path_, std::move(db_options)).move_as_ok());
boc_ = vm::DynamicBagOfCellsDb::create();

View file

@ -41,17 +41,7 @@ void FullNodePrivateOverlayV2::process_block_broadcast(PublicKeyHash src, ton_ap
}
VLOG(FULL_NODE_DEBUG) << "Received block broadcast in private overlay from " << src << ": "
<< B.ok().block_id.to_str();
auto P = td::PromiseCreator::lambda([](td::Result<td::Unit> R) {
if (R.is_error()) {
if (R.error().code() == ErrorCode::notready) {
LOG(DEBUG) << "dropped broadcast: " << R.move_as_error();
} else {
LOG(INFO) << "dropped broadcast: " << R.move_as_error();
}
}
});
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, B.move_as_ok(),
std::move(P));
td::actor::send_closure(full_node_, &FullNode::process_block_broadcast, B.move_as_ok());
}
void FullNodePrivateOverlayV2::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) {
@ -220,7 +210,8 @@ void FullNodePrivateBlockOverlays::update_overlays(
const td::actor::ActorId<keyring::Keyring> &keyring, const td::actor::ActorId<adnl::Adnl> &adnl,
const td::actor::ActorId<rldp::Rldp> &rldp, const td::actor::ActorId<rldp2::Rldp> &rldp2,
const td::actor::ActorId<overlay::Overlays> &overlays,
const td::actor::ActorId<ValidatorManagerInterface> &validator_manager) {
const td::actor::ActorId<ValidatorManagerInterface> &validator_manager,
const td::actor::ActorId<FullNode> &full_node) {
if (my_adnl_ids.empty()) {
id_to_overlays_.clear();
return;
@ -314,7 +305,7 @@ void FullNodePrivateBlockOverlays::update_overlays(
new_overlay.is_sender_ = std::binary_search(new_overlay.senders_.begin(), new_overlay.senders_.end(), local_id);
new_overlay.overlay_ = td::actor::create_actor<FullNodePrivateOverlayV2>(
"BlocksPrivateOverlay", local_id, shard, new_overlay.nodes_, new_overlay.senders_, zero_state_file_hash,
keyring, adnl, rldp, rldp2, overlays, validator_manager);
keyring, adnl, rldp, rldp2, overlays, validator_manager, full_node);
}
}
}

View file

@ -22,13 +22,13 @@ namespace ton::validator::fullnode {
class FullNodePrivateOverlayV2 : public td::actor::Actor {
public:
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query);
void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast& query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed& query);
void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast& query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast& query);
template <class T>
void process_broadcast(PublicKeyHash, T &) {
void process_broadcast(PublicKeyHash, T&) {
VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast";
}
void receive_broadcast(PublicKeyHash src, td::BufferSlice query);
@ -48,7 +48,8 @@ class FullNodePrivateOverlayV2 : public td::actor::Actor {
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager)
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<FullNode> full_node)
: local_id_(local_id)
, shard_(shard)
, nodes_(std::move(nodes))
@ -59,7 +60,8 @@ class FullNodePrivateOverlayV2 : public td::actor::Actor {
, rldp_(rldp)
, rldp2_(rldp2)
, overlays_(overlays)
, validator_manager_(validator_manager) {
, validator_manager_(validator_manager)
, full_node_(full_node) {
}
private:
@ -75,6 +77,7 @@ class FullNodePrivateOverlayV2 : public td::actor::Actor {
td::actor::ActorId<rldp2::Rldp> rldp2_;
td::actor::ActorId<overlay::Overlays> overlays_;
td::actor::ActorId<ValidatorManagerInterface> validator_manager_;
td::actor::ActorId<FullNode> full_node_;
bool inited_ = false;
overlay::OverlayIdFull overlay_id_full_;
@ -94,7 +97,8 @@ class FullNodePrivateBlockOverlays {
const td::actor::ActorId<adnl::Adnl>& adnl, const td::actor::ActorId<rldp::Rldp>& rldp,
const td::actor::ActorId<rldp2::Rldp>& rldp2,
const td::actor::ActorId<overlay::Overlays>& overlays,
const td::actor::ActorId<ValidatorManagerInterface>& validator_manager);
const td::actor::ActorId<ValidatorManagerInterface>& validator_manager,
const td::actor::ActorId<FullNode>& full_node);
private:
struct Overlays {

View file

@ -38,17 +38,7 @@ void FullNodePrivateBlockOverlay::process_block_broadcast(PublicKeyHash src, ton
}
VLOG(FULL_NODE_DEBUG) << "Received block broadcast in private overlay from " << src << ": "
<< B.ok().block_id.to_str();
auto P = td::PromiseCreator::lambda([](td::Result<td::Unit> R) {
if (R.is_error()) {
if (R.error().code() == ErrorCode::notready) {
LOG(DEBUG) << "dropped broadcast: " << R.move_as_error();
} else {
LOG(INFO) << "dropped broadcast: " << R.move_as_error();
}
}
});
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, B.move_as_ok(),
std::move(P));
td::actor::send_closure(full_node_, &FullNode::process_block_broadcast, B.move_as_ok());
}
void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) {
@ -60,6 +50,9 @@ void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::
}
void FullNodePrivateBlockOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
if (adnl::AdnlNodeIdShort{src} == local_id_) {
return;
}
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
return;
@ -169,18 +162,47 @@ void FullNodePrivateBlockOverlay::tear_down() {
}
}
void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query) {
auto it = senders_.find(adnl::AdnlNodeIdShort{src});
if (it == senders_.end()) {
void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) {
process_block_broadcast(src, query);
}
void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query) {
process_block_broadcast(src, query);
}
void FullNodeCustomOverlay::process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) {
if (!block_senders_.count(adnl::AdnlNodeIdShort(src))) {
VLOG(FULL_NODE_DEBUG) << "Dropping block broadcast in private overlay \"" << name_ << "\" from unauthorized sender "
<< src;
return;
}
LOG(FULL_NODE_DEBUG) << "Got external message in private overlay \"" << name_ << "\" from " << src
auto B = deserialize_block_broadcast(query, overlay::Overlays::max_fec_broadcast_size());
if (B.is_error()) {
LOG(DEBUG) << "dropped broadcast: " << B.move_as_error();
return;
}
VLOG(FULL_NODE_DEBUG) << "Received block broadcast in custom overlay \"" << name_ << "\" from " << src << ": "
<< B.ok().block_id.to_str();
td::actor::send_closure(full_node_, &FullNode::process_block_broadcast, B.move_as_ok());
}
void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query) {
auto it = msg_senders_.find(adnl::AdnlNodeIdShort{src});
if (it == msg_senders_.end()) {
VLOG(FULL_NODE_DEBUG) << "Dropping external message broadcast in custom overlay \"" << name_
<< "\" from unauthorized sender " << src;
return;
}
VLOG(FULL_NODE_DEBUG) << "Got external message in custom overlay \"" << name_ << "\" from " << src
<< " (priority=" << it->second << ")";
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_external_message,
std::move(query.message_->data_), it->second);
}
void FullNodeCustomOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
if (adnl::AdnlNodeIdShort{src} == local_id_) {
return;
}
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
return;
@ -192,7 +214,7 @@ void FullNodeCustomOverlay::send_external_message(td::BufferSlice data) {
if (!inited_ || config_.ext_messages_broadcast_disabled_) {
return;
}
LOG(FULL_NODE_DEBUG) << "Sending external message to private overlay \"" << name_ << "\"";
VLOG(FULL_NODE_DEBUG) << "Sending external message to custom overlay \"" << name_ << "\"";
auto B = create_serialize_tl_object<ton_api::tonNode_externalMessageBroadcast>(
create_tl_object<ton_api::tonNode_externalMessage>(std::move(data)));
if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
@ -204,6 +226,21 @@ void FullNodeCustomOverlay::send_external_message(td::BufferSlice data) {
}
}
void FullNodeCustomOverlay::send_broadcast(BlockBroadcast broadcast) {
if (!inited_) {
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending block broadcast to custom overlay \"" << name_
<< "\": " << broadcast.block_id.to_str();
auto B = serialize_block_broadcast(broadcast, true); // compression_enabled = true
if (B.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error();
return;
}
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}
void FullNodeCustomOverlay::start_up() {
std::sort(nodes_.begin(), nodes_.end());
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());
@ -234,7 +271,8 @@ void FullNodeCustomOverlay::try_init() {
void FullNodeCustomOverlay::init() {
LOG(FULL_NODE_WARNING) << "Creating custom overlay \"" << name_ << "\" for adnl id " << local_id_ << " : "
<< nodes_.size() << " nodes, overlay_id=" << overlay_id_;
<< nodes_.size() << " nodes, " << msg_senders_.size() << " msg senders, "
<< block_senders_.size() << " block senders, overlay_id=" << overlay_id_;
class Callback : public overlay::Overlays::Callback {
public:
void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
@ -256,9 +294,12 @@ void FullNodeCustomOverlay::init() {
};
std::map<PublicKeyHash, td::uint32> authorized_keys;
for (const auto &sender : senders_) {
for (const auto &sender : msg_senders_) {
authorized_keys[sender.first.pubkey_hash()] = overlay::Overlays::max_fec_broadcast_size();
}
for (const auto &sender : block_senders_) {
authorized_keys[sender.pubkey_hash()] = overlay::Overlays::max_fec_broadcast_size();
}
overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(), 0, std::move(authorized_keys)};
td::actor::send_closure(
overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(), nodes_,

View file

@ -52,7 +52,8 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager)
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<FullNode> full_node)
: local_id_(local_id)
, nodes_(std::move(nodes))
, zero_state_file_hash_(zero_state_file_hash)
@ -63,7 +64,8 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
, rldp_(rldp)
, rldp2_(rldp2)
, overlays_(overlays)
, validator_manager_(validator_manager) {
, validator_manager_(validator_manager)
, full_node_(full_node) {
}
private:
@ -79,6 +81,7 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
td::actor::ActorId<rldp2::Rldp> rldp2_;
td::actor::ActorId<overlay::Overlays> overlays_;
td::actor::ActorId<ValidatorManagerInterface> validator_manager_;
td::actor::ActorId<FullNode> full_node_;
bool inited_ = false;
overlay::OverlayIdFull overlay_id_full_;
@ -90,6 +93,10 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
class FullNodeCustomOverlay : public td::actor::Actor {
public:
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query);
void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query);
template <class T>
void process_broadcast(PublicKeyHash, T &) {
@ -98,6 +105,7 @@ class FullNodeCustomOverlay : public td::actor::Actor {
void receive_broadcast(PublicKeyHash src, td::BufferSlice query);
void send_external_message(td::BufferSlice data);
void send_broadcast(BlockBroadcast broadcast);
void set_config(FullNodeConfig config) {
config_ = std::move(config);
@ -106,16 +114,17 @@ class FullNodeCustomOverlay : public td::actor::Actor {
void start_up() override;
void tear_down() override;
FullNodeCustomOverlay(adnl::AdnlNodeIdShort local_id, std::vector<adnl::AdnlNodeIdShort> nodes,
std::map<adnl::AdnlNodeIdShort, int> senders, std::string name, FileHash zero_state_file_hash,
FullNodeCustomOverlay(adnl::AdnlNodeIdShort local_id, CustomOverlayParams params, FileHash zero_state_file_hash,
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp,
td::actor::ActorId<rldp2::Rldp> rldp2, td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager)
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<FullNode> full_node)
: local_id_(local_id)
, nodes_(std::move(nodes))
, senders_(std::move(senders))
, name_(std::move(name))
, name_(std::move(params.name_))
, nodes_(std::move(params.nodes_))
, msg_senders_(std::move(params.msg_senders_))
, block_senders_(std::move(params.block_senders_))
, zero_state_file_hash_(zero_state_file_hash)
, config_(config)
, keyring_(keyring)
@ -123,14 +132,16 @@ class FullNodeCustomOverlay : public td::actor::Actor {
, rldp_(rldp)
, rldp2_(rldp2)
, overlays_(overlays)
, validator_manager_(validator_manager) {
, validator_manager_(validator_manager)
, full_node_(full_node) {
}
private:
adnl::AdnlNodeIdShort local_id_;
std::vector<adnl::AdnlNodeIdShort> nodes_;
std::map<adnl::AdnlNodeIdShort, int> senders_;
std::string name_;
std::vector<adnl::AdnlNodeIdShort> nodes_;
std::map<adnl::AdnlNodeIdShort, int> msg_senders_;
std::set<adnl::AdnlNodeIdShort> block_senders_;
FileHash zero_state_file_hash_;
FullNodeConfig config_;
@ -140,6 +151,7 @@ class FullNodeCustomOverlay : public td::actor::Actor {
td::actor::ActorId<rldp2::Rldp> rldp2_;
td::actor::ActorId<overlay::Overlays> overlays_;
td::actor::ActorId<ValidatorManagerInterface> validator_manager_;
td::actor::ActorId<FullNode> full_node_;
bool inited_ = false;
overlay::OverlayIdFull overlay_id_full_;

View file

@ -759,17 +759,7 @@ void FullNodeShardImpl::process_block_broadcast(PublicKeyHash src, ton_api::tonN
// return;
//}
VLOG(FULL_NODE_DEBUG) << "Received block broadcast from " << src << ": " << B.ok().block_id.to_str();
auto P = td::PromiseCreator::lambda([](td::Result<td::Unit> R) {
if (R.is_error()) {
if (R.error().code() == ErrorCode::notready) {
LOG(DEBUG) << "dropped broadcast: " << R.move_as_error();
} else {
LOG(INFO) << "dropped broadcast: " << R.move_as_error();
}
}
});
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, B.move_as_ok(),
std::move(P));
td::actor::send_closure(full_node_, &FullNode::process_block_broadcast, B.move_as_ok());
}
void FullNodeShardImpl::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
@ -1340,7 +1330,8 @@ FullNodeShardImpl::FullNodeShardImpl(ShardIdFull shard, PublicKeyHash local_id,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client, FullNodeShardMode mode)
td::actor::ActorId<adnl::AdnlExtClient> client,
td::actor::ActorId<FullNode> full_node, FullNodeShardMode mode)
: shard_(shard)
, local_id_(local_id)
, adnl_id_(adnl_id)
@ -1352,7 +1343,8 @@ FullNodeShardImpl::FullNodeShardImpl(ShardIdFull shard, PublicKeyHash local_id,
, overlays_(overlays)
, validator_manager_(validator_manager)
, client_(client)
, mode_(shard.is_masterchain() ? FullNodeShardMode::active : mode)
, full_node_(full_node)
, mode_(mode)
, config_(config) {
}
@ -1361,10 +1353,10 @@ td::actor::ActorOwn<FullNodeShard> FullNodeShard::create(
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays, td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client, FullNodeShardMode mode) {
return td::actor::create_actor<FullNodeShardImpl>(PSTRING() << "fullnode" << shard.to_str(), shard, local_id, adnl_id,
zero_state_file_hash, config, keyring, adnl, rldp, rldp2, overlays,
validator_manager, client, mode);
td::actor::ActorId<adnl::AdnlExtClient> client, td::actor::ActorId<FullNode> full_node, FullNodeShardMode mode) {
return td::actor::create_actor<FullNodeShardImpl>("tonnode", shard, local_id, adnl_id, zero_state_file_hash, config,
keyring, adnl, rldp, rldp2, overlays, validator_manager, client,
full_node, mode);
}
} // namespace fullnode

View file

@ -83,7 +83,8 @@ class FullNodeShard : public td::actor::Actor {
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays, td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client, FullNodeShardMode mode = FullNodeShardMode::active);
td::actor::ActorId<adnl::AdnlExtClient> client, td::actor::ActorId<FullNode> full_node,
FullNodeShardMode mode = FullNodeShardMode::active);
};
} // namespace fullnode

View file

@ -237,7 +237,8 @@ class FullNodeShardImpl : public FullNodeShard {
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp,
td::actor::ActorId<rldp2::Rldp> rldp2, td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client, FullNodeShardMode mode = FullNodeShardMode::active);
td::actor::ActorId<adnl::AdnlExtClient> client, td::actor::ActorId<FullNode> full_node,
FullNodeShardMode mode = FullNodeShardMode::active);
private:
bool use_new_download() const {
@ -263,6 +264,7 @@ class FullNodeShardImpl : public FullNodeShard {
td::actor::ActorId<overlay::Overlays> overlays_;
td::actor::ActorId<ValidatorManagerInterface> validator_manager_;
td::actor::ActorId<adnl::AdnlExtClient> client_;
td::actor::ActorId<FullNode> full_node_;
td::uint32 attempt_ = 0;

View file

@ -38,8 +38,8 @@ void FullNodeImpl::add_permanent_key(PublicKeyHash key, td::Promise<td::Unit> pr
local_keys_.insert(key);
// create_private_block_overlay(key);
for (auto &p : private_custom_overlays_) {
update_ext_msg_overlay(p.first, p.second);
for (auto &p : custom_overlays_) {
update_custom_overlay(p.second);
}
if (!sign_cert_by_.is_zero()) {
@ -68,8 +68,8 @@ void FullNodeImpl::del_permanent_key(PublicKeyHash key, td::Promise<td::Unit> pr
}
local_keys_.erase(key);
// private_block_overlays_.erase(key);
for (auto &p : private_custom_overlays_) {
update_ext_msg_overlay(p.first, p.second);
for (auto &p : custom_overlays_) {
update_custom_overlay(p.second);
}
if (sign_cert_by_ != key) {
@ -139,8 +139,8 @@ void FullNodeImpl::update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td:
}
local_id_ = adnl_id_.pubkey_hash();
for (auto &p : private_custom_overlays_) {
update_ext_msg_overlay(p.first, p.second);
for (auto &p : custom_overlays_) {
update_custom_overlay(p.second);
}
}
@ -154,40 +154,37 @@ void FullNodeImpl::set_config(FullNodeConfig config) {
/*for (auto& overlay : private_block_overlays_) {
td::actor::send_closure(overlay.second, &FullNodePrivateBlockOverlay::set_config, config);
}*/
for (auto& overlay : private_custom_overlays_) {
for (auto& overlay : custom_overlays_) {
for (auto &actor : overlay.second.actors_) {
td::actor::send_closure(actor.second, &FullNodeCustomOverlay::set_config, config);
}
}
}
void FullNodeImpl::add_ext_msg_overlay(std::vector<adnl::AdnlNodeIdShort> nodes,
std::map<adnl::AdnlNodeIdShort, int> senders, std::string name,
td::Promise<td::Unit> promise) {
if (nodes.empty()) {
void FullNodeImpl::add_custom_overlay(CustomOverlayParams params, td::Promise<td::Unit> promise) {
if (params.nodes_.empty()) {
promise.set_error(td::Status::Error("list of nodes is empty"));
return;
}
if (private_custom_overlays_.count(name)) {
promise.set_error(td::Status::Error(PSTRING() << "duplicate overlay name \"" << name << "\""));
std::string name = params.name_;
if (custom_overlays_.count(name)) {
promise.set_error(td::Status::Error(PSTRING() << "duplicate custom overlay name \"" << name << "\""));
return;
}
VLOG(FULL_NODE_WARNING) << "Adding private overlay for external messages \"" << name << "\", " << nodes.size()
<< " nodes";
auto &p = private_custom_overlays_[name];
p.nodes_ = nodes;
p.senders_ = senders;
update_ext_msg_overlay(name, p);
VLOG(FULL_NODE_WARNING) << "Adding custom overlay \"" << name << "\", " << params.nodes_.size() << " nodes";
auto &p = custom_overlays_[name];
p.params_ = std::move(params);
update_custom_overlay(p);
promise.set_result(td::Unit());
}
void FullNodeImpl::del_ext_msg_overlay(std::string name, td::Promise<td::Unit> promise) {
auto it = private_custom_overlays_.find(name);
if (it == private_custom_overlays_.end()) {
void FullNodeImpl::del_custom_overlay(std::string name, td::Promise<td::Unit> promise) {
auto it = custom_overlays_.find(name);
if (it == custom_overlays_.end()) {
promise.set_error(td::Status::Error(PSTRING() << "no such overlay \"" << name << "\""));
return;
}
private_custom_overlays_.erase(it);
custom_overlays_.erase(it);
promise.set_result(td::Unit());
}
@ -290,7 +287,7 @@ void FullNodeImpl::update_shard_configuration(td::Ref<MasterchainState> state, s
}
}
private_block_overlays_.update_overlays(state, std::move(my_adnl_ids), zero_state_file_hash_, keyring_, adnl_, rldp_,
rldp2_, overlays_, validator_manager_);
rldp2_, overlays_, validator_manager_, actor_id(this));
}
void FullNodeImpl::add_shard_actor(ShardIdFull shard, FullNodeShardMode mode) {
@ -299,7 +296,7 @@ void FullNodeImpl::add_shard_actor(ShardIdFull shard, FullNodeShardMode mode) {
return;
}
info.actor = FullNodeShard::create(shard, local_id_, adnl_id_, zero_state_file_hash_, config_, keyring_, adnl_, rldp_,
rldp2_, overlays_, validator_manager_, client_, mode);
rldp2_, overlays_, validator_manager_, client_, actor_id(this), mode);
info.mode = mode;
info.delete_at = mode != FullNodeShardMode::inactive ? td::Timestamp::never() : td::Timestamp::in(INACTIVE_SHARD_TTL);
if (all_validators_.size() > 0) {
@ -326,10 +323,10 @@ void FullNodeImpl::send_ext_message(AccountIdPrefixFull dst, td::BufferSlice dat
VLOG(FULL_NODE_WARNING) << "dropping OUT ext message to unknown shard";
return;
}
for (auto &private_overlay : private_custom_overlays_) {
for (auto &private_overlay : custom_overlays_) {
for (auto &actor : private_overlay.second.actors_) {
auto local_id = actor.first;
if (private_overlay.second.senders_.count(local_id)) {
if (private_overlay.second.params_.msg_senders_.count(local_id)) {
td::actor::send_closure(actor.second, &FullNodeCustomOverlay::send_external_message, data.clone());
}
}
@ -355,7 +352,11 @@ void FullNodeImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_s
td::actor::send_closure(shard, &FullNodeShard::send_shard_block_info, block_id, cc_seqno, std::move(data));
}
void FullNodeImpl::send_broadcast(BlockBroadcast broadcast) {
void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) {
send_block_broadcast_to_custom_overlays(broadcast);
if (custom_overlays_only) {
return;
}
auto shard = get_shard(broadcast.block_id.shard_full());
if (shard.empty()) {
VLOG(FULL_NODE_WARNING) << "dropping OUT broadcast to unknown shard";
@ -560,7 +561,22 @@ void FullNodeImpl::new_key_block(BlockHandle handle) {
std::move(P));
}
void FullNodeImpl::process_block_broadcast(BlockBroadcast broadcast) {
send_block_broadcast_to_custom_overlays(broadcast);
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, std::move(broadcast),
[](td::Result<td::Unit> R) {
if (R.is_error()) {
if (R.error().code() == ErrorCode::notready) {
LOG(DEBUG) << "dropped broadcast: " << R.move_as_error();
} else {
LOG(INFO) << "dropped broadcast: " << R.move_as_error();
}
}
});
}
void FullNodeImpl::start_up() {
add_shard_actor(ShardIdFull{masterchainId}, FullNodeShardMode::active);
if (local_id_.is_zero()) {
if (adnl_id_.is_zero()) {
auto pk = ton::PrivateKey{ton::privkeys::Ed25519::random()};
@ -590,8 +606,8 @@ void FullNodeImpl::start_up() {
void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) override {
td::actor::send_closure(id_, &FullNodeImpl::send_shard_block_info, block_id, cc_seqno, std::move(data));
}
void send_broadcast(BlockBroadcast broadcast) override {
td::actor::send_closure(id_, &FullNodeImpl::send_broadcast, std::move(broadcast));
void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override {
td::actor::send_closure(id_, &FullNodeImpl::send_broadcast, std::move(broadcast), custom_overlays_only);
}
void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
td::Promise<ReceivedBlock> promise) override {
@ -648,8 +664,8 @@ void FullNodeImpl::start_up() {
}
void FullNodeImpl::update_private_overlays() {
for (auto &p : private_custom_overlays_) {
update_ext_msg_overlay(p.first, p.second);
for (auto &p : custom_overlays_) {
update_custom_overlay(p.second);
}
/*private_block_overlays_.clear();
if (local_keys_.empty()) {
@ -677,30 +693,32 @@ void FullNodeImpl::create_private_block_overlay(PublicKeyHash key) {
for (const auto &p : current_validators_) {
nodes.push_back(p.second);
}
private_block_overlays_[key] = td::actor::create_actor<FullNodePrivateOverlay>(
"BlocksPrivateOverlay", current_validators_[key], std::move(nodes), zero_state_file_hash_, config_, keyring_,
adnl_, rldp_, rldp2_, overlays_, validator_manager_);
private_block_overlays_[key] = td::actor::create_actor<FullNodePrivateBlockOverlay>(
"BlocksPrivateOverlay", current_validators_[key], std::move(nodes), zero_state_file_hash_, config_,
private_block_overlays_enable_compression_, keyring_, adnl_, rldp_, rldp2_, overlays_, validator_manager_,
actor_id(this));
}
}*/
void FullNodeImpl::update_ext_msg_overlay(const std::string &name, ExtMsgOverlayInfo &overlay) {
void FullNodeImpl::update_custom_overlay(CustomOverlayInfo &overlay) {
auto old_actors = std::move(overlay.actors_);
overlay.actors_.clear();
CustomOverlayParams &params = overlay.params_;
auto try_local_id = [&](const adnl::AdnlNodeIdShort &local_id) {
if (std::find(overlay.nodes_.begin(), overlay.nodes_.end(), local_id) != overlay.nodes_.end()) {
if (std::find(params.nodes_.begin(), params.nodes_.end(), local_id) != params.nodes_.end()) {
auto it = old_actors.find(local_id);
if (it != old_actors.end()) {
overlay.actors_[local_id] = std::move(it->second);
old_actors.erase(it);
} else {
overlay.actors_[local_id] = td::actor::create_actor<FullNodeCustomOverlay>(
"CustomOverlay", local_id, overlay.nodes_, overlay.senders_, name, zero_state_file_hash_, config_,
keyring_, adnl_, rldp_, rldp2_, overlays_, validator_manager_);
"CustomOverlay", local_id, params, zero_state_file_hash_, config_, keyring_, adnl_, rldp_, rldp2_,
overlays_, validator_manager_, actor_id(this));
}
}
};
try_local_id(adnl_id_);
for (const PublicKeyHash &local_key: local_keys_) {
for (const PublicKeyHash &local_key : local_keys_) {
auto it = current_validators_.find(local_key);
if (it != current_validators_.end()) {
try_local_id(it->second);
@ -708,6 +726,25 @@ void FullNodeImpl::update_ext_msg_overlay(const std::string &name, ExtMsgOverlay
}
}
void FullNodeImpl::send_block_broadcast_to_custom_overlays(const BlockBroadcast& broadcast) {
if (!custom_overlays_sent_broadcasts_.insert(broadcast.block_id).second) {
return;
}
custom_overlays_sent_broadcasts_lru_.push(broadcast.block_id);
if (custom_overlays_sent_broadcasts_lru_.size() > 256) {
custom_overlays_sent_broadcasts_.erase(custom_overlays_sent_broadcasts_lru_.front());
custom_overlays_sent_broadcasts_lru_.pop();
}
for (auto &private_overlay : custom_overlays_) {
for (auto &actor : private_overlay.second.actors_) {
auto local_id = actor.first;
if (private_overlay.second.params_.block_senders_.count(local_id)) {
td::actor::send_closure(actor.second, &FullNodeCustomOverlay::send_broadcast, broadcast.clone());
}
}
}
}
FullNodeImpl::FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash,
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp,
@ -730,7 +767,6 @@ FullNodeImpl::FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id
, db_root_(db_root)
, started_promise_(std::move(started_promise))
, config_(config) {
add_shard_actor(ShardIdFull{masterchainId}, FullNodeShardMode::active);
}
td::actor::ActorOwn<FullNode> FullNode::create(
@ -758,6 +794,21 @@ bool FullNodeConfig::operator!=(const FullNodeConfig &rhs) const {
return !(*this == rhs);
}
CustomOverlayParams CustomOverlayParams::fetch(const ton_api::engine_validator_customOverlay& f) {
CustomOverlayParams c;
c.name_ = f.name_;
for (const auto &node : f.nodes_) {
c.nodes_.emplace_back(node->adnl_id_);
if (node->msg_sender_) {
c.msg_senders_[ton::adnl::AdnlNodeIdShort{node->adnl_id_}] = node->msg_sender_priority_;
}
if (node->block_sender_) {
c.block_senders_.emplace(node->adnl_id_);
}
}
return c;
}
} // namespace fullnode
} // namespace validator

View file

@ -55,6 +55,15 @@ struct FullNodeConfig {
bool ext_messages_broadcast_disabled_ = false;
};
struct CustomOverlayParams {
std::string name_;
std::vector<adnl::AdnlNodeIdShort> nodes_;
std::map<adnl::AdnlNodeIdShort, int> msg_senders_;
std::set<adnl::AdnlNodeIdShort> block_senders_;
static CustomOverlayParams fetch(const ton_api::engine_validator_customOverlay& f);
};
class FullNode : public td::actor::Actor {
public:
virtual ~FullNode() = default;
@ -75,10 +84,10 @@ class FullNode : public td::actor::Actor {
virtual void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td::Unit> promise) = 0;
virtual void set_config(FullNodeConfig config) = 0;
virtual void add_ext_msg_overlay(std::vector<adnl::AdnlNodeIdShort> nodes,
std::map<adnl::AdnlNodeIdShort, int> senders, std::string name,
td::Promise<td::Unit> promise) = 0;
virtual void del_ext_msg_overlay(std::string name, td::Promise<td::Unit> promise) = 0;
virtual void add_custom_overlay(CustomOverlayParams params, td::Promise<td::Unit> promise) = 0;
virtual void del_custom_overlay(std::string name, td::Promise<td::Unit> promise) = 0;
virtual void process_block_broadcast(BlockBroadcast broadcast) = 0;
static constexpr td::uint32 max_block_size() {
return 4 << 20;

View file

@ -28,6 +28,7 @@
#include <map>
#include <set>
#include <queue>
namespace ton {
@ -56,9 +57,8 @@ class FullNodeImpl : public FullNode {
void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td::Unit> promise) override;
void set_config(FullNodeConfig config) override;
void add_ext_msg_overlay(std::vector<adnl::AdnlNodeIdShort> nodes, std::map<adnl::AdnlNodeIdShort, int> senders,
std::string name, td::Promise<td::Unit> promise) override;
void del_ext_msg_overlay(std::string name, td::Promise<td::Unit> promise) override;
void add_custom_overlay(CustomOverlayParams params, td::Promise<td::Unit> promise) override;
void del_custom_overlay(std::string name, td::Promise<td::Unit> promise) override;
void update_shard_configuration(td::Ref<MasterchainState> state, std::set<ShardIdFull> shards_to_monitor,
std::set<ShardIdFull> temporary_shards);
@ -69,7 +69,7 @@ class FullNodeImpl : public FullNode {
void send_ihr_message(AccountIdPrefixFull dst, td::BufferSlice data);
void send_ext_message(AccountIdPrefixFull dst, td::BufferSlice data);
void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqnp, td::BufferSlice data);
void send_broadcast(BlockBroadcast broadcast);
void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only);
void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise<ReceivedBlock> promise);
void download_zero_state(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::BufferSlice> promise);
@ -89,6 +89,8 @@ class FullNodeImpl : public FullNode {
void got_key_block_state(td::Ref<ShardState> state);
void new_key_block(BlockHandle handle);
void process_block_broadcast(BlockBroadcast broadcast) override;
void start_up() override;
FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash,
@ -146,16 +148,19 @@ class FullNodeImpl : public FullNode {
void create_private_block_overlay(PublicKeyHash key);
*/
struct ExtMsgOverlayInfo {
std::vector<adnl::AdnlNodeIdShort> nodes_;
std::map<adnl::AdnlNodeIdShort, int> senders_;
std::map<adnl::AdnlNodeIdShort, td::actor::ActorOwn<FullNodeCustomOverlay>>
actors_; // our local id -> actor
struct CustomOverlayInfo {
CustomOverlayParams params_;
std::map<adnl::AdnlNodeIdShort, td::actor::ActorOwn<FullNodeCustomOverlay>> actors_; // our local id -> actor
};
std::map<std::string, ExtMsgOverlayInfo> private_custom_overlays_;
std::map<std::string, CustomOverlayInfo> custom_overlays_;
std::set<BlockIdExt> custom_overlays_sent_broadcasts_;
std::queue<BlockIdExt> custom_overlays_sent_broadcasts_lru_;
void update_private_overlays();
void update_ext_msg_overlay(const std::string& name, ExtMsgOverlayInfo& overlay);
// void set_private_block_overlays_enable_compression(bool value);
void update_custom_overlay(CustomOverlayInfo& overlay);
void send_block_broadcast_to_custom_overlays(const BlockBroadcast& broadcast);
FullNodePrivateBlockOverlays private_block_overlays_;
};

View file

@ -908,11 +908,6 @@ void AcceptBlockQuery::written_block_info_2() {
}
void AcceptBlockQuery::applied() {
if (!send_broadcast_) {
finish_query();
return;
}
BlockBroadcast b;
b.data = data_->data();
b.block_id = id_;
@ -932,7 +927,8 @@ void AcceptBlockQuery::applied() {
}
// do not wait for answer
td::actor::send_closure_later(manager_, &ValidatorManager::send_block_broadcast, std::move(b));
td::actor::send_closure_later(manager_, &ValidatorManager::send_block_broadcast, std::move(b),
/* custom_overlays_only = */ !send_broadcast_);
finish_query();
}

View file

@ -130,7 +130,7 @@ class ValidatorManager : public ValidatorManagerInterface {
virtual void send_external_message(td::Ref<ExtMessage> message) = 0;
virtual void send_ihr_message(td::Ref<IhrMessage> message) = 0;
virtual void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) = 0;
virtual void send_block_broadcast(BlockBroadcast broadcast) = 0;
virtual void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) = 0;
virtual void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) = 0;
@ -171,6 +171,7 @@ class ValidatorManager : public ValidatorManagerInterface {
virtual void wait_shard_client_state(BlockSeqno seqno, td::Timestamp timeout, td::Promise<td::Unit> promise) = 0;
virtual void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) = 0;
virtual void log_new_validator_group_stats(validatorsession::NewValidatorGroupStats stats) = 0;
virtual void get_block_handle_for_litequery(BlockIdExt block_id, td::Promise<ConstBlockHandle> promise) = 0;
virtual void get_block_data_for_litequery(BlockIdExt block_id, td::Promise<td::Ref<BlockData>> promise) = 0;

View file

@ -255,7 +255,7 @@ class ValidatorManagerImpl : public ValidatorManager {
new_ihr_message(message->serialize());
}
void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override;
void send_block_broadcast(BlockBroadcast broadcast) override {
void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override {
}
void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits,
@ -387,6 +387,9 @@ class ValidatorManagerImpl : public ValidatorManager {
void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) override {
UNREACHABLE();
}
void log_new_validator_group_stats(validatorsession::NewValidatorGroupStats stats) override {
UNREACHABLE();
}
void get_out_msg_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) override {
if (queue_size_counter_.empty()) {
queue_size_counter_ = td::actor::create_actor<QueueSizeCounter>("queuesizecounter", td::Ref<MasterchainState>{},

View file

@ -321,7 +321,7 @@ class ValidatorManagerImpl : public ValidatorManager {
void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override {
UNREACHABLE();
}
void send_block_broadcast(BlockBroadcast broadcast) override {
void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override {
}
void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits,
@ -449,6 +449,9 @@ class ValidatorManagerImpl : public ValidatorManager {
void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) override {
UNREACHABLE();
}
void log_new_validator_group_stats(validatorsession::NewValidatorGroupStats stats) override {
UNREACHABLE();
}
void get_out_msg_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) override {
if (queue_size_counter_.empty()) {
queue_size_counter_ = td::actor::create_actor<QueueSizeCounter>("queuesizecounter", td::Ref<MasterchainState>{},

View file

@ -1617,8 +1617,8 @@ void ValidatorManagerImpl::send_top_shard_block_description(td::Ref<ShardTopBloc
}
}
void ValidatorManagerImpl::send_block_broadcast(BlockBroadcast broadcast) {
callback_->send_broadcast(std::move(broadcast));
void ValidatorManagerImpl::send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) {
callback_->send_broadcast(std::move(broadcast), custom_overlays_only);
}
void ValidatorManagerImpl::send_get_out_msg_queue_proof_request(
@ -2314,8 +2314,7 @@ td::actor::ActorOwn<ValidatorGroup> ValidatorManagerImpl::create_validator_group
auto G = td::actor::create_actor<ValidatorGroup>(
"validatorgroup", shard, validator_id, session_id, validator_set,
last_masterchain_state_->get_collator_config(true), opts, keyring_, adnl_, rldp_, overlays_, db_root_,
actor_id(this), init_session, opts_->check_unsafe_resync_allowed(validator_set->get_catchain_seqno()),
opts_->validator_mode());
actor_id(this), init_session, opts_->check_unsafe_resync_allowed(validator_set->get_catchain_seqno()), opts_);
return G;
}
}
@ -2928,7 +2927,7 @@ void ValidatorManagerImpl::log_validator_session_stats(BlockIdExt block_id,
stats.cc_seqno, stats.creator.bits256_value(), stats.total_validators, stats.total_weight, stats.signatures,
stats.signatures_weight, stats.approve_signatures, stats.approve_signatures_weight, stats.first_round,
std::move(rounds));
std::string s = td::json_encode<std::string>(td::ToJson(*obj), false);
auto s = td::json_encode<std::string>(td::ToJson(*obj.get()), false);
s.erase(std::remove_if(s.begin(), s.end(), [](char c) { return c == '\n' || c == '\r'; }), s.end());
std::ofstream file;
@ -2936,7 +2935,31 @@ void ValidatorManagerImpl::log_validator_session_stats(BlockIdExt block_id,
file << s << "\n";
file.close();
LOG(INFO) << "Writing validator session stats for " << block_id.id;
LOG(INFO) << "Writing validator session stats for " << block_id.id.to_str();
}
void ValidatorManagerImpl::log_new_validator_group_stats(validatorsession::NewValidatorGroupStats stats) {
std::string fname = opts_->get_session_logs_file();
if (fname.empty()) {
return;
}
std::vector<tl_object_ptr<ton_api::validatorSession_newValidatorGroupStats_node>> nodes;
for (const auto &node : stats.nodes) {
nodes.push_back(
create_tl_object<ton_api::validatorSession_newValidatorGroupStats_node>(node.id.bits256_value(), node.weight));
}
auto obj = create_tl_object<ton_api::validatorSession_newValidatorGroupStats>(
stats.session_id, stats.shard.workchain, stats.shard.shard, stats.cc_seqno, stats.timestamp, stats.self_idx,
std::move(nodes));
auto s = td::json_encode<std::string>(td::ToJson(*obj.get()), false);
s.erase(std::remove_if(s.begin(), s.end(), [](char c) { return c == '\n' || c == '\r'; }), s.end());
std::ofstream file;
file.open(fname, std::ios_base::app);
file << s << "\n";
file.close();
LOG(INFO) << "Writing new validator group stats for " << stats.shard.to_str();
}
void ValidatorManagerImpl::get_block_handle_for_litequery(BlockIdExt block_id, td::Promise<ConstBlockHandle> promise) {

View file

@ -493,7 +493,7 @@ class ValidatorManagerImpl : public ValidatorManager {
void send_external_message(td::Ref<ExtMessage> message) override;
void send_ihr_message(td::Ref<IhrMessage> message) override;
void send_top_shard_block_description(td::Ref<ShardTopBlockDescription> desc) override;
void send_block_broadcast(BlockBroadcast broadcast) override;
void send_block_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override;
void send_get_out_msg_queue_proof_request(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override;
@ -593,6 +593,8 @@ class ValidatorManagerImpl : public ValidatorManager {
void wait_shard_client_state(BlockSeqno seqno, td::Timestamp timeout, td::Promise<td::Unit> promise) override;
void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) override;
void log_new_validator_group_stats(validatorsession::NewValidatorGroupStats stats) override;
void get_validator_sessions_info(
td::Promise<tl_object_ptr<ton_api::engine_validator_validatorSessionsInfo>> promise) override;

View file

@ -146,23 +146,29 @@ void AsyncStateSerializer::next_iteration() {
return;
}
if (!have_masterchain_state_) {
LOG(INFO) << "started serializing persistent state for " << masterchain_handle_->id().id;
LOG(ERROR) << "started serializing persistent state for " << masterchain_handle_->id().id.to_str();
// block next attempts immediately, but send actual request later
running_ = true;
double delay = td::Random::fast(0, 3600);
LOG(WARNING) << "serializer delay = " << delay << "s";
delay_action([SelfId = actor_id(
this)]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_masterchain_state); },
td::Timestamp::in(td::Random::fast(0, 3600)));
td::Timestamp::in(delay));
return;
}
while (next_idx_ < shards_.size()) {
// block next attempts immediately, but send actual request later
running_ = true;
double delay = td::Random::fast(0, 1800);
LOG(WARNING) << "serializer delay = " << delay << "s";
delay_action(
[SelfId = actor_id(this), shard = shards_[next_idx_]]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_shard_state, shard); },
td::Timestamp::in(td::Random::fast(0, 1800)));
[SelfId = actor_id(this), shard = shards_[next_idx_]]() {
td::actor::send_closure(SelfId, &AsyncStateSerializer::request_shard_state, shard);
},
td::Timestamp::in(delay));
return;
}
LOG(INFO) << "finished serializing persistent state for " << masterchain_handle_->id().id;
LOG(ERROR) << "finished serializing persistent state for " << masterchain_handle_->id().id.to_str();
last_key_block_ts_ = masterchain_handle_->unix_time();
last_key_block_id_ = masterchain_handle_->id();
}
@ -223,7 +229,7 @@ void AsyncStateSerializer::got_masterchain_handle(BlockHandle handle) {
void AsyncStateSerializer::got_masterchain_state(td::Ref<MasterchainState> state,
std::shared_ptr<vm::CellDbReader> cell_db_reader) {
LOG(INFO) << "serializing masterchain state " << masterchain_handle_->id().id;
LOG(ERROR) << "serializing masterchain state " << masterchain_handle_->id().id.to_str();
have_masterchain_state_ = true;
CHECK(next_idx_ == 0);
CHECK(shards_.size() == 0);
@ -248,7 +254,7 @@ void AsyncStateSerializer::got_masterchain_state(td::Ref<MasterchainState> state
}
void AsyncStateSerializer::stored_masterchain_state() {
LOG(INFO) << "finished serializing masterchain state " << masterchain_handle_->id().id;
LOG(ERROR) << "finished serializing masterchain state " << masterchain_handle_->id().id.to_str();
running_ = false;
next_iteration();
}
@ -278,13 +284,13 @@ void AsyncStateSerializer::got_shard_handle(BlockHandle handle) {
void AsyncStateSerializer::got_shard_state(BlockHandle handle, td::Ref<ShardState> state,
std::shared_ptr<vm::CellDbReader> cell_db_reader) {
LOG(INFO) << "serializing shard state " << handle->id().id;
LOG(ERROR) << "serializing shard state " << handle->id().id.to_str();
auto write_data = [hash = state->root_cell()->get_hash(), cell_db_reader](td::FileFd& fd) {
return vm::std_boc_serialize_to_file_large(cell_db_reader, hash, fd, 31);
};
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Unit> R) {
R.ensure();
LOG(INFO) << "finished serializing shard state " << handle->id().id;
LOG(ERROR) << "finished serializing shard state " << handle->id().id.to_str();
td::actor::send_closure(SelfId, &AsyncStateSerializer::success_handler);
});
td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file_gen, handle->id(),

View file

@ -57,8 +57,8 @@ void ValidatorGroup::generate_block_candidate(
cache = cached_collated_block_](td::Result<BlockCandidate> R) {
td::actor::send_closure(SelfId, &ValidatorGroup::generated_block_candidate, std::move(cache), std::move(R));
};
if (mode_ == ValidatorManagerOptions::validator_lite_all ||
(mode_ == ValidatorManagerOptions::validator_lite_shards && !shard_.is_masterchain())) {
if (opts_->validator_mode() == ValidatorManagerOptions::validator_lite_all ||
(opts_->validator_mode() == ValidatorManagerOptions::validator_lite_shards && !shard_.is_masterchain())) {
send_collate_query(round_id, td::Timestamp::in(10.0), std::move(P));
return;
}
@ -204,10 +204,10 @@ void ValidatorGroup::accept_block_query(BlockIdExt block_id, td::Ref<BlockData>
}
});
run_accept_block_query(block_id, std::move(block), std::move(prev), validator_set_, std::move(sig_set),
std::move(approve_sig_set), send_broadcast,
shard_.is_masterchain() || mode_ == ValidatorManagerOptions::validator_normal, manager_,
std::move(P));
run_accept_block_query(
block_id, std::move(block), std::move(prev), validator_set_, std::move(sig_set), std::move(approve_sig_set),
send_broadcast, shard_.is_masterchain() || opts_->validator_mode() == ValidatorManagerOptions::validator_normal,
manager_, std::move(P));
}
void ValidatorGroup::skip_round(td::uint32 round_id) {
@ -343,6 +343,10 @@ void ValidatorGroup::create_session() {
<< ".",
allow_unsafe_self_blocks_resync_);
}
if (opts_->get_catchain_max_block_delay()) {
td::actor::send_closure(session_, &validatorsession::ValidatorSession::set_catchain_max_block_delay,
opts_->get_catchain_max_block_delay().value());
}
if (started_) {
td::actor::send_closure(session_, &validatorsession::ValidatorSession::start);
}
@ -373,6 +377,22 @@ void ValidatorGroup::start(std::vector<BlockIdExt> prev, BlockIdExt min_masterch
prev_block_ids_ = std::vector<BlockIdExt>{next_block_id};
}
postponed_accept_.clear();
validatorsession::NewValidatorGroupStats stats;
stats.session_id = session_id_;
stats.shard = shard_;
stats.cc_seqno = validator_set_->get_catchain_seqno();
stats.timestamp = td::Clocks::system();
td::uint32 idx = 0;
for (const auto& node : validator_set_->export_vector()) {
PublicKeyHash id = ValidatorFullId{node.key}.compute_short_id();
if (id == local_id_) {
stats.self_idx = idx;
}
stats.nodes.push_back(validatorsession::NewValidatorGroupStats::Node{id, node.weight});
++idx;
}
td::actor::send_closure(manager_, &ValidatorManager::log_new_validator_group_stats, std::move(stats));
}
void ValidatorGroup::destroy() {
@ -386,6 +406,9 @@ void ValidatorGroup::destroy() {
return;
}
auto stats = R.move_as_ok();
if (stats.rounds.empty()) {
return;
}
stats.cc_seqno = cc_seqno;
td::actor::send_closure(manager, &ValidatorManager::log_validator_session_stats, block_id,
std::move(stats));

View file

@ -72,8 +72,7 @@ class ValidatorGroup : public td::actor::Actor {
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<overlay::Overlays> overlays,
std::string db_root, td::actor::ActorId<ValidatorManager> validator_manager, bool create_session,
bool allow_unsafe_self_blocks_resync,
ValidatorManagerOptions::ValidatorMode mode = ValidatorManagerOptions::validator_normal)
bool allow_unsafe_self_blocks_resync, td::Ref<ValidatorManagerOptions> opts)
: shard_(shard)
, local_id_(std::move(local_id))
, session_id_(session_id)
@ -88,7 +87,7 @@ class ValidatorGroup : public td::actor::Actor {
, manager_(validator_manager)
, init_(create_session)
, allow_unsafe_self_blocks_resync_(allow_unsafe_self_blocks_resync)
, mode_(mode) {
, opts_(std::move(opts)) {
}
private:
@ -133,7 +132,7 @@ class ValidatorGroup : public td::actor::Actor {
bool init_ = false;
bool started_ = false;
bool allow_unsafe_self_blocks_resync_;
ValidatorManagerOptions::ValidatorMode mode_;
td::Ref<ValidatorManagerOptions> opts_;
td::uint32 last_known_round_id_ = 0;
struct CachedCollatedBlock {

View file

@ -127,6 +127,12 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions {
bool nonfinal_ls_queries_enabled() const override {
return nonfinal_ls_queries_enabled_;
}
td::optional<td::uint64> get_celldb_cache_size() const override {
return celldb_cache_size_;
}
td::optional<double> get_catchain_max_block_delay() const override {
return catchain_max_block_delay_;
}
ValidatorMode validator_mode() const override {
return validator_mode_;
}
@ -198,6 +204,12 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions {
void set_nonfinal_ls_queries_enabled(bool value) override {
nonfinal_ls_queries_enabled_ = value;
}
void set_celldb_cache_size(td::uint64 value) override {
celldb_cache_size_ = value;
}
void set_catchain_max_block_delay(double value) override {
catchain_max_block_delay_ = value;
}
void set_validator_mode(ValidatorMode value) override {
validator_mode_ = value;
}
@ -246,6 +258,8 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions {
double archive_preload_period_ = 0.0;
bool disable_rocksdb_stats_;
bool nonfinal_ls_queries_enabled_ = false;
td::optional<td::uint64> celldb_cache_size_;
td::optional<double> catchain_max_block_delay_;
ValidatorMode validator_mode_ = validator_normal;
};

View file

@ -86,6 +86,8 @@ struct ValidatorManagerOptions : public td::CntObject {
virtual double get_archive_preload_period() const = 0;
virtual bool get_disable_rocksdb_stats() const = 0;
virtual bool nonfinal_ls_queries_enabled() const = 0;
virtual td::optional<td::uint64> get_celldb_cache_size() const = 0;
virtual td::optional<double> get_catchain_max_block_delay() const = 0;
virtual ValidatorMode validator_mode() const = 0;
virtual void set_zero_block_id(BlockIdExt block_id) = 0;
@ -110,6 +112,8 @@ struct ValidatorManagerOptions : public td::CntObject {
virtual void set_archive_preload_period(double value) = 0;
virtual void set_disable_rocksdb_stats(bool value) = 0;
virtual void set_nonfinal_ls_queries_enabled(bool value) = 0;
virtual void set_celldb_cache_size(td::uint64 value) = 0;
virtual void set_catchain_max_block_delay(double value) = 0;
virtual void set_validator_mode(ValidatorMode value) = 0;
static td::Ref<ValidatorManagerOptions> create(
@ -135,7 +139,7 @@ class ValidatorManagerInterface : public td::actor::Actor {
virtual void send_ihr_message(AccountIdPrefixFull dst, td::BufferSlice data) = 0;
virtual void send_ext_message(AccountIdPrefixFull dst, td::BufferSlice data) = 0;
virtual void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) = 0;
virtual void send_broadcast(BlockBroadcast broadcast) = 0;
virtual void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only = false) = 0;
virtual void download_block(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
td::Promise<ReceivedBlock> promise) = 0;
virtual void download_zero_state(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,