Implement custom sockloop

- s.t. server only attempts sending data when there is an available request
- s.t. dns requests are responded to in-order
- ensures that rtt stays consistent
- ensures that congestion control isn't tripped
- ensures that dns resolver isn't tripped
This commit is contained in:
Jop Zitman 2024-12-29 17:23:48 +08:00
parent 3350be382c
commit ee59a67164
10 changed files with 336 additions and 414 deletions

View file

@ -38,17 +38,18 @@ add_subdirectory(extern/picoquic)
add_executable(slipstream
src/slipstream.c
src/slipstream_client.c
src/slipstream_server.c
src/slipstream_dns_request_buffer.c
src/slipstream_inline_dots.c
src/slipstream_packet.c
src/slipstream_resolver_addresses.c
src/slipstream_server.c
src/slipstream_sockloop.c
src/slipstream_utils.c
include/slipstream.h
include/slipstream_dns_request_buffer.h
include/slipstream_inline_dots.h
include/slipstream_packet.h
include/slipstream_resolver_addresses.h
include/slipstream_slot.h
include/slipstream_sockloop.h
include/slipstream_utils.h
extern/lua-resty-base-encoding/b32_data.h

2
extern/picoquic vendored

@ -1 +1 @@
Subproject commit a7b7df45d831a6d1b6ff37a0a4f8ce847a7f48c2
Subproject commit c00a6f69032a255ee15892a0fec3c8e8cc1da4a4

View file

@ -19,7 +19,6 @@ extern "C" {
#define SLIPSTREAM_QLOG_DIR "./qlog";
int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_filename, const char* domain_name,
const char* cc_algo_id);

View file

@ -1,62 +0,0 @@
#ifndef SLIPSTREAM_DNS_REQUEST_BUFFER
#define SLIPSTREAM_DNS_REQUEST_BUFFER
#include <stdbool.h>
#include <picohash.h>
#include "SPCDNS/src/dns.h"
#define GLOBAL_BUFFER_SIZE 32
typedef struct st_slipstream_cnxid_dns_request_buffer_t slipstream_cnxid_dns_request_buffer_t;
typedef struct st_element_t {
dns_decoded_t dns_decoded[DNS_DECODEBUF_4K];
struct sockaddr_storage peer_addr;
struct sockaddr_storage local_addr;
struct st_element_t* buffer_prev;
struct st_element_t* buffer_next;
struct st_element_t* cnxid_buffer_prev;
struct st_element_t* cnxid_buffer_next;
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer;
uint64_t created_time;
int query_id;
} slot_t;
typedef struct st_slipstream_cnxid_dns_request_buffer_t {
slot_t* head;
slot_t* tail;
} slipstream_cnxid_dns_request_buffer_t;
typedef struct {
slot_t slots[GLOBAL_BUFFER_SIZE];
slot_t* head;
slot_t* tail;
slot_t* free;
picohash_table* cnxid_to_cnxid_buffer;
slipstream_cnxid_dns_request_buffer_t** cnxid_buffers;
size_t cnxid_buffers_len;
} slipstream_dns_request_buffer_t;
typedef struct st_cnxid_to_cnxid_buffer_t {
picoquic_connection_id_t cnx_id;
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer;
} cnxid_to_cnxid_buffer_t;
void slipstream_dns_request_buffer_init(slipstream_dns_request_buffer_t* buffer);
slipstream_cnxid_dns_request_buffer_t* slipstream_dns_request_buffer_get_cnxid_buffer(
slipstream_dns_request_buffer_t* buffer, picoquic_connection_id_t* initial_cnxid, bool create);
void slipstream_dns_request_buffer_free_slot(slipstream_dns_request_buffer_t* buffer, slot_t* slot);
slot_t* slipstream_dns_request_buffer_get_write_slot(slipstream_dns_request_buffer_t* buffer);
void slipstream_dns_request_buffer_commit_slot_to_cnxid_buffer(slipstream_dns_request_buffer_t* buffer,
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer,
slot_t* slot);
slot_t* slipstream_dns_request_buffer_get_read_slot(slipstream_dns_request_buffer_t* buffer,
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer);
#endif //SLIPSTREAM_DNS_REQUEST_BUFFER

17
include/slipstream_slot.h Normal file
View file

@ -0,0 +1,17 @@
#ifndef SLIPSTREAM_SLOT
#define SLIPSTREAM_SLOT
#include "SPCDNS/src/dns.h"
#include "picoquic.h"
typedef struct st_slot_t {
dns_decoded_t dns_decoded[DNS_DECODEBUF_4K];
dns_rcode_t error;
struct sockaddr_storage peer_addr;
struct sockaddr_storage local_addr;
picoquic_cnx_t* cnx;
uint64_t created_time;
int query_id;
} slot_t;
#endif // SLIPSTREAM_SLOT

View file

@ -0,0 +1,8 @@
#ifndef SLIPSTREAM_SOCKLOOP_H
#define SLIPSTREAM_SOCKLOOP_H
#include "picoquic_packet_loop.h"
void* slipstream_packet_loop(picoquic_network_thread_ctx_t* thread_ctx);
#endif //SLIPSTREAM_SOCKLOOP_H

View file

@ -11,6 +11,7 @@
#include <assert.h>
#include <picoquic_internal.h>
#include <pthread.h>
#include <slipstream_sockloop.h>
#include <stdbool.h>
#include <arpa/nameser.h>
#include <sys/ioctl.h>
@ -88,7 +89,7 @@ ssize_t client_encode_segment(picoquic_quic_t* quic, dns_packet_t* packet, size_
return 0;
}
ssize_t client_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
ssize_t client_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
assert(callback_ctx);
slipstream_client_ctx_t* client_ctx = callback_ctx;
@ -146,7 +147,7 @@ ssize_t client_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, void* callback
return current_packet - packets;
}
ssize_t client_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
ssize_t client_decode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, picoquic_socket_ctx_t* s_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
*dest_buf = NULL;
slipstream_client_ctx_t* client_ctx = callback_ctx;
@ -176,7 +177,7 @@ ssize_t client_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket
}
if (query->ancount != 1) {
DBG_PRINTF("[%d] dns record should contain exactly one answer", query->id);
// DBG_PRINTF("[%d] dns record should contain exactly one answer", query->id);
return 0;
}
@ -200,11 +201,6 @@ ssize_t client_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket
return answer_txt->len;
}
const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctxs, peer_addr, local_addr);
if (send_socket == INVALID_SOCKET) {
DBG_PRINTF("[%d] no valid socket found for poll packet", query->id);
return answer_txt->len;
}
// get active destination connection id on this ctx
picoquic_cnx_t* cnx = picoquic_cnx_by_id_(quic, incoming_dest_connection_id);
@ -225,14 +221,14 @@ ssize_t client_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket
}
unsigned char* encoded;
ssize_t encoded_len = client_encode(quic, cnx, callback_ctx, s_ctxs, &encoded, poll_packet_buf, poll_packet_len,
&poll_packet_len, peer_addr, local_addr);
ssize_t encoded_len = client_encode(quic, slot_p, callback_ctx, &encoded, poll_packet_buf, poll_packet_len, &poll_packet_len, peer_addr, local_addr);
if (encoded_len <= 0) {
DBG_PRINTF("error encoding poll packet", NULL);
free(poll_packet_buf);
return answer_txt->len;
}
const SOCKET_TYPE send_socket = s_ctx->fd;
int sock_err = 0;
ret = picoquic_sendmsg(send_socket,
(struct sockaddr*)peer_addr, (struct sockaddr*)local_addr, 0,
@ -300,6 +296,8 @@ static void slipstream_client_free_context(slipstream_client_ctx_t* client_ctx)
slipstream_client_free_stream_ctx(client_ctx, stream_ctx);
}
free(client_ctx->server_addresses);
/* release the memory */
free(client_ctx);
}
@ -348,19 +346,16 @@ int slipstream_client_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_l
struct sockaddr_storage *peer_addr = &cnx->path[0]->peer_addr;
struct sockaddr_storage *local_addr = &cnx->path[0]->local_addr;
// DBG_PRINTF("[current:%u][last:%u][passed:%u] POLL!", current_time, client_ctx->last_request, passed);
picoquic_socket_ctxs_t* s_ctxs = (picoquic_socket_ctxs_t*)callback_arg;
picoquic_socket_ctx_t* s_ctx = (picoquic_socket_ctx_t*)callback_arg;
unsigned char* encoded;
ssize_t encoded_len = client_encode(quic, cnx, client_ctx, s_ctxs, &encoded, poll_packet_buf, poll_packet_len,
&poll_packet_len, NULL, NULL);
ssize_t encoded_len = client_encode(quic, NULL, client_ctx, &encoded, poll_packet_buf, poll_packet_len, &poll_packet_len, NULL, NULL);
if (encoded_len <= 0) {
DBG_PRINTF("error encoding poll packet", NULL);
free(poll_packet_buf);
return 0;
}
const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctxs, peer_addr, local_addr);
const SOCKET_TYPE send_socket = s_ctx->fd;
int sock_err = 0;
ret = picoquic_sendmsg(send_socket,
@ -711,7 +706,7 @@ static int slipstream_connect(struct sockaddr_storage* server_address,
}
// 400ms
picoquic_enable_keep_alive(*cnx, 400000);
// picoquic_enable_keep_alive(*cnx, 400000);
/* Document connection in client's context */
client_ctx->cnx = *cnx;
@ -871,7 +866,7 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f
}
signal(SIGTERM, client_sighandler);
picoquic_packet_loop_v3(&thread_ctx);
slipstream_packet_loop(&thread_ctx);
ret = thread_ctx.return_code;
/* And finish. */

View file

@ -1,189 +0,0 @@
#include <assert.h>
#include <picohash.h>
#include <string.h>
#include <stdlib.h>
#include "picoquic.h"
#include "picoquic_utils.h"
#include "slipstream_utils.h"
#include "slipstream_dns_request_buffer.h"
static uint64_t cnxid_to_cnxid_buffer_hash(const void* key) {
const cnxid_to_cnxid_buffer_t* l_cid = key;
return picoquic_connection_id_hash(&l_cid->cnx_id);
}
static int cnxid_to_cnxid_buffer_compare(const void* key1, const void* key2) {
const cnxid_to_cnxid_buffer_t* l_cid1 = key1;
const cnxid_to_cnxid_buffer_t* l_cid2 = key2;
return picoquic_compare_connection_id(&l_cid1->cnx_id, &l_cid2->cnx_id);
}
void slipstream_dns_request_buffer_init(slipstream_dns_request_buffer_t* buffer) {
memset(buffer, 0, sizeof(slipstream_dns_request_buffer_t));
buffer->head = NULL;
buffer->tail = NULL;
for (int i = 0; i < GLOBAL_BUFFER_SIZE; i++) {
slot_t* element = &buffer->slots[i];
element->buffer_next = buffer->free;
element->buffer_prev = NULL;
buffer->free = element;
}
buffer->cnxid_to_cnxid_buffer = picohash_create(32, cnxid_to_cnxid_buffer_hash, cnxid_to_cnxid_buffer_compare);
}
slipstream_cnxid_dns_request_buffer_t* slipstream_dns_request_buffer_get_cnxid_buffer(
slipstream_dns_request_buffer_t* buffer, picoquic_connection_id_t* initial_cnxid, bool create) {
cnxid_to_cnxid_buffer_t key = {
.cnx_id = *initial_cnxid,
.cnxid_buffer = NULL
};
const picohash_item* item = picohash_retrieve(buffer->cnxid_to_cnxid_buffer, &key);
if (item != NULL) {
return ((cnxid_to_cnxid_buffer_t*)item->key)->cnxid_buffer;
}
if (!create) {
return NULL;
}
char* initial_cnxid_str = picoquic_connection_id_to_string(initial_cnxid);
DBG_PRINTF("creating new hash key for %s\n", initial_cnxid_str);
free(initial_cnxid_str);
cnxid_to_cnxid_buffer_t* new_key = malloc(sizeof(cnxid_to_cnxid_buffer_t));
memcpy(&new_key->cnx_id, initial_cnxid, sizeof(picoquic_connection_id_t));
new_key->cnxid_buffer = malloc(sizeof(slipstream_cnxid_dns_request_buffer_t));
memset(new_key->cnxid_buffer, 0, sizeof(slipstream_cnxid_dns_request_buffer_t));
if (new_key->cnxid_buffer == NULL) {
fprintf(stderr, "error allocating memory for cnx buffer\n");
return NULL;
}
if (picohash_insert(buffer->cnxid_to_cnxid_buffer, new_key) < 0) {
free(new_key->cnxid_buffer);
fprintf(stderr, "error adding a cnx buffer for a new cnx id\n");
return NULL;
}
buffer->cnxid_buffers_len++;
slipstream_cnxid_dns_request_buffer_t** cnxid_buffers = realloc(buffer->cnxid_buffers,
buffer->cnxid_buffers_len * sizeof(slipstream_cnxid_dns_request_buffer_t*));
if (cnxid_buffers == NULL) {
return NULL;
}
buffer->cnxid_buffers = cnxid_buffers;
buffer->cnxid_buffers[buffer->cnxid_buffers_len - 1] = new_key->cnxid_buffer;
return new_key->cnxid_buffer;
}
void slipstream_dns_request_buffer_free_slot(slipstream_dns_request_buffer_t* buffer, slot_t* slot) {
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer = slot->cnxid_buffer;
if (cnxid_buffer != NULL) {
if (slot->cnxid_buffer_prev != NULL) {
slot->cnxid_buffer_prev->cnxid_buffer_next = slot->cnxid_buffer_next;
}
if (slot->cnxid_buffer_next != NULL) {
slot->cnxid_buffer_next->cnxid_buffer_prev = slot->cnxid_buffer_prev;
}
if (cnxid_buffer->head == slot) {
cnxid_buffer->head = slot->cnxid_buffer_next;
}
if (cnxid_buffer->tail == slot) {
cnxid_buffer->tail = slot->cnxid_buffer_prev;
}
slot->cnxid_buffer = NULL;
}
if (slot->buffer_prev != NULL) {
slot->buffer_prev->buffer_next = slot->buffer_next;
}
if (slot->buffer_next != NULL) {
slot->buffer_next->buffer_prev = slot->buffer_prev;
}
if (buffer->head == slot) {
buffer->head = slot->buffer_next;
}
if (buffer->tail == slot) {
buffer->tail = slot->buffer_prev;
}
if (buffer->free) {
buffer->free->buffer_prev = slot;
}
slot->buffer_next = buffer->free;
buffer->free = slot;
slot->query_id = 0;
}
slot_t* slipstream_dns_request_buffer_get_write_slot(slipstream_dns_request_buffer_t* buffer) {
if (!buffer->free) {
return NULL;
}
// Get the first free element
slot_t* slot = buffer->free;
buffer->free = slot->buffer_next;
// Add the element to the head of the global buffer
slot->buffer_next = buffer->head;
slot->buffer_prev = NULL;
buffer->head = slot;
if (slot->buffer_next != NULL) {
slot->buffer_next->buffer_prev = slot;
}
// If the tail is NULL (first element), set it to the new element
if (buffer->tail == NULL) {
buffer->tail = slot;
}
return slot;
}
// TODO: what happens if we don't commit
void slipstream_dns_request_buffer_commit_slot_to_cnxid_buffer(slipstream_dns_request_buffer_t* buffer,
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer,
slot_t* slot) {
slot->cnxid_buffer = cnxid_buffer;
// Add this slot to a specific cnxid buffer
slot->cnxid_buffer_next = cnxid_buffer->head;
slot->cnxid_buffer_prev = NULL;
cnxid_buffer->head = slot;
if (slot->cnxid_buffer_next != NULL) {
slot->cnxid_buffer_next->cnxid_buffer_prev = slot;
}
// If the tail is NULL (first element), set it to the new element
if (cnxid_buffer->tail == NULL) {
cnxid_buffer->tail = slot;
}
}
slot_t* slipstream_dns_request_buffer_get_read_slot(slipstream_dns_request_buffer_t* buffer,
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer) {
// Get the last element from the cnxid buffer
slot_t* slot = cnxid_buffer->tail;
if (!slot) {
return NULL;
}
return slot;
}
// TODO: free up cnxid_buffer

View file

@ -13,13 +13,14 @@
#include <sys/param.h>
#include <sys/poll.h>
#include <assert.h>
#include <slipstream_sockloop.h>
#include "lua-resty-base-encoding-base32.h"
#include "picoquic_config.h"
#include "slipstream.h"
#include "slipstream_inline_dots.h"
#include "slipstream_packet.h"
#include "slipstream_dns_request_buffer.h"
#include "slipstream_slot.h"
#include "slipstream_utils.h"
#include "SPCDNS/src/dns.h"
#include "SPCDNS/src/mappings.h"
@ -27,82 +28,13 @@
char* server_domain_name = NULL;
size_t server_domain_name_len = 0;
slipstream_dns_request_buffer_t slipstream_server_dns_request_buffer;
ssize_t respond_and_free_slot(slot_t* slot, const picoquic_socket_ctxs_t* s_ctxs, dns_rcode_t rcode) {
const dns_query_t *query = (dns_query_t *) slot->dns_decoded;
dns_query_t response = {0};
response.id = query->id;
response.query = false;
response.opcode = query->opcode;
response.aa = true;
response.rd = query->rd;
response.cd = query->cd;
response.rcode = rcode;
response.qdcount = query->qdcount;
response.questions = query->questions;
dns_packet_t packet[DNS_BUFFER_UDP_MAX];
size_t packet_len = MAX_UDP_PACKET_SIZE;
dns_rcode_t rc = dns_encode(packet, &packet_len, &response);
// slot not used anymore, free it
slipstream_dns_request_buffer_free_slot(&slipstream_server_dns_request_buffer, slot);
if (rc != RCODE_OKAY) {
DBG_PRINTF("dns_encode() = (%d) %s", rc, dns_rcode_text(rc));
return -1;
}
const struct sockaddr_storage *peer_addr = &slot->peer_addr;
const struct sockaddr_storage *local_addr = &slot->local_addr;
const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctxs, peer_addr, local_addr);
if (send_socket == INVALID_SOCKET) {
DBG_PRINTF("no valid socket found for poll packet", NULL);
return -1;
}
int sock_err = 0;
int ret = picoquic_sendmsg(send_socket,
(struct sockaddr*)peer_addr, (struct sockaddr*)local_addr, 0,
(const char*)packet, packet_len, 0, &sock_err);
if (ret < 0) {
DBG_PRINTF("Error sending poll packet, ret=%d, sock_err=%d %s", ret, sock_err, strerror(sock_err));
return -1;
}
return 0;
}
ssize_t server_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs,
unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_size,
struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
ssize_t server_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
// we don't support segmentation in the server
assert(segment_size == NULL || *segment_size == 0 || *segment_size == src_buf_len);
assert(segment_len == NULL || *segment_len == 0 || *segment_len == src_buf_len);
picoquic_connection_id_t initial_cnxid = picoquic_get_initial_cnxid(cnx);
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer = slipstream_dns_request_buffer_get_cnxid_buffer(
&slipstream_server_dns_request_buffer, &initial_cnxid, false);
if (cnxid_buffer == NULL) {
DBG_PRINTF("error getting cnxid buffer", NULL);
return -1;
}
slot_t *slot = slipstream_dns_request_buffer_get_read_slot(&slipstream_server_dns_request_buffer, cnxid_buffer);
if (slot == NULL) {
DBG_PRINTF("no available DNS request to respond to", NULL);
return -1;
}
slot_t* slot = (slot_t*) slot_p;
dns_query_t *query = (dns_query_t *) slot->dns_decoded;
const dns_question_t *question = &query->questions[0]; // assuming server_decode ensures there is exactly one question
dns_txt_t answer_txt;
answer_txt.name = question->name;
answer_txt.type = question->type;
answer_txt.class = question->class;
answer_txt.ttl = 60;
answer_txt.text = (char *)src_buf;
answer_txt.len = src_buf_len;
dns_txt_t answer_txt; // TODO: fix
dns_answer_t edns = {0};
edns.opt.name = ".";
edns.opt.type = RR_OPT;
@ -117,18 +49,33 @@ ssize_t server_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, void* callback
response.aa = true;
response.rd = query->rd;
response.cd = query->cd;
response.rcode = RCODE_OKAY;
response.rcode = slot->error;
response.qdcount = 1;
response.questions = query->questions;
response.ancount = 1;
response.answers = (dns_answer_t *)&answer_txt;
if (src_buf_len > 0) {
const dns_question_t *question = &query->questions[0]; // assuming server_decode ensures there is exactly one question
answer_txt.name = question->name;
answer_txt.type = question->type;
answer_txt.class = question->class;
answer_txt.ttl = 60;
answer_txt.text = (char *)src_buf;
answer_txt.len = src_buf_len;
response.ancount = 1;
response.answers = (dns_answer_t *)&answer_txt;
} else {
if (slot->error == RCODE_OKAY) {
response.rcode = RCODE_NAME_ERROR;
}
}
response.arcount = 1;
response.additional = &edns;
dns_packet_t* packet = malloc(MAX_UDP_PACKET_SIZE);
size_t packet_len = MAX_UDP_PACKET_SIZE;
dns_rcode_t rc = dns_encode(packet, &packet_len, &response);
slipstream_dns_request_buffer_free_slot(&slipstream_server_dns_request_buffer, slot);
if (rc != RCODE_OKAY) {
free(packet);
DBG_PRINTF("dns_encode() = (%d) %s", rc, dns_rcode_text(rc));
@ -137,27 +84,15 @@ ssize_t server_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, void* callback
*dest_buf = (unsigned char*)packet;
memcpy(peer_addr, &slot->peer_addr, sizeof(struct sockaddr_storage));
memcpy(local_addr, &slot->local_addr, sizeof(struct sockaddr_storage));
return packet_len;
}
ssize_t server_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *peer_addr, struct sockaddr_storage *local_addr) {
ssize_t server_decode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, picoquic_socket_ctx_t* s_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *peer_addr, struct sockaddr_storage *local_addr) {
*dest_buf = NULL;
slot_t* slot;
if (!slipstream_server_dns_request_buffer.free) {
slot = slipstream_server_dns_request_buffer.tail;
assert(slot != NULL);
assert(slot->cnxid_buffer != NULL);
respond_and_free_slot(slot, s_ctxs, RCODE_NAME_ERROR);
}
slot = slipstream_dns_request_buffer_get_write_slot(&slipstream_server_dns_request_buffer);
if (slot == NULL) {
DBG_PRINTF("error getting write slot", NULL);
sockaddr_dummy(peer_addr);
return -1;
}
slot_t* slot = slot_p;
slot->created_time = picoquic_current_time();
// DNS packets arrive from random source ports, so:
@ -173,31 +108,36 @@ ssize_t server_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket
const dns_rcode_t rc = dns_decode(packet, &packet_len, (const dns_packet_t*) src_buf, src_buf_len);
if (rc != RCODE_OKAY) {
DBG_PRINTF("dns_decode() = (%d) %s", rc, dns_rcode_text(rc));
return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE);
// TODO: how to get rid of this packet
return -1; // TODO: server failure
}
const dns_query_t *query = (dns_query_t*) packet;
if (!query->query) {
DBG_PRINTF("dns record is not a query", NULL);
return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED);
slot->error = RCODE_FORMAT_ERROR;
return 0;
}
if (query->qdcount != 1) {
DBG_PRINTF("dns record should contain exactly one query", NULL);
return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED);
slot->error = RCODE_FORMAT_ERROR;
return 0;
}
const dns_question_t *question = &query->questions[0];
if (question->type != RR_TXT) {
// resolvers send anything for pinging, so we only respond to TXT queries
// DBG_PRINTF("query type is not TXT", NULL);
return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED);
slot->error = RCODE_NAME_ERROR;
return 0;
}
const ssize_t data_len = strlen(question->name) - server_domain_name_len - 1 - 1;
if (data_len <= 0) {
DBG_PRINTF("subdomain is empty", NULL);
return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED);
slot->error = RCODE_NAME_ERROR;
return 0;
}
// copy the subdomain from name to a new buffer
@ -211,7 +151,8 @@ ssize_t server_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket
if (decoded_len == (size_t) -1) {
free(decoded_buf);
DBG_PRINTF("error decoding base32: %lu", decoded_len);
return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE);
slot->error = RCODE_SERVER_FAILURE;
return 0;
}
picoquic_connection_id_t incoming_src_connection_id = {0};
@ -223,27 +164,13 @@ ssize_t server_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket
if (ret != 0) {
free(decoded_buf);
DBG_PRINTF("error parsing slipstream packet: %d", ret);
return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE);
slot->error = RCODE_SERVER_FAILURE;
return 0;
}
picoquic_cnx_t *cnx = picoquic_cnx_by_id_(quic, incoming_dest_connection_id);
picoquic_connection_id_t initial_cnxid;
if (cnx == NULL) {
initial_cnxid = incoming_dest_connection_id;
} else {
initial_cnxid = picoquic_get_initial_cnxid(cnx);
}
slipstream_cnxid_dns_request_buffer_t* cnxid_buffer = slipstream_dns_request_buffer_get_cnxid_buffer(
&slipstream_server_dns_request_buffer, &initial_cnxid, true);
if (cnxid_buffer == NULL) {
free(decoded_buf);
DBG_PRINTF("error getting or creating cnxid buffer", NULL);
return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE);
}
slot->cnx = cnx;
slot->query_id = query->id;
slipstream_dns_request_buffer_commit_slot_to_cnxid_buffer(&slipstream_server_dns_request_buffer, cnxid_buffer, slot);
if (is_poll_packet) {
free(decoded_buf);
@ -361,26 +288,6 @@ int slipstream_server_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_l
slipstream_server_ctx_t* default_ctx = callback_ctx;
switch (cb_mode) {
case picoquic_packet_loop_after_select:
const picoquic_socket_ctxs_t* s_ctxs = (picoquic_socket_ctxs_t*)callback_arg;
uint64_t current_time = picoquic_current_time();
for (int i = 0; i < slipstream_server_dns_request_buffer.cnxid_buffers_len; ++i) {
const slipstream_cnxid_dns_request_buffer_t* cnxid_buffer = slipstream_server_dns_request_buffer.cnxid_buffers[i];
assert(cnxid_buffer != NULL);
slot_t* slot = cnxid_buffer->tail;
while (slot != NULL) {
const uint64_t age = current_time - slot->created_time;
if (age < 100000) {
break;
}
// attempt to reply before resolver retries
respond_and_free_slot(slot, s_ctxs, RCODE_NAME_ERROR);
slot = slot->cnxid_buffer_prev;
}
}
break;
case picoquic_packet_loop_wake_up:
if (callback_ctx == NULL) {
return 0;
@ -704,8 +611,6 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c
#endif
picoquic_set_key_log_file_from_env(quic);
slipstream_dns_request_buffer_init(&slipstream_server_dns_request_buffer);
picoquic_packet_loop_param_t param = {0};
param.local_af = AF_INET;
param.local_port = server_port;
@ -713,7 +618,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c
param.is_client = 0;
param.decode = server_decode;
param.encode = server_encode;
param.delay_max = 5000;
// param.delay_max = 5000;
picoquic_network_thread_ctx_t thread_ctx = {0};
thread_ctx.quic = quic;
@ -727,7 +632,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c
default_context.thread_ctx = &thread_ctx;
signal(SIGTERM, server_sighandler);
picoquic_packet_loop_v3(&thread_ctx);
slipstream_packet_loop(&thread_ctx);
ret = thread_ctx.return_code;
/* And finish. */

248
src/slipstream_sockloop.c Normal file
View file

@ -0,0 +1,248 @@
#include "slipstream_sockloop.h"
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <sys/socket.h>
#include <sys/types.h>
#ifndef __USE_XOPEN2K
#define __USE_XOPEN2K
#endif
#ifndef __USE_POSIX
#define __USE_POSIX
#endif
#include <netdb.h>
#ifndef __APPLE__
#ifdef __LINUX__
#include <linux/prctl.h> /* Definition of PR_* constants */
#else
#endif
#endif
#include <pthread.h>
#ifndef SOCKET_TYPE
#define SOCKET_TYPE int
#endif
#ifndef INVALID_SOCKET
#define INVALID_SOCKET -1
#endif
#ifndef SOCKET_CLOSE
#define SOCKET_CLOSE(x) close(x)
#endif
#ifndef WSA_LAST_ERROR
#define WSA_LAST_ERROR(x) ((long)(x))
#endif
#include "picosocks.h"
#include "picoquic.h"
#include "picoquic_internal.h"
#include "picoquic_packet_loop.h"
#include "slipstream_slot.h"
# if defined(UDP_SEGMENT)
static int udp_gso_available = 1;
#else
static int udp_gso_available = 0;
#endif
int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_socket_ctx_t* s_ctx,
size_t send_buffer_size, size_t send_msg_size, size_t* send_msg_ptr) {
picoquic_quic_t* quic = thread_ctx->quic;
picoquic_packet_loop_param_t* param = thread_ctx->param;
const picoquic_packet_loop_cb_fn loop_callback = thread_ctx->loop_callback;
void* loop_callback_ctx = thread_ctx->loop_callback_ctx;
slot_t slots[PICOQUIC_PACKET_LOOP_RECV_MAX];
while (!thread_ctx->thread_should_close) {
if (loop_callback) {
loop_callback(quic, picoquic_packet_loop_before_select, loop_callback_ctx, s_ctx);
}
size_t nb_slots_written = 0;
while (nb_slots_written < PICOQUIC_PACKET_LOOP_RECV_MAX) {
int64_t delta_t = 0;
if (!param->is_client && nb_slots_written == 0) {
// Server mode: wait for a packet to arrive
delta_t = 10000000;
}
if (param->is_client && nb_slots_written == 0) {
const uint64_t current_time = picoquic_current_time();
const int64_t delay_max = param->delay_max == 0 ? 10000000 : param->delay_max;
delta_t = picoquic_get_next_wake_delay(quic, current_time, delay_max);
}
struct sockaddr_storage peer_addr;
struct sockaddr_storage local_addr;
int if_index_to = 0;
uint8_t received_ecn;
uint8_t buffer[1536];
int is_wake_up_event;
int socket_rank = -1;
int bytes_recv = picoquic_packet_loop_select(s_ctx, 1, &peer_addr, &local_addr, &if_index_to, &received_ecn,
buffer, sizeof(buffer), delta_t, &is_wake_up_event, thread_ctx, &socket_rank);
if (bytes_recv < 0) {
/* The interrupt error is expected if the loop is closing. */
return thread_ctx->thread_should_close ? PICOQUIC_NO_ERROR_TERMINATE_PACKET_LOOP : -1;
}
if (bytes_recv == 0 && is_wake_up_event) {
const int ret = loop_callback(quic, picoquic_packet_loop_wake_up, loop_callback_ctx, NULL);
if (ret < 0) {
return ret;
}
}
if (bytes_recv == 0) {
break;
}
slot_t* slot = param->is_client ? &slots[0] : &slots[nb_slots_written];
memset(slot, 0, sizeof(slot_t));
nb_slots_written++;
if (slot == NULL) {
return -1;
}
unsigned char* decoded;
bytes_recv = param->decode(thread_ctx->quic, slot, thread_ctx->loop_callback_ctx, s_ctx, &decoded,
(const unsigned char*)buffer, bytes_recv, &peer_addr, &local_addr);
if (bytes_recv < 0) {
DBG_PRINTF("decode() failed with error %d\n", bytes_recv);
return bytes_recv;
}
if (bytes_recv == 0) {
// poll packet
continue;
}
memcpy(buffer, decoded, bytes_recv);
free(decoded);
/* Submit the packet to the server */
uint8_t* received_buffer = buffer;
uint64_t current_time = picoquic_current_time();
picoquic_cnx_t* last_cnx = NULL;
int ret = picoquic_incoming_packet_ex(quic, received_buffer,
(size_t)bytes_recv, (struct sockaddr*)&peer_addr,
(struct sockaddr*)&local_addr, if_index_to, received_ecn,
&last_cnx, current_time);
if (ret < 0) {
return ret;
}
slot->cnx = last_cnx;
}
const uint64_t loop_time = picoquic_current_time();
size_t nb_slots_read = 0;
const size_t max_slots = param->is_client ? PICOQUIC_PACKET_LOOP_SEND_MAX : nb_slots_written;
while (nb_slots_read < max_slots) {
uint8_t send_buffer[send_buffer_size];
slot_t* slot = param->is_client ? &slots[0] : &slots[nb_slots_read];
assert(slot != NULL);
nb_slots_read++;
size_t send_length = 0;
struct sockaddr_storage peer_addr = {0};
struct sockaddr_storage local_addr = {0};
int if_index = param->dest_if;
if (slot->error == RCODE_OKAY) {
picoquic_connection_id_t log_cid;
picoquic_cnx_t* last_cnx = NULL;
int ret;
if (!param->is_client && slot->cnx) {
ret = picoquic_prepare_packet_ex(slot->cnx, loop_time,
send_buffer, send_buffer_size, &send_length,
&peer_addr, &local_addr, &if_index, send_msg_ptr);
last_cnx = slot->cnx;
}
else if (param->is_client) {
ret = picoquic_prepare_next_packet_ex(quic, loop_time,
send_buffer, send_buffer_size, &send_length,
&peer_addr, &local_addr, &if_index, &log_cid, &last_cnx,
send_msg_ptr);
}
if (ret < 0) {
return -1;
}
if (param->is_client && send_length == 0) {
break;
}
}
int sock_err = 0;
int bytes_sent;
unsigned char* encoded;
size_t segment_len = send_msg_size == 0 ? send_length : send_msg_size;
ssize_t encoded_len = param->encode(thread_ctx->quic, slot, loop_callback_ctx, &encoded,
(const unsigned char*)send_buffer, send_length, &segment_len, &peer_addr, &local_addr);
if (encoded_len <= 0) {
DBG_PRINTF("Encoding fails, ret=%d\n", encoded_len);
continue;
}
if (send_msg_size > 0) {
send_msg_size = segment_len; // new size after encoding
}
const SOCKET_TYPE send_socket = s_ctx->fd;
bytes_sent = picoquic_sendmsg(send_socket,
(struct sockaddr*)&peer_addr, (struct sockaddr*)&local_addr, if_index,
(const char*)encoded, (int)encoded_len, (int)send_msg_size, &sock_err);
free(encoded);
if (bytes_sent == 0) {
DBG_PRINTF("BYTES_SENT == 0 %d\n", bytes_sent);
return -1;
}
if (bytes_sent < 0) {
return bytes_sent;
}
}
}
return thread_ctx->return_code;
}
void* slipstream_packet_loop(picoquic_network_thread_ctx_t* thread_ctx) {
const picoquic_packet_loop_param_t* param = thread_ctx->param;
if (!param->do_not_use_gso && param->encode != NULL && !param->is_client) {
DBG_FATAL_PRINTF("%s", "GSO disabled because encoding is enabled and server mode");
}
picoquic_socket_ctx_t s_ctx = {0};
if (picoquic_packet_loop_open_sockets(param->local_port,
param->local_af, param->socket_buffer_size,
0, param->do_not_use_gso, &s_ctx) <= 0) {
thread_ctx->return_code = PICOQUIC_ERROR_UNEXPECTED_ERROR;
return NULL;
}
size_t send_buffer_size = param->socket_buffer_size;
size_t send_msg_size = 0;
size_t* send_msg_ptr = NULL;
if (udp_gso_available && !param->do_not_use_gso) {
send_buffer_size = 0xFFFF;
send_msg_ptr = &send_msg_size;
}
if (send_buffer_size == 0) {
send_buffer_size = 0xffff;
}
thread_ctx->thread_is_ready = 1;
thread_ctx->return_code = slipstream_packet_loop_(thread_ctx, &s_ctx, send_buffer_size, send_msg_size, send_msg_ptr);
thread_ctx->thread_is_ready = 0;
/* Close the sockets */
picoquic_packet_loop_close_socket(&s_ctx);
if (thread_ctx->is_threaded) {
pthread_exit((void*)&thread_ctx->return_code);
}
return (NULL);
}