mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
New archive format and importing archive slices
This commit is contained in:
parent
eb4c876f22
commit
b8999be2c0
29 changed files with 606 additions and 266 deletions
|
@ -39,7 +39,7 @@ class PackageStatistics {
|
|||
void record_close(uint64_t count = 1) {
|
||||
close_count.fetch_add(count, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
|
||||
void record_read(double time, uint64_t bytes) {
|
||||
read_bytes.fetch_add(bytes, std::memory_order_relaxed);
|
||||
std::lock_guard guard(read_mutex);
|
||||
|
@ -56,10 +56,10 @@ class PackageStatistics {
|
|||
std::stringstream ss;
|
||||
ss.setf(std::ios::fixed);
|
||||
ss.precision(6);
|
||||
|
||||
|
||||
ss << "ton.pack.open COUNT : " << open_count.exchange(0, std::memory_order_relaxed) << "\n";
|
||||
ss << "ton.pack.close COUNT : " << close_count.exchange(0, std::memory_order_relaxed) << "\n";
|
||||
|
||||
|
||||
ss << "ton.pack.read.bytes COUNT : " << read_bytes.exchange(0, std::memory_order_relaxed) << "\n";
|
||||
ss << "ton.pack.write.bytes COUNT : " << write_bytes.exchange(0, std::memory_order_relaxed) << "\n";
|
||||
|
||||
|
@ -118,7 +118,7 @@ void PackageWriter::append(std::string filename, td::BufferSlice data,
|
|||
return;
|
||||
}
|
||||
start = td::Timestamp::now();
|
||||
offset = p->append(std::move(filename), std::move(data), !async_mode_);
|
||||
offset = p->append(std::move(filename), std::move(data), !async_mode_);
|
||||
end = td::Timestamp::now();
|
||||
size = p->size();
|
||||
}
|
||||
|
@ -152,6 +152,21 @@ class PackageReader : public td::actor::Actor {
|
|||
std::shared_ptr<PackageStatistics> statistics_;
|
||||
};
|
||||
|
||||
static std::string get_package_file_name(PackageId p_id, ShardIdFull shard_prefix) {
|
||||
td::StringBuilder sb;
|
||||
sb << p_id.name();
|
||||
if (!shard_prefix.is_masterchain()) {
|
||||
sb << ".";
|
||||
sb << shard_prefix.workchain << ":" << shard_to_str(shard_prefix.shard);
|
||||
}
|
||||
sb << ".pack";
|
||||
return sb.as_cslice().str();
|
||||
}
|
||||
|
||||
static std::string package_info_to_str(BlockSeqno seqno, ShardIdFull shard_prefix) {
|
||||
return PSTRING() << seqno << "." << shard_prefix.workchain << ":" << shard_to_str(shard_prefix.shard);
|
||||
}
|
||||
|
||||
void ArchiveSlice::add_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
|
||||
if (destroyed_) {
|
||||
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
|
||||
|
@ -271,7 +286,8 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer
|
|||
TRY_RESULT_PROMISE(
|
||||
promise, p,
|
||||
choose_package(
|
||||
handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0, true));
|
||||
handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0,
|
||||
handle ? handle->id().shard_full() : ShardIdFull{masterchainId}, true));
|
||||
std::string value;
|
||||
auto R = kv_->get(ref_id.hash().to_hex(), value);
|
||||
R.ensure();
|
||||
|
@ -376,7 +392,8 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P
|
|||
TRY_RESULT_PROMISE(
|
||||
promise, p,
|
||||
choose_package(
|
||||
handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0, false));
|
||||
handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0,
|
||||
handle ? handle->id().shard_full() : ShardIdFull{masterchainId}, false));
|
||||
promise = begin_async_query(std::move(promise));
|
||||
auto P = td::PromiseCreator::lambda(
|
||||
[promise = std::move(promise)](td::Result<std::pair<std::string, td::BufferSlice>> R) mutable {
|
||||
|
@ -536,18 +553,32 @@ void ArchiveSlice::get_slice(td::uint64 archive_id, td::uint64 offset, td::uint3
|
|||
}
|
||||
before_query();
|
||||
auto value = static_cast<td::uint32>(archive_id >> 32);
|
||||
TRY_RESULT_PROMISE(promise, p, choose_package(value, false));
|
||||
PackageInfo *p;
|
||||
if (shard_split_depth_ == 0) {
|
||||
TRY_RESULT_PROMISE_ASSIGN(promise, p, choose_package(value, ShardIdFull{masterchainId}, false));
|
||||
} else {
|
||||
if (value >= packages_.size()) {
|
||||
promise.set_error(td::Status::Error(ErrorCode::notready, "no such package"));
|
||||
return;
|
||||
}
|
||||
p = &packages_[value];
|
||||
}
|
||||
promise = begin_async_query(std::move(promise));
|
||||
td::actor::create_actor<db::ReadFile>("readfile", p->path, offset, limit, 0, std::move(promise)).release();
|
||||
}
|
||||
|
||||
void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::uint64> promise) {
|
||||
void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix,
|
||||
td::Promise<td::uint64> promise) {
|
||||
before_query();
|
||||
if (!sliced_mode_) {
|
||||
promise.set_result(archive_id_);
|
||||
} else {
|
||||
TRY_RESULT_PROMISE(promise, p, choose_package(masterchain_seqno, false));
|
||||
promise.set_result(p->id * (1ull << 32) + archive_id_);
|
||||
TRY_RESULT_PROMISE(promise, p, choose_package(masterchain_seqno, shard_prefix, false));
|
||||
if (shard_split_depth_ == 0) {
|
||||
promise.set_result(p->seqno * (1ull << 32) + archive_id_);
|
||||
} else {
|
||||
promise.set_result(p->idx * (1ull << 32) + archive_id_);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -573,9 +604,18 @@ void ArchiveSlice::before_query() {
|
|||
R2.ensure();
|
||||
slice_size_ = td::to_integer<td::uint32>(value);
|
||||
CHECK(slice_size_ > 0);
|
||||
R2 = kv_->get("shard_split_depth", value);
|
||||
R2.ensure();
|
||||
if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) {
|
||||
shard_split_depth_ = td::to_integer<td::uint32>(value);
|
||||
CHECK(shard_split_depth_ <= 60);
|
||||
} else {
|
||||
shard_split_depth_ = 0;
|
||||
}
|
||||
for (td::uint32 i = 0; i < tot; i++) {
|
||||
R2 = kv_->get(PSTRING() << "status." << i, value);
|
||||
R2.ensure();
|
||||
CHECK(R2.move_as_ok() == td::KeyValue::GetStatus::Ok);
|
||||
auto len = td::to_integer<td::uint64>(value);
|
||||
R2 = kv_->get(PSTRING() << "version." << i, value);
|
||||
R2.ensure();
|
||||
|
@ -583,12 +623,24 @@ void ArchiveSlice::before_query() {
|
|||
if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) {
|
||||
ver = td::to_integer<td::uint32>(value);
|
||||
}
|
||||
auto v = archive_id_ + slice_size_ * i;
|
||||
add_package(v, len, ver);
|
||||
td::uint32 seqno;
|
||||
ShardIdFull shard_prefix;
|
||||
if (shard_split_depth_ == 0) {
|
||||
seqno = archive_id_ + slice_size_ * i;
|
||||
shard_prefix = ShardIdFull{masterchainId};
|
||||
} else {
|
||||
R2 = kv_->get(PSTRING() << "info." << i, value);
|
||||
R2.ensure();
|
||||
CHECK(R2.move_as_ok() == td::KeyValue::GetStatus::Ok);
|
||||
unsigned long long shard;
|
||||
CHECK(sscanf(value.c_str(), "%u.%d:%016llx", &seqno, &shard_prefix.workchain, &shard) == 3);
|
||||
shard_prefix.shard = shard;
|
||||
}
|
||||
add_package(seqno, shard_prefix, len, ver);
|
||||
}
|
||||
} else {
|
||||
auto len = td::to_integer<td::uint64>(value);
|
||||
add_package(archive_id_, len, 0);
|
||||
add_package(archive_id_, ShardIdFull{masterchainId}, len, 0);
|
||||
}
|
||||
} else {
|
||||
if (!temp_ && !key_blocks_only_) {
|
||||
|
@ -599,13 +651,17 @@ void ArchiveSlice::before_query() {
|
|||
kv_->set("slice_size", td::to_string(slice_size_)).ensure();
|
||||
kv_->set("status.0", "0").ensure();
|
||||
kv_->set("version.0", td::to_string(default_package_version())).ensure();
|
||||
if (shard_split_depth_ > 0) {
|
||||
kv_->set("info.0", package_info_to_str(archive_id_, ShardIdFull{masterchainId})).ensure();
|
||||
kv_->set("shard_split_depth", td::to_string(shard_split_depth_)).ensure();
|
||||
}
|
||||
kv_->commit_transaction().ensure();
|
||||
add_package(archive_id_, 0, default_package_version());
|
||||
add_package(archive_id_, ShardIdFull{masterchainId}, 0, default_package_version());
|
||||
} else {
|
||||
kv_->begin_transaction().ensure();
|
||||
kv_->set("status", "0").ensure();
|
||||
kv_->commit_transaction().ensure();
|
||||
add_package(archive_id_, 0, 0);
|
||||
add_package(archive_id_, ShardIdFull{masterchainId}, 0, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -642,6 +698,7 @@ void ArchiveSlice::do_close() {
|
|||
statistics_.pack_statistics->record_close(packages_.size());
|
||||
}
|
||||
packages_.clear();
|
||||
id_to_package_.clear();
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
|
@ -697,48 +754,61 @@ void ArchiveSlice::set_async_mode(bool mode, td::Promise<td::Unit> promise) {
|
|||
}
|
||||
}
|
||||
|
||||
ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root,
|
||||
ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized,
|
||||
td::uint32 shard_split_depth, std::string db_root,
|
||||
td::actor::ActorId<ArchiveLru> archive_lru, DbStatistics statistics)
|
||||
: archive_id_(archive_id)
|
||||
, key_blocks_only_(key_blocks_only)
|
||||
, temp_(temp)
|
||||
, finalized_(finalized)
|
||||
, p_id_(archive_id_, key_blocks_only_, temp_)
|
||||
, shard_split_depth_(temp || key_blocks_only ? 0 : shard_split_depth)
|
||||
, db_root_(std::move(db_root))
|
||||
, archive_lru_(std::move(archive_lru))
|
||||
, statistics_(statistics) {
|
||||
db_path_ = PSTRING() << db_root_ << p_id_.path() << p_id_.name() << ".index";
|
||||
}
|
||||
|
||||
td::Result<ArchiveSlice::PackageInfo *> ArchiveSlice::choose_package(BlockSeqno masterchain_seqno, bool force) {
|
||||
td::Result<ArchiveSlice::PackageInfo *> ArchiveSlice::choose_package(BlockSeqno masterchain_seqno,
|
||||
ShardIdFull shard_prefix, bool force) {
|
||||
if (temp_ || key_blocks_only_ || !sliced_mode_) {
|
||||
return &packages_[0];
|
||||
}
|
||||
if (masterchain_seqno < archive_id_) {
|
||||
return td::Status::Error(ErrorCode::notready, "too small masterchain seqno");
|
||||
}
|
||||
auto v = (masterchain_seqno - archive_id_) / slice_size_;
|
||||
if (v >= packages_.size()) {
|
||||
masterchain_seqno -= (masterchain_seqno - archive_id_) % slice_size_;
|
||||
CHECK((masterchain_seqno - archive_id_) % slice_size_ == 0);
|
||||
if (shard_split_depth_ == 0) {
|
||||
shard_prefix = ShardIdFull{masterchainId};
|
||||
} else if (!shard_prefix.is_masterchain()) {
|
||||
shard_prefix.shard |= 1; // In case length is < split depth
|
||||
shard_prefix = ton::shard_prefix(shard_prefix, shard_split_depth_);
|
||||
}
|
||||
auto it = id_to_package_.find({masterchain_seqno, shard_prefix});
|
||||
if (it == id_to_package_.end()) {
|
||||
if (!force) {
|
||||
return td::Status::Error(ErrorCode::notready, "too big masterchain seqno");
|
||||
return td::Status::Error(ErrorCode::notready, "no such package");
|
||||
}
|
||||
CHECK(v == packages_.size());
|
||||
begin_transaction();
|
||||
size_t v = packages_.size();
|
||||
kv_->set("slices", td::to_string(v + 1)).ensure();
|
||||
kv_->set(PSTRING() << "status." << v, "0").ensure();
|
||||
kv_->set(PSTRING() << "version." << v, td::to_string(default_package_version())).ensure();
|
||||
if (shard_split_depth_ > 0) {
|
||||
kv_->set(PSTRING() << "info." << v, package_info_to_str(masterchain_seqno, shard_prefix)).ensure();
|
||||
}
|
||||
commit_transaction();
|
||||
CHECK((masterchain_seqno - archive_id_) % slice_size_ == 0);
|
||||
add_package(masterchain_seqno, 0, default_package_version());
|
||||
add_package(masterchain_seqno, shard_prefix, 0, default_package_version());
|
||||
return &packages_[v];
|
||||
} else {
|
||||
return &packages_[v];
|
||||
return &packages_[it->second];
|
||||
}
|
||||
}
|
||||
|
||||
void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 version) {
|
||||
void ArchiveSlice::add_package(td::uint32 seqno, ShardIdFull shard_prefix, td::uint64 size, td::uint32 version) {
|
||||
PackageId p_id{seqno, key_blocks_only_, temp_};
|
||||
std::string path = PSTRING() << db_root_ << p_id.path() << p_id.name() << ".pack";
|
||||
std::string path = PSTRING() << db_root_ << p_id.path() << get_package_file_name(p_id, shard_prefix);
|
||||
auto R = Package::open(path, false, true);
|
||||
if (R.is_error()) {
|
||||
LOG(FATAL) << "failed to open/create archive '" << path << "': " << R.move_as_error();
|
||||
|
@ -748,8 +818,9 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver
|
|||
statistics_.pack_statistics->record_open();
|
||||
}
|
||||
auto idx = td::narrow_cast<td::uint32>(packages_.size());
|
||||
id_to_package_[{seqno, shard_prefix}] = idx;
|
||||
if (finalized_) {
|
||||
packages_.emplace_back(nullptr, td::actor::ActorOwn<PackageWriter>(), seqno, path, idx, version);
|
||||
packages_.emplace_back(nullptr, td::actor::ActorOwn<PackageWriter>(), seqno, shard_prefix, path, idx, version);
|
||||
return;
|
||||
}
|
||||
auto pack = std::make_shared<Package>(R.move_as_ok());
|
||||
|
@ -757,7 +828,7 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver
|
|||
pack->truncate(size).ensure();
|
||||
}
|
||||
auto writer = td::actor::create_actor<PackageWriter>("writer", pack, async_mode_, statistics_.pack_statistics);
|
||||
packages_.emplace_back(std::move(pack), std::move(writer), seqno, path, idx, version);
|
||||
packages_.emplace_back(std::move(pack), std::move(writer), seqno, shard_prefix, path, idx, version);
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
@ -793,6 +864,7 @@ void ArchiveSlice::destroy(td::Promise<td::Unit> promise) {
|
|||
statistics_.pack_statistics->record_close(packages_.size());
|
||||
}
|
||||
packages_.clear();
|
||||
id_to_package_.clear();
|
||||
kv_ = nullptr;
|
||||
|
||||
PackageId p_id{archive_id_, key_blocks_only_, temp_};
|
||||
|
@ -866,7 +938,7 @@ void ArchiveSlice::move_handle(ConstBlockHandle handle, Package *old_pack, Packa
|
|||
move_file(fileref::Block{handle->id()}, old_pack, pack);
|
||||
}
|
||||
|
||||
bool ArchiveSlice::truncate_block(BlockSeqno masterchain_seqno, BlockIdExt block_id, td::uint32 cutoff_idx,
|
||||
bool ArchiveSlice::truncate_block(BlockSeqno masterchain_seqno, BlockIdExt block_id, td::uint32 cutoff_seqno,
|
||||
Package *pack) {
|
||||
std::string value;
|
||||
auto R = kv_->get(get_db_key_block_info(block_id), value);
|
||||
|
@ -881,18 +953,18 @@ bool ArchiveSlice::truncate_block(BlockSeqno masterchain_seqno, BlockIdExt block
|
|||
return false;
|
||||
}
|
||||
|
||||
auto S = choose_package(seqno, false);
|
||||
auto S = choose_package(seqno, block_id.shard_full(), false);
|
||||
S.ensure();
|
||||
auto p = S.move_as_ok();
|
||||
CHECK(p->idx <= cutoff_idx);
|
||||
if (p->idx == cutoff_idx) {
|
||||
CHECK(p->seqno <= cutoff_seqno);
|
||||
if (p->seqno == cutoff_seqno) {
|
||||
move_handle(std::move(handle), p->package.get(), pack);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void ArchiveSlice::truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shard, td::uint32 cutoff_idx,
|
||||
void ArchiveSlice::truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shard, td::uint32 cutoff_seqno,
|
||||
Package *pack) {
|
||||
auto key = get_db_key_lt_desc(shard);
|
||||
std::string value;
|
||||
|
@ -918,7 +990,7 @@ void ArchiveSlice::truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shar
|
|||
E.ensure();
|
||||
auto e = E.move_as_ok();
|
||||
|
||||
if (truncate_block(masterchain_seqno, create_block_id(e->id_), cutoff_idx, pack)) {
|
||||
if (truncate_block(masterchain_seqno, create_block_id(e->id_), cutoff_seqno, pack)) {
|
||||
CHECK(new_last_idx == i);
|
||||
new_last_idx = i + 1;
|
||||
}
|
||||
|
@ -930,7 +1002,7 @@ void ArchiveSlice::truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shar
|
|||
}
|
||||
}
|
||||
|
||||
void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise<td::Unit> promise) {
|
||||
void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle, td::Promise<td::Unit> promise) {
|
||||
if (temp_ || archive_id_ > masterchain_seqno) {
|
||||
destroy(std::move(promise));
|
||||
return;
|
||||
|
@ -943,15 +1015,8 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl
|
|||
return;
|
||||
}
|
||||
|
||||
auto cutoff = choose_package(masterchain_seqno, false);
|
||||
cutoff.ensure();
|
||||
auto pack = cutoff.move_as_ok();
|
||||
CHECK(pack);
|
||||
|
||||
auto pack_r = Package::open(pack->path + ".new", false, true);
|
||||
pack_r.ensure();
|
||||
auto new_package = std::make_shared<Package>(pack_r.move_as_ok());
|
||||
new_package->truncate(0).ensure();
|
||||
std::map<ShardIdFull, PackageInfo*> old_packages;
|
||||
std::map<ShardIdFull, std::shared_ptr<Package>> new_packages;
|
||||
|
||||
std::string value;
|
||||
auto status_key = create_serialize_tl_object<ton_api::db_lt_status_key>();
|
||||
|
@ -972,38 +1037,71 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl
|
|||
auto G = fetch_tl_object<ton_api::db_lt_shard_value>(value, true);
|
||||
G.ensure();
|
||||
auto g = G.move_as_ok();
|
||||
ShardIdFull shard{g->workchain_, static_cast<td::uint64>(g->shard_)};
|
||||
|
||||
truncate_shard(masterchain_seqno, ShardIdFull{g->workchain_, static_cast<td::uint64>(g->shard_)}, pack->idx,
|
||||
new_package.get());
|
||||
auto package_r = choose_package(masterchain_seqno, shard, false);
|
||||
if (package_r.is_error()) {
|
||||
continue;
|
||||
}
|
||||
auto package = package_r.move_as_ok();
|
||||
CHECK(package);
|
||||
if (!old_packages.count(package->shard_prefix)) {
|
||||
old_packages[package->shard_prefix] = package;
|
||||
auto new_package_r = Package::open(package->path + ".new", false, true);
|
||||
new_package_r.ensure();
|
||||
auto new_package = std::make_shared<Package>(new_package_r.move_as_ok());
|
||||
new_package->truncate(0).ensure();
|
||||
new_packages[package->shard_prefix] = std::move(new_package);
|
||||
}
|
||||
truncate_shard(masterchain_seqno, shard, package->seqno, new_packages[package->shard_prefix].get());
|
||||
}
|
||||
|
||||
for (auto& [shard_prefix, package] : old_packages) {
|
||||
auto new_package = new_packages[shard_prefix];
|
||||
CHECK(new_package);
|
||||
package->package = new_package;
|
||||
package->writer.reset();
|
||||
td::unlink(package->path).ensure();
|
||||
td::rename(package->path + ".new", package->path).ensure();
|
||||
package->writer = td::actor::create_actor<PackageWriter>("writer", new_package, async_mode_);
|
||||
}
|
||||
|
||||
std::vector<PackageInfo> new_packages_info;
|
||||
|
||||
if (!sliced_mode_) {
|
||||
kv_->set("status", td::to_string(new_package->size())).ensure();
|
||||
kv_->set("status", td::to_string(packages_.at(0).package->size())).ensure();
|
||||
} else {
|
||||
kv_->set(PSTRING() << "status." << pack->idx, td::to_string(new_package->size())).ensure();
|
||||
for (size_t i = pack->idx + 1; i < packages_.size(); i++) {
|
||||
for (PackageInfo &package : packages_) {
|
||||
if (package.seqno <= masterchain_seqno) {
|
||||
new_packages_info.push_back(std::move(package));
|
||||
} else {
|
||||
td::unlink(package.path).ensure();
|
||||
}
|
||||
}
|
||||
id_to_package_.clear();
|
||||
for (td::uint32 i = 0; i < new_packages_info.size(); ++i) {
|
||||
PackageInfo &package = new_packages_info[i];
|
||||
package.idx = i;
|
||||
kv_->set(PSTRING() << "status." << i, td::to_string(package.package->size())).ensure();
|
||||
kv_->set(PSTRING() << "version." << i, td::to_string(package.version)).ensure();
|
||||
if (shard_split_depth_ > 0) {
|
||||
kv_->set(PSTRING() << "info." << i, package_info_to_str(package.seqno, package.shard_prefix)).ensure();
|
||||
}
|
||||
id_to_package_[{package.seqno, package.shard_prefix}] = i;
|
||||
}
|
||||
for (size_t i = new_packages_info.size(); i < packages_.size(); i++) {
|
||||
kv_->erase(PSTRING() << "status." << i);
|
||||
kv_->erase(PSTRING() << "version." << i);
|
||||
kv_->erase(PSTRING() << "info." << i);
|
||||
}
|
||||
kv_->set("slices", td::to_string(pack->idx + 1));
|
||||
kv_->set("slices", td::to_string(new_packages_info.size()));
|
||||
if (statistics_.pack_statistics) {
|
||||
statistics_.pack_statistics->record_close(packages_.size() - new_packages_info.size());
|
||||
}
|
||||
packages_ = std::move(new_packages_info);
|
||||
}
|
||||
|
||||
pack->package = new_package;
|
||||
pack->writer.reset();
|
||||
td::unlink(pack->path).ensure();
|
||||
td::rename(pack->path + ".new", pack->path).ensure();
|
||||
pack->writer = td::actor::create_actor<PackageWriter>("writer", new_package, async_mode_);
|
||||
|
||||
for (auto idx = pack->idx + 1; idx < packages_.size(); idx++) {
|
||||
td::unlink(packages_[idx].path).ensure();
|
||||
}
|
||||
if (statistics_.pack_statistics) {
|
||||
statistics_.pack_statistics->record_close(packages_.size() - pack->idx - 1);
|
||||
}
|
||||
packages_.erase(packages_.begin() + pack->idx + 1, packages_.end());
|
||||
|
||||
kv_->commit_transaction().ensure();
|
||||
|
||||
promise.set_value(td::Unit());
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue