mirror of
				https://github.com/ton-blockchain/ton
				synced 2025-03-09 15:40:10 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			810 lines
		
	
	
	
		
			23 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			810 lines
		
	
	
	
		
			23 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 | 
						|
    This file is part of TON Blockchain Library.
 | 
						|
 | 
						|
    TON Blockchain Library is free software: you can redistribute it and/or modify
 | 
						|
    it under the terms of the GNU Lesser General Public License as published by
 | 
						|
    the Free Software Foundation, either version 2 of the License, or
 | 
						|
    (at your option) any later version.
 | 
						|
 | 
						|
    TON Blockchain Library is distributed in the hope that it will be useful,
 | 
						|
    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
    GNU Lesser General Public License for more details.
 | 
						|
 | 
						|
    You should have received a copy of the GNU Lesser General Public License
 | 
						|
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
    Copyright 2017-2020 Telegram Systems LLP
 | 
						|
*/
 | 
						|
#include "td/utils/tests.h"
 | 
						|
 | 
						|
#include "td/utils/as.h"
 | 
						|
#include "td/utils/base64.h"
 | 
						|
#include "td/utils/benchmark.h"
 | 
						|
#include "td/utils/buffer.h"
 | 
						|
#include "td/utils/crypto.h"
 | 
						|
#include "td/utils/filesystem.h"
 | 
						|
#include "td/utils/Slice.h"
 | 
						|
#include "td/utils/Span.h"
 | 
						|
#include "td/utils/misc.h"
 | 
						|
#include "td/utils/overloaded.h"
 | 
						|
#include "td/utils/optional.h"
 | 
						|
#include "td/utils/port/FileFd.h"
 | 
						|
#include "td/utils/port/path.h"
 | 
						|
#include "td/utils/port/IoSlice.h"
 | 
						|
#include "td/utils/UInt.h"
 | 
						|
#include "td/utils/Variant.h"
 | 
						|
#include "td/utils/VectorQueue.h"
 | 
						|
 | 
						|
#include "td/actor/actor.h"
 | 
						|
 | 
						|
#include "td/db/utils/StreamInterface.h"
 | 
						|
#include "td/db/utils/ChainBuffer.h"
 | 
						|
#include "td/db/utils/CyclicBuffer.h"
 | 
						|
#include "td/db/binlog/BinlogReaderHelper.h"
 | 
						|
 | 
						|
#include "td/db/binlog/Binlog.h"
 | 
						|
 | 
						|
#include <ctime>
 | 
						|
 | 
						|
// Toy Binlog Implementation
 | 
						|
using td::int64;
 | 
						|
using td::MutableSlice;
 | 
						|
using td::Result;
 | 
						|
using td::Slice;
 | 
						|
using td::Status;
 | 
						|
 | 
						|
using RootHash = td::UInt256;
 | 
						|
using FileHash = td::UInt256;
 | 
						|
struct BlockId {
 | 
						|
  int workchain;
 | 
						|
  unsigned seqno;
 | 
						|
  unsigned long long shard;
 | 
						|
};
 | 
						|
 | 
						|
template <class T>
 | 
						|
Result<int64> memcpy_parse(Slice data, T* res) {
 | 
						|
  if (data.size() < sizeof(T)) {
 | 
						|
    return -static_cast<int64>(sizeof(T));
 | 
						|
  }
 | 
						|
  std::memcpy(res, data.data(), sizeof(T));
 | 
						|
  if (res->tag_field != res->tag) {
 | 
						|
    return Status::Error("Tag mismatch");
 | 
						|
  }
 | 
						|
  return sizeof(T);
 | 
						|
}
 | 
						|
template <class T>
 | 
						|
int64 memcpy_serialize(MutableSlice data, const T& res) {
 | 
						|
  if (data.size() < sizeof(T)) {
 | 
						|
    return -static_cast<int64>(sizeof(T));
 | 
						|
  }
 | 
						|
  std::memcpy(data.data(), &res, sizeof(T));
 | 
						|
  return sizeof(T);
 | 
						|
}
 | 
						|
 | 
						|
#pragma pack(push, 4)
 | 
						|
struct LogEventCrc32C {
 | 
						|
  static constexpr unsigned tag = 0x473a830a;
 | 
						|
 | 
						|
  unsigned tag_field;
 | 
						|
  td::uint32 crc32c;
 | 
						|
  LogEventCrc32C() = default;
 | 
						|
  LogEventCrc32C(td::uint32 crc32c) : tag_field(tag), crc32c(crc32c) {
 | 
						|
  }
 | 
						|
  static Result<int64> parse(Slice data, LogEventCrc32C* res) {
 | 
						|
    return memcpy_parse(data, res);
 | 
						|
  }
 | 
						|
  int64 serialize(MutableSlice data) const {
 | 
						|
    return memcpy_serialize(data, *this);
 | 
						|
  }
 | 
						|
  auto key() const {
 | 
						|
    return crc32c;
 | 
						|
  }
 | 
						|
  bool operator==(const LogEventCrc32C& other) const {
 | 
						|
    return key() == other.key();
 | 
						|
  }
 | 
						|
  bool operator!=(const LogEventCrc32C& other) const {
 | 
						|
    return !(*this == other);
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
struct LogEventStart {
 | 
						|
  static constexpr unsigned tag = 0x0442446b;
 | 
						|
  static constexpr unsigned log_type = 0x290100;
 | 
						|
  unsigned tag_field;
 | 
						|
  unsigned type_field;
 | 
						|
  unsigned created_at;
 | 
						|
  unsigned char zerostate_root_hash[32];
 | 
						|
  LogEventStart() = default;
 | 
						|
  LogEventStart(const RootHash& hash, unsigned _now = 0)
 | 
						|
      : tag_field(tag), type_field(log_type), created_at(_now ? _now : (unsigned)std::time(nullptr)) {
 | 
						|
    td::as<RootHash>(zerostate_root_hash) = hash;
 | 
						|
  }
 | 
						|
  static Result<int64> parse(Slice data, LogEventStart* res) {
 | 
						|
    return memcpy_parse(data, res);
 | 
						|
  }
 | 
						|
  int64 serialize(MutableSlice data) const {
 | 
						|
    return memcpy_serialize(data, *this);
 | 
						|
  }
 | 
						|
  auto key() const {
 | 
						|
    return std::make_tuple(tag_field, type_field, created_at, Slice(zerostate_root_hash, 32));
 | 
						|
  }
 | 
						|
  bool operator==(const LogEventStart& other) const {
 | 
						|
    return key() == other.key();
 | 
						|
  }
 | 
						|
  bool operator!=(const LogEventStart& other) const {
 | 
						|
    return !(*this == other);
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
struct LogEventSetZeroState {
 | 
						|
  static constexpr unsigned tag = 0x63ab3cd9;
 | 
						|
  unsigned tag_field;
 | 
						|
  unsigned flags;
 | 
						|
  long long file_size;
 | 
						|
  unsigned char file_hash[32];
 | 
						|
  unsigned char root_hash[32];
 | 
						|
  LogEventSetZeroState() = default;
 | 
						|
  LogEventSetZeroState(const RootHash& rhash, const FileHash& fhash, unsigned long long _fsize, unsigned _flags = 0)
 | 
						|
      : tag_field(tag), flags(_flags), file_size(_fsize) {
 | 
						|
    td::as<FileHash>(file_hash) = fhash;
 | 
						|
    td::as<RootHash>(root_hash) = rhash;
 | 
						|
  }
 | 
						|
  static Result<int64> parse(Slice data, LogEventSetZeroState* res) {
 | 
						|
    return memcpy_parse(data, res);
 | 
						|
  }
 | 
						|
  int64 serialize(MutableSlice data) const {
 | 
						|
    return memcpy_serialize(data, *this);
 | 
						|
  }
 | 
						|
  auto key() const {
 | 
						|
    return std::make_tuple(tag_field, flags, file_size, Slice(file_hash, 32), Slice(root_hash, 32));
 | 
						|
  }
 | 
						|
  bool operator==(const LogEventSetZeroState& other) const {
 | 
						|
    return key() == other.key();
 | 
						|
  }
 | 
						|
  bool operator!=(const LogEventSetZeroState& other) const {
 | 
						|
    return !(*this == other);
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
struct LogEventNewBlock {
 | 
						|
  static constexpr unsigned tag = 0x19f4bc63;
 | 
						|
  unsigned tag_field;
 | 
						|
  unsigned flags;  // lower 8 bits = authority
 | 
						|
  int workchain;
 | 
						|
  unsigned seqno;
 | 
						|
  unsigned long long shard;
 | 
						|
  long long file_size;
 | 
						|
  unsigned char file_hash[32];
 | 
						|
  unsigned char root_hash[32];
 | 
						|
  unsigned char last_bytes[8];
 | 
						|
  LogEventNewBlock() = default;
 | 
						|
  LogEventNewBlock(const BlockId& block, const RootHash& rhash, const FileHash& fhash, unsigned long long _fsize,
 | 
						|
                   unsigned _flags)
 | 
						|
      : tag_field(tag)
 | 
						|
      , flags(_flags)
 | 
						|
      , workchain(block.workchain)
 | 
						|
      , seqno(block.seqno)
 | 
						|
      , shard(block.shard)
 | 
						|
      , file_size(_fsize) {
 | 
						|
    td::as<FileHash>(file_hash) = fhash;
 | 
						|
    td::as<RootHash>(root_hash) = rhash;
 | 
						|
    td::as<unsigned long long>(last_bytes) = 0;
 | 
						|
  }
 | 
						|
  static Result<int64> parse(Slice data, LogEventNewBlock* res) {
 | 
						|
    return memcpy_parse(data, res);
 | 
						|
  }
 | 
						|
  int64 serialize(MutableSlice data) const {
 | 
						|
    return memcpy_serialize(data, *this);
 | 
						|
  }
 | 
						|
  auto key() const {
 | 
						|
    return std::make_tuple(tag_field, flags, workchain, seqno, shard, file_size, Slice(file_hash, 32),
 | 
						|
                           Slice(root_hash, 32), Slice(last_bytes, 8));
 | 
						|
  }
 | 
						|
  bool operator==(const LogEventNewBlock& other) const {
 | 
						|
    return key() == other.key();
 | 
						|
  }
 | 
						|
  bool operator!=(const LogEventNewBlock& other) const {
 | 
						|
    return !(*this == other);
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
struct LogEventNewState {
 | 
						|
  static constexpr unsigned tag = 0x4190a21f;
 | 
						|
  unsigned tag_field;
 | 
						|
  unsigned flags;  // lower 8 bits = authority
 | 
						|
  int workchain;
 | 
						|
  unsigned seqno;
 | 
						|
  unsigned long long shard;
 | 
						|
  long long file_size;
 | 
						|
  unsigned char file_hash[32];
 | 
						|
  unsigned char root_hash[32];
 | 
						|
  unsigned char last_bytes[8];
 | 
						|
  LogEventNewState() = default;
 | 
						|
  LogEventNewState(const BlockId& state, const RootHash& rhash, const FileHash& fhash, unsigned long long _fsize,
 | 
						|
                   unsigned _flags)
 | 
						|
      : tag_field(tag)
 | 
						|
      , flags(_flags)
 | 
						|
      , workchain(state.workchain)
 | 
						|
      , seqno(state.seqno)
 | 
						|
      , shard(state.shard)
 | 
						|
      , file_size(_fsize) {
 | 
						|
    td::as<FileHash>(file_hash) = fhash;
 | 
						|
    td::as<RootHash>(root_hash) = rhash;
 | 
						|
    td::as<unsigned long long>(last_bytes) = 0;
 | 
						|
  }
 | 
						|
  static Result<int64> parse(Slice data, LogEventNewState* res) {
 | 
						|
    return memcpy_parse(data, res);
 | 
						|
  }
 | 
						|
  int64 serialize(MutableSlice data) const {
 | 
						|
    return memcpy_serialize(data, *this);
 | 
						|
  }
 | 
						|
  auto key() const {
 | 
						|
    return std::make_tuple(tag_field, flags, workchain, seqno, shard, file_size, Slice(file_hash, 32),
 | 
						|
                           Slice(root_hash, 32), Slice(last_bytes, 8));
 | 
						|
  }
 | 
						|
  bool operator==(const LogEventNewState& other) const {
 | 
						|
    return key() == other.key();
 | 
						|
  }
 | 
						|
  bool operator!=(const LogEventNewState& other) const {
 | 
						|
    return !(*this == other);
 | 
						|
  }
 | 
						|
};
 | 
						|
#pragma pack(pop)
 | 
						|
 | 
						|
struct LogEventString {
 | 
						|
  static constexpr unsigned tag = 0xabcdabcd;
 | 
						|
 | 
						|
  std::string data;
 | 
						|
 | 
						|
  bool operator==(const LogEventString& other) const {
 | 
						|
    return data == other.data;
 | 
						|
  }
 | 
						|
  bool operator!=(const LogEventString& other) const {
 | 
						|
    return !(*this == other);
 | 
						|
  }
 | 
						|
 | 
						|
  int64 serialize(MutableSlice dest) const {
 | 
						|
    size_t need_size = 8 + data.size();
 | 
						|
    if (dest.size() < need_size) {
 | 
						|
      return -static_cast<int64>(need_size);
 | 
						|
    }
 | 
						|
    dest.truncate(need_size);
 | 
						|
    td::as<unsigned>(dest.data()) = unsigned(tag);
 | 
						|
    td::as<int>(dest.data() + 4) = td::narrow_cast<int>(data.size());
 | 
						|
    dest.substr(8).copy_from(data);
 | 
						|
    return dest.size();
 | 
						|
  }
 | 
						|
 | 
						|
  static Result<int64> parse(Slice data, LogEventString* res) {
 | 
						|
    if (data.size() < 4) {
 | 
						|
      return -4;
 | 
						|
    }
 | 
						|
    unsigned got_tag = td::as<unsigned>(data.data());
 | 
						|
    if (got_tag != tag) {
 | 
						|
      return Status::Error(PSLICE() << "tag mismatch " << td::format::as_hex(got_tag));
 | 
						|
    }
 | 
						|
    data = data.substr(4);
 | 
						|
    if (data.size() < 4) {
 | 
						|
      return -8;
 | 
						|
    }
 | 
						|
    td::int64 length = td::as<td::uint32>(data.data());
 | 
						|
    data = data.substr(4);
 | 
						|
    if (static_cast<int64>(data.size()) < length) {
 | 
						|
      return -length - 8;
 | 
						|
    }
 | 
						|
    res->data = data.substr(0, td::narrow_cast<std::size_t>(length)).str();
 | 
						|
    return length + 8;
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
struct LogEvent {
 | 
						|
  td::Variant<LogEventCrc32C, LogEventStart, LogEventString, LogEventNewBlock, LogEventNewState, LogEventSetZeroState>
 | 
						|
      event_{LogEventStart{}};
 | 
						|
 | 
						|
  bool operator==(const LogEvent& other) const {
 | 
						|
    return event_ == other.event_;
 | 
						|
  }
 | 
						|
  bool operator!=(const LogEvent& other) const {
 | 
						|
    return !(*this == other);
 | 
						|
  }
 | 
						|
 | 
						|
  LogEvent() = default;
 | 
						|
  LogEvent(LogEvent&& other) = default;
 | 
						|
  template <class T>
 | 
						|
  LogEvent(T&& e) : event_(std::forward<T>(e)) {
 | 
						|
  }
 | 
						|
 | 
						|
  int64 serialize(MutableSlice data) const {
 | 
						|
    int64 res;
 | 
						|
    event_.visit([&](auto& e) { res = e.serialize(data); });
 | 
						|
    return res;
 | 
						|
  }
 | 
						|
 | 
						|
  static Result<int64> parse(Slice data, LogEvent* res) {
 | 
						|
    if (data.size() < 4) {
 | 
						|
      return -4;
 | 
						|
    }
 | 
						|
    //LOG(ERROR) << td::format::as_hex_dump<4>(data);
 | 
						|
    unsigned got_tag = td::as<unsigned>(data.data());
 | 
						|
    switch (got_tag) {
 | 
						|
      case LogEventCrc32C::tag: {
 | 
						|
        LogEventCrc32C e;
 | 
						|
        TRY_RESULT(x, e.parse(data, &e));
 | 
						|
        if (x >= 0) {
 | 
						|
          res->event_ = e;
 | 
						|
        }
 | 
						|
        return x;
 | 
						|
      }
 | 
						|
      case LogEventStart::tag: {
 | 
						|
        LogEventStart e;
 | 
						|
        TRY_RESULT(x, e.parse(data, &e));
 | 
						|
        if (x >= 0) {
 | 
						|
          res->event_ = e;
 | 
						|
        }
 | 
						|
        return x;
 | 
						|
      }
 | 
						|
      case LogEventSetZeroState::tag: {
 | 
						|
        LogEventSetZeroState e;
 | 
						|
        TRY_RESULT(x, e.parse(data, &e));
 | 
						|
        if (x >= 0) {
 | 
						|
          res->event_ = e;
 | 
						|
        }
 | 
						|
        return x;
 | 
						|
      }
 | 
						|
      case LogEventNewBlock::tag: {
 | 
						|
        LogEventNewBlock e;
 | 
						|
        TRY_RESULT(x, e.parse(data, &e));
 | 
						|
        if (x >= 0) {
 | 
						|
          res->event_ = e;
 | 
						|
        }
 | 
						|
        return x;
 | 
						|
      }
 | 
						|
      case LogEventNewState::tag: {
 | 
						|
        LogEventNewState e;
 | 
						|
        TRY_RESULT(x, e.parse(data, &e));
 | 
						|
        if (x >= 0) {
 | 
						|
          res->event_ = e;
 | 
						|
        }
 | 
						|
        return x;
 | 
						|
      }
 | 
						|
      case LogEventString::tag: {
 | 
						|
        LogEventString e;
 | 
						|
        TRY_RESULT(x, e.parse(data, &e));
 | 
						|
        if (x >= 0) {
 | 
						|
          res->event_ = e;
 | 
						|
        }
 | 
						|
        return x;
 | 
						|
      }
 | 
						|
      default:
 | 
						|
        return Status::Error(PSLICE() << "Unknown tag: " << td::format::as_hex(got_tag));
 | 
						|
    }
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
static td::CSlice test_binlog_path("test.binlog");
 | 
						|
 | 
						|
class BinlogReader : public td::BinlogReaderInterface {
 | 
						|
 public:
 | 
						|
  td::Span<LogEvent> logevents() const {
 | 
						|
    return logevents_;
 | 
						|
  }
 | 
						|
 | 
						|
  td::Result<td::int64> parse(td::Slice data) override {
 | 
						|
    if (data.size() < 4) {
 | 
						|
      return -4;
 | 
						|
    }
 | 
						|
    LogEvent res;
 | 
						|
    TRY_RESULT(size, res.parse(data, &res));
 | 
						|
    if (size > 0) {
 | 
						|
      if (res.event_.get_offset() == res.event_.offset<LogEventCrc32C>()) {
 | 
						|
        auto crc = res.event_.get<LogEventCrc32C>().crc32c;
 | 
						|
        flush_crc();
 | 
						|
        if (crc != crc_) {
 | 
						|
          return Status::Error("Crc mismatch");
 | 
						|
        }
 | 
						|
      } else {
 | 
						|
        logevents_.emplace_back(std::move(res));
 | 
						|
      }
 | 
						|
      lazy_crc_extend(data.substr(0, td::narrow_cast<std::size_t>(size)));
 | 
						|
    }
 | 
						|
    return size;
 | 
						|
  }
 | 
						|
 | 
						|
  td::uint32 crc32c() {
 | 
						|
    flush_crc();
 | 
						|
    return crc_;
 | 
						|
  }
 | 
						|
 | 
						|
  void flush() override {
 | 
						|
    flush_crc();
 | 
						|
  }
 | 
						|
 | 
						|
 private:
 | 
						|
  std::vector<LogEvent> logevents_;
 | 
						|
  td::uint32 crc_{0};
 | 
						|
  td::Slice suffix_;
 | 
						|
 | 
						|
  void flush_crc() {
 | 
						|
    crc_ = td::crc32c_extend(crc_, suffix_);
 | 
						|
    suffix_ = Slice();
 | 
						|
  }
 | 
						|
  void lazy_crc_extend(Slice slice) {
 | 
						|
    if (suffix_.empty()) {
 | 
						|
      suffix_ = slice;
 | 
						|
      return;
 | 
						|
    }
 | 
						|
    if (suffix_.end() == slice.begin()) {
 | 
						|
      suffix_ = Slice(suffix_.begin(), slice.end());
 | 
						|
      return;
 | 
						|
    }
 | 
						|
    flush_crc();
 | 
						|
    suffix_ = slice;
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
class RandomBinlog {
 | 
						|
 public:
 | 
						|
  RandomBinlog() {
 | 
						|
    size_t logevent_count = 1000;
 | 
						|
    for (size_t i = 0; i < logevent_count; i++) {
 | 
						|
      add_logevent(create_random_logevent());
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  Slice data() const {
 | 
						|
    return data_;
 | 
						|
  }
 | 
						|
  td::Span<LogEvent> logevents() const {
 | 
						|
    return logevents_;
 | 
						|
  }
 | 
						|
 | 
						|
 private:
 | 
						|
  std::vector<LogEvent> logevents_;
 | 
						|
  std::string data_;
 | 
						|
 | 
						|
  template <class T>
 | 
						|
  void add_logevent(T event) {
 | 
						|
    int64 size = -event.serialize({});
 | 
						|
    std::string data(td::narrow_cast<std::size_t>(size), '\0');
 | 
						|
    int64 new_size = event.serialize(data);
 | 
						|
    CHECK(new_size == size);
 | 
						|
    data_ += data;
 | 
						|
    logevents_.emplace_back(std::move(event));
 | 
						|
  }
 | 
						|
 | 
						|
  LogEvent create_random_logevent() {
 | 
						|
    auto rand_uint256 = [] {
 | 
						|
      td::UInt256 res;
 | 
						|
      td::Random::secure_bytes(as_slice(res));
 | 
						|
      return res;
 | 
						|
    };
 | 
						|
    auto rand_block_id = [] {
 | 
						|
      BlockId res;
 | 
						|
      res.workchain = td::Random::fast(0, 100);
 | 
						|
      res.shard = td::Random::fast(0, 100);
 | 
						|
      res.seqno = td::Random::fast(0, 100);
 | 
						|
      return res;
 | 
						|
    };
 | 
						|
 | 
						|
    auto type = td::Random::fast(0, 4);
 | 
						|
    switch (type) {
 | 
						|
      case 0: {
 | 
						|
        auto size = td::Random::fast(0, 10);
 | 
						|
        LogEventString event;
 | 
						|
        event.data = td::rand_string('a', 'z', size);
 | 
						|
        return event;
 | 
						|
      }
 | 
						|
      case 1: {
 | 
						|
        return LogEventStart(rand_uint256(), 12);
 | 
						|
      }
 | 
						|
      case 2: {
 | 
						|
        return LogEventSetZeroState(rand_uint256(), rand_uint256(), td::Random::fast(0, 1000),
 | 
						|
                                    td::Random::fast(0, 1000));
 | 
						|
      }
 | 
						|
      case 3: {
 | 
						|
        return LogEventNewBlock(rand_block_id(), rand_uint256(), rand_uint256(), 12, 17);
 | 
						|
      }
 | 
						|
      case 4: {
 | 
						|
        return LogEventNewState(rand_block_id(), rand_uint256(), rand_uint256(), 12, 17);
 | 
						|
      }
 | 
						|
    }
 | 
						|
    UNREACHABLE();
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
void test_binlog(td::Slice data, td::optional<td::Span<LogEvent>> events = {}) {
 | 
						|
  auto splitted_binlog = td::rand_split(data);
 | 
						|
 | 
						|
  std::string new_binlog_data;
 | 
						|
 | 
						|
  BinlogReader reader;
 | 
						|
  td::BinlogReaderHelper reader_impl;
 | 
						|
  for (auto& chunk : splitted_binlog) {
 | 
						|
    reader_impl.parse(reader, chunk).ensure();
 | 
						|
  }
 | 
						|
 | 
						|
  //Binlog write sync
 | 
						|
  {
 | 
						|
    td::Binlog::destroy(test_binlog_path);
 | 
						|
    td::BinlogWriter binlog_writer(test_binlog_path.str());
 | 
						|
    binlog_writer.open().ensure();
 | 
						|
 | 
						|
    BinlogReader new_reader;
 | 
						|
    size_t i = 0;
 | 
						|
    for (auto& logevent : reader.logevents()) {
 | 
						|
      binlog_writer.write_event(logevent, &new_reader).ensure();
 | 
						|
      i++;
 | 
						|
      if (i % 10 == 0) {
 | 
						|
        binlog_writer.write_event(LogEvent(LogEventCrc32C(new_reader.crc32c())), &new_reader).ensure();
 | 
						|
      }
 | 
						|
    }
 | 
						|
    binlog_writer.sync();
 | 
						|
    binlog_writer.close().ensure();
 | 
						|
 | 
						|
    auto file_data = read_file(test_binlog_path).move_as_ok();
 | 
						|
    ASSERT_TRUE(reader.logevents() == new_reader.logevents());
 | 
						|
    new_binlog_data = file_data.as_slice().str();
 | 
						|
    data = new_binlog_data;
 | 
						|
    //ASSERT_EQ(data, file_data);
 | 
						|
  }
 | 
						|
 | 
						|
  //Binlog write async
 | 
						|
  {
 | 
						|
    td::Binlog::destroy(test_binlog_path);
 | 
						|
    td::BinlogWriterAsync binlog_writer(test_binlog_path.str());
 | 
						|
 | 
						|
    td::actor::Scheduler scheduler({2});
 | 
						|
 | 
						|
    BinlogReader new_reader;
 | 
						|
    scheduler.run_in_context([&]() mutable {
 | 
						|
      binlog_writer.open().ensure();
 | 
						|
      for (auto& logevent : reader.logevents()) {
 | 
						|
        binlog_writer.write_event(logevent, &new_reader).ensure();
 | 
						|
      }
 | 
						|
      binlog_writer.sync([&](Result<td::Unit> res) {
 | 
						|
        res.ensure();
 | 
						|
        binlog_writer.close([&](Result<td::Unit> res) {
 | 
						|
          res.ensure();
 | 
						|
          td::actor::SchedulerContext::get()->stop();
 | 
						|
        });
 | 
						|
      });
 | 
						|
    });
 | 
						|
 | 
						|
    scheduler.run();
 | 
						|
    scheduler.stop();
 | 
						|
 | 
						|
    auto file_data = read_file(test_binlog_path).move_as_ok();
 | 
						|
    ASSERT_TRUE(reader.logevents() == new_reader.logevents());
 | 
						|
    //ASSERT_EQ(data, file_data);
 | 
						|
  }
 | 
						|
 | 
						|
  ASSERT_TRUE(!events || events.value() == reader.logevents());
 | 
						|
 | 
						|
  std::string new_data;
 | 
						|
  for (auto& event : reader.logevents()) {
 | 
						|
    int64 size = -event.serialize({});
 | 
						|
    std::string event_data(td::narrow_cast<std::size_t>(size), '\0');
 | 
						|
    int64 new_size = event.serialize(event_data);
 | 
						|
    CHECK(new_size == size);
 | 
						|
    new_data += event_data;
 | 
						|
  }
 | 
						|
  //ASSERT_EQ(data, new_data);
 | 
						|
 | 
						|
  // Binlog::read_sync
 | 
						|
  {
 | 
						|
    td::CSlice path("test.binlog");
 | 
						|
    td::Binlog::destroy(path);
 | 
						|
    td::write_file(path, data).ensure();
 | 
						|
 | 
						|
    td::Binlog binlog(path.str());
 | 
						|
    BinlogReader binlog_reader;
 | 
						|
    binlog.replay_sync(binlog_reader).ensure();
 | 
						|
 | 
						|
    ASSERT_EQ(reader.logevents().size(), binlog_reader.logevents().size());
 | 
						|
    ASSERT_TRUE(reader.logevents() == binlog_reader.logevents());
 | 
						|
  }
 | 
						|
 | 
						|
  // Binlog::read_async
 | 
						|
  {
 | 
						|
    td::Binlog::destroy(test_binlog_path);
 | 
						|
    td::write_file(test_binlog_path, data).ensure();
 | 
						|
 | 
						|
    td::Binlog binlog(test_binlog_path.str());
 | 
						|
    auto binlog_reader = std::make_shared<BinlogReader>();
 | 
						|
 | 
						|
    td::actor::Scheduler scheduler({2});
 | 
						|
    scheduler.run_in_context([&]() mutable {
 | 
						|
      binlog.replay_async(binlog_reader, [](Result<td::Unit> res) {
 | 
						|
        res.ensure();
 | 
						|
        td::actor::SchedulerContext::get()->stop();
 | 
						|
      });
 | 
						|
    });
 | 
						|
 | 
						|
    scheduler.run();
 | 
						|
    scheduler.stop();
 | 
						|
 | 
						|
    ASSERT_EQ(reader.logevents().size(), binlog_reader->logevents().size());
 | 
						|
    ASSERT_TRUE(reader.logevents() == binlog_reader->logevents());
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
TEST(Binlog, Reader) {
 | 
						|
  RandomBinlog binlog;
 | 
						|
  test_binlog(binlog.data(), binlog.logevents());
 | 
						|
}
 | 
						|
 | 
						|
TEST(Binlog, Hands) {
 | 
						|
  std::string binlog = td::base64_decode(
 | 
						|
                           "a0RCBAABKQCRMn1c2DaJhwrptxburpRtrWI2sjGhVbG29bFO0r8DDtAAExjZPKtjAAAAALwGAAAA"
 | 
						|
                           "AAAAFvJq3qfzFCDWap+LUrgBI8sWFayIOQSxkBjV3CWgizHYNomHCum3Fu6ulG2tYjayMaFVsbb1"
 | 
						|
                           "sU7SvwMO0AATGGO89BmAAAAA/////wEAAAAAAAAAAAAAgN4RAAAAAAAAa53L4ziGleZ7K+StAsBd"
 | 
						|
                           "txMxbHHfuB9SJRFp+BMzXfnGnt8TsgFnig7j/xVRjtIsYUVw0rQZJUC0sWQROj0SHvplIkBV9vMp")
 | 
						|
                           .move_as_ok();
 | 
						|
  test_binlog(binlog);
 | 
						|
}
 | 
						|
 | 
						|
TEST(Buffers, CyclicBufferSimple) {
 | 
						|
  {
 | 
						|
    auto reader_writer = td::CyclicBuffer::create();
 | 
						|
    auto reader = std::move(reader_writer.first);
 | 
						|
    auto writer = std::move(reader_writer.second);
 | 
						|
 | 
						|
    ASSERT_TRUE(!writer.is_reader_closed());
 | 
						|
    reader.close_reader(td::Status::Error(2));
 | 
						|
    ASSERT_TRUE(!reader.is_writer_closed());
 | 
						|
    ASSERT_TRUE(writer.is_reader_closed());
 | 
						|
    ASSERT_EQ(2, writer.reader_status().code());
 | 
						|
  }
 | 
						|
  {
 | 
						|
    auto reader_writer = td::CyclicBuffer::create();
 | 
						|
    auto reader = std::move(reader_writer.first);
 | 
						|
    auto writer = std::move(reader_writer.second);
 | 
						|
 | 
						|
    ASSERT_TRUE(!reader.is_writer_closed());
 | 
						|
    writer.close_writer(td::Status::Error(2));
 | 
						|
    ASSERT_TRUE(!writer.is_reader_closed());
 | 
						|
    ASSERT_TRUE(reader.is_writer_closed());
 | 
						|
    ASSERT_EQ(2, reader.writer_status().code());
 | 
						|
  }
 | 
						|
  {
 | 
						|
    td::CyclicBuffer::Options options;
 | 
						|
    options.chunk_size = 14;
 | 
						|
    options.count = 10;
 | 
						|
    options.alignment = 7;
 | 
						|
    auto reader_writer = td::CyclicBuffer::create(options);
 | 
						|
    auto reader = std::move(reader_writer.first);
 | 
						|
    auto writer = std::move(reader_writer.second);
 | 
						|
 | 
						|
    auto data = td::rand_string('a', 'z', 100001);
 | 
						|
    td::Slice write_slice = data;
 | 
						|
    td::Slice read_slice = data;
 | 
						|
    for (size_t i = 1; i < options.count; i++) {
 | 
						|
      ASSERT_EQ((i - 1) * options.chunk_size, reader.reader_size());
 | 
						|
      ASSERT_EQ((i - 1) * options.chunk_size, writer.writer_size());
 | 
						|
      auto slice = writer.prepare_write();
 | 
						|
      ASSERT_EQ(0u, reinterpret_cast<td::uint64>(slice.data()) % options.alignment);
 | 
						|
      auto to_copy = write_slice;
 | 
						|
      to_copy.truncate(options.chunk_size);
 | 
						|
      slice.copy_from(to_copy);
 | 
						|
      write_slice = write_slice.substr(to_copy.size());
 | 
						|
      writer.confirm_write(to_copy.size());
 | 
						|
      ASSERT_EQ(i * options.chunk_size, reader.reader_size());
 | 
						|
      ASSERT_EQ(i * options.chunk_size, writer.writer_size());
 | 
						|
    }
 | 
						|
    bool is_writer_closed = false;
 | 
						|
    while (true) {
 | 
						|
      {
 | 
						|
        bool is_closed = reader.is_writer_closed();
 | 
						|
        auto slice = reader.prepare_read();
 | 
						|
        ASSERT_EQ(read_slice.substr(0, slice.size()), slice);
 | 
						|
        read_slice = read_slice.substr(slice.size());
 | 
						|
        reader.confirm_read(slice.size());
 | 
						|
        if (is_closed && slice.empty()) {
 | 
						|
          break;
 | 
						|
        }
 | 
						|
      }
 | 
						|
 | 
						|
      if (!is_writer_closed) {
 | 
						|
        auto slice = writer.prepare_write();
 | 
						|
        auto to_copy = write_slice;
 | 
						|
        to_copy.truncate(options.chunk_size);
 | 
						|
        if (to_copy.empty()) {
 | 
						|
          writer.close_writer(td::Status::OK());
 | 
						|
          is_writer_closed = true;
 | 
						|
        } else {
 | 
						|
          slice.copy_from(to_copy);
 | 
						|
          write_slice = write_slice.substr(to_copy.size());
 | 
						|
          writer.confirm_write(to_copy.size());
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
    ASSERT_EQ(0u, write_slice.size());
 | 
						|
    ASSERT_EQ(0u, read_slice.size());
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
TEST(Buffers, CyclicBuffer) {
 | 
						|
  for (int t = 0; t < 20; t++) {
 | 
						|
    td::CyclicBuffer::Options options;
 | 
						|
    options.chunk_size = 14;
 | 
						|
    options.count = 10;
 | 
						|
    options.alignment = 7;
 | 
						|
    auto reader_writer = td::CyclicBuffer::create(options);
 | 
						|
    auto reader = std::move(reader_writer.first);
 | 
						|
    auto writer = std::move(reader_writer.second);
 | 
						|
    auto data = td::rand_string('a', 'z', 100001);
 | 
						|
    auto chunks = td::rand_split(data);
 | 
						|
 | 
						|
    size_t chunk_i = 0;
 | 
						|
    std::string res;
 | 
						|
    while (true) {
 | 
						|
      if (td::Random::fast(0, 1) == 0) {
 | 
						|
        bool is_closed = reader.is_writer_closed();
 | 
						|
        auto slice = reader.prepare_read();
 | 
						|
        res += slice.str();
 | 
						|
        reader.confirm_read(slice.size());
 | 
						|
        if (slice.empty() && is_closed) {
 | 
						|
          reader.writer_status().ensure();
 | 
						|
          break;
 | 
						|
        }
 | 
						|
      }
 | 
						|
      if (chunk_i < chunks.size() && td::Random::fast(0, 1) == 0) {
 | 
						|
        auto slice = writer.prepare_write();
 | 
						|
        auto from = Slice(chunks[chunk_i]);
 | 
						|
        auto copy = from.substr(0, slice.size());
 | 
						|
        slice.copy_from(copy);
 | 
						|
        writer.confirm_write(copy.size());
 | 
						|
        auto left = from.substr(copy.size());
 | 
						|
        if (!left.empty()) {
 | 
						|
          chunks[chunk_i] = left.str();
 | 
						|
        } else {
 | 
						|
          chunk_i++;
 | 
						|
          if (chunk_i == chunks.size()) {
 | 
						|
            writer.close_writer(td::Status::OK());
 | 
						|
          }
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
    ASSERT_EQ(data, res);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
TEST(Buffers, ChainBuffer) {
 | 
						|
  for (int t = 0; t < 20; t++) {
 | 
						|
    td::ChainBuffer::Options options;
 | 
						|
    options.chunk_size = 14;
 | 
						|
    auto reader_writer = td::ChainBuffer::create(options);
 | 
						|
    auto reader = std::move(reader_writer.first);
 | 
						|
    auto writer = std::move(reader_writer.second);
 | 
						|
    auto data = td::rand_string('a', 'z', 100001);
 | 
						|
    auto chunks = td::rand_split(data);
 | 
						|
 | 
						|
    size_t chunk_i = 0;
 | 
						|
    std::string res;
 | 
						|
    while (true) {
 | 
						|
      if (td::Random::fast(0, 1) == 0) {
 | 
						|
        bool is_closed = reader.is_writer_closed();
 | 
						|
        Slice slice;
 | 
						|
        if (reader.reader_size() != 0) {
 | 
						|
          slice = reader.prepare_read();
 | 
						|
          res += slice.str();
 | 
						|
          reader.confirm_read(slice.size());
 | 
						|
        }
 | 
						|
        if (slice.empty() && is_closed) {
 | 
						|
          reader.writer_status().ensure();
 | 
						|
          break;
 | 
						|
        }
 | 
						|
      }
 | 
						|
      if (chunk_i < chunks.size() && td::Random::fast(0, 1) == 0) {
 | 
						|
        writer.append(chunks[chunk_i]);
 | 
						|
        chunk_i++;
 | 
						|
        if (chunk_i == chunks.size()) {
 | 
						|
          writer.close_writer(td::Status::OK());
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
    ASSERT_EQ(data.size(), res.size());
 | 
						|
    ASSERT_EQ(data, res);
 | 
						|
  }
 | 
						|
}
 |