1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-02-13 03:32:22 +00:00

Merge branch 'testnet' into block-generation

This commit is contained in:
SpyCheese 2024-04-01 17:28:23 +03:00
commit 4916e4847a
34 changed files with 868 additions and 235 deletions

View file

@ -520,7 +520,8 @@ void CatChainReceiverImpl::start_up() {
}
td::actor::send_closure(overlay_manager_, &overlay::Overlays::create_private_overlay,
get_source(local_idx_)->get_adnl_id(), overlay_full_id_.clone(), std::move(ids),
make_callback(), overlay::OverlayPrivacyRules{0, 0, std::move(root_keys)});
make_callback(), overlay::OverlayPrivacyRules{0, 0, std::move(root_keys)},
R"({ "type": "catchain" })");
CHECK(root_block_);

View file

@ -213,7 +213,7 @@ class HardforkCreator : public td::actor::Actor {
ton::validator::ValidatorManagerHardforkFactory::create(opts, shard_, shard_top_block_id_, db_root_);
for (auto &msg : ext_msgs_) {
td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManager::new_external_message,
std::move(msg));
std::move(msg), 0);
}
for (auto &topmsg : top_shard_descrs_) {
td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManager::new_shard_block, ton::BlockIdExt{},

View file

@ -107,11 +107,12 @@ void OverlayManager::create_public_overlay_ex(adnl::AdnlNodeIdShort local_id, Ov
void OverlayManager::create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes,
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules) {
std::unique_ptr<Callback> callback, OverlayPrivacyRules rules,
std::string scope) {
auto id = overlay_id.compute_short_id();
register_overlay(local_id, id,
Overlay::create(keyring_, adnl_, actor_id(this), dht_node_, local_id, std::move(overlay_id),
std::move(nodes), std::move(callback), std::move(rules)));
std::move(nodes), std::move(callback), std::move(rules), std::move(scope)));
}
void OverlayManager::receive_message(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data) {

View file

@ -57,7 +57,7 @@ class OverlayManager : public Overlays {
OverlayOptions opts) override;
void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Callback> callback,
OverlayPrivacyRules rules) override;
OverlayPrivacyRules rules, std::string scope) override;
void delete_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id) override;
void send_query(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdShort src, OverlayIdShort overlay_id, std::string name,
td::Promise<td::BufferSlice> promise, td::Timestamp timeout, td::BufferSlice query) override {

View file

@ -50,10 +50,11 @@ td::actor::ActorOwn<Overlay> Overlay::create(td::actor::ActorId<keyring::Keyring
td::actor::ActorId<OverlayManager> manager,
td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
OverlayIdFull overlay_id, std::vector<adnl::AdnlNodeIdShort> nodes,
std::unique_ptr<Overlays::Callback> callback, OverlayPrivacyRules rules) {
auto R =
td::actor::create_actor<OverlayImpl>("overlay", keyring, adnl, manager, dht_node, local_id, std::move(overlay_id),
false, std::move(nodes), std::move(callback), std::move(rules));
std::unique_ptr<Overlays::Callback> callback, OverlayPrivacyRules rules,
std::string scope) {
auto R = td::actor::create_actor<OverlayImpl>("overlay", keyring, adnl, manager, dht_node, local_id,
std::move(overlay_id), false, std::move(nodes), std::move(callback),
std::move(rules), std::move(scope));
return td::actor::ActorOwn<Overlay>(std::move(R));
}

View file

@ -48,7 +48,8 @@ class Overlay : public td::actor::Actor {
td::actor::ActorId<OverlayManager> manager,
td::actor::ActorId<dht::Dht> dht_node, adnl::AdnlNodeIdShort local_id,
OverlayIdFull overlay_id, std::vector<adnl::AdnlNodeIdShort> nodes,
std::unique_ptr<Overlays::Callback> callback, OverlayPrivacyRules rules);
std::unique_ptr<Overlays::Callback> callback, OverlayPrivacyRules rules,
std::string scope);
virtual void update_dht_node(td::actor::ActorId<dht::Dht> dht) = 0;

View file

@ -212,7 +212,7 @@ class Overlays : public td::actor::Actor {
td::string scope, OverlayOptions opts) = 0;
virtual void create_private_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdFull overlay_id,
std::vector<adnl::AdnlNodeIdShort> nodes, std::unique_ptr<Callback> callback,
OverlayPrivacyRules rules) = 0;
OverlayPrivacyRules rules, std::string scope) = 0;
virtual void delete_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id) = 0;
virtual void send_query(adnl::AdnlNodeIdShort dst, adnl::AdnlNodeIdShort src, OverlayIdShort overlay_id,

View file

@ -300,7 +300,7 @@ class TestNode : public td::actor::Actor {
shard_top_block_id_, db_root_);
for (auto &msg : ext_msgs_) {
td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManager::new_external_message,
std::move(msg));
std::move(msg), 0);
}
for (auto &topmsg : top_shard_descrs_) {
td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManager::new_shard_block, ton::BlockIdExt{},

View file

@ -401,6 +401,7 @@ tonNode.newShardBlockBroadcast block:tonNode.newShardBlock = tonNode.Broadcast;
tonNode.shardPublicOverlayId workchain:int shard:long zero_state_file_hash:int256 = tonNode.ShardPublicOverlayId;
tonNode.privateBlockOverlayId zero_state_file_hash:int256 nodes:(vector int256) = tonNode.PrivateBlockOverlayId;
tonNode.privateExtMsgsOverlayId zero_state_file_hash:int256 name:string nodes:(vector int256) = tonNode.PrivateExtMsgsOverlayId;
tonNode.privateBlockOverlayIdV2 zero_state_file_hash:int256 workchain:int shard:long nodes:(vector int256) senders:(vector int256) = tonNode.PrivateBlockOverlayIdV2;
tonNode.keyBlocks blocks:(vector tonNode.blockIdExt) incomplete:Bool error:Bool = tonNode.KeyBlocks;
@ -614,6 +615,10 @@ engine.validator.config out_port:int addrs:(vector engine.Addr) adnl:(vector eng
shards_to_monitor:(vector tonNode.shardId)
gc:engine.gc = engine.validator.Config;
engine.validator.privateExtMsgOverlayNode adnl_id:int256 sender:Bool sender_priority:int = engine.validator.PrivateExtMsgOverlayNode;
engine.validator.privateExtMsgOverlay name:string nodes:(vector engine.validator.privateExtMsgOverlayNode) = engine.validator.PrivateExtMsgOverlay;
engine.validator.privateExtMsgOverlaysConfig overlays:(vector engine.validator.privateExtMsgOverlay) = engine.validator.PrivateExtMsgOverlaysConfig;
---functions---
---types---
@ -727,6 +732,10 @@ engine.validator.getPerfTimerStats name:string = engine.validator.PerfTimerStats
engine.validator.getShardOutQueueSize flags:# block_id:tonNode.blockId dest_wc:flags.0?int dest_shard:flags.0?long = engine.validator.ShardOutQueueSize;
engine.validator.setExtMessagesBroadcastDisabled disabled:Bool = engine.validator.Success;
engine.validator.addPrivateExtMsgOverlay overlay:engine.validator.privateExtMsgOverlay = engine.validator.Success;
engine.validator.delPrivateExtMsgOverlay name:string = engine.validator.Success;
engine.validator.showPrivateExtMsgOverlays = engine.validator.PrivateExtMsgOverlaysConfig;
engine.validator.getValidatorSessionsInfo = engine.validator.ValidatorSessionsInfo;
engine.validator.addCollator adnl_id:int256 shard:tonNode.shardId = engine.validator.Success;

Binary file not shown.

View file

@ -33,6 +33,7 @@
#include "td/utils/filesystem.h"
#include "overlay/overlays.h"
#include "ton/ton-tl.hpp"
#include "td/utils/JsonBuilder.h"
#include "auto/tl/ton_api_json.h"
#include "tl/tl_json.h"
@ -1120,6 +1121,76 @@ td::Status SetExtMessagesBroadcastDisabledQuery::receive(td::BufferSlice data) {
return td::Status::OK();
}
td::Status AddPrivateExtMsgOverlayQuery::run() {
TRY_RESULT_ASSIGN(file_name_, tokenizer_.get_token<std::string>());
TRY_STATUS(tokenizer_.check_endl());
return td::Status::OK();
}
td::Status AddPrivateExtMsgOverlayQuery::send() {
TRY_RESULT(data, td::read_file(file_name_));
TRY_RESULT(json, td::json_decode(data.as_slice()));
auto overlay = ton::create_tl_object<ton::ton_api::engine_validator_privateExtMsgOverlay>();
TRY_STATUS(ton::ton_api::from_json(*overlay, json.get_object()));
auto b = ton::create_serialize_tl_object<ton::ton_api::engine_validator_addPrivateExtMsgOverlay>(std::move(overlay));
td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise());
return td::Status::OK();
}
td::Status AddPrivateExtMsgOverlayQuery::receive(td::BufferSlice data) {
TRY_RESULT_PREFIX(f, ton::fetch_tl_object<ton::ton_api::engine_validator_success>(data.as_slice(), true),
"received incorrect answer: ");
td::TerminalIO::out() << "success\n";
return td::Status::OK();
}
td::Status DelPrivateExtMsgOverlayQuery::run() {
TRY_RESULT_ASSIGN(name_, tokenizer_.get_token<std::string>());
TRY_STATUS(tokenizer_.check_endl());
return td::Status::OK();
}
td::Status DelPrivateExtMsgOverlayQuery::send() {
auto b = ton::create_serialize_tl_object<ton::ton_api::engine_validator_delPrivateExtMsgOverlay>(name_);
td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise());
return td::Status::OK();
}
td::Status DelPrivateExtMsgOverlayQuery::receive(td::BufferSlice data) {
TRY_RESULT_PREFIX(f, ton::fetch_tl_object<ton::ton_api::engine_validator_success>(data.as_slice(), true),
"received incorrect answer: ");
td::TerminalIO::out() << "success\n";
return td::Status::OK();
}
td::Status ShowPrivateExtMsgOverlaysQuery::run() {
TRY_STATUS(tokenizer_.check_endl());
return td::Status::OK();
}
td::Status ShowPrivateExtMsgOverlaysQuery::send() {
auto b = ton::create_serialize_tl_object<ton::ton_api::engine_validator_showPrivateExtMsgOverlays>();
td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise());
return td::Status::OK();
}
td::Status ShowPrivateExtMsgOverlaysQuery::receive(td::BufferSlice data) {
TRY_RESULT_PREFIX(
f, ton::fetch_tl_object<ton::ton_api::engine_validator_privateExtMsgOverlaysConfig>(data.as_slice(), true),
"received incorrect answer: ");
td::TerminalIO::out() << f->overlays_.size() << " private overlays:\n\n";
for (const auto &overlay : f->overlays_) {
td::TerminalIO::out() << "Overlay \"" << overlay->name_ << "\": " << overlay->nodes_.size() << " nodes\n";
for (const auto &node : overlay->nodes_) {
td::TerminalIO::out() << " " << node->adnl_id_
<< (node->sender_ ? (PSTRING() << " (sender, p=" << node->sender_priority_ << ")") : "")
<< "\n";
}
td::TerminalIO::out() << "\n";
}
return td::Status::OK();
}
td::Status GetValidatorSessionsInfoQuery::run() {
TRY_STATUS(tokenizer_.check_endl());
return td::Status::OK();

View file

@ -1145,6 +1145,70 @@ class SetExtMessagesBroadcastDisabledQuery : public Query {
bool value;
};
class AddPrivateExtMsgOverlayQuery : public Query {
public:
AddPrivateExtMsgOverlayQuery(td::actor::ActorId<ValidatorEngineConsole> console, Tokenizer tokenizer)
: Query(console, std::move(tokenizer)) {
}
td::Status run() override;
td::Status send() override;
td::Status receive(td::BufferSlice data) override;
static std::string get_name() {
return "addprivateextmsgoverlay";
}
static std::string get_help() {
return "addprivateextmsgoverlay <filename>\tadd private overlay for external messages with config from file "
"<filename>";
}
std::string name() const override {
return get_name();
}
private:
std::string file_name_;
};
class DelPrivateExtMsgOverlayQuery : public Query {
public:
DelPrivateExtMsgOverlayQuery(td::actor::ActorId<ValidatorEngineConsole> console, Tokenizer tokenizer)
: Query(console, std::move(tokenizer)) {
}
td::Status run() override;
td::Status send() override;
td::Status receive(td::BufferSlice data) override;
static std::string get_name() {
return "delprivateextmsgoverlay";
}
static std::string get_help() {
return "delprivateextmsgoverlay <name>\tdelete private overlay for external messages with name <name>";
}
std::string name() const override {
return get_name();
}
private:
std::string name_;
};
class ShowPrivateExtMsgOverlaysQuery : public Query {
public:
ShowPrivateExtMsgOverlaysQuery(td::actor::ActorId<ValidatorEngineConsole> console, Tokenizer tokenizer)
: Query(console, std::move(tokenizer)) {
}
td::Status run() override;
td::Status send() override;
td::Status receive(td::BufferSlice data) override;
static std::string get_name() {
return "showprivateextmsgoverlays";
}
static std::string get_help() {
return "showprivateextmsgoverlays\tshow all private overlay for external messages";
}
std::string name() const override {
return get_name();
}
};
class GetValidatorSessionsInfoQuery : public Query {
public:
GetValidatorSessionsInfoQuery(td::actor::ActorId<ValidatorEngineConsole> console, Tokenizer tokenizer)

View file

@ -143,6 +143,9 @@ void ValidatorEngineConsole::run() {
add_query_runner(std::make_unique<QueryRunnerImpl<GetPerfTimerStatsJsonQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<GetShardOutQueueSizeQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<SetExtMessagesBroadcastDisabledQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<AddPrivateExtMsgOverlayQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<DelPrivateExtMsgOverlayQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<ShowPrivateExtMsgOverlaysQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<GetValidatorSessionsInfoQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<AddCollatorQuery>>());
add_query_runner(std::make_unique<QueryRunnerImpl<AddShardQuery>>());

View file

@ -1944,6 +1944,7 @@ void ValidatorEngine::start_full_node() {
td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::add_collator_adnl_id,
ton::adnl::AdnlNodeIdShort(c.adnl_id));
}
load_private_ext_msg_overlays_config();
} else {
started_full_node();
}
@ -2444,6 +2445,66 @@ void ValidatorEngine::try_del_proxy(td::uint32 ip, td::int32 port, std::vector<A
write_config(std::move(promise));
}
void ValidatorEngine::load_private_ext_msg_overlays_config() {
private_ext_msg_overlays_config_ =
ton::create_tl_object<ton::ton_api::engine_validator_privateExtMsgOverlaysConfig>();
auto data_R = td::read_file(private_ext_msg_overlays_config_file());
if (data_R.is_error()) {
return;
}
auto data = data_R.move_as_ok();
auto json_R = td::json_decode(data.as_slice());
if (json_R.is_error()) {
LOG(ERROR) << "Failed to parse private ext msg overlays config: " << json_R.move_as_error();
return;
}
auto json = json_R.move_as_ok();
auto S = ton::ton_api::from_json(*private_ext_msg_overlays_config_, json.get_object());
if (S.is_error()) {
LOG(ERROR) << "Failed to parse private ext msg overlays config: " << S;
return;
}
for (auto &overlay : private_ext_msg_overlays_config_->overlays_) {
std::vector<ton::adnl::AdnlNodeIdShort> nodes;
std::map<ton::adnl::AdnlNodeIdShort, int> senders;
for (const auto &node : overlay->nodes_) {
nodes.emplace_back(node->adnl_id_);
if (node->sender_) {
senders[ton::adnl::AdnlNodeIdShort{node->adnl_id_}] = node->sender_priority_;
}
}
td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::add_ext_msg_overlay, std::move(nodes),
std::move(senders), overlay->name_, [](td::Result<td::Unit> R) { R.ensure(); });
}
}
td::Status ValidatorEngine::write_private_ext_msg_overlays_config() {
auto s = td::json_encode<std::string>(td::ToJson(*private_ext_msg_overlays_config_), true);
TRY_STATUS_PREFIX(td::write_file(private_ext_msg_overlays_config_file(), s), "failed to write config: ");
return td::Status::OK();
}
void ValidatorEngine::add_private_ext_msg_overlay_to_config(
ton::tl_object_ptr<ton::ton_api::engine_validator_privateExtMsgOverlay> overlay, td::Promise<td::Unit> promise) {
private_ext_msg_overlays_config_->overlays_.push_back(std::move(overlay));
TRY_STATUS_PROMISE(promise, write_private_ext_msg_overlays_config());
promise.set_result(td::Unit());
}
void ValidatorEngine::del_private_ext_msg_overlay_from_config(std::string name, td::Promise<td::Unit> promise) {
auto &overlays = private_ext_msg_overlays_config_->overlays_;
for (size_t i = 0; i < overlays.size(); ++i) {
if (overlays[i]->name_ == name) {
overlays.erase(overlays.begin() + i);
TRY_STATUS_PROMISE(promise, write_private_ext_msg_overlays_config());
promise.set_result(td::Unit());
return;
}
}
promise.set_error(td::Status::Error(PSTRING() << "no overlay \"" << name << "\" in config"));
}
void ValidatorEngine::check_key(ton::PublicKeyHash id, td::Promise<td::Unit> promise) {
if (keys_.count(id) == 1) {
promise.set_value(td::Unit());
@ -3571,7 +3632,8 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getShardO
});
}
void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_setExtMessagesBroadcastDisabled &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm,
void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_setExtMessagesBroadcastDisabled &query,
td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm,
td::Promise<td::BufferSlice> promise) {
if (!(perm & ValidatorEnginePermissions::vep_modify)) {
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized")));
@ -3597,6 +3659,95 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_setExtMes
});
}
void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_addPrivateExtMsgOverlay &query,
td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm,
td::Promise<td::BufferSlice> promise) {
if (!(perm & ValidatorEnginePermissions::vep_modify)) {
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized")));
return;
}
if (!started_ || full_node_.empty()) {
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "not started")));
return;
}
auto &overlay = query.overlay_;
std::vector<ton::adnl::AdnlNodeIdShort> nodes;
std::map<ton::adnl::AdnlNodeIdShort, int> senders;
for (const auto &node : overlay->nodes_) {
nodes.emplace_back(node->adnl_id_);
if (node->sender_) {
senders[ton::adnl::AdnlNodeIdShort{node->adnl_id_}] = node->sender_priority_;
}
}
std::string name = overlay->name_;
td::actor::send_closure(
full_node_, &ton::validator::fullnode::FullNode::add_ext_msg_overlay, std::move(nodes), std::move(senders),
std::move(name),
[SelfId = actor_id(this), overlay = std::move(overlay),
promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
promise.set_value(create_control_query_error(R.move_as_error()));
return;
}
td::actor::send_closure(
SelfId, &ValidatorEngine::add_private_ext_msg_overlay_to_config, std::move(overlay),
[promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
promise.set_value(create_control_query_error(R.move_as_error()));
return;
}
promise.set_value(ton::create_serialize_tl_object<ton::ton_api::engine_validator_success>());
});
});
}
void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_delPrivateExtMsgOverlay &query,
td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm,
td::Promise<td::BufferSlice> promise) {
if (!(perm & ValidatorEnginePermissions::vep_modify)) {
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized")));
return;
}
if (!started_ || full_node_.empty()) {
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "not started")));
return;
}
td::actor::send_closure(
full_node_, &ton::validator::fullnode::FullNode::del_ext_msg_overlay, query.name_,
[SelfId = actor_id(this), name = query.name_, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
promise.set_value(create_control_query_error(R.move_as_error()));
return;
}
td::actor::send_closure(
SelfId, &ValidatorEngine::del_private_ext_msg_overlay_from_config, std::move(name),
[promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
promise.set_value(create_control_query_error(R.move_as_error()));
return;
}
promise.set_value(ton::create_serialize_tl_object<ton::ton_api::engine_validator_success>());
});
});
}
void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_showPrivateExtMsgOverlays &query,
td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm,
td::Promise<td::BufferSlice> promise) {
if (!(perm & ValidatorEnginePermissions::vep_default)) {
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized")));
return;
}
if (!started_ || full_node_.empty()) {
promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "not started")));
return;
}
promise.set_value(ton::serialize_tl_object<ton::ton_api::engine_validator_privateExtMsgOverlaysConfig>(
private_ext_msg_overlays_config_, true));
}
void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_getValidatorSessionsInfo &query,
td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm,
td::Promise<td::BufferSlice> promise) {

View file

@ -181,6 +181,7 @@ class ValidatorEngine : public td::actor::Actor {
std::shared_ptr<ton::dht::DhtGlobalConfig> dht_config_;
td::Ref<ton::validator::ValidatorManagerOptions> validator_options_;
Config config_;
ton::tl_object_ptr<ton::ton_api::engine_validator_privateExtMsgOverlaysConfig> private_ext_msg_overlays_config_;
std::set<ton::PublicKeyHash> running_gc_;
@ -389,6 +390,16 @@ class ValidatorEngine : public td::actor::Actor {
void try_del_proxy(td::uint32 ip, td::int32 port, std::vector<AdnlCategory> cats, std::vector<AdnlCategory> prio_cats,
td::Promise<td::Unit> promise);
std::string private_ext_msg_overlays_config_file() const {
return db_root_ + "/private-ext-msg-overlays.json";
}
void load_private_ext_msg_overlays_config();
td::Status write_private_ext_msg_overlays_config();
void add_private_ext_msg_overlay_to_config(
ton::tl_object_ptr<ton::ton_api::engine_validator_privateExtMsgOverlay> overlay, td::Promise<td::Unit> promise);
void del_private_ext_msg_overlay_from_config(std::string name, td::Promise<td::Unit> promise);
void check_key(ton::PublicKeyHash id, td::Promise<td::Unit> promise);
static td::BufferSlice create_control_query_error(td::Status error);
@ -477,6 +488,12 @@ class ValidatorEngine : public td::actor::Actor {
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise);
void run_control_query(ton::ton_api::engine_validator_setExtMessagesBroadcastDisabled &query, td::BufferSlice data,
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise);
void run_control_query(ton::ton_api::engine_validator_addPrivateExtMsgOverlay &query, td::BufferSlice data,
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise);
void run_control_query(ton::ton_api::engine_validator_delPrivateExtMsgOverlay &query, td::BufferSlice data,
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise);
void run_control_query(ton::ton_api::engine_validator_showPrivateExtMsgOverlays &query, td::BufferSlice data,
ton::PublicKeyHash src, td::uint32 perm, td::Promise<td::BufferSlice> promise);
template <class T>
void run_control_query(T &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm,
td::Promise<td::BufferSlice> promise) {

View file

@ -903,7 +903,7 @@ ValidatorSessionImpl::ValidatorSessionImpl(catchain::CatChainSessionId session_i
, rldp_(rldp)
, overlay_manager_(overlays)
, allow_unsafe_self_blocks_resync_(allow_unsafe_self_blocks_resync) {
compress_block_candidates_ = opts.proto_version >= 3;
compress_block_candidates_ = opts.proto_version >= 4;
description_ = ValidatorSessionDescription::create(std::move(opts), nodes, local_id);
src_round_candidate_.resize(description_->get_total_nodes());
}

View file

@ -670,7 +670,9 @@ void ArchiveSlice::do_close() {
LOG(DEBUG) << "Closing archive slice " << db_path_;
status_ = st_closed;
kv_ = {};
statistics_.pack_statistics->record_close(packages_.size());
if (statistics_.pack_statistics) {
statistics_.pack_statistics->record_close(packages_.size());
}
packages_.clear();
}
@ -774,7 +776,9 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver
LOG(FATAL) << "failed to open/create archive '" << path << "': " << R.move_as_error();
return;
}
statistics_.pack_statistics->record_open();
if (statistics_.pack_statistics) {
statistics_.pack_statistics->record_open();
}
auto idx = td::narrow_cast<td::uint32>(packages_.size());
if (finalized_) {
packages_.emplace_back(nullptr, td::actor::ActorOwn<PackageWriter>(), seqno, path, idx, version);
@ -817,7 +821,9 @@ void ArchiveSlice::destroy(td::Promise<td::Unit> promise) {
for (auto &p : packages_) {
td::unlink(p.path).ensure();
}
statistics_.pack_statistics->record_close(packages_.size());
if (statistics_.pack_statistics) {
statistics_.pack_statistics->record_close(packages_.size());
}
packages_.clear();
kv_ = nullptr;
@ -1023,7 +1029,9 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl
for (auto idx = pack->idx + 1; idx < packages_.size(); idx++) {
td::unlink(packages_[idx].path).ensure();
}
statistics_.pack_statistics->record_close(packages_.size() - pack->idx - 1);
if (statistics_.pack_statistics) {
statistics_.pack_statistics->record_close(packages_.size() - pack->idx - 1);
}
packages_.erase(packages_.begin() + pack->idx + 1, packages_.end());
kv_->commit_transaction().ensure();

View file

@ -14,7 +14,6 @@
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "full-node-private-overlay-v2.hpp"
#include "ton/ton-tl.hpp"
@ -40,6 +39,8 @@ void FullNodePrivateOverlayV2::process_block_broadcast(PublicKeyHash src, ton_ap
LOG(DEBUG) << "dropped broadcast: " << B.move_as_error();
return;
}
VLOG(FULL_NODE_DEBUG) << "Received block broadcast in private overlay from " << src << ": "
<< B.ok().block_id.to_str();
auto P = td::PromiseCreator::lambda([](td::Result<td::Unit> R) {
if (R.is_error()) {
if (R.error().code() == ErrorCode::notready) {
@ -49,14 +50,14 @@ void FullNodePrivateOverlayV2::process_block_broadcast(PublicKeyHash src, ton_ap
}
}
});
LOG(FULL_NODE_DEBUG) << "Got block broadcast in private overlay: " << B.ok().block_id.to_str();
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, B.move_as_ok(),
std::move(P));
}
void FullNodePrivateOverlayV2::process_broadcast(PublicKeyHash, ton_api::tonNode_newShardBlockBroadcast &query) {
void FullNodePrivateOverlayV2::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) {
BlockIdExt block_id = create_block_id(query.block_->block_);
LOG(FULL_NODE_DEBUG) << "Got block description broadcast in private overlay: " << block_id.to_str();
VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast in private overlay from " << src << ": "
<< block_id.to_str();
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, block_id,
query.block_->cc_seqno_, std::move(query.block_->data_));
}
@ -75,6 +76,7 @@ void FullNodePrivateOverlayV2::send_shard_block_info(BlockIdExt block_id, Catcha
if (!inited_) {
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending newShardBlockBroadcast in private overlay: " << block_id.to_str();
auto B = create_serialize_tl_object<ton_api::tonNode_newShardBlockBroadcast>(
create_tl_object<ton_api::tonNode_newShardBlock>(create_tl_block_id(block_id), cc_seqno, std::move(data)));
if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
@ -90,6 +92,8 @@ void FullNodePrivateOverlayV2::send_broadcast(BlockBroadcast broadcast) {
if (!inited_) {
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending block broadcast in private overlay (with compression): "
<< broadcast.block_id.to_str();
auto B = serialize_block_broadcast(broadcast, true); // compression_enabled = true
if (B.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error();
@ -164,8 +168,10 @@ void FullNodePrivateOverlayV2::init() {
authorized_keys[sender.pubkey_hash()] = overlay::Overlays::max_fec_broadcast_size();
}
overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(), 0, std::move(authorized_keys)};
std::string scope = PSTRING() << R"({ "type": "private-blocks-v2", "shard_id": )" << shard_.shard
<< ", \"workchain_id\": " << shard_.workchain << " }";
td::actor::send_closure(overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(),
nodes_, std::make_unique<Callback>(actor_id(this)), rules);
nodes_, std::make_unique<Callback>(actor_id(this)), rules, std::move(scope));
td::actor::send_closure(rldp_, &rldp::Rldp::add_id, local_id_);
td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, local_id_);

View file

@ -19,26 +19,25 @@
#include "common/delay.h"
#include "full-node-serializer.hpp"
namespace ton {
namespace ton::validator::fullnode {
namespace validator {
namespace fullnode {
void FullNodePrivateOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) {
void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) {
process_block_broadcast(src, query);
}
void FullNodePrivateOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query) {
void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src,
ton_api::tonNode_blockBroadcastCompressed &query) {
process_block_broadcast(src, query);
}
void FullNodePrivateOverlay::process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) {
void FullNodePrivateBlockOverlay::process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) {
auto B = deserialize_block_broadcast(query, overlay::Overlays::max_fec_broadcast_size());
if (B.is_error()) {
LOG(DEBUG) << "dropped broadcast: " << B.move_as_error();
return;
}
VLOG(FULL_NODE_DEBUG) << "Received block broadcast in private overlay from " << src << ": "
<< B.ok().block_id.to_str();
auto P = td::PromiseCreator::lambda([](td::Result<td::Unit> R) {
if (R.is_error()) {
if (R.error().code() == ErrorCode::notready) {
@ -52,25 +51,28 @@ void FullNodePrivateOverlay::process_block_broadcast(PublicKeyHash src, ton_api:
std::move(P));
}
void FullNodePrivateOverlay::process_broadcast(PublicKeyHash, ton_api::tonNode_newShardBlockBroadcast &query) {
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block,
create_block_id(query.block_->block_), query.block_->cc_seqno_,
std::move(query.block_->data_));
void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) {
BlockIdExt block_id = create_block_id(query.block_->block_);
VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast in private overlay from " << src << ": "
<< block_id.to_str();
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, block_id,
query.block_->cc_seqno_, std::move(query.block_->data_));
}
void FullNodePrivateOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
void FullNodePrivateBlockOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
return;
}
ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); });
}
void FullNodePrivateOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) {
void FullNodePrivateBlockOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno,
td::BufferSlice data) {
if (!inited_) {
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending newShardBlockBroadcast in private overlay: " << block_id.to_str();
auto B = create_serialize_tl_object<ton_api::tonNode_newShardBlockBroadcast>(
create_tl_object<ton_api::tonNode_newShardBlock>(create_tl_block_id(block_id), cc_seqno, std::move(data)));
if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
@ -82,11 +84,13 @@ void FullNodePrivateOverlay::send_shard_block_info(BlockIdExt block_id, Catchain
}
}
void FullNodePrivateOverlay::send_broadcast(BlockBroadcast broadcast) {
void FullNodePrivateBlockOverlay::send_broadcast(BlockBroadcast broadcast) {
if (!inited_) {
return;
}
auto B = serialize_block_broadcast(broadcast, false); // compression_enabled = false
VLOG(FULL_NODE_DEBUG) << "Sending block broadcast in private overlay"
<< (enable_compression_ ? " (with compression)" : "") << ": " << broadcast.block_id.to_str();
auto B = serialize_block_broadcast(broadcast, enable_compression_);
if (B.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error();
return;
@ -95,7 +99,7 @@ void FullNodePrivateOverlay::send_broadcast(BlockBroadcast broadcast) {
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}
void FullNodePrivateOverlay::start_up() {
void FullNodePrivateBlockOverlay::start_up() {
std::sort(nodes_.begin(), nodes_.end());
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());
@ -112,22 +116,22 @@ void FullNodePrivateOverlay::start_up() {
try_init();
}
void FullNodePrivateOverlay::try_init() {
void FullNodePrivateBlockOverlay::try_init() {
// Sometimes adnl id is added to validator engine later (or not at all)
td::actor::send_closure(
adnl_, &adnl::Adnl::check_id_exists, local_id_, [SelfId = actor_id(this)](td::Result<bool> R) {
if (R.is_ok() && R.ok()) {
td::actor::send_closure(SelfId, &FullNodePrivateOverlay::init);
td::actor::send_closure(SelfId, &FullNodePrivateBlockOverlay::init);
} else {
delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateOverlay::try_init); },
delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateBlockOverlay::try_init); },
td::Timestamp::in(30.0));
}
});
}
void FullNodePrivateOverlay::init() {
LOG(FULL_NODE_INFO) << "Creating private block overlay for adnl id " << local_id_ << " : " << nodes_.size()
<< " nodes";
void FullNodePrivateBlockOverlay::init() {
LOG(FULL_NODE_WARNING) << "Creating private block overlay for adnl id " << local_id_ << " : " << nodes_.size()
<< " nodes, overlay_id=" << overlay_id_;
class Callback : public overlay::Overlays::Callback {
public:
void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
@ -136,37 +140,140 @@ void FullNodePrivateOverlay::init() {
td::Promise<td::BufferSlice> promise) override {
}
void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
td::actor::send_closure(node_, &FullNodePrivateOverlay::receive_broadcast, src, std::move(data));
td::actor::send_closure(node_, &FullNodePrivateBlockOverlay::receive_broadcast, src, std::move(data));
}
void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
td::Promise<td::Unit> promise) override {
}
Callback(td::actor::ActorId<FullNodePrivateOverlay> node) : node_(node) {
Callback(td::actor::ActorId<FullNodePrivateBlockOverlay> node) : node_(node) {
}
private:
td::actor::ActorId<FullNodePrivateOverlay> node_;
td::actor::ActorId<FullNodePrivateBlockOverlay> node_;
};
overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(),
overlay::CertificateFlags::AllowFec | overlay::CertificateFlags::Trusted,
{}};
td::actor::send_closure(overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(),
nodes_, std::make_unique<Callback>(actor_id(this)), rules);
nodes_, std::make_unique<Callback>(actor_id(this)), rules, R"({ "type": "private-blocks" })");
td::actor::send_closure(rldp_, &rldp::Rldp::add_id, local_id_);
td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, local_id_);
inited_ = true;
}
void FullNodePrivateOverlay::tear_down() {
void FullNodePrivateBlockOverlay::tear_down() {
if (inited_) {
td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, local_id_, overlay_id_);
}
}
} // namespace fullnode
void FullNodePrivateExtMsgOverlay::process_broadcast(PublicKeyHash src,
ton_api::tonNode_externalMessageBroadcast &query) {
auto it = senders_.find(adnl::AdnlNodeIdShort{src});
if (it == senders_.end()) {
return;
}
LOG(FULL_NODE_DEBUG) << "Got external message in private overlay \"" << name_ << "\" from " << src
<< " (priority=" << it->second << ")";
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_external_message,
std::move(query.message_->data_), it->second);
}
} // namespace validator
void FullNodePrivateExtMsgOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
return;
}
ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); });
}
} // namespace ton
void FullNodePrivateExtMsgOverlay::send_external_message(td::BufferSlice data) {
if (config_.ext_messages_broadcast_disabled_) {
return;
}
LOG(FULL_NODE_DEBUG) << "Sending external message to private overlay \"" << name_ << "\"";
auto B = create_serialize_tl_object<ton_api::tonNode_externalMessageBroadcast>(
create_tl_object<ton_api::tonNode_externalMessage>(std::move(data)));
if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(B));
} else {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(B));
}
}
void FullNodePrivateExtMsgOverlay::start_up() {
std::sort(nodes_.begin(), nodes_.end());
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());
std::vector<td::Bits256> nodes;
for (const adnl::AdnlNodeIdShort &id : nodes_) {
nodes.push_back(id.bits256_value());
}
auto X =
create_hash_tl_object<ton_api::tonNode_privateExtMsgsOverlayId>(zero_state_file_hash_, name_, std::move(nodes));
td::BufferSlice b{32};
b.as_slice().copy_from(as_slice(X));
overlay_id_full_ = overlay::OverlayIdFull{std::move(b)};
overlay_id_ = overlay_id_full_.compute_short_id();
try_init();
}
void FullNodePrivateExtMsgOverlay::try_init() {
// Sometimes adnl id is added to validator engine later (or not at all)
td::actor::send_closure(
adnl_, &adnl::Adnl::check_id_exists, local_id_, [SelfId = actor_id(this)](td::Result<bool> R) {
if (R.is_ok() && R.ok()) {
td::actor::send_closure(SelfId, &FullNodePrivateExtMsgOverlay::init);
} else {
delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateExtMsgOverlay::try_init); },
td::Timestamp::in(30.0));
}
});
}
void FullNodePrivateExtMsgOverlay::init() {
LOG(FULL_NODE_WARNING) << "Creating private ext msg overlay \"" << name_ << "\" for adnl id " << local_id_ << " : "
<< nodes_.size() << " nodes, overlay_id=" << overlay_id_;
class Callback : public overlay::Overlays::Callback {
public:
void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
}
void receive_query(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
td::Promise<td::BufferSlice> promise) override {
}
void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
td::actor::send_closure(node_, &FullNodePrivateExtMsgOverlay::receive_broadcast, src, std::move(data));
}
void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
td::Promise<td::Unit> promise) override {
}
Callback(td::actor::ActorId<FullNodePrivateExtMsgOverlay> node) : node_(node) {
}
private:
td::actor::ActorId<FullNodePrivateExtMsgOverlay> node_;
};
std::map<PublicKeyHash, td::uint32> authorized_keys;
for (const auto &sender : senders_) {
authorized_keys[sender.first.pubkey_hash()] = overlay::Overlays::max_fec_broadcast_size();
}
overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(), 0, std::move(authorized_keys)};
td::actor::send_closure(
overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(), nodes_,
std::make_unique<Callback>(actor_id(this)), rules,
PSTRING() << R"({ "type": "private-ext-msg", "name": ")" << td::format::Escaped{name_} << R"(" })");
td::actor::send_closure(rldp_, &rldp::Rldp::add_id, local_id_);
td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, local_id_);
}
void FullNodePrivateExtMsgOverlay::tear_down() {
LOG(FULL_NODE_WARNING) << "Destroying private ext msg overlay \"" << name_ << "\" for adnl id " << local_id_;
td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, local_id_, overlay_id_);
}
} // namespace ton::validator::fullnode

View file

@ -18,13 +18,9 @@
#include "full-node.h"
namespace ton {
namespace ton::validator::fullnode {
namespace validator {
namespace fullnode {
class FullNodePrivateOverlay : public td::actor::Actor {
class FullNodePrivateBlockOverlay : public td::actor::Actor {
public:
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query);
@ -40,19 +36,89 @@ class FullNodePrivateOverlay : public td::actor::Actor {
void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data);
void send_broadcast(BlockBroadcast broadcast);
void set_config(FullNodeConfig config) {
config_ = std::move(config);
}
void set_enable_compression(bool value) {
enable_compression_ = value;
}
void start_up() override;
void tear_down() override;
FullNodePrivateOverlay(adnl::AdnlNodeIdShort local_id, std::vector<adnl::AdnlNodeIdShort> nodes,
FileHash zero_state_file_hash, FullNodeConfig config,
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager)
FullNodePrivateBlockOverlay(adnl::AdnlNodeIdShort local_id, std::vector<adnl::AdnlNodeIdShort> nodes,
FileHash zero_state_file_hash, FullNodeConfig config, bool enable_compression,
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager)
: local_id_(local_id)
, nodes_(std::move(nodes))
, zero_state_file_hash_(zero_state_file_hash)
, config_(config)
, enable_compression_(enable_compression)
, keyring_(keyring)
, adnl_(adnl)
, rldp_(rldp)
, rldp2_(rldp2)
, overlays_(overlays)
, validator_manager_(validator_manager) {
}
private:
adnl::AdnlNodeIdShort local_id_;
std::vector<adnl::AdnlNodeIdShort> nodes_;
FileHash zero_state_file_hash_;
FullNodeConfig config_;
bool enable_compression_;
td::actor::ActorId<keyring::Keyring> keyring_;
td::actor::ActorId<adnl::Adnl> adnl_;
td::actor::ActorId<rldp::Rldp> rldp_;
td::actor::ActorId<rldp2::Rldp> rldp2_;
td::actor::ActorId<overlay::Overlays> overlays_;
td::actor::ActorId<ValidatorManagerInterface> validator_manager_;
bool inited_ = false;
overlay::OverlayIdFull overlay_id_full_;
overlay::OverlayIdShort overlay_id_;
void try_init();
void init();
};
class FullNodePrivateExtMsgOverlay : public td::actor::Actor {
public:
void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query);
template <class T>
void process_broadcast(PublicKeyHash, T &) {
VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast";
}
void receive_broadcast(PublicKeyHash src, td::BufferSlice query);
void send_external_message(td::BufferSlice data);
void set_config(FullNodeConfig config) {
config_ = std::move(config);
}
void start_up() override;
void tear_down() override;
FullNodePrivateExtMsgOverlay(adnl::AdnlNodeIdShort local_id, std::vector<adnl::AdnlNodeIdShort> nodes,
std::map<adnl::AdnlNodeIdShort, int> senders, std::string name,
FileHash zero_state_file_hash, FullNodeConfig config,
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager)
: local_id_(local_id)
, nodes_(std::move(nodes))
, senders_(std::move(senders))
, name_(std::move(name))
, zero_state_file_hash_(zero_state_file_hash)
, config_(config)
, keyring_(keyring)
, adnl_(adnl)
, rldp_(rldp)
@ -64,6 +130,8 @@ class FullNodePrivateOverlay : public td::actor::Actor {
private:
adnl::AdnlNodeIdShort local_id_;
std::vector<adnl::AdnlNodeIdShort> nodes_;
std::map<adnl::AdnlNodeIdShort, int> senders_;
std::string name_;
FileHash zero_state_file_hash_;
FullNodeConfig config_;
@ -82,8 +150,4 @@ class FullNodePrivateOverlay : public td::actor::Actor {
void init();
};
} // namespace fullnode
} // namespace validator
} // namespace ton
} // namespace ton::validator::fullnode

View file

@ -134,7 +134,7 @@ void FullNodeShardImpl::check_broadcast(PublicKeyHash src, td::BufferSlice broad
promise.set_error(td::Status::Error("rebroadcasting external messages is disabled"));
promise = [manager = validator_manager_, message = q->message_->data_.clone()](td::Result<td::Unit> R) mutable {
if (R.is_ok()) {
td::actor::send_closure(manager, &ValidatorManagerInterface::new_external_message, std::move(message));
td::actor::send_closure(manager, &ValidatorManagerInterface::new_external_message, std::move(message), 0);
}
};
}
@ -729,14 +729,14 @@ void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_ih
void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query) {
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_external_message,
std::move(query.message_->data_));
std::move(query.message_->data_), 0);
}
void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) {
auto block_id = create_block_id(query.block_->block_);
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block,
block_id, query.block_->cc_seqno_,
std::move(query.block_->data_));
BlockIdExt block_id = create_block_id(query.block_->block_);
VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast from " << src << ": " << block_id.to_str();
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, block_id,
query.block_->cc_seqno_, std::move(query.block_->data_));
}
void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) {
@ -758,6 +758,7 @@ void FullNodeShardImpl::process_block_broadcast(PublicKeyHash src, ton_api::tonN
// << " block=" << block_id.to_str();
// return;
//}
VLOG(FULL_NODE_DEBUG) << "Received block broadcast from " << src << ": " << B.ok().block_id.to_str();
auto P = td::PromiseCreator::lambda([](td::Result<td::Unit> R) {
if (R.is_error()) {
if (R.error().code() == ErrorCode::notready) {
@ -836,6 +837,7 @@ void FullNodeShardImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno
UNREACHABLE();
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending newShardBlockBroadcast: " << block_id.to_str();
auto B = create_serialize_tl_object<ton_api::tonNode_newShardBlockBroadcast>(
create_tl_object<ton_api::tonNode_newShardBlock>(create_tl_block_id(block_id), cc_seqno, std::move(data)));
if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
@ -852,6 +854,7 @@ void FullNodeShardImpl::send_broadcast(BlockBroadcast broadcast) {
UNREACHABLE();
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending block broadcast in private overlay: " << broadcast.block_id.to_str();
auto B = serialize_block_broadcast(broadcast, false); // compression_enabled = false
if (B.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error();

View file

@ -37,6 +37,10 @@ void FullNodeImpl::add_permanent_key(PublicKeyHash key, td::Promise<td::Unit> pr
}
local_keys_.insert(key);
// create_private_block_overlay(key);
for (auto &p : private_ext_msg_overlays_) {
update_ext_msg_overlay(p.first, p.second);
}
if (!sign_cert_by_.is_zero()) {
promise.set_value(td::Unit());
@ -54,7 +58,6 @@ void FullNodeImpl::add_permanent_key(PublicKeyHash key, td::Promise<td::Unit> pr
td::actor::send_closure(shard.second.actor, &FullNodeShard::update_validators, all_validators_, sign_cert_by_);
}
}
// create_private_block_overlay(key);
promise.set_value(td::Unit());
}
@ -64,6 +67,11 @@ void FullNodeImpl::del_permanent_key(PublicKeyHash key, td::Promise<td::Unit> pr
return;
}
local_keys_.erase(key);
// private_block_overlays_.erase(key);
for (auto &p : private_ext_msg_overlays_) {
update_ext_msg_overlay(p.first, p.second);
}
if (sign_cert_by_ != key) {
promise.set_value(td::Unit());
return;
@ -81,7 +89,6 @@ void FullNodeImpl::del_permanent_key(PublicKeyHash key, td::Promise<td::Unit> pr
td::actor::send_closure(shard.second.actor, &FullNodeShard::update_validators, all_validators_, sign_cert_by_);
}
}
// private_block_overlays_.erase(key);
promise.set_value(td::Unit());
}
@ -131,6 +138,10 @@ void FullNodeImpl::update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td:
}
}
local_id_ = adnl_id_.pubkey_hash();
for (auto &p : private_ext_msg_overlays_) {
update_ext_msg_overlay(p.first, p.second);
}
}
void FullNodeImpl::set_config(FullNodeConfig config) {
@ -140,6 +151,44 @@ void FullNodeImpl::set_config(FullNodeConfig config) {
td::actor::send_closure(s.second.actor, &FullNodeShard::set_config, config);
}
}
/*for (auto& overlay : private_block_overlays_) {
td::actor::send_closure(overlay.second, &FullNodePrivateBlockOverlay::set_config, config);
}*/
for (auto& overlay : private_ext_msg_overlays_) {
for (auto &actor : overlay.second.actors_) {
td::actor::send_closure(actor.second, &FullNodePrivateExtMsgOverlay::set_config, config);
}
}
}
void FullNodeImpl::add_ext_msg_overlay(std::vector<adnl::AdnlNodeIdShort> nodes,
std::map<adnl::AdnlNodeIdShort, int> senders, std::string name,
td::Promise<td::Unit> promise) {
if (nodes.empty()) {
promise.set_error(td::Status::Error("list of nodes is empty"));
return;
}
if (private_ext_msg_overlays_.count(name)) {
promise.set_error(td::Status::Error(PSTRING() << "duplicate overlay name \"" << name << "\""));
return;
}
VLOG(FULL_NODE_WARNING) << "Adding private overlay for external messages \"" << name << "\", " << nodes.size()
<< " nodes";
auto &p = private_ext_msg_overlays_[name];
p.nodes_ = nodes;
p.senders_ = senders;
update_ext_msg_overlay(name, p);
promise.set_result(td::Unit());
}
void FullNodeImpl::del_ext_msg_overlay(std::string name, td::Promise<td::Unit> promise) {
auto it = private_ext_msg_overlays_.find(name);
if (it == private_ext_msg_overlays_.end()) {
promise.set_error(td::Status::Error(PSTRING() << "no such overlay \"" << name << "\""));
return;
}
private_ext_msg_overlays_.erase(it);
promise.set_result(td::Unit());
}
void FullNodeImpl::initial_read_complete(BlockHandle top_handle) {
@ -277,6 +326,14 @@ void FullNodeImpl::send_ext_message(AccountIdPrefixFull dst, td::BufferSlice dat
VLOG(FULL_NODE_WARNING) << "dropping OUT ext message to unknown shard";
return;
}
for (auto &private_overlay : private_ext_msg_overlays_) {
for (auto &actor : private_overlay.second.actors_) {
auto local_id = actor.first;
if (private_overlay.second.senders_.count(local_id)) {
td::actor::send_closure(actor.second, &FullNodePrivateExtMsgOverlay::send_external_message, data.clone());
}
}
}
td::actor::send_closure(shard, &FullNodeShard::send_external_message, std::move(data));
}
@ -286,6 +343,10 @@ void FullNodeImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_s
VLOG(FULL_NODE_WARNING) << "dropping OUT shard block info message to unknown shard";
return;
}
/*if (!private_block_overlays_.empty()) {
td::actor::send_closure(private_block_overlays_.begin()->second,
&FullNodePrivateBlockOverlay::send_shard_block_info, block_id, cc_seqno, data.clone());
}*/
auto private_overlay = private_block_overlays_.choose_overlay(ShardIdFull(masterchainId));
if (!private_overlay.empty()) {
td::actor::send_closure(private_overlay, &FullNodePrivateOverlayV2::send_shard_block_info, block_id, cc_seqno,
@ -300,6 +361,10 @@ void FullNodeImpl::send_broadcast(BlockBroadcast broadcast) {
VLOG(FULL_NODE_WARNING) << "dropping OUT broadcast to unknown shard";
return;
}
/*if (!private_block_overlays_.empty()) {
td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_broadcast,
broadcast.clone());
}*/
auto private_overlay = private_block_overlays_.choose_overlay(broadcast.block_id.shard_full());
if (!private_overlay.empty()) {
td::actor::send_closure(private_overlay, &FullNodePrivateOverlayV2::send_broadcast, broadcast.clone());
@ -438,52 +503,7 @@ td::actor::ActorId<FullNodeShard> FullNodeImpl::get_shard(AccountIdPrefixFull ds
return get_shard(shard_prefix(dst, max_shard_pfx_len));
}
void FullNodeImpl::got_key_block_proof(td::Ref<ProofLink> proof) {
auto R = proof->get_key_block_config();
R.ensure();
auto config = R.move_as_ok();
PublicKeyHash l = PublicKeyHash::zero();
std::vector<PublicKeyHash> keys;
std::map<PublicKeyHash, adnl::AdnlNodeIdShort> current_validators;
for (td::int32 i = -1; i <= 1; i++) {
auto r = config->get_total_validator_set(i < 0 ? i : 1 - i);
if (r.not_null()) {
auto vec = r->export_vector();
for (auto &el : vec) {
auto key = ValidatorFullId{el.key}.compute_short_id();
keys.push_back(key);
if (local_keys_.count(key)) {
l = key;
}
if (i == 1) {
current_validators[key] = adnl::AdnlNodeIdShort{el.addr.is_zero() ? key.bits256_value() : el.addr};
}
}
}
}
if (current_validators != current_validators_) {
current_validators_ = std::move(current_validators);
// update_private_block_overlays();
}
if (keys == all_validators_) {
return;
}
all_validators_ = keys;
sign_cert_by_ = l;
CHECK(all_validators_.size() > 0);
for (auto &shard : shards_) {
if (!shard.second.actor.empty()) {
td::actor::send_closure(shard.second.actor, &FullNodeShard::update_validators, all_validators_, sign_cert_by_);
}
}
}
void FullNodeImpl::got_zero_block_state(td::Ref<ShardState> state) {
void FullNodeImpl::got_key_block_state(td::Ref<ShardState> state) {
auto m = td::Ref<MasterchainState>{std::move(state)};
PublicKeyHash l = PublicKeyHash::zero();
@ -506,9 +526,11 @@ void FullNodeImpl::got_zero_block_state(td::Ref<ShardState> state) {
}
}
// set_private_block_overlays_enable_compression(m->get_consensus_config().proto_version >= 3);
if (current_validators != current_validators_) {
current_validators_ = std::move(current_validators);
// update_private_block_overlays();
update_private_overlays();
}
if (keys == all_validators_) {
@ -527,28 +549,15 @@ void FullNodeImpl::got_zero_block_state(td::Ref<ShardState> state) {
}
void FullNodeImpl::new_key_block(BlockHandle handle) {
if (handle->id().seqno() == 0) {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Ref<ShardState>> R) {
if (R.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to get zero state: " << R.move_as_error();
} else {
td::actor::send_closure(SelfId, &FullNodeImpl::got_zero_block_state, R.move_as_ok());
}
});
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_shard_state_from_db, handle,
std::move(P));
} else {
CHECK(handle->is_key_block());
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Ref<ProofLink>> R) {
if (R.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to get key block proof: " << R.move_as_error();
} else {
td::actor::send_closure(SelfId, &FullNodeImpl::got_key_block_proof, R.move_as_ok());
}
});
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_block_proof_link_from_db, handle,
std::move(P));
}
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Ref<ShardState>> R) {
if (R.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to get key block state: " << R.move_as_error();
} else {
td::actor::send_closure(SelfId, &FullNodeImpl::got_key_block_state, R.move_as_ok());
}
});
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_shard_state_from_db, handle,
std::move(P));
}
void FullNodeImpl::start_up() {
@ -638,13 +647,26 @@ void FullNodeImpl::start_up() {
std::make_unique<Callback>(actor_id(this)), std::move(started_promise_));
}
/* void FullNodeImpl::update_private_block_overlays() {
private_block_overlays_.clear();
void FullNodeImpl::update_private_overlays() {
for (auto &p : private_ext_msg_overlays_) {
update_ext_msg_overlay(p.first, p.second);
}
/*private_block_overlays_.clear();
if (local_keys_.empty()) {
return;
}
for (const auto &key : local_keys_) {
create_private_block_overlay(key);
}*/
}
/*void FullNodeImpl::set_private_block_overlays_enable_compression(bool value) {
if (private_block_overlays_enable_compression_ == value) {
return;
}
private_block_overlays_enable_compression_ = true;
for (auto &p : private_block_overlays_) {
td::actor::send_closure(p.second, &FullNodePrivateBlockOverlay::set_enable_compression, value);
}
}
@ -659,7 +681,32 @@ void FullNodeImpl::create_private_block_overlay(PublicKeyHash key) {
"BlocksPrivateOverlay", current_validators_[key], std::move(nodes), zero_state_file_hash_, config_, keyring_,
adnl_, rldp_, rldp2_, overlays_, validator_manager_);
}
} */
}*/
void FullNodeImpl::update_ext_msg_overlay(const std::string &name, ExtMsgOverlayInfo &overlay) {
auto old_actors = std::move(overlay.actors_);
overlay.actors_.clear();
auto try_local_id = [&](const adnl::AdnlNodeIdShort &local_id) {
if (std::find(overlay.nodes_.begin(), overlay.nodes_.end(), local_id) != overlay.nodes_.end()) {
auto it = old_actors.find(local_id);
if (it != old_actors.end()) {
overlay.actors_[local_id] = std::move(it->second);
old_actors.erase(it);
} else {
overlay.actors_[local_id] = td::actor::create_actor<FullNodePrivateExtMsgOverlay>(
"ExtMsgPrivateOverlay", local_id, overlay.nodes_, overlay.senders_, name, zero_state_file_hash_, config_,
keyring_, adnl_, rldp_, rldp2_, overlays_, validator_manager_);
}
}
};
try_local_id(adnl_id_);
for (const PublicKeyHash &local_key: local_keys_) {
auto it = current_validators_.find(local_key);
if (it != current_validators_.end()) {
try_local_id(it->second);
}
}
}
FullNodeImpl::FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash,
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring,

View file

@ -75,6 +75,11 @@ class FullNode : public td::actor::Actor {
virtual void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td::Unit> promise) = 0;
virtual void set_config(FullNodeConfig config) = 0;
virtual void add_ext_msg_overlay(std::vector<adnl::AdnlNodeIdShort> nodes,
std::map<adnl::AdnlNodeIdShort, int> senders, std::string name,
td::Promise<td::Unit> promise) = 0;
virtual void del_ext_msg_overlay(std::string name, td::Promise<td::Unit> promise) = 0;
static constexpr td::uint32 max_block_size() {
return 4 << 20;
}

View file

@ -56,6 +56,10 @@ class FullNodeImpl : public FullNode {
void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td::Unit> promise) override;
void set_config(FullNodeConfig config) override;
void add_ext_msg_overlay(std::vector<adnl::AdnlNodeIdShort> nodes, std::map<adnl::AdnlNodeIdShort, int> senders,
std::string name, td::Promise<td::Unit> promise) override;
void del_ext_msg_overlay(std::string name, td::Promise<td::Unit> promise) override;
void update_shard_configuration(td::Ref<MasterchainState> state, std::set<ShardIdFull> shards_to_monitor,
std::set<ShardIdFull> temporary_shards);
@ -82,8 +86,7 @@ class FullNodeImpl : public FullNode {
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise);
void got_key_block_proof(td::Ref<ProofLink> proof);
void got_zero_block_state(td::Ref<ShardState> state);
void got_key_block_state(td::Ref<ShardState> state);
void new_key_block(BlockHandle handle);
void start_up() override;
@ -138,11 +141,21 @@ class FullNodeImpl : public FullNode {
// TODO: Decide what to do with old private overlays. Maybe use old or new depending on some flag in config.
/*
std::map<PublicKeyHash, td::actor::ActorOwn<FullNodePrivateOverlay>> private_block_overlays_;
void update_private_block_overlays();
bool private_block_overlays_enable_compression_ = false;
void set_private_block_overlays_enable_compression(bool value);
void create_private_block_overlay(PublicKeyHash key);
*/
struct ExtMsgOverlayInfo {
std::vector<adnl::AdnlNodeIdShort> nodes_;
std::map<adnl::AdnlNodeIdShort, int> senders_;
std::map<adnl::AdnlNodeIdShort, td::actor::ActorOwn<FullNodePrivateExtMsgOverlay>>
actors_; // our local id -> actor
};
std::map<std::string, ExtMsgOverlayInfo> private_ext_msg_overlays_;
void update_private_overlays();
void update_ext_msg_overlay(const std::string& name, ExtMsgOverlayInfo& overlay);
FullNodePrivateBlockOverlays private_block_overlays_;
};

