mirror of https://github.com/ton-blockchain/ton (synced 2025-03-09 15:40:10 +00:00)

commit fbb9954391 (parent 3c245c6146)
serialize boc with bfs (with multiget)

14 changed files with 448 additions and 122 deletions
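The heart of this commit: both BagOfCells::import_cell and LargeBocSerializer::import_cell switch from a recursive depth-first import, which issues one database read per cell, to a breadth-first traversal that collects all cells of the current depth and fetches them in a single get_multi/MultiGet round trip. The following self-contained sketch shows just that access pattern; Node, load_batch and the toy map are illustrative stand-ins, not ton APIs.

// Standalone sketch of the BFS + batched-read pattern this commit applies
// to BagOfCells and LargeBocSerializer. Names here are illustrative.
#include <cstdio>
#include <string>
#include <unordered_map>
#include <vector>

struct Node {
  std::vector<std::string> children;  // keys of child nodes
};

// One batched lookup per BFS level replaces one lookup per node.
std::vector<Node> load_batch(const std::unordered_map<std::string, Node>& db,
                             const std::vector<std::string>& keys) {
  std::vector<Node> res;
  res.reserve(keys.size());
  for (const auto& k : keys) {
    res.push_back(db.at(k));  // stands in for KeyValueReader::get_multi
  }
  return res;
}

int main() {
  std::unordered_map<std::string, Node> db{
      {"root", {{"a", "b"}}}, {"a", {{"c"}}}, {"b", {}}, {"c", {}}};
  std::vector<std::string> level{"root"};
  int depth = 0;
  while (!level.empty()) {
    auto nodes = load_batch(db, level);  // single round trip per level
    std::vector<std::string> next;
    for (const auto& n : nodes) {
      next.insert(next.end(), n.children.begin(), n.children.end());
    }
    std::printf("depth %d: %zu node(s)\n", depth++, level.size());
    level = std::move(next);
  }
}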
@@ -205,57 +205,105 @@ td::Status BagOfCells::import_cells() {
return td::Status::OK();
}

// Changes in this function may require corresponding changes in crypto/vm/large-boc-serializer.cpp
td::Result<int> BagOfCells::import_cell(td::Ref<vm::Cell> cell, int depth) {
if (depth > max_depth) {
return td::Status::Error("error while importing a cell into a bag of cells: cell depth too large");
td::Result<std::vector<vm::Cell::LoadedCell>> BagOfCells::load_cells(const std::vector<td::Ref<vm::Cell>>& batch) {
if (reader_) {
TRY_RESULT(data_cells, reader_->load_bulk(td::transform(batch, [](const auto& cell) { return cell->get_hash().as_slice(); })));
return td::transform(data_cells, [](auto& data_cell) { return vm::Cell::LoadedCell{std::move(data_cell), {}, {}}; });
}
if (cell.is_null()) {
std::vector<vm::Cell::LoadedCell> res;
res.reserve(batch.size());
for (auto& cell : batch) {
TRY_RESULT(loaded_dc, cell->load_cell());
res.push_back(std::move(loaded_dc));
}
return res;
}

// Changes in this function may require corresponding changes in crypto/vm/large-boc-serializer.cpp
td::Result<int> BagOfCells::import_cell(td::Ref<vm::Cell> root_cell, int root_depth) {
if (root_cell.is_null()) {
return td::Status::Error("error while importing a cell into a bag of cells: cell is null");
}
if (logger_ptr_) {
TRY_STATUS(logger_ptr_->on_cell_processed());

const int start_ind = cell_count;
td::HashMap<CellHash, std::vector<CellHash>> child_hashes_map;
std::vector<td::Ref<vm::Cell>> current_batch;
current_batch.push_back(root_cell);
int current_depth = root_depth;

while (!current_batch.empty()) {
if (current_depth > max_depth) {
return td::Status::Error("error while importing a cell into a bag of cells: cell depth too large");
}
auto it = cells.find(cell->get_hash());
if (it != cells.end()) {
auto pos = it->second;
cell_list_[pos].should_cache = true;
return pos;
}
if (cell->get_virtualization() != 0) {

std::vector<td::Ref<vm::Cell>> next_batch;
TRY_RESULT_PREFIX(loaded_cells, load_cells(current_batch), "error while importing a cell into a bag of cells: ");
DCHECK(loaded_cells.size() == current_batch.size());

for (size_t i = 0; i < current_batch.size(); ++i) {
auto& cell = loaded_cells[i];
if (cell.data_cell->get_virtualization() != 0) {
return td::Status::Error(
"error while importing a cell into a bag of cells: cell has non-zero virtualization level");
}
auto r_loaded_dc = cell->load_cell();
if (r_loaded_dc.is_error()) {
return td::Status::Error("error while importing a cell into a bag of cells: " +
r_loaded_dc.move_as_error().to_string());

const auto hash = cell.data_cell->get_hash();
auto existing_it = cells.find(hash);
if (existing_it != cells.end()) {
cell_list_[existing_it->second].should_cache = true;
continue;
}
auto loaded_dc = r_loaded_dc.move_as_ok();
CellSlice cs(std::move(loaded_dc));
std::array<int, 4> refs{-1};

CellSlice cs(std::move(cell));
std::vector<CellHash> child_hashes;
DCHECK(cs.size_refs() <= 4);
unsigned sum_child_wt = 1;
for (unsigned i = 0; i < cs.size_refs(); i++) {
auto ref = import_cell(cs.prefetch_ref(i), depth + 1);
if (ref.is_error()) {
return ref.move_as_error();
for (unsigned j = 0; j < cs.size_refs(); j++) {
auto child = cs.prefetch_ref(j);
const auto child_hash = child->get_hash();
child_hashes.push_back(child_hash);

next_batch.push_back(child);
}
refs[i] = ref.move_as_ok();
sum_child_wt += cell_list_[refs[i]].wt;
++int_refs;
}
DCHECK(cell_list_.size() == static_cast<std::size_t>(cell_count));
child_hashes_map[hash] = std::move(child_hashes);

auto dc = cs.move_as_loaded_cell().data_cell;
auto res = cells.emplace(dc->get_hash(), cell_count);
DCHECK(res.second);
cell_list_.emplace_back(dc, dc->size_refs(), refs);
cells.emplace(hash, cell_count);
cell_list_.emplace_back(dc, dc->size_refs(), std::array<int, 4>{-1, -1, -1, -1});
CellInfo& dc_info = cell_list_.back();
dc_info.hcnt = static_cast<unsigned char>(dc->get_level_mask().get_hashes_count());
dc_info.wt = static_cast<unsigned char>(std::min(0xffU, sum_child_wt));
dc_info.wt = 0; // will be calculated after traversing
dc_info.new_idx = -1;
data_bytes += dc->get_serialized_size();
return cell_count++;
cell_count++;
}
if (logger_ptr_) {
TRY_STATUS(logger_ptr_->on_cells_processed(current_batch.size()));
}

current_batch = std::move(next_batch);
next_batch.clear();
current_depth++;
}

for (int idx = cell_count - 1; idx >= start_ind; --idx) {
CellInfo& cell_info = cell_list_[idx];
const auto& child_hashes = child_hashes_map[cell_info.dc_ref->get_hash()];
unsigned sum_child_wt = 1;

for (size_t j = 0; j < child_hashes.size(); ++j) {
const auto child_it = cells.find(child_hashes[j]);
DCHECK(child_it != cells.end());
cell_info.ref_idx[j] = child_it->second;
sum_child_wt += cell_list_[child_it->second].wt;
++int_refs;
}

cell_info.wt = static_cast<unsigned char>(std::min(0xffU, sum_child_wt));
}

auto root_it = cells.find(root_cell->get_hash());
DCHECK(root_it != cells.end());
return root_it->second;
}

// Changes in this function may require corresponding changes in crypto/vm/large-boc-serializer.cpp
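Because the BFS import no longer recurses into children before finishing a parent, cell weights cannot be accumulated on the way back up. Instead, the hunk above records each cell's child hashes during traversal and fills in wt in a second pass that walks cell_list_ from the highest index down: a child is either a previously imported cell whose weight is already final, or a BFS descendant with a larger index, so a reverse sweep always sees finished children. A self-contained sketch of that reverse pass with the same 0xff cap (types here are illustrative, not the real CellInfo):

// Sketch of the two-pass weight computation: BFS assigns indices so that
// children always come after their parents, then weights are summed in
// reverse index order.
#include <algorithm>
#include <array>
#include <cstdio>
#include <vector>

struct Info {
  std::array<int, 4> ref_idx{-1, -1, -1, -1};  // child indices, -1 = unused
  unsigned char wt = 0;
};

int main() {
  // 0 is the root; 1 and 2 are its children; 3 is a child of 1.
  std::vector<Info> cells(4);
  cells[0].ref_idx = {1, 2, -1, -1};
  cells[1].ref_idx = {3, -1, -1, -1};
  for (int idx = static_cast<int>(cells.size()) - 1; idx >= 0; --idx) {
    unsigned sum_child_wt = 1;  // the cell itself counts as 1
    for (int child : cells[idx].ref_idx) {
      if (child != -1) {
        sum_child_wt += cells[child].wt;  // child weight already finalized
      }
    }
    cells[idx].wt = static_cast<unsigned char>(std::min(0xffU, sum_child_wt));
  }
  std::printf("root weight: %d\n", static_cast<int>(cells[0].wt));  // prints 4
}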
@@ -560,7 +608,7 @@ td::Result<std::size_t> BagOfCells::serialize_to_impl(WriterT& writer, int mode)
}
store_offset(fixed_offset);
if (logger_ptr_) {
TRY_STATUS(logger_ptr_->on_cell_processed());
TRY_STATUS(logger_ptr_->on_cells_processed(1));
}
}
if (logger_ptr_) {

@@ -593,7 +641,7 @@ td::Result<std::size_t> BagOfCells::serialize_to_impl(WriterT& writer, int mode)
}
// std::cerr << std::endl;
if (logger_ptr_) {
TRY_STATUS(logger_ptr_->on_cell_processed());
TRY_STATUS(logger_ptr_->on_cells_processed(1));
}
}
writer.chk();

@@ -1015,6 +1063,20 @@ td::Result<td::BufferSlice> std_boc_serialize(Ref<Cell> root, int mode) {
return boc.serialize_to_slice(mode);
}

td::Result<td::BufferSlice> std_boc_serialize_with_reader(std::shared_ptr<CellDbReader> reader, Ref<Cell> root, int mode) {
if (root.is_null()) {
return td::Status::Error("cannot serialize a null cell reference into a bag of cells");
}
BagOfCells boc;
boc.set_reader(std::move(reader));
boc.add_root(std::move(root));
auto res = boc.import_cells();
if (res.is_error()) {
return res.move_as_error();
}
return boc.serialize_to_slice(mode);
}

td::Result<td::BufferSlice> std_boc_serialize_multi(std::vector<Ref<Cell>> roots, int mode) {
if (roots.empty()) {
return td::BufferSlice{};
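A hypothetical caller of the new entry point. The signature is the one declared in this diff; the enclosing vm namespace and the ton headers are assumed from the surrounding codebase, so this is a call-shape sketch rather than a standalone program:

// Sketch: serialize a state root directly out of a cell database.
// Cells reachable from `root` are pulled through `reader` level by level,
// one load_bulk/MultiGet round trip per BFS level, instead of one
// load_cell call per cell.
td::Result<td::BufferSlice> serialize_from_db(std::shared_ptr<vm::CellDbReader> reader,
                                              td::Ref<vm::Cell> root, int mode) {
  return vm::std_boc_serialize_with_reader(std::move(reader), std::move(root), mode);
}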
@@ -210,6 +210,7 @@ class BagOfCellsLogger {

void start_stage(std::string stage) {
log_speed_at_ = td::Timestamp::in(LOG_SPEED_PERIOD);
last_speed_log_ = td::Timestamp::now();
processed_cells_ = 0;
timer_ = {};
stage_ = std::move(stage);

@@ -217,15 +218,19 @@ class BagOfCellsLogger {
void finish_stage(td::Slice desc) {
LOG(ERROR) << "serializer: " << stage_ << " took " << timer_.elapsed() << "s, " << desc;
}
td::Status on_cell_processed() {
++processed_cells_;
if (processed_cells_ % 1000 == 0) {
td::Status on_cells_processed(size_t count) {
processed_cells_ += count;
if (processed_cells_ / 1000 > last_token_check_) {
TRY_STATUS(cancellation_token_.check());
last_token_check_ = processed_cells_ / 1000;
}
if (log_speed_at_.is_in_past()) {
log_speed_at_ += LOG_SPEED_PERIOD;
LOG(WARNING) << "serializer: " << stage_ << " " << (double)processed_cells_ / LOG_SPEED_PERIOD << " cells/s";
double period = td::Timestamp::now().at() - last_speed_log_.at();

LOG(WARNING) << "serializer: " << stage_ << " " << (double)processed_cells_ / period << " cells/s";
processed_cells_ = 0;
last_speed_log_ = td::Timestamp::now();
log_speed_at_ = td::Timestamp::in(LOG_SPEED_PERIOD);
}
return td::Status::OK();
}

@@ -236,6 +241,8 @@ class BagOfCellsLogger {
td::CancellationToken cancellation_token_;
td::Timestamp log_speed_at_;
size_t processed_cells_ = 0;
size_t last_token_check_ = 0;
td::Timestamp last_speed_log_;
static constexpr double LOG_SPEED_PERIOD = 120.0;
};
class BagOfCells {

@@ -323,6 +330,7 @@ class BagOfCells {
const unsigned char* data_ptr{nullptr};
std::vector<unsigned long long> custom_index;
BagOfCellsLogger* logger_ptr_{nullptr};
std::shared_ptr<CellDbReader> reader_{nullptr};

public:
void clear();

@@ -335,6 +343,9 @@ class BagOfCells {
void set_logger(BagOfCellsLogger* logger_ptr) {
logger_ptr_ = logger_ptr;
}
void set_reader(std::shared_ptr<CellDbReader> reader) {
reader_ = std::move(reader);
}
std::size_t estimate_serialized_size(int mode = 0);
td::Status serialize(int mode = 0);
td::string serialize_to_string(int mode = 0);

@@ -362,6 +373,7 @@ class BagOfCells {

private:
int rv_idx;
td::Result<std::vector<vm::Cell::LoadedCell>> load_cells(const std::vector<td::Ref<vm::Cell>>& batch);
td::Result<int> import_cell(td::Ref<vm::Cell> cell, int depth);
void cells_clear() {
cell_count = 0;

@@ -383,6 +395,7 @@ class BagOfCells {

td::Result<Ref<Cell>> std_boc_deserialize(td::Slice data, bool can_be_empty = false, bool allow_nonzero_level = false);
td::Result<td::BufferSlice> std_boc_serialize(Ref<Cell> root, int mode = 0);
td::Result<td::BufferSlice> std_boc_serialize_with_reader(std::shared_ptr<CellDbReader> reader, Ref<Cell> root, int mode = 0);

td::Result<std::vector<Ref<Cell>>> std_boc_deserialize_multi(td::Slice data,
int max_roots = BagOfCells::default_max_roots);
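The logger now advances by whole batches, so the old `processed_cells_ % 1000 == 0` trigger could step straight over a boundary and never fire. Comparing the quotient `processed_cells_ / 1000` against the value at the last check fires the cancellation check whenever one or more 1000-cell boundaries were crossed, regardless of batch size. A self-contained sketch of that counting trick (the atomic flag stands in for td::CancellationToken):

// Sketch of the batched progress accounting used by BagOfCellsLogger:
// check a cancellation flag about once per 1000 processed items even when
// the counter advances in large, uneven batch-sized steps.
#include <atomic>
#include <cstdio>

std::atomic<bool> cancelled{false};

struct Progress {
  size_t processed = 0;
  size_t last_check = 0;  // processed / 1000 at the last flag check

  bool on_items_processed(size_t count) {
    processed += count;
    if (processed / 1000 > last_check) {   // crossed a 1000-item boundary
      last_check = processed / 1000;
      if (cancelled.load()) return false;  // stands in for token.check()
    }
    return true;
  }
};

int main() {
  Progress p;
  // Batches of arbitrary size still trigger roughly one check per 1000 items.
  for (size_t batch : {300u, 900u, 2500u, 10u}) {
    if (!p.on_items_processed(batch)) return 1;
  }
  std::printf("processed %zu items\n", p.processed);
}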
@@ -166,6 +166,28 @@ td::Result<CellLoader::LoadResult> CellLoader::load(td::Slice hash, bool need_data,
return res;
}

td::Result<std::vector<CellLoader::LoadResult>> CellLoader::load_bulk(td::Span<td::Slice> hashes, bool need_data,
ExtCellCreator &ext_cell_creator) {
std::vector<std::string> values;
TRY_RESULT(get_statuses, reader_->get_multi(hashes, &values));
std::vector<LoadResult> res;
res.reserve(hashes.size());
for (size_t i = 0; i < hashes.size(); i++) {
auto get_status = get_statuses[i];
if (get_status != KeyValue::GetStatus::Ok) {
DCHECK(get_status == KeyValue::GetStatus::NotFound);
res.push_back(LoadResult{});
continue;
}
TRY_RESULT(load_res, load(hashes[i], values[i], need_data, ext_cell_creator));
if (on_load_callback_) {
on_load_callback_(load_res);
}
res.push_back(std::move(load_res));
}
return res;
}

td::Result<CellLoader::LoadResult> CellLoader::load(td::Slice hash, td::Slice value, bool need_data,
ExtCellCreator &ext_cell_creator) {
LoadResult res;

@@ -49,6 +49,7 @@ class CellLoader {
};
CellLoader(std::shared_ptr<KeyValueReader> reader, std::function<void(const LoadResult &)> on_load_callback = {});
td::Result<LoadResult> load(td::Slice hash, bool need_data, ExtCellCreator &ext_cell_creator);
td::Result<std::vector<LoadResult>> load_bulk(td::Span<td::Slice> hashes, bool need_data, ExtCellCreator &ext_cell_creator);
static td::Result<LoadResult> load(td::Slice hash, td::Slice value, bool need_data, ExtCellCreator &ext_cell_creator);
td::Result<LoadResult> load_refcnt(td::Slice hash); // This only loads refcnt_, cell_ == null
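Note the contract: CellLoader::load_bulk does not treat a missing hash as an error. A NotFound status leaves a default-constructed LoadResult in that slot, and the caller decides what a miss means (DynamicBagOfCellsDbImpl::load_bulk below turns it into "cell not found"). A self-contained sketch of the underlying parallel-array get_multi contract, with a plain std::map standing in for the key-value store:

// Sketch of the parallel-array contract behind get_multi / load_bulk:
// statuses[i] describes values[i]; misses are empty slots, not errors.
#include <cstdio>
#include <map>
#include <string>
#include <vector>

enum class GetStatus { Ok, NotFound };

std::vector<GetStatus> get_multi(const std::map<std::string, std::string>& kv,
                                 const std::vector<std::string>& keys,
                                 std::vector<std::string>* values) {
  values->resize(keys.size());
  std::vector<GetStatus> res(keys.size(), GetStatus::NotFound);
  for (size_t i = 0; i < keys.size(); i++) {
    if (auto it = kv.find(keys[i]); it != kv.end()) {
      res[i] = GetStatus::Ok;
      (*values)[i] = it->second;
    }
  }
  return res;
}

int main() {
  std::map<std::string, std::string> kv{{"a", "1"}, {"c", "3"}};
  std::vector<std::string> values;
  auto statuses = get_multi(kv, {"a", "b", "c"}, &values);
  for (size_t i = 0; i < statuses.size(); i++) {
    std::printf("%zu: %s\n", i,
                statuses[i] == GetStatus::Ok ? values[i].c_str() : "<missing>");
  }
}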
@@ -100,22 +100,15 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreator {
return get_cell_info_lazy(level_mask, hash, depth).cell;
}
td::Result<Ref<DataCell>> load_cell(td::Slice hash) override {
auto info = hash_table_.get_if_exists(hash);
if (info && info->sync_with_db) {
TRY_RESULT(loaded_cell, info->cell->load_cell());
TRY_RESULT(loaded_cell, get_cell_info_force(hash).cell->load_cell());
return std::move(loaded_cell.data_cell);
}
TRY_RESULT(res, loader_->load(hash, true, *this));
if (res.status != CellLoader::LoadResult::Ok) {
return td::Status::Error("cell not found");
}
Ref<DataCell> cell = res.cell();
hash_table_.apply(hash, [&](CellInfo &info) { update_cell_info_loaded(info, hash, std::move(res)); });
return cell;
}
td::Result<Ref<DataCell>> load_root(td::Slice hash) override {
return load_cell(hash);
}
td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) override {
return td::Status::Error("Not implemented");
}
td::Result<Ref<DataCell>> load_root_thread_safe(td::Slice hash) const override {
return td::Status::Error("Not implemented");
}

@@ -155,6 +148,9 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreator {
promise->set_result(std::move(cell));
});
}
CellInfo &get_cell_info_force(td::Slice hash) {
return hash_table_.apply(hash, [&](CellInfo &info) { update_cell_info_force(info, hash); });
}
CellInfo &get_cell_info_lazy(Cell::LevelMask level_mask, td::Slice hash, td::Slice depth) {
return hash_table_.apply(hash.substr(hash.size() - Cell::hash_bytes),
[&](CellInfo &info) { update_cell_info_lazy(info, level_mask, hash, depth); });

@@ -334,6 +330,23 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreator {
return std::move(load_result.cell());
}

td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) override {
if (db_) {
return db_->load_bulk(hashes);
}
TRY_RESULT(load_result, cell_loader_->load_bulk(hashes, true, *this));

std::vector<Ref<DataCell>> res;
res.reserve(load_result.size());
for (auto &load_res : load_result) {
if (load_res.status != CellLoader::LoadResult::Ok) {
return td::Status::Error("cell not found");
}
res.push_back(std::move(load_res.cell()));
}
return res;
}

private:
static td::NamedThreadSafeCounter::CounterRef get_thread_safe_counter() {
static auto res = td::NamedThreadSafeCounter::get_default().get_counter("DynamicBagOfCellsDbLoader");

@@ -44,12 +44,14 @@ class CellDbReader {
public:
virtual ~CellDbReader() = default;
virtual td::Result<Ref<DataCell>> load_cell(td::Slice hash) = 0;
virtual td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) = 0;
};

class DynamicBagOfCellsDb {
public:
virtual ~DynamicBagOfCellsDb() = default;
virtual td::Result<Ref<DataCell>> load_cell(td::Slice hash) = 0;
virtual td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) = 0;
virtual td::Result<Ref<DataCell>> load_root(td::Slice hash) = 0;
virtual td::Result<Ref<DataCell>> load_root_thread_safe(td::Slice hash) const = 0;
struct Stats {
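Adding load_bulk as a pure virtual on CellDbReader forces every implementation to provide it, but correctness does not require real batching: CellStorage below simply loops over the existing single-cell load, and only backends like RocksDB get a genuinely batched path. A generic sketch of that satisfy-by-loop pattern (illustrative types, not ton APIs):

// Sketch: satisfying a new bulk pure-virtual by falling back to the
// existing single-item call, as CellStorage::load_bulk does below.
#include <cstdio>
#include <string>
#include <vector>

struct Reader {
  virtual ~Reader() = default;
  virtual std::string load(const std::string& key) = 0;
  virtual std::vector<std::string> load_bulk(const std::vector<std::string>& keys) = 0;
};

struct SimpleReader : Reader {
  std::string load(const std::string& key) override {
    return "value-of-" + key;
  }
  std::vector<std::string> load_bulk(const std::vector<std::string>& keys) override {
    std::vector<std::string> res;
    res.reserve(keys.size());
    for (const auto& key : keys) {
      res.push_back(load(key));  // correct, just not batched
    }
    return res;
  }
};

int main() {
  SimpleReader r;
  for (const auto& v : r.load_bulk({"a", "b"})) {
    std::printf("%s\n", v.c_str());
  }
}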
@@ -461,6 +461,16 @@ class CellStorage {
return td::Status::Error("not found");
}

td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<CellHash> hashes) const {
std::vector<Ref<DataCell>> res;
res.reserve(hashes.size());
for (auto &hash : hashes) {
TRY_RESULT(cell, load_cell(hash));
res.push_back(std::move(cell));
}
return res;
}

td::Result<Ref<DataCell>> load_root_local(const CellHash &hash) const {
auto lock = local_access_.lock();
if (auto it = local_roots_.find(hash); it != local_roots_.end()) {

@@ -769,6 +779,9 @@ class InMemoryBagOfCellsDb : public DynamicBagOfCellsDb {
td::Result<Ref<DataCell>> load_root(td::Slice hash) override {
return storage_->load_root_local(CellHash::from_slice(hash));
}
td::Result<std::vector<Ref<DataCell>>> load_bulk(td::Span<td::Slice> hashes) override {
return storage_->load_bulk(td::transform(hashes, [](auto &hash) { return CellHash::from_slice(hash); }));
}
td::Result<Ref<DataCell>> load_root_thread_safe(td::Slice hash) const override {
return storage_->load_root_shared(CellHash::from_slice(hash));
}
@@ -32,6 +32,7 @@ namespace {
class LargeBocSerializer {
public:
using Hash = Cell::Hash;
constexpr static int load_batch_size = 4'000'000;

explicit LargeBocSerializer(std::shared_ptr<CellDbReader> reader) : reader(std::move(reader)) {
}

@@ -46,7 +47,6 @@ class LargeBocSerializer {
private:
std::shared_ptr<CellDbReader> reader;
struct CellInfo {
Cell::Hash hash;
std::array<int, 4> ref_idx;
int idx;
unsigned short serialized_size;

@@ -111,46 +111,124 @@ td::Status LargeBocSerializer::import_cells() {
return td::Status::OK();
}

td::Result<int> LargeBocSerializer::import_cell(Hash hash, int depth) {
if (depth > Cell::max_depth) {
td::Result<int> LargeBocSerializer::import_cell(Hash root_hash, int root_depth) {
const int start_ind = cell_count;
td::HashMap<Hash, std::pair<int, bool>> current_depth_hashes;

auto existing_it = cells.find(root_hash);
if (existing_it != cells.end()) {
existing_it->second.should_cache = true;
} else {
current_depth_hashes.emplace(root_hash, std::make_pair(cell_count, false));
}
int current_depth = root_depth;
int next_child_idx = cell_count + 1;
while (!current_depth_hashes.empty()) {
if (current_depth > Cell::max_depth) {
return td::Status::Error("error while importing a cell into a bag of cells: cell depth too large");
}
if (logger_ptr_) {
TRY_STATUS(logger_ptr_->on_cell_processed());

cell_list.resize(cell_list.size() + current_depth_hashes.size());
td::HashMap<Hash, std::pair<int, bool>> next_depth_hashes;
auto batch_start = current_depth_hashes.begin();
while (batch_start != current_depth_hashes.end()) {
std::vector<td::Slice> batch_hashes;
batch_hashes.reserve(load_batch_size);
std::vector<std::pair<int, bool>*> batch_idxs_should_cache;
batch_idxs_should_cache.reserve(load_batch_size);

while (batch_hashes.size() < load_batch_size && batch_start != current_depth_hashes.end()) {
batch_hashes.push_back(batch_start->first.as_slice());
batch_idxs_should_cache.push_back(&batch_start->second);
++batch_start;
}
auto it = cells.find(hash);
if (it != cells.end()) {
it->second.should_cache = true;
return it->second.idx;
}
TRY_RESULT(cell, reader->load_cell(hash.as_slice()));

TRY_RESULT_PREFIX(loaded_results, reader->load_bulk(batch_hashes),
"error while importing a cell into a bag of cells: ");
DCHECK(loaded_results.size() == batch_hashes.size());

for (size_t i = 0; i < loaded_results.size(); ++i) {
auto& cell = loaded_results[i];

if (cell->get_virtualization() != 0) {
return td::Status::Error(
"error while importing a cell into a bag of cells: cell has non-zero virtualization level");
}

const auto hash = cell->get_hash();
CellSlice cs(std::move(cell));
std::array<int, 4> refs;
std::fill(refs.begin(), refs.end(), -1);

DCHECK(cs.size_refs() <= 4);
unsigned sum_child_wt = 1;
for (unsigned i = 0; i < cs.size_refs(); i++) {
TRY_RESULT(ref, import_cell(cs.prefetch_ref(i)->get_hash(), depth + 1));
refs[i] = ref;
sum_child_wt += cell_list[ref]->second.wt;
++int_refs;
std::array<int, 4> refs{-1, -1, -1, -1};
for (unsigned j = 0; j < cs.size_refs(); j++) {
auto child = cs.prefetch_ref(j);
const auto child_hash = child->get_hash();

auto existing_global_it = cells.find(child_hash);
if (existing_global_it != cells.end()) {
existing_global_it->second.should_cache = true;
refs[j] = existing_global_it->second.idx;
continue;
}
auto current_depth_it = current_depth_hashes.find(child_hash);
if (current_depth_it != current_depth_hashes.end()) {
current_depth_it->second.second = true;
refs[j] = current_depth_it->second.first;
continue;
}
auto next_depth_it = next_depth_hashes.find(child_hash);
if (next_depth_it != next_depth_hashes.end()) {
next_depth_it->second.second = true;
refs[j] = next_depth_it->second.first;
continue;
}
auto res = next_depth_hashes.emplace(child_hash, std::make_pair(next_child_idx, false));
refs[j] = next_child_idx++;
}

auto dc = cs.move_as_loaded_cell().data_cell;
auto res = cells.emplace(hash, CellInfo(cell_count, refs));
auto idx_should_cache = batch_idxs_should_cache[i];
auto res = cells.emplace(hash, CellInfo(idx_should_cache->first, std::move(refs)));
DCHECK(res.second);
cell_list.push_back(&*res.first);
cell_list[idx_should_cache->first] = &*res.first;
CellInfo& dc_info = res.first->second;
dc_info.wt = (unsigned char)std::min(0xffU, sum_child_wt);
unsigned hcnt = dc->get_level_mask().get_hashes_count();
DCHECK(hcnt <= 4);
dc_info.hcnt = (unsigned char)hcnt;
dc_info.should_cache = idx_should_cache->second;
dc_info.hcnt = static_cast<unsigned char>(dc->get_level_mask().get_hashes_count());
DCHECK(dc_info.hcnt <= 4);
dc_info.wt = 0; // will be calculated after traversing
TRY_RESULT(serialized_size, td::narrow_cast_safe<unsigned short>(dc->get_serialized_size()));
data_bytes += dc_info.serialized_size = serialized_size;
return cell_count++;
cell_count++;
}
if (logger_ptr_) {
TRY_STATUS(logger_ptr_->on_cells_processed(batch_hashes.size()));
}
}

current_depth_hashes = std::move(next_depth_hashes);
next_depth_hashes.clear();
current_depth++;
}
DCHECK(next_child_idx == cell_count);

for (int idx = cell_count - 1; idx >= start_ind; --idx) {
CellInfo& cell_info = cell_list[idx]->second;

unsigned sum_child_wt = 1;
for (size_t j = 0; j < cell_info.ref_idx.size(); ++j) {
int child_idx = cell_info.ref_idx[j];
if (child_idx == -1) {
continue;
}
sum_child_wt += cell_list[child_idx]->second.wt;
++int_refs;
}
cell_info.wt = static_cast<unsigned char>(std::min(0xffU, sum_child_wt));
}

auto root_it = cells.find(root_hash);
DCHECK(root_it != cells.end());
return root_it->second.idx;
}

void LargeBocSerializer::reorder_cells() {

@@ -386,7 +464,7 @@ td::Status LargeBocSerializer::serialize(td::FileFd& fd, int mode) {
}
store_offset(fixed_offset);
if (logger_ptr_) {
TRY_STATUS(logger_ptr_->on_cell_processed());
TRY_STATUS(logger_ptr_->on_cells_processed(1));
}
}
DCHECK(offs == info.data_size);

@@ -399,10 +477,25 @@ td::Status LargeBocSerializer::serialize(td::FileFd& fd, int mode) {
if (logger_ptr_) {
logger_ptr_->start_stage("serialize");
}
for (int i = 0; i < cell_count; ++i) {
auto hash = cell_list[cell_count - 1 - i]->first;
const auto& dc_info = cell_list[cell_count - 1 - i]->second;
TRY_RESULT(dc, reader->load_cell(hash.as_slice()));
for (int batch_start = 0; batch_start < cell_count; batch_start += load_batch_size) {
int batch_end = std::min(batch_start + static_cast<int>(load_batch_size), cell_count);

std::vector<td::Slice> batch_hashes;
batch_hashes.reserve(batch_end - batch_start);
for (int i = batch_start; i < batch_end; ++i) {
int cell_index = cell_count - 1 - i;
batch_hashes.push_back(cell_list[cell_index]->first.as_slice());
}

TRY_RESULT(batch_cells, reader->load_bulk(std::move(batch_hashes)));

for (int i = batch_start; i < batch_end; ++i) {
int idx_in_batch = i - batch_start;
int cell_index = cell_count - 1 - i;

const auto& dc_info = cell_list[cell_index]->second;
auto& dc = batch_cells[idx_in_batch];

bool with_hash = (mode & Mode::WithIntHashes) && !dc_info.wt;
if (dc_info.is_root_cell && (mode & Mode::WithTopHash)) {
with_hash = true;

@@ -417,8 +510,9 @@ td::Status LargeBocSerializer::serialize(td::FileFd& fd, int mode) {
DCHECK(k > i && k < cell_count);
store_ref(k);
}
}
if (logger_ptr_) {
TRY_STATUS(logger_ptr_->on_cell_processed());
TRY_STATUS(logger_ptr_->on_cells_processed(batch_hashes.size()));
}
}
DCHECK(writer.position() - keep_position == info.data_size);
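The subtle part of the LargeBocSerializer rewrite is index assignment. When a child hash turns up, it may already be known in three places: the global cells map (imported on an earlier level), current_depth_hashes (a sibling on this level), or next_depth_hashes (already queued); only a genuinely new hash claims the next free index. A self-contained sketch of that three-map cascade, with std::string standing in for Cell::Hash:

// Sketch of the three-map index assignment in
// LargeBocSerializer::import_cell: look a child hash up in (1) globally
// imported cells, (2) the current BFS level, (3) the next level; only a
// genuinely new hash gets a fresh index.
#include <cstdio>
#include <string>
#include <unordered_map>

int assign_index(const std::string& hash,
                 const std::unordered_map<std::string, int>& imported,
                 std::unordered_map<std::string, int>& current_level,
                 std::unordered_map<std::string, int>& next_level,
                 int& next_free_idx) {
  if (auto it = imported.find(hash); it != imported.end()) return it->second;
  if (auto it = current_level.find(hash); it != current_level.end()) return it->second;
  if (auto it = next_level.find(hash); it != next_level.end()) return it->second;
  next_level.emplace(hash, next_free_idx);  // first sighting: queue for next level
  return next_free_idx++;
}

int main() {
  std::unordered_map<std::string, int> imported{{"old", 0}};
  std::unordered_map<std::string, int> current{{"sibling", 1}};
  std::unordered_map<std::string, int> next;
  int next_free = 2;
  int a = assign_index("old", imported, current, next, next_free);      // 0
  int b = assign_index("sibling", imported, current, next, next_free);  // 1
  int c = assign_index("fresh", imported, current, next, next_free);    // 2
  int d = assign_index("fresh", imported, current, next, next_free);    // 2, deduplicated
  std::printf("%d %d %d %d\n", a, b, c, d);
}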
@@ -20,14 +20,19 @@
#include "td/utils/Status.h"
#include "td/utils/Time.h"
#include "td/utils/logging.h"
#include "td/utils/Span.h"
#include <functional>
namespace td {

enum class DbOpenMode { db_primary, db_secondary, db_readonly };

class KeyValueReader {
public:
virtual ~KeyValueReader() = default;
enum class GetStatus : int32 { Ok, NotFound };

virtual Result<GetStatus> get(Slice key, std::string &value) = 0;
virtual Result<std::vector<GetStatus>> get_multi(td::Span<Slice> keys, std::vector<std::string> *values) = 0;
virtual Result<size_t> count(Slice prefix) = 0;
virtual Status for_each(std::function<Status(Slice, Slice)> f) {
return Status::Error("for_each is not supported");

@@ -45,6 +50,14 @@ class PrefixedKeyValueReader : public KeyValueReader {
Result<GetStatus> get(Slice key, std::string &value) override {
return reader_->get(PSLICE() << prefix_ << key, value);
}
Result<std::vector<GetStatus>> get_multi(td::Span<Slice> keys, std::vector<std::string> *values) override {
std::vector<Slice> prefixed_keys;
prefixed_keys.reserve(keys.size());
for (auto &key : keys) {
prefixed_keys.push_back(PSLICE() << prefix_ << key);
}
return reader_->get_multi(prefixed_keys, values);
}
Result<size_t> count(Slice prefix) override {
return reader_->count(PSLICE() << prefix_ << prefix);
}

@@ -88,6 +101,14 @@ class PrefixedKeyValue : public KeyValue {
Result<GetStatus> get(Slice key, std::string &value) override {
return kv_->get(PSLICE() << prefix_ << key, value);
}
Result<std::vector<GetStatus>> get_multi(td::Span<Slice> keys, std::vector<std::string> *values) override {
std::vector<Slice> prefixed_keys;
prefixed_keys.reserve(keys.size());
for (auto &key : keys) {
prefixed_keys.push_back(PSLICE() << prefix_ << key);
}
return kv_->get_multi(prefixed_keys, values);
}
Result<size_t> count(Slice prefix) override {
return kv_->count(PSLICE() << prefix_ << prefix);
}
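PrefixedKeyValueReader::get_multi has to materialize `prefix_ + key` for every key before delegating to the inner reader's bulk call. A standalone sketch of that fan-out, using owned std::string storage so the views handed to the inner call cannot dangle; std::string_view stands in for td::Slice and the helper name is hypothetical:

// Sketch of prefix fan-out for a bulk get: build owned prefixed keys
// first, then hand out views of them, so no view outlives its string.
#include <cstdio>
#include <string>
#include <string_view>
#include <vector>

std::vector<std::string_view> prefix_keys(const std::string& prefix,
                                          const std::vector<std::string_view>& keys,
                                          std::vector<std::string>& storage) {
  storage.clear();
  storage.reserve(keys.size());  // no reallocation, so views stay valid
  for (auto key : keys) {
    storage.push_back(prefix + std::string(key));  // owned concatenation
  }
  std::vector<std::string_view> views;
  views.reserve(storage.size());
  for (const auto& s : storage) {
    views.push_back(s);  // views into `storage`, valid while it lives
  }
  return views;
}

int main() {
  std::vector<std::string> storage;
  for (auto v : prefix_keys("cells/", {"a", "b"}, storage)) {
    std::printf("%.*s\n", static_cast<int>(v.size()), v.data());
  }
}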
@@ -19,6 +19,7 @@
#include "td/db/MemoryKeyValue.h"

#include "td/utils/format.h"
#include "td/utils/Span.h"

namespace td {
Result<MemoryKeyValue::GetStatus> MemoryKeyValue::get(Slice key, std::string &value) {

@@ -30,6 +31,23 @@ Result<MemoryKeyValue::GetStatus> MemoryKeyValue::get(Slice key, std::string &value) {
return GetStatus::Ok;
}

Result<std::vector<MemoryKeyValue::GetStatus>> MemoryKeyValue::get_multi(td::Span<Slice> keys,
std::vector<std::string> *values) {
values->resize(keys.size());
std::vector<GetStatus> res;
for (size_t i = 0; i < keys.size(); i++) {
auto it = map_.find(keys[i]);
if (it == map_.end()) {
res.push_back(GetStatus::NotFound);
(*values)[i] = "";
} else {
res.push_back(GetStatus::Ok);
(*values)[i] = it->second;
}
}
return res;
}

Status MemoryKeyValue::for_each(std::function<Status(Slice, Slice)> f) {
for (auto &it : map_) {
TRY_STATUS(f(it.first, it.second));

@@ -25,6 +25,7 @@ namespace td {
class MemoryKeyValue : public KeyValue {
public:
Result<GetStatus> get(Slice key, std::string &value) override;
Result<std::vector<GetStatus>> get_multi(td::Span<Slice> keys, std::vector<std::string> *values) override;
Status for_each(std::function<Status(Slice, Slice)> f) override;
Status for_each_in_range(Slice begin, Slice end, std::function<Status(Slice, Slice)> f) override;
Status set(Slice key, Slice value) override;
@@ -153,6 +153,40 @@ Result<RocksDb::GetStatus> RocksDb::get(Slice key, std::string &value) {
return from_rocksdb(status);
}

Result<std::vector<RocksDb::GetStatus>> RocksDb::get_multi(td::Span<Slice> keys, std::vector<std::string> *values) {
std::vector<rocksdb::Status> statuses(keys.size());
std::vector<rocksdb::Slice> keys_rocksdb;
keys_rocksdb.reserve(keys.size());
for (auto &key : keys) {
keys_rocksdb.push_back(to_rocksdb(key));
}
std::vector<rocksdb::PinnableSlice> values_rocksdb(keys.size());
rocksdb::ReadOptions options;
if (snapshot_) {
options.snapshot = snapshot_.get();
db_->MultiGet(options, db_->DefaultColumnFamily(), keys_rocksdb.size(), keys_rocksdb.data(), values_rocksdb.data(), statuses.data());
} else if (transaction_) {
transaction_->MultiGet(options, db_->DefaultColumnFamily(), keys_rocksdb.size(), keys_rocksdb.data(), values_rocksdb.data(), statuses.data());
} else {
db_->MultiGet(options, db_->DefaultColumnFamily(), keys_rocksdb.size(), keys_rocksdb.data(), values_rocksdb.data(), statuses.data());
}
std::vector<GetStatus> res(statuses.size());
values->resize(statuses.size());
for (size_t i = 0; i < statuses.size(); i++) {
auto &status = statuses[i];
if (status.ok()) {
res[i] = GetStatus::Ok;
values->at(i) = values_rocksdb[i].ToString();
} else if (status.code() == rocksdb::Status::kNotFound) {
res[i] = GetStatus::NotFound;
values->at(i) = "";
} else {
return from_rocksdb(status);
}
}
return res;
}

Status RocksDb::set(Slice key, Slice value) {
if (write_batch_) {
return from_rocksdb(write_batch_->Put(to_rocksdb(key), to_rocksdb(value)));
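For reference, a minimal standalone RocksDB MultiGet program using the simpler std::string-based overload rather than the batched PinnableSlice variant used above; the ./testdb path is an assumption for the example:

// Minimal RocksDB MultiGet usage: one call fetches several keys, and the
// returned statuses are parallel to the returned values.
#include <cstdio>
#include <vector>
#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  if (!rocksdb::DB::Open(options, "./testdb", &db).ok()) return 1;
  db->Put(rocksdb::WriteOptions(), "a", "1");

  std::vector<rocksdb::Slice> keys{"a", "missing"};
  std::vector<std::string> values;
  // One round trip for all keys; statuses[i] describes values[i].
  std::vector<rocksdb::Status> statuses =
      db->MultiGet(rocksdb::ReadOptions(), keys, &values);
  for (size_t i = 0; i < keys.size(); i++) {
    std::printf("%s -> %s\n", keys[i].ToString().c_str(),
                statuses[i].ok() ? values[i].c_str() : "<not found>");
  }
  delete db;
}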
@@ -72,6 +72,7 @@ class RocksDb : public KeyValue {
static Result<RocksDb> open(std::string path, RocksDbOptions options = {});

Result<GetStatus> get(Slice key, std::string &value) override;
Result<std::vector<RocksDb::GetStatus>> get_multi(td::Span<Slice> keys, std::vector<std::string> *values) override;
Status set(Slice key, Slice value) override;
Status erase(Slice key) override;
Result<size_t> count(Slice prefix) override;
@@ -273,8 +273,38 @@ class CachedCellDbReader : public vm::CellDbReader {
}
return parent_->load_cell(hash);
}
td::Result<std::vector<Ref<vm::DataCell>>> load_bulk(td::Span<td::Slice> hashes) override {
total_reqs_ += hashes.size();
if (!cache_) {
++bulk_reqs_;
return parent_->load_bulk(hashes);
}
std::vector<td::Slice> missing_hashes;
std::vector<size_t> missing_indices;
std::vector<td::Ref<vm::DataCell>> res(hashes.size());
for (size_t i = 0; i < hashes.size(); i++) {
auto it = cache_->find(hashes[i]);
if (it != cache_->end()) {
++cached_reqs_;
TRY_RESULT(loaded_cell, (*it)->load_cell());
res[i] = loaded_cell.data_cell;
continue;
}
missing_hashes.push_back(hashes[i]);
missing_indices.push_back(i);
}
if (missing_hashes.empty()) {
return std::move(res);
}
TRY_RESULT(missing_cells, parent_->load_bulk(missing_hashes));
for (size_t i = 0; i < missing_indices.size(); i++) {
res[missing_indices[i]] = missing_cells[i];
}
return res;
};
void print_stats() const {
LOG(WARNING) << "CachedCellDbReader stats : " << total_reqs_ << " reads, " << cached_reqs_ << " cached";
LOG(WARNING) << "CachedCellDbReader stats : " << total_reqs_ << " reads, " << cached_reqs_ << " cached, "
<< bulk_reqs_ << " bulk reqs";
}
private:
std::shared_ptr<vm::CellDbReader> parent_;

@@ -282,6 +312,7 @@ class CachedCellDbReader : public vm::CellDbReader {

td::uint64 total_reqs_ = 0;
td::uint64 cached_reqs_ = 0;
td::uint64 bulk_reqs_ = 0;
};

void AsyncStateSerializer::PreviousStateCache::prepare_cache(ShardIdFull shard) {
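CachedCellDbReader::load_bulk is a gather/scatter around the cache: hits are answered locally, all misses are forwarded to the parent in one load_bulk call, and the fetched cells are written back into their original slots by saved index. A generic self-contained sketch of the same pattern (like the reader above, it does not populate the cache with the fetched values):

// Gather/scatter bulk read through a cache: answer hits locally, batch
// all misses into one backend call, then scatter results back by index.
#include <cstdio>
#include <string>
#include <unordered_map>
#include <vector>

std::vector<std::string> fetch_bulk(const std::vector<std::string>& keys) {
  std::vector<std::string> res;
  res.reserve(keys.size());
  for (const auto& k : keys) res.push_back("db:" + k);  // stands in for the backend
  return res;
}

std::vector<std::string> cached_bulk(const std::unordered_map<std::string, std::string>& cache,
                                     const std::vector<std::string>& keys) {
  std::vector<std::string> res(keys.size());
  std::vector<std::string> missing_keys;
  std::vector<size_t> missing_indices;
  for (size_t i = 0; i < keys.size(); i++) {
    if (auto it = cache.find(keys[i]); it != cache.end()) {
      res[i] = it->second;  // cache hit
    } else {
      missing_keys.push_back(keys[i]);
      missing_indices.push_back(i);
    }
  }
  if (!missing_keys.empty()) {
    auto fetched = fetch_bulk(missing_keys);  // one batched backend read
    for (size_t i = 0; i < missing_indices.size(); i++) {
      res[missing_indices[i]] = fetched[i];  // scatter back by saved index
    }
  }
  return res;
}

int main() {
  std::unordered_map<std::string, std::string> cache{{"a", "cached:a"}};
  for (const auto& v : cached_bulk(cache, {"a", "b"})) {
    std::printf("%s\n", v.c_str());
  }
}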