1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Synchronize last mc seqno between servers in proxy-liteserver

This commit is contained in:
SpyCheese 2024-09-10 20:32:05 +03:00
parent 9d927e8cfb
commit 9b4a3a8263

View file

@ -43,8 +43,12 @@
#if TD_DARWIN || TD_LINUX #if TD_DARWIN || TD_LINUX
#include <unistd.h> #include <unistd.h>
#endif #endif
#include "td/utils/overloaded.h"
#include <iostream> #include <iostream>
#include <map> #include <map>
#include <auto/tl/lite_api.hpp>
#include "td/utils/tl_storers.h"
using namespace ton; using namespace ton;
@ -252,26 +256,93 @@ class ProxyLiteserver : public td::actor::Actor {
} }
void receive_query(td::BufferSlice data, td::Promise<td::BufferSlice> promise) { void receive_query(td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
// Like in ValidatorManagerImpl::run_ext_query
auto F = fetch_tl_object<lite_api::liteServer_query>(data, true);
if (F.is_ok()) {
data = std::move(F.move_as_ok()->data_);
} else {
auto G = fetch_tl_prefix<lite_api::liteServer_queryPrefix>(data, true);
if (G.is_error()) {
promise.set_error(G.move_as_error());
return;
}
}
tl_object_ptr<lite_api::liteServer_waitMasterchainSeqno> wait_mc_seqno_obj;
auto E = fetch_tl_prefix<lite_api::liteServer_waitMasterchainSeqno>(data, true);
if (E.is_ok()) {
wait_mc_seqno_obj = E.move_as_ok();
}
liteclient::QueryInfo query_info = liteclient::get_query_info(data); liteclient::QueryInfo query_info = liteclient::get_query_info(data);
++ls_stats_[query_info.query_id]; ++ls_stats_[query_info.query_id];
promise = [promise = std::move(promise), query_info, timer = td::Timer()](td::Result<td::BufferSlice> R) mutable { promise = [promise = std::move(promise), query_info, timer = td::Timer(),
wait_mc_seqno =
(wait_mc_seqno_obj ? wait_mc_seqno_obj->seqno_ : 0)](td::Result<td::BufferSlice> R) mutable {
if (R.is_ok()) { if (R.is_ok()) {
LOG(INFO) << "Query " << query_info.to_str() << ": OK, time=" << timer.elapsed() LOG(INFO) << "Query " << query_info.to_str()
<< ", response_size=" << R.ok().size(); << (wait_mc_seqno ? PSTRING() << " (wait seqno " << wait_mc_seqno << ")" : "")
<< ": OK, time=" << timer.elapsed() << ", response_size=" << R.ok().size();
promise.set_value(R.move_as_ok()); promise.set_value(R.move_as_ok());
return; return;
} }
LOG(INFO) << "Query " << query_info.to_str() << ": " << R.error(); LOG(INFO) << "Query " << query_info.to_str()
<< (wait_mc_seqno ? PSTRING() << " (wait seqno " << wait_mc_seqno << ")" : "") << ": " << R.error();
promise.set_value(create_serialize_tl_object<lite_api::liteServer_error>( promise.set_value(create_serialize_tl_object<lite_api::liteServer_error>(
R.error().code(), "Gateway error: " + R.error().message().str())); R.error().code(), "Gateway error: " + R.error().message().str()));
}; };
TRY_RESULT_PROMISE(promise, server_idx, select_server(query_info));
TRY_RESULT_PROMISE(promise, server_idx, select_server(query_info));
Server& server = servers_[server_idx]; Server& server = servers_[server_idx];
LOG(INFO) << "Sending query " << query_info.to_str() << ", size=" << data.size() << ", to server #" << server_idx LOG(INFO) << "Sending query " << query_info.to_str()
<< " (" << server.config.addr.get_ip_str() << ":" << server.config.addr.get_port() << ")"; << (wait_mc_seqno_obj ? PSTRING() << " (wait seqno " << wait_mc_seqno_obj->seqno_ << ")" : "")
<< ", size=" << data.size() << ", to server #" << server_idx << " (" << server.config.addr.get_ip_str()
<< ":" << server.config.addr.get_port() << ")";
BlockSeqno wait_mc_seqno = wait_mc_seqno_obj ? wait_mc_seqno_obj->seqno_ : 0;
wait_mc_seqno = std::max(wait_mc_seqno, last_known_masterchain_seqno_);
if (server.last_known_masterchain_seqno < wait_mc_seqno) {
int timeout_ms = wait_mc_seqno_obj ? wait_mc_seqno_obj->timeout_ms_ : 8000;
data = serialize_tl_object(create_tl_object<lite_api::liteServer_waitMasterchainSeqno>(wait_mc_seqno, timeout_ms),
true, std::move(data));
}
data = create_serialize_tl_object<lite_api::liteServer_query>(std::move(data));
td::actor::send_closure(server.client, &adnl::AdnlExtClient::send_query, "q", std::move(data), td::actor::send_closure(server.client, &adnl::AdnlExtClient::send_query, "q", std::move(data),
td::Timestamp::in(8.0), std::move(promise)); td::Timestamp::in(8.0),
[SelfId = actor_id(this), promise = std::move(promise), server_idx,
wait_mc_seqno](td::Result<td::BufferSlice> R) mutable {
if (R.is_ok()) {
td::actor::send_closure(SelfId, &ProxyLiteserver::process_query_response,
R.ok().clone(), server_idx, wait_mc_seqno);
}
promise.set_result(std::move(R));
});
}
void process_query_response(td::BufferSlice data, size_t server_idx, BlockSeqno wait_mc_seqno) {
auto F = fetch_tl_object<lite_api::Object>(data, true);
if (F.is_error() || F.ok()->get_id() == lite_api::liteServer_error::ID) {
return;
}
BlockSeqno new_seqno = wait_mc_seqno;
lite_api::downcast_call(*F.ok_ref(), td::overloaded(
[&](lite_api::liteServer_masterchainInfo& f) {
new_seqno = std::max<BlockSeqno>(new_seqno, f.last_->seqno_);
},
[&](lite_api::liteServer_masterchainInfoExt& f) {
new_seqno = std::max<BlockSeqno>(new_seqno, f.last_->seqno_);
},
[&](lite_api::liteServer_accountState& f) {
if (f.id_->workchain_ == masterchainId) {
new_seqno = std::max<BlockSeqno>(new_seqno, f.id_->seqno_);
}
},
[&](auto& obj) {}));
servers_[server_idx].last_known_masterchain_seqno =
std::max(servers_[server_idx].last_known_masterchain_seqno, new_seqno);
if (new_seqno > last_known_masterchain_seqno_) {
last_known_masterchain_seqno_ = new_seqno;
LOG(INFO) << "Last known masterchain seqno = " << new_seqno;
}
} }
void alarm() override { void alarm() override {
@ -307,11 +378,15 @@ class ProxyLiteserver : public td::actor::Actor {
liteclient::LiteServerConfig config; liteclient::LiteServerConfig config;
td::actor::ActorOwn<adnl::AdnlExtClient> client; td::actor::ActorOwn<adnl::AdnlExtClient> client;
bool alive = false; bool alive = false;
BlockSeqno last_known_masterchain_seqno = 0;
}; };
std::vector<Server> servers_; std::vector<Server> servers_;
std::map<int, td::uint32> ls_stats_; // lite_api ID -> count, 0 for unknown std::map<int, td::uint32> ls_stats_; // lite_api ID -> count, 0 for unknown
BlockSeqno last_known_masterchain_seqno_ = 0;
tl_object_ptr<lite_api::liteServer_masterchainInfoExt> last_masterchain_info_;
std::string config_file() const { std::string config_file() const {
return db_root_ + "/config.json"; return db_root_ + "/config.json";
} }