mirror of
https://github.com/ossrs/srs.git
synced 2025-02-15 04:42:04 +00:00
RTC: Refine FastTimer to fixed interval. 4.0.101
This commit is contained in:
parent
0c0b467917
commit
b823dcdfd7
12 changed files with 155 additions and 131 deletions
|
@ -176,6 +176,7 @@ The ports used by SRS:
|
||||||
|
|
||||||
## V4 changes
|
## V4 changes
|
||||||
|
|
||||||
|
* v4.0, 2021-05-07, RTC: Refine FastTimer to fixed interval. 4.0.101
|
||||||
* v4.0, 2021-05-06, RTC: Fix config bug for nack and twcc. 4.0.99
|
* v4.0, 2021-05-06, RTC: Fix config bug for nack and twcc. 4.0.99
|
||||||
* v4.0, 2021-05-04, Add video room demo. 4.0.98
|
* v4.0, 2021-05-04, Add video room demo. 4.0.98
|
||||||
* v4.0, 2021-05-03, Add RTC stream merging demo by FFmpeg. 4.0.97
|
* v4.0, 2021-05-03, Add RTC stream merging demo by FFmpeg. 4.0.97
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
|
|
||||||
#include <srs_app_hourglass.hpp>
|
#include <srs_app_hourglass.hpp>
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
#include <srs_kernel_error.hpp>
|
#include <srs_kernel_error.hpp>
|
||||||
|
@ -147,71 +148,60 @@ ISrsFastTimer::~ISrsFastTimer()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsFastTimer::SrsFastTimer(std::string label, srs_utime_t resolution)
|
SrsFastTimer::SrsFastTimer(std::string label, srs_utime_t interval)
|
||||||
{
|
{
|
||||||
timer_ = new SrsHourGlass(label, this, resolution);
|
interval_ = interval;
|
||||||
|
trd_ = new SrsSTCoroutine(label, this, _srs_context->get_id());
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsFastTimer::~SrsFastTimer()
|
SrsFastTimer::~SrsFastTimer()
|
||||||
{
|
{
|
||||||
srs_freep(timer_);
|
srs_freep(trd_);
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsFastTimer::start()
|
srs_error_t SrsFastTimer::start()
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
if ((err = timer_->start()) != srs_success) {
|
if ((err = trd_->start()) != srs_success) {
|
||||||
return srs_error_wrap(err, "start timer");
|
return srs_error_wrap(err, "start timer");
|
||||||
}
|
}
|
||||||
|
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
void SrsFastTimer::subscribe(srs_utime_t interval, ISrsFastTimer* timer)
|
void SrsFastTimer::subscribe(ISrsFastTimer* timer)
|
||||||
{
|
{
|
||||||
static int g_event = 0;
|
if (std::find(handlers_.begin(), handlers_.end(), timer) == handlers_.end()) {
|
||||||
|
handlers_.push_back(timer);
|
||||||
int event = g_event++;
|
}
|
||||||
|
|
||||||
// TODO: FIXME: Error leak. Change tick to void in future.
|
|
||||||
timer_->tick(event, interval);
|
|
||||||
|
|
||||||
handlers_[event] = timer;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void SrsFastTimer::unsubscribe(ISrsFastTimer* timer)
|
void SrsFastTimer::unsubscribe(ISrsFastTimer* timer)
|
||||||
{
|
{
|
||||||
for (map<int, ISrsFastTimer*>::iterator it = handlers_.begin(); it != handlers_.end();) {
|
vector<ISrsFastTimer*>::iterator it = std::find(handlers_.begin(), handlers_.end(), timer);
|
||||||
if (it->second != timer) {
|
if (it != handlers_.end()) {
|
||||||
++it;
|
handlers_.erase(it);
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
handlers_.erase(it++);
|
|
||||||
|
|
||||||
int event = it->first;
|
|
||||||
timer_->untick(event);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsFastTimer::notify(int event, srs_utime_t interval, srs_utime_t tick)
|
srs_error_t SrsFastTimer::cycle()
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
for (map<int, ISrsFastTimer*>::iterator it = handlers_.begin(); it != handlers_.end(); ++it) {
|
while (true) {
|
||||||
ISrsFastTimer* timer = it->second;
|
if ((err = trd_->pull()) != srs_success) {
|
||||||
|
return srs_error_wrap(err, "quit");
|
||||||
if (event != it->first) {
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((err = timer->on_timer(interval, tick)) != srs_success) {
|
for (int i = 0; i < (int)handlers_.size(); i++) {
|
||||||
return srs_error_wrap(err, "tick for event=%d, interval=%dms, tick=%dms",
|
ISrsFastTimer* timer = handlers_.at(i);
|
||||||
event, srsu2msi(interval), srsu2msi(tick));
|
if ((err = timer->on_timer(interval_)) != srs_success) {
|
||||||
|
srs_freep(err); // Ignore any error for shared timer.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
srs_usleep(interval_);
|
||||||
}
|
}
|
||||||
|
|
||||||
return err;
|
return err;
|
||||||
|
@ -225,7 +215,7 @@ SrsClockWallMonitor::~SrsClockWallMonitor()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval, srs_utime_t tick)
|
srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval)
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,7 @@
|
||||||
|
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
class SrsCoroutine;
|
class SrsCoroutine;
|
||||||
|
|
||||||
|
@ -112,28 +113,31 @@ public:
|
||||||
virtual ~ISrsFastTimer();
|
virtual ~ISrsFastTimer();
|
||||||
public:
|
public:
|
||||||
// Tick when timer is active.
|
// Tick when timer is active.
|
||||||
virtual srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick) = 0;
|
virtual srs_error_t on_timer(srs_utime_t interval) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
// The fast timer, shared by objects, for high performance.
|
// The fast timer, shared by objects, for high performance.
|
||||||
// For example, we should never start a timer for each connection or publisher or player,
|
// For example, we should never start a timer for each connection or publisher or player,
|
||||||
// instead, we should start only one fast timer in server.
|
// instead, we should start only one fast timer in server.
|
||||||
class SrsFastTimer : public ISrsHourGlass
|
class SrsFastTimer : public ISrsCoroutineHandler
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
SrsHourGlass* timer_;
|
SrsCoroutine* trd_;
|
||||||
std::map<int, ISrsFastTimer*> handlers_;
|
srs_utime_t interval_;
|
||||||
|
std::vector<ISrsFastTimer*> handlers_;
|
||||||
public:
|
public:
|
||||||
SrsFastTimer(std::string label, srs_utime_t resolution);
|
SrsFastTimer(std::string label, srs_utime_t interval);
|
||||||
virtual ~SrsFastTimer();
|
virtual ~SrsFastTimer();
|
||||||
public:
|
public:
|
||||||
srs_error_t start();
|
srs_error_t start();
|
||||||
public:
|
public:
|
||||||
void subscribe(srs_utime_t interval, ISrsFastTimer* timer);
|
void subscribe(ISrsFastTimer* timer);
|
||||||
void unsubscribe(ISrsFastTimer* timer);
|
void unsubscribe(ISrsFastTimer* timer);
|
||||||
// Interface ISrsHourGlass
|
// Interface ISrsCoroutineHandler
|
||||||
private:
|
private:
|
||||||
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
|
// Cycle the hourglass, which will sleep resolution every time.
|
||||||
|
// and call handler when ticked.
|
||||||
|
virtual srs_error_t cycle();
|
||||||
};
|
};
|
||||||
|
|
||||||
// To monitor the system wall clock timer deviation.
|
// To monitor the system wall clock timer deviation.
|
||||||
|
@ -144,7 +148,7 @@ public:
|
||||||
virtual ~SrsClockWallMonitor();
|
virtual ~SrsClockWallMonitor();
|
||||||
// interface ISrsFastTimer
|
// interface ISrsFastTimer
|
||||||
private:
|
private:
|
||||||
srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick);
|
srs_error_t on_timer(srs_utime_t interval);
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -210,7 +210,8 @@ SrsHybridServer::SrsHybridServer()
|
||||||
{
|
{
|
||||||
// Note that the timer depends on other global variables,
|
// Note that the timer depends on other global variables,
|
||||||
// so we MUST never create it in constructor.
|
// so we MUST never create it in constructor.
|
||||||
timer_ = NULL;
|
timer20ms_ = NULL;
|
||||||
|
timer5s_ = NULL;
|
||||||
|
|
||||||
clock_monitor_ = new SrsClockWallMonitor();
|
clock_monitor_ = new SrsClockWallMonitor();
|
||||||
}
|
}
|
||||||
|
@ -218,7 +219,8 @@ SrsHybridServer::SrsHybridServer()
|
||||||
SrsHybridServer::~SrsHybridServer()
|
SrsHybridServer::~SrsHybridServer()
|
||||||
{
|
{
|
||||||
srs_freep(clock_monitor_);
|
srs_freep(clock_monitor_);
|
||||||
srs_freep(timer_);
|
srs_freep(timer20ms_);
|
||||||
|
srs_freep(timer5s_);
|
||||||
|
|
||||||
vector<ISrsHybridServer*>::iterator it;
|
vector<ISrsHybridServer*>::iterator it;
|
||||||
for (it = servers.begin(); it != servers.end(); ++it) {
|
for (it = servers.begin(); it != servers.end(); ++it) {
|
||||||
|
@ -243,19 +245,33 @@ srs_error_t SrsHybridServer::initialize()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create global shared timer.
|
// Create global shared timer.
|
||||||
timer_ = new SrsFastTimer("hybrid", 20 * SRS_UTIME_MILLISECONDS);
|
timer20ms_ = new SrsFastTimer("hybrid", 20 * SRS_UTIME_MILLISECONDS);
|
||||||
|
timer100ms_ = new SrsFastTimer("hybrid", 100 * SRS_UTIME_MILLISECONDS);
|
||||||
|
timer1s_ = new SrsFastTimer("hybrid", 1 * SRS_UTIME_SECONDS);
|
||||||
|
timer5s_ = new SrsFastTimer("hybrid", 5 * SRS_UTIME_SECONDS);
|
||||||
|
|
||||||
// Start the timer first.
|
// Start the timer first.
|
||||||
if ((err = timer_->start()) != srs_success) {
|
if ((err = timer20ms_->start()) != srs_success) {
|
||||||
return srs_error_wrap(err, "start timer");
|
return srs_error_wrap(err, "start timer");
|
||||||
}
|
}
|
||||||
|
|
||||||
// The hybrid server start a timer, do routines of hybrid server.
|
if ((err = timer100ms_->start()) != srs_success) {
|
||||||
timer_->subscribe(5 * SRS_UTIME_SECONDS, this);
|
return srs_error_wrap(err, "start timer");
|
||||||
|
}
|
||||||
|
|
||||||
// A monitor to check the clock wall deviation, per clock tick.
|
if ((err = timer1s_->start()) != srs_success) {
|
||||||
timer_->subscribe(20 * SRS_UTIME_MILLISECONDS, clock_monitor_);
|
return srs_error_wrap(err, "start timer");
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((err = timer5s_->start()) != srs_success) {
|
||||||
|
return srs_error_wrap(err, "start timer");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register some timers.
|
||||||
|
timer20ms_->subscribe(clock_monitor_);
|
||||||
|
timer5s_->subscribe(this);
|
||||||
|
|
||||||
|
// Initialize all hybrid servers.
|
||||||
vector<ISrsHybridServer*>::iterator it;
|
vector<ISrsHybridServer*>::iterator it;
|
||||||
for (it = servers.begin(); it != servers.end(); ++it) {
|
for (it = servers.begin(); it != servers.end(); ++it) {
|
||||||
ISrsHybridServer* server = *it;
|
ISrsHybridServer* server = *it;
|
||||||
|
@ -306,12 +322,27 @@ SrsServerAdapter* SrsHybridServer::srs()
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsFastTimer* SrsHybridServer::timer()
|
SrsFastTimer* SrsHybridServer::timer20ms()
|
||||||
{
|
{
|
||||||
return timer_;
|
return timer20ms_;
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsHybridServer::on_timer(srs_utime_t interval, srs_utime_t tick)
|
SrsFastTimer* SrsHybridServer::timer100ms()
|
||||||
|
{
|
||||||
|
return timer100ms_;
|
||||||
|
}
|
||||||
|
|
||||||
|
SrsFastTimer* SrsHybridServer::timer1s()
|
||||||
|
{
|
||||||
|
return timer1s_;
|
||||||
|
}
|
||||||
|
|
||||||
|
SrsFastTimer* SrsHybridServer::timer5s()
|
||||||
|
{
|
||||||
|
return timer5s_;
|
||||||
|
}
|
||||||
|
|
||||||
|
srs_error_t SrsHybridServer::on_timer(srs_utime_t interval)
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
|
|
@ -68,7 +68,10 @@ class SrsHybridServer : public ISrsFastTimer
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
std::vector<ISrsHybridServer*> servers;
|
std::vector<ISrsHybridServer*> servers;
|
||||||
SrsFastTimer* timer_;
|
SrsFastTimer* timer20ms_;
|
||||||
|
SrsFastTimer* timer100ms_;
|
||||||
|
SrsFastTimer* timer1s_;
|
||||||
|
SrsFastTimer* timer5s_;
|
||||||
SrsClockWallMonitor* clock_monitor_;
|
SrsClockWallMonitor* clock_monitor_;
|
||||||
public:
|
public:
|
||||||
SrsHybridServer();
|
SrsHybridServer();
|
||||||
|
@ -81,10 +84,13 @@ public:
|
||||||
virtual void stop();
|
virtual void stop();
|
||||||
public:
|
public:
|
||||||
virtual SrsServerAdapter* srs();
|
virtual SrsServerAdapter* srs();
|
||||||
SrsFastTimer* timer();
|
SrsFastTimer* timer20ms();
|
||||||
|
SrsFastTimer* timer100ms();
|
||||||
|
SrsFastTimer* timer1s();
|
||||||
|
SrsFastTimer* timer5s();
|
||||||
// interface ISrsFastTimer
|
// interface ISrsFastTimer
|
||||||
private:
|
private:
|
||||||
srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick);
|
srs_error_t on_timer(srs_utime_t interval);
|
||||||
};
|
};
|
||||||
|
|
||||||
extern SrsHybridServer* _srs_hybrid;
|
extern SrsHybridServer* _srs_hybrid;
|
||||||
|
|
|
@ -76,10 +76,6 @@ extern SrsPps* _srs_pps_snack2;
|
||||||
extern SrsPps* _srs_pps_rnack;
|
extern SrsPps* _srs_pps_rnack;
|
||||||
extern SrsPps* _srs_pps_rnack2;
|
extern SrsPps* _srs_pps_rnack2;
|
||||||
|
|
||||||
#define SRS_TICKID_RTCP 0
|
|
||||||
#define SRS_TICKID_TWCC 1
|
|
||||||
#define SRS_TICKID_SEND_NACKS 2
|
|
||||||
|
|
||||||
ISrsRtcTransport::ISrsRtcTransport()
|
ISrsRtcTransport::ISrsRtcTransport()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
@ -384,13 +380,14 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid)
|
||||||
nack_no_copy_ = false;
|
nack_no_copy_ = false;
|
||||||
|
|
||||||
_srs_config->subscribe(this);
|
_srs_config->subscribe(this);
|
||||||
timer_ = new SrsHourGlass("play", this, 1000 * SRS_UTIME_MILLISECONDS);
|
|
||||||
nack_epp = new SrsErrorPithyPrint();
|
nack_epp = new SrsErrorPithyPrint();
|
||||||
pli_worker_ = new SrsRtcPLIWorker(this);
|
pli_worker_ = new SrsRtcPLIWorker(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsRtcPlayStream::~SrsRtcPlayStream()
|
SrsRtcPlayStream::~SrsRtcPlayStream()
|
||||||
{
|
{
|
||||||
|
_srs_hybrid->timer1s()->unsubscribe(this);
|
||||||
|
|
||||||
// TODO: FIXME: Should not do callback in de-constructor?
|
// TODO: FIXME: Should not do callback in de-constructor?
|
||||||
if (_srs_rtc_hijacker) {
|
if (_srs_rtc_hijacker) {
|
||||||
_srs_rtc_hijacker->on_stop_play(session_, this, req_);
|
_srs_rtc_hijacker->on_stop_play(session_, this, req_);
|
||||||
|
@ -401,7 +398,6 @@ SrsRtcPlayStream::~SrsRtcPlayStream()
|
||||||
srs_freep(nack_epp);
|
srs_freep(nack_epp);
|
||||||
srs_freep(pli_worker_);
|
srs_freep(pli_worker_);
|
||||||
srs_freep(trd_);
|
srs_freep(trd_);
|
||||||
srs_freep(timer_);
|
|
||||||
srs_freep(req_);
|
srs_freep(req_);
|
||||||
|
|
||||||
if (true) {
|
if (true) {
|
||||||
|
@ -532,9 +528,8 @@ srs_error_t SrsRtcPlayStream::start()
|
||||||
return srs_error_wrap(err, "rtc_sender");
|
return srs_error_wrap(err, "rtc_sender");
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((err = timer_->start()) != srs_success) {
|
// The timer for play, process TWCC in the future.
|
||||||
return srs_error_wrap(err, "start timer");
|
_srs_hybrid->timer1s()->subscribe(this);
|
||||||
}
|
|
||||||
|
|
||||||
if ((err = pli_worker_->start()) != srs_success) {
|
if ((err = pli_worker_->start()) != srs_success) {
|
||||||
return srs_error_wrap(err, "start pli worker");
|
return srs_error_wrap(err, "start pli worker");
|
||||||
|
@ -702,7 +697,7 @@ void SrsRtcPlayStream::set_all_tracks_status(bool status)
|
||||||
srs_trace("RTC: Init tracks %s ok", merged_log.str().c_str());
|
srs_trace("RTC: Init tracks %s ok", merged_log.str().c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsRtcPlayStream::notify(int type, srs_utime_t interval, srs_utime_t tick)
|
srs_error_t SrsRtcPlayStream::on_timer(srs_utime_t interval)
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
@ -875,8 +870,6 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci
|
||||||
|
|
||||||
SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid)
|
SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid)
|
||||||
{
|
{
|
||||||
timer_ = new SrsHourGlass("publish", this, 100 * SRS_UTIME_MILLISECONDS);
|
|
||||||
|
|
||||||
cid_ = cid;
|
cid_ = cid;
|
||||||
is_started = false;
|
is_started = false;
|
||||||
session_ = session;
|
session_ = session;
|
||||||
|
@ -902,6 +895,8 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon
|
||||||
|
|
||||||
SrsRtcPublishStream::~SrsRtcPublishStream()
|
SrsRtcPublishStream::~SrsRtcPublishStream()
|
||||||
{
|
{
|
||||||
|
_srs_hybrid->timer100ms()->unsubscribe(this);
|
||||||
|
|
||||||
// TODO: FIXME: Should remove and delete source.
|
// TODO: FIXME: Should remove and delete source.
|
||||||
if (source) {
|
if (source) {
|
||||||
source->set_publish_stream(NULL);
|
source->set_publish_stream(NULL);
|
||||||
|
@ -928,7 +923,6 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
|
||||||
}
|
}
|
||||||
audio_tracks_.clear();
|
audio_tracks_.clear();
|
||||||
|
|
||||||
srs_freep(timer_);
|
|
||||||
srs_freep(pli_worker_);
|
srs_freep(pli_worker_);
|
||||||
srs_freep(twcc_epp_);
|
srs_freep(twcc_epp_);
|
||||||
srs_freep(pli_epp);
|
srs_freep(pli_epp);
|
||||||
|
@ -1037,17 +1031,8 @@ srs_error_t SrsRtcPublishStream::start()
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((err = timer_->tick(SRS_TICKID_TWCC, 100 * SRS_UTIME_MILLISECONDS)) != srs_success) {
|
// For publisher timer, such as TWCC and RR.
|
||||||
return srs_error_wrap(err, "twcc tick");
|
_srs_hybrid->timer100ms()->subscribe(this);
|
||||||
}
|
|
||||||
|
|
||||||
if ((err = timer_->tick(SRS_TICKID_RTCP, 1000 * SRS_UTIME_MILLISECONDS)) != srs_success) {
|
|
||||||
return srs_error_wrap(err, "rtcp tick");
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((err = timer_->start()) != srs_success) {
|
|
||||||
return srs_error_wrap(err, "start timer");
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((err = source->on_publish()) != srs_success) {
|
if ((err = source->on_publish()) != srs_success) {
|
||||||
return srs_error_wrap(err, "on publish");
|
return srs_error_wrap(err, "on publish");
|
||||||
|
@ -1512,7 +1497,7 @@ srs_error_t SrsRtcPublishStream::do_request_keyframe(uint32_t ssrc, SrsContextId
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utime_t tick)
|
srs_error_t SrsRtcPublishStream::on_timer(srs_utime_t interval)
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
@ -1522,7 +1507,8 @@ srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utim
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (type == SRS_TICKID_RTCP) {
|
// For RR and RRTR.
|
||||||
|
if (true) {
|
||||||
++_srs_pps_rr->sugar;
|
++_srs_pps_rr->sugar;
|
||||||
|
|
||||||
if ((err = send_rtcp_rr()) != srs_success) {
|
if ((err = send_rtcp_rr()) != srs_success) {
|
||||||
|
@ -1536,7 +1522,8 @@ srs_error_t SrsRtcPublishStream::notify(int type, srs_utime_t interval, srs_utim
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (twcc_enabled_ && type == SRS_TICKID_TWCC) {
|
// For TWCC feedback.
|
||||||
|
if (twcc_enabled_) {
|
||||||
++_srs_pps_twcc->sugar;
|
++_srs_pps_twcc->sugar;
|
||||||
|
|
||||||
// We should not depends on the received packet,
|
// We should not depends on the received packet,
|
||||||
|
@ -1672,7 +1659,6 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid)
|
||||||
req = NULL;
|
req = NULL;
|
||||||
cid_ = cid;
|
cid_ = cid;
|
||||||
stat_ = new SrsRtcConnectionStatistic();
|
stat_ = new SrsRtcConnectionStatistic();
|
||||||
timer_ = new SrsHourGlass("conn", this, 20 * SRS_UTIME_MILLISECONDS);
|
|
||||||
hijacker_ = NULL;
|
hijacker_ = NULL;
|
||||||
|
|
||||||
sendonly_skt = NULL;
|
sendonly_skt = NULL;
|
||||||
|
@ -1699,10 +1685,10 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid)
|
||||||
|
|
||||||
SrsRtcConnection::~SrsRtcConnection()
|
SrsRtcConnection::~SrsRtcConnection()
|
||||||
{
|
{
|
||||||
|
_srs_hybrid->timer20ms()->unsubscribe(this);
|
||||||
|
|
||||||
_srs_rtc_manager->unsubscribe(this);
|
_srs_rtc_manager->unsubscribe(this);
|
||||||
|
|
||||||
srs_freep(timer_);
|
|
||||||
|
|
||||||
// Cleanup publishers.
|
// Cleanup publishers.
|
||||||
for(map<string, SrsRtcPublishStream*>::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
|
for(map<string, SrsRtcPublishStream*>::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
|
||||||
SrsRtcPublishStream* publisher = it->second;
|
SrsRtcPublishStream* publisher = it->second;
|
||||||
|
@ -1946,13 +1932,8 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st
|
||||||
return srs_error_wrap(err, "init");
|
return srs_error_wrap(err, "init");
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((err = timer_->tick(SRS_TICKID_SEND_NACKS, 20 * SRS_UTIME_MILLISECONDS)) != srs_success) {
|
// The RTC connection start a timer, handle nacks.
|
||||||
return srs_error_wrap(err, "tick nack");
|
_srs_hybrid->timer20ms()->subscribe(this);
|
||||||
}
|
|
||||||
|
|
||||||
if ((err = timer_->start()) != srs_success) {
|
|
||||||
return srs_error_wrap(err, "start timer");
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: FIXME: Support reload.
|
// TODO: FIXME: Support reload.
|
||||||
session_timeout = _srs_config->get_rtc_stun_timeout(req->vhost);
|
session_timeout = _srs_config->get_rtc_stun_timeout(req->vhost);
|
||||||
|
@ -2326,25 +2307,23 @@ void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt)
|
||||||
sendonly_skt = addr_cache;
|
sendonly_skt = addr_cache;
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsRtcConnection::notify(int type, srs_utime_t interval, srs_utime_t tick)
|
srs_error_t SrsRtcConnection::on_timer(srs_utime_t interval)
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
++_srs_pps_conn->sugar;
|
++_srs_pps_conn->sugar;
|
||||||
|
|
||||||
// For publisher to send NACK.
|
// For publisher to send NACK.
|
||||||
if (type == SRS_TICKID_SEND_NACKS) {
|
// TODO: FIXME: Merge with hybrid system clock.
|
||||||
// TODO: FIXME: Merge with hybrid system clock.
|
srs_update_system_time();
|
||||||
srs_update_system_time();
|
|
||||||
|
|
||||||
std::map<std::string, SrsRtcPublishStream*>::iterator it;
|
std::map<std::string, SrsRtcPublishStream*>::iterator it;
|
||||||
for (it = publishers_.begin(); it != publishers_.end(); it++) {
|
for (it = publishers_.begin(); it != publishers_.end(); it++) {
|
||||||
SrsRtcPublishStream* publisher = it->second;
|
SrsRtcPublishStream* publisher = it->second;
|
||||||
|
|
||||||
if ((err = publisher->check_send_nacks()) != srs_success) {
|
if ((err = publisher->check_send_nacks()) != srs_success) {
|
||||||
srs_warn("ignore nack err %s", srs_error_desc(err).c_str());
|
srs_warn("ignore nack err %s", srs_error_desc(err).c_str());
|
||||||
srs_freep(err);
|
srs_freep(err);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -211,8 +211,8 @@ public:
|
||||||
};
|
};
|
||||||
|
|
||||||
// A RTC play stream, client pull and play stream from SRS.
|
// A RTC play stream, client pull and play stream from SRS.
|
||||||
class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
|
class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler
|
||||||
, virtual public ISrsHourGlass, virtual public ISrsRtcPLIWorkerHandler, public ISrsRtcStreamChangeCallback
|
, public ISrsFastTimer, public ISrsRtcPLIWorkerHandler, public ISrsRtcStreamChangeCallback
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
SrsContextId cid_;
|
SrsContextId cid_;
|
||||||
|
@ -261,9 +261,9 @@ private:
|
||||||
public:
|
public:
|
||||||
// Directly set the status of track, generally for init to set the default value.
|
// Directly set the status of track, generally for init to set the default value.
|
||||||
void set_all_tracks_status(bool status);
|
void set_all_tracks_status(bool status);
|
||||||
// interface ISrsHourGlass
|
// interface ISrsFastTimer
|
||||||
public:
|
private:
|
||||||
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick);
|
srs_error_t on_timer(srs_utime_t interval);
|
||||||
public:
|
public:
|
||||||
srs_error_t on_rtcp(SrsRtcpCommon* rtcp);
|
srs_error_t on_rtcp(SrsRtcpCommon* rtcp);
|
||||||
private:
|
private:
|
||||||
|
@ -278,8 +278,8 @@ public:
|
||||||
};
|
};
|
||||||
|
|
||||||
// A RTC publish stream, client push and publish stream to SRS.
|
// A RTC publish stream, client push and publish stream to SRS.
|
||||||
class SrsRtcPublishStream : virtual public ISrsHourGlass, virtual public ISrsRtpPacketDecodeHandler
|
class SrsRtcPublishStream : public ISrsFastTimer, public ISrsRtpPacketDecodeHandler
|
||||||
, virtual public ISrsRtcPublishStream, virtual public ISrsRtcPLIWorkerHandler
|
, public ISrsRtcPublishStream, public ISrsRtcPLIWorkerHandler
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
SrsContextId cid_;
|
SrsContextId cid_;
|
||||||
|
@ -346,9 +346,9 @@ private:
|
||||||
public:
|
public:
|
||||||
void request_keyframe(uint32_t ssrc);
|
void request_keyframe(uint32_t ssrc);
|
||||||
virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid);
|
virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid);
|
||||||
// interface ISrsHourGlass
|
// interface ISrsFastTimer
|
||||||
public:
|
private:
|
||||||
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick);
|
srs_error_t on_timer(srs_utime_t interval);
|
||||||
public:
|
public:
|
||||||
void simulate_nack_drop(int nn);
|
void simulate_nack_drop(int nn);
|
||||||
private:
|
private:
|
||||||
|
@ -393,8 +393,7 @@ public:
|
||||||
//
|
//
|
||||||
// For performance, we use non-virtual public from resource,
|
// For performance, we use non-virtual public from resource,
|
||||||
// see https://stackoverflow.com/questions/3747066/c-cannot-convert-from-base-a-to-derived-type-b-via-virtual-base-a
|
// see https://stackoverflow.com/questions/3747066/c-cannot-convert-from-base-a-to-derived-type-b-via-virtual-base-a
|
||||||
class SrsRtcConnection : public ISrsResource
|
class SrsRtcConnection : public ISrsResource, public ISrsFastTimer, public ISrsDisposingHandler
|
||||||
, virtual public ISrsHourGlass, virtual public ISrsDisposingHandler
|
|
||||||
{
|
{
|
||||||
friend class SrsSecurityTransport;
|
friend class SrsSecurityTransport;
|
||||||
friend class SrsRtcPlayStream;
|
friend class SrsRtcPlayStream;
|
||||||
|
@ -506,9 +505,9 @@ public:
|
||||||
bool is_alive();
|
bool is_alive();
|
||||||
void alive();
|
void alive();
|
||||||
void update_sendonly_socket(SrsUdpMuxSocket* skt);
|
void update_sendonly_socket(SrsUdpMuxSocket* skt);
|
||||||
// interface ISrsHourGlass
|
// interface ISrsFastTimer
|
||||||
public:
|
private:
|
||||||
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick);
|
srs_error_t on_timer(srs_utime_t interval);
|
||||||
public:
|
public:
|
||||||
// send rtcp
|
// send rtcp
|
||||||
srs_error_t send_rtcp(char *data, int nb_data);
|
srs_error_t send_rtcp(char *data, int nb_data);
|
||||||
|
|
|
@ -281,7 +281,7 @@ srs_error_t SrsRtcServer::initialize()
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
// The RTC server start a timer, do routines of RTC server.
|
// The RTC server start a timer, do routines of RTC server.
|
||||||
_srs_hybrid->timer()->subscribe(5 * SRS_UTIME_SECONDS, this);
|
_srs_hybrid->timer5s()->subscribe(this);
|
||||||
|
|
||||||
// Initialize the black hole.
|
// Initialize the black hole.
|
||||||
if ((err = _srs_blackhole->initialize()) != srs_success) {
|
if ((err = _srs_blackhole->initialize()) != srs_success) {
|
||||||
|
@ -633,7 +633,7 @@ SrsRtcConnection* SrsRtcServer::find_session_by_username(const std::string& user
|
||||||
return dynamic_cast<SrsRtcConnection*>(conn);
|
return dynamic_cast<SrsRtcConnection*>(conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsRtcServer::on_timer(srs_utime_t interval, srs_utime_t tick)
|
srs_error_t SrsRtcServer::on_timer(srs_utime_t interval)
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
|
|
@ -138,7 +138,7 @@ public:
|
||||||
SrsRtcConnection* find_session_by_username(const std::string& ufrag);
|
SrsRtcConnection* find_session_by_username(const std::string& ufrag);
|
||||||
// interface ISrsFastTimer
|
// interface ISrsFastTimer
|
||||||
private:
|
private:
|
||||||
srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick);
|
srs_error_t on_timer(srs_utime_t interval);
|
||||||
};
|
};
|
||||||
|
|
||||||
// The RTC server adapter.
|
// The RTC server adapter.
|
||||||
|
|
|
@ -346,6 +346,8 @@ SrsRtcStream::SrsRtcStream()
|
||||||
|
|
||||||
req = NULL;
|
req = NULL;
|
||||||
bridger_ = NULL;
|
bridger_ = NULL;
|
||||||
|
|
||||||
|
pli_for_rtmp_ = pli_elapsed_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsRtcStream::~SrsRtcStream()
|
SrsRtcStream::~SrsRtcStream()
|
||||||
|
@ -498,8 +500,8 @@ srs_error_t SrsRtcStream::on_publish()
|
||||||
}
|
}
|
||||||
|
|
||||||
// For SrsRtcStream::on_timer()
|
// For SrsRtcStream::on_timer()
|
||||||
srs_utime_t pli_for_rtmp = _srs_config->get_rtc_pli_for_rtmp(req->vhost);
|
pli_for_rtmp_ = _srs_config->get_rtc_pli_for_rtmp(req->vhost);
|
||||||
_srs_hybrid->timer()->subscribe(pli_for_rtmp, this);
|
_srs_hybrid->timer100ms()->subscribe(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: FIXME: Handle by statistic.
|
// TODO: FIXME: Handle by statistic.
|
||||||
|
@ -532,7 +534,7 @@ void SrsRtcStream::on_unpublish()
|
||||||
//free bridger resource
|
//free bridger resource
|
||||||
if (bridger_) {
|
if (bridger_) {
|
||||||
// For SrsRtcStream::on_timer()
|
// For SrsRtcStream::on_timer()
|
||||||
_srs_hybrid->timer()->unsubscribe(this);
|
_srs_hybrid->timer100ms()->unsubscribe(this);
|
||||||
|
|
||||||
bridger_->on_unpublish();
|
bridger_->on_unpublish();
|
||||||
srs_freep(bridger_);
|
srs_freep(bridger_);
|
||||||
|
@ -626,7 +628,7 @@ std::vector<SrsRtcTrackDescription*> SrsRtcStream::get_track_desc(std::string ty
|
||||||
return track_descs;
|
return track_descs;
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsRtcStream::on_timer(srs_utime_t interval, srs_utime_t tick)
|
srs_error_t SrsRtcStream::on_timer(srs_utime_t interval)
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
@ -634,6 +636,14 @@ srs_error_t SrsRtcStream::on_timer(srs_utime_t interval, srs_utime_t tick)
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pli_elapsed_ += interval;
|
||||||
|
if (pli_elapsed_ < pli_for_rtmp_) {
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Request PLI and reset the timer.
|
||||||
|
pli_elapsed_ = 0;
|
||||||
|
|
||||||
for (int i = 0; i < (int)stream_desc_->video_track_descs_.size(); i++) {
|
for (int i = 0; i < (int)stream_desc_->video_track_descs_.size(); i++) {
|
||||||
SrsRtcTrackDescription* desc = stream_desc_->video_track_descs_.at(i);
|
SrsRtcTrackDescription* desc = stream_desc_->video_track_descs_.at(i);
|
||||||
publish_stream_->request_keyframe(desc->ssrc_);
|
publish_stream_->request_keyframe(desc->ssrc_);
|
||||||
|
@ -1530,7 +1540,7 @@ srs_error_t SrsRtmpFromRtcBridger::packet_video_rtmp(const uint16_t start, const
|
||||||
|
|
||||||
SrsRtpSTAPPayload* stap_payload = dynamic_cast<SrsRtpSTAPPayload*>(pkt->payload());
|
SrsRtpSTAPPayload* stap_payload = dynamic_cast<SrsRtpSTAPPayload*>(pkt->payload());
|
||||||
if (stap_payload) {
|
if (stap_payload) {
|
||||||
for (int j = 0; j < stap_payload->nalus.size(); ++j) {
|
for (int j = 0; j < (int)stap_payload->nalus.size(); ++j) {
|
||||||
SrsSample* sample = stap_payload->nalus.at(j);
|
SrsSample* sample = stap_payload->nalus.at(j);
|
||||||
nb_payload += 4 + sample->size;
|
nb_payload += 4 + sample->size;
|
||||||
}
|
}
|
||||||
|
@ -1594,7 +1604,7 @@ srs_error_t SrsRtmpFromRtcBridger::packet_video_rtmp(const uint16_t start, const
|
||||||
|
|
||||||
SrsRtpSTAPPayload* stap_payload = dynamic_cast<SrsRtpSTAPPayload*>(pkt->payload());
|
SrsRtpSTAPPayload* stap_payload = dynamic_cast<SrsRtpSTAPPayload*>(pkt->payload());
|
||||||
if (stap_payload) {
|
if (stap_payload) {
|
||||||
for (int j = 0; j < stap_payload->nalus.size(); ++j) {
|
for (int j = 0; j < (int)stap_payload->nalus.size(); ++j) {
|
||||||
SrsSample* sample = stap_payload->nalus.at(j);
|
SrsSample* sample = stap_payload->nalus.at(j);
|
||||||
payload.write_4bytes(sample->size);
|
payload.write_4bytes(sample->size);
|
||||||
payload.write_bytes(sample->bytes, sample->size);
|
payload.write_bytes(sample->bytes, sample->size);
|
||||||
|
|
|
@ -201,6 +201,10 @@ private:
|
||||||
bool is_delivering_packets_;
|
bool is_delivering_packets_;
|
||||||
// Notify stream event to event handler
|
// Notify stream event to event handler
|
||||||
std::vector<ISrsRtcStreamEventHandler*> event_handlers_;
|
std::vector<ISrsRtcStreamEventHandler*> event_handlers_;
|
||||||
|
private:
|
||||||
|
// The PLI for RTC2RTMP.
|
||||||
|
srs_utime_t pli_for_rtmp_;
|
||||||
|
srs_utime_t pli_elapsed_;
|
||||||
public:
|
public:
|
||||||
SrsRtcStream();
|
SrsRtcStream();
|
||||||
virtual ~SrsRtcStream();
|
virtual ~SrsRtcStream();
|
||||||
|
@ -252,7 +256,7 @@ public:
|
||||||
std::vector<SrsRtcTrackDescription*> get_track_desc(std::string type, std::string media_type);
|
std::vector<SrsRtcTrackDescription*> get_track_desc(std::string type, std::string media_type);
|
||||||
// interface ISrsFastTimer
|
// interface ISrsFastTimer
|
||||||
private:
|
private:
|
||||||
srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick);
|
srs_error_t on_timer(srs_utime_t interval);
|
||||||
};
|
};
|
||||||
|
|
||||||
// A helper class, to release the packet to cache.
|
// A helper class, to release the packet to cache.
|
||||||
|
|
|
@ -26,6 +26,6 @@
|
||||||
|
|
||||||
#define VERSION_MAJOR 4
|
#define VERSION_MAJOR 4
|
||||||
#define VERSION_MINOR 0
|
#define VERSION_MINOR 0
|
||||||
#define VERSION_REVISION 100
|
#define VERSION_REVISION 101
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
Loading…
Reference in a new issue