Implement QUIC-native polls, disable server congestion control, "disable" ack delays in server, path add checks, disable nat rebinding

This commit is contained in:
Jop Zitman 2025-01-03 11:19:45 +08:00
parent 7d7750864a
commit 2fe1241583
16 changed files with 292 additions and 309 deletions

View file

@ -39,15 +39,15 @@ add_executable(slipstream
src/slipstream.c
src/slipstream_client.c
src/slipstream_inline_dots.c
src/slipstream_packet.c
src/slipstream_resolver_addresses.c
src/slipstream_server.c
src/slipstream_server_cc.c
src/slipstream_sockloop.c
src/slipstream_utils.c
include/slipstream.h
include/slipstream_inline_dots.h
include/slipstream_packet.h
include/slipstream_resolver_addresses.h
include/slipstream_server_cc.h
include/slipstream_slot.h
include/slipstream_sockloop.h
include/slipstream_utils.h

2
extern/picoquic vendored

@ -1 +1 @@
Subproject commit c00a6f69032a255ee15892a0fec3c8e8cc1da4a4
Subproject commit c3419f8dccf441ad9c2a14d747261701f2884edd

View file

@ -23,7 +23,7 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f
const char* cc_algo_id);
int picoquic_slipstream_server(int server_port, const char* pem_cert, const char* pem_key, char const* upstream_name,
int upstream_port, const char* domain_name, const char* cc_algo_id);
int upstream_port, const char* domain_name);
#ifdef __cplusplus
}

View file

@ -1,15 +0,0 @@
#ifndef SLIPSTREAM_PACKET
#define SLIPSTREAM_PACKET
#include <stdbool.h>
#include "picoquic.h"
#define PICOQUIC_SHORT_HEADER_CONNECTION_ID_SIZE 8
bool slipstream_packet_is_long_header(const uint8_t first_byte);
int slipstream_packet_create_poll(uint8_t** dest_buf, size_t* dest_buf_len, picoquic_connection_id_t dst_connection_id);
int slipstream_packet_parse(uint8_t* src_buf, size_t src_buf_len, size_t short_header_conn_id_len, picoquic_connection_id_t* src_connection_id, picoquic_connection_id_t* dst_connection_id, bool* is_poll_packet);
#endif // SLIPSTREAM_PACKET

View file

@ -1,8 +1,15 @@
#ifndef SLIPSTREAM_RESOLVERS_H
#define SLIPSTREAM_RESOLVERS_H
#include <stdbool.h>
#include <stdio.h>
#include <sys/socket.h>
struct sockaddr_storage* read_resolver_addresses(const char *resolver_addresses_filename, size_t *count);
typedef struct st_address_t {
struct sockaddr_storage server_address;
bool added;
} address_t;
struct st_address_t* read_resolver_addresses(const char *resolver_addresses_filename, size_t *count);
#endif //SLIPSTREAM_RESOLVERS_H

View file

@ -0,0 +1,8 @@
#ifndef SLIPSTREAM_SERVER_CC_H
#define SLIPSTREAM_SERVER_CC_H
#include "picoquic.h"
extern picoquic_congestion_algorithm_t* slipstream_server_cc_algorithm;
#endif //SLIPSTREAM_SERVER_CC_H

View file

@ -11,7 +11,7 @@ typedef struct st_slot_t {
struct sockaddr_storage local_addr;
picoquic_cnx_t* cnx;
uint64_t created_time;
int query_id;
bool is_poll_packet;
} slot_t;
#endif // SLIPSTREAM_SLOT

View file

@ -7,4 +7,6 @@ char* picoquic_connection_id_to_string(const picoquic_connection_id_t* cid);
void sockaddr_dummy(struct sockaddr_storage *addr_storage);
void print_sockaddr_ip_and_port(struct sockaddr_storage *addr_storage);
#endif //SLIPSTREAM_UTILS_H

View file

