diff --git a/http/http-connection.cpp b/http/http-connection.cpp index 6ba1a813..f1c3c58c 100644 --- a/http/http-connection.cpp +++ b/http/http-connection.cpp @@ -58,9 +58,7 @@ void HttpConnection::loop() { } TRY_STATUS(buffered_fd_.flush_write()); if (writing_payload_ && buffered_fd_.left_unwritten() < fd_high_watermark()) { - auto w = buffered_fd_.left_unwritten(); - continue_payload_write(); - written = buffered_fd_.left_unwritten() > w; + written = continue_payload_write(); } if (close_after_write_ && !writing_payload_ && !buffered_fd_.left_unwritten()) { LOG(INFO) << "close after write"; @@ -120,7 +118,7 @@ void HttpConnection::write_payload(std::shared_ptr payload) { class Cb : public HttpPayload::Callback { public: - Cb(td::actor::ActorId conn) : conn_(conn) { + Cb(td::actor::ActorId conn, size_t watermark) : conn_(conn), watermark_(watermark) { } void run(size_t ready_bytes) override { if (!reached_ && ready_bytes >= watermark_) { @@ -135,23 +133,23 @@ void HttpConnection::write_payload(std::shared_ptr payload) { } private: - size_t watermark_ = chunk_size(); - bool reached_ = false; - td::actor::ActorId conn_; + size_t watermark_; + bool reached_ = false; }; - writing_payload_->add_callback(std::make_unique(actor_id(this))); + writing_payload_->add_callback(std::make_unique( + actor_id(this), writing_payload_->payload_type() == HttpPayload::PayloadType::pt_tunnel ? 1 : chunk_size())); continue_payload_write(); } -void HttpConnection::continue_payload_write() { +bool HttpConnection::continue_payload_write() { if (!writing_payload_) { - return; + return false; } if (writing_payload_->is_error()) { stop(); - return; + return false; } auto t = writing_payload_->payload_type(); @@ -159,19 +157,24 @@ void HttpConnection::continue_payload_write() { t = HttpPayload::PayloadType::pt_chunked; } + bool wrote = false; while (!writing_payload_->written()) { if (buffered_fd_.left_unwritten() > fd_high_watermark()) { - return; + return wrote; } - if (!writing_payload_->parse_completed() && writing_payload_->ready_bytes() < chunk_size()) { - return; + bool is_tunnel = writing_payload_->payload_type() == HttpPayload::PayloadType::pt_tunnel; + if (!is_tunnel && !writing_payload_->parse_completed() && writing_payload_->ready_bytes() < chunk_size()) { + return wrote; } - writing_payload_->store_http(buffered_fd_.output_buffer(), chunk_size(), t); + if (is_tunnel && writing_payload_->ready_bytes() == 0) { + return wrote; + } + wrote |= writing_payload_->store_http(buffered_fd_.output_buffer(), chunk_size(), t); } if (writing_payload_->parse_completed() && writing_payload_->written()) { payload_written(); - return; } + return wrote; } td::Status HttpConnection::read_payload(HttpResponse *response) { diff --git a/http/http-connection.h b/http/http-connection.h index f31af484..840ac8a1 100644 --- a/http/http-connection.h +++ b/http/http-connection.h @@ -65,9 +65,7 @@ class HttpConnection : public td::actor::Actor, public td::ObserverBase { void send_request(std::unique_ptr request, std::shared_ptr payload); void send_response(std::unique_ptr response, std::shared_ptr payload); void write_payload(std::shared_ptr payload); - void continue_payload_write(); - td::Status receive_request(); - td::Status receive_response(); + bool continue_payload_write(); td::Status read_payload(HttpRequest *request); td::Status read_payload(HttpResponse *response); td::Status read_payload(std::shared_ptr payload); diff --git a/http/http-inbound-connection.h b/http/http-inbound-connection.h index 876c3902..c63e0a30 100644 --- a/http/http-inbound-connection.h +++ b/http/http-inbound-connection.h @@ -35,7 +35,8 @@ class HttpInboundConnection : public HttpConnection { td::Status receive_eof() override { found_eof_ = true; if (reading_payload_) { - if (reading_payload_->payload_type() != HttpPayload::PayloadType::pt_eof) { + if (reading_payload_->payload_type() != HttpPayload::PayloadType::pt_eof && + reading_payload_->payload_type() != HttpPayload::PayloadType::pt_tunnel) { return td::Status::Error("unexpected EOF"); } else { reading_payload_->complete_parse(); diff --git a/http/http-outbound-connection.h b/http/http-outbound-connection.h index a3f5d513..a3d75258 100644 --- a/http/http-outbound-connection.h +++ b/http/http-outbound-connection.h @@ -44,7 +44,8 @@ class HttpOutboundConnection : public HttpConnection { td::Status receive_eof() override { found_eof_ = true; if (reading_payload_) { - if (reading_payload_->payload_type() != HttpPayload::PayloadType::pt_eof) { + if (reading_payload_->payload_type() != HttpPayload::PayloadType::pt_eof && + reading_payload_->payload_type() != HttpPayload::PayloadType::pt_tunnel) { return td::Status::Error("unexpected EOF"); } else { LOG(INFO) << "stopping (EOF payload)"; diff --git a/http/http.cpp b/http/http.cpp index b3c56c9e..427032f4 100644 --- a/http/http.cpp +++ b/http/http.cpp @@ -164,6 +164,8 @@ td::Result> HttpRequest::create_empty_payload() { if (!need_payload()) { return std::make_shared(HttpPayload::PayloadType::pt_empty); + } else if (method_ == "CONNECT") { + return std::make_shared(HttpPayload::PayloadType::pt_tunnel, low_watermark(), high_watermark()); } else if (found_content_length_) { return std::make_shared(HttpPayload::PayloadType::pt_content_length, low_watermark(), high_watermark(), content_length_); @@ -175,7 +177,7 @@ td::Result> HttpRequest::create_empty_payload() { } bool HttpRequest::need_payload() const { - return found_content_length_ || found_transfer_encoding_; + return found_content_length_ || found_transfer_encoding_ || method_ == "CONNECT"; } td::Status HttpRequest::add_header(HttpHeader header) { @@ -284,7 +286,8 @@ td::Status HttpPayload::parse(td::ChainBufferReader &input) { case PayloadType::pt_empty: UNREACHABLE(); case PayloadType::pt_eof: - cur_chunk_size_ = 1 << 30; + case PayloadType::pt_tunnel: + cur_chunk_size_ = 1LL << 60; break; case PayloadType::pt_chunked: state_ = ParseState::reading_crlf; @@ -480,17 +483,18 @@ void HttpPayload::run_callbacks() { } } -void HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, HttpPayload::PayloadType store_type) { +bool HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, HttpPayload::PayloadType store_type) { if (store_type == PayloadType::pt_empty) { - return; + return false; } slice_gc(); + bool wrote = false; while (chunks_.size() > 0 && max_size > 0) { auto cur_state = state_.load(std::memory_order_consume); auto s = get_slice(max_size); if (s.size() == 0) { if (cur_state != ParseState::reading_trailer && cur_state != ParseState::completed) { - return; + return wrote; } else { break; } @@ -500,28 +504,33 @@ void HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, Htt if (store_type == PayloadType::pt_chunked) { char buf[64]; ::sprintf(buf, "%lx\r\n", s.size()); - output.append(td::Slice(buf, strlen(buf))); + auto slice = td::Slice(buf, strlen(buf)); + wrote |= !slice.empty(); + output.append(slice); } + wrote |= !s.empty(); output.append(std::move(s)); if (store_type == PayloadType::pt_chunked) { output.append(td::Slice("\r\n", 2)); + wrote = true; } } if (chunks_.size() != 0) { - return; + return wrote; } if (!written_zero_chunk_) { if (store_type == PayloadType::pt_chunked) { output.append(td::Slice("0\r\n", 3)); + wrote = true; } written_zero_chunk_ = true; } if (store_type != PayloadType::pt_chunked) { written_trailer_ = true; - return; + return wrote; } while (max_size > 0) { @@ -529,15 +538,16 @@ void HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, Htt HttpHeader h = get_header(); if (h.empty()) { if (cur_state != ParseState::completed) { - return; + return wrote; } else { break; } } auto s = h.size(); h.store_http(output); + wrote = true; if (max_size <= s) { - return; + return wrote; } max_size -= s; } @@ -545,7 +555,9 @@ void HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, Htt if (!written_trailer_) { output.append(td::Slice("\r\n", 2)); written_trailer_ = true; + wrote = true; } + return wrote; } tl_object_ptr HttpPayload::store_tl(size_t max_size) { @@ -729,17 +741,18 @@ td::Result> HttpResponse::parse(std::unique_ptr> HttpResponse::create(std::string proto_version, td::uint32 code, std::string reason, bool force_no_payload, - bool keep_alive) { + bool keep_alive, bool is_tunnel) { if (proto_version != "HTTP/1.0" && proto_version != "HTTP/1.1") { return td::Status::Error(PSTRING() << "unsupported http version '" << proto_version << "'"); } @@ -749,7 +762,7 @@ td::Result> HttpResponse::create(std::string proto } return std::make_unique(std::move(proto_version), code, std::move(reason), force_no_payload, - keep_alive); + keep_alive, is_tunnel); } td::Status HttpResponse::complete_parse_header() { @@ -767,6 +780,8 @@ td::Result> HttpResponse::create_empty_payload() { if (!need_payload()) { return std::make_shared(HttpPayload::PayloadType::pt_empty); + } else if (is_tunnel_) { + return std::make_shared(HttpPayload::PayloadType::pt_tunnel, low_watermark(), high_watermark()); } else if (found_content_length_) { return std::make_shared(HttpPayload::PayloadType::pt_content_length, low_watermark(), high_watermark(), content_length_); @@ -828,10 +843,12 @@ void HttpResponse::store_http(td::ChainBufferWriter &output) { for (auto &x : options_) { x.store_http(output); } - if (keep_alive_) { - HttpHeader{"Connection", "Keep-Alive"}.store_http(output); - } else { - HttpHeader{"Connection", "Close"}.store_http(output); + if (!is_tunnel_) { + if (keep_alive_) { + HttpHeader{"Connection", "Keep-Alive"}.store_http(output); + } else { + HttpHeader{"Connection", "Close"}.store_http(output); + } } output.append(td::Slice("\r\n", 2)); } @@ -893,7 +910,9 @@ void answer_error(HttpStatusCode code, std::string reason, } auto response = HttpResponse::create("HTTP/1.0", code, reason, false, false).move_as_ok(); response->add_header(HttpHeader{"Content-Length", "0"}); + response->complete_parse_header(); auto payload = response->create_empty_payload().move_as_ok(); + payload->complete_parse(); CHECK(payload->parse_completed()); promise.set_value(std::make_pair(std::move(response), std::move(payload))); } diff --git a/http/http.h b/http/http.h index f5e2087a..8e109455 100644 --- a/http/http.h +++ b/http/http.h @@ -64,7 +64,7 @@ td::Result get_header(std::string line); class HttpPayload { public: - enum class PayloadType { pt_empty, pt_eof, pt_chunked, pt_content_length }; + enum class PayloadType { pt_empty, pt_eof, pt_chunked, pt_content_length, pt_tunnel }; HttpPayload(PayloadType t, size_t low_watermark, size_t high_watermark, td::uint64 size) : type_(t), low_watermark_(low_watermark), high_watermark_(high_watermark), cur_chunk_size_(size) { CHECK(t == PayloadType::pt_content_length); @@ -75,17 +75,15 @@ class HttpPayload { CHECK(t != PayloadType::pt_content_length); CHECK(t != PayloadType::pt_empty); switch (t) { - case PayloadType::pt_empty: - UNREACHABLE(); case PayloadType::pt_eof: + case PayloadType::pt_tunnel: state_ = ParseState::reading_chunk_data; break; case PayloadType::pt_chunked: state_ = ParseState::reading_chunk_header; break; - case PayloadType::pt_content_length: - state_ = ParseState::reading_chunk_data; - break; + default: + UNREACHABLE(); } } HttpPayload(PayloadType t) : type_(t) { @@ -136,7 +134,7 @@ class HttpPayload { void slice_gc(); HttpHeader get_header(); - void store_http(td::ChainBufferWriter &output, size_t max_size, HttpPayload::PayloadType store_type); + bool store_http(td::ChainBufferWriter &output, size_t max_size, HttpPayload::PayloadType store_type); tl_object_ptr store_tl(size_t max_size); bool written() const { @@ -267,9 +265,11 @@ class HttpResponse { } static td::Result> create(std::string proto_version, td::uint32 code, - std::string reason, bool force_no_payload, bool keep_alive); + std::string reason, bool force_no_payload, bool keep_alive, + bool is_tunnel = false); - HttpResponse(std::string proto_version, td::uint32 code, std::string reason, bool force_no_payload, bool keep_alive); + HttpResponse(std::string proto_version, td::uint32 code, std::string reason, bool force_no_payload, bool keep_alive, + bool is_tunnel = false); bool check_parse_header_completed() const; bool keep_alive() const { @@ -323,6 +323,7 @@ class HttpResponse { bool keep_alive_ = false; std::vector options_; + bool is_tunnel_ = false; }; void answer_error(HttpStatusCode code, std::string reason, diff --git a/rldp-http-proxy/rldp-http-proxy.cpp b/rldp-http-proxy/rldp-http-proxy.cpp index fb5ae4c1..6ecb8e3a 100644 --- a/rldp-http-proxy/rldp-http-proxy.cpp +++ b/rldp-http-proxy/rldp-http-proxy.cpp @@ -52,6 +52,8 @@ #include #include #include "git.h" +#include "td/utils/BufferedFd.h" +#include "common/delay.h" #if TD_DARWIN || TD_LINUX #include @@ -128,8 +130,15 @@ class HttpRldpPayloadReceiver : public td::actor::Actor { public: HttpRldpPayloadReceiver(std::shared_ptr payload, td::Bits256 transfer_id, ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort local_id, - td::actor::ActorId adnl, td::actor::ActorId rldp) - : payload_(std::move(payload)), id_(transfer_id), src_(src), local_id_(local_id), adnl_(adnl), rldp_(rldp) { + td::actor::ActorId adnl, td::actor::ActorId rldp, + bool is_tunnel = false) + : payload_(std::move(payload)) + , id_(transfer_id) + , src_(src) + , local_id_(local_id) + , adnl_(adnl) + , rldp_(rldp) + , is_tunnel_(is_tunnel) { } void start_up() override { @@ -178,13 +187,14 @@ class HttpRldpPayloadReceiver : public td::actor::Actor { auto f = ton::create_serialize_tl_object( id_, seqno_++, static_cast(chunk_size())); + auto timeout = td::Timestamp::in(is_tunnel_ ? 60.0 : 15.0); td::actor::send_closure(rldp_, &ton::rldp::Rldp::send_query_ex, local_id_, src_, "payload part", std::move(P), - td::Timestamp::in(15.0), std::move(f), 2 * chunk_size() + 1024); + timeout, std::move(f), 2 * chunk_size() + 1024); } void add_data(td::BufferSlice data) { LOG(INFO) << "HttpPayloadReceiver: received answer (size " << data.size() << ")"; - auto F = ton::fetch_tl_object(std::move(data), true); + auto F = ton::fetch_tl_object(data, true); if (F.is_error()) { abort_query(F.move_as_error()); return; @@ -243,14 +253,20 @@ class HttpRldpPayloadReceiver : public td::actor::Actor { bool sent_ = false; td::int32 seqno_ = 0; + bool is_tunnel_; }; class HttpRldpPayloadSender : public td::actor::Actor { public: HttpRldpPayloadSender(std::shared_ptr payload, td::Bits256 transfer_id, ton::adnl::AdnlNodeIdShort local_id, td::actor::ActorId adnl, - td::actor::ActorId rldp) - : payload_(std::move(payload)), id_(transfer_id), local_id_(local_id), adnl_(adnl), rldp_(rldp) { + td::actor::ActorId rldp, bool is_tunnel = false) + : payload_(std::move(payload)) + , id_(transfer_id) + , local_id_(local_id) + , adnl_(adnl) + , rldp_(rldp) + , is_tunnel_(is_tunnel) { } std::string generate_prefix() const { @@ -287,32 +303,36 @@ class HttpRldpPayloadSender : public td::actor::Actor { class Cb : public ton::http::HttpPayload::Callback { public: - Cb(td::actor::ActorId id) : self_id_(id) { + Cb(td::actor::ActorId id, size_t watermark) : self_id_(id), watermark_(watermark) { } void run(size_t ready_bytes) override { if (!reached_ && ready_bytes >= watermark_) { reached_ = true; - td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query); + td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query, false); } else if (reached_ && ready_bytes < watermark_) { reached_ = false; } } void completed() override { - td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query); + td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query, false); } private: - size_t watermark_ = ton::http::HttpRequest::low_watermark(); bool reached_ = false; td::actor::ActorId self_id_; + size_t watermark_; }; - payload_->add_callback(std::make_unique(actor_id(this))); + payload_->add_callback(std::make_unique(actor_id(this), + is_tunnel_ ? 1 : ton::http::HttpRequest::low_watermark())); - alarm_timestamp() = td::Timestamp::in(10.0); + alarm_timestamp() = td::Timestamp::in(is_tunnel_ ? 60.0 : 10.0); } - void try_answer_query() { + void try_answer_query(bool from_timer = false) { + if (from_timer) { + active_timer_ = false; + } if (!cur_query_promise_) { return; } @@ -321,6 +341,15 @@ class HttpRldpPayloadSender : public td::actor::Actor { } if (payload_->parse_completed() || payload_->ready_bytes() >= ton::http::HttpRequest::low_watermark()) { answer_query(); + } else if (!is_tunnel_ || payload_->ready_bytes() == 0) { + return; + } else if (from_timer) { + answer_query(); + } else if (!active_timer_) { + active_timer_ = true; + ton::delay_action([SelfId = actor_id(this)]() { + td::actor::send_closure(SelfId, &HttpRldpPayloadSender::try_answer_query, true); + }, td::Timestamp::in(0.001)); } } @@ -348,16 +377,12 @@ class HttpRldpPayloadSender : public td::actor::Actor { LOG(INFO) << "received request. size=" << cur_query_size_ << " parse_completed=" << payload_->parse_completed() << " ready_bytes=" << payload_->ready_bytes(); - if (payload_->parse_completed() || payload_->ready_bytes() >= ton::http::HttpRequest::low_watermark()) { - answer_query(); - return; - } - - alarm_timestamp() = td::Timestamp::in(10.0); + alarm_timestamp() = td::Timestamp::in(is_tunnel_ ? 50.0 : 10.0); + try_answer_query(false); } void receive_query(td::BufferSlice data, td::Promise promise) { - auto F = ton::fetch_tl_object(std::move(data), true); + auto F = ton::fetch_tl_object(data, true); if (F.is_error()) { LOG(INFO) << "failed to parse query: " << F.move_as_error(); return; @@ -367,6 +392,10 @@ class HttpRldpPayloadSender : public td::actor::Actor { void alarm() override { if (cur_query_promise_) { + if (is_tunnel_) { + answer_query(); + return; + } LOG(INFO) << "timeout on inbound connection. closing http transfer"; } else { LOG(INFO) << "timeout on RLDP connection. closing http transfer"; @@ -382,7 +411,7 @@ class HttpRldpPayloadSender : public td::actor::Actor { } seqno_++; - alarm_timestamp() = td::Timestamp::in(30.0); + alarm_timestamp() = td::Timestamp::in(is_tunnel_ ? 60.0 : 30.0); } void abort_query(td::Status error) { @@ -412,6 +441,7 @@ class HttpRldpPayloadSender : public td::actor::Actor { size_t cur_query_size_; td::Promise cur_query_promise_; + bool is_tunnel_, active_timer_ = false; }; class RldpHttpProxy; @@ -452,7 +482,8 @@ class TcpToRldpRequestSender : public td::actor::Actor { } }); - td::actor::create_actor("HttpPayloadSender", request_payload_, id_, local_id_, adnl_, rldp_) + td::actor::create_actor("HttpPayloadSender", request_payload_, id_, local_id_, adnl_, rldp_, + is_tunnel()) .release(); auto f = ton::serialize_tl_object(request_->store_tl(id_), true); @@ -461,13 +492,14 @@ class TcpToRldpRequestSender : public td::actor::Actor { } void got_result(td::BufferSlice data) { - auto F = ton::fetch_tl_object(std::move(data), true); + auto F = ton::fetch_tl_object(data, true); if (F.is_error()) { abort_query(F.move_as_error()); return; } auto f = F.move_as_ok(); - auto R = ton::http::HttpResponse::create(f->http_version_, f->status_code_, f->reason_, false, true); + auto R = ton::http::HttpResponse::create( + f->http_version_, f->status_code_, f->reason_, false, true, is_tunnel() && f->status_code_ == 200); if (R.is_error()) { abort_query(R.move_as_error()); return; @@ -498,7 +530,7 @@ class TcpToRldpRequestSender : public td::actor::Actor { } }); td::actor::create_actor("HttpPayloadReceiver", response_payload_, id_, dst_, local_id_, - adnl_, rldp_) + adnl_, rldp_, is_tunnel()) .release(); promise_.set_value(std::make_pair(std::move(response_), std::move(response_payload_))); @@ -515,6 +547,10 @@ class TcpToRldpRequestSender : public td::actor::Actor { } protected: + bool is_tunnel() const { + return request_->method() == "CONNECT"; + } + td::Bits256 id_; ton::adnl::AdnlNodeIdShort local_id_; @@ -535,6 +571,208 @@ class TcpToRldpRequestSender : public td::actor::Actor { std::shared_ptr response_payload_; }; +class RldpTcpTunnel : public td::actor::Actor, private td::ObserverBase { + public: + RldpTcpTunnel(td::Bits256 transfer_id, ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort local_id, + td::actor::ActorId adnl, td::actor::ActorId rldp, td::SocketFd fd) + : id_(transfer_id) + , src_(src) + , local_id_(local_id) + , adnl_(std::move(adnl)) + , rldp_(std::move(rldp)) + , fd_(std::move(fd)) { + } + + void start_up() override { + self_ = actor_id(this); + td::actor::SchedulerContext::get()->get_poll().subscribe(fd_.get_poll_info().extract_pollable_fd(this), + td::PollFlags::ReadWrite()); + + class Cb : public ton::adnl::Adnl::Callback { + public: + explicit Cb(td::actor::ActorId id) : self_id_(std::move(id)) { + } + void receive_message(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, + td::BufferSlice data) override { + LOG(INFO) << "rldp tcp tunnel: dropping message"; + } + void receive_query(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data, + td::Promise promise) override { + td::actor::send_closure(self_id_, &RldpTcpTunnel::receive_query, std::move(data), std::move(promise)); + } + + private: + td::actor::ActorId self_id_; + }; + td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, local_id_, generate_prefix(), + std::make_unique(actor_id(this))); + process(); + } + + void tear_down() override { + LOG(INFO) << "RldpTcpTunnel: tear_down"; + td::actor::send_closure(adnl_, &ton::adnl::Adnl::unsubscribe, local_id_, generate_prefix()); + td::actor::SchedulerContext::get()->get_poll().unsubscribe(fd_.get_poll_info().get_pollable_fd_ref()); + } + + void notify() override { + td::actor::send_closure(self_, &RldpTcpTunnel::process); + } + + void request_data() { + if (close_ || sent_request_) { + return; + } + sent_request_ = true; + auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result R) { + td::actor::send_closure(SelfId, &RldpTcpTunnel::got_data_from_rldp, std::move(R)); + }); + + auto f = ton::create_serialize_tl_object(id_, out_seqno_++, 1 << 17); + td::actor::send_closure(rldp_, &ton::rldp::Rldp::send_query_ex, local_id_, src_, "payload part", std::move(P), + td::Timestamp::in(60.0), std::move(f), (1 << 18) + 1024); + } + + void receive_query(td::BufferSlice data, td::Promise promise) { + auto F = ton::fetch_tl_object(data, true); + if (F.is_error()) { + LOG(INFO) << "failed to parse query: " << F.error(); + promise.set_error(F.move_as_error()); + return; + } + auto f = F.move_as_ok(); + if (cur_promise_) { + LOG(INFO) << "failed to process query: previous query is active"; + promise.set_error(td::Status::Error("previous query is active")); + return; + } + if (f->seqno_ != cur_seqno_) { + LOG(INFO) << "failed to process query: seqno mismatch"; + promise.set_error(td::Status::Error("seqno mismatch")); + return; + } + LOG(INFO) << "RldpTcpTunnel: received query, seqno=" << cur_seqno_; + cur_promise_ = std::move(promise); + cur_max_chunk_size_ = f->max_chunk_size_; + alarm_timestamp() = td::Timestamp::in(50.0); + process(); + } + + void got_data_from_rldp(td::Result R) { + if (R.is_error()) { + abort(R.move_as_error()); + return; + } + td::BufferSlice data = R.move_as_ok(); + LOG(INFO) << "RldpTcpTunnel: received data from rldp: size=" << data.size(); + sent_request_ = false; + auto F = ton::fetch_tl_object(data, true); + if (F.is_error()) { + abort(F.move_as_error()); + return; + } + auto f = F.move_as_ok(); + fd_.output_buffer().append(std::move(f->data_)); + if (f->last_) { + got_last_part_ = true; + } + process(); + } + + void process() { + if (!close_) { + auto status = [&] { + TRY_STATUS(fd_.flush_read()); + TRY_STATUS(fd_.flush_write()); + close_ = td::can_close(fd_); + return td::Status::OK(); + }(); + if (status.is_error()) { + abort(std::move(status)); + return; + } + } + if (got_last_part_) { + close_ = true; + } + answer_query(); + request_data(); + } + + void answer_query(bool allow_empty = false, bool from_timer = false) { + if (from_timer) { + active_timer_ = false; + } + auto &input = fd_.input_buffer(); + if (cur_promise_ && (!input.empty() || close_ || allow_empty)) { + if (!from_timer && !close_ && !allow_empty && input.size() < ton::http::HttpRequest::low_watermark()) { + if (!active_timer_) { + active_timer_ = true; + ton::delay_action([SelfId = actor_id(this)]() { + td::actor::send_closure(SelfId, &RldpTcpTunnel::answer_query, false, true); + }, td::Timestamp::in(0.001)); + } + return; + } + size_t s = std::min(input.size(), cur_max_chunk_size_); + td::BufferSlice data(s); + LOG(INFO) << "RldpTcpTunnel: sending data to rldp: size=" << data.size(); + input.advance(s, td::as_mutable_slice(data)); + cur_promise_.set_result(ton::create_serialize_tl_object( + std::move(data), std::vector>(), close_)); + ++cur_seqno_; + cur_promise_.reset(); + alarm_timestamp() = td::Timestamp::never(); + if (close_) { + stop(); + return; + } + } + } + + void alarm() override { + answer_query(true, false); + } + + void abort(td::Status status) { + LOG(INFO) << "RldpTcpTunnel error: " << status; + if (cur_promise_) { + cur_promise_.set_error(status.move_as_error()); + } + stop(); + } + + private: + std::string generate_prefix() const { + std::string x(static_cast(36), '\0'); + auto S = td::MutableSlice{x}; + CHECK(S.size() == 36); + + auto id = ton::ton_api::http_getNextPayloadPart::ID; + S.copy_from(td::Slice(reinterpret_cast(&id), 4)); + S.remove_prefix(4); + S.copy_from(id_.as_slice()); + return x; + } + + td::Bits256 id_; + + ton::adnl::AdnlNodeIdShort src_; + ton::adnl::AdnlNodeIdShort local_id_; + td::actor::ActorId adnl_; + td::actor::ActorId rldp_; + + td::BufferedFd fd_; + + td::actor::ActorId self_; + + td::int32 cur_seqno_ = 0, cur_max_chunk_size_ = 0; + td::Promise cur_promise_; + td::int32 out_seqno_ = 0; + bool close_ = false, sent_request_ = false, got_last_part_ = false; + bool active_timer_ = false; +}; + class RldpToTcpRequestSender : public td::actor::Actor { public: RldpToTcpRequestSender(td::Bits256 id, ton::adnl::AdnlNodeIdShort local_id, ton::adnl::AdnlNodeIdShort dst, @@ -627,8 +865,8 @@ class RldpHttpProxy : public td::actor::Actor { client_port_ = port; } - void set_local_host(std::string name, td::IPAddress remote) { - local_hosts_.emplace_back(std::move(name), std::move(remote)); + void set_local_host(std::string host, td::uint16 port, td::IPAddress remote) { + hosts_[host].ports_[port].remote_addr_ = remote; } void receive_request_result(td::uint64 id, td::Result> R) { @@ -688,7 +926,7 @@ class RldpHttpProxy : public td::actor::Actor { } void store_dht() { - for (auto &serv : local_hosts_) { + for (auto &serv : hosts_) { if (serv.first != "*") { for (auto &serv_id : server_ids_) { ton::PublicKey key = ton::pubkeys::Unenc{"http." + serv.first}; @@ -765,7 +1003,7 @@ class RldpHttpProxy : public td::actor::Actor { } void run_cont() { - if (is_client_ && local_hosts_.size() > 0) { + if (is_client_ && hosts_.size() > 0) { LOG(ERROR) << "client-only node cannot be server"; std::_Exit(2); } @@ -876,9 +1114,6 @@ class RldpHttpProxy : public td::actor::Actor { ton::adnl::Adnl::int_to_bytestring(ton::ton_api::http_request::ID), std::make_unique(actor_id(this))); } - for (auto &serv : local_hosts_) { - servers_.emplace(serv.first, td::actor::create_actor("remote", serv.second)); - } rldp_ = ton::rldp::Rldp::create(adnl_.get()); td::actor::send_closure(rldp_, &ton::rldp::Rldp::add_id, local_id_); @@ -938,7 +1173,7 @@ class RldpHttpProxy : public td::actor::Actor { void receive_rldp_request(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data, td::Promise promise) { LOG(INFO) << "got HTTP request over rldp from " << src; - TRY_RESULT_PROMISE(promise, f, ton::fetch_tl_object(std::move(data), true)); + TRY_RESULT_PROMISE(promise, f, ton::fetch_tl_object(data, true)); TRY_RESULT_PROMISE(promise, request, ton::http::HttpRequest::create(f->method_, f->url_, f->http_version_)); for (auto &x : f->headers_) { ton::http::HttpHeader h{x->name_, x->value_}; @@ -947,7 +1182,8 @@ class RldpHttpProxy : public td::actor::Actor { } TRY_STATUS_PROMISE(promise, request->complete_parse_header()); auto host = request->host(); - if (host.size() == 0) { + td::uint16 port = 80; + if (host.empty()) { host = request->url(); if (host.size() >= 7 && host.substr(0, 7) == "http://") { host = host.substr(7); @@ -972,29 +1208,60 @@ class RldpHttpProxy : public td::actor::Actor { { auto p = host.find(':'); if (p != std::string::npos) { + try { + port = (td::uint16)std::stoul(host.substr(p + 1)); + } catch (const std::logic_error &) { + port = 80; + promise.set_error(td::Status::Error(PSLICE() << "Invalid port '" << host.substr(p + 1) << "'")); + return; + } host = host.substr(0, p); } } std::transform(host.begin(), host.end(), host.begin(), [](unsigned char c) { return std::tolower(c); }); - auto it = servers_.find(host); - if (it == servers_.end()) { - it = servers_.find("*"); - if (it == servers_.end()) { + auto it = hosts_.find(host); + if (it == hosts_.end()) { + it = hosts_.find("*"); + if (it == hosts_.end()) { promise.set_error(td::Status::Error(ton::ErrorCode::error, "unknown server name")); return; } } + auto it2 = it->second.ports_.find(port); + if (it2 == it->second.ports_.end()) { + promise.set_error(td::Status::Error(ton::ErrorCode::error, "unknown host:port")); + return; + } + auto &server = it2->second; + if (request->method() == "CONNECT") { + LOG(INFO) << "starting HTTP tunnel over RLDP to " << server.remote_addr_; + start_tcp_tunnel(f->id_, src, dst, server.remote_addr_, std::move(promise)); + return; + } + + if (server.http_remote_.empty()) { + server.http_remote_ = td::actor::create_actor("remote", server.remote_addr_); + } TRY_RESULT_PROMISE(promise, payload, request->create_empty_payload()); LOG(INFO) << "starting HTTP over RLDP request"; td::actor::create_actor("inboundreq", f->id_, dst, src, std::move(request), std::move(payload), std::move(promise), adnl_.get(), rldp_.get(), - it->second.get()) + server.http_remote_.get()) .release(); } + void start_tcp_tunnel(td::Bits256 id, ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort local_id, + td::IPAddress ip, td::Promise promise) { + TRY_RESULT_PROMISE(promise, fd, td::SocketFd::open(ip)); + td::actor::create_actor(td::actor::ActorOptions().with_name("tunnel").with_poll(), id, src, local_id, + adnl_.get(), rldp_.get(), std::move(fd)).release(); + promise.set_result(ton::create_serialize_tl_object( + "HTTP/1.1", 200, "Connection Established", std::vector>())); + } + void add_adnl_addr(ton::adnl::AdnlNodeIdShort id) { server_ids_.insert(id); } @@ -1008,10 +1275,17 @@ class RldpHttpProxy : public td::actor::Actor { } private: + struct Host { + struct Server { + td::IPAddress remote_addr_; + td::actor::ActorOwn http_remote_; + }; + std::map ports_; + }; + td::uint16 port_{0}; td::IPAddress addr_; std::string global_config_; - std::vector> local_hosts_; bool is_client_{false}; td::uint16 client_port_{0}; @@ -1023,7 +1297,7 @@ class RldpHttpProxy : public td::actor::Actor { td::actor::ActorOwn server_; std::map dns_; - std::map> servers_; + std::map hosts_; td::actor::ActorOwn keyring_; td::actor::ActorOwn adnl_network_manager_; @@ -1123,6 +1397,42 @@ int main(int argc, char *argv[]) { td::log_interface = td::default_log_interface; }; + auto add_local_host = [&](const std::string& local, const std::string& remote) -> td::Status { + std::string host; + std::vector ports; + auto p = local.find(':'); + if (p == std::string::npos) { + host = local; + ports = {80, 443}; + } else { + host = local.substr(0, p); + ++p; + while (p < local.size()) { + auto p2 = local.find(',', p); + if (p2 == std::string::npos) { + p2 = local.size(); + } + try { + ports.push_back((td::uint16)std::stoul(local.substr(p, p2 - p))); + } catch (const std::logic_error& e) { + return td::Status::Error(PSLICE() << "Invalid port: " << local.substr(p, p2 - p)); + } + p = p2 + 1; + } + } + for (td::uint16 port : ports) { + std::string cur_remote = remote; + if (cur_remote.find(':') == std::string::npos) { + cur_remote += ':'; + cur_remote += std::to_string(port); + } + td::IPAddress addr; + TRY_STATUS(addr.init_host_port(cur_remote)); + td::actor::send_closure(x, &RldpHttpProxy::set_local_host, host, port, addr); + } + return td::Status::OK(); + }; + td::OptionParser p; p.set_description( "A simple rldp-to-http and http-to-rldp proxy for running and accessing ton sites\n" @@ -1136,7 +1446,8 @@ int main(int argc, char *argv[]) { SET_VERBOSITY_LEVEL(v); }); p.add_option('V', "version", "shows rldp-http-proxy build information", [&]() { - std::cout << "rldp-http-proxy build information: [ Commit: " << GitMetadata::CommitSHA1() << ", Date: " << GitMetadata::CommitDate() << "]\n"; + std::cout << "rldp-http-proxy build information: [ Commit: " << GitMetadata::CommitSHA1() + << ", Date: " << GitMetadata::CommitDate() << "]\n"; std::exit(0); }); p.add_option('h', "help", "prints a help message", [&]() { @@ -1170,27 +1481,25 @@ int main(int argc, char *argv[]) { }); p.add_option('C', "global-config", "global TON configuration file", [&](td::Slice arg) { td::actor::send_closure(x, &RldpHttpProxy::set_global_config, arg.str()); }); - p.add_checked_option('L', "local", "http hostname that will be proxied to http server at localhost:80", + p.add_checked_option('L', "local", + ":, hostname that will be proxied to localhost\n" + " is a comma-separated list of ports (may be omitted, default: 80, 443)\n", [&](td::Slice arg) -> td::Status { - td::IPAddress addr; - TRY_STATUS(addr.init_ipv4_port("127.0.0.1", 80)); - td::actor::send_closure(x, &RldpHttpProxy::set_local_host, arg.str(), addr); - return td::Status::OK(); + return add_local_host(arg.str(), "127.0.0.1"); }); p.add_option('D', "db", "db root", [&](td::Slice arg) { td::actor::send_closure(x, &RldpHttpProxy::set_db_root, arg.str()); }); p.add_checked_option( 'R', "remote", - "@:, indicates a http hostname that will be proxied to remote http server at :", + ":@:, indicates a hostname that will be proxied to remote server at :\n" + " is a comma-separated list of ports (may be omitted, default: 80,433)\n" + " is a remote port (may be omitted, default: same as host's port)", [&](td::Slice arg) -> td::Status { auto ch = arg.find('@'); if (ch == td::Slice::npos) { return td::Status::Error("bad format for --remote"); } - td::IPAddress addr; - TRY_STATUS(addr.init_host_port(arg.substr(ch + 1).str())); - td::actor::send_closure(x, &RldpHttpProxy::set_local_host, arg.substr(0, ch).str(), addr); - return td::Status::OK(); + return add_local_host(arg.substr(0, ch).str(), arg.substr(ch + 1).str()); }); p.add_option('d', "daemonize", "set SIGHUP", [&]() { td::set_signal_handler(td::SignalType::HangUp, [](int sig) {