1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

speed up synchronization

- download old files in chunks
- updated docs
- fixed elector/config smartcontracts
This commit is contained in:
ton 2019-11-18 22:15:14 +04:00
parent 0dae2c157b
commit 7f3a22a217
21 changed files with 365 additions and 191 deletions

View file

@ -250,8 +250,7 @@ void ArchiveManager::get_file_short_cont(FileReference ref_id, PackageId idx, td
void ArchiveManager::get_file(BlockHandle handle, FileReference ref_id, td::Promise<td::BufferSlice> promise) {
if (handle->moved_to_archive()) {
auto f = get_file_desc(handle->id().shard_full(), PackageId{handle->masterchain_ref_block(), false, false}, 0, 0, 0,
false);
auto f = get_file_desc(handle->id().shard_full(), get_package_id(handle->masterchain_ref_block()), 0, 0, 0, false);
if (f) {
td::actor::send_closure(f->file_actor_id(), &ArchiveSlice::get_file, std::move(ref_id), std::move(promise));
return;

View file

@ -193,6 +193,9 @@ void DownloadShardState::written_shard_state(td::Ref<ShardState> state) {
handle_->set_logical_time(state_->get_logical_time());
handle_->set_applied();
handle_->set_split(state_->before_split());
if (!block_id_.is_masterchain()) {
handle_->set_masterchain_ref_block(masterchain_block_id_.seqno());
}
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle = handle_](td::Result<td::Unit> R) {
CHECK(handle->handle_moved_to_archive());

View file

@ -99,7 +99,6 @@ void FullNodeImpl::initial_read_complete(BlockHandle top_handle) {
}
void FullNodeImpl::add_shard(ShardIdFull shard) {
LOG(WARNING) << "add shard " << shard;
while (true) {
if (shards_.count(shard) == 0) {
shards_.emplace(shard, FullNodeShard::create(shard, local_id_, adnl_id_, zero_state_file_hash_, keyring_, adnl_,
@ -239,6 +238,7 @@ void FullNodeImpl::download_archive(BlockSeqno masterchain_seqno, std::string tm
}
td::actor::ActorId<FullNodeShard> FullNodeImpl::get_shard(ShardIdFull shard) {
add_shard(ShardIdFull{shard.workchain, shardIdAll});
while (shards_.count(shard) == 0) {
if (shard.shard == shardIdAll) {
return td::actor::ActorId<FullNodeShard>{};

View file

@ -5,6 +5,8 @@
#include "td/actor/MultiPromise.h"
#include "common/checksum.h"
#include "td/utils/port/path.h"
#include "ton/ton-io.hpp"
#include "downloaders/download-state.hpp"
namespace ton {
@ -94,79 +96,85 @@ void ArchiveImporter::check_masterchain_block(BlockSeqno seqno) {
checked_all_masterchain_blocks(seqno - 1);
return;
}
if (seqno < state_->get_block_id().seqno()) {
if (!state_->check_old_mc_block_id(it->second)) {
abort_query(td::Status::Error(ErrorCode::protoviolation, "bad old masterchain block id"));
return;
}
check_masterchain_block(seqno + 1);
} else if (seqno == state_->get_block_id().seqno()) {
if (state_->get_block_id() != it->second) {
abort_query(td::Status::Error(ErrorCode::protoviolation, "bad old masterchain block id"));
return;
}
check_masterchain_block(seqno + 1);
} else {
if (seqno != state_->get_block_id().seqno() + 1) {
abort_query(td::Status::Error(ErrorCode::protoviolation, "hole in masterchain seqno"));
return;
}
auto it2 = blocks_.find(it->second);
CHECK(it2 != blocks_.end());
auto R1 = package_->read(it2->second[0]);
if (R1.is_error()) {
abort_query(R1.move_as_error());
return;
}
auto proofR = create_proof(it->second, std::move(R1.move_as_ok().second));
if (proofR.is_error()) {
abort_query(proofR.move_as_error());
return;
}
auto R2 = package_->read(it2->second[1]);
if (R2.is_error()) {
abort_query(R2.move_as_error());
return;
}
if (sha256_bits256(R2.ok().second.as_slice()) != it->second.file_hash) {
abort_query(td::Status::Error(ErrorCode::protoviolation, "bad block file hash"));
return;
}
auto dataR = create_block(it->second, std::move(R2.move_as_ok().second));
if (dataR.is_error()) {
abort_query(dataR.move_as_error());
return;
}
auto proof = proofR.move_as_ok();
auto data = dataR.move_as_ok();
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), id = state_->get_block_id(),
data](td::Result<BlockHandle> R) mutable {
if (R.is_error()) {
td::actor::send_closure(SelfId, &ArchiveImporter::abort_query, R.move_as_error());
while (seqno <= state_->get_block_id().seqno()) {
if (seqno < state_->get_block_id().seqno()) {
if (!state_->check_old_mc_block_id(it->second)) {
abort_query(td::Status::Error(ErrorCode::protoviolation, "bad old masterchain block id"));
return;
}
auto handle = R.move_as_ok();
CHECK(!handle->merge_before());
if (handle->one_prev(true) != id) {
td::actor::send_closure(SelfId, &ArchiveImporter::abort_query,
td::Status::Error(ErrorCode::protoviolation, "prev block mismatch"));
} else {
if (state_->get_block_id() != it->second) {
abort_query(td::Status::Error(ErrorCode::protoviolation, "bad old masterchain block id"));
return;
}
td::actor::send_closure(SelfId, &ArchiveImporter::checked_masterchain_proof, std::move(handle), std::move(data));
});
run_check_proof_query(it->second, std::move(proof), manager_, td::Timestamp::in(2.0), std::move(P), state_,
opts_->is_hardfork(it->second));
}
seqno++;
it = masterchain_blocks_.find(seqno);
if (it == masterchain_blocks_.end()) {
checked_all_masterchain_blocks(seqno - 1);
return;
}
}
if (seqno != state_->get_block_id().seqno() + 1) {
abort_query(td::Status::Error(ErrorCode::protoviolation, "hole in masterchain seqno"));
return;
}
auto it2 = blocks_.find(it->second);
CHECK(it2 != blocks_.end());
auto R1 = package_->read(it2->second[0]);
if (R1.is_error()) {
abort_query(R1.move_as_error());
return;
}
auto proofR = create_proof(it->second, std::move(R1.move_as_ok().second));
if (proofR.is_error()) {
abort_query(proofR.move_as_error());
return;
}
auto R2 = package_->read(it2->second[1]);
if (R2.is_error()) {
abort_query(R2.move_as_error());
return;
}
if (sha256_bits256(R2.ok().second.as_slice()) != it->second.file_hash) {
abort_query(td::Status::Error(ErrorCode::protoviolation, "bad block file hash"));
return;
}
auto dataR = create_block(it->second, std::move(R2.move_as_ok().second));
if (dataR.is_error()) {
abort_query(dataR.move_as_error());
return;
}
auto proof = proofR.move_as_ok();
auto data = dataR.move_as_ok();
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), id = state_->get_block_id(),
data](td::Result<BlockHandle> R) mutable {
if (R.is_error()) {
td::actor::send_closure(SelfId, &ArchiveImporter::abort_query, R.move_as_error());
return;
}
auto handle = R.move_as_ok();
CHECK(!handle->merge_before());
if (handle->one_prev(true) != id) {
td::actor::send_closure(SelfId, &ArchiveImporter::abort_query,
td::Status::Error(ErrorCode::protoviolation, "prev block mismatch"));
return;
}
td::actor::send_closure(SelfId, &ArchiveImporter::checked_masterchain_proof, std::move(handle), std::move(data));
});
run_check_proof_query(it->second, std::move(proof), manager_, td::Timestamp::in(2.0), std::move(P), state_,
opts_->is_hardfork(it->second));
}
void ArchiveImporter::checked_masterchain_proof(BlockHandle handle, td::Ref<BlockData> data) {
CHECK(data.not_null());
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle](td::Result<td::Unit> R) {
R.ensure();
td::actor::send_closure(SelfId, &ArchiveImporter::applied_masterchain_block, std::move(handle));
@ -189,17 +197,16 @@ void ArchiveImporter::got_new_materchain_state(td::Ref<MasterchainState> state)
}
void ArchiveImporter::checked_all_masterchain_blocks(BlockSeqno seqno) {
max_shard_client_seqno_ = seqno;
check_next_shard_client_seqno(shard_client_seqno_ + 1);
}
void ArchiveImporter::check_next_shard_client_seqno(BlockSeqno seqno) {
if (seqno > max_shard_client_seqno_) {
if (seqno > state_->get_seqno()) {
finish_query();
return;
}
if (seqno == max_shard_client_seqno_) {
if (seqno == state_->get_seqno()) {
got_masterchain_state(state_);
} else {
BlockIdExt b;
@ -217,14 +224,13 @@ void ArchiveImporter::check_next_shard_client_seqno(BlockSeqno seqno) {
void ArchiveImporter::got_masterchain_state(td::Ref<MasterchainState> state) {
auto s = state->get_shards();
auto P = td::PromiseCreator::lambda(
[SelfId = actor_id(this), seqno = state->get_block_id().seqno()](td::Result<td::Unit> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &ArchiveImporter::abort_query, R.move_as_error());
} else {
td::actor::send_closure(SelfId, &ArchiveImporter::check_next_shard_client_seqno, seqno + 1);
}
});
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), seqno = state->get_seqno()](td::Result<td::Unit> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &ArchiveImporter::abort_query, R.move_as_error());
} else {
td::actor::send_closure(SelfId, &ArchiveImporter::checked_shard_client_seqno, seqno);
}
});
td::MultiPromise mp;
auto ig = mp.init_guard();
@ -235,6 +241,12 @@ void ArchiveImporter::got_masterchain_state(td::Ref<MasterchainState> state) {
}
}
void ArchiveImporter::checked_shard_client_seqno(BlockSeqno seqno) {
CHECK(shard_client_seqno_ + 1 == seqno);
shard_client_seqno_++;
check_next_shard_client_seqno(seqno + 1);
}
void ArchiveImporter::apply_shard_block(BlockIdExt block_id, BlockIdExt masterchain_block_id,
td::Promise<td::Unit> promise) {
auto P = td::PromiseCreator::lambda(
@ -253,9 +265,18 @@ void ArchiveImporter::apply_shard_block_cont1(BlockHandle handle, BlockIdExt mas
return;
}
if (handle->id().seqno() == 0) {
auto P = td::PromiseCreator::lambda(
[promise = std::move(promise)](td::Result<td::Ref<ShardState>> R) mutable { promise.set_value(td::Unit()); });
td::actor::create_actor<DownloadShardState>("downloadstate", handle->id(), masterchain_block_id, 2, manager_,
td::Timestamp::in(3600), std::move(P))
.release();
return;
}
auto it = blocks_.find(handle->id());
if (it == blocks_.end()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "no proof for shard block"));
promise.set_error(td::Status::Error(ErrorCode::notready, PSTRING() << "no proof for shard block " << handle->id()));
return;
}
TRY_RESULT_PROMISE(promise, data, package_->read(it->second[0]));
@ -280,21 +301,21 @@ void ArchiveImporter::apply_shard_block_cont2(BlockHandle handle, BlockIdExt mas
}
CHECK(handle->id().seqno() > 0);
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle, masterchain_block_id,
promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
promise.set_error(R.move_as_error());
} else {
td::actor::send_closure(SelfId, &ArchiveImporter::apply_shard_block_cont3, std::move(handle),
masterchain_block_id, std::move(promise));
}
});
if (!handle->merge_before() && handle->one_prev(true).shard_full() == handle->id().shard_full()) {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle, masterchain_block_id,
promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
promise.set_error(R.move_as_error());
} else {
td::actor::send_closure(SelfId, &ArchiveImporter::apply_shard_block_cont3, std::move(handle),
masterchain_block_id, std::move(promise));
}
});
apply_shard_block(handle->one_prev(true), masterchain_block_id, std::move(P));
} else {
td::MultiPromise mp;
auto ig = mp.init_guard();
ig.add_promise(std::move(promise));
ig.add_promise(std::move(P));
check_shard_block_applied(handle->one_prev(true), ig.get_promise());
if (handle->merge_before()) {
check_shard_block_applied(handle->one_prev(false), ig.get_promise());
@ -335,15 +356,12 @@ void ArchiveImporter::check_shard_block_applied(BlockIdExt block_id, td::Promise
}
void ArchiveImporter::abort_query(td::Status error) {
if (promise_) {
promise_.set_error(std::move(error));
td::unlink(path_).ensure();
}
stop();
LOG(INFO) << error;
finish_query();
}
void ArchiveImporter::finish_query() {
if (promise_) {
promise_.set_value(std::vector<BlockSeqno>(state_->get_block_id().seqno(), max_shard_client_seqno_));
promise_.set_value(std::vector<BlockSeqno>{state_->get_seqno(), shard_client_seqno_});
td::unlink(path_).ensure();
}
stop();

View file

@ -25,6 +25,7 @@ class ArchiveImporter : public td::actor::Actor {
void checked_all_masterchain_blocks(BlockSeqno seqno);
void check_next_shard_client_seqno(BlockSeqno seqno);
void checked_shard_client_seqno(BlockSeqno seqno);
void got_masterchain_state(td::Ref<MasterchainState> state);
void apply_shard_block(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise);
void apply_shard_block_cont1(BlockHandle handle, BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise);
@ -36,7 +37,6 @@ class ArchiveImporter : public td::actor::Actor {
std::string path_;
td::Ref<MasterchainState> state_;
BlockSeqno shard_client_seqno_;
BlockSeqno max_shard_client_seqno_;
td::Ref<ValidatorManagerOptions> opts_;

View file

@ -414,7 +414,8 @@ void ValidatorManagerMasterchainStarter::got_key_block_handle(BlockHandle handle
void ValidatorManagerMasterchainStarter::got_shard_block_id(BlockIdExt block_id) {
client_block_id_ = block_id;
finish();
start_shard_client();
return;
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::vector<BlockIdExt>> R) {
R.ensure();
@ -436,7 +437,7 @@ void ValidatorManagerMasterchainStarter::got_hardforks(std::vector<BlockIdExt> v
return;
}
}
finish();
start_shard_client();
return;
}
if (h.size() > vec.size() + 1) {
@ -565,13 +566,20 @@ void ValidatorManagerMasterchainStarter::truncated() {
void ValidatorManagerMasterchainStarter::written_next() {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
R.ensure();
td::actor::send_closure(SelfId, &ValidatorManagerMasterchainStarter::finish);
td::actor::send_closure(SelfId, &ValidatorManagerMasterchainStarter::start_shard_client);
});
td::actor::send_closure(db_, &Db::update_hardforks, opts_->get_hardforks(), std::move(P));
}
void ValidatorManagerMasterchainStarter::start_shard_client() {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
R.ensure();
td::actor::send_closure(SelfId, &ValidatorManagerMasterchainStarter::finish);
});
client_ = td::actor::create_actor<ShardClient>("shardclient", opts_, manager_, std::move(P));
}
void ValidatorManagerMasterchainStarter::finish() {
client_ = td::actor::create_actor<ShardClient>("shardclient", opts_, manager_);
promise_.set_value(
ValidatorManagerInitResult{handle_, state_, std::move(client_), gc_handle_, gc_state_, last_key_block_handle_});
stop();

View file

@ -105,6 +105,7 @@ class ValidatorManagerMasterchainStarter : public td::actor::Actor {
void got_prev_key_block_handle(BlockHandle handle);
void truncated();
void written_next();
void start_shard_client();
void finish();
private:

View file

@ -1414,7 +1414,6 @@ void ValidatorManagerImpl::started(ValidatorManagerInitResult R) {
gc_masterchain_state_ = std::move(R.gc_state);
shard_client_ = std::move(R.clients);
td::actor::send_closure(shard_client_, &ShardClient::start);
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::vector<ValidatorSessionId>> R) {
if (R.is_error()) {
@ -1429,8 +1428,6 @@ void ValidatorManagerImpl::started(ValidatorManagerInitResult R) {
});
td::actor::send_closure(db_, &Db::get_destroyed_validator_sessions, std::move(P));
send_peek_key_block_request();
}
void ValidatorManagerImpl::read_gc_list(std::vector<ValidatorSessionId> list) {
@ -1443,6 +1440,104 @@ void ValidatorManagerImpl::read_gc_list(std::vector<ValidatorSessionId> list) {
serializer_ =
td::actor::create_actor<AsyncStateSerializer>("serializer", last_key_block_handle_->id(), opts_, actor_id(this));
if (!out_of_sync()) {
completed_prestart_sync();
} else {
prestart_sync();
}
}
bool ValidatorManagerImpl::out_of_sync() {
if (last_masterchain_block_handle_->unix_time() + 600 > td::Clocks::system()) {
return false;
}
if (validator_groups_.size() > 0 && last_known_key_block_handle_->id().seqno() <= last_masterchain_seqno_) {
return false;
}
return true;
}
void ValidatorManagerImpl::prestart_sync() {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
R.ensure();
td::actor::send_closure(SelfId, &ValidatorManagerImpl::download_next_archive);
});
td::actor::send_closure(db_, &Db::set_async_mode, false, std::move(P));
}
void ValidatorManagerImpl::download_next_archive() {
if (!out_of_sync()) {
finish_prestart_sync();
return;
}
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::string> R) {
if (R.is_error()) {
LOG(INFO) << "failed to download archive slice: " << R.error();
td::actor::send_closure(SelfId, &ValidatorManagerImpl::download_next_archive);
} else {
td::actor::send_closure(SelfId, &ValidatorManagerImpl::downloaded_archive_slice, R.move_as_ok());
}
});
callback_->download_archive(shard_client_handle_->id().seqno() + 1, db_root_ + "/tmp/", td::Timestamp::in(3600.0),
std::move(P));
}
void ValidatorManagerImpl::downloaded_archive_slice(std::string name) {
LOG(INFO) << "downloaded archive slice: " << name;
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::vector<BlockSeqno>> R) {
if (R.is_error()) {
LOG(INFO) << "failed to check downloaded archive slice: " << R.error();
td::actor::send_closure(SelfId, &ValidatorManagerImpl::download_next_archive);
} else {
td::actor::send_closure(SelfId, &ValidatorManagerImpl::checked_archive_slice, R.move_as_ok());
}
});
td::actor::create_actor<ArchiveImporter>("archiveimport", name, last_masterchain_state_,
shard_client_handle_->id().seqno(), opts_, actor_id(this), std::move(P))
.release();
}
void ValidatorManagerImpl::checked_archive_slice(std::vector<BlockSeqno> seqno) {
CHECK(seqno.size() == 2);
LOG(INFO) << "checked downloaded archive slice: mc_top_seqno=" << seqno[0] << " shard_top_seqno_=" << seqno[1];
CHECK(seqno[0] <= last_masterchain_seqno_);
BlockIdExt b;
CHECK(last_masterchain_state_->get_old_mc_block_id(seqno[1], b));
auto P = td::PromiseCreator::lambda(
[SelfId = actor_id(this), db = db_.get(), client = shard_client_.get()](td::Result<BlockHandle> R) {
R.ensure();
auto handle = R.move_as_ok();
auto P = td::PromiseCreator::lambda([SelfId, client, handle](td::Result<td::Ref<ShardState>> R) mutable {
auto P = td::PromiseCreator::lambda([SelfId](td::Result<td::Unit> R) {
R.ensure();
td::actor::send_closure(SelfId, &ValidatorManagerImpl::download_next_archive);
});
td::actor::send_closure(client, &ShardClient::force_update_shard_client_ex, std::move(handle),
td::Ref<MasterchainState>{R.move_as_ok()}, std::move(P));
});
td::actor::send_closure(db, &Db::get_block_state, std::move(handle), std::move(P));
});
get_block_handle(b, true, std::move(P));
}
void ValidatorManagerImpl::finish_prestart_sync() {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
R.ensure();
td::actor::send_closure(SelfId, &ValidatorManagerImpl::completed_prestart_sync);
});
td::actor::send_closure(db_, &Db::set_async_mode, false, std::move(P));
}
void ValidatorManagerImpl::completed_prestart_sync() {
td::actor::send_closure(shard_client_, &ShardClient::start);
send_peek_key_block_request();
LOG(WARNING) << "initial read complete: " << last_masterchain_block_handle_->id() << " "
<< last_masterchain_block_id_;
callback_->initial_read_complete(last_masterchain_block_handle_);
@ -1979,11 +2074,6 @@ void ValidatorManagerImpl::alarm() {
}
log_status_at_ = td::Timestamp::in(60.0);
}
if (false && !downloading_archive_slice_ && shard_client_handle_ &&
shard_client_handle_->unix_time() + 600 <= td::Clocks::system() && next_download_archive_slice_at_.is_in_past()) {
next_download_archive_slice_at_ = td::Timestamp::in(10.0);
try_download_archive_slice();
}
alarm_timestamp().relax(log_status_at_);
if (resend_shard_blocks_at_ && resend_shard_blocks_at_.is_in_past()) {
resend_shard_blocks_at_ = td::Timestamp::never();
@ -2053,49 +2143,6 @@ void ValidatorManagerImpl::try_get_static_file(FileHash file_hash, td::Promise<t
td::actor::send_closure(db_, &Db::try_get_static_file, file_hash, std::move(promise));
}
void ValidatorManagerImpl::try_download_archive_slice() {
CHECK(shard_client_handle_);
downloading_archive_slice_ = true;
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::string> R) {
if (R.is_error()) {
LOG(INFO) << "failed to download archive slice: " << R.error();
td::actor::send_closure(SelfId, &ValidatorManagerImpl::failed_to_download_archive_slice);
} else {
td::actor::send_closure(SelfId, &ValidatorManagerImpl::downloaded_archive_slice, R.move_as_ok());
}
});
callback_->download_archive(shard_client_handle_->id().seqno(), db_root_ + "/tmp/", td::Timestamp::in(3600.0),
std::move(P));
}
void ValidatorManagerImpl::failed_to_download_archive_slice() {
downloading_archive_slice_ = false;
}
void ValidatorManagerImpl::downloaded_archive_slice(std::string name) {
LOG(INFO) << "downloaded archive slice: " << name;
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<std::vector<BlockSeqno>> R) {
if (R.is_error()) {
LOG(INFO) << "failed to check downloaded archive slice: " << R.error();
td::actor::send_closure(SelfId, &ValidatorManagerImpl::failed_to_download_archive_slice);
} else {
td::actor::send_closure(SelfId, &ValidatorManagerImpl::checked_archive_slice, R.move_as_ok());
}
});
td::actor::create_actor<ArchiveImporter>("archiveimport", name, last_masterchain_state_,
shard_client_handle_->id().seqno(), opts_, actor_id(this), std::move(P))
.release();
}
void ValidatorManagerImpl::checked_archive_slice(std::vector<BlockSeqno> seqno) {
CHECK(seqno.size() == 2);
LOG(INFO) << "checked downloaded archive slice: mc_top_seqno=" << seqno[0] << " shard_top_seqno_=" << seqno[1];
downloading_archive_slice_ = false;
next_download_archive_slice_at_ = td::Timestamp::in(10.0);
}
void ValidatorManagerImpl::get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::uint64> promise) {
td::actor::send_closure(db_, &Db::get_archive_id, masterchain_seqno, std::move(promise));
}