@ -30,6 +30,9 @@ int main(int argc, char** argv)
(void)WSA_START(MAKEWORD(2, 2), &wsaData);
#endif
setbuf(stdout, NULL);
setbuf(stderr, NULL);
if (argc < 2) {
usage(argv[0]);
}
@ -46,15 +49,14 @@ int main(int argc, char** argv)
}
}
else if (strcmp(argv[1], "server") == 0) {
if (argc != 9) {
if (argc != 8) {
usage(argv[0]);
}
else {
int server_port = get_port(argv[0], argv[2]);
int remote_port = get_port(argv[0], argv[6]);
const char* domain_name = argv[7];
const char* cc_algo_id = argv[8];
exit_code = picoquic_slipstream_server(server_port, argv[3], argv[4], argv[5], remote_port, domain_name, cc_algo_id);
exit_code = picoquic_slipstream_server(server_port, argv[3], argv[4], argv[5], remote_port, domain_name);
}
}
else

View file

@ -22,8 +22,8 @@
#include "picoquic_config.h"
#include "slipstream.h"
#include "slipstream_inline_dots.h"
#include "slipstream_packet.h"
#include "slipstream_resolver_addresses.h"
#include "slipstream_utils.h"
#include "SPCDNS/src/dns.h"
#include "SPCDNS/src/mappings.h"
@ -39,15 +39,17 @@ typedef struct st_slipstream_client_ctx_t {
picoquic_cnx_t* cnx;
slipstream_client_stream_ctx_t* first_stream;
picoquic_network_thread_ctx_t* thread_ctx;
struct sockaddr_storage* server_addresses;
struct st_address_t* server_addresses;
size_t server_address_count;
uint64_t last_request;
bool ready;
bool closed;
} slipstream_client_ctx_t;
char* client_domain_name = NULL;
size_t client_domain_name_len = 0;
ssize_t client_encode_segment(picoquic_quic_t* quic, dns_packet_t* packet, size_t* packet_len, const unsigned char* src_buf, size_t src_buf_len) {
ssize_t client_encode_segment(dns_packet_t* packet, size_t* packet_len, const unsigned char* src_buf, size_t src_buf_len) {
char name[255];
const size_t len = b32_encode(&name[0], (const char*) src_buf, src_buf_len, true, false);
const size_t encoded_len = slipstream_inline_dotify(name, 255, len);
@ -89,7 +91,7 @@ ssize_t client_encode_segment(picoquic_quic_t* quic, dns_packet_t* packet, size_
return 0;
}
ssize_t client_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
ssize_t client_encode(void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
assert(callback_ctx);
slipstream_client_ctx_t* client_ctx = callback_ctx;
@ -97,7 +99,7 @@ ssize_t client_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, u
if (src_buf_len <= *segment_len) {
size_t packet_len = MAX_DNS_QUERY_SIZE;
unsigned char* packet = malloc(packet_len);
const ssize_t ret = client_encode_segment(quic, (dns_packet_t*) packet, &packet_len, src_buf, src_buf_len);
const ssize_t ret = client_encode_segment((dns_packet_t*) packet, &packet_len, src_buf, src_buf_len);
if (ret < 0) {
free(packet);
return -1;
@ -119,7 +121,7 @@ ssize_t client_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, u
size_t first_packet_len = 0;
for (size_t i = 0; i < num_segments; i++) {
size_t packet_len = MAX_DNS_QUERY_SIZE;
const ssize_t ret = client_encode_segment(quic, (dns_packet_t*) current_packet, &packet_len, segment, *segment_len);
const ssize_t ret = client_encode_segment((dns_packet_t*) current_packet, &packet_len, segment, *segment_len);
if (ret < 0) {
free(packets);
return -1;
@ -147,9 +149,8 @@ ssize_t client_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, u
return current_packet - packets;
}
ssize_t client_decode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, picoquic_socket_ctx_t* s_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
ssize_t client_decode(void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
*dest_buf = NULL;
slipstream_client_ctx_t* client_ctx = callback_ctx;
size_t bufsize = DNS_DECODEBUF_4K * sizeof(dns_decoded_t);
dns_decoded_t decoded[DNS_DECODEBUF_4K] = {0};
@ -190,62 +191,6 @@ ssize_t client_decode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, p
*dest_buf = malloc(answer_txt->len);
memcpy((void*)*dest_buf, answer_txt->text, answer_txt->len);
picoquic_connection_id_t incoming_src_connection_id = {0};
picoquic_connection_id_t incoming_dest_connection_id; // sure to be set by parser
bool is_poll_packet = false;
int ret = slipstream_packet_parse(*dest_buf, answer_txt->len, PICOQUIC_SHORT_HEADER_CONNECTION_ID_SIZE,
&incoming_src_connection_id, &incoming_dest_connection_id, &is_poll_packet);
if (ret != 0 || is_poll_packet) {
DBG_PRINTF("[%d] error parsing slipstream packet: %d", query->id, ret);
return answer_txt->len;
}
// get active destination connection id on this ctx
picoquic_cnx_t* cnx = picoquic_cnx_by_id_(quic, incoming_dest_connection_id);
picoquic_connection_id_t outgoing_dest_connection_id = cnx->path[0]->p_remote_cnxid->cnx_id;
if (outgoing_dest_connection_id.id_len == 0) {
// p_remote_cnxid is not set yet when we are receiving the first server response
outgoing_dest_connection_id = incoming_src_connection_id;
}
const int poll_ratio = 1;
for (int j = 0; j < poll_ratio; ++j) {
uint8_t* poll_packet_buf;
size_t poll_packet_len;
ret = slipstream_packet_create_poll(&poll_packet_buf, &poll_packet_len, outgoing_dest_connection_id);
if (ret < 0) {
DBG_PRINTF("error creating poll packet", NULL);
return answer_txt->len;
}
unsigned char* encoded;
ssize_t encoded_len = client_encode(quic, slot_p, callback_ctx, &encoded, poll_packet_buf, poll_packet_len, &poll_packet_len, peer_addr, local_addr);
if (encoded_len <= 0) {
DBG_PRINTF("error encoding poll packet", NULL);
free(poll_packet_buf);
return answer_txt->len;
}
const SOCKET_TYPE send_socket = s_ctx->fd;
int sock_err = 0;
ret = picoquic_sendmsg(send_socket,
(struct sockaddr*)peer_addr, (struct sockaddr*)local_addr, 0,
(const char*)encoded, encoded_len, 0, &sock_err);
if (ret < 0) {
DBG_PRINTF("Error sending poll packet, ret=%d, sock_err=%d %s", ret, sock_err, strerror(sock_err));
free(poll_packet_buf);
free(encoded);
return answer_txt->len;
}
free(poll_packet_buf);
free(encoded);
}
client_ctx->last_request = picoquic_current_time();
return answer_txt->len;
}
@ -298,8 +243,7 @@ static void slipstream_client_free_context(slipstream_client_ctx_t* client_ctx)
free(client_ctx->server_addresses);
/* release the memory */
free(client_ctx);
client_ctx->closed = true;
}
void slipstream_client_mark_active_pass(slipstream_client_ctx_t* client_ctx) {
@ -315,64 +259,48 @@ void slipstream_client_mark_active_pass(slipstream_client_ctx_t* client_ctx) {
}
}
void slipstream_add_paths(slipstream_client_ctx_t* client_ctx) {
picoquic_cnx_t* cnx = client_ctx->cnx;
// add rest of the resolvers
for (size_t i = 1; i < client_ctx->server_address_count; i++) {
address_t* slipstream_path = &client_ctx->server_addresses[i];
if (slipstream_path->added) {
continue;
}
uint64_t current_time = picoquic_current_time();
// if (current_time - cnx->start_time < 10000000) {
// DBG_PRINTF("Can't add path yet", NULL);
// continue;
// }
print_sockaddr_ip_and_port(&slipstream_path->server_address);
int path_id = -2;
picoquic_probe_new_path_ex(cnx, (struct sockaddr*)&slipstream_path->server_address, (struct sockaddr*)&cnx->path[0]->local_addr, 0, current_time, 0, &path_id);
if (path_id < 0) {
DBG_PRINTF("Failed adding path", NULL);
continue;
}
DBG_PRINTF("Added path", NULL);
picoquic_reinsert_by_wake_time(cnx->quic, cnx, current_time);
slipstream_path->added = true;
}
}
int slipstream_client_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_loop_cb_enum cb_mode,
void* callback_ctx, void* callback_arg) {
slipstream_client_ctx_t* client_ctx = callback_ctx;
if (client_ctx->closed) {
return 0;
}
switch (cb_mode) {
case picoquic_packet_loop_before_select:
uint64_t current_time = picoquic_current_time();
uint64_t passed = current_time - client_ctx->last_request;
if (passed < 200000) {
break;
if (client_ctx->ready) {
slipstream_add_paths(client_ctx);
}
const picoquic_cnx_t* cnx = client_ctx->cnx;
const int path_index = rand() % cnx->nb_paths;
const picoquic_path_t* path_x = cnx->path[path_index];
picoquic_connection_id_t outgoing_dest_connection_id = path_x->p_remote_cnxid->cnx_id;
if (outgoing_dest_connection_id.id_len == 0) {
outgoing_dest_connection_id = cnx->initial_cnxid;
}
uint8_t* poll_packet_buf;
size_t poll_packet_len;
int ret = slipstream_packet_create_poll(&poll_packet_buf, &poll_packet_len, outgoing_dest_connection_id);
if (ret < 0) {
DBG_PRINTF("error creating poll packet", NULL);
return 0;
}
struct sockaddr_storage *peer_addr = &cnx->path[0]->peer_addr;
struct sockaddr_storage *local_addr = &cnx->path[0]->local_addr;
picoquic_socket_ctx_t* s_ctx = (picoquic_socket_ctx_t*)callback_arg;
unsigned char* encoded;
ssize_t encoded_len = client_encode(quic, NULL, client_ctx, &encoded, poll_packet_buf, poll_packet_len, &poll_packet_len, NULL, NULL);
if (encoded_len <= 0) {
DBG_PRINTF("error encoding poll packet", NULL);
free(poll_packet_buf);
return 0;
}
const SOCKET_TYPE send_socket = s_ctx->fd;
int sock_err = 0;
ret = picoquic_sendmsg(send_socket,
(struct sockaddr*)peer_addr, (struct sockaddr*)local_addr, 0,
(const char*)encoded, encoded_len, 0, &sock_err);
if (ret < 0) {
DBG_PRINTF("Error sending poll packet, ret=%d, sock_err=%d %s", ret, sock_err, strerror(sock_err));
free(poll_packet_buf);
free(encoded);
return 0;
}
free(poll_packet_buf);
free(encoded);
client_ctx->last_request = current_time;
case picoquic_packet_loop_wake_up:
if (callback_ctx == NULL) {
return 0;
@ -640,26 +568,8 @@ int slipstream_client_callback(picoquic_cnx_t* cnx,
break;
case picoquic_callback_ready:
fprintf(stdout, "Connection confirmed.\n");
// add rest of the resolvers
for (size_t i = 1; i < client_ctx->server_address_count; i++) {
struct sockaddr* server_address = (struct sockaddr*)&client_ctx->server_addresses[i];
uint64_t current_time = picoquic_current_time();
// convert server address to string
char host[NI_MAXHOST];
socklen_t addrlen = sizeof(*server_address);
ret = getnameinfo(server_address, addrlen, host, sizeof(host), NULL, 0, NI_NUMERICHOST | NI_NUMERICSERV);
if (ret == 0) {
printf("Probing path: %s", host);
}
ret = picoquic_probe_new_path(cnx, server_address, NULL, current_time);
if (ret != 0) {
fprintf(stderr, "Could not create probe new path\n");
return -1;
}
}
break;
client_ctx->ready = true;
slipstream_add_paths(client_ctx);
default:
/* unexpected -- just ignore. */
break;
@ -707,7 +617,7 @@ static int slipstream_connect(struct sockaddr_storage* server_address,
}
// 400ms
// picoquic_enable_keep_alive(*cnx, 400000);
picoquic_enable_keep_alive(*cnx, 400000);
/* Document connection in client's context */
client_ctx->cnx = *cnx;
@ -768,10 +678,9 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f
/* Create the QUIC context for the server */
current_time = picoquic_current_time();
// one connection only, freed in slipstream_client_free_context on picoquic close callback
slipstream_client_ctx_t *client_ctx = malloc(sizeof(slipstream_client_ctx_t));
memset(client_ctx, 0, sizeof(slipstream_client_ctx_t));
slipstream_client_ctx_t client_ctx = {0};
/* Create QUIC context */
picoquic_quic_t* quic = picoquic_create_and_configure(&config, slipstream_client_callback, client_ctx, current_time, NULL);
picoquic_quic_t* quic = picoquic_create_and_configure(&config, slipstream_client_callback, &client_ctx, current_time, NULL);
if (quic == NULL) {
fprintf(stderr, "Could not create server context\n");
return -1;
@ -783,17 +692,19 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f
debug_printf_push_stream(stderr);
#endif
picoquic_set_key_log_file_from_env(quic);
// picoquic_set_textlog(quic, "-");
// picoquic_set_log_level(quic, 1);
// TODO: idle timeout?
/* Read the server address list from the file */
client_ctx->server_addresses = read_resolver_addresses(resolver_addresses_filename, &client_ctx->server_address_count);
if (!client_ctx->server_addresses) {
client_ctx.server_addresses = read_resolver_addresses(resolver_addresses_filename, &client_ctx.server_address_count);
if (!client_ctx.server_addresses) {
printf("Failed to read IP addresses\n");
return 1;
}
picoquic_cnx_t* cnx = NULL;
ret = slipstream_connect(client_ctx->server_addresses, quic, &cnx, client_ctx);
ret = slipstream_connect(&client_ctx.server_addresses[0].server_address, quic, &cnx, &client_ctx);
if (ret != 0) {
fprintf(stderr, "Could not connect to server\n");
return -1;
@ -847,17 +758,17 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f
thread_ctx.quic = quic;
thread_ctx.param = &param;
thread_ctx.loop_callback = slipstream_client_sockloop_callback;
thread_ctx.loop_callback_ctx = client_ctx;
thread_ctx.loop_callback_ctx = &client_ctx;
/* Open the wake up pipe or event */
picoquic_open_network_wake_up(&thread_ctx, &ret);
client_ctx->thread_ctx = &thread_ctx;
client_ctx.thread_ctx = &thread_ctx;
slipstream_client_accepter_args* args = malloc(sizeof(slipstream_client_accepter_args));
args->fd = listen_sock;
args->cnx = cnx;
args->client_ctx = client_ctx;
args->client_ctx = &client_ctx;
args->thread_ctx = &thread_ctx;
pthread_t thread;
@ -867,6 +778,7 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f
}
signal(SIGTERM, client_sighandler);
// picoquic_packet_loop_v3(&thread_ctx);
slipstream_packet_loop(&thread_ctx);
ret = thread_ctx.return_code;

View file

@ -1,95 +0,0 @@
#include <stdlib.h>
#include <string.h>
#include "slipstream_packet.h"
#include "picoquic_utils.h"
const int num_padding_for_poll = 5;
#define PICOQUIC_SHORT_HEADER_CONNECTION_ID_SIZE 8
bool slipstream_packet_is_long_header(const uint8_t first_byte) {
return first_byte & 0x80;
}
int slipstream_packet_create_poll(uint8_t** dest_buf, size_t* dest_buf_len, picoquic_connection_id_t dst_connection_id) {
*dest_buf = NULL;
if (num_padding_for_poll < 5) {
return -1;
}
// Allocate a num_padding_for_poll + dst_connection_id.id_len + len marker + dst_connection_id len marker
size_t packet_len = num_padding_for_poll + dst_connection_id.id_len + 1 + 1;
uint8_t* packet = malloc(packet_len);
// Write random padding bytes to the entire packet
for (int i = 0; i < packet_len; i++) {
packet[i] = rand() % 256;
}
packet[0] |= 0x80; // Set bit 7 (long header format)
// Write destination connection ID
packet[5] = dst_connection_id.id_len;
memcpy(&packet[6], dst_connection_id.id, dst_connection_id.id_len);
// Ensure the source connection ID len marker byte is larger than PICOQUIC_CONNECTION_ID_MAX_SIZE
int randomly_written_src_connection_id = packet[5+1+dst_connection_id.id_len];
if (randomly_written_src_connection_id <= PICOQUIC_CONNECTION_ID_MAX_SIZE) {
packet[5+1+dst_connection_id.id_len] = PICOQUIC_CONNECTION_ID_MAX_SIZE + 1;
}
// The rest of the payload (including pretend second connection ID) is random padding
*dest_buf = packet;
*dest_buf_len = packet_len;
return packet_len;
}
int slipstream_packet_parse(uint8_t* src_buf, size_t src_buf_len, size_t short_header_conn_id_len, picoquic_connection_id_t* src_connection_id, picoquic_connection_id_t* dst_connection_id, bool* is_poll_packet) {
if (src_buf_len < 1) {
return -1;
}
// Short header packet
if (!slipstream_packet_is_long_header(src_buf[0])) {
// Short header packets can't be poll packets
if (src_buf_len < short_header_conn_id_len + 1) {
return -1;
}
picoquic_parse_connection_id(&src_buf[1], short_header_conn_id_len, dst_connection_id);
return 0;
}
// Read destination connection ID
if (src_buf_len < 5+1) {
return -1;
}
const size_t dst_connection_id_len = src_buf[5];
if (dst_connection_id_len > PICOQUIC_CONNECTION_ID_MAX_SIZE) {
return -1;
}
if (src_buf_len < 5+1+dst_connection_id_len) {
return -1;
}
picoquic_parse_connection_id(&src_buf[5+1], dst_connection_id_len, dst_connection_id);
// Read source connection ID
if (src_buf_len < 5+1+dst_connection_id_len+1) {
return -1;
}
const size_t src_connection_id_len = src_buf[5+1+dst_connection_id_len];
if (src_connection_id_len > PICOQUIC_CONNECTION_ID_MAX_SIZE) {
*is_poll_packet = true;
return 0;
}
if (src_buf_len < 5+1+dst_connection_id_len+1+src_connection_id_len) {
return -1;
}
picoquic_parse_connection_id(&src_buf[5+1+dst_connection_id_len+1], src_connection_id_len, src_connection_id);
return 0;
}

View file

@ -10,7 +10,7 @@
#define MAX_LINE_LENGTH 50
#define DEFAULT_PORT 53
struct sockaddr_storage* read_resolver_addresses(const char *resolver_addresses_filename, size_t *count) {
struct st_address_t* read_resolver_addresses(const char *resolver_addresses_filename, size_t *count) {
*count = 0;
FILE *fp = fopen(resolver_addresses_filename, "r");
@ -19,7 +19,7 @@ struct sockaddr_storage* read_resolver_addresses(const char *resolver_addresses_
}
int capacity = INITIAL_CAPACITY;
struct sockaddr_storage* server_address = calloc(capacity, sizeof(struct sockaddr_storage));
struct st_address_t* server_address = calloc(capacity, sizeof(struct st_address_t));
if (!server_address) {
fclose(fp);
return NULL;
@ -40,7 +40,7 @@ struct sockaddr_storage* read_resolver_addresses(const char *resolver_addresses_
// Resize array if needed
if (valid_addresses == capacity) {
capacity *= 2;
struct sockaddr_storage* temp = realloc(server_address, capacity * sizeof(struct sockaddr_storage));
struct st_address_t* temp = realloc(server_address, capacity * sizeof(struct st_address_t));
if (!temp) {
fprintf(stderr, "Memory allocation failed\n");
free(server_address);
@ -61,7 +61,7 @@ struct sockaddr_storage* read_resolver_addresses(const char *resolver_addresses_
printf("Adding %s:%d\n", server_name, server_port);
int is_name = 0;
if (picoquic_get_server_address(server_name, server_port, &server_address[valid_addresses], &is_name) != 0) {
if (picoquic_get_server_address(server_name, server_port, &server_address[valid_addresses].server_address, &is_name) != 0) {
fprintf(stderr, "Cannot get the IP address for <%s> port <%d>\n", server_name, server_port);
continue; // Skip invalid addresses instead of failing
}
@ -73,7 +73,7 @@ struct sockaddr_storage* read_resolver_addresses(const char *resolver_addresses_
// Trim excess memory if needed
if (valid_addresses < capacity) {
struct sockaddr_storage* temp = realloc(server_address, valid_addresses * sizeof(struct sockaddr_storage));
struct st_address_t* temp = realloc(server_address, valid_addresses * sizeof(struct st_address_t));
if (temp) {
server_address = temp;
}

View file

@ -13,13 +13,15 @@
#include <sys/param.h>
#include <sys/poll.h>
#include <assert.h>
#include <picoquic_internal.h>
#include <slipstream_sockloop.h>
#include "lua-resty-base-encoding-base32.h"
#include "picoquic_config.h"
#include "picoquic_logger.h"
#include "slipstream.h"
#include "slipstream_inline_dots.h"
#include "slipstream_packet.h"
#include "../include/slipstream_server_cc.h"
#include "slipstream_slot.h"
#include "slipstream_utils.h"
#include "SPCDNS/src/dns.h"
@ -28,7 +30,7 @@
char* server_domain_name = NULL;
size_t server_domain_name_len = 0;
ssize_t server_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
ssize_t server_encode(void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, size_t* segment_len, struct sockaddr_storage* peer_addr, struct sockaddr_storage* local_addr) {
// we don't support segmentation in the server
assert(segment_len == NULL || *segment_len == 0 || *segment_len == src_buf_len);
@ -89,7 +91,7 @@ ssize_t server_encode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, u
return packet_len;
}
ssize_t server_decode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, picoquic_socket_ctx_t* s_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *peer_addr, struct sockaddr_storage *local_addr) {
ssize_t server_decode(void* slot_p, void* callback_ctx, unsigned char** dest_buf, const unsigned char* src_buf, size_t src_buf_len, struct sockaddr_storage *peer_addr, struct sockaddr_storage *local_addr) {
*dest_buf = NULL;
slot_t* slot = slot_p;
@ -155,28 +157,6 @@ ssize_t server_decode(picoquic_quic_t* quic, void* slot_p, void* callback_ctx, p
return 0;
}
picoquic_connection_id_t incoming_src_connection_id = {0};
picoquic_connection_id_t incoming_dest_connection_id; // sure to be set by parser
bool is_poll_packet = false;
// ReSharper disable once CppDFAUnreachableCode
const int ret = slipstream_packet_parse(decoded_buf, decoded_len, PICOQUIC_SHORT_HEADER_CONNECTION_ID_SIZE,
&incoming_src_connection_id, &incoming_dest_connection_id, &is_poll_packet);
if (ret != 0) {
free(decoded_buf);
DBG_PRINTF("error parsing slipstream packet: %d", ret);
slot->error = RCODE_SERVER_FAILURE;
return 0;
}
picoquic_cnx_t *cnx = picoquic_cnx_by_id_(quic, incoming_dest_connection_id);
slot->cnx = cnx;
slot->query_id = query->id;
if (is_poll_packet) {
free(decoded_buf);
return 0;
}
*dest_buf = decoded_buf;
return decoded_len;
@ -556,7 +536,7 @@ void server_sighandler(int signum) {
}
int picoquic_slipstream_server(int server_port, const char* server_cert, const char* server_key,
char const* upstream_name, int upstream_port, const char* domain_name, const char* cc_algo_id) {
char const* upstream_name, int upstream_port, const char* domain_name) {
/* Start: start the QUIC process with cert and key files */
int ret = 0;
uint64_t current_time = 0;
@ -586,7 +566,6 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c
config.mtu_max = mtu;
config.initial_send_mtu_ipv4 = mtu;
config.initial_send_mtu_ipv6 = mtu;
config.cc_algo_id = cc_algo_id;
config.multipath_option = 1;
config.use_long_log = 1;
config.do_preemptive_repeat = 1;
@ -610,6 +589,10 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c
debug_printf_push_stream(stderr);
#endif
picoquic_set_key_log_file_from_env(quic);
// picoquic_set_textlog(quic, "-");
// picoquic_set_log_level(quic, 1);
picoquic_set_default_congestion_algorithm(quic, slipstream_server_cc_algorithm);
picoquic_packet_loop_param_t param = {0};
param.local_af = AF_INET;
@ -632,6 +615,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c
default_context.thread_ctx = &thread_ctx;
signal(SIGTERM, server_sighandler);
// picoquic_packet_loop_v3(&thread_ctx);
slipstream_packet_loop(&thread_ctx);
ret = thread_ctx.return_code;

View file

@ -0,0 +1,58 @@
#include "../include/slipstream_server_cc.h"
#include <stdlib.h>
#include <picoquic_internal.h>
typedef enum {
slipstream_server_cc_alg_none = 0,
} slipstream_server_cc_alg_state_t;
typedef struct st_slipstream_server_cc_t {
slipstream_server_cc_alg_state_t state;
} slipstream_server_cc_t;
static void slipstream_server_cc_init(picoquic_cnx_t * cnx, picoquic_path_t* path_x, uint64_t current_time)
{
slipstream_server_cc_t* state = (slipstream_server_cc_t*)malloc(sizeof(slipstream_server_cc_t));
path_x->congestion_alg_state = (void*)state;
}
static void slipstream_server_cc_notify(
picoquic_cnx_t* cnx,
picoquic_path_t* path_x,
picoquic_congestion_notification_t notification,
picoquic_per_ack_state_t * ack_state,
uint64_t current_time)
{
path_x->is_cc_data_updated = 1;
path_x->cwin = UINT64_MAX;
}
static void slipstream_server_cc_delete(picoquic_path_t* path_x) {
if (path_x->congestion_alg_state != NULL) {
free(path_x->congestion_alg_state);
path_x->congestion_alg_state = NULL;
}
}
void slipstream_server_cc_observe(picoquic_path_t* path_x, uint64_t* cc_state, uint64_t* cc_param)
{
slipstream_server_cc_t* state = (slipstream_server_cc_t*)path_x->congestion_alg_state;
*cc_state = (uint64_t)state->state;
*cc_param = UINT64_MAX;
}
#define picoquic_slipstream_server_cc_ID "slipstream_server"
#define PICOQUIC_CC_ALGO_NUMBER_SLIPSTREAM_SERVER 10
picoquic_congestion_algorithm_t slipstream_server_cc_algorithm_struct = {
picoquic_slipstream_server_cc_ID, PICOQUIC_CC_ALGO_NUMBER_SLIPSTREAM_SERVER,
slipstream_server_cc_init,
slipstream_server_cc_notify,
slipstream_server_cc_delete,
slipstream_server_cc_observe
};
picoquic_congestion_algorithm_t* slipstream_server_cc_algorithm = &slipstream_server_cc_algorithm_struct;

View file

@ -57,6 +57,7 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_
const picoquic_packet_loop_cb_fn loop_callback = thread_ctx->loop_callback;
void* loop_callback_ctx = thread_ctx->loop_callback_ctx;
slot_t slots[PICOQUIC_PACKET_LOOP_RECV_MAX] = {0};
slot_t send_slot = {0};
while (!thread_ctx->thread_should_close) {
if (loop_callback) {
@ -64,6 +65,7 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_
}
size_t nb_slots_written = 0;
size_t nb_packet_received = 0;
while (nb_slots_written < PICOQUIC_PACKET_LOOP_RECV_MAX) {
int64_t delta_t = 0;
@ -102,15 +104,13 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_
break;
}
slot_t* slot = param->is_client ? &slots[0] : &slots[nb_slots_written];
slot_t* slot = &slots[nb_slots_written];
assert(slot != NULL);
memset(slot, 0, sizeof(slot_t));
nb_slots_written++;
if (slot == NULL) {
return -1;
}
unsigned char* decoded;
bytes_recv = param->decode(thread_ctx->quic, slot, thread_ctx->loop_callback_ctx, s_ctx, &decoded,
bytes_recv = param->decode(slot, thread_ctx->loop_callback_ctx, &decoded,
(const unsigned char*)buffer, bytes_recv, &peer_addr, &local_addr);
if (bytes_recv < 0) {
DBG_PRINTF("decode() failed with error %d\n", bytes_recv);
@ -118,7 +118,6 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_
}
if (bytes_recv == 0) {
// poll packet
continue;
}
@ -137,14 +136,28 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_
return ret;
}
slot->cnx = last_cnx;
nb_packet_received++;
if (!param->is_client && last_cnx->nb_paths == 1) {
// server can only have 1 incoming path
picoquic_path_t* path_x = last_cnx->path[0];
picoquic_set_ack_needed(last_cnx, current_time, picoquic_packet_context_application, path_x, 1);
}
}
const uint64_t loop_time = picoquic_current_time();
size_t nb_packets_sent = 0;
size_t nb_slots_read = 0;
const size_t max_slots = param->is_client ? PICOQUIC_PACKET_LOOP_SEND_MAX : nb_slots_written;
while (nb_slots_read < max_slots) {
uint8_t send_buffer[send_buffer_size];
slot_t* slot = param->is_client ? &slots[0] : &slots[nb_slots_read];
slot_t* slot;
if (param->is_client) {
memset(&send_slot, 0, sizeof(slot_t));
slot = &send_slot;
} else {
slot = &slots[nb_slots_read];
}
assert(slot != NULL);
nb_slots_read++;
@ -154,18 +167,16 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_
int if_index = param->dest_if;
if (slot->error == RCODE_OKAY) {
picoquic_connection_id_t log_cid;
picoquic_cnx_t* last_cnx = NULL;
int ret;
if (!param->is_client && slot->cnx) {
ret = picoquic_prepare_packet_ex(slot->cnx, loop_time,
send_buffer, send_buffer_size, &send_length,
&peer_addr, &local_addr, &if_index, send_msg_ptr);
last_cnx = slot->cnx;
}
else if (param->is_client) {
ret = picoquic_prepare_next_packet_ex(quic, loop_time,
send_buffer, send_buffer_size, &send_length,
&peer_addr, &local_addr, &if_index, &log_cid, &last_cnx,
&peer_addr, &local_addr, &if_index, &log_cid, &slot->cnx,
send_msg_ptr);
}
if (ret < 0) {
@ -176,17 +187,97 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_
}
}
if (param->is_client && peer_addr.ss_family == 0 && slot->peer_addr.ss_family == 0) {
continue;
}
int sock_err = 0;
int bytes_sent;
unsigned char* encoded;
size_t segment_len = send_msg_size == 0 ? send_length : send_msg_size;
ssize_t encoded_len = param->encode(thread_ctx->quic, slot, loop_callback_ctx, &encoded,
ssize_t encoded_len = param->encode(slot, loop_callback_ctx, &encoded,
(const unsigned char*)send_buffer, send_length, &segment_len, &peer_addr, &local_addr);
if (encoded_len <= 0) {
DBG_PRINTF("Encoding fails, ret=%d\n", encoded_len);
continue;
}
if (encoded_len < segment_len) {
DBG_PRINTF("Encoded len shorter than original: %d < %d", encoded_len, segment_len);
return -1;
}
if (send_msg_size > 0) {
send_msg_size = segment_len; // new size after encoding
}
const SOCKET_TYPE send_socket = s_ctx->fd;
bytes_sent = picoquic_sendmsg(send_socket,
(struct sockaddr*)&peer_addr, (struct sockaddr*)&local_addr, param->dest_if,
(const char*)encoded, (int)encoded_len, (int)send_msg_size, &sock_err);
free(encoded);
if (bytes_sent == 0) {
DBG_PRINTF("BYTES_SENT == 0 %d\n", bytes_sent);
return -1;
}
if (bytes_sent < 0) {
return bytes_sent;
}
nb_packets_sent++;
}
if (!param->is_client || nb_packet_received == 0) {
continue;
}
size_t nb_polls_sent = 0;
nb_slots_read = 0;
while (nb_slots_read < nb_slots_written) {
uint8_t send_buffer[send_buffer_size];
slot_t* slot = &slots[nb_slots_read];
assert(slot != NULL);
nb_slots_read++;
if (slot->cnx == NULL) {
continue; // in case the slot written was a bogus message
}
slot->cnx->is_poll_requested = 1;
size_t send_length = 0;
struct sockaddr_storage peer_addr = {0};
struct sockaddr_storage local_addr = {0};
int if_index = param->dest_if;
int ret = picoquic_prepare_packet_ex(slot->cnx, loop_time,
send_buffer, send_buffer_size, &send_length,
&peer_addr, &local_addr, &if_index, send_msg_ptr);
if (ret < 0) {
return -1;
}
if (param->is_client && send_length == 0) {
break;
}
if (slot->cnx->is_poll_requested == 1) {
// TODO: should we try again or skip this slot
continue;
}
int sock_err = 0;
int bytes_sent;
unsigned char* encoded;
size_t segment_len = send_msg_size == 0 ? send_length : send_msg_size;
ssize_t encoded_len = param->encode(slot, loop_callback_ctx, &encoded,
(const unsigned char*)send_buffer, send_length, &segment_len, &peer_addr, &local_addr);
if (encoded_len <= 0) {
DBG_PRINTF("Encoding fails, ret=%d\n", encoded_len);
continue;
}
if (encoded_len < segment_len) {
DBG_PRINTF("Encoded len shorter than original: %d < %d", encoded_len, segment_len);
return -1;
}
if (send_msg_size > 0) {
send_msg_size = segment_len; // new size after encoding
}
@ -203,6 +294,12 @@ int slipstream_packet_loop_(picoquic_network_thread_ctx_t* thread_ctx, picoquic_
if (bytes_sent < 0) {
return bytes_sent;
}
nb_polls_sent++;
}
if (param->is_client) {
DBG_PRINTF("[polls_sent:%d][sent:%d][received:%d]", nb_polls_sent, nb_packets_sent, nb_packet_received);
}
}

View file

@ -1,9 +1,12 @@
#include "slipstream_utils.h"
#include <picoquic_internal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "picoquic_utils.h"
char* picoquic_connection_id_to_string(const picoquic_connection_id_t* cid) {
// Each byte needs 2 hex characters + null terminator
@ -42,3 +45,23 @@ void sockaddr_dummy(struct sockaddr_storage *addr_storage) {
addr4->sin_len = sizeof(struct sockaddr_in);
#endif
}
void print_sockaddr_ip_and_port(struct sockaddr_storage *addr_storage) {
char ip_str[INET6_ADDRSTRLEN];
int port;
if (addr_storage->ss_family == AF_INET) {
struct sockaddr_in *addr4 = (struct sockaddr_in *)addr_storage;
inet_ntop(AF_INET, &addr4->sin_addr, ip_str, INET6_ADDRSTRLEN);
port = ntohs(addr4->sin_port);
} else if (addr_storage->ss_family == AF_INET6) {
struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)addr_storage;
inet_ntop(AF_INET6, &addr6->sin6_addr, ip_str, INET6_ADDRSTRLEN);
port = ntohs(addr6->sin6_port);
} else {
DBG_PRINTF("Unknown address family", NULL);
return;
}
DBG_PRINTF("%s:%d", ip_str, port);
}