diff --git a/crypto/block/block.tlb b/crypto/block/block.tlb index 8cb6c2f0..f1a1c948 100644 --- a/crypto/block/block.tlb +++ b/crypto/block/block.tlb @@ -735,7 +735,8 @@ misbehaviour_punishment_config_v1#01 _ MisbehaviourPunishmentConfig = ConfigParam 40; // collator_nodes: each collator is (workchain:int32 shard:uint64 adnl_id:uint256) -colator_config#a0 full_collated_data:Bool collator_nodes:(HashmapE 352 Unit) = CollatorConfig; +collator_info#0 full_node_id:(Maybe uint256) = CollatorInfo; +colator_config#a0 full_collated_data:Bool collator_nodes:(HashmapE 352 CollatorInfo) = CollatorConfig; _ CollatorConfig = ConfigParam 41; oracle_bridge_params#_ bridge_address:bits256 oracle_mutlisig_address:bits256 oracles:(HashmapE 256 uint256) external_chain_address:bits256 = OracleBridgeParams; diff --git a/crypto/block/mc-config.cpp b/crypto/block/mc-config.cpp index a1ddfb3e..ed79c8b6 100644 --- a/crypto/block/mc-config.cpp +++ b/crypto/block/mc-config.cpp @@ -2142,14 +2142,19 @@ CollatorConfig Config::get_collator_config(bool need_collator_nodes) const { collator_config.full_collated_data = rec.full_collated_data; if (need_collator_nodes) { vm::Dictionary dict{rec.collator_nodes->prefetch_ref(), 32 + 64 + 256}; - dict.check_for_each([&](Ref, td::ConstBitPtr key, int n) { + dict.check_for_each([&](Ref value, td::ConstBitPtr key, int n) { CHECK(n == 32 + 64 + 256); auto workchain = (td::int32)key.get_int(32); key.advance(32); td::uint64 shard = key.get_uint(64); key.advance(64); td::Bits256 adnl_id(key); - collator_config.collator_nodes.push_back({ton::ShardIdFull(workchain, shard), adnl_id}); + td::Bits256 full_node_id = td::Bits256::zero(); + gen::CollatorInfo::Record info; + if (tlb::csr_unpack(std::move(value), info) && info.full_node_id->size() == 257) { + full_node_id = td::Bits256(info.full_node_id->data_bits() + 1); + } + collator_config.collator_nodes.push_back({ton::ShardIdFull(workchain, shard), adnl_id, full_node_id}); return true; }); } diff --git a/crypto/block/mc-config.h b/crypto/block/mc-config.h index 7058117b..e00a045e 100644 --- a/crypto/block/mc-config.h +++ b/crypto/block/mc-config.h @@ -485,6 +485,7 @@ class ShardConfig { struct CollatorNodeDescr { ton::ShardIdFull shard; ton::NodeIdShort adnl_id; + ton::NodeIdShort full_node_id; }; struct CollatorConfig { diff --git a/overlay/overlay-broadcast.cpp b/overlay/overlay-broadcast.cpp index ecd21e7a..bcb7c285 100644 --- a/overlay/overlay-broadcast.cpp +++ b/overlay/overlay-broadcast.cpp @@ -69,6 +69,10 @@ td::Status BroadcastSimple::run_checks() { td::Status BroadcastSimple::distribute() { auto B = serialize(); auto nodes = overlay_->get_neighbours(5); + if (is_ours_) { + auto priority_nodes = overlay_->get_priority_broadcast_receivers(3); + nodes.insert(nodes.end(), priority_nodes.begin(), priority_nodes.end()); + } auto manager = overlay_->overlay_manager(); for (auto &n : nodes) { @@ -140,7 +144,7 @@ td::Status BroadcastSimple::create_new(td::actor::ActorId overlay, auto date = static_cast(td::Clocks::system()); auto B = std::make_unique(broadcast_hash, PublicKey{}, nullptr, flags, std::move(data), date, - td::BufferSlice{}, false, nullptr, adnl::AdnlNodeIdShort::zero()); + td::BufferSlice{}, false, nullptr, adnl::AdnlNodeIdShort::zero(), true); auto to_sign = B->to_sign(); auto P = td::PromiseCreator::lambda( diff --git a/overlay/overlay-broadcast.hpp b/overlay/overlay-broadcast.hpp index e7b39eec..906a94a0 100644 --- a/overlay/overlay-broadcast.hpp +++ b/overlay/overlay-broadcast.hpp @@ -46,6 +46,7 @@ class BroadcastSimple : public td::ListNode { td::uint32 date_; td::BufferSlice signature_; bool is_valid_{false}; + bool is_ours_{false}; OverlayImpl *overlay_; @@ -63,7 +64,7 @@ class BroadcastSimple : public td::ListNode { public: BroadcastSimple(Overlay::BroadcastHash broadcast_hash, PublicKey source, std::shared_ptr cert, td::uint32 flags, td::BufferSlice data, td::uint32 date, td::BufferSlice signature, bool is_valid, - OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id) + OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id, bool is_ours = false) : broadcast_hash_(broadcast_hash) , source_(std::move(source)) , cert_(std::move(cert)) @@ -73,7 +74,8 @@ class BroadcastSimple : public td::ListNode { , signature_(std::move(signature)) , is_valid_(is_valid) , overlay_(overlay) - , src_peer_id_(src_peer_id) { + , src_peer_id_(src_peer_id) + , is_ours_(is_ours) { } Overlay::BroadcastHash get_hash() const { diff --git a/overlay/overlay-fec-broadcast.cpp b/overlay/overlay-fec-broadcast.cpp index 7ff08309..4297ea0c 100644 --- a/overlay/overlay-fec-broadcast.cpp +++ b/overlay/overlay-fec-broadcast.cpp @@ -26,8 +26,8 @@ namespace overlay { td::Result> BroadcastFec::create(Overlay::BroadcastHash hash, PublicKey src, Overlay::BroadcastDataHash data_hash, td::uint32 flags, - td::uint32 date, fec::FecType fec_type) { - auto F = std::make_unique(hash, std::move(src), data_hash, flags, date, std::move(fec_type)); + td::uint32 date, fec::FecType fec_type, bool is_ours) { + auto F = std::make_unique(hash, std::move(src), data_hash, flags, date, std::move(fec_type), is_ours); TRY_STATUS(F->init_fec_type()); TRY_STATUS(F->run_checks()); return std::move(F); @@ -94,12 +94,12 @@ void BroadcastFec::broadcast_checked(td::Result R) { overlay_->deliver_broadcast(get_source().compute_short_id(), data_.clone()); auto manager = overlay_->overlay_manager(); while (!parts_.empty()) { - distribute_part(parts_.begin()->first); + distribute_part(parts_.begin()->first); } } // Do we need status here?? -td::Status BroadcastFec::distribute_part(td::uint32 seqno) { +td::Status BroadcastFec::distribute_part(td::uint32 seqno) { auto i = parts_.find(seqno); if (i == parts_.end()) { // should not get here @@ -111,8 +111,12 @@ td::Status BroadcastFec::distribute_part(td::uint32 seqno) { td::BufferSlice data = std::move(tls.second); auto nodes = overlay_->get_neighbours(5); - auto manager = overlay_->overlay_manager(); + if (is_ours_) { + auto priority_nodes = overlay_->get_priority_broadcast_receivers(3); + nodes.insert(nodes.end(), priority_nodes.begin(), priority_nodes.end()); + } + auto manager = overlay_->overlay_manager(); for (auto &n : nodes) { if (neighbour_completed(n)) { continue; @@ -140,7 +144,8 @@ td::Status OverlayFecBroadcastPart::apply() { if (is_short_) { return td::Status::Error(ErrorCode::protoviolation, "short broadcast part for incomplete broadcast"); } - TRY_RESULT(B, BroadcastFec::create(broadcast_hash_, source_, broadcast_data_hash_, flags_, date_, fec_type_)); + TRY_RESULT( + B, BroadcastFec::create(broadcast_hash_, source_, broadcast_data_hash_, flags_, date_, fec_type_, is_ours_)); bcast_ = B.get(); overlay_->register_fec_broadcast(std::move(B)); } @@ -304,7 +309,8 @@ td::Status OverlayFecBroadcastPart::create_new(OverlayImpl *overlay, td::actor:: auto B = std::make_unique( broadcast_hash, part_hash, PublicKey{}, overlay->get_certificate(local_id), data_hash, size, flags, - part_data_hash, std::move(part), seqno, std::move(fec_type), date, td::BufferSlice{}, false, nullptr, overlay, adnl::AdnlNodeIdShort::zero()); + part_data_hash, std::move(part), seqno, std::move(fec_type), date, td::BufferSlice{}, false, nullptr, overlay, + adnl::AdnlNodeIdShort::zero(), true); auto to_sign = B->to_sign(); auto P = td::PromiseCreator::lambda( diff --git a/overlay/overlay-fec-broadcast.hpp b/overlay/overlay-fec-broadcast.hpp index 612af22f..7f038b41 100644 --- a/overlay/overlay-fec-broadcast.hpp +++ b/overlay/overlay-fec-broadcast.hpp @@ -131,18 +131,19 @@ class BroadcastFec : public td::ListNode { td::Status run_checks(); BroadcastFec(Overlay::BroadcastHash hash, PublicKey src, Overlay::BroadcastDataHash data_hash, td::uint32 flags, - td::uint32 date, fec::FecType fec_type) + td::uint32 date, fec::FecType fec_type, bool is_ours = false) : hash_(hash) , data_hash_(data_hash) , flags_(flags) , date_(date) , src_(std::move(src)) - , fec_type_(std::move(fec_type)) { + , fec_type_(std::move(fec_type)) + , is_ours_(is_ours) { } static td::Result> create(Overlay::BroadcastHash hash, PublicKey src, Overlay::BroadcastDataHash data_hash, td::uint32 flags, - td::uint32 date, fec::FecType fec_type); + td::uint32 date, fec::FecType fec_type, bool is_ours); bool neighbour_received(adnl::AdnlNodeIdShort id) const { return received_neighbours_.find(id) != received_neighbours_.end(); @@ -225,6 +226,7 @@ class BroadcastFec : public td::ListNode { OverlayImpl *overlay_; adnl::AdnlNodeIdShort src_peer_id_ = adnl::AdnlNodeIdShort::zero(); td::BufferSlice data_; + bool is_ours_ = false; }; class OverlayFecBroadcastPart : public td::ListNode { @@ -246,6 +248,7 @@ class OverlayFecBroadcastPart : public td::ListNode { bool is_short_; bool untrusted_{false}; + bool is_ours_; BroadcastFec *bcast_; OverlayImpl *overlay_; @@ -265,7 +268,8 @@ class OverlayFecBroadcastPart : public td::ListNode { std::shared_ptr cert, Overlay::BroadcastDataHash data_hash, td::uint32 data_size, td::uint32 flags, Overlay::BroadcastDataHash part_data_hash, td::BufferSlice data, td::uint32 seqno, fec::FecType fec_type, td::uint32 date, td::BufferSlice signature, - bool is_short, BroadcastFec *bcast, OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id) + bool is_short, BroadcastFec *bcast, OverlayImpl *overlay, adnl::AdnlNodeIdShort src_peer_id, + bool is_ours = false) : broadcast_hash_(broadcast_hash) , part_hash_(part_hash) , source_(std::move(source)) @@ -282,7 +286,8 @@ class OverlayFecBroadcastPart : public td::ListNode { , is_short_(is_short) , bcast_(bcast) , overlay_(overlay) - , src_peer_id_(src_peer_id) { + , src_peer_id_(src_peer_id) + , is_ours_(is_ours) { } td::uint32 data_size() const { diff --git a/overlay/overlay-manager.cpp b/overlay/overlay-manager.cpp index 2134bbc0..96e9fc2a 100644 --- a/overlay/overlay-manager.cpp +++ b/overlay/overlay-manager.cpp @@ -359,6 +359,19 @@ void OverlayManager::get_stats(td::Promise nodes) { + auto it = overlays_.find(local_id); + if (it == overlays_.end()) { + return; + } + auto it2 = it->second.find(overlay); + if (it2 == it->second.end()) { + return; + } + td::actor::send_closure(it2->second, &Overlay::set_priority_broadcast_receivers, std::move(nodes)); +} + Certificate::Certificate(PublicKey issued_by, td::int32 expire_at, td::uint32 max_size, td::uint32 flags, td::BufferSlice signature) : issued_by_(issued_by) diff --git a/overlay/overlay-manager.h b/overlay/overlay-manager.h index ada37303..ed9da23b 100644 --- a/overlay/overlay-manager.h +++ b/overlay/overlay-manager.h @@ -96,6 +96,9 @@ class OverlayManager : public Overlays { td::actor::ActorOwn overlay); void get_stats(td::Promise> promise) override; + void set_priority_broadcast_receivers(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay, + std::vector nodes) override; + struct PrintId {}; PrintId print_id() const { diff --git a/overlay/overlay.h b/overlay/overlay.h index 146be738..fcf61ea8 100644 --- a/overlay/overlay.h +++ b/overlay/overlay.h @@ -69,6 +69,7 @@ class Overlay : public td::actor::Actor { virtual void update_peer_ip_str(adnl::AdnlNodeIdShort peer_id, td::string ip_str) = 0; //virtual void receive_broadcast(td::BufferSlice data) = 0; //virtual void subscribe(std::unique_ptr callback) = 0; + virtual void set_priority_broadcast_receivers(std::vector nodes) = 0; }; } // namespace overlay diff --git a/overlay/overlay.hpp b/overlay/overlay.hpp index 19786494..07003e09 100644 --- a/overlay/overlay.hpp +++ b/overlay/overlay.hpp @@ -192,7 +192,19 @@ class OverlayImpl : public Overlay { } else { std::vector vec; for (td::uint32 i = 0; i < max_size; i++) { - vec.push_back(neighbours_[td::Random::fast(0, static_cast(neighbours_.size()))]); + vec.push_back(neighbours_[td::Random::fast(0, static_cast(neighbours_.size()) - 1)]); + } + return vec; + } + } + std::vector get_priority_broadcast_receivers(td::uint32 max_size = 0) const { + if (max_size == 0 || max_size >= priority_broadcast_receivers_.size()) { + return priority_broadcast_receivers_; + } else { + std::vector vec; + for (td::uint32 i = 0; i < max_size; i++) { + vec.push_back(priority_broadcast_receivers_[td::Random::fast( + 0, static_cast(priority_broadcast_receivers_.size()) - 1)]); } return vec; } @@ -250,6 +262,10 @@ class OverlayImpl : public Overlay { } } + void set_priority_broadcast_receivers(std::vector nodes) override { + priority_broadcast_receivers_ = std::move(nodes); + } + private: template void process_query(adnl::AdnlNodeIdShort src, T &query, td::Promise promise) { @@ -309,6 +325,7 @@ class OverlayImpl : public Overlay { std::queue bcast_lru_; std::map> out_fec_bcasts_; + std::vector priority_broadcast_receivers_; void bcast_gc(); diff --git a/overlay/overlays.h b/overlay/overlays.h index 0a9c7765..a7b29faf 100644 --- a/overlay/overlays.h +++ b/overlay/overlays.h @@ -238,6 +238,9 @@ class Overlays : public td::actor::Actor { virtual void get_overlay_random_peers(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay, td::uint32 max_peers, td::Promise> promise) = 0; virtual void get_stats(td::Promise> promise) = 0; + + virtual void set_priority_broadcast_receivers(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay, + std::vector nodes) = 0; }; } // namespace overlay diff --git a/validator/full-node-shard.cpp b/validator/full-node-shard.cpp index 56e0c1db..e491061e 100644 --- a/validator/full-node-shard.cpp +++ b/validator/full-node-shard.cpp @@ -127,6 +127,10 @@ void FullNodeShardImpl::create_overlay() { if (cert_) { td::actor::send_closure(overlays_, &overlay::Overlays::update_certificate, adnl_id_, overlay_id_, local_id_, cert_); } + if (!collator_nodes_.empty()) { + td::actor::send_closure(overlays_, &overlay::Overlays::set_priority_broadcast_receivers, adnl_id_, overlay_id_, + collator_nodes_); + } } void FullNodeShardImpl::check_broadcast(PublicKeyHash src, td::BufferSlice broadcast, td::Promise promise) { @@ -1031,6 +1035,15 @@ void FullNodeShardImpl::update_validators(std::vector public_key_ } } +void FullNodeShardImpl::update_collators(std::vector nodes) { + if (!client_.empty()) { + return; + } + collator_nodes_ = std::move(nodes); + td::actor::send_closure(overlays_, &overlay::Overlays::set_priority_broadcast_receivers, adnl_id_, overlay_id_, + collator_nodes_); +} + void FullNodeShardImpl::reload_neighbours() { auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { if (R.is_error()) { diff --git a/validator/full-node-shard.h b/validator/full-node-shard.h index e24873aa..847091c6 100644 --- a/validator/full-node-shard.h +++ b/validator/full-node-shard.h @@ -75,6 +75,7 @@ class FullNodeShard : public td::actor::Actor { virtual void set_handle(BlockHandle handle, td::Promise promise) = 0; virtual void update_validators(std::vector public_key_hashes, PublicKeyHash local_hash) = 0; + virtual void update_collators(std::vector nodes) = 0; static td::actor::ActorOwn create( ShardIdFull shard, PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash, diff --git a/validator/full-node-shard.hpp b/validator/full-node-shard.hpp index cd7c20a9..1cb2957d 100644 --- a/validator/full-node-shard.hpp +++ b/validator/full-node-shard.hpp @@ -181,6 +181,7 @@ class FullNodeShardImpl : public FullNodeShard { void alarm() override; void update_validators(std::vector public_key_hashes, PublicKeyHash local_hash) override; + void update_collators(std::vector nodes) override; void sign_overlay_certificate(PublicKeyHash signed_key, td::uint32 expiry_at, td::uint32 max_size, td::Promise promise) override; void import_overlay_certificate(PublicKeyHash signed_key, std::shared_ptr cert, td::Promise promise) override; @@ -258,6 +259,7 @@ class FullNodeShardImpl : public FullNodeShard { adnl::AdnlNodeIdShort last_pinged_neighbour_ = adnl::AdnlNodeIdShort::zero(); FullNodeShardMode mode_; + std::vector collator_nodes_; }; } // namespace fullnode diff --git a/validator/full-node.cpp b/validator/full-node.cpp index 6792f969..52d3cfb9 100644 --- a/validator/full-node.cpp +++ b/validator/full-node.cpp @@ -205,6 +205,33 @@ void FullNodeImpl::update_shard_configuration(td::Ref state, s ++it; } } + + if (!collators_inited_ || state->is_key_state()) { + update_collators(state); + } +} + +void FullNodeImpl::update_collators(td::Ref state) { + collators_inited_ = true; + collator_config_ = state->get_collator_config(true); + for (auto& s : shards_) { + update_shard_collators(s.first, s.second); + } +} + +void FullNodeImpl::update_shard_collators(ShardIdFull shard, ShardInfo& info) { + if (info.actor.empty()) { + return; + } + std::vector nodes; + for (const block::CollatorNodeDescr& desc : collator_config_.collator_nodes) { + if (!desc.full_node_id.is_zero() && shard_intersects(shard, desc.shard)) { + nodes.emplace_back(desc.full_node_id); + } + } + std::sort(nodes.begin(), nodes.end()); + nodes.erase(std::unique(nodes.begin(), nodes.end()), nodes.end()); + td::actor::send_closure(info.actor, &FullNodeShard::update_collators, std::move(nodes)); } void FullNodeImpl::add_shard_actor(ShardIdFull shard, FullNodeShardMode mode) { @@ -219,6 +246,9 @@ void FullNodeImpl::add_shard_actor(ShardIdFull shard, FullNodeShardMode mode) { if (all_validators_.size() > 0) { td::actor::send_closure(info.actor, &FullNodeShard::update_validators, all_validators_, sign_cert_by_); } + if (collators_inited_) { + update_shard_collators(shard, info); + } } void FullNodeImpl::sync_completed() { diff --git a/validator/full-node.hpp b/validator/full-node.hpp index fbe60d1b..8cc8994b 100644 --- a/validator/full-node.hpp +++ b/validator/full-node.hpp @@ -92,7 +92,16 @@ class FullNodeImpl : public FullNode { td::Promise started_promise); private: + struct ShardInfo { + bool exists = false; + td::actor::ActorOwn actor; + FullNodeShardMode mode = FullNodeShardMode::inactive; + td::Timestamp delete_at = td::Timestamp::never(); + }; + void add_shard_actor(ShardIdFull shard, FullNodeShardMode mode); + void update_collators(td::Ref state); + void update_shard_collators(ShardIdFull shard, ShardInfo& info); PublicKeyHash local_id_; adnl::AdnlNodeIdShort adnl_id_; @@ -100,13 +109,6 @@ class FullNodeImpl : public FullNode { td::actor::ActorId get_shard(AccountIdPrefixFull dst); td::actor::ActorId get_shard(ShardIdFull shard, bool exact = false); - - struct ShardInfo { - bool exists = false; - td::actor::ActorOwn actor; - FullNodeShardMode mode = FullNodeShardMode::inactive; - td::Timestamp delete_at = td::Timestamp::never(); - }; std::map shards_; td::actor::ActorId keyring_; @@ -125,6 +127,8 @@ class FullNodeImpl : public FullNode { std::set local_keys_; td::Promise started_promise_; + bool collators_inited_ = false; + block::CollatorConfig collator_config_; }; } // namespace fullnode diff --git a/validator/impl/shard.cpp b/validator/impl/shard.cpp index 019ecbeb..a09bd1df 100644 --- a/validator/impl/shard.cpp +++ b/validator/impl/shard.cpp @@ -556,5 +556,9 @@ BlockIdExt MasterchainStateQ::prev_key_block_id(BlockSeqno seqno) const { return block_id; } +bool MasterchainStateQ::is_key_state() const { + return config_ ? config_->is_key_state() : false; +} + } // namespace validator } // namespace ton diff --git a/validator/impl/shard.hpp b/validator/impl/shard.hpp index 6b95c7e4..c726447f 100644 --- a/validator/impl/shard.hpp +++ b/validator/impl/shard.hpp @@ -129,6 +129,7 @@ class MasterchainStateQ : public MasterchainState, public ShardStateQ { BlockIdExt last_key_block_id() const override; BlockIdExt next_key_block_id(BlockSeqno seqno) const override; BlockIdExt prev_key_block_id(BlockSeqno seqno) const override; + bool is_key_state() const override; MasterchainStateQ* make_copy() const override; static td::Result> fetch(const BlockIdExt& _id, td::BufferSlice _data, diff --git a/validator/interfaces/shard.h b/validator/interfaces/shard.h index 15caa7b2..59211a85 100644 --- a/validator/interfaces/shard.h +++ b/validator/interfaces/shard.h @@ -77,6 +77,7 @@ class MasterchainState : virtual public ShardState { virtual BlockIdExt last_key_block_id() const = 0; virtual BlockIdExt next_key_block_id(BlockSeqno seqno) const = 0; virtual BlockIdExt prev_key_block_id(BlockSeqno seqno) const = 0; + virtual bool is_key_state() const = 0; virtual bool get_old_mc_block_id(ton::BlockSeqno seqno, ton::BlockIdExt& blkid, ton::LogicalTime* end_lt = nullptr) const = 0; virtual bool check_old_mc_block_id(const ton::BlockIdExt& blkid, bool strict = false) const = 0;