View file

@ -261,6 +261,14 @@ class ValidatorManagerImpl : public ValidatorManager {
void update_gc_block_handle(BlockHandle handle, td::Promise<td::Unit> promise) override;
void update_shard_client_block_handle(BlockHandle handle, td::Promise<td::Unit> promise) override;
bool out_of_sync();
void prestart_sync();
void download_next_archive();
void downloaded_archive_slice(std::string name);
void checked_archive_slice(std::vector<BlockSeqno> seqno);
void finish_prestart_sync();
void completed_prestart_sync();
public:
void install_callback(std::unique_ptr<Callback> new_callback, td::Promise<td::Unit> promise) override {
callback_ = std::move(new_callback);
@ -427,10 +435,6 @@ class ValidatorManagerImpl : public ValidatorManager {
void get_async_serializer_state(td::Promise<AsyncSerializerState> promise) override;
void try_get_static_file(FileHash file_hash, td::Promise<td::BufferSlice> promise) override;
void try_download_archive_slice();
void downloaded_archive_slice(std::string name);
void checked_archive_slice(std::vector<BlockSeqno> seqno);
void failed_to_download_archive_slice();
void get_download_token(size_t download_size, td::uint32 priority, td::Timestamp timeout,
td::Promise<std::unique_ptr<DownloadToken>> promise) override {
@ -558,9 +562,6 @@ class ValidatorManagerImpl : public ValidatorManager {
bool started_ = false;
bool allow_validate_ = false;
bool downloading_archive_slice_ = false;
td::Timestamp next_download_archive_slice_at_ = td::Timestamp::now();
private:
double state_ttl() const {
return opts_->state_ttl();

View file

@ -40,11 +40,9 @@ void ShardClient::start_up() {
}
void ShardClient::start() {
if (!started_ && masterchain_state_.not_null()) {
started_ = true;
apply_all_shards();
} else {
if (!started_) {
started_ = true;
saved_to_db();
}
}
@ -52,7 +50,31 @@ void ShardClient::got_state_from_db(BlockIdExt state) {
CHECK(!init_mode_);
CHECK(state.is_valid());
new_masterchain_block_id(state);
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<BlockHandle> R) {
R.ensure();
td::actor::send_closure(SelfId, &ShardClient::got_init_handle_from_db, R.move_as_ok());
});
td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, state, true, std::move(P));
}
void ShardClient::got_init_handle_from_db(BlockHandle handle) {
masterchain_block_handle_ = std::move(handle);
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Ref<ShardState>> R) {
R.ensure();
td::actor::send_closure(SelfId, &ShardClient::got_init_state_from_db, td::Ref<MasterchainState>{R.move_as_ok()});
});
td::actor::send_closure(manager_, &ValidatorManager::get_shard_state_from_db, masterchain_block_handle_,
std::move(P));
}
void ShardClient::got_init_state_from_db(td::Ref<MasterchainState> state) {
masterchain_state_ = std::move(state);
build_shard_overlays();
masterchain_state_.clear();
saved_to_db();
}
void ShardClient::start_up_init_mode() {
@ -97,14 +119,19 @@ void ShardClient::applied_all_shards() {
}
void ShardClient::saved_to_db() {
if (init_mode_) {
promise_.set_value(td::Unit());
init_mode_ = false;
}
CHECK(masterchain_block_handle_);
td::actor::send_closure(manager_, &ValidatorManager::update_shard_client_block_handle, masterchain_block_handle_,
[](td::Unit) {});
if (promise_) {
promise_.set_value(td::Unit());
}
if (init_mode_) {
init_mode_ = false;
}
if (!started_) {
return;
}
if (masterchain_block_handle_->inited_next_left()) {
new_masterchain_block_id(masterchain_block_handle_->one_next(true));
} else {
@ -239,6 +266,40 @@ void ShardClient::build_shard_overlays() {
}
}
void ShardClient::force_update_shard_client(BlockHandle handle, td::Promise<td::Unit> promise) {
CHECK(!init_mode_);
CHECK(!started_);
if (masterchain_block_handle_->id().seqno() >= handle->id().seqno()) {
promise.set_value(td::Unit());
return;
}
auto P = td::PromiseCreator::lambda(
[SelfId = actor_id(this), handle, promise = std::move(promise)](td::Result<td::Ref<ShardState>> R) mutable {
R.ensure();
td::actor::send_closure(SelfId, &ShardClient::force_update_shard_client_ex, std::move(handle),
td::Ref<MasterchainState>{R.move_as_ok()}, std::move(promise));
});
td::actor::send_closure(manager_, &ValidatorManager::get_shard_state_from_db, std::move(handle), std::move(P));
}
void ShardClient::force_update_shard_client_ex(BlockHandle handle, td::Ref<MasterchainState> state,
td::Promise<td::Unit> promise) {
CHECK(!init_mode_);
CHECK(!started_);
if (masterchain_block_handle_->id().seqno() >= handle->id().seqno()) {
promise.set_value(td::Unit());
return;
}
masterchain_block_handle_ = std::move(handle);
masterchain_state_ = std::move(state);
promise_ = std::move(promise);
build_shard_overlays();
applied_all_shards();
}
} // namespace validator
} // namespace ton

View file

@ -55,8 +55,9 @@ class ShardClient : public td::actor::Actor {
, promise_(std::move(promise)) {
init_mode_ = true;
}
ShardClient(td::Ref<ValidatorManagerOptions> opts, td::actor::ActorId<ValidatorManager> manager)
: opts_(std::move(opts)), manager_(manager) {
ShardClient(td::Ref<ValidatorManagerOptions> opts, td::actor::ActorId<ValidatorManager> manager,
td::Promise<td::Unit> promise)
: opts_(std::move(opts)), manager_(manager), promise_(std::move(promise)) {
}
static constexpr td::uint32 shard_client_priority() {
@ -70,6 +71,8 @@ class ShardClient : public td::actor::Actor {
void start_up_init_mode_finished();
void start();
void got_state_from_db(BlockIdExt masterchain_block_id);
void got_init_handle_from_db(BlockHandle handle);
void got_init_state_from_db(td::Ref<MasterchainState> state);
void im_download_shard_state(BlockIdExt block_id, td::Promise<td::Unit> promise);
void im_downloaded_zero_state(BlockIdExt block_id, td::BufferSlice data, td::Promise<td::Unit> promise);
@ -91,6 +94,9 @@ class ShardClient : public td::actor::Actor {
void get_processed_masterchain_block(td::Promise<BlockSeqno> promise);
void get_processed_masterchain_block_id(td::Promise<BlockIdExt> promise);
void force_update_shard_client(BlockHandle handle, td::Promise<td::Unit> promise);
void force_update_shard_client_ex(BlockHandle handle, td::Ref<MasterchainState> state, td::Promise<td::Unit> promise);
};
} // namespace validator