1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-02-12 11:12:16 +00:00

HTTP tunnel in rldp-http-proxy

This commit is contained in:
SpyCheese 2022-06-27 19:38:11 +03:00
parent d11580dfb3
commit c55b6f84a5
7 changed files with 432 additions and 100 deletions

View file

@ -58,9 +58,7 @@ void HttpConnection::loop() {
}
TRY_STATUS(buffered_fd_.flush_write());
if (writing_payload_ && buffered_fd_.left_unwritten() < fd_high_watermark()) {
auto w = buffered_fd_.left_unwritten();
continue_payload_write();
written = buffered_fd_.left_unwritten() > w;
written = continue_payload_write();
}
if (close_after_write_ && !writing_payload_ && !buffered_fd_.left_unwritten()) {
LOG(INFO) << "close after write";
@ -120,7 +118,7 @@ void HttpConnection::write_payload(std::shared_ptr<HttpPayload> payload) {
class Cb : public HttpPayload::Callback {
public:
Cb(td::actor::ActorId<HttpConnection> conn) : conn_(conn) {
Cb(td::actor::ActorId<HttpConnection> conn, size_t watermark) : conn_(conn), watermark_(watermark) {
}
void run(size_t ready_bytes) override {
if (!reached_ && ready_bytes >= watermark_) {
@ -135,23 +133,23 @@ void HttpConnection::write_payload(std::shared_ptr<HttpPayload> payload) {
}
private:
size_t watermark_ = chunk_size();
bool reached_ = false;
td::actor::ActorId<HttpConnection> conn_;
size_t watermark_;
bool reached_ = false;
};
writing_payload_->add_callback(std::make_unique<Cb>(actor_id(this)));
writing_payload_->add_callback(std::make_unique<Cb>(
actor_id(this), writing_payload_->payload_type() == HttpPayload::PayloadType::pt_tunnel ? 1 : chunk_size()));
continue_payload_write();
}
void HttpConnection::continue_payload_write() {
bool HttpConnection::continue_payload_write() {
if (!writing_payload_) {
return;
return false;
}
if (writing_payload_->is_error()) {
stop();
return;
return false;
}
auto t = writing_payload_->payload_type();
@ -159,19 +157,24 @@ void HttpConnection::continue_payload_write() {
t = HttpPayload::PayloadType::pt_chunked;
}
bool wrote = false;
while (!writing_payload_->written()) {
if (buffered_fd_.left_unwritten() > fd_high_watermark()) {
return;
return wrote;
}
if (!writing_payload_->parse_completed() && writing_payload_->ready_bytes() < chunk_size()) {
return;
bool is_tunnel = writing_payload_->payload_type() == HttpPayload::PayloadType::pt_tunnel;
if (!is_tunnel && !writing_payload_->parse_completed() && writing_payload_->ready_bytes() < chunk_size()) {
return wrote;
}
writing_payload_->store_http(buffered_fd_.output_buffer(), chunk_size(), t);
if (is_tunnel && writing_payload_->ready_bytes() == 0) {
return wrote;
}
wrote |= writing_payload_->store_http(buffered_fd_.output_buffer(), chunk_size(), t);
}
if (writing_payload_->parse_completed() && writing_payload_->written()) {
payload_written();
return;
}
return wrote;
}
td::Status HttpConnection::read_payload(HttpResponse *response) {

View file

@ -65,9 +65,7 @@ class HttpConnection : public td::actor::Actor, public td::ObserverBase {
void send_request(std::unique_ptr<HttpRequest> request, std::shared_ptr<HttpPayload> payload);
void send_response(std::unique_ptr<HttpResponse> response, std::shared_ptr<HttpPayload> payload);
void write_payload(std::shared_ptr<HttpPayload> payload);
void continue_payload_write();
td::Status receive_request();
td::Status receive_response();
bool continue_payload_write();
td::Status read_payload(HttpRequest *request);
td::Status read_payload(HttpResponse *response);
td::Status read_payload(std::shared_ptr<HttpPayload> payload);

View file

@ -35,7 +35,8 @@ class HttpInboundConnection : public HttpConnection {
td::Status receive_eof() override {
found_eof_ = true;
if (reading_payload_) {
if (reading_payload_->payload_type() != HttpPayload::PayloadType::pt_eof) {
if (reading_payload_->payload_type() != HttpPayload::PayloadType::pt_eof &&
reading_payload_->payload_type() != HttpPayload::PayloadType::pt_tunnel) {
return td::Status::Error("unexpected EOF");
} else {
reading_payload_->complete_parse();

View file

@ -44,7 +44,8 @@ class HttpOutboundConnection : public HttpConnection {
td::Status receive_eof() override {
found_eof_ = true;
if (reading_payload_) {
if (reading_payload_->payload_type() != HttpPayload::PayloadType::pt_eof) {
if (reading_payload_->payload_type() != HttpPayload::PayloadType::pt_eof &&
reading_payload_->payload_type() != HttpPayload::PayloadType::pt_tunnel) {
return td::Status::Error("unexpected EOF");
} else {
LOG(INFO) << "stopping (EOF payload)";

View file

@ -164,6 +164,8 @@ td::Result<std::shared_ptr<HttpPayload>> HttpRequest::create_empty_payload() {
if (!need_payload()) {
return std::make_shared<HttpPayload>(HttpPayload::PayloadType::pt_empty);
} else if (method_ == "CONNECT") {
return std::make_shared<HttpPayload>(HttpPayload::PayloadType::pt_tunnel, low_watermark(), high_watermark());
} else if (found_content_length_) {
return std::make_shared<HttpPayload>(HttpPayload::PayloadType::pt_content_length, low_watermark(), high_watermark(),
content_length_);
@ -175,7 +177,7 @@ td::Result<std::shared_ptr<HttpPayload>> HttpRequest::create_empty_payload() {
}
bool HttpRequest::need_payload() const {
return found_content_length_ || found_transfer_encoding_;
return found_content_length_ || found_transfer_encoding_ || method_ == "CONNECT";
}
td::Status HttpRequest::add_header(HttpHeader header) {
@ -284,7 +286,8 @@ td::Status HttpPayload::parse(td::ChainBufferReader &input) {
case PayloadType::pt_empty:
UNREACHABLE();
case PayloadType::pt_eof:
cur_chunk_size_ = 1 << 30;
case PayloadType::pt_tunnel:
cur_chunk_size_ = 1LL << 60;
break;
case PayloadType::pt_chunked:
state_ = ParseState::reading_crlf;
@ -480,17 +483,18 @@ void HttpPayload::run_callbacks() {
}
}
void HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, HttpPayload::PayloadType store_type) {
bool HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, HttpPayload::PayloadType store_type) {
if (store_type == PayloadType::pt_empty) {
return;
return false;
}
slice_gc();
bool wrote = false;
while (chunks_.size() > 0 && max_size > 0) {
auto cur_state = state_.load(std::memory_order_consume);
auto s = get_slice(max_size);
if (s.size() == 0) {
if (cur_state != ParseState::reading_trailer && cur_state != ParseState::completed) {
return;
return wrote;
} else {
break;
}
@ -500,28 +504,33 @@ void HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, Htt
if (store_type == PayloadType::pt_chunked) {
char buf[64];
::sprintf(buf, "%lx\r\n", s.size());
output.append(td::Slice(buf, strlen(buf)));
auto slice = td::Slice(buf, strlen(buf));
wrote |= !slice.empty();
output.append(slice);
}
wrote |= !s.empty();
output.append(std::move(s));
if (store_type == PayloadType::pt_chunked) {
output.append(td::Slice("\r\n", 2));
wrote = true;
}
}
if (chunks_.size() != 0) {
return;
return wrote;
}
if (!written_zero_chunk_) {
if (store_type == PayloadType::pt_chunked) {
output.append(td::Slice("0\r\n", 3));
wrote = true;
}
written_zero_chunk_ = true;
}
if (store_type != PayloadType::pt_chunked) {
written_trailer_ = true;
return;
return wrote;
}
while (max_size > 0) {
@ -529,15 +538,16 @@ void HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, Htt
HttpHeader h = get_header();
if (h.empty()) {
if (cur_state != ParseState::completed) {
return;
return wrote;
} else {
break;
}
}
auto s = h.size();
h.store_http(output);
wrote = true;
if (max_size <= s) {
return;
return wrote;
}
max_size -= s;
}
@ -545,7 +555,9 @@ void HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, Htt
if (!written_trailer_) {
output.append(td::Slice("\r\n", 2));
written_trailer_ = true;
wrote = true;
}
return wrote;
}
tl_object_ptr<ton_api::http_payloadPart> HttpPayload::store_tl(size_t max_size) {
@ -729,17 +741,18 @@ td::Result<std::unique_ptr<HttpResponse>> HttpResponse::parse(std::unique_ptr<Ht
}
HttpResponse::HttpResponse(std::string proto_version, td::uint32 code, std::string reason, bool force_no_payload,
bool keep_alive)
bool keep_alive, bool is_tunnel)
: proto_version_(std::move(proto_version))
, code_(code)
, reason_(std::move(reason))
, force_no_payload_(force_no_payload)
, force_no_keep_alive_(!keep_alive) {
, force_no_keep_alive_(!keep_alive)
, is_tunnel_(is_tunnel) {
}
td::Result<std::unique_ptr<HttpResponse>> HttpResponse::create(std::string proto_version, td::uint32 code,
std::string reason, bool force_no_payload,
bool keep_alive) {
bool keep_alive, bool is_tunnel) {
if (proto_version != "HTTP/1.0" && proto_version != "HTTP/1.1") {
return td::Status::Error(PSTRING() << "unsupported http version '" << proto_version << "'");
}
@ -749,7 +762,7 @@ td::Result<std::unique_ptr<HttpResponse>> HttpResponse::create(std::string proto
}
return std::make_unique<HttpResponse>(std::move(proto_version), code, std::move(reason), force_no_payload,
keep_alive);
keep_alive, is_tunnel);
}
td::Status HttpResponse::complete_parse_header() {
@ -767,6 +780,8 @@ td::Result<std::shared_ptr<HttpPayload>> HttpResponse::create_empty_payload() {
if (!need_payload()) {
return std::make_shared<HttpPayload>(HttpPayload::PayloadType::pt_empty);
} else if (is_tunnel_) {
return std::make_shared<HttpPayload>(HttpPayload::PayloadType::pt_tunnel, low_watermark(), high_watermark());
} else if (found_content_length_) {
return std::make_shared<HttpPayload>(HttpPayload::PayloadType::pt_content_length, low_watermark(), high_watermark(),
content_length_);
@ -828,11 +843,13 @@ void HttpResponse::store_http(td::ChainBufferWriter &output) {
for (auto &x : options_) {
x.store_http(output);
}
if (!is_tunnel_) {
if (keep_alive_) {
HttpHeader{"Connection", "Keep-Alive"}.store_http(output);
} else {
HttpHeader{"Connection", "Close"}.store_http(output);
}
}
output.append(td::Slice("\r\n", 2));
}
@ -893,7 +910,9 @@ void answer_error(HttpStatusCode code, std::string reason,
}
auto response = HttpResponse::create("HTTP/1.0", code, reason, false, false).move_as_ok();
response->add_header(HttpHeader{"Content-Length", "0"});
response->complete_parse_header();
auto payload = response->create_empty_payload().move_as_ok();
payload->complete_parse();
CHECK(payload->parse_completed());
promise.set_value(std::make_pair(std::move(response), std::move(payload)));
}

View file

@ -64,7 +64,7 @@ td::Result<HttpHeader> get_header(std::string line);
class HttpPayload {
public:
enum class PayloadType { pt_empty, pt_eof, pt_chunked, pt_content_length };
enum class PayloadType { pt_empty, pt_eof, pt_chunked, pt_content_length, pt_tunnel };
HttpPayload(PayloadType t, size_t low_watermark, size_t high_watermark, td::uint64 size)
: type_(t), low_watermark_(low_watermark), high_watermark_(high_watermark), cur_chunk_size_(size) {
CHECK(t == PayloadType::pt_content_length);
@ -75,17 +75,15 @@ class HttpPayload {
CHECK(t != PayloadType::pt_content_length);
CHECK(t != PayloadType::pt_empty);
switch (t) {
case PayloadType::pt_empty:
UNREACHABLE();
case PayloadType::pt_eof:
case PayloadType::pt_tunnel:
state_ = ParseState::reading_chunk_data;
break;
case PayloadType::pt_chunked:
state_ = ParseState::reading_chunk_header;
break;
case PayloadType::pt_content_length:
state_ = ParseState::reading_chunk_data;
break;
default:
UNREACHABLE();
}
}
HttpPayload(PayloadType t) : type_(t) {
@ -136,7 +134,7 @@ class HttpPayload {
void slice_gc();
HttpHeader get_header();
void store_http(td::ChainBufferWriter &output, size_t max_size, HttpPayload::PayloadType store_type);
bool store_http(td::ChainBufferWriter &output, size_t max_size, HttpPayload::PayloadType store_type);
tl_object_ptr<ton_api::http_payloadPart> store_tl(size_t max_size);
bool written() const {
@ -267,9 +265,11 @@ class HttpResponse {
}
static td::Result<std::unique_ptr<HttpResponse>> create(std::string proto_version, td::uint32 code,
std::string reason, bool force_no_payload, bool keep_alive);
std::string reason, bool force_no_payload, bool keep_alive,
bool is_tunnel = false);
HttpResponse(std::string proto_version, td::uint32 code, std::string reason, bool force_no_payload, bool keep_alive);
HttpResponse(std::string proto_version, td::uint32 code, std::string reason, bool force_no_payload, bool keep_alive,
bool is_tunnel = false);
bool check_parse_header_completed() const;
bool keep_alive() const {
@ -323,6 +323,7 @@ class HttpResponse {
bool keep_alive_ = false;
std::vector<HttpHeader> options_;
bool is_tunnel_ = false;
};
void answer_error(HttpStatusCode code, std::string reason,

View file

@ -52,6 +52,8 @@
#include <list>
#include <set>
#include "git.h"
#include "td/utils/BufferedFd.h"
#include "common/delay.h"
#if TD_DARWIN || TD_LINUX
#include <unistd.h>
@ -128,8 +130,15 @@ class HttpRldpPayloadReceiver : public td::actor::Actor {
public:
HttpRldpPayloadReceiver(std::shared_ptr<ton::http::HttpPayload> payload, td::Bits256 transfer_id,
ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort local_id,
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::rldp::Rldp> rldp)
: payload_(std::move(payload)), id_(transfer_id), src_(src), local_id_(local_id), adnl_(adnl), rldp_(rldp) {
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::rldp::Rldp> rldp,
bool is_tunnel = false)
: payload_(std::move(payload))
, id_(transfer_id)
, src_(src)
, local_id_(local_id)
, adnl_(adnl)
, rldp_(rldp)
, is_tunnel_(is_tunnel) {
}
void start_up() override {
@ -178,13 +187,14 @@ class HttpRldpPayloadReceiver : public td::actor::Actor {
auto f = ton::create_serialize_tl_object<ton::ton_api::http_getNextPayloadPart>(
id_, seqno_++, static_cast<td::int32>(chunk_size()));
auto timeout = td::Timestamp::in(is_tunnel_ ? 60.0 : 15.0);
td::actor::send_closure(rldp_, &ton::rldp::Rldp::send_query_ex, local_id_, src_, "payload part", std::move(P),
td::Timestamp::in(15.0), std::move(f), 2 * chunk_size() + 1024);
timeout, std::move(f), 2 * chunk_size() + 1024);
}
void add_data(td::BufferSlice data) {
LOG(INFO) << "HttpPayloadReceiver: received answer (size " << data.size() << ")";
auto F = ton::fetch_tl_object<ton::ton_api::http_payloadPart>(std::move(data), true);
auto F = ton::fetch_tl_object<ton::ton_api::http_payloadPart>(data, true);
if (F.is_error()) {
abort_query(F.move_as_error());
return;
@ -243,14 +253,20 @@ class HttpRldpPayloadReceiver : public td::actor::Actor {
bool sent_ = false;
td::int32 seqno_ = 0;
bool is_tunnel_;
};
class HttpRldpPayloadSender : public td::actor::Actor {
public:
HttpRldpPayloadSender(std::shared_ptr<ton::http::HttpPayload> payload, td::Bits256 transfer_id,
ton::adnl::AdnlNodeIdShort local_id, td::actor::ActorId<ton::adnl::Adnl> adnl,
td::actor::ActorId<ton::rldp::Rldp> rldp)
: payload_(std::move(payload)), id_(transfer_id), local_id_(local_id), adnl_(adnl), rldp_(rldp) {
td::actor::ActorId<ton::rldp::Rldp> rldp, bool is_tunnel = false)
: payload_(std::move(payload))
, id_(transfer_id)
, local_id_(local_id)
, adnl_(adnl)
, rldp_(rldp)
, is_tunnel_(is_tunnel) {
}
std::string generate_prefix() const {
@ -287,32 +303,36 @@ class HttpRldpPayloadSender : public td::actor::Actor {
class Cb : public ton::http::HttpPayload::Callback {
public:
Cb(td::actor::ActorId<HttpRldpPayloadSender> id) : self_id_(id) {
Cb(td::actor::ActorId<HttpRldpPayloadSender> id, size_t watermark) : self_id_(id), watermark_(watermark) {
}
void run(size_t ready_bytes) override {
if (!reached_ && ready_bytes >= watermark_) {
reached_ = true;
td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query);
td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query, false);
} else if (reached_ && ready_bytes < watermark_) {
reached_ = false;
}
}
void completed() override {
td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query);
td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query, false);
}
private:
size_t watermark_ = ton::http::HttpRequest::low_watermark();
bool reached_ = false;
td::actor::ActorId<HttpRldpPayloadSender> self_id_;
size_t watermark_;
};
payload_->add_callback(std::make_unique<Cb>(actor_id(this)));
payload_->add_callback(std::make_unique<Cb>(actor_id(this),
is_tunnel_ ? 1 : ton::http::HttpRequest::low_watermark()));
alarm_timestamp() = td::Timestamp::in(10.0);
alarm_timestamp() = td::Timestamp::in(is_tunnel_ ? 60.0 : 10.0);
}
void try_answer_query() {
void try_answer_query(bool from_timer = false) {
if (from_timer) {
active_timer_ = false;
}
if (!cur_query_promise_) {
return;
}
@ -321,6 +341,15 @@ class HttpRldpPayloadSender : public td::actor::Actor {
}
if (payload_->parse_completed() || payload_->ready_bytes() >= ton::http::HttpRequest::low_watermark()) {
answer_query();
} else if (!is_tunnel_ || payload_->ready_bytes() == 0) {
return;
} else if (from_timer) {
answer_query();
} else if (!active_timer_) {
active_timer_ = true;
ton::delay_action([SelfId = actor_id(this)]() {
td::actor::send_closure(SelfId, &HttpRldpPayloadSender::try_answer_query, true);
}, td::Timestamp::in(0.001));
}
}
@ -348,16 +377,12 @@ class HttpRldpPayloadSender : public td::actor::Actor {
LOG(INFO) << "received request. size=" << cur_query_size_ << " parse_completed=" << payload_->parse_completed()
<< " ready_bytes=" << payload_->ready_bytes();
if (payload_->parse_completed() || payload_->ready_bytes() >= ton::http::HttpRequest::low_watermark()) {
answer_query();
return;
}
alarm_timestamp() = td::Timestamp::in(10.0);
alarm_timestamp() = td::Timestamp::in(is_tunnel_ ? 50.0 : 10.0);
try_answer_query(false);
}
void receive_query(td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
auto F = ton::fetch_tl_object<ton::ton_api::http_getNextPayloadPart>(std::move(data), true);
auto F = ton::fetch_tl_object<ton::ton_api::http_getNextPayloadPart>(data, true);
if (F.is_error()) {
LOG(INFO) << "failed to parse query: " << F.move_as_error();
return;
@ -367,6 +392,10 @@ class HttpRldpPayloadSender : public td::actor::Actor {
void alarm() override {
if (cur_query_promise_) {
if (is_tunnel_) {
answer_query();
return;
}
LOG(INFO) << "timeout on inbound connection. closing http transfer";
} else {
LOG(INFO) << "timeout on RLDP connection. closing http transfer";
@ -382,7 +411,7 @@ class HttpRldpPayloadSender : public td::actor::Actor {
}
seqno_++;
alarm_timestamp() = td::Timestamp::in(30.0);
alarm_timestamp() = td::Timestamp::in(is_tunnel_ ? 60.0 : 30.0);
}
void abort_query(td::Status error) {
@ -412,6 +441,7 @@ class HttpRldpPayloadSender : public td::actor::Actor {
size_t cur_query_size_;
td::Promise<td::BufferSlice> cur_query_promise_;
bool is_tunnel_, active_timer_ = false;
};
class RldpHttpProxy;
@ -452,7 +482,8 @@ class TcpToRldpRequestSender : public td::actor::Actor {
}
});
td::actor::create_actor<HttpRldpPayloadSender>("HttpPayloadSender", request_payload_, id_, local_id_, adnl_, rldp_)
td::actor::create_actor<HttpRldpPayloadSender>("HttpPayloadSender", request_payload_, id_, local_id_, adnl_, rldp_,
is_tunnel())
.release();
auto f = ton::serialize_tl_object(request_->store_tl(id_), true);
@ -461,13 +492,14 @@ class TcpToRldpRequestSender : public td::actor::Actor {
}
void got_result(td::BufferSlice data) {
auto F = ton::fetch_tl_object<ton::ton_api::http_response>(std::move(data), true);
auto F = ton::fetch_tl_object<ton::ton_api::http_response>(data, true);
if (F.is_error()) {
abort_query(F.move_as_error());
return;
}
auto f = F.move_as_ok();
auto R = ton::http::HttpResponse::create(f->http_version_, f->status_code_, f->reason_, false, true);
auto R = ton::http::HttpResponse::create(
f->http_version_, f->status_code_, f->reason_, false, true, is_tunnel() && f->status_code_ == 200);
if (R.is_error()) {
abort_query(R.move_as_error());
return;
@ -498,7 +530,7 @@ class TcpToRldpRequestSender : public td::actor::Actor {
}
});
td::actor::create_actor<HttpRldpPayloadReceiver>("HttpPayloadReceiver", response_payload_, id_, dst_, local_id_,
adnl_, rldp_)
adnl_, rldp_, is_tunnel())
.release();
promise_.set_value(std::make_pair(std::move(response_), std::move(response_payload_)));
@ -515,6 +547,10 @@ class TcpToRldpRequestSender : public td::actor::Actor {
}
protected:
bool is_tunnel() const {
return request_->method() == "CONNECT";
}
td::Bits256 id_;
ton::adnl::AdnlNodeIdShort local_id_;
@ -535,6 +571,208 @@ class TcpToRldpRequestSender : public td::actor::Actor {
std::shared_ptr<ton::http::HttpPayload> response_payload_;
};
class RldpTcpTunnel : public td::actor::Actor, private td::ObserverBase {
public:
RldpTcpTunnel(td::Bits256 transfer_id, ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort local_id,
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::rldp::Rldp> rldp, td::SocketFd fd)
: id_(transfer_id)
, src_(src)
, local_id_(local_id)
, adnl_(std::move(adnl))
, rldp_(std::move(rldp))
, fd_(std::move(fd)) {
}
void start_up() override {
self_ = actor_id(this);
td::actor::SchedulerContext::get()->get_poll().subscribe(fd_.get_poll_info().extract_pollable_fd(this),
td::PollFlags::ReadWrite());
class Cb : public ton::adnl::Adnl::Callback {
public:
explicit Cb(td::actor::ActorId<RldpTcpTunnel> id) : self_id_(std::move(id)) {
}
void receive_message(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst,
td::BufferSlice data) override {
LOG(INFO) << "rldp tcp tunnel: dropping message";
}
void receive_query(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data,
td::Promise<td::BufferSlice> promise) override {
td::actor::send_closure(self_id_, &RldpTcpTunnel::receive_query, std::move(data), std::move(promise));
}
private:
td::actor::ActorId<RldpTcpTunnel> self_id_;
};
td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, local_id_, generate_prefix(),
std::make_unique<Cb>(actor_id(this)));
process();
}
void tear_down() override {
LOG(INFO) << "RldpTcpTunnel: tear_down";
td::actor::send_closure(adnl_, &ton::adnl::Adnl::unsubscribe, local_id_, generate_prefix());
td::actor::SchedulerContext::get()->get_poll().unsubscribe(fd_.get_poll_info().get_pollable_fd_ref());
}
void notify() override {
td::actor::send_closure(self_, &RldpTcpTunnel::process);
}
void request_data() {
if (close_ || sent_request_) {
return;
}
sent_request_ = true;
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::BufferSlice> R) {
td::actor::send_closure(SelfId, &RldpTcpTunnel::got_data_from_rldp, std::move(R));
});
auto f = ton::create_serialize_tl_object<ton::ton_api::http_getNextPayloadPart>(id_, out_seqno_++, 1 << 17);
td::actor::send_closure(rldp_, &ton::rldp::Rldp::send_query_ex, local_id_, src_, "payload part", std::move(P),
td::Timestamp::in(60.0), std::move(f), (1 << 18) + 1024);
}
void receive_query(td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
auto F = ton::fetch_tl_object<ton::ton_api::http_getNextPayloadPart>(data, true);
if (F.is_error()) {
LOG(INFO) << "failed to parse query: " << F.error();
promise.set_error(F.move_as_error());
return;
}
auto f = F.move_as_ok();
if (cur_promise_) {
LOG(INFO) << "failed to process query: previous query is active";
promise.set_error(td::Status::Error("previous query is active"));
return;
}
if (f->seqno_ != cur_seqno_) {
LOG(INFO) << "failed to process query: seqno mismatch";
promise.set_error(td::Status::Error("seqno mismatch"));
return;
}
LOG(INFO) << "RldpTcpTunnel: received query, seqno=" << cur_seqno_;
cur_promise_ = std::move(promise);
cur_max_chunk_size_ = f->max_chunk_size_;
alarm_timestamp() = td::Timestamp::in(50.0);
process();
}
void got_data_from_rldp(td::Result<td::BufferSlice> R) {
if (R.is_error()) {
abort(R.move_as_error());
return;
}
td::BufferSlice data = R.move_as_ok();
LOG(INFO) << "RldpTcpTunnel: received data from rldp: size=" << data.size();
sent_request_ = false;
auto F = ton::fetch_tl_object<ton::ton_api::http_payloadPart>(data, true);
if (F.is_error()) {
abort(F.move_as_error());
return;
}
auto f = F.move_as_ok();
fd_.output_buffer().append(std::move(f->data_));
if (f->last_) {
got_last_part_ = true;
}
process();
}
void process() {
if (!close_) {
auto status = [&] {
TRY_STATUS(fd_.flush_read());
TRY_STATUS(fd_.flush_write());
close_ = td::can_close(fd_);
return td::Status::OK();
}();
if (status.is_error()) {
abort(std::move(status));
return;
}
}
if (got_last_part_) {
close_ = true;
}
answer_query();
request_data();
}
void answer_query(bool allow_empty = false, bool from_timer = false) {
if (from_timer) {
active_timer_ = false;
}
auto &input = fd_.input_buffer();
if (cur_promise_ && (!input.empty() || close_ || allow_empty)) {
if (!from_timer && !close_ && !allow_empty && input.size() < ton::http::HttpRequest::low_watermark()) {
if (!active_timer_) {
active_timer_ = true;
ton::delay_action([SelfId = actor_id(this)]() {
td::actor::send_closure(SelfId, &RldpTcpTunnel::answer_query, false, true);
}, td::Timestamp::in(0.001));
}
return;
}
size_t s = std::min<size_t>(input.size(), cur_max_chunk_size_);
td::BufferSlice data(s);
LOG(INFO) << "RldpTcpTunnel: sending data to rldp: size=" << data.size();
input.advance(s, td::as_mutable_slice(data));
cur_promise_.set_result(ton::create_serialize_tl_object<ton::ton_api::http_payloadPart>(
std::move(data), std::vector<ton::tl_object_ptr<ton::ton_api::http_header>>(), close_));
++cur_seqno_;
cur_promise_.reset();
alarm_timestamp() = td::Timestamp::never();
if (close_) {
stop();
return;
}
}
}
void alarm() override {
answer_query(true, false);
}
void abort(td::Status status) {
LOG(INFO) << "RldpTcpTunnel error: " << status;
if (cur_promise_) {
cur_promise_.set_error(status.move_as_error());
}
stop();
}
private:
std::string generate_prefix() const {
std::string x(static_cast<size_t>(36), '\0');
auto S = td::MutableSlice{x};
CHECK(S.size() == 36);
auto id = ton::ton_api::http_getNextPayloadPart::ID;
S.copy_from(td::Slice(reinterpret_cast<const td::uint8 *>(&id), 4));
S.remove_prefix(4);
S.copy_from(id_.as_slice());
return x;
}
td::Bits256 id_;
ton::adnl::AdnlNodeIdShort src_;
ton::adnl::AdnlNodeIdShort local_id_;
td::actor::ActorId<ton::adnl::Adnl> adnl_;
td::actor::ActorId<ton::rldp::Rldp> rldp_;
td::BufferedFd<td::SocketFd> fd_;
td::actor::ActorId<RldpTcpTunnel> self_;
td::int32 cur_seqno_ = 0, cur_max_chunk_size_ = 0;
td::Promise<td::BufferSlice> cur_promise_;
td::int32 out_seqno_ = 0;
bool close_ = false, sent_request_ = false, got_last_part_ = false;
bool active_timer_ = false;
};
class RldpToTcpRequestSender : public td::actor::Actor {
public:
RldpToTcpRequestSender(td::Bits256 id, ton::adnl::AdnlNodeIdShort local_id, ton::adnl::AdnlNodeIdShort dst,
@ -627,8 +865,8 @@ class RldpHttpProxy : public td::actor::Actor {
client_port_ = port;
}
void set_local_host(std::string name, td::IPAddress remote) {
local_hosts_.emplace_back(std::move(name), std::move(remote));
void set_local_host(std::string host, td::uint16 port, td::IPAddress remote) {
hosts_[host].ports_[port].remote_addr_ = remote;
}
void receive_request_result(td::uint64 id, td::Result<tonlib_api::object_ptr<tonlib_api::Object>> R) {
@ -688,7 +926,7 @@ class RldpHttpProxy : public td::actor::Actor {
}
void store_dht() {
for (auto &serv : local_hosts_) {
for (auto &serv : hosts_) {
if (serv.first != "*") {
for (auto &serv_id : server_ids_) {
ton::PublicKey key = ton::pubkeys::Unenc{"http." + serv.first};
@ -765,7 +1003,7 @@ class RldpHttpProxy : public td::actor::Actor {
}
void run_cont() {
if (is_client_ && local_hosts_.size() > 0) {
if (is_client_ && hosts_.size() > 0) {
LOG(ERROR) << "client-only node cannot be server";
std::_Exit(2);
}
@ -876,9 +1114,6 @@ class RldpHttpProxy : public td::actor::Actor {
ton::adnl::Adnl::int_to_bytestring(ton::ton_api::http_request::ID),
std::make_unique<AdnlCb>(actor_id(this)));
}
for (auto &serv : local_hosts_) {
servers_.emplace(serv.first, td::actor::create_actor<HttpRemote>("remote", serv.second));
}
rldp_ = ton::rldp::Rldp::create(adnl_.get());
td::actor::send_closure(rldp_, &ton::rldp::Rldp::add_id, local_id_);
@ -938,7 +1173,7 @@ class RldpHttpProxy : public td::actor::Actor {
void receive_rldp_request(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data,
td::Promise<td::BufferSlice> promise) {
LOG(INFO) << "got HTTP request over rldp from " << src;
TRY_RESULT_PROMISE(promise, f, ton::fetch_tl_object<ton::ton_api::http_request>(std::move(data), true));
TRY_RESULT_PROMISE(promise, f, ton::fetch_tl_object<ton::ton_api::http_request>(data, true));
TRY_RESULT_PROMISE(promise, request, ton::http::HttpRequest::create(f->method_, f->url_, f->http_version_));
for (auto &x : f->headers_) {
ton::http::HttpHeader h{x->name_, x->value_};
@ -947,7 +1182,8 @@ class RldpHttpProxy : public td::actor::Actor {
}
TRY_STATUS_PROMISE(promise, request->complete_parse_header());
auto host = request->host();
if (host.size() == 0) {
td::uint16 port = 80;
if (host.empty()) {
host = request->url();
if (host.size() >= 7 && host.substr(0, 7) == "http://") {
host = host.substr(7);
@ -972,29 +1208,60 @@ class RldpHttpProxy : public td::actor::Actor {
{
auto p = host.find(':');
if (p != std::string::npos) {
try {
port = (td::uint16)std::stoul(host.substr(p + 1));
} catch (const std::logic_error &) {
port = 80;
promise.set_error(td::Status::Error(PSLICE() << "Invalid port '" << host.substr(p + 1) << "'"));
return;
}
host = host.substr(0, p);
}
}
std::transform(host.begin(), host.end(), host.begin(), [](unsigned char c) { return std::tolower(c); });
auto it = servers_.find(host);
if (it == servers_.end()) {
it = servers_.find("*");
if (it == servers_.end()) {
auto it = hosts_.find(host);
if (it == hosts_.end()) {
it = hosts_.find("*");
if (it == hosts_.end()) {
promise.set_error(td::Status::Error(ton::ErrorCode::error, "unknown server name"));
return;
}
}
auto it2 = it->second.ports_.find(port);
if (it2 == it->second.ports_.end()) {
promise.set_error(td::Status::Error(ton::ErrorCode::error, "unknown host:port"));
return;
}
auto &server = it2->second;
if (request->method() == "CONNECT") {
LOG(INFO) << "starting HTTP tunnel over RLDP to " << server.remote_addr_;
start_tcp_tunnel(f->id_, src, dst, server.remote_addr_, std::move(promise));
return;
}
if (server.http_remote_.empty()) {
server.http_remote_ = td::actor::create_actor<HttpRemote>("remote", server.remote_addr_);
}
TRY_RESULT_PROMISE(promise, payload, request->create_empty_payload());
LOG(INFO) << "starting HTTP over RLDP request";
td::actor::create_actor<RldpToTcpRequestSender>("inboundreq", f->id_, dst, src, std::move(request),
std::move(payload), std::move(promise), adnl_.get(), rldp_.get(),
it->second.get())
server.http_remote_.get())
.release();
}
void start_tcp_tunnel(td::Bits256 id, ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort local_id,
td::IPAddress ip, td::Promise<td::BufferSlice> promise) {
TRY_RESULT_PROMISE(promise, fd, td::SocketFd::open(ip));
td::actor::create_actor<RldpTcpTunnel>(td::actor::ActorOptions().with_name("tunnel").with_poll(), id, src, local_id,
adnl_.get(), rldp_.get(), std::move(fd)).release();
promise.set_result(ton::create_serialize_tl_object<ton::ton_api::http_response>(
"HTTP/1.1", 200, "Connection Established", std::vector<ton::tl_object_ptr<ton::ton_api::http_header>>()));
}
void add_adnl_addr(ton::adnl::AdnlNodeIdShort id) {
server_ids_.insert(id);
}
@ -1008,10 +1275,17 @@ class RldpHttpProxy : public td::actor::Actor {
}
private:
struct Host {
struct Server {
td::IPAddress remote_addr_;
td::actor::ActorOwn<HttpRemote> http_remote_;
};
std::map<td::uint16, Server> ports_;
};
td::uint16 port_{0};
td::IPAddress addr_;
std::string global_config_;
std::vector<std::pair<std::string, td::IPAddress>> local_hosts_;
bool is_client_{false};
td::uint16 client_port_{0};
@ -1023,7 +1297,7 @@ class RldpHttpProxy : public td::actor::Actor {
td::actor::ActorOwn<ton::http::HttpServer> server_;
std::map<std::string, ton::adnl::AdnlNodeIdShort> dns_;
std::map<std::string, td::actor::ActorOwn<HttpRemote>> servers_;
std::map<std::string, Host> hosts_;
td::actor::ActorOwn<ton::keyring::Keyring> keyring_;
td::actor::ActorOwn<ton::adnl::AdnlNetworkManager> adnl_network_manager_;
@ -1123,6 +1397,42 @@ int main(int argc, char *argv[]) {
td::log_interface = td::default_log_interface;
};
auto add_local_host = [&](const std::string& local, const std::string& remote) -> td::Status {
std::string host;
std::vector<td::uint16> ports;
auto p = local.find(':');
if (p == std::string::npos) {
host = local;
ports = {80, 443};
} else {
host = local.substr(0, p);
++p;
while (p < local.size()) {
auto p2 = local.find(',', p);
if (p2 == std::string::npos) {
p2 = local.size();
}
try {
ports.push_back((td::uint16)std::stoul(local.substr(p, p2 - p)));
} catch (const std::logic_error& e) {
return td::Status::Error(PSLICE() << "Invalid port: " << local.substr(p, p2 - p));
}
p = p2 + 1;
}
}
for (td::uint16 port : ports) {
std::string cur_remote = remote;
if (cur_remote.find(':') == std::string::npos) {
cur_remote += ':';
cur_remote += std::to_string(port);
}
td::IPAddress addr;
TRY_STATUS(addr.init_host_port(cur_remote));
td::actor::send_closure(x, &RldpHttpProxy::set_local_host, host, port, addr);
}
return td::Status::OK();
};
td::OptionParser p;
p.set_description(
"A simple rldp-to-http and http-to-rldp proxy for running and accessing ton sites\n"
@ -1136,7 +1446,8 @@ int main(int argc, char *argv[]) {
SET_VERBOSITY_LEVEL(v);
});
p.add_option('V', "version", "shows rldp-http-proxy build information", [&]() {
std::cout << "rldp-http-proxy build information: [ Commit: " << GitMetadata::CommitSHA1() << ", Date: " << GitMetadata::CommitDate() << "]\n";
std::cout << "rldp-http-proxy build information: [ Commit: " << GitMetadata::CommitSHA1()
<< ", Date: " << GitMetadata::CommitDate() << "]\n";
std::exit(0);
});
p.add_option('h', "help", "prints a help message", [&]() {
@ -1170,27 +1481,25 @@ int main(int argc, char *argv[]) {
});
p.add_option('C', "global-config", "global TON configuration file",
[&](td::Slice arg) { td::actor::send_closure(x, &RldpHttpProxy::set_global_config, arg.str()); });
p.add_checked_option('L', "local", "http hostname that will be proxied to http server at localhost:80",
p.add_checked_option('L', "local",
"<hosthame>:<ports>, hostname that will be proxied to localhost\n"
"<ports> is a comma-separated list of ports (may be omitted, default: 80, 443)\n",
[&](td::Slice arg) -> td::Status {
td::IPAddress addr;
TRY_STATUS(addr.init_ipv4_port("127.0.0.1", 80));
td::actor::send_closure(x, &RldpHttpProxy::set_local_host, arg.str(), addr);
return td::Status::OK();
return add_local_host(arg.str(), "127.0.0.1");
});
p.add_option('D', "db", "db root",
[&](td::Slice arg) { td::actor::send_closure(x, &RldpHttpProxy::set_db_root, arg.str()); });
p.add_checked_option(
'R', "remote",
"<hostname>@<ip>:<port>, indicates a http hostname that will be proxied to remote http server at <ip>:<port>",
"<hostname>:<ports>@<ip>:<port>, indicates a hostname that will be proxied to remote server at <ip>:<port>\n"
"<ports> is a comma-separated list of ports (may be omitted, default: 80,433)\n"
"<port> is a remote port (may be omitted, default: same as host's port)",
[&](td::Slice arg) -> td::Status {
auto ch = arg.find('@');
if (ch == td::Slice::npos) {
return td::Status::Error("bad format for --remote");
}
td::IPAddress addr;
TRY_STATUS(addr.init_host_port(arg.substr(ch + 1).str()));
td::actor::send_closure(x, &RldpHttpProxy::set_local_host, arg.substr(0, ch).str(), addr);
return td::Status::OK();
return add_local_host(arg.substr(0, ch).str(), arg.substr(ch + 1).str());
});
p.add_option('d', "daemonize", "set SIGHUP", [&]() {
td::set_signal_handler(td::SignalType::HangUp, [](int sig) {