Mirror of https://github.com/ton-blockchain/ton

Simplify selecting shards for monitor

SpyCheese committed 2022-10-03 17:55:37 +03:00
parent c2dde00459
commit 81c0e920c5
11 changed files with 66 additions and 75 deletions
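
In short: explicit shards_to_monitor_ bookkeeping is replaced by direct calls to ValidatorManagerOptions::need_monitor(), which now takes the current masterchain state and cuts the queried shard to the workchain's min_split_depth prefix before consulting check_shard_. The full node applies the same cut (via soft_min_split_depth) when choosing shard overlays, and FullNodeImpl::get_shard drops its exact flag, always falling back to the nearest existing ancestor shard.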


@@ -670,7 +670,7 @@ void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_ne
 void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) {
   BlockIdExt block_id = create_block_id(query.id_);
-  //if (block_id.shard_full() != shard_) {
+  //if (!shard_is_ancestor(shard_, block_id.shard_full())) {
   //  LOG(FULL_NODE_WARNING) << "dropping block broadcast: shard mismatch. overlay=" << shard_.to_str()
   //      << " block=" << block_id.to_str();
   //  return;


@@ -135,10 +135,18 @@ void FullNodeImpl::initial_read_complete(BlockHandle top_handle) {
 void FullNodeImpl::update_shard_configuration(td::Ref<MasterchainState> state, std::set<ShardIdFull> shards_to_monitor,
                                               std::set<ShardIdFull> temporary_shards) {
   CHECK(shards_to_monitor.count(ShardIdFull(masterchainId)));
-  std::map<ShardIdFull, BlockIdExt> new_shards;
+  std::set<ShardIdFull> new_shards;
   std::map<ShardIdFull, FullNodeShardMode> new_active;
-  new_shards[ShardIdFull(masterchainId)] = state->get_block_id();
+  new_shards.insert(ShardIdFull(masterchainId));
   std::set<WorkchainId> workchains;
+  auto cut_shard = [&](ShardIdFull shard) -> ShardIdFull {
+    unsigned pfx_len = shard.pfx_len();
+    unsigned min_split = state->soft_min_split_depth(shard.workchain);
+    if (min_split < pfx_len) {
+      return shard_prefix(shard, min_split);
+    }
+    return shard;
+  };
   auto set_active = [&](ShardIdFull shard, FullNodeShardMode mode) {
     while (new_active.emplace(shard, mode).second && shard.pfx_len() > 0) {
       shard = shard_parent(shard);
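
The cut_shard helper above only rewrites the shard prefix. In TON a ShardId is a 64-bit value whose binary prefix is terminated by a single tag bit (shardIdAll = 0x8000000000000000 is the empty, depth-0 prefix). Below is a minimal self-contained sketch of the two primitives cut_shard relies on, assuming that encoding; the names mirror the real ton::shard helpers, but this is illustrative code, not the project's:

#include <cassert>
#include <cstdint>

using ShardId = std::uint64_t;                         // prefix bits, one tag bit, then zeros
constexpr ShardId shardIdAll = 0x8000000000000000ULL;  // empty prefix, depth 0

// Prefix depth: number of bits before the terminating tag bit.
inline unsigned pfx_len(ShardId s) {
  assert(s != 0);
  return 63 - __builtin_ctzll(s);  // GCC/Clang builtin: index of the lowest set bit
}

// Ancestor of s at depth len: keep the top len bits, re-insert the tag bit.
inline ShardId shard_prefix(ShardId s, unsigned len) {
  if (len == 0) {
    return shardIdAll;
  }
  return (s & (~0ULL << (64 - len))) | (1ULL << (63 - len));
}

int main() {
  ShardId deep = 0x1C00000000000000ULL;  // prefix 00011, depth 5
  assert(pfx_len(deep) == 5);
  assert(shard_prefix(deep, 2) == 0x2000000000000000ULL);  // prefix 00, depth 2
  // With soft_min_split_depth == 2, cut_shard(deep) would return that ancestor.
  return 0;
}

Cutting every monitored shard this way bounds the set of shard overlays a node joins: however deeply the workchain actually splits, the node distinguishes prefixes only up to the min-split depth.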
@@ -146,20 +154,20 @@ void FullNodeImpl::update_shard_configuration(td::Ref<MasterchainState> state, s
   };
   for (auto &info : state->get_shards()) {
     workchains.insert(info->shard().workchain);
-    new_shards[info->shard()] = info->top_block_id();
+    new_shards.insert(cut_shard(info->shard()));
   }
   for (const auto &wpair : state->get_workchain_list()) {
     ton::WorkchainId wc = wpair.first;
     const block::WorkchainInfo *winfo = wpair.second.get();
     if (workchains.count(wc) == 0 && winfo->active && winfo->enabled_since <= state->get_unix_time()) {
-      new_shards[ShardIdFull(wc)] = BlockIdExt(wc, shardIdAll, 0, winfo->zerostate_root_hash, winfo->zerostate_file_hash);
+      new_shards.insert(ShardIdFull(wc));
     }
   }
   for (ShardIdFull shard : shards_to_monitor) {
-    set_active(shard, FullNodeShardMode::active);
+    set_active(cut_shard(shard), FullNodeShardMode::active);
   }
   for (ShardIdFull shard : temporary_shards) {
-    set_active(shard, FullNodeShardMode::active_temp);
+    set_active(cut_shard(shard), FullNodeShardMode::active_temp);
   }
   auto info_set_mode = [&](ShardIdFull shard, ShardInfo& info, FullNodeShardMode mode) {
@@ -177,7 +185,7 @@ void FullNodeImpl::update_shard_configuration(td::Ref<MasterchainState> state, s
   };
   for (auto shard : new_shards) {
-    auto &info = shards_[shard.first];
+    auto &info = shards_[shard];
     info.exists = true;
   }
@@ -283,7 +291,7 @@ void FullNodeImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_s
 }
 void FullNodeImpl::send_broadcast(BlockBroadcast broadcast) {
-  auto shard = get_shard(broadcast.block_id.shard_full(), true);
+  auto shard = get_shard(broadcast.block_id.shard_full());
   if (shard.empty()) {
     VLOG(FULL_NODE_WARNING) << "dropping OUT broadcast to unknown shard";
     return;
@@ -379,27 +387,28 @@ void FullNodeImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull
                           std::move(promise));
 }
-td::actor::ActorId<FullNodeShard> FullNodeImpl::get_shard(ShardIdFull shard, bool exact) {
-  if (!exact) {
-    ShardIdFull s = shard;
-    while (true) {
-      auto it = shards_.find(s);
-      if (it != shards_.end() && it->second.exists) {
-        if (it->second.actor.empty()) {
-          add_shard_actor(s, FullNodeShardMode::inactive);
-        }
-        if (it->second.mode == FullNodeShardMode::inactive) {
-          it->second.delete_at = td::Timestamp::in(INACTIVE_SHARD_TTL);
-        }
-        return it->second.actor.get();
-      }
-      if (s.pfx_len() == 0) {
-        break;
-      }
-      s = shard_parent(s);
-    }
-  }
-  auto &info = shards_[shard];
+td::actor::ActorId<FullNodeShard> FullNodeImpl::get_shard(ShardIdFull shard) {
+  while (true) {
+    auto it = shards_.find(shard);
+    if (it != shards_.end() && it->second.exists) {
+      if (it->second.actor.empty()) {
+        add_shard_actor(shard, FullNodeShardMode::inactive);
+      }
+      if (it->second.mode == FullNodeShardMode::inactive) {
+        it->second.delete_at = td::Timestamp::in(INACTIVE_SHARD_TTL);
+      }
+      return it->second.actor.get();
+    }
+    if (shard.pfx_len() == 0) {
+      break;
+    }
+    shard = shard_parent(shard);
+  }
+  auto it = shards_.find(shard);
+  if (it == shards_.end()) {
+    it = shards_.emplace(shard = ShardIdFull(shard.workchain), ShardInfo{}).first;
+  }
+  auto &info = it->second;
   if (info.actor.empty()) {
     add_shard_actor(shard, FullNodeShardMode::inactive);
   }
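
The rewritten get_shard climbs from the requested shard toward the root of its workchain until it finds an existing entry, then falls back to the whole-workchain shard, which subsumes the old exact flag. The parent step is bit arithmetic on the same encoding; again an illustrative sketch mirroring ton::shard_parent rather than quoting it:

#include <cassert>
#include <cstdint>

using ShardId = std::uint64_t;

// Parent shard: drop the last prefix bit, moving the tag bit up one level.
inline ShardId shard_parent(ShardId s) {
  ShardId tag = s & (~s + 1);     // lowest set bit = the tag bit
  return (s - tag) | (tag << 1);  // clear the old tag, set the one above it
}

int main() {
  ShardId s = 0x1C00000000000000ULL;                 // prefix 00011, depth 5
  assert(shard_parent(s) == 0x1800000000000000ULL);  // prefix 0001, depth 4
  // get_shard() repeats this step until shards_ has an entry or depth 0 is reached.
  return 0;
}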


@@ -108,7 +108,7 @@ class FullNodeImpl : public FullNode {
   FileHash zero_state_file_hash_;
   td::actor::ActorId<FullNodeShard> get_shard(AccountIdPrefixFull dst);
-  td::actor::ActorId<FullNodeShard> get_shard(ShardIdFull shard, bool exact = false);
+  td::actor::ActorId<FullNodeShard> get_shard(ShardIdFull shard);
   std::map<ShardIdFull, ShardInfo> shards_;
   td::actor::ActorId<keyring::Keyring> keyring_;


@@ -566,7 +566,7 @@ void ValidatorManagerMasterchainStarter::truncated() {
   truncate_shard_next(handle_->id(), ig.get_promise());
   auto s = state_->get_shards();
   for (auto &shard : s) {
-    if (opts_->need_monitor(shard->shard())) {
+    if (opts_->need_monitor(shard->shard(), state_)) {
       truncate_shard_next(shard->top_block_id(), ig.get_promise());
     }
   }


@@ -214,7 +214,7 @@ void ValidatorManagerImpl::prevalidate_block(BlockBroadcast broadcast, td::Promi
     promise.set_error(td::Status::Error(ErrorCode::notready, "node not started"));
     return;
   }
-  if (!shards_to_monitor_.count(broadcast.block_id.shard_full())) {
+  if (!need_monitor(broadcast.block_id.shard_full())) {
     promise.set_error(td::Status::Error("not monitoring shard"));
     return;
   }
@@ -457,7 +457,7 @@ void ValidatorManagerImpl::add_shard_block_description(td::Ref<ShardTopBlockDesc
   }
   shard_blocks_[ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()}] = desc;
   VLOG(VALIDATOR_DEBUG) << "new shard block descr for " << desc->block_id();
-  if (shards_to_monitor_.count(desc->block_id().shard_full())) {
+  if (need_monitor(desc->block_id().shard_full())) {
     auto P = td::PromiseCreator::lambda([](td::Result<td::Ref<ShardState>> R) {
       if (R.is_error()) {
         auto S = R.move_as_error();
@@ -617,10 +617,9 @@ void ValidatorManagerImpl::wait_out_msg_queue_proof(BlockIdExt block_id, ShardId
       td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_msg_queue, block_id, dst_shard,
                               std::move(R));
     });
-    auto id = td::actor::create_actor<WaitOutMsgQueueProof>("waitmsgqueue", block_id, dst_shard,
-                                                            shards_to_monitor_.count(block_id.shard_full()), priority,
-                                                            actor_id(this), td::Timestamp::at(timeout.at() + 10.0),
-                                                            std::move(P))
+    auto id = td::actor::create_actor<WaitOutMsgQueueProof>(
+                  "waitmsgqueue", block_id, dst_shard, need_monitor(block_id.shard_full()), priority, actor_id(this),
+                  td::Timestamp::at(timeout.at() + 10.0), std::move(P))
                   .release();
     wait_out_msg_queue_proof_[key].actor_ = id;
     it = wait_out_msg_queue_proof_.find(key);
@@ -1083,8 +1082,8 @@ void ValidatorManagerImpl::finished_wait_msg_queue(BlockIdExt block_id, ShardIdF
                               std::move(R));
     });
     auto id = td::actor::create_actor<WaitOutMsgQueueProof>("waitmsgqueue", block_id, dst_shard,
-                                                            shards_to_monitor_.count(block_id.shard_full()),
-                                                            X.second, actor_id(this), X.first, std::move(P))
+                                                            need_monitor(block_id.shard_full()), X.second,
+                                                            actor_id(this), X.first, std::move(P))
                   .release();
     it->second.actor_ = id;
     return;
@@ -2489,7 +2488,6 @@ void ValidatorManagerImpl::get_shard_client_state(bool from_db, td::Promise<Bloc
 void ValidatorManagerImpl::update_shard_configuration(td::Ref<MasterchainState> state,
                                                       std::set<ShardIdFull> shards_to_monitor) {
-  shards_to_monitor_ = shards_to_monitor;
   callback_->update_shard_configuration(std::move(state), std::move(shards_to_monitor), extra_active_shards_);
 }


@@ -628,11 +628,14 @@ class ValidatorManagerImpl : public ValidatorManager {
   td::Ref<PersistentStateDescription> get_block_persistent_state(BlockIdExt block_id);
  private:
+  bool need_monitor(ShardIdFull shard) const {
+    return opts_->need_monitor(shard, last_masterchain_state_);
+  }
   std::map<BlockSeqno, WaitList<td::actor::Actor, td::Unit>> shard_client_waiters_;
   std::map<adnl::AdnlNodeIdShort, td::actor::ActorOwn<CollatorNode>> collator_nodes_;
-  std::set<ShardIdFull> shards_to_monitor_ = {ShardIdFull(masterchainId)};
   std::set<ShardIdFull> extra_active_shards_;
   std::map<ShardIdFull, BlockSeqno> last_validated_blocks_;


@@ -91,7 +91,7 @@ void ShardClient::start_up_init_mode() {
   auto vec = masterchain_state_->get_shards();
   for (auto &shard : vec) {
-    if (shards_to_monitor_.count(shard->shard())) {
+    if (opts_->need_monitor(shard->shard(), masterchain_state_)) {
       auto P = td::PromiseCreator::lambda([promise = ig.get_promise()](td::Result<td::Ref<ShardState>> R) mutable {
         R.ensure();
         promise.set_value(td::Unit());
@@ -192,7 +192,7 @@ void ShardClient::apply_all_shards() {
   auto vec = masterchain_state_->get_shards();
   for (auto &shard : vec) {
-    if (shards_to_monitor_.count(shard->shard())) {
+    if (opts_->need_monitor(shard->shard(), masterchain_state_)) {
       auto Q = td::PromiseCreator::lambda([SelfId = actor_id(this), promise = ig.get_promise(),
                                            shard = shard->shard()](td::Result<td::Ref<ShardState>> R) mutable {
         if (R.is_error()) {
@@ -254,23 +254,9 @@ void ShardClient::build_shard_overlays() {
   for (const auto &info : masterchain_state_->get_shards()) {
     auto shard = info->shard();
     workchains.insert(shard.workchain);
-    bool will_split = shard.pfx_len() < max_shard_pfx_len &&
-                      (info->fsm_state() == McShardHash::FsmState::fsm_split || info->before_split());
-    bool will_merge =
-        shard.pfx_len() > 0 && (info->fsm_state() == McShardHash::FsmState::fsm_merge || info->before_merge());
-    if (opts_->need_monitor(shard) || (will_merge && opts_->need_monitor(shard_parent(shard)))) {
+    if (opts_->need_monitor(shard, masterchain_state_)) {
       new_shards_to_monitor.insert(shard);
     }
-    if (will_merge && opts_->need_monitor(shard_parent(shard))) {
-      new_shards_to_monitor.insert(shard_parent(shard));
-    }
-    if (will_split) {
-      for (int id = 0; id < 2; ++id) {
-        if (opts_->need_monitor(shard_child(shard, id))) {
-          new_shards_to_monitor.insert(shard_child(shard, id));
-        }
-      }
-    }
   }
   std::vector<BlockIdExt> new_workchains;
@@ -278,7 +264,8 @@ void ShardClient::build_shard_overlays() {
     ton::WorkchainId wc = wpair.first;
     const block::WorkchainInfo *winfo = wpair.second.get();
     auto shard = ShardIdFull(wc);
-    if (workchains.count(wc) == 0 && winfo->active && winfo->enabled_since <= cur_time && opts_->need_monitor(shard)) {
+    if (workchains.count(wc) == 0 && winfo->active && winfo->enabled_since <= cur_time &&
+        opts_->need_monitor(shard, masterchain_state_)) {
       new_shards_to_monitor.insert(shard);
       if (shards_to_monitor_.count(shard) == 0) {
         new_workchains.push_back(BlockIdExt(wc, shardIdAll, 0, winfo->zerostate_root_hash, winfo->zerostate_file_hash));
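
Dropping the will_split / will_merge lookahead appears to be justified by the new prefix cutting: once need_monitor normalizes every query to the workchain's min-split prefix, a shard about to split or merge resolves to the same monitored prefix as its children or parent, so monitoring them explicitly in advance is redundant.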


@@ -159,16 +159,12 @@ void AsyncStateSerializer::next_iteration() {
     return;
   }
   while (next_idx_ < shards_.size()) {
-    if (!need_monitor(shards_[next_idx_].shard_full())) {
-      next_idx_++;
-    } else {
-      // block next attempts immediately, but send actual request later
-      running_ = true;
-      delay_action(
-          [SelfId = actor_id(this), shard = shards_[next_idx_]]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_shard_state, shard); },
-          td::Timestamp::in(td::Random::fast(0, 4 * 3600)));
-      return;
-    }
+    // block next attempts immediately, but send actual request later
+    running_ = true;
+    delay_action(
+        [SelfId = actor_id(this), shard = shards_[next_idx_]]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_shard_state, shard); },
+        td::Timestamp::in(td::Random::fast(0, 4 * 3600)));
+    return;
   }
   LOG(INFO) << "finished serializing persistent state for " << masterchain_handle_->id().id;
   last_key_block_ts_ = masterchain_handle_->unix_time();
@@ -245,7 +241,9 @@ void AsyncStateSerializer::got_masterchain_state(td::Ref<MasterchainState> state
   auto vec = state->get_shards();
   for (auto &v : vec) {
-    shards_.push_back(v->top_block_id());
+    if (opts_->need_monitor(v->shard(), state)) {
+      shards_.push_back(v->top_block_id());
+    }
   }
   auto write_data = [hash = state->root_cell()->get_hash(), cell_db_reader = cell_db_reader_] (td::FileFd& fd) {
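
Since the need_monitor filter is now applied here, when shards_ is populated from the masterchain state, every block id the serializer queues is already known to be monitored. That is why next_iteration (above) loses its per-shard check and the AsyncStateSerializer::need_monitor helper is deleted (below).
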
@@ -311,10 +309,6 @@ void AsyncStateSerializer::success_handler() {
   next_iteration();
 }
-bool AsyncStateSerializer::need_monitor(ShardIdFull shard) {
-  return opts_->need_monitor(shard);
-}
 bool AsyncStateSerializer::need_serialize(BlockHandle handle) {
   if (handle->id().id.seqno == 0 || !handle->is_key_block()) {
     return false;


@@ -60,7 +60,6 @@ class AsyncStateSerializer : public td::actor::Actor {
   }
   bool need_serialize(BlockHandle handle);
-  bool need_monitor(ShardIdFull shard);
   void alarm() override;
   void start_up() override;


@@ -32,8 +32,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions {
   BlockIdExt init_block_id() const override {
     return init_block_id_;
   }
-  bool need_monitor(ShardIdFull shard) const override {
-    return check_shard_(shard);
+  bool need_monitor(ShardIdFull shard, const td::Ref<MasterchainState>& state) const override {
+    td::uint32 min_split = state->min_split_depth(shard.workchain);
+    return check_shard_((td::uint32)shard.pfx_len() <= min_split ? shard : shard_prefix(shard, min_split));
   }
   bool allow_blockchain_init() const override {
     return allow_blockchain_init_;
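
As a worked example (assuming the standard 64-bit prefix encoding): with min_split_depth = 2, a depth-5 shard such as 0:1c00000000000000 is cut to its depth-2 ancestor 0:2000000000000000 before check_shard_ is consulted, so the monitoring decision cannot depend on splits below the min-split level.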


@@ -49,7 +49,7 @@ struct ValidatorManagerOptions : public td::CntObject {
  public:
   virtual BlockIdExt zero_block_id() const = 0;
   virtual BlockIdExt init_block_id() const = 0;
-  virtual bool need_monitor(ShardIdFull shard) const = 0;
+  virtual bool need_monitor(ShardIdFull shard, const td::Ref<MasterchainState>& state) const = 0;
   virtual bool allow_blockchain_init() const = 0;
   virtual double sync_blocks_before() const = 0;
   virtual double block_ttl() const = 0;