1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-02-13 03:32:22 +00:00

Merge pull request #476 from SpyCheese/tonproxy-v1

Improve TON-proxy stability
This commit is contained in:
EmelyanenkoK 2022-09-30 12:06:32 +03:00 committed by GitHub
commit 1ded7af335
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 801 additions and 251 deletions

View file

@ -58,9 +58,7 @@ void HttpConnection::loop() {
}
TRY_STATUS(buffered_fd_.flush_write());
if (writing_payload_ && buffered_fd_.left_unwritten() < fd_high_watermark()) {
auto w = buffered_fd_.left_unwritten();
continue_payload_write();
written = buffered_fd_.left_unwritten() > w;
written = continue_payload_write();
}
if (close_after_write_ && !writing_payload_ && !buffered_fd_.left_unwritten()) {
LOG(INFO) << "close after write";
@ -120,7 +118,7 @@ void HttpConnection::write_payload(std::shared_ptr<HttpPayload> payload) {
class Cb : public HttpPayload::Callback {
public:
Cb(td::actor::ActorId<HttpConnection> conn) : conn_(conn) {
Cb(td::actor::ActorId<HttpConnection> conn, size_t watermark) : conn_(conn), watermark_(watermark) {
}
void run(size_t ready_bytes) override {
if (!reached_ && ready_bytes >= watermark_) {
@ -135,23 +133,23 @@ void HttpConnection::write_payload(std::shared_ptr<HttpPayload> payload) {
}
private:
size_t watermark_ = chunk_size();
bool reached_ = false;
td::actor::ActorId<HttpConnection> conn_;
size_t watermark_;
bool reached_ = false;
};
writing_payload_->add_callback(std::make_unique<Cb>(actor_id(this)));
writing_payload_->add_callback(std::make_unique<Cb>(
actor_id(this), writing_payload_->payload_type() == HttpPayload::PayloadType::pt_tunnel ? 1 : chunk_size()));
continue_payload_write();
}
void HttpConnection::continue_payload_write() {
bool HttpConnection::continue_payload_write() {
if (!writing_payload_) {
return;
return false;
}
if (writing_payload_->is_error()) {
stop();
return;
return false;
}
auto t = writing_payload_->payload_type();
@ -159,19 +157,24 @@ void HttpConnection::continue_payload_write() {
t = HttpPayload::PayloadType::pt_chunked;
}
bool wrote = false;
while (!writing_payload_->written()) {
if (buffered_fd_.left_unwritten() > fd_high_watermark()) {
return;
return wrote;
}
if (!writing_payload_->parse_completed() && writing_payload_->ready_bytes() < chunk_size()) {
return;
bool is_tunnel = writing_payload_->payload_type() == HttpPayload::PayloadType::pt_tunnel;
if (!is_tunnel && !writing_payload_->parse_completed() && writing_payload_->ready_bytes() < chunk_size()) {
return wrote;
}
writing_payload_->store_http(buffered_fd_.output_buffer(), chunk_size(), t);
if (is_tunnel && writing_payload_->ready_bytes() == 0) {
return wrote;
}
wrote |= writing_payload_->store_http(buffered_fd_.output_buffer(), chunk_size(), t);
}
if (writing_payload_->parse_completed() && writing_payload_->written()) {
payload_written();
return;
}
return wrote;
}
td::Status HttpConnection::read_payload(HttpResponse *response) {

View file

@ -65,9 +65,7 @@ class HttpConnection : public td::actor::Actor, public td::ObserverBase {
void send_request(std::unique_ptr<HttpRequest> request, std::shared_ptr<HttpPayload> payload);
void send_response(std::unique_ptr<HttpResponse> response, std::shared_ptr<HttpPayload> payload);
void write_payload(std::shared_ptr<HttpPayload> payload);
void continue_payload_write();
td::Status receive_request();
td::Status receive_response();
bool continue_payload_write();
td::Status read_payload(HttpRequest *request);
td::Status read_payload(HttpResponse *response);
td::Status read_payload(std::shared_ptr<HttpPayload> payload);

View file

@ -44,13 +44,22 @@ void HttpInboundConnection::send_server_error() {
loop();
}
void HttpInboundConnection::send_proxy_error() {
void HttpInboundConnection::send_proxy_error(td::Status error) {
if (error.code() == ErrorCode::timeout) {
static const auto s =
"HTTP/1.1 504 Gateway Timeout\r\n"
"Connection: keep-alive\r\n"
"Content-length: 0\r\n"
"\r\n";
buffered_fd_.output_buffer().append(td::Slice(s, strlen(s)));
} else {
static const auto s =
"HTTP/1.1 502 Bad Gateway\r\n"
"Connection: keep-alive\r\n"
"Content-length: 0\r\n"
"\r\n";
buffered_fd_.output_buffer().append(td::Slice(s, strlen(s)));
}
loop();
}
@ -83,7 +92,7 @@ td::Status HttpInboundConnection::receive(td::ChainBufferReader &input) {
auto a = R.move_as_ok();
td::actor::send_closure(SelfId, &HttpInboundConnection::send_answer, std::move(a.first), std::move(a.second));
} else {
td::actor::send_closure(SelfId, &HttpInboundConnection::send_proxy_error);
td::actor::send_closure(SelfId, &HttpInboundConnection::send_proxy_error, R.move_as_error());
}
});
http_callback_->receive_request(std::move(cur_request_), payload, std::move(P));

View file

@ -35,7 +35,8 @@ class HttpInboundConnection : public HttpConnection {
td::Status receive_eof() override {
found_eof_ = true;
if (reading_payload_) {
if (reading_payload_->payload_type() != HttpPayload::PayloadType::pt_eof) {
if (reading_payload_->payload_type() != HttpPayload::PayloadType::pt_eof &&
reading_payload_->payload_type() != HttpPayload::PayloadType::pt_tunnel) {
return td::Status::Error("unexpected EOF");
} else {
reading_payload_->complete_parse();
@ -53,7 +54,7 @@ class HttpInboundConnection : public HttpConnection {
void send_client_error();
void send_server_error();
void send_proxy_error();
void send_proxy_error(td::Status error);
void payload_written() override {
writing_payload_ = nullptr;

View file

@ -44,7 +44,8 @@ class HttpOutboundConnection : public HttpConnection {
td::Status receive_eof() override {
found_eof_ = true;
if (reading_payload_) {
if (reading_payload_->payload_type() != HttpPayload::PayloadType::pt_eof) {
if (reading_payload_->payload_type() != HttpPayload::PayloadType::pt_eof &&
reading_payload_->payload_type() != HttpPayload::PayloadType::pt_tunnel) {
return td::Status::Error("unexpected EOF");
} else {
LOG(INFO) << "stopping (EOF payload)";

View file

@ -164,6 +164,8 @@ td::Result<std::shared_ptr<HttpPayload>> HttpRequest::create_empty_payload() {
if (!need_payload()) {
return std::make_shared<HttpPayload>(HttpPayload::PayloadType::pt_empty);
} else if (method_ == "CONNECT") {
return std::make_shared<HttpPayload>(HttpPayload::PayloadType::pt_tunnel, low_watermark(), high_watermark());
} else if (found_content_length_) {
return std::make_shared<HttpPayload>(HttpPayload::PayloadType::pt_content_length, low_watermark(), high_watermark(),
content_length_);
@ -175,7 +177,7 @@ td::Result<std::shared_ptr<HttpPayload>> HttpRequest::create_empty_payload() {
}
bool HttpRequest::need_payload() const {
return found_content_length_ || found_transfer_encoding_;
return found_content_length_ || found_transfer_encoding_ || method_ == "CONNECT";
}
td::Status HttpRequest::add_header(HttpHeader header) {
@ -191,9 +193,6 @@ td::Status HttpRequest::add_header(HttpHeader header) {
if (found_transfer_encoding_ || found_content_length_) {
return td::Status::Error("duplicate Content-Length/Transfer-Encoding");
}
if (len > HttpRequest::max_payload_size()) {
return td::Status::Error("too big Content-Length");
}
content_length_ = len;
found_content_length_ = true;
} else if (lc_name == "transfer-encoding") {
@ -284,7 +283,8 @@ td::Status HttpPayload::parse(td::ChainBufferReader &input) {
case PayloadType::pt_empty:
UNREACHABLE();
case PayloadType::pt_eof:
cur_chunk_size_ = 1 << 30;
case PayloadType::pt_tunnel:
cur_chunk_size_ = 1LL << 60;
break;
case PayloadType::pt_chunked:
state_ = ParseState::reading_crlf;
@ -480,17 +480,18 @@ void HttpPayload::run_callbacks() {
}
}
void HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, HttpPayload::PayloadType store_type) {
bool HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, HttpPayload::PayloadType store_type) {
if (store_type == PayloadType::pt_empty) {
return;
return false;
}
slice_gc();
bool wrote = false;
while (chunks_.size() > 0 && max_size > 0) {
auto cur_state = state_.load(std::memory_order_consume);
auto s = get_slice(max_size);
if (s.size() == 0) {
if (cur_state != ParseState::reading_trailer && cur_state != ParseState::completed) {
return;
return wrote;
} else {
break;
}
@ -500,28 +501,33 @@ void HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, Htt
if (store_type == PayloadType::pt_chunked) {
char buf[64];
::sprintf(buf, "%lx\r\n", s.size());
output.append(td::Slice(buf, strlen(buf)));
auto slice = td::Slice(buf, strlen(buf));
wrote |= !slice.empty();
output.append(slice);
}
wrote |= !s.empty();
output.append(std::move(s));
if (store_type == PayloadType::pt_chunked) {
output.append(td::Slice("\r\n", 2));
wrote = true;
}
}
if (chunks_.size() != 0) {
return;
if (chunks_.size() != 0 || !parse_completed()) {
return wrote;
}
if (!written_zero_chunk_) {
if (store_type == PayloadType::pt_chunked) {
output.append(td::Slice("0\r\n", 3));
wrote = true;
}
written_zero_chunk_ = true;
}
if (store_type != PayloadType::pt_chunked) {
written_trailer_ = true;
return;
return wrote;
}
while (max_size > 0) {
@ -529,15 +535,16 @@ void HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, Htt
HttpHeader h = get_header();
if (h.empty()) {
if (cur_state != ParseState::completed) {
return;
return wrote;
} else {
break;
}
}
auto s = h.size();
h.store_http(output);
wrote = true;
if (max_size <= s) {
return;
return wrote;
}
max_size -= s;
}
@ -545,7 +552,9 @@ void HttpPayload::store_http(td::ChainBufferWriter &output, size_t max_size, Htt
if (!written_trailer_) {
output.append(td::Slice("\r\n", 2));
written_trailer_ = true;
wrote = true;
}
return wrote;
}
tl_object_ptr<ton_api::http_payloadPart> HttpPayload::store_tl(size_t max_size) {
@ -729,17 +738,18 @@ td::Result<std::unique_ptr<HttpResponse>> HttpResponse::parse(std::unique_ptr<Ht
}
HttpResponse::HttpResponse(std::string proto_version, td::uint32 code, std::string reason, bool force_no_payload,
bool keep_alive)
bool keep_alive, bool is_tunnel)
: proto_version_(std::move(proto_version))
, code_(code)
, reason_(std::move(reason))
, force_no_payload_(force_no_payload)
, force_no_keep_alive_(!keep_alive) {
, force_no_keep_alive_(!keep_alive)
, is_tunnel_(is_tunnel) {
}
td::Result<std::unique_ptr<HttpResponse>> HttpResponse::create(std::string proto_version, td::uint32 code,
std::string reason, bool force_no_payload,
bool keep_alive) {
bool keep_alive, bool is_tunnel) {
if (proto_version != "HTTP/1.0" && proto_version != "HTTP/1.1") {
return td::Status::Error(PSTRING() << "unsupported http version '" << proto_version << "'");
}
@ -749,7 +759,7 @@ td::Result<std::unique_ptr<HttpResponse>> HttpResponse::create(std::string proto
}
return std::make_unique<HttpResponse>(std::move(proto_version), code, std::move(reason), force_no_payload,
keep_alive);
keep_alive, is_tunnel);
}
td::Status HttpResponse::complete_parse_header() {
@ -767,6 +777,8 @@ td::Result<std::shared_ptr<HttpPayload>> HttpResponse::create_empty_payload() {
if (!need_payload()) {
return std::make_shared<HttpPayload>(HttpPayload::PayloadType::pt_empty);
} else if (is_tunnel_) {
return std::make_shared<HttpPayload>(HttpPayload::PayloadType::pt_tunnel, low_watermark(), high_watermark());
} else if (found_content_length_) {
return std::make_shared<HttpPayload>(HttpPayload::PayloadType::pt_content_length, low_watermark(), high_watermark(),
content_length_);
@ -794,9 +806,6 @@ td::Status HttpResponse::add_header(HttpHeader header) {
if (found_transfer_encoding_ || found_content_length_) {
return td::Status::Error("duplicate Content-Length/Transfer-Encoding");
}
if (len > HttpRequest::max_payload_size()) {
return td::Status::Error("too big Content-Length");
}
content_length_ = len;
found_content_length_ = true;
} else if (lc_name == "transfer-encoding") {
@ -828,11 +837,13 @@ void HttpResponse::store_http(td::ChainBufferWriter &output) {
for (auto &x : options_) {
x.store_http(output);
}
if (!is_tunnel_) {
if (keep_alive_) {
HttpHeader{"Connection", "Keep-Alive"}.store_http(output);
} else {
HttpHeader{"Connection", "Close"}.store_http(output);
}
}
output.append(td::Slice("\r\n", 2));
}
@ -847,7 +858,7 @@ tl_object_ptr<ton_api::http_response> HttpResponse::store_tl() {
} else {
headers.push_back(HttpHeader{"Connection", "Close"}.store_tl());
}
return create_tl_object<ton_api::http_response>(proto_version_, code_, reason_, std::move(headers));
return create_tl_object<ton_api::http_response>(proto_version_, code_, reason_, std::move(headers), false);
}
td::Status HttpHeader::basic_check() {
@ -893,7 +904,9 @@ void answer_error(HttpStatusCode code, std::string reason,
}
auto response = HttpResponse::create("HTTP/1.0", code, reason, false, false).move_as_ok();
response->add_header(HttpHeader{"Content-Length", "0"});
response->complete_parse_header();
auto payload = response->create_empty_payload().move_as_ok();
payload->complete_parse();
CHECK(payload->parse_completed());
promise.set_value(std::make_pair(std::move(response), std::move(payload)));
}

View file

@ -64,7 +64,7 @@ td::Result<HttpHeader> get_header(std::string line);
class HttpPayload {
public:
enum class PayloadType { pt_empty, pt_eof, pt_chunked, pt_content_length };
enum class PayloadType { pt_empty, pt_eof, pt_chunked, pt_content_length, pt_tunnel };
HttpPayload(PayloadType t, size_t low_watermark, size_t high_watermark, td::uint64 size)
: type_(t), low_watermark_(low_watermark), high_watermark_(high_watermark), cur_chunk_size_(size) {
CHECK(t == PayloadType::pt_content_length);
@ -75,17 +75,15 @@ class HttpPayload {
CHECK(t != PayloadType::pt_content_length);
CHECK(t != PayloadType::pt_empty);
switch (t) {
case PayloadType::pt_empty:
UNREACHABLE();
case PayloadType::pt_eof:
case PayloadType::pt_tunnel:
state_ = ParseState::reading_chunk_data;
break;
case PayloadType::pt_chunked:
state_ = ParseState::reading_chunk_header;
break;
case PayloadType::pt_content_length:
state_ = ParseState::reading_chunk_data;
break;
default:
UNREACHABLE();
}
}
HttpPayload(PayloadType t) : type_(t) {
@ -136,7 +134,7 @@ class HttpPayload {
void slice_gc();
HttpHeader get_header();
void store_http(td::ChainBufferWriter &output, size_t max_size, HttpPayload::PayloadType store_type);
bool store_http(td::ChainBufferWriter &output, size_t max_size, HttpPayload::PayloadType store_type);
tl_object_ptr<ton_api::http_payloadPart> store_tl(size_t max_size);
bool written() const {
@ -267,9 +265,11 @@ class HttpResponse {
}
static td::Result<std::unique_ptr<HttpResponse>> create(std::string proto_version, td::uint32 code,
std::string reason, bool force_no_payload, bool keep_alive);
std::string reason, bool force_no_payload, bool keep_alive,
bool is_tunnel = false);
HttpResponse(std::string proto_version, td::uint32 code, std::string reason, bool force_no_payload, bool keep_alive);
HttpResponse(std::string proto_version, td::uint32 code, std::string reason, bool force_no_payload, bool keep_alive,
bool is_tunnel = false);
bool check_parse_header_completed() const;
bool keep_alive() const {
@ -323,6 +323,7 @@ class HttpResponse {
bool keep_alive_ = false;
std::vector<HttpHeader> options_;
bool is_tunnel_ = false;
};
void answer_error(HttpStatusCode code, std::string reason,

View file

@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.0.2 FATAL_ERROR)
add_executable(rldp-http-proxy rldp-http-proxy.cpp)
add_executable(rldp-http-proxy rldp-http-proxy.cpp DNSResolver.h TonlibClient.h TonlibClient.cpp DNSResolver.cpp)
target_include_directories(rldp-http-proxy PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/..>)
target_link_libraries(rldp-http-proxy PRIVATE tonhttp rldp dht tonlib git)

View file

@ -0,0 +1,105 @@
/*
This file is part of TON Blockchain source code.
TON Blockchain is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
TON Blockchain is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with TON Blockchain. If not, see <http://www.gnu.org/licenses/>.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
You must obey the GNU General Public License in all respects for all
of the code used other than OpenSSL. If you modify file(s) with this
exception, you may extend this exception to your version of the file(s),
but you are not obligated to do so. If you do not wish to do so, delete this
exception statement from your version. If you delete this exception statement
from all source files in the program, then also delete it here.
*/
#include "DNSResolver.h"
#include "td/utils/overloaded.h"
static const double CACHE_TIMEOUT_HARD = 300.0;
static const double CACHE_TIMEOUT_SOFT = 270.0;
DNSResolver::DNSResolver(td::actor::ActorId<TonlibClient> tonlib_client) : tonlib_client_(std::move(tonlib_client)) {
}
void DNSResolver::start_up() {
auto obj = tonlib_api::make_object<tonlib_api::sync>();
auto P = td::PromiseCreator::lambda([](td::Result<tonlib_api::object_ptr<tonlib_api::Object>>) {});
td::actor::send_closure(tonlib_client_, &TonlibClient::send_request, std::move(obj), std::move(P));
}
void DNSResolver::resolve(std::string host, td::Promise<ton::adnl::AdnlNodeIdShort> promise) {
auto it = cache_.find(host);
if (it != cache_.end()) {
const CacheEntry &entry = it->second;
double now = td::Time::now();
if (now < entry.created_at_ + CACHE_TIMEOUT_HARD) {
promise.set_result(entry.id_);
promise.reset();
if (now < entry.created_at_ + CACHE_TIMEOUT_SOFT) {
return;
}
}
}
td::Bits256 category = td::sha256_bits256(td::Slice("site", 4));
auto obj = tonlib_api::make_object<tonlib_api::dns_resolve>(nullptr, host, category, 16);
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this), promise = std::move(promise), host = std::move(host)](
td::Result<tonlib_api::object_ptr<tonlib_api::Object>> R) mutable {
if (R.is_error()) {
if (promise) {
promise.set_result(R.move_as_error());
}
} else {
auto v = R.move_as_ok();
auto obj = dynamic_cast<tonlib_api::dns_resolved *>(v.get());
if (obj == nullptr) {
promise.set_result(td::Status::Error("invalid response from tonlib"));
return;
}
ton::adnl::AdnlNodeIdShort id;
td::uint32 cnt = 0;
for (auto &e : obj->entries_) {
tonlib_api::downcast_call(*e->entry_.get(),
td::overloaded(
[&](tonlib_api::dns_entryDataAdnlAddress &x) {
if (td::Random::fast(0, cnt) == 0) {
auto R = ton::adnl::AdnlNodeIdShort::parse(x.adnl_address_->adnl_address_);
if (R.is_ok()) {
id = R.move_as_ok();
cnt++;
}
}
},
[&](auto &x) {}));
}
if (cnt == 0) {
if (promise) {
promise.set_error(td::Status::Error("no DNS entries"));
}
} else {
td::actor::send_closure(SelfId, &DNSResolver::save_to_cache, std::move(host), id);
if (promise) {
promise.set_result(id);
}
}
}
});
td::actor::send_closure(tonlib_client_, &TonlibClient::send_request, std::move(obj), std::move(P));
}
void DNSResolver::save_to_cache(std::string host, ton::adnl::AdnlNodeIdShort id) {
CacheEntry &entry = cache_[host];
entry.id_ = id;
entry.created_at_ = td::Time::now();
}

View file

@ -0,0 +1,49 @@
/*
This file is part of TON Blockchain source code.
TON Blockchain is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
TON Blockchain is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with TON Blockchain. If not, see <http://www.gnu.org/licenses/>.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
You must obey the GNU General Public License in all respects for all
of the code used other than OpenSSL. If you modify file(s) with this
exception, you may extend this exception to your version of the file(s),
but you are not obligated to do so. If you do not wish to do so, delete this
exception statement from your version. If you delete this exception statement
from all source files in the program, then also delete it here.
*/
#pragma once
#include "td/actor/actor.h"
#include "TonlibClient.h"
#include "adnl/adnl.h"
#include "td/actor/PromiseFuture.h"
class DNSResolver : public td::actor::Actor {
public:
explicit DNSResolver(td::actor::ActorId<TonlibClient> tonlib_client);
void start_up() override;
void resolve(std::string host, td::Promise<ton::adnl::AdnlNodeIdShort> promise);
private:
void save_to_cache(std::string host, ton::adnl::AdnlNodeIdShort id);
td::actor::ActorId<TonlibClient> tonlib_client_;
struct CacheEntry {
ton::adnl::AdnlNodeIdShort id_;
double created_at_;
};
std::map<std::string, CacheEntry> cache_;
};

View file

@ -0,0 +1,72 @@
/*
This file is part of TON Blockchain source code.
TON Blockchain is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
TON Blockchain is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with TON Blockchain. If not, see <http://www.gnu.org/licenses/>.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
You must obey the GNU General Public License in all respects for all
of the code used other than OpenSSL. If you modify file(s) with this
exception, you may extend this exception to your version of the file(s),
but you are not obligated to do so. If you do not wish to do so, delete this
exception statement from your version. If you delete this exception statement
from all source files in the program, then also delete it here.
*/
#include "TonlibClient.h"
TonlibClient::TonlibClient(ton::tl_object_ptr<tonlib_api::options> options) : options_(std::move(options)) {
}
void TonlibClient::start_up() {
class Cb : public tonlib::TonlibCallback {
public:
explicit Cb(td::actor::ActorId<TonlibClient> self_id) : self_id_(self_id) {
}
void on_result(std::uint64_t id, tonlib_api::object_ptr<tonlib_api::Object> result) override {
td::actor::send_closure(self_id_, &TonlibClient::receive_request_result, id, std::move(result));
}
void on_error(std::uint64_t id, tonlib_api::object_ptr<tonlib_api::error> error) override {
td::actor::send_closure(self_id_, &TonlibClient::receive_request_result, id,
td::Status::Error(error->code_, std::move(error->message_)));
}
private:
td::actor::ActorId<TonlibClient> self_id_;
};
tonlib_client_ = td::actor::create_actor<tonlib::TonlibClient>("tonlibclient", td::make_unique<Cb>(actor_id(this)));
auto init = tonlib_api::make_object<tonlib_api::init>(std::move(options_));
auto P = td::PromiseCreator::lambda([](td::Result<tonlib_api::object_ptr<tonlib_api::Object>> R) mutable {
R.ensure();
});
send_request(std::move(init), std::move(P));
}
void TonlibClient::send_request(tonlib_api::object_ptr<tonlib_api::Function> obj,
td::Promise<tonlib_api::object_ptr<tonlib_api::Object>> promise) {
auto id = next_request_id_++;
CHECK(requests_.emplace(id, std::move(promise)).second);
td::actor::send_closure(tonlib_client_, &tonlib::TonlibClient::request, id, std::move(obj));
}
void TonlibClient::receive_request_result(td::uint64 id, td::Result<tonlib_api::object_ptr<tonlib_api::Object>> R) {
if (id == 0) {
return;
}
auto it = requests_.find(id);
CHECK(it != requests_.end());
auto promise = std::move(it->second);
requests_.erase(it);
promise.set_result(std::move(R));
}

View file

@ -0,0 +1,47 @@
/*
This file is part of TON Blockchain source code.
TON Blockchain is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
TON Blockchain is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with TON Blockchain. If not, see <http://www.gnu.org/licenses/>.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
You must obey the GNU General Public License in all respects for all
of the code used other than OpenSSL. If you modify file(s) with this
exception, you may extend this exception to your version of the file(s),
but you are not obligated to do so. If you do not wish to do so, delete this
exception statement from your version. If you delete this exception statement
from all source files in the program, then also delete it here.
*/
#pragma once
#include "td/actor/actor.h"
#include "auto/tl/tonlib_api.hpp"
#include "tonlib/tonlib/TonlibClient.h"
class TonlibClient : public td::actor::Actor {
public:
explicit TonlibClient(ton::tl_object_ptr<tonlib_api::options> options);
void start_up() override;
void send_request(tonlib_api::object_ptr<tonlib_api::Function> obj,
td::Promise<tonlib_api::object_ptr<tonlib_api::Object>> promise);
private:
void receive_request_result(td::uint64 id, td::Result<tonlib_api::object_ptr<tonlib_api::Object>> R);
ton::tl_object_ptr<tonlib_api::options> options_;
td::actor::ActorOwn<tonlib::TonlibClient> tonlib_client_;
std::map<td::uint64, td::Promise<tonlib_api::object_ptr<tonlib_api::Object>>> requests_;
td::uint64 next_request_id_{1};
};

View file

@ -52,6 +52,11 @@
#include <list>
#include <set>
#include "git.h"
#include "td/utils/BufferedFd.h"
#include "common/delay.h"
#include "TonlibClient.h"
#include "DNSResolver.h"
#if TD_DARWIN || TD_LINUX
#include <unistd.h>
@ -120,16 +125,28 @@ class HttpRemote : public td::actor::Actor {
private:
td::IPAddress addr_;
bool ready_ = false;
bool ready_ = true;
td::actor::ActorOwn<ton::http::HttpClient> client_;
};
td::BufferSlice create_error_response(const std::string &proto_version, int code, const std::string &reason) {
return ton::create_serialize_tl_object<ton::ton_api::http_response>(
proto_version, code, reason, std::vector<ton::tl_object_ptr<ton::ton_api::http_header>>(), true);
}
class HttpRldpPayloadReceiver : public td::actor::Actor {
public:
HttpRldpPayloadReceiver(std::shared_ptr<ton::http::HttpPayload> payload, td::Bits256 transfer_id,
ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort local_id,
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::rldp::Rldp> rldp)
: payload_(std::move(payload)), id_(transfer_id), src_(src), local_id_(local_id), adnl_(adnl), rldp_(rldp) {
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::rldp::Rldp> rldp,
bool is_tunnel = false)
: payload_(std::move(payload))
, id_(transfer_id)
, src_(src)
, local_id_(local_id)
, adnl_(adnl)
, rldp_(rldp)
, is_tunnel_(is_tunnel) {
}
void start_up() override {
@ -178,13 +195,14 @@ class HttpRldpPayloadReceiver : public td::actor::Actor {
auto f = ton::create_serialize_tl_object<ton::ton_api::http_getNextPayloadPart>(
id_, seqno_++, static_cast<td::int32>(chunk_size()));
auto timeout = td::Timestamp::in(is_tunnel_ ? 60.0 : 15.0);
td::actor::send_closure(rldp_, &ton::rldp::Rldp::send_query_ex, local_id_, src_, "payload part", std::move(P),
td::Timestamp::in(15.0), std::move(f), 2 * chunk_size() + 1024);
timeout, std::move(f), 2 * chunk_size() + 1024);
}
void add_data(td::BufferSlice data) {
LOG(INFO) << "HttpPayloadReceiver: received answer (size " << data.size() << ")";
auto F = ton::fetch_tl_object<ton::ton_api::http_payloadPart>(std::move(data), true);
auto F = ton::fetch_tl_object<ton::ton_api::http_payloadPart>(data, true);
if (F.is_error()) {
abort_query(F.move_as_error());
return;
@ -243,14 +261,20 @@ class HttpRldpPayloadReceiver : public td::actor::Actor {
bool sent_ = false;
td::int32 seqno_ = 0;
bool is_tunnel_;
};
class HttpRldpPayloadSender : public td::actor::Actor {
public:
HttpRldpPayloadSender(std::shared_ptr<ton::http::HttpPayload> payload, td::Bits256 transfer_id,
ton::adnl::AdnlNodeIdShort local_id, td::actor::ActorId<ton::adnl::Adnl> adnl,
td::actor::ActorId<ton::rldp::Rldp> rldp)
: payload_(std::move(payload)), id_(transfer_id), local_id_(local_id), adnl_(adnl), rldp_(rldp) {
td::actor::ActorId<ton::rldp::Rldp> rldp, bool is_tunnel = false)
: payload_(std::move(payload))
, id_(transfer_id)
, local_id_(local_id)
, adnl_(adnl)
, rldp_(rldp)
, is_tunnel_(is_tunnel) {
}
std::string generate_prefix() const {
@ -287,32 +311,36 @@ class HttpRldpPayloadSender : public td::actor::Actor {
class Cb : public ton::http::HttpPayload::Callback {
public:
Cb(td::actor::ActorId<HttpRldpPayloadSender> id) : self_id_(id) {
Cb(td::actor::ActorId<HttpRldpPayloadSender> id, size_t watermark) : self_id_(id), watermark_(watermark) {
}
void run(size_t ready_bytes) override {
if (!reached_ && ready_bytes >= watermark_) {
reached_ = true;
td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query);
td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query, false);
} else if (reached_ && ready_bytes < watermark_) {
reached_ = false;
}
}
void completed() override {
td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query);
td::actor::send_closure(self_id_, &HttpRldpPayloadSender::try_answer_query, false);
}
private:
size_t watermark_ = ton::http::HttpRequest::low_watermark();
bool reached_ = false;
td::actor::ActorId<HttpRldpPayloadSender> self_id_;
size_t watermark_;
};
payload_->add_callback(std::make_unique<Cb>(actor_id(this)));
payload_->add_callback(
std::make_unique<Cb>(actor_id(this), is_tunnel_ ? 1 : ton::http::HttpRequest::low_watermark()));
alarm_timestamp() = td::Timestamp::in(10.0);
alarm_timestamp() = td::Timestamp::in(is_tunnel_ ? 60.0 : 10.0);
}
void try_answer_query() {
void try_answer_query(bool from_timer = false) {
if (from_timer) {
active_timer_ = false;
}
if (!cur_query_promise_) {
return;
}
@ -321,6 +349,17 @@ class HttpRldpPayloadSender : public td::actor::Actor {
}
if (payload_->parse_completed() || payload_->ready_bytes() >= ton::http::HttpRequest::low_watermark()) {
answer_query();
} else if (!is_tunnel_ || payload_->ready_bytes() == 0) {
return;
} else if (from_timer) {
answer_query();
} else if (!active_timer_) {
active_timer_ = true;
ton::delay_action(
[SelfId = actor_id(this)]() {
td::actor::send_closure(SelfId, &HttpRldpPayloadSender::try_answer_query, true);
},
td::Timestamp::in(0.001));
}
}
@ -348,16 +387,12 @@ class HttpRldpPayloadSender : public td::actor::Actor {
LOG(INFO) << "received request. size=" << cur_query_size_ << " parse_completed=" << payload_->parse_completed()
<< " ready_bytes=" << payload_->ready_bytes();
if (payload_->parse_completed() || payload_->ready_bytes() >= ton::http::HttpRequest::low_watermark()) {
answer_query();
return;
}
alarm_timestamp() = td::Timestamp::in(10.0);
alarm_timestamp() = td::Timestamp::in(is_tunnel_ ? 50.0 : 10.0);
try_answer_query(false);
}
void receive_query(td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
auto F = ton::fetch_tl_object<ton::ton_api::http_getNextPayloadPart>(std::move(data), true);
auto F = ton::fetch_tl_object<ton::ton_api::http_getNextPayloadPart>(data, true);
if (F.is_error()) {
LOG(INFO) << "failed to parse query: " << F.move_as_error();
return;
@ -367,6 +402,10 @@ class HttpRldpPayloadSender : public td::actor::Actor {
void alarm() override {
if (cur_query_promise_) {
if (is_tunnel_) {
answer_query();
return;
}
LOG(INFO) << "timeout on inbound connection. closing http transfer";
} else {
LOG(INFO) << "timeout on RLDP connection. closing http transfer";
@ -382,7 +421,7 @@ class HttpRldpPayloadSender : public td::actor::Actor {
}
seqno_++;
alarm_timestamp() = td::Timestamp::in(30.0);
alarm_timestamp() = td::Timestamp::in(is_tunnel_ ? 60.0 : 30.0);
}
void abort_query(td::Status error) {
@ -403,7 +442,6 @@ class HttpRldpPayloadSender : public td::actor::Actor {
td::Bits256 id_;
bool sent_ = false;
td::int32 seqno_ = 0;
ton::adnl::AdnlNodeIdShort local_id_;
@ -412,6 +450,7 @@ class HttpRldpPayloadSender : public td::actor::Actor {
size_t cur_query_size_;
td::Promise<td::BufferSlice> cur_query_promise_;
bool is_tunnel_, active_timer_ = false;
};
class RldpHttpProxy;
@ -423,7 +462,7 @@ class TcpToRldpRequestSender : public td::actor::Actor {
std::shared_ptr<ton::http::HttpPayload> request_payload,
td::Promise<std::pair<std::unique_ptr<ton::http::HttpResponse>, std::shared_ptr<ton::http::HttpPayload>>> promise,
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::dht::Dht> dht,
td::actor::ActorId<ton::rldp::Rldp> rldp, td::actor::ActorId<RldpHttpProxy> proxy)
td::actor::ActorId<ton::rldp::Rldp> rldp, td::actor::ActorId<DNSResolver> dns_resolver)
: local_id_(local_id)
, host_(std::move(host))
, request_(std::move(request))
@ -432,7 +471,7 @@ class TcpToRldpRequestSender : public td::actor::Actor {
, adnl_(adnl)
, dht_(dht)
, rldp_(rldp)
, proxy_(proxy) {
, dns_resolver_(dns_resolver) {
}
void start_up() override {
resolve();
@ -452,7 +491,8 @@ class TcpToRldpRequestSender : public td::actor::Actor {
}
});
td::actor::create_actor<HttpRldpPayloadSender>("HttpPayloadSender", request_payload_, id_, local_id_, adnl_, rldp_)
td::actor::create_actor<HttpRldpPayloadSender>("HttpPayloadSender", request_payload_, id_, local_id_, adnl_, rldp_,
is_tunnel())
.release();
auto f = ton::serialize_tl_object(request_->store_tl(id_), true);
@ -461,13 +501,14 @@ class TcpToRldpRequestSender : public td::actor::Actor {
}
void got_result(td::BufferSlice data) {
auto F = ton::fetch_tl_object<ton::ton_api::http_response>(std::move(data), true);
auto F = ton::fetch_tl_object<ton::ton_api::http_response>(data, true);
if (F.is_error()) {
abort_query(F.move_as_error());
return;
}
auto f = F.move_as_ok();
auto R = ton::http::HttpResponse::create(f->http_version_, f->status_code_, f->reason_, false, true);
auto R = ton::http::HttpResponse::create(f->http_version_, f->status_code_, f->reason_, f->no_payload_, true,
is_tunnel() && f->status_code_ == 200);
if (R.is_error()) {
abort_query(R.move_as_error());
return;
@ -497,9 +538,13 @@ class TcpToRldpRequestSender : public td::actor::Actor {
td::actor::send_closure(SelfId, &TcpToRldpRequestSender::finished_payload_transfer);
}
});
if (f->no_payload_) {
response_payload_->complete_parse();
} else {
td::actor::create_actor<HttpRldpPayloadReceiver>("HttpPayloadReceiver", response_payload_, id_, dst_, local_id_,
adnl_, rldp_)
adnl_, rldp_, is_tunnel())
.release();
}
promise_.set_value(std::make_pair(std::move(response_), std::move(response_payload_)));
stop();
@ -511,10 +556,15 @@ class TcpToRldpRequestSender : public td::actor::Actor {
void abort_query(td::Status error) {
LOG(INFO) << "aborting http over rldp query: " << error;
promise_.set_error(std::move(error));
stop();
}
protected:
bool is_tunnel() const {
return request_->method() == "CONNECT";
}
td::Bits256 id_;
ton::adnl::AdnlNodeIdShort local_id_;
@ -529,12 +579,216 @@ class TcpToRldpRequestSender : public td::actor::Actor {
td::actor::ActorId<ton::adnl::Adnl> adnl_;
td::actor::ActorId<ton::dht::Dht> dht_;
td::actor::ActorId<ton::rldp::Rldp> rldp_;
td::actor::ActorId<RldpHttpProxy> proxy_;
td::actor::ActorId<DNSResolver> dns_resolver_;
std::unique_ptr<ton::http::HttpResponse> response_;
std::shared_ptr<ton::http::HttpPayload> response_payload_;
};
class RldpTcpTunnel : public td::actor::Actor, private td::ObserverBase {
public:
RldpTcpTunnel(td::Bits256 transfer_id, ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort local_id,
td::actor::ActorId<ton::adnl::Adnl> adnl, td::actor::ActorId<ton::rldp::Rldp> rldp, td::SocketFd fd)
: id_(transfer_id)
, src_(src)
, local_id_(local_id)
, adnl_(std::move(adnl))
, rldp_(std::move(rldp))
, fd_(std::move(fd)) {
}
void start_up() override {
self_ = actor_id(this);
td::actor::SchedulerContext::get()->get_poll().subscribe(fd_.get_poll_info().extract_pollable_fd(this),
td::PollFlags::ReadWrite());
class Cb : public ton::adnl::Adnl::Callback {
public:
explicit Cb(td::actor::ActorId<RldpTcpTunnel> id) : self_id_(std::move(id)) {
}
void receive_message(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst,
td::BufferSlice data) override {
LOG(INFO) << "rldp tcp tunnel: dropping message";
}
void receive_query(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data,
td::Promise<td::BufferSlice> promise) override {
td::actor::send_closure(self_id_, &RldpTcpTunnel::receive_query, std::move(data), std::move(promise));
}
private:
td::actor::ActorId<RldpTcpTunnel> self_id_;
};
td::actor::send_closure(adnl_, &ton::adnl::Adnl::subscribe, local_id_, generate_prefix(),
std::make_unique<Cb>(actor_id(this)));
process();
}
void tear_down() override {
LOG(INFO) << "RldpTcpTunnel: tear_down";
td::actor::send_closure(adnl_, &ton::adnl::Adnl::unsubscribe, local_id_, generate_prefix());
td::actor::SchedulerContext::get()->get_poll().unsubscribe(fd_.get_poll_info().get_pollable_fd_ref());
}
void notify() override {
td::actor::send_closure(self_, &RldpTcpTunnel::process);
}
void request_data() {
if (close_ || sent_request_) {
return;
}
sent_request_ = true;
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<td::BufferSlice> R) {
td::actor::send_closure(SelfId, &RldpTcpTunnel::got_data_from_rldp, std::move(R));
});
auto f = ton::create_serialize_tl_object<ton::ton_api::http_getNextPayloadPart>(id_, out_seqno_++, 1 << 17);
td::actor::send_closure(rldp_, &ton::rldp::Rldp::send_query_ex, local_id_, src_, "payload part", std::move(P),
td::Timestamp::in(60.0), std::move(f), (1 << 18) + 1024);
}
void receive_query(td::BufferSlice data, td::Promise<td::BufferSlice> promise) {
auto F = ton::fetch_tl_object<ton::ton_api::http_getNextPayloadPart>(data, true);
if (F.is_error()) {
LOG(INFO) << "failed to parse query: " << F.error();
promise.set_error(F.move_as_error());
return;
}
auto f = F.move_as_ok();
if (cur_promise_) {
LOG(INFO) << "failed to process query: previous query is active";
promise.set_error(td::Status::Error("previous query is active"));
return;
}
if (f->seqno_ != cur_seqno_) {
LOG(INFO) << "failed to process query: seqno mismatch";
promise.set_error(td::Status::Error("seqno mismatch"));
return;
}
LOG(INFO) << "RldpTcpTunnel: received query, seqno=" << cur_seqno_;
cur_promise_ = std::move(promise);
cur_max_chunk_size_ = f->max_chunk_size_;
alarm_timestamp() = td::Timestamp::in(50.0);
process();
}
void got_data_from_rldp(td::Result<td::BufferSlice> R) {
if (R.is_error()) {
abort(R.move_as_error());
return;
}
td::BufferSlice data = R.move_as_ok();
LOG(INFO) << "RldpTcpTunnel: received data from rldp: size=" << data.size();
sent_request_ = false;
auto F = ton::fetch_tl_object<ton::ton_api::http_payloadPart>(data, true);
if (F.is_error()) {
abort(F.move_as_error());
return;
}
auto f = F.move_as_ok();
fd_.output_buffer().append(std::move(f->data_));
if (f->last_) {
got_last_part_ = true;
}
process();
}
void process() {
if (!close_) {
auto status = [&] {
TRY_STATUS(fd_.flush_read());
TRY_STATUS(fd_.flush_write());
close_ = td::can_close(fd_);
return td::Status::OK();
}();
if (status.is_error()) {
abort(std::move(status));
return;
}
}
if (got_last_part_) {
close_ = true;
}
answer_query();
request_data();
}
void answer_query(bool allow_empty = false, bool from_timer = false) {
if (from_timer) {
active_timer_ = false;
}
auto &input = fd_.input_buffer();
if (cur_promise_ && (!input.empty() || close_ || allow_empty)) {
if (!from_timer && !close_ && !allow_empty && input.size() < ton::http::HttpRequest::low_watermark()) {
if (!active_timer_) {
active_timer_ = true;
ton::delay_action(
[SelfId = actor_id(this)]() {
td::actor::send_closure(SelfId, &RldpTcpTunnel::answer_query, false, true);
},
td::Timestamp::in(0.001));
}
return;
}
size_t s = std::min<size_t>(input.size(), cur_max_chunk_size_);
td::BufferSlice data(s);
LOG(INFO) << "RldpTcpTunnel: sending data to rldp: size=" << data.size();
input.advance(s, td::as_mutable_slice(data));
cur_promise_.set_result(ton::create_serialize_tl_object<ton::ton_api::http_payloadPart>(
std::move(data), std::vector<ton::tl_object_ptr<ton::ton_api::http_header>>(), close_));
++cur_seqno_;
cur_promise_.reset();
alarm_timestamp() = td::Timestamp::never();
if (close_) {
stop();
return;
}
}
}
void alarm() override {
answer_query(true, false);
}
void abort(td::Status status) {
LOG(INFO) << "RldpTcpTunnel error: " << status;
if (cur_promise_) {
cur_promise_.set_error(status.move_as_error());
}
stop();
}
private:
std::string generate_prefix() const {
std::string x(static_cast<size_t>(36), '\0');
auto S = td::MutableSlice{x};
CHECK(S.size() == 36);
auto id = ton::ton_api::http_getNextPayloadPart::ID;
S.copy_from(td::Slice(reinterpret_cast<const td::uint8 *>(&id), 4));
S.remove_prefix(4);
S.copy_from(id_.as_slice());
return x;
}
td::Bits256 id_;
ton::adnl::AdnlNodeIdShort src_;
ton::adnl::AdnlNodeIdShort local_id_;
td::actor::ActorId<ton::adnl::Adnl> adnl_;
td::actor::ActorId<ton::rldp::Rldp> rldp_;
td::BufferedFd<td::SocketFd> fd_;
td::actor::ActorId<RldpTcpTunnel> self_;
td::int32 cur_seqno_ = 0, cur_max_chunk_size_ = 0;
td::Promise<td::BufferSlice> cur_promise_;
td::int32 out_seqno_ = 0;
bool close_ = false, sent_request_ = false, got_last_part_ = false;
bool active_timer_ = false;
};
class RldpToTcpRequestSender : public td::actor::Actor {
public:
RldpToTcpRequestSender(td::Bits256 id, ton::adnl::AdnlNodeIdShort local_id, ton::adnl::AdnlNodeIdShort dst,
@ -570,9 +824,11 @@ class RldpToTcpRequestSender : public td::actor::Actor {
}
void got_result(std::pair<std::unique_ptr<ton::http::HttpResponse>, std::shared_ptr<ton::http::HttpPayload>> R) {
if (R.first->need_payload()) {
td::actor::create_actor<HttpRldpPayloadSender>("HttpPayloadSender(R)", std::move(R.second), id_, local_id_, adnl_,
rldp_)
.release();
}
auto f = ton::serialize_tl_object(R.first->store_tl(), true);
promise_.set_value(std::move(f));
stop();
@ -580,7 +836,7 @@ class RldpToTcpRequestSender : public td::actor::Actor {
void abort_query(td::Status error) {
LOG(INFO) << "aborting http over rldp query: " << error;
promise_.set_error(std::move(error));
promise_.set_result(create_error_response(request_->proto_version(), 502, "Bad Gateway"));
stop();
}
@ -603,8 +859,7 @@ class RldpToTcpRequestSender : public td::actor::Actor {
class RldpHttpProxy : public td::actor::Actor {
public:
RldpHttpProxy() {
}
RldpHttpProxy() = default;
void set_port(td::uint16 port) {
if (port_) {
@ -627,29 +882,8 @@ class RldpHttpProxy : public td::actor::Actor {
client_port_ = port;
}
void set_local_host(std::string name, td::IPAddress remote) {
local_hosts_.emplace_back(std::move(name), std::move(remote));
}
void receive_request_result(td::uint64 id, td::Result<tonlib_api::object_ptr<tonlib_api::Object>> R) {
if (id == 0) {
return;
}
auto it = tonlib_requests_.find(id);
CHECK(it != tonlib_requests_.end());
auto promise = std::move(it->second);
tonlib_requests_.erase(it);
promise.set_result(std::move(R));
}
void send_tonlib_request(tonlib_api::object_ptr<tonlib_api::Function> obj,
td::Promise<tonlib_api::object_ptr<tonlib_api::Object>> promise) {
auto id = next_tonlib_requests_id_++;
CHECK(tonlib_requests_.emplace(id, std::move(promise)).second);
td::actor::send_closure(tonlib_client_, &tonlib::TonlibClient::request, id, std::move(obj));
void set_local_host(std::string host, td::uint16 port, td::IPAddress remote) {
hosts_[host].ports_[port].remote_addr_ = remote;
}
td::Status load_global_config() {
@ -666,29 +900,11 @@ class RldpHttpProxy : public td::actor::Actor {
TRY_RESULT_PREFIX(dht, ton::dht::Dht::create_global_config(std::move(conf.dht_)), "bad [dht] section: ");
dht_config_ = std::move(dht);
class Cb : public tonlib::TonlibCallback {
public:
Cb(td::actor::ActorId<RldpHttpProxy> self_id) : self_id_(self_id) {
}
void on_result(std::uint64_t id, tonlib_api::object_ptr<tonlib_api::Object> result) override {
td::actor::send_closure(self_id_, &RldpHttpProxy::receive_request_result, id, std::move(result));
}
void on_error(std::uint64_t id, tonlib_api::object_ptr<tonlib_api::error> error) override {
td::actor::send_closure(self_id_, &RldpHttpProxy::receive_request_result, id,
td::Status::Error(error->code_, std::move(error->message_)));
}
private:
td::actor::ActorId<RldpHttpProxy> self_id_;
};
tonlib_client_ = td::actor::create_actor<tonlib::TonlibClient>("tonlibclient", td::make_unique<Cb>(actor_id(this)));
return td::Status::OK();
}
void store_dht() {
for (auto &serv : local_hosts_) {
for (auto &serv : hosts_) {
if (serv.first != "*") {
for (auto &serv_id : server_ids_) {
ton::PublicKey key = ton::pubkeys::Unenc{"http." + serv.first};
@ -722,7 +938,7 @@ class RldpHttpProxy : public td::actor::Actor {
{
auto S = load_global_config();
if (S.is_error()) {
LOG(INFO) << S;
LOG(ERROR) << S;
std::_Exit(2);
}
}
@ -749,23 +965,19 @@ class RldpHttpProxy : public td::actor::Actor {
});
td::actor::send_closure(keyring_, &ton::keyring::Keyring::get_public_key, x.pubkey_hash(), std::move(Q));
}
auto Q = td::PromiseCreator::lambda(
[promise = ig.get_promise()](td::Result<tonlib_api::object_ptr<tonlib_api::Object>> R) mutable {
R.ensure();
promise.set_value(td::Unit());
});
auto conf_dataR = td::read_file(global_config_);
conf_dataR.ensure();
auto req = tonlib_api::make_object<tonlib_api::init>(tonlib_api::make_object<tonlib_api::options>(
auto tonlib_options = tonlib_api::make_object<tonlib_api::options>(
tonlib_api::make_object<tonlib_api::config>(conf_dataR.move_as_ok().as_slice().str(), "", false, false),
tonlib_api::make_object<tonlib_api::keyStoreTypeInMemory>()));
send_tonlib_request(std::move(req), std::move(Q));
tonlib_api::make_object<tonlib_api::keyStoreTypeInMemory>());
tonlib_client_ = td::actor::create_actor<TonlibClient>("tonlibclient", std::move(tonlib_options));
dns_resolver_ = td::actor::create_actor<DNSResolver>("dnsresolver", tonlib_client_.get());
}
void run_cont() {
if (is_client_ && local_hosts_.size() > 0) {
if (is_client_ && hosts_.size() > 0) {
LOG(ERROR) << "client-only node cannot be server";
std::_Exit(2);
}
@ -876,9 +1088,6 @@ class RldpHttpProxy : public td::actor::Actor {
ton::adnl::Adnl::int_to_bytestring(ton::ton_api::http_request::ID),
std::make_unique<AdnlCb>(actor_id(this)));
}
for (auto &serv : local_hosts_) {
servers_.emplace(serv.first, td::actor::create_actor<HttpRemote>("remote", serv.second));
}
rldp_ = ton::rldp::Rldp::create(adnl_.get());
td::actor::send_closure(rldp_, &ton::rldp::Rldp::add_id, local_id_);
@ -931,23 +1140,33 @@ class RldpHttpProxy : public td::actor::Actor {
td::actor::create_actor<TcpToRldpRequestSender>("outboundreq", local_id_, host, std::move(request),
std::move(payload), std::move(promise), adnl_.get(), dht_.get(),
rldp_.get(), actor_id(this))
rldp_.get(), dns_resolver_.get())
.release();
}
void receive_rldp_request(ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data,
td::Promise<td::BufferSlice> promise) {
LOG(INFO) << "got HTTP request over rldp from " << src;
TRY_RESULT_PROMISE(promise, f, ton::fetch_tl_object<ton::ton_api::http_request>(std::move(data), true));
TRY_RESULT_PROMISE(promise, request, ton::http::HttpRequest::create(f->method_, f->url_, f->http_version_));
TRY_RESULT_PROMISE(promise, f, ton::fetch_tl_object<ton::ton_api::http_request>(data, true));
std::unique_ptr<ton::http::HttpRequest> request;
auto S = [&]() {
TRY_RESULT_ASSIGN(request, ton::http::HttpRequest::create(f->method_, f->url_, f->http_version_));
for (auto &x : f->headers_) {
ton::http::HttpHeader h{x->name_, x->value_};
TRY_STATUS_PROMISE(promise, h.basic_check());
TRY_STATUS(h.basic_check());
request->add_header(std::move(h));
}
TRY_STATUS_PROMISE(promise, request->complete_parse_header());
TRY_STATUS(request->complete_parse_header());
return td::Status::OK();
}();
if (S.is_error()) {
LOG(INFO) << "Failed to parse http request: " << S;
promise.set_result(create_error_response(f->http_version_, 400, "Bad Request"));
return;
}
auto host = request->host();
if (host.size() == 0) {
td::uint16 port = 80;
if (host.empty()) {
host = request->url();
if (host.size() >= 7 && host.substr(0, 7) == "http://") {
host = host.substr(7);
@ -972,29 +1191,69 @@ class RldpHttpProxy : public td::actor::Actor {
{
auto p = host.find(':');
if (p != std::string::npos) {
try {
port = (td::uint16)std::stoul(host.substr(p + 1));
} catch (const std::logic_error &) {
port = 80;
promise.set_result(create_error_response(f->http_version_, 400, "Bad Request"));
return;
}
host = host.substr(0, p);
}
}
std::transform(host.begin(), host.end(), host.begin(), [](unsigned char c) { return std::tolower(c); });
auto it = servers_.find(host);
if (it == servers_.end()) {
it = servers_.find("*");
if (it == servers_.end()) {
promise.set_error(td::Status::Error(ton::ErrorCode::error, "unknown server name"));
auto it = hosts_.find(host);
if (it == hosts_.end()) {
it = hosts_.find("*");
if (it == hosts_.end()) {
promise.set_result(create_error_response(f->http_version_, 502, "Bad Gateway"));
return;
}
}
auto it2 = it->second.ports_.find(port);
if (it2 == it->second.ports_.end()) {
promise.set_result(create_error_response(f->http_version_, 502, "Bad Gateway"));
return;
}
auto &server = it2->second;
if (request->method() == "CONNECT") {
LOG(INFO) << "starting HTTP tunnel over RLDP to " << server.remote_addr_;
start_tcp_tunnel(f->id_, src, dst, f->http_version_, server.remote_addr_, std::move(promise));
return;
}
TRY_RESULT_PROMISE(promise, payload, request->create_empty_payload());
if (server.http_remote_.empty()) {
server.http_remote_ = td::actor::create_actor<HttpRemote>("remote", server.remote_addr_);
}
auto payload = request->create_empty_payload();
if (payload.is_error()) {
promise.set_result(create_error_response(f->http_version_, 502, "Bad Gateway"));
return;
}
LOG(INFO) << "starting HTTP over RLDP request";
td::actor::create_actor<RldpToTcpRequestSender>("inboundreq", f->id_, dst, src, std::move(request),
std::move(payload), std::move(promise), adnl_.get(), rldp_.get(),
it->second.get())
payload.move_as_ok(), std::move(promise), adnl_.get(), rldp_.get(),
server.http_remote_.get())
.release();
}
void start_tcp_tunnel(td::Bits256 id, ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort local_id,
std::string http_version, td::IPAddress ip, td::Promise<td::BufferSlice> promise) {
auto fd = td::SocketFd::open(ip);
if (fd.is_error()) {
promise.set_result(create_error_response(http_version, 502, "Bad Gateway"));
return;
}
td::actor::create_actor<RldpTcpTunnel>(td::actor::ActorOptions().with_name("tunnel").with_poll(), id, src, local_id,
adnl_.get(), rldp_.get(), fd.move_as_ok()).release();
promise.set_result(ton::create_serialize_tl_object<ton::ton_api::http_response>(
http_version, 200, "Connection Established", std::vector<ton::tl_object_ptr<ton::ton_api::http_header>>(),
false));
}
void add_adnl_addr(ton::adnl::AdnlNodeIdShort id) {
server_ids_.insert(id);
}
@ -1008,10 +1267,17 @@ class RldpHttpProxy : public td::actor::Actor {
}
private:
struct Host {
struct Server {
td::IPAddress remote_addr_;
td::actor::ActorOwn<HttpRemote> http_remote_;
};
std::map<td::uint16, Server> ports_;
};
td::uint16 port_{0};
td::IPAddress addr_;
std::string global_config_;
std::vector<std::pair<std::string, td::IPAddress>> local_hosts_;
bool is_client_{false};
td::uint16 client_port_{0};
@ -1022,8 +1288,7 @@ class RldpHttpProxy : public td::actor::Actor {
ton::adnl::AdnlNodeIdShort dht_id_;
td::actor::ActorOwn<ton::http::HttpServer> server_;
std::map<std::string, ton::adnl::AdnlNodeIdShort> dns_;
std::map<std::string, td::actor::ActorOwn<HttpRemote>> servers_;
std::map<std::string, Host> hosts_;
td::actor::ActorOwn<ton::keyring::Keyring> keyring_;
td::actor::ActorOwn<ton::adnl::AdnlNetworkManager> adnl_network_manager_;
@ -1036,9 +1301,8 @@ class RldpHttpProxy : public td::actor::Actor {
std::string db_root_ = ".";
bool proxy_all_ = false;
td::actor::ActorOwn<tonlib::TonlibClient> tonlib_client_;
std::map<td::uint64, td::Promise<tonlib_api::object_ptr<tonlib_api::Object>>> tonlib_requests_;
td::uint64 next_tonlib_requests_id_{1};
td::actor::ActorOwn<TonlibClient> tonlib_client_;
td::actor::ActorOwn<DNSResolver> dns_resolver_;
};
void TcpToRldpRequestSender::resolve() {
@ -1053,63 +1317,15 @@ void TcpToRldpRequestSender::resolve() {
resolved(R.move_as_ok());
return;
}
if (false) {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<ton::dht::DhtValue> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &TcpToRldpRequestSender::abort_query, R.move_as_error());
return;
}
auto value = R.move_as_ok();
if (value.value().size() != 32) {
td::actor::send_closure(SelfId, &TcpToRldpRequestSender::abort_query, td::Status::Error("bad value in dht"));
return;
}
ton::PublicKeyHash h{value.value().as_slice()};
td::actor::send_closure(SelfId, &TcpToRldpRequestSender::resolved, ton::adnl::AdnlNodeIdShort{h});
});
ton::PublicKey key = ton::pubkeys::Unenc{"http." + host_};
ton::dht::DhtKey dht_key{key.compute_short_id(), "http." + host_, 0};
td::actor::send_closure(dht_, &ton::dht::Dht::get_value, std::move(dht_key), std::move(P));
} else {
td::Bits256 category = td::sha256_bits256(td::Slice("site", 4));
auto obj = tonlib_api::make_object<tonlib_api::dns_resolve>(nullptr, host_, category, 16);
auto P =
td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<tonlib_api::object_ptr<tonlib_api::Object>> R) {
auto P = td::PromiseCreator::lambda([SelfId = actor_id(this)](td::Result<ton::adnl::AdnlNodeIdShort> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &TcpToRldpRequestSender::abort_query,
R.move_as_error_prefix("failed to resolve: "));
} else {
auto v = R.move_as_ok();
auto obj = static_cast<tonlib_api::dns_resolved *>(v.get());
ton::adnl::AdnlNodeIdShort id;
td::uint32 cnt = 0;
for (auto &e : obj->entries_) {
tonlib_api::downcast_call(
*e->entry_.get(), td::overloaded(
[&](tonlib_api::dns_entryDataAdnlAddress &x) {
if (td::Random::fast(0, cnt) == 0) {
auto R = ton::adnl::AdnlNodeIdShort::parse(x.adnl_address_->adnl_address_);
if (R.is_ok()) {
id = R.move_as_ok();
cnt++;
}
}
},
[&](auto &x) {}));
}
if (cnt == 0) {
td::actor::send_closure(SelfId, &TcpToRldpRequestSender::abort_query,
td::Status::Error(ton::ErrorCode::notready, "failed to resolve"));
} else {
td::actor::send_closure(SelfId, &TcpToRldpRequestSender::resolved, id);
}
td::actor::send_closure(SelfId, &TcpToRldpRequestSender::resolved, R.move_as_ok());
}
});
td::actor::send_closure(proxy_, &RldpHttpProxy::send_tonlib_request, std::move(obj), std::move(P));
}
td::actor::send_closure(dns_resolver_, &DNSResolver::resolve, host_, std::move(P));
}
int main(int argc, char *argv[]) {
@ -1123,6 +1339,42 @@ int main(int argc, char *argv[]) {
td::log_interface = td::default_log_interface;
};
auto add_local_host = [&](const std::string& local, const std::string& remote) -> td::Status {
std::string host;
std::vector<td::uint16> ports;
auto p = local.find(':');
if (p == std::string::npos) {
host = local;
ports = {80, 443};
} else {
host = local.substr(0, p);
++p;
while (p < local.size()) {
auto p2 = local.find(',', p);
if (p2 == std::string::npos) {
p2 = local.size();
}
try {
ports.push_back((td::uint16)std::stoul(local.substr(p, p2 - p)));
} catch (const std::logic_error& e) {
return td::Status::Error(PSLICE() << "Invalid port: " << local.substr(p, p2 - p));
}
p = p2 + 1;
}
}
for (td::uint16 port : ports) {
std::string cur_remote = remote;
if (cur_remote.find(':') == std::string::npos) {
cur_remote += ':';
cur_remote += std::to_string(port);
}
td::IPAddress addr;
TRY_STATUS(addr.init_host_port(cur_remote));
td::actor::send_closure(x, &RldpHttpProxy::set_local_host, host, port, addr);
}
return td::Status::OK();
};
td::OptionParser p;
p.set_description(
"A simple rldp-to-http and http-to-rldp proxy for running and accessing ton sites\n"
@ -1136,7 +1388,8 @@ int main(int argc, char *argv[]) {
SET_VERBOSITY_LEVEL(v);
});
p.add_option('V', "version", "shows rldp-http-proxy build information", [&]() {
std::cout << "rldp-http-proxy build information: [ Commit: " << GitMetadata::CommitSHA1() << ", Date: " << GitMetadata::CommitDate() << "]\n";
std::cout << "rldp-http-proxy build information: [ Commit: " << GitMetadata::CommitSHA1()
<< ", Date: " << GitMetadata::CommitDate() << "]\n";
std::exit(0);
});
p.add_option('h', "help", "prints a help message", [&]() {
@ -1170,27 +1423,25 @@ int main(int argc, char *argv[]) {
});
p.add_option('C', "global-config", "global TON configuration file",
[&](td::Slice arg) { td::actor::send_closure(x, &RldpHttpProxy::set_global_config, arg.str()); });
p.add_checked_option('L', "local", "http hostname that will be proxied to http server at localhost:80",
p.add_checked_option('L', "local",
"<hosthame>:<ports>, hostname that will be proxied to localhost\n"
"<ports> is a comma-separated list of ports (may be omitted, default: 80, 443)\n",
[&](td::Slice arg) -> td::Status {
td::IPAddress addr;
TRY_STATUS(addr.init_ipv4_port("127.0.0.1", 80));
td::actor::send_closure(x, &RldpHttpProxy::set_local_host, arg.str(), addr);
return td::Status::OK();
return add_local_host(arg.str(), "127.0.0.1");
});
p.add_option('D', "db", "db root",
[&](td::Slice arg) { td::actor::send_closure(x, &RldpHttpProxy::set_db_root, arg.str()); });
p.add_checked_option(
'R', "remote",
"<hostname>@<ip>:<port>, indicates a http hostname that will be proxied to remote http server at <ip>:<port>",
"<hostname>:<ports>@<ip>:<port>, indicates a hostname that will be proxied to remote server at <ip>:<port>\n"
"<ports> is a comma-separated list of ports (may be omitted, default: 80,433)\n"
"<port> is a remote port (may be omitted, default: same as host's port)",
[&](td::Slice arg) -> td::Status {
auto ch = arg.find('@');
if (ch == td::Slice::npos) {
return td::Status::Error("bad format for --remote");
}
td::IPAddress addr;
TRY_STATUS(addr.init_host_port(arg.substr(ch + 1).str()));
td::actor::send_closure(x, &RldpHttpProxy::set_local_host, arg.substr(0, ch).str(), addr);
return td::Status::OK();
return add_local_host(arg.substr(0, ch).str(), arg.substr(ch + 1).str());
});
p.add_option('d', "daemonize", "set SIGHUP", [&]() {
td::set_signal_handler(td::SignalType::HangUp, [](int sig) {

View file

@ -705,7 +705,7 @@ storage.queryPrefix id:int256 = Object;
http.header name:string value:string = http.Header;
http.payloadPart data:bytes trailer:(vector http.header) last:Bool = http.PayloadPart;
http.response http_version:string status_code:int reason:string headers:(vector http.header) = http.Response;
http.response http_version:string status_code:int reason:string headers:(vector http.header) no_payload:Bool = http.Response;
---functions---

Binary file not shown.