Memory leaks, proper closing and align client and server code again

This commit is contained in:
Jop Zitman 2024-12-11 20:24:37 +08:00
parent 34dba1d25d
commit b1cb046417
5 changed files with 113 additions and 81 deletions

View file

@ -10,7 +10,7 @@ set(ENABLE_ASAN OFF)
set(ENABLE_UBSAN OFF)
set(BUILD_DEMO OFF)
set(BUILD_HTTP OFF)
set(BUILD_LOGLIB OFF)
set(BUILD_LOGLIB ON)
set(BUILD_LOGREADER OFF)
set(BUILD_TESTING OFF)
@ -23,12 +23,15 @@ endif()
if(${CMAKE_BUILD_TYPE} STREQUAL "Debug")
list(APPEND PICOQUIC_ADDITIONAL_C_FLAGS -Og)
list(APPEND PICOQUIC_ADDITIONAL_CXX_FLAGS -Og)
set(ENABLE_ASAN Y)
else()
list(APPEND PICOQUIC_ADDITIONAL_C_FLAGS -O3)
list(APPEND PICOQUIC_ADDITIONAL_CXX_FLAGS -O3)
endif()
if(BUILD_LOGLIB)
list(APPEND PICOQUIC_COMPILE_DEFINITIONS BUILD_LOGLIB)
endif()
add_subdirectory(extern/picoquic)
add_executable(slipstream
@ -57,7 +60,7 @@ add_executable(slipstream
)
target_link_libraries(slipstream PRIVATE m)
target_link_libraries(slipstream PRIVATE picoquic-core)
if (${CMAKE_BUILD_TYPE} STREQUAL DEBUG)
if (BUILD_LOGLIB)
target_link_libraries(slipstream PRIVATE picoquic-log)
endif ()

2
extern/picoquic vendored

@ -1 +1 @@
Subproject commit f36b4daa6111057cb05cbbfcec287c74bdd65d06
Subproject commit 47e4eb1a7880fead410933a1d4d8b3822f78fbda

View file

@ -2,6 +2,7 @@
#include <stdio.h>
#include <picoquic.h>
#include <picoquic_utils.h>
#include <picoquic_packet_loop.h>
#include <picosocks.h>
#ifdef BUILD_LOGLIB
#include <autoqlog.h>
@ -14,13 +15,10 @@
#include <sys/param.h>
#include <sys/poll.h>
#include "lua-resty-base-encoding-base32.h"
#include "picoquic_config.h"
#include "picoquic_packet_loop.h"
#include "slipstream.h"
#include "slipstream_inline_dots.h"
#include "lua-resty-base-encoding-base32.h"
#include "SPCDNS/src/dns.h"
#include "SPCDNS/src/mappings.h"
@ -120,7 +118,7 @@ ssize_t client_decode(const unsigned char** dest_buf, const unsigned char* src_b
*dest_buf = NULL;
size_t bufsize = DNS_DECODEBUF_4K * sizeof(dns_decoded_t);
dns_decoded_t* decoded = malloc(bufsize);
dns_decoded_t decoded[DNS_DECODEBUF_4K];
const dns_rcode_t rc = dns_decode(decoded, &bufsize, (const dns_packet_t*) src_buf, src_buf_len);
if (rc != RCODE_OKAY) {
fprintf(stderr, "dns_decode() = (%d) %s\n", rc, dns_rcode_text(rc));
@ -153,6 +151,7 @@ ssize_t client_decode(const unsigned char** dest_buf, const unsigned char* src_b
typedef struct st_slipstream_client_stream_ctx_t {
struct st_slipstream_client_stream_ctx_t* next_stream;
struct st_slipstream_client_stream_ctx_t* previous_stream;
int fd;
uint64_t stream_id;
volatile sig_atomic_t set_active;
@ -161,7 +160,6 @@ typedef struct st_slipstream_client_stream_ctx_t {
typedef struct st_slipstream_client_ctx_t {
picoquic_cnx_t* cnx;
slipstream_client_stream_ctx_t* first_stream;
slipstream_client_stream_ctx_t* last_stream;
picoquic_network_thread_ctx_t* thread_ctx;
} slipstream_client_ctx_t;
@ -177,11 +175,10 @@ slipstream_client_stream_ctx_t* slipstream_client_create_stream_ctx(picoquic_cnx
memset(stream_ctx, 0, sizeof(slipstream_client_stream_ctx_t));
if (client_ctx->first_stream == NULL) {
client_ctx->first_stream = stream_ctx;
client_ctx->last_stream = stream_ctx;
}
else {
client_ctx->last_stream->next_stream = stream_ctx;
client_ctx->last_stream = stream_ctx;
} else {
stream_ctx->next_stream = client_ctx->first_stream;
stream_ctx->next_stream->previous_stream = stream_ctx;
client_ctx->first_stream = stream_ctx;
}
stream_ctx->fd = sock_fd;
stream_ctx->stream_id = picoquic_get_next_local_stream_id(client_ctx->cnx, 0);
@ -189,17 +186,32 @@ slipstream_client_stream_ctx_t* slipstream_client_create_stream_ctx(picoquic_cnx
return stream_ctx;
}
static void slipstream_client_free_stream_ctx(slipstream_client_ctx_t* client_ctx, slipstream_client_stream_ctx_t* stream_ctx) {
if (stream_ctx->previous_stream != NULL) {
stream_ctx->previous_stream->next_stream = stream_ctx->next_stream;
}
if (stream_ctx->next_stream != NULL) {
stream_ctx->next_stream->previous_stream = stream_ctx->previous_stream;
}
if (client_ctx->first_stream == stream_ctx) {
client_ctx->first_stream = stream_ctx->next_stream;
}
stream_ctx->fd = close(stream_ctx->fd);
free(stream_ctx);
}
static void slipstream_client_free_context(slipstream_client_ctx_t* client_ctx) {
slipstream_client_stream_ctx_t* stream_ctx;
/* Delete any remaining stream context */
while ((stream_ctx = client_ctx->first_stream) != NULL) {
client_ctx->first_stream = stream_ctx->next_stream;
if (stream_ctx->fd != 0) {
stream_ctx->fd = close(stream_ctx->fd);
}
free(stream_ctx);
slipstream_client_free_stream_ctx(client_ctx, stream_ctx);
}
client_ctx->last_stream = NULL;
/* release the memory */
free(client_ctx);
}
void slipstream_client_mark_active_pass(slipstream_client_ctx_t* client_ctx) {
@ -221,9 +233,18 @@ int slipstream_client_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_l
switch (cb_mode) {
case picoquic_packet_loop_wake_up:
if (callback_ctx == NULL) {
return 0;
}
slipstream_client_mark_active_pass(client_ctx);
break;
case picoquic_packet_loop_after_send:
if (callback_ctx == NULL) {
return 0;
}
if (client_ctx->cnx->cnx_state == picoquic_state_disconnected) {
printf("Terminate packet loop\n");
return PICOQUIC_NO_ERROR_TERMINATE_PACKET_LOOP;
@ -255,7 +276,7 @@ void* slipstream_client_poller(void* arg) {
int ret = poll(&fds, 1, 1000);
if (ret < 0) {
perror("poll() failed");
pthread_exit(NULL);
break;
}
if (ret == 0) {
continue;
@ -267,9 +288,13 @@ void* slipstream_client_poller(void* arg) {
if (ret != 0) {
fprintf(stderr, "poll: could not wake up network thread, ret = %d\n", ret);
}
printf("[%lu:%d] wakeup\n", args->stream_ctx->stream_id, args->fd);
pthread_exit(NULL);
break;
}
free(args);
pthread_exit(NULL);
}
typedef struct st_slipstream_client_accepter_args {
@ -293,13 +318,13 @@ void* slipstream_client_accepter(void* arg) {
continue;
}
perror("accept() failed");
pthread_exit(NULL);
break;
}
slipstream_client_stream_ctx_t* stream_ctx = slipstream_client_create_stream_ctx(args->cnx, args->client_ctx, client_sock);
if (stream_ctx == NULL) {
fprintf(stderr, "Could not initiate stream for %d", client_sock);
pthread_exit(NULL);
break;
}
stream_ctx->set_active = 1;
@ -310,8 +335,11 @@ void* slipstream_client_accepter(void* arg) {
pthread_exit(NULL);
}
printf("[?:%d] accept: connection\n[?:%d] wakeup\n", client_sock, client_sock);
printf("[%lu:%d] accept: connection\n[%lu:%d] wakeup\n", stream_ctx->stream_id, client_sock, stream_ctx->stream_id, client_sock);
}
free(args);
pthread_exit(NULL);
}
int slipstream_client_callback(picoquic_cnx_t* cnx,
@ -377,19 +405,17 @@ int slipstream_client_callback(picoquic_cnx_t* cnx,
else {
printf("[%lu:%d] stream reset\n", stream_id, stream_ctx->fd);
/* Close the local_sock fd */
stream_ctx->fd = close(stream_ctx->fd);
slipstream_client_free_stream_ctx(client_ctx, stream_ctx);
picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR);
}
break;
case picoquic_callback_stateless_reset:
case picoquic_callback_close: /* Received connection close */
case picoquic_callback_application_close: /* Received application close */
printf("Connection closed (%d).\n", fin_or_event);
/* Delete the server application context */
printf("Connection closed.\n");
slipstream_client_free_context(client_ctx);
/* Remove the application callback */
picoquic_set_callback(cnx, NULL, NULL);
picoquic_delete_cnx(cnx);
picoquic_close(cnx, 0);
break;
case picoquic_callback_prepare_to_send:
@ -482,6 +508,10 @@ int slipstream_client_callback(picoquic_cnx_t* cnx,
return ret;
}
void client_sighandler(int signum) {
printf("Signal %d received\n", signum);
}
static int slipstream_connect(char const* server_name, int server_port,
picoquic_quic_t* quic, picoquic_cnx_t** cnx,
slipstream_client_ctx_t* client_ctx) {
@ -537,10 +567,6 @@ static int slipstream_connect(char const* server_name, int server_port,
return ret;
}
void client_sighandler(int signum) {
printf("Signal %d received\n", signum);
}
int picoquic_slipstream_client(int listen_port, char const* server_name, int server_port) {
/* Start: start the QUIC process */
int ret = 0;
@ -667,6 +693,9 @@ int picoquic_slipstream_client(int listen_port, char const* server_name, int ser
picoquic_packet_loop_v3(&thread_ctx);
ret = thread_ctx.return_code;
/* And finish. */
printf("Client exit, ret = %d\n", ret);
/* Save tickets and tokens, and free the QUIC context */
if (picoquic_save_session_tickets(quic, ticket_store_filename) != 0) {
fprintf(stderr, "Could not store the saved session tickets.\n");

View file

@ -130,7 +130,6 @@ typedef struct st_slipstream_server_stream_ctx_t {
typedef struct st_slipstream_server_ctx_t {
picoquic_cnx_t* cnx;
slipstream_server_stream_ctx_t* first_stream;
slipstream_server_stream_ctx_t* last_stream;
picoquic_network_thread_ctx_t* thread_ctx;
struct sockaddr_storage upstream_addr;
struct st_slipstream_server_ctx_t* prev_ctx;
@ -141,6 +140,11 @@ slipstream_server_stream_ctx_t* slipstream_server_create_stream_ctx(slipstream_s
uint64_t stream_id) {
slipstream_server_stream_ctx_t* stream_ctx = malloc(sizeof(slipstream_server_stream_ctx_t));
if (stream_ctx == NULL) {
fprintf(stdout, "Memory Error, cannot create stream\n");
return NULL;
}
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if (sock_fd < 0) {
perror("socket() failed");
@ -152,51 +156,43 @@ slipstream_server_stream_ctx_t* slipstream_server_create_stream_ctx(slipstream_s
return NULL;
}
if (stream_ctx != NULL) {
memset(stream_ctx, 0, sizeof(slipstream_server_stream_ctx_t));
if (server_ctx->last_stream == NULL) {
server_ctx->last_stream = stream_ctx;
server_ctx->first_stream = stream_ctx;
}
else {
stream_ctx->previous_stream = server_ctx->last_stream;
server_ctx->last_stream->next_stream = stream_ctx;
server_ctx->last_stream = stream_ctx;
}
stream_ctx->stream_id = stream_id;
stream_ctx->fd = sock_fd;
memset(stream_ctx, 0, sizeof(slipstream_server_stream_ctx_t));
if (server_ctx->first_stream == NULL) {
server_ctx->first_stream = stream_ctx;
} else {
stream_ctx->next_stream = server_ctx->first_stream;
stream_ctx->next_stream->previous_stream = stream_ctx;
server_ctx->first_stream = stream_ctx;
}
stream_ctx->fd = sock_fd;
stream_ctx->stream_id = stream_id;
return stream_ctx;
}
void slipstream_server_delete_stream_context(slipstream_server_ctx_t* server_ctx,
static void slipstream_server_free_stream_context(slipstream_server_ctx_t* server_ctx,
slipstream_server_stream_ctx_t* stream_ctx) {
/* Remove the context from the server's list */
if (stream_ctx->previous_stream == NULL) {
server_ctx->first_stream = stream_ctx->next_stream;
}
else {
if (stream_ctx->previous_stream != NULL) {
stream_ctx->previous_stream->next_stream = stream_ctx->next_stream;
}
if (stream_ctx->next_stream == NULL) {
server_ctx->last_stream = stream_ctx->previous_stream;
}
else {
if (stream_ctx->next_stream != NULL) {
stream_ctx->next_stream->previous_stream = stream_ctx->previous_stream;
}
if (server_ctx->first_stream == stream_ctx) {
server_ctx->first_stream = stream_ctx->next_stream;
}
stream_ctx->fd = close(stream_ctx->fd);
/* release the memory */
free(stream_ctx);
}
void slipstream_server_delete_context(slipstream_server_ctx_t* server_ctx) {
static void slipstream_server_free_context(slipstream_server_ctx_t* server_ctx) {
slipstream_server_stream_ctx_t* stream_ctx;
/* Delete any remaining stream context */
while (server_ctx->first_stream != NULL) {
slipstream_server_delete_stream_context(server_ctx, server_ctx->first_stream);
while ((stream_ctx = server_ctx->first_stream) != NULL) {
slipstream_server_free_stream_context(server_ctx, stream_ctx);
}
/* release the memory */
@ -243,7 +239,7 @@ int slipstream_server_sockloop_callback(picoquic_quic_t* quic, picoquic_packet_l
typedef struct st_slipstream_server_poller_args {
int fd;
picoquic_cnx_t* cnx;
slipstream_server_ctx_t* client_ctx;
slipstream_server_ctx_t* server_ctx;
slipstream_server_stream_ctx_t* stream_ctx;
} slipstream_server_poller_args;
@ -260,7 +256,7 @@ void* slipstream_server_poller(void* arg) {
int ret = poll(&fds, 1, 1000);
if (ret < 0) {
perror("poll() failed");
pthread_exit(NULL);
break;
}
if (ret == 0) {
continue;
@ -268,14 +264,18 @@ void* slipstream_server_poller(void* arg) {
args->stream_ctx->set_active = 1;
ret = picoquic_wake_up_network_thread(args->client_ctx->thread_ctx);
ret = picoquic_wake_up_network_thread(args->server_ctx->thread_ctx);
if (ret != 0) {
fprintf(stderr, "poll: could not wake up network thread, ret = %d\n", ret);
}
printf("[%lu:%d] wakeup\n", args->stream_ctx->stream_id, args->fd);
pthread_exit(NULL);
break;
}
free(args);
pthread_exit(NULL);
}
int slipstream_server_callback(picoquic_cnx_t* cnx,
@ -374,16 +374,20 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
else {
printf("[%lu:%d] stream reset\n", stream_id, stream_ctx->fd);
/* Close the local_sock fd */
stream_ctx->fd = close(stream_ctx->fd);
slipstream_server_free_stream_context(server_ctx, stream_ctx);
picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR);
}
break;
case picoquic_callback_stateless_reset:
case picoquic_callback_close: /* Received connection close */
case picoquic_callback_application_close: /* Received application close */
printf("Connection closed.\n");
if (server_ctx != NULL) {
slipstream_server_free_context(server_ctx);
}
/* Remove the application callback */
picoquic_set_callback(cnx, NULL, NULL);
picoquic_close(cnx, 0);
break;
case picoquic_callback_prepare_to_send:
/* Active sending API */
@ -416,7 +420,7 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
slipstream_server_poller_args* args = malloc(sizeof(slipstream_server_poller_args));
args->fd = stream_ctx->fd;
args->cnx = cnx;
args->client_ctx = server_ctx;
args->server_ctx = server_ctx;
args->stream_ctx = stream_ctx;
pthread_t thread;
@ -501,7 +505,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c
config.server_cert_file = server_cert;
config.server_key_file = server_key;
// config.log_file = "-";
#ifndef DISABLE_DEBUG_PRINTF
#ifndef BUILD_LOGLIB
config.qlog_dir = SLIPSTREAM_QLOG_DIR;
#endif
config.server_port = server_port;

View file

@ -1,10 +1,6 @@
#include <time.h>
#include "slipstream_server_circular_query_buffer.h"
#include "picoquic_utils.h"
#include "SPCDNS/src/dns.h"
// Get next available slot for writing
dns_decoded_t* circular_query_buffer_get_write_slot(circular_query_buffer_t* buf) {
dns_decoded_t* slot = buf->queries[buf->head];
@ -36,7 +32,7 @@ dns_decoded_t* circular_query_buffer_get_read_slot(circular_query_buffer_t* buf)
size_t circular_query_buffer_get_size(circular_query_buffer_t* buf) {
if (buf->head >= buf->tail) {
return buf->head - buf->tail;
} else {
return SIZE - buf->tail + buf->head;
}
}
return SIZE - buf->tail + buf->head;
}