diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index c3232a476..d84ead035 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -548,6 +548,8 @@ SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int rtc_session = s; sendonly_ukt = u->copy_sendonly(); + sender = u->sender(); + gso = false; merge_nalus = false; max_padding = 0; @@ -652,6 +654,7 @@ void SrsRtcSenderThread::update_sendonly_socket(SrsUdpMuxSocket* skt) srs_freep(sendonly_ukt); sendonly_ukt = skt->copy_sendonly(); + sender = skt->sender(); } srs_error_t SrsRtcSenderThread::cycle() @@ -701,6 +704,8 @@ srs_error_t SrsRtcSenderThread::cycle() // For RTC, we always try to read messages, only wait when no message. if (msg_count <= 0) { + srs_usleep(0); + #ifdef SRS_PERF_QUEUE_COND_WAIT if (realtime) { // for realtime, min required msgs is 0, send when got one+ msgs. @@ -718,7 +723,7 @@ srs_error_t SrsRtcSenderThread::cycle() // Transmux and send out messages. pkts.reset(gso, merge_nalus); - if ((err = send_messages(sendonly_ukt, source, msgs.msgs, msg_count, pkts)) != srs_success) { + if ((err = send_messages(source, msgs.msgs, msg_count, pkts)) != srs_success) { srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err); } @@ -759,7 +764,7 @@ srs_error_t SrsRtcSenderThread::cycle() } srs_error_t SrsRtcSenderThread::send_messages( - SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets + SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets ) { srs_error_t err = srs_success; @@ -775,7 +780,7 @@ srs_error_t SrsRtcSenderThread::send_messages( #ifndef SRS_AUTO_OSX // If enabled GSO, send out some packets in a msghdr. if (packets.use_gso) { - if ((err = send_packets_gso(skt, packets)) != srs_success) { + if ((err = send_packets_gso(packets)) != srs_success) { return srs_error_wrap(err, "gso send"); } return err; @@ -783,7 +788,7 @@ srs_error_t SrsRtcSenderThread::send_messages( #endif // By default, we send packets by sendmmsg. - if ((err = send_packets(skt, packets)) != srs_success) { + if ((err = send_packets(packets)) != srs_success) { return srs_error_wrap(err, "raw send"); } @@ -867,12 +872,10 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( return err; } -srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets) +srs_error_t SrsRtcSenderThread::send_packets(SrsRtcPackets& packets) { srs_error_t err = srs_success; - ISrsUdpSender* sender = skt->sender(); - int nn_packets = packets.size(); for (int i = 0; i < nn_packets; i++) { SrsRtpPacket2* packet = packets.at(i); @@ -913,8 +916,8 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets } } - sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); - socklen_t addrlen = (socklen_t)skt->peer_addrlen(); + sockaddr_in* addr = (sockaddr_in*)sendonly_ukt->peer_addr(); + socklen_t addrlen = (socklen_t)sendonly_ukt->peer_addrlen(); mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; @@ -934,7 +937,7 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets } // TODO: FIXME: We can gather and pad audios, because they have similar size. -srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPackets& packets) +srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets) { srs_error_t err = srs_success; @@ -943,7 +946,6 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPac // GSO, N packets has same length, the final one may not. bool using_gso = false; bool gso_final = false; - ISrsUdpSender* sender = skt->sender(); int nn_packets = packets.size(); for (int i = 0; i < nn_packets; i++) { SrsRtpPacket2* packet = packets.at(i); @@ -1108,8 +1110,8 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPac #endif if (do_send) { - sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); - socklen_t addrlen = (socklen_t)skt->peer_addrlen(); + sockaddr_in* addr = (sockaddr_in*)sendonly_ukt->peer_addr(); + socklen_t addrlen = (socklen_t)sendonly_ukt->peer_addrlen(); mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; @@ -1945,13 +1947,17 @@ srs_error_t SrsUdpMuxSender::cycle() nn_loop++; int pos = cache_pos; - int gso_pos = 0; int gso_iovs = 0; - if (pos <= 0 && gso_pos == 0) { - waiting_msgs = true; - nn_wait++; - srs_cond_wait(cond); - continue; + if (pos <= 0) { + srs_usleep(0); + if (pos <= 0) { + waiting_msgs = true; + nn_wait++; + srs_cond_wait(cond); + } + if (pos <= 0) { + continue; + } } // We are working on hotspot now. @@ -1959,6 +1965,7 @@ srs_error_t SrsUdpMuxSender::cycle() cache_pos = 0; // Collect informations for GSO. + int gso_pos = 0; if (pos > 0 && stat_enabled) { // For shared GSO cache, stat the messages. mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 0af9d0b5e..a1fe62514 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -50,6 +50,7 @@ class SrsRtcSession; class SrsSharedPtrMessage; class SrsSource; class SrsRtpPacket2; +class ISrsUdpSender; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -182,6 +183,7 @@ private: uint16_t video_sequence; public: SrsUdpMuxSocket* sendonly_ukt; + ISrsUdpSender* sender; bool merge_nalus; bool gso; int max_padding; @@ -204,10 +206,10 @@ public: public: virtual srs_error_t cycle(); private: - srs_error_t send_messages(SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); + srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); - srs_error_t send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); - srs_error_t send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); + srs_error_t send_packets(SrsRtcPackets& packets); + srs_error_t send_packets_gso(SrsRtcPackets& packets); private: srs_error_t packet_opus(SrsSample* sample, SrsRtcPackets& packets); private: