1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Accelerator: partial fullnodes (#1393)

* Accelerator: partial fullnodes

1) Node can monitor a subset of shards
2) New archive slice format (sharded)
3) Validators are still required to have all shards
4) Support partial liteservers in lite-client, blockchain explorer, tonlib
5) Proxy liteserver

* Fix compilation error
This commit is contained in:
SpyCheese 2024-11-26 15:46:58 +04:00 committed by GitHub
parent 62444100f5
commit 954a96a077
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
83 changed files with 3213 additions and 1113 deletions

View file

@ -39,7 +39,7 @@ class PackageStatistics {
void record_close(uint64_t count = 1) {
close_count.fetch_add(count, std::memory_order_relaxed);
}
void record_read(double time, uint64_t bytes) {
read_bytes.fetch_add(bytes, std::memory_order_relaxed);
std::lock_guard guard(read_mutex);
@ -56,10 +56,10 @@ class PackageStatistics {
std::stringstream ss;
ss.setf(std::ios::fixed);
ss.precision(6);
ss << "ton.pack.open COUNT : " << open_count.exchange(0, std::memory_order_relaxed) << "\n";
ss << "ton.pack.close COUNT : " << close_count.exchange(0, std::memory_order_relaxed) << "\n";
ss << "ton.pack.read.bytes COUNT : " << read_bytes.exchange(0, std::memory_order_relaxed) << "\n";
ss << "ton.pack.write.bytes COUNT : " << write_bytes.exchange(0, std::memory_order_relaxed) << "\n";
@ -118,7 +118,7 @@ void PackageWriter::append(std::string filename, td::BufferSlice data,
return;
}
start = td::Timestamp::now();
offset = p->append(std::move(filename), std::move(data), !async_mode_);
offset = p->append(std::move(filename), std::move(data), !async_mode_);
end = td::Timestamp::now();
size = p->size();
}
@ -152,6 +152,21 @@ class PackageReader : public td::actor::Actor {
std::shared_ptr<PackageStatistics> statistics_;
};
static std::string get_package_file_name(PackageId p_id, ShardIdFull shard_prefix) {
td::StringBuilder sb;
sb << p_id.name();
if (!shard_prefix.is_masterchain()) {
sb << ".";
sb << shard_prefix.workchain << ":" << shard_to_str(shard_prefix.shard);
}
sb << ".pack";
return sb.as_cslice().str();
}
static std::string package_info_to_str(BlockSeqno seqno, ShardIdFull shard_prefix) {
return PSTRING() << seqno << "." << shard_prefix.workchain << ":" << shard_to_str(shard_prefix.shard);
}
void ArchiveSlice::add_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
if (destroyed_) {
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
@ -271,7 +286,8 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer
TRY_RESULT_PROMISE(
promise, p,
choose_package(
handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0, true));
handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0,
handle ? handle->id().shard_full() : ShardIdFull{masterchainId}, true));
std::string value;
auto R = kv_->get(ref_id.hash().to_hex(), value);
R.ensure();
@ -376,7 +392,8 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P
TRY_RESULT_PROMISE(
promise, p,
choose_package(
handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0, false));
handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0,
handle ? handle->id().shard_full() : ShardIdFull{masterchainId}, false));
promise = begin_async_query(std::move(promise));
auto P = td::PromiseCreator::lambda(
[promise = std::move(promise)](td::Result<std::pair<std::string, td::BufferSlice>> R) mutable {
@ -536,18 +553,32 @@ void ArchiveSlice::get_slice(td::uint64 archive_id, td::uint64 offset, td::uint3
}
before_query();
auto value = static_cast<td::uint32>(archive_id >> 32);
TRY_RESULT_PROMISE(promise, p, choose_package(value, false));
PackageInfo *p;
if (shard_split_depth_ == 0) {
TRY_RESULT_PROMISE_ASSIGN(promise, p, choose_package(value, ShardIdFull{masterchainId}, false));
} else {
if (value >= packages_.size()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "no such package"));
return;
}
p = &packages_[value];
}
promise = begin_async_query(std::move(promise));
td::actor::create_actor<db::ReadFile>("readfile", p->path, offset, limit, 0, std::move(promise)).release();
}
void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::uint64> promise) {
void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix,
td::Promise<td::uint64> promise) {
before_query();
if (!sliced_mode_) {
promise.set_result(archive_id_);
} else {
TRY_RESULT_PROMISE(promise, p, choose_package(masterchain_seqno, false));
promise.set_result(p->id * (1ull << 32) + archive_id_);
TRY_RESULT_PROMISE(promise, p, choose_package(masterchain_seqno, shard_prefix, false));
if (shard_split_depth_ == 0) {
promise.set_result(p->seqno * (1ull << 32) + archive_id_);
} else {
promise.set_result(p->idx * (1ull << 32) + archive_id_);
}
}
}
@ -573,9 +604,18 @@ void ArchiveSlice::before_query() {
R2.ensure();
slice_size_ = td::to_integer<td::uint32>(value);
CHECK(slice_size_ > 0);
R2 = kv_->get("shard_split_depth", value);
R2.ensure();
if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) {
shard_split_depth_ = td::to_integer<td::uint32>(value);
CHECK(shard_split_depth_ <= 60);
} else {
shard_split_depth_ = 0;
}
for (td::uint32 i = 0; i < tot; i++) {
R2 = kv_->get(PSTRING() << "status." << i, value);
R2.ensure();
CHECK(R2.move_as_ok() == td::KeyValue::GetStatus::Ok);
auto len = td::to_integer<td::uint64>(value);
R2 = kv_->get(PSTRING() << "version." << i, value);
R2.ensure();
@ -583,12 +623,24 @@ void ArchiveSlice::before_query() {
if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) {
ver = td::to_integer<td::uint32>(value);
}
auto v = archive_id_ + slice_size_ * i;
add_package(v, len, ver);
td::uint32 seqno;
ShardIdFull shard_prefix;
if (shard_split_depth_ == 0) {
seqno = archive_id_ + slice_size_ * i;
shard_prefix = ShardIdFull{masterchainId};
} else {
R2 = kv_->get(PSTRING() << "info." << i, value);
R2.ensure();
CHECK(R2.move_as_ok() == td::KeyValue::GetStatus::Ok);
unsigned long long shard;
CHECK(sscanf(value.c_str(), "%u.%d:%016llx", &seqno, &shard_prefix.workchain, &shard) == 3);
shard_prefix.shard = shard;
}
add_package(seqno, shard_prefix, len, ver);
}
} else {
auto len = td::to_integer<td::uint64>(value);
add_package(archive_id_, len, 0);
add_package(archive_id_, ShardIdFull{masterchainId}, len, 0);
}
} else {
if (!temp_ && !key_blocks_only_) {
@ -599,13 +651,17 @@ void ArchiveSlice::before_query() {
kv_->set("slice_size", td::to_string(slice_size_)).ensure();
kv_->set("status.0", "0").ensure();
kv_->set("version.0", td::to_string(default_package_version())).ensure();
if (shard_split_depth_ > 0) {
kv_->set("info.0", package_info_to_str(archive_id_, ShardIdFull{masterchainId})).ensure();
kv_->set("shard_split_depth", td::to_string(shard_split_depth_)).ensure();
}
kv_->commit_transaction().ensure();
add_package(archive_id_, 0, default_package_version());
add_package(archive_id_, ShardIdFull{masterchainId}, 0, default_package_version());
} else {
kv_->begin_transaction().ensure();
kv_->set("status", "0").ensure();
kv_->commit_transaction().ensure();
add_package(archive_id_, 0, 0);
add_package(archive_id_, ShardIdFull{masterchainId}, 0, 0);
}
}
}
@ -642,6 +698,7 @@ void ArchiveSlice::do_close() {
statistics_.pack_statistics->record_close(packages_.size());
}
packages_.clear();
id_to_package_.clear();
}
template<typename T>
@ -697,48 +754,61 @@ void ArchiveSlice::set_async_mode(bool mode, td::Promise<td::Unit> promise) {
}
}
ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root,
ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized,
td::uint32 shard_split_depth, std::string db_root,
td::actor::ActorId<ArchiveLru> archive_lru, DbStatistics statistics)
: archive_id_(archive_id)
, key_blocks_only_(key_blocks_only)
, temp_(temp)
, finalized_(finalized)
, p_id_(archive_id_, key_blocks_only_, temp_)
, shard_split_depth_(temp || key_blocks_only ? 0 : shard_split_depth)
, db_root_(std::move(db_root))
, archive_lru_(std::move(archive_lru))
, statistics_(statistics) {
db_path_ = PSTRING() << db_root_ << p_id_.path() << p_id_.name() << ".index";
}
td::Result<ArchiveSlice::PackageInfo *> ArchiveSlice::choose_package(BlockSeqno masterchain_seqno, bool force) {
td::Result<ArchiveSlice::PackageInfo *> ArchiveSlice::choose_package(BlockSeqno masterchain_seqno,
ShardIdFull shard_prefix, bool force) {
if (temp_ || key_blocks_only_ || !sliced_mode_) {
return &packages_[0];
}
if (masterchain_seqno < archive_id_) {
return td::Status::Error(ErrorCode::notready, "too small masterchain seqno");
}
auto v = (masterchain_seqno - archive_id_) / slice_size_;
if (v >= packages_.size()) {
masterchain_seqno -= (masterchain_seqno - archive_id_) % slice_size_;
CHECK((masterchain_seqno - archive_id_) % slice_size_ == 0);
if (shard_split_depth_ == 0) {
shard_prefix = ShardIdFull{masterchainId};
} else if (!shard_prefix.is_masterchain()) {
shard_prefix.shard |= 1; // In case length is < split depth
shard_prefix = ton::shard_prefix(shard_prefix, shard_split_depth_);
}
auto it = id_to_package_.find({masterchain_seqno, shard_prefix});
if (it == id_to_package_.end()) {
if (!force) {
return td::Status::Error(ErrorCode::notready, "too big masterchain seqno");
return td::Status::Error(ErrorCode::notready, "no such package");
}
CHECK(v == packages_.size());
begin_transaction();
size_t v = packages_.size();
kv_->set("slices", td::to_string(v + 1)).ensure();
kv_->set(PSTRING() << "status." << v, "0").ensure();
kv_->set(PSTRING() << "version." << v, td::to_string(default_package_version())).ensure();
if (shard_split_depth_ > 0) {
kv_->set(PSTRING() << "info." << v, package_info_to_str(masterchain_seqno, shard_prefix)).ensure();
}
commit_transaction();
CHECK((masterchain_seqno - archive_id_) % slice_size_ == 0);
add_package(masterchain_seqno, 0, default_package_version());
add_package(masterchain_seqno, shard_prefix, 0, default_package_version());
return &packages_[v];
} else {
return &packages_[v];
return &packages_[it->second];
}
}
void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 version) {
void ArchiveSlice::add_package(td::uint32 seqno, ShardIdFull shard_prefix, td::uint64 size, td::uint32 version) {
PackageId p_id{seqno, key_blocks_only_, temp_};
std::string path = PSTRING() << db_root_ << p_id.path() << p_id.name() << ".pack";
std::string path = PSTRING() << db_root_ << p_id.path() << get_package_file_name(p_id, shard_prefix);
auto R = Package::open(path, false, true);
if (R.is_error()) {
LOG(FATAL) << "failed to open/create archive '" << path << "': " << R.move_as_error();
@ -748,8 +818,9 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver
statistics_.pack_statistics->record_open();
}
auto idx = td::narrow_cast<td::uint32>(packages_.size());
id_to_package_[{seqno, shard_prefix}] = idx;
if (finalized_) {
packages_.emplace_back(nullptr, td::actor::ActorOwn<PackageWriter>(), seqno, path, idx, version);
packages_.emplace_back(nullptr, td::actor::ActorOwn<PackageWriter>(), seqno, shard_prefix, path, idx, version);
return;
}
auto pack = std::make_shared<Package>(R.move_as_ok());
@ -757,7 +828,7 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver
pack->truncate(size).ensure();
}
auto writer = td::actor::create_actor<PackageWriter>("writer", pack, async_mode_, statistics_.pack_statistics);
packages_.emplace_back(std::move(pack), std::move(writer), seqno, path, idx, version);
packages_.emplace_back(std::move(pack), std::move(writer), seqno, shard_prefix, path, idx, version);
}
namespace {
@ -790,6 +861,7 @@ void ArchiveSlice::destroy(td::Promise<td::Unit> promise) {
statistics_.pack_statistics->record_close(packages_.size());
}
packages_.clear();
id_to_package_.clear();
kv_ = nullptr;
delay_action([name = db_path_, attempt = 0,
@ -861,7 +933,7 @@ void ArchiveSlice::move_handle(ConstBlockHandle handle, Package *old_pack, Packa
move_file(fileref::Block{handle->id()}, old_pack, pack);
}
bool ArchiveSlice::truncate_block(BlockSeqno masterchain_seqno, BlockIdExt block_id, td::uint32 cutoff_idx,
bool ArchiveSlice::truncate_block(BlockSeqno masterchain_seqno, BlockIdExt block_id, td::uint32 cutoff_seqno,
Package *pack) {
std::string value;
auto R = kv_->get(get_db_key_block_info(block_id), value);
@ -876,18 +948,18 @@ bool ArchiveSlice::truncate_block(BlockSeqno masterchain_seqno, BlockIdExt block
return false;
}
auto S = choose_package(seqno, false);
auto S = choose_package(seqno, block_id.shard_full(), false);
S.ensure();
auto p = S.move_as_ok();
CHECK(p->idx <= cutoff_idx);
if (p->idx == cutoff_idx) {
CHECK(p->seqno <= cutoff_seqno);
if (p->seqno == cutoff_seqno) {
move_handle(std::move(handle), p->package.get(), pack);
}
return true;
}
void ArchiveSlice::truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shard, td::uint32 cutoff_idx,
void ArchiveSlice::truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shard, td::uint32 cutoff_seqno,
Package *pack) {
auto key = get_db_key_lt_desc(shard);
std::string value;
@ -913,7 +985,7 @@ void ArchiveSlice::truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shar
E.ensure();
auto e = E.move_as_ok();
if (truncate_block(masterchain_seqno, create_block_id(e->id_), cutoff_idx, pack)) {
if (truncate_block(masterchain_seqno, create_block_id(e->id_), cutoff_seqno, pack)) {
CHECK(new_last_idx == i);
new_last_idx = i + 1;
}
@ -925,7 +997,7 @@ void ArchiveSlice::truncate_shard(BlockSeqno masterchain_seqno, ShardIdFull shar
}
}
void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise<td::Unit> promise) {
void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle, td::Promise<td::Unit> promise) {
if (temp_ || archive_id_ > masterchain_seqno) {
destroy(std::move(promise));
return;
@ -938,15 +1010,8 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl
return;
}
auto cutoff = choose_package(masterchain_seqno, false);
cutoff.ensure();
auto pack = cutoff.move_as_ok();
CHECK(pack);
auto pack_r = Package::open(pack->path + ".new", false, true);
pack_r.ensure();
auto new_package = std::make_shared<Package>(pack_r.move_as_ok());
new_package->truncate(0).ensure();
std::map<ShardIdFull, PackageInfo*> old_packages;
std::map<ShardIdFull, std::shared_ptr<Package>> new_packages;
std::string value;
auto status_key = create_serialize_tl_object<ton_api::db_lt_status_key>();
@ -967,38 +1032,71 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl
auto G = fetch_tl_object<ton_api::db_lt_shard_value>(value, true);
G.ensure();
auto g = G.move_as_ok();
ShardIdFull shard{g->workchain_, static_cast<td::uint64>(g->shard_)};
truncate_shard(masterchain_seqno, ShardIdFull{g->workchain_, static_cast<td::uint64>(g->shard_)}, pack->idx,
new_package.get());
auto package_r = choose_package(masterchain_seqno, shard, false);
if (package_r.is_error()) {
continue;
}
auto package = package_r.move_as_ok();
CHECK(package);
if (!old_packages.count(package->shard_prefix)) {
old_packages[package->shard_prefix] = package;
auto new_package_r = Package::open(package->path + ".new", false, true);
new_package_r.ensure();
auto new_package = std::make_shared<Package>(new_package_r.move_as_ok());
new_package->truncate(0).ensure();
new_packages[package->shard_prefix] = std::move(new_package);
}
truncate_shard(masterchain_seqno, shard, package->seqno, new_packages[package->shard_prefix].get());
}
for (auto& [shard_prefix, package] : old_packages) {
auto new_package = new_packages[shard_prefix];
CHECK(new_package);
package->package = new_package;
package->writer.reset();
td::unlink(package->path).ensure();
td::rename(package->path + ".new", package->path).ensure();
package->writer = td::actor::create_actor<PackageWriter>("writer", new_package, async_mode_);
}
std::vector<PackageInfo> new_packages_info;
if (!sliced_mode_) {
kv_->set("status", td::to_string(new_package->size())).ensure();
kv_->set("status", td::to_string(packages_.at(0).package->size())).ensure();
} else {
kv_->set(PSTRING() << "status." << pack->idx, td::to_string(new_package->size())).ensure();
for (size_t i = pack->idx + 1; i < packages_.size(); i++) {
for (PackageInfo &package : packages_) {
if (package.seqno <= masterchain_seqno) {
new_packages_info.push_back(std::move(package));
} else {
td::unlink(package.path).ensure();
}
}
id_to_package_.clear();
for (td::uint32 i = 0; i < new_packages_info.size(); ++i) {
PackageInfo &package = new_packages_info[i];
package.idx = i;
kv_->set(PSTRING() << "status." << i, td::to_string(package.package->size())).ensure();
kv_->set(PSTRING() << "version." << i, td::to_string(package.version)).ensure();
if (shard_split_depth_ > 0) {
kv_->set(PSTRING() << "info." << i, package_info_to_str(package.seqno, package.shard_prefix)).ensure();
}
id_to_package_[{package.seqno, package.shard_prefix}] = i;
}
for (size_t i = new_packages_info.size(); i < packages_.size(); i++) {
kv_->erase(PSTRING() << "status." << i);
kv_->erase(PSTRING() << "version." << i);
kv_->erase(PSTRING() << "info." << i);
}
kv_->set("slices", td::to_string(pack->idx + 1));
kv_->set("slices", td::to_string(new_packages_info.size()));
if (statistics_.pack_statistics) {
statistics_.pack_statistics->record_close(packages_.size() - new_packages_info.size());
}
packages_ = std::move(new_packages_info);
}
pack->package = new_package;
pack->writer.reset();
td::unlink(pack->path).ensure();
td::rename(pack->path + ".new", pack->path).ensure();
pack->writer = td::actor::create_actor<PackageWriter>("writer", new_package, async_mode_);
for (auto idx = pack->idx + 1; idx < packages_.size(); idx++) {
td::unlink(packages_[idx].path).ensure();
}
if (statistics_.pack_statistics) {
statistics_.pack_statistics->record_close(packages_.size() - pack->idx - 1);
}
packages_.erase(packages_.begin() + pack->idx + 1, packages_.end());
kv_->commit_transaction().ensure();
promise.set_value(td::Unit());
}