From 8d534d34709cf343535b5a0d17fa1aaab425d25c Mon Sep 17 00:00:00 2001 From: "qiang.li" Date: Sat, 3 Jan 2015 12:57:13 +0800 Subject: [PATCH 1/2] get stream info use http api #227 --- trunk/src/app/srs_app_edge.hpp | 8 +++ trunk/src/app/srs_app_http_api.cpp | 90 +++++++++++++++++++++++++++++- trunk/src/app/srs_app_http_api.hpp | 22 ++++++++ trunk/src/app/srs_app_source.cpp | 11 +++- trunk/src/app/srs_app_source.hpp | 6 ++ 5 files changed, 135 insertions(+), 2 deletions(-) diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 6a7329cfb..285806a95 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -188,6 +188,10 @@ public: * when ingester start to play stream. */ virtual int on_ingest_play(); + /** + * get state info. + */ + virtual int get_state() { return state; } }; /** @@ -219,6 +223,10 @@ public: * proxy unpublish stream to edge. */ virtual void on_proxy_unpublish(); + /** + * get state info. + */ + virtual int get_state() { return state; } }; #endif diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 3958fd838..54e824843 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -35,6 +35,8 @@ using namespace std; #include #include #include +#include +#include SrsApiRoot::SrsApiRoot() { @@ -122,6 +124,8 @@ SrsApiV1::SrsApiV1() handlers.push_back(new SrsApiMemInfos()); handlers.push_back(new SrsApiAuthors()); handlers.push_back(new SrsApiRequests()); + handlers.push_back(new SrsApiVhosts()); + handlers.push_back(new SrsApiStreams()); } SrsApiV1::~SrsApiV1() @@ -147,7 +151,9 @@ int SrsApiV1::do_process_request(SrsStSocket* skt, SrsHttpMessage* req) << __SRS_JFIELD_STR("system_proc_stats", "the system process stats") << __SRS_JFIELD_CONT << __SRS_JFIELD_STR("meminfos", "the meminfo of system") << __SRS_JFIELD_CONT << __SRS_JFIELD_STR("authors", "the primary authors and contributors") << __SRS_JFIELD_CONT - << __SRS_JFIELD_STR("requests", "the request itself, for http debug") + << __SRS_JFIELD_STR("requests", "the request itself, for http debug") << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("vhosts", "list all vhosts") << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("streams?(name/vhost)=xxx", "list streams that match the name or vhost") << __SRS_JOBJECT_END << __SRS_JOBJECT_END; @@ -500,6 +506,88 @@ int SrsApiAuthors::do_process_request(SrsStSocket* skt, SrsHttpMessage* req) return res_json(skt, req, ss.str()); } +SrsApiVhosts::SrsApiVhosts() +{ +} + +SrsApiVhosts::~SrsApiVhosts() +{ +} + +bool SrsApiVhosts::can_handle(const char* path, int length, const char** /*pchild*/) +{ + return srs_path_equals("/vhosts", path, length); +} + +int SrsApiVhosts::do_process_request(SrsStSocket* skt, SrsHttpMessage* req) +{ + std::stringstream ss; + + ss << __SRS_JARRAY_START; + bool first = true; + std::map *source_pool = SrsSource::get_source_pool(); + std::map::iterator it; + for (it=source_pool->begin(); it!=source_pool->end(); it++) { + SrsRequest* source_req = it->second->get_reqinfo(); + if (first) first = false; + else ss << __SRS_JFIELD_CONT; + + ss << "\"" << source_req->vhost << "\""; + } + ss << __SRS_JARRAY_END; + + return res_json(skt, req, ss.str()); +} + +SrsApiStreams::SrsApiStreams() +{ +} + +SrsApiStreams::~SrsApiStreams() +{ +} + +bool SrsApiStreams::can_handle(const char* path, int length, const char** /*pchild*/) +{ + return srs_path_equals("/streams", path, length); +} + +int SrsApiStreams::do_process_request(SrsStSocket* skt, SrsHttpMessage* req) +{ + std::stringstream ss; + + std::string query_name = req->query_get("name"); + std::string query_vhost = req->query_get("vhost"); + if (query_name.size()>0 || query_vhost.size()>0) { + ss << __SRS_JARRAY_START; + bool first = true; + std::map *source_pool = SrsSource::get_source_pool(); + std::map::iterator it; + for (it=source_pool->begin(); it!=source_pool->end(); it++) { + SrsSource* source = it->second; + SrsRequest* source_req = source->get_reqinfo(); + if (source_req->stream==query_name || source_req->vhost==query_vhost) { + if (first) first = false; + else ss << __SRS_JFIELD_CONT; + + ss << __SRS_JOBJECT_START + << __SRS_JFIELD_STR("name", source_req->stream) << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("url", source_req->tcUrl) << __SRS_JFIELD_CONT + << __SRS_JFIELD_ORG("clients", source->get_consumers_size()) << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("status", (source->can_publish()?"idle":"streaming")) << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("type", source->get_source_type()) << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("codec", "") + << __SRS_JOBJECT_END; + } + } + ss << __SRS_JARRAY_END; + } else { + return res_error(skt, req, 400, "Bad Request", "unknown query"); + } + + return res_json(skt, req, ss.str()); +} + SrsHttpApi::SrsHttpApi(SrsServer* srs_server, st_netfd_t client_stfd, SrsHttpHandler* _handler) : SrsConnection(srs_server, client_stfd) { diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index ea12ffcf1..a29e46066 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -164,6 +164,28 @@ protected: virtual int do_process_request(SrsStSocket* skt, SrsHttpMessage* req); }; +class SrsApiVhosts : public SrsHttpHandler +{ +public: + SrsApiVhosts(); + virtual ~SrsApiVhosts(); +public: + virtual bool can_handle(const char* path, int length, const char** pchild); +protected: + virtual int do_process_request(SrsStSocket* skt, SrsHttpMessage* req); +}; + +class SrsApiStreams : public SrsHttpHandler +{ +public: + SrsApiStreams(); + virtual ~SrsApiStreams(); +public: + virtual bool can_handle(const char* path, int length, const char** pchild); +protected: + virtual int do_process_request(SrsStSocket* skt, SrsHttpMessage* req); +}; + class SrsHttpApi : public SrsConnection { private: diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 6003c8883..e788fb465 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1807,4 +1807,13 @@ void SrsSource::destroy_forwarders() forwarders.clear(); } - +std::string SrsSource::get_source_type() +{ + if (play_edge->get_state() == SrsEdgeStateIngestConnected) { + return "origin pull"; + } else if (publish_edge->get_state() == SrsEdgeStatePublish) { + return "edge publish"; + } else { + return "normal publish"; + } +} diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 145adb8eb..7a6ce1f2a 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -486,6 +486,12 @@ public: private: virtual int create_forwarders(); virtual void destroy_forwarders(); +//get information +public: + static std::map* get_source_pool() { return &pool; } + virtual SrsRequest* get_reqinfo() { return _req; } + virtual std::size_t get_consumers_size() { return consumers.size(); } + virtual std::string get_source_type(); }; #endif From a4a93613d0cda57f51ea4f4cb8a60f2965d9c100 Mon Sep 17 00:00:00 2001 From: "qiang.li" Date: Sun, 4 Jan 2015 22:47:12 +0800 Subject: [PATCH 2/2] add statistic for stream --- trunk/configure | 2 +- trunk/src/app/srs_app_edge.hpp | 8 --- trunk/src/app/srs_app_http_api.cpp | 62 +++++++++++++++--------- trunk/src/app/srs_app_rtmp_conn.cpp | 3 ++ trunk/src/app/srs_app_source.cpp | 10 ---- trunk/src/app/srs_app_source.hpp | 6 --- trunk/src/app/srs_app_statistic.cpp | 75 +++++++++++++++++++++++++++++ trunk/src/app/srs_app_statistic.hpp | 70 +++++++++++++++++++++++++++ 8 files changed, 188 insertions(+), 48 deletions(-) create mode 100644 trunk/src/app/srs_app_statistic.cpp create mode 100644 trunk/src/app/srs_app_statistic.hpp diff --git a/trunk/configure b/trunk/configure index dd6d5f4e3..c543d150c 100755 --- a/trunk/configure +++ b/trunk/configure @@ -389,7 +389,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then "srs_app_pithy_print" "srs_app_reload" "srs_app_http_api" "srs_app_http_conn" "srs_app_http_hooks" "srs_app_json" "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_dvr" "srs_app_edge" "srs_app_kbps" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_avc_aac" - "srs_app_recv_thread") + "srs_app_recv_thread" "srs_app_statistic") APP_INCS="src/app"; MODULE_DIR=${APP_INCS} . auto/modules.sh APP_OBJS="${MODULE_OBJS[@]}" fi diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 285806a95..6a7329cfb 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -188,10 +188,6 @@ public: * when ingester start to play stream. */ virtual int on_ingest_play(); - /** - * get state info. - */ - virtual int get_state() { return state; } }; /** @@ -223,10 +219,6 @@ public: * proxy unpublish stream to edge. */ virtual void on_proxy_unpublish(); - /** - * get state info. - */ - virtual int get_state() { return state; } }; #endif diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 54e824843..a0c43492e 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -26,6 +26,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #ifdef SRS_AUTO_HTTP_API #include +#include using namespace std; #include @@ -35,7 +36,7 @@ using namespace std; #include #include #include -#include +#include #include SrsApiRoot::SrsApiRoot() @@ -153,7 +154,7 @@ int SrsApiV1::do_process_request(SrsStSocket* skt, SrsHttpMessage* req) << __SRS_JFIELD_STR("authors", "the primary authors and contributors") << __SRS_JFIELD_CONT << __SRS_JFIELD_STR("requests", "the request itself, for http debug") << __SRS_JFIELD_CONT << __SRS_JFIELD_STR("vhosts", "list all vhosts") << __SRS_JFIELD_CONT - << __SRS_JFIELD_STR("streams?(name/vhost)=xxx", "list streams that match the name or vhost") + << __SRS_JFIELD_STR("streams", "list streams that match the name or vhost") << __SRS_JOBJECT_END << __SRS_JOBJECT_END; @@ -523,16 +524,26 @@ int SrsApiVhosts::do_process_request(SrsStSocket* skt, SrsHttpMessage* req) { std::stringstream ss; + std::set vhost_set; + SrsStreamInfoMap* pool = SrsStatistic::instance()->get_pool(); + SrsStreamInfoMap::iterator it; + for (it = pool->begin(); it != pool->end(); it++) { + if (it->second->_req == NULL) + continue; + vhost_set.insert(it->second->_req->vhost); + } + ss << __SRS_JARRAY_START; bool first = true; - std::map *source_pool = SrsSource::get_source_pool(); - std::map::iterator it; - for (it=source_pool->begin(); it!=source_pool->end(); it++) { - SrsRequest* source_req = it->second->get_reqinfo(); - if (first) first = false; - else ss << __SRS_JFIELD_CONT; + std::set::iterator it_set; + for (it_set = vhost_set.begin(); it_set != vhost_set.end(); it_set++) { + if (first) { + first = false; + } else { + ss << __SRS_JFIELD_CONT; + } - ss << "\"" << source_req->vhost << "\""; + ss << "\"" << (*it_set) << "\""; } ss << __SRS_JARRAY_END; @@ -558,24 +569,29 @@ int SrsApiStreams::do_process_request(SrsStSocket* skt, SrsHttpMessage* req) std::string query_name = req->query_get("name"); std::string query_vhost = req->query_get("vhost"); - if (query_name.size()>0 || query_vhost.size()>0) { + if (query_name.size() > 0 || query_vhost.size() > 0) { ss << __SRS_JARRAY_START; bool first = true; - std::map *source_pool = SrsSource::get_source_pool(); - std::map::iterator it; - for (it=source_pool->begin(); it!=source_pool->end(); it++) { - SrsSource* source = it->second; - SrsRequest* source_req = source->get_reqinfo(); - if (source_req->stream==query_name || source_req->vhost==query_vhost) { - if (first) first = false; - else ss << __SRS_JFIELD_CONT; + SrsStreamInfoMap* pool = SrsStatistic::instance()->get_pool(); + SrsStreamInfoMap::iterator it; + for (it = pool->begin(); it != pool->end(); it++) { + SrsRequest* reqinfo = it->second->_req; + if (reqinfo == NULL) + continue; + + if (reqinfo->stream == query_name || reqinfo->vhost == query_vhost) { + if (first) { + first = false; + } else { + ss << __SRS_JFIELD_CONT; + } ss << __SRS_JOBJECT_START - << __SRS_JFIELD_STR("name", source_req->stream) << __SRS_JFIELD_CONT - << __SRS_JFIELD_STR("url", source_req->tcUrl) << __SRS_JFIELD_CONT - << __SRS_JFIELD_ORG("clients", source->get_consumers_size()) << __SRS_JFIELD_CONT - << __SRS_JFIELD_STR("status", (source->can_publish()?"idle":"streaming")) << __SRS_JFIELD_CONT - << __SRS_JFIELD_STR("type", source->get_source_type()) << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("name", reqinfo->stream) << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("url", reqinfo->tcUrl) << __SRS_JFIELD_CONT + << __SRS_JFIELD_ORG("clients", 0) << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("status", "idle") << __SRS_JFIELD_CONT + << __SRS_JFIELD_STR("type", "") << __SRS_JFIELD_CONT << __SRS_JFIELD_STR("codec", "") << __SRS_JOBJECT_END; } diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 87eef5c4a..43feda5a7 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -51,6 +51,7 @@ using namespace std; #include #include #include +#include // when stream is busy, for example, streaming is already // publishing, when a new client to request to publish, @@ -382,6 +383,8 @@ int SrsRtmpConn::stream_service_cycle() } srs_assert(source != NULL); + SrsStatistic::instance()->add_request_info(source, req); + // check ASAP, to fail it faster if invalid. if (type != SrsRtmpConnPlay && !vhost_is_edge) { // check publish available diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index e788fb465..a7cbea204 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1807,13 +1807,3 @@ void SrsSource::destroy_forwarders() forwarders.clear(); } -std::string SrsSource::get_source_type() -{ - if (play_edge->get_state() == SrsEdgeStateIngestConnected) { - return "origin pull"; - } else if (publish_edge->get_state() == SrsEdgeStatePublish) { - return "edge publish"; - } else { - return "normal publish"; - } -} diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 7a6ce1f2a..145adb8eb 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -486,12 +486,6 @@ public: private: virtual int create_forwarders(); virtual void destroy_forwarders(); -//get information -public: - static std::map* get_source_pool() { return &pool; } - virtual SrsRequest* get_reqinfo() { return _req; } - virtual std::size_t get_consumers_size() { return consumers.size(); } - virtual std::string get_source_type(); }; #endif diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp new file mode 100644 index 000000000..d93dfe737 --- /dev/null +++ b/trunk/src/app/srs_app_statistic.cpp @@ -0,0 +1,75 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2015 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +#include + +SrsStreamInfo::SrsStreamInfo() +{ + _req = NULL; +} + +SrsStreamInfo::~SrsStreamInfo() +{ + if (_req != NULL) + delete _req; +} + +SrsStatistic *SrsStatistic::_instance = NULL; + +SrsStatistic::SrsStatistic() +{ + +} + +SrsStatistic::~SrsStatistic() +{ + SrsStreamInfoMap::iterator it; + for (it = pool.begin(); it != pool.end(); it++) { + delete it->second; + } +} + +SrsStreamInfoMap* SrsStatistic::get_pool() +{ + return &pool; +} + +SrsStreamInfo* SrsStatistic::get(void *p) +{ + SrsStreamInfoMap::iterator it = pool.find(p); + if (it == pool.end()) { + pool[p] = new SrsStreamInfo(); + return pool[p]; + } else { + return it->second; + } +} + +void SrsStatistic::add_request_info(void *p, SrsRequest *req) +{ + SrsStreamInfo *info = get(p); + if (info->_req == NULL) + info->_req = req->copy(); +} \ No newline at end of file diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp new file mode 100644 index 000000000..42e0d81d2 --- /dev/null +++ b/trunk/src/app/srs_app_statistic.hpp @@ -0,0 +1,70 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2015 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SRS_APP_STATISTIC_HPP +#define SRS_APP_STATISTIC_HPP + +/* +#include +*/ + +#include + +#include + +class SrsRequest; + +class SrsStreamInfo +{ +public: + SrsStreamInfo(); + virtual ~SrsStreamInfo(); + + SrsRequest *_req; +}; +typedef std::map SrsStreamInfoMap; + +class SrsStatistic +{ +public: + static SrsStatistic *instance() + { + if (_instance == NULL) { + _instance = new SrsStatistic(); + } + return _instance; + } + + virtual SrsStreamInfoMap* get_pool(); + + virtual void add_request_info(void *p, SrsRequest *req); + +private: + SrsStatistic(); + virtual ~SrsStatistic(); + static SrsStatistic *_instance; + SrsStreamInfoMap pool; + virtual SrsStreamInfo *get(void *p); +}; + +#endif \ No newline at end of file