diff --git a/trunk/src/app/srs_app_rtc.hpp b/trunk/src/app/srs_app_rtc.hpp index c695718ec..9bd794b62 100644 --- a/trunk/src/app/srs_app_rtc.hpp +++ b/trunk/src/app/srs_app_rtc.hpp @@ -48,13 +48,6 @@ const int kRtpPacketSize = 1500; const uint8_t kOpusPayloadType = 111; const uint8_t kH264PayloadType = 102; -// @see: https://tools.ietf.org/html/rfc6184#section-5.2 -const uint8_t kFuA = 28; - -// @see: https://tools.ietf.org/html/rfc6184#section-5.8 -const uint8_t kStart = 0x80; // Fu-header start bit -const uint8_t kEnd = 0x40; // Fu-header end bit - const int kChannel = 2; const int kSamplerate = 48000; diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 0e3e02aec..14f105ca8 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -641,8 +641,6 @@ srs_error_t SrsRtcSenderThread::send_messages( *pnn_rtp_pkts += 1; } } else { - vector rtp_packets; - for (int i = 0; i < msg->nn_samples(); i++) { SrsSample* sample = msg->samples() + i; @@ -668,90 +666,51 @@ srs_error_t SrsRtcSenderThread::send_messages( *pnn_rtp_pkts += 1; } + vector packets; if (sample->size <= kRtpMaxPayloadSize) { - if ((err = packet_single_nalu(msg, sample, rtp_packets)) != srs_success) { + SrsRtpPacket2* packet = NULL; + if ((err = packet_single_nalu(msg, sample, &packet)) != srs_success) { return srs_error_wrap(err, "packet single nalu"); } + packets.push_back(packet); } else { - if ((err = packet_fu_a(msg, sample, rtp_packets)) != srs_success) { + if ((err = packet_fu_a(msg, sample, packets)) != srs_success) { return srs_error_wrap(err, "packet fu-a"); } } - } - if (!rtp_packets.empty()) { - // At the end of the frame, set marker bit. - // One frame may have multi nals. Set the marker bit in the last nal end, no the end of the nal. - if ((err = rtp_packets.back()->modify_rtp_header_marker(true)) != srs_success) { - return srs_error_wrap(err, "set marker"); + if (i == msg->nn_samples() - 1) { + packets.back()->rtp_header.set_marker(true); } - } - int nn_rtp_pkts = (int)rtp_packets.size(); - for (int j = 0; j < nn_rtp_pkts; j++) { - SrsRtpSharedPacket* pkt = rtp_packets[j]; - if ((err = send_message(msg, is_video, is_audio, pkt, skt)) != srs_success) { - return srs_error_wrap(err, "send message"); + for (int j = 0; j < (int)packets.size(); j++) { + SrsRtpPacket2* packet = packets[j]; + err = send_message2(msg, is_video, is_audio, packet, skt); + srs_freep(packet); + if (err != srs_success) { + return srs_error_wrap(err, "send message"); + } } - srs_freep(pkt); + *pnn_rtp_pkts += (int)packets.size(); } - - *pnn_rtp_pkts += nn_rtp_pkts; } } return err; } -srs_error_t SrsRtcSenderThread::send_message(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpSharedPacket* pkt, SrsUdpMuxSocket* skt) -{ - srs_error_t err = srs_success; - - if (is_video) { - pkt->modify_rtp_header_payload_type(video_payload_type); - pkt->modify_rtp_header_ssrc(video_ssrc); - srs_verbose("send video, ssrc=%u, seq=%u, timestamp=%u", video_ssrc, pkt->rtp_header.get_sequence(), pkt->rtp_header.get_timestamp()); - } else if (is_audio) { - pkt->modify_rtp_header_payload_type(audio_payload_type); - pkt->modify_rtp_header_ssrc(audio_ssrc); - } - - int length = pkt->size; - // Fetch a cached message from queue. - // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. - mmsghdr* mhdr = rtc_session->rtc_server->fetch(); - char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base; - - if (rtc_session->encrypt) { - if ((err = rtc_session->dtls_session->protect_rtp(buf, pkt->payload, length)) != srs_success) { - return srs_error_wrap(err, "srtp protect"); - } - } else { - memcpy(buf, pkt->payload, length); - } - - sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); - socklen_t addrlen = (socklen_t)skt->peer_addrlen(); - - mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; - mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; - mhdr->msg_hdr.msg_iov->iov_len = length; - mhdr->msg_len = 0; - - rtc_session->rtc_server->sendmmsg(skt->stfd(), mhdr); - return err; -} - srs_error_t SrsRtcSenderThread::send_message2(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; - int length = kRtpPacketSize; // Fetch a cached message from queue. // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. mmsghdr* mhdr = rtc_session->rtc_server->fetch(); char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base; + // Length of iov, default size. + int length = kRtpPacketSize; + if (rtc_session->encrypt) { if ((err = rtc_session->dtls_session->protect_rtp2(buf, &length, pkt)) != srs_success) { return srs_error_wrap(err, "srtp protect"); @@ -800,7 +759,7 @@ srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtpPacket2** p return err; } -srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* shared_frame, SrsSample* sample, vector& rtp_packets) +srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, vector& packets) { srs_error_t err = srs_success; @@ -809,61 +768,60 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* shared_frame, S uint8_t header = sample->bytes[0]; uint8_t nal_type = header & kNalTypeMask; - int num_of_packet = (sample->size - 1 + kRtpMaxPayloadSize) / kRtpMaxPayloadSize; + int num_of_packet = 1 + (sample->size - 1) / kRtpMaxPayloadSize; for (int i = 0; i < num_of_packet; ++i) { - char buf[kRtpPacketSize]; - SrsBuffer* stream = new SrsBuffer(buf, kRtpPacketSize); - SrsAutoFree(SrsBuffer, stream); + int packet_size = srs_min(nb_left, kRtpMaxPayloadSize); - int packet_size = min(nb_left, kRtpMaxPayloadSize); + SrsRtpPacket2* packet = new SrsRtpPacket2(); + packets.push_back(packet); - // fu-indicate - uint8_t fu_indicate = kFuA; - fu_indicate |= (header & (~kNalTypeMask)); - stream->write_1bytes(fu_indicate); + packet->rtp_header.set_timestamp(msg->timestamp * 90); + packet->rtp_header.set_sequence(video_sequence++); + packet->rtp_header.set_ssrc(video_ssrc); + packet->rtp_header.set_payload_type(video_payload_type); - uint8_t fu_header = nal_type; - if (i == 0) - fu_header |= kStart; - if (i == num_of_packet - 1) - fu_header |= kEnd; - stream->write_1bytes(fu_header); + SrsRtpFUAPayload* fua = new SrsRtpFUAPayload(); + packet->payload = fua; + + fua->nri = (SrsAvcNaluType)header; + fua->nalu_type = (SrsAvcNaluType)nal_type; + fua->start = bool(i == 0); + fua->end = bool(i == num_of_packet - 1); + + SrsSample* sample = new SrsSample(); + sample->bytes = p; + sample->size = packet_size; + fua->nalus.push_back(sample); - stream->write_bytes(p, packet_size); p += packet_size; nb_left -= packet_size; - - srs_verbose("rtp fu-a nalu, size=%u, seq=%u, timestamp=%lu", sample->size, video_sequence, (shared_frame->timestamp * 90)); - - SrsRtpSharedPacket* packet = new SrsRtpSharedPacket(); - if ((err = packet->create((shared_frame->timestamp * 90), video_sequence++, kVideoSSRC, kH264PayloadType, stream->data(), stream->pos())) != srs_success) { - return srs_error_wrap(err, "rtp packet encode"); - } - - rtp_packets.push_back(packet); } return err; } // Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6 -srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* shared_frame, SrsSample* sample, vector& rtp_packets) +srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket) { srs_error_t err = srs_success; - srs_verbose("rtp single nalu, size=%u, seq=%u, timestamp=%lu", sample->size, video_sequence, (shared_frame->timestamp * 90)); + SrsRtpPacket2* packet = new SrsRtpPacket2(); + packet->rtp_header.set_timestamp(msg->timestamp * 90); + packet->rtp_header.set_sequence(video_sequence++); + packet->rtp_header.set_ssrc(video_ssrc); + packet->rtp_header.set_payload_type(video_payload_type); - SrsRtpSharedPacket* packet = new SrsRtpSharedPacket(); - if ((err = packet->create((shared_frame->timestamp * 90), video_sequence++, kVideoSSRC, kH264PayloadType, sample->bytes, sample->size)) != srs_success) { - return srs_error_wrap(err, "rtp packet encode"); - } + SrsRtpRawPayload* raw = new SrsRtpRawPayload(); + raw->payload = sample->bytes; + raw->nn_payload = sample->size; + packet->payload = raw; - rtp_packets.push_back(packet); + *ppacket = packet; return err; } -srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMessage* shared_frame, SrsRtpPacket2** ppacket) +srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket) { srs_error_t err = srs_success; @@ -883,12 +841,9 @@ srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMes return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty"); } - srs_verbose("rtp stap-a nalu, size=%u, seq=%u, timestamp=%lu", - (sps.size() + pps.size()), video_sequence, (shared_frame->timestamp * 90)); - SrsRtpPacket2* packet = new SrsRtpPacket2(); packet->rtp_header.set_marker(false); - packet->rtp_header.set_timestamp(shared_frame->timestamp * 90); + packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_ssrc(video_ssrc); packet->rtp_header.set_payload_type(video_payload_type); @@ -898,7 +853,7 @@ srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMes uint8_t header = sps[0]; uint8_t nal_type = header & kNalTypeMask; - stap->nri = (SrsAvcNaluType)nal_type; + stap->nri = (SrsAvcNaluType)header; stap->nn_nalus = 2; stap->nalus = new SrsSample[stap->nn_nalus]; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index fb5095b81..16cba0d9f 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -153,14 +153,13 @@ public: void update_sendonly_socket(SrsUdpMuxSocket* skt); private: srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* skt, int* pnn, int* pnn_rtp_pkts); - srs_error_t send_message(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpSharedPacket* pkt, SrsUdpMuxSocket* skt); srs_error_t send_message2(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt); private: srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket); private: - srs_error_t packet_fu_a(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector& rtp_packets); - srs_error_t packet_single_nalu(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector& rtp_packets); - srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* shared_frame, SrsRtpPacket2** ppacket); + srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, std::vector& packets); + srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket); + srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket); }; class SrsRtcSession diff --git a/trunk/src/kernel/srs_kernel_rtp.cpp b/trunk/src/kernel/srs_kernel_rtp.cpp index 733b67315..7a719abcc 100644 --- a/trunk/src/kernel/srs_kernel_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtp.cpp @@ -35,6 +35,13 @@ using namespace std; // @see: https://tools.ietf.org/html/rfc6184#section-5.2 const uint8_t kStapA = 24; +// @see: https://tools.ietf.org/html/rfc6184#section-5.2 +const uint8_t kFuA = 28; + +// @see: https://tools.ietf.org/html/rfc6184#section-5.8 +const uint8_t kStart = 0x80; // Fu-header start bit +const uint8_t kEnd = 0x40; // Fu-header end bit + SrsRtpHeader::SrsRtpHeader() { padding = false; @@ -252,6 +259,69 @@ srs_error_t SrsRtpSTAPPayload::encode(SrsBuffer* buf) return srs_success; } +SrsRtpFUAPayload::SrsRtpFUAPayload() +{ + start = end = false; + nri = nalu_type = (SrsAvcNaluType)0; +} + +SrsRtpFUAPayload::~SrsRtpFUAPayload() +{ + vector::iterator it; + for (it = nalus.begin(); it != nalus.end(); ++it) { + SrsSample* p = *it; + srs_freep(p); + } +} + +int SrsRtpFUAPayload::nb_bytes() +{ + int size = 2; + + vector::iterator it; + for (it = nalus.begin(); it != nalus.end(); ++it) { + SrsSample* p = *it; + size += p->size; + } + + return size; +} + +srs_error_t SrsRtpFUAPayload::encode(SrsBuffer* buf) +{ + if (!buf->require(2)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 1); + } + + // FU indicator, @see https://tools.ietf.org/html/rfc6184#section-5.8 + uint8_t fu_indicate = kFuA; + fu_indicate |= (nri & (~kNalTypeMask)); + buf->write_1bytes(fu_indicate); + + // FU header, @see https://tools.ietf.org/html/rfc6184#section-5.8 + uint8_t fu_header = nalu_type; + if (start) { + fu_header |= kStart; + } + if (end) { + fu_header |= kEnd; + } + buf->write_1bytes(fu_header); + + // FU payload, @see https://tools.ietf.org/html/rfc6184#section-5.8 + vector::iterator it; + for (it = nalus.begin(); it != nalus.end(); ++it) { + SrsSample* p = *it; + if (!buf->require(p->size)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 2 + p->size); + } + + buf->write_bytes(p->bytes, p->size); + } + + return srs_success; +} + SrsRtpSharedPacket::SrsRtpSharedPacketPayload::SrsRtpSharedPacketPayload() { payload = NULL; diff --git a/trunk/src/kernel/srs_kernel_rtp.hpp b/trunk/src/kernel/srs_kernel_rtp.hpp index c5379b9fb..be22567b8 100644 --- a/trunk/src/kernel/srs_kernel_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtp.hpp @@ -121,6 +121,27 @@ public: virtual srs_error_t encode(SrsBuffer* buf); }; +class SrsRtpFUAPayload : public ISrsEncoder +{ +public: + // The NRI in NALU type. + SrsAvcNaluType nri; + // The FUA header. + bool start; + bool end; + SrsAvcNaluType nalu_type; + // The NALU samples. + // @remark We only refer to the memory, user must free its bytes. + std::vector nalus; +public: + SrsRtpFUAPayload(); + virtual ~SrsRtpFUAPayload(); +// interface ISrsEncoder +public: + virtual int nb_bytes(); + virtual srs_error_t encode(SrsBuffer* buf); +}; + class SrsRtpSharedPacket { private: