network: ping pong keepalive for tcp connections

To make the tcp connections keepalive and better handle the timeout
of connections

con_timeout indicate the connection timeout and it is configurable

Signed-off-by: Chen Minqiang <ptpt52@gmail.com>
This commit is contained in:
Chen Minqiang 2022-07-21 06:03:58 +08:00 committed by Nick Hainke
parent eba03547a5
commit 47e98efed6
8 changed files with 136 additions and 30 deletions

View file

@ -12,6 +12,10 @@
#define STR_QUOTE(x) STR_EVAL(x)
#define HEADER_SIZE sizeof(uint32_t)
#define PING_STR "ping"
#define PONG_STR "pong"
#define PING_SIZE (strlen(PING_STR)+1)
#define PONG_SIZE (strlen(PONG_STR)+1)
LIST_HEAD(tcp_sock_list);
LIST_HEAD(cli_list);
@ -183,6 +187,12 @@ static void client_read_cb(struct ustream *s, int bytes) {
if (cl->state == READ_STATUS_COMPLETE)
{
dawnlog_debug("tcp_socket: processing message...\n");
/* received pong */
if (cl->final_len == HEADER_SIZE + PONG_SIZE && memcmp(cl->str + HEADER_SIZE, PONG_STR, PONG_SIZE) == 0) {
goto process_done;
}
if (network_config.use_symm_enc) {
char *dec = gcrypt_decrypt_msg(cl->str + HEADER_SIZE, cl->final_len - HEADER_SIZE);//len of str is final_len
if (!dec) {
@ -198,6 +208,7 @@ static void client_read_cb(struct ustream *s, int bytes) {
handle_network_msg(cl->str + HEADER_SIZE);//len of str is final_len
}
process_done:
cl->state = READ_STATUS_READY;
cl->curr_len = 0;
cl->final_len = 0;
@ -258,15 +269,48 @@ int run_server(int port) {
return 0;
}
static void client_not_be_used_read_cb(struct ustream *s, int bytes) {
static void client_ping_read_cb(struct ustream *s, int bytes) {
int len;
char buf[2048];
uint32_t ping_len = HEADER_SIZE + PING_SIZE;
dawnlog_debug_func("Entering...");
len = ustream_read(s, buf, sizeof(buf));
buf[len] = '\0';
dawnlog_debug("Read %d bytes from SSL connection: %s\n", len, buf);
len = ustream_read(s, buf, ping_len);
/* client received ping, send pong back to server */
if (len == ping_len && ntohl(*(uint32_t *)buf) == ping_len && memcmp(buf + HEADER_SIZE, PING_STR, PING_SIZE) == 0) {
struct network_con_s *con = container_of(s, struct network_con_s, stream.stream);
int len_ustream;
const char *msg = PONG_STR;
size_t msglen = PONG_SIZE;
uint32_t final_len = msglen + HEADER_SIZE;
char final_str[HEADER_SIZE + PONG_SIZE];
uint32_t *msg_header = (uint32_t *)final_str;
con->time_alive = time(0);
*msg_header = htonl(final_len);
memcpy(final_str + HEADER_SIZE, msg, msglen);
len_ustream = ustream_write(&con->stream.stream, final_str, final_len, 0);
if (len_ustream <= 0) {
dawnlog_error("Ustream error(" STR_QUOTE(__LINE__) ")!\n");
//ERROR HANDLING!
if (con->stream.stream.write_error) {
ustream_free(&con->stream.stream);
dawn_unregmem(&con->stream.stream);
close(con->fd.fd);
list_del(&con->list);
dawn_free(con);
con = NULL;
}
}
} else {
buf[len] = 0;
dawnlog_error("Read %d bytes upexpected: %s\n", len, buf);
}
}
static void connect_cb(struct uloop_fd *f, unsigned int events) {
@ -287,7 +331,7 @@ static void connect_cb(struct uloop_fd *f, unsigned int events) {
dawnlog_debug("Connection established\n");
uloop_fd_delete(&entry->fd);
entry->stream.stream.notify_read = client_not_be_used_read_cb;
entry->stream.stream.notify_read = client_ping_read_cb;
entry->stream.stream.notify_state = client_to_server_state;
ustream_fd_init(&entry->stream, entry->fd.fd);
@ -336,6 +380,7 @@ int add_tcp_connection(char *ipv4, int port) {
uloop_fd_add(&tcp_entry->fd, ULOOP_WRITE | ULOOP_EDGE_TRIGGER);
dawnlog_debug("New TCP connection to %s:%d\n", ipv4, port);
tcp_entry->time_alive = time(0);
list_add(&tcp_entry->list, &tcp_sock_list);
return 0;
@ -429,18 +474,59 @@ void send_tcp(char *msg) {
}
}
void check_client_timeout(int timeout) {
void server_to_clients_ping(void)
{
struct client *cl, *tmp;
time_t now = time(0);
const char *msg = PING_STR;
size_t msglen = PING_SIZE;
uint32_t final_len = msglen + HEADER_SIZE;
char final_str[HEADER_SIZE + PING_SIZE];
uint32_t *msg_header = (uint32_t *)final_str;
*msg_header = htonl(final_len);
memcpy(final_str + HEADER_SIZE, msg, msglen);
list_for_each_entry_safe(cl, tmp, &cli_list, list)
{
if (now - cl->time_alive > timeout || now - cl->time_alive < -timeout) {
dawnlog_debug("Ustream client_close: timeout=%d\n", (int)(now - cl->time_alive));
client_close(&cl->s.stream);
int len_ustream = ustream_write(&cl->s.stream, final_str, final_len, 0);
if (len_ustream <= 0) {
dawnlog_error("Ustream error(" STR_QUOTE(__LINE__) ")!\n");
if (cl->s.stream.write_error) {
client_close(&cl->s.stream);
}
}
}
}
void check_timeout(int timeout) {
do {
struct client *cl, *tmp;
time_t now = time(0);
list_for_each_entry_safe(cl, tmp, &cli_list, list)
{
if (now - cl->time_alive > timeout || now - cl->time_alive < -timeout) {
dawnlog_info("server: close client connection! timeout=%d\n", (int)(now - cl->time_alive));
client_close(&cl->s.stream);
}
}
} while (0);
do {
struct network_con_s *con, *tmp;
time_t now = time(0);
list_for_each_entry_safe(con, tmp, &cli_list, list)
{
if (now - con->time_alive > timeout || now - con->time_alive < -timeout) {
dawnlog_info("client: close client_to_server connection! timeout=%d\n", (int)(now - con->time_alive));
ustream_free(&con->stream.stream);
dawn_unregmem(&con->stream.stream);
close(con->fd.fd);
list_del(&con->list);
dawn_free(con);
}
}
} while (0);
}
struct network_con_s* tcp_list_contains_address(struct sockaddr_in entry) {
struct network_con_s *con;