diff --git a/create-hardfork/create-hardfork.cpp b/create-hardfork/create-hardfork.cpp index 0687f761..77fe7155 100644 --- a/create-hardfork/create-hardfork.cpp +++ b/create-hardfork/create-hardfork.cpp @@ -246,6 +246,9 @@ class HardforkCreator : public td::actor::Actor { } void send_shard_block_info(ton::BlockIdExt block_id, ton::CatchainSeqno cc_seqno, td::BufferSlice data) override { } + void send_block_candidate(ton::BlockIdExt block_id, ton::CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data) override { + } void send_broadcast(ton::BlockBroadcast broadcast, bool custom_overlays_only) override { } void download_block(ton::BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout, diff --git a/overlay/overlay-peers.cpp b/overlay/overlay-peers.cpp index 8c4f5d22..77fd1bbe 100644 --- a/overlay/overlay-peers.cpp +++ b/overlay/overlay-peers.cpp @@ -231,7 +231,7 @@ void OverlayImpl::update_neighbours(td::uint32 nodes_to_change) { continue; } - if (X->get_version() <= td::Clocks::system() - Overlays::overlay_peer_ttl()) { + if (public_ && X->get_version() <= td::Clocks::system() - Overlays::overlay_peer_ttl()) { if (X->is_neighbour()) { bool found = false; for (auto &n : neighbours_) { @@ -303,7 +303,7 @@ void OverlayImpl::get_overlay_random_peers(td::uint32 max_peers, auto t = td::Clocks::system(); while (v.size() < max_peers && v.size() < peers_.size() - bad_peers_.size()) { auto P = peers_.get_random(); - if (P->get_version() + 3600 < t) { + if (public_ && P->get_version() + 3600 < t) { VLOG(OVERLAY_INFO) << this << ": deleting outdated peer " << P->get_id(); del_peer(P->get_id()); } else if (P->is_alive()) { diff --git a/test/test-ton-collator.cpp b/test/test-ton-collator.cpp index e742753b..e3bac4e1 100644 --- a/test/test-ton-collator.cpp +++ b/test/test-ton-collator.cpp @@ -347,6 +347,9 @@ class TestNode : public td::actor::Actor { } } } + void send_block_candidate(ton::BlockIdExt block_id, ton::CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data) override { + } void send_broadcast(ton::BlockBroadcast broadcast, bool custom_overlays_only) override { } void download_block(ton::BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout, diff --git a/tl/generate/scheme/lite_api.tl b/tl/generate/scheme/lite_api.tl index d74e8953..6d91be8f 100644 --- a/tl/generate/scheme/lite_api.tl +++ b/tl/generate/scheme/lite_api.tl @@ -98,7 +98,7 @@ liteServer.getLibrariesWithProof id:tonNode.blockIdExt mode:# library_list:(vect liteServer.getShardBlockProof id:tonNode.blockIdExt = liteServer.ShardBlockProof; liteServer.getOutMsgQueueSizes mode:# wc:mode.0?int shard:mode.0?long = liteServer.OutMsgQueueSizes; -liteServer.nonfinal.getValidatorGroups mode:# wc:mode.0?int shard:mode.1?long = liteServer.nonfinal.ValidatorGroups; +liteServer.nonfinal.getValidatorGroups mode:# wc:mode.0?int shard:mode.0?long = liteServer.nonfinal.ValidatorGroups; liteServer.nonfinal.getCandidate id:liteServer.nonfinal.candidateId = liteServer.nonfinal.Candidate; liteServer.queryPrefix = Object; diff --git a/tl/generate/scheme/lite_api.tlo b/tl/generate/scheme/lite_api.tlo index ccae66f3..d6e65b18 100644 Binary files a/tl/generate/scheme/lite_api.tlo and b/tl/generate/scheme/lite_api.tlo differ diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index 49d3492e..9fec39d1 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -397,6 +397,11 @@ tonNode.blockBroadcastCompressed id:tonNode.blockIdExt catchain_seqno:int valida tonNode.ihrMessageBroadcast message:tonNode.ihrMessage = tonNode.Broadcast; tonNode.externalMessageBroadcast message:tonNode.externalMessage = tonNode.Broadcast; tonNode.newShardBlockBroadcast block:tonNode.newShardBlock = tonNode.Broadcast; +// signature may be empty, at least for now +tonNode.newBlockCandidateBroadcast id:tonNode.blockIdExt catchain_seqno:int validator_set_hash:int + collator_signature:tonNode.blockSignature data:bytes = tonNode.Broadcast; +tonNode.newBlockCandidateBroadcastCompressed id:tonNode.blockIdExt catchain_seqno:int validator_set_hash:int + collator_signature:tonNode.blockSignature flags:# compressed:bytes = tonNode.Broadcast; tonNode.shardPublicOverlayId workchain:int shard:long zero_state_file_hash:int256 = tonNode.ShardPublicOverlayId; diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo index 6719bdd9..3073390b 100644 Binary files a/tl/generate/scheme/ton_api.tlo and b/tl/generate/scheme/ton_api.tlo differ diff --git a/ton/ton-types.h b/ton/ton-types.h index 957f13e1..4eba30e4 100644 --- a/ton/ton-types.h +++ b/ton/ton-types.h @@ -346,6 +346,10 @@ struct BlockSignature { struct ReceivedBlock { BlockIdExt id; td::BufferSlice data; + + ReceivedBlock clone() const { + return ReceivedBlock{id, data.clone()}; + } }; struct BlockBroadcast { diff --git a/validator/db/files-async.hpp b/validator/db/files-async.hpp index bcb7fa8b..c8508996 100644 --- a/validator/db/files-async.hpp +++ b/validator/db/files-async.hpp @@ -57,7 +57,7 @@ class WriteFile : public td::actor::Actor { status = file.sync(); } if (status.is_error()) { - td::unlink(old_name); + td::unlink(old_name).ignore(); promise_.set_error(std::move(status)); stop(); return; diff --git a/validator/downloaders/wait-block-data.cpp b/validator/downloaders/wait-block-data.cpp index c40dba48..220a8a2c 100644 --- a/validator/downloaders/wait-block-data.cpp +++ b/validator/downloaders/wait-block-data.cpp @@ -17,10 +17,14 @@ Copyright 2017-2020 Telegram Systems LLP */ #include "wait-block-data.hpp" + +#include "block-parse.h" +#include "block-auto.h" #include "fabric.h" #include "adnl/utils.hpp" #include "ton/ton-io.hpp" #include "common/delay.h" +#include "vm/cells/MerkleProof.h" namespace ton { @@ -108,7 +112,7 @@ void WaitBlockData::start() { td::actor::send_closure(SelfId, &WaitBlockData::failed_to_get_block_data_from_net, R.move_as_error_prefix("net error: ")); } else { - td::actor::send_closure(SelfId, &WaitBlockData::got_block_data_from_net, R.move_as_ok()); + td::actor::send_closure(SelfId, &WaitBlockData::got_data_from_net, R.move_as_ok()); } }); @@ -133,13 +137,49 @@ void WaitBlockData::failed_to_get_block_data_from_net(td::Status reason) { td::Timestamp::in(0.1)); } -void WaitBlockData::got_block_data_from_net(ReceivedBlock block) { +void WaitBlockData::got_data_from_net(ReceivedBlock block) { auto X = create_block(std::move(block)); if (X.is_error()) { failed_to_get_block_data_from_net(X.move_as_error_prefix("bad block from net: ")); return; } - data_ = X.move_as_ok(); + got_block_data_from_net(X.move_as_ok()); +} + +void WaitBlockData::got_block_data_from_net(td::Ref block) { + if (data_.not_null()) { + return; + } + data_ = std::move(block); + if (handle_->received()) { + finish_query(); + return; + } + if (!handle_->id().is_masterchain() && !handle_->inited_proof_link()) { + // This can happen if we get block from candidates cache. + // Proof link can be derived from the block (but not for masterchain block). + auto r_proof_link = generate_proof_link(handle_->id(), data_->root_cell()); + if (r_proof_link.is_error()) { + abort_query(r_proof_link.move_as_error_prefix("failed to create proof link for block: ")); + return; + } + td::actor::send_closure(manager_, &ValidatorManager::validate_block_proof_link, handle_->id(), + r_proof_link.move_as_ok(), + [id = handle_->id().id, SelfId = actor_id(this)](td::Result R) { + if (R.is_error()) { + td::actor::send_closure(SelfId, &WaitBlockData::abort_query, + R.move_as_error_prefix("validate proof link error: ")); + return; + } + LOG(DEBUG) << "Created and validated proof link for " << id.to_str(); + td::actor::send_closure(SelfId, &WaitBlockData::checked_proof_link); + }); + return; + } + checked_proof_link(); +} + +void WaitBlockData::checked_proof_link() { CHECK(handle_->id().is_masterchain() ? handle_->inited_proof() : handle_->inited_proof_link()); if (!handle_->received()) { auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result R) { @@ -198,6 +238,41 @@ void WaitBlockData::got_static_file(td::BufferSlice data) { run_hardfork_accept_block_query(handle_->id(), data_, manager_, std::move(P)); } +td::Result WaitBlockData::generate_proof_link(BlockIdExt id, td::Ref block_root) { + // Creating proof link. Similar to accept-block.cpp + if (id.is_masterchain()) { + return td::Status::Error("cannot create proof link for masterchain block"); + } + auto usage_tree = std::make_shared(); + auto usage_cell = vm::UsageCell::create(block_root, usage_tree->root_ptr()); + + block::gen::Block::Record blk; + block::gen::BlockInfo::Record info; + block::gen::BlockExtra::Record extra; + block::gen::ExtBlkRef::Record mcref{}; // _ ExtBlkRef = BlkMasterInfo; + ShardIdFull shard; + if (!(tlb::unpack_cell(usage_cell, blk) && tlb::unpack_cell(blk.info, info) && !info.version && + block::tlb::t_ShardIdent.unpack(info.shard.write(), shard) && + block::gen::BlkPrevInfo{info.after_merge}.validate_ref(info.prev_ref) && + tlb::unpack_cell(std::move(blk.extra), extra) && block::gen::t_ValueFlow.force_validate_ref(blk.value_flow) && + (!info.not_master || tlb::unpack_cell(info.master_ref, mcref)))) { + return td::Status::Error("cannot unpack block header"); + } + vm::CellSlice upd_cs{vm::NoVmSpec(), blk.state_update}; + + auto proof = vm::MerkleProof::generate(block_root, usage_tree.get()); + vm::CellBuilder cb; + td::Ref bs_cell; + if (!(cb.store_long_bool(0xc3, 8) // block_proof#c3 + && block::tlb::t_BlockIdExt.pack(cb, id) // proof_for:BlockIdExt + && cb.store_ref_bool(std::move(proof)) // proof:^Cell + && cb.store_bool_bool(false) // signatures:(Maybe ^BlockSignatures) + && cb.finalize_to(bs_cell))) { + return td::Status::Error("cannot serialize BlockProof"); + } + return std_boc_serialize(bs_cell, 0); +} + } // namespace validator } // namespace ton diff --git a/validator/downloaders/wait-block-data.hpp b/validator/downloaders/wait-block-data.hpp index 9a03b1cb..229b4bfc 100644 --- a/validator/downloaders/wait-block-data.hpp +++ b/validator/downloaders/wait-block-data.hpp @@ -57,11 +57,15 @@ class WaitBlockData : public td::actor::Actor { void set_is_hardfork(bool value); void start(); void got_block_data_from_db(td::Ref data); - void got_block_data_from_net(ReceivedBlock data); + void got_data_from_net(ReceivedBlock data); + void got_block_data_from_net(td::Ref block); + void checked_proof_link(); void failed_to_get_block_data_from_net(td::Status reason); void got_static_file(td::BufferSlice data); + static td::Result generate_proof_link(BlockIdExt id, td::Ref block_root); + private: BlockHandle handle_; diff --git a/validator/downloaders/wait-block-state.cpp b/validator/downloaders/wait-block-state.cpp index 6ed6e792..a8cd3499 100644 --- a/validator/downloaders/wait-block-state.cpp +++ b/validator/downloaders/wait-block-state.cpp @@ -206,7 +206,8 @@ void WaitBlockState::got_proof_link(td::BufferSlice data) { td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link); } else { LOG(INFO) << "received bad proof link: " << R.move_as_error(); - td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link); + delay_action([SelfId]() { td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link); }, + td::Timestamp::in(0.1)); } }); run_check_proof_link_query(handle_->id(), R.move_as_ok(), manager_, timeout_, std::move(P)); diff --git a/validator/fabric.h b/validator/fabric.h index 8e544f8a..e809be43 100644 --- a/validator/fabric.h +++ b/validator/fabric.h @@ -52,8 +52,8 @@ td::Result>> create_new_shard_bloc td::Ref create_signature_set(std::vector sig_set); -void run_check_external_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits, - td::actor::ActorId manager, td::Promise> promise); +void run_check_external_message(td::Ref message, td::actor::ActorId manager, + td::Promise> promise); void run_accept_block_query(BlockIdExt id, td::Ref data, std::vector prev, td::Ref validator_set, td::Ref signatures, diff --git a/validator/full-node-private-overlay-v2.cpp b/validator/full-node-private-overlay-v2.cpp index 36e4f57b..8dd7f6dd 100644 --- a/validator/full-node-private-overlay-v2.cpp +++ b/validator/full-node-private-overlay-v2.cpp @@ -16,6 +16,8 @@ */ #include "full-node-private-overlay-v2.hpp" + +#include "checksum.h" #include "ton/ton-tl.hpp" #include "common/delay.h" #include "td/utils/JsonBuilder.h" @@ -52,6 +54,40 @@ void FullNodePrivateOverlayV2::process_broadcast(PublicKeyHash src, ton_api::ton query.block_->cc_seqno_, std::move(query.block_->data_)); } +void FullNodePrivateOverlayV2::process_broadcast(PublicKeyHash src, + ton_api::tonNode_newBlockCandidateBroadcast &query) { + process_block_candidate_broadcast(src, query); +} + +void FullNodePrivateOverlayV2::process_broadcast(PublicKeyHash src, + ton_api::tonNode_newBlockCandidateBroadcastCompressed &query) { + process_block_candidate_broadcast(src, query); +} + +void FullNodePrivateOverlayV2::process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) { + BlockIdExt block_id; + CatchainSeqno cc_seqno; + td::uint32 validator_set_hash; + td::BufferSlice data; + auto S = deserialize_block_candidate_broadcast(query, block_id, cc_seqno, validator_set_hash, data, + overlay::Overlays::max_fec_broadcast_size()); + if (S.is_error()) { + LOG(DEBUG) << "dropped broadcast: " << S; + return; + } + if (data.size() > FullNode::max_block_size()) { + VLOG(FULL_NODE_WARNING) << "received block candidate with too big size from " << src; + return; + } + if (td::sha256_bits256(data.as_slice()) != block_id.file_hash) { + VLOG(FULL_NODE_WARNING) << "received block candidate with incorrect file hash from " << src; + return; + } + VLOG(FULL_NODE_DEBUG) << "Received newBlockCandidate in private overlay from " << src << ": " << block_id.to_str(); + td::actor::send_closure(full_node_, &FullNode::process_block_candidate_broadcast, block_id, cc_seqno, + validator_set_hash, std::move(data)); +} + void FullNodePrivateOverlayV2::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) { auto B = fetch_tl_object(std::move(broadcast), true); if (B.is_error()) { @@ -93,6 +129,22 @@ void FullNodePrivateOverlayV2::send_broadcast(BlockBroadcast broadcast) { local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok()); } +void FullNodePrivateOverlayV2::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::BufferSlice data) { + if (!inited_) { + return; + } + auto B = + serialize_block_candidate_broadcast(block_id, cc_seqno, validator_set_hash, data, true); // compression enabled + if (B.is_error()) { + VLOG(FULL_NODE_WARNING) << "failed to serialize block candidate broadcast: " << B.move_as_error(); + return; + } + VLOG(FULL_NODE_DEBUG) << "Sending newBlockCandidate in private overlay (with compression): " << block_id.to_str(); + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_, + local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok()); +} + void FullNodePrivateOverlayV2::start_up() { std::sort(nodes_.begin(), nodes_.end()); nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end()); diff --git a/validator/full-node-private-overlay-v2.hpp b/validator/full-node-private-overlay-v2.hpp index de55332f..9a6ec932 100644 --- a/validator/full-node-private-overlay-v2.hpp +++ b/validator/full-node-private-overlay-v2.hpp @@ -27,6 +27,11 @@ class FullNodePrivateOverlayV2 : public td::actor::Actor { void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast& query); void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast& query); + + void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query); + void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query); + void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query); + template void process_broadcast(PublicKeyHash, T&) { VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast"; @@ -35,6 +40,8 @@ class FullNodePrivateOverlayV2 : public td::actor::Actor { void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data); void send_broadcast(BlockBroadcast broadcast); + void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data); void start_up() override; void tear_down() override; diff --git a/validator/full-node-private-overlay.cpp b/validator/full-node-private-overlay.cpp index 74bb75c5..e5ea1f0b 100644 --- a/validator/full-node-private-overlay.cpp +++ b/validator/full-node-private-overlay.cpp @@ -17,6 +17,7 @@ #include "full-node-private-overlay.hpp" #include "ton/ton-tl.hpp" #include "common/delay.h" +#include "common/checksum.h" #include "full-node-serializer.hpp" namespace ton::validator::fullnode { @@ -49,6 +50,41 @@ void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api:: query.block_->cc_seqno_, std::move(query.block_->data_)); } +void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, + ton_api::tonNode_newBlockCandidateBroadcast &query) { + process_block_candidate_broadcast(src, query); +} + +void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, + ton_api::tonNode_newBlockCandidateBroadcastCompressed &query) { + process_block_candidate_broadcast(src, query); +} + +void FullNodePrivateBlockOverlay::process_block_candidate_broadcast(PublicKeyHash src, + ton_api::tonNode_Broadcast &query) { + BlockIdExt block_id; + CatchainSeqno cc_seqno; + td::uint32 validator_set_hash; + td::BufferSlice data; + auto S = deserialize_block_candidate_broadcast(query, block_id, cc_seqno, validator_set_hash, data, + overlay::Overlays::max_fec_broadcast_size()); + if (S.is_error()) { + LOG(DEBUG) << "dropped broadcast: " << S; + return; + } + if (data.size() > FullNode::max_block_size()) { + VLOG(FULL_NODE_WARNING) << "received block candidate with too big size from " << src; + return; + } + if (td::sha256_bits256(data.as_slice()) != block_id.file_hash) { + VLOG(FULL_NODE_WARNING) << "received block candidate with incorrect file hash from " << src; + return; + } + VLOG(FULL_NODE_DEBUG) << "Received newBlockCandidate in private overlay from " << src << ": " << block_id.to_str(); + td::actor::send_closure(full_node_, &FullNode::process_block_candidate_broadcast, block_id, cc_seqno, + validator_set_hash, std::move(data)); +} + void FullNodePrivateBlockOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) { if (adnl::AdnlNodeIdShort{src} == local_id_) { return; @@ -77,6 +113,22 @@ void FullNodePrivateBlockOverlay::send_shard_block_info(BlockIdExt block_id, Cat } } +void FullNodePrivateBlockOverlay::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::BufferSlice data) { + if (!inited_) { + return; + } + auto B = + serialize_block_candidate_broadcast(block_id, cc_seqno, validator_set_hash, data, true); // compression enabled + if (B.is_error()) { + VLOG(FULL_NODE_WARNING) << "failed to serialize block candidate broadcast: " << B.move_as_error(); + return; + } + VLOG(FULL_NODE_DEBUG) << "Sending newBlockCandidate in private overlay: " << block_id.to_str(); + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_, + local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok()); +} + void FullNodePrivateBlockOverlay::send_broadcast(BlockBroadcast broadcast) { if (!inited_) { return; @@ -199,6 +251,45 @@ void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNod std::move(query.message_->data_), it->second); } +void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query) { + process_block_candidate_broadcast(src, query); +} + +void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, + ton_api::tonNode_newBlockCandidateBroadcastCompressed &query) { + process_block_candidate_broadcast(src, query); +} + +void FullNodeCustomOverlay::process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) { + if (!block_senders_.count(adnl::AdnlNodeIdShort(src))) { + VLOG(FULL_NODE_DEBUG) << "Dropping block candidate broadcast in private overlay \"" << name_ + << "\" from unauthorized sender " << src; + return; + } + BlockIdExt block_id; + CatchainSeqno cc_seqno; + td::uint32 validator_set_hash; + td::BufferSlice data; + auto S = deserialize_block_candidate_broadcast(query, block_id, cc_seqno, validator_set_hash, data, + overlay::Overlays::max_fec_broadcast_size()); + if (S.is_error()) { + LOG(DEBUG) << "dropped broadcast: " << S; + return; + } + if (data.size() > FullNode::max_block_size()) { + VLOG(FULL_NODE_WARNING) << "received block candidate with too big size from " << src; + return; + } + if (td::sha256_bits256(data.as_slice()) != block_id.file_hash) { + VLOG(FULL_NODE_WARNING) << "received block candidate with incorrect file hash from " << src; + return; + } + VLOG(FULL_NODE_DEBUG) << "Received newBlockCandidate in custom overlay \"" << name_ << "\" from " << src << ": " + << block_id.to_str(); + td::actor::send_closure(full_node_, &FullNode::process_block_candidate_broadcast, block_id, cc_seqno, + validator_set_hash, std::move(data)); +} + void FullNodeCustomOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) { if (adnl::AdnlNodeIdShort{src} == local_id_) { return; @@ -241,6 +332,22 @@ void FullNodeCustomOverlay::send_broadcast(BlockBroadcast broadcast) { local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok()); } +void FullNodeCustomOverlay::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::BufferSlice data) { + if (!inited_) { + return; + } + auto B = + serialize_block_candidate_broadcast(block_id, cc_seqno, validator_set_hash, data, true); // compression enabled + if (B.is_error()) { + VLOG(FULL_NODE_WARNING) << "failed to serialize block candidate broadcast: " << B.move_as_error(); + return; + } + VLOG(FULL_NODE_DEBUG) << "Sending newBlockCandidate in custom overlay \"" << name_ << "\": " << block_id.to_str(); + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_, + local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok()); +} + void FullNodeCustomOverlay::start_up() { std::sort(nodes_.begin(), nodes_.end()); nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end()); diff --git a/validator/full-node-private-overlay.hpp b/validator/full-node-private-overlay.hpp index 196d6da6..0f76c5ae 100644 --- a/validator/full-node-private-overlay.hpp +++ b/validator/full-node-private-overlay.hpp @@ -27,6 +27,11 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor { void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query); void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query); + + void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query); + void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query); + void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query); + template void process_broadcast(PublicKeyHash, T &) { VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast"; @@ -34,6 +39,8 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor { void receive_broadcast(PublicKeyHash src, td::BufferSlice query); void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data); + void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data); void send_broadcast(BlockBroadcast broadcast); void set_config(FullNodeConfig config) { @@ -98,6 +105,11 @@ class FullNodeCustomOverlay : public td::actor::Actor { void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query); void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query); + + void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query); + void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query); + void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query); + template void process_broadcast(PublicKeyHash, T &) { VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast"; @@ -106,6 +118,8 @@ class FullNodeCustomOverlay : public td::actor::Actor { void send_external_message(td::BufferSlice data); void send_broadcast(BlockBroadcast broadcast); + void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data); void set_config(FullNodeConfig config) { config_ = std::move(config); diff --git a/validator/full-node-serializer.cpp b/validator/full-node-serializer.cpp index 42e68286..94dc2155 100644 --- a/validator/full-node-serializer.cpp +++ b/validator/full-node-serializer.cpp @@ -152,4 +152,63 @@ td::Status deserialize_block_full(ton_api::tonNode_DataFull& obj, BlockIdExt& id return S; } +td::Result serialize_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::Slice data, + bool compression_enabled) { + if (!compression_enabled) { + return create_serialize_tl_object( + create_tl_block_id(block_id), cc_seqno, validator_set_hash, + create_tl_object(Bits256::zero(), td::BufferSlice()), td::BufferSlice(data)); + } + TRY_RESULT(root, vm::std_boc_deserialize(data)); + TRY_RESULT(data_new, vm::std_boc_serialize(root, 2)); + td::BufferSlice compressed = td::lz4_compress(data_new); + VLOG(FULL_NODE_DEBUG) << "Compressing block candidate broadcast: " << data.size() << " -> " << compressed.size(); + return create_serialize_tl_object( + create_tl_block_id(block_id), cc_seqno, validator_set_hash, + create_tl_object(Bits256::zero(), td::BufferSlice()), 0, std::move(compressed)); +} + +static td::Status deserialize_block_candidate_broadcast(ton_api::tonNode_newBlockCandidateBroadcast& obj, + BlockIdExt& block_id, CatchainSeqno& cc_seqno, + td::uint32& validator_set_hash, td::BufferSlice& data) { + block_id = create_block_id(obj.id_); + cc_seqno = obj.catchain_seqno_; + validator_set_hash = obj.validator_set_hash_; + data = std::move(obj.data_); + return td::Status::OK(); +} + +static td::Status deserialize_block_candidate_broadcast(ton_api::tonNode_newBlockCandidateBroadcastCompressed& obj, + BlockIdExt& block_id, CatchainSeqno& cc_seqno, + td::uint32& validator_set_hash, td::BufferSlice& data, + int max_decompressed_data_size) { + block_id = create_block_id(obj.id_); + cc_seqno = obj.catchain_seqno_; + validator_set_hash = obj.validator_set_hash_; + TRY_RESULT(decompressed, td::lz4_decompress(obj.compressed_, max_decompressed_data_size)); + TRY_RESULT(root, vm::std_boc_deserialize(decompressed)); + TRY_RESULT_ASSIGN(data, vm::std_boc_serialize(root, 31)); + VLOG(FULL_NODE_DEBUG) << "Decompressing block candidate broadcast: " << obj.compressed_.size() << " -> " + << data.size(); + return td::Status::OK(); +} + +td::Status deserialize_block_candidate_broadcast(ton_api::tonNode_Broadcast& obj, BlockIdExt& block_id, + CatchainSeqno& cc_seqno, td::uint32& validator_set_hash, + td::BufferSlice& data, int max_decompressed_data_size) { + td::Status S; + ton_api::downcast_call(obj, td::overloaded( + [&](ton_api::tonNode_newBlockCandidateBroadcast& f) { + S = deserialize_block_candidate_broadcast(f, block_id, cc_seqno, validator_set_hash, + data); + }, + [&](ton_api::tonNode_newBlockCandidateBroadcastCompressed& f) { + S = deserialize_block_candidate_broadcast(f, block_id, cc_seqno, validator_set_hash, + data, max_decompressed_data_size); + }, + [&](auto&) { S = td::Status::Error("unknown data type"); })); + return S; +} + } // namespace ton::validator::fullnode diff --git a/validator/full-node-serializer.hpp b/validator/full-node-serializer.hpp index a5c73cbc..f6751689 100644 --- a/validator/full-node-serializer.hpp +++ b/validator/full-node-serializer.hpp @@ -28,4 +28,11 @@ td::Result serialize_block_full(const BlockIdExt& id, td::Slice td::Status deserialize_block_full(ton_api::tonNode_DataFull& obj, BlockIdExt& id, td::BufferSlice& proof, td::BufferSlice& data, bool& is_proof_link, int max_decompressed_data_size); +td::Result serialize_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::Slice data, + bool compression_enabled); +td::Status deserialize_block_candidate_broadcast(ton_api::tonNode_Broadcast& obj, BlockIdExt& block_id, + CatchainSeqno& cc_seqno, td::uint32& validator_set_hash, + td::BufferSlice& data, int max_decompressed_data_size); + } // namespace ton::validator::fullnode diff --git a/validator/full-node-shard.cpp b/validator/full-node-shard.cpp index 6c2a6a55..1284cf97 100644 --- a/validator/full-node-shard.cpp +++ b/validator/full-node-shard.cpp @@ -17,6 +17,7 @@ Copyright 2017-2020 Telegram Systems LLP */ #include "auto/tl/ton_api.h" +#include "checksum.h" #include "overlays.h" #include "td/utils/SharedSlice.h" #include "td/utils/overloaded.h" @@ -24,6 +25,7 @@ #include "full-node-shard-queries.hpp" #include "full-node-serializer.hpp" +#include "td/utils/buffer.h" #include "ton/ton-shard.h" #include "ton/ton-tl.hpp" @@ -739,6 +741,35 @@ void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_ne query.block_->cc_seqno_, std::move(query.block_->data_)); } +void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query) { + process_block_candidate_broadcast(src, query); +} + +void FullNodeShardImpl::process_broadcast(PublicKeyHash src, + ton_api::tonNode_newBlockCandidateBroadcastCompressed &query) { + process_block_candidate_broadcast(src, query); +} + +void FullNodeShardImpl::process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) { + BlockIdExt block_id; + CatchainSeqno cc_seqno; + td::uint32 validator_set_hash; + td::BufferSlice data; + auto S = deserialize_block_candidate_broadcast(query, block_id, cc_seqno, validator_set_hash, data, + overlay::Overlays::max_fec_broadcast_size()); + if (data.size() > FullNode::max_block_size()) { + VLOG(FULL_NODE_WARNING) << "received block candidate with too big size from " << src; + return; + } + if (td::sha256_bits256(data.as_slice()) != block_id.file_hash) { + VLOG(FULL_NODE_WARNING) << "received block candidate with incorrect file hash from " << src; + return; + } + VLOG(FULL_NODE_DEBUG) << "Received newBlockCandidate from " << src << ": " << block_id.to_str(); + td::actor::send_closure(full_node_, &FullNode::process_block_candidate_broadcast, block_id, cc_seqno, + validator_set_hash, std::move(data)); +} + void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) { process_block_broadcast(src, query); } @@ -839,6 +870,23 @@ void FullNodeShardImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno } } +void FullNodeShardImpl::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data) { + if (!client_.empty()) { + UNREACHABLE(); + return; + } + auto B = + serialize_block_candidate_broadcast(block_id, cc_seqno, validator_set_hash, data, true); // compression enabled + if (B.is_error()) { + VLOG(FULL_NODE_WARNING) << "failed to serialize block candidate broadcast: " << B.move_as_error(); + return; + } + VLOG(FULL_NODE_DEBUG) << "Sending newBlockCandidate: " << block_id.to_str(); + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, adnl_id_, overlay_id_, local_id_, + overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok()); +} + void FullNodeShardImpl::send_broadcast(BlockBroadcast broadcast) { if (!client_.empty()) { UNREACHABLE(); diff --git a/validator/full-node-shard.h b/validator/full-node-shard.h index e8fed39c..4856989a 100644 --- a/validator/full-node-shard.h +++ b/validator/full-node-shard.h @@ -48,6 +48,8 @@ class FullNodeShard : public td::actor::Actor { virtual void send_ihr_message(td::BufferSlice data) = 0; virtual void send_external_message(td::BufferSlice data) = 0; virtual void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) = 0; + virtual void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data) = 0; virtual void send_broadcast(BlockBroadcast broadcast) = 0; virtual void sign_overlay_certificate(PublicKeyHash signed_key, td::uint32 expiry_at, td::uint32 max_size, diff --git a/validator/full-node-shard.hpp b/validator/full-node-shard.hpp index da121d13..393ee269 100644 --- a/validator/full-node-shard.hpp +++ b/validator/full-node-shard.hpp @@ -18,6 +18,7 @@ */ #pragma once +#include "auto/tl/ton_api.h" #include "full-node-shard.h" #include "td/actor/PromiseFuture.h" #include "td/utils/port/Poll.h" @@ -167,6 +168,11 @@ class FullNodeShardImpl : public FullNodeShard { void process_broadcast(PublicKeyHash src, ton_api::tonNode_ihrMessageBroadcast &query); void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query); void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query); + + void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query); + void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query); + void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query); + void receive_broadcast(PublicKeyHash src, td::BufferSlice query); void check_broadcast(PublicKeyHash src, td::BufferSlice query, td::Promise promise); void get_stats_extra(td::Promise promise); @@ -175,6 +181,8 @@ class FullNodeShardImpl : public FullNodeShard { void send_ihr_message(td::BufferSlice data) override; void send_external_message(td::BufferSlice data) override; void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) override; + void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data) override; void send_broadcast(BlockBroadcast broadcast) override; void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, diff --git a/validator/full-node.cpp b/validator/full-node.cpp index 07cfa524..e5ffa9c9 100644 --- a/validator/full-node.cpp +++ b/validator/full-node.cpp @@ -352,6 +352,29 @@ void FullNodeImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_s td::actor::send_closure(shard, &FullNodeShard::send_shard_block_info, block_id, cc_seqno, std::move(data)); } +void FullNodeImpl::send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data) { + send_block_candidate_broadcast_to_custom_overlays(block_id, cc_seqno, validator_set_hash, data); + auto shard = get_shard(ShardIdFull{masterchainId, shardIdAll}); + if (shard.empty()) { + VLOG(FULL_NODE_WARNING) << "dropping OUT shard block info message to unknown shard"; + return; + } + /*if (!private_block_overlays_.empty()) { + td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_block_candidate, + block_id, cc_seqno, validator_set_hash, data.clone()); + }*/ + auto private_overlay = private_block_overlays_.choose_overlay(block_id.shard_full()); + if (!private_overlay.empty()) { + td::actor::send_closure(private_overlay, &FullNodePrivateOverlayV2::send_block_candidate, block_id, cc_seqno, + validator_set_hash, data.clone()); + } + if (broadcast_block_candidates_in_public_overlay_) { + td::actor::send_closure(shard, &FullNodeShard::send_block_candidate, block_id, cc_seqno, validator_set_hash, + std::move(data)); + } +} + void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) { send_block_broadcast_to_custom_overlays(broadcast); if (custom_overlays_only) { @@ -362,7 +385,7 @@ void FullNodeImpl::send_broadcast(BlockBroadcast broadcast, bool custom_overlays VLOG(FULL_NODE_WARNING) << "dropping OUT broadcast to unknown shard"; return; } - /*if (!private_block_overlays_.empty()) { + /*if (broadcast.block_id.is_masterchain() && !private_block_overlays_.empty()) { td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_broadcast, broadcast.clone()); }*/ @@ -575,6 +598,14 @@ void FullNodeImpl::process_block_broadcast(BlockBroadcast broadcast) { }); } +void FullNodeImpl::process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::BufferSlice data) { + send_block_candidate_broadcast_to_custom_overlays(block_id, cc_seqno, validator_set_hash, data); + // ignore cc_seqno and validator_hash for now + td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_block_candidate, block_id, + std::move(data)); +} + void FullNodeImpl::start_up() { add_shard_actor(ShardIdFull{masterchainId}, FullNodeShardMode::active); if (local_id_.is_zero()) { @@ -606,6 +637,11 @@ void FullNodeImpl::start_up() { void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) override { td::actor::send_closure(id_, &FullNodeImpl::send_shard_block_info, block_id, cc_seqno, std::move(data)); } + void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data) override { + td::actor::send_closure(id_, &FullNodeImpl::send_block_candidate, block_id, cc_seqno, validator_set_hash, + std::move(data)); + } void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only) override { td::actor::send_closure(id_, &FullNodeImpl::send_broadcast, std::move(broadcast), custom_overlays_only); } @@ -745,6 +781,29 @@ void FullNodeImpl::send_block_broadcast_to_custom_overlays(const BlockBroadcast& } } +void FullNodeImpl::send_block_candidate_broadcast_to_custom_overlays(const BlockIdExt &block_id, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, + const td::BufferSlice &data) { + // Same cache of sent broadcasts as in send_block_broadcast_to_custom_overlays + if (!custom_overlays_sent_broadcasts_.insert(block_id).second) { + return; + } + custom_overlays_sent_broadcasts_lru_.push(block_id); + if (custom_overlays_sent_broadcasts_lru_.size() > 256) { + custom_overlays_sent_broadcasts_.erase(custom_overlays_sent_broadcasts_lru_.front()); + custom_overlays_sent_broadcasts_lru_.pop(); + } + for (auto &private_overlay : custom_overlays_) { + for (auto &actor : private_overlay.second.actors_) { + auto local_id = actor.first; + if (private_overlay.second.params_.block_senders_.count(local_id)) { + td::actor::send_closure(actor.second, &FullNodeCustomOverlay::send_block_candidate, block_id, cc_seqno, + validator_set_hash, data.clone()); + } + } + } +} + FullNodeImpl::FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash, FullNodeConfig config, td::actor::ActorId keyring, td::actor::ActorId adnl, td::actor::ActorId rldp, diff --git a/validator/full-node.h b/validator/full-node.h index 1b6db25b..477d961e 100644 --- a/validator/full-node.h +++ b/validator/full-node.h @@ -88,6 +88,8 @@ class FullNode : public td::actor::Actor { virtual void del_custom_overlay(std::string name, td::Promise promise) = 0; virtual void process_block_broadcast(BlockBroadcast broadcast) = 0; + virtual void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::BufferSlice data) = 0; static constexpr td::uint32 max_block_size() { return 4 << 20; diff --git a/validator/full-node.hpp b/validator/full-node.hpp index 7fb31f67..1a172993 100644 --- a/validator/full-node.hpp +++ b/validator/full-node.hpp @@ -69,6 +69,8 @@ class FullNodeImpl : public FullNode { void send_ihr_message(AccountIdPrefixFull dst, td::BufferSlice data); void send_ext_message(AccountIdPrefixFull dst, td::BufferSlice data); void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqnp, td::BufferSlice data); + void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data); void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only); void download_block(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise promise); void download_zero_state(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, @@ -90,6 +92,8 @@ class FullNodeImpl : public FullNode { void new_key_block(BlockHandle handle); void process_block_broadcast(BlockBroadcast broadcast) override; + void process_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data) override; void start_up() override; @@ -147,6 +151,7 @@ class FullNodeImpl : public FullNode { void set_private_block_overlays_enable_compression(bool value); void create_private_block_overlay(PublicKeyHash key); */ + bool broadcast_block_candidates_in_public_overlay_ = false; struct CustomOverlayInfo { CustomOverlayParams params_; @@ -158,10 +163,11 @@ class FullNodeImpl : public FullNode { void update_private_overlays(); // void set_private_block_overlays_enable_compression(bool value); - + // void create_private_block_overlay(PublicKeyHash key); void update_custom_overlay(CustomOverlayInfo& overlay); void send_block_broadcast_to_custom_overlays(const BlockBroadcast& broadcast); - + void send_block_candidate_broadcast_to_custom_overlays(const BlockIdExt& block_id, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, const td::BufferSlice& data); FullNodePrivateBlockOverlays private_block_overlays_; }; diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index bfca165b..fc031351 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -5222,12 +5222,13 @@ bool Collator::create_block_candidate() { td::actor::send_closure_later(actor_id(this), &Collator::return_block_candidate, td::Unit()); } else { LOG(INFO) << "saving new BlockCandidate"; - td::actor::send_closure_later(manager, &ValidatorManager::set_block_candidate, block_candidate->id, - block_candidate->clone(), [self = get_self()](td::Result saved) -> void { - LOG(DEBUG) << "got answer to set_block_candidate"; - td::actor::send_closure_later(std::move(self), &Collator::return_block_candidate, - std::move(saved)); - }); + td::actor::send_closure_later( + manager, &ValidatorManager::set_block_candidate, block_candidate->id, block_candidate->clone(), + validator_set_->get_catchain_seqno(), validator_set_->get_validator_set_hash(), + [self = get_self()](td::Result saved) -> void { + LOG(DEBUG) << "got answer to set_block_candidate"; + td::actor::send_closure_later(std::move(self), &Collator::return_block_candidate, std::move(saved)); + }); } // 5. communicate about bad and delayed external messages if (!bad_ext_msgs_.empty() || !delay_ext_msgs_.empty()) { diff --git a/validator/impl/external-message.cpp b/validator/impl/external-message.cpp index 073e7360..2fdb491b 100644 --- a/validator/impl/external-message.cpp +++ b/validator/impl/external-message.cpp @@ -86,24 +86,18 @@ td::Result> ExtMessageQ::create_ext_message(td::BufferSlice dat return Ref{true, std::move(data), std::move(ext_msg), dest_prefix, wc, addr}; } -void ExtMessageQ::run_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits, - td::actor::ActorId manager, +void ExtMessageQ::run_message(td::Ref message, td::actor::ActorId manager, td::Promise> promise) { - auto R = create_ext_message(std::move(data), limits); - if (R.is_error()) { - return promise.set_error(R.move_as_error_prefix("failed to parse external message ")); - } - auto M = R.move_as_ok(); - auto root = M->root_cell(); + auto root = message->root_cell(); block::gen::CommonMsgInfo::Record_ext_in_msg_info info; tlb::unpack_cell_inexact(root, info); // checked in create message - ton::StdSmcAddress addr = M->addr(); - ton::WorkchainId wc = M->wc(); + ton::StdSmcAddress addr = message->addr(); + ton::WorkchainId wc = message->wc(); run_fetch_account_state( wc, addr, manager, - [promise = std::move(promise), msg_root = root, wc, addr, - M](td::Result, UnixTime, LogicalTime, std::unique_ptr>> + [promise = std::move(promise), msg_root = root, wc, addr, message]( + td::Result, UnixTime, LogicalTime, std::unique_ptr>> res) mutable { if (res.is_error()) { promise.set_error(td::Status::Error(PSLICE() << "Failed to get account state")); @@ -120,7 +114,7 @@ void ExtMessageQ::run_message(td::BufferSlice data, block::SizeLimitsConfig::Ext } else { auto status = run_message_on_account(wc, &acc, utime, lt + 1, msg_root, std::move(config)); if (status.is_ok()) { - promise.set_value(std::move(M)); + promise.set_value(std::move(message)); } else { promise.set_error(td::Status::Error(PSLICE() << "External message was not accepted\n" << status.message())); diff --git a/validator/impl/external-message.hpp b/validator/impl/external-message.hpp index d5084761..ad7ecc74 100644 --- a/validator/impl/external-message.hpp +++ b/validator/impl/external-message.hpp @@ -61,8 +61,7 @@ class ExtMessageQ : public ExtMessage { ton::StdSmcAddress addr); static td::Result> create_ext_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits); - static void run_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits, - td::actor::ActorId manager, + static void run_message(td::Ref message, td::actor::ActorId manager, td::Promise> promise); static td::Status run_message_on_account(ton::WorkchainId wc, block::Account* acc, diff --git a/validator/impl/fabric.cpp b/validator/impl/fabric.cpp index e2012497..a16460e6 100644 --- a/validator/impl/fabric.cpp +++ b/validator/impl/fabric.cpp @@ -119,10 +119,9 @@ td::Result> create_ext_message(td::BufferSlice data, return std::move(res); } -void run_check_external_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits, - td::actor::ActorId manager, +void run_check_external_message(Ref message, td::actor::ActorId manager, td::Promise> promise) { - ExtMessageQ::run_message(std::move(data), limits, std::move(manager), std::move(promise)); + ExtMessageQ::run_message(std::move(message), std::move(manager), std::move(promise)); } td::Result> create_ihr_message(td::BufferSlice data) { diff --git a/validator/impl/validate-query.cpp b/validator/impl/validate-query.cpp index 00f9abd1..c0c4a2e8 100644 --- a/validator/impl/validate-query.cpp +++ b/validator/impl/validate-query.cpp @@ -6456,7 +6456,8 @@ bool ValidateQuery::save_candidate() { } }); - td::actor::send_closure(manager, &ValidatorManager::set_block_candidate, id_, block_candidate.clone(), std::move(P)); + td::actor::send_closure(manager, &ValidatorManager::set_block_candidate, id_, block_candidate.clone(), + validator_set_->get_catchain_seqno(), validator_set_->get_validator_set_hash(), std::move(P)); return true; } diff --git a/validator/interfaces/validator-manager.h b/validator/interfaces/validator-manager.h index 144461c2..5e567eee 100644 --- a/validator/interfaces/validator-manager.h +++ b/validator/interfaces/validator-manager.h @@ -90,7 +90,8 @@ class ValidatorManager : public ValidatorManagerInterface { virtual void wait_block_signatures_short(BlockIdExt id, td::Timestamp timeout, td::Promise> promise) = 0; - virtual void set_block_candidate(BlockIdExt id, BlockCandidate candidate, td::Promise promise) = 0; + virtual void set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::Promise promise) = 0; virtual void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) = 0; diff --git a/validator/manager-disk.cpp b/validator/manager-disk.cpp index 76a1f122..97ff2e7d 100644 --- a/validator/manager-disk.cpp +++ b/validator/manager-disk.cpp @@ -775,7 +775,8 @@ void ValidatorManagerImpl::set_next_block(BlockIdExt block_id, BlockIdExt next, get_block_handle(block_id, true, std::move(P)); } -void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate candidate, td::Promise promise) { +void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::Promise promise) { td::actor::send_closure(db_, &Db::store_block_candidate, std::move(candidate), std::move(promise)); } diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index 98d12331..695ac566 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -130,6 +130,8 @@ class ValidatorManagerImpl : public ValidatorManager { } void new_ihr_message(td::BufferSlice data) override; void new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) override; + void new_block_candidate(BlockIdExt block_id, td::BufferSlice data) override { + } void add_ext_server_id(adnl::AdnlNodeIdShort id) override { UNREACHABLE(); @@ -181,7 +183,8 @@ class ValidatorManagerImpl : public ValidatorManager { void wait_block_signatures_short(BlockIdExt id, td::Timestamp timeout, td::Promise> promise) override; - void set_block_candidate(BlockIdExt id, BlockCandidate candidate, td::Promise promise) override; + void set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::Promise promise) override; void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; diff --git a/validator/manager-hardfork.hpp b/validator/manager-hardfork.hpp index 1a016faf..681c1f4f 100644 --- a/validator/manager-hardfork.hpp +++ b/validator/manager-hardfork.hpp @@ -152,6 +152,9 @@ class ValidatorManagerImpl : public ValidatorManager { void new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) override { UNREACHABLE(); } + void new_block_candidate(BlockIdExt block_id, td::BufferSlice data) override { + UNREACHABLE(); + } void add_ext_server_id(adnl::AdnlNodeIdShort id) override { UNREACHABLE(); @@ -219,7 +222,8 @@ class ValidatorManagerImpl : public ValidatorManager { void wait_block_signatures_short(BlockIdExt id, td::Timestamp timeout, td::Promise> promise) override; - void set_block_candidate(BlockIdExt id, BlockCandidate candidate, td::Promise promise) override { + void set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::Promise promise) override { promise.set_value(td::Unit()); } diff --git a/validator/manager.cpp b/validator/manager.cpp index 3130c106..663cc7a1 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -17,6 +17,8 @@ Copyright 2017-2020 Telegram Systems LLP */ #include "manager.hpp" +#include "checksum.h" +#include "td/utils/buffer.h" #include "validator-group.hpp" #include "downloaders/wait-block-state.hpp" #include "downloaders/wait-block-state-merge.hpp" @@ -412,14 +414,42 @@ void ValidatorManagerImpl::add_external_message(td::Ref msg, int pri ext_messages_hashes_[id.hash] = {priority, id}; } void ValidatorManagerImpl::check_external_message(td::BufferSlice data, td::Promise> promise) { - ++ls_stats_check_ext_messages_; auto state = do_get_last_liteserver_state(); if (state.is_null()) { promise.set_error(td::Status::Error(ErrorCode::notready, "not ready")); return; } - run_check_external_message(std::move(data), state->get_ext_msg_limits(), actor_id(this), - std::move(promise)); + auto R = create_ext_message(std::move(data), state->get_ext_msg_limits()); + if (R.is_error()) { + promise.set_error(R.move_as_error_prefix("failed to parse external message: ")); + return; + } + auto message = R.move_as_ok(); + WorkchainId wc = message->wc(); + StdSmcAddress addr = message->addr(); + if (checked_ext_msg_counter_.get_msg_count(wc, addr) >= max_ext_msg_per_addr()) { + promise.set_error( + td::Status::Error(PSTRING() << "too many external messages to address " << wc << ":" << addr.to_hex())); + return; + } + + promise = [self = this, wc, addr, promise = std::move(promise), + SelfId = actor_id(this)](td::Result> R) mutable { + if (R.is_error()) { + promise.set_error(R.move_as_error()); + return; + } + td::actor::send_lambda(SelfId, [=, promise = std::move(promise), message = R.move_as_ok()]() mutable { + if (self->checked_ext_msg_counter_.inc_msg_count(wc, addr) > max_ext_msg_per_addr()) { + promise.set_error( + td::Status::Error(PSTRING() << "too many external messages to address " << wc << ":" << addr.to_hex())); + return; + } + promise.set_result(std::move(message)); + }); + }; + ++ls_stats_check_ext_messages_; + run_check_external_message(std::move(message), actor_id(this), std::move(promise)); } void ValidatorManagerImpl::new_ihr_message(td::BufferSlice data) { @@ -466,6 +496,17 @@ void ValidatorManagerImpl::new_shard_block(BlockIdExt block_id, CatchainSeqno cc actor_id(this), td::Timestamp::in(2.0), std::move(P)); } +void ValidatorManagerImpl::new_block_candidate(BlockIdExt block_id, td::BufferSlice data) { + if (!last_masterchain_block_handle_) { + VLOG(VALIDATOR_DEBUG) << "dropping top shard block broadcast: not inited"; + return; + } + if (!started_) { + return; + } + add_cached_block_candidate(ReceivedBlock{block_id, std::move(data)}); +} + void ValidatorManagerImpl::add_shard_block_description(td::Ref desc) { if (desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) { auto it = shard_blocks_.find(ShardTopBlockDescriptionId{desc->shard(), desc->catchain_seqno()}); @@ -535,6 +576,36 @@ void ValidatorManagerImpl::loaded_msg_queue_to_masterchain(td::Refsecond.actor_, &WaitBlockData::got_block_data_from_net, r_block.move_as_ok()); + } + } + } + { + auto it = wait_state_.find(id); + if (it != wait_state_.end()) { + // Proof link is not ready at this point, but this will force WaitBlockState to redo send_get_proof_link_request + td::actor::send_closure(it->second.actor_, &WaitBlockState::after_get_proof_link); + } + } + } + if (cached_block_candidates_lru_.size() > max_cached_candidates()) { + CHECK(cached_block_candidates_.erase(cached_block_candidates_lru_.front())); + cached_block_candidates_lru_.pop_front(); + } +} + void ValidatorManagerImpl::add_ext_server_id(adnl::AdnlNodeIdShort id) { class Cb : public adnl::Adnl::Callback { private: @@ -1303,11 +1374,16 @@ void ValidatorManagerImpl::set_next_block(BlockIdExt block_id, BlockIdExt next, get_block_handle(block_id, true, std::move(P)); } -void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate candidate, td::Promise promise) { +void ValidatorManagerImpl::set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::Promise promise) { if (!candidates_buffer_.empty()) { td::actor::send_closure(candidates_buffer_, &CandidatesBuffer::add_new_candidate, id, PublicKey{pubkeys::Ed25519{candidate.pubkey.as_bits256()}}, candidate.collated_file_hash); } + if (!id.is_masterchain()) { + add_cached_block_candidate(ReceivedBlock{id, candidate.data.clone()}); + callback_->send_block_candidate(id, cc_seqno, validator_set_hash, candidate.data.clone()); + } td::actor::send_closure(db_, &Db::store_block_candidate, std::move(candidate), std::move(promise)); } @@ -1564,6 +1640,13 @@ void ValidatorManagerImpl::get_last_liteserver_state_block( void ValidatorManagerImpl::send_get_block_request(BlockIdExt id, td::uint32 priority, td::Promise promise) { + { + auto it = cached_block_candidates_.find(id); + if (it != cached_block_candidates_.end()) { + LOG(DEBUG) << "send_get_block_request: got result from candidates cache for " << id.to_str(); + return promise.set_value(it->second.clone()); + } + } callback_->download_block(id, priority, td::Timestamp::in(10.0), std::move(promise)); } @@ -1586,6 +1669,20 @@ void ValidatorManagerImpl::send_get_block_proof_request(BlockIdExt block_id, td: void ValidatorManagerImpl::send_get_block_proof_link_request(BlockIdExt block_id, td::uint32 priority, td::Promise promise) { + if (!block_id.is_masterchain()) { + auto it = cached_block_candidates_.find(block_id); + if (it != cached_block_candidates_.end()) { + // Proof link can be created from the cached block candidate + LOG(DEBUG) << "send_get_block_proof_link_request: creating proof link from cached caniddate for " + << block_id.to_str(); + TRY_RESULT_PROMISE_PREFIX(promise, block_root, vm::std_boc_deserialize(it->second.data), + "failed to create proof link: "); + TRY_RESULT_PROMISE_PREFIX(promise, proof_link, WaitBlockData::generate_proof_link(it->second.id, block_root), + "failed to create proof link: "); + promise.set_result(std::move(proof_link)); + return; + } + } callback_->download_block_proof_link(block_id, priority, td::Timestamp::in(10.0), std::move(promise)); } @@ -2702,6 +2799,16 @@ void ValidatorManagerImpl::alarm() { log_ls_stats_at_ = td::Timestamp::in(60.0); } alarm_timestamp().relax(log_ls_stats_at_); + if (cleanup_mempool_at_.is_in_past()) { + if (is_validator()) { + get_external_messages(ShardIdFull{masterchainId, shardIdAll}, + [](td::Result, int>>>) {}); + get_external_messages(ShardIdFull{basechainId, shardIdAll}, + [](td::Result, int>>>) {}); + } + cleanup_mempool_at_ = td::Timestamp::in(250.0); + } + alarm_timestamp().relax(cleanup_mempool_at_); } void ValidatorManagerImpl::update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise promise) { @@ -3360,6 +3467,29 @@ td::actor::ActorOwn ValidatorManagerFactory::create( rldp, overlays); } +size_t ValidatorManagerImpl::CheckedExtMsgCounter::get_msg_count(WorkchainId wc, StdSmcAddress addr) { + before_query(); + auto it1 = counter_cur_.find({wc, addr}); + auto it2 = counter_prev_.find({wc, addr}); + return (it1 == counter_cur_.end() ? 0 : it1->second) + (it2 == counter_prev_.end() ? 0 : it2->second); +} +size_t ValidatorManagerImpl::CheckedExtMsgCounter::inc_msg_count(WorkchainId wc, StdSmcAddress addr) { + before_query(); + auto it2 = counter_prev_.find({wc, addr}); + return (it2 == counter_prev_.end() ? 0 : it2->second) + ++counter_cur_[{wc, addr}]; +} +void ValidatorManagerImpl::CheckedExtMsgCounter::before_query() { + while (cleanup_at_.is_in_past()) { + counter_prev_ = std::move(counter_cur_); + counter_cur_.clear(); + if (counter_prev_.empty()) { + cleanup_at_ = td::Timestamp::in(max_ext_msg_per_addr_time_window() / 2.0); + break; + } + cleanup_at_ += max_ext_msg_per_addr_time_window() / 2.0; + } +} + } // namespace validator } // namespace ton diff --git a/validator/manager.hpp b/validator/manager.hpp index 90c8f210..8c434c09 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -18,10 +18,14 @@ */ #pragma once +#include "common/refcnt.hpp" #include "interfaces/validator-manager.h" #include "interfaces/db.h" #include "td/actor/PromiseFuture.h" +#include "td/utils/SharedSlice.h" +#include "td/utils/buffer.h" #include "td/utils/port/Poll.h" +#include "td/utils/port/StdStreams.h" #include "validator-group.hpp" #include "shard-client.hpp" #include "manager-init.h" @@ -238,6 +242,9 @@ class ValidatorManagerImpl : public ValidatorManager { std::map shard_blocks_; std::map> cached_msg_queue_to_masterchain_; + std::map cached_block_candidates_; + std::list cached_block_candidates_lru_; + struct ExtMessages { std::map, std::unique_ptr>> ext_messages_; std::map, std::map>> @@ -251,10 +258,20 @@ class ValidatorManagerImpl : public ValidatorManager { }; std::map ext_msgs_; // priority -> messages std::map>> ext_messages_hashes_; // hash -> priority + td::Timestamp cleanup_mempool_at_; // IHR ? std::map, std::unique_ptr>> ihr_messages_; std::map> ihr_messages_hashes_; + struct CheckedExtMsgCounter { + std::map, size_t> counter_cur_, counter_prev_; + td::Timestamp cleanup_at_ = td::Timestamp::now(); + + size_t get_msg_count(WorkchainId wc, StdSmcAddress addr); + size_t inc_msg_count(WorkchainId wc, StdSmcAddress addr); + void before_query(); + } checked_ext_msg_counter_; + private: // VALIDATOR GROUPS ValidatorSessionId get_validator_set_id(ShardIdFull shard, td::Ref val_set, td::Bits256 opts_hash, @@ -383,6 +400,7 @@ class ValidatorManagerImpl : public ValidatorManager { void new_ihr_message(td::BufferSlice data) override; void new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) override; + void new_block_candidate(BlockIdExt block_id, td::BufferSlice data) override; void add_ext_server_id(adnl::AdnlNodeIdShort id) override; void add_ext_server_port(td::uint16 port) override; @@ -429,7 +447,8 @@ class ValidatorManagerImpl : public ValidatorManager { void wait_block_signatures_short(BlockIdExt id, td::Timestamp timeout, td::Promise> promise) override; - void set_block_candidate(BlockIdExt id, BlockCandidate candidate, td::Promise promise) override; + void set_block_candidate(BlockIdExt id, BlockCandidate candidate, CatchainSeqno cc_seqno, + td::uint32 validator_set_hash, td::Promise promise) override; void wait_block_state_merge(BlockIdExt left_id, BlockIdExt right_id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; @@ -526,6 +545,7 @@ class ValidatorManagerImpl : public ValidatorManager { } void add_shard_block_description(td::Ref desc); + void add_cached_block_candidate(ReceivedBlock block); void preload_msg_queue_to_masterchain(td::Ref desc); void loaded_msg_queue_to_masterchain(td::Ref desc, td::Ref res); @@ -710,6 +730,16 @@ class ValidatorManagerImpl : public ValidatorManager { double max_mempool_num() const { return opts_->max_mempool_num(); } + size_t max_cached_candidates() const { + return 128; + } + static double max_ext_msg_per_addr_time_window() { + return 10.0; + } + static size_t max_ext_msg_per_addr() { + return 3 * 10; + } + void cleanup_last_validated_blocks(BlockId new_block); void got_persistent_state_descriptions(std::vector> descs); diff --git a/validator/validator.h b/validator/validator.h index efd7d1ef..7b155dc6 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -143,6 +143,8 @@ class ValidatorManagerInterface : public td::actor::Actor { virtual void send_ihr_message(AccountIdPrefixFull dst, td::BufferSlice data) = 0; virtual void send_ext_message(AccountIdPrefixFull dst, td::BufferSlice data) = 0; virtual void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) = 0; + virtual void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash, + td::BufferSlice data) = 0; virtual void send_broadcast(BlockBroadcast broadcast, bool custom_overlays_only = false) = 0; virtual void download_block(BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout, td::Promise promise) = 0; @@ -214,6 +216,7 @@ class ValidatorManagerInterface : public td::actor::Actor { virtual void check_external_message(td::BufferSlice data, td::Promise> promise) = 0; virtual void new_ihr_message(td::BufferSlice data) = 0; virtual void new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) = 0; + virtual void new_block_candidate(BlockIdExt block_id, td::BufferSlice data) = 0; virtual void add_ext_server_id(adnl::AdnlNodeIdShort id) = 0; virtual void add_ext_server_port(td::uint16 port) = 0;