1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

SRT: Refine the SRT socket code.

This commit is contained in:
winlin 2022-05-28 17:18:39 +08:00
parent 6c94e91792
commit 492e3a888c
2 changed files with 72 additions and 62 deletions

View file

@ -549,25 +549,26 @@ srs_error_t SrsSrtSocket::connect(const string& ip, int port)
// @see https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_connect
int ret = srt_connect(srt_fd_, (const sockaddr*)&inaddr, sizeof(inaddr));
if (ret != 0) {
return srs_error_new(ERROR_SRT_IO, "srt_connect, err=%s", srt_getlasterror_str());
}
if (ret == 0) {
// Connect succuess, in async mode, means SRT API succuess and return directly,
// and the connection is in progress, like tcp socket API connect errno EINPROGRESS,
// and the SRT IO threads will do the real handshake step to finish srt connect.
SRT_SOCKSTATUS srt_status = srt_getsockstate(srt_fd_);
if (srt_status != SRTS_CONNECTED) {
// Connect is in progress, wait until it finish or error.
if ((err = wait_writeable()) != srs_success) {
return srs_error_wrap(err, "wait writeable");
}
// Connect succeed, in async mode, means SRT API succeed and return directly,
// and the connection is in progress, like tcp socket API connect errno EINPROGRESS,
// and the SRT IO threads will do the real handshake step to finish srt connect.
SRT_SOCKSTATUS srt_status = srt_getsockstate(srt_fd_);
if (srt_status == SRTS_CONNECTED) {
return err;
}
// Double check if connect is established.
srt_status = srt_getsockstate(srt_fd_);
if (srt_status != SRTS_CONNECTED) {
return srs_error_new(ERROR_SRT_IO, "srt_connect, err=%s", srt_getlasterror_str());
}
}
} else {
// Connect is in progress, wait until it finish or error.
if ((err = wait_writeable()) != srs_success) {
return srs_error_wrap(err, "wait writeable");
}
// Double check if connect is established.
srt_status = srt_getsockstate(srt_fd_);
if (srt_status != SRTS_CONNECTED) {
return srs_error_new(ERROR_SRT_IO, "srt_connect, err=%s", srt_getlasterror_str());
}
@ -583,19 +584,21 @@ srs_error_t SrsSrtSocket::accept(SRTSOCKET* client_srt_fd)
int addrlen = sizeof(inaddr);
// @see https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_accept
SRTSOCKET srt_fd = srt_accept(srt_fd_, (sockaddr*)&inaddr, &addrlen);
if (srt_fd == SRT_INVALID_SOCK) {
if (srt_getlasterror(NULL) == SRT_EASYNCRCV) {
// Accept would block, wait until new client connect or error.
if ((err = wait_readable()) != srs_success) {
return srs_error_wrap(err, "wait readable");
}
continue;
} else {
return srs_error_new(ERROR_SRT_IO, "srt_accept, err=%s", srt_getlasterror_str());
}
} else {
// Accept ok, return with the SRT client fd.
if (srt_fd != SRT_INVALID_SOCK) {
*client_srt_fd = srt_fd;
break;
return err;
}
// Got something error, return immediately.
if (srt_getlasterror(NULL) != SRT_EASYNCRCV) {
return srs_error_new(ERROR_SRT_IO, "srt_accept, err=%s", srt_getlasterror_str());
}
// Accept would block, wait until new client connect or error.
if ((err = wait_readable()) != srs_success) {
return srs_error_wrap(err, "wait readable");
}
}
@ -609,19 +612,22 @@ srs_error_t SrsSrtSocket::recvmsg(void* buf, size_t size, ssize_t* nread)
while (true) {
// @see https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_recvmsg
int ret = srt_recvmsg(srt_fd_, (char*)buf, size);
if (ret < 0) {
if (srt_getlasterror(NULL) == SRT_EASYNCRCV) {
if ((err = wait_readable()) != srs_success) {
return srs_error_wrap(err, "wait readable");
}
continue;
} else {
return srs_error_new(ERROR_SRT_IO, "srt_recvmsg, err=%s", srt_getlasterror_str());
}
} else {
// Receive message ok.
if (ret >= 0) {
recv_bytes_ += ret;
*nread = ret;
break;
return err;
}
// Got something error, return immediately.
if (srt_getlasterror(NULL) != SRT_EASYNCRCV) {
return srs_error_new(ERROR_SRT_IO, "srt_recvmsg, err=%s", srt_getlasterror_str());
}
// Wait for the fd ready or error, switch to other coroutines.
if ((err = wait_readable()) != srs_success) {
return srs_error_wrap(err, "wait readable");
}
}
@ -635,19 +641,22 @@ srs_error_t SrsSrtSocket::sendmsg(void* buf, size_t size, ssize_t* nwrite)
while (true) {
// @see https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_sendmsg
int ret = srt_sendmsg(srt_fd_, (const char*)buf, size, -1, 1);
if (ret < 0) {
if (srt_getlasterror(NULL) == SRT_EASYNCSND) {
if ((err = wait_writeable()) != srs_success) {
return srs_error_wrap(err, "wait writeable");
}
continue;
} else {
return srs_error_new(ERROR_SRT_IO, "srt_sendmsg, err=%s", srt_getlasterror_str());
}
} else {
// Send message ok.
if (ret >= 0) {
send_bytes_ += ret;
*nwrite = ret;
break;
return err;
}
// Got something error, return immediately.
if (srt_getlasterror(NULL) != SRT_EASYNCSND) {
return srs_error_new(ERROR_SRT_IO, "srt_sendmsg, err=%s", srt_getlasterror_str());
}
// Wait for the fd ready or error, switch to other coroutines.
if ((err = wait_writeable()) != srs_success) {
return srs_error_wrap(err, "wait writeable");
}
}
@ -671,8 +680,7 @@ srs_error_t SrsSrtSocket::wait_readable()
// Wait event fired or timeout.
int ret = srs_cond_timedwait(read_cond_, recv_timeout_);
// TODO: FIXME: need to disable it?
err = disable_read();
if (err != srs_success) {
if ((err = disable_read()) != srs_success) {
srs_freep(err);
}
@ -709,8 +717,7 @@ srs_error_t SrsSrtSocket::wait_writeable()
}
int ret = srs_cond_timedwait(write_cond_, send_timeout_);
err = disable_write();
if (err != srs_success) {
if ((err = disable_write()) != srs_success) {
srs_freep(err);
}
@ -769,8 +776,10 @@ srs_error_t SrsSrtSocket::disable_write()
return disable_event(SRT_EPOLL_OUT);
}
srs_error_t SrsSrtSocket::enable_event(int event) {
srs_error_t SrsSrtSocket::enable_event(int event)
{
srs_error_t err = srs_success;
// Event has been subscribed.
if ((events_ & event) == event) {
return err;
@ -784,11 +793,14 @@ srs_error_t SrsSrtSocket::enable_event(int event) {
} else {
err = srt_poller_->mod_socket(this);
}
return err;
}
srs_error_t SrsSrtSocket::disable_event(int event) {
srs_error_t SrsSrtSocket::disable_event(int event)
{
srs_error_t err = srs_success;
// Event has been unsubscribed.
if ((events_ & event) == 0) {
return err;
@ -801,10 +813,12 @@ srs_error_t SrsSrtSocket::disable_event(int event) {
} else {
err = srt_poller_->mod_socket(this);
}
return err;
}
srs_error_t SrsSrtSocket::check_error() {
srs_error_t SrsSrtSocket::check_error()
{
srs_error_t err = srs_success;
if (has_error_) {

View file

@ -36,10 +36,6 @@ SrsConfig* _srs_config = NULL;
SrsServer* _srs_server = NULL;
bool _srs_in_docker = false;
#ifdef SRS_SRT
SrsSrtEventLoop* _srt_eventloop = NULL;
#endif
#include <srs_app_st.hpp>
// Initialize global settings.