1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Add custom overlays for external messages (#949)

* Private overlay for external messages

* Improve ext msg overlays

* Manage from validator console
* Bypass out queue size limit for high-priority messages
* Shuffle messages in get_external_messages

* Cleanup mempool when creating validator group

* Improve private overlays for externals

1. Allow using validator adnl ids in addition to fullnode ids
2. Set priority per sender, not per overlay
3. Require the same overlay name for all nodes
4. Enable lz4 in private block overlay

* Fix typo, add debug logs

* Enable lz4 in private block overlay by config

Change proto_version for lz4 in catchain overlays to 4

* Add logs for broadcasts in fullnode

---------

Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
EmelyanenkoK 2024-04-01 16:44:08 +03:00 committed by GitHub
parent 46ca0e6014
commit 0434eadc1f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
32 changed files with 843 additions and 227 deletions

View file

@ -19,26 +19,25 @@
#include "common/delay.h"
#include "full-node-serializer.hpp"
namespace ton {
namespace ton::validator::fullnode {
namespace validator {
namespace fullnode {
void FullNodePrivateOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) {
void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) {
process_block_broadcast(src, query);
}
void FullNodePrivateOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query) {
void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src,
ton_api::tonNode_blockBroadcastCompressed &query) {
process_block_broadcast(src, query);
}
void FullNodePrivateOverlay::process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) {
void FullNodePrivateBlockOverlay::process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) {
auto B = deserialize_block_broadcast(query, overlay::Overlays::max_fec_broadcast_size());
if (B.is_error()) {
LOG(DEBUG) << "dropped broadcast: " << B.move_as_error();
return;
}
VLOG(FULL_NODE_DEBUG) << "Received block broadcast in private overlay from " << src << ": "
<< B.ok().block_id.to_str();
auto P = td::PromiseCreator::lambda([](td::Result<td::Unit> R) {
if (R.is_error()) {
if (R.error().code() == ErrorCode::notready) {
@ -52,25 +51,28 @@ void FullNodePrivateOverlay::process_block_broadcast(PublicKeyHash src, ton_api:
std::move(P));
}
void FullNodePrivateOverlay::process_broadcast(PublicKeyHash, ton_api::tonNode_newShardBlockBroadcast &query) {
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block,
create_block_id(query.block_->block_), query.block_->cc_seqno_,
std::move(query.block_->data_));
void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) {
BlockIdExt block_id = create_block_id(query.block_->block_);
VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast in private overlay from " << src << ": "
<< block_id.to_str();
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, block_id,
query.block_->cc_seqno_, std::move(query.block_->data_));
}
void FullNodePrivateOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
void FullNodePrivateBlockOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
return;
}
ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); });
}
void FullNodePrivateOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) {
void FullNodePrivateBlockOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno,
td::BufferSlice data) {
if (!inited_) {
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending newShardBlockBroadcast in private overlay: " << block_id.to_str();
auto B = create_serialize_tl_object<ton_api::tonNode_newShardBlockBroadcast>(
create_tl_object<ton_api::tonNode_newShardBlock>(create_tl_block_id(block_id), cc_seqno, std::move(data)));
if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
@ -82,11 +84,13 @@ void FullNodePrivateOverlay::send_shard_block_info(BlockIdExt block_id, Catchain
}
}
void FullNodePrivateOverlay::send_broadcast(BlockBroadcast broadcast) {
void FullNodePrivateBlockOverlay::send_broadcast(BlockBroadcast broadcast) {
if (!inited_) {
return;
}
auto B = serialize_block_broadcast(broadcast, false); // compression_enabled = false
VLOG(FULL_NODE_DEBUG) << "Sending block broadcast in private overlay"
<< (enable_compression_ ? " (with compression)" : "") << ": " << broadcast.block_id.to_str();
auto B = serialize_block_broadcast(broadcast, enable_compression_);
if (B.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error();
return;
@ -95,7 +99,7 @@ void FullNodePrivateOverlay::send_broadcast(BlockBroadcast broadcast) {
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}
void FullNodePrivateOverlay::start_up() {
void FullNodePrivateBlockOverlay::start_up() {
std::sort(nodes_.begin(), nodes_.end());
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());
@ -112,22 +116,22 @@ void FullNodePrivateOverlay::start_up() {
try_init();
}
void FullNodePrivateOverlay::try_init() {
void FullNodePrivateBlockOverlay::try_init() {
// Sometimes adnl id is added to validator engine later (or not at all)
td::actor::send_closure(
adnl_, &adnl::Adnl::check_id_exists, local_id_, [SelfId = actor_id(this)](td::Result<bool> R) {
if (R.is_ok() && R.ok()) {
td::actor::send_closure(SelfId, &FullNodePrivateOverlay::init);
td::actor::send_closure(SelfId, &FullNodePrivateBlockOverlay::init);
} else {
delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateOverlay::try_init); },
delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateBlockOverlay::try_init); },
td::Timestamp::in(30.0));
}
});
}
void FullNodePrivateOverlay::init() {
LOG(FULL_NODE_INFO) << "Creating private block overlay for adnl id " << local_id_ << " : " << nodes_.size()
<< " nodes";
void FullNodePrivateBlockOverlay::init() {
LOG(FULL_NODE_WARNING) << "Creating private block overlay for adnl id " << local_id_ << " : " << nodes_.size()
<< " nodes, overlay_id=" << overlay_id_;
class Callback : public overlay::Overlays::Callback {
public:
void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
@ -136,37 +140,140 @@ void FullNodePrivateOverlay::init() {
td::Promise<td::BufferSlice> promise) override {
}
void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
td::actor::send_closure(node_, &FullNodePrivateOverlay::receive_broadcast, src, std::move(data));
td::actor::send_closure(node_, &FullNodePrivateBlockOverlay::receive_broadcast, src, std::move(data));
}
void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
td::Promise<td::Unit> promise) override {
}
Callback(td::actor::ActorId<FullNodePrivateOverlay> node) : node_(node) {
Callback(td::actor::ActorId<FullNodePrivateBlockOverlay> node) : node_(node) {
}
private:
td::actor::ActorId<FullNodePrivateOverlay> node_;
td::actor::ActorId<FullNodePrivateBlockOverlay> node_;
};
overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(),
overlay::CertificateFlags::AllowFec | overlay::CertificateFlags::Trusted,
{}};
td::actor::send_closure(overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(),
nodes_, std::make_unique<Callback>(actor_id(this)), rules);
nodes_, std::make_unique<Callback>(actor_id(this)), rules, R"({ "type": "private-blocks" })");
td::actor::send_closure(rldp_, &rldp::Rldp::add_id, local_id_);
td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, local_id_);
inited_ = true;
}
void FullNodePrivateOverlay::tear_down() {
void FullNodePrivateBlockOverlay::tear_down() {
if (inited_) {
td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, local_id_, overlay_id_);
}
}
} // namespace fullnode
void FullNodePrivateExtMsgOverlay::process_broadcast(PublicKeyHash src,
ton_api::tonNode_externalMessageBroadcast &query) {
auto it = senders_.find(adnl::AdnlNodeIdShort{src});
if (it == senders_.end()) {
return;
}
LOG(FULL_NODE_DEBUG) << "Got external message in private overlay \"" << name_ << "\" from " << src
<< " (priority=" << it->second << ")";
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_external_message,
std::move(query.message_->data_), it->second);
}
} // namespace validator
void FullNodePrivateExtMsgOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
return;
}
ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); });
}
} // namespace ton
void FullNodePrivateExtMsgOverlay::send_external_message(td::BufferSlice data) {
if (config_.ext_messages_broadcast_disabled_) {
return;
}
LOG(FULL_NODE_DEBUG) << "Sending external message to private overlay \"" << name_ << "\"";
auto B = create_serialize_tl_object<ton_api::tonNode_externalMessageBroadcast>(
create_tl_object<ton_api::tonNode_externalMessage>(std::move(data)));
if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(B));
} else {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(B));
}
}
void FullNodePrivateExtMsgOverlay::start_up() {
std::sort(nodes_.begin(), nodes_.end());
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());
std::vector<td::Bits256> nodes;
for (const adnl::AdnlNodeIdShort &id : nodes_) {
nodes.push_back(id.bits256_value());
}
auto X =
create_hash_tl_object<ton_api::tonNode_privateExtMsgsOverlayId>(zero_state_file_hash_, name_, std::move(nodes));
td::BufferSlice b{32};
b.as_slice().copy_from(as_slice(X));
overlay_id_full_ = overlay::OverlayIdFull{std::move(b)};
overlay_id_ = overlay_id_full_.compute_short_id();
try_init();
}
void FullNodePrivateExtMsgOverlay::try_init() {
// Sometimes adnl id is added to validator engine later (or not at all)
td::actor::send_closure(
adnl_, &adnl::Adnl::check_id_exists, local_id_, [SelfId = actor_id(this)](td::Result<bool> R) {
if (R.is_ok() && R.ok()) {
td::actor::send_closure(SelfId, &FullNodePrivateExtMsgOverlay::init);
} else {
delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateExtMsgOverlay::try_init); },
td::Timestamp::in(30.0));
}
});
}
void FullNodePrivateExtMsgOverlay::init() {
LOG(FULL_NODE_WARNING) << "Creating private ext msg overlay \"" << name_ << "\" for adnl id " << local_id_ << " : "
<< nodes_.size() << " nodes, overlay_id=" << overlay_id_;
class Callback : public overlay::Overlays::Callback {
public:
void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
}
void receive_query(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
td::Promise<td::BufferSlice> promise) override {
}
void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
td::actor::send_closure(node_, &FullNodePrivateExtMsgOverlay::receive_broadcast, src, std::move(data));
}
void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
td::Promise<td::Unit> promise) override {
}
Callback(td::actor::ActorId<FullNodePrivateExtMsgOverlay> node) : node_(node) {
}
private:
td::actor::ActorId<FullNodePrivateExtMsgOverlay> node_;
};
std::map<PublicKeyHash, td::uint32> authorized_keys;
for (const auto &sender : senders_) {
authorized_keys[sender.first.pubkey_hash()] = overlay::Overlays::max_fec_broadcast_size();
}
overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(), 0, std::move(authorized_keys)};
td::actor::send_closure(
overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(), nodes_,
std::make_unique<Callback>(actor_id(this)), rules,
PSTRING() << R"({ "type": "private-ext-msg", "name": ")" << td::format::Escaped{name_} << R"(" })");
td::actor::send_closure(rldp_, &rldp::Rldp::add_id, local_id_);
td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, local_id_);
}
void FullNodePrivateExtMsgOverlay::tear_down() {
LOG(FULL_NODE_WARNING) << "Destroying private ext msg overlay \"" << name_ << "\" for adnl id " << local_id_;
td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, local_id_, overlay_id_);
}
} // namespace ton::validator::fullnode

View file

@ -18,13 +18,9 @@
#include "full-node.h"
namespace ton {
namespace ton::validator::fullnode {
namespace validator {
namespace fullnode {
class FullNodePrivateOverlay : public td::actor::Actor {
class FullNodePrivateBlockOverlay : public td::actor::Actor {
public:
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query);
@ -40,19 +36,89 @@ class FullNodePrivateOverlay : public td::actor::Actor {
void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data);
void send_broadcast(BlockBroadcast broadcast);
void set_config(FullNodeConfig config) {
config_ = std::move(config);
}
void set_enable_compression(bool value) {
enable_compression_ = value;
}
void start_up() override;
void tear_down() override;
FullNodePrivateOverlay(adnl::AdnlNodeIdShort local_id, std::vector<adnl::AdnlNodeIdShort> nodes,
FileHash zero_state_file_hash, FullNodeConfig config,
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager)
FullNodePrivateBlockOverlay(adnl::AdnlNodeIdShort local_id, std::vector<adnl::AdnlNodeIdShort> nodes,
FileHash zero_state_file_hash, FullNodeConfig config, bool enable_compression,
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager)
: local_id_(local_id)
, nodes_(std::move(nodes))
, zero_state_file_hash_(zero_state_file_hash)
, config_(config)
, enable_compression_(enable_compression)
, keyring_(keyring)
, adnl_(adnl)
, rldp_(rldp)
, rldp2_(rldp2)
, overlays_(overlays)
, validator_manager_(validator_manager) {
}
private:
adnl::AdnlNodeIdShort local_id_;
std::vector<adnl::AdnlNodeIdShort> nodes_;
FileHash zero_state_file_hash_;
FullNodeConfig config_;
bool enable_compression_;
td::actor::ActorId<keyring::Keyring> keyring_;
td::actor::ActorId<adnl::Adnl> adnl_;
td::actor::ActorId<rldp::Rldp> rldp_;
td::actor::ActorId<rldp2::Rldp> rldp2_;
td::actor::ActorId<overlay::Overlays> overlays_;
td::actor::ActorId<ValidatorManagerInterface> validator_manager_;
bool inited_ = false;
overlay::OverlayIdFull overlay_id_full_;
overlay::OverlayIdShort overlay_id_;
void try_init();
void init();
};
class FullNodePrivateExtMsgOverlay : public td::actor::Actor {
public:
void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query);
template <class T>
void process_broadcast(PublicKeyHash, T &) {
VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast";
}
void receive_broadcast(PublicKeyHash src, td::BufferSlice query);
void send_external_message(td::BufferSlice data);
void set_config(FullNodeConfig config) {
config_ = std::move(config);
}
void start_up() override;
void tear_down() override;
FullNodePrivateExtMsgOverlay(adnl::AdnlNodeIdShort local_id, std::vector<adnl::AdnlNodeIdShort> nodes,
std::map<adnl::AdnlNodeIdShort, int> senders, std::string name,
FileHash zero_state_file_hash, FullNodeConfig config,
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager)
: local_id_(local_id)
, nodes_(std::move(nodes))
, senders_(std::move(senders))
, name_(std::move(name))
, zero_state_file_hash_(zero_state_file_hash)
, config_(config)
, keyring_(keyring)
, adnl_(adnl)
, rldp_(rldp)
@ -64,6 +130,8 @@ class FullNodePrivateOverlay : public td::actor::Actor {
private:
adnl::AdnlNodeIdShort local_id_;
std::vector<adnl::AdnlNodeIdShort> nodes_;
std::map<adnl::AdnlNodeIdShort, int> senders_;
std::string name_;
FileHash zero_state_file_hash_;
FullNodeConfig config_;
@ -82,8 +150,4 @@ class FullNodePrivateOverlay : public td::actor::Actor {
void init();
};
} // namespace fullnode
} // namespace validator
} // namespace ton
} // namespace ton::validator::fullnode

View file

@ -118,7 +118,7 @@ void FullNodeShardImpl::check_broadcast(PublicKeyHash src, td::BufferSlice broad
promise.set_error(td::Status::Error("rebroadcasting external messages is disabled"));
promise = [manager = validator_manager_, message = q->message_->data_.clone()](td::Result<td::Unit> R) mutable {
if (R.is_ok()) {
td::actor::send_closure(manager, &ValidatorManagerInterface::new_external_message, std::move(message));
td::actor::send_closure(manager, &ValidatorManagerInterface::new_external_message, std::move(message), 0);
}
};
}
@ -636,13 +636,14 @@ void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_ih
void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query) {
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_external_message,
std::move(query.message_->data_));
std::move(query.message_->data_), 0);
}
void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) {
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block,
create_block_id(query.block_->block_), query.block_->cc_seqno_,
std::move(query.block_->data_));
BlockIdExt block_id = create_block_id(query.block_->block_);
VLOG(FULL_NODE_DEBUG) << "Received newShardBlockBroadcast from " << src << ": " << block_id.to_str();
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block, block_id,
query.block_->cc_seqno_, std::move(query.block_->data_));
}
void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) {
@ -659,6 +660,7 @@ void FullNodeShardImpl::process_block_broadcast(PublicKeyHash src, ton_api::tonN
LOG(DEBUG) << "dropped broadcast: " << B.move_as_error();
return;
}
VLOG(FULL_NODE_DEBUG) << "Received block broadcast from " << src << ": " << B.ok().block_id.to_str();
auto P = td::PromiseCreator::lambda([](td::Result<td::Unit> R) {
if (R.is_error()) {
if (R.error().code() == ErrorCode::notready) {
@ -734,6 +736,7 @@ void FullNodeShardImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno
UNREACHABLE();
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending newShardBlockBroadcast: " << block_id.to_str();
auto B = create_serialize_tl_object<ton_api::tonNode_newShardBlockBroadcast>(
create_tl_object<ton_api::tonNode_newShardBlock>(create_tl_block_id(block_id), cc_seqno, std::move(data)));
if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
@ -750,6 +753,7 @@ void FullNodeShardImpl::send_broadcast(BlockBroadcast broadcast) {
UNREACHABLE();
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending block broadcast in private overlay: " << broadcast.block_id.to_str();
auto B = serialize_block_broadcast(broadcast, false); // compression_enabled = false
if (B.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error();

View file

@ -35,6 +35,10 @@ void FullNodeImpl::add_permanent_key(PublicKeyHash key, td::Promise<td::Unit> pr
}
local_keys_.insert(key);
create_private_block_overlay(key);
for (auto &p : private_ext_msg_overlays_) {
update_ext_msg_overlay(p.first, p.second);
}
if (!sign_cert_by_.is_zero()) {
promise.set_value(td::Unit());
@ -50,7 +54,6 @@ void FullNodeImpl::add_permanent_key(PublicKeyHash key, td::Promise<td::Unit> pr
for (auto &shard : shards_) {
td::actor::send_closure(shard.second, &FullNodeShard::update_validators, all_validators_, sign_cert_by_);
}
create_private_block_overlay(key);
promise.set_value(td::Unit());
}
@ -60,6 +63,11 @@ void FullNodeImpl::del_permanent_key(PublicKeyHash key, td::Promise<td::Unit> pr
return;
}
local_keys_.erase(key);
private_block_overlays_.erase(key);
for (auto &p : private_ext_msg_overlays_) {
update_ext_msg_overlay(p.first, p.second);
}
if (sign_cert_by_ != key) {
promise.set_value(td::Unit());
return;
@ -75,7 +83,6 @@ void FullNodeImpl::del_permanent_key(PublicKeyHash key, td::Promise<td::Unit> pr
for (auto &shard : shards_) {
td::actor::send_closure(shard.second, &FullNodeShard::update_validators, all_validators_, sign_cert_by_);
}
private_block_overlays_.erase(key);
promise.set_value(td::Unit());
}
@ -111,6 +118,10 @@ void FullNodeImpl::update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td:
td::actor::send_closure(s.second, &FullNodeShard::update_adnl_id, adnl_id, ig.get_promise());
}
local_id_ = adnl_id_.pubkey_hash();
for (auto &p : private_ext_msg_overlays_) {
update_ext_msg_overlay(p.first, p.second);
}
}
void FullNodeImpl::set_config(FullNodeConfig config) {
@ -118,6 +129,44 @@ void FullNodeImpl::set_config(FullNodeConfig config) {
for (auto& shard : shards_) {
td::actor::send_closure(shard.second, &FullNodeShard::set_config, config);
}
for (auto& overlay : private_block_overlays_) {
td::actor::send_closure(overlay.second, &FullNodePrivateBlockOverlay::set_config, config);
}
for (auto& overlay : private_ext_msg_overlays_) {
for (auto &actor : overlay.second.actors_) {
td::actor::send_closure(actor.second, &FullNodePrivateExtMsgOverlay::set_config, config);
}
}
}
void FullNodeImpl::add_ext_msg_overlay(std::vector<adnl::AdnlNodeIdShort> nodes,
std::map<adnl::AdnlNodeIdShort, int> senders, std::string name,
td::Promise<td::Unit> promise) {
if (nodes.empty()) {
promise.set_error(td::Status::Error("list of nodes is empty"));
return;
}
if (private_ext_msg_overlays_.count(name)) {
promise.set_error(td::Status::Error(PSTRING() << "duplicate overlay name \"" << name << "\""));
return;
}
VLOG(FULL_NODE_WARNING) << "Adding private overlay for external messages \"" << name << "\", " << nodes.size()
<< " nodes";
auto &p = private_ext_msg_overlays_[name];
p.nodes_ = nodes;
p.senders_ = senders;
update_ext_msg_overlay(name, p);
promise.set_result(td::Unit());
}
void FullNodeImpl::del_ext_msg_overlay(std::string name, td::Promise<td::Unit> promise) {
auto it = private_ext_msg_overlays_.find(name);
if (it == private_ext_msg_overlays_.end()) {
promise.set_error(td::Status::Error(PSTRING() << "no such overlay \"" << name << "\""));
return;
}
private_ext_msg_overlays_.erase(it);
promise.set_result(td::Unit());
}
void FullNodeImpl::initial_read_complete(BlockHandle top_handle) {
@ -172,6 +221,14 @@ void FullNodeImpl::send_ext_message(AccountIdPrefixFull dst, td::BufferSlice dat
VLOG(FULL_NODE_WARNING) << "dropping OUT ext message to unknown shard";
return;
}
for (auto &private_overlay : private_ext_msg_overlays_) {
for (auto &actor : private_overlay.second.actors_) {
auto local_id = actor.first;
if (private_overlay.second.senders_.count(local_id)) {
td::actor::send_closure(actor.second, &FullNodePrivateExtMsgOverlay::send_external_message, data.clone());
}
}
}
td::actor::send_closure(shard, &FullNodeShard::send_external_message, std::move(data));
}
@ -182,8 +239,8 @@ void FullNodeImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_s
return;
}
if (!private_block_overlays_.empty()) {
td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateOverlay::send_shard_block_info,
block_id, cc_seqno, data.clone());
td::actor::send_closure(private_block_overlays_.begin()->second,
&FullNodePrivateBlockOverlay::send_shard_block_info, block_id, cc_seqno, data.clone());
}
td::actor::send_closure(shard, &FullNodeShard::send_shard_block_info, block_id, cc_seqno, std::move(data));
}
@ -195,7 +252,7 @@ void FullNodeImpl::send_broadcast(BlockBroadcast broadcast) {
return;
}
if (!private_block_overlays_.empty()) {
td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateOverlay::send_broadcast,
td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateBlockOverlay::send_broadcast,
broadcast.clone());
}
td::actor::send_closure(shard, &FullNodeShard::send_broadcast, std::move(broadcast));
@ -292,50 +349,7 @@ td::actor::ActorId<FullNodeShard> FullNodeImpl::get_shard(AccountIdPrefixFull ds
return get_shard(shard_prefix(dst, 60));
}
void FullNodeImpl::got_key_block_proof(td::Ref<ProofLink> proof) {
auto R = proof->get_key_block_config();
R.ensure();
auto config = R.move_as_ok();
PublicKeyHash l = PublicKeyHash::zero();
std::vector<PublicKeyHash> keys;
std::map<PublicKeyHash, adnl::AdnlNodeIdShort> current_validators;
for (td::int32 i = -1; i <= 1; i++) {
auto r = config->get_total_validator_set(i < 0 ? i : 1 - i);
if (r.not_null()) {
auto vec = r->export_vector();
for (auto &el : vec) {
auto key = ValidatorFullId{el.key}.compute_short_id();
keys.push_back(key);
if (local_keys_.count(key)) {
l = key;
}
if (i == 1) {
current_validators[key] = adnl::AdnlNodeIdShort{el.addr.is_zero() ? key.bits256_value() : el.addr};
}
}
}
}
if (current_validators != current_validators_) {
current_validators_ = std::move(current_validators);
update_private_block_overlays();
}
if (keys == all_validators_) {
return;
}
all_validators_ = keys;
sign_cert_by_ = l;
CHECK(all_validators_.size() > 0);
for (auto &shard : shards_) {
td::actor::send_closure(shard.second, &FullNodeShard::update_validators, all_validators_, sign_cert_by_);
}
}
void FullNodeImpl::got_zero_block_state(td::Ref<ShardState> state) {
void FullNodeImpl::got_key_block_state(td::Ref<ShardState> state) {
auto m = td::Ref<MasterchainState>{std::move(state)};
PublicKeyHash l = PublicKeyHash::zero();
@ -358,9 +372,11 @@ void FullNodeImpl::got_zero_block_state(td::Ref<ShardState> state) {
}
}
set_private_block_overlays_enable_compression(m->get_consensus_config().proto_version >= 3);
if (current_validators != current_validators_) {
current_validators_ = std::move(current_validators);
update_private_block_overlays();
update_private_overlays();
}
if (keys == all_validators_) {
@ -377,28 +393,15 @@ void FullNodeImpl::got_zero_block_state(td::Ref<ShardState> state) {
}
void FullNodeImpl::new_key_block(BlockHandle handle) {
if (handle->id().seqno() == 0) {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Ref<ShardState>> R) {
if (R.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to get zero state: " << R.move_as_error();
} else {
td::actor::send_closure(SelfId, &FullNodeImpl::got_zero_block_state, R.move_as_ok());
}
});
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_shard_state_from_db, handle,
std::move(P));
} else {
CHECK(handle->is_key_block());
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Ref<ProofLink>> R) {
if (R.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to get key block proof: " << R.move_as_error();
} else {
td::actor::send_closure(SelfId, &FullNodeImpl::got_key_block_proof, R.move_as_ok());
}
});
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_block_proof_link_from_db, handle,
std::move(P));
}
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Ref<ShardState>> R) {
if (R.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to get key block state: " << R.move_as_error();
} else {
td::actor::send_closure(SelfId, &FullNodeImpl::got_key_block_state, R.move_as_ok());
}
});
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::get_shard_state_from_db, handle,
std::move(P));
}
void FullNodeImpl::start_up() {
@ -484,7 +487,11 @@ void FullNodeImpl::start_up() {
std::make_unique<Callback>(actor_id(this)), std::move(P));
}
void FullNodeImpl::update_private_block_overlays() {
void FullNodeImpl::update_private_overlays() {
for (auto &p : private_ext_msg_overlays_) {
update_ext_msg_overlay(p.first, p.second);
}
private_block_overlays_.clear();
if (local_keys_.empty()) {
return;
@ -494,6 +501,16 @@ void FullNodeImpl::update_private_block_overlays() {
}
}
void FullNodeImpl::set_private_block_overlays_enable_compression(bool value) {
if (private_block_overlays_enable_compression_ == value) {
return;
}
private_block_overlays_enable_compression_ = true;
for (auto &p : private_block_overlays_) {
td::actor::send_closure(p.second, &FullNodePrivateBlockOverlay::set_enable_compression, value);
}
}
void FullNodeImpl::create_private_block_overlay(PublicKeyHash key) {
CHECK(local_keys_.count(key));
if (current_validators_.count(key)) {
@ -501,9 +518,34 @@ void FullNodeImpl::create_private_block_overlay(PublicKeyHash key) {
for (const auto &p : current_validators_) {
nodes.push_back(p.second);
}
private_block_overlays_[key] = td::actor::create_actor<FullNodePrivateOverlay>(
"BlocksPrivateOverlay", current_validators_[key], std::move(nodes), zero_state_file_hash_, config_, keyring_,
adnl_, rldp_, rldp2_, overlays_, validator_manager_);
private_block_overlays_[key] = td::actor::create_actor<FullNodePrivateBlockOverlay>(
"BlocksPrivateOverlay", current_validators_[key], std::move(nodes), zero_state_file_hash_, config_,
private_block_overlays_enable_compression_, keyring_, adnl_, rldp_, rldp2_, overlays_, validator_manager_);
}
}
void FullNodeImpl::update_ext_msg_overlay(const std::string &name, ExtMsgOverlayInfo &overlay) {
auto old_actors = std::move(overlay.actors_);
overlay.actors_.clear();
auto try_local_id = [&](const adnl::AdnlNodeIdShort &local_id) {
if (std::find(overlay.nodes_.begin(), overlay.nodes_.end(), local_id) != overlay.nodes_.end()) {
auto it = old_actors.find(local_id);
if (it != old_actors.end()) {
overlay.actors_[local_id] = std::move(it->second);
old_actors.erase(it);
} else {
overlay.actors_[local_id] = td::actor::create_actor<FullNodePrivateExtMsgOverlay>(
"ExtMsgPrivateOverlay", local_id, overlay.nodes_, overlay.senders_, name, zero_state_file_hash_, config_,
keyring_, adnl_, rldp_, rldp2_, overlays_, validator_manager_);
}
}
};
try_local_id(adnl_id_);
for (const PublicKeyHash &local_key: local_keys_) {
auto it = current_validators_.find(local_key);
if (it != current_validators_.end()) {
try_local_id(it->second);
}
}
}

View file

@ -74,6 +74,11 @@ class FullNode : public td::actor::Actor {
virtual void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td::Unit> promise) = 0;
virtual void set_config(FullNodeConfig config) = 0;
virtual void add_ext_msg_overlay(std::vector<adnl::AdnlNodeIdShort> nodes,
std::map<adnl::AdnlNodeIdShort, int> senders, std::string name,
td::Promise<td::Unit> promise) = 0;
virtual void del_ext_msg_overlay(std::string name, td::Promise<td::Unit> promise) = 0;
static constexpr td::uint32 max_block_size() {
return 4 << 20;
}

View file

@ -53,6 +53,10 @@ class FullNodeImpl : public FullNode {
void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td::Unit> promise) override;
void set_config(FullNodeConfig config) override;
void add_ext_msg_overlay(std::vector<adnl::AdnlNodeIdShort> nodes, std::map<adnl::AdnlNodeIdShort, int> senders,
std::string name, td::Promise<td::Unit> promise) override;
void del_ext_msg_overlay(std::string name, td::Promise<td::Unit> promise) override;
void add_shard(ShardIdFull shard);
void del_shard(ShardIdFull shard);
@ -76,8 +80,7 @@ class FullNodeImpl : public FullNode {
void download_archive(BlockSeqno masterchain_seqno, std::string tmp_dir, td::Timestamp timeout,
td::Promise<std::string> promise);
void got_key_block_proof(td::Ref<ProofLink> proof);
void got_zero_block_state(td::Ref<ShardState> state);
void got_key_block_state(td::Ref<ShardState> state);
void new_key_block(BlockHandle handle);
void start_up() override;
@ -117,10 +120,21 @@ class FullNodeImpl : public FullNode {
std::set<PublicKeyHash> local_keys_;
FullNodeConfig config_;
std::map<PublicKeyHash, td::actor::ActorOwn<FullNodePrivateOverlay>> private_block_overlays_;
std::map<PublicKeyHash, td::actor::ActorOwn<FullNodePrivateBlockOverlay>> private_block_overlays_;
bool private_block_overlays_enable_compression_ = false;
void update_private_block_overlays();
struct ExtMsgOverlayInfo {
std::vector<adnl::AdnlNodeIdShort> nodes_;
std::map<adnl::AdnlNodeIdShort, int> senders_;
std::map<adnl::AdnlNodeIdShort, td::actor::ActorOwn<FullNodePrivateExtMsgOverlay>>
actors_; // our local id -> actor
};
std::map<std::string, ExtMsgOverlayInfo> private_ext_msg_overlays_;
void update_private_overlays();
void set_private_block_overlays_enable_compression(bool value);
void create_private_block_overlay(PublicKeyHash key);
void update_ext_msg_overlay(const std::string& name, ExtMsgOverlayInfo& overlay);
};
} // namespace fullnode

View file

@ -183,7 +183,12 @@ class Collator final : public td::actor::Actor {
block::ValueFlow value_flow_{block::ValueFlow::SetZero()};
std::unique_ptr<vm::AugmentedDictionary> fees_import_dict_;
std::map<ton::Bits256, int> ext_msg_map;
std::vector<std::pair<Ref<vm::Cell>, ExtMessage::Hash>> ext_msg_list_;
struct ExtMsg {
Ref<vm::Cell> cell;
ExtMessage::Hash hash;
int priority;
};
std::vector<ExtMsg> ext_msg_list_;
std::priority_queue<NewOutMsg, std::vector<NewOutMsg>, std::greater<NewOutMsg>> new_msgs;
std::pair<ton::LogicalTime, ton::Bits256> last_proc_int_msg_, first_unproc_int_msg_;
std::unique_ptr<vm::AugmentedDictionary> in_msg_dict, out_msg_dict, out_msg_queue_, sibling_out_msg_queue_;
@ -268,8 +273,9 @@ class Collator final : public td::actor::Actor {
bool is_our_address(Ref<vm::CellSlice> addr_ref) const;
bool is_our_address(ton::AccountIdPrefixFull addr_prefix) const;
bool is_our_address(const ton::StdSmcAddress& addr) const;
void after_get_external_messages(td::Result<std::vector<Ref<ExtMessage>>> res);
td::Result<bool> register_external_message_cell(Ref<vm::Cell> ext_msg, const ExtMessage::Hash& ext_hash);
void after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res);
td::Result<bool> register_external_message_cell(Ref<vm::Cell> ext_msg, const ExtMessage::Hash& ext_hash,
int priority);
// td::Result<bool> register_external_message(td::Slice ext_msg_boc);
void register_new_msg(block::NewOutMsg msg);
void register_new_msgs(block::transaction::Transaction& trans);

View file

@ -49,6 +49,7 @@ static const td::uint32 FORCE_SPLIT_QUEUE_SIZE = 4096;
static const td::uint32 SPLIT_MAX_QUEUE_SIZE = 100000;
static const td::uint32 MERGE_MAX_QUEUE_SIZE = 2047;
static const td::uint32 SKIP_EXTERNALS_QUEUE_SIZE = 8000;
static const int HIGH_PRIORITY_EXTERNAL = 10; // don't skip high priority externals when queue is big
#define DBG(__n) dbg(__n)&&
#define DSTART int __dcnt = 0;
@ -256,11 +257,10 @@ void Collator::start_up() {
LOG(DEBUG) << "sending get_external_messages() query to Manager";
++pending;
td::actor::send_closure_later(manager, &ValidatorManager::get_external_messages, shard_,
[self = get_self()](td::Result<std::vector<Ref<ExtMessage>>> res) -> void {
LOG(DEBUG) << "got answer to get_external_messages() query";
td::actor::send_closure_later(
std::move(self), &Collator::after_get_external_messages, std::move(res));
});
[self = get_self()](td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res) -> void {
LOG(DEBUG) << "got answer to get_external_messages() query";
td::actor::send_closure_later(std::move(self), &Collator::after_get_external_messages, std::move(res));
});
}
if (is_masterchain() && !is_hardfork_) {
// 5. load shard block info messages
@ -3413,12 +3413,15 @@ bool Collator::process_inbound_external_messages() {
return true;
}
if (out_msg_queue_size_ > SKIP_EXTERNALS_QUEUE_SIZE) {
LOG(INFO) << "skipping processing of inbound external messages because out_msg_queue is too big ("
LOG(INFO) << "skipping processing of inbound external messages (except for high-priority) because out_msg_queue is "
"too big ("
<< out_msg_queue_size_ << " > " << SKIP_EXTERNALS_QUEUE_SIZE << ")";
return true;
}
bool full = !block_limit_status_->fits(block::ParamLimits::cl_soft);
for (auto& ext_msg_pair : ext_msg_list_) {
for (auto& ext_msg_struct : ext_msg_list_) {
if (out_msg_queue_size_ > SKIP_EXTERNALS_QUEUE_SIZE && ext_msg_struct.priority < HIGH_PRIORITY_EXTERNAL) {
continue;
}
if (full) {
LOG(INFO) << "BLOCK FULL, stop processing external messages";
break;
@ -3427,15 +3430,15 @@ bool Collator::process_inbound_external_messages() {
LOG(WARNING) << "medium timeout reached, stop processing inbound external messages";
break;
}
auto ext_msg = ext_msg_pair.first;
auto ext_msg = ext_msg_struct.cell;
ton::Bits256 hash{ext_msg->get_hash().bits()};
int r = process_external_message(std::move(ext_msg));
if (r < 0) {
bad_ext_msgs_.emplace_back(ext_msg_pair.second);
bad_ext_msgs_.emplace_back(ext_msg_struct.hash);
return false;
}
if (!r) {
delay_ext_msgs_.emplace_back(ext_msg_pair.second);
delay_ext_msgs_.emplace_back(ext_msg_struct.hash);
}
if (r > 0) {
full = !block_limit_status_->fits(block::ParamLimits::cl_soft);
@ -5062,7 +5065,8 @@ void Collator::return_block_candidate(td::Result<td::Unit> saved) {
* - If the external message has been previuosly registered and accepted, returns false.
* - Otherwise returns true.
*/
td::Result<bool> Collator::register_external_message_cell(Ref<vm::Cell> ext_msg, const ExtMessage::Hash& ext_hash) {
td::Result<bool> Collator::register_external_message_cell(Ref<vm::Cell> ext_msg, const ExtMessage::Hash& ext_hash,
int priority) {
if (ext_msg->get_level() != 0) {
return td::Status::Error("external message must have zero level");
}
@ -5106,7 +5110,7 @@ td::Result<bool> Collator::register_external_message_cell(Ref<vm::Cell> ext_msg,
block::gen::t_Message_Any.print_ref(std::cerr, ext_msg);
}
ext_msg_map.emplace(hash, 1);
ext_msg_list_.emplace_back(std::move(ext_msg), ext_hash);
ext_msg_list_.push_back({std::move(ext_msg), ext_hash, priority});
return true;
}
@ -5115,18 +5119,21 @@ td::Result<bool> Collator::register_external_message_cell(Ref<vm::Cell> ext_msg,
*
* @param res The result of the external message retrieval operation.
*/
void Collator::after_get_external_messages(td::Result<std::vector<Ref<ExtMessage>>> res) {
void Collator::after_get_external_messages(td::Result<std::vector<std::pair<Ref<ExtMessage>, int>>> res) {
// res: pair {ext msg, priority}
--pending;
if (res.is_error()) {
fatal_error(res.move_as_error());
return;
}
auto vect = res.move_as_ok();
for (auto&& ext_msg : vect) {
for (auto& p : vect) {
auto& ext_msg = p.first;
int priority = p.second;
Ref<vm::Cell> ext_msg_cell = ext_msg->root_cell();
bool err = ext_msg_cell.is_null();
if (!err) {
auto reg_res = register_external_message_cell(std::move(ext_msg_cell), ext_msg->hash());
auto reg_res = register_external_message_cell(std::move(ext_msg_cell), ext_msg->hash(), priority);
if (reg_res.is_error() || !reg_res.move_as_ok()) {
err = true;
}

View file

@ -104,7 +104,8 @@ class ValidatorManager : public ValidatorManagerInterface {
td::Promise<td::Ref<MessageQueue>> promise) = 0;
virtual void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::Ref<MessageQueue>> promise) = 0;
virtual void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) = 0;
virtual void get_external_messages(ShardIdFull shard,
td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) = 0;
virtual void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) = 0;
virtual void get_shard_blocks(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) = 0;

View file

@ -259,7 +259,7 @@ void ValidatorManagerImpl::get_key_block_proof_link(BlockIdExt block_id, td::Pro
td::actor::send_closure(db_, &Db::get_key_block_proof, block_id, std::move(P));
}
void ValidatorManagerImpl::new_external_message(td::BufferSlice data) {
void ValidatorManagerImpl::new_external_message(td::BufferSlice data, int priority) {
if (last_masterchain_state_.is_null()) {
return;
}
@ -507,9 +507,13 @@ void ValidatorManagerImpl::wait_block_message_queue_short(BlockIdExt block_id, t
get_block_handle(block_id, true, std::move(P));
}
void ValidatorManagerImpl::get_external_messages(ShardIdFull shard,
td::Promise<std::vector<td::Ref<ExtMessage>>> promise) {
promise.set_result(ext_messages_);
void ValidatorManagerImpl::get_external_messages(
ShardIdFull shard, td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) {
std::vector<std::pair<td::Ref<ExtMessage>, int>> res;
for (const auto& x : ext_messages_) {
res.emplace_back(x, 0);
}
promise.set_result(std::move(res));
}
void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) {

View file

@ -124,7 +124,7 @@ class ValidatorManagerImpl : public ValidatorManager {
void get_key_block_proof_link(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) override;
//void get_block_description(BlockIdExt block_id, td::Promise<BlockDescription> promise) override;
void new_external_message(td::BufferSlice data) override;
void new_external_message(td::BufferSlice data, int priority) override;
void check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) override {
UNREACHABLE();
}
@ -188,7 +188,8 @@ class ValidatorManagerImpl : public ValidatorManager {
td::Promise<td::Ref<MessageQueue>> promise) override;
void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::Ref<MessageQueue>> promise) override;
void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
void get_external_messages(ShardIdFull shard,
td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) override;
void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
void get_shard_blocks(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;
@ -244,7 +245,7 @@ class ValidatorManagerImpl : public ValidatorManager {
UNREACHABLE();
}
void send_external_message(td::Ref<ExtMessage> message) override {
new_external_message(message->serialize());
new_external_message(message->serialize(), 0);
}
void send_ihr_message(td::Ref<IhrMessage> message) override {
new_ihr_message(message->serialize());

View file

@ -150,7 +150,7 @@ void ValidatorManagerImpl::get_key_block_proof_link(BlockIdExt block_id, td::Pro
td::actor::send_closure(db_, &Db::get_key_block_proof_link, block_id, std::move(P));
}
void ValidatorManagerImpl::new_external_message(td::BufferSlice data) {
void ValidatorManagerImpl::new_external_message(td::BufferSlice data, int priority) {
auto R = create_ext_message(std::move(data), block::SizeLimitsConfig::ExtMsgLimits());
if (R.is_ok()) {
ext_messages_.emplace_back(R.move_as_ok());
@ -357,9 +357,13 @@ void ValidatorManagerImpl::wait_block_message_queue_short(BlockIdExt block_id, t
get_block_handle(block_id, true, std::move(P));
}
void ValidatorManagerImpl::get_external_messages(ShardIdFull shard,
td::Promise<std::vector<td::Ref<ExtMessage>>> promise) {
promise.set_result(ext_messages_);
void ValidatorManagerImpl::get_external_messages(
ShardIdFull shard, td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) {
std::vector<std::pair<td::Ref<ExtMessage>, int>> res;
for (const auto &x : ext_messages_) {
res.emplace_back(x, 0);
}
promise.set_result(std::move(res));
}
void ValidatorManagerImpl::get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) {

View file

@ -144,7 +144,7 @@ class ValidatorManagerImpl : public ValidatorManager {
void get_key_block_proof(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) override;
void get_key_block_proof_link(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) override;
void new_external_message(td::BufferSlice data) override;
void new_external_message(td::BufferSlice data, int priority) override;
void check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) override {
UNREACHABLE();
}
@ -228,7 +228,8 @@ class ValidatorManagerImpl : public ValidatorManager {
td::Promise<td::Ref<MessageQueue>> promise) override;
void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::Ref<MessageQueue>> promise) override;
void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
void get_external_messages(ShardIdFull shard,
td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) override;
void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
void get_shard_blocks(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;
@ -308,7 +309,7 @@ class ValidatorManagerImpl : public ValidatorManager {
UNREACHABLE();
}
void send_external_message(td::Ref<ExtMessage> message) override {
new_external_message(message->serialize());
new_external_message(message->serialize(), 0);
}
void send_ihr_message(td::Ref<IhrMessage> message) override {
new_ihr_message(message->serialize());

View file

@ -368,7 +368,7 @@ void ValidatorManagerImpl::get_key_block_proof_link(BlockIdExt block_id, td::Pro
td::actor::send_closure(db_, &Db::get_key_block_proof, block_id, std::move(P));
}
void ValidatorManagerImpl::new_external_message(td::BufferSlice data) {
void ValidatorManagerImpl::new_external_message(td::BufferSlice data, int priority) {
if (!is_validator()) {
return;
}
@ -376,7 +376,7 @@ void ValidatorManagerImpl::new_external_message(td::BufferSlice data) {
VLOG(VALIDATOR_NOTICE) << "dropping ext message: validator is not ready";
return;
}
if (ext_messages_.size() > max_mempool_num()) {
if (ext_msgs_[priority].ext_messages_.size() > (size_t)max_mempool_num()) {
return;
}
auto R = create_ext_message(std::move(data), last_masterchain_state_->get_ext_msg_limits());
@ -384,21 +384,30 @@ void ValidatorManagerImpl::new_external_message(td::BufferSlice data) {
VLOG(VALIDATOR_NOTICE) << "dropping bad ext message: " << R.move_as_error();
return;
}
add_external_message(R.move_as_ok());
add_external_message(R.move_as_ok(), priority);
}
void ValidatorManagerImpl::add_external_message(td::Ref<ExtMessage> msg) {
void ValidatorManagerImpl::add_external_message(td::Ref<ExtMessage> msg, int priority) {
auto &msgs = ext_msgs_[priority];
auto message = std::make_unique<MessageExt<ExtMessage>>(msg);
auto id = message->ext_id();
auto address = message->address();
unsigned long per_address_limit = 256;
if(ext_addr_messages_.count(address) < per_address_limit) {
if (ext_messages_hashes_.count(id.hash) == 0) {
ext_messages_.emplace(id, std::move(message));
ext_messages_hashes_.emplace(id.hash, id);
ext_addr_messages_[address].emplace(id.hash, id);
}
auto it = msgs.ext_addr_messages_.find(address);
if (it != msgs.ext_addr_messages_.end() && it->second.size() >= per_address_limit) {
return;
}
auto it2 = ext_messages_hashes_.find(id.hash);
if (it2 != ext_messages_hashes_.end()) {
int old_priority = it2->second.first;
if (old_priority >= priority) {
return;
}
ext_msgs_[old_priority].erase(id);
}
msgs.ext_messages_.emplace(id, std::move(message));
msgs.ext_addr_messages_[address].emplace(id.hash, id);
ext_messages_hashes_[id.hash] = {priority, id};
}
void ValidatorManagerImpl::check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) {
++ls_stats_check_ext_messages_;
@ -783,34 +792,44 @@ void ValidatorManagerImpl::wait_block_message_queue_short(BlockIdExt block_id, t
get_block_handle(block_id, true, std::move(P));
}
void ValidatorManagerImpl::get_external_messages(ShardIdFull shard,
td::Promise<std::vector<td::Ref<ExtMessage>>> promise) {
void ValidatorManagerImpl::get_external_messages(
ShardIdFull shard, td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) {
td::Timer t;
size_t processed = 0, deleted = 0;
std::vector<td::Ref<ExtMessage>> res;
std::vector<std::pair<td::Ref<ExtMessage>, int>> res;
MessageId<ExtMessage> left{AccountIdPrefixFull{shard.workchain, shard.shard & (shard.shard - 1)}, Bits256::zero()};
auto it = ext_messages_.lower_bound(left);
while (it != ext_messages_.end()) {
auto s = it->first;
if (!shard_contains(shard, s.dst)) {
break;
size_t total_msgs = 0;
td::Random::Fast rnd;
for (auto iter = ext_msgs_.rbegin(); iter != ext_msgs_.rend(); ++iter) {
std::vector<std::pair<td::Ref<ExtMessage>, int>> cur_res;
int priority = iter->first;
auto &msgs = iter->second;
auto it = msgs.ext_messages_.lower_bound(left);
while (it != msgs.ext_messages_.end()) {
auto s = it->first;
if (!shard_contains(shard, s.dst)) {
break;
}
++processed;
if (it->second->expired()) {
msgs.ext_addr_messages_[it->second->address()].erase(it->first.hash);
ext_messages_hashes_.erase(it->first.hash);
it = msgs.ext_messages_.erase(it);
++deleted;
continue;
}
if (it->second->is_active()) {
cur_res.emplace_back(it->second->message(), priority);
}
it++;
}
++processed;
if (it->second->expired()) {
ext_addr_messages_[it->second->address()].erase(it->first.hash);
ext_messages_hashes_.erase(it->first.hash);
it = ext_messages_.erase(it);
++deleted;
continue;
}
if (it->second->is_active()) {
res.push_back(it->second->message());
}
it++;
td::random_shuffle(td::as_mutable_span(cur_res), rnd);
res.insert(res.end(), cur_res.begin(), cur_res.end());
total_msgs += msgs.ext_messages_.size();
}
LOG(WARNING) << "get_external_messages to shard " << shard.to_str() << " : time=" << t.elapsed()
<< " result_size=" << res.size() << " processed=" << processed << " expired=" << deleted
<< " total_size=" << ext_messages_.size();
<< " total_size=" << total_msgs;
promise.set_value(std::move(res));
}
@ -850,8 +869,9 @@ void ValidatorManagerImpl::complete_external_messages(std::vector<ExtMessage::Ha
for (auto &hash : to_delete) {
auto it = ext_messages_hashes_.find(hash);
if (it != ext_messages_hashes_.end()) {
ext_addr_messages_[ext_messages_[it->second]->address()].erase(it->first);
CHECK(ext_messages_.erase(it->second));
int priority = it->second.first;
auto msg_id = it->second.second;
ext_msgs_[priority].erase(msg_id);
ext_messages_hashes_.erase(it);
}
}
@ -859,12 +879,14 @@ void ValidatorManagerImpl::complete_external_messages(std::vector<ExtMessage::Ha
for (auto &hash : to_delay) {
auto it = ext_messages_hashes_.find(hash);
if (it != ext_messages_hashes_.end()) {
auto it2 = ext_messages_.find(it->second);
if ((ext_messages_.size() < soft_mempool_limit) && it2->second->can_postpone()) {
int priority = it->second.first;
auto msg_id = it->second.second;
auto &msgs = ext_msgs_[priority];
auto it2 = msgs.ext_messages_.find(msg_id);
if ((msgs.ext_messages_.size() < soft_mempool_limit) && it2->second->can_postpone()) {
it2->second->postpone();
} else {
ext_addr_messages_[it2->second->address()].erase(it2->first.hash);
ext_messages_.erase(it2);
msgs.erase(msg_id);
ext_messages_hashes_.erase(it);
}
}
@ -1460,7 +1482,7 @@ void ValidatorManagerImpl::send_get_next_key_blocks_request(BlockIdExt block_id,
void ValidatorManagerImpl::send_external_message(td::Ref<ExtMessage> message) {
callback_->send_ext_message(message->shard(), message->serialize());
add_external_message(std::move(message));
add_external_message(std::move(message), 0);
}
void ValidatorManagerImpl::send_ihr_message(td::Ref<IhrMessage> message) {
@ -2105,6 +2127,9 @@ td::actor::ActorOwn<ValidatorGroup> ValidatorManagerImpl::create_validator_group
if (check_gc_list_.count(session_id) == 1) {
return td::actor::ActorOwn<ValidatorGroup>{};
} else {
// Call get_external_messages to cleanup mempool for the shard
get_external_messages(shard, [](td::Result<std::vector<std::pair<td::Ref<ExtMessage>, int>>>) {});
auto validator_id = get_validator(shard, validator_set);
CHECK(!validator_id.is_zero());
auto G = td::actor::create_actor<ValidatorGroup>(

View file

@ -220,9 +220,19 @@ class ValidatorManagerImpl : public ValidatorManager {
};
// DATA FOR COLLATOR
std::map<ShardTopBlockDescriptionId, td::Ref<ShardTopBlockDescription>> shard_blocks_;
std::map<MessageId<ExtMessage>, std::unique_ptr<MessageExt<ExtMessage>>> ext_messages_;
std::map<std::pair<ton::WorkchainId,ton::StdSmcAddress>, std::map<ExtMessage::Hash, MessageId<ExtMessage>>> ext_addr_messages_;
std::map<ExtMessage::Hash, MessageId<ExtMessage>> ext_messages_hashes_;
struct ExtMessages {
std::map<MessageId<ExtMessage>, std::unique_ptr<MessageExt<ExtMessage>>> ext_messages_;
std::map<std::pair<ton::WorkchainId, ton::StdSmcAddress>, std::map<ExtMessage::Hash, MessageId<ExtMessage>>>
ext_addr_messages_;
void erase(const MessageId<ExtMessage>& id) {
auto it = ext_messages_.find(id);
CHECK(it != ext_messages_.end());
ext_addr_messages_[it->second->address()].erase(id.hash);
ext_messages_.erase(it);
}
};
std::map<int, ExtMessages> ext_msgs_; // priority -> messages
std::map<ExtMessage::Hash, std::pair<int, MessageId<ExtMessage>>> ext_messages_hashes_; // hash -> priority
// IHR ?
std::map<MessageId<IhrMessage>, std::unique_ptr<MessageExt<IhrMessage>>> ihr_messages_;
std::map<IhrMessage::Hash, MessageId<IhrMessage>> ihr_messages_hashes_;
@ -349,8 +359,8 @@ class ValidatorManagerImpl : public ValidatorManager {
void get_key_block_proof_link(BlockIdExt block_id, td::Promise<td::BufferSlice> promise) override;
//void get_block_description(BlockIdExt block_id, td::Promise<BlockDescription> promise) override;
void new_external_message(td::BufferSlice data) override;
void add_external_message(td::Ref<ExtMessage> message);
void new_external_message(td::BufferSlice data, int priority) override;
void add_external_message(td::Ref<ExtMessage> message, int priority);
void check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) override;
void new_ihr_message(td::BufferSlice data) override;
@ -410,7 +420,8 @@ class ValidatorManagerImpl : public ValidatorManager {
td::Promise<td::Ref<MessageQueue>> promise) override;
void wait_block_message_queue_short(BlockIdExt id, td::uint32 priority, td::Timestamp timeout,
td::Promise<td::Ref<MessageQueue>> promise) override;
void get_external_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<ExtMessage>>> promise) override;
void get_external_messages(ShardIdFull shard,
td::Promise<std::vector<std::pair<td::Ref<ExtMessage>, int>>> promise) override;
void get_ihr_messages(ShardIdFull shard, td::Promise<std::vector<td::Ref<IhrMessage>>> promise) override;
void get_shard_blocks(BlockIdExt masterchain_block_id,
td::Promise<std::vector<td::Ref<ShardTopBlockDescription>>> promise) override;

View file

@ -198,7 +198,7 @@ class ValidatorManagerInterface : public td::actor::Actor {
virtual void get_next_block(BlockIdExt block_id, td::Promise<BlockHandle> promise) = 0;
virtual void write_handle(BlockHandle handle, td::Promise<td::Unit> promise) = 0;
virtual void new_external_message(td::BufferSlice data) = 0;
virtual void new_external_message(td::BufferSlice data, int priority) = 0;
virtual void check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) = 0;
virtual void new_ihr_message(td::BufferSlice data) = 0;
virtual void new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) = 0;