diff --git a/src/slipstream_server.c b/src/slipstream_server.c index 585335d..68b0128 100644 --- a/src/slipstream_server.c +++ b/src/slipstream_server.c @@ -183,6 +183,7 @@ typedef struct st_slipstream_server_stream_ctx_t { struct st_slipstream_server_stream_ctx_t* next_stream; struct st_slipstream_server_stream_ctx_t* previous_stream; int fd; + int pipefd[2]; uint64_t stream_id; volatile sig_atomic_t set_active; } slipstream_server_stream_ctx_t; @@ -201,25 +202,29 @@ slipstream_server_stream_ctx_t* slipstream_server_create_stream_ctx(slipstream_s slipstream_server_stream_ctx_t* stream_ctx = malloc(sizeof(slipstream_server_stream_ctx_t)); if (stream_ctx == NULL) { - fprintf(stdout, "Memory Error, cannot create stream\n"); + DBG_PRINTF("Memory Error, cannot create stream", NULL); return NULL; } memset(stream_ctx, 0, sizeof(slipstream_server_stream_ctx_t)); + stream_ctx->stream_id = stream_id; + + if (pipe(stream_ctx->pipefd) < 0) { + perror("pipe() failed"); + free(stream_ctx); + return NULL; + } + int sock_fd = socket(AF_INET, SOCK_STREAM, 0); if (sock_fd < 0) { perror("socket() failed"); + close(stream_ctx->pipefd[0]); + close(stream_ctx->pipefd[1]); free(stream_ctx); return NULL; } + stream_ctx->fd = sock_fd; - if (connect(sock_fd, (struct sockaddr*)&server_ctx->upstream_addr, sizeof(server_ctx->upstream_addr)) < 0) { - perror("connect() failed"); - free(stream_ctx); - return NULL; - } - - memset(stream_ctx, 0, sizeof(slipstream_server_stream_ctx_t)); if (server_ctx->first_stream == NULL) { server_ctx->first_stream = stream_ctx; } else { @@ -227,8 +232,6 @@ slipstream_server_stream_ctx_t* slipstream_server_create_stream_ctx(slipstream_s stream_ctx->next_stream->previous_stream = stream_ctx; server_ctx->first_stream = stream_ctx; } - stream_ctx->fd = sock_fd; - stream_ctx->stream_id = stream_id; return stream_ctx; } @@ -338,7 +341,7 @@ void* slipstream_server_poller(void* arg) { ret = picoquic_wake_up_network_thread(args->server_ctx->thread_ctx); if (ret != 0) { - fprintf(stderr, "poll: could not wake up network thread, ret = %d\n", ret); + DBG_PRINTF("poll: could not wake up network thread, ret = %d", ret); } DBG_PRINTF("[stream_id=%d][fd=%d] wakeup", args->stream_ctx->stream_id, args->fd); @@ -350,6 +353,69 @@ void* slipstream_server_poller(void* arg) { pthread_exit(NULL); } +typedef struct st_slipstream_io_copy_args { + int pipe; + int socket; + picoquic_cnx_t* cnx; + slipstream_server_ctx_t* server_ctx; + slipstream_server_stream_ctx_t* stream_ctx; +} slipstream_io_copy_args; + +void* slipstream_io_copy(void* arg) { + char buffer[1024]; + slipstream_io_copy_args* args = arg; + int pipe = args->pipe; + int socket = args->socket; + slipstream_server_ctx_t* server_ctx = args->server_ctx; + slipstream_server_stream_ctx_t* stream_ctx = args->stream_ctx; + + if (connect(socket, (struct sockaddr*)&server_ctx->upstream_addr, sizeof(server_ctx->upstream_addr)) < 0) { + perror("connect() failed"); + return NULL; + } + + DBG_PRINTF("[%lu:%d] setup pipe done", stream_ctx->stream_id, stream_ctx->fd); + + stream_ctx->set_active = 1; + + args->stream_ctx->set_active = 1; + + int ret = picoquic_wake_up_network_thread(args->server_ctx->thread_ctx); + if (ret != 0) { + DBG_PRINTF("poll: could not wake up network thread, ret = %d", ret); + } + DBG_PRINTF("[stream_id=%d][fd=%d] wakeup", args->stream_ctx->stream_id, args->socket); + + while (1) { + ssize_t bytes_read = read(pipe, buffer, sizeof(buffer)); + + DBG_PRINTF("[%lu:%d] read %d bytes", stream_ctx->stream_id, stream_ctx->fd, bytes_read); + if (bytes_read < 0) { + perror("recv failed"); + return NULL; + } else if (bytes_read == 0) { + // End of stream - source socket closed connection + break; + } + + char *p = buffer; + ssize_t remaining = bytes_read; + + while (remaining > 0) { + ssize_t bytes_written = send(socket, p, remaining, 0); + if (bytes_written < 0) { + perror("send failed"); + return NULL; + } + remaining -= bytes_written; + p += bytes_written; + } + } + + return NULL; +} + + int slipstream_server_callback(picoquic_cnx_t* cnx, uint64_t stream_id, uint8_t* bytes, size_t length, picoquic_call_back_event_t fin_or_event, void* callback_ctx, void* v_stream_ctx) { @@ -386,7 +452,7 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, server_ctx->prev_ctx = d_ctx; d_ctx->next_ctx = server_ctx; - printf("Created ctx\n"); + DBG_PRINTF("Created ctx", NULL); } switch (fin_or_event) { @@ -401,19 +467,32 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR); return 0; } - printf("[%lu:%d] connected\n", stream_id, stream_ctx->fd); - picoquic_mark_active_stream(cnx, stream_id, 1, stream_ctx); - printf("[%lu:%d] marked active\n", stream_id, stream_ctx->fd); + DBG_PRINTF("[%lu:%d] setup pipe", stream_id, stream_ctx->pipefd[1]); + + slipstream_io_copy_args* args = malloc(sizeof(slipstream_io_copy_args)); + args->pipe = stream_ctx->pipefd[0]; + args->socket = stream_ctx->fd; + args->cnx = cnx; + args->server_ctx = server_ctx; + args->stream_ctx = stream_ctx; + + pthread_t thread; + if (pthread_create(&thread, NULL, slipstream_io_copy, args) != 0) { + perror("pthread_create() failed for thread1"); + free(args); + } + pthread_setname_np(thread, "slipstream_io_copy"); + pthread_detach(thread); + } // DBG_PRINTF("[stream_id=%d] quic_recv->send %lu bytes", stream_id, length); if (length > 0) { - DBG_PRINTF("[stream_id=%d] conn opened", stream_ctx->stream_id); - picoquic_mark_active_stream(cnx, stream_id, 1, stream_ctx); DBG_PRINTF("[stream_id=%d][leftover_length=%d]", stream_ctx->stream_id, length); - ssize_t bytes_sent = send(stream_ctx->fd, bytes, length, MSG_NOSIGNAL); + ssize_t bytes_sent = write(stream_ctx->pipefd[1], bytes, length); + DBG_PRINTF("[stream_id=%d][bytes_sent=%d]", stream_ctx->stream_id, bytes_sent); if (bytes_sent < 0) { if (errno == EPIPE) { /* Connection closed */ @@ -458,7 +537,7 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, case picoquic_callback_stateless_reset: case picoquic_callback_close: /* Received connection close */ case picoquic_callback_application_close: /* Received application close */ - printf("Connection closed.\n"); + DBG_PRINTF("Connection closed.", NULL); if (server_ctx != NULL) { slipstream_server_free_context(server_ctx); } @@ -487,9 +566,9 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, if (length_to_read == 0) { char a; ssize_t bytes_read = recv(stream_ctx->fd, &a, 1, MSG_PEEK | MSG_DONTWAIT); - // printf("[%lu:%d] recv->quic_send empty read %d bytes\n", stream_id, stream_ctx->fd, bytes_read); + // DBG_PRINTF("[%lu:%d] recv->quic_send empty read %d bytes\n", stream_id, stream_ctx->fd, bytes_read); if (errno == EAGAIN || errno == EWOULDBLOCK) { - // printf("[%lu:%d] recv->quic_send empty errno set: %s\n", stream_id, stream_ctx->fd, strerror(errno)); + // DBG_PRINTF("[%lu:%d] recv->quic_send empty errno set: %s\n", stream_id, stream_ctx->fd, strerror(errno)); /* No bytes available, wait for next event */ (void)picoquic_provide_stream_data_buffer(bytes, 0, 0, 0); DBG_PRINTF("[stream_id=%d] recv->quic_send: empty, disactivate", stream_ctx->stream_id); @@ -526,16 +605,16 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, /* Should never happen according to callback spec. */ break; } - // printf("[%lu:%d] recv->quic_send recv %d bytes into quic\n", stream_id, stream_ctx->fd, length_to_read); + // DBG_PRINTF("[%lu:%d] recv->quic_send recv %d bytes into quic\n", stream_id, stream_ctx->fd, length_to_read); ssize_t bytes_read = recv(stream_ctx->fd, buffer, length_to_read, MSG_DONTWAIT); - // printf("[%lu:%d] recv->quic_send recv done %d bytes into quic\n", stream_id, stream_ctx->fd, bytes_read); + // DBG_PRINTF("[%lu:%d] recv->quic_send recv done %d bytes into quic\n", stream_id, stream_ctx->fd, bytes_read); if (bytes_read == 0) { - printf("Closed connection on sock %d on recv", stream_ctx->fd); + DBG_PRINTF("Closed connection on sock %d on recv", stream_ctx->fd); (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR); return 0; } if (bytes_read < 0) { - fprintf(stderr, "recv: %s (%d)\n", strerror(errno), errno); + DBG_PRINTF("recv: %s (%d)", strerror(errno), errno); /* There should be bytes available, so a return value of 0 is an error */ (void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR); return 0; @@ -543,10 +622,10 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, } break; case picoquic_callback_almost_ready: - fprintf(stdout, "Connection completed, almost ready.\n"); + DBG_PRINTF("Connection completed, almost ready", NULL); break; case picoquic_callback_ready: - fprintf(stdout, "Connection confirmed.\n"); + DBG_PRINTF("Connection confirmed", NULL); break; default: /* unexpected -- just ignore. */ @@ -557,7 +636,7 @@ int slipstream_server_callback(picoquic_cnx_t* cnx, } void server_sighandler(int signum) { - printf("Signal %d received\n", signum); + DBG_PRINTF("Signal %d received", signum); } int picoquic_slipstream_server(int server_port, const char* server_cert, const char* server_key, @@ -566,7 +645,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c int ret = 0; uint64_t current_time = 0; slipstream_server_ctx_t default_context = {0}; - printf("Starting Picoquic Sample server on port %d\n", server_port); + DBG_PRINTF("Starting Picoquic Sample server on port %d", server_port); int is_name = 0; picoquic_get_server_address(upstream_name, upstream_port, &default_context.upstream_addr, &is_name); @@ -604,7 +683,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c /* Create QUIC context */ picoquic_quic_t* quic = picoquic_create_and_configure(&config, slipstream_server_callback, &default_context, current_time, NULL); if (quic == NULL) { - fprintf(stderr, "Could not create server context\n"); + DBG_PRINTF("Could not create server context", NULL); return -1; } @@ -646,7 +725,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c ret = thread_ctx.return_code; /* And finish. */ - printf("Server exit, ret = %d\n", ret); + DBG_PRINTF("Server exit, ret = %d", ret); picoquic_free(quic);