1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #307, support sendmmsg to improve RTC performance

This commit is contained in:
winlin 2020-04-04 22:43:44 +08:00
parent 3793404ceb
commit 55a4052d90
8 changed files with 107 additions and 9 deletions

View file

@ -743,6 +743,36 @@ int st_sendmsg(_st_netfd_t *fd, const struct msghdr *msg, int flags, st_utime_t
} }
int st_sendmmsg(st_netfd_t fd, struct mmsghdr *msgvec, unsigned int vlen, int flags, st_utime_t timeout)
{
int n;
int left;
struct mmsghdr *p;
left = (int)vlen;
while (left > 0) {
p = msgvec + (vlen - left);
if ((n = sendmmsg(fd->osfd, p, left, flags)) < 0) {
if (errno == EINTR)
continue;
if (!_IO_NOT_READY_ERROR)
break;
/* Wait until the socket becomes writable */
if (st_netfd_poll(fd, POLLOUT, timeout) < 0)
break;
}
left -= n;
}
// An error is returned only if no datagrams could be sent.
if (left == (int)vlen) {
return n;
}
return (int)vlen - left;
}
/* /*
* To open FIFOs or other special files. * To open FIFOs or other special files.

View file

@ -151,6 +151,7 @@ extern int st_recvfrom(st_netfd_t fd, void *buf, int len, struct sockaddr *from,
extern int st_sendto(st_netfd_t fd, const void *msg, int len, const struct sockaddr *to, int tolen, st_utime_t timeout); extern int st_sendto(st_netfd_t fd, const void *msg, int len, const struct sockaddr *to, int tolen, st_utime_t timeout);
extern int st_recvmsg(st_netfd_t fd, struct msghdr *msg, int flags, st_utime_t timeout); extern int st_recvmsg(st_netfd_t fd, struct msghdr *msg, int flags, st_utime_t timeout);
extern int st_sendmsg(st_netfd_t fd, const struct msghdr *msg, int flags, st_utime_t timeout); extern int st_sendmsg(st_netfd_t fd, const struct msghdr *msg, int flags, st_utime_t timeout);
extern int st_sendmmsg(st_netfd_t fd, struct mmsghdr *msgvec, unsigned int vlen, int flags, st_utime_t timeout);
extern st_netfd_t st_open(const char *path, int oflags, mode_t mode); extern st_netfd_t st_open(const char *path, int oflags, mode_t mode);
#ifdef DEBUG #ifdef DEBUG

View file

@ -316,6 +316,21 @@ srs_error_t SrsUdpMuxSocket::sendto(void* data, int size, srs_utime_t timeout)
return err; return err;
} }
srs_netfd_t SrsUdpMuxSocket::stfd()
{
return lfd;
}
sockaddr_in* SrsUdpMuxSocket::peer_addr()
{
return (sockaddr_in*)&from;
}
socklen_t SrsUdpMuxSocket::peer_addrlen()
{
return (socklen_t)fromlen;
}
std::string SrsUdpMuxSocket::get_peer_id() std::string SrsUdpMuxSocket::get_peer_id()
{ {
char id_buf[1024]; char id_buf[1024];

View file

@ -27,6 +27,7 @@
#include <srs_core.hpp> #include <srs_core.hpp>
#include <sys/socket.h> #include <sys/socket.h>
#include <netinet/in.h>
#include <string> #include <string>
@ -148,6 +149,10 @@ public:
int recvfrom(srs_utime_t timeout); int recvfrom(srs_utime_t timeout);
srs_error_t sendto(void* data, int size, srs_utime_t timeout); srs_error_t sendto(void* data, int size, srs_utime_t timeout);
srs_netfd_t stfd();
sockaddr_in* peer_addr();
socklen_t peer_addrlen();
char* data() { return buf; } char* data() { return buf; }
int size() { return nread; } int size() { return nread; }
std::string get_peer_ip() const { return peer_ip; } std::string get_peer_ip() const { return peer_ip; }

View file

@ -564,6 +564,7 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
vector<mmsghdr> mhdrs;
for (int i = 0; i < nb_msgs; i++) { for (int i = 0; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i]; SrsSharedPtrMessage* msg = msgs[i];
@ -585,22 +586,42 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int
} }
int length = pkt->size; int length = pkt->size;
char buf[kRtpPacketSize]; char* buf = new char[kRtpPacketSize];
if ((err = rtc_session->dtls_session->protect_rtp(buf, pkt->payload, length)) != srs_success) { if ((err = rtc_session->dtls_session->protect_rtp(buf, pkt->payload, length)) != srs_success) {
srs_warn("srtp err %s", srs_error_desc(err).c_str()); srs_warn("srtp err %s", srs_error_desc(err).c_str()); srs_freep(err); srs_freepa(buf);
srs_freep(err);
continue; continue;
} }
// TODO: use sendmmsg to send multi packet one system call mmsghdr mhdr;
if ((err = udp_mux_skt->sendto(buf, length, 0)) != srs_success) { memset(&mhdr, 0, sizeof(mmsghdr));
srs_warn("send err %s", srs_error_desc(err).c_str()); mhdr.msg_hdr.msg_name = (sockaddr_in*)udp_mux_skt->peer_addr();
srs_freep(err); mhdr.msg_hdr.msg_namelen = udp_mux_skt->peer_addrlen();
} mhdr.msg_hdr.msg_iovlen = 1;
mhdr.msg_hdr.msg_iov = new iovec();
mhdr.msg_hdr.msg_iov->iov_base = buf;
mhdr.msg_hdr.msg_iov->iov_len = length;
mhdrs.push_back(mhdr);
} }
srs_freep(msg); srs_freep(msg);
} }
if (!mhdrs.empty()) {
mmsghdr* msgvec = &mhdrs[0];
unsigned int vlen = (unsigned int)mhdrs.size();
int r0 = srs_sendmmsg(udp_mux_skt->stfd(), msgvec, vlen, 0, SRS_UTIME_NO_TIMEOUT);
if (r0 != (int)vlen) {
srs_warn("sendmsg %d msgs, %d done", vlen, r0);
}
}
for (int i = 0; i < (int)mhdrs.size(); i++) {
msghdr* hdr = &mhdrs[i].msg_hdr;
for (int i = 0; i < (int)hdr->msg_iovlen; i++) {
iovec* iov = hdr->msg_iov + i;
delete (char*)iov->iov_base;
}
}
} }
SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id) SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id)

View file

@ -124,7 +124,7 @@
#define SRS_PERF_QUEUE_COND_WAIT #define SRS_PERF_QUEUE_COND_WAIT
#ifdef SRS_PERF_QUEUE_COND_WAIT #ifdef SRS_PERF_QUEUE_COND_WAIT
#define SRS_PERF_MW_MIN_MSGS 8 #define SRS_PERF_MW_MIN_MSGS 8
#define SRS_PERF_MW_MIN_MSGS_FOR_RTC 0 #define SRS_PERF_MW_MIN_MSGS_FOR_RTC 4
#endif #endif
/** /**
* the default value of vhost for * the default value of vhost for

View file

@ -407,6 +407,31 @@ int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime
return st_sendmsg((st_netfd_t)stfd, msg, flags, (st_utime_t)timeout); return st_sendmsg((st_netfd_t)stfd, msg, flags, (st_utime_t)timeout);
} }
int srs_sendmmsg(srs_netfd_t stfd, struct mmsghdr *msgvec, unsigned int vlen, int flags, srs_utime_t timeout)
{
#if defined(SRS_AUTO_OSX)
// @see http://man7.org/linux/man-pages/man2/sendmmsg.2.html
for (int i = 0; i < (int)vlen; ++i) {
struct mmsghdr* p = msgvec + i;
int n = srs_sendmsg(stfd, &p->msg_hdr, flags, timeout);
if (n < 0) {
// An error is returned only if no datagrams could be sent.
if (i == 0) {
return n;
}
return i + 1;
}
p->msg_len = n;
}
// Returns the number of messages sent from msgvec; if this is less than vlen, the caller can retry with a
// further sendmmsg() call to send the remaining messages.
return vlen;
#else
return st_sendmmsg((st_netfd_t)stfd, msgvec, vlen, flags, (st_utime_t)timeout);
#endif
}
srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout) srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout)
{ {
return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout); return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout);

View file

@ -90,6 +90,7 @@ extern srs_netfd_t srs_netfd_open(int osfd);
extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout); extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout);
extern int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr *to, int tolen, srs_utime_t timeout); extern int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr *to, int tolen, srs_utime_t timeout);
extern int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout); extern int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout);
extern int srs_sendmmsg(srs_netfd_t stfd, struct mmsghdr *msgvec, unsigned int vlen, int flags, srs_utime_t timeout);
extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout); extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);