1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Improve block broadcasts

This commit is contained in:
SpyCheese 2024-07-25 16:33:19 +03:00
parent 4c8d25ac1b
commit 0e7374610d
20 changed files with 125 additions and 278 deletions

View file

@ -237,8 +237,7 @@ class HardforkCreator : public td::actor::Actor {
td::PromiseCreator::lambda([](td::Unit) {}));
}
void on_new_masterchain_block(td::Ref<ton::validator::MasterchainState> state,
std::set<ton::ShardIdFull> shards_to_monitor,
std::set<ton::ShardIdFull> temporary_shards) override {
std::set<ton::ShardIdFull> shards_to_monitor) override {
}
void send_ihr_message(ton::AccountIdPrefixFull dst, td::BufferSlice data) override {
}

View file

@ -206,7 +206,10 @@ class OverlayMemberCertificate {
return expire_at_ < cur_time - 3;
}
auto tl() const {
tl_object_ptr<ton_api::overlay_MemberCertificate> tl() const {
if (empty()) {
return create_tl_object<ton_api::overlay_emptyMemberCertificate>();
}
return create_tl_object<ton_api::overlay_memberCertificate>(signed_by_.tl(), flags_, slot_, expire_at_,
signature_.clone_as_buffer_slice());
}

View file

@ -324,8 +324,7 @@ class TestNode : public td::actor::Actor {
td::PromiseCreator::lambda([](td::Unit) {}));
}
void on_new_masterchain_block(td::Ref<ton::validator::MasterchainState> state,
std::set<ton::ShardIdFull> shards_to_monitor,
std::set<ton::ShardIdFull> temporary_shards) override {
std::set<ton::ShardIdFull> shards_to_monitor) override {
}
void send_ihr_message(ton::AccountIdPrefixFull dst, td::BufferSlice data) override {
}

View file

@ -698,8 +698,8 @@ engine.validator.overlayStats overlay_id:int256 overlay_id_full:PublicKey adnl_i
engine.validator.overlaysStats overlays:(vector engine.validator.overlayStats) = engine.validator.OverlaysStats;
engine.validator.shardOverlayStats.neighbour id:string verison_major:int version_minor:int flags:#
roundtrip:double unreliability:double has_state:string = engine.validator.shardOverlayStats.Neighbour;
engine.validator.shardOverlayStats shard:string mode:string
roundtrip:double unreliability:double = engine.validator.shardOverlayStats.Neighbour;
engine.validator.shardOverlayStats shard:string active:Bool
neighbours:(vector engine.validator.shardOverlayStats.neighbour) = engine.validator.ShardOverlayStats;
engine.validator.fastSyncOverlayStats shard:string validators_adnl:(vector int256) root_public_keys:(vector int256)
member_certificate:overlay.MemberCertificate = engine.validator.FastSyncOverlayStats;

Binary file not shown.

View file

@ -256,8 +256,8 @@ static td::BufferSlice serialize_error(td::Status error) {
return create_serialize_tl_object<ton_api::collatorNode_generateBlockError>(error.code(), error.message().c_str());
}
static BlockCandidate change_creator(BlockCandidate block, Ed25519_PublicKey creator, CatchainSeqno* cc_seqno = nullptr,
td::uint32* val_set_hash = nullptr) {
static BlockCandidate change_creator(BlockCandidate block, Ed25519_PublicKey creator, CatchainSeqno& cc_seqno,
td::uint32& val_set_hash) {
CHECK(!block.id.is_masterchain());
if (block.pubkey == creator) {
return block;
@ -278,12 +278,8 @@ static BlockCandidate change_creator(BlockCandidate block, Ed25519_PublicKey cre
block.id.file_hash = block::compute_file_hash(block.data.as_slice());
block.pubkey = creator;
if (cc_seqno) {
*cc_seqno = info.gen_catchain_seqno;
}
if (val_set_hash) {
*val_set_hash = info.gen_validator_list_hash_short;
}
cc_seqno = info.gen_catchain_seqno;
val_set_hash = info.gen_validator_list_hash_short;
return block;
}
@ -305,8 +301,9 @@ void CollatorNode::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data
};
if (!validator_adnl_ids_.count(src)) {
new_promise.set_error(td::Status::Error("src is not a validator"));
return;
}
TRY_RESULT_PROMISE(new_promise, f, fetch_tl_object<ton_api::collatorNode_generateBlock>(std::move(data), true));
TRY_RESULT_PROMISE(new_promise, f, fetch_tl_object<ton_api::collatorNode_generateBlock>(data, true));
ShardIdFull shard = create_shard_id(f->shard_);
CatchainSeqno cc_seqno = f->cc_seqno_;
std::vector<BlockIdExt> prev_blocks;
@ -319,7 +316,7 @@ void CollatorNode::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data
TRY_RESULT_PROMISE(new_promise, block, std::move(R));
CatchainSeqno cc_seqno;
td::uint32 val_set_hash;
block = change_creator(std::move(block), creator, &cc_seqno, &val_set_hash);
block = change_creator(std::move(block), creator, cc_seqno, val_set_hash);
td::Promise<td::Unit> P =
new_promise.wrap([block = block.clone()](td::Unit&&) mutable -> BlockCandidate { return std::move(block); });
td::actor::send_closure(manager, &ValidatorManager::set_block_candidate, block.id, std::move(block), cc_seqno,

View file

@ -255,7 +255,9 @@ void FullNodeFastSyncOverlay::get_stats_extra(td::Promise<std::string> promise)
for (const auto &x : root_public_keys_) {
res->root_public_keys_.push_back(x.bits256_value());
}
res->member_certificate_ = member_certificate_.tl();
if (!member_certificate_.empty()) {
res->member_certificate_ = member_certificate_.tl();
}
promise.set_result(td::json_encode<std::string>(td::ToJson(*res), true));
}

View file

@ -104,7 +104,7 @@ void FullNodeShardImpl::create_overlay() {
td::actor::ActorId<FullNodeShardImpl> node_;
};
overlay::OverlayOptions opts;
opts.announce_self_ = is_active();
opts.announce_self_ = active_;
td::actor::send_closure(overlays_, &overlay::Overlays::create_public_overlay_ex, adnl_id_, overlay_id_full_.clone(),
std::make_unique<Callback>(actor_id(this)), rules_,
PSTRING() << "{ \"type\": \"shard\", \"shard_id\": " << get_shard()
@ -119,7 +119,7 @@ void FullNodeShardImpl::create_overlay() {
}
void FullNodeShardImpl::check_broadcast(PublicKeyHash src, td::BufferSlice broadcast, td::Promise<td::Unit> promise) {
if (mode_ != FullNodeShardMode::active) {
if (!active_) {
return promise.set_error(td::Status::Error("cannot check broadcast: shard is not active"));
}
auto B = fetch_tl_object<ton_api::tonNode_externalMessageBroadcast>(std::move(broadcast), true);
@ -161,16 +161,16 @@ void FullNodeShardImpl::update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promis
create_overlay();
}
void FullNodeShardImpl::set_mode(FullNodeShardMode mode) {
void FullNodeShardImpl::set_active(bool active) {
if (shard_.is_masterchain()) {
return;
}
bool was_active = is_active();
mode_ = mode;
if (was_active != is_active()) {
td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, adnl_id_, overlay_id_);
create_overlay();
if (active_ == active) {
return;
}
active_ = active;
td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, adnl_id_, overlay_id_);
create_overlay();
}
void FullNodeShardImpl::try_get_next_block(td::Timestamp timeout, td::Promise<ReceivedBlock> promise) {
@ -219,7 +219,6 @@ void FullNodeShardImpl::got_next_block(td::Result<BlockHandle> R) {
}
void FullNodeShardImpl::get_next_block() {
//return;
attempt_++;
auto P = td::PromiseCreator::lambda([validator_manager = validator_manager_, attempt = attempt_,
block_id = handle_->id(), SelfId = actor_id(this)](td::Result<ReceivedBlock> R) {
@ -623,12 +622,8 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getCapabilities &query,
td::Promise<td::BufferSlice> promise) {
VLOG(FULL_NODE_DEBUG) << "Got query getCapabilities from " << src;
td::uint32 flags = 0;
if (mode_ != FullNodeShardMode::active) {
flags |= Neighbour::FLAG_NO_STATE;
}
promise.set_value(
create_serialize_tl_object<ton_api::tonNode_capabilities>(proto_version_major(), proto_version_minor(), flags));
create_serialize_tl_object<ton_api::tonNode_capabilities>(proto_version_major(), proto_version_minor(), 0));
}
void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getArchiveInfo &query,
@ -717,7 +712,7 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
void FullNodeShardImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query,
td::Promise<td::BufferSlice> promise) {
if (!is_active()) {
if (!active_) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_message, src, adnl_id_, overlay_id_,
create_serialize_tl_object<ton_api::tonNode_forgetPeer>());
promise.set_error(td::Status::Error("shard is inactive"));
@ -811,7 +806,7 @@ void FullNodeShardImpl::process_block_broadcast(PublicKeyHash src, ton_api::tonN
}
void FullNodeShardImpl::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
if (!is_active()) {
if (!active_) {
return;
}
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
@ -983,7 +978,7 @@ void FullNodeShardImpl::get_next_key_blocks(BlockIdExt block_id, td::Timestamp t
void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) {
auto &b = choose_neighbour(true);
auto &b = choose_neighbour();
td::actor::create_actor<DownloadArchiveSlice>(
"archive", masterchain_seqno, shard_prefix, std::move(tmp_dir), adnl_id_, overlay_id_, b.adnl_id, timeout,
validator_manager_, rldp2_, overlays_, adnl_, client_, create_neighbour_promise(b, std::move(promise)))
@ -994,7 +989,7 @@ void FullNodeShardImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std:
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) {
// TODO: maybe more complex download (like other requests here)
auto &b = choose_neighbour(true);
auto &b = choose_neighbour();
if (b.adnl_id == adnl::AdnlNodeIdShort::zero()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "no nodes"));
return;
@ -1213,22 +1208,11 @@ void FullNodeShardImpl::got_neighbours(std::vector<adnl::AdnlNodeIdShort> vec) {
continue;
}
if (neighbours_.size() == max_neighbours()) {
td::uint32 neighbours_with_state = 0;
for (const auto &n : neighbours_) {
if (n.second.has_state_known() && n.second.has_state()) {
++neighbours_with_state;
}
}
adnl::AdnlNodeIdShort a = adnl::AdnlNodeIdShort::zero();
adnl::AdnlNodeIdShort b = adnl::AdnlNodeIdShort::zero();
td::uint32 cnt = 0;
double u = 0;
for (auto &n : neighbours_) {
if (neighbours_with_state <= min_neighbours_with_state() && n.second.has_state_known() &&
n.second.has_state()) {
continue;
}
if (n.second.unreliability > u) {
u = n.second.unreliability;
a = n.first;
@ -1252,7 +1236,7 @@ void FullNodeShardImpl::got_neighbours(std::vector<adnl::AdnlNodeIdShort> vec) {
}
}
const Neighbour &FullNodeShardImpl::choose_neighbour(bool require_state) const {
const Neighbour &FullNodeShardImpl::choose_neighbour() const {
if (neighbours_.size() == 0) {
return Neighbour::zero;
}
@ -1261,40 +1245,30 @@ const Neighbour &FullNodeShardImpl::choose_neighbour(bool require_state) const {
for (auto &x : neighbours_) {
min_unreliability = std::min(min_unreliability, x.second.unreliability);
}
for (int attempt = 0; attempt < (require_state ? 2 : 1); ++attempt) {
const Neighbour *best = nullptr;
td::uint32 sum = 0;
const Neighbour *best = nullptr;
td::uint32 sum = 0;
for (auto &x : neighbours_) {
if (require_state) {
if (attempt == 0 && !(x.second.has_state_known() && x.second.has_state())) {
continue;
}
if (attempt == 1 && x.second.has_state_known()) {
continue;
}
}
auto unr = static_cast<td::uint32>(x.second.unreliability - min_unreliability);
for (auto &x : neighbours_) {
auto unr = static_cast<td::uint32>(x.second.unreliability - min_unreliability);
if (x.second.version_major < proto_version_major()) {
unr += 4;
} else if (x.second.version_major == proto_version_major() && x.second.version_minor < proto_version_minor()) {
unr += 2;
}
if (x.second.version_major < proto_version_major()) {
unr += 4;
} else if (x.second.version_major == proto_version_major() && x.second.version_minor < proto_version_minor()) {
unr += 2;
}
auto f = static_cast<td::uint32>(fail_unreliability());
auto f = static_cast<td::uint32>(fail_unreliability());
if (unr <= f) {
auto w = 1 << (f - unr);
sum += w;
if (td::Random::fast(0, sum - 1) <= w - 1) {
best = &x.second;
}
if (unr <= f) {
auto w = 1 << (f - unr);
sum += w;
if (td::Random::fast(0, sum - 1) <= w - 1) {
best = &x.second;
}
}
if (best) {
return *best;
}
}
if (best) {
return *best;
}
return Neighbour::zero;
}
@ -1361,17 +1335,7 @@ void FullNodeShardImpl::ping_neighbours() {
void FullNodeShardImpl::get_stats_extra(td::Promise<std::string> promise) {
auto res = create_tl_object<ton_api::engine_validator_shardOverlayStats>();
res->shard_ = shard_.to_str();
switch (mode_) {
case active:
res->mode_ = "active";
break;
case active_temp:
res->mode_ = "active_temp";
break;
case inactive:
res->mode_ = "inactive";
break;
}
res->active_ = active_;
for (const auto &p : neighbours_) {
const auto &n = p.second;
auto f = create_tl_object<ton_api::engine_validator_shardOverlayStats_neighbour>();
@ -1381,7 +1345,6 @@ void FullNodeShardImpl::get_stats_extra(td::Promise<std::string> promise) {
f->flags_ = n.flags;
f->roundtrip_ = n.roundtrip;
f->unreliability_ = n.unreliability;
f->has_state_ = (n.has_state_known() ? (n.has_state() ? "true" : "false") : "undefined");
res->neighbours_.push_back(std::move(f));
}
promise.set_result(td::json_encode<std::string>(td::ToJson(*res), true));
@ -1394,7 +1357,7 @@ FullNodeShardImpl::FullNodeShardImpl(ShardIdFull shard, PublicKeyHash local_id,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client,
td::actor::ActorId<FullNode> full_node, FullNodeShardMode mode)
td::actor::ActorId<FullNode> full_node, bool active)
: shard_(shard)
, local_id_(local_id)
, adnl_id_(adnl_id)
@ -1407,7 +1370,7 @@ FullNodeShardImpl::FullNodeShardImpl(ShardIdFull shard, PublicKeyHash local_id,
, validator_manager_(validator_manager)
, client_(client)
, full_node_(full_node)
, mode_(mode)
, active_(active)
, config_(config) {
}
@ -1416,10 +1379,10 @@ td::actor::ActorOwn<FullNodeShard> FullNodeShard::create(
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays, td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client, td::actor::ActorId<FullNode> full_node, FullNodeShardMode mode) {
td::actor::ActorId<adnl::AdnlExtClient> client, td::actor::ActorId<FullNode> full_node, bool active) {
return td::actor::create_actor<FullNodeShardImpl>(PSTRING() << "tonnode" << shard.to_str(), shard, local_id, adnl_id,
zero_state_file_hash, config, keyring, adnl, rldp, rldp2, overlays,
validator_manager, client, full_node, mode);
validator_manager, client, full_node, active);
}
} // namespace fullnode

View file

@ -28,12 +28,6 @@ namespace validator {
namespace fullnode {
enum FullNodeShardMode {
active, // Node can answer queries about the shard
active_temp, // Like 'active', but queries about shard state are not allowed (only blocks)
inactive // Node is not a part of the overlay
};
class FullNodeShard : public td::actor::Actor {
public:
virtual ~FullNodeShard() = default;
@ -42,7 +36,7 @@ class FullNodeShard : public td::actor::Actor {
virtual ShardIdFull get_shard_full() const = 0;
virtual void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td::Unit> promise) = 0;
virtual void set_mode(FullNodeShardMode mode) = 0;
virtual void set_active(bool active) = 0;
virtual void set_config(FullNodeConfig config) = 0;
virtual void send_ihr_message(td::BufferSlice data) = 0;
@ -85,8 +79,7 @@ class FullNodeShard : public td::actor::Actor {
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays, td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client, td::actor::ActorId<FullNode> full_node,
FullNodeShardMode mode = FullNodeShardMode::active);
td::actor::ActorId<adnl::AdnlExtClient> client, td::actor::ActorId<FullNode> full_node, bool active);
};
} // namespace fullnode

View file

@ -40,22 +40,14 @@ struct Neighbour {
double roundtrip_weight = 0;
double unreliability = 0;
Neighbour(adnl::AdnlNodeIdShort adnl_id) : adnl_id(std::move(adnl_id)) {
explicit Neighbour(adnl::AdnlNodeIdShort adnl_id) : adnl_id(std::move(adnl_id)) {
}
void update_proto_version(ton_api::tonNode_capabilities &q);
void query_success(double t);
void query_failed();
void update_roundtrip(double t);
bool has_state() const {
return !(flags & FLAG_NO_STATE);
}
bool has_state_known() const {
return version_major != 0;
}
static Neighbour zero;
static constexpr td::uint32 FLAG_NO_STATE = 1;
};
class FullNodeShardImpl : public FullNodeShard {
@ -82,9 +74,6 @@ class FullNodeShardImpl : public FullNodeShard {
static constexpr td::uint32 max_neighbours() {
return 16;
}
static constexpr td::uint32 min_neighbours_with_state() {
return 10;
}
static constexpr double stop_unreliability() {
return 5.0;
}
@ -94,15 +83,12 @@ class FullNodeShardImpl : public FullNodeShard {
void create_overlay();
void update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td::Unit> promise) override;
void set_mode(FullNodeShardMode mode) override;
void set_active(bool active) override;
void set_config(FullNodeConfig config) override {
config_ = config;
}
//td::Result<Block> fetch_block(td::BufferSlice data);
void prevalidate_block(BlockIdExt block_id, td::BufferSlice data, td::BufferSlice proof,
td::Promise<ReceivedBlock> promise);
void try_get_next_block(td::Timestamp timestamp, td::Promise<ReceivedBlock> promise);
void got_next_block(td::Result<BlockHandle> block);
void get_next_block();
@ -155,8 +141,6 @@ class FullNodeShardImpl : public FullNodeShard {
td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query,
td::Promise<td::BufferSlice> promise);
// void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_prepareNextKeyBlockProof &query,
// td::Promise<td::BufferSlice> promise);
void receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query, td::Promise<td::BufferSlice> promise);
void receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data);
@ -222,16 +206,14 @@ class FullNodeShardImpl : public FullNodeShard {
void got_neighbours(std::vector<adnl::AdnlNodeIdShort> res);
void update_neighbour_stats(adnl::AdnlNodeIdShort adnl_id, double t, bool success);
void got_neighbour_capabilities(adnl::AdnlNodeIdShort adnl_id, double t, td::BufferSlice data);
const Neighbour &choose_neighbour(bool require_state = false) const;
const Neighbour &choose_neighbour() const;
template <typename T>
td::Promise<T> create_neighbour_promise(const Neighbour &x, td::Promise<T> p, bool require_state = false) {
return td::PromiseCreator::lambda([id = x.adnl_id, SelfId = actor_id(this), p = std::move(p), ts = td::Time::now(),
ignore_error = require_state && !x.has_state_known()](td::Result<T> R) mutable {
return td::PromiseCreator::lambda([id = x.adnl_id, SelfId = actor_id(this), p = std::move(p),
ts = td::Time::now()](td::Result<T> R) mutable {
if (R.is_error() && R.error().code() != ErrorCode::notready && R.error().code() != ErrorCode::cancelled) {
if (!ignore_error) {
td::actor::send_closure(SelfId, &FullNodeShardImpl::update_neighbour_stats, id, td::Time::now() - ts, false);
}
td::actor::send_closure(SelfId, &FullNodeShardImpl::update_neighbour_stats, id, td::Time::now() - ts, false);
} else {
td::actor::send_closure(SelfId, &FullNodeShardImpl::update_neighbour_stats, id, td::Time::now() - ts, true);
}
@ -245,17 +227,13 @@ class FullNodeShardImpl : public FullNodeShard {
td::actor::ActorId<rldp2::Rldp> rldp2, td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client, td::actor::ActorId<FullNode> full_node,
FullNodeShardMode mode = FullNodeShardMode::active);
bool active);
private:
bool use_new_download() const {
return false;
}
bool is_active() const {
return mode_ != FullNodeShardMode::inactive;
}
ShardIdFull shard_;
BlockHandle handle_;
td::Promise<td::Unit> promise_;
@ -289,7 +267,7 @@ class FullNodeShardImpl : public FullNodeShard {
td::Timestamp ping_neighbours_at_;
adnl::AdnlNodeIdShort last_pinged_neighbour_ = adnl::AdnlNodeIdShort::zero();
FullNodeShardMode mode_;
bool active_;
FullNodeConfig config_;

View file

@ -198,81 +198,56 @@ void FullNodeImpl::initial_read_complete(BlockHandle top_handle) {
td::actor::send_closure(it->second.actor, &FullNodeShard::set_handle, top_handle, std::move(P));
}
void FullNodeImpl::on_new_masterchain_block(td::Ref<MasterchainState> state, std::set<ShardIdFull> shards_to_monitor,
std::set<ShardIdFull> temporary_shards) {
void FullNodeImpl::on_new_masterchain_block(td::Ref<MasterchainState> state, std::set<ShardIdFull> shards_to_monitor) {
CHECK(shards_to_monitor.count(ShardIdFull(masterchainId)));
std::set<ShardIdFull> new_shards;
std::map<ShardIdFull, FullNodeShardMode> new_active;
new_shards.insert(ShardIdFull(masterchainId));
bool join_all_overlays = !sign_cert_by_.is_zero();
std::set<ShardIdFull> all_shards;
std::set<ShardIdFull> new_active;
all_shards.insert(ShardIdFull(masterchainId));
std::set<WorkchainId> workchains;
auto cut_shard = [&](ShardIdFull shard) -> ShardIdFull {
int min_split = state->monitor_min_split_depth(shard.workchain);
return min_split < shard.pfx_len() ? shard_prefix(shard, min_split) : shard;
};
auto set_active = [&](ShardIdFull shard, FullNodeShardMode mode) {
while (new_active.emplace(shard, mode).second && shard.pfx_len() > 0) {
auto set_active = [&](ShardIdFull shard) {
while (new_active.emplace(shard).second && shard.pfx_len() > 0) {
shard = shard_parent(shard);
}
};
for (auto &info : state->get_shards()) {
workchains.insert(info->shard().workchain);
new_shards.insert(cut_shard(info->shard()));
all_shards.insert(cut_shard(info->shard()));
}
for (const auto &wpair : state->get_workchain_list()) {
ton::WorkchainId wc = wpair.first;
const block::WorkchainInfo *winfo = wpair.second.get();
if (workchains.count(wc) == 0 && winfo->active && winfo->enabled_since <= state->get_unix_time()) {
new_shards.insert(ShardIdFull(wc));
all_shards.insert(ShardIdFull(wc));
}
}
for (ShardIdFull shard : shards_to_monitor) {
set_active(cut_shard(shard), FullNodeShardMode::active);
}
for (ShardIdFull shard : temporary_shards) {
set_active(cut_shard(shard), FullNodeShardMode::active_temp);
set_active(cut_shard(shard));
}
auto info_set_mode = [&](ShardIdFull shard, ShardInfo &info, FullNodeShardMode mode) {
if (info.mode == mode) {
return;
}
if (info.actor.empty()) {
add_shard_actor(shard, mode);
return;
}
info.mode = mode;
td::actor::send_closure(info.actor, &FullNodeShard::set_mode, mode);
info.delete_at =
mode != FullNodeShardMode::inactive ? td::Timestamp::never() : td::Timestamp::in(INACTIVE_SHARD_TTL);
};
for (auto shard : new_shards) {
auto &info = shards_[shard];
info.exists = true;
}
for (auto &p : shards_) {
ShardIdFull shard = p.first;
ShardInfo &info = p.second;
info.exists = new_shards.count(shard);
auto it = new_active.find(shard);
info_set_mode(shard, info, it == new_active.end() ? FullNodeShardMode::inactive : it->second);
}
for (const auto &s : new_active) {
info_set_mode(s.first, shards_[s.first], s.second);
}
auto it = shards_.begin();
while (it != shards_.end()) {
if (it->second.mode == FullNodeShardMode::inactive && it->second.delete_at && it->second.delete_at.is_in_past()) {
it->second.actor.reset();
it->second.delete_at = td::Timestamp::never();
}
if (!it->second.exists && it->second.actor.empty()) {
it = shards_.erase(it);
} else {
for (auto it = shards_.begin(); it != shards_.end(); ) {
if (all_shards.count(it->first)) {
++it;
} else {
it = shards_.erase(it);
}
}
for (ShardIdFull shard : all_shards) {
bool active = new_active.count(shard);
bool overlay_exists = !shards_[shard].actor.empty();
if (active || join_all_overlays || overlay_exists) {
update_shard_actor(shard, active);
}
}
for (auto &[_, shard_info] : shards_) {
if (!shard_info.active && shard_info.delete_at && shard_info.delete_at.is_in_past() && !join_all_overlays) {
shard_info.actor = {};
shard_info.delete_at = td::Timestamp::never();
}
}
@ -298,18 +273,19 @@ void FullNodeImpl::on_new_masterchain_block(td::Ref<MasterchainState> state, std
}
}
void FullNodeImpl::add_shard_actor(ShardIdFull shard, FullNodeShardMode mode) {
void FullNodeImpl::update_shard_actor(ShardIdFull shard, bool active) {
ShardInfo &info = shards_[shard];
if (!info.actor.empty()) {
return;
}
info.actor = FullNodeShard::create(shard, local_id_, adnl_id_, zero_state_file_hash_, config_, keyring_, adnl_, rldp_,
rldp2_, overlays_, validator_manager_, client_, actor_id(this), mode);
info.mode = mode;
info.delete_at = mode != FullNodeShardMode::inactive ? td::Timestamp::never() : td::Timestamp::in(INACTIVE_SHARD_TTL);
if (all_validators_.size() > 0) {
td::actor::send_closure(info.actor, &FullNodeShard::update_validators, all_validators_, sign_cert_by_);
if (info.actor.empty()) {
info.actor = FullNodeShard::create(shard, local_id_, adnl_id_, zero_state_file_hash_, config_, keyring_, adnl_, rldp_,
rldp2_, overlays_, validator_manager_, client_, actor_id(this), active);
if (!all_validators_.empty()) {
td::actor::send_closure(info.actor, &FullNodeShard::update_validators, all_validators_, sign_cert_by_);
}
} else if (info.active != active) {
td::actor::send_closure(info.actor, &FullNodeShard::set_active, active);
}
info.active = active;
info.delete_at = active ? td::Timestamp::never() : td::Timestamp::in(INACTIVE_SHARD_TTL);
}
void FullNodeImpl::sync_completed() {
@ -506,15 +482,11 @@ void FullNodeImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vect
}
td::actor::ActorId<FullNodeShard> FullNodeImpl::get_shard(ShardIdFull shard) {
ShardIdFull shard0 = shard;
while (true) {
auto it = shards_.find(shard);
if (it != shards_.end() && it->second.exists) {
if (it != shards_.end()) {
if (it->second.actor.empty()) {
add_shard_actor(shard, FullNodeShardMode::inactive);
}
if (it->second.mode == FullNodeShardMode::inactive) {
it->second.delete_at = td::Timestamp::in(INACTIVE_SHARD_TTL);
update_shard_actor(shard, false);
}
return it->second.actor.get();
}
@ -523,19 +495,8 @@ td::actor::ActorId<FullNodeShard> FullNodeImpl::get_shard(ShardIdFull shard) {
}
shard = shard_parent(shard);
}
shard = shard0;
auto it = shards_.find(shard);
if (it == shards_.end()) {
it = shards_.emplace(shard = ShardIdFull(shard.workchain), ShardInfo{}).first;
}
auto &info = it->second;
if (info.actor.empty()) {
add_shard_actor(shard, FullNodeShardMode::inactive);
}
if (info.mode == FullNodeShardMode::inactive) {
info.delete_at = td::Timestamp::in(INACTIVE_SHARD_TTL);
}
return info.actor.get();
update_shard_actor(shard, false);
return shards_[shard].actor.get();
}
td::actor::ActorId<FullNodeShard> FullNodeImpl::get_shard(AccountIdPrefixFull dst) {
@ -635,7 +596,7 @@ void FullNodeImpl::process_block_candidate_broadcast(BlockIdExt block_id, Catcha
}
void FullNodeImpl::start_up() {
add_shard_actor(ShardIdFull{masterchainId}, FullNodeShardMode::active);
update_shard_actor(ShardIdFull{masterchainId}, true);
if (local_id_.is_zero()) {
if (adnl_id_.is_zero()) {
auto pk = ton::PrivateKey{ton::privkeys::Ed25519::random()};
@ -651,10 +612,9 @@ void FullNodeImpl::start_up() {
void initial_read_complete(BlockHandle handle) override {
td::actor::send_closure(id_, &FullNodeImpl::initial_read_complete, handle);
}
void on_new_masterchain_block(td::Ref<MasterchainState> state, std::set<ShardIdFull> shards_to_monitor,
std::set<ShardIdFull> temporary_shards) override {
void on_new_masterchain_block(td::Ref<MasterchainState> state, std::set<ShardIdFull> shards_to_monitor) override {
td::actor::send_closure(id_, &FullNodeImpl::on_new_masterchain_block, std::move(state),
std::move(shards_to_monitor), std::move(temporary_shards));
std::move(shards_to_monitor));
}
void send_ihr_message(AccountIdPrefixFull dst, td::BufferSlice data) override {
td::actor::send_closure(id_, &FullNodeImpl::send_ihr_message, dst, std::move(data));
@ -716,7 +676,7 @@ void FullNodeImpl::start_up() {
td::actor::send_closure(id_, &FullNodeImpl::new_key_block, std::move(handle));
}
Callback(td::actor::ActorId<FullNodeImpl> id) : id_(id) {
explicit Callback(td::actor::ActorId<FullNodeImpl> id) : id_(id) {
}
private:

View file

@ -59,8 +59,7 @@ class FullNodeImpl : public FullNode {
void add_custom_overlay(CustomOverlayParams params, td::Promise<td::Unit> promise) override;
void del_custom_overlay(std::string name, td::Promise<td::Unit> promise) override;
void on_new_masterchain_block(td::Ref<MasterchainState> state, std::set<ShardIdFull> shards_to_monitor,
std::set<ShardIdFull> temporary_shards);
void on_new_masterchain_block(td::Ref<MasterchainState> state, std::set<ShardIdFull> shards_to_monitor);
void sync_completed();
@ -111,13 +110,12 @@ class FullNodeImpl : public FullNode {
private:
struct ShardInfo {
bool exists = false;
td::actor::ActorOwn<FullNodeShard> actor;
FullNodeShardMode mode = FullNodeShardMode::inactive;
bool active = false;
td::Timestamp delete_at = td::Timestamp::never();
};
void add_shard_actor(ShardIdFull shard, FullNodeShardMode mode);
void update_shard_actor(ShardIdFull shard, bool active);
PublicKeyHash local_id_;
adnl::AdnlNodeIdShort adnl_id_;

View file

@ -195,8 +195,6 @@ class ValidatorManager : public ValidatorManagerInterface {
virtual void add_lite_query_stats(int lite_query_id) {
}
virtual void validated_new_block(BlockIdExt block_id) = 0;
virtual void add_persistent_state_description(td::Ref<PersistentStateDescription> desc) = 0;
static bool is_persistent_state(UnixTime ts, UnixTime prev_ts) {

View file

@ -437,8 +437,6 @@ class ValidatorManagerImpl : public ValidatorManager {
td::Promise<tl_object_ptr<lite_api::liteServer_nonfinal_validatorGroups>> promise) override {
promise.set_result(td::Status::Error("not implemented"));
}
void validated_new_block(BlockIdExt block_id) override {
}
void add_persistent_state_description(td::Ref<PersistentStateDescription> desc) override {
}

View file

@ -503,8 +503,6 @@ class ValidatorManagerImpl : public ValidatorManager {
void update_options(td::Ref<ValidatorManagerOptions> opts) override {
opts_ = std::move(opts);
}
void validated_new_block(BlockIdExt block_id) override {
}
void add_persistent_state_description(td::Ref<PersistentStateDescription> desc) override {
}
void get_validator_sessions_info(

View file

@ -492,7 +492,7 @@ void ValidatorManagerImpl::new_ihr_message(td::BufferSlice data) {
}
void ValidatorManagerImpl::new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) {
if (!is_validator() && !is_shard_collator(block_id.shard_full())) {
if (!is_validator() && !is_shard_collator(block_id.shard_full()) && !cached_block_candidates_.count(block_id)) {
return;
}
if (!last_masterchain_block_handle_) {
@ -2120,7 +2120,7 @@ void ValidatorManagerImpl::update_shard_overlays() {
shards_to_monitor.insert(ShardIdFull{wc, shardIdAll});
}
}
callback_->on_new_masterchain_block(last_masterchain_state_, std::move(shards_to_monitor), extra_active_shards_);
callback_->on_new_masterchain_block(last_masterchain_state_, std::move(shards_to_monitor));
}
void ValidatorManagerImpl::update_shards() {
@ -2138,7 +2138,6 @@ void ValidatorManagerImpl::update_shards() {
opts.proto_version = std::max<td::uint32>(opts.proto_version, 1);
}
auto opts_hash = opts.get_hash();
extra_active_shards_.clear();
std::map<ShardIdFull, std::vector<BlockIdExt>> new_shards;
std::set<ShardIdFull> future_shards;
@ -2193,11 +2192,6 @@ void ValidatorManagerImpl::update_shards() {
default:
LOG(FATAL) << "state=" << static_cast<td::uint32>(v->fsm_state());
}
cleanup_last_validated_blocks(v->top_block_id().id);
}
for (const auto& s : last_validated_blocks_) {
extra_active_shards_.insert(s.first);
}
new_shards.emplace(ShardIdFull{masterchainId, shardIdAll}, std::vector<BlockIdExt>{last_masterchain_block_id_});
@ -2270,7 +2264,6 @@ void ValidatorManagerImpl::update_shards() {
auto validator_id = get_validator(shard, val_set);
if (!validator_id.is_zero()) {
extra_active_shards_.insert(shard);
auto val_group_id = get_validator_set_id(shard, val_set, opts_hash, key_seqno, opts);
if (force_recover) {
@ -2366,24 +2359,6 @@ void ValidatorManagerImpl::update_shards() {
}
}
void ValidatorManagerImpl::cleanup_last_validated_blocks(BlockId new_block) {
auto process_shard = [&, this](ShardIdFull shard) {
auto it = last_validated_blocks_.find(shard);
if (it != last_validated_blocks_.end() && it->second < new_block.seqno) {
last_validated_blocks_.erase(it);
}
};
ShardIdFull shard = new_block.shard_full();
process_shard(shard);
if (shard.pfx_len() > 0) {
process_shard(shard_parent(shard));
}
if (shard.pfx_len() < max_shard_pfx_len) {
process_shard(shard_child(shard, true));
process_shard(shard_child(shard, false));
}
}
void ValidatorManagerImpl::written_destroyed_validator_sessions(std::vector<td::actor::ActorId<ValidatorGroup>> list) {
for (auto &v : list) {
td::actor::send_closure(v, &ValidatorGroup::destroy);
@ -2463,7 +2438,7 @@ td::actor::ActorOwn<ValidatorGroup> ValidatorManagerImpl::create_validator_group
auto validator_id = get_validator(shard, validator_set);
CHECK(!validator_id.is_zero());
auto G = td::actor::create_actor<ValidatorGroup>(
"validatorgroup", shard, validator_id, session_id, validator_set,
PSTRING() << "valgroup" << shard.to_str(), shard, validator_id, session_id, validator_set,
last_masterchain_state_->get_collator_config(true), opts, keyring_, adnl_, rldp_, overlays_, db_root_,
actor_id(this), init_session, opts_->check_unsafe_resync_allowed(validator_set->get_catchain_seqno()), opts_,
opts_->need_monitor(shard, last_masterchain_state_));

View file

@ -624,11 +624,6 @@ class ValidatorManagerImpl : public ValidatorManager {
void get_validator_sessions_info(
td::Promise<tl_object_ptr<ton_api::engine_validator_validatorSessionsInfo>> promise) override;
void validated_new_block(BlockIdExt block_id) override {
BlockSeqno &last = last_validated_blocks_[block_id.shard_full()];
last = std::max(last, block_id.seqno());
}
void add_persistent_state_description(td::Ref<PersistentStateDescription> desc) override;
void add_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) override;
@ -745,8 +740,6 @@ class ValidatorManagerImpl : public ValidatorManager {
return 3 * 10;
}
void cleanup_last_validated_blocks(BlockId new_block);
void got_persistent_state_descriptions(std::vector<td::Ref<PersistentStateDescription>> descs);
void add_persistent_state_description_impl(td::Ref<PersistentStateDescription> desc);
td::Ref<PersistentStateDescription> get_block_persistent_state(BlockIdExt block_id);
@ -773,9 +766,6 @@ class ValidatorManagerImpl : public ValidatorManager {
};
std::map<adnl::AdnlNodeIdShort, Collator> collator_nodes_;
std::set<ShardIdFull> extra_active_shards_;
std::map<ShardIdFull, BlockSeqno> last_validated_blocks_;
std::map<BlockSeqno, td::Ref<PersistentStateDescription>> persistent_state_descriptions_;
std::map<BlockIdExt, td::Ref<PersistentStateDescription>> persistent_state_blocks_;
};

View file

@ -177,9 +177,6 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s
void ValidatorGroup::accept_block_query(BlockIdExt block_id, td::Ref<BlockData> block, std::vector<BlockIdExt> prev,
td::Ref<BlockSignatureSet> sig_set, td::Ref<BlockSignatureSet> approve_sig_set,
bool send_broadcast, td::Promise<td::Unit> promise, bool is_retry) {
if (!is_retry) {
td::actor::send_closure(manager_, &ValidatorManager::validated_new_block, block_id);
}
auto P = td::PromiseCreator::lambda([=, SelfId = actor_id(this),
promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
@ -197,7 +194,7 @@ void ValidatorGroup::accept_block_query(BlockIdExt block_id, td::Ref<BlockData>
});
run_accept_block_query(block_id, std::move(block), std::move(prev), validator_set_, std::move(sig_set),
std::move(approve_sig_set), send_broadcast, apply_blocks_, manager_, std::move(P));
std::move(approve_sig_set), send_broadcast, monitoring_shard_, manager_, std::move(P));
}
void ValidatorGroup::skip_round(td::uint32 round_id) {

View file

@ -68,7 +68,7 @@ class ValidatorGroup : public td::actor::Actor {
void update_options(td::Ref<ValidatorManagerOptions> opts, bool apply_blocks) {
opts_ = std::move(opts);
apply_blocks_ = apply_blocks;
monitoring_shard_ = apply_blocks;
}
ValidatorGroup(ShardIdFull shard, PublicKeyHash local_id, ValidatorSessionId session_id,
@ -77,7 +77,7 @@ class ValidatorGroup : public td::actor::Actor {
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<overlay::Overlays> overlays,
std::string db_root, td::actor::ActorId<ValidatorManager> validator_manager, bool create_session,
bool allow_unsafe_self_blocks_resync, td::Ref<ValidatorManagerOptions> opts, bool apply_blocks)
bool allow_unsafe_self_blocks_resync, td::Ref<ValidatorManagerOptions> opts, bool monitoring_shard)
: shard_(shard)
, local_id_(std::move(local_id))
, session_id_(session_id)
@ -93,7 +93,7 @@ class ValidatorGroup : public td::actor::Actor {
, init_(create_session)
, allow_unsafe_self_blocks_resync_(allow_unsafe_self_blocks_resync)
, opts_(std::move(opts))
, apply_blocks_(apply_blocks) {
, monitoring_shard_(monitoring_shard) {
}
private:
@ -141,7 +141,7 @@ class ValidatorGroup : public td::actor::Actor {
bool allow_unsafe_self_blocks_resync_;
td::Ref<ValidatorManagerOptions> opts_;
td::uint32 last_known_round_id_ = 0;
bool apply_blocks_ = true;
bool monitoring_shard_ = true;
struct CachedCollatedBlock {
td::optional<BlockCandidate> result;
@ -151,7 +151,7 @@ class ValidatorGroup : public td::actor::Actor {
void generated_block_candidate(std::shared_ptr<CachedCollatedBlock> cache, td::Result<BlockCandidate> R);
typedef std::tuple<td::Bits256, BlockIdExt, FileHash, FileHash> CacheKey;
using CacheKey = std::tuple<td::Bits256, BlockIdExt, FileHash, FileHash>;
std::map<CacheKey, UnixTime> approved_candidates_cache_;
void update_approve_cache(CacheKey key, UnixTime value);

View file

@ -170,8 +170,7 @@ class ValidatorManagerInterface : public td::actor::Actor {
virtual void initial_read_complete(BlockHandle top_masterchain_blocks) = 0;
virtual void on_new_masterchain_block(td::Ref<ton::validator::MasterchainState> state,
std::set<ShardIdFull> shards_to_monitor,
std::set<ShardIdFull> temporary_shards) = 0;
std::set<ShardIdFull> shards_to_monitor) = 0;
virtual void send_ihr_message(AccountIdPrefixFull dst, td::BufferSlice data) = 0;
virtual void send_ext_message(AccountIdPrefixFull dst, td::BufferSlice data) = 0;