mirror of
https://github.com/ton-blockchain/ton
synced 2025-02-12 11:12:16 +00:00
rldp2 support in rldp-http-proxy (#608)
Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
parent
7a78ea33b7
commit
c369127ae0
8 changed files with 220 additions and 64 deletions
|
@ -2,4 +2,4 @@ cmake_minimum_required(VERSION 3.0.2 FATAL_ERROR)
|
|||
|
||||
add_executable(rldp-http-proxy rldp-http-proxy.cpp DNSResolver.h DNSResolver.cpp)
|
||||
target_include_directories(rldp-http-proxy PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/..>)
|
||||
target_link_libraries(rldp-http-proxy PRIVATE tonhttp rldp dht tonlib git)
|
||||
target_link_libraries(rldp-http-proxy PRIVATE tonhttp rldp rldp2 dht tonlib git)
|
||||
|
|
|
@ -45,6 +45,7 @@
|
|||
|
||||
#include "adnl/adnl.h"
|
||||
#include "rldp/rldp.h"
|
||||
#include "rldp2/rldp.h"
|
||||
#include "dht/dht.h"
|
||||
|
||||
#include <algorithm>
|
||||
|
@ -63,6 +64,53 @@
|
|||
|
||||
class RldpHttpProxy;
|
||||
|
||||
class RldpDispatcher : public ton::adnl::AdnlSenderInterface {
|
||||
public:
|
||||
RldpDispatcher(td::actor::ActorId<ton::rldp::Rldp> rldp, td::actor::ActorId<ton::rldp2::Rldp> rldp2)
|
||||
: rldp_(std::move(rldp)), rldp2_(std::move(rldp2)) {
|
||||
}
|
||||
|
||||
void send_message(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data) override {
|
||||
td::actor::send_closure(dispatch(dst), &ton::adnl::AdnlSenderInterface::send_message, src, dst, std::move(data));
|
||||
}
|
||||
|
||||
void send_query(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, std::string name,
|
||||
td::Promise<td::BufferSlice> promise, td::Timestamp timeout, td::BufferSlice data) override {
|
||||
td::actor::send_closure(dispatch(dst), &ton::adnl::AdnlSenderInterface::send_query, src, dst, std::move(name),
|
||||
std::move(promise), timeout, std::move(data));
|
||||
}
|
||||
void send_query_ex(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, std::string name,
|
||||
td::Promise<td::BufferSlice> promise, td::Timestamp timeout, td::BufferSlice data,
|
||||
td::uint64 max_answer_size) override {
|
||||
td::actor::send_closure(dispatch(dst), &ton::adnl::AdnlSenderInterface::send_query_ex, src, dst, std::move(name),
|
||||
std::move(promise), timeout, std::move(data), max_answer_size);
|
||||
}
|
||||
void get_conn_ip_str(ton::adnl::AdnlNodeIdShort l_id, ton::adnl::AdnlNodeIdShort p_id,
|
||||
td::Promise<td::string> promise) override {
|
||||
td::actor::send_closure(rldp_, &ton::adnl::AdnlSenderInterface::get_conn_ip_str, l_id, p_id, std::move(promise));
|
||||
}
|
||||
|
||||
void set_supports_rldp2(ton::adnl::AdnlNodeIdShort dst, bool supports) {
|
||||
if (supports) {
|
||||
supports_rldp2_.insert(dst);
|
||||
} else {
|
||||
supports_rldp2_.erase(dst);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp_;
|
||||
td::actor::ActorId<ton::rldp2::Rldp> rldp2_;
|
||||
std::set<ton::adnl::AdnlNodeIdShort> supports_rldp2_;
|
||||
|
||||
td::actor::ActorId<ton::adnl::AdnlSenderInterface> dispatch(ton::adnl::AdnlNodeIdShort dst) const {
|
||||
if (supports_rldp2_.count(dst)) {
|
||||
return rldp2_;
|
||||
}
|
||||
return rldp_;
|
||||
}
|
||||
};
|
||||
|
||||
class HttpRemote : public td::actor::Actor {
|
||||
public:
|
||||
struct Query {
|
||||
|
@ -137,6 +185,8 @@ const std::string PROXY_SITE_VERISON_HEADER_NAME = "Ton-Proxy-Site-Version";
|
|||
const std::string PROXY_ENTRY_VERISON_HEADER_NAME = "Ton-Proxy-Entry-Version";
|
||||
const std::string PROXY_VERSION_HEADER = PSTRING() << "Commit: " << GitMetadata::CommitSHA1()
|
||||
<< ", Date: " << GitMetadata::CommitDate();
|
||||
const td::uint64 CAPABILITY_RLDP2 = 1;
|
||||
const td::uint64 CAPABILITIES = 1;
|
||||
|
||||
using RegisteredPayloadSenderGuard =
|
||||
std::unique_ptr<std::pair<td::actor::ActorId<RldpHttpProxy>, td::Bits256>,
|
||||
|
@ -146,8 +196,8 @@ class HttpRldpPayloadReceiver : public td::actor::Actor {
|
|||
public:
|
||||
HttpRldpPayloadReceiver(std::shared_ptr<ton::http::HttpPayload> payload, td::Bits256 transfer_id,
|
||||
ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort local_id,
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::rldp::Rldp> rldp,
|
||||
bool is_tunnel = false)
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl,
|
||||
td::actor::ActorId<ton::adnl::AdnlSenderInterface> rldp, bool is_tunnel = false)
|
||||
: payload_(std::move(payload))
|
||||
, id_(transfer_id)
|
||||
, src_(src)
|
||||
|
@ -204,8 +254,8 @@ class HttpRldpPayloadReceiver : public td::actor::Actor {
|
|||
auto f = ton::create_serialize_tl_object<ton::ton_api::http_getNextPayloadPart>(
|
||||
id_, seqno_++, static_cast<td::int32>(chunk_size()));
|
||||
auto timeout = td::Timestamp::in(is_tunnel_ ? 60.0 : 15.0);
|
||||
td::actor::send_closure(rldp_, &ton::rldp::Rldp::send_query_ex, local_id_, src_, "payload part", std::move(P),
|
||||
timeout, std::move(f), 2 * chunk_size() + 1024);
|
||||
td::actor::send_closure(rldp_, &ton::adnl::AdnlSenderInterface::send_query_ex, local_id_, src_, "payload part",
|
||||
std::move(P), timeout, std::move(f), 2 * chunk_size() + 1024);
|
||||
}
|
||||
|
||||
void add_data(td::BufferSlice data) {
|
||||
|
@ -265,7 +315,7 @@ class HttpRldpPayloadReceiver : public td::actor::Actor {
|
|||
ton::adnl::AdnlNodeIdShort src_;
|
||||
ton::adnl::AdnlNodeIdShort local_id_;
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl_;
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp_;
|
||||
td::actor::ActorId<ton::adnl::AdnlSenderInterface> rldp_;
|
||||
|
||||
bool sent_ = false;
|
||||
td::int32 seqno_ = 0;
|
||||
|
@ -276,8 +326,8 @@ class HttpRldpPayloadSender : public td::actor::Actor {
|
|||
public:
|
||||
HttpRldpPayloadSender(std::shared_ptr<ton::http::HttpPayload> payload, td::Bits256 transfer_id,
|
||||
ton::adnl::AdnlNodeIdShort local_id, td::actor::ActorId<ton::adnl::Adnl> adnl,
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp, td::actor::ActorId<RldpHttpProxy> proxy,
|
||||
bool is_tunnel = false)
|
||||
td::actor::ActorId<ton::adnl::AdnlSenderInterface> rldp,
|
||||
td::actor::ActorId<RldpHttpProxy> proxy, bool is_tunnel = false)
|
||||
: payload_(std::move(payload))
|
||||
, id_(transfer_id)
|
||||
, local_id_(local_id)
|
||||
|
@ -407,7 +457,7 @@ class HttpRldpPayloadSender : public td::actor::Actor {
|
|||
|
||||
ton::adnl::AdnlNodeIdShort local_id_;
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl_;
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp_;
|
||||
td::actor::ActorId<ton::adnl::AdnlSenderInterface> rldp_;
|
||||
td::actor::ActorId<RldpHttpProxy> proxy_;
|
||||
|
||||
size_t cur_query_size_;
|
||||
|
@ -424,9 +474,8 @@ class TcpToRldpRequestSender : public td::actor::Actor {
|
|||
std::shared_ptr<ton::http::HttpPayload> request_payload,
|
||||
td::Promise<std::pair<std::unique_ptr<ton::http::HttpResponse>, std::shared_ptr<ton::http::HttpPayload>>> promise,
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::dht::Dht> dht,
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp, td::actor::ActorId<RldpHttpProxy> proxy,
|
||||
td::actor::ActorId<DNSResolver> dns_resolver,
|
||||
ton::adnl::AdnlNodeIdShort storage_gateway)
|
||||
td::actor::ActorId<ton::adnl::AdnlSenderInterface> rldp, td::actor::ActorId<RldpHttpProxy> proxy,
|
||||
td::actor::ActorId<DNSResolver> dns_resolver, ton::adnl::AdnlNodeIdShort storage_gateway)
|
||||
: local_id_(local_id)
|
||||
, host_(std::move(host))
|
||||
, request_(std::move(request))
|
||||
|
@ -447,26 +496,7 @@ class TcpToRldpRequestSender : public td::actor::Actor {
|
|||
}
|
||||
|
||||
void resolve(std::string host);
|
||||
|
||||
void resolved(ton::adnl::AdnlNodeIdShort id) {
|
||||
dst_ = id;
|
||||
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::BufferSlice> R) {
|
||||
if (R.is_error()) {
|
||||
td::actor::send_closure(SelfId, &TcpToRldpRequestSender::abort_query, R.move_as_error());
|
||||
} else {
|
||||
td::actor::send_closure(SelfId, &TcpToRldpRequestSender::got_result, R.move_as_ok());
|
||||
}
|
||||
});
|
||||
|
||||
td::actor::create_actor<HttpRldpPayloadSender>("HttpPayloadSender", request_payload_, id_, local_id_, adnl_, rldp_,
|
||||
proxy_, is_tunnel())
|
||||
.release();
|
||||
|
||||
auto f = ton::serialize_tl_object(request_tl_, true);
|
||||
td::actor::send_closure(rldp_, &ton::rldp::Rldp::send_query_ex, local_id_, dst_, "http request over rldp",
|
||||
std::move(P), td::Timestamp::in(30.0), std::move(f), 16 << 10);
|
||||
}
|
||||
void resolved(ton::adnl::AdnlNodeIdShort id);
|
||||
|
||||
void got_result(td::BufferSlice data) {
|
||||
auto F = ton::fetch_tl_object<ton::ton_api::http_response>(data, true);
|
||||
|
@ -548,7 +578,7 @@ class TcpToRldpRequestSender : public td::actor::Actor {
|
|||
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl_;
|
||||
td::actor::ActorId<ton::dht::Dht> dht_;
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp_;
|
||||
td::actor::ActorId<ton::adnl::AdnlSenderInterface> rldp_;
|
||||
td::actor::ActorId<RldpHttpProxy> proxy_;
|
||||
td::actor::ActorId<DNSResolver> dns_resolver_;
|
||||
ton::adnl::AdnlNodeIdShort storage_gateway_ = ton::adnl::AdnlNodeIdShort::zero();
|
||||
|
@ -562,7 +592,7 @@ class TcpToRldpRequestSender : public td::actor::Actor {
|
|||
class RldpTcpTunnel : public td::actor::Actor, private td::ObserverBase {
|
||||
public:
|
||||
RldpTcpTunnel(td::Bits256 transfer_id, ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort local_id,
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::rldp::Rldp> rldp,
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::adnl::AdnlSenderInterface> rldp,
|
||||
td::actor::ActorId<RldpHttpProxy> proxy, td::SocketFd fd)
|
||||
: id_(transfer_id)
|
||||
, src_(src)
|
||||
|
@ -599,8 +629,8 @@ class RldpTcpTunnel : public td::actor::Actor, private td::ObserverBase {
|
|||
|
||||
auto f = ton::create_serialize_tl_object<ton::ton_api::http_getNextPayloadPart>(id_, out_seqno_++,
|
||||
(1 << 21) - (1 << 11));
|
||||
td::actor::send_closure(rldp_, &ton::rldp::Rldp::send_query_ex, local_id_, src_, "payload part", std::move(P),
|
||||
td::Timestamp::in(60.0), std::move(f), (1 << 21) + 1024);
|
||||
td::actor::send_closure(rldp_, &ton::adnl::AdnlSenderInterface::send_query_ex, local_id_, src_, "payload part",
|
||||
std::move(P), td::Timestamp::in(60.0), std::move(f), (1 << 21) + 1024);
|
||||
}
|
||||
|
||||
void receive_query(ton::tl_object_ptr<ton::ton_api::http_getNextPayloadPart> f,
|
||||
|
@ -727,7 +757,7 @@ class RldpTcpTunnel : public td::actor::Actor, private td::ObserverBase {
|
|||
ton::adnl::AdnlNodeIdShort src_;
|
||||
ton::adnl::AdnlNodeIdShort local_id_;
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl_;
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp_;
|
||||
td::actor::ActorId<ton::adnl::AdnlSenderInterface> rldp_;
|
||||
td::actor::ActorId<RldpHttpProxy> proxy_;
|
||||
|
||||
td::BufferedFd<td::SocketFd> fd_;
|
||||
|
@ -746,7 +776,8 @@ class RldpToTcpRequestSender : public td::actor::Actor {
|
|||
RldpToTcpRequestSender(td::Bits256 id, ton::adnl::AdnlNodeIdShort local_id, ton::adnl::AdnlNodeIdShort dst,
|
||||
std::unique_ptr<ton::http::HttpRequest> request,
|
||||
std::shared_ptr<ton::http::HttpPayload> request_payload, td::Promise<td::BufferSlice> promise,
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::rldp::Rldp> rldp,
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl,
|
||||
td::actor::ActorId<ton::adnl::AdnlSenderInterface> rldp,
|
||||
td::actor::ActorId<RldpHttpProxy> proxy, td::actor::ActorId<HttpRemote> remote)
|
||||
: id_(id)
|
||||
, local_id_(local_id)
|
||||
|
@ -806,7 +837,7 @@ class RldpToTcpRequestSender : public td::actor::Actor {
|
|||
td::Promise<td::BufferSlice> promise_;
|
||||
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl_;
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp_;
|
||||
td::actor::ActorId<ton::adnl::AdnlSenderInterface> rldp_;
|
||||
td::actor::ActorId<RldpHttpProxy> proxy_;
|
||||
|
||||
td::actor::ActorId<HttpRemote> remote_;
|
||||
|
@ -1032,35 +1063,58 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
private:
|
||||
td::actor::ActorId<RldpHttpProxy> self_id_;
|
||||
};
|
||||
for (auto &serv_id : server_ids_) {
|
||||
class AdnlCb : public ton::adnl::Adnl::Callback {
|
||||
public:
|
||||
AdnlCb(td::actor::ActorId<RldpHttpProxy> id) : self_id_(id) {
|
||||
}
|
||||
void receive_message(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst,
|
||||
td::BufferSlice data) override {
|
||||
}
|
||||
void receive_query(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data,
|
||||
td::Promise<td::BufferSlice> promise) override {
|
||||
td::actor::send_closure(self_id_, &RldpHttpProxy::receive_rldp_request, src, dst, std::move(data),
|
||||
std::move(promise));
|
||||
}
|
||||
class AdnlCapabilitiesCb : public ton::adnl::Adnl::Callback {
|
||||
public:
|
||||
AdnlCapabilitiesCb(td::actor::ActorId<RldpHttpProxy> id) : self_id_(id) {
|
||||
}
|
||||
void receive_message(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst,
|
||||
td::BufferSlice data) override {
|
||||
}
|
||||
void receive_query(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data,
|
||||
td::Promise<td::BufferSlice> promise) override {
|
||||
TRY_RESULT_PROMISE(promise, query, ton::fetch_tl_object<ton::ton_api::http_proxy_getCapabilities>(data, true));
|
||||
promise.set_result(ton::create_serialize_tl_object<ton::ton_api::http_proxy_capabilities>(CAPABILITIES));
|
||||
td::actor::send_closure(self_id_, &RldpHttpProxy::update_peer_capabilities, src, query->capabilities_);
|
||||
}
|
||||
|
||||
private:
|
||||
td::actor::ActorId<RldpHttpProxy> self_id_;
|
||||
};
|
||||
private:
|
||||
td::actor::ActorId<RldpHttpProxy> self_id_;
|
||||
};
|
||||
class AdnlServerCb : public ton::adnl::Adnl::Callback {
|
||||
public:
|
||||
AdnlServerCb(td::actor::ActorId<RldpHttpProxy> id) : self_id_(id) {
|
||||
}
|
||||
void receive_message(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst,
|
||||
td::BufferSlice data) override {
|
||||
}
|
||||
void receive_query(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data,
|
||||
td::Promise<td::BufferSlice> promise) override {
|
||||
td::actor::send_closure(self_id_, &RldpHttpProxy::receive_rldp_request, src, dst, std::move(data),
|
||||
std::move(promise));
|
||||
}
|
||||
|
||||
private:
|
||||
td::actor::ActorId<RldpHttpProxy> self_id_;
|
||||
};
|
||||
for (auto &serv_id : server_ids_) {
|
||||
td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, serv_id,
|
||||
ton::adnl::Adnl::int_to_bytestring(ton::ton_api::http_request::ID),
|
||||
std::make_unique<AdnlCb>(actor_id(this)));
|
||||
std::make_unique<AdnlServerCb>(actor_id(this)));
|
||||
if (local_id_ != serv_id) {
|
||||
td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, serv_id,
|
||||
ton::adnl::Adnl::int_to_bytestring(ton::ton_api::http_getNextPayloadPart::ID),
|
||||
std::make_unique<AdnlPayloadCb>(actor_id(this)));
|
||||
td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, serv_id,
|
||||
ton::adnl::Adnl::int_to_bytestring(ton::ton_api::http_proxy_getCapabilities::ID),
|
||||
std::make_unique<AdnlCapabilitiesCb>(actor_id(this)));
|
||||
}
|
||||
}
|
||||
td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, local_id_,
|
||||
ton::adnl::Adnl::int_to_bytestring(ton::ton_api::http_getNextPayloadPart::ID),
|
||||
std::make_unique<AdnlPayloadCb>(actor_id(this)));
|
||||
td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, local_id_,
|
||||
ton::adnl::Adnl::int_to_bytestring(ton::ton_api::http_proxy_getCapabilities::ID),
|
||||
std::make_unique<AdnlCapabilitiesCb>(actor_id(this)));
|
||||
|
||||
rldp_ = ton::rldp::Rldp::create(adnl_.get());
|
||||
td::actor::send_closure(rldp_, &ton::rldp::Rldp::set_default_mtu, 16 << 10);
|
||||
|
@ -1069,6 +1123,15 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
td::actor::send_closure(rldp_, &ton::rldp::Rldp::add_id, serv_id);
|
||||
}
|
||||
|
||||
rldp2_ = ton::rldp2::Rldp::create(adnl_.get());
|
||||
td::actor::send_closure(rldp2_, &ton::rldp2::Rldp::set_default_mtu, 16 << 10);
|
||||
td::actor::send_closure(rldp2_, &ton::rldp2::Rldp::add_id, local_id_);
|
||||
for (auto &serv_id : server_ids_) {
|
||||
td::actor::send_closure(rldp2_, &ton::rldp2::Rldp::add_id, serv_id);
|
||||
}
|
||||
|
||||
rldp_dispatcher_ = td::actor::create_actor<RldpDispatcher>("RldpDispatcher", rldp_.get(), rldp2_.get());
|
||||
|
||||
store_dht();
|
||||
}
|
||||
|
||||
|
@ -1107,7 +1170,7 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
}
|
||||
std::transform(host.begin(), host.end(), host.begin(), [](unsigned char c) { return std::tolower(c); });
|
||||
bool allow = proxy_all_;
|
||||
for (const char* suffix : {".adnl", ".ton", ".bag"}) {
|
||||
for (const char *suffix : {".adnl", ".ton", ".bag"}) {
|
||||
if (td::ends_with(host, td::Slice(suffix))) {
|
||||
allow = true;
|
||||
}
|
||||
|
@ -1117,9 +1180,9 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
return;
|
||||
}
|
||||
|
||||
td::actor::create_actor<TcpToRldpRequestSender>("outboundreq", local_id_, host, std::move(request),
|
||||
std::move(payload), std::move(promise), adnl_.get(), dht_.get(),
|
||||
rldp_.get(), actor_id(this), dns_resolver_.get(), storage_gateway_)
|
||||
td::actor::create_actor<TcpToRldpRequestSender>(
|
||||
"outboundreq", local_id_, host, std::move(request), std::move(payload), std::move(promise), adnl_.get(),
|
||||
dht_.get(), rldp_dispatcher_.get(), actor_id(this), dns_resolver_.get(), storage_gateway_)
|
||||
.release();
|
||||
}
|
||||
|
||||
|
@ -1127,6 +1190,7 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
td::Promise<td::BufferSlice> promise) {
|
||||
LOG(INFO) << "got HTTP request over rldp from " << src;
|
||||
TRY_RESULT_PROMISE(promise, f, ton::fetch_tl_object<ton::ton_api::http_request>(data, true));
|
||||
ask_peer_capabilities(src);
|
||||
std::unique_ptr<ton::http::HttpRequest> request;
|
||||
auto S = [&]() {
|
||||
TRY_RESULT_ASSIGN(request, ton::http::HttpRequest::create(f->method_, f->url_, f->http_version_));
|
||||
|
@ -1214,8 +1278,8 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
|
||||
LOG(INFO) << "starting HTTP over RLDP request";
|
||||
td::actor::create_actor<RldpToTcpRequestSender>("inboundreq", f->id_, dst, src, std::move(request),
|
||||
payload.move_as_ok(), std::move(promise), adnl_.get(), rldp_.get(),
|
||||
actor_id(this), server.http_remote_.get())
|
||||
payload.move_as_ok(), std::move(promise), adnl_.get(),
|
||||
rldp_dispatcher_.get(), actor_id(this), server.http_remote_.get())
|
||||
.release();
|
||||
}
|
||||
|
||||
|
@ -1227,7 +1291,7 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
return;
|
||||
}
|
||||
td::actor::create_actor<RldpTcpTunnel>(td::actor::ActorOptions().with_name("tunnel").with_poll(), id, src, local_id,
|
||||
adnl_.get(), rldp_.get(), actor_id(this), fd.move_as_ok())
|
||||
adnl_.get(), rldp_dispatcher_.get(), actor_id(this), fd.move_as_ok())
|
||||
.release();
|
||||
std::vector<ton::tl_object_ptr<ton::ton_api::http_header>> headers;
|
||||
headers.push_back(
|
||||
|
@ -1291,6 +1355,47 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
storage_gateway_ = id;
|
||||
}
|
||||
|
||||
void update_peer_capabilities(ton::adnl::AdnlNodeIdShort peer, td::uint64 capabilities) {
|
||||
auto &c = peer_capabilities_[peer];
|
||||
if (c.capabilities != capabilities) {
|
||||
LOG(DEBUG) << "Update capabilities of peer " << peer << " : " << capabilities;
|
||||
}
|
||||
c.capabilities = capabilities;
|
||||
c.received = true;
|
||||
td::actor::send_closure(rldp_dispatcher_, &RldpDispatcher::set_supports_rldp2, peer,
|
||||
capabilities & CAPABILITY_RLDP2);
|
||||
}
|
||||
|
||||
void ask_peer_capabilities(ton::adnl::AdnlNodeIdShort peer) {
|
||||
auto &c = peer_capabilities_[peer];
|
||||
if (!c.received && c.retry_at.is_in_past()) {
|
||||
c.retry_at = td::Timestamp::in(30.0);
|
||||
auto send_query = [&](const ton::adnl::AdnlNodeIdShort &local_id) {
|
||||
td::actor::send_closure(
|
||||
adnl_, &ton::adnl::Adnl::send_query, local_id, peer, "q",
|
||||
[SelfId = actor_id(this), peer](td::Result<td::BufferSlice> R) {
|
||||
if (R.is_error()) {
|
||||
return;
|
||||
}
|
||||
auto r_obj = ton::fetch_tl_object<ton::ton_api::http_proxy_capabilities>(R.move_as_ok(), true);
|
||||
if (r_obj.is_error()) {
|
||||
return;
|
||||
}
|
||||
td::actor::send_closure(SelfId, &RldpHttpProxy::update_peer_capabilities, peer,
|
||||
r_obj.ok()->capabilities_);
|
||||
},
|
||||
td::Timestamp::in(3.0),
|
||||
ton::create_serialize_tl_object<ton::ton_api::http_proxy_getCapabilities>(CAPABILITIES));
|
||||
};
|
||||
for (const ton::adnl::AdnlNodeIdShort &local_id : server_ids_) {
|
||||
if (local_id != local_id_) {
|
||||
send_query(local_id);
|
||||
}
|
||||
}
|
||||
send_query(local_id_);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
struct Host {
|
||||
struct Server {
|
||||
|
@ -1320,6 +1425,8 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
td::actor::ActorOwn<ton::adnl::Adnl> adnl_;
|
||||
td::actor::ActorOwn<ton::dht::Dht> dht_;
|
||||
td::actor::ActorOwn<ton::rldp::Rldp> rldp_;
|
||||
td::actor::ActorOwn<ton::rldp2::Rldp> rldp2_;
|
||||
td::actor::ActorOwn<RldpDispatcher> rldp_dispatcher_;
|
||||
|
||||
std::shared_ptr<ton::dht::DhtGlobalConfig> dht_config_;
|
||||
|
||||
|
@ -1333,6 +1440,13 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
std::map<td::Bits256,
|
||||
std::function<void(ton::tl_object_ptr<ton::ton_api::http_getNextPayloadPart>, td::Promise<td::BufferSlice>)>>
|
||||
payload_senders_;
|
||||
|
||||
struct PeerCapabilities {
|
||||
td::uint64 capabilities = 0;
|
||||
bool received = false;
|
||||
td::Timestamp retry_at = td::Timestamp::now();
|
||||
};
|
||||
std::map<ton::adnl::AdnlNodeIdShort, PeerCapabilities> peer_capabilities_;
|
||||
};
|
||||
|
||||
void TcpToRldpRequestSender::resolve(std::string host) {
|
||||
|
@ -1355,7 +1469,7 @@ void TcpToRldpRequestSender::resolve(std::string host) {
|
|||
}
|
||||
request_tl_->url_ = (PSTRING() << "/gateway/" << bag_id << url);
|
||||
host = storage_gateway_.serialize() + ".adnl";
|
||||
for (auto& header : request_tl_->headers_) {
|
||||
for (auto &header : request_tl_->headers_) {
|
||||
if (td::to_lower(header->name_) == "host") {
|
||||
header->value_ = host;
|
||||
break;
|
||||
|
@ -1390,6 +1504,27 @@ void TcpToRldpRequestSender::resolve(std::string host) {
|
|||
});
|
||||
}
|
||||
|
||||
void TcpToRldpRequestSender::resolved(ton::adnl::AdnlNodeIdShort id) {
|
||||
dst_ = id;
|
||||
td::actor::send_closure(proxy_, &RldpHttpProxy::ask_peer_capabilities, id);
|
||||
|
||||
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::BufferSlice> R) {
|
||||
if (R.is_error()) {
|
||||
td::actor::send_closure(SelfId, &TcpToRldpRequestSender::abort_query, R.move_as_error());
|
||||
} else {
|
||||
td::actor::send_closure(SelfId, &TcpToRldpRequestSender::got_result, R.move_as_ok());
|
||||
}
|
||||
});
|
||||
|
||||
td::actor::create_actor<HttpRldpPayloadSender>("HttpPayloadSender", request_payload_, id_, local_id_, adnl_, rldp_,
|
||||
proxy_, is_tunnel())
|
||||
.release();
|
||||
|
||||
auto f = ton::serialize_tl_object(request_tl_, true);
|
||||
td::actor::send_closure(rldp_, &ton::adnl::AdnlSenderInterface::send_query_ex, local_id_, dst_,
|
||||
"http request over rldp", std::move(P), td::Timestamp::in(30.0), std::move(f), 16 << 10);
|
||||
}
|
||||
|
||||
void HttpRldpPayloadSender::start_up() {
|
||||
td::actor::send_closure(
|
||||
proxy_, &RldpHttpProxy::register_payload_sender, id_,
|
||||
|
|
|
@ -92,6 +92,8 @@ class RldpIn : public RldpImpl {
|
|||
|
||||
void get_conn_ip_str(adnl::AdnlNodeIdShort l_id, adnl::AdnlNodeIdShort p_id, td::Promise<td::string> promise) override;
|
||||
|
||||
void set_default_mtu(td::uint64 mtu) override;
|
||||
|
||||
RldpIn(td::actor::ActorId<adnl::AdnlPeerTable> adnl) : adnl_(adnl) {
|
||||
}
|
||||
|
||||
|
@ -107,6 +109,8 @@ class RldpIn : public RldpImpl {
|
|||
|
||||
std::set<adnl::AdnlNodeIdShort> local_ids_;
|
||||
|
||||
td::optional<td::uint64> custom_default_mtu_;
|
||||
|
||||
td::actor::ActorId<RldpConnectionActor> create_connection(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst);
|
||||
};
|
||||
|
||||
|
|
|
@ -44,6 +44,9 @@ class RldpConnectionActor : public td::actor::Actor, private ConnectionCallback
|
|||
connection_.receive_raw(std::move(data));
|
||||
yield();
|
||||
}
|
||||
void set_default_mtu(td::uint64 mtu) {
|
||||
connection_.set_default_mtu(mtu);
|
||||
}
|
||||
|
||||
private:
|
||||
td::actor::ActorId<RldpIn> rldp_;
|
||||
|
@ -129,6 +132,9 @@ td::actor::ActorId<RldpConnectionActor> RldpIn::create_connection(adnl::AdnlNode
|
|||
return it->second.get();
|
||||
}
|
||||
auto connection = td::actor::create_actor<RldpConnectionActor>("RldpConnection", actor_id(this), src, dst, adnl_);
|
||||
if (custom_default_mtu_) {
|
||||
td::actor::send_closure(connection, &RldpConnectionActor::set_default_mtu, custom_default_mtu_.value());
|
||||
}
|
||||
auto res = connection.get();
|
||||
connections_[std::make_pair(src, dst)] = std::move(connection);
|
||||
return res;
|
||||
|
@ -221,6 +227,13 @@ void RldpIn::get_conn_ip_str(adnl::AdnlNodeIdShort l_id, adnl::AdnlNodeIdShort p
|
|||
td::actor::send_closure(adnl_, &adnl::AdnlPeerTable::get_conn_ip_str, l_id, p_id, std::move(promise));
|
||||
}
|
||||
|
||||
void RldpIn::set_default_mtu(td::uint64 mtu) {
|
||||
custom_default_mtu_ = mtu;
|
||||
for (auto &connection : connections_) {
|
||||
td::actor::send_closure(connection.second, &RldpConnectionActor::set_default_mtu, mtu);
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<adnl::Adnl::Callback> RldpIn::make_adnl_callback() {
|
||||
class Callback : public adnl::Adnl::Callback {
|
||||
private:
|
||||
|
|
|
@ -37,6 +37,8 @@ class Rldp : public adnl::AdnlSenderInterface {
|
|||
virtual void send_message_ex(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::Timestamp timeout,
|
||||
td::BufferSlice data) = 0;
|
||||
|
||||
virtual void set_default_mtu(td::uint64 mtu) = 0;
|
||||
|
||||
static td::actor::ActorOwn<Rldp> create(td::actor::ActorId<adnl::Adnl> adnl);
|
||||
};
|
||||
|
||||
|
|
|
@ -720,11 +720,13 @@ storage.getPiece piece_id:int = storage.Piece;
|
|||
http.header name:string value:string = http.Header;
|
||||
http.payloadPart data:bytes trailer:(vector http.header) last:Bool = http.PayloadPart;
|
||||
http.response http_version:string status_code:int reason:string headers:(vector http.header) no_payload:Bool = http.Response;
|
||||
http.proxy.capabilities capabilities:long = http.proxy.Capabilities;
|
||||
|
||||
---functions---
|
||||
|
||||
http.request id:int256 method:string url:string http_version:string headers:(vector http.header) = http.Response;
|
||||
http.getNextPayloadPart id:int256 seqno:int max_chunk_size:int = http.PayloadPart;
|
||||
http.proxy.getCapabilities capabilities:long = http.proxy.Capabilities;
|
||||
|
||||
---types---
|
||||
|
||||
|
|
Binary file not shown.
Binary file not shown.
Loading…
Reference in a new issue