mirror of
				https://github.com/ton-blockchain/ton
				synced 2025-03-09 15:40:10 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			685 lines
		
	
	
	
		
			20 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			685 lines
		
	
	
	
		
			20 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* 
 | |
|     This file is part of TON Blockchain source code.
 | |
| 
 | |
|     TON Blockchain is free software; you can redistribute it and/or
 | |
|     modify it under the terms of the GNU General Public License
 | |
|     as published by the Free Software Foundation; either version 2
 | |
|     of the License, or (at your option) any later version.
 | |
| 
 | |
|     TON Blockchain is distributed in the hope that it will be useful,
 | |
|     but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|     GNU General Public License for more details.
 | |
| 
 | |
|     You should have received a copy of the GNU General Public License
 | |
|     along with TON Blockchain.  If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
|     In addition, as a special exception, the copyright holders give permission 
 | |
|     to link the code of portions of this program with the OpenSSL library. 
 | |
|     You must obey the GNU General Public License in all respects for all 
 | |
|     of the code used other than OpenSSL. If you modify file(s) with this 
 | |
|     exception, you may extend this exception to your version of the file(s), 
 | |
|     but you are not obligated to do so. If you do not wish to do so, delete this 
 | |
|     exception statement from your version. If you delete this exception statement 
 | |
|     from all source files in the program, then also delete it here.
 | |
| 
 | |
|     Copyright 2017-2019 Telegram Systems LLP
 | |
| */
 | |
| #include "td/utils/OptionsParser.h"
 | |
| #include "td/utils/filesystem.h"
 | |
| #include "td/utils/port/FileFd.h"
 | |
| #include "td/utils/Timer.h"
 | |
| #include "td/utils/crypto.h"
 | |
| #include "td/utils/BufferedReader.h"
 | |
| #include "td/utils/optional.h"
 | |
| #include "td/actor/actor.h"
 | |
| 
 | |
| #include "td/db/utils/StreamInterface.h"
 | |
| #include "td/db/utils/ChainBuffer.h"
 | |
| #include "td/db/utils/CyclicBuffer.h"
 | |
| #include "td/db/utils/FileSyncState.h"
 | |
| #include "td/db/utils/StreamToFileActor.h"
 | |
| #include "td/db/utils/FileToStreamActor.h"
 | |
| 
 | |
| #include <cmath>
 | |
| 
 | |
| namespace td {
 | |
| class AsyncCyclicBufferReader : public td::actor::Actor {
 | |
|  public:
 | |
|   class Callback {
 | |
|    public:
 | |
|     virtual ~Callback() {
 | |
|     }
 | |
|     virtual void want_more() = 0;
 | |
|     virtual Status process(Slice data) = 0;
 | |
|     virtual void on_closed(Status status) = 0;
 | |
|   };
 | |
|   AsyncCyclicBufferReader(CyclicBuffer::Reader reader, td::unique_ptr<Callback> callback)
 | |
|       : reader_(std::move(reader)), callback_(std::move(callback)) {
 | |
|   }
 | |
| 
 | |
|  private:
 | |
|   CyclicBuffer::Reader reader_;
 | |
|   td::unique_ptr<Callback> callback_;
 | |
| 
 | |
|   void loop() override {
 | |
|     while (true) {
 | |
|       auto data = reader_.prepare_read();
 | |
|       if (data.empty()) {
 | |
|         if (reader_.is_writer_closed()) {
 | |
|           callback_->on_closed(std::move(reader_.writer_status()));
 | |
|           return stop();
 | |
|         }
 | |
|         callback_->want_more();
 | |
|         return;
 | |
|       }
 | |
|       auto status = callback_->process(data);
 | |
|       if (status.is_error()) {
 | |
|         callback_->on_closed(std::move(status));
 | |
|       }
 | |
|       reader_.confirm_read(data.size());
 | |
|       //TODO: better condition for want_more. May be reader should decide if it is ready for more writes
 | |
|       callback_->want_more();
 | |
|     }
 | |
|   }
 | |
| };
 | |
| 
 | |
| }  // namespace td
 | |
| 
 | |
| class Processor {
 | |
|  public:
 | |
|   void process(td::Slice slice) {
 | |
|     res = crc32c_extend(res, slice);
 | |
|     res2 = crc32c_extend(res2, slice);
 | |
|   }
 | |
|   auto result() {
 | |
|     return res * res2;
 | |
|   }
 | |
| 
 | |
|  private:
 | |
|   td::uint32 res{0};
 | |
|   td::uint32 res2{0};
 | |
| };
 | |
| 
 | |
| void read_baseline(td::CSlice path) {
 | |
|   LOG(ERROR) << "BASELINE";
 | |
|   td::PerfWarningTimer timer("read file");
 | |
|   auto data = td::read_file(path).move_as_ok();
 | |
|   timer.reset();
 | |
| 
 | |
|   td::PerfWarningTimer process_timer("process file", 0);
 | |
|   Processor processor;
 | |
|   processor.process(data.as_slice());
 | |
|   process_timer.reset();
 | |
|   LOG(ERROR) << processor.result();
 | |
| }
 | |
| 
 | |
| void read_buffered(td::CSlice path, size_t buffer_size) {
 | |
|   LOG(ERROR) << "BufferedReader";
 | |
|   auto fd = td::FileFd::open(path, td::FileFd::Read).move_as_ok();
 | |
|   td::BufferedReader reader(fd, buffer_size);
 | |
|   std::vector<char> buf(buffer_size);
 | |
|   Processor processor;
 | |
|   while (true) {
 | |
|     auto slice = td::MutableSlice(&buf[0], buf.size());
 | |
|     auto size = reader.read(slice).move_as_ok();
 | |
|     if (size == 0) {
 | |
|       break;
 | |
|     }
 | |
|     processor.process(slice.truncate(size));
 | |
|   }
 | |
|   LOG(ERROR) << processor.result();
 | |
| }
 | |
| 
 | |
| void read_async(td::CSlice path, size_t buffer_size) {
 | |
|   LOG(ERROR) << "Async";
 | |
|   auto fd = td::FileFd::open(path, td::FileFd::Read).move_as_ok();
 | |
|   td::actor::Scheduler scheduler({2});
 | |
|   scheduler.run_in_context([&] {
 | |
|     auto reader_writer = td::CyclicBuffer::create();
 | |
|     //TODO: hide actor
 | |
|     auto reader =
 | |
|         td::actor::create_actor<td::FileToStreamActor>("Reader", std::move(fd), std::move(reader_writer.second));
 | |
|     class Callback : public td::AsyncCyclicBufferReader::Callback {
 | |
|      public:
 | |
|       Callback(td::actor::ActorOwn<> reader) : reader_(std::move(reader)) {
 | |
|       }
 | |
|       void want_more() override {
 | |
|         td::actor::send_signals_later(reader_, td::actor::ActorSignals::wakeup());
 | |
|       }
 | |
|       td::Status process(td::Slice data) override {
 | |
|         processor.process(data);
 | |
|         return td::Status::OK();
 | |
|       }
 | |
|       void on_closed(td::Status status) override {
 | |
|         LOG(ERROR) << processor.result();
 | |
|         td::actor::SchedulerContext::get()->stop();
 | |
|       }
 | |
| 
 | |
|      private:
 | |
|       td::actor::ActorOwn<> reader_;
 | |
|       Processor processor;
 | |
|     };
 | |
|     auto reader_copy = reader.get();
 | |
|     auto callback = td::make_unique<Callback>(std::move(reader));
 | |
|     auto processor = td::actor::create_actor<td::AsyncCyclicBufferReader>(
 | |
|         "BufferReader", std::move(reader_writer.first), std::move(callback));
 | |
|     class ReaderCallback : public td::FileToStreamActor::Callback {
 | |
|      public:
 | |
|       ReaderCallback(td::actor::ActorId<> actor) : actor_(std::move(actor)) {
 | |
|       }
 | |
|       void got_more() override {
 | |
|         td::actor::send_signals_later(actor_, td::actor::ActorSignals::wakeup());
 | |
|       }
 | |
| 
 | |
|      private:
 | |
|       td::actor::ActorId<> actor_;
 | |
|     };
 | |
|     send_closure(reader_copy, &td::FileToStreamActor::set_callback,
 | |
|                  td::make_unique<ReaderCallback>(processor.release()));
 | |
|   });
 | |
|   scheduler.run();
 | |
| }
 | |
| 
 | |
| static char o_direct_buf[100000000];
 | |
| void read_o_direct(td::CSlice path, size_t buffer_size) {
 | |
|   LOG(ERROR) << "Direct";
 | |
|   auto fd = td::FileFd::open(path, td::FileFd::Read | td::FileFd::Direct).move_as_ok();
 | |
|   size_t align = 4096;
 | |
|   auto *ptr =
 | |
|       reinterpret_cast<char *>((reinterpret_cast<std::uintptr_t>(o_direct_buf) + align - 1) & td::bits_negate64(align));
 | |
| 
 | |
|   td::BufferedReader reader(fd, buffer_size);
 | |
|   Processor processor;
 | |
|   while (true) {
 | |
|     auto slice = td::MutableSlice(ptr, buffer_size);
 | |
|     auto size = reader.read(slice).move_as_ok();
 | |
|     if (size == 0) {
 | |
|       break;
 | |
|     }
 | |
|     processor.process(slice.truncate(size));
 | |
|   }
 | |
|   LOG(ERROR) << processor.result();
 | |
| }
 | |
| 
 | |
| class DataGenerator {
 | |
|  public:
 | |
|   operator bool() const {
 | |
|     return generated_size < total_size;
 | |
|   }
 | |
| 
 | |
|   td::string next() {
 | |
|     auto res = words_[2];
 | |
|     generated_size += res.size();
 | |
|     return res;
 | |
|   }
 | |
| 
 | |
|  private:
 | |
|   std::vector<std::string> words_{"a", "fjdksalfdfs", std::string(20, 'b'), std::string(1000, 'a')};
 | |
|   size_t total_size = (1 << 20) * 600;
 | |
|   size_t generated_size = 0;
 | |
| };
 | |
| 
 | |
| void write_baseline(td::CSlice path, size_t buffer_size) {
 | |
|   LOG(ERROR) << "Baseline";
 | |
|   auto fd = td::FileFd::open(path, td::FileFd::Flags::Create | td::FileFd::Flags::Truncate | td::FileFd::Flags::Write)
 | |
|                 .move_as_ok();
 | |
|   std::vector<char> buf(buffer_size);
 | |
| 
 | |
|   DataGenerator generator;
 | |
|   while (generator) {
 | |
|     auto slice = generator.next();
 | |
|     fd.write(slice).ensure();
 | |
|   }
 | |
|   fd.sync().ensure();
 | |
| }
 | |
| void write_buffered(td::CSlice path, size_t buffer_size) {
 | |
|   LOG(ERROR) << "Buffered";
 | |
|   auto fd = td::FileFd::open(path, td::FileFd::Flags::Create | td::FileFd::Flags::Truncate | td::FileFd::Flags::Write)
 | |
|                 .move_as_ok();
 | |
|   std::vector<char> buf(buffer_size);
 | |
|   size_t data_size{0};
 | |
| 
 | |
|   auto flush = [&]() {
 | |
|     auto slice = td::Slice(buf.data(), data_size);
 | |
|     fd.write(slice).ensure();
 | |
|     //auto io_slice = as_io_slice(slice);
 | |
|     //fd.writev({&io_slice, 1}).ensure();
 | |
|     data_size = 0;
 | |
|   };
 | |
|   auto append = [&](td::Slice slice) {
 | |
|     if (data_size + slice.size() > buffer_size) {
 | |
|       flush();
 | |
|     }
 | |
| 
 | |
|     td::MutableSlice(buf.data(), buffer_size).substr(data_size).copy_from(slice);
 | |
|     data_size += slice.size();
 | |
|   };
 | |
| 
 | |
|   DataGenerator generator;
 | |
|   while (generator) {
 | |
|     auto slice = generator.next();
 | |
|     append(slice);
 | |
|   }
 | |
|   flush();
 | |
|   fd.sync().ensure();
 | |
| }
 | |
| 
 | |
| namespace td {
 | |
| 
 | |
| class FileWriter {
 | |
|  public:
 | |
|   FileWriter(FileFd fd, size_t buffer_size) : fd_(std::move(fd)), raw_buffer_(buffer_size) {
 | |
|     reset();
 | |
|     buffer_slices_.reserve(1024);
 | |
|     strings_.reserve(1024);
 | |
|     ios_slices_.reserve(1024);
 | |
|   }
 | |
| 
 | |
|   void append(std::string data) {
 | |
|     cached_size_ += data.size();
 | |
|     if (data.size() <= max_copy_size) {
 | |
|       append_copy(data);
 | |
|     } else {
 | |
|       CHECK(strings_.size() < strings_.capacity());
 | |
|       strings_.push_back(std::move(data));
 | |
|       ios_slices_.push_back(as_io_slice(strings_.back()));
 | |
|       should_merge_ = false;
 | |
|     }
 | |
|     try_flush();
 | |
|   }
 | |
| 
 | |
|   void append(BufferSlice data) {
 | |
|     cached_size_ += data.size();
 | |
|     if (data.size() <= max_copy_size) {
 | |
|       append_copy(data);
 | |
|     } else {
 | |
|       buffer_slices_.push_back(std::move(data));
 | |
|       ios_slices_.push_back(as_io_slice(strings_.back()));
 | |
|       should_merge_ = false;
 | |
|     }
 | |
|     try_flush();
 | |
|   }
 | |
| 
 | |
|   void append(Slice data) {
 | |
|     if (data.size() <= max_copy_size) {
 | |
|       append_copy(data);
 | |
|       try_flush();
 | |
|     } else if (data.size() > min_immediate_write_size) {
 | |
|       ios_slices_.push_back(as_io_slice(data));
 | |
|       flush();
 | |
|     } else {
 | |
|       append(BufferSlice(data));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void flush() {
 | |
|     if (ios_slices_.empty()) {
 | |
|       return;
 | |
|     }
 | |
|     flushed_size_ += cached_size_;
 | |
|     fd_.writev(ios_slices_).ensure();
 | |
|     reset();
 | |
|   }
 | |
| 
 | |
|   void sync() {
 | |
|     flush();
 | |
|     synced_size_ = flushed_size_;
 | |
|     fd_.sync().ensure();
 | |
|   }
 | |
| 
 | |
|   bool may_flush() const {
 | |
|     return cached_size_ != 0;
 | |
|   }
 | |
|   size_t total_size() const {
 | |
|     return flushed_size() + cached_size_;
 | |
|   }
 | |
|   size_t flushed_size() const {
 | |
|     return flushed_size_;
 | |
|   }
 | |
|   size_t synced_size() const {
 | |
|     return synced_size_;
 | |
|   }
 | |
| 
 | |
|  private:
 | |
|   static constexpr size_t max_cached_size = 256 * (1 << 10);
 | |
|   static constexpr size_t min_immediate_write_size = 32 * (1 << 10);
 | |
| 
 | |
|   FileFd fd_;
 | |
| 
 | |
|   std::vector<char> raw_buffer_;
 | |
|   size_t max_copy_size = min(raw_buffer_.size() / 8, size_t(4096u));
 | |
|   MutableSlice buffer_;
 | |
|   bool should_merge_ = false;
 | |
| 
 | |
|   std::vector<BufferSlice> buffer_slices_;
 | |
|   std::vector<std::string> strings_;
 | |
|   std::vector<IoSlice> ios_slices_;
 | |
|   size_t cached_size_{0};
 | |
|   size_t flushed_size_{0};
 | |
|   size_t synced_size_{0};
 | |
| 
 | |
|   void append_copy(Slice data) {
 | |
|     buffer_.copy_from(data);
 | |
|     if (should_merge_) {
 | |
|       auto back = as_slice(ios_slices_.back());
 | |
|       back = Slice(back.data(), back.size() + data.size());
 | |
|       ios_slices_.back() = as_io_slice(back);
 | |
|     } else {
 | |
|       ios_slices_.push_back(as_io_slice(buffer_.substr(0, data.size())));
 | |
|       should_merge_ = true;
 | |
|     }
 | |
|     buffer_ = buffer_.substr(data.size());
 | |
|   }
 | |
| 
 | |
|   void reset() {
 | |
|     buffer_ = MutableSlice(raw_buffer_.data(), raw_buffer_.size());
 | |
|     buffer_slices_.clear();
 | |
|     strings_.clear();
 | |
|     ios_slices_.clear();
 | |
|     should_merge_ = false;
 | |
|     cached_size_ = 0;
 | |
|   }
 | |
| 
 | |
|   bool must_flush() const {
 | |
|     return buffer_.size() < max_copy_size || ios_slices_.size() == ios_slices_.capacity() ||
 | |
|            cached_size_ >= max_cached_size;
 | |
|   }
 | |
|   void try_flush() {
 | |
|     if (!must_flush()) {
 | |
|       return;
 | |
|     }
 | |
|     flush();
 | |
|   }
 | |
| };
 | |
| 
 | |
| class AsyncFileWriterActor : public actor::Actor {
 | |
|  public:
 | |
|   AsyncFileWriterActor(FileSyncState::Reader state) : state_(std::move(state)) {
 | |
|     io_slices_.reserve(100);
 | |
|   }
 | |
| 
 | |
|  private:
 | |
|   FileFd fd_;
 | |
|   ChainBufferReader reader_;
 | |
|   FileSyncState::Reader state_;
 | |
|   std::vector<IoSlice> io_slices_;
 | |
| 
 | |
|   size_t flushed_size_{0};
 | |
|   size_t synced_size_{0};
 | |
| 
 | |
|   void flush() {
 | |
|     reader_.sync_with_writer();
 | |
|     while (!reader_.empty()) {
 | |
|       auto it = reader_.clone();
 | |
|       size_t io_slices_size = 0;
 | |
|       while (!it.empty() && io_slices_.size() < io_slices_.capacity()) {
 | |
|         auto slice = it.prepare_read();
 | |
|         io_slices_.push_back(as_io_slice(slice));
 | |
|         io_slices_size += slice.size();
 | |
|         it.confirm_read(slice.size());
 | |
|       }
 | |
|       if (!io_slices_.empty()) {
 | |
|         auto r_written = fd_.writev(io_slices_);
 | |
|         LOG_IF(FATAL, r_written.is_error()) << r_written.error();
 | |
|         auto written = r_written.move_as_ok();
 | |
|         CHECK(written == io_slices_size);
 | |
|         flushed_size_ += written;
 | |
|         io_slices_.clear();
 | |
|       }
 | |
|       reader_ = std::move(it);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   void loop() override {
 | |
|     reader_.sync_with_writer();
 | |
|     flush();
 | |
|   }
 | |
| };
 | |
| 
 | |
| }  // namespace td
 | |
| 
 | |
| void write_vector(td::CSlice path, size_t buffer_size) {
 | |
|   LOG(ERROR) << "io vector";
 | |
|   auto fd = td::FileFd::open(path, td::FileFd::Flags::Create | td::FileFd::Flags::Truncate | td::FileFd::Flags::Write)
 | |
|                 .move_as_ok();
 | |
|   td::FileWriter writer(std::move(fd), buffer_size);
 | |
| 
 | |
|   DataGenerator generator;
 | |
|   while (generator) {
 | |
|     auto slice = generator.next();
 | |
|     writer.append(std::move(slice));
 | |
|   }
 | |
|   writer.sync();
 | |
| }
 | |
| 
 | |
| void write_async(td::CSlice path, size_t buffer_size) {
 | |
|   LOG(ERROR) << "Async";
 | |
|   auto fd = td::FileFd::open(path, td::FileFd::Flags::Create | td::FileFd::Flags::Truncate | td::FileFd::Flags::Write)
 | |
|                 .move_as_ok();
 | |
|   td::actor::Scheduler scheduler({1});
 | |
|   scheduler.run_in_context([&] {
 | |
|     class Writer : public td::actor::Actor {
 | |
|      public:
 | |
|       Writer(td::FileFd fd, size_t buffer_size) : fd_(std::move(fd)), buffer_size_(buffer_size) {
 | |
|       }
 | |
|       class Callback : public td::StreamToFileActor::Callback {
 | |
|        public:
 | |
|         Callback(td::actor::ActorShared<> parent) : parent_(std::move(parent)) {
 | |
|         }
 | |
|         void on_sync_state_changed() override {
 | |
|           td::actor::send_signals_later(parent_, td::actor::ActorSignals::wakeup());
 | |
|         }
 | |
| 
 | |
|        private:
 | |
|         td::actor::ActorShared<> parent_;
 | |
|       };
 | |
| 
 | |
|       void start_up() override {
 | |
|         auto buffer_reader_writer = td::ChainBuffer::create();
 | |
|         buffer_writer_ = std::move(buffer_reader_writer.second);
 | |
|         auto buffer_reader = std::move(buffer_reader_writer.first);
 | |
| 
 | |
|         auto sync_state_reader_writer = td::FileSyncState::create();
 | |
|         fd_sync_state_ = std::move(sync_state_reader_writer.first);
 | |
|         auto sync_state_writer = std::move(sync_state_reader_writer.second);
 | |
|         auto options = td::StreamToFileActor::Options{};
 | |
|         writer_ = td::actor::create_actor<td::StreamToFileActor>(td::actor::ActorOptions().with_name("FileWriterActor"),
 | |
|                                                                  std::move(buffer_reader), std::move(fd_),
 | |
|                                                                  std::move(sync_state_writer), options);
 | |
|         send_closure(writer_, &td::StreamToFileActor::set_callback, td::make_unique<Callback>(actor_shared(this)));
 | |
|         loop();
 | |
|       }
 | |
| 
 | |
|      private:
 | |
|       td::FileFd fd_;
 | |
|       td::optional<td::ChainBuffer::Writer> buffer_writer_;
 | |
|       td::optional<td::FileSyncState::Reader> fd_sync_state_;
 | |
|       td::actor::ActorOwn<td::StreamToFileActor> writer_;
 | |
|       size_t buffer_size_;
 | |
|       DataGenerator generator_;
 | |
|       size_t total_size_{0};
 | |
|       bool was_sync_{false};
 | |
| 
 | |
|       void loop() override {
 | |
|         auto flushed_size = fd_sync_state_.value().flushed_size();
 | |
|         while (generator_ && total_size_ < flushed_size + buffer_size_ * 10) {
 | |
|           auto str = generator_.next();
 | |
|           total_size_ += str.size();
 | |
|           buffer_writer_.value().append(str);
 | |
|         }
 | |
|         td::actor::send_signals_later(writer_, td::actor::ActorSignals::wakeup());
 | |
|         if (generator_) {
 | |
|           return;
 | |
|         } else if (!was_sync_) {
 | |
|           was_sync_ = true;
 | |
|           fd_sync_state_.value().set_requested_sync_size(total_size_);
 | |
|           td::actor::send_signals_later(writer_, td::actor::ActorSignals::wakeup());
 | |
|         }
 | |
|         if (fd_sync_state_.value().synced_size() == total_size_) {
 | |
|           writer_.reset();
 | |
|         }
 | |
|       }
 | |
|       void hangup_shared() override {
 | |
|         td::actor::SchedulerContext::get()->stop();
 | |
|         stop();
 | |
|       }
 | |
|     };
 | |
|     td::actor::create_actor<Writer>("Writer", std::move(fd), buffer_size).release();
 | |
|   });
 | |
|   scheduler.run();
 | |
| }
 | |
| 
 | |
| void write_async2(td::CSlice path, size_t buffer_size) {
 | |
|   LOG(ERROR) << "Async2";
 | |
|   auto fd = td::FileFd::open(path, td::FileFd::Flags::Create | td::FileFd::Flags::Truncate | td::FileFd::Flags::Write)
 | |
|                 .move_as_ok();
 | |
|   td::actor::Scheduler scheduler({1});
 | |
|   scheduler.run_in_context([&] {
 | |
|     class Worker : public td::actor::Actor {
 | |
|      public:
 | |
|       Worker(td::FileFd fd, td::ChainBufferReader reader, td::actor::ActorShared<> parent)
 | |
|           : fd_(std::move(fd)), reader_(std::move(reader)), parent_(std::move(parent)) {
 | |
|       }
 | |
| 
 | |
|      private:
 | |
|       td::FileFd fd_;
 | |
|       td::ChainBufferReader reader_;
 | |
|       td::actor::ActorShared<> parent_;
 | |
|       void loop() override {
 | |
|         reader_.sync_with_writer();
 | |
|         while (!reader_.empty()) {
 | |
|           auto slice = reader_.prepare_read();
 | |
|           fd_.write(slice).ensure();
 | |
|           reader_.confirm_read(slice.size());
 | |
|         }
 | |
|       }
 | |
|       void hangup() override {
 | |
|         loop();
 | |
|         fd_.sync().ensure();
 | |
|         stop();
 | |
|       }
 | |
|     };
 | |
|     class Writer : public td::actor::Actor {
 | |
|      public:
 | |
|       Writer(td::FileFd fd) : fd_(std::move(fd)) {
 | |
|       }
 | |
| 
 | |
|      private:
 | |
|       td::FileFd fd_;
 | |
|       td::actor::ActorOwn<> worker_;
 | |
|       td::ChainBufferWriter writer_;
 | |
|       DataGenerator generator_;
 | |
| 
 | |
|       void start_up() override {
 | |
|         worker_ =
 | |
|             td::actor::create_actor<Worker>("Worker", std::move(fd_), writer_.extract_reader(), actor_shared(this));
 | |
|         while (generator_) {
 | |
|           writer_.append(generator_.next(), 65536);
 | |
|           send_signals_later(worker_, td::actor::ActorSignals::wakeup());
 | |
|         }
 | |
|         worker_.reset();
 | |
|       }
 | |
|       void hangup_shared() override {
 | |
|         td::actor::SchedulerContext::get()->stop();
 | |
|         stop();
 | |
|       }
 | |
|     };
 | |
|     td::actor::create_actor<Writer>(td::actor::ActorOptions().with_name("Writer").with_poll(), std::move(fd)).release();
 | |
|   });
 | |
|   scheduler.run();
 | |
| }
 | |
| 
 | |
| int main(int argc, char **argv) {
 | |
|   std::string from;
 | |
|   enum Type { Read, Write };
 | |
|   Type type{Write};
 | |
|   enum Mode { Baseline, Buffered, Direct, Async, WriteV, Async2 };
 | |
|   Mode mode = Baseline;
 | |
|   size_t buffer_size = 1024;
 | |
| 
 | |
|   td::OptionsParser options_parser;
 | |
|   options_parser.add_option('f', td::Slice("from"), td::Slice("read from file"), [&](td::Slice arg) -> td::Status {
 | |
|     from = arg.str();
 | |
|     return td::Status::OK();
 | |
|   });
 | |
|   options_parser.add_option('m', td::Slice("mode"), td::Slice("mode"), [&](td::Slice arg) -> td::Status {
 | |
|     TRY_RESULT(x, td::to_integer_safe<int>(arg));
 | |
|     switch (x) {
 | |
|       case 0:
 | |
|         mode = Baseline;
 | |
|         return td::Status::OK();
 | |
|       case 1:
 | |
|         mode = Buffered;
 | |
|         return td::Status::OK();
 | |
|       case 2:
 | |
|         mode = Direct;
 | |
|         return td::Status::OK();
 | |
|       case 3:
 | |
|         mode = Async;
 | |
|         return td::Status::OK();
 | |
|       case 4:
 | |
|         mode = WriteV;
 | |
|         return td::Status::OK();
 | |
|       case 5:
 | |
|         mode = Async2;
 | |
|         return td::Status::OK();
 | |
|     }
 | |
|     return td::Status::Error("unknown mode");
 | |
|   });
 | |
|   options_parser.add_option('b', td::Slice("buffer"), td::Slice("buffer size"), [&](td::Slice arg) -> td::Status {
 | |
|     TRY_RESULT(x, td::to_integer_safe<size_t>(arg));
 | |
|     buffer_size = x;
 | |
|     return td::Status::OK();
 | |
|   });
 | |
| 
 | |
|   auto status = options_parser.run(argc, argv);
 | |
|   if (status.is_error()) {
 | |
|     LOG(ERROR) << status.error() << "\n" << options_parser;
 | |
|     return 0;
 | |
|   }
 | |
| 
 | |
|   switch (type) {
 | |
|     case Read:
 | |
|       switch (mode) {
 | |
|         case Baseline:
 | |
|           read_baseline(from);
 | |
|           break;
 | |
|         case Buffered:
 | |
|           read_buffered(from, buffer_size);
 | |
|           break;
 | |
|         case Direct:
 | |
|           read_o_direct(from, buffer_size);
 | |
|           break;
 | |
|         case Async:
 | |
|           read_async(from, buffer_size);
 | |
|           break;
 | |
|         case Async2:
 | |
|         case WriteV:
 | |
|           LOG(FATAL) << "Not supported mode for Read test";
 | |
|       }
 | |
|       break;
 | |
|     case Write:
 | |
|       switch (mode) {
 | |
|         case Baseline:
 | |
|           write_baseline(from, buffer_size);
 | |
|           break;
 | |
|         case Buffered:
 | |
|           write_buffered(from, buffer_size);
 | |
|           break;
 | |
|         case WriteV:
 | |
|           write_vector(from, buffer_size);
 | |
|           break;
 | |
|         case Async:
 | |
|           write_async(from, buffer_size);
 | |
|           break;
 | |
|         case Async2:
 | |
|           write_async2(from, buffer_size);
 | |
|           break;
 | |
|         case Direct:
 | |
|           LOG(FATAL) << "Unimplemented";
 | |
|       }
 | |
|   }
 | |
| 
 | |
|   return 0;
 | |
| }
 |