diff --git a/validator/full-node-shard.cpp b/validator/full-node-shard.cpp index e491061e..d048f611 100644 --- a/validator/full-node-shard.cpp +++ b/validator/full-node-shard.cpp @@ -670,7 +670,7 @@ void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_ne void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) { BlockIdExt block_id = create_block_id(query.id_); - //if (block_id.shard_full() != shard_) { + //if (!shard_is_ancestor(shard_, block_id.shard_full())) { // LOG(FULL_NODE_WARNING) << "dropping block broadcast: shard mismatch. overlay=" << shard_.to_str() // << " block=" << block_id.to_str(); // return; diff --git a/validator/full-node.cpp b/validator/full-node.cpp index 52d3cfb9..72af6fe8 100644 --- a/validator/full-node.cpp +++ b/validator/full-node.cpp @@ -135,10 +135,18 @@ void FullNodeImpl::initial_read_complete(BlockHandle top_handle) { void FullNodeImpl::update_shard_configuration(td::Ref state, std::set shards_to_monitor, std::set temporary_shards) { CHECK(shards_to_monitor.count(ShardIdFull(masterchainId))); - std::map new_shards; + std::set new_shards; std::map new_active; - new_shards[ShardIdFull(masterchainId)] = state->get_block_id(); + new_shards.insert(ShardIdFull(masterchainId)); std::set workchains; + auto cut_shard = [&](ShardIdFull shard) -> ShardIdFull { + unsigned pfx_len = shard.pfx_len(); + unsigned min_split = state->soft_min_split_depth(shard.workchain); + if (min_split < pfx_len) { + return shard_prefix(shard, min_split); + } + return shard; + }; auto set_active = [&](ShardIdFull shard, FullNodeShardMode mode) { while (new_active.emplace(shard, mode).second && shard.pfx_len() > 0) { shard = shard_parent(shard); @@ -146,20 +154,20 @@ void FullNodeImpl::update_shard_configuration(td::Ref state, s }; for (auto &info : state->get_shards()) { workchains.insert(info->shard().workchain); - new_shards[info->shard()] = info->top_block_id(); + new_shards.insert(cut_shard(info->shard())); } for (const auto &wpair : state->get_workchain_list()) { ton::WorkchainId wc = wpair.first; const block::WorkchainInfo *winfo = wpair.second.get(); if (workchains.count(wc) == 0 && winfo->active && winfo->enabled_since <= state->get_unix_time()) { - new_shards[ShardIdFull(wc)] = BlockIdExt(wc, shardIdAll, 0, winfo->zerostate_root_hash, winfo->zerostate_file_hash); + new_shards.insert(ShardIdFull(wc)); } } for (ShardIdFull shard : shards_to_monitor) { - set_active(shard, FullNodeShardMode::active); + set_active(cut_shard(shard), FullNodeShardMode::active); } for (ShardIdFull shard : temporary_shards) { - set_active(shard, FullNodeShardMode::active_temp); + set_active(cut_shard(shard), FullNodeShardMode::active_temp); } auto info_set_mode = [&](ShardIdFull shard, ShardInfo& info, FullNodeShardMode mode) { @@ -177,7 +185,7 @@ void FullNodeImpl::update_shard_configuration(td::Ref state, s }; for (auto shard : new_shards) { - auto &info = shards_[shard.first]; + auto &info = shards_[shard]; info.exists = true; } @@ -283,7 +291,7 @@ void FullNodeImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_s } void FullNodeImpl::send_broadcast(BlockBroadcast broadcast) { - auto shard = get_shard(broadcast.block_id.shard_full(), true); + auto shard = get_shard(broadcast.block_id.shard_full()); if (shard.empty()) { VLOG(FULL_NODE_WARNING) << "dropping OUT broadcast to unknown shard"; return; @@ -379,27 +387,28 @@ void FullNodeImpl::download_out_msg_queue_proof(BlockIdExt block_id, ShardIdFull std::move(promise)); } -td::actor::ActorId FullNodeImpl::get_shard(ShardIdFull shard, bool exact) { - if (!exact) { - ShardIdFull s = shard; - while (true) { - auto it = shards_.find(s); - if (it != shards_.end() && it->second.exists) { - if (it->second.actor.empty()) { - add_shard_actor(s, FullNodeShardMode::inactive); - } - if (it->second.mode == FullNodeShardMode::inactive) { - it->second.delete_at = td::Timestamp::in(INACTIVE_SHARD_TTL); - } - return it->second.actor.get(); +td::actor::ActorId FullNodeImpl::get_shard(ShardIdFull shard) { + while (true) { + auto it = shards_.find(shard); + if (it != shards_.end() && it->second.exists) { + if (it->second.actor.empty()) { + add_shard_actor(shard, FullNodeShardMode::inactive); } - if (s.pfx_len() == 0) { - break; + if (it->second.mode == FullNodeShardMode::inactive) { + it->second.delete_at = td::Timestamp::in(INACTIVE_SHARD_TTL); } - s = shard_parent(s); + return it->second.actor.get(); } + if (shard.pfx_len() == 0) { + break; + } + shard = shard_parent(shard); } - auto &info = shards_[shard]; + auto it = shards_.find(shard); + if (it == shards_.end()) { + it = shards_.emplace(shard = ShardIdFull(shard.workchain), ShardInfo{}).first; + } + auto &info = it->second; if (info.actor.empty()) { add_shard_actor(shard, FullNodeShardMode::inactive); } diff --git a/validator/full-node.hpp b/validator/full-node.hpp index 8cc8994b..503ba286 100644 --- a/validator/full-node.hpp +++ b/validator/full-node.hpp @@ -108,7 +108,7 @@ class FullNodeImpl : public FullNode { FileHash zero_state_file_hash_; td::actor::ActorId get_shard(AccountIdPrefixFull dst); - td::actor::ActorId get_shard(ShardIdFull shard, bool exact = false); + td::actor::ActorId get_shard(ShardIdFull shard); std::map shards_; td::actor::ActorId keyring_; diff --git a/validator/manager-init.cpp b/validator/manager-init.cpp index 1a322c02..8935664e 100644 --- a/validator/manager-init.cpp +++ b/validator/manager-init.cpp @@ -566,7 +566,7 @@ void ValidatorManagerMasterchainStarter::truncated() { truncate_shard_next(handle_->id(), ig.get_promise()); auto s = state_->get_shards(); for (auto &shard : s) { - if (opts_->need_monitor(shard->shard())) { + if (opts_->need_monitor(shard->shard(), state_)) { truncate_shard_next(shard->top_block_id(), ig.get_promise()); } } diff --git a/validator/manager.cpp b/validator/manager.cpp index 95ef6bf0..2a17af5e 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -214,7 +214,7 @@ void ValidatorManagerImpl::prevalidate_block(BlockBroadcast broadcast, td::Promi promise.set_error(td::Status::Error(ErrorCode::notready, "node not started")); return; } - if (!shards_to_monitor_.count(broadcast.block_id.shard_full())) { + if (!need_monitor(broadcast.block_id.shard_full())) { promise.set_error(td::Status::Error("not monitoring shard")); return; } @@ -457,7 +457,7 @@ void ValidatorManagerImpl::add_shard_block_description(td::Refblock_id().shard_full(), desc->catchain_seqno()}] = desc; VLOG(VALIDATOR_DEBUG) << "new shard block descr for " << desc->block_id(); - if (shards_to_monitor_.count(desc->block_id().shard_full())) { + if (need_monitor(desc->block_id().shard_full())) { auto P = td::PromiseCreator::lambda([](td::Result> R) { if (R.is_error()) { auto S = R.move_as_error(); @@ -617,10 +617,9 @@ void ValidatorManagerImpl::wait_out_msg_queue_proof(BlockIdExt block_id, ShardId td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_msg_queue, block_id, dst_shard, std::move(R)); }); - auto id = td::actor::create_actor("waitmsgqueue", block_id, dst_shard, - shards_to_monitor_.count(block_id.shard_full()), priority, - actor_id(this), td::Timestamp::at(timeout.at() + 10.0), - std::move(P)) + auto id = td::actor::create_actor( + "waitmsgqueue", block_id, dst_shard, need_monitor(block_id.shard_full()), priority, actor_id(this), + td::Timestamp::at(timeout.at() + 10.0), std::move(P)) .release(); wait_out_msg_queue_proof_[key].actor_ = id; it = wait_out_msg_queue_proof_.find(key); @@ -1083,8 +1082,8 @@ void ValidatorManagerImpl::finished_wait_msg_queue(BlockIdExt block_id, ShardIdF std::move(R)); }); auto id = td::actor::create_actor("waitmsgqueue", block_id, dst_shard, - shards_to_monitor_.count(block_id.shard_full()), - X.second, actor_id(this), X.first, std::move(P)) + need_monitor(block_id.shard_full()), X.second, + actor_id(this), X.first, std::move(P)) .release(); it->second.actor_ = id; return; @@ -2489,7 +2488,6 @@ void ValidatorManagerImpl::get_shard_client_state(bool from_db, td::Promise state, std::set shards_to_monitor) { - shards_to_monitor_ = shards_to_monitor; callback_->update_shard_configuration(std::move(state), std::move(shards_to_monitor), extra_active_shards_); } diff --git a/validator/manager.hpp b/validator/manager.hpp index ddef3441..3b32cfcd 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -628,11 +628,14 @@ class ValidatorManagerImpl : public ValidatorManager { td::Ref get_block_persistent_state(BlockIdExt block_id); private: + bool need_monitor(ShardIdFull shard) const { + return opts_->need_monitor(shard, last_masterchain_state_); + } + std::map> shard_client_waiters_; std::map> collator_nodes_; - std::set shards_to_monitor_ = {ShardIdFull(masterchainId)}; std::set extra_active_shards_; std::map last_validated_blocks_; diff --git a/validator/shard-client.cpp b/validator/shard-client.cpp index 431d7721..5c25b783 100644 --- a/validator/shard-client.cpp +++ b/validator/shard-client.cpp @@ -91,7 +91,7 @@ void ShardClient::start_up_init_mode() { auto vec = masterchain_state_->get_shards(); for (auto &shard : vec) { - if (shards_to_monitor_.count(shard->shard())) { + if (opts_->need_monitor(shard->shard(), masterchain_state_)) { auto P = td::PromiseCreator::lambda([promise = ig.get_promise()](td::Result> R) mutable { R.ensure(); promise.set_value(td::Unit()); @@ -192,7 +192,7 @@ void ShardClient::apply_all_shards() { auto vec = masterchain_state_->get_shards(); for (auto &shard : vec) { - if (shards_to_monitor_.count(shard->shard())) { + if (opts_->need_monitor(shard->shard(), masterchain_state_)) { auto Q = td::PromiseCreator::lambda([SelfId = actor_id(this), promise = ig.get_promise(), shard = shard->shard()](td::Result> R) mutable { if (R.is_error()) { @@ -254,23 +254,9 @@ void ShardClient::build_shard_overlays() { for (const auto &info : masterchain_state_->get_shards()) { auto shard = info->shard(); workchains.insert(shard.workchain); - bool will_split = shard.pfx_len() < max_shard_pfx_len && - (info->fsm_state() == McShardHash::FsmState::fsm_split || info->before_split()); - bool will_merge = - shard.pfx_len() > 0 && (info->fsm_state() == McShardHash::FsmState::fsm_merge || info->before_merge()); - if (opts_->need_monitor(shard) || (will_merge && opts_->need_monitor(shard_parent(shard)))) { + if (opts_->need_monitor(shard, masterchain_state_)) { new_shards_to_monitor.insert(shard); } - if (will_merge && opts_->need_monitor(shard_parent(shard))) { - new_shards_to_monitor.insert(shard_parent(shard)); - } - if (will_split) { - for (int id = 0; id < 2; ++id) { - if (opts_->need_monitor(shard_child(shard, id))) { - new_shards_to_monitor.insert(shard_child(shard, id)); - } - } - } } std::vector new_workchains; @@ -278,7 +264,8 @@ void ShardClient::build_shard_overlays() { ton::WorkchainId wc = wpair.first; const block::WorkchainInfo *winfo = wpair.second.get(); auto shard = ShardIdFull(wc); - if (workchains.count(wc) == 0 && winfo->active && winfo->enabled_since <= cur_time && opts_->need_monitor(shard)) { + if (workchains.count(wc) == 0 && winfo->active && winfo->enabled_since <= cur_time && + opts_->need_monitor(shard, masterchain_state_)) { new_shards_to_monitor.insert(shard); if (shards_to_monitor_.count(shard) == 0) { new_workchains.push_back(BlockIdExt(wc, shardIdAll, 0, winfo->zerostate_root_hash, winfo->zerostate_file_hash)); diff --git a/validator/state-serializer.cpp b/validator/state-serializer.cpp index fc8cd29a..1c8d677c 100644 --- a/validator/state-serializer.cpp +++ b/validator/state-serializer.cpp @@ -159,16 +159,12 @@ void AsyncStateSerializer::next_iteration() { return; } while (next_idx_ < shards_.size()) { - if (!need_monitor(shards_[next_idx_].shard_full())) { - next_idx_++; - } else { - // block next attempts immediately, but send actual request later - running_ = true; - delay_action( - [SelfId = actor_id(this), shard = shards_[next_idx_]]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_shard_state, shard); }, - td::Timestamp::in(td::Random::fast(0, 4 * 3600))); - return; - } + // block next attempts immediately, but send actual request later + running_ = true; + delay_action( + [SelfId = actor_id(this), shard = shards_[next_idx_]]() { td::actor::send_closure(SelfId, &AsyncStateSerializer::request_shard_state, shard); }, + td::Timestamp::in(td::Random::fast(0, 4 * 3600))); + return; } LOG(INFO) << "finished serializing persistent state for " << masterchain_handle_->id().id; last_key_block_ts_ = masterchain_handle_->unix_time(); @@ -245,7 +241,9 @@ void AsyncStateSerializer::got_masterchain_state(td::Ref state auto vec = state->get_shards(); for (auto &v : vec) { - shards_.push_back(v->top_block_id()); + if (opts_->need_monitor(v->shard(), state)) { + shards_.push_back(v->top_block_id()); + } } auto write_data = [hash = state->root_cell()->get_hash(), cell_db_reader = cell_db_reader_] (td::FileFd& fd) { @@ -311,10 +309,6 @@ void AsyncStateSerializer::success_handler() { next_iteration(); } -bool AsyncStateSerializer::need_monitor(ShardIdFull shard) { - return opts_->need_monitor(shard); -} - bool AsyncStateSerializer::need_serialize(BlockHandle handle) { if (handle->id().id.seqno == 0 || !handle->is_key_block()) { return false; diff --git a/validator/state-serializer.hpp b/validator/state-serializer.hpp index 0d6e9550..1472c3b4 100644 --- a/validator/state-serializer.hpp +++ b/validator/state-serializer.hpp @@ -60,7 +60,6 @@ class AsyncStateSerializer : public td::actor::Actor { } bool need_serialize(BlockHandle handle); - bool need_monitor(ShardIdFull shard); void alarm() override; void start_up() override; diff --git a/validator/validator-options.hpp b/validator/validator-options.hpp index b4a94a39..e96292ad 100644 --- a/validator/validator-options.hpp +++ b/validator/validator-options.hpp @@ -32,8 +32,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { BlockIdExt init_block_id() const override { return init_block_id_; } - bool need_monitor(ShardIdFull shard) const override { - return check_shard_(shard); + bool need_monitor(ShardIdFull shard, const td::Ref& state) const override { + td::uint32 min_split = state->min_split_depth(shard.workchain); + return check_shard_((td::uint32)shard.pfx_len() <= min_split ? shard : shard_prefix(shard, min_split)); } bool allow_blockchain_init() const override { return allow_blockchain_init_; diff --git a/validator/validator.h b/validator/validator.h index 0daccc1a..c52cbad7 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -49,7 +49,7 @@ struct ValidatorManagerOptions : public td::CntObject { public: virtual BlockIdExt zero_block_id() const = 0; virtual BlockIdExt init_block_id() const = 0; - virtual bool need_monitor(ShardIdFull shard) const = 0; + virtual bool need_monitor(ShardIdFull shard, const td::Ref& state) const = 0; virtual bool allow_blockchain_init() const = 0; virtual double sync_blocks_before() const = 0; virtual double block_ttl() const = 0;