1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

bugfixes + decreased archive slice size

This commit is contained in:
ton 2020-04-08 23:49:28 +04:00
parent 148a5e0179
commit 8be3fc99ed
11 changed files with 290 additions and 102 deletions

View file

@ -168,8 +168,10 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
return;
}
auto &p = choose_package(
handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0);
TRY_RESULT_PROMISE(
promise, p,
choose_package(
handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0, true));
std::string value;
auto R = kv_->get(ref_id.hash().to_hex(), value);
R.ensure();
@ -177,29 +179,35 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer
promise.set_value(td::Unit());
return;
}
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), ref_id, promise = std::move(promise)](
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), idx = p->idx, ref_id, promise = std::move(promise)](
td::Result<std::pair<td::uint64, td::uint64>> R) mutable {
if (R.is_error()) {
promise.set_error(R.move_as_error());
return;
}
auto v = R.move_as_ok();
td::actor::send_closure(SelfId, &ArchiveSlice::add_file_cont, std::move(ref_id), v.first, v.second,
td::actor::send_closure(SelfId, &ArchiveSlice::add_file_cont, idx, std::move(ref_id), v.first, v.second,
std::move(promise));
});
td::actor::send_closure(p.writer, &PackageWriter::append, ref_id.filename(), std::move(data), std::move(P));
td::actor::send_closure(p->writer, &PackageWriter::append, ref_id.filename(), std::move(data), std::move(P));
}
void ArchiveSlice::add_file_cont(FileReference ref_id, td::uint64 offset, td::uint64 size,
void ArchiveSlice::add_file_cont(size_t idx, FileReference ref_id, td::uint64 offset, td::uint64 size,
td::Promise<td::Unit> promise) {
if (destroyed_) {
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
return;
}
begin_transaction();
kv_->set("status", td::to_string(size)).ensure();
kv_->set(ref_id.hash().to_hex(), td::to_string(offset)).ensure();
if (sliced_mode_) {
kv_->set(PSTRING() << "status." << idx, td::to_string(size)).ensure();
kv_->set(ref_id.hash().to_hex(), td::to_string(offset)).ensure();
} else {
CHECK(!idx);
kv_->set("status", td::to_string(size)).ensure();
kv_->set(ref_id.hash().to_hex(), td::to_string(offset)).ensure();
}
commit_transaction();
promise.set_value(td::Unit());
}
@ -253,8 +261,6 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
return;
}
auto &p = choose_package(
handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0);
std::string value;
auto R = kv_->get(ref_id.hash().to_hex(), value);
R.ensure();
@ -263,6 +269,10 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P
return;
}
auto offset = td::to_integer<td::uint64>(value);
TRY_RESULT_PROMISE(
promise, p,
choose_package(
handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0, false));
auto P = td::PromiseCreator::lambda(
[promise = std::move(promise)](td::Result<std::pair<std::string, td::BufferSlice>> R) mutable {
if (R.is_error()) {
@ -271,7 +281,7 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P
promise.set_value(std::move(R.move_as_ok().second));
}
});
td::actor::create_actor<PackageReader>("reader", p.package, offset, std::move(P)).release();
td::actor::create_actor<PackageReader>("reader", p->package, offset, std::move(P)).release();
}
void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id,
@ -416,36 +426,70 @@ td::BufferSlice ArchiveSlice::get_db_key_block_info(BlockIdExt block_id) {
void ArchiveSlice::get_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit,
td::Promise<td::BufferSlice> promise) {
if (archive_id != archive_id_) {
if (static_cast<td::uint32>(archive_id) != archive_id_) {
promise.set_error(td::Status::Error(ErrorCode::error, "bad archive id"));
return;
}
td::actor::create_actor<db::ReadFile>("readfile", prefix_ + ".pack", offset, limit, 0, std::move(promise)).release();
auto value = static_cast<td::uint32>(archive_id >> 32);
TRY_RESULT_PROMISE(promise, p, choose_package(value, false));
td::actor::create_actor<db::ReadFile>("readfile", p->path, offset, limit, 0, std::move(promise)).release();
}
void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::uint64> promise) {
if (!sliced_mode_) {
promise.set_result(archive_id_);
} else {
TRY_RESULT_PROMISE(promise, p, choose_package(masterchain_seqno, false));
promise.set_result(p->id * (1ull << 32) + archive_id_);
}
}
void ArchiveSlice::start_up() {
auto R = Package::open(prefix_ + ".pack", false, true);
if (R.is_error()) {
LOG(FATAL) << "failed to open/create archive '" << prefix_ << ".pack"
<< "': " << R.move_as_error();
return;
}
auto pack = std::make_shared<Package>(R.move_as_ok());
kv_ = std::make_shared<td::RocksDb>(td::RocksDb::open(prefix_ + ".index").move_as_ok());
PackageId p_id{archive_id_, key_blocks_only_, temp_};
std::string db_path = PSTRING() << db_root_ << p_id.path() << p_id.name() << ".index";
kv_ = std::make_shared<td::RocksDb>(td::RocksDb::open(db_path).move_as_ok());
std::string value;
auto R2 = kv_->get("status", value);
R2.ensure();
if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) {
auto len = td::to_integer<td::uint64>(value);
pack->truncate(len);
if (value == "sliced") {
sliced_mode_ = true;
R2 = kv_->get("slices", value);
R2.ensure();
auto tot = td::to_integer<td::uint32>(value);
R2 = kv_->get("slice_size", value);
R2.ensure();
slice_size_ = td::to_integer<td::uint32>(value);
CHECK(slice_size_ > 0);
for (td::uint32 i = 0; i < tot; i++) {
R2 = kv_->get(PSTRING() << "status." << i, value);
R2.ensure();
auto len = td::to_integer<td::uint64>(value);
auto v = archive_id_ + slice_size_ * i;
add_package(v, len);
}
} else {
auto len = td::to_integer<td::uint64>(value);
add_package(archive_id_, len);
}
} else {
pack->truncate(0);
if (!temp_ && !key_blocks_only_) {
sliced_mode_ = true;
kv_->begin_transaction().ensure();
kv_->set("status", "sliced").ensure();
kv_->set("slices", "1").ensure();
kv_->set("slice_size", td::to_string(slice_size_)).ensure();
kv_->set("status.0", "0").ensure();
kv_->commit_transaction().ensure();
} else {
kv_->begin_transaction().ensure();
kv_->set("status", "0").ensure();
kv_->commit_transaction().ensure();
}
add_package(archive_id_, 0);
}
auto writer = td::actor::create_actor<PackageWriter>("writer", pack);
packages_.emplace_back(std::move(pack), std::move(writer), prefix_ + ".pack", 0);
}
void ArchiveSlice::begin_transaction() {
@ -484,15 +528,46 @@ void ArchiveSlice::set_async_mode(bool mode, td::Promise<td::Unit> promise) {
}
}
ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, std::string prefix)
: archive_id_(archive_id), key_blocks_only_(key_blocks_only), temp_(temp), prefix_(std::move(prefix)) {
ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, std::string db_root)
: archive_id_(archive_id), key_blocks_only_(key_blocks_only), temp_(temp), db_root_(std::move(db_root)) {
}
ArchiveSlice::PackageInfo &ArchiveSlice::choose_package(BlockSeqno masterchain_seqno) {
if (temp_ || key_blocks_only_) {
return packages_[0];
td::Result<ArchiveSlice::PackageInfo *> ArchiveSlice::choose_package(BlockSeqno masterchain_seqno, bool force) {
if (temp_ || key_blocks_only_ || !sliced_mode_) {
return &packages_[0];
}
return packages_[0];
if (masterchain_seqno < archive_id_) {
return td::Status::Error(ErrorCode::notready, "too small masterchain seqno");
}
auto v = (masterchain_seqno - archive_id_) / slice_size_;
if (v >= packages_.size()) {
if (!force) {
return td::Status::Error(ErrorCode::notready, "too big masterchain seqno");
}
CHECK(v == packages_.size());
begin_transaction();
kv_->set("slices", td::to_string(v + 1)).ensure();
kv_->set(PSTRING() << "status." << v, "0").ensure();
commit_transaction();
CHECK((masterchain_seqno - archive_id_) % slice_size_ == 0);
add_package(masterchain_seqno, 0);
return &packages_[v];
} else {
return &packages_[v];
}
}
void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size) {
PackageId p_id{seqno, key_blocks_only_, temp_};
std::string path = PSTRING() << db_root_ << p_id.path() << p_id.name() << ".pack";
auto R = Package::open(path, false, true);
if (R.is_error()) {
LOG(FATAL) << "failed to open/create archive '" << path << "': " << R.move_as_error();
return;
}
auto pack = std::make_shared<Package>(R.move_as_ok());
auto writer = td::actor::create_actor<PackageWriter>("writer", pack);
packages_.emplace_back(std::move(pack), std::move(writer), seqno, path, 0);
}
namespace {
@ -520,12 +595,16 @@ void ArchiveSlice::destroy(td::Promise<td::Unit> promise) {
ig.add_promise(std::move(promise));
destroyed_ = true;
for (auto &p : packages_) {
td::unlink(p.path).ensure();
}
packages_.clear();
kv_ = nullptr;
td::unlink(prefix_ + ".pack").ensure();
delay_action([name = prefix_ + ".index", attempt = 0,
PackageId p_id{archive_id_, key_blocks_only_, temp_};
std::string db_path = PSTRING() << db_root_ << p_id.path() << p_id.name() << ".index";
delay_action([name = db_path, attempt = 0,
promise = ig.get_promise()]() mutable { destroy_db(name, attempt, std::move(promise)); },
td::Timestamp::in(0.0));
}