1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

New liteserver config format

* Specify shards and seqno/utime/lt limits for liteservers in global config
* Support in lite-client, tonlib, blockchain-explorer
* Rework proxy-liteserver
This commit is contained in:
SpyCheese 2024-06-12 18:12:45 +03:00
parent 38ab70c037
commit 007f1fb1d7
26 changed files with 1187 additions and 1130 deletions

View file

@ -29,50 +29,71 @@
#include "td/utils/OptionParser.h"
#include "td/utils/port/path.h"
#include "td/utils/port/signals.h"
#include "td/utils/port/user.h"
#include "td/utils/port/IPAddress.h"
#include "td/utils/Random.h"
#include "td/utils/FileLog.h"
#include "git.h"
#include "auto/tl/ton_api.h"
#include "auto/tl/lite_api.h"
#include "auto/tl/lite_api.hpp"
#include "tl-utils/lite-utils.hpp"
#include "ton/lite-tl.hpp"
#include "auto/tl/ton_api_json.h"
#include "adnl/adnl.h"
#include "lite-client/QueryTraits.h"
#include "lite-client/ext-client.h"
#if TD_DARWIN || TD_LINUX
#include <unistd.h>
#endif
#include <iostream>
#include <map>
using namespace ton;
class ProxyLiteserver : public td::actor::Actor {
public:
ProxyLiteserver(std::string global_config, std::string db_root, td::uint16 port)
: global_config_(std::move(global_config)), db_root_(std::move(db_root)), port_(port) {
ProxyLiteserver(std::string global_config, std::string db_root, td::uint16 port, PublicKeyHash public_key_hash)
: global_config_(std::move(global_config))
, db_root_(std::move(db_root))
, port_(port)
, public_key_hash_(public_key_hash) {
}
void start_up() override {
LOG_CHECK(db_root_ != "") << "db root is not set";
LOG_CHECK(!db_root_.empty()) << "db root is not set";
td::mkdir(db_root_).ensure();
db_root_ = td::realpath(db_root_).move_as_ok();
keyring_ = keyring::Keyring::create(db_root_ + "/keyring");
if (public_key_hash_.is_zero()) {
id_ = {};
run();
} else {
td::actor::send_closure(keyring_, &keyring::Keyring::get_public_key, public_key_hash_,
[SelfId = actor_id(this)](td::Result<PublicKey> R) mutable {
if (R.is_error()) {
LOG(FATAL) << "Failed to load public key: " << R.move_as_error();
}
td::actor::send_closure(SelfId, &ProxyLiteserver::got_public_key, R.move_as_ok());
});
}
}
void got_public_key(PublicKey pub) {
id_ = adnl::AdnlNodeIdFull{pub};
run();
}
void run() {
td::Status S = prepare_local_config();
if (S.is_error()) {
LOG(FATAL) << "Local config error: " << S;
}
S = create_ext_client();
S = parse_global_config();
if (S.is_error()) {
LOG(FATAL) << S;
}
run_clients();
create_ext_server();
}
@ -82,73 +103,123 @@ class ProxyLiteserver : public td::actor::Actor {
auto conf_data = r_conf_data.move_as_ok();
TRY_RESULT_PREFIX(conf_json, td::json_decode(conf_data.as_slice()), "failed to parse json: ");
TRY_STATUS_PREFIX(ton_api::from_json(*config_, conf_json.get_object()), "json does not fit TL scheme: ");
TRY_RESULT_PREFIX_ASSIGN(port_, td::narrow_cast_safe<td::uint16>(config_->port_), "invalid port: ");
TRY_RESULT_PREFIX_ASSIGN(id_, adnl::AdnlNodeIdFull::create(config_->id_), "invalid id: ");
TRY_RESULT_PREFIX(cfg_port, td::narrow_cast_safe<td::uint16>(config_->port_), "invalid port: ");
TRY_RESULT_PREFIX(cfg_id, adnl::AdnlNodeIdFull::create(config_->id_), "invalid id: ");
bool rewrite_config = false;
if (port_ == 0) {
port_ = cfg_port;
} else {
rewrite_config |= (port_ != cfg_port);
}
if (id_.empty()) {
id_ = std::move(cfg_id);
} else {
rewrite_config |= (id_ != cfg_id);
}
if (!rewrite_config) {
return td::Status::OK();
}
} else {
LOG(WARNING) << "First launch, creating local config";
if (port_ == 0) {
return td::Status::Error("port is not set");
}
config_->port_ = port_;
}
if (port_ == 0) {
return td::Status::Error("port is not set");
}
config_->port_ = port_;
if (id_.empty()) {
auto pk = PrivateKey{privkeys::Ed25519::random()};
id_ = adnl::AdnlNodeIdFull{pk.compute_public_key()};
config_->id_ = id_.tl();
td::actor::send_closure(keyring_, &keyring::Keyring::add_key, std::move(pk), false, [](td::Result<td::Unit> R) {
if (R.is_error()) {
LOG(FATAL) << "Failed to store private key";
}
});
}
config_->id_ = id_.tl();
auto s = td::json_encode<std::string>(td::ToJson(*config_), true);
TRY_STATUS_PREFIX(td::write_file(config_file(), s), "failed to write file: ");
auto s = td::json_encode<std::string>(td::ToJson(*config_), true);
TRY_STATUS_PREFIX(td::write_file(config_file(), s), "failed to write file: ");
LOG(WARNING) << "Writing config.json";
return td::Status::OK();
}
td::Status parse_global_config() {
TRY_RESULT_PREFIX(global_config_data, td::read_file(global_config_), "Failed to read global config: ");
TRY_RESULT_PREFIX(global_config_json, td::json_decode(global_config_data.as_slice()),
"Failed to parse global config: ");
ton_api::liteclient_config_global gc;
TRY_STATUS_PREFIX(ton_api::from_json(gc, global_config_json.get_object()), "Failed to parse global config: ");
TRY_RESULT_PREFIX(servers, liteclient::LiteServerConfig::parse_global_config(gc),
"Falied to parse liteservers in global config: ");
if (servers.empty()) {
return td::Status::Error("No liteservers in global config");
}
for (auto& s : servers) {
servers_.emplace_back();
servers_.back().config = std::move(s);
}
return td::Status::OK();
}
td::Status create_ext_client() {
std::vector<liteclient::ExtClient::LiteServer> servers;
TRY_RESULT_PREFIX(global_config_data, td::read_file(global_config_), "Failed to read global config: ");
TRY_RESULT_PREFIX(global_config_json, td::json_decode(global_config_data.as_slice()),
"Failed to parse global config: ");
ton::ton_api::liteclient_config_global gc;
ton::ton_api::from_json(gc, global_config_json.get_object()).ensure();
size_t size = gc.liteservers_.size() + gc.liteservers_v2_.size();
if (size == 0) {
return td::Status::Error("No liteservers in global config");
}
for (auto& s : gc.liteservers_) {
td::IPAddress addr;
addr.init_host_port(td::IPAddress::ipv4_to_str(s->ip_), s->port_).ensure();
liteclient::ExtClient::LiteServer serv;
serv.address = addr;
serv.adnl_id = ton::adnl::AdnlNodeIdFull::create(s->id_).move_as_ok();
servers.push_back(std::move(serv));
}
for (auto& s : gc.liteservers_v2_) {
td::IPAddress addr;
addr.init_host_port(td::IPAddress::ipv4_to_str(s->ip_), s->port_).ensure();
liteclient::ExtClient::LiteServer serv;
serv.address = addr;
serv.adnl_id = ton::adnl::AdnlNodeIdFull::create(s->id_).move_as_ok();
serv.is_full = false;
for (auto& shard : s->shards_) {
serv.shards.emplace_back(shard->workchain_, (ton::ShardId)shard->shard_);
CHECK(serv.shards.back().is_valid_ext());
void run_clients() {
class Callback : public adnl::AdnlExtClient::Callback {
public:
explicit Callback(td::actor::ActorId<ProxyLiteserver> id, size_t idx) : id_(std::move(id)), idx_(idx) {
}
servers.push_back(std::move(serv));
void on_ready() override {
td::actor::send_closure(id_, &ProxyLiteserver::on_client_status, idx_, true);
}
void on_stop_ready() override {
td::actor::send_closure(id_, &ProxyLiteserver::on_client_status, idx_, false);
}
private:
td::actor::ActorId<ProxyLiteserver> id_;
size_t idx_;
};
for (size_t i = 0; i < servers_.size(); ++i) {
Server& server = servers_[i];
server.client = adnl::AdnlExtClient::create(server.config.adnl_id, server.config.addr,
std::make_unique<Callback>(actor_id(this), i));
server.alive = false;
}
class Callback : public liteclient::ExtClient::Callback {};
ext_client_ = liteclient::ExtClient::create(std::move(servers), td::make_unique<Callback>());
return td::Status::OK();
}
void on_client_status(size_t idx, bool ready) {
Server& server = servers_[idx];
if (server.alive == ready) {
return;
}
server.alive = ready;
LOG(WARNING) << (ready ? "Connected to" : "Disconnected from") << " server #" << idx << " ("
<< server.config.addr.get_ip_str() << ":" << server.config.addr.get_port() << ")";
}
void create_ext_server() {
adnl_ = adnl::Adnl::create("", keyring_.get());
td::actor::send_closure(adnl_, &adnl::Adnl::add_id, id_, ton::adnl::AdnlAddressList{}, (td::uint8)255);
td::actor::send_closure(adnl_, &adnl::Adnl::create_ext_server,
std::vector<adnl::AdnlNodeIdShort>{id_.compute_short_id()}, std::vector<td::uint16>{port_},
td::actor::send_closure(adnl_, &adnl::Adnl::add_id, id_, adnl::AdnlAddressList{}, (td::uint8)255);
class AdnlCallback : public adnl::Adnl::Callback {
public:
explicit AdnlCallback(td::actor::ActorId<ProxyLiteserver> id) : id_(id) {
}
void receive_message(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data) override {
}
void receive_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data,
td::Promise<td::BufferSlice> promise) override {
td::actor::send_closure(id_, &ProxyLiteserver::receive_query, std::move(data), std::move(promise));
}
private:
td::actor::ActorId<ProxyLiteserver> id_;
};
td::actor::send_closure(adnl_, &adnl::Adnl::subscribe, id_.compute_short_id(),
adnl::Adnl::int_to_bytestring(lite_api::liteServer_query::ID),
std::make_unique<AdnlCallback>(actor_id(this)));
td::actor::send_closure(adnl_, &adnl::Adnl::create_ext_server, std::vector{id_.compute_short_id()},
std::vector{port_},
[SelfId = actor_id(this)](td::Result<td::actor::ActorOwn<adnl::AdnlExtServer>> R) {
R.ensure();
td::actor::send_closure(SelfId, &ProxyLiteserver::created_ext_server, R.move_as_ok());
@ -158,93 +229,72 @@ class ProxyLiteserver : public td::actor::Actor {
void created_ext_server(td::actor::ActorOwn<adnl::AdnlExtServer> s) {
ext_server_ = std::move(s);
LOG(WARNING) << "Started proxy liteserver on port " << port_;
class AdnlCallback : public adnl::Adnl::Callback {
public:
AdnlCallback(td::actor::ActorId<liteclient::ExtClient> client) : client_(client) {
}
void receive_message(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data) override {
}
void receive_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data,
td::Promise<td::BufferSlice> promise) override {
td::actor::create_actor<QueryWorker>("worker", client_, std::move(data), std::move(promise)).release();
}
private:
td::actor::ActorId<liteclient::ExtClient> client_;
};
td::actor::send_closure(adnl_, &adnl::Adnl::subscribe, id_.compute_short_id(),
adnl::Adnl::int_to_bytestring(lite_api::liteServer_query::ID),
std::make_unique<AdnlCallback>(ext_client_.get()));
alarm();
}
class QueryWorker : public td::actor::Actor {
public:
QueryWorker(td::actor::ActorId<liteclient::ExtClient> client, td::BufferSlice data,
td::Promise<td::BufferSlice> promise)
: client_(std::move(client)), data_(std::move(data)), promise_(std::move(promise)) {
}
void start_up() override {
auto data = data_.clone();
auto F = fetch_tl_object<lite_api::liteServer_query>(data, true);
if (F.is_ok()) {
data = std::move(F.move_as_ok()->data_);
} else {
auto G = fetch_tl_prefix<lite_api::liteServer_queryPrefix>(data, true);
if (G.is_error()) {
fatal_error(G.move_as_error());
return;
}
td::Result<size_t> select_server(const liteclient::QueryInfo& query_info) {
size_t best_idx = servers_.size();
int cnt = 0;
for (size_t i = 0; i < servers_.size(); ++i) {
Server& server = servers_[i];
if (!server.alive || !server.config.accepts_query(query_info)) {
continue;
}
fetch_tl_prefix<lite_api::liteServer_waitMasterchainSeqno>(data, true).ignore();
auto F2 = fetch_tl_object<ton::lite_api::Function>(std::move(data), true);
if (F2.is_error()) {
fatal_error(F2.move_as_error());
++cnt;
if (td::Random::fast(1, cnt) == 1) {
best_idx = i;
}
}
if (best_idx == servers_.size()) {
return td::Status::Error(PSTRING() << "no liteserver for query " << query_info.to_str());
}
return best_idx;
}
void receive_query(td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
liteclient::QueryInfo query_info = liteclient::get_query_info(data);
++ls_stats_[query_info.query_id];
promise = [promise = std::move(promise), query_info, timer = td::Timer()](td::Result<td::BufferSlice> R) mutable {
if (R.is_ok()) {
LOG(INFO) << "Query " << query_info.to_str() << ": OK, time=" << timer.elapsed()
<< ", response_size=" << R.ok().size();
promise.set_value(R.move_as_ok());
return;
}
auto query = F2.move_as_ok();
lite_api::downcast_call(*query, [&](auto& obj) { shard_ = liteclient::get_query_shard(obj); });
LOG(INFO) << "Query " << query_info.to_str() << ": " << R.error();
promise.set_value(create_serialize_tl_object<lite_api::liteServer_error>(
R.error().code(), "Gateway error: " + R.error().message().str()));
};
TRY_RESULT_PROMISE(promise, server_idx, select_server(query_info));
LOG(INFO) << "Got query: shard=" << shard_.to_str() << " size=" << data_.size();
td::actor::send_closure(client_, &liteclient::ExtClient::send_query, "q", std::move(data_), shard_,
td::Timestamp::in(8.0), [SelfId = actor_id(this)](td::Result<td::BufferSlice> R) {
td::actor::send_closure(SelfId, &QueryWorker::got_result, std::move(R));
});
}
Server& server = servers_[server_idx];
LOG(INFO) << "Sending query " << query_info.to_str() << ", size=" << data.size() << ", to server #" << server_idx
<< " (" << server.config.addr.get_ip_str() << ":" << server.config.addr.get_port() << ")";
td::actor::send_closure(server.client, &adnl::AdnlExtClient::send_query, "q", std::move(data),
td::Timestamp::in(8.0), std::move(promise));
}
void got_result(td::Result<td::BufferSlice> R) {
if (R.is_error()) {
LOG(INFO) << "Query to shard=" << shard_.to_str() << ": " << R.error();
promise_.set_value(create_serialize_tl_object<lite_api::liteServer_error>(
R.error().code(), "gateway error: " + R.error().message().str()));
} else {
td::BufferSlice response = R.move_as_ok();
LOG(INFO) << "Query to shard=" << shard_.to_str() << ": OK, size=" << response.size()
<< " time=" << timer_.elapsed();
promise_.set_value(std::move(response));
void alarm() override {
alarm_timestamp() = td::Timestamp::in(60.0);
if (!ls_stats_.empty()) {
td::StringBuilder sb;
sb << "Liteserver stats (1 minute):";
td::uint32 total = 0;
for (const auto& p : ls_stats_) {
sb << " " << lite_query_name_by_id(p.first) << ":" << p.second;
total += p.second;
}
stop();
sb << " TOTAL:" << total;
LOG(WARNING) << sb.as_cslice();
ls_stats_.clear();
}
void fatal_error(td::Status S) {
promise_.set_error(std::move(S));
stop();
}
private:
td::actor::ActorId<liteclient::ExtClient> client_;
td::BufferSlice data_;
td::Promise<td::BufferSlice> promise_;
td::Timer timer_ = {};
ShardIdFull shard_;
};
}
private:
std::string global_config_;
std::string db_root_;
td::uint16 port_;
PublicKeyHash public_key_hash_;
tl_object_ptr<ton_api::proxyLiteserver_config> config_ = create_tl_object<ton_api::proxyLiteserver_config>();
adnl::AdnlNodeIdFull id_;
@ -252,7 +302,15 @@ class ProxyLiteserver : public td::actor::Actor {
td::actor::ActorOwn<keyring::Keyring> keyring_;
td::actor::ActorOwn<adnl::Adnl> adnl_;
td::actor::ActorOwn<adnl::AdnlExtServer> ext_server_;
td::actor::ActorOwn<liteclient::ExtClient> ext_client_;
struct Server {
liteclient::LiteServerConfig config;
td::actor::ActorOwn<adnl::AdnlExtClient> client;
bool alive = false;
};
std::vector<Server> servers_;
std::map<int, td::uint32> ls_stats_; // lite_api ID -> count, 0 for unknown
std::string config_file() const {
return db_root_ + "/config.json";
@ -270,6 +328,7 @@ int main(int argc, char* argv[]) {
std::string global_config, db_root;
td::uint16 port = 0;
PublicKeyHash public_key_hash = PublicKeyHash::zero();
td::uint32 threads = 4;
td::OptionParser p;
@ -290,15 +349,27 @@ int main(int argc, char* argv[]) {
std::cout << sb.as_cslice().c_str();
std::exit(2);
});
p.add_checked_option('p', "port", "liteserver port (use only on first launch)", [&](td::Slice arg) -> td::Status {
TRY_RESULT_ASSIGN(port, td::to_integer_safe<td::uint16>(arg));
return td::Status::OK();
});
p.add_checked_option('p', "port", "liteserver port (required only on first launch)",
[&](td::Slice arg) -> td::Status {
TRY_RESULT_ASSIGN(port, td::to_integer_safe<td::uint16>(arg));
return td::Status::OK();
});
p.add_checked_option(
'A', "adnl-id",
"liteserver public key hash in hex (optional). The corresponding private key is required in <db>/keyring/",
[&](td::Slice arg) -> td::Status {
td::Bits256 value;
if (value.from_hex(arg) != 256) {
return td::Status::Error("invalid adnl-id");
}
public_key_hash = PublicKeyHash{value};
return td::Status::OK();
});
p.add_option('C', "global-config", "global TON configuration file",
[&](td::Slice arg) { global_config = arg.str(); });
p.add_option('D', "db", "db root", [&](td::Slice arg) { db_root = arg.str(); });
p.add_option('d', "daemonize", "set SIGHUP", [&]() {
td::set_signal_handler(td::SignalType::HangUp, [](int sig) {
td::set_signal_handler(td::SignalType::HangUp, [](int) {
#if TD_DARWIN || TD_LINUX
close(0);
setsid();
@ -318,8 +389,10 @@ int main(int argc, char* argv[]) {
p.run(argc, argv).ensure();
td::actor::Scheduler scheduler({threads});
scheduler.run_in_context(
[&] { td::actor::create_actor<ProxyLiteserver>("proxy-liteserver", global_config, db_root, port).release(); });
scheduler.run_in_context([&] {
td::actor::create_actor<ProxyLiteserver>("proxy-liteserver", global_config, db_root, port, public_key_hash)
.release();
});
while (scheduler.run(1)) {
}
}