diff --git a/trunk/src/protocol/srs_service_st_srt.cpp b/trunk/src/protocol/srs_service_st_srt.cpp index d2f580911..9aed09fdd 100644 --- a/trunk/src/protocol/srs_service_st_srt.cpp +++ b/trunk/src/protocol/srs_service_st_srt.cpp @@ -549,25 +549,26 @@ srs_error_t SrsSrtSocket::connect(const string& ip, int port) // @see https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_connect int ret = srt_connect(srt_fd_, (const sockaddr*)&inaddr, sizeof(inaddr)); + if (ret != 0) { + return srs_error_new(ERROR_SRT_IO, "srt_connect, err=%s", srt_getlasterror_str()); + } - if (ret == 0) { - // Connect succuess, in async mode, means SRT API succuess and return directly, - // and the connection is in progress, like tcp socket API connect errno EINPROGRESS, - // and the SRT IO threads will do the real handshake step to finish srt connect. - SRT_SOCKSTATUS srt_status = srt_getsockstate(srt_fd_); - if (srt_status != SRTS_CONNECTED) { - // Connect is in progress, wait until it finish or error. - if ((err = wait_writeable()) != srs_success) { - return srs_error_wrap(err, "wait writeable"); - } + // Connect succeed, in async mode, means SRT API succeed and return directly, + // and the connection is in progress, like tcp socket API connect errno EINPROGRESS, + // and the SRT IO threads will do the real handshake step to finish srt connect. + SRT_SOCKSTATUS srt_status = srt_getsockstate(srt_fd_); + if (srt_status == SRTS_CONNECTED) { + return err; + } - // Double check if connect is established. - srt_status = srt_getsockstate(srt_fd_); - if (srt_status != SRTS_CONNECTED) { - return srs_error_new(ERROR_SRT_IO, "srt_connect, err=%s", srt_getlasterror_str()); - } - } - } else { + // Connect is in progress, wait until it finish or error. + if ((err = wait_writeable()) != srs_success) { + return srs_error_wrap(err, "wait writeable"); + } + + // Double check if connect is established. + srt_status = srt_getsockstate(srt_fd_); + if (srt_status != SRTS_CONNECTED) { return srs_error_new(ERROR_SRT_IO, "srt_connect, err=%s", srt_getlasterror_str()); } @@ -583,19 +584,21 @@ srs_error_t SrsSrtSocket::accept(SRTSOCKET* client_srt_fd) int addrlen = sizeof(inaddr); // @see https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_accept SRTSOCKET srt_fd = srt_accept(srt_fd_, (sockaddr*)&inaddr, &addrlen); - if (srt_fd == SRT_INVALID_SOCK) { - if (srt_getlasterror(NULL) == SRT_EASYNCRCV) { - // Accept would block, wait until new client connect or error. - if ((err = wait_readable()) != srs_success) { - return srs_error_wrap(err, "wait readable"); - } - continue; - } else { - return srs_error_new(ERROR_SRT_IO, "srt_accept, err=%s", srt_getlasterror_str()); - } - } else { + + // Accept ok, return with the SRT client fd. + if (srt_fd != SRT_INVALID_SOCK) { *client_srt_fd = srt_fd; - break; + return err; + } + + // Got something error, return immediately. + if (srt_getlasterror(NULL) != SRT_EASYNCRCV) { + return srs_error_new(ERROR_SRT_IO, "srt_accept, err=%s", srt_getlasterror_str()); + } + + // Accept would block, wait until new client connect or error. + if ((err = wait_readable()) != srs_success) { + return srs_error_wrap(err, "wait readable"); } } @@ -609,19 +612,22 @@ srs_error_t SrsSrtSocket::recvmsg(void* buf, size_t size, ssize_t* nread) while (true) { // @see https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_recvmsg int ret = srt_recvmsg(srt_fd_, (char*)buf, size); - if (ret < 0) { - if (srt_getlasterror(NULL) == SRT_EASYNCRCV) { - if ((err = wait_readable()) != srs_success) { - return srs_error_wrap(err, "wait readable"); - } - continue; - } else { - return srs_error_new(ERROR_SRT_IO, "srt_recvmsg, err=%s", srt_getlasterror_str()); - } - } else { + + // Receive message ok. + if (ret >= 0) { recv_bytes_ += ret; *nread = ret; - break; + return err; + } + + // Got something error, return immediately. + if (srt_getlasterror(NULL) != SRT_EASYNCRCV) { + return srs_error_new(ERROR_SRT_IO, "srt_recvmsg, err=%s", srt_getlasterror_str()); + } + + // Wait for the fd ready or error, switch to other coroutines. + if ((err = wait_readable()) != srs_success) { + return srs_error_wrap(err, "wait readable"); } } @@ -635,19 +641,22 @@ srs_error_t SrsSrtSocket::sendmsg(void* buf, size_t size, ssize_t* nwrite) while (true) { // @see https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_sendmsg int ret = srt_sendmsg(srt_fd_, (const char*)buf, size, -1, 1); - if (ret < 0) { - if (srt_getlasterror(NULL) == SRT_EASYNCSND) { - if ((err = wait_writeable()) != srs_success) { - return srs_error_wrap(err, "wait writeable"); - } - continue; - } else { - return srs_error_new(ERROR_SRT_IO, "srt_sendmsg, err=%s", srt_getlasterror_str()); - } - } else { + + // Send message ok. + if (ret >= 0) { send_bytes_ += ret; *nwrite = ret; - break; + return err; + } + + // Got something error, return immediately. + if (srt_getlasterror(NULL) != SRT_EASYNCSND) { + return srs_error_new(ERROR_SRT_IO, "srt_sendmsg, err=%s", srt_getlasterror_str()); + } + + // Wait for the fd ready or error, switch to other coroutines. + if ((err = wait_writeable()) != srs_success) { + return srs_error_wrap(err, "wait writeable"); } } @@ -671,8 +680,7 @@ srs_error_t SrsSrtSocket::wait_readable() // Wait event fired or timeout. int ret = srs_cond_timedwait(read_cond_, recv_timeout_); // TODO: FIXME: need to disable it? - err = disable_read(); - if (err != srs_success) { + if ((err = disable_read()) != srs_success) { srs_freep(err); } @@ -709,8 +717,7 @@ srs_error_t SrsSrtSocket::wait_writeable() } int ret = srs_cond_timedwait(write_cond_, send_timeout_); - err = disable_write(); - if (err != srs_success) { + if ((err = disable_write()) != srs_success) { srs_freep(err); } @@ -769,8 +776,10 @@ srs_error_t SrsSrtSocket::disable_write() return disable_event(SRT_EPOLL_OUT); } -srs_error_t SrsSrtSocket::enable_event(int event) { +srs_error_t SrsSrtSocket::enable_event(int event) +{ srs_error_t err = srs_success; + // Event has been subscribed. if ((events_ & event) == event) { return err; @@ -784,11 +793,14 @@ srs_error_t SrsSrtSocket::enable_event(int event) { } else { err = srt_poller_->mod_socket(this); } + return err; } -srs_error_t SrsSrtSocket::disable_event(int event) { +srs_error_t SrsSrtSocket::disable_event(int event) +{ srs_error_t err = srs_success; + // Event has been unsubscribed. if ((events_ & event) == 0) { return err; @@ -801,10 +813,12 @@ srs_error_t SrsSrtSocket::disable_event(int event) { } else { err = srt_poller_->mod_socket(this); } + return err; } -srs_error_t SrsSrtSocket::check_error() { +srs_error_t SrsSrtSocket::check_error() +{ srs_error_t err = srs_success; if (has_error_) { diff --git a/trunk/src/utest/srs_utest.cpp b/trunk/src/utest/srs_utest.cpp index f2478eb9f..52365b942 100644 --- a/trunk/src/utest/srs_utest.cpp +++ b/trunk/src/utest/srs_utest.cpp @@ -36,10 +36,6 @@ SrsConfig* _srs_config = NULL; SrsServer* _srs_server = NULL; bool _srs_in_docker = false; -#ifdef SRS_SRT -SrsSrtEventLoop* _srt_eventloop = NULL; -#endif - #include // Initialize global settings.