1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 03:41:55 +00:00

SquashSRS4: Refine performance for FastTimer

This commit is contained in:
winlin 2021-05-07 11:25:37 +08:00
parent f995bf7ca8
commit fd6c653d3c
12 changed files with 216 additions and 164 deletions

View file

@ -182,6 +182,8 @@ The ports used by SRS:
## V4 changes
* v4.0, 2021-05-07, RTC: Refine play stream find track. 4.0.102
* v4.0, 2021-05-07, RTC: Refine FastTimer to fixed interval. 4.0.101
* v4.0, 2021-05-06, RTC: Fix config bug for nack and twcc. 4.0.99
* v4.0, 2021-05-04, Add video room demo. 4.0.98
* v4.0, 2021-05-03, Add RTC stream merging demo by FFmpeg. 4.0.97

View file

@ -23,6 +23,7 @@
#include <srs_app_hourglass.hpp>
#include <algorithm>
using namespace std;
#include <srs_kernel_error.hpp>
@ -147,71 +148,60 @@ ISrsFastTimer::~ISrsFastTimer()
{
}
SrsFastTimer::SrsFastTimer(std::string label, srs_utime_t resolution)
SrsFastTimer::SrsFastTimer(std::string label, srs_utime_t interval)
{
timer_ = new SrsHourGlass(label, this, resolution);
interval_ = interval;
trd_ = new SrsSTCoroutine(label, this, _srs_context->get_id());
}
SrsFastTimer::~SrsFastTimer()
{
srs_freep(timer_);
srs_freep(trd_);
}
srs_error_t SrsFastTimer::start()
{
srs_error_t err = srs_success;
if ((err = timer_->start()) != srs_success) {
if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
return err;
}
void SrsFastTimer::subscribe(srs_utime_t interval, ISrsFastTimer* timer)
void SrsFastTimer::subscribe(ISrsFastTimer* timer)
{
static int g_event = 0;
int event = g_event++;
// TODO: FIXME: Error leak. Change tick to void in future.
timer_->tick(event, interval);
handlers_[event] = timer;
if (std::find(handlers_.begin(), handlers_.end(), timer) == handlers_.end()) {
handlers_.push_back(timer);
}
}
void SrsFastTimer::unsubscribe(ISrsFastTimer* timer)
{
for (map<int, ISrsFastTimer*>::iterator it = handlers_.begin(); it != handlers_.end();) {
if (it->second != timer) {
++it;
continue;
}
handlers_.erase(it++);
int event = it->first;
timer_->untick(event);
vector<ISrsFastTimer*>::iterator it = std::find(handlers_.begin(), handlers_.end(), timer);
if (it != handlers_.end()) {
handlers_.erase(it);
}
}
srs_error_t SrsFastTimer::notify(int event, srs_utime_t interval, srs_utime_t tick)
srs_error_t SrsFastTimer::cycle()
{
srs_error_t err = srs_success;
for (map<int, ISrsFastTimer*>::iterator it = handlers_.begin(); it != handlers_.end(); ++it) {
ISrsFastTimer* timer = it->second;
if (event != it->first) {
continue;
while (true) {
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "quit");
}
if ((err = timer->on_timer(interval, tick)) != srs_success) {
return srs_error_wrap(err, "tick for event=%d, interval=%dms, tick=%dms",
event, srsu2msi(interval), srsu2msi(tick));
for (int i = 0; i < (int)handlers_.size(); i++) {
ISrsFastTimer* timer = handlers_.at(i);
if ((err = timer->on_timer(interval_)) != srs_success) {
srs_freep(err); // Ignore any error for shared timer.
}
}
break;
srs_usleep(interval_);
}
return err;
@ -225,7 +215,7 @@ SrsClockWallMonitor::~SrsClockWallMonitor()
{
}
srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval, srs_utime_t tick)
srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;

View file

@ -30,6 +30,7 @@
#include <map>
#include <string>
#include <vector>
class SrsCoroutine;
@ -113,28 +114,31 @@ public:
virtual ~ISrsFastTimer();
public:
// Tick when timer is active.
virtual srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick) = 0;
virtual srs_error_t on_timer(srs_utime_t interval) = 0;
};
// The fast timer, shared by objects, for high performance.
// For example, we should never start a timer for each connection or publisher or player,
// instead, we should start only one fast timer in server.
class SrsFastTimer : public ISrsHourGlass
class SrsFastTimer : public ISrsCoroutineHandler
{
private:
SrsHourGlass* timer_;
std::map<int, ISrsFastTimer*> handlers_;
SrsCoroutine* trd_;
srs_utime_t interval_;
std::vector<ISrsFastTimer*> handlers_;
public:
SrsFastTimer(std::string label, srs_utime_t resolution);
SrsFastTimer(std::string label, srs_utime_t interval);
virtual ~SrsFastTimer();
public:
srs_error_t start();
public:
void subscribe(srs_utime_t interval, ISrsFastTimer* timer);
void subscribe(ISrsFastTimer* timer);
void unsubscribe(ISrsFastTimer* timer);
// Interface ISrsHourGlass
// Interface ISrsCoroutineHandler
private:
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
// Cycle the hourglass, which will sleep resolution every time.
// and call handler when ticked.
virtual srs_error_t cycle();
};
// To monitor the system wall clock timer deviation.
@ -145,7 +149,7 @@ public:
virtual ~SrsClockWallMonitor();
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick);
srs_error_t on_timer(srs_utime_t interval);
};
#endif

