diff --git a/CMakeLists.txt b/CMakeLists.txt index 651e13f..4e6a751 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,15 +39,15 @@ add_executable(slipstream src/slipstream.c src/slipstream_client.c src/slipstream_inline_dots.c - src/slipstream_packet.c src/slipstream_resolver_addresses.c src/slipstream_server.c + src/slipstream_server_cc.c src/slipstream_sockloop.c src/slipstream_utils.c include/slipstream.h include/slipstream_inline_dots.h - include/slipstream_packet.h include/slipstream_resolver_addresses.h + include/slipstream_server_cc.h include/slipstream_slot.h include/slipstream_sockloop.h include/slipstream_utils.h diff --git a/extern/picoquic b/extern/picoquic index c00a6f6..c3419f8 160000 --- a/extern/picoquic +++ b/extern/picoquic @@ -1 +1 @@ -Subproject commit c00a6f69032a255ee15892a0fec3c8e8cc1da4a4 +Subproject commit c3419f8dccf441ad9c2a14d747261701f2884edd diff --git a/include/slipstream.h b/include/slipstream.h index 5d690be..ffe4c18 100644 --- a/include/slipstream.h +++ b/include/slipstream.h @@ -23,7 +23,7 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f const char* cc_algo_id); int picoquic_slipstream_server(int server_port, const char* pem_cert, const char* pem_key, char const* upstream_name, - int upstream_port, const char* domain_name, const char* cc_algo_id); + int upstream_port, const char* domain_name); #ifdef __cplusplus } diff --git a/include/slipstream_packet.h b/include/slipstream_packet.h deleted file mode 100644 index d777abf..0000000 --- a/include/slipstream_packet.h +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef SLIPSTREAM_PACKET -#define SLIPSTREAM_PACKET - -#include -#include "picoquic.h" - -#define PICOQUIC_SHORT_HEADER_CONNECTION_ID_SIZE 8 - -bool slipstream_packet_is_long_header(const uint8_t first_byte); - -int slipstream_packet_create_poll(uint8_t** dest_buf, size_t* dest_buf_len, picoquic_connection_id_t dst_connection_id); - -int slipstream_packet_parse(uint8_t* src_buf, size_t src_buf_len, size_t short_header_conn_id_len, picoquic_connection_id_t* src_connection_id, picoquic_connection_id_t* dst_connection_id, bool* is_poll_packet); - -#endif // SLIPSTREAM_PACKET diff --git a/include/slipstream_resolver_addresses.h b/include/slipstream_resolver_addresses.h index 55c16a2..3fec042 100644 --- a/include/slipstream_resolver_addresses.h +++ b/include/slipstream_resolver_addresses.h @@ -1,8 +1,15 @@ #ifndef SLIPSTREAM_RESOLVERS_H #define SLIPSTREAM_RESOLVERS_H +#include #include +#include -struct sockaddr_storage* read_resolver_addresses(const char *resolver_addresses_filename, size_t *count); +typedef struct st_address_t { + struct sockaddr_storage server_address; + bool added; +} address_t; + +struct st_address_t* read_resolver_addresses(const char *resolver_addresses_filename, size_t *count); #endif //SLIPSTREAM_RESOLVERS_H diff --git a/include/slipstream_server_cc.h b/include/slipstream_server_cc.h new file mode 100644 index 0000000..28f1df1 --- /dev/null +++ b/include/slipstream_server_cc.h @@ -0,0 +1,8 @@ +#ifndef SLIPSTREAM_SERVER_CC_H +#define SLIPSTREAM_SERVER_CC_H + +#include "picoquic.h" + +extern picoquic_congestion_algorithm_t* slipstream_server_cc_algorithm; + +#endif //SLIPSTREAM_SERVER_CC_H diff --git a/include/slipstream_slot.h b/include/slipstream_slot.h index 4fc2d95..ba78697 100644 --- a/include/slipstream_slot.h +++ b/include/slipstream_slot.h @@ -11,7 +11,7 @@ typedef struct st_slot_t { struct sockaddr_storage local_addr; picoquic_cnx_t* cnx; uint64_t created_time; - int query_id; + bool is_poll_packet; } slot_t; #endif // SLIPSTREAM_SLOT diff --git a/include/slipstream_utils.h b/include/slipstream_utils.h index 384e0cb..e2c87a7 100644 --- a/include/slipstream_utils.h +++ b/include/slipstream_utils.h @@ -7,4 +7,6 @@ char* picoquic_connection_id_to_string(const picoquic_connection_id_t* cid); void sockaddr_dummy(struct sockaddr_storage *addr_storage); +void print_sockaddr_ip_and_port(struct sockaddr_storage *addr_storage); + #endif //SLIPSTREAM_UTILS_H diff --git a/src/slipstream.c b/src/slipstream.c index 2a48c72..832dfae 100644 --- a/src/slipstream.c +++ b/src/slipstream.c @@ -30,6 +30,9 @@ int main(int argc, char** argv) (void)WSA_START(MAKEWORD(2, 2), &wsaData); #endif + + setbuf(stdout, NULL); + setbuf(stderr, NULL); if (argc < 2) { usage(argv[0]); } @@ -46,15 +49,14 @@ int main(int argc, char** argv) } } else if (strcmp(argv[1], "server") == 0) { - if (argc != 9) { + if (argc != 8) { usage(argv[0]); } else { int server_port = get_port(argv[0], argv[2]); int remote_port = get_port(argv[0], argv[6]); const char* domain_name = argv[7]; - const char* cc_algo_id = argv[8]; - exit_code = picoquic_slipstream_server(server_port, argv[3], argv[4], argv[5], remote_port, domain_name, cc_algo_id); + exit_code = picoquic_slipstream_server(server_port, argv[3], argv[4], argv[5], remote_port, domain_name); } } else diff --git a/src/slipstream_client.c b/src/slipstream_client.c index 1ec76a8..8f04ada 100644 --- a/src/slipstream_client.c +++ b/src/slipstream_client.c @@ -22,8 +22,8 @@ #include "picoquic_config.h" #include "slipstream.h" #include "slipstream_inline_dots.h" -#include "slipstream_packet.h" #include "slipstream_resolver_addresses.h" +#include "slipstream_utils.h" #include "SPCDNS/src/dns.h" #include "SPCDNS/src/mappings.h" @@ -39,15 +39,17 @@ typedef struct st_slipstream_client_ctx_t { picoquic_cnx_t* cnx; slipstream_client_stream_ctx_t* first_stream; picoquic_network_thread_ctx_t* thread_ctx; - struct sockaddr_storage* server_addresses; + struct st_address_t* server_addresses; size_t server_address_count; uint64_t last_request; + bool ready; + bool closed; } slipstream_client_ctx_t; char* client_domain_name = NULL; size_t client_domain_name_len = 0; -ssize_t client_encode_segment(picoquic_quic_t* quic, dns_packet_t* packet, size_t* packet_len, const unsigned char* src_buf, size_t src_buf_len) { +ssize_t client_encode_segment(dns_packet_t* packet, size_t* packet_len, const unsigned char* src_buf, size_t src_buf_len) { char name[255]; const size_t len = b32_encode(&name[0], (const char*) src_buf, src_buf_len, true, false); const size_t encoded_len = slipstream_inline_dotify(name, 255, len); @@ -89,7 +91,7 @@ ssize_t client_encode_segment(picoquic_quic_t* quic, dns_packet_t* packet, size_ return 0; } -ssize_t client_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { +ssize_t client_encode(void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { assert(callback_ctx); slipstream_client_ctx_t* client_ctx = callback_ctx; @@ -97,7 +99,7 @@ ssize_t client_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, u if (src_buf_len <= *segment_len) { size_t packet_len = MAX_DNS_QUERY_SIZE; unsigned char* packet = malloc(packet_len); - const ssize_t ret = client_encode_segment(quic, (dns_packet_t*) packet, &packet_len, src_buf, src_buf_len); + const ssize_t ret = client_encode_segment((dns_packet_t*) packet, &packet_len, src_buf, src_buf_len); if (ret < 0) { free(packet); return -1; @@ -119,7 +121,7 @@ ssize_t client_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, u size_t first_packet_len = 0; for (size_t i = 0; i < num_segments; i++) { size_t packet_len = MAX_DNS_QUERY_SIZE; - const ssize_t ret = client_encode_segment(quic, (dns_packet_t*) current_packet, &packet_len, segment, *segment_len); + const ssize_t ret = client_encode_segment((dns_packet_t*) current_packet, &packet_len, segment, *segment_len); if (ret < 0) { free(packets); return -1; @@ -147,9 +149,8 @@ ssize_t client_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, u return current_packet - packets; } -ssize_t client_decode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, picoquic_socket_ctx_t* s_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { +ssize_t client_decode(void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { *dest_buf = NULL; - slipstream_client_ctx_t* client_ctx = callback_ctx; size_t bufsize = DNS_DECODEBUF_4K * sizeof(dns_decoded_t); dns_decoded_t decoded[DNS_DECODEBUF_4K] = {0}; @@ -190,62 +191,6 @@ ssize_t client_decode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, p *dest_buf = malloc(answer_txt->len); memcpy((void*)*dest_buf, answer_txt->text, answer_txt->len); - - picoquic_connection_id_t incoming_src_connection_id = {0}; - picoquic_connection_id_t incoming_dest_connection_id; // sure to be set by parser - bool is_poll_packet = false; - int ret = slipstream_packet_parse(*dest_buf, answer_txt->len, PICOQUIC_SHORT_HEADER_CONNECTION_ID_SIZE, - &incoming_src_connection_id, &incoming_dest_connection_id, &is_poll_packet); - if (ret != 0 || is_poll_packet) { - DBG_PRINTF("[%d] error parsing slipstream packet: %d", query->id, ret); - return answer_txt->len; - } - - - // get active destination connection id on this ctx - picoquic_cnx_t* cnx = picoquic_cnx_by_id_(quic, incoming_dest_connection_id); - picoquic_connection_id_t outgoing_dest_connection_id = cnx->path[0]->p_remote_cnxid->cnx_id; - if (outgoing_dest_connection_id.id_len == 0) { - // p_remote_cnxid is not set yet when we are receiving the first server response - outgoing_dest_connection_id = incoming_src_connection_id; - } - - const int poll_ratio = 1; - for (int j = 0; j < poll_ratio; ++j) { - uint8_t* poll_packet_buf; - size_t poll_packet_len; - ret = slipstream_packet_create_poll(&poll_packet_buf, &poll_packet_len, outgoing_dest_connection_id); - if (ret < 0) { - DBG_PRINTF("error creating poll packet", NULL); - return answer_txt->len; - } - - unsigned char* encoded; - ssize_t encoded_len = client_encode(quic, slot_p, callback_ctx, &encoded, poll_packet_buf, poll_packet_len, &poll_packet_len, peer_addr, local_addr); - if (encoded_len <= 0) { - DBG_PRINTF("error encoding poll packet", NULL); - free(poll_packet_buf); - return answer_txt->len; - } - - const SOCKET_TYPE send_socket = s_ctx->fd; - int sock_err = 0; - ret = picoquic_sendmsg(send_socket, - (struct sockaddr*)peer_addr, (struct sockaddr*)local_addr, 0, - (const char*)encoded, encoded_len, 0, &sock_err); - if (ret < 0) { - DBG_PRINTF("Error sending poll packet, ret=%d, sock_err=%d %s", ret, sock_err, strerror(sock_err)); - free(poll_packet_buf); - free(encoded); - return answer_txt->len; - } - - free(poll_packet_buf); - free(encoded); - } - - client_ctx->last_request = picoquic_current_time(); - return answer_txt->len; } @@ -298,8 +243,7 @@ static void slipstream_client_free_context(slipstream_client_ctx_t* client_ctx) free(client_ctx->server_addresses); - /* release the memory */ - free(client_ctx); + client_ctx->closed = true; } void slipstream_client_mark_active_pass(slipstream_client_ctx_t* client_ctx) { @@ -315,64 +259,48 @@ void slipstream_client_mark_active_pass(slipstream_client_ctx_t* client_ctx) { } } +void slipstream_add_paths(slipstream_client_ctx_t* client_ctx) { + picoquic_cnx_t* cnx = client_ctx->cnx; + // add rest of the resolvers + for (size_t i = 1; i < client_ctx->server_address_count; i++) { + address_t* slipstream_path = &client_ctx->server_addresses[i]; + if (slipstream_path->added) { + continue; + } + + uint64_t current_time = picoquic_current_time(); + // if (current_time - cnx->start_time < 10000000) { + // DBG_PRINTF("Can't add path yet", NULL); + // continue; + // } + + print_sockaddr_ip_and_port(&slipstream_path->server_address); + int path_id = -2; + picoquic_probe_new_path_ex(cnx, (struct sockaddr*)&slipstream_path->server_address, (struct sockaddr*)&cnx->path[0]->local_addr, 0, current_time, 0, &path_id); + if (path_id < 0) { + DBG_PRINTF("Failed adding path", NULL); + continue; + } + DBG_PRINTF("Added path", NULL); + + picoquic_reinsert_by_wake_time(cnx->quic, cnx, current_time); + slipstream_path->added = true; + } +} + int slipstream_client_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_loop_cb_enum cb_mode, void* callback_ctx, void* callback_arg) { slipstream_client_ctx_t* client_ctx = callback_ctx; + if (client_ctx->closed) { + return 0; + } + switch (cb_mode) { case picoquic_packet_loop_before_select: - uint64_t current_time = picoquic_current_time(); - uint64_t passed = current_time - client_ctx->last_request; - if (passed < 200000) { - break; + if (client_ctx->ready) { + slipstream_add_paths(client_ctx); } - - const picoquic_cnx_t* cnx = client_ctx->cnx; - const int path_index = rand() % cnx->nb_paths; - const picoquic_path_t* path_x = cnx->path[path_index]; - picoquic_connection_id_t outgoing_dest_connection_id = path_x->p_remote_cnxid->cnx_id; - if (outgoing_dest_connection_id.id_len == 0) { - outgoing_dest_connection_id = cnx->initial_cnxid; - } - - - uint8_t* poll_packet_buf; - size_t poll_packet_len; - int ret = slipstream_packet_create_poll(&poll_packet_buf, &poll_packet_len, outgoing_dest_connection_id); - if (ret < 0) { - DBG_PRINTF("error creating poll packet", NULL); - return 0; - } - - struct sockaddr_storage *peer_addr = &cnx->path[0]->peer_addr; - struct sockaddr_storage *local_addr = &cnx->path[0]->local_addr; - - picoquic_socket_ctx_t* s_ctx = (picoquic_socket_ctx_t*)callback_arg; - unsigned char* encoded; - ssize_t encoded_len = client_encode(quic, NULL, client_ctx, &encoded, poll_packet_buf, poll_packet_len, &poll_packet_len, NULL, NULL); - if (encoded_len <= 0) { - DBG_PRINTF("error encoding poll packet", NULL); - free(poll_packet_buf); - return 0; - } - - const SOCKET_TYPE send_socket = s_ctx->fd; - - int sock_err = 0; - ret = picoquic_sendmsg(send_socket, - (struct sockaddr*)peer_addr, (struct sockaddr*)local_addr, 0, - (const char*)encoded, encoded_len, 0, &sock_err); - if (ret < 0) { - DBG_PRINTF("Error sending poll packet, ret=%d, sock_err=%d %s", ret, sock_err, strerror(sock_err)); - free(poll_packet_buf); - free(encoded); - return 0; - } - - free(poll_packet_buf); - free(encoded); - - client_ctx->last_request = current_time; case picoquic_packet_loop_wake_up: if (callback_ctx == NULL) { return 0; @@ -640,26 +568,8 @@ int slipstream_client_callback(picoquic_cnx_t* cnx, break; case picoquic_callback_ready: fprintf(stdout, "Connection confirmed.\n"); - - // add rest of the resolvers - for (size_t i = 1; i < client_ctx->server_address_count; i++) { - struct sockaddr* server_address = (struct sockaddr*)&client_ctx->server_addresses[i]; - uint64_t current_time = picoquic_current_time(); - // convert server address to string - char host[NI_MAXHOST]; - socklen_t addrlen = sizeof(*server_address); - ret = getnameinfo(server_address, addrlen, host, sizeof(host), NULL, 0, NI_NUMERICHOST | NI_NUMERICSERV); - if (ret == 0) { - printf("Probing path: %s", host); - } - - ret = picoquic_probe_new_path(cnx, server_address, NULL, current_time); - if (ret != 0) { - fprintf(stderr, "Could not create probe new path\n"); - return -1; - } - } - break; + client_ctx->ready = true; + slipstream_add_paths(client_ctx); default: /* unexpected -- just ignore. */ break; @@ -707,7 +617,7 @@ static int slipstream_connect(struct sockaddr_storage* server_address, } // 400ms - // picoquic_enable_keep_alive(*cnx, 400000); + picoquic_enable_keep_alive(*cnx, 400000); /* Document connection in client's context */ client_ctx->cnx = *cnx; @@ -768,10 +678,9 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f /* Create the QUIC context for the server */ current_time = picoquic_current_time(); // one connection only, freed in slipstream_client_free_context on picoquic close callback - slipstream_client_ctx_t *client_ctx = malloc(sizeof(slipstream_client_ctx_t)); - memset(client_ctx, 0, sizeof(slipstream_client_ctx_t)); + slipstream_client_ctx_t client_ctx = {0}; /* Create QUIC context */ - picoquic_quic_t* quic = picoquic_create_and_configure(&config, slipstream_client_callback, client_ctx, current_time, NULL); + picoquic_quic_t* quic = picoquic_create_and_configure(&config, slipstream_client_callback, &client_ctx, current_time, NULL); if (quic == NULL) { fprintf(stderr, "Could not create server context\n"); return -1; @@ -783,17 +692,19 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f debug_printf_push_stream(stderr); #endif picoquic_set_key_log_file_from_env(quic); + // picoquic_set_textlog(quic, "-"); + // picoquic_set_log_level(quic, 1); // TODO: idle timeout? /* Read the server address list from the file */ - client_ctx->server_addresses = read_resolver_addresses(resolver_addresses_filename, &client_ctx->server_address_count); - if (!client_ctx->server_addresses) { + client_ctx.server_addresses = read_resolver_addresses(resolver_addresses_filename, &client_ctx.server_address_count); + if (!client_ctx.server_addresses) { printf("Failed to read IP addresses\n"); return 1; } picoquic_cnx_t* cnx = NULL; - ret = slipstream_connect(client_ctx->server_addresses, quic, &cnx, client_ctx); + ret = slipstream_connect(&client_ctx.server_addresses[0].server_address, quic, &cnx, &client_ctx); if (ret != 0) { fprintf(stderr, "Could not connect to server\n"); return -1; @@ -847,17 +758,17 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f thread_ctx.quic = quic; thread_ctx.param = ¶m; thread_ctx.loop_callback = slipstream_client_sockloop_callback; - thread_ctx.loop_callback_ctx = client_ctx; + thread_ctx.loop_callback_ctx = &client_ctx; /* Open the wake up pipe or event */ picoquic_open_network_wake_up(&thread_ctx, &ret); - client_ctx->thread_ctx = &thread_ctx; + client_ctx.thread_ctx = &thread_ctx; slipstream_client_accepter_args* args = malloc(sizeof(slipstream_client_accepter_args)); args->fd = listen_sock; args->cnx = cnx; - args->client_ctx = client_ctx; + args->client_ctx = &client_ctx; args->thread_ctx = &thread_ctx; pthread_t thread; @@ -867,6 +778,7 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f } signal(SIGTERM, client_sighandler); + // picoquic_packet_loop_v3(&thread_ctx); slipstream_packet_loop(&thread_ctx); ret = thread_ctx.return_code; diff --git a/src/slipstream_packet.c b/src/slipstream_packet.c deleted file mode 100644 index cce17f3..0000000 --- a/src/slipstream_packet.c +++ /dev/null @@ -1,95 +0,0 @@ -#include -#include - -#include "slipstream_packet.h" -#include "picoquic_utils.h" - -const int num_padding_for_poll = 5; - -#define PICOQUIC_SHORT_HEADER_CONNECTION_ID_SIZE 8 - -bool slipstream_packet_is_long_header(const uint8_t first_byte) { - return first_byte & 0x80; -} - -int slipstream_packet_create_poll(uint8_t** dest_buf, size_t* dest_buf_len, picoquic_connection_id_t dst_connection_id) { - *dest_buf = NULL; - - if (num_padding_for_poll < 5) { - return -1; - } - - // Allocate a num_padding_for_poll + dst_connection_id.id_len + len marker + dst_connection_id len marker - size_t packet_len = num_padding_for_poll + dst_connection_id.id_len + 1 + 1; - uint8_t* packet = malloc(packet_len); - - // Write random padding bytes to the entire packet - for (int i = 0; i < packet_len; i++) { - packet[i] = rand() % 256; - } - - packet[0] |= 0x80; // Set bit 7 (long header format) - - // Write destination connection ID - packet[5] = dst_connection_id.id_len; - memcpy(&packet[6], dst_connection_id.id, dst_connection_id.id_len); - - // Ensure the source connection ID len marker byte is larger than PICOQUIC_CONNECTION_ID_MAX_SIZE - int randomly_written_src_connection_id = packet[5+1+dst_connection_id.id_len]; - if (randomly_written_src_connection_id <= PICOQUIC_CONNECTION_ID_MAX_SIZE) { - packet[5+1+dst_connection_id.id_len] = PICOQUIC_CONNECTION_ID_MAX_SIZE + 1; - } - - // The rest of the payload (including pretend second connection ID) is random padding - - *dest_buf = packet; - *dest_buf_len = packet_len; - - return packet_len; -} - -int slipstream_packet_parse(uint8_t* src_buf, size_t src_buf_len, size_t short_header_conn_id_len, picoquic_connection_id_t* src_connection_id, picoquic_connection_id_t* dst_connection_id, bool* is_poll_packet) { - if (src_buf_len < 1) { - return -1; - } - - // Short header packet - if (!slipstream_packet_is_long_header(src_buf[0])) { - // Short header packets can't be poll packets - if (src_buf_len < short_header_conn_id_len + 1) { - return -1; - } - - picoquic_parse_connection_id(&src_buf[1], short_header_conn_id_len, dst_connection_id); - return 0; - } - - // Read destination connection ID - if (src_buf_len < 5+1) { - return -1; - } - const size_t dst_connection_id_len = src_buf[5]; - if (dst_connection_id_len > PICOQUIC_CONNECTION_ID_MAX_SIZE) { - return -1; - } - if (src_buf_len < 5+1+dst_connection_id_len) { - return -1; - } - picoquic_parse_connection_id(&src_buf[5+1], dst_connection_id_len, dst_connection_id); - - // Read source connection ID - if (src_buf_len < 5+1+dst_connection_id_len+1) { - return -1; - } - const size_t src_connection_id_len = src_buf[5+1+dst_connection_id_len]; - if (src_connection_id_len > PICOQUIC_CONNECTION_ID_MAX_SIZE) { - *is_poll_packet = true; - return 0; - } - if (src_buf_len < 5+1+dst_connection_id_len+1+src_connection_id_len) { - return -1; - } - picoquic_parse_connection_id(&src_buf[5+1+dst_connection_id_len+1], src_connection_id_len, src_connection_id); - - return 0; -} diff --git a/src/slipstream_resolver_addresses.c b/src/slipstream_resolver_addresses.c index 89cf78d..014d733 100644 --- a/src/slipstream_resolver_addresses.c +++ b/src/slipstream_resolver_addresses.c @@ -10,7 +10,7 @@ #define MAX_LINE_LENGTH 50 #define DEFAULT_PORT 53 -struct sockaddr_storage* read_resolver_addresses(const char *resolver_addresses_filename, size_t *count) { +struct st_address_t* read_resolver_addresses(const char *resolver_addresses_filename, size_t *count) { *count = 0; FILE *fp = fopen(resolver_addresses_filename, "r"); @@ -19,7 +19,7 @@ struct sockaddr_storage* read_resolver_addresses(const char *resolver_addresses_ } int capacity = INITIAL_CAPACITY; - struct sockaddr_storage* server_address = calloc(capacity, sizeof(struct sockaddr_storage)); + struct st_address_t* server_address = calloc(capacity, sizeof(struct st_address_t)); if (!server_address) { fclose(fp); return NULL; @@ -40,7 +40,7 @@ struct sockaddr_storage* read_resolver_addresses(const char *resolver_addresses_ // Resize array if needed if (valid_addresses == capacity) { capacity *= 2; - struct sockaddr_storage* temp = realloc(server_address, capacity * sizeof(struct sockaddr_storage)); + struct st_address_t* temp = realloc(server_address, capacity * sizeof(struct st_address_t)); if (!temp) { fprintf(stderr, "Memory allocation failed\n"); free(server_address); @@ -61,7 +61,7 @@ struct sockaddr_storage* read_resolver_addresses(const char *resolver_addresses_ printf("Adding %s:%d\n", server_name, server_port); int is_name = 0; - if (picoquic_get_server_address(server_name, server_port, &server_address[valid_addresses], &is_name) != 0) { + if (picoquic_get_server_address(server_name, server_port, &server_address[valid_addresses].server_address, &is_name) != 0) { fprintf(stderr, "Cannot get the IP address for <%s> port <%d>\n", server_name, server_port); continue; // Skip invalid addresses instead of failing } @@ -73,7 +73,7 @@ struct sockaddr_storage* read_resolver_addresses(const char *resolver_addresses_ // Trim excess memory if needed if (valid_addresses < capacity) { - struct sockaddr_storage* temp = realloc(server_address, valid_addresses * sizeof(struct sockaddr_storage)); + struct st_address_t* temp = realloc(server_address, valid_addresses * sizeof(struct st_address_t)); if (temp) { server_address = temp; } diff --git a/src/slipstream_server.c b/src/slipstream_server.c index 4e26ae4..f721a76 100644 --- a/src/slipstream_server.c +++ b/src/slipstream_server.c @@ -13,13 +13,15 @@ #include #include #include +#include #include #include "lua-resty-base-encoding-base32.h" #include "picoquic_config.h" +#include "picoquic_logger.h" #include "slipstream.h" #include "slipstream_inline_dots.h" -#include "slipstream_packet.h" +#include "../include/slipstream_server_cc.h" #include "slipstream_slot.h" #include "slipstream_utils.h" #include "SPCDNS/src/dns.h" @@ -28,7 +30,7 @@ char* server_domain_name = NULL; size_t server_domain_name_len = 0; -ssize_t server_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { +ssize_t server_encode(void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) { // we don't support segmentation in the server assert(segment_len == NULL || *segment_len == 0 || *segment_len == src_buf_len); @@ -89,7 +91,7 @@ ssize_t server_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, u return packet_len; } -ssize_t server_decode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, picoquic_socket_ctx_t* s_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *peer_addr, struct sockaddr_storage *local_addr) { +ssize_t server_decode(void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *peer_addr, struct sockaddr_storage *local_addr) { *dest_buf = NULL; slot_t* slot = slot_p; @@ -155,28 +157,6 @@ ssize_t server_decode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, p return 0; } - picoquic_connection_id_t incoming_src_connection_id = {0}; - picoquic_connection_id_t incoming_dest_connection_id; // sure to be set by parser - bool is_poll_packet = false; - // ReSharper disable once CppDFAUnreachableCode - const int ret = slipstream_packet_parse(decoded_buf, decoded_len, PICOQUIC_SHORT_HEADER_CONNECTION_ID_SIZE, - &incoming_src_connection_id, &incoming_dest_connection_id, &is_poll_packet); - if (ret != 0) { - free(decoded_buf); - DBG_PRINTF("error parsing slipstream packet: %d", ret); - slot->error = RCODE_SERVER_FAILURE; - return 0; - } - - picoquic_cnx_t *cnx = picoquic_cnx_by_id_(quic, incoming_dest_connection_id); - slot->cnx = cnx; - slot->query_id = query->id; - - if (is_poll_packet) { - free(decoded_buf); - return 0; - } - *dest_buf = decoded_buf; return decoded_len; @@ -556,7 +536,7 @@ void server_sighandler(int signum) { } int picoquic_slipstream_server(int server_port, const char* server_cert, const char* server_key, - char const* upstream_name, int upstream_port, const char* domain_name, const char* cc_algo_id) { + char const* upstream_name, int upstream_port, const char* domain_name) { /* Start: start the QUIC process with cert and key files */ int ret = 0; uint64_t current_time = 0; @@ -586,7 +566,6 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c config.mtu_max = mtu; config.initial_send_mtu_ipv4 = mtu; config.initial_send_mtu_ipv6 = mtu; - config.cc_algo_id = cc_algo_id; config.multipath_option = 1; config.use_long_log = 1; config.do_preemptive_repeat = 1; @@ -610,6 +589,10 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c debug_printf_push_stream(stderr); #endif picoquic_set_key_log_file_from_env(quic); + // picoquic_set_textlog(quic, "-"); + // picoquic_set_log_level(quic, 1); + + picoquic_set_default_congestion_algorithm(quic, slipstream_server_cc_algorithm); picoquic_packet_loop_param_t param = {0}; param.local_af = AF_INET; @@ -632,6 +615,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c default_context.thread_ctx = &thread_ctx; signal(SIGTERM, server_sighandler); + // picoquic_packet_loop_v3(&thread_ctx); slipstream_packet_loop(&thread_ctx); ret = thread_ctx.return_code; diff --git a/src/slipstream_server_cc.c b/src/slipstream_server_cc.c new file mode 100644 index 0000000..95e3510 --- /dev/null +++ b/src/slipstream_server_cc.c @@ -0,0 +1,58 @@ +#include "../include/slipstream_server_cc.h" + +#include + +#include + +typedef enum { + slipstream_server_cc_alg_none = 0, +} slipstream_server_cc_alg_state_t; + +typedef struct st_slipstream_server_cc_t { + slipstream_server_cc_alg_state_t state; +} slipstream_server_cc_t; + + +static void slipstream_server_cc_init(picoquic_cnx_t * cnx, picoquic_path_t* path_x, uint64_t current_time) +{ + slipstream_server_cc_t* state = (slipstream_server_cc_t*)malloc(sizeof(slipstream_server_cc_t)); + path_x->congestion_alg_state = (void*)state; +} + +static void slipstream_server_cc_notify( + picoquic_cnx_t* cnx, + picoquic_path_t* path_x, + picoquic_congestion_notification_t notification, + picoquic_per_ack_state_t * ack_state, + uint64_t current_time) +{ + path_x->is_cc_data_updated = 1; + path_x->cwin = UINT64_MAX; +} + +static void slipstream_server_cc_delete(picoquic_path_t* path_x) { + if (path_x->congestion_alg_state != NULL) { + free(path_x->congestion_alg_state); + path_x->congestion_alg_state = NULL; + } +} + +void slipstream_server_cc_observe(picoquic_path_t* path_x, uint64_t* cc_state, uint64_t* cc_param) +{ + slipstream_server_cc_t* state = (slipstream_server_cc_t*)path_x->congestion_alg_state; + *cc_state = (uint64_t)state->state; + *cc_param = UINT64_MAX; +} + +#define picoquic_slipstream_server_cc_ID "slipstream_server" +#define PICOQUIC_CC_ALGO_NUMBER_SLIPSTREAM_SERVER 10 + +picoquic_congestion_algorithm_t slipstream_server_cc_algorithm_struct = { + picoquic_slipstream_server_cc_ID, PICOQUIC_CC_ALGO_NUMBER_SLIPSTREAM_SERVER, + slipstream_server_cc_init, + slipstream_server_cc_notify, + slipstream_server_cc_delete, + slipstream_server_cc_observe +}; + +picoquic_congestion_algorithm_t* slipstream_server_cc_algorithm = &slipstream_server_cc_algorithm_struct; diff --git a/src/slipstream_sockloop.c b/src/slipstream_sockloop.c index 4c951ec..4dd4092 100644 --- a/src/slipstream_sockloop.c +++ b/src/slipstream_sockloop.c @@ -57,6 +57,7 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_ const picoquic_packet_loop_cb_fn loop_callback = thread_ctx->loop_callback; void* loop_callback_ctx = thread_ctx->loop_callback_ctx; slot_t slots[PICOQUIC_PACKET_LOOP_RECV_MAX] = {0}; + slot_t send_slot = {0}; while (!thread_ctx->thread_should_close) { if (loop_callback) { @@ -64,6 +65,7 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_ } size_t nb_slots_written = 0; + size_t nb_packet_received = 0; while (nb_slots_written < PICOQUIC_PACKET_LOOP_RECV_MAX) { int64_t delta_t = 0; @@ -102,15 +104,13 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_ break; } - slot_t* slot = param->is_client ? &slots[0] : &slots[nb_slots_written]; + slot_t* slot = &slots[nb_slots_written]; + assert(slot != NULL); memset(slot, 0, sizeof(slot_t)); nb_slots_written++; - if (slot == NULL) { - return -1; - } unsigned char* decoded; - bytes_recv = param->decode(thread_ctx->quic, slot, thread_ctx->loop_callback_ctx, s_ctx, &decoded, + bytes_recv = param->decode(slot, thread_ctx->loop_callback_ctx, &decoded, (const unsigned char*)buffer, bytes_recv, &peer_addr, &local_addr); if (bytes_recv < 0) { DBG_PRINTF("decode() failed with error %d\n", bytes_recv); @@ -118,7 +118,6 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_ } if (bytes_recv == 0) { - // poll packet continue; } @@ -137,14 +136,28 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_ return ret; } slot->cnx = last_cnx; + nb_packet_received++; + + if (!param->is_client && last_cnx->nb_paths == 1) { + // server can only have 1 incoming path + picoquic_path_t* path_x = last_cnx->path[0]; + picoquic_set_ack_needed(last_cnx, current_time, picoquic_packet_context_application, path_x, 1); + } } const uint64_t loop_time = picoquic_current_time(); + size_t nb_packets_sent = 0; size_t nb_slots_read = 0; const size_t max_slots = param->is_client ? PICOQUIC_PACKET_LOOP_SEND_MAX : nb_slots_written; while (nb_slots_read < max_slots) { uint8_t send_buffer[send_buffer_size]; - slot_t* slot = param->is_client ? &slots[0] : &slots[nb_slots_read]; + slot_t* slot; + if (param->is_client) { + memset(&send_slot, 0, sizeof(slot_t)); + slot = &send_slot; + } else { + slot = &slots[nb_slots_read]; + } assert(slot != NULL); nb_slots_read++; @@ -154,18 +167,16 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_ int if_index = param->dest_if; if (slot->error == RCODE_OKAY) { picoquic_connection_id_t log_cid; - picoquic_cnx_t* last_cnx = NULL; int ret; if (!param->is_client && slot->cnx) { ret = picoquic_prepare_packet_ex(slot->cnx, loop_time, send_buffer, send_buffer_size, &send_length, &peer_addr, &local_addr, &if_index, send_msg_ptr); - last_cnx = slot->cnx; } else if (param->is_client) { ret = picoquic_prepare_next_packet_ex(quic, loop_time, send_buffer, send_buffer_size, &send_length, - &peer_addr, &local_addr, &if_index, &log_cid, &last_cnx, + &peer_addr, &local_addr, &if_index, &log_cid, &slot->cnx, send_msg_ptr); } if (ret < 0) { @@ -176,17 +187,97 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_ } } + if (param->is_client && peer_addr.ss_family == 0 && slot->peer_addr.ss_family == 0) { + continue; + } + int sock_err = 0; int bytes_sent; unsigned char* encoded; size_t segment_len = send_msg_size == 0 ? send_length : send_msg_size; - ssize_t encoded_len = param->encode(thread_ctx->quic, slot, loop_callback_ctx, &encoded, + ssize_t encoded_len = param->encode(slot, loop_callback_ctx, &encoded, (const unsigned char*)send_buffer, send_length, &segment_len, &peer_addr, &local_addr); if (encoded_len <= 0) { DBG_PRINTF("Encoding fails, ret=%d\n", encoded_len); continue; } + if (encoded_len < segment_len) { + DBG_PRINTF("Encoded len shorter than original: %d < %d", encoded_len, segment_len); + return -1; + } + + if (send_msg_size > 0) { + send_msg_size = segment_len; // new size after encoding + } + + const SOCKET_TYPE send_socket = s_ctx->fd; + bytes_sent = picoquic_sendmsg(send_socket, + (struct sockaddr*)&peer_addr, (struct sockaddr*)&local_addr, param->dest_if, + (const char*)encoded, (int)encoded_len, (int)send_msg_size, &sock_err); + free(encoded); + if (bytes_sent == 0) { + DBG_PRINTF("BYTES_SENT == 0 %d\n", bytes_sent); + return -1; + } + if (bytes_sent < 0) { + return bytes_sent; + } + + nb_packets_sent++; + } + + if (!param->is_client || nb_packet_received == 0) { + continue; + } + + size_t nb_polls_sent = 0; + nb_slots_read = 0; + while (nb_slots_read < nb_slots_written) { + uint8_t send_buffer[send_buffer_size]; + slot_t* slot = &slots[nb_slots_read]; + assert(slot != NULL); + nb_slots_read++; + if (slot->cnx == NULL) { + continue; // in case the slot written was a bogus message + } + + slot->cnx->is_poll_requested = 1; + + size_t send_length = 0; + struct sockaddr_storage peer_addr = {0}; + struct sockaddr_storage local_addr = {0}; + int if_index = param->dest_if; + int ret = picoquic_prepare_packet_ex(slot->cnx, loop_time, + send_buffer, send_buffer_size, &send_length, + &peer_addr, &local_addr, &if_index, send_msg_ptr); + if (ret < 0) { + return -1; + } + if (param->is_client && send_length == 0) { + break; + } + if (slot->cnx->is_poll_requested == 1) { + // TODO: should we try again or skip this slot + continue; + } + + int sock_err = 0; + int bytes_sent; + unsigned char* encoded; + size_t segment_len = send_msg_size == 0 ? send_length : send_msg_size; + ssize_t encoded_len = param->encode(slot, loop_callback_ctx, &encoded, + (const unsigned char*)send_buffer, send_length, &segment_len, &peer_addr, &local_addr); + if (encoded_len <= 0) { + DBG_PRINTF("Encoding fails, ret=%d\n", encoded_len); + continue; + } + + if (encoded_len < segment_len) { + DBG_PRINTF("Encoded len shorter than original: %d < %d", encoded_len, segment_len); + return -1; + } + if (send_msg_size > 0) { send_msg_size = segment_len; // new size after encoding } @@ -203,6 +294,12 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_ if (bytes_sent < 0) { return bytes_sent; } + + nb_polls_sent++; + } + + if (param->is_client) { + DBG_PRINTF("[polls_sent:%d][sent:%d][received:%d]", nb_polls_sent, nb_packets_sent, nb_packet_received); } } diff --git a/src/slipstream_utils.c b/src/slipstream_utils.c index bee8c69..87cc97d 100644 --- a/src/slipstream_utils.c +++ b/src/slipstream_utils.c @@ -1,9 +1,12 @@ #include "slipstream_utils.h" +#include #include #include #include +#include "picoquic_utils.h" + char* picoquic_connection_id_to_string(const picoquic_connection_id_t* cid) { // Each byte needs 2 hex characters + null terminator @@ -42,3 +45,23 @@ void sockaddr_dummy(struct sockaddr_storage *addr_storage) { addr4->sin_len = sizeof(struct sockaddr_in); #endif } + +void print_sockaddr_ip_and_port(struct sockaddr_storage *addr_storage) { + char ip_str[INET6_ADDRSTRLEN]; + int port; + + if (addr_storage->ss_family == AF_INET) { + struct sockaddr_in *addr4 = (struct sockaddr_in *)addr_storage; + inet_ntop(AF_INET, &addr4->sin_addr, ip_str, INET6_ADDRSTRLEN); + port = ntohs(addr4->sin_port); + } else if (addr_storage->ss_family == AF_INET6) { + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)addr_storage; + inet_ntop(AF_INET6, &addr6->sin6_addr, ip_str, INET6_ADDRSTRLEN); + port = ntohs(addr6->sin6_port); + } else { + DBG_PRINTF("Unknown address family", NULL); + return; + } + + DBG_PRINTF("%s:%d", ip_str, port); +}