1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-02-12 11:12:16 +00:00

Improve tweaking for high throughput (#610)

* Option "--disable-ext-msg-broadcast"

* "Get shard out queue size" query

* Move disabling ext msg broadcasts from command-line arguments to config

* Fix compilation error

* Asynchronous store_cell and gc in celldb

* Make GC in celldb work evenly over time

* Increase timeouts for downloading blocks

* Reuse blocks from previous rounds in validator session

* Use Rldp2 in FullNode for downloading persistent states and archives

* Improve logs in download-archive-slice and download-state

* Decrease delay between serializing shards

* Make CellDbIn::load_cell synchronous to avoid interfering with store_cell

---------

Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
EmelyanenkoK 2023-03-15 10:47:35 +03:00 committed by GitHub
parent 30c742aedd
commit 47311d6e0e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 712 additions and 163 deletions

View file

@ -43,7 +43,7 @@ class CellHashTable {
template <class F>
void for_each(F &&f) {
for (auto &info : set_) {
f(info);
f(const_cast<InfoT &>(info));
}
}
template <class F>

View file

@ -150,6 +150,20 @@ td::Result<CellLoader::LoadResult> CellLoader::load(td::Slice hash, bool need_da
return res;
}
td::Result<CellLoader::LoadResult> CellLoader::load_refcnt(td::Slice hash) {
LoadResult res;
std::string serialized;
TRY_RESULT(get_status, reader_->get(hash, serialized));
if (get_status != KeyValue::GetStatus::Ok) {
DCHECK(get_status == KeyValue::GetStatus::NotFound);
return res;
}
res.status = LoadResult::Ok;
td::TlParser parser(serialized);
td::parse(res.refcnt_, parser);
return res;
}
CellStorer::CellStorer(KeyValue &kv) : kv_(kv) {
}

View file

@ -48,6 +48,7 @@ class CellLoader {
};
CellLoader(std::shared_ptr<KeyValueReader> reader);
td::Result<LoadResult> load(td::Slice hash, bool need_data, ExtCellCreator &ext_cell_creator);
td::Result<LoadResult> load_refcnt(td::Slice hash); // This only loads refcnt_, cell_ == null
private:
std::shared_ptr<KeyValueReader> reader_;

View file

@ -27,6 +27,9 @@
#include "td/utils/ThreadSafeCounter.h"
#include "vm/cellslice.h"
#include <queue>
#include "td/actor/actor.h"
#include "common/delay.h"
namespace vm {
namespace {
@ -138,8 +141,6 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
if (cell->get_virtualization() != 0) {
return;
}
//LOG(ERROR) << "INC";
//CellSlice(cell, nullptr).print_rec(std::cout);
to_inc_.push_back(cell);
}
void dec(const Ref<Cell> &cell) override {
@ -149,8 +150,6 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
if (cell->get_virtualization() != 0) {
return;
}
//LOG(ERROR) << "DEC";
//CellSlice(cell, nullptr).print_rec(std::cout);
to_dec_.push_back(cell);
}
@ -167,25 +166,20 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
if (is_prepared_for_commit()) {
return td::Status::OK();
}
//LOG(ERROR) << "dfs_new_cells_in_db";
for (auto &new_cell : to_inc_) {
auto &new_cell_info = get_cell_info(new_cell);
dfs_new_cells_in_db(new_cell_info);
}
//return td::Status::OK();
//LOG(ERROR) << "dfs_new_cells";
for (auto &new_cell : to_inc_) {
auto &new_cell_info = get_cell_info(new_cell);
dfs_new_cells(new_cell_info);
}
//LOG(ERROR) << "dfs_old_cells";
for (auto &old_cell : to_dec_) {
auto &old_cell_info = get_cell_info(old_cell);
dfs_old_cells(old_cell_info);
}
//LOG(ERROR) << "save_diff_prepare";
save_diff_prepare();
to_inc_.clear();
@ -363,7 +357,6 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
info.was = true;
visited_.push_back(&info);
}
//LOG(ERROR) << "dfs new " << td::format::escaped(info.cell->hash());
if (info.was_dfs_new_cells) {
return;
@ -384,7 +377,6 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
info.was = true;
visited_.push_back(&info);
}
//LOG(ERROR) << "dfs old " << td::format::escaped(info.cell->hash());
load_cell(info);
@ -405,7 +397,6 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
}
void save_diff(CellStorer &storer) {
//LOG(ERROR) << hash_table_.size();
for (auto info_ptr : visited_) {
save_cell(*info_ptr, storer);
}
@ -414,7 +405,6 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
void save_cell_prepare(CellInfo &info) {
if (info.refcnt_diff == 0) {
//CellSlice(info.cell, nullptr).print_rec(std::cout);
return;
}
load_cell(info);
@ -450,15 +440,11 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
if (info.db_refcnt == 0) {
CHECK(info.in_db);
//LOG(ERROR) << "ERASE";
//CellSlice(NoVm(), info.cell).print_rec(std::cout);
storer.erase(info.cell->get_hash().as_slice());
info.in_db = false;
hash_table_.erase(info.cell->get_hash().as_slice());
guard.dismiss();
} else {
//LOG(ERROR) << "SAVE " << info.db_refcnt;
//CellSlice(NoVm(), info.cell).print_rec(std::cout);
auto loaded_cell = info.cell->load_cell().move_as_ok();
storer.set(info.db_refcnt, *loaded_cell.data_cell);
info.in_db = true;
@ -482,7 +468,6 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
CHECK(cell->is_loaded());
vm::CellSlice cs(vm::NoVm{}, cell); // FIXME
for (unsigned i = 0; i < cs.size_refs(); i++) {
//LOG(ERROR) << "---> " << td::format::escaped(cell->ref(i)->hash());
f(get_cell_info(cs.prefetch_ref(i)));
}
}
@ -573,6 +558,177 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat
DynamicBocExtCellExtra{cell_db_reader_}));
return std::move(res);
}
struct PrepareCommitAsyncState {
size_t remaining_ = 0;
std::shared_ptr<AsyncExecutor> executor_;
td::Promise<td::Unit> promise_;
struct CellInfo2 {
CellInfo *info;
std::vector<CellInfo2 *> parents;
unsigned remaining_children = 0;
Cell::Hash key() const {
return info->key();
}
bool operator<(const CellInfo2 &other) const {
return key() < other.key();
}
friend bool operator<(const CellInfo2 &a, td::Slice b) {
return a.key().as_slice() < b;
}
friend bool operator<(td::Slice a, const CellInfo2 &b) {
return a < b.key().as_slice();
}
};
CellHashTable<CellInfo2> cells_;
};
std::unique_ptr<PrepareCommitAsyncState> pca_state_;
void prepare_commit_async(std::shared_ptr<AsyncExecutor> executor, td::Promise<td::Unit> promise) override {
if (pca_state_) {
promise.set_error(td::Status::Error("Other prepare_commit_async is not finished"));
return;
}
if (is_prepared_for_commit()) {
promise.set_result(td::Unit());
return;
}
pca_state_ = std::make_unique<PrepareCommitAsyncState>();
pca_state_->executor_ = std::move(executor);
pca_state_->promise_ = std::move(promise);
for (auto &new_cell : to_inc_) {
dfs_new_cells_in_db_async(new_cell);
}
pca_state_->cells_.for_each([&](PrepareCommitAsyncState::CellInfo2 &info) {
++pca_state_->remaining_;
if (info.remaining_children == 0) {
pca_load_from_db(&info);
}
});
if (pca_state_->remaining_ == 0) {
prepare_commit_async_cont();
}
}
void dfs_new_cells_in_db_async(const td::Ref<vm::Cell> &cell, PrepareCommitAsyncState::CellInfo2 *parent = nullptr) {
bool exists = true;
pca_state_->cells_.apply(cell->get_hash().as_slice(), [&](PrepareCommitAsyncState::CellInfo2 &info) {
if (info.info == nullptr) {
exists = false;
info.info = &get_cell_info(cell);
}
});
auto info = pca_state_->cells_.get_if_exists(cell->get_hash().as_slice());
if (parent) {
info->parents.push_back(parent);
++parent->remaining_children;
}
if (exists) {
return;
}
if (cell->is_loaded()) {
vm::CellSlice cs(vm::NoVm{}, cell);
for (unsigned i = 0; i < cs.size_refs(); i++) {
dfs_new_cells_in_db_async(cs.prefetch_ref(i), info);
}
}
}
void pca_load_from_db(PrepareCommitAsyncState::CellInfo2 *info) {
pca_state_->executor_->execute_async(
[db = this, info, executor = pca_state_->executor_, loader = *loader_]() mutable {
auto res = loader.load_refcnt(info->info->cell->get_hash().as_slice()).move_as_ok();
executor->execute_sync([db, info, res = std::move(res)]() {
db->pca_set_in_db(info, std::move(res));
});
});
}
void pca_set_in_db(PrepareCommitAsyncState::CellInfo2 *info, CellLoader::LoadResult result) {
info->info->sync_with_db = true;
if (result.status == CellLoader::LoadResult::Ok) {
info->info->in_db = true;
info->info->db_refcnt = result.refcnt();
} else {
info->info->in_db = false;
}
for (PrepareCommitAsyncState::CellInfo2 *parent_info : info->parents) {
if (parent_info->info->sync_with_db) {
continue;
}
if (!info->info->in_db) {
pca_set_in_db(parent_info, {});
} else if (--parent_info->remaining_children == 0) {
pca_load_from_db(parent_info);
}
}
if (--pca_state_->remaining_ == 0) {
prepare_commit_async_cont();
}
}
void prepare_commit_async_cont() {
for (auto &new_cell : to_inc_) {
auto &new_cell_info = get_cell_info(new_cell);
dfs_new_cells(new_cell_info);
}
CHECK(pca_state_->remaining_ == 0);
for (auto &old_cell : to_dec_) {
auto &old_cell_info = get_cell_info(old_cell);
dfs_old_cells_async(old_cell_info);
}
if (pca_state_->remaining_ == 0) {
prepare_commit_async_cont2();
}
}
void dfs_old_cells_async(CellInfo &info) {
if (!info.was) {
info.was = true;
visited_.push_back(&info);
if (!info.sync_with_db) {
++pca_state_->remaining_;
load_cell_async(
info.cell->get_hash().as_slice(), pca_state_->executor_,
[executor = pca_state_->executor_, db = this, info = &info](td::Result<td::Ref<vm::DataCell>> R) {
R.ensure();
executor->execute_sync([db, info]() {
CHECK(info->sync_with_db);
db->dfs_old_cells_async(*info);
if (--db->pca_state_->remaining_ == 0) {
db->prepare_commit_async_cont2();
}
});
});
return;
}
}
info.refcnt_diff--;
if (!info.sync_with_db) {
return;
}
auto new_refcnt = info.refcnt_diff + info.db_refcnt;
CHECK(new_refcnt >= 0);
if (new_refcnt != 0) {
return;
}
for_each(info, [this](auto &child_info) { dfs_old_cells_async(child_info); });
}
void prepare_commit_async_cont2() {
save_diff_prepare();
to_inc_.clear();
to_dec_.clear();
pca_state_->promise_.set_result(td::Unit());
pca_state_ = {};
}
};
} // namespace

View file

@ -75,6 +75,7 @@ class DynamicBagOfCellsDb {
virtual void load_cell_async(td::Slice hash, std::shared_ptr<AsyncExecutor> executor,
td::Promise<Ref<DataCell>> promise) = 0;
virtual void prepare_commit_async(std::shared_ptr<AsyncExecutor> executor, td::Promise<td::Unit> promise) = 0;
};
} // namespace vm

View file

@ -170,7 +170,7 @@ ton::tl_object_ptr<ton::ton_api::engine_validator_config> Config::tl() const {
return ton::create_tl_object<ton::ton_api::engine_validator_config>(
out_port, std::move(addrs_vec), std::move(adnl_vec), std::move(dht_vec), std::move(val_vec),
ton::PublicKeyHash::zero().tl(), std::move(full_node_slaves_vec), std::move(full_node_masters_vec),
std::move(liteserver_vec), std::move(control_vec), std::move(gc_vec));
nullptr, std::move(liteserver_vec), std::move(control_vec), std::move(gc_vec));
}
td::Result<bool> Config::config_add_network_addr(td::IPAddress in_ip, td::IPAddress out_ip,

View file

@ -33,7 +33,7 @@ void OptionParser::set_description(string description) {
void OptionParser::add_option(Option::Type type, char short_key, Slice long_key, Slice description,
std::function<Status(Slice)> callback) {
for (auto &option : options_) {
if (option.short_key == short_key || (!long_key.empty() && long_key == option.long_key)) {
if ((short_key != '\0' && option.short_key == short_key) || (!long_key.empty() && long_key == option.long_key)) {
LOG(ERROR) << "Ignore duplicated option '" << short_key << "' '" << long_key << "'";
}
}

View file

@ -588,10 +588,12 @@ engine.gc ids:(vector int256) = engine.Gc;
engine.dht.config dht:(vector engine.dht) gc:engine.gc = engine.dht.Config;
engine.validator.fullNodeMaster port:int adnl:int256 = engine.validator.FullNodeMaster;
engine.validator.fullNodeSlave ip:int port:int adnl:PublicKey = engine.validator.FullNodeSlave;
engine.validator.fullNodeConfig ext_messages_broadcast_disabled:Bool = engine.validator.FullNodeConfig;
engine.validator.config out_port:int addrs:(vector engine.Addr) adnl:(vector engine.adnl)
dht:(vector engine.dht)
validators:(vector engine.validator) fullnode:int256 fullnodeslaves:(vector engine.validator.fullNodeSlave)
fullnodemasters:(vector engine.validator.fullNodeMaster)
fullnodeconfig:engine.validator.fullNodeConfig
liteservers:(vector engine.liteServer) control:(vector engine.controlInterface)
gc:engine.gc = engine.validator.Config;
@ -642,6 +644,8 @@ engine.validator.onePerfTimerStat time:int min:double avg:double max:double = en
engine.validator.perfTimerStatsByName name:string stats:(vector engine.validator.OnePerfTimerStat) = engine.validator.PerfTimerStatsByName;
engine.validator.perfTimerStats stats:(vector engine.validator.PerfTimerStatsByName) = engine.validator.PerfTimerStats;
engine.validator.shardOutQueueSize size:int = engine.validator.ShardOutQueueSize;
---functions---
@ -693,6 +697,8 @@ engine.validator.signShardOverlayCertificate workchain:int shard:long signed_key
engine.validator.importShardOverlayCertificate workchain:int shard:long signed_key:engine.validator.KeyHash cert:overlay.Certificate = engine.validator.Success;
engine.validator.getPerfTimerStats name:string = engine.validator.PerfTimerStats;
engine.validator.getShardOutQueueSize flags:# block_id:tonNode.blockId dest_wc:flags.0?int dest_shard:flags.0?long = engine.validator.ShardOutQueueSize;
engine.validator.setExtMessagesBroadcastDisabled disabled:Bool = engine.validator.Success;
---types---

Binary file not shown.

View file

@ -32,6 +32,7 @@
#include "terminal/terminal.h"
#include "td/utils/filesystem.h"
#include "overlay/overlays.h"
#include "ton/ton-tl.hpp"
#include <cctype>
#include <fstream>
@ -1055,3 +1056,54 @@ td::Status GetPerfTimerStatsJsonQuery::receive(td::BufferSlice data) {
td::TerminalIO::output(std::string("wrote stats to " + file_name_ + "\n"));
return td::Status::OK();
}
td::Status GetShardOutQueueSizeQuery::run() {
TRY_RESULT_ASSIGN(block_id_.workchain, tokenizer_.get_token<int>());
TRY_RESULT_ASSIGN(block_id_.shard, tokenizer_.get_token<long long>());
TRY_RESULT_ASSIGN(block_id_.seqno, tokenizer_.get_token<int>());
if (!tokenizer_.endl()) {
ton::ShardIdFull dest;
TRY_RESULT_ASSIGN(dest.workchain, tokenizer_.get_token<int>());
TRY_RESULT_ASSIGN(dest.shard, tokenizer_.get_token<long long>());
dest_ = dest;
}
TRY_STATUS(tokenizer_.check_endl());
return td::Status::OK();
}
td::Status GetShardOutQueueSizeQuery::send() {
auto b = ton::create_serialize_tl_object<ton::ton_api::engine_validator_getShardOutQueueSize>(
dest_ ? 1 : 0, ton::create_tl_block_id_simple(block_id_), dest_ ? dest_.value().workchain : 0,
dest_ ? dest_.value().shard : 0);
td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise());
return td::Status::OK();
}
td::Status GetShardOutQueueSizeQuery::receive(td::BufferSlice data) {
TRY_RESULT_PREFIX(f, ton::fetch_tl_object<ton::ton_api::engine_validator_shardOutQueueSize>(data.as_slice(), true),
"received incorrect answer: ");
td::TerminalIO::out() << "Queue_size: " << f->size_ << "\n";
return td::Status::OK();
}
td::Status SetExtMessagesBroadcastDisabledQuery::run() {
TRY_RESULT(x, tokenizer_.get_token<int>());
if (x < 0 || x > 1) {
return td::Status::Error("value should be 0 or 1");
}
value = x;
return td::Status::OK();
}
td::Status SetExtMessagesBroadcastDisabledQuery::send() {
auto b = ton::create_serialize_tl_object<ton::ton_api::engine_validator_setExtMessagesBroadcastDisabled>(value);
td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise());
return td::Status::OK();
}
td::Status SetExtMessagesBroadcastDisabledQuery::receive(td::BufferSlice data) {
TRY_RESULT_PREFIX(f, ton::fetch_tl_object<ton::ton_api::engine_validator_success>(data.as_slice(), true),
"received incorrect answer: ");
td::TerminalIO::out() << "success\n";
return td::Status::OK();
}

View file

@ -33,6 +33,7 @@
#include "td/utils/SharedSlice.h"
#include "td/utils/port/IPAddress.h"
#include "td/actor/actor.h"
#include "ton/ton-types.h"
#include "keys/keys.hpp"
@ -1096,3 +1097,50 @@ class GetPerfTimerStatsJsonQuery : public Query {
private:
std::string file_name_;
};
class GetShardOutQueueSizeQuery : public Query {
public:
GetShardOutQueueSizeQuery(td::actor::ActorId<ValidatorEngineConsole> console, Tokenizer tokenizer)
: Query(console, std::move(tokenizer)) {
}
td::Status run() override;
td::Status send() override;
td::Status receive(td::BufferSlice data) override;
static std::string get_name() {
return "getshardoutqueuesize";
}
static std::string get_help() {
return "getshardoutqueuesize <wc> <shard> <seqno> [<dest_wc> <dest_shard>]\treturns number of messages in the "
"queue of the given shard. Destination shard is optional.";
}
std::string name() const override {
return get_name();
}
private:
ton::BlockId block_id_;
td::optional<ton::ShardIdFull> dest_;
};
class SetExtMessagesBroadcastDisabledQuery : public Query {
public:
SetExtMessagesBroadcastDisabledQuery(td::actor::ActorId<ValidatorEngineConsole> console, Tokenizer tokenizer)
: Query(console, std::move(tokenizer)) {
}
td::Status run() override;
td::Status send() override;
td::Status receive(td::BufferSlice data) override;
static std::string get_name() {
return "setextmessagesbroadcastdisabled";
}
static std::string get_help() {
return "setextmessagesbroadcastdisabled <value>\tdisable broadcasting and rebroadcasting ext messages; value is 0 "
"or 1.";
}
std::string name() const override {
return get_name();
}
private:
bool value;
};

View file

@ -141,6 +141,8 @@ void ValidatorEngineConsole::run() {
add_query_runner(std::make_unique<QueryRunnerImpl<ImportShardOverlayCertificateQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<SignShardOverlayCertificateQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<GetPerfTimerStatsJsonQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<GetShardOutQueueSizeQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<SetExtMessagesBroadcastDisabledQuery>>());
}
bool ValidatorEngineConsole::envelope_send_query(td::BufferSlice query, td::Promise<td::BufferSlice> promise) {

View file

@ -12,7 +12,7 @@ set(VALIDATOR_ENGINE_SOURCE
add_executable(validator-engine ${VALIDATOR_ENGINE_SOURCE})
target_link_libraries(validator-engine overlay tdutils tdactor adnl tl_api dht
rldp catchain validatorsession full-node validator ton_validator validator
rldp rldp2 catchain validatorsession full-node validator ton_validator validator
fift-lib memprof git ${JEMALLOC_LIBRARIES})
install(TARGETS validator-engine RUNTIME DESTINATION bin)

View file

@ -69,7 +69,8 @@
#include <limits>
#include <set>
#include "git.h"
#include "block-auto.h"
#include "block-parse.h"
Config::Config() {
out_port = 3278;
@ -149,6 +150,10 @@ Config::Config(ton::ton_api::engine_validator_config &config) {
config_add_full_node_master(s->port_, ton::PublicKeyHash{s->adnl_}).ensure();
}
if (config.fullnodeconfig_) {
full_node_config = ton::validator::fullnode::FullNodeConfig(config.fullnodeconfig_);
}
for (auto &serv : config.liteservers_) {
config_add_lite_server(ton::PublicKeyHash{serv->id_}, serv->port_).ensure();
}
@ -219,6 +224,11 @@ ton::tl_object_ptr<ton::ton_api::engine_validator_config> Config::tl() const {
ton::create_tl_object<ton::ton_api::engine_validator_fullNodeMaster>(x.first, x.second.tl()));
}
ton::tl_object_ptr<ton::ton_api::engine_validator_fullNodeConfig> full_node_config_obj = {};
if (full_node_config != ton::validator::fullnode::FullNodeConfig()) {
full_node_config_obj = full_node_config.tl();
}
std::vector<ton::tl_object_ptr<ton::ton_api::engine_liteServer>> liteserver_vec;
for (auto &x : liteservers) {
liteserver_vec.push_back(ton::create_tl_object<ton::ton_api::engine_liteServer>(x.second.tl(), x.first));
@ -240,8 +250,8 @@ ton::tl_object_ptr<ton::ton_api::engine_validator_config> Config::tl() const {
}
return ton::create_tl_object<ton::ton_api::engine_validator_config>(
out_port, std::move(addrs_vec), std::move(adnl_vec), std::move(dht_vec), std::move(val_vec), full_node.tl(),
std::move(full_node_slaves_vec), std::move(full_node_masters_vec), std::move(liteserver_vec),
std::move(control_vec), std::move(gc_vec));
std::move(full_node_slaves_vec), std::move(full_node_masters_vec), std::move(full_node_config_obj),
std::move(liteserver_vec), std::move(control_vec), std::move(gc_vec));
}
td::Result<bool> Config::config_add_network_addr(td::IPAddress in_ip, td::IPAddress out_ip,
@ -1742,6 +1752,7 @@ void ValidatorEngine::started_dht() {
void ValidatorEngine::start_rldp() {
rldp_ = ton::rldp::Rldp::create(adnl_.get());
rldp2_ = ton::rldp2::Rldp::create(adnl_.get());
started_rldp();
}
@ -1804,7 +1815,7 @@ void ValidatorEngine::start_full_node() {
}
full_node_ = ton::validator::fullnode::FullNode::create(
short_id, ton::adnl::AdnlNodeIdShort{config_.full_node}, validator_options_->zero_block_id().file_hash,
keyring_.get(), adnl_.get(), rldp_.get(),
config_.full_node_config, keyring_.get(), adnl_.get(), rldp_.get(), rldp2_.get(),
default_dht_node_.is_zero() ? td::actor::ActorId<ton::dht::Dht>{} : dht_nodes_[default_dht_node_].get(),
overlay_manager_.get(), validator_manager_.get(), full_node_client_.get(), db_root_);
}
@ -3333,6 +3344,112 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getPerfTi
td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManagerInterface::prepare_perf_timer_stats, std::move(P));
}
void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getShardOutQueueSize &query,
td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm,
td::Promise<td::BufferSlice> promise) {
if (!(perm & ValidatorEnginePermissions::vep_default)) {
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized")));
return;
}
if (validator_manager_.empty()) {
promise.set_value(
create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "validator manager not started")));
return;
}
ton::BlockId block_id = ton::create_block_id_simple(query.block_id_);
if (!block_id.is_valid_ext()) {
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "invalid block id")));
return;
}
td::optional<ton::ShardIdFull> dest;
if (query.flags_ & 1) {
dest = ton::ShardIdFull{query.dest_wc_, (ton::ShardId)query.dest_shard_};
if (!dest.value().is_valid_ext()) {
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "invalid shard")));
return;
}
}
td::actor::send_closure(
validator_manager_, &ton::validator::ValidatorManagerInterface::get_block_by_seqno_from_db,
ton::AccountIdPrefixFull{block_id.workchain, block_id.shard}, block_id.seqno,
[=, promise = std::move(promise),
manager = validator_manager_.get()](td::Result<ton::validator::ConstBlockHandle> R) mutable {
if (R.is_error()) {
promise.set_value(create_control_query_error(R.move_as_error()));
return;
}
auto handle = R.move_as_ok();
if (handle->id().id != block_id) {
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "no such block")));
return;
}
td::actor::send_closure(
manager, &ton::validator::ValidatorManagerInterface::get_shard_state_from_db, handle,
[=, promise = std::move(promise)](td::Result<td::Ref<ton::validator::ShardState>> R) mutable {
auto res = [&]() -> td::Result<td::BufferSlice> {
TRY_RESULT(state, std::move(R));
TRY_RESULT(outq_descr, state->message_queue());
block::gen::OutMsgQueueInfo::Record qinfo;
if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
return td::Status::Error(ton::ErrorCode::error, "invalid message queue");
}
auto queue = std::make_unique<vm::AugmentedDictionary>(qinfo.out_queue->prefetch_ref(0), 352,
block::tlb::aug_OutMsgQueue);
if (dest) {
td::BitArray<96> prefix;
td::BitPtr ptr = prefix.bits();
ptr.store_int(dest.value().workchain, 32);
ptr.advance(32);
ptr.store_uint(dest.value().shard, 64);
if (!queue->cut_prefix_subdict(prefix.bits(), 32 + dest.value().pfx_len())) {
return td::Status::Error(ton::ErrorCode::error, "invalid message queue");
}
}
int size = 0;
queue->check_for_each([&](td::Ref<vm::CellSlice>, td::ConstBitPtr, int) -> bool {
++size;
return true;
});
return ton::create_serialize_tl_object<ton::ton_api::engine_validator_shardOutQueueSize>(size);
}();
if (res.is_error()) {
promise.set_value(create_control_query_error(res.move_as_error()));
} else {
promise.set_value(res.move_as_ok());
}
});
});
}
void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_setExtMessagesBroadcastDisabled &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm,
td::Promise<td::BufferSlice> promise) {
if (!(perm & ValidatorEnginePermissions::vep_modify)) {
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized")));
return;
}
if (!started_) {
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "not started")));
return;
}
if (config_.full_node_config.ext_messages_broadcast_disabled_ == query.disabled_) {
promise.set_value(ton::create_serialize_tl_object<ton::ton_api::engine_validator_success>());
return;
}
config_.full_node_config.ext_messages_broadcast_disabled_ = query.disabled_;
td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::set_config, config_.full_node_config);
write_config([promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
promise.set_value(create_control_query_error(R.move_as_error()));
} else {
promise.set_value(ton::create_serialize_tl_object<ton::ton_api::engine_validator_success>());
}
});
}
void ValidatorEngine::process_control_query(td::uint16 port, ton::adnl::AdnlNodeIdShort src,
ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data,
td::Promise<td::BufferSlice> promise) {

View file

@ -30,6 +30,7 @@
#include "adnl/adnl.h"
#include "auto/tl/ton_api.h"
#include "rldp/rldp.h"
#include "rldp2/rldp.h"
#include "dht/dht.h"
#include "validator/manager.h"
#include "validator/validator.h"
@ -85,6 +86,7 @@ struct Config {
std::vector<FullNodeSlave> full_node_slaves;
std::map<td::int32, ton::PublicKeyHash> full_node_masters;
std::map<td::int32, ton::PublicKeyHash> liteservers;
ton::validator::fullnode::FullNodeConfig full_node_config;
std::map<td::int32, Control> controls;
std::set<ton::PublicKeyHash> gc;
@ -137,6 +139,7 @@ class ValidatorEngine : public td::actor::Actor {
td::actor::ActorOwn<ton::adnl::AdnlNetworkManager> adnl_network_manager_;
td::actor::ActorOwn<ton::adnl::Adnl> adnl_;
td::actor::ActorOwn<ton::rldp::Rldp> rldp_;
td::actor::ActorOwn<ton::rldp2::Rldp> rldp2_;
std::map<ton::PublicKeyHash, td::actor::ActorOwn<ton::dht::Dht>> dht_nodes_;
ton::PublicKeyHash default_dht_node_ = ton::PublicKeyHash::zero();
td::actor::ActorOwn<ton::overlay::Overlays> overlay_manager_;
@ -409,6 +412,10 @@ class ValidatorEngine : public td::actor::Actor {
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise);
void run_control_query(ton::ton_api::engine_validator_getPerfTimerStats &query, td::BufferSlice data,
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise);
void run_control_query(ton::ton_api::engine_validator_getShardOutQueueSize &query, td::BufferSlice data,
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise);
void run_control_query(ton::ton_api::engine_validator_setExtMessagesBroadcastDisabled &query, td::BufferSlice data,
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise);
template <class T>
void run_control_query(T &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm,
td::Promise<td::BufferSlice> promise) {

View file

@ -45,8 +45,8 @@ void ValidatorSessionImpl::process_blocks(std::vector<catchain::CatChainBlock *>
std::vector<tl_object_ptr<ton_api::validatorSession_round_Message>> msgs;
if (generated_ && !sent_generated_) {
auto it = blocks_[0].find(generated_block_);
CHECK(it != blocks_[0].end());
auto it = blocks_.find(generated_block_);
CHECK(it != blocks_.end());
auto &B = it->second;
auto file_hash = sha256_bits256(B->data_);
@ -230,13 +230,15 @@ void ValidatorSessionImpl::process_broadcast(PublicKeyHash src, td::BufferSlice
auto block_round = static_cast<td::uint32>(candidate->round_);
auto block_id = description().candidate_id(src_idx, candidate->root_hash_, file_hash, collated_data_file_hash);
if (block_round < cur_round_ || block_round >= cur_round_ + blocks_.size()) {
if ((td::int32)block_round < (td::int32)cur_round_ - MAX_PAST_ROUND_BLOCK ||
block_round >= cur_round_ + MAX_FUTURE_ROUND_BLOCK) {
VLOG(VALIDATOR_SESSION_NOTICE) << this << "[node " << src << "][broadcast " << block_id
<< "]: bad round=" << block_round << " cur_round" << cur_round_;
return;
}
auto it = blocks_[block_round - cur_round_].find(block_id);
if (it != blocks_[block_round - cur_round_].end()) {
auto it = blocks_.find(block_id);
if (it != blocks_.end()) {
it->second->round_ = std::max<td::uint32>(it->second->round_, block_round);
VLOG(VALIDATOR_SESSION_INFO) << this << "[node " << src << "][broadcast " << block_id << "]: duplicate";
return;
}
@ -248,7 +250,7 @@ void ValidatorSessionImpl::process_broadcast(PublicKeyHash src, td::BufferSlice
return;
}
blocks_[block_round - cur_round_][block_id] = std::move(candidate);
blocks_[block_id] = std::move(candidate);
VLOG(VALIDATOR_SESSION_WARNING) << this << ": received broadcast " << block_id;
if (block_round != cur_round_) {
@ -407,7 +409,7 @@ void ValidatorSessionImpl::generated_block(td::uint32 round, ValidatorSessionCan
td::actor::send_closure(catchain_, &catchain::CatChain::send_broadcast, std::move(B));
blocks_[0].emplace(block_id, std::move(b));
blocks_.emplace(block_id, std::move(b));
pending_generate_ = false;
generated_ = true;
generated_block_ = block_id;
@ -507,9 +509,10 @@ void ValidatorSessionImpl::try_approve_block(const SentBlock *block) {
if (block) {
auto T = td::Timestamp::at(round_started_at_.at() + description().get_delay(block->get_src_idx()) + 2.0);
auto it = blocks_[0].find(block_id);
auto it = blocks_.find(block_id);
if (it != blocks_[0].end()) {
if (it != blocks_.end()) {
it->second->round_ = std::max<td::uint32>(it->second->round_, cur_round_);
td::PerfWarningTimer timer{"too long block validation", 1.0};
auto &B = it->second;
@ -532,7 +535,6 @@ void ValidatorSessionImpl::try_approve_block(const SentBlock *block) {
}
});
pending_approve_.insert(block_id);
CHECK(static_cast<td::int32>(cur_round_) == B->round_);
callback_->on_candidate(cur_round_, description().get_source_public_key(block->get_src_idx()), B->root_hash_,
B->data_.clone(), B->collated_data_.clone(), std::move(P));
@ -556,7 +558,7 @@ void ValidatorSessionImpl::try_approve_block(const SentBlock *block) {
get_broadcast_p2p(id, block->get_file_hash(), block->get_collated_data_file_hash(),
description().get_source_id(block->get_src_idx()), cur_round_, block->get_root_hash(),
std::move(P), td::Timestamp::in(2.0));
std::move(P), td::Timestamp::in(15.0));
} else {
LOG(VALIDATOR_SESSION_DEBUG) << this << ": no nodes to download candidate " << block << " from";
}
@ -773,7 +775,7 @@ void ValidatorSessionImpl::on_new_round(td::uint32 round) {
}
}
auto it = blocks_[0].find(SentBlock::get_block_id(block));
auto it = blocks_.find(SentBlock::get_block_id(block));
bool have_block = (bool)block;
if (!have_block) {
callback_->on_block_skipped(cur_round_);
@ -788,7 +790,7 @@ void ValidatorSessionImpl::on_new_round(td::uint32 round) {
cur_stats_.creator = description().get_source_id(block->get_src_idx());
cur_stats_.self = description().get_source_id(local_idx());
if (it == blocks_[0].end()) {
if (it == blocks_.end()) {
callback_->on_block_committed(cur_round_, description().get_source_public_key(block->get_src_idx()),
block->get_root_hash(), block->get_file_hash(), td::BufferSlice(),
std::move(export_sigs), std::move(export_approve_sigs), std::move(cur_stats_));
@ -804,10 +806,14 @@ void ValidatorSessionImpl::on_new_round(td::uint32 round) {
} else {
stats_add_round();
}
for (size_t i = 0; i < blocks_.size() - 1; i++) {
blocks_[i] = std::move(blocks_[i + 1]);
auto it2 = blocks_.begin();
while (it2 != blocks_.end()) {
if (it2->second->round_ < (td::int32)cur_round_ - MAX_PAST_ROUND_BLOCK) {
it2 = blocks_.erase(it2);
} else {
++it2;
}
}
blocks_[blocks_.size() - 1].clear();
}
round_started_at_ = td::Timestamp::now();

View file

@ -73,7 +73,7 @@ class ValidatorSessionImpl : public ValidatorSession {
ValidatorSessionCandidateId signed_block_;
td::BufferSlice signature_;
std::array<std::map<ValidatorSessionCandidateId, tl_object_ptr<ton_api::validatorSession_candidate>>, 100> blocks_;
std::map<ValidatorSessionCandidateId, tl_object_ptr<ton_api::validatorSession_candidate>> blocks_;
catchain::CatChainSessionId unique_hash_;
@ -204,6 +204,8 @@ class ValidatorSessionImpl : public ValidatorSession {
private:
static const size_t MAX_REJECT_REASON_SIZE = 1024;
static const td::int32 MAX_FUTURE_ROUND_BLOCK = 100;
static const td::int32 MAX_PAST_ROUND_BLOCK = 20;
};
} // namespace validatorsession

View file

@ -198,5 +198,5 @@ target_link_libraries(validator-disk PRIVATE tdutils tdactor adnl rldp tl_api dh
target_link_libraries(validator-hardfork PRIVATE tdutils tdactor adnl rldp tl_api dht tdfec
overlay catchain validatorsession ton_crypto ton_block ton_db)
target_link_libraries(full-node PRIVATE tdutils tdactor adnl rldp tl_api dht tdfec
target_link_libraries(full-node PRIVATE tdutils tdactor adnl rldp rldp2 tl_api dht tdfec
overlay catchain validatorsession ton_crypto ton_block ton_db)

View file

@ -33,11 +33,11 @@ class CellDbAsyncExecutor : public vm::DynamicBagOfCellsDb::AsyncExecutor {
explicit CellDbAsyncExecutor(td::actor::ActorId<CellDbBase> cell_db) : cell_db_(std::move(cell_db)) {
}
void execute_async(std::function<void()> f) {
void execute_async(std::function<void()> f) override {
class Runner : public td::actor::Actor {
public:
explicit Runner(std::function<void()> f) : f_(std::move(f)) {}
void start_up() {
void start_up() override {
f_();
stop();
}
@ -47,7 +47,7 @@ class CellDbAsyncExecutor : public vm::DynamicBagOfCellsDb::AsyncExecutor {
td::actor::create_actor<Runner>("executeasync", std::move(f)).release();
}
void execute_sync(std::function<void()> f) {
void execute_sync(std::function<void()> f) override {
td::actor::send_closure(cell_db_, &CellDbBase::execute_sync, std::move(f));
}
private:
@ -83,23 +83,45 @@ void CellDbIn::start_up() {
set_block(empty, std::move(e));
cell_db_->commit_write_batch().ensure();
}
last_gc_ = empty;
}
void CellDbIn::load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise) {
boc_->load_cell_async(hash.as_slice(), async_executor, std::move(promise));
enqueue([this, hash, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
return;
}
promise.set_result(boc_->load_cell(hash.as_slice()));
release_db();
});
}
void CellDbIn::store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promise<td::Ref<vm::DataCell>> promise) {
td::PerfWarningTimer timer{"storecell", 0.1};
auto key_hash = get_key_hash(block_id);
auto R = get_block(key_hash);
// duplicate
if (R.is_ok()) {
promise.set_result(boc_->load_cell(cell->get_hash().as_slice()));
return;
}
enqueue([this, block_id, cell = std::move(cell), promise = std::move(promise)](td::Result<td::Unit> R0) mutable {
if (R0.is_error()) {
return;
}
promise = promise.wrap([timer = td::PerfWarningTimer{"storecell", 0.1}](td::Ref<vm::DataCell> &&r) { return r; });
auto key_hash = get_key_hash(block_id);
auto R = get_block(key_hash);
// duplicate
if (R.is_ok()) {
promise.set_result(boc_->load_cell(cell->get_hash().as_slice()));
release_db();
return;
}
boc_->inc(cell);
boc_->prepare_commit_async(
async_executor, [=, SelfId = actor_id(this), promise = std::move(promise)](td::Result<td::Unit> R) mutable {
R.ensure();
td::actor::send_closure(SelfId, &CellDbIn::store_cell_cont, block_id, cell, std::move(promise));
});
});
}
void CellDbIn::store_cell_cont(BlockIdExt block_id, td::Ref<vm::Cell> cell,
td::Promise<td::Ref<vm::DataCell>> promise) {
auto key_hash = get_key_hash(block_id);
auto empty = get_empty_key_hash();
auto ER = get_block(empty);
ER.ensure();
@ -120,9 +142,7 @@ void CellDbIn::store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promi
P.prev = key_hash;
}
boc_->inc(cell);
boc_->prepare_commit().ensure();
vm::CellStorer stor{*cell_db_.get()};
vm::CellStorer stor{*cell_db_};
cell_db_->begin_write_batch().ensure();
boc_->commit(stor).ensure();
set_block(empty, std::move(E));
@ -134,24 +154,29 @@ void CellDbIn::store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promi
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());
promise.set_result(boc_->load_cell(cell->get_hash().as_slice()));
release_db();
}
void CellDbIn::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise) {
promise.set_result(boc_->get_cell_db_reader());
enqueue([this, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
return;
}
promise.set_result(boc_->get_cell_db_reader());
release_db();
});
}
void CellDbIn::alarm() {
auto R = get_block(last_gc_);
R.ensure();
auto N = R.move_as_ok();
auto E = get_block(get_empty_key_hash()).move_as_ok();
auto N = get_block(E.next).move_as_ok();
if (N.is_empty()) {
last_gc_ = N.next;
alarm_timestamp() = td::Timestamp::in(0.1);
return;
}
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<bool> R) {
auto block_id = N.block_id;
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), block_id](td::Result<bool> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &CellDbIn::skip_gc);
} else {
@ -159,24 +184,19 @@ void CellDbIn::alarm() {
if (!value) {
td::actor::send_closure(SelfId, &CellDbIn::skip_gc);
} else {
td::actor::send_closure(SelfId, &CellDbIn::gc);
td::actor::send_closure(SelfId, &CellDbIn::gc, block_id);
}
}
});
td::actor::send_closure(root_db_, &RootDb::allow_state_gc, N.block_id, std::move(P));
td::actor::send_closure(root_db_, &RootDb::allow_state_gc, block_id, std::move(P));
}
void CellDbIn::gc() {
auto R = get_block(last_gc_);
R.ensure();
auto N = R.move_as_ok();
void CellDbIn::gc(BlockIdExt block_id) {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<BlockHandle> R) {
R.ensure();
td::actor::send_closure(SelfId, &CellDbIn::gc_cont, R.move_as_ok());
});
td::actor::send_closure(root_db_, &RootDb::get_block_handle_external, N.block_id, false, std::move(P));
td::actor::send_closure(root_db_, &RootDb::get_block_handle_external, block_id, false, std::move(P));
}
void CellDbIn::gc_cont(BlockHandle handle) {
@ -194,12 +214,30 @@ void CellDbIn::gc_cont(BlockHandle handle) {
}
void CellDbIn::gc_cont2(BlockHandle handle) {
td::PerfWarningTimer timer{"gccell", 0.1};
enqueue([this, handle = std::move(handle)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
return;
}
td::Promise<td::Unit> promise = [timer = td::PerfWarningTimer{"gccell", 0.1}](td::Result<td::Unit>) {};
auto FR = get_block(get_key_hash(handle->id()));
FR.ensure();
auto F = FR.move_as_ok();
auto cell = boc_->load_cell(F.root_hash.as_slice()).move_as_ok();
auto FR = get_block(last_gc_);
boc_->dec(cell);
boc_->prepare_commit_async(async_executor, [SelfId = actor_id(this), promise = std::move(promise),
block_id = handle->id()](td::Result<td::Unit> R) mutable {
R.ensure();
td::actor::send_closure(SelfId, &CellDbIn::gc_cont3, block_id, std::move(promise));
});
});
}
void CellDbIn::gc_cont3(BlockIdExt block_id, td::Promise<td::Unit> promise) {
auto key_hash = get_key_hash(block_id);
auto FR = get_block(key_hash);
FR.ensure();
auto F = FR.move_as_ok();
auto PR = get_block(F.prev);
PR.ensure();
auto P = PR.move_as_ok();
@ -214,14 +252,10 @@ void CellDbIn::gc_cont2(BlockHandle handle) {
N.next = N.prev;
}
auto cell = boc_->load_cell(F.root_hash.as_slice()).move_as_ok();
boc_->dec(cell);
boc_->prepare_commit().ensure();
vm::CellStorer stor{*cell_db_.get()};
vm::CellStorer stor{*cell_db_};
cell_db_->begin_write_batch().ensure();
boc_->commit(stor).ensure();
cell_db_->erase(get_key(last_gc_)).ensure();
cell_db_->erase(get_key(key_hash)).ensure();
set_block(F.prev, std::move(P));
set_block(F.next, std::move(N));
cell_db_->commit_write_batch().ensure();
@ -230,16 +264,33 @@ void CellDbIn::gc_cont2(BlockHandle handle) {
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot())).ensure();
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());
DCHECK(get_block(last_gc_).is_error());
last_gc_ = F.next;
DCHECK(get_block(key_hash).is_error());
promise.set_result(td::Unit());
release_db();
}
void CellDbIn::enqueue(td::Promise<td::Unit> promise) {
db_queue_.push(std::move(promise));
process_event();
}
void CellDbIn::release_db() {
db_busy_ = false;
process_event();
}
void CellDbIn::process_event() {
if (db_busy_ || db_queue_.empty()) {
return;
}
db_busy_ = true;
auto promise = std::move(db_queue_.front());
db_queue_.pop();
promise.set_result(td::Unit());
}
void CellDbIn::skip_gc() {
auto FR = get_block(last_gc_);
FR.ensure();
auto F = FR.move_as_ok();
last_gc_ = F.next;
alarm_timestamp() = td::Timestamp::in(0.01);
alarm_timestamp() = td::Timestamp::in(1.0);
}
std::string CellDbIn::get_key(KeyHash key_hash) {
@ -296,7 +347,7 @@ void CellDb::load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise
} else {
promise.set_result(R.move_as_ok());
}
});
});
boc_->load_cell_async(hash.as_slice(), async_executor, std::move(P));
}
}

View file

@ -25,6 +25,7 @@
#include "ton/ton-types.h"
#include "interfaces/block-handle.h"
#include "auto/tl/ton_api.h"
#include <queue>
namespace ton {
@ -84,11 +85,14 @@ class CellDbIn : public CellDbBase {
static BlockIdExt get_empty_key();
KeyHash get_empty_key_hash();
void gc();
void gc(BlockIdExt block_id);
void gc_cont(BlockHandle handle);
void gc_cont2(BlockHandle handle);
void gc_cont3(BlockIdExt block_id, td::Promise<td::Unit> promise);
void skip_gc();
void store_cell_cont(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promise<td::Ref<vm::DataCell>> promise);
td::actor::ActorId<RootDb> root_db_;
td::actor::ActorId<CellDb> parent_;
@ -97,7 +101,12 @@ class CellDbIn : public CellDbBase {
std::unique_ptr<vm::DynamicBagOfCellsDb> boc_;
std::shared_ptr<vm::KeyValue> cell_db_;
KeyHash last_gc_;
std::queue<td::Promise<td::Unit>> db_queue_;
bool db_busy_ = false;
void enqueue(td::Promise<td::Unit> promise);
void release_db();
void process_event();
};
class CellDb : public CellDbBase {

View file

@ -96,6 +96,7 @@ void FullNodeShardImpl::create_overlay() {
std::make_unique<Callback>(actor_id(this)), rules_, PSTRING() << "{ \"type\": \"shard\", \"shard_id\": " << get_shard() << ", \"workchain_id\": " << get_workchain() << " }");
td::actor::send_closure(rldp_, &rldp::Rldp::add_id, adnl_id_);
td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, adnl_id_);
if (cert_) {
td::actor::send_closure(overlays_, &overlay::Overlays::update_certificate, adnl_id_, overlay_id_, local_id_, cert_);
}
@ -108,15 +109,17 @@ void FullNodeShardImpl::check_broadcast(PublicKeyHash src, td::BufferSlice broad
}
auto q = B.move_as_ok();
if (config_.ext_messages_broadcast_disabled_) {
promise.set_error(td::Status::Error("rebroadcasting external messages is disabled"));
promise = [manager = validator_manager_, message = q->message_->data_.clone()](td::Result<td::Unit> R) mutable {
if (R.is_ok()) {
td::actor::send_closure(manager, &ValidatorManagerInterface::new_external_message, std::move(message));
}
};
}
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::check_external_message,
std::move(q->message_->data_),
[promise = std::move(promise)](td::Result<td::Ref<ExtMessage>> R) mutable {
if (R.is_error()) {
promise.set_error(R.move_as_error());
} else {
promise.set_result(td::Unit());
}
});
promise.wrap([](td::Ref<ExtMessage>) { return td::Unit(); }));
}
void FullNodeShardImpl::update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td::Unit> promise) {
@ -685,6 +688,9 @@ void FullNodeShardImpl::send_ihr_message(td::BufferSlice data) {
}
void FullNodeShardImpl::send_external_message(td::BufferSlice data) {
if (config_.ext_messages_broadcast_disabled_) {
return;
}
if (!client_.empty()) {
td::actor::send_closure(client_, &adnl::AdnlExtClient::send_query, "send_ext_query",
create_serialize_tl_object_suffix<ton_api::tonNode_query>(
@ -768,9 +774,11 @@ void FullNodeShardImpl::download_zero_state(BlockIdExt id, td::uint32 priority,
void FullNodeShardImpl::download_persistent_state(BlockIdExt id, BlockIdExt masterchain_block_id, td::uint32 priority,
td::Timestamp timeout, td::Promise<td::BufferSlice> promise) {
auto &b = choose_neighbour();
td::actor::create_actor<DownloadState>(PSTRING() << "downloadstatereq" << id.id.to_str(), id, masterchain_block_id,
adnl_id_, overlay_id_, adnl::AdnlNodeIdShort::zero(), priority, timeout,
validator_manager_, rldp_, overlays_, adnl_, client_, std::move(promise))
adnl_id_, overlay_id_, b.adnl_id, priority, timeout, validator_manager_,
b.use_rldp2() ? (td::actor::ActorId<adnl::AdnlSenderInterface>)rldp2_ : rldp_,
overlays_, adnl_, client_, std::move(promise))
.release();
}
@ -805,8 +813,9 @@ void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, std::stri
td::Promise<std::string> promise) {
auto &b = choose_neighbour();
td::actor::create_actor<DownloadArchiveSlice>(
"archive", masterchain_seqno, std::move(tmp_dir), adnl_id_, overlay_id_, adnl::AdnlNodeIdShort::zero(), timeout,
validator_manager_, rldp_, overlays_, adnl_, client_, create_neighbour_promise(b, std::move(promise)))
"archive", masterchain_seqno, std::move(tmp_dir), adnl_id_, overlay_id_, b.adnl_id, timeout, validator_manager_,
b.use_rldp2() ? (td::actor::ActorId<adnl::AdnlSenderInterface>)rldp2_ : rldp_, overlays_, adnl_, client_,
create_neighbour_promise(b, std::move(promise)))
.release();
}
@ -1100,8 +1109,9 @@ void FullNodeShardImpl::ping_neighbours() {
}
FullNodeShardImpl::FullNodeShardImpl(ShardIdFull shard, PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id,
FileHash zero_state_file_hash, td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp,
FileHash zero_state_file_hash, FullNodeConfig config,
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client)
@ -1112,18 +1122,21 @@ FullNodeShardImpl::FullNodeShardImpl(ShardIdFull shard, PublicKeyHash local_id,
, keyring_(keyring)
, adnl_(adnl)
, rldp_(rldp)
, rldp2_(rldp2)
, overlays_(overlays)
, validator_manager_(validator_manager)
, client_(client) {
, client_(client)
, config_(config) {
}
td::actor::ActorOwn<FullNodeShard> FullNodeShard::create(
ShardIdFull shard, PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash,
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager, td::actor::ActorId<adnl::AdnlExtClient> client) {
return td::actor::create_actor<FullNodeShardImpl>("tonnode", shard, local_id, adnl_id, zero_state_file_hash, keyring,
adnl, rldp, overlays, validator_manager, client);
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays, td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client) {
return td::actor::create_actor<FullNodeShardImpl>("tonnode", shard, local_id, adnl_id, zero_state_file_hash, config,
keyring, adnl, rldp, rldp2, overlays, validator_manager, client);
}
} // namespace fullnode

View file

@ -36,6 +36,7 @@ class FullNodeShard : public td::actor::Actor {
virtual ShardIdFull get_shard_full() const = 0;
virtual void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td::Unit> promise) = 0;
virtual void set_config(FullNodeConfig config) = 0;
virtual void send_ihr_message(td::BufferSlice data) = 0;
virtual void send_external_message(td::BufferSlice data) = 0;
@ -68,9 +69,10 @@ class FullNodeShard : public td::actor::Actor {
static td::actor::ActorOwn<FullNodeShard> create(
ShardIdFull shard, PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash,
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager, td::actor::ActorId<adnl::AdnlExtClient> client);
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays, td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client);
};
} // namespace fullnode

View file

@ -44,6 +44,10 @@ struct Neighbour {
void query_failed();
void update_roundtrip(double t);
bool use_rldp2() const {
return std::make_pair(proto_version, capabilities) >= std::make_pair<td::uint32, td::uint64>(2, 2);
}
static Neighbour zero;
};
@ -66,7 +70,7 @@ class FullNodeShardImpl : public FullNodeShard {
return 2;
}
static constexpr td::uint64 proto_capabilities() {
return 1;
return 2;
}
static constexpr td::uint32 max_neighbours() {
return 16;
@ -81,6 +85,10 @@ class FullNodeShardImpl : public FullNodeShard {
void create_overlay();
void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td::Unit> promise) override;
void set_config(FullNodeConfig config) override {
config_ = config;
}
//td::Result<Block> fetch_block(td::BufferSlice data);
void prevalidate_block(BlockIdExt block_id, td::BufferSlice data, td::BufferSlice proof,
td::Promise<ReceivedBlock> promise);
@ -198,9 +206,9 @@ class FullNodeShardImpl : public FullNodeShard {
}
FullNodeShardImpl(ShardIdFull shard, PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id,
FileHash zero_state_file_hash, td::actor::ActorId<keyring::Keyring> keyring,
FileHash zero_state_file_hash, FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<rldp2::Rldp> rldp2, td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client);
@ -220,6 +228,7 @@ class FullNodeShardImpl : public FullNodeShard {
td::actor::ActorId<keyring::Keyring> keyring_;
td::actor::ActorId<adnl::Adnl> adnl_;
td::actor::ActorId<rldp::Rldp> rldp_;
td::actor::ActorId<rldp2::Rldp> rldp2_;
td::actor::ActorId<overlay::Overlays> overlays_;
td::actor::ActorId<ValidatorManagerInterface> validator_manager_;
td::actor::ActorId<adnl::AdnlExtClient> client_;
@ -239,6 +248,8 @@ class FullNodeShardImpl : public FullNodeShard {
td::Timestamp reload_neighbours_at_;
td::Timestamp ping_neighbours_at_;
adnl::AdnlNodeIdShort last_pinged_neighbour_ = adnl::AdnlNodeIdShort::zero();
FullNodeConfig config_;
};
} // namespace fullnode

View file

@ -20,6 +20,7 @@
#include "ton/ton-shard.h"
#include "ton/ton-io.hpp"
#include "td/actor/MultiPromise.h"
#include "full-node.h"
namespace ton {
@ -110,6 +111,13 @@ void FullNodeImpl::update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td:
local_id_ = adnl_id_.pubkey_hash();
}
void FullNodeImpl::set_config(FullNodeConfig config) {
config_ = config;
for (auto& shard : shards_) {
td::actor::send_closure(shard.second, &FullNodeShard::set_config, config);
}
}
void FullNodeImpl::initial_read_complete(BlockHandle top_handle) {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
R.ensure();
@ -123,8 +131,8 @@ void FullNodeImpl::initial_read_complete(BlockHandle top_handle) {
void FullNodeImpl::add_shard(ShardIdFull shard) {
while (true) {
if (shards_.count(shard) == 0) {
shards_.emplace(shard, FullNodeShard::create(shard, local_id_, adnl_id_, zero_state_file_hash_, keyring_, adnl_,
rldp_, overlays_, validator_manager_, client_));
shards_.emplace(shard, FullNodeShard::create(shard, local_id_, adnl_id_, zero_state_file_hash_, config_, keyring_,
adnl_, rldp_, rldp2_, overlays_, validator_manager_, client_));
if (all_validators_.size() > 0) {
td::actor::send_closure(shards_[shard], &FullNodeShard::update_validators, all_validators_, sign_cert_by_);
}
@ -449,8 +457,9 @@ void FullNodeImpl::start_up() {
}
FullNodeImpl::FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash,
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<dht::Dht> dht,
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp,
td::actor::ActorId<rldp2::Rldp> rldp2, td::actor::ActorId<dht::Dht> dht,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client, std::string db_root)
@ -460,24 +469,40 @@ FullNodeImpl::FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id
, keyring_(keyring)
, adnl_(adnl)
, rldp_(rldp)
, rldp2_(rldp2)
, dht_(dht)
, overlays_(overlays)
, validator_manager_(validator_manager)
, client_(client)
, db_root_(db_root) {
, db_root_(db_root)
, config_(config) {
add_shard(ShardIdFull{masterchainId});
}
td::actor::ActorOwn<FullNode> FullNode::create(ton::PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id,
FileHash zero_state_file_hash,
FileHash zero_state_file_hash, FullNodeConfig config,
td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp,
td::actor::ActorId<dht::Dht> dht,
td::actor::ActorId<rldp2::Rldp> rldp2, td::actor::ActorId<dht::Dht> dht,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client, std::string db_root) {
return td::actor::create_actor<FullNodeImpl>("fullnode", local_id, adnl_id, zero_state_file_hash, keyring, adnl, rldp,
dht, overlays, validator_manager, client, db_root);
return td::actor::create_actor<FullNodeImpl>("fullnode", local_id, adnl_id, zero_state_file_hash, config, keyring,
adnl, rldp, rldp2, dht, overlays, validator_manager, client, db_root);
}
FullNodeConfig::FullNodeConfig(const tl_object_ptr<ton_api::engine_validator_fullNodeConfig> &obj)
: ext_messages_broadcast_disabled_(obj->ext_messages_broadcast_disabled_) {
}
tl_object_ptr<ton_api::engine_validator_fullNodeConfig> FullNodeConfig::tl() const {
return create_tl_object<ton_api::engine_validator_fullNodeConfig>(ext_messages_broadcast_disabled_);
}
bool FullNodeConfig::operator==(const FullNodeConfig &rhs) const {
return ext_messages_broadcast_disabled_ == rhs.ext_messages_broadcast_disabled_;
}
bool FullNodeConfig::operator!=(const FullNodeConfig &rhs) const {
return !(*this == rhs);
}
} // namespace fullnode

View file

@ -27,6 +27,7 @@
#include "adnl/adnl.h"
#include "rldp/rldp.h"
#include "rldp2/rldp.h"
#include "dht/dht.h"
#include "overlay/overlays.h"
#include "validator/validator.h"
@ -44,6 +45,16 @@ constexpr int VERBOSITY_NAME(FULL_NODE_INFO) = verbosity_DEBUG;
constexpr int VERBOSITY_NAME(FULL_NODE_DEBUG) = verbosity_DEBUG;
constexpr int VERBOSITY_NAME(FULL_NODE_EXTRA_DEBUG) = verbosity_DEBUG + 1;
struct FullNodeConfig {
FullNodeConfig() = default;
FullNodeConfig(const tl_object_ptr<ton_api::engine_validator_fullNodeConfig>& obj);
tl_object_ptr<ton_api::engine_validator_fullNodeConfig> tl() const;
bool operator==(const FullNodeConfig& rhs) const;
bool operator!=(const FullNodeConfig& rhs) const;
bool ext_messages_broadcast_disabled_ = false;
};
class FullNode : public td::actor::Actor {
public:
virtual ~FullNode() = default;
@ -61,6 +72,7 @@ class FullNode : public td::actor::Actor {
td::Promise<td::Unit> promise) = 0;
virtual void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td::Unit> promise) = 0;
virtual void set_config(FullNodeConfig config) = 0;
static constexpr td::uint32 max_block_size() {
return 4 << 20;
@ -73,10 +85,10 @@ class FullNode : public td::actor::Actor {
}
static td::actor::ActorOwn<FullNode> create(ton::PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id,
FileHash zero_state_file_hash,
FileHash zero_state_file_hash, FullNodeConfig config,
td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp,
td::actor::ActorId<dht::Dht> dht,
td::actor::ActorId<rldp2::Rldp> rldp2, td::actor::ActorId<dht::Dht> dht,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client, std::string db_root);

View file

@ -49,8 +49,8 @@ class FullNodeImpl : public FullNode {
std::shared_ptr<ton::overlay::Certificate> cert,
td::Promise<td::Unit> promise) override;
void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td::Unit> promise) override;
void set_config(FullNodeConfig config) override;
void add_shard(ShardIdFull shard);
void del_shard(ShardIdFull shard);
@ -82,9 +82,9 @@ class FullNodeImpl : public FullNode {
void start_up() override;
FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash,
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<dht::Dht> dht,
td::actor::ActorId<overlay::Overlays> overlays,
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<dht::Dht> dht, td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client, std::string db_root);
@ -101,6 +101,7 @@ class FullNodeImpl : public FullNode {
td::actor::ActorId<keyring::Keyring> keyring_;
td::actor::ActorId<adnl::Adnl> adnl_;
td::actor::ActorId<rldp::Rldp> rldp_;
td::actor::ActorId<rldp2::Rldp> rldp2_;
td::actor::ActorId<dht::Dht> dht_;
td::actor::ActorId<overlay::Overlays> overlays_;
td::actor::ActorId<ValidatorManagerInterface> validator_manager_;
@ -112,6 +113,7 @@ class FullNodeImpl : public FullNode {
std::vector<PublicKeyHash> all_validators_;
std::set<PublicKeyHash> local_keys_;
FullNodeConfig config_;
};
} // namespace fullnode

View file

@ -29,7 +29,7 @@ namespace fullnode {
DownloadArchiveSlice::DownloadArchiveSlice(
BlockSeqno masterchain_seqno, std::string tmp_dir, adnl::AdnlNodeIdShort local_id,
overlay::OverlayIdShort overlay_id, adnl::AdnlNodeIdShort download_from, td::Timestamp timeout,
td::actor::ActorId<ValidatorManagerInterface> validator_manager, td::actor::ActorId<rldp::Rldp> rldp,
td::actor::ActorId<ValidatorManagerInterface> validator_manager, td::actor::ActorId<adnl::AdnlSenderInterface> rldp,
td::actor::ActorId<overlay::Overlays> overlays, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<adnl::AdnlExtClient> client, td::Promise<std::string> promise)
: masterchain_seqno_(masterchain_seqno)
@ -144,6 +144,8 @@ void DownloadArchiveSlice::got_archive_info(td::BufferSlice data) {
return;
}
prev_logged_timer_ = td::Timer();
LOG(INFO) << "downloading archive slice #" << masterchain_seqno_ << " from " << download_from_;
get_archive_slice();
}
@ -159,12 +161,12 @@ void DownloadArchiveSlice::get_archive_slice() {
auto q = create_serialize_tl_object<ton_api::tonNode_getArchiveSlice>(archive_id_, offset_, slice_size());
if (client_.empty()) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, download_from_, local_id_, overlay_id_,
"get_archive_slice", std::move(P), td::Timestamp::in(3.0), std::move(q),
"get_archive_slice", std::move(P), td::Timestamp::in(15.0), std::move(q),
slice_size() + 1024, rldp_);
} else {
td::actor::send_closure(client_, &adnl::AdnlExtClient::send_query, "get_archive_slice",
create_serialize_tl_object_suffix<ton_api::tonNode_query>(std::move(q)),
td::Timestamp::in(1.0), std::move(P));
td::Timestamp::in(15.0), std::move(P));
}
}
@ -181,7 +183,16 @@ void DownloadArchiveSlice::got_archive_slice(td::BufferSlice data) {
offset_ += data.size();
double elapsed = prev_logged_timer_.elapsed();
if (elapsed > 10.0) {
prev_logged_timer_ = td::Timer();
LOG(INFO) << "downloading archive slice #" << masterchain_seqno_ << ": total=" << offset_ << " ("
<< td::format::as_size((td::uint64)(double(offset_ - prev_logged_sum_) / elapsed)) << "/s)";
prev_logged_sum_ = offset_;
}
if (data.size() < slice_size()) {
LOG(INFO) << "finished downloading arcrive slice #" << masterchain_seqno_ << ": total=" << offset_;
finish_query();
} else {
get_archive_slice();

View file

@ -21,7 +21,6 @@
#include "overlay/overlays.h"
#include "ton/ton-types.h"
#include "validator/validator.h"
#include "rldp/rldp.h"
#include "adnl/adnl-ext-client.h"
#include "td/utils/port/FileFd.h"
@ -36,9 +35,9 @@ class DownloadArchiveSlice : public td::actor::Actor {
DownloadArchiveSlice(BlockSeqno masterchain_seqno, std::string tmp_dir, adnl::AdnlNodeIdShort local_id,
overlay::OverlayIdShort overlay_id, adnl::AdnlNodeIdShort download_from, td::Timestamp timeout,
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<adnl::AdnlExtClient> client,
td::Promise<std::string> promise);
td::actor::ActorId<adnl::AdnlSenderInterface> rldp,
td::actor::ActorId<overlay::Overlays> overlays, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<adnl::AdnlExtClient> client, td::Promise<std::string> promise);
void abort_query(td::Status reason);
void alarm() override;
@ -51,7 +50,7 @@ class DownloadArchiveSlice : public td::actor::Actor {
void got_archive_slice(td::BufferSlice data);
static constexpr td::uint32 slice_size() {
return 1 << 17;
return 1 << 21;
}
private:
@ -68,11 +67,14 @@ class DownloadArchiveSlice : public td::actor::Actor {
td::Timestamp timeout_;
td::actor::ActorId<ValidatorManagerInterface> validator_manager_;
td::actor::ActorId<rldp::Rldp> rldp_;
td::actor::ActorId<adnl::AdnlSenderInterface> rldp_;
td::actor::ActorId<overlay::Overlays> overlays_;
td::actor::ActorId<adnl::Adnl> adnl_;
td::actor::ActorId<adnl::AdnlExtClient> client_;
td::Promise<std::string> promise_;
td::uint64 prev_logged_sum_ = 0;
td::Timer prev_logged_timer_;
};
} // namespace fullnode

View file

@ -201,12 +201,12 @@ void DownloadBlockNew::got_node_to_download(adnl::AdnlNodeIdShort node) {
}
if (client_.empty()) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, download_from_, local_id_, overlay_id_,
"get_proof", std::move(P), td::Timestamp::in(3.0), std::move(q),
"get_proof", std::move(P), td::Timestamp::in(15.0), std::move(q),
FullNode::max_proof_size() + FullNode::max_block_size() + 128, rldp_);
} else {
td::actor::send_closure(client_, &adnl::AdnlExtClient::send_query, "get_prepare",
create_serialize_tl_object_suffix<ton_api::tonNode_query>(std::move(q)),
td::Timestamp::in(1.0), std::move(P));
td::Timestamp::in(15.0), std::move(P));
}
}

View file

@ -373,12 +373,12 @@ void DownloadBlock::got_block_data_description(td::BufferSlice data_description)
auto q = create_serialize_tl_object<ton_api::tonNode_downloadBlock>(create_tl_block_id(block_id_));
if (client_.empty()) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, download_from_, local_id_,
overlay_id_, "get_block", std::move(P), td::Timestamp::in(3.0), std::move(q),
overlay_id_, "get_block", std::move(P), td::Timestamp::in(15.0), std::move(q),
FullNode::max_block_size(), rldp_);
} else {
td::actor::send_closure(client_, &adnl::AdnlExtClient::send_query, "get_block",
create_serialize_tl_object_suffix<ton_api::tonNode_query>(std::move(q)),
td::Timestamp::in(3.0), std::move(P));
td::Timestamp::in(15.0), std::move(P));
}
},
[&](ton_api::tonNode_notFound &val) {

View file

@ -32,9 +32,9 @@ DownloadState::DownloadState(BlockIdExt block_id, BlockIdExt masterchain_block_i
overlay::OverlayIdShort overlay_id, adnl::AdnlNodeIdShort download_from,
td::uint32 priority, td::Timestamp timeout,
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<adnl::AdnlExtClient> client,
td::Promise<td::BufferSlice> promise)
td::actor::ActorId<adnl::AdnlSenderInterface> rldp,
td::actor::ActorId<overlay::Overlays> overlays, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<adnl::AdnlExtClient> client, td::Promise<td::BufferSlice> promise)
: block_id_(block_id)
, masterchain_block_id_(masterchain_block_id)
, local_id_(local_id)
@ -115,7 +115,7 @@ void DownloadState::got_block_handle(BlockHandle handle) {
void DownloadState::got_node_to_download(adnl::AdnlNodeIdShort node) {
download_from_ = node;
LOG(INFO) << "downloading state " << block_id_ << " from " << download_from_;
LOG(INFO) << "downloading state " << block_id_.to_str() << " from " << download_from_;
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::BufferSlice> R) mutable {
if (R.is_error()) {
@ -192,8 +192,8 @@ void DownloadState::got_block_state_part(td::BufferSlice data, td::uint32 reques
double elapsed = prev_logged_timer_.elapsed();
if (elapsed > 10.0) {
prev_logged_timer_ = td::Timer();
LOG(INFO) << "downloading state " << block_id_ << ": total=" << sum_ <<
" (" << double(sum_ - prev_logged_sum_) / elapsed << " B/s)";
LOG(INFO) << "downloading state " << block_id_.to_str() << ": total=" << sum_ << " ("
<< td::format::as_size((td::uint64)(double(sum_ - prev_logged_sum_) / elapsed)) << "/s)";
prev_logged_sum_ = sum_;
}
@ -210,7 +210,7 @@ void DownloadState::got_block_state_part(td::BufferSlice data, td::uint32 reques
return;
}
td::uint32 part_size = 1 << 18;
td::uint32 part_size = 1 << 21;
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), part_size](td::Result<td::BufferSlice> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &DownloadState::abort_query, R.move_as_error());
@ -223,18 +223,18 @@ void DownloadState::got_block_state_part(td::BufferSlice data, td::uint32 reques
create_tl_block_id(block_id_), create_tl_block_id(masterchain_block_id_), sum_, part_size);
if (client_.empty()) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, download_from_, local_id_, overlay_id_,
"download state", std::move(P), td::Timestamp::in(10.0), std::move(query),
"download state", std::move(P), td::Timestamp::in(20.0), std::move(query),
FullNode::max_state_size(), rldp_);
} else {
td::actor::send_closure(client_, &adnl::AdnlExtClient::send_query, "download state",
create_serialize_tl_object_suffix<ton_api::tonNode_query>(std::move(query)),
td::Timestamp::in(10.0), std::move(P));
td::Timestamp::in(20.0), std::move(P));
}
}
void DownloadState::got_block_state(td::BufferSlice data) {
state_ = std::move(data);
LOG(INFO) << "finished downloading state " << block_id_ << ": total=" << sum_;
LOG(INFO) << "finished downloading state " << block_id_.to_str() << ": total=" << sum_;
finish_query();
}

