diff --git a/trunk/3rdparty/st-srs/io.c b/trunk/3rdparty/st-srs/io.c index 912de0b28..8b0e9294a 100644 --- a/trunk/3rdparty/st-srs/io.c +++ b/trunk/3rdparty/st-srs/io.c @@ -743,6 +743,36 @@ int st_sendmsg(_st_netfd_t *fd, const struct msghdr *msg, int flags, st_utime_t } +int st_sendmmsg(st_netfd_t fd, struct mmsghdr *msgvec, unsigned int vlen, int flags, st_utime_t timeout) +{ + int n; + int left; + struct mmsghdr *p; + + left = (int)vlen; + while (left > 0) { + p = msgvec + (vlen - left); + + if ((n = sendmmsg(fd->osfd, p, left, flags)) < 0) { + if (errno == EINTR) + continue; + if (!_IO_NOT_READY_ERROR) + break; + /* Wait until the socket becomes writable */ + if (st_netfd_poll(fd, POLLOUT, timeout) < 0) + break; + } + + left -= n; + } + + // An error is returned only if no datagrams could be sent. + if (left == (int)vlen) { + return n; + } + return (int)vlen - left; +} + /* * To open FIFOs or other special files. diff --git a/trunk/3rdparty/st-srs/public.h b/trunk/3rdparty/st-srs/public.h index 20b09407b..c54bd790a 100644 --- a/trunk/3rdparty/st-srs/public.h +++ b/trunk/3rdparty/st-srs/public.h @@ -151,6 +151,7 @@ extern int st_recvfrom(st_netfd_t fd, void *buf, int len, struct sockaddr *from, extern int st_sendto(st_netfd_t fd, const void *msg, int len, const struct sockaddr *to, int tolen, st_utime_t timeout); extern int st_recvmsg(st_netfd_t fd, struct msghdr *msg, int flags, st_utime_t timeout); extern int st_sendmsg(st_netfd_t fd, const struct msghdr *msg, int flags, st_utime_t timeout); +extern int st_sendmmsg(st_netfd_t fd, struct mmsghdr *msgvec, unsigned int vlen, int flags, st_utime_t timeout); extern st_netfd_t st_open(const char *path, int oflags, mode_t mode); #ifdef DEBUG diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index fa02a3c4e..4a23ed399 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -316,6 +316,21 @@ srs_error_t SrsUdpMuxSocket::sendto(void* data, int size, srs_utime_t timeout) return err; } +srs_netfd_t SrsUdpMuxSocket::stfd() +{ + return lfd; +} + +sockaddr_in* SrsUdpMuxSocket::peer_addr() +{ + return (sockaddr_in*)&from; +} + +socklen_t SrsUdpMuxSocket::peer_addrlen() +{ + return (socklen_t)fromlen; +} + std::string SrsUdpMuxSocket::get_peer_id() { char id_buf[1024]; diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index dce082efd..0222413c1 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -27,6 +27,7 @@ #include #include +#include #include @@ -148,6 +149,10 @@ public: int recvfrom(srs_utime_t timeout); srs_error_t sendto(void* data, int size, srs_utime_t timeout); + srs_netfd_t stfd(); + sockaddr_in* peer_addr(); + socklen_t peer_addrlen(); + char* data() { return buf; } int size() { return nread; } std::string get_peer_ip() const { return peer_ip; } diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 6b464f173..e8963a213 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -564,6 +564,7 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int { srs_error_t err = srs_success; + vector mhdrs; for (int i = 0; i < nb_msgs; i++) { SrsSharedPtrMessage* msg = msgs[i]; @@ -585,22 +586,42 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int } int length = pkt->size; - char buf[kRtpPacketSize]; + char* buf = new char[kRtpPacketSize]; if ((err = rtc_session->dtls_session->protect_rtp(buf, pkt->payload, length)) != srs_success) { - srs_warn("srtp err %s", srs_error_desc(err).c_str()); - srs_freep(err); + srs_warn("srtp err %s", srs_error_desc(err).c_str()); srs_freep(err); srs_freepa(buf); continue; } - // TODO: use sendmmsg to send multi packet one system call - if ((err = udp_mux_skt->sendto(buf, length, 0)) != srs_success) { - srs_warn("send err %s", srs_error_desc(err).c_str()); - srs_freep(err); - } + mmsghdr mhdr; + memset(&mhdr, 0, sizeof(mmsghdr)); + mhdr.msg_hdr.msg_name = (sockaddr_in*)udp_mux_skt->peer_addr(); + mhdr.msg_hdr.msg_namelen = udp_mux_skt->peer_addrlen(); + mhdr.msg_hdr.msg_iovlen = 1; + mhdr.msg_hdr.msg_iov = new iovec(); + mhdr.msg_hdr.msg_iov->iov_base = buf; + mhdr.msg_hdr.msg_iov->iov_len = length; + mhdrs.push_back(mhdr); } srs_freep(msg); } + + + if (!mhdrs.empty()) { + mmsghdr* msgvec = &mhdrs[0]; + unsigned int vlen = (unsigned int)mhdrs.size(); + int r0 = srs_sendmmsg(udp_mux_skt->stfd(), msgvec, vlen, 0, SRS_UTIME_NO_TIMEOUT); + if (r0 != (int)vlen) { + srs_warn("sendmsg %d msgs, %d done", vlen, r0); + } + } + for (int i = 0; i < (int)mhdrs.size(); i++) { + msghdr* hdr = &mhdrs[i].msg_hdr; + for (int i = 0; i < (int)hdr->msg_iovlen; i++) { + iovec* iov = hdr->msg_iov + i; + delete (char*)iov->iov_base; + } + } } SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id) diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index e265c373b..122a1469f 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -124,7 +124,7 @@ #define SRS_PERF_QUEUE_COND_WAIT #ifdef SRS_PERF_QUEUE_COND_WAIT #define SRS_PERF_MW_MIN_MSGS 8 - #define SRS_PERF_MW_MIN_MSGS_FOR_RTC 0 + #define SRS_PERF_MW_MIN_MSGS_FOR_RTC 4 #endif /** * the default value of vhost for diff --git a/trunk/src/service/srs_service_st.cpp b/trunk/src/service/srs_service_st.cpp index 947150f61..8ed113be2 100644 --- a/trunk/src/service/srs_service_st.cpp +++ b/trunk/src/service/srs_service_st.cpp @@ -407,6 +407,31 @@ int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime return st_sendmsg((st_netfd_t)stfd, msg, flags, (st_utime_t)timeout); } +int srs_sendmmsg(srs_netfd_t stfd, struct mmsghdr *msgvec, unsigned int vlen, int flags, srs_utime_t timeout) +{ +#if defined(SRS_AUTO_OSX) + // @see http://man7.org/linux/man-pages/man2/sendmmsg.2.html + for (int i = 0; i < (int)vlen; ++i) { + struct mmsghdr* p = msgvec + i; + int n = srs_sendmsg(stfd, &p->msg_hdr, flags, timeout); + if (n < 0) { + // An error is returned only if no datagrams could be sent. + if (i == 0) { + return n; + } + return i + 1; + } + + p->msg_len = n; + } + // Returns the number of messages sent from msgvec; if this is less than vlen, the caller can retry with a + // further sendmmsg() call to send the remaining messages. + return vlen; +#else + return st_sendmmsg((st_netfd_t)stfd, msgvec, vlen, flags, (st_utime_t)timeout); +#endif +} + srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout) { return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout); diff --git a/trunk/src/service/srs_service_st.hpp b/trunk/src/service/srs_service_st.hpp index 947950e85..ed10fd667 100644 --- a/trunk/src/service/srs_service_st.hpp +++ b/trunk/src/service/srs_service_st.hpp @@ -90,6 +90,7 @@ extern srs_netfd_t srs_netfd_open(int osfd); extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout); extern int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr *to, int tolen, srs_utime_t timeout); extern int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout); +extern int srs_sendmmsg(srs_netfd_t stfd, struct mmsghdr *msgvec, unsigned int vlen, int flags, srs_utime_t timeout); extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);