1
0
Fork 0
mirror of https://github.com/ton-blockchain/ton synced 2025-03-09 15:40:10 +00:00

Merge pull request #1547 from xssnick/tun

ADNL Tunnel integration
This commit is contained in:
EmelyanenkoK 2025-03-08 14:38:03 +03:00 committed by GitHub
commit e1bf68bbfd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 480 additions and 12 deletions

1
.gitignore vendored
View file

@ -24,3 +24,4 @@ libsodium-1.0.18-stable-msvc.zip
libmicrohttpd-0.9.77-w32-bin.zip libmicrohttpd-0.9.77-w32-bin.zip
openssl-3.1.4.zip openssl-3.1.4.zip
readline-5.0-1-lib.zip readline-5.0-1-lib.zip
libtunnel.a

View file

@ -105,6 +105,21 @@ option(TON_USE_ASAN "Use \"ON\" to enable AddressSanitizer." OFF)
option(TON_USE_TSAN "Use \"ON\" to enable ThreadSanitizer." OFF) option(TON_USE_TSAN "Use \"ON\" to enable ThreadSanitizer." OFF)
option(TON_USE_UBSAN "Use \"ON\" to enable UndefinedBehaviorSanitizer." OFF) option(TON_USE_UBSAN "Use \"ON\" to enable UndefinedBehaviorSanitizer." OFF)
set(TON_ARCH "native" CACHE STRING "Architecture, will be passed to -march=") set(TON_ARCH "native" CACHE STRING "Architecture, will be passed to -march=")
option(TON_USE_GO_TUNNEL "Use \"ON\" to enable ADNL Tunnel over shared Go library." OFF)
if (TON_USE_GO_TUNNEL)
set(TUNNEL_GO_LIB_PATH "${CMAKE_CURRENT_SOURCE_DIR}/libtunnel.a")
if (EXISTS "${TUNNEL_GO_LIB_PATH}")
message(STATUS "Found ADNL Tunnel library (Go): ${TUNNEL_GO_LIB_PATH}")
add_library(tunnel STATIC IMPORTED)
set_target_properties(tunnel PROPERTIES IMPORTED_LOCATION "${TUNNEL_GO_LIB_PATH}")
set(TUNNEL_LIB_IF_USED "tunnel")
add_compile_definitions(TON_USE_GO_TUNNEL)
else()
message(FATAL_ERROR "Missing ADNL Tunnel library (Go), but enabled: ${TUNNEL_GO_LIB_PATH}")
endif()
endif()
#BEGIN M1 support #BEGIN M1 support
EXECUTE_PROCESS( COMMAND uname -m COMMAND tr -d '\n' OUTPUT_VARIABLE ARCHITECTURE ) EXECUTE_PROCESS( COMMAND uname -m COMMAND tr -d '\n' OUTPUT_VARIABLE ARCHITECTURE )

View file

@ -149,3 +149,21 @@ Linux and MacOS binaries are available for both x86-64 and arm64 architectures.
## Running tests ## Running tests
Tests are executed by running `ctest` in the build directory. See `doc/Tests.md` for more information. Tests are executed by running `ctest` in the build directory. See `doc/Tests.md` for more information.
## Using ADNL tunnel
### At node compilation
1. Clone https://github.com/ton-blockchain/adnl-tunnel and install golang 1.23.3 or newer
* `cd adnl-tunnel`
* `make library`
* It will build `libtunnel.a`
2. Copy `libtunnel.a` to ton src directory root (usually `/usr/src/ton`).
3. On the first step of ton node compilation run cmake with option `-DTON_USE_GO_TUNNEL=ON` to enable tunnel.
4. Build ton node as usual.
### At node startup
1. Run validator-engine with `--tunnel-config /path/to/tunnel-config.json` startup argument.
2. It will create example tunnel config file at specified path (`/path/to/tunnel-config.json`).
3. Fill it with desired tunnel configuration, enable payments and top up wallet address if needed.
4. Run validator-engine `--tunnel-config /path/to/tunnel-config.json` again, and follow instructions in console if any.
5. When setup is completed and node started, you can stop it and run in daemon mode as usual.

View file

@ -76,6 +76,46 @@ size_t AdnlNetworkManagerImpl::add_listening_udp_port(td::uint16 port) {
return idx; return idx;
} }
#define TUNNEL_FAKE_PORT 1
size_t AdnlNetworkManagerImpl::add_tunnel_udp_port(std::string global_config, std::string tunnel_config, td::Promise<td::IPAddress> on_ready,
td::actor::Scheduler *scheduler) {
auto it = port_2_socket_.find(TUNNEL_FAKE_PORT);
if (it != port_2_socket_.end()) {
return it->second;
}
class Callback : public td::UdpServer::TunnelCallback {
public:
Callback(td::actor::ActorShared<AdnlNetworkManagerImpl> manager, size_t idx, td::actor::Scheduler *scheduler, TunnelEventsHandler* tunnel_events_handler)
: manager_(std::move(manager)), idx_(idx), scheduler_(scheduler), tunnel_events_handler_(tunnel_events_handler) {
}
private:
TunnelEventsHandler* tunnel_events_handler_;
td::actor::ActorShared<AdnlNetworkManagerImpl> manager_;
size_t idx_;
td::actor::Scheduler *scheduler_;
void on_udp_message(td::UdpMessage udp_message) override {
scheduler_->run_in_context_external([&] {
td::actor::send_closure_later(manager_, &AdnlNetworkManagerImpl::receive_udp_message, std::move(udp_message),
idx_);
});
}
void on_in_addr_update(const td::IPAddress ip) override {
tunnel_events_handler_->on_in_addr_update(ip);
}
};
auto idx = udp_sockets_.size();
auto X = td::UdpServer::create_via_tunnel("udp tunnel server", global_config, tunnel_config,
std::make_unique<Callback>(actor_shared(this), idx, scheduler, tunnel_events_handler_.get()),
std::move(on_ready));
X.ensure();
port_2_socket_[TUNNEL_FAKE_PORT] = idx;
udp_sockets_.push_back(UdpSocketDesc{TUNNEL_FAKE_PORT, X.move_as_ok()});
return idx;
}
void AdnlNetworkManagerImpl::add_self_addr(td::IPAddress addr, AdnlCategoryMask cat_mask, td::uint32 priority) { void AdnlNetworkManagerImpl::add_self_addr(td::IPAddress addr, AdnlCategoryMask cat_mask, td::uint32 priority) {
auto port = td::narrow_cast<td::uint16>(addr.get_port()); auto port = td::narrow_cast<td::uint16>(addr.get_port());
size_t idx = add_listening_udp_port(port); size_t idx = add_listening_udp_port(port);
@ -92,6 +132,23 @@ void AdnlNetworkManagerImpl::add_self_addr(td::IPAddress addr, AdnlCategoryMask
out_desc_[priority].push_back(std::move(d)); out_desc_[priority].push_back(std::move(d));
} }
void AdnlNetworkManagerImpl::add_tunnel(std::string global_config, std::string tunnel_config, AdnlCategoryMask cat_mask, td::uint32 priority,
td::Promise<td::IPAddress> on_ready, td::actor::Scheduler *scheduler) {
size_t idx = add_tunnel_udp_port(global_config, tunnel_config, std::move(on_ready), scheduler);
add_in_addr(InDesc{TUNNEL_FAKE_PORT, nullptr, cat_mask}, idx);
auto d = OutDesc{TUNNEL_FAKE_PORT, td::IPAddress{}, nullptr, idx};
for (auto &it : out_desc_[priority]) {
if (it == d) {
it.cat_mask |= cat_mask;
return;
}
}
d.cat_mask = cat_mask;
out_desc_[priority].push_back(std::move(d));
}
void AdnlNetworkManagerImpl::add_proxy_addr(td::IPAddress addr, td::uint16 local_port, std::shared_ptr<AdnlProxy> proxy, void AdnlNetworkManagerImpl::add_proxy_addr(td::IPAddress addr, td::uint16 local_port, std::shared_ptr<AdnlProxy> proxy,
AdnlCategoryMask cat_mask, td::uint32 priority) { AdnlCategoryMask cat_mask, td::uint32 priority) {
size_t idx = add_listening_udp_port(local_port); size_t idx = add_listening_udp_port(local_port);

View file

@ -59,17 +59,25 @@ class AdnlNetworkManager : public td::actor::Actor {
public: public:
//using ConnHandle = td::uint64; //using ConnHandle = td::uint64;
class Callback { class Callback {
public: public:
virtual ~Callback() = default; virtual ~Callback() = default;
//virtual void receive_packet(td::IPAddress addr, ConnHandle conn_handle, td::BufferSlice data) = 0; //virtual void receive_packet(td::IPAddress addr, ConnHandle conn_handle, td::BufferSlice data) = 0;
virtual void receive_packet(td::IPAddress addr, AdnlCategoryMask cat_mask, td::BufferSlice data) = 0; virtual void receive_packet(td::IPAddress addr, AdnlCategoryMask cat_mask, td::BufferSlice data) = 0;
}; };
class TunnelEventsHandler {
public:
virtual ~TunnelEventsHandler() = default;
virtual void on_in_addr_update(td::IPAddress ip) = 0;
};
static td::actor::ActorOwn<AdnlNetworkManager> create(td::uint16 out_port); static td::actor::ActorOwn<AdnlNetworkManager> create(td::uint16 out_port);
virtual ~AdnlNetworkManager() = default; virtual ~AdnlNetworkManager() = default;
virtual void install_callback(std::unique_ptr<Callback> callback) = 0; virtual void install_callback(std::unique_ptr<Callback> callback) = 0;
virtual void install_tunnel_events_handler(std::unique_ptr<TunnelEventsHandler> handler) = 0;
virtual void add_tunnel(std::string global_config, std::string tunnel_config, AdnlCategoryMask cat_mask, td::uint32 priority,
td::Promise<td::IPAddress> on_ready, td::actor::Scheduler *scheduler) = 0;
virtual void add_self_addr(td::IPAddress addr, AdnlCategoryMask cat_mask, td::uint32 priority) = 0; virtual void add_self_addr(td::IPAddress addr, AdnlCategoryMask cat_mask, td::uint32 priority) = 0;
virtual void add_proxy_addr(td::IPAddress addr, td::uint16 local_port, std::shared_ptr<AdnlProxy> proxy, virtual void add_proxy_addr(td::IPAddress addr, td::uint16 local_port, std::shared_ptr<AdnlProxy> proxy,
AdnlCategoryMask cat_mask, td::uint32 priority) = 0; AdnlCategoryMask cat_mask, td::uint32 priority) = 0;

View file

@ -95,6 +95,10 @@ class AdnlNetworkManagerImpl : public AdnlNetworkManager {
size_t in_desc{std::numeric_limits<size_t>::max()}; size_t in_desc{std::numeric_limits<size_t>::max()};
bool allow_proxy{false}; bool allow_proxy{false};
}; };
struct TunnelDesc {
size_t index{};
td::IPAddress address;
};
OutDesc *choose_out_iface(td::uint8 cat, td::uint32 priority); OutDesc *choose_out_iface(td::uint8 cat, td::uint32 priority);
@ -105,6 +109,10 @@ class AdnlNetworkManagerImpl : public AdnlNetworkManager {
callback_ = std::move(callback); callback_ = std::move(callback);
} }
void install_tunnel_events_handler(std::unique_ptr<TunnelEventsHandler> handler) override {
tunnel_events_handler_ = std::move(handler);
}
void alarm() override; void alarm() override;
void start_up() override { void start_up() override {
alarm_timestamp() = td::Timestamp::in(60.0); alarm_timestamp() = td::Timestamp::in(60.0);
@ -127,6 +135,8 @@ class AdnlNetworkManagerImpl : public AdnlNetworkManager {
in_desc_.push_back(std::move(desc)); in_desc_.push_back(std::move(desc));
} }
void add_tunnel(std::string global_config, std::string tunnel_config, AdnlCategoryMask cat_mask, td::uint32 priority, td::Promise<td::IPAddress> on_ready,
td::actor::Scheduler *scheduler) override;
void add_self_addr(td::IPAddress addr, AdnlCategoryMask cat_mask, td::uint32 priority) override; void add_self_addr(td::IPAddress addr, AdnlCategoryMask cat_mask, td::uint32 priority) override;
void add_proxy_addr(td::IPAddress addr, td::uint16 local_port, std::shared_ptr<AdnlProxy> proxy, void add_proxy_addr(td::IPAddress addr, td::uint16 local_port, std::shared_ptr<AdnlProxy> proxy,
AdnlCategoryMask cat_mask, td::uint32 priority) override; AdnlCategoryMask cat_mask, td::uint32 priority) override;
@ -141,12 +151,14 @@ class AdnlNetworkManagerImpl : public AdnlNetworkManager {
} }
} }
size_t add_tunnel_udp_port(std::string global_config, std::string tunnel_config, td::Promise<td::IPAddress> on_ready, td::actor::Scheduler *scheduler);
size_t add_listening_udp_port(td::uint16 port); size_t add_listening_udp_port(td::uint16 port);
void receive_udp_message(td::UdpMessage message, size_t idx); void receive_udp_message(td::UdpMessage message, size_t idx);
void proxy_register(OutDesc &desc); void proxy_register(OutDesc &desc);
private: private:
std::unique_ptr<Callback> callback_; std::unique_ptr<Callback> callback_;
std::unique_ptr<TunnelEventsHandler> tunnel_events_handler_;
std::map<td::uint32, std::vector<OutDesc>> out_desc_; std::map<td::uint32, std::vector<OutDesc>> out_desc_;
std::vector<InDesc> in_desc_; std::vector<InDesc> in_desc_;

