From 2a1db36750edfedef7a288317ac0af4fb5903768 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 6 Jun 2015 18:57:41 +0800 Subject: [PATCH] refine the pithy print of ingesters. --- trunk/src/app/srs_app_ingest.cpp | 98 +++++++++++++++++++++++++------- trunk/src/app/srs_app_ingest.hpp | 20 +++++-- 2 files changed, 94 insertions(+), 24 deletions(-) diff --git a/trunk/src/app/srs_app_ingest.cpp b/trunk/src/app/srs_app_ingest.cpp index 2a5293e95..da4705725 100644 --- a/trunk/src/app/srs_app_ingest.cpp +++ b/trunk/src/app/srs_app_ingest.cpp @@ -39,11 +39,9 @@ using namespace std; // ingest never sleep a long time, for we must start the stream ASAP. #define SRS_AUTO_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL) -SrsIngesterFFMPEG::SrsIngesterFFMPEG(SrsFFMPEG* _ffmpeg, string _vhost, string _id) +SrsIngesterFFMPEG::SrsIngesterFFMPEG() { - ffmpeg = _ffmpeg; - vhost = _vhost; - id = _id; + ffmpeg = NULL; } SrsIngesterFFMPEG::~SrsIngesterFFMPEG() @@ -51,6 +49,53 @@ SrsIngesterFFMPEG::~SrsIngesterFFMPEG() srs_freep(ffmpeg); } +int SrsIngesterFFMPEG::initialize(SrsFFMPEG* ff, string v, string i) +{ + int ret = ERROR_SUCCESS; + + ffmpeg = ff; + vhost = v; + id = i; + starttime = srs_get_system_time_ms(); + + return ret; +} + +string SrsIngesterFFMPEG::uri() +{ + return vhost + "/" + id; +} + +int SrsIngesterFFMPEG::alive() +{ + return (int)(srs_get_system_time_ms() - starttime); +} + +bool SrsIngesterFFMPEG::equals(string v) +{ + return vhost == v; +} + +bool SrsIngesterFFMPEG::equals(string v, string i) +{ + return vhost == v && id == i; +} + +int SrsIngesterFFMPEG::start() +{ + return ffmpeg->start(); +} + +void SrsIngesterFFMPEG::stop() +{ + ffmpeg->stop(); +} + +int SrsIngesterFFMPEG::cycle() +{ + return ffmpeg->cycle(); +} + void SrsIngesterFFMPEG::fast_stop() { ffmpeg->fast_stop(); @@ -129,6 +174,8 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest // get all engines. std::vector engines = _srs_config->get_transcode_engines(ingest); + + // create ingesters without engines. if (engines.empty()) { SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin); if ((ret = initialize_ffmpeg(ffmpeg, vhost, ingest, NULL)) != ERROR_SUCCESS) { @@ -139,12 +186,17 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest return ret; } - SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG(ffmpeg, vhost->arg0(), ingest->arg0()); + SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG(); + if ((ret = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != ERROR_SUCCESS) { + srs_freep(ingester); + return ret; + } + ingesters.push_back(ingester); return ret; } - // create engine + // create ingesters with engine for (int i = 0; i < (int)engines.size(); i++) { SrsConfDirective* engine = engines[i]; SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin); @@ -156,8 +208,13 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest } return ret; } + + SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG(); + if ((ret = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != ERROR_SUCCESS) { + srs_freep(ingester); + return ret; + } - SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG(ffmpeg, vhost->arg0(), ingest->arg0()); ingesters.push_back(ingester); } @@ -196,13 +253,13 @@ int SrsIngester::cycle() SrsIngesterFFMPEG* ingester = *it; // start all ffmpegs. - if ((ret = ingester->ffmpeg->start()) != ERROR_SUCCESS) { + if ((ret = ingester->start()) != ERROR_SUCCESS) { srs_error("ingest ffmpeg start failed. ret=%d", ret); return ret; } // check ffmpeg status. - if ((ret = ingester->ffmpeg->cycle()) != ERROR_SUCCESS) { + if ((ret = ingester->cycle()) != ERROR_SUCCESS) { srs_error("ingest ffmpeg cycle failed. ret=%d", ret); return ret; } @@ -376,11 +433,14 @@ void SrsIngester::show_ingest_log_message() return; } + // random choose one ingester to report. + int index = rand() % (int)ingesters.size(); + SrsIngesterFFMPEG* ingester = ingesters.at(index); + // reportable if (pprint->can_print()) { - // TODO: FIXME: show more info. - srs_trace("-> "SRS_CONSTS_LOG_INGESTER - " time=%"PRId64", ingesters=%d", pprint->age(), (int)ingesters.size()); + srs_trace("-> "SRS_CONSTS_LOG_INGESTER" time=%"PRId64", ingesters=%d, #%d(alive=%ds, %s)", + pprint->age(), (int)ingesters.size(), index, ingester->alive() / 1000, ingester->uri().c_str()); } } @@ -407,16 +467,15 @@ int SrsIngester::on_reload_vhost_removed(string vhost) for (it = ingesters.begin(); it != ingesters.end();) { SrsIngesterFFMPEG* ingester = *it; - if (ingester->vhost != vhost) { + if (!ingester->equals(vhost)) { ++it; continue; } // stop the ffmpeg and free it. - ingester->ffmpeg->stop(); + ingester->stop(); - srs_trace("reload stop ingester, " - "vhost=%s, id=%s", vhost.c_str(), ingester->id.c_str()); + srs_trace("reload stop ingester, vhost=%s, id=%s", vhost.c_str(), ingester->uri().c_str()); srs_freep(ingester); @@ -436,16 +495,15 @@ int SrsIngester::on_reload_ingest_removed(string vhost, string ingest_id) for (it = ingesters.begin(); it != ingesters.end();) { SrsIngesterFFMPEG* ingester = *it; - if (ingester->vhost != vhost || ingester->id != ingest_id) { + if (!ingester->equals(vhost, ingest_id)) { ++it; continue; } // stop the ffmpeg and free it. - ingester->ffmpeg->stop(); + ingester->stop(); - srs_trace("reload stop ingester, " - "vhost=%s, id=%s", vhost.c_str(), ingester->id.c_str()); + srs_trace("reload stop ingester, vhost=%s, id=%s", vhost.c_str(), ingester->uri().c_str()); srs_freep(ingester); diff --git a/trunk/src/app/srs_app_ingest.hpp b/trunk/src/app/srs_app_ingest.hpp index ede1e5211..15137d035 100644 --- a/trunk/src/app/srs_app_ingest.hpp +++ b/trunk/src/app/srs_app_ingest.hpp @@ -45,14 +45,26 @@ class SrsPithyPrint; */ class SrsIngesterFFMPEG { -public: +private: std::string vhost; std::string id; SrsFFMPEG* ffmpeg; - - SrsIngesterFFMPEG(SrsFFMPEG* _ffmpeg, std::string _vhost, std::string _id); + int64_t starttime; +public: + SrsIngesterFFMPEG(); virtual ~SrsIngesterFFMPEG(); - +public: + virtual int initialize(SrsFFMPEG* ff, std::string v, std::string i); + // the ingest uri, [vhost]/[ingest id] + virtual std::string uri(); + // the alive in ms. + virtual int alive(); + virtual bool equals(std::string v, std::string i); + virtual bool equals(std::string v); +public: + virtual int start(); + virtual void stop(); + virtual int cycle(); // @see SrsFFMPEG.fast_stop(). virtual void fast_stop(); };