mirror of
				https://github.com/ton-blockchain/ton
				synced 2025-03-09 15:40:10 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			349 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			349 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|     This file is part of TON Blockchain Library.
 | |
| 
 | |
|     TON Blockchain Library is free software: you can redistribute it and/or modify
 | |
|     it under the terms of the GNU Lesser General Public License as published by
 | |
|     the Free Software Foundation, either version 2 of the License, or
 | |
|     (at your option) any later version.
 | |
| 
 | |
|     TON Blockchain Library is distributed in the hope that it will be useful,
 | |
|     but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|     GNU Lesser General Public License for more details.
 | |
| 
 | |
|     You should have received a copy of the GNU Lesser General Public License
 | |
|     along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
|     Copyright 2017-2019 Telegram Systems LLP
 | |
| */
 | |
| #include "Binlog.h"
 | |
| 
 | |
| #include "BinlogReaderHelper.h"
 | |
| 
 | |
| #include "td/db/utils/StreamInterface.h"
 | |
| #include "td/db/utils/ChainBuffer.h"
 | |
| #include "td/db/utils/CyclicBuffer.h"
 | |
| #include "td/db/utils/FileSyncState.h"
 | |
| #include "td/db/utils/StreamToFileActor.h"
 | |
| #include "td/db/utils/FileToStreamActor.h"
 | |
| 
 | |
| #include "td/actor/actor.h"
 | |
| 
 | |
| #include "td/utils/misc.h"
 | |
| #include "td/utils/port/path.h"
 | |
| #include "td/utils/VectorQueue.h"
 | |
| 
 | |
| namespace td {
 | |
| namespace {
 | |
| class BinlogReplayActor : public actor::Actor {
 | |
|  public:
 | |
|   BinlogReplayActor(StreamReader stream_reader, actor::ActorOwn<FileToStreamActor> file_to_stream,
 | |
|                     std::shared_ptr<BinlogReaderInterface> binlog_reader, Promise<Unit> promise)
 | |
|       : stream_reader_(std::move(stream_reader))
 | |
|       , file_to_stream_(std::move(file_to_stream))
 | |
|       , binlog_reader_(std::move(binlog_reader))
 | |
|       , promise_(std::move(promise)) {
 | |
|   }
 | |
| 
 | |
|  private:
 | |
|   StreamReader stream_reader_;
 | |
|   actor::ActorOwn<FileToStreamActor> file_to_stream_;
 | |
|   std::shared_ptr<BinlogReaderInterface> binlog_reader_;
 | |
|   Promise<Unit> promise_;
 | |
| 
 | |
|   bool is_writer_closed_{false};
 | |
|   BinlogReaderHelper binlog_reader_helper_;
 | |
| 
 | |
|   unique_ptr<FileToStreamActor::Callback> create_callback() {
 | |
|     class Callback : public FileToStreamActor::Callback {
 | |
|      public:
 | |
|       Callback(actor::ActorShared<> actor) : actor_(std::move(actor)) {
 | |
|       }
 | |
|       void got_more() override {
 | |
|         send_signals_later(actor_, actor::ActorSignals::wakeup());
 | |
|       }
 | |
| 
 | |
|      private:
 | |
|       actor::ActorShared<> actor_;
 | |
|     };
 | |
|     return make_unique<Callback>(actor_shared(this));
 | |
|   }
 | |
| 
 | |
|   void start_up() override {
 | |
|     send_closure_later(file_to_stream_, &FileToStreamActor::set_callback, create_callback());
 | |
|   }
 | |
|   void notify_writer() {
 | |
|     send_signals_later(file_to_stream_, actor::ActorSignals::wakeup());
 | |
|   }
 | |
| 
 | |
|   void loop() override {
 | |
|     auto status = do_loop();
 | |
|     if (status.is_error()) {
 | |
|       stream_reader_.close_reader(status.clone());
 | |
|       promise_.set_error(std::move(status));
 | |
|       return stop();
 | |
|     }
 | |
|     if (is_writer_closed_) {
 | |
|       stream_reader_.close_reader(Status::OK());
 | |
|       promise_.set_value(Unit());
 | |
|       return stop();
 | |
|     }
 | |
|   }
 | |
|   Status do_loop() {
 | |
|     is_writer_closed_ = stream_reader_.is_writer_closed();
 | |
|     if (is_writer_closed_) {
 | |
|       TRY_STATUS(std::move(stream_reader_.writer_status()));
 | |
|     }
 | |
| 
 | |
|     // TODO: watermark want_more/got_more logic
 | |
|     int64 got_size = stream_reader_.reader_size();
 | |
|     while (got_size > 0) {
 | |
|       auto slice = stream_reader_.prepare_read();
 | |
|       TRY_STATUS(binlog_reader_helper_.parse(*binlog_reader_, slice));
 | |
|       stream_reader_.confirm_read(slice.size());
 | |
|       got_size -= slice.size();
 | |
|     }
 | |
|     notify_writer();
 | |
| 
 | |
|     if (is_writer_closed_) {
 | |
|       if (binlog_reader_helper_.unparsed_size() != 0) {
 | |
|         return Status::Error(PSLICE() << "Got " << binlog_reader_helper_.unparsed_size()
 | |
|                                       << " unparsed bytes in binlog");
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     return Status::OK();
 | |
|   }
 | |
| };
 | |
| }  // namespace
 | |
| Binlog::Binlog(string path) : path_(std::move(path)) {
 | |
| }
 | |
| 
 | |
| Status Binlog::replay_sync(BinlogReaderInterface& binlog_reader) {
 | |
|   TRY_RESULT(fd, FileFd::open(path_, FileFd::Flags::Read));
 | |
|   // No need to use Cyclic buffer, but CyclicBuffer is important for async version
 | |
|   CyclicBuffer::Options options;
 | |
|   options.chunk_size = 256;
 | |
|   options.count = 1;
 | |
|   auto reader_writer = CyclicBuffer::create(options);
 | |
| 
 | |
|   auto buf_reader = std::move(reader_writer.first);
 | |
|   auto buf_writer = std::move(reader_writer.second);
 | |
| 
 | |
|   TRY_RESULT(fd_size, fd.get_size());
 | |
| 
 | |
|   BinlogReaderHelper helper;
 | |
|   while (fd_size != 0) {
 | |
|     auto read_to = buf_writer.prepare_write();
 | |
|     if (static_cast<int64>(read_to.size()) > fd_size) {
 | |
|       read_to.truncate(narrow_cast<size_t>(fd_size));
 | |
|     }
 | |
|     TRY_RESULT(read, fd.read(read_to));
 | |
|     if (read == 0) {
 | |
|       return Status::Error("Unexpected end of file");
 | |
|     }
 | |
|     fd_size -= read;
 | |
|     buf_writer.confirm_write(read);
 | |
| 
 | |
|     auto data = buf_reader.prepare_read();
 | |
|     CHECK(data.size() == read);
 | |
|     TRY_STATUS(helper.parse(binlog_reader, data));
 | |
|     buf_reader.confirm_read(data.size());
 | |
|   }
 | |
| 
 | |
|   if (helper.unparsed_size() != 0) {
 | |
|     return Status::Error(PSLICE() << "Got " << helper.unparsed_size() << " unparsed bytes in binlog");
 | |
|   }
 | |
| 
 | |
|   //TODO: check crc32
 | |
|   //TODO: allow binlog truncate
 | |
|   return Status::OK();
 | |
| }
 | |
| 
 | |
| void Binlog::replay_async(std::shared_ptr<BinlogReaderInterface> binlog_reader, Promise<Unit> promise) {
 | |
|   auto r_fd = FileFd::open(path_, FileFd::Flags::Read);
 | |
|   if (r_fd.is_error()) {
 | |
|     promise.set_error(r_fd.move_as_error());
 | |
|     return;
 | |
|   }
 | |
|   auto fd = r_fd.move_as_ok();
 | |
|   CyclicBuffer::Options buf_options;
 | |
|   buf_options.chunk_size = 256;
 | |
|   auto reader_writer = CyclicBuffer::create(buf_options);
 | |
| 
 | |
|   auto buf_reader = std::move(reader_writer.first);
 | |
|   auto buf_writer = std::move(reader_writer.second);
 | |
| 
 | |
|   auto r_fd_size = fd.get_size();
 | |
|   if (r_fd_size.is_error()) {
 | |
|     promise.set_error(r_fd_size.move_as_error());
 | |
|   }
 | |
|   auto options = FileToStreamActor::Options{};
 | |
|   options.limit = r_fd_size.move_as_ok();
 | |
|   auto file_to_stream =
 | |
|       actor::create_actor<FileToStreamActor>("FileToStream", std::move(fd), std::move(buf_writer), options);
 | |
|   auto stream_to_binlog = actor::create_actor<BinlogReplayActor>(
 | |
|       "BinlogReplay", std::move(buf_reader), std::move(file_to_stream), std::move(binlog_reader), std::move(promise));
 | |
|   stream_to_binlog.release();
 | |
| }
 | |
| 
 | |
| void Binlog::destroy(CSlice path) {
 | |
|   td::unlink(path).ignore();
 | |
| }
 | |
| 
 | |
| void Binlog::destroy() {
 | |
|   destroy(path_);
 | |
| }
 | |
| 
 | |
| BinlogWriter::BinlogWriter(std::string path) : path_(std::move(path)) {
 | |
| }
 | |
| 
 | |
| Status BinlogWriter::open() {
 | |
|   TRY_RESULT(fd, FileFd::open(path_, FileFd::Flags::Write | FileFd::Flags::Append | FileFd::Create));
 | |
|   fd_ = std::move(fd);
 | |
|   ChainBuffer::Options buf_options;
 | |
|   buf_options.max_io_slices = 128;
 | |
|   buf_options.chunk_size = 256;
 | |
|   auto reader_writer = ChainBuffer::create(buf_options);
 | |
|   buf_reader_ = std::move(reader_writer.first);
 | |
|   buf_writer_ = std::move(reader_writer.second);
 | |
|   return Status::OK();
 | |
| }
 | |
| 
 | |
| Status BinlogWriter::lazy_flush() {
 | |
|   if (buf_reader_.reader_size() < 512) {
 | |
|     return Status::OK();
 | |
|   }
 | |
|   return flush();
 | |
| }
 | |
| 
 | |
| Status BinlogWriter::flush() {
 | |
|   while (buf_reader_.reader_size() != 0) {
 | |
|     TRY_RESULT(written, fd_.writev(buf_reader_.prepare_readv()));
 | |
|     buf_reader_.confirm_read(written);
 | |
|   }
 | |
|   return Status::OK();
 | |
| }
 | |
| Status BinlogWriter::sync() {
 | |
|   flush();
 | |
|   return fd_.sync();
 | |
| }
 | |
| 
 | |
| Status BinlogWriter::close() {
 | |
|   sync();
 | |
|   fd_.close();
 | |
|   return Status::OK();
 | |
| }
 | |
| 
 | |
| namespace detail {
 | |
| class FlushHelperActor : public actor::Actor {
 | |
|  public:
 | |
|   FlushHelperActor(FileSyncState::Reader sync_state_reader, actor::ActorOwn<StreamToFileActor> actor)
 | |
|       : sync_state_reader_(std::move(sync_state_reader)), actor_(std::move(actor)) {
 | |
|   }
 | |
|   void flush() {
 | |
|     //TODO;
 | |
|   }
 | |
|   void sync(size_t position, Promise<Unit> promise) {
 | |
|     sync_state_reader_.set_requested_sync_size(position);
 | |
|     if (promise) {
 | |
|       queries_.emplace(position, std::move(promise));
 | |
|     }
 | |
|     send_signals_later(actor_, actor::ActorSignals::wakeup());
 | |
|   }
 | |
| 
 | |
|   void close(Promise<> promise) {
 | |
|     close_promise_ = std::move(promise);
 | |
|     actor_.reset();
 | |
|   }
 | |
| 
 | |
|  private:
 | |
|   FileSyncState::Reader sync_state_reader_;
 | |
|   actor::ActorOwn<StreamToFileActor> actor_;
 | |
|   Promise<> close_promise_;
 | |
| 
 | |
|   struct Query {
 | |
|     Query(size_t position, Promise<Unit> promise) : position(position), promise(std::move(promise)) {
 | |
|     }
 | |
|     size_t position;
 | |
|     Promise<Unit> promise;
 | |
|   };
 | |
|   VectorQueue<Query> queries_;
 | |
| 
 | |
|   unique_ptr<StreamToFileActor::Callback> create_callback() {
 | |
|     class Callback : public StreamToFileActor::Callback {
 | |
|      public:
 | |
|       Callback(actor::ActorShared<> actor) : actor_(std::move(actor)) {
 | |
|       }
 | |
|       void on_sync_state_changed() override {
 | |
|         send_signals_later(actor_, actor::ActorSignals::wakeup());
 | |
|       }
 | |
| 
 | |
|      private:
 | |
|       actor::ActorShared<> actor_;
 | |
|     };
 | |
|     return make_unique<Callback>(actor_shared(this));
 | |
|   }
 | |
| 
 | |
|   void start_up() override {
 | |
|     send_closure_later(actor_, &StreamToFileActor::set_callback, create_callback());
 | |
|   }
 | |
| 
 | |
|   void loop() override {
 | |
|     auto synced_position = sync_state_reader_.synced_size();
 | |
|     while (!queries_.empty() && queries_.front().position <= synced_position) {
 | |
|       queries_.front().promise.set_value(Unit());
 | |
|       queries_.pop();
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void hangup_shared() override {
 | |
|     stop();
 | |
|   }
 | |
|   void tear_down() override {
 | |
|     if (close_promise_) {
 | |
|       close_promise_.set_value(Unit());
 | |
|     }
 | |
|   }
 | |
| };
 | |
| }  // namespace detail
 | |
| BinlogWriterAsync::BinlogWriterAsync(std::string path) : path_(std::move(path)) {
 | |
| }
 | |
| BinlogWriterAsync::~BinlogWriterAsync() = default;
 | |
| 
 | |
| Status BinlogWriterAsync::open() {
 | |
|   TRY_RESULT(fd, FileFd::open(path_, FileFd::Flags::Write | FileFd::Flags::Append | FileFd::Create));
 | |
|   ChainBuffer::Options buf_options;
 | |
|   buf_options.max_io_slices = 128;
 | |
|   buf_options.chunk_size = 256;
 | |
|   auto reader_writer = ChainBuffer::create(buf_options);
 | |
|   buf_writer_ = std::move(reader_writer.second);
 | |
| 
 | |
|   auto sync_state_reader_writer = td::FileSyncState::create();
 | |
|   auto writer_actor = actor::create_actor<StreamToFileActor>("StreamToFile", std::move(reader_writer.first),
 | |
|                                                              std::move(fd), std::move(sync_state_reader_writer.second));
 | |
|   writer_actor_ = writer_actor.get();
 | |
|   sync_state_reader_ = std::move(sync_state_reader_writer.first);
 | |
| 
 | |
|   flush_helper_actor_ =
 | |
|       actor::create_actor<detail::FlushHelperActor>("FlushHelperActor", sync_state_reader_, std::move(writer_actor));
 | |
| 
 | |
|   return Status::OK();
 | |
| }
 | |
| 
 | |
| void BinlogWriterAsync::close(Promise<> promise) {
 | |
|   send_closure(std::move(flush_helper_actor_), &detail::FlushHelperActor::close, std::move(promise));
 | |
|   writer_actor_ = {};
 | |
| }
 | |
| void BinlogWriterAsync::lazy_flush() {
 | |
|   send_signals_later(writer_actor_, actor::ActorSignals::wakeup());
 | |
| }
 | |
| 
 | |
| void BinlogWriterAsync::flush() {
 | |
|   send_closure(flush_helper_actor_, &detail::FlushHelperActor::flush);
 | |
| }
 | |
| void BinlogWriterAsync::sync(Promise<Unit> promise) {
 | |
|   send_closure(flush_helper_actor_, &detail::FlushHelperActor::sync, buf_writer_.writer_size(), std::move(promise));
 | |
| }
 | |
| 
 | |
| }  // namespace td
 |