1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-02-15 04:32:21 +00:00

Merge branch testnet into block-generation

This commit is contained in:
SpyCheese 2024-05-30 11:21:39 +03:00
commit 1ee9e47007
37 changed files with 680 additions and 48 deletions

View file

@ -246,6 +246,9 @@ class HardforkCreator : public td::actor::Actor {
}
void send_shard_block_info(ton::BlockIdExt block_id, ton::CatchainSeqno cc_seqno, td::BufferSlice data) override {
}
void send_block_candidate(ton::BlockIdExt block_id, ton::CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data) override {
}
void send_broadcast(ton::BlockBroadcast broadcast, bool custom_overlays_only) override {
}
void download_block(ton::BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,

View file

@ -231,7 +231,7 @@ void OverlayImpl::update_neighbours(td::uint32 nodes_to_change) {
continue;
}
if (X->get_version() <= td::Clocks::system() - Overlays::overlay_peer_ttl()) {
if (public_ && X->get_version() <= td::Clocks::system() - Overlays::overlay_peer_ttl()) {
if (X->is_neighbour()) {
bool found = false;
for (auto &n : neighbours_) {
@ -303,7 +303,7 @@ void OverlayImpl::get_overlay_random_peers(td::uint32 max_peers,
auto t = td::Clocks::system();
while (v.size() < max_peers && v.size() < peers_.size() - bad_peers_.size()) {
auto P = peers_.get_random();
if (P->get_version() + 3600 < t) {
if (public_ && P->get_version() + 3600 < t) {
VLOG(OVERLAY_INFO) << this << ": deleting outdated peer " << P->get_id();
del_peer(P->get_id());
} else if (P->is_alive()) {

View file

@ -347,6 +347,9 @@ class TestNode : public td::actor::Actor {
}
}
}
void send_block_candidate(ton::BlockIdExt block_id, ton::CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data) override {
}
void send_broadcast(ton::BlockBroadcast broadcast, bool custom_overlays_only) override {
}
void download_block(ton::BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,

View file

@ -98,7 +98,7 @@ liteServer.getLibrariesWithProof id:tonNode.blockIdExt mode:# library_list:(vect
liteServer.getShardBlockProof id:tonNode.blockIdExt = liteServer.ShardBlockProof;
liteServer.getOutMsgQueueSizes mode:# wc:mode.0?int shard:mode.0?long = liteServer.OutMsgQueueSizes;
liteServer.nonfinal.getValidatorGroups mode:# wc:mode.0?int shard:mode.1?long = liteServer.nonfinal.ValidatorGroups;
liteServer.nonfinal.getValidatorGroups mode:# wc:mode.0?int shard:mode.0?long = liteServer.nonfinal.ValidatorGroups;
liteServer.nonfinal.getCandidate id:liteServer.nonfinal.candidateId = liteServer.nonfinal.Candidate;
liteServer.queryPrefix = Object;

Binary file not shown.

View file

@ -397,6 +397,11 @@ tonNode.blockBroadcastCompressed id:tonNode.blockIdExt catchain_seqno:int valida
tonNode.ihrMessageBroadcast message:tonNode.ihrMessage = tonNode.Broadcast;
tonNode.externalMessageBroadcast message:tonNode.externalMessage = tonNode.Broadcast;
tonNode.newShardBlockBroadcast block:tonNode.newShardBlock = tonNode.Broadcast;
// signature may be empty, at least for now
tonNode.newBlockCandidateBroadcast id:tonNode.blockIdExt catchain_seqno:int validator_set_hash:int
collator_signature:tonNode.blockSignature data:bytes = tonNode.Broadcast;
tonNode.newBlockCandidateBroadcastCompressed id:tonNode.blockIdExt catchain_seqno:int validator_set_hash:int
collator_signature:tonNode.blockSignature flags:# compressed:bytes = tonNode.Broadcast;
tonNode.shardPublicOverlayId workchain:int shard:long zero_state_file_hash:int256 = tonNode.ShardPublicOverlayId;

Binary file not shown.

View file

@ -346,6 +346,10 @@ struct BlockSignature {
struct ReceivedBlock {
BlockIdExt id;
td::BufferSlice data;
ReceivedBlock clone() const {
return ReceivedBlock{id, data.clone()};
}
};
struct BlockBroadcast {

View file

@ -57,7 +57,7 @@ class WriteFile : public td::actor::Actor {
status = file.sync();
}
if (status.is_error()) {
td::unlink(old_name);
td::unlink(old_name).ignore();
promise_.set_error(std::move(status));
stop();
return;

View file

@ -17,10 +17,14 @@
Copyright 2017-2020 Telegram Systems LLP
*/
#include "wait-block-data.hpp"
#include "block-parse.h"
#include "block-auto.h"
#include "fabric.h"
#include "adnl/utils.hpp"
#include "ton/ton-io.hpp"
#include "common/delay.h"
#include "vm/cells/MerkleProof.h"
namespace ton {
@ -108,7 +112,7 @@ void WaitBlockData::start() {
td::actor::send_closure(SelfId, &WaitBlockData::failed_to_get_block_data_from_net,
R.move_as_error_prefix("net error: "));
} else {
td::actor::send_closure(SelfId, &WaitBlockData::got_block_data_from_net, R.move_as_ok());
td::actor::send_closure(SelfId, &WaitBlockData::got_data_from_net, R.move_as_ok());
}
});
@ -133,13 +137,49 @@ void WaitBlockData::failed_to_get_block_data_from_net(td::Status reason) {
td::Timestamp::in(0.1));
}
void WaitBlockData::got_block_data_from_net(ReceivedBlock block) {
void WaitBlockData::got_data_from_net(ReceivedBlock block) {
auto X = create_block(std::move(block));
if (X.is_error()) {
failed_to_get_block_data_from_net(X.move_as_error_prefix("bad block from net: "));
return;
}
data_ = X.move_as_ok();
got_block_data_from_net(X.move_as_ok());
}
void WaitBlockData::got_block_data_from_net(td::Ref<BlockData> block) {
if (data_.not_null()) {
return;
}
data_ = std::move(block);
if (handle_->received()) {
finish_query();
return;
}
if (!handle_->id().is_masterchain() && !handle_->inited_proof_link()) {
// This can happen if we get block from candidates cache.
// Proof link can be derived from the block (but not for masterchain block).
auto r_proof_link = generate_proof_link(handle_->id(), data_->root_cell());
if (r_proof_link.is_error()) {
abort_query(r_proof_link.move_as_error_prefix("failed to create proof link for block: "));
return;
}
td::actor::send_closure(manager_, &ValidatorManager::validate_block_proof_link, handle_->id(),
r_proof_link.move_as_ok(),
[id = handle_->id().id, SelfId = actor_id(this)](td::Result<td::Unit> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &WaitBlockData::abort_query,
R.move_as_error_prefix("validate proof link error: "));
return;
}
LOG(DEBUG) << "Created and validated proof link for " << id.to_str();
td::actor::send_closure(SelfId, &WaitBlockData::checked_proof_link);
});
return;
}
checked_proof_link();
}
void WaitBlockData::checked_proof_link() {
CHECK(handle_->id().is_masterchain() ? handle_->inited_proof() : handle_->inited_proof_link());
if (!handle_->received()) {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
@ -198,6 +238,41 @@ void WaitBlockData::got_static_file(td::BufferSlice data) {
run_hardfork_accept_block_query(handle_->id(), data_, manager_, std::move(P));
}
td::Result<td::BufferSlice> WaitBlockData::generate_proof_link(BlockIdExt id, td::Ref<vm::Cell> block_root) {
// Creating proof link. Similar to accept-block.cpp
if (id.is_masterchain()) {
return td::Status::Error("cannot create proof link for masterchain block");
}
auto usage_tree = std::make_shared<vm::CellUsageTree>();
auto usage_cell = vm::UsageCell::create(block_root, usage_tree->root_ptr());
block::gen::Block::Record blk;
block::gen::BlockInfo::Record info;
block::gen::BlockExtra::Record extra;
block::gen::ExtBlkRef::Record mcref{}; // _ ExtBlkRef = BlkMasterInfo;
ShardIdFull shard;
if (!(tlb::unpack_cell(usage_cell, blk) && tlb::unpack_cell(blk.info, info) && !info.version &&
block::tlb::t_ShardIdent.unpack(info.shard.write(), shard) &&
block::gen::BlkPrevInfo{info.after_merge}.validate_ref(info.prev_ref) &&
tlb::unpack_cell(std::move(blk.extra), extra) && block::gen::t_ValueFlow.force_validate_ref(blk.value_flow) &&
(!info.not_master || tlb::unpack_cell(info.master_ref, mcref)))) {
return td::Status::Error("cannot unpack block header");
}
vm::CellSlice upd_cs{vm::NoVmSpec(), blk.state_update};
auto proof = vm::MerkleProof::generate(block_root, usage_tree.get());
vm::CellBuilder cb;
td::Ref<vm::Cell> bs_cell;
if (!(cb.store_long_bool(0xc3, 8) // block_proof#c3
&& block::tlb::t_BlockIdExt.pack(cb, id) // proof_for:BlockIdExt
&& cb.store_ref_bool(std::move(proof)) // proof:^Cell
&& cb.store_bool_bool(false) // signatures:(Maybe ^BlockSignatures)
&& cb.finalize_to(bs_cell))) {
return td::Status::Error("cannot serialize BlockProof");
}
return std_boc_serialize(bs_cell, 0);
}
} // namespace validator
} // namespace ton

View file

@ -57,11 +57,15 @@ class WaitBlockData : public td::actor::Actor {
void set_is_hardfork(bool value);
void start();
void got_block_data_from_db(td::Ref<BlockData> data);
void got_block_data_from_net(ReceivedBlock data);
void got_data_from_net(ReceivedBlock data);
void got_block_data_from_net(td::Ref<BlockData> block);
void checked_proof_link();
void failed_to_get_block_data_from_net(td::Status reason);
void got_static_file(td::BufferSlice data);
static td::Result<td::BufferSlice> generate_proof_link(BlockIdExt id, td::Ref<vm::Cell> block_root);
private:
BlockHandle handle_;

View file

@ -206,7 +206,8 @@ void WaitBlockState::got_proof_link(td::BufferSlice data) {
td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link);
} else {
LOG(INFO) << "received bad proof link: " << R.move_as_error();
td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link);
delay_action([SelfId]() { td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link); },
td::Timestamp::in(0.1));
}
});
run_check_proof_link_query(handle_->id(), R.move_as_ok(), manager_, timeout_, std::move(P));

View file

@ -52,8 +52,8 @@ td::Result<std::vector<td::Ref<ShardTopBlockDescription>>> create_new_shard_bloc
td::Ref<BlockSignatureSet> create_signature_set(std::vector<BlockSignature> sig_set);
void run_check_external_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits,
td::actor::ActorId<ValidatorManager> manager, td::Promise<td::Ref<ExtMessage>> promise);
void run_check_external_message(td::Ref<ExtMessage> message, td::actor::ActorId<ValidatorManager> manager,
td::Promise<td::Ref<ExtMessage>> promise);
void run_accept_block_query(BlockIdExt id, td::Ref<BlockData> data, std::vector<BlockIdExt> prev,
td::Ref<ValidatorSet> validator_set, td::Ref<BlockSignatureSet> signatures,

View file

@ -16,6 +16,8 @@
*/
#include "full-node-private-overlay-v2.hpp"
#include "checksum.h"
#include "ton/ton-tl.hpp"
#include "common/delay.h"
#include "td/utils/JsonBuilder.h"
@ -52,6 +54,40 @@ void FullNodePrivateOverlayV2::process_broadcast(PublicKeyHash src, ton_api::ton
query.block_->cc_seqno_, std::move(query.block_->data_));
}
void FullNodePrivateOverlayV2::process_broadcast(PublicKeyHash src,
ton_api::tonNode_newBlockCandidateBroadcast &query) {
process_block_candidate_broadcast(src, query);
}
void FullNodePrivateOverlayV2::process_broadcast(PublicKeyHash src,
ton_api::tonNode_newBlockCandidateBroadcastCompressed &query) {
process_block_candidate_broadcast(src, query);
}
void FullNodePrivateOverlayV2::process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) {
BlockIdExt block_id;
CatchainSeqno cc_seqno;
td::uint32 validator_set_hash;
td::BufferSlice data;
auto S = deserialize_block_candidate_broadcast(query, block_id, cc_seqno, validator_set_hash, data,
overlay::Overlays::max_fec_broadcast_size());
if (S.is_error()) {
LOG(DEBUG) << "dropped broadcast: " << S;
return;
}
if (data.size() > FullNode::max_block_size()) {
VLOG(FULL_NODE_WARNING) << "received block candidate with too big size from " << src;
return;
}
if (td::sha256_bits256(data.as_slice()) != block_id.file_hash) {
VLOG(FULL_NODE_WARNING) << "received block candidate with incorrect file hash from " << src;
return;
}
VLOG(FULL_NODE_DEBUG) << "Received newBlockCandidate in private overlay from " << src << ": " << block_id.to_str();
td::actor::send_closure(full_node_, &FullNode::process_block_candidate_broadcast, block_id, cc_seqno,
validator_set_hash, std::move(data));
}
void FullNodePrivateOverlayV2::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
@ -93,6 +129,22 @@ void FullNodePrivateOverlayV2::send_broadcast(BlockBroadcast broadcast) {
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}
void FullNodePrivateOverlayV2::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::BufferSlice data) {
if (!inited_) {
return;
}
auto B =
serialize_block_candidate_broadcast(block_id, cc_seqno, validator_set_hash, data, true); // compression enabled
if (B.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to serialize block candidate broadcast: " << B.move_as_error();
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending newBlockCandidate in private overlay (with compression): " << block_id.to_str();
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}
void FullNodePrivateOverlayV2::start_up() {
std::sort(nodes_.begin(), nodes_.end());
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());

View file

@ -27,6 +27,11 @@ class FullNodePrivateOverlayV2 : public td::actor::Actor {
void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast& query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast& query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query);
void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);
template <class T>
void process_broadcast(PublicKeyHash, T&) {
VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast";
@ -35,6 +40,8 @@ class FullNodePrivateOverlayV2 : public td::actor::Actor {
void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data);
void send_broadcast(BlockBroadcast broadcast);
void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data);
void start_up() override;
void tear_down() override;

View file

@ -17,6 +17,7 @@
#include "full-node-private-overlay.hpp"
#include "ton/ton-tl.hpp"
#include "common/delay.h"
#include "common/checksum.h"
#include "full-node-serializer.hpp"
namespace ton::validator::fullnode {
@ -49,6 +50,41 @@ void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::
query.block_->cc_seqno_, std::move(query.block_->data_));
}
void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src,
ton_api::tonNode_newBlockCandidateBroadcast &query) {
process_block_candidate_broadcast(src, query);
}
void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src,
ton_api::tonNode_newBlockCandidateBroadcastCompressed &query) {
process_block_candidate_broadcast(src, query);
}
void FullNodePrivateBlockOverlay::process_block_candidate_broadcast(PublicKeyHash src,
ton_api::tonNode_Broadcast &query) {
BlockIdExt block_id;
CatchainSeqno cc_seqno;
td::uint32 validator_set_hash;
td::BufferSlice data;
auto S = deserialize_block_candidate_broadcast(query, block_id, cc_seqno, validator_set_hash, data,
overlay::Overlays::max_fec_broadcast_size());
if (S.is_error()) {
LOG(DEBUG) << "dropped broadcast: " << S;
return;
}
if (data.size() > FullNode::max_block_size()) {
VLOG(FULL_NODE_WARNING) << "received block candidate with too big size from " << src;
return;
}
if (td::sha256_bits256(data.as_slice()) != block_id.file_hash) {
VLOG(FULL_NODE_WARNING) << "received block candidate with incorrect file hash from " << src;
return;
}
VLOG(FULL_NODE_DEBUG) << "Received newBlockCandidate in private overlay from " << src << ": " << block_id.to_str();
td::actor::send_closure(full_node_, &FullNode::process_block_candidate_broadcast, block_id, cc_seqno,
validator_set_hash, std::move(data));
}
void FullNodePrivateBlockOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
if (adnl::AdnlNodeIdShort{src} == local_id_) {
return;
@ -77,6 +113,22 @@ void FullNodePrivateBlockOverlay::send_shard_block_info(BlockIdExt block_id, Cat
}
}
void FullNodePrivateBlockOverlay::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::BufferSlice data) {
if (!inited_) {
return;
}
auto B =
serialize_block_candidate_broadcast(block_id, cc_seqno, validator_set_hash, data, true); // compression enabled
if (B.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to serialize block candidate broadcast: " << B.move_as_error();
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending newBlockCandidate in private overlay: " << block_id.to_str();
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}
void FullNodePrivateBlockOverlay::send_broadcast(BlockBroadcast broadcast) {
if (!inited_) {
return;
@ -199,6 +251,45 @@ void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNod
std::move(query.message_->data_), it->second);
}
void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query) {
process_block_candidate_broadcast(src, query);
}
void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src,
ton_api::tonNode_newBlockCandidateBroadcastCompressed &query) {
process_block_candidate_broadcast(src, query);
}
void FullNodeCustomOverlay::process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) {
if (!block_senders_.count(adnl::AdnlNodeIdShort(src))) {
VLOG(FULL_NODE_DEBUG) << "Dropping block candidate broadcast in private overlay \"" << name_
<< "\" from unauthorized sender " << src;
return;
}
BlockIdExt block_id;
CatchainSeqno cc_seqno;
td::uint32 validator_set_hash;
td::BufferSlice data;
auto S = deserialize_block_candidate_broadcast(query, block_id, cc_seqno, validator_set_hash, data,
overlay::Overlays::max_fec_broadcast_size());
if (S.is_error()) {
LOG(DEBUG) << "dropped broadcast: " << S;
return;
}
if (data.size() > FullNode::max_block_size()) {
VLOG(FULL_NODE_WARNING) << "received block candidate with too big size from " << src;
return;
}
if (td::sha256_bits256(data.as_slice()) != block_id.file_hash) {
VLOG(FULL_NODE_WARNING) << "received block candidate with incorrect file hash from " << src;
return;
}
VLOG(FULL_NODE_DEBUG) << "Received newBlockCandidate in custom overlay \"" << name_ << "\" from " << src << ": "
<< block_id.to_str();
td::actor::send_closure(full_node_, &FullNode::process_block_candidate_broadcast, block_id, cc_seqno,
validator_set_hash, std::move(data));
}
void FullNodeCustomOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
if (adnl::AdnlNodeIdShort{src} == local_id_) {
return;
@ -241,6 +332,22 @@ void FullNodeCustomOverlay::send_broadcast(BlockBroadcast broadcast) {
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}
void FullNodeCustomOverlay::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::BufferSlice data) {
if (!inited_) {
return;
}
auto B =
serialize_block_candidate_broadcast(block_id, cc_seqno, validator_set_hash, data, true); // compression enabled
if (B.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to serialize block candidate broadcast: " << B.move_as_error();
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending newBlockCandidate in custom overlay \"" << name_ << "\": " << block_id.to_str();
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}
void FullNodeCustomOverlay::start_up() {
std::sort(nodes_.begin(), nodes_.end());
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());

View file

@ -27,6 +27,11 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query);
void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);
template <class T>
void process_broadcast(PublicKeyHash, T &) {
VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast";
@ -34,6 +39,8 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
void receive_broadcast(PublicKeyHash src, td::BufferSlice query);
void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data);
void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data);
void send_broadcast(BlockBroadcast broadcast);
void set_config(FullNodeConfig config) {
@ -98,6 +105,11 @@ class FullNodeCustomOverlay : public td::actor::Actor {
void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query);
void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);
template <class T>
void process_broadcast(PublicKeyHash, T &) {
VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast";
@ -106,6 +118,8 @@ class FullNodeCustomOverlay : public td::actor::Actor {
void send_external_message(td::BufferSlice data);
void send_broadcast(BlockBroadcast broadcast);
void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data);
void set_config(FullNodeConfig config) {
config_ = std::move(config);

View file

@ -152,4 +152,63 @@ td::Status deserialize_block_full(ton_api::tonNode_DataFull& obj, BlockIdExt& id
return S;
}
td::Result<td::BufferSlice> serialize_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::Slice data,
bool compression_enabled) {
if (!compression_enabled) {
return create_serialize_tl_object<ton_api::tonNode_newBlockCandidateBroadcast>(
create_tl_block_id(block_id), cc_seqno, validator_set_hash,
create_tl_object<ton_api::tonNode_blockSignature>(Bits256::zero(), td::BufferSlice()), td::BufferSlice(data));
}
TRY_RESULT(root, vm::std_boc_deserialize(data));
TRY_RESULT(data_new, vm::std_boc_serialize(root, 2));
td::BufferSlice compressed = td::lz4_compress(data_new);
VLOG(FULL_NODE_DEBUG) << "Compressing block candidate broadcast: " << data.size() << " -> " << compressed.size();
return create_serialize_tl_object<ton_api::tonNode_newBlockCandidateBroadcastCompressed>(
create_tl_block_id(block_id), cc_seqno, validator_set_hash,
create_tl_object<ton_api::tonNode_blockSignature>(Bits256::zero(), td::BufferSlice()), 0, std::move(compressed));
}
static td::Status deserialize_block_candidate_broadcast(ton_api::tonNode_newBlockCandidateBroadcast& obj,
BlockIdExt& block_id, CatchainSeqno& cc_seqno,
td::uint32& validator_set_hash, td::BufferSlice& data) {
block_id = create_block_id(obj.id_);
cc_seqno = obj.catchain_seqno_;
validator_set_hash = obj.validator_set_hash_;
data = std::move(obj.data_);
return td::Status::OK();
}
static td::Status deserialize_block_candidate_broadcast(ton_api::tonNode_newBlockCandidateBroadcastCompressed& obj,
BlockIdExt& block_id, CatchainSeqno& cc_seqno,
td::uint32& validator_set_hash, td::BufferSlice& data,
int max_decompressed_data_size) {
block_id = create_block_id(obj.id_);
cc_seqno = obj.catchain_seqno_;
validator_set_hash = obj.validator_set_hash_;
TRY_RESULT(decompressed, td::lz4_decompress(obj.compressed_, max_decompressed_data_size));
TRY_RESULT(root, vm::std_boc_deserialize(decompressed));
TRY_RESULT_ASSIGN(data, vm::std_boc_serialize(root, 31));
VLOG(FULL_NODE_DEBUG) << "Decompressing block candidate broadcast: " << obj.compressed_.size() << " -> "
<< data.size();
return td::Status::OK();
}
td::Status deserialize_block_candidate_broadcast(ton_api::tonNode_Broadcast& obj, BlockIdExt& block_id,
CatchainSeqno& cc_seqno, td::uint32& validator_set_hash,
td::BufferSlice& data, int max_decompressed_data_size) {
td::Status S;
ton_api::downcast_call(obj, td::overloaded(
[&](ton_api::tonNode_newBlockCandidateBroadcast& f) {
S = deserialize_block_candidate_broadcast(f, block_id, cc_seqno, validator_set_hash,
data);
},
[&](ton_api::tonNode_newBlockCandidateBroadcastCompressed& f) {
S = deserialize_block_candidate_broadcast(f, block_id, cc_seqno, validator_set_hash,
data, max_decompressed_data_size);
},
[&](auto&) { S = td::Status::Error("unknown data type"); }));
return S;
}
} // namespace ton::validator::fullnode

