diff --git a/crypto/vm/boc.cpp b/crypto/vm/boc.cpp index c53595ff..8ff244d0 100644 --- a/crypto/vm/boc.cpp +++ b/crypto/vm/boc.cpp @@ -475,15 +475,128 @@ std::string BagOfCells::extract_string() const { return std::string{serialized.data(), serialized.data() + serialized.size()}; } -void BagOfCells::store_uint(unsigned long long value, unsigned bytes) { - unsigned char* ptr = store_ptr += bytes; - store_chk(); - while (bytes) { - *--ptr = value & 0xff; - value >>= 8; - --bytes; +namespace { +struct BufferWriter { + BufferWriter(unsigned char* store_start, unsigned char* store_end) + : store_start(store_start), store_ptr(store_start), store_end(store_end) {} + + size_t position() const { + return store_ptr - store_start; } - DCHECK(!bytes); + size_t remaining() const { + return store_end - store_ptr; + } + void chk() const { + DCHECK(store_ptr <= store_end); + } + bool empty() const { + return store_ptr == store_end; + } + void store_uint(unsigned long long value, unsigned bytes) { + unsigned char* ptr = store_ptr += bytes; + chk(); + while (bytes) { + *--ptr = value & 0xff; + value >>= 8; + --bytes; + } + DCHECK(!bytes); + } + void store_bytes(unsigned char const* data, size_t s) { + store_ptr += s; + chk(); + memcpy(store_ptr - s, data, s); + } + unsigned get_crc32() const { + return td::crc32c(td::Slice{store_start, store_ptr}); + } + + private: + unsigned char* store_start; + unsigned char* store_ptr; + unsigned char* store_end; +}; + +struct FileWriter { + FileWriter(td::FileFd& fd, size_t expected_size) + : fd(fd), expected_size(expected_size) {} + + ~FileWriter() { + flush(); + } + + size_t position() const { + return flushed_size + writer.position(); + } + size_t remaining() const { + return expected_size - position(); + } + void chk() const { + DCHECK(position() <= expected_size); + } + bool empty() const { + return remaining() == 0; + } + void store_uint(unsigned long long value, unsigned bytes) { + flush_if_needed(bytes); + writer.store_uint(value, bytes); + } + void store_bytes(unsigned char const* data, size_t s) { + flush_if_needed(s); + writer.store_bytes(data, s); + } + unsigned get_crc32() const { + unsigned char const* start = buf.data(); + unsigned char const* end = start + writer.position(); + return td::crc32c_extend(current_crc32, td::Slice(start, end)); + } + + td::Status finalize() { + flush(); + return std::move(res); + } + + private: + void flush_if_needed(size_t s) { + DCHECK(s <= BUF_SIZE); + if (s > BUF_SIZE - writer.position()) { + flush(); + } + } + + void flush() { + chk(); + unsigned char* start = buf.data(); + unsigned char* end = start + writer.position(); + if (start == end) { + return; + } + flushed_size += end - start; + current_crc32 = td::crc32c_extend(current_crc32, td::Slice(start, end)); + if (res.is_ok()) { + while (end > start) { + auto R = fd.write(td::Slice(start, end)); + if (R.is_error()) { + res = R.move_as_error(); + break; + } + size_t s = R.move_as_ok(); + start += s; + } + } + writer = BufferWriter(buf.data(), buf.data() + buf.size()); + } + + td::FileFd& fd; + size_t expected_size; + size_t flushed_size = 0; + unsigned current_crc32 = td::crc32c(td::Slice()); + + static const size_t BUF_SIZE = 1 << 22; + std::vector buf = std::vector(BUF_SIZE, '\0'); + BufferWriter writer = BufferWriter(buf.data(), buf.data() + buf.size()); + td::Status res = td::Status::OK(); +}; } //serialized_boc#672fb0ac has_idx:(## 1) has_crc32c:(## 1) @@ -497,13 +610,16 @@ void BagOfCells::store_uint(unsigned long long value, unsigned bytes) { // index:(cells * ##(off_bytes * 8)) // cell_data:(tot_cells_size * [ uint8 ]) // = BagOfCells; -std::size_t BagOfCells::serialize_to(unsigned char* buffer, std::size_t buff_size, int mode) { - std::size_t size_est = estimate_serialized_size(mode); - if (!size_est || size_est > buff_size) { - return 0; - } - init_store(buffer, buffer + size_est); - store_uint(info.magic, 4); +template +std::size_t BagOfCells::serialize_to_impl(WriterT& writer, int mode) { + auto store_ref = [&](unsigned long long value) { + writer.store_uint(value, info.ref_byte_size); + }; + auto store_offset = [&](unsigned long long value) { + writer.store_uint(value, info.offset_byte_size); + }; + + writer.store_uint(info.magic, 4); td::uint8 byte{0}; if (info.has_index) { @@ -520,9 +636,9 @@ std::size_t BagOfCells::serialize_to(unsigned char* buffer, std::size_t buff_siz return 0; } byte |= static_cast(info.ref_byte_size); - store_uint(byte, 1); + writer.store_uint(byte, 1); - store_uint(info.offset_byte_size, 1); + writer.store_uint(info.offset_byte_size, 1); store_ref(cell_count); store_ref(root_count); store_ref(0); @@ -532,7 +648,7 @@ std::size_t BagOfCells::serialize_to(unsigned char* buffer, std::size_t buff_siz DCHECK(k >= 0 && k < cell_count); store_ref(k); } - DCHECK(store_ptr - buffer == (long long)info.index_offset); + DCHECK(writer.position() == info.index_offset); DCHECK((unsigned)cell_count == cell_list_.size()); if (info.has_index) { std::size_t offs = 0; @@ -551,8 +667,8 @@ std::size_t BagOfCells::serialize_to(unsigned char* buffer, std::size_t buff_siz } DCHECK(offs == info.data_size); } - DCHECK(store_ptr - buffer == (long long)info.data_offset); - unsigned char* keep_ptr = store_ptr; + DCHECK(writer.position() == info.data_offset); + size_t keep_position = writer.position(); for (int i = 0; i < cell_count; ++i) { const auto& dc_info = cell_list_[cell_count - 1 - i]; const Ref& dc = dc_info.dc_ref; @@ -560,9 +676,9 @@ std::size_t BagOfCells::serialize_to(unsigned char* buffer, std::size_t buff_siz if (dc_info.is_root_cell && (mode & Mode::WithTopHash)) { with_hash = true; } - int s = dc->serialize(store_ptr, 256, with_hash); - store_ptr += s; - store_chk(); + unsigned char buf[256]; + int s = dc->serialize(buf, 256, with_hash); + writer.store_bytes(buf, s); DCHECK(dc->size_refs() == dc_info.ref_num); // std::cerr << (dc_info.is_special() ? '*' : ' ') << i << '<' << (int)dc_info.wt << ">:"; for (unsigned j = 0; j < dc_info.ref_num; ++j) { @@ -573,16 +689,38 @@ std::size_t BagOfCells::serialize_to(unsigned char* buffer, std::size_t buff_siz } // std::cerr << std::endl; } - store_chk(); - DCHECK(store_ptr - keep_ptr == (long long)info.data_size); - DCHECK(store_end - store_ptr == (info.has_crc32c ? 4 : 0)); + writer.chk(); + DCHECK(writer.position() - keep_position == info.data_size); + DCHECK(writer.remaining() == (info.has_crc32c ? 4 : 0)); if (info.has_crc32c) { - // compute crc32c of buffer .. store_ptr - unsigned crc = td::crc32c(td::Slice{buffer, store_ptr}); - store_uint(td::bswap32(crc), 4); + unsigned crc = writer.get_crc32(); + writer.store_uint(td::bswap32(crc), 4); } - DCHECK(store_empty()); - return store_ptr - buffer; + DCHECK(writer.empty()); + return writer.position(); +} + +std::size_t BagOfCells::serialize_to(unsigned char* buffer, std::size_t buff_size, int mode) { + std::size_t size_est = estimate_serialized_size(mode); + if (!size_est || size_est > buff_size) { + return 0; + } + BufferWriter writer{buffer, buffer + size_est}; + return serialize_to_impl(writer, mode); +} + +td::Status BagOfCells::serialize_to_file(td::FileFd& fd, int mode) { + std::size_t size_est = estimate_serialized_size(mode); + if (!size_est) { + return td::Status::Error("no cells to serialize to this bag of cells"); + } + FileWriter writer{fd, size_est}; + size_t s = serialize_to_impl(writer, mode); + TRY_STATUS(writer.finalize()); + if (s != size_est) { + return td::Status::Error("error while serializing a bag of cells: actual serialized size differs from estimated"); + } + return td::Status::OK(); } unsigned long long BagOfCells::Info::read_int(const unsigned char* ptr, unsigned bytes) { diff --git a/crypto/vm/boc.h b/crypto/vm/boc.h index cc277a8c..02078e27 100644 --- a/crypto/vm/boc.h +++ b/crypto/vm/boc.h @@ -23,6 +23,7 @@ #include "td/utils/buffer.h" #include "td/utils/HashMap.h" #include "td/utils/HashSet.h" +#include "td/utils/port/FileFd.h" namespace vm { using td::Ref; @@ -216,8 +217,6 @@ class BagOfCells { int max_depth{1024}; Info info; unsigned long long data_bytes{0}; - unsigned char* store_ptr{nullptr}; - unsigned char* store_end{nullptr}; td::HashMap cells; struct CellInfo { Ref dc_ref; @@ -267,6 +266,9 @@ class BagOfCells { std::string serialize_to_string(int mode = 0); td::Result serialize_to_slice(int mode = 0); std::size_t serialize_to(unsigned char* buffer, std::size_t buff_size, int mode = 0); + td::Status serialize_to_file(td::FileFd& fd, int mode = 0); + template + std::size_t serialize_to_impl(WriterT& writer, int mode = 0); std::string extract_string() const; td::Result deserialize(const td::Slice& data, int max_roots = default_max_roots); @@ -295,23 +297,6 @@ class BagOfCells { cell_list_.clear(); } td::uint64 compute_sizes(int mode, int& r_size, int& o_size); - void init_store(unsigned char* from, unsigned char* to) { - store_ptr = from; - store_end = to; - } - void store_chk() const { - DCHECK(store_ptr <= store_end); - } - bool store_empty() const { - return store_ptr == store_end; - } - void store_uint(unsigned long long value, unsigned bytes); - void store_ref(unsigned long long value) { - store_uint(value, info.ref_byte_size); - } - void store_offset(unsigned long long value) { - store_uint(value, info.offset_byte_size); - } void reorder_cells(); int revisit(int cell_idx, int force = 0); unsigned long long get_idx_entry_raw(int index); diff --git a/validator/db/archive-manager.cpp b/validator/db/archive-manager.cpp index fb8f2c00..e7614154 100644 --- a/validator/db/archive-manager.cpp +++ b/validator/db/archive-manager.cpp @@ -336,6 +336,28 @@ void ArchiveManager::add_zero_state(BlockIdExt block_id, td::BufferSlice data, t void ArchiveManager::add_persistent_state(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice data, td::Promise promise) { + auto create_writer = [&](std::string path, td::Promise P) { + td::actor::create_actor("writefile", db_root_ + "/archive/tmp/", + std::move(path), std::move(data), std::move(P)) + .release(); + }; + add_persistent_state_impl(block_id, masterchain_block_id, std::move(promise), std::move(create_writer)); +} + +void ArchiveManager::add_persistent_state_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id, + std::function write_state, + td::Promise promise) { + auto create_writer = [&](std::string path, td::Promise P) { + td::actor::create_actor("writefile", db_root_ + "/archive/tmp/", + std::move(path), std::move(write_state), std::move(P)) + .release(); + }; + add_persistent_state_impl(block_id, masterchain_block_id, std::move(promise), std::move(create_writer)); +} + +void ArchiveManager::add_persistent_state_impl(BlockIdExt block_id, BlockIdExt masterchain_block_id, + td::Promise promise, + std::function)> create_writer) { auto id = FileReference{fileref::PersistentState{block_id, masterchain_block_id}}; auto hash = id.hash(); if (perm_states_.find(hash) != perm_states_.end()) { @@ -353,8 +375,7 @@ void ArchiveManager::add_persistent_state(BlockIdExt block_id, BlockIdExt master promise.set_value(td::Unit()); } }); - td::actor::create_actor("writefile", db_root_ + "/archive/tmp/", path, std::move(data), std::move(P)) - .release(); + create_writer(std::move(path), std::move(P)); } void ArchiveManager::get_zero_state(BlockIdExt block_id, td::Promise promise) { diff --git a/validator/db/archive-manager.hpp b/validator/db/archive-manager.hpp index dbe6e4d4..31e0e6b6 100644 --- a/validator/db/archive-manager.hpp +++ b/validator/db/archive-manager.hpp @@ -45,6 +45,9 @@ class ArchiveManager : public td::actor::Actor { void add_zero_state(BlockIdExt block_id, td::BufferSlice data, td::Promise promise); void add_persistent_state(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice data, td::Promise promise); + void add_persistent_state_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id, + std::function write_state, + td::Promise promise); void get_zero_state(BlockIdExt block_id, td::Promise promise); void get_persistent_state(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::Promise promise); void get_persistent_state_slice(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::int64 offset, @@ -137,6 +140,8 @@ class ArchiveManager : public td::actor::Actor { PackageId get_max_temp_file_desc_idx(); PackageId get_prev_temp_file_desc_idx(PackageId id); + void add_persistent_state_impl(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::Promise promise, + std::function)> create_writer); void written_perm_state(FileReferenceShort id); void persistent_state_gc(FileHash last); diff --git a/validator/db/files-async.hpp b/validator/db/files-async.hpp index 1f2ab5ce..2da534bf 100644 --- a/validator/db/files-async.hpp +++ b/validator/db/files-async.hpp @@ -52,30 +52,50 @@ class WriteFile : public td::actor::Actor { auto res = R.move_as_ok(); auto file = std::move(res.first); auto old_name = res.second; - td::uint64 offset = 0; - while (data_.size() > 0) { - auto R = file.pwrite(data_.as_slice(), offset); - auto s = R.move_as_ok(); - offset += s; - data_.confirm_read(s); + auto status = write_data_(file); + if (!status.is_error()) { + status = file.sync(); + } + if (status.is_error()) { + td::unlink(old_name); + promise_.set_error(std::move(status)); + stop(); + return; } - file.sync().ensure(); if (new_name_.length() > 0) { - td::rename(old_name, new_name_).ensure(); - promise_.set_value(std::move(new_name_)); + status = td::rename(old_name, new_name_); + if (status.is_error()) { + promise_.set_error(std::move(status)); + } else { + promise_.set_value(std::move(new_name_)); + } } else { promise_.set_value(std::move(old_name)); } stop(); } + WriteFile(std::string tmp_dir, std::string new_name, std::function write_data, + td::Promise promise) + : tmp_dir_(tmp_dir), new_name_(new_name), write_data_(std::move(write_data)), promise_(std::move(promise)) { + } WriteFile(std::string tmp_dir, std::string new_name, td::BufferSlice data, td::Promise promise) - : tmp_dir_(tmp_dir), new_name_(new_name), data_(std::move(data)), promise_(std::move(promise)) { + : tmp_dir_(tmp_dir), new_name_(new_name), promise_(std::move(promise)) { + write_data_ = [data_ptr = std::make_shared(std::move(data))] (td::FileFd& fd) { + auto data = std::move(*data_ptr); + td::uint64 offset = 0; + while (data.size() > 0) { + TRY_RESULT(s, fd.pwrite(data.as_slice(), offset)); + offset += s; + data.confirm_read(s); + } + return td::Status::OK(); + }; } private: const std::string tmp_dir_; std::string new_name_; - td::BufferSlice data_; + std::function write_data_; td::Promise promise_; }; diff --git a/validator/db/rootdb.cpp b/validator/db/rootdb.cpp index ef88ad5f..58e3dd2f 100644 --- a/validator/db/rootdb.cpp +++ b/validator/db/rootdb.cpp @@ -276,6 +276,13 @@ void RootDb::store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterc std::move(state), std::move(promise)); } +void RootDb::store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id, + std::function write_data, + td::Promise promise) { + td::actor::send_closure(archive_db_, &ArchiveManager::add_persistent_state_gen, block_id, masterchain_block_id, + std::move(write_data), std::move(promise)); +} + void RootDb::get_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::Promise promise) { td::actor::send_closure(archive_db_, &ArchiveManager::get_persistent_state, block_id, masterchain_block_id, diff --git a/validator/db/rootdb.hpp b/validator/db/rootdb.hpp index c4d642e7..2654d482 100644 --- a/validator/db/rootdb.hpp +++ b/validator/db/rootdb.hpp @@ -69,6 +69,9 @@ class RootDb : public Db { void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state, td::Promise promise) override; + void store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id, + std::function write_data, + td::Promise promise) override; void get_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::Promise promise) override; void get_persistent_state_file_slice(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::int64 offset, diff --git a/validator/impl/shard.cpp b/validator/impl/shard.cpp index 18f71d32..a96f1a81 100644 --- a/validator/impl/shard.cpp +++ b/validator/impl/shard.cpp @@ -26,6 +26,7 @@ #include "vm/cells/MerkleUpdate.h" #include "block/block-parse.h" #include "block/block-auto.h" +#include "td/utils/filesystem.h" #define LAZY_STATE_DESERIALIZE 1 @@ -301,6 +302,30 @@ td::Result ShardStateQ::serialize() const { return st_res.move_as_ok(); } +td::Status ShardStateQ::serialize_to_file(td::FileFd& fd) const { + td::PerfWarningTimer perf_timer_{"serializestate", 0.1}; + if (!data.is_null()) { + auto cur_data = data.clone(); + while (cur_data.size() > 0) { + TRY_RESULT(s, fd.write(cur_data.as_slice())); + cur_data.confirm_read(s); + } + return td::Status::OK(); + } + if (root.is_null()) { + return td::Status::Error(-666, "cannot serialize an uninitialized state"); + } + vm::BagOfCells new_boc; + new_boc.set_root(root); + TRY_STATUS(new_boc.import_cells()); + auto st_res = new_boc.serialize_to_file(fd, 31); + if (st_res.is_error()) { + LOG(ERROR) << "cannot serialize a shardchain state"; + return st_res.move_as_error(); + } + return td::Status::OK(); +} + MasterchainStateQ::MasterchainStateQ(const BlockIdExt& _id, td::BufferSlice _data) : MasterchainState(), ShardStateQ(_id, std::move(_data)) { } diff --git a/validator/impl/shard.hpp b/validator/impl/shard.hpp index 862b0fc5..0056df11 100644 --- a/validator/impl/shard.hpp +++ b/validator/impl/shard.hpp @@ -87,6 +87,7 @@ class ShardStateQ : virtual public ShardState { td::Result> merge_with(const ShardState& with) const override; td::Result, Ref>> split() const override; td::Result serialize() const override; + td::Status serialize_to_file(td::FileFd& fd) const override; }; #if TD_MSVC diff --git a/validator/interfaces/db.h b/validator/interfaces/db.h index f49a7819..9983572b 100644 --- a/validator/interfaces/db.h +++ b/validator/interfaces/db.h @@ -53,6 +53,9 @@ class Db : public td::actor::Actor { virtual void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state, td::Promise promise) = 0; + virtual void store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id, + std::function write_data, + td::Promise promise) = 0; virtual void get_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::Promise promise) = 0; virtual void get_persistent_state_file_slice(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::int64 offset, diff --git a/validator/interfaces/shard.h b/validator/interfaces/shard.h index 0cfbda43..96ca8da2 100644 --- a/validator/interfaces/shard.h +++ b/validator/interfaces/shard.h @@ -55,6 +55,7 @@ class ShardState : public td::CntObject { virtual td::Result, td::Ref>> split() const = 0; virtual td::Result serialize() const = 0; + virtual td::Status serialize_to_file(td::FileFd& fd) const = 0; }; class MasterchainState : virtual public ShardState { diff --git a/validator/interfaces/validator-manager.h b/validator/interfaces/validator-manager.h index 1d4a7c9f..42e99ecc 100644 --- a/validator/interfaces/validator-manager.h +++ b/validator/interfaces/validator-manager.h @@ -57,6 +57,9 @@ class ValidatorManager : public ValidatorManagerInterface { td::Promise> promise) = 0; virtual void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state, td::Promise promise) = 0; + virtual void store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id, + std::function write_data, + td::Promise promise) = 0; virtual void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise promise) = 0; virtual void wait_block_state(BlockHandle handle, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) = 0; diff --git a/validator/manager-disk.cpp b/validator/manager-disk.cpp index e43e37b2..404f7fe8 100644 --- a/validator/manager-disk.cpp +++ b/validator/manager-disk.cpp @@ -680,6 +680,13 @@ void ValidatorManagerImpl::store_persistent_state_file(BlockIdExt block_id, Bloc std::move(promise)); } +void ValidatorManagerImpl::store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id, + std::function write_data, + td::Promise promise) { + td::actor::send_closure(db_, &Db::store_persistent_state_file_gen, block_id, masterchain_block_id, + std::move(write_data), std::move(promise)); +} + void ValidatorManagerImpl::store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise promise) { td::actor::send_closure(db_, &Db::store_zero_state_file, block_id, std::move(state), std::move(promise)); diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index fa365ccf..bb740b4c 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -143,6 +143,9 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state, td::Promise promise) override; + void store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id, + std::function write_data, + td::Promise promise) override; void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise promise) override; void wait_block_state(BlockHandle handle, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; diff --git a/validator/manager-hardfork.hpp b/validator/manager-hardfork.hpp index 9938bcb2..71157dcc 100644 --- a/validator/manager-hardfork.hpp +++ b/validator/manager-hardfork.hpp @@ -169,6 +169,11 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise promise) override { UNREACHABLE(); } + void store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id, + std::function write_data, + td::Promise promise) override { + UNREACHABLE(); + } void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise promise) override { UNREACHABLE(); } diff --git a/validator/manager.cpp b/validator/manager.cpp index e6fe5954..28e5cd3a 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -1051,6 +1051,13 @@ void ValidatorManagerImpl::store_persistent_state_file(BlockIdExt block_id, Bloc std::move(promise)); } +void ValidatorManagerImpl::store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id, + std::function write_data, + td::Promise promise) { + td::actor::send_closure(db_, &Db::store_persistent_state_file_gen, block_id, masterchain_block_id, + std::move(write_data), std::move(promise)); +} + void ValidatorManagerImpl::store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise promise) { td::actor::send_closure(db_, &Db::store_zero_state_file, block_id, std::move(state), std::move(promise)); diff --git a/validator/manager.hpp b/validator/manager.hpp index 19ee4cdd..123625b2 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -347,6 +347,9 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state, td::Promise promise) override; + void store_persistent_state_file_gen(BlockIdExt block_id, BlockIdExt masterchain_block_id, + std::function write_data, + td::Promise promise) override; void store_zero_state_file(BlockIdExt block_id, td::BufferSlice state, td::Promise promise) override; void wait_block_state(BlockHandle handle, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; diff --git a/validator/net/download-state.cpp b/validator/net/download-state.cpp index 49986ce9..fedfaae8 100644 --- a/validator/net/download-state.cpp +++ b/validator/net/download-state.cpp @@ -149,6 +149,7 @@ void DownloadState::got_block_state_description(td::BufferSlice data) { abort_query(F.move_as_error()); return; } + prev_logged_timer_ = td::Timer(); ton_api::downcast_call( *F.move_as_ok().get(), @@ -188,8 +189,12 @@ void DownloadState::got_block_state_part(td::BufferSlice data, td::uint32 reques sum_ += data.size(); parts_.push_back(std::move(data)); - if (sum_ % (1 << 22) == 0) { - LOG(DEBUG) << "downloading state " << block_id_ << ": total=" << sum_; + double elapsed = prev_logged_timer_.elapsed(); + if (elapsed > 10.0) { + prev_logged_timer_ = td::Timer(); + LOG(INFO) << "downloading state " << block_id_ << ": total=" << sum_ << + " (" << double(sum_ - prev_logged_sum_) / elapsed << " B/s)"; + prev_logged_sum_ = sum_; } if (last_part) { diff --git a/validator/net/download-state.hpp b/validator/net/download-state.hpp index d965e3c7..a586f61f 100644 --- a/validator/net/download-state.hpp +++ b/validator/net/download-state.hpp @@ -72,6 +72,9 @@ class DownloadState : public td::actor::Actor { td::BufferSlice state_; std::vector parts_; td::uint64 sum_ = 0; + + td::uint64 prev_logged_sum_ = 0; + td::Timer prev_logged_timer_; }; } // namespace fullnode diff --git a/validator/state-serializer.cpp b/validator/state-serializer.cpp index 63b7b864..daf2de11 100644 --- a/validator/state-serializer.cpp +++ b/validator/state-serializer.cpp @@ -122,14 +122,12 @@ void AsyncStateSerializer::next_iteration() { CHECK(masterchain_handle_->id() == last_block_id_); if (attempt_ < max_attempt() && last_key_block_id_.id.seqno < last_block_id_.id.seqno && need_serialize(masterchain_handle_)) { - if (masterchain_state_.is_null()) { + if (!have_masterchain_state_) { // block next attempts immediately, but send actual request later running_ = true; delay_action( [SelfId = actor_id(this)]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_masterchain_state); }, - // Masterchain is more important and much lighter than shards - // thus lower delay - td::Timestamp::in(td::Random::fast(0, 600))); + td::Timestamp::in(td::Random::fast(0, 3600))); return; } while (next_idx_ < shards_.size()) { @@ -140,8 +138,6 @@ void AsyncStateSerializer::next_iteration() { running_ = true; delay_action( [SelfId = actor_id(this), shard = shards_[next_idx_]]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_shard_state, shard); }, - // Shards are less important and heavier than master - // thus higher delay td::Timestamp::in(td::Random::fast(0, 4 * 3600))); return; } @@ -162,7 +158,7 @@ void AsyncStateSerializer::next_iteration() { } if (masterchain_handle_->inited_next_left()) { last_block_id_ = masterchain_handle_->one_next(true); - masterchain_state_ = td::Ref{}; + have_masterchain_state_ = false; masterchain_handle_ = nullptr; saved_to_db_ = false; shards_.clear(); @@ -186,25 +182,25 @@ void AsyncStateSerializer::got_masterchain_handle(BlockHandle handle) { } void AsyncStateSerializer::got_masterchain_state(td::Ref state) { - masterchain_state_ = state; + have_masterchain_state_ = true; CHECK(next_idx_ == 0); CHECK(shards_.size() == 0); - auto vec = masterchain_state_->get_shards(); - shards_.push_back(masterchain_handle_->id()); + auto vec = state->get_shards(); for (auto &v : vec) { shards_.push_back(v->top_block_id()); } - auto B = masterchain_state_->serialize(); - B.ensure(); + auto write_data = [state] (td::FileFd& fd) { + return state->serialize_to_file(fd); + }; auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result R) { R.ensure(); td::actor::send_closure(SelfId, &AsyncStateSerializer::stored_masterchain_state); }); - td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file, masterchain_handle_->id(), - masterchain_handle_->id(), B.move_as_ok(), std::move(P)); + td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file_gen, masterchain_handle_->id(), + masterchain_handle_->id(), write_data, std::move(P)); } void AsyncStateSerializer::stored_masterchain_state() { @@ -225,13 +221,15 @@ void AsyncStateSerializer::got_shard_handle(BlockHandle handle) { } void AsyncStateSerializer::got_shard_state(BlockHandle handle, td::Ref state) { - auto B = state->serialize().move_as_ok(); + auto write_data = [state] (td::FileFd& fd) { + return state->serialize_to_file(fd); + }; auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result R) { R.ensure(); td::actor::send_closure(SelfId, &AsyncStateSerializer::success_handler); }); - td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file, handle->id(), - masterchain_handle_->id(), std::move(B), std::move(P)); + td::actor::send_closure(manager_, &ValidatorManager::store_persistent_state_file_gen, handle->id(), + masterchain_handle_->id(), write_data, std::move(P)); LOG(INFO) << "storing persistent state for " << masterchain_handle_->id().seqno() << ":" << handle->id().id.shard; next_idx_++; } diff --git a/validator/state-serializer.hpp b/validator/state-serializer.hpp index 162e96c1..14261df7 100644 --- a/validator/state-serializer.hpp +++ b/validator/state-serializer.hpp @@ -43,7 +43,7 @@ class AsyncStateSerializer : public td::actor::Actor { td::uint32 next_idx_ = 0; BlockHandle masterchain_handle_; - td::Ref masterchain_state_; + bool have_masterchain_state_ = false; std::vector shards_; diff --git a/validator/validator.h b/validator/validator.h index 349824d6..46110c92 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -96,7 +96,7 @@ struct ValidatorManagerOptions : public td::CntObject { BlockIdExt zero_block_id, BlockIdExt init_block_id, std::function check_shard = [](ShardIdFull, CatchainSeqno, ShardCheckMode) { return true; }, - bool allow_blockchain_init = false, double sync_blocks_before = 300, double block_ttl = 86400 * 7, + bool allow_blockchain_init = false, double sync_blocks_before = 86400, double block_ttl = 86400 * 7, double state_ttl = 3600, double archive_ttl = 86400 * 365, double key_proof_ttl = 86400 * 3650, double max_mempool_num = 999999, bool initial_sync_disabled = false);