Improve server polling and in-time responses

This commit is contained in:
Jop Zitman 2024-12-20 16:02:43 +08:00
parent 16c45883b8
commit c316cbe119
3 changed files with 125 additions and 44 deletions

2
extern/picoquic vendored

@ -1 +1 @@
Subproject commit afb1a46057b1e506d48d8126b4f958fbc1e9d869
Subproject commit a7b7df45d831a6d1b6ff37a0a4f8ce847a7f48c2

View file

@ -1,3 +1,4 @@
// ReSharper disable CppDFAUnreachableCode
#include <stdint.h>
#include <stdio.h>
#include <picoquic.h>
@ -7,6 +8,7 @@
#ifdef BUILD_LOGLIB
#include <autoqlog.h>
#endif
#include <assert.h>
#include <picoquic_internal.h>
#include <pthread.h>
#include <stdbool.h>
@ -24,6 +26,23 @@
#include "SPCDNS/src/dns.h"
#include "SPCDNS/src/mappings.h"
typedef struct st_slipstream_client_stream_ctx_t {
struct st_slipstream_client_stream_ctx_t* next_stream;
struct st_slipstream_client_stream_ctx_t* previous_stream;
int fd;
uint64_t stream_id;
volatile sig_atomic_t set_active;
} slipstream_client_stream_ctx_t;
typedef struct st_slipstream_client_ctx_t {
picoquic_cnx_t* cnx;
slipstream_client_stream_ctx_t* first_stream;
picoquic_network_thread_ctx_t* thread_ctx;
struct sockaddr_storage* server_addresses;
size_t server_address_count;
uint64_t last_request;
} slipstream_client_ctx_t;
char* client_domain_name = NULL;
size_t client_domain_name_len = 0;
@ -69,7 +88,10 @@ ssize_t client_encode_segment(picoquic_quic_t* quic, dns_packet_t* packet, size_
return 0;
}
ssize_t client_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, picoquic_socket_ctx_t* s_ctx, size_t s_ctx_len, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
ssize_t client_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
assert(callback_ctx);
slipstream_client_ctx_t* client_ctx = callback_ctx;
// optimize path for single segment
if (src_buf_len <= *segment_len) {
size_t packet_len = MAX_DNS_QUERY_SIZE;
@ -82,6 +104,9 @@ ssize_t client_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, picoquic_socke
*dest_buf = packet;
*segment_len = packet_len;
client_ctx->last_request = picoquic_current_time();
return packet_len;
}
@ -116,12 +141,14 @@ ssize_t client_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, picoquic_socke
*dest_buf = packets;
*segment_len = first_packet_len;
client_ctx->last_request = picoquic_current_time();
return current_packet - packets;
}
ssize_t client_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_t s_ctx_len, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
ssize_t client_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
*dest_buf = NULL;
slipstream_client_ctx_t* client_ctx = callback_ctx;
size_t bufsize = DNS_DECODEBUF_4K * sizeof(dns_decoded_t);
dns_decoded_t decoded[DNS_DECODEBUF_4K] = {0};
@ -138,6 +165,11 @@ ssize_t client_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_
return 0;
}
if (query->rcode == RCODE_NAME_ERROR) {
// returned when the server has nothing to send
return 0;
}
if (query->rcode != RCODE_OKAY) {
DBG_PRINTF("[%d] dns record rcode not okay: %d", query->id, query->rcode);
return 0;
@ -168,7 +200,7 @@ ssize_t client_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_
return answer_txt->len;
}
const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctx, s_ctx_len, peer_addr, local_addr);
const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctxs, peer_addr, local_addr);
if (send_socket == INVALID_SOCKET) {
DBG_PRINTF("[%d] no valid socket found for poll packet", query->id);
return answer_txt->len;
@ -193,7 +225,7 @@ ssize_t client_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_
}
unsigned char* encoded;
ssize_t encoded_len = client_encode(quic, cnx, s_ctx, s_ctx_len, &encoded, poll_packet_buf, poll_packet_len,
ssize_t encoded_len = client_encode(quic, cnx, callback_ctx, s_ctxs, &encoded, poll_packet_buf, poll_packet_len,
&poll_packet_len, peer_addr, local_addr);
if (encoded_len <= 0) {
DBG_PRINTF("error encoding poll packet", NULL);
@ -216,26 +248,11 @@ ssize_t client_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_
free(encoded);
}
client_ctx->last_request = picoquic_current_time();
return answer_txt->len;
}
typedef struct st_slipstream_client_stream_ctx_t {
struct st_slipstream_client_stream_ctx_t* next_stream;
struct st_slipstream_client_stream_ctx_t* previous_stream;
int fd;
uint64_t stream_id;
volatile sig_atomic_t set_active;
} slipstream_client_stream_ctx_t;
typedef struct st_slipstream_client_ctx_t {
picoquic_cnx_t* cnx;
slipstream_client_stream_ctx_t* first_stream;
picoquic_network_thread_ctx_t* thread_ctx;
struct sockaddr_storage* server_addresses;
size_t server_address_count;
} slipstream_client_ctx_t;
slipstream_client_stream_ctx_t* slipstream_client_create_stream_ctx(picoquic_cnx_t* cnx,
slipstream_client_ctx_t* client_ctx, int sock_fd) {
slipstream_client_stream_ctx_t* stream_ctx = malloc(sizeof(slipstream_client_stream_ctx_t));
@ -305,6 +322,61 @@ int slipstream_client_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_l
slipstream_client_ctx_t* client_ctx = callback_ctx;
switch (cb_mode) {
case picoquic_packet_loop_before_select:
picoquic_cnx_t* cnx = client_ctx->cnx;
picoquic_path_t* path_x = cnx->path[0];
picoquic_connection_id_t outgoing_dest_connection_id = path_x->p_remote_cnxid->cnx_id;
if (outgoing_dest_connection_id.id_len == 0) {
outgoing_dest_connection_id = cnx->initial_cnxid;
}
uint64_t current_time = picoquic_current_time();
uint64_t passed = current_time - client_ctx->last_request;
if (passed < 20000) {
break;
}
uint8_t* poll_packet_buf;
size_t poll_packet_len;
int ret = slipstream_packet_create_poll(&poll_packet_buf, &poll_packet_len, outgoing_dest_connection_id);
if (ret < 0) {
DBG_PRINTF("error creating poll packet", NULL);
return 0;
}
struct sockaddr_storage *peer_addr = &cnx->path[0]->peer_addr;
struct sockaddr_storage *local_addr = &cnx->path[0]->local_addr;
// DBG_PRINTF("[current:%u][last:%u][passed:%u] POLL!", current_time, client_ctx->last_request, passed);
picoquic_socket_ctxs_t* s_ctxs = (picoquic_socket_ctxs_t*)callback_arg;
unsigned char* encoded;
ssize_t encoded_len = client_encode(quic, cnx, client_ctx, s_ctxs, &encoded, poll_packet_buf, poll_packet_len,
&poll_packet_len, NULL, NULL);
if (encoded_len <= 0) {
DBG_PRINTF("error encoding poll packet", NULL);
free(poll_packet_buf);
return 0;
}
const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctxs, peer_addr, local_addr);
int sock_err = 0;
ret = picoquic_sendmsg(send_socket,
(struct sockaddr*)peer_addr, (struct sockaddr*)local_addr, 0,
(const char*)encoded, encoded_len, 0, &sock_err);
if (ret < 0) {
DBG_PRINTF("Error sending poll packet, ret=%d, sock_err=%d %s", ret, sock_err, strerror(sock_err));
free(poll_packet_buf);
free(encoded);
return 0;
}
free(poll_packet_buf);
free(encoded);
client_ctx->last_request = current_time;
case picoquic_packet_loop_wake_up:
if (callback_ctx == NULL) {
return 0;
@ -773,6 +845,7 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f
param.is_client = 1;
param.decode = client_decode;
param.encode = client_encode;
param.delay_max = 5000;
picoquic_network_thread_ctx_t thread_ctx = {0};
thread_ctx.quic = quic;

View file

@ -29,7 +29,7 @@ size_t server_domain_name_len = 0;
slipstream_dns_request_buffer_t slipstream_server_dns_request_buffer;
ssize_t respond_and_free_slot(slot_t* slot, const picoquic_socket_ctx_t* s_ctx, const size_t s_ctx_len, dns_rcode_t rcode) {
ssize_t respond_and_free_slot(slot_t* slot, const picoquic_socket_ctxs_t* s_ctxs, dns_rcode_t rcode) {
const dns_query_t *query = (dns_query_t *) slot->dns_decoded;
dns_query_t response = {0};
@ -55,7 +55,7 @@ ssize_t respond_and_free_slot(slot_t* slot, const picoquic_socket_ctx_t* s_ctx,
const struct sockaddr_storage *peer_addr = &slot->peer_addr;
const struct sockaddr_storage *local_addr = &slot->local_addr;
const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctx, s_ctx_len, peer_addr, local_addr);
const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctxs, peer_addr, local_addr);
if (send_socket == INVALID_SOCKET) {
DBG_PRINTF("no valid socket found for poll packet", NULL);
return -1;
@ -73,7 +73,7 @@ ssize_t respond_and_free_slot(slot_t* slot, const picoquic_socket_ctx_t* s_ctx,
return 0;
}
ssize_t server_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, picoquic_socket_ctx_t* s_ctx, size_t s_ctx_len,
ssize_t server_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs,
unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_size,
struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
// we don't support segmentation in the server
@ -141,7 +141,7 @@ ssize_t server_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, picoquic_socke
return packet_len;
}
ssize_t server_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_t s_ctx_len, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *peer_addr, struct sockaddr_storage *local_addr) {
ssize_t server_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *peer_addr, struct sockaddr_storage *local_addr) {
*dest_buf = NULL;
slot_t* slot;
@ -149,7 +149,7 @@ ssize_t server_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_
slot = slipstream_server_dns_request_buffer.tail;
assert(slot != NULL);
assert(slot->cnxid_buffer != NULL);
respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_NAME_ERROR);
respond_and_free_slot(slot, s_ctxs, RCODE_NAME_ERROR);
}
slot = slipstream_dns_request_buffer_get_write_slot(&slipstream_server_dns_request_buffer);
@ -173,30 +173,31 @@ ssize_t server_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_
const dns_rcode_t rc = dns_decode(packet, &packet_len, (const dns_packet_t*) src_buf, src_buf_len);
if (rc != RCODE_OKAY) {
DBG_PRINTF("dns_decode() = (%d) %s", rc, dns_rcode_text(rc));
return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_SERVER_FAILURE);
return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE);
}
const dns_query_t *query = (dns_query_t*) packet;
if (!query->query) {
DBG_PRINTF("dns record is not a query", NULL);
return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_REFUSED);
return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED);
}
if (query->qdcount != 1) {
DBG_PRINTF("dns record should contain exactly one query", NULL);
return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_REFUSED);
return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED);
}
const dns_question_t *question = &query->questions[0];
if (question->type != RR_TXT) {
DBG_PRINTF("query type is not TXT", NULL);
return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_REFUSED);
// resolvers send anything for pinging, so we only respond to TXT queries
// DBG_PRINTF("query type is not TXT", NULL);
return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED);
}
const ssize_t data_len = strlen(question->name) - server_domain_name_len - 1 - 1;
if (data_len <= 0) {
DBG_PRINTF("subdomain is empty", NULL);
return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_REFUSED);
return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED);
}
// copy the subdomain from name to a new buffer
@ -210,7 +211,7 @@ ssize_t server_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_
if (decoded_len == (size_t) -1) {
free(decoded_buf);
DBG_PRINTF("error decoding base32: %lu", decoded_len);
return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_SERVER_FAILURE);
return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE);
}
picoquic_connection_id_t incoming_src_connection_id = {0};
@ -222,7 +223,7 @@ ssize_t server_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_
if (ret != 0) {
free(decoded_buf);
DBG_PRINTF("error parsing slipstream packet: %d", ret);
return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_SERVER_FAILURE);
return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE);
}
picoquic_cnx_t *cnx = picoquic_cnx_by_id_(quic, incoming_dest_connection_id);
@ -238,7 +239,7 @@ ssize_t server_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_
if (cnxid_buffer == NULL) {
free(decoded_buf);
DBG_PRINTF("error getting or creating cnxid buffer", NULL);
return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_SERVER_FAILURE);
return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE);
}
slot->query_id = query->id;
@ -330,6 +331,14 @@ static void slipstream_server_free_context(slipstream_server_ctx_t* server_ctx)
slipstream_server_free_stream_context(server_ctx, stream_ctx);
}
if (server_ctx->prev_ctx) {
server_ctx->prev_ctx->next_ctx = server_ctx->next_ctx;
}
if (server_ctx->next_ctx) {
server_ctx->next_ctx->prev_ctx = server_ctx->prev_ctx;
}
/* release the memory */
free(server_ctx);
}
@ -349,7 +358,7 @@ void slipstream_server_mark_active_pass(slipstream_server_ctx_t* server_ctx) {
int slipstream_server_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_loop_cb_enum cb_mode,
void* callback_ctx, void* callback_arg) {
slipstream_server_ctx_t* server_ctx = callback_ctx;
slipstream_server_ctx_t* default_ctx = callback_ctx;
switch (cb_mode) {
case picoquic_packet_loop_after_select:
@ -362,14 +371,12 @@ int slipstream_server_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_l
slot_t* slot = cnxid_buffer->tail;
while (slot != NULL) {
const uint64_t age = current_time - slot->created_time;
if (age < 10000) {
DBG_PRINTF("[%d][age:%d] found young slot", slot->query_id, age);
if (age < 100000) {
break;
}
// attempt to reply before resolver retries
DBG_PRINTF("[%d][age:%d] freeing old slot", slot->query_id, age);
respond_and_free_slot(slot, s_ctxs->s_ctx, s_ctxs->len, RCODE_NAME_ERROR);
respond_and_free_slot(slot, s_ctxs, RCODE_NAME_ERROR);
slot = slot->cnxid_buffer_prev;
}
}
@ -379,10 +386,11 @@ int slipstream_server_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_l
return 0;
}
while (server_ctx->next_ctx != NULL) {
/* skip default ctx */
server_ctx = server_ctx->next_ctx;
/* skip default ctx */
slipstream_server_ctx_t* server_ctx = default_ctx->next_ctx;
while (server_ctx != NULL) {
slipstream_server_mark_active_pass(server_ctx);
server_ctx = server_ctx->next_ctx;
}
break;
@ -705,7 +713,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c
param.is_client = 0;
param.decode = server_decode;
param.encode = server_encode;
// param.delay_max = 1;
param.delay_max = 5000;
picoquic_network_thread_ctx_t thread_ctx = {0};
thread_ctx.quic = quic;