View file

@ -186,7 +186,12 @@ class Collator final : public td::actor::Actor {
block::ValueFlow value_flow_{block::ValueFlow::SetZero()};
std::unique_ptr<vm::AugmentedDictionary> fees_import_dict_;
std::map<ton::Bits256, int> ext_msg_map;
std::vector<std::pair<Ref<vm::Cell>, ExtMessage::Hash>> ext_msg_list_;
struct ExtMsg {
Ref<vm::Cell> cell;
ExtMessage::Hash hash;
int priority;
};
std::vector<ExtMsg> ext_msg_list_;
std::priority_queue<NewOutMsg, std::vector<NewOutMsg>, std::greater<NewOutMsg>> new_msgs;
std::pair<ton::LogicalTime, ton::Bits256> last_proc_int_msg_, first_unproc_int_msg_;
std::unique_ptr<vm::AugmentedDictionary> in_msg_dict, out_msg_dict, old_out_msg_queue_, out_msg_queue_,
@ -278,8 +283,9 @@ class Collator final : public td::actor::Actor {
bool is_our_address(Ref<vm::CellSlice> addr_ref) const;
bool is_our_address(ton::AccountIdPrefixFull addr_prefix) const;
bool is_our_address(const ton::StdSmcAddress& addr) const;
void after_get_external_messages(td::Result<std::vector<Ref<ExtMessage>>> res);
td::Result<bool> register_external_message_cell(Ref<vm::Cell> ext_msg, const ExtMessage::Hash& ext_hash);
void after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res);
td::Result<bool> register_external_message_cell(Ref<vm::Cell> ext_msg, const ExtMessage::Hash& ext_hash,
int priority);
// td::Result<bool> register_external_message(td::Slice ext_msg_boc);
void register_new_msg(block::NewOutMsg msg);
void register_new_msgs(block::transaction::Transaction& trans);

View file

@ -49,6 +49,7 @@ static const td::uint32 FORCE_SPLIT_QUEUE_SIZE = 4096;
static const td::uint32 SPLIT_MAX_QUEUE_SIZE = 100000;
static const td::uint32 MERGE_MAX_QUEUE_SIZE = 2047;
static const td::uint32 SKIP_EXTERNALS_QUEUE_SIZE = 8000;
static const int HIGH_PRIORITY_EXTERNAL = 10; // don't skip high priority externals when queue is big
#define DBG(__n) dbg(__n)&&
#define DSTART int __dcnt = 0;
@ -255,11 +256,10 @@ void Collator::start_up() {
LOG(DEBUG) << "sending get_external_messages() query to Manager";
++pending;
td::actor::send_closure_later(manager, &ValidatorManager::get_external_messages, shard_,
[self = get_self()](td::Result<std::vector<Ref<ExtMessage>>> res) -> void {
LOG(DEBUG) << "got answer to get_external_messages() query";
td::actor::send_closure_later(
std::move(self), &Collator::after_get_external_messages, std::move(res));
});
[self = get_self()](td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res) -> void {
LOG(DEBUG) << "got answer to get_external_messages() query";
td::actor::send_closure_later(std::move(self), &Collator::after_get_external_messages, std::move(res));
});
}
if (is_masterchain() && !is_hardfork_) {
// 5. load shard block info messages
@ -3548,12 +3548,15 @@ bool Collator::process_inbound_external_messages() {
return true;
}
if (out_msg_queue_size_ > SKIP_EXTERNALS_QUEUE_SIZE) {
LOG(INFO) << "skipping processing of inbound external messages because out_msg_queue is too big ("
LOG(INFO) << "skipping processing of inbound external messages (except for high-priority) because out_msg_queue is "
"too big ("
<< out_msg_queue_size_ << " > " << SKIP_EXTERNALS_QUEUE_SIZE << ")";
return true;
}
bool full = !block_limit_status_->fits(block::ParamLimits::cl_soft);
for (auto& ext_msg_pair : ext_msg_list_) {
for (auto& ext_msg_struct : ext_msg_list_) {
if (out_msg_queue_size_ > SKIP_EXTERNALS_QUEUE_SIZE && ext_msg_struct.priority < HIGH_PRIORITY_EXTERNAL) {
continue;
}
if (full) {
LOG(INFO) << "BLOCK FULL, stop processing external messages";
break;
@ -3562,15 +3565,15 @@ bool Collator::process_inbound_external_messages() {
LOG(WARNING) << "medium timeout reached, stop processing inbound external messages";
break;
}
auto ext_msg = ext_msg_pair.first;
auto ext_msg = ext_msg_struct.cell;
ton::Bits256 hash{ext_msg->get_hash().bits()};
int r = process_external_message(std::move(ext_msg));
if (r < 0) {
bad_ext_msgs_.emplace_back(ext_msg_pair.second);
bad_ext_msgs_.emplace_back(ext_msg_struct.hash);
return false;
}
if (!r) {
delay_ext_msgs_.emplace_back(ext_msg_pair.second);
delay_ext_msgs_.emplace_back(ext_msg_struct.hash);
}
if (r > 0) {
full = !block_limit_status_->fits(block::ParamLimits::cl_soft);
@ -5274,7 +5277,8 @@ void Collator::return_block_candidate(td::Result<td::Unit> saved) {
* - If the external message has been previuosly registered and accepted, returns false.
* - Otherwise returns true.
*/
td::Result<bool> Collator::register_external_message_cell(Ref<vm::Cell> ext_msg, const ExtMessage::Hash& ext_hash) {
td::Result<bool> Collator::register_external_message_cell(Ref<vm::Cell> ext_msg, const ExtMessage::Hash& ext_hash,
int priority) {
if (ext_msg->get_level() != 0) {
return td::Status::Error("external message must have zero level");
}
@ -5318,7 +5322,7 @@ td::Result<bool> Collator::register_external_message_cell(Ref<vm::Cell> ext_msg,
block::gen::t_Message_Any.print_ref(std::cerr, ext_msg);
}
ext_msg_map.emplace(hash, 1);
ext_msg_list_.emplace_back(std::move(ext_msg), ext_hash);
ext_msg_list_.push_back({std::move(ext_msg), ext_hash, priority});
return true;
}
@ -5327,18 +5331,21 @@ td::Result<bool> Collator::register_external_message_cell(Ref<vm::Cell> ext_msg,
*
* @param res The result of the external message retrieval operation.
*/
void Collator::after_get_external_messages(td::Result<std::vector<Ref<ExtMessage>>> res) {
void Collator::after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res) {
// res: pair {ext msg, priority}
--pending;
if (res.is_error()) {
fatal_error(res.move_as_error());
return;
}
auto vect = res.move_as_ok();
for (auto&& ext_msg : vect) {
for (auto& p : vect) {
auto& ext_msg = p.first;
int priority = p.second;
Ref<vm::Cell> ext_msg_cell = ext_msg->root_cell();
bool err = ext_msg_cell.is_null();
if (!err) {
auto reg_res = register_external_message_cell(std::move(ext_msg_cell), ext_msg->hash());
auto reg_res = register_external_message_cell(std::move(ext_msg_cell), ext_msg->hash(), priority);
if (reg_res.is_error() || !reg_res.move_as_ok()) {
err = true;
}

View file

@ -101,7 +101,8 @@ class ValidatorManager : public ValidatorManagerInterface {
td::Promise<td::Ref<MessageQueue>> promise) = 0;
virtual void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::Ref<MessageQueue>> promise) = 0;
virtual void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) = 0;
virtual void get_external_messages(ShardIdFull shard,
td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) = 0;
virtual void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) = 0;
virtual void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) = 0;

View file

@ -259,7 +259,7 @@ void ValidatorManagerImpl::get_key_block_proof_link(BlockIdExt block_id, td::Pro
td::actor::send_closure(db_, &Db::get_key_block_proof, block_id, std::move(P));
}
void ValidatorManagerImpl::new_external_message(td::BufferSlice data) {
void ValidatorManagerImpl::new_external_message(td::BufferSlice data, int priority) {
if (last_masterchain_state_.is_null()) {
return;
}
@ -507,9 +507,13 @@ void ValidatorManagerImpl::wait_block_message_queue_short(BlockIdExt block_id, t
get_block_handle(block_id, true, std::move(P));
}
void ValidatorManagerImpl::get_external_messages(ShardIdFull shard,
td::Promise<std::vector<td::Ref<ExtMessage>>> promise) {
promise.set_result(ext_messages_);
void ValidatorManagerImpl::get_external_messages(
ShardIdFull shard, td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) {
std::vector<std::pair<td::Ref<ExtMessage>, int>> res;
for (const auto& x : ext_messages_) {
res.emplace_back(x, 0);
}
promise.set_result(std::move(res));
}
void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) {

View file

@ -124,7 +124,7 @@ class ValidatorManagerImpl : public ValidatorManager {
void get_key_block_proof_link(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) override;
//void get_block_description(BlockIdExt block_id, td::Promise<BlockDescription> promise) override;
void new_external_message(td::BufferSlice data) override;
void new_external_message(td::BufferSlice data, int priority) override;
void check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) override {
UNREACHABLE();
}
@ -192,7 +192,8 @@ class ValidatorManagerImpl : public ValidatorManager {
td::Promise<td::Ref<MessageQueue>> promise) override;
void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::Ref<MessageQueue>> promise) override;
void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
void get_external_messages(ShardIdFull shard,
td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) override;
void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;
@ -248,7 +249,7 @@ class ValidatorManagerImpl : public ValidatorManager {
UNREACHABLE();
}
void send_external_message(td::Ref<ExtMessage> message) override {
new_external_message(message->serialize());
new_external_message(message->serialize(), 0);
}
void send_ihr_message(td::Ref<IhrMessage> message) override {
new_ihr_message(message->serialize());

View file

@ -150,7 +150,7 @@ void ValidatorManagerImpl::get_key_block_proof_link(BlockIdExt block_id, td::Pro
td::actor::send_closure(db_, &Db::get_key_block_proof_link, block_id, std::move(P));
}
void ValidatorManagerImpl::new_external_message(td::BufferSlice data) {
void ValidatorManagerImpl::new_external_message(td::BufferSlice data, int priority) {
auto R = create_ext_message(std::move(data), block::SizeLimitsConfig::ExtMsgLimits());
if (R.is_ok()) {
ext_messages_.emplace_back(R.move_as_ok());
@ -357,9 +357,13 @@ void ValidatorManagerImpl::wait_block_message_queue_short(BlockIdExt block_id, t
get_block_handle(block_id, true, std::move(P));
}
void ValidatorManagerImpl::get_external_messages(ShardIdFull shard,
td::Promise<std::vector<td::Ref<ExtMessage>>> promise) {
promise.set_result(ext_messages_);
void ValidatorManagerImpl::get_external_messages(
ShardIdFull shard, td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) {
std::vector<std::pair<td::Ref<ExtMessage>, int>> res;
for (const auto &x : ext_messages_) {
res.emplace_back(x, 0);
}
promise.set_result(std::move(res));
}
void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) {

View file

@ -144,7 +144,7 @@ class ValidatorManagerImpl : public ValidatorManager {
void get_key_block_proof(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) override;
void get_key_block_proof_link(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) override;
void new_external_message(td::BufferSlice data) override;
void new_external_message(td::BufferSlice data, int priority) override;
void check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) override {
UNREACHABLE();
}
@ -232,7 +232,8 @@ class ValidatorManagerImpl : public ValidatorManager {
td::Promise<td::Ref<MessageQueue>> promise) override;
void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::Ref<MessageQueue>> promise) override;
void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
void get_external_messages(ShardIdFull shard,
td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) override;
void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;
@ -312,7 +313,7 @@ class ValidatorManagerImpl : public ValidatorManager {
UNREACHABLE();
}
void send_external_message(td::Ref<ExtMessage> message) override {
new_external_message(message->serialize());
new_external_message(message->serialize(), 0);
}
void send_ihr_message(td::Ref<IhrMessage> message) override {
new_ihr_message(message->serialize());

View file

@ -370,7 +370,7 @@ void ValidatorManagerImpl::get_key_block_proof_link(BlockIdExt block_id, td::Pro
td::actor::send_closure(db_, &Db::get_key_block_proof, block_id, std::move(P));
}
void ValidatorManagerImpl::new_external_message(td::BufferSlice data) {
void ValidatorManagerImpl::new_external_message(td::BufferSlice data, int priority) {
if (!is_collator()) {
return;
}
@ -378,7 +378,7 @@ void ValidatorManagerImpl::new_external_message(td::BufferSlice data) {
VLOG(VALIDATOR_NOTICE) << "dropping ext message: validator is not ready";
return;
}
if ((double)ext_messages_.size() > max_mempool_num()) {
if (ext_msgs_[priority].ext_messages_.size() > (size_t)max_mempool_num()) {
return;
}
auto R = create_ext_message(std::move(data), last_masterchain_state_->get_ext_msg_limits());
@ -386,21 +386,30 @@ void ValidatorManagerImpl::new_external_message(td::BufferSlice data) {
VLOG(VALIDATOR_NOTICE) << "dropping bad ext message: " << R.move_as_error();
return;
}
add_external_message(R.move_as_ok());
add_external_message(R.move_as_ok(), priority);
}
void ValidatorManagerImpl::add_external_message(td::Ref<ExtMessage> msg) {
void ValidatorManagerImpl::add_external_message(td::Ref<ExtMessage> msg, int priority) {
auto &msgs = ext_msgs_[priority];
auto message = std::make_unique<MessageExt<ExtMessage>>(msg);
auto id = message->ext_id();
auto address = message->address();
unsigned long per_address_limit = 256;
if (ext_addr_messages_.count(address) < per_address_limit) {
if (ext_messages_hashes_.count(id.hash) == 0) {
ext_messages_.emplace(id, std::move(message));
ext_messages_hashes_.emplace(id.hash, id);
ext_addr_messages_[address].emplace(id.hash, id);
}
auto it = msgs.ext_addr_messages_.find(address);
if (it != msgs.ext_addr_messages_.end() && it->second.size() >= per_address_limit) {
return;
}
auto it2 = ext_messages_hashes_.find(id.hash);
if (it2 != ext_messages_hashes_.end()) {
int old_priority = it2->second.first;
if (old_priority >= priority) {
return;
}
ext_msgs_[old_priority].erase(id);
}
msgs.ext_messages_.emplace(id, std::move(message));
msgs.ext_addr_messages_[address].emplace(id.hash, id);
ext_messages_hashes_[id.hash] = {priority, id};
}
void ValidatorManagerImpl::check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) {
++ls_stats_check_ext_messages_;
@ -894,34 +903,44 @@ void ValidatorManagerImpl::wait_block_message_queue_short(BlockIdExt block_id, t
get_block_handle(block_id, true, std::move(P));
}
void ValidatorManagerImpl::get_external_messages(ShardIdFull shard,
td::Promise<std::vector<td::Ref<ExtMessage>>> promise) {
void ValidatorManagerImpl::get_external_messages(
ShardIdFull shard, td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) {
td::Timer t;
size_t processed = 0, deleted = 0;
std::vector<td::Ref<ExtMessage>> res;
std::vector<std::pair<td::Ref<ExtMessage>, int>> res;
MessageId<ExtMessage> left{AccountIdPrefixFull{shard.workchain, shard.shard & (shard.shard - 1)}, Bits256::zero()};
auto it = ext_messages_.lower_bound(left);
while (it != ext_messages_.end()) {
auto s = it->first;
if (!shard_contains(shard, s.dst)) {
break;
size_t total_msgs = 0;
td::Random::Fast rnd;
for (auto iter = ext_msgs_.rbegin(); iter != ext_msgs_.rend(); ++iter) {
std::vector<std::pair<td::Ref<ExtMessage>, int>> cur_res;
int priority = iter->first;
auto &msgs = iter->second;
auto it = msgs.ext_messages_.lower_bound(left);
while (it != msgs.ext_messages_.end()) {
auto s = it->first;
if (!shard_contains(shard, s.dst)) {
break;
}
++processed;
if (it->second->expired()) {
msgs.ext_addr_messages_[it->second->address()].erase(it->first.hash);
ext_messages_hashes_.erase(it->first.hash);
it = msgs.ext_messages_.erase(it);
++deleted;
continue;
}
if (it->second->is_active()) {
cur_res.emplace_back(it->second->message(), priority);
}
it++;
}
++processed;
if (it->second->expired()) {
ext_addr_messages_[it->second->address()].erase(it->first.hash);
ext_messages_hashes_.erase(it->first.hash);
it = ext_messages_.erase(it);
++deleted;
continue;
}
if (it->second->is_active()) {
res.push_back(it->second->message());
}
it++;
td::random_shuffle(td::as_mutable_span(cur_res), rnd);
res.insert(res.end(), cur_res.begin(), cur_res.end());
total_msgs += msgs.ext_messages_.size();
}
LOG(WARNING) << "get_external_messages to shard " << shard.to_str() << " : time=" << t.elapsed()
<< " result_size=" << res.size() << " processed=" << processed << " expired=" << deleted
<< " total_size=" << ext_messages_.size();
<< " total_size=" << total_msgs;
promise.set_value(std::move(res));
}
@ -964,8 +983,9 @@ void ValidatorManagerImpl::complete_external_messages(std::vector<ExtMessage::Ha
for (auto &hash : to_delete) {
auto it = ext_messages_hashes_.find(hash);
if (it != ext_messages_hashes_.end()) {
ext_addr_messages_[ext_messages_[it->second]->address()].erase(it->first);
CHECK(ext_messages_.erase(it->second));
int priority = it->second.first;
auto msg_id = it->second.second;
ext_msgs_[priority].erase(msg_id);
ext_messages_hashes_.erase(it);
}
}
@ -973,12 +993,14 @@ void ValidatorManagerImpl::complete_external_messages(std::vector<ExtMessage::Ha
for (auto &hash : to_delay) {
auto it = ext_messages_hashes_.find(hash);
if (it != ext_messages_hashes_.end()) {
auto it2 = ext_messages_.find(it->second);
if ((ext_messages_.size() < soft_mempool_limit) && it2->second->can_postpone()) {
int priority = it->second.first;
auto msg_id = it->second.second;
auto &msgs = ext_msgs_[priority];
auto it2 = msgs.ext_messages_.find(msg_id);
if ((msgs.ext_messages_.size() < soft_mempool_limit) && it2->second->can_postpone()) {
it2->second->postpone();
} else {
ext_addr_messages_[it2->second->address()].erase(it2->first.hash);
ext_messages_.erase(it2);
msgs.erase(msg_id);
ext_messages_hashes_.erase(it);
}
}
@ -1574,7 +1596,7 @@ void ValidatorManagerImpl::send_get_next_key_blocks_request(BlockIdExt block_id,
void ValidatorManagerImpl::send_external_message(td::Ref<ExtMessage> message) {
callback_->send_ext_message(message->shard(), message->serialize());
add_external_message(std::move(message));
add_external_message(std::move(message), 0);
}
void ValidatorManagerImpl::send_ihr_message(td::Ref<IhrMessage> message) {
@ -2284,6 +2306,9 @@ td::actor::ActorOwn<ValidatorGroup> ValidatorManagerImpl::create_validator_group
if (check_gc_list_.count(session_id) == 1) {
return td::actor::ActorOwn<ValidatorGroup>{};
} else {
// Call get_external_messages to cleanup mempool for the shard
get_external_messages(shard, [](td::Result<std::vector<std::pair<td::Ref<ExtMessage>, int>>>) {});
auto validator_id = get_validator(shard, validator_set);
CHECK(!validator_id.is_zero());
auto G = td::actor::create_actor<ValidatorGroup>(

View file

@ -237,9 +237,20 @@ class ValidatorManagerImpl : public ValidatorManager {
};
std::map<ShardTopBlockDescriptionId, ShardTopBlock> shard_blocks_;
std::map<BlockIdExt, td::Ref<OutMsgQueueProof>> cached_msg_queue_to_masterchain_;
std::map<MessageId<ExtMessage>, std::unique_ptr<MessageExt<ExtMessage>>> ext_messages_;
std::map<std::pair<ton::WorkchainId,ton::StdSmcAddress>, std::map<ExtMessage::Hash, MessageId<ExtMessage>>> ext_addr_messages_;
std::map<ExtMessage::Hash, MessageId<ExtMessage>> ext_messages_hashes_;
struct ExtMessages {
std::map<MessageId<ExtMessage>, std::unique_ptr<MessageExt<ExtMessage>>> ext_messages_;
std::map<std::pair<ton::WorkchainId, ton::StdSmcAddress>, std::map<ExtMessage::Hash, MessageId<ExtMessage>>>
ext_addr_messages_;
void erase(const MessageId<ExtMessage>& id) {
auto it = ext_messages_.find(id);
CHECK(it != ext_messages_.end());
ext_addr_messages_[it->second->address()].erase(id.hash);
ext_messages_.erase(it);
}
};
std::map<int, ExtMessages> ext_msgs_; // priority -> messages
std::map<ExtMessage::Hash, std::pair<int, MessageId<ExtMessage>>> ext_messages_hashes_; // hash -> priority
// IHR ?
std::map<MessageId<IhrMessage>, std::unique_ptr<MessageExt<IhrMessage>>> ihr_messages_;
std::map<IhrMessage::Hash, MessageId<IhrMessage>> ihr_messages_hashes_;
@ -366,8 +377,8 @@ class ValidatorManagerImpl : public ValidatorManager {
void get_key_block_proof_link(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) override;
//void get_block_description(BlockIdExt block_id, td::Promise<BlockDescription> promise) override;
void new_external_message(td::BufferSlice data) override;
void add_external_message(td::Ref<ExtMessage> message);
void new_external_message(td::BufferSlice data, int priority) override;
void add_external_message(td::Ref<ExtMessage> message, int priority);
void check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) override;
void new_ihr_message(td::BufferSlice data) override;
@ -429,7 +440,8 @@ class ValidatorManagerImpl : public ValidatorManager {
td::Promise<td::Ref<MessageQueue>> promise) override;
void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::Ref<MessageQueue>> promise) override;
void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
void get_external_messages(ShardIdFull shard,
td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) override;
void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
void get_shard_blocks_for_collator(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;

View file

@ -202,7 +202,7 @@ class ValidatorManagerInterface : public td::actor::Actor {
virtual void get_next_block(BlockIdExt block_id, td::Promise<BlockHandle> promise) = 0;
virtual void write_handle(BlockHandle handle, td::Promise<td::Unit> promise) = 0;
virtual void new_external_message(td::BufferSlice data) = 0;
virtual void new_external_message(td::BufferSlice data, int priority) = 0;
virtual void check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) = 0;
virtual void new_ihr_message(td::BufferSlice data) = 0;
virtual void new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) = 0;