mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
"getcollatoroptionsjson" command in validator console (#1059)
* "getcollatoroptionsjson" command in validator console * Improve state serializer Use previous persistent state to speed up reading
This commit is contained in:
parent
00cd053dbc
commit
b9e89d4c66
23 changed files with 353 additions and 80 deletions
|
@ -21,6 +21,7 @@
|
|||
#include "adnl/utils.hpp"
|
||||
#include "ton/ton-io.hpp"
|
||||
#include "common/delay.h"
|
||||
#include "td/utils/filesystem.h"
|
||||
|
||||
namespace ton {
|
||||
|
||||
|
@ -84,6 +85,20 @@ void AsyncStateSerializer::alarm() {
|
|||
td::actor::send_closure(manager_, &ValidatorManager::get_top_masterchain_block, std::move(P));
|
||||
}
|
||||
|
||||
void AsyncStateSerializer::request_previous_state_files() {
|
||||
td::actor::send_closure(
|
||||
manager_, &ValidatorManager::get_previous_persistent_state_files, masterchain_handle_->id().seqno(),
|
||||
[SelfId = actor_id(this)](td::Result<std::vector<std::pair<std::string, ShardIdFull>>> R) {
|
||||
R.ensure();
|
||||
td::actor::send_closure(SelfId, &AsyncStateSerializer::got_previous_state_files, R.move_as_ok());
|
||||
});
|
||||
}
|
||||
|
||||
void AsyncStateSerializer::got_previous_state_files(std::vector<std::pair<std::string, ShardIdFull>> files) {
|
||||
previous_state_files_ = std::move(files);
|
||||
request_masterchain_state();
|
||||
}
|
||||
|
||||
void AsyncStateSerializer::request_masterchain_state() {
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), manager = manager_](td::Result<td::Ref<ShardState>> R) {
|
||||
if (R.is_error()) {
|
||||
|
@ -133,37 +148,43 @@ void AsyncStateSerializer::next_iteration() {
|
|||
}
|
||||
CHECK(masterchain_handle_->id() == last_block_id_);
|
||||
if (attempt_ < max_attempt() && last_key_block_id_.id.seqno < last_block_id_.id.seqno &&
|
||||
need_serialize(masterchain_handle_) && opts_->get_state_serializer_enabled()) {
|
||||
if (!have_masterchain_state_) {
|
||||
LOG(ERROR) << "started serializing persistent state for " << masterchain_handle_->id().id.to_str();
|
||||
// block next attempts immediately, but send actual request later
|
||||
running_ = true;
|
||||
double delay = td::Random::fast(0, 3600);
|
||||
LOG(WARNING) << "serializer delay = " << delay << "s";
|
||||
delay_action([SelfId = actor_id(
|
||||
this)]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_masterchain_state); },
|
||||
td::Timestamp::in(delay));
|
||||
return;
|
||||
}
|
||||
while (next_idx_ < shards_.size()) {
|
||||
if (!need_monitor(shards_[next_idx_].shard_full())) {
|
||||
next_idx_++;
|
||||
} else {
|
||||
need_serialize(masterchain_handle_)) {
|
||||
if (!have_masterchain_state_ && !opts_->get_state_serializer_enabled()) {
|
||||
LOG(ERROR) << "skipping serializing persistent state for " << masterchain_handle_->id().id.to_str()
|
||||
<< ": serializer is disabled";
|
||||
} else if (!have_masterchain_state_ && have_newer_persistent_state(masterchain_handle_->unix_time())) {
|
||||
LOG(ERROR) << "skipping serializing persistent state for " << masterchain_handle_->id().id.to_str()
|
||||
<< ": newer key block with ts=" << last_known_key_block_ts_ << " exists";
|
||||
} else {
|
||||
if (!have_masterchain_state_) {
|
||||
LOG(ERROR) << "started serializing persistent state for " << masterchain_handle_->id().id.to_str();
|
||||
// block next attempts immediately, but send actual request later
|
||||
running_ = true;
|
||||
double delay = td::Random::fast(0, 1800);
|
||||
double delay = td::Random::fast(0, 3600);
|
||||
LOG(WARNING) << "serializer delay = " << delay << "s";
|
||||
delay_action(
|
||||
[SelfId = actor_id(this), shard = shards_[next_idx_]]() {
|
||||
td::actor::send_closure(SelfId, &AsyncStateSerializer::request_shard_state, shard);
|
||||
[SelfId = actor_id(this)]() {
|
||||
td::actor::send_closure(SelfId, &AsyncStateSerializer::request_previous_state_files);
|
||||
},
|
||||
td::Timestamp::in(delay));
|
||||
return;
|
||||
}
|
||||
while (next_idx_ < shards_.size()) {
|
||||
if (!need_monitor(shards_[next_idx_].shard_full())) {
|
||||
next_idx_++;
|
||||
} else {
|
||||
running_ = true;
|
||||
request_shard_state(shards_[next_idx_]);
|
||||
return;
|
||||
}
|
||||
}
|
||||
LOG(ERROR) << "finished serializing persistent state for " << masterchain_handle_->id().id.to_str();
|
||||
}
|
||||
LOG(ERROR) << "finished serializing persistent state for " << masterchain_handle_->id().id.to_str();
|
||||
last_key_block_ts_ = masterchain_handle_->unix_time();
|
||||
last_key_block_id_ = masterchain_handle_->id();
|
||||
previous_state_files_ = {};
|
||||
previous_state_cache_ = {};
|
||||
previous_state_cur_shards_ = {};
|
||||
}
|
||||
if (!saved_to_db_) {
|
||||
running_ = true;
|
||||
|
@ -177,9 +198,6 @@ void AsyncStateSerializer::next_iteration() {
|
|||
return;
|
||||
}
|
||||
if (masterchain_handle_->inited_next_left()) {
|
||||
if (need_serialize(masterchain_handle_) && !opts_->get_state_serializer_enabled()) {
|
||||
LOG(ERROR) << "skipping serializing persistent state for " << masterchain_handle_->id().id.to_str();
|
||||
}
|
||||
last_block_id_ = masterchain_handle_->one_next(true);
|
||||
have_masterchain_state_ = false;
|
||||
masterchain_handle_ = nullptr;
|
||||
|
@ -204,6 +222,88 @@ void AsyncStateSerializer::got_masterchain_handle(BlockHandle handle) {
|
|||
next_iteration();
|
||||
}
|
||||
|
||||
class CachedCellDbReader : public vm::CellDbReader {
|
||||
public:
|
||||
CachedCellDbReader(std::shared_ptr<vm::CellDbReader> parent,
|
||||
std::shared_ptr<std::map<td::Bits256, td::Ref<vm::Cell>>> cache)
|
||||
: parent_(std::move(parent)), cache_(std::move(cache)) {
|
||||
}
|
||||
td::Result<td::Ref<vm::DataCell>> load_cell(td::Slice hash) override {
|
||||
++total_reqs_;
|
||||
DCHECK(hash.size() == 32);
|
||||
if (cache_) {
|
||||
auto it = cache_->find(td::Bits256{(const unsigned char*)hash.data()});
|
||||
if (it != cache_->end()) {
|
||||
++cached_reqs_;
|
||||
TRY_RESULT(loaded_cell, it->second->load_cell());
|
||||
return loaded_cell.data_cell;
|
||||
}
|
||||
}
|
||||
return parent_->load_cell(hash);
|
||||
}
|
||||
void print_stats() const {
|
||||
LOG(WARNING) << "CachedCellDbReader stats : " << total_reqs_ << " reads, " << cached_reqs_ << " cached";
|
||||
}
|
||||
private:
|
||||
std::shared_ptr<vm::CellDbReader> parent_;
|
||||
std::shared_ptr<std::map<td::Bits256, td::Ref<vm::Cell>>> cache_;
|
||||
|
||||
td::uint64 total_reqs_ = 0;
|
||||
td::uint64 cached_reqs_ = 0;
|
||||
};
|
||||
|
||||
void AsyncStateSerializer::prepare_previous_state_cache(ShardIdFull shard) {
|
||||
std::vector<ShardIdFull> prev_shards;
|
||||
for (const auto& [_, prev_shard] : previous_state_files_) {
|
||||
if (shard_intersects(shard, prev_shard)) {
|
||||
prev_shards.push_back(prev_shard);
|
||||
}
|
||||
}
|
||||
if (prev_shards == previous_state_cur_shards_) {
|
||||
return;
|
||||
}
|
||||
previous_state_cur_shards_ = std::move(prev_shards);
|
||||
previous_state_cache_ = {};
|
||||
if (previous_state_cur_shards_.empty()) {
|
||||
return;
|
||||
}
|
||||
td::Timer timer;
|
||||
LOG(WARNING) << "Preloading previous persistent state for shard " << shard.to_str() << " ("
|
||||
<< previous_state_cur_shards_.size() << " files)";
|
||||
std::map<td::Bits256, td::Ref<vm::Cell>> cells;
|
||||
std::function<void(td::Ref<vm::Cell>)> dfs = [&](td::Ref<vm::Cell> cell) {
|
||||
td::Bits256 hash = cell->get_hash().bits();
|
||||
if (!cells.emplace(hash, cell).second) {
|
||||
return;
|
||||
}
|
||||
bool is_special;
|
||||
vm::CellSlice cs = vm::load_cell_slice_special(cell, is_special);
|
||||
for (unsigned i = 0; i < cs.size_refs(); ++i) {
|
||||
dfs(cs.prefetch_ref(i));
|
||||
}
|
||||
};
|
||||
for (const auto& [file, prev_shard] : previous_state_files_) {
|
||||
if (!shard_intersects(shard, prev_shard)) {
|
||||
continue;
|
||||
}
|
||||
auto r_data = td::read_file(file);
|
||||
if (r_data.is_error()) {
|
||||
LOG(INFO) << "Reading " << file << " : " << r_data.move_as_error();
|
||||
continue;
|
||||
}
|
||||
LOG(INFO) << "Reading " << file << " : " << td::format::as_size(r_data.ok().size());
|
||||
auto r_root = vm::std_boc_deserialize(r_data.move_as_ok());
|
||||
if (r_root.is_error()) {
|
||||
LOG(WARNING) << "Deserialize error : " << r_root.move_as_error();
|
||||
continue;
|
||||
}
|
||||
r_data = {};
|
||||
dfs(r_root.move_as_ok());
|
||||
}
|
||||
LOG(WARNING) << "Preloaded previous state: " << cells.size() << " cells in " << timer.elapsed() << "s";
|
||||
previous_state_cache_ = std::make_shared<std::map<td::Bits256, td::Ref<vm::Cell>>>(std::move(cells));
|
||||
}
|
||||
|
||||
void AsyncStateSerializer::got_masterchain_state(td::Ref<MasterchainState> state,
|
||||
std::shared_ptr<vm::CellDbReader> cell_db_reader) {
|
||||
if (!opts_->get_state_serializer_enabled()) {
|
||||
|
@ -211,6 +311,8 @@ void AsyncStateSerializer::got_masterchain_state(td::Ref<MasterchainState> state
|
|||
return;
|
||||
}
|
||||
LOG(ERROR) << "serializing masterchain state " << masterchain_handle_->id().id.to_str();
|
||||
prepare_previous_state_cache(state->get_shard());
|
||||
auto new_cell_db_reader = std::make_shared<CachedCellDbReader>(cell_db_reader, previous_state_cache_);
|
||||
have_masterchain_state_ = true;
|
||||
CHECK(next_idx_ == 0);
|
||||
CHECK(shards_.size() == 0);
|
||||
|
@ -220,9 +322,11 @@ void AsyncStateSerializer::got_masterchain_state(td::Ref<MasterchainState> state
|
|||
shards_.push_back(v->top_block_id());
|
||||
}
|
||||
|
||||
auto write_data = [hash = state->root_cell()->get_hash(), cell_db_reader,
|
||||
auto write_data = [hash = state->root_cell()->get_hash(), cell_db_reader = new_cell_db_reader,
|
||||
cancellation_token = cancellation_token_source_.get_cancellation_token()](td::FileFd& fd) mutable {
|
||||
return vm::std_boc_serialize_to_file_large(cell_db_reader, hash, fd, 31, std::move(cancellation_token));
|
||||
auto res = vm::std_boc_serialize_to_file_large(cell_db_reader, hash, fd, 31, std::move(cancellation_token));
|
||||
cell_db_reader->print_stats();
|
||||
return res;
|
||||
};
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
|
||||
if (R.is_error() && R.error().code() == cancelled) {
|
||||
|
@ -273,9 +377,13 @@ void AsyncStateSerializer::got_shard_state(BlockHandle handle, td::Ref<ShardStat
|
|||
return;
|
||||
}
|
||||
LOG(ERROR) << "serializing shard state " << handle->id().id.to_str();
|
||||
auto write_data = [hash = state->root_cell()->get_hash(), cell_db_reader,
|
||||
prepare_previous_state_cache(state->get_shard());
|
||||
auto new_cell_db_reader = std::make_shared<CachedCellDbReader>(cell_db_reader, previous_state_cache_);
|
||||
auto write_data = [hash = state->root_cell()->get_hash(), cell_db_reader = new_cell_db_reader,
|
||||
cancellation_token = cancellation_token_source_.get_cancellation_token()](td::FileFd& fd) mutable {
|
||||
return vm::std_boc_serialize_to_file_large(cell_db_reader, hash, fd, 31, std::move(cancellation_token));
|
||||
auto res = vm::std_boc_serialize_to_file_large(cell_db_reader, hash, fd, 31, std::move(cancellation_token));
|
||||
cell_db_reader->print_stats();
|
||||
return res;
|
||||
};
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Unit> R) {
|
||||
if (R.is_error() && R.error().code() == cancelled) {
|
||||
|
@ -329,6 +437,10 @@ bool AsyncStateSerializer::need_serialize(BlockHandle handle) {
|
|||
ValidatorManager::persistent_state_ttl(handle->unix_time()) > (UnixTime)td::Clocks::system();
|
||||
}
|
||||
|
||||
bool AsyncStateSerializer::have_newer_persistent_state(UnixTime cur_ts) {
|
||||
return cur_ts / (1 << 17) < last_known_key_block_ts_ / (1 << 17);
|
||||
}
|
||||
|
||||
} // namespace validator
|
||||
|
||||
} // namespace ton
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue