mirror of
https://github.com/ton-blockchain/ton
synced 2025-02-12 11:12:16 +00:00
Tonproxy improvements (#483)
* Bugfixes in rldp-http-proxy and http parser * Tonlib: change liteservers on query timeout or connection close * Increase maximum size of http request * Minor bugfixes in http
This commit is contained in:
parent
cc9ce0eb28
commit
caffdbb5ba
23 changed files with 158 additions and 77 deletions
|
@ -80,6 +80,9 @@ class AdnlExtClientImpl : public AdnlExtClient {
|
||||||
if (!conn_.empty() && conn_.get() == conn) {
|
if (!conn_.empty() && conn_.get() == conn) {
|
||||||
callback_->on_stop_ready();
|
callback_->on_stop_ready();
|
||||||
conn_ = {};
|
conn_ = {};
|
||||||
|
for (auto& q : out_queries_) {
|
||||||
|
td::actor::send_closure(q.second, &AdnlQuery::set_error, td::Status::Error(ErrorCode::cancelled));
|
||||||
|
}
|
||||||
alarm_timestamp() = next_create_at_;
|
alarm_timestamp() = next_create_at_;
|
||||||
try_stop();
|
try_stop();
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,13 +25,16 @@ namespace ton {
|
||||||
namespace adnl {
|
namespace adnl {
|
||||||
|
|
||||||
void AdnlQuery::alarm() {
|
void AdnlQuery::alarm() {
|
||||||
promise_.set_error(td::Status::Error(ErrorCode::timeout, "adnl query timeout"));
|
set_error(td::Status::Error(ErrorCode::timeout, "adnl query timeout"));
|
||||||
stop();
|
|
||||||
}
|
}
|
||||||
void AdnlQuery::result(td::BufferSlice data) {
|
void AdnlQuery::result(td::BufferSlice data) {
|
||||||
promise_.set_value(std::move(data));
|
promise_.set_value(std::move(data));
|
||||||
stop();
|
stop();
|
||||||
}
|
}
|
||||||
|
void AdnlQuery::set_error(td::Status error) {
|
||||||
|
promise_.set_error(std::move(error));
|
||||||
|
stop();
|
||||||
|
}
|
||||||
|
|
||||||
AdnlQueryId AdnlQuery::random_query_id() {
|
AdnlQueryId AdnlQuery::random_query_id() {
|
||||||
AdnlQueryId q_id;
|
AdnlQueryId q_id;
|
||||||
|
|
|
@ -48,6 +48,7 @@ class AdnlQuery : public td::actor::Actor {
|
||||||
}
|
}
|
||||||
void alarm() override;
|
void alarm() override;
|
||||||
void result(td::BufferSlice data);
|
void result(td::BufferSlice data);
|
||||||
|
void set_error(td::Status error);
|
||||||
void start_up() override {
|
void start_up() override {
|
||||||
alarm_timestamp() = timeout_;
|
alarm_timestamp() = timeout_;
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,10 +79,10 @@ td::Status HttpInboundConnection::receive(td::ChainBufferReader &input) {
|
||||||
send_client_error();
|
send_client_error();
|
||||||
return td::Status::OK();
|
return td::Status::OK();
|
||||||
}
|
}
|
||||||
|
cur_request_ = R.move_as_ok();
|
||||||
if (exit_loop) {
|
if (exit_loop) {
|
||||||
return td::Status::OK();
|
return td::Status::OK();
|
||||||
}
|
}
|
||||||
cur_request_ = R.move_as_ok();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
auto payload = cur_request_->create_empty_payload().move_as_ok();
|
auto payload = cur_request_->create_empty_payload().move_as_ok();
|
||||||
|
|
|
@ -42,10 +42,10 @@ td::Status HttpOutboundConnection::receive(td::ChainBufferReader &input) {
|
||||||
answer_error(HttpStatusCode::status_bad_request, "", std::move(promise_));
|
answer_error(HttpStatusCode::status_bad_request, "", std::move(promise_));
|
||||||
return td::Status::OK();
|
return td::Status::OK();
|
||||||
}
|
}
|
||||||
|
cur_response_ = R.move_as_ok();
|
||||||
if (exit_loop) {
|
if (exit_loop) {
|
||||||
return td::Status::OK();
|
return td::Status::OK();
|
||||||
}
|
}
|
||||||
cur_response_ = R.move_as_ok();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cur_response_->code() == 100) {
|
if (cur_response_->code() == 100) {
|
||||||
|
|
|
@ -279,25 +279,20 @@ td::Status HttpPayload::parse(td::ChainBufferReader &input) {
|
||||||
} break;
|
} break;
|
||||||
case ParseState::reading_chunk_data: {
|
case ParseState::reading_chunk_data: {
|
||||||
if (cur_chunk_size_ == 0) {
|
if (cur_chunk_size_ == 0) {
|
||||||
switch (type_) {
|
if (type_ == PayloadType::pt_eof || type_ == PayloadType::pt_tunnel) {
|
||||||
case PayloadType::pt_empty:
|
|
||||||
UNREACHABLE();
|
|
||||||
case PayloadType::pt_eof:
|
|
||||||
case PayloadType::pt_tunnel:
|
|
||||||
cur_chunk_size_ = 1LL << 60;
|
cur_chunk_size_ = 1LL << 60;
|
||||||
break;
|
} else if (type_ == PayloadType::pt_chunked) {
|
||||||
case PayloadType::pt_chunked:
|
|
||||||
state_ = ParseState::reading_crlf;
|
state_ = ParseState::reading_crlf;
|
||||||
break;
|
break;
|
||||||
case PayloadType::pt_content_length: {
|
} else if (type_ == PayloadType::pt_content_length) {
|
||||||
LOG(INFO) << "payload parse success";
|
LOG(INFO) << "payload parse success";
|
||||||
const std::lock_guard<std::mutex> lock{mutex_};
|
const std::lock_guard<std::mutex> lock{mutex_};
|
||||||
state_ = ParseState::completed;
|
state_ = ParseState::completed;
|
||||||
run_callbacks();
|
run_callbacks();
|
||||||
return td::Status::OK();
|
return td::Status::OK();
|
||||||
} break;
|
} else {
|
||||||
|
UNREACHABLE();
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
if (input.size() == 0) {
|
if (input.size() == 0) {
|
||||||
return td::Status::OK();
|
return td::Status::OK();
|
||||||
|
@ -502,7 +497,7 @@ bool HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, Htt
|
||||||
char buf[64];
|
char buf[64];
|
||||||
::sprintf(buf, "%lx\r\n", s.size());
|
::sprintf(buf, "%lx\r\n", s.size());
|
||||||
auto slice = td::Slice(buf, strlen(buf));
|
auto slice = td::Slice(buf, strlen(buf));
|
||||||
wrote |= !slice.empty();
|
wrote = true;
|
||||||
output.append(slice);
|
output.append(slice);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -514,7 +509,8 @@ bool HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, Htt
|
||||||
wrote = true;
|
wrote = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (chunks_.size() != 0 || !parse_completed()) {
|
auto cur_state = state_.load(std::memory_order_consume);
|
||||||
|
if (chunks_.size() != 0 || (cur_state != ParseState::reading_trailer && cur_state != ParseState::completed)) {
|
||||||
return wrote;
|
return wrote;
|
||||||
}
|
}
|
||||||
if (!written_zero_chunk_) {
|
if (!written_zero_chunk_) {
|
||||||
|
@ -531,7 +527,7 @@ bool HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, Htt
|
||||||
}
|
}
|
||||||
|
|
||||||
while (max_size > 0) {
|
while (max_size > 0) {
|
||||||
auto cur_state = state_.load(std::memory_order_consume);
|
cur_state = state_.load(std::memory_order_consume);
|
||||||
HttpHeader h = get_header();
|
HttpHeader h = get_header();
|
||||||
if (h.empty()) {
|
if (h.empty()) {
|
||||||
if (cur_state != ParseState::completed) {
|
if (cur_state != ParseState::completed) {
|
||||||
|
@ -587,7 +583,8 @@ tl_object_ptr<ton_api::http_payloadPart> HttpPayload::store_tl(size_t max_size)
|
||||||
max_size -= s.size();
|
max_size -= s.size();
|
||||||
}
|
}
|
||||||
obj->data_.truncate(obj->data_.size() - S.size());
|
obj->data_.truncate(obj->data_.size() - S.size());
|
||||||
if (chunks_.size() != 0) {
|
auto cur_state = state_.load(std::memory_order_consume);
|
||||||
|
if (chunks_.size() != 0 || (cur_state != ParseState::reading_trailer && cur_state != ParseState::completed)) {
|
||||||
return obj;
|
return obj;
|
||||||
}
|
}
|
||||||
if (!written_zero_chunk_) {
|
if (!written_zero_chunk_) {
|
||||||
|
@ -597,7 +594,7 @@ tl_object_ptr<ton_api::http_payloadPart> HttpPayload::store_tl(size_t max_size)
|
||||||
LOG(INFO) << "data completed";
|
LOG(INFO) << "data completed";
|
||||||
|
|
||||||
while (max_size > 0) {
|
while (max_size > 0) {
|
||||||
auto cur_state = state_.load(std::memory_order_consume);
|
cur_state = state_.load(std::memory_order_consume);
|
||||||
HttpHeader h = get_header();
|
HttpHeader h = get_header();
|
||||||
if (h.empty()) {
|
if (h.empty()) {
|
||||||
if (cur_state != ParseState::completed) {
|
if (cur_state != ParseState::completed) {
|
||||||
|
@ -869,7 +866,7 @@ td::Status HttpHeader::basic_check() {
|
||||||
}
|
}
|
||||||
for (auto &c : value) {
|
for (auto &c : value) {
|
||||||
if (c == '\r' || c == '\n') {
|
if (c == '\r' || c == '\n') {
|
||||||
return td::Status::Error("bad character in header name");
|
return td::Status::Error("bad character in header value");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return td::Status::OK();
|
return td::Status::OK();
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
*/
|
*/
|
||||||
#include "DNSResolver.h"
|
#include "DNSResolver.h"
|
||||||
#include "td/utils/overloaded.h"
|
#include "td/utils/overloaded.h"
|
||||||
|
#include "common/delay.h"
|
||||||
|
|
||||||
static const double CACHE_TIMEOUT_HARD = 300.0;
|
static const double CACHE_TIMEOUT_HARD = 300.0;
|
||||||
static const double CACHE_TIMEOUT_SOFT = 270.0;
|
static const double CACHE_TIMEOUT_SOFT = 270.0;
|
||||||
|
@ -33,8 +34,18 @@ DNSResolver::DNSResolver(td::actor::ActorId<TonlibClient> tonlib_client) : tonli
|
||||||
}
|
}
|
||||||
|
|
||||||
void DNSResolver::start_up() {
|
void DNSResolver::start_up() {
|
||||||
|
sync();
|
||||||
|
}
|
||||||
|
|
||||||
|
void DNSResolver::sync() {
|
||||||
auto obj = tonlib_api::make_object<tonlib_api::sync>();
|
auto obj = tonlib_api::make_object<tonlib_api::sync>();
|
||||||
auto P = td::PromiseCreator::lambda([](td::Result<tonlib_api::object_ptr<tonlib_api::Object>>) {});
|
auto P = td::PromiseCreator::lambda([SelfId =
|
||||||
|
actor_id(this)](td::Result<tonlib_api::object_ptr<tonlib_api::Object>> R) {
|
||||||
|
if (R.is_error()) {
|
||||||
|
LOG(WARNING) << "Sync error: " << R.move_as_error();
|
||||||
|
ton::delay_action([SelfId]() { td::actor::send_closure(SelfId, &DNSResolver::sync); }, td::Timestamp::in(5.0));
|
||||||
|
}
|
||||||
|
});
|
||||||
td::actor::send_closure(tonlib_client_, &TonlibClient::send_request, std::move(obj), std::move(P));
|
td::actor::send_closure(tonlib_client_, &TonlibClient::send_request, std::move(obj), std::move(P));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ class DNSResolver : public td::actor::Actor {
|
||||||
void resolve(std::string host, td::Promise<ton::adnl::AdnlNodeIdShort> promise);
|
void resolve(std::string host, td::Promise<ton::adnl::AdnlNodeIdShort> promise);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
void sync();
|
||||||
void save_to_cache(std::string host, ton::adnl::AdnlNodeIdShort id);
|
void save_to_cache(std::string host, ton::adnl::AdnlNodeIdShort id);
|
||||||
|
|
||||||
td::actor::ActorId<TonlibClient> tonlib_client_;
|
td::actor::ActorId<TonlibClient> tonlib_client_;
|
||||||
|
|
|
@ -117,7 +117,7 @@ class HttpRemote : public td::actor::Actor {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
td::actor::send_closure(client_, &ton::http::HttpClient::send_request, std::move(request), std::move(payload),
|
td::actor::send_closure(client_, &ton::http::HttpClient::send_request, std::move(request), std::move(payload),
|
||||||
td::Timestamp::in(30.0), std::move(P));
|
td::Timestamp::never(), std::move(P));
|
||||||
} else {
|
} else {
|
||||||
ton::http::answer_error(ton::http::HttpStatusCode::status_bad_request, "", std::move(promise));
|
ton::http::answer_error(ton::http::HttpStatusCode::status_bad_request, "", std::move(promise));
|
||||||
}
|
}
|
||||||
|
@ -801,6 +801,7 @@ class RldpToTcpRequestSender : public td::actor::Actor {
|
||||||
, dst_(dst)
|
, dst_(dst)
|
||||||
, request_(std::move(request))
|
, request_(std::move(request))
|
||||||
, request_payload_(std::move(request_payload))
|
, request_payload_(std::move(request_payload))
|
||||||
|
, proto_version_(request_->proto_version())
|
||||||
, promise_(std::move(promise))
|
, promise_(std::move(promise))
|
||||||
, adnl_(adnl)
|
, adnl_(adnl)
|
||||||
, rldp_(rldp)
|
, rldp_(rldp)
|
||||||
|
@ -824,11 +825,9 @@ class RldpToTcpRequestSender : public td::actor::Actor {
|
||||||
}
|
}
|
||||||
|
|
||||||
void got_result(std::pair<std::unique_ptr<ton::http::HttpResponse>, std::shared_ptr<ton::http::HttpPayload>> R) {
|
void got_result(std::pair<std::unique_ptr<ton::http::HttpResponse>, std::shared_ptr<ton::http::HttpPayload>> R) {
|
||||||
if (R.first->need_payload()) {
|
|
||||||
td::actor::create_actor<HttpRldpPayloadSender>("HttpPayloadSender(R)", std::move(R.second), id_, local_id_, adnl_,
|
td::actor::create_actor<HttpRldpPayloadSender>("HttpPayloadSender(R)", std::move(R.second), id_, local_id_, adnl_,
|
||||||
rldp_)
|
rldp_)
|
||||||
.release();
|
.release();
|
||||||
}
|
|
||||||
auto f = ton::serialize_tl_object(R.first->store_tl(), true);
|
auto f = ton::serialize_tl_object(R.first->store_tl(), true);
|
||||||
promise_.set_value(std::move(f));
|
promise_.set_value(std::move(f));
|
||||||
stop();
|
stop();
|
||||||
|
@ -836,7 +835,7 @@ class RldpToTcpRequestSender : public td::actor::Actor {
|
||||||
|
|
||||||
void abort_query(td::Status error) {
|
void abort_query(td::Status error) {
|
||||||
LOG(INFO) << "aborting http over rldp query: " << error;
|
LOG(INFO) << "aborting http over rldp query: " << error;
|
||||||
promise_.set_result(create_error_response(request_->proto_version(), 502, "Bad Gateway"));
|
promise_.set_result(create_error_response(proto_version_, 502, "Bad Gateway"));
|
||||||
stop();
|
stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -848,6 +847,7 @@ class RldpToTcpRequestSender : public td::actor::Actor {
|
||||||
|
|
||||||
std::unique_ptr<ton::http::HttpRequest> request_;
|
std::unique_ptr<ton::http::HttpRequest> request_;
|
||||||
std::shared_ptr<ton::http::HttpPayload> request_payload_;
|
std::shared_ptr<ton::http::HttpPayload> request_payload_;
|
||||||
|
std::string proto_version_;
|
||||||
|
|
||||||
td::Promise<td::BufferSlice> promise_;
|
td::Promise<td::BufferSlice> promise_;
|
||||||
|
|
||||||
|
@ -1090,6 +1090,7 @@ class RldpHttpProxy : public td::actor::Actor {
|
||||||
}
|
}
|
||||||
|
|
||||||
rldp_ = ton::rldp::Rldp::create(adnl_.get());
|
rldp_ = ton::rldp::Rldp::create(adnl_.get());
|
||||||
|
td::actor::send_closure(rldp_, &ton::rldp::Rldp::set_default_mtu, 16 << 10);
|
||||||
td::actor::send_closure(rldp_, &ton::rldp::Rldp::add_id, local_id_);
|
td::actor::send_closure(rldp_, &ton::rldp::Rldp::add_id, local_id_);
|
||||||
for (auto &serv_id : server_ids_) {
|
for (auto &serv_id : server_ids_) {
|
||||||
td::actor::send_closure(rldp_, &ton::rldp::Rldp::add_id, serv_id);
|
td::actor::send_closure(rldp_, &ton::rldp::Rldp::add_id, serv_id);
|
||||||
|
|
|
@ -71,7 +71,7 @@ class RldpIn : public RldpImpl {
|
||||||
|
|
||||||
void send_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, std::string name,
|
void send_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, std::string name,
|
||||||
td::Promise<td::BufferSlice> promise, td::Timestamp timeout, td::BufferSlice data) override {
|
td::Promise<td::BufferSlice> promise, td::Timestamp timeout, td::BufferSlice data) override {
|
||||||
send_query_ex(src, dst, name, std::move(promise), timeout, std::move(data), default_mtu());
|
send_query_ex(src, dst, name, std::move(promise), timeout, std::move(data), default_mtu_);
|
||||||
}
|
}
|
||||||
void send_query_ex(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, std::string name,
|
void send_query_ex(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, std::string name,
|
||||||
td::Promise<td::BufferSlice> promise, td::Timestamp timeout, td::BufferSlice data,
|
td::Promise<td::BufferSlice> promise, td::Timestamp timeout, td::BufferSlice data,
|
||||||
|
@ -101,6 +101,10 @@ class RldpIn : public RldpImpl {
|
||||||
void add_id(adnl::AdnlNodeIdShort local_id) override;
|
void add_id(adnl::AdnlNodeIdShort local_id) override;
|
||||||
void get_conn_ip_str(adnl::AdnlNodeIdShort l_id, adnl::AdnlNodeIdShort p_id, td::Promise<td::string> promise) override;
|
void get_conn_ip_str(adnl::AdnlNodeIdShort l_id, adnl::AdnlNodeIdShort p_id, td::Promise<td::string> promise) override;
|
||||||
|
|
||||||
|
void set_default_mtu(td::uint64 mtu) override {
|
||||||
|
default_mtu_ = mtu;
|
||||||
|
}
|
||||||
|
|
||||||
RldpIn(td::actor::ActorId<adnl::AdnlPeerTable> adnl) : adnl_(adnl) {
|
RldpIn(td::actor::ActorId<adnl::AdnlPeerTable> adnl) : adnl_(adnl) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,6 +120,7 @@ class RldpIn : public RldpImpl {
|
||||||
std::set<TransferId> lru_set_;
|
std::set<TransferId> lru_set_;
|
||||||
RldpLru lru_;
|
RldpLru lru_;
|
||||||
td::uint32 lru_size_ = 0;
|
td::uint32 lru_size_ = 0;
|
||||||
|
td::uint64 default_mtu_ = adnl::Adnl::get_mtu();
|
||||||
|
|
||||||
std::map<TransferId, td::uint64> max_size_;
|
std::map<TransferId, td::uint64> max_size_;
|
||||||
|
|
||||||
|
|
|
@ -116,9 +116,9 @@ void RldpIn::process_message_part(adnl::AdnlNodeIdShort source, adnl::AdnlNodeId
|
||||||
}
|
}
|
||||||
auto ite = max_size_.find(part.transfer_id_);
|
auto ite = max_size_.find(part.transfer_id_);
|
||||||
if (ite == max_size_.end()) {
|
if (ite == max_size_.end()) {
|
||||||
if (static_cast<td::uint64>(part.total_size_) > default_mtu()) {
|
if (static_cast<td::uint64>(part.total_size_) > default_mtu_) {
|
||||||
VLOG(RLDP_NOTICE) << "dropping too big rldp packet of size=" << part.total_size_
|
VLOG(RLDP_NOTICE) << "dropping too big rldp packet of size=" << part.total_size_
|
||||||
<< " default_mtu=" << default_mtu();
|
<< " default_mtu=" << default_mtu_;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -28,15 +28,13 @@ class Rldp : public adnl::AdnlSenderInterface {
|
||||||
public:
|
public:
|
||||||
virtual ~Rldp() = default;
|
virtual ~Rldp() = default;
|
||||||
|
|
||||||
static constexpr td::uint64 default_mtu() {
|
|
||||||
return adnl::Adnl::get_mtu();
|
|
||||||
}
|
|
||||||
|
|
||||||
virtual void add_id(adnl::AdnlNodeIdShort local_id) = 0;
|
virtual void add_id(adnl::AdnlNodeIdShort local_id) = 0;
|
||||||
|
|
||||||
virtual void send_message_ex(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::Timestamp timeout,
|
virtual void send_message_ex(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::Timestamp timeout,
|
||||||
td::BufferSlice data) = 0;
|
td::BufferSlice data) = 0;
|
||||||
|
|
||||||
|
virtual void set_default_mtu(td::uint64 mtu) = 0;
|
||||||
|
|
||||||
static td::actor::ActorOwn<Rldp> create(td::actor::ActorId<adnl::Adnl> adnl);
|
static td::actor::ActorOwn<Rldp> create(td::actor::ActorId<adnl::Adnl> adnl);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@ void ExtClient::with_last_config(td::Promise<LastConfigState> promise) {
|
||||||
self->last_config_queries_.extract(query_id).set_result(std::move(result));
|
self->last_config_queries_.extract(query_id).set_result(std::move(result));
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
if (client_.last_block_actor_.empty()) {
|
if (client_.last_config_actor_.empty()) {
|
||||||
return P.set_error(TonlibError::NoLiteServers());
|
return P.set_error(TonlibError::NoLiteServers());
|
||||||
}
|
}
|
||||||
td::actor::send_closure(client_.last_config_actor_, &LastConfig::get_last_config, std::move(P));
|
td::actor::send_closure(client_.last_config_actor_, &LastConfig::get_last_config, std::move(P));
|
||||||
|
@ -62,10 +62,10 @@ void ExtClient::send_raw_query(td::BufferSlice query, td::Promise<td::BufferSlic
|
||||||
self->queries_.extract(query_id).set_result(std::move(result));
|
self->queries_.extract(query_id).set_result(std::move(result));
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
if (client_.andl_ext_client_.empty()) {
|
if (client_.adnl_ext_client_.empty()) {
|
||||||
return P.set_error(TonlibError::NoLiteServers());
|
return P.set_error(TonlibError::NoLiteServers());
|
||||||
}
|
}
|
||||||
td::actor::send_closure(client_.andl_ext_client_, &ton::adnl::AdnlExtClient::send_query, "query", std::move(query),
|
td::actor::send_closure(client_.adnl_ext_client_, &ton::adnl::AdnlExtClient::send_query, "query", std::move(query),
|
||||||
td::Timestamp::in(10.0), std::move(P));
|
td::Timestamp::in(10.0), std::move(P));
|
||||||
}
|
}
|
||||||
} // namespace tonlib
|
} // namespace tonlib
|
||||||
|
|
|
@ -28,6 +28,7 @@
|
||||||
#include "td/utils/Container.h"
|
#include "td/utils/Container.h"
|
||||||
#include "td/utils/Random.h"
|
#include "td/utils/Random.h"
|
||||||
|
|
||||||
|
#include "ExtClientLazy.h"
|
||||||
#include "TonlibError.h"
|
#include "TonlibError.h"
|
||||||
#include "utils.h"
|
#include "utils.h"
|
||||||
|
|
||||||
|
@ -37,7 +38,7 @@ class LastConfig;
|
||||||
struct LastBlockState;
|
struct LastBlockState;
|
||||||
struct LastConfigState;
|
struct LastConfigState;
|
||||||
struct ExtClientRef {
|
struct ExtClientRef {
|
||||||
td::actor::ActorId<ton::adnl::AdnlExtClient> andl_ext_client_;
|
td::actor::ActorId<ExtClientLazy> adnl_ext_client_;
|
||||||
td::actor::ActorId<LastBlock> last_block_actor_;
|
td::actor::ActorId<LastBlock> last_block_actor_;
|
||||||
td::actor::ActorId<LastConfig> last_config_actor_;
|
td::actor::ActorId<LastConfig> last_config_actor_;
|
||||||
};
|
};
|
||||||
|
@ -94,6 +95,12 @@ class ExtClient {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void force_change_liteserver() {
|
||||||
|
if (!client_.adnl_ext_client_.empty()) {
|
||||||
|
td::actor::send_closure(client_.adnl_ext_client_, &ExtClientLazy::force_change_liteserver);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ExtClientRef client_;
|
ExtClientRef client_;
|
||||||
td::Container<td::Promise<td::BufferSlice>> queries_;
|
td::Container<td::Promise<td::BufferSlice>> queries_;
|
||||||
|
|
|
@ -18,13 +18,20 @@
|
||||||
*/
|
*/
|
||||||
#include "ExtClientLazy.h"
|
#include "ExtClientLazy.h"
|
||||||
#include "TonlibError.h"
|
#include "TonlibError.h"
|
||||||
|
#include "td/utils/Random.h"
|
||||||
namespace tonlib {
|
namespace tonlib {
|
||||||
|
|
||||||
class ExtClientLazyImp : public ton::adnl::AdnlExtClient {
|
class ExtClientLazyImp : public ExtClientLazy {
|
||||||
public:
|
public:
|
||||||
ExtClientLazyImp(ton::adnl::AdnlNodeIdFull dst, td::IPAddress dst_addr,
|
ExtClientLazyImp(std::vector<std::pair<ton::adnl::AdnlNodeIdFull, td::IPAddress>> servers,
|
||||||
td::unique_ptr<ExtClientLazy::Callback> callback)
|
td::unique_ptr<ExtClientLazy::Callback> callback)
|
||||||
: dst_(std::move(dst)), dst_addr_(std::move(dst_addr)), callback_(std::move(callback)) {
|
: servers_(std::move(servers)), callback_(std::move(callback)) {
|
||||||
|
CHECK(!servers_.empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
void start_up() override {
|
||||||
|
td::Random::Fast rnd;
|
||||||
|
td::random_shuffle(td::as_mutable_span(servers_), rnd);
|
||||||
}
|
}
|
||||||
|
|
||||||
void check_ready(td::Promise<td::Unit> promise) override {
|
void check_ready(td::Promise<td::Unit> promise) override {
|
||||||
|
@ -41,37 +48,66 @@ class ExtClientLazyImp : public ton::adnl::AdnlExtClient {
|
||||||
if (client_.empty()) {
|
if (client_.empty()) {
|
||||||
return promise.set_error(TonlibError::Cancelled());
|
return promise.set_error(TonlibError::Cancelled());
|
||||||
}
|
}
|
||||||
|
td::Promise<td::BufferSlice> P = [SelfId = actor_id(this), idx = cur_server_idx_,
|
||||||
|
promise = std::move(promise)](td::Result<td::BufferSlice> R) mutable {
|
||||||
|
if (R.is_error() &&
|
||||||
|
(R.error().code() == ton::ErrorCode::timeout || R.error().code() == ton::ErrorCode::cancelled)) {
|
||||||
|
td::actor::send_closure(SelfId, &ExtClientLazyImp::set_server_bad, idx, true);
|
||||||
|
}
|
||||||
|
promise.set_result(std::move(R));
|
||||||
|
};
|
||||||
send_closure(client_, &ton::adnl::AdnlExtClient::send_query, std::move(name), std::move(data), timeout,
|
send_closure(client_, &ton::adnl::AdnlExtClient::send_query, std::move(name), std::move(data), timeout,
|
||||||
std::move(promise));
|
std::move(P));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void force_change_liteserver() override {
|
||||||
|
if (servers_.size() == 1) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
cur_server_bad_ = cur_server_bad_force_ = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
void before_query() {
|
void before_query() {
|
||||||
if (is_closing_) {
|
if (is_closing_) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (!client_.empty()) {
|
|
||||||
alarm_timestamp() = td::Timestamp::in(MAX_NO_QUERIES_TIMEOUT);
|
alarm_timestamp() = td::Timestamp::in(MAX_NO_QUERIES_TIMEOUT);
|
||||||
|
if (cur_server_bad_) {
|
||||||
|
++cur_server_idx_;
|
||||||
|
} else if (!client_.empty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
class Callback : public ton::adnl::AdnlExtClient::Callback {
|
class Callback : public ton::adnl::AdnlExtClient::Callback {
|
||||||
public:
|
public:
|
||||||
explicit Callback(td::actor::ActorShared<> parent) : parent_(std::move(parent)) {
|
explicit Callback(td::actor::ActorShared<ExtClientLazyImp> parent, size_t idx)
|
||||||
|
: parent_(std::move(parent)), idx_(idx) {
|
||||||
}
|
}
|
||||||
void on_ready() override {
|
void on_ready() override {
|
||||||
|
td::actor::send_closure(parent_, &ExtClientLazyImp::set_server_bad, idx_, false);
|
||||||
}
|
}
|
||||||
void on_stop_ready() override {
|
void on_stop_ready() override {
|
||||||
|
td::actor::send_closure(parent_, &ExtClientLazyImp::set_server_bad, idx_, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
td::actor::ActorShared<> parent_;
|
td::actor::ActorShared<ExtClientLazyImp> parent_;
|
||||||
|
size_t idx_;
|
||||||
};
|
};
|
||||||
ref_cnt_++;
|
ref_cnt_++;
|
||||||
client_ = ton::adnl::AdnlExtClient::create(dst_, dst_addr_, std::make_unique<Callback>(td::actor::actor_shared()));
|
cur_server_bad_ = false;
|
||||||
|
cur_server_bad_force_ = false;
|
||||||
|
const auto& s = servers_[cur_server_idx_ % servers_.size()];
|
||||||
|
LOG(INFO) << "Connecting to liteserver " << s.second;
|
||||||
|
client_ = ton::adnl::AdnlExtClient::create(
|
||||||
|
s.first, s.second, std::make_unique<Callback>(td::actor::actor_shared(this), cur_server_idx_));
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
std::vector<std::pair<ton::adnl::AdnlNodeIdFull, td::IPAddress>> servers_;
|
||||||
ton::adnl::AdnlNodeIdFull dst_;
|
size_t cur_server_idx_ = 0;
|
||||||
td::IPAddress dst_addr_;
|
bool cur_server_bad_ = false;
|
||||||
|
bool cur_server_bad_force_ = false;
|
||||||
|
|
||||||
td::actor::ActorOwn<ton::adnl::AdnlExtClient> client_;
|
td::actor::ActorOwn<ton::adnl::AdnlExtClient> client_;
|
||||||
td::unique_ptr<ExtClientLazy::Callback> callback_;
|
td::unique_ptr<ExtClientLazy::Callback> callback_;
|
||||||
static constexpr double MAX_NO_QUERIES_TIMEOUT = 100;
|
static constexpr double MAX_NO_QUERIES_TIMEOUT = 100;
|
||||||
|
@ -79,6 +115,11 @@ class ExtClientLazyImp : public ton::adnl::AdnlExtClient {
|
||||||
bool is_closing_{false};
|
bool is_closing_{false};
|
||||||
td::uint32 ref_cnt_{1};
|
td::uint32 ref_cnt_{1};
|
||||||
|
|
||||||
|
void set_server_bad(size_t idx, bool bad) {
|
||||||
|
if (idx == cur_server_idx_ && servers_.size() > 1 && !cur_server_bad_force_) {
|
||||||
|
cur_server_bad_ = bad;
|
||||||
|
}
|
||||||
|
}
|
||||||
void alarm() override {
|
void alarm() override {
|
||||||
client_.reset();
|
client_.reset();
|
||||||
}
|
}
|
||||||
|
@ -99,9 +140,13 @@ class ExtClientLazyImp : public ton::adnl::AdnlExtClient {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
td::actor::ActorOwn<ton::adnl::AdnlExtClient> ExtClientLazy::create(ton::adnl::AdnlNodeIdFull dst,
|
td::actor::ActorOwn<ExtClientLazy> ExtClientLazy::create(ton::adnl::AdnlNodeIdFull dst, td::IPAddress dst_addr,
|
||||||
td::IPAddress dst_addr,
|
|
||||||
td::unique_ptr<Callback> callback) {
|
td::unique_ptr<Callback> callback) {
|
||||||
return td::actor::create_actor<ExtClientLazyImp>("ExtClientLazy", dst, dst_addr, std::move(callback));
|
return create({std::make_pair(dst, dst_addr)}, std::move(callback));
|
||||||
|
}
|
||||||
|
|
||||||
|
td::actor::ActorOwn<ExtClientLazy> ExtClientLazy::create(
|
||||||
|
std::vector<std::pair<ton::adnl::AdnlNodeIdFull, td::IPAddress>> servers, td::unique_ptr<Callback> callback) {
|
||||||
|
return td::actor::create_actor<ExtClientLazyImp>("ExtClientLazy", std::move(servers), std::move(callback));
|
||||||
}
|
}
|
||||||
} // namespace tonlib
|
} // namespace tonlib
|
||||||
|
|
|
@ -22,15 +22,20 @@
|
||||||
#include "adnl/adnl-ext-client.h"
|
#include "adnl/adnl-ext-client.h"
|
||||||
|
|
||||||
namespace tonlib {
|
namespace tonlib {
|
||||||
class ExtClientLazy {
|
class ExtClientLazy : public ton::adnl::AdnlExtClient {
|
||||||
public:
|
public:
|
||||||
class Callback {
|
class Callback {
|
||||||
public:
|
public:
|
||||||
virtual ~Callback() {
|
virtual ~Callback() {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
static td::actor::ActorOwn<ton::adnl::AdnlExtClient> create(ton::adnl::AdnlNodeIdFull dst, td::IPAddress dst_addr,
|
|
||||||
|
virtual void force_change_liteserver() = 0;
|
||||||
|
|
||||||
|
static td::actor::ActorOwn<ExtClientLazy> create(ton::adnl::AdnlNodeIdFull dst, td::IPAddress dst_addr,
|
||||||
td::unique_ptr<Callback> callback);
|
td::unique_ptr<Callback> callback);
|
||||||
|
static td::actor::ActorOwn<ExtClientLazy> create(
|
||||||
|
std::vector<std::pair<ton::adnl::AdnlNodeIdFull, td::IPAddress>> servers, td::unique_ptr<Callback> callback);
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace tonlib
|
} // namespace tonlib
|
||||||
|
|
|
@ -38,6 +38,9 @@ class ExtClientOutboundImp : public ExtClientOutbound {
|
||||||
callback_->request(query_id, data.as_slice().str());
|
callback_->request(query_id, data.as_slice().str());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void force_change_liteserver() override {
|
||||||
|
}
|
||||||
|
|
||||||
void on_query_result(td::int64 id, td::Result<td::BufferSlice> r_data, td::Promise<td::Unit> promise) override {
|
void on_query_result(td::int64 id, td::Result<td::BufferSlice> r_data, td::Promise<td::Unit> promise) override {
|
||||||
auto it = queries_.find(id);
|
auto it = queries_.find(id);
|
||||||
if (it == queries_.end()) {
|
if (it == queries_.end()) {
|
||||||
|
|
|
@ -19,10 +19,10 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
#include "td/actor/actor.h"
|
#include "td/actor/actor.h"
|
||||||
|
|
||||||
#include "adnl/adnl-ext-client.h"
|
#include "ExtClientLazy.h"
|
||||||
|
|
||||||
namespace tonlib {
|
namespace tonlib {
|
||||||
class ExtClientOutbound : public ton::adnl::AdnlExtClient {
|
class ExtClientOutbound : public ExtClientLazy {
|
||||||
public:
|
public:
|
||||||
class Callback {
|
class Callback {
|
||||||
public:
|
public:
|
||||||
|
|
|
@ -374,6 +374,7 @@ void LastBlock::on_sync_error(td::Status status) {
|
||||||
promise.set_error(status.clone());
|
promise.set_error(status.clone());
|
||||||
}
|
}
|
||||||
promises_.clear();
|
promises_.clear();
|
||||||
|
client_.force_change_liteserver();
|
||||||
}
|
}
|
||||||
void LastBlock::on_fatal_error(td::Status status) {
|
void LastBlock::on_fatal_error(td::Status status) {
|
||||||
VLOG(last_block) << "sync: fatal error " << status;
|
VLOG(last_block) << "sync: fatal error " << status;
|
||||||
|
|
|
@ -141,6 +141,7 @@ void LastConfig::on_error(td::Status status) {
|
||||||
promise.set_error(status.clone());
|
promise.set_error(status.clone());
|
||||||
}
|
}
|
||||||
promises_.clear();
|
promises_.clear();
|
||||||
|
get_config_state_ = QueryState::Empty;
|
||||||
}
|
}
|
||||||
|
|
||||||
void LastConfig::tear_down() {
|
void LastConfig::tear_down() {
|
||||||
|
|
|
@ -1649,7 +1649,7 @@ void TonlibClient::hangup() {
|
||||||
|
|
||||||
ExtClientRef TonlibClient::get_client_ref() {
|
ExtClientRef TonlibClient::get_client_ref() {
|
||||||
ExtClientRef ref;
|
ExtClientRef ref;
|
||||||
ref.andl_ext_client_ = raw_client_.get();
|
ref.adnl_ext_client_ = raw_client_.get();
|
||||||
ref.last_block_actor_ = raw_last_block_.get();
|
ref.last_block_actor_ = raw_last_block_.get();
|
||||||
ref.last_config_actor_ = raw_last_config_.get();
|
ref.last_config_actor_ = raw_last_config_.get();
|
||||||
|
|
||||||
|
@ -1683,10 +1683,10 @@ void TonlibClient::init_ext_client() {
|
||||||
ext_client_outbound_ = client.get();
|
ext_client_outbound_ = client.get();
|
||||||
raw_client_ = std::move(client);
|
raw_client_ = std::move(client);
|
||||||
} else {
|
} else {
|
||||||
auto lite_clients_size = config_.lite_clients.size();
|
std::vector<std::pair<ton::adnl::AdnlNodeIdFull, td::IPAddress>> servers;
|
||||||
CHECK(lite_clients_size != 0);
|
for (const auto& s : config_.lite_clients) {
|
||||||
auto lite_client_id = td::Random::fast(0, td::narrow_cast<int>(lite_clients_size) - 1);
|
servers.emplace_back(s.adnl_id, s.address);
|
||||||
auto& lite_client = config_.lite_clients[lite_client_id];
|
}
|
||||||
class Callback : public ExtClientLazy::Callback {
|
class Callback : public ExtClientLazy::Callback {
|
||||||
public:
|
public:
|
||||||
explicit Callback(td::actor::ActorShared<> parent) : parent_(std::move(parent)) {
|
explicit Callback(td::actor::ActorShared<> parent) : parent_(std::move(parent)) {
|
||||||
|
@ -1697,8 +1697,7 @@ void TonlibClient::init_ext_client() {
|
||||||
};
|
};
|
||||||
ext_client_outbound_ = {};
|
ext_client_outbound_ = {};
|
||||||
ref_cnt_++;
|
ref_cnt_++;
|
||||||
raw_client_ = ExtClientLazy::create(lite_client.adnl_id, lite_client.address,
|
raw_client_ = ExtClientLazy::create(std::move(servers), td::make_unique<Callback>(td::actor::actor_shared()));
|
||||||
td::make_unique<Callback>(td::actor::actor_shared()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -110,7 +110,7 @@ class TonlibClient : public td::actor::Actor {
|
||||||
vm::Dictionary libraries{256};
|
vm::Dictionary libraries{256};
|
||||||
|
|
||||||
// network
|
// network
|
||||||
td::actor::ActorOwn<ton::adnl::AdnlExtClient> raw_client_;
|
td::actor::ActorOwn<ExtClientLazy> raw_client_;
|
||||||
td::actor::ActorId<ExtClientOutbound> ext_client_outbound_;
|
td::actor::ActorId<ExtClientOutbound> ext_client_outbound_;
|
||||||
td::actor::ActorOwn<LastBlock> raw_last_block_;
|
td::actor::ActorOwn<LastBlock> raw_last_block_;
|
||||||
td::actor::ActorOwn<LastConfig> raw_last_config_;
|
td::actor::ActorOwn<LastConfig> raw_last_config_;
|
||||||
|
|
|
@ -174,7 +174,7 @@ class TonlibCli : public td::actor::Actor {
|
||||||
|
|
||||||
std::map<std::uint64_t, td::Promise<tonlib_api::object_ptr<tonlib_api::Object>>> query_handlers_;
|
std::map<std::uint64_t, td::Promise<tonlib_api::object_ptr<tonlib_api::Object>>> query_handlers_;
|
||||||
|
|
||||||
td::actor::ActorOwn<ton::adnl::AdnlExtClient> raw_client_;
|
td::actor::ActorOwn<tonlib::ExtClientLazy> raw_client_;
|
||||||
|
|
||||||
bool is_closing_{false};
|
bool is_closing_{false};
|
||||||
td::uint32 ref_cnt_{1};
|
td::uint32 ref_cnt_{1};
|
||||||
|
|
Loading…
Reference in a new issue