From 29f3c50237b8ab590f6492a29394b56a2f17a778 Mon Sep 17 00:00:00 2001 From: Jop Zitman Date: Tue, 10 Dec 2024 14:40:14 +0800 Subject: [PATCH] Initial version of slipstream DNS tunnel --- .gitignore | 4 + .gitmodules | 9 + CMakeLists.txt | 58 ++ README.md | 1 + certs/cert.pem | 19 + certs/key.pem | 28 + extern/SPCDNS | 1 + extern/lua-resty-base-encoding | 1 + extern/picoquic | 1 + include/lua-resty-base-encoding-base32.h | 9 + include/slipstream.h | 32 + include/slipstream_inline_dots.h | 9 + .../slipstream_server_circular_query_buffer.h | 20 + src/slipstream.c | 63 ++ src/slipstream_client.c | 679 ++++++++++++++++++ src/slipstream_inline_dots.c | 49 ++ src/slipstream_server.c | 561 +++++++++++++++ src/slipstream_server_circular_query_buffer.c | 42 ++ 18 files changed, 1586 insertions(+) create mode 100644 .gitignore create mode 100644 .gitmodules create mode 100644 CMakeLists.txt create mode 100644 README.md create mode 100644 certs/cert.pem create mode 100644 certs/key.pem create mode 160000 extern/SPCDNS create mode 160000 extern/lua-resty-base-encoding create mode 160000 extern/picoquic create mode 100644 include/lua-resty-base-encoding-base32.h create mode 100644 include/slipstream.h create mode 100644 include/slipstream_inline_dots.h create mode 100644 include/slipstream_server_circular_query_buffer.h create mode 100644 src/slipstream.c create mode 100644 src/slipstream_client.c create mode 100644 src/slipstream_inline_dots.c create mode 100644 src/slipstream_server.c create mode 100644 src/slipstream_server_circular_query_buffer.c diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a3b8768 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.idea/ +cmake-build-debug/ +cmake-build-release/ +qlog/ diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..f305805 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,9 @@ +[submodule "extern/picoquic"] + path = extern/picoquic + url = git@github.com:EndPositive/picoquic.git +[submodule "extern/SPCDNS"] + path = extern/SPCDNS + url = git@github.com:spc476/SPCDNS.git +[submodule "extern/lua-resty-base-encoding"] + path = extern/lua-resty-base-encoding + url = git@github.com:spacewander/lua-resty-base-encoding.git diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..0bb1d0f --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,58 @@ +cmake_minimum_required(VERSION 3.30) +project(slipstream C) + +set(CMAKE_C_STANDARD 23) + +find_package(Git QUIET) +if(GIT_FOUND AND EXISTS "${PROJECT_SOURCE_DIR}/.git") + option(GIT_SUBMODULE "Check submodules during build" ON) + if(GIT_SUBMODULE) + execute_process(COMMAND ${GIT_EXECUTABLE} submodule update --init --recursive + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} + RESULT_VARIABLE GIT_SUBMOD_RESULT) + if(NOT GIT_SUBMOD_RESULT EQUAL "0") + message(FATAL_ERROR "git submodule update --init --recursive failed") + endif() + endif() +endif() + +set(PICOQUIC_FETCH_PTLS Y) +add_subdirectory(extern/picoquic) + +if(CMAKE_BUILD_TYPE STREQUAL "Debug") + list(APPEND PICOQUIC_ADDITIONAL_C_FLAGS -g -Og) +else() + list(APPEND PICOQUIC_ADDITIONAL_C_FLAGS -O3) +endif () + +add_executable(slipstream + src/slipstream.c + src/slipstream_client.c + src/slipstream_server.c + src/slipstream_server_circular_query_buffer.c + src/slipstream_inline_dots.c + include/slipstream.h + include/slipstream_server_circular_query_buffer.h + include/slipstream_inline_dots.h + + extern/lua-resty-base-encoding/b32_data.h + extern/lua-resty-base-encoding/base32.c + extern/lua-resty-base-encoding/modp_stdint.h + include/lua-resty-base-encoding-base32.h + + extern/SPCDNS/src/dns.h + extern/SPCDNS/src/codec.c + extern/SPCDNS/src/mappings.c + extern/SPCDNS/src/mappings.h + extern/SPCDNS/src/netsimple.c + extern/SPCDNS/src/netsimple.h + extern/SPCDNS/src/output.c + extern/SPCDNS/src/output.h +) +target_link_libraries(slipstream PRIVATE m) +target_link_libraries(slipstream PRIVATE picoquic-log picoquic-core) + +target_include_directories(slipstream PRIVATE include) +target_include_directories(slipstream PRIVATE extern) + +set_picoquic_compile_settings(slipstream) diff --git a/README.md b/README.md new file mode 100644 index 0000000..f929cca --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# Slipstream diff --git a/certs/cert.pem b/certs/cert.pem new file mode 100644 index 0000000..f94c873 --- /dev/null +++ b/certs/cert.pem @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDKzCCAhOgAwIBAgIBADANBgkqhkiG9w0BAQsFADAaMRgwFgYDVQQDEw9waWNv +dGxzIHRlc3QgY2EwHhcNMTgwMjIzMDIzODEyWhcNMjgwMjIxMDIzODEyWjAbMRkw +FwYDVQQDExB0ZXN0LmV4YW1wbGUuY29tMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEA5soWzSG7iyawQlHM1yaX2dUAATUkhpbg2WPFOEem7E3zYzc6A/Z+ +bViFlfEgL37cbDUb4pnOAHrrsjGgkyBYh5i9iCTVfCk+H6SOHZJORO1Tq8X9C7Wc +NcshpSdm2Pa8hmv9hsHbLSeoPNeg8NkTPwMVaMZ2GpdmiyAmhzSZ2H9mzNI7ntPW +/XCchVf+ax2yt9haZ+mQE2NPYwHDjqCtdGkP5ZXXnYhJSBzSEhxfGckIiKDyOxiN +kLFLvUdT4ERSFBjauP2cSI0XoOUsiBxJNwHH310AU8jZbveSTcXGYgEuu2MIuDo7 +Vhkq5+TCqXsIFNbjy0taOoPRvUbPsbqFlQIDAQABo3sweTAJBgNVHRMEAjAAMCwG +CWCGSAGG+EIBDQQfFh1PcGVuU1NMIEdlbmVyYXRlZCBDZXJ0aWZpY2F0ZTAdBgNV +HQ4EFgQUE1vXDjBT8j2etP4brfHQ9DeKnpgwHwYDVR0jBBgwFoAUv3nKl7JgeCCW +qkZXnN+nsiP1JWMwDQYJKoZIhvcNAQELBQADggEBAKwARsxOCiGPXU1xhvs+pq9I +63mLi4rfnssOGzGnnAfuEaxggpozf3fOSgfyTaDbACdRPTZEStjQ5HMCcHvY7CH0 +8EYA+lkmFbuXXL8uHby1JBTzbTGf8pkRUsuF/Ie0SLChoDgt8oF3mY5pyU4HUaAw +Zp6HBpIRMdmbwGcwm25bl9MQYTrTX3dBfp3XPzfXbVwjJ7bsiTwAGq+dKwzwOQeM +2ZMZt4BQBoevsNopPrqG0S6kGUmJOIax0t13bKwDj21+Hp/O90HTFVCtAaDxRC56 +k0O8Q62ZxzjGJ7Zw6K3azXlH/BYE+CajxTUF+FKRRkkWL1GrFVUsYd9KLDAVry0= +-----END CERTIFICATE----- diff --git a/certs/key.pem b/certs/key.pem new file mode 100644 index 0000000..b46db68 --- /dev/null +++ b/certs/key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDmyhbNIbuLJrBC +UczXJpfZ1QABNSSGluDZY8U4R6bsTfNjNzoD9n5tWIWV8SAvftxsNRvimc4Aeuuy +MaCTIFiHmL2IJNV8KT4fpI4dkk5E7VOrxf0LtZw1yyGlJ2bY9ryGa/2GwdstJ6g8 +16Dw2RM/AxVoxnYal2aLICaHNJnYf2bM0jue09b9cJyFV/5rHbK32Fpn6ZATY09j +AcOOoK10aQ/lldediElIHNISHF8ZyQiIoPI7GI2QsUu9R1PgRFIUGNq4/ZxIjReg +5SyIHEk3AcffXQBTyNlu95JNxcZiAS67Ywi4OjtWGSrn5MKpewgU1uPLS1o6g9G9 +Rs+xuoWVAgMBAAECggEBAJZxS/XCNH/b43AH5LCnbrtH1u3yl3HIrp/nIquyQYSu +t6aIXKAysW1UFBiPCz0KxGMhJ6FKQ3gaqMQLB7KAllUl4v75i9SZCe8UlLOAKNdT +oYRK1s4oP8DtPmxrR+bMyE4T3TtX6SkBPfETWs1Fo/8iYnVfUaO559VvSs4+Ir91 +6EVBZ+xSQ6+H4nc9TvrByd7HHMmDGFXF19eo1rtQOxbNMiZSNwch+eFdzUr+hIqT +oeMiZCL+gTiWCglQIQNpRurSdc2szQSZNO85DlNU/kt2z2nxQMRrnl5mWUZdqbTM +qOEMq2RD/lR+fGwC/Zp1jly4f5c3QTF8oPzAhnmog20CgYEA9FtAZlmQ/8c5gDhh +n0dgBRDAVL0hmtDxjsL9RsvTVeWLXecpqUfFd0bpm/pi/QUIkoDc33jmUSh/2vCu +LCG5lvKsDVNwqiT+zsdHwGFyybYPNEPk6ILccr5fL7p8Fox5t9+oE+nct+P+VI82 +3TD/wN2soFPD9wDLjSDSfXMS7TsCgYEA8clXgh9AGJuUKoOHvsV9BFQ1HiFVtuXT +Qa+KKCLDnreeTuzF2s4nV51PJM5QEe0zfOABP2xjPYJRe+zIQ+rMwMA1Yxv17ZH/ +81w/poziTtZBGfCRmSnRY4gf3lTajUwcJhzmZ4AdV/ZQyX7rjFTZl9jeuMyY8uNY +y3SEpiw6a28CgYAKUhBWQlIte2yiTb9RyuHzVNHKwnI457pMHVA1PUafyiIoxSqt +S6q7bvNO8zRbG2tRRMAPcDvKEbvUs3Wnx4TfK0C5D10i0o0wjpopNfRzMI1T18pD +R8On1QKQMYAsM6KwcXHX5Xi9C5QiXiojDX6/1p0D6IXOWOo/+7LoOYQDIQKBgDJY +KB5x/1igXHOVu5gfau6R0hWZ/0z8Acb1lCDTTEQqG453gqMSteJqYOZbBxUUfNoN +knTwTqGqFulk3jY2F7gyzWr7kXOMKO01UhON1jlwJ1INY2Ou72h4GZqjtHYjWOEe +t2LprDJ6mUu7X7RynnQdthJol5hLeluywUQQhYGFAoGAbfGV4PhKD2sbY5ZDPkiu +8sWD8fLthbK6yFV7PrWRl4sOe32Iza6bvqT/EU0hbRr54vZXFFMJtev3PzqGNz/o +J5IkcpgVCXjOxIBmRognT4duuEM0EBiH1vaZF1f44x1ntnkncaW3wQ0VGp7xGURI +ykArGNH50gPRuPACNWvYoKE= +-----END PRIVATE KEY----- diff --git a/extern/SPCDNS b/extern/SPCDNS new file mode 160000 index 0000000..d291537 --- /dev/null +++ b/extern/SPCDNS @@ -0,0 +1 @@ +Subproject commit d291537e1f66b4a7ede069b949a8db0f94c59246 diff --git a/extern/lua-resty-base-encoding b/extern/lua-resty-base-encoding new file mode 160000 index 0000000..87136c6 --- /dev/null +++ b/extern/lua-resty-base-encoding @@ -0,0 +1 @@ +Subproject commit 87136c61b6366a82b7c8881a4f44578e71f9b0af diff --git a/extern/picoquic b/extern/picoquic new file mode 160000 index 0000000..ad35d94 --- /dev/null +++ b/extern/picoquic @@ -0,0 +1 @@ +Subproject commit ad35d94082d6c631fab790e4e8f37787b2727cdc diff --git a/include/lua-resty-base-encoding-base32.h b/include/lua-resty-base-encoding-base32.h new file mode 100644 index 0000000..665760b --- /dev/null +++ b/include/lua-resty-base-encoding-base32.h @@ -0,0 +1,9 @@ +#ifndef LUA_RESTY_BASE_ENCODING_BASE32_H +#define LUA_RESTY_BASE_ENCODING_BASE32_H + +#include + +size_t b32_encode(char *dest, const char *src, size_t len, uint32_t no_padding, uint32_t hex); +size_t b32_decode(char *dest, const char *src, size_t len, uint32_t hex); + +#endif // LUA_RESTY_BASE_ENCODING_BASE32_H \ No newline at end of file diff --git a/include/slipstream.h b/include/slipstream.h new file mode 100644 index 0000000..63e0874 --- /dev/null +++ b/include/slipstream.h @@ -0,0 +1,32 @@ +#ifndef SLIPSTREAM_H +#define SLIPSTREAM_H +/* Header file for the picoquic sample project. + * It contains the definitions common to client and server */ + +#ifdef __cplusplus +extern "C" { +#endif + +#define SLIPSTREAM_ALPN "picoquic_sample" +#define SLIPSTREAM_SNI "test.example.com" + +#define SLIPSTREAM_NO_ERROR 0 +#define SLIPSTREAM_INTERNAL_ERROR 0x101 +#define SLIPSTREAM_FILE_CANCEL_ERROR 0x105 + +#define SLIPSTREAM_CLIENT_TICKET_STORE "sample_ticket_store.bin"; +#define SLIPSTREAM_CLIENT_TOKEN_STORE "sample_token_store.bin"; +#define SLIPSTREAM_QLOG_DIR "./qlog/"; + + + +int picoquic_slipstream_client(int listen_port, char const* server_name, int server_port); + +int picoquic_slipstream_server(int server_port, const char* pem_cert, const char* pem_key, char const* upstream_name, + int upstream_port); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/include/slipstream_inline_dots.h b/include/slipstream_inline_dots.h new file mode 100644 index 0000000..affc35a --- /dev/null +++ b/include/slipstream_inline_dots.h @@ -0,0 +1,9 @@ +#ifndef SLIPSTREAM_INLINE_DOTS_H +#define SLIPSTREAM_INLINE_DOTS_H +#include + +size_t slipstream_inline_dotify(char * __restrict__ buf, size_t buflen, size_t len); + +size_t slipstream_inline_undotify(char * __restrict__ buf, size_t len); + +#endif // SLIPSTREAM_INLINE_DOTS_H \ No newline at end of file diff --git a/include/slipstream_server_circular_query_buffer.h b/include/slipstream_server_circular_query_buffer.h new file mode 100644 index 0000000..9e08041 --- /dev/null +++ b/include/slipstream_server_circular_query_buffer.h @@ -0,0 +1,20 @@ +#ifndef SLIPSTREAM_SERVER_CIRCULAR_QUEUE_BUFFER_H +#define SLIPSTREAM_SERVER_CIRCULAR_QUEUE_BUFFER_H + +#define SIZE 4096 + +#include "SPCDNS/src/dns.h" + +typedef struct { + dns_decoded_t queries[SIZE][DNS_DECODEBUF_4K]; + size_t tail; + size_t head; +} circular_query_buffer_t; + +dns_decoded_t* circular_query_buffer_get_write_slot(circular_query_buffer_t* buf); + +dns_decoded_t* circular_query_buffer_get_read_slot(circular_query_buffer_t* buf); + +size_t circular_query_buffer_get_size(circular_query_buffer_t* buf); + +#endif // SLIPSTREAM_SERVER_CIRCULAR_QUEUE_BUFFER_H diff --git a/src/slipstream.c b/src/slipstream.c new file mode 100644 index 0000000..eb4c0a7 --- /dev/null +++ b/src/slipstream.c @@ -0,0 +1,63 @@ +#include +#include +#include +#include "slipstream.h" + +static void usage(char const * sample_name) +{ + fprintf(stderr, "Usage:\n"); + fprintf(stderr, " %s client listen_port slipstream_server_name slipstream_server_port\n", sample_name); + fprintf(stderr, " %s server listen_port cert key target_server_name target_server_port\n", sample_name); + exit(1); +} + +int get_port(char const* sample_name, char const* port_arg) +{ + int server_port = atoi(port_arg); + if (server_port <= 0) { + fprintf(stderr, "Invalid port: %s\n", port_arg); + usage(sample_name); + } + + return server_port; +} + +int main(int argc, char** argv) +{ + int exit_code = 0; +#ifdef _WINDOWS + WSADATA wsaData = { 0 }; + (void)WSA_START(MAKEWORD(2, 2), &wsaData); +#endif + + if (argc < 2) { + usage(argv[0]); + } + else if (strcmp(argv[1], "client") == 0) { + if (argc != 5) { + usage(argv[0]); + } + else { + int local_port = atoi(argv[2]); + char const* remote_ip = argv[3]; + int remote_port = atoi(argv[4]); + exit_code = picoquic_slipstream_client(local_port, remote_ip, remote_port); + } + } + else if (strcmp(argv[1], "server") == 0) { + if (argc != 7) { + usage(argv[0]); + } + else { + int server_port = get_port(argv[0], argv[2]); + int remote_port = get_port(argv[0], argv[6]); + exit_code = picoquic_slipstream_server(server_port, argv[3], argv[4], argv[5], remote_port); + } + } + else + { + usage(argv[0]); + } + + exit(exit_code); +} \ No newline at end of file diff --git a/src/slipstream_client.c b/src/slipstream_client.c new file mode 100644 index 0000000..84f30f9 --- /dev/null +++ b/src/slipstream_client.c @@ -0,0 +1,679 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "picoquic_config.h" +#include "picoquic_packet_loop.h" +#include "slipstream.h" +#include "slipstream_inline_dots.h" + +#include "lua-resty-base-encoding-base32.h" + +#include "SPCDNS/src/dns.h" +#include "SPCDNS/src/mappings.h" + +ssize_t client_encode_segment(dns_packet_t* packet, size_t* packet_len, const unsigned char* src_buf, size_t src_buf_len) { + edns0_opt_t opt; + dns_answer_t edns; + + char name[255]; + const size_t len = b32_encode(&name[0], (const char*) src_buf, src_buf_len, true, false); + const size_t encoded_len = slipstream_inline_dotify(name, 255, len); + name[encoded_len] = '.'; + + const char* tld = "test.com."; + const size_t tld_len = strlen(tld); + memcpy(&name[encoded_len + 1], tld, tld_len); + name[encoded_len + 1 + tld_len] = '\0'; + + dns_question_t domain; + domain.name = name; + domain.type = RR_TXT; + domain.class = CLASS_IN; + + dns_query_t query = {0}; + query.id = rand() % UINT16_MAX; + query.query = true; + query.opcode = OP_QUERY; + query.rd = true; + query.rcode = RCODE_OKAY; + query.qdcount = 1; + query.questions = &domain; + query.arcount = 0; // TODO: set to 1 for EDNS0 + query.additional = NULL; + + // TODO: add EDNS0 + + const dns_rcode_t rc = dns_encode(packet, packet_len, &query); + if (rc != RCODE_OKAY) { + fprintf(stderr, "dns_encode() = (%d) %s\n", rc, dns_rcode_text(rc)); + return -1; + } + + return 0; +} + +ssize_t client_encode(unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len) { + // optimize path for single segment + if (src_buf_len <= *segment_len) { + size_t packet_len = MAX_DNS_QUERY_SIZE; + unsigned char* packet = malloc(packet_len); + const ssize_t ret = client_encode_segment((dns_packet_t*) packet, &packet_len, src_buf, src_buf_len); + if (ret < 0) { + free(packet); + return -1; + } + + *dest_buf = packet; + *segment_len = packet_len; + return packet_len; + } + + size_t num_segments = src_buf_len / *segment_len; + unsigned char* packets = malloc(MAX_DNS_QUERY_SIZE * num_segments); + unsigned char* current_packet = packets; + + const unsigned char* segment = src_buf; + size_t first_packet_len = 0; + for (size_t i = 0; i < num_segments; i++) { + size_t packet_len = MAX_DNS_QUERY_SIZE; + const ssize_t ret = client_encode_segment((dns_packet_t*) current_packet, &packet_len, segment, *segment_len); + if (ret < 0) { + free(packets); + return -1; + } + + if (first_packet_len == 0) { + first_packet_len = packet_len; + } else { + if (packet_len > first_packet_len) { + DBG_PRINTF("current encoded segment length %d > %d than first segment\n", packet_len, first_packet_len); + free(packets); + return -1; + } + } + + current_packet += packet_len; + segment += *segment_len; + } + + *dest_buf = packets; + *segment_len = first_packet_len; + + + return current_packet - packets; +} + +ssize_t client_decode(const unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* from, struct sockaddr_storage* dest) { + *dest_buf = NULL; + + size_t bufsize = DNS_DECODEBUF_4K * sizeof(dns_decoded_t); + dns_decoded_t* decoded = malloc(bufsize); + const dns_rcode_t rc = dns_decode(decoded, &bufsize, (const dns_packet_t*) src_buf, src_buf_len); + if (rc != RCODE_OKAY) { + fprintf(stderr, "dns_decode() = (%d) %s\n", rc, dns_rcode_text(rc)); + return -1; + } + + const dns_query_t *query = (dns_query_t *)decoded; + + if (query->query == 1) { + fprintf(stderr, "dns record is not a response\n"); + return -1; + } + + if (query->ancount != 1) { + fprintf(stderr, "dns record should contain exactly one answer\n"); + return -1; + } + + dns_txt_t *answer_txt = (dns_txt_t*) &query->answers[0]; + if (answer_txt->type != RR_TXT) { + fprintf(stderr, "answer type is not TXT\n"); + return -1; + } + + *dest_buf = malloc(answer_txt->len); + memcpy((void*)*dest_buf, answer_txt->text, answer_txt->len); + + return answer_txt->len; +} + +typedef struct st_slipstream_client_stream_ctx_t { + struct st_slipstream_client_stream_ctx_t* next_stream; + int fd; + uint64_t stream_id; + volatile sig_atomic_t set_active; +} slipstream_client_stream_ctx_t; + +typedef struct st_slipstream_client_ctx_t { + picoquic_cnx_t* cnx; + slipstream_client_stream_ctx_t* first_stream; + slipstream_client_stream_ctx_t* last_stream; + picoquic_network_thread_ctx_t* thread_ctx; +} slipstream_client_ctx_t; + +slipstream_client_stream_ctx_t* slipstream_client_create_stream_ctx(picoquic_cnx_t* cnx, + slipstream_client_ctx_t* client_ctx, int sock_fd) { + slipstream_client_stream_ctx_t* stream_ctx = malloc(sizeof(slipstream_client_stream_ctx_t)); + + if (stream_ctx == NULL) { + fprintf(stdout, "Memory Error, cannot create stream for sock %d\n", sock_fd); + return NULL; + } + + memset(stream_ctx, 0, sizeof(slipstream_client_stream_ctx_t)); + if (client_ctx->first_stream == NULL) { + client_ctx->first_stream = stream_ctx; + client_ctx->last_stream = stream_ctx; + } + else { + client_ctx->last_stream->next_stream = stream_ctx; + client_ctx->last_stream = stream_ctx; + } + stream_ctx->fd = sock_fd; + stream_ctx->stream_id = picoquic_get_next_local_stream_id(client_ctx->cnx, 0); + + return stream_ctx; +} + +static void slipstream_client_free_context(slipstream_client_ctx_t* client_ctx) { + slipstream_client_stream_ctx_t* stream_ctx; + + while ((stream_ctx = client_ctx->first_stream) != NULL) { + client_ctx->first_stream = stream_ctx->next_stream; + if (stream_ctx->fd != 0) { + stream_ctx->fd = close(stream_ctx->fd); + } + free(stream_ctx); + } + client_ctx->last_stream = NULL; +} + +void slipstream_client_mark_active_pass(slipstream_client_ctx_t* client_ctx) { + slipstream_client_stream_ctx_t* stream_ctx = client_ctx->first_stream; + + while (stream_ctx != NULL) { + if (stream_ctx->set_active) { + stream_ctx->set_active = 0; + printf("[%lu:%d] activate: stream\n", stream_ctx->stream_id, stream_ctx->fd); + picoquic_mark_active_stream(client_ctx->cnx, stream_ctx->stream_id, 1, stream_ctx); + } + stream_ctx = stream_ctx->next_stream; + } +} + +int slipstream_client_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_loop_cb_enum cb_mode, + void* callback_ctx, void* callback_arg) { + slipstream_client_ctx_t* client_ctx = callback_ctx; + + switch (cb_mode) { + case picoquic_packet_loop_wake_up: + slipstream_client_mark_active_pass(client_ctx); + break; + case picoquic_packet_loop_after_send: + if (client_ctx->cnx->cnx_state == picoquic_state_disconnected) { + printf("Terminate packet loop\n"); + return PICOQUIC_NO_ERROR_TERMINATE_PACKET_LOOP; + } + default: + break; + } + + return 0; +} + +typedef struct st_slipstream_client_poller_args { + int fd; + picoquic_cnx_t* cnx; + slipstream_client_ctx_t* client_ctx; + slipstream_client_stream_ctx_t* stream_ctx; +} slipstream_client_poller_args; + +void* slipstream_client_poller(void* arg) { + slipstream_client_poller_args* args = arg; + + while (1) { + struct pollfd fds; + fds.fd = args->fd; + fds.events = POLLIN; + fds.revents = 0; + + /* add timeout handlilng */ + int ret = poll(&fds, 1, 1000); + if (ret < 0) { + perror("poll() failed"); + pthread_exit(NULL); + } + if (ret == 0) { + continue; + } + + args->stream_ctx->set_active = 1; + + ret = picoquic_wake_up_network_thread(args->client_ctx->thread_ctx); + if (ret != 0) { + fprintf(stderr, "poll: could not wake up network thread, ret = %d\n", ret); + } + + pthread_exit(NULL); + } +} + +typedef struct st_slipstream_client_accepter_args { + int fd; + picoquic_cnx_t* cnx; + slipstream_client_ctx_t* client_ctx; + slipstream_client_stream_ctx_t* stream_ctx; + picoquic_network_thread_ctx_t* thread_ctx; +} slipstream_client_accepter_args; + +void* slipstream_client_accepter(void* arg) { + slipstream_client_accepter_args* args = arg; + + while (1) { + // Accept incoming client connection + struct sockaddr_in client_addr; + socklen_t client_len = sizeof(client_addr); + int client_sock = accept(args->fd, (struct sockaddr*)&client_addr, &client_len); + if (client_sock < 0) { + if (errno == EINTR) { + continue; + } + perror("accept() failed"); + pthread_exit(NULL); + } + + slipstream_client_stream_ctx_t* stream_ctx = slipstream_client_create_stream_ctx(args->cnx, args->client_ctx, client_sock); + if (stream_ctx == NULL) { + fprintf(stderr, "Could not initiate stream for %d", client_sock); + pthread_exit(NULL); + } + + stream_ctx->set_active = 1; + + int ret = picoquic_wake_up_network_thread(args->thread_ctx); + if (ret != 0) { + fprintf(stderr, "accept: could not wake up network thread, ret = %d\n", ret); + pthread_exit(NULL); + } + + printf("[?:%d] accept: connection\n[?:%d] wakeup\n", client_sock, client_sock); + } +} + +int slipstream_client_callback(picoquic_cnx_t* cnx, + uint64_t stream_id, uint8_t* bytes, size_t length, + picoquic_call_back_event_t fin_or_event, void* callback_ctx, void* v_stream_ctx) { + int ret = 0; + slipstream_client_ctx_t* client_ctx = (slipstream_client_ctx_t*)callback_ctx; + slipstream_client_stream_ctx_t* stream_ctx = (slipstream_client_stream_ctx_t*)v_stream_ctx; + + if (client_ctx == NULL) { + /* This should never happen, because the callback context for the client is initialized + * when creating the client connection. */ + return -1; + } + + switch (fin_or_event) { + case picoquic_callback_stream_data: + case picoquic_callback_stream_fin: + /* Data arrival on stream #x, maybe with fin mark */ + if (stream_ctx == NULL) { + /* This is unexpected, as all contexts were declared when initializing the + * connection. */ + return 0; + } + + // printf("[%lu:%d] quic_recv->send %lu bytes\n", stream_id, stream_ctx->fd, length); + if (length > 0) { + ssize_t bytes_sent = send(stream_ctx->fd, bytes, length, MSG_NOSIGNAL); + if (bytes_sent < 0) { + if (errno == EPIPE) { + /* Connection closed */ + printf("[%lu:%d] send: closed stream\n", stream_id, stream_ctx->fd); + + (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR); + return 0; + } + if (errno == EAGAIN) { + /* TODO: this is bad because we don't have a way to backpressure */ + } + + printf("[%lu:%d] send: error: %s (%d)\n", stream_id, stream_ctx->fd, strerror(errno), errno); + (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR); + return 0; + } + } + if (fin_or_event == picoquic_callback_stream_fin) { + printf("[%lu:%d] fin\n", stream_id, stream_ctx->fd); + /* Close the local_sock fd */ + close(stream_ctx->fd); + stream_ctx->fd = -1; + picoquic_unlink_app_stream_ctx(cnx, stream_id); + } + break; + case picoquic_callback_stop_sending: /* Should not happen, treated as reset */ + /* Mark stream as abandoned, close the file, etc. */ + picoquic_reset_stream(cnx, stream_id, 0); + /* Fall through */ + case picoquic_callback_stream_reset: /* Server reset stream #x */ + if (stream_ctx == NULL) { + /* This is unexpected, as all contexts were declared when initializing the + * connection. */ + } + else { + printf("[%lu:%d] stream reset\n", stream_id, stream_ctx->fd); + + /* Close the local_sock fd */ + stream_ctx->fd = close(stream_ctx->fd); + } + break; + case picoquic_callback_stateless_reset: + case picoquic_callback_close: /* Received connection close */ + case picoquic_callback_application_close: /* Received application close */ + printf("Connection closed (%d).\n", fin_or_event); + /* Delete the server application context */ + slipstream_client_free_context(client_ctx); + /* Remove the application callback */ + picoquic_set_callback(cnx, NULL, NULL); + picoquic_delete_cnx(cnx); + picoquic_close(cnx, 0); + break; + case picoquic_callback_prepare_to_send: + /* Active sending API */ + if (stream_ctx == NULL) { + /* This should never happen */ + } + else { + int length_available; + ret = ioctl(stream_ctx->fd, FIONREAD, &length_available); + // printf("[%lu:%d] recv->quic_send (available %d)\n", stream_id, stream_ctx->fd, length_available); + if (ret < 0) { + printf("[%lu:%d] ioctl error: %s (%d)\n", stream_id, stream_ctx->fd, strerror(errno), errno); + /* TODO: why would it return an error? */ + (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR); + break; + } + ret = 0; + + int length_to_read = MIN(length, length_available); + if (length_to_read == 0) { + char a; + ssize_t bytes_read = recv(stream_ctx->fd, &a, 1, MSG_PEEK | MSG_DONTWAIT); + // printf("[%lu:%d] recv->quic_send empty read %d bytes\n", stream_id, stream_ctx->fd, bytes_read); + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // printf("[%lu:%d] recv->quic_send empty errno set: %s\n", stream_id, stream_ctx->fd, strerror(errno)); + /* No bytes available, wait for next event */ + (void)picoquic_provide_stream_data_buffer(bytes, 0, 0, 0); + printf("[%lu:%d] recv->quic_send: empty, disactivate\n\n", stream_id, stream_ctx->fd); + + slipstream_client_poller_args* args = malloc(sizeof(slipstream_client_poller_args)); + args->fd = stream_ctx->fd; + args->cnx = cnx; + args->client_ctx = client_ctx; + args->stream_ctx = stream_ctx; + + pthread_t thread; + if (pthread_create(&thread, NULL, slipstream_client_poller, args) != 0) { + perror("pthread_create() failed for thread1"); + free(args); + } + pthread_setname_np(thread, "slipstream_server_poller"); + pthread_detach(thread); + } + if (bytes_read == 0) { + printf("[%lu:%d] recv: closed stream\n", stream_id, stream_ctx->fd); + (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR); + return 0; + } + if (bytes_read > 0) { + /* send it in next loop iteration */ + (void)picoquic_provide_stream_data_buffer(bytes, 0, 0, 1); + break; + } + return 0; + } + + uint8_t* buffer = picoquic_provide_stream_data_buffer(bytes, length_to_read, 0, 1); + if (buffer == NULL) { + /* Should never happen according to callback spec. */ + break; + } + // printf("[%lu:%d] recv->quic_send recv %d bytes into quic\n", stream_id, stream_ctx->fd, length_to_read); + ssize_t bytes_read = recv(stream_ctx->fd, buffer, length_to_read, MSG_DONTWAIT); + // printf("[%lu:%d] recv->quic_send recv done %d bytes into quic\n", stream_id, stream_ctx->fd, bytes_read); + if (bytes_read == 0) { + printf("Closed connection on sock %d on recv", stream_ctx->fd); + (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR); + return 0; + } + if (bytes_read < 0) { + fprintf(stderr, "recv: %s (%d)\n", strerror(errno), errno); + /* There should be bytes available, so a return value of 0 is an error */ + (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR); + return 0; + } + } + break; + case picoquic_callback_almost_ready: + fprintf(stdout, "Connection completed, almost ready.\n"); + break; + case picoquic_callback_ready: + fprintf(stdout, "Connection confirmed.\n"); + break; + default: + /* unexpected -- just ignore. */ + break; + } + + return ret; +} + +static int slipstream_connect(char const* server_name, int server_port, + picoquic_quic_t* quic, picoquic_cnx_t** cnx, + slipstream_client_ctx_t* client_ctx) { + int ret = 0; + char const* sni = SLIPSTREAM_SNI; + uint64_t current_time = picoquic_current_time(); + struct sockaddr_storage server_address; + + *cnx = NULL; + + /* Get the server's address */ + int is_name = 0; + ret = picoquic_get_server_address(server_name, server_port, &server_address, &is_name); + if (ret != 0) { + fprintf(stderr, "Cannot get the IP address for <%s> port <%d>", server_name, server_port); + return -1; + } + sni = server_name; + + /* Initialize the callback context and create the connection context. + * We use minimal options on the client side, keeping the transport + * parameter values set by default for picoquic. This could be fixed later. + */ + printf("Starting connection to %s, port %d\n", server_name, server_port); + + /* Create a client connection */ + *cnx = picoquic_create_cnx(quic, picoquic_null_connection_id, picoquic_null_connection_id, + (struct sockaddr*)&server_address, current_time, 0, sni, SLIPSTREAM_ALPN, 1); + if (*cnx == NULL) { + fprintf(stderr, "Could not create connection context\n"); + return -1; + } + + /* Document connection in client's context */ + client_ctx->cnx = *cnx; + /* Set the client callback context */ + picoquic_set_callback(*cnx, slipstream_client_callback, client_ctx); + /* Client connection parameters could be set here, before starting the connection. */ + ret = picoquic_start_client_cnx(*cnx); + if (ret < 0) { + fprintf(stderr, "Could not activate connection\n"); + return -1; + } + + /* Printing out the initial CID, which is used to identify log files */ + picoquic_connection_id_t icid = picoquic_get_initial_cnxid(*cnx); + printf("Initial connection ID: "); + for (uint8_t i = 0; i < icid.id_len; i++) { + printf("%02x", icid.id[i]); + } + printf("\n"); + + return ret; +} + +void client_sighandler(int signum) { + printf("Signal %d received\n", signum); +} + +int picoquic_slipstream_client(int listen_port, char const* server_name, int server_port) { + /* Start: start the QUIC process */ + int ret = 0; + picoquic_quic_t* quic = NULL; + uint64_t current_time = 0; + picoquic_cnx_t* cnx = NULL; + slipstream_client_ctx_t client_ctx = {0}; + char const* ticket_store_filename = SLIPSTREAM_CLIENT_TICKET_STORE; + char const* token_store_filename = SLIPSTREAM_CLIENT_TOKEN_STORE; + + int mtu = 146; + // int mtu = 129; + + /* Create config */ + picoquic_quic_config_t config; + picoquic_config_init(&config); + config.nb_connections = 8; + // config.log_file = "-"; +#ifndef DISABLE_DEBUG_PRINTF + config.qlog_dir = SLIPSTREAM_QLOG_DIR; +#endif + config.server_port = server_port; + config.mtu_max = mtu; + config.initial_send_mtu_ipv4 = mtu; + config.initial_send_mtu_ipv6 = mtu; + config.cc_algo_id = "bbr1"; + config.multipath_option = 0; + config.use_long_log = 1; + config.do_preemptive_repeat = 1; + config.disable_port_blocking = 1; + config.enable_sslkeylog = 1; + config.alpn = SLIPSTREAM_ALPN; + config.token_file_name = token_store_filename; + + /* Create the QUIC context for the server */ + current_time = picoquic_current_time(); + /* Create QUIC context */ + quic = picoquic_create_and_configure(&config, slipstream_client_callback, &client_ctx, current_time, NULL); + if (quic == NULL) { + fprintf(stderr, "Could not create server context\n"); + return -1; + } + + picoquic_set_cookie_mode(quic, 2); + picoquic_set_qlog(quic, config.qlog_dir); + picoquic_set_key_log_file_from_env(quic);; + debug_printf_push_stream(stderr); + + ret = slipstream_connect(server_name, server_port, quic, &cnx, &client_ctx); + if (ret != 0) { + fprintf(stderr, "Could not connect to server\n"); + return -1; + } + + // Create listening socket + int listen_sock = socket(AF_INET, SOCK_STREAM, 0); + if (listen_sock < 0) { + perror("socket() failed"); + exit(EXIT_FAILURE); + } + + int optval = 1; + setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); + + struct sockaddr_in listen_addr = {0}; + listen_addr.sin_family = AF_INET; + listen_addr.sin_addr.s_addr = INADDR_ANY; + listen_addr.sin_port = htons(listen_port); + + if (bind(listen_sock, (struct sockaddr*)&listen_addr, sizeof(listen_addr)) < 0) { + perror("bind() failed"); + close(listen_sock); + exit(EXIT_FAILURE); + } + + if (listen(listen_sock, 5) < 0) { + perror("listen() failed"); + close(listen_sock); + exit(EXIT_FAILURE); + } + + printf("Listening on port %d...\n", listen_port); + + picoquic_packet_loop_param_t param = {0}; + param.local_af = AF_INET; + + // For loopback testing, we need to disable hardware GSO since packets on loopback never reach a hardware NIC + // $ ethtool -K lo tx-udp-segmentation off + // And ensure that gso is on + // $ ethtool -k lo | grep generic-segmentation-offload + // generic-segmentation-offload: on + param.do_not_use_gso = 0; + + param.is_client = 1; + param.decode = client_decode; + param.encode = client_encode; + + picoquic_network_thread_ctx_t thread_ctx = {0}; + thread_ctx.quic = quic; + thread_ctx.param = ¶m; + thread_ctx.loop_callback = slipstream_client_sockloop_callback; + thread_ctx.loop_callback_ctx = &client_ctx; + + /* Open the wake up pipe or event */ + picoquic_open_network_wake_up(&thread_ctx, &ret); + + client_ctx.thread_ctx = &thread_ctx; + + slipstream_client_accepter_args* args = malloc(sizeof(slipstream_client_accepter_args)); + args->fd = listen_sock; + args->cnx = cnx; + args->client_ctx = &client_ctx; + args->thread_ctx = &thread_ctx; + + pthread_t thread; + if (pthread_create(&thread, NULL, slipstream_client_accepter, args) != 0) { + perror("pthread_create() failed for thread"); + free(args); + } + + signal(SIGTERM, client_sighandler); + picoquic_packet_loop_v3(&thread_ctx); + ret = thread_ctx.return_code; + + /* Save tickets and tokens, and free the QUIC context */ + if (picoquic_save_session_tickets(quic, ticket_store_filename) != 0) { + fprintf(stderr, "Could not store the saved session tickets.\n"); + } + if (picoquic_save_retry_tokens(quic, token_store_filename) != 0) { + fprintf(stderr, "Could not save tokens to <%s>.\n", token_store_filename); + } + picoquic_free(quic); + + /* Free the Client context */ + slipstream_client_free_context(&client_ctx); + + return ret; +} diff --git a/src/slipstream_inline_dots.c b/src/slipstream_inline_dots.c new file mode 100644 index 0000000..83111e0 --- /dev/null +++ b/src/slipstream_inline_dots.c @@ -0,0 +1,49 @@ +#include "slipstream_inline_dots.h" + +size_t slipstream_inline_dotify(char * __restrict__ buf, size_t buflen, size_t len) { + size_t dots = len / 57; // Number of dots to insert + size_t new_len = len + dots; + + // Check if result would exceed buffer + if (new_len > buflen) { + return -1; // Error condition + } + + // Start from the end and work backwards + char *src = buf + len - 1; // Points to last char of original string + char *dst = buf + new_len - 1; // Points to where last char will end up + + // Avoid modulo operation in tight loop + size_t next_dot = len - (len % 57); + size_t current_pos = len; + + // Move characters right-to-left, inserting dots + while (current_pos > 0) { + if (current_pos == next_dot) { + *dst-- = '.'; + next_dot -= 57; + current_pos--; + continue; + } + *dst-- = *src--; + current_pos--; + } + + return new_len; +} + +size_t slipstream_inline_undotify(char * __restrict__ buf, size_t len) { + char *reader = buf; + char *writer = buf; + + // For ~255 byte buffer with dots every ~50 chars + // Simple loop is most efficient since dots are sparse + while (len--) { + char c = *reader++; + if (c != '.') { + *writer++ = c; + } + } + + return writer - buf; +} \ No newline at end of file diff --git a/src/slipstream_server.c b/src/slipstream_server.c new file mode 100644 index 0000000..e16ac5f --- /dev/null +++ b/src/slipstream_server.c @@ -0,0 +1,561 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "lua-resty-base-encoding-base32.h" +#include "picoquic_config.h" +#include "slipstream.h" +#include "slipstream_inline_dots.h" +#include "slipstream_server_circular_query_buffer.h" +#include "SPCDNS/src/dns.h" +#include "SPCDNS/src/mappings.h" + +circular_query_buffer_t server_cqb = {0}; + +ssize_t server_encode(unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_size) { + const dns_query_t *query = (dns_query_t *) circular_query_buffer_get_read_slot(&server_cqb); + if (query == NULL) { + fprintf(stderr, "no available DNS request to respond to\n"); + return -1; + } + + if (query->questions == NULL) { + fprintf(stderr, "no questions in DNS request\n"); + return -1; + } + + const dns_question_t *question = &query->questions[0]; // assuming server_decode ensures there is exactly one question + dns_txt_t answer_txt; + answer_txt.name = question->name; + answer_txt.type = question->type; + answer_txt.class = question->class; + answer_txt.ttl = 60; + answer_txt.text = (char *)src_buf; + answer_txt.len = src_buf_len; + + dns_query_t response = {0}; + response.id = query->id; + response.query = false; + response.opcode = OP_QUERY; + response.rd = true; + response.rcode = RCODE_OKAY; + response.qdcount = 1; + response.questions = query->questions; + response.ancount = 1; + response.answers = (dns_answer_t *)&answer_txt; + response.arcount = 0; // TODO: set to 1 for EDNS0 + response.additional = NULL; + + dns_packet_t* packet = malloc(MAX_UDP_PACKET_SIZE); + size_t packet_len = MAX_UDP_PACKET_SIZE; + dns_rcode_t rc = dns_encode(packet, &packet_len, &response); + if (rc != RCODE_OKAY) { + free(packet); + fprintf(stderr, "dns_encode() = (%d) %s\n", rc, dns_rcode_text(rc)); + return EXIT_FAILURE; + } + *dest_buf = (unsigned char*)packet; + + return packet_len; +} + +ssize_t server_decode(const unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *from, struct sockaddr_storage *dest) { + *dest_buf = NULL; + + dns_decoded_t* packet = circular_query_buffer_get_write_slot(&server_cqb); + size_t packet_len = DNS_DECODEBUF_4K * sizeof(dns_decoded_t); + const dns_rcode_t rc = dns_decode(packet, &packet_len, (const dns_packet_t*) src_buf, src_buf_len); + if (rc != RCODE_OKAY) { + fprintf(stderr, "dns_decode() = (%d) %s\n", rc, dns_rcode_text(rc)); + return -1; + } + + const dns_query_t *query = (dns_query_t*) packet; + + if (!query->query) { + fprintf(stderr, "dns record is not a query\n"); + return -1; + } + + if (query->qdcount != 1) { + fprintf(stderr, "dns record should contain exactly one query\n"); + return -1; + } + + const dns_question_t *question = &query->questions[0]; + if (question->type != RR_TXT) { + fprintf(stderr, "query type is not TXT\n"); + return -1; + } + + const char* tld = "test.com."; + const size_t data_len = strlen(question->name) - strlen(tld) - 1; + + // copy the subdomain from name to a new buffer + char data_buf[data_len]; + memcpy(data_buf, question->name, data_len); + const size_t encoded_len = slipstream_inline_undotify(data_buf, data_len); + + char* decoded_buf = malloc(encoded_len); + const size_t decoded_len = b32_decode(decoded_buf, data_buf, encoded_len, false); + if (decoded_len == (size_t) -1) { + free(decoded_buf); + fprintf(stderr, "error decoding base32: %lu\n", decoded_len); + } + + *dest_buf = decoded_buf; + + return decoded_len; +} + +typedef struct st_slipstream_server_stream_ctx_t { + struct st_slipstream_server_stream_ctx_t* next_stream; + struct st_slipstream_server_stream_ctx_t* previous_stream; + int fd; + uint64_t stream_id; + volatile sig_atomic_t set_active; +} slipstream_server_stream_ctx_t; + +typedef struct st_slipstream_server_ctx_t { + picoquic_cnx_t* cnx; + slipstream_server_stream_ctx_t* first_stream; + slipstream_server_stream_ctx_t* last_stream; + picoquic_network_thread_ctx_t* thread_ctx; + struct sockaddr_storage upstream_addr; + struct st_slipstream_server_ctx_t* prev_ctx; + struct st_slipstream_server_ctx_t* next_ctx; +} slipstream_server_ctx_t; + +slipstream_server_stream_ctx_t* slipstream_server_create_stream_ctx(slipstream_server_ctx_t* server_ctx, + uint64_t stream_id) { + slipstream_server_stream_ctx_t* stream_ctx = malloc(sizeof(slipstream_server_stream_ctx_t)); + + int sock_fd = socket(AF_INET, SOCK_STREAM, 0); + if (sock_fd < 0) { + perror("socket() failed"); + return NULL; + } + + if (connect(sock_fd, (struct sockaddr*)&server_ctx->upstream_addr, sizeof(server_ctx->upstream_addr)) < 0) { + perror("connect() failed"); + return NULL; + } + + if (stream_ctx != NULL) { + memset(stream_ctx, 0, sizeof(slipstream_server_stream_ctx_t)); + + if (server_ctx->last_stream == NULL) { + server_ctx->last_stream = stream_ctx; + server_ctx->first_stream = stream_ctx; + } + else { + stream_ctx->previous_stream = server_ctx->last_stream; + server_ctx->last_stream->next_stream = stream_ctx; + server_ctx->last_stream = stream_ctx; + } + stream_ctx->stream_id = stream_id; + + stream_ctx->fd = sock_fd; + } + + return stream_ctx; +} + +void slipstream_server_delete_stream_context(slipstream_server_ctx_t* server_ctx, + slipstream_server_stream_ctx_t* stream_ctx) { + /* Remove the context from the server's list */ + if (stream_ctx->previous_stream == NULL) { + server_ctx->first_stream = stream_ctx->next_stream; + } + else { + stream_ctx->previous_stream->next_stream = stream_ctx->next_stream; + } + + if (stream_ctx->next_stream == NULL) { + server_ctx->last_stream = stream_ctx->previous_stream; + } + else { + stream_ctx->next_stream->previous_stream = stream_ctx->previous_stream; + } + + /* release the memory */ + free(stream_ctx); +} + +void slipstream_server_delete_context(slipstream_server_ctx_t* server_ctx) { + /* Delete any remaining stream context */ + while (server_ctx->first_stream != NULL) { + slipstream_server_delete_stream_context(server_ctx, server_ctx->first_stream); + } + + /* release the memory */ + free(server_ctx); +} + +void slipstream_server_mark_active_pass(slipstream_server_ctx_t* server_ctx) { + slipstream_server_stream_ctx_t* stream_ctx = server_ctx->first_stream; + + while (stream_ctx != NULL) { + if (stream_ctx->set_active) { + stream_ctx->set_active = 0; + printf("[%lu:%d] activate: stream\n", stream_ctx->stream_id, stream_ctx->fd); + picoquic_mark_active_stream(server_ctx->cnx, stream_ctx->stream_id, 1, stream_ctx); + } + stream_ctx = stream_ctx->next_stream; + } +} + +int slipstream_server_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_loop_cb_enum cb_mode, + void* callback_ctx, void* callback_arg) { + slipstream_server_ctx_t* server_ctx = callback_ctx; + + switch (cb_mode) { + case picoquic_packet_loop_wake_up: + if (callback_ctx == NULL) { + return 0; + } + + while (server_ctx->next_ctx != NULL) { + /* skip default ctx */ + server_ctx = server_ctx->next_ctx; + slipstream_server_mark_active_pass(server_ctx); + } + + break; + default: + break; + } + + return 0; +} + +typedef struct st_slipstream_server_poller_args { + int fd; + picoquic_cnx_t* cnx; + slipstream_server_ctx_t* client_ctx; + slipstream_server_stream_ctx_t* stream_ctx; +} slipstream_server_poller_args; + +void* slipstream_server_poller(void* arg) { + slipstream_server_poller_args* args = arg; + + while (1) { + struct pollfd fds; + fds.fd = args->fd; + fds.events = POLLIN; + fds.revents = 0; + + /* add timeout handlilng */ + int ret = poll(&fds, 1, 1000); + if (ret < 0) { + perror("poll() failed"); + pthread_exit(NULL); + } + if (ret == 0) { + continue; + } + + args->stream_ctx->set_active = 1; + + ret = picoquic_wake_up_network_thread(args->client_ctx->thread_ctx); + if (ret != 0) { + fprintf(stderr, "poll: could not wake up network thread, ret = %d\n", ret); + } + printf("[%lu:%d] wakeup\n", args->stream_ctx->stream_id, args->fd); + + pthread_exit(NULL); + } +} + +int slipstream_server_callback(picoquic_cnx_t* cnx, + uint64_t stream_id, uint8_t* bytes, size_t length, + picoquic_call_back_event_t fin_or_event, void* callback_ctx, void* v_stream_ctx) { + int ret = 0; + slipstream_server_ctx_t* server_ctx = (slipstream_server_ctx_t*)callback_ctx; + slipstream_server_stream_ctx_t* stream_ctx = (slipstream_server_stream_ctx_t*)v_stream_ctx; + + /* If this is the first reference to the connection, the application context is set + * to the default value defined for the server. This default value contains the pointer + * to the file directory in which all files are defined. + */ + if (callback_ctx == NULL || callback_ctx == picoquic_get_default_callback_context(picoquic_get_quic_ctx(cnx))) { + server_ctx = (slipstream_server_ctx_t*)malloc(sizeof(slipstream_server_ctx_t)); + if (server_ctx == NULL) { + /* cannot handle the connection */ + picoquic_close(cnx, PICOQUIC_ERROR_MEMORY); + return -1; + } + slipstream_server_ctx_t* d_ctx = picoquic_get_default_callback_context(picoquic_get_quic_ctx(cnx)); + if (d_ctx != NULL) { + memcpy(server_ctx, d_ctx, sizeof(slipstream_server_ctx_t)); + } + else { + /* This really is an error case: the default connection context should never be NULL */ + memset(server_ctx, 0, sizeof(slipstream_server_ctx_t)); + } + server_ctx->cnx = cnx; + picoquic_set_callback(cnx, slipstream_server_callback, server_ctx); + + if (d_ctx->next_ctx != NULL) { + d_ctx->next_ctx->prev_ctx = server_ctx; + } + server_ctx->next_ctx = d_ctx->next_ctx; + server_ctx->prev_ctx = d_ctx; + d_ctx->next_ctx = server_ctx; + + printf("Created ctx\n"); + } + + switch (fin_or_event) { + case picoquic_callback_stream_data: + case picoquic_callback_stream_fin: + /* Data arrival on stream #x, maybe with fin mark */ + if (stream_ctx == NULL) { + /* Create and initialize stream context */ + stream_ctx = slipstream_server_create_stream_ctx(server_ctx, stream_id); + if (stream_ctx == NULL || picoquic_set_app_stream_ctx(cnx, stream_id, stream_ctx) != 0) { + /* Internal error */ + (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR); + return 0; + } + printf("[%lu:%d] connected\n", stream_id, stream_ctx->fd); + picoquic_mark_active_stream(cnx, stream_id, 1, stream_ctx); + printf("[%lu:%d] marked active\n", stream_id, stream_ctx->fd); + } + + // printf("[%lu:%d] quic_recv->send %lu bytes\n", stream_id, stream_ctx->fd, length); + if (length > 0) { + ssize_t bytes_sent = send(stream_ctx->fd, bytes, length, MSG_NOSIGNAL); + if (bytes_sent < 0) { + if (errno == EPIPE) { + /* Connection closed */ + printf("[%lu:%d] send: closed stream\n", stream_id, stream_ctx->fd); + + (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR); + return 0; + } + if (errno == EAGAIN) { + /* TODO: this is bad because we don't have a way to backpressure */ + } + + printf("[%lu:%d] send: error: %s (%d)\n", stream_id, stream_ctx->fd, strerror(errno), errno); + (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR); + return 0; + } + } + if (fin_or_event == picoquic_callback_stream_fin) { + printf("[%lu:%d] fin\n", stream_id, stream_ctx->fd); + /* Close the local_sock fd */ + close(stream_ctx->fd); + stream_ctx->fd = -1; + picoquic_unlink_app_stream_ctx(cnx, stream_id); + } + break; + case picoquic_callback_stop_sending: /* Should not happen, treated as reset */ + /* Mark stream as abandoned, close the file, etc. */ + picoquic_reset_stream(cnx, stream_id, 0); + /* Fall through */ + case picoquic_callback_stream_reset: /* Server reset stream #x */ + if (stream_ctx == NULL) { + /* This is unexpected, as all contexts were declared when initializing the + * connection. */ + } + else { + printf("[%lu:%d] stream reset\n", stream_id, stream_ctx->fd); + + /* Close the local_sock fd */ + stream_ctx->fd = close(stream_ctx->fd); + } + break; + case picoquic_callback_stateless_reset: + case picoquic_callback_close: /* Received connection close */ + case picoquic_callback_application_close: /* Received application close */ + printf("Connection closed.\n"); + /* Remove the application callback */ + picoquic_set_callback(cnx, NULL, NULL); + break; + case picoquic_callback_prepare_to_send: + /* Active sending API */ + if (stream_ctx == NULL) { + /* This should never happen */ + } + else { + int length_available; + ret = ioctl(stream_ctx->fd, FIONREAD, &length_available); + // printf("[%lu:%d] recv->quic_send (available %d)\n", stream_id, stream_ctx->fd, length_available); + if (ret < 0) { + printf("[%lu:%d] ioctl error: %s (%d)\n", stream_id, stream_ctx->fd, strerror(errno), errno); + /* TODO: why would it return an error? */ + (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR); + break; + } + ret = 0; + + int length_to_read = MIN(length, length_available); + if (length_to_read == 0) { + char a; + ssize_t bytes_read = recv(stream_ctx->fd, &a, 1, MSG_PEEK | MSG_DONTWAIT); + // printf("[%lu:%d] recv->quic_send empty read %d bytes\n", stream_id, stream_ctx->fd, bytes_read); + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // printf("[%lu:%d] recv->quic_send empty errno set: %s\n", stream_id, stream_ctx->fd, strerror(errno)); + /* No bytes available, wait for next event */ + (void)picoquic_provide_stream_data_buffer(bytes, 0, 0, 0); + printf("[%lu:%d] recv->quic_send: empty, disactivate\n\n", stream_id, stream_ctx->fd); + + slipstream_server_poller_args* args = malloc(sizeof(slipstream_server_poller_args)); + args->fd = stream_ctx->fd; + args->cnx = cnx; + args->client_ctx = server_ctx; + args->stream_ctx = stream_ctx; + + pthread_t thread; + if (pthread_create(&thread, NULL, slipstream_server_poller, args) != 0) { + perror("pthread_create() failed for thread1"); + free(args); + } + pthread_setname_np(thread, "slipstream_server_poller"); + pthread_detach(thread); + } + if (bytes_read == 0) { + printf("[%lu:%d] recv: closed stream\n", stream_id, stream_ctx->fd); + (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR); + return 0; + } + if (bytes_read > 0) { + /* send it in next loop iteration */ + (void)picoquic_provide_stream_data_buffer(bytes, 0, 0, 1); + break; + } + return 0; + } + + uint8_t* buffer = picoquic_provide_stream_data_buffer(bytes, length_to_read, 0, 1); + if (buffer == NULL) { + /* Should never happen according to callback spec. */ + break; + } + // printf("[%lu:%d] recv->quic_send recv %d bytes into quic\n", stream_id, stream_ctx->fd, length_to_read); + ssize_t bytes_read = recv(stream_ctx->fd, buffer, length_to_read, MSG_DONTWAIT); + // printf("[%lu:%d] recv->quic_send recv done %d bytes into quic\n", stream_id, stream_ctx->fd, bytes_read); + if (bytes_read == 0) { + printf("Closed connection on sock %d on recv", stream_ctx->fd); + (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR); + return 0; + } + if (bytes_read < 0) { + fprintf(stderr, "recv: %s (%d)\n", strerror(errno), errno); + /* There should be bytes available, so a return value of 0 is an error */ + (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR); + return 0; + } + } + break; + case picoquic_callback_almost_ready: + fprintf(stdout, "Connection completed, almost ready.\n"); + break; + case picoquic_callback_ready: + fprintf(stdout, "Connection confirmed.\n"); + break; + default: + /* unexpected -- just ignore. */ + break; + } + + return ret; +} + +void server_sighandler(int signum) { + printf("Signal %d received\n", signum); +} + +int picoquic_slipstream_server(int server_port, const char* server_cert, const char* server_key, + char const* upstream_name, int upstream_port) { + /* Start: start the QUIC process with cert and key files */ + int ret = 0; + picoquic_quic_t* quic = NULL; + uint64_t current_time = 0; + slipstream_server_ctx_t default_context = {0}; + printf("Starting Picoquic Sample server on port %d\n", server_port); + + int is_name = 0; + picoquic_get_server_address(upstream_name, upstream_port, &default_context.upstream_addr, &is_name); + + // int mtu = 250; + int mtu = 900; + + /* Create config */ + picoquic_quic_config_t config; + picoquic_config_init(&config); + config.nb_connections = 8; + config.server_cert_file = server_cert; + config.server_key_file = server_key; + // config.log_file = "-"; +#ifndef DISABLE_DEBUG_PRINTF + config.qlog_dir = SLIPSTREAM_QLOG_DIR; +#endif + config.server_port = server_port; + config.mtu_max = mtu; + config.initial_send_mtu_ipv4 = mtu; + config.initial_send_mtu_ipv6 = mtu; + config.cc_algo_id = "bbr1"; + config.multipath_option = 0; + config.use_long_log = 1; + config.do_preemptive_repeat = 1; + config.disable_port_blocking = 1; + config.enable_sslkeylog = 1; + config.alpn = SLIPSTREAM_ALPN; + + + /* Create the QUIC context for the server */ + current_time = picoquic_current_time(); + /* Create QUIC context */ + quic = picoquic_create_and_configure(&config, slipstream_server_callback, &default_context, current_time, NULL); + if (quic == NULL) { + fprintf(stderr, "Could not create server context\n"); + return -1; + } + + picoquic_set_cookie_mode(quic, 2); + picoquic_set_qlog(quic, config.qlog_dir); + picoquic_set_key_log_file_from_env(quic);; + debug_printf_push_stream(stderr); + + picoquic_packet_loop_param_t param = {0}; + param.local_af = AF_INET; + param.local_port = server_port; + param.do_not_use_gso = 1; // can't use GSO since we're limited to responding to one DNS query at a time + param.is_client = 0; + param.decode = server_decode; + param.encode = server_encode; + + picoquic_network_thread_ctx_t thread_ctx = {0}; + thread_ctx.quic = quic; + thread_ctx.param = ¶m; + thread_ctx.loop_callback = slipstream_server_sockloop_callback; + thread_ctx.loop_callback_ctx = &default_context; + + /* Open the wake up pipe or event */ + picoquic_open_network_wake_up(&thread_ctx, &ret); + + default_context.thread_ctx = &thread_ctx; + + signal(SIGTERM, server_sighandler); + picoquic_packet_loop_v3(&thread_ctx); + ret = thread_ctx.return_code; + + /* And finish. */ + printf("Server exit, ret = %d\n", ret); + + picoquic_free(quic); + + return ret; +} diff --git a/src/slipstream_server_circular_query_buffer.c b/src/slipstream_server_circular_query_buffer.c new file mode 100644 index 0000000..e94984f --- /dev/null +++ b/src/slipstream_server_circular_query_buffer.c @@ -0,0 +1,42 @@ +#include +#include "slipstream_server_circular_query_buffer.h" + +#include "picoquic_utils.h" +#include "SPCDNS/src/dns.h" + + +// Get next available slot for writing +dns_decoded_t* circular_query_buffer_get_write_slot(circular_query_buffer_t* buf) { + dns_decoded_t* slot = buf->queries[buf->head]; + + // Move head forward + buf->head = (buf->head + 1) % SIZE; + + // If we've caught up to tail, move tail forward + if (buf->head == buf->tail) { + buf->tail = (buf->tail + 1) % SIZE; + } + + return slot; +} + +// Get next available item for reading +dns_decoded_t* circular_query_buffer_get_read_slot(circular_query_buffer_t* buf) { + // Check if buffer is empty + if (buf->tail == buf->head) { + return NULL; + } + + dns_decoded_t* slot = buf->queries[buf->tail]; + buf->tail = (buf->tail + 1) % SIZE; + + return slot; +} + +size_t circular_query_buffer_get_size(circular_query_buffer_t* buf) { + if (buf->head >= buf->tail) { + return buf->head - buf->tail; + } else { + return SIZE - buf->tail + buf->head; + } +} \ No newline at end of file