diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 19a8d24e1..2cf983012 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1924,18 +1924,14 @@ void SrsRtcPublisher::on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* bu srs_error_t SrsRtcPublisher::on_audio(SrsRtpPacket2* pkt) { + srs_error_t err = srs_success; + // TODO: FIXME: Error check. audio_queue_->consume(audio_nack_, pkt); check_send_nacks(audio_nack_, audio_ssrc); - return collect_audio_frames(); -} - -srs_error_t SrsRtcPublisher::collect_audio_frames() -{ - srs_error_t err = srs_success; - + // Collect all audio frames. std::vector frames; audio_queue_->collect_frames(audio_nack_, frames); @@ -1943,7 +1939,7 @@ srs_error_t SrsRtcPublisher::collect_audio_frames() SrsRtpPacket2* pkt = frames[i]; // TODO: FIXME: Check error. - do_collect_audio_frame(pkt); + collect_audio_frame(pkt); srs_freep(pkt); } @@ -1951,7 +1947,7 @@ srs_error_t SrsRtcPublisher::collect_audio_frames() return err; } -srs_error_t SrsRtcPublisher::do_collect_audio_frame(SrsRtpPacket2* pkt) +srs_error_t SrsRtcPublisher::collect_audio_frame(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; @@ -1989,28 +1985,6 @@ srs_error_t SrsRtcPublisher::do_collect_audio_frame(SrsRtpPacket2* pkt) srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt) { - uint8_t v = (uint8_t)pkt->nalu_type; - if (v == kFuA) { - SrsRtpFUAPayload2* payload = dynamic_cast(pkt->payload); - if (!payload) { - srs_freep(pkt); - return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A payload"); - } - - pkt->video_is_first_packet = payload->start; - pkt->video_is_last_packet = payload->end; - pkt->video_is_idr = (payload->nalu_type == SrsAvcNaluTypeIDR); - } else { - pkt->video_is_first_packet = true; - pkt->video_is_last_packet = true; - - if (v == kStapA) { - pkt->video_is_idr = true; - } else { - pkt->video_is_idr = (pkt->nalu_type == SrsAvcNaluTypeIDR); - } - } - // TODO: FIXME: Error check. video_queue_->consume(video_nack_, pkt); @@ -2021,110 +1995,35 @@ srs_error_t SrsRtcPublisher::on_video(SrsRtpPacket2* pkt) check_send_nacks(video_nack_, video_ssrc); - return collect_video_frames(); -} - -srs_error_t SrsRtcPublisher::collect_video_frames() -{ - std::vector > frames; + // Collect video frames. + std::vector frames; video_queue_->collect_frames(video_nack_, frames); for (size_t i = 0; i < frames.size(); ++i) { - vector& packets = frames[i]; - if (packets.empty()) { - continue; - } + SrsRtpPacket2* pkt = frames[i]; // TODO: FIXME: Check error. - do_collect_video_frame(packets); + collect_video_frame(pkt); - for (size_t j = 0; j < packets.size(); ++j) { - SrsRtpPacket2* pkt = packets[j]; - srs_freep(pkt); - } + srs_freep(pkt); } return srs_success; } -srs_error_t SrsRtcPublisher::do_collect_video_frame(std::vector& packets) +srs_error_t SrsRtcPublisher::collect_video_frame(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; - // Although a video frame may contain many packets, they share the same NALU type. - SrsRtpPacket2* head = packets.at(0); - SrsAvcNaluType nalu_type = head->nalu_type; - int64_t timestamp = head->rtp_header.get_timestamp(); + int64_t timestamp = pkt->rtp_header.get_timestamp(); - // For FU-A or STAP-A, there must be more than one packets. - if (nalu_type == (SrsAvcNaluType)kFuA) { - if (packets.size() < 2) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A/STAP-A %#x %d packets", nalu_type, packets.size()); - } - } else { - // For others type, should be one packet for one frame. - if (packets.size() != 1) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "NonFU-A %d packets", packets.size()); - } - } - - // For FU-A, group packets to one video frame. - if (nalu_type == (SrsAvcNaluType)kFuA) { - int nn_nalus = 0; - for (size_t i = 0; i < packets.size(); ++i) { - SrsRtpPacket2* pkt = packets[i]; - SrsRtpFUAPayload2* payload = dynamic_cast(pkt->payload); - if (!payload) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A payload"); - } - nn_nalus += payload->size; - } - if (!nn_nalus) { - return err; - } - - // TODO: FIXME: Directly covert to sample for performance. - // 5 bytes FLV tag header. - // 4 bytes NALU IBMF header, define by sequence header. - // 1 byte NALU header. - nn_nalus += 1; - int nn_payload = nn_nalus + 5 + 4; - char* data = new char[nn_payload]; - SrsBuffer buf(data, nn_payload); - - SrsRtpFUAPayload2* head_payload = dynamic_cast(head->payload); - if (head_payload->nalu_type == SrsAvcNaluTypeIDR) { - buf.write_1bytes(0x17); // Keyframe. - srs_trace("RTC got IDR %d bytes", nn_nalus); - } else { - buf.write_1bytes(0x27); // Not Keyframe. - } - buf.write_1bytes(0x01); // Not Sequence header. - buf.write_3bytes(0x00); // CTS. - buf.write_4bytes(nn_nalus); - - buf.write_1bytes(head_payload->nri | head_payload->nalu_type); // NALU header. - - for (size_t i = 0; i < packets.size(); ++i) { - SrsRtpPacket2* pkt = packets[i]; - SrsRtpFUAPayload2* payload = dynamic_cast(pkt->payload); - buf.write_bytes(payload->payload, payload->size); - } - - SrsMessageHeader header; - header.message_type = RTMP_MSG_VideoMessage; - // TODO: FIXME: Maybe the tbn is not 90k. - header.timestamp = (timestamp / 90) & 0x3fffffff; - SrsCommonMessage* shared_video = new SrsCommonMessage(); - SrsAutoFree(SrsCommonMessage, shared_video); - // TODO: FIXME: Check error. - shared_video->create(&header, data, nn_payload); - return source->on_video(shared_video); + // No FU-A, because we convert it to RAW RTP packet. + if (pkt->nalu_type == (SrsAvcNaluType)kFuA) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid FU-A"); } // For STAP-A, it must be SPS/PPS, and only one packet. - if (nalu_type == (SrsAvcNaluType)kStapA) { - SrsRtpPacket2* pkt = head; + if (pkt->nalu_type == (SrsAvcNaluType)kStapA) { SrsRtpSTAPPayload* payload = dynamic_cast(pkt->payload); if (!payload) { return srs_error_new(ERROR_RTC_RTP_MUXER, "STAP-A payload"); @@ -2178,7 +2077,6 @@ srs_error_t SrsRtcPublisher::do_collect_video_frame(std::vector& } // For RAW NALU, should be one RAW packet. - SrsRtpPacket2* pkt = head; SrsRtpRawPayload* payload = dynamic_cast(pkt->payload); if (!payload) { return srs_error_new(ERROR_RTC_RTP_MUXER, "RAW-NALU payload"); @@ -2194,7 +2092,7 @@ srs_error_t SrsRtcPublisher::do_collect_video_frame(std::vector& char* data = new char[nn_payload]; SrsBuffer buf(data, nn_payload); - if (nalu_type == SrsAvcNaluTypeIDR) { + if (pkt->nalu_type == SrsAvcNaluTypeIDR) { buf.write_1bytes(0x17); // Keyframe. srs_trace("RTC got IDR %d bytes", nn_payload); } else { diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 9babd62e0..2c3474b46 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -58,6 +58,7 @@ class SrsRtpVideoQueue; class SrsRtpPacket2; class ISrsCodec; class SrsRtpNackForReceiver; +class SrsRtpIncommingVideoFrame; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -287,11 +288,9 @@ public: virtual void on_before_decode_payload(SrsRtpPacket2* pkt, SrsBuffer* buf, ISrsCodec** ppayload); private: srs_error_t on_audio(SrsRtpPacket2* pkt); - srs_error_t collect_audio_frames(); - srs_error_t do_collect_audio_frame(SrsRtpPacket2* packet); + srs_error_t collect_audio_frame(SrsRtpPacket2* pkt); srs_error_t on_video(SrsRtpPacket2* pkt); - srs_error_t collect_video_frames(); - srs_error_t do_collect_video_frame(std::vector& packets); + srs_error_t collect_video_frame(SrsRtpPacket2* pkt); public: void request_keyframe(); // interface ISrsHourGlass diff --git a/trunk/src/app/srs_app_rtp_queue.cpp b/trunk/src/app/srs_app_rtp_queue.cpp index 511ff90a1..7f7070c7f 100644 --- a/trunk/src/app/srs_app_rtp_queue.cpp +++ b/trunk/src/app/srs_app_rtp_queue.cpp @@ -450,9 +450,53 @@ SrsRtpVideoQueue::~SrsRtpVideoQueue() { } -void SrsRtpVideoQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector >& frames) +srs_error_t SrsRtpVideoQueue::consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt) { - collect_packet(frames, nack); + srs_error_t err = srs_success; + + uint8_t v = (uint8_t)pkt->nalu_type; + if (v == kFuA) { + SrsRtpFUAPayload2* payload = dynamic_cast(pkt->payload); + if (!payload) { + srs_freep(pkt); + return srs_error_new(ERROR_RTC_RTP_MUXER, "FU-A payload"); + } + + pkt->video_is_first_packet = payload->start; + pkt->video_is_last_packet = payload->end; + pkt->video_is_idr = (payload->nalu_type == SrsAvcNaluTypeIDR); + } else { + pkt->video_is_first_packet = true; + pkt->video_is_last_packet = true; + + if (v == kStapA) { + pkt->video_is_idr = true; + } else { + pkt->video_is_idr = (pkt->nalu_type == SrsAvcNaluTypeIDR); + } + } + + if ((err = SrsRtpQueue::consume(nack, pkt)) != srs_success) { + return srs_error_wrap(err, "video consume"); + } + + return err; +} + +void SrsRtpVideoQueue::collect_frames(SrsRtpNackForReceiver* nack, std::vector& frames) +{ + while (queue_->low() != queue_->high()) { + SrsRtpPacket2* pkt = NULL; + + collect_packet(nack, &pkt); + + if (!pkt) { + return; + } + + nn_collected_frames++; + frames.push_back(pkt); + } if (queue_->overflow()) { on_overflow(nack); @@ -493,41 +537,27 @@ void SrsRtpVideoQueue::on_overflow(SrsRtpNackForReceiver* nack) queue_->advance_to(next + 1); } -void SrsRtpVideoQueue::collect_packet(vector >& frames, SrsRtpNackForReceiver* nack) -{ - while (queue_->low() != queue_->high()) { - vector frame; - - do_collect_packet(nack, frame); - - if (frame.empty()) { - return; - } - - nn_collected_frames++; - frames.push_back(frame); - } -} - // TODO: FIXME: Should refer to the FU-A original video frame, to avoid finding for each packet. -void SrsRtpVideoQueue::do_collect_packet(SrsRtpNackForReceiver* nack, vector& frame) +void SrsRtpVideoQueue::collect_packet(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt) { // When done, s point to the next available packet. uint16_t next = queue_->low(); bool found = false; + vector frame; + for (; next != queue_->high(); ++next) { SrsRtpPacket2* pkt = queue_->at(next); // Not found or in NACK, stop collecting frame. if (!pkt || nack->find(next) != NULL) { srs_trace("wait for nack seq=%u", next); - break; + return; } // Ignore when the first packet not the start. if (next == queue_->low() && !pkt->video_is_first_packet) { - break; + return; } // OK, collect packet to frame. @@ -542,17 +572,78 @@ void SrsRtpVideoQueue::do_collect_packet(SrsRtpNackForReceiver* nack, vectorhigh()) { + if (cur != queue_->high()) { // Reset the range of packets to NULL in buffer. queue_->reset(queue_->low(), next); srs_verbose("collect on frame, update head seq=%u t %u", queue_->low(), next); queue_->advance_to(next); } + + // Merge packets to one packet. + covert_packet(frame, ppkt); + return; +} + +void SrsRtpVideoQueue::covert_packet(std::vector& frame, SrsRtpPacket2** ppkt) +{ + if (frame.size() == 1) { + *ppkt = frame[0]; + return; + } + + // If more than one packet in a frame, it must be FU-A. + SrsRtpPacket2* head = frame.at(0); + SrsAvcNaluType nalu_type = head->nalu_type; + + // Covert FU-A to one RAW RTP packet. + int nn_nalus = 0; + for (size_t i = 0; i < frame.size(); ++i) { + SrsRtpPacket2* pkt = frame[i]; + SrsRtpFUAPayload2* payload = dynamic_cast(pkt->payload); + if (!payload) { + nn_nalus = 0; + break; + } + nn_nalus += payload->size; + } + + // Invalid packets, ignore. + if (nalu_type != (SrsAvcNaluType)kFuA || !nn_nalus) { + for (int i = 0; i < (int)frame.size(); i++) { + SrsRtpPacket2* pkt = frame[i]; + srs_freep(pkt); + } + return; + } + + // Merge to one RAW RTP packet. + // TODO: FIXME: Should covert to multiple NALU RTP packet to avoid copying. + SrsRtpPacket2* pkt = new SrsRtpPacket2(); + pkt->rtp_header = head->rtp_header; + + SrsRtpFUAPayload2* head_payload = dynamic_cast(head->payload); + pkt->nalu_type = head_payload->nalu_type; + + SrsRtpRawPayload* payload = pkt->reuse_raw(); + payload->nn_payload = nn_nalus + 1; + payload->payload = new char[payload->nn_payload]; + + SrsBuffer buf(payload->payload, payload->nn_payload); + + buf.write_1bytes(head_payload->nri | head_payload->nalu_type); // NALU header. + + for (size_t i = 0; i < frame.size(); ++i) { + SrsRtpPacket2* pkt = frame[i]; + SrsRtpFUAPayload2* payload = dynamic_cast(pkt->payload); + buf.write_bytes(payload->payload, payload->size); + } + + *ppkt = pkt; } diff --git a/trunk/src/app/srs_app_rtp_queue.hpp b/trunk/src/app/srs_app_rtp_queue.hpp index 445b2c6a7..d5674f429 100644 --- a/trunk/src/app/srs_app_rtp_queue.hpp +++ b/trunk/src/app/srs_app_rtp_queue.hpp @@ -204,13 +204,14 @@ public: SrsRtpVideoQueue(int capacity); virtual ~SrsRtpVideoQueue(); public: - virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector >& frames); + virtual srs_error_t consume(SrsRtpNackForReceiver* nack, SrsRtpPacket2* pkt); + virtual void collect_frames(SrsRtpNackForReceiver* nack, std::vector& frame); bool should_request_key_frame(); void request_keyframe(); private: virtual void on_overflow(SrsRtpNackForReceiver* nack); - virtual void collect_packet(std::vector >& frames, SrsRtpNackForReceiver* nack); - virtual void do_collect_packet(SrsRtpNackForReceiver* nack, std::vector& frame); + virtual void collect_packet(SrsRtpNackForReceiver* nack, SrsRtpPacket2** ppkt); + virtual void covert_packet(std::vector& frame, SrsRtpPacket2** ppkt); }; #endif