mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
Timer: Extract shared FastTimer to use one timer for all connections
This commit is contained in:
parent
7bdc9e8e96
commit
7b413edbb7
4 changed files with 218 additions and 63 deletions
|
@ -27,11 +27,22 @@ using namespace std;
|
|||
|
||||
#include <srs_kernel_error.hpp>
|
||||
#include <srs_kernel_log.hpp>
|
||||
#include <srs_kernel_utility.hpp>
|
||||
|
||||
#include <srs_protocol_kbps.hpp>
|
||||
|
||||
SrsPps* _srs_pps_timer = new SrsPps();
|
||||
|
||||
extern SrsPps* _srs_pps_clock_15ms;
|
||||
extern SrsPps* _srs_pps_clock_20ms;
|
||||
extern SrsPps* _srs_pps_clock_25ms;
|
||||
extern SrsPps* _srs_pps_clock_30ms;
|
||||
extern SrsPps* _srs_pps_clock_35ms;
|
||||
extern SrsPps* _srs_pps_clock_40ms;
|
||||
extern SrsPps* _srs_pps_clock_80ms;
|
||||
extern SrsPps* _srs_pps_clock_160ms;
|
||||
extern SrsPps* _srs_pps_timer_s;
|
||||
|
||||
ISrsHourGlass::ISrsHourGlass()
|
||||
{
|
||||
}
|
||||
|
@ -89,6 +100,14 @@ srs_error_t SrsHourGlass::tick(int event, srs_utime_t interval)
|
|||
return err;
|
||||
}
|
||||
|
||||
void SrsHourGlass::untick(int event)
|
||||
{
|
||||
map<int, srs_utime_t>::iterator it = ticks.find(event);
|
||||
if (it != ticks.end()) {
|
||||
ticks.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
srs_error_t SrsHourGlass::cycle()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
@ -119,3 +138,128 @@ srs_error_t SrsHourGlass::cycle()
|
|||
|
||||
return err;
|
||||
}
|
||||
|
||||
ISrsFastTimer::ISrsFastTimer()
|
||||
{
|
||||
}
|
||||
|
||||
ISrsFastTimer::~ISrsFastTimer()
|
||||
{
|
||||
}
|
||||
|
||||
SrsFastTimer::SrsFastTimer(std::string label, srs_utime_t resolution)
|
||||
{
|
||||
timer_ = new SrsHourGlass(label, this, resolution);
|
||||
}
|
||||
|
||||
SrsFastTimer::~SrsFastTimer()
|
||||
{
|
||||
srs_freep(timer_);
|
||||
}
|
||||
|
||||
srs_error_t SrsFastTimer::start()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
if ((err = timer_->start()) != srs_success) {
|
||||
return srs_error_wrap(err, "start timer");
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
void SrsFastTimer::subscribe(srs_utime_t interval, ISrsFastTimer* timer)
|
||||
{
|
||||
static int g_event = 0;
|
||||
|
||||
int event = g_event++;
|
||||
|
||||
// TODO: FIXME: Error leak. Change tick to void in future.
|
||||
timer_->tick(event, interval);
|
||||
|
||||
handlers_[event] = timer;
|
||||
}
|
||||
|
||||
void SrsFastTimer::unsubscribe(ISrsFastTimer* timer)
|
||||
{
|
||||
for (map<int, ISrsFastTimer*>::iterator it = handlers_.begin(); it != handlers_.end();) {
|
||||
if (it->second != timer) {
|
||||
++it;
|
||||
continue;
|
||||
}
|
||||
|
||||
handlers_.erase(it++);
|
||||
|
||||
int event = it->first;
|
||||
timer_->untick(event);
|
||||
}
|
||||
}
|
||||
|
||||
srs_error_t SrsFastTimer::notify(int event, srs_utime_t interval, srs_utime_t tick)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
for (map<int, ISrsFastTimer*>::iterator it = handlers_.begin(); it != handlers_.end(); ++it) {
|
||||
ISrsFastTimer* timer = it->second;
|
||||
|
||||
if (event != it->first) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((err = timer->on_timer(interval, tick)) != srs_success) {
|
||||
return srs_error_wrap(err, "tick for event=%d, interval=%dms, tick=%dms",
|
||||
event, srsu2msi(interval), srsu2msi(tick));
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
SrsClockWallMonitor::SrsClockWallMonitor()
|
||||
{
|
||||
}
|
||||
|
||||
SrsClockWallMonitor::~SrsClockWallMonitor()
|
||||
{
|
||||
}
|
||||
|
||||
srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval, srs_utime_t tick)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
static srs_utime_t clock = 0;
|
||||
|
||||
srs_utime_t now = srs_update_system_time();
|
||||
if (!clock) {
|
||||
clock = now;
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_utime_t elapsed = now - clock;
|
||||
clock = now;
|
||||
|
||||
if (elapsed <= 15 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_15ms->sugar;
|
||||
} else if (elapsed <= 21 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_20ms->sugar;
|
||||
} else if (elapsed <= 25 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_25ms->sugar;
|
||||
} else if (elapsed <= 30 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_30ms->sugar;
|
||||
} else if (elapsed <= 35 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_35ms->sugar;
|
||||
} else if (elapsed <= 40 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_40ms->sugar;
|
||||
} else if (elapsed <= 80 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_80ms->sugar;
|
||||
} else if (elapsed <= 160 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_160ms->sugar;
|
||||
} else {
|
||||
++_srs_pps_timer_s->sugar;
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
|
|
|
@ -68,7 +68,7 @@ public:
|
|||
//
|
||||
// // The hg will create a thread for timer.
|
||||
// hg->start();
|
||||
class SrsHourGlass : virtual public ISrsCoroutineHandler
|
||||
class SrsHourGlass : public ISrsCoroutineHandler
|
||||
{
|
||||
private:
|
||||
std::string label_;
|
||||
|
@ -97,10 +97,55 @@ public:
|
|||
// @param interval the interval in srs_utime_t of tick.
|
||||
virtual srs_error_t tick(srs_utime_t interval);
|
||||
virtual srs_error_t tick(int event, srs_utime_t interval);
|
||||
// Remove the tick by event.
|
||||
void untick(int event);
|
||||
public:
|
||||
// Cycle the hourglass, which will sleep resolution every time.
|
||||
// and call handler when ticked.
|
||||
virtual srs_error_t cycle();
|
||||
};
|
||||
|
||||
// The handler for fast timer.
|
||||
class ISrsFastTimer
|
||||
{
|
||||
public:
|
||||
ISrsFastTimer();
|
||||
virtual ~ISrsFastTimer();
|
||||
public:
|
||||
// Tick when timer is active.
|
||||
virtual srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick) = 0;
|
||||
};
|
||||
|
||||
// The fast timer, shared by objects, for high performance.
|
||||
// For example, we should never start a timer for each connection or publisher or player,
|
||||
// instead, we should start only one fast timer in server.
|
||||
class SrsFastTimer : public ISrsHourGlass
|
||||
{
|
||||
private:
|
||||
SrsHourGlass* timer_;
|
||||
std::map<int, ISrsFastTimer*> handlers_;
|
||||
public:
|
||||
SrsFastTimer(std::string label, srs_utime_t resolution);
|
||||
virtual ~SrsFastTimer();
|
||||
public:
|
||||
srs_error_t start();
|
||||
public:
|
||||
void subscribe(srs_utime_t interval, ISrsFastTimer* timer);
|
||||
void unsubscribe(ISrsFastTimer* timer);
|
||||
// Interface ISrsHourGlass
|
||||
private:
|
||||
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
|
||||
};
|
||||
|
||||
// To monitor the system wall clock timer deviation.
|
||||
class SrsClockWallMonitor : public ISrsFastTimer
|
||||
{
|
||||
public:
|
||||
SrsClockWallMonitor();
|
||||
virtual ~SrsClockWallMonitor();
|
||||
// interface ISrsFastTimer
|
||||
private:
|
||||
srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick);
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -139,11 +139,18 @@ ISrsHybridServer::~ISrsHybridServer()
|
|||
|
||||
SrsHybridServer::SrsHybridServer()
|
||||
{
|
||||
// Note that the timer depends on other global variables,
|
||||
// so we MUST never create it in constructor.
|
||||
timer_ = NULL;
|
||||
|
||||
clock_monitor_ = new SrsClockWallMonitor();
|
||||
}
|
||||
|
||||
SrsHybridServer::~SrsHybridServer()
|
||||
{
|
||||
srs_freep(clock_monitor_);
|
||||
srs_freep(timer_);
|
||||
|
||||
vector<ISrsHybridServer*>::iterator it;
|
||||
for (it = servers.begin(); it != servers.end(); ++it) {
|
||||
ISrsHybridServer* server = *it;
|
||||
|
@ -166,10 +173,20 @@ srs_error_t SrsHybridServer::initialize()
|
|||
return srs_error_wrap(err, "initialize st failed");
|
||||
}
|
||||
|
||||
if ((err = setup_ticks()) != srs_success) {
|
||||
return srs_error_wrap(err, "tick");
|
||||
// Create global shared timer.
|
||||
timer_ = new SrsFastTimer("hybrid", 20 * SRS_UTIME_MILLISECONDS);
|
||||
|
||||
// Start the timer first.
|
||||
if ((err = timer_->start()) != srs_success) {
|
||||
return srs_error_wrap(err, "start timer");
|
||||
}
|
||||
|
||||
// The hybrid server start a timer, do routines of hybrid server.
|
||||
timer_->subscribe(5 * SRS_UTIME_SECONDS, this);
|
||||
|
||||
// A monitor to check the clock wall deviation, per clock tick.
|
||||
timer_->subscribe(20 * SRS_UTIME_MILLISECONDS, clock_monitor_);
|
||||
|
||||
vector<ISrsHybridServer*>::iterator it;
|
||||
for (it = servers.begin(); it != servers.end(); ++it) {
|
||||
ISrsHybridServer* server = *it;
|
||||
|
@ -220,67 +237,15 @@ SrsServerAdapter* SrsHybridServer::srs()
|
|||
return NULL;
|
||||
}
|
||||
|
||||
srs_error_t SrsHybridServer::setup_ticks()
|
||||
SrsFastTimer* SrsHybridServer::timer()
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
// Start timer for system global works.
|
||||
timer_ = new SrsHourGlass("hybrid", this, 20 * SRS_UTIME_MILLISECONDS);
|
||||
|
||||
if ((err = timer_->tick(1, 20 * SRS_UTIME_MILLISECONDS)) != srs_success) {
|
||||
return srs_error_wrap(err, "tick");
|
||||
}
|
||||
|
||||
if ((err = timer_->tick(2, 5 * SRS_UTIME_SECONDS)) != srs_success) {
|
||||
return srs_error_wrap(err, "tick");
|
||||
}
|
||||
|
||||
if ((err = timer_->start()) != srs_success) {
|
||||
return srs_error_wrap(err, "start");
|
||||
}
|
||||
|
||||
return err;
|
||||
return timer_;
|
||||
}
|
||||
|
||||
srs_error_t SrsHybridServer::notify(int event, srs_utime_t interval, srs_utime_t tick)
|
||||
srs_error_t SrsHybridServer::on_timer(srs_utime_t interval, srs_utime_t tick)
|
||||
{
|
||||
srs_error_t err = srs_success;
|
||||
|
||||
// Update system wall clock.
|
||||
if (event == 1) {
|
||||
static srs_utime_t clock = 0;
|
||||
|
||||
srs_utime_t now = srs_update_system_time();
|
||||
if (!clock) {
|
||||
clock = now;
|
||||
return err;
|
||||
}
|
||||
|
||||
srs_utime_t elapsed = now - clock;
|
||||
clock = now;
|
||||
|
||||
if (elapsed <= 15 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_15ms->sugar;
|
||||
} else if (elapsed <= 21 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_20ms->sugar;
|
||||
} else if (elapsed <= 25 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_25ms->sugar;
|
||||
} else if (elapsed <= 30 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_30ms->sugar;
|
||||
} else if (elapsed <= 35 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_35ms->sugar;
|
||||
} else if (elapsed <= 40 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_40ms->sugar;
|
||||
} else if (elapsed <= 80 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_80ms->sugar;
|
||||
} else if (elapsed <= 160 * SRS_UTIME_MILLISECONDS) {
|
||||
++_srs_pps_clock_160ms->sugar;
|
||||
} else {
|
||||
++_srs_pps_timer_s->sugar;
|
||||
}
|
||||
return err;
|
||||
}
|
||||
|
||||
// Show statistics for RTC server.
|
||||
SrsProcSelfStat* u = srs_get_self_proc_stat();
|
||||
// Resident Set Size: number of pages the process has in real memory.
|
||||
|
|
|
@ -49,11 +49,12 @@ public:
|
|||
};
|
||||
|
||||
// The hybrid server manager.
|
||||
class SrsHybridServer : public ISrsHourGlass
|
||||
class SrsHybridServer : public ISrsFastTimer
|
||||
{
|
||||
private:
|
||||
std::vector<ISrsHybridServer*> servers;
|
||||
SrsHourGlass* timer_;
|
||||
SrsFastTimer* timer_;
|
||||
SrsClockWallMonitor* clock_monitor_;
|
||||
public:
|
||||
SrsHybridServer();
|
||||
virtual ~SrsHybridServer();
|
||||
|
@ -65,10 +66,10 @@ public:
|
|||
virtual void stop();
|
||||
public:
|
||||
virtual SrsServerAdapter* srs();
|
||||
// interface ISrsHourGlass
|
||||
SrsFastTimer* timer();
|
||||
// interface ISrsFastTimer
|
||||
private:
|
||||
virtual srs_error_t setup_ticks();
|
||||
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
|
||||
srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick);
|
||||
};
|
||||
|
||||
extern SrsHybridServer* _srs_hybrid;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue