1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Merge branch 'testnet' into block-generation

This commit is contained in:
SpyCheese 2024-02-01 19:29:25 +03:00
commit f4fd3ff3be
246 changed files with 7895 additions and 5430 deletions

View file

@ -1,4 +1,4 @@
cmake_minimum_required(VERSION 3.0.2 FATAL_ERROR)
cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
if (NOT OPENSSL_FOUND)
find_package(OpenSSL REQUIRED)
@ -53,6 +53,7 @@ set(VALIDATOR_HEADERS
invariants.hpp
import-db-slice.hpp
queue-size-counter.hpp
collator-node.hpp
manager-disk.h
@ -80,6 +81,7 @@ set(VALIDATOR_SOURCE
validator-full-id.cpp
validator-group.cpp
validator-options.cpp
queue-size-counter.cpp
downloaders/wait-block-data.cpp
downloaders/wait-block-state.cpp
@ -101,7 +103,8 @@ set(DISK_VALIDATOR_SOURCE
validator-full-id.cpp
validator-group.cpp
validator-options.cpp
queue-size-counter.cpp
downloaders/wait-block-data-disk.cpp
downloaders/wait-block-state.cpp
downloaders/wait-block-state-merge.cpp
@ -120,7 +123,8 @@ set(HARDFORK_VALIDATOR_SOURCE
validator-full-id.cpp
validator-group.cpp
validator-options.cpp
queue-size-counter.cpp
downloaders/wait-block-data-disk.cpp
downloaders/wait-block-state.cpp
downloaders/wait-block-state-merge.cpp

View file

@ -23,6 +23,7 @@
#include "ton/ton-tl.hpp"
#include "ton/ton-io.hpp"
#include "common/delay.h"
namespace ton {
@ -62,16 +63,31 @@ void CellDbBase::execute_sync(std::function<void()> f) {
f();
}
CellDbIn::CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb> parent, std::string path)
: root_db_(root_db), parent_(parent), path_(std::move(path)) {
CellDbIn::CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb> parent, std::string path,
td::Ref<ValidatorManagerOptions> opts)
: root_db_(root_db), parent_(parent), path_(std::move(path)), opts_(opts) {
}
void CellDbIn::start_up() {
on_load_callback_ = [actor = std::make_shared<td::actor::ActorOwn<MigrationProxy>>(
td::actor::create_actor<MigrationProxy>("celldbmigration", actor_id(this))),
compress_depth = opts_->get_celldb_compress_depth()](const vm::CellLoader::LoadResult& res) {
if (res.cell_.is_null()) {
return;
}
bool expected_stored_boc = res.cell_->get_depth() == compress_depth && compress_depth != 0;
if (expected_stored_boc != res.stored_boc_) {
td::actor::send_closure(*actor, &CellDbIn::MigrationProxy::migrate_cell,
td::Bits256{res.cell_->get_hash().bits()});
}
};
CellDbBase::start_up();
cell_db_ = std::make_shared<td::RocksDb>(td::RocksDb::open(path_).move_as_ok());
boc_ = vm::DynamicBagOfCellsDb::create();
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot())).ensure();
boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth());
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot(), on_load_callback_)).ensure();
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());
alarm_timestamp() = td::Timestamp::in(10.0);
@ -129,7 +145,7 @@ void CellDbIn::store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promi
set_block(key_hash, std::move(D));
cell_db_->commit_write_batch().ensure();
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot())).ensure();
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot(), on_load_callback_)).ensure();
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());
promise.set_result(boc_->load_cell(cell->get_hash().as_slice()));
@ -140,6 +156,16 @@ void CellDbIn::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>>
}
void CellDbIn::alarm() {
if (migrate_after_ && migrate_after_.is_in_past()) {
migrate_cells();
}
if (migration_stats_ && migration_stats_->end_at_.is_in_past()) {
LOG(INFO) << "CellDb migration, " << migration_stats_->start_.elapsed()
<< "s stats: batches=" << migration_stats_->batches_ << " migrated=" << migration_stats_->migrated_cells_
<< " checked=" << migration_stats_->checked_cells_ << " time=" << migration_stats_->total_time_
<< " queue_size=" << cells_to_migrate_.size();
migration_stats_ = {};
}
auto E = get_block(get_empty_key_hash()).move_as_ok();
auto N = get_block(E.next).move_as_ok();
if (N.is_empty()) {
@ -220,7 +246,7 @@ void CellDbIn::gc_cont2(BlockHandle handle) {
cell_db_->commit_write_batch().ensure();
alarm_timestamp() = td::Timestamp::now();
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot())).ensure();
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot(), on_load_callback_)).ensure();
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());
DCHECK(get_block(key_hash).is_error());
@ -273,6 +299,66 @@ void CellDbIn::set_block(KeyHash key_hash, DbEntry e) {
cell_db_->set(td::as_slice(key), e.release()).ensure();
}
void CellDbIn::migrate_cell(td::Bits256 hash) {
cells_to_migrate_.insert(hash);
if (!migration_active_) {
migration_active_ = true;
migrate_after_ = td::Timestamp::in(10.0);
}
}
void CellDbIn::migrate_cells() {
migrate_after_ = td::Timestamp::never();
if (cells_to_migrate_.empty()) {
migration_active_ = false;
return;
}
td::Timer timer;
if (!migration_stats_) {
migration_stats_ = std::make_unique<MigrationStats>();
}
vm::CellStorer stor{*cell_db_};
auto loader = std::make_unique<vm::CellLoader>(cell_db_->snapshot());
boc_->set_loader(std::make_unique<vm::CellLoader>(*loader)).ensure();
cell_db_->begin_write_batch().ensure();
td::uint32 checked = 0, migrated = 0;
for (auto it = cells_to_migrate_.begin(); it != cells_to_migrate_.end() && checked < 128; ) {
++checked;
td::Bits256 hash = *it;
it = cells_to_migrate_.erase(it);
auto R = loader->load(hash.as_slice(), true, boc_->as_ext_cell_creator());
if (R.is_error()) {
continue;
}
if (R.ok().status == vm::CellLoader::LoadResult::NotFound) {
continue;
}
bool expected_stored_boc =
R.ok().cell_->get_depth() == opts_->get_celldb_compress_depth() && opts_->get_celldb_compress_depth() != 0;
if (expected_stored_boc != R.ok().stored_boc_) {
++migrated;
stor.set(R.ok().refcnt(), R.ok().cell_, expected_stored_boc).ensure();
}
}
cell_db_->commit_write_batch().ensure();
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot(), on_load_callback_)).ensure();
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());
double time = timer.elapsed();
LOG(DEBUG) << "CellDb migration: migrated=" << migrated << " checked=" << checked << " time=" << time;
++migration_stats_->batches_;
migration_stats_->migrated_cells_ += migrated;
migration_stats_->checked_cells_ += checked;
migration_stats_->total_time_ += time;
if (cells_to_migrate_.empty()) {
migration_active_ = false;
} else {
delay_action([SelfId = actor_id(this)] { td::actor::send_closure(SelfId, &CellDbIn::migrate_cells); },
td::Timestamp::in(time * 2));
}
}
void CellDb::load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise) {
if (!started_) {
td::actor::send_closure(cell_db_, &CellDbIn::load_cell, hash, std::move(promise));
@ -300,7 +386,20 @@ void CellDb::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> p
void CellDb::start_up() {
CellDbBase::start_up();
boc_ = vm::DynamicBagOfCellsDb::create();
cell_db_ = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_);
boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth());
cell_db_ = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_, opts_);
on_load_callback_ = [actor = std::make_shared<td::actor::ActorOwn<CellDbIn::MigrationProxy>>(
td::actor::create_actor<CellDbIn::MigrationProxy>("celldbmigration", cell_db_.get())),
compress_depth = opts_->get_celldb_compress_depth()](const vm::CellLoader::LoadResult& res) {
if (res.cell_.is_null()) {
return;
}
bool expected_stored_boc = res.cell_->get_depth() == compress_depth && compress_depth != 0;
if (expected_stored_boc != res.stored_boc_) {
td::actor::send_closure(*actor, &CellDbIn::MigrationProxy::migrate_cell,
td::Bits256{res.cell_->get_hash().bits()});
}
};
}
CellDbIn::DbEntry::DbEntry(tl_object_ptr<ton_api::db_celldb_value> entry)

View file

@ -25,6 +25,7 @@
#include "ton/ton-types.h"
#include "interfaces/block-handle.h"
#include "auto/tl/ton_api.h"
#include "validator.h"
namespace ton {
@ -53,7 +54,10 @@ class CellDbIn : public CellDbBase {
void store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promise<td::Ref<vm::DataCell>> promise);
void get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise);
CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb> parent, std::string path);
void migrate_cell(td::Bits256 hash);
CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb> parent, std::string path,
td::Ref<ValidatorManagerOptions> opts);
void start_up() override;
void alarm() override;
@ -89,13 +93,44 @@ class CellDbIn : public CellDbBase {
void gc_cont2(BlockHandle handle);
void skip_gc();
void migrate_cells();
td::actor::ActorId<RootDb> root_db_;
td::actor::ActorId<CellDb> parent_;
std::string path_;
td::Ref<ValidatorManagerOptions> opts_;
std::unique_ptr<vm::DynamicBagOfCellsDb> boc_;
std::shared_ptr<vm::KeyValue> cell_db_;
std::function<void(const vm::CellLoader::LoadResult&)> on_load_callback_;
std::set<td::Bits256> cells_to_migrate_;
td::Timestamp migrate_after_ = td::Timestamp::never();
bool migration_active_ = false;
struct MigrationStats {
td::Timer start_;
td::Timestamp end_at_ = td::Timestamp::in(60.0);
size_t batches_ = 0;
size_t migrated_cells_ = 0;
size_t checked_cells_ = 0;
double total_time_ = 0.0;
};
std::unique_ptr<MigrationStats> migration_stats_;
public:
class MigrationProxy : public td::actor::Actor {
public:
explicit MigrationProxy(td::actor::ActorId<CellDbIn> cell_db) : cell_db_(cell_db) {
}
void migrate_cell(td::Bits256 hash) {
td::actor::send_closure(cell_db_, &CellDbIn::migrate_cell, hash);
}
private:
td::actor::ActorId<CellDbIn> cell_db_;
};
};
class CellDb : public CellDbBase {
@ -104,11 +139,12 @@ class CellDb : public CellDbBase {
void store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promise<td::Ref<vm::DataCell>> promise);
void update_snapshot(std::unique_ptr<td::KeyValueReader> snapshot) {
started_ = true;
boc_->set_loader(std::make_unique<vm::CellLoader>(std::move(snapshot))).ensure();
boc_->set_loader(std::make_unique<vm::CellLoader>(std::move(snapshot), on_load_callback_)).ensure();
}
void get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise);
CellDb(td::actor::ActorId<RootDb> root_db, std::string path) : root_db_(root_db), path_(path) {
CellDb(td::actor::ActorId<RootDb> root_db, std::string path, td::Ref<ValidatorManagerOptions> opts)
: root_db_(root_db), path_(path), opts_(opts) {
}
void start_up() override;
@ -116,11 +152,14 @@ class CellDb : public CellDbBase {
private:
td::actor::ActorId<RootDb> root_db_;
std::string path_;
td::Ref<ValidatorManagerOptions> opts_;
td::actor::ActorOwn<CellDbIn> cell_db_;
std::unique_ptr<vm::DynamicBagOfCellsDb> boc_;
bool started_ = false;
std::function<void(const vm::CellLoader::LoadResult&)> on_load_callback_;
};
} // namespace validator

View file

@ -397,7 +397,7 @@ void RootDb::get_hardforks(td::Promise<std::vector<BlockIdExt>> promise) {
}
void RootDb::start_up() {
cell_db_ = td::actor::create_actor<CellDb>("celldb", actor_id(this), root_path_ + "/celldb/");
cell_db_ = td::actor::create_actor<CellDb>("celldb", actor_id(this), root_path_ + "/celldb/", opts_);
state_db_ = td::actor::create_actor<StateDb>("statedb", actor_id(this), root_path_ + "/state/");
static_files_db_ = td::actor::create_actor<StaticFilesDb>("staticfilesdb", actor_id(this), root_path_ + "/static/");
archive_db_ = td::actor::create_actor<ArchiveManager>("archive", actor_id(this), root_path_);

View file

@ -26,6 +26,7 @@
#include "statedb.hpp"
#include "staticfilesdb.hpp"
#include "archive-manager.hpp"
#include "validator.h"
namespace ton {
@ -34,8 +35,9 @@ namespace validator {
class RootDb : public Db {
public:
enum class Flags : td::uint32 { f_started = 1, f_ready = 2, f_switched = 4, f_archived = 8 };
RootDb(td::actor::ActorId<ValidatorManager> validator_manager, std::string root_path)
: validator_manager_(validator_manager), root_path_(std::move(root_path)) {
RootDb(td::actor::ActorId<ValidatorManager> validator_manager, std::string root_path,
td::Ref<ValidatorManagerOptions> opts)
: validator_manager_(validator_manager), root_path_(std::move(root_path)), opts_(opts) {
}
void start_up() override;
@ -141,6 +143,7 @@ class RootDb : public Db {
td::actor::ActorId<ValidatorManager> validator_manager_;
std::string root_path_;
td::Ref<ValidatorManagerOptions> opts_;
td::actor::ActorOwn<CellDb> cell_db_;
td::actor::ActorOwn<StateDb> state_db_;

View file

@ -20,6 +20,7 @@
#include "interfaces/validator-manager.h"
#include "interfaces/db.h"
#include "validator.h"
namespace ton {
@ -28,7 +29,8 @@ namespace validator {
enum ValidateMode { fake = 1, full_collated_data = 2 };
enum CollateMode { skip_store_candidate = 1 };
td::actor::ActorOwn<Db> create_db_actor(td::actor::ActorId<ValidatorManager> manager, std::string db_root_);
td::actor::ActorOwn<Db> create_db_actor(td::actor::ActorId<ValidatorManager> manager, std::string db_root_,
td::Ref<ValidatorManagerOptions> opts);
td::actor::ActorOwn<LiteServerCache> create_liteserver_cache_actor(td::actor::ActorId<ValidatorManager> manager,
std::string db_root);

View file

@ -1,4 +1,4 @@
cmake_minimum_required(VERSION 3.0.2 FATAL_ERROR)
cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
if (NOT OPENSSL_FOUND)
find_package(OpenSSL REQUIRED)

View file

@ -60,7 +60,6 @@ class Collator final : public td::actor::Actor {
bool preinit_complete{false};
bool is_key_block_{false};
bool block_full_{false};
bool outq_cleanup_partial_{false};
bool inbound_queues_empty_{false};
bool libraries_changed_{false};
bool prev_key_block_exists_{false};
@ -192,6 +191,7 @@ class Collator final : public td::actor::Actor {
std::pair<ton::LogicalTime, ton::Bits256> last_proc_int_msg_, first_unproc_int_msg_;
std::unique_ptr<vm::AugmentedDictionary> in_msg_dict, out_msg_dict, old_out_msg_queue_, out_msg_queue_,
sibling_out_msg_queue_;
td::uint32 out_msg_queue_size_ = 0;
std::unique_ptr<vm::Dictionary> ihr_pending;
std::shared_ptr<block::MsgProcessedUptoCollection> processed_upto_, sibling_processed_upto_;
std::unique_ptr<vm::Dictionary> block_create_stats_;
@ -210,7 +210,7 @@ class Collator final : public td::actor::Actor {
//
block::Account* lookup_account(td::ConstBitPtr addr) const;
std::unique_ptr<block::Account> make_account_from(td::ConstBitPtr addr, Ref<vm::CellSlice> account,
Ref<vm::CellSlice> extra, bool force_create = false);
bool force_create);
td::Result<block::Account*> make_account(td::ConstBitPtr addr, bool force_create = false);
td::actor::ActorId<Collator> get_self() {
return actor_id(this);
@ -235,6 +235,7 @@ class Collator final : public td::actor::Actor {
bool fix_processed_upto(block::MsgProcessedUptoCollection& upto);
void got_neighbor_msg_queues(td::Result<std::map<BlockIdExt, Ref<OutMsgQueueProof>>> R);
void got_neighbor_msg_queue(unsigned i, Ref<OutMsgQueueProof> res);
void got_out_queue_size(size_t i, td::Result<td::uint32> res);
bool adjust_shard_config();
bool store_shard_fees(ShardIdFull shard, const block::CurrencyCollection& fees,
const block::CurrencyCollection& created);
@ -253,7 +254,7 @@ class Collator final : public td::actor::Actor {
Ref<vm::Cell>& in_msg);
bool create_ticktock_transactions(int mask);
bool create_ticktock_transaction(const ton::StdSmcAddress& smc_addr, ton::LogicalTime req_start_lt, int mask);
Ref<vm::Cell> create_ordinary_transaction(Ref<vm::Cell> msg_root);
Ref<vm::Cell> create_ordinary_transaction(Ref<vm::Cell> msg_root, bool is_special_tx = false);
bool check_cur_validator_set();
bool unpack_last_mc_state();
bool unpack_last_state();
@ -269,6 +270,7 @@ class Collator final : public td::actor::Actor {
bool check_prev_block_exact(const BlockIdExt& listed, const BlockIdExt& prev);
bool check_this_shard_mc_info();
bool request_neighbor_msg_queues();
bool request_out_msg_queue_size();
void update_max_lt(ton::LogicalTime lt);
bool is_masterchain() const {
return shard_.is_masterchain();
@ -279,10 +281,6 @@ class Collator final : public td::actor::Actor {
void after_get_external_messages(td::Result<std::vector<Ref<ExtMessage>>> res);
td::Result<bool> register_external_message_cell(Ref<vm::Cell> ext_msg, const ExtMessage::Hash& ext_hash);
// td::Result<bool> register_external_message(td::Slice ext_msg_boc);
td::Result<bool> register_ihr_message_cell(Ref<vm::Cell> ihr_msg);
td::Result<bool> register_ihr_message(td::Slice ihr_msg_boc);
td::Result<bool> register_shard_signatures_cell(Ref<vm::Cell> shard_blk_signatures);
td::Result<bool> register_shard_signatures(td::Slice shard_blk_signatures_boc);
void register_new_msg(block::NewOutMsg msg);
void register_new_msgs(block::transaction::Transaction& trans);
bool process_new_messages(bool enqueue_only = false);
@ -296,7 +294,7 @@ class Collator final : public td::actor::Actor {
bool enqueue_message(block::NewOutMsg msg, td::RefInt256 fwd_fees_remaining, ton::LogicalTime enqueued_lt);
bool enqueue_transit_message(Ref<vm::Cell> msg, Ref<vm::Cell> old_msg_env, ton::AccountIdPrefixFull prev_prefix,
ton::AccountIdPrefixFull cur_prefix, ton::AccountIdPrefixFull dest_prefix,
td::RefInt256 fwd_fee_remaining, ton::LogicalTime enqueued_lt);
td::RefInt256 fwd_fee_remaining);
bool delete_out_msg_queue_msg(td::ConstBitPtr key);
bool insert_in_msg(Ref<vm::Cell> in_msg);
bool insert_out_msg(Ref<vm::Cell> out_msg);

File diff suppressed because it is too large Load diff

View file

@ -102,7 +102,7 @@ void ExtMessageQ::run_message(td::BufferSlice data, block::SizeLimitsConfig::Ext
run_fetch_account_state(
wc, addr, manager,
[promise = std::move(promise), msg_root = root, wc,
[promise = std::move(promise), msg_root = root, wc, addr,
M](td::Result<std::tuple<td::Ref<vm::CellSlice>, UnixTime, LogicalTime, std::unique_ptr<block::ConfigInfo>>>
res) mutable {
if (res.is_error()) {
@ -114,7 +114,8 @@ void ExtMessageQ::run_message(td::BufferSlice data, block::SizeLimitsConfig::Ext
auto utime = std::get<1>(tuple);
auto lt = std::get<2>(tuple);
auto config = std::move(std::get<3>(tuple));
if (!acc.unpack(shard_acc, {}, utime, false)) {
bool special = wc == masterchainId && config->is_special_smartcontract(addr);
if (!acc.unpack(shard_acc, utime, special)) {
promise.set_error(td::Status::Error(PSLICE() << "Failed to unpack account state"));
} else {
auto status = run_message_on_account(wc, &acc, utime, lt + 1, msg_root, std::move(config));
@ -155,6 +156,7 @@ td::Status ExtMessageQ::run_message_on_account(ton::WorkchainId wc,
}
compute_phase_cfg_.libraries = std::make_unique<vm::Dictionary>(config->get_libraries_root(), 256);
compute_phase_cfg_.with_vm_log = true;
compute_phase_cfg_.stop_on_accept_message = true;
auto res = Collator::impl_create_ordinary_transaction(msg_root, acc, utime, lt,
&storage_phase_cfg_, &compute_phase_cfg_,

View file

@ -39,8 +39,9 @@ namespace ton {
namespace validator {
td::actor::ActorOwn<Db> create_db_actor(td::actor::ActorId<ValidatorManager> manager, std::string db_root_) {
return td::actor::create_actor<RootDb>("db", manager, db_root_);
td::actor::ActorOwn<Db> create_db_actor(td::actor::ActorId<ValidatorManager> manager, std::string db_root_,
td::Ref<ValidatorManagerOptions> opts) {
return td::actor::create_actor<RootDb>("db", manager, db_root_, opts);
}
td::actor::ActorOwn<LiteServerCache> create_liteserver_cache_actor(td::actor::ActorId<ValidatorManager> manager,
@ -202,10 +203,12 @@ void run_validate_query(ShardIdFull shard, BlockIdExt min_masterchain_block_id,
}
}
bool is_fake = mode & ValidateMode::fake;
td::actor::create_actor<ValidateQuery>(
PSTRING() << (is_fake ? "fakevalidate" : "validateblock") << shard.to_str() << ":" << (seqno + 1), shard,
min_masterchain_block_id, std::move(prev), std::move(candidate), std::move(validator_set), std::move(manager),
timeout, std::move(promise), mode)
static std::atomic<size_t> idx;
td::actor::create_actor<ValidateQuery>(PSTRING() << (is_fake ? "fakevalidate" : "validateblock") << shard.to_str()
<< ":" << (seqno + 1) << "#" << idx.fetch_add(1),
shard, min_masterchain_block_id, std::move(prev), std::move(candidate),
std::move(validator_set), std::move(manager), timeout, std::move(promise),
mode)
.release();
}

View file

@ -505,20 +505,7 @@ void LiteQuery::perform_sendMessage(td::BufferSlice data) {
}
void LiteQuery::get_block_handle_checked(BlockIdExt blkid, td::Promise<ConstBlockHandle> promise) {
auto P = td::PromiseCreator::lambda(
[promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
if (R.is_error()) {
promise.set_error(R.move_as_error());
} else {
auto handle = R.move_as_ok();
if (handle->is_applied()) {
promise.set_result(std::move(handle));
} else {
promise.set_error(td::Status::Error(ErrorCode::notready, "block is not applied"));
}
}
});
td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, blkid, false, std::move(P));
td::actor::send_closure(manager_, &ValidatorManager::get_block_handle_for_litequery, blkid, std::move(promise));
}
bool LiteQuery::request_mc_block_data(BlockIdExt blkid) {
@ -1047,7 +1034,8 @@ bool LiteQuery::make_state_root_proof(Ref<vm::Cell>& proof, Ref<vm::Cell> state_
vm::MerkleProofBuilder pb{std::move(block_root)};
block::gen::Block::Record blk;
block::gen::BlockInfo::Record info;
if (!(tlb::unpack_cell(pb.root(), blk) && tlb::unpack_cell(blk.info, info))) {
if (!(tlb::unpack_cell(pb.root(), blk) && tlb::unpack_cell(blk.info, info) &&
block::gen::BlkPrevInfo(info.after_merge).validate_ref(info.prev_ref))) {
return fatal_error("cannot unpack block header");
}
vm::CellSlice upd_cs{vm::NoVmSpec(), blk.state_update};
@ -1497,17 +1485,12 @@ void LiteQuery::continue_getTransactions(unsigned remaining, bool exact) {
LOG(DEBUG) << "sending get_block_by_lt_from_db() query to manager for " << acc_workchain_ << ":" << acc_addr_.to_hex()
<< " " << trans_lt_;
td::actor::send_closure_later(
manager_, &ValidatorManager::get_block_by_lt_from_db, ton::extract_addr_prefix(acc_workchain_, acc_addr_),
manager_, &ValidatorManager::get_block_by_lt_from_db_for_litequery, ton::extract_addr_prefix(acc_workchain_, acc_addr_),
trans_lt_, [Self = actor_id(this), remaining, manager = manager_](td::Result<ConstBlockHandle> res) {
if (res.is_error()) {
td::actor::send_closure(Self, &LiteQuery::abort_getTransactions, res.move_as_error(), ton::BlockIdExt{});
} else {
auto handle = res.move_as_ok();
if (!handle->is_applied()) {
td::actor::send_closure(Self, &LiteQuery::abort_getTransactions, td::Status::Error(ErrorCode::notready, "block is not applied"),
ton::BlockIdExt{});
return;
}
LOG(DEBUG) << "requesting data for block " << handle->id().to_str();
td::actor::send_closure_later(manager, &ValidatorManager::get_block_data_from_db, handle,
[Self, blkid = handle->id(), remaining](td::Result<Ref<BlockData>> res) {
@ -1846,10 +1829,6 @@ void LiteQuery::perform_lookupBlock(BlockId blkid, int mode, LogicalTime lt, Uni
td::actor::send_closure(Self, &LiteQuery::abort_query, res.move_as_error());
} else {
auto handle = res.move_as_ok();
if (!handle->is_applied()) {
td::actor::send_closure(Self, &LiteQuery::abort_query, td::Status::Error(ErrorCode::notready, "block is not applied"));
return;
}
LOG(DEBUG) << "requesting data for block " << handle->id().to_str();
td::actor::send_closure_later(manager, &ValidatorManager::get_block_data_from_db, handle,
[Self, blkid = handle->id(), mode](td::Result<Ref<BlockData>> res) {
@ -1865,13 +1844,14 @@ void LiteQuery::perform_lookupBlock(BlockId blkid, int mode, LogicalTime lt, Uni
ton::AccountIdPrefixFull pfx{blkid.workchain, blkid.shard};
if (mode & 2) {
td::actor::send_closure_later(manager_, &ValidatorManager::get_block_by_lt_from_db, pfx, lt, std::move(P));
td::actor::send_closure_later(manager_, &ValidatorManager::get_block_by_lt_from_db_for_litequery, pfx, lt,
std::move(P));
} else if (mode & 4) {
td::actor::send_closure_later(manager_, &ValidatorManager::get_block_by_unix_time_from_db, pfx, utime,
td::actor::send_closure_later(manager_, &ValidatorManager::get_block_by_unix_time_from_db_for_litequery, pfx, utime,
std::move(P));
} else {
td::actor::send_closure_later(manager_, &ValidatorManager::get_block_by_seqno_from_db, pfx, blkid.seqno,
std::move(P));
td::actor::send_closure_later(manager_, &ValidatorManager::get_block_by_seqno_from_db_for_litequery, pfx,
blkid.seqno, std::move(P));
}
}
@ -2629,7 +2609,7 @@ void LiteQuery::perform_getShardBlockProof(BlockIdExt blkid) {
}
AccountIdPrefixFull pfx{masterchainId, shardIdAll};
td::actor::send_closure_later(
manager, &ValidatorManager::get_block_by_seqno_from_db, pfx, handle->masterchain_ref_block(),
manager, &ValidatorManager::get_block_by_seqno_from_db_for_litequery, pfx, handle->masterchain_ref_block(),
[Self, manager](td::Result<ConstBlockHandle> R) {
if (R.is_error()) {
td::actor::send_closure(Self, &LiteQuery::abort_query, R.move_as_error());

File diff suppressed because it is too large Load diff

View file

@ -195,6 +195,7 @@ class ValidateQuery : public td::actor::Actor {
ton::LogicalTime prev_key_block_lt_;
std::unique_ptr<block::BlockLimits> block_limits_;
std::unique_ptr<block::BlockLimitStatus> block_limit_status_;
td::uint64 total_gas_used_{0}, total_special_gas_used_{0};
LogicalTime start_lt_, end_lt_;
UnixTime prev_now_{~0u}, now_{~0u};
@ -344,8 +345,7 @@ class ValidateQuery : public td::actor::Actor {
td::Bits256& msg_hash);
bool check_in_queue();
bool check_delivered_dequeued();
std::unique_ptr<block::Account> make_account_from(td::ConstBitPtr addr, Ref<vm::CellSlice> account,
Ref<vm::CellSlice> extra);
std::unique_ptr<block::Account> make_account_from(td::ConstBitPtr addr, Ref<vm::CellSlice> account);
std::unique_ptr<block::Account> unpack_account(td::ConstBitPtr addr);
bool check_one_transaction(block::Account& account, LogicalTime lt, Ref<vm::Cell> trans_root, bool is_first,
bool is_last);

View file

@ -170,6 +170,14 @@ class ValidatorManager : public ValidatorManagerInterface {
virtual void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) = 0;
virtual void get_block_handle_for_litequery(BlockIdExt block_id, td::Promise<ConstBlockHandle> promise) = 0;
virtual void get_block_by_lt_from_db_for_litequery(AccountIdPrefixFull account, LogicalTime lt,
td::Promise<ConstBlockHandle> promise) = 0;
virtual void get_block_by_unix_time_from_db_for_litequery(AccountIdPrefixFull account, UnixTime ts,
td::Promise<ConstBlockHandle> promise) = 0;
virtual void get_block_by_seqno_from_db_for_litequery(AccountIdPrefixFull account, BlockSeqno seqno,
td::Promise<ConstBlockHandle> promise) = 0;
virtual void validated_new_block(BlockIdExt block_id) = 0;
virtual void add_persistent_state_description(td::Ref<PersistentStateDescription> desc) = 0;

View file

@ -901,7 +901,7 @@ void ValidatorManagerImpl::send_top_shard_block_description(td::Ref<ShardTopBloc
}
void ValidatorManagerImpl::start_up() {
db_ = create_db_actor(actor_id(this), db_root_);
db_ = create_db_actor(actor_id(this), db_root_, opts_);
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<ValidatorManagerInitResult> R) {
R.ensure();

View file

@ -23,6 +23,7 @@
#include "validator-group.hpp"
#include "manager-init.h"
#include "manager-disk.h"
#include "queue-size-counter.hpp"
#include <map>
#include <set>
@ -385,6 +386,28 @@ class ValidatorManagerImpl : public ValidatorManager {
void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) override {
UNREACHABLE();
}
void get_out_msg_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) override {
if (queue_size_counter_.empty()) {
queue_size_counter_ =
td::actor::create_actor<QueueSizeCounter>("queuesizecounter", td::Ref<MasterchainState>{}, actor_id(this));
}
td::actor::send_closure(queue_size_counter_, &QueueSizeCounter::get_queue_size, block_id, std::move(promise));
}
void get_block_handle_for_litequery(BlockIdExt block_id, td::Promise<ConstBlockHandle> promise) override {
get_block_handle(block_id, false, promise.wrap([](BlockHandle &&handle) -> ConstBlockHandle { return handle; }));
}
void get_block_by_lt_from_db_for_litequery(AccountIdPrefixFull account, LogicalTime lt,
td::Promise<ConstBlockHandle> promise) override {
get_block_by_lt_from_db(account, lt, std::move(promise));
}
void get_block_by_unix_time_from_db_for_litequery(AccountIdPrefixFull account, UnixTime ts,
td::Promise<ConstBlockHandle> promise) override {
get_block_by_unix_time_from_db(account, ts, std::move(promise));
}
void get_block_by_seqno_from_db_for_litequery(AccountIdPrefixFull account, BlockSeqno seqno,
td::Promise<ConstBlockHandle> promise) override {
get_block_by_seqno_from_db(account, seqno, std::move(promise));
}
void validated_new_block(BlockIdExt block_id) override {
}
void add_persistent_state_description(td::Ref<PersistentStateDescription> desc) override {
@ -422,6 +445,7 @@ class ValidatorManagerImpl : public ValidatorManager {
int pending_new_shard_block_descr_{0};
std::vector<td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>>> waiting_new_shard_block_descr_;
td::actor::ActorOwn<QueueSizeCounter> queue_size_counter_;
void update_shards();
void update_shard_blocks();

View file

@ -549,7 +549,7 @@ void ValidatorManagerImpl::register_block_handle(BlockHandle handle, td::Promise
}
void ValidatorManagerImpl::start_up() {
db_ = create_db_actor(actor_id(this), db_root_);
db_ = create_db_actor(actor_id(this), db_root_, opts_);
}
void ValidatorManagerImpl::try_get_static_file(FileHash file_hash, td::Promise<td::BufferSlice> promise) {

View file

@ -23,6 +23,7 @@
#include "validator-group.hpp"
#include "manager-init.h"
#include "manager-hardfork.h"
#include "queue-size-counter.hpp"
#include <map>
#include <set>
@ -447,6 +448,28 @@ class ValidatorManagerImpl : public ValidatorManager {
void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) override {
UNREACHABLE();
}
void get_out_msg_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) override {
if (queue_size_counter_.empty()) {
queue_size_counter_ =
td::actor::create_actor<QueueSizeCounter>("queuesizecounter", td::Ref<MasterchainState>{}, actor_id(this));
}
td::actor::send_closure(queue_size_counter_, &QueueSizeCounter::get_queue_size, block_id, std::move(promise));
}
void get_block_handle_for_litequery(BlockIdExt block_id, td::Promise<ConstBlockHandle> promise) override {
get_block_handle(block_id, false, promise.wrap([](BlockHandle &&handle) -> ConstBlockHandle { return handle; }));
}
void get_block_by_lt_from_db_for_litequery(AccountIdPrefixFull account, LogicalTime lt,
td::Promise<ConstBlockHandle> promise) override {
get_block_by_lt_from_db(account, lt, std::move(promise));
}
void get_block_by_unix_time_from_db_for_litequery(AccountIdPrefixFull account, UnixTime ts,
td::Promise<ConstBlockHandle> promise) override {
get_block_by_unix_time_from_db(account, ts, std::move(promise));
}
void get_block_by_seqno_from_db_for_litequery(AccountIdPrefixFull account, BlockSeqno seqno,
td::Promise<ConstBlockHandle> promise) override {
get_block_by_seqno_from_db(account, seqno, std::move(promise));
}
void validated_new_block(BlockIdExt block_id) override {
}
void add_persistent_state_description(td::Ref<PersistentStateDescription> desc) override {
@ -473,6 +496,7 @@ class ValidatorManagerImpl : public ValidatorManager {
std::string db_root_;
ShardIdFull shard_to_generate_;
BlockIdExt block_to_generate_;
td::actor::ActorOwn<QueueSizeCounter> queue_size_counter_;
};
} // namespace validator

View file

@ -629,6 +629,12 @@ void ValidatorManagerImpl::wait_block_state(BlockHandle handle, td::uint32 prior
return promise.set_error(
td::Status::Error(PSTRING() << "not monitoring shard " << handle->id().shard_full().to_str()));
}
auto it0 = block_state_cache_.find(handle->id());
if (it0 != block_state_cache_.end()) {
it0->second.ttl_ = td::Timestamp::in(30.0);
promise.set_result(it0->second.state_);
return;
}
auto it = wait_state_.find(handle->id());
if (it == wait_state_.end()) {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Ref<ShardState>> R) {
@ -889,6 +895,8 @@ void ValidatorManagerImpl::wait_block_message_queue_short(BlockIdExt block_id, t
void ValidatorManagerImpl::get_external_messages(ShardIdFull shard,
td::Promise<std::vector<td::Ref<ExtMessage>>> promise) {
td::Timer t;
size_t processed = 0, deleted = 0;
std::vector<td::Ref<ExtMessage>> res;
MessageId<ExtMessage> left{AccountIdPrefixFull{shard.workchain, shard.shard & (shard.shard - 1)}, Bits256::zero()};
auto it = ext_messages_.lower_bound(left);
@ -897,10 +905,12 @@ void ValidatorManagerImpl::get_external_messages(ShardIdFull shard,
if (!shard_contains(shard, s.dst)) {
break;
}
++processed;
if (it->second->expired()) {
ext_addr_messages_[it->second->address()].erase(it->first.hash);
ext_messages_hashes_.erase(it->first.hash);
it = ext_messages_.erase(it);
++deleted;
continue;
}
if (it->second->is_active()) {
@ -908,6 +918,9 @@ void ValidatorManagerImpl::get_external_messages(ShardIdFull shard,
}
it++;
}
LOG(WARNING) << "get_external_messages to shard " << shard.to_str() << " : time=" << t.elapsed()
<< " result_size=" << res.size() << " processed=" << processed << " expired=" << deleted
<< " total_size=" << ext_messages_.size();
promise.set_value(std::move(res));
}
@ -1102,6 +1115,9 @@ void ValidatorManagerImpl::get_block_by_seqno_from_db(AccountIdPrefixFull accoun
}
void ValidatorManagerImpl::finished_wait_state(BlockHandle handle, td::Result<td::Ref<ShardState>> R) {
if (R.is_ok()) {
block_state_cache_[handle->id()] = {R.ok(), td::Timestamp::in(30.0)};
}
auto it = wait_state_.find(handle->id());
if (it != wait_state_.end()) {
if (R.is_error()) {
@ -1461,7 +1477,18 @@ td::Ref<MasterchainState> ValidatorManagerImpl::do_get_last_liteserver_state() {
if (last_masterchain_state_.is_null()) {
return {};
}
if (last_liteserver_state_.is_null() || last_liteserver_state_->get_unix_time() < td::Clocks::system() - 30) {
if (last_liteserver_state_.is_null()) {
last_liteserver_state_ = last_masterchain_state_;
return last_liteserver_state_;
}
if (last_liteserver_state_->get_seqno() == last_masterchain_state_->get_seqno()) {
return last_liteserver_state_;
}
// If liteserver seqno (i.e. shard client) lags then use last masterchain state for liteserver
// Allowed lag depends on the block rate
double time_per_block = double(last_masterchain_state_->get_unix_time() - last_liteserver_state_->get_unix_time()) /
double(last_masterchain_state_->get_seqno() - last_liteserver_state_->get_seqno());
if (td::Clocks::system() - double(last_liteserver_state_->get_unix_time()) > std::min(time_per_block * 8, 180.0)) {
last_liteserver_state_ = last_masterchain_state_;
}
return last_liteserver_state_;
@ -1562,7 +1589,7 @@ void ValidatorManagerImpl::send_get_out_msg_queue_proof_request(
}
void ValidatorManagerImpl::start_up() {
db_ = create_db_actor(actor_id(this), db_root_);
db_ = create_db_actor(actor_id(this), db_root_, opts_);
lite_server_cache_ = create_liteserver_cache_actor(actor_id(this), db_root_);
token_manager_ = td::actor::create_actor<TokenManager>("tokenmanager");
td::mkdir(db_root_ + "/tmp/").ensure();
@ -2448,7 +2475,15 @@ void ValidatorManagerImpl::allow_block_info_gc(BlockIdExt block_id, td::Promise<
void ValidatorManagerImpl::got_next_gc_masterchain_handle(BlockHandle handle) {
CHECK(gc_advancing_);
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Ref<ShardState>> R) {
R.ensure();
if (R.is_error()) {
if (R.error().code() == ErrorCode::timeout) {
LOG(ERROR) << "Failed to get gc masterchain state, retrying: " << R.move_as_error();
td::actor::send_closure(SelfId, &ValidatorManagerImpl::got_next_gc_masterchain_handle, std::move(handle));
} else {
LOG(FATAL) << "Failed to get gc masterchain state: " << R.move_as_error();
}
return;
}
td::actor::send_closure(SelfId, &ValidatorManagerImpl::got_next_gc_masterchain_state, std::move(handle),
td::Ref<MasterchainState>{R.move_as_ok()});
});
@ -2479,8 +2514,11 @@ void ValidatorManagerImpl::update_shard_client_block_handle(BlockHandle handle,
td::Promise<td::Unit> promise) {
shard_client_handle_ = std::move(handle);
auto seqno = shard_client_handle_->id().seqno();
if (last_liteserver_state_.is_null() || last_liteserver_state_->get_block_id().seqno() < seqno) {
last_liteserver_state_ = std::move(state);
if (state.not_null()) {
shard_client_shards_ = state->get_shards();
if (last_liteserver_state_.is_null() || last_liteserver_state_->get_block_id().seqno() < seqno) {
last_liteserver_state_ = std::move(state);
}
}
shard_client_update(seqno);
promise.set_value(td::Unit());
@ -2519,15 +2557,15 @@ void ValidatorManagerImpl::alarm() {
}
if (log_status_at_.is_in_past()) {
if (last_masterchain_block_handle_) {
LOG(INFO) << "STATUS: last_masterchain_block_ago="
<< td::format::as_time(td::Clocks::system() - last_masterchain_block_handle_->unix_time())
<< " last_known_key_block_ago="
<< td::format::as_time(td::Clocks::system() - (last_known_key_block_handle_->inited_unix_time()
? last_known_key_block_handle_->unix_time()
: 0))
<< " shard_client_ago="
<< td::format::as_time(td::Clocks::system() -
(shard_client_handle_ ? shard_client_handle_->unix_time() : 0));
LOG(ERROR) << "STATUS: last_masterchain_block_ago="
<< td::format::as_time(td::Clocks::system() - last_masterchain_block_handle_->unix_time())
<< " last_known_key_block_ago="
<< td::format::as_time(td::Clocks::system() - (last_known_key_block_handle_->inited_unix_time()
? last_known_key_block_handle_->unix_time()
: 0))
<< " shard_client_ago="
<< td::format::as_time(td::Clocks::system() -
(shard_client_handle_ ? shard_client_handle_->unix_time() : 0));
}
log_status_at_ = td::Timestamp::in(60.0);
}
@ -2553,6 +2591,31 @@ void ValidatorManagerImpl::alarm() {
for (auto &w : shard_client_waiters_) {
w.second.check_timers();
}
for (auto it = block_state_cache_.begin(); it != block_state_cache_.end();) {
bool del = it->second.ttl_.is_in_past();
if (del) {
auto block_id = it->first;
if (block_id.is_masterchain()) {
if (block_id.seqno() == last_masterchain_seqno_) {
it->second.ttl_ = td::Timestamp::in(30.0);
del = false;
}
} else if (last_masterchain_state_.not_null()) {
auto shard = last_masterchain_state_->get_shard_from_config(block_id.shard_full());
if (shard.not_null()) {
if (block_id.seqno() == shard->top_block_id().seqno()) {
it->second.ttl_ = td::Timestamp::in(30.0);
del = false;
}
}
}
}
if (del) {
it = block_state_cache_.erase(it);
} else {
++it;
}
}
}
alarm_timestamp().relax(check_waiters_at_);
if (check_shard_clients_.is_in_past()) {
@ -2577,8 +2640,8 @@ void ValidatorManagerImpl::update_shard_client_state(BlockIdExt masterchain_bloc
}
void ValidatorManagerImpl::get_shard_client_state(bool from_db, td::Promise<BlockIdExt> promise) {
if (!shard_client_.empty() && !from_db) {
td::actor::send_closure(shard_client_, &ShardClient::get_processed_masterchain_block_id, std::move(promise));
if (shard_client_handle_ && !from_db) {
promise.set_result(shard_client_handle_->id());
} else {
td::actor::send_closure(db_, &Db::get_shard_client_state, std::move(promise));
}
@ -2776,18 +2839,19 @@ void ValidatorManagerImpl::log_validator_session_stats(BlockIdExt block_id,
}
std::vector<tl_object_ptr<ton_api::validatorSession_statsRound>> rounds;
for (const auto& round : stats.rounds) {
for (const auto &round : stats.rounds) {
std::vector<tl_object_ptr<ton_api::validatorSession_statsProducer>> producers;
for (const auto& producer : round.producers) {
for (const auto &producer : round.producers) {
producers.push_back(create_tl_object<ton_api::validatorSession_statsProducer>(
producer.id.bits256_value(), producer.block_status, producer.block_timestamp));
producer.id.bits256_value(), producer.candidate_id, producer.block_status, producer.block_timestamp,
producer.comment));
}
rounds.push_back(create_tl_object<ton_api::validatorSession_statsRound>(round.timestamp, std::move(producers)));
}
auto obj = create_tl_object<ton_api::validatorSession_stats>(
create_tl_block_id_simple(block_id.id), stats.timestamp, stats.self.bits256_value(),
stats.creator.bits256_value(), stats.total_validators, stats.total_weight, stats.signatures,
stats.success, create_tl_block_id(block_id), stats.timestamp, stats.self.bits256_value(), stats.session_id,
stats.cc_seqno, stats.creator.bits256_value(), stats.total_validators, stats.total_weight, stats.signatures,
stats.signatures_weight, stats.approve_signatures, stats.approve_signatures_weight, stats.first_round,
std::move(rounds));
std::string s = td::json_encode<std::string>(td::ToJson(*obj), false);
@ -2801,6 +2865,143 @@ void ValidatorManagerImpl::log_validator_session_stats(BlockIdExt block_id,
LOG(INFO) << "Writing validator session stats for " << block_id.id;
}
void ValidatorManagerImpl::get_block_handle_for_litequery(BlockIdExt block_id, td::Promise<ConstBlockHandle> promise) {
get_block_handle(
block_id, false,
[SelfId = actor_id(this), block_id, promise = std::move(promise)](td::Result<BlockHandle> R) mutable {
if (R.is_ok() && R.ok()->is_applied()) {
promise.set_value(R.move_as_ok());
} else {
td::actor::send_closure(SelfId, &ValidatorManagerImpl::process_block_handle_for_litequery_error, block_id,
std::move(R), std::move(promise));
}
});
}
void ValidatorManagerImpl::get_block_by_lt_from_db_for_litequery(AccountIdPrefixFull account, LogicalTime lt,
td::Promise<ConstBlockHandle> promise) {
get_block_by_lt_from_db(
account, lt, [=, SelfId = actor_id(this), promise = std::move(promise)](td::Result<ConstBlockHandle> R) mutable {
if (R.is_ok() && R.ok()->is_applied()) {
promise.set_value(R.move_as_ok());
} else {
td::actor::send_closure(SelfId, &ValidatorManagerImpl::process_lookup_block_for_litequery_error, account, 0,
lt, std::move(R), std::move(promise));
}
});
}
void ValidatorManagerImpl::get_block_by_unix_time_from_db_for_litequery(AccountIdPrefixFull account, UnixTime ts,
td::Promise<ConstBlockHandle> promise) {
get_block_by_unix_time_from_db(
account, ts, [=, SelfId = actor_id(this), promise = std::move(promise)](td::Result<ConstBlockHandle> R) mutable {
if (R.is_ok() && R.ok()->is_applied()) {
promise.set_value(R.move_as_ok());
} else {
td::actor::send_closure(SelfId, &ValidatorManagerImpl::process_lookup_block_for_litequery_error, account, 1,
ts, std::move(R), std::move(promise));
}
});
}
void ValidatorManagerImpl::get_block_by_seqno_from_db_for_litequery(AccountIdPrefixFull account, BlockSeqno seqno,
td::Promise<ConstBlockHandle> promise) {
get_block_by_seqno_from_db(
account, seqno,
[=, SelfId = actor_id(this), promise = std::move(promise)](td::Result<ConstBlockHandle> R) mutable {
if (R.is_ok() && R.ok()->is_applied()) {
promise.set_value(R.move_as_ok());
} else {
td::actor::send_closure(SelfId, &ValidatorManagerImpl::process_lookup_block_for_litequery_error, account, 2,
seqno, std::move(R), std::move(promise));
}
});
}
void ValidatorManagerImpl::process_block_handle_for_litequery_error(BlockIdExt block_id,
td::Result<BlockHandle> r_handle,
td::Promise<ConstBlockHandle> promise) {
td::Status err;
if (r_handle.is_error()) {
err = r_handle.move_as_error();
} else {
auto handle = r_handle.move_as_ok();
if (handle->is_applied()) {
promise.set_value(std::move(handle));
return;
}
if (!handle->received() || !handle->received_state()) {
err = td::Status::Error(ErrorCode::notready, PSTRING() << "block " << block_id.id.to_str() << " is not in db");
} else {
err = td::Status::Error(ErrorCode::notready, PSTRING() << "block " << block_id.id.to_str() << " is not applied");
}
}
if (block_id.is_masterchain()) {
if (block_id.seqno() > last_masterchain_seqno_) {
err = err.move_as_error_suffix(PSTRING() << " (last known masterchain block: " << last_masterchain_seqno_ << ")");
}
} else {
for (auto &shard : shard_client_shards_) {
if (shard_intersects(shard->shard(), block_id.shard_full())) {
if (block_id.seqno() > shard->top_block_id().seqno()) {
err = err.move_as_error_suffix(
PSTRING() << " (possibly out of sync: shard_client_seqno="
<< (shard_client_handle_ ? shard_client_handle_->id().seqno() : 0) << " ls_seqno="
<< (last_liteserver_state_.not_null() ? last_liteserver_state_->get_seqno() : 0) << ")");
}
break;
}
}
}
promise.set_error(std::move(err));
}
void ValidatorManagerImpl::process_lookup_block_for_litequery_error(AccountIdPrefixFull account, int type,
td::uint64 value,
td::Result<ConstBlockHandle> r_handle,
td::Promise<ConstBlockHandle> promise) {
td::Status err;
if (r_handle.is_error()) {
err = r_handle.move_as_error();
} else {
auto handle = r_handle.move_as_ok();
if (handle->is_applied()) {
promise.set_value(std::move(handle));
return;
}
if (!handle->received() || !handle->received_state()) {
err = td::Status::Error(ErrorCode::notready, PSTRING() << "block " << handle->id().to_str() << " is not in db");
} else {
err = td::Status::Error(ErrorCode::notready, PSTRING() << "block " << handle->id().to_str() << " is not applied");
}
}
if (account.is_masterchain()) {
if (value > (type == 0
? last_masterchain_state_->get_logical_time()
: (type == 1 ? last_masterchain_state_->get_unix_time() : last_masterchain_state_->get_seqno()))) {
err = err.move_as_error_suffix(PSTRING() << " (last known masterchain block: " << last_masterchain_seqno_ << ")");
}
} else {
for (auto &shard : shard_client_shards_) {
if (shard_intersects(shard->shard(), account.as_leaf_shard())) {
if (value > (type == 0 ? shard->end_lt()
: (type == 1 ? (shard_client_handle_ ? shard_client_handle_->unix_time() : 0)
: shard->top_block_id().seqno()))) {
err = err.move_as_error_suffix(
PSTRING() << " (possibly out of sync: shard_client_seqno="
<< (shard_client_handle_ ? shard_client_handle_->id().seqno() : 0) << " ls_seqno="
<< (last_liteserver_state_.not_null() ? last_liteserver_state_->get_seqno() : 0) << ")");
}
break;
}
}
}
static std::string names[3] = {"lt", "utime", "seqno"};
err = err.move_as_error_prefix(PSTRING() << "cannot find block " << account.to_str() << " " << names[type] << "="
<< value << ": ");
promise.set_error(std::move(err));
}
void ValidatorManagerImpl::get_validator_sessions_info(
td::Promise<tl_object_ptr<ton_api::engine_validator_validatorSessionsInfo>> promise) {
std::vector<td::actor::ActorId<ValidatorGroup>> groups;

View file

@ -28,6 +28,7 @@
#include "state-serializer.hpp"
#include "rldp/rldp.h"
#include "token-manager.h"
#include "queue-size-counter.hpp"
#include "collator-node.hpp"
#include <map>
@ -190,6 +191,12 @@ class ValidatorManagerImpl : public ValidatorManager {
std::map<BlockIdExt, WaitList<WaitBlockState, td::Ref<ShardState>>> wait_state_;
std::map<BlockIdExt, WaitList<WaitBlockData, td::Ref<BlockData>>> wait_block_data_;
struct CachedBlockState {
td::Ref<ShardState> state_;
td::Timestamp ttl_;
};
std::map<BlockIdExt, CachedBlockState> block_state_cache_;
struct WaitBlockHandle {
std::vector<td::Promise<BlockHandle>> waiting_;
};
@ -262,6 +269,7 @@ class ValidatorManagerImpl : public ValidatorManager {
BlockHandle last_key_block_handle_;
BlockHandle last_known_key_block_handle_;
BlockHandle shard_client_handle_;
std::vector<td::Ref<McShardHash>> shard_client_shards_;
td::Ref<MasterchainState> last_liteserver_state_;
td::Ref<MasterchainState> do_get_last_liteserver_state();
@ -582,6 +590,31 @@ class ValidatorManagerImpl : public ValidatorManager {
void del_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) override;
void update_options(td::Ref<ValidatorManagerOptions> opts) override;
void get_out_msg_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) override {
if (queue_size_counter_.empty()) {
if (last_masterchain_state_.is_null()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "not ready"));
return;
}
queue_size_counter_ = td::actor::create_actor<QueueSizeCounter>("queuesizecounter",
last_masterchain_state_, actor_id(this));
}
td::actor::send_closure(queue_size_counter_, &QueueSizeCounter::get_queue_size, block_id, std::move(promise));
}
void get_block_handle_for_litequery(BlockIdExt block_id, td::Promise<ConstBlockHandle> promise) override;
void get_block_by_lt_from_db_for_litequery(AccountIdPrefixFull account, LogicalTime lt,
td::Promise<ConstBlockHandle> promise) override;
void get_block_by_unix_time_from_db_for_litequery(AccountIdPrefixFull account, UnixTime ts,
td::Promise<ConstBlockHandle> promise) override;
void get_block_by_seqno_from_db_for_litequery(AccountIdPrefixFull account, BlockSeqno seqno,
td::Promise<ConstBlockHandle> promise) override;
void process_block_handle_for_litequery_error(BlockIdExt block_id, td::Result<BlockHandle> r_handle,
td::Promise<ConstBlockHandle> promise);
void process_lookup_block_for_litequery_error(AccountIdPrefixFull account, int type, td::uint64 value,
td::Result<ConstBlockHandle> r_handle,
td::Promise<ConstBlockHandle> promise);
private:
td::Timestamp resend_shard_blocks_at_;
td::Timestamp check_waiters_at_;
@ -655,6 +688,7 @@ class ValidatorManagerImpl : public ValidatorManager {
}
std::map<BlockSeqno, WaitList<td::actor::Actor, td::Unit>> shard_client_waiters_;
td::actor::ActorOwn<QueueSizeCounter> queue_size_counter_;
struct Collator {
td::actor::ActorOwn<CollatorNode> actor;

View file

@ -0,0 +1,301 @@
/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
*/
#include "queue-size-counter.hpp"
#include "block/block-auto.h"
#include "block/block-parse.h"
#include "common/delay.h"
#include "td/actor/MultiPromise.h"
#include "td/utils/Random.h"
namespace ton::validator {
static td::Result<td::uint32> calc_queue_size(const td::Ref<ShardState> &state) {
td::uint32 size = 0;
TRY_RESULT(outq_descr, state->message_queue());
block::gen::OutMsgQueueInfo::Record qinfo;
if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
return td::Status::Error("invalid message queue");
}
vm::AugmentedDictionary queue{qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
bool ok = queue.check_for_each([&](td::Ref<vm::CellSlice>, td::ConstBitPtr, int) -> bool {
++size;
return true;
});
if (!ok) {
return td::Status::Error("invalid message queue dict");
}
return size;
}
static td::Result<td::uint32> recalc_queue_size(const td::Ref<ShardState> &state, const td::Ref<ShardState> &prev_state,
td::uint32 prev_size) {
TRY_RESULT(outq_descr, state->message_queue());
block::gen::OutMsgQueueInfo::Record qinfo;
if (!tlb::unpack_cell(outq_descr->root_cell(), qinfo)) {
return td::Status::Error("invalid message queue");
}
vm::AugmentedDictionary queue{qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
TRY_RESULT(prev_outq_descr, prev_state->message_queue());
block::gen::OutMsgQueueInfo::Record prev_qinfo;
if (!tlb::unpack_cell(prev_outq_descr->root_cell(), prev_qinfo)) {
return td::Status::Error("invalid message queue");
}
vm::AugmentedDictionary prev_queue{prev_qinfo.out_queue->prefetch_ref(0), 352, block::tlb::aug_OutMsgQueue};
td::uint32 add = 0, rem = 0;
bool ok = prev_queue.scan_diff(
queue, [&](td::ConstBitPtr, int, td::Ref<vm::CellSlice> prev_val, td::Ref<vm::CellSlice> new_val) -> bool {
if (prev_val.not_null()) {
++rem;
}
if (new_val.not_null()) {
++add;
}
return true;
});
if (!ok) {
return td::Status::Error("invalid message queue dict");
}
if (prev_size + add < rem) {
return td::Status::Error("negative value");
}
return prev_size + add - rem;
}
void QueueSizeCounter::start_up() {
if (init_masterchain_state_.is_null()) {
// Used in manager-hardfork or manager-disk
simple_mode_ = true;
return;
}
current_seqno_ = init_masterchain_state_->get_seqno();
process_top_shard_blocks_cont(init_masterchain_state_, true);
init_masterchain_state_ = {};
alarm();
}
void QueueSizeCounter::get_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) {
get_queue_size_ex(block_id, simple_mode_ || is_block_too_old(block_id), std::move(promise));
}
void QueueSizeCounter::get_queue_size_ex(ton::BlockIdExt block_id, bool calc_whole, td::Promise<td::uint32> promise) {
Entry &entry = results_[block_id];
if (entry.done_) {
promise.set_result(entry.queue_size_);
return;
}
entry.promises_.push_back(std::move(promise));
if (entry.started_) {
return;
}
entry.started_ = true;
entry.calc_whole_ = calc_whole;
td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, block_id, true,
[SelfId = actor_id(this), block_id, manager = manager_](td::Result<BlockHandle> R) mutable {
if (R.is_error()) {
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, block_id, R.move_as_error());
return;
}
BlockHandle handle = R.move_as_ok();
td::actor::send_closure(
manager, &ValidatorManager::wait_block_state, handle, 0, td::Timestamp::in(10.0),
[SelfId, handle](td::Result<td::Ref<ShardState>> R) mutable {
if (R.is_error()) {
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, handle->id(),
R.move_as_error());
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_cont,
std::move(handle), R.move_as_ok());
});
});
}
void QueueSizeCounter::get_queue_size_cont(BlockHandle handle, td::Ref<ShardState> state) {
Entry &entry = results_[handle->id()];
CHECK(entry.started_);
bool calc_whole = entry.calc_whole_ || handle->id().seqno() == 0;
if (!calc_whole) {
CHECK(handle->inited_prev());
auto prev_blocks = handle->prev();
bool after_split = prev_blocks.size() == 1 && handle->id().shard_full() != prev_blocks[0].shard_full();
bool after_merge = prev_blocks.size() == 2;
calc_whole = after_split || after_merge;
}
if (calc_whole) {
auto r_size = calc_queue_size(state);
if (r_size.is_error()) {
on_error(handle->id(), r_size.move_as_error());
return;
}
entry.done_ = true;
entry.queue_size_ = r_size.move_as_ok();
for (auto &promise : entry.promises_) {
promise.set_result(entry.queue_size_);
}
entry.promises_.clear();
return;
}
auto prev_block_id = handle->one_prev(true);
get_queue_size(prev_block_id, [=, SelfId = actor_id(this), manager = manager_](td::Result<td::uint32> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, state->get_block_id(), R.move_as_error());
return;
}
td::uint32 prev_size = R.move_as_ok();
td::actor::send_closure(
manager, &ValidatorManager::wait_block_state_short, prev_block_id, 0, td::Timestamp::in(10.0),
[=](td::Result<td::Ref<ShardState>> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &QueueSizeCounter::on_error, state->get_block_id(), R.move_as_error());
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_cont2, state, R.move_as_ok(), prev_size);
});
});
}
void QueueSizeCounter::get_queue_size_cont2(td::Ref<ShardState> state, td::Ref<ShardState> prev_state,
td::uint32 prev_size) {
BlockIdExt block_id = state->get_block_id();
Entry &entry = results_[block_id];
CHECK(entry.started_);
auto r_size = recalc_queue_size(state, prev_state, prev_size);
if (r_size.is_error()) {
on_error(block_id, r_size.move_as_error());
return;
}
entry.done_ = true;
entry.queue_size_ = r_size.move_as_ok();
for (auto &promise : entry.promises_) {
promise.set_result(entry.queue_size_);
}
entry.promises_.clear();
}
void QueueSizeCounter::on_error(ton::BlockIdExt block_id, td::Status error) {
auto it = results_.find(block_id);
if (it == results_.end()) {
return;
}
Entry &entry = it->second;
CHECK(!entry.done_);
for (auto &promise : entry.promises_) {
promise.set_error(error.clone());
}
results_.erase(it);
}
void QueueSizeCounter::process_top_shard_blocks() {
LOG(DEBUG) << "QueueSizeCounter::process_top_shard_blocks seqno=" << current_seqno_;
td::actor::send_closure(
manager_, &ValidatorManager::get_block_by_seqno_from_db, AccountIdPrefixFull{masterchainId, 0}, current_seqno_,
[SelfId = actor_id(this), manager = manager_](td::Result<ConstBlockHandle> R) {
if (R.is_error()) {
LOG(WARNING) << "Failed to get masterchain block id: " << R.move_as_error();
delay_action([=]() { td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); },
td::Timestamp::in(5.0));
return;
}
td::actor::send_closure(
manager, &ValidatorManager::wait_block_state_short, R.ok()->id(), 0, td::Timestamp::in(10.0),
[=](td::Result<td::Ref<ShardState>> R) {
if (R.is_error()) {
LOG(WARNING) << "Failed to get masterchain state: " << R.move_as_error();
delay_action([=]() { td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks); },
td::Timestamp::in(5.0));
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks_cont,
td::Ref<MasterchainState>(R.move_as_ok()), false);
});
});
}
void QueueSizeCounter::process_top_shard_blocks_cont(td::Ref<MasterchainState> state, bool init) {
LOG(DEBUG) << "QueueSizeCounter::process_top_shard_blocks_cont seqno=" << current_seqno_ << " init=" << init;
td::MultiPromise mp;
auto ig = mp.init_guard();
last_top_blocks_.clear();
last_top_blocks_.push_back(state->get_block_id());
for (auto &shard : state->get_shards()) {
last_top_blocks_.push_back(shard->top_block_id());
}
for (const BlockIdExt &block_id : last_top_blocks_) {
get_queue_size_ex_retry(block_id, init, ig.get_promise());
}
ig.add_promise([SelfId = actor_id(this)](td::Result<td::Unit> R) {
if (R.is_error()) {
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks_finish);
});
if (init) {
init_top_blocks_ = last_top_blocks_;
}
}
void QueueSizeCounter::get_queue_size_ex_retry(BlockIdExt block_id, bool calc_whole, td::Promise<td::Unit> promise) {
get_queue_size_ex(block_id, calc_whole,
[=, promise = std::move(promise), SelfId = actor_id(this)](td::Result<td::uint32> R) mutable {
if (R.is_error()) {
LOG(WARNING) << "Failed to calculate queue size for block " << block_id.to_str() << ": "
<< R.move_as_error();
delay_action(
[=, promise = std::move(promise)]() mutable {
td::actor::send_closure(SelfId, &QueueSizeCounter::get_queue_size_ex_retry, block_id,
calc_whole, std::move(promise));
},
td::Timestamp::in(5.0));
return;
}
promise.set_result(td::Unit());
});
}
void QueueSizeCounter::process_top_shard_blocks_finish() {
++current_seqno_;
wait_shard_client();
}
void QueueSizeCounter::wait_shard_client() {
LOG(DEBUG) << "QueueSizeCounter::wait_shard_client seqno=" << current_seqno_;
td::actor::send_closure(
manager_, &ValidatorManager::wait_shard_client_state, current_seqno_, td::Timestamp::in(60.0),
[SelfId = actor_id(this)](td::Result<td::Unit> R) {
if (R.is_error()) {
delay_action([=]() mutable { td::actor::send_closure(SelfId, &QueueSizeCounter::wait_shard_client); },
td::Timestamp::in(5.0));
return;
}
td::actor::send_closure(SelfId, &QueueSizeCounter::process_top_shard_blocks);
});
}
void QueueSizeCounter::alarm() {
for (auto it = results_.begin(); it != results_.end();) {
if (it->second.done_ && is_block_too_old(it->first)) {
it = results_.erase(it);
} else {
++it;
}
}
alarm_timestamp() = td::Timestamp::in(td::Random::fast(20.0, 40.0));
}
} // namespace ton::validator

View file

@ -0,0 +1,82 @@
/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "interfaces/validator-manager.h"
namespace ton::validator {
class QueueSizeCounter : public td::actor::Actor {
public:
QueueSizeCounter(td::Ref<MasterchainState> last_masterchain_state, td::actor::ActorId<ValidatorManager> manager)
: init_masterchain_state_(last_masterchain_state), manager_(std::move(manager)) {
}
void start_up() override;
void get_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise);
void alarm() override;
private:
td::Ref<MasterchainState> init_masterchain_state_;
td::actor::ActorId<ValidatorManager> manager_;
bool simple_mode_ = false;
BlockSeqno current_seqno_ = 0;
std::vector<BlockIdExt> init_top_blocks_;
std::vector<BlockIdExt> last_top_blocks_;
struct Entry {
bool started_ = false;
bool done_ = false;
bool calc_whole_ = false;
td::uint32 queue_size_ = 0;
std::vector<td::Promise<td::uint32>> promises_;
};
std::map<BlockIdExt, Entry> results_;
void get_queue_size_ex(BlockIdExt block_id, bool calc_whole, td::Promise<td::uint32> promise);
void get_queue_size_cont(BlockHandle handle, td::Ref<ShardState> state);
void get_queue_size_cont2(td::Ref<ShardState> state, td::Ref<ShardState> prev_state, td::uint32 prev_size);
void on_error(BlockIdExt block_id, td::Status error);
void process_top_shard_blocks();
void process_top_shard_blocks_cont(td::Ref<MasterchainState> state, bool init = false);
void get_queue_size_ex_retry(BlockIdExt block_id, bool calc_whole, td::Promise<td::Unit> promise);
void process_top_shard_blocks_finish();
void wait_shard_client();
bool is_block_too_old(const BlockIdExt& block_id) const {
for (const BlockIdExt& top_block : last_top_blocks_) {
if (shard_intersects(block_id.shard_full(), top_block.shard_full())) {
if (block_id.seqno() + 100 < top_block.seqno()) {
return true;
}
break;
}
}
for (const BlockIdExt& init_top_block : init_top_blocks_) {
if (shard_intersects(block_id.shard_full(), init_top_block.shard_full())) {
if (block_id.seqno() < init_top_block.seqno()) {
return true;
}
break;
}
}
return false;
}
};
} // namespace ton::validator

View file

@ -88,10 +88,6 @@ void ValidatorGroup::validate_block_candidate(td::uint32 round_id, BlockCandidat
return;
}
if (approved_candidates_cache_round_ != round_id) {
approved_candidates_cache_round_ = round_id;
approved_candidates_cache_.clear();
}
auto next_block_id = create_next_block_id(block.id.root_hash, block.id.file_hash);
block.id = next_block_id;
@ -119,7 +115,7 @@ void ValidatorGroup::validate_block_candidate(td::uint32 round_id, BlockCandidat
auto v = R.move_as_ok();
v.visit(td::overloaded(
[&](UnixTime ts) {
td::actor::send_closure(SelfId, &ValidatorGroup::update_approve_cache, round_id, block_to_cache_key(block),
td::actor::send_closure(SelfId, &ValidatorGroup::update_approve_cache, block_to_cache_key(block),
ts);
promise.set_result(ts);
},
@ -133,17 +129,14 @@ void ValidatorGroup::validate_block_candidate(td::uint32 round_id, BlockCandidat
P.set_error(td::Status::Error(ErrorCode::notready, "validator group not started"));
return;
}
VLOG(VALIDATOR_DEBUG) << "validating block candidate " << next_block_id.to_str();
VLOG(VALIDATOR_DEBUG) << "validating block candidate " << next_block_id;
block.id = next_block_id;
run_validate_query(shard_, min_masterchain_block_id_, prev_block_ids_, std::move(block), validator_set_, manager_,
td::Timestamp::in(15.0), std::move(P),
collator_config_.full_collated_data ? ValidateMode::full_collated_data : 0);
}
void ValidatorGroup::update_approve_cache(td::uint32 round_id, CacheKey key, UnixTime value) {
if (approved_candidates_cache_round_ != round_id) {
return;
}
void ValidatorGroup::update_approve_cache(CacheKey key, UnixTime value) {
approved_candidates_cache_[key] = value;
}
@ -153,6 +146,7 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s
std::vector<BlockSignature> approve_signatures,
validatorsession::ValidatorSessionStats stats,
td::Promise<td::Unit> promise) {
stats.cc_seqno = validator_set_->get_catchain_seqno();
if (round_id >= last_known_round_id_) {
last_known_round_id_ = round_id + 1;
}
@ -167,6 +161,7 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s
return;
}
auto next_block_id = create_next_block_id(root_hash, file_hash);
LOG(WARNING) << "Accepted block " << next_block_id;
td::actor::send_closure(manager_, &ValidatorManager::log_validator_session_stats, next_block_id, std::move(stats));
auto block =
block_data.size() > 0 ? create_block(next_block_id, std::move(block_data)).move_as_ok() : td::Ref<BlockData>{};
@ -190,8 +185,30 @@ void ValidatorGroup::accept_block_query(BlockIdExt block_id, td::Ref<BlockData>
return;
}
LOG_CHECK(R.error().code() == ErrorCode::timeout || R.error().code() == ErrorCode::notready) << R.move_as_error();
td::actor::send_closure(SelfId, &ValidatorGroup::accept_block_query, block_id, std::move(block), std::move(prev),
std::move(sig_set), std::move(approve_sig_set), false, std::move(promise), true);
td::actor::send_closure(SelfId, &ValidatorGroup::retry_accept_block_query, block_id, std::move(block),
std::move(prev), std::move(sig_set), std::move(approve_sig_set), std::move(promise));
} else {
promise.set_value(R.move_as_ok());
}
});
run_accept_block_query(next_block_id, std::move(block), prev_block_ids_, validator_set_, std::move(sig_set),
std::move(approve_sig_set), src == local_id_, manager_, std::move(P));
prev_block_ids_ = std::vector<BlockIdExt>{next_block_id};
cached_collated_block_ = nullptr;
approved_candidates_cache_.clear();
}
void ValidatorGroup::retry_accept_block_query(BlockIdExt block_id, td::Ref<BlockData> block,
std::vector<BlockIdExt> prev, td::Ref<BlockSignatureSet> sig_set,
td::Ref<BlockSignatureSet> approve_sig_set,
td::Promise<td::Unit> promise) {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), block_id, block, prev, sig_set, approve_sig_set,
promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
LOG_CHECK(R.error().code() == ErrorCode::timeout) << R.move_as_error();
td::actor::send_closure(SelfId, &ValidatorGroup::retry_accept_block_query, block_id, std::move(block),
std::move(prev), std::move(sig_set), std::move(approve_sig_set), std::move(promise));
} else {
promise.set_value(R.move_as_ok());
}
@ -343,6 +360,7 @@ void ValidatorGroup::start(std::vector<BlockIdExt> prev, BlockIdExt min_masterch
prev_block_ids_ = prev;
min_masterchain_block_id_ = min_masterchain_block_id;
cached_collated_block_ = nullptr;
approved_candidates_cache_.clear();
started_ = true;
if (init_) {
@ -365,6 +383,19 @@ void ValidatorGroup::start(std::vector<BlockIdExt> prev, BlockIdExt min_masterch
void ValidatorGroup::destroy() {
if (!session_.empty()) {
td::actor::send_closure(session_, &validatorsession::ValidatorSession::get_current_stats,
[manager = manager_, cc_seqno = validator_set_->get_catchain_seqno(),
block_id = create_next_block_id(RootHash::zero(), FileHash::zero())](
td::Result<validatorsession::ValidatorSessionStats> R) {
if (R.is_error()) {
LOG(WARNING) << "Failed to get validator session stats: " << R.move_as_error();
return;
}
auto stats = R.move_as_ok();
stats.cc_seqno = cc_seqno;
td::actor::send_closure(manager, &ValidatorManager::log_validator_session_stats, block_id,
std::move(stats));
});
auto ses = session_.release();
delay_action([ses]() mutable { td::actor::send_closure(ses, &validatorsession::ValidatorSession::destroy); },
td::Timestamp::in(10.0));

View file

@ -141,9 +141,8 @@ class ValidatorGroup : public td::actor::Actor {
typedef std::tuple<td::Bits256, BlockIdExt, FileHash, FileHash> CacheKey;
std::map<CacheKey, UnixTime> approved_candidates_cache_;
td::uint32 approved_candidates_cache_round_ = 0;
void update_approve_cache(td::uint32 round_id, CacheKey key, UnixTime value);
void update_approve_cache(CacheKey key, UnixTime value);
static CacheKey block_to_cache_key(const BlockCandidate& block) {
return std::make_tuple(block.pubkey.as_bits256(), block.id, sha256_bits256(block.data), block.collated_file_hash);

View file

@ -112,6 +112,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions {
std::string get_session_logs_file() const override {
return session_logs_file_;
}
td::uint32 get_celldb_compress_depth() const override {
return celldb_compress_depth_;
}
ValidatorMode validator_mode() const override {
return validator_mode_;
}
@ -168,6 +171,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions {
void set_session_logs_file(std::string f) override {
session_logs_file_ = std::move(f);
}
void set_celldb_compress_depth(td::uint32 value) override {
celldb_compress_depth_ = value;
}
void set_validator_mode(ValidatorMode value) override {
validator_mode_ = value;
}
@ -211,6 +217,7 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions {
BlockSeqno truncate_{0};
BlockSeqno sync_upto_{0};
std::string session_logs_file_;
td::uint32 celldb_compress_depth_{0};
ValidatorMode validator_mode_ = validator_normal;
};

View file

@ -81,6 +81,7 @@ struct ValidatorManagerOptions : public td::CntObject {
virtual BlockSeqno get_truncate_seqno() const = 0;
virtual BlockSeqno sync_upto() const = 0;
virtual std::string get_session_logs_file() const = 0;
virtual td::uint32 get_celldb_compress_depth() const = 0;
virtual ValidatorMode validator_mode() const = 0;
virtual void set_zero_block_id(BlockIdExt block_id) = 0;
@ -100,6 +101,7 @@ struct ValidatorManagerOptions : public td::CntObject {
virtual void truncate_db(BlockSeqno seqno) = 0;
virtual void set_sync_upto(BlockSeqno seqno) = 0;
virtual void set_session_logs_file(std::string f) = 0;
virtual void set_celldb_compress_depth(td::uint32 value) = 0;
virtual void set_validator_mode(ValidatorMode value) = 0;
static td::Ref<ValidatorManagerOptions> create(
@ -239,6 +241,8 @@ class ValidatorManagerInterface : public td::actor::Actor {
virtual void prepare_perf_timer_stats(td::Promise<std::vector<PerfTimerStats>> promise) = 0;
virtual void add_perf_timer_stat(std::string name, double duration) = 0;
virtual void get_out_msg_queue_size(BlockIdExt block_id, td::Promise<td::uint32> promise) = 0;
virtual void get_validator_sessions_info(
td::Promise<tl_object_ptr<ton_api::engine_validator_validatorSessionsInfo>> promise) = 0;