From 46ca0e6014bd879a5a92a0f3cf44ed5caad6651f Mon Sep 17 00:00:00 2001 From: Marat <98183742+dungeon-master-666@users.noreply.github.com> Date: Fri, 29 Mar 2024 20:01:53 +0100 Subject: [PATCH 1/2] Check pack_statistics for nullptr (#950) --- validator/db/archive-slice.cpp | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/validator/db/archive-slice.cpp b/validator/db/archive-slice.cpp index 43a02ec4..86c2ee93 100644 --- a/validator/db/archive-slice.cpp +++ b/validator/db/archive-slice.cpp @@ -670,7 +670,9 @@ void ArchiveSlice::do_close() { LOG(DEBUG) << "Closing archive slice " << db_path_; status_ = st_closed; kv_ = {}; - statistics_.pack_statistics->record_close(packages_.size()); + if (statistics_.pack_statistics) { + statistics_.pack_statistics->record_close(packages_.size()); + } packages_.clear(); } @@ -774,7 +776,9 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver LOG(FATAL) << "failed to open/create archive '" << path << "': " << R.move_as_error(); return; } - statistics_.pack_statistics->record_open(); + if (statistics_.pack_statistics) { + statistics_.pack_statistics->record_open(); + } auto idx = td::narrow_cast(packages_.size()); if (finalized_) { packages_.emplace_back(nullptr, td::actor::ActorOwn(), seqno, path, idx, version); @@ -817,7 +821,9 @@ void ArchiveSlice::destroy(td::Promise promise) { for (auto &p : packages_) { td::unlink(p.path).ensure(); } - statistics_.pack_statistics->record_close(packages_.size()); + if (statistics_.pack_statistics) { + statistics_.pack_statistics->record_close(packages_.size()); + } packages_.clear(); kv_ = nullptr; @@ -1023,7 +1029,9 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl for (auto idx = pack->idx + 1; idx < packages_.size(); idx++) { td::unlink(packages_[idx].path).ensure(); } - statistics_.pack_statistics->record_close(packages_.size() - pack->idx - 1); + if (statistics_.pack_statistics) { + statistics_.pack_statistics->record_close(packages_.size() - pack->idx - 1); + } packages_.erase(packages_.begin() + pack->idx + 1, packages_.end()); kv_->commit_transaction().ensure(); From 0434eadc1f1965ff39b76159798534a4207d3995 Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Mon, 1 Apr 2024 16:44:08 +0300 Subject: [PATCH 2/2] Add custom overlays for external messages (#949) * Private overlay for external messages * Improve ext msg overlays * Manage from validator console * Bypass out queue size limit for high-priority messages * Shuffle messages in get_external_messages * Cleanup mempool when creating validator group * Improve private overlays for externals 1. Allow using validator adnl ids in addition to fullnode ids 2. Set priority per sender, not per overlay 3. Require the same overlay name for all nodes 4. Enable lz4 in private block overlay * Fix typo, add debug logs * Enable lz4 in private block overlay by config Change proto_version for lz4 in catchain overlays to 4 * Add logs for broadcasts in fullnode --------- Co-authored-by: SpyCheese --- catchain/catchain-receiver.cpp | 3 +- create-hardfork/create-hardfork.cpp | 2 +- overlay/overlay-manager.cpp | 5 +- overlay/overlay-manager.h | 2 +- overlay/overlay.cpp | 9 +- overlay/overlay.h | 3 +- overlay/overlays.h | 2 +- test/test-ton-collator.cpp | 2 +- tl/generate/scheme/ton_api.tl | 9 + tl/generate/scheme/ton_api.tlo | Bin 86740 -> 88032 bytes .../validator-engine-console-query.cpp | 72 +++++++ .../validator-engine-console-query.h | 64 ++++++ .../validator-engine-console.cpp | 3 + validator-engine/validator-engine.cpp | 153 +++++++++++++- validator-engine/validator-engine.hpp | 17 ++ validator-session/validator-session.cpp | 2 +- validator/full-node-private-overlay.cpp | 171 ++++++++++++--- validator/full-node-private-overlay.hpp | 98 +++++++-- validator/full-node-shard.cpp | 14 +- validator/full-node.cpp | 194 +++++++++++------- validator/full-node.h | 5 + validator/full-node.hpp | 22 +- validator/impl/collator-impl.h | 12 +- validator/impl/collator.cpp | 39 ++-- validator/interfaces/validator-manager.h | 3 +- validator/manager-disk.cpp | 12 +- validator/manager-disk.hpp | 7 +- validator/manager-hardfork.cpp | 12 +- validator/manager-hardfork.hpp | 7 +- validator/manager.cpp | 101 +++++---- validator/manager.hpp | 23 ++- validator/validator.h | 2 +- 32 files changed, 843 insertions(+), 227 deletions(-) diff --git a/catchain/catchain-receiver.cpp b/catchain/catchain-receiver.cpp index c8206de9..82779e3b 100644 --- a/catchain/catchain-receiver.cpp +++ b/catchain/catchain-receiver.cpp @@ -520,7 +520,8 @@ void CatChainReceiverImpl::start_up() { } td::actor::send_closure(overlay_manager_, &overlay::Overlays::create_private_overlay, get_source(local_idx_)->get_adnl_id(), overlay_full_id_.clone(), std::move(ids), - make_callback(), overlay::OverlayPrivacyRules{0, 0, std::move(root_keys)}); + make_callback(), overlay::OverlayPrivacyRules{0, 0, std::move(root_keys)}, + R"({ "type": "catchain" })"); CHECK(root_block_); diff --git a/create-hardfork/create-hardfork.cpp b/create-hardfork/create-hardfork.cpp index 5f0b93be..c5f1add8 100644 --- a/create-hardfork/create-hardfork.cpp +++ b/create-hardfork/create-hardfork.cpp @@ -213,7 +213,7 @@ class HardforkCreator : public td::actor::Actor { ton::validator::ValidatorManagerHardforkFactory::create(opts, shard_, shard_top_block_id_, db_root_); for (auto &msg : ext_msgs_) { td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManager::new_external_message, - std::move(msg)); + std::move(msg), 0); } for (auto &topmsg : top_shard_descrs_) { td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManager::new_shard_block, ton::BlockIdExt{}, diff --git a/overlay/overlay-manager.cpp b/overlay/overlay-manager.cpp index 3c5f5eab..b9eb95b9 100644 --- a/overlay/overlay-manager.cpp +++ b/overlay/overlay-manager.cpp @@ -108,11 +108,12 @@ void OverlayManager::create_public_overlay_ex(adnl::AdnlNodeIdShort local_id, Ov void OverlayManager::create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::vector nodes, - std::unique_ptr callback, OverlayPrivacyRules rules) { + std::unique_ptr callback, OverlayPrivacyRules rules, + std::string scope) { auto id = overlay_id.compute_short_id(); register_overlay(local_id, id, Overlay::create(keyring_, adnl_, actor_id(this), dht_node_, local_id, std::move(overlay_id), - std::move(nodes), std::move(callback), std::move(rules))); + std::move(nodes), std::move(callback), std::move(rules), std::move(scope))); } void OverlayManager::receive_message(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data) { diff --git a/overlay/overlay-manager.h b/overlay/overlay-manager.h index 035ef3e8..1b9c75a4 100644 --- a/overlay/overlay-manager.h +++ b/overlay/overlay-manager.h @@ -57,7 +57,7 @@ class OverlayManager : public Overlays { OverlayOptions opts) override; void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::vector nodes, std::unique_ptr callback, - OverlayPrivacyRules rules) override; + OverlayPrivacyRules rules, std::string scope) override; void delete_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id) override; void send_query(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdShort src, OverlayIdShort overlay_id, std::string name, td::Promise promise, td::Timestamp timeout, td::BufferSlice query) override { diff --git a/overlay/overlay.cpp b/overlay/overlay.cpp index af01e045..f964ccc5 100644 --- a/overlay/overlay.cpp +++ b/overlay/overlay.cpp @@ -49,10 +49,11 @@ td::actor::ActorOwn Overlay::create(td::actor::ActorId manager, td::actor::ActorId dht_node, adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::vector nodes, - std::unique_ptr callback, OverlayPrivacyRules rules) { - auto R = - td::actor::create_actor("overlay", keyring, adnl, manager, dht_node, local_id, std::move(overlay_id), - false, std::move(nodes), std::move(callback), std::move(rules)); + std::unique_ptr callback, OverlayPrivacyRules rules, + std::string scope) { + auto R = td::actor::create_actor("overlay", keyring, adnl, manager, dht_node, local_id, + std::move(overlay_id), false, std::move(nodes), std::move(callback), + std::move(rules), std::move(scope)); return td::actor::ActorOwn(std::move(R)); } diff --git a/overlay/overlay.h b/overlay/overlay.h index da41a247..c1fe9643 100644 --- a/overlay/overlay.h +++ b/overlay/overlay.h @@ -48,7 +48,8 @@ class Overlay : public td::actor::Actor { td::actor::ActorId manager, td::actor::ActorId dht_node, adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::vector nodes, - std::unique_ptr callback, OverlayPrivacyRules rules); + std::unique_ptr callback, OverlayPrivacyRules rules, + std::string scope); virtual void update_dht_node(td::actor::ActorId dht) = 0; diff --git a/overlay/overlays.h b/overlay/overlays.h index 79551e05..6bf5852f 100644 --- a/overlay/overlays.h +++ b/overlay/overlays.h @@ -205,7 +205,7 @@ class Overlays : public td::actor::Actor { td::string scope, OverlayOptions opts) = 0; virtual void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::vector nodes, std::unique_ptr callback, - OverlayPrivacyRules rules) = 0; + OverlayPrivacyRules rules, std::string scope) = 0; virtual void delete_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id) = 0; virtual void send_query(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdShort src, OverlayIdShort overlay_id, diff --git a/test/test-ton-collator.cpp b/test/test-ton-collator.cpp index b76dd5eb..286a5fea 100644 --- a/test/test-ton-collator.cpp +++ b/test/test-ton-collator.cpp @@ -300,7 +300,7 @@ class TestNode : public td::actor::Actor { shard_top_block_id_, db_root_); for (auto &msg : ext_msgs_) { td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManager::new_external_message, - std::move(msg)); + std::move(msg), 0); } for (auto &topmsg : top_shard_descrs_) { td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManager::new_shard_block, ton::BlockIdExt{}, diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index 159a0c88..90e6292f 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -400,6 +400,7 @@ tonNode.newShardBlockBroadcast block:tonNode.newShardBlock = tonNode.Broadcast; tonNode.shardPublicOverlayId workchain:int shard:long zero_state_file_hash:int256 = tonNode.ShardPublicOverlayId; tonNode.privateBlockOverlayId zero_state_file_hash:int256 nodes:(vector int256) = tonNode.PrivateBlockOverlayId; +tonNode.privateExtMsgsOverlayId zero_state_file_hash:int256 name:string nodes:(vector int256) = tonNode.PrivateExtMsgsOverlayId; tonNode.keyBlocks blocks:(vector tonNode.blockIdExt) incomplete:Bool error:Bool = tonNode.KeyBlocks; @@ -594,6 +595,10 @@ engine.validator.config out_port:int addrs:(vector engine.Addr) adnl:(vector eng liteservers:(vector engine.liteServer) control:(vector engine.controlInterface) gc:engine.gc = engine.validator.Config; +engine.validator.privateExtMsgOverlayNode adnl_id:int256 sender:Bool sender_priority:int = engine.validator.PrivateExtMsgOverlayNode; +engine.validator.privateExtMsgOverlay name:string nodes:(vector engine.validator.privateExtMsgOverlayNode) = engine.validator.PrivateExtMsgOverlay; +engine.validator.privateExtMsgOverlaysConfig overlays:(vector engine.validator.privateExtMsgOverlay) = engine.validator.PrivateExtMsgOverlaysConfig; + ---functions--- ---types--- @@ -697,6 +702,10 @@ engine.validator.getPerfTimerStats name:string = engine.validator.PerfTimerStats engine.validator.getShardOutQueueSize flags:# block_id:tonNode.blockId dest_wc:flags.0?int dest_shard:flags.0?long = engine.validator.ShardOutQueueSize; engine.validator.setExtMessagesBroadcastDisabled disabled:Bool = engine.validator.Success; +engine.validator.addPrivateExtMsgOverlay overlay:engine.validator.privateExtMsgOverlay = engine.validator.Success; +engine.validator.delPrivateExtMsgOverlay name:string = engine.validator.Success; +engine.validator.showPrivateExtMsgOverlays = engine.validator.PrivateExtMsgOverlaysConfig; + ---types--- storage.pong = storage.Pong; diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo index 18df7794cc6bf48a3e48ce6020c1a8051d2bf747..78d9ba23cba275c31a82238cf58940f03d179496 100644 GIT binary patch delta 804 zcmcbzl=Z=SR=!8G^{p77fSGY4-z0IZ&#%*-s;1_pXXd5ql_ln6rX-f+7wH8QWtJtD zq`Fp=_!g)8m!%fvBvwvbC#6@?`*M*cQL6p&Q&NFC4ELz5o25gPj$-HhytK^p$%-5@ zn-_}j7F9{GU2s~yBtH*i6sn7gp-%Elnf!oRVsngc2Mc4%=BI|)*#_ACQ-H-k3=AN( zlkd#YogDj7ITqwKMrha+IkV|-*tQ)6DS(C7RXMI0?B2&_ILOTm3?TJ4j|urOPgb0z z&dyexnwOGVG}-=&6hDL+57e4plvz?a`GB>I2S_!tZPq #include @@ -1107,3 +1109,73 @@ td::Status SetExtMessagesBroadcastDisabledQuery::receive(td::BufferSlice data) { td::TerminalIO::out() << "success\n"; return td::Status::OK(); } + +td::Status AddPrivateExtMsgOverlayQuery::run() { + TRY_RESULT_ASSIGN(file_name_, tokenizer_.get_token()); + TRY_STATUS(tokenizer_.check_endl()); + return td::Status::OK(); +} + +td::Status AddPrivateExtMsgOverlayQuery::send() { + TRY_RESULT(data, td::read_file(file_name_)); + TRY_RESULT(json, td::json_decode(data.as_slice())); + auto overlay = ton::create_tl_object(); + TRY_STATUS(ton::ton_api::from_json(*overlay, json.get_object())); + auto b = ton::create_serialize_tl_object(std::move(overlay)); + td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise()); + return td::Status::OK(); +} + +td::Status AddPrivateExtMsgOverlayQuery::receive(td::BufferSlice data) { + TRY_RESULT_PREFIX(f, ton::fetch_tl_object(data.as_slice(), true), + "received incorrect answer: "); + td::TerminalIO::out() << "success\n"; + return td::Status::OK(); +} + +td::Status DelPrivateExtMsgOverlayQuery::run() { + TRY_RESULT_ASSIGN(name_, tokenizer_.get_token()); + TRY_STATUS(tokenizer_.check_endl()); + return td::Status::OK(); +} + +td::Status DelPrivateExtMsgOverlayQuery::send() { + auto b = ton::create_serialize_tl_object(name_); + td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise()); + return td::Status::OK(); +} + +td::Status DelPrivateExtMsgOverlayQuery::receive(td::BufferSlice data) { + TRY_RESULT_PREFIX(f, ton::fetch_tl_object(data.as_slice(), true), + "received incorrect answer: "); + td::TerminalIO::out() << "success\n"; + return td::Status::OK(); +} + +td::Status ShowPrivateExtMsgOverlaysQuery::run() { + TRY_STATUS(tokenizer_.check_endl()); + return td::Status::OK(); +} + +td::Status ShowPrivateExtMsgOverlaysQuery::send() { + auto b = ton::create_serialize_tl_object(); + td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise()); + return td::Status::OK(); +} + +td::Status ShowPrivateExtMsgOverlaysQuery::receive(td::BufferSlice data) { + TRY_RESULT_PREFIX( + f, ton::fetch_tl_object(data.as_slice(), true), + "received incorrect answer: "); + td::TerminalIO::out() << f->overlays_.size() << " private overlays:\n\n"; + for (const auto &overlay : f->overlays_) { + td::TerminalIO::out() << "Overlay \"" << overlay->name_ << "\": " << overlay->nodes_.size() << " nodes\n"; + for (const auto &node : overlay->nodes_) { + td::TerminalIO::out() << " " << node->adnl_id_ + << (node->sender_ ? (PSTRING() << " (sender, p=" << node->sender_priority_ << ")") : "") + << "\n"; + } + td::TerminalIO::out() << "\n"; + } + return td::Status::OK(); +} diff --git a/validator-engine-console/validator-engine-console-query.h b/validator-engine-console/validator-engine-console-query.h index b1bdac7c..b3bb13f9 100644 --- a/validator-engine-console/validator-engine-console-query.h +++ b/validator-engine-console/validator-engine-console-query.h @@ -1144,3 +1144,67 @@ class SetExtMessagesBroadcastDisabledQuery : public Query { private: bool value; }; + +class AddPrivateExtMsgOverlayQuery : public Query { + public: + AddPrivateExtMsgOverlayQuery(td::actor::ActorId console, Tokenizer tokenizer) + : Query(console, std::move(tokenizer)) { + } + td::Status run() override; + td::Status send() override; + td::Status receive(td::BufferSlice data) override; + static std::string get_name() { + return "addprivateextmsgoverlay"; + } + static std::string get_help() { + return "addprivateextmsgoverlay \tadd private overlay for external messages with config from file " + ""; + } + std::string name() const override { + return get_name(); + } + + private: + std::string file_name_; +}; + +class DelPrivateExtMsgOverlayQuery : public Query { + public: + DelPrivateExtMsgOverlayQuery(td::actor::ActorId console, Tokenizer tokenizer) + : Query(console, std::move(tokenizer)) { + } + td::Status run() override; + td::Status send() override; + td::Status receive(td::BufferSlice data) override; + static std::string get_name() { + return "delprivateextmsgoverlay"; + } + static std::string get_help() { + return "delprivateextmsgoverlay \tdelete private overlay for external messages with name "; + } + std::string name() const override { + return get_name(); + } + + private: + std::string name_; +}; + +class ShowPrivateExtMsgOverlaysQuery : public Query { + public: + ShowPrivateExtMsgOverlaysQuery(td::actor::ActorId console, Tokenizer tokenizer) + : Query(console, std::move(tokenizer)) { + } + td::Status run() override; + td::Status send() override; + td::Status receive(td::BufferSlice data) override; + static std::string get_name() { + return "showprivateextmsgoverlays"; + } + static std::string get_help() { + return "showprivateextmsgoverlays\tshow all private overlay for external messages"; + } + std::string name() const override { + return get_name(); + } +}; diff --git a/validator-engine-console/validator-engine-console.cpp b/validator-engine-console/validator-engine-console.cpp index 01acced9..cf09ef69 100644 --- a/validator-engine-console/validator-engine-console.cpp +++ b/validator-engine-console/validator-engine-console.cpp @@ -143,6 +143,9 @@ void ValidatorEngineConsole::run() { add_query_runner(std::make_unique>()); add_query_runner(std::make_unique>()); add_query_runner(std::make_unique>()); + add_query_runner(std::make_unique>()); + add_query_runner(std::make_unique>()); + add_query_runner(std::make_unique>()); } bool ValidatorEngineConsole::envelope_send_query(td::BufferSlice query, td::Promise promise) { diff --git a/validator-engine/validator-engine.cpp b/validator-engine/validator-engine.cpp index 3c9f44be..648dad26 100644 --- a/validator-engine/validator-engine.cpp +++ b/validator-engine/validator-engine.cpp @@ -1843,6 +1843,7 @@ void ValidatorEngine::start_full_node() { config_.full_node_config, keyring_.get(), adnl_.get(), rldp_.get(), rldp2_.get(), default_dht_node_.is_zero() ? td::actor::ActorId{} : dht_nodes_[default_dht_node_].get(), overlay_manager_.get(), validator_manager_.get(), full_node_client_.get(), db_root_); + load_private_ext_msg_overlays_config(); } for (auto &v : config_.validators) { @@ -2335,6 +2336,66 @@ void ValidatorEngine::try_del_proxy(td::uint32 ip, td::int32 port, std::vector(); + auto data_R = td::read_file(private_ext_msg_overlays_config_file()); + if (data_R.is_error()) { + return; + } + auto data = data_R.move_as_ok(); + auto json_R = td::json_decode(data.as_slice()); + if (json_R.is_error()) { + LOG(ERROR) << "Failed to parse private ext msg overlays config: " << json_R.move_as_error(); + return; + } + auto json = json_R.move_as_ok(); + auto S = ton::ton_api::from_json(*private_ext_msg_overlays_config_, json.get_object()); + if (S.is_error()) { + LOG(ERROR) << "Failed to parse private ext msg overlays config: " << S; + return; + } + + for (auto &overlay : private_ext_msg_overlays_config_->overlays_) { + std::vector nodes; + std::map senders; + for (const auto &node : overlay->nodes_) { + nodes.emplace_back(node->adnl_id_); + if (node->sender_) { + senders[ton::adnl::AdnlNodeIdShort{node->adnl_id_}] = node->sender_priority_; + } + } + td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::add_ext_msg_overlay, std::move(nodes), + std::move(senders), overlay->name_, [](td::Result R) { R.ensure(); }); + } +} + +td::Status ValidatorEngine::write_private_ext_msg_overlays_config() { + auto s = td::json_encode(td::ToJson(*private_ext_msg_overlays_config_), true); + TRY_STATUS_PREFIX(td::write_file(private_ext_msg_overlays_config_file(), s), "failed to write config: "); + return td::Status::OK(); +} + +void ValidatorEngine::add_private_ext_msg_overlay_to_config( + ton::tl_object_ptr overlay, td::Promise promise) { + private_ext_msg_overlays_config_->overlays_.push_back(std::move(overlay)); + TRY_STATUS_PROMISE(promise, write_private_ext_msg_overlays_config()); + promise.set_result(td::Unit()); +} + +void ValidatorEngine::del_private_ext_msg_overlay_from_config(std::string name, td::Promise promise) { + auto &overlays = private_ext_msg_overlays_config_->overlays_; + for (size_t i = 0; i < overlays.size(); ++i) { + if (overlays[i]->name_ == name) { + overlays.erase(overlays.begin() + i); + TRY_STATUS_PROMISE(promise, write_private_ext_msg_overlays_config()); + promise.set_result(td::Unit()); + return; + } + } + promise.set_error(td::Status::Error(PSTRING() << "no overlay \"" << name << "\" in config")); +} + void ValidatorEngine::check_key(ton::PublicKeyHash id, td::Promise promise) { if (keys_.count(id) == 1) { promise.set_value(td::Unit()); @@ -3462,7 +3523,8 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getShardO }); } -void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_setExtMessagesBroadcastDisabled &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, +void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_setExtMessagesBroadcastDisabled &query, + td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, td::Promise promise) { if (!(perm & ValidatorEnginePermissions::vep_modify)) { promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized"))); @@ -3488,6 +3550,95 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_setExtMes }); } +void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_addPrivateExtMsgOverlay &query, + td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, + td::Promise promise) { + if (!(perm & ValidatorEnginePermissions::vep_modify)) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized"))); + return; + } + if (!started_ || full_node_.empty()) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "not started"))); + return; + } + + auto &overlay = query.overlay_; + std::vector nodes; + std::map senders; + for (const auto &node : overlay->nodes_) { + nodes.emplace_back(node->adnl_id_); + if (node->sender_) { + senders[ton::adnl::AdnlNodeIdShort{node->adnl_id_}] = node->sender_priority_; + } + } + std::string name = overlay->name_; + td::actor::send_closure( + full_node_, &ton::validator::fullnode::FullNode::add_ext_msg_overlay, std::move(nodes), std::move(senders), + std::move(name), + [SelfId = actor_id(this), overlay = std::move(overlay), + promise = std::move(promise)](td::Result R) mutable { + if (R.is_error()) { + promise.set_value(create_control_query_error(R.move_as_error())); + return; + } + td::actor::send_closure( + SelfId, &ValidatorEngine::add_private_ext_msg_overlay_to_config, std::move(overlay), + [promise = std::move(promise)](td::Result R) mutable { + if (R.is_error()) { + promise.set_value(create_control_query_error(R.move_as_error())); + return; + } + promise.set_value(ton::create_serialize_tl_object()); + }); + }); +} + +void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_delPrivateExtMsgOverlay &query, + td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, + td::Promise promise) { + if (!(perm & ValidatorEnginePermissions::vep_modify)) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized"))); + return; + } + if (!started_ || full_node_.empty()) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "not started"))); + return; + } + td::actor::send_closure( + full_node_, &ton::validator::fullnode::FullNode::del_ext_msg_overlay, query.name_, + [SelfId = actor_id(this), name = query.name_, promise = std::move(promise)](td::Result R) mutable { + if (R.is_error()) { + promise.set_value(create_control_query_error(R.move_as_error())); + return; + } + td::actor::send_closure( + SelfId, &ValidatorEngine::del_private_ext_msg_overlay_from_config, std::move(name), + [promise = std::move(promise)](td::Result R) mutable { + if (R.is_error()) { + promise.set_value(create_control_query_error(R.move_as_error())); + return; + } + promise.set_value(ton::create_serialize_tl_object()); + }); + }); +} + +void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_showPrivateExtMsgOverlays &query, + td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, + td::Promise promise) { + if (!(perm & ValidatorEnginePermissions::vep_default)) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized"))); + return; + } + if (!started_ || full_node_.empty()) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "not started"))); + return; + } + + promise.set_value(ton::serialize_tl_object( + private_ext_msg_overlays_config_, true)); +} + void ValidatorEngine::process_control_query(td::uint16 port, ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data, td::Promise promise) { diff --git a/validator-engine/validator-engine.hpp b/validator-engine/validator-engine.hpp index 84a5e67f..027c764f 100644 --- a/validator-engine/validator-engine.hpp +++ b/validator-engine/validator-engine.hpp @@ -167,6 +167,7 @@ class ValidatorEngine : public td::actor::Actor { std::shared_ptr dht_config_; td::Ref validator_options_; Config config_; + ton::tl_object_ptr private_ext_msg_overlays_config_; std::set running_gc_; @@ -362,6 +363,16 @@ class ValidatorEngine : public td::actor::Actor { void try_del_proxy(td::uint32 ip, td::int32 port, std::vector cats, std::vector prio_cats, td::Promise promise); + std::string private_ext_msg_overlays_config_file() const { + return db_root_ + "/private-ext-msg-overlays.json"; + } + + void load_private_ext_msg_overlays_config(); + td::Status write_private_ext_msg_overlays_config(); + void add_private_ext_msg_overlay_to_config( + ton::tl_object_ptr overlay, td::Promise promise); + void del_private_ext_msg_overlay_from_config(std::string name, td::Promise promise); + void check_key(ton::PublicKeyHash id, td::Promise promise); static td::BufferSlice create_control_query_error(td::Status error); @@ -440,6 +451,12 @@ class ValidatorEngine : public td::actor::Actor { ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); void run_control_query(ton::ton_api::engine_validator_setExtMessagesBroadcastDisabled &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); + void run_control_query(ton::ton_api::engine_validator_addPrivateExtMsgOverlay &query, td::BufferSlice data, + ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); + void run_control_query(ton::ton_api::engine_validator_delPrivateExtMsgOverlay &query, td::BufferSlice data, + ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); + void run_control_query(ton::ton_api::engine_validator_showPrivateExtMsgOverlays &query, td::BufferSlice data, + ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); template void run_control_query(T &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, td::Promise promise) { diff --git a/validator-session/validator-session.cpp b/validator-session/validator-session.cpp index 51650524..6b328b2b 100644 --- a/validator-session/validator-session.cpp +++ b/validator-session/validator-session.cpp @@ -902,7 +902,7 @@ ValidatorSessionImpl::ValidatorSessionImpl(catchain::CatChainSessionId session_i , rldp_(rldp) , overlay_manager_(overlays) , allow_unsafe_self_blocks_resync_(allow_unsafe_self_blocks_resync) { - compress_block_candidates_ = opts.proto_version >= 3; + compress_block_candidates_ = opts.proto_version >= 4; description_ = ValidatorSessionDescription::create(std::move(opts), nodes, local_id); src_round_candidate_.resize(description_->get_total_nodes()); } diff --git a/validator/full-node-private-overlay.cpp b/validator/full-node-private-overlay.cpp index 559dba17..9fd02ab4 100644 --- a/validator/full-node-private-overlay.cpp +++ b/validator/full-node-private-overlay.cpp @@ -19,26 +19,25 @@ #include "common/delay.h" #include "full-node-serializer.hpp" -namespace ton { +namespace ton::validator::fullnode { -namespace validator { - -namespace fullnode { - -void FullNodePrivateOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) { +void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) { process_block_broadcast(src, query); } -void FullNodePrivateOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query) { +void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, + ton_api::tonNode_blockBroadcastCompressed &query) { process_block_broadcast(src, query); } -void FullNodePrivateOverlay::process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) { +void FullNodePrivateBlockOverlay::process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) { auto B = deserialize_block_broadcast(query, overlay::Overlays::max_fec_broadcast_size()); if (B.is_error()) { LOG(DEBUG) << "dropped broadcast: " << B.move_as_error(); return; } + VLOG(FULL_NODE_DEBUG) << "Received block broadcast in private overlay from " << src << ": " + << B.ok().block_id.to_str(); auto P = td::PromiseCreator::lambda([](td::Result R) { if (R.is_error()) { if (R.error().code() == ErrorCode::notready) { @@ -52,25 +51,28 @@ void FullNodePrivateOverlay::process_block_broadcast(PublicKeyHash src, ton_api: std::move(P)); } -void FullNodePrivateOverlay::process_broadcast(PublicKeyHash, ton_api::tonNode_newShardBlockBroadcast &query) { - td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, - create_block_id(query.block_->block_), query.block_->cc_seqno_, - std::move(query.block_->data_)); +void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) { + BlockIdExt block_id = create_block_id(query.block_->block_); + VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast in private overlay from " << src << ": " + << block_id.to_str(); + td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, block_id, + query.block_->cc_seqno_, std::move(query.block_->data_)); } -void FullNodePrivateOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) { +void FullNodePrivateBlockOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) { auto B = fetch_tl_object(std::move(broadcast), true); if (B.is_error()) { return; } - ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); }); } -void FullNodePrivateOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) { +void FullNodePrivateBlockOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, + td::BufferSlice data) { if (!inited_) { return; } + VLOG(FULL_NODE_DEBUG) << "Sending newShardBlockBroadcast in private overlay: " << block_id.to_str(); auto B = create_serialize_tl_object( create_tl_object(create_tl_block_id(block_id), cc_seqno, std::move(data))); if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) { @@ -82,11 +84,13 @@ void FullNodePrivateOverlay::send_shard_block_info(BlockIdExt block_id, Catchain } } -void FullNodePrivateOverlay::send_broadcast(BlockBroadcast broadcast) { +void FullNodePrivateBlockOverlay::send_broadcast(BlockBroadcast broadcast) { if (!inited_) { return; } - auto B = serialize_block_broadcast(broadcast, false); // compression_enabled = false + VLOG(FULL_NODE_DEBUG) << "Sending block broadcast in private overlay" + << (enable_compression_ ? " (with compression)" : "") << ": " << broadcast.block_id.to_str(); + auto B = serialize_block_broadcast(broadcast, enable_compression_); if (B.is_error()) { VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error(); return; @@ -95,7 +99,7 @@ void FullNodePrivateOverlay::send_broadcast(BlockBroadcast broadcast) { local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok()); } -void FullNodePrivateOverlay::start_up() { +void FullNodePrivateBlockOverlay::start_up() { std::sort(nodes_.begin(), nodes_.end()); nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end()); @@ -112,22 +116,22 @@ void FullNodePrivateOverlay::start_up() { try_init(); } -void FullNodePrivateOverlay::try_init() { +void FullNodePrivateBlockOverlay::try_init() { // Sometimes adnl id is added to validator engine later (or not at all) td::actor::send_closure( adnl_, &adnl::Adnl::check_id_exists, local_id_, [SelfId = actor_id(this)](td::Result R) { if (R.is_ok() && R.ok()) { - td::actor::send_closure(SelfId, &FullNodePrivateOverlay::init); + td::actor::send_closure(SelfId, &FullNodePrivateBlockOverlay::init); } else { - delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateOverlay::try_init); }, + delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateBlockOverlay::try_init); }, td::Timestamp::in(30.0)); } }); } -void FullNodePrivateOverlay::init() { - LOG(FULL_NODE_INFO) << "Creating private block overlay for adnl id " << local_id_ << " : " << nodes_.size() - << " nodes"; +void FullNodePrivateBlockOverlay::init() { + LOG(FULL_NODE_WARNING) << "Creating private block overlay for adnl id " << local_id_ << " : " << nodes_.size() + << " nodes, overlay_id=" << overlay_id_; class Callback : public overlay::Overlays::Callback { public: void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override { @@ -136,37 +140,140 @@ void FullNodePrivateOverlay::init() { td::Promise promise) override { } void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override { - td::actor::send_closure(node_, &FullNodePrivateOverlay::receive_broadcast, src, std::move(data)); + td::actor::send_closure(node_, &FullNodePrivateBlockOverlay::receive_broadcast, src, std::move(data)); } void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data, td::Promise promise) override { } - Callback(td::actor::ActorId node) : node_(node) { + Callback(td::actor::ActorId node) : node_(node) { } private: - td::actor::ActorId node_; + td::actor::ActorId node_; }; overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(), overlay::CertificateFlags::AllowFec | overlay::CertificateFlags::Trusted, {}}; td::actor::send_closure(overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(), - nodes_, std::make_unique(actor_id(this)), rules); + nodes_, std::make_unique(actor_id(this)), rules, R"({ "type": "private-blocks" })"); td::actor::send_closure(rldp_, &rldp::Rldp::add_id, local_id_); td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, local_id_); inited_ = true; } -void FullNodePrivateOverlay::tear_down() { +void FullNodePrivateBlockOverlay::tear_down() { if (inited_) { td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, local_id_, overlay_id_); } } -} // namespace fullnode +void FullNodePrivateExtMsgOverlay::process_broadcast(PublicKeyHash src, + ton_api::tonNode_externalMessageBroadcast &query) { + auto it = senders_.find(adnl::AdnlNodeIdShort{src}); + if (it == senders_.end()) { + return; + } + LOG(FULL_NODE_DEBUG) << "Got external message in private overlay \"" << name_ << "\" from " << src + << " (priority=" << it->second << ")"; + td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_external_message, + std::move(query.message_->data_), it->second); +} -} // namespace validator +void FullNodePrivateExtMsgOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) { + auto B = fetch_tl_object(std::move(broadcast), true); + if (B.is_error()) { + return; + } + ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); }); +} -} // namespace ton +void FullNodePrivateExtMsgOverlay::send_external_message(td::BufferSlice data) { + if (config_.ext_messages_broadcast_disabled_) { + return; + } + LOG(FULL_NODE_DEBUG) << "Sending external message to private overlay \"" << name_ << "\""; + auto B = create_serialize_tl_object( + create_tl_object(std::move(data))); + if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) { + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_, + local_id_.pubkey_hash(), 0, std::move(B)); + } else { + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_, + local_id_.pubkey_hash(), 0, std::move(B)); + } +} + +void FullNodePrivateExtMsgOverlay::start_up() { + std::sort(nodes_.begin(), nodes_.end()); + nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end()); + std::vector nodes; + for (const adnl::AdnlNodeIdShort &id : nodes_) { + nodes.push_back(id.bits256_value()); + } + auto X = + create_hash_tl_object(zero_state_file_hash_, name_, std::move(nodes)); + td::BufferSlice b{32}; + b.as_slice().copy_from(as_slice(X)); + overlay_id_full_ = overlay::OverlayIdFull{std::move(b)}; + overlay_id_ = overlay_id_full_.compute_short_id(); + try_init(); +} + +void FullNodePrivateExtMsgOverlay::try_init() { + // Sometimes adnl id is added to validator engine later (or not at all) + td::actor::send_closure( + adnl_, &adnl::Adnl::check_id_exists, local_id_, [SelfId = actor_id(this)](td::Result R) { + if (R.is_ok() && R.ok()) { + td::actor::send_closure(SelfId, &FullNodePrivateExtMsgOverlay::init); + } else { + delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateExtMsgOverlay::try_init); }, + td::Timestamp::in(30.0)); + } + }); +} + +void FullNodePrivateExtMsgOverlay::init() { + LOG(FULL_NODE_WARNING) << "Creating private ext msg overlay \"" << name_ << "\" for adnl id " << local_id_ << " : " + << nodes_.size() << " nodes, overlay_id=" << overlay_id_; + class Callback : public overlay::Overlays::Callback { + public: + void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override { + } + void receive_query(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data, + td::Promise promise) override { + } + void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override { + td::actor::send_closure(node_, &FullNodePrivateExtMsgOverlay::receive_broadcast, src, std::move(data)); + } + void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data, + td::Promise promise) override { + } + Callback(td::actor::ActorId node) : node_(node) { + } + + private: + td::actor::ActorId node_; + }; + + std::map authorized_keys; + for (const auto &sender : senders_) { + authorized_keys[sender.first.pubkey_hash()] = overlay::Overlays::max_fec_broadcast_size(); + } + overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(), 0, std::move(authorized_keys)}; + td::actor::send_closure( + overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(), nodes_, + std::make_unique(actor_id(this)), rules, + PSTRING() << R"({ "type": "private-ext-msg", "name": ")" << td::format::Escaped{name_} << R"(" })"); + + td::actor::send_closure(rldp_, &rldp::Rldp::add_id, local_id_); + td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, local_id_); +} + +void FullNodePrivateExtMsgOverlay::tear_down() { + LOG(FULL_NODE_WARNING) << "Destroying private ext msg overlay \"" << name_ << "\" for adnl id " << local_id_; + td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, local_id_, overlay_id_); +} + +} // namespace ton::validator::fullnode diff --git a/validator/full-node-private-overlay.hpp b/validator/full-node-private-overlay.hpp index e6497a87..5fbb8825 100644 --- a/validator/full-node-private-overlay.hpp +++ b/validator/full-node-private-overlay.hpp @@ -18,13 +18,9 @@ #include "full-node.h" -namespace ton { +namespace ton::validator::fullnode { -namespace validator { - -namespace fullnode { - -class FullNodePrivateOverlay : public td::actor::Actor { +class FullNodePrivateBlockOverlay : public td::actor::Actor { public: void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query); void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query); @@ -40,19 +36,89 @@ class FullNodePrivateOverlay : public td::actor::Actor { void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data); void send_broadcast(BlockBroadcast broadcast); + void set_config(FullNodeConfig config) { + config_ = std::move(config); + } + + void set_enable_compression(bool value) { + enable_compression_ = value; + } + void start_up() override; void tear_down() override; - FullNodePrivateOverlay(adnl::AdnlNodeIdShort local_id, std::vector nodes, - FileHash zero_state_file_hash, FullNodeConfig config, - td::actor::ActorId keyring, td::actor::ActorId adnl, - td::actor::ActorId rldp, td::actor::ActorId rldp2, - td::actor::ActorId overlays, - td::actor::ActorId validator_manager) + FullNodePrivateBlockOverlay(adnl::AdnlNodeIdShort local_id, std::vector nodes, + FileHash zero_state_file_hash, FullNodeConfig config, bool enable_compression, + td::actor::ActorId keyring, td::actor::ActorId adnl, + td::actor::ActorId rldp, td::actor::ActorId rldp2, + td::actor::ActorId overlays, + td::actor::ActorId validator_manager) : local_id_(local_id) , nodes_(std::move(nodes)) , zero_state_file_hash_(zero_state_file_hash) , config_(config) + , enable_compression_(enable_compression) + , keyring_(keyring) + , adnl_(adnl) + , rldp_(rldp) + , rldp2_(rldp2) + , overlays_(overlays) + , validator_manager_(validator_manager) { + } + + private: + adnl::AdnlNodeIdShort local_id_; + std::vector nodes_; + FileHash zero_state_file_hash_; + FullNodeConfig config_; + bool enable_compression_; + + td::actor::ActorId keyring_; + td::actor::ActorId adnl_; + td::actor::ActorId rldp_; + td::actor::ActorId rldp2_; + td::actor::ActorId overlays_; + td::actor::ActorId validator_manager_; + + bool inited_ = false; + overlay::OverlayIdFull overlay_id_full_; + overlay::OverlayIdShort overlay_id_; + + void try_init(); + void init(); +}; + +class FullNodePrivateExtMsgOverlay : public td::actor::Actor { + public: + void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query); + template + void process_broadcast(PublicKeyHash, T &) { + VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast"; + } + void receive_broadcast(PublicKeyHash src, td::BufferSlice query); + + void send_external_message(td::BufferSlice data); + + void set_config(FullNodeConfig config) { + config_ = std::move(config); + } + + void start_up() override; + void tear_down() override; + + FullNodePrivateExtMsgOverlay(adnl::AdnlNodeIdShort local_id, std::vector nodes, + std::map senders, std::string name, + FileHash zero_state_file_hash, FullNodeConfig config, + td::actor::ActorId keyring, td::actor::ActorId adnl, + td::actor::ActorId rldp, td::actor::ActorId rldp2, + td::actor::ActorId overlays, + td::actor::ActorId validator_manager) + : local_id_(local_id) + , nodes_(std::move(nodes)) + , senders_(std::move(senders)) + , name_(std::move(name)) + , zero_state_file_hash_(zero_state_file_hash) + , config_(config) , keyring_(keyring) , adnl_(adnl) , rldp_(rldp) @@ -64,6 +130,8 @@ class FullNodePrivateOverlay : public td::actor::Actor { private: adnl::AdnlNodeIdShort local_id_; std::vector nodes_; + std::map senders_; + std::string name_; FileHash zero_state_file_hash_; FullNodeConfig config_; @@ -82,8 +150,4 @@ class FullNodePrivateOverlay : public td::actor::Actor { void init(); }; -} // namespace fullnode - -} // namespace validator - -} // namespace ton +} // namespace ton::validator::fullnode diff --git a/validator/full-node-shard.cpp b/validator/full-node-shard.cpp index 9f798fa8..433d9469 100644 --- a/validator/full-node-shard.cpp +++ b/validator/full-node-shard.cpp @@ -118,7 +118,7 @@ void FullNodeShardImpl::check_broadcast(PublicKeyHash src, td::BufferSlice broad promise.set_error(td::Status::Error("rebroadcasting external messages is disabled")); promise = [manager = validator_manager_, message = q->message_->data_.clone()](td::Result R) mutable { if (R.is_ok()) { - td::actor::send_closure(manager, &ValidatorManagerInterface::new_external_message, std::move(message)); + td::actor::send_closure(manager, &ValidatorManagerInterface::new_external_message, std::move(message), 0); } }; } @@ -636,13 +636,14 @@ void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_ih void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query) { td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_external_message, - std::move(query.message_->data_)); + std::move(query.message_->data_), 0); } void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) { - td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, - create_block_id(query.block_->block_), query.block_->cc_seqno_, - std::move(query.block_->data_)); + BlockIdExt block_id = create_block_id(query.block_->block_); + VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast from " << src << ": " << block_id.to_str(); + td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, block_id, + query.block_->cc_seqno_, std::move(query.block_->data_)); } void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) { @@ -659,6 +660,7 @@ void FullNodeShardImpl::process_block_broadcast(PublicKeyHash src, ton_api::tonN LOG(DEBUG) << "dropped broadcast: " << B.move_as_error(); return; } + VLOG(FULL_NODE_DEBUG) << "Received block broadcast from " << src << ": " << B.ok().block_id.to_str(); auto P = td::PromiseCreator::lambda([](td::Result R) { if (R.is_error()) { if (R.error().code() == ErrorCode::notready) { @@ -734,6 +736,7 @@ void FullNodeShardImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno UNREACHABLE(); return; } + VLOG(FULL_NODE_DEBUG) << "Sending newShardBlockBroadcast: " << block_id.to_str(); auto B = create_serialize_tl_object( create_tl_object(create_tl_block_id(block_id), cc_seqno, std::move(data))); if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) { @@ -750,6 +753,7 @@ void FullNodeShardImpl::send_broadcast(BlockBroadcast broadcast) { UNREACHABLE(); return; } + VLOG(FULL_NODE_DEBUG) << "Sending block broadcast in private overlay: " << broadcast.block_id.to_str(); auto B = serialize_block_broadcast(broadcast, false); // compression_enabled = false if (B.is_error()) { VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error(); diff --git a/validator/full-node.cpp b/validator/full-node.cpp index 5a822b26..f9e0e5cb 100644 --- a/validator/full-node.cpp +++ b/validator/full-node.cpp @@ -35,6 +35,10 @@ void FullNodeImpl::add_permanent_key(PublicKeyHash key, td::Promise pr } local_keys_.insert(key); + create_private_block_overlay(key); + for (auto &p : private_ext_msg_overlays_) { + update_ext_msg_overlay(p.first, p.second); + } if (!sign_cert_by_.is_zero()) { promise.set_value(td::Unit()); @@ -50,7 +54,6 @@ void FullNodeImpl::add_permanent_key(PublicKeyHash key, td::Promise pr for (auto &shard : shards_) { td::actor::send_closure(shard.second, &FullNodeShard::update_validators, all_validators_, sign_cert_by_); } - create_private_block_overlay(key); promise.set_value(td::Unit()); } @@ -60,6 +63,11 @@ void FullNodeImpl::del_permanent_key(PublicKeyHash key, td::Promise pr return; } local_keys_.erase(key); + private_block_overlays_.erase(key); + for (auto &p : private_ext_msg_overlays_) { + update_ext_msg_overlay(p.first, p.second); + } + if (sign_cert_by_ != key) { promise.set_value(td::Unit()); return; @@ -75,7 +83,6 @@ void FullNodeImpl::del_permanent_key(PublicKeyHash key, td::Promise pr for (auto &shard : shards_) { td::actor::send_closure(shard.second, &FullNodeShard::update_validators, all_validators_, sign_cert_by_); } - private_block_overlays_.erase(key); promise.set_value(td::Unit()); } @@ -111,6 +118,10 @@ void FullNodeImpl::update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise nodes, + std::map senders, std::string name, + td::Promise promise) { + if (nodes.empty()) { + promise.set_error(td::Status::Error("list of nodes is empty")); + return; + } + if (private_ext_msg_overlays_.count(name)) { + promise.set_error(td::Status::Error(PSTRING() << "duplicate overlay name \"" << name << "\"")); + return; + } + VLOG(FULL_NODE_WARNING) << "Adding private overlay for external messages \"" << name << "\", " << nodes.size() + << " nodes"; + auto &p = private_ext_msg_overlays_[name]; + p.nodes_ = nodes; + p.senders_ = senders; + update_ext_msg_overlay(name, p); + promise.set_result(td::Unit()); +} + +void FullNodeImpl::del_ext_msg_overlay(std::string name, td::Promise promise) { + auto it = private_ext_msg_overlays_.find(name); + if (it == private_ext_msg_overlays_.end()) { + promise.set_error(td::Status::Error(PSTRING() << "no such overlay \"" << name << "\"")); + return; + } + private_ext_msg_overlays_.erase(it); + promise.set_result(td::Unit()); } void FullNodeImpl::initial_read_complete(BlockHandle top_handle) { @@ -172,6 +221,14 @@ void FullNodeImpl::send_ext_message(AccountIdPrefixFull dst, td::BufferSlice dat VLOG(FULL_NODE_WARNING) << "dropping OUT ext message to unknown shard"; return; } + for (auto &private_overlay : private_ext_msg_overlays_) { + for (auto &actor : private_overlay.second.actors_) { + auto local_id = actor.first; + if (private_overlay.second.senders_.count(local_id)) { + td::actor::send_closure(actor.second, &FullNodePrivateExtMsgOverlay::send_external_message, data.clone()); + } + } + } td::actor::send_closure(shard, &FullNodeShard::send_external_message, std::move(data)); } @@ -182,8 +239,8 @@ void FullNodeImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_s return; } if (!private_block_overlays_.empty()) { - td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateOverlay::send_shard_block_info, - block_id, cc_seqno, data.clone()); + td::actor::send_closure(private_block_overlays_.begin()->second, + &FullNodePrivateBlockOverlay::send_shard_block_info, block_id, cc_seqno, data.clone()); } td::actor::send_closure(shard, &FullNodeShard::send_shard_block_info, block_id, cc_seqno, std::move(data)); } @@ -195,7 +252,7 @@ void FullNodeImpl::send_broadcast(BlockBroadcast broadcast) { return; } if (!private_block_overlays_.empty()) { - td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateOverlay::send_broadcast, + td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_broadcast, broadcast.clone()); } td::actor::send_closure(shard, &FullNodeShard::send_broadcast, std::move(broadcast)); @@ -292,50 +349,7 @@ td::actor::ActorId FullNodeImpl::get_shard(AccountIdPrefixFull ds return get_shard(shard_prefix(dst, 60)); } -void FullNodeImpl::got_key_block_proof(td::Ref proof) { - auto R = proof->get_key_block_config(); - R.ensure(); - auto config = R.move_as_ok(); - - PublicKeyHash l = PublicKeyHash::zero(); - std::vector keys; - std::map current_validators; - for (td::int32 i = -1; i <= 1; i++) { - auto r = config->get_total_validator_set(i < 0 ? i : 1 - i); - if (r.not_null()) { - auto vec = r->export_vector(); - for (auto &el : vec) { - auto key = ValidatorFullId{el.key}.compute_short_id(); - keys.push_back(key); - if (local_keys_.count(key)) { - l = key; - } - if (i == 1) { - current_validators[key] = adnl::AdnlNodeIdShort{el.addr.is_zero() ? key.bits256_value() : el.addr}; - } - } - } - } - - if (current_validators != current_validators_) { - current_validators_ = std::move(current_validators); - update_private_block_overlays(); - } - - if (keys == all_validators_) { - return; - } - - all_validators_ = keys; - sign_cert_by_ = l; - CHECK(all_validators_.size() > 0); - - for (auto &shard : shards_) { - td::actor::send_closure(shard.second, &FullNodeShard::update_validators, all_validators_, sign_cert_by_); - } -} - -void FullNodeImpl::got_zero_block_state(td::Ref state) { +void FullNodeImpl::got_key_block_state(td::Ref state) { auto m = td::Ref{std::move(state)}; PublicKeyHash l = PublicKeyHash::zero(); @@ -358,9 +372,11 @@ void FullNodeImpl::got_zero_block_state(td::Ref state) { } } + set_private_block_overlays_enable_compression(m->get_consensus_config().proto_version >= 3); + if (current_validators != current_validators_) { current_validators_ = std::move(current_validators); - update_private_block_overlays(); + update_private_overlays(); } if (keys == all_validators_) { @@ -377,28 +393,15 @@ void FullNodeImpl::got_zero_block_state(td::Ref state) { } void FullNodeImpl::new_key_block(BlockHandle handle) { - if (handle->id().seqno() == 0) { - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { - if (R.is_error()) { - VLOG(FULL_NODE_WARNING) << "failed to get zero state: " << R.move_as_error(); - } else { - td::actor::send_closure(SelfId, &FullNodeImpl::got_zero_block_state, R.move_as_ok()); - } - }); - td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_shard_state_from_db, handle, - std::move(P)); - } else { - CHECK(handle->is_key_block()); - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { - if (R.is_error()) { - VLOG(FULL_NODE_WARNING) << "failed to get key block proof: " << R.move_as_error(); - } else { - td::actor::send_closure(SelfId, &FullNodeImpl::got_key_block_proof, R.move_as_ok()); - } - }); - td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_block_proof_link_from_db, handle, - std::move(P)); - } + auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { + if (R.is_error()) { + VLOG(FULL_NODE_WARNING) << "failed to get key block state: " << R.move_as_error(); + } else { + td::actor::send_closure(SelfId, &FullNodeImpl::got_key_block_state, R.move_as_ok()); + } + }); + td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_shard_state_from_db, handle, + std::move(P)); } void FullNodeImpl::start_up() { @@ -484,7 +487,11 @@ void FullNodeImpl::start_up() { std::make_unique(actor_id(this)), std::move(P)); } -void FullNodeImpl::update_private_block_overlays() { +void FullNodeImpl::update_private_overlays() { + for (auto &p : private_ext_msg_overlays_) { + update_ext_msg_overlay(p.first, p.second); + } + private_block_overlays_.clear(); if (local_keys_.empty()) { return; @@ -494,6 +501,16 @@ void FullNodeImpl::update_private_block_overlays() { } } +void FullNodeImpl::set_private_block_overlays_enable_compression(bool value) { + if (private_block_overlays_enable_compression_ == value) { + return; + } + private_block_overlays_enable_compression_ = true; + for (auto &p : private_block_overlays_) { + td::actor::send_closure(p.second, &FullNodePrivateBlockOverlay::set_enable_compression, value); + } +} + void FullNodeImpl::create_private_block_overlay(PublicKeyHash key) { CHECK(local_keys_.count(key)); if (current_validators_.count(key)) { @@ -501,9 +518,34 @@ void FullNodeImpl::create_private_block_overlay(PublicKeyHash key) { for (const auto &p : current_validators_) { nodes.push_back(p.second); } - private_block_overlays_[key] = td::actor::create_actor( - "BlocksPrivateOverlay", current_validators_[key], std::move(nodes), zero_state_file_hash_, config_, keyring_, - adnl_, rldp_, rldp2_, overlays_, validator_manager_); + private_block_overlays_[key] = td::actor::create_actor( + "BlocksPrivateOverlay", current_validators_[key], std::move(nodes), zero_state_file_hash_, config_, + private_block_overlays_enable_compression_, keyring_, adnl_, rldp_, rldp2_, overlays_, validator_manager_); + } +} + +void FullNodeImpl::update_ext_msg_overlay(const std::string &name, ExtMsgOverlayInfo &overlay) { + auto old_actors = std::move(overlay.actors_); + overlay.actors_.clear(); + auto try_local_id = [&](const adnl::AdnlNodeIdShort &local_id) { + if (std::find(overlay.nodes_.begin(), overlay.nodes_.end(), local_id) != overlay.nodes_.end()) { + auto it = old_actors.find(local_id); + if (it != old_actors.end()) { + overlay.actors_[local_id] = std::move(it->second); + old_actors.erase(it); + } else { + overlay.actors_[local_id] = td::actor::create_actor( + "ExtMsgPrivateOverlay", local_id, overlay.nodes_, overlay.senders_, name, zero_state_file_hash_, config_, + keyring_, adnl_, rldp_, rldp2_, overlays_, validator_manager_); + } + } + }; + try_local_id(adnl_id_); + for (const PublicKeyHash &local_key: local_keys_) { + auto it = current_validators_.find(local_key); + if (it != current_validators_.end()) { + try_local_id(it->second); + } } } diff --git a/validator/full-node.h b/validator/full-node.h index 15d54b55..82e0dd5c 100644 --- a/validator/full-node.h +++ b/validator/full-node.h @@ -74,6 +74,11 @@ class FullNode : public td::actor::Actor { virtual void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise promise) = 0; virtual void set_config(FullNodeConfig config) = 0; + virtual void add_ext_msg_overlay(std::vector nodes, + std::map senders, std::string name, + td::Promise promise) = 0; + virtual void del_ext_msg_overlay(std::string name, td::Promise promise) = 0; + static constexpr td::uint32 max_block_size() { return 4 << 20; } diff --git a/validator/full-node.hpp b/validator/full-node.hpp index 838700b5..f60bd902 100644 --- a/validator/full-node.hpp +++ b/validator/full-node.hpp @@ -53,6 +53,10 @@ class FullNodeImpl : public FullNode { void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise promise) override; void set_config(FullNodeConfig config) override; + void add_ext_msg_overlay(std::vector nodes, std::map senders, + std::string name, td::Promise promise) override; + void del_ext_msg_overlay(std::string name, td::Promise promise) override; + void add_shard(ShardIdFull shard); void del_shard(ShardIdFull shard); @@ -76,8 +80,7 @@ class FullNodeImpl : public FullNode { void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout, td::Promise promise); - void got_key_block_proof(td::Ref proof); - void got_zero_block_state(td::Ref state); + void got_key_block_state(td::Ref state); void new_key_block(BlockHandle handle); void start_up() override; @@ -117,10 +120,21 @@ class FullNodeImpl : public FullNode { std::set local_keys_; FullNodeConfig config_; - std::map> private_block_overlays_; + std::map> private_block_overlays_; + bool private_block_overlays_enable_compression_ = false; - void update_private_block_overlays(); + struct ExtMsgOverlayInfo { + std::vector nodes_; + std::map senders_; + std::map> + actors_; // our local id -> actor + }; + std::map private_ext_msg_overlays_; + + void update_private_overlays(); + void set_private_block_overlays_enable_compression(bool value); void create_private_block_overlay(PublicKeyHash key); + void update_ext_msg_overlay(const std::string& name, ExtMsgOverlayInfo& overlay); }; } // namespace fullnode diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h index e059ee68..055c1aed 100644 --- a/validator/impl/collator-impl.h +++ b/validator/impl/collator-impl.h @@ -183,7 +183,12 @@ class Collator final : public td::actor::Actor { block::ValueFlow value_flow_{block::ValueFlow::SetZero()}; std::unique_ptr fees_import_dict_; std::map ext_msg_map; - std::vector, ExtMessage::Hash>> ext_msg_list_; + struct ExtMsg { + Ref cell; + ExtMessage::Hash hash; + int priority; + }; + std::vector ext_msg_list_; std::priority_queue, std::greater> new_msgs; std::pair last_proc_int_msg_, first_unproc_int_msg_; std::unique_ptr in_msg_dict, out_msg_dict, out_msg_queue_, sibling_out_msg_queue_; @@ -268,8 +273,9 @@ class Collator final : public td::actor::Actor { bool is_our_address(Ref addr_ref) const; bool is_our_address(ton::AccountIdPrefixFull addr_prefix) const; bool is_our_address(const ton::StdSmcAddress& addr) const; - void after_get_external_messages(td::Result>> res); - td::Result register_external_message_cell(Ref ext_msg, const ExtMessage::Hash& ext_hash); + void after_get_external_messages(td::Result, int>>> res); + td::Result register_external_message_cell(Ref ext_msg, const ExtMessage::Hash& ext_hash, + int priority); // td::Result register_external_message(td::Slice ext_msg_boc); void register_new_msg(block::NewOutMsg msg); void register_new_msgs(block::transaction::Transaction& trans); diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index 073da237..fd4ddd34 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -49,6 +49,7 @@ static const td::uint32 FORCE_SPLIT_QUEUE_SIZE = 4096; static const td::uint32 SPLIT_MAX_QUEUE_SIZE = 100000; static const td::uint32 MERGE_MAX_QUEUE_SIZE = 2047; static const td::uint32 SKIP_EXTERNALS_QUEUE_SIZE = 8000; +static const int HIGH_PRIORITY_EXTERNAL = 10; // don't skip high priority externals when queue is big #define DBG(__n) dbg(__n)&& #define DSTART int __dcnt = 0; @@ -256,11 +257,10 @@ void Collator::start_up() { LOG(DEBUG) << "sending get_external_messages() query to Manager"; ++pending; td::actor::send_closure_later(manager, &ValidatorManager::get_external_messages, shard_, - [self = get_self()](td::Result>> res) -> void { - LOG(DEBUG) << "got answer to get_external_messages() query"; - td::actor::send_closure_later( - std::move(self), &Collator::after_get_external_messages, std::move(res)); - }); + [self = get_self()](td::Result, int>>> res) -> void { + LOG(DEBUG) << "got answer to get_external_messages() query"; + td::actor::send_closure_later(std::move(self), &Collator::after_get_external_messages, std::move(res)); + }); } if (is_masterchain() && !is_hardfork_) { // 5. load shard block info messages @@ -3413,12 +3413,15 @@ bool Collator::process_inbound_external_messages() { return true; } if (out_msg_queue_size_ > SKIP_EXTERNALS_QUEUE_SIZE) { - LOG(INFO) << "skipping processing of inbound external messages because out_msg_queue is too big (" + LOG(INFO) << "skipping processing of inbound external messages (except for high-priority) because out_msg_queue is " + "too big (" << out_msg_queue_size_ << " > " << SKIP_EXTERNALS_QUEUE_SIZE << ")"; - return true; } bool full = !block_limit_status_->fits(block::ParamLimits::cl_soft); - for (auto& ext_msg_pair : ext_msg_list_) { + for (auto& ext_msg_struct : ext_msg_list_) { + if (out_msg_queue_size_ > SKIP_EXTERNALS_QUEUE_SIZE && ext_msg_struct.priority < HIGH_PRIORITY_EXTERNAL) { + continue; + } if (full) { LOG(INFO) << "BLOCK FULL, stop processing external messages"; break; @@ -3427,15 +3430,15 @@ bool Collator::process_inbound_external_messages() { LOG(WARNING) << "medium timeout reached, stop processing inbound external messages"; break; } - auto ext_msg = ext_msg_pair.first; + auto ext_msg = ext_msg_struct.cell; ton::Bits256 hash{ext_msg->get_hash().bits()}; int r = process_external_message(std::move(ext_msg)); if (r < 0) { - bad_ext_msgs_.emplace_back(ext_msg_pair.second); + bad_ext_msgs_.emplace_back(ext_msg_struct.hash); return false; } if (!r) { - delay_ext_msgs_.emplace_back(ext_msg_pair.second); + delay_ext_msgs_.emplace_back(ext_msg_struct.hash); } if (r > 0) { full = !block_limit_status_->fits(block::ParamLimits::cl_soft); @@ -5062,7 +5065,8 @@ void Collator::return_block_candidate(td::Result saved) { * - If the external message has been previuosly registered and accepted, returns false. * - Otherwise returns true. */ -td::Result Collator::register_external_message_cell(Ref ext_msg, const ExtMessage::Hash& ext_hash) { +td::Result Collator::register_external_message_cell(Ref ext_msg, const ExtMessage::Hash& ext_hash, + int priority) { if (ext_msg->get_level() != 0) { return td::Status::Error("external message must have zero level"); } @@ -5106,7 +5110,7 @@ td::Result Collator::register_external_message_cell(Ref ext_msg, block::gen::t_Message_Any.print_ref(std::cerr, ext_msg); } ext_msg_map.emplace(hash, 1); - ext_msg_list_.emplace_back(std::move(ext_msg), ext_hash); + ext_msg_list_.push_back({std::move(ext_msg), ext_hash, priority}); return true; } @@ -5115,18 +5119,21 @@ td::Result Collator::register_external_message_cell(Ref ext_msg, * * @param res The result of the external message retrieval operation. */ -void Collator::after_get_external_messages(td::Result>> res) { +void Collator::after_get_external_messages(td::Result, int>>> res) { + // res: pair {ext msg, priority} --pending; if (res.is_error()) { fatal_error(res.move_as_error()); return; } auto vect = res.move_as_ok(); - for (auto&& ext_msg : vect) { + for (auto& p : vect) { + auto& ext_msg = p.first; + int priority = p.second; Ref ext_msg_cell = ext_msg->root_cell(); bool err = ext_msg_cell.is_null(); if (!err) { - auto reg_res = register_external_message_cell(std::move(ext_msg_cell), ext_msg->hash()); + auto reg_res = register_external_message_cell(std::move(ext_msg_cell), ext_msg->hash(), priority); if (reg_res.is_error() || !reg_res.move_as_ok()) { err = true; } diff --git a/validator/interfaces/validator-manager.h b/validator/interfaces/validator-manager.h index a3e83a9d..41412fb1 100644 --- a/validator/interfaces/validator-manager.h +++ b/validator/interfaces/validator-manager.h @@ -104,7 +104,8 @@ class ValidatorManager : public ValidatorManagerInterface { td::Promise> promise) = 0; virtual void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) = 0; - virtual void get_external_messages(ShardIdFull shard, td::Promise>> promise) = 0; + virtual void get_external_messages(ShardIdFull shard, + td::Promise, int>>> promise) = 0; virtual void get_ihr_messages(ShardIdFull shard, td::Promise>> promise) = 0; virtual void get_shard_blocks(BlockIdExt masterchain_block_id, td::Promise>> promise) = 0; diff --git a/validator/manager-disk.cpp b/validator/manager-disk.cpp index 3717d7b0..42e44081 100644 --- a/validator/manager-disk.cpp +++ b/validator/manager-disk.cpp @@ -259,7 +259,7 @@ void ValidatorManagerImpl::get_key_block_proof_link(BlockIdExt block_id, td::Pro td::actor::send_closure(db_, &Db::get_key_block_proof, block_id, std::move(P)); } -void ValidatorManagerImpl::new_external_message(td::BufferSlice data) { +void ValidatorManagerImpl::new_external_message(td::BufferSlice data, int priority) { if (last_masterchain_state_.is_null()) { return; } @@ -507,9 +507,13 @@ void ValidatorManagerImpl::wait_block_message_queue_short(BlockIdExt block_id, t get_block_handle(block_id, true, std::move(P)); } -void ValidatorManagerImpl::get_external_messages(ShardIdFull shard, - td::Promise>> promise) { - promise.set_result(ext_messages_); +void ValidatorManagerImpl::get_external_messages( + ShardIdFull shard, td::Promise, int>>> promise) { + std::vector, int>> res; + for (const auto& x : ext_messages_) { + res.emplace_back(x, 0); + } + promise.set_result(std::move(res)); } void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise>> promise) { diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index 05816c71..d5a6e909 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -124,7 +124,7 @@ class ValidatorManagerImpl : public ValidatorManager { void get_key_block_proof_link(BlockIdExt block_id, td::Promise promise) override; //void get_block_description(BlockIdExt block_id, td::Promise promise) override; - void new_external_message(td::BufferSlice data) override; + void new_external_message(td::BufferSlice data, int priority) override; void check_external_message(td::BufferSlice data, td::Promise> promise) override { UNREACHABLE(); } @@ -188,7 +188,8 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; - void get_external_messages(ShardIdFull shard, td::Promise>> promise) override; + void get_external_messages(ShardIdFull shard, + td::Promise, int>>> promise) override; void get_ihr_messages(ShardIdFull shard, td::Promise>> promise) override; void get_shard_blocks(BlockIdExt masterchain_block_id, td::Promise>> promise) override; @@ -244,7 +245,7 @@ class ValidatorManagerImpl : public ValidatorManager { UNREACHABLE(); } void send_external_message(td::Ref message) override { - new_external_message(message->serialize()); + new_external_message(message->serialize(), 0); } void send_ihr_message(td::Ref message) override { new_ihr_message(message->serialize()); diff --git a/validator/manager-hardfork.cpp b/validator/manager-hardfork.cpp index e290f635..49d27085 100644 --- a/validator/manager-hardfork.cpp +++ b/validator/manager-hardfork.cpp @@ -150,7 +150,7 @@ void ValidatorManagerImpl::get_key_block_proof_link(BlockIdExt block_id, td::Pro td::actor::send_closure(db_, &Db::get_key_block_proof_link, block_id, std::move(P)); } -void ValidatorManagerImpl::new_external_message(td::BufferSlice data) { +void ValidatorManagerImpl::new_external_message(td::BufferSlice data, int priority) { auto R = create_ext_message(std::move(data), block::SizeLimitsConfig::ExtMsgLimits()); if (R.is_ok()) { ext_messages_.emplace_back(R.move_as_ok()); @@ -357,9 +357,13 @@ void ValidatorManagerImpl::wait_block_message_queue_short(BlockIdExt block_id, t get_block_handle(block_id, true, std::move(P)); } -void ValidatorManagerImpl::get_external_messages(ShardIdFull shard, - td::Promise>> promise) { - promise.set_result(ext_messages_); +void ValidatorManagerImpl::get_external_messages( + ShardIdFull shard, td::Promise, int>>> promise) { + std::vector, int>> res; + for (const auto &x : ext_messages_) { + res.emplace_back(x, 0); + } + promise.set_result(std::move(res)); } void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise>> promise) { diff --git a/validator/manager-hardfork.hpp b/validator/manager-hardfork.hpp index 77e604ae..7937729c 100644 --- a/validator/manager-hardfork.hpp +++ b/validator/manager-hardfork.hpp @@ -144,7 +144,7 @@ class ValidatorManagerImpl : public ValidatorManager { void get_key_block_proof(BlockIdExt block_id, td::Promise promise) override; void get_key_block_proof_link(BlockIdExt block_id, td::Promise promise) override; - void new_external_message(td::BufferSlice data) override; + void new_external_message(td::BufferSlice data, int priority) override; void check_external_message(td::BufferSlice data, td::Promise> promise) override { UNREACHABLE(); } @@ -228,7 +228,8 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; - void get_external_messages(ShardIdFull shard, td::Promise>> promise) override; + void get_external_messages(ShardIdFull shard, + td::Promise, int>>> promise) override; void get_ihr_messages(ShardIdFull shard, td::Promise>> promise) override; void get_shard_blocks(BlockIdExt masterchain_block_id, td::Promise>> promise) override; @@ -308,7 +309,7 @@ class ValidatorManagerImpl : public ValidatorManager { UNREACHABLE(); } void send_external_message(td::Ref message) override { - new_external_message(message->serialize()); + new_external_message(message->serialize(), 0); } void send_ihr_message(td::Ref message) override { new_ihr_message(message->serialize()); diff --git a/validator/manager.cpp b/validator/manager.cpp index c24a98d6..e186902a 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -368,7 +368,7 @@ void ValidatorManagerImpl::get_key_block_proof_link(BlockIdExt block_id, td::Pro td::actor::send_closure(db_, &Db::get_key_block_proof, block_id, std::move(P)); } -void ValidatorManagerImpl::new_external_message(td::BufferSlice data) { +void ValidatorManagerImpl::new_external_message(td::BufferSlice data, int priority) { if (!is_validator()) { return; } @@ -376,7 +376,7 @@ void ValidatorManagerImpl::new_external_message(td::BufferSlice data) { VLOG(VALIDATOR_NOTICE) << "dropping ext message: validator is not ready"; return; } - if (ext_messages_.size() > max_mempool_num()) { + if (ext_msgs_[priority].ext_messages_.size() > (size_t)max_mempool_num()) { return; } auto R = create_ext_message(std::move(data), last_masterchain_state_->get_ext_msg_limits()); @@ -384,21 +384,30 @@ void ValidatorManagerImpl::new_external_message(td::BufferSlice data) { VLOG(VALIDATOR_NOTICE) << "dropping bad ext message: " << R.move_as_error(); return; } - add_external_message(R.move_as_ok()); + add_external_message(R.move_as_ok(), priority); } -void ValidatorManagerImpl::add_external_message(td::Ref msg) { +void ValidatorManagerImpl::add_external_message(td::Ref msg, int priority) { + auto &msgs = ext_msgs_[priority]; auto message = std::make_unique>(msg); auto id = message->ext_id(); auto address = message->address(); unsigned long per_address_limit = 256; - if(ext_addr_messages_.count(address) < per_address_limit) { - if (ext_messages_hashes_.count(id.hash) == 0) { - ext_messages_.emplace(id, std::move(message)); - ext_messages_hashes_.emplace(id.hash, id); - ext_addr_messages_[address].emplace(id.hash, id); - } + auto it = msgs.ext_addr_messages_.find(address); + if (it != msgs.ext_addr_messages_.end() && it->second.size() >= per_address_limit) { + return; } + auto it2 = ext_messages_hashes_.find(id.hash); + if (it2 != ext_messages_hashes_.end()) { + int old_priority = it2->second.first; + if (old_priority >= priority) { + return; + } + ext_msgs_[old_priority].erase(id); + } + msgs.ext_messages_.emplace(id, std::move(message)); + msgs.ext_addr_messages_[address].emplace(id.hash, id); + ext_messages_hashes_[id.hash] = {priority, id}; } void ValidatorManagerImpl::check_external_message(td::BufferSlice data, td::Promise> promise) { ++ls_stats_check_ext_messages_; @@ -783,34 +792,44 @@ void ValidatorManagerImpl::wait_block_message_queue_short(BlockIdExt block_id, t get_block_handle(block_id, true, std::move(P)); } -void ValidatorManagerImpl::get_external_messages(ShardIdFull shard, - td::Promise>> promise) { +void ValidatorManagerImpl::get_external_messages( + ShardIdFull shard, td::Promise, int>>> promise) { td::Timer t; size_t processed = 0, deleted = 0; - std::vector> res; + std::vector, int>> res; MessageId left{AccountIdPrefixFull{shard.workchain, shard.shard & (shard.shard - 1)}, Bits256::zero()}; - auto it = ext_messages_.lower_bound(left); - while (it != ext_messages_.end()) { - auto s = it->first; - if (!shard_contains(shard, s.dst)) { - break; + size_t total_msgs = 0; + td::Random::Fast rnd; + for (auto iter = ext_msgs_.rbegin(); iter != ext_msgs_.rend(); ++iter) { + std::vector, int>> cur_res; + int priority = iter->first; + auto &msgs = iter->second; + auto it = msgs.ext_messages_.lower_bound(left); + while (it != msgs.ext_messages_.end()) { + auto s = it->first; + if (!shard_contains(shard, s.dst)) { + break; + } + ++processed; + if (it->second->expired()) { + msgs.ext_addr_messages_[it->second->address()].erase(it->first.hash); + ext_messages_hashes_.erase(it->first.hash); + it = msgs.ext_messages_.erase(it); + ++deleted; + continue; + } + if (it->second->is_active()) { + cur_res.emplace_back(it->second->message(), priority); + } + it++; } - ++processed; - if (it->second->expired()) { - ext_addr_messages_[it->second->address()].erase(it->first.hash); - ext_messages_hashes_.erase(it->first.hash); - it = ext_messages_.erase(it); - ++deleted; - continue; - } - if (it->second->is_active()) { - res.push_back(it->second->message()); - } - it++; + td::random_shuffle(td::as_mutable_span(cur_res), rnd); + res.insert(res.end(), cur_res.begin(), cur_res.end()); + total_msgs += msgs.ext_messages_.size(); } LOG(WARNING) << "get_external_messages to shard " << shard.to_str() << " : time=" << t.elapsed() << " result_size=" << res.size() << " processed=" << processed << " expired=" << deleted - << " total_size=" << ext_messages_.size(); + << " total_size=" << total_msgs; promise.set_value(std::move(res)); } @@ -850,8 +869,9 @@ void ValidatorManagerImpl::complete_external_messages(std::vectorsecond]->address()].erase(it->first); - CHECK(ext_messages_.erase(it->second)); + int priority = it->second.first; + auto msg_id = it->second.second; + ext_msgs_[priority].erase(msg_id); ext_messages_hashes_.erase(it); } } @@ -859,12 +879,14 @@ void ValidatorManagerImpl::complete_external_messages(std::vectorsecond); - if ((ext_messages_.size() < soft_mempool_limit) && it2->second->can_postpone()) { + int priority = it->second.first; + auto msg_id = it->second.second; + auto &msgs = ext_msgs_[priority]; + auto it2 = msgs.ext_messages_.find(msg_id); + if ((msgs.ext_messages_.size() < soft_mempool_limit) && it2->second->can_postpone()) { it2->second->postpone(); } else { - ext_addr_messages_[it2->second->address()].erase(it2->first.hash); - ext_messages_.erase(it2); + msgs.erase(msg_id); ext_messages_hashes_.erase(it); } } @@ -1460,7 +1482,7 @@ void ValidatorManagerImpl::send_get_next_key_blocks_request(BlockIdExt block_id, void ValidatorManagerImpl::send_external_message(td::Ref message) { callback_->send_ext_message(message->shard(), message->serialize()); - add_external_message(std::move(message)); + add_external_message(std::move(message), 0); } void ValidatorManagerImpl::send_ihr_message(td::Ref message) { @@ -2105,6 +2127,9 @@ td::actor::ActorOwn ValidatorManagerImpl::create_validator_group if (check_gc_list_.count(session_id) == 1) { return td::actor::ActorOwn{}; } else { + // Call get_external_messages to cleanup mempool for the shard + get_external_messages(shard, [](td::Result, int>>>) {}); + auto validator_id = get_validator(shard, validator_set); CHECK(!validator_id.is_zero()); auto G = td::actor::create_actor( diff --git a/validator/manager.hpp b/validator/manager.hpp index fd599563..7e5930d3 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -220,9 +220,19 @@ class ValidatorManagerImpl : public ValidatorManager { }; // DATA FOR COLLATOR std::map> shard_blocks_; - std::map, std::unique_ptr>> ext_messages_; - std::map, std::map>> ext_addr_messages_; - std::map> ext_messages_hashes_; + struct ExtMessages { + std::map, std::unique_ptr>> ext_messages_; + std::map, std::map>> + ext_addr_messages_; + void erase(const MessageId& id) { + auto it = ext_messages_.find(id); + CHECK(it != ext_messages_.end()); + ext_addr_messages_[it->second->address()].erase(id.hash); + ext_messages_.erase(it); + } + }; + std::map ext_msgs_; // priority -> messages + std::map>> ext_messages_hashes_; // hash -> priority // IHR ? std::map, std::unique_ptr>> ihr_messages_; std::map> ihr_messages_hashes_; @@ -349,8 +359,8 @@ class ValidatorManagerImpl : public ValidatorManager { void get_key_block_proof_link(BlockIdExt block_id, td::Promise promise) override; //void get_block_description(BlockIdExt block_id, td::Promise promise) override; - void new_external_message(td::BufferSlice data) override; - void add_external_message(td::Ref message); + void new_external_message(td::BufferSlice data, int priority) override; + void add_external_message(td::Ref message, int priority); void check_external_message(td::BufferSlice data, td::Promise> promise) override; void new_ihr_message(td::BufferSlice data) override; @@ -410,7 +420,8 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; - void get_external_messages(ShardIdFull shard, td::Promise>> promise) override; + void get_external_messages(ShardIdFull shard, + td::Promise, int>>> promise) override; void get_ihr_messages(ShardIdFull shard, td::Promise>> promise) override; void get_shard_blocks(BlockIdExt masterchain_block_id, td::Promise>> promise) override; diff --git a/validator/validator.h b/validator/validator.h index 9ad20b4b..9ede0711 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -198,7 +198,7 @@ class ValidatorManagerInterface : public td::actor::Actor { virtual void get_next_block(BlockIdExt block_id, td::Promise promise) = 0; virtual void write_handle(BlockHandle handle, td::Promise promise) = 0; - virtual void new_external_message(td::BufferSlice data) = 0; + virtual void new_external_message(td::BufferSlice data, int priority) = 0; virtual void check_external_message(td::BufferSlice data, td::Promise> promise) = 0; virtual void new_ihr_message(td::BufferSlice data) = 0; virtual void new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) = 0;