diff --git a/CMakeLists.txt b/CMakeLists.txt index 01d85cf..651e13f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,17 +38,18 @@ add_subdirectory(extern/picoquic) add_executable(slipstream src/slipstream.c src/slipstream_client.c - src/slipstream_server.c - src/slipstream_dns_request_buffer.c src/slipstream_inline_dots.c src/slipstream_packet.c src/slipstream_resolver_addresses.c + src/slipstream_server.c + src/slipstream_sockloop.c src/slipstream_utils.c include/slipstream.h - include/slipstream_dns_request_buffer.h include/slipstream_inline_dots.h include/slipstream_packet.h include/slipstream_resolver_addresses.h + include/slipstream_slot.h + include/slipstream_sockloop.h include/slipstream_utils.h extern/lua-resty-base-encoding/b32_data.h diff --git a/extern/picoquic b/extern/picoquic index a7b7df4..c00a6f6 160000 --- a/extern/picoquic +++ b/extern/picoquic @@ -1 +1 @@ -Subproject commit a7b7df45d831a6d1b6ff37a0a4f8ce847a7f48c2 +Subproject commit c00a6f69032a255ee15892a0fec3c8e8cc1da4a4 diff --git a/include/slipstream.h b/include/slipstream.h index 1237d91..5d690be 100644 --- a/include/slipstream.h +++ b/include/slipstream.h @@ -19,7 +19,6 @@ extern "C" { #define SLIPSTREAM_QLOG_DIR "./qlog"; - int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_filename, const char* domain_name, const char* cc_algo_id); diff --git a/include/slipstream_dns_request_buffer.h b/include/slipstream_dns_request_buffer.h deleted file mode 100644 index 5c9f5a0..0000000 --- a/include/slipstream_dns_request_buffer.h +++ /dev/null @@ -1,62 +0,0 @@ -#ifndef SLIPSTREAM_DNS_REQUEST_BUFFER -#define SLIPSTREAM_DNS_REQUEST_BUFFER - -#include -#include -#include "SPCDNS/src/dns.h" - -#define GLOBAL_BUFFER_SIZE 32 - -typedef struct st_slipstream_cnxid_dns_request_buffer_t slipstream_cnxid_dns_request_buffer_t; - -typedef struct st_element_t { - dns_decoded_t dns_decoded[DNS_DECODEBUF_4K]; - struct sockaddr_storage peer_addr; - struct sockaddr_storage local_addr; - struct st_element_t* buffer_prev; - struct st_element_t* buffer_next; - struct st_element_t* cnxid_buffer_prev; - struct st_element_t* cnxid_buffer_next; - slipstream_cnxid_dns_request_buffer_t* cnxid_buffer; - uint64_t created_time; - int query_id; -} slot_t; - -typedef struct st_slipstream_cnxid_dns_request_buffer_t { - slot_t* head; - slot_t* tail; -} slipstream_cnxid_dns_request_buffer_t; - -typedef struct { - slot_t slots[GLOBAL_BUFFER_SIZE]; - slot_t* head; - slot_t* tail; - slot_t* free; - picohash_table* cnxid_to_cnxid_buffer; - slipstream_cnxid_dns_request_buffer_t** cnxid_buffers; - size_t cnxid_buffers_len; -} slipstream_dns_request_buffer_t; - -typedef struct st_cnxid_to_cnxid_buffer_t { - picoquic_connection_id_t cnx_id; - slipstream_cnxid_dns_request_buffer_t* cnxid_buffer; -} cnxid_to_cnxid_buffer_t; - - -void slipstream_dns_request_buffer_init(slipstream_dns_request_buffer_t* buffer); - -slipstream_cnxid_dns_request_buffer_t* slipstream_dns_request_buffer_get_cnxid_buffer( - slipstream_dns_request_buffer_t* buffer, picoquic_connection_id_t* initial_cnxid, bool create); - -void slipstream_dns_request_buffer_free_slot(slipstream_dns_request_buffer_t* buffer, slot_t* slot); - -slot_t* slipstream_dns_request_buffer_get_write_slot(slipstream_dns_request_buffer_t* buffer); - -void slipstream_dns_request_buffer_commit_slot_to_cnxid_buffer(slipstream_dns_request_buffer_t* buffer, - slipstream_cnxid_dns_request_buffer_t* cnxid_buffer, - slot_t* slot); - -slot_t* slipstream_dns_request_buffer_get_read_slot(slipstream_dns_request_buffer_t* buffer, - slipstream_cnxid_dns_request_buffer_t* cnxid_buffer); - -#endif //SLIPSTREAM_DNS_REQUEST_BUFFER diff --git a/include/slipstream_slot.h b/include/slipstream_slot.h new file mode 100644 index 0000000..4fc2d95 --- /dev/null +++ b/include/slipstream_slot.h @@ -0,0 +1,17 @@ +#ifndef SLIPSTREAM_SLOT +#define SLIPSTREAM_SLOT + +#include "SPCDNS/src/dns.h" +#include "picoquic.h" + +typedef struct st_slot_t { + dns_decoded_t dns_decoded[DNS_DECODEBUF_4K]; + dns_rcode_t error; + struct sockaddr_storage peer_addr; + struct sockaddr_storage local_addr; + picoquic_cnx_t* cnx; + uint64_t created_time; + int query_id; +} slot_t; + +#endif // SLIPSTREAM_SLOT diff --git a/include/slipstream_sockloop.h b/include/slipstream_sockloop.h new file mode 100644 index 0000000..3ca9aba --- /dev/null +++ b/include/slipstream_sockloop.h @@ -0,0 +1,8 @@ +#ifndef SLIPSTREAM_SOCKLOOP_H +#define SLIPSTREAM_SOCKLOOP_H + +#include "picoquic_packet_loop.h" + +void* slipstream_packet_loop(picoquic_network_thread_ctx_t* thread_ctx); + +#endif //SLIPSTREAM_SOCKLOOP_H \ No newline at end of file diff --git a/src/slipstream_client.c b/src/slipstream_client.c index c0da91d..ed58eff 100644 --- a/src/slipstream_client.c +++ b/src/slipstream_client.c @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -88,7 +89,7 @@ ssize_t client_encode_segment(picoquic_quic_t* quic, dns_packet_t* packet, size_ return 0; } -ssize_t client_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { +ssize_t client_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { assert(callback_ctx); slipstream_client_ctx_t* client_ctx = callback_ctx; @@ -146,7 +147,7 @@ ssize_t client_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, void* callback return current_packet - packets; } -ssize_t client_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { +ssize_t client_decode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, picoquic_socket_ctx_t* s_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { *dest_buf = NULL; slipstream_client_ctx_t* client_ctx = callback_ctx; @@ -176,7 +177,7 @@ ssize_t client_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket } if (query->ancount != 1) { - DBG_PRINTF("[%d] dns record should contain exactly one answer", query->id); + // DBG_PRINTF("[%d] dns record should contain exactly one answer", query->id); return 0; } @@ -200,11 +201,6 @@ ssize_t client_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket return answer_txt->len; } - const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctxs, peer_addr, local_addr); - if (send_socket == INVALID_SOCKET) { - DBG_PRINTF("[%d] no valid socket found for poll packet", query->id); - return answer_txt->len; - } // get active destination connection id on this ctx picoquic_cnx_t* cnx = picoquic_cnx_by_id_(quic, incoming_dest_connection_id); @@ -225,14 +221,14 @@ ssize_t client_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket } unsigned char* encoded; - ssize_t encoded_len = client_encode(quic, cnx, callback_ctx, s_ctxs, &encoded, poll_packet_buf, poll_packet_len, - &poll_packet_len, peer_addr, local_addr); + ssize_t encoded_len = client_encode(quic, slot_p, callback_ctx, &encoded, poll_packet_buf, poll_packet_len, &poll_packet_len, peer_addr, local_addr); if (encoded_len <= 0) { DBG_PRINTF("error encoding poll packet", NULL); free(poll_packet_buf); return answer_txt->len; } + const SOCKET_TYPE send_socket = s_ctx->fd; int sock_err = 0; ret = picoquic_sendmsg(send_socket, (struct sockaddr*)peer_addr, (struct sockaddr*)local_addr, 0, @@ -300,6 +296,8 @@ static void slipstream_client_free_context(slipstream_client_ctx_t* client_ctx) slipstream_client_free_stream_ctx(client_ctx, stream_ctx); } + free(client_ctx->server_addresses); + /* release the memory */ free(client_ctx); } @@ -348,19 +346,16 @@ int slipstream_client_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_l struct sockaddr_storage *peer_addr = &cnx->path[0]->peer_addr; struct sockaddr_storage *local_addr = &cnx->path[0]->local_addr; - // DBG_PRINTF("[current:%u][last:%u][passed:%u] POLL!", current_time, client_ctx->last_request, passed); - - picoquic_socket_ctxs_t* s_ctxs = (picoquic_socket_ctxs_t*)callback_arg; + picoquic_socket_ctx_t* s_ctx = (picoquic_socket_ctx_t*)callback_arg; unsigned char* encoded; - ssize_t encoded_len = client_encode(quic, cnx, client_ctx, s_ctxs, &encoded, poll_packet_buf, poll_packet_len, - &poll_packet_len, NULL, NULL); + ssize_t encoded_len = client_encode(quic, NULL, client_ctx, &encoded, poll_packet_buf, poll_packet_len, &poll_packet_len, NULL, NULL); if (encoded_len <= 0) { DBG_PRINTF("error encoding poll packet", NULL); free(poll_packet_buf); return 0; } - const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctxs, peer_addr, local_addr); + const SOCKET_TYPE send_socket = s_ctx->fd; int sock_err = 0; ret = picoquic_sendmsg(send_socket, @@ -711,7 +706,7 @@ static int slipstream_connect(struct sockaddr_storage* server_address, } // 400ms - picoquic_enable_keep_alive(*cnx, 400000); + // picoquic_enable_keep_alive(*cnx, 400000); /* Document connection in client's context */ client_ctx->cnx = *cnx; @@ -871,7 +866,7 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f } signal(SIGTERM, client_sighandler); - picoquic_packet_loop_v3(&thread_ctx); + slipstream_packet_loop(&thread_ctx); ret = thread_ctx.return_code; /* And finish. */ diff --git a/src/slipstream_dns_request_buffer.c b/src/slipstream_dns_request_buffer.c deleted file mode 100644 index 54eee16..0000000 --- a/src/slipstream_dns_request_buffer.c +++ /dev/null @@ -1,189 +0,0 @@ -#include -#include -#include -#include - -#include "picoquic.h" -#include "picoquic_utils.h" -#include "slipstream_utils.h" -#include "slipstream_dns_request_buffer.h" - - -static uint64_t cnxid_to_cnxid_buffer_hash(const void* key) { - const cnxid_to_cnxid_buffer_t* l_cid = key; - return picoquic_connection_id_hash(&l_cid->cnx_id); -} - -static int cnxid_to_cnxid_buffer_compare(const void* key1, const void* key2) { - const cnxid_to_cnxid_buffer_t* l_cid1 = key1; - const cnxid_to_cnxid_buffer_t* l_cid2 = key2; - - return picoquic_compare_connection_id(&l_cid1->cnx_id, &l_cid2->cnx_id); -} - -void slipstream_dns_request_buffer_init(slipstream_dns_request_buffer_t* buffer) { - memset(buffer, 0, sizeof(slipstream_dns_request_buffer_t)); - - buffer->head = NULL; - buffer->tail = NULL; - - for (int i = 0; i < GLOBAL_BUFFER_SIZE; i++) { - slot_t* element = &buffer->slots[i]; - element->buffer_next = buffer->free; - element->buffer_prev = NULL; - buffer->free = element; - } - - buffer->cnxid_to_cnxid_buffer = picohash_create(32, cnxid_to_cnxid_buffer_hash, cnxid_to_cnxid_buffer_compare); -} - -slipstream_cnxid_dns_request_buffer_t* slipstream_dns_request_buffer_get_cnxid_buffer( - slipstream_dns_request_buffer_t* buffer, picoquic_connection_id_t* initial_cnxid, bool create) { - cnxid_to_cnxid_buffer_t key = { - .cnx_id = *initial_cnxid, - .cnxid_buffer = NULL - }; - const picohash_item* item = picohash_retrieve(buffer->cnxid_to_cnxid_buffer, &key); - if (item != NULL) { - return ((cnxid_to_cnxid_buffer_t*)item->key)->cnxid_buffer; - } - if (!create) { - return NULL; - } - - char* initial_cnxid_str = picoquic_connection_id_to_string(initial_cnxid); - DBG_PRINTF("creating new hash key for %s\n", initial_cnxid_str); - free(initial_cnxid_str); - - cnxid_to_cnxid_buffer_t* new_key = malloc(sizeof(cnxid_to_cnxid_buffer_t)); - memcpy(&new_key->cnx_id, initial_cnxid, sizeof(picoquic_connection_id_t)); - - new_key->cnxid_buffer = malloc(sizeof(slipstream_cnxid_dns_request_buffer_t)); - memset(new_key->cnxid_buffer, 0, sizeof(slipstream_cnxid_dns_request_buffer_t)); - if (new_key->cnxid_buffer == NULL) { - fprintf(stderr, "error allocating memory for cnx buffer\n"); - return NULL; - } - - if (picohash_insert(buffer->cnxid_to_cnxid_buffer, new_key) < 0) { - free(new_key->cnxid_buffer); - fprintf(stderr, "error adding a cnx buffer for a new cnx id\n"); - return NULL; - } - - buffer->cnxid_buffers_len++; - slipstream_cnxid_dns_request_buffer_t** cnxid_buffers = realloc(buffer->cnxid_buffers, - buffer->cnxid_buffers_len * sizeof(slipstream_cnxid_dns_request_buffer_t*)); - if (cnxid_buffers == NULL) { - return NULL; - } - buffer->cnxid_buffers = cnxid_buffers; - buffer->cnxid_buffers[buffer->cnxid_buffers_len - 1] = new_key->cnxid_buffer; - - return new_key->cnxid_buffer; -} - - -void slipstream_dns_request_buffer_free_slot(slipstream_dns_request_buffer_t* buffer, slot_t* slot) { - slipstream_cnxid_dns_request_buffer_t* cnxid_buffer = slot->cnxid_buffer; - if (cnxid_buffer != NULL) { - if (slot->cnxid_buffer_prev != NULL) { - slot->cnxid_buffer_prev->cnxid_buffer_next = slot->cnxid_buffer_next; - } - - if (slot->cnxid_buffer_next != NULL) { - slot->cnxid_buffer_next->cnxid_buffer_prev = slot->cnxid_buffer_prev; - } - - if (cnxid_buffer->head == slot) { - cnxid_buffer->head = slot->cnxid_buffer_next; - } - - if (cnxid_buffer->tail == slot) { - cnxid_buffer->tail = slot->cnxid_buffer_prev; - } - - slot->cnxid_buffer = NULL; - } - - if (slot->buffer_prev != NULL) { - slot->buffer_prev->buffer_next = slot->buffer_next; - } - - if (slot->buffer_next != NULL) { - slot->buffer_next->buffer_prev = slot->buffer_prev; - } - - if (buffer->head == slot) { - buffer->head = slot->buffer_next; - } - - if (buffer->tail == slot) { - buffer->tail = slot->buffer_prev; - } - - if (buffer->free) { - buffer->free->buffer_prev = slot; - } - slot->buffer_next = buffer->free; - buffer->free = slot; - slot->query_id = 0; -} - -slot_t* slipstream_dns_request_buffer_get_write_slot(slipstream_dns_request_buffer_t* buffer) { - if (!buffer->free) { - return NULL; - } - - // Get the first free element - slot_t* slot = buffer->free; - buffer->free = slot->buffer_next; - - // Add the element to the head of the global buffer - slot->buffer_next = buffer->head; - slot->buffer_prev = NULL; - buffer->head = slot; - if (slot->buffer_next != NULL) { - slot->buffer_next->buffer_prev = slot; - } - - // If the tail is NULL (first element), set it to the new element - if (buffer->tail == NULL) { - buffer->tail = slot; - } - - return slot; -} - -// TODO: what happens if we don't commit -void slipstream_dns_request_buffer_commit_slot_to_cnxid_buffer(slipstream_dns_request_buffer_t* buffer, - slipstream_cnxid_dns_request_buffer_t* cnxid_buffer, - slot_t* slot) { - slot->cnxid_buffer = cnxid_buffer; - - // Add this slot to a specific cnxid buffer - slot->cnxid_buffer_next = cnxid_buffer->head; - slot->cnxid_buffer_prev = NULL; - cnxid_buffer->head = slot; - if (slot->cnxid_buffer_next != NULL) { - slot->cnxid_buffer_next->cnxid_buffer_prev = slot; - } - - // If the tail is NULL (first element), set it to the new element - if (cnxid_buffer->tail == NULL) { - cnxid_buffer->tail = slot; - } -} - -slot_t* slipstream_dns_request_buffer_get_read_slot(slipstream_dns_request_buffer_t* buffer, - slipstream_cnxid_dns_request_buffer_t* cnxid_buffer) { - // Get the last element from the cnxid buffer - slot_t* slot = cnxid_buffer->tail; - if (!slot) { - return NULL; - } - - return slot; -} - -// TODO: free up cnxid_buffer diff --git a/src/slipstream_server.c b/src/slipstream_server.c index 0eb3a81..4e26ae4 100644 --- a/src/slipstream_server.c +++ b/src/slipstream_server.c @@ -13,13 +13,14 @@ #include #include #include +#include #include "lua-resty-base-encoding-base32.h" #include "picoquic_config.h" #include "slipstream.h" #include "slipstream_inline_dots.h" #include "slipstream_packet.h" -#include "slipstream_dns_request_buffer.h" +#include "slipstream_slot.h" #include "slipstream_utils.h" #include "SPCDNS/src/dns.h" #include "SPCDNS/src/mappings.h" @@ -27,82 +28,13 @@ char* server_domain_name = NULL; size_t server_domain_name_len = 0; -slipstream_dns_request_buffer_t slipstream_server_dns_request_buffer; - -ssize_t respond_and_free_slot(slot_t* slot, const picoquic_socket_ctxs_t* s_ctxs, dns_rcode_t rcode) { - const dns_query_t *query = (dns_query_t *) slot->dns_decoded; - - dns_query_t response = {0}; - response.id = query->id; - response.query = false; - response.opcode = query->opcode; - response.aa = true; - response.rd = query->rd; - response.cd = query->cd; - response.rcode = rcode; - response.qdcount = query->qdcount; - response.questions = query->questions; - - dns_packet_t packet[DNS_BUFFER_UDP_MAX]; - size_t packet_len = MAX_UDP_PACKET_SIZE; - dns_rcode_t rc = dns_encode(packet, &packet_len, &response); - // slot not used anymore, free it - slipstream_dns_request_buffer_free_slot(&slipstream_server_dns_request_buffer, slot); - if (rc != RCODE_OKAY) { - DBG_PRINTF("dns_encode() = (%d) %s", rc, dns_rcode_text(rc)); - return -1; - } - - const struct sockaddr_storage *peer_addr = &slot->peer_addr; - const struct sockaddr_storage *local_addr = &slot->local_addr; - const SOCKET_TYPE send_socket = picoquic_socket_get_send_socket(s_ctxs, peer_addr, local_addr); - if (send_socket == INVALID_SOCKET) { - DBG_PRINTF("no valid socket found for poll packet", NULL); - return -1; - } - - int sock_err = 0; - int ret = picoquic_sendmsg(send_socket, - (struct sockaddr*)peer_addr, (struct sockaddr*)local_addr, 0, - (const char*)packet, packet_len, 0, &sock_err); - if (ret < 0) { - DBG_PRINTF("Error sending poll packet, ret=%d, sock_err=%d %s", ret, sock_err, strerror(sock_err)); - return -1; - } - - return 0; -} - -ssize_t server_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs, - unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_size, - struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { +ssize_t server_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { // we don't support segmentation in the server - assert(segment_size == NULL || *segment_size == 0 || *segment_size == src_buf_len); + assert(segment_len == NULL || *segment_len == 0 || *segment_len == src_buf_len); - picoquic_connection_id_t initial_cnxid = picoquic_get_initial_cnxid(cnx); - - slipstream_cnxid_dns_request_buffer_t* cnxid_buffer = slipstream_dns_request_buffer_get_cnxid_buffer( - &slipstream_server_dns_request_buffer, &initial_cnxid, false); - if (cnxid_buffer == NULL) { - DBG_PRINTF("error getting cnxid buffer", NULL); - return -1; - } - - slot_t *slot = slipstream_dns_request_buffer_get_read_slot(&slipstream_server_dns_request_buffer, cnxid_buffer); - if (slot == NULL) { - DBG_PRINTF("no available DNS request to respond to", NULL); - return -1; - } + slot_t* slot = (slot_t*) slot_p; dns_query_t *query = (dns_query_t *) slot->dns_decoded; - const dns_question_t *question = &query->questions[0]; // assuming server_decode ensures there is exactly one question - dns_txt_t answer_txt; - answer_txt.name = question->name; - answer_txt.type = question->type; - answer_txt.class = question->class; - answer_txt.ttl = 60; - answer_txt.text = (char *)src_buf; - answer_txt.len = src_buf_len; - + dns_txt_t answer_txt; // TODO: fix dns_answer_t edns = {0}; edns.opt.name = "."; edns.opt.type = RR_OPT; @@ -117,18 +49,33 @@ ssize_t server_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, void* callback response.aa = true; response.rd = query->rd; response.cd = query->cd; - response.rcode = RCODE_OKAY; + response.rcode = slot->error; response.qdcount = 1; response.questions = query->questions; - response.ancount = 1; - response.answers = (dns_answer_t *)&answer_txt; + + if (src_buf_len > 0) { + const dns_question_t *question = &query->questions[0]; // assuming server_decode ensures there is exactly one question + answer_txt.name = question->name; + answer_txt.type = question->type; + answer_txt.class = question->class; + answer_txt.ttl = 60; + answer_txt.text = (char *)src_buf; + answer_txt.len = src_buf_len; + + response.ancount = 1; + response.answers = (dns_answer_t *)&answer_txt; + } else { + if (slot->error == RCODE_OKAY) { + response.rcode = RCODE_NAME_ERROR; + } + } + response.arcount = 1; response.additional = &edns; dns_packet_t* packet = malloc(MAX_UDP_PACKET_SIZE); size_t packet_len = MAX_UDP_PACKET_SIZE; dns_rcode_t rc = dns_encode(packet, &packet_len, &response); - slipstream_dns_request_buffer_free_slot(&slipstream_server_dns_request_buffer, slot); if (rc != RCODE_OKAY) { free(packet); DBG_PRINTF("dns_encode() = (%d) %s", rc, dns_rcode_text(rc)); @@ -137,27 +84,15 @@ ssize_t server_encode(picoquic_quic_t* quic, picoquic_cnx_t* cnx, void* callback *dest_buf = (unsigned char*)packet; memcpy(peer_addr, &slot->peer_addr, sizeof(struct sockaddr_storage)); + memcpy(local_addr, &slot->local_addr, sizeof(struct sockaddr_storage)); return packet_len; } -ssize_t server_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket_ctxs_t* s_ctxs, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *peer_addr, struct sockaddr_storage *local_addr) { +ssize_t server_decode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, picoquic_socket_ctx_t* s_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *peer_addr, struct sockaddr_storage *local_addr) { *dest_buf = NULL; - slot_t* slot; - if (!slipstream_server_dns_request_buffer.free) { - slot = slipstream_server_dns_request_buffer.tail; - assert(slot != NULL); - assert(slot->cnxid_buffer != NULL); - respond_and_free_slot(slot, s_ctxs, RCODE_NAME_ERROR); - } - - slot = slipstream_dns_request_buffer_get_write_slot(&slipstream_server_dns_request_buffer); - if (slot == NULL) { - DBG_PRINTF("error getting write slot", NULL); - sockaddr_dummy(peer_addr); - return -1; - } + slot_t* slot = slot_p; slot->created_time = picoquic_current_time(); // DNS packets arrive from random source ports, so: @@ -173,31 +108,36 @@ ssize_t server_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket const dns_rcode_t rc = dns_decode(packet, &packet_len, (const dns_packet_t*) src_buf, src_buf_len); if (rc != RCODE_OKAY) { DBG_PRINTF("dns_decode() = (%d) %s", rc, dns_rcode_text(rc)); - return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE); + // TODO: how to get rid of this packet + return -1; // TODO: server failure } const dns_query_t *query = (dns_query_t*) packet; if (!query->query) { DBG_PRINTF("dns record is not a query", NULL); - return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED); + slot->error = RCODE_FORMAT_ERROR; + return 0; } if (query->qdcount != 1) { DBG_PRINTF("dns record should contain exactly one query", NULL); - return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED); + slot->error = RCODE_FORMAT_ERROR; + return 0; } const dns_question_t *question = &query->questions[0]; if (question->type != RR_TXT) { // resolvers send anything for pinging, so we only respond to TXT queries // DBG_PRINTF("query type is not TXT", NULL); - return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED); + slot->error = RCODE_NAME_ERROR; + return 0; } const ssize_t data_len = strlen(question->name) - server_domain_name_len - 1 - 1; if (data_len <= 0) { DBG_PRINTF("subdomain is empty", NULL); - return respond_and_free_slot(slot, s_ctxs, RCODE_REFUSED); + slot->error = RCODE_NAME_ERROR; + return 0; } // copy the subdomain from name to a new buffer @@ -211,7 +151,8 @@ ssize_t server_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket if (decoded_len == (size_t) -1) { free(decoded_buf); DBG_PRINTF("error decoding base32: %lu", decoded_len); - return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE); + slot->error = RCODE_SERVER_FAILURE; + return 0; } picoquic_connection_id_t incoming_src_connection_id = {0}; @@ -223,27 +164,13 @@ ssize_t server_decode(picoquic_quic_t* quic, void* callback_ctx, picoquic_socket if (ret != 0) { free(decoded_buf); DBG_PRINTF("error parsing slipstream packet: %d", ret); - return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE); + slot->error = RCODE_SERVER_FAILURE; + return 0; } picoquic_cnx_t *cnx = picoquic_cnx_by_id_(quic, incoming_dest_connection_id); - picoquic_connection_id_t initial_cnxid; - if (cnx == NULL) { - initial_cnxid = incoming_dest_connection_id; - } else { - initial_cnxid = picoquic_get_initial_cnxid(cnx); - } - - slipstream_cnxid_dns_request_buffer_t* cnxid_buffer = slipstream_dns_request_buffer_get_cnxid_buffer( - &slipstream_server_dns_request_buffer, &initial_cnxid, true); - if (cnxid_buffer == NULL) { - free(decoded_buf); - DBG_PRINTF("error getting or creating cnxid buffer", NULL); - return respond_and_free_slot(slot, s_ctxs, RCODE_SERVER_FAILURE); - } - + slot->cnx = cnx; slot->query_id = query->id; - slipstream_dns_request_buffer_commit_slot_to_cnxid_buffer(&slipstream_server_dns_request_buffer, cnxid_buffer, slot); if (is_poll_packet) { free(decoded_buf); @@ -361,26 +288,6 @@ int slipstream_server_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_l slipstream_server_ctx_t* default_ctx = callback_ctx; switch (cb_mode) { - case picoquic_packet_loop_after_select: - const picoquic_socket_ctxs_t* s_ctxs = (picoquic_socket_ctxs_t*)callback_arg; - - uint64_t current_time = picoquic_current_time(); - for (int i = 0; i < slipstream_server_dns_request_buffer.cnxid_buffers_len; ++i) { - const slipstream_cnxid_dns_request_buffer_t* cnxid_buffer = slipstream_server_dns_request_buffer.cnxid_buffers[i]; - assert(cnxid_buffer != NULL); - slot_t* slot = cnxid_buffer->tail; - while (slot != NULL) { - const uint64_t age = current_time - slot->created_time; - if (age < 100000) { - break; - } - - // attempt to reply before resolver retries - respond_and_free_slot(slot, s_ctxs, RCODE_NAME_ERROR); - slot = slot->cnxid_buffer_prev; - } - } - break; case picoquic_packet_loop_wake_up: if (callback_ctx == NULL) { return 0; @@ -704,8 +611,6 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c #endif picoquic_set_key_log_file_from_env(quic); - slipstream_dns_request_buffer_init(&slipstream_server_dns_request_buffer); - picoquic_packet_loop_param_t param = {0}; param.local_af = AF_INET; param.local_port = server_port; @@ -713,7 +618,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c param.is_client = 0; param.decode = server_decode; param.encode = server_encode; - param.delay_max = 5000; + // param.delay_max = 5000; picoquic_network_thread_ctx_t thread_ctx = {0}; thread_ctx.quic = quic; @@ -727,7 +632,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c default_context.thread_ctx = &thread_ctx; signal(SIGTERM, server_sighandler); - picoquic_packet_loop_v3(&thread_ctx); + slipstream_packet_loop(&thread_ctx); ret = thread_ctx.return_code; /* And finish. */ diff --git a/src/slipstream_sockloop.c b/src/slipstream_sockloop.c new file mode 100644 index 0000000..4e928d4 --- /dev/null +++ b/src/slipstream_sockloop.c @@ -0,0 +1,248 @@ +#include "slipstream_sockloop.h" + +#include +#include +#include +#include +#include +#include + +#ifndef __USE_XOPEN2K +#define __USE_XOPEN2K +#endif +#ifndef __USE_POSIX +#define __USE_POSIX +#endif +#include + +#ifndef __APPLE__ +#ifdef __LINUX__ +#include /* Definition of PR_* constants */ +#else +#endif +#endif + +#include + +#ifndef SOCKET_TYPE +#define SOCKET_TYPE int +#endif +#ifndef INVALID_SOCKET +#define INVALID_SOCKET -1 +#endif +#ifndef SOCKET_CLOSE +#define SOCKET_CLOSE(x) close(x) +#endif +#ifndef WSA_LAST_ERROR +#define WSA_LAST_ERROR(x) ((long)(x)) +#endif + +#include "picosocks.h" +#include "picoquic.h" +#include "picoquic_internal.h" +#include "picoquic_packet_loop.h" +#include "slipstream_slot.h" + +# if defined(UDP_SEGMENT) +static int udp_gso_available = 1; +#else +static int udp_gso_available = 0; +#endif + + +int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_socket_ctx_t* s_ctx, + size_t send_buffer_size, size_t send_msg_size, size_t* send_msg_ptr) { + picoquic_quic_t* quic = thread_ctx->quic; + picoquic_packet_loop_param_t* param = thread_ctx->param; + const picoquic_packet_loop_cb_fn loop_callback = thread_ctx->loop_callback; + void* loop_callback_ctx = thread_ctx->loop_callback_ctx; + slot_t slots[PICOQUIC_PACKET_LOOP_RECV_MAX]; + + while (!thread_ctx->thread_should_close) { + if (loop_callback) { + loop_callback(quic, picoquic_packet_loop_before_select, loop_callback_ctx, s_ctx); + } + + size_t nb_slots_written = 0; + while (nb_slots_written < PICOQUIC_PACKET_LOOP_RECV_MAX) { + int64_t delta_t = 0; + + if (!param->is_client && nb_slots_written == 0) { + // Server mode: wait for a packet to arrive + delta_t = 10000000; + } + + if (param->is_client && nb_slots_written == 0) { + const uint64_t current_time = picoquic_current_time(); + const int64_t delay_max = param->delay_max == 0 ? 10000000 : param->delay_max; + delta_t = picoquic_get_next_wake_delay(quic, current_time, delay_max); + } + + struct sockaddr_storage peer_addr; + struct sockaddr_storage local_addr; + int if_index_to = 0; + uint8_t received_ecn; + uint8_t buffer[1536]; + int is_wake_up_event; + int socket_rank = -1; + int bytes_recv = picoquic_packet_loop_select(s_ctx, 1, &peer_addr, &local_addr, &if_index_to, &received_ecn, + buffer, sizeof(buffer), delta_t, &is_wake_up_event, thread_ctx, &socket_rank); + if (bytes_recv < 0) { + /* The interrupt error is expected if the loop is closing. */ + return thread_ctx->thread_should_close ? PICOQUIC_NO_ERROR_TERMINATE_PACKET_LOOP : -1; + } + + if (bytes_recv == 0 && is_wake_up_event) { + const int ret = loop_callback(quic, picoquic_packet_loop_wake_up, loop_callback_ctx, NULL); + if (ret < 0) { + return ret; + } + } + if (bytes_recv == 0) { + break; + } + + slot_t* slot = param->is_client ? &slots[0] : &slots[nb_slots_written]; + memset(slot, 0, sizeof(slot_t)); + nb_slots_written++; + if (slot == NULL) { + return -1; + } + + unsigned char* decoded; + bytes_recv = param->decode(thread_ctx->quic, slot, thread_ctx->loop_callback_ctx, s_ctx, &decoded, + (const unsigned char*)buffer, bytes_recv, &peer_addr, &local_addr); + if (bytes_recv < 0) { + DBG_PRINTF("decode() failed with error %d\n", bytes_recv); + return bytes_recv; + } + + if (bytes_recv == 0) { + // poll packet + continue; + } + + memcpy(buffer, decoded, bytes_recv); + free(decoded); + + /* Submit the packet to the server */ + uint8_t* received_buffer = buffer; + uint64_t current_time = picoquic_current_time(); + picoquic_cnx_t* last_cnx = NULL; + int ret = picoquic_incoming_packet_ex(quic, received_buffer, + (size_t)bytes_recv, (struct sockaddr*)&peer_addr, + (struct sockaddr*)&local_addr, if_index_to, received_ecn, + &last_cnx, current_time); + if (ret < 0) { + return ret; + } + slot->cnx = last_cnx; + } + + const uint64_t loop_time = picoquic_current_time(); + size_t nb_slots_read = 0; + const size_t max_slots = param->is_client ? PICOQUIC_PACKET_LOOP_SEND_MAX : nb_slots_written; + while (nb_slots_read < max_slots) { + uint8_t send_buffer[send_buffer_size]; + slot_t* slot = param->is_client ? &slots[0] : &slots[nb_slots_read]; + assert(slot != NULL); + nb_slots_read++; + + size_t send_length = 0; + struct sockaddr_storage peer_addr = {0}; + struct sockaddr_storage local_addr = {0}; + int if_index = param->dest_if; + if (slot->error == RCODE_OKAY) { + picoquic_connection_id_t log_cid; + picoquic_cnx_t* last_cnx = NULL; + int ret; + if (!param->is_client && slot->cnx) { + ret = picoquic_prepare_packet_ex(slot->cnx, loop_time, + send_buffer, send_buffer_size, &send_length, + &peer_addr, &local_addr, &if_index, send_msg_ptr); + last_cnx = slot->cnx; + } + else if (param->is_client) { + ret = picoquic_prepare_next_packet_ex(quic, loop_time, + send_buffer, send_buffer_size, &send_length, + &peer_addr, &local_addr, &if_index, &log_cid, &last_cnx, + send_msg_ptr); + } + if (ret < 0) { + return -1; + } + if (param->is_client && send_length == 0) { + break; + } + } + + int sock_err = 0; + int bytes_sent; + unsigned char* encoded; + size_t segment_len = send_msg_size == 0 ? send_length : send_msg_size; + ssize_t encoded_len = param->encode(thread_ctx->quic, slot, loop_callback_ctx, &encoded, + (const unsigned char*)send_buffer, send_length, &segment_len, &peer_addr, &local_addr); + if (encoded_len <= 0) { + DBG_PRINTF("Encoding fails, ret=%d\n", encoded_len); + continue; + } + + if (send_msg_size > 0) { + send_msg_size = segment_len; // new size after encoding + } + + const SOCKET_TYPE send_socket = s_ctx->fd; + bytes_sent = picoquic_sendmsg(send_socket, + (struct sockaddr*)&peer_addr, (struct sockaddr*)&local_addr, if_index, + (const char*)encoded, (int)encoded_len, (int)send_msg_size, &sock_err); + free(encoded); + if (bytes_sent == 0) { + DBG_PRINTF("BYTES_SENT == 0 %d\n", bytes_sent); + return -1; + } + if (bytes_sent < 0) { + return bytes_sent; + } + } + } + + return thread_ctx->return_code; +} + +void* slipstream_packet_loop(picoquic_network_thread_ctx_t* thread_ctx) { + const picoquic_packet_loop_param_t* param = thread_ctx->param; + if (!param->do_not_use_gso && param->encode != NULL && !param->is_client) { + DBG_FATAL_PRINTF("%s", "GSO disabled because encoding is enabled and server mode"); + } + + picoquic_socket_ctx_t s_ctx = {0}; + if (picoquic_packet_loop_open_sockets(param->local_port, + param->local_af, param->socket_buffer_size, + 0, param->do_not_use_gso, &s_ctx) <= 0) { + thread_ctx->return_code = PICOQUIC_ERROR_UNEXPECTED_ERROR; + return NULL; + } + + size_t send_buffer_size = param->socket_buffer_size; + size_t send_msg_size = 0; + size_t* send_msg_ptr = NULL; + if (udp_gso_available && !param->do_not_use_gso) { + send_buffer_size = 0xFFFF; + send_msg_ptr = &send_msg_size; + } + if (send_buffer_size == 0) { + send_buffer_size = 0xffff; + } + + thread_ctx->thread_is_ready = 1; + thread_ctx->return_code = slipstream_packet_loop_(thread_ctx, &s_ctx, send_buffer_size, send_msg_size, send_msg_ptr); + thread_ctx->thread_is_ready = 0; + + /* Close the sockets */ + picoquic_packet_loop_close_socket(&s_ctx); + + if (thread_ctx->is_threaded) { + pthread_exit((void*)&thread_ctx->return_code); + } + return (NULL); +}