diff --git a/validator/impl/liteserver.cpp b/validator/impl/liteserver.cpp index 9c7a0456..86fc690a 100644 --- a/validator/impl/liteserver.cpp +++ b/validator/impl/liteserver.cpp @@ -505,20 +505,7 @@ void LiteQuery::perform_sendMessage(td::BufferSlice data) { } void LiteQuery::get_block_handle_checked(BlockIdExt blkid, td::Promise promise) { - auto P = td::PromiseCreator::lambda( - [promise = std::move(promise)](td::Result R) mutable { - if (R.is_error()) { - promise.set_error(R.move_as_error()); - } else { - auto handle = R.move_as_ok(); - if (handle->is_applied()) { - promise.set_result(std::move(handle)); - } else { - promise.set_error(td::Status::Error(ErrorCode::notready, "block is not applied")); - } - } - }); - td::actor::send_closure(manager_, &ValidatorManager::get_block_handle, blkid, false, std::move(P)); + td::actor::send_closure(manager_, &ValidatorManager::get_block_handle_for_litequery, blkid, std::move(promise)); } bool LiteQuery::request_mc_block_data(BlockIdExt blkid) { @@ -1497,17 +1484,12 @@ void LiteQuery::continue_getTransactions(unsigned remaining, bool exact) { LOG(DEBUG) << "sending get_block_by_lt_from_db() query to manager for " << acc_workchain_ << ":" << acc_addr_.to_hex() << " " << trans_lt_; td::actor::send_closure_later( - manager_, &ValidatorManager::get_block_by_lt_from_db, ton::extract_addr_prefix(acc_workchain_, acc_addr_), + manager_, &ValidatorManager::get_block_by_lt_from_db_for_litequery, ton::extract_addr_prefix(acc_workchain_, acc_addr_), trans_lt_, [Self = actor_id(this), remaining, manager = manager_](td::Result res) { if (res.is_error()) { td::actor::send_closure(Self, &LiteQuery::abort_getTransactions, res.move_as_error(), ton::BlockIdExt{}); } else { auto handle = res.move_as_ok(); - if (!handle->is_applied()) { - td::actor::send_closure(Self, &LiteQuery::abort_getTransactions, td::Status::Error(ErrorCode::notready, "block is not applied"), - ton::BlockIdExt{}); - return; - } LOG(DEBUG) << "requesting data for block " << handle->id().to_str(); td::actor::send_closure_later(manager, &ValidatorManager::get_block_data_from_db, handle, [Self, blkid = handle->id(), remaining](td::Result> res) { @@ -1846,10 +1828,6 @@ void LiteQuery::perform_lookupBlock(BlockId blkid, int mode, LogicalTime lt, Uni td::actor::send_closure(Self, &LiteQuery::abort_query, res.move_as_error()); } else { auto handle = res.move_as_ok(); - if (!handle->is_applied()) { - td::actor::send_closure(Self, &LiteQuery::abort_query, td::Status::Error(ErrorCode::notready, "block is not applied")); - return; - } LOG(DEBUG) << "requesting data for block " << handle->id().to_str(); td::actor::send_closure_later(manager, &ValidatorManager::get_block_data_from_db, handle, [Self, blkid = handle->id(), mode](td::Result> res) { @@ -1865,13 +1843,14 @@ void LiteQuery::perform_lookupBlock(BlockId blkid, int mode, LogicalTime lt, Uni ton::AccountIdPrefixFull pfx{blkid.workchain, blkid.shard}; if (mode & 2) { - td::actor::send_closure_later(manager_, &ValidatorManager::get_block_by_lt_from_db, pfx, lt, std::move(P)); + td::actor::send_closure_later(manager_, &ValidatorManager::get_block_by_lt_from_db_for_litequery, pfx, lt, + std::move(P)); } else if (mode & 4) { - td::actor::send_closure_later(manager_, &ValidatorManager::get_block_by_unix_time_from_db, pfx, utime, + td::actor::send_closure_later(manager_, &ValidatorManager::get_block_by_unix_time_from_db_for_litequery, pfx, utime, std::move(P)); } else { - td::actor::send_closure_later(manager_, &ValidatorManager::get_block_by_seqno_from_db, pfx, blkid.seqno, - std::move(P)); + td::actor::send_closure_later(manager_, &ValidatorManager::get_block_by_seqno_from_db_for_litequery, pfx, + blkid.seqno, std::move(P)); } } @@ -2629,7 +2608,7 @@ void LiteQuery::perform_getShardBlockProof(BlockIdExt blkid) { } AccountIdPrefixFull pfx{masterchainId, shardIdAll}; td::actor::send_closure_later( - manager, &ValidatorManager::get_block_by_seqno_from_db, pfx, handle->masterchain_ref_block(), + manager, &ValidatorManager::get_block_by_seqno_from_db_for_litequery, pfx, handle->masterchain_ref_block(), [Self, manager](td::Result R) { if (R.is_error()) { td::actor::send_closure(Self, &LiteQuery::abort_query, R.move_as_error()); diff --git a/validator/interfaces/validator-manager.h b/validator/interfaces/validator-manager.h index ae15ed4c..6b375bac 100644 --- a/validator/interfaces/validator-manager.h +++ b/validator/interfaces/validator-manager.h @@ -170,6 +170,14 @@ class ValidatorManager : public ValidatorManagerInterface { virtual void log_validator_session_stats(BlockIdExt block_id, validatorsession::ValidatorSessionStats stats) = 0; + virtual void get_block_handle_for_litequery(BlockIdExt block_id, td::Promise promise) = 0; + virtual void get_block_by_lt_from_db_for_litequery(AccountIdPrefixFull account, LogicalTime lt, + td::Promise promise) = 0; + virtual void get_block_by_unix_time_from_db_for_litequery(AccountIdPrefixFull account, UnixTime ts, + td::Promise promise) = 0; + virtual void get_block_by_seqno_from_db_for_litequery(AccountIdPrefixFull account, BlockSeqno seqno, + td::Promise promise) = 0; + static bool is_persistent_state(UnixTime ts, UnixTime prev_ts) { return ts / (1 << 17) != prev_ts / (1 << 17); } diff --git a/validator/manager-disk.hpp b/validator/manager-disk.hpp index 1c3f4f5b..4812d60a 100644 --- a/validator/manager-disk.hpp +++ b/validator/manager-disk.hpp @@ -384,6 +384,21 @@ class ValidatorManagerImpl : public ValidatorManager { } td::actor::send_closure(queue_size_counter_, &QueueSizeCounter::get_queue_size, block_id, std::move(promise)); } + void get_block_handle_for_litequery(BlockIdExt block_id, td::Promise promise) override { + get_block_handle(block_id, false, promise.wrap([](BlockHandle &&handle) -> ConstBlockHandle { return handle; })); + } + void get_block_by_lt_from_db_for_litequery(AccountIdPrefixFull account, LogicalTime lt, + td::Promise promise) override { + get_block_by_lt_from_db(account, lt, std::move(promise)); + } + void get_block_by_unix_time_from_db_for_litequery(AccountIdPrefixFull account, UnixTime ts, + td::Promise promise) override { + get_block_by_unix_time_from_db(account, ts, std::move(promise)); + } + void get_block_by_seqno_from_db_for_litequery(AccountIdPrefixFull account, BlockSeqno seqno, + td::Promise promise) override { + get_block_by_seqno_from_db(account, seqno, std::move(promise)); + } private: PublicKeyHash local_id_; diff --git a/validator/manager-hardfork.hpp b/validator/manager-hardfork.hpp index 675c2304..c34ae5c7 100644 --- a/validator/manager-hardfork.hpp +++ b/validator/manager-hardfork.hpp @@ -445,6 +445,21 @@ class ValidatorManagerImpl : public ValidatorManager { } td::actor::send_closure(queue_size_counter_, &QueueSizeCounter::get_queue_size, block_id, std::move(promise)); } + void get_block_handle_for_litequery(BlockIdExt block_id, td::Promise promise) override { + get_block_handle(block_id, false, promise.wrap([](BlockHandle &&handle) -> ConstBlockHandle { return handle; })); + } + void get_block_by_lt_from_db_for_litequery(AccountIdPrefixFull account, LogicalTime lt, + td::Promise promise) override { + get_block_by_lt_from_db(account, lt, std::move(promise)); + } + void get_block_by_unix_time_from_db_for_litequery(AccountIdPrefixFull account, UnixTime ts, + td::Promise promise) override { + get_block_by_unix_time_from_db(account, ts, std::move(promise)); + } + void get_block_by_seqno_from_db_for_litequery(AccountIdPrefixFull account, BlockSeqno seqno, + td::Promise promise) override { + get_block_by_seqno_from_db(account, seqno, std::move(promise)); + } private: td::Ref opts_; diff --git a/validator/manager.cpp b/validator/manager.cpp index 4bac4a74..dcda7b6e 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -2334,8 +2334,11 @@ void ValidatorManagerImpl::update_shard_client_block_handle(BlockHandle handle, td::Promise promise) { shard_client_handle_ = std::move(handle); auto seqno = shard_client_handle_->id().seqno(); - if (last_liteserver_state_.is_null() || last_liteserver_state_->get_block_id().seqno() < seqno) { - last_liteserver_state_ = std::move(state); + if (state.not_null()) { + shard_client_shards_ = state->get_shards(); + if (last_liteserver_state_.is_null() || last_liteserver_state_->get_block_id().seqno() < seqno) { + last_liteserver_state_ = std::move(state); + } } shard_client_update(seqno); promise.set_value(td::Unit()); @@ -2662,6 +2665,143 @@ void ValidatorManagerImpl::log_validator_session_stats(BlockIdExt block_id, LOG(INFO) << "Writing validator session stats for " << block_id.id; } +void ValidatorManagerImpl::get_block_handle_for_litequery(BlockIdExt block_id, td::Promise promise) { + get_block_handle( + block_id, false, + [SelfId = actor_id(this), block_id, promise = std::move(promise)](td::Result R) mutable { + if (R.is_ok() && R.ok()->is_applied()) { + promise.set_value(R.move_as_ok()); + } else { + td::actor::send_closure(SelfId, &ValidatorManagerImpl::process_block_handle_for_litequery_error, block_id, + std::move(R), std::move(promise)); + } + }); +} + +void ValidatorManagerImpl::get_block_by_lt_from_db_for_litequery(AccountIdPrefixFull account, LogicalTime lt, + td::Promise promise) { + get_block_by_lt_from_db( + account, lt, [=, SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable { + if (R.is_ok() && R.ok()->is_applied()) { + promise.set_value(R.move_as_ok()); + } else { + td::actor::send_closure(SelfId, &ValidatorManagerImpl::process_lookup_block_for_litequery_error, account, 0, + lt, std::move(R), std::move(promise)); + } + }); +} + +void ValidatorManagerImpl::get_block_by_unix_time_from_db_for_litequery(AccountIdPrefixFull account, UnixTime ts, + td::Promise promise) { + get_block_by_unix_time_from_db( + account, ts, [=, SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable { + if (R.is_ok() && R.ok()->is_applied()) { + promise.set_value(R.move_as_ok()); + } else { + td::actor::send_closure(SelfId, &ValidatorManagerImpl::process_lookup_block_for_litequery_error, account, 1, + ts, std::move(R), std::move(promise)); + } + }); +} + +void ValidatorManagerImpl::get_block_by_seqno_from_db_for_litequery(AccountIdPrefixFull account, BlockSeqno seqno, + td::Promise promise) { + get_block_by_seqno_from_db( + account, seqno, + [=, SelfId = actor_id(this), promise = std::move(promise)](td::Result R) mutable { + if (R.is_ok() && R.ok()->is_applied()) { + promise.set_value(R.move_as_ok()); + } else { + td::actor::send_closure(SelfId, &ValidatorManagerImpl::process_lookup_block_for_litequery_error, account, 2, + seqno, std::move(R), std::move(promise)); + } + }); +} + +void ValidatorManagerImpl::process_block_handle_for_litequery_error(BlockIdExt block_id, + td::Result r_handle, + td::Promise promise) { + td::Status err; + if (r_handle.is_error()) { + err = r_handle.move_as_error(); + } else { + auto handle = r_handle.move_as_ok(); + if (handle->is_applied()) { + promise.set_value(std::move(handle)); + return; + } + if (!handle->received() || !handle->received_state()) { + err = td::Status::Error(ErrorCode::notready, PSTRING() << "block " << block_id.id.to_str() << " is not in db"); + } else { + err = td::Status::Error(ErrorCode::notready, PSTRING() << "block " << block_id.id.to_str() << " is not applied"); + } + } + if (block_id.is_masterchain()) { + if (block_id.seqno() > last_masterchain_seqno_) { + err = err.move_as_error_suffix(PSTRING() << " (last known masterchain block: " << last_masterchain_seqno_ << ")"); + } + } else { + for (auto &shard : shard_client_shards_) { + if (shard_intersects(shard->shard(), block_id.shard_full())) { + if (block_id.seqno() > shard->top_block_id().seqno()) { + err = err.move_as_error_suffix( + PSTRING() << " (possibly out of sync: shard_client_seqno=" + << (shard_client_handle_ ? shard_client_handle_->id().seqno() : 0) << " ls_seqno=" + << (last_liteserver_state_.not_null() ? last_liteserver_state_->get_seqno() : 0) << ")"); + } + break; + } + } + } + promise.set_error(std::move(err)); +} + +void ValidatorManagerImpl::process_lookup_block_for_litequery_error(AccountIdPrefixFull account, int type, + td::uint64 value, + td::Result r_handle, + td::Promise promise) { + td::Status err; + if (r_handle.is_error()) { + err = r_handle.move_as_error(); + } else { + auto handle = r_handle.move_as_ok(); + if (handle->is_applied()) { + promise.set_value(std::move(handle)); + return; + } + if (!handle->received() || !handle->received_state()) { + err = td::Status::Error(ErrorCode::notready, PSTRING() << "block " << handle->id().to_str() << " is not in db"); + } else { + err = td::Status::Error(ErrorCode::notready, PSTRING() << "block " << handle->id().to_str() << " is not applied"); + } + } + if (account.is_masterchain()) { + if (value > (type == 0 + ? last_masterchain_state_->get_logical_time() + : (type == 1 ? last_masterchain_state_->get_unix_time() : last_masterchain_state_->get_seqno()))) { + err = err.move_as_error_suffix(PSTRING() << " (last known masterchain block: " << last_masterchain_seqno_ << ")"); + } + } else { + for (auto &shard : shard_client_shards_) { + if (shard_intersects(shard->shard(), account.as_leaf_shard())) { + if (value > (type == 0 ? shard->end_lt() + : (type == 1 ? (shard_client_handle_ ? shard_client_handle_->unix_time() : 0) + : shard->top_block_id().seqno()))) { + err = err.move_as_error_suffix( + PSTRING() << " (possibly out of sync: shard_client_seqno=" + << (shard_client_handle_ ? shard_client_handle_->id().seqno() : 0) << " ls_seqno=" + << (last_liteserver_state_.not_null() ? last_liteserver_state_->get_seqno() : 0) << ")"); + } + break; + } + } + } + static std::string names[3] = {"lt", "utime", "seqno"}; + err = err.move_as_error_prefix(PSTRING() << "cannot find block " << account.to_str() << " " << names[type] << "=" + << value << ": "); + promise.set_error(std::move(err)); +} + td::actor::ActorOwn ValidatorManagerFactory::create( td::Ref opts, std::string db_root, td::actor::ActorId keyring, td::actor::ActorId adnl, td::actor::ActorId rldp, diff --git a/validator/manager.hpp b/validator/manager.hpp index 9f51cc27..bdf0155e 100644 --- a/validator/manager.hpp +++ b/validator/manager.hpp @@ -252,6 +252,7 @@ class ValidatorManagerImpl : public ValidatorManager { BlockHandle last_key_block_handle_; BlockHandle last_known_key_block_handle_; BlockHandle shard_client_handle_; + std::vector> shard_client_shards_; td::Ref last_liteserver_state_; td::Ref do_get_last_liteserver_state(); @@ -561,6 +562,19 @@ class ValidatorManagerImpl : public ValidatorManager { td::actor::send_closure(queue_size_counter_, &QueueSizeCounter::get_queue_size, block_id, std::move(promise)); } + void get_block_handle_for_litequery(BlockIdExt block_id, td::Promise promise) override; + void get_block_by_lt_from_db_for_litequery(AccountIdPrefixFull account, LogicalTime lt, + td::Promise promise) override; + void get_block_by_unix_time_from_db_for_litequery(AccountIdPrefixFull account, UnixTime ts, + td::Promise promise) override; + void get_block_by_seqno_from_db_for_litequery(AccountIdPrefixFull account, BlockSeqno seqno, + td::Promise promise) override; + void process_block_handle_for_litequery_error(BlockIdExt block_id, td::Result r_handle, + td::Promise promise); + void process_lookup_block_for_litequery_error(AccountIdPrefixFull account, int type, td::uint64 value, + td::Result r_handle, + td::Promise promise); + private: td::Timestamp resend_shard_blocks_at_; td::Timestamp check_waiters_at_;