mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
STAT: Update stat for SRT connection.
This commit is contained in:
parent
101e4fa3b9
commit
da24de5ecb
5 changed files with 104 additions and 15 deletions
|
@ -136,8 +136,7 @@ public:
|
||||||
};
|
};
|
||||||
|
|
||||||
// Interface for connection that is startable.
|
// Interface for connection that is startable.
|
||||||
class ISrsStartableConneciton : public ISrsConnection
|
class ISrsStartableConneciton : public ISrsConnection, public ISrsStartable, public ISrsKbpsDelta
|
||||||
, public ISrsStartable, public ISrsKbpsDelta
|
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ISrsStartableConneciton();
|
ISrsStartableConneciton();
|
||||||
|
|
|
@ -19,6 +19,9 @@ using namespace std;
|
||||||
#include <srs_app_pithy_print.hpp>
|
#include <srs_app_pithy_print.hpp>
|
||||||
#include <srs_app_srt_server.hpp>
|
#include <srs_app_srt_server.hpp>
|
||||||
#include <srs_app_srt_source.hpp>
|
#include <srs_app_srt_source.hpp>
|
||||||
|
#include <srs_app_statistic.hpp>
|
||||||
|
#include <srs_protocol_rtmp_stack.hpp>
|
||||||
|
#include <srs_kernel_utility.hpp>
|
||||||
|
|
||||||
SrsSrtConnection::SrsSrtConnection(srs_srt_t srt_fd)
|
SrsSrtConnection::SrsSrtConnection(srs_srt_t srt_fd)
|
||||||
{
|
{
|
||||||
|
@ -167,6 +170,7 @@ SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, s
|
||||||
|
|
||||||
srt_source_ = NULL;
|
srt_source_ = NULL;
|
||||||
req_ = new SrsRequest();
|
req_ = new SrsRequest();
|
||||||
|
req_->ip = ip;
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsMpegtsSrtConn::~SrsMpegtsSrtConn()
|
SrsMpegtsSrtConn::~SrsMpegtsSrtConn()
|
||||||
|
@ -188,10 +192,14 @@ std::string SrsMpegtsSrtConn::desc()
|
||||||
|
|
||||||
void SrsMpegtsSrtConn::remark(int64_t* in, int64_t* out)
|
void SrsMpegtsSrtConn::remark(int64_t* in, int64_t* out)
|
||||||
{
|
{
|
||||||
// TODO: FIXME: no impl currently.
|
|
||||||
kbps_->remark(in, out);
|
kbps_->remark(in, out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void SrsMpegtsSrtConn::expire()
|
||||||
|
{
|
||||||
|
trd_->interrupt();
|
||||||
|
}
|
||||||
|
|
||||||
srs_error_t SrsMpegtsSrtConn::start()
|
srs_error_t SrsMpegtsSrtConn::start()
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
@ -215,9 +223,12 @@ const SrsContextId& SrsMpegtsSrtConn::get_id()
|
||||||
|
|
||||||
srs_error_t SrsMpegtsSrtConn::cycle()
|
srs_error_t SrsMpegtsSrtConn::cycle()
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = do_cycle();
|
||||||
|
|
||||||
err = do_cycle();
|
// Update statistic when done.
|
||||||
|
SrsStatistic* stat = SrsStatistic::instance();
|
||||||
|
stat->kbps_add_delta(get_id().c_str(), this);
|
||||||
|
stat->on_disconnect(get_id().c_str());
|
||||||
|
|
||||||
// Notify manager to remove it.
|
// Notify manager to remove it.
|
||||||
// Note that we create this object, so we use manager to remove it.
|
// Note that we create this object, so we use manager to remove it.
|
||||||
|
@ -256,6 +267,12 @@ srs_error_t SrsMpegtsSrtConn::do_cycle()
|
||||||
return srs_error_new(ERROR_SRT_CONN, "invalid srt streamid=%s", streamid.c_str());
|
return srs_error_new(ERROR_SRT_CONN, "invalid srt streamid=%s", streamid.c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// discovery vhost, resolve the vhost from config
|
||||||
|
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req_->vhost);
|
||||||
|
if (parsed_vhost) {
|
||||||
|
req_->vhost = parsed_vhost->arg0();
|
||||||
|
}
|
||||||
|
|
||||||
if (! _srs_config->get_srt_enabled(req_->vhost)) {
|
if (! _srs_config->get_srt_enabled(req_->vhost)) {
|
||||||
return srs_error_new(ERROR_SRT_CONN, "srt disabled, vhost=%s", req_->vhost.c_str());
|
return srs_error_new(ERROR_SRT_CONN, "srt disabled, vhost=%s", req_->vhost.c_str());
|
||||||
}
|
}
|
||||||
|
@ -271,6 +288,9 @@ srs_error_t SrsMpegtsSrtConn::do_cycle()
|
||||||
return srs_error_wrap(err, "on connect");
|
return srs_error_wrap(err, "on connect");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Build the tcUrl which is vhost/app.
|
||||||
|
req_->tcUrl = srs_generate_tc_url(req_->host, req_->vhost, req_->app, req_->port);
|
||||||
|
|
||||||
if (mode == SrtModePush) {
|
if (mode == SrtModePush) {
|
||||||
err = publishing();
|
err = publishing();
|
||||||
} else if (mode == SrtModePull) {
|
} else if (mode == SrtModePull) {
|
||||||
|
@ -307,6 +327,11 @@ srs_error_t SrsMpegtsSrtConn::playing()
|
||||||
if ((err = http_hooks_on_play()) != srs_success) {
|
if ((err = http_hooks_on_play()) != srs_success) {
|
||||||
return srs_error_wrap(err, "rtmp: callback on play");
|
return srs_error_wrap(err, "rtmp: callback on play");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SrsStatistic* stat = SrsStatistic::instance();
|
||||||
|
if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPlay)) != srs_success) {
|
||||||
|
return srs_error_wrap(err, "rtmp: stat client");
|
||||||
|
}
|
||||||
|
|
||||||
err = do_playing();
|
err = do_playing();
|
||||||
http_hooks_on_stop();
|
http_hooks_on_stop();
|
||||||
|
@ -363,6 +388,11 @@ srs_error_t SrsMpegtsSrtConn::do_publishing()
|
||||||
SrsPithyPrint* pprint = SrsPithyPrint::create_srt_publish();
|
SrsPithyPrint* pprint = SrsPithyPrint::create_srt_publish();
|
||||||
SrsAutoFree(SrsPithyPrint, pprint);
|
SrsAutoFree(SrsPithyPrint, pprint);
|
||||||
|
|
||||||
|
SrsStatistic* stat = SrsStatistic::instance();
|
||||||
|
if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPublish)) != srs_success) {
|
||||||
|
return srs_error_wrap(err, "srt: stat client");
|
||||||
|
}
|
||||||
|
|
||||||
int nb_packets = 0;
|
int nb_packets = 0;
|
||||||
|
|
||||||
// Max udp packet size equal to 1500.
|
// Max udp packet size equal to 1500.
|
||||||
|
|
|
@ -70,7 +70,7 @@ private:
|
||||||
srs_error_t recv_err_;
|
srs_error_t recv_err_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class SrsMpegtsSrtConn : public ISrsStartableConneciton, public ISrsCoroutineHandler
|
class SrsMpegtsSrtConn : public ISrsStartableConneciton, public ISrsCoroutineHandler, public ISrsExpire
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, std::string ip, int port);
|
SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, std::string ip, int port);
|
||||||
|
@ -81,6 +81,9 @@ public:
|
||||||
// Interface ISrsKbpsDelta
|
// Interface ISrsKbpsDelta
|
||||||
public:
|
public:
|
||||||
virtual void remark(int64_t* in, int64_t* out);
|
virtual void remark(int64_t* in, int64_t* out);
|
||||||
|
// Interface ISrsExpire
|
||||||
|
public:
|
||||||
|
virtual void expire();
|
||||||
public:
|
public:
|
||||||
virtual srs_error_t start();
|
virtual srs_error_t start();
|
||||||
// Interface ISrsConnection.
|
// Interface ISrsConnection.
|
||||||
|
|
|
@ -13,6 +13,7 @@ using namespace std;
|
||||||
#include <srs_protocol_log.hpp>
|
#include <srs_protocol_log.hpp>
|
||||||
#include <srs_app_config.hpp>
|
#include <srs_app_config.hpp>
|
||||||
#include <srs_app_srt_conn.hpp>
|
#include <srs_app_srt_conn.hpp>
|
||||||
|
#include <srs_app_statistic.hpp>
|
||||||
|
|
||||||
#ifdef SRS_SRT
|
#ifdef SRS_SRT
|
||||||
SrsSrtEventLoop* _srt_eventloop = NULL;
|
SrsSrtEventLoop* _srt_eventloop = NULL;
|
||||||
|
@ -131,16 +132,27 @@ srs_error_t SrsSrtAcceptor::on_srt_client(srs_srt_t srt_fd)
|
||||||
SrsSrtServer::SrsSrtServer()
|
SrsSrtServer::SrsSrtServer()
|
||||||
{
|
{
|
||||||
conn_manager_ = new SrsResourceManager("SRT", true);
|
conn_manager_ = new SrsResourceManager("SRT", true);
|
||||||
|
timer_ = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SrsSrtServer::~SrsSrtServer()
|
SrsSrtServer::~SrsSrtServer()
|
||||||
{
|
{
|
||||||
srs_freep(conn_manager_);
|
srs_freep(conn_manager_);
|
||||||
|
srs_freep(timer_);
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t SrsSrtServer::initialize()
|
srs_error_t SrsSrtServer::initialize()
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
if (! _srs_config->get_srt_enabled()) {
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((err = setup_ticks()) != srs_success) {
|
||||||
|
return srs_error_wrap(err, "tick");
|
||||||
|
}
|
||||||
|
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,17 +254,54 @@ srs_error_t SrsSrtServer::fd_to_resource(srs_srt_t srt_fd, ISrsStartableConnecit
|
||||||
|
|
||||||
void SrsSrtServer::remove(ISrsResource* c)
|
void SrsSrtServer::remove(ISrsResource* c)
|
||||||
{
|
{
|
||||||
// TODO: FIXME: add some statistic of srt.
|
|
||||||
// ISrsStartableConneciton* conn = dynamic_cast<ISrsStartableConneciton*>(c);
|
|
||||||
|
|
||||||
// SrsStatistic* stat = SrsStatistic::instance();
|
|
||||||
// stat->kbps_add_delta(c->get_id().c_str(), conn);
|
|
||||||
// stat->on_disconnect(c->get_id().c_str());
|
|
||||||
|
|
||||||
// use manager to free it async.
|
// use manager to free it async.
|
||||||
conn_manager_->remove(c);
|
conn_manager_->remove(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
srs_error_t SrsSrtServer::setup_ticks()
|
||||||
|
{
|
||||||
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
srs_freep(timer_);
|
||||||
|
timer_ = new SrsHourGlass("srt", this, 1 * SRS_UTIME_SECONDS);
|
||||||
|
|
||||||
|
if (_srs_config->get_stats_enabled()) {
|
||||||
|
if ((err = timer_->tick(8, 3 * SRS_UTIME_SECONDS)) != srs_success) {
|
||||||
|
return srs_error_wrap(err, "tick");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((err = timer_->start()) != srs_success) {
|
||||||
|
return srs_error_wrap(err, "timer");
|
||||||
|
}
|
||||||
|
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
srs_error_t SrsSrtServer::notify(int event, srs_utime_t interval, srs_utime_t tick)
|
||||||
|
{
|
||||||
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
|
switch (event) {
|
||||||
|
case 8: resample_kbps(); break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SrsSrtServer::resample_kbps()
|
||||||
|
{
|
||||||
|
// collect delta from all clients.
|
||||||
|
for (int i = 0; i < (int)conn_manager_->size(); i++) {
|
||||||
|
ISrsResource* c = conn_manager_->at(i);
|
||||||
|
ISrsKbpsDelta* conn = dynamic_cast<ISrsKbpsDelta*>(conn_manager_->at(i));
|
||||||
|
|
||||||
|
// add delta of connection to server kbps.,
|
||||||
|
// for next sample() of server kbps can get the stat.
|
||||||
|
SrsStatistic::instance()->kbps_add_delta(c->get_id().c_str(), conn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SrsSrtServerAdapter::SrsSrtServerAdapter()
|
SrsSrtServerAdapter::SrsSrtServerAdapter()
|
||||||
{
|
{
|
||||||
srt_server_ = new SrsSrtServer();
|
srt_server_ = new SrsSrtServer();
|
||||||
|
@ -349,7 +398,7 @@ srs_error_t SrsSrtEventLoop::start()
|
||||||
srs_error_t SrsSrtEventLoop::cycle()
|
srs_error_t SrsSrtEventLoop::cycle()
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if ((err = trd_->pull()) != srs_success) {
|
if ((err = trd_->pull()) != srs_success) {
|
||||||
return srs_error_wrap(err, "srt listener");
|
return srs_error_wrap(err, "srt listener");
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
#include <srs_app_srt_listener.hpp>
|
#include <srs_app_srt_listener.hpp>
|
||||||
|
|
||||||
class SrsSrtServer;
|
class SrsSrtServer;
|
||||||
|
class SrsHourGlass;
|
||||||
|
|
||||||
// A common srt acceptor, for SRT server.
|
// A common srt acceptor, for SRT server.
|
||||||
class SrsSrtAcceptor : public ISrsSrtHandler
|
class SrsSrtAcceptor : public ISrsSrtHandler
|
||||||
|
@ -37,10 +38,11 @@ public:
|
||||||
};
|
};
|
||||||
|
|
||||||
// SRS SRT server, initialize and listen, start connection service thread, destroy client.
|
// SRS SRT server, initialize and listen, start connection service thread, destroy client.
|
||||||
class SrsSrtServer : public ISrsResourceManager
|
class SrsSrtServer : public ISrsResourceManager, public ISrsHourGlass
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
SrsResourceManager* conn_manager_;
|
SrsResourceManager* conn_manager_;
|
||||||
|
SrsHourGlass* timer_;
|
||||||
private:
|
private:
|
||||||
std::vector<SrsSrtAcceptor*> acceptors_;
|
std::vector<SrsSrtAcceptor*> acceptors_;
|
||||||
public:
|
public:
|
||||||
|
@ -66,6 +68,12 @@ public:
|
||||||
// A callback for connection to remove itself.
|
// A callback for connection to remove itself.
|
||||||
// When connection thread cycle terminated, callback this to delete connection.
|
// When connection thread cycle terminated, callback this to delete connection.
|
||||||
virtual void remove(ISrsResource* c);
|
virtual void remove(ISrsResource* c);
|
||||||
|
// interface ISrsHourGlass
|
||||||
|
private:
|
||||||
|
virtual srs_error_t setup_ticks();
|
||||||
|
virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick);
|
||||||
|
private:
|
||||||
|
virtual void resample_kbps();
|
||||||
};
|
};
|
||||||
|
|
||||||
// The srt server adapter, the master server.
|
// The srt server adapter, the master server.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue