diff --git a/validator/CMakeLists.txt b/validator/CMakeLists.txt index 8de60081..832374c6 100644 --- a/validator/CMakeLists.txt +++ b/validator/CMakeLists.txt @@ -43,6 +43,7 @@ set(VALIDATOR_HEADERS fabric.h interfaces/db.h interfaces/external-message.h + interfaces/liteserver.h interfaces/proof.h interfaces/shard.h interfaces/signature-set.h diff --git a/validator/impl/CMakeLists.txt b/validator/impl/CMakeLists.txt index f4b967a8..7cd273f2 100644 --- a/validator/impl/CMakeLists.txt +++ b/validator/impl/CMakeLists.txt @@ -32,6 +32,7 @@ set(TON_VALIDATOR_SOURCE external-message.hpp ihr-message.hpp liteserver.hpp + liteserver-cache.hpp message-queue.hpp proof.hpp shard.hpp diff --git a/validator/impl/fabric.cpp b/validator/impl/fabric.cpp index ede8d36d..e3478594 100644 --- a/validator/impl/fabric.cpp +++ b/validator/impl/fabric.cpp @@ -34,6 +34,7 @@ #include "ton/ton-io.hpp" #include "liteserver.hpp" #include "validator/fabric.h" +#include "liteserver-cache.hpp" namespace ton { @@ -46,7 +47,7 @@ td::actor::ActorOwn create_db_actor(td::actor::ActorId man td::actor::ActorOwn create_liteserver_cache_actor(td::actor::ActorId manager, std::string db_root) { - return td::actor::create_actor("cache"); + return td::actor::create_actor("cache"); } td::Result> create_block(BlockIdExt block_id, td::BufferSlice data) { @@ -244,7 +245,7 @@ void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_b void run_liteserver_query(td::BufferSlice data, td::actor::ActorId manager, td::actor::ActorId cache, td::Promise promise) { - LiteQuery::run_query(std::move(data), std::move(manager), std::move(promise)); + LiteQuery::run_query(std::move(data), std::move(manager), std::move(cache), std::move(promise)); } void run_fetch_account_state(WorkchainId wc, StdSmcAddress addr, td::actor::ActorId manager, diff --git a/validator/impl/liteserver-cache.hpp b/validator/impl/liteserver-cache.hpp new file mode 100644 index 00000000..0e58e051 --- /dev/null +++ b/validator/impl/liteserver-cache.hpp @@ -0,0 +1,112 @@ +/* + This file is part of TON Blockchain Library. + + TON Blockchain Library is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + TON Blockchain Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with TON Blockchain Library. If not, see . +*/ +#pragma once + +#include "interfaces/liteserver.h" +#include + +namespace ton::validator { + +class LiteServerCacheImpl : public LiteServerCache { + public: + void start_up() override { + alarm(); + } + + void alarm() override { + alarm_timestamp() = td::Timestamp::in(60.0); + if (queries_cnt_ > 0 || !send_message_cache_.empty()) { + LOG(WARNING) << "LS Cache stats: " << queries_cnt_ << " queries, " << queries_hit_cnt_ << " hits; " + << cache_.size() << " entries, size=" << total_size_ << "/" << MAX_CACHE_SIZE << "; " + << send_message_cache_.size() << " different sendMessage queries, " << send_message_error_cnt_ + << " duplicates"; + queries_cnt_ = 0; + queries_hit_cnt_ = 0; + send_message_cache_.clear(); + send_message_error_cnt_ = 0; + } + } + + void lookup(td::Bits256 key, td::Promise promise) override { + ++queries_cnt_; + auto it = cache_.find(key); + if (it == cache_.end()) { + promise.set_error(td::Status::Error("not found")); + return; + } + ++queries_hit_cnt_; + auto entry = it->second.get(); + entry->remove(); + lru_.put(entry); + promise.set_value(entry->value_.clone()); + } + + void update(td::Bits256 key, td::BufferSlice value) override { + std::unique_ptr &entry = cache_[key]; + if (entry == nullptr) { + entry = std::make_unique(key, std::move(value)); + } else { + total_size_ -= entry->size(); + entry->value_ = std::move(value); + entry->remove(); + } + lru_.put(entry.get()); + total_size_ += entry->size(); + + while (total_size_ > MAX_CACHE_SIZE) { + auto to_remove = (CacheEntry *)lru_.get(); + CHECK(to_remove); + total_size_ -= to_remove->size(); + to_remove->remove(); + cache_.erase(to_remove->key_); + } + } + + void process_send_message(td::Bits256 key, td::Promise promise) override { + if (send_message_cache_.insert(key).second) { + promise.set_result(td::Unit()); + } else { + ++send_message_error_cnt_; + promise.set_error(td::Status::Error("duplicate message")); + } + } + + private: + struct CacheEntry : public td::ListNode { + explicit CacheEntry(td::Bits256 key, td::BufferSlice value) : key_(key), value_(std::move(value)) { + } + td::Bits256 key_; + td::BufferSlice value_; + + size_t size() const { + return value_.size() + 32 * 2; + } + }; + + std::map> cache_; + td::ListNode lru_; + size_t total_size_ = 0; + + size_t queries_cnt_ = 0, queries_hit_cnt_ = 0; + + std::set send_message_cache_; + size_t send_message_error_cnt_ = 0; + + static const size_t MAX_CACHE_SIZE = 64 << 20; +}; + +} // namespace ton::validator diff --git a/validator/impl/liteserver.cpp b/validator/impl/liteserver.cpp index f2be6ef9..7d25b408 100644 --- a/validator/impl/liteserver.cpp +++ b/validator/impl/liteserver.cpp @@ -54,8 +54,11 @@ td::int32 get_tl_tag(td::Slice slice) { } void LiteQuery::run_query(td::BufferSlice data, td::actor::ActorId manager, + td::actor::ActorId cache, td::Promise promise) { - td::actor::create_actor("litequery", std::move(data), std::move(manager), std::move(promise)).release(); + td::actor::create_actor("litequery", std::move(data), std::move(manager), std::move(cache), + std::move(promise)) + .release(); } void LiteQuery::fetch_account_state(WorkchainId wc, StdSmcAddress acc_addr, td::actor::ActorId manager, @@ -64,8 +67,8 @@ void LiteQuery::fetch_account_state(WorkchainId wc, StdSmcAddress acc_addr, td: } LiteQuery::LiteQuery(td::BufferSlice data, td::actor::ActorId manager, - td::Promise promise) - : query_(std::move(data)), manager_(std::move(manager)), promise_(std::move(promise)) { + td::actor::ActorId cache, td::Promise promise) + : query_(std::move(data)), manager_(std::move(manager)), cache_(std::move(cache)), promise_(std::move(promise)) { timeout_ = td::Timestamp::in(default_timeout_msec * 0.001); } @@ -110,7 +113,10 @@ void LiteQuery::alarm() { fatal_error(-503, "timeout"); } -bool LiteQuery::finish_query(td::BufferSlice result) { +bool LiteQuery::finish_query(td::BufferSlice result, bool skip_cache_update) { + if (use_cache_ && !skip_cache_update) { + td::actor::send_closure(cache_, &LiteServerCache::update, cache_key_, result.clone()); + } if (promise_) { promise_.set_result(std::move(result)); stop(); @@ -124,21 +130,53 @@ bool LiteQuery::finish_query(td::BufferSlice result) { void LiteQuery::start_up() { alarm_timestamp() = timeout_; - if(acc_state_promise_) { - td::actor::send_closure_later(actor_id(this),&LiteQuery::perform_fetchAccountState); + if (acc_state_promise_) { + td::actor::send_closure_later(actor_id(this), &LiteQuery::perform_fetchAccountState); return; } - auto F = fetch_tl_object(std::move(query_), true); + auto F = fetch_tl_object(query_, true); if (F.is_error()) { td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, 0); // unknown abort_query(F.move_as_error()); return; } - td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, F.ok()->get_id()); + query_obj_ = F.move_as_ok(); + if (!cache_.empty() && query_obj_->get_id() == lite_api::liteServer_sendMessage::ID) { + // Dropping duplicate "sendMessage" + cache_key_ = td::sha256_bits256(query_); + td::actor::send_closure(cache_, &LiteServerCache::process_send_message, cache_key_, + [SelfId = actor_id(this)](td::Result R) { + if (R.is_ok()) { + td::actor::send_closure(SelfId, &LiteQuery::perform); + } else { + td::actor::send_closure(SelfId, &LiteQuery::abort_query, + R.move_as_error_prefix("cannot send external message : ")); + } + }); + return; + } + use_cache_ = !cache_.empty() && query_obj_->get_id() == lite_api::liteServer_runSmcMethod::ID; + if (use_cache_) { + cache_key_ = td::sha256_bits256(query_); + td::actor::send_closure( + cache_, &LiteServerCache::lookup, cache_key_, [SelfId = actor_id(this)](td::Result R) { + if (R.is_error()) { + td::actor::send_closure(SelfId, &LiteQuery::perform); + } else { + td::actor::send_closure(SelfId, &LiteQuery::finish_query, R.move_as_ok(), true); + } + }); + } else { + perform(); + } +} + +void LiteQuery::perform() { + td::actor::send_closure(manager_, &ValidatorManager::add_lite_query_stats, query_obj_->get_id()); lite_api::downcast_call( - *F.move_as_ok().get(), + *query_obj_, td::overloaded( [&](lite_api::liteServer_getTime& q) { this->perform_getTime(); }, [&](lite_api::liteServer_getVersion& q) { this->perform_getVersion(); }, @@ -501,7 +539,7 @@ void LiteQuery::perform_sendMessage(td::BufferSlice data) { LOG(INFO) << "sending an external message to validator manager"; td::actor::send_closure_later(manager, &ValidatorManager::send_external_message, res.move_as_ok()); auto b = ton::create_serialize_tl_object(1); - td::actor::send_closure(Self, &LiteQuery::finish_query, std::move(b)); + td::actor::send_closure(Self, &LiteQuery::finish_query, std::move(b), false); } }); } diff --git a/validator/impl/liteserver.hpp b/validator/impl/liteserver.hpp index 2707fdfe..57ec7c3c 100644 --- a/validator/impl/liteserver.hpp +++ b/validator/impl/liteserver.hpp @@ -27,7 +27,7 @@ #include "shard.hpp" #include "proof.hpp" #include "block/block-auto.h" - +#include "auto/tl/lite_api.h" namespace ton { @@ -37,11 +37,16 @@ using td::Ref; class LiteQuery : public td::actor::Actor { td::BufferSlice query_; td::actor::ActorId manager_; + td::actor::ActorId cache_; td::Timestamp timeout_; td::Promise promise_; td::Promise,UnixTime,LogicalTime,std::unique_ptr>> acc_state_promise_; + tl_object_ptr query_obj_; + bool use_cache_{false}; + td::Bits256 cache_key_; + int pending_{0}; int mode_{0}; WorkchainId acc_workchain_; @@ -75,11 +80,11 @@ class LiteQuery : public td::actor::Actor { ls_capabilities = 7 }; // version 1.1; +1 = build block proof chains, +2 = masterchainInfoExt, +4 = runSmcMethod LiteQuery(td::BufferSlice data, td::actor::ActorId manager, - td::Promise promise); + td::actor::ActorId cache, td::Promise promise); LiteQuery(WorkchainId wc, StdSmcAddress acc_addr, td::actor::ActorId manager, td::Promise,UnixTime,LogicalTime,std::unique_ptr>> promise); static void run_query(td::BufferSlice data, td::actor::ActorId manager, - td::Promise promise); + td::actor::ActorId cache, td::Promise promise); static void fetch_account_state(WorkchainId wc, StdSmcAddress acc_addr, td::actor::ActorId manager, td::Promise,UnixTime,LogicalTime,std::unique_ptr>> promise); @@ -90,9 +95,10 @@ class LiteQuery : public td::actor::Actor { bool fatal_error(int err_code, std::string err_msg = ""); void abort_query(td::Status reason); void abort_query_ext(td::Status reason, std::string err_msg); - bool finish_query(td::BufferSlice result); + bool finish_query(td::BufferSlice result, bool skip_cache_update = false); void alarm() override; void start_up() override; + void perform(); void perform_getTime(); void perform_getVersion(); void perform_getMasterchainInfo(int mode); diff --git a/validator/interfaces/liteserver.h b/validator/interfaces/liteserver.h index 3e803029..0920c11f 100644 --- a/validator/interfaces/liteserver.h +++ b/validator/interfaces/liteserver.h @@ -19,16 +19,19 @@ #pragma once #include "td/actor/actor.h" +#include "td/utils/buffer.h" +#include "common/bitstring.h" -namespace ton { - -namespace validator { +namespace ton::validator { class LiteServerCache : public td::actor::Actor { public: - virtual ~LiteServerCache() = default; + ~LiteServerCache() override = default; + + virtual void lookup(td::Bits256 key, td::Promise promise) = 0; + virtual void update(td::Bits256 key, td::BufferSlice value) = 0; + + virtual void process_send_message(td::Bits256 key, td::Promise promise) = 0; }; -} // namespace validator - -} // namespace ton +} // namespace ton::validator \ No newline at end of file