1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Optimization of persistent state serialization (#364)

* Fix double serialization of masterchain; increase sync_blocks_before

* Improve logging in DownloadState

* Write persistent state directly to file instead of a buffer

* Don't keep ref to masterchain state in AsyncStateSerializer

* Sparse state serialization over longer period

Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
EmelyanenkoK 2022-05-15 17:51:24 +03:00 committed by GitHub
parent 56f0293650
commit c07394aab5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 327 additions and 84 deletions

View file

@ -475,15 +475,128 @@ std::string BagOfCells::extract_string() const {
return std::string{serialized.data(), serialized.data() + serialized.size()}; return std::string{serialized.data(), serialized.data() + serialized.size()};
} }
void BagOfCells::store_uint(unsigned long long value, unsigned bytes) { namespace {
struct BufferWriter {
BufferWriter(unsigned char* store_start, unsigned char* store_end)
: store_start(store_start), store_ptr(store_start), store_end(store_end) {}
size_t position() const {
return store_ptr - store_start;
}
size_t remaining() const {
return store_end - store_ptr;
}
void chk() const {
DCHECK(store_ptr <= store_end);
}
bool empty() const {
return store_ptr == store_end;
}
void store_uint(unsigned long long value, unsigned bytes) {
unsigned char* ptr = store_ptr += bytes; unsigned char* ptr = store_ptr += bytes;
store_chk(); chk();
while (bytes) { while (bytes) {
*--ptr = value & 0xff; *--ptr = value & 0xff;
value >>= 8; value >>= 8;
--bytes; --bytes;
} }
DCHECK(!bytes); DCHECK(!bytes);
}
void store_bytes(unsigned char const* data, size_t s) {
store_ptr += s;
chk();
memcpy(store_ptr - s, data, s);
}
unsigned get_crc32() const {
return td::crc32c(td::Slice{store_start, store_ptr});
}
private:
unsigned char* store_start;
unsigned char* store_ptr;
unsigned char* store_end;
};
struct FileWriter {
FileWriter(td::FileFd& fd, size_t expected_size)
: fd(fd), expected_size(expected_size) {}
~FileWriter() {
flush();
}
size_t position() const {
return flushed_size + writer.position();
}
size_t remaining() const {
return expected_size - position();
}
void chk() const {
DCHECK(position() <= expected_size);
}
bool empty() const {
return remaining() == 0;
}
void store_uint(unsigned long long value, unsigned bytes) {
flush_if_needed(bytes);
writer.store_uint(value, bytes);
}
void store_bytes(unsigned char const* data, size_t s) {
flush_if_needed(s);
writer.store_bytes(data, s);
}
unsigned get_crc32() const {
unsigned char const* start = buf.data();
unsigned char const* end = start + writer.position();
return td::crc32c_extend(current_crc32, td::Slice(start, end));
}
td::Status finalize() {
flush();
return std::move(res);
}
private:
void flush_if_needed(size_t s) {
DCHECK(s <= BUF_SIZE);
if (s > BUF_SIZE - writer.position()) {
flush();
}
}
void flush() {
chk();
unsigned char* start = buf.data();
unsigned char* end = start + writer.position();
if (start == end) {
return;
}
flushed_size += end - start;
current_crc32 = td::crc32c_extend(current_crc32, td::Slice(start, end));
if (res.is_ok()) {
while (end > start) {
auto R = fd.write(td::Slice(start, end));
if (R.is_error()) {
res = R.move_as_error();
break;
}
size_t s = R.move_as_ok();
start += s;
}
}
writer = BufferWriter(buf.data(), buf.data() + buf.size());
}
td::FileFd& fd;
size_t expected_size;
size_t flushed_size = 0;
unsigned current_crc32 = td::crc32c(td::Slice());
static const size_t BUF_SIZE = 1 << 22;
std::vector<unsigned char> buf = std::vector<unsigned char>(BUF_SIZE, '\0');
BufferWriter writer = BufferWriter(buf.data(), buf.data() + buf.size());
td::Status res = td::Status::OK();
};
} }
//serialized_boc#672fb0ac has_idx:(## 1) has_crc32c:(## 1) //serialized_boc#672fb0ac has_idx:(## 1) has_crc32c:(## 1)
@ -497,13 +610,16 @@ void BagOfCells::store_uint(unsigned long long value, unsigned bytes) {
// index:(cells * ##(off_bytes * 8)) // index:(cells * ##(off_bytes * 8))
// cell_data:(tot_cells_size * [ uint8 ]) // cell_data:(tot_cells_size * [ uint8 ])
// = BagOfCells; // = BagOfCells;
std::size_t BagOfCells::serialize_to(unsigned char* buffer, std::size_t buff_size, int mode) { template<typename WriterT>
std::size_t size_est = estimate_serialized_size(mode); std::size_t BagOfCells::serialize_to_impl(WriterT& writer, int mode) {
if (!size_est || size_est > buff_size) { auto store_ref = [&](unsigned long long value) {
return 0; writer.store_uint(value, info.ref_byte_size);
} };
init_store(buffer, buffer + size_est); auto store_offset = [&](unsigned long long value) {
store_uint(info.magic, 4); writer.store_uint(value, info.offset_byte_size);
};
writer.store_uint(info.magic, 4);
td::uint8 byte{0}; td::uint8 byte{0};
if (info.has_index) { if (info.has_index) {
@ -520,9 +636,9 @@ std::size_t BagOfCells::serialize_to(unsigned char* buffer, std::size_t buff_siz
return 0; return 0;
} }
byte |= static_cast<td::uint8>(info.ref_byte_size); byte |= static_cast<td::uint8>(info.ref_byte_size);
store_uint(byte, 1); writer.store_uint(byte, 1);
store_uint(info.offset_byte_size, 1); writer.store_uint(info.offset_byte_size, 1);
store_ref(cell_count); store_ref(cell_count);
store_ref(root_count); store_ref(root_count);
store_ref(0); store_ref(0);
@ -532,7 +648,7 @@ std::size_t BagOfCells::serialize_to(unsigned char* buffer, std::size_t buff_siz
DCHECK(k >= 0 && k < cell_count); DCHECK(k >= 0 && k < cell_count);
store_ref(k); store_ref(k);
} }
DCHECK(store_ptr - buffer == (long long)info.index_offset); DCHECK(writer.position() == info.index_offset);
DCHECK((unsigned)cell_count == cell_list_.size()); DCHECK((unsigned)cell_count == cell_list_.size());
if (info.has_index) { if (info.has_index) {
std::size_t offs = 0; std::size_t offs = 0;
@ -551,8 +667,8 @@ std::size_t BagOfCells::serialize_to(unsigned char* buffer, std::size_t buff_siz
} }
DCHECK(offs == info.data_size); DCHECK(offs == info.data_size);
} }
DCHECK(store_ptr - buffer == (long long)info.data_offset); DCHECK(writer.position() == info.data_offset);
unsigned char* keep_ptr = store_ptr; size_t keep_position = writer.position();
for (int i = 0; i < cell_count; ++i) { for (int i = 0; i < cell_count; ++i) {
const auto& dc_info = cell_list_[cell_count - 1 - i]; const auto& dc_info = cell_list_[cell_count - 1 - i];
const Ref<DataCell>& dc = dc_info.dc_ref; const Ref<DataCell>& dc = dc_info.dc_ref;
@ -560,9 +676,9 @@ std::size_t BagOfCells::serialize_to(unsigned char* buffer, std::size_t buff_siz
if (dc_info.is_root_cell && (mode & Mode::WithTopHash)) { if (dc_info.is_root_cell && (mode & Mode::WithTopHash)) {
with_hash = true; with_hash = true;
} }
int s = dc->serialize(store_ptr, 256, with_hash); unsigned char buf[256];
store_ptr += s; int s = dc->serialize(buf, 256, with_hash);
store_chk(); writer.store_bytes(buf, s);
DCHECK(dc->size_refs() == dc_info.ref_num); DCHECK(dc->size_refs() == dc_info.ref_num);
// std::cerr << (dc_info.is_special() ? '*' : ' ') << i << '<' << (int)dc_info.wt << ">:"; // std::cerr << (dc_info.is_special() ? '*' : ' ') << i << '<' << (int)dc_info.wt << ">:";
for (unsigned j = 0; j < dc_info.ref_num; ++j) { for (unsigned j = 0; j < dc_info.ref_num; ++j) {
@ -573,16 +689,38 @@ std::size_t BagOfCells::serialize_to(unsigned char* buffer, std::size_t buff_siz
} }
// std::cerr << std::endl; // std::cerr << std::endl;
} }
store_chk(); writer.chk();
DCHECK(store_ptr - keep_ptr == (long long)info.data_size); DCHECK(writer.position() - keep_position == info.data_size);
DCHECK(store_end - store_ptr == (info.has_crc32c ? 4 : 0)); DCHECK(writer.remaining() == (info.has_crc32c ? 4 : 0));
if (info.has_crc32c) { if (info.has_crc32c) {
// compute crc32c of buffer .. store_ptr unsigned crc = writer.get_crc32();
unsigned crc = td::crc32c(td::Slice{buffer, store_ptr}); writer.store_uint(td::bswap32(crc), 4);
store_uint(td::bswap32(crc), 4);
} }
DCHECK(store_empty()); DCHECK(writer.empty());
return store_ptr - buffer; return writer.position();
}
std::size_t BagOfCells::serialize_to(unsigned char* buffer, std::size_t buff_size, int mode) {
std::size_t size_est = estimate_serialized_size(mode);
if (!size_est || size_est > buff_size) {
return 0;
}
BufferWriter writer{buffer, buffer + size_est};
return serialize_to_impl(writer, mode);
}
td::Status BagOfCells::serialize_to_file(td::FileFd& fd, int mode) {
std::size_t size_est = estimate_serialized_size(mode);
if (!size_est) {
return td::Status::Error("no cells to serialize to this bag of cells");
}
FileWriter writer{fd, size_est};
size_t s = serialize_to_impl(writer, mode);
TRY_STATUS(writer.finalize());
if (s != size_est) {
return td::Status::Error("error while serializing a bag of cells: actual serialized size differs from estimated");
}
return td::Status::OK();
} }
unsigned long long BagOfCells::Info::read_int(const unsigned char* ptr, unsigned bytes) { unsigned long long BagOfCells::Info::read_int(const unsigned char* ptr, unsigned bytes) {

View file

@ -23,6 +23,7 @@
#include "td/utils/buffer.h" #include "td/utils/buffer.h"
#include "td/utils/HashMap.h" #include "td/utils/HashMap.h"
#include "td/utils/HashSet.h" #include "td/utils/HashSet.h"
#include "td/utils/port/FileFd.h"
namespace vm { namespace vm {
using td::Ref; using td::Ref;
@ -216,8 +217,6 @@ class BagOfCells {
int max_depth{1024}; int max_depth{1024};
Info info; Info info;
unsigned long long data_bytes{0}; unsigned long long data_bytes{0};
unsigned char* store_ptr{nullptr};
unsigned char* store_end{nullptr};
td::HashMap<Hash, int> cells; td::HashMap<Hash, int> cells;
struct CellInfo { struct CellInfo {
Ref<DataCell> dc_ref; Ref<DataCell> dc_ref;
@ -267,6 +266,9 @@ class BagOfCells {
std::string serialize_to_string(int mode = 0); std::string serialize_to_string(int mode = 0);
td::Result<td::BufferSlice> serialize_to_slice(int mode = 0); td::Result<td::BufferSlice> serialize_to_slice(int mode = 0);
std::size_t serialize_to(unsigned char* buffer, std::size_t buff_size, int mode = 0); std::size_t serialize_to(unsigned char* buffer, std::size_t buff_size, int mode = 0);
td::Status serialize_to_file(td::FileFd& fd, int mode = 0);
template<typename WriterT>
std::size_t serialize_to_impl(WriterT& writer, int mode = 0);
std::string extract_string() const; std::string extract_string() const;
td::Result<long long> deserialize(const td::Slice& data, int max_roots = default_max_roots); td::Result<long long> deserialize(const td::Slice& data, int max_roots = default_max_roots);
@ -295,23 +297,6 @@ class BagOfCells {
cell_list_.clear(); cell_list_.clear();
} }
td::uint64 compute_sizes(int mode, int& r_size, int& o_size); td::uint64 compute_sizes(int mode, int& r_size, int& o_size);
void init_store(unsigned char* from, unsigned char* to) {
store_ptr = from;
store_end = to;
}
void store_chk() const {
DCHECK(store_ptr <= store_end);
}
bool store_empty() const {
return store_ptr == store_end;
}
void store_uint(unsigned long long value, unsigned bytes);
void store_ref(unsigned long long value) {
store_uint(value, info.ref_byte_size);
}
void store_offset(unsigned long long value) {
store_uint(value, info.offset_byte_size);
}
void reorder_cells(); void reorder_cells();
int revisit(int cell_idx, int force = 0); int revisit(int cell_idx, int force = 0);
unsigned long long get_idx_entry_raw(int index); unsigned long long get_idx_entry_raw(int index);

View file

@ -336,6 +336,28 @@ void ArchiveManager::add_zero_state(BlockIdExt block_id, td::BufferSlice data, t
void ArchiveManager::add_persistent_state(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice data, void ArchiveManager::add_persistent_state(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice data,
td::Promise<td::Unit> promise) { td::Promise<td::Unit> promise) {
auto create_writer = [&](std::string path, td::Promise<std::string> P) {
td::actor::create_actor<db::WriteFile>("writefile", db_root_ + "/archive/tmp/",
std::move(path), std::move(data), std::move(P))
.release();
};
add_persistent_state_impl(block_id, masterchain_block_id, std::move(promise), std::move(create_writer));
}
void ArchiveManager::add_persistent_state_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id,
std::function<td::Status(td::FileFd&)> write_state,
td::Promise<td::Unit> promise) {
auto create_writer = [&](std::string path, td::Promise<std::string> P) {
td::actor::create_actor<db::WriteFile>("writefile", db_root_ + "/archive/tmp/",
std::move(path), std::move(write_state), std::move(P))
.release();
};
add_persistent_state_impl(block_id, masterchain_block_id, std::move(promise), std::move(create_writer));
}
void ArchiveManager::add_persistent_state_impl(BlockIdExt block_id, BlockIdExt masterchain_block_id,
td::Promise<td::Unit> promise,
std::function<void(std::string, td::Promise<std::string>)> create_writer) {
auto id = FileReference{fileref::PersistentState{block_id, masterchain_block_id}}; auto id = FileReference{fileref::PersistentState{block_id, masterchain_block_id}};
auto hash = id.hash(); auto hash = id.hash();
if (perm_states_.find(hash) != perm_states_.end()) { if (perm_states_.find(hash) != perm_states_.end()) {
@ -353,8 +375,7 @@ void ArchiveManager::add_persistent_state(BlockIdExt block_id, BlockIdExt master
promise.set_value(td::Unit()); promise.set_value(td::Unit());
} }
}); });
td::actor::create_actor<db::WriteFile>("writefile", db_root_ + "/archive/tmp/", path, std::move(data), std::move(P)) create_writer(std::move(path), std::move(P));
.release();
} }
void ArchiveManager::get_zero_state(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) { void ArchiveManager::get_zero_state(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) {

View file

@ -45,6 +45,9 @@ class ArchiveManager : public td::actor::Actor {
void add_zero_state(BlockIdExt block_id, td::BufferSlice data, td::Promise<td::Unit> promise); void add_zero_state(BlockIdExt block_id, td::BufferSlice data, td::Promise<td::Unit> promise);
void add_persistent_state(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice data, void add_persistent_state(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice data,
td::Promise<td::Unit> promise); td::Promise<td::Unit> promise);
void add_persistent_state_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id,
std::function<td::Status(td::FileFd&)> write_state,
td::Promise<td::Unit> promise);
void get_zero_state(BlockIdExt block_id, td::Promise<td::BufferSlice> promise); void get_zero_state(BlockIdExt block_id, td::Promise<td::BufferSlice> promise);
void get_persistent_state(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::Promise<td::BufferSlice> promise); void get_persistent_state(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::Promise<td::BufferSlice> promise);
void get_persistent_state_slice(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::int64 offset, void get_persistent_state_slice(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::int64 offset,
@ -137,6 +140,8 @@ class ArchiveManager : public td::actor::Actor {
PackageId get_max_temp_file_desc_idx(); PackageId get_max_temp_file_desc_idx();
PackageId get_prev_temp_file_desc_idx(PackageId id); PackageId get_prev_temp_file_desc_idx(PackageId id);
void add_persistent_state_impl(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise,
std::function<void(std::string, td::Promise<std::string>)> create_writer);
void written_perm_state(FileReferenceShort id); void written_perm_state(FileReferenceShort id);
void persistent_state_gc(FileHash last); void persistent_state_gc(FileHash last);

View file

@ -52,30 +52,50 @@ class WriteFile : public td::actor::Actor {
auto res = R.move_as_ok(); auto res = R.move_as_ok();
auto file = std::move(res.first); auto file = std::move(res.first);
auto old_name = res.second; auto old_name = res.second;
td::uint64 offset = 0; auto status = write_data_(file);
while (data_.size() > 0) { if (!status.is_error()) {
auto R = file.pwrite(data_.as_slice(), offset); status = file.sync();
auto s = R.move_as_ok(); }
offset += s; if (status.is_error()) {
data_.confirm_read(s); td::unlink(old_name);
promise_.set_error(std::move(status));
stop();
return;
} }
file.sync().ensure();
if (new_name_.length() > 0) { if (new_name_.length() > 0) {
td::rename(old_name, new_name_).ensure(); status = td::rename(old_name, new_name_);
if (status.is_error()) {
promise_.set_error(std::move(status));
} else {
promise_.set_value(std::move(new_name_)); promise_.set_value(std::move(new_name_));
}
} else { } else {
promise_.set_value(std::move(old_name)); promise_.set_value(std::move(old_name));
} }
stop(); stop();
} }
WriteFile(std::string tmp_dir, std::string new_name, std::function<td::Status(td::FileFd&)> write_data,
td::Promise<std::string> promise)
: tmp_dir_(tmp_dir), new_name_(new_name), write_data_(std::move(write_data)), promise_(std::move(promise)) {
}
WriteFile(std::string tmp_dir, std::string new_name, td::BufferSlice data, td::Promise<std::string> promise) WriteFile(std::string tmp_dir, std::string new_name, td::BufferSlice data, td::Promise<std::string> promise)
: tmp_dir_(tmp_dir), new_name_(new_name), data_(std::move(data)), promise_(std::move(promise)) { : tmp_dir_(tmp_dir), new_name_(new_name), promise_(std::move(promise)) {
write_data_ = [data_ptr = std::make_shared<td::BufferSlice>(std::move(data))] (td::FileFd& fd) {
auto data = std::move(*data_ptr);
td::uint64 offset = 0;
while (data.size() > 0) {
TRY_RESULT(s, fd.pwrite(data.as_slice(), offset));
offset += s;
data.confirm_read(s);
}
return td::Status::OK();
};
} }
private: private:
const std::string tmp_dir_; const std::string tmp_dir_;
std::string new_name_; std::string new_name_;
td::BufferSlice data_; std::function<td::Status(td::FileFd&)> write_data_;
td::Promise<std::string> promise_; td::Promise<std::string> promise_;
}; };

View file

@ -276,6 +276,13 @@ void RootDb::store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterc
std::move(state), std::move(promise)); std::move(state), std::move(promise));
} }
void RootDb::store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id,
std::function<td::Status(td::FileFd&)> write_data,
td::Promise<td::Unit> promise) {
td::actor::send_closure(archive_db_, &ArchiveManager::add_persistent_state_gen, block_id, masterchain_block_id,
std::move(write_data), std::move(promise));
}
void RootDb::get_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, void RootDb::get_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id,
td::Promise<td::BufferSlice> promise) { td::Promise<td::BufferSlice> promise) {
td::actor::send_closure(archive_db_, &ArchiveManager::get_persistent_state, block_id, masterchain_block_id, td::actor::send_closure(archive_db_, &ArchiveManager::get_persistent_state, block_id, masterchain_block_id,

View file

@ -69,6 +69,9 @@ class RootDb : public Db {
void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state, void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state,
td::Promise<td::Unit> promise) override; td::Promise<td::Unit> promise) override;
void store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id,
std::function<td::Status(td::FileFd&)> write_data,
td::Promise<td::Unit> promise) override;
void get_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, void get_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id,
td::Promise<td::BufferSlice> promise) override; td::Promise<td::BufferSlice> promise) override;
void get_persistent_state_file_slice(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::int64 offset, void get_persistent_state_file_slice(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::int64 offset,

View file

@ -26,6 +26,7 @@
#include "vm/cells/MerkleUpdate.h" #include "vm/cells/MerkleUpdate.h"
#include "block/block-parse.h" #include "block/block-parse.h"
#include "block/block-auto.h" #include "block/block-auto.h"
#include "td/utils/filesystem.h"
#define LAZY_STATE_DESERIALIZE 1 #define LAZY_STATE_DESERIALIZE 1
@ -301,6 +302,30 @@ td::Result<td::BufferSlice> ShardStateQ::serialize() const {
return st_res.move_as_ok(); return st_res.move_as_ok();
} }
td::Status ShardStateQ::serialize_to_file(td::FileFd& fd) const {
td::PerfWarningTimer perf_timer_{"serializestate", 0.1};
if (!data.is_null()) {
auto cur_data = data.clone();
while (cur_data.size() > 0) {
TRY_RESULT(s, fd.write(cur_data.as_slice()));
cur_data.confirm_read(s);
}
return td::Status::OK();
}
if (root.is_null()) {
return td::Status::Error(-666, "cannot serialize an uninitialized state");
}
vm::BagOfCells new_boc;
new_boc.set_root(root);
TRY_STATUS(new_boc.import_cells());
auto st_res = new_boc.serialize_to_file(fd, 31);
if (st_res.is_error()) {
LOG(ERROR) << "cannot serialize a shardchain state";
return st_res.move_as_error();
}
return td::Status::OK();
}
MasterchainStateQ::MasterchainStateQ(const BlockIdExt& _id, td::BufferSlice _data) MasterchainStateQ::MasterchainStateQ(const BlockIdExt& _id, td::BufferSlice _data)
: MasterchainState(), ShardStateQ(_id, std::move(_data)) { : MasterchainState(), ShardStateQ(_id, std::move(_data)) {
} }

View file

@ -87,6 +87,7 @@ class ShardStateQ : virtual public ShardState {
td::Result<Ref<ShardState>> merge_with(const ShardState& with) const override; td::Result<Ref<ShardState>> merge_with(const ShardState& with) const override;
td::Result<std::pair<Ref<ShardState>, Ref<ShardState>>> split() const override; td::Result<std::pair<Ref<ShardState>, Ref<ShardState>>> split() const override;
td::Result<td::BufferSlice> serialize() const override; td::Result<td::BufferSlice> serialize() const override;
td::Status serialize_to_file(td::FileFd& fd) const override;
}; };
#if TD_MSVC #if TD_MSVC

View file

@ -53,6 +53,9 @@ class Db : public td::actor::Actor {
virtual void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state, virtual void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state,
td::Promise<td::Unit> promise) = 0; td::Promise<td::Unit> promise) = 0;
virtual void store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id,
std::function<td::Status(td::FileFd&)> write_data,
td::Promise<td::Unit> promise) = 0;
virtual void get_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, virtual void get_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id,
td::Promise<td::BufferSlice> promise) = 0; td::Promise<td::BufferSlice> promise) = 0;
virtual void get_persistent_state_file_slice(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::int64 offset, virtual void get_persistent_state_file_slice(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::int64 offset,

View file

@ -55,6 +55,7 @@ class ShardState : public td::CntObject {
virtual td::Result<std::pair<td::Ref<ShardState>, td::Ref<ShardState>>> split() const = 0; virtual td::Result<std::pair<td::Ref<ShardState>, td::Ref<ShardState>>> split() const = 0;
virtual td::Result<td::BufferSlice> serialize() const = 0; virtual td::Result<td::BufferSlice> serialize() const = 0;
virtual td::Status serialize_to_file(td::FileFd& fd) const = 0;
}; };
class MasterchainState : virtual public ShardState { class MasterchainState : virtual public ShardState {

View file

@ -57,6 +57,9 @@ class ValidatorManager : public ValidatorManagerInterface {
td::Promise<td::Ref<ShardState>> promise) = 0; td::Promise<td::Ref<ShardState>> promise) = 0;
virtual void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state, virtual void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state,
td::Promise<td::Unit> promise) = 0; td::Promise<td::Unit> promise) = 0;
virtual void store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id,
std::function<td::Status(td::FileFd&)> write_data,
td::Promise<td::Unit> promise) = 0;
virtual void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise<td::Unit> promise) = 0; virtual void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise<td::Unit> promise) = 0;
virtual void wait_block_state(BlockHandle handle, td::uint32 priority, td::Timestamp timeout, virtual void wait_block_state(BlockHandle handle, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::Ref<ShardState>> promise) = 0; td::Promise<td::Ref<ShardState>> promise) = 0;

View file

@ -680,6 +680,13 @@ void ValidatorManagerImpl::store_persistent_state_file(BlockIdExt block_id, Bloc
std::move(promise)); std::move(promise));
} }
void ValidatorManagerImpl::store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id,
std::function<td::Status(td::FileFd&)> write_data,
td::Promise<td::Unit> promise) {
td::actor::send_closure(db_, &Db::store_persistent_state_file_gen, block_id, masterchain_block_id,
std::move(write_data), std::move(promise));
}
void ValidatorManagerImpl::store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, void ValidatorManagerImpl::store_zero_state_file(BlockIdExt block_id, td::BufferSlice state,
td::Promise<td::Unit> promise) { td::Promise<td::Unit> promise) {
td::actor::send_closure(db_, &Db::store_zero_state_file, block_id, std::move(state), std::move(promise)); td::actor::send_closure(db_, &Db::store_zero_state_file, block_id, std::move(state), std::move(promise));

View file

@ -143,6 +143,9 @@ class ValidatorManagerImpl : public ValidatorManager {
td::Promise<td::Ref<ShardState>> promise) override; td::Promise<td::Ref<ShardState>> promise) override;
void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state, void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state,
td::Promise<td::Unit> promise) override; td::Promise<td::Unit> promise) override;
void store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id,
std::function<td::Status(td::FileFd&)> write_data,
td::Promise<td::Unit> promise) override;
void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise<td::Unit> promise) override; void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise<td::Unit> promise) override;
void wait_block_state(BlockHandle handle, td::uint32 priority, td::Timestamp timeout, void wait_block_state(BlockHandle handle, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::Ref<ShardState>> promise) override; td::Promise<td::Ref<ShardState>> promise) override;

