1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Improve performance for GSO

This commit is contained in:
winlin 2020-04-16 19:33:10 +08:00
parent 32f43a9920
commit 8383f1b27a
2 changed files with 31 additions and 22 deletions

View file

@ -548,6 +548,8 @@ SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int
rtc_session = s; rtc_session = s;
sendonly_ukt = u->copy_sendonly(); sendonly_ukt = u->copy_sendonly();
sender = u->sender();
gso = false; gso = false;
merge_nalus = false; merge_nalus = false;
max_padding = 0; max_padding = 0;
@ -652,6 +654,7 @@ void SrsRtcSenderThread::update_sendonly_socket(SrsUdpMuxSocket* skt)
srs_freep(sendonly_ukt); srs_freep(sendonly_ukt);
sendonly_ukt = skt->copy_sendonly(); sendonly_ukt = skt->copy_sendonly();
sender = skt->sender();
} }
srs_error_t SrsRtcSenderThread::cycle() srs_error_t SrsRtcSenderThread::cycle()
@ -701,6 +704,8 @@ srs_error_t SrsRtcSenderThread::cycle()
// For RTC, we always try to read messages, only wait when no message. // For RTC, we always try to read messages, only wait when no message.
if (msg_count <= 0) { if (msg_count <= 0) {
srs_usleep(0);
#ifdef SRS_PERF_QUEUE_COND_WAIT #ifdef SRS_PERF_QUEUE_COND_WAIT
if (realtime) { if (realtime) {
// for realtime, min required msgs is 0, send when got one+ msgs. // for realtime, min required msgs is 0, send when got one+ msgs.
@ -718,7 +723,7 @@ srs_error_t SrsRtcSenderThread::cycle()
// Transmux and send out messages. // Transmux and send out messages.
pkts.reset(gso, merge_nalus); pkts.reset(gso, merge_nalus);
if ((err = send_messages(sendonly_ukt, source, msgs.msgs, msg_count, pkts)) != srs_success) { if ((err = send_messages(source, msgs.msgs, msg_count, pkts)) != srs_success) {
srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err); srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err);
} }
@ -759,7 +764,7 @@ srs_error_t SrsRtcSenderThread::cycle()
} }
srs_error_t SrsRtcSenderThread::send_messages( srs_error_t SrsRtcSenderThread::send_messages(
SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets
) { ) {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -775,7 +780,7 @@ srs_error_t SrsRtcSenderThread::send_messages(
#ifndef SRS_AUTO_OSX #ifndef SRS_AUTO_OSX
// If enabled GSO, send out some packets in a msghdr. // If enabled GSO, send out some packets in a msghdr.
if (packets.use_gso) { if (packets.use_gso) {
if ((err = send_packets_gso(skt, packets)) != srs_success) { if ((err = send_packets_gso(packets)) != srs_success) {
return srs_error_wrap(err, "gso send"); return srs_error_wrap(err, "gso send");
} }
return err; return err;
@ -783,7 +788,7 @@ srs_error_t SrsRtcSenderThread::send_messages(
#endif #endif
// By default, we send packets by sendmmsg. // By default, we send packets by sendmmsg.
if ((err = send_packets(skt, packets)) != srs_success) { if ((err = send_packets(packets)) != srs_success) {
return srs_error_wrap(err, "raw send"); return srs_error_wrap(err, "raw send");
} }
@ -867,12 +872,10 @@ srs_error_t SrsRtcSenderThread::messages_to_packets(
return err; return err;
} }
srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets) srs_error_t SrsRtcSenderThread::send_packets(SrsRtcPackets& packets)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
ISrsUdpSender* sender = skt->sender();
int nn_packets = packets.size(); int nn_packets = packets.size();
for (int i = 0; i < nn_packets; i++) { for (int i = 0; i < nn_packets; i++) {
SrsRtpPacket2* packet = packets.at(i); SrsRtpPacket2* packet = packets.at(i);
@ -913,8 +916,8 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets
} }
} }
sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); sockaddr_in* addr = (sockaddr_in*)sendonly_ukt->peer_addr();
socklen_t addrlen = (socklen_t)skt->peer_addrlen(); socklen_t addrlen = (socklen_t)sendonly_ukt->peer_addrlen();
mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; mhdr->msg_hdr.msg_name = (sockaddr_in*)addr;
mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen;
@ -934,7 +937,7 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets
} }
// TODO: FIXME: We can gather and pad audios, because they have similar size. // TODO: FIXME: We can gather and pad audios, because they have similar size.
srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPackets& packets) srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -943,7 +946,6 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPac
// GSO, N packets has same length, the final one may not. // GSO, N packets has same length, the final one may not.
bool using_gso = false; bool gso_final = false; bool using_gso = false; bool gso_final = false;
ISrsUdpSender* sender = skt->sender();
int nn_packets = packets.size(); int nn_packets = packets.size();
for (int i = 0; i < nn_packets; i++) { for (int i = 0; i < nn_packets; i++) {
SrsRtpPacket2* packet = packets.at(i); SrsRtpPacket2* packet = packets.at(i);
@ -1108,8 +1110,8 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPac
#endif #endif
if (do_send) { if (do_send) {
sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); sockaddr_in* addr = (sockaddr_in*)sendonly_ukt->peer_addr();
socklen_t addrlen = (socklen_t)skt->peer_addrlen(); socklen_t addrlen = (socklen_t)sendonly_ukt->peer_addrlen();
mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; mhdr->msg_hdr.msg_name = (sockaddr_in*)addr;
mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen;
@ -1945,13 +1947,17 @@ srs_error_t SrsUdpMuxSender::cycle()
nn_loop++; nn_loop++;
int pos = cache_pos; int pos = cache_pos;
int gso_pos = 0;
int gso_iovs = 0; int gso_iovs = 0;
if (pos <= 0 && gso_pos == 0) { if (pos <= 0) {
waiting_msgs = true; srs_usleep(0);
nn_wait++; if (pos <= 0) {
srs_cond_wait(cond); waiting_msgs = true;
continue; nn_wait++;
srs_cond_wait(cond);
}
if (pos <= 0) {
continue;
}
} }
// We are working on hotspot now. // We are working on hotspot now.
@ -1959,6 +1965,7 @@ srs_error_t SrsUdpMuxSender::cycle()
cache_pos = 0; cache_pos = 0;
// Collect informations for GSO. // Collect informations for GSO.
int gso_pos = 0;
if (pos > 0 && stat_enabled) { if (pos > 0 && stat_enabled) {
// For shared GSO cache, stat the messages. // For shared GSO cache, stat the messages.
mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos; mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos;

View file

@ -50,6 +50,7 @@ class SrsRtcSession;
class SrsSharedPtrMessage; class SrsSharedPtrMessage;
class SrsSource; class SrsSource;
class SrsRtpPacket2; class SrsRtpPacket2;
class ISrsUdpSender;
const uint8_t kSR = 200; const uint8_t kSR = 200;
const uint8_t kRR = 201; const uint8_t kRR = 201;
@ -182,6 +183,7 @@ private:
uint16_t video_sequence; uint16_t video_sequence;
public: public:
SrsUdpMuxSocket* sendonly_ukt; SrsUdpMuxSocket* sendonly_ukt;
ISrsUdpSender* sender;
bool merge_nalus; bool merge_nalus;
bool gso; bool gso;
int max_padding; int max_padding;
@ -204,10 +206,10 @@ public:
public: public:
virtual srs_error_t cycle(); virtual srs_error_t cycle();
private: private:
srs_error_t send_messages(SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets);
srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets);
srs_error_t send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); srs_error_t send_packets(SrsRtcPackets& packets);
srs_error_t send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); srs_error_t send_packets_gso(SrsRtcPackets& packets);
private: private:
srs_error_t packet_opus(SrsSample* sample, SrsRtcPackets& packets); srs_error_t packet_opus(SrsSample* sample, SrsRtcPackets& packets);
private: private: