diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 64b920f5b..caadce385 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -437,8 +437,8 @@ rtc_server { reuseport 4; # Whether merge multiple NALUs into one. # @see https://github.com/ossrs/srs/issues/307#issuecomment-612806318 - # default: off - merge_nalus off; + # default: on + merge_nalus on; # Whether enable GSO to send out RTP packets. # default: off gso off; diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index bc6c5c8bb..f60a1782e 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4774,7 +4774,7 @@ int SrsConfig::get_rtc_server_reuseport() bool SrsConfig::get_rtc_server_merge_nalus() { - static int DEFAULT = false; + static int DEFAULT = true; SrsConfDirective* conf = root->get("rtc_server"); if (!conf) { @@ -4786,7 +4786,7 @@ bool SrsConfig::get_rtc_server_merge_nalus() return DEFAULT; } - return SRS_CONF_PERFER_FALSE(conf->arg0()); + return SRS_CONF_PERFER_TRUE(conf->arg0()); } bool SrsConfig::get_rtc_server_gso() diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 2f4ef86c8..dfc3faf8e 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -727,7 +727,7 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( } // By default, we package each NALU(sample) to a RTP or FUA packet. - for (int i = 0; i < msg->nn_samples(); i++) { + for (int i = 0; i < nn_samples; i++) { SrsSample* sample = msg->samples() + i; // We always ignore bframe here, if config to discard bframe, @@ -810,6 +810,81 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets) { srs_error_t err = srs_success; + + SrsRtpRawNALUs* raw = new SrsRtpRawNALUs(); + + for (int i = 0; i < msg->nn_samples(); i++) { + SrsSample* sample = msg->samples() + i; + + // We always ignore bframe here, if config to discard bframe, + // the bframe flag will not be set. + if (sample->bframe) { + continue; + } + + raw->push_back(sample->copy()); + } + + // Ignore empty. + int nn_bytes = raw->nb_bytes(); + if (nn_bytes <= 0) { + srs_freep(raw); + return err; + } + + const int kRtpMaxPayloadSize = 1200; + if (nn_bytes < kRtpMaxPayloadSize) { + // Package NALUs in a single RTP packet. + SrsRtpPacket2* packet = new SrsRtpPacket2(); + packet->rtp_header.set_timestamp(msg->timestamp * 90); + packet->rtp_header.set_sequence(video_sequence++); + packet->rtp_header.set_ssrc(video_ssrc); + packet->rtp_header.set_payload_type(video_payload_type); + packet->payload = raw; + packets.packets.push_back(packet); + } else { + SrsAutoFree(SrsRtpRawNALUs, raw); + + // Package NALUs in FU-A RTP packets. + int fu_payload_size = kRtpMaxPayloadSize; + + // The first byte is store in FU-A header. + uint8_t header = raw->skip_first_byte(); + uint8_t nal_type = header & kNalTypeMask; + int nb_left = nn_bytes - 1; + + int num_of_packet = 1 + (nn_bytes - 1) / fu_payload_size; + for (int i = 0; i < num_of_packet; ++i) { + int packet_size = srs_min(nb_left, fu_payload_size); + + SrsRtpPacket2* packet = new SrsRtpPacket2(); + packets.packets.push_back(packet); + + packet->rtp_header.set_timestamp(msg->timestamp * 90); + packet->rtp_header.set_sequence(video_sequence++); + packet->rtp_header.set_ssrc(video_ssrc); + packet->rtp_header.set_payload_type(video_payload_type); + + SrsRtpFUAPayload* fua = new SrsRtpFUAPayload(); + packet->payload = fua; + + fua->nri = (SrsAvcNaluType)header; + fua->nalu_type = (SrsAvcNaluType)nal_type; + fua->start = bool(i == 0); + fua->end = bool(i == num_of_packet - 1); + + if ((err = raw->read_samples(fua->nalus, packet_size)) != srs_success) { + return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes); + } + + nb_left -= packet_size; + } + } + + if (!packets.packets.empty()) { + packets.packets.back()->rtp_header.set_marker(true); + } + return err; } @@ -866,10 +941,10 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* fua->start = bool(i == 0); fua->end = bool(i == num_of_packet - 1); - SrsSample* sample = new SrsSample(); - sample->bytes = p; - sample->size = packet_size; - fua->nalus.push_back(sample); + SrsSample* fragment_sample = new SrsSample(); + fragment_sample->bytes = p; + fragment_sample->size = packet_size; + fua->nalus.push_back(fragment_sample); p += packet_size; nb_left -= packet_size; @@ -889,11 +964,13 @@ srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* msg, Srs packet->rtp_header.set_ssrc(video_ssrc); packet->rtp_header.set_payload_type(video_payload_type); - SrsRtpRawPayload* raw = new SrsRtpRawPayload(); + SrsRtpRawNALUs* raw = new SrsRtpRawNALUs(); packet->payload = raw; - raw->payload = sample->bytes; - raw->nn_payload = sample->size; + SrsSample* p = new SrsSample(); + p->bytes = sample->bytes; + p->size = sample->size; + raw->push_back(p); *ppacket = packet; diff --git a/trunk/src/kernel/srs_kernel_codec.cpp b/trunk/src/kernel/srs_kernel_codec.cpp index a351dcc52..e0789c236 100644 --- a/trunk/src/kernel/srs_kernel_codec.cpp +++ b/trunk/src/kernel/srs_kernel_codec.cpp @@ -409,6 +409,14 @@ srs_error_t SrsSample::parse_bframe() return err; } +SrsSample* SrsSample::copy() +{ + SrsSample* p = new SrsSample(); + p->bytes = bytes; + p->size = size; + return p; +} + SrsCodecConfig::SrsCodecConfig() { } diff --git a/trunk/src/kernel/srs_kernel_codec.hpp b/trunk/src/kernel/srs_kernel_codec.hpp index 49447b5de..9c74e1387 100644 --- a/trunk/src/kernel/srs_kernel_codec.hpp +++ b/trunk/src/kernel/srs_kernel_codec.hpp @@ -542,6 +542,8 @@ public: public: // If we need to know whether sample is bframe, we have to parse the NALU payload. srs_error_t parse_bframe(); + // Copy sample, share the bytes pointer. + SrsSample* copy(); }; /** diff --git a/trunk/src/kernel/srs_kernel_rtp.cpp b/trunk/src/kernel/srs_kernel_rtp.cpp index d00140005..ae044d4a2 100644 --- a/trunk/src/kernel/srs_kernel_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtp.cpp @@ -229,6 +229,112 @@ srs_error_t SrsRtpRawPayload::encode(SrsBuffer* buf) return srs_success; } +SrsRtpRawNALUs::SrsRtpRawNALUs() +{ + cursor = 0; + nn_bytes = 0; +} + +SrsRtpRawNALUs::~SrsRtpRawNALUs() +{ + vector::iterator it; + for (it = nalus.begin(); it != nalus.end(); ++it) { + SrsSample* p = *it; + srs_freep(p); + } + nalus.clear(); +} + +void SrsRtpRawNALUs::push_back(SrsSample* sample) +{ + if (sample->size <= 0) { + return; + } + + if (!nalus.empty()) { + SrsSample* p = new SrsSample(); + p->bytes = (char*)"\0\0\1"; + p->size = 3; + nn_bytes += 3; + nalus.push_back(p); + } + + nn_bytes += sample->size; + nalus.push_back(sample); +} + +uint8_t SrsRtpRawNALUs::skip_first_byte() +{ + srs_assert (cursor >= 0 && nn_bytes > 0 && cursor < nn_bytes); + cursor++; + return uint8_t(nalus[0]->bytes[0]); +} + +srs_error_t SrsRtpRawNALUs::read_samples(vector& samples, int size) +{ + if (cursor + size < 0 || cursor + size > nn_bytes) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "cursor=%d, max=%d, size=%d", cursor, nn_bytes, size); + } + + int pos = cursor; + cursor += size; + int left = size; + + vector::iterator it; + for (it = nalus.begin(); it != nalus.end() && left > 0; ++it) { + SrsSample* p = *it; + + // Ignore previous consumed samples. + if (pos && pos - p->size >= 0) { + pos -= p->size; + continue; + } + + // Now, we are working at the sample. + int nn = srs_min(left, p->size - pos); + srs_assert(nn > 0); + + SrsSample* sample = new SrsSample(); + sample->bytes = p->bytes + pos; + sample->size = nn; + samples.push_back(sample); + + left -= nn; + pos = 0; + } + + return srs_success; +} + +int SrsRtpRawNALUs::nb_bytes() +{ + int size = 0; + + vector::iterator it; + for (it = nalus.begin(); it != nalus.end(); ++it) { + SrsSample* p = *it; + size += p->size; + } + + return size; +} + +srs_error_t SrsRtpRawNALUs::encode(SrsBuffer* buf) +{ + vector::iterator it; + for (it = nalus.begin(); it != nalus.end(); ++it) { + SrsSample* p = *it; + + if (!buf->require(p->size)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", p->size); + } + + buf->write_bytes(p->bytes, p->size); + } + + return srs_success; +} + SrsRtpSTAPPayload::SrsRtpSTAPPayload() { nri = (SrsAvcNaluType)0; @@ -339,7 +445,7 @@ srs_error_t SrsRtpFUAPayload::encode(SrsBuffer* buf) for (it = nalus.begin(); it != nalus.end(); ++it) { SrsSample* p = *it; if (!buf->require(p->size)) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 2 + p->size); + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", p->size); } buf->write_bytes(p->bytes, p->size); diff --git a/trunk/src/kernel/srs_kernel_rtp.hpp b/trunk/src/kernel/srs_kernel_rtp.hpp index 975b2b463..7733b431f 100644 --- a/trunk/src/kernel/srs_kernel_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtp.hpp @@ -95,6 +95,7 @@ public: virtual srs_error_t encode(SrsBuffer* buf); }; +// Single payload data. class SrsRtpRawPayload : public ISrsEncoder { public: @@ -110,6 +111,28 @@ public: virtual srs_error_t encode(SrsBuffer* buf); }; +// Multiple NALUs, automatically insert 001 between NALUs. +class SrsRtpRawNALUs : public ISrsEncoder +{ +private: + std::vector nalus; + int nn_bytes; + int cursor; +public: + SrsRtpRawNALUs(); + virtual ~SrsRtpRawNALUs(); +public: + void push_back(SrsSample* sample); +public: + uint8_t skip_first_byte(); + srs_error_t read_samples(std::vector& samples, int size); +// interface ISrsEncoder +public: + virtual int nb_bytes(); + virtual srs_error_t encode(SrsBuffer* buf); +}; + +// STAP-A, for multiple NALUs. class SrsRtpSTAPPayload : public ISrsEncoder { public: @@ -127,6 +150,7 @@ public: virtual srs_error_t encode(SrsBuffer* buf); }; +// FU-A, for one NALU with multiple fragments. class SrsRtpFUAPayload : public ISrsEncoder { public: