diff --git a/catchain/catchain-receiver.cpp b/catchain/catchain-receiver.cpp index c8206de9..82779e3b 100644 --- a/catchain/catchain-receiver.cpp +++ b/catchain/catchain-receiver.cpp @@ -520,7 +520,8 @@ void CatChainReceiverImpl::start_up() { } td::actor::send_closure(overlay_manager_, &overlay::Overlays::create_private_overlay, get_source(local_idx_)->get_adnl_id(), overlay_full_id_.clone(), std::move(ids), - make_callback(), overlay::OverlayPrivacyRules{0, 0, std::move(root_keys)}); + make_callback(), overlay::OverlayPrivacyRules{0, 0, std::move(root_keys)}, + R"({ "type": "catchain" })"); CHECK(root_block_); diff --git a/create-hardfork/create-hardfork.cpp b/create-hardfork/create-hardfork.cpp index 66b9a854..96ebcfc4 100644 --- a/create-hardfork/create-hardfork.cpp +++ b/create-hardfork/create-hardfork.cpp @@ -213,7 +213,7 @@ class HardforkCreator : public td::actor::Actor { ton::validator::ValidatorManagerHardforkFactory::create(opts, shard_, shard_top_block_id_, db_root_); for (auto &msg : ext_msgs_) { td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManager::new_external_message, - std::move(msg)); + std::move(msg), 0); } for (auto &topmsg : top_shard_descrs_) { td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManager::new_shard_block, ton::BlockIdExt{}, diff --git a/overlay/overlay-manager.cpp b/overlay/overlay-manager.cpp index 0591e732..e1055761 100644 --- a/overlay/overlay-manager.cpp +++ b/overlay/overlay-manager.cpp @@ -107,11 +107,12 @@ void OverlayManager::create_public_overlay_ex(adnl::AdnlNodeIdShort local_id, Ov void OverlayManager::create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::vector nodes, - std::unique_ptr callback, OverlayPrivacyRules rules) { + std::unique_ptr callback, OverlayPrivacyRules rules, + std::string scope) { auto id = overlay_id.compute_short_id(); register_overlay(local_id, id, Overlay::create(keyring_, adnl_, actor_id(this), dht_node_, local_id, std::move(overlay_id), - std::move(nodes), std::move(callback), std::move(rules))); + std::move(nodes), std::move(callback), std::move(rules), std::move(scope))); } void OverlayManager::receive_message(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data) { diff --git a/overlay/overlay-manager.h b/overlay/overlay-manager.h index 659069bd..2e25c944 100644 --- a/overlay/overlay-manager.h +++ b/overlay/overlay-manager.h @@ -57,7 +57,7 @@ class OverlayManager : public Overlays { OverlayOptions opts) override; void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::vector nodes, std::unique_ptr callback, - OverlayPrivacyRules rules) override; + OverlayPrivacyRules rules, std::string scope) override; void delete_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id) override; void send_query(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdShort src, OverlayIdShort overlay_id, std::string name, td::Promise promise, td::Timestamp timeout, td::BufferSlice query) override { diff --git a/overlay/overlay.cpp b/overlay/overlay.cpp index efa19d9b..5b391441 100644 --- a/overlay/overlay.cpp +++ b/overlay/overlay.cpp @@ -50,10 +50,11 @@ td::actor::ActorOwn Overlay::create(td::actor::ActorId manager, td::actor::ActorId dht_node, adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::vector nodes, - std::unique_ptr callback, OverlayPrivacyRules rules) { - auto R = - td::actor::create_actor("overlay", keyring, adnl, manager, dht_node, local_id, std::move(overlay_id), - false, std::move(nodes), std::move(callback), std::move(rules)); + std::unique_ptr callback, OverlayPrivacyRules rules, + std::string scope) { + auto R = td::actor::create_actor("overlay", keyring, adnl, manager, dht_node, local_id, + std::move(overlay_id), false, std::move(nodes), std::move(callback), + std::move(rules), std::move(scope)); return td::actor::ActorOwn(std::move(R)); } diff --git a/overlay/overlay.h b/overlay/overlay.h index b21de362..a6d5cd6b 100644 --- a/overlay/overlay.h +++ b/overlay/overlay.h @@ -48,7 +48,8 @@ class Overlay : public td::actor::Actor { td::actor::ActorId manager, td::actor::ActorId dht_node, adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::vector nodes, - std::unique_ptr callback, OverlayPrivacyRules rules); + std::unique_ptr callback, OverlayPrivacyRules rules, + std::string scope); virtual void update_dht_node(td::actor::ActorId dht) = 0; diff --git a/overlay/overlays.h b/overlay/overlays.h index 62daa0d9..a7d76fe0 100644 --- a/overlay/overlays.h +++ b/overlay/overlays.h @@ -212,7 +212,7 @@ class Overlays : public td::actor::Actor { td::string scope, OverlayOptions opts) = 0; virtual void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id, std::vector nodes, std::unique_ptr callback, - OverlayPrivacyRules rules) = 0; + OverlayPrivacyRules rules, std::string scope) = 0; virtual void delete_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id) = 0; virtual void send_query(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdShort src, OverlayIdShort overlay_id, diff --git a/test/test-ton-collator.cpp b/test/test-ton-collator.cpp index 3e5991ca..c91bf493 100644 --- a/test/test-ton-collator.cpp +++ b/test/test-ton-collator.cpp @@ -300,7 +300,7 @@ class TestNode : public td::actor::Actor { shard_top_block_id_, db_root_); for (auto &msg : ext_msgs_) { td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManager::new_external_message, - std::move(msg)); + std::move(msg), 0); } for (auto &topmsg : top_shard_descrs_) { td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManager::new_shard_block, ton::BlockIdExt{}, diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index cafc0926..a3f6d6e2 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -401,6 +401,7 @@ tonNode.newShardBlockBroadcast block:tonNode.newShardBlock = tonNode.Broadcast; tonNode.shardPublicOverlayId workchain:int shard:long zero_state_file_hash:int256 = tonNode.ShardPublicOverlayId; tonNode.privateBlockOverlayId zero_state_file_hash:int256 nodes:(vector int256) = tonNode.PrivateBlockOverlayId; +tonNode.privateExtMsgsOverlayId zero_state_file_hash:int256 name:string nodes:(vector int256) = tonNode.PrivateExtMsgsOverlayId; tonNode.privateBlockOverlayIdV2 zero_state_file_hash:int256 workchain:int shard:long nodes:(vector int256) senders:(vector int256) = tonNode.PrivateBlockOverlayIdV2; tonNode.keyBlocks blocks:(vector tonNode.blockIdExt) incomplete:Bool error:Bool = tonNode.KeyBlocks; @@ -614,6 +615,10 @@ engine.validator.config out_port:int addrs:(vector engine.Addr) adnl:(vector eng shards_to_monitor:(vector tonNode.shardId) gc:engine.gc = engine.validator.Config; +engine.validator.privateExtMsgOverlayNode adnl_id:int256 sender:Bool sender_priority:int = engine.validator.PrivateExtMsgOverlayNode; +engine.validator.privateExtMsgOverlay name:string nodes:(vector engine.validator.privateExtMsgOverlayNode) = engine.validator.PrivateExtMsgOverlay; +engine.validator.privateExtMsgOverlaysConfig overlays:(vector engine.validator.privateExtMsgOverlay) = engine.validator.PrivateExtMsgOverlaysConfig; + ---functions--- ---types--- @@ -727,6 +732,10 @@ engine.validator.getPerfTimerStats name:string = engine.validator.PerfTimerStats engine.validator.getShardOutQueueSize flags:# block_id:tonNode.blockId dest_wc:flags.0?int dest_shard:flags.0?long = engine.validator.ShardOutQueueSize; engine.validator.setExtMessagesBroadcastDisabled disabled:Bool = engine.validator.Success; +engine.validator.addPrivateExtMsgOverlay overlay:engine.validator.privateExtMsgOverlay = engine.validator.Success; +engine.validator.delPrivateExtMsgOverlay name:string = engine.validator.Success; +engine.validator.showPrivateExtMsgOverlays = engine.validator.PrivateExtMsgOverlaysConfig; + engine.validator.getValidatorSessionsInfo = engine.validator.ValidatorSessionsInfo; engine.validator.addCollator adnl_id:int256 shard:tonNode.shardId = engine.validator.Success; diff --git a/tl/generate/scheme/ton_api.tlo b/tl/generate/scheme/ton_api.tlo index b3c6bf82..9582e87d 100644 Binary files a/tl/generate/scheme/ton_api.tlo and b/tl/generate/scheme/ton_api.tlo differ diff --git a/validator-engine-console/validator-engine-console-query.cpp b/validator-engine-console/validator-engine-console-query.cpp index 98a4d324..aa4f23d8 100644 --- a/validator-engine-console/validator-engine-console-query.cpp +++ b/validator-engine-console/validator-engine-console-query.cpp @@ -33,6 +33,7 @@ #include "td/utils/filesystem.h" #include "overlay/overlays.h" #include "ton/ton-tl.hpp" +#include "td/utils/JsonBuilder.h" #include "auto/tl/ton_api_json.h" #include "tl/tl_json.h" @@ -1120,6 +1121,76 @@ td::Status SetExtMessagesBroadcastDisabledQuery::receive(td::BufferSlice data) { return td::Status::OK(); } +td::Status AddPrivateExtMsgOverlayQuery::run() { + TRY_RESULT_ASSIGN(file_name_, tokenizer_.get_token()); + TRY_STATUS(tokenizer_.check_endl()); + return td::Status::OK(); +} + +td::Status AddPrivateExtMsgOverlayQuery::send() { + TRY_RESULT(data, td::read_file(file_name_)); + TRY_RESULT(json, td::json_decode(data.as_slice())); + auto overlay = ton::create_tl_object(); + TRY_STATUS(ton::ton_api::from_json(*overlay, json.get_object())); + auto b = ton::create_serialize_tl_object(std::move(overlay)); + td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise()); + return td::Status::OK(); +} + +td::Status AddPrivateExtMsgOverlayQuery::receive(td::BufferSlice data) { + TRY_RESULT_PREFIX(f, ton::fetch_tl_object(data.as_slice(), true), + "received incorrect answer: "); + td::TerminalIO::out() << "success\n"; + return td::Status::OK(); +} + +td::Status DelPrivateExtMsgOverlayQuery::run() { + TRY_RESULT_ASSIGN(name_, tokenizer_.get_token()); + TRY_STATUS(tokenizer_.check_endl()); + return td::Status::OK(); +} + +td::Status DelPrivateExtMsgOverlayQuery::send() { + auto b = ton::create_serialize_tl_object(name_); + td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise()); + return td::Status::OK(); +} + +td::Status DelPrivateExtMsgOverlayQuery::receive(td::BufferSlice data) { + TRY_RESULT_PREFIX(f, ton::fetch_tl_object(data.as_slice(), true), + "received incorrect answer: "); + td::TerminalIO::out() << "success\n"; + return td::Status::OK(); +} + +td::Status ShowPrivateExtMsgOverlaysQuery::run() { + TRY_STATUS(tokenizer_.check_endl()); + return td::Status::OK(); +} + +td::Status ShowPrivateExtMsgOverlaysQuery::send() { + auto b = ton::create_serialize_tl_object(); + td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise()); + return td::Status::OK(); +} + +td::Status ShowPrivateExtMsgOverlaysQuery::receive(td::BufferSlice data) { + TRY_RESULT_PREFIX( + f, ton::fetch_tl_object(data.as_slice(), true), + "received incorrect answer: "); + td::TerminalIO::out() << f->overlays_.size() << " private overlays:\n\n"; + for (const auto &overlay : f->overlays_) { + td::TerminalIO::out() << "Overlay \"" << overlay->name_ << "\": " << overlay->nodes_.size() << " nodes\n"; + for (const auto &node : overlay->nodes_) { + td::TerminalIO::out() << " " << node->adnl_id_ + << (node->sender_ ? (PSTRING() << " (sender, p=" << node->sender_priority_ << ")") : "") + << "\n"; + } + td::TerminalIO::out() << "\n"; + } + return td::Status::OK(); +} + td::Status GetValidatorSessionsInfoQuery::run() { TRY_STATUS(tokenizer_.check_endl()); return td::Status::OK(); diff --git a/validator-engine-console/validator-engine-console-query.h b/validator-engine-console/validator-engine-console-query.h index 02c92af0..9ff1edc6 100644 --- a/validator-engine-console/validator-engine-console-query.h +++ b/validator-engine-console/validator-engine-console-query.h @@ -1145,6 +1145,70 @@ class SetExtMessagesBroadcastDisabledQuery : public Query { bool value; }; +class AddPrivateExtMsgOverlayQuery : public Query { + public: + AddPrivateExtMsgOverlayQuery(td::actor::ActorId console, Tokenizer tokenizer) + : Query(console, std::move(tokenizer)) { + } + td::Status run() override; + td::Status send() override; + td::Status receive(td::BufferSlice data) override; + static std::string get_name() { + return "addprivateextmsgoverlay"; + } + static std::string get_help() { + return "addprivateextmsgoverlay \tadd private overlay for external messages with config from file " + ""; + } + std::string name() const override { + return get_name(); + } + + private: + std::string file_name_; +}; + +class DelPrivateExtMsgOverlayQuery : public Query { + public: + DelPrivateExtMsgOverlayQuery(td::actor::ActorId console, Tokenizer tokenizer) + : Query(console, std::move(tokenizer)) { + } + td::Status run() override; + td::Status send() override; + td::Status receive(td::BufferSlice data) override; + static std::string get_name() { + return "delprivateextmsgoverlay"; + } + static std::string get_help() { + return "delprivateextmsgoverlay \tdelete private overlay for external messages with name "; + } + std::string name() const override { + return get_name(); + } + + private: + std::string name_; +}; + +class ShowPrivateExtMsgOverlaysQuery : public Query { + public: + ShowPrivateExtMsgOverlaysQuery(td::actor::ActorId console, Tokenizer tokenizer) + : Query(console, std::move(tokenizer)) { + } + td::Status run() override; + td::Status send() override; + td::Status receive(td::BufferSlice data) override; + static std::string get_name() { + return "showprivateextmsgoverlays"; + } + static std::string get_help() { + return "showprivateextmsgoverlays\tshow all private overlay for external messages"; + } + std::string name() const override { + return get_name(); + } +}; + class GetValidatorSessionsInfoQuery : public Query { public: GetValidatorSessionsInfoQuery(td::actor::ActorId console, Tokenizer tokenizer) diff --git a/validator-engine-console/validator-engine-console.cpp b/validator-engine-console/validator-engine-console.cpp index 5d12c937..3dce5f46 100644 --- a/validator-engine-console/validator-engine-console.cpp +++ b/validator-engine-console/validator-engine-console.cpp @@ -143,6 +143,9 @@ void ValidatorEngineConsole::run() { add_query_runner(std::make_unique>()); add_query_runner(std::make_unique>()); add_query_runner(std::make_unique>()); + add_query_runner(std::make_unique>()); + add_query_runner(std::make_unique>()); + add_query_runner(std::make_unique>()); add_query_runner(std::make_unique>()); add_query_runner(std::make_unique>()); add_query_runner(std::make_unique>()); diff --git a/validator-engine/validator-engine.cpp b/validator-engine/validator-engine.cpp index e4fda23e..1138c962 100644 --- a/validator-engine/validator-engine.cpp +++ b/validator-engine/validator-engine.cpp @@ -1944,6 +1944,7 @@ void ValidatorEngine::start_full_node() { td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::add_collator_adnl_id, ton::adnl::AdnlNodeIdShort(c.adnl_id)); } + load_private_ext_msg_overlays_config(); } else { started_full_node(); } @@ -2444,6 +2445,66 @@ void ValidatorEngine::try_del_proxy(td::uint32 ip, td::int32 port, std::vector(); + auto data_R = td::read_file(private_ext_msg_overlays_config_file()); + if (data_R.is_error()) { + return; + } + auto data = data_R.move_as_ok(); + auto json_R = td::json_decode(data.as_slice()); + if (json_R.is_error()) { + LOG(ERROR) << "Failed to parse private ext msg overlays config: " << json_R.move_as_error(); + return; + } + auto json = json_R.move_as_ok(); + auto S = ton::ton_api::from_json(*private_ext_msg_overlays_config_, json.get_object()); + if (S.is_error()) { + LOG(ERROR) << "Failed to parse private ext msg overlays config: " << S; + return; + } + + for (auto &overlay : private_ext_msg_overlays_config_->overlays_) { + std::vector nodes; + std::map senders; + for (const auto &node : overlay->nodes_) { + nodes.emplace_back(node->adnl_id_); + if (node->sender_) { + senders[ton::adnl::AdnlNodeIdShort{node->adnl_id_}] = node->sender_priority_; + } + } + td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::add_ext_msg_overlay, std::move(nodes), + std::move(senders), overlay->name_, [](td::Result R) { R.ensure(); }); + } +} + +td::Status ValidatorEngine::write_private_ext_msg_overlays_config() { + auto s = td::json_encode(td::ToJson(*private_ext_msg_overlays_config_), true); + TRY_STATUS_PREFIX(td::write_file(private_ext_msg_overlays_config_file(), s), "failed to write config: "); + return td::Status::OK(); +} + +void ValidatorEngine::add_private_ext_msg_overlay_to_config( + ton::tl_object_ptr overlay, td::Promise promise) { + private_ext_msg_overlays_config_->overlays_.push_back(std::move(overlay)); + TRY_STATUS_PROMISE(promise, write_private_ext_msg_overlays_config()); + promise.set_result(td::Unit()); +} + +void ValidatorEngine::del_private_ext_msg_overlay_from_config(std::string name, td::Promise promise) { + auto &overlays = private_ext_msg_overlays_config_->overlays_; + for (size_t i = 0; i < overlays.size(); ++i) { + if (overlays[i]->name_ == name) { + overlays.erase(overlays.begin() + i); + TRY_STATUS_PROMISE(promise, write_private_ext_msg_overlays_config()); + promise.set_result(td::Unit()); + return; + } + } + promise.set_error(td::Status::Error(PSTRING() << "no overlay \"" << name << "\" in config")); +} + void ValidatorEngine::check_key(ton::PublicKeyHash id, td::Promise promise) { if (keys_.count(id) == 1) { promise.set_value(td::Unit()); @@ -3571,7 +3632,8 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getShardO }); } -void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_setExtMessagesBroadcastDisabled &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, +void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_setExtMessagesBroadcastDisabled &query, + td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, td::Promise promise) { if (!(perm & ValidatorEnginePermissions::vep_modify)) { promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized"))); @@ -3597,6 +3659,95 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_setExtMes }); } +void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_addPrivateExtMsgOverlay &query, + td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, + td::Promise promise) { + if (!(perm & ValidatorEnginePermissions::vep_modify)) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized"))); + return; + } + if (!started_ || full_node_.empty()) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "not started"))); + return; + } + + auto &overlay = query.overlay_; + std::vector nodes; + std::map senders; + for (const auto &node : overlay->nodes_) { + nodes.emplace_back(node->adnl_id_); + if (node->sender_) { + senders[ton::adnl::AdnlNodeIdShort{node->adnl_id_}] = node->sender_priority_; + } + } + std::string name = overlay->name_; + td::actor::send_closure( + full_node_, &ton::validator::fullnode::FullNode::add_ext_msg_overlay, std::move(nodes), std::move(senders), + std::move(name), + [SelfId = actor_id(this), overlay = std::move(overlay), + promise = std::move(promise)](td::Result R) mutable { + if (R.is_error()) { + promise.set_value(create_control_query_error(R.move_as_error())); + return; + } + td::actor::send_closure( + SelfId, &ValidatorEngine::add_private_ext_msg_overlay_to_config, std::move(overlay), + [promise = std::move(promise)](td::Result R) mutable { + if (R.is_error()) { + promise.set_value(create_control_query_error(R.move_as_error())); + return; + } + promise.set_value(ton::create_serialize_tl_object()); + }); + }); +} + +void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_delPrivateExtMsgOverlay &query, + td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, + td::Promise promise) { + if (!(perm & ValidatorEnginePermissions::vep_modify)) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized"))); + return; + } + if (!started_ || full_node_.empty()) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "not started"))); + return; + } + td::actor::send_closure( + full_node_, &ton::validator::fullnode::FullNode::del_ext_msg_overlay, query.name_, + [SelfId = actor_id(this), name = query.name_, promise = std::move(promise)](td::Result R) mutable { + if (R.is_error()) { + promise.set_value(create_control_query_error(R.move_as_error())); + return; + } + td::actor::send_closure( + SelfId, &ValidatorEngine::del_private_ext_msg_overlay_from_config, std::move(name), + [promise = std::move(promise)](td::Result R) mutable { + if (R.is_error()) { + promise.set_value(create_control_query_error(R.move_as_error())); + return; + } + promise.set_value(ton::create_serialize_tl_object()); + }); + }); +} + +void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_showPrivateExtMsgOverlays &query, + td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, + td::Promise promise) { + if (!(perm & ValidatorEnginePermissions::vep_default)) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized"))); + return; + } + if (!started_ || full_node_.empty()) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "not started"))); + return; + } + + promise.set_value(ton::serialize_tl_object( + private_ext_msg_overlays_config_, true)); +} + void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getValidatorSessionsInfo &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, td::Promise promise) { diff --git a/validator-engine/validator-engine.hpp b/validator-engine/validator-engine.hpp index b6345f64..40d9945a 100644 --- a/validator-engine/validator-engine.hpp +++ b/validator-engine/validator-engine.hpp @@ -181,6 +181,7 @@ class ValidatorEngine : public td::actor::Actor { std::shared_ptr dht_config_; td::Ref validator_options_; Config config_; + ton::tl_object_ptr private_ext_msg_overlays_config_; std::set running_gc_; @@ -389,6 +390,16 @@ class ValidatorEngine : public td::actor::Actor { void try_del_proxy(td::uint32 ip, td::int32 port, std::vector cats, std::vector prio_cats, td::Promise promise); + std::string private_ext_msg_overlays_config_file() const { + return db_root_ + "/private-ext-msg-overlays.json"; + } + + void load_private_ext_msg_overlays_config(); + td::Status write_private_ext_msg_overlays_config(); + void add_private_ext_msg_overlay_to_config( + ton::tl_object_ptr overlay, td::Promise promise); + void del_private_ext_msg_overlay_from_config(std::string name, td::Promise promise); + void check_key(ton::PublicKeyHash id, td::Promise promise); static td::BufferSlice create_control_query_error(td::Status error); @@ -477,6 +488,12 @@ class ValidatorEngine : public td::actor::Actor { ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); void run_control_query(ton::ton_api::engine_validator_setExtMessagesBroadcastDisabled &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); + void run_control_query(ton::ton_api::engine_validator_addPrivateExtMsgOverlay &query, td::BufferSlice data, + ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); + void run_control_query(ton::ton_api::engine_validator_delPrivateExtMsgOverlay &query, td::BufferSlice data, + ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); + void run_control_query(ton::ton_api::engine_validator_showPrivateExtMsgOverlays &query, td::BufferSlice data, + ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); template void run_control_query(T &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, td::Promise promise) { diff --git a/validator-session/validator-session.cpp b/validator-session/validator-session.cpp index add7def5..44320470 100644 --- a/validator-session/validator-session.cpp +++ b/validator-session/validator-session.cpp @@ -903,7 +903,7 @@ ValidatorSessionImpl::ValidatorSessionImpl(catchain::CatChainSessionId session_i , rldp_(rldp) , overlay_manager_(overlays) , allow_unsafe_self_blocks_resync_(allow_unsafe_self_blocks_resync) { - compress_block_candidates_ = opts.proto_version >= 3; + compress_block_candidates_ = opts.proto_version >= 4; description_ = ValidatorSessionDescription::create(std::move(opts), nodes, local_id); src_round_candidate_.resize(description_->get_total_nodes()); } diff --git a/validator/db/archive-slice.cpp b/validator/db/archive-slice.cpp index 43a02ec4..86c2ee93 100644 --- a/validator/db/archive-slice.cpp +++ b/validator/db/archive-slice.cpp @@ -670,7 +670,9 @@ void ArchiveSlice::do_close() { LOG(DEBUG) << "Closing archive slice " << db_path_; status_ = st_closed; kv_ = {}; - statistics_.pack_statistics->record_close(packages_.size()); + if (statistics_.pack_statistics) { + statistics_.pack_statistics->record_close(packages_.size()); + } packages_.clear(); } @@ -774,7 +776,9 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver LOG(FATAL) << "failed to open/create archive '" << path << "': " << R.move_as_error(); return; } - statistics_.pack_statistics->record_open(); + if (statistics_.pack_statistics) { + statistics_.pack_statistics->record_open(); + } auto idx = td::narrow_cast(packages_.size()); if (finalized_) { packages_.emplace_back(nullptr, td::actor::ActorOwn(), seqno, path, idx, version); @@ -817,7 +821,9 @@ void ArchiveSlice::destroy(td::Promise promise) { for (auto &p : packages_) { td::unlink(p.path).ensure(); } - statistics_.pack_statistics->record_close(packages_.size()); + if (statistics_.pack_statistics) { + statistics_.pack_statistics->record_close(packages_.size()); + } packages_.clear(); kv_ = nullptr; @@ -1023,7 +1029,9 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl for (auto idx = pack->idx + 1; idx < packages_.size(); idx++) { td::unlink(packages_[idx].path).ensure(); } - statistics_.pack_statistics->record_close(packages_.size() - pack->idx - 1); + if (statistics_.pack_statistics) { + statistics_.pack_statistics->record_close(packages_.size() - pack->idx - 1); + } packages_.erase(packages_.begin() + pack->idx + 1, packages_.end()); kv_->commit_transaction().ensure(); diff --git a/validator/full-node-private-overlay-v2.cpp b/validator/full-node-private-overlay-v2.cpp index 402c8d09..49657d3c 100644 --- a/validator/full-node-private-overlay-v2.cpp +++ b/validator/full-node-private-overlay-v2.cpp @@ -14,7 +14,6 @@ You should have received a copy of the GNU Lesser General Public License along with TON Blockchain Library. If not, see . */ -#pragma once #include "full-node-private-overlay-v2.hpp" #include "ton/ton-tl.hpp" @@ -40,6 +39,8 @@ void FullNodePrivateOverlayV2::process_block_broadcast(PublicKeyHash src, ton_ap LOG(DEBUG) << "dropped broadcast: " << B.move_as_error(); return; } + VLOG(FULL_NODE_DEBUG) << "Received block broadcast in private overlay from " << src << ": " + << B.ok().block_id.to_str(); auto P = td::PromiseCreator::lambda([](td::Result R) { if (R.is_error()) { if (R.error().code() == ErrorCode::notready) { @@ -49,14 +50,14 @@ void FullNodePrivateOverlayV2::process_block_broadcast(PublicKeyHash src, ton_ap } } }); - LOG(FULL_NODE_DEBUG) << "Got block broadcast in private overlay: " << B.ok().block_id.to_str(); td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, B.move_as_ok(), std::move(P)); } -void FullNodePrivateOverlayV2::process_broadcast(PublicKeyHash, ton_api::tonNode_newShardBlockBroadcast &query) { +void FullNodePrivateOverlayV2::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) { BlockIdExt block_id = create_block_id(query.block_->block_); - LOG(FULL_NODE_DEBUG) << "Got block description broadcast in private overlay: " << block_id.to_str(); + VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast in private overlay from " << src << ": " + << block_id.to_str(); td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, block_id, query.block_->cc_seqno_, std::move(query.block_->data_)); } @@ -75,6 +76,7 @@ void FullNodePrivateOverlayV2::send_shard_block_info(BlockIdExt block_id, Catcha if (!inited_) { return; } + VLOG(FULL_NODE_DEBUG) << "Sending newShardBlockBroadcast in private overlay: " << block_id.to_str(); auto B = create_serialize_tl_object( create_tl_object(create_tl_block_id(block_id), cc_seqno, std::move(data))); if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) { @@ -90,6 +92,8 @@ void FullNodePrivateOverlayV2::send_broadcast(BlockBroadcast broadcast) { if (!inited_) { return; } + VLOG(FULL_NODE_DEBUG) << "Sending block broadcast in private overlay (with compression): " + << broadcast.block_id.to_str(); auto B = serialize_block_broadcast(broadcast, true); // compression_enabled = true if (B.is_error()) { VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error(); @@ -164,8 +168,10 @@ void FullNodePrivateOverlayV2::init() { authorized_keys[sender.pubkey_hash()] = overlay::Overlays::max_fec_broadcast_size(); } overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(), 0, std::move(authorized_keys)}; + std::string scope = PSTRING() << R"({ "type": "private-blocks-v2", "shard_id": )" << shard_.shard + << ", \"workchain_id\": " << shard_.workchain << " }"; td::actor::send_closure(overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(), - nodes_, std::make_unique(actor_id(this)), rules); + nodes_, std::make_unique(actor_id(this)), rules, std::move(scope)); td::actor::send_closure(rldp_, &rldp::Rldp::add_id, local_id_); td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, local_id_); diff --git a/validator/full-node-private-overlay.cpp b/validator/full-node-private-overlay.cpp index 559dba17..9fd02ab4 100644 --- a/validator/full-node-private-overlay.cpp +++ b/validator/full-node-private-overlay.cpp @@ -19,26 +19,25 @@ #include "common/delay.h" #include "full-node-serializer.hpp" -namespace ton { +namespace ton::validator::fullnode { -namespace validator { - -namespace fullnode { - -void FullNodePrivateOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) { +void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) { process_block_broadcast(src, query); } -void FullNodePrivateOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query) { +void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, + ton_api::tonNode_blockBroadcastCompressed &query) { process_block_broadcast(src, query); } -void FullNodePrivateOverlay::process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) { +void FullNodePrivateBlockOverlay::process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) { auto B = deserialize_block_broadcast(query, overlay::Overlays::max_fec_broadcast_size()); if (B.is_error()) { LOG(DEBUG) << "dropped broadcast: " << B.move_as_error(); return; } + VLOG(FULL_NODE_DEBUG) << "Received block broadcast in private overlay from " << src << ": " + << B.ok().block_id.to_str(); auto P = td::PromiseCreator::lambda([](td::Result R) { if (R.is_error()) { if (R.error().code() == ErrorCode::notready) { @@ -52,25 +51,28 @@ void FullNodePrivateOverlay::process_block_broadcast(PublicKeyHash src, ton_api: std::move(P)); } -void FullNodePrivateOverlay::process_broadcast(PublicKeyHash, ton_api::tonNode_newShardBlockBroadcast &query) { - td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, - create_block_id(query.block_->block_), query.block_->cc_seqno_, - std::move(query.block_->data_)); +void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) { + BlockIdExt block_id = create_block_id(query.block_->block_); + VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast in private overlay from " << src << ": " + << block_id.to_str(); + td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, block_id, + query.block_->cc_seqno_, std::move(query.block_->data_)); } -void FullNodePrivateOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) { +void FullNodePrivateBlockOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) { auto B = fetch_tl_object(std::move(broadcast), true); if (B.is_error()) { return; } - ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); }); } -void FullNodePrivateOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) { +void FullNodePrivateBlockOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, + td::BufferSlice data) { if (!inited_) { return; } + VLOG(FULL_NODE_DEBUG) << "Sending newShardBlockBroadcast in private overlay: " << block_id.to_str(); auto B = create_serialize_tl_object( create_tl_object(create_tl_block_id(block_id), cc_seqno, std::move(data))); if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) { @@ -82,11 +84,13 @@ void FullNodePrivateOverlay::send_shard_block_info(BlockIdExt block_id, Catchain } } -void FullNodePrivateOverlay::send_broadcast(BlockBroadcast broadcast) { +void FullNodePrivateBlockOverlay::send_broadcast(BlockBroadcast broadcast) { if (!inited_) { return; } - auto B = serialize_block_broadcast(broadcast, false); // compression_enabled = false + VLOG(FULL_NODE_DEBUG) << "Sending block broadcast in private overlay" + << (enable_compression_ ? " (with compression)" : "") << ": " << broadcast.block_id.to_str(); + auto B = serialize_block_broadcast(broadcast, enable_compression_); if (B.is_error()) { VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error(); return; @@ -95,7 +99,7 @@ void FullNodePrivateOverlay::send_broadcast(BlockBroadcast broadcast) { local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok()); } -void FullNodePrivateOverlay::start_up() { +void FullNodePrivateBlockOverlay::start_up() { std::sort(nodes_.begin(), nodes_.end()); nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end()); @@ -112,22 +116,22 @@ void FullNodePrivateOverlay::start_up() { try_init(); } -void FullNodePrivateOverlay::try_init() { +void FullNodePrivateBlockOverlay::try_init() { // Sometimes adnl id is added to validator engine later (or not at all) td::actor::send_closure( adnl_, &adnl::Adnl::check_id_exists, local_id_, [SelfId = actor_id(this)](td::Result R) { if (R.is_ok() && R.ok()) { - td::actor::send_closure(SelfId, &FullNodePrivateOverlay::init); + td::actor::send_closure(SelfId, &FullNodePrivateBlockOverlay::init); } else { - delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateOverlay::try_init); }, + delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateBlockOverlay::try_init); }, td::Timestamp::in(30.0)); } }); } -void FullNodePrivateOverlay::init() { - LOG(FULL_NODE_INFO) << "Creating private block overlay for adnl id " << local_id_ << " : " << nodes_.size() - << " nodes"; +void FullNodePrivateBlockOverlay::init() { + LOG(FULL_NODE_WARNING) << "Creating private block overlay for adnl id " << local_id_ << " : " << nodes_.size() + << " nodes, overlay_id=" << overlay_id_; class Callback : public overlay::Overlays::Callback { public: void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override { @@ -136,37 +140,140 @@ void FullNodePrivateOverlay::init() { td::Promise promise) override { } void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override { - td::actor::send_closure(node_, &FullNodePrivateOverlay::receive_broadcast, src, std::move(data)); + td::actor::send_closure(node_, &FullNodePrivateBlockOverlay::receive_broadcast, src, std::move(data)); } void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data, td::Promise promise) override { } - Callback(td::actor::ActorId node) : node_(node) { + Callback(td::actor::ActorId node) : node_(node) { } private: - td::actor::ActorId node_; + td::actor::ActorId node_; }; overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(), overlay::CertificateFlags::AllowFec | overlay::CertificateFlags::Trusted, {}}; td::actor::send_closure(overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(), - nodes_, std::make_unique(actor_id(this)), rules); + nodes_, std::make_unique(actor_id(this)), rules, R"({ "type": "private-blocks" })"); td::actor::send_closure(rldp_, &rldp::Rldp::add_id, local_id_); td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, local_id_); inited_ = true; } -void FullNodePrivateOverlay::tear_down() { +void FullNodePrivateBlockOverlay::tear_down() { if (inited_) { td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, local_id_, overlay_id_); } } -} // namespace fullnode +void FullNodePrivateExtMsgOverlay::process_broadcast(PublicKeyHash src, + ton_api::tonNode_externalMessageBroadcast &query) { + auto it = senders_.find(adnl::AdnlNodeIdShort{src}); + if (it == senders_.end()) { + return; + } + LOG(FULL_NODE_DEBUG) << "Got external message in private overlay \"" << name_ << "\" from " << src + << " (priority=" << it->second << ")"; + td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_external_message, + std::move(query.message_->data_), it->second); +} -} // namespace validator +void FullNodePrivateExtMsgOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) { + auto B = fetch_tl_object(std::move(broadcast), true); + if (B.is_error()) { + return; + } + ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); }); +} -} // namespace ton +void FullNodePrivateExtMsgOverlay::send_external_message(td::BufferSlice data) { + if (config_.ext_messages_broadcast_disabled_) { + return; + } + LOG(FULL_NODE_DEBUG) << "Sending external message to private overlay \"" << name_ << "\""; + auto B = create_serialize_tl_object( + create_tl_object(std::move(data))); + if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) { + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_, + local_id_.pubkey_hash(), 0, std::move(B)); + } else { + td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_, + local_id_.pubkey_hash(), 0, std::move(B)); + } +} + +void FullNodePrivateExtMsgOverlay::start_up() { + std::sort(nodes_.begin(), nodes_.end()); + nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end()); + std::vector nodes; + for (const adnl::AdnlNodeIdShort &id : nodes_) { + nodes.push_back(id.bits256_value()); + } + auto X = + create_hash_tl_object(zero_state_file_hash_, name_, std::move(nodes)); + td::BufferSlice b{32}; + b.as_slice().copy_from(as_slice(X)); + overlay_id_full_ = overlay::OverlayIdFull{std::move(b)}; + overlay_id_ = overlay_id_full_.compute_short_id(); + try_init(); +} + +void FullNodePrivateExtMsgOverlay::try_init() { + // Sometimes adnl id is added to validator engine later (or not at all) + td::actor::send_closure( + adnl_, &adnl::Adnl::check_id_exists, local_id_, [SelfId = actor_id(this)](td::Result R) { + if (R.is_ok() && R.ok()) { + td::actor::send_closure(SelfId, &FullNodePrivateExtMsgOverlay::init); + } else { + delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateExtMsgOverlay::try_init); }, + td::Timestamp::in(30.0)); + } + }); +} + +void FullNodePrivateExtMsgOverlay::init() { + LOG(FULL_NODE_WARNING) << "Creating private ext msg overlay \"" << name_ << "\" for adnl id " << local_id_ << " : " + << nodes_.size() << " nodes, overlay_id=" << overlay_id_; + class Callback : public overlay::Overlays::Callback { + public: + void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override { + } + void receive_query(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data, + td::Promise promise) override { + } + void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override { + td::actor::send_closure(node_, &FullNodePrivateExtMsgOverlay::receive_broadcast, src, std::move(data)); + } + void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data, + td::Promise promise) override { + } + Callback(td::actor::ActorId node) : node_(node) { + } + + private: + td::actor::ActorId node_; + }; + + std::map authorized_keys; + for (const auto &sender : senders_) { + authorized_keys[sender.first.pubkey_hash()] = overlay::Overlays::max_fec_broadcast_size(); + } + overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(), 0, std::move(authorized_keys)}; + td::actor::send_closure( + overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(), nodes_, + std::make_unique(actor_id(this)), rules, + PSTRING() << R"({ "type": "private-ext-msg", "name": ")" << td::format::Escaped{name_} << R"(" })"); + + td::actor::send_closure(rldp_, &rldp::Rldp::add_id, local_id_); + td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, local_id_); +} + +void FullNodePrivateExtMsgOverlay::tear_down() { + LOG(FULL_NODE_WARNING) << "Destroying private ext msg overlay \"" << name_ << "\" for adnl id " << local_id_; + td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, local_id_, overlay_id_); +} + +} // namespace ton::validator::fullnode diff --git a/validator/full-node-private-overlay.hpp b/validator/full-node-private-overlay.hpp index e6497a87..5fbb8825 100644 --- a/validator/full-node-private-overlay.hpp +++ b/validator/full-node-private-overlay.hpp @@ -18,13 +18,9 @@ #include "full-node.h" -namespace ton { +namespace ton::validator::fullnode { -namespace validator { - -namespace fullnode { - -class FullNodePrivateOverlay : public td::actor::Actor { +class FullNodePrivateBlockOverlay : public td::actor::Actor { public: void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query); void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query); @@ -40,19 +36,89 @@ class FullNodePrivateOverlay : public td::actor::Actor { void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data); void send_broadcast(BlockBroadcast broadcast); + void set_config(FullNodeConfig config) { + config_ = std::move(config); + } + + void set_enable_compression(bool value) { + enable_compression_ = value; + } + void start_up() override; void tear_down() override; - FullNodePrivateOverlay(adnl::AdnlNodeIdShort local_id, std::vector nodes, - FileHash zero_state_file_hash, FullNodeConfig config, - td::actor::ActorId keyring, td::actor::ActorId adnl, - td::actor::ActorId rldp, td::actor::ActorId rldp2, - td::actor::ActorId overlays, - td::actor::ActorId validator_manager) + FullNodePrivateBlockOverlay(adnl::AdnlNodeIdShort local_id, std::vector nodes, + FileHash zero_state_file_hash, FullNodeConfig config, bool enable_compression, + td::actor::ActorId keyring, td::actor::ActorId adnl, + td::actor::ActorId rldp, td::actor::ActorId rldp2, + td::actor::ActorId overlays, + td::actor::ActorId validator_manager) : local_id_(local_id) , nodes_(std::move(nodes)) , zero_state_file_hash_(zero_state_file_hash) , config_(config) + , enable_compression_(enable_compression) + , keyring_(keyring) + , adnl_(adnl) + , rldp_(rldp) + , rldp2_(rldp2) + , overlays_(overlays) + , validator_manager_(validator_manager) { + } + + private: + adnl::AdnlNodeIdShort local_id_; + std::vector nodes_; + FileHash zero_state_file_hash_; + FullNodeConfig config_; + bool enable_compression_; + + td::actor::ActorId keyring_; + td::actor::ActorId adnl_; + td::actor::ActorId rldp_; + td::actor::ActorId rldp2_; + td::actor::ActorId overlays_; + td::actor::ActorId validator_manager_; + + bool inited_ = false; + overlay::OverlayIdFull overlay_id_full_; + overlay::OverlayIdShort overlay_id_; + + void try_init(); + void init(); +}; + +class FullNodePrivateExtMsgOverlay : public td::actor::Actor { + public: + void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query); + template + void process_broadcast(PublicKeyHash, T &) { + VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast"; + } + void receive_broadcast(PublicKeyHash src, td::BufferSlice query); + + void send_external_message(td::BufferSlice data); + + void set_config(FullNodeConfig config) { + config_ = std::move(config); + } + + void start_up() override; + void tear_down() override; + + FullNodePrivateExtMsgOverlay(adnl::AdnlNodeIdShort local_id, std::vector nodes, + std::map senders, std::string name, + FileHash zero_state_file_hash, FullNodeConfig config, + td::actor::ActorId keyring, td::actor::ActorId adnl, + td::actor::ActorId rldp, td::actor::ActorId rldp2, + td::actor::ActorId overlays, + td::actor::ActorId validator_manager) + : local_id_(local_id) + , nodes_(std::move(nodes)) + , senders_(std::move(senders)) + , name_(std::move(name)) + , zero_state_file_hash_(zero_state_file_hash) + , config_(config) , keyring_(keyring) , adnl_(adnl) , rldp_(rldp) @@ -64,6 +130,8 @@ class FullNodePrivateOverlay : public td::actor::Actor { private: adnl::AdnlNodeIdShort local_id_; std::vector nodes_; + std::map senders_; + std::string name_; FileHash zero_state_file_hash_; FullNodeConfig config_; @@ -82,8 +150,4 @@ class FullNodePrivateOverlay : public td::actor::Actor { void init(); }; -} // namespace fullnode - -} // namespace validator - -} // namespace ton +} // namespace ton::validator::fullnode diff --git a/validator/full-node-shard.cpp b/validator/full-node-shard.cpp index a7703c57..13e05cbc 100644 --- a/validator/full-node-shard.cpp +++ b/validator/full-node-shard.cpp @@ -134,7 +134,7 @@ void FullNodeShardImpl::check_broadcast(PublicKeyHash src, td::BufferSlice broad promise.set_error(td::Status::Error("rebroadcasting external messages is disabled")); promise = [manager = validator_manager_, message = q->message_->data_.clone()](td::Result R) mutable { if (R.is_ok()) { - td::actor::send_closure(manager, &ValidatorManagerInterface::new_external_message, std::move(message)); + td::actor::send_closure(manager, &ValidatorManagerInterface::new_external_message, std::move(message), 0); } }; } @@ -729,14 +729,14 @@ void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_ih void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query) { td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_external_message, - std::move(query.message_->data_)); + std::move(query.message_->data_), 0); } void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) { - auto block_id = create_block_id(query.block_->block_); - td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, - block_id, query.block_->cc_seqno_, - std::move(query.block_->data_)); + BlockIdExt block_id = create_block_id(query.block_->block_); + VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast from " << src << ": " << block_id.to_str(); + td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, block_id, + query.block_->cc_seqno_, std::move(query.block_->data_)); } void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) { @@ -758,6 +758,7 @@ void FullNodeShardImpl::process_block_broadcast(PublicKeyHash src, ton_api::tonN // << " block=" << block_id.to_str(); // return; //} + VLOG(FULL_NODE_DEBUG) << "Received block broadcast from " << src << ": " << B.ok().block_id.to_str(); auto P = td::PromiseCreator::lambda([](td::Result R) { if (R.is_error()) { if (R.error().code() == ErrorCode::notready) { @@ -836,6 +837,7 @@ void FullNodeShardImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno UNREACHABLE(); return; } + VLOG(FULL_NODE_DEBUG) << "Sending newShardBlockBroadcast: " << block_id.to_str(); auto B = create_serialize_tl_object( create_tl_object(create_tl_block_id(block_id), cc_seqno, std::move(data))); if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) { @@ -852,6 +854,7 @@ void FullNodeShardImpl::send_broadcast(BlockBroadcast broadcast) { UNREACHABLE(); return; } + VLOG(FULL_NODE_DEBUG) << "Sending block broadcast in private overlay: " << broadcast.block_id.to_str(); auto B = serialize_block_broadcast(broadcast, false); // compression_enabled = false if (B.is_error()) { VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error(); diff --git a/validator/full-node.cpp b/validator/full-node.cpp index 57323c4b..6c1990ef 100644 --- a/validator/full-node.cpp +++ b/validator/full-node.cpp @@ -37,6 +37,10 @@ void FullNodeImpl::add_permanent_key(PublicKeyHash key, td::Promise pr } local_keys_.insert(key); + // create_private_block_overlay(key); + for (auto &p : private_ext_msg_overlays_) { + update_ext_msg_overlay(p.first, p.second); + } if (!sign_cert_by_.is_zero()) { promise.set_value(td::Unit()); @@ -54,7 +58,6 @@ void FullNodeImpl::add_permanent_key(PublicKeyHash key, td::Promise pr td::actor::send_closure(shard.second.actor, &FullNodeShard::update_validators, all_validators_, sign_cert_by_); } } - // create_private_block_overlay(key); promise.set_value(td::Unit()); } @@ -64,6 +67,11 @@ void FullNodeImpl::del_permanent_key(PublicKeyHash key, td::Promise pr return; } local_keys_.erase(key); + // private_block_overlays_.erase(key); + for (auto &p : private_ext_msg_overlays_) { + update_ext_msg_overlay(p.first, p.second); + } + if (sign_cert_by_ != key) { promise.set_value(td::Unit()); return; @@ -81,7 +89,6 @@ void FullNodeImpl::del_permanent_key(PublicKeyHash key, td::Promise pr td::actor::send_closure(shard.second.actor, &FullNodeShard::update_validators, all_validators_, sign_cert_by_); } } - // private_block_overlays_.erase(key); promise.set_value(td::Unit()); } @@ -131,6 +138,10 @@ void FullNodeImpl::update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise nodes, + std::map senders, std::string name, + td::Promise promise) { + if (nodes.empty()) { + promise.set_error(td::Status::Error("list of nodes is empty")); + return; + } + if (private_ext_msg_overlays_.count(name)) { + promise.set_error(td::Status::Error(PSTRING() << "duplicate overlay name \"" << name << "\"")); + return; + } + VLOG(FULL_NODE_WARNING) << "Adding private overlay for external messages \"" << name << "\", " << nodes.size() + << " nodes"; + auto &p = private_ext_msg_overlays_[name]; + p.nodes_ = nodes; + p.senders_ = senders; + update_ext_msg_overlay(name, p); + promise.set_result(td::Unit()); +} + +void FullNodeImpl::del_ext_msg_overlay(std::string name, td::Promise promise) { + auto it = private_ext_msg_overlays_.find(name); + if (it == private_ext_msg_overlays_.end()) { + promise.set_error(td::Status::Error(PSTRING() << "no such overlay \"" << name << "\"")); + return; + } + private_ext_msg_overlays_.erase(it); + promise.set_result(td::Unit()); } void FullNodeImpl::initial_read_complete(BlockHandle top_handle) { @@ -277,6 +326,14 @@ void FullNodeImpl::send_ext_message(AccountIdPrefixFull dst, td::BufferSlice dat VLOG(FULL_NODE_WARNING) << "dropping OUT ext message to unknown shard"; return; } + for (auto &private_overlay : private_ext_msg_overlays_) { + for (auto &actor : private_overlay.second.actors_) { + auto local_id = actor.first; + if (private_overlay.second.senders_.count(local_id)) { + td::actor::send_closure(actor.second, &FullNodePrivateExtMsgOverlay::send_external_message, data.clone()); + } + } + } td::actor::send_closure(shard, &FullNodeShard::send_external_message, std::move(data)); } @@ -286,6 +343,10 @@ void FullNodeImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_s VLOG(FULL_NODE_WARNING) << "dropping OUT shard block info message to unknown shard"; return; } + /*if (!private_block_overlays_.empty()) { + td::actor::send_closure(private_block_overlays_.begin()->second, + &FullNodePrivateBlockOverlay::send_shard_block_info, block_id, cc_seqno, data.clone()); + }*/ auto private_overlay = private_block_overlays_.choose_overlay(ShardIdFull(masterchainId)); if (!private_overlay.empty()) { td::actor::send_closure(private_overlay, &FullNodePrivateOverlayV2::send_shard_block_info, block_id, cc_seqno, @@ -300,6 +361,10 @@ void FullNodeImpl::send_broadcast(BlockBroadcast broadcast) { VLOG(FULL_NODE_WARNING) << "dropping OUT broadcast to unknown shard"; return; } + /*if (!private_block_overlays_.empty()) { + td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_broadcast, + broadcast.clone()); + }*/ auto private_overlay = private_block_overlays_.choose_overlay(broadcast.block_id.shard_full()); if (!private_overlay.empty()) { td::actor::send_closure(private_overlay, &FullNodePrivateOverlayV2::send_broadcast, broadcast.clone()); @@ -438,52 +503,7 @@ td::actor::ActorId FullNodeImpl::get_shard(AccountIdPrefixFull ds return get_shard(shard_prefix(dst, max_shard_pfx_len)); } -void FullNodeImpl::got_key_block_proof(td::Ref proof) { - auto R = proof->get_key_block_config(); - R.ensure(); - auto config = R.move_as_ok(); - - PublicKeyHash l = PublicKeyHash::zero(); - std::vector keys; - std::map current_validators; - for (td::int32 i = -1; i <= 1; i++) { - auto r = config->get_total_validator_set(i < 0 ? i : 1 - i); - if (r.not_null()) { - auto vec = r->export_vector(); - for (auto &el : vec) { - auto key = ValidatorFullId{el.key}.compute_short_id(); - keys.push_back(key); - if (local_keys_.count(key)) { - l = key; - } - if (i == 1) { - current_validators[key] = adnl::AdnlNodeIdShort{el.addr.is_zero() ? key.bits256_value() : el.addr}; - } - } - } - } - - if (current_validators != current_validators_) { - current_validators_ = std::move(current_validators); - // update_private_block_overlays(); - } - - if (keys == all_validators_) { - return; - } - - all_validators_ = keys; - sign_cert_by_ = l; - CHECK(all_validators_.size() > 0); - - for (auto &shard : shards_) { - if (!shard.second.actor.empty()) { - td::actor::send_closure(shard.second.actor, &FullNodeShard::update_validators, all_validators_, sign_cert_by_); - } - } -} - -void FullNodeImpl::got_zero_block_state(td::Ref state) { +void FullNodeImpl::got_key_block_state(td::Ref state) { auto m = td::Ref{std::move(state)}; PublicKeyHash l = PublicKeyHash::zero(); @@ -506,9 +526,11 @@ void FullNodeImpl::got_zero_block_state(td::Ref state) { } } + // set_private_block_overlays_enable_compression(m->get_consensus_config().proto_version >= 3); + if (current_validators != current_validators_) { current_validators_ = std::move(current_validators); - // update_private_block_overlays(); + update_private_overlays(); } if (keys == all_validators_) { @@ -527,28 +549,15 @@ void FullNodeImpl::got_zero_block_state(td::Ref state) { } void FullNodeImpl::new_key_block(BlockHandle handle) { - if (handle->id().seqno() == 0) { - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { - if (R.is_error()) { - VLOG(FULL_NODE_WARNING) << "failed to get zero state: " << R.move_as_error(); - } else { - td::actor::send_closure(SelfId, &FullNodeImpl::got_zero_block_state, R.move_as_ok()); - } - }); - td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_shard_state_from_db, handle, - std::move(P)); - } else { - CHECK(handle->is_key_block()); - auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { - if (R.is_error()) { - VLOG(FULL_NODE_WARNING) << "failed to get key block proof: " << R.move_as_error(); - } else { - td::actor::send_closure(SelfId, &FullNodeImpl::got_key_block_proof, R.move_as_ok()); - } - }); - td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_block_proof_link_from_db, handle, - std::move(P)); - } + auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result> R) { + if (R.is_error()) { + VLOG(FULL_NODE_WARNING) << "failed to get key block state: " << R.move_as_error(); + } else { + td::actor::send_closure(SelfId, &FullNodeImpl::got_key_block_state, R.move_as_ok()); + } + }); + td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_shard_state_from_db, handle, + std::move(P)); } void FullNodeImpl::start_up() { @@ -638,13 +647,26 @@ void FullNodeImpl::start_up() { std::make_unique(actor_id(this)), std::move(started_promise_)); } -/* void FullNodeImpl::update_private_block_overlays() { - private_block_overlays_.clear(); +void FullNodeImpl::update_private_overlays() { + for (auto &p : private_ext_msg_overlays_) { + update_ext_msg_overlay(p.first, p.second); + } + /*private_block_overlays_.clear(); if (local_keys_.empty()) { return; } for (const auto &key : local_keys_) { create_private_block_overlay(key); + }*/ +} + +/*void FullNodeImpl::set_private_block_overlays_enable_compression(bool value) { + if (private_block_overlays_enable_compression_ == value) { + return; + } + private_block_overlays_enable_compression_ = true; + for (auto &p : private_block_overlays_) { + td::actor::send_closure(p.second, &FullNodePrivateBlockOverlay::set_enable_compression, value); } } @@ -659,7 +681,32 @@ void FullNodeImpl::create_private_block_overlay(PublicKeyHash key) { "BlocksPrivateOverlay", current_validators_[key], std::move(nodes), zero_state_file_hash_, config_, keyring_, adnl_, rldp_, rldp2_, overlays_, validator_manager_); } -} */ +}*/ + +void FullNodeImpl::update_ext_msg_overlay(const std::string &name, ExtMsgOverlayInfo &overlay) { + auto old_actors = std::move(overlay.actors_); + overlay.actors_.clear(); + auto try_local_id = [&](const adnl::AdnlNodeIdShort &local_id) { + if (std::find(overlay.nodes_.begin(), overlay.nodes_.end(), local_id) != overlay.nodes_.end()) { + auto it = old_actors.find(local_id); + if (it != old_actors.end()) { + overlay.actors_[local_id] = std::move(it->second); + old_actors.erase(it); + } else { + overlay.actors_[local_id] = td::actor::create_actor( + "ExtMsgPrivateOverlay", local_id, overlay.nodes_, overlay.senders_, name, zero_state_file_hash_, config_, + keyring_, adnl_, rldp_, rldp2_, overlays_, validator_manager_); + } + } + }; + try_local_id(adnl_id_); + for (const PublicKeyHash &local_key: local_keys_) { + auto it = current_validators_.find(local_key); + if (it != current_validators_.end()) { + try_local_id(it->second); + } + } +} FullNodeImpl::FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash, FullNodeConfig config, td::actor::ActorId keyring, diff --git a/validator/full-node.h b/validator/full-node.h index 65e6fae0..fa130783 100644 --- a/validator/full-node.h +++ b/validator/full-node.h @@ -75,6 +75,11 @@ class FullNode : public td::actor::Actor { virtual void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise promise) = 0; virtual void set_config(FullNodeConfig config) = 0; + virtual void add_ext_msg_overlay(std::vector nodes, + std::map senders, std::string name, + td::Promise promise) = 0; + virtual void del_ext_msg_overlay(std::string name, td::Promise promise) = 0; + static constexpr td::uint32 max_block_size() { return 4 << 20; } diff --git a/validator/full-node.hpp b/validator/full-node.hpp index ff998f32..881230c4 100644 --- a/validator/full-node.hpp +++ b/validator/full-node.hpp @@ -56,6 +56,10 @@ class FullNodeImpl : public FullNode { void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise promise) override; void set_config(FullNodeConfig config) override; + void add_ext_msg_overlay(std::vector nodes, std::map senders, + std::string name, td::Promise promise) override; + void del_ext_msg_overlay(std::string name, td::Promise promise) override; + void update_shard_configuration(td::Ref state, std::set shards_to_monitor, std::set temporary_shards); @@ -82,8 +86,7 @@ class FullNodeImpl : public FullNode { block::ImportedMsgQueueLimits limits, td::Timestamp timeout, td::Promise>> promise); - void got_key_block_proof(td::Ref proof); - void got_zero_block_state(td::Ref state); + void got_key_block_state(td::Ref state); void new_key_block(BlockHandle handle); void start_up() override; @@ -138,11 +141,21 @@ class FullNodeImpl : public FullNode { // TODO: Decide what to do with old private overlays. Maybe use old or new depending on some flag in config. /* std::map> private_block_overlays_; - - void update_private_block_overlays(); + bool private_block_overlays_enable_compression_ = false; + void set_private_block_overlays_enable_compression(bool value); void create_private_block_overlay(PublicKeyHash key); */ + struct ExtMsgOverlayInfo { + std::vector nodes_; + std::map senders_; + std::map> + actors_; // our local id -> actor + }; + std::map private_ext_msg_overlays_; + void update_private_overlays(); + void update_ext_msg_overlay(const std::string& name, ExtMsgOverlayInfo& overlay); + FullNodePrivateBlockOverlays private_block_overlays_; }; diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h index 1257341b..1775d657 100644 --- a/validator/impl/collator-impl.h +++ b/validator/impl/collator-impl.h @@ -186,7 +186,12 @@ class Collator final : public td::actor::Actor { block::ValueFlow value_flow_{block::ValueFlow::SetZero()}; std::unique_ptr fees_import_dict_; std::map ext_msg_map; - std::vector, ExtMessage::Hash>> ext_msg_list_; + struct ExtMsg { + Ref cell; + ExtMessage::Hash hash; + int priority; + }; + std::vector ext_msg_list_; std::priority_queue, std::greater> new_msgs; std::pair last_proc_int_msg_, first_unproc_int_msg_; std::unique_ptr in_msg_dict, out_msg_dict, old_out_msg_queue_, out_msg_queue_, @@ -278,8 +283,9 @@ class Collator final : public td::actor::Actor { bool is_our_address(Ref addr_ref) const; bool is_our_address(ton::AccountIdPrefixFull addr_prefix) const; bool is_our_address(const ton::StdSmcAddress& addr) const; - void after_get_external_messages(td::Result>> res); - td::Result register_external_message_cell(Ref ext_msg, const ExtMessage::Hash& ext_hash); + void after_get_external_messages(td::Result, int>>> res); + td::Result register_external_message_cell(Ref ext_msg, const ExtMessage::Hash& ext_hash, + int priority); // td::Result register_external_message(td::Slice ext_msg_boc); void register_new_msg(block::NewOutMsg msg); void register_new_msgs(block::transaction::Transaction& trans); diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index c4dd57aa..bfca165b 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -49,6 +49,7 @@ static const td::uint32 FORCE_SPLIT_QUEUE_SIZE = 4096; static const td::uint32 SPLIT_MAX_QUEUE_SIZE = 100000; static const td::uint32 MERGE_MAX_QUEUE_SIZE = 2047; static const td::uint32 SKIP_EXTERNALS_QUEUE_SIZE = 8000; +static const int HIGH_PRIORITY_EXTERNAL = 10; // don't skip high priority externals when queue is big #define DBG(__n) dbg(__n)&& #define DSTART int __dcnt = 0; @@ -255,11 +256,10 @@ void Collator::start_up() { LOG(DEBUG) << "sending get_external_messages() query to Manager"; ++pending; td::actor::send_closure_later(manager, &ValidatorManager::get_external_messages, shard_, - [self = get_self()](td::Result>> res) -> void { - LOG(DEBUG) << "got answer to get_external_messages() query"; - td::actor::send_closure_later( - std::move(self), &Collator::after_get_external_messages, std::move(res)); - }); + [self = get_self()](td::Result, int>>> res) -> void { + LOG(DEBUG) << "got answer to get_external_messages() query"; + td::actor::send_closure_later(std::move(self), &Collator::after_get_external_messages, std::move(res)); + }); } if (is_masterchain() && !is_hardfork_) { // 5. load shard block info messages @@ -3548,12 +3548,15 @@ bool Collator::process_inbound_external_messages() { return true; } if (out_msg_queue_size_ > SKIP_EXTERNALS_QUEUE_SIZE) { - LOG(INFO) << "skipping processing of inbound external messages because out_msg_queue is too big (" + LOG(INFO) << "skipping processing of inbound external messages (except for high-priority) because out_msg_queue is " + "too big (" << out_msg_queue_size_ << " > " << SKIP_EXTERNALS_QUEUE_SIZE << ")"; - return true; } bool full = !block_limit_status_->fits(block::ParamLimits::cl_soft); - for (auto& ext_msg_pair : ext_msg_list_) { + for (auto& ext_msg_struct : ext_msg_list_) { + if (out_msg_queue_size_ > SKIP_EXTERNALS_QUEUE_SIZE && ext_msg_struct.priority < HIGH_PRIORITY_EXTERNAL) { + continue; + } if (full) { LOG(INFO) << "BLOCK FULL, stop processing external messages"; break; @@ -3562,15 +3565,15 @@ bool Collator::process_inbound_external_messages() { LOG(WARNING) << "medium timeout reached, stop processing inbound external messages"; break; } - auto ext_msg = ext_msg_pair.first; + auto ext_msg = ext_msg_struct.cell; ton::Bits256 hash{ext_msg->get_hash().bits()}; int r = process_external_message(std::move(ext_msg)); if (r < 0) { - bad_ext_msgs_.emplace_back(ext_msg_pair.second); + bad_ext_msgs_.emplace_back(ext_msg_struct.hash); return false; } if (!r) { - delay_ext_msgs_.emplace_back(ext_msg_pair.second); + delay_ext_msgs_.emplace_back(ext_msg_struct.hash); } if (r > 0) { full = !block_limit_status_->fits(block::ParamLimits::cl_soft); @@ -5274,7 +5277,8 @@ void Collator::return_block_candidate(td::Result saved) { * - If the external message has been previuosly registered and accepted, returns false. * - Otherwise returns true. */ -td::Result Collator::register_external_message_cell(Ref ext_msg, const ExtMessage::Hash& ext_hash) { +td::Result Collator::register_external_message_cell(Ref ext_msg, const ExtMessage::Hash& ext_hash, + int priority) { if (ext_msg->get_level() != 0) { return td::Status::Error("external message must have zero level"); } @@ -5318,7 +5322,7 @@ td::Result Collator::register_external_message_cell(Ref ext_msg, block::gen::t_Message_Any.print_ref(std::cerr, ext_msg); } ext_msg_map.emplace(hash, 1); - ext_msg_list_.emplace_back(std::move(ext_msg), ext_hash); + ext_msg_list_.push_back({std::move(ext_msg), ext_hash, priority}); return true; } @@ -5327,18 +5331,21 @@ td::Result Collator::register_external_message_cell(Ref ext_msg, * * @param res The result of the external message retrieval operation. */ -void Collator::after_get_external_messages(td::Result>> res) { +void Collator::after_get_external_messages(td::Result, int>>> res) { + // res: pair {ext msg, priority} --pending; if (res.is_error()) { fatal_error(res.move_as_error()); return; } auto vect = res.move_as_ok(); - for (auto&& ext_msg : vect) { + for (auto& p : vect) { + auto& ext_msg = p.first; + int priority = p.second; Ref ext_msg_cell = ext_msg->root_cell(); bool err = ext_msg_cell.is_null(); if (!err) { - auto reg_res = register_external_message_cell(std::move(ext_msg_cell), ext_msg->hash()); + auto reg_res = register_external_message_cell(std::move(ext_msg_cell), ext_msg->hash(), priority); if (reg_res.is_error() || !reg_res.move_as_ok()) { err = true; } diff --git a/validator/interfaces/validator-manager.h b/validator/interfaces/validator-manager.h index 9dc6076e..32db6e84 100644 --- a/validator/interfaces/validator-manager.h +++ b/validator/interfaces/validator-manager.h @@ -101,7 +101,8 @@ class ValidatorManager : public ValidatorManagerInterface { td::Promise> promise) = 0; virtual void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) = 0; - virtual void get_external_messages(ShardIdFull shard, td::Promise>> promise) = 0; + virtual void get_external_messages(ShardIdFull shard, + td::Promise, int>>> promise) = 0; virtual void get_ihr_messages(ShardIdFull shard, td::Promise>> promise) = 0; virtual void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id, td::Promise>> promise) = 0; diff --git a/validator/manager-disk.cpp b/validator/manager-disk.cpp index 2d4b1ca3..76a1f122 100644 --- a/validator/manager-disk.cpp +++ b/validator/manager-disk.cpp @@ -259,7 +259,7 @@ void ValidatorManagerImpl::get_key_block_proof_link(BlockIdExt block_id, td::Pro td::actor::send_closure(db_, &Db::get_key_block_proof, block_id, std::move(P)); } -void ValidatorManagerImpl::new_external_message(td::BufferSlice data) { +void ValidatorManagerImpl::new_external_message(td::BufferSlice data, int priority) { if (last_masterchain_state_.is_null()) { return; } @@ -507,9 +507,13 @@ void ValidatorManagerImpl::wait_block_message_queue_short(BlockIdExt block_id, t get_block_handle(block_id, true, std::move(P)); } -void ValidatorManagerImpl::get_external_messages(ShardIdFull shard, - td::Promise>> promise) { - promise.set_result(ext_messages_); +void ValidatorManagerImpl::get_external_messages( + ShardIdFull shard, td::Promise, int>>> promise) { + std::vector, int>> res; + for (const auto& x : ext_messages_) { + res.emplace_back(x, 0); + } + promise.set_result(std::move(res)); } void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise>> promise) { diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index bcfc9588..043c8ba4 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -124,7 +124,7 @@ class ValidatorManagerImpl : public ValidatorManager { void get_key_block_proof_link(BlockIdExt block_id, td::Promise promise) override; //void get_block_description(BlockIdExt block_id, td::Promise promise) override; - void new_external_message(td::BufferSlice data) override; + void new_external_message(td::BufferSlice data, int priority) override; void check_external_message(td::BufferSlice data, td::Promise> promise) override { UNREACHABLE(); } @@ -192,7 +192,8 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; - void get_external_messages(ShardIdFull shard, td::Promise>> promise) override; + void get_external_messages(ShardIdFull shard, + td::Promise, int>>> promise) override; void get_ihr_messages(ShardIdFull shard, td::Promise>> promise) override; void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id, td::Promise>> promise) override; @@ -248,7 +249,7 @@ class ValidatorManagerImpl : public ValidatorManager { UNREACHABLE(); } void send_external_message(td::Ref message) override { - new_external_message(message->serialize()); + new_external_message(message->serialize(), 0); } void send_ihr_message(td::Ref message) override { new_ihr_message(message->serialize()); diff --git a/validator/manager-hardfork.cpp b/validator/manager-hardfork.cpp index 865292c3..1ba8c68d 100644 --- a/validator/manager-hardfork.cpp +++ b/validator/manager-hardfork.cpp @@ -150,7 +150,7 @@ void ValidatorManagerImpl::get_key_block_proof_link(BlockIdExt block_id, td::Pro td::actor::send_closure(db_, &Db::get_key_block_proof_link, block_id, std::move(P)); } -void ValidatorManagerImpl::new_external_message(td::BufferSlice data) { +void ValidatorManagerImpl::new_external_message(td::BufferSlice data, int priority) { auto R = create_ext_message(std::move(data), block::SizeLimitsConfig::ExtMsgLimits()); if (R.is_ok()) { ext_messages_.emplace_back(R.move_as_ok()); @@ -357,9 +357,13 @@ void ValidatorManagerImpl::wait_block_message_queue_short(BlockIdExt block_id, t get_block_handle(block_id, true, std::move(P)); } -void ValidatorManagerImpl::get_external_messages(ShardIdFull shard, - td::Promise>> promise) { - promise.set_result(ext_messages_); +void ValidatorManagerImpl::get_external_messages( + ShardIdFull shard, td::Promise, int>>> promise) { + std::vector, int>> res; + for (const auto &x : ext_messages_) { + res.emplace_back(x, 0); + } + promise.set_result(std::move(res)); } void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise>> promise) { diff --git a/validator/manager-hardfork.hpp b/validator/manager-hardfork.hpp index 36125fe1..408d5eea 100644 --- a/validator/manager-hardfork.hpp +++ b/validator/manager-hardfork.hpp @@ -144,7 +144,7 @@ class ValidatorManagerImpl : public ValidatorManager { void get_key_block_proof(BlockIdExt block_id, td::Promise promise) override; void get_key_block_proof_link(BlockIdExt block_id, td::Promise promise) override; - void new_external_message(td::BufferSlice data) override; + void new_external_message(td::BufferSlice data, int priority) override; void check_external_message(td::BufferSlice data, td::Promise> promise) override { UNREACHABLE(); } @@ -232,7 +232,8 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; - void get_external_messages(ShardIdFull shard, td::Promise>> promise) override; + void get_external_messages(ShardIdFull shard, + td::Promise, int>>> promise) override; void get_ihr_messages(ShardIdFull shard, td::Promise>> promise) override; void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id, td::Promise>> promise) override; @@ -312,7 +313,7 @@ class ValidatorManagerImpl : public ValidatorManager { UNREACHABLE(); } void send_external_message(td::Ref message) override { - new_external_message(message->serialize()); + new_external_message(message->serialize(), 0); } void send_ihr_message(td::Ref message) override { new_ihr_message(message->serialize()); diff --git a/validator/manager.cpp b/validator/manager.cpp index 1a3558d4..3a2ffea0 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -370,7 +370,7 @@ void ValidatorManagerImpl::get_key_block_proof_link(BlockIdExt block_id, td::Pro td::actor::send_closure(db_, &Db::get_key_block_proof, block_id, std::move(P)); } -void ValidatorManagerImpl::new_external_message(td::BufferSlice data) { +void ValidatorManagerImpl::new_external_message(td::BufferSlice data, int priority) { if (!is_collator()) { return; } @@ -378,7 +378,7 @@ void ValidatorManagerImpl::new_external_message(td::BufferSlice data) { VLOG(VALIDATOR_NOTICE) << "dropping ext message: validator is not ready"; return; } - if ((double)ext_messages_.size() > max_mempool_num()) { + if (ext_msgs_[priority].ext_messages_.size() > (size_t)max_mempool_num()) { return; } auto R = create_ext_message(std::move(data), last_masterchain_state_->get_ext_msg_limits()); @@ -386,21 +386,30 @@ void ValidatorManagerImpl::new_external_message(td::BufferSlice data) { VLOG(VALIDATOR_NOTICE) << "dropping bad ext message: " << R.move_as_error(); return; } - add_external_message(R.move_as_ok()); + add_external_message(R.move_as_ok(), priority); } -void ValidatorManagerImpl::add_external_message(td::Ref msg) { +void ValidatorManagerImpl::add_external_message(td::Ref msg, int priority) { + auto &msgs = ext_msgs_[priority]; auto message = std::make_unique>(msg); auto id = message->ext_id(); auto address = message->address(); unsigned long per_address_limit = 256; - if (ext_addr_messages_.count(address) < per_address_limit) { - if (ext_messages_hashes_.count(id.hash) == 0) { - ext_messages_.emplace(id, std::move(message)); - ext_messages_hashes_.emplace(id.hash, id); - ext_addr_messages_[address].emplace(id.hash, id); - } + auto it = msgs.ext_addr_messages_.find(address); + if (it != msgs.ext_addr_messages_.end() && it->second.size() >= per_address_limit) { + return; } + auto it2 = ext_messages_hashes_.find(id.hash); + if (it2 != ext_messages_hashes_.end()) { + int old_priority = it2->second.first; + if (old_priority >= priority) { + return; + } + ext_msgs_[old_priority].erase(id); + } + msgs.ext_messages_.emplace(id, std::move(message)); + msgs.ext_addr_messages_[address].emplace(id.hash, id); + ext_messages_hashes_[id.hash] = {priority, id}; } void ValidatorManagerImpl::check_external_message(td::BufferSlice data, td::Promise> promise) { ++ls_stats_check_ext_messages_; @@ -894,34 +903,44 @@ void ValidatorManagerImpl::wait_block_message_queue_short(BlockIdExt block_id, t get_block_handle(block_id, true, std::move(P)); } -void ValidatorManagerImpl::get_external_messages(ShardIdFull shard, - td::Promise>> promise) { +void ValidatorManagerImpl::get_external_messages( + ShardIdFull shard, td::Promise, int>>> promise) { td::Timer t; size_t processed = 0, deleted = 0; - std::vector> res; + std::vector, int>> res; MessageId left{AccountIdPrefixFull{shard.workchain, shard.shard & (shard.shard - 1)}, Bits256::zero()}; - auto it = ext_messages_.lower_bound(left); - while (it != ext_messages_.end()) { - auto s = it->first; - if (!shard_contains(shard, s.dst)) { - break; + size_t total_msgs = 0; + td::Random::Fast rnd; + for (auto iter = ext_msgs_.rbegin(); iter != ext_msgs_.rend(); ++iter) { + std::vector, int>> cur_res; + int priority = iter->first; + auto &msgs = iter->second; + auto it = msgs.ext_messages_.lower_bound(left); + while (it != msgs.ext_messages_.end()) { + auto s = it->first; + if (!shard_contains(shard, s.dst)) { + break; + } + ++processed; + if (it->second->expired()) { + msgs.ext_addr_messages_[it->second->address()].erase(it->first.hash); + ext_messages_hashes_.erase(it->first.hash); + it = msgs.ext_messages_.erase(it); + ++deleted; + continue; + } + if (it->second->is_active()) { + cur_res.emplace_back(it->second->message(), priority); + } + it++; } - ++processed; - if (it->second->expired()) { - ext_addr_messages_[it->second->address()].erase(it->first.hash); - ext_messages_hashes_.erase(it->first.hash); - it = ext_messages_.erase(it); - ++deleted; - continue; - } - if (it->second->is_active()) { - res.push_back(it->second->message()); - } - it++; + td::random_shuffle(td::as_mutable_span(cur_res), rnd); + res.insert(res.end(), cur_res.begin(), cur_res.end()); + total_msgs += msgs.ext_messages_.size(); } LOG(WARNING) << "get_external_messages to shard " << shard.to_str() << " : time=" << t.elapsed() << " result_size=" << res.size() << " processed=" << processed << " expired=" << deleted - << " total_size=" << ext_messages_.size(); + << " total_size=" << total_msgs; promise.set_value(std::move(res)); } @@ -964,8 +983,9 @@ void ValidatorManagerImpl::complete_external_messages(std::vectorsecond]->address()].erase(it->first); - CHECK(ext_messages_.erase(it->second)); + int priority = it->second.first; + auto msg_id = it->second.second; + ext_msgs_[priority].erase(msg_id); ext_messages_hashes_.erase(it); } } @@ -973,12 +993,14 @@ void ValidatorManagerImpl::complete_external_messages(std::vectorsecond); - if ((ext_messages_.size() < soft_mempool_limit) && it2->second->can_postpone()) { + int priority = it->second.first; + auto msg_id = it->second.second; + auto &msgs = ext_msgs_[priority]; + auto it2 = msgs.ext_messages_.find(msg_id); + if ((msgs.ext_messages_.size() < soft_mempool_limit) && it2->second->can_postpone()) { it2->second->postpone(); } else { - ext_addr_messages_[it2->second->address()].erase(it2->first.hash); - ext_messages_.erase(it2); + msgs.erase(msg_id); ext_messages_hashes_.erase(it); } } @@ -1574,7 +1596,7 @@ void ValidatorManagerImpl::send_get_next_key_blocks_request(BlockIdExt block_id, void ValidatorManagerImpl::send_external_message(td::Ref message) { callback_->send_ext_message(message->shard(), message->serialize()); - add_external_message(std::move(message)); + add_external_message(std::move(message), 0); } void ValidatorManagerImpl::send_ihr_message(td::Ref message) { @@ -2284,6 +2306,9 @@ td::actor::ActorOwn ValidatorManagerImpl::create_validator_group if (check_gc_list_.count(session_id) == 1) { return td::actor::ActorOwn{}; } else { + // Call get_external_messages to cleanup mempool for the shard + get_external_messages(shard, [](td::Result, int>>>) {}); + auto validator_id = get_validator(shard, validator_set); CHECK(!validator_id.is_zero()); auto G = td::actor::create_actor( diff --git a/validator/manager.hpp b/validator/manager.hpp index ba0e0e3a..66914043 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -237,9 +237,20 @@ class ValidatorManagerImpl : public ValidatorManager { }; std::map shard_blocks_; std::map> cached_msg_queue_to_masterchain_; - std::map, std::unique_ptr>> ext_messages_; - std::map, std::map>> ext_addr_messages_; - std::map> ext_messages_hashes_; + + struct ExtMessages { + std::map, std::unique_ptr>> ext_messages_; + std::map, std::map>> + ext_addr_messages_; + void erase(const MessageId& id) { + auto it = ext_messages_.find(id); + CHECK(it != ext_messages_.end()); + ext_addr_messages_[it->second->address()].erase(id.hash); + ext_messages_.erase(it); + } + }; + std::map ext_msgs_; // priority -> messages + std::map>> ext_messages_hashes_; // hash -> priority // IHR ? std::map, std::unique_ptr>> ihr_messages_; std::map> ihr_messages_hashes_; @@ -366,8 +377,8 @@ class ValidatorManagerImpl : public ValidatorManager { void get_key_block_proof_link(BlockIdExt block_id, td::Promise promise) override; //void get_block_description(BlockIdExt block_id, td::Promise promise) override; - void new_external_message(td::BufferSlice data) override; - void add_external_message(td::Ref message); + void new_external_message(td::BufferSlice data, int priority) override; + void add_external_message(td::Ref message, int priority); void check_external_message(td::BufferSlice data, td::Promise> promise) override; void new_ihr_message(td::BufferSlice data) override; @@ -429,7 +440,8 @@ class ValidatorManagerImpl : public ValidatorManager { td::Promise> promise) override; void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout, td::Promise> promise) override; - void get_external_messages(ShardIdFull shard, td::Promise>> promise) override; + void get_external_messages(ShardIdFull shard, + td::Promise, int>>> promise) override; void get_ihr_messages(ShardIdFull shard, td::Promise>> promise) override; void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id, td::Promise>> promise) override; diff --git a/validator/validator.h b/validator/validator.h index 6481a96e..df96188e 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -202,7 +202,7 @@ class ValidatorManagerInterface : public td::actor::Actor { virtual void get_next_block(BlockIdExt block_id, td::Promise promise) = 0; virtual void write_handle(BlockHandle handle, td::Promise promise) = 0; - virtual void new_external_message(td::BufferSlice data) = 0; + virtual void new_external_message(td::BufferSlice data, int priority) = 0; virtual void check_external_message(td::BufferSlice data, td::Promise> promise) = 0; virtual void new_ihr_message(td::BufferSlice data) = 0; virtual void new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) = 0;