View file

@ -169,6 +169,11 @@ class ValidatorManagerImpl : public ValidatorManager {
td::Promise<td::Unit> promise) override { td::Promise<td::Unit> promise) override {
UNREACHABLE(); UNREACHABLE();
} }
void store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id,
std::function<td::Status(td::FileFd&)> write_data,
td::Promise<td::Unit> promise) override {
UNREACHABLE();
}
void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise<td::Unit> promise) override { void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise<td::Unit> promise) override {
UNREACHABLE(); UNREACHABLE();
} }

View file

@ -1051,6 +1051,13 @@ void ValidatorManagerImpl::store_persistent_state_file(BlockIdExt block_id, Bloc
std::move(promise)); std::move(promise));
} }
void ValidatorManagerImpl::store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id,
std::function<td::Status(td::FileFd&)> write_data,
td::Promise<td::Unit> promise) {
td::actor::send_closure(db_, &Db::store_persistent_state_file_gen, block_id, masterchain_block_id,
std::move(write_data), std::move(promise));
}
void ValidatorManagerImpl::store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, void ValidatorManagerImpl::store_zero_state_file(BlockIdExt block_id, td::BufferSlice state,
td::Promise<td::Unit> promise) { td::Promise<td::Unit> promise) {
td::actor::send_closure(db_, &Db::store_zero_state_file, block_id, std::move(state), std::move(promise)); td::actor::send_closure(db_, &Db::store_zero_state_file, block_id, std::move(state), std::move(promise));

View file

@ -347,6 +347,9 @@ class ValidatorManagerImpl : public ValidatorManager {
td::Promise<td::Ref<ShardState>> promise) override; td::Promise<td::Ref<ShardState>> promise) override;
void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state, void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state,
td::Promise<td::Unit> promise) override; td::Promise<td::Unit> promise) override;
void store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id,
std::function<td::Status(td::FileFd&)> write_data,
td::Promise<td::Unit> promise) override;
void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise<td::Unit> promise) override; void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise<td::Unit> promise) override;
void wait_block_state(BlockHandle handle, td::uint32 priority, td::Timestamp timeout, void wait_block_state(BlockHandle handle, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::Ref<ShardState>> promise) override; td::Promise<td::Ref<ShardState>> promise) override;

View file

@ -149,6 +149,7 @@ void DownloadState::got_block_state_description(td::BufferSlice data) {
abort_query(F.move_as_error()); abort_query(F.move_as_error());
return; return;
} }
prev_logged_timer_ = td::Timer();
ton_api::downcast_call( ton_api::downcast_call(
*F.move_as_ok().get(), *F.move_as_ok().get(),
@ -188,8 +189,12 @@ void DownloadState::got_block_state_part(td::BufferSlice data, td::uint32 reques
sum_ += data.size(); sum_ += data.size();
parts_.push_back(std::move(data)); parts_.push_back(std::move(data));
if (sum_ % (1 << 22) == 0) { double elapsed = prev_logged_timer_.elapsed();
LOG(DEBUG) << "downloading state " << block_id_ << ": total=" << sum_; if (elapsed > 10.0) {
prev_logged_timer_ = td::Timer();
LOG(INFO) << "downloading state " << block_id_ << ": total=" << sum_ <<
" (" << double(sum_ - prev_logged_sum_) / elapsed << " B/s)";
prev_logged_sum_ = sum_;
} }
if (last_part) { if (last_part) {

View file

@ -72,6 +72,9 @@ class DownloadState : public td::actor::Actor {
td::BufferSlice state_; td::BufferSlice state_;
std::vector<td::BufferSlice> parts_; std::vector<td::BufferSlice> parts_;
td::uint64 sum_ = 0; td::uint64 sum_ = 0;
td::uint64 prev_logged_sum_ = 0;
td::Timer prev_logged_timer_;
}; };
} // namespace fullnode } // namespace fullnode

View file

@ -122,14 +122,12 @@ void AsyncStateSerializer::next_iteration() {
CHECK(masterchain_handle_->id() == last_block_id_); CHECK(masterchain_handle_->id() == last_block_id_);
if (attempt_ < max_attempt() && last_key_block_id_.id.seqno < last_block_id_.id.seqno && if (attempt_ < max_attempt() && last_key_block_id_.id.seqno < last_block_id_.id.seqno &&
need_serialize(masterchain_handle_)) { need_serialize(masterchain_handle_)) {
if (masterchain_state_.is_null()) { if (!have_masterchain_state_) {
// block next attempts immediately, but send actual request later // block next attempts immediately, but send actual request later
running_ = true; running_ = true;
delay_action( delay_action(
[SelfId = actor_id(this)]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_masterchain_state); }, [SelfId = actor_id(this)]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_masterchain_state); },
// Masterchain is more important and much lighter than shards td::Timestamp::in(td::Random::fast(0, 3600)));
// thus lower delay
td::Timestamp::in(td::Random::fast(0, 600)));
return; return;
} }
while (next_idx_ < shards_.size()) { while (next_idx_ < shards_.size()) {
@ -140,8 +138,6 @@ void AsyncStateSerializer::next_iteration() {
running_ = true; running_ = true;
delay_action( delay_action(
[SelfId = actor_id(this), shard = shards_[next_idx_]]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_shard_state, shard); }, [SelfId = actor_id(this), shard = shards_[next_idx_]]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_shard_state, shard); },
// Shards are less important and heavier than master
// thus higher delay
td::Timestamp::in(td::Random::fast(0, 4 * 3600))); td::Timestamp::in(td::Random::fast(0, 4 * 3600)));
return; return;
} }
@ -162,7 +158,7 @@ void AsyncStateSerializer::next_iteration() {
} }
if (masterchain_handle_->inited_next_left()) { if (masterchain_handle_->inited_next_left()) {
last_block_id_ = masterchain_handle_->one_next(true); last_block_id_ = masterchain_handle_->one_next(true);
masterchain_state_ = td::Ref<MasterchainState>{}; have_masterchain_state_ = false;
masterchain_handle_ = nullptr; masterchain_handle_ = nullptr;
saved_to_db_ = false; saved_to_db_ = false;
shards_.clear(); shards_.clear();
@ -186,25 +182,25 @@ void AsyncStateSerializer::got_masterchain_handle(BlockHandle handle) {
} }
void AsyncStateSerializer::got_masterchain_state(td::Ref<MasterchainState> state) { void AsyncStateSerializer::got_masterchain_state(td::Ref<MasterchainState> state) {
masterchain_state_ = state; have_masterchain_state_ = true;
CHECK(next_idx_ == 0); CHECK(next_idx_ == 0);
CHECK(shards_.size() == 0); CHECK(shards_.size() == 0);
auto vec = masterchain_state_->get_shards(); auto vec = state->get_shards();
shards_.push_back(masterchain_handle_->id());
for (auto &v : vec) { for (auto &v : vec) {
shards_.push_back(v->top_block_id()); shards_.push_back(v->top_block_id());
} }
auto B = masterchain_state_->serialize(); auto write_data = [state] (td::FileFd& fd) {
B.ensure(); return state->serialize_to_file(fd);
};
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) { auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
R.ensure(); R.ensure();
td::actor::send_closure(SelfId, &AsyncStateSerializer::stored_masterchain_state); td::actor::send_closure(SelfId, &AsyncStateSerializer::stored_masterchain_state);
}); });
td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file, masterchain_handle_->id(), td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file_gen, masterchain_handle_->id(),
masterchain_handle_->id(), B.move_as_ok(), std::move(P)); masterchain_handle_->id(), write_data, std::move(P));
} }
void AsyncStateSerializer::stored_masterchain_state() { void AsyncStateSerializer::stored_masterchain_state() {
@ -225,13 +221,15 @@ void AsyncStateSerializer::got_shard_handle(BlockHandle handle) {
} }
void AsyncStateSerializer::got_shard_state(BlockHandle handle, td::Ref<ShardState> state) { void AsyncStateSerializer::got_shard_state(BlockHandle handle, td::Ref<ShardState> state) {
auto B = state->serialize().move_as_ok(); auto write_data = [state] (td::FileFd& fd) {
return state->serialize_to_file(fd);
};
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) { auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
R.ensure(); R.ensure();
td::actor::send_closure(SelfId, &AsyncStateSerializer::success_handler); td::actor::send_closure(SelfId, &AsyncStateSerializer::success_handler);
}); });
td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file, handle->id(), td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file_gen, handle->id(),
masterchain_handle_->id(), std::move(B), std::move(P)); masterchain_handle_->id(), write_data, std::move(P));
LOG(INFO) << "storing persistent state for " << masterchain_handle_->id().seqno() << ":" << handle->id().id.shard; LOG(INFO) << "storing persistent state for " << masterchain_handle_->id().seqno() << ":" << handle->id().id.shard;
next_idx_++; next_idx_++;
} }

View file

@ -43,7 +43,7 @@ class AsyncStateSerializer : public td::actor::Actor {
td::uint32 next_idx_ = 0; td::uint32 next_idx_ = 0;
BlockHandle masterchain_handle_; BlockHandle masterchain_handle_;
td::Ref<MasterchainState> masterchain_state_; bool have_masterchain_state_ = false;
std::vector<BlockIdExt> shards_; std::vector<BlockIdExt> shards_;

View file

@ -96,7 +96,7 @@ struct ValidatorManagerOptions : public td::CntObject {
BlockIdExt zero_block_id, BlockIdExt init_block_id, BlockIdExt zero_block_id, BlockIdExt init_block_id,
std::function<bool(ShardIdFull, CatchainSeqno, ShardCheckMode)> check_shard = [](ShardIdFull, CatchainSeqno, std::function<bool(ShardIdFull, CatchainSeqno, ShardCheckMode)> check_shard = [](ShardIdFull, CatchainSeqno,
ShardCheckMode) { return true; }, ShardCheckMode) { return true; },
bool allow_blockchain_init = false, double sync_blocks_before = 300, double block_ttl = 86400 * 7, bool allow_blockchain_init = false, double sync_blocks_before = 86400, double block_ttl = 86400 * 7,
double state_ttl = 3600, double archive_ttl = 86400 * 365, double key_proof_ttl = 86400 * 3650, double state_ttl = 3600, double archive_ttl = 86400 * 365, double key_proof_ttl = 86400 * 3650,
double max_mempool_num = 999999, double max_mempool_num = 999999,
bool initial_sync_disabled = false); bool initial_sync_disabled = false);