View file

@ -141,7 +141,8 @@ SrsHybridServer::SrsHybridServer()
{
// Note that the timer depends on other global variables,
// so we MUST never create it in constructor.
timer_ = NULL;
timer20ms_ = NULL;
timer5s_ = NULL;
clock_monitor_ = new SrsClockWallMonitor();
}
@ -149,7 +150,8 @@ SrsHybridServer::SrsHybridServer()
SrsHybridServer::~SrsHybridServer()
{
srs_freep(clock_monitor_);
srs_freep(timer_);
srs_freep(timer20ms_);
srs_freep(timer5s_);
vector<ISrsHybridServer*>::iterator it;
for (it = servers.begin(); it != servers.end(); ++it) {
@ -174,19 +176,33 @@ srs_error_t SrsHybridServer::initialize()
}
// Create global shared timer.
timer_ = new SrsFastTimer("hybrid", 20 * SRS_UTIME_MILLISECONDS);
timer20ms_ = new SrsFastTimer("hybrid", 20 * SRS_UTIME_MILLISECONDS);
timer100ms_ = new SrsFastTimer("hybrid", 100 * SRS_UTIME_MILLISECONDS);
timer1s_ = new SrsFastTimer("hybrid", 1 * SRS_UTIME_SECONDS);
timer5s_ = new SrsFastTimer("hybrid", 5 * SRS_UTIME_SECONDS);
// Start the timer first.
if ((err = timer_->start()) != srs_success) {
if ((err = timer20ms_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
// The hybrid server start a timer, do routines of hybrid server.
timer_->subscribe(5 * SRS_UTIME_SECONDS, this);
if ((err = timer100ms_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
// A monitor to check the clock wall deviation, per clock tick.
timer_->subscribe(20 * SRS_UTIME_MILLISECONDS, clock_monitor_);
if ((err = timer1s_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
if ((err = timer5s_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
// Register some timers.
timer20ms_->subscribe(clock_monitor_);
timer5s_->subscribe(this);
// Initialize all hybrid servers.
vector<ISrsHybridServer*>::iterator it;
for (it = servers.begin(); it != servers.end(); ++it) {
ISrsHybridServer* server = *it;
@ -237,12 +253,27 @@ SrsServerAdapter* SrsHybridServer::srs()
return NULL;
}
SrsFastTimer* SrsHybridServer::timer()
SrsFastTimer* SrsHybridServer::timer20ms()
{
return timer_;
return timer20ms_;
}
srs_error_t SrsHybridServer::on_timer(srs_utime_t interval, srs_utime_t tick)
SrsFastTimer* SrsHybridServer::timer100ms()
{
return timer100ms_;
}
SrsFastTimer* SrsHybridServer::timer1s()
{
return timer1s_;
}
SrsFastTimer* SrsHybridServer::timer5s()
{
return timer5s_;
}
srs_error_t SrsHybridServer::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;

View file

@ -53,7 +53,10 @@ class SrsHybridServer : public ISrsFastTimer
{
private:
std::vector<ISrsHybridServer*> servers;
SrsFastTimer* timer_;
SrsFastTimer* timer20ms_;
SrsFastTimer* timer100ms_;
SrsFastTimer* timer1s_;
SrsFastTimer* timer5s_;
SrsClockWallMonitor* clock_monitor_;
public:
SrsHybridServer();
@ -66,10 +69,13 @@ public:
virtual void stop();
public:
virtual SrsServerAdapter* srs();
SrsFastTimer* timer();
SrsFastTimer* timer20ms();
SrsFastTimer* timer100ms();
SrsFastTimer* timer1s();
SrsFastTimer* timer5s();
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick);
srs_error_t on_timer(srs_utime_t interval);
};
extern SrsHybridServer* _srs_hybrid;

View file

@ -76,10 +76,6 @@ extern SrsPps* _srs_pps_snack2;
extern SrsPps* _srs_pps_rnack;
extern SrsPps* _srs_pps_rnack2;
#define SRS_TICKID_RTCP 0
#define SRS_TICKID_TWCC 1
#define SRS_TICKID_SEND_NACKS 2
ISrsRtcTransport::ISrsRtcTransport()
{
}
@ -384,13 +380,17 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid)
nack_no_copy_ = false;
_srs_config->subscribe(this);
timer_ = new SrsHourGlass("play", this, 1000 * SRS_UTIME_MILLISECONDS);
nack_epp = new SrsErrorPithyPrint();
pli_worker_ = new SrsRtcPLIWorker(this);
cache_ssrc0_ = cache_ssrc1_ = cache_ssrc2_ = 0;
cache_track0_ = cache_track1_ = cache_track2_ = NULL;
}
SrsRtcPlayStream::~SrsRtcPlayStream()
{
_srs_hybrid->timer1s()->unsubscribe(this);
// TODO: FIXME: Should not do callback in de-constructor?
if (_srs_rtc_hijacker) {
_srs_rtc_hijacker->on_stop_play(session_, this, req_);
@ -401,7 +401,6 @@ SrsRtcPlayStream::~SrsRtcPlayStream()
srs_freep(nack_epp);
srs_freep(pli_worker_);
srs_freep(trd_);
srs_freep(timer_);
srs_freep(req_);
if (true) {
@ -532,9 +531,8 @@ srs_error_t SrsRtcPlayStream::start()
return srs_error_wrap(err, "rtc_sender");
}
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
// The timer for play, process TWCC in the future.
_srs_hybrid->timer1s()->subscribe(this);
if ((err = pli_worker_->start()) != srs_success) {
return srs_error_wrap(err, "start pli worker");
@ -629,44 +627,62 @@ srs_error_t SrsRtcPlayStream::send_packet(SrsRtpPacket2*& pkt)
{
srs_error_t err = srs_success;
// TODO: FIXME: Maybe refine for performance issue.
if (!audio_tracks_.count(pkt->header.get_ssrc()) && !video_tracks_.count(pkt->header.get_ssrc())) {
srs_warn("RTC: Drop for ssrc %u not found", pkt->header.get_ssrc());
uint32_t ssrc = pkt->header.get_ssrc();
// Try to find track from cache.
SrsRtcSendTrack* track = NULL;
if (cache_ssrc0_ == ssrc) {
track = cache_track0_;
} else if (cache_ssrc1_ == ssrc) {
track = cache_track1_;
} else if (cache_ssrc2_ == ssrc) {
track = cache_track2_;
}
// Find by original tracks and build fast cache.
if (!track) {
if (pkt->is_audio()) {
map<uint32_t, SrsRtcAudioSendTrack*>::iterator it = audio_tracks_.find(ssrc);
if (it != audio_tracks_.end()) {
track = it->second;
}
} else {
map<uint32_t, SrsRtcVideoSendTrack*>::iterator it = video_tracks_.find(ssrc);
if (it != video_tracks_.end()) {
track = it->second;
}
}
if (track && !cache_ssrc2_) {
if (!cache_ssrc0_) {
cache_ssrc0_ = ssrc;
cache_track0_ = track;
} else if (!cache_ssrc1_) {
cache_ssrc1_ = ssrc;
cache_track1_ = track;
} else if (!cache_ssrc2_) {
cache_ssrc2_ = ssrc;
cache_track2_ = track;
}
}
}
// Ignore if no track found.
if (!track) {
srs_warn("RTC: Drop for ssrc %u not found", ssrc);
return err;
}
// For audio, we transcoded AAC to opus in extra payloads.
SrsRtcAudioSendTrack* audio_track = NULL;
SrsRtcVideoSendTrack* video_track = NULL;
if (pkt->is_audio()) {
// TODO: FIXME: Any simple solution?
audio_track = audio_tracks_[pkt->header.get_ssrc()];
if ((err = audio_track->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "audio track, SSRC=%u, SEQ=%u", pkt->header.get_ssrc(), pkt->header.get_sequence());
}
// TODO: FIXME: Padding audio to the max payload in RTP packets.
} else {
// TODO: FIXME: Any simple solution?
video_track = video_tracks_[pkt->header.get_ssrc()];
if ((err = video_track->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "video track, SSRC=%u, SEQ=%u", pkt->header.get_ssrc(), pkt->header.get_sequence());
}
// Consume packet by track.
if ((err = track->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "audio track, SSRC=%u, SEQ=%u", ssrc, pkt->header.get_sequence());
}
// For NACK to handle packet.
// @remark Note that the pkt might be set to NULL.
if (nack_enabled_) {
if (audio_track) {
if ((err = audio_track->on_nack(&pkt)) != srs_success) {
return srs_error_wrap(err, "on nack");
}
} else if (video_track) {
if ((err = video_track->on_nack(&pkt)) != srs_success) {
return srs_error_wrap(err, "on nack");
}
if ((err = track->on_nack(&pkt)) != srs_success) {
return srs_error_wrap(err, "on nack");
}
}
@ -702,7 +718,7 @@ void SrsRtcPlayStream::set_all_tracks_status(bool status)
srs_trace("RTC: Init tracks %s ok", merged_log.str().c_str());
}
srs_error_t SrsRtcPlayStream::notify(int type, srs_utime_t interval, srs_utime_t tick)
srs_error_t SrsRtcPlayStream::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;
@ -875,8 +891,6 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci
SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid)
{
timer_ = new SrsHourGlass("publish", this, 100 * SRS_UTIME_MILLISECONDS);
cid_ = cid;
is_started = false;
session_ = session;
@ -902,6 +916,8 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon
SrsRtcPublishStream::~SrsRtcPublishStream()
{
_srs_hybrid->timer100ms()->unsubscribe(this);
// TODO: FIXME: Should remove and delete source.
if (source) {
source->set_publish_stream(NULL);
@ -928,7 +944,6 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
}
audio_tracks_.clear();
srs_freep(timer_);
srs_freep(pli_worker_);
srs_freep(twcc_epp_);
srs_freep(pli_epp);
@ -1037,17 +1052,8 @@ srs_error_t SrsRtcPublishStream::start()
return err;
}
if ((err = timer_->tick(SRS_TICKID_TWCC, 100 * SRS_UTIME_MILLISECONDS)) != srs_success) {
return srs_error_wrap(err, "twcc tick");
}
if ((err = timer_->tick(SRS_TICKID_RTCP, 1000 * SRS_UTIME_MILLISECONDS)) != srs_success) {
return srs_error_wrap(err, "rtcp tick");
}
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
// For publisher timer, such as TWCC and RR.
_srs_hybrid->timer100ms()->subscribe(this);
if ((err = source->on_publish()) != srs_success) {
return srs_error_wrap(err, "on publish");
@ -1512,7 +1518,7 @@ srs_error_t SrsRtcPublishStream::do_request_keyframe(uint32_t ssrc, SrsContextId
return err;
}
srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utime_t tick)
srs_error_t SrsRtcPublishStream::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;
@ -1522,7 +1528,8 @@ srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utim
return err;
}
if (type == SRS_TICKID_RTCP) {
// For RR and RRTR.
if (true) {
++_srs_pps_rr->sugar;
if ((err = send_rtcp_rr()) != srs_success) {
@ -1536,7 +1543,8 @@ srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utim
}
}
if (twcc_enabled_ && type == SRS_TICKID_TWCC) {
// For TWCC feedback.
if (twcc_enabled_) {
++_srs_pps_twcc->sugar;
// We should not depends on the received packet,
@ -1672,7 +1680,6 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid)
req = NULL;
cid_ = cid;
stat_ = new SrsRtcConnectionStatistic();
timer_ = new SrsHourGlass("conn", this, 20 * SRS_UTIME_MILLISECONDS);
hijacker_ = NULL;
sendonly_skt = NULL;
@ -1699,10 +1706,10 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid)
SrsRtcConnection::~SrsRtcConnection()
{
_srs_hybrid->timer20ms()->unsubscribe(this);
_srs_rtc_manager->unsubscribe(this);
srs_freep(timer_);
// Cleanup publishers.
for(map<string, SrsRtcPublishStream*>::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
SrsRtcPublishStream* publisher = it->second;
@ -1946,13 +1953,8 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st
return srs_error_wrap(err, "init");
}
if ((err = timer_->tick(SRS_TICKID_SEND_NACKS, 20 * SRS_UTIME_MILLISECONDS)) != srs_success) {
return srs_error_wrap(err, "tick nack");
}
if ((err = timer_->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
// The RTC connection start a timer, handle nacks.
_srs_hybrid->timer20ms()->subscribe(this);
// TODO: FIXME: Support reload.
session_timeout = _srs_config->get_rtc_stun_timeout(req->vhost);
@ -2326,25 +2328,23 @@ void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt)
sendonly_skt = addr_cache;
}
srs_error_t SrsRtcConnection::notify(int type, srs_utime_t interval, srs_utime_t tick)
srs_error_t SrsRtcConnection::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;
++_srs_pps_conn->sugar;
// For publisher to send NACK.
if (type == SRS_TICKID_SEND_NACKS) {
// TODO: FIXME: Merge with hybrid system clock.
srs_update_system_time();
// TODO: FIXME: Merge with hybrid system clock.
srs_update_system_time();
std::map<std::string, SrsRtcPublishStream*>::iterator it;
for (it = publishers_.begin(); it != publishers_.end(); it++) {
SrsRtcPublishStream* publisher = it->second;
std::map<std::string, SrsRtcPublishStream*>::iterator it;
for (it = publishers_.begin(); it != publishers_.end(); it++) {
SrsRtcPublishStream* publisher = it->second;
if ((err = publisher->check_send_nacks()) != srs_success) {
srs_warn("ignore nack err %s", srs_error_desc(err).c_str());
srs_freep(err);
}
if ((err = publisher->check_send_nacks()) != srs_success) {
srs_warn("ignore nack err %s", srs_error_desc(err).c_str());
srs_freep(err);
}
}

View file

@ -65,6 +65,7 @@ class SrsErrorPithyPrint;
class SrsPithyPrint;
class SrsStatistic;
class SrsRtcUserConfig;
class SrsRtcSendTrack;
const uint8_t kSR = 200;
const uint8_t kRR = 201;
@ -211,8 +212,8 @@ public:
};
// A RTC play stream, client pull and play stream from SRS.
class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
, virtual public ISrsHourGlass, virtual public ISrsRtcPLIWorkerHandler, public ISrsRtcStreamChangeCallback
class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler
, public ISrsFastTimer, public ISrsRtcPLIWorkerHandler, public ISrsRtcStreamChangeCallback
{
private:
SrsContextId cid_;
@ -222,12 +223,19 @@ private:
private:
SrsRequest* req_;
SrsRtcStream* source_;
SrsHourGlass* timer_;
// key: publish_ssrc, value: send track to process rtp/rtcp
std::map<uint32_t, SrsRtcAudioSendTrack*> audio_tracks_;
std::map<uint32_t, SrsRtcVideoSendTrack*> video_tracks_;
// The pithy print for special stage.
SrsErrorPithyPrint* nack_epp;
private:
// Fast cache for tracks.
uint32_t cache_ssrc0_;
uint32_t cache_ssrc1_;
uint32_t cache_ssrc2_;
SrsRtcSendTrack* cache_track0_;
SrsRtcSendTrack* cache_track1_;
SrsRtcSendTrack* cache_track2_;
private:
// For merged-write messages.
int mw_msgs;
@ -261,9 +269,9 @@ private:
public:
// Directly set the status of track, generally for init to set the default value.
void set_all_tracks_status(bool status);
// interface ISrsHourGlass
public:
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick);
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval);
public:
srs_error_t on_rtcp(SrsRtcpCommon* rtcp);
private:
@ -278,12 +286,11 @@ public:
};
// A RTC publish stream, client push and publish stream to SRS.
class SrsRtcPublishStream : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler
, virtual public ISrsRtcPublishStream, virtual public ISrsRtcPLIWorkerHandler
class SrsRtcPublishStream : public ISrsFastTimer, public ISrsRtpPacketDecodeHandler
, public ISrsRtcPublishStream, public ISrsRtcPLIWorkerHandler
{
private:
SrsContextId cid_;
SrsHourGlass* timer_;
uint64_t nn_audio_frames;
SrsRtcPLIWorker* pli_worker_;
SrsErrorPithyPrint* twcc_epp_;
@ -346,9 +353,9 @@ private:
public:
void request_keyframe(uint32_t ssrc);
virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid);
// interface ISrsHourGlass
public:
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick);
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval);
public:
void simulate_nack_drop(int nn);
private:
@ -393,8 +400,7 @@ public:
//
// For performance, we use non-virtual public from resource,
// see https://stackoverflow.com/questions/3747066/c-cannot-convert-from-base-a-to-derived-type-b-via-virtual-base-a
class SrsRtcConnection : public ISrsResource
, virtual public ISrsHourGlass, virtual public ISrsDisposingHandler
class SrsRtcConnection : public ISrsResource, public ISrsFastTimer, public ISrsDisposingHandler
{
friend class SrsSecurityTransport;
friend class SrsRtcPlayStream;
@ -407,7 +413,6 @@ private:
SrsRtcServer* server_;
SrsRtcConnectionStateType state_;
ISrsRtcTransport* transport_;
SrsHourGlass* timer_;
private:
iovec* cache_iov_;
SrsBuffer* cache_buffer_;
@ -506,9 +511,9 @@ public:
bool is_alive();
void alive();
void update_sendonly_socket(SrsUdpMuxSocket* skt);
// interface ISrsHourGlass
public:
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick);
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval);
public:
// send rtcp
srs_error_t send_rtcp(char *data, int nb_data);

View file

@ -281,7 +281,7 @@ srs_error_t SrsRtcServer::initialize()
srs_error_t err = srs_success;
// The RTC server start a timer, do routines of RTC server.
_srs_hybrid->timer()->subscribe(5 * SRS_UTIME_SECONDS, this);
_srs_hybrid->timer5s()->subscribe(this);
// Initialize the black hole.
if ((err = _srs_blackhole->initialize()) != srs_success) {
@ -633,7 +633,7 @@ SrsRtcConnection* SrsRtcServer::find_session_by_username(const std::string& user
return dynamic_cast<SrsRtcConnection*>(conn);
}
srs_error_t SrsRtcServer::on_timer(srs_utime_t interval, srs_utime_t tick)
srs_error_t SrsRtcServer::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;

View file

@ -138,7 +138,7 @@ public:
SrsRtcConnection* find_session_by_username(const std::string& ufrag);
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick);
srs_error_t on_timer(srs_utime_t interval);
};
// The RTC server adapter.

View file

@ -346,6 +346,8 @@ SrsRtcStream::SrsRtcStream()
req = NULL;
bridger_ = NULL;
pli_for_rtmp_ = pli_elapsed_ = 0;
}
SrsRtcStream::~SrsRtcStream()
@ -498,8 +500,8 @@ srs_error_t SrsRtcStream::on_publish()
}
// For SrsRtcStream::on_timer()
srs_utime_t pli_for_rtmp = _srs_config->get_rtc_pli_for_rtmp(req->vhost);
_srs_hybrid->timer()->subscribe(pli_for_rtmp, this);
pli_for_rtmp_ = _srs_config->get_rtc_pli_for_rtmp(req->vhost);
_srs_hybrid->timer100ms()->subscribe(this);
}
// TODO: FIXME: Handle by statistic.
@ -532,7 +534,7 @@ void SrsRtcStream::on_unpublish()
//free bridger resource
if (bridger_) {
// For SrsRtcStream::on_timer()
_srs_hybrid->timer()->unsubscribe(this);
_srs_hybrid->timer100ms()->unsubscribe(this);
bridger_->on_unpublish();
srs_freep(bridger_);
@ -626,7 +628,7 @@ std::vector<SrsRtcTrackDescription*> SrsRtcStream::get_track_desc(std::string ty
return track_descs;
}
srs_error_t SrsRtcStream::on_timer(srs_utime_t interval, srs_utime_t tick)
srs_error_t SrsRtcStream::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;
@ -634,6 +636,14 @@ srs_error_t SrsRtcStream::on_timer(srs_utime_t interval, srs_utime_t tick)
return err;
}
pli_elapsed_ += interval;
if (pli_elapsed_ < pli_for_rtmp_) {
return err;
}
// Request PLI and reset the timer.
pli_elapsed_ = 0;
for (int i = 0; i < (int)stream_desc_->video_track_descs_.size(); i++) {
SrsRtcTrackDescription* desc = stream_desc_->video_track_descs_.at(i);
publish_stream_->request_keyframe(desc->ssrc_);
@ -1530,7 +1540,7 @@ srs_error_t SrsRtmpFromRtcBridger::packet_video_rtmp(const uint16_t start, const
SrsRtpSTAPPayload* stap_payload = dynamic_cast<SrsRtpSTAPPayload*>(pkt->payload());
if (stap_payload) {
for (int j = 0; j < stap_payload->nalus.size(); ++j) {
for (int j = 0; j < (int)stap_payload->nalus.size(); ++j) {
SrsSample* sample = stap_payload->nalus.at(j);
nb_payload += 4 + sample->size;
}
@ -1594,7 +1604,7 @@ srs_error_t SrsRtmpFromRtcBridger::packet_video_rtmp(const uint16_t start, const
SrsRtpSTAPPayload* stap_payload = dynamic_cast<SrsRtpSTAPPayload*>(pkt->payload());
if (stap_payload) {
for (int j = 0; j < stap_payload->nalus.size(); ++j) {
for (int j = 0; j < (int)stap_payload->nalus.size(); ++j) {
SrsSample* sample = stap_payload->nalus.at(j);
payload.write_4bytes(sample->size);
payload.write_bytes(sample->bytes, sample->size);

View file

@ -201,6 +201,10 @@ private:
bool is_delivering_packets_;
// Notify stream event to event handler
std::vector<ISrsRtcStreamEventHandler*> event_handlers_;
private:
// The PLI for RTC2RTMP.
srs_utime_t pli_for_rtmp_;
srs_utime_t pli_elapsed_;
public:
SrsRtcStream();
virtual ~SrsRtcStream();
@ -252,7 +256,7 @@ public:
std::vector<SrsRtcTrackDescription*> get_track_desc(std::string type, std::string media_type);
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick);
srs_error_t on_timer(srs_utime_t interval);
};
// A helper class, to release the packet to cache.

View file

@ -26,6 +26,6 @@
#define VERSION_MAJOR 4
#define VERSION_MINOR 0
#define VERSION_REVISION 100
#define VERSION_REVISION 102
#endif