From c93cd86ce4684a53b70c141e12c204d219c8c631 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 18 Apr 2020 20:37:08 +0800 Subject: [PATCH] For #307, refine performance --- trunk/conf/full.conf | 2 + trunk/src/app/srs_app_config.cpp | 8 +- trunk/src/app/srs_app_rtc_conn.cpp | 139 +++++++++------- trunk/src/app/srs_app_rtc_conn.hpp | 5 +- trunk/src/app/srs_app_source.cpp | 53 ++++-- trunk/src/app/srs_app_source.hpp | 12 +- trunk/src/core/srs_core_performance.hpp | 3 + trunk/src/kernel/srs_kernel_buffer.cpp | 5 - trunk/src/kernel/srs_kernel_buffer.hpp | 3 +- trunk/src/kernel/srs_kernel_rtp.cpp | 210 ++++++++++++++---------- trunk/src/kernel/srs_kernel_rtp.hpp | 58 +++++-- 11 files changed, 323 insertions(+), 175 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 39f02bd7f..2c1089665 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -488,6 +488,7 @@ vhost rtc.vhost.srs.com { min_latency on; play { # set the MW(merged-write) latency in ms. + # @remark For WebRTC, we enable pass-timestamp mode, so we ignore this config. # default: 0 (For WebRTC) mw_latency 0; # Set the MW(merged-write) min messages. @@ -720,6 +721,7 @@ vhost play.srs.com { # SRS always set mw on, so we just set the latency value. # the latency of stream >= mw_latency + mr_latency # the value recomment is [300, 1800] + # @remark For WebRTC, we enable pass-timestamp mode, so we ignore this config. 
# default: 350 (For RTMP/HTTP-FLV) # default: 0 (For WebRTC) mw_latency 350; diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index e9b89a346..2a8454633 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -5420,8 +5420,14 @@ srs_utime_t SrsConfig::get_mw_sleep(string vhost, bool is_rtc) if (!conf || conf->arg0().empty()) { return DEFAULT; } + + int v = ::atoi(conf->arg0().c_str()); + if (is_rtc && v > 0) { + srs_warn("For RTC, we ignore mw_latency"); + return 0; + } - return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_MILLISECONDS); + return (srs_utime_t)(v * SRS_UTIME_MILLISECONDS); } int SrsConfig::get_mw_msgs(string vhost, bool is_realtime, bool is_rtc) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 9414741ab..230ea2e49 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -460,7 +460,7 @@ srs_error_t SrsDtlsSession::unprotect_rtcp(char* out_buf, const char* in_buf, in return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtcp unprotect failed"); } -SrsRtcPackets::SrsRtcPackets() +SrsRtcPackets::SrsRtcPackets(int nn_cache_max) { #if defined(SRS_DEBUG) debug_id = 0; @@ -477,22 +477,20 @@ SrsRtcPackets::SrsRtcPackets() nn_dropped = 0; cursor = 0; + nn_cache = nn_cache_max; + cache = new SrsRtpPacket2[nn_cache]; } SrsRtcPackets::~SrsRtcPackets() { - vector::iterator it; - for (it = packets.begin(); it != packets.end(); ++it) { - SrsRtpPacket2* p = *it; - srs_freep(p); - } - packets.clear(); + srs_freepa(cache); + nn_cache = 0; } void SrsRtcPackets::reset(bool gso, bool merge_nalus) { for (int i = 0; i < cursor; i++) { - SrsRtpPacket2* packet = packets[i]; + SrsRtpPacket2* packet = cache + i; packet->reset(); } @@ -515,17 +513,16 @@ void SrsRtcPackets::reset(bool gso, bool merge_nalus) SrsRtpPacket2* SrsRtcPackets::fetch() { - if (cursor >= (int)packets.size()) { - packets.push_back(new SrsRtpPacket2()); + if 
(cursor >= nn_cache) { + return NULL; } - - return packets[cursor++]; + return cache + (cursor++); } SrsRtpPacket2* SrsRtcPackets::back() { srs_assert(cursor > 0); - return packets[cursor - 1]; + return cache + cursor - 1; } int SrsRtcPackets::size() @@ -535,13 +532,13 @@ int SrsRtcPackets::size() int SrsRtcPackets::capacity() { - return (int)packets.size(); + return nn_cache; } SrsRtpPacket2* SrsRtcPackets::at(int index) { srs_assert(index < cursor); - return packets[index]; + return cache + index; } SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid) @@ -688,6 +685,13 @@ srs_error_t SrsRtcSenderThread::cycle() return srs_error_wrap(err, "rtc create consumer, source url=%s", req->get_stream_url().c_str()); } + // For RTC, we enable pass-timestamp mode: we ignore the timestamp in queue and never depend on the duration, + // because RTC allows the audio and video to each have their own timebase, that is, the audio and video timestamps + // may not increase monotonically. + // In this mode, we use mw_msgs to set the delay. We never shrink the consumer queue; instead, we dump the + // messages and drop them if the shared sender queue is full. 
+ consumer->enable_pass_timestamp(); + realtime = _srs_config->get_realtime_enabled(req->vhost, true); mw_sleep = _srs_config->get_mw_sleep(req->vhost, true); mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true); @@ -702,8 +706,8 @@ srs_error_t SrsRtcSenderThread::cycle() srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_sleep=%dms, mw_msgs=%d", req->get_stream_url().c_str(), ::getpid(), source->source_id(), rtc_session->encrypt, realtime, srsu2msi(mw_sleep), mw_msgs); - SrsRtcPackets pkts; SrsMessageArray msgs(SRS_PERF_MW_MSGS); + SrsRtcPackets pkts(SRS_PERF_RTC_RTP_PACKETS); SrsPithyPrint* pprint = SrsPithyPrint::create_rtc_play(); SrsAutoFree(SrsPithyPrint, pprint); @@ -748,27 +752,29 @@ srs_error_t SrsRtcSenderThread::cycle() } // Stat for performance analysis. - if (stat_enabled) { - // Stat the original RAW AV frame, maybe h264+aac. - stat->perf_on_msgs(msg_count); - // Stat the RTC packets, RAW AV frame, maybe h.264+opus. - int nn_rtc_packets = srs_max(pkts.nn_audios, pkts.nn_extras) + pkts.nn_videos; - stat->perf_on_rtc_packets(nn_rtc_packets); - // Stat the RAW RTP packets, which maybe group by GSO. - stat->perf_on_rtp_packets(pkts.size()); - // Stat the RTP packets going into kernel. - stat->perf_on_gso_packets(pkts.nn_rtp_pkts); - // Stat the bytes and paddings. - stat->perf_on_rtc_bytes(pkts.nn_bytes, pkts.nn_rtp_bytes, pkts.nn_padding_bytes); - // Stat the messages and dropped count. - stat->perf_on_dropped(msg_count, nn_rtc_packets, pkts.nn_dropped); + if (!stat_enabled) { + continue; + } + + // Stat the original RAW AV frame, maybe h264+aac. + stat->perf_on_msgs(msg_count); + // Stat the RTC packets, RAW AV frame, maybe h.264+opus. + int nn_rtc_packets = srs_max(pkts.nn_audios, pkts.nn_extras) + pkts.nn_videos; + stat->perf_on_rtc_packets(nn_rtc_packets); + // Stat the RAW RTP packets, which maybe group by GSO. + stat->perf_on_rtp_packets(pkts.size()); + // Stat the RTP packets going into kernel. 
+ stat->perf_on_gso_packets(pkts.nn_rtp_pkts); + // Stat the bytes and paddings. + stat->perf_on_rtc_bytes(pkts.nn_bytes, pkts.nn_rtp_bytes, pkts.nn_padding_bytes); + // Stat the messages and dropped count. + stat->perf_on_dropped(msg_count, nn_rtc_packets, pkts.nn_dropped); #if defined(SRS_DEBUG) - srs_trace("RTC PLAY perf, msgs %d/%d, rtp %d, gso %d, %d audios, %d extras, %d videos, %d samples, %d/%d/%d bytes", - msg_count, nn_rtc_packets, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, - pkts.nn_samples, pkts.nn_bytes, pkts.nn_rtp_bytes, pkts.nn_padding_bytes); + srs_trace("RTC PLAY perf, msgs %d/%d, rtp %d, gso %d, %d audios, %d extras, %d videos, %d samples, %d/%d/%d bytes", + msg_count, nn_rtc_packets, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, + pkts.nn_samples, pkts.nn_bytes, pkts.nn_rtp_bytes, pkts.nn_padding_bytes); #endif - } pprint->elapse(); if (pprint->can_print()) { @@ -1193,6 +1199,10 @@ srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPac if (nn_bytes < kRtpMaxPayloadSize) { // Package NALUs in a single RTP packet. 
SrsRtpPacket2* packet = packets.fetch(); + if (!packet) { + srs_freep(raw); + return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); + } packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); @@ -1216,13 +1226,19 @@ srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPac int packet_size = srs_min(nb_left, fu_payload_size); SrsRtpPacket2* packet = packets.fetch(); + if (!packet) { + srs_freep(raw); + return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); + } + packet->extra_payload = raw; packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_ssrc(video_ssrc); packet->rtp_header.set_payload_type(video_payload_type); - SrsRtpFUAPayload* fua = packet->reuse_fua(); + SrsRtpFUAPayload* fua = new SrsRtpFUAPayload(); + packet->payload = fua; fua->nri = (SrsAvcNaluType)header; fua->nalu_type = (SrsAvcNaluType)nal_type; @@ -1249,6 +1265,9 @@ srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtcPackets& pa srs_error_t err = srs_success; SrsRtpPacket2* packet = packets.fetch(); + if (!packet) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); + } packet->rtp_header.set_marker(true); packet->rtp_header.set_timestamp(audio_timestamp); packet->rtp_header.set_sequence(audio_sequence++); @@ -1291,23 +1310,24 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* int packet_size = srs_min(nb_left, fu_payload_size); SrsRtpPacket2* packet = packets.fetch(); + if (!packet) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); + } packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_ssrc(video_ssrc); packet->rtp_header.set_payload_type(video_payload_type); - SrsRtpFUAPayload* fua = packet->reuse_fua(); + SrsRtpFUAPayload2* fua = packet->reuse_fua(); fua->nri = (SrsAvcNaluType)header; 
fua->nalu_type = (SrsAvcNaluType)nal_type; fua->start = bool(i == 0); fua->end = bool(i == num_of_packet - 1); - SrsSample* fragment_sample = new SrsSample(); - fragment_sample->bytes = p; - fragment_sample->size = packet_size; - fua->nalus.push_back(fragment_sample); + fua->payload = p; + fua->size = packet_size; p += packet_size; nb_left -= packet_size; @@ -1322,6 +1342,9 @@ srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* msg, Srs srs_error_t err = srs_success; SrsRtpPacket2* packet = packets.fetch(); + if (!packet) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); + } packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_ssrc(video_ssrc); @@ -1355,6 +1378,9 @@ srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMes } SrsRtpPacket2* packet = packets.fetch(); + if (!packet) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); + } packet->rtp_header.set_marker(false); packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); @@ -2017,23 +2043,22 @@ srs_error_t SrsUdpMuxSender::cycle() cache.swap(hotspot); cache_pos = 0; - // Collect informations for GSO. int gso_pos = 0; - if (pos > 0 && stat_enabled) { - // For shared GSO cache, stat the messages. - // @see https://linux.die.net/man/2/sendmmsg - // @see https://linux.die.net/man/2/sendmsg - for (int i = 0; i < pos; i++) { - mmsghdr* mhdr = &hotspot[i]; - - int real_iovs = mhdr->msg_hdr.msg_iovlen; - gso_pos++; nn_gso_msgs++; nn_gso_iovs += real_iovs; - gso_iovs += real_iovs; - } - } - - // Send out all messages. if (pos > 0) { + // Collect informations for GSO. + if (stat_enabled) { + // For shared GSO cache, stat the messages. 
+ // @see https://linux.die.net/man/2/sendmmsg + // @see https://linux.die.net/man/2/sendmsg + for (int i = 0; i < pos; i++) { + mmsghdr* mhdr = &hotspot[i]; + + int real_iovs = mhdr->msg_hdr.msg_iovlen; + gso_pos++; nn_gso_msgs++; nn_gso_iovs += real_iovs; + gso_iovs += real_iovs; + } + } + // Send out all messages. // @see https://linux.die.net/man/2/sendmmsg // @see https://linux.die.net/man/2/sendmsg @@ -2053,6 +2078,10 @@ srs_error_t SrsUdpMuxSender::cycle() } } + if (!stat_enabled) { + continue; + } + // Increase total messages. nn_msgs += pos + gso_iovs; nn_msgs_max = srs_max(pos, nn_msgs_max); diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 3ad6a1072..c3796c711 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -155,9 +155,10 @@ public: int nn_dropped; private: int cursor; - std::vector packets; + int nn_cache; + SrsRtpPacket2* cache; public: - SrsRtcPackets(); + SrsRtcPackets(int nn_cache_max); virtual ~SrsRtcPackets(); public: void reset(bool gso, bool merge_nalus); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index fcf3b071e..c0548708f 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -269,9 +269,17 @@ void SrsMessageQueue::set_queue_size(srs_utime_t queue_size) max_queue_size = queue_size; } -srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow) +srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow, bool pass_timestamp) { srs_error_t err = srs_success; + + msgs.push_back(msg); + + // For RTC, we never care about the timestamp and duration, so we never shrink queue here, + // but we will drop messages in each consumer coroutine. 
+ if (pass_timestamp) { + return err; + } if (msg->is_av()) { if (av_start_time == -1) { @@ -281,8 +289,6 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow av_end_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS); } - msgs.push_back(msg); - while (av_end_time - av_start_time > max_queue_size) { // notice the caller queue already overflow and shrinked. if (is_overflow) { @@ -295,7 +301,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow return err; } -srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count) +srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count, bool pass_timestamp) { srs_error_t err = srs_success; @@ -308,13 +314,15 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p count = srs_min(max_count, nb_msgs); SrsSharedPtrMessage** omsgs = msgs.data(); - for (int i = 0; i < count; i++) { - pmsgs[i] = omsgs[i]; + memcpy(pmsgs, omsgs, count * sizeof(SrsSharedPtrMessage*)); + + // For RTC, we enable pass_timestamp mode, which never care about the timestamp and duration, + // so we do not have to update the start time here. + if (!pass_timestamp) { + SrsSharedPtrMessage* last = omsgs[count - 1]; + av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS); } - SrsSharedPtrMessage* last = omsgs[count - 1]; - av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS); - if (count >= nb_msgs) { // the pmsgs is big enough and clear msgs at most time. 
msgs.clear(); @@ -433,6 +441,8 @@ SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c) mw_duration = 0; mw_waiting = false; #endif + + pass_timestamp = false; } SrsConsumer::~SrsConsumer() @@ -466,20 +476,35 @@ srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsR srs_error_t err = srs_success; SrsSharedPtrMessage* msg = shared_msg->copy(); - - if (!atc) { + + // For RTC, we enable pass_timestamp mode, which never correct or depends on monotonic increasing of + // timestamp. And in RTC, the audio and video timebase can be different, so we ignore time_jitter here. + if (!pass_timestamp && !atc) { if ((err = jitter->correct(msg, ag)) != srs_success) { return srs_error_wrap(err, "consume message"); } } - - if ((err = queue->enqueue(msg, NULL)) != srs_success) { + + // Put message in queue, here we may enable pass_timestamp mode. + if ((err = queue->enqueue(msg, NULL, pass_timestamp)) != srs_success) { return srs_error_wrap(err, "enqueue message"); } #ifdef SRS_PERF_QUEUE_COND_WAIT // fire the mw when msgs is enough. if (mw_waiting) { + // For RTC, we use pass_timestamp mode, we don't care about the timestamp in queue, + // so we only check the messages in queue. + if (pass_timestamp) { + if (queue->size() > mw_min_msgs) { + srs_cond_signal(mw_wait); + mw_waiting = false; + return err; + } + return err; + } + + // For RTMP, we wait for messages and duration. srs_utime_t duration = queue->duration(); bool match_min_msgs = queue->size() > mw_min_msgs; @@ -529,7 +554,7 @@ srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count) } // pump msgs from queue. 
- if ((err = queue->dump_packets(max, msgs->msgs, count)) != srs_success) { + if ((err = queue->dump_packets(max, msgs->msgs, count, pass_timestamp)) != srs_success) { return srs_error_wrap(err, "dump packets"); } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 6e64e9b21..fdfe9c789 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -151,12 +151,13 @@ public: // Enqueue the message, the timestamp always monotonically. // @param msg, the msg to enqueue, user never free it whatever the return code. // @param is_overflow, whether overflow and shrinked. NULL to ignore. - virtual srs_error_t enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL); + // @remark If pass_timestamp, we never shrink and never care about the timestamp or duration. + virtual srs_error_t enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL, bool pass_timestamp = false); // Get packets in consumer queue. // @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it. // @count the count in array, output param. // @max_count the max count to dequeue, must be positive. - virtual srs_error_t dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count); + virtual srs_error_t dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count, bool pass_timestamp = false); // Dumps packets to consumer, use specified args. // @remark the atc/tba/tbv/ag are same to SrsConsumer.enqueue(). virtual srs_error_t dump_packets(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag); @@ -203,10 +204,17 @@ private: int mw_min_msgs; srs_utime_t mw_duration; #endif +private: + // For RTC, we never use jitter to correct timestamp. + // But we should not change the atc or time_jitter for source or RTMP. + // @remark In this mode, we also never check the queue by timestamp, but only by count. 
+ bool pass_timestamp; public: SrsConsumer(SrsSource* s, SrsConnection* c); virtual ~SrsConsumer(); public: + // Use pass timestamp mode. + void enable_pass_timestamp() { pass_timestamp = true; } // Set the size of queue. virtual void set_queue_size(srs_utime_t queue_size); // when source id changed, notice client to print. diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index 4d71ccdf8..39493646a 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -211,5 +211,8 @@ // For RTC, the max iovs in msghdr, the max packets sent in a msghdr. #define SRS_PERF_RTC_GSO_MAX 64 +// For RTC, the max count of RTP packets we process in one loop. +#define SRS_PERF_RTC_RTP_PACKETS 1024 + #endif diff --git a/trunk/src/kernel/srs_kernel_buffer.cpp b/trunk/src/kernel/srs_kernel_buffer.cpp index 73c6cafe1..be94d0bcb 100644 --- a/trunk/src/kernel/srs_kernel_buffer.cpp +++ b/trunk/src/kernel/srs_kernel_buffer.cpp @@ -67,11 +67,6 @@ SrsBuffer::~SrsBuffer() { } -char* SrsBuffer::data() -{ - return bytes; -} - int SrsBuffer::size() { return nb_bytes; diff --git a/trunk/src/kernel/srs_kernel_buffer.hpp b/trunk/src/kernel/srs_kernel_buffer.hpp index 870da9ef8..532375f8d 100644 --- a/trunk/src/kernel/srs_kernel_buffer.hpp +++ b/trunk/src/kernel/srs_kernel_buffer.hpp @@ -113,7 +113,8 @@ public: * get data of stream, set by initialize. * current bytes = data() + pos() */ - virtual char* data(); + inline char* data() { return bytes; } + inline char* head() { return p; } /** * the total stream size, set by initialize. * left bytes = size() - pos(). diff --git a/trunk/src/kernel/srs_kernel_rtp.cpp b/trunk/src/kernel/srs_kernel_rtp.cpp index b9659174e..fa98c5957 100644 --- a/trunk/src/kernel/srs_kernel_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtp.cpp @@ -85,7 +85,7 @@ srs_error_t SrsRtpHeader::encode(SrsBuffer* stream) // Encode the RTP fix header, 12bytes. 
// @see https://tools.ietf.org/html/rfc1889#section-5.1 - char* op = stream->data() + stream->pos(); + char* op = stream->head(); char* p = op; // The version, padding, extension and cc, total 1 byte. @@ -148,60 +148,45 @@ size_t SrsRtpHeader::header_size() return kRtpHeaderFixedSize + cc * 4 + (extension ? (extension_length + 1) * 4 : 0); } -void SrsRtpHeader::set_marker(bool marker) -{ - this->marker = marker; -} - -void SrsRtpHeader::set_payload_type(uint8_t payload_type) -{ - this->payload_type = payload_type; -} - -void SrsRtpHeader::set_sequence(uint16_t sequence) -{ - this->sequence = sequence; -} - -void SrsRtpHeader::set_timestamp(int64_t timestamp) -{ - this->timestamp = (uint32_t)timestamp; -} - -void SrsRtpHeader::set_ssrc(uint32_t ssrc) -{ - this->ssrc = ssrc; -} - SrsRtpPacket2::SrsRtpPacket2() { payload = NULL; + extra_payload = NULL; padding = 0; cache_raw = new SrsRtpRawPayload(); - cache_fua = new SrsRtpFUAPayload(); + cache_fua = new SrsRtpFUAPayload2(); + cache_payload = 0; + using_cache = false; } SrsRtpPacket2::~SrsRtpPacket2() { // We may use the cache as payload. - if (payload == cache_raw || payload == cache_fua) { + if (using_cache) { payload = NULL; } srs_freep(payload); + srs_freep(extra_payload); srs_freep(cache_raw); } void SrsRtpPacket2::set_padding(int size) { rtp_header.set_padding(size > 0); + if (cache_payload) { + cache_payload += size - padding; + } padding = size; } void SrsRtpPacket2::add_padding(int size) { rtp_header.set_padding(padding + size > 0); + if (cache_payload) { + cache_payload += size; + } padding += size; } @@ -209,31 +194,39 @@ void SrsRtpPacket2::reset() { rtp_header.reset(); padding = 0; + cache_payload = 0; + srs_freep(extra_payload); // We may use the cache as payload. 
- if (payload == cache_raw || payload == cache_fua) { + if (using_cache) { payload = NULL; + } else { + srs_freep(payload); } - srs_freep(payload); + using_cache = false; } SrsRtpRawPayload* SrsRtpPacket2::reuse_raw() { + using_cache = true; payload = cache_raw; return cache_raw; } -SrsRtpFUAPayload* SrsRtpPacket2::reuse_fua() +SrsRtpFUAPayload2* SrsRtpPacket2::reuse_fua() { + using_cache = true; payload = cache_fua; - cache_fua->reset(); return cache_fua; } int SrsRtpPacket2::nb_bytes() { - return rtp_header.header_size() + (payload? payload->nb_bytes():0) + padding; + if (!cache_payload) { + cache_payload = rtp_header.header_size() + (payload? payload->nb_bytes():0) + padding; + } + return cache_payload; } srs_error_t SrsRtpPacket2::encode(SrsBuffer* buf) @@ -297,12 +290,20 @@ SrsRtpRawNALUs::SrsRtpRawNALUs() SrsRtpRawNALUs::~SrsRtpRawNALUs() { - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; - srs_freep(p); + if (true) { + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + srs_freep(p); + } + } + if (true) { + int nn_nalus = (int)extra_nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = extra_nalus[i]; + srs_freep(p); + } } - nalus.clear(); } void SrsRtpRawNALUs::push_back(SrsSample* sample) @@ -330,19 +331,19 @@ uint8_t SrsRtpRawNALUs::skip_first_byte() return uint8_t(nalus[0]->bytes[0]); } -srs_error_t SrsRtpRawNALUs::read_samples(vector& samples, int size) +srs_error_t SrsRtpRawNALUs::read_samples(vector& samples, int packet_size) { - if (cursor + size < 0 || cursor + size > nn_bytes) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "cursor=%d, max=%d, size=%d", cursor, nn_bytes, size); + if (cursor + packet_size < 0 || cursor + packet_size > nn_bytes) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "cursor=%d, max=%d, size=%d", cursor, nn_bytes, packet_size); } int pos = cursor; - cursor += size; - int left = size; + cursor += packet_size; + int 
left = packet_size; - vector::iterator it; - for (it = nalus.begin(); it != nalus.end() && left > 0; ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; // Ignore previous consumed samples. if (pos && pos - p->size >= 0) { @@ -355,9 +356,11 @@ srs_error_t SrsRtpRawNALUs::read_samples(vector& samples, int size) srs_assert(nn > 0); SrsSample* sample = new SrsSample(); + extra_nalus.push_back(sample); + samples.push_back(sample); + sample->bytes = p->bytes + pos; sample->size = nn; - samples.push_back(sample); left -= nn; pos = 0; @@ -370,9 +373,9 @@ int SrsRtpRawNALUs::nb_bytes() { int size = 0; - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; size += p->size; } @@ -381,9 +384,9 @@ int SrsRtpRawNALUs::nb_bytes() srs_error_t SrsRtpRawNALUs::encode(SrsBuffer* buf) { - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; if (!buf->require(p->size)) { return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", p->size); @@ -402,21 +405,20 @@ SrsRtpSTAPPayload::SrsRtpSTAPPayload() SrsRtpSTAPPayload::~SrsRtpSTAPPayload() { - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; srs_freep(p); } - nalus.clear(); } int SrsRtpSTAPPayload::nb_bytes() { int size = 1; - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; size += 2 + p->size; } @@ -436,9 +438,10 @@ srs_error_t SrsRtpSTAPPayload::encode(SrsBuffer* buf) 
buf->write_1bytes(v); // NALUs. - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + if (!buf->require(2 + p->size)) { return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 2 + p->size); } @@ -458,31 +461,20 @@ SrsRtpFUAPayload::SrsRtpFUAPayload() SrsRtpFUAPayload::~SrsRtpFUAPayload() { - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; srs_freep(p); } - nalus.clear(); -} - -void SrsRtpFUAPayload::reset() -{ - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; - srs_freep(p); - } - nalus.clear(); } int SrsRtpFUAPayload::nb_bytes() { int size = 2; - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; size += p->size; } @@ -511,9 +503,10 @@ srs_error_t SrsRtpFUAPayload::encode(SrsBuffer* buf) buf->write_1bytes(fu_header); // FU payload, @see https://tools.ietf.org/html/rfc6184#section-5.8 - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + if (!buf->require(p->size)) { return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", p->size); } @@ -524,6 +517,57 @@ srs_error_t SrsRtpFUAPayload::encode(SrsBuffer* buf) return srs_success; } +SrsRtpFUAPayload2::SrsRtpFUAPayload2() +{ + start = end = false; + nri = nalu_type = (SrsAvcNaluType)0; + + payload = NULL; + size = 0; +} + +SrsRtpFUAPayload2::~SrsRtpFUAPayload2() +{ +} + +int SrsRtpFUAPayload2::nb_bytes() +{ + return 2 + size; +} + +srs_error_t 
SrsRtpFUAPayload2::encode(SrsBuffer* buf) +{ + if (!buf->require(2 + size)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 1); + } + + // Fast encoding. + char* p = buf->head(); + + // FU indicator, @see https://tools.ietf.org/html/rfc6184#section-5.8 + uint8_t fu_indicate = kFuA; + fu_indicate |= (nri & (~kNalTypeMask)); + *p++ = fu_indicate; + + // FU header, @see https://tools.ietf.org/html/rfc6184#section-5.8 + uint8_t fu_header = nalu_type; + if (start) { + fu_header |= kStart; + } + if (end) { + fu_header |= kEnd; + } + *p++ = fu_header; + + // FU payload, @see https://tools.ietf.org/html/rfc6184#section-5.8 + memcpy(p, payload, size); + + // Consume bytes. + buf->skip(2 + size); + + return srs_success; +} + SrsRtpSharedPacket::SrsRtpSharedPacketPayload::SrsRtpSharedPacketPayload() { payload = NULL; diff --git a/trunk/src/kernel/srs_kernel_rtp.hpp b/trunk/src/kernel/srs_kernel_rtp.hpp index 973de2205..53601d126 100644 --- a/trunk/src/kernel/srs_kernel_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtp.hpp @@ -39,7 +39,7 @@ const uint8_t kNalTypeMask = 0x1F; class SrsBuffer; class SrsRtpRawPayload; -class SrsRtpFUAPayload; +class SrsRtpFUAPayload2; class SrsRtpHeader { @@ -65,17 +65,17 @@ public: public: size_t header_size(); public: - void set_marker(bool marker); + inline void set_marker(bool v) { marker = v; } bool get_marker() const { return marker; } - void set_payload_type(uint8_t payload_type); + inline void set_payload_type(uint8_t v) { payload_type = v; } uint8_t get_payload_type() const { return payload_type; } - void set_sequence(uint16_t sequence); + inline void set_sequence(uint16_t v) { sequence = v; } uint16_t get_sequence() const { return sequence; } - void set_timestamp(int64_t timestamp); + inline void set_timestamp(int64_t v) { timestamp = (uint32_t)v; } int64_t get_timestamp() const { return timestamp; } - void set_ssrc(uint32_t ssrc); + inline void set_ssrc(uint32_t v) { ssrc = v; } uint32_t get_ssrc() const { return ssrc; } 
- void set_padding(bool v) { padding = v; } + inline void set_padding(bool v) { padding = v; } }; class SrsRtpPacket2 @@ -84,9 +84,16 @@ public: SrsRtpHeader rtp_header; ISrsEncoder* payload; int padding; +public: + // User can set an extra payload, we will free it. + // For example, when reassemble NALUs by SrsRtpRawNALUs, we can set the extra payload to + // SrsRtpRawNALUs, then we can use SrsRtpFUAPayload which never free samples. + ISrsEncoder* extra_payload; private: SrsRtpRawPayload* cache_raw; - SrsRtpFUAPayload* cache_fua; + SrsRtpFUAPayload2* cache_fua; + bool using_cache; + int cache_payload; public: SrsRtpPacket2(); virtual ~SrsRtpPacket2(); @@ -100,7 +107,7 @@ public: // Reuse the cached raw message as payload. SrsRtpRawPayload* reuse_raw(); // Reuse the cached fua message as payload. - SrsRtpFUAPayload* reuse_fua(); + SrsRtpFUAPayload2* reuse_fua(); // interface ISrsEncoder public: virtual int nb_bytes(); @@ -111,7 +118,8 @@ public: class SrsRtpRawPayload : public ISrsEncoder { public: - // @remark We only refer to the memory, user must free it. + // The RAW payload, directly point to the shared memory. + // @remark We only refer to the memory, user must free its bytes. char* payload; int nn_payload; public: @@ -127,6 +135,9 @@ public: class SrsRtpRawNALUs : public ISrsEncoder { private: + // The returned samples. + std::vector extra_nalus; + // We will manage the samples, but the sample itself point to the shared memory. std::vector nalus; int nn_bytes; int cursor; @@ -137,7 +148,8 @@ public: void push_back(SrsSample* sample); public: uint8_t skip_first_byte(); - srs_error_t read_samples(std::vector& samples, int size); + // We will manage the returned samples, if user want to manage it, please copy it. + srs_error_t read_samples(std::vector& samples, int packet_size); // interface ISrsEncoder public: virtual int nb_bytes(); @@ -163,6 +175,7 @@ public: }; // FU-A, for one NALU with multiple fragments. +// With more than one payload. 
class SrsRtpFUAPayload : public ISrsEncoder { public: @@ -178,8 +191,29 @@ public: public: SrsRtpFUAPayload(); virtual ~SrsRtpFUAPayload(); +// interface ISrsEncoder public: - void reset(); + virtual int nb_bytes(); + virtual srs_error_t encode(SrsBuffer* buf); +}; + +// FU-A, for one NALU with multiple fragments. +// With only one payload. +class SrsRtpFUAPayload2 : public ISrsEncoder +{ +public: + // The NRI in NALU type. + SrsAvcNaluType nri; + // The FUA header. + bool start; + bool end; + SrsAvcNaluType nalu_type; + // The fragment payload and its size; we only refer to the memory and never free it. + char* payload; + int size; +public: + SrsRtpFUAPayload2(); + virtual ~SrsRtpFUAPayload2(); // interface ISrsEncoder public: virtual int nb_bytes();