mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
Validators temporary join shard overlays
This commit is contained in:
parent
7749cbfa1f
commit
662435462e
18 changed files with 141 additions and 75 deletions
|
@ -25,7 +25,6 @@
|
|||
|
||||
#include "ton/ton-shard.h"
|
||||
#include "ton/ton-tl.hpp"
|
||||
#include "ton/ton-io.hpp"
|
||||
|
||||
#include "adnl/utils.hpp"
|
||||
#include "net/download-block-new.hpp"
|
||||
|
@ -71,38 +70,41 @@ void Neighbour::update_roundtrip(double t) {
|
|||
}
|
||||
|
||||
void FullNodeShardImpl::create_overlay() {
|
||||
class Callback : public overlay::Overlays::Callback {
|
||||
public:
|
||||
void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id,
|
||||
td::BufferSlice data) override {
|
||||
// just ignore
|
||||
}
|
||||
void receive_query(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
|
||||
td::Promise<td::BufferSlice> promise) override {
|
||||
td::actor::send_closure(node_, &FullNodeShardImpl::receive_query, src, std::move(data), std::move(promise));
|
||||
}
|
||||
void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
|
||||
td::actor::send_closure(node_, &FullNodeShardImpl::receive_broadcast, src, std::move(data));
|
||||
}
|
||||
void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
|
||||
td::Promise<td::Unit> promise) override {
|
||||
td::actor::send_closure(node_, &FullNodeShardImpl::check_broadcast, src, std::move(data), std::move(promise));
|
||||
}
|
||||
void on_remove_peer(adnl::AdnlNodeIdShort src) override {
|
||||
td::actor::send_closure(node_, &FullNodeShardImpl::remove_neighbour, src);
|
||||
}
|
||||
Callback(td::actor::ActorId<FullNodeShardImpl> node) : node_(node) {
|
||||
}
|
||||
|
||||
private:
|
||||
td::actor::ActorId<FullNodeShardImpl> node_;
|
||||
};
|
||||
|
||||
if (active_) {
|
||||
class Callback : public overlay::Overlays::Callback {
|
||||
public:
|
||||
void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id,
|
||||
td::BufferSlice data) override {
|
||||
// just ignore
|
||||
}
|
||||
void receive_query(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
|
||||
td::Promise<td::BufferSlice> promise) override {
|
||||
td::actor::send_closure(node_, &FullNodeShardImpl::receive_query, src, std::move(data), std::move(promise));
|
||||
}
|
||||
void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
|
||||
td::actor::send_closure(node_, &FullNodeShardImpl::receive_broadcast, src, std::move(data));
|
||||
}
|
||||
void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
|
||||
td::Promise<td::Unit> promise) override {
|
||||
td::actor::send_closure(node_, &FullNodeShardImpl::check_broadcast, src, std::move(data), std::move(promise));
|
||||
}
|
||||
Callback(td::actor::ActorId<FullNodeShardImpl> node) : node_(node) {
|
||||
}
|
||||
|
||||
private:
|
||||
td::actor::ActorId<FullNodeShardImpl> node_;
|
||||
};
|
||||
|
||||
td::actor::send_closure(overlays_, &overlay::Overlays::create_public_overlay, adnl_id_, overlay_id_full_.clone(),
|
||||
std::make_unique<Callback>(actor_id(this)), rules_,
|
||||
PSTRING() << "{ \"type\": \"shard\", \"shard_id\": " << get_shard()
|
||||
<< ", \"workchain_id\": " << get_workchain() << " }");
|
||||
} else {
|
||||
td::actor::send_closure(overlays_, &overlay::Overlays::create_public_overlay_external, adnl_id_,
|
||||
overlay_id_full_.clone(), rules_,
|
||||
overlay_id_full_.clone(), std::make_unique<Callback>(actor_id(this)), rules_,
|
||||
PSTRING() << "{ \"type\": \"shard\", \"shard_id\": " << get_shard()
|
||||
<< ", \"workchain_id\": " << get_workchain() << " }");
|
||||
}
|
||||
|
@ -124,6 +126,10 @@ void FullNodeShardImpl::check_broadcast(PublicKeyHash src, td::BufferSlice broad
|
|||
std::move(q->message_->data_), std::move(promise));
|
||||
}
|
||||
|
||||
void FullNodeShardImpl::remove_neighbour(adnl::AdnlNodeIdShort id) {
|
||||
neighbours_.erase(id);
|
||||
}
|
||||
|
||||
void FullNodeShardImpl::update_adnl_id(adnl::AdnlNodeIdShort adnl_id, td::Promise<td::Unit> promise) {
|
||||
td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, adnl_id_, overlay_id_);
|
||||
adnl_id_ = adnl_id;
|
||||
|
|
|
@ -145,6 +145,7 @@ class FullNodeShardImpl : public FullNodeShard {
|
|||
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query);
|
||||
void receive_broadcast(PublicKeyHash src, td::BufferSlice query);
|
||||
void check_broadcast(PublicKeyHash src, td::BufferSlice query, td::Promise<td::Unit> promise);
|
||||
void remove_neighbour(adnl::AdnlNodeIdShort id);
|
||||
|
||||
void send_ihr_message(td::BufferSlice data) override;
|
||||
void send_external_message(td::BufferSlice data) override;
|
||||
|
|
|
@ -168,6 +168,8 @@ class ValidatorManager : public ValidatorManagerInterface {
|
|||
|
||||
virtual void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) = 0;
|
||||
|
||||
virtual void validated_new_block(BlockIdExt block_id) = 0;
|
||||
|
||||
static bool is_persistent_state(UnixTime ts, UnixTime prev_ts) {
|
||||
return ts / (1 << 17) != prev_ts / (1 << 17);
|
||||
}
|
||||
|
|
|
@ -375,6 +375,8 @@ class ValidatorManagerImpl : public ValidatorManager {
|
|||
void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) override {
|
||||
UNREACHABLE();
|
||||
}
|
||||
void validated_new_block(BlockIdExt block_id) override {
|
||||
}
|
||||
void get_validator_sessions_info(
|
||||
td::Promise<tl_object_ptr<ton_api::engine_validator_validatorSessionsInfo>> promise) override {
|
||||
UNREACHABLE();
|
||||
|
|
|
@ -435,6 +435,8 @@ class ValidatorManagerImpl : public ValidatorManager {
|
|||
void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) override {
|
||||
UNREACHABLE();
|
||||
}
|
||||
void validated_new_block(BlockIdExt block_id) override {
|
||||
}
|
||||
void get_validator_sessions_info(
|
||||
td::Promise<tl_object_ptr<ton_api::engine_validator_validatorSessionsInfo>> promise) override {
|
||||
UNREACHABLE();
|
||||
|
|
|
@ -418,7 +418,7 @@ void ValidatorManagerImpl::new_ihr_message(td::BufferSlice data) {
|
|||
}
|
||||
|
||||
void ValidatorManagerImpl::new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) {
|
||||
if (!is_collator()) {
|
||||
if (!is_collator() && !is_validator()) {
|
||||
return;
|
||||
}
|
||||
if (!last_masterchain_block_handle_) {
|
||||
|
@ -1794,6 +1794,7 @@ void ValidatorManagerImpl::update_shards() {
|
|||
opts.proto_version = std::max<td::uint32>(opts.proto_version, 1);
|
||||
}
|
||||
auto opts_hash = opts.get_hash();
|
||||
extra_active_shards_.clear();
|
||||
|
||||
std::map<ShardIdFull, std::vector<BlockIdExt>> new_shards;
|
||||
std::set<ShardIdFull> future_shards;
|
||||
|
@ -1848,6 +1849,11 @@ void ValidatorManagerImpl::update_shards() {
|
|||
default:
|
||||
LOG(FATAL) << "state=" << static_cast<td::uint32>(v->fsm_state());
|
||||
}
|
||||
cleanup_last_validated_blocks(v->top_block_id().id);
|
||||
}
|
||||
|
||||
for (const auto& s : last_validated_blocks_) {
|
||||
extra_active_shards_.insert(s.first);
|
||||
}
|
||||
|
||||
new_shards.emplace(ShardIdFull{masterchainId, shardIdAll}, std::vector<BlockIdExt>{last_masterchain_block_id_});
|
||||
|
@ -1920,6 +1926,7 @@ void ValidatorManagerImpl::update_shards() {
|
|||
auto validator_id = get_validator(shard, val_set);
|
||||
|
||||
if (!validator_id.is_zero()) {
|
||||
extra_active_shards_.insert(shard);
|
||||
auto val_group_id = get_validator_set_id(shard, val_set, opts_hash, key_seqno, opts);
|
||||
|
||||
if (force_recover) {
|
||||
|
@ -2012,7 +2019,25 @@ void ValidatorManagerImpl::update_shards() {
|
|||
});
|
||||
td::actor::send_closure(db_, &Db::update_destroyed_validator_sessions, gc_list_, std::move(P));
|
||||
}
|
||||
} // namespace validator
|
||||
}
|
||||
|
||||
void ValidatorManagerImpl::cleanup_last_validated_blocks(BlockId new_block) {
|
||||
auto process_shard = [&, this](ShardIdFull shard) {
|
||||
auto it = last_validated_blocks_.find(shard);
|
||||
if (it != last_validated_blocks_.end() && it->second < new_block.seqno) {
|
||||
last_validated_blocks_.erase(it);
|
||||
}
|
||||
};
|
||||
ShardIdFull shard = new_block.shard_full();
|
||||
process_shard(shard);
|
||||
if (shard.pfx_len() > 0) {
|
||||
process_shard(shard_parent(shard));
|
||||
}
|
||||
if (shard.pfx_len() < max_shard_pfx_len) {
|
||||
process_shard(shard_child(shard, true));
|
||||
process_shard(shard_child(shard, false));
|
||||
}
|
||||
}
|
||||
|
||||
void ValidatorManagerImpl::written_destroyed_validator_sessions(std::vector<td::actor::ActorId<ValidatorGroup>> list) {
|
||||
for (auto &v : list) {
|
||||
|
@ -2428,6 +2453,7 @@ void ValidatorManagerImpl::get_shard_client_state(bool from_db, td::Promise<Bloc
|
|||
void ValidatorManagerImpl::update_shard_configuration(td::Ref<MasterchainState> state,
|
||||
std::set<ShardIdFull> shards_to_monitor) {
|
||||
shards_to_monitor_ = shards_to_monitor;
|
||||
shards_to_monitor.insert(extra_active_shards_.begin(), extra_active_shards_.end());
|
||||
callback_->update_shard_configuration(std::move(state), std::move(shards_to_monitor));
|
||||
}
|
||||
|
||||
|
|
|
@ -543,6 +543,11 @@ class ValidatorManagerImpl : public ValidatorManager {
|
|||
void get_validator_sessions_info(
|
||||
td::Promise<tl_object_ptr<ton_api::engine_validator_validatorSessionsInfo>> promise) override;
|
||||
|
||||
void validated_new_block(BlockIdExt block_id) override {
|
||||
BlockSeqno &last = last_validated_blocks_[block_id.shard_full()];
|
||||
last = std::max(last, block_id.seqno());
|
||||
}
|
||||
|
||||
void add_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) override;
|
||||
|
||||
private:
|
||||
|
@ -606,13 +611,17 @@ class ValidatorManagerImpl : public ValidatorManager {
|
|||
double max_mempool_num() const {
|
||||
return opts_->max_mempool_num();
|
||||
}
|
||||
void cleanup_last_validated_blocks(BlockId new_block);
|
||||
|
||||
private:
|
||||
|
||||
std::map<BlockSeqno, WaitList<td::actor::Actor, td::Unit>> shard_client_waiters_;
|
||||
|
||||
std::map<adnl::AdnlNodeIdShort, td::actor::ActorOwn<CollatorNode>> collator_nodes_;
|
||||
|
||||
std::set<ShardIdFull> shards_to_monitor_ = {ShardIdFull(masterchainId)};
|
||||
std::set<ShardIdFull> extra_active_shards_;
|
||||
std::map<ShardIdFull, BlockSeqno> last_validated_blocks_;
|
||||
};
|
||||
|
||||
} // namespace validator
|
||||
|
|
|
@ -59,10 +59,14 @@ void ValidatorGroup::validate_block_candidate(td::uint32 round_id, BlockCandidat
|
|||
approved_candidates_cache_round_ = round_id;
|
||||
approved_candidates_cache_.clear();
|
||||
}
|
||||
auto next_block_id = create_next_block_id(block.id.root_hash, block.id.file_hash);
|
||||
block.id = next_block_id;
|
||||
|
||||
CacheKey cache_key = block_to_cache_key(block);
|
||||
auto it = approved_candidates_cache_.find(cache_key);
|
||||
if (it != approved_candidates_cache_.end()) {
|
||||
promise.set_result(it->second);
|
||||
return;
|
||||
}
|
||||
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), round_id, block = block.clone(),
|
||||
|
@ -96,9 +100,7 @@ void ValidatorGroup::validate_block_candidate(td::uint32 round_id, BlockCandidat
|
|||
P.set_error(td::Status::Error(ErrorCode::notready, "validator group not started"));
|
||||
return;
|
||||
}
|
||||
auto next_block_id = create_next_block_id(block.id.root_hash, block.id.file_hash);
|
||||
VLOG(VALIDATOR_DEBUG) << "validating block candidate " << next_block_id;
|
||||
block.id = next_block_id;
|
||||
run_validate_query(shard_, min_masterchain_block_id_, prev_block_ids_, std::move(block), validator_set_, manager_,
|
||||
td::Timestamp::in(10.0), std::move(P), lite_mode_ ? ValidateMode::lite : 0);
|
||||
}
|
||||
|
@ -140,7 +142,10 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s
|
|||
|
||||
void ValidatorGroup::accept_block_query(BlockIdExt block_id, td::Ref<BlockData> block, std::vector<BlockIdExt> prev,
|
||||
td::Ref<BlockSignatureSet> sig_set, td::Ref<BlockSignatureSet> approve_sig_set,
|
||||
bool send_broadcast, td::Promise<td::Unit> promise) {
|
||||
bool send_broadcast, td::Promise<td::Unit> promise, bool is_retry) {
|
||||
if (!is_retry) {
|
||||
td::actor::send_closure(manager_, &ValidatorManager::validated_new_block, block_id);
|
||||
}
|
||||
auto P = td::PromiseCreator::lambda([=, SelfId = actor_id(this),
|
||||
promise = std::move(promise)](td::Result<td::Unit> R) mutable {
|
||||
if (R.is_error()) {
|
||||
|
@ -150,7 +155,7 @@ void ValidatorGroup::accept_block_query(BlockIdExt block_id, td::Ref<BlockData>
|
|||
}
|
||||
LOG_CHECK(R.error().code() == ErrorCode::timeout || R.error().code() == ErrorCode::notready) << R.move_as_error();
|
||||
td::actor::send_closure(SelfId, &ValidatorGroup::accept_block_query, block_id, std::move(block), std::move(prev),
|
||||
std::move(sig_set), std::move(approve_sig_set), false, std::move(promise));
|
||||
std::move(sig_set), std::move(approve_sig_set), false, std::move(promise), true);
|
||||
} else {
|
||||
promise.set_value(R.move_as_ok());
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ class ValidatorGroup : public td::actor::Actor {
|
|||
void skip_round(td::uint32 round);
|
||||
void accept_block_query(BlockIdExt block_id, td::Ref<BlockData> block, std::vector<BlockIdExt> prev,
|
||||
td::Ref<BlockSignatureSet> sigs, td::Ref<BlockSignatureSet> approve_sigs,
|
||||
bool send_broadcast, td::Promise<td::Unit> promise);
|
||||
bool send_broadcast, td::Promise<td::Unit> promise, bool is_retry = false);
|
||||
void get_approved_candidate(PublicKey source, RootHash root_hash, FileHash file_hash,
|
||||
FileHash collated_data_file_hash, td::Promise<BlockCandidate> promise);
|
||||
BlockId create_next_block_id_simple() const;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue