mirror of
https://github.com/ton-blockchain/ton
synced 2025-03-09 15:40:10 +00:00
Merge branch 'testnet' into block-generation
This commit is contained in:
commit
a2eb3f3631
45 changed files with 1101 additions and 202 deletions
|
@ -77,6 +77,10 @@ class AdnlPeerTableImpl : public AdnlPeerTable {
|
|||
td::actor::ActorId<AdnlChannel> channel) override;
|
||||
void unregister_channel(AdnlChannelIdShort id) override;
|
||||
|
||||
void check_id_exists(AdnlNodeIdShort id, td::Promise<bool> promise) override {
|
||||
promise.set_value(local_ids_.count(id));
|
||||
}
|
||||
|
||||
void write_new_addr_list_to_db(AdnlNodeIdShort local_id, AdnlNodeIdShort peer_id, AdnlDbItem node,
|
||||
td::Promise<td::Unit> promise) override;
|
||||
void get_addr_list_from_db(AdnlNodeIdShort local_id, AdnlNodeIdShort peer_id,
|
||||
|
|
|
@ -212,7 +212,9 @@ void AdnlPeerPairImpl::receive_packet_from_channel(AdnlChannelIdShort id, AdnlPa
|
|||
VLOG(ADNL_NOTICE) << this << ": dropping IN message: outdated channel id" << id;
|
||||
return;
|
||||
}
|
||||
channel_ready_ = true;
|
||||
if (channel_inited_) {
|
||||
channel_ready_ = true;
|
||||
}
|
||||
receive_packet_checked(std::move(packet));
|
||||
}
|
||||
|
||||
|
|
|
@ -97,6 +97,8 @@ class Adnl : public AdnlSenderInterface {
|
|||
virtual void add_id_ex(AdnlNodeIdFull id, AdnlAddressList addr_list, td::uint8 cat, td::uint32 mode) = 0;
|
||||
virtual void del_id(AdnlNodeIdShort id, td::Promise<td::Unit> promise) = 0;
|
||||
|
||||
virtual void check_id_exists(AdnlNodeIdShort id, td::Promise<bool> promise) = 0;
|
||||
|
||||
// subscribe to (some) messages(+queries) to this local id
|
||||
virtual void subscribe(AdnlNodeIdShort dst, std::string prefix, std::unique_ptr<Callback> callback) = 0;
|
||||
virtual void unsubscribe(AdnlNodeIdShort dst, std::string prefix) = 0;
|
||||
|
|
|
@ -401,6 +401,7 @@ if (USE_EMSCRIPTEN)
|
|||
target_link_options(funcfiftlib PRIVATE -sALLOW_MEMORY_GROWTH=1)
|
||||
target_link_options(funcfiftlib PRIVATE -sALLOW_TABLE_GROWTH=1)
|
||||
target_link_options(funcfiftlib PRIVATE --embed-file ${CMAKE_CURRENT_SOURCE_DIR}/fift/lib@/fiftlib)
|
||||
target_link_options(funcfiftlib PRIVATE --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/funcfiftlib/funcfiftlib-prejs.js)
|
||||
target_link_options(funcfiftlib PRIVATE -fexceptions)
|
||||
target_compile_options(funcfiftlib PRIVATE -fexceptions -fno-stack-protector)
|
||||
endif()
|
||||
|
|
1
crypto/funcfiftlib/funcfiftlib-prejs.js
Normal file
1
crypto/funcfiftlib/funcfiftlib-prejs.js
Normal file
|
@ -0,0 +1 @@
|
|||
var crypto = { getRandomValues: function(array) { for (var i = 0; i < array.length; i++) array[i] = (Math.random()*256)|0 } };
|
|
@ -34,29 +34,6 @@
|
|||
#include <sstream>
|
||||
#include <iomanip>
|
||||
|
||||
std::string escape_json(const std::string &s) {
|
||||
std::ostringstream o;
|
||||
for (auto c = s.cbegin(); c != s.cend(); c++) {
|
||||
switch (*c) {
|
||||
case '"': o << "\\\""; break;
|
||||
case '\\': o << "\\\\"; break;
|
||||
case '\b': o << "\\b"; break;
|
||||
case '\f': o << "\\f"; break;
|
||||
case '\n': o << "\\n"; break;
|
||||
case '\r': o << "\\r"; break;
|
||||
case '\t': o << "\\t"; break;
|
||||
default:
|
||||
if ('\x00' <= *c && *c <= '\x1f') {
|
||||
o << "\\u"
|
||||
<< std::hex << std::setw(4) << std::setfill('0') << static_cast<int>(*c);
|
||||
} else {
|
||||
o << *c;
|
||||
}
|
||||
}
|
||||
}
|
||||
return o.str();
|
||||
}
|
||||
|
||||
td::Result<std::string> compile_internal(char *config_json) {
|
||||
TRY_RESULT(input_json, td::json_decode(td::MutableSlice(config_json)))
|
||||
auto &obj = input_json.get_object();
|
||||
|
@ -91,7 +68,7 @@ td::Result<std::string> compile_internal(char *config_json) {
|
|||
auto result_obj = result_json.enter_object();
|
||||
result_obj("status", "ok");
|
||||
result_obj("codeBoc", td::base64_encode(boc));
|
||||
result_obj("fiftCode", escape_json(outs.str()));
|
||||
result_obj("fiftCode", outs.str());
|
||||
result_obj("codeHashHex", code_cell->get_hash().to_hex());
|
||||
result_obj.leave();
|
||||
|
||||
|
|
|
@ -53,7 +53,6 @@ class NewCellStorageStat {
|
|||
bool operator==(const Stat& other) const {
|
||||
return key() == other.key();
|
||||
}
|
||||
Stat(const Stat& other) = default;
|
||||
Stat& operator=(const Stat& other) = default;
|
||||
Stat& operator+=(const Stat& other) {
|
||||
cells += other.cells;
|
||||
|
|
|
@ -48,7 +48,7 @@ if (USE_EMSCRIPTEN)
|
|||
add_executable(emulator-emscripten ${EMULATOR_EMSCRIPTEN_SOURCE})
|
||||
target_link_libraries(emulator-emscripten PUBLIC emulator)
|
||||
target_link_options(emulator-emscripten PRIVATE -sEXPORTED_RUNTIME_METHODS=_malloc,free,UTF8ToString,stringToUTF8,allocate,ALLOC_NORMAL,lengthBytesUTF8)
|
||||
target_link_options(emulator-emscripten PRIVATE -sEXPORTED_FUNCTIONS=_emulate,_free,_run_get_method)
|
||||
target_link_options(emulator-emscripten PRIVATE -sEXPORTED_FUNCTIONS=_emulate,_free,_run_get_method,_create_emulator,_destroy_emulator,_emulate_with_emulator)
|
||||
target_link_options(emulator-emscripten PRIVATE -sEXPORT_NAME=EmulatorModule)
|
||||
target_link_options(emulator-emscripten PRIVATE -sERROR_ON_UNDEFINED_SYMBOLS=0)
|
||||
target_link_options(emulator-emscripten PRIVATE -Oz)
|
||||
|
@ -57,6 +57,7 @@ if (USE_EMSCRIPTEN)
|
|||
target_link_options(emulator-emscripten PRIVATE -sMODULARIZE=1)
|
||||
target_link_options(emulator-emscripten PRIVATE -sENVIRONMENT=web)
|
||||
target_link_options(emulator-emscripten PRIVATE -sFILESYSTEM=0)
|
||||
target_link_options(emulator-emscripten PRIVATE -sALLOW_MEMORY_GROWTH=1)
|
||||
target_link_options(emulator-emscripten PRIVATE -fexceptions)
|
||||
if (USE_EMSCRIPTEN_NO_WASM)
|
||||
target_link_options(emulator-emscripten PRIVATE -sWASM=0)
|
||||
|
|
|
@ -124,9 +124,39 @@ td::Result<GetMethodParams> decode_get_method_params(const char* json) {
|
|||
return params;
|
||||
}
|
||||
|
||||
class NoopLog : public td::LogInterface {
|
||||
public:
|
||||
NoopLog() {
|
||||
}
|
||||
|
||||
void append(td::CSlice new_slice, int log_level) override {
|
||||
}
|
||||
|
||||
void rotate() override {
|
||||
}
|
||||
};
|
||||
|
||||
extern "C" {
|
||||
|
||||
const char *emulate(const char *config, const char* libs, int verbosity, const char* account, const char* message, const char* params) {
|
||||
void* create_emulator(const char *config, int verbosity) {
|
||||
NoopLog logger;
|
||||
|
||||
td::log_interface = &logger;
|
||||
|
||||
SET_VERBOSITY_LEVEL(verbosity_NEVER);
|
||||
return transaction_emulator_create(config, verbosity);
|
||||
}
|
||||
|
||||
void destroy_emulator(void* em) {
|
||||
NoopLog logger;
|
||||
|
||||
td::log_interface = &logger;
|
||||
|
||||
SET_VERBOSITY_LEVEL(verbosity_NEVER);
|
||||
transaction_emulator_destroy(em);
|
||||
}
|
||||
|
||||
const char *emulate_with_emulator(void* em, const char* libs, const char* account, const char* message, const char* params) {
|
||||
StringLog logger;
|
||||
|
||||
td::log_interface = &logger;
|
||||
|
@ -138,8 +168,6 @@ const char *emulate(const char *config, const char* libs, int verbosity, const c
|
|||
}
|
||||
auto decoded_params = decoded_params_res.move_as_ok();
|
||||
|
||||
auto em = transaction_emulator_create(config, verbosity);
|
||||
|
||||
bool rand_seed_set = true;
|
||||
if (decoded_params.rand_seed_hex) {
|
||||
rand_seed_set = transaction_emulator_set_rand_seed(em, decoded_params.rand_seed_hex.unwrap().c_str());
|
||||
|
@ -162,8 +190,6 @@ const char *emulate(const char *config, const char* libs, int verbosity, const c
|
|||
result = transaction_emulator_emulate_transaction(em, account, message);
|
||||
}
|
||||
|
||||
transaction_emulator_destroy(em);
|
||||
|
||||
const char* output = nullptr;
|
||||
{
|
||||
td::JsonBuilder jb;
|
||||
|
@ -178,6 +204,13 @@ const char *emulate(const char *config, const char* libs, int verbosity, const c
|
|||
return output;
|
||||
}
|
||||
|
||||
const char *emulate(const char *config, const char* libs, int verbosity, const char* account, const char* message, const char* params) {
|
||||
auto em = transaction_emulator_create(config, verbosity);
|
||||
auto result = emulate_with_emulator(em, libs, account, message, params);
|
||||
transaction_emulator_destroy(em);
|
||||
return result;
|
||||
}
|
||||
|
||||
const char *run_get_method(const char *params, const char* stack, const char* config) {
|
||||
StringLog logger;
|
||||
|
||||
|
|
|
@ -3642,7 +3642,7 @@ void TestNode::continue_check_validator_load2(std::unique_ptr<TestNode::Validato
|
|||
std::unique_ptr<TestNode::ValidatorLoadInfo> info2, int mode,
|
||||
std::string file_pfx) {
|
||||
LOG(INFO) << "continue_check_validator_load2 for blocks " << info1->blk_id.to_str() << " and "
|
||||
<< info1->blk_id.to_str() << " : requesting block creators data";
|
||||
<< info2->blk_id.to_str() << " : requesting block creators data";
|
||||
td::Status st = info1->unpack_vset();
|
||||
if (st.is_error()) {
|
||||
LOG(ERROR) << "cannot unpack validator set from block " << info1->blk_id.to_str() << " :" << st.move_as_error();
|
||||
|
@ -3740,7 +3740,7 @@ void TestNode::continue_check_validator_load3(std::unique_ptr<TestNode::Validato
|
|||
auto x1 = info2->created[i].first - info1->created[i].first;
|
||||
auto y1 = info2->created[i].second - info1->created[i].second;
|
||||
if (x1 < 0 || y1 < 0 || (x1 | y1) >= (1u << 31)) {
|
||||
LOG(ERROR) << "impossible situation: validator #i created a negative amount of blocks: " << x1
|
||||
LOG(ERROR) << "impossible situation: validator #" << i << " created a negative amount of blocks: " << x1
|
||||
<< " masterchain blocks, " << y1 << " shardchain blocks";
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include "td/utils/tl_storers.h"
|
||||
#include "td/utils/crypto.h"
|
||||
#include "crypto/common/bitstring.h"
|
||||
#include <map>
|
||||
|
||||
namespace ton {
|
||||
|
||||
|
@ -129,4 +130,37 @@ td::Bits256 get_tl_object_sha_bits256(const lite_api::Object *T) {
|
|||
return id256;
|
||||
}
|
||||
|
||||
std::string lite_query_name_by_id(int id) {
|
||||
static std::map<int, std::string> names = {
|
||||
{lite_api::liteServer_getMasterchainInfo::ID, "getMasterchainInfo"},
|
||||
{lite_api::liteServer_getMasterchainInfoExt::ID, "getMasterchainInfoExt"},
|
||||
{lite_api::liteServer_getTime::ID, "getTime"},
|
||||
{lite_api::liteServer_getVersion::ID, "getVersion"},
|
||||
{lite_api::liteServer_getBlock::ID, "getBlock"},
|
||||
{lite_api::liteServer_getState::ID, "getState"},
|
||||
{lite_api::liteServer_getBlockHeader::ID, "getBlockHeader"},
|
||||
{lite_api::liteServer_sendMessage::ID, "sendMessage"},
|
||||
{lite_api::liteServer_getAccountState::ID, "getAccountState"},
|
||||
{lite_api::liteServer_getAccountStatePrunned::ID, "getAccountStatePrunned"},
|
||||
{lite_api::liteServer_runSmcMethod::ID, "runSmcMethod"},
|
||||
{lite_api::liteServer_getShardInfo::ID, "getShardInfo"},
|
||||
{lite_api::liteServer_getAllShardsInfo::ID, "getAllShardsInfo"},
|
||||
{lite_api::liteServer_getOneTransaction::ID, "getOneTransaction"},
|
||||
{lite_api::liteServer_getTransactions::ID, "getTransactions"},
|
||||
{lite_api::liteServer_lookupBlock::ID, "lookupBlock"},
|
||||
{lite_api::liteServer_listBlockTransactions::ID, "listBlockTransactions"},
|
||||
{lite_api::liteServer_listBlockTransactionsExt::ID, "listBlockTransactionsExt"},
|
||||
{lite_api::liteServer_getBlockProof::ID, "getBlockProof"},
|
||||
{lite_api::liteServer_getConfigAll::ID, "getConfigAll"},
|
||||
{lite_api::liteServer_getConfigParams::ID, "getConfigParams"},
|
||||
{lite_api::liteServer_getValidatorStats::ID, "getValidatorStats"},
|
||||
{lite_api::liteServer_getLibraries::ID, "getLibraries"},
|
||||
{lite_api::liteServer_getShardBlockProof::ID, "getShardBlockProof"}};
|
||||
auto it = names.find(id);
|
||||
if (it == names.end()) {
|
||||
return "unknown";
|
||||
}
|
||||
return it->second;
|
||||
}
|
||||
|
||||
} // namespace ton
|
||||
|
|
|
@ -46,4 +46,6 @@ template <class Tp, std::enable_if_t<std::is_base_of<lite_api::Object, Tp>::valu
|
|||
td::Bits256 get_tl_object_sha_bits256(const Tp &T) {
|
||||
return get_tl_object_sha_bits256(static_cast<const lite_api::Object *>(&T));
|
||||
}
|
||||
|
||||
std::string lite_query_name_by_id(int id);
|
||||
} // namespace ton
|
||||
|
|
|
@ -397,6 +397,8 @@ tonNode.newShardBlockBroadcast block:tonNode.newShardBlock = tonNode.Broadcast;
|
|||
|
||||
tonNode.shardPublicOverlayId workchain:int shard:long zero_state_file_hash:int256 = tonNode.ShardPublicOverlayId;
|
||||
|
||||
tonNode.privateBlockOverlayId zero_state_file_hash:int256 nodes:(vector int256) = tonNode.PrivateBlockOverlayId;
|
||||
|
||||
tonNode.keyBlocks blocks:(vector tonNode.blockIdExt) incomplete:Bool error:Bool = tonNode.KeyBlocks;
|
||||
|
||||
ton.blockId root_cell_hash:int256 file_hash:int256 = ton.BlockId;
|
||||
|
|
Binary file not shown.
|
@ -355,6 +355,14 @@ struct BlockBroadcast {
|
|||
td::uint32 validator_set_hash;
|
||||
td::BufferSlice data;
|
||||
td::BufferSlice proof;
|
||||
|
||||
BlockBroadcast clone() const {
|
||||
std::vector<BlockSignature> new_signatures;
|
||||
for (const BlockSignature& s : signatures) {
|
||||
new_signatures.emplace_back(s.node, s.signature.clone());
|
||||
}
|
||||
return {block_id, std::move(new_signatures), catchain_seqno, validator_set_hash, data.clone(), proof.clone()};
|
||||
}
|
||||
};
|
||||
|
||||
struct Ed25519_PrivateKey {
|
||||
|
|
|
@ -1424,6 +1424,8 @@ td::Status ValidatorEngine::load_global_config() {
|
|||
validator_options_.write().set_session_logs_file(session_logs_file_);
|
||||
}
|
||||
validator_options_.write().set_celldb_compress_depth(celldb_compress_depth_);
|
||||
validator_options_.write().set_max_open_archive_files(max_open_archive_files_);
|
||||
validator_options_.write().set_archive_preload_period(archive_preload_period_);
|
||||
|
||||
std::vector<ton::BlockIdExt> h;
|
||||
for (auto &x : conf.validator_->hardforks_) {
|
||||
|
@ -4092,6 +4094,23 @@ int main(int argc, char *argv[]) {
|
|||
});
|
||||
return td::Status::OK();
|
||||
});
|
||||
p.add_checked_option(
|
||||
'\0', "max-archive-fd", "limit for a number of open file descriptirs in archive manager. 0 is unlimited (default)",
|
||||
[&](td::Slice s) -> td::Status {
|
||||
TRY_RESULT(v, td::to_integer_safe<size_t>(s));
|
||||
acts.push_back([&x, v]() { td::actor::send_closure(x, &ValidatorEngine::set_max_open_archive_files, v); });
|
||||
return td::Status::OK();
|
||||
});
|
||||
p.add_checked_option(
|
||||
'\0', "archive-preload-period", "open archive slices for the past X second on startup (default: 0)",
|
||||
[&](td::Slice s) -> td::Status {
|
||||
auto v = td::to_double(s);
|
||||
if (v < 0) {
|
||||
return td::Status::Error("sync-before should be non-negative");
|
||||
}
|
||||
acts.push_back([&x, v]() { td::actor::send_closure(x, &ValidatorEngine::set_archive_preload_period, v); });
|
||||
return td::Status::OK();
|
||||
});
|
||||
auto S = p.run(argc, argv);
|
||||
if (S.is_error()) {
|
||||
LOG(ERROR) << "failed to parse options: " << S.move_as_error();
|
||||
|
|
|
@ -218,6 +218,8 @@ class ValidatorEngine : public td::actor::Actor {
|
|||
double archive_ttl_ = 0;
|
||||
double key_proof_ttl_ = 0;
|
||||
td::uint32 celldb_compress_depth_ = 0;
|
||||
size_t max_open_archive_files_ = 0;
|
||||
double archive_preload_period_ = 0.0;
|
||||
bool read_config_ = false;
|
||||
bool started_keyring_ = false;
|
||||
bool started_ = false;
|
||||
|
@ -281,6 +283,12 @@ class ValidatorEngine : public td::actor::Actor {
|
|||
void set_celldb_compress_depth(td::uint32 value) {
|
||||
celldb_compress_depth_ = value;
|
||||
}
|
||||
void set_max_open_archive_files(size_t value) {
|
||||
max_open_archive_files_ = value;
|
||||
}
|
||||
void set_archive_preload_period(double value) {
|
||||
archive_preload_period_ = value;
|
||||
}
|
||||
void set_not_all_shards() {
|
||||
not_all_shards_ = true;
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ set(VALIDATOR_HEADERS
|
|||
fabric.h
|
||||
interfaces/db.h
|
||||
interfaces/external-message.h
|
||||
interfaces/liteserver.h
|
||||
interfaces/out-msg-queue-proof.h
|
||||
interfaces/proof.h
|
||||
interfaces/shard.h
|
||||
|
@ -146,7 +147,9 @@ set(FULL_NODE_SOURCE
|
|||
full-node-master.h
|
||||
full-node-master.hpp
|
||||
full-node-master.cpp
|
||||
|
||||
full-node-private-overlay.hpp
|
||||
full-node-private-overlay.cpp
|
||||
|
||||
net/download-block.hpp
|
||||
net/download-block.cpp
|
||||
net/download-block-new.hpp
|
||||
|
|
|
@ -55,7 +55,9 @@ std::string PackageId::name() const {
|
|||
}
|
||||
}
|
||||
|
||||
ArchiveManager::ArchiveManager(td::actor::ActorId<RootDb> root, std::string db_root) : db_root_(db_root) {
|
||||
ArchiveManager::ArchiveManager(td::actor::ActorId<RootDb> root, std::string db_root,
|
||||
td::Ref<ValidatorManagerOptions> opts)
|
||||
: db_root_(db_root), opts_(opts) {
|
||||
}
|
||||
|
||||
void ArchiveManager::add_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
|
||||
|
@ -598,9 +600,11 @@ void ArchiveManager::load_package(PackageId id) {
|
|||
}
|
||||
}
|
||||
|
||||
desc.file = td::actor::create_actor<ArchiveSlice>("slice", id.id, id.key, id.temp, false, db_root_);
|
||||
desc.file =
|
||||
td::actor::create_actor<ArchiveSlice>("slice", id.id, id.key, id.temp, false, db_root_, archive_lru_.get());
|
||||
|
||||
m.emplace(id, std::move(desc));
|
||||
update_permanent_slices();
|
||||
}
|
||||
|
||||
const ArchiveManager::FileDescription *ArchiveManager::get_file_desc(ShardIdFull shard, PackageId id, BlockSeqno seqno,
|
||||
|
@ -631,7 +635,8 @@ const ArchiveManager::FileDescription *ArchiveManager::add_file_desc(ShardIdFull
|
|||
FileDescription new_desc{id, false};
|
||||
td::mkdir(db_root_ + id.path()).ensure();
|
||||
std::string prefix = PSTRING() << db_root_ << id.path() << id.name();
|
||||
new_desc.file = td::actor::create_actor<ArchiveSlice>("slice", id.id, id.key, id.temp, false, db_root_);
|
||||
new_desc.file =
|
||||
td::actor::create_actor<ArchiveSlice>("slice", id.id, id.key, id.temp, false, db_root_, archive_lru_.get());
|
||||
const FileDescription &desc = f.emplace(id, std::move(new_desc));
|
||||
if (!id.temp) {
|
||||
update_desc(f, desc, shard, seqno, ts, lt);
|
||||
|
@ -673,6 +678,7 @@ const ArchiveManager::FileDescription *ArchiveManager::add_file_desc(ShardIdFull
|
|||
.ensure();
|
||||
}
|
||||
index_->commit_transaction().ensure();
|
||||
update_permanent_slices();
|
||||
return &desc;
|
||||
}
|
||||
|
||||
|
@ -820,6 +826,9 @@ void ArchiveManager::start_up() {
|
|||
td::mkdir(db_root_ + "/archive/states/").ensure();
|
||||
td::mkdir(db_root_ + "/files/").ensure();
|
||||
td::mkdir(db_root_ + "/files/packages/").ensure();
|
||||
if (opts_->get_max_open_archive_files() > 0) {
|
||||
archive_lru_ = td::actor::create_actor<ArchiveLru>("archive_lru", opts_->get_max_open_archive_files());
|
||||
}
|
||||
index_ = std::make_shared<td::RocksDb>(td::RocksDb::open(db_root_ + "/files/globalindex").move_as_ok());
|
||||
std::string value;
|
||||
auto v = index_->get(create_serialize_tl_object<ton_api::db_files_index_key>().as_slice(), value);
|
||||
|
@ -876,10 +885,28 @@ void ArchiveManager::start_up() {
|
|||
}).ensure();
|
||||
|
||||
persistent_state_gc(FileHash::zero());
|
||||
|
||||
double open_since = td::Clocks::system() - opts_->get_archive_preload_period();
|
||||
for (auto it = files_.rbegin(); it != files_.rend(); ++it) {
|
||||
if (it->second.file_actor_id().empty()) {
|
||||
continue;
|
||||
}
|
||||
td::actor::send_closure(it->second.file_actor_id(), &ArchiveSlice::open_files);
|
||||
bool stop = true;
|
||||
for (const auto &first_block : it->second.first_blocks) {
|
||||
if ((double)first_block.second.ts >= open_since) {
|
||||
stop = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (stop) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ArchiveManager::run_gc(UnixTime ts, UnixTime archive_ttl) {
|
||||
auto p = get_temp_package_id_by_unixtime(ts);
|
||||
void ArchiveManager::run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) {
|
||||
auto p = get_temp_package_id_by_unixtime(std::max(gc_ts, mc_ts - TEMP_PACKAGES_TTL));
|
||||
std::vector<PackageId> vec;
|
||||
for (auto &x : temp_files_) {
|
||||
if (x.first < p) {
|
||||
|
@ -907,7 +934,7 @@ void ArchiveManager::run_gc(UnixTime ts, UnixTime archive_ttl) {
|
|||
if (it == desc.first_blocks.end()) {
|
||||
continue;
|
||||
}
|
||||
if (it->second.ts < ts - archive_ttl) {
|
||||
if (it->second.ts < gc_ts - archive_ttl) {
|
||||
vec.push_back(f.first);
|
||||
}
|
||||
}
|
||||
|
@ -1200,6 +1227,7 @@ void ArchiveManager::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle han
|
|||
}
|
||||
}
|
||||
}
|
||||
update_permanent_slices();
|
||||
}
|
||||
|
||||
void ArchiveManager::FileMap::shard_index_add(const FileDescription &desc) {
|
||||
|
@ -1298,6 +1326,23 @@ const ArchiveManager::FileDescription *ArchiveManager::FileMap::get_next_file_de
|
|||
return it2->second->deleted ? nullptr : it2->second;
|
||||
}
|
||||
|
||||
void ArchiveManager::update_permanent_slices() {
|
||||
if (archive_lru_.empty()) {
|
||||
return;
|
||||
}
|
||||
std::vector<PackageId> ids;
|
||||
if (!files_.empty()) {
|
||||
ids.push_back(files_.rbegin()->first);
|
||||
}
|
||||
if (!key_files_.empty()) {
|
||||
ids.push_back(key_files_.rbegin()->first);
|
||||
}
|
||||
if (!temp_files_.empty()) {
|
||||
ids.push_back(temp_files_.rbegin()->first);
|
||||
}
|
||||
td::actor::send_closure(archive_lru_, &ArchiveLru::set_permanent_slices, std::move(ids));
|
||||
}
|
||||
|
||||
} // namespace validator
|
||||
|
||||
} // namespace ton
|
||||
|
|
|
@ -28,7 +28,7 @@ class RootDb;
|
|||
|
||||
class ArchiveManager : public td::actor::Actor {
|
||||
public:
|
||||
ArchiveManager(td::actor::ActorId<RootDb> root, std::string db_root);
|
||||
ArchiveManager(td::actor::ActorId<RootDb> root, std::string db_root, td::Ref<ValidatorManagerOptions> opts);
|
||||
|
||||
void add_handle(BlockHandle handle, td::Promise<td::Unit> promise);
|
||||
void update_handle(BlockHandle handle, td::Promise<td::Unit> promise);
|
||||
|
@ -58,7 +58,7 @@ class ArchiveManager : public td::actor::Actor {
|
|||
void truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise<td::Unit> promise);
|
||||
//void truncate_continue(BlockSeqno masterchain_seqno, td::Promise<td::Unit> promise);
|
||||
|
||||
void run_gc(UnixTime ts, UnixTime archive_ttl);
|
||||
void run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl);
|
||||
|
||||
/* from LTDB */
|
||||
void get_block_by_unix_time(AccountIdPrefixFull account_id, UnixTime ts, td::Promise<ConstBlockHandle> promise);
|
||||
|
@ -123,6 +123,9 @@ class ArchiveManager : public td::actor::Actor {
|
|||
size_t size() const {
|
||||
return files_.size();
|
||||
}
|
||||
bool empty() const {
|
||||
return files_.empty();
|
||||
}
|
||||
std::map<PackageId, FileDescription>::const_iterator lower_bound(const PackageId &x) const {
|
||||
return files_.lower_bound(x);
|
||||
}
|
||||
|
@ -164,6 +167,7 @@ class ArchiveManager : public td::actor::Actor {
|
|||
void shard_index_del(const FileDescription &desc);
|
||||
};
|
||||
FileMap files_, key_files_, temp_files_;
|
||||
td::actor::ActorOwn<ArchiveLru> archive_lru_;
|
||||
BlockSeqno finalized_up_to_{0};
|
||||
bool async_mode_ = false;
|
||||
bool huge_transaction_started_ = false;
|
||||
|
@ -206,6 +210,7 @@ class ArchiveManager : public td::actor::Actor {
|
|||
void got_gc_masterchain_handle(ConstBlockHandle handle, FileHash hash);
|
||||
|
||||
std::string db_root_;
|
||||
td::Ref<ValidatorManagerOptions> opts_;
|
||||
|
||||
std::shared_ptr<td::KeyValue> index_;
|
||||
|
||||
|
@ -215,6 +220,10 @@ class ArchiveManager : public td::actor::Actor {
|
|||
PackageId get_temp_package_id() const;
|
||||
PackageId get_key_package_id(BlockSeqno seqno) const;
|
||||
PackageId get_temp_package_id_by_unixtime(UnixTime ts) const;
|
||||
|
||||
void update_permanent_slices();
|
||||
|
||||
static const td::uint32 TEMP_PACKAGES_TTL = 86400 * 7;
|
||||
};
|
||||
|
||||
} // namespace validator
|
||||
|
|
|
@ -21,7 +21,6 @@
|
|||
#include "td/actor/MultiPromise.h"
|
||||
#include "validator/fabric.h"
|
||||
#include "td/db/RocksDb.h"
|
||||
#include "ton/ton-io.hpp"
|
||||
#include "td/utils/port/path.h"
|
||||
#include "common/delay.h"
|
||||
#include "files-async.hpp"
|
||||
|
@ -32,9 +31,16 @@ namespace validator {
|
|||
|
||||
void PackageWriter::append(std::string filename, td::BufferSlice data,
|
||||
td::Promise<std::pair<td::uint64, td::uint64>> promise) {
|
||||
auto offset = package_->append(std::move(filename), std::move(data), !async_mode_);
|
||||
auto size = package_->size();
|
||||
|
||||
td::uint64 offset, size;
|
||||
{
|
||||
auto p = package_.lock();
|
||||
if (!p) {
|
||||
promise.set_error(td::Status::Error("Package is closed"));
|
||||
return;
|
||||
}
|
||||
offset = p->append(std::move(filename), std::move(data), !async_mode_);
|
||||
size = p->size();
|
||||
}
|
||||
promise.set_value(std::pair<td::uint64, td::uint64>{offset, size});
|
||||
}
|
||||
|
||||
|
@ -44,8 +50,10 @@ class PackageReader : public td::actor::Actor {
|
|||
td::Promise<std::pair<std::string, td::BufferSlice>> promise)
|
||||
: package_(std::move(package)), offset_(offset), promise_(std::move(promise)) {
|
||||
}
|
||||
void start_up() {
|
||||
promise_.set_result(package_->read(offset_));
|
||||
void start_up() override {
|
||||
auto result = package_->read(offset_);
|
||||
package_ = {};
|
||||
promise_.set_result(std::move(result));
|
||||
stop();
|
||||
}
|
||||
|
||||
|
@ -64,6 +72,7 @@ void ArchiveSlice::add_handle(BlockHandle handle, td::Promise<td::Unit> promise)
|
|||
update_handle(std::move(handle), std::move(promise));
|
||||
return;
|
||||
}
|
||||
before_query();
|
||||
CHECK(!key_blocks_only_);
|
||||
CHECK(!temp_);
|
||||
CHECK(handle->inited_unix_time());
|
||||
|
@ -146,6 +155,7 @@ void ArchiveSlice::update_handle(BlockHandle handle, td::Promise<td::Unit> promi
|
|||
promise.set_value(td::Unit());
|
||||
return;
|
||||
}
|
||||
before_query();
|
||||
CHECK(!key_blocks_only_);
|
||||
|
||||
begin_transaction();
|
||||
|
@ -168,6 +178,7 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer
|
|||
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
|
||||
return;
|
||||
}
|
||||
before_query();
|
||||
TRY_RESULT_PROMISE(
|
||||
promise, p,
|
||||
choose_package(
|
||||
|
@ -179,6 +190,7 @@ void ArchiveSlice::add_file(BlockHandle handle, FileReference ref_id, td::Buffer
|
|||
promise.set_value(td::Unit());
|
||||
return;
|
||||
}
|
||||
promise = begin_async_query(std::move(promise));
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), idx = p->idx, ref_id, promise = std::move(promise)](
|
||||
td::Result<std::pair<td::uint64, td::uint64>> R) mutable {
|
||||
if (R.is_error()) {
|
||||
|
@ -217,6 +229,7 @@ void ArchiveSlice::get_handle(BlockIdExt block_id, td::Promise<BlockHandle> prom
|
|||
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
|
||||
return;
|
||||
}
|
||||
before_query();
|
||||
CHECK(!key_blocks_only_);
|
||||
std::string value;
|
||||
auto R = kv_->get(get_db_key_block_info(block_id), value);
|
||||
|
@ -239,6 +252,7 @@ void ArchiveSlice::get_temp_handle(BlockIdExt block_id, td::Promise<ConstBlockHa
|
|||
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
|
||||
return;
|
||||
}
|
||||
before_query();
|
||||
CHECK(!key_blocks_only_);
|
||||
std::string value;
|
||||
auto R = kv_->get(get_db_key_block_info(block_id), value);
|
||||
|
@ -261,6 +275,7 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P
|
|||
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
|
||||
return;
|
||||
}
|
||||
before_query();
|
||||
std::string value;
|
||||
auto R = kv_->get(ref_id.hash().to_hex(), value);
|
||||
R.ensure();
|
||||
|
@ -273,6 +288,7 @@ void ArchiveSlice::get_file(ConstBlockHandle handle, FileReference ref_id, td::P
|
|||
promise, p,
|
||||
choose_package(
|
||||
handle ? handle->id().is_masterchain() ? handle->id().seqno() : handle->masterchain_ref_block() : 0, false));
|
||||
promise = begin_async_query(std::move(promise));
|
||||
auto P = td::PromiseCreator::lambda(
|
||||
[promise = std::move(promise)](td::Result<std::pair<std::string, td::BufferSlice>> R) mutable {
|
||||
if (R.is_error()) {
|
||||
|
@ -292,6 +308,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id,
|
|||
promise.set_error(td::Status::Error(ErrorCode::notready, "package already gc'd"));
|
||||
return;
|
||||
}
|
||||
before_query();
|
||||
bool f = false;
|
||||
BlockIdExt block_id;
|
||||
td::uint32 ls = 0;
|
||||
|
@ -312,7 +329,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id,
|
|||
auto G = fetch_tl_object<ton_api::db_lt_desc_value>(value, true);
|
||||
G.ensure();
|
||||
auto g = G.move_as_ok();
|
||||
if (compare_desc(*g.get()) > 0) {
|
||||
if (compare_desc(*g) > 0) {
|
||||
continue;
|
||||
}
|
||||
td::uint32 l = g->first_idx_ - 1;
|
||||
|
@ -328,7 +345,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id,
|
|||
auto E = fetch_tl_object<ton_api::db_lt_el_value>(td::BufferSlice{value}, true);
|
||||
E.ensure();
|
||||
auto e = E.move_as_ok();
|
||||
int cmp_val = compare(*e.get());
|
||||
int cmp_val = compare(*e);
|
||||
|
||||
if (cmp_val < 0) {
|
||||
rseq = create_block_id(e->id_);
|
||||
|
@ -342,9 +359,7 @@ void ArchiveSlice::get_block_common(AccountIdPrefixFull account_id,
|
|||
}
|
||||
}
|
||||
if (rseq.is_valid()) {
|
||||
if (!block_id.is_valid()) {
|
||||
block_id = rseq;
|
||||
} else if (block_id.id.seqno > rseq.id.seqno) {
|
||||
if (!block_id.is_valid() || block_id.id.seqno > rseq.id.seqno) {
|
||||
block_id = rseq;
|
||||
}
|
||||
}
|
||||
|
@ -430,12 +445,15 @@ void ArchiveSlice::get_slice(td::uint64 archive_id, td::uint64 offset, td::uint3
|
|||
promise.set_error(td::Status::Error(ErrorCode::error, "bad archive id"));
|
||||
return;
|
||||
}
|
||||
before_query();
|
||||
auto value = static_cast<td::uint32>(archive_id >> 32);
|
||||
TRY_RESULT_PROMISE(promise, p, choose_package(value, false));
|
||||
promise = begin_async_query(std::move(promise));
|
||||
td::actor::create_actor<db::ReadFile>("readfile", p->path, offset, limit, 0, std::move(promise)).release();
|
||||
}
|
||||
|
||||
void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::uint64> promise) {
|
||||
before_query();
|
||||
if (!sliced_mode_) {
|
||||
promise.set_result(archive_id_);
|
||||
} else {
|
||||
|
@ -444,62 +462,111 @@ void ArchiveSlice::get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::
|
|||
}
|
||||
}
|
||||
|
||||
void ArchiveSlice::start_up() {
|
||||
PackageId p_id{archive_id_, key_blocks_only_, temp_};
|
||||
std::string db_path = PSTRING() << db_root_ << p_id.path() << p_id.name() << ".index";
|
||||
kv_ = std::make_shared<td::RocksDb>(td::RocksDb::open(db_path).move_as_ok());
|
||||
void ArchiveSlice::before_query() {
|
||||
if (status_ == st_closed) {
|
||||
LOG(DEBUG) << "Opening archive slice " << db_path_;
|
||||
kv_ = std::make_unique<td::RocksDb>(td::RocksDb::open(db_path_).move_as_ok());
|
||||
std::string value;
|
||||
auto R2 = kv_->get("status", value);
|
||||
R2.ensure();
|
||||
sliced_mode_ = false;
|
||||
slice_size_ = 100;
|
||||
|
||||
std::string value;
|
||||
auto R2 = kv_->get("status", value);
|
||||
R2.ensure();
|
||||
|
||||
if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) {
|
||||
if (value == "sliced") {
|
||||
sliced_mode_ = true;
|
||||
R2 = kv_->get("slices", value);
|
||||
R2.ensure();
|
||||
auto tot = td::to_integer<td::uint32>(value);
|
||||
R2 = kv_->get("slice_size", value);
|
||||
R2.ensure();
|
||||
slice_size_ = td::to_integer<td::uint32>(value);
|
||||
CHECK(slice_size_ > 0);
|
||||
for (td::uint32 i = 0; i < tot; i++) {
|
||||
R2 = kv_->get(PSTRING() << "status." << i, value);
|
||||
if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) {
|
||||
if (value == "sliced") {
|
||||
sliced_mode_ = true;
|
||||
R2 = kv_->get("slices", value);
|
||||
R2.ensure();
|
||||
auto len = td::to_integer<td::uint64>(value);
|
||||
R2 = kv_->get(PSTRING() << "version." << i, value);
|
||||
auto tot = td::to_integer<td::uint32>(value);
|
||||
R2 = kv_->get("slice_size", value);
|
||||
R2.ensure();
|
||||
td::uint32 ver = 0;
|
||||
if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) {
|
||||
ver = td::to_integer<td::uint32>(value);
|
||||
slice_size_ = td::to_integer<td::uint32>(value);
|
||||
CHECK(slice_size_ > 0);
|
||||
for (td::uint32 i = 0; i < tot; i++) {
|
||||
R2 = kv_->get(PSTRING() << "status." << i, value);
|
||||
R2.ensure();
|
||||
auto len = td::to_integer<td::uint64>(value);
|
||||
R2 = kv_->get(PSTRING() << "version." << i, value);
|
||||
R2.ensure();
|
||||
td::uint32 ver = 0;
|
||||
if (R2.move_as_ok() == td::KeyValue::GetStatus::Ok) {
|
||||
ver = td::to_integer<td::uint32>(value);
|
||||
}
|
||||
auto v = archive_id_ + slice_size_ * i;
|
||||
add_package(v, len, ver);
|
||||
}
|
||||
auto v = archive_id_ + slice_size_ * i;
|
||||
add_package(v, len, ver);
|
||||
} else {
|
||||
auto len = td::to_integer<td::uint64>(value);
|
||||
add_package(archive_id_, len, 0);
|
||||
}
|
||||
} else {
|
||||
auto len = td::to_integer<td::uint64>(value);
|
||||
add_package(archive_id_, len, 0);
|
||||
if (!temp_ && !key_blocks_only_) {
|
||||
sliced_mode_ = true;
|
||||
kv_->begin_transaction().ensure();
|
||||
kv_->set("status", "sliced").ensure();
|
||||
kv_->set("slices", "1").ensure();
|
||||
kv_->set("slice_size", td::to_string(slice_size_)).ensure();
|
||||
kv_->set("status.0", "0").ensure();
|
||||
kv_->set("version.0", td::to_string(default_package_version())).ensure();
|
||||
kv_->commit_transaction().ensure();
|
||||
add_package(archive_id_, 0, default_package_version());
|
||||
} else {
|
||||
kv_->begin_transaction().ensure();
|
||||
kv_->set("status", "0").ensure();
|
||||
kv_->commit_transaction().ensure();
|
||||
add_package(archive_id_, 0, 0);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (!temp_ && !key_blocks_only_) {
|
||||
sliced_mode_ = true;
|
||||
kv_->begin_transaction().ensure();
|
||||
kv_->set("status", "sliced").ensure();
|
||||
kv_->set("slices", "1").ensure();
|
||||
kv_->set("slice_size", td::to_string(slice_size_)).ensure();
|
||||
kv_->set("status.0", "0").ensure();
|
||||
kv_->set("version.0", td::to_string(default_package_version())).ensure();
|
||||
kv_->commit_transaction().ensure();
|
||||
add_package(archive_id_, 0, default_package_version());
|
||||
}
|
||||
status_ = st_open;
|
||||
if (!archive_lru_.empty()) {
|
||||
td::actor::send_closure(archive_lru_, &ArchiveLru::on_query, actor_id(this), p_id_,
|
||||
packages_.size() + ESTIMATED_DB_OPEN_FILES);
|
||||
}
|
||||
}
|
||||
|
||||
void ArchiveSlice::open_files() {
|
||||
before_query();
|
||||
}
|
||||
|
||||
void ArchiveSlice::close_files() {
|
||||
if (status_ == st_open) {
|
||||
if (active_queries_ == 0) {
|
||||
do_close();
|
||||
} else {
|
||||
kv_->begin_transaction().ensure();
|
||||
kv_->set("status", "0").ensure();
|
||||
kv_->commit_transaction().ensure();
|
||||
add_package(archive_id_, 0, 0);
|
||||
status_ = st_want_close;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ArchiveSlice::do_close() {
|
||||
if (destroyed_) {
|
||||
return;
|
||||
}
|
||||
CHECK(status_ != st_closed && active_queries_ == 0);
|
||||
LOG(DEBUG) << "Closing archive slice " << db_path_;
|
||||
status_ = st_closed;
|
||||
kv_ = {};
|
||||
packages_.clear();
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
td::Promise<T> ArchiveSlice::begin_async_query(td::Promise<T> promise) {
|
||||
++active_queries_;
|
||||
return [SelfId = actor_id(this), promise = std::move(promise)](td::Result<T> R) mutable {
|
||||
td::actor::send_closure(SelfId, &ArchiveSlice::end_async_query);
|
||||
promise.set_result(std::move(R));
|
||||
};
|
||||
}
|
||||
|
||||
void ArchiveSlice::end_async_query() {
|
||||
CHECK(active_queries_ > 0);
|
||||
--active_queries_;
|
||||
if (active_queries_ == 0 && status_ == st_want_close) {
|
||||
do_close();
|
||||
}
|
||||
}
|
||||
|
||||
void ArchiveSlice::begin_transaction() {
|
||||
if (!async_mode_ || !huge_transaction_started_) {
|
||||
kv_->begin_transaction().ensure();
|
||||
|
@ -521,7 +588,7 @@ void ArchiveSlice::commit_transaction() {
|
|||
|
||||
void ArchiveSlice::set_async_mode(bool mode, td::Promise<td::Unit> promise) {
|
||||
async_mode_ = mode;
|
||||
if (!async_mode_ && huge_transaction_started_) {
|
||||
if (!async_mode_ && huge_transaction_started_ && kv_) {
|
||||
kv_->commit_transaction().ensure();
|
||||
huge_transaction_size_ = 0;
|
||||
huge_transaction_started_ = false;
|
||||
|
@ -532,16 +599,20 @@ void ArchiveSlice::set_async_mode(bool mode, td::Promise<td::Unit> promise) {
|
|||
ig.add_promise(std::move(promise));
|
||||
|
||||
for (auto &p : packages_) {
|
||||
td::actor::send_closure(p.writer, &PackageWriter::set_async_mode, mode, std::move(promise));
|
||||
td::actor::send_closure(p.writer, &PackageWriter::set_async_mode, mode, ig.get_promise());
|
||||
}
|
||||
}
|
||||
|
||||
ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root)
|
||||
ArchiveSlice::ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root,
|
||||
td::actor::ActorId<ArchiveLru> archive_lru)
|
||||
: archive_id_(archive_id)
|
||||
, key_blocks_only_(key_blocks_only)
|
||||
, temp_(temp)
|
||||
, finalized_(finalized)
|
||||
, db_root_(std::move(db_root)) {
|
||||
, p_id_(archive_id_, key_blocks_only_, temp_)
|
||||
, db_root_(std::move(db_root))
|
||||
, archive_lru_(std::move(archive_lru)) {
|
||||
db_path_ = PSTRING() << db_root_ << p_id_.path() << p_id_.name() << ".index";
|
||||
}
|
||||
|
||||
td::Result<ArchiveSlice::PackageInfo *> ArchiveSlice::choose_package(BlockSeqno masterchain_seqno, bool force) {
|
||||
|
@ -587,7 +658,7 @@ void ArchiveSlice::add_package(td::uint32 seqno, td::uint64 size, td::uint32 ver
|
|||
if (version >= 1) {
|
||||
pack->truncate(size).ensure();
|
||||
}
|
||||
auto writer = td::actor::create_actor<PackageWriter>("writer", pack);
|
||||
auto writer = td::actor::create_actor<PackageWriter>("writer", pack, async_mode_);
|
||||
packages_.emplace_back(std::move(pack), std::move(writer), seqno, path, idx, version);
|
||||
}
|
||||
|
||||
|
@ -611,6 +682,7 @@ void destroy_db(std::string name, td::uint32 attempt, td::Promise<td::Unit> prom
|
|||
} // namespace
|
||||
|
||||
void ArchiveSlice::destroy(td::Promise<td::Unit> promise) {
|
||||
before_query();
|
||||
td::MultiPromise mp;
|
||||
auto ig = mp.init_guard();
|
||||
ig.add_promise(std::move(promise));
|
||||
|
@ -763,6 +835,7 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl
|
|||
destroy(std::move(promise));
|
||||
return;
|
||||
}
|
||||
before_query();
|
||||
LOG(INFO) << "TRUNCATE: slice " << archive_id_ << " maxseqno= " << max_masterchain_seqno()
|
||||
<< " truncate_upto=" << masterchain_seqno;
|
||||
if (max_masterchain_seqno() <= masterchain_seqno) {
|
||||
|
@ -819,7 +892,7 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl
|
|||
pack->writer.reset();
|
||||
td::unlink(pack->path).ensure();
|
||||
td::rename(pack->path + ".new", pack->path).ensure();
|
||||
pack->writer = td::actor::create_actor<PackageWriter>("writer", new_package);
|
||||
pack->writer = td::actor::create_actor<PackageWriter>("writer", new_package, async_mode_);
|
||||
|
||||
for (auto idx = pack->idx + 1; idx < packages_.size(); idx++) {
|
||||
td::unlink(packages_[idx].path).ensure();
|
||||
|
@ -831,6 +904,61 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handl
|
|||
promise.set_value(td::Unit());
|
||||
}
|
||||
|
||||
static std::tuple<td::uint32, bool, bool> to_tuple(const PackageId &id) {
|
||||
return {id.id, id.temp, id.key};
|
||||
}
|
||||
|
||||
void ArchiveLru::on_query(td::actor::ActorId<ArchiveSlice> slice, PackageId id, size_t files_count) {
|
||||
SliceInfo &info = slices_[to_tuple(id)];
|
||||
if (info.opened_idx != 0) {
|
||||
total_files_ -= info.files_count;
|
||||
lru_.erase(info.opened_idx);
|
||||
}
|
||||
info.actor = std::move(slice);
|
||||
total_files_ += (info.files_count = files_count);
|
||||
info.opened_idx = current_idx_++;
|
||||
if (!info.is_permanent) {
|
||||
lru_.emplace(info.opened_idx, id);
|
||||
}
|
||||
enforce_limit();
|
||||
}
|
||||
|
||||
void ArchiveLru::set_permanent_slices(std::vector<PackageId> ids) {
|
||||
for (auto id : permanent_slices_) {
|
||||
SliceInfo &info = slices_[to_tuple(id)];
|
||||
if (!info.is_permanent) {
|
||||
continue;
|
||||
}
|
||||
info.is_permanent = false;
|
||||
if (info.opened_idx) {
|
||||
lru_.emplace(info.opened_idx, id);
|
||||
}
|
||||
}
|
||||
permanent_slices_ = std::move(ids);
|
||||
for (auto id : permanent_slices_) {
|
||||
SliceInfo &info = slices_[to_tuple(id)];
|
||||
if (info.is_permanent) {
|
||||
continue;
|
||||
}
|
||||
info.is_permanent = true;
|
||||
if (info.opened_idx) {
|
||||
lru_.erase(info.opened_idx);
|
||||
}
|
||||
}
|
||||
enforce_limit();
|
||||
}
|
||||
|
||||
void ArchiveLru::enforce_limit() {
|
||||
while (total_files_ > max_total_files_ && lru_.size() > 1) {
|
||||
auto it = lru_.begin();
|
||||
auto it2 = slices_.find(to_tuple(it->second));
|
||||
lru_.erase(it);
|
||||
total_files_ -= it2->second.files_count;
|
||||
td::actor::send_closure(it2->second.actor, &ArchiveSlice::close_files);
|
||||
it2->second.opened_idx = 0;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace validator
|
||||
|
||||
} // namespace ton
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "validator/interfaces/db.h"
|
||||
#include "package.hpp"
|
||||
#include "fileref.hpp"
|
||||
#include <map>
|
||||
|
||||
namespace ton {
|
||||
|
||||
|
@ -44,7 +45,7 @@ struct PackageId {
|
|||
std::string path() const;
|
||||
std::string name() const;
|
||||
|
||||
bool is_empty() {
|
||||
bool is_empty() const {
|
||||
return id == std::numeric_limits<td::uint32>::max();
|
||||
}
|
||||
static PackageId empty(bool key, bool temp) {
|
||||
|
@ -54,26 +55,33 @@ struct PackageId {
|
|||
|
||||
class PackageWriter : public td::actor::Actor {
|
||||
public:
|
||||
PackageWriter(std::shared_ptr<Package> package) : package_(std::move(package)) {
|
||||
PackageWriter(std::weak_ptr<Package> package, bool async_mode = false)
|
||||
: package_(std::move(package)), async_mode_(async_mode) {
|
||||
}
|
||||
|
||||
void append(std::string filename, td::BufferSlice data, td::Promise<std::pair<td::uint64, td::uint64>> promise);
|
||||
void set_async_mode(bool mode, td::Promise<td::Unit> promise) {
|
||||
async_mode_ = mode;
|
||||
if (!async_mode_) {
|
||||
package_->sync();
|
||||
auto p = package_.lock();
|
||||
if (p) {
|
||||
p->sync();
|
||||
}
|
||||
}
|
||||
promise.set_value(td::Unit());
|
||||
}
|
||||
|
||||
private:
|
||||
std::shared_ptr<Package> package_;
|
||||
std::weak_ptr<Package> package_;
|
||||
bool async_mode_ = false;
|
||||
};
|
||||
|
||||
class ArchiveLru;
|
||||
|
||||
class ArchiveSlice : public td::actor::Actor {
|
||||
public:
|
||||
ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root);
|
||||
ArchiveSlice(td::uint32 archive_id, bool key_blocks_only, bool temp, bool finalized, std::string db_root,
|
||||
td::actor::ActorId<ArchiveLru> archive_lru);
|
||||
|
||||
void get_archive_id(BlockSeqno masterchain_seqno, td::Promise<td::uint64> promise);
|
||||
|
||||
|
@ -95,16 +103,24 @@ class ArchiveSlice : public td::actor::Actor {
|
|||
|
||||
void get_slice(td::uint64 archive_id, td::uint64 offset, td::uint32 limit, td::Promise<td::BufferSlice> promise);
|
||||
|
||||
void start_up() override;
|
||||
void destroy(td::Promise<td::Unit> promise);
|
||||
void truncate(BlockSeqno masterchain_seqno, ConstBlockHandle handle, td::Promise<td::Unit> promise);
|
||||
|
||||
void begin_transaction();
|
||||
void commit_transaction();
|
||||
void set_async_mode(bool mode, td::Promise<td::Unit> promise);
|
||||
|
||||
void open_files();
|
||||
void close_files();
|
||||
|
||||
private:
|
||||
void written_data(BlockHandle handle, td::Promise<td::Unit> promise);
|
||||
void before_query();
|
||||
void do_close();
|
||||
template<typename T>
|
||||
td::Promise<T> begin_async_query(td::Promise<T> promise);
|
||||
void end_async_query();
|
||||
|
||||
void begin_transaction();
|
||||
void commit_transaction();
|
||||
|
||||
void add_file_cont(size_t idx, FileReference ref_id, td::uint64 offset, td::uint64 size,
|
||||
td::Promise<td::Unit> promise);
|
||||
|
||||
|
@ -112,13 +128,14 @@ class ArchiveSlice : public td::actor::Actor {
|
|||
td::BufferSlice get_db_key_lt_desc(ShardIdFull shard);
|
||||
td::BufferSlice get_db_key_lt_el(ShardIdFull shard, td::uint32 idx);
|
||||
td::BufferSlice get_db_key_block_info(BlockIdExt block_id);
|
||||
td::BufferSlice get_lt_from_db(ShardIdFull shard, td::uint32 idx);
|
||||
|
||||
td::uint32 archive_id_;
|
||||
|
||||
bool key_blocks_only_;
|
||||
bool temp_;
|
||||
bool finalized_;
|
||||
PackageId p_id_;
|
||||
std::string db_path_;
|
||||
|
||||
bool destroyed_ = false;
|
||||
bool async_mode_ = false;
|
||||
|
@ -127,8 +144,14 @@ class ArchiveSlice : public td::actor::Actor {
|
|||
td::uint32 huge_transaction_size_ = 0;
|
||||
td::uint32 slice_size_{100};
|
||||
|
||||
enum Status {
|
||||
st_closed, st_open, st_want_close
|
||||
} status_ = st_closed;
|
||||
size_t active_queries_ = 0;
|
||||
|
||||
std::string db_root_;
|
||||
std::shared_ptr<td::KeyValue> kv_;
|
||||
td::actor::ActorId<ArchiveLru> archive_lru_;
|
||||
std::unique_ptr<td::KeyValue> kv_;
|
||||
|
||||
struct PackageInfo {
|
||||
PackageInfo(std::shared_ptr<Package> package, td::actor::ActorOwn<PackageWriter> writer, BlockSeqno id,
|
||||
|
@ -164,6 +187,32 @@ class ArchiveSlice : public td::actor::Actor {
|
|||
static constexpr td::uint32 default_package_version() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
static const size_t ESTIMATED_DB_OPEN_FILES = 5;
|
||||
};
|
||||
|
||||
class ArchiveLru : public td::actor::Actor {
|
||||
public:
|
||||
explicit ArchiveLru(size_t max_total_files) : max_total_files_(max_total_files) {
|
||||
CHECK(max_total_files_ > 0);
|
||||
}
|
||||
void on_query(td::actor::ActorId<ArchiveSlice> slice, PackageId id, size_t files_count);
|
||||
void set_permanent_slices(std::vector<PackageId> ids);
|
||||
private:
|
||||
size_t current_idx_ = 1;
|
||||
struct SliceInfo {
|
||||
td::actor::ActorId<ArchiveSlice> actor;
|
||||
size_t files_count = 0;
|
||||
size_t opened_idx = 0; // 0 - not opened
|
||||
bool is_permanent = false;
|
||||
};
|
||||
std::map<std::tuple<td::uint32, bool, bool>, SliceInfo> slices_;
|
||||
std::map<size_t, PackageId> lru_;
|
||||
size_t total_files_ = 0;
|
||||
size_t max_total_files_ = 0;
|
||||
std::vector<PackageId> permanent_slices_;
|
||||
|
||||
void enforce_limit();
|
||||
};
|
||||
|
||||
} // namespace validator
|
||||
|
|
|
@ -400,7 +400,7 @@ void RootDb::start_up() {
|
|||
cell_db_ = td::actor::create_actor<CellDb>("celldb", actor_id(this), root_path_ + "/celldb/", opts_);
|
||||
state_db_ = td::actor::create_actor<StateDb>("statedb", actor_id(this), root_path_ + "/state/");
|
||||
static_files_db_ = td::actor::create_actor<StaticFilesDb>("staticfilesdb", actor_id(this), root_path_ + "/static/");
|
||||
archive_db_ = td::actor::create_actor<ArchiveManager>("archive", actor_id(this), root_path_);
|
||||
archive_db_ = td::actor::create_actor<ArchiveManager>("archive", actor_id(this), root_path_, opts_);
|
||||
}
|
||||
|
||||
void RootDb::archive(BlockHandle handle, td::Promise<td::Unit> promise) {
|
||||
|
@ -497,6 +497,10 @@ void RootDb::set_async_mode(bool mode, td::Promise<td::Unit> promise) {
|
|||
td::actor::send_closure(archive_db_, &ArchiveManager::set_async_mode, mode, std::move(promise));
|
||||
}
|
||||
|
||||
void RootDb::run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) {
|
||||
td::actor::send_closure(archive_db_, &ArchiveManager::run_gc, mc_ts, gc_ts, archive_ttl);
|
||||
}
|
||||
|
||||
void RootDb::add_persistent_state_description(td::Ref<PersistentStateDescription> desc, td::Promise<td::Unit> promise) {
|
||||
td::actor::send_closure(state_db_, &StateDb::add_persistent_state_description, std::move(desc), std::move(promise));
|
||||
}
|
||||
|
@ -505,10 +509,6 @@ void RootDb::get_persistent_state_descriptions(td::Promise<std::vector<td::Ref<P
|
|||
td::actor::send_closure(state_db_, &StateDb::get_persistent_state_descriptions, std::move(promise));
|
||||
}
|
||||
|
||||
void RootDb::run_gc(UnixTime ts, UnixTime archive_ttl) {
|
||||
td::actor::send_closure(archive_db_, &ArchiveManager::run_gc, ts, archive_ttl);
|
||||
}
|
||||
|
||||
} // namespace validator
|
||||
|
||||
} // namespace ton
|
||||
|
|
|
@ -134,14 +134,13 @@ class RootDb : public Db {
|
|||
td::Promise<td::BufferSlice> promise) override;
|
||||
void set_async_mode(bool mode, td::Promise<td::Unit> promise) override;
|
||||
|
||||
void run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) override;
|
||||
void add_persistent_state_description(td::Ref<PersistentStateDescription> desc, td::Promise<td::Unit> promise) override;
|
||||
void get_persistent_state_descriptions(td::Promise<std::vector<td::Ref<PersistentStateDescription>>> promise) override;
|
||||
|
||||
void run_gc(UnixTime ts, UnixTime archive_ttl) override;
|
||||
|
||||
private:
|
||||
td::actor::ActorId<ValidatorManager> validator_manager_;
|
||||
|
||||
std::string root_path_;
|
||||
td::Ref<ValidatorManagerOptions> opts_;
|
||||
|
||||
|
|
|
@ -123,12 +123,14 @@ void WaitBlockState::start() {
|
|||
} else if (!handle_->inited_prev() || (!handle_->inited_proof() && !handle_->inited_proof_link())) {
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle = handle_](td::Result<td::BufferSlice> R) {
|
||||
if (R.is_error()) {
|
||||
delay_action([SelfId]() { td::actor::send_closure(SelfId, &WaitBlockState::start); }, td::Timestamp::in(0.1));
|
||||
delay_action([SelfId]() { td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link); },
|
||||
td::Timestamp::in(0.1));
|
||||
} else {
|
||||
td::actor::send_closure(SelfId, &WaitBlockState::got_proof_link, R.move_as_ok());
|
||||
}
|
||||
});
|
||||
|
||||
waiting_proof_link_ = true;
|
||||
td::actor::send_closure(manager_, &ValidatorManager::send_get_block_proof_link_request, handle_->id(), priority_,
|
||||
std::move(P));
|
||||
} else if (prev_state_.is_null()) {
|
||||
|
@ -147,12 +149,14 @@ void WaitBlockState::start() {
|
|||
} else if (handle_->id().is_masterchain() && !handle_->inited_proof()) {
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), handle = handle_](td::Result<td::BufferSlice> R) {
|
||||
if (R.is_error()) {
|
||||
delay_action([SelfId]() { td::actor::send_closure(SelfId, &WaitBlockState::start); }, td::Timestamp::in(0.1));
|
||||
delay_action([SelfId]() { td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof); },
|
||||
td::Timestamp::in(0.1));
|
||||
} else {
|
||||
td::actor::send_closure(SelfId, &WaitBlockState::got_proof, R.move_as_ok());
|
||||
}
|
||||
});
|
||||
|
||||
waiting_proof_ = true;
|
||||
td::actor::send_closure(manager_, &ValidatorManager::send_get_block_proof_request, handle_->id(), priority_,
|
||||
std::move(P));
|
||||
} else if (block_.is_null()) {
|
||||
|
@ -186,6 +190,9 @@ void WaitBlockState::got_prev_state(td::Ref<ShardState> state) {
|
|||
}
|
||||
|
||||
void WaitBlockState::got_proof_link(td::BufferSlice data) {
|
||||
if (!waiting_proof_link_) {
|
||||
return;
|
||||
}
|
||||
auto R = create_proof_link(handle_->id(), std::move(data));
|
||||
if (R.is_error()) {
|
||||
LOG(INFO) << "received bad proof link: " << R.move_as_error();
|
||||
|
@ -196,22 +203,25 @@ void WaitBlockState::got_proof_link(td::BufferSlice data) {
|
|||
if (R.is_ok()) {
|
||||
auto h = R.move_as_ok();
|
||||
CHECK(h->inited_prev());
|
||||
td::actor::send_closure(SelfId, &WaitBlockState::start);
|
||||
td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link);
|
||||
} else {
|
||||
LOG(INFO) << "received bad proof link: " << R.move_as_error();
|
||||
td::actor::send_closure(SelfId, &WaitBlockState::start);
|
||||
td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof_link);
|
||||
}
|
||||
});
|
||||
run_check_proof_link_query(handle_->id(), R.move_as_ok(), manager_, timeout_, std::move(P));
|
||||
}
|
||||
|
||||
void WaitBlockState::got_proof(td::BufferSlice data) {
|
||||
if (!waiting_proof_) {
|
||||
return;
|
||||
}
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::Unit> R) {
|
||||
if (R.is_ok()) {
|
||||
td::actor::send_closure(SelfId, &WaitBlockState::start);
|
||||
td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof);
|
||||
} else {
|
||||
LOG(INFO) << "received bad proof link: " << R.move_as_error();
|
||||
td::actor::send_closure(SelfId, &WaitBlockState::start);
|
||||
td::actor::send_closure(SelfId, &WaitBlockState::after_get_proof);
|
||||
}
|
||||
});
|
||||
td::actor::send_closure(manager_, &ValidatorManager::validate_block_proof, handle_->id(), std::move(data),
|
||||
|
|
|
@ -47,11 +47,9 @@ class WaitBlockState : public td::actor::Actor {
|
|||
void force_read_from_db();
|
||||
|
||||
void start_up() override;
|
||||
void got_block_handle(BlockHandle handle);
|
||||
void start();
|
||||
void got_state_from_db(td::Ref<ShardState> data);
|
||||
void got_state_from_static_file(td::Ref<ShardState> state, td::BufferSlice data);
|
||||
void failed_to_get_state_from_db(td::Status reason);
|
||||
void got_prev_state(td::Ref<ShardState> state);
|
||||
void failed_to_get_prev_state(td::Status reason);
|
||||
void got_block_data(td::Ref<BlockData> data);
|
||||
|
@ -70,6 +68,22 @@ class WaitBlockState : public td::actor::Actor {
|
|||
priority_ = priority;
|
||||
}
|
||||
|
||||
// These two methods can be called from ValidatorManagerImpl::written_handle
|
||||
void after_get_proof_link() {
|
||||
if (!waiting_proof_link_) {
|
||||
return;
|
||||
}
|
||||
waiting_proof_link_ = false;
|
||||
start();
|
||||
}
|
||||
void after_get_proof() {
|
||||
if (!waiting_proof_) {
|
||||
return;
|
||||
}
|
||||
waiting_proof_ = false;
|
||||
start();
|
||||
}
|
||||
|
||||
private:
|
||||
BlockHandle handle_;
|
||||
|
||||
|
@ -84,6 +98,8 @@ class WaitBlockState : public td::actor::Actor {
|
|||
td::Ref<BlockData> block_;
|
||||
|
||||
bool reading_from_db_ = false;
|
||||
bool waiting_proof_link_ = false;
|
||||
bool waiting_proof_ = false;
|
||||
td::Timestamp next_static_file_attempt_;
|
||||
|
||||
td::PerfWarningTimer perf_timer_{"waitstate", 1.0};
|
||||
|
|
175
validator/full-node-private-overlay.cpp
Normal file
175
validator/full-node-private-overlay.cpp
Normal file
|
@ -0,0 +1,175 @@
|
|||
/*
|
||||
This file is part of TON Blockchain Library.
|
||||
|
||||
TON Blockchain Library is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation, either version 2 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
TON Blockchain Library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include "full-node-private-overlay.hpp"
|
||||
#include "ton/ton-tl.hpp"
|
||||
#include "common/delay.h"
|
||||
|
||||
namespace ton {
|
||||
|
||||
namespace validator {
|
||||
|
||||
namespace fullnode {
|
||||
|
||||
void FullNodePrivateOverlay::process_broadcast(PublicKeyHash, ton_api::tonNode_blockBroadcast &query) {
|
||||
std::vector<BlockSignature> signatures;
|
||||
for (auto &sig : query.signatures_) {
|
||||
signatures.emplace_back(BlockSignature{sig->who_, std::move(sig->signature_)});
|
||||
}
|
||||
|
||||
BlockIdExt block_id = create_block_id(query.id_);
|
||||
BlockBroadcast B{block_id,
|
||||
std::move(signatures),
|
||||
static_cast<UnixTime>(query.catchain_seqno_),
|
||||
static_cast<td::uint32>(query.validator_set_hash_),
|
||||
std::move(query.data_),
|
||||
std::move(query.proof_)};
|
||||
|
||||
auto P = td::PromiseCreator::lambda([](td::Result<td::Unit> R) {
|
||||
if (R.is_error()) {
|
||||
if (R.error().code() == ErrorCode::notready) {
|
||||
LOG(DEBUG) << "dropped broadcast: " << R.move_as_error();
|
||||
} else {
|
||||
LOG(INFO) << "dropped broadcast: " << R.move_as_error();
|
||||
}
|
||||
}
|
||||
});
|
||||
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, std::move(B),
|
||||
std::move(P));
|
||||
}
|
||||
|
||||
void FullNodePrivateOverlay::process_broadcast(PublicKeyHash, ton_api::tonNode_newShardBlockBroadcast &query) {
|
||||
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_shard_block,
|
||||
create_block_id(query.block_->block_), query.block_->cc_seqno_,
|
||||
std::move(query.block_->data_));
|
||||
}
|
||||
|
||||
void FullNodePrivateOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
|
||||
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
|
||||
if (B.is_error()) {
|
||||
return;
|
||||
}
|
||||
|
||||
ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); });
|
||||
}
|
||||
|
||||
void FullNodePrivateOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) {
|
||||
if (!inited_) {
|
||||
return;
|
||||
}
|
||||
auto B = create_serialize_tl_object<ton_api::tonNode_newShardBlockBroadcast>(
|
||||
create_tl_object<ton_api::tonNode_newShardBlock>(create_tl_block_id(block_id), cc_seqno, std::move(data)));
|
||||
if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
|
||||
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_,
|
||||
local_id_.pubkey_hash(), 0, std::move(B));
|
||||
} else {
|
||||
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
|
||||
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B));
|
||||
}
|
||||
}
|
||||
|
||||
void FullNodePrivateOverlay::send_broadcast(BlockBroadcast broadcast) {
|
||||
if (!inited_) {
|
||||
return;
|
||||
}
|
||||
std::vector<tl_object_ptr<ton_api::tonNode_blockSignature>> sigs;
|
||||
for (auto &sig : broadcast.signatures) {
|
||||
sigs.emplace_back(create_tl_object<ton_api::tonNode_blockSignature>(sig.node, sig.signature.clone()));
|
||||
}
|
||||
auto B = create_serialize_tl_object<ton_api::tonNode_blockBroadcast>(
|
||||
create_tl_block_id(broadcast.block_id), broadcast.catchain_seqno, broadcast.validator_set_hash, std::move(sigs),
|
||||
broadcast.proof.clone(), broadcast.data.clone());
|
||||
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
|
||||
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B));
|
||||
}
|
||||
|
||||
void FullNodePrivateOverlay::start_up() {
|
||||
std::sort(nodes_.begin(), nodes_.end());
|
||||
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());
|
||||
|
||||
std::vector<td::Bits256> nodes;
|
||||
for (const adnl::AdnlNodeIdShort &id : nodes_) {
|
||||
nodes.push_back(id.bits256_value());
|
||||
}
|
||||
auto X = create_hash_tl_object<ton_api::tonNode_privateBlockOverlayId>(zero_state_file_hash_, std::move(nodes));
|
||||
td::BufferSlice b{32};
|
||||
b.as_slice().copy_from(as_slice(X));
|
||||
overlay_id_full_ = overlay::OverlayIdFull{std::move(b)};
|
||||
overlay_id_ = overlay_id_full_.compute_short_id();
|
||||
|
||||
try_init();
|
||||
}
|
||||
|
||||
void FullNodePrivateOverlay::try_init() {
|
||||
// Sometimes adnl id is added to validator engine later (or not at all)
|
||||
td::actor::send_closure(
|
||||
adnl_, &adnl::Adnl::check_id_exists, local_id_, [SelfId = actor_id(this)](td::Result<bool> R) {
|
||||
if (R.is_ok() && R.ok()) {
|
||||
td::actor::send_closure(SelfId, &FullNodePrivateOverlay::init);
|
||||
} else {
|
||||
delay_action([SelfId]() { td::actor::send_closure(SelfId, &FullNodePrivateOverlay::try_init); },
|
||||
td::Timestamp::in(30.0));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void FullNodePrivateOverlay::init() {
|
||||
LOG(FULL_NODE_INFO) << "Creating private block overlay for adnl id " << local_id_ << " : " << nodes_.size()
|
||||
<< " nodes";
|
||||
class Callback : public overlay::Overlays::Callback {
|
||||
public:
|
||||
void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
|
||||
}
|
||||
void receive_query(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
|
||||
td::Promise<td::BufferSlice> promise) override {
|
||||
}
|
||||
void receive_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
|
||||
td::actor::send_closure(node_, &FullNodePrivateOverlay::receive_broadcast, src, std::move(data));
|
||||
}
|
||||
void check_broadcast(PublicKeyHash src, overlay::OverlayIdShort overlay_id, td::BufferSlice data,
|
||||
td::Promise<td::Unit> promise) override {
|
||||
}
|
||||
Callback(td::actor::ActorId<FullNodePrivateOverlay> node) : node_(node) {
|
||||
}
|
||||
|
||||
private:
|
||||
td::actor::ActorId<FullNodePrivateOverlay> node_;
|
||||
};
|
||||
|
||||
overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(),
|
||||
overlay::CertificateFlags::AllowFec | overlay::CertificateFlags::Trusted,
|
||||
{}};
|
||||
td::actor::send_closure(overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(),
|
||||
nodes_, std::make_unique<Callback>(actor_id(this)), rules);
|
||||
|
||||
td::actor::send_closure(rldp_, &rldp::Rldp::add_id, local_id_);
|
||||
td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, local_id_);
|
||||
inited_ = true;
|
||||
}
|
||||
|
||||
void FullNodePrivateOverlay::tear_down() {
|
||||
if (inited_) {
|
||||
td::actor::send_closure(overlays_, &ton::overlay::Overlays::delete_overlay, local_id_, overlay_id_);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace fullnode
|
||||
|
||||
} // namespace validator
|
||||
|
||||
} // namespace ton
|
86
validator/full-node-private-overlay.hpp
Normal file
86
validator/full-node-private-overlay.hpp
Normal file
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
This file is part of TON Blockchain Library.
|
||||
|
||||
TON Blockchain Library is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation, either version 2 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
TON Blockchain Library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include "full-node.h"
|
||||
|
||||
namespace ton {
|
||||
|
||||
namespace validator {
|
||||
|
||||
namespace fullnode {
|
||||
|
||||
class FullNodePrivateOverlay : public td::actor::Actor {
|
||||
public:
|
||||
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query);
|
||||
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query);
|
||||
template <class T>
|
||||
void process_broadcast(PublicKeyHash, T &) {
|
||||
VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast";
|
||||
}
|
||||
void receive_broadcast(PublicKeyHash src, td::BufferSlice query);
|
||||
|
||||
void send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data);
|
||||
void send_broadcast(BlockBroadcast broadcast);
|
||||
|
||||
void start_up() override;
|
||||
void tear_down() override;
|
||||
|
||||
FullNodePrivateOverlay(adnl::AdnlNodeIdShort local_id, std::vector<adnl::AdnlNodeIdShort> nodes,
|
||||
FileHash zero_state_file_hash, FullNodeConfig config,
|
||||
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
|
||||
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
|
||||
td::actor::ActorId<overlay::Overlays> overlays,
|
||||
td::actor::ActorId<ValidatorManagerInterface> validator_manager)
|
||||
: local_id_(local_id)
|
||||
, nodes_(std::move(nodes))
|
||||
, zero_state_file_hash_(zero_state_file_hash)
|
||||
, config_(config)
|
||||
, keyring_(keyring)
|
||||
, adnl_(adnl)
|
||||
, rldp_(rldp)
|
||||
, rldp2_(rldp2)
|
||||
, overlays_(overlays)
|
||||
, validator_manager_(validator_manager) {
|
||||
}
|
||||
|
||||
private:
|
||||
adnl::AdnlNodeIdShort local_id_;
|
||||
std::vector<adnl::AdnlNodeIdShort> nodes_;
|
||||
FileHash zero_state_file_hash_;
|
||||
FullNodeConfig config_;
|
||||
|
||||
td::actor::ActorId<keyring::Keyring> keyring_;
|
||||
td::actor::ActorId<adnl::Adnl> adnl_;
|
||||
td::actor::ActorId<rldp::Rldp> rldp_;
|
||||
td::actor::ActorId<rldp2::Rldp> rldp2_;
|
||||
td::actor::ActorId<overlay::Overlays> overlays_;
|
||||
td::actor::ActorId<ValidatorManagerInterface> validator_manager_;
|
||||
|
||||
bool inited_ = false;
|
||||
overlay::OverlayIdFull overlay_id_full_;
|
||||
overlay::OverlayIdShort overlay_id_;
|
||||
|
||||
void try_init();
|
||||
void init();
|
||||
};
|
||||
|
||||
} // namespace fullnode
|
||||
|
||||
} // namespace validator
|
||||
|
||||
} // namespace ton
|
|
@ -111,18 +111,13 @@ void FullNodeShardImpl::create_overlay() {
|
|||
private:
|
||||
td::actor::ActorId<FullNodeShardImpl> node_;
|
||||
};
|
||||
if (is_active()) {
|
||||
td::actor::send_closure(overlays_, &overlay::Overlays::create_public_overlay, adnl_id_, overlay_id_full_.clone(),
|
||||
std::make_unique<Callback>(actor_id(this)), rules_,
|
||||
PSTRING() << "{ \"type\": \"shard\", \"shard_id\": " << get_shard()
|
||||
<< ", \"workchain_id\": " << get_workchain() << " }");
|
||||
} else {
|
||||
td::actor::send_closure(overlays_, &overlay::Overlays::create_public_overlay_ex, adnl_id_, overlay_id_full_.clone(),
|
||||
std::make_unique<Callback>(actor_id(this)), rules_,
|
||||
PSTRING() << "{ \"type\": \"shard\", \"shard_id\": " << get_shard()
|
||||
<< ", \"workchain_id\": " << get_workchain() << " }",
|
||||
false);
|
||||
}
|
||||
overlay::OverlayOptions opts;
|
||||
opts.announce_self_ = is_active();
|
||||
td::actor::send_closure(overlays_, &overlay::Overlays::create_public_overlay_ex, adnl_id_, overlay_id_full_.clone(),
|
||||
std::make_unique<Callback>(actor_id(this)), rules_,
|
||||
PSTRING() << "{ \"type\": \"shard\", \"shard_id\": " << get_shard()
|
||||
<< ", \"workchain_id\": " << get_workchain() << " }",
|
||||
opts);
|
||||
|
||||
td::actor::send_closure(rldp_, &rldp::Rldp::add_id, adnl_id_);
|
||||
td::actor::send_closure(rldp2_, &rldp2::Rldp::add_id, adnl_id_);
|
||||
|
@ -145,6 +140,9 @@ void FullNodeShardImpl::check_broadcast(PublicKeyHash src, td::BufferSlice broad
|
|||
}
|
||||
|
||||
auto q = B.move_as_ok();
|
||||
if (!processed_ext_msg_broadcasts_.insert(td::sha256_bits256(q->message_->data_)).second) {
|
||||
return promise.set_error(td::Status::Error("duplicate external message broadcast"));
|
||||
}
|
||||
if (config_.ext_messages_broadcast_disabled_) {
|
||||
promise.set_error(td::Status::Error("rebroadcasting external messages is disabled"));
|
||||
promise = [manager = validator_manager_, message = q->message_->data_.clone()](td::Result<td::Unit> R) mutable {
|
||||
|
@ -827,6 +825,9 @@ void FullNodeShardImpl::send_external_message(td::BufferSlice data) {
|
|||
});
|
||||
return;
|
||||
}
|
||||
if (!processed_ext_msg_broadcasts_.insert(td::sha256_bits256(data)).second) {
|
||||
return;
|
||||
}
|
||||
auto B = create_serialize_tl_object<ton_api::tonNode_externalMessageBroadcast>(
|
||||
create_tl_object<ton_api::tonNode_externalMessage>(std::move(data)));
|
||||
if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
|
||||
|
@ -1017,10 +1018,15 @@ void FullNodeShardImpl::alarm() {
|
|||
update_certificate_at_ = td::Timestamp::never();
|
||||
}
|
||||
}
|
||||
if (cleanup_processed_ext_msg_at_ && cleanup_processed_ext_msg_at_.is_in_past()) {
|
||||
processed_ext_msg_broadcasts_.clear();
|
||||
cleanup_processed_ext_msg_at_ = td::Timestamp::in(60.0);
|
||||
}
|
||||
alarm_timestamp().relax(sync_completed_at_);
|
||||
alarm_timestamp().relax(update_certificate_at_);
|
||||
alarm_timestamp().relax(reload_neighbours_at_);
|
||||
alarm_timestamp().relax(ping_neighbours_at_);
|
||||
alarm_timestamp().relax(cleanup_processed_ext_msg_at_);
|
||||
}
|
||||
|
||||
void FullNodeShardImpl::start_up() {
|
||||
|
@ -1037,8 +1043,8 @@ void FullNodeShardImpl::start_up() {
|
|||
|
||||
reload_neighbours_at_ = td::Timestamp::now();
|
||||
ping_neighbours_at_ = td::Timestamp::now();
|
||||
alarm_timestamp().relax(reload_neighbours_at_);
|
||||
alarm_timestamp().relax(ping_neighbours_at_);
|
||||
cleanup_processed_ext_msg_at_ = td::Timestamp::now();
|
||||
alarm_timestamp().relax(td::Timestamp::now());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "full-node-shard.h"
|
||||
#include "td/actor/PromiseFuture.h"
|
||||
#include "td/utils/port/Poll.h"
|
||||
#include <set>
|
||||
|
||||
namespace ton {
|
||||
|
||||
|
@ -280,6 +281,9 @@ class FullNodeShardImpl : public FullNodeShard {
|
|||
std::vector<adnl::AdnlNodeIdShort> collator_nodes_;
|
||||
|
||||
FullNodeConfig config_;
|
||||
|
||||
std::set<td::Bits256> processed_ext_msg_broadcasts_;
|
||||
td::Timestamp cleanup_processed_ext_msg_at_;
|
||||
};
|
||||
|
||||
} // namespace fullnode
|
||||
|
|
|
@ -53,6 +53,7 @@ void FullNodeImpl::add_permanent_key(PublicKeyHash key, td::Promise<td::Unit> pr
|
|||
td::actor::send_closure(shard.second.actor, &FullNodeShard::update_validators, all_validators_, sign_cert_by_);
|
||||
}
|
||||
}
|
||||
create_private_block_overlay(key);
|
||||
promise.set_value(td::Unit());
|
||||
}
|
||||
|
||||
|
@ -79,6 +80,7 @@ void FullNodeImpl::del_permanent_key(PublicKeyHash key, td::Promise<td::Unit> pr
|
|||
td::actor::send_closure(shard.second.actor, &FullNodeShard::update_validators, all_validators_, sign_cert_by_);
|
||||
}
|
||||
}
|
||||
private_block_overlays_.erase(key);
|
||||
promise.set_value(td::Unit());
|
||||
}
|
||||
|
||||
|
@ -290,6 +292,10 @@ void FullNodeImpl::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_s
|
|||
VLOG(FULL_NODE_WARNING) << "dropping OUT shard block info message to unknown shard";
|
||||
return;
|
||||
}
|
||||
if (!private_block_overlays_.empty()) {
|
||||
td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateOverlay::send_shard_block_info,
|
||||
block_id, cc_seqno, data.clone());
|
||||
}
|
||||
td::actor::send_closure(shard, &FullNodeShard::send_shard_block_info, block_id, cc_seqno, std::move(data));
|
||||
}
|
||||
|
||||
|
@ -299,6 +305,10 @@ void FullNodeImpl::send_broadcast(BlockBroadcast broadcast) {
|
|||
VLOG(FULL_NODE_WARNING) << "dropping OUT broadcast to unknown shard";
|
||||
return;
|
||||
}
|
||||
if (!private_block_overlays_.empty()) {
|
||||
td::actor::send_closure(private_block_overlays_.begin()->second, &FullNodePrivateOverlay::send_broadcast,
|
||||
broadcast.clone());
|
||||
}
|
||||
td::actor::send_closure(shard, &FullNodeShard::send_broadcast, std::move(broadcast));
|
||||
}
|
||||
|
||||
|
@ -440,6 +450,7 @@ void FullNodeImpl::got_key_block_proof(td::Ref<ProofLink> proof) {
|
|||
|
||||
PublicKeyHash l = PublicKeyHash::zero();
|
||||
std::vector<PublicKeyHash> keys;
|
||||
std::map<PublicKeyHash, adnl::AdnlNodeIdShort> current_validators;
|
||||
for (td::int32 i = -1; i <= 1; i++) {
|
||||
auto r = config->get_total_validator_set(i < 0 ? i : 1 - i);
|
||||
if (r.not_null()) {
|
||||
|
@ -450,10 +461,18 @@ void FullNodeImpl::got_key_block_proof(td::Ref<ProofLink> proof) {
|
|||
if (local_keys_.count(key)) {
|
||||
l = key;
|
||||
}
|
||||
if (i == 1) {
|
||||
current_validators[key] = adnl::AdnlNodeIdShort{el.addr.is_zero() ? key.bits256_value() : el.addr};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (current_validators != current_validators_) {
|
||||
current_validators_ = std::move(current_validators);
|
||||
update_private_block_overlays();
|
||||
}
|
||||
|
||||
if (keys == all_validators_) {
|
||||
return;
|
||||
}
|
||||
|
@ -474,6 +493,7 @@ void FullNodeImpl::got_zero_block_state(td::Ref<ShardState> state) {
|
|||
|
||||
PublicKeyHash l = PublicKeyHash::zero();
|
||||
std::vector<PublicKeyHash> keys;
|
||||
std::map<PublicKeyHash, adnl::AdnlNodeIdShort> current_validators;
|
||||
for (td::int32 i = -1; i <= 1; i++) {
|
||||
auto r = m->get_total_validator_set(i < 0 ? i : 1 - i);
|
||||
if (r.not_null()) {
|
||||
|
@ -484,10 +504,18 @@ void FullNodeImpl::got_zero_block_state(td::Ref<ShardState> state) {
|
|||
if (local_keys_.count(key)) {
|
||||
l = key;
|
||||
}
|
||||
if (i == 1) {
|
||||
current_validators[key] = adnl::AdnlNodeIdShort{el.addr.is_zero() ? key.bits256_value() : el.addr};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (current_validators != current_validators_) {
|
||||
current_validators_ = std::move(current_validators);
|
||||
update_private_block_overlays();
|
||||
}
|
||||
|
||||
if (keys == all_validators_) {
|
||||
return;
|
||||
}
|
||||
|
@ -615,6 +643,29 @@ void FullNodeImpl::start_up() {
|
|||
std::make_unique<Callback>(actor_id(this)), std::move(started_promise_));
|
||||
}
|
||||
|
||||
void FullNodeImpl::update_private_block_overlays() {
|
||||
private_block_overlays_.clear();
|
||||
if (local_keys_.empty()) {
|
||||
return;
|
||||
}
|
||||
for (const auto &key : local_keys_) {
|
||||
create_private_block_overlay(key);
|
||||
}
|
||||
}
|
||||
|
||||
void FullNodeImpl::create_private_block_overlay(PublicKeyHash key) {
|
||||
CHECK(local_keys_.count(key));
|
||||
if (current_validators_.count(key)) {
|
||||
std::vector<adnl::AdnlNodeIdShort> nodes;
|
||||
for (const auto &p : current_validators_) {
|
||||
nodes.push_back(p.second);
|
||||
}
|
||||
private_block_overlays_[key] = td::actor::create_actor<FullNodePrivateOverlay>(
|
||||
"BlocksPrivateOverlay", current_validators_[key], std::move(nodes), zero_state_file_hash_, config_, keyring_,
|
||||
adnl_, rldp_, rldp2_, overlays_, validator_manager_);
|
||||
}
|
||||
}
|
||||
|
||||
FullNodeImpl::FullNodeImpl(PublicKeyHash local_id, adnl::AdnlNodeIdShort adnl_id, FileHash zero_state_file_hash,
|
||||
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring,
|
||||
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp,
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
//#include "ton-node-slave.h"
|
||||
#include "interfaces/proof.h"
|
||||
#include "interfaces/shard.h"
|
||||
#include "full-node-private-overlay.hpp"
|
||||
|
||||
#include <map>
|
||||
#include <set>
|
||||
|
@ -125,6 +126,7 @@ class FullNodeImpl : public FullNode {
|
|||
|
||||
PublicKeyHash sign_cert_by_;
|
||||
std::vector<PublicKeyHash> all_validators_;
|
||||
std::map<PublicKeyHash, adnl::AdnlNodeIdShort> current_validators_;
|
||||
|
||||
std::set<PublicKeyHash> local_keys_;
|
||||
|
||||
|
@ -132,6 +134,11 @@ class FullNodeImpl : public FullNode {
|
|||
bool collators_inited_ = false;
|
||||
block::CollatorConfig collator_config_;
|
||||
FullNodeConfig config_;
|
||||
|
||||
std::map<PublicKeyHash, td::actor::ActorOwn<FullNodePrivateOverlay>> private_block_overlays_;
|
||||
|
||||
void update_private_block_overlays();
|
||||
void create_private_block_overlay(PublicKeyHash key);
|
||||
};
|
||||
|
||||
} // namespace fullnode
|
||||
|
|
|
@ -32,6 +32,7 @@ set(TON_VALIDATOR_SOURCE
|
|||
external-message.hpp
|
||||
ihr-message.hpp
|
||||
liteserver.hpp
|
||||
liteserver-cache.hpp
|
||||
message-queue.hpp
|
||||
out-msg-queue-proof.hpp
|
||||
proof.hpp
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
#include "ton/ton-io.hpp"
|
||||
#include "liteserver.hpp"
|
||||
#include "validator/fabric.h"
|
||||
#include "liteserver-cache.hpp"
|
||||
|
||||
namespace ton {
|
||||
|
||||
|
@ -46,7 +47,7 @@ td::actor::ActorOwn<Db> create_db_actor(td::actor::ActorId<ValidatorManager> man
|
|||
|
||||
td::actor::ActorOwn<LiteServerCache> create_liteserver_cache_actor(td::actor::ActorId<ValidatorManager> manager,
|
||||
std::string db_root) {
|
||||
return td::actor::create_actor<LiteServerCache>("cache");
|
||||
return td::actor::create_actor<LiteServerCacheImpl>("cache");
|
||||
}
|
||||
|
||||
td::Result<td::Ref<BlockData>> create_block(BlockIdExt block_id, td::BufferSlice data) {
|
||||
|
@ -246,7 +247,7 @@ void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_b
|
|||
|
||||
void run_liteserver_query(td::BufferSlice data, td::actor::ActorId<ValidatorManager> manager,
|
||||
td::actor::ActorId<LiteServerCache> cache, td::Promise<td::BufferSlice> promise) {
|
||||
LiteQuery::run_query(std::move(data), std::move(manager), std::move(promise));
|
||||
LiteQuery::run_query(std::move(data), std::move(manager), std::move(cache), std::move(promise));
|
||||
}
|
||||
|
||||
void run_fetch_account_state(WorkchainId wc, StdSmcAddress addr, td::actor::ActorId<ValidatorManager> manager,
|
||||
|
|
116
validator/impl/liteserver-cache.hpp
Normal file
116
validator/impl/liteserver-cache.hpp
Normal file
|
@ -0,0 +1,116 @@
|
|||
/*
|
||||
This file is part of TON Blockchain Library.
|
||||
|
||||
TON Blockchain Library is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation, either version 2 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
TON Blockchain Library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include "interfaces/liteserver.h"
|
||||
#include <map>
|
||||
|
||||
namespace ton::validator {
|
||||
|
||||
class LiteServerCacheImpl : public LiteServerCache {
|
||||
public:
|
||||
void start_up() override {
|
||||
alarm();
|
||||
}
|
||||
|
||||
void alarm() override {
|
||||
alarm_timestamp() = td::Timestamp::in(60.0);
|
||||
if (queries_cnt_ > 0 || !send_message_cache_.empty()) {
|
||||
LOG(WARNING) << "LS Cache stats: " << queries_cnt_ << " queries, " << queries_hit_cnt_ << " hits; "
|
||||
<< cache_.size() << " entries, size=" << total_size_ << "/" << MAX_CACHE_SIZE << "; "
|
||||
<< send_message_cache_.size() << " different sendMessage queries, " << send_message_error_cnt_
|
||||
<< " duplicates";
|
||||
queries_cnt_ = 0;
|
||||
queries_hit_cnt_ = 0;
|
||||
send_message_cache_.clear();
|
||||
send_message_error_cnt_ = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void lookup(td::Bits256 key, td::Promise<td::BufferSlice> promise) override {
|
||||
++queries_cnt_;
|
||||
auto it = cache_.find(key);
|
||||
if (it == cache_.end()) {
|
||||
promise.set_error(td::Status::Error("not found"));
|
||||
return;
|
||||
}
|
||||
++queries_hit_cnt_;
|
||||
auto entry = it->second.get();
|
||||
entry->remove();
|
||||
lru_.put(entry);
|
||||
promise.set_value(entry->value_.clone());
|
||||
}
|
||||
|
||||
void update(td::Bits256 key, td::BufferSlice value) override {
|
||||
std::unique_ptr<CacheEntry> &entry = cache_[key];
|
||||
if (entry == nullptr) {
|
||||
entry = std::make_unique<CacheEntry>(key, std::move(value));
|
||||
} else {
|
||||
total_size_ -= entry->size();
|
||||
entry->value_ = std::move(value);
|
||||
entry->remove();
|
||||
}
|
||||
lru_.put(entry.get());
|
||||
total_size_ += entry->size();
|
||||
|
||||
while (total_size_ > MAX_CACHE_SIZE) {
|
||||
auto to_remove = (CacheEntry *)lru_.get();
|
||||
CHECK(to_remove);
|
||||
total_size_ -= to_remove->size();
|
||||
to_remove->remove();
|
||||
cache_.erase(to_remove->key_);
|
||||
}
|
||||
}
|
||||
|
||||
void process_send_message(td::Bits256 key, td::Promise<td::Unit> promise) override {
|
||||
if (send_message_cache_.insert(key).second) {
|
||||
promise.set_result(td::Unit());
|
||||
} else {
|
||||
++send_message_error_cnt_;
|
||||
promise.set_error(td::Status::Error("duplicate message"));
|
||||
}
|
||||
}
|
||||
|
||||
void drop_send_message_from_cache(td::Bits256 key) override {
|
||||
send_message_cache_.erase(key);
|
||||
}
|
||||
|
||||
private:
|
||||
struct CacheEntry : public td::ListNode {
|
||||
explicit CacheEntry(td::Bits256 key, td::BufferSlice value) : key_(key), value_(std::move(value)) {
|
||||
}
|
||||
td::Bits256 key_;
|
||||
td::BufferSlice value_;
|
||||
|
||||
size_t size() const {
|
||||
return value_.size() + 32 * 2;
|
||||
}
|
||||
};
|
||||
|
||||
std::map<td::Bits256, std::unique_ptr<CacheEntry>> cache_;
|
||||
td::ListNode lru_;
|
||||
size_t total_size_ = 0;
|
||||
|
||||
size_t queries_cnt_ = 0, queries_hit_cnt_ = 0;
|
||||
|
||||
std::set<td::Bits256> send_message_cache_;
|
||||
size_t send_message_error_cnt_ = 0;
|
||||
|
||||
static const size_t MAX_CACHE_SIZE = 64 << 20;
|
||||
};
|
||||
|
||||
} // namespace ton::validator
|
|
@ -54,8 +54,11 @@ td::int32 get_tl_tag(td::Slice slice) {
|
|||
}
|
||||
|
||||
void LiteQuery::run_query(td::BufferSlice data, td::actor::ActorId<ValidatorManager> manager,
|
||||
td::actor::ActorId<LiteServerCache> cache,
|
||||
td::Promise<td::BufferSlice> promise) {
|
||||
td::actor::create_actor<LiteQuery>("litequery", std::move(data), std::move(manager), std::move(promise)).release();
|
||||
td::actor::create_actor<LiteQuery>("litequery", std::move(data), std::move(manager), std::move(cache),
|
||||
std::move(promise))
|
||||
.release();
|
||||
}
|
||||
|
||||
void LiteQuery::fetch_account_state(WorkchainId wc, StdSmcAddress acc_addr, td::actor::ActorId<ton::validator::ValidatorManager> manager,
|
||||
|
@ -64,8 +67,8 @@ void LiteQuery::fetch_account_state(WorkchainId wc, StdSmcAddress acc_addr, td:
|
|||
}
|
||||
|
||||
LiteQuery::LiteQuery(td::BufferSlice data, td::actor::ActorId<ValidatorManager> manager,
|
||||
td::Promise<td::BufferSlice> promise)
|
||||
: query_(std::move(data)), manager_(std::move(manager)), promise_(std::move(promise)) {
|
||||
td::actor::ActorId<LiteServerCache> cache, td::Promise<td::BufferSlice> promise)
|
||||
: query_(std::move(data)), manager_(std::move(manager)), cache_(std::move(cache)), promise_(std::move(promise)) {
|
||||
timeout_ = td::Timestamp::in(default_timeout_msec * 0.001);
|
||||
}
|
||||
|
||||
|
@ -110,7 +113,10 @@ void LiteQuery::alarm() {
|
|||
fatal_error(-503, "timeout");
|
||||
}
|
||||
|
||||
bool LiteQuery::finish_query(td::BufferSlice result) {
|
||||
bool LiteQuery::finish_query(td::BufferSlice result, bool skip_cache_update) {
|
||||
if (use_cache_ && !skip_cache_update) {
|
||||
td::actor::send_closure(cache_, &LiteServerCache::update, cache_key_, result.clone());
|
||||
}
|
||||
if (promise_) {
|
||||
promise_.set_result(std::move(result));
|
||||
stop();
|
||||
|
@ -124,19 +130,53 @@ bool LiteQuery::finish_query(td::BufferSlice result) {
|
|||
void LiteQuery::start_up() {
|
||||
alarm_timestamp() = timeout_;
|
||||
|
||||
if(acc_state_promise_) {
|
||||
td::actor::send_closure_later(actor_id(this),&LiteQuery::perform_fetchAccountState);
|
||||
if (acc_state_promise_) {
|
||||
td::actor::send_closure_later(actor_id(this), &LiteQuery::perform_fetchAccountState);
|
||||
return;
|
||||
}
|
||||
|
||||
auto F = fetch_tl_object<ton::lite_api::Function>(std::move(query_), true);
|
||||
auto F = fetch_tl_object<ton::lite_api::Function>(query_, true);
|
||||
if (F.is_error()) {
|
||||
td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, 0); // unknown
|
||||
abort_query(F.move_as_error());
|
||||
return;
|
||||
}
|
||||
query_obj_ = F.move_as_ok();
|
||||
|
||||
if (!cache_.empty() && query_obj_->get_id() == lite_api::liteServer_sendMessage::ID) {
|
||||
// Dropping duplicate "sendMessage"
|
||||
cache_key_ = td::sha256_bits256(query_);
|
||||
td::actor::send_closure(cache_, &LiteServerCache::process_send_message, cache_key_,
|
||||
[SelfId = actor_id(this)](td::Result<td::Unit> R) {
|
||||
if (R.is_ok()) {
|
||||
td::actor::send_closure(SelfId, &LiteQuery::perform);
|
||||
} else {
|
||||
td::actor::send_closure(SelfId, &LiteQuery::abort_query,
|
||||
R.move_as_error_prefix("cannot send external message : "));
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
use_cache_ = !cache_.empty() && query_obj_->get_id() == lite_api::liteServer_runSmcMethod::ID;
|
||||
if (use_cache_) {
|
||||
cache_key_ = td::sha256_bits256(query_);
|
||||
td::actor::send_closure(
|
||||
cache_, &LiteServerCache::lookup, cache_key_, [SelfId = actor_id(this)](td::Result<td::BufferSlice> R) {
|
||||
if (R.is_error()) {
|
||||
td::actor::send_closure(SelfId, &LiteQuery::perform);
|
||||
} else {
|
||||
td::actor::send_closure(SelfId, &LiteQuery::finish_query, R.move_as_ok(), true);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
perform();
|
||||
}
|
||||
}
|
||||
|
||||
void LiteQuery::perform() {
|
||||
td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, query_obj_->get_id());
|
||||
lite_api::downcast_call(
|
||||
*F.move_as_ok().get(),
|
||||
*query_obj_,
|
||||
td::overloaded(
|
||||
[&](lite_api::liteServer_getTime& q) { this->perform_getTime(); },
|
||||
[&](lite_api::liteServer_getVersion& q) { this->perform_getVersion(); },
|
||||
|
@ -491,15 +531,18 @@ void LiteQuery::perform_sendMessage(td::BufferSlice data) {
|
|||
auto copy = data.clone();
|
||||
td::actor::send_closure_later(
|
||||
manager_, &ValidatorManager::check_external_message, std::move(copy),
|
||||
[Self = actor_id(this), data = std::move(data), manager = manager_](td::Result<td::Ref<ExtMessage>> res) mutable {
|
||||
if(res.is_error()) {
|
||||
td::actor::send_closure(Self, &LiteQuery::abort_query,
|
||||
res.move_as_error_prefix("cannot apply external message to current state : "s));
|
||||
[Self = actor_id(this), data = std::move(data), manager = manager_, cache = cache_,
|
||||
cache_key = cache_key_](td::Result<td::Ref<ExtMessage>> res) mutable {
|
||||
if (res.is_error()) {
|
||||
// Don't cache errors
|
||||
td::actor::send_closure(cache, &LiteServerCache::drop_send_message_from_cache, cache_key);
|
||||
td::actor::send_closure(Self, &LiteQuery::abort_query,
|
||||
res.move_as_error_prefix("cannot apply external message to current state : "s));
|
||||
} else {
|
||||
LOG(INFO) << "sending an external message to validator manager";
|
||||
td::actor::send_closure_later(manager, &ValidatorManager::send_external_message, res.move_as_ok());
|
||||
auto b = ton::create_serialize_tl_object<ton::lite_api::liteServer_sendMsgStatus>(1);
|
||||
td::actor::send_closure(Self, &LiteQuery::finish_query, std::move(b));
|
||||
td::actor::send_closure(Self, &LiteQuery::finish_query, std::move(b), false);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
#include "shard.hpp"
|
||||
#include "proof.hpp"
|
||||
#include "block/block-auto.h"
|
||||
|
||||
#include "auto/tl/lite_api.h"
|
||||
|
||||
namespace ton {
|
||||
|
||||
|
@ -37,11 +37,16 @@ using td::Ref;
|
|||
class LiteQuery : public td::actor::Actor {
|
||||
td::BufferSlice query_;
|
||||
td::actor::ActorId<ton::validator::ValidatorManager> manager_;
|
||||
td::actor::ActorId<LiteServerCache> cache_;
|
||||
td::Timestamp timeout_;
|
||||
td::Promise<td::BufferSlice> promise_;
|
||||
|
||||
td::Promise<std::tuple<td::Ref<vm::CellSlice>,UnixTime,LogicalTime,std::unique_ptr<block::ConfigInfo>>> acc_state_promise_;
|
||||
|
||||
tl_object_ptr<ton::lite_api::Function> query_obj_;
|
||||
bool use_cache_{false};
|
||||
td::Bits256 cache_key_;
|
||||
|
||||
int pending_{0};
|
||||
int mode_{0};
|
||||
WorkchainId acc_workchain_;
|
||||
|
@ -75,11 +80,11 @@ class LiteQuery : public td::actor::Actor {
|
|||
ls_capabilities = 7
|
||||
}; // version 1.1; +1 = build block proof chains, +2 = masterchainInfoExt, +4 = runSmcMethod
|
||||
LiteQuery(td::BufferSlice data, td::actor::ActorId<ton::validator::ValidatorManager> manager,
|
||||
td::Promise<td::BufferSlice> promise);
|
||||
td::actor::ActorId<LiteServerCache> cache, td::Promise<td::BufferSlice> promise);
|
||||
LiteQuery(WorkchainId wc, StdSmcAddress acc_addr, td::actor::ActorId<ton::validator::ValidatorManager> manager,
|
||||
td::Promise<std::tuple<td::Ref<vm::CellSlice>,UnixTime,LogicalTime,std::unique_ptr<block::ConfigInfo>>> promise);
|
||||
static void run_query(td::BufferSlice data, td::actor::ActorId<ton::validator::ValidatorManager> manager,
|
||||
td::Promise<td::BufferSlice> promise);
|
||||
td::actor::ActorId<LiteServerCache> cache, td::Promise<td::BufferSlice> promise);
|
||||
|
||||
static void fetch_account_state(WorkchainId wc, StdSmcAddress acc_addr, td::actor::ActorId<ton::validator::ValidatorManager> manager,
|
||||
td::Promise<std::tuple<td::Ref<vm::CellSlice>,UnixTime,LogicalTime,std::unique_ptr<block::ConfigInfo>>> promise);
|
||||
|
@ -90,9 +95,10 @@ class LiteQuery : public td::actor::Actor {
|
|||
bool fatal_error(int err_code, std::string err_msg = "");
|
||||
void abort_query(td::Status reason);
|
||||
void abort_query_ext(td::Status reason, std::string err_msg);
|
||||
bool finish_query(td::BufferSlice result);
|
||||
bool finish_query(td::BufferSlice result, bool skip_cache_update = false);
|
||||
void alarm() override;
|
||||
void start_up() override;
|
||||
void perform();
|
||||
void perform_getTime();
|
||||
void perform_getVersion();
|
||||
void perform_getMasterchainInfo(int mode);
|
||||
|
|
|
@ -119,12 +119,12 @@ class Db : public td::actor::Actor {
|
|||
td::Promise<td::BufferSlice> promise) = 0;
|
||||
virtual void set_async_mode(bool mode, td::Promise<td::Unit> promise) = 0;
|
||||
|
||||
virtual void run_gc(UnixTime mc_ts, UnixTime gc_ts, UnixTime archive_ttl) = 0;
|
||||
|
||||
virtual void add_persistent_state_description(td::Ref<PersistentStateDescription> desc,
|
||||
td::Promise<td::Unit> promise) = 0;
|
||||
virtual void get_persistent_state_descriptions(
|
||||
td::Promise<std::vector<td::Ref<PersistentStateDescription>>> promise) = 0;
|
||||
|
||||
virtual void run_gc(UnixTime ts, UnixTime archive_ttl) = 0;
|
||||
};
|
||||
|
||||
} // namespace validator
|
||||
|
|
|
@ -19,16 +19,20 @@
|
|||
#pragma once
|
||||
|
||||
#include "td/actor/actor.h"
|
||||
#include "td/utils/buffer.h"
|
||||
#include "common/bitstring.h"
|
||||
|
||||
namespace ton {
|
||||
|
||||
namespace validator {
|
||||
namespace ton::validator {
|
||||
|
||||
class LiteServerCache : public td::actor::Actor {
|
||||
public:
|
||||
virtual ~LiteServerCache() = default;
|
||||
~LiteServerCache() override = default;
|
||||
|
||||
virtual void lookup(td::Bits256 key, td::Promise<td::BufferSlice> promise) = 0;
|
||||
virtual void update(td::Bits256 key, td::BufferSlice value) = 0;
|
||||
|
||||
virtual void process_send_message(td::Bits256 key, td::Promise<td::Unit> promise) = 0;
|
||||
virtual void drop_send_message_from_cache(td::Bits256 key) = 0;
|
||||
};
|
||||
|
||||
} // namespace validator
|
||||
|
||||
} // namespace ton
|
||||
} // namespace ton::validator
|
|
@ -178,6 +178,9 @@ class ValidatorManager : public ValidatorManagerInterface {
|
|||
virtual void get_block_by_seqno_from_db_for_litequery(AccountIdPrefixFull account, BlockSeqno seqno,
|
||||
td::Promise<ConstBlockHandle> promise) = 0;
|
||||
|
||||
virtual void add_lite_query_stats(int lite_query_id) {
|
||||
}
|
||||
|
||||
virtual void validated_new_block(BlockIdExt block_id) = 0;
|
||||
|
||||
virtual void add_persistent_state_description(td::Ref<PersistentStateDescription> desc) = 0;
|
||||
|
|
|
@ -403,6 +403,7 @@ void ValidatorManagerImpl::add_external_message(td::Ref<ExtMessage> msg) {
|
|||
}
|
||||
}
|
||||
void ValidatorManagerImpl::check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) {
|
||||
++ls_stats_check_ext_messages_;
|
||||
auto state = do_get_last_liteserver_state();
|
||||
if (state.is_null()) {
|
||||
promise.set_error(td::Status::Error(ErrorCode::notready, "not ready"));
|
||||
|
@ -1299,7 +1300,7 @@ void ValidatorManagerImpl::write_handle(BlockHandle handle, td::Promise<td::Unit
|
|||
void ValidatorManagerImpl::written_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
|
||||
bool received = handle->received();
|
||||
bool inited_state = handle->received_state();
|
||||
bool inited_proof = handle->inited_proof();
|
||||
bool inited_proof = handle->id().is_masterchain() ? handle->inited_proof() : handle->inited_proof_link();
|
||||
|
||||
if (handle->need_flush()) {
|
||||
handle->flush(actor_id(this), handle, std::move(promise));
|
||||
|
@ -1312,11 +1313,24 @@ void ValidatorManagerImpl::written_handle(BlockHandle handle, td::Promise<td::Un
|
|||
td::actor::send_closure(it->second.actor_, &WaitBlockData::force_read_from_db);
|
||||
}
|
||||
}
|
||||
if (inited_state && inited_proof) {
|
||||
if (inited_state) {
|
||||
auto it = wait_state_.find(handle->id());
|
||||
if (it != wait_state_.end()) {
|
||||
td::actor::send_closure(it->second.actor_, &WaitBlockState::force_read_from_db);
|
||||
}
|
||||
} else {
|
||||
if (handle->inited_proof_link()) {
|
||||
auto it = wait_state_.find(handle->id());
|
||||
if (it != wait_state_.end()) {
|
||||
td::actor::send_closure(it->second.actor_, &WaitBlockState::after_get_proof_link);
|
||||
}
|
||||
}
|
||||
if (handle->id().is_masterchain() && handle->inited_proof()) {
|
||||
auto it = wait_state_.find(handle->id());
|
||||
if (it != wait_state_.end()) {
|
||||
td::actor::send_closure(it->second.actor_, &WaitBlockState::after_get_proof);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
promise.set_value(td::Unit());
|
||||
|
@ -2551,9 +2565,9 @@ void ValidatorManagerImpl::state_serializer_update(BlockSeqno seqno) {
|
|||
void ValidatorManagerImpl::alarm() {
|
||||
try_advance_gc_masterchain_block();
|
||||
alarm_timestamp() = td::Timestamp::in(1.0);
|
||||
if (gc_masterchain_handle_) {
|
||||
td::actor::send_closure(db_, &Db::run_gc, gc_masterchain_handle_->unix_time(),
|
||||
static_cast<UnixTime>(opts_->archive_ttl()));
|
||||
if (last_masterchain_block_handle_ && gc_masterchain_handle_) {
|
||||
td::actor::send_closure(db_, &Db::run_gc, last_masterchain_block_handle_->unix_time(),
|
||||
gc_masterchain_handle_->unix_time(), static_cast<UnixTime>(opts_->archive_ttl()));
|
||||
}
|
||||
if (log_status_at_.is_in_past()) {
|
||||
if (last_masterchain_block_handle_) {
|
||||
|
@ -2633,6 +2647,29 @@ void ValidatorManagerImpl::alarm() {
|
|||
}
|
||||
}
|
||||
alarm_timestamp().relax(check_shard_clients_);
|
||||
|
||||
if (log_ls_stats_at_.is_in_past()) {
|
||||
if (!ls_stats_.empty() || ls_stats_check_ext_messages_ != 0) {
|
||||
td::StringBuilder sb;
|
||||
sb << "Liteserver stats (1 minute):";
|
||||
td::uint32 total = 0;
|
||||
for (const auto &p : ls_stats_) {
|
||||
sb << " " << lite_query_name_by_id(p.first) << ":" << p.second;
|
||||
total += p.second;
|
||||
}
|
||||
if (total > 0) {
|
||||
sb << " TOTAL:" << total;
|
||||
}
|
||||
if (ls_stats_check_ext_messages_ > 0) {
|
||||
sb << " checkExtMessage:" << ls_stats_check_ext_messages_;
|
||||
}
|
||||
LOG(WARNING) << sb.as_cslice();
|
||||
}
|
||||
ls_stats_.clear();
|
||||
ls_stats_check_ext_messages_ = 0;
|
||||
log_ls_stats_at_ = td::Timestamp::in(60.0);
|
||||
}
|
||||
alarm_timestamp().relax(log_ls_stats_at_);
|
||||
}
|
||||
|
||||
void ValidatorManagerImpl::update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) {
|
||||
|
|
|
@ -615,6 +615,10 @@ class ValidatorManagerImpl : public ValidatorManager {
|
|||
td::Result<ConstBlockHandle> r_handle,
|
||||
td::Promise<ConstBlockHandle> promise);
|
||||
|
||||
void add_lite_query_stats(int lite_query_id) override {
|
||||
++ls_stats_[lite_query_id];
|
||||
}
|
||||
|
||||
private:
|
||||
td::Timestamp resend_shard_blocks_at_;
|
||||
td::Timestamp check_waiters_at_;
|
||||
|
@ -690,6 +694,10 @@ class ValidatorManagerImpl : public ValidatorManager {
|
|||
std::map<BlockSeqno, WaitList<td::actor::Actor, td::Unit>> shard_client_waiters_;
|
||||
td::actor::ActorOwn<QueueSizeCounter> queue_size_counter_;
|
||||
|
||||
td::Timestamp log_ls_stats_at_;
|
||||
std::map<int, td::uint32> ls_stats_; // lite_api ID -> count, 0 for unknown
|
||||
td::uint32 ls_stats_check_ext_messages_{0};
|
||||
|
||||
struct Collator {
|
||||
td::actor::ActorOwn<CollatorNode> actor;
|
||||
std::set<ShardIdFull> shards;
|
||||
|
|
|
@ -169,6 +169,8 @@ void ValidatorGroup::accept_block_candidate(td::uint32 round_id, PublicKeyHash s
|
|||
std::move(approve_sig_set), src == local_id_, std::move(promise));
|
||||
prev_block_ids_ = std::vector<BlockIdExt>{next_block_id};
|
||||
cached_collated_block_ = nullptr;
|
||||
approved_candidates_cache_.clear();
|
||||
approved_candidates_cache_.clear();
|
||||
}
|
||||
|
||||
void ValidatorGroup::accept_block_query(BlockIdExt block_id, td::Ref<BlockData> block, std::vector<BlockIdExt> prev,
|
||||
|
@ -185,36 +187,15 @@ void ValidatorGroup::accept_block_query(BlockIdExt block_id, td::Ref<BlockData>
|
|||
return;
|
||||
}
|
||||
LOG_CHECK(R.error().code() == ErrorCode::timeout || R.error().code() == ErrorCode::notready) << R.move_as_error();
|
||||
td::actor::send_closure(SelfId, &ValidatorGroup::retry_accept_block_query, block_id, std::move(block),
|
||||
std::move(prev), std::move(sig_set), std::move(approve_sig_set), std::move(promise));
|
||||
td::actor::send_closure(SelfId, &ValidatorGroup::accept_block_query, block_id, std::move(block),
|
||||
std::move(prev), std::move(sig_set), std::move(approve_sig_set), send_broadcast,
|
||||
std::move(promise), true);
|
||||
} else {
|
||||
promise.set_value(R.move_as_ok());
|
||||
}
|
||||
});
|
||||
|
||||
run_accept_block_query(next_block_id, std::move(block), prev_block_ids_, validator_set_, std::move(sig_set),
|
||||
std::move(approve_sig_set), src == local_id_, manager_, std::move(P));
|
||||
prev_block_ids_ = std::vector<BlockIdExt>{next_block_id};
|
||||
cached_collated_block_ = nullptr;
|
||||
approved_candidates_cache_.clear();
|
||||
}
|
||||
|
||||
void ValidatorGroup::retry_accept_block_query(BlockIdExt block_id, td::Ref<BlockData> block,
|
||||
std::vector<BlockIdExt> prev, td::Ref<BlockSignatureSet> sig_set,
|
||||
td::Ref<BlockSignatureSet> approve_sig_set,
|
||||
td::Promise<td::Unit> promise) {
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), block_id, block, prev, sig_set, approve_sig_set,
|
||||
promise = std::move(promise)](td::Result<td::Unit> R) mutable {
|
||||
if (R.is_error()) {
|
||||
LOG_CHECK(R.error().code() == ErrorCode::timeout) << R.move_as_error();
|
||||
td::actor::send_closure(SelfId, &ValidatorGroup::retry_accept_block_query, block_id, std::move(block),
|
||||
std::move(prev), std::move(sig_set), std::move(approve_sig_set), std::move(promise));
|
||||
} else {
|
||||
promise.set_value(R.move_as_ok());
|
||||
}
|
||||
});
|
||||
|
||||
run_accept_block_query(block_id, std::move(block), std::move(prev), validator_set_, std::move(sig_set),
|
||||
run_accept_block_query(block_id, std::move(block), prev_block_ids_, validator_set_, std::move(sig_set),
|
||||
std::move(approve_sig_set), send_broadcast,
|
||||
shard_.is_masterchain() || mode_ == ValidatorManagerOptions::validator_normal, manager_,
|
||||
std::move(P));
|
||||
|
|
|
@ -115,6 +115,12 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions {
|
|||
td::uint32 get_celldb_compress_depth() const override {
|
||||
return celldb_compress_depth_;
|
||||
}
|
||||
size_t get_max_open_archive_files() const override {
|
||||
return max_open_archive_files_;
|
||||
}
|
||||
double get_archive_preload_period() const override {
|
||||
return archive_preload_period_;
|
||||
}
|
||||
ValidatorMode validator_mode() const override {
|
||||
return validator_mode_;
|
||||
}
|
||||
|
@ -174,6 +180,12 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions {
|
|||
void set_celldb_compress_depth(td::uint32 value) override {
|
||||
celldb_compress_depth_ = value;
|
||||
}
|
||||
void set_max_open_archive_files(size_t value) override {
|
||||
max_open_archive_files_ = value;
|
||||
}
|
||||
void set_archive_preload_period(double value) override {
|
||||
archive_preload_period_ = value;
|
||||
}
|
||||
void set_validator_mode(ValidatorMode value) override {
|
||||
validator_mode_ = value;
|
||||
}
|
||||
|
@ -218,6 +230,8 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions {
|
|||
BlockSeqno sync_upto_{0};
|
||||
std::string session_logs_file_;
|
||||
td::uint32 celldb_compress_depth_{0};
|
||||
size_t max_open_archive_files_ = 0;
|
||||
double archive_preload_period_ = 0.0;
|
||||
ValidatorMode validator_mode_ = validator_normal;
|
||||
};
|
||||
|
||||
|
|
|
@ -82,6 +82,8 @@ struct ValidatorManagerOptions : public td::CntObject {
|
|||
virtual BlockSeqno sync_upto() const = 0;
|
||||
virtual std::string get_session_logs_file() const = 0;
|
||||
virtual td::uint32 get_celldb_compress_depth() const = 0;
|
||||
virtual size_t get_max_open_archive_files() const = 0;
|
||||
virtual double get_archive_preload_period() const = 0;
|
||||
virtual ValidatorMode validator_mode() const = 0;
|
||||
|
||||
virtual void set_zero_block_id(BlockIdExt block_id) = 0;
|
||||
|
@ -102,6 +104,8 @@ struct ValidatorManagerOptions : public td::CntObject {
|
|||
virtual void set_sync_upto(BlockSeqno seqno) = 0;
|
||||
virtual void set_session_logs_file(std::string f) = 0;
|
||||
virtual void set_celldb_compress_depth(td::uint32 value) = 0;
|
||||
virtual void set_max_open_archive_files(size_t value) = 0;
|
||||
virtual void set_archive_preload_period(double value) = 0;
|
||||
virtual void set_validator_mode(ValidatorMode value) = 0;
|
||||
|
||||
static td::Ref<ValidatorManagerOptions> create(
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue