diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index e63be65dc..e4f57800d 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -469,6 +469,8 @@ SrsRtcPackets::SrsRtcPackets() nn_audios = nn_extras = 0; nn_videos = nn_samples = 0; nn_paddings = 0; + + cursor = 0; } SrsRtcPackets::~SrsRtcPackets() @@ -481,12 +483,38 @@ void SrsRtcPackets::reset(bool gso, bool merge_nalus) use_gso = gso; should_merge_nalus = merge_nalus; - vector::iterator it; - for (it = packets.begin(); it != packets.end(); ++it ) { - SrsRtpPacket2* packet = *it; - srs_freep(packet); + for (int i = 0; i < cursor; i++) { + SrsRtpPacket2* packet = packets[i]; + packet->reset(); } - packets.clear(); + + cursor = 0; +} + +SrsRtpPacket2* SrsRtcPackets::fetch() +{ + if (cursor >= (int)packets.size()) { + packets.push_back(new SrsRtpPacket2()); + } + + return packets[cursor++]; +} + +SrsRtpPacket2* SrsRtcPackets::back() +{ + srs_assert(cursor > 0); + return packets[cursor - 1]; +} + +int SrsRtcPackets::size() +{ + return cursor; +} + +SrsRtpPacket2* SrsRtcPackets::at(int index) +{ + srs_assert(index < cursor); + return packets[index]; } SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid) @@ -684,12 +712,12 @@ srs_error_t SrsRtcSenderThread::cycle() int nn_rtc_packets = srs_max(pkts.nn_audios, pkts.nn_extras) + pkts.nn_videos; stat->perf_on_rtc_packets(nn_rtc_packets); // Stat the RAW RTP packets, which maybe group by GSO. - stat->perf_on_rtp_packets(pkts.packets.size()); + stat->perf_on_rtp_packets(pkts.size()); // Stat the RTP packets going into kernel. stat->perf_on_gso_packets(pkts.nn_rtp_pkts); #if defined(SRS_DEBUG) srs_trace("RTC PLAY done, msgs %d/%d, rtp %d, gso %d, %d audios, %d extras, %d videos, %d samples, %d bytes", - msg_count, nn_rtc_packets, pkts.packets.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, + msg_count, nn_rtc_packets, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, pkts.nn_samples, pkts.nn_bytes); #endif @@ -697,7 +725,7 @@ srs_error_t SrsRtcSenderThread::cycle() if (pprint->can_print()) { // TODO: FIXME: Print stat like frame/s, packet/s, loss_packets. srs_trace("-> RTC PLAY %d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d bytes, %d pad", - msg_count, pkts.packets.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, + msg_count, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, pkts.nn_samples, pkts.nn_bytes, pkts.nn_paddings); } } @@ -753,16 +781,14 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( packets.nn_samples += nn_samples; // For audio, we transcoded AAC to opus in extra payloads. - SrsRtpPacket2* packet = NULL; if (msg->is_audio()) { packets.nn_audios++; for (int i = 0; i < nn_extra_payloads; i++) { SrsSample* sample = msg->extra_payloads() + i; - if ((err = packet_opus(sample, &packet)) != srs_success) { + if ((err = packet_opus(sample, packets)) != srs_success) { return srs_error_wrap(err, "opus package"); } - packets.packets.push_back(packet); } continue; } @@ -772,10 +798,9 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( // Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A. if (msg->has_idr()) { - if ((err = packet_stap_a(source, msg, &packet)) != srs_success) { + if ((err = packet_stap_a(source, msg, packets)) != srs_success) { return srs_error_wrap(err, "packet stap-a"); } - packets.packets.push_back(packet); } // If merge Nalus, we pcakges all NALUs(samples) as one NALU, in a RTP or FUA packet. @@ -797,10 +822,9 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( } if (sample->size <= kRtpMaxPayloadSize) { - if ((err = packet_single_nalu(msg, sample, &packet)) != srs_success) { + if ((err = packet_single_nalu(msg, sample, packets)) != srs_success) { return srs_error_wrap(err, "packet single nalu"); } - packets.packets.push_back(packet); } else { if ((err = packet_fu_a(msg, sample, kRtpMaxPayloadSize, packets)) != srs_success) { return srs_error_wrap(err, "packet fu-a"); @@ -808,7 +832,7 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( } if (i == nn_samples - 1) { - packets.packets.back()->rtp_header.set_marker(true); + packets.back()->rtp_header.set_marker(true); } } } @@ -822,9 +846,9 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets ISrsUdpSender* sender = skt->sender(); - vector::iterator it; - for (it = packets.packets.begin(); it != packets.packets.end(); ++it) { - SrsRtpPacket2* packet = *it; + int nn_packets = packets.size(); + for (int i = 0; i < nn_packets; i++) { + SrsRtpPacket2* packet = packets.at(i); // Fetch a cached message from queue. // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. @@ -832,6 +856,17 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets if ((err = sender->fetch(&mhdr)) != srs_success) { return srs_error_wrap(err, "fetch msghdr"); } + + // Reset the iovec, we should never change the msg_iovlen. + for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) { + iovec* p = mhdr->msg_hdr.msg_iov + j; + + if (!p->iov_len) { + break; + } + p->iov_len = 0; + } + char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base; int length = kRtpPacketSize; @@ -882,15 +917,15 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPac bool use_gso = false; bool gso_final = false; ISrsUdpSender* sender = skt->sender(); - int nn_packets = (int)packets.packets.size(); + int nn_packets = packets.size(); for (int i = 0; i < nn_packets; i++) { - SrsRtpPacket2* packet = packets.packets[i]; + SrsRtpPacket2* packet = packets.at(i); int nn_packet = packet->nb_bytes(); SrsRtpPacket2* next_packet = NULL; int nn_next_packet = 0; if (i < nn_packets - 1) { - next_packet = (i < nn_packets - 1)? packets.packets[i + 1]:NULL; + next_packet = (i < nn_packets - 1)? packets.at(i + 1):NULL; nn_next_packet = next_packet? next_packet->nb_bytes() : 0; } @@ -1075,7 +1110,7 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPac } #if defined(SRS_DEBUG) - srs_trace("RTC PLAY summary, rtp %d/%d, videos %d/%d, audios %d/%d, pad %d", packets.packets.size(), packets.nn_rtp_pkts, + srs_trace("RTC PLAY summary, rtp %d/%d, videos %d/%d, audios %d/%d, pad %d", packets.size(), packets.nn_rtp_pkts, packets.nn_videos, packets.nn_samples, packets.nn_audios, packets.nn_extras, packets.nn_paddings); #endif @@ -1109,13 +1144,12 @@ srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPac if (nn_bytes < kRtpMaxPayloadSize) { // Package NALUs in a single RTP packet. - SrsRtpPacket2* packet = new SrsRtpPacket2(); + SrsRtpPacket2* packet = packets.fetch(); packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_ssrc(video_ssrc); packet->rtp_header.set_payload_type(video_payload_type); packet->payload = raw; - packets.packets.push_back(packet); } else { SrsAutoFree(SrsRtpRawNALUs, raw); @@ -1131,15 +1165,19 @@ srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPac for (int i = 0; i < num_of_packet; ++i) { int packet_size = srs_min(nb_left, fu_payload_size); - SrsRtpPacket2* packet = new SrsRtpPacket2(); - packets.packets.push_back(packet); + SrsRtpFUAPayload* fua = new SrsRtpFUAPayload(); + if ((err = raw->read_samples(fua->nalus, packet_size)) != srs_success) { + srs_freep(fua); + return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes); + } + + SrsRtpPacket2* packet = packets.fetch(); packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_ssrc(video_ssrc); packet->rtp_header.set_payload_type(video_payload_type); - SrsRtpFUAPayload* fua = new SrsRtpFUAPayload(); packet->payload = fua; fua->nri = (SrsAvcNaluType)header; @@ -1147,26 +1185,22 @@ srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPac fua->start = bool(i == 0); fua->end = bool(i == num_of_packet - 1); - if ((err = raw->read_samples(fua->nalus, packet_size)) != srs_success) { - return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes); - } - nb_left -= packet_size; } } - if (!packets.packets.empty()) { - packets.packets.back()->rtp_header.set_marker(true); + if (packets.size() > 0) { + packets.back()->rtp_header.set_marker(true); } return err; } -srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket) +srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtcPackets& packets) { srs_error_t err = srs_success; - SrsRtpPacket2* packet = new SrsRtpPacket2(); + SrsRtpPacket2* packet = packets.fetch(); packet->rtp_header.set_marker(true); packet->rtp_header.set_timestamp(audio_timestamp); packet->rtp_header.set_sequence(audio_sequence++); @@ -1181,8 +1215,6 @@ srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtpPacket2** p // TODO: FIXME: Why 960? Need Refactoring? audio_timestamp += 960; - *ppacket = packet; - return err; } @@ -1199,8 +1231,7 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* for (int i = 0; i < num_of_packet; ++i) { int packet_size = srs_min(nb_left, fu_payload_size); - SrsRtpPacket2* packet = new SrsRtpPacket2(); - packets.packets.push_back(packet); + SrsRtpPacket2* packet = packets.fetch(); packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); @@ -1228,11 +1259,11 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* } // Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6 -srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket) +srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcPackets& packets) { srs_error_t err = srs_success; - SrsRtpPacket2* packet = new SrsRtpPacket2(); + SrsRtpPacket2* packet = packets.fetch(); packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_ssrc(video_ssrc); @@ -1246,12 +1277,10 @@ srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* msg, Srs p->size = sample->size; raw->push_back(p); - *ppacket = packet; - return err; } -srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket) +srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets) { srs_error_t err = srs_success; @@ -1271,7 +1300,7 @@ srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMes return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty"); } - SrsRtpPacket2* packet = new SrsRtpPacket2(); + SrsRtpPacket2* packet = packets.fetch(); packet->rtp_header.set_marker(false); packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); @@ -1298,8 +1327,6 @@ srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMes stap->nalus.push_back(sample); } - *ppacket = packet; - return err; } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index c2edcd625..7dbf8eefb 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -140,13 +140,18 @@ public: int nn_videos; // The number of padded packet. int nn_paddings; -public: +private: + int cursor; std::vector packets; public: SrsRtcPackets(); virtual ~SrsRtcPackets(); public: void reset(bool gso, bool merge_nalus); + SrsRtpPacket2* fetch(); + SrsRtpPacket2* back(); + int size(); + SrsRtpPacket2* at(int index); }; class SrsRtcSenderThread : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler @@ -195,12 +200,12 @@ private: srs_error_t send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); srs_error_t send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); private: - srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket); + srs_error_t packet_opus(SrsSample* sample, SrsRtcPackets& packets); private: srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets); srs_error_t packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets); - srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket); - srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket); + srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcPackets& packets); + srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets); }; class SrsRtcSession diff --git a/trunk/src/kernel/srs_kernel_rtp.cpp b/trunk/src/kernel/srs_kernel_rtp.cpp index 83dcf6071..ef33c057a 100644 --- a/trunk/src/kernel/srs_kernel_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtp.cpp @@ -171,6 +171,13 @@ void SrsRtpPacket2::set_padding(int size) padding = size; } +void SrsRtpPacket2::reset() +{ + memset((void*)&rtp_header, 0, sizeof(SrsRtpHeader)); + padding = 0; + srs_freep(payload); +} + int SrsRtpPacket2::nb_bytes() { return rtp_header.header_size() + (payload? payload->nb_bytes():0) + padding; diff --git a/trunk/src/kernel/srs_kernel_rtp.hpp b/trunk/src/kernel/srs_kernel_rtp.hpp index 7733b431f..1b4fd2de6 100644 --- a/trunk/src/kernel/srs_kernel_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtp.hpp @@ -88,7 +88,9 @@ public: virtual ~SrsRtpPacket2(); public: // Append size of bytes as padding. - virtual void set_padding(int size); + void set_padding(int size); + // Reset RTP packet. + void reset(); // interface ISrsEncoder public: virtual int nb_bytes();