mirror of
https://github.com/EndPositive/slipstream.git
synced 2025-10-08 12:25:04 +00:00
Pipe structure
This commit is contained in:
parent
9f457c1392
commit
a4871685e0
1 changed files with 110 additions and 31 deletions
|
|
@ -183,6 +183,7 @@ typedef struct st_slipstream_server_stream_ctx_t {
|
|||
struct st_slipstream_server_stream_ctx_t* next_stream;
|
||||
struct st_slipstream_server_stream_ctx_t* previous_stream;
|
||||
int fd;
|
||||
int pipefd[2];
|
||||
uint64_t stream_id;
|
||||
volatile sig_atomic_t set_active;
|
||||
} slipstream_server_stream_ctx_t;
|
||||
|
|
@ -201,25 +202,29 @@ slipstream_server_stream_ctx_t* slipstream_server_create_stream_ctx(slipstream_s
|
|||
slipstream_server_stream_ctx_t* stream_ctx = malloc(sizeof(slipstream_server_stream_ctx_t));
|
||||
|
||||
if (stream_ctx == NULL) {
|
||||
fprintf(stdout, "Memory Error, cannot create stream\n");
|
||||
DBG_PRINTF("Memory Error, cannot create stream", NULL);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
memset(stream_ctx, 0, sizeof(slipstream_server_stream_ctx_t));
|
||||
stream_ctx->stream_id = stream_id;
|
||||
|
||||
if (pipe(stream_ctx->pipefd) < 0) {
|
||||
perror("pipe() failed");
|
||||
free(stream_ctx);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (sock_fd < 0) {
|
||||
perror("socket() failed");
|
||||
close(stream_ctx->pipefd[0]);
|
||||
close(stream_ctx->pipefd[1]);
|
||||
free(stream_ctx);
|
||||
return NULL;
|
||||
}
|
||||
stream_ctx->fd = sock_fd;
|
||||
|
||||
if (connect(sock_fd, (struct sockaddr*)&server_ctx->upstream_addr, sizeof(server_ctx->upstream_addr)) < 0) {
|
||||
perror("connect() failed");
|
||||
free(stream_ctx);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
memset(stream_ctx, 0, sizeof(slipstream_server_stream_ctx_t));
|
||||
if (server_ctx->first_stream == NULL) {
|
||||
server_ctx->first_stream = stream_ctx;
|
||||
} else {
|
||||
|
|
@ -227,8 +232,6 @@ slipstream_server_stream_ctx_t* slipstream_server_create_stream_ctx(slipstream_s
|
|||
stream_ctx->next_stream->previous_stream = stream_ctx;
|
||||
server_ctx->first_stream = stream_ctx;
|
||||
}
|
||||
stream_ctx->fd = sock_fd;
|
||||
stream_ctx->stream_id = stream_id;
|
||||
|
||||
return stream_ctx;
|
||||
}
|
||||
|
|
@ -338,7 +341,7 @@ void* slipstream_server_poller(void* arg) {
|
|||
|
||||
ret = picoquic_wake_up_network_thread(args->server_ctx->thread_ctx);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "poll: could not wake up network thread, ret = %d\n", ret);
|
||||
DBG_PRINTF("poll: could not wake up network thread, ret = %d", ret);
|
||||
}
|
||||
DBG_PRINTF("[stream_id=%d][fd=%d] wakeup", args->stream_ctx->stream_id, args->fd);
|
||||
|
||||
|
|
@ -350,6 +353,69 @@ void* slipstream_server_poller(void* arg) {
|
|||
pthread_exit(NULL);
|
||||
}
|
||||
|
||||
typedef struct st_slipstream_io_copy_args {
|
||||
int pipe;
|
||||
int socket;
|
||||
picoquic_cnx_t* cnx;
|
||||
slipstream_server_ctx_t* server_ctx;
|
||||
slipstream_server_stream_ctx_t* stream_ctx;
|
||||
} slipstream_io_copy_args;
|
||||
|
||||
void* slipstream_io_copy(void* arg) {
|
||||
char buffer[1024];
|
||||
slipstream_io_copy_args* args = arg;
|
||||
int pipe = args->pipe;
|
||||
int socket = args->socket;
|
||||
slipstream_server_ctx_t* server_ctx = args->server_ctx;
|
||||
slipstream_server_stream_ctx_t* stream_ctx = args->stream_ctx;
|
||||
|
||||
if (connect(socket, (struct sockaddr*)&server_ctx->upstream_addr, sizeof(server_ctx->upstream_addr)) < 0) {
|
||||
perror("connect() failed");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
DBG_PRINTF("[%lu:%d] setup pipe done", stream_ctx->stream_id, stream_ctx->fd);
|
||||
|
||||
stream_ctx->set_active = 1;
|
||||
|
||||
args->stream_ctx->set_active = 1;
|
||||
|
||||
int ret = picoquic_wake_up_network_thread(args->server_ctx->thread_ctx);
|
||||
if (ret != 0) {
|
||||
DBG_PRINTF("poll: could not wake up network thread, ret = %d", ret);
|
||||
}
|
||||
DBG_PRINTF("[stream_id=%d][fd=%d] wakeup", args->stream_ctx->stream_id, args->socket);
|
||||
|
||||
while (1) {
|
||||
ssize_t bytes_read = read(pipe, buffer, sizeof(buffer));
|
||||
|
||||
DBG_PRINTF("[%lu:%d] read %d bytes", stream_ctx->stream_id, stream_ctx->fd, bytes_read);
|
||||
if (bytes_read < 0) {
|
||||
perror("recv failed");
|
||||
return NULL;
|
||||
} else if (bytes_read == 0) {
|
||||
// End of stream - source socket closed connection
|
||||
break;
|
||||
}
|
||||
|
||||
char *p = buffer;
|
||||
ssize_t remaining = bytes_read;
|
||||
|
||||
while (remaining > 0) {
|
||||
ssize_t bytes_written = send(socket, p, remaining, 0);
|
||||
if (bytes_written < 0) {
|
||||
perror("send failed");
|
||||
return NULL;
|
||||
}
|
||||
remaining -= bytes_written;
|
||||
p += bytes_written;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
int slipstream_server_callback(picoquic_cnx_t* cnx,
|
||||
uint64_t stream_id, uint8_t* bytes, size_t length,
|
||||
picoquic_call_back_event_t fin_or_event, void* callback_ctx, void* v_stream_ctx) {
|
||||
|
|
@ -386,7 +452,7 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
|
|||
server_ctx->prev_ctx = d_ctx;
|
||||
d_ctx->next_ctx = server_ctx;
|
||||
|
||||
printf("Created ctx\n");
|
||||
DBG_PRINTF("Created ctx", NULL);
|
||||
}
|
||||
|
||||
switch (fin_or_event) {
|
||||
|
|
@ -401,19 +467,32 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
|
|||
(void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR);
|
||||
return 0;
|
||||
}
|
||||
printf("[%lu:%d] connected\n", stream_id, stream_ctx->fd);
|
||||
picoquic_mark_active_stream(cnx, stream_id, 1, stream_ctx);
|
||||
printf("[%lu:%d] marked active\n", stream_id, stream_ctx->fd);
|
||||
DBG_PRINTF("[%lu:%d] setup pipe", stream_id, stream_ctx->pipefd[1]);
|
||||
|
||||
slipstream_io_copy_args* args = malloc(sizeof(slipstream_io_copy_args));
|
||||
args->pipe = stream_ctx->pipefd[0];
|
||||
args->socket = stream_ctx->fd;
|
||||
args->cnx = cnx;
|
||||
args->server_ctx = server_ctx;
|
||||
args->stream_ctx = stream_ctx;
|
||||
|
||||
pthread_t thread;
|
||||
if (pthread_create(&thread, NULL, slipstream_io_copy, args) != 0) {
|
||||
perror("pthread_create() failed for thread1");
|
||||
free(args);
|
||||
}
|
||||
pthread_setname_np(thread, "slipstream_io_copy");
|
||||
pthread_detach(thread);
|
||||
|
||||
}
|
||||
|
||||
// DBG_PRINTF("[stream_id=%d] quic_recv->send %lu bytes", stream_id, length);
|
||||
|
||||
if (length > 0) {
|
||||
DBG_PRINTF("[stream_id=%d] conn opened", stream_ctx->stream_id);
|
||||
picoquic_mark_active_stream(cnx, stream_id, 1, stream_ctx);
|
||||
DBG_PRINTF("[stream_id=%d][leftover_length=%d]", stream_ctx->stream_id, length);
|
||||
|
||||
ssize_t bytes_sent = send(stream_ctx->fd, bytes, length, MSG_NOSIGNAL);
|
||||
ssize_t bytes_sent = write(stream_ctx->pipefd[1], bytes, length);
|
||||
DBG_PRINTF("[stream_id=%d][bytes_sent=%d]", stream_ctx->stream_id, bytes_sent);
|
||||
if (bytes_sent < 0) {
|
||||
if (errno == EPIPE) {
|
||||
/* Connection closed */
|
||||
|
|
@ -458,7 +537,7 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
|
|||
case picoquic_callback_stateless_reset:
|
||||
case picoquic_callback_close: /* Received connection close */
|
||||
case picoquic_callback_application_close: /* Received application close */
|
||||
printf("Connection closed.\n");
|
||||
DBG_PRINTF("Connection closed.", NULL);
|
||||
if (server_ctx != NULL) {
|
||||
slipstream_server_free_context(server_ctx);
|
||||
}
|
||||
|
|
@ -487,9 +566,9 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
|
|||
if (length_to_read == 0) {
|
||||
char a;
|
||||
ssize_t bytes_read = recv(stream_ctx->fd, &a, 1, MSG_PEEK | MSG_DONTWAIT);
|
||||
// printf("[%lu:%d] recv->quic_send empty read %d bytes\n", stream_id, stream_ctx->fd, bytes_read);
|
||||
// DBG_PRINTF("[%lu:%d] recv->quic_send empty read %d bytes\n", stream_id, stream_ctx->fd, bytes_read);
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
// printf("[%lu:%d] recv->quic_send empty errno set: %s\n", stream_id, stream_ctx->fd, strerror(errno));
|
||||
// DBG_PRINTF("[%lu:%d] recv->quic_send empty errno set: %s\n", stream_id, stream_ctx->fd, strerror(errno));
|
||||
/* No bytes available, wait for next event */
|
||||
(void)picoquic_provide_stream_data_buffer(bytes, 0, 0, 0);
|
||||
DBG_PRINTF("[stream_id=%d] recv->quic_send: empty, disactivate", stream_ctx->stream_id);
|
||||
|
|
@ -526,16 +605,16 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
|
|||
/* Should never happen according to callback spec. */
|
||||
break;
|
||||
}
|
||||
// printf("[%lu:%d] recv->quic_send recv %d bytes into quic\n", stream_id, stream_ctx->fd, length_to_read);
|
||||
// DBG_PRINTF("[%lu:%d] recv->quic_send recv %d bytes into quic\n", stream_id, stream_ctx->fd, length_to_read);
|
||||
ssize_t bytes_read = recv(stream_ctx->fd, buffer, length_to_read, MSG_DONTWAIT);
|
||||
// printf("[%lu:%d] recv->quic_send recv done %d bytes into quic\n", stream_id, stream_ctx->fd, bytes_read);
|
||||
// DBG_PRINTF("[%lu:%d] recv->quic_send recv done %d bytes into quic\n", stream_id, stream_ctx->fd, bytes_read);
|
||||
if (bytes_read == 0) {
|
||||
printf("Closed connection on sock %d on recv", stream_ctx->fd);
|
||||
DBG_PRINTF("Closed connection on sock %d on recv", stream_ctx->fd);
|
||||
(void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_FILE_CANCEL_ERROR);
|
||||
return 0;
|
||||
}
|
||||
if (bytes_read < 0) {
|
||||
fprintf(stderr, "recv: %s (%d)\n", strerror(errno), errno);
|
||||
DBG_PRINTF("recv: %s (%d)", strerror(errno), errno);
|
||||
/* There should be bytes available, so a return value of 0 is an error */
|
||||
(void)picoquic_reset_stream(cnx, stream_id, SLIPSTREAM_INTERNAL_ERROR);
|
||||
return 0;
|
||||
|
|
@ -543,10 +622,10 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
|
|||
}
|
||||
break;
|
||||
case picoquic_callback_almost_ready:
|
||||
fprintf(stdout, "Connection completed, almost ready.\n");
|
||||
DBG_PRINTF("Connection completed, almost ready", NULL);
|
||||
break;
|
||||
case picoquic_callback_ready:
|
||||
fprintf(stdout, "Connection confirmed.\n");
|
||||
DBG_PRINTF("Connection confirmed", NULL);
|
||||
break;
|
||||
default:
|
||||
/* unexpected -- just ignore. */
|
||||
|
|
@ -557,7 +636,7 @@ int slipstream_server_callback(picoquic_cnx_t* cnx,
|
|||
}
|
||||
|
||||
void server_sighandler(int signum) {
|
||||
printf("Signal %d received\n", signum);
|
||||
DBG_PRINTF("Signal %d received", signum);
|
||||
}
|
||||
|
||||
int picoquic_slipstream_server(int server_port, const char* server_cert, const char* server_key,
|
||||
|
|
@ -566,7 +645,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c
|
|||
int ret = 0;
|
||||
uint64_t current_time = 0;
|
||||
slipstream_server_ctx_t default_context = {0};
|
||||
printf("Starting Picoquic Sample server on port %d\n", server_port);
|
||||
DBG_PRINTF("Starting Picoquic Sample server on port %d", server_port);
|
||||
|
||||
int is_name = 0;
|
||||
picoquic_get_server_address(upstream_name, upstream_port, &default_context.upstream_addr, &is_name);
|
||||
|
|
@ -604,7 +683,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c
|
|||
/* Create QUIC context */
|
||||
picoquic_quic_t* quic = picoquic_create_and_configure(&config, slipstream_server_callback, &default_context, current_time, NULL);
|
||||
if (quic == NULL) {
|
||||
fprintf(stderr, "Could not create server context\n");
|
||||
DBG_PRINTF("Could not create server context", NULL);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
@ -646,7 +725,7 @@ int picoquic_slipstream_server(int server_port, const char* server_cert, const c
|
|||
ret = thread_ctx.return_code;
|
||||
|
||||
/* And finish. */
|
||||
printf("Server exit, ret = %d\n", ret);
|
||||
DBG_PRINTF("Server exit, ret = %d", ret);
|
||||
|
||||
picoquic_free(quic);
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue