From 12c1b1a2e6c14206b6cfb9c9cfb5fbfa9bedf5bb Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Wed, 7 Feb 2024 14:56:37 +0300 Subject: [PATCH] Limit file descriptors num by adding archive slice lru (#892) * --max-archive-fd option limits open files in archive manager * Don't close the latest archives + bugfix * Delete temp packages early --------- Co-authored-by: SpyCheese --- validator-engine/validator-engine.cpp | 8 + validator-engine/validator-engine.hpp | 4 + validator/db/archive-manager.cpp | 39 +++- validator/db/archive-manager.hpp | 13 +- validator/db/archive-slice.cpp | 244 +++++++++++++++++++------- validator/db/archive-slice.hpp | 70 ++++++-- validator/db/rootdb.cpp | 6 +- validator/db/rootdb.hpp | 3 +- validator/interfaces/db.h | 2 +- validator/manager.cpp | 6 +- validator/validator-options.hpp | 7 + validator/validator.h | 2 + 12 files changed, 316 insertions(+), 88 deletions(-) diff --git a/validator-engine/validator-engine.cpp b/validator-engine/validator-engine.cpp index e488504f..c7e4787c 100644 --- a/validator-engine/validator-engine.cpp +++ b/validator-engine/validator-engine.cpp @@ -1364,6 +1364,7 @@ td::Status ValidatorEngine::load_global_config() { validator_options_.write().set_session_logs_file(session_logs_file_); } validator_options_.write().set_celldb_compress_depth(celldb_compress_depth_); + validator_options_.write().set_max_open_archive_files(max_open_archive_files_); std::vector h; for (auto &x : conf.validator_->hardforks_) { @@ -3793,6 +3794,13 @@ int main(int argc, char *argv[]) { }); return td::Status::OK(); }); + p.add_checked_option( + '\0', "max-archive-fd", "limit for a number of open file descriptirs in archive manager. 0 is unlimited (default)", + [&](td::Slice s) -> td::Status { + TRY_RESULT(v, td::to_integer_safe(s)); + acts.push_back([&x, v]() { td::actor::send_closure(x, &ValidatorEngine::set_max_open_archive_files, v); }); + return td::Status::OK(); + }); auto S = p.run(argc, argv); if (S.is_error()) { LOG(ERROR) << "failed to parse options: " << S.move_as_error(); diff --git a/validator-engine/validator-engine.hpp b/validator-engine/validator-engine.hpp index e59bb418..76b6134b 100644 --- a/validator-engine/validator-engine.hpp +++ b/validator-engine/validator-engine.hpp @@ -204,6 +204,7 @@ class ValidatorEngine : public td::actor::Actor { double archive_ttl_ = 0; double key_proof_ttl_ = 0; td::uint32 celldb_compress_depth_ = 0; + size_t max_open_archive_files_ = 0; bool read_config_ = false; bool started_keyring_ = false; bool started_ = false; @@ -264,6 +265,9 @@ class ValidatorEngine : public td::actor::Actor { void set_celldb_compress_depth(td::uint32 value) { celldb_compress_depth_ = value; } + void set_max_open_archive_files(size_t value) { + max_open_archive_files_ = value; + } void start_up() override; ValidatorEngine() { } diff --git a/validator/db/archive-manager.cpp b/validator/db/archive-manager.cpp index 91b7b880..25d686bf 100644 --- a/validator/db/archive-manager.cpp +++ b/validator/db/archive-manager.cpp @@ -55,7 +55,9 @@ std::string PackageId::name() const { } } -ArchiveManager::ArchiveManager(td::actor::ActorId root, std::string db_root) : db_root_(db_root) { +ArchiveManager::ArchiveManager(td::actor::ActorId root, std::string db_root, + td::Ref opts) + : db_root_(db_root), opts_(opts) { } void ArchiveManager::add_handle(BlockHandle handle, td::Promise promise) { @@ -598,9 +600,11 @@ void ArchiveManager::load_package(PackageId id) { } } - desc.file = td::actor::create_actor("slice", id.id, id.key, id.temp, false, db_root_); + desc.file = + td::actor::create_actor("slice", id.id, id.key, id.temp, false, db_root_, archive_lru_.get()); m.emplace(id, std::move(desc)); + update_permanent_slices(); } const ArchiveManager::FileDescription *ArchiveManager::get_file_desc(ShardIdFull shard, PackageId id, BlockSeqno seqno, @@ -631,7 +635,8 @@ const ArchiveManager::FileDescription *ArchiveManager::add_file_desc(ShardIdFull FileDescription new_desc{id, false}; td::mkdir(db_root_ + id.path()).ensure(); std::string prefix = PSTRING() << db_root_ << id.path() << id.name(); - new_desc.file = td::actor::create_actor("slice", id.id, id.key, id.temp, false, db_root_); + new_desc.file = + td::actor::create_actor("slice", id.id, id.key, id.temp, false, db_root_, archive_lru_.get()); const FileDescription &desc = f.emplace(id, std::move(new_desc)); if (!id.temp) { update_desc(f, desc, shard, seqno, ts, lt); @@ -673,6 +678,7 @@ const ArchiveManager::FileDescription *ArchiveManager::add_file_desc(ShardIdFull .ensure(); } index_->commit_transaction().ensure(); + update_permanent_slices(); return &desc; } @@ -820,6 +826,9 @@ void ArchiveManager::start_up() { td::mkdir(db_root_ + "/archive/states/").ensure(); td::mkdir(db_root_ + "/files/").ensure(); td::mkdir(db_root_ + "/files/packages/").ensure(); + if (opts_->get_max_open_archive_files() > 0) { + archive_lru_ = td::actor::create_actor("archive_lru", opts_->get_max_open_archive_files()); + } index_ = std::make_shared(td::RocksDb::open(db_root_ + "/files/globalindex").move_as_ok()); std::string value; auto v = index_->get(create_serialize_tl_object().as_slice(), value); @@ -878,8 +887,8 @@ void ArchiveManager::start_up() { persistent_state_gc(FileHash::zero()); } -void ArchiveManager::run_gc(UnixTime ts, UnixTime archive_ttl) { - auto p = get_temp_package_id_by_unixtime(ts); +void ArchiveManager::run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) { + auto p = get_temp_package_id_by_unixtime(std::max(gc_ts, mc_ts - TEMP_PACKAGES_TTL)); std::vector vec; for (auto &x : temp_files_) { if (x.first < p) { @@ -907,7 +916,7 @@ void ArchiveManager::run_gc(UnixTime ts, UnixTime archive_ttl) { if (it == desc.first_blocks.end()) { continue; } - if (it->second.ts < ts - archive_ttl) { + if (it->second.ts < gc_ts - archive_ttl) { vec.push_back(f.first); } } @@ -1200,6 +1209,7 @@ void ArchiveManager::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle han } } } + update_permanent_slices(); } void ArchiveManager::FileMap::shard_index_add(const FileDescription &desc) { @@ -1298,6 +1308,23 @@ const ArchiveManager::FileDescription *ArchiveManager::FileMap::get_next_file_de return it2->second->deleted ? nullptr : it2->second; } +void ArchiveManager::update_permanent_slices() { + if (archive_lru_.empty()) { + return; + } + std::vector ids; + if (!files_.empty()) { + ids.push_back(files_.rbegin()->first); + } + if (!key_files_.empty()) { + ids.push_back(key_files_.rbegin()->first); + } + if (!temp_files_.empty()) { + ids.push_back(temp_files_.rbegin()->first); + } + td::actor::send_closure(archive_lru_, &ArchiveLru::set_permanent_slices, std::move(ids)); +} + } // namespace validator } // namespace ton diff --git a/validator/db/archive-manager.hpp b/validator/db/archive-manager.hpp index e5008764..1c5deaf8 100644 --- a/validator/db/archive-manager.hpp +++ b/validator/db/archive-manager.hpp @@ -28,7 +28,7 @@ class RootDb; class ArchiveManager : public td::actor::Actor { public: - ArchiveManager(td::actor::ActorId root, std::string db_root); + ArchiveManager(td::actor::ActorId root, std::string db_root, td::Ref opts); void add_handle(BlockHandle handle, td::Promise promise); void update_handle(BlockHandle handle, td::Promise promise); @@ -58,7 +58,7 @@ class ArchiveManager : public td::actor::Actor { void truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise promise); //void truncate_continue(BlockSeqno masterchain_seqno, td::Promise promise); - void run_gc(UnixTime ts, UnixTime archive_ttl); + void run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl); /* from LTDB */ void get_block_by_unix_time(AccountIdPrefixFull account_id, UnixTime ts, td::Promise promise); @@ -123,6 +123,9 @@ class ArchiveManager : public td::actor::Actor { size_t size() const { return files_.size(); } + bool empty() const { + return files_.empty(); + } std::map::const_iterator lower_bound(const PackageId &x) const { return files_.lower_bound(x); } @@ -164,6 +167,7 @@ class ArchiveManager : public td::actor::Actor { void shard_index_del(const FileDescription &desc); }; FileMap files_, key_files_, temp_files_; + td::actor::ActorOwn archive_lru_; BlockSeqno finalized_up_to_{0}; bool async_mode_ = false; bool huge_transaction_started_ = false; @@ -206,6 +210,7 @@ class ArchiveManager : public td::actor::Actor { void got_gc_masterchain_handle(ConstBlockHandle handle, FileHash hash); std::string db_root_; + td::Ref opts_; std::shared_ptr index_; @@ -215,6 +220,10 @@ class ArchiveManager : public td::actor::Actor { PackageId get_temp_package_id() const; PackageId get_key_package_id(BlockSeqno seqno) const; PackageId get_temp_package_id_by_unixtime(UnixTime ts) const; + + void update_permanent_slices(); + + static const td::uint32 TEMP_PACKAGES_TTL = 86400 * 7; }; } // namespace validator diff --git a/validator/db/archive-slice.cpp b/validator/db/archive-slice.cpp index d3d63360..4a7c419a 100644 --- a/validator/db/archive-slice.cpp +++ b/validator/db/archive-slice.cpp @@ -21,7 +21,6 @@ #include "td/actor/MultiPromise.h" #include "validator/fabric.h" #include "td/db/RocksDb.h" -#include "ton/ton-io.hpp" #include "td/utils/port/path.h" #include "common/delay.h" #include "files-async.hpp" @@ -32,9 +31,16 @@ namespace validator { void PackageWriter::append(std::string filename, td::BufferSlice data, td::Promise> promise) { - auto offset = package_->append(std::move(filename), std::move(data), !async_mode_); - auto size = package_->size(); - + td::uint64 offset, size; + { + auto p = package_.lock(); + if (!p) { + promise.set_error(td::Status::Error("Package is closed")); + return; + } + offset = p->append(std::move(filename), std::move(data), !async_mode_); + size = p->size(); + } promise.set_value(std::pair{offset, size}); } @@ -44,8 +50,10 @@ class PackageReader : public td::actor::Actor { td::Promise> promise) : package_(std::move(package)), offset_(offset), promise_(std::move(promise)) { } - void start_up() { - promise_.set_result(package_->read(offset_)); + void start_up() override { + auto result = package_->read(offset_); + package_ = {}; + promise_.set_result(std::move(result)); stop(); } @@ -64,6 +72,7 @@ void ArchiveSlice::add_handle(BlockHandle handle, td::Promise promise) update_handle(std::move(handle), std::move(promise)); return; } + before_query(); CHECK(!key_blocks_only_); CHECK(!temp_); CHECK(handle->inited_unix_time()); @@ -146,6 +155,7 @@ void ArchiveSlice::update_handle(BlockHandle handle, td::Promise promi promise.set_value(td::Unit()); return; } + before_query(); CHECK(!key_blocks_only_); begin_transaction(); @@ -168,6 +178,7 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd")); return; } + before_query(); TRY_RESULT_PROMISE( promise, p, choose_package( @@ -179,6 +190,7 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer promise.set_value(td::Unit()); return; } + promise = begin_async_query(std::move(promise)); auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), idx = p->idx, ref_id, promise = std::move(promise)]( td::Result> R) mutable { if (R.is_error()) { @@ -217,6 +229,7 @@ void ArchiveSlice::get_handle(BlockIdExt block_id, td::Promise prom promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd")); return; } + before_query(); CHECK(!key_blocks_only_); std::string value; auto R = kv_->get(get_db_key_block_info(block_id), value); @@ -239,6 +252,7 @@ void ArchiveSlice::get_temp_handle(BlockIdExt block_id, td::Promiseget(get_db_key_block_info(block_id), value); @@ -261,6 +275,7 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd")); return; } + before_query(); std::string value; auto R = kv_->get(ref_id.hash().to_hex(), value); R.ensure(); @@ -273,6 +288,7 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P promise, p, choose_package( handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0, false)); + promise = begin_async_query(std::move(promise)); auto P = td::PromiseCreator::lambda( [promise = std::move(promise)](td::Result> R) mutable { if (R.is_error()) { @@ -292,6 +308,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id, promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd")); return; } + before_query(); bool f = false; BlockIdExt block_id; td::uint32 ls = 0; @@ -312,7 +329,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id, auto G = fetch_tl_object(value, true); G.ensure(); auto g = G.move_as_ok(); - if (compare_desc(*g.get()) > 0) { + if (compare_desc(*g) > 0) { continue; } td::uint32 l = g->first_idx_ - 1; @@ -328,7 +345,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id, auto E = fetch_tl_object(td::BufferSlice{value}, true); E.ensure(); auto e = E.move_as_ok(); - int cmp_val = compare(*e.get()); + int cmp_val = compare(*e); if (cmp_val < 0) { rseq = create_block_id(e->id_); @@ -342,9 +359,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id, } } if (rseq.is_valid()) { - if (!block_id.is_valid()) { - block_id = rseq; - } else if (block_id.id.seqno > rseq.id.seqno) { + if (!block_id.is_valid() || block_id.id.seqno > rseq.id.seqno) { block_id = rseq; } } @@ -430,12 +445,15 @@ void ArchiveSlice::get_slice(td::uint64 archive_id, td::uint64 offset, td::uint3 promise.set_error(td::Status::Error(ErrorCode::error, "bad archive id")); return; } + before_query(); auto value = static_cast(archive_id >> 32); TRY_RESULT_PROMISE(promise, p, choose_package(value, false)); + promise = begin_async_query(std::move(promise)); td::actor::create_actor("readfile", p->path, offset, limit, 0, std::move(promise)).release(); } void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise) { + before_query(); if (!sliced_mode_) { promise.set_result(archive_id_); } else { @@ -444,62 +462,107 @@ void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise(td::RocksDb::open(db_path).move_as_ok()); +void ArchiveSlice::before_query() { + if (status_ == st_closed) { + LOG(DEBUG) << "Opening archive slice " << db_path_; + kv_ = std::make_unique(td::RocksDb::open(db_path_).move_as_ok()); + std::string value; + auto R2 = kv_->get("status", value); + R2.ensure(); + sliced_mode_ = false; + slice_size_ = 100; - std::string value; - auto R2 = kv_->get("status", value); - R2.ensure(); - - if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) { - if (value == "sliced") { - sliced_mode_ = true; - R2 = kv_->get("slices", value); - R2.ensure(); - auto tot = td::to_integer(value); - R2 = kv_->get("slice_size", value); - R2.ensure(); - slice_size_ = td::to_integer(value); - CHECK(slice_size_ > 0); - for (td::uint32 i = 0; i < tot; i++) { - R2 = kv_->get(PSTRING() << "status." << i, value); + if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) { + if (value == "sliced") { + sliced_mode_ = true; + R2 = kv_->get("slices", value); R2.ensure(); - auto len = td::to_integer(value); - R2 = kv_->get(PSTRING() << "version." << i, value); + auto tot = td::to_integer(value); + R2 = kv_->get("slice_size", value); R2.ensure(); - td::uint32 ver = 0; - if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) { - ver = td::to_integer(value); + slice_size_ = td::to_integer(value); + CHECK(slice_size_ > 0); + for (td::uint32 i = 0; i < tot; i++) { + R2 = kv_->get(PSTRING() << "status." << i, value); + R2.ensure(); + auto len = td::to_integer(value); + R2 = kv_->get(PSTRING() << "version." << i, value); + R2.ensure(); + td::uint32 ver = 0; + if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) { + ver = td::to_integer(value); + } + auto v = archive_id_ + slice_size_ * i; + add_package(v, len, ver); } - auto v = archive_id_ + slice_size_ * i; - add_package(v, len, ver); + } else { + auto len = td::to_integer(value); + add_package(archive_id_, len, 0); } } else { - auto len = td::to_integer(value); - add_package(archive_id_, len, 0); + if (!temp_ && !key_blocks_only_) { + sliced_mode_ = true; + kv_->begin_transaction().ensure(); + kv_->set("status", "sliced").ensure(); + kv_->set("slices", "1").ensure(); + kv_->set("slice_size", td::to_string(slice_size_)).ensure(); + kv_->set("status.0", "0").ensure(); + kv_->set("version.0", td::to_string(default_package_version())).ensure(); + kv_->commit_transaction().ensure(); + add_package(archive_id_, 0, default_package_version()); + } else { + kv_->begin_transaction().ensure(); + kv_->set("status", "0").ensure(); + kv_->commit_transaction().ensure(); + add_package(archive_id_, 0, 0); + } } - } else { - if (!temp_ && !key_blocks_only_) { - sliced_mode_ = true; - kv_->begin_transaction().ensure(); - kv_->set("status", "sliced").ensure(); - kv_->set("slices", "1").ensure(); - kv_->set("slice_size", td::to_string(slice_size_)).ensure(); - kv_->set("status.0", "0").ensure(); - kv_->set("version.0", td::to_string(default_package_version())).ensure(); - kv_->commit_transaction().ensure(); - add_package(archive_id_, 0, default_package_version()); + } + status_ = st_open; + if (!archive_lru_.empty()) { + td::actor::send_closure(archive_lru_, &ArchiveLru::on_query, actor_id(this), p_id_, + packages_.size() + ESTIMATED_DB_OPEN_FILES); + } +} + +void ArchiveSlice::close_files() { + if (status_ == st_open) { + if (active_queries_ == 0) { + do_close(); } else { - kv_->begin_transaction().ensure(); - kv_->set("status", "0").ensure(); - kv_->commit_transaction().ensure(); - add_package(archive_id_, 0, 0); + status_ = st_want_close; } } } +void ArchiveSlice::do_close() { + if (destroyed_) { + return; + } + CHECK(status_ != st_closed && active_queries_ == 0); + LOG(DEBUG) << "Closing archive slice " << db_path_; + status_ = st_closed; + kv_ = {}; + packages_.clear(); +} + +template +td::Promise ArchiveSlice::begin_async_query(td::Promise promise) { + ++active_queries_; + return [SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable { + td::actor::send_closure(SelfId, &ArchiveSlice::end_async_query); + promise.set_result(std::move(R)); + }; +} + +void ArchiveSlice::end_async_query() { + CHECK(active_queries_ > 0); + --active_queries_; + if (active_queries_ == 0 && status_ == st_want_close) { + do_close(); + } +} + void ArchiveSlice::begin_transaction() { if (!async_mode_ || !huge_transaction_started_) { kv_->begin_transaction().ensure(); @@ -521,7 +584,7 @@ void ArchiveSlice::commit_transaction() { void ArchiveSlice::set_async_mode(bool mode, td::Promise promise) { async_mode_ = mode; - if (!async_mode_ && huge_transaction_started_) { + if (!async_mode_ && huge_transaction_started_ && kv_) { kv_->commit_transaction().ensure(); huge_transaction_size_ = 0; huge_transaction_started_ = false; @@ -536,12 +599,16 @@ void ArchiveSlice::set_async_mode(bool mode, td::Promise promise) { } } -ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root) +ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root, + td::actor::ActorId archive_lru) : archive_id_(archive_id) , key_blocks_only_(key_blocks_only) , temp_(temp) , finalized_(finalized) - , db_root_(std::move(db_root)) { + , p_id_(archive_id_, key_blocks_only_, temp_) + , db_root_(std::move(db_root)) + , archive_lru_(std::move(archive_lru)) { + db_path_ = PSTRING() << db_root_ << p_id_.path() << p_id_.name() << ".index"; } td::Result ArchiveSlice::choose_package(BlockSeqno masterchain_seqno, bool force) { @@ -587,7 +654,7 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver if (version >= 1) { pack->truncate(size).ensure(); } - auto writer = td::actor::create_actor("writer", pack); + auto writer = td::actor::create_actor("writer", pack, async_mode_); packages_.emplace_back(std::move(pack), std::move(writer), seqno, path, idx, version); } @@ -611,6 +678,7 @@ void destroy_db(std::string name, td::uint32 attempt, td::Promise prom } // namespace void ArchiveSlice::destroy(td::Promise promise) { + before_query(); td::MultiPromise mp; auto ig = mp.init_guard(); ig.add_promise(std::move(promise)); @@ -763,6 +831,7 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl destroy(std::move(promise)); return; } + before_query(); LOG(INFO) << "TRUNCATE: slice " << archive_id_ << " maxseqno= " << max_masterchain_seqno() << " truncate_upto=" << masterchain_seqno; if (max_masterchain_seqno() <= masterchain_seqno) { @@ -819,7 +888,7 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl pack->writer.reset(); td::unlink(pack->path).ensure(); td::rename(pack->path + ".new", pack->path).ensure(); - pack->writer = td::actor::create_actor("writer", new_package); + pack->writer = td::actor::create_actor("writer", new_package, async_mode_); for (auto idx = pack->idx + 1; idx < packages_.size(); idx++) { td::unlink(packages_[idx].path).ensure(); @@ -831,6 +900,61 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl promise.set_value(td::Unit()); } +static std::tuple to_tuple(const PackageId &id) { + return {id.id, id.temp, id.key}; +} + +void ArchiveLru::on_query(td::actor::ActorId slice, PackageId id, size_t files_count) { + SliceInfo &info = slices_[to_tuple(id)]; + if (info.opened_idx != 0) { + total_files_ -= info.files_count; + lru_.erase(info.opened_idx); + } + info.actor = std::move(slice); + total_files_ += (info.files_count = files_count); + info.opened_idx = current_idx_++; + if (!info.is_permanent) { + lru_.emplace(info.opened_idx, id); + } + enforce_limit(); +} + +void ArchiveLru::set_permanent_slices(std::vector ids) { + for (auto id : permanent_slices_) { + SliceInfo &info = slices_[to_tuple(id)]; + if (!info.is_permanent) { + continue; + } + info.is_permanent = false; + if (info.opened_idx) { + lru_.emplace(info.opened_idx, id); + } + } + permanent_slices_ = std::move(ids); + for (auto id : permanent_slices_) { + SliceInfo &info = slices_[to_tuple(id)]; + if (info.is_permanent) { + continue; + } + info.is_permanent = true; + if (info.opened_idx) { + lru_.erase(info.opened_idx); + } + } + enforce_limit(); +} + +void ArchiveLru::enforce_limit() { + while (total_files_ > max_total_files_ && lru_.size() > 1) { + auto it = lru_.begin(); + auto it2 = slices_.find(to_tuple(it->second)); + lru_.erase(it); + total_files_ -= it2->second.files_count; + td::actor::send_closure(it2->second.actor, &ArchiveSlice::close_files); + it2->second.opened_idx = 0; + } +} + } // namespace validator } // namespace ton diff --git a/validator/db/archive-slice.hpp b/validator/db/archive-slice.hpp index 487115fe..9f775d2e 100644 --- a/validator/db/archive-slice.hpp +++ b/validator/db/archive-slice.hpp @@ -21,6 +21,7 @@ #include "validator/interfaces/db.h" #include "package.hpp" #include "fileref.hpp" +#include namespace ton { @@ -44,7 +45,7 @@ struct PackageId { std::string path() const; std::string name() const; - bool is_empty() { + bool is_empty() const { return id == std::numeric_limits::max(); } static PackageId empty(bool key, bool temp) { @@ -54,26 +55,33 @@ struct PackageId { class PackageWriter : public td::actor::Actor { public: - PackageWriter(std::shared_ptr package) : package_(std::move(package)) { + PackageWriter(std::weak_ptr package, bool async_mode = false) + : package_(std::move(package)), async_mode_(async_mode) { } void append(std::string filename, td::BufferSlice data, td::Promise> promise); void set_async_mode(bool mode, td::Promise promise) { async_mode_ = mode; if (!async_mode_) { - package_->sync(); + auto p = package_.lock(); + if (p) { + p->sync(); + } } promise.set_value(td::Unit()); } private: - std::shared_ptr package_; + std::weak_ptr package_; bool async_mode_ = false; }; +class ArchiveLru; + class ArchiveSlice : public td::actor::Actor { public: - ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root); + ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root, + td::actor::ActorId archive_lru); void get_archive_id(BlockSeqno masterchain_seqno, td::Promise promise); @@ -95,16 +103,23 @@ class ArchiveSlice : public td::actor::Actor { void get_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, td::Promise promise); - void start_up() override; void destroy(td::Promise promise); void truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise promise); - void begin_transaction(); - void commit_transaction(); void set_async_mode(bool mode, td::Promise promise); + void close_files(); + private: - void written_data(BlockHandle handle, td::Promise promise); + void before_query(); + void do_close(); + template + td::Promise begin_async_query(td::Promise promise); + void end_async_query(); + + void begin_transaction(); + void commit_transaction(); + void add_file_cont(size_t idx, FileReference ref_id, td::uint64 offset, td::uint64 size, td::Promise promise); @@ -112,13 +127,14 @@ class ArchiveSlice : public td::actor::Actor { td::BufferSlice get_db_key_lt_desc(ShardIdFull shard); td::BufferSlice get_db_key_lt_el(ShardIdFull shard, td::uint32 idx); td::BufferSlice get_db_key_block_info(BlockIdExt block_id); - td::BufferSlice get_lt_from_db(ShardIdFull shard, td::uint32 idx); td::uint32 archive_id_; bool key_blocks_only_; bool temp_; bool finalized_; + PackageId p_id_; + std::string db_path_; bool destroyed_ = false; bool async_mode_ = false; @@ -127,8 +143,14 @@ class ArchiveSlice : public td::actor::Actor { td::uint32 huge_transaction_size_ = 0; td::uint32 slice_size_{100}; + enum Status { + st_closed, st_open, st_want_close + } status_ = st_closed; + size_t active_queries_ = 0; + std::string db_root_; - std::shared_ptr kv_; + td::actor::ActorId archive_lru_; + std::unique_ptr kv_; struct PackageInfo { PackageInfo(std::shared_ptr package, td::actor::ActorOwn writer, BlockSeqno id, @@ -164,6 +186,32 @@ class ArchiveSlice : public td::actor::Actor { static constexpr td::uint32 default_package_version() { return 1; } + + static const size_t ESTIMATED_DB_OPEN_FILES = 5; +}; + +class ArchiveLru : public td::actor::Actor { + public: + explicit ArchiveLru(size_t max_total_files) : max_total_files_(max_total_files) { + CHECK(max_total_files_ > 0); + } + void on_query(td::actor::ActorId slice, PackageId id, size_t files_count); + void set_permanent_slices(std::vector ids); + private: + size_t current_idx_ = 1; + struct SliceInfo { + td::actor::ActorId actor; + size_t files_count = 0; + size_t opened_idx = 0; // 0 - not opened + bool is_permanent = false; + }; + std::map, SliceInfo> slices_; + std::map lru_; + size_t total_files_ = 0; + size_t max_total_files_ = 0; + std::vector permanent_slices_; + + void enforce_limit(); }; } // namespace validator diff --git a/validator/db/rootdb.cpp b/validator/db/rootdb.cpp index 601b07c1..ff9abae6 100644 --- a/validator/db/rootdb.cpp +++ b/validator/db/rootdb.cpp @@ -400,7 +400,7 @@ void RootDb::start_up() { cell_db_ = td::actor::create_actor("celldb", actor_id(this), root_path_ + "/celldb/", opts_); state_db_ = td::actor::create_actor("statedb", actor_id(this), root_path_ + "/state/"); static_files_db_ = td::actor::create_actor("staticfilesdb", actor_id(this), root_path_ + "/static/"); - archive_db_ = td::actor::create_actor("archive", actor_id(this), root_path_); + archive_db_ = td::actor::create_actor("archive", actor_id(this), root_path_, opts_); } void RootDb::archive(BlockHandle handle, td::Promise promise) { @@ -497,8 +497,8 @@ void RootDb::set_async_mode(bool mode, td::Promise promise) { td::actor::send_closure(archive_db_, &ArchiveManager::set_async_mode, mode, std::move(promise)); } -void RootDb::run_gc(UnixTime ts, UnixTime archive_ttl) { - td::actor::send_closure(archive_db_, &ArchiveManager::run_gc, ts, archive_ttl); +void RootDb::run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) { + td::actor::send_closure(archive_db_, &ArchiveManager::run_gc, mc_ts, gc_ts, archive_ttl); } } // namespace validator diff --git a/validator/db/rootdb.hpp b/validator/db/rootdb.hpp index 598defcb..97b9550b 100644 --- a/validator/db/rootdb.hpp +++ b/validator/db/rootdb.hpp @@ -134,11 +134,10 @@ class RootDb : public Db { td::Promise promise) override; void set_async_mode(bool mode, td::Promise promise) override; - void run_gc(UnixTime ts, UnixTime archive_ttl) override; + void run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) override; private: td::actor::ActorId validator_manager_; - std::string root_path_; td::Ref opts_; diff --git a/validator/interfaces/db.h b/validator/interfaces/db.h index ba4d9dda..84ea2b36 100644 --- a/validator/interfaces/db.h +++ b/validator/interfaces/db.h @@ -119,7 +119,7 @@ class Db : public td::actor::Actor { td::Promise promise) = 0; virtual void set_async_mode(bool mode, td::Promise promise) = 0; - virtual void run_gc(UnixTime ts, UnixTime archive_ttl) = 0; + virtual void run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) = 0; }; } // namespace validator diff --git a/validator/manager.cpp b/validator/manager.cpp index baaa78eb..55891ead 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -2385,9 +2385,9 @@ void ValidatorManagerImpl::state_serializer_update(BlockSeqno seqno) { void ValidatorManagerImpl::alarm() { try_advance_gc_masterchain_block(); alarm_timestamp() = td::Timestamp::in(1.0); - if (gc_masterchain_handle_) { - td::actor::send_closure(db_, &Db::run_gc, gc_masterchain_handle_->unix_time(), - static_cast(opts_->archive_ttl())); + if (last_masterchain_block_handle_ && gc_masterchain_handle_) { + td::actor::send_closure(db_, &Db::run_gc, last_masterchain_block_handle_->unix_time(), + gc_masterchain_handle_->unix_time(), static_cast(opts_->archive_ttl())); } if (log_status_at_.is_in_past()) { if (last_masterchain_block_handle_) { diff --git a/validator/validator-options.hpp b/validator/validator-options.hpp index 3a7e5ba7..3980b6c6 100644 --- a/validator/validator-options.hpp +++ b/validator/validator-options.hpp @@ -117,6 +117,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { td::uint32 get_celldb_compress_depth() const override { return celldb_compress_depth_; } + size_t get_max_open_archive_files() const override { + return max_open_archive_files_; + } void set_zero_block_id(BlockIdExt block_id) override { zero_block_id_ = block_id; @@ -173,6 +176,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { void set_celldb_compress_depth(td::uint32 value) override { celldb_compress_depth_ = value; } + void set_max_open_archive_files(size_t value) override { + max_open_archive_files_ = value; + } ValidatorManagerOptionsImpl *make_copy() const override { return new ValidatorManagerOptionsImpl(*this); @@ -216,6 +222,7 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { BlockSeqno sync_upto_{0}; std::string session_logs_file_; td::uint32 celldb_compress_depth_{0}; + size_t max_open_archive_files_ = 0; }; } // namespace validator diff --git a/validator/validator.h b/validator/validator.h index 2fefb064..d24467a7 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -82,6 +82,7 @@ struct ValidatorManagerOptions : public td::CntObject { virtual BlockSeqno sync_upto() const = 0; virtual std::string get_session_logs_file() const = 0; virtual td::uint32 get_celldb_compress_depth() const = 0; + virtual size_t get_max_open_archive_files() const = 0; virtual void set_zero_block_id(BlockIdExt block_id) = 0; virtual void set_init_block_id(BlockIdExt block_id) = 0; @@ -102,6 +103,7 @@ struct ValidatorManagerOptions : public td::CntObject { virtual void set_sync_upto(BlockSeqno seqno) = 0; virtual void set_session_logs_file(std::string f) = 0; virtual void set_celldb_compress_depth(td::uint32 value) = 0; + virtual void set_max_open_archive_files(size_t value) = 0; static td::Ref create( BlockIdExt zero_block_id, BlockIdExt init_block_id,