1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 20:01:56 +00:00

For #1638, #307, use hourglass for timer.

This commit is contained in:
winlin 2020-03-19 12:58:04 +08:00
parent 830e60da29
commit 8eef439c90
4 changed files with 87 additions and 120 deletions

View file

@ -26,7 +26,6 @@
using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_app_st.hpp>
#include <srs_kernel_log.hpp>
ISrsHourGlass::ISrsHourGlass()
@ -42,13 +41,31 @@ SrsHourGlass::SrsHourGlass(ISrsHourGlass* h, srs_utime_t resolution)
handler = h;
_resolution = resolution;
total_elapse = 0;
trd = new SrsSTCoroutine("timer", this, _srs_context->get_id());
}
SrsHourGlass::~SrsHourGlass()
{
srs_freep(trd);
}
srs_error_t SrsHourGlass::tick(int type, srs_utime_t interval)
srs_error_t SrsHourGlass::start()
{
srs_error_t err = srs_success;
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
return err;
}
srs_error_t SrsHourGlass::tick(srs_utime_t interval)
{
return tick(0, interval);
}
srs_error_t SrsHourGlass::tick(int event, srs_utime_t interval)
{
srs_error_t err = srs_success;
@ -57,7 +74,7 @@ srs_error_t SrsHourGlass::tick(int type, srs_utime_t interval)
"invalid interval=%dms, resolution=%dms", srsu2msi(interval), srsu2msi(_resolution));
}
ticks[type] = interval;
ticks[event] = interval;
return err;
}
@ -66,13 +83,18 @@ srs_error_t SrsHourGlass::cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "quit");
}
map<int, srs_utime_t>::iterator it;
for (it = ticks.begin(); it != ticks.end(); ++it) {
int type = it->first;
int event = it->first;
srs_utime_t interval = it->second;
if (interval == 0 || (total_elapse % interval) == 0) {
if ((err = handler->notify(type, interval, total_elapse)) != srs_success) {
if ((err = handler->notify(event, interval, total_elapse)) != srs_success) {
return srs_error_wrap(err, "notify");
}
}
@ -81,6 +103,7 @@ srs_error_t SrsHourGlass::cycle()
// TODO: FIXME: Maybe we should use wallclock.
total_elapse += _resolution;
srs_usleep(_resolution);
}
return err;
}

View file

@ -26,8 +26,12 @@
#include <srs_core.hpp>
#include <srs_app_st.hpp>
#include <map>
class SrsCoroutine;
// The handler for the tick.
class ISrsHourGlass
{
@ -36,40 +40,41 @@ public:
virtual ~ISrsHourGlass();
public:
// When time is ticked, this function is called.
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick) = 0;
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick) = 0;
};
// he hourglass used to do some specieal task,
// while these task is cycle when some interval, for example,
// there are N=3 tasks to do:
// 1. heartbeat every 3s.
// 2. print message every 5s.
// 3. notify backend every 7s.
// The hourglass(timer or SrsTimer) for special tasks,
// while these tasks are attached to some intervals, for example,
// there are N=3 tasks bellow:
// 1. A heartbeat every 3s.
// 2. A print message every 5s.
// 3. A notify backend every 7s.
// The hourglass will call back when ticks:
// 1. notify(type=1, time=3)
// 2. notify(type=2, time=5)
// 3. notify(type=1, time=6)
// 4. notify(type=3, time=7)
// 5. notify(type=1, time=9)
// 6. notify(type=2, time=10)
// This is used for server and bocar server and other manager.
// 1. Got notify(event=1, time=3)
// 2. Got notify(event=2, time=5)
// 3. Got notify(event=1, time=6)
// 4. Got notify(event=3, time=7)
// 5. Got notify(event=1, time=9)
// 6. Got notify(event=2, time=10)
// It's a complex and high-performance timer.
//
// Usage:
// SrsHourGlass* hg = new SrsHourGlass(handler, 1 * SRS_UTIME_MILLISECONDS);
//
// hg->tick(1, 3 * SRS_UTIME_MILLISECONDS);
// hg->tick(2, 5 * SRS_UTIME_MILLISECONDS);
// hg->tick(3, 7 * SRS_UTIME_MILLISECONDS);
// // create a thread to cycle, which will call handerl when ticked.
// while (true) {
// hg->cycle();
// }
class SrsHourGlass
//
// // The hg will create a thread for timer.
// hg->start();
class SrsHourGlass : virtual public ISrsCoroutineHandler
{
private:
SrsCoroutine* trd;
ISrsHourGlass* handler;
srs_utime_t _resolution;
// The ticks:
// key: the type of tick.
// key: the event of tick.
// value: the interval of tick.
std::map<int, srs_utime_t> ticks;
// The total elapsed time,
@ -79,10 +84,14 @@ public:
SrsHourGlass(ISrsHourGlass* h, srs_utime_t resolution);
virtual ~SrsHourGlass();
public:
// Add a pair of tick(type, interval).
// @param type the type of tick.
// Start the hourglass.
virtual srs_error_t start();
public:
// Add a pair of tick(event, interval).
// @param event the event of tick, default is 0.
// @param interval the interval in srs_utime_t of tick.
virtual srs_error_t tick(int type, srs_utime_t interval);
virtual srs_error_t tick(srs_utime_t interval);
virtual srs_error_t tick(int event, srs_utime_t interval);
public:
// Cycle the hourglass, which will sleep resolution every time.
// and call handler when ticked.

View file

@ -1151,23 +1151,25 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt)
SrsRtcServer::SrsRtcServer()
{
listener = NULL;
timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS);
}
SrsRtcServer::~SrsRtcServer()
{
srs_freep(listener);
rttrd->stop();
srs_freep(rttrd);
srs_freep(timer);
}
srs_error_t SrsRtcServer::initialize()
{
srs_error_t err = srs_success;
rttrd = new SrsRtcTimerThread(this, _srs_context->get_id());
if ((err = rttrd->start()) != srs_success) {
return srs_error_wrap(err, "rtc timer thread init failed");
if ((err = timer->tick(1 * SRS_UTIME_SECONDS)) != srs_success) {
return srs_error_wrap(err, "hourglass tick");
}
if ((err = timer->start()) != srs_success) {
return srs_error_wrap(err, "start timer");
}
return err;
@ -1369,60 +1371,10 @@ void SrsRtcServer::check_and_clean_timeout_session()
}
}
SrsRtcTimerThread::SrsRtcTimerThread(SrsRtcServer* rtc_svr, int parent_cid)
srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tick)
{
_parent_cid = parent_cid;
trd = new SrsDummyCoroutine();
rtc_server = rtc_svr;
}
SrsRtcTimerThread::~SrsRtcTimerThread()
{
srs_freep(trd);
}
int SrsRtcTimerThread::cid()
{
return trd->cid();
}
srs_error_t SrsRtcTimerThread::start()
{
srs_error_t err = srs_success;
srs_freep(trd);
trd = new SrsSTCoroutine("rtc_timer", this, _parent_cid);
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "rtc_timer");
}
return err;
}
void SrsRtcTimerThread::stop()
{
trd->stop();
}
void SrsRtcTimerThread::stop_loop()
{
trd->interrupt();
}
srs_error_t SrsRtcTimerThread::cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "rtc timer thread");
}
srs_usleep(1 * SRS_UTIME_SECONDS);
rtc_server->check_and_clean_timeout_session();
}
check_and_clean_timeout_session();
return srs_success;
}
RtcServerAdapter::RtcServerAdapter()

View file

@ -30,6 +30,7 @@
#include <srs_kernel_utility.hpp>
#include <srs_rtmp_stack.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_hourglass.hpp>
#include <string>
#include <map>
@ -238,32 +239,11 @@ private:
srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt);
};
// TODO: FIXME: is there any other timer thread?
class SrsRtcTimerThread : public ISrsCoroutineHandler
{
protected:
SrsCoroutine* trd;
int _parent_cid;
private:
SrsRtcServer* rtc_server;
public:
SrsRtcTimerThread(SrsRtcServer* rtc_svr, int parent_cid);
virtual ~SrsRtcTimerThread();
public:
virtual int cid();
public:
virtual srs_error_t start();
virtual void stop();
virtual void stop_loop();
public:
virtual srs_error_t cycle();
};
class SrsRtcServer : public ISrsUdpMuxHandler
class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass
{
private:
SrsUdpMuxListener* listener;
SrsRtcTimerThread* rttrd;
SrsHourGlass* timer;
private:
std::map<std::string, SrsRtcSession*> map_username_session; // key: username(local_ufrag + ":" + remote_ufrag)
std::map<std::string, SrsRtcSession*> map_id_session; // key: peerip(ip + ":" + port)
@ -289,6 +269,9 @@ private:
private:
SrsRtcSession* find_rtc_session_by_username(const std::string& ufrag);
SrsRtcSession* find_rtc_session_by_peer_id(const std::string& peer_id);
// interface ISrsHourGlass
public:
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick);
};
// The RTC server adapter.