Ensure in-order TCP connections

This commit is contained in:
Jop Zitman 2025-01-17 17:13:08 +08:00
parent 223b3b3888
commit 52a97ded1b
2 changed files with 76 additions and 8 deletions

View file

@ -33,6 +33,8 @@ typedef struct st_slipstream_client_stream_ctx_t {
int fd;
uint64_t stream_id;
volatile sig_atomic_t set_active;
int syn_sent;
int syn_received;
} slipstream_client_stream_ctx_t;
typedef struct st_slipstream_client_ctx_t {
@ -44,6 +46,7 @@ typedef struct st_slipstream_client_ctx_t {
uint64_t last_request;
bool ready;
bool closed;
int listen_sock;
} slipstream_client_ctx_t;
char* client_domain_name = NULL;
@ -405,6 +408,7 @@ void* slipstream_client_accepter(void* arg) {
}
printf("[%lu:%d] accept: connection\n[%lu:%d] wakeup\n", stream_ctx->stream_id, client_sock, stream_ctx->stream_id, client_sock);
break;
}
free(args);
@ -434,6 +438,27 @@ int slipstream_client_callback(picoquic_cnx_t* cnx,
return 0;
}
// skip syn
if (length > 0 && !stream_ctx->syn_received) {
DBG_PRINTF("recv syn %lu\n", stream_id);
length--;
bytes++;
stream_ctx->syn_received = 1;
// allow accepting next connection
slipstream_client_accepter_args* args = malloc(sizeof(slipstream_client_accepter_args));
args->fd = client_ctx->listen_sock;
args->cnx = cnx;
args->client_ctx = client_ctx;
args->thread_ctx = client_ctx->thread_ctx;
pthread_t thread;
if (pthread_create(&thread, NULL, slipstream_client_accepter, args) != 0) {
perror("pthread_create() failed for thread");
free(args);
}
}
// printf("[%lu:%d] quic_recv->send %lu bytes\n", stream_id, stream_ctx->fd, length);
if (length > 0) {
ssize_t bytes_sent = send(stream_ctx->fd, bytes, length, MSG_NOSIGNAL);
@ -493,6 +518,19 @@ int slipstream_client_callback(picoquic_cnx_t* cnx,
/* This should never happen */
}
else {
if (!stream_ctx->syn_sent) {
DBG_PRINTF("send syn %lu\n", stream_id);
uint8_t* buffer = picoquic_provide_stream_data_buffer(bytes, 1, 0, 1);
if (buffer == NULL) {
/* Should never happen according to callback spec. */
break;
}
buffer[0] = 0;
stream_ctx->syn_sent = 1;
break;
}
// allow to send on stream even if we haven't syn_received
int length_available;
ret = ioctl(stream_ctx->fd, FIONREAD, &length_available);
// printf("[%lu:%d] recv->quic_send (available %d)\n", stream_id, stream_ctx->fd, length_available);
@ -687,6 +725,7 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f
}
picoquic_set_cookie_mode(quic, 2);
picoquic_set_default_priority(quic, 2);
#ifdef BUILD_LOGLIB
picoquic_set_qlog(quic, config.qlog_dir);
debug_printf_push_stream(stderr);
@ -711,29 +750,29 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f
}
// Create listening socket
int listen_sock = socket(AF_INET, SOCK_STREAM, 0);
if (listen_sock < 0) {
client_ctx.listen_sock = socket(AF_INET, SOCK_STREAM, 0);
if (client_ctx.listen_sock < 0) {
perror("socket() failed");
exit(EXIT_FAILURE);
}
int optval = 1;
setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
setsockopt(client_ctx.listen_sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
struct sockaddr_in listen_addr = {0};
listen_addr.sin_family = AF_INET;
listen_addr.sin_addr.s_addr = INADDR_ANY;
listen_addr.sin_port = htons(listen_port);
if (bind(listen_sock, (struct sockaddr*)&listen_addr, sizeof(listen_addr)) < 0) {
if (bind(client_ctx.listen_sock, (struct sockaddr*)&listen_addr, sizeof(listen_addr)) < 0) {
perror("bind() failed");
close(listen_sock);
close(client_ctx.listen_sock);
exit(EXIT_FAILURE);
}
if (listen(listen_sock, 5) < 0) {
if (listen(client_ctx.listen_sock, 5) < 0) {
perror("listen() failed");
close(listen_sock);
close(client_ctx.listen_sock);
exit(EXIT_FAILURE);
}
@ -766,7 +805,7 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f
client_ctx.thread_ctx = &thread_ctx;
slipstream_client_accepter_args* args = malloc(sizeof(slipstream_client_accepter_args));
args->fd = listen_sock;
args->fd = client_ctx.listen_sock;
args->cnx = cnx;
args->client_ctx = &client_ctx;
args->thread_ctx = &thread_ctx;

View file

@ -168,6 +168,8 @@ typedef struct st_slipstream_server_stream_ctx_t {
int fd;
uint64_t stream_id;
volatile sig_atomic_t set_active;
int syn_received;
int syn_sent;
} slipstream_server_stream_ctx_t;
typedef struct st_slipstream_server_ctx_t {
@ -188,6 +190,7 @@ slipstream_server_stream_ctx_t* slipstream_server_create_stream_ctx(slipstream_s
return NULL;
}
memset(stream_ctx, 0, sizeof(slipstream_server_stream_ctx_t));
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if (sock_fd < 0) {
perror("socket() failed");
@ -386,6 +389,17 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
printf("[%lu:%d] marked active\n", stream_id, stream_ctx->fd);
}
// skip syn
if (length > 0 && !stream_ctx->syn_received) {
DBG_PRINTF("[stream_id=%d] recv syn", stream_ctx->stream_id);
length--;
bytes++;
stream_ctx->syn_received = 1;
picoquic_mark_active_stream(cnx, stream_id, 1, stream_ctx);
picoquic_set_stream_priority(cnx, stream_id, 0);
DBG_PRINTF("[stream_id=%d][leftover_length=%d]", stream_ctx->stream_id, length);
}
// printf("[%lu:%d] quic_recv->send %lu bytes\n", stream_id, stream_ctx->fd, length);
if (length > 0) {
ssize_t bytes_sent = send(stream_ctx->fd, bytes, length, MSG_NOSIGNAL);
@ -447,6 +461,20 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
/* This should never happen */
}
else {
if (stream_ctx->syn_received && !stream_ctx->syn_sent) {
DBG_PRINTF("[stream_id=%d] send syn", stream_ctx->stream_id);
uint8_t* buffer = picoquic_provide_stream_data_buffer(bytes, 1, 0, 1);
if (buffer == NULL) {
/* Should never happen according to callback spec. */
break;
}
buffer[0] = 0;
stream_ctx->syn_sent = 1;
picoquic_set_stream_priority(cnx, stream_id, cnx->quic->default_stream_priority);
break;
}
// allow to send on stream even if we haven't syn_received
int length_available;
ret = ioctl(stream_ctx->fd, FIONREAD, &length_available);
// printf("[%lu:%d] recv->quic_send (available %d)\n", stream_id, stream_ctx->fd, length_available);
@ -584,6 +612,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c
}
picoquic_set_cookie_mode(quic, 2);
picoquic_set_default_priority(quic, 2);
#ifdef BUILD_LOGLIB
picoquic_set_qlog(quic, config.qlog_dir);
debug_printf_push_stream(stderr);