View file

@ -34,11 +34,16 @@ class TestLoopbackNetworkManager : public ton::adnl::AdnlNetworkManager {
callback_ = std::move(callback); callback_ = std::move(callback);
} }
void install_tunnel_events_handler(std::unique_ptr<TunnelEventsHandler> handler) override {};
void add_self_addr(td::IPAddress addr, AdnlCategoryMask cat_mask, td::uint32 priority) override { void add_self_addr(td::IPAddress addr, AdnlCategoryMask cat_mask, td::uint32 priority) override {
} }
void add_proxy_addr(td::IPAddress addr, td::uint16 local_port, std::shared_ptr<AdnlProxy> proxy, void add_proxy_addr(td::IPAddress addr, td::uint16 local_port, std::shared_ptr<AdnlProxy> proxy,
AdnlCategoryMask cat_mask, td::uint32 priority) override { AdnlCategoryMask cat_mask, td::uint32 priority) override {
} }
void add_tunnel(std::string global_config, std::string tunnel_config, AdnlCategoryMask cat_mask, td::uint32 priority, td::Promise<td::IPAddress> on_ready,
td::actor::Scheduler* scheduler) override {
}
void send_udp_packet(ton::adnl::AdnlNodeIdShort src_id, ton::adnl::AdnlNodeIdShort dst_id, td::IPAddress dst_addr, void send_udp_packet(ton::adnl::AdnlNodeIdShort src_id, ton::adnl::AdnlNodeIdShort dst_id, td::IPAddress dst_addr,
td::uint32 priority, td::BufferSlice data) override { td::uint32 priority, td::BufferSlice data) override {
if (allowed_sources_.count(src_id) == 0 || allowed_destinations_.count(dst_id) == 0) { if (allowed_sources_.count(src_id) == 0 || allowed_destinations_.count(dst_id) == 0) {

View file

@ -12,7 +12,7 @@ set(TDNET_SOURCE
add_library(tdnet STATIC ${TDNET_SOURCE}) add_library(tdnet STATIC ${TDNET_SOURCE})
target_include_directories(tdnet PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>) target_include_directories(tdnet PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>)
target_link_libraries(tdnet PUBLIC tdactor) target_link_libraries(tdnet PUBLIC tdactor ${TUNNEL_LIB_IF_USED})
add_executable(tcp_ping_pong example/tcp_ping_pong.cpp) add_executable(tcp_ping_pong example/tcp_ping_pong.cpp)
target_link_libraries(tcp_ping_pong PRIVATE tdactor tdnet) target_link_libraries(tcp_ping_pong PRIVATE tdactor tdnet)

View file

@ -20,7 +20,12 @@
#include "td/net/FdListener.h" #include "td/net/FdListener.h"
#include "td/net/TcpListener.h" #include "td/net/TcpListener.h"
#ifdef TON_USE_GO_TUNNEL
#include "td/net/tunnel/libtunnel.h"
#endif
#include "td/utils/BufferedFd.h" #include "td/utils/BufferedFd.h"
#include "td/utils/filesystem.h"
#include <map> #include <map>
@ -29,6 +34,152 @@ namespace {
int VERBOSITY_NAME(udp_server) = VERBOSITY_NAME(DEBUG) + 10; int VERBOSITY_NAME(udp_server) = VERBOSITY_NAME(DEBUG) + 10;
} }
namespace detail { namespace detail {
#define TUNNEL_BUFFER_SZ_PACKETS 100
#define TUNNEL_MAX_PACKET_MTU 1500
#define TUNNEL_ALARM_EVERY 0.01
class UdpServerTunnelImpl : public UdpServer {
public:
void start_up() override;
void alarm() override;
void send(td::UdpMessage &&message) override;
static td::actor::ActorOwn<UdpServerTunnelImpl> create(td::Slice name, std::string global_config, std::string tunnel_config, std::unique_ptr<TunnelCallback> callback,
td::Promise<td::IPAddress> on_ready);
UdpServerTunnelImpl(std::string global_config, std::string tunnel_config, std::unique_ptr<TunnelCallback> callback, td::Promise<td::IPAddress> on_ready);
private:
td::Promise<td::IPAddress> on_ready_;
uint8_t out_buf_[(sizeof(sockaddr)+2+TUNNEL_MAX_PACKET_MTU)*TUNNEL_BUFFER_SZ_PACKETS];
size_t out_buf_offset_ = 0;
size_t out_buf_msg_num_ = 0;
size_t tunnel_index_;
double last_batch_at_ = Time::now();
std::string global_config_;
std::string tunnel_config_;
int32 port_;
std::unique_ptr<TunnelCallback> callback_;
static void on_recv_batch(void *next, uint8_t *data, size_t num);
static void on_reinit(void *next, sockaddr *addr);
};
void UdpServerTunnelImpl::send(td::UdpMessage &&message) {
const auto sock = message.address.get_sockaddr();
const auto sz = message.data.size();
// ip+port
memcpy(out_buf_ + out_buf_offset_, sock, sizeof(sockaddr));
out_buf_offset_ += sizeof(sockaddr);
// data len (2 bytes)
out_buf_[out_buf_offset_] = static_cast<uint8_t>(sz >> 8);
out_buf_[out_buf_offset_ + 1] = static_cast<uint8_t>(sz & 0xff);
if (sz > TUNNEL_MAX_PACKET_MTU) {
LOG(WARNING) << "udp message is too big, dropping";
return;
}
memcpy(out_buf_ + out_buf_offset_ + 2, message.data.data(), sz);
out_buf_offset_ += 2 + sz;
out_buf_msg_num_++;
if (out_buf_msg_num_ == TUNNEL_BUFFER_SZ_PACKETS) {
#ifdef TON_USE_GO_TUNNEL
WriteTunnel(tunnel_index_, out_buf_, out_buf_msg_num_);
LOG(DEBUG) << "Sending messages by fulfillment " << TUNNEL_BUFFER_SZ_PACKETS;
#endif
out_buf_offset_ = 0;
out_buf_msg_num_ = 0;
last_batch_at_ = Time::now();
}
}
void UdpServerTunnelImpl::alarm() {
if (out_buf_msg_num_ > 0 && Time::now()-last_batch_at_ >= TUNNEL_ALARM_EVERY) {
#ifdef TON_USE_GO_TUNNEL
WriteTunnel(tunnel_index_, out_buf_, out_buf_msg_num_);
LOG(DEBUG) << "Sending messages by alarm " << out_buf_msg_num_;
#endif
out_buf_offset_ = 0;
out_buf_msg_num_ = 0;
last_batch_at_ = Time::now();
}
alarm_timestamp() = td::Timestamp::in(TUNNEL_ALARM_EVERY);
}
void UdpServerTunnelImpl::start_up() {
#ifdef TON_USE_GO_TUNNEL
auto global_conf_data_R = td::read_file(global_config_);
if (global_conf_data_R.is_error()) {
LOG(FATAL) << global_conf_data_R.move_as_error_prefix("failed to read global config: ");
return;
}
auto global_cfg = global_conf_data_R.move_as_ok();
LOG(INFO) << "Initializing ADNL Tunnel...";
const auto res = PrepareTunnel(&on_recv_batch, &on_reinit, callback_.get(), callback_.get(), tunnel_config_.data(), tunnel_config_.size(), global_cfg.data(), global_cfg.size());
if (!res.index) {
// the reason will be displayed in logs from lib part
exit(1);
}
tunnel_index_ = res.index;
LOG(INFO) << "ADNL Tunnel Initialized";
td:IPAddress ip;
ip.init_ipv4_port(td::IPAddress::ipv4_to_str(res.ip), static_cast<td::uint16>(res.port)).ensure();
on_ready_.set_value(std::move(ip));
alarm_timestamp() = td::Timestamp::in(TUNNEL_ALARM_EVERY);
#else
LOG(FATAL) << "Tunnel was not enabled during node building, rebuild with cmake flag -DTON_USE_GO_TUNNEL=ON";
#endif
}
void UdpServerTunnelImpl::on_recv_batch(void *next, uint8_t *data, size_t num) {
for (size_t i = 0; i < num; i++) {
UdpMessage msg{};
msg.address.init_sockaddr(reinterpret_cast<sockaddr *>(data));
const uint16_t len = (static_cast<uint16_t>(data[16]) << 8) + static_cast<uint16_t>(data[17]);
msg.data = BufferSlice(reinterpret_cast<const char *>(data + 18), len);
data += 18+len;
// both init_sockaddr and BufferSlice doing memcpy so it is safe
static_cast<TunnelCallback*>(next)->on_udp_message(std::move(msg));
}
}
void UdpServerTunnelImpl::on_reinit(void *next, sockaddr *addr) {
td::IPAddress ip;
ip.init_sockaddr(addr);
static_cast<TunnelCallback*>(next)->on_in_addr_update(std::move(ip));
}
td::actor::ActorOwn<UdpServerTunnelImpl> UdpServerTunnelImpl::create(td::Slice name, std::string global_config, std::string tunnel_config,
std::unique_ptr<TunnelCallback> callback,
td::Promise<td::IPAddress> on_ready) {
return td::actor::create_actor<UdpServerTunnelImpl>(
actor::ActorOptions().with_name(name).with_poll(!td::Poll::is_edge_triggered()), global_config, tunnel_config, std::move(callback), std::move(on_ready));
}
UdpServerTunnelImpl::UdpServerTunnelImpl(std::string global_config, std::string tunnel_config, std::unique_ptr<TunnelCallback> callback, td::Promise<td::IPAddress> on_ready): on_ready_(std::move(on_ready))
, global_config_(global_config)
, tunnel_config_(tunnel_config)
, callback_(std::move(callback)) {
}
class UdpServerImpl : public UdpServer { class UdpServerImpl : public UdpServer {
public: public:
void send(td::UdpMessage &&message) override; void send(td::UdpMessage &&message) override;
@ -396,6 +547,13 @@ Result<actor::ActorOwn<UdpServer>> UdpServer::create(td::Slice name, int32 port,
fd.maximize_rcv_buffer().ensure(); fd.maximize_rcv_buffer().ensure();
return detail::UdpServerImpl::create(name, std::move(fd), std::move(callback)); return detail::UdpServerImpl::create(name, std::move(fd), std::move(callback));
} }
Result<actor::ActorOwn<UdpServer>> UdpServer::create_via_tunnel(td::Slice name, std::string global_config, std::string tunnel_config,
std::unique_ptr<TunnelCallback> callback,
td::Promise<td::IPAddress> on_ready) {
return detail::UdpServerTunnelImpl::create(name, global_config, tunnel_config, std::move(callback), std::move(on_ready));
}
Result<actor::ActorOwn<UdpServer>> UdpServer::create_via_tcp(td::Slice name, int32 port, Result<actor::ActorOwn<UdpServer>> UdpServer::create_via_tcp(td::Slice name, int32 port,
std::unique_ptr<Callback> callback) { std::unique_ptr<Callback> callback) {
return actor::create_actor<detail::UdpServerViaTcp>(name, port, std::move(callback)); return actor::create_actor<detail::UdpServerViaTcp>(name, port, std::move(callback));

View file

@ -28,15 +28,23 @@ namespace td {
class UdpServer : public td::actor::Actor { class UdpServer : public td::actor::Actor {
public: public:
class Callback { class Callback {
public: public:
virtual ~Callback() = default; virtual ~Callback() = default;
virtual void on_udp_message(td::UdpMessage udp_message) = 0; virtual void on_udp_message(td::UdpMessage udp_message) = 0;
}; };
class TunnelCallback : public Callback {
public:
virtual void on_in_addr_update(td::IPAddress ip) = 0;
};
virtual void send(td::UdpMessage &&message) = 0; virtual void send(td::UdpMessage &&message) = 0;
static Result<actor::ActorOwn<UdpServer>> create(td::Slice name, int32 port, std::unique_ptr<Callback> callback); static Result<actor::ActorOwn<UdpServer>> create(td::Slice name, int32 port, std::unique_ptr<Callback> callback);
static Result<actor::ActorOwn<UdpServer>> create_via_tcp(td::Slice name, int32 port, static Result<actor::ActorOwn<UdpServer>> create_via_tcp(td::Slice name, int32 port,
std::unique_ptr<Callback> callback); std::unique_ptr<Callback> callback);
static Result<actor::ActorOwn<UdpServer>> create_via_tunnel(td::Slice name, std::string global_config, std::string tunnel_config,
std::unique_ptr<TunnelCallback> callback,
td::Promise<td::IPAddress> on_ready);
}; };
} // namespace td } // namespace td

View file

@ -0,0 +1,111 @@
/* Code generated by cmd/cgo; DO NOT EDIT. */
/* package command-line-arguments */
#line 1 "cgo-builtin-export-prolog"
#include <stddef.h>
#ifndef GO_CGO_EXPORT_PROLOGUE_H
#define GO_CGO_EXPORT_PROLOGUE_H
#ifndef GO_CGO_GOSTRING_TYPEDEF
typedef struct { const char *p; ptrdiff_t n; } _GoString_;
#endif
#endif
/* Start of preamble from import "C" comments. */
#line 3 "lib.go"
#include <stdint.h>
#include <sys/socket.h>
typedef struct {
size_t index;
int ip;
int port;
} Tunnel;
// next - is pointer to class instance or callback to call method from node code
typedef void (*RecvCallback)(void* next, uint8_t* data, size_t num);
typedef void (*ReinitCallback)(void* next, struct sockaddr* data);
// we need it because we cannot call C func by pointer directly from go
static inline void on_recv_batch_ready(RecvCallback cb, void* next, void* data, size_t num) {
cb(next, (uint8_t*)data, num);
}
static inline void on_reinit(ReinitCallback cb, void* next, void* data) {
cb(next, (struct sockaddr*)data);
}
#line 1 "cgo-generated-wrapper"
/* End of preamble from import "C" comments. */
/* Start of boilerplate cgo prologue. */
#line 1 "cgo-gcc-export-header-prolog"
#ifndef GO_CGO_PROLOGUE_H
#define GO_CGO_PROLOGUE_H
typedef signed char GoInt8;
typedef unsigned char GoUint8;
typedef short GoInt16;
typedef unsigned short GoUint16;
typedef int GoInt32;
typedef unsigned int GoUint32;
typedef long long GoInt64;
typedef unsigned long long GoUint64;
typedef GoInt64 GoInt;
typedef GoUint64 GoUint;
typedef size_t GoUintptr;
typedef float GoFloat32;
typedef double GoFloat64;
#ifdef _MSC_VER
#include <complex.h>
typedef _Fcomplex GoComplex64;
typedef _Dcomplex GoComplex128;
#else
typedef float _Complex GoComplex64;
typedef double _Complex GoComplex128;
#endif
/*
static assertion to make sure the file is being used on architecture
at least with matching size of GoInt.
*/
typedef char _check_for_64_bit_pointer_matching_GoInt[sizeof(void*)==64/8 ? 1:-1];
#ifndef GO_CGO_GOSTRING_TYPEDEF
typedef _GoString_ GoString;
#endif
typedef void *GoMap;
typedef void *GoChan;
typedef struct { void *t; void *v; } GoInterface;
typedef struct { void *data; GoInt len; GoInt cap; } GoSlice;
#endif
/* End of boilerplate cgo prologue. */
#ifdef __cplusplus
extern "C" {
#endif
//goland:noinspection ALL
extern Tunnel PrepareTunnel(RecvCallback onRecv, ReinitCallback onReinit, void* nextOnRecv, void* nextOnReinit, char* configJson, int configJsonLen, char* networkConfigJson, int networkConfigJsonLen);
extern int WriteTunnel(size_t tunIdx, uint8_t* data, size_t num);
#ifdef __cplusplus
}
#endif

View file

@ -1276,6 +1276,9 @@ void ValidatorEngine::set_local_config(std::string str) {
void ValidatorEngine::set_global_config(std::string str) { void ValidatorEngine::set_global_config(std::string str) {
global_config_ = str; global_config_ = str;
} }
void ValidatorEngine::set_tunnel_config(std::string str) {
tunnel_config_ = str;
}
void ValidatorEngine::set_db_root(std::string db_root) { void ValidatorEngine::set_db_root(std::string db_root) {
db_root_ = db_root; db_root_ = db_root;
} }
@ -1849,6 +1852,66 @@ void ValidatorEngine::start_adnl() {
adnl_ = ton::adnl::Adnl::create(db_root_, keyring_.get()); adnl_ = ton::adnl::Adnl::create(db_root_, keyring_.get());
td::actor::send_closure(adnl_, &ton::adnl::Adnl::register_network_manager, adnl_network_manager_.get()); td::actor::send_closure(adnl_, &ton::adnl::Adnl::register_network_manager, adnl_network_manager_.get());
if (!tunnel_config_.empty()) {
auto on_tunnel_ready = td::PromiseCreator::lambda([SelfId = actor_id(this), this](td::Result<td::IPAddress> R) {
R.ensure();
auto addr = R.move_as_ok();
LOG(INFO) << "Tunnel ready, addr: " << addr;
add_addr(Config::Addr{}, Config::AddrCats{
.in_addr = addr,
.is_tunnel = true,
.cats = {0, 1, 2, 3},
});
for (auto &adnl : config_.adnl_ids) {
add_adnl(adnl.first, adnl.second);
}
td::actor::send_closure(adnl_, &ton::adnl::Adnl::add_static_nodes_from_config, std::move(adnl_static_nodes_));
td::actor::send_closure(SelfId, &ValidatorEngine::started_adnl);
});
class Handler : public ton::adnl::AdnlNetworkManager::TunnelEventsHandler {
public:
Handler(ValidatorEngine *scheduler)
: validator_engine_(scheduler) {
}
private:
ValidatorEngine *validator_engine_;
void on_in_addr_update(td::IPAddress ip) override {
validator_engine_->scheduler_->run_in_context_external([&] {
LOG(INFO) << "[EVENT] Tunnel reinitialized, addr: " << ip;
validator_engine_->addr_lists_.clear();
validator_engine_->add_addr(Config::Addr{}, Config::AddrCats{
.in_addr = ip,
.is_tunnel = true,
.cats = {0, 1, 2, 3},
});
for (auto &adnl : validator_engine_->config_.adnl_ids) {
validator_engine_->add_adnl(adnl.first, adnl.second);
}
});
}
};
td::actor::send_closure(adnl_network_manager_, &ton::adnl::AdnlNetworkManager::install_tunnel_events_handler, std::make_unique<Handler>(this));
ton::adnl::AdnlCategoryMask cat_mask;
for (int i = 0; i <= 3; i++) {
cat_mask[i] = true;
}
td::actor::send_closure(adnl_network_manager_, &ton::adnl::AdnlNetworkManager::add_tunnel, global_config_, tunnel_config_,
std::move(cat_mask), 0, std::move(on_tunnel_ready), scheduler_);
return;
}
for (auto &addr : config_.addrs) { for (auto &addr : config_.addrs) {
add_addr(addr.first, addr.second); add_addr(addr.first, addr.second);
} }
@ -1868,13 +1931,16 @@ void ValidatorEngine::add_addr(const Config::Addr &addr, const Config::AddrCats
for (auto cat : cats.priority_cats) { for (auto cat : cats.priority_cats) {
cat_mask[cat] = true; cat_mask[cat] = true;
} }
if (!cats.proxy) {
td::actor::send_closure(adnl_network_manager_, &ton::adnl::AdnlNetworkManager::add_self_addr, addr.addr, if (!cats.is_tunnel) {
std::move(cat_mask), cats.cats.size() ? 0 : 1); if (!cats.proxy) {
} else { td::actor::send_closure(adnl_network_manager_, &ton::adnl::AdnlNetworkManager::add_self_addr, addr.addr,
td::actor::send_closure(adnl_network_manager_, &ton::adnl::AdnlNetworkManager::add_proxy_addr, cats.in_addr, std::move(cat_mask), cats.cats.size() ? 0 : 1);
static_cast<td::uint16>(addr.addr.get_port()), cats.proxy, std::move(cat_mask), } else {
cats.cats.size() ? 0 : 1); td::actor::send_closure(adnl_network_manager_, &ton::adnl::AdnlNetworkManager::add_proxy_addr, cats.in_addr,
static_cast<td::uint16>(addr.addr.get_port()), cats.proxy, std::move(cat_mask),
cats.cats.size() ? 0 : 1);
}
} }
td::uint32 ts = static_cast<td::uint32>(td::Clocks::system()); td::uint32 ts = static_cast<td::uint32>(td::Clocks::system());
@ -4311,6 +4377,10 @@ int main(int argc, char *argv[]) {
acts.push_back( acts.push_back(
[&x, fname = fname.str()]() { td::actor::send_closure(x, &ValidatorEngine::set_local_config, fname); }); [&x, fname = fname.str()]() { td::actor::send_closure(x, &ValidatorEngine::set_local_config, fname); });
}); });
p.add_option('\0', "tunnel-config", "file to read tunnel config", [&](td::Slice fname) {
acts.push_back(
[&x, fname = fname.str()]() { td::actor::send_closure(x, &ValidatorEngine::set_tunnel_config, fname); });
});
p.add_checked_option('I', "ip", "ip:port of instance", [&](td::Slice arg) { p.add_checked_option('I', "ip", "ip:port of instance", [&](td::Slice arg) {
td::IPAddress addr; td::IPAddress addr;
TRY_STATUS(addr.init_host_port(arg.str())); TRY_STATUS(addr.init_host_port(arg.str()));
@ -4617,7 +4687,7 @@ int main(int argc, char *argv[]) {
scheduler.run_in_context([&] { scheduler.run_in_context([&] {
vm::init_vm().ensure(); vm::init_vm().ensure();
x = td::actor::create_actor<ValidatorEngine>("validator-engine"); x = td::actor::create_actor<ValidatorEngine>("validator-engine", &scheduler);
for (auto &act : acts) { for (auto &act : acts) {
act(); act();
} }

View file

@ -57,6 +57,7 @@ struct Config {
}; };
struct AddrCats { struct AddrCats {
td::IPAddress in_addr; td::IPAddress in_addr;
bool is_tunnel;
std::shared_ptr<ton::adnl::AdnlProxy> proxy; std::shared_ptr<ton::adnl::AdnlProxy> proxy;
std::set<AdnlCategory> cats; std::set<AdnlCategory> cats;
std::set<AdnlCategory> priority_cats; std::set<AdnlCategory> priority_cats;
@ -140,6 +141,8 @@ struct Config {
class ValidatorEngine : public td::actor::Actor { class ValidatorEngine : public td::actor::Actor {
private: private:
td::actor::Scheduler* scheduler_;
td::actor::ActorOwn<ton::keyring::Keyring> keyring_; td::actor::ActorOwn<ton::keyring::Keyring> keyring_;
td::actor::ActorOwn<ton::adnl::AdnlNetworkManager> adnl_network_manager_; td::actor::ActorOwn<ton::adnl::AdnlNetworkManager> adnl_network_manager_;
td::actor::ActorOwn<ton::adnl::Adnl> adnl_; td::actor::ActorOwn<ton::adnl::Adnl> adnl_;
@ -156,6 +159,7 @@ class ValidatorEngine : public td::actor::Actor {
std::string local_config_ = ""; std::string local_config_ = "";
std::string global_config_ = "ton-global.config"; std::string global_config_ = "ton-global.config";
std::string tunnel_config_ = "";
std::string config_file_; std::string config_file_;
std::string temp_config_file() const { std::string temp_config_file() const {
return config_file_ + ".tmp"; return config_file_ + ".tmp";
@ -249,6 +253,7 @@ class ValidatorEngine : public td::actor::Actor {
} }
void set_local_config(std::string str); void set_local_config(std::string str);
void set_global_config(std::string str); void set_global_config(std::string str);
void set_tunnel_config(std::string str);
void set_fift_dir(std::string str) { void set_fift_dir(std::string str) {
fift_dir_ = str; fift_dir_ = str;
} }
@ -343,7 +348,7 @@ class ValidatorEngine : public td::actor::Actor {
} }
void start_up() override; void start_up() override;
ValidatorEngine() { explicit ValidatorEngine(td::actor::Scheduler* scheduler): scheduler_(scheduler) {
} }
// load config // load config