mirror of
https://github.com/ton-blockchain/ton
synced 2025-02-12 11:12:16 +00:00
update rldp-proxy: fix subscribing, change chunk size, add version headers (#523)
* Fix subscribing to request id, change chunk size * Add ton-proxy version to response headers Co-authored-by: SpyCheese <mikle98@yandex.ru>
This commit is contained in:
parent
d3e177b49e
commit
1bd1455fb6
1 changed files with 188 additions and 112 deletions
|
@ -33,7 +33,6 @@
|
|||
#include "td/utils/FileLog.h"
|
||||
#include "td/utils/Random.h"
|
||||
#include "td/utils/filesystem.h"
|
||||
#include "td/utils/overloaded.h"
|
||||
|
||||
#include "auto/tl/ton_api_json.h"
|
||||
#include "auto/tl/tonlib_api.hpp"
|
||||
|
@ -134,6 +133,15 @@ td::BufferSlice create_error_response(const std::string &proto_version, int code
|
|||
proto_version, code, reason, std::vector<ton::tl_object_ptr<ton::ton_api::http_header>>(), true);
|
||||
}
|
||||
|
||||
const std::string PROXY_SITE_VERISON_HEADER_NAME = "Ton-Proxy-Site-Version";
|
||||
const std::string PROXY_ENTRY_VERISON_HEADER_NAME = "Ton-Proxy-Entry-Version";
|
||||
const std::string PROXY_VERSION_HEADER = PSTRING() << "Commit: " << GitMetadata::CommitSHA1()
|
||||
<< ", Date: " << GitMetadata::CommitDate();
|
||||
|
||||
using RegisteredPayloadSenderGuard =
|
||||
std::unique_ptr<std::pair<td::actor::ActorId<RldpHttpProxy>, td::Bits256>,
|
||||
std::function<void(std::pair<td::actor::ActorId<RldpHttpProxy>, td::Bits256> *)>>;
|
||||
|
||||
class HttpRldpPayloadReceiver : public td::actor::Actor {
|
||||
public:
|
||||
HttpRldpPayloadReceiver(std::shared_ptr<ton::http::HttpPayload> payload, td::Bits256 transfer_id,
|
||||
|
@ -244,10 +252,10 @@ class HttpRldpPayloadReceiver : public td::actor::Actor {
|
|||
|
||||
private:
|
||||
static constexpr size_t watermark() {
|
||||
return 1 << 15;
|
||||
return (1 << 21) - (1 << 11);
|
||||
}
|
||||
static constexpr size_t chunk_size() {
|
||||
return 1 << 17;
|
||||
return (1 << 21) - (1 << 11);
|
||||
}
|
||||
|
||||
std::shared_ptr<ton::http::HttpPayload> payload_;
|
||||
|
@ -268,12 +276,14 @@ class HttpRldpPayloadSender : public td::actor::Actor {
|
|||
public:
|
||||
HttpRldpPayloadSender(std::shared_ptr<ton::http::HttpPayload> payload, td::Bits256 transfer_id,
|
||||
ton::adnl::AdnlNodeIdShort local_id, td::actor::ActorId<ton::adnl::Adnl> adnl,
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp, bool is_tunnel = false)
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp, td::actor::ActorId<RldpHttpProxy> proxy,
|
||||
bool is_tunnel = false)
|
||||
: payload_(std::move(payload))
|
||||
, id_(transfer_id)
|
||||
, local_id_(local_id)
|
||||
, adnl_(adnl)
|
||||
, rldp_(rldp)
|
||||
, proxy_(proxy)
|
||||
, is_tunnel_(is_tunnel) {
|
||||
}
|
||||
|
||||
|
@ -289,52 +299,10 @@ class HttpRldpPayloadSender : public td::actor::Actor {
|
|||
return x;
|
||||
}
|
||||
|
||||
void start_up() override {
|
||||
class AdnlCb : public ton::adnl::Adnl::Callback {
|
||||
public:
|
||||
AdnlCb(td::actor::ActorId<HttpRldpPayloadSender> id) : self_id_(id) {
|
||||
}
|
||||
void receive_message(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst,
|
||||
td::BufferSlice data) override {
|
||||
LOG(INFO) << "http payload sender: dropping message";
|
||||
}
|
||||
void receive_query(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data,
|
||||
td::Promise<td::BufferSlice> promise) override {
|
||||
td::actor::send_closure(self_id_, &HttpRldpPayloadSender::receive_query, std::move(data), std::move(promise));
|
||||
}
|
||||
void start_up() override;
|
||||
|
||||
private:
|
||||
td::actor::ActorId<HttpRldpPayloadSender> self_id_;
|
||||
};
|
||||
td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, local_id_, generate_prefix(),
|
||||
std::make_unique<AdnlCb>(actor_id(this)));
|
||||
|
||||
class Cb : public ton::http::HttpPayload::Callback {
|
||||
public:
|
||||
Cb(td::actor::ActorId<HttpRldpPayloadSender> id, size_t watermark) : self_id_(id), watermark_(watermark) {
|
||||
}
|
||||
void run(size_t ready_bytes) override {
|
||||
if (!reached_ && ready_bytes >= watermark_) {
|
||||
reached_ = true;
|
||||
td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query, false);
|
||||
} else if (reached_ && ready_bytes < watermark_) {
|
||||
reached_ = false;
|
||||
}
|
||||
}
|
||||
void completed() override {
|
||||
td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query, false);
|
||||
}
|
||||
|
||||
private:
|
||||
bool reached_ = false;
|
||||
td::actor::ActorId<HttpRldpPayloadSender> self_id_;
|
||||
size_t watermark_;
|
||||
};
|
||||
|
||||
payload_->add_callback(
|
||||
std::make_unique<Cb>(actor_id(this), is_tunnel_ ? 1 : ton::http::HttpRequest::low_watermark()));
|
||||
|
||||
alarm_timestamp() = td::Timestamp::in(is_tunnel_ ? 60.0 : 10.0);
|
||||
void registered_sender(RegisteredPayloadSenderGuard guard) {
|
||||
guard_ = std::move(guard);
|
||||
}
|
||||
|
||||
void try_answer_query(bool from_timer = false) {
|
||||
|
@ -391,13 +359,9 @@ class HttpRldpPayloadSender : public td::actor::Actor {
|
|||
try_answer_query(false);
|
||||
}
|
||||
|
||||
void receive_query(td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
|
||||
auto F = ton::fetch_tl_object<ton::ton_api::http_getNextPayloadPart>(data, true);
|
||||
if (F.is_error()) {
|
||||
LOG(INFO) << "failed to parse query: " << F.move_as_error();
|
||||
return;
|
||||
}
|
||||
send_data(F.move_as_ok(), std::move(promise));
|
||||
void receive_query(ton::tl_object_ptr<ton::ton_api::http_getNextPayloadPart> f,
|
||||
td::Promise<td::BufferSlice> promise) {
|
||||
send_data(std::move(f), std::move(promise));
|
||||
}
|
||||
|
||||
void alarm() override {
|
||||
|
@ -429,24 +393,22 @@ class HttpRldpPayloadSender : public td::actor::Actor {
|
|||
stop();
|
||||
}
|
||||
|
||||
void tear_down() override {
|
||||
td::actor::send_closure(adnl_, &ton::adnl::Adnl::unsubscribe, local_id_, generate_prefix());
|
||||
}
|
||||
|
||||
private:
|
||||
static constexpr size_t watermark() {
|
||||
return 1 << 15;
|
||||
return (1 << 21) - (1 << 11);
|
||||
}
|
||||
|
||||
std::shared_ptr<ton::http::HttpPayload> payload_;
|
||||
|
||||
td::Bits256 id_;
|
||||
RegisteredPayloadSenderGuard guard_;
|
||||
|
||||
td::int32 seqno_ = 0;
|
||||
|
||||
ton::adnl::AdnlNodeIdShort local_id_;
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl_;
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp_;
|
||||
td::actor::ActorId<RldpHttpProxy> proxy_;
|
||||
|
||||
size_t cur_query_size_;
|
||||
td::Promise<td::BufferSlice> cur_query_promise_;
|
||||
|
@ -462,7 +424,8 @@ class TcpToRldpRequestSender : public td::actor::Actor {
|
|||
std::shared_ptr<ton::http::HttpPayload> request_payload,
|
||||
td::Promise<std::pair<std::unique_ptr<ton::http::HttpResponse>, std::shared_ptr<ton::http::HttpPayload>>> promise,
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::dht::Dht> dht,
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp, td::actor::ActorId<DNSResolver> dns_resolver)
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp, td::actor::ActorId<RldpHttpProxy> proxy,
|
||||
td::actor::ActorId<DNSResolver> dns_resolver)
|
||||
: local_id_(local_id)
|
||||
, host_(std::move(host))
|
||||
, request_(std::move(request))
|
||||
|
@ -471,6 +434,7 @@ class TcpToRldpRequestSender : public td::actor::Actor {
|
|||
, adnl_(adnl)
|
||||
, dht_(dht)
|
||||
, rldp_(rldp)
|
||||
, proxy_(proxy)
|
||||
, dns_resolver_(dns_resolver) {
|
||||
}
|
||||
void start_up() override {
|
||||
|
@ -492,7 +456,7 @@ class TcpToRldpRequestSender : public td::actor::Actor {
|
|||
});
|
||||
|
||||
td::actor::create_actor<HttpRldpPayloadSender>("HttpPayloadSender", request_payload_, id_, local_id_, adnl_, rldp_,
|
||||
is_tunnel())
|
||||
proxy_, is_tunnel())
|
||||
.release();
|
||||
|
||||
auto f = ton::serialize_tl_object(request_->store_tl(id_), true);
|
||||
|
@ -523,6 +487,7 @@ class TcpToRldpRequestSender : public td::actor::Actor {
|
|||
}
|
||||
response_->add_header(std::move(h));
|
||||
}
|
||||
response_->add_header({PROXY_ENTRY_VERISON_HEADER_NAME, PROXY_VERSION_HEADER});
|
||||
auto S = response_->complete_parse_header();
|
||||
if (S.is_error()) {
|
||||
abort_query(S.move_as_error());
|
||||
|
@ -579,6 +544,7 @@ class TcpToRldpRequestSender : public td::actor::Actor {
|
|||
td::actor::ActorId<ton::adnl::Adnl> adnl_;
|
||||
td::actor::ActorId<ton::dht::Dht> dht_;
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp_;
|
||||
td::actor::ActorId<RldpHttpProxy> proxy_;
|
||||
td::actor::ActorId<DNSResolver> dns_resolver_;
|
||||
|
||||
std::unique_ptr<ton::http::HttpResponse> response_;
|
||||
|
@ -588,47 +554,28 @@ class TcpToRldpRequestSender : public td::actor::Actor {
|
|||
class RldpTcpTunnel : public td::actor::Actor, private td::ObserverBase {
|
||||
public:
|
||||
RldpTcpTunnel(td::Bits256 transfer_id, ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort local_id,
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::rldp::Rldp> rldp, td::SocketFd fd)
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::rldp::Rldp> rldp,
|
||||
td::actor::ActorId<RldpHttpProxy> proxy, td::SocketFd fd)
|
||||
: id_(transfer_id)
|
||||
, src_(src)
|
||||
, local_id_(local_id)
|
||||
, adnl_(std::move(adnl))
|
||||
, rldp_(std::move(rldp))
|
||||
, proxy_(std::move(proxy))
|
||||
, fd_(std::move(fd)) {
|
||||
}
|
||||
|
||||
void start_up() override {
|
||||
self_ = actor_id(this);
|
||||
td::actor::SchedulerContext::get()->get_poll().subscribe(fd_.get_poll_info().extract_pollable_fd(this),
|
||||
td::PollFlags::ReadWrite());
|
||||
|
||||
class Cb : public ton::adnl::Adnl::Callback {
|
||||
public:
|
||||
explicit Cb(td::actor::ActorId<RldpTcpTunnel> id) : self_id_(std::move(id)) {
|
||||
}
|
||||
void receive_message(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst,
|
||||
td::BufferSlice data) override {
|
||||
LOG(INFO) << "rldp tcp tunnel: dropping message";
|
||||
}
|
||||
void receive_query(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data,
|
||||
td::Promise<td::BufferSlice> promise) override {
|
||||
td::actor::send_closure(self_id_, &RldpTcpTunnel::receive_query, std::move(data), std::move(promise));
|
||||
}
|
||||
|
||||
private:
|
||||
td::actor::ActorId<RldpTcpTunnel> self_id_;
|
||||
};
|
||||
td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, local_id_, generate_prefix(),
|
||||
std::make_unique<Cb>(actor_id(this)));
|
||||
process();
|
||||
}
|
||||
void start_up() override;
|
||||
|
||||
void tear_down() override {
|
||||
LOG(INFO) << "RldpTcpTunnel: tear_down";
|
||||
td::actor::send_closure(adnl_, &ton::adnl::Adnl::unsubscribe, local_id_, generate_prefix());
|
||||
td::actor::SchedulerContext::get()->get_poll().unsubscribe(fd_.get_poll_info().get_pollable_fd_ref());
|
||||
}
|
||||
|
||||
void registered_sender(RegisteredPayloadSenderGuard guard) {
|
||||
guard_ = std::move(guard);
|
||||
}
|
||||
|
||||
void notify() override {
|
||||
td::actor::send_closure(self_, &RldpTcpTunnel::process);
|
||||
}
|
||||
|
@ -642,19 +589,14 @@ class RldpTcpTunnel : public td::actor::Actor, private td::ObserverBase {
|
|||
td::actor::send_closure(SelfId, &RldpTcpTunnel::got_data_from_rldp, std::move(R));
|
||||
});
|
||||
|
||||
auto f = ton::create_serialize_tl_object<ton::ton_api::http_getNextPayloadPart>(id_, out_seqno_++, 1 << 17);
|
||||
auto f = ton::create_serialize_tl_object<ton::ton_api::http_getNextPayloadPart>(id_, out_seqno_++,
|
||||
(1 << 21) - (1 << 11));
|
||||
td::actor::send_closure(rldp_, &ton::rldp::Rldp::send_query_ex, local_id_, src_, "payload part", std::move(P),
|
||||
td::Timestamp::in(60.0), std::move(f), (1 << 18) + 1024);
|
||||
td::Timestamp::in(60.0), std::move(f), (1 << 21) + 1024);
|
||||
}
|
||||
|
||||
void receive_query(td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
|
||||
auto F = ton::fetch_tl_object<ton::ton_api::http_getNextPayloadPart>(data, true);
|
||||
if (F.is_error()) {
|
||||
LOG(INFO) << "failed to parse query: " << F.error();
|
||||
promise.set_error(F.move_as_error());
|
||||
return;
|
||||
}
|
||||
auto f = F.move_as_ok();
|
||||
void receive_query(ton::tl_object_ptr<ton::ton_api::http_getNextPayloadPart> f,
|
||||
td::Promise<td::BufferSlice> promise) {
|
||||
if (cur_promise_) {
|
||||
LOG(INFO) << "failed to process query: previous query is active";
|
||||
promise.set_error(td::Status::Error("previous query is active"));
|
||||
|
@ -772,11 +714,13 @@ class RldpTcpTunnel : public td::actor::Actor, private td::ObserverBase {
|
|||
}
|
||||
|
||||
td::Bits256 id_;
|
||||
RegisteredPayloadSenderGuard guard_;
|
||||
|
||||
ton::adnl::AdnlNodeIdShort src_;
|
||||
ton::adnl::AdnlNodeIdShort local_id_;
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl_;
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp_;
|
||||
td::actor::ActorId<RldpHttpProxy> proxy_;
|
||||
|
||||
td::BufferedFd<td::SocketFd> fd_;
|
||||
|
||||
|
@ -795,7 +739,7 @@ class RldpToTcpRequestSender : public td::actor::Actor {
|
|||
std::unique_ptr<ton::http::HttpRequest> request,
|
||||
std::shared_ptr<ton::http::HttpPayload> request_payload, td::Promise<td::BufferSlice> promise,
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::rldp::Rldp> rldp,
|
||||
td::actor::ActorId<HttpRemote> remote)
|
||||
td::actor::ActorId<RldpHttpProxy> proxy, td::actor::ActorId<HttpRemote> remote)
|
||||
: id_(id)
|
||||
, local_id_(local_id)
|
||||
, dst_(dst)
|
||||
|
@ -805,6 +749,7 @@ class RldpToTcpRequestSender : public td::actor::Actor {
|
|||
, promise_(std::move(promise))
|
||||
, adnl_(adnl)
|
||||
, rldp_(rldp)
|
||||
, proxy_(proxy)
|
||||
, remote_(std::move(remote)) {
|
||||
}
|
||||
void start_up() override {
|
||||
|
@ -826,8 +771,9 @@ class RldpToTcpRequestSender : public td::actor::Actor {
|
|||
|
||||
void got_result(std::pair<std::unique_ptr<ton::http::HttpResponse>, std::shared_ptr<ton::http::HttpPayload>> R) {
|
||||
td::actor::create_actor<HttpRldpPayloadSender>("HttpPayloadSender(R)", std::move(R.second), id_, local_id_, adnl_,
|
||||
rldp_)
|
||||
rldp_, proxy_)
|
||||
.release();
|
||||
R.first->add_header({PROXY_SITE_VERISON_HEADER_NAME, PROXY_VERSION_HEADER});
|
||||
auto f = ton::serialize_tl_object(R.first->store_tl(), true);
|
||||
promise_.set_value(std::move(f));
|
||||
stop();
|
||||
|
@ -853,6 +799,7 @@ class RldpToTcpRequestSender : public td::actor::Actor {
|
|||
|
||||
td::actor::ActorId<ton::adnl::Adnl> adnl_;
|
||||
td::actor::ActorId<ton::rldp::Rldp> rldp_;
|
||||
td::actor::ActorId<RldpHttpProxy> proxy_;
|
||||
|
||||
td::actor::ActorId<HttpRemote> remote_;
|
||||
};
|
||||
|
@ -1067,6 +1014,22 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
server_ = ton::http::HttpServer::create(port_, std::make_shared<Cb>(actor_id(this)));
|
||||
}
|
||||
|
||||
class AdnlPayloadCb : public ton::adnl::Adnl::Callback {
|
||||
public:
|
||||
AdnlPayloadCb(td::actor::ActorId<RldpHttpProxy> id) : self_id_(id) {
|
||||
}
|
||||
void receive_message(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst,
|
||||
td::BufferSlice data) override {
|
||||
}
|
||||
void receive_query(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data,
|
||||
td::Promise<td::BufferSlice> promise) override {
|
||||
td::actor::send_closure(self_id_, &RldpHttpProxy::receive_payload_part_request, std::move(data),
|
||||
std::move(promise));
|
||||
}
|
||||
|
||||
private:
|
||||
td::actor::ActorId<RldpHttpProxy> self_id_;
|
||||
};
|
||||
for (auto &serv_id : server_ids_) {
|
||||
class AdnlCb : public ton::adnl::Adnl::Callback {
|
||||
public:
|
||||
|
@ -1087,7 +1050,15 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, serv_id,
|
||||
ton::adnl::Adnl::int_to_bytestring(ton::ton_api::http_request::ID),
|
||||
std::make_unique<AdnlCb>(actor_id(this)));
|
||||
if (local_id_ != serv_id) {
|
||||
td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, serv_id,
|
||||
ton::adnl::Adnl::int_to_bytestring(ton::ton_api::http_getNextPayloadPart::ID),
|
||||
std::make_unique<AdnlPayloadCb>(actor_id(this)));
|
||||
}
|
||||
}
|
||||
td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, local_id_,
|
||||
ton::adnl::Adnl::int_to_bytestring(ton::ton_api::http_getNextPayloadPart::ID),
|
||||
std::make_unique<AdnlPayloadCb>(actor_id(this)));
|
||||
|
||||
rldp_ = ton::rldp::Rldp::create(adnl_.get());
|
||||
td::actor::send_closure(rldp_, &ton::rldp::Rldp::set_default_mtu, 16 << 10);
|
||||
|
@ -1141,7 +1112,7 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
|
||||
td::actor::create_actor<TcpToRldpRequestSender>("outboundreq", local_id_, host, std::move(request),
|
||||
std::move(payload), std::move(promise), adnl_.get(), dht_.get(),
|
||||
rldp_.get(), dns_resolver_.get())
|
||||
rldp_.get(), actor_id(this), dns_resolver_.get())
|
||||
.release();
|
||||
}
|
||||
|
||||
|
@ -1237,7 +1208,7 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
LOG(INFO) << "starting HTTP over RLDP request";
|
||||
td::actor::create_actor<RldpToTcpRequestSender>("inboundreq", f->id_, dst, src, std::move(request),
|
||||
payload.move_as_ok(), std::move(promise), adnl_.get(), rldp_.get(),
|
||||
server.http_remote_.get())
|
||||
actor_id(this), server.http_remote_.get())
|
||||
.release();
|
||||
}
|
||||
|
||||
|
@ -1249,10 +1220,52 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
return;
|
||||
}
|
||||
td::actor::create_actor<RldpTcpTunnel>(td::actor::ActorOptions().with_name("tunnel").with_poll(), id, src, local_id,
|
||||
adnl_.get(), rldp_.get(), fd.move_as_ok()).release();
|
||||
adnl_.get(), rldp_.get(), actor_id(this), fd.move_as_ok())
|
||||
.release();
|
||||
std::vector<ton::tl_object_ptr<ton::ton_api::http_header>> headers;
|
||||
headers.push_back(
|
||||
ton::create_tl_object<ton::ton_api::http_header>(PROXY_SITE_VERISON_HEADER_NAME, PROXY_VERSION_HEADER));
|
||||
promise.set_result(ton::create_serialize_tl_object<ton::ton_api::http_response>(
|
||||
http_version, 200, "Connection Established", std::vector<ton::tl_object_ptr<ton::ton_api::http_header>>(),
|
||||
false));
|
||||
http_version, 200, "Connection Established", std::move(headers), false));
|
||||
}
|
||||
|
||||
void receive_payload_part_request(td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
|
||||
auto F = ton::fetch_tl_object<ton::ton_api::http_getNextPayloadPart>(data, true);
|
||||
if (F.is_error()) {
|
||||
LOG(INFO) << "failed to parse query: " << F.error();
|
||||
promise.set_error(F.move_as_error());
|
||||
return;
|
||||
}
|
||||
auto f = F.move_as_ok();
|
||||
auto it = payload_senders_.find(f->id_);
|
||||
if (it == payload_senders_.end()) {
|
||||
LOG(INFO) << "failed to answer query: unknown request id";
|
||||
promise.set_error(td::Status::Error("unknown request id"));
|
||||
return;
|
||||
}
|
||||
it->second(std::move(f), std::move(promise));
|
||||
}
|
||||
|
||||
void register_payload_sender(
|
||||
td::Bits256 id,
|
||||
std::function<void(ton::tl_object_ptr<ton::ton_api::http_getNextPayloadPart>, td::Promise<td::BufferSlice>)> f,
|
||||
td::Promise<RegisteredPayloadSenderGuard> promise) {
|
||||
auto &f1 = payload_senders_[id];
|
||||
if (f1) {
|
||||
promise.set_error(td::Status::Error("duplicate id"));
|
||||
return;
|
||||
}
|
||||
f1 = std::move(f);
|
||||
promise.set_result(RegisteredPayloadSenderGuard(
|
||||
new std::pair<td::actor::ActorId<RldpHttpProxy>, td::Bits256>(actor_id(this), id),
|
||||
[](std::pair<td::actor::ActorId<RldpHttpProxy>, td::Bits256> *x) {
|
||||
td::actor::send_closure(x->first, &RldpHttpProxy::unregister_payload_sender, x->second);
|
||||
delete x;
|
||||
}));
|
||||
}
|
||||
|
||||
void unregister_payload_sender(td::Bits256 id) {
|
||||
payload_senders_.erase(id);
|
||||
}
|
||||
|
||||
void add_adnl_addr(ton::adnl::AdnlNodeIdShort id) {
|
||||
|
@ -1304,6 +1317,10 @@ class RldpHttpProxy : public td::actor::Actor {
|
|||
|
||||
td::actor::ActorOwn<TonlibClient> tonlib_client_;
|
||||
td::actor::ActorOwn<DNSResolver> dns_resolver_;
|
||||
|
||||
std::map<td::Bits256,
|
||||
std::function<void(ton::tl_object_ptr<ton::ton_api::http_getNextPayloadPart>, td::Promise<td::BufferSlice>)>>
|
||||
payload_senders_;
|
||||
};
|
||||
|
||||
void TcpToRldpRequestSender::resolve() {
|
||||
|
@ -1329,6 +1346,67 @@ void TcpToRldpRequestSender::resolve() {
|
|||
td::actor::send_closure(dns_resolver_, &DNSResolver::resolve, host_, std::move(P));
|
||||
}
|
||||
|
||||
void HttpRldpPayloadSender::start_up() {
|
||||
td::actor::send_closure(
|
||||
proxy_, &RldpHttpProxy::register_payload_sender, id_,
|
||||
[SelfId = actor_id(this)](ton::tl_object_ptr<ton::ton_api::http_getNextPayloadPart> f,
|
||||
td::Promise<td::BufferSlice> promise) {
|
||||
td::actor::send_closure(SelfId, &HttpRldpPayloadSender::receive_query, std::move(f), std::move(promise));
|
||||
},
|
||||
[SelfId = actor_id(this)](td::Result<RegisteredPayloadSenderGuard> R) {
|
||||
if (R.is_error()) {
|
||||
LOG(INFO) << "Failed to register request sender: " << R.move_as_error();
|
||||
}
|
||||
td::actor::send_closure(SelfId, &HttpRldpPayloadSender::registered_sender, R.move_as_ok());
|
||||
});
|
||||
|
||||
class Cb : public ton::http::HttpPayload::Callback {
|
||||
public:
|
||||
Cb(td::actor::ActorId<HttpRldpPayloadSender> id, size_t watermark) : self_id_(id), watermark_(watermark) {
|
||||
}
|
||||
void run(size_t ready_bytes) override {
|
||||
if (!reached_ && ready_bytes >= watermark_) {
|
||||
reached_ = true;
|
||||
td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query, false);
|
||||
} else if (reached_ && ready_bytes < watermark_) {
|
||||
reached_ = false;
|
||||
}
|
||||
}
|
||||
void completed() override {
|
||||
td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query, false);
|
||||
}
|
||||
|
||||
private:
|
||||
bool reached_ = false;
|
||||
td::actor::ActorId<HttpRldpPayloadSender> self_id_;
|
||||
size_t watermark_;
|
||||
};
|
||||
|
||||
payload_->add_callback(
|
||||
std::make_unique<Cb>(actor_id(this), is_tunnel_ ? 1 : ton::http::HttpRequest::low_watermark()));
|
||||
|
||||
alarm_timestamp() = td::Timestamp::in(is_tunnel_ ? 60.0 : 10.0);
|
||||
}
|
||||
|
||||
void RldpTcpTunnel::start_up() {
|
||||
self_ = actor_id(this);
|
||||
td::actor::SchedulerContext::get()->get_poll().subscribe(fd_.get_poll_info().extract_pollable_fd(this),
|
||||
td::PollFlags::ReadWrite());
|
||||
td::actor::send_closure(
|
||||
proxy_, &RldpHttpProxy::register_payload_sender, id_,
|
||||
[SelfId = actor_id(this)](ton::tl_object_ptr<ton::ton_api::http_getNextPayloadPart> f,
|
||||
td::Promise<td::BufferSlice> promise) {
|
||||
td::actor::send_closure(SelfId, &RldpTcpTunnel::receive_query, std::move(f), std::move(promise));
|
||||
},
|
||||
[SelfId = actor_id(this)](td::Result<RegisteredPayloadSenderGuard> R) {
|
||||
if (R.is_error()) {
|
||||
LOG(INFO) << "Failed to register request sender: " << R.move_as_error();
|
||||
}
|
||||
td::actor::send_closure(SelfId, &RldpTcpTunnel::registered_sender, R.move_as_ok());
|
||||
});
|
||||
process();
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
SET_VERBOSITY_LEVEL(verbosity_WARNING);
|
||||
|
||||
|
@ -1340,7 +1418,7 @@ int main(int argc, char *argv[]) {
|
|||
td::log_interface = td::default_log_interface;
|
||||
};
|
||||
|
||||
auto add_local_host = [&](const std::string& local, const std::string& remote) -> td::Status {
|
||||
auto add_local_host = [&](const std::string &local, const std::string &remote) -> td::Status {
|
||||
std::string host;
|
||||
std::vector<td::uint16> ports;
|
||||
auto p = local.find(':');
|
||||
|
@ -1357,7 +1435,7 @@ int main(int argc, char *argv[]) {
|
|||
}
|
||||
try {
|
||||
ports.push_back((td::uint16)std::stoul(local.substr(p, p2 - p)));
|
||||
} catch (const std::logic_error& e) {
|
||||
} catch (const std::logic_error &e) {
|
||||
return td::Status::Error(PSLICE() << "Invalid port: " << local.substr(p, p2 - p));
|
||||
}
|
||||
p = p2 + 1;
|
||||
|
@ -1427,9 +1505,7 @@ int main(int argc, char *argv[]) {
|
|||
p.add_checked_option('L', "local",
|
||||
"<hosthame>:<ports>, hostname that will be proxied to localhost\n"
|
||||
"<ports> is a comma-separated list of ports (may be omitted, default: 80, 443)\n",
|
||||
[&](td::Slice arg) -> td::Status {
|
||||
return add_local_host(arg.str(), "127.0.0.1");
|
||||
});
|
||||
[&](td::Slice arg) -> td::Status { return add_local_host(arg.str(), "127.0.0.1"); });
|
||||
p.add_option('D', "db", "db root",
|
||||
[&](td::Slice arg) { td::actor::send_closure(x, &RldpHttpProxy::set_db_root, arg.str()); });
|
||||
p.add_checked_option(
|
||||
|
|
Loading…
Reference in a new issue