diff --git a/validator/db/archive-manager.cpp b/validator/db/archive-manager.cpp index d9bd3f01..8bf322f6 100644 --- a/validator/db/archive-manager.cpp +++ b/validator/db/archive-manager.cpp @@ -216,7 +216,7 @@ void ArchiveManager::get_file_short(FileReference ref_id, td::Promisefile_actor_id(), &ArchiveSlice::get_file, ref_id, std::move(P)); + td::actor::send_closure(f->file_actor_id(), &ArchiveSlice::get_file, nullptr, ref_id, std::move(P)); return; } } @@ -239,7 +239,7 @@ void ArchiveManager::get_key_block_proof(FileReference ref_id, td::Promisefile_actor_id(), &ArchiveSlice::get_file, ref_id, std::move(promise)); + td::actor::send_closure(f->file_actor_id(), &ArchiveSlice::get_file, nullptr, ref_id, std::move(promise)); } else { promise.set_error(td::Status::Error(ErrorCode::notready, "key proof not in db")); } @@ -267,14 +267,15 @@ void ArchiveManager::get_file_short_cont(FileReference ref_id, PackageId idx, td td::actor::send_closure(SelfId, &ArchiveManager::get_file_short_cont, std::move(ref_id), idx, std::move(promise)); } }); - td::actor::send_closure(f->file_actor_id(), &ArchiveSlice::get_file, std::move(ref_id), std::move(P)); + td::actor::send_closure(f->file_actor_id(), &ArchiveSlice::get_file, nullptr, std::move(ref_id), std::move(P)); } void ArchiveManager::get_file(ConstBlockHandle handle, FileReference ref_id, td::Promise promise) { if (handle->moved_to_archive()) { auto f = get_file_desc(handle->id().shard_full(), get_package_id(handle->masterchain_ref_block()), 0, 0, 0, false); if (f) { - td::actor::send_closure(f->file_actor_id(), &ArchiveSlice::get_file, std::move(ref_id), std::move(promise)); + td::actor::send_closure(f->file_actor_id(), &ArchiveSlice::get_file, std::move(handle), std::move(ref_id), + std::move(promise)); return; } } diff --git a/validator/db/archive-slice.cpp b/validator/db/archive-slice.cpp index 3865d25c..1bec048e 100644 --- a/validator/db/archive-slice.cpp +++ b/validator/db/archive-slice.cpp @@ -168,6 +168,8 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd")); return; } + auto &p = choose_package( + handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0); std::string value; auto R = kv_->get(ref_id.hash().to_hex(), value); R.ensure(); @@ -186,7 +188,7 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer std::move(promise)); }); - td::actor::send_closure(writer_, &PackageWriter::append, ref_id.filename(), std::move(data), std::move(P)); + td::actor::send_closure(p.writer, &PackageWriter::append, ref_id.filename(), std::move(data), std::move(P)); } void ArchiveSlice::add_file_cont(FileReference ref_id, td::uint64 offset, td::uint64 size, @@ -246,11 +248,13 @@ void ArchiveSlice::get_temp_handle(BlockIdExt block_id, td::Promise promise) { +void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::Promise promise) { if (destroyed_) { promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd")); return; } + auto &p = choose_package( + handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0); std::string value; auto R = kv_->get(ref_id.hash().to_hex(), value); R.ensure(); @@ -267,7 +271,7 @@ void ArchiveSlice::get_file(FileReference ref_id, td::Promise p promise.set_value(std::move(R.move_as_ok().second)); } }); - td::actor::create_actor("reader", package_, offset, std::move(P)).release(); + td::actor::create_actor("reader", p.package, offset, std::move(P)).release(); } void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id, @@ -426,7 +430,7 @@ void ArchiveSlice::start_up() { << "': " << R.move_as_error(); return; } - package_ = std::make_shared(R.move_as_ok()); + auto pack = std::make_shared(R.move_as_ok()); kv_ = std::make_shared(td::RocksDb::open(prefix_ + ".index").move_as_ok()); std::string value; @@ -435,12 +439,13 @@ void ArchiveSlice::start_up() { if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) { auto len = td::to_integer(value); - package_->truncate(len); + pack->truncate(len); } else { - package_->truncate(0); + pack->truncate(0); } - writer_ = td::actor::create_actor("writer", package_); + auto writer = td::actor::create_actor("writer", pack); + packages_.emplace_back(std::move(pack), std::move(writer), prefix_ + ".pack", 0); } void ArchiveSlice::begin_transaction() { @@ -470,13 +475,26 @@ void ArchiveSlice::set_async_mode(bool mode, td::Promise promise) { huge_transaction_started_ = false; } - td::actor::send_closure(writer_, &PackageWriter::set_async_mode, mode, std::move(promise)); + td::MultiPromise mp; + auto ig = mp.init_guard(); + ig.add_promise(std::move(promise)); + + for (auto &p : packages_) { + td::actor::send_closure(p.writer, &PackageWriter::set_async_mode, mode, std::move(promise)); + } } ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, std::string prefix) : archive_id_(archive_id), key_blocks_only_(key_blocks_only), temp_(temp), prefix_(std::move(prefix)) { } +ArchiveSlice::PackageInfo &ArchiveSlice::choose_package(BlockSeqno masterchain_seqno) { + if (temp_ || key_blocks_only_) { + return packages_[0]; + } + return packages_[0]; +} + namespace { void destroy_db(std::string name, td::uint32 attempt, td::Promise promise) { @@ -502,8 +520,7 @@ void ArchiveSlice::destroy(td::Promise promise) { ig.add_promise(std::move(promise)); destroyed_ = true; - writer_.reset(); - package_ = nullptr; + packages_.clear(); kv_ = nullptr; td::unlink(prefix_ + ".pack").ensure(); diff --git a/validator/db/archive-slice.hpp b/validator/db/archive-slice.hpp index 65f323ce..c0f8f53d 100644 --- a/validator/db/archive-slice.hpp +++ b/validator/db/archive-slice.hpp @@ -58,7 +58,7 @@ class ArchiveSlice : public td::actor::Actor { void add_file(BlockHandle handle, FileReference ref_id, td::BufferSlice data, td::Promise promise); void get_handle(BlockIdExt block_id, td::Promise promise); void get_temp_handle(BlockIdExt block_id, td::Promise promise); - void get_file(FileReference ref_id, td::Promise promise); + void get_file(ConstBlockHandle handle, FileReference ref_id, td::Promise promise); /* from LTDB */ void get_block_by_unix_time(AccountIdPrefixFull account_id, UnixTime ts, td::Promise promise); @@ -99,9 +99,21 @@ class ArchiveSlice : public td::actor::Actor { td::uint32 huge_transaction_size_ = 0; std::string prefix_; - std::shared_ptr package_; std::shared_ptr kv_; - td::actor::ActorOwn writer_; + + struct PackageInfo { + PackageInfo(std::shared_ptr package, td::actor::ActorOwn writer, std::string path, + td::uint32 idx) + : package(std::move(package)), writer(std ::move(writer)), path(std::move(path)), idx(idx) { + } + std::shared_ptr package; + td::actor::ActorOwn writer; + std::string path; + td::uint32 idx; + }; + std::vector packages_; + + PackageInfo &choose_package(BlockSeqno masterchain_seqno); }; } // namespace validator diff --git a/validator/manager.cpp b/validator/manager.cpp index dc75220a..c55f4ce3 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -1445,6 +1445,8 @@ void ValidatorManagerImpl::started(ValidatorManagerInitResult R) { last_key_block_handle_ = std::move(R.last_key_block_handle_); last_known_key_block_handle_ = last_key_block_handle_; + + CHECK(last_masterchain_block_handle_->is_applied()); callback_->new_key_block(last_key_block_handle_); gc_masterchain_handle_ = std::move(R.gc_handle);