/*
    This file is part of TON Blockchain Library.
    TON Blockchain Library is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 2 of the License, or
    (at your option) any later version.
    TON Blockchain Library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.
    You should have received a copy of the GNU Lesser General Public License
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
    Copyright 2017-2020 Telegram Systems LLP
*/
#include "ChainBuffer.h"
#include "td/utils/buffer.h"
#include "td/db/utils/StreamInterface.h"
namespace td {
namespace detail {
class ChainBuffer : public StreamWriterInterface, public StreamReaderInterface {
 public:
  using Options = ::td::ChainBuffer::Options;
  ChainBuffer(Options options) {
    shared_.options_ = options;
    reader_.io_slices_.reserve(options.max_io_slices);
    reader_.buf_ = writer_.buf_.extract_reader();
  }
  // StreamReaderInterface
  size_t reader_size() override {
    reader_.buf_.sync_with_writer();
    return reader_.buf_.size();
  }
  Slice prepare_read() override {
    return reader_.buf_.prepare_read();
  }
  Span prepare_readv() override {
    reader_.io_slices_.clear();
    auto it = reader_.buf_.clone();
    while (!it.empty() && reader_.io_slices_.size() < reader_.io_slices_.capacity()) {
      auto slice = it.prepare_read();
      reader_.io_slices_.push_back(as_io_slice(slice));
      it.confirm_read(slice.size());
    }
    return reader_.io_slices_;
  }
  void confirm_read(size_t size) override {
    reader_.buf_.advance(size);
  }
  void close_reader(Status error) override {
    CHECK(!reader_.is_closed_);
    reader_.status_ = std::move(error);
    reader_.is_closed_.store(true, std::memory_order_release);
  }
  bool is_writer_closed() const override {
    return writer_.is_closed_.load(std::memory_order_acquire);
  }
  Status &writer_status() override {
    CHECK(is_writer_closed());
    return writer_.status_;
  }
  // StreamWriterInterface
  size_t writer_size() override {
    return writer_.size_;
  }
  MutableSlice prepare_write() override {
    return writer_.buf_.prepare_append(shared_.options_.chunk_size);
  }
  MutableSlice prepare_write_at_least(size_t size) override {
    return writer_.buf_.prepare_append_at_least(size);
  }
  void confirm_write(size_t size) override {
    writer_.buf_.confirm_append(size);
    writer_.size_ += size;
  }
  void append(Slice data) override {
    writer_.buf_.append(data, shared_.options_.chunk_size);
    writer_.size_ += data.size();
  }
  void append(BufferSlice data) override {
    writer_.size_ += data.size();
    writer_.buf_.append(std::move(data));
  }
  void append(std::string data) override {
    append(Slice(data));
  }
  void close_writer(Status error) override {
    CHECK(!writer_.is_closed_);
    writer_.status_ = std::move(error);
    writer_.is_closed_.store(true, std::memory_order_release);
  }
  bool is_reader_closed() const override {
    return reader_.is_closed_.load(std::memory_order_acquire);
  }
  Status &reader_status() override {
    CHECK(is_reader_closed());
    return reader_.status_;
  }
 private:
  struct SharedData {
    Options options_;
  } shared_;
  char pad1[128];
  struct ReaderData {
    ChainBufferReader buf_;
    std::atomic is_closed_{false};
    Status status_;
    std::vector io_slices_;
  } reader_;
  char pad2[128];
  struct WriterData {
    ChainBufferWriter buf_;
    std::atomic is_closed_{false};
    Status status_;
    size_t size_{0};
  } writer_;
};
}  // namespace detail
std::pair ChainBuffer::create(Options options) {
  auto impl = std::make_shared(options);
  return {Reader(impl), Writer(impl)};
}
}  // namespace td