mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
Download persistent states when syncing new shards
This commit is contained in:
parent
be2169e523
commit
ea7a5776fe
18 changed files with 273 additions and 17 deletions
|
@ -514,6 +514,9 @@ db.state.shardClient block:tonNode.blockIdExt = db.state.ShardClient;
|
||||||
db.state.asyncSerializer block:tonNode.blockIdExt last:tonNode.blockIdExt last_ts:int = db.state.AsyncSerializer;
|
db.state.asyncSerializer block:tonNode.blockIdExt last:tonNode.blockIdExt last_ts:int = db.state.AsyncSerializer;
|
||||||
db.state.hardforks blocks:(vector tonNode.blockIdExt) = db.state.Hardforks;
|
db.state.hardforks blocks:(vector tonNode.blockIdExt) = db.state.Hardforks;
|
||||||
db.state.dbVersion version:int = db.state.DbVersion;
|
db.state.dbVersion version:int = db.state.DbVersion;
|
||||||
|
db.state.persistentStateDescriptionShards shard_blocks:(vector tonNode.blockIdExt) = db.state.PersistentStateDescriptionShards;
|
||||||
|
db.state.persistentStateDescriptionHeader masterchain_id:tonNode.blockIdExt start_time:int end_time:int = db.state.PersistentStateDescriptionHeader;
|
||||||
|
db.state.persistentStateDescriptionsList list:(vector db.state.persistentStateDescriptionHeader) = db.state.PersistentStateDescriptionsList;
|
||||||
|
|
||||||
db.state.key.destroyedSessions = db.state.Key;
|
db.state.key.destroyedSessions = db.state.Key;
|
||||||
db.state.key.initBlockId = db.state.Key;
|
db.state.key.initBlockId = db.state.Key;
|
||||||
|
@ -522,6 +525,8 @@ db.state.key.shardClient = db.state.Key;
|
||||||
db.state.key.asyncSerializer = db.state.Key;
|
db.state.key.asyncSerializer = db.state.Key;
|
||||||
db.state.key.hardforks = db.state.Key;
|
db.state.key.hardforks = db.state.Key;
|
||||||
db.state.key.dbVersion = db.state.Key;
|
db.state.key.dbVersion = db.state.Key;
|
||||||
|
db.state.key.persistentStateDescriptionShards masterchain_seqno:int = db.state.Key;
|
||||||
|
db.state.key.persistentStateDescriptionsList = db.state.Key;
|
||||||
|
|
||||||
db.lt.el.key workchain:int shard:long idx:int = db.lt.Key;
|
db.lt.el.key workchain:int shard:long idx:int = db.lt.Key;
|
||||||
db.lt.desc.key workchain:int shard:long = db.lt.Key;
|
db.lt.desc.key workchain:int shard:long = db.lt.Key;
|
||||||
|
|
Binary file not shown.
|
@ -484,4 +484,14 @@ struct ValidatorSessionConfig {
|
||||||
static const td::uint32 BLOCK_HASH_COVERS_DATA_FROM_VERSION = 2;
|
static const td::uint32 BLOCK_HASH_COVERS_DATA_FROM_VERSION = 2;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct PersistentStateDescription : public td::CntObject {
|
||||||
|
BlockIdExt masterchain_id;
|
||||||
|
std::vector<BlockIdExt> shard_blocks;
|
||||||
|
UnixTime start_time, end_time;
|
||||||
|
|
||||||
|
virtual CntObject* make_copy() const {
|
||||||
|
return new PersistentStateDescription(*this);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace ton
|
} // namespace ton
|
||||||
|
|
|
@ -497,6 +497,14 @@ void RootDb::set_async_mode(bool mode, td::Promise<td::Unit> promise) {
|
||||||
td::actor::send_closure(archive_db_, &ArchiveManager::set_async_mode, mode, std::move(promise));
|
td::actor::send_closure(archive_db_, &ArchiveManager::set_async_mode, mode, std::move(promise));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RootDb::add_persistent_state_description(td::Ref<PersistentStateDescription> desc, td::Promise<td::Unit> promise) {
|
||||||
|
td::actor::send_closure(state_db_, &StateDb::add_persistent_state_description, std::move(desc), std::move(promise));
|
||||||
|
}
|
||||||
|
|
||||||
|
void RootDb::get_persistent_state_descriptions(td::Promise<std::vector<td::Ref<PersistentStateDescription>>> promise) {
|
||||||
|
td::actor::send_closure(state_db_, &StateDb::get_persistent_state_descriptions, std::move(promise));
|
||||||
|
}
|
||||||
|
|
||||||
void RootDb::run_gc(UnixTime ts, UnixTime archive_ttl) {
|
void RootDb::run_gc(UnixTime ts, UnixTime archive_ttl) {
|
||||||
td::actor::send_closure(archive_db_, &ArchiveManager::run_gc, ts, archive_ttl);
|
td::actor::send_closure(archive_db_, &ArchiveManager::run_gc, ts, archive_ttl);
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,6 +132,9 @@ class RootDb : public Db {
|
||||||
td::Promise<td::BufferSlice> promise) override;
|
td::Promise<td::BufferSlice> promise) override;
|
||||||
void set_async_mode(bool mode, td::Promise<td::Unit> promise) override;
|
void set_async_mode(bool mode, td::Promise<td::Unit> promise) override;
|
||||||
|
|
||||||
|
void add_persistent_state_description(td::Ref<PersistentStateDescription> desc, td::Promise<td::Unit> promise) override;
|
||||||
|
void get_persistent_state_descriptions(td::Promise<std::vector<td::Ref<PersistentStateDescription>>> promise) override;
|
||||||
|
|
||||||
void run_gc(UnixTime ts, UnixTime archive_ttl) override;
|
void run_gc(UnixTime ts, UnixTime archive_ttl) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
|
@ -240,6 +240,101 @@ void StateDb::start_up() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void StateDb::add_persistent_state_description(td::Ref<PersistentStateDescription> desc,
|
||||||
|
td::Promise<td::Unit> promise) {
|
||||||
|
std::string value;
|
||||||
|
auto list_key = create_hash_tl_object<ton_api::db_state_key_persistentStateDescriptionsList>();
|
||||||
|
auto R = kv_->get(list_key.as_slice(), value);
|
||||||
|
R.ensure();
|
||||||
|
tl_object_ptr<ton_api::db_state_persistentStateDescriptionsList> list;
|
||||||
|
if (R.ok() == td::KeyValue::GetStatus::Ok) {
|
||||||
|
auto F = fetch_tl_object<ton_api::db_state_persistentStateDescriptionsList>(value, true);
|
||||||
|
F.ensure();
|
||||||
|
list = F.move_as_ok();
|
||||||
|
} else {
|
||||||
|
list = create_tl_object<ton_api::db_state_persistentStateDescriptionsList>(
|
||||||
|
std::vector<tl_object_ptr<ton_api::db_state_persistentStateDescriptionHeader>>());
|
||||||
|
}
|
||||||
|
for (const auto& obj : list->list_) {
|
||||||
|
if ((BlockSeqno)obj->masterchain_id_->seqno_ == desc->masterchain_id.seqno()) {
|
||||||
|
promise.set_error(td::Status::Error("duplicate masterchain seqno"));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto now = (UnixTime)td::Clocks::system();
|
||||||
|
size_t new_size = 0;
|
||||||
|
kv_->begin_write_batch().ensure();
|
||||||
|
for (auto& obj : list->list_) {
|
||||||
|
auto end_time = (UnixTime)obj->end_time_;
|
||||||
|
if (end_time <= now) {
|
||||||
|
auto key =
|
||||||
|
create_hash_tl_object<ton_api::db_state_key_persistentStateDescriptionShards>(obj->masterchain_id_->seqno_);
|
||||||
|
kv_->erase(key.as_slice()).ensure();
|
||||||
|
} else {
|
||||||
|
list->list_[new_size++] = std::move(obj);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
list->list_.resize(new_size);
|
||||||
|
|
||||||
|
std::vector<tl_object_ptr<ton_api::tonNode_blockIdExt>> shard_blocks;
|
||||||
|
for (const BlockIdExt& block_id : desc->shard_blocks) {
|
||||||
|
shard_blocks.push_back(create_tl_block_id(block_id));
|
||||||
|
}
|
||||||
|
auto key =
|
||||||
|
create_hash_tl_object<ton_api::db_state_key_persistentStateDescriptionShards>(desc->masterchain_id.seqno());
|
||||||
|
kv_->set(key.as_slice(),
|
||||||
|
create_serialize_tl_object<ton_api::db_state_persistentStateDescriptionShards>(std::move(shard_blocks))
|
||||||
|
.as_slice())
|
||||||
|
.ensure();
|
||||||
|
|
||||||
|
list->list_.push_back(create_tl_object<ton_api::db_state_persistentStateDescriptionHeader>(
|
||||||
|
create_tl_block_id(desc->masterchain_id), desc->start_time, desc->end_time));
|
||||||
|
kv_->set(list_key.as_slice(), serialize_tl_object(list, true).as_slice()).ensure();
|
||||||
|
|
||||||
|
kv_->commit_write_batch().ensure();
|
||||||
|
|
||||||
|
promise.set_result(td::Unit());
|
||||||
|
}
|
||||||
|
|
||||||
|
void StateDb::get_persistent_state_descriptions(td::Promise<std::vector<td::Ref<PersistentStateDescription>>> promise) {
|
||||||
|
std::string value;
|
||||||
|
auto R = kv_->get(create_hash_tl_object<ton_api::db_state_key_persistentStateDescriptionsList>().as_slice(), value);
|
||||||
|
R.ensure();
|
||||||
|
if (R.ok() == td::KeyValue::GetStatus::NotFound) {
|
||||||
|
promise.set_value({});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
auto F = fetch_tl_object<ton_api::db_state_persistentStateDescriptionsList>(value, true);
|
||||||
|
F.ensure();
|
||||||
|
std::vector<td::Ref<PersistentStateDescription>> result;
|
||||||
|
auto now = (UnixTime)td::Clocks::system();
|
||||||
|
for (const auto& obj : F.ok()->list_) {
|
||||||
|
auto end_time = (UnixTime)obj->end_time_;
|
||||||
|
if (end_time <= now) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
PersistentStateDescription desc;
|
||||||
|
desc.start_time = (UnixTime)obj->start_time_;
|
||||||
|
desc.end_time = end_time;
|
||||||
|
desc.masterchain_id = create_block_id(obj->masterchain_id_);
|
||||||
|
auto key =
|
||||||
|
create_hash_tl_object<ton_api::db_state_key_persistentStateDescriptionShards>(desc.masterchain_id.seqno());
|
||||||
|
auto R2 = kv_->get(key.as_slice(), value);
|
||||||
|
R2.ensure();
|
||||||
|
if (R2.ok() == td::KeyValue::GetStatus::NotFound) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
auto F2 = fetch_tl_object<ton_api::db_state_persistentStateDescriptionShards>(value, true);
|
||||||
|
F2.ensure();
|
||||||
|
for (const auto& block_id : F2.ok()->shard_blocks_) {
|
||||||
|
desc.shard_blocks.push_back(create_block_id(block_id));
|
||||||
|
}
|
||||||
|
result.push_back(td::Ref<PersistentStateDescription>(true, std::move(desc)));
|
||||||
|
}
|
||||||
|
promise.set_result(std::move(result));
|
||||||
|
}
|
||||||
|
|
||||||
void StateDb::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise<td::Unit> promise) {
|
void StateDb::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise<td::Unit> promise) {
|
||||||
{
|
{
|
||||||
auto key = create_hash_tl_object<ton_api::db_state_key_asyncSerializer>();
|
auto key = create_hash_tl_object<ton_api::db_state_key_asyncSerializer>();
|
||||||
|
|
|
@ -50,8 +50,8 @@ class StateDb : public td::actor::Actor {
|
||||||
void update_hardforks(std::vector<BlockIdExt> blocks, td::Promise<td::Unit> promise);
|
void update_hardforks(std::vector<BlockIdExt> blocks, td::Promise<td::Unit> promise);
|
||||||
void get_hardforks(td::Promise<std::vector<BlockIdExt>> promise);
|
void get_hardforks(td::Promise<std::vector<BlockIdExt>> promise);
|
||||||
|
|
||||||
void update_db_version(td::uint32 version, td::Promise<td::Unit> promise);
|
void add_persistent_state_description(td::Ref<PersistentStateDescription> desc, td::Promise<td::Unit> promise);
|
||||||
void get_db_version(td::Promise<td::uint32> promise);
|
void get_persistent_state_descriptions(td::Promise<std::vector<td::Ref<PersistentStateDescription>>> promise);
|
||||||
|
|
||||||
StateDb(td::actor::ActorId<RootDb> root_db, std::string path);
|
StateDb(td::actor::ActorId<RootDb> root_db, std::string path);
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
#include "ton/ton-io.hpp"
|
#include "ton/ton-io.hpp"
|
||||||
#include "common/checksum.h"
|
#include "common/checksum.h"
|
||||||
#include "common/delay.h"
|
#include "common/delay.h"
|
||||||
|
#include "validator/downloaders/download-state.hpp"
|
||||||
|
|
||||||
namespace ton {
|
namespace ton {
|
||||||
|
|
||||||
|
@ -106,6 +107,19 @@ void WaitBlockState::start() {
|
||||||
});
|
});
|
||||||
td::actor::send_closure(manager_, &ValidatorManager::send_get_zero_state_request, handle_->id(), priority_,
|
td::actor::send_closure(manager_, &ValidatorManager::send_get_zero_state_request, handle_->id(), priority_,
|
||||||
std::move(P));
|
std::move(P));
|
||||||
|
} else if (check_persistent_state_desc()) {
|
||||||
|
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Ref<ShardState>> R) {
|
||||||
|
if (R.is_error()) {
|
||||||
|
LOG(WARNING) << "failed to get persistent state: " << R.move_as_error();
|
||||||
|
td::actor::send_closure(SelfId, &WaitBlockState::start);
|
||||||
|
} else {
|
||||||
|
td::actor::send_closure(SelfId, &WaitBlockState::written_state, R.move_as_ok());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
BlockIdExt masterchain_id = persistent_state_desc_->masterchain_id;
|
||||||
|
td::actor::create_actor<DownloadShardState>("downloadstate", handle_->id(), masterchain_id, priority_, manager_,
|
||||||
|
timeout_, std::move(P))
|
||||||
|
.release();
|
||||||
} else if (!handle_->inited_prev() || (!handle_->inited_proof() && !handle_->inited_proof_link())) {
|
} else if (!handle_->inited_prev() || (!handle_->inited_proof() && !handle_->inited_proof_link())) {
|
||||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle = handle_](td::Result<td::BufferSlice> R) {
|
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle = handle_](td::Result<td::BufferSlice> R) {
|
||||||
if (R.is_error()) {
|
if (R.is_error()) {
|
||||||
|
|
|
@ -27,12 +27,14 @@ namespace validator {
|
||||||
class WaitBlockState : public td::actor::Actor {
|
class WaitBlockState : public td::actor::Actor {
|
||||||
public:
|
public:
|
||||||
WaitBlockState(BlockHandle handle, td::uint32 priority, td::actor::ActorId<ValidatorManager> manager,
|
WaitBlockState(BlockHandle handle, td::uint32 priority, td::actor::ActorId<ValidatorManager> manager,
|
||||||
td::Timestamp timeout, td::Promise<td::Ref<ShardState>> promise)
|
td::Timestamp timeout, td::Promise<td::Ref<ShardState>> promise,
|
||||||
|
td::Ref<PersistentStateDescription> persistent_state_desc = {})
|
||||||
: handle_(std::move(handle))
|
: handle_(std::move(handle))
|
||||||
, priority_(priority)
|
, priority_(priority)
|
||||||
, manager_(manager)
|
, manager_(manager)
|
||||||
, timeout_(timeout)
|
, timeout_(timeout)
|
||||||
, promise_(std::move(promise)) {
|
, promise_(std::move(promise))
|
||||||
|
, persistent_state_desc_(std::move(persistent_state_desc)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void abort_query(td::Status reason);
|
void abort_query(td::Status reason);
|
||||||
|
@ -73,6 +75,7 @@ class WaitBlockState : public td::actor::Actor {
|
||||||
td::actor::ActorId<ValidatorManager> manager_;
|
td::actor::ActorId<ValidatorManager> manager_;
|
||||||
td::Timestamp timeout_;
|
td::Timestamp timeout_;
|
||||||
td::Promise<td::Ref<ShardState>> promise_;
|
td::Promise<td::Ref<ShardState>> promise_;
|
||||||
|
td::Ref<PersistentStateDescription> persistent_state_desc_;
|
||||||
|
|
||||||
td::Ref<ShardState> prev_state_;
|
td::Ref<ShardState> prev_state_;
|
||||||
td::Ref<BlockData> block_;
|
td::Ref<BlockData> block_;
|
||||||
|
@ -81,6 +84,14 @@ class WaitBlockState : public td::actor::Actor {
|
||||||
td::Timestamp next_static_file_attempt_;
|
td::Timestamp next_static_file_attempt_;
|
||||||
|
|
||||||
//td::PerfWarningTimer perf_timer_{"waitstate", 1.0};
|
//td::PerfWarningTimer perf_timer_{"waitstate", 1.0};
|
||||||
|
|
||||||
|
bool check_persistent_state_desc() const {
|
||||||
|
if (persistent_state_desc_.is_null()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
auto now = (UnixTime)td::Clocks::system();
|
||||||
|
return persistent_state_desc_->end_time > now + 3600 && persistent_state_desc_->start_time < now - 6 * 3600;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace validator
|
} // namespace validator
|
||||||
|
|
|
@ -119,6 +119,11 @@ class Db : public td::actor::Actor {
|
||||||
td::Promise<td::BufferSlice> promise) = 0;
|
td::Promise<td::BufferSlice> promise) = 0;
|
||||||
virtual void set_async_mode(bool mode, td::Promise<td::Unit> promise) = 0;
|
virtual void set_async_mode(bool mode, td::Promise<td::Unit> promise) = 0;
|
||||||
|
|
||||||
|
virtual void add_persistent_state_description(td::Ref<PersistentStateDescription> desc,
|
||||||
|
td::Promise<td::Unit> promise) = 0;
|
||||||
|
virtual void get_persistent_state_descriptions(
|
||||||
|
td::Promise<std::vector<td::Ref<PersistentStateDescription>>> promise) = 0;
|
||||||
|
|
||||||
virtual void run_gc(UnixTime ts, UnixTime archive_ttl) = 0;
|
virtual void run_gc(UnixTime ts, UnixTime archive_ttl) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -170,6 +170,8 @@ class ValidatorManager : public ValidatorManagerInterface {
|
||||||
|
|
||||||
virtual void validated_new_block(BlockIdExt block_id) = 0;
|
virtual void validated_new_block(BlockIdExt block_id) = 0;
|
||||||
|
|
||||||
|
virtual void add_persistent_state_description(td::Ref<PersistentStateDescription> desc) = 0;
|
||||||
|
|
||||||
static bool is_persistent_state(UnixTime ts, UnixTime prev_ts) {
|
static bool is_persistent_state(UnixTime ts, UnixTime prev_ts) {
|
||||||
return ts / (1 << 17) != prev_ts / (1 << 17);
|
return ts / (1 << 17) != prev_ts / (1 << 17);
|
||||||
}
|
}
|
||||||
|
|
|
@ -377,6 +377,9 @@ class ValidatorManagerImpl : public ValidatorManager {
|
||||||
}
|
}
|
||||||
void validated_new_block(BlockIdExt block_id) override {
|
void validated_new_block(BlockIdExt block_id) override {
|
||||||
}
|
}
|
||||||
|
void add_persistent_state_description(td::Ref<PersistentStateDescription> desc) override {
|
||||||
|
}
|
||||||
|
|
||||||
void get_validator_sessions_info(
|
void get_validator_sessions_info(
|
||||||
td::Promise<tl_object_ptr<ton_api::engine_validator_validatorSessionsInfo>> promise) override {
|
td::Promise<tl_object_ptr<ton_api::engine_validator_validatorSessionsInfo>> promise) override {
|
||||||
UNREACHABLE();
|
UNREACHABLE();
|
||||||
|
|
|
@ -437,6 +437,8 @@ class ValidatorManagerImpl : public ValidatorManager {
|
||||||
}
|
}
|
||||||
void validated_new_block(BlockIdExt block_id) override {
|
void validated_new_block(BlockIdExt block_id) override {
|
||||||
}
|
}
|
||||||
|
void add_persistent_state_description(td::Ref<PersistentStateDescription> desc) override {
|
||||||
|
}
|
||||||
void get_validator_sessions_info(
|
void get_validator_sessions_info(
|
||||||
td::Promise<tl_object_ptr<ton_api::engine_validator_validatorSessionsInfo>> promise) override {
|
td::Promise<tl_object_ptr<ton_api::engine_validator_validatorSessionsInfo>> promise) override {
|
||||||
UNREACHABLE();
|
UNREACHABLE();
|
||||||
|
|
|
@ -574,9 +574,10 @@ void ValidatorManagerImpl::wait_block_state(BlockHandle handle, td::uint32 prior
|
||||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Ref<ShardState>> R) {
|
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Ref<ShardState>> R) {
|
||||||
td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_state, handle, std::move(R));
|
td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_state, handle, std::move(R));
|
||||||
});
|
});
|
||||||
auto id = td::actor::create_actor<WaitBlockState>("waitstate", handle, priority, actor_id(this),
|
auto id =
|
||||||
td::Timestamp::in(10.0), std::move(P))
|
td::actor::create_actor<WaitBlockState>("waitstate", handle, priority, actor_id(this), td::Timestamp::in(10.0),
|
||||||
.release();
|
std::move(P), get_block_persistent_state(handle->id()))
|
||||||
|
.release();
|
||||||
wait_state_[handle->id()].actor_ = id;
|
wait_state_[handle->id()].actor_ = id;
|
||||||
it = wait_state_.find(handle->id());
|
it = wait_state_.find(handle->id());
|
||||||
}
|
}
|
||||||
|
@ -1013,7 +1014,7 @@ void ValidatorManagerImpl::finished_wait_state(BlockHandle handle, td::Result<td
|
||||||
td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_state, handle, std::move(R));
|
td::actor::send_closure(SelfId, &ValidatorManagerImpl::finished_wait_state, handle, std::move(R));
|
||||||
});
|
});
|
||||||
auto id = td::actor::create_actor<WaitBlockState>("waitstate", handle, X.second, actor_id(this), X.first,
|
auto id = td::actor::create_actor<WaitBlockState>("waitstate", handle, X.second, actor_id(this), X.first,
|
||||||
std::move(P))
|
std::move(P), get_block_persistent_state(handle->id()))
|
||||||
.release();
|
.release();
|
||||||
it->second.actor_ = id;
|
it->second.actor_ = id;
|
||||||
return;
|
return;
|
||||||
|
@ -1568,8 +1569,17 @@ void ValidatorManagerImpl::started(ValidatorManagerInitResult R) {
|
||||||
td::actor::send_closure(SelfId, &ValidatorManagerImpl::read_gc_list, R.move_as_ok());
|
td::actor::send_closure(SelfId, &ValidatorManagerImpl::read_gc_list, R.move_as_ok());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
td::actor::send_closure(db_, &Db::get_destroyed_validator_sessions, std::move(P));
|
td::actor::send_closure(db_, &Db::get_destroyed_validator_sessions, std::move(P));
|
||||||
|
|
||||||
|
auto Q = td::PromiseCreator::lambda(
|
||||||
|
[SelfId = actor_id(this)](td::Result<std::vector<td::Ref<PersistentStateDescription>>> R) {
|
||||||
|
if (R.is_error()) {
|
||||||
|
LOG(FATAL) << "db error: " << R.move_as_error();
|
||||||
|
} else {
|
||||||
|
td::actor::send_closure(SelfId, &ValidatorManagerImpl::got_persistent_state_descriptions, R.move_as_ok());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
td::actor::send_closure(db_, &Db::get_persistent_state_descriptions, std::move(Q));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ValidatorManagerImpl::read_gc_list(std::vector<ValidatorSessionId> list) {
|
void ValidatorManagerImpl::read_gc_list(std::vector<ValidatorSessionId> list) {
|
||||||
|
@ -2706,6 +2716,53 @@ void ValidatorManagerImpl::update_options(td::Ref<ValidatorManagerOptions> opts)
|
||||||
opts_ = std::move(opts);
|
opts_ = std::move(opts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ValidatorManagerImpl::add_persistent_state_description(td::Ref<PersistentStateDescription> desc) {
|
||||||
|
auto now = (UnixTime)td::Clocks::system();
|
||||||
|
if (desc->end_time <= now) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
td::actor::send_closure(db_, &Db::add_persistent_state_description, desc, [](td::Result<td::Unit>) {});
|
||||||
|
auto it = persistent_state_descriptions_.begin();
|
||||||
|
while (it != persistent_state_descriptions_.end()) {
|
||||||
|
const auto &prev_desc = it->second;
|
||||||
|
if (prev_desc->end_time <= now) {
|
||||||
|
for (const BlockIdExt &block_id : prev_desc->shard_blocks) {
|
||||||
|
persistent_state_blocks_.erase(block_id);
|
||||||
|
}
|
||||||
|
it = persistent_state_descriptions_.erase(it);
|
||||||
|
} else {
|
||||||
|
++it;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
add_persistent_state_description_impl(std::move(desc));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ValidatorManagerImpl::add_persistent_state_description_impl(td::Ref<PersistentStateDescription> desc) {
|
||||||
|
if (!persistent_state_descriptions_.emplace(desc->masterchain_id.seqno(), desc).second) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG(DEBUG) << "Add persistent state description for mc block " << desc->masterchain_id.to_str()
|
||||||
|
<< " start_time=" << desc->start_time << " end_time=" << desc->end_time;
|
||||||
|
for (const BlockIdExt &block_id : desc->shard_blocks) {
|
||||||
|
persistent_state_blocks_[block_id] = desc;
|
||||||
|
LOG(DEBUG) << "Persistent state description: shard block " << block_id.to_str();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ValidatorManagerImpl::got_persistent_state_descriptions(std::vector<td::Ref<PersistentStateDescription>> descs) {
|
||||||
|
for (auto &desc : descs) {
|
||||||
|
add_persistent_state_description_impl(std::move(desc));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
td::Ref<PersistentStateDescription> ValidatorManagerImpl::get_block_persistent_state(BlockIdExt block_id) {
|
||||||
|
auto it = persistent_state_blocks_.find(block_id);
|
||||||
|
if (it == persistent_state_blocks_.end()) {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
return it->second;
|
||||||
|
}
|
||||||
|
|
||||||
td::actor::ActorOwn<ValidatorManagerInterface> ValidatorManagerFactory::create(
|
td::actor::ActorOwn<ValidatorManagerInterface> ValidatorManagerFactory::create(
|
||||||
td::Ref<ValidatorManagerOptions> opts, std::string db_root, td::actor::ActorId<keyring::Keyring> keyring,
|
td::Ref<ValidatorManagerOptions> opts, std::string db_root, td::actor::ActorId<keyring::Keyring> keyring,
|
||||||
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp,
|
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp,
|
||||||
|
|
|
@ -555,6 +555,8 @@ class ValidatorManagerImpl : public ValidatorManager {
|
||||||
last = std::max(last, block_id.seqno());
|
last = std::max(last, block_id.seqno());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void add_persistent_state_description(td::Ref<PersistentStateDescription> desc) override;
|
||||||
|
|
||||||
void add_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) override;
|
void add_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) override;
|
||||||
void update_options(td::Ref<ValidatorManagerOptions> opts) override;
|
void update_options(td::Ref<ValidatorManagerOptions> opts) override;
|
||||||
|
|
||||||
|
@ -621,8 +623,11 @@ class ValidatorManagerImpl : public ValidatorManager {
|
||||||
}
|
}
|
||||||
void cleanup_last_validated_blocks(BlockId new_block);
|
void cleanup_last_validated_blocks(BlockId new_block);
|
||||||
|
|
||||||
private:
|
void got_persistent_state_descriptions(std::vector<td::Ref<PersistentStateDescription>> descs);
|
||||||
|
void add_persistent_state_description_impl(td::Ref<PersistentStateDescription> desc);
|
||||||
|
td::Ref<PersistentStateDescription> get_block_persistent_state(BlockIdExt block_id);
|
||||||
|
|
||||||
|
private:
|
||||||
std::map<BlockSeqno, WaitList<td::actor::Actor, td::Unit>> shard_client_waiters_;
|
std::map<BlockSeqno, WaitList<td::actor::Actor, td::Unit>> shard_client_waiters_;
|
||||||
|
|
||||||
std::map<adnl::AdnlNodeIdShort, td::actor::ActorOwn<CollatorNode>> collator_nodes_;
|
std::map<adnl::AdnlNodeIdShort, td::actor::ActorOwn<CollatorNode>> collator_nodes_;
|
||||||
|
@ -630,6 +635,9 @@ class ValidatorManagerImpl : public ValidatorManager {
|
||||||
std::set<ShardIdFull> shards_to_monitor_ = {ShardIdFull(masterchainId)};
|
std::set<ShardIdFull> shards_to_monitor_ = {ShardIdFull(masterchainId)};
|
||||||
std::set<ShardIdFull> extra_active_shards_;
|
std::set<ShardIdFull> extra_active_shards_;
|
||||||
std::map<ShardIdFull, BlockSeqno> last_validated_blocks_;
|
std::map<ShardIdFull, BlockSeqno> last_validated_blocks_;
|
||||||
|
|
||||||
|
std::map<BlockSeqno, td::Ref<PersistentStateDescription>> persistent_state_descriptions_;
|
||||||
|
std::map<BlockIdExt, td::Ref<PersistentStateDescription>> persistent_state_blocks_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace validator
|
} // namespace validator
|
||||||
|
|
|
@ -202,7 +202,7 @@ void ShardClient::apply_all_shards() {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
td::actor::send_closure(manager_, &ValidatorManager::wait_block_state_short, shard->top_block_id(),
|
td::actor::send_closure(manager_, &ValidatorManager::wait_block_state_short, shard->top_block_id(),
|
||||||
shard_client_priority(), td::Timestamp::in(600), std::move(Q));
|
shard_client_priority(), td::Timestamp::in(1500), std::move(Q));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -255,11 +255,9 @@ void ShardClient::build_shard_overlays() {
|
||||||
auto shard = info->shard();
|
auto shard = info->shard();
|
||||||
workchains.insert(shard.workchain);
|
workchains.insert(shard.workchain);
|
||||||
bool will_split = shard.pfx_len() < max_shard_pfx_len &&
|
bool will_split = shard.pfx_len() < max_shard_pfx_len &&
|
||||||
((info->fsm_state() == McShardHash::FsmState::fsm_split && info->fsm_utime() < cur_time + 60) ||
|
(info->fsm_state() == McShardHash::FsmState::fsm_split || info->before_split());
|
||||||
info->before_split());
|
bool will_merge =
|
||||||
bool will_merge = shard.pfx_len() > 0 &&
|
shard.pfx_len() > 0 && (info->fsm_state() == McShardHash::FsmState::fsm_merge || info->before_merge());
|
||||||
((info->fsm_state() == McShardHash::FsmState::fsm_merge && info->fsm_utime() < cur_time + 60) ||
|
|
||||||
info->before_merge());
|
|
||||||
if (opts_->need_monitor(shard) || (will_merge && opts_->need_monitor(shard_parent(shard)))) {
|
if (opts_->need_monitor(shard) || (will_merge && opts_->need_monitor(shard_parent(shard)))) {
|
||||||
new_shards_to_monitor.insert(shard);
|
new_shards_to_monitor.insert(shard);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
*/
|
*/
|
||||||
#include "state-serializer.hpp"
|
#include "state-serializer.hpp"
|
||||||
#include "td/utils/Random.h"
|
#include "td/utils/Random.h"
|
||||||
#include "adnl/utils.hpp"
|
|
||||||
#include "ton/ton-io.hpp"
|
#include "ton/ton-io.hpp"
|
||||||
#include "common/delay.h"
|
#include "common/delay.h"
|
||||||
|
|
||||||
|
@ -122,6 +121,21 @@ void AsyncStateSerializer::next_iteration() {
|
||||||
CHECK(masterchain_handle_->id() == last_block_id_);
|
CHECK(masterchain_handle_->id() == last_block_id_);
|
||||||
if (attempt_ < max_attempt() && last_key_block_id_.id.seqno < last_block_id_.id.seqno &&
|
if (attempt_ < max_attempt() && last_key_block_id_.id.seqno < last_block_id_.id.seqno &&
|
||||||
need_serialize(masterchain_handle_)) {
|
need_serialize(masterchain_handle_)) {
|
||||||
|
if (!stored_persistent_state_description_) {
|
||||||
|
LOG(INFO) << "storing persistent state description for " << masterchain_handle_->id().id;
|
||||||
|
running_ = true;
|
||||||
|
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Ref<ShardState>> R) {
|
||||||
|
if (R.is_error()) {
|
||||||
|
td::actor::send_closure(SelfId, &AsyncStateSerializer::fail_handler,
|
||||||
|
R.move_as_error_prefix("failed to get masterchain state: "));
|
||||||
|
} else {
|
||||||
|
td::actor::send_closure(SelfId, &AsyncStateSerializer::store_persistent_state_description,
|
||||||
|
td::Ref<MasterchainState>(R.move_as_ok()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
td::actor::send_closure(manager_, &ValidatorManager::get_shard_state_from_db, masterchain_handle_, std::move(P));
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (!cell_db_reader_) {
|
if (!cell_db_reader_) {
|
||||||
running_ = true;
|
running_ = true;
|
||||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::shared_ptr<vm::CellDbReader>> R) {
|
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::shared_ptr<vm::CellDbReader>> R) {
|
||||||
|
@ -175,6 +189,7 @@ void AsyncStateSerializer::next_iteration() {
|
||||||
if (masterchain_handle_->inited_next_left()) {
|
if (masterchain_handle_->inited_next_left()) {
|
||||||
last_block_id_ = masterchain_handle_->one_next(true);
|
last_block_id_ = masterchain_handle_->one_next(true);
|
||||||
have_masterchain_state_ = false;
|
have_masterchain_state_ = false;
|
||||||
|
stored_persistent_state_description_ = false;
|
||||||
masterchain_handle_ = nullptr;
|
masterchain_handle_ = nullptr;
|
||||||
saved_to_db_ = false;
|
saved_to_db_ = false;
|
||||||
shards_.clear();
|
shards_.clear();
|
||||||
|
@ -189,6 +204,24 @@ void AsyncStateSerializer::got_top_masterchain_handle(BlockIdExt block_id) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void AsyncStateSerializer::store_persistent_state_description(td::Ref<MasterchainState> state) {
|
||||||
|
stored_persistent_state_description_ = true;
|
||||||
|
attempt_ = 0;
|
||||||
|
running_ = false;
|
||||||
|
|
||||||
|
PersistentStateDescription desc;
|
||||||
|
desc.masterchain_id = state->get_block_id();
|
||||||
|
desc.start_time = state->get_unix_time();
|
||||||
|
desc.end_time = ValidatorManager::persistent_state_ttl(desc.start_time);
|
||||||
|
for (const auto &v : state->get_shards()) {
|
||||||
|
desc.shard_blocks.push_back(v->top_block_id());
|
||||||
|
}
|
||||||
|
td::actor::send_closure(manager_, &ValidatorManager::add_persistent_state_description,
|
||||||
|
td::Ref<PersistentStateDescription>(true, std::move(desc)));
|
||||||
|
|
||||||
|
next_iteration();
|
||||||
|
}
|
||||||
|
|
||||||
void AsyncStateSerializer::got_cell_db_reader(std::shared_ptr<vm::CellDbReader> cell_db_reader) {
|
void AsyncStateSerializer::got_cell_db_reader(std::shared_ptr<vm::CellDbReader> cell_db_reader) {
|
||||||
cell_db_reader_ = std::move(cell_db_reader);
|
cell_db_reader_ = std::move(cell_db_reader);
|
||||||
running_ = false;
|
running_ = false;
|
||||||
|
|
|
@ -44,6 +44,7 @@ class AsyncStateSerializer : public td::actor::Actor {
|
||||||
|
|
||||||
std::shared_ptr<vm::CellDbReader> cell_db_reader_ = nullptr;
|
std::shared_ptr<vm::CellDbReader> cell_db_reader_ = nullptr;
|
||||||
BlockHandle masterchain_handle_;
|
BlockHandle masterchain_handle_;
|
||||||
|
bool stored_persistent_state_description_ = false;
|
||||||
bool have_masterchain_state_ = false;
|
bool have_masterchain_state_ = false;
|
||||||
|
|
||||||
std::vector<BlockIdExt> shards_;
|
std::vector<BlockIdExt> shards_;
|
||||||
|
@ -71,6 +72,7 @@ class AsyncStateSerializer : public td::actor::Actor {
|
||||||
|
|
||||||
void next_iteration();
|
void next_iteration();
|
||||||
void got_top_masterchain_handle(BlockIdExt block_id);
|
void got_top_masterchain_handle(BlockIdExt block_id);
|
||||||
|
void store_persistent_state_description(td::Ref<MasterchainState> state);
|
||||||
void got_cell_db_reader(std::shared_ptr<vm::CellDbReader> cell_db_reader);
|
void got_cell_db_reader(std::shared_ptr<vm::CellDbReader> cell_db_reader);
|
||||||
void got_masterchain_handle(BlockHandle handle_);
|
void got_masterchain_handle(BlockHandle handle_);
|
||||||
void got_masterchain_state(td::Ref<MasterchainState> state);
|
void got_masterchain_state(td::Ref<MasterchainState> state);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue