mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
parent
5101b404a4
commit
8d7f1bba73
27 changed files with 617 additions and 477 deletions
|
@ -17,6 +17,7 @@
|
|||
Copyright 2017-2020 Telegram Systems LLP
|
||||
*/
|
||||
#include <set>
|
||||
#include <utility>
|
||||
#include "td/actor/PromiseFuture.h"
|
||||
#include "td/utils/Random.h"
|
||||
#include "td/db/RocksDb.h"
|
||||
|
@ -30,6 +31,25 @@ namespace ton {
|
|||
|
||||
namespace catchain {
|
||||
|
||||
static const td::uint32 MAX_NEIGHBOURS = 5;
|
||||
static const double EXPECTED_UNSAFE_INITIAL_SYNC_DURATION = 300.0;
|
||||
static const double EXPECTED_INITIAL_SYNC_DURATION = 5.0;
|
||||
static const td::uint32 OVERLAY_MAX_ALLOWED_PACKET_SIZE = 16 * 1024 * 1024;
|
||||
static const double NEIGHBOURS_ROTATE_INTERVAL_MIN = 60;
|
||||
static const double NEIGHBOURS_ROTATE_INTERVAL_MAX = 120;
|
||||
static const td::uint32 MAX_QUERY_BLOCKS = 100;
|
||||
static const td::uint32 MAX_QUERY_HEIGHT = 100;
|
||||
static const td::uint32 GET_DIFFERENCE_MAX_SEND = 100;
|
||||
static const double GET_DIFFERENCE_TIMEOUT = 5.0;
|
||||
static const double GET_BLOCK_TIMEOUT = 2.0;
|
||||
static const td::uint32 MAX_PENDING_DEPS = 16;
|
||||
static const double EXPECTED_INITIAL_SYNC_DURATION_WITH_UNPROCESSED = 60.0;
|
||||
static const double SYNC_INTERVAL_MIN = 0.1;
|
||||
static const double SYNC_INTERVAL_MAX = 0.2;
|
||||
static const td::uint32 SYNC_ITERATIONS = 3;
|
||||
static const double DESTROY_DB_DELAY = 1.0;
|
||||
static const td::uint32 DESTROY_DB_MAX_ATTEMPTS = 10;
|
||||
|
||||
PublicKeyHash CatChainReceiverImpl::get_source_hash(td::uint32 source_id) const {
|
||||
CHECK(source_id < sources_.size());
|
||||
return sources_[source_id]->get_hash();
|
||||
|
@ -45,18 +65,19 @@ void CatChainReceiverImpl::deliver_block(CatChainReceivedBlock *block) {
|
|||
<< " custom=" << block->is_custom();
|
||||
callback_->new_block(block->get_source_id(), block->get_fork_id(), block->get_hash(), block->get_height(),
|
||||
block->get_height() == 1 ? CatChainBlockHash::zero() : block->get_prev_hash(),
|
||||
block->get_dep_hashes(), block->get_deps(),
|
||||
block->get_dep_hashes(), block->get_vt(),
|
||||
block->is_custom() ? block->get_payload().clone() : td::SharedSlice());
|
||||
|
||||
std::vector<adnl::AdnlNodeIdShort> v;
|
||||
|
||||
for (auto it : neighbours_) {
|
||||
auto S = get_source(it);
|
||||
for (td::uint32 it : neighbours_) {
|
||||
CatChainReceiverSource *S = get_source(it);
|
||||
v.push_back(S->get_adnl_id());
|
||||
}
|
||||
|
||||
auto update = create_tl_object<ton_api::catchain_blockUpdate>(block->export_tl());
|
||||
auto D = serialize_tl_object(update, true, block->get_payload().as_slice());
|
||||
td::BufferSlice D = serialize_tl_object(update, true, block->get_payload().as_slice());
|
||||
CHECK(D.size() <= opts_.max_serialized_block_size);
|
||||
|
||||
td::actor::send_closure(overlay_manager_, &overlay::Overlays::send_multiple_messages, std::move(v),
|
||||
get_source(local_idx_)->get_adnl_id(), overlay_id_, std::move(D));
|
||||
|
@ -64,8 +85,8 @@ void CatChainReceiverImpl::deliver_block(CatChainReceivedBlock *block) {
|
|||
|
||||
void CatChainReceiverImpl::receive_block(adnl::AdnlNodeIdShort src, tl_object_ptr<ton_api::catchain_block> block,
|
||||
td::BufferSlice payload) {
|
||||
auto id = CatChainReceivedBlock::block_hash(this, block, payload);
|
||||
auto B = get_block(id);
|
||||
CatChainBlockHash id = CatChainReceivedBlock::block_hash(this, block, payload);
|
||||
CatChainReceivedBlock *B = get_block(id);
|
||||
if (B && B->initialized()) {
|
||||
return;
|
||||
}
|
||||
|
@ -76,7 +97,27 @@ void CatChainReceiverImpl::receive_block(adnl::AdnlNodeIdShort src, tl_object_pt
|
|||
return;
|
||||
}
|
||||
|
||||
auto S = validate_block_sync(block, payload.as_slice());
|
||||
td::uint64 max_block_height = get_max_block_height(opts_, sources_.size());
|
||||
if ((td::uint32)block->height_ > max_block_height) {
|
||||
VLOG(CATCHAIN_WARNING) << this << ": received too many blocks from " << src
|
||||
<< " (limit=" << max_block_height << ")";
|
||||
return;
|
||||
}
|
||||
|
||||
td::uint32 src_id = block->src_;
|
||||
if (src_id >= get_sources_cnt()) {
|
||||
VLOG(CATCHAIN_WARNING) << this << ": received broken block from " << src << ": bad src " << block->src_;
|
||||
return;
|
||||
}
|
||||
CatChainReceiverSource *source = get_source(src_id);
|
||||
if (source->fork_is_found()) {
|
||||
if (B == nullptr || !B->has_rev_deps()) {
|
||||
VLOG(CATCHAIN_WARNING) << this << ": dropping block from source " << src_id << ": source has a fork";
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
td::Status S = validate_block_sync(block, payload.as_slice());
|
||||
|
||||
if (S.is_error()) {
|
||||
VLOG(CATCHAIN_WARNING) << this << ": received broken block from " << src << ": " << S.move_as_error();
|
||||
|
@ -89,11 +130,11 @@ void CatChainReceiverImpl::receive_block(adnl::AdnlNodeIdShort src, tl_object_pt
|
|||
<< " (unsafe=" << allow_unsafe_self_blocks_resync_ << ")";
|
||||
} else {
|
||||
LOG(ERROR) << this << ": received unknown SELF block from " << src << ". UPDATING LOCAL DATABASE. UNSAFE";
|
||||
initial_sync_complete_at_ = td::Timestamp::in(300.0);
|
||||
initial_sync_complete_at_ = td::Timestamp::in(EXPECTED_UNSAFE_INITIAL_SYNC_DURATION);
|
||||
}
|
||||
}
|
||||
|
||||
auto raw_data = serialize_tl_object(block, true, payload.as_slice());
|
||||
td::BufferSlice raw_data = serialize_tl_object(block, true, payload.as_slice());
|
||||
create_block(std::move(block), td::SharedSlice{payload.as_slice()});
|
||||
|
||||
if (!opts_.debug_disable_db) {
|
||||
|
@ -104,6 +145,11 @@ void CatChainReceiverImpl::receive_block(adnl::AdnlNodeIdShort src, tl_object_pt
|
|||
}
|
||||
|
||||
void CatChainReceiverImpl::receive_block_answer(adnl::AdnlNodeIdShort src, td::BufferSlice data) {
|
||||
if (data.size() > opts_.max_serialized_block_size) {
|
||||
VLOG(CATCHAIN_INFO) << this << ": received bad block result " << src << ": too big (size="
|
||||
<< data.size() << ", limit=" << opts_.max_serialized_block_size << ")";
|
||||
return;
|
||||
}
|
||||
auto F = fetch_tl_prefix<ton_api::catchain_BlockResult>(data, true);
|
||||
if (F.is_error()) {
|
||||
VLOG(CATCHAIN_INFO) << this << ": received bad block result: " << F.move_as_error();
|
||||
|
@ -111,7 +157,7 @@ void CatChainReceiverImpl::receive_block_answer(adnl::AdnlNodeIdShort src, td::B
|
|||
}
|
||||
auto f = F.move_as_ok();
|
||||
ton_api::downcast_call(
|
||||
*f.get(),
|
||||
*f,
|
||||
td::overloaded(
|
||||
[&](ton_api::catchain_blockNotFound &r) { VLOG(CATCHAIN_INFO) << this << ": catchain block not found"; },
|
||||
[&](ton_api::catchain_blockResult &r) { receive_block(src, std::move(r.block_), std::move(data)); }));
|
||||
|
@ -129,6 +175,11 @@ void CatChainReceiverImpl::receive_message_from_overlay(adnl::AdnlNodeIdShort sr
|
|||
VLOG(CATCHAIN_INFO) << this << ": dropping block update from blamed source " << src;
|
||||
return;
|
||||
}*/
|
||||
if (data.size() > opts_.max_serialized_block_size) {
|
||||
VLOG(CATCHAIN_WARNING) << this << ": dropping broken block from " << src << ": too big (size="
|
||||
<< data.size() << ", limit=" << opts_.max_serialized_block_size << ")";
|
||||
return;
|
||||
}
|
||||
auto R = fetch_tl_prefix<ton_api::catchain_blockUpdate>(data, true);
|
||||
if (R.is_error()) {
|
||||
VLOG(CATCHAIN_WARNING) << this << ": dropping broken block from " << src << ": " << R.move_as_error();
|
||||
|
@ -139,14 +190,14 @@ void CatChainReceiverImpl::receive_message_from_overlay(adnl::AdnlNodeIdShort sr
|
|||
receive_block(src, std::move(U->block_), std::move(data));
|
||||
}
|
||||
|
||||
void CatChainReceiverImpl::receive_broadcast_from_overlay(PublicKeyHash src, td::BufferSlice data) {
|
||||
void CatChainReceiverImpl::receive_broadcast_from_overlay(const PublicKeyHash &src, td::BufferSlice data) {
|
||||
if (!read_db_) {
|
||||
return;
|
||||
}
|
||||
callback_->on_broadcast(src, std::move(data));
|
||||
}
|
||||
|
||||
/*void CatChainReceiverImpl::send_block(PublicKeyHash src, tl_object_ptr<ton_api::catchain_block> block,
|
||||
/*void CatChainReceiverImpl::send_block(const PublicKeyHash &src, tl_object_ptr<ton_api::catchain_block> block,
|
||||
td::BufferSlice payload) {
|
||||
CHECK(read_db_);
|
||||
CHECK(src == local_id_);
|
||||
|
@ -164,7 +215,7 @@ CatChainReceivedBlock *CatChainReceiverImpl::create_block(tl_object_ptr<ton_api:
|
|||
if (block->height_ == 0) {
|
||||
return root_block_;
|
||||
}
|
||||
auto hash = CatChainReceivedBlock::block_hash(this, block, payload.as_slice());
|
||||
CatChainBlockHash hash = CatChainReceivedBlock::block_hash(this, block, payload.as_slice());
|
||||
|
||||
auto it = blocks_.find(hash);
|
||||
if (it != blocks_.end()) {
|
||||
|
@ -173,9 +224,8 @@ CatChainReceivedBlock *CatChainReceiverImpl::create_block(tl_object_ptr<ton_api:
|
|||
}
|
||||
return it->second.get();
|
||||
} else {
|
||||
blocks_.emplace(hash, CatChainReceivedBlock::create(std::move(block), std::move(payload), this));
|
||||
it = blocks_.find(hash);
|
||||
return it->second.get();
|
||||
auto r = blocks_.emplace(hash, CatChainReceivedBlock::create(std::move(block), std::move(payload), this));
|
||||
return r.first->second.get();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -183,7 +233,7 @@ CatChainReceivedBlock *CatChainReceiverImpl::create_block(tl_object_ptr<ton_api:
|
|||
if (block->height_ == 0) {
|
||||
return root_block_;
|
||||
}
|
||||
auto hash = CatChainReceivedBlock::block_hash(this, block);
|
||||
CatChainBlockHash hash = CatChainReceivedBlock::block_hash(this, block);
|
||||
auto it = blocks_.find(hash);
|
||||
if (it != blocks_.end()) {
|
||||
return it->second.get();
|
||||
|
@ -194,20 +244,20 @@ CatChainReceivedBlock *CatChainReceiverImpl::create_block(tl_object_ptr<ton_api:
|
|||
}
|
||||
}
|
||||
|
||||
td::Status CatChainReceiverImpl::validate_block_sync(tl_object_ptr<ton_api::catchain_block_dep> &dep) {
|
||||
td::Status CatChainReceiverImpl::validate_block_sync(const tl_object_ptr<ton_api::catchain_block_dep> &dep) const {
|
||||
TRY_STATUS_PREFIX(CatChainReceivedBlock::pre_validate_block(this, dep), "failed to validate block: ");
|
||||
|
||||
if (dep->height_ > 0) {
|
||||
auto id = CatChainReceivedBlock::block_id(this, dep);
|
||||
auto B = serialize_tl_object(id, true);
|
||||
auto block = get_block(get_tl_object_sha_bits256(id));
|
||||
td::BufferSlice B = serialize_tl_object(id, true);
|
||||
CatChainReceivedBlock *block = get_block(get_tl_object_sha_bits256(id));
|
||||
if (block) {
|
||||
return td::Status::OK();
|
||||
}
|
||||
|
||||
auto S = get_source_by_hash(PublicKeyHash{id->src_});
|
||||
CatChainReceiverSource *S = get_source_by_hash(PublicKeyHash{id->src_});
|
||||
CHECK(S != nullptr);
|
||||
auto E = S->get_encryptor_sync();
|
||||
Encryptor *E = S->get_encryptor_sync();
|
||||
CHECK(E != nullptr);
|
||||
return E->check_signature(B.as_slice(), dep->signature_.as_slice());
|
||||
} else {
|
||||
|
@ -215,17 +265,18 @@ td::Status CatChainReceiverImpl::validate_block_sync(tl_object_ptr<ton_api::catc
|
|||
}
|
||||
}
|
||||
|
||||
td::Status CatChainReceiverImpl::validate_block_sync(tl_object_ptr<ton_api::catchain_block> &block, td::Slice payload) {
|
||||
td::Status CatChainReceiverImpl::validate_block_sync(const tl_object_ptr<ton_api::catchain_block> &block,
|
||||
const td::Slice &payload) const {
|
||||
//LOG(INFO) << ton_api::to_string(block);
|
||||
TRY_STATUS_PREFIX(CatChainReceivedBlock::pre_validate_block(this, block, payload), "failed to validate block: ");
|
||||
|
||||
if (block->height_ > 0) {
|
||||
auto id = CatChainReceivedBlock::block_id(this, block, payload);
|
||||
auto B = serialize_tl_object(id, true);
|
||||
td::BufferSlice B = serialize_tl_object(id, true);
|
||||
|
||||
auto S = get_source_by_hash(PublicKeyHash{id->src_});
|
||||
CatChainReceiverSource *S = get_source_by_hash(PublicKeyHash{id->src_});
|
||||
CHECK(S != nullptr);
|
||||
auto E = S->get_encryptor_sync();
|
||||
Encryptor *E = S->get_encryptor_sync();
|
||||
CHECK(E != nullptr);
|
||||
return E->check_signature(B.as_slice(), block->signature_.as_slice());
|
||||
} else {
|
||||
|
@ -235,7 +286,7 @@ td::Status CatChainReceiverImpl::validate_block_sync(tl_object_ptr<ton_api::catc
|
|||
|
||||
void CatChainReceiverImpl::run_scheduler() {
|
||||
while (!to_run_.empty()) {
|
||||
auto B = to_run_.front();
|
||||
CatChainReceivedBlock *B = to_run_.front();
|
||||
to_run_.pop_front();
|
||||
|
||||
B->run();
|
||||
|
@ -265,7 +316,7 @@ void CatChainReceiverImpl::add_block_cont_3(tl_object_ptr<ton_api::catchain_bloc
|
|||
}
|
||||
|
||||
active_send_ = false;
|
||||
if (pending_blocks_.size() > 0) {
|
||||
if (!pending_blocks_.empty()) {
|
||||
auto B = std::move(pending_blocks_.front());
|
||||
pending_blocks_.pop_front();
|
||||
add_block(std::move(B->payload_), std::move(B->deps_));
|
||||
|
@ -278,9 +329,9 @@ void CatChainReceiverImpl::add_block_cont_2(tl_object_ptr<ton_api::catchain_bloc
|
|||
return;
|
||||
}
|
||||
|
||||
auto id = CatChainReceivedBlock::block_hash(this, block, payload);
|
||||
CatChainBlockHash id = CatChainReceivedBlock::block_hash(this, block, payload);
|
||||
|
||||
td::BufferSlice raw_data{32};
|
||||
td::BufferSlice raw_data{id.as_array().size()};
|
||||
raw_data.as_slice().copy_from(as_slice(id));
|
||||
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), block = std::move(block),
|
||||
|
@ -298,9 +349,9 @@ void CatChainReceiverImpl::add_block_cont(tl_object_ptr<ton_api::catchain_block>
|
|||
add_block_cont_2(std::move(block), std::move(payload));
|
||||
return;
|
||||
}
|
||||
auto id = CatChainReceivedBlock::block_hash(this, block, payload.as_slice());
|
||||
CatChainBlockHash id = CatChainReceivedBlock::block_hash(this, block, payload.as_slice());
|
||||
|
||||
auto raw_data = serialize_tl_object(block, true, payload.as_slice());
|
||||
td::BufferSlice raw_data = serialize_tl_object(block, true, payload.as_slice());
|
||||
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), block = std::move(block),
|
||||
payload = std::move(payload)](td::Result<td::Unit> R) mutable {
|
||||
|
@ -319,7 +370,7 @@ void CatChainReceiverImpl::add_block(td::BufferSlice payload, std::vector<CatCha
|
|||
}
|
||||
active_send_ = true;
|
||||
|
||||
auto S = get_source_by_hash(local_id_);
|
||||
CatChainReceiverSource *S = get_source_by_hash(local_id_);
|
||||
CHECK(S != nullptr);
|
||||
CHECK(S->get_id() == local_idx_);
|
||||
if (!intentional_fork_) {
|
||||
|
@ -331,7 +382,7 @@ void CatChainReceiverImpl::add_block(td::BufferSlice payload, std::vector<CatCha
|
|||
std::vector<tl_object_ptr<ton_api::catchain_block_dep>> deps_arr;
|
||||
deps_arr.resize(deps.size());
|
||||
for (size_t i = 0; i < deps.size(); i++) {
|
||||
auto B = get_block(deps[i]);
|
||||
CatChainReceivedBlock *B = get_block(deps[i]);
|
||||
LOG_CHECK(B != nullptr) << this << ": cannot find block with hash " << deps[i];
|
||||
if (!intentional_fork_) {
|
||||
CHECK(B->get_source_id() != local_idx_);
|
||||
|
@ -339,13 +390,13 @@ void CatChainReceiverImpl::add_block(td::BufferSlice payload, std::vector<CatCha
|
|||
deps_arr[i] = B->export_tl_dep();
|
||||
}
|
||||
|
||||
auto height = prev->height_ + 1;
|
||||
int height = prev->height_ + 1;
|
||||
auto block_data = create_tl_object<ton_api::catchain_block_data>(std::move(prev), std::move(deps_arr));
|
||||
auto block = create_tl_object<ton_api::catchain_block>(incarnation_, local_idx_, height, std::move(block_data),
|
||||
td::BufferSlice());
|
||||
|
||||
auto id = CatChainReceivedBlock::block_id(this, block, payload);
|
||||
auto id_s = serialize_tl_object(id, true);
|
||||
td::BufferSlice id_s = serialize_tl_object(id, true);
|
||||
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), print_id = print_id(), block = std::move(block),
|
||||
payload = std::move(payload)](td::Result<td::BufferSlice> R) mutable {
|
||||
|
@ -362,24 +413,24 @@ void CatChainReceiverImpl::add_block(td::BufferSlice payload, std::vector<CatCha
|
|||
|
||||
void CatChainReceiverImpl::debug_add_fork_cont(tl_object_ptr<ton_api::catchain_block> block, td::BufferSlice payload) {
|
||||
validate_block_sync(block, payload.as_slice()).ensure();
|
||||
auto B = create_block(std::move(block), td::SharedSlice{payload.as_slice()});
|
||||
CatChainReceivedBlock *B = create_block(std::move(block), td::SharedSlice{payload.as_slice()});
|
||||
B->written();
|
||||
|
||||
run_scheduler();
|
||||
CHECK(B->delivered());
|
||||
|
||||
active_send_ = false;
|
||||
if (pending_blocks_.size() > 0) {
|
||||
auto B = std::move(pending_blocks_.front());
|
||||
if (!pending_blocks_.empty()) {
|
||||
auto pending_block = std::move(pending_blocks_.front());
|
||||
pending_blocks_.pop_front();
|
||||
add_block(std::move(B->payload_), std::move(B->deps_));
|
||||
add_block(std::move(pending_block->payload_), std::move(pending_block->deps_));
|
||||
}
|
||||
}
|
||||
|
||||
void CatChainReceiverImpl::debug_add_fork(td::BufferSlice payload, CatChainBlockHeight height,
|
||||
std::vector<CatChainBlockHash> deps) {
|
||||
intentional_fork_ = true;
|
||||
auto S = get_source_by_hash(local_id_);
|
||||
CatChainReceiverSource *S = get_source_by_hash(local_id_);
|
||||
CHECK(S != nullptr);
|
||||
CHECK(S->get_id() == local_idx_);
|
||||
|
||||
|
@ -399,7 +450,7 @@ void CatChainReceiverImpl::debug_add_fork(td::BufferSlice payload, CatChainBlock
|
|||
std::vector<tl_object_ptr<ton_api::catchain_block_dep>> deps_arr;
|
||||
deps_arr.resize(deps.size());
|
||||
for (size_t i = 0; i < deps.size(); i++) {
|
||||
auto B = get_block(deps[i]);
|
||||
CatChainReceivedBlock *B = get_block(deps[i]);
|
||||
LOG_CHECK(B != nullptr) << this << ": cannot find block with hash " << deps[i];
|
||||
CHECK(B->get_source_id() != local_idx_);
|
||||
deps_arr[i] = B->export_tl_dep();
|
||||
|
@ -410,7 +461,7 @@ void CatChainReceiverImpl::debug_add_fork(td::BufferSlice payload, CatChainBlock
|
|||
td::BufferSlice());
|
||||
|
||||
auto id = CatChainReceivedBlock::block_id(this, block, payload);
|
||||
auto id_s = serialize_tl_object(id, true);
|
||||
td::BufferSlice id_s = serialize_tl_object(id, true);
|
||||
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), print_id = print_id(), block = std::move(block),
|
||||
payload = std::move(payload)](td::Result<td::BufferSlice> R) mutable {
|
||||
|
@ -425,29 +476,33 @@ void CatChainReceiverImpl::debug_add_fork(td::BufferSlice payload, CatChainBlock
|
|||
td::actor::send_closure_later(keyring_, &keyring::Keyring::sign_message, local_id_, std::move(id_s), std::move(P));
|
||||
}
|
||||
|
||||
CatChainReceiverImpl::CatChainReceiverImpl(std::unique_ptr<Callback> callback, CatChainOptions opts,
|
||||
CatChainReceiverImpl::CatChainReceiverImpl(std::unique_ptr<Callback> callback,
|
||||
const CatChainOptions &opts,
|
||||
td::actor::ActorId<keyring::Keyring> keyring,
|
||||
td::actor::ActorId<adnl::Adnl> adnl,
|
||||
td::actor::ActorId<overlay::Overlays> overlay_manager,
|
||||
std::vector<CatChainNode> ids, PublicKeyHash local_id,
|
||||
CatChainSessionId unique_hash, std::string db_root, std::string db_suffix,
|
||||
const std::vector<CatChainNode> &ids,
|
||||
const PublicKeyHash &local_id,
|
||||
const CatChainSessionId &unique_hash,
|
||||
std::string db_root,
|
||||
std::string db_suffix,
|
||||
bool allow_unsafe_self_blocks_resync)
|
||||
: callback_(std::move(callback))
|
||||
, opts_(std::move(opts))
|
||||
, keyring_(keyring)
|
||||
, adnl_(adnl)
|
||||
, overlay_manager_(overlay_manager)
|
||||
, opts_(opts)
|
||||
, keyring_(std::move(keyring))
|
||||
, adnl_(std::move(adnl))
|
||||
, overlay_manager_(std::move(overlay_manager))
|
||||
, local_id_(local_id)
|
||||
, db_root_(db_root)
|
||||
, db_suffix_(db_suffix)
|
||||
, db_root_(std::move(db_root))
|
||||
, db_suffix_(std::move(db_suffix))
|
||||
, allow_unsafe_self_blocks_resync_(allow_unsafe_self_blocks_resync) {
|
||||
std::vector<td::Bits256> short_ids;
|
||||
local_idx_ = static_cast<td::uint32>(ids.size());
|
||||
for (auto &id : ids) {
|
||||
td::uint32 seq = static_cast<td::uint32>(sources_.size());
|
||||
for (const CatChainNode &id : ids) {
|
||||
auto seq = static_cast<td::uint32>(sources_.size());
|
||||
auto R = CatChainReceiverSource::create(this, id.pub_key, id.adnl_id, seq);
|
||||
auto S = R.move_as_ok();
|
||||
auto h = id.pub_key.compute_short_id();
|
||||
PublicKeyHash h = id.pub_key.compute_short_id();
|
||||
short_ids.push_back(h.bits256_value());
|
||||
sources_hashes_[h] = seq;
|
||||
sources_adnl_addrs_[id.adnl_id] = seq;
|
||||
|
@ -477,12 +532,13 @@ CatChainReceiverImpl::CatChainReceiverImpl(std::unique_ptr<Callback> callback, C
|
|||
|
||||
void CatChainReceiverImpl::start_up() {
|
||||
std::vector<adnl::AdnlNodeIdShort> ids;
|
||||
ids.reserve(get_sources_cnt());
|
||||
for (td::uint32 i = 0; i < get_sources_cnt(); i++) {
|
||||
ids.push_back(get_source(i)->get_adnl_id());
|
||||
}
|
||||
std::map<PublicKeyHash, td::uint32> root_keys;
|
||||
for (td::uint32 i = 0; i < get_sources_cnt(); i++) {
|
||||
root_keys.emplace(get_source(i)->get_hash(), 16 << 20);
|
||||
root_keys.emplace(get_source(i)->get_hash(), OVERLAY_MAX_ALLOWED_PACKET_SIZE);
|
||||
}
|
||||
td::actor::send_closure(overlay_manager_, &overlay::Overlays::create_private_overlay,
|
||||
get_source(local_idx_)->get_adnl_id(), overlay_full_id_.clone(), std::move(ids),
|
||||
|
@ -498,13 +554,13 @@ void CatChainReceiverImpl::start_up() {
|
|||
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<DbType::GetResult> R) {
|
||||
R.ensure();
|
||||
auto g = R.move_as_ok();
|
||||
DbType::GetResult g = R.move_as_ok();
|
||||
if (g.status == td::KeyValue::GetStatus::NotFound) {
|
||||
td::actor::send_closure(SelfId, &CatChainReceiverImpl::read_db);
|
||||
} else {
|
||||
auto B = std::move(g.value);
|
||||
CHECK(B.size() == 32);
|
||||
td::BufferSlice B = std::move(g.value);
|
||||
CatChainBlockHash x;
|
||||
CHECK(B.size() == x.as_array().size());
|
||||
as_slice(x).copy_from(B.as_slice());
|
||||
td::actor::send_closure(SelfId, &CatChainReceiverImpl::read_db_from, x);
|
||||
}
|
||||
|
@ -527,7 +583,7 @@ void CatChainReceiverImpl::read_db_from(CatChainBlockHash id) {
|
|||
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), id](td::Result<DbType::GetResult> R) {
|
||||
R.ensure();
|
||||
auto g = R.move_as_ok();
|
||||
DbType::GetResult g = R.move_as_ok();
|
||||
CHECK(g.status == td::KeyValue::GetStatus::Ok);
|
||||
|
||||
td::actor::send_closure(SelfId, &CatChainReceiverImpl::read_block_from_db, id, std::move(g.value));
|
||||
|
@ -543,12 +599,12 @@ void CatChainReceiverImpl::read_block_from_db(CatChainBlockHash id, td::BufferSl
|
|||
F.ensure();
|
||||
|
||||
auto block = F.move_as_ok();
|
||||
auto payload = std::move(data);
|
||||
td::BufferSlice payload = std::move(data);
|
||||
|
||||
auto block_id = CatChainReceivedBlock::block_hash(this, block, payload);
|
||||
CatChainBlockHash block_id = CatChainReceivedBlock::block_hash(this, block, payload);
|
||||
CHECK(block_id == id);
|
||||
|
||||
auto B = get_block(id);
|
||||
CatChainReceivedBlock *B = get_block(id);
|
||||
if (B && B->initialized()) {
|
||||
CHECK(B->in_db());
|
||||
if (!pending_in_db_) {
|
||||
|
@ -557,7 +613,7 @@ void CatChainReceiverImpl::read_block_from_db(CatChainBlockHash id, td::BufferSl
|
|||
return;
|
||||
}
|
||||
|
||||
auto source = get_source(block->src_);
|
||||
CatChainReceiverSource *source = get_source(block->src_);
|
||||
CHECK(source != nullptr);
|
||||
|
||||
CHECK(block->incarnation_ == incarnation_);
|
||||
|
@ -565,18 +621,18 @@ void CatChainReceiverImpl::read_block_from_db(CatChainBlockHash id, td::BufferSl
|
|||
validate_block_sync(block, payload).ensure();
|
||||
|
||||
B = create_block(std::move(block), td::SharedSlice{payload.as_slice()});
|
||||
B->written();
|
||||
CHECK(B);
|
||||
B->written();
|
||||
|
||||
auto deps = B->get_dep_hashes();
|
||||
std::vector<CatChainBlockHash> deps = B->get_dep_hashes();
|
||||
deps.push_back(B->get_prev_hash());
|
||||
for (auto &dep : deps) {
|
||||
auto dep_block = get_block(dep);
|
||||
for (const CatChainBlockHash &dep : deps) {
|
||||
CatChainReceivedBlock *dep_block = get_block(dep);
|
||||
if (!dep_block || !dep_block->initialized()) {
|
||||
pending_in_db_++;
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), dep](td::Result<DbType::GetResult> R) {
|
||||
R.ensure();
|
||||
auto g = R.move_as_ok();
|
||||
DbType::GetResult g = R.move_as_ok();
|
||||
CHECK(g.status == td::KeyValue::GetStatus::Ok);
|
||||
|
||||
td::actor::send_closure(SelfId, &CatChainReceiverImpl::read_block_from_db, dep, std::move(g.value));
|
||||
|
@ -601,26 +657,28 @@ void CatChainReceiverImpl::read_db() {
|
|||
|
||||
read_db_ = true;
|
||||
|
||||
next_rotate_ = td::Timestamp::in(60 + td::Random::fast(0, 60));
|
||||
next_sync_ = td::Timestamp::in(0.001 * td::Random::fast(0, 60));
|
||||
initial_sync_complete_at_ = td::Timestamp::in(allow_unsafe_self_blocks_resync_ ? 300.0 : 5.0);
|
||||
next_rotate_ = td::Timestamp::in(td::Random::fast(NEIGHBOURS_ROTATE_INTERVAL_MIN, NEIGHBOURS_ROTATE_INTERVAL_MAX));
|
||||
next_sync_ = td::Timestamp::in(
|
||||
0.001 * td::Random::fast(NEIGHBOURS_ROTATE_INTERVAL_MIN, NEIGHBOURS_ROTATE_INTERVAL_MAX));
|
||||
initial_sync_complete_at_ = td::Timestamp::in(
|
||||
allow_unsafe_self_blocks_resync_ ? EXPECTED_UNSAFE_INITIAL_SYNC_DURATION : EXPECTED_INITIAL_SYNC_DURATION);
|
||||
alarm_timestamp().relax(next_rotate_);
|
||||
alarm_timestamp().relax(next_sync_);
|
||||
alarm_timestamp().relax(initial_sync_complete_at_);
|
||||
}
|
||||
|
||||
td::actor::ActorOwn<CatChainReceiverInterface> CatChainReceiverInterface::create(
|
||||
std::unique_ptr<Callback> callback, CatChainOptions opts, td::actor::ActorId<keyring::Keyring> keyring,
|
||||
std::unique_ptr<Callback> callback, const CatChainOptions &opts, td::actor::ActorId<keyring::Keyring> keyring,
|
||||
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<overlay::Overlays> overlay_manager,
|
||||
std::vector<CatChainNode> ids, PublicKeyHash local_id, CatChainSessionId unique_hash, std::string db_root,
|
||||
std::string db_suffix, bool allow_unsafe_self_blocks_resync) {
|
||||
const std::vector<CatChainNode> &ids, const PublicKeyHash &local_id, const CatChainSessionId &unique_hash,
|
||||
std::string db_root, std::string db_suffix, bool allow_unsafe_self_blocks_resync) {
|
||||
auto A = td::actor::create_actor<CatChainReceiverImpl>(
|
||||
"catchainreceiver", std::move(callback), std::move(opts), keyring, adnl, overlay_manager, std::move(ids),
|
||||
local_id, unique_hash, db_root, db_suffix, allow_unsafe_self_blocks_resync);
|
||||
"catchainreceiver", std::move(callback), opts, std::move(keyring), std::move(adnl), std::move(overlay_manager),
|
||||
ids, local_id, unique_hash, std::move(db_root), std::move(db_suffix), allow_unsafe_self_blocks_resync);
|
||||
return std::move(A);
|
||||
}
|
||||
|
||||
CatChainReceiverSource *CatChainReceiverImpl::get_source_by_hash(PublicKeyHash source_hash) const {
|
||||
CatChainReceiverSource *CatChainReceiverImpl::get_source_by_hash(const PublicKeyHash &source_hash) const {
|
||||
auto it = sources_hashes_.find(source_hash);
|
||||
if (it == sources_hashes_.end()) {
|
||||
return nullptr;
|
||||
|
@ -650,10 +708,10 @@ void CatChainReceiverImpl::receive_query_from_overlay(adnl::AdnlNodeIdShort src,
|
|||
return;
|
||||
}
|
||||
auto f = F.move_as_ok();
|
||||
ton_api::downcast_call(*f.get(), [&](auto &obj) { this->process_query(src, obj, std::move(promise)); });
|
||||
ton_api::downcast_call(*f, [&](auto &obj) { this->process_query(src, std::move(obj), std::move(promise)); });
|
||||
}
|
||||
|
||||
void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getBlock &query,
|
||||
void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getBlock query,
|
||||
td::Promise<td::BufferSlice> promise) {
|
||||
auto it = blocks_.find(query.block_);
|
||||
if (it == blocks_.end() || it->second->get_height() == 0 || !it->second->initialized()) {
|
||||
|
@ -664,19 +722,20 @@ void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::cat
|
|||
}
|
||||
}
|
||||
|
||||
void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getBlocks &query,
|
||||
void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getBlocks query,
|
||||
td::Promise<td::BufferSlice> promise) {
|
||||
if (query.blocks_.size() > 100) {
|
||||
if (query.blocks_.size() > MAX_QUERY_BLOCKS) {
|
||||
promise.set_error(td::Status::Error(ErrorCode::protoviolation, "too many blocks"));
|
||||
return;
|
||||
}
|
||||
td::int32 cnt = 0;
|
||||
for (auto &b : query.blocks_) {
|
||||
for (const CatChainBlockHash &b : query.blocks_) {
|
||||
auto it = blocks_.find(b);
|
||||
if (it != blocks_.end() && it->second->get_height() > 0) {
|
||||
auto block = create_tl_object<ton_api::catchain_blockUpdate>(it->second->export_tl());
|
||||
CHECK(it->second->get_payload().size() > 0);
|
||||
auto B = serialize_tl_object(block, true, it->second->get_payload().clone());
|
||||
CHECK(!it->second->get_payload().empty());
|
||||
td::BufferSlice B = serialize_tl_object(block, true, it->second->get_payload().clone());
|
||||
CHECK(B.size() <= opts_.max_serialized_block_size);
|
||||
td::actor::send_closure(overlay_manager_, &overlay::Overlays::send_message, src,
|
||||
get_source(local_idx_)->get_adnl_id(), overlay_id_, std::move(B));
|
||||
cnt++;
|
||||
|
@ -685,19 +744,19 @@ void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::cat
|
|||
promise.set_value(serialize_tl_object(create_tl_object<ton_api::catchain_sent>(cnt), true));
|
||||
}
|
||||
|
||||
void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getBlockHistory &query,
|
||||
void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getBlockHistory query,
|
||||
td::Promise<td::BufferSlice> promise) {
|
||||
auto h = query.height_;
|
||||
int64_t h = query.height_;
|
||||
if (h <= 0) {
|
||||
promise.set_error(td::Status::Error(ErrorCode::protoviolation, "not-positive height"));
|
||||
return;
|
||||
}
|
||||
if (h > 100) {
|
||||
h = 100;
|
||||
if (h > MAX_QUERY_HEIGHT) {
|
||||
h = MAX_QUERY_HEIGHT;
|
||||
}
|
||||
std::set<CatChainBlockHash> s{query.stop_if_.begin(), query.stop_if_.end()};
|
||||
|
||||
auto B = get_block(query.block_);
|
||||
CatChainReceivedBlock *B = get_block(query.block_);
|
||||
if (B == nullptr) {
|
||||
promise.set_value(serialize_tl_object(create_tl_object<ton_api::catchain_sent>(0), true));
|
||||
return;
|
||||
|
@ -711,8 +770,9 @@ void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::cat
|
|||
break;
|
||||
}
|
||||
auto block = create_tl_object<ton_api::catchain_blockUpdate>(B->export_tl());
|
||||
CHECK(B->get_payload().size() > 0);
|
||||
auto BB = serialize_tl_object(block, true, B->get_payload().as_slice());
|
||||
CHECK(!B->get_payload().empty());
|
||||
td::BufferSlice BB = serialize_tl_object(block, true, B->get_payload().as_slice());
|
||||
CHECK(BB.size() <= opts_.max_serialized_block_size);
|
||||
td::actor::send_closure(overlay_manager_, &overlay::Overlays::send_message, src,
|
||||
get_source(local_idx_)->get_adnl_id(), overlay_id_, std::move(BB));
|
||||
B = B->get_prev();
|
||||
|
@ -721,7 +781,7 @@ void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::cat
|
|||
promise.set_value(serialize_tl_object(create_tl_object<ton_api::catchain_sent>(cnt), true));
|
||||
}
|
||||
|
||||
void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getDifference &query,
|
||||
void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::catchain_getDifference query,
|
||||
td::Promise<td::BufferSlice> promise) {
|
||||
auto &vt = query.rt_;
|
||||
if (vt.size() != get_sources_cnt()) {
|
||||
|
@ -731,7 +791,7 @@ void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::cat
|
|||
}
|
||||
for (td::uint32 i = 0; i < get_sources_cnt(); i++) {
|
||||
if (vt[i] >= 0) {
|
||||
auto S = get_source(i);
|
||||
CatChainReceiverSource *S = get_source(i);
|
||||
if (S->fork_is_found()) {
|
||||
auto obj = fetch_tl_object<ton_api::catchain_block_data_fork>(S->fork_proof(), true);
|
||||
obj.ensure();
|
||||
|
@ -744,26 +804,21 @@ void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::cat
|
|||
}
|
||||
|
||||
std::vector<td::int32> my_vt(get_sources_cnt());
|
||||
td::uint64 total = 0;
|
||||
for (td::uint32 i = 0; i < get_sources_cnt(); i++) {
|
||||
if (vt[i] >= 0) {
|
||||
auto x = static_cast<CatChainBlockHeight>(vt[i]);
|
||||
auto S = get_source(i);
|
||||
if (S->delivered_height() > x) {
|
||||
total += S->delivered_height() - x;
|
||||
}
|
||||
my_vt[i] = S->delivered_height();
|
||||
CatChainReceiverSource *S = get_source(i);
|
||||
my_vt[i] = static_cast<td::int32>(S->delivered_height());
|
||||
} else {
|
||||
my_vt[i] = -1;
|
||||
}
|
||||
}
|
||||
|
||||
const td::uint32 max_send = 100;
|
||||
const td::uint32 max_send = GET_DIFFERENCE_MAX_SEND;
|
||||
|
||||
td::int32 l = 0;
|
||||
td::int32 r = max_send + 1;
|
||||
while (r - l > 1) {
|
||||
td::int32 x = (r + l) / 2;
|
||||
td::int32 left = 0;
|
||||
td::int32 right = max_send + 1;
|
||||
while (right - left > 1) {
|
||||
td::int32 x = (right + left) / 2;
|
||||
td::uint64 sum = 0;
|
||||
for (td::uint32 i = 0; i < get_sources_cnt(); i++) {
|
||||
if (vt[i] >= 0 && my_vt[i] > vt[i]) {
|
||||
|
@ -771,23 +826,23 @@ void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::cat
|
|||
}
|
||||
}
|
||||
if (sum > max_send) {
|
||||
r = x;
|
||||
right = x;
|
||||
} else {
|
||||
l = x;
|
||||
left = x;
|
||||
}
|
||||
}
|
||||
CHECK(r > 0);
|
||||
CHECK(right > 0);
|
||||
for (td::uint32 i = 0; i < get_sources_cnt(); i++) {
|
||||
if (vt[i] >= 0 && my_vt[i] > vt[i]) {
|
||||
auto S = get_source(i);
|
||||
auto t = (my_vt[i] - vt[i] > r) ? r : (my_vt[i] - vt[i]);
|
||||
CHECK(t > 0);
|
||||
CatChainReceiverSource *S = get_source(i);
|
||||
td::int32 t = (my_vt[i] - vt[i] > right) ? right : (my_vt[i] - vt[i]);
|
||||
while (t-- > 0) {
|
||||
auto M = S->get_block(++vt[i]);
|
||||
CatChainReceivedBlock *M = S->get_block(++vt[i]);
|
||||
CHECK(M != nullptr);
|
||||
auto block = create_tl_object<ton_api::catchain_blockUpdate>(M->export_tl());
|
||||
CHECK(M->get_payload().size() > 0);
|
||||
auto BB = serialize_tl_object(block, true, M->get_payload().as_slice());
|
||||
CHECK(!M->get_payload().empty());
|
||||
td::BufferSlice BB = serialize_tl_object(block, true, M->get_payload().as_slice());
|
||||
CHECK(BB.size() <= opts_.max_serialized_block_size);
|
||||
td::actor::send_closure(overlay_manager_, &overlay::Overlays::send_message, src,
|
||||
get_source(local_idx_)->get_adnl_id(), overlay_id_, std::move(BB));
|
||||
}
|
||||
|
@ -798,7 +853,7 @@ void CatChainReceiverImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::cat
|
|||
}
|
||||
|
||||
void CatChainReceiverImpl::got_fork_proof(td::BufferSlice data) {
|
||||
auto F = fetch_tl_object<ton_api::catchain_differenceFork>(std::move(data), true);
|
||||
auto F = fetch_tl_object<ton_api::catchain_differenceFork>(data, true);
|
||||
if (F.is_error()) {
|
||||
VLOG(CATCHAIN_WARNING) << this << ": received bad fork proof: " << F.move_as_error();
|
||||
return;
|
||||
|
@ -825,7 +880,7 @@ void CatChainReceiverImpl::got_fork_proof(td::BufferSlice data) {
|
|||
return;
|
||||
}
|
||||
|
||||
auto S = get_source(f->left_->src_);
|
||||
CatChainReceiverSource *S = get_source(f->left_->src_);
|
||||
S->on_found_fork_proof(
|
||||
create_serialize_tl_object<ton_api::catchain_block_data_fork>(std::move(f->left_), std::move(f->right_)));
|
||||
S->blame();
|
||||
|
@ -835,24 +890,24 @@ void CatChainReceiverImpl::synchronize_with(CatChainReceiverSource *S) {
|
|||
CHECK(!S->blamed());
|
||||
std::vector<td::int32> rt(get_sources_cnt());
|
||||
for (td::uint32 i = 0; i < get_sources_cnt(); i++) {
|
||||
auto SS = get_source(i);
|
||||
CatChainReceiverSource *SS = get_source(i);
|
||||
if (SS->blamed()) {
|
||||
rt[i] = -1;
|
||||
} else {
|
||||
rt[i] = S->delivered_height();
|
||||
rt[i] = static_cast<td::int32>(S->delivered_height());
|
||||
}
|
||||
}
|
||||
|
||||
auto P = td::PromiseCreator::lambda(
|
||||
[SelfId = actor_id(this), src = S->get_hash(), print_id = print_id()](td::Result<td::BufferSlice> R) {
|
||||
if (R.is_error()) {
|
||||
VLOG(CATCHAIN_INFO) << print_id << ": timedout syncronize query to " << src;
|
||||
VLOG(CATCHAIN_INFO) << print_id << ": timedout synchronize query to " << src;
|
||||
return;
|
||||
}
|
||||
auto data = R.move_as_ok();
|
||||
td::BufferSlice data = R.move_as_ok();
|
||||
auto X = fetch_tl_object<ton_api::catchain_Difference>(data.clone(), true);
|
||||
if (X.is_error()) {
|
||||
VLOG(CATCHAIN_WARNING) << print_id << ": received incorrect answer to syncronize query from " << src << ": "
|
||||
VLOG(CATCHAIN_WARNING) << print_id << ": received incorrect answer to synchronize query from " << src << ": "
|
||||
<< X.move_as_error();
|
||||
return;
|
||||
}
|
||||
|
@ -866,49 +921,49 @@ void CatChainReceiverImpl::synchronize_with(CatChainReceiverSource *S) {
|
|||
});
|
||||
td::actor::send_closure(overlay_manager_, &overlay::Overlays::send_query, S->get_adnl_id(),
|
||||
get_source(local_idx_)->get_adnl_id(), overlay_id_, "sync", std::move(P),
|
||||
td::Timestamp::in(5.0),
|
||||
td::Timestamp::in(GET_DIFFERENCE_TIMEOUT),
|
||||
serialize_tl_object(create_tl_object<ton_api::catchain_getDifference>(std::move(rt)), true));
|
||||
|
||||
if (S->delivered_height() < S->received_height()) {
|
||||
auto B = S->get_block(S->delivered_height() + 1);
|
||||
CatChainReceivedBlock *B = S->get_block(S->delivered_height() + 1);
|
||||
CHECK(B->initialized());
|
||||
|
||||
std::vector<CatChainBlockHash> vec;
|
||||
B->find_pending_deps(vec, 16);
|
||||
B->find_pending_deps(vec, MAX_PENDING_DEPS);
|
||||
|
||||
for (auto &hash : vec) {
|
||||
auto P = td::PromiseCreator::lambda(
|
||||
for (const CatChainBlockHash &hash : vec) {
|
||||
auto PP = td::PromiseCreator::lambda(
|
||||
[SelfId = actor_id(this), print_id = print_id(), src = S->get_adnl_id()](td::Result<td::BufferSlice> R) {
|
||||
if (R.is_error()) {
|
||||
VLOG(CATCHAIN_INFO) << print_id << ": timedout syncronize query to " << src;
|
||||
VLOG(CATCHAIN_INFO) << print_id << ": timedout synchronize query to " << src;
|
||||
} else {
|
||||
td::actor::send_closure(SelfId, &CatChainReceiverImpl::receive_block_answer, src, R.move_as_ok());
|
||||
}
|
||||
});
|
||||
auto query = serialize_tl_object(create_tl_object<ton_api::catchain_getBlock>(hash), true);
|
||||
td::BufferSlice query = serialize_tl_object(create_tl_object<ton_api::catchain_getBlock>(hash), true);
|
||||
td::actor::send_closure(overlay_manager_, &overlay::Overlays::send_query, S->get_adnl_id(),
|
||||
get_source(local_idx_)->get_adnl_id(), overlay_id_, "sync blocks", std::move(P),
|
||||
td::Timestamp::in(2.0), std::move(query));
|
||||
get_source(local_idx_)->get_adnl_id(), overlay_id_, "sync blocks", std::move(PP),
|
||||
td::Timestamp::in(GET_BLOCK_TIMEOUT), std::move(query));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void CatChainReceiverImpl::choose_neighbours() {
|
||||
std::vector<td::uint32> n;
|
||||
n.resize(get_max_neighbours());
|
||||
n.resize(MAX_NEIGHBOURS);
|
||||
|
||||
td::uint32 size = 0;
|
||||
for (td::uint32 i = 0; i < get_sources_cnt(); i++) {
|
||||
if (i == local_idx_) {
|
||||
continue;
|
||||
}
|
||||
auto S = get_source(i);
|
||||
CatChainReceiverSource *S = get_source(i);
|
||||
if (!S->blamed()) {
|
||||
size++;
|
||||
if (size <= n.size()) {
|
||||
n[size - 1] = i;
|
||||
} else {
|
||||
td::uint32 id = td::Random::fast(0, size - 1);
|
||||
td::uint32 id = td::Random::fast(0, static_cast<td::int32>(size) - 1);
|
||||
if (id < n.size()) {
|
||||
n[id] = i;
|
||||
}
|
||||
|
@ -922,15 +977,15 @@ void CatChainReceiverImpl::choose_neighbours() {
|
|||
}
|
||||
|
||||
bool CatChainReceiverImpl::unsafe_start_up_check_completed() {
|
||||
auto S = get_source(local_idx_);
|
||||
CatChainReceiverSource *S = get_source(local_idx_);
|
||||
CHECK(!S->blamed());
|
||||
if (S->has_unreceived() || S->has_undelivered()) {
|
||||
LOG(INFO) << "catchain: has_unreceived=" << S->has_unreceived() << " has_undelivered=" << S->has_undelivered();
|
||||
run_scheduler();
|
||||
initial_sync_complete_at_ = td::Timestamp::in(60.0);
|
||||
initial_sync_complete_at_ = td::Timestamp::in(EXPECTED_INITIAL_SYNC_DURATION_WITH_UNPROCESSED);
|
||||
return false;
|
||||
}
|
||||
auto h = S->delivered_height();
|
||||
CatChainBlockHeight h = S->delivered_height();
|
||||
if (h == 0) {
|
||||
CHECK(last_sent_block_->get_height() == 0);
|
||||
CHECK(!unsafe_root_block_writing_);
|
||||
|
@ -941,20 +996,20 @@ bool CatChainReceiverImpl::unsafe_start_up_check_completed() {
|
|||
return true;
|
||||
}
|
||||
if (unsafe_root_block_writing_) {
|
||||
initial_sync_complete_at_ = td::Timestamp::in(5.0);
|
||||
initial_sync_complete_at_ = td::Timestamp::in(EXPECTED_INITIAL_SYNC_DURATION);
|
||||
LOG(INFO) << "catchain: writing=true";
|
||||
return false;
|
||||
}
|
||||
|
||||
unsafe_root_block_writing_ = true;
|
||||
auto B = S->get_block(h);
|
||||
CatChainReceivedBlock *B = S->get_block(h);
|
||||
CHECK(B != nullptr);
|
||||
CHECK(B->delivered());
|
||||
CHECK(B->in_db());
|
||||
|
||||
auto id = B->get_hash();
|
||||
CatChainBlockHash id = B->get_hash();
|
||||
|
||||
td::BufferSlice raw_data{32};
|
||||
td::BufferSlice raw_data{id.as_array().size()};
|
||||
raw_data.as_slice().copy_from(as_slice(id));
|
||||
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), block = B](td::Result<td::Unit> R) mutable {
|
||||
|
@ -963,7 +1018,7 @@ bool CatChainReceiverImpl::unsafe_start_up_check_completed() {
|
|||
});
|
||||
|
||||
db_.set(CatChainBlockHash::zero(), std::move(raw_data), std::move(P), 0);
|
||||
initial_sync_complete_at_ = td::Timestamp::in(5.0);
|
||||
initial_sync_complete_at_ = td::Timestamp::in(EXPECTED_INITIAL_SYNC_DURATION);
|
||||
LOG(INFO) << "catchain: need update root";
|
||||
return false;
|
||||
}
|
||||
|
@ -977,9 +1032,9 @@ void CatChainReceiverImpl::written_unsafe_root_block(CatChainReceivedBlock *bloc
|
|||
void CatChainReceiverImpl::alarm() {
|
||||
alarm_timestamp() = td::Timestamp::never();
|
||||
if (next_sync_ && next_sync_.is_in_past()) {
|
||||
next_sync_ = td::Timestamp::in(td::Random::fast(0.1, 0.2));
|
||||
for (auto i = 0; i < 3; i++) {
|
||||
auto S = get_source(td::Random::fast(0, get_sources_cnt() - 1));
|
||||
next_sync_ = td::Timestamp::in(td::Random::fast(SYNC_INTERVAL_MIN, SYNC_INTERVAL_MAX));
|
||||
for (unsigned i = 0; i < SYNC_ITERATIONS; i++) {
|
||||
CatChainReceiverSource *S = get_source(td::Random::fast(0, static_cast<td::int32>(get_sources_cnt()) - 1));
|
||||
CHECK(S != nullptr);
|
||||
if (!S->blamed()) {
|
||||
synchronize_with(S);
|
||||
|
@ -988,7 +1043,7 @@ void CatChainReceiverImpl::alarm() {
|
|||
}
|
||||
}
|
||||
if (next_rotate_ && next_rotate_.is_in_past()) {
|
||||
next_rotate_ = td::Timestamp::in(td::Random::fast(60.0, 120.0));
|
||||
next_rotate_ = td::Timestamp::in(td::Random::fast(NEIGHBOURS_ROTATE_INTERVAL_MIN, NEIGHBOURS_ROTATE_INTERVAL_MAX));
|
||||
choose_neighbours();
|
||||
}
|
||||
if (!started_ && read_db_ && initial_sync_complete_at_ && initial_sync_complete_at_.is_in_past()) {
|
||||
|
@ -1013,58 +1068,68 @@ void CatChainReceiverImpl::send_fec_broadcast(td::BufferSlice data) {
|
|||
td::actor::send_closure(overlay_manager_, &overlay::Overlays::send_broadcast_fec_ex,
|
||||
get_source(local_idx_)->get_adnl_id(), overlay_id_, local_id_, 0, std::move(data));
|
||||
}
|
||||
void CatChainReceiverImpl::send_custom_query_data(PublicKeyHash dst, std::string name,
|
||||
void CatChainReceiverImpl::send_custom_query_data(const PublicKeyHash &dst, std::string name,
|
||||
td::Promise<td::BufferSlice> promise, td::Timestamp timeout,
|
||||
td::BufferSlice query) {
|
||||
auto S = get_source_by_hash(dst);
|
||||
CatChainReceiverSource *S = get_source_by_hash(dst);
|
||||
CHECK(S != nullptr);
|
||||
td::actor::send_closure(overlay_manager_, &overlay::Overlays::send_query, S->get_adnl_id(),
|
||||
get_source(local_idx_)->get_adnl_id(), overlay_id_, std::move(name), std::move(promise),
|
||||
timeout, std::move(query));
|
||||
}
|
||||
|
||||
void CatChainReceiverImpl::send_custom_query_data_via(PublicKeyHash dst, std::string name,
|
||||
void CatChainReceiverImpl::send_custom_query_data_via(const PublicKeyHash &dst, std::string name,
|
||||
td::Promise<td::BufferSlice> promise, td::Timestamp timeout,
|
||||
td::BufferSlice query, td::uint64 max_answer_size,
|
||||
td::actor::ActorId<adnl::AdnlSenderInterface> via) {
|
||||
auto S = get_source_by_hash(dst);
|
||||
CatChainReceiverSource *S = get_source_by_hash(dst);
|
||||
CHECK(S != nullptr);
|
||||
td::actor::send_closure(overlay_manager_, &overlay::Overlays::send_query_via, S->get_adnl_id(),
|
||||
get_source(local_idx_)->get_adnl_id(), overlay_id_, std::move(name), std::move(promise),
|
||||
timeout, std::move(query), max_answer_size, via);
|
||||
}
|
||||
|
||||
void CatChainReceiverImpl::send_custom_message_data(PublicKeyHash dst, td::BufferSlice data) {
|
||||
auto S = get_source_by_hash(dst);
|
||||
void CatChainReceiverImpl::send_custom_message_data(const PublicKeyHash &dst, td::BufferSlice data) {
|
||||
CatChainReceiverSource *S = get_source_by_hash(dst);
|
||||
CHECK(S != nullptr);
|
||||
td::actor::send_closure(overlay_manager_, &overlay::Overlays::send_message, S->get_adnl_id(),
|
||||
get_source(local_idx_)->get_adnl_id(), overlay_id_, std::move(data));
|
||||
}
|
||||
|
||||
void CatChainReceiverImpl::block_written_to_db(CatChainBlockHash hash) {
|
||||
auto block = get_block(hash);
|
||||
CatChainReceivedBlock *block = get_block(hash);
|
||||
CHECK(block);
|
||||
|
||||
block->written();
|
||||
run_scheduler();
|
||||
}
|
||||
|
||||
static void destroy_db(std::string name, td::uint32 attempt) {
|
||||
static void destroy_db(const std::string& name, td::uint32 attempt) {
|
||||
auto S = td::RocksDb::destroy(name);
|
||||
if (S.is_ok()) {
|
||||
return;
|
||||
}
|
||||
if (S.is_error() && attempt >= 10) {
|
||||
LOG(ERROR) << "failed to destroy catchain " << name << ": " << S;
|
||||
} else {
|
||||
if (S.is_error()) {
|
||||
LOG(DEBUG) << "failed to destroy catchain " << name << ": " << S;
|
||||
delay_action([name, attempt]() { destroy_db(name, attempt); }, td::Timestamp::in(1.0));
|
||||
if (attempt < DESTROY_DB_MAX_ATTEMPTS) {
|
||||
delay_action([name, attempt]() { destroy_db(name, attempt + 1); }, td::Timestamp::in(DESTROY_DB_DELAY));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void CatChainReceiverImpl::destroy() {
|
||||
auto name = db_root_ + "/catchainreceiver" + db_suffix_ + td::base64url_encode(as_slice(incarnation_));
|
||||
delay_action([name]() { destroy_db(name, 0); }, td::Timestamp::in(1.0));
|
||||
delay_action([name]() { destroy_db(name, 0); }, td::Timestamp::in(DESTROY_DB_DELAY));
|
||||
stop();
|
||||
}
|
||||
|
||||
td::uint64 get_max_block_height(const CatChainOptions& opts, size_t sources_cnt) {
|
||||
if (opts.max_block_height_coeff == 0) {
|
||||
return std::numeric_limits<td::uint64>::max();
|
||||
}
|
||||
return opts.max_block_height_coeff * (1 + (sources_cnt + opts.max_deps - 1) / opts.max_deps) / 1000;
|
||||
}
|
||||
|
||||
} // namespace catchain
|
||||
|
||||
} // namespace ton
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue