diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index afc727d84..36e104190 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -282,6 +282,75 @@ srs_error_t SrsPlaintextTransport::unprotect_rtcp(const char* cipher, char* plai return srs_success; } +ISrsRtcPLIWorkerHandler::ISrsRtcPLIWorkerHandler() +{ +} + +ISrsRtcPLIWorkerHandler::~ISrsRtcPLIWorkerHandler() +{ +} + +ISrsRtcPLIWorker::ISrsRtcPLIWorker(ISrsRtcPLIWorkerHandler* h) +{ + handler_ = h; + wait_ = srs_cond_new(); + trd_ = new SrsSTCoroutine("pli", this, _srs_context->get_id()); +} + +ISrsRtcPLIWorker::~ISrsRtcPLIWorker() +{ + srs_cond_signal(wait_); + trd_->stop(); + + srs_freep(trd_); + srs_cond_destroy(wait_); +} + +srs_error_t ISrsRtcPLIWorker::start() +{ + srs_error_t err = srs_success; + + if ((err = trd_->start()) != srs_success) { + return srs_error_wrap(err, "start pli worker"); + } + + return err; +} + +void ISrsRtcPLIWorker::request_keyframe(uint32_t ssrc, SrsContextId cid) +{ + plis_.insert(make_pair(ssrc, cid)); + srs_cond_signal(wait_); +} + +srs_error_t ISrsRtcPLIWorker::cycle() +{ + srs_error_t err = srs_success; + + while (true) { + if ((err = trd_->pull()) != srs_success) { + return srs_error_wrap(err, "quit"); + } + + std::map plis; + plis.swap(plis_); + + for (map::iterator it = plis.begin(); it != plis.end(); ++it) { + uint32_t ssrc = it->first; + SrsContextId cid = it->second; + + if ((err = handler_->do_request_keyframe(ssrc, cid)) != srs_success) { + srs_warn("PLI error, %s", srs_error_desc(err).c_str()); + srs_error_reset(err); + } + } + + srs_cond_wait(wait_); + } + + return err; +} + SrsRtcPlayStreamStatistic::SrsRtcPlayStreamStatistic() { nn_rtp_pkts = 0; @@ -314,13 +383,14 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid) _srs_config->subscribe(this); timer_ = new SrsHourGlass(this, 1000 * SRS_UTIME_MILLISECONDS); - nack_epp = new SrsErrorPithyPrint(); + pli_worker_ = new ISrsRtcPLIWorker(this); } SrsRtcPlayStream::~SrsRtcPlayStream() { _srs_config->unsubscribe(this); + srs_freep(pli_worker_); srs_freep(trd); srs_freep(timer_); srs_freep(req_); @@ -421,6 +491,10 @@ srs_error_t SrsRtcPlayStream::start() return srs_error_wrap(err, "start timer"); } + if ((err = pli_worker_->start()) != srs_success) { + return srs_error_wrap(err, "start pli worker"); + } + if (_srs_rtc_hijacker) { if ((err = _srs_rtc_hijacker->on_start_play(session_, this, req_)) != srs_success) { return srs_error_wrap(err, "on start play"); @@ -760,13 +834,9 @@ srs_error_t SrsRtcPlayStream::on_rtcp_ps_feedback(SrsRtcpPsfbCommon* rtcp) uint8_t fmt = rtcp->get_rc(); switch (fmt) { case kPLI: { - ISrsRtcPublishStream* publisher = source_->publish_stream(); - if (publisher) { - uint32_t ssrc = get_video_publish_ssrc(rtcp->get_media_ssrc()); - if (ssrc != 0) { - publisher->request_keyframe(ssrc); - srs_info("RTC request PLI"); - } + uint32_t ssrc = get_video_publish_ssrc(rtcp->get_media_ssrc()); + if (ssrc) { + pli_worker_->request_keyframe(ssrc, cid_); } session_->stat_->nn_pli++; @@ -804,6 +874,23 @@ uint32_t SrsRtcPlayStream::get_video_publish_ssrc(uint32_t play_ssrc) return 0; } +srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId cid) +{ + srs_error_t err = srs_success; + + // The source MUST exists, when PLI thread is running. + srs_assert(source_); + + ISrsRtcPublishStream* publisher = source_->publish_stream(); + if (!publisher) { + return err; + } + + publisher->request_keyframe(ssrc); + + return err; +} + SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid) { timer_ = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS); @@ -823,6 +910,8 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon nn_audio_frames = 0; twcc_id_ = 0; twcc_fb_count_ = 0; + + pli_worker_ = new ISrsRtcPLIWorker(this); } SrsRtcPublishStream::~SrsRtcPublishStream() @@ -831,16 +920,16 @@ SrsRtcPublishStream::~SrsRtcPublishStream() _srs_rtc_hijacker->on_stop_publish(session_, this, req); } + // TODO: FIXME: Should remove and delete source. if (source) { source->set_publish_stream(NULL); source->on_unpublish(); } - // TODO: FIXME: Should remove and delete source. - + srs_freep(timer_); + srs_freep(pli_worker_); srs_freep(pli_epp); srs_freep(req); - srs_freep(timer_); } srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescription* stream_desc) @@ -908,6 +997,10 @@ srs_error_t SrsRtcPublishStream::start() return srs_error_wrap(err, "on publish"); } + if ((err = pli_worker_->start()) != srs_success) { + return srs_error_wrap(err, "start pli worker"); + } + if (_srs_rtc_hijacker) { if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req)) != srs_success) { return srs_error_wrap(err, "on start publish"); @@ -1302,23 +1395,30 @@ srs_error_t SrsRtcPublishStream::on_rtcp_xr(SrsRtcpXr* rtcp) return err; } -// TODO: FIXME: Use async request PLI to prevent dup requests. void SrsRtcPublishStream::request_keyframe(uint32_t ssrc) { + pli_worker_->request_keyframe(ssrc, _srs_context->get_id()); +} + +srs_error_t SrsRtcPublishStream::do_request_keyframe(uint32_t ssrc, SrsContextId sub_cid) +{ + srs_error_t err = srs_success; + uint32_t nn = 0; if (pli_epp->can_print(ssrc, &nn)) { // The player(subscriber) cid, which requires PLI. - const SrsContextId& sub_cid = _srs_context->get_id(); srs_trace("RTC: Need PLI ssrc=%u, play=[%s], publish=[%s], count=%u/%u", ssrc, sub_cid.c_str(), cid_.c_str(), nn, pli_epp->nn_count); } - SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc); - if (video_track) { - video_track->request_keyframe(); + if ((err = session_->send_rtcp_fb_pli(ssrc, sub_cid)) != srs_success) { + srs_warn("PLI err %s", srs_error_desc(err).c_str()); + srs_freep(err); } session_->stat_->nn_pli++; + + return err; } void SrsRtcPublishStream::on_consumers_finished() diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 0f593d8e7..c414d09a9 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -175,6 +175,37 @@ public: virtual srs_error_t unprotect_rtcp(const char* cipher, char* plaintext, int& nb_plaintext); }; +// The handler for PLI worker coroutine. +class ISrsRtcPLIWorkerHandler +{ +public: + ISrsRtcPLIWorkerHandler(); + virtual ~ISrsRtcPLIWorkerHandler(); +public: + virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid) = 0; +}; + +// A worker coroutine to request the PLI. +class ISrsRtcPLIWorker : virtual public ISrsCoroutineHandler +{ +private: + SrsCoroutine* trd_; + srs_cond_t wait_; + ISrsRtcPLIWorkerHandler* handler_; +private: + // Key is SSRC, value is the CID of subscriber which requests PLI. + std::map plis_; +public: + ISrsRtcPLIWorker(ISrsRtcPLIWorkerHandler* h); + virtual ~ISrsRtcPLIWorker(); +public: + virtual srs_error_t start(); + virtual void request_keyframe(uint32_t ssrc, SrsContextId cid); +// interface ISrsCoroutineHandler +public: + virtual srs_error_t cycle(); +}; + // A group of RTP packets for outgoing(send to players). class SrsRtcPlayStreamStatistic { @@ -208,12 +239,14 @@ public: }; // A RTC play stream, client pull and play stream from SRS. -class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler, virtual public ISrsHourGlass +class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler + , virtual public ISrsHourGlass, virtual public ISrsRtcPLIWorkerHandler { private: SrsContextId cid_; SrsCoroutine* trd; SrsRtcConnection* session_; + ISrsRtcPLIWorker* pli_worker_; private: SrsRequest* req_; SrsRtcStream* source_; @@ -266,15 +299,20 @@ private: srs_error_t on_rtcp_ps_feedback(SrsRtcpPsfbCommon* rtcp); srs_error_t on_rtcp_rr(SrsRtcpRR* rtcp); uint32_t get_video_publish_ssrc(uint32_t play_ssrc); +// inteface ISrsRtcPLIWorkerHandler +public: + virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid); }; // A RTC publish stream, client push and publish stream to SRS. -class SrsRtcPublishStream : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler, virtual public ISrsRtcPublishStream +class SrsRtcPublishStream : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler + , virtual public ISrsRtcPublishStream, virtual public ISrsRtcPLIWorkerHandler { private: SrsContextId cid_; SrsHourGlass* timer_; uint64_t nn_audio_frames; + ISrsRtcPLIWorker* pli_worker_; private: SrsRtcConnection* session_; uint16_t pt_to_drop_; @@ -326,6 +364,7 @@ private: srs_error_t on_rtcp_xr(SrsRtcpXr* rtcp); public: void request_keyframe(uint32_t ssrc); + virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid); void on_consumers_finished(); // interface ISrsHourGlass public: diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 393ba629d..d74171f2f 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -1768,7 +1768,6 @@ srs_error_t SrsRtcAudioRecvTrack::check_send_nacks() SrsRtcVideoRecvTrack::SrsRtcVideoRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc) : SrsRtcRecvTrack(session, track_desc, false) { - request_key_frame_ = false; } SrsRtcVideoRecvTrack::~SrsRtcVideoRecvTrack() @@ -1792,17 +1791,6 @@ srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcStream* source, SrsRtpPacket2* pk return srs_error_wrap(err, "source on rtp"); } - // TODO: FIXME: add rtp process - if (request_key_frame_) { - // TODO: FIXME: add coroutine to request key frame. - request_key_frame_ = false; - - if ((err = session_->send_rtcp_fb_pli(track_desc_->ssrc_, cid_of_subscriber_)) != srs_success) { - srs_warn("PLI err %s", srs_error_desc(err).c_str()); - srs_freep(err); - } - } - // For NACK to handle packet. if ((err = on_nack(pkt)) != srs_success) { return srs_error_wrap(err, "on nack"); @@ -1821,7 +1809,7 @@ srs_error_t SrsRtcVideoRecvTrack::check_send_nacks() } // If NACK timeout, start PLI if not requesting. - if (timeout_nacks == 0 || request_key_frame_) { + if (timeout_nacks == 0) { return err; } @@ -1831,13 +1819,6 @@ srs_error_t SrsRtcVideoRecvTrack::check_send_nacks() return err; } -void SrsRtcVideoRecvTrack::request_keyframe() -{ - cid_of_subscriber_ = _srs_context->get_id(); - request_key_frame_ = true; -} - - SrsRtcSendTrack::SrsRtcSendTrack(SrsRtcConnection* session, SrsRtcTrackDescription* track_desc, bool is_audio) { session_ = session; diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index e692610fe..2efad8ca3 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -518,18 +518,12 @@ public: class SrsRtcVideoRecvTrack : public SrsRtcRecvTrack { -private: - bool request_key_frame_; - // The player(subscriber) cid, which requires PLI. - SrsContextId cid_of_subscriber_; public: SrsRtcVideoRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* stream_descs); virtual ~SrsRtcVideoRecvTrack(); public: virtual srs_error_t on_rtp(SrsRtcStream* source, SrsRtpPacket2* pkt); virtual srs_error_t check_send_nacks(); -public: - void request_keyframe(); }; class SrsRtcSendTrack