1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 11:51:57 +00:00

SRT: Hide srt implements from API.

This commit is contained in:
winlin 2022-06-07 21:04:04 +08:00
parent d39ec3cf45
commit 6009395c10
9 changed files with 157 additions and 127 deletions

View file

@ -20,7 +20,7 @@ using namespace std;
#include <srs_app_srt_server.hpp>
#include <srs_app_srt_source.hpp>
SrsSrtConnection::SrsSrtConnection(SRTSOCKET srt_fd)
SrsSrtConnection::SrsSrtConnection(srs_srt_t srt_fd)
{
srt_fd_ = srt_fd;
srt_skt_ = new SrsSrtSocket(_srt_eventloop->poller(), srt_fd_);
@ -148,7 +148,7 @@ srs_error_t SrsSrtRecvThread::get_recv_err()
return srs_error_copy(recv_err_);
}
SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, SRTSOCKET srt_fd, std::string ip, int port)
SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, std::string ip, int port)
{
// Create a identify for this client.
_srs_context->set_id(_srs_context->generate_id());

View file

@ -28,7 +28,7 @@ class SrsSrtServer;
class SrsSrtConnection : public ISrsProtocolReadWriter
{
public:
SrsSrtConnection(SRTSOCKET srt_fd);
SrsSrtConnection(srs_srt_t srt_fd);
virtual ~SrsSrtConnection();
public:
virtual srs_error_t initialize();
@ -46,7 +46,7 @@ public:
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
private:
// The underlayer srt fd handler.
SRTSOCKET srt_fd_;
srs_srt_t srt_fd_;
// The underlayer srt socket.
SrsSrtSocket* srt_skt_;
};
@ -73,7 +73,7 @@ private:
class SrsMpegtsSrtConn : public ISrsStartableConneciton, public ISrsCoroutineHandler
{
public:
SrsMpegtsSrtConn(SrsSrtServer* srt_server, SRTSOCKET srt_fd, std::string ip, int port);
SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, std::string ip, int port);
virtual ~SrsMpegtsSrtConn();
// Interface ISrsResource.
public:
@ -111,7 +111,7 @@ private:
void http_hooks_on_stop();
private:
SrsSrtServer* srt_server_;
SRTSOCKET srt_fd_;
srs_srt_t srt_fd_;
SrsSrtConnection* srt_conn_;
SrsWallClock* clock_;
SrsKbps* kbps_;

View file

@ -26,7 +26,7 @@ SrsSrtListener::SrsSrtListener(ISrsSrtHandler* h, std::string i, int p)
ip_ = i;
port_ = p;
lfd_ = SRT_INVALID_SOCK;
lfd_ = srs_srt_socket_invalid();
srt_skt_ = NULL;
trd_ = new SrsDummyCoroutine();
@ -36,7 +36,8 @@ SrsSrtListener::~SrsSrtListener()
{
srs_freep(trd_);
srs_freep(srt_skt_);
srt_close(lfd_);
// TODO: FIXME: Handle error.
srs_srt_close(lfd_);
}
int SrsSrtListener::fd()
@ -84,7 +85,7 @@ srs_error_t SrsSrtListener::cycle()
return srs_error_wrap(err, "srt listener");
}
SRTSOCKET client_srt_fd = SRT_INVALID_SOCK;
srs_srt_t client_srt_fd = srs_srt_socket_invalid();
if ((err = srt_skt_->accept(&client_srt_fd)) != srs_success) {
return srs_error_wrap(err, "srt accept");
}

View file

@ -21,14 +21,14 @@ public:
virtual ~ISrsSrtHandler();
public:
// When got srt client.
virtual srs_error_t on_srt_client(SRTSOCKET srt_fd) = 0;
virtual srs_error_t on_srt_client(srs_srt_t srt_fd) = 0;
};
// Bind and listen SRT(udp) port, use handler to process the client.
class SrsSrtListener : public ISrsCoroutineHandler
{
private:
SRTSOCKET lfd_;
srs_srt_t lfd_;
SrsSrtSocket* srt_skt_;
SrsCoroutine* trd_;
private:
@ -39,7 +39,7 @@ public:
SrsSrtListener(ISrsSrtHandler* h, std::string i, int p);
virtual ~SrsSrtListener();
public:
virtual SRTSOCKET fd();
virtual srs_srt_t fd();
public:
// Create srt socket, separate this step because of srt have some option must set before listen.
virtual srs_error_t create_socket();

View file

@ -141,7 +141,7 @@ srs_error_t SrsSrtMessageAcceptor::set_srt_opt()
return err;
}
srs_error_t SrsSrtMessageAcceptor::on_srt_client(SRTSOCKET srt_fd)
srs_error_t SrsSrtMessageAcceptor::on_srt_client(srs_srt_t srt_fd)
{
// Notify srt server to accept srt client, and create new SrsSrtConn on it.
srs_error_t err = srt_server_->accept_srt_client(type_, srt_fd);
@ -226,7 +226,7 @@ void SrsSrtServer::close_listeners(SrsSrtListenerType type)
}
}
srs_error_t SrsSrtServer::accept_srt_client(SrsSrtListenerType type, SRTSOCKET srt_fd)
srs_error_t SrsSrtServer::accept_srt_client(SrsSrtListenerType type, srs_srt_t srt_fd)
{
srs_error_t err = srs_success;
@ -234,7 +234,8 @@ srs_error_t SrsSrtServer::accept_srt_client(SrsSrtListenerType type, SRTSOCKET s
if ((err = fd_to_resource(type, srt_fd, &conn)) != srs_success) {
//close fd on conn error, otherwise will lead to fd leak -gs
srt_close(srt_fd);
// TODO: FIXME: Handle error.
srs_srt_close(srt_fd);
return srs_error_wrap(err, "srt fd to resource");
}
srs_assert(conn);
@ -249,7 +250,7 @@ srs_error_t SrsSrtServer::accept_srt_client(SrsSrtListenerType type, SRTSOCKET s
return err;
}
srs_error_t SrsSrtServer::fd_to_resource(SrsSrtListenerType type, SRTSOCKET srt_fd, ISrsStartableConneciton** pr)
srs_error_t SrsSrtServer::fd_to_resource(SrsSrtListenerType type, srs_srt_t srt_fd, ISrsStartableConneciton** pr)
{
srs_error_t err = srs_success;
@ -271,7 +272,8 @@ srs_error_t SrsSrtServer::fd_to_resource(SrsSrtListenerType type, SRTSOCKET srt_
*pr = new SrsMpegtsSrtConn(this, srt_fd, ip, port);
} else {
srs_warn("close for no service handler. srtfd=%d, ip=%s:%d", srt_fd, ip.c_str(), port);
srt_close(srt_fd);
// TODO: FIXME: Handle error.
srs_srt_close(srt_fd);
return err;
}

View file

@ -50,7 +50,7 @@ public:
virtual srs_error_t set_srt_opt();
// Interface ISrsSrtHandler
public:
virtual srs_error_t on_srt_client(SRTSOCKET srt_fd);
virtual srs_error_t on_srt_client(srs_srt_t srt_fd);
};
// SRS SRT server, initialize and listen, start connection service thread, destroy client.
@ -78,9 +78,9 @@ public:
// @param type, the client type, used to create concrete connection,
// for instance SRT connection to serve client.
// @param srt_fd, the client fd in srt boxed, the underlayer fd.
virtual srs_error_t accept_srt_client(SrsSrtListenerType type, SRTSOCKET srt_fd);
virtual srs_error_t accept_srt_client(SrsSrtListenerType type, srs_srt_t srt_fd);
private:
virtual srs_error_t fd_to_resource(SrsSrtListenerType type, SRTSOCKET srt_fd, ISrsStartableConneciton** pr);
virtual srs_error_t fd_to_resource(SrsSrtListenerType type, srs_srt_t srt_fd, ISrsStartableConneciton** pr);
// Interface ISrsResourceManager
public:
// A callback for connection to remove itself.

View file

@ -14,6 +14,8 @@ using namespace std;
#include <srs_kernel_log.hpp>
#include <srs_core_autofree.hpp>
#include <srt/srt.h>
#define SET_SRT_OPT_STR(srtfd, optname, buf, size) \
if (srt_setsockflag(srtfd, optname, buf, size) == SRT_ERROR) { \
std::stringstream ss; \
@ -41,7 +43,7 @@ using namespace std;
} \
} while (0)
static srs_error_t do_srs_srt_listen(SRTSOCKET srt_fd, addrinfo* r)
static srs_error_t do_srs_srt_listen(srs_srt_t srt_fd, addrinfo* r)
{
srs_error_t err = srs_success;
@ -60,7 +62,7 @@ static srs_error_t do_srs_srt_listen(SRTSOCKET srt_fd, addrinfo* r)
return err;
}
static srs_error_t do_srs_srt_get_streamid(SRTSOCKET srt_fd, string& streamid)
static srs_error_t do_srs_srt_get_streamid(srs_srt_t srt_fd, string& streamid)
{
// SRT max streamid length is 512.
char sid[512];
@ -70,11 +72,16 @@ static srs_error_t do_srs_srt_get_streamid(SRTSOCKET srt_fd, string& streamid)
return srs_success;
}
srs_error_t srs_srt_socket(SRTSOCKET* pfd)
srs_srt_t srs_srt_socket_invalid()
{
return SRT_INVALID_SOCK;
}
srs_error_t srs_srt_socket(srs_srt_t* pfd)
{
srs_error_t err = srs_success;
SRTSOCKET srt_fd = 0;
srs_srt_t srt_fd = 0;
if ((srt_fd = srt_create_socket()) < 0) {
return srs_error_new(ERROR_SOCKET_CREATE, "create srt socket");
}
@ -84,11 +91,18 @@ srs_error_t srs_srt_socket(SRTSOCKET* pfd)
return err;
}
srs_error_t srs_srt_socket_with_default_option(SRTSOCKET* pfd)
srs_error_t srs_srt_close(srs_srt_t fd)
{
// TODO: FIXME: Handle error.
srt_close(fd);
return srs_success;
}
srs_error_t srs_srt_socket_with_default_option(srs_srt_t* pfd)
{
srs_error_t err = srs_success;
SRTSOCKET srt_fd = 0;
srs_srt_t srt_fd = 0;
if ((srt_fd = srt_create_socket()) < 0) {
return srs_error_new(ERROR_SOCKET_CREATE, "create srt socket");
}
@ -114,7 +128,7 @@ srs_error_t srs_srt_socket_with_default_option(SRTSOCKET* pfd)
return err;
}
srs_error_t srs_srt_listen(SRTSOCKET srt_fd, std::string ip, int port)
srs_error_t srs_srt_listen(srs_srt_t srt_fd, std::string ip, int port)
{
srs_error_t err = srs_success;
@ -142,7 +156,7 @@ srs_error_t srs_srt_listen(SRTSOCKET srt_fd, std::string ip, int port)
return err;
}
srs_error_t srs_srt_nonblock(SRTSOCKET srt_fd)
srs_error_t srs_srt_nonblock(srs_srt_t srt_fd)
{
int sync = 0;
SET_SRT_OPT(srt_fd, SRTO_SNDSYN, sync);
@ -151,157 +165,157 @@ srs_error_t srs_srt_nonblock(SRTSOCKET srt_fd)
return srs_success;
}
srs_error_t srs_srt_set_maxbw(SRTSOCKET srt_fd, int maxbw)
srs_error_t srs_srt_set_maxbw(srs_srt_t srt_fd, int maxbw)
{
SET_SRT_OPT(srt_fd, SRTO_MAXBW, maxbw);
return srs_success;
}
srs_error_t srs_srt_set_mss(SRTSOCKET srt_fd, int mss)
srs_error_t srs_srt_set_mss(srs_srt_t srt_fd, int mss)
{
SET_SRT_OPT(srt_fd, SRTO_MSS, mss);
return srs_success;
}
srs_error_t srs_srt_set_payload_size(SRTSOCKET srt_fd, int payload_size)
srs_error_t srs_srt_set_payload_size(srs_srt_t srt_fd, int payload_size)
{
SET_SRT_OPT(srt_fd, SRTO_PAYLOADSIZE, payload_size);
return srs_success;
}
srs_error_t srs_srt_set_connect_timeout(SRTSOCKET srt_fd, int timeout)
srs_error_t srs_srt_set_connect_timeout(srs_srt_t srt_fd, int timeout)
{
SET_SRT_OPT(srt_fd, SRTO_CONNTIMEO, timeout);
return srs_success;
}
srs_error_t srs_srt_set_peer_idle_timeout(SRTSOCKET srt_fd, int timeout)
srs_error_t srs_srt_set_peer_idle_timeout(srs_srt_t srt_fd, int timeout)
{
SET_SRT_OPT(srt_fd, SRTO_PEERIDLETIMEO, timeout);
return srs_success;
}
srs_error_t srs_srt_set_tsbpdmode(SRTSOCKET srt_fd, bool tsbpdmode)
srs_error_t srs_srt_set_tsbpdmode(srs_srt_t srt_fd, bool tsbpdmode)
{
SET_SRT_OPT(srt_fd, SRTO_TSBPDMODE, tsbpdmode);
return srs_success;
}
srs_error_t srs_srt_set_sndbuf(SRTSOCKET srt_fd, int sndbuf)
srs_error_t srs_srt_set_sndbuf(srs_srt_t srt_fd, int sndbuf)
{
SET_SRT_OPT(srt_fd, SRTO_SNDBUF, sndbuf);
return srs_success;
}
srs_error_t srs_srt_set_rcvbuf(SRTSOCKET srt_fd, int rcvbuf)
srs_error_t srs_srt_set_rcvbuf(srs_srt_t srt_fd, int rcvbuf)
{
SET_SRT_OPT(srt_fd, SRTO_RCVBUF, rcvbuf);
return srs_success;
}
srs_error_t srs_srt_set_tlpktdrop(SRTSOCKET srt_fd, bool tlpktdrop)
srs_error_t srs_srt_set_tlpktdrop(srs_srt_t srt_fd, bool tlpktdrop)
{
SET_SRT_OPT(srt_fd, SRTO_TLPKTDROP, tlpktdrop);
return srs_success;
}
srs_error_t srs_srt_set_latency(SRTSOCKET srt_fd, int latency)
srs_error_t srs_srt_set_latency(srs_srt_t srt_fd, int latency)
{
SET_SRT_OPT(srt_fd, SRTO_LATENCY, latency);
return srs_success;
}
srs_error_t srs_srt_set_rcv_latency(SRTSOCKET srt_fd, int rcv_latency)
srs_error_t srs_srt_set_rcv_latency(srs_srt_t srt_fd, int rcv_latency)
{
SET_SRT_OPT(srt_fd, SRTO_RCVLATENCY, rcv_latency);
return srs_success;
}
srs_error_t srs_srt_set_peer_latency(SRTSOCKET srt_fd, int peer_latency)
srs_error_t srs_srt_set_peer_latency(srs_srt_t srt_fd, int peer_latency)
{
SET_SRT_OPT(srt_fd, SRTO_PEERLATENCY, peer_latency);
return srs_success;
}
srs_error_t srs_srt_set_streamid(SRTSOCKET srt_fd, const std::string& streamid)
srs_error_t srs_srt_set_streamid(srs_srt_t srt_fd, const std::string& streamid)
{
SET_SRT_OPT_STR(srt_fd, SRTO_STREAMID, streamid.data(), streamid.size());
return srs_success;
}
srs_error_t srs_srt_get_maxbw(SRTSOCKET srt_fd, int& maxbw)
srs_error_t srs_srt_get_maxbw(srs_srt_t srt_fd, int& maxbw)
{
GET_SRT_OPT(srt_fd, SRTO_MAXBW, maxbw);
return srs_success;
}
srs_error_t srs_srt_get_mss(SRTSOCKET srt_fd, int& mss)
srs_error_t srs_srt_get_mss(srs_srt_t srt_fd, int& mss)
{
GET_SRT_OPT(srt_fd, SRTO_MSS, mss);
return srs_success;
}
srs_error_t srs_srt_get_payload_size(SRTSOCKET srt_fd, int& payload_size)
srs_error_t srs_srt_get_payload_size(srs_srt_t srt_fd, int& payload_size)
{
GET_SRT_OPT(srt_fd, SRTO_PAYLOADSIZE, payload_size);
return srs_success;
}
srs_error_t srs_srt_get_connect_timeout(SRTSOCKET srt_fd, int& timeout)
srs_error_t srs_srt_get_connect_timeout(srs_srt_t srt_fd, int& timeout)
{
GET_SRT_OPT(srt_fd, SRTO_CONNTIMEO, timeout);
return srs_success;
}
srs_error_t srs_srt_get_peer_idle_timeout(SRTSOCKET srt_fd, int& timeout)
srs_error_t srs_srt_get_peer_idle_timeout(srs_srt_t srt_fd, int& timeout)
{
GET_SRT_OPT(srt_fd, SRTO_PEERIDLETIMEO, timeout);
return srs_success;
}
srs_error_t srs_srt_get_tsbpdmode(SRTSOCKET srt_fd, bool& tsbpdmode)
srs_error_t srs_srt_get_tsbpdmode(srs_srt_t srt_fd, bool& tsbpdmode)
{
GET_SRT_OPT(srt_fd, SRTO_TSBPDMODE, tsbpdmode);
return srs_success;
}
srs_error_t srs_srt_get_sndbuf(SRTSOCKET srt_fd, int& sndbuf)
srs_error_t srs_srt_get_sndbuf(srs_srt_t srt_fd, int& sndbuf)
{
GET_SRT_OPT(srt_fd, SRTO_SNDBUF, sndbuf);
return srs_success;
}
srs_error_t srs_srt_get_rcvbuf(SRTSOCKET srt_fd, int& rcvbuf)
srs_error_t srs_srt_get_rcvbuf(srs_srt_t srt_fd, int& rcvbuf)
{
GET_SRT_OPT(srt_fd, SRTO_RCVBUF, rcvbuf);
return srs_success;
}
srs_error_t srs_srt_get_tlpktdrop(SRTSOCKET srt_fd, bool& tlpktdrop)
srs_error_t srs_srt_get_tlpktdrop(srs_srt_t srt_fd, bool& tlpktdrop)
{
GET_SRT_OPT(srt_fd, SRTO_TLPKTDROP, tlpktdrop);
return srs_success;
}
srs_error_t srs_srt_get_latency(SRTSOCKET srt_fd, int& latency)
srs_error_t srs_srt_get_latency(srs_srt_t srt_fd, int& latency)
{
GET_SRT_OPT(srt_fd, SRTO_LATENCY, latency);
return srs_success;
}
srs_error_t srs_srt_get_rcv_latency(SRTSOCKET srt_fd, int& rcv_latency)
srs_error_t srs_srt_get_rcv_latency(srs_srt_t srt_fd, int& rcv_latency)
{
GET_SRT_OPT(srt_fd, SRTO_RCVLATENCY, rcv_latency);
return srs_success;
}
srs_error_t srs_srt_get_peer_latency(SRTSOCKET srt_fd, int& peer_latency)
srs_error_t srs_srt_get_peer_latency(srs_srt_t srt_fd, int& peer_latency)
{
GET_SRT_OPT(srt_fd, SRTO_PEERLATENCY, peer_latency);
return srs_success;
}
srs_error_t srs_srt_get_streamid(SRTSOCKET srt_fd, std::string& streamid)
srs_error_t srs_srt_get_streamid(srs_srt_t srt_fd, std::string& streamid)
{
srs_error_t err = srs_success;
@ -312,7 +326,7 @@ srs_error_t srs_srt_get_streamid(SRTSOCKET srt_fd, std::string& streamid)
return err;
}
srs_error_t srs_srt_get_local_ip_port(SRTSOCKET srt_fd, std::string& ip, int& port)
srs_error_t srs_srt_get_local_ip_port(srs_srt_t srt_fd, std::string& ip, int& port)
{
srs_error_t err = srs_success;
@ -344,7 +358,7 @@ srs_error_t srs_srt_get_local_ip_port(SRTSOCKET srt_fd, std::string& ip, int& po
return err;
}
srs_error_t srs_srt_get_remote_ip_port(SRTSOCKET srt_fd, std::string& ip, int& port)
srs_error_t srs_srt_get_remote_ip_port(srs_srt_t srt_fd, std::string& ip, int& port)
{
srs_error_t err = srs_success;
@ -427,7 +441,7 @@ int SrsSrtStat::pktSndDrop()
return ((SRT_TRACEBSTATS*)stat_)->pktSndDrop;
}
srs_error_t SrsSrtStat::fetch(SRTSOCKET srt_fd, bool clear)
srs_error_t SrsSrtStat::fetch(srs_srt_t srt_fd, bool clear)
{
srs_error_t err = srs_success;
@ -450,9 +464,11 @@ public:
srs_error_t mod_socket(SrsSrtSocket* srt_skt);
srs_error_t del_socket(SrsSrtSocket* srt_skt);
srs_error_t wait(int timeout_ms, int* pn_fds);
public:
virtual int size();
private:
// Find SrsSrtSocket* context by SRTSOCKET.
std::map<SRTSOCKET, SrsSrtSocket*> fd_sockets_;
// Find SrsSrtSocket* context by srs_srt_t.
std::map<srs_srt_t, SrsSrtSocket*> fd_sockets_;
int srt_epoller_fd_;
std::vector<SRT_EPOLL_EVENT> events_;
};
@ -487,7 +503,7 @@ srs_error_t SrsSrtPoller::add_socket(SrsSrtSocket* srt_skt)
srs_error_t err = srs_success;
int events = srt_skt->events();
SRTSOCKET srtfd = srt_skt->fd();
srs_srt_t srtfd = srt_skt->fd();
int ret = srt_epoll_add_usock(srt_epoller_fd_, srtfd, &events);
@ -506,7 +522,7 @@ srs_error_t SrsSrtPoller::del_socket(SrsSrtSocket* srt_skt)
{
srs_error_t err = srs_success;
SRTSOCKET srtfd = srt_skt->fd();
srs_srt_t srtfd = srt_skt->fd();
int ret = srt_epoll_remove_usock(srt_epoller_fd_, srtfd);
srs_info("srt poller %d remove srt socket %d", srt_epoller_fd_, srtfd);
@ -533,7 +549,7 @@ srs_error_t SrsSrtPoller::wait(int timeout_ms, int* pn_fds)
for (int i = 0; i < ret; ++i) {
SRT_EPOLL_EVENT event = events_[i];
map<SRTSOCKET, SrsSrtSocket*>::iterator iter = fd_sockets_.find(event.fd);
map<srs_srt_t, SrsSrtSocket*>::iterator iter = fd_sockets_.find(event.fd);
if (iter == fd_sockets_.end()) {
srs_assert(false);
}
@ -557,12 +573,17 @@ srs_error_t SrsSrtPoller::wait(int timeout_ms, int* pn_fds)
return err;
}
int SrsSrtPoller::size()
{
return (int)fd_sockets_.size();
}
srs_error_t SrsSrtPoller::mod_socket(SrsSrtSocket* srt_skt)
{
srs_error_t err = srs_success;
int events = srt_skt->events();
SRTSOCKET srtfd = srt_skt->fd();
srs_srt_t srtfd = srt_skt->fd();
int ret = srt_epoll_update_usock(srt_epoller_fd_, srtfd, &events);
srs_info("srt poller %d update srt socket %d, events=%d", srt_epoller_fd_, srtfd, events);
@ -587,7 +608,7 @@ ISrsSrtPoller* srs_srt_poller_new()
return new SrsSrtPoller();
}
SrsSrtSocket::SrsSrtSocket(ISrsSrtPoller* srt_poller, SRTSOCKET srt_fd)
SrsSrtSocket::SrsSrtSocket(ISrsSrtPoller* srt_poller, srs_srt_t srt_fd)
{
srt_poller_ = srt_poller;
srt_fd_ = srt_fd;
@ -657,7 +678,7 @@ srs_error_t SrsSrtSocket::connect(const string& ip, int port)
return err;
}
srs_error_t SrsSrtSocket::accept(SRTSOCKET* client_srt_fd)
srs_error_t SrsSrtSocket::accept(srs_srt_t* client_srt_fd)
{
srs_error_t err = srs_success;
@ -665,10 +686,10 @@ srs_error_t SrsSrtSocket::accept(SRTSOCKET* client_srt_fd)
sockaddr_in inaddr;
int addrlen = sizeof(inaddr);
// @see https://github.com/Haivision/srt/blob/master/docs/API/API-functions.md#srt_accept
SRTSOCKET srt_fd = srt_accept(srt_fd_, (sockaddr*)&inaddr, &addrlen);
srs_srt_t srt_fd = srt_accept(srt_fd_, (sockaddr*)&inaddr, &addrlen);
// Accept ok, return with the SRT client fd.
if (srt_fd != SRT_INVALID_SOCK) {
if (srt_fd != srs_srt_socket_invalid()) {
*client_srt_fd = srt_fd;
return err;
}

View file

@ -13,55 +13,57 @@
#include <map>
#include <vector>
#include <srt/srt.h>
class SrsSrtSocket;
typedef int srs_srt_t;
extern srs_srt_t srs_srt_socket_invalid();
// Create srt socket only, with libsrt's default option.
extern srs_error_t srs_srt_socket(SRTSOCKET* pfd);
extern srs_error_t srs_srt_socket(srs_srt_t* pfd);
extern srs_error_t srs_srt_close(srs_srt_t fd);
// Create srt socket with srs recommend default option(tsbpdmode=false,tlpktdrop=false,latency=0,sndsyn=0,rcvsyn=0)
extern srs_error_t srs_srt_socket_with_default_option(SRTSOCKET* pfd);
extern srs_error_t srs_srt_socket_with_default_option(srs_srt_t* pfd);
// For server, listen at SRT endpoint.
extern srs_error_t srs_srt_listen(SRTSOCKET srt_fd, std::string ip, int port);
extern srs_error_t srs_srt_listen(srs_srt_t srt_fd, std::string ip, int port);
// Set read/write no block.
extern srs_error_t srs_srt_nonblock(SRTSOCKET srt_fd);
extern srs_error_t srs_srt_nonblock(srs_srt_t srt_fd);
// Set SRT options.
extern srs_error_t srs_srt_set_maxbw(SRTSOCKET srt_fd, int maxbw);
extern srs_error_t srs_srt_set_mss(SRTSOCKET srt_fd, int mss);
extern srs_error_t srs_srt_set_payload_size(SRTSOCKET srt_fd, int payload_size);
extern srs_error_t srs_srt_set_connect_timeout(SRTSOCKET srt_fd, int timeout);
extern srs_error_t srs_srt_set_peer_idle_timeout(SRTSOCKET srt_fd, int timeout);
extern srs_error_t srs_srt_set_tsbpdmode(SRTSOCKET srt_fd, bool tsbpdmode);
extern srs_error_t srs_srt_set_sndbuf(SRTSOCKET srt_fd, int sndbuf);
extern srs_error_t srs_srt_set_rcvbuf(SRTSOCKET srt_fd, int rcvbuf);
extern srs_error_t srs_srt_set_tlpktdrop(SRTSOCKET srt_fd, bool tlpktdrop);
extern srs_error_t srs_srt_set_latency(SRTSOCKET srt_fd, int latency);
extern srs_error_t srs_srt_set_rcv_latency(SRTSOCKET srt_fd, int rcv_latency);
extern srs_error_t srs_srt_set_peer_latency(SRTSOCKET srt_fd, int peer_latency);
extern srs_error_t srs_srt_set_streamid(SRTSOCKET srt_fd, const std::string& streamid);
extern srs_error_t srs_srt_set_maxbw(srs_srt_t srt_fd, int maxbw);
extern srs_error_t srs_srt_set_mss(srs_srt_t srt_fd, int mss);
extern srs_error_t srs_srt_set_payload_size(srs_srt_t srt_fd, int payload_size);
extern srs_error_t srs_srt_set_connect_timeout(srs_srt_t srt_fd, int timeout);
extern srs_error_t srs_srt_set_peer_idle_timeout(srs_srt_t srt_fd, int timeout);
extern srs_error_t srs_srt_set_tsbpdmode(srs_srt_t srt_fd, bool tsbpdmode);
extern srs_error_t srs_srt_set_sndbuf(srs_srt_t srt_fd, int sndbuf);
extern srs_error_t srs_srt_set_rcvbuf(srs_srt_t srt_fd, int rcvbuf);
extern srs_error_t srs_srt_set_tlpktdrop(srs_srt_t srt_fd, bool tlpktdrop);
extern srs_error_t srs_srt_set_latency(srs_srt_t srt_fd, int latency);
extern srs_error_t srs_srt_set_rcv_latency(srs_srt_t srt_fd, int rcv_latency);
extern srs_error_t srs_srt_set_peer_latency(srs_srt_t srt_fd, int peer_latency);
extern srs_error_t srs_srt_set_streamid(srs_srt_t srt_fd, const std::string& streamid);
// Get SRT options.
extern srs_error_t srs_srt_get_maxbw(SRTSOCKET srt_fd, int& maxbw);
extern srs_error_t srs_srt_get_mss(SRTSOCKET srt_fd, int& mss);
extern srs_error_t srs_srt_get_payload_size(SRTSOCKET srt_fd, int& payload_size);
extern srs_error_t srs_srt_get_connect_timeout(SRTSOCKET srt_fd, int& timeout);
extern srs_error_t srs_srt_get_peer_idle_timeout(SRTSOCKET srt_fd, int& timeout);
extern srs_error_t srs_srt_get_tsbpdmode(SRTSOCKET srt_fd, bool& tsbpdmode);
extern srs_error_t srs_srt_get_sndbuf(SRTSOCKET srt_fd, int& sndbuf);
extern srs_error_t srs_srt_get_rcvbuf(SRTSOCKET srt_fd, int& rcvbuf);
extern srs_error_t srs_srt_get_tlpktdrop(SRTSOCKET srt_fd, bool& tlpktdrop);
extern srs_error_t srs_srt_get_latency(SRTSOCKET srt_fd, int& latency);
extern srs_error_t srs_srt_get_rcv_latency(SRTSOCKET srt_fd, int& rcv_latency);
extern srs_error_t srs_srt_get_peer_latency(SRTSOCKET srt_fd, int& peer_latency);
extern srs_error_t srs_srt_get_streamid(SRTSOCKET srt_fd, std::string& streamid);
extern srs_error_t srs_srt_get_maxbw(srs_srt_t srt_fd, int& maxbw);
extern srs_error_t srs_srt_get_mss(srs_srt_t srt_fd, int& mss);
extern srs_error_t srs_srt_get_payload_size(srs_srt_t srt_fd, int& payload_size);
extern srs_error_t srs_srt_get_connect_timeout(srs_srt_t srt_fd, int& timeout);
extern srs_error_t srs_srt_get_peer_idle_timeout(srs_srt_t srt_fd, int& timeout);
extern srs_error_t srs_srt_get_tsbpdmode(srs_srt_t srt_fd, bool& tsbpdmode);
extern srs_error_t srs_srt_get_sndbuf(srs_srt_t srt_fd, int& sndbuf);
extern srs_error_t srs_srt_get_rcvbuf(srs_srt_t srt_fd, int& rcvbuf);
extern srs_error_t srs_srt_get_tlpktdrop(srs_srt_t srt_fd, bool& tlpktdrop);
extern srs_error_t srs_srt_get_latency(srs_srt_t srt_fd, int& latency);
extern srs_error_t srs_srt_get_rcv_latency(srs_srt_t srt_fd, int& rcv_latency);
extern srs_error_t srs_srt_get_peer_latency(srs_srt_t srt_fd, int& peer_latency);
extern srs_error_t srs_srt_get_streamid(srs_srt_t srt_fd, std::string& streamid);
// Get SRT socket info.
extern srs_error_t srs_srt_get_local_ip_port(SRTSOCKET srt_fd, std::string& ip, int& port);
extern srs_error_t srs_srt_get_remote_ip_port(SRTSOCKET srt_fd, std::string& ip, int& port);
extern srs_error_t srs_srt_get_local_ip_port(srs_srt_t srt_fd, std::string& ip, int& port);
extern srs_error_t srs_srt_get_remote_ip_port(srs_srt_t srt_fd, std::string& ip, int& port);
// Get SRT stats.
class SrsSrtStat
@ -82,7 +84,7 @@ public:
int pktRetrans();
int pktSndDrop();
public:
srs_error_t fetch(SRTSOCKET srt_fd, bool clear);
srs_error_t fetch(srs_srt_t srt_fd, bool clear);
};
// Srt poller, subscribe/unsubscribed events and wait them fired.
@ -99,6 +101,8 @@ public:
// Wait for the fds in its epoll to be fired in specified timeout_ms, where the pn_fds is the number of active fds.
// Note that for ST, please always use timeout_ms(0) and switch coroutine by yourself.
virtual srs_error_t wait(int timeout_ms, int* pn_fds) = 0;
public:
virtual int size() = 0;
};
ISrsSrtPoller* srs_srt_poller_new();
@ -106,15 +110,15 @@ ISrsSrtPoller* srs_srt_poller_new();
class SrsSrtSocket
{
public:
SrsSrtSocket(ISrsSrtPoller* srt_poller, SRTSOCKET srt_fd);
SrsSrtSocket(ISrsSrtPoller* srt_poller, srs_srt_t srt_fd);
virtual ~SrsSrtSocket();
public: // IO API
srs_error_t connect(const std::string& ip, int port);
srs_error_t accept(SRTSOCKET* client_srt_fd);
srs_error_t accept(srs_srt_t* client_srt_fd);
srs_error_t recvmsg(void* buf, size_t size, ssize_t* nread);
srs_error_t sendmsg(void* buf, size_t size, ssize_t* nwrite);
public:
SRTSOCKET fd() const { return srt_fd_; }
srs_srt_t fd() const { return srt_fd_; }
int events() const { return events_; }
public:
void set_recv_timeout(srs_utime_t tm) { recv_timeout_ = tm; }
@ -148,7 +152,7 @@ private:
srs_error_t check_error();
private:
SRTSOCKET srt_fd_;
srs_srt_t srt_fd_;
// Mark if some error occured in srt socket.
bool has_error_;
// When read operator like recvmsg/accept would block, wait this condition timeout or notified,

View file

@ -16,6 +16,8 @@
#include <vector>
using namespace std;
#include <srt/srt.h>
extern SrsSrtEventLoop* _srt_eventloop;
// Test srt st service
@ -26,7 +28,7 @@ VOID TEST(ServiceSrtPoller, SrtPollOperateSocket)
ISrsSrtPoller* srt_poller = srs_srt_poller_new();
HELPER_EXPECT_SUCCESS(srt_poller->initialize());
SRTSOCKET srt_fd = SRT_INVALID_SOCK;
srs_srt_t srt_fd = srs_srt_socket_invalid();
HELPER_EXPECT_SUCCESS(srs_srt_socket(&srt_fd));
EXPECT_TRUE(srt_fd > 0);
@ -53,10 +55,10 @@ VOID TEST(ServiceSrtPoller, SrtPollOperateSocket)
EXPECT_FALSE(srt_socket->events() & SRT_EPOLL_OUT);
EXPECT_TRUE(srt_socket->events() & SRT_EPOLL_ERR);
EXPECT_EQ(srt_poller->fd_sockets_.size(), 1);
EXPECT_EQ(srt_poller->size(), 1);
// Delete socket, will remove in srt poller.
srs_freep(srt_socket);
EXPECT_EQ(srt_poller->fd_sockets_.size(), 0);
EXPECT_EQ(srt_poller->size(), 0);
srs_freep(srt_poller);
}
@ -65,7 +67,7 @@ VOID TEST(ServiceSrtPoller, SrtSetGetSocketOpt)
{
srs_error_t err = srs_success;
SRTSOCKET srt_fd = SRT_INVALID_SOCK;
srs_srt_t srt_fd = srs_srt_socket_invalid();
HELPER_EXPECT_SUCCESS(srs_srt_socket(&srt_fd));
HELPER_EXPECT_SUCCESS(srs_srt_nonblock(srt_fd));
@ -129,10 +131,10 @@ class MockSrtServer
{
public:
SrsSrtSocket* srt_socket_;
SRTSOCKET srt_server_fd_;
srs_srt_t srt_server_fd_;
MockSrtServer() {
srt_server_fd_ = SRT_INVALID_SOCK;
srt_server_fd_ = srs_srt_socket_invalid();
srt_socket_ = NULL;
}
@ -160,7 +162,7 @@ public:
srs_freep(srt_socket_);
}
virtual srs_error_t accept(SRTSOCKET* client_fd) {
virtual srs_error_t accept(srs_srt_t* client_fd) {
srs_error_t err = srs_success;
if ((err = srt_socket_->accept(client_fd)) != srs_success) {
@ -182,32 +184,32 @@ VOID TEST(ServiceStSRTTest, ListenConnectAccept)
HELPER_EXPECT_SUCCESS(srt_server.create_socket());
HELPER_EXPECT_SUCCESS(srt_server.listen(server_ip, server_port));
SRTSOCKET srt_client_fd = SRT_INVALID_SOCK;
srs_srt_t srt_client_fd = srs_srt_socket_invalid();
HELPER_EXPECT_SUCCESS(srs_srt_socket(&srt_client_fd));
SrsSrtSocket* srt_client_socket = new SrsSrtSocket(_srt_eventloop->poller(), srt_client_fd);
// No client connected, accept will timeout.
SRTSOCKET srt_fd = SRT_INVALID_SOCK;
srs_srt_t srt_fd = srs_srt_socket_invalid();
// Make utest fast timeout.
srt_server.srt_socket_->set_recv_timeout(50 * SRS_UTIME_MILLISECONDS);
err = srt_server.accept(&srt_fd);
EXPECT_EQ(srs_error_code(err), ERROR_SRT_TIMEOUT);
EXPECT_EQ(srt_fd, SRT_INVALID_SOCK);
EXPECT_EQ(srt_fd, srs_srt_socket_invalid());
// Client connect to server
HELPER_EXPECT_SUCCESS(srt_client_socket->connect(server_ip, server_port));
// Server will accept one client.
HELPER_EXPECT_SUCCESS(srt_server.accept(&srt_fd));
EXPECT_NE(srt_fd, SRT_INVALID_SOCK);
EXPECT_NE(srt_fd, srs_srt_socket_invalid());
}
VOID TEST(ServiceStSRTTest, ConnectTimeout)
{
srs_error_t err = srs_success;
SRTSOCKET srt_client_fd = SRT_INVALID_SOCK;
srs_srt_t srt_client_fd = srs_srt_socket_invalid();
HELPER_EXPECT_SUCCESS(srs_srt_socket_with_default_option(&srt_client_fd));
SrsSrtSocket* srt_client_socket = new SrsSrtSocket(_srt_eventloop->poller(), srt_client_fd);
@ -228,16 +230,16 @@ VOID TEST(ServiceStSRTTest, ConnectWithStreamid)
HELPER_EXPECT_SUCCESS(srt_server.listen(server_ip, server_port));
std::string streamid = "SRS_SRT_Streamid";
SRTSOCKET srt_client_fd = SRT_INVALID_SOCK;
srs_srt_t srt_client_fd = srs_srt_socket_invalid();
HELPER_EXPECT_SUCCESS(srs_srt_socket_with_default_option(&srt_client_fd));
HELPER_EXPECT_SUCCESS(srs_srt_set_streamid(srt_client_fd, streamid));
SrsSrtSocket* srt_client_socket = new SrsSrtSocket(_srt_eventloop->poller(), srt_client_fd);
HELPER_EXPECT_SUCCESS(srt_client_socket->connect("127.0.0.1", 9000));
SRTSOCKET srt_server_accepted_fd = SRT_INVALID_SOCK;
srs_srt_t srt_server_accepted_fd = srs_srt_socket_invalid();
HELPER_EXPECT_SUCCESS(srt_server.accept(&srt_server_accepted_fd));
EXPECT_NE(srt_server_accepted_fd, SRT_INVALID_SOCK);
EXPECT_NE(srt_server_accepted_fd, srs_srt_socket_invalid());
std::string s;
HELPER_EXPECT_SUCCESS(srs_srt_get_streamid(srt_server_accepted_fd, s));
EXPECT_EQ(s, streamid);
@ -254,7 +256,7 @@ VOID TEST(ServiceStSRTTest, ReadWrite)
HELPER_EXPECT_SUCCESS(srt_server.create_socket());
HELPER_EXPECT_SUCCESS(srt_server.listen(server_ip, server_port));
SRTSOCKET srt_client_fd = SRT_INVALID_SOCK;
srs_srt_t srt_client_fd = srs_srt_socket_invalid();
HELPER_EXPECT_SUCCESS(srs_srt_socket_with_default_option(&srt_client_fd));
SrsSrtSocket* srt_client_socket = new SrsSrtSocket(_srt_eventloop->poller(), srt_client_fd);
@ -262,9 +264,9 @@ VOID TEST(ServiceStSRTTest, ReadWrite)
HELPER_EXPECT_SUCCESS(srt_client_socket->connect(server_ip, server_port));
// Server will accept one client.
SRTSOCKET srt_server_accepted_fd = SRT_INVALID_SOCK;
srs_srt_t srt_server_accepted_fd = srs_srt_socket_invalid();
HELPER_EXPECT_SUCCESS(srt_server.accept(&srt_server_accepted_fd));
EXPECT_NE(srt_server_accepted_fd, SRT_INVALID_SOCK);
EXPECT_NE(srt_server_accepted_fd, srs_srt_socket_invalid());
SrsSrtSocket* srt_server_accepted_socket = new SrsSrtSocket(_srt_eventloop->poller(), srt_server_accepted_fd);
if (true) {
@ -307,15 +309,15 @@ VOID TEST(ServiceStSRTTest, ReadWrite)
class MockSrtHandler : public ISrsSrtHandler
{
private:
SRTSOCKET srt_fd;
srs_srt_t srt_fd;
public:
MockSrtHandler() {
srt_fd = SRT_INVALID_SOCK;
srt_fd = srs_srt_socket_invalid();
}
virtual ~MockSrtHandler() {
}
public:
virtual srs_error_t on_srt_client(SRTSOCKET fd) {
virtual srs_error_t on_srt_client(srs_srt_t fd) {
srt_fd = fd;
return srs_success;
}