diff --git a/rldp-http-proxy/rldp-http-proxy.cpp b/rldp-http-proxy/rldp-http-proxy.cpp index a45e6433..1c76651f 100644 --- a/rldp-http-proxy/rldp-http-proxy.cpp +++ b/rldp-http-proxy/rldp-http-proxy.cpp @@ -33,7 +33,6 @@ #include "td/utils/FileLog.h" #include "td/utils/Random.h" #include "td/utils/filesystem.h" -#include "td/utils/overloaded.h" #include "auto/tl/ton_api_json.h" #include "auto/tl/tonlib_api.hpp" @@ -134,6 +133,15 @@ td::BufferSlice create_error_response(const std::string &proto_version, int code proto_version, code, reason, std::vector>(), true); } +const std::string PROXY_SITE_VERISON_HEADER_NAME = "Ton-Proxy-Site-Version"; +const std::string PROXY_ENTRY_VERISON_HEADER_NAME = "Ton-Proxy-Entry-Version"; +const std::string PROXY_VERSION_HEADER = PSTRING() << "Commit: " << GitMetadata::CommitSHA1() + << ", Date: " << GitMetadata::CommitDate(); + +using RegisteredPayloadSenderGuard = + std::unique_ptr, td::Bits256>, + std::function, td::Bits256> *)>>; + class HttpRldpPayloadReceiver : public td::actor::Actor { public: HttpRldpPayloadReceiver(std::shared_ptr payload, td::Bits256 transfer_id, @@ -244,10 +252,10 @@ class HttpRldpPayloadReceiver : public td::actor::Actor { private: static constexpr size_t watermark() { - return 1 << 15; + return (1 << 21) - (1 << 11); } static constexpr size_t chunk_size() { - return 1 << 17; + return (1 << 21) - (1 << 11); } std::shared_ptr payload_; @@ -268,12 +276,14 @@ class HttpRldpPayloadSender : public td::actor::Actor { public: HttpRldpPayloadSender(std::shared_ptr payload, td::Bits256 transfer_id, ton::adnl::AdnlNodeIdShort local_id, td::actor::ActorId adnl, - td::actor::ActorId rldp, bool is_tunnel = false) + td::actor::ActorId rldp, td::actor::ActorId proxy, + bool is_tunnel = false) : payload_(std::move(payload)) , id_(transfer_id) , local_id_(local_id) , adnl_(adnl) , rldp_(rldp) + , proxy_(proxy) , is_tunnel_(is_tunnel) { } @@ -289,52 +299,10 @@ class HttpRldpPayloadSender : public td::actor::Actor { return x; } - void start_up() override { - class AdnlCb : public ton::adnl::Adnl::Callback { - public: - AdnlCb(td::actor::ActorId id) : self_id_(id) { - } - void receive_message(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, - td::BufferSlice data) override { - LOG(INFO) << "http payload sender: dropping message"; - } - void receive_query(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data, - td::Promise promise) override { - td::actor::send_closure(self_id_, &HttpRldpPayloadSender::receive_query, std::move(data), std::move(promise)); - } + void start_up() override; - private: - td::actor::ActorId self_id_; - }; - td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, local_id_, generate_prefix(), - std::make_unique(actor_id(this))); - - class Cb : public ton::http::HttpPayload::Callback { - public: - Cb(td::actor::ActorId id, size_t watermark) : self_id_(id), watermark_(watermark) { - } - void run(size_t ready_bytes) override { - if (!reached_ && ready_bytes >= watermark_) { - reached_ = true; - td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query, false); - } else if (reached_ && ready_bytes < watermark_) { - reached_ = false; - } - } - void completed() override { - td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query, false); - } - - private: - bool reached_ = false; - td::actor::ActorId self_id_; - size_t watermark_; - }; - - payload_->add_callback( - std::make_unique(actor_id(this), is_tunnel_ ? 1 : ton::http::HttpRequest::low_watermark())); - - alarm_timestamp() = td::Timestamp::in(is_tunnel_ ? 60.0 : 10.0); + void registered_sender(RegisteredPayloadSenderGuard guard) { + guard_ = std::move(guard); } void try_answer_query(bool from_timer = false) { @@ -391,13 +359,9 @@ class HttpRldpPayloadSender : public td::actor::Actor { try_answer_query(false); } - void receive_query(td::BufferSlice data, td::Promise promise) { - auto F = ton::fetch_tl_object(data, true); - if (F.is_error()) { - LOG(INFO) << "failed to parse query: " << F.move_as_error(); - return; - } - send_data(F.move_as_ok(), std::move(promise)); + void receive_query(ton::tl_object_ptr f, + td::Promise promise) { + send_data(std::move(f), std::move(promise)); } void alarm() override { @@ -429,24 +393,22 @@ class HttpRldpPayloadSender : public td::actor::Actor { stop(); } - void tear_down() override { - td::actor::send_closure(adnl_, &ton::adnl::Adnl::unsubscribe, local_id_, generate_prefix()); - } - private: static constexpr size_t watermark() { - return 1 << 15; + return (1 << 21) - (1 << 11); } std::shared_ptr payload_; td::Bits256 id_; + RegisteredPayloadSenderGuard guard_; td::int32 seqno_ = 0; ton::adnl::AdnlNodeIdShort local_id_; td::actor::ActorId adnl_; td::actor::ActorId rldp_; + td::actor::ActorId proxy_; size_t cur_query_size_; td::Promise cur_query_promise_; @@ -462,7 +424,8 @@ class TcpToRldpRequestSender : public td::actor::Actor { std::shared_ptr request_payload, td::Promise, std::shared_ptr>> promise, td::actor::ActorId adnl, td::actor::ActorId dht, - td::actor::ActorId rldp, td::actor::ActorId dns_resolver) + td::actor::ActorId rldp, td::actor::ActorId proxy, + td::actor::ActorId dns_resolver) : local_id_(local_id) , host_(std::move(host)) , request_(std::move(request)) @@ -471,6 +434,7 @@ class TcpToRldpRequestSender : public td::actor::Actor { , adnl_(adnl) , dht_(dht) , rldp_(rldp) + , proxy_(proxy) , dns_resolver_(dns_resolver) { } void start_up() override { @@ -492,7 +456,7 @@ class TcpToRldpRequestSender : public td::actor::Actor { }); td::actor::create_actor("HttpPayloadSender", request_payload_, id_, local_id_, adnl_, rldp_, - is_tunnel()) + proxy_, is_tunnel()) .release(); auto f = ton::serialize_tl_object(request_->store_tl(id_), true); @@ -523,6 +487,7 @@ class TcpToRldpRequestSender : public td::actor::Actor { } response_->add_header(std::move(h)); } + response_->add_header({PROXY_ENTRY_VERISON_HEADER_NAME, PROXY_VERSION_HEADER}); auto S = response_->complete_parse_header(); if (S.is_error()) { abort_query(S.move_as_error()); @@ -579,6 +544,7 @@ class TcpToRldpRequestSender : public td::actor::Actor { td::actor::ActorId adnl_; td::actor::ActorId dht_; td::actor::ActorId rldp_; + td::actor::ActorId proxy_; td::actor::ActorId dns_resolver_; std::unique_ptr response_; @@ -588,47 +554,28 @@ class TcpToRldpRequestSender : public td::actor::Actor { class RldpTcpTunnel : public td::actor::Actor, private td::ObserverBase { public: RldpTcpTunnel(td::Bits256 transfer_id, ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort local_id, - td::actor::ActorId adnl, td::actor::ActorId rldp, td::SocketFd fd) + td::actor::ActorId adnl, td::actor::ActorId rldp, + td::actor::ActorId proxy, td::SocketFd fd) : id_(transfer_id) , src_(src) , local_id_(local_id) , adnl_(std::move(adnl)) , rldp_(std::move(rldp)) + , proxy_(std::move(proxy)) , fd_(std::move(fd)) { } - void start_up() override { - self_ = actor_id(this); - td::actor::SchedulerContext::get()->get_poll().subscribe(fd_.get_poll_info().extract_pollable_fd(this), - td::PollFlags::ReadWrite()); - - class Cb : public ton::adnl::Adnl::Callback { - public: - explicit Cb(td::actor::ActorId id) : self_id_(std::move(id)) { - } - void receive_message(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, - td::BufferSlice data) override { - LOG(INFO) << "rldp tcp tunnel: dropping message"; - } - void receive_query(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data, - td::Promise promise) override { - td::actor::send_closure(self_id_, &RldpTcpTunnel::receive_query, std::move(data), std::move(promise)); - } - - private: - td::actor::ActorId self_id_; - }; - td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, local_id_, generate_prefix(), - std::make_unique(actor_id(this))); - process(); - } + void start_up() override; void tear_down() override { LOG(INFO) << "RldpTcpTunnel: tear_down"; - td::actor::send_closure(adnl_, &ton::adnl::Adnl::unsubscribe, local_id_, generate_prefix()); td::actor::SchedulerContext::get()->get_poll().unsubscribe(fd_.get_poll_info().get_pollable_fd_ref()); } + void registered_sender(RegisteredPayloadSenderGuard guard) { + guard_ = std::move(guard); + } + void notify() override { td::actor::send_closure(self_, &RldpTcpTunnel::process); } @@ -642,19 +589,14 @@ class RldpTcpTunnel : public td::actor::Actor, private td::ObserverBase { td::actor::send_closure(SelfId, &RldpTcpTunnel::got_data_from_rldp, std::move(R)); }); - auto f = ton::create_serialize_tl_object(id_, out_seqno_++, 1 << 17); + auto f = ton::create_serialize_tl_object(id_, out_seqno_++, + (1 << 21) - (1 << 11)); td::actor::send_closure(rldp_, &ton::rldp::Rldp::send_query_ex, local_id_, src_, "payload part", std::move(P), - td::Timestamp::in(60.0), std::move(f), (1 << 18) + 1024); + td::Timestamp::in(60.0), std::move(f), (1 << 21) + 1024); } - void receive_query(td::BufferSlice data, td::Promise promise) { - auto F = ton::fetch_tl_object(data, true); - if (F.is_error()) { - LOG(INFO) << "failed to parse query: " << F.error(); - promise.set_error(F.move_as_error()); - return; - } - auto f = F.move_as_ok(); + void receive_query(ton::tl_object_ptr f, + td::Promise promise) { if (cur_promise_) { LOG(INFO) << "failed to process query: previous query is active"; promise.set_error(td::Status::Error("previous query is active")); @@ -772,11 +714,13 @@ class RldpTcpTunnel : public td::actor::Actor, private td::ObserverBase { } td::Bits256 id_; + RegisteredPayloadSenderGuard guard_; ton::adnl::AdnlNodeIdShort src_; ton::adnl::AdnlNodeIdShort local_id_; td::actor::ActorId adnl_; td::actor::ActorId rldp_; + td::actor::ActorId proxy_; td::BufferedFd fd_; @@ -795,7 +739,7 @@ class RldpToTcpRequestSender : public td::actor::Actor { std::unique_ptr request, std::shared_ptr request_payload, td::Promise promise, td::actor::ActorId adnl, td::actor::ActorId rldp, - td::actor::ActorId remote) + td::actor::ActorId proxy, td::actor::ActorId remote) : id_(id) , local_id_(local_id) , dst_(dst) @@ -805,6 +749,7 @@ class RldpToTcpRequestSender : public td::actor::Actor { , promise_(std::move(promise)) , adnl_(adnl) , rldp_(rldp) + , proxy_(proxy) , remote_(std::move(remote)) { } void start_up() override { @@ -826,8 +771,9 @@ class RldpToTcpRequestSender : public td::actor::Actor { void got_result(std::pair, std::shared_ptr> R) { td::actor::create_actor("HttpPayloadSender(R)", std::move(R.second), id_, local_id_, adnl_, - rldp_) + rldp_, proxy_) .release(); + R.first->add_header({PROXY_SITE_VERISON_HEADER_NAME, PROXY_VERSION_HEADER}); auto f = ton::serialize_tl_object(R.first->store_tl(), true); promise_.set_value(std::move(f)); stop(); @@ -853,6 +799,7 @@ class RldpToTcpRequestSender : public td::actor::Actor { td::actor::ActorId adnl_; td::actor::ActorId rldp_; + td::actor::ActorId proxy_; td::actor::ActorId remote_; }; @@ -1067,6 +1014,22 @@ class RldpHttpProxy : public td::actor::Actor { server_ = ton::http::HttpServer::create(port_, std::make_shared(actor_id(this))); } + class AdnlPayloadCb : public ton::adnl::Adnl::Callback { + public: + AdnlPayloadCb(td::actor::ActorId id) : self_id_(id) { + } + void receive_message(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, + td::BufferSlice data) override { + } + void receive_query(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data, + td::Promise promise) override { + td::actor::send_closure(self_id_, &RldpHttpProxy::receive_payload_part_request, std::move(data), + std::move(promise)); + } + + private: + td::actor::ActorId self_id_; + }; for (auto &serv_id : server_ids_) { class AdnlCb : public ton::adnl::Adnl::Callback { public: @@ -1087,7 +1050,15 @@ class RldpHttpProxy : public td::actor::Actor { td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, serv_id, ton::adnl::Adnl::int_to_bytestring(ton::ton_api::http_request::ID), std::make_unique(actor_id(this))); + if (local_id_ != serv_id) { + td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, serv_id, + ton::adnl::Adnl::int_to_bytestring(ton::ton_api::http_getNextPayloadPart::ID), + std::make_unique(actor_id(this))); + } } + td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, local_id_, + ton::adnl::Adnl::int_to_bytestring(ton::ton_api::http_getNextPayloadPart::ID), + std::make_unique(actor_id(this))); rldp_ = ton::rldp::Rldp::create(adnl_.get()); td::actor::send_closure(rldp_, &ton::rldp::Rldp::set_default_mtu, 16 << 10); @@ -1141,7 +1112,7 @@ class RldpHttpProxy : public td::actor::Actor { td::actor::create_actor("outboundreq", local_id_, host, std::move(request), std::move(payload), std::move(promise), adnl_.get(), dht_.get(), - rldp_.get(), dns_resolver_.get()) + rldp_.get(), actor_id(this), dns_resolver_.get()) .release(); } @@ -1237,7 +1208,7 @@ class RldpHttpProxy : public td::actor::Actor { LOG(INFO) << "starting HTTP over RLDP request"; td::actor::create_actor("inboundreq", f->id_, dst, src, std::move(request), payload.move_as_ok(), std::move(promise), adnl_.get(), rldp_.get(), - server.http_remote_.get()) + actor_id(this), server.http_remote_.get()) .release(); } @@ -1249,10 +1220,52 @@ class RldpHttpProxy : public td::actor::Actor { return; } td::actor::create_actor(td::actor::ActorOptions().with_name("tunnel").with_poll(), id, src, local_id, - adnl_.get(), rldp_.get(), fd.move_as_ok()).release(); + adnl_.get(), rldp_.get(), actor_id(this), fd.move_as_ok()) + .release(); + std::vector> headers; + headers.push_back( + ton::create_tl_object(PROXY_SITE_VERISON_HEADER_NAME, PROXY_VERSION_HEADER)); promise.set_result(ton::create_serialize_tl_object( - http_version, 200, "Connection Established", std::vector>(), - false)); + http_version, 200, "Connection Established", std::move(headers), false)); + } + + void receive_payload_part_request(td::BufferSlice data, td::Promise promise) { + auto F = ton::fetch_tl_object(data, true); + if (F.is_error()) { + LOG(INFO) << "failed to parse query: " << F.error(); + promise.set_error(F.move_as_error()); + return; + } + auto f = F.move_as_ok(); + auto it = payload_senders_.find(f->id_); + if (it == payload_senders_.end()) { + LOG(INFO) << "failed to answer query: unknown request id"; + promise.set_error(td::Status::Error("unknown request id")); + return; + } + it->second(std::move(f), std::move(promise)); + } + + void register_payload_sender( + td::Bits256 id, + std::function, td::Promise)> f, + td::Promise promise) { + auto &f1 = payload_senders_[id]; + if (f1) { + promise.set_error(td::Status::Error("duplicate id")); + return; + } + f1 = std::move(f); + promise.set_result(RegisteredPayloadSenderGuard( + new std::pair, td::Bits256>(actor_id(this), id), + [](std::pair, td::Bits256> *x) { + td::actor::send_closure(x->first, &RldpHttpProxy::unregister_payload_sender, x->second); + delete x; + })); + } + + void unregister_payload_sender(td::Bits256 id) { + payload_senders_.erase(id); } void add_adnl_addr(ton::adnl::AdnlNodeIdShort id) { @@ -1304,6 +1317,10 @@ class RldpHttpProxy : public td::actor::Actor { td::actor::ActorOwn tonlib_client_; td::actor::ActorOwn dns_resolver_; + + std::map, td::Promise)>> + payload_senders_; }; void TcpToRldpRequestSender::resolve() { @@ -1329,6 +1346,67 @@ void TcpToRldpRequestSender::resolve() { td::actor::send_closure(dns_resolver_, &DNSResolver::resolve, host_, std::move(P)); } +void HttpRldpPayloadSender::start_up() { + td::actor::send_closure( + proxy_, &RldpHttpProxy::register_payload_sender, id_, + [SelfId = actor_id(this)](ton::tl_object_ptr f, + td::Promise promise) { + td::actor::send_closure(SelfId, &HttpRldpPayloadSender::receive_query, std::move(f), std::move(promise)); + }, + [SelfId = actor_id(this)](td::Result R) { + if (R.is_error()) { + LOG(INFO) << "Failed to register request sender: " << R.move_as_error(); + } + td::actor::send_closure(SelfId, &HttpRldpPayloadSender::registered_sender, R.move_as_ok()); + }); + + class Cb : public ton::http::HttpPayload::Callback { + public: + Cb(td::actor::ActorId id, size_t watermark) : self_id_(id), watermark_(watermark) { + } + void run(size_t ready_bytes) override { + if (!reached_ && ready_bytes >= watermark_) { + reached_ = true; + td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query, false); + } else if (reached_ && ready_bytes < watermark_) { + reached_ = false; + } + } + void completed() override { + td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query, false); + } + + private: + bool reached_ = false; + td::actor::ActorId self_id_; + size_t watermark_; + }; + + payload_->add_callback( + std::make_unique(actor_id(this), is_tunnel_ ? 1 : ton::http::HttpRequest::low_watermark())); + + alarm_timestamp() = td::Timestamp::in(is_tunnel_ ? 60.0 : 10.0); +} + +void RldpTcpTunnel::start_up() { + self_ = actor_id(this); + td::actor::SchedulerContext::get()->get_poll().subscribe(fd_.get_poll_info().extract_pollable_fd(this), + td::PollFlags::ReadWrite()); + td::actor::send_closure( + proxy_, &RldpHttpProxy::register_payload_sender, id_, + [SelfId = actor_id(this)](ton::tl_object_ptr f, + td::Promise promise) { + td::actor::send_closure(SelfId, &RldpTcpTunnel::receive_query, std::move(f), std::move(promise)); + }, + [SelfId = actor_id(this)](td::Result R) { + if (R.is_error()) { + LOG(INFO) << "Failed to register request sender: " << R.move_as_error(); + } + td::actor::send_closure(SelfId, &RldpTcpTunnel::registered_sender, R.move_as_ok()); + }); + process(); +} + int main(int argc, char *argv[]) { SET_VERBOSITY_LEVEL(verbosity_WARNING); @@ -1340,7 +1418,7 @@ int main(int argc, char *argv[]) { td::log_interface = td::default_log_interface; }; - auto add_local_host = [&](const std::string& local, const std::string& remote) -> td::Status { + auto add_local_host = [&](const std::string &local, const std::string &remote) -> td::Status { std::string host; std::vector ports; auto p = local.find(':'); @@ -1357,7 +1435,7 @@ int main(int argc, char *argv[]) { } try { ports.push_back((td::uint16)std::stoul(local.substr(p, p2 - p))); - } catch (const std::logic_error& e) { + } catch (const std::logic_error &e) { return td::Status::Error(PSLICE() << "Invalid port: " << local.substr(p, p2 - p)); } p = p2 + 1; @@ -1427,9 +1505,7 @@ int main(int argc, char *argv[]) { p.add_checked_option('L', "local", ":, hostname that will be proxied to localhost\n" " is a comma-separated list of ports (may be omitted, default: 80, 443)\n", - [&](td::Slice arg) -> td::Status { - return add_local_host(arg.str(), "127.0.0.1"); - }); + [&](td::Slice arg) -> td::Status { return add_local_host(arg.str(), "127.0.0.1"); }); p.add_option('D', "db", "db root", [&](td::Slice arg) { td::actor::send_closure(x, &RldpHttpProxy::set_db_root, arg.str()); }); p.add_checked_option(