diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 1d975ea74..922f81c38 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -581,7 +581,7 @@ srs_error_t SrsRtcSenderThread::cycle() } int nn_rtp_pkts = 0; - if ((err = send_messages(source, msgs.msgs, msg_count, sendonly_ukt, &nn_rtp_pkts)) != srs_success) { + if ((err = send_messages(sendonly_ukt, source, msgs.msgs, msg_count, &nn_rtp_pkts)) != srs_success) { srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err); } @@ -601,8 +601,7 @@ srs_error_t SrsRtcSenderThread::cycle() } srs_error_t SrsRtcSenderThread::send_messages( - SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, - SrsUdpMuxSocket* skt, int* pnn_rtp_pkts + SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, int* pnn_rtp_pkts ) { srs_error_t err = srs_success; @@ -620,13 +619,12 @@ srs_error_t SrsRtcSenderThread::send_messages( return err; } + // Send out RTP packets *pnn_rtp_pkts += (int)packets.size(); + err = send_packets(skt, packets); for (int j = 0; j < (int)packets.size(); j++) { SrsRtpPacket2* packet = packets[j]; - if ((err = send_packet(packet, skt)) != srs_success) { - srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err); - } srs_freep(packet); } @@ -695,48 +693,52 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( return err; } -srs_error_t SrsRtcSenderThread::send_packet(SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt) +srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, vector& packets) { srs_error_t err = srs_success; - ISrsUdpSender* sender = skt->sender(); + for (vector::iterator it = packets.begin(); it != packets.end(); ++it) { + SrsRtpPacket2* packet = *it; + ISrsUdpSender* sender = skt->sender(); - // Fetch a cached message from queue. - // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. - mmsghdr* mhdr = NULL; - if ((err = sender->fetch(&mhdr)) != srs_success) { - return srs_error_wrap(err, "fetch msghdr"); - } - char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base; - int length = kRtpPacketSize; - - // Marshal packet to bytes. - if (true) { - SrsBuffer stream(buf, length); - if ((err = pkt->encode(&stream)) != srs_success) { - return srs_error_wrap(err, "encode packet"); + // Fetch a cached message from queue. + // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. + mmsghdr* mhdr = NULL; + if ((err = sender->fetch(&mhdr)) != srs_success) { + return srs_error_wrap(err, "fetch msghdr"); } - length = stream.pos(); - } + char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base; + int length = kRtpPacketSize; - // Whether encrypt the RTP bytes. - if (rtc_session->encrypt) { - if ((err = rtc_session->dtls_session->protect_rtp2(buf, &length)) != srs_success) { - return srs_error_wrap(err, "srtp protect"); + // Marshal packet to bytes. + if (true) { + SrsBuffer stream(buf, length); + if ((err = packet->encode(&stream)) != srs_success) { + return srs_error_wrap(err, "encode packet"); + } + length = stream.pos(); + } + + // Whether encrypt the RTP bytes. + if (rtc_session->encrypt) { + if ((err = rtc_session->dtls_session->protect_rtp2(buf, &length)) != srs_success) { + return srs_error_wrap(err, "srtp protect"); + } + } + + sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); + socklen_t addrlen = (socklen_t)skt->peer_addrlen(); + + mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; + mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; + mhdr->msg_hdr.msg_iov->iov_len = length; + mhdr->msg_len = 0; + + if ((err = sender->sendmmsg(mhdr)) != srs_success) { + return srs_error_wrap(err, "send msghdr"); } } - sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); - socklen_t addrlen = (socklen_t)skt->peer_addrlen(); - - mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; - mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; - mhdr->msg_hdr.msg_iov->iov_len = length; - mhdr->msg_len = 0; - - if ((err = sender->sendmmsg(mhdr)) != srs_success) { - return srs_error_wrap(err, "send msghdr"); - } return err; } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index a0c370942..180872144 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -152,9 +152,9 @@ public: public: virtual srs_error_t cycle(); private: - srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* skt, int* pnn_rtp_pkts); + srs_error_t send_messages(SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, int* pnn_rtp_pkts); srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, std::vector& packets); - srs_error_t send_packet(SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt); + srs_error_t send_packets(SrsUdpMuxSocket* skt, std::vector& packets); private: srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket); private: