/*
    This file is part of TON Blockchain Library.
    TON Blockchain Library is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 2 of the License, or
    (at your option) any later version.
    TON Blockchain Library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.
    You should have received a copy of the GNU Lesser General Public License
    along with TON Blockchain Library.  If not, see .
    Copyright 2017-2020 Telegram Systems LLP
*/
#include "crypto/block/Binlog.h"
#include "td/utils/as.h"
#include "td/utils/misc.h"
#include "td/utils/port/path.h"
#include 
namespace block {
/*
 * 
 *    GENERIC BINLOG (move to separate file)
 * 
 */
BinlogBuffer::BinlogBuffer(std::unique_ptr cb, std::size_t _max_size, td::FileFd fd)
    : cb(std::move(cb))
    , need_more_bytes(0)
    , eptr(nullptr)
    , log_rpos(0)
    , log_cpos(0)
    , log_wpos(0)
    , fd(std::move(fd))
    , replica(false)
    , writing(false)
    , dirty(false)
    , created(false)
    , ok(false) {
  max_size = _max_size;
  start = static_cast(std::malloc(max_size));
  DCHECK(start);
  rptr = wptr = cptr = start;
  end = start + max_size;
}
unsigned char* BinlogBuffer::alloc_log_event_force(std::size_t size) {
  unsigned char* res = alloc_log_event(size);
  if (!res) {
    throw LevAllocError{size};
  }
  return res;
}
unsigned char* BinlogBuffer::try_alloc_log_event(std::size_t size) {
  if (!eptr) {
    if (end - wptr >= (long)size) {
      unsigned char* res = wptr;
      wptr += size;
      log_wpos += size;
      return res;
    }
    eptr = wptr;
    wptr = start;
    if (rptr == eptr) {
      rptr = start;
    }
    if (cptr == eptr) {
      cptr = start;
    }
  }
  if (rptr - wptr > (long)size) {
    unsigned char* res = wptr;
    wptr += size;
    log_wpos += size;
    return res;
  }
  return nullptr;
}
bool BinlogBuffer::flush(int mode) {
  auto r_res = try_flush(mode);
  if (r_res.is_ok()) {
    return r_res.ok();
  }
  std::string msg = PSTRING() << "cannot flush binlog file " << binlog_name << " at position " << log_rpos << " "
                              << r_res.error();
  LOG(ERROR) << msg;
  throw BinlogError{msg};
}
td::Result BinlogBuffer::try_flush(int mode) {
  LOG(DEBUG) << "in flush: writing=" << writing << " r=" << rptr - start << " c=" << cptr - start
             << " w=" << wptr - start << "; rp=" << log_rpos << " cp=" << log_cpos << " wp=" << log_wpos;
  if (!writing || rptr == cptr) {
    return false;  // nothing to flush
  }
  DCHECK(!fd.empty());  // must have an open binlog file
  while (rptr != cptr) {
    unsigned char* tptr = (cptr >= rptr ? cptr : eptr);
    DCHECK(rptr <= tptr);
    auto sz = tptr - rptr;
    if (sz) {
      LOG(INFO) << "writing " << sz << " bytes to binlog " << binlog_name << " at position " << log_rpos;
      TRY_RESULT(res, fd.pwrite(td::Slice(rptr, sz), log_rpos));
      if (static_cast(res) != sz) {
        return td::Status::Error(PSLICE() << "written " << res << " bytes instead of " << sz);
      }
      log_rpos += sz;
      rptr += sz;
    }
    if (rptr == eptr) {
      rptr = start;
      eptr = nullptr;
    }
  }
  if (mode >= 3) {
    LOG(INFO) << "syncing binlog " << binlog_name << " (position " << log_rpos << ")";
    TRY_STATUS(fd.sync());
  }
  return true;
}
unsigned char* BinlogBuffer::alloc_log_event(std::size_t size) {
  if (!writing) {
    throw BinlogError{"cannot create new binlog event: binlog not open for writing"};
  }
  if (size >= max_size || size > max_event_size) {
    return nullptr;
  }
  size = (size + 3) & -4;
  unsigned char* res = try_alloc_log_event(size);
  if (!res) {
    flush();
    return try_alloc_log_event(size);
  } else {
    return res;
  }
}
bool BinlogBuffer::commit_range(unsigned long long pos_start, unsigned long long pos_end) {
  // TODO: make something more clever, with partially committed/uncommitted segments in [cpos..wpos] range
  if (pos_start != log_cpos || pos_end < pos_start || pos_end > log_wpos) {
    return false;
  }
  if (!pos_start && pos_end >= pos_start + 4 && td::as(cptr) != 0x0442446b) {
    throw BinlogError{"incorrect magic"};
  }
  long long size = pos_end - pos_start;
  replay_range(cptr, pos_start, pos_end);
  log_cpos = pos_end;
  cptr += size;
  if (eptr && cptr >= eptr) {
    cptr -= eptr - start;
  }
  return true;
}
bool BinlogBuffer::rollback_range(unsigned long long pos_start, unsigned long long pos_end) {
  if (pos_start < log_cpos || pos_end < pos_start || pos_end != log_wpos) {
    return false;
  }
  long long size = pos_end - pos_start;
  log_wpos = pos_end;
  if (size >= wptr - start) {
    wptr -= size;
  } else {
    DCHECK(eptr);
    wptr += eptr - start - size;
  }
  return true;
}
void BinlogBuffer::NewBinlogEvent::commit() {
  //LOG(DEBUG) << "in NewBinlogEvent::commit (status = " << status << ")";
  if (!(status & 4)) {
    throw BinlogError{"cannot commit new binlog event: already committed or rolled back"};
  }
  if (!bb.commit_range(pos, pos + size)) {
    throw BinlogError{"cannot commit new binlog event: possibly some earlier log events are not committed yet"};
  }
  status = 1;
  //LOG(DEBUG) << "after NewBinlogEvent::commit (status = " << status << ")";
}
void BinlogBuffer::NewBinlogEvent::rollback() {
  if (!(status & 4)) {
    throw BinlogError{"cannot roll back new binlog event: already committed or rolled back"};
  }
  if (!bb.rollback_range(pos, pos + size)) {
    throw BinlogError{"cannot roll back new binlog event: possibly some later log event are already committed"};
  }
  status = 2;
}
BinlogBuffer::NewBinlogEvent::~NewBinlogEvent() {
  if (status & 4) {
    if (status == 5) {
      status = 4;
      commit();
    } else if (status == 6) {
      status = 4;
      rollback();
    } else {
      LOG(ERROR) << "newly-allocated binlog event is neither committed nor rolled back (automatically rolling back)";
      rollback();
    }
  }
}
void BinlogBuffer::replay_range(unsigned char* ptr, unsigned long long pos_start, unsigned long long pos_end) {
  unsigned char* tptr = (ptr <= wptr ? wptr : eptr);
  long long avail = tptr - ptr;
  while (pos_start < pos_end) {
    if (ptr == eptr) {
      ptr = start;
      tptr = wptr;
      avail = tptr - ptr;
      if (avail > (long long)(pos_end - pos_start)) {
        avail = pos_end - pos_start;
      }
    }
    int res = (avail >= 4 ? cb->replay_log_event(*this, reinterpret_cast(ptr),
                                                 td::narrow_cast(avail), pos_start)
                          : -0x7ffffffc);
    if (res <= 0 || res > avail) {
      std::ostringstream ss;
      ss << "cannot interpret newly-committed binlog event 0x" << std::hex
         << (avail >= 4 ? (unsigned)td::as(ptr) : 0u) << std::dec << ": error " << res;
      throw BinlogError{ss.str()};
    }
    ptr += res;
    pos_start += res;
    avail -= res;
  }
}
int BinlogBuffer::replay_pending(bool allow_partial) {
  if (rptr == cptr) {
    return 0;
  }
  unsigned char* tptr = (rptr <= cptr ? cptr : eptr);
  long long avail = tptr - rptr;
  DCHECK(tptr && avail >= 0);
  while (rptr != cptr) {
    int res = (avail >= 4 ? cb->replay_log_event(*this, reinterpret_cast(rptr),
                                                 td::narrow_cast(avail), log_rpos)
                          : -0x7ffffffc);
    if (res > 0) {
      if (res > avail) {
        throw BinlogError{"binlog event used more bytes than available"};
      }
      avail -= res;
      log_rpos += res;
      rptr += res;
      if (rptr != eptr) {
        continue;
      }
      rptr = start;
      tptr = cptr;
      avail = tptr - rptr;
      continue;
    }
    long long prev_need = 0;
    while (res < -0x40000000) {
      long long need = res - 0x80000000;
      need = (need + 3) & -4;
      if (need > (long long)max_event_size) {
        throw BinlogError{"binlog event requires too many bytes"};
      }
      if (need <= avail) {
        throw BinlogError{"binlog event requires more bytes, but we already had them"};
      }
      if (need <= prev_need) {
        throw BinlogError{"binlog event requires more bytes, but we already had them"};
      }
      prev_need = need;
      long long total_avail = avail + (rptr > cptr ? cptr - start : 0);
      if (need > total_avail) {
        if (allow_partial) {
          need_more_bytes = td::narrow_cast(need - total_avail);
          return 2;
        } else {
          throw BinlogError{"binlog event extends past end of buffer"};
        }
      }
      if (need <= 1024) {
        unsigned char tmp[1024];
        std::memcpy(tmp, rptr, td::narrow_cast(avail));
        std::memcpy(tmp + avail, start, td::narrow_cast(need - avail));
        res = cb->replay_log_event(*this, reinterpret_cast(tmp), td::narrow_cast(need),
                                   log_rpos);
      } else {
        unsigned char* tmp = static_cast(std::malloc(td::narrow_cast(need)));
        std::memcpy(tmp, rptr, td::narrow_cast(avail));
        std::memcpy(tmp + avail, start, td::narrow_cast(need - avail));
        res = cb->replay_log_event(*this, reinterpret_cast(tmp), td::narrow_cast(need),
                                   log_rpos);
        std::free(tmp);
      }
      if (res > need) {
        throw BinlogError{"binlog event used more bytes than available"};
      }
    }
    if (res < 0) {
      return res;
    }
    if (!res) {
      throw BinlogError{"unknown error while interpreting binlog event"};
    }
    if (res < avail) {
      avail -= res;
      log_rpos += res;
      rptr += res;
      continue;
    }
    DCHECK(eptr);
    log_rpos += res;
    rptr += res;
    rptr = start + (rptr - eptr);
    eptr = nullptr;
    DCHECK(start <= rptr && rptr <= cptr && cptr <= wptr && wptr <= end);
  }
  return 1;
}
BinlogBuffer::~BinlogBuffer() {
  if (start) {
    if (writing) {
      flush(2);
    }
    std::free(start);
  }
}
td::Status BinlogBuffer::set_binlog(std::string new_binlog_name, int mode) {
  if (!binlog_name.empty() || !fd.empty()) {
    return td::Status::Error("binlog buffer already attached to a file");
  }
  td::int32 flags = td::FileFd::Read;
  if ((mode & 1) != 0) {
    flags |= td::FileFd::Write;
  }
  auto r_fd = td::FileFd::open(new_binlog_name, flags, 0640);
  if (r_fd.is_error()) {
    if (!(~mode & 3)) {
      TRY_RESULT(new_fd, td::FileFd::open(new_binlog_name, flags | td::FileFd::CreateNew, 0640));
      fd = std::move(new_fd);
      created = true;
    } else {
      return r_fd.move_as_error();
    }
  } else {
    fd = r_fd.move_as_ok();
  }
  replica = !(mode & 1);
  if (!replica) {
    TRY_STATUS(fd.lock(td::FileFd::LockFlags::Write, new_binlog_name, 100));
  }
  if (created) {
    writing = true;
    td::Status res;
    try {
      res = cb->init_new_binlog(*this);
    } catch (BinlogBuffer::BinlogError& err) {
      res = td::Status::Error(err.msg);
    }
    if (res.is_error()) {
      fd.close();
      td::unlink(new_binlog_name).ignore();
      writing = false;
      return res;
    }
    binlog_name = new_binlog_name;
    ok = true;
    return td::Status::OK();
  }
  binlog_name = new_binlog_name;
  auto res = replay_binlog(replica);
  if (res.is_error()) {
    return res.move_as_error();
  }
  if (!replica) {
    if (log_rpos != log_wpos || log_rpos != log_cpos || rptr != wptr || rptr != cptr) {
      std::string msg = (PSLICE() << "error while interpreting binlog `" << binlog_name << "`: " << log_wpos - log_rpos
                                  << " bytes left uninterpreted at position " << log_rpos << ", truncated binlog?")
                            .c_str();
      LOG(ERROR) << msg;
      return td::Status::Error(msg);
    }
    //rptr = wptr = cptr = start;
    //eptr = nullptr;
    LOG(INFO) << "read and interpreted " << res.move_as_ok() << " bytes from binlog `" << binlog_name
              << "`, final position " << log_rpos << ", reopening in write mode";
    writing = true;
    if (!log_rpos) {
      td::Status status;
      try {
        status = cb->init_new_binlog(*this);
      } catch (BinlogBuffer::BinlogError& err) {
        status = td::Status::Error(err.msg);
      }
      if (status.is_error()) {
        fd.close();
        td::unlink(new_binlog_name).ignore();
        writing = false;
        return status;
      }
    }
  }
  ok = true;
  return td::Status::OK();
}
td::Result BinlogBuffer::replay_binlog(bool allow_partial) {
  if (writing) {
    return 0;
  }
  long long total = 0;
  while (true) {
    auto res = read_file();
    if (res.is_error()) {
      return res.move_as_error();
    }
    long long sz = res.move_as_ok();
    total += sz;
    try {
      cptr = wptr;
      log_cpos = log_wpos;
      if (!log_rpos && rptr == start && wptr >= rptr + 4 && td::as(rptr) != 0x0442446b) {
        throw BinlogError{"incorrect magic"};
      }
      int r = replay_pending(allow_partial || sz != 0);
      if (r < 0 && r >= -0x40000000) {
        throw InterpretError{(PSLICE() << "binlog error " << r).c_str()};
      }
    } catch (BinlogError err) {
      LOG(ERROR) << "error reading binlog " << binlog_name << ": " << err.msg << " at position " << log_rpos;
      return td::Status::Error(PSLICE() << "error reading binlog " << binlog_name << ": " << err.msg << " at position "
                                        << log_rpos);
    } catch (InterpretError err) {
      LOG(ERROR) << "error interpreting binlog " << binlog_name << ": " << err.msg << " at position " << log_rpos;
      return td::Status::Error(PSLICE() << "error interpreting binlog " << binlog_name << ": " << err.msg
                                        << " at position " << log_rpos);
    }
    if (!sz) {
      break;
    }
  };
  return total;
}
td::Result BinlogBuffer::read_file() {
  unsigned char* ptr = wptr;
  std::size_t sz = end - wptr;
  if (rptr > wptr) {
    DCHECK(eptr);
    sz = rptr - wptr;
    if (sz <= 4) {
      return 0;  // buffer full
    }
    sz -= 4;
  } else if (!sz) {
    DCHECK(!eptr);
    if (rptr <= start + 4) {
      return 0;  // buffer full
    }
    eptr = end;
    ptr = wptr = start;
    sz = rptr - start - 4;
  }
  auto r_res = fd.pread(td::MutableSlice(ptr, sz), log_wpos);
  if (r_res.is_error()) {
    std::string msg = PSTRING() << "error reading binlog file `" << binlog_name << "` at position " << log_wpos << " : "
                                << r_res.error();
    LOG(ERROR) << msg;
    return td::Status::Error(msg);
  }
  auto res = r_res.move_as_ok();
  DCHECK(std::size_t(res) <= sz);
  LOG(INFO) << "read " << res << " bytes from binlog `" << binlog_name << "` at position " << log_wpos;
  log_wpos += res;
  wptr += res;
  return (int)res;
}
}  // namespace block