1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Collect statistics for .pack files (#944)

* Statistics for .pack files

* optimizations

* fix typo

* fix erasing packages
This commit is contained in:
Marat 2024-03-27 12:23:11 +01:00 committed by GitHub
parent b07614335c
commit 10487b1c71
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 155 additions and 18 deletions

View file

@ -29,29 +29,151 @@ namespace ton {
namespace validator {
class PackageStatistics {
public:
void record_open(uint64_t count = 1) {
open_count.fetch_add(count, std::memory_order_relaxed);
}
void record_close(uint64_t count = 1) {
close_count.fetch_add(count, std::memory_order_relaxed);
}
void record_read(double time, uint64_t bytes) {
read_bytes.fetch_add(bytes, std::memory_order_relaxed);
std::lock_guard<std::mutex> guard(read_mutex);
read_time.insert(time);
read_time_sum += time;
}
void record_write(double time, uint64_t bytes) {
write_bytes.fetch_add(bytes, std::memory_order_relaxed);
std::lock_guard<std::mutex> guard(write_mutex);
write_time.insert(time);
write_time_sum += time;
}
std::string to_string_and_reset() {
std::stringstream ss;
ss.setf(std::ios::fixed);
ss.precision(6);
ss << "ton.pack.open COUNT : " << open_count.exchange(0, std::memory_order_relaxed) << "\n";
ss << "ton.pack.close COUNT : " << close_count.exchange(0, std::memory_order_relaxed) << "\n";
ss << "ton.pack.read.bytes COUNT : " << read_bytes.exchange(0, std::memory_order_relaxed) << "\n";
ss << "ton.pack.write.bytes COUNT : " << write_bytes.exchange(0, std::memory_order_relaxed) << "\n";
std::multiset<double> temp_read_time;
double temp_read_time_sum;
{
std::lock_guard<std::mutex> guard(read_mutex);
temp_read_time = std::move(read_time);
read_time.clear();
temp_read_time_sum = read_time_sum;
read_time_sum = 0;
}
auto read_stats = calculate_statistics(temp_read_time);
ss << "ton.pack.read.micros P50 : " << read_stats[0] <<
" P95 : " << read_stats[1] <<
" P99 : " << read_stats[2] <<
" P100 : " << read_stats[3] <<
" COUNT : " << temp_read_time.size() <<
" SUM : " << temp_read_time_sum << "\n";
std::multiset<double> temp_write_time;
double temp_write_time_sum;
{
std::lock_guard<std::mutex> guard(write_mutex);
temp_write_time = std::move(write_time);
write_time.clear();
temp_write_time_sum = write_time_sum;
write_time_sum = 0;
}
auto write_stats = calculate_statistics(temp_write_time);
ss << "ton.pack.write.micros P50 : " << write_stats[0] <<
" P95 : " << write_stats[1] <<
" P99 : " << write_stats[2] <<
" P100 : " << write_stats[3] <<
" COUNT : " << temp_write_time.size() <<
" SUM : " << temp_write_time_sum << "\n";
return ss.str();
}
private:
std::atomic_uint64_t open_count;
std::atomic_uint64_t close_count;
std::multiset<double> read_time;
std::atomic_uint64_t read_bytes;
std::multiset<double> write_time;
std::atomic_uint64_t write_bytes;
double read_time_sum;
double write_time_sum;
mutable std::mutex read_mutex;
mutable std::mutex write_mutex;
std::vector<double> calculate_statistics(const std::multiset<double>& data) const {
if (data.empty()) return {0, 0, 0, 0};
auto size = data.size();
auto calc_percentile = [&](double p) -> double {
auto it = data.begin();
std::advance(it, static_cast<int>(std::ceil(p * double(size)) - 1));
return *it;
};
return {calc_percentile(0.5), calc_percentile(0.95), calc_percentile(0.99), *data.rbegin()};
}
};
void DbStatistics::init() {
rocksdb_statistics = td::RocksDb::create_statistics();
pack_statistics = std::make_shared<PackageStatistics>();
}
std::string DbStatistics::to_string_and_reset() {
std::stringstream ss;
ss << td::RocksDb::statistics_to_string(rocksdb_statistics) << pack_statistics->to_string_and_reset();
td::RocksDb::reset_statistics(rocksdb_statistics);
return ss.str();
}
void PackageWriter::append(std::string filename, td::BufferSlice data,
td::Promise<std::pair<td::uint64, td::uint64>> promise) {
td::uint64 offset, size;
auto data_size = data.size();
td::Timestamp start, end;
{
auto p = package_.lock();
if (!p) {
promise.set_error(td::Status::Error("Package is closed"));
return;
}
offset = p->append(std::move(filename), std::move(data), !async_mode_);
start = td::Timestamp::now();
offset = p->append(std::move(filename), std::move(data), !async_mode_);
end = td::Timestamp::now();
size = p->size();
}
if (statistics_) {
statistics_->record_write((end.at() - start.at()) * 1e6, data_size);
}
promise.set_value(std::pair<td::uint64, td::uint64>{offset, size});
}
class PackageReader : public td::actor::Actor {
public:
PackageReader(std::shared_ptr<Package> package, td::uint64 offset,
td::Promise<std::pair<std::string, td::BufferSlice>> promise)
: package_(std::move(package)), offset_(offset), promise_(std::move(promise)) {
td::Promise<std::pair<std::string, td::BufferSlice>> promise, std::shared_ptr<PackageStatistics> statistics)
: package_(std::move(package)), offset_(offset), promise_(std::move(promise)), statistics_(std::move(statistics)) {
}
void start_up() override {
auto start = td::Timestamp::now();
auto result = package_->read(offset_);
if (statistics_ && result.is_ok()) {
statistics_->record_read((td::Timestamp::now().at() - start.at()) * 1e6, result.ok_ref().second.size());
}
package_ = {};
promise_.set_result(std::move(result));
stop();
@ -61,6 +183,7 @@ class PackageReader : public td::actor::Actor {
std::shared_ptr<Package> package_;
td::uint64 offset_;
td::Promise<std::pair<std::string, td::BufferSlice>> promise_;
std::shared_ptr<PackageStatistics> statistics_;
};
void ArchiveSlice::add_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
@ -297,7 +420,7 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P
promise.set_value(std::move(R.move_as_ok().second));
}
});
td::actor::create_actor<PackageReader>("reader", p->package, offset, std::move(P)).release();
td::actor::create_actor<PackageReader>("reader", p->package, offset, std::move(P), statistics_.pack_statistics).release();
}
void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id,
@ -465,7 +588,7 @@ void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::
void ArchiveSlice::before_query() {
if (status_ == st_closed) {
LOG(DEBUG) << "Opening archive slice " << db_path_;
kv_ = std::make_unique<td::RocksDb>(td::RocksDb::open(db_path_, statistics_).move_as_ok());
kv_ = std::make_unique<td::RocksDb>(td::RocksDb::open(db_path_, statistics_.rocksdb_statistics).move_as_ok());
std::string value;
auto R2 = kv_->get("status", value);
R2.ensure();
@ -547,6 +670,7 @@ void ArchiveSlice::do_close() {
LOG(DEBUG) << "Closing archive slice " << db_path_;
status_ = st_closed;
kv_ = {};
statistics_.pack_statistics->record_close(packages_.size());
packages_.clear();
}
@ -604,7 +728,7 @@ void ArchiveSlice::set_async_mode(bool mode, td::Promise<td::Unit> promise) {
}
ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root,
td::actor::ActorId<ArchiveLru> archive_lru, std::shared_ptr<rocksdb::Statistics> statistics)
td::actor::ActorId<ArchiveLru> archive_lru, DbStatistics statistics)
: archive_id_(archive_id)
, key_blocks_only_(key_blocks_only)
, temp_(temp)
@ -650,6 +774,7 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver
LOG(FATAL) << "failed to open/create archive '" << path << "': " << R.move_as_error();
return;
}
statistics_.pack_statistics->record_open();
auto idx = td::narrow_cast<td::uint32>(packages_.size());
if (finalized_) {
packages_.emplace_back(nullptr, td::actor::ActorOwn<PackageWriter>(), seqno, path, idx, version);
@ -659,7 +784,7 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver
if (version >= 1) {
pack->truncate(size).ensure();
}
auto writer = td::actor::create_actor<PackageWriter>("writer", pack, async_mode_);
auto writer = td::actor::create_actor<PackageWriter>("writer", pack, async_mode_, statistics_.pack_statistics);
packages_.emplace_back(std::move(pack), std::move(writer), seqno, path, idx, version);
}
@ -692,7 +817,7 @@ void ArchiveSlice::destroy(td::Promise<td::Unit> promise) {
for (auto &p : packages_) {
td::unlink(p.path).ensure();
}
statistics_.pack_statistics->record_close(packages_.size());
packages_.clear();
kv_ = nullptr;
@ -898,7 +1023,8 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl
for (auto idx = pack->idx + 1; idx < packages_.size(); idx++) {
td::unlink(packages_[idx].path).ensure();
}
packages_.erase(packages_.begin() + pack->idx + 1);
statistics_.pack_statistics->record_close(packages_.size() - pack->idx - 1);
packages_.erase(packages_.begin() + pack->idx + 1, packages_.end());
kv_->commit_transaction().ensure();