/*
    This file is part of TON Blockchain Library.
    TON Blockchain Library is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 2 of the License, or
    (at your option) any later version.
    TON Blockchain Library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.
    You should have received a copy of the GNU Lesser General Public License
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
    Copyright 2017-2020 Telegram Systems LLP
*/
#include "overlay-manager.h"
#include "auto/tl/ton_api.h"
#include "overlay.h"
#include "adnl/utils.hpp"
#include "td/actor/actor.h"
#include "td/actor/common.h"
#include "td/utils/Random.h"
#include "td/db/RocksDb.h"
#include "td/utils/Status.h"
#include "td/utils/overloaded.h"
#include "keys/encryptor.h"
#include "td/utils/port/Poll.h"
#include 
namespace ton {
namespace overlay {
// Stores the new DHT node and forwards it to every overlay registered under every local id.
void OverlayManager::update_dht_node(td::actor::ActorId dht) {
  dht_node_ = dht;
  for (auto &per_local_id : overlays_) {
    auto &registered = per_local_id.second;
    for (auto &entry : registered) {
      td::actor::send_closure(entry.second, &Overlay::update_dht_node, dht);
    }
  }
}
// Stores `overlay` under (local_id, overlay_id) and asks the database for the node list saved
// under the key derived from that pair.
//
// If `local_id` had no overlays before this call, ADNL subscriptions for the overlay message and
// overlay query prefixes are requested first.
void OverlayManager::register_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
                                      td::actor::ActorOwn overlay) {
  const bool first_for_local_id = overlays_.find(local_id) == overlays_.end();
  VLOG(OVERLAY_INFO) << this << ": registering overlay " << overlay_id << "@" << local_id;
  if (first_for_local_id) {
    td::actor::send_closure(adnl_, &adnl::Adnl::subscribe, local_id,
                            adnl::Adnl::int_to_bytestring(ton_api::overlay_message::ID),
                            std::make_unique(actor_id(this)));
    td::actor::send_closure(adnl_, &adnl::Adnl::subscribe, local_id,
                            adnl::Adnl::int_to_bytestring(ton_api::overlay_query::ID),
                            std::make_unique(actor_id(this)));
  }

  // Any overlay previously stored under the same pair is replaced by the assignment.
  auto &slot = overlays_[local_id][overlay_id];
  slot = std::move(overlay);

  auto on_db_read = td::PromiseCreator::lambda([id = slot.get()](td::Result R) {
    // Both the database read and the decoding of the stored value are checked with ensure().
    R.ensure();
    auto stored = R.move_as_ok();
    if (stored.status != td::KeyValue::GetStatus::Ok) {
      return;
    }
    auto decoded = fetch_tl_object(std::move(stored.value), true);
    decoded.ensure();
    auto nodes = std::move(decoded.move_as_ok()->nodes_);
    td::actor::send_closure(id, &Overlay::receive_nodes_from_db, std::move(nodes));
  });
  auto key = create_hash_tl_object(local_id.bits256_value(), overlay_id.bits256_value());
  db_.get(key, std::move(on_db_read));
}
// Removes the overlay stored under (local_id, overlay_id), if any. When that leaves `local_id`
// without overlays, its ADNL subscriptions for overlay messages and queries are cancelled and
// its entry is dropped from the table.
void OverlayManager::delete_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id) {
  auto local_it = overlays_.find(local_id);
  if (local_it == overlays_.end()) {
    return;
  }

  auto &registered = local_it->second;
  registered.erase(overlay_id);
  if (!registered.empty()) {
    return;
  }

  td::actor::send_closure(adnl_, &adnl::Adnl::unsubscribe, local_id,
                          adnl::Adnl::int_to_bytestring(ton_api::overlay_message::ID));
  td::actor::send_closure(adnl_, &adnl::Adnl::unsubscribe, local_id,
                          adnl::Adnl::int_to_bytestring(ton_api::overlay_query::ID));
  overlays_.erase(local_it);
}
// Convenience wrapper: creates a public overlay by delegating to create_public_overlay_ex()
// with a default-constructed (`{}`) options argument.
void OverlayManager::create_public_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
                                           std::unique_ptr callback, OverlayPrivacyRules rules,
                                           td::string scope) {
  create_public_overlay_ex(local_id, std::move(overlay_id), std::move(callback), std::move(rules), std::move(scope),
                           {});
}
// Creates a public overlay actor for `overlay_id` on behalf of `local_id` and registers it
// under the overlay's short id.
//
// A DHT node must already be set; this is enforced with CHECK.
// All by-value arguments are forwarded by move; `scope` used to be copied although it is not
// used afterwards.
void OverlayManager::create_public_overlay_ex(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
                                              std::unique_ptr callback, OverlayPrivacyRules rules,
                                              td::string scope, OverlayOptions opts) {
  CHECK(!dht_node_.empty());
  // The short id has to be computed before `overlay_id` is moved into the new actor.
  auto id = overlay_id.compute_short_id();
  register_overlay(local_id, id,
                   Overlay::create(keyring_, adnl_, actor_id(this), dht_node_, local_id, std::move(overlay_id),
                                   std::move(callback), std::move(rules), std::move(scope), std::move(opts)));
}
// Creates a private overlay actor for `overlay_id` with the given node list and registers it
// under the overlay's short id for `local_id`.
void OverlayManager::create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
                                            std::vector nodes,
                                            std::unique_ptr callback, OverlayPrivacyRules rules) {
  // Taken first: `overlay_id` is moved into the actor below.
  auto short_id = overlay_id.compute_short_id();
  auto created = Overlay::create(keyring_, adnl_, actor_id(this), dht_node_, local_id, std::move(overlay_id),
                                 std::move(nodes), std::move(callback), std::move(rules));
  register_overlay(local_id, short_id, std::move(created));
}
// Dispatches an incoming message addressed to local id `dst`.
//
// The TL prefix read from `data` names the target overlay. Messages whose prefix cannot be
// parsed, or whose (dst, overlay) pair is not registered, are logged and dropped. Otherwise the
// overlay's inbound throughput counter is updated and the message is handed to the overlay.
void OverlayManager::receive_message(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data) {
  auto parsed = fetch_tl_prefix(data, true);
  if (parsed.is_error()) {
    VLOG(OVERLAY_WARNING) << this << ": can not parse overlay message: " << parsed.move_as_error();
    return;
  }
  auto header = parsed.move_as_ok();

  auto local_it = overlays_.find(dst);
  if (local_it == overlays_.end()) {
    VLOG(OVERLAY_NOTICE) << this << ": message to unknown overlay " << header->overlay_ << "@" << dst;
    return;
  }

  auto &registered = local_it->second;
  auto target = registered.find(OverlayIdShort{header->overlay_});
  if (target == registered.end()) {
    VLOG(OVERLAY_NOTICE) << this << ": message to localid is not in overlay " << header->overlay_ << "@" << dst;
    return;
  }

  td::actor::send_closure(target->second, &Overlay::update_throughput_in_ctr, src,
                          static_cast<td::uint32>(data.size()), false);
  td::actor::send_closure(target->second, &Overlay::receive_message, src, std::move(data));
}
// Dispatches an incoming query addressed to local id `dst`.
//
// The TL prefix read from `data` names the target overlay. If the prefix cannot be parsed, or
// the (dst, overlay) pair is not registered, the problem is logged and `promise` is failed with
// a protoviolation error. Otherwise the overlay's inbound throughput counter is updated and the
// query, together with `promise`, is handed to the overlay.
void OverlayManager::receive_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data,
                                   td::Promise promise) {
  auto parsed = fetch_tl_prefix(data, true);
  if (parsed.is_error()) {
    VLOG(OVERLAY_WARNING) << this << ": can not parse overlay query [" << src << "->" << dst
                          << "]: " << parsed.move_as_error();
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "bad overlay query header"));
    return;
  }
  auto header = parsed.move_as_ok();

  auto local_it = overlays_.find(dst);
  if (local_it == overlays_.end()) {
    VLOG(OVERLAY_NOTICE) << this << ": query to unknown overlay " << header->overlay_ << "@" << dst << " from "
                         << src;
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, PSTRING() << "bad local_id " << dst));
    return;
  }

  auto &registered = local_it->second;
  auto target = registered.find(OverlayIdShort{header->overlay_});
  if (target == registered.end()) {
    VLOG(OVERLAY_NOTICE) << this << ": query to localid not in overlay " << header->overlay_ << "@" << dst
                         << " from " << src;
    promise.set_error(
        td::Status::Error(ErrorCode::protoviolation, PSTRING() << "bad overlay_id " << header->overlay_));
    return;
  }

  td::actor::send_closure(target->second, &Overlay::update_throughput_in_ctr, src,
                          static_cast<td::uint32>(data.size()), true);
  td::actor::send_closure(target->second, &Overlay::receive_query, src, std::move(data), std::move(promise));
}
// Sends `query` from `src` to `dst` through the sender `via`, with the overlay id serialized
// together with the query bytes by create_serialize_tl_object_suffix().
//
// The query size is CHECKed against Adnl::huge_packet_max_size(). If (src, overlay_id) is a
// registered overlay, its outbound throughput counter is updated; the query is sent either way.
void OverlayManager::send_query_via(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdShort src, OverlayIdShort overlay_id,
                                    std::string name, td::Promise promise, td::Timestamp timeout,
                                    td::BufferSlice query, td::uint64 max_answer_size,
                                    td::actor::ActorId via) {
  CHECK(query.size() <= adnl::Adnl::huge_packet_max_size());

  auto local_it = overlays_.find(src);
  if (local_it != overlays_.end()) {
    auto &registered = local_it->second;
    auto target = registered.find(overlay_id);
    if (target != registered.end()) {
      td::actor::send_closure(target->second, &Overlay::update_throughput_out_ctr, dst,
                              static_cast<td::uint32>(query.size()), true);
    }
  }

  td::actor::send_closure(
      via, &adnl::AdnlSenderInterface::send_query_ex, src, dst, std::move(name), std::move(promise), timeout,
      create_serialize_tl_object_suffix(query.as_slice(), overlay_id.tl()), max_answer_size);
}
// Sends `object` from `src` to `dst` through the sender `via`, with the overlay id serialized
// together with the message bytes by create_serialize_tl_object_suffix().
//
// The message size is CHECKed against Adnl::huge_packet_max_size(). If (src, overlay_id) is a
// registered overlay, its outbound throughput counter is updated; the message is sent either way.
void OverlayManager::send_message_via(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdShort src, OverlayIdShort overlay_id,
                                      td::BufferSlice object, td::actor::ActorId via) {
  CHECK(object.size() <= adnl::Adnl::huge_packet_max_size());

  auto local_it = overlays_.find(src);
  if (local_it != overlays_.end()) {
    auto &registered = local_it->second;
    auto target = registered.find(overlay_id);
    if (target != registered.end()) {
      td::actor::send_closure(target->second, &Overlay::update_throughput_out_ctr, dst,
                              static_cast<td::uint32>(object.size()), false);
    }
  }

  td::actor::send_closure(
      via, &adnl::AdnlSenderInterface::send_message, src, dst,
      create_serialize_tl_object_suffix(object.as_slice(), overlay_id.tl()));
}
// Convenience wrapper around send_broadcast_ex(): broadcasts `object` with the sender set to
// the public key hash of `local_id` and flags set to 0.
void OverlayManager::send_broadcast(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id, td::BufferSlice object) {
  send_broadcast_ex(local_id, overlay_id, local_id.pubkey_hash(), 0, std::move(object));
}
// Asks the overlay registered under (local_id, overlay_id) to broadcast `object` on behalf of
// `send_as` with the given `flags`. The object size is CHECKed against
// Overlays::max_simple_broadcast_size(). Nothing is sent when the overlay is not registered.
void OverlayManager::send_broadcast_ex(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id, PublicKeyHash send_as,
                                       td::uint32 flags, td::BufferSlice object) {
  CHECK(object.size() <= Overlays::max_simple_broadcast_size());

  auto local_it = overlays_.find(local_id);
  if (local_it == overlays_.end()) {
    return;
  }
  auto target = local_it->second.find(overlay_id);
  if (target == local_it->second.end()) {
    return;
  }
  td::actor::send_closure(target->second, &Overlay::send_broadcast, send_as, flags, std::move(object));
}
// Convenience wrapper around send_broadcast_fec_ex(): broadcasts `object` with the sender set
// to the public key hash of `local_id` and flags set to 0.
void OverlayManager::send_broadcast_fec(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
                                        td::BufferSlice object) {
  send_broadcast_fec_ex(local_id, overlay_id, local_id.pubkey_hash(), 0, std::move(object));
}
// Asks the overlay registered under (local_id, overlay_id) to send `object` as a FEC broadcast
// on behalf of `send_as` with the given `flags`. The object size is CHECKed against
// Overlays::max_fec_broadcast_size(). Nothing is sent when the overlay is not registered.
void OverlayManager::send_broadcast_fec_ex(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
                                           PublicKeyHash send_as, td::uint32 flags, td::BufferSlice object) {
  CHECK(object.size() <= Overlays::max_fec_broadcast_size());

  auto local_it = overlays_.find(local_id);
  if (local_it == overlays_.end()) {
    return;
  }
  auto target = local_it->second.find(overlay_id);
  if (target == local_it->second.end()) {
    return;
  }
  td::actor::send_closure(target->second, &Overlay::send_broadcast_fec, send_as, flags, std::move(object));
}
// Forwards new privacy rules to the overlay registered under (local_id, overlay_id).
// Does nothing when no such overlay is registered.
void OverlayManager::set_privacy_rules(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
                                       OverlayPrivacyRules rules) {
  auto local_it = overlays_.find(local_id);
  if (local_it == overlays_.end()) {
    return;
  }
  auto target = local_it->second.find(overlay_id);
  if (target == local_it->second.end()) {
    return;
  }
  td::actor::send_closure(target->second, &Overlay::set_privacy_rules, std::move(rules));
}
// Passes certificate `cert` for `key` to the overlay registered under (local_id, overlay_id)
// via Overlay::add_certificate. Does nothing when no such overlay is registered.
void OverlayManager::update_certificate(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id, PublicKeyHash key,
                                        std::shared_ptr cert) {
  auto local_it = overlays_.find(local_id);
  if (local_it == overlays_.end()) {
    return;
  }
  auto target = local_it->second.find(overlay_id);
  if (target == local_it->second.end()) {
    return;
  }
  td::actor::send_closure(target->second, &Overlay::add_certificate, key, std::move(cert));
}
// Asks the overlay registered under (local_id, overlay_id) for up to `max_peers` random peers;
// the overlay answers through `promise`.
//
// When no such overlay is registered, `promise` is failed explicitly with a descriptive error
// rather than being dropped without an answer.
void OverlayManager::get_overlay_random_peers(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
                                              td::uint32 max_peers,
                                              td::Promise> promise) {
  auto it = overlays_.find(local_id);
  if (it == overlays_.end()) {
    promise.set_error(
        td::Status::Error(ErrorCode::notready, PSTRING() << "no overlays registered for local_id " << local_id));
    return;
  }
  auto it2 = it->second.find(overlay_id);
  if (it2 == it->second.end()) {
    promise.set_error(td::Status::Error(
        ErrorCode::notready, PSTRING() << "overlay " << overlay_id << "@" << local_id << " is not registered"));
    return;
  }
  td::actor::send_closure(it2->second, &Overlay::get_overlay_random_peers, max_peers, std::move(promise));
}
// Factory: creates the overlay manager actor (named "overlaymanager") with the given database
// root and keyring / ADNL / DHT actor ids, and returns ownership of it.
td::actor::ActorOwn Overlays::create(std::string db_root, td::actor::ActorId keyring,
                                               td::actor::ActorId adnl, td::actor::ActorId dht) {
  return td::actor::create_actor("overlaymanager", db_root, keyring, adnl, dht);
}
// Stores the database root path and the keyring / ADNL / DHT actor ids. The database itself is
// opened later, in start_up().
OverlayManager::OverlayManager(std::string db_root, td::actor::ActorId keyring,
                               td::actor::ActorId adnl, td::actor::ActorId dht)
    : db_root_(db_root), keyring_(keyring), adnl_(adnl), dht_node_(dht) {
}
// Actor start-up hook: opens the RocksDB database at "<db_root_>/overlays" and wraps it in db_.
// NOTE(review): the result of td::RocksDb::open() is unwrapped with move_as_ok() without an
// error check; presumably a failure to open the database is meant to be fatal — confirm.
void OverlayManager::start_up() {
  std::shared_ptr kv =
      std::make_shared(td::RocksDb::open(PSTRING() << db_root_ << "/overlays").move_as_ok());
  db_ = DbType{std::move(kv)};
}
// Writes the TL form of `nodes` to the database under the key derived from
// (local_id, overlay_id) — the same key that register_overlay() reads.
void OverlayManager::save_to_db(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
                                std::vector nodes) {
  std::vector> serialized;
  for (auto &node : nodes) {
    serialized.push_back(node.tl());
  }

  auto key = create_hash_tl_object(local_id.bits256_value(), overlay_id.bits256_value());
  auto record = create_tl_object(std::move(serialized));
  db_.set(key, create_serialize_tl_object(std::move(record)));
}
// Collects statistics from every registered overlay and delivers the merged result through
// `promise`.
//
// A short-lived helper actor (Cb) gathers the answers. For each overlay the pending counter is
// incremented and Overlay::get_stats is requested; every answer (successful or not) decrements
// the counter again. When the counter reaches zero the helper fulfils the promise and stops.
void OverlayManager::get_stats(td::Promise> promise) {
  // Helper actor that accumulates per-overlay answers until none are outstanding.
  class Cb : public td::actor::Actor {
   public:
    Cb(td::Promise> promise) : promise_(std::move(promise)) {
    }
    // Registers one more outstanding request.
    void incr_pending() {
      pending_++;
    }
    // Marks one request as finished; on the last one, publishes the result and stops the actor.
    void decr_pending() {
      if (!--pending_) {
        promise_.set_result(create_tl_object(std::move(res_)));
        stop();
      }
    }
    // Records an answer. A null `res` (sent when the overlay's request failed) is not stored
    // but still counts as a finished request.
    void receive_answer(tl_object_ptr res) {
      if (res) {
        res_.push_back(std::move(res));
      }
      decr_pending();
    }
   private:
    std::vector> res_;
    // Starts at 1: this initial count is released by the decr_pending() sent after the loop
    // below, once all requests have been issued.
    size_t pending_{1};
    td::Promise> promise_;
  };
  // Ownership is released here; the helper ends itself via stop() in decr_pending().
  auto act = td::actor::create_actor("overlaysstatsmerger", std::move(promise)).release();
  for (auto &a : overlays_) {
    for (auto &b : a.second) {
      td::actor::send_closure(act, &Cb::incr_pending);
      td::actor::send_closure(b.second, &Overlay::get_stats,
                              [act](td::Result> R) {
                                if (R.is_ok()) {
                                  td::actor::send_closure(act, &Cb::receive_answer, R.move_as_ok());
                                } else {
                                  td::actor::send_closure(act, &Cb::receive_answer, nullptr);
                                }
                              });
    }
  }
  // Releases the initial count of 1 held by pending_.
  td::actor::send_closure(act, &Cb::decr_pending);
}
// Builds a certificate whose issuer is given as a full public key. The signature bytes are
// copied into a td::SharedSlice.
Certificate::Certificate(PublicKey issued_by, td::int32 expire_at, td::uint32 max_size, td::uint32 flags,
                         td::BufferSlice signature)
    : issued_by_(issued_by)
    , expire_at_(expire_at)
    , max_size_(max_size)
    , flags_(flags)
    , signature_(td::SharedSlice(signature.as_slice())) {
}
// Builds a certificate whose issuer is given only as a public key hash. The signature bytes are
// copied into a td::SharedSlice.
Certificate::Certificate(PublicKeyHash issued_by, td::int32 expire_at, td::uint32 max_size, td::uint32 flags,
                         td::BufferSlice signature)
    : issued_by_(issued_by)
    , expire_at_(expire_at)
    , max_size_(max_size)
    , flags_(flags)
    , signature_(td::SharedSlice(signature.as_slice())) {
}
// Replaces the stored signature with a copy of the bytes in `signature`.
void Certificate::set_signature(td::BufferSlice signature) {
  signature_ = td::SharedSlice{signature.as_slice()};
}
// Replaces the stored issuer with the full public key `issuer`.
void Certificate::set_issuer(PublicKey issuer) {
  issued_by_ = issuer;
}
// Default certificate flags for a given `max_size`: always Trusted, plus AllowFec when
// `max_size` exceeds Overlays::max_simple_broadcast_size().
constexpr td::uint32 cert_default_flags(td::uint32 max_size) {
  td::uint32 flags = CertificateFlags::Trusted;
  if (max_size > Overlays::max_simple_broadcast_size()) {
    flags |= CertificateFlags::AllowFec;
  }
  return flags;
}
// Serializes the data covered by the certificate signature for overlay `overlay_id` and
// recipient `issued_to`.
//
// When flags_ equals the default for max_size_ (see cert_default_flags) the serialized object
// carries the overlay id, recipient, expiry and max size only; otherwise flags_ is appended as
// an extra field.
td::BufferSlice Certificate::to_sign(OverlayIdShort overlay_id, PublicKeyHash issued_to) const {
  if (flags_ == cert_default_flags(max_size_)) {
    return create_serialize_tl_object(overlay_id.tl(), issued_to.tl(), expire_at_,
                                                                      max_size_);
  } else {
    return create_serialize_tl_object(overlay_id.tl(), issued_to.tl(), expire_at_,
                                                                        max_size_, flags_);
  }
}
// Returns the issuer as a public key hash: the stored hash itself when the issuer was given as
// a hash, or the short id computed from the stored public key otherwise.
const PublicKeyHash Certificate::issuer_hash() const {
  PublicKeyHash result;
  auto from_hash = [&](const PublicKeyHash &hash) { result = hash; };
  auto from_key = [&](const PublicKey &key) { result = key.compute_short_id(); };
  issued_by_.visit(td::overloaded(from_hash, from_key));
  return result;
}
// Returns a reference to the issuer obtained from issued_by_.
// NOTE(review): issued_by_ can also be set from a PublicKeyHash (see the second constructor);
// presumably this accessor is only valid when a full public key is stored — confirm.
const PublicKey &Certificate::issuer() const {
  return issued_by_.get();
}
// Builds a Certificate from its TL representation.
//
// - overlay_emptyCertificate: the result holds a null pointer.
// - overlay_certificate:      flags are derived from max_size_ via cert_default_flags().
// - overlay_certificateV2:    flags are taken from the object itself.
//
// `cert` is dereferenced unconditionally.
// NOTE(review): presumably callers never pass a null `cert` — confirm.
td::Result> Certificate::create(tl_object_ptr cert) {
  std::shared_ptr res;
  ton_api::downcast_call(*cert.get(),
                         td::overloaded([&](ton_api::overlay_emptyCertificate &obj) { res = nullptr; },
                                        [&](ton_api::overlay_certificate &obj) {
                                          res = std::make_shared(PublicKey{obj.issued_by_}, obj.expire_at_,
                                                                              static_cast(obj.max_size_),
                                                                              cert_default_flags(obj.max_size_),
                                                                              std::move(obj.signature_));
                                        },
                                        [&](ton_api::overlay_certificateV2 &obj) {
                                          res = std::make_shared(PublicKey{obj.issued_by_}, obj.expire_at_,
                                                                              static_cast(obj.max_size_),
                                                                              static_cast(obj.flags_),
                                                                              std::move(obj.signature_));
                                        }));
  return std::move(res);
}
// Decides whether this certificate lets `node` broadcast `size` bytes in `overlay_id` at time
// `unix_time`.
//
// Returns Forbidden when the size exceeds max_size_, the certificate has expired, a FEC
// broadcast is requested without the AllowFec flag, no encryptor can be created for the issuer,
// or the signature does not verify. Otherwise returns Allowed if the Trusted flag is set and
// NeedCheck if it is not.
BroadcastCheckResult Certificate::check(PublicKeyHash node, OverlayIdShort overlay_id, td::int32 unix_time,
                                        td::uint32 size, bool is_fec) const {
  const bool too_large = size > max_size_;
  const bool expired = unix_time > expire_at_;
  const bool fec_not_allowed = is_fec && !(flags_ & CertificateFlags::AllowFec);
  if (too_large || expired || fec_not_allowed) {
    return BroadcastCheckResult::Forbidden;
  }

  auto encryptor_result = issued_by_.get().create_encryptor();
  if (encryptor_result.is_error()) {
    return BroadcastCheckResult::Forbidden;
  }
  auto encryptor = encryptor_result.move_as_ok();

  auto signed_data = to_sign(overlay_id, node);
  if (encryptor->check_signature(signed_data.as_slice(), signature_.as_slice()).is_error()) {
    return BroadcastCheckResult::Forbidden;
  }

  if (flags_ & CertificateFlags::Trusted) {
    return BroadcastCheckResult::Allowed;
  }
  return BroadcastCheckResult::NeedCheck;
}
// Converts the certificate to a TL object built from the issuer key, expiry, max size and a
// copy of the signature.
// NOTE(review): flags_ is not passed to the TL object here, so a certificate holding
// non-default flags would not carry them through this method — confirm this is intended.
tl_object_ptr Certificate::tl() const {
  return create_tl_object(issued_by_.get().tl(), expire_at_, max_size_,
                                                        signature_.clone_as_buffer_slice());
}
// Returns a TL object created with no arguments.
// NOTE(review): presumably the "empty certificate" TL form — confirm.
tl_object_ptr Certificate::empty_tl() {
  return create_tl_object();
}
}  // namespace overlay
}  // namespace ton