Support multiple simultaneous clients

* replace cqb with a dns request buffer with separate queues for each cnx id
* ensure we respond to the addr from the DNS request we popped from queue
This commit is contained in:
Jop Zitman 2024-12-17 17:15:32 +08:00
parent 0006c48e4b
commit 2038af95e8
10 changed files with 380 additions and 73 deletions

View file

@ -41,13 +41,15 @@ add_executable(slipstream
src/slipstream.c
src/slipstream_client.c
src/slipstream_server.c
src/slipstream_server_circular_query_buffer.c
src/slipstream_dns_request_buffer.c
src/slipstream_inline_dots.c
src/slipstream_resolver_addresses.c
src/slipstream_utils.c
include/slipstream.h
include/slipstream_server_circular_query_buffer.h
include/slipstream_dns_request_buffer.h
include/slipstream_inline_dots.h
include/slipstream_resolver_addresses.h
include/slipstream_utils.h
extern/lua-resty-base-encoding/b32_data.h
extern/lua-resty-base-encoding/base32.c

2
extern/picoquic vendored

@ -1 +1 @@
Subproject commit 191912cae6c5145e854dd519667523884d6e8f46
Subproject commit d2404ebedc9bff468eb005601e4ec91d7db2cb93

View file

@ -0,0 +1,55 @@
#ifndef SLIPSTREAM_DNS_REQUEST_BUFFER
#define SLIPSTREAM_DNS_REQUEST_BUFFER
#include <stdbool.h>
#include "SPCDNS/src/dns.h"
#define GLOBAL_BUFFER_SIZE 4096
typedef struct st_slipstream_cnxid_dns_request_buffer_t slipstream_cnxid_dns_request_buffer_t;
typedef struct st_element_t {
dns_decoded_t dns_decoded[DNS_DECODEBUF_4K];
struct sockaddr_storage peer_addr;
struct st_element_t* buffer_prev;
struct st_element_t* buffer_next;
struct st_element_t* cnxid_buffer_prev;
struct st_element_t* cnxid_buffer_next;
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer;
} slot_t;
typedef struct st_slipstream_cnxid_dns_request_buffer_t {
slot_t* head;
slot_t* tail;
} slipstream_cnxid_dns_request_buffer_t;
typedef struct {
slot_t elements[GLOBAL_BUFFER_SIZE];
slot_t* head;
slot_t* tail;
slot_t* free;
picohash_table* cnxid_to_cnxid_buffer;
} slipstream_dns_request_buffer_t;
typedef struct st_cnxid_to_cnxid_buffer_t {
picoquic_connection_id_t cnx_id;
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer;
} cnxid_to_cnxid_buffer_t;
void slipstream_dns_request_buffer_init(slipstream_dns_request_buffer_t* buffer);
slipstream_cnxid_dns_request_buffer_t* slipstream_dns_request_buffer_get_cnxid_buffer(
slipstream_dns_request_buffer_t* buffer, picoquic_connection_id_t* initial_cnxid, bool create);
slot_t* slipstream_dns_request_buffer_get_write_slot(slipstream_dns_request_buffer_t* buffer);
void slipstream_dns_request_buffer_commit_slot_to_cnxid_buffer(slipstream_dns_request_buffer_t* buffer,
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer,
slot_t* slot);
slot_t* slipstream_dns_request_buffer_get_read_slot(slipstream_dns_request_buffer_t* buffer,
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer);
#endif //SLIPSTREAM_DNS_REQUEST_BUFFER

View file

@ -1,20 +0,0 @@
#ifndef SLIPSTREAM_SERVER_CIRCULAR_QUEUE_BUFFER_H
#define SLIPSTREAM_SERVER_CIRCULAR_QUEUE_BUFFER_H
#define SIZE 4096
#include "SPCDNS/src/dns.h"
typedef struct {
dns_decoded_t queries[SIZE][DNS_DECODEBUF_4K];
size_t tail;
size_t head;
} circular_query_buffer_t;
dns_decoded_t* circular_query_buffer_get_write_slot(circular_query_buffer_t* buf);
dns_decoded_t* circular_query_buffer_get_read_slot(circular_query_buffer_t* buf);
size_t circular_query_buffer_get_size(circular_query_buffer_t* buf);
#endif // SLIPSTREAM_SERVER_CIRCULAR_QUEUE_BUFFER_H

View file

@ -0,0 +1,10 @@
#ifndef SLIPSTREAM_UTILS_H
#define SLIPSTREAM_UTILS_H
#include "picoquic.h"
char* picoquic_connection_id_to_string(const picoquic_connection_id_t* cid);
void sockaddr_dummy(struct sockaddr_storage *addr_storage);
#endif //SLIPSTREAM_UTILS_H

View file

@ -26,7 +26,7 @@
char* client_domain_name = NULL;
size_t client_domain_name_len = 0;
ssize_t client_encode_segment(dns_packet_t* packet, size_t* packet_len, const unsigned char* src_buf, size_t src_buf_len) {
ssize_t client_encode_segment(picoquic_quic_t* quic, dns_packet_t* packet, size_t* packet_len, const unsigned char* src_buf, size_t src_buf_len) {
edns0_opt_t opt;
dns_answer_t edns;
@ -66,12 +66,12 @@ ssize_t client_encode_segment(dns_packet_t* packet, size_t* packet_len, const un
return 0;
}
ssize_t client_encode(unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len) {
ssize_t client_encode(picoquic_quic_t* quic, picoquic_cnx_t* last_cnx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr) {
// optimize path for single segment
if (src_buf_len <= *segment_len) {
size_t packet_len = MAX_DNS_QUERY_SIZE;
unsigned char* packet = malloc(packet_len);
const ssize_t ret = client_encode_segment((dns_packet_t*) packet, &packet_len, src_buf, src_buf_len);
const ssize_t ret = client_encode_segment(quic, (dns_packet_t*) packet, &packet_len, src_buf, src_buf_len);
if (ret < 0) {
free(packet);
return -1;
@ -90,7 +90,7 @@ ssize_t client_encode(unsigned char** dest_buf, const unsigned char* src_buf, si
size_t first_packet_len = 0;
for (size_t i = 0; i < num_segments; i++) {
size_t packet_len = MAX_DNS_QUERY_SIZE;
const ssize_t ret = client_encode_segment((dns_packet_t*) current_packet, &packet_len, segment, *segment_len);
const ssize_t ret = client_encode_segment(quic, (dns_packet_t*) current_packet, &packet_len, segment, *segment_len);
if (ret < 0) {
free(packets);
return -1;
@ -117,7 +117,7 @@ ssize_t client_encode(unsigned char** dest_buf, const unsigned char* src_buf, si
return current_packet - packets;
}
ssize_t client_decode(const unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* from, struct sockaddr_storage* dest) {
ssize_t client_decode(picoquic_quic_t* quic, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* peer_addr) {
*dest_buf = NULL;
size_t bufsize = DNS_DECODEBUF_4K * sizeof(dns_decoded_t);

View file

@ -0,0 +1,183 @@
#include <assert.h>
#include <picohash.h>
#include <string.h>
#include <stdlib.h>
#include "picoquic.h"
#include "picoquic_utils.h"
#include "slipstream_utils.h"
#include "slipstream_dns_request_buffer.h"
static uint64_t cnxid_to_cnxid_buffer_hash(const void* key) {
const cnxid_to_cnxid_buffer_t* l_cid = key;
return picoquic_connection_id_hash(&l_cid->cnx_id);
}
static int cnxid_to_cnxid_buffer_compare(const void* key1, const void* key2) {
const cnxid_to_cnxid_buffer_t* l_cid1 = key1;
const cnxid_to_cnxid_buffer_t* l_cid2 = key2;
return picoquic_compare_connection_id(&l_cid1->cnx_id, &l_cid2->cnx_id);
}
void slipstream_dns_request_buffer_init(slipstream_dns_request_buffer_t* buffer) {
memset(buffer, 0, sizeof(slipstream_dns_request_buffer_t));
buffer->head = NULL;
buffer->tail = NULL;
for (int i = 0; i < GLOBAL_BUFFER_SIZE; i++) {
slot_t* element = &buffer->elements[i];
element->buffer_next = buffer->free;
element->buffer_prev = NULL;
buffer->free = element;
}
buffer->cnxid_to_cnxid_buffer = picohash_create(32, cnxid_to_cnxid_buffer_hash, cnxid_to_cnxid_buffer_compare);
}
slipstream_cnxid_dns_request_buffer_t* slipstream_dns_request_buffer_get_cnxid_buffer(
slipstream_dns_request_buffer_t* buffer, picoquic_connection_id_t* initial_cnxid, bool create) {
cnxid_to_cnxid_buffer_t key = {
.cnx_id = *initial_cnxid,
.cnxid_buffer = NULL
};
const picohash_item* item = picohash_retrieve(buffer->cnxid_to_cnxid_buffer, &key);
if (item != NULL) {
return ((cnxid_to_cnxid_buffer_t*)item->key)->cnxid_buffer;
}
if (!create) {
return NULL;
}
char* initial_cnxid_str = picoquic_connection_id_to_string(initial_cnxid);
DBG_PRINTF("creating new hash key for %s\n", initial_cnxid_str);
free(initial_cnxid_str);
cnxid_to_cnxid_buffer_t* new_key = malloc(sizeof(cnxid_to_cnxid_buffer_t));
memcpy(&new_key->cnx_id, initial_cnxid, sizeof(picoquic_connection_id_t));
new_key->cnxid_buffer = malloc(sizeof(slipstream_cnxid_dns_request_buffer_t));
memset(new_key->cnxid_buffer, 0, sizeof(slipstream_cnxid_dns_request_buffer_t));
if (new_key->cnxid_buffer == NULL) {
fprintf(stderr, "error allocating memory for cnx buffer\n");
return NULL;
}
if (picohash_insert(buffer->cnxid_to_cnxid_buffer, new_key) < 0) {
free(new_key->cnxid_buffer);
fprintf(stderr, "error adding a cnx buffer for a new cnx id\n");
return NULL;
}
return new_key->cnxid_buffer;
}
void slipstream_cnxid_dns_request_buffer_free_slot(slipstream_cnxid_dns_request_buffer_t* cnxid_buffer, slot_t* slot) {
if (slot->cnxid_buffer_prev != NULL) {
slot->cnxid_buffer_prev->cnxid_buffer_next = slot->cnxid_buffer_next;
}
if (slot->cnxid_buffer_next != NULL) {
slot->cnxid_buffer_next->cnxid_buffer_prev = slot->cnxid_buffer_prev;
}
if (cnxid_buffer->head == slot) {
cnxid_buffer->head = slot->cnxid_buffer_next;
}
if (cnxid_buffer->tail == slot) {
cnxid_buffer->tail = slot->cnxid_buffer_prev;
}
slot->cnxid_buffer = NULL;
}
void slipstream_dns_request_buffer_free_slot(slipstream_dns_request_buffer_t* buffer, slot_t* slot) {
if (slot->buffer_prev != NULL) {
slot->buffer_prev->buffer_next = slot->buffer_next;
}
if (slot->buffer_next != NULL) {
slot->buffer_next->buffer_prev = slot->buffer_prev;
}
if (buffer->head == slot) {
buffer->head = slot->buffer_next;
}
if (buffer->tail == slot) {
buffer->tail = slot->buffer_prev;
}
if (buffer->free) {
buffer->free->buffer_prev = slot;
}
slot->buffer_next = buffer->free;
buffer->free = slot;
}
slot_t* slipstream_dns_request_buffer_get_write_slot(slipstream_dns_request_buffer_t* buffer) {
if (!buffer->free) {
slot_t* tail = buffer->tail;
assert(tail != NULL);
slipstream_cnxid_dns_request_buffer_free_slot(tail->cnxid_buffer, tail);
slipstream_dns_request_buffer_free_slot(buffer, tail);
}
// Get the first free element
slot_t* slot = buffer->free;
buffer->free = slot->buffer_next;
// Add the element to the head of the global buffer
slot->buffer_next = buffer->head;
slot->buffer_prev = NULL;
buffer->head = slot;
if (slot->buffer_next != NULL) {
slot->buffer_next->buffer_prev = slot;
}
// If the tail is NULL (first element), set it to the new element
if (buffer->tail == NULL) {
buffer->tail = slot;
}
return slot;
}
// TODO: what happens if we don't commit
void slipstream_dns_request_buffer_commit_slot_to_cnxid_buffer(slipstream_dns_request_buffer_t* buffer,
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer,
slot_t* slot) {
slot->cnxid_buffer = cnxid_buffer;
// Add this slot to a specific cnxid buffer
slot->cnxid_buffer_next = cnxid_buffer->head;
slot->cnxid_buffer_prev = NULL;
cnxid_buffer->head = slot;
if (slot->cnxid_buffer_next != NULL) {
slot->cnxid_buffer_next->cnxid_buffer_prev = slot;
}
// If the tail is NULL (first element), set it to the new element
if (cnxid_buffer->tail == NULL) {
cnxid_buffer->tail = slot;
}
}
slot_t* slipstream_dns_request_buffer_get_read_slot(slipstream_dns_request_buffer_t* buffer,
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer) {
// Get the last element from the cnxid buffer
slot_t* slot = cnxid_buffer->tail;
if (!slot) {
return NULL;
}
slipstream_cnxid_dns_request_buffer_free_slot(slot->cnxid_buffer, slot);
slipstream_dns_request_buffer_free_slot(buffer, slot);
return slot;
}
// TODO: free up cnxid_buffer

View file

@ -6,31 +6,49 @@
#ifdef BUILD_LOGLIB
#include <autoqlog.h>
#endif
#include <picohash.h>
#include <pthread.h>
#include <stdbool.h>
#include <arpa/nameser.h>
#include <sys/ioctl.h>
#include <sys/param.h>
#include <sys/poll.h>
#include <assert.h>
#include "lua-resty-base-encoding-base32.h"
#include "picoquic_config.h"
#include "slipstream.h"
#include "slipstream_inline_dots.h"
#include "slipstream_server_circular_query_buffer.h"
#include "slipstream_packet.h"
#include "slipstream_dns_request_buffer.h"
#include "slipstream_utils.h"
#include "SPCDNS/src/dns.h"
#include "SPCDNS/src/mappings.h"
circular_query_buffer_t server_cqb = {0};
char* server_domain_name = NULL;
size_t server_domain_name_len = 0;
ssize_t server_encode(unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_size) {
const dns_query_t *query = (dns_query_t *) circular_query_buffer_get_read_slot(&server_cqb);
if (query == NULL) {
slipstream_dns_request_buffer_t slipstream_server_dns_request_buffer;
ssize_t server_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_size, struct sockaddr_storage *peer_addr) {
// we don't support segmentation in the server
assert(segment_size == NULL || *segment_size == 0 || *segment_size == src_buf_len);
picoquic_connection_id_t initial_cnxid = picoquic_get_initial_cnxid(cnx);
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer = slipstream_dns_request_buffer_get_cnxid_buffer(
&slipstream_server_dns_request_buffer, &initial_cnxid, false);
if (cnxid_buffer == NULL) {
fprintf(stderr, "error getting cnxid buffer\n");
return -1;
}
slot_t *slot = slipstream_dns_request_buffer_get_read_slot(&slipstream_server_dns_request_buffer, cnxid_buffer);
if (slot == NULL) {
fprintf(stderr, "no available DNS request to respond to\n");
return -1;
}
dns_query_t *query = (dns_query_t *) slot->dns_decoded;
if (query->questions == NULL) {
fprintf(stderr, "no questions in DNS request\n");
@ -69,14 +87,29 @@ ssize_t server_encode(unsigned char** dest_buf, const unsigned char* src_buf, si
}
*dest_buf = (unsigned char*)packet;
memcpy(peer_addr, &slot->peer_addr, sizeof(struct sockaddr_storage));
return packet_len;
}
ssize_t server_decode(const unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *from, struct sockaddr_storage *dest) {
ssize_t server_decode(picoquic_quic_t* quic, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *peer_addr) {
*dest_buf = NULL;
dns_decoded_t* packet = circular_query_buffer_get_write_slot(&server_cqb);
slot_t* slot = slipstream_dns_request_buffer_get_write_slot(&slipstream_server_dns_request_buffer);
if (slot == NULL) {
fprintf(stderr, "error getting write slot\n");
sockaddr_dummy(peer_addr);
return -1;
}
// DNS packets arrive from random source ports, so:
// * save the original address in the dns query slot
// * set the source address to a dummy address (to prevent QUIC from using it)
memcpy(&slot->peer_addr, peer_addr, sizeof(struct sockaddr_storage));
sockaddr_dummy(peer_addr);
size_t packet_len = DNS_DECODEBUF_4K * sizeof(dns_decoded_t);
dns_decoded_t* packet = slot->dns_decoded;
const dns_rcode_t rc = dns_decode(packet, &packet_len, (const dns_packet_t*) src_buf, src_buf_len);
if (rc != RCODE_OKAY) {
fprintf(stderr, "dns_decode() = (%d) %s\n", rc, dns_rcode_text(rc));
@ -114,6 +147,42 @@ ssize_t server_decode(const unsigned char** dest_buf, const unsigned char* src_b
if (decoded_len == (size_t) -1) {
free(decoded_buf);
fprintf(stderr, "error decoding base32: %lu\n", decoded_len);
return -1;
}
picoquic_connection_id_t incoming_src_connection_id = {0};
picoquic_connection_id_t incoming_dest_connection_id; // sure to be set by parser
bool is_poll_packet = false;
// ReSharper disable once CppDFAUnreachableCode
const int ret = slipstream_packet_parse(decoded_buf, decoded_len, PICOQUIC_SHORT_HEADER_CONNECTION_ID_SIZE,
&incoming_src_connection_id, &incoming_dest_connection_id, &is_poll_packet);
if (ret != 0) {
free(decoded_buf);
fprintf(stderr, "error parsing slipstream packet: %d\n", ret);
return -1;
}
picoquic_cnx_t *cnx = picoquic_cnx_by_id_(quic, incoming_dest_connection_id);
picoquic_connection_id_t initial_cnxid;
if (cnx == NULL) {
initial_cnxid = incoming_dest_connection_id;
} else {
initial_cnxid = picoquic_get_initial_cnxid(cnx);
}
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer = slipstream_dns_request_buffer_get_cnxid_buffer(
&slipstream_server_dns_request_buffer, &initial_cnxid, true);
if (cnxid_buffer == NULL) {
free(decoded_buf);
fprintf(stderr, "error getting or creating cnxid buffer\n");
return -1;
}
slipstream_dns_request_buffer_commit_slot_to_cnxid_buffer(&slipstream_server_dns_request_buffer, cnxid_buffer, slot);
if (is_poll_packet) {
free(decoded_buf);
return 0;
}
*dest_buf = decoded_buf;
@ -541,6 +610,8 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c
#endif
picoquic_set_key_log_file_from_env(quic);
slipstream_dns_request_buffer_init(&slipstream_server_dns_request_buffer);
picoquic_packet_loop_param_t param = {0};
param.local_af = AF_INET;
param.local_port = server_port;

View file

@ -1,38 +0,0 @@
#include <time.h>
#include "slipstream_server_circular_query_buffer.h"
// Get next available slot for writing
dns_decoded_t* circular_query_buffer_get_write_slot(circular_query_buffer_t* buf) {
dns_decoded_t* slot = buf->queries[buf->head];
// Move head forward
buf->head = (buf->head + 1) % SIZE;
// If we've caught up to tail, move tail forward
if (buf->head == buf->tail) {
buf->tail = (buf->tail + 1) % SIZE;
}
return slot;
}
// Get next available item for reading
dns_decoded_t* circular_query_buffer_get_read_slot(circular_query_buffer_t* buf) {
// Check if buffer is empty
if (buf->tail == buf->head) {
return NULL;
}
dns_decoded_t* slot = buf->queries[buf->tail];
buf->tail = (buf->tail + 1) % SIZE;
return slot;
}
size_t circular_query_buffer_get_size(circular_query_buffer_t* buf) {
if (buf->head >= buf->tail) {
return buf->head - buf->tail;
}
return SIZE - buf->tail + buf->head;
}

44
src/slipstream_utils.c Normal file
View file

@ -0,0 +1,44 @@
#include "slipstream_utils.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
char* picoquic_connection_id_to_string(const picoquic_connection_id_t* cid) {
// Each byte needs 2 hex characters + null terminator
char* str = malloc((cid->id_len * 2 + 1) * sizeof(char));
if (str == NULL) {
return NULL;
}
// Convert each byte to hex
for (int i = 0; i < cid->id_len; i++) {
sprintf(&str[i * 2], "%02x", cid->id[i]);
}
str[cid->id_len * 2] = '\0';
return str;
}
// Function to create a dummy sockaddr_storage with hardcoded IPv4 and port
void sockaddr_dummy(struct sockaddr_storage *addr_storage) {
// Clear the entire sockaddr_storage to avoid residual data
memset(addr_storage, 0, sizeof(struct sockaddr_storage));
// Cast sockaddr_storage to sockaddr_in for IPv4
struct sockaddr_in *addr4 = (struct sockaddr_in *)addr_storage;
// Set address family to AF_INET (IPv4)
addr4->sin_family = AF_INET;
// Use a hardcoded IPv4 address: 192.0.2.1 (TEST-NET-1 for testing)
inet_pton(AF_INET, "192.0.2.1", &addr4->sin_addr);
// Set a hardcoded port: 12345
addr4->sin_port = htons(12345);
#ifdef __APPLE__ // For BSD systems, set sin_len
addr4->sin_len = sizeof(struct sockaddr_in);
#endif
}