mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
Improve CellDb migration (#835)
* Fix deserializing cells * Use proxy actor * Add delays * Print stats every minute Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
parent
ace934ff35
commit
83efcebad0
6 changed files with 82 additions and 25 deletions
|
@ -23,6 +23,7 @@
|
|||
|
||||
#include "ton/ton-tl.hpp"
|
||||
#include "ton/ton-io.hpp"
|
||||
#include "common/delay.h"
|
||||
|
||||
namespace ton {
|
||||
|
||||
|
@ -68,14 +69,16 @@ CellDbIn::CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb
|
|||
}
|
||||
|
||||
void CellDbIn::start_up() {
|
||||
on_load_callback_ = [db = actor_id(this),
|
||||
on_load_callback_ = [actor = std::make_shared<td::actor::ActorOwn<MigrationProxy>>(
|
||||
td::actor::create_actor<MigrationProxy>("celldbmigration", actor_id(this))),
|
||||
compress_depth = opts_->get_celldb_compress_depth()](const vm::CellLoader::LoadResult& res) {
|
||||
if (res.cell_.is_null()) {
|
||||
return;
|
||||
}
|
||||
bool expected_stored_boc = res.cell_->get_depth() == compress_depth && compress_depth != 0;
|
||||
if (expected_stored_boc != res.stored_boc_) {
|
||||
td::actor::send_closure(db, &CellDbIn::migrate_cell, td::Bits256{res.cell_->get_hash().bits()});
|
||||
td::actor::send_closure(*actor, &CellDbIn::MigrationProxy::migrate_cell,
|
||||
td::Bits256{res.cell_->get_hash().bits()});
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -156,6 +159,13 @@ void CellDbIn::alarm() {
|
|||
if (migrate_after_ && migrate_after_.is_in_past()) {
|
||||
migrate_cells();
|
||||
}
|
||||
if (migration_stats_ && migration_stats_->end_at_.is_in_past()) {
|
||||
LOG(INFO) << "CellDb migration, " << migration_stats_->start_.elapsed()
|
||||
<< "s stats: batches=" << migration_stats_->batches_ << " migrated=" << migration_stats_->migrated_cells_
|
||||
<< " checked=" << migration_stats_->checked_cells_ << " time=" << migration_stats_->total_time_
|
||||
<< " queue_size=" << cells_to_migrate_.size();
|
||||
migration_stats_ = {};
|
||||
}
|
||||
auto E = get_block(get_empty_key_hash()).move_as_ok();
|
||||
auto N = get_block(E.next).move_as_ok();
|
||||
if (N.is_empty()) {
|
||||
|
@ -291,23 +301,31 @@ void CellDbIn::set_block(KeyHash key_hash, DbEntry e) {
|
|||
|
||||
void CellDbIn::migrate_cell(td::Bits256 hash) {
|
||||
cells_to_migrate_.insert(hash);
|
||||
if (cells_to_migrate_.size() >= 32) {
|
||||
migrate_cells();
|
||||
} else if (!migrate_after_) {
|
||||
migrate_after_ = td::Timestamp::in(1.0);
|
||||
if (!migration_active_) {
|
||||
migration_active_ = true;
|
||||
migrate_after_ = td::Timestamp::in(10.0);
|
||||
}
|
||||
}
|
||||
|
||||
void CellDbIn::migrate_cells() {
|
||||
migrate_after_ = td::Timestamp::never();
|
||||
if (cells_to_migrate_.empty()) {
|
||||
migration_active_ = false;
|
||||
return;
|
||||
}
|
||||
td::Timer timer;
|
||||
if (!migration_stats_) {
|
||||
migration_stats_ = std::make_unique<MigrationStats>();
|
||||
}
|
||||
vm::CellStorer stor{*cell_db_};
|
||||
auto loader = std::make_unique<vm::CellLoader>(cell_db_->snapshot());
|
||||
boc_->set_loader(std::make_unique<vm::CellLoader>(*loader)).ensure();
|
||||
cell_db_->begin_write_batch().ensure();
|
||||
td::uint32 cnt = 0;
|
||||
for (const auto& hash : cells_to_migrate_) {
|
||||
td::uint32 checked = 0, migrated = 0;
|
||||
for (auto it = cells_to_migrate_.begin(); it != cells_to_migrate_.end() && checked < 128; ) {
|
||||
++checked;
|
||||
td::Bits256 hash = *it;
|
||||
it = cells_to_migrate_.erase(it);
|
||||
auto R = loader->load(hash.as_slice(), true, boc_->as_ext_cell_creator());
|
||||
if (R.is_error()) {
|
||||
continue;
|
||||
|
@ -318,18 +336,27 @@ void CellDbIn::migrate_cells() {
|
|||
bool expected_stored_boc =
|
||||
R.ok().cell_->get_depth() == opts_->get_celldb_compress_depth() && opts_->get_celldb_compress_depth() != 0;
|
||||
if (expected_stored_boc != R.ok().stored_boc_) {
|
||||
++cnt;
|
||||
++migrated;
|
||||
stor.set(R.ok().refcnt(), R.ok().cell_, expected_stored_boc).ensure();
|
||||
}
|
||||
}
|
||||
cells_to_migrate_.clear();
|
||||
if (cnt > 0) {
|
||||
LOG(DEBUG) << "Migrated " << cnt << " cells";
|
||||
}
|
||||
cell_db_->commit_write_batch().ensure();
|
||||
boc_->set_loader(std::make_unique<vm::CellLoader>(cell_db_->snapshot(), on_load_callback_)).ensure();
|
||||
td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot());
|
||||
migrate_after_ = td::Timestamp::never();
|
||||
|
||||
double time = timer.elapsed();
|
||||
LOG(DEBUG) << "CellDb migration: migrated=" << migrated << " checked=" << checked << " time=" << time;
|
||||
++migration_stats_->batches_;
|
||||
migration_stats_->migrated_cells_ += migrated;
|
||||
migration_stats_->checked_cells_ += checked;
|
||||
migration_stats_->total_time_ += time;
|
||||
|
||||
if (cells_to_migrate_.empty()) {
|
||||
migration_active_ = false;
|
||||
} else {
|
||||
delay_action([SelfId = actor_id(this)] { td::actor::send_closure(SelfId, &CellDbIn::migrate_cells); },
|
||||
td::Timestamp::in(time * 2));
|
||||
}
|
||||
}
|
||||
|
||||
void CellDb::load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise) {
|
||||
|
@ -361,14 +388,16 @@ void CellDb::start_up() {
|
|||
boc_ = vm::DynamicBagOfCellsDb::create();
|
||||
boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth());
|
||||
cell_db_ = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_, opts_);
|
||||
on_load_callback_ = [db = cell_db_.get(),
|
||||
on_load_callback_ = [actor = std::make_shared<td::actor::ActorOwn<CellDbIn::MigrationProxy>>(
|
||||
td::actor::create_actor<CellDbIn::MigrationProxy>("celldbmigration", cell_db_.get())),
|
||||
compress_depth = opts_->get_celldb_compress_depth()](const vm::CellLoader::LoadResult& res) {
|
||||
if (res.cell_.is_null()) {
|
||||
return;
|
||||
}
|
||||
bool expected_stored_boc = res.cell_->get_depth() == compress_depth && compress_depth != 0;
|
||||
if (expected_stored_boc != res.stored_boc_) {
|
||||
td::actor::send_closure(db, &CellDbIn::migrate_cell, td::Bits256{res.cell_->get_hash().bits()});
|
||||
td::actor::send_closure(*actor, &CellDbIn::MigrationProxy::migrate_cell,
|
||||
td::Bits256{res.cell_->get_hash().bits()});
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -107,6 +107,30 @@ class CellDbIn : public CellDbBase {
|
|||
std::function<void(const vm::CellLoader::LoadResult&)> on_load_callback_;
|
||||
std::set<td::Bits256> cells_to_migrate_;
|
||||
td::Timestamp migrate_after_ = td::Timestamp::never();
|
||||
bool migration_active_ = false;
|
||||
|
||||
struct MigrationStats {
|
||||
td::Timer start_;
|
||||
td::Timestamp end_at_ = td::Timestamp::in(60.0);
|
||||
size_t batches_ = 0;
|
||||
size_t migrated_cells_ = 0;
|
||||
size_t checked_cells_ = 0;
|
||||
double total_time_ = 0.0;
|
||||
};
|
||||
std::unique_ptr<MigrationStats> migration_stats_;
|
||||
|
||||
public:
|
||||
class MigrationProxy : public td::actor::Actor {
|
||||
public:
|
||||
explicit MigrationProxy(td::actor::ActorId<CellDbIn> cell_db) : cell_db_(cell_db) {
|
||||
}
|
||||
void migrate_cell(td::Bits256 hash) {
|
||||
td::actor::send_closure(cell_db_, &CellDbIn::migrate_cell, hash);
|
||||
}
|
||||
|
||||
private:
|
||||
td::actor::ActorId<CellDbIn> cell_db_;
|
||||
};
|
||||
};
|
||||
|
||||
class CellDb : public CellDbBase {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue