1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Limit file descriptors num by adding archive slice lru (#892)

* --max-archive-fd option limits open files in archive manager

* Don't close the latest archives + bugfix

* Delete temp packages early

---------

Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
EmelyanenkoK 2024-02-07 14:56:37 +03:00 committed by GitHub
parent e723213d5c
commit 12c1b1a2e6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 316 additions and 88 deletions

View file

@ -21,7 +21,6 @@
#include "td/actor/MultiPromise.h"
#include "validator/fabric.h"
#include "td/db/RocksDb.h"
#include "ton/ton-io.hpp"
#include "td/utils/port/path.h"
#include "common/delay.h"
#include "files-async.hpp"
@ -32,9 +31,16 @@ namespace validator {
void PackageWriter::append(std::string filename, td::BufferSlice data,
td::Promise<std::pair<td::uint64, td::uint64>> promise) {
auto offset = package_->append(std::move(filename), std::move(data), !async_mode_);
auto size = package_->size();
td::uint64 offset, size;
{
auto p = package_.lock();
if (!p) {
promise.set_error(td::Status::Error("Package is closed"));
return;
}
offset = p->append(std::move(filename), std::move(data), !async_mode_);
size = p->size();
}
promise.set_value(std::pair<td::uint64, td::uint64>{offset, size});
}
@ -44,8 +50,10 @@ class PackageReader : public td::actor::Actor {
td::Promise<std::pair<std::string, td::BufferSlice>> promise)
: package_(std::move(package)), offset_(offset), promise_(std::move(promise)) {
}
void start_up() {
promise_.set_result(package_->read(offset_));
void start_up() override {
auto result = package_->read(offset_);
package_ = {};
promise_.set_result(std::move(result));
stop();
}
@ -64,6 +72,7 @@ void ArchiveSlice::add_handle(BlockHandle handle, td::Promise<td::Unit> promise)
update_handle(std::move(handle), std::move(promise));
return;
}
before_query();
CHECK(!key_blocks_only_);
CHECK(!temp_);
CHECK(handle->inited_unix_time());
@ -146,6 +155,7 @@ void ArchiveSlice::update_handle(BlockHandle handle, td::Promise<td::Unit> promi
promise.set_value(td::Unit());
return;
}
before_query();
CHECK(!key_blocks_only_);
begin_transaction();
@ -168,6 +178,7 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
return;
}
before_query();
TRY_RESULT_PROMISE(
promise, p,
choose_package(
@ -179,6 +190,7 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer
promise.set_value(td::Unit());
return;
}
promise = begin_async_query(std::move(promise));
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), idx = p->idx, ref_id, promise = std::move(promise)](
td::Result<std::pair<td::uint64, td::uint64>> R) mutable {
if (R.is_error()) {
@ -217,6 +229,7 @@ void ArchiveSlice::get_handle(BlockIdExt block_id, td::Promise<BlockHandle> prom
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
return;
}
before_query();
CHECK(!key_blocks_only_);
std::string value;
auto R = kv_->get(get_db_key_block_info(block_id), value);
@ -239,6 +252,7 @@ void ArchiveSlice::get_temp_handle(BlockIdExt block_id, td::Promise<ConstBlockHa
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
return;
}
before_query();
CHECK(!key_blocks_only_);
std::string value;
auto R = kv_->get(get_db_key_block_info(block_id), value);
@ -261,6 +275,7 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
return;
}
before_query();
std::string value;
auto R = kv_->get(ref_id.hash().to_hex(), value);
R.ensure();
@ -273,6 +288,7 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P
promise, p,
choose_package(
handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0, false));
promise = begin_async_query(std::move(promise));
auto P = td::PromiseCreator::lambda(
[promise = std::move(promise)](td::Result<std::pair<std::string, td::BufferSlice>> R) mutable {
if (R.is_error()) {
@ -292,6 +308,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id,
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
return;
}
before_query();
bool f = false;
BlockIdExt block_id;
td::uint32 ls = 0;
@ -312,7 +329,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id,
auto G = fetch_tl_object<ton_api::db_lt_desc_value>(value, true);
G.ensure();
auto g = G.move_as_ok();
if (compare_desc(*g.get()) > 0) {
if (compare_desc(*g) > 0) {
continue;
}
td::uint32 l = g->first_idx_ - 1;
@ -328,7 +345,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id,
auto E = fetch_tl_object<ton_api::db_lt_el_value>(td::BufferSlice{value}, true);
E.ensure();
auto e = E.move_as_ok();
int cmp_val = compare(*e.get());
int cmp_val = compare(*e);
if (cmp_val < 0) {
rseq = create_block_id(e->id_);
@ -342,9 +359,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id,
}
}
if (rseq.is_valid()) {
if (!block_id.is_valid()) {
block_id = rseq;
} else if (block_id.id.seqno > rseq.id.seqno) {
if (!block_id.is_valid() || block_id.id.seqno > rseq.id.seqno) {
block_id = rseq;
}
}
@ -430,12 +445,15 @@ void ArchiveSlice::get_slice(td::uint64 archive_id, td::uint64 offset, td::uint3
promise.set_error(td::Status::Error(ErrorCode::error, "bad archive id"));
return;
}
before_query();
auto value = static_cast<td::uint32>(archive_id >> 32);
TRY_RESULT_PROMISE(promise, p, choose_package(value, false));
promise = begin_async_query(std::move(promise));
td::actor::create_actor<db::ReadFile>("readfile", p->path, offset, limit, 0, std::move(promise)).release();
}
void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::uint64> promise) {
before_query();
if (!sliced_mode_) {
promise.set_result(archive_id_);
} else {
@ -444,62 +462,107 @@ void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::
}
}
void ArchiveSlice::start_up() {
PackageId p_id{archive_id_, key_blocks_only_, temp_};
std::string db_path = PSTRING() << db_root_ << p_id.path() << p_id.name() << ".index";
kv_ = std::make_shared<td::RocksDb>(td::RocksDb::open(db_path).move_as_ok());
void ArchiveSlice::before_query() {
if (status_ == st_closed) {
LOG(DEBUG) << "Opening archive slice " << db_path_;
kv_ = std::make_unique<td::RocksDb>(td::RocksDb::open(db_path_).move_as_ok());
std::string value;
auto R2 = kv_->get("status", value);
R2.ensure();
sliced_mode_ = false;
slice_size_ = 100;
std::string value;
auto R2 = kv_->get("status", value);
R2.ensure();
if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) {
if (value == "sliced") {
sliced_mode_ = true;
R2 = kv_->get("slices", value);
R2.ensure();
auto tot = td::to_integer<td::uint32>(value);
R2 = kv_->get("slice_size", value);
R2.ensure();
slice_size_ = td::to_integer<td::uint32>(value);
CHECK(slice_size_ > 0);
for (td::uint32 i = 0; i < tot; i++) {
R2 = kv_->get(PSTRING() << "status." << i, value);
if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) {
if (value == "sliced") {
sliced_mode_ = true;
R2 = kv_->get("slices", value);
R2.ensure();
auto len = td::to_integer<td::uint64>(value);
R2 = kv_->get(PSTRING() << "version." << i, value);
auto tot = td::to_integer<td::uint32>(value);
R2 = kv_->get("slice_size", value);
R2.ensure();
td::uint32 ver = 0;
if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) {
ver = td::to_integer<td::uint32>(value);
slice_size_ = td::to_integer<td::uint32>(value);
CHECK(slice_size_ > 0);
for (td::uint32 i = 0; i < tot; i++) {
R2 = kv_->get(PSTRING() << "status." << i, value);
R2.ensure();
auto len = td::to_integer<td::uint64>(value);
R2 = kv_->get(PSTRING() << "version." << i, value);
R2.ensure();
td::uint32 ver = 0;
if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) {
ver = td::to_integer<td::uint32>(value);
}
auto v = archive_id_ + slice_size_ * i;
add_package(v, len, ver);
}
auto v = archive_id_ + slice_size_ * i;
add_package(v, len, ver);
} else {
auto len = td::to_integer<td::uint64>(value);
add_package(archive_id_, len, 0);
}
} else {
auto len = td::to_integer<td::uint64>(value);
add_package(archive_id_, len, 0);
if (!temp_ && !key_blocks_only_) {
sliced_mode_ = true;
kv_->begin_transaction().ensure();
kv_->set("status", "sliced").ensure();
kv_->set("slices", "1").ensure();
kv_->set("slice_size", td::to_string(slice_size_)).ensure();
kv_->set("status.0", "0").ensure();
kv_->set("version.0", td::to_string(default_package_version())).ensure();
kv_->commit_transaction().ensure();
add_package(archive_id_, 0, default_package_version());
} else {
kv_->begin_transaction().ensure();
kv_->set("status", "0").ensure();
kv_->commit_transaction().ensure();
add_package(archive_id_, 0, 0);
}
}
} else {
if (!temp_ && !key_blocks_only_) {
sliced_mode_ = true;
kv_->begin_transaction().ensure();
kv_->set("status", "sliced").ensure();
kv_->set("slices", "1").ensure();
kv_->set("slice_size", td::to_string(slice_size_)).ensure();
kv_->set("status.0", "0").ensure();
kv_->set("version.0", td::to_string(default_package_version())).ensure();
kv_->commit_transaction().ensure();
add_package(archive_id_, 0, default_package_version());
}
status_ = st_open;
if (!archive_lru_.empty()) {
td::actor::send_closure(archive_lru_, &ArchiveLru::on_query, actor_id(this), p_id_,
packages_.size() + ESTIMATED_DB_OPEN_FILES);
}
}
void ArchiveSlice::close_files() {
if (status_ == st_open) {
if (active_queries_ == 0) {
do_close();
} else {
kv_->begin_transaction().ensure();
kv_->set("status", "0").ensure();
kv_->commit_transaction().ensure();
add_package(archive_id_, 0, 0);
status_ = st_want_close;
}
}
}
void ArchiveSlice::do_close() {
if (destroyed_) {
return;
}
CHECK(status_ != st_closed && active_queries_ == 0);
LOG(DEBUG) << "Closing archive slice " << db_path_;
status_ = st_closed;
kv_ = {};
packages_.clear();
}
template<typename T>
td::Promise<T> ArchiveSlice::begin_async_query(td::Promise<T> promise) {
++active_queries_;
return [SelfId = actor_id(this), promise = std::move(promise)](td::Result<T> R) mutable {
td::actor::send_closure(SelfId, &ArchiveSlice::end_async_query);
promise.set_result(std::move(R));
};
}
void ArchiveSlice::end_async_query() {
CHECK(active_queries_ > 0);
--active_queries_;
if (active_queries_ == 0 && status_ == st_want_close) {
do_close();
}
}
void ArchiveSlice::begin_transaction() {
if (!async_mode_ || !huge_transaction_started_) {
kv_->begin_transaction().ensure();
@ -521,7 +584,7 @@ void ArchiveSlice::commit_transaction() {
void ArchiveSlice::set_async_mode(bool mode, td::Promise<td::Unit> promise) {
async_mode_ = mode;
if (!async_mode_ && huge_transaction_started_) {
if (!async_mode_ && huge_transaction_started_ && kv_) {
kv_->commit_transaction().ensure();
huge_transaction_size_ = 0;
huge_transaction_started_ = false;
@ -536,12 +599,16 @@ void ArchiveSlice::set_async_mode(bool mode, td::Promise<td::Unit> promise) {
}
}
ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root)
ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root,
td::actor::ActorId<ArchiveLru> archive_lru)
: archive_id_(archive_id)
, key_blocks_only_(key_blocks_only)
, temp_(temp)
, finalized_(finalized)
, db_root_(std::move(db_root)) {
, p_id_(archive_id_, key_blocks_only_, temp_)
, db_root_(std::move(db_root))
, archive_lru_(std::move(archive_lru)) {
db_path_ = PSTRING() << db_root_ << p_id_.path() << p_id_.name() << ".index";
}
td::Result<ArchiveSlice::PackageInfo *> ArchiveSlice::choose_package(BlockSeqno masterchain_seqno, bool force) {
@ -587,7 +654,7 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver
if (version >= 1) {
pack->truncate(size).ensure();
}
auto writer = td::actor::create_actor<PackageWriter>("writer", pack);
auto writer = td::actor::create_actor<PackageWriter>("writer", pack, async_mode_);
packages_.emplace_back(std::move(pack), std::move(writer), seqno, path, idx, version);
}
@ -611,6 +678,7 @@ void destroy_db(std::string name, td::uint32 attempt, td::Promise<td::Unit> prom
} // namespace
void ArchiveSlice::destroy(td::Promise<td::Unit> promise) {
before_query();
td::MultiPromise mp;
auto ig = mp.init_guard();
ig.add_promise(std::move(promise));
@ -763,6 +831,7 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl
destroy(std::move(promise));
return;
}
before_query();
LOG(INFO) << "TRUNCATE: slice " << archive_id_ << " maxseqno= " << max_masterchain_seqno()
<< " truncate_upto=" << masterchain_seqno;
if (max_masterchain_seqno() <= masterchain_seqno) {
@ -819,7 +888,7 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl
pack->writer.reset();
td::unlink(pack->path).ensure();
td::rename(pack->path + ".new", pack->path).ensure();
pack->writer = td::actor::create_actor<PackageWriter>("writer", new_package);
pack->writer = td::actor::create_actor<PackageWriter>("writer", new_package, async_mode_);
for (auto idx = pack->idx + 1; idx < packages_.size(); idx++) {
td::unlink(packages_[idx].path).ensure();
@ -831,6 +900,61 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl
promise.set_value(td::Unit());
}
static std::tuple<td::uint32, bool, bool> to_tuple(const PackageId &id) {
return {id.id, id.temp, id.key};
}
void ArchiveLru::on_query(td::actor::ActorId<ArchiveSlice> slice, PackageId id, size_t files_count) {
SliceInfo &info = slices_[to_tuple(id)];
if (info.opened_idx != 0) {
total_files_ -= info.files_count;
lru_.erase(info.opened_idx);
}
info.actor = std::move(slice);
total_files_ += (info.files_count = files_count);
info.opened_idx = current_idx_++;
if (!info.is_permanent) {
lru_.emplace(info.opened_idx, id);
}
enforce_limit();
}
void ArchiveLru::set_permanent_slices(std::vector<PackageId> ids) {
for (auto id : permanent_slices_) {
SliceInfo &info = slices_[to_tuple(id)];
if (!info.is_permanent) {
continue;
}
info.is_permanent = false;
if (info.opened_idx) {
lru_.emplace(info.opened_idx, id);
}
}
permanent_slices_ = std::move(ids);
for (auto id : permanent_slices_) {
SliceInfo &info = slices_[to_tuple(id)];
if (info.is_permanent) {
continue;
}
info.is_permanent = true;
if (info.opened_idx) {
lru_.erase(info.opened_idx);
}
}
enforce_limit();
}
void ArchiveLru::enforce_limit() {
while (total_files_ > max_total_files_ && lru_.size() > 1) {
auto it = lru_.begin();
auto it2 = slices_.find(to_tuple(it->second));
lru_.erase(it);
total_files_ -= it2->second.files_count;
td::actor::send_closure(it2->second.actor, &ArchiveSlice::close_files);
it2->second.opened_idx = 0;
}
}
} // namespace validator
} // namespace ton