From f0015a7cc1caf6b0c63c0917bc34d0f11f943731 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 17 Apr 2020 07:10:16 +0800 Subject: [PATCH] For #307, refine GSO performance, alloc iovs --- trunk/src/app/srs_app_rtc_conn.cpp | 130 +++++++++--------------- trunk/src/core/srs_core_performance.hpp | 9 +- trunk/src/service/srs_service_st.cpp | 2 + 3 files changed, 58 insertions(+), 83 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index d84ead035..d18602ed5 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -704,8 +704,6 @@ srs_error_t SrsRtcSenderThread::cycle() // For RTC, we always try to read messages, only wait when no message. if (msg_count <= 0) { - srs_usleep(0); - #ifdef SRS_PERF_QUEUE_COND_WAIT if (realtime) { // for realtime, min required msgs is 0, send when got one+ msgs. @@ -887,43 +885,40 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsRtcPackets& packets) return srs_error_wrap(err, "fetch msghdr"); } - // Reset the iovec, we should never change the msg_iovlen. - for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) { - iovec* p = mhdr->msg_hdr.msg_iov + j; + // For this message, select the first iovec. + iovec* iov = mhdr->msg_hdr.msg_iov; + mhdr->msg_hdr.msg_iovlen = 1; - if (!p->iov_len) { - break; - } - p->iov_len = 0; + if (!iov->iov_base) { + iov->iov_base = new char[kRtpPacketSize]; } + iov->iov_len = kRtpPacketSize; - char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base; - int length = kRtpPacketSize; - - // Marshal packet to bytes. + // Marshal packet to bytes in iovec. if (true) { - SrsBuffer stream(buf, length); + SrsBuffer stream((char*)iov->iov_base, iov->iov_len); if ((err = packet->encode(&stream)) != srs_success) { return srs_error_wrap(err, "encode packet"); } - length = stream.pos(); + iov->iov_len = stream.pos(); } // Whether encrypt the RTP bytes. if (rtc_session->encrypt) { - if ((err = rtc_session->dtls_session->protect_rtp2(buf, &length)) != srs_success) { + int nn_encrypt = (int)iov->iov_len; + if ((err = rtc_session->dtls_session->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) { return srs_error_wrap(err, "srtp protect"); } + iov->iov_len = (size_t)nn_encrypt; } + // Set the address and control information. sockaddr_in* addr = (sockaddr_in*)sendonly_ukt->peer_addr(); socklen_t addrlen = (socklen_t)sendonly_ukt->peer_addrlen(); mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; - mhdr->msg_hdr.msg_iov->iov_len = length; mhdr->msg_hdr.msg_controllen = 0; - mhdr->msg_len = 0; // When we send out a packet, we commit a RTP packet. packets.nn_rtp_pkts++; @@ -996,23 +991,6 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets) using_gso = true; gso_final = (gso_size && gso_size != nn_packet); mhdr = gso_mhdr; - - // We need to increase the iov and cursor. - int nb_iovs = mhdr->msg_hdr.msg_iovlen; - if (gso_cursor >= nb_iovs - 1) { - int nn_new_iovs = 1; - mhdr->msg_hdr.msg_iovlen = nb_iovs + nn_new_iovs; - mhdr->msg_hdr.msg_iov = (iovec*)realloc(mhdr->msg_hdr.msg_iov, sizeof(iovec) * (nb_iovs + nn_new_iovs)); - memset(mhdr->msg_hdr.msg_iov + nb_iovs, 0, sizeof(iovec) * nn_new_iovs); - } - gso_cursor++; - - // Create payload cache for RTP packet. - iovec* p = mhdr->msg_hdr.msg_iov + gso_cursor; - if (!p->iov_base) { - p->iov_base = new char[kRtpPacketSize]; - p->iov_len = kRtpPacketSize; - } } // Change the state according to the next packet. @@ -1037,16 +1015,6 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets) return srs_error_wrap(err, "fetch msghdr"); } - // Reset the iovec, we should never change the msg_iovlen. - for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) { - iovec* p = mhdr->msg_hdr.msg_iov + j; - - if (!p->iov_len) { - break; - } - p->iov_len = 0; - } - // Now, GSO will use this message and size. if (using_gso) { gso_mhdr = mhdr; @@ -1054,10 +1022,17 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets) } } - // Marshal packet to bytes. + // For this message, select a new iovec. iovec* iov = mhdr->msg_hdr.msg_iov + gso_cursor; + mhdr->msg_hdr.msg_iovlen = gso_cursor + 1; + gso_cursor++; + + if (!iov->iov_base) { + iov->iov_base = new char[kRtpPacketSize]; + } iov->iov_len = kRtpPacketSize; + // Marshal packet to bytes in iovec. if (true) { SrsBuffer stream((char*)iov->iov_base, iov->iov_len); if ((err = packet->encode(&stream)) != srs_success) { @@ -1085,7 +1060,7 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets) } // If exceed the max GSO size, set to final. - if (using_gso && gso_cursor > 64) { + if (using_gso && gso_cursor + 1 >= SRS_PERF_RTC_GSO_MAX) { gso_final = true; } @@ -1099,10 +1074,6 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets) if (do_send) { for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) { iovec* iov = mhdr->msg_hdr.msg_iov + j; - - if (iov->iov_len <= 0) { - break; - } srs_trace("#%d, %s #%d/%d/%d, %d/%d bytes, size %d/%d", packets.debug_id, (using_gso? "GSO":"RAW"), j, gso_cursor + 1, mhdr->msg_hdr.msg_iovlen, iov->iov_len, padding, gso_size, gso_encrypt); } @@ -1110,13 +1081,13 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets) #endif if (do_send) { + // Set the address and control information. sockaddr_in* addr = (sockaddr_in*)sendonly_ukt->peer_addr(); socklen_t addrlen = (socklen_t)sendonly_ukt->peer_addrlen(); mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; mhdr->msg_hdr.msg_controllen = 0; - mhdr->msg_len = 0; #ifndef SRS_AUTO_OSX if (using_gso) { @@ -1130,9 +1101,6 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets) cm->cmsg_type = UDP_SEGMENT; cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); *((uint16_t*)CMSG_DATA(cm)) = gso_encrypt; - - // Private message, use it to store the cursor. - mhdr->msg_len = gso_cursor + 1; } #endif @@ -1870,18 +1838,21 @@ srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd) void SrsUdpMuxSender::free_mhdrs(std::vector& mhdrs) { - for (int i = 0; i < (int)mhdrs.size(); i++) { + int nn_mhdrs = (int)mhdrs.size(); + for (int i = 0; i < nn_mhdrs; i++) { + // @see https://linux.die.net/man/2/sendmmsg + // @see https://linux.die.net/man/2/sendmsg mmsghdr* hdr = &mhdrs[i]; // Free control for GSO. char* msg_control = (char*)hdr->msg_hdr.msg_control; - srs_freep(msg_control); + srs_freepa(msg_control); // Free iovec. - for (int j = (int)hdr->msg_hdr.msg_iovlen - 1; j >= 0 ; j--) { + for (int j = SRS_PERF_RTC_GSO_MAX - 1; j >= 0 ; j--) { iovec* iov = hdr->msg_hdr.msg_iov + j; char* data = (char*)iov->iov_base; - srs_freep(data); + srs_freepa(data); srs_freepa(iov); } } @@ -1892,19 +1863,21 @@ srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr) { // TODO: FIXME: Maybe need to shrink? if (cache_pos >= (int)cache.size()) { + // @see https://linux.die.net/man/2/sendmmsg + // @see https://linux.die.net/man/2/sendmsg mmsghdr mhdr; - memset(&mhdr, 0, sizeof(mmsghdr)); mhdr.msg_len = 0; + mhdr.msg_hdr.msg_flags = 0; + mhdr.msg_hdr.msg_control = NULL; - mhdr.msg_hdr.msg_iovlen = SRS_PERF_RTC_GSO_IOVS; + mhdr.msg_hdr.msg_iovlen = SRS_PERF_RTC_GSO_MAX; mhdr.msg_hdr.msg_iov = new iovec[mhdr.msg_hdr.msg_iovlen]; + memset((void*)mhdr.msg_hdr.msg_iov, 0, sizeof(iovec) * mhdr.msg_hdr.msg_iovlen); - for (int i = 0; i < (int)mhdr.msg_hdr.msg_iovlen; i++) { + for (int i = 0; i < SRS_PERF_RTC_GSO_IOVS; i++) { iovec* p = mhdr.msg_hdr.msg_iov + i; - p->iov_base = new char[kRtpPacketSize]; - p->iov_len = kRtpPacketSize; } cache.push_back(mhdr); @@ -1949,15 +1922,10 @@ srs_error_t SrsUdpMuxSender::cycle() int pos = cache_pos; int gso_iovs = 0; if (pos <= 0) { - srs_usleep(0); - if (pos <= 0) { - waiting_msgs = true; - nn_wait++; - srs_cond_wait(cond); - } - if (pos <= 0) { - continue; - } + waiting_msgs = true; + nn_wait++; + srs_cond_wait(cond); + continue; } // We are working on hotspot now. @@ -1968,16 +1936,12 @@ srs_error_t SrsUdpMuxSender::cycle() int gso_pos = 0; if (pos > 0 && stat_enabled) { // For shared GSO cache, stat the messages. - mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos; - for (p = &hotspot[0]; p < end; p++) { - if (!p->msg_len) { - continue; - } - - // Private message, use it to store the cursor. - int real_iovs = p->msg_len; - p->msg_len = 0; + // @see https://linux.die.net/man/2/sendmmsg + // @see https://linux.die.net/man/2/sendmsg + for (int i = 0; i < pos; i++) { + mmsghdr* mhdr = &hotspot[i]; + int real_iovs = mhdr->msg_hdr.msg_iovlen; gso_pos++; nn_gso_msgs++; nn_gso_iovs += real_iovs; gso_iovs += real_iovs; } @@ -1986,6 +1950,8 @@ srs_error_t SrsUdpMuxSender::cycle() // Send out all messages. if (pos > 0) { // Send out all messages. + // @see https://linux.die.net/man/2/sendmmsg + // @see https://linux.die.net/man/2/sendmsg mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos; for (p = &hotspot[0]; p < end; p += max_sendmmsg) { int vlen = (int)(end - p); diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index 6958aed4f..01c6138f7 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -201,7 +201,14 @@ // = 715MB # For SRS_PERF_RTC_GSO_IOVS = 1 // = 1402MB # For SRS_PERF_RTC_GSO_IOVS = 2 // = 2775MB # For SRS_PERF_RTC_GSO_IOVS = 4 -#define SRS_PERF_RTC_GSO_IOVS 2 +#if defined(__linux__) + #define SRS_PERF_RTC_GSO_IOVS 2 +#else + #define SRS_PERF_RTC_GSO_IOVS 1 +#endif + +// For RTC, the max iovs in msghdr, the max packets sent in a msghdr. +#define SRS_PERF_RTC_GSO_MAX 64 #endif diff --git a/trunk/src/service/srs_service_st.cpp b/trunk/src/service/srs_service_st.cpp index 4e1e1bc01..da54a5dae 100644 --- a/trunk/src/service/srs_service_st.cpp +++ b/trunk/src/service/srs_service_st.cpp @@ -436,6 +436,8 @@ int srs_sendmmsg(srs_netfd_t stfd, struct mmsghdr *msgvec, unsigned int vlen, in } msgvec->msg_len = r0; #else + msgvec->msg_len = 0; + int tolen = (int)msgvec->msg_hdr.msg_namelen; const struct sockaddr* to = (const struct sockaddr*)msgvec->msg_hdr.msg_name; for (int i = 0; i < (int)msgvec->msg_hdr.msg_iovlen; i++) {