View file

@ -28,4 +28,11 @@ td::Result<td::BufferSlice> serialize_block_full(const BlockIdExt& id, td::Slice
td::Status deserialize_block_full(ton_api::tonNode_DataFull& obj, BlockIdExt& id, td::BufferSlice& proof,
td::BufferSlice& data, bool& is_proof_link, int max_decompressed_data_size);
td::Result<td::BufferSlice> serialize_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::Slice data,
bool compression_enabled);
td::Status deserialize_block_candidate_broadcast(ton_api::tonNode_Broadcast& obj, BlockIdExt& block_id,
CatchainSeqno& cc_seqno, td::uint32& validator_set_hash,
td::BufferSlice& data, int max_decompressed_data_size);
} // namespace ton::validator::fullnode

View file

@ -17,6 +17,7 @@
Copyright 2017-2020 Telegram Systems LLP
*/
#include "auto/tl/ton_api.h"
#include "checksum.h"
#include "overlays.h"
#include "td/utils/SharedSlice.h"
#include "td/utils/overloaded.h"
@ -24,6 +25,7 @@
#include "full-node-shard-queries.hpp"
#include "full-node-serializer.hpp"
#include "td/utils/buffer.h"
#include "ton/ton-shard.h"
#include "ton/ton-tl.hpp"
@ -739,6 +741,35 @@ void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_ne
query.block_->cc_seqno_, std::move(query.block_->data_));
}
void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query) {
process_block_candidate_broadcast(src, query);
}
void FullNodeShardImpl::process_broadcast(PublicKeyHash src,
ton_api::tonNode_newBlockCandidateBroadcastCompressed &query) {
process_block_candidate_broadcast(src, query);
}
void FullNodeShardImpl::process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) {
BlockIdExt block_id;
CatchainSeqno cc_seqno;
td::uint32 validator_set_hash;
td::BufferSlice data;
auto S = deserialize_block_candidate_broadcast(query, block_id, cc_seqno, validator_set_hash, data,
overlay::Overlays::max_fec_broadcast_size());
if (data.size() > FullNode::max_block_size()) {
VLOG(FULL_NODE_WARNING) << "received block candidate with too big size from " << src;
return;
}
if (td::sha256_bits256(data.as_slice()) != block_id.file_hash) {
VLOG(FULL_NODE_WARNING) << "received block candidate with incorrect file hash from " << src;
return;
}
VLOG(FULL_NODE_DEBUG) << "Received newBlockCandidate from " << src << ": " << block_id.to_str();
td::actor::send_closure(full_node_, &FullNode::process_block_candidate_broadcast, block_id, cc_seqno,
validator_set_hash, std::move(data));
}
void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) {
process_block_broadcast(src, query);
}
@ -839,6 +870,23 @@ void FullNodeShardImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno
}
}
void FullNodeShardImpl::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data) {
if (!client_.empty()) {
UNREACHABLE();
return;
}
auto B =
serialize_block_candidate_broadcast(block_id, cc_seqno, validator_set_hash, data, true); // compression enabled
if (B.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to serialize block candidate broadcast: " << B.move_as_error();
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending newBlockCandidate: " << block_id.to_str();
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, adnl_id_, overlay_id_, local_id_,
overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}
void FullNodeShardImpl::send_broadcast(BlockBroadcast broadcast) {
if (!client_.empty()) {
UNREACHABLE();

View file

@ -48,6 +48,8 @@ class FullNodeShard : public td::actor::Actor {
virtual void send_ihr_message(td::BufferSlice data) = 0;
virtual void send_external_message(td::BufferSlice data) = 0;
virtual void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) = 0;
virtual void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data) = 0;
virtual void send_broadcast(BlockBroadcast broadcast) = 0;
virtual void sign_overlay_certificate(PublicKeyHash signed_key, td::uint32 expiry_at, td::uint32 max_size,

View file

@ -18,6 +18,7 @@
*/
#pragma once
#include "auto/tl/ton_api.h"
#include "full-node-shard.h"
#include "td/actor/PromiseFuture.h"
#include "td/utils/port/Poll.h"
@ -167,6 +168,11 @@ class FullNodeShardImpl : public FullNodeShard {
void process_broadcast(PublicKeyHash src, ton_api::tonNode_ihrMessageBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query);
void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);
void receive_broadcast(PublicKeyHash src, td::BufferSlice query);
void check_broadcast(PublicKeyHash src, td::BufferSlice query, td::Promise<td::Unit> promise);
void get_stats_extra(td::Promise<std::string> promise);
@ -175,6 +181,8 @@ class FullNodeShardImpl : public FullNodeShard {
void send_ihr_message(td::BufferSlice data) override;
void send_external_message(td::BufferSlice data) override;
void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) override;
void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data) override;
void send_broadcast(BlockBroadcast broadcast) override;
void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,

View file

@ -352,6 +352,29 @@ void FullNodeImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_s
td::actor::send_closure(shard, &FullNodeShard::send_shard_block_info, block_id, cc_seqno, std::move(data));
}
void FullNodeImpl::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data) {
send_block_candidate_broadcast_to_custom_overlays(block_id, cc_seqno, validator_set_hash, data);
auto shard = get_shard(ShardIdFull{masterchainId, shardIdAll});
if (shard.empty()) {
VLOG(FULL_NODE_WARNING) << "dropping OUT shard block info message to unknown shard";
return;
}
/*if (!private_block_overlays_.empty()) {
td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_block_candidate,
block_id, cc_seqno, validator_set_hash, data.clone());
}*/
auto private_overlay = private_block_overlays_.choose_overlay(block_id.shard_full());
if (!private_overlay.empty()) {
td::actor::send_closure(private_overlay, &FullNodePrivateOverlayV2::send_block_candidate, block_id, cc_seqno,
validator_set_hash, data.clone());
}
if (broadcast_block_candidates_in_public_overlay_) {
td::actor::send_closure(shard, &FullNodeShard::send_block_candidate, block_id, cc_seqno, validator_set_hash,
std::move(data));
}
}
void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) {
send_block_broadcast_to_custom_overlays(broadcast);
if (custom_overlays_only) {
@ -362,7 +385,7 @@ void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, bool custom_overlays
VLOG(FULL_NODE_WARNING) << "dropping OUT broadcast to unknown shard";
return;
}
/*if (!private_block_overlays_.empty()) {
/*if (broadcast.block_id.is_masterchain() && !private_block_overlays_.empty()) {
td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_broadcast,
broadcast.clone());
}*/
@ -575,6 +598,14 @@ void FullNodeImpl::process_block_broadcast(BlockBroadcast broadcast) {
});
}
void FullNodeImpl::process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::BufferSlice data) {
send_block_candidate_broadcast_to_custom_overlays(block_id, cc_seqno, validator_set_hash, data);
// ignore cc_seqno and validator_hash for now
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_block_candidate, block_id,
std::move(data));
}
void FullNodeImpl::start_up() {
add_shard_actor(ShardIdFull{masterchainId}, FullNodeShardMode::active);
if (local_id_.is_zero()) {
@ -606,6 +637,11 @@ void FullNodeImpl::start_up() {
void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) override {
td::actor::send_closure(id_, &FullNodeImpl::send_shard_block_info, block_id, cc_seqno, std::move(data));
}
void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data) override {
td::actor::send_closure(id_, &FullNodeImpl::send_block_candidate, block_id, cc_seqno, validator_set_hash,
std::move(data));
}
void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override {
td::actor::send_closure(id_, &FullNodeImpl::send_broadcast, std::move(broadcast), custom_overlays_only);
}
@ -745,6 +781,29 @@ void FullNodeImpl::send_block_broadcast_to_custom_overlays(const BlockBroadcast&
}
}
void FullNodeImpl::send_block_candidate_broadcast_to_custom_overlays(const BlockIdExt &block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash,
const td::BufferSlice &data) {
// Same cache of sent broadcasts as in send_block_broadcast_to_custom_overlays
if (!custom_overlays_sent_broadcasts_.insert(block_id).second) {
return;
}
custom_overlays_sent_broadcasts_lru_.push(block_id);
if (custom_overlays_sent_broadcasts_lru_.size() > 256) {
custom_overlays_sent_broadcasts_.erase(custom_overlays_sent_broadcasts_lru_.front());
custom_overlays_sent_broadcasts_lru_.pop();
}
for (auto &private_overlay : custom_overlays_) {
for (auto &actor : private_overlay.second.actors_) {
auto local_id = actor.first;
if (private_overlay.second.params_.block_senders_.count(local_id)) {
td::actor::send_closure(actor.second, &FullNodeCustomOverlay::send_block_candidate, block_id, cc_seqno,
validator_set_hash, data.clone());
}
}
}
}
FullNodeImpl::FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash,
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp,

