/*
    This file is part of TON Blockchain Library.
    TON Blockchain Library is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 2 of the License, or
    (at your option) any later version.
    TON Blockchain Library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.
    You should have received a copy of the GNU Lesser General Public License
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
    Copyright 2017-2020 Telegram Systems LLP
*/
#include "auto/tl/ton_api.h"
#include "overlays.h"
#include "td/utils/SharedSlice.h"
#include "td/utils/overloaded.h"
#include "full-node-shard.hpp"
#include "full-node-shard-queries.hpp"
#include "ton/ton-shard.h"
#include "ton/ton-tl.hpp"
#include "adnl/utils.hpp"
#include "net/download-block-new.hpp"
#include "net/download-block.hpp"
#include "net/download-next-block.hpp"
#include "net/download-state.hpp"
#include "net/download-proof.hpp"
#include "net/get-next-key-blocks.hpp"
#include "net/download-archive-slice.hpp"
#include "impl/out-msg-queue-proof.hpp"
#include "td/utils/Random.h"
#include "common/delay.h"
namespace ton {
namespace validator {
namespace fullnode {
// Shared placeholder neighbour constructed from the all-zero ADNL short id.
// NOTE(review): presumably returned when no real neighbour is available (users in this
// file test `adnl_id.is_zero()`); the producer is outside this view — confirm.
Neighbour Neighbour::zero = Neighbour{adnl::AdnlNodeIdShort::zero()};
// Refreshes the cached protocol data of this neighbour from a capabilities reply.
// Both reply flavours carry a version and a capability mask; only the V2 flavour
// states explicitly whether the peer has the state.
void Neighbour::update_proto_version(ton_api::tonNode_Capabilities &q) {
  auto on_legacy = [&](ton_api::tonNode_capabilities &caps) {
    proto_version = caps.version_;
    capabilities = caps.capabilities_;
    // supports_v2() is evaluated after the fields above have been refreshed.
    if (supports_v2()) {
      return;
    }
    // No V2 support: the state flag is marked as known and set to true.
    has_state = true;
    has_state_known = true;
  };
  auto on_v2 = [&](ton_api::tonNode_capabilitiesV2 &caps) {
    proto_version = caps.version_;
    capabilities = caps.capabilities_;
    has_state = caps.have_state_;
    has_state_known = true;
  };
  ton_api::downcast_call(q, td::overloaded(on_legacy, on_v2));
}
// Records a successful query: one unit of unreliability is paid off (the counter is
// floored at zero) and the round-trip estimate is updated with the measured time `t`.
void Neighbour::query_success(double t) {
  if (--unreliability < 0) {
    unreliability = 0;
  }
  update_roundtrip(t);
}
// Records a failed query by raising the unreliability counter by one.
void Neighbour::query_failed() {
  ++unreliability;
}
// Folds a new round-trip sample into the estimate: the new value is the arithmetic
// mean of the previous estimate and the sample `t`.
void Neighbour::update_roundtrip(double t) {
  roundtrip = (roundtrip + t) * 0.5;
}
// Registers this shard's public overlay with the overlay manager and wires up the
// auxiliary services (RLDP ids, certificate, priority broadcast receivers).
// NOTE(review): template argument lists (e.g. of td::Promise, td::actor::ActorId and
// std::make_unique) are not visible in this view of the file.
void FullNodeShardImpl::create_overlay() {
  // Adapter handed to the overlay manager: every overlay event is forwarded to the
  // owning FullNodeShardImpl actor through send_closure. The overlay_id argument of each
  // callback is not forwarded.
  class Callback : public overlay::Overlays::Callback {
   public:
    void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id,
                         td::BufferSlice data) override {
      td::actor::send_closure(node_, &FullNodeShardImpl::receive_message, src, std::move(data));
    }
    void receive_query(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
                       td::Promise promise) override {
      td::actor::send_closure(node_, &FullNodeShardImpl::receive_query, src, std::move(data), std::move(promise));
    }
    void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
      td::actor::send_closure(node_, &FullNodeShardImpl::receive_broadcast, src, std::move(data));
    }
    void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
                         td::Promise promise) override {
      td::actor::send_closure(node_, &FullNodeShardImpl::check_broadcast, src, std::move(data), std::move(promise));
    }
    Callback(td::actor::ActorId node) : node_(node) {
    }
   private:
    // Actor that receives all forwarded events.
    td::actor::ActorId node_;
  };
  // Both branches pass the same callback, rules and JSON description (shard id and
  // workchain id); the inactive branch uses create_public_overlay_ex with a trailing
  // `false` argument.
  // NOTE(review): the meaning of that flag is defined by Overlays, not shown here — confirm.
  if (is_active()) {
    td::actor::send_closure(overlays_, &overlay::Overlays::create_public_overlay, adnl_id_, overlay_id_full_.clone(),
                            std::make_unique(actor_id(this)), rules_,
                            PSTRING() << "{ \"type\": \"shard\", \"shard_id\": " << get_shard()
                                      << ", \"workchain_id\": " << get_workchain() << " }");
  } else {
    td::actor::send_closure(overlays_, &overlay::Overlays::create_public_overlay_ex, adnl_id_, overlay_id_full_.clone(),
                            std::make_unique(actor_id(this)), rules_,
                            PSTRING() << "{ \"type\": \"shard\", \"shard_id\": " << get_shard()
                                      << ", \"workchain_id\": " << get_workchain() << " }",
                            false);
  }
  // Make the local ADNL id known to both RLDP implementations.
  td::actor::send_closure(rldp_, &rldp::Rldp::add_id, adnl_id_);
  td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, adnl_id_);
  // Re-apply the certificate, if one is set, to the freshly created overlay.
  if (cert_) {
    td::actor::send_closure(overlays_, &overlay::Overlays::update_certificate, adnl_id_, overlay_id_, local_id_, cert_);
  }
  // Re-apply the priority broadcast receivers when collator nodes are configured.
  if (!collator_nodes_.empty()) {
    td::actor::send_closure(overlays_, &overlay::Overlays::set_priority_broadcast_receivers, adnl_id_, overlay_id_,
                            collator_nodes_);
  }
}
// Decides whether a broadcast received from the overlay is acceptable.
// The payload is parsed as a TL object carrying `message_->data_` and is handed to the
// validator manager's check_external_message; `promise` receives the verdict.
// NOTE(review): the TL type passed to fetch_tl_object and the promise/result value types
// are template arguments not visible in this view.
void FullNodeShardImpl::check_broadcast(PublicKeyHash src, td::BufferSlice broadcast, td::Promise promise) {
  // Only a shard in active mode checks broadcasts.
  if (mode_ != FullNodeShardMode::active) {
    return promise.set_error(td::Status::Error("cannot check broadcast: shard is not active"));
  }
  auto B = fetch_tl_object(std::move(broadcast), true);
  if (B.is_error()) {
    return promise.set_error(B.move_as_error_prefix("failed to parse external message broadcast: "));
  }
  auto q = B.move_as_ok();
  if (config_.ext_messages_broadcast_disabled_) {
    // The caller's promise is failed immediately, then `promise` is replaced by a local
    // continuation: if the check below succeeds, a clone of the message is passed to
    // new_external_message on the validator manager.
    promise.set_error(td::Status::Error("rebroadcasting external messages is disabled"));
    promise = [manager = validator_manager_, message = q->message_->data_.clone()](td::Result R) mutable {
      if (R.is_ok()) {
        td::actor::send_closure(manager, &ValidatorManagerInterface::new_external_message, std::move(message));
      }
    };
  }
  // The check result value is discarded; only success/failure is propagated as td::Unit.
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::check_external_message,
                          std::move(q->message_->data_),
                          promise.wrap([](td::Ref) { return td::Unit(); }));
}
// Drops the neighbour entry keyed by `id` from neighbours_.
void FullNodeShardImpl::remove_neighbour(adnl::AdnlNodeIdShort id) {
  neighbours_.erase(id);
}
// Moves this shard overlay to a new local ADNL identity.
//
// The overlay registered under the current id is deleted, adnl_id_ and local_id_ (the
// public key hash of the new id) are updated, and the overlay is created again through
// create_overlay().
//
// `promise` is completed with success once these requests have been issued. The overlay
// operations are sent as actor messages, so completion does not imply that the overlay
// manager has already processed them.
void FullNodeShardImpl::update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise promise) {
  td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, adnl_id_, overlay_id_);
  adnl_id_ = adnl_id;
  local_id_ = adnl_id_.pubkey_hash();
  create_overlay();
  // Complete the promise explicitly: letting it go out of scope unfulfilled would leave
  // the caller without a success result.
  // NOTE(review): assumes the promise's value type is td::Unit (the template argument is
  // not visible in this view) — confirm.
  promise.set_value(td::Unit());
}
// Switches the shard to `mode`. Masterchain shards ignore the request. When the switch
// changes the result of is_active(), the overlay is deleted and created again.
void FullNodeShardImpl::set_mode(FullNodeShardMode mode) {
  if (shard_.is_masterchain()) {
    return;
  }
  const bool active_before = is_active();
  mode_ = mode;
  if (is_active() == active_before) {
    // Activity did not change: the existing overlay registration stays as it is.
    return;
  }
  td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, adnl_id_, overlay_id_);
  create_overlay();
}
// Starts one attempt to download the block following handle_ from a chosen neighbour.
// Fails `promise` with ErrorCode::timeout when `timeout` has already passed.
// NOTE(review): the actor types passed to create_actor are template arguments not visible
// in this view; the two branches spawn different downloaders.
void FullNodeShardImpl::try_get_next_block(td::Timestamp timeout, td::Promise promise) {
  if (timeout.is_in_past()) {
    promise.set_error(td::Status::Error(ErrorCode::timeout, "timeout"));
    return;
  }
  auto &b = choose_neighbour();
  // A real neighbour (non-zero id) with proto_version >= 1 gets the "new" download
  // method, which is given the previous block id; otherwise the "old" method is used and
  // receives the block handle itself.
  if (!b.adnl_id.is_zero() && b.proto_version >= 1) {
    VLOG(FULL_NODE_DEBUG) << "using new download method with adnlid=" << b.adnl_id;
    // The promise is wrapped by create_neighbour_promise(b, ...) before being handed over.
    // NOTE(review): presumably this records the query outcome on the neighbour — confirm.
    td::actor::create_actor("downloadnext", adnl_id_, overlay_id_, handle_->id(), b.adnl_id,
                                              download_next_priority(), timeout, validator_manager_, rldp_, overlays_,
                                              adnl_, client_, create_neighbour_promise(b, std::move(promise)))
        .release();
  } else {
    VLOG(FULL_NODE_DEBUG) << "using old download method with adnlid=" << b.adnl_id;
    td::actor::create_actor("downloadnext", adnl_id_, overlay_id_, handle_, b.adnl_id,
                                               download_next_priority(), timeout, validator_manager_, rldp_, overlays_,
                                               adnl_, client_, create_neighbour_promise(b, std::move(promise)))
        .release();
  }
}
// Continuation of get_next_block(): consumes the result of validating the next block.
// Timeout and not-ready errors simply trigger another attempt. Any other outcome resets
// the attempt counter and goes through R.ensure() before the new handle is adopted.
// NOTE(review): ensure() is expected to be fatal on an error result — confirm.
void FullNodeShardImpl::got_next_block(td::Result R) {
  if (R.is_error()) {
    const auto code = R.error().code();
    if (code == ErrorCode::timeout || code == ErrorCode::notready) {
      get_next_block();
      return;
    }
  }
  attempt_ = 0;
  R.ensure();
  const auto prev_seqno = handle_->id().id.seqno;
  handle_ = R.move_as_ok();
  // The adopted block must directly follow the previous one.
  CHECK(handle_->id().id.seqno == prev_seqno + 1);
  if (promise_) {
    // A block whose unix time is within the last 300 seconds completes promise_;
    // otherwise sync_completed_at_ is set 60 seconds ahead.
    const bool is_recent = handle_->unix_time() > td::Clocks::system() - 300;
    if (is_recent) {
      promise_.set_value(td::Unit());
    } else {
      sync_completed_at_ = td::Timestamp::in(60.0);
    }
  }
  get_next_block();
}
void FullNodeShardImpl::get_next_block() {
  //return;
  attempt_++;
  auto P = td::PromiseCreator::lambda([validator_manager = validator_manager_, attempt = attempt_,
                                       block_id = handle_->id(), SelfId = actor_id(this)](td::Result R) {
    if (R.is_ok()) {
      auto P = td::PromiseCreator::lambda([SelfId](td::Result R) {
        td::actor::send_closure(SelfId, &FullNodeShardImpl::got_next_block, std::move(R));
      });
      td::actor::send_closure(validator_manager, &ValidatorManagerInterface::validate_block, R.move_as_ok(),
                              std::move(P));
    } else {
      auto S = R.move_as_error();
      if (S.code() != ErrorCode::notready && S.code() != ErrorCode::timeout) {
        VLOG(FULL_NODE_WARNING) << "failed to download next block after " << block_id << ": " << S;
      } else {
        if ((attempt % 128) == 0) {
          VLOG(FULL_NODE_INFO) << "failed to download next block after " << block_id << ": " << S;
        } else {
          VLOG(FULL_NODE_DEBUG) << "failed to download next block after " << block_id << ": " << S;
        }
      }
      delay_action([SelfId]() mutable { td::actor::send_closure(SelfId, &FullNodeShardImpl::get_next_block); },
                   td::Timestamp::in(0.1));
    }
  });
  try_get_next_block(td::Timestamp::in(2.0), std::move(P));
}
// Handles tonNode.getNextBlockDescription: reports the block that follows
// `query.prev_block_`. Only masterchain blocks are accepted.
// NOTE(review): the TL reply types given to create_serialize_tl_object and the cast target
// type are template arguments not visible in this view.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getNextBlockDescription &query,
                                      td::Promise promise) {
  // Reject anything that is not the masterchain / full shard.
  if (query.prev_block_->workchain_ != masterchainId || static_cast(query.prev_block_->shard_) != shardIdAll) {
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "next block allowed only for masterchain"));
    return;
  }
  // The peer always gets a value, never an error: a failed lookup or a block that is not
  // yet received / has no inited proof yields a reply built without arguments, a usable
  // block yields a reply built from its id.
  auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result R) mutable {
    if (R.is_error()) {
      auto x = create_serialize_tl_object();
      promise.set_value(std::move(x));
    } else {
      auto B = R.move_as_ok();
      if (!B->received() || !B->inited_proof()) {
        auto x = create_serialize_tl_object();
        promise.set_value(std::move(x));
      } else {
        auto x = create_serialize_tl_object(create_tl_block_id(B->id()));
        promise.set_value(std::move(x));
      }
    }
  });
  BlockIdExt block_id = create_block_id(query.prev_block_);
  VLOG(FULL_NODE_DEBUG) << "Got query getNextBlockDescription " << block_id.to_str() << " from " << src;
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_next_block, block_id, std::move(P));
}
// Handles tonNode.prepareBlock: looks up the handle of `query.block_` and answers
// according to whether the lookup succeeded and whether the block has been received.
// NOTE(review): each branch serializes its own TL reply type; those template arguments
// are not visible in this view, so the three calls only look identical here.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_prepareBlock &query,
                                      td::Promise promise) {
  auto P = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result R) mutable {
    if (R.is_error()) {
      // Handle lookup failed.
      auto x = create_serialize_tl_object();
      promise.set_value(std::move(x));
    } else {
      auto B = R.move_as_ok();
      if (!B->received()) {
        // Handle exists but the block has not been received.
        auto x = create_serialize_tl_object();
        promise.set_value(std::move(x));
      } else {
        // Block has been received.
        auto x = create_serialize_tl_object();
        promise.set_value(std::move(x));
      }
    }
  });
  BlockIdExt block_id = create_block_id(query.block_);
  VLOG(FULL_NODE_DEBUG) << "Got query prepareBlock " << block_id.to_str() << " from " << src;
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_block_handle, block_id, false,
                          std::move(P));
}
// Handles tonNode.downloadBlock: serves the data of `query.block_`. The request is
// answered with a protoviolation "unknown block" error when the handle lookup fails or
// the block has not been received; otherwise the promise is passed on to get_block_data.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_downloadBlock &query,
                                      td::Promise promise) {
  BlockIdExt block_id = create_block_id(query.block_);
  VLOG(FULL_NODE_DEBUG) << "Got query downloadBlock " << block_id.to_str() << " from " << src;
  auto on_handle = td::PromiseCreator::lambda([validator_manager = validator_manager_,
                                               promise = std::move(promise)](td::Result R) mutable {
    if (R.is_ok()) {
      auto handle = R.move_as_ok();
      if (handle->received()) {
        td::actor::send_closure(validator_manager, &ValidatorManagerInterface::get_block_data, handle,
                                std::move(promise));
        return;
      }
    }
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "unknown block"));
  });
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_block_handle, block_id, false,
                          std::move(on_handle));
}
// Handles tonNode.downloadBlockFull: spawns a detached "sender" actor for `query.block_`
// that owns the promise from here on. The boolean argument is `false` in this handler
// (the downloadNextBlockFull handler passes `true`).
// NOTE(review): the actor type is a template argument not visible in this view.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_downloadBlockFull &query,
                                      td::Promise promise) {
  BlockIdExt block_id = create_block_id(query.block_);
  VLOG(FULL_NODE_DEBUG) << "Got query downloadBlockFull " << block_id.to_str() << " from " << src;
  td::actor::create_actor("sender", block_id, false, validator_manager_, std::move(promise)).release();
}
// Handles tonNode.downloadNextBlockFull: spawns a detached "sender" actor keyed by
// `query.prev_block_` that owns the promise from here on. The boolean argument is `true`
// in this handler (the downloadBlockFull handler passes `false`).
// NOTE(review): the actor type is a template argument not visible in this view.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_downloadNextBlockFull &query,
                                      td::Promise promise) {
  BlockIdExt block_id = create_block_id(query.prev_block_);
  VLOG(FULL_NODE_DEBUG) << "Got query downloadNextBlockFull " << block_id.to_str() << " from " << src;
  td::actor::create_actor("sender", block_id, true, validator_manager_, std::move(promise)).release();
}
// Handles tonNode.prepareBlockProof: tells the peer which kind of proof, if any, is
// available for `query.block_`. Requests for seqno 0 are rejected.
// NOTE(review): each branch serializes its own TL reply type; those template arguments
// are not visible in this view, so the calls only look identical here.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_prepareBlockProof &query,
                                      td::Promise promise) {
  if (query.block_->seqno_ == 0) {
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "cannot download proof for zero state"));
    return;
  }
  // `validator_manager` is captured but not used inside the continuation.
  auto P = td::PromiseCreator::lambda([allow_partial = query.allow_partial_, promise = std::move(promise),
                                       validator_manager = validator_manager_](td::Result R) mutable {
    if (R.is_error()) {
      // Handle lookup failed.
      auto x = create_serialize_tl_object();
      promise.set_value(std::move(x));
      return;
    } else {
      auto handle = R.move_as_ok();
      // Nothing usable: no full proof, and either partial answers are not allowed or no
      // proof link is available.
      if (!handle || (!handle->inited_proof() && (!allow_partial || !handle->inited_proof_link()))) {
        auto x = create_serialize_tl_object();
        promise.set_value(std::move(x));
        return;
      }
      // A full proof of a masterchain block gets one reply, every other case another.
      if (handle->inited_proof() && handle->id().is_masterchain()) {
        auto x = create_serialize_tl_object();
        promise.set_value(std::move(x));
      } else {
        auto x = create_serialize_tl_object();
        promise.set_value(std::move(x));
      }
    }
  });
  BlockIdExt block_id = create_block_id(query.block_);
  VLOG(FULL_NODE_DEBUG) << "Got query prepareBlockProof " << block_id.to_str() << " from " << src;
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_block_handle, block_id, false,
                          std::move(P));
}
// Handles tonNode.prepareKeyBlockProof: checks whether a key block proof (or, with
// `allow_partial_`, a proof link) can be obtained for `query.block_`. Requests for seqno 0
// are rejected.
// NOTE(review): each branch serializes its own TL reply type; those template arguments
// are not visible in this view, so the calls only look identical here.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_prepareKeyBlockProof &query,
                                      td::Promise promise) {
  if (query.block_->seqno_ == 0) {
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "cannot download proof for zero state"));
    return;
  }
  // The fetched data itself is not returned; only the outcome selects the reply.
  auto P = td::PromiseCreator::lambda(
      [allow_partial = query.allow_partial_, promise = std::move(promise)](td::Result R) mutable {
        if (R.is_error()) {
          auto x = create_serialize_tl_object();
          promise.set_value(std::move(x));
        } else if (allow_partial) {
          auto x = create_serialize_tl_object();
          promise.set_value(std::move(x));
        } else {
          auto x = create_serialize_tl_object();
          promise.set_value(std::move(x));
        }
      });
  BlockIdExt block_id = create_block_id(query.block_);
  VLOG(FULL_NODE_DEBUG) << "Got query prepareKeyBlockProof " << block_id.to_str() << " " << query.allow_partial_
                        << " from " << src;
  // Partial requests are served from the proof link, others from the full proof.
  if (query.allow_partial_) {
    td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_key_block_proof_link, block_id,
                            std::move(P));
  } else {
    td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_key_block_proof, block_id,
                            std::move(P));
  }
}
// Handles tonNode.downloadBlockProof: serves the proof of `query.block_`. A failed handle
// lookup, a null handle or a handle without an inited proof is answered with a
// protoviolation "unknown block proof" error; otherwise the promise is passed on to
// get_block_proof.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_downloadBlockProof &query,
                                      td::Promise promise) {
  BlockIdExt block_id = create_block_id(query.block_);
  VLOG(FULL_NODE_DEBUG) << "Got query downloadBlockProof " << block_id.to_str() << " from " << src;
  auto on_handle = td::PromiseCreator::lambda(
      [promise = std::move(promise), validator_manager = validator_manager_](td::Result R) mutable {
        if (R.is_ok()) {
          auto handle = R.move_as_ok();
          if (handle && handle->inited_proof()) {
            td::actor::send_closure(validator_manager, &ValidatorManagerInterface::get_block_proof, handle,
                                    std::move(promise));
            return;
          }
        }
        promise.set_error(td::Status::Error(ErrorCode::protoviolation, "unknown block proof"));
      });
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_block_handle, block_id, false,
                          std::move(on_handle));
}
// Handles tonNode.downloadBlockProofLink: serves the proof link of `query.block_`. A
// failed handle lookup, a null handle or a handle without an inited proof link is
// answered with a protoviolation "unknown block proof" error; otherwise the promise is
// passed on to get_block_proof_link.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_downloadBlockProofLink &query,
                                      td::Promise promise) {
  BlockIdExt block_id = create_block_id(query.block_);
  VLOG(FULL_NODE_DEBUG) << "Got query downloadBlockProofLink " << block_id.to_str() << " from " << src;
  auto on_handle = td::PromiseCreator::lambda(
      [promise = std::move(promise), validator_manager = validator_manager_](td::Result R) mutable {
        if (R.is_ok()) {
          auto handle = R.move_as_ok();
          if (handle && handle->inited_proof_link()) {
            td::actor::send_closure(validator_manager, &ValidatorManagerInterface::get_block_proof_link, handle,
                                    std::move(promise));
            return;
          }
        }
        promise.set_error(td::Status::Error(ErrorCode::protoviolation, "unknown block proof"));
      });
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_block_handle, block_id, false,
                          std::move(on_handle));
}
// Handles tonNode.downloadKeyBlockProof: returns the key block proof of `query.block_`
// as produced by get_key_block_proof. Seqno 0 is rejected up front; any failure of the
// lookup is reported as a protoviolation "unknown block proof" error.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_downloadKeyBlockProof &query,
                                      td::Promise promise) {
  if (query.block_->seqno_ == 0) {
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "cannot download proof for zero state"));
    return;
  }
  BlockIdExt block_id = create_block_id(query.block_);
  VLOG(FULL_NODE_DEBUG) << "Got query downloadKeyBlockProof " << block_id.to_str() << " from " << src;
  auto on_proof = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result R) mutable {
    if (R.is_ok()) {
      promise.set_value(R.move_as_ok());
      return;
    }
    // The underlying error is replaced by a generic one.
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "unknown block proof"));
  });
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_key_block_proof, block_id,
                          std::move(on_proof));
}
// Handles tonNode.downloadKeyBlockProofLink: returns the key block proof link of
// `query.block_` as produced by get_key_block_proof_link. Seqno 0 is rejected up front;
// any failure of the lookup is reported as a protoviolation "unknown block proof" error.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_downloadKeyBlockProofLink &query,
                                      td::Promise promise) {
  if (query.block_->seqno_ == 0) {
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "cannot download proof for zero state"));
    return;
  }
  BlockIdExt block_id = create_block_id(query.block_);
  VLOG(FULL_NODE_DEBUG) << "Got query downloadKeyBlockProofLink " << block_id.to_str() << " from " << src;
  auto on_proof_link = td::PromiseCreator::lambda([promise = std::move(promise)](td::Result R) mutable {
    if (R.is_ok()) {
      promise.set_value(R.move_as_ok());
      return;
    }
    // The underlying error is replaced by a generic one.
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "unknown block proof"));
  });
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_key_block_proof_link, block_id,
                          std::move(on_proof_link));
}
// Handles tonNode.prepareZeroState: asks the validator manager whether the zero state of
// `query.block_` exists and answers with one of two replies.
// NOTE(review): the two TL reply types are template arguments not visible in this view,
// so the two create_serialize_tl_object calls only look identical here.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_prepareZeroState &query,
                                      td::Promise promise) {
  // `SelfId` is captured but not used inside the continuation.
  auto P =
      td::PromiseCreator::lambda([SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable {
        // A failed check and a negative result are answered the same way.
        if (R.is_error() || !R.move_as_ok()) {
          auto x = create_serialize_tl_object();
          promise.set_value(std::move(x));
          return;
        }
        auto x = create_serialize_tl_object();
        promise.set_value(std::move(x));
      });
  auto block_id = create_block_id(query.block_);
  VLOG(FULL_NODE_DEBUG) << "Got query prepareZeroState " << block_id.to_str() << " from " << src;
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::check_zero_state_exists, block_id,
                          std::move(P));
}
// Handles tonNode.preparePersistentState: asks the validator manager whether the
// persistent state identified by `query.block_` and `query.masterchain_block_` exists and
// answers with one of two replies.
// NOTE(review): the two TL reply types are template arguments not visible in this view,
// so the two create_serialize_tl_object calls only look identical here.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_preparePersistentState &query,
                                      td::Promise promise) {
  // `SelfId` is captured but not used inside the continuation.
  auto P =
      td::PromiseCreator::lambda([SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable {
        // A failed check and a negative result are answered the same way.
        if (R.is_error() || !R.move_as_ok()) {
          auto x = create_serialize_tl_object();
          promise.set_value(std::move(x));
          return;
        }
        auto x = create_serialize_tl_object();
        promise.set_value(std::move(x));
      });
  auto block_id = create_block_id(query.block_);
  auto masterchain_block_id = create_block_id(query.masterchain_block_);
  VLOG(FULL_NODE_DEBUG) << "Got query preparePersistentState " << block_id.to_str() << " "
                        << masterchain_block_id.to_str() << " from " << src;
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::check_persistent_state_exists, block_id,
                          masterchain_block_id, std::move(P));
}
// Handles tonNode.getNextKeyBlockIds: returns up to `cnt` key block ids following
// `query.block_`, where `cnt` is the requested size capped at 8.
// NOTE(review): the cast target, the vector element types and the TL reply type are
// template arguments not visible in this view.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getNextKeyBlockIds &query,
                                      td::Promise promise) {
  auto cnt = static_cast(query.max_size_);
  if (cnt > 8) {
    cnt = 8;
  }
  auto P =
      td::PromiseCreator::lambda([promise = std::move(promise), cnt](td::Result> R) mutable {
        if (R.is_error()) {
          // A failed lookup is logged and answered with an empty list and the flag pair
          // (false, true); the peer still receives a value rather than an error.
          // NOTE(review): presumably the flags are `incomplete` and `error` — confirm
          // against the TL scheme.
          LOG(WARNING) << "getnextkey: " << R.move_as_error();
          auto x = create_serialize_tl_object(
              std::vector>{}, false, true);
          promise.set_value(std::move(x));
          return;
        }
        auto res = R.move_as_ok();
        std::vector> v;
        for (auto &b : res) {
          v.emplace_back(create_tl_block_id(b));
        }
        // The first flag is set when fewer ids than requested were returned.
        auto x = create_serialize_tl_object(std::move(v), res.size() < cnt, false);
        promise.set_value(std::move(x));
      });
  auto block_id = create_block_id(query.block_);
  VLOG(FULL_NODE_DEBUG) << "Got query getNextKeyBlockIds " << block_id.to_str() << " " << cnt << " from " << src;
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_next_key_blocks, block_id, cnt,
                          std::move(P));
}
// Handles tonNode.downloadZeroState: returns the zero state of `query.block_` as
// delivered by get_zero_state. A failure is forwarded with the prefix
// "failed to get state from db: ".
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_downloadZeroState &query,
                                      td::Promise promise) {
  auto block_id = create_block_id(query.block_);
  VLOG(FULL_NODE_DEBUG) << "Got query downloadZeroState " << block_id.to_str() << " from " << src;
  auto on_state = td::PromiseCreator::lambda(
      [SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable {
        if (R.is_ok()) {
          promise.set_value(R.move_as_ok());
        } else {
          promise.set_error(R.move_as_error_prefix("failed to get state from db: "));
        }
      });
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_zero_state, block_id,
                          std::move(on_state));
}
// Handles tonNode.downloadPersistentState: returns the whole persistent state in one
// reply, provided it does not exceed 1 << 24 bytes. One byte more than the limit is
// requested from offset 0 so that an oversized state can be detected and refused.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_downloadPersistentState &query,
                                      td::Promise promise) {
  td::uint64 max_size = 1 << 24;
  auto block_id = create_block_id(query.block_);
  auto masterchain_block_id = create_block_id(query.masterchain_block_);
  VLOG(FULL_NODE_DEBUG) << "Got query downloadPersistentState " << block_id.to_str() << " "
                        << masterchain_block_id.to_str() << " from " << src;
  auto on_slice = td::PromiseCreator::lambda(
      [SelfId = actor_id(this), promise = std::move(promise), max_size](td::Result R) mutable {
        if (R.is_error()) {
          promise.set_error(R.move_as_error_prefix("failed to get state from db: "));
          return;
        }
        td::BufferSlice s = R.move_as_ok();
        if (s.size() <= max_size) {
          promise.set_value(std::move(s));
        } else {
          promise.set_error(td::Status::Error("state is too big"));
        }
      });
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_persistent_state_slice, block_id,
                          masterchain_block_id, 0, max_size + 1, std::move(on_slice));
}
// Handles tonNode.downloadPersistentStateSlice: returns the requested slice of a
// persistent state. `max_size_` must lie in [0, 1 << 24], otherwise the request is
// refused as a protocol violation. A lookup failure is forwarded with the prefix
// "failed to get state from db: ".
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_downloadPersistentStateSlice &query,
                                      td::Promise promise) {
  auto block_id = create_block_id(query.block_);
  auto masterchain_block_id = create_block_id(query.masterchain_block_);
  VLOG(FULL_NODE_DEBUG) << "Got query downloadPersistentStateSlice " << block_id.to_str() << " "
                        << masterchain_block_id.to_str() << " " << query.offset_ << " " << query.max_size_ << " from "
                        << src;
  const bool size_ok = query.max_size_ >= 0 && query.max_size_ <= (1 << 24);
  if (!size_ok) {
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "invalid max_size"));
    return;
  }
  auto on_slice = td::PromiseCreator::lambda(
      [SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable {
        if (R.is_ok()) {
          promise.set_value(R.move_as_ok());
        } else {
          promise.set_error(R.move_as_error_prefix("failed to get state from db: "));
        }
      });
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_persistent_state_slice, block_id,
                          masterchain_block_id, query.offset_, query.max_size_, std::move(on_slice));
}
// Handles tonNode.getCapabilities: answers immediately with the local protocol version
// and capability mask.
// NOTE(review): the TL reply type is a template argument not visible in this view.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getCapabilities &query,
                                      td::Promise promise) {
  VLOG(FULL_NODE_DEBUG) << "Got query getCapabilities from " << src;
  promise.set_value(create_serialize_tl_object(proto_version(), proto_capabilities()));
}
// Handles tonNode.getCapabilitiesV2: answers immediately with the local protocol version,
// the capability mask and a flag telling whether this shard is in active mode.
// NOTE(review): the TL reply type is a template argument not visible in this view.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getCapabilitiesV2 &query,
                                      td::Promise promise) {
  VLOG(FULL_NODE_DEBUG) << "Got query getCapabilitiesV2 from " << src;
  promise.set_value(create_serialize_tl_object(proto_version(), proto_capabilities(),
                                                                                mode_ == FullNodeShardMode::active));
}
// Handles tonNode.getArchiveInfo: looks up the archive id for `query.masterchain_seqno_`.
// The peer always receives a value: a reply built without arguments when the lookup
// fails, a reply wrapping the lookup result otherwise.
// NOTE(review): the two TL reply types are template arguments not visible in this view.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getArchiveInfo &query,
                                      td::Promise promise) {
  // `SelfId` is captured but not used inside the continuation.
  auto P = td::PromiseCreator::lambda(
      [SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable {
        if (R.is_error()) {
          promise.set_value(create_serialize_tl_object());
        } else {
          promise.set_value(create_serialize_tl_object(R.move_as_ok()));
        }
      });
  VLOG(FULL_NODE_DEBUG) << "Got query getArchiveInfo " << query.masterchain_seqno_ << " from " << src;
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_archive_id, query.masterchain_seqno_,
                          std::move(P));
}
// Handles tonNode.getArchiveSlice: forwards the slice request to the validator manager,
// which completes the promise directly. `max_size_` must lie in [0, 1 << 24], otherwise
// the request is refused as a protocol violation.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getArchiveSlice &query,
                                      td::Promise promise) {
  VLOG(FULL_NODE_DEBUG) << "Got query getArchiveSlice " << query.archive_id_ << " " << query.offset_ << " "
                        << query.max_size_ << " from " << src;
  const bool size_ok = query.max_size_ >= 0 && query.max_size_ <= (1 << 24);
  if (!size_ok) {
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "invalid max_size"));
    return;
  }
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_archive_slice, query.archive_id_,
                          query.offset_, query.max_size_, std::move(promise));
}
// Handles tonNode.getOutMsgQueueProof: builds an outbound message queue proof for
// `query.block_id_` towards the destination shard given by `dst_workchain_`/`dst_shard_`.
// Invalid block ids or shards are rejected with an error; once the builder actor runs,
// the peer always receives a value.
// NOTE(review): the actor type, the result type and the TL reply type are template
// arguments not visible in this view.
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query,
                                      td::Promise promise) {
  BlockIdExt block_id = create_block_id(query.block_id_);
  ShardIdFull dst_shard(query.dst_workchain_, query.dst_shard_);
  if (!block_id.is_valid_ext()) {
    promise.set_error(td::Status::Error("invalid block_id"));
    return;
  }
  if (!dst_shard.is_valid_ext()) {
    promise.set_error(td::Status::Error("invalid shard"));
    return;
  }
  auto P = td::PromiseCreator::lambda(
      [promise = std::move(promise)](td::Result> R) mutable {
        if (R.is_error()) {
          // Builder failed: answer with a reply built without arguments.
          promise.set_result(create_serialize_tl_object());
        } else {
          promise.set_result(serialize_tl_object(R.move_as_ok(), true));
        }
      });
  VLOG(FULL_NODE_DEBUG) << "Got query getOutMsgQueueProof " << block_id.to_str() << " " << dst_shard.to_str()
                        << " from " << src;
  // The detached "buildqueueproof" actor owns the continuation from here on.
  td::actor::create_actor("buildqueueproof", block_id, dst_shard, validator_manager_,
                                                 std::move(P))
      .release();
}
// Entry point for queries arriving over the overlay: parses the payload and dispatches
// it to the process_query overload matching its concrete TL type.
// NOTE(review): the TL types given to create_serialize_tl_object and fetch_tl_object are
// template arguments not visible in this view.
void FullNodeShardImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query,
                                      td::Promise promise) {
  if (!is_active()) {
    // An inactive shard does not serve queries: it sends a message back to the sender
    // over the overlay and fails the query.
    td::actor::send_closure(overlays_, &overlay::Overlays::send_message, src, adnl_id_, overlay_id_,
                            create_serialize_tl_object());
    promise.set_error(td::Status::Error("shard is inactive"));
    return;
  }
  auto B = fetch_tl_object(std::move(query), true);
  if (B.is_error()) {
    promise.set_error(td::Status::Error(ErrorCode::protoviolation, "cannot parse tonnode query"));
    return;
  }
  // The parsed object is a temporary that lives until the end of this statement, which
  // covers the synchronous process_query call.
  ton_api::downcast_call(*B.move_as_ok().get(), [&](auto &obj) { this->process_query(src, obj, std::move(promise)); });
}
// Entry point for plain overlay messages. A payload that parses successfully is treated
// as a forgetPeer notice from `src`: the sender is removed from neighbours_ and the
// overlay manager is asked to forget the peer. Unparseable payloads are ignored.
// NOTE(review): the TL type given to fetch_tl_object is a template argument not visible
// in this view.
void FullNodeShardImpl::receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data) {
  auto B = fetch_tl_object(std::move(data), true);
  if (B.is_error()) {
    return;
  }
  VLOG(FULL_NODE_DEBUG) << "Got tonNode.forgetPeer from " << src;
  neighbours_.erase(src);
  td::actor::send_closure(overlays_, &overlay::Overlays::forget_peer, adnl_id_, overlay_id_, src);
}
// Handles an IHR message broadcast: moves the message payload out of `query` and passes
// it to the validator manager's new_ihr_message. `src` is not used.
void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_ihrMessageBroadcast &query) {
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_ihr_message,
                          std::move(query.message_->data_));
}
// Handles a received external message broadcast: the message payload is moved out of the TL object
// and handed to the validator manager. The sender `src` is not used.
void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query) {
  auto &payload = query.message_->data_;
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_external_message, std::move(payload));
}
// Handles a received new-shard-block broadcast: converts the TL block id and forwards it, together
// with the catchain seqno and the (moved) block data, to the validator manager. The sender `src`
// is not used.
void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) {
  auto &shard_block = *query.block_;
  auto block_id = create_block_id(shard_block.block_);
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, block_id,
                          shard_block.cc_seqno_, std::move(shard_block.data_));
}
// Handles a received block broadcast.
//
// Converts the TL object into a BlockBroadcast (block id, signatures, catchain seqno, validator
// set hash, data and proof; buffers are moved out of `query`) and sends it to the validator
// manager for prevalidation. A failed prevalidation is only logged. The sender `src` is not used.
void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) {
  BlockIdExt block_id = create_block_id(query.id_);
  // The shard-ancestry check below is currently disabled (commented out).
  //if (!shard_is_ancestor(shard_, block_id.shard_full())) {
  //  LOG(FULL_NODE_WARNING) << "dropping block broadcast: shard mismatch. overlay=" << shard_.to_str()
  //                         << " block=" << block_id.to_str();
  //  return;
  //}
  std::vector signatures;
  for (auto &sig : query.signatures_) {
    signatures.emplace_back(BlockSignature{sig->who_, std::move(sig->signature_)});
  }
  BlockBroadcast B{block_id,
                   std::move(signatures),
                   static_cast(query.catchain_seqno_),
                   static_cast(query.validator_set_hash_),
                   std::move(query.data_),
                   std::move(query.proof_)};
  auto P = td::PromiseCreator::lambda([](td::Result R) {
    if (R.is_error()) {
      // "notready" errors are logged at DEBUG; every other failure is logged at INFO.
      if (R.error().code() == ErrorCode::notready) {
        LOG(DEBUG) << "dropped broadcast: " << R.move_as_error();
      } else {
        LOG(INFO) << "dropped broadcast: " << R.move_as_error();
      }
    }
  });
  td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, std::move(B),
                          std::move(P));
}
// Entry point for a broadcast received from `src`.
//
// Broadcasts are ignored while the shard is not active, and dropped silently when they fail to
// parse as a TL object. Otherwise the object is dispatched to the process_broadcast() overload
// matching its concrete type.
void FullNodeShardImpl::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
  if (!is_active()) {
    return;
  }
  auto B = fetch_tl_object(std::move(broadcast), true);
  if (B.is_error()) {
    return;
  }
  ton_api::downcast_call(*B.move_as_ok().get(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); });
}
// Broadcasts an IHR message to the overlay.
//
// Must not be called when an external client (client_) is configured: that path hits UNREACHABLE().
// The serialized broadcast is sent as a simple broadcast when it fits into
// max_simple_broadcast_size(), and as a FEC broadcast otherwise; both are sent with flags 0.
void FullNodeShardImpl::send_ihr_message(td::BufferSlice data) {
  if (!client_.empty()) {
    UNREACHABLE();
    return;
  }
  auto B = create_serialize_tl_object(
      create_tl_object(std::move(data)));
  if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
    td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, adnl_id_, overlay_id_, local_id_, 0,
                            std::move(B));
  } else {
    td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, adnl_id_, overlay_id_, local_id_, 0,
                            std::move(B));
  }
}
// Distributes an external message.
//
// Does nothing when config_.ext_messages_broadcast_disabled_ is set. When an external client
// (client_) is configured, the message is sent as a query through that client with a 1 second
// timeout and a failure is only logged. Otherwise the message is broadcast to the overlay: as a
// simple broadcast when it fits into max_simple_broadcast_size(), as a FEC broadcast otherwise.
void FullNodeShardImpl::send_external_message(td::BufferSlice data) {
  if (config_.ext_messages_broadcast_disabled_) {
    return;
  }
  if (!client_.empty()) {
    td::actor::send_closure(client_, &adnl::AdnlExtClient::send_query, "send_ext_query",
                            create_serialize_tl_object_suffix(
                                create_serialize_tl_object(
                                    create_tl_object(std::move(data)))),
                            td::Timestamp::in(1.0), [](td::Result R) {
                              if (R.is_error()) {
                                VLOG(FULL_NODE_WARNING) << "failed to send ext message: " << R.move_as_error();
                              }
                            });
    return;
  }
  auto B = create_serialize_tl_object(
      create_tl_object(std::move(data)));
  if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
    td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, adnl_id_, overlay_id_, local_id_, 0,
                            std::move(B));
  } else {
    td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, adnl_id_, overlay_id_, local_id_, 0,
                            std::move(B));
  }
}
// Broadcasts information about a new shard block (block id, catchain seqno, data) to the overlay.
//
// Must not be called when an external client (client_) is configured: that path hits UNREACHABLE().
// The serialized broadcast is sent as a simple broadcast when it fits into
// max_simple_broadcast_size(), and as a FEC broadcast otherwise.
void FullNodeShardImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) {
  if (!client_.empty()) {
    UNREACHABLE();
    return;
  }
  auto B = create_serialize_tl_object(
      create_tl_object(create_tl_block_id(block_id), cc_seqno, std::move(data)));
  if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
    td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, adnl_id_, overlay_id_, local_id_, 0,
                            std::move(B));
  } else {
    // NOTE(review): only the FEC path passes BroadcastFlagAnySender(); the simple path above
    // passes flags 0 - confirm this asymmetry is intentional.
    td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, adnl_id_, overlay_id_, local_id_,
                            overlay::Overlays::BroadcastFlagAnySender(), std::move(B));
  }
}
// Broadcasts a full block (id, catchain seqno, validator set hash, signatures, proof, data) to the
// overlay.
//
// Must not be called when an external client (client_) is configured: that path hits UNREACHABLE().
// Signature, proof and data buffers are cloned rather than moved out of `broadcast`. The result is
// always sent as a FEC broadcast with BroadcastFlagAnySender(); there is no simple-broadcast path.
void FullNodeShardImpl::send_broadcast(BlockBroadcast broadcast) {
  if (!client_.empty()) {
    UNREACHABLE();
    return;
  }
  std::vector> sigs;
  for (auto &sig : broadcast.signatures) {
    sigs.emplace_back(create_tl_object(sig.node, sig.signature.clone()));
  }
  auto B = create_serialize_tl_object(
      create_tl_block_id(broadcast.block_id), broadcast.catchain_seqno, broadcast.validator_set_hash, std::move(sigs),
      broadcast.proof.clone(), broadcast.data.clone());
  td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, adnl_id_, overlay_id_, local_id_,
                          overlay::Overlays::BroadcastFlagAnySender(), std::move(B));
}
// Starts a download of block `id` from a neighbour picked by choose_neighbour().
//
// When a real neighbour was chosen (non-zero ADNL id) and it reports proto_version >= 1, the "new"
// download actor is created; otherwise the "old" one is used. Both receive the same arguments, and
// `promise` is wrapped by create_neighbour_promise() for the chosen neighbour. The created actor's
// owning handle is released immediately.
void FullNodeShardImpl::download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
                                       td::Promise promise) {
  auto &b = choose_neighbour();
  if (!b.adnl_id.is_zero() && b.proto_version >= 1) {
    VLOG(FULL_NODE_DEBUG) << "new block download";
    td::actor::create_actor("downloadreq", id, adnl_id_, overlay_id_, b.adnl_id, priority, timeout,
                                              validator_manager_, rldp_, overlays_, adnl_, client_,
                                              create_neighbour_promise(b, std::move(promise)))
        .release();
  } else {
    VLOG(FULL_NODE_DEBUG) << "old block download";
    td::actor::create_actor("downloadreq", id, adnl_id_, overlay_id_, b.adnl_id, priority, timeout,
                                           validator_manager_, rldp_, overlays_, adnl_, client_,
                                           create_neighbour_promise(b, std::move(promise)))
        .release();
  }
}
// Starts a download of the zero state `id`.
//
// Unlike the other download helpers, no neighbour is chosen here: the actor is created with a
// default BlockIdExt{} and AdnlNodeIdShort::zero(), and `promise` is passed through unwrapped.
// The created actor's owning handle is released immediately.
void FullNodeShardImpl::download_zero_state(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
                                            td::Promise promise) {
  td::actor::create_actor(PSTRING() << "downloadstatereq" << id.id.to_str(), id, BlockIdExt{}, adnl_id_,
                                         overlay_id_, adnl::AdnlNodeIdShort::zero(), priority, timeout,
                                         validator_manager_, rldp_, overlays_, adnl_, client_, std::move(promise))
      .release();
}
// Starts a download of the persistent state `id` (relative to `masterchain_block_id`) from a
// neighbour picked by choose_neighbour().
//
// The transport is rldp2_ when the neighbour reports use_rldp2(), otherwise rldp_. `promise` is
// passed through unwrapped (no create_neighbour_promise()). The created actor's owning handle is
// released immediately.
void FullNodeShardImpl::download_persistent_state(BlockIdExt id, BlockIdExt masterchain_block_id, td::uint32 priority,
                                                  td::Timestamp timeout, td::Promise promise) {
  auto &b = choose_neighbour();
  td::actor::create_actor(PSTRING() << "downloadstatereq" << id.id.to_str(), id, masterchain_block_id,
                                         adnl_id_, overlay_id_, b.adnl_id, priority, timeout, validator_manager_,
                                         b.use_rldp2() ? (td::actor::ActorId)rldp2_ : rldp_,
                                         overlays_, adnl_, client_, std::move(promise))
      .release();
}
// Starts a download of the proof for `block_id` from a neighbour picked by choose_neighbour().
//
// The first boolean actor argument is `false` here and `true` in download_block_proof_link()
// (presumably an "allow/is proof link" flag - confirm against the actor's constructor).
// `promise` is wrapped by create_neighbour_promise() for the chosen neighbour.
void FullNodeShardImpl::download_block_proof(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
                                             td::Promise promise) {
  auto &b = choose_neighbour();
  td::actor::create_actor("downloadproofreq", block_id, false, false, adnl_id_, overlay_id_, b.adnl_id,
                                         priority, timeout, validator_manager_, rldp_, overlays_, adnl_, client_,
                                         create_neighbour_promise(b, std::move(promise)))
      .release();
}
// Starts a download of the proof link for `block_id` from a neighbour picked by choose_neighbour().
//
// Same as download_block_proof() except that the first boolean actor argument is `true`
// (presumably an "allow/is proof link" flag - confirm against the actor's constructor).
// `promise` is wrapped by create_neighbour_promise() for the chosen neighbour.
void FullNodeShardImpl::download_block_proof_link(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
                                                  td::Promise promise) {
  auto &b = choose_neighbour();
  td::actor::create_actor("downloadproofreq", block_id, true, false, adnl_id_, overlay_id_,
                                         b.adnl_id, priority, timeout, validator_manager_, rldp_,
                                         overlays_, adnl_, client_, create_neighbour_promise(b, std::move(promise)))
      .release();
}
// Requests the key blocks following `block_id` from a neighbour picked by choose_neighbour().
//
// The literals 16 and 1 are passed to the actor (presumably the maximum number of key blocks to
// fetch and the request priority - confirm against the actor's constructor). `promise` is wrapped
// by create_neighbour_promise() for the chosen neighbour.
void FullNodeShardImpl::get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout,
                                            td::Promise> promise) {
  auto &b = choose_neighbour();
  td::actor::create_actor("next", block_id, 16, adnl_id_, overlay_id_, b.adnl_id,
                                            1, timeout, validator_manager_, rldp_, overlays_, adnl_, client_,
                                            create_neighbour_promise(b, std::move(promise)))
      .release();
}
// Starts a download of the archive slice for `masterchain_seqno` into `tmp_dir`.
//
// The neighbour is picked with choose_neighbour(true), i.e. with the state requirement enabled.
// The transport is rldp2_ when the neighbour reports use_rldp2(), otherwise rldp_. `promise` is
// wrapped by create_neighbour_promise() for the chosen neighbour.
void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout,
                                         td::Promise promise) {
  auto &b = choose_neighbour(true);
  td::actor::create_actor(
      "archive", masterchain_seqno, std::move(tmp_dir), adnl_id_, overlay_id_, b.adnl_id, timeout, validator_manager_,
      b.use_rldp2() ? (td::actor::ActorId)rldp2_ : rldp_, overlays_, adnl_, client_,
      create_neighbour_promise(b, std::move(promise)))
      .release();
}
// Requests an outbound message queue proof for `block_id` / `dst_shard` from a neighbour.
//
// The neighbour is picked with choose_neighbour(true). When no neighbour is available (zero ADNL
// id) the promise fails with a notready "no nodes" error. Otherwise a single query is sent through
// the overlay via rldp_, and the answer is parsed: an "empty" answer fails the promise with
// "node doesn't have this block", a proof answer is converted with OutMsgQueueProof::fetch().
void FullNodeShardImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull dst_shard, td::Timestamp timeout,
                                                     td::Promise> promise) {
  // TODO: maybe more complex download (like other requests here)
  // TODO: estimate max size
  auto &b = choose_neighbour(true);
  if (b.adnl_id == adnl::AdnlNodeIdShort::zero()) {
    promise.set_error(td::Status::Error(ErrorCode::notready, "no nodes"));
    return;
  }
  // `[=]` copies block_id and dst_shard into the callback; the caller's promise is wrapped by
  // create_neighbour_promise() for the chosen neighbour.
  auto P = td::PromiseCreator::lambda(
      [=, promise = create_neighbour_promise(b, std::move(promise), true)](td::Result R) mutable {
        if (R.is_error()) {
          promise.set_result(R.move_as_error());
          return;
        }
        TRY_RESULT_PROMISE(promise, f, fetch_tl_object(R.move_as_ok(), true));
        ton_api::downcast_call(*f, td::overloaded(
                                       [&](ton_api::tonNode_outMsgQueueProofEmpty &x) {
                                         promise.set_error(td::Status::Error("node doesn't have this block"));
                                       },
                                       [&](ton_api::tonNode_outMsgQueueProof &x) {
                                         promise.set_result(OutMsgQueueProof::fetch(block_id, dst_shard, x));
                                       }));
      });
  td::BufferSlice query = create_serialize_tl_object(
      create_tl_block_id(block_id), dst_shard.workchain, dst_shard.shard);
  // The `1 << 20` argument is presumably the maximum accepted answer size in bytes (see the TODO
  // above) - confirm against Overlays::send_query_via.
  td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, b.adnl_id, adnl_id_, overlay_id_,
                          "get_msg_queue", std::move(P), timeout, std::move(query), 1 << 20, rldp_);
}
// Stores the block handle to continue from and the promise to fulfil once syncing is considered
// complete, then starts fetching the next block.
//
// May only be called while no handle is set (CHECK). The sync-completion deadline is set to 60
// seconds from now; alarm() fulfils the stored promise once that deadline has passed.
void FullNodeShardImpl::set_handle(BlockHandle handle, td::Promise promise) {
  CHECK(!handle_);
  handle_ = std::move(handle);
  promise_ = std::move(promise);
  get_next_block();
  sync_completed_at_ = td::Timestamp::in(60.0);
  alarm_timestamp().relax(sync_completed_at_);
}
// Timer callback: runs every periodic task whose deadline has passed, reschedules it, and finally
// re-arms the actor alarm with all pending deadlines.
void FullNodeShardImpl::alarm() {
  // A deadline is due when it is set and already lies in the past.
  const auto is_due = [](const auto &deadline) { return deadline && deadline.is_in_past(); };

  // Sync completion: fulfil the stored promise (if any) once, then disarm this deadline.
  if (is_due(sync_completed_at_)) {
    if (promise_) {
      promise_.set_value(td::Unit());
    }
    sync_completed_at_ = td::Timestamp::never();
  }

  // Neighbour list refresh, repeated after a random 10-30 second delay.
  if (is_due(reload_neighbours_at_)) {
    reload_neighbours();
    reload_neighbours_at_ = td::Timestamp::in(td::Random::fast(10.0, 30.0));
  }

  // Neighbour pings, repeated after a random 0.5-1 second delay.
  if (is_due(ping_neighbours_at_)) {
    ping_neighbours();
    ping_neighbours_at_ = td::Timestamp::in(td::Random::fast(0.5, 1.0));
  }

  // Certificate renewal every 30 seconds while a signing key is set; otherwise stop renewing.
  if (is_due(update_certificate_at_)) {
    if (sign_cert_by_.is_zero()) {
      update_certificate_at_ = td::Timestamp::never();
    } else {
      sign_new_certificate(sign_cert_by_);
      update_certificate_at_ = td::Timestamp::in(30.0);
    }
  }

  auto &next_alarm = alarm_timestamp();
  next_alarm.relax(sync_completed_at_);
  next_alarm.relax(update_certificate_at_);
  next_alarm.relax(reload_neighbours_at_);
  next_alarm.relax(ping_neighbours_at_);
}
// Actor start-up hook.
//
// Only acts when no external client (client_) is configured. In that case the overlay id is
// derived from a hash of a TL object built from the workchain, shard and zero state file hash,
// default privacy rules are installed, the overlay is created, and both the neighbour reload and
// the neighbour ping are scheduled to run immediately.
void FullNodeShardImpl::start_up() {
  if (client_.empty()) {
    auto X = create_hash_tl_object(get_workchain(), get_shard(),
                                                                          zero_state_file_hash_);
    // The 32-byte hash is used as the full overlay id.
    td::BufferSlice b{32};
    b.as_slice().copy_from(as_slice(X));
    overlay_id_full_ = overlay::OverlayIdFull{std::move(b)};
    overlay_id_ = overlay_id_full_.compute_short_id();
    rules_ = overlay::OverlayPrivacyRules{overlay::Overlays::max_fec_broadcast_size()};
    create_overlay();
    reload_neighbours_at_ = td::Timestamp::now();
    ping_neighbours_at_ = td::Timestamp::now();
    alarm_timestamp().relax(reload_neighbours_at_);
    alarm_timestamp().relax(ping_neighbours_at_);
  }
}
void FullNodeShardImpl::tear_down() {
  td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, adnl_id_, overlay_id_);
}
// Creates a fresh overlay certificate for local_id_ and asks the keyring to sign it with `sign_by`.
//
// Does nothing when `sign_by` is zero. The certificate expires 3600 seconds after the current
// system time, allows broadcasts up to max_fec_broadcast_size(), and carries the Trusted and
// AllowFec flags. On a successful signature the signed certificate is delivered to
// signed_new_certificate(); a signing failure is only logged.
void FullNodeShardImpl::sign_new_certificate(PublicKeyHash sign_by) {
  if (sign_by.is_zero()) {
    return;
  }
  ton::overlay::Certificate cert{
      sign_by, static_cast(td::Clocks::system() + 3600), overlay::Overlays::max_fec_broadcast_size(),
      overlay::CertificateFlags::Trusted | overlay::CertificateFlags::AllowFec, td::BufferSlice{}};
  auto to_sign = cert.to_sign(overlay_id_, local_id_);
  auto P = td::PromiseCreator::lambda(
      [SelfId = actor_id(this), cert = std::move(cert)](td::Result> R) mutable {
        if (R.is_error()) {
          // ignore
          VLOG(FULL_NODE_WARNING) << "failed to create certificate: failed to sign: " << R.move_as_error();
        } else {
          // The result pair holds the signature (first) and the signer's public key (second).
          auto p = R.move_as_ok();
          cert.set_signature(std::move(p.first));
          cert.set_issuer(p.second);
          td::actor::send_closure(SelfId, &FullNodeShardImpl::signed_new_certificate, std::move(cert));
        }
      });
  td::actor::send_closure(keyring_, &ton::keyring::Keyring::sign_add_get_public_key, sign_by, std::move(to_sign),
                          std::move(P));
}
// Stores a freshly signed certificate in cert_ and registers it with the overlay for local_id_.
// Every update is logged at WARNING level.
void FullNodeShardImpl::signed_new_certificate(ton::overlay::Certificate cert) {
  LOG(WARNING) << "updated certificate";
  cert_ = std::make_shared(std::move(cert));
  td::actor::send_closure(overlays_, &overlay::Overlays::update_certificate, adnl_id_, overlay_id_, local_id_, cert_);
}
// Signs an overlay certificate for another key (`signed_key`) using this node's signing key
// (sign_cert_by_) and returns the serialized certificate through `promise`.
//
// Fails with "Node has no key with signing authority" when no signing key is set. The certificate
// uses the given `expire_at` and `max_size` and carries the Trusted and AllowFec flags. A signing
// failure is reported through `promise` with a "failed to create certificate: failed to sign: "
// prefix.
void FullNodeShardImpl::sign_overlay_certificate(PublicKeyHash signed_key, td::uint32 expire_at, td::uint32 max_size, td::Promise promise) {
  auto sign_by = sign_cert_by_;
  if (sign_by.is_zero()) {
    promise.set_error(td::Status::Error("Node has no key with signing authority"));
    return;
  }
  ton::overlay::Certificate cert{
      sign_by, static_cast(expire_at), max_size,
      overlay::CertificateFlags::Trusted | overlay::CertificateFlags::AllowFec, td::BufferSlice{}};
  auto to_sign = cert.to_sign(overlay_id_, signed_key);
  // NOTE(review): SelfId is captured but never used inside the callback.
  auto P = td::PromiseCreator::lambda(
      [SelfId = actor_id(this), expire_at = expire_at, max_size = max_size, promise = std::move(promise)](td::Result> R) mutable {
        if (R.is_error()) {
          promise.set_error(R.move_as_error_prefix("failed to create certificate: failed to sign: "));
        } else {
          // The result pair holds the signature (first) and the signer's public key (second).
          auto p = R.move_as_ok();
          auto c = ton::create_serialize_tl_object(p.second.tl(), static_cast(expire_at), max_size, std::move(p.first));
          promise.set_value(std::move(c));
        }
      });
  td::actor::send_closure(keyring_, &ton::keyring::Keyring::sign_add_get_public_key, sign_by, std::move(to_sign),
                          std::move(P));
}
// Registers an externally provided certificate `cert` for `signed_key` with the overlay.
// The promise is fulfilled right after the request is sent; it does not wait for, or report, the
// outcome of the overlay update.
void FullNodeShardImpl::import_overlay_certificate(PublicKeyHash signed_key, std::shared_ptr cert, td::Promise promise) {
  td::actor::send_closure(overlays_, &ton::overlay::Overlays::update_certificate,
                                     adnl_id_, overlay_id_, signed_key, cert);
  promise.set_value( td::Unit()  );
}
// Applies a new validator set to the overlay's privacy rules.
//
// No-op when an external client (client_) is configured. Every key in `public_key_hashes` is
// authorized to broadcast up to max_fec_broadcast_size(). `local_hash` becomes the new certificate
// signing key (sign_cert_by_); when it is non-zero and differs from the previous one, a new
// certificate is signed right away and the periodic renewal is scheduled in 30 seconds.
void FullNodeShardImpl::update_validators(std::vector public_key_hashes, PublicKeyHash local_hash) {
  if (!client_.empty()) {
    return;
  }
  bool update_cert = false;
  // Decide before sign_cert_by_ is overwritten below.
  if (!local_hash.is_zero() && local_hash != sign_cert_by_) {
    update_cert = true;
  }
  sign_cert_by_ = local_hash;
  std::map authorized_keys;
  for (auto &key : public_key_hashes) {
    authorized_keys.emplace(key, overlay::Overlays::max_fec_broadcast_size());
  }
  rules_ = overlay::OverlayPrivacyRules{overlay::Overlays::max_fec_broadcast_size(), overlay::CertificateFlags::AllowFec, std::move(authorized_keys)};
  td::actor::send_closure(overlays_, &overlay::Overlays::set_privacy_rules, adnl_id_, overlay_id_, rules_);
  if (update_cert) {
    sign_new_certificate(sign_cert_by_);
    update_certificate_at_ = td::Timestamp::in(30.0);
    alarm_timestamp().relax(update_certificate_at_);
  }
}
// Replaces the stored collator node list and registers it with the overlay as the set of priority
// broadcast receivers. No-op when an external client (client_) is configured.
void FullNodeShardImpl::update_collators(std::vector nodes) {
  if (!client_.empty()) {
    return;
  }
  collator_nodes_ = std::move(nodes);
  td::actor::send_closure(overlays_, &overlay::Overlays::set_priority_broadcast_receivers, adnl_id_, overlay_id_,
                          collator_nodes_);
}
// Asks the overlay for up to max_neighbours() random peers and forwards a non-empty answer to
// got_neighbours(). Errors and empty answers are ignored silently.
void FullNodeShardImpl::reload_neighbours() {
  auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) {
    if (R.is_error()) {
      return;
    }
    auto vec = R.move_as_ok();
    if (vec.size() == 0) {
      return;
    } else {
      td::actor::send_closure(SelfId, &FullNodeShardImpl::got_neighbours, std::move(vec));
    }
  });
  td::actor::send_closure(overlays_, &overlay::Overlays::get_overlay_random_peers, adnl_id_, overlay_id_,
                          max_neighbours(), std::move(P));
}
// Merges freshly discovered peers into neighbours_.
//
// Peers that are already neighbours are skipped. When the table is full (max_neighbours()), one
// existing neighbour is evicted first: the most unreliable one if its unreliability exceeds
// stop_unreliability(), otherwise a randomly chosen one. After a random eviction, processing stops
// once the current peer has been inserted, so at most one random replacement happens per call.
// Neighbours with state are protected from eviction while no more than
// min_neighbours_with_state() of them exist.
void FullNodeShardImpl::got_neighbours(std::vector vec) {
  bool ex = false;
  for (auto &el : vec) {
    auto it = neighbours_.find(el);
    if (it != neighbours_.end()) {
      continue;
    }
    if (neighbours_.size() == max_neighbours()) {
      td::uint32 neighbours_with_state = 0;
      for (const auto &n : neighbours_) {
        if (n.second.has_state) {
          ++neighbours_with_state;
        }
      }
      // a: most unreliable eviction candidate; b: uniformly random eviction candidate.
      adnl::AdnlNodeIdShort a = adnl::AdnlNodeIdShort::zero();
      adnl::AdnlNodeIdShort b = adnl::AdnlNodeIdShort::zero();
      td::uint32 cnt = 0;
      double u = 0;
      for (auto &n : neighbours_) {
        // Keep neighbours with state while they are scarce.
        if (neighbours_with_state <= min_neighbours_with_state() && n.second.has_state) {
          continue;
        }
        if (n.second.unreliability > u) {
          u = n.second.unreliability;
          a = n.first;
        }
        // The k-th candidate replaces the pick with probability 1/k (assuming Random::fast(0, k-1)
        // is uniform over an inclusive range), which yields a uniform choice among candidates.
        if (td::Random::fast(0, cnt++) == 0) {
          b = n.first;
        }
      }
      // NOTE(review): if every neighbour was skipped above, a and b are still zero, nothing is
      // erased (assuming the zero id is never a key), and the emplace below grows the table past
      // max_neighbours() - confirm whether this can happen in practice.
      if (u > stop_unreliability()) {
        neighbours_.erase(a);
      } else {
        neighbours_.erase(b);
        ex = true;
      }
    }
    neighbours_.emplace(el, Neighbour{el});
    if (ex) {
      break;
    }
  }
}
// Picks a neighbour at random, favouring reliable and up-to-date ones.
//
// Returns Neighbour::zero when there are no neighbours or none is eligible. Without
// `require_state` a single pass over all neighbours is made. With `require_state` there are up to
// two passes: the first considers only neighbours known to have state, the second only neighbours
// whose state status is not yet known.
//
// Each neighbour gets a penalty score: its unreliability, plus 4 when its protocol version is older
// than ours, or plus 2 when the version matches but its capabilities value is lower. Neighbours
// whose score exceeds fail_unreliability() are excluded; the others are weighted by
// 2^(fail_unreliability - score).
const Neighbour &FullNodeShardImpl::choose_neighbour(bool require_state) const {
  if (neighbours_.size() == 0) {
    return Neighbour::zero;
  }
  for (int attempt = 0; attempt < (require_state ? 2 : 1); ++attempt) {
    const Neighbour *best = nullptr;
    td::uint32 sum = 0;
    for (auto &x : neighbours_) {
      if (require_state) {
        if (attempt == 0 && !x.second.has_state) {
          continue;
        }
        if (attempt == 1 && x.second.has_state_known) {
          continue;
        }
      }
      auto unr = static_cast(x.second.unreliability);
      if (x.second.proto_version < proto_version()) {
        unr += 4;
      } else if (x.second.proto_version == proto_version() && x.second.capabilities < proto_capabilities()) {
        unr += 2;
      }
      auto f = static_cast(fail_unreliability());
      if (unr <= f) {
        auto w = 1 << (f - unr);
        sum += w;
        // Replace the current pick with probability w / sum, so each eligible neighbour ends up
        // chosen with probability proportional to its weight (assuming Random::fast(0, sum - 1)
        // is uniform over an inclusive range).
        if (td::Random::fast(0, sum - 1) <= w - 1) {
          best = &x.second;
        }
      }
    }
    if (best) {
      return *best;
    }
  }
  return Neighbour::zero;
}
// Records the outcome of a query to neighbour `adnl_id`: a success is reported together with the
// elapsed time `t`, a failure without it. Unknown neighbours are ignored.
void FullNodeShardImpl::update_neighbour_stats(adnl::AdnlNodeIdShort adnl_id, double t, bool success) {
  auto entry = neighbours_.find(adnl_id);
  if (entry == neighbours_.end()) {
    return;
  }
  auto &neighbour = entry->second;
  if (!success) {
    neighbour.query_failed();
    return;
  }
  neighbour.query_success(t);
}
// Handles the answer to a capabilities ping sent to neighbour `adnl_id`; `t` is the round-trip
// time measured by the caller.
//
// Unknown neighbours are ignored. An answer that fails to parse counts as a failed query;
// otherwise the neighbour's protocol version and capabilities are updated and the query is counted
// as a success.
void FullNodeShardImpl::got_neighbour_capabilities(adnl::AdnlNodeIdShort adnl_id, double t, td::BufferSlice data) {
  auto it = neighbours_.find(adnl_id);
  if (it == neighbours_.end()) {
    return;
  }
  auto F = fetch_tl_object(std::move(data), true);
  if (F.is_error()) {
    it->second.query_failed();
  } else {
    it->second.update_proto_version(*F.ok());
    it->second.query_success(t);
  }
}
// Sends a capabilities query to up to 6 neighbours per call, cycling through neighbours_ in key
// order and resuming from last_pinged_neighbour_.
//
// Each query has a 1 second timeout. A failed query is reported to update_neighbour_stats(), a
// successful answer is passed to got_neighbour_capabilities(); both receive the elapsed time.
void FullNodeShardImpl::ping_neighbours() {
  if (neighbours_.size() == 0) {
    return;
  }
  td::uint32 max_cnt = 6;
  if (max_cnt > neighbours_.size()) {
    max_cnt = td::narrow_cast(neighbours_.size());
  }
  // NOTE(review): lower_bound returns the first key that is not less than the argument, so if the
  // last pinged neighbour is still present it is pinged again first - confirm this is intended
  // (upper_bound would resume with the following neighbour instead).
  auto it = neighbours_.lower_bound(last_pinged_neighbour_);
  while (max_cnt > 0) {
    // Wrap around to the beginning of the table.
    if (it == neighbours_.end()) {
      it = neighbours_.begin();
    }
    auto P = td::PromiseCreator::lambda(
        [SelfId = actor_id(this), start_time = td::Time::now(), id = it->first](td::Result R) {
          if (R.is_error()) {
            td::actor::send_closure(SelfId, &FullNodeShardImpl::update_neighbour_stats, id,
                                    td::Time::now() - start_time, false);
          } else {
            td::actor::send_closure(SelfId, &FullNodeShardImpl::got_neighbour_capabilities, id,
                                    td::Time::now() - start_time, R.move_as_ok());
          }
        });
    // The query type depends on whether the neighbour supports protocol v2.
    td::BufferSlice q;
    if (it->second.supports_v2()) {
      q = create_serialize_tl_object();
    } else {
      q = create_serialize_tl_object();
    }
    // NOTE(review): the query is labelled "get_prepare_block" although its answer is treated as a
    // capabilities response - the label looks stale; confirm.
    td::actor::send_closure(overlays_, &overlay::Overlays::send_query, it->first, adnl_id_, overlay_id_,
                            "get_prepare_block", std::move(P), td::Timestamp::in(1.0), std::move(q));
    last_pinged_neighbour_ = it->first;
    it++;
    max_cnt--;
  }
}
// Constructs the shard actor state from its identity (shard, local key, ADNL id, zero state file
// hash), configuration and the ids of the actors it collaborates with.
// The masterchain shard is always created in active mode, regardless of `mode`.
FullNodeShardImpl::FullNodeShardImpl(ShardIdFull shard, PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id,
                                     FileHash zero_state_file_hash, FullNodeConfig config,
                                     td::actor::ActorId keyring, td::actor::ActorId adnl,
                                     td::actor::ActorId rldp, td::actor::ActorId rldp2,
                                     td::actor::ActorId overlays,
                                     td::actor::ActorId validator_manager,
                                     td::actor::ActorId client, FullNodeShardMode mode)
    : shard_(shard)
    , local_id_(local_id)
    , adnl_id_(adnl_id)
    , zero_state_file_hash_(zero_state_file_hash)
    , keyring_(keyring)
    , adnl_(adnl)
    , rldp_(rldp)
    , rldp2_(rldp2)
    , overlays_(overlays)
    , validator_manager_(validator_manager)
    , client_(client)
    , mode_(shard.is_masterchain() ? FullNodeShardMode::active : mode)
    , config_(config) {
}
// Factory: creates the shard actor (named "tonnode"), forwarding all arguments unchanged to its
// constructor, and returns the owning handle.
td::actor::ActorOwn FullNodeShard::create(
    ShardIdFull shard, PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash,
    FullNodeConfig config, td::actor::ActorId keyring, td::actor::ActorId adnl,
    td::actor::ActorId rldp, td::actor::ActorId rldp2,
    td::actor::ActorId overlays, td::actor::ActorId validator_manager,
    td::actor::ActorId client, FullNodeShardMode mode) {
  return td::actor::create_actor("tonnode", shard, local_id, adnl_id, zero_state_file_hash, config,
                                                    keyring, adnl, rldp, rldp2, overlays, validator_manager, client,
                                                    mode);
}
}  // namespace fullnode
}  // namespace validator
}  // namespace ton