View file

@ -21,7 +21,6 @@
#include "overlay/overlays.h"
#include "ton/ton-types.h"
#include "validator/validator.h"
#include "rldp/rldp.h"
#include "adnl/adnl-ext-client.h"
namespace ton {
@ -35,7 +34,7 @@ class DownloadState : public td::actor::Actor {
DownloadState(BlockIdExt block_id, BlockIdExt masterchain_block_id, adnl::AdnlNodeIdShort local_id,
overlay::OverlayIdShort overlay_id, adnl::AdnlNodeIdShort download_from, td::uint32 priority,
td::Timestamp timeout, td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<adnl::AdnlSenderInterface> rldp, td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<adnl::AdnlExtClient> client,
td::Promise<td::BufferSlice> promise);
@ -62,7 +61,7 @@ class DownloadState : public td::actor::Actor {
td::Timestamp timeout_;
td::actor::ActorId<ValidatorManagerInterface> validator_manager_;
td::actor::ActorId<rldp::Rldp> rldp_;
td::actor::ActorId<adnl::AdnlSenderInterface> rldp_;
td::actor::ActorId<overlay::Overlays> overlays_;
td::actor::ActorId<adnl::Adnl> adnl_;
td::actor::ActorId<adnl::AdnlExtClient> client_;

View file

@ -148,7 +148,7 @@ void AsyncStateSerializer::next_iteration() {
running_ = true;
delay_action(
[SelfId = actor_id(this), shard = shards_[next_idx_]]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_shard_state, shard); },
td::Timestamp::in(td::Random::fast(0, 4 * 3600)));
td::Timestamp::in(td::Random::fast(0, 1800)));
return;
}
}