View file

@ -88,6 +88,8 @@ class FullNode : public td::actor::Actor {
virtual void del_custom_overlay(std::string name, td::Promise<td::Unit> promise) = 0;
virtual void process_block_broadcast(BlockBroadcast broadcast) = 0;
virtual void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::BufferSlice data) = 0;
static constexpr td::uint32 max_block_size() {
return 4 << 20;

View file

@ -69,6 +69,8 @@ class FullNodeImpl : public FullNode {
void send_ihr_message(AccountIdPrefixFull dst, td::BufferSlice data);
void send_ext_message(AccountIdPrefixFull dst, td::BufferSlice data);
void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqnp, td::BufferSlice data);
void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data);
void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only);
void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise<ReceivedBlock> promise);
void download_zero_state(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
@ -90,6 +92,8 @@ class FullNodeImpl : public FullNode {
void new_key_block(BlockHandle handle);
void process_block_broadcast(BlockBroadcast broadcast) override;
void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data) override;
void start_up() override;
@ -147,6 +151,7 @@ class FullNodeImpl : public FullNode {
void set_private_block_overlays_enable_compression(bool value);
void create_private_block_overlay(PublicKeyHash key);
*/
bool broadcast_block_candidates_in_public_overlay_ = false;
struct CustomOverlayInfo {
CustomOverlayParams params_;
@ -158,10 +163,11 @@ class FullNodeImpl : public FullNode {
void update_private_overlays();
// void set_private_block_overlays_enable_compression(bool value);
// void create_private_block_overlay(PublicKeyHash key);
void update_custom_overlay(CustomOverlayInfo& overlay);
void send_block_broadcast_to_custom_overlays(const BlockBroadcast& broadcast);
void send_block_candidate_broadcast_to_custom_overlays(const BlockIdExt& block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, const td::BufferSlice& data);
FullNodePrivateBlockOverlays private_block_overlays_;
};

View file

@ -5222,12 +5222,13 @@ bool Collator::create_block_candidate() {
td::actor::send_closure_later(actor_id(this), &Collator::return_block_candidate, td::Unit());
} else {
LOG(INFO) << "saving new BlockCandidate";
td::actor::send_closure_later(manager, &ValidatorManager::set_block_candidate, block_candidate->id,
block_candidate->clone(), [self = get_self()](td::Result<td::Unit> saved) -> void {
LOG(DEBUG) << "got answer to set_block_candidate";
td::actor::send_closure_later(std::move(self), &Collator::return_block_candidate,
std::move(saved));
});
td::actor::send_closure_later(
manager, &ValidatorManager::set_block_candidate, block_candidate->id, block_candidate->clone(),
validator_set_->get_catchain_seqno(), validator_set_->get_validator_set_hash(),
[self = get_self()](td::Result<td::Unit> saved) -> void {
LOG(DEBUG) << "got answer to set_block_candidate";
td::actor::send_closure_later(std::move(self), &Collator::return_block_candidate, std::move(saved));
});
}
// 5. communicate about bad and delayed external messages
if (!bad_ext_msgs_.empty() || !delay_ext_msgs_.empty()) {

View file

@ -86,24 +86,18 @@ td::Result<Ref<ExtMessageQ>> ExtMessageQ::create_ext_message(td::BufferSlice dat
return Ref<ExtMessageQ>{true, std::move(data), std::move(ext_msg), dest_prefix, wc, addr};
}
void ExtMessageQ::run_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits,
td::actor::ActorId<ton::validator::ValidatorManager> manager,
void ExtMessageQ::run_message(td::Ref<ExtMessage> message, td::actor::ActorId<ton::validator::ValidatorManager> manager,
td::Promise<td::Ref<ExtMessage>> promise) {
auto R = create_ext_message(std::move(data), limits);
if (R.is_error()) {
return promise.set_error(R.move_as_error_prefix("failed to parse external message "));
}
auto M = R.move_as_ok();
auto root = M->root_cell();
auto root = message->root_cell();
block::gen::CommonMsgInfo::Record_ext_in_msg_info info;
tlb::unpack_cell_inexact(root, info); // checked in create message
ton::StdSmcAddress addr = M->addr();
ton::WorkchainId wc = M->wc();
ton::StdSmcAddress addr = message->addr();
ton::WorkchainId wc = message->wc();
run_fetch_account_state(
wc, addr, manager,
[promise = std::move(promise), msg_root = root, wc, addr,
M](td::Result<std::tuple<td::Ref<vm::CellSlice>, UnixTime, LogicalTime, std::unique_ptr<block::ConfigInfo>>>
[promise = std::move(promise), msg_root = root, wc, addr, message](
td::Result<std::tuple<td::Ref<vm::CellSlice>, UnixTime, LogicalTime, std::unique_ptr<block::ConfigInfo>>>
res) mutable {
if (res.is_error()) {
promise.set_error(td::Status::Error(PSLICE() << "Failed to get account state"));
@ -120,7 +114,7 @@ void ExtMessageQ::run_message(td::BufferSlice data, block::SizeLimitsConfig::Ext
} else {
auto status = run_message_on_account(wc, &acc, utime, lt + 1, msg_root, std::move(config));
if (status.is_ok()) {
promise.set_value(std::move(M));
promise.set_value(std::move(message));
} else {
promise.set_error(td::Status::Error(PSLICE() << "External message was not accepted\n"
<< status.message()));

View file

@ -61,8 +61,7 @@ class ExtMessageQ : public ExtMessage {
ton::StdSmcAddress addr);
static td::Result<td::Ref<ExtMessageQ>> create_ext_message(td::BufferSlice data,
block::SizeLimitsConfig::ExtMsgLimits limits);
static void run_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits,
td::actor::ActorId<ton::validator::ValidatorManager> manager,
static void run_message(td::Ref<ExtMessage> message, td::actor::ActorId<ton::validator::ValidatorManager> manager,
td::Promise<td::Ref<ExtMessage>> promise);
static td::Status run_message_on_account(ton::WorkchainId wc,
block::Account* acc,

View file

@ -119,10 +119,9 @@ td::Result<td::Ref<ExtMessage>> create_ext_message(td::BufferSlice data,
return std::move(res);
}
void run_check_external_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits,
td::actor::ActorId<ValidatorManager> manager,
void run_check_external_message(Ref<ExtMessage> message, td::actor::ActorId<ValidatorManager> manager,
td::Promise<td::Ref<ExtMessage>> promise) {
ExtMessageQ::run_message(std::move(data), limits, std::move(manager), std::move(promise));
ExtMessageQ::run_message(std::move(message), std::move(manager), std::move(promise));
}
td::Result<td::Ref<IhrMessage>> create_ihr_message(td::BufferSlice data) {

View file

@ -6456,7 +6456,8 @@ bool ValidateQuery::save_candidate() {
}
});
td::actor::send_closure(manager, &ValidatorManager::set_block_candidate, id_, block_candidate.clone(), std::move(P));
td::actor::send_closure(manager, &ValidatorManager::set_block_candidate, id_, block_candidate.clone(),
validator_set_->get_catchain_seqno(), validator_set_->get_validator_set_hash(), std::move(P));
return true;
}

View file

@ -90,7 +90,8 @@ class ValidatorManager : public ValidatorManagerInterface {
virtual void wait_block_signatures_short(BlockIdExt id, td::Timestamp timeout,
td::Promise<td::Ref<BlockSignatureSet>> promise) = 0;
virtual void set_block_candidate(BlockIdExt id, BlockCandidate candidate, td::Promise<td::Unit> promise) = 0;
virtual void set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::Promise<td::Unit> promise) = 0;
virtual void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority,
td::Timestamp timeout, td::Promise<td::Ref<ShardState>> promise) = 0;

View file

@ -775,7 +775,8 @@ void ValidatorManagerImpl::set_next_block(BlockIdExt block_id, BlockIdExt next,
get_block_handle(block_id, true, std::move(P));
}
void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate candidate, td::Promise<td::Unit> promise) {
void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::Promise<td::Unit> promise) {
td::actor::send_closure(db_, &Db::store_block_candidate, std::move(candidate), std::move(promise));
}

View file

@ -130,6 +130,8 @@ class ValidatorManagerImpl : public ValidatorManager {
}
void new_ihr_message(td::BufferSlice data) override;
void new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) override;
void new_block_candidate(BlockIdExt block_id, td::BufferSlice data) override {
}
void add_ext_server_id(adnl::AdnlNodeIdShort id) override {
UNREACHABLE();
@ -181,7 +183,8 @@ class ValidatorManagerImpl : public ValidatorManager {
void wait_block_signatures_short(BlockIdExt id, td::Timestamp timeout,
td::Promise<td::Ref<BlockSignatureSet>> promise) override;
void set_block_candidate(BlockIdExt id, BlockCandidate candidate, td::Promise<td::Unit> promise) override;
void set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::Promise<td::Unit> promise) override;
void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::Ref<ShardState>> promise) override;

View file

@ -152,6 +152,9 @@ class ValidatorManagerImpl : public ValidatorManager {
void new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) override {
UNREACHABLE();
}
void new_block_candidate(BlockIdExt block_id, td::BufferSlice data) override {
UNREACHABLE();
}
void add_ext_server_id(adnl::AdnlNodeIdShort id) override {
UNREACHABLE();
@ -219,7 +222,8 @@ class ValidatorManagerImpl : public ValidatorManager {
void wait_block_signatures_short(BlockIdExt id, td::Timestamp timeout,
td::Promise<td::Ref<BlockSignatureSet>> promise) override;
void set_block_candidate(BlockIdExt id, BlockCandidate candidate, td::Promise<td::Unit> promise) override {
void set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::Promise<td::Unit> promise) override {
promise.set_value(td::Unit());
}

View file

@ -17,6 +17,8 @@
Copyright 2017-2020 Telegram Systems LLP
*/
#include "manager.hpp"
#include "checksum.h"
#include "td/utils/buffer.h"
#include "validator-group.hpp"
#include "downloaders/wait-block-state.hpp"
#include "downloaders/wait-block-state-merge.hpp"
@ -412,14 +414,42 @@ void ValidatorManagerImpl::add_external_message(td::Ref<ExtMessage> msg, int pri
ext_messages_hashes_[id.hash] = {priority, id};
}
void ValidatorManagerImpl::check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) {
++ls_stats_check_ext_messages_;
auto state = do_get_last_liteserver_state();
if (state.is_null()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "not ready"));
return;
}
run_check_external_message(std::move(data), state->get_ext_msg_limits(), actor_id(this),
std::move(promise));
auto R = create_ext_message(std::move(data), state->get_ext_msg_limits());
if (R.is_error()) {
promise.set_error(R.move_as_error_prefix("failed to parse external message: "));
return;
}
auto message = R.move_as_ok();
WorkchainId wc = message->wc();
StdSmcAddress addr = message->addr();
if (checked_ext_msg_counter_.get_msg_count(wc, addr) >= max_ext_msg_per_addr()) {
promise.set_error(
td::Status::Error(PSTRING() << "too many external messages to address " << wc << ":" << addr.to_hex()));
return;
}
promise = [self = this, wc, addr, promise = std::move(promise),
SelfId = actor_id(this)](td::Result<td::Ref<ExtMessage>> R) mutable {
if (R.is_error()) {
promise.set_error(R.move_as_error());
return;
}
td::actor::send_lambda(SelfId, [=, promise = std::move(promise), message = R.move_as_ok()]() mutable {
if (self->checked_ext_msg_counter_.inc_msg_count(wc, addr) > max_ext_msg_per_addr()) {
promise.set_error(
td::Status::Error(PSTRING() << "too many external messages to address " << wc << ":" << addr.to_hex()));
return;
}
promise.set_result(std::move(message));
});
};
++ls_stats_check_ext_messages_;
run_check_external_message(std::move(message), actor_id(this), std::move(promise));
}
void ValidatorManagerImpl::new_ihr_message(td::BufferSlice data) {
@ -466,6 +496,17 @@ void ValidatorManagerImpl::new_shard_block(BlockIdExt block_id, CatchainSeqno cc
actor_id(this), td::Timestamp::in(2.0), std::move(P));
}
void ValidatorManagerImpl::new_block_candidate(BlockIdExt block_id, td::BufferSlice data) {
if (!last_masterchain_block_handle_) {
VLOG(VALIDATOR_DEBUG) << "dropping top shard block broadcast: not inited";
return;
}
if (!started_) {
return;
}
add_cached_block_candidate(ReceivedBlock{block_id, std::move(data)});
}
void ValidatorManagerImpl::add_shard_block_description(td::Ref<ShardTopBlockDescription> desc) {
if (desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
auto it = shard_blocks_.find(ShardTopBlockDescriptionId{desc->shard(), desc->catchain_seqno()});
@ -535,6 +576,36 @@ void ValidatorManagerImpl::loaded_msg_queue_to_masterchain(td::Ref<ShardTopBlock
}
}
void ValidatorManagerImpl::add_cached_block_candidate(ReceivedBlock block) {
BlockIdExt id = block.id;
if (block.id.is_masterchain()) {
return;
}
if (cached_block_candidates_.emplace(id, std::move(block)).second) {
cached_block_candidates_lru_.push_back(id);
{
auto it = wait_block_data_.find(id);
if (it != wait_block_data_.end()) {
auto r_block = create_block(cached_block_candidates_[id].clone());
if (r_block.is_ok()) {
td::actor::send_closure(it->second.actor_, &WaitBlockData::got_block_data_from_net, r_block.move_as_ok());
}
}
}
{
auto it = wait_state_.find(id);
if (it != wait_state_.end()) {
// Proof link is not ready at this point, but this will force WaitBlockState to redo send_get_proof_link_request
td::actor::send_closure(it->second.actor_, &WaitBlockState::after_get_proof_link);
}
}
}
if (cached_block_candidates_lru_.size() > max_cached_candidates()) {
CHECK(cached_block_candidates_.erase(cached_block_candidates_lru_.front()));
cached_block_candidates_lru_.pop_front();
}
}
void ValidatorManagerImpl::add_ext_server_id(adnl::AdnlNodeIdShort id) {
class Cb : public adnl::Adnl::Callback {
private:
@ -1303,11 +1374,16 @@ void ValidatorManagerImpl::set_next_block(BlockIdExt block_id, BlockIdExt next,
get_block_handle(block_id, true, std::move(P));
}
void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate candidate, td::Promise<td::Unit> promise) {
void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::Promise<td::Unit> promise) {
if (!candidates_buffer_.empty()) {
td::actor::send_closure(candidates_buffer_, &CandidatesBuffer::add_new_candidate, id,
PublicKey{pubkeys::Ed25519{candidate.pubkey.as_bits256()}}, candidate.collated_file_hash);
}
if (!id.is_masterchain()) {
add_cached_block_candidate(ReceivedBlock{id, candidate.data.clone()});
callback_->send_block_candidate(id, cc_seqno, validator_set_hash, candidate.data.clone());
}
td::actor::send_closure(db_, &Db::store_block_candidate, std::move(candidate), std::move(promise));
}
@ -1564,6 +1640,13 @@ void ValidatorManagerImpl::get_last_liteserver_state_block(
void ValidatorManagerImpl::send_get_block_request(BlockIdExt id, td::uint32 priority,
td::Promise<ReceivedBlock> promise) {
{
auto it = cached_block_candidates_.find(id);
if (it != cached_block_candidates_.end()) {
LOG(DEBUG) << "send_get_block_request: got result from candidates cache for " << id.to_str();
return promise.set_value(it->second.clone());
}
}
callback_->download_block(id, priority, td::Timestamp::in(10.0), std::move(promise));
}
@ -1586,6 +1669,20 @@ void ValidatorManagerImpl::send_get_block_proof_request(BlockIdExt block_id, td:
void ValidatorManagerImpl::send_get_block_proof_link_request(BlockIdExt block_id, td::uint32 priority,
td::Promise<td::BufferSlice> promise) {
if (!block_id.is_masterchain()) {
auto it = cached_block_candidates_.find(block_id);
if (it != cached_block_candidates_.end()) {
// Proof link can be created from the cached block candidate
LOG(DEBUG) << "send_get_block_proof_link_request: creating proof link from cached caniddate for "
<< block_id.to_str();
TRY_RESULT_PROMISE_PREFIX(promise, block_root, vm::std_boc_deserialize(it->second.data),
"failed to create proof link: ");
TRY_RESULT_PROMISE_PREFIX(promise, proof_link, WaitBlockData::generate_proof_link(it->second.id, block_root),
"failed to create proof link: ");
promise.set_result(std::move(proof_link));
return;
}
}
callback_->download_block_proof_link(block_id, priority, td::Timestamp::in(10.0), std::move(promise));
}
@ -2702,6 +2799,16 @@ void ValidatorManagerImpl::alarm() {
log_ls_stats_at_ = td::Timestamp::in(60.0);
}
alarm_timestamp().relax(log_ls_stats_at_);
if (cleanup_mempool_at_.is_in_past()) {
if (is_validator()) {
get_external_messages(ShardIdFull{masterchainId, shardIdAll},
[](td::Result<std::vector<std::pair<td::Ref<ExtMessage>, int>>>) {});
get_external_messages(ShardIdFull{basechainId, shardIdAll},
[](td::Result<std::vector<std::pair<td::Ref<ExtMessage>, int>>>) {});
}
cleanup_mempool_at_ = td::Timestamp::in(250.0);
}
alarm_timestamp().relax(cleanup_mempool_at_);
}
void ValidatorManagerImpl::update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) {
@ -3360,6 +3467,29 @@ td::actor::ActorOwn<ValidatorManagerInterface> ValidatorManagerFactory::create(
rldp, overlays);
}
size_t ValidatorManagerImpl::CheckedExtMsgCounter::get_msg_count(WorkchainId wc, StdSmcAddress addr) {
before_query();
auto it1 = counter_cur_.find({wc, addr});
auto it2 = counter_prev_.find({wc, addr});
return (it1 == counter_cur_.end() ? 0 : it1->second) + (it2 == counter_prev_.end() ? 0 : it2->second);
}
size_t ValidatorManagerImpl::CheckedExtMsgCounter::inc_msg_count(WorkchainId wc, StdSmcAddress addr) {
before_query();
auto it2 = counter_prev_.find({wc, addr});
return (it2 == counter_prev_.end() ? 0 : it2->second) + ++counter_cur_[{wc, addr}];
}
void ValidatorManagerImpl::CheckedExtMsgCounter::before_query() {
while (cleanup_at_.is_in_past()) {
counter_prev_ = std::move(counter_cur_);
counter_cur_.clear();
if (counter_prev_.empty()) {
cleanup_at_ = td::Timestamp::in(max_ext_msg_per_addr_time_window() / 2.0);
break;
}
cleanup_at_ += max_ext_msg_per_addr_time_window() / 2.0;
}
}
} // namespace validator
} // namespace ton

View file

@ -18,10 +18,14 @@
*/
#pragma once
#include "common/refcnt.hpp"
#include "interfaces/validator-manager.h"
#include "interfaces/db.h"
#include "td/actor/PromiseFuture.h"
#include "td/utils/SharedSlice.h"
#include "td/utils/buffer.h"
#include "td/utils/port/Poll.h"
#include "td/utils/port/StdStreams.h"
#include "validator-group.hpp"
#include "shard-client.hpp"
#include "manager-init.h"
@ -238,6 +242,9 @@ class ValidatorManagerImpl : public ValidatorManager {
std::map<ShardTopBlockDescriptionId, ShardTopBlock> shard_blocks_;
std::map<BlockIdExt, td::Ref<OutMsgQueueProof>> cached_msg_queue_to_masterchain_;
std::map<BlockIdExt, ReceivedBlock> cached_block_candidates_;
std::list<BlockIdExt> cached_block_candidates_lru_;
struct ExtMessages {
std::map<MessageId<ExtMessage>, std::unique_ptr<MessageExt<ExtMessage>>> ext_messages_;
std::map<std::pair<ton::WorkchainId, ton::StdSmcAddress>, std::map<ExtMessage::Hash, MessageId<ExtMessage>>>
@ -251,10 +258,20 @@ class ValidatorManagerImpl : public ValidatorManager {
};
std::map<int, ExtMessages> ext_msgs_; // priority -> messages
std::map<ExtMessage::Hash, std::pair<int, MessageId<ExtMessage>>> ext_messages_hashes_; // hash -> priority
td::Timestamp cleanup_mempool_at_;
// IHR ?
std::map<MessageId<IhrMessage>, std::unique_ptr<MessageExt<IhrMessage>>> ihr_messages_;
std::map<IhrMessage::Hash, MessageId<IhrMessage>> ihr_messages_hashes_;
struct CheckedExtMsgCounter {
std::map<std::pair<WorkchainId, StdSmcAddress>, size_t> counter_cur_, counter_prev_;
td::Timestamp cleanup_at_ = td::Timestamp::now();
size_t get_msg_count(WorkchainId wc, StdSmcAddress addr);
size_t inc_msg_count(WorkchainId wc, StdSmcAddress addr);
void before_query();
} checked_ext_msg_counter_;
private:
// VALIDATOR GROUPS
ValidatorSessionId get_validator_set_id(ShardIdFull shard, td::Ref<ValidatorSet> val_set, td::Bits256 opts_hash,
@ -383,6 +400,7 @@ class ValidatorManagerImpl : public ValidatorManager {
void new_ihr_message(td::BufferSlice data) override;
void new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) override;
void new_block_candidate(BlockIdExt block_id, td::BufferSlice data) override;
void add_ext_server_id(adnl::AdnlNodeIdShort id) override;
void add_ext_server_port(td::uint16 port) override;
@ -429,7 +447,8 @@ class ValidatorManagerImpl : public ValidatorManager {
void wait_block_signatures_short(BlockIdExt id, td::Timestamp timeout,
td::Promise<td::Ref<BlockSignatureSet>> promise) override;
void set_block_candidate(BlockIdExt id, BlockCandidate candidate, td::Promise<td::Unit> promise) override;
void set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::Promise<td::Unit> promise) override;
void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::Ref<ShardState>> promise) override;
@ -526,6 +545,7 @@ class ValidatorManagerImpl : public ValidatorManager {
}
void add_shard_block_description(td::Ref<ShardTopBlockDescription> desc);
void add_cached_block_candidate(ReceivedBlock block);
void preload_msg_queue_to_masterchain(td::Ref<ShardTopBlockDescription> desc);
void loaded_msg_queue_to_masterchain(td::Ref<ShardTopBlockDescription> desc, td::Ref<OutMsgQueueProof> res);
@ -710,6 +730,16 @@ class ValidatorManagerImpl : public ValidatorManager {
double max_mempool_num() const {
return opts_->max_mempool_num();
}
size_t max_cached_candidates() const {
return 128;
}
static double max_ext_msg_per_addr_time_window() {
return 10.0;
}
static size_t max_ext_msg_per_addr() {
return 3 * 10;
}
void cleanup_last_validated_blocks(BlockId new_block);
void got_persistent_state_descriptions(std::vector<td::Ref<PersistentStateDescription>> descs);

View file

@ -143,6 +143,8 @@ class ValidatorManagerInterface : public td::actor::Actor {
virtual void send_ihr_message(AccountIdPrefixFull dst, td::BufferSlice data) = 0;
virtual void send_ext_message(AccountIdPrefixFull dst, td::BufferSlice data) = 0;
virtual void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) = 0;
virtual void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data) = 0;
virtual void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only = false) = 0;
virtual void download_block(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
td::Promise<ReceivedBlock> promise) = 0;
@ -214,6 +216,7 @@ class ValidatorManagerInterface : public td::actor::Actor {
virtual void check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) = 0;
virtual void new_ihr_message(td::BufferSlice data) = 0;
virtual void new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) = 0;
virtual void new_block_candidate(BlockIdExt block_id, td::BufferSlice data) = 0;
virtual void add_ext_server_id(adnl::AdnlNodeIdShort id) = 0;
virtual void add_ext_server_port(td::uint16 port) = 0;