diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index c8e7c046a..5c58a2667 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1458,6 +1458,8 @@ srs_error_t SrsRtcSenderThread::package_stap_a(SrsSource* source, SrsSharedPtrMe stap->nalus.push_back(sample); } + srs_trace("RTC STAP-A seq=%u, sps %d, pps %d bytes", packet->rtp_header.get_sequence(), sps.size(), pps.size()); + return err; } @@ -1466,10 +1468,10 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) report_timer = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS); rtc_session = session; - video_queue_ = new SrsRtpQueue(1000); + video_queue_ = new SrsRtpVideoQueue(1000); video_nack_ = new SrsRtpNackForReceiver(video_queue_, 1000 * 2 / 3); - audio_queue_ = new SrsRtpQueue(100, true); - audio_nack_ = new SrsRtpNackForReceiver(video_queue_, 100 * 2 / 3); + audio_queue_ = new SrsRtpAudioQueue(100); + audio_nack_ = new SrsRtpNackForReceiver(audio_queue_, 100 * 2 / 3); source = NULL; } @@ -2055,10 +2057,10 @@ srs_error_t SrsRtcPublisher::do_collect_video_frame(std::vector& SrsAvcNaluType nalu_type = head->nalu_type; int64_t timestamp = head->rtp_header.get_timestamp(); + // For FU-A or STAP-A, there must be more than one packets. if (nalu_type == (SrsAvcNaluType)kFuA) { - // For FU-A, there must be more than one packets. if (packets.size() < 2) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A %d packets", packets.size()); + return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A/STAP-A %#x %d packets", nalu_type, packets.size()); } } else { // For others type, should be one packet for one frame. @@ -2069,50 +2071,51 @@ srs_error_t SrsRtcPublisher::do_collect_video_frame(std::vector& // For FU-A, group packets to one video frame. if (nalu_type == (SrsAvcNaluType)kFuA) { - int nn_payload = 0; + int nn_nalus = 0; for (size_t i = 0; i < packets.size(); ++i) { SrsRtpPacket2* pkt = packets[i]; SrsRtpFUAPayload2* payload = dynamic_cast(pkt->payload); if (!payload) { return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A payload"); } - nn_payload += payload->size; + nn_nalus += payload->size; } - if (!nn_payload) { + if (!nn_nalus) { return err; } // TODO: FIXME: Directly covert to sample for performance. - // 1 byte NALU header. // 5 bytes FLV tag header. - nn_payload += 1 + 5; + // 4 bytes NALU IBMF header, define by sequence header. + // 1 byte NALU header. + nn_nalus += 1; + int nn_payload = nn_nalus + 5 + 4; char* data = new char[nn_payload]; - SrsRtpFUAPayload2* head_payload = dynamic_cast(head->payload); + SrsBuffer buf(data, nn_payload); - char* p = data + 5; - *p++ = head_payload->nri | head_payload->nalu_type; + SrsRtpFUAPayload2* head_payload = dynamic_cast(head->payload); + if (head_payload->nalu_type == SrsAvcNaluTypeIDR) { + buf.write_1bytes(0x17); // Keyframe. + srs_trace("RTC got IDR %d bytes", nn_nalus); + } else { + buf.write_1bytes(0x27); // Not Keyframe. + } + buf.write_1bytes(0x01); // Not Sequence header. + buf.write_3bytes(0x00); // CTS. + buf.write_4bytes(nn_nalus); + + buf.write_1bytes(head_payload->nri | head_payload->nalu_type); // NALU header. for (size_t i = 0; i < packets.size(); ++i) { SrsRtpPacket2* pkt = packets[i]; SrsRtpFUAPayload2* payload = dynamic_cast(pkt->payload); - memcpy(p, payload->payload, payload->size); - p += payload->size; + buf.write_bytes(payload->payload, payload->size); } - if (head_payload->nalu_type == SrsAvcNaluTypeIDR) { - data[0] = 0x17; - } else { - data[0] = 0x27; - } - data[1] = 0x01; - data[2] = 0x00; - data[3] = 0x00; - data[4] = 0x00; - SrsMessageHeader header; header.message_type = RTMP_MSG_VideoMessage; // TODO: FIXME: Maybe the tbn is not 90k. - header.timestamp = timestamp / 90; + header.timestamp = (timestamp / 90) & 0x3fffffff; SrsCommonMessage* shared_video = new SrsCommonMessage(); SrsAutoFree(SrsCommonMessage, shared_video); // TODO: FIXME: Check error. @@ -2127,49 +2130,47 @@ srs_error_t SrsRtcPublisher::do_collect_video_frame(std::vector& if (!payload) { return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload"); } - if (payload->nalus.size() != 2) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload %d nalus", payload->nalus.size()); - } - SrsSample* sps = payload->nalus[0]; - SrsSample* pps = payload->nalus[1]; - if (!sps->size || !pps->size) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload %d sps, %d pps", sps->size, pps->size); + SrsSample* sps = payload->get_sps(); + SrsSample* pps = payload->get_pps(); + if (!sps || !sps->size) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload no sps"); + } + if (!pps || !pps->size) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload no pps"); } // TODO: FIXME: Directly covert to sample for performance. // 5 bytes flv tag header. - // 6 bytes sps/pps sequence header. - // 1 byte seperator between sps and pps. - int nn_payload = sps->size + pps->size + 5 + 6 + 1; + // 5 bytes sps/pps sequence header. + // 6 bytes size for sps/pps, each is 3 bytes. + int nn_payload = sps->size + pps->size + 5 + 5 + 6; char* data = new char[nn_payload]; SrsBuffer buf(data, nn_payload); - buf.write_1bytes(0x17); - buf.write_1bytes(0x00); - buf.write_1bytes(0x00); - buf.write_1bytes(0x00); - buf.write_1bytes(0x00); + + buf.write_1bytes(0x17); // Keyframe. + buf.write_1bytes(0x00); // Sequence header. + buf.write_3bytes(0x00); // CTS. // FIXME: Replace magic number for avc_demux_sps_pps. - buf.write_1bytes(0x01); - buf.write_1bytes(0x42); - buf.write_1bytes(0xC0); - buf.write_1bytes(0x1E); - buf.write_1bytes(0xFF); - buf.write_1bytes(0xE1); + buf.write_1bytes(0x01); // configurationVersion + buf.write_1bytes(0x42); // AVCProfileIndication, 0x42 = Baseline + buf.write_1bytes(0xC0); // profile_compatibility + buf.write_1bytes(0x1f); // AVCLevelIndication, 0x1f = Level3.1 + buf.write_1bytes(0x03); // lengthSizeMinusOne, size of length for NALU. - buf.write_2bytes(sps->size); - buf.write_string(sps->bytes); + buf.write_1bytes(0x01); // numOfSequenceParameterSets + buf.write_2bytes(sps->size); // sequenceParameterSetLength + buf.write_bytes(sps->bytes, sps->size); // sps - buf.write_1bytes(0x01); - - buf.write_2bytes(pps->size); - buf.write_string(pps->bytes); + buf.write_1bytes(0x01); // numOfPictureParameterSets + buf.write_2bytes(pps->size); // pictureParameterSetLength + buf.write_bytes(pps->bytes, pps->size); // pps SrsMessageHeader header; header.message_type = RTMP_MSG_VideoMessage; // TODO: FIXME: Maybe the tbn is not 90k. - header.timestamp = timestamp / 90; + header.timestamp = (timestamp / 90) & 0x3fffffff; SrsCommonMessage* shared_video = new SrsCommonMessage(); SrsAutoFree(SrsCommonMessage, shared_video); // TODO: FIXME: Check error. @@ -2188,27 +2189,28 @@ srs_error_t SrsRtcPublisher::do_collect_video_frame(std::vector& } // TODO: FIXME: Directly covert to sample for performance. - // 1 byte NALU header. // 5 bytes FLV tag header. - int nn_payload = payload->nn_payload + 1 + 5; + // 4 bytes NALU IBMF header, define by sequence header. + int nn_payload = payload->nn_payload + 5 + 4; char* data = new char[nn_payload]; + SrsBuffer buf(data, nn_payload); if (nalu_type == SrsAvcNaluTypeIDR) { - data[0] = 0x17; + buf.write_1bytes(0x17); // Keyframe. + srs_trace("RTC got IDR %d bytes", nn_payload); } else { - data[0] = 0x27; + buf.write_1bytes(0x27); // Not-Keyframe. } - data[1] = 0x01; - data[2] = 0x00; - data[3] = 0x00; - data[4] = 0x00; + buf.write_1bytes(0x01); // Not-SequenceHeader. + buf.write_3bytes(0x00); // CTS. - memcpy(data + 5, payload->payload, payload->nn_payload); + buf.write_4bytes(payload->nn_payload); // Size of NALU. + buf.write_bytes(payload->payload, payload->nn_payload); // NALU. SrsMessageHeader header; header.message_type = RTMP_MSG_VideoMessage; // TODO: FIXME: Maybe the tbn is not 90k. - header.timestamp = timestamp / 90; + header.timestamp = (timestamp / 90) & 0x3fffffff; SrsCommonMessage* shared_video = new SrsCommonMessage(); SrsAutoFree(SrsCommonMessage, shared_video); // TODO: FIXME: Check error. @@ -2578,6 +2580,7 @@ srs_error_t SrsRtcSession::start_publish() srs_freep(publisher); publisher = new SrsRtcPublisher(this); + publisher->request_keyframe(); uint32_t video_ssrc = 0; uint32_t audio_ssrc = 0; @@ -3382,6 +3385,7 @@ srs_error_t SrsRtcServer::create_rtc_session( } } + // TODO: FIXME: In answer, we should use the same SSRC as in offer. session->set_remote_sdp(remote_sdp); session->set_local_sdp(local_sdp); diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index 4b02eb6a9..5adef6e95 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -81,7 +81,6 @@ SrsRtpNackInfo* SrsRtpNackForReceiver::find(uint16_t seq) void SrsRtpNackForReceiver::check_queue_size() { if (queue_.size() >= max_queue_size_) { - srs_verbose("NACK list full, queue size=%u, max_queue_size=%u", queue_.size(), max_queue_size_); rtp_queue_->notify_nack_list_full(); } } @@ -102,9 +101,6 @@ void SrsRtpNackForReceiver::get_nack_seqs(vector& seqs) int alive_time = now - nack_info.generate_time_; if (alive_time > opts_.max_alive_time || nack_info.req_nack_count_ > opts_.max_count) { - srs_verbose("NACK, drop seq=%u alive time %d bigger than max_alive_time=%d OR nack count %d bigger than %d", - seq, alive_time, opts_.max_alive_time, nack_info.req_nack_count_, opts_.max_count); - rtp_queue_->notify_drop_seq(seq); queue_.erase(iter++); continue; @@ -119,7 +115,6 @@ void SrsRtpNackForReceiver::get_nack_seqs(vector& seqs) ++nack_info.req_nack_count_; nack_info.pre_req_nack_time_ = now; seqs.push_back(seq); - srs_verbose("NACK, resend seq=%u, count=%d", seq, nack_info.req_nack_count_); } ++iter; @@ -129,16 +124,15 @@ void SrsRtpNackForReceiver::get_nack_seqs(vector& seqs) void SrsRtpNackForReceiver::update_rtt(int rtt) { rtt_ = rtt * SRS_UTIME_MILLISECONDS; - srs_verbose("NACK, update rtt from %ld to %d", opts_.nack_interval, rtt_); // FIXME: limit min and max value. opts_.nack_interval = rtt_; } -SrsRtpRingBuffer::SrsRtpRingBuffer(size_t capacity) +SrsRtpRingBuffer::SrsRtpRingBuffer(int capacity) { nn_seq_flip_backs = 0; high_ = low_ = 0; - capacity_ = capacity; + capacity_ = (uint16_t)capacity; initialized_ = false; queue_ = new SrsRtpPacket2*[capacity_]; @@ -190,7 +184,7 @@ void SrsRtpRingBuffer::reset(uint16_t low, uint16_t high) bool SrsRtpRingBuffer::overflow() { - return high_ - low_ < capacity_; + return high_ - low_ >= capacity_; } bool SrsRtpRingBuffer::is_heavy() @@ -251,7 +245,6 @@ void SrsRtpRingBuffer::update(uint16_t seq, bool startup, uint16_t& nack_low, ui // When distance(seq,high_)>0 and seq0 and 1<65535. if (seq < high_) { - srs_verbose("warp around, flip_back=%" PRId64, nn_seq_flip_backs); ++nn_seq_flip_backs; } high_ = seq; @@ -266,11 +259,7 @@ void SrsRtpRingBuffer::update(uint16_t seq, bool startup, uint16_t& nack_low, ui if (startup) { nack_low = seq + 1; nack_high = low_; - - srs_info("head seq=%u, cur seq=%u, update head seq because recv less than it.", low_, seq); low_ = seq; - } else { - srs_verbose("seq=%u, rtx success, too old", seq); } } } @@ -280,7 +269,7 @@ SrsRtpPacket2* SrsRtpRingBuffer::at(uint16_t seq) return queue_[seq % capacity_]; } -SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame) +SrsRtpQueue::SrsRtpQueue(const char* tag, int capacity) { nn_collected_frames = 0; queue_ = new SrsRtpRingBuffer(capacity); @@ -294,9 +283,8 @@ SrsRtpQueue::SrsRtpQueue(size_t capacity, bool one_packet_per_frame) num_of_packet_received_ = 0; number_of_packet_lossed_ = 0; - one_packet_per_frame_ = one_packet_per_frame; - request_key_frame_ = false; + tag_ = tag; } SrsRtpQueue::~SrsRtpQueue() @@ -316,8 +304,6 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt if (nack_info) { int nack_rtt = nack_info->req_nack_count_ ? ((now - nack_info->pre_req_nack_time_) / SRS_UTIME_MILLISECONDS) : 0; (void)nack_rtt; - srs_verbose("seq=%u, alive time=%d, nack count=%d, rtx success, resend use %dms", - seq, now - nack_info->generate_time_, nack_info->req_nack_count_, nack_rtt); nack->remove(seq); } @@ -336,7 +322,6 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt last_trans_time_ = trans_time; jitter_ = (jitter_ * 15.0 / 16.0) + (static_cast(cur_jitter) / 16.0); - srs_verbose("jitter=%.2f", jitter_); } // OK, we got one new RTP packet, which is not in NACK. @@ -345,14 +330,13 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt uint16_t nack_low = 0, nack_high = 0; queue_->update(seq, !nn_collected_frames, nack_low, nack_high); if (srs_rtp_seq_distance(nack_low, nack_high)) { - srs_trace("update nack seq=%u, startup=%d, nack range [%u, %u]", seq, !nn_collected_frames, nack_low, nack_high); + srs_trace("%s update nack seq=%u, startup=%d, range [%u, %u]", tag_, seq, !nn_collected_frames, nack_low, nack_high); insert_into_nack_list(nack, nack_low, nack_high); } } // When packets overflow, collect frame and move head to next frame start. if (queue_->overflow()) { - srs_verbose("try collect packet becuase seq out of range"); collect_packet(nack); uint16_t next = queue_->next_start_of_frame(); @@ -361,28 +345,20 @@ srs_error_t SrsRtpQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt if (next == queue_->low()) { next = queue_->high() - 1; } - srs_trace("seqs out of range, seq range [%u, %u]", queue_->low(), next); + srs_trace("%s seq out of range [%u, %u]", tag_, queue_->low(), next); for (uint16_t s = queue_->low(); s != next; ++s) { nack->remove(s); queue_->remove(s); } - srs_trace("force update, update head seq from %u to %u when seqs out of range", queue_->low(), next + 1); + srs_trace("%s force update seq %u to %u", tag_, queue_->low(), next + 1); queue_->advance_to(next + 1); } // Save packet at the position seq. queue_->set(seq, pkt); - // Collect packets to frame when: - // 1. Marker bit means the last packet of frame received. - // 2. Queue has lots of packets, the load is heavy. - // 3. The frame contains only one packet for each frame. - if (pkt->rtp_header.get_marker() || queue_->is_heavy() || one_packet_per_frame_) { - collect_packet(nack); - } - return err; } @@ -411,7 +387,7 @@ void SrsRtpQueue::notify_drop_seq(uint16_t seq) } // When NACK is timeout, move to the next start of frame. - srs_trace("nack drop seq=%u, drop range [%u, %u]", seq, queue_->low(), next + 1); + srs_trace("%s nack drop seq=%u, drop range [%u, %u]", tag_, seq, queue_->low(), next + 1); queue_->advance_to(next + 1); } @@ -425,7 +401,7 @@ void SrsRtpQueue::notify_nack_list_full() } // When NACK is overflow, move to the next keyframe. - srs_trace("nack overflow drop range [%u, %u]", queue_->low(), next + 1); + srs_trace("%s nack overflow drop range [%u, %u]", tag_, queue_->low(), next + 1); queue_->advance_to(next + 1); } @@ -466,7 +442,6 @@ uint32_t SrsRtpQueue::get_interarrival_jitter() void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t seq_start, uint16_t seq_end) { for (uint16_t s = seq_start; s != seq_end; ++s) { - srs_verbose("loss seq=%u, insert into nack list", s); nack->insert(s); ++number_of_packet_lossed_; } @@ -474,46 +449,147 @@ void SrsRtpQueue::insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t se nack->check_queue_size(); } -void SrsRtpQueue::collect_packet(SrsRtpNackForReceiver* nack) +SrsRtpAudioQueue::SrsRtpAudioQueue(int capacity) : SrsRtpQueue("audio", capacity) +{ +} + +SrsRtpAudioQueue::~SrsRtpAudioQueue() +{ +} + +srs_error_t SrsRtpAudioQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + if ((err = SrsRtpQueue::consume(nack, pkt)) != srs_success) { + return srs_error_wrap(err, "audio queue"); + } + + // For audio, always try to collect frame, because each packet is a frame. + collect_packet(nack); + + return err; +} + +void SrsRtpAudioQueue::collect_packet(SrsRtpNackForReceiver* nack) +{ + // When done, s point to the next available packet. + uint16_t next = queue_->low(); + + for (; next != queue_->high(); ++next) { + SrsRtpPacket2* pkt = queue_->at(next); + + // Not found or in NACK, stop collecting frame. + if (!pkt || nack->find(next) != NULL) { + srs_trace("%s wait for nack seq=%u", tag_, next); + break; + } + + // OK, collect packet to frame. + vector frame; + frame.push_back(pkt); + + // Done, we got the last packet of frame. + nn_collected_frames++; + frames_.push_back(frame); + } + + if (queue_->low() != next) { + // Reset the range of packets to NULL in buffer. + queue_->reset(queue_->low(), next); + + srs_verbose("%s collect on frame, update head seq=%u t %u", tag_, queue_->low(), next); + queue_->advance_to(next); + } +} + +SrsRtpVideoQueue::SrsRtpVideoQueue(int capacity) : SrsRtpQueue("video", capacity) +{ +} + +SrsRtpVideoQueue::~SrsRtpVideoQueue() +{ +} + +srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + if ((err = SrsRtpQueue::consume(nack, pkt)) != srs_success) { + return srs_error_wrap(err, "video queue"); + } + + // Collect packets to frame when: + // 1. Marker bit means the last packet of frame received. + // 2. Queue has lots of packets, the load is heavy. + // TODO: FIMXE: For real-time, we should collect each frame ASAP. + if (pkt->rtp_header.get_marker() || queue_->is_heavy()) { + collect_packet(nack); + } + + return err; +} + +void SrsRtpVideoQueue::collect_packet(SrsRtpNackForReceiver* nack) { while (queue_->low() != queue_->high()) { vector frame; - uint16_t s = queue_->low(); - for (; s != queue_->high(); ++s) { - SrsRtpPacket2* pkt = queue_->at(s); + do_collect_packet(nack, frame); - // In NACK, never collect frame. - if (nack->find(s) != NULL) { - srs_verbose("seq=%u, found in nack list when collect frame", s); - return; - } + if (frame.empty()) { + return; + } - // Ignore when the first packet not the start. - if (s == queue_->low() && pkt->nn_original_payload && !pkt->is_first_packet_of_frame) { - return; - } + nn_collected_frames++; + frames_.push_back(frame); + } +} - // OK, collect packet to frame. - frame.push_back(pkt); +// TODO: FIXME: Should refer to the FU-A original video frame, to avoid finding for each packet. +void SrsRtpVideoQueue::do_collect_packet(SrsRtpNackForReceiver* nack, vector& frame) +{ + // When done, s point to the next available packet. + uint16_t next = queue_->low(); - // Not the last packet, continue to process next one. - if (!pkt->rtp_header.get_marker() && !one_packet_per_frame_) { - continue; - } + bool found = false; + for (; next != queue_->high(); ++next) { + SrsRtpPacket2* pkt = queue_->at(next); - // Done, we got the last packet of frame. - nn_collected_frames++; - frames_.push_back(frame); + // Not found or in NACK, stop collecting frame. + if (!pkt || nack->find(next) != NULL) { + srs_trace("%s wait for nack seq=%u", tag_, next); break; } - if (queue_->low() != s) { - // Reset the range of packets to NULL in buffer. - queue_->reset(queue_->low(), s); + // Ignore when the first packet not the start. + if (next == queue_->low() && !pkt->is_first_packet_of_frame) { + break; + } - srs_verbose("head seq=%u, update to %u because collect one full farme", queue_->low(), s + 1); - queue_->advance_to(s + 1); + // OK, collect packet to frame. + frame.push_back(pkt); + + // Done, we got the last packet of frame. + // @remark Note that the STAP-A is marker false and it's the last packet. + if (pkt->rtp_header.get_marker() || pkt->is_last_packet_of_frame) { + found = true; + next++; + break; } } + + if (!found) { + frame.clear(); + } + + uint16_t cur = next - 1; + if (found && cur != queue_->high()) { + // Reset the range of packets to NULL in buffer. + queue_->reset(queue_->low(), next); + + srs_verbose("%s collect on frame, update head seq=%u t %u", tag_, queue_->low(), next); + queue_->advance_to(next); + } } + diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp index 833107d32..cb14a3d19 100644 --- a/trunk/src/app/srs_app_rtp_queue.hpp +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -129,7 +129,7 @@ private: uint16_t low_; uint16_t high_; public: - SrsRtpRingBuffer(size_t capacity); + SrsRtpRingBuffer(int capacity); virtual ~SrsRtpRingBuffer(); public: // Move the position of buffer. @@ -161,10 +161,6 @@ public: class SrsRtpQueue { -private: - uint64_t nn_collected_frames; - SrsRtpRingBuffer* queue_; - bool one_packet_per_frame_; private: double jitter_; // TODO: FIXME: Covert time to srs_utime_t. @@ -173,14 +169,18 @@ private: uint64_t pre_number_of_packet_lossed_; uint64_t num_of_packet_received_; uint64_t number_of_packet_lossed_; -private: +protected: + SrsRtpRingBuffer* queue_; + uint64_t nn_collected_frames; std::vector > frames_; + const char* tag_; +private: bool request_key_frame_; public: - SrsRtpQueue(size_t capacity = 1024, bool one_packet_per_frame = false); + SrsRtpQueue(const char* tag, int capacity); virtual ~SrsRtpQueue(); public: - srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); + virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); // TODO: FIXME: Should merge FU-A to RAW, then we can return RAW payloads. void collect_frames(std::vector >& frames); bool should_request_key_frame(); @@ -194,7 +194,32 @@ public: uint32_t get_interarrival_jitter(); private: void insert_into_nack_list(SrsRtpNackForReceiver* nack, uint16_t seq_start, uint16_t seq_end); - void collect_packet(SrsRtpNackForReceiver* nack); +protected: + virtual void collect_packet(SrsRtpNackForReceiver* nack) = 0; +}; + +class SrsRtpAudioQueue : public SrsRtpQueue +{ +public: + SrsRtpAudioQueue(int capacity); + virtual ~SrsRtpAudioQueue(); +public: + virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); +protected: + virtual void collect_packet(SrsRtpNackForReceiver* nack); +}; + +class SrsRtpVideoQueue : public SrsRtpQueue +{ +public: + SrsRtpVideoQueue(int capacity); + virtual ~SrsRtpVideoQueue(); +public: + virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); +protected: + virtual void collect_packet(SrsRtpNackForReceiver* nack); +private: + virtual void do_collect_packet(SrsRtpNackForReceiver* nack, std::vector& frame); }; #endif diff --git a/trunk/src/kernel/srs_kernel_rtp.cpp b/trunk/src/kernel/srs_kernel_rtp.cpp index a50fe7781..c21c9157b 100644 --- a/trunk/src/kernel/srs_kernel_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtp.cpp @@ -409,7 +409,7 @@ srs_error_t SrsRtpPacket2::decode(SrsBuffer* buf) // Try to parse the NALU type for video decoder. if (!buf->empty()) { - nalu_type = SrsAvcNaluType((uint8_t)(buf->head()[0] & 0x3f)); + nalu_type = SrsAvcNaluType((uint8_t)(buf->head()[0] & kNalTypeMask)); } // If user set the decode handler, call it to set the payload. @@ -608,6 +608,42 @@ SrsRtpSTAPPayload::~SrsRtpSTAPPayload() } } +SrsSample* SrsRtpSTAPPayload::get_sps() +{ + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + if (!p || !p->size) { + continue; + } + + SrsAvcNaluType nalu_type = (SrsAvcNaluType)(p->bytes[0] & kNalTypeMask); + if (nalu_type == SrsAvcNaluTypeSPS) { + return p; + } + } + + return NULL; +} + +SrsSample* SrsRtpSTAPPayload::get_pps() +{ + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + if (!p || !p->size) { + continue; + } + + SrsAvcNaluType nalu_type = (SrsAvcNaluType)(p->bytes[0] & kNalTypeMask); + if (nalu_type == SrsAvcNaluTypePPS) { + return p; + } + } + + return NULL; +} + int SrsRtpSTAPPayload::nb_bytes() { int size = 1; @@ -658,7 +694,7 @@ srs_error_t SrsRtpSTAPPayload::decode(SrsBuffer* buf) // STAP header, RTP payload format for aggregation packets // @see https://tools.ietf.org/html/rfc6184#section-5.7 uint8_t v = buf->read_1bytes(); - nri = SrsAvcNaluType(v & kNalTypeMask); + nri = SrsAvcNaluType(v & (~kNalTypeMask)); // NALUs. while (!buf->empty()) { @@ -754,13 +790,13 @@ srs_error_t SrsRtpFUAPayload::decode(SrsBuffer* buf) // FU indicator, @see https://tools.ietf.org/html/rfc6184#section-5.8 uint8_t v = buf->read_1bytes(); - nri = SrsAvcNaluType(v & kNalTypeMask); + nri = SrsAvcNaluType(v & (~kNalTypeMask)); // FU header, @see https://tools.ietf.org/html/rfc6184#section-5.8 v = buf->read_1bytes(); start = v & kStart; end = v & kEnd; - nalu_type = SrsAvcNaluType(v & 0x3f); + nalu_type = SrsAvcNaluType(v & kNalTypeMask); if (!buf->require(1)) { return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 1); @@ -841,7 +877,7 @@ srs_error_t SrsRtpFUAPayload2::decode(SrsBuffer* buf) v = buf->read_1bytes(); start = v & kStart; end = v & kEnd; - nalu_type = SrsAvcNaluType(v & 0x3f); + nalu_type = SrsAvcNaluType(v & kNalTypeMask); if (!buf->require(1)) { return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 1); diff --git a/trunk/src/kernel/srs_kernel_rtp.hpp b/trunk/src/kernel/srs_kernel_rtp.hpp index c587113c8..fd38a4d12 100644 --- a/trunk/src/kernel/srs_kernel_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtp.hpp @@ -214,6 +214,9 @@ public: public: SrsRtpSTAPPayload(); virtual ~SrsRtpSTAPPayload(); +public: + SrsSample* get_sps(); + SrsSample* get_pps(); // interface ISrsEncoder public: virtual int nb_bytes();