/*
    This file is part of TON Blockchain Library.
    TON Blockchain Library is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 2 of the License, or
    (at your option) any later version.
    TON Blockchain Library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.
    You should have received a copy of the GNU Lesser General Public License
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
    Copyright 2017-2020 Telegram Systems LLP
*/
#include "Binlog.h"
#include "BinlogReaderHelper.h"
#include "td/db/utils/StreamInterface.h"
#include "td/db/utils/ChainBuffer.h"
#include "td/db/utils/CyclicBuffer.h"
#include "td/db/utils/FileSyncState.h"
#include "td/db/utils/StreamToFileActor.h"
#include "td/db/utils/FileToStreamActor.h"
#include "td/actor/actor.h"
#include "td/utils/misc.h"
#include "td/utils/port/path.h"
#include "td/utils/VectorQueue.h"
namespace td {
namespace {
// Actor that replays a binlog from a stream: it drains bytes from `stream_reader_`,
// feeds them through `binlog_reader_helper_` into `*binlog_reader_`, and completes
// `promise_` once the writer side of the stream is closed or an error occurs.
// It owns `file_to_stream_` and wakes it up after each drain.
class BinlogReplayActor : public actor::Actor {
 public:
  // Takes ownership of every argument; no work is done until start_up().
  BinlogReplayActor(StreamReader stream_reader, actor::ActorOwn file_to_stream,
                    std::shared_ptr binlog_reader, Promise promise)
      : stream_reader_(std::move(stream_reader))
      , file_to_stream_(std::move(file_to_stream))
      , binlog_reader_(std::move(binlog_reader))
      , promise_(std::move(promise)) {
  }
 private:
  StreamReader stream_reader_;               // reading end of the stream being replayed
  actor::ActorOwn file_to_stream_;           // owned actor that is woken up via notify_writer()
  std::shared_ptr binlog_reader_;            // receives the parsed data via binlog_reader_helper_.parse()
  Promise promise_;                          // completed exactly once, in loop()
  bool is_writer_closed_{false};             // snapshot of stream_reader_.is_writer_closed() from the last do_loop()
  BinlogReaderHelper binlog_reader_helper_;  // parser state; reports unparsed_size() at the end
  // Builds the FileToStreamActor callback: every got_more() notification is
  // turned into a wakeup signal for this actor.
  unique_ptr create_callback() {
    class Callback : public FileToStreamActor::Callback {
     public:
      Callback(actor::ActorShared<> actor) : actor_(std::move(actor)) {
      }
      void got_more() override {
        send_signals_later(actor_, actor::ActorSignals::wakeup());
      }
     private:
      actor::ActorShared<> actor_;
    };
    return make_unique(actor_shared(this));
  }
  // Registers the callback with the owned file_to_stream_ actor.
  void start_up() override {
    send_closure_later(file_to_stream_, &FileToStreamActor::set_callback, create_callback());
  }
  // Sends a wakeup signal to file_to_stream_ (called after data has been consumed).
  void notify_writer() {
    send_signals_later(file_to_stream_, actor::ActorSignals::wakeup());
  }
  // Runs one drain step and decides whether the replay is finished:
  //  - on error: closes the reader with that error, fails the promise and stops;
  //  - if the writer is closed: closes the reader with OK, fulfils the promise and stops;
  //  - otherwise: returns without stopping.
  void loop() override {
    auto status = do_loop();
    if (status.is_error()) {
      stream_reader_.close_reader(status.clone());
      promise_.set_error(std::move(status));
      return stop();
    }
    if (is_writer_closed_) {
      stream_reader_.close_reader(Status::OK());
      promise_.set_value(Unit());
      return stop();
    }
  }
  // Parses everything currently readable from the stream.
  // Returns an error if the writer's status is passed to TRY_STATUS and is an error
  // (presumably TRY_STATUS returns early in that case), if parsing fails, or if the
  // writer is closed while unparsed bytes remain.
  Status do_loop() {
    // Sample the closed flag before reading, so the final unparsed-bytes check below
    // refers to the same observation.
    is_writer_closed_ = stream_reader_.is_writer_closed();
    if (is_writer_closed_) {
      TRY_STATUS(std::move(stream_reader_.writer_status()));
    }
    // TODO: watermark want_more/got_more logic
    int64 got_size = stream_reader_.reader_size();
    // Consume the readable bytes slice by slice; each slice is confirmed only after
    // it has been parsed successfully.
    while (got_size > 0) {
      auto slice = stream_reader_.prepare_read();
      TRY_STATUS(binlog_reader_helper_.parse(*binlog_reader_, slice));
      stream_reader_.confirm_read(slice.size());
      got_size -= slice.size();
    }
    notify_writer();
    if (is_writer_closed_) {
      // No more data will arrive, so leftover bytes in the helper are reported as an error.
      if (binlog_reader_helper_.unparsed_size() != 0) {
        return Status::Error(PSLICE() << "Got " << binlog_reader_helper_.unparsed_size()
                                      << " unparsed bytes in binlog");
      }
    }
    return Status::OK();
  }
};
}  // namespace
// Creates a handle for the binlog stored at `path`; only the path is recorded here.
Binlog::Binlog(string path)
    : path_(std::move(path)) {
}
// Synchronously replays the binlog file at `path_` into `binlog_reader`.
// The file is read chunk by chunk through a single-chunk CyclicBuffer and each chunk
// is handed to a BinlogReaderHelper.
// Returns an error if the file cannot be opened or sized, if a read fails or yields
// 0 bytes before the expected size is consumed, if parsing fails, or if unparsed
// bytes remain after the whole file has been read.
Status Binlog::replay_sync(BinlogReaderInterface& binlog_reader) {
  TRY_RESULT(fd, FileFd::open(path_, FileFd::Flags::Read));
  // No need to use Cyclic buffer, but CyclicBuffer is important for async version
  CyclicBuffer::Options options;
  options.chunk_size = 256;
  options.count = 1;
  auto reader_writer = CyclicBuffer::create(options);
  auto buf_reader = std::move(reader_writer.first);
  auto buf_writer = std::move(reader_writer.second);
  // fd_size counts the bytes that are still to be read from the file.
  TRY_RESULT(fd_size, fd.get_size());
  BinlogReaderHelper helper;
  while (fd_size != 0) {
    auto read_to = buf_writer.prepare_write();
    // Never ask for more bytes than the size obtained above says remain.
    if (static_cast(read_to.size()) > fd_size) {
      read_to.truncate(narrow_cast(fd_size));
    }
    TRY_RESULT(read, fd.read(read_to));
    if (read == 0) {
      // The file turned out shorter than get_size() reported.
      return Status::Error("Unexpected end of file");
    }
    fd_size -= read;
    buf_writer.confirm_write(read);
    auto data = buf_reader.prepare_read();
    // Everything just written must be readable in one piece.
    CHECK(data.size() == read);
    TRY_STATUS(helper.parse(binlog_reader, data));
    buf_reader.confirm_read(data.size());
  }
  if (helper.unparsed_size() != 0) {
    return Status::Error(PSLICE() << "Got " << helper.unparsed_size() << " unparsed bytes in binlog");
  }
  //TODO: check crc32
  //TODO: allow binlog truncate
  return Status::OK();
}
// Asynchronously replays the binlog file at `path_` into `binlog_reader`.
// A "FileToStream" actor writes the file contents into a CyclicBuffer and a
// "BinlogReplay" actor reads and parses them; `promise` is handed to the replay
// actor. If the file cannot be opened or its size cannot be obtained, `promise`
// is failed right here and no actors are created.
void Binlog::replay_async(std::shared_ptr binlog_reader, Promise promise) {
  auto r_fd = FileFd::open(path_, FileFd::Flags::Read);
  if (r_fd.is_error()) {
    promise.set_error(r_fd.move_as_error());
    return;
  }
  auto fd = r_fd.move_as_ok();
  CyclicBuffer::Options buf_options;
  buf_options.chunk_size = 256;
  auto reader_writer = CyclicBuffer::create(buf_options);
  auto buf_reader = std::move(reader_writer.first);
  auto buf_writer = std::move(reader_writer.second);
  auto r_fd_size = fd.get_size();
  if (r_fd_size.is_error()) {
    promise.set_error(r_fd_size.move_as_error());
    // Stop here: continuing would call move_as_ok() on an error result and pass the
    // already-completed promise on to the replay actor.
    return;
  }
  // Limit the file reader to the size observed now.
  auto options = FileToStreamActor::Options{};
  options.limit = r_fd_size.move_as_ok();
  auto file_to_stream =
      actor::create_actor("FileToStream", std::move(fd), std::move(buf_writer), options);
  auto stream_to_binlog = actor::create_actor(
      "BinlogReplay", std::move(buf_reader), std::move(file_to_stream), std::move(binlog_reader), std::move(promise));
  // The replay actor stops itself when done, so ownership is released here.
  stream_to_binlog.release();
}
// Removes the file at `path`; the result of the unlink is deliberately ignored.
void Binlog::destroy(CSlice path) {
  auto unlink_status = td::unlink(path);
  unlink_status.ignore();
}
// Removes this binlog's own file (`path_`) via the path-taking overload.
void Binlog::destroy() {
  Binlog::destroy(path_);
}
// Remembers the target file path; the file itself is opened later by open().
BinlogWriter::BinlogWriter(std::string path)
    : path_(std::move(path)) {
}
// Opens the file at `path_` with Write | Append | Create flags and sets up a fresh
// ChainBuffer whose two ends become `buf_reader_` and `buf_writer_`.
// Returns the open error if the file cannot be opened; nothing else is touched then.
Status BinlogWriter::open() {
  TRY_RESULT(opened_fd, FileFd::open(path_, FileFd::Flags::Write | FileFd::Flags::Append | FileFd::Create));
  fd_ = std::move(opened_fd);

  ChainBuffer::Options chain_options;
  chain_options.chunk_size = 256;
  chain_options.max_io_slices = 128;

  auto buffer_ends = ChainBuffer::create(chain_options);
  buf_reader_ = std::move(buffer_ends.first);
  buf_writer_ = std::move(buffer_ends.second);
  return Status::OK();
}
// Flushes only when at least 512 bytes are pending in the buffer; otherwise does
// nothing and reports success.
Status BinlogWriter::lazy_flush() {
  const bool has_enough_pending = buf_reader_.reader_size() >= 512;
  if (has_enough_pending) {
    return flush();
  }
  return Status::OK();
}
// Writes all pending buffered data to `fd_` with vectored writes, confirming each
// written portion, until the buffer is empty. Returns the first write error.
Status BinlogWriter::flush() {
  for (;;) {
    if (buf_reader_.reader_size() == 0) {
      break;
    }
    TRY_RESULT(bytes_written, fd_.writev(buf_reader_.prepare_readv()));
    buf_reader_.confirm_read(bytes_written);
  }
  return Status::OK();
}
// Flushes all buffered data to the file and then syncs the file descriptor.
// Returns the first error encountered. A failed flush() is now reported to the
// caller instead of being silently dropped, and fd_.sync() is not attempted in
// that case because the data never reached the descriptor.
Status BinlogWriter::sync() {
  TRY_STATUS(flush());
  return fd_.sync();
}
// Performs a final sync() and closes the file descriptor.
// The descriptor is always closed, even when the sync fails; the sync status is
// returned so callers can see that the last data may not have been written.
Status BinlogWriter::close() {
  auto sync_status = sync();
  fd_.close();
  return sync_status;
}
namespace detail {
// Helper actor that sits between BinlogWriterAsync and the owned file-writing actor
// (`actor_`): it records requested sync positions, completes the corresponding
// promises once `sync_state_reader_` reports a large enough synced size, and
// completes the close promise when it is torn down.
class FlushHelperActor : public actor::Actor {
 public:
  // Takes ownership of the sync-state reader and of the writer actor.
  FlushHelperActor(FileSyncState::Reader sync_state_reader, actor::ActorOwn actor)
      : sync_state_reader_(std::move(sync_state_reader)), actor_(std::move(actor)) {
  }
  // Currently a no-op.
  void flush() {
    //TODO;
  }
  // Requests that data up to `position` be synced. A non-empty `promise` is queued
  // and fulfilled from loop() once the synced size reaches `position`.
  void sync(size_t position, Promise promise) {
    sync_state_reader_.set_requested_sync_size(position);
    if (promise) {
      queries_.emplace(position, std::move(promise));
    }
    // Wake the writer actor so it can notice the new requested sync size.
    send_signals_later(actor_, actor::ActorSignals::wakeup());
  }
  // Stores `promise` for tear_down() and drops ownership of the writer actor.
  // NOTE(review): presumably the writer actor then releases its callback, which
  // triggers hangup_shared() here — confirm against the actor framework.
  void close(Promise<> promise) {
    close_promise_ = std::move(promise);
    actor_.reset();
  }
 private:
  FileSyncState::Reader sync_state_reader_;  // source of synced_size(); receives requested sync sizes
  actor::ActorOwn actor_;                    // owned writer actor
  Promise<> close_promise_;                  // fulfilled in tear_down() if set
  // A pending sync request: fulfil `promise` once `position` has been synced.
  struct Query {
    Query(size_t position, Promise promise) : position(position), promise(std::move(promise)) {
    }
    size_t position;
    Promise promise;
  };
  VectorQueue queries_;  // pending sync requests in arrival order
  // Builds the StreamToFileActor callback: every sync-state change is turned into
  // a wakeup signal for this actor.
  unique_ptr create_callback() {
    class Callback : public StreamToFileActor::Callback {
     public:
      Callback(actor::ActorShared<> actor) : actor_(std::move(actor)) {
      }
      void on_sync_state_changed() override {
        send_signals_later(actor_, actor::ActorSignals::wakeup());
      }
     private:
      actor::ActorShared<> actor_;
    };
    return make_unique(actor_shared(this));
  }
  // Registers the callback with the owned writer actor.
  void start_up() override {
    send_closure_later(actor_, &StreamToFileActor::set_callback, create_callback());
  }
  // Fulfils, from the front of the queue, every request whose position is already
  // covered by the currently synced size; stops at the first one that is not.
  void loop() override {
    auto synced_position = sync_state_reader_.synced_size();
    while (!queries_.empty() && queries_.front().position <= synced_position) {
      queries_.front().promise.set_value(Unit());
      queries_.pop();
    }
  }
  // Stops this actor when a shared reference to it hangs up.
  void hangup_shared() override {
    stop();
  }
  // Signals completion of close(), if a close promise was provided.
  void tear_down() override {
    if (close_promise_) {
      close_promise_.set_value(Unit());
    }
  }
};
}  // namespace detail
// Remembers the target file path; the file and the actors are set up later by open().
BinlogWriterAsync::BinlogWriterAsync(std::string path)
    : path_(std::move(path)) {
}
// Defaulted destructor, defined out of line in this translation unit.
// NOTE(review): presumably placed here because detail::FlushHelperActor is only
// complete in this file — confirm against the header.
BinlogWriterAsync::~BinlogWriterAsync() = default;
// Opens the file at `path_` with Write | Append | Create flags and builds the async
// write pipeline:
//  - a ChainBuffer whose writer end is kept in `buf_writer_`;
//  - a "StreamToFile" actor that receives the buffer's reader end, the file
//    descriptor and the writer end of a FileSyncState;
//  - a "FlushHelperActor" that owns the StreamToFile actor and gets the reader end
//    of the FileSyncState.
// Returns the open error if the file cannot be opened; no actors are created then.
Status BinlogWriterAsync::open() {
  TRY_RESULT(fd, FileFd::open(path_, FileFd::Flags::Write | FileFd::Flags::Append | FileFd::Create));
  ChainBuffer::Options buf_options;
  buf_options.max_io_slices = 128;
  buf_options.chunk_size = 256;
  auto reader_writer = ChainBuffer::create(buf_options);
  buf_writer_ = std::move(reader_writer.second);
  auto sync_state_reader_writer = td::FileSyncState::create();
  auto writer_actor = actor::create_actor("StreamToFile", std::move(reader_writer.first),
                                                             std::move(fd), std::move(sync_state_reader_writer.second));
  // Keep a handle to the writer actor for lazy_flush() before ownership moves to
  // the flush helper below.
  writer_actor_ = writer_actor.get();
  sync_state_reader_ = std::move(sync_state_reader_writer.first);
  flush_helper_actor_ =
      actor::create_actor("FlushHelperActor", sync_state_reader_, std::move(writer_actor));
  return Status::OK();
}
// Asks the flush helper actor to close and clears the handle to the writer actor.
// `promise` is forwarded to FlushHelperActor::close.
// NOTE(review): whether `flush_helper_actor_` is left empty after this call depends
// on how send_closure treats an rvalue actor handle — confirm against the actor framework.
void BinlogWriterAsync::close(Promise<> promise) {
  send_closure(std::move(flush_helper_actor_), &detail::FlushHelperActor::close, std::move(promise));
  writer_actor_ = {};
}
// Sends a (deferred) wakeup signal to the writer actor.
void BinlogWriterAsync::lazy_flush() {
  auto wakeup_signal = actor::ActorSignals::wakeup();
  send_signals_later(writer_actor_, wakeup_signal);
}
// Forwards a flush request to the flush helper actor.
void BinlogWriterAsync::flush() {
  auto flush_method = &detail::FlushHelperActor::flush;
  send_closure(flush_helper_actor_, flush_method);
}
// Requests a sync of everything written to the buffer so far: the current
// writer_size() is used as the sync position and `promise` is forwarded to
// FlushHelperActor::sync along with it.
void BinlogWriterAsync::sync(Promise promise) {
  auto sync_position = buf_writer_.writer_size();
  send_closure(flush_helper_actor_, &detail::FlushHelperActor::sync, sync_position, std::move(promise));
}
}  // namespace td