diff --git a/extern/picoquic b/extern/picoquic index afb1a46..a7b7df4 160000 --- a/extern/picoquic +++ b/extern/picoquic @@ -1 +1 @@ -Subproject commit afb1a46057b1e506d48d8126b4f958fbc1e9d869 +Subproject commit a7b7df45d831a6d1b6ff37a0a4f8ce847a7f48c2 diff --git a/src/slipstream_client.c b/src/slipstream_client.c index 340e872..1727eb3 100644 --- a/src/slipstream_client.c +++ b/src/slipstream_client.c @@ -1,3 +1,4 @@ +// ReSharper disable CppDFAUnreachableCode #include #include #include @@ -7,6 +8,7 @@ #ifdef BUILD_LOGLIB #include #endif +#include #include #include #include @@ -24,6 +26,23 @@ #include "SPCDNS/src/dns.h" #include "SPCDNS/src/mappings.h" +typedef struct st_slipstream_client_stream_ctx_t { + struct st_slipstream_client_stream_ctx_t* next_stream; + struct st_slipstream_client_stream_ctx_t* previous_stream; + int fd; + uint64_t stream_id; + volatile sig_atomic_t set_active; +} slipstream_client_stream_ctx_t; + +typedef struct st_slipstream_client_ctx_t { + picoquic_cnx_t* cnx; + slipstream_client_stream_ctx_t* first_stream; + picoquic_network_thread_ctx_t* thread_ctx; + struct sockaddr_storage* server_addresses; + size_t server_address_count; + uint64_t last_request; +} slipstream_client_ctx_t; + char* client_domain_name = NULL; size_t client_domain_name_len = 0; @@ -69,7 +88,10 @@ ssize_t client_encode_segment(picoquic_quic_t* quic, dns_packet_t* packet, size_ return 0; } -ssize_t client_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, picoquic_socket_ctx_t* s_ctx, size_t s_ctx_len, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { +ssize_t client_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { + assert(callback_ctx); + slipstream_client_ctx_t* client_ctx = callback_ctx; + // optimize path for single segment if (src_buf_len <= *segment_len) { size_t packet_len = MAX_DNS_QUERY_SIZE; @@ -82,6 +104,9 @@ ssize_t client_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, picoquic_socke *dest_buf = packet; *segment_len = packet_len; + + client_ctx->last_request = picoquic_current_time(); + return packet_len; } @@ -116,12 +141,14 @@ ssize_t client_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, picoquic_socke *dest_buf = packets; *segment_len = first_packet_len; + client_ctx->last_request = picoquic_current_time(); return current_packet - packets; } -ssize_t client_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_t s_ctx_len, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { +ssize_t client_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { *dest_buf = NULL; + slipstream_client_ctx_t* client_ctx = callback_ctx; size_t bufsize = DNS_DECODEBUF_4K * sizeof(dns_decoded_t); dns_decoded_t decoded[DNS_DECODEBUF_4K] = {0}; @@ -138,6 +165,11 @@ ssize_t client_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_ return 0; } + if (query->rcode == RCODE_NAME_ERROR) { + // returned when the server has nothing to send + return 0; + } + if (query->rcode != RCODE_OKAY) { DBG_PRINTF("[%d] dns record rcode not okay: %d", query->id, query->rcode); return 0; @@ -168,7 +200,7 @@ ssize_t client_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_ return answer_txt->len; } - const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctx, s_ctx_len, peer_addr, local_addr); + const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctxs, peer_addr, local_addr); if (send_socket == INVALID_SOCKET) { DBG_PRINTF("[%d] no valid socket found for poll packet", query->id); return answer_txt->len; @@ -193,7 +225,7 @@ ssize_t client_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_ } unsigned char* encoded; - ssize_t encoded_len = client_encode(quic, cnx, s_ctx, s_ctx_len, &encoded, poll_packet_buf, poll_packet_len, + ssize_t encoded_len = client_encode(quic, cnx, callback_ctx, s_ctxs, &encoded, poll_packet_buf, poll_packet_len, &poll_packet_len, peer_addr, local_addr); if (encoded_len <= 0) { DBG_PRINTF("error encoding poll packet", NULL); @@ -216,26 +248,11 @@ ssize_t client_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_ free(encoded); } + client_ctx->last_request = picoquic_current_time(); return answer_txt->len; } -typedef struct st_slipstream_client_stream_ctx_t { - struct st_slipstream_client_stream_ctx_t* next_stream; - struct st_slipstream_client_stream_ctx_t* previous_stream; - int fd; - uint64_t stream_id; - volatile sig_atomic_t set_active; -} slipstream_client_stream_ctx_t; - -typedef struct st_slipstream_client_ctx_t { - picoquic_cnx_t* cnx; - slipstream_client_stream_ctx_t* first_stream; - picoquic_network_thread_ctx_t* thread_ctx; - struct sockaddr_storage* server_addresses; - size_t server_address_count; -} slipstream_client_ctx_t; - slipstream_client_stream_ctx_t* slipstream_client_create_stream_ctx(picoquic_cnx_t* cnx, slipstream_client_ctx_t* client_ctx, int sock_fd) { slipstream_client_stream_ctx_t* stream_ctx = malloc(sizeof(slipstream_client_stream_ctx_t)); @@ -305,6 +322,61 @@ int slipstream_client_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_l slipstream_client_ctx_t* client_ctx = callback_ctx; switch (cb_mode) { + case picoquic_packet_loop_before_select: + picoquic_cnx_t* cnx = client_ctx->cnx; + picoquic_path_t* path_x = cnx->path[0]; + picoquic_connection_id_t outgoing_dest_connection_id = path_x->p_remote_cnxid->cnx_id; + if (outgoing_dest_connection_id.id_len == 0) { + outgoing_dest_connection_id = cnx->initial_cnxid; + } + + uint64_t current_time = picoquic_current_time(); + uint64_t passed = current_time - client_ctx->last_request; + if (passed < 20000) { + break; + } + + + uint8_t* poll_packet_buf; + size_t poll_packet_len; + int ret = slipstream_packet_create_poll(&poll_packet_buf, &poll_packet_len, outgoing_dest_connection_id); + if (ret < 0) { + DBG_PRINTF("error creating poll packet", NULL); + return 0; + } + + struct sockaddr_storage *peer_addr = &cnx->path[0]->peer_addr; + struct sockaddr_storage *local_addr = &cnx->path[0]->local_addr; + + // DBG_PRINTF("[current:%u][last:%u][passed:%u] POLL!", current_time, client_ctx->last_request, passed); + + picoquic_socket_ctxs_t* s_ctxs = (picoquic_socket_ctxs_t*)callback_arg; + unsigned char* encoded; + ssize_t encoded_len = client_encode(quic, cnx, client_ctx, s_ctxs, &encoded, poll_packet_buf, poll_packet_len, + &poll_packet_len, NULL, NULL); + if (encoded_len <= 0) { + DBG_PRINTF("error encoding poll packet", NULL); + free(poll_packet_buf); + return 0; + } + + const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctxs, peer_addr, local_addr); + + int sock_err = 0; + ret = picoquic_sendmsg(send_socket, + (struct sockaddr*)peer_addr, (struct sockaddr*)local_addr, 0, + (const char*)encoded, encoded_len, 0, &sock_err); + if (ret < 0) { + DBG_PRINTF("Error sending poll packet, ret=%d, sock_err=%d %s", ret, sock_err, strerror(sock_err)); + free(poll_packet_buf); + free(encoded); + return 0; + } + + free(poll_packet_buf); + free(encoded); + + client_ctx->last_request = current_time; case picoquic_packet_loop_wake_up: if (callback_ctx == NULL) { return 0; @@ -773,6 +845,7 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f param.is_client = 1; param.decode = client_decode; param.encode = client_encode; + param.delay_max = 5000; picoquic_network_thread_ctx_t thread_ctx = {0}; thread_ctx.quic = quic; diff --git a/src/slipstream_server.c b/src/slipstream_server.c index 44c9f81..13365ea 100644 --- a/src/slipstream_server.c +++ b/src/slipstream_server.c @@ -29,7 +29,7 @@ size_t server_domain_name_len = 0; slipstream_dns_request_buffer_t slipstream_server_dns_request_buffer; -ssize_t respond_and_free_slot(slot_t* slot, const picoquic_socket_ctx_t* s_ctx, const size_t s_ctx_len, dns_rcode_t rcode) { +ssize_t respond_and_free_slot(slot_t* slot, const picoquic_socket_ctxs_t* s_ctxs, dns_rcode_t rcode) { const dns_query_t *query = (dns_query_t *) slot->dns_decoded; dns_query_t response = {0}; @@ -55,7 +55,7 @@ ssize_t respond_and_free_slot(slot_t* slot, const picoquic_socket_ctx_t* s_ctx, const struct sockaddr_storage *peer_addr = &slot->peer_addr; const struct sockaddr_storage *local_addr = &slot->local_addr; - const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctx, s_ctx_len, peer_addr, local_addr); + const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctxs, peer_addr, local_addr); if (send_socket == INVALID_SOCKET) { DBG_PRINTF("no valid socket found for poll packet", NULL); return -1; @@ -73,7 +73,7 @@ ssize_t respond_and_free_slot(slot_t* slot, const picoquic_socket_ctx_t* s_ctx, return 0; } -ssize_t server_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, picoquic_socket_ctx_t* s_ctx, size_t s_ctx_len, +ssize_t server_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_size, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { // we don't support segmentation in the server @@ -141,7 +141,7 @@ ssize_t server_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, picoquic_socke return packet_len; } -ssize_t server_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_t s_ctx_len, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *peer_addr, struct sockaddr_storage *local_addr) { +ssize_t server_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *peer_addr, struct sockaddr_storage *local_addr) { *dest_buf = NULL; slot_t* slot; @@ -149,7 +149,7 @@ ssize_t server_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_ slot = slipstream_server_dns_request_buffer.tail; assert(slot != NULL); assert(slot->cnxid_buffer != NULL); - respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_NAME_ERROR); + respond_and_free_slot(slot, s_ctxs, RCODE_NAME_ERROR); } slot = slipstream_dns_request_buffer_get_write_slot(&slipstream_server_dns_request_buffer); @@ -173,30 +173,31 @@ ssize_t server_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_ const dns_rcode_t rc = dns_decode(packet, &packet_len, (const dns_packet_t*) src_buf, src_buf_len); if (rc != RCODE_OKAY) { DBG_PRINTF("dns_decode() = (%d) %s", rc, dns_rcode_text(rc)); - return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_SERVER_FAILURE); + return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE); } const dns_query_t *query = (dns_query_t*) packet; if (!query->query) { DBG_PRINTF("dns record is not a query", NULL); - return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_REFUSED); + return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED); } if (query->qdcount != 1) { DBG_PRINTF("dns record should contain exactly one query", NULL); - return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_REFUSED); + return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED); } const dns_question_t *question = &query->questions[0]; if (question->type != RR_TXT) { - DBG_PRINTF("query type is not TXT", NULL); - return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_REFUSED); + // resolvers send anything for pinging, so we only respond to TXT queries + // DBG_PRINTF("query type is not TXT", NULL); + return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED); } const ssize_t data_len = strlen(question->name) - server_domain_name_len - 1 - 1; if (data_len <= 0) { DBG_PRINTF("subdomain is empty", NULL); - return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_REFUSED); + return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED); } // copy the subdomain from name to a new buffer @@ -210,7 +211,7 @@ ssize_t server_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_ if (decoded_len == (size_t) -1) { free(decoded_buf); DBG_PRINTF("error decoding base32: %lu", decoded_len); - return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_SERVER_FAILURE); + return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE); } picoquic_connection_id_t incoming_src_connection_id = {0}; @@ -222,7 +223,7 @@ ssize_t server_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_ if (ret != 0) { free(decoded_buf); DBG_PRINTF("error parsing slipstream packet: %d", ret); - return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_SERVER_FAILURE); + return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE); } picoquic_cnx_t *cnx = picoquic_cnx_by_id_(quic, incoming_dest_connection_id); @@ -238,7 +239,7 @@ ssize_t server_decode(picoquic_quic_t* quic, picoquic_socket_ctx_t* s_ctx, size_ if (cnxid_buffer == NULL) { free(decoded_buf); DBG_PRINTF("error getting or creating cnxid buffer", NULL); - return respond_and_free_slot(slot, s_ctx, s_ctx_len, RCODE_SERVER_FAILURE); + return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE); } slot->query_id = query->id; @@ -330,6 +331,14 @@ static void slipstream_server_free_context(slipstream_server_ctx_t* server_ctx) slipstream_server_free_stream_context(server_ctx, stream_ctx); } + if (server_ctx->prev_ctx) { + server_ctx->prev_ctx->next_ctx = server_ctx->next_ctx; + } + + if (server_ctx->next_ctx) { + server_ctx->next_ctx->prev_ctx = server_ctx->prev_ctx; + } + /* release the memory */ free(server_ctx); } @@ -349,7 +358,7 @@ void slipstream_server_mark_active_pass(slipstream_server_ctx_t* server_ctx) { int slipstream_server_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_loop_cb_enum cb_mode, void* callback_ctx, void* callback_arg) { - slipstream_server_ctx_t* server_ctx = callback_ctx; + slipstream_server_ctx_t* default_ctx = callback_ctx; switch (cb_mode) { case picoquic_packet_loop_after_select: @@ -362,14 +371,12 @@ int slipstream_server_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_l slot_t* slot = cnxid_buffer->tail; while (slot != NULL) { const uint64_t age = current_time - slot->created_time; - if (age < 10000) { - DBG_PRINTF("[%d][age:%d] found young slot", slot->query_id, age); + if (age < 100000) { break; } // attempt to reply before resolver retries - DBG_PRINTF("[%d][age:%d] freeing old slot", slot->query_id, age); - respond_and_free_slot(slot, s_ctxs->s_ctx, s_ctxs->len, RCODE_NAME_ERROR); + respond_and_free_slot(slot, s_ctxs, RCODE_NAME_ERROR); slot = slot->cnxid_buffer_prev; } } @@ -379,10 +386,11 @@ int slipstream_server_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_l return 0; } - while (server_ctx->next_ctx != NULL) { - /* skip default ctx */ - server_ctx = server_ctx->next_ctx; + /* skip default ctx */ + slipstream_server_ctx_t* server_ctx = default_ctx->next_ctx; + while (server_ctx != NULL) { slipstream_server_mark_active_pass(server_ctx); + server_ctx = server_ctx->next_ctx; } break; @@ -705,7 +713,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c param.is_client = 0; param.decode = server_decode; param.encode = server_encode; - // param.delay_max = 1; + param.delay_max = 5000; picoquic_network_thread_ctx_t thread_ctx = {0}; thread_ctx.quic = quic;