diff --git a/src/slipstream_client.c b/src/slipstream_client.c index 8f04ada..8be26eb 100644 --- a/src/slipstream_client.c +++ b/src/slipstream_client.c @@ -33,6 +33,8 @@ typedef struct st_slipstream_client_stream_ctx_t { int fd; uint64_t stream_id; volatile sig_atomic_t set_active; + int syn_sent; + int syn_received; } slipstream_client_stream_ctx_t; typedef struct st_slipstream_client_ctx_t { @@ -44,6 +46,7 @@ typedef struct st_slipstream_client_ctx_t { uint64_t last_request; bool ready; bool closed; + int listen_sock; } slipstream_client_ctx_t; char* client_domain_name = NULL; @@ -405,6 +408,7 @@ void* slipstream_client_accepter(void* arg) { } printf("[%lu:%d] accept: connection\n[%lu:%d] wakeup\n", stream_ctx->stream_id, client_sock, stream_ctx->stream_id, client_sock); + break; } free(args); @@ -434,6 +438,27 @@ int slipstream_client_callback(picoquic_cnx_t* cnx, return 0; } + // skip syn + if (length > 0 && !stream_ctx->syn_received) { + DBG_PRINTF("recv syn %lu\n", stream_id); + length--; + bytes++; + stream_ctx->syn_received = 1; + + // allow accepting next connection + slipstream_client_accepter_args* args = malloc(sizeof(slipstream_client_accepter_args)); + args->fd = client_ctx->listen_sock; + args->cnx = cnx; + args->client_ctx = client_ctx; + args->thread_ctx = client_ctx->thread_ctx; + + pthread_t thread; + if (pthread_create(&thread, NULL, slipstream_client_accepter, args) != 0) { + perror("pthread_create() failed for thread"); + free(args); + } + } + // printf("[%lu:%d] quic_recv->send %lu bytes\n", stream_id, stream_ctx->fd, length); if (length > 0) { ssize_t bytes_sent = send(stream_ctx->fd, bytes, length, MSG_NOSIGNAL); @@ -493,6 +518,19 @@ int slipstream_client_callback(picoquic_cnx_t* cnx, /* This should never happen */ } else { + if (!stream_ctx->syn_sent) { + DBG_PRINTF("send syn %lu\n", stream_id); + uint8_t* buffer = picoquic_provide_stream_data_buffer(bytes, 1, 0, 1); + if (buffer == NULL) { + /* Should never happen according to callback spec. */ + break; + } + buffer[0] = 0; + stream_ctx->syn_sent = 1; + break; + } + // allow to send on stream even if we haven't syn_received + int length_available; ret = ioctl(stream_ctx->fd, FIONREAD, &length_available); // printf("[%lu:%d] recv->quic_send (available %d)\n", stream_id, stream_ctx->fd, length_available); @@ -687,6 +725,7 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f } picoquic_set_cookie_mode(quic, 2); + picoquic_set_default_priority(quic, 2); #ifdef BUILD_LOGLIB picoquic_set_qlog(quic, config.qlog_dir); debug_printf_push_stream(stderr); @@ -711,29 +750,29 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f } // Create listening socket - int listen_sock = socket(AF_INET, SOCK_STREAM, 0); - if (listen_sock < 0) { + client_ctx.listen_sock = socket(AF_INET, SOCK_STREAM, 0); + if (client_ctx.listen_sock < 0) { perror("socket() failed"); exit(EXIT_FAILURE); } int optval = 1; - setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); + setsockopt(client_ctx.listen_sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); struct sockaddr_in listen_addr = {0}; listen_addr.sin_family = AF_INET; listen_addr.sin_addr.s_addr = INADDR_ANY; listen_addr.sin_port = htons(listen_port); - if (bind(listen_sock, (struct sockaddr*)&listen_addr, sizeof(listen_addr)) < 0) { + if (bind(client_ctx.listen_sock, (struct sockaddr*)&listen_addr, sizeof(listen_addr)) < 0) { perror("bind() failed"); - close(listen_sock); + close(client_ctx.listen_sock); exit(EXIT_FAILURE); } - if (listen(listen_sock, 5) < 0) { + if (listen(client_ctx.listen_sock, 5) < 0) { perror("listen() failed"); - close(listen_sock); + close(client_ctx.listen_sock); exit(EXIT_FAILURE); } @@ -766,7 +805,7 @@ int picoquic_slipstream_client(int listen_port, char const* resolver_addresses_f client_ctx.thread_ctx = &thread_ctx; slipstream_client_accepter_args* args = malloc(sizeof(slipstream_client_accepter_args)); - args->fd = listen_sock; + args->fd = client_ctx.listen_sock; args->cnx = cnx; args->client_ctx = &client_ctx; args->thread_ctx = &thread_ctx; diff --git a/src/slipstream_server.c b/src/slipstream_server.c index f721a76..4d1855f 100644 --- a/src/slipstream_server.c +++ b/src/slipstream_server.c @@ -168,6 +168,8 @@ typedef struct st_slipstream_server_stream_ctx_t { int fd; uint64_t stream_id; volatile sig_atomic_t set_active; + int syn_received; + int syn_sent; } slipstream_server_stream_ctx_t; typedef struct st_slipstream_server_ctx_t { @@ -188,6 +190,7 @@ slipstream_server_stream_ctx_t* slipstream_server_create_stream_ctx(slipstream_s return NULL; } + memset(stream_ctx, 0, sizeof(slipstream_server_stream_ctx_t)); int sock_fd = socket(AF_INET, SOCK_STREAM, 0); if (sock_fd < 0) { perror("socket() failed"); @@ -386,6 +389,17 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, printf("[%lu:%d] marked active\n", stream_id, stream_ctx->fd); } + // skip syn + if (length > 0 && !stream_ctx->syn_received) { + DBG_PRINTF("[stream_id=%d] recv syn", stream_ctx->stream_id); + length--; + bytes++; + stream_ctx->syn_received = 1; + picoquic_mark_active_stream(cnx, stream_id, 1, stream_ctx); + picoquic_set_stream_priority(cnx, stream_id, 0); + DBG_PRINTF("[stream_id=%d][leftover_length=%d]", stream_ctx->stream_id, length); + } + // printf("[%lu:%d] quic_recv->send %lu bytes\n", stream_id, stream_ctx->fd, length); if (length > 0) { ssize_t bytes_sent = send(stream_ctx->fd, bytes, length, MSG_NOSIGNAL); @@ -447,6 +461,20 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, /* This should never happen */ } else { + if (stream_ctx->syn_received && !stream_ctx->syn_sent) { + DBG_PRINTF("[stream_id=%d] send syn", stream_ctx->stream_id); + uint8_t* buffer = picoquic_provide_stream_data_buffer(bytes, 1, 0, 1); + if (buffer == NULL) { + /* Should never happen according to callback spec. */ + break; + } + buffer[0] = 0; + stream_ctx->syn_sent = 1; + picoquic_set_stream_priority(cnx, stream_id, cnx->quic->default_stream_priority); + break; + } + // allow to send on stream even if we haven't syn_received + int length_available; ret = ioctl(stream_ctx->fd, FIONREAD, &length_available); // printf("[%lu:%d] recv->quic_send (available %d)\n", stream_id, stream_ctx->fd, length_available); @@ -584,6 +612,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c } picoquic_set_cookie_mode(quic, 2); + picoquic_set_default_priority(quic, 2); #ifdef BUILD_LOGLIB picoquic_set_qlog(quic, config.qlog_dir); debug_printf_push_stream(stderr);