From fd6c653d3c03a484d681f19cc31c829242788568 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 7 May 2021 11:25:37 +0800 Subject: [PATCH] SquashSRS4: Refine performance for FastTimer --- README.md | 2 + trunk/src/app/srs_app_hourglass.cpp | 58 +++++----- trunk/src/app/srs_app_hourglass.hpp | 22 ++-- trunk/src/app/srs_app_hybrid.cpp | 53 +++++++-- trunk/src/app/srs_app_hybrid.hpp | 12 ++- trunk/src/app/srs_app_rtc_conn.cpp | 156 +++++++++++++-------------- trunk/src/app/srs_app_rtc_conn.hpp | 41 +++---- trunk/src/app/srs_app_rtc_server.cpp | 4 +- trunk/src/app/srs_app_rtc_server.hpp | 2 +- trunk/src/app/srs_app_rtc_source.cpp | 22 ++-- trunk/src/app/srs_app_rtc_source.hpp | 6 +- trunk/src/core/srs_core_version4.hpp | 2 +- 12 files changed, 216 insertions(+), 164 deletions(-) diff --git a/README.md b/README.md index 15c0ea607..a3c105810 100755 --- a/README.md +++ b/README.md @@ -182,6 +182,8 @@ The ports used by SRS: ## V4 changes +* v4.0, 2021-05-07, RTC: Refine play stream find track. 4.0.102 +* v4.0, 2021-05-07, RTC: Refine FastTimer to fixed interval. 4.0.101 * v4.0, 2021-05-06, RTC: Fix config bug for nack and twcc. 4.0.99 * v4.0, 2021-05-04, Add video room demo. 4.0.98 * v4.0, 2021-05-03, Add RTC stream merging demo by FFmpeg. 4.0.97 diff --git a/trunk/src/app/srs_app_hourglass.cpp b/trunk/src/app/srs_app_hourglass.cpp index 95ccfdd40..16bd200e6 100644 --- a/trunk/src/app/srs_app_hourglass.cpp +++ b/trunk/src/app/srs_app_hourglass.cpp @@ -23,6 +23,7 @@ #include +#include using namespace std; #include @@ -147,71 +148,60 @@ ISrsFastTimer::~ISrsFastTimer() { } -SrsFastTimer::SrsFastTimer(std::string label, srs_utime_t resolution) +SrsFastTimer::SrsFastTimer(std::string label, srs_utime_t interval) { - timer_ = new SrsHourGlass(label, this, resolution); + interval_ = interval; + trd_ = new SrsSTCoroutine(label, this, _srs_context->get_id()); } SrsFastTimer::~SrsFastTimer() { - srs_freep(timer_); + srs_freep(trd_); } srs_error_t SrsFastTimer::start() { srs_error_t err = srs_success; - if ((err = timer_->start()) != srs_success) { + if ((err = trd_->start()) != srs_success) { return srs_error_wrap(err, "start timer"); } return err; } -void SrsFastTimer::subscribe(srs_utime_t interval, ISrsFastTimer* timer) +void SrsFastTimer::subscribe(ISrsFastTimer* timer) { - static int g_event = 0; - - int event = g_event++; - - // TODO: FIXME: Error leak. Change tick to void in future. - timer_->tick(event, interval); - - handlers_[event] = timer; + if (std::find(handlers_.begin(), handlers_.end(), timer) == handlers_.end()) { + handlers_.push_back(timer); + } } void SrsFastTimer::unsubscribe(ISrsFastTimer* timer) { - for (map::iterator it = handlers_.begin(); it != handlers_.end();) { - if (it->second != timer) { - ++it; - continue; - } - - handlers_.erase(it++); - - int event = it->first; - timer_->untick(event); + vector::iterator it = std::find(handlers_.begin(), handlers_.end(), timer); + if (it != handlers_.end()) { + handlers_.erase(it); } } -srs_error_t SrsFastTimer::notify(int event, srs_utime_t interval, srs_utime_t tick) +srs_error_t SrsFastTimer::cycle() { srs_error_t err = srs_success; - for (map::iterator it = handlers_.begin(); it != handlers_.end(); ++it) { - ISrsFastTimer* timer = it->second; - - if (event != it->first) { - continue; + while (true) { + if ((err = trd_->pull()) != srs_success) { + return srs_error_wrap(err, "quit"); } - if ((err = timer->on_timer(interval, tick)) != srs_success) { - return srs_error_wrap(err, "tick for event=%d, interval=%dms, tick=%dms", - event, srsu2msi(interval), srsu2msi(tick)); + for (int i = 0; i < (int)handlers_.size(); i++) { + ISrsFastTimer* timer = handlers_.at(i); + if ((err = timer->on_timer(interval_)) != srs_success) { + srs_freep(err); // Ignore any error for shared timer. + } } - break; + srs_usleep(interval_); } return err; @@ -225,7 +215,7 @@ SrsClockWallMonitor::~SrsClockWallMonitor() { } -srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval, srs_utime_t tick) +srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_hourglass.hpp b/trunk/src/app/srs_app_hourglass.hpp index d0b11c024..51b9bcf59 100644 --- a/trunk/src/app/srs_app_hourglass.hpp +++ b/trunk/src/app/srs_app_hourglass.hpp @@ -30,6 +30,7 @@ #include #include +#include class SrsCoroutine; @@ -113,28 +114,31 @@ public: virtual ~ISrsFastTimer(); public: // Tick when timer is active. - virtual srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick) = 0; + virtual srs_error_t on_timer(srs_utime_t interval) = 0; }; // The fast timer, shared by objects, for high performance. // For example, we should never start a timer for each connection or publisher or player, // instead, we should start only one fast timer in server. -class SrsFastTimer : public ISrsHourGlass +class SrsFastTimer : public ISrsCoroutineHandler { private: - SrsHourGlass* timer_; - std::map handlers_; + SrsCoroutine* trd_; + srs_utime_t interval_; + std::vector handlers_; public: - SrsFastTimer(std::string label, srs_utime_t resolution); + SrsFastTimer(std::string label, srs_utime_t interval); virtual ~SrsFastTimer(); public: srs_error_t start(); public: - void subscribe(srs_utime_t interval, ISrsFastTimer* timer); + void subscribe(ISrsFastTimer* timer); void unsubscribe(ISrsFastTimer* timer); -// Interface ISrsHourGlass +// Interface ISrsCoroutineHandler private: - virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick); + // Cycle the hourglass, which will sleep resolution every time. + // and call handler when ticked. + virtual srs_error_t cycle(); }; // To monitor the system wall clock timer deviation. @@ -145,7 +149,7 @@ public: virtual ~SrsClockWallMonitor(); // interface ISrsFastTimer private: - srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick); + srs_error_t on_timer(srs_utime_t interval); }; #endif diff --git a/trunk/src/app/srs_app_hybrid.cpp b/trunk/src/app/srs_app_hybrid.cpp index 7d691448e..0d878d658 100644 --- a/trunk/src/app/srs_app_hybrid.cpp +++ b/trunk/src/app/srs_app_hybrid.cpp @@ -141,7 +141,8 @@ SrsHybridServer::SrsHybridServer() { // Note that the timer depends on other global variables, // so we MUST never create it in constructor. - timer_ = NULL; + timer20ms_ = NULL; + timer5s_ = NULL; clock_monitor_ = new SrsClockWallMonitor(); } @@ -149,7 +150,8 @@ SrsHybridServer::SrsHybridServer() SrsHybridServer::~SrsHybridServer() { srs_freep(clock_monitor_); - srs_freep(timer_); + srs_freep(timer20ms_); + srs_freep(timer5s_); vector::iterator it; for (it = servers.begin(); it != servers.end(); ++it) { @@ -174,19 +176,33 @@ srs_error_t SrsHybridServer::initialize() } // Create global shared timer. - timer_ = new SrsFastTimer("hybrid", 20 * SRS_UTIME_MILLISECONDS); + timer20ms_ = new SrsFastTimer("hybrid", 20 * SRS_UTIME_MILLISECONDS); + timer100ms_ = new SrsFastTimer("hybrid", 100 * SRS_UTIME_MILLISECONDS); + timer1s_ = new SrsFastTimer("hybrid", 1 * SRS_UTIME_SECONDS); + timer5s_ = new SrsFastTimer("hybrid", 5 * SRS_UTIME_SECONDS); // Start the timer first. - if ((err = timer_->start()) != srs_success) { + if ((err = timer20ms_->start()) != srs_success) { return srs_error_wrap(err, "start timer"); } - // The hybrid server start a timer, do routines of hybrid server. - timer_->subscribe(5 * SRS_UTIME_SECONDS, this); + if ((err = timer100ms_->start()) != srs_success) { + return srs_error_wrap(err, "start timer"); + } - // A monitor to check the clock wall deviation, per clock tick. - timer_->subscribe(20 * SRS_UTIME_MILLISECONDS, clock_monitor_); + if ((err = timer1s_->start()) != srs_success) { + return srs_error_wrap(err, "start timer"); + } + if ((err = timer5s_->start()) != srs_success) { + return srs_error_wrap(err, "start timer"); + } + + // Register some timers. + timer20ms_->subscribe(clock_monitor_); + timer5s_->subscribe(this); + + // Initialize all hybrid servers. vector::iterator it; for (it = servers.begin(); it != servers.end(); ++it) { ISrsHybridServer* server = *it; @@ -237,12 +253,27 @@ SrsServerAdapter* SrsHybridServer::srs() return NULL; } -SrsFastTimer* SrsHybridServer::timer() +SrsFastTimer* SrsHybridServer::timer20ms() { - return timer_; + return timer20ms_; } -srs_error_t SrsHybridServer::on_timer(srs_utime_t interval, srs_utime_t tick) +SrsFastTimer* SrsHybridServer::timer100ms() +{ + return timer100ms_; +} + +SrsFastTimer* SrsHybridServer::timer1s() +{ + return timer1s_; +} + +SrsFastTimer* SrsHybridServer::timer5s() +{ + return timer5s_; +} + +srs_error_t SrsHybridServer::on_timer(srs_utime_t interval) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_hybrid.hpp b/trunk/src/app/srs_app_hybrid.hpp index 77b774b6c..729f00016 100644 --- a/trunk/src/app/srs_app_hybrid.hpp +++ b/trunk/src/app/srs_app_hybrid.hpp @@ -53,7 +53,10 @@ class SrsHybridServer : public ISrsFastTimer { private: std::vector servers; - SrsFastTimer* timer_; + SrsFastTimer* timer20ms_; + SrsFastTimer* timer100ms_; + SrsFastTimer* timer1s_; + SrsFastTimer* timer5s_; SrsClockWallMonitor* clock_monitor_; public: SrsHybridServer(); @@ -66,10 +69,13 @@ public: virtual void stop(); public: virtual SrsServerAdapter* srs(); - SrsFastTimer* timer(); + SrsFastTimer* timer20ms(); + SrsFastTimer* timer100ms(); + SrsFastTimer* timer1s(); + SrsFastTimer* timer5s(); // interface ISrsFastTimer private: - srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick); + srs_error_t on_timer(srs_utime_t interval); }; extern SrsHybridServer* _srs_hybrid; diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index ffba74781..57ddb76f8 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -76,10 +76,6 @@ extern SrsPps* _srs_pps_snack2; extern SrsPps* _srs_pps_rnack; extern SrsPps* _srs_pps_rnack2; -#define SRS_TICKID_RTCP 0 -#define SRS_TICKID_TWCC 1 -#define SRS_TICKID_SEND_NACKS 2 - ISrsRtcTransport::ISrsRtcTransport() { } @@ -384,13 +380,17 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid) nack_no_copy_ = false; _srs_config->subscribe(this); - timer_ = new SrsHourGlass("play", this, 1000 * SRS_UTIME_MILLISECONDS); nack_epp = new SrsErrorPithyPrint(); pli_worker_ = new SrsRtcPLIWorker(this); + + cache_ssrc0_ = cache_ssrc1_ = cache_ssrc2_ = 0; + cache_track0_ = cache_track1_ = cache_track2_ = NULL; } SrsRtcPlayStream::~SrsRtcPlayStream() { + _srs_hybrid->timer1s()->unsubscribe(this); + // TODO: FIXME: Should not do callback in de-constructor? if (_srs_rtc_hijacker) { _srs_rtc_hijacker->on_stop_play(session_, this, req_); @@ -401,7 +401,6 @@ SrsRtcPlayStream::~SrsRtcPlayStream() srs_freep(nack_epp); srs_freep(pli_worker_); srs_freep(trd_); - srs_freep(timer_); srs_freep(req_); if (true) { @@ -532,9 +531,8 @@ srs_error_t SrsRtcPlayStream::start() return srs_error_wrap(err, "rtc_sender"); } - if ((err = timer_->start()) != srs_success) { - return srs_error_wrap(err, "start timer"); - } + // The timer for play, process TWCC in the future. + _srs_hybrid->timer1s()->subscribe(this); if ((err = pli_worker_->start()) != srs_success) { return srs_error_wrap(err, "start pli worker"); @@ -629,44 +627,62 @@ srs_error_t SrsRtcPlayStream::send_packet(SrsRtpPacket2*& pkt) { srs_error_t err = srs_success; - // TODO: FIXME: Maybe refine for performance issue. - if (!audio_tracks_.count(pkt->header.get_ssrc()) && !video_tracks_.count(pkt->header.get_ssrc())) { - srs_warn("RTC: Drop for ssrc %u not found", pkt->header.get_ssrc()); + uint32_t ssrc = pkt->header.get_ssrc(); + + // Try to find track from cache. + SrsRtcSendTrack* track = NULL; + if (cache_ssrc0_ == ssrc) { + track = cache_track0_; + } else if (cache_ssrc1_ == ssrc) { + track = cache_track1_; + } else if (cache_ssrc2_ == ssrc) { + track = cache_track2_; + } + + // Find by original tracks and build fast cache. + if (!track) { + if (pkt->is_audio()) { + map::iterator it = audio_tracks_.find(ssrc); + if (it != audio_tracks_.end()) { + track = it->second; + } + } else { + map::iterator it = video_tracks_.find(ssrc); + if (it != video_tracks_.end()) { + track = it->second; + } + } + + if (track && !cache_ssrc2_) { + if (!cache_ssrc0_) { + cache_ssrc0_ = ssrc; + cache_track0_ = track; + } else if (!cache_ssrc1_) { + cache_ssrc1_ = ssrc; + cache_track1_ = track; + } else if (!cache_ssrc2_) { + cache_ssrc2_ = ssrc; + cache_track2_ = track; + } + } + } + + // Ignore if no track found. + if (!track) { + srs_warn("RTC: Drop for ssrc %u not found", ssrc); return err; } - // For audio, we transcoded AAC to opus in extra payloads. - SrsRtcAudioSendTrack* audio_track = NULL; - SrsRtcVideoSendTrack* video_track = NULL; - if (pkt->is_audio()) { - // TODO: FIXME: Any simple solution? - audio_track = audio_tracks_[pkt->header.get_ssrc()]; - - if ((err = audio_track->on_rtp(pkt)) != srs_success) { - return srs_error_wrap(err, "audio track, SSRC=%u, SEQ=%u", pkt->header.get_ssrc(), pkt->header.get_sequence()); - } - - // TODO: FIXME: Padding audio to the max payload in RTP packets. - } else { - // TODO: FIXME: Any simple solution? - video_track = video_tracks_[pkt->header.get_ssrc()]; - - if ((err = video_track->on_rtp(pkt)) != srs_success) { - return srs_error_wrap(err, "video track, SSRC=%u, SEQ=%u", pkt->header.get_ssrc(), pkt->header.get_sequence()); - } + // Consume packet by track. + if ((err = track->on_rtp(pkt)) != srs_success) { + return srs_error_wrap(err, "audio track, SSRC=%u, SEQ=%u", ssrc, pkt->header.get_sequence()); } // For NACK to handle packet. // @remark Note that the pkt might be set to NULL. if (nack_enabled_) { - if (audio_track) { - if ((err = audio_track->on_nack(&pkt)) != srs_success) { - return srs_error_wrap(err, "on nack"); - } - } else if (video_track) { - if ((err = video_track->on_nack(&pkt)) != srs_success) { - return srs_error_wrap(err, "on nack"); - } + if ((err = track->on_nack(&pkt)) != srs_success) { + return srs_error_wrap(err, "on nack"); } } @@ -702,7 +718,7 @@ void SrsRtcPlayStream::set_all_tracks_status(bool status) srs_trace("RTC: Init tracks %s ok", merged_log.str().c_str()); } -srs_error_t SrsRtcPlayStream::notify(int type, srs_utime_t interval, srs_utime_t tick) +srs_error_t SrsRtcPlayStream::on_timer(srs_utime_t interval) { srs_error_t err = srs_success; @@ -875,8 +891,6 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid) { - timer_ = new SrsHourGlass("publish", this, 100 * SRS_UTIME_MILLISECONDS); - cid_ = cid; is_started = false; session_ = session; @@ -902,6 +916,8 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon SrsRtcPublishStream::~SrsRtcPublishStream() { + _srs_hybrid->timer100ms()->unsubscribe(this); + // TODO: FIXME: Should remove and delete source. if (source) { source->set_publish_stream(NULL); @@ -928,7 +944,6 @@ SrsRtcPublishStream::~SrsRtcPublishStream() } audio_tracks_.clear(); - srs_freep(timer_); srs_freep(pli_worker_); srs_freep(twcc_epp_); srs_freep(pli_epp); @@ -1037,17 +1052,8 @@ srs_error_t SrsRtcPublishStream::start() return err; } - if ((err = timer_->tick(SRS_TICKID_TWCC, 100 * SRS_UTIME_MILLISECONDS)) != srs_success) { - return srs_error_wrap(err, "twcc tick"); - } - - if ((err = timer_->tick(SRS_TICKID_RTCP, 1000 * SRS_UTIME_MILLISECONDS)) != srs_success) { - return srs_error_wrap(err, "rtcp tick"); - } - - if ((err = timer_->start()) != srs_success) { - return srs_error_wrap(err, "start timer"); - } + // For publisher timer, such as TWCC and RR. + _srs_hybrid->timer100ms()->subscribe(this); if ((err = source->on_publish()) != srs_success) { return srs_error_wrap(err, "on publish"); @@ -1512,7 +1518,7 @@ srs_error_t SrsRtcPublishStream::do_request_keyframe(uint32_t ssrc, SrsContextId return err; } -srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utime_t tick) +srs_error_t SrsRtcPublishStream::on_timer(srs_utime_t interval) { srs_error_t err = srs_success; @@ -1522,7 +1528,8 @@ srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utim return err; } - if (type == SRS_TICKID_RTCP) { + // For RR and RRTR. + if (true) { ++_srs_pps_rr->sugar; if ((err = send_rtcp_rr()) != srs_success) { @@ -1536,7 +1543,8 @@ srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utim } } - if (twcc_enabled_ && type == SRS_TICKID_TWCC) { + // For TWCC feedback. + if (twcc_enabled_) { ++_srs_pps_twcc->sugar; // We should not depends on the received packet, @@ -1672,7 +1680,6 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid) req = NULL; cid_ = cid; stat_ = new SrsRtcConnectionStatistic(); - timer_ = new SrsHourGlass("conn", this, 20 * SRS_UTIME_MILLISECONDS); hijacker_ = NULL; sendonly_skt = NULL; @@ -1699,10 +1706,10 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid) SrsRtcConnection::~SrsRtcConnection() { + _srs_hybrid->timer20ms()->unsubscribe(this); + _srs_rtc_manager->unsubscribe(this); - srs_freep(timer_); - // Cleanup publishers. for(map::iterator it = publishers_.begin(); it != publishers_.end(); ++it) { SrsRtcPublishStream* publisher = it->second; @@ -1946,13 +1953,8 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st return srs_error_wrap(err, "init"); } - if ((err = timer_->tick(SRS_TICKID_SEND_NACKS, 20 * SRS_UTIME_MILLISECONDS)) != srs_success) { - return srs_error_wrap(err, "tick nack"); - } - - if ((err = timer_->start()) != srs_success) { - return srs_error_wrap(err, "start timer"); - } + // The RTC connection start a timer, handle nacks. + _srs_hybrid->timer20ms()->subscribe(this); // TODO: FIXME: Support reload. session_timeout = _srs_config->get_rtc_stun_timeout(req->vhost); @@ -2326,25 +2328,23 @@ void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt) sendonly_skt = addr_cache; } -srs_error_t SrsRtcConnection::notify(int type, srs_utime_t interval, srs_utime_t tick) +srs_error_t SrsRtcConnection::on_timer(srs_utime_t interval) { srs_error_t err = srs_success; ++_srs_pps_conn->sugar; // For publisher to send NACK. - if (type == SRS_TICKID_SEND_NACKS) { - // TODO: FIXME: Merge with hybrid system clock. - srs_update_system_time(); + // TODO: FIXME: Merge with hybrid system clock. + srs_update_system_time(); - std::map::iterator it; - for (it = publishers_.begin(); it != publishers_.end(); it++) { - SrsRtcPublishStream* publisher = it->second; + std::map::iterator it; + for (it = publishers_.begin(); it != publishers_.end(); it++) { + SrsRtcPublishStream* publisher = it->second; - if ((err = publisher->check_send_nacks()) != srs_success) { - srs_warn("ignore nack err %s", srs_error_desc(err).c_str()); - srs_freep(err); - } + if ((err = publisher->check_send_nacks()) != srs_success) { + srs_warn("ignore nack err %s", srs_error_desc(err).c_str()); + srs_freep(err); } } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 0f219f58b..ab3d0a4e0 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -65,6 +65,7 @@ class SrsErrorPithyPrint; class SrsPithyPrint; class SrsStatistic; class SrsRtcUserConfig; +class SrsRtcSendTrack; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -211,8 +212,8 @@ public: }; // A RTC play stream, client pull and play stream from SRS. -class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler - , virtual public ISrsHourGlass, virtual public ISrsRtcPLIWorkerHandler, public ISrsRtcStreamChangeCallback +class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler + , public ISrsFastTimer, public ISrsRtcPLIWorkerHandler, public ISrsRtcStreamChangeCallback { private: SrsContextId cid_; @@ -222,12 +223,19 @@ private: private: SrsRequest* req_; SrsRtcStream* source_; - SrsHourGlass* timer_; // key: publish_ssrc, value: send track to process rtp/rtcp std::map audio_tracks_; std::map video_tracks_; // The pithy print for special stage. SrsErrorPithyPrint* nack_epp; +private: + // Fast cache for tracks. + uint32_t cache_ssrc0_; + uint32_t cache_ssrc1_; + uint32_t cache_ssrc2_; + SrsRtcSendTrack* cache_track0_; + SrsRtcSendTrack* cache_track1_; + SrsRtcSendTrack* cache_track2_; private: // For merged-write messages. int mw_msgs; @@ -261,9 +269,9 @@ private: public: // Directly set the status of track, generally for init to set the default value. void set_all_tracks_status(bool status); -// interface ISrsHourGlass -public: - virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); +// interface ISrsFastTimer +private: + srs_error_t on_timer(srs_utime_t interval); public: srs_error_t on_rtcp(SrsRtcpCommon* rtcp); private: @@ -278,12 +286,11 @@ public: }; // A RTC publish stream, client push and publish stream to SRS. -class SrsRtcPublishStream : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler - , virtual public ISrsRtcPublishStream, virtual public ISrsRtcPLIWorkerHandler +class SrsRtcPublishStream : public ISrsFastTimer, public ISrsRtpPacketDecodeHandler + , public ISrsRtcPublishStream, public ISrsRtcPLIWorkerHandler { private: SrsContextId cid_; - SrsHourGlass* timer_; uint64_t nn_audio_frames; SrsRtcPLIWorker* pli_worker_; SrsErrorPithyPrint* twcc_epp_; @@ -346,9 +353,9 @@ private: public: void request_keyframe(uint32_t ssrc); virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid); -// interface ISrsHourGlass -public: - virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); +// interface ISrsFastTimer +private: + srs_error_t on_timer(srs_utime_t interval); public: void simulate_nack_drop(int nn); private: @@ -393,8 +400,7 @@ public: // // For performance, we use non-virtual public from resource, // see https://stackoverflow.com/questions/3747066/c-cannot-convert-from-base-a-to-derived-type-b-via-virtual-base-a -class SrsRtcConnection : public ISrsResource - , virtual public ISrsHourGlass, virtual public ISrsDisposingHandler +class SrsRtcConnection : public ISrsResource, public ISrsFastTimer, public ISrsDisposingHandler { friend class SrsSecurityTransport; friend class SrsRtcPlayStream; @@ -407,7 +413,6 @@ private: SrsRtcServer* server_; SrsRtcConnectionStateType state_; ISrsRtcTransport* transport_; - SrsHourGlass* timer_; private: iovec* cache_iov_; SrsBuffer* cache_buffer_; @@ -506,9 +511,9 @@ public: bool is_alive(); void alive(); void update_sendonly_socket(SrsUdpMuxSocket* skt); -// interface ISrsHourGlass -public: - virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); +// interface ISrsFastTimer +private: + srs_error_t on_timer(srs_utime_t interval); public: // send rtcp srs_error_t send_rtcp(char *data, int nb_data); diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 0f83bc9a6..1fa107b60 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -281,7 +281,7 @@ srs_error_t SrsRtcServer::initialize() srs_error_t err = srs_success; // The RTC server start a timer, do routines of RTC server. - _srs_hybrid->timer()->subscribe(5 * SRS_UTIME_SECONDS, this); + _srs_hybrid->timer5s()->subscribe(this); // Initialize the black hole. if ((err = _srs_blackhole->initialize()) != srs_success) { @@ -633,7 +633,7 @@ SrsRtcConnection* SrsRtcServer::find_session_by_username(const std::string& user return dynamic_cast(conn); } -srs_error_t SrsRtcServer::on_timer(srs_utime_t interval, srs_utime_t tick) +srs_error_t SrsRtcServer::on_timer(srs_utime_t interval) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index ef3064dbb..d302ad64b 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -138,7 +138,7 @@ public: SrsRtcConnection* find_session_by_username(const std::string& ufrag); // interface ISrsFastTimer private: - srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick); + srs_error_t on_timer(srs_utime_t interval); }; // The RTC server adapter. diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 344d42168..8ecabee05 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -346,6 +346,8 @@ SrsRtcStream::SrsRtcStream() req = NULL; bridger_ = NULL; + + pli_for_rtmp_ = pli_elapsed_ = 0; } SrsRtcStream::~SrsRtcStream() @@ -498,8 +500,8 @@ srs_error_t SrsRtcStream::on_publish() } // For SrsRtcStream::on_timer() - srs_utime_t pli_for_rtmp = _srs_config->get_rtc_pli_for_rtmp(req->vhost); - _srs_hybrid->timer()->subscribe(pli_for_rtmp, this); + pli_for_rtmp_ = _srs_config->get_rtc_pli_for_rtmp(req->vhost); + _srs_hybrid->timer100ms()->subscribe(this); } // TODO: FIXME: Handle by statistic. @@ -532,7 +534,7 @@ void SrsRtcStream::on_unpublish() //free bridger resource if (bridger_) { // For SrsRtcStream::on_timer() - _srs_hybrid->timer()->unsubscribe(this); + _srs_hybrid->timer100ms()->unsubscribe(this); bridger_->on_unpublish(); srs_freep(bridger_); @@ -626,7 +628,7 @@ std::vector SrsRtcStream::get_track_desc(std::string ty return track_descs; } -srs_error_t SrsRtcStream::on_timer(srs_utime_t interval, srs_utime_t tick) +srs_error_t SrsRtcStream::on_timer(srs_utime_t interval) { srs_error_t err = srs_success; @@ -634,6 +636,14 @@ srs_error_t SrsRtcStream::on_timer(srs_utime_t interval, srs_utime_t tick) return err; } + pli_elapsed_ += interval; + if (pli_elapsed_ < pli_for_rtmp_) { + return err; + } + + // Request PLI and reset the timer. + pli_elapsed_ = 0; + for (int i = 0; i < (int)stream_desc_->video_track_descs_.size(); i++) { SrsRtcTrackDescription* desc = stream_desc_->video_track_descs_.at(i); publish_stream_->request_keyframe(desc->ssrc_); @@ -1530,7 +1540,7 @@ srs_error_t SrsRtmpFromRtcBridger::packet_video_rtmp(const uint16_t start, const SrsRtpSTAPPayload* stap_payload = dynamic_cast(pkt->payload()); if (stap_payload) { - for (int j = 0; j < stap_payload->nalus.size(); ++j) { + for (int j = 0; j < (int)stap_payload->nalus.size(); ++j) { SrsSample* sample = stap_payload->nalus.at(j); nb_payload += 4 + sample->size; } @@ -1594,7 +1604,7 @@ srs_error_t SrsRtmpFromRtcBridger::packet_video_rtmp(const uint16_t start, const SrsRtpSTAPPayload* stap_payload = dynamic_cast(pkt->payload()); if (stap_payload) { - for (int j = 0; j < stap_payload->nalus.size(); ++j) { + for (int j = 0; j < (int)stap_payload->nalus.size(); ++j) { SrsSample* sample = stap_payload->nalus.at(j); payload.write_4bytes(sample->size); payload.write_bytes(sample->bytes, sample->size); diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index 2543daa6f..19172304b 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -201,6 +201,10 @@ private: bool is_delivering_packets_; // Notify stream event to event handler std::vector event_handlers_; +private: + // The PLI for RTC2RTMP. + srs_utime_t pli_for_rtmp_; + srs_utime_t pli_elapsed_; public: SrsRtcStream(); virtual ~SrsRtcStream(); @@ -252,7 +256,7 @@ public: std::vector get_track_desc(std::string type, std::string media_type); // interface ISrsFastTimer private: - srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick); + srs_error_t on_timer(srs_utime_t interval); }; // A helper class, to release the packet to cache. diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index 90e4226c5..4c287f878 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -26,6 +26,6 @@ #define VERSION_MAJOR 4 #define VERSION_MINOR 0 -#define VERSION_REVISION 100 +#define VERSION_REVISION 102 #endif