mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
Use parallel write to celldb (#1264)
* Parallel write in celldb * Add TD_PERF_COUNTER to gc_cell and store_cell * More error handling * Tests for prepare_commit_async * Install g++11 for ubuntu 20.04 --------- Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
parent
fd1735f6ec
commit
d04cdfa0dc
10 changed files with 582 additions and 127 deletions
|
@ -61,10 +61,81 @@
|
|||
#include "openssl/digest.hpp"
|
||||
#include "vm/dict.h"
|
||||
|
||||
#include <condition_variable>
|
||||
#include <latch>
|
||||
#include <numeric>
|
||||
#include <optional>
|
||||
#include <queue>
|
||||
|
||||
namespace vm {
|
||||
class ThreadExecutor : public DynamicBagOfCellsDb::AsyncExecutor {
|
||||
public:
|
||||
explicit ThreadExecutor(size_t threads_n) {
|
||||
for (size_t i = 0; i < threads_n; ++i) {
|
||||
threads_.emplace_back([this]() {
|
||||
while (true) {
|
||||
auto task = pop_task();
|
||||
if (!task) {
|
||||
break;
|
||||
}
|
||||
CHECK(generation_.load() % 2 == 1);
|
||||
task();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
~ThreadExecutor() override {
|
||||
for (size_t i = 0; i < threads_.size(); ++i) {
|
||||
push_task({});
|
||||
}
|
||||
for (auto &t : threads_) {
|
||||
t.join();
|
||||
}
|
||||
}
|
||||
|
||||
void execute_async(std::function<void()> f) override {
|
||||
push_task(std::move(f));
|
||||
}
|
||||
|
||||
void execute_sync(std::function<void()> f) override {
|
||||
auto x = generation_.load();
|
||||
std::scoped_lock lock(sync_mutex_);
|
||||
CHECK(x == generation_);
|
||||
CHECK(generation_.load() % 2 == 1);
|
||||
f();
|
||||
CHECK(generation_.load() % 2 == 1);
|
||||
}
|
||||
void inc_generation() {
|
||||
generation_.fetch_add(1);
|
||||
}
|
||||
|
||||
private:
|
||||
std::atomic<size_t> generation_{0};
|
||||
std::queue<std::pair<std::function<void()>, size_t>> queue_;
|
||||
std::mutex queue_mutex_;
|
||||
std::condition_variable cv_;
|
||||
std::mutex sync_mutex_;
|
||||
std::vector<td::thread> threads_;
|
||||
|
||||
std::function<void()> pop_task() {
|
||||
std::unique_lock lock(queue_mutex_);
|
||||
cv_.wait(lock, [&] { return !queue_.empty(); });
|
||||
CHECK(!queue_.empty());
|
||||
auto task = std::move(queue_.front());
|
||||
queue_.pop();
|
||||
CHECK(task.second == generation_);
|
||||
return task.first;
|
||||
}
|
||||
|
||||
void push_task(std::function<void()> task) {
|
||||
{
|
||||
std::scoped_lock lock(queue_mutex_);
|
||||
queue_.emplace(std::move(task), generation_.load());
|
||||
}
|
||||
cv_.notify_one();
|
||||
}
|
||||
};
|
||||
|
||||
std::vector<int> do_get_serialization_modes() {
|
||||
std::vector<int> res;
|
||||
|
@ -890,25 +961,91 @@ TEST(TonDb, InMemoryDynamicBocSimple) {
|
|||
boc = DynamicBagOfCellsDb::create_in_memory(kv.get(), {});
|
||||
}
|
||||
|
||||
void test_dynamic_boc(std::optional<DynamicBagOfCellsDb::CreateInMemoryOptions> o_in_memory) {
|
||||
int VERBOSITY_NAME(boc) = VERBOSITY_NAME(DEBUG) + 10;
|
||||
|
||||
struct BocOptions {
|
||||
std::shared_ptr<ThreadExecutor> async_executor;
|
||||
std::optional<DynamicBagOfCellsDb::CreateInMemoryOptions> o_in_memory;
|
||||
td::uint64 seed{123};
|
||||
|
||||
auto create_dboc(td::KeyValueReader *kv, std::optional<td::int64> o_root_n) {
|
||||
if (o_in_memory) {
|
||||
auto res = DynamicBagOfCellsDb::create_in_memory(kv, *o_in_memory);
|
||||
auto stats = res->get_stats().move_as_ok();
|
||||
if (o_root_n) {
|
||||
ASSERT_EQ(*o_root_n, stats.roots_total_count);
|
||||
}
|
||||
VLOG(boc) << "reset roots_n=" << stats.roots_total_count << " cells_n=" << stats.cells_total_count;
|
||||
return res;
|
||||
}
|
||||
return DynamicBagOfCellsDb::create();
|
||||
};
|
||||
void prepare_commit(DynamicBagOfCellsDb &dboc) {
|
||||
if (async_executor) {
|
||||
async_executor->inc_generation();
|
||||
std::latch latch(1);
|
||||
td::Result<td::Unit> res;
|
||||
async_executor->execute_sync([&] {
|
||||
dboc.prepare_commit_async(async_executor, [&](auto r) {
|
||||
res = std::move(r);
|
||||
latch.count_down();
|
||||
});
|
||||
});
|
||||
latch.wait();
|
||||
async_executor->execute_sync([&] {});
|
||||
async_executor->inc_generation();
|
||||
} else {
|
||||
dboc.prepare_commit();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <class F>
|
||||
void with_all_boc_options(F &&f, size_t tests_n = 500) {
|
||||
LOG(INFO) << "Test dynamic boc";
|
||||
auto counter = [] { return td::NamedThreadSafeCounter::get_default().get_counter("DataCell").sum(); };
|
||||
auto run = [&](BocOptions options) {
|
||||
LOG(INFO) << "\t" << (options.o_in_memory ? "in memory" : "on disk") << (options.async_executor ? " async" : "");
|
||||
if (options.o_in_memory) {
|
||||
LOG(INFO) << "\t\tuse_arena=" << options.o_in_memory->use_arena
|
||||
<< " less_memory=" << options.o_in_memory->use_less_memory_during_creation;
|
||||
}
|
||||
for (td::uint32 i = 0; i < tests_n; i++) {
|
||||
auto before = counter();
|
||||
options.seed = i == 0 ? 123 : i;
|
||||
f(options);
|
||||
auto after = counter();
|
||||
LOG_CHECK((options.o_in_memory && options.o_in_memory->use_arena) || before == after)
|
||||
<< before << " vs " << after;
|
||||
}
|
||||
};
|
||||
run({.async_executor = std::make_shared<ThreadExecutor>(4)});
|
||||
run({});
|
||||
for (auto use_arena : {false, true}) {
|
||||
for (auto less_memory : {false, true}) {
|
||||
run({.o_in_memory =
|
||||
DynamicBagOfCellsDb::CreateInMemoryOptions{.extra_threads = std::thread::hardware_concurrency(),
|
||||
.verbose = false,
|
||||
.use_arena = use_arena,
|
||||
.use_less_memory_during_creation = less_memory}});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void test_dynamic_boc(BocOptions options) {
|
||||
auto counter = [] { return td::NamedThreadSafeCounter::get_default().get_counter("DataCell").sum(); };
|
||||
auto before = counter();
|
||||
SCOPE_EXIT {
|
||||
LOG_CHECK((o_in_memory && o_in_memory->use_arena) || before == counter()) << before << " vs " << counter();
|
||||
;
|
||||
LOG_CHECK((options.o_in_memory && options.o_in_memory->use_arena) || before == counter())
|
||||
<< before << " vs " << counter();
|
||||
};
|
||||
td::Random::Xorshift128plus rnd{123};
|
||||
td::Random::Xorshift128plus rnd{options.seed};
|
||||
std::string old_root_hash;
|
||||
std::string old_root_serialization;
|
||||
auto kv = std::make_shared<td::MemoryKeyValue>();
|
||||
auto create_dboc = [&]() {
|
||||
if (o_in_memory) {
|
||||
auto res = DynamicBagOfCellsDb::create_in_memory(kv.get(), *o_in_memory);
|
||||
auto roots_n = old_root_hash.empty() ? 0 : 1;
|
||||
ASSERT_EQ(roots_n, res->get_stats().ok().roots_total_count);
|
||||
return res;
|
||||
}
|
||||
return DynamicBagOfCellsDb::create();
|
||||
auto roots_n = old_root_hash.empty() ? 0 : 1;
|
||||
return options.create_dboc(kv.get(), roots_n);
|
||||
};
|
||||
auto dboc = create_dboc();
|
||||
dboc->set_loader(std::make_unique<CellLoader>(kv));
|
||||
|
@ -947,51 +1084,28 @@ void test_dynamic_boc(std::optional<DynamicBagOfCellsDb::CreateInMemoryOptions>
|
|||
ASSERT_EQ(0u, kv->count("").ok());
|
||||
}
|
||||
|
||||
template <class F>
|
||||
void with_all_boc_options(F &&f) {
|
||||
LOG(INFO) << "Test dynamic boc";
|
||||
LOG(INFO) << "\ton disk";
|
||||
f({});
|
||||
for (auto use_arena : {false, true}) {
|
||||
for (auto less_memory : {false, true}) {
|
||||
LOG(INFO) << "\tuse_arena=" << use_arena << " less_memory=" << less_memory;
|
||||
f(DynamicBagOfCellsDb::CreateInMemoryOptions{.extra_threads = std::thread::hardware_concurrency(),
|
||||
.verbose = false,
|
||||
.use_arena = use_arena,
|
||||
.use_less_memory_during_creation = less_memory});
|
||||
}
|
||||
}
|
||||
}
|
||||
TEST(TonDb, DynamicBoc) {
|
||||
with_all_boc_options(test_dynamic_boc);
|
||||
with_all_boc_options(test_dynamic_boc, 1);
|
||||
};
|
||||
|
||||
void test_dynamic_boc2(std::optional<DynamicBagOfCellsDb::CreateInMemoryOptions> o_in_memory) {
|
||||
int VERBOSITY_NAME(boc) = VERBOSITY_NAME(DEBUG) + 10;
|
||||
td::Random::Xorshift128plus rnd{123};
|
||||
int total_roots = 10000;
|
||||
int max_roots = 20;
|
||||
void test_dynamic_boc2(BocOptions options) {
|
||||
td::Random::Xorshift128plus rnd{options.seed};
|
||||
|
||||
int total_roots = rnd.fast(1, !rnd.fast(0, 10) * 100 + 10);
|
||||
int max_roots = rnd.fast(1, 20);
|
||||
int last_commit_at = 0;
|
||||
int first_root_id = 0;
|
||||
int last_root_id = 0;
|
||||
auto kv = std::make_shared<td::MemoryKeyValue>();
|
||||
auto create_dboc = [&](td::int64 root_n) {
|
||||
if (o_in_memory) {
|
||||
auto res = DynamicBagOfCellsDb::create_in_memory(kv.get(), *o_in_memory);
|
||||
auto stats = res->get_stats().move_as_ok();
|
||||
ASSERT_EQ(root_n, stats.roots_total_count);
|
||||
VLOG(boc) << "reset roots_n=" << stats.roots_total_count << " cells_n=" << stats.cells_total_count;
|
||||
return res;
|
||||
}
|
||||
return DynamicBagOfCellsDb::create();
|
||||
};
|
||||
auto create_dboc = [&](td::int64 root_n) { return options.create_dboc(kv.get(), root_n); };
|
||||
auto dboc = create_dboc(0);
|
||||
dboc->set_loader(std::make_unique<CellLoader>(kv));
|
||||
|
||||
auto counter = [] { return td::NamedThreadSafeCounter::get_default().get_counter("DataCell").sum(); };
|
||||
auto before = counter();
|
||||
SCOPE_EXIT {
|
||||
LOG_CHECK((o_in_memory && o_in_memory->use_arena) || before == counter()) << before << " vs " << counter();
|
||||
SCOPE_EXIT{
|
||||
// LOG_CHECK((options.o_in_memory && options.o_in_memory->use_arena) || before == counter())
|
||||
// << before << " vs " << counter();
|
||||
};
|
||||
|
||||
std::vector<Ref<Cell>> roots(max_roots);
|
||||
|
@ -1009,7 +1123,7 @@ void test_dynamic_boc2(std::optional<DynamicBagOfCellsDb::CreateInMemoryOptions>
|
|||
if (from_root.is_null()) {
|
||||
VLOG(boc) << " from db";
|
||||
auto from_root_hash = root_hashes[root_id % max_roots];
|
||||
if (o_in_memory && (rnd() % 2 == 0)) {
|
||||
if (rnd() % 2 == 0) {
|
||||
from_root = dboc->load_root(from_root_hash).move_as_ok();
|
||||
} else {
|
||||
from_root = dboc->load_cell(from_root_hash).move_as_ok();
|
||||
|
@ -1041,7 +1155,8 @@ void test_dynamic_boc2(std::optional<DynamicBagOfCellsDb::CreateInMemoryOptions>
|
|||
|
||||
auto commit = [&] {
|
||||
VLOG(boc) << "commit";
|
||||
dboc->prepare_commit();
|
||||
//rnd.fast(0, 1);
|
||||
options.prepare_commit(*dboc);
|
||||
{
|
||||
CellStorer cell_storer(*kv);
|
||||
dboc->commit(cell_storer);
|
||||
|
@ -2147,18 +2262,18 @@ TEST(TonDb, BocRespectsUsageCell) {
|
|||
ASSERT_STREQ(serialization, serialization_of_virtualized_cell);
|
||||
}
|
||||
|
||||
void test_dynamic_boc_respectes_usage_cell(std::optional<vm::DynamicBagOfCellsDb::CreateInMemoryOptions> o_in_memory) {
|
||||
td::Random::Xorshift128plus rnd(123);
|
||||
void test_dynamic_boc_respectes_usage_cell(vm::BocOptions options) {
|
||||
td::Random::Xorshift128plus rnd(options.seed);
|
||||
auto cell = vm::gen_random_cell(20, rnd, true);
|
||||
auto usage_tree = std::make_shared<vm::CellUsageTree>();
|
||||
auto usage_cell = vm::UsageCell::create(cell, usage_tree->root_ptr());
|
||||
|
||||
auto kv = std::make_shared<td::MemoryKeyValue>();
|
||||
auto dboc = o_in_memory ? vm::DynamicBagOfCellsDb::create_in_memory(kv.get(), *o_in_memory)
|
||||
: vm::DynamicBagOfCellsDb::create();
|
||||
auto dboc = options.create_dboc(kv.get(), {});
|
||||
dboc->set_loader(std::make_unique<vm::CellLoader>(kv));
|
||||
dboc->inc(usage_cell);
|
||||
{
|
||||
options.prepare_commit(*dboc);
|
||||
vm::CellStorer cell_storer(*kv);
|
||||
dboc->commit(cell_storer);
|
||||
}
|
||||
|
@ -2171,7 +2286,7 @@ void test_dynamic_boc_respectes_usage_cell(std::optional<vm::DynamicBagOfCellsDb
|
|||
}
|
||||
|
||||
TEST(TonDb, DynamicBocRespectsUsageCell) {
|
||||
vm::with_all_boc_options(test_dynamic_boc_respectes_usage_cell);
|
||||
vm::with_all_boc_options(test_dynamic_boc_respectes_usage_cell, 20);
|
||||
}
|
||||
|
||||
TEST(TonDb, LargeBocSerializer) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue