1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 11:51:57 +00:00

Fix #2837: Callback: Support stream_url and stream_id. v5.0.55

This commit is contained in:
winlin 2022-08-30 22:04:13 +08:00
parent 9c6774b644
commit 6a108fab6d
14 changed files with 184 additions and 80 deletions

View file

@ -1199,7 +1199,8 @@ vhost hooks.callback.srs.com {
# "action": "on_publish",
# "client_id": "9308h583",
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream", "param":"?token=xxx&salt=yyy", "server_id": "vid-werty"
# "stream": "livestream", "param":"?token=xxx&salt=yyy", "server_id": "vid-werty",
# "stream_url": "video.test.com/live/livestream", "stream_id": "vid-124q9y3"
# }
# if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success):
@ -1215,7 +1216,8 @@ vhost hooks.callback.srs.com {
# "action": "on_unpublish",
# "client_id": "9308h583",
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream", "param":"?token=xxx&salt=yyy", "server_id": "vid-werty"
# "stream": "livestream", "param":"?token=xxx&salt=yyy", "server_id": "vid-werty",
# "stream_url": "video.test.com/live/livestream", "stream_id": "vid-124q9y3"
# }
# if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success):
@ -1232,7 +1234,8 @@ vhost hooks.callback.srs.com {
# "client_id": "9308h583",
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream", "param":"?token=xxx&salt=yyy",
# "pageUrl": "http://www.test.com/live.html", "server_id": "vid-werty"
# "pageUrl": "http://www.test.com/live.html", "server_id": "vid-werty",
# "stream_url": "video.test.com/live/livestream", "stream_id": "vid-124q9y3"
# }
# if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success):
@ -1248,7 +1251,8 @@ vhost hooks.callback.srs.com {
# "action": "on_stop",
# "client_id": "9308h583",
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream", "param":"?token=xxx&salt=yyy", "server_id": "vid-werty"
# "stream": "livestream", "param":"?token=xxx&salt=yyy", "server_id": "vid-werty",
# "stream_url": "video.test.com/live/livestream", "stream_id": "vid-124q9y3"
# }
# if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success):
@ -1266,7 +1270,8 @@ vhost hooks.callback.srs.com {
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream", "param":"?token=xxx&salt=yyy",
# "cwd": "/usr/local/srs",
# "file": "./objs/nginx/html/live/livestream.1420254068776.flv", "server_id": "vid-werty"
# "file": "./objs/nginx/html/live/livestream.1420254068776.flv", "server_id": "vid-werty",
# "stream_url": "video.test.com/live/livestream", "stream_id": "vid-124q9y3"
# }
# if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success):
@ -1285,7 +1290,8 @@ vhost hooks.callback.srs.com {
# "url": "live/livestream/2015-04-23/01/476584165.ts",
# "m3u8": "./objs/nginx/html/live/livestream/live.m3u8",
# "m3u8_url": "live/livestream/live.m3u8",
# "seq_no": 100, "server_id": "vid-werty"
# "seq_no": 100, "server_id": "vid-werty",
# "stream_url": "video.test.com/live/livestream", "stream_id": "vid-124q9y3"
# }
# if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success):

View file

@ -7,6 +7,7 @@ The changelog for SRS.
## SRS 5.0 Changelog
* v5.0, 2022-08-30, Fix [#2837](https://github.com/ossrs/srs/issues/2837): Callback: Support stream_url and stream_id. v5.0.55
* v5.0, 2022-08-30, STAT: Refine tcUrl for SRT/RTC. v5.0.54
* v5.0, 2022-08-30, Refactor: Extract SrsNetworkKbps from SrsKbps. v5.0.53
* v5.0, 2022-08-30, Remove bandwidth check because falsh is disabled. v5.0.52

View file

@ -292,6 +292,7 @@ SrsHttpxConn::SrsHttpxConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd,
manager = cm;
skt = new SrsTcpConnection(fd);
enable_stat_ = false;
if (https) {
ssl = new SrsSslConnection(skt);
@ -313,6 +314,11 @@ SrsHttpxConn::~SrsHttpxConn()
srs_freep(skt);
}
void SrsHttpxConn::set_enable_stat(bool v)
{
enable_stat_ = v;
}
srs_error_t SrsHttpxConn::pop_message(ISrsHttpMessage** preq)
{
srs_error_t err = srs_success;
@ -399,9 +405,8 @@ srs_error_t SrsHttpxConn::on_message_done(ISrsHttpMessage* r, SrsHttpResponseWri
srs_error_t SrsHttpxConn::on_conn_done(srs_error_t r0)
{
// Only stat the HTTP streaming clients, ignore all API clients.
bool exists = false;
SrsStatistic::instance()->on_disconnect(get_id().c_str(), &exists);
if (exists) {
if (enable_stat_) {
SrsStatistic::instance()->on_disconnect(get_id().c_str());
SrsStatistic::instance()->kbps_add_delta(get_id().c_str(), conn->delta());
}

View file

@ -131,10 +131,14 @@ private:
SrsTcpConnection* skt;
SrsSslConnection* ssl;
SrsHttpConn* conn;
// We should never enable the stat, unless HTTP stream connection requires.
bool enable_stat_;
public:
SrsHttpxConn(bool https, ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsHttpxConn();
public:
// Require statistic about HTTP connection, for HTTP streaming clients only.
void set_enable_stat(bool v);
// Directly read a HTTP request message.
// It's exported for HTTP stream, such as HTTP FLV, only need to write to client when
// serving it, but we need to start a thread to read message to detect whether FD is closed.

View file

@ -138,6 +138,12 @@ srs_error_t SrsHttpHooks::on_publish(string url, SrsRequest* req)
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
obj->set("stream_url", SrsJsonAny::str(req->get_stream_url().c_str()));
SrsStatisticStream* stream = stat->find_stream_by_url(req->get_stream_url());
if (stream) {
obj->set("stream_id", SrsJsonAny::str(stream->id.c_str()));
}
std::string data = obj->dumps();
std::string res;
int status_code;
@ -171,9 +177,16 @@ void SrsHttpHooks::on_unpublish(string url, SrsRequest* req)
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
obj->set("stream_url", SrsJsonAny::str(req->get_stream_url().c_str()));
SrsStatisticStream* stream = stat->find_stream_by_url(req->get_stream_url());
if (stream) {
obj->set("stream_id", SrsJsonAny::str(stream->id.c_str()));
}
std::string data = obj->dumps();
std::string res;
int status_code;
@ -211,9 +224,16 @@ srs_error_t SrsHttpHooks::on_play(string url, SrsRequest* req)
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
obj->set("pageUrl", SrsJsonAny::str(req->pageUrl.c_str()));
obj->set("stream_url", SrsJsonAny::str(req->get_stream_url().c_str()));
SrsStatisticStream* stream = stat->find_stream_by_url(req->get_stream_url());
if (stream) {
obj->set("stream_id", SrsJsonAny::str(stream->id.c_str()));
}
std::string data = obj->dumps();
std::string res;
int status_code;
@ -247,9 +267,16 @@ void SrsHttpHooks::on_stop(string url, SrsRequest* req)
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
obj->set("stream_url", SrsJsonAny::str(req->get_stream_url().c_str()));
SrsStatisticStream* stream = stat->find_stream_by_url(req->get_stream_url());
if (stream) {
obj->set("stream_id", SrsJsonAny::str(stream->id.c_str()));
}
std::string data = obj->dumps();
std::string res;
int status_code;
@ -287,11 +314,18 @@ srs_error_t SrsHttpHooks::on_dvr(SrsContextId c, string url, SrsRequest* req, st
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
obj->set("cwd", SrsJsonAny::str(cwd.c_str()));
obj->set("file", SrsJsonAny::str(file.c_str()));
obj->set("stream_url", SrsJsonAny::str(req->get_stream_url().c_str()));
SrsStatisticStream* stream = stat->find_stream_by_url(req->get_stream_url());
if (stream) {
obj->set("stream_id", SrsJsonAny::str(stream->id.c_str()));
}
std::string data = obj->dumps();
std::string res;
int status_code;
@ -332,6 +366,7 @@ srs_error_t SrsHttpHooks::on_hls(SrsContextId c, string url, SrsRequest* req, st
obj->set("ip", SrsJsonAny::str(req->ip.c_str()));
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
obj->set("duration", SrsJsonAny::number(srsu2ms(duration)/1000.0));
@ -342,6 +377,12 @@ srs_error_t SrsHttpHooks::on_hls(SrsContextId c, string url, SrsRequest* req, st
obj->set("m3u8_url", SrsJsonAny::str(m3u8_url.c_str()));
obj->set("seq_no", SrsJsonAny::integer(sn));
obj->set("stream_url", SrsJsonAny::str(req->get_stream_url().c_str()));
SrsStatisticStream* stream = stat->find_stream_by_url(req->get_stream_url());
if (stream) {
obj->set("stream_id", SrsJsonAny::str(stream->id.c_str()));
}
std::string data = obj->dumps();
std::string res;
int status_code;

View file

@ -128,6 +128,13 @@ srs_error_t SrsHlsStream::serve_new_session(ISrsHttpResponseWriter* w, ISrsHttpM
SrsContextRestore(_srs_context->get_id());
_srs_context->set_id(SrsContextId().set_value(ctx));
// We must do stat the client before hooks, because hooks depends on it.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(ctx, req, NULL, SrsHlsPlay)) != srs_success) {
return srs_error_wrap(err, "stat on client");
}
// We must do hook after stat, because depends on it.
if ((err = http_hooks_on_play(req)) != srs_success) {
return srs_error_wrap(err, "HLS: http_hooks_on_play");
}
@ -155,12 +162,6 @@ srs_error_t SrsHlsStream::serve_new_session(ISrsHttpResponseWriter* w, ISrsHttpM
return srs_error_wrap(err, "final request");
}
// update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(ctx, req, NULL, SrsHlsPlay)) != srs_success) {
return srs_error_wrap(err, "stat on client");
}
return err;
}
@ -497,6 +498,13 @@ srs_error_t SrsVodStream::serve_ts_ctx(ISrsHttpResponseWriter * w, ISrsHttpMessa
// SrsServer also stat all HTTP connections including this one, but it should be ignored because the id is not
// matched to any exists client. And we will do stat for the HLS streaming by session in hls_ctx.
SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
SrsHttpConn* hc = dynamic_cast<SrsHttpConn*>(hr->connection());
SrsHttpxConn* hxc = dynamic_cast<SrsHttpxConn*>(hc->handler());
// Note that we never enable the stat for the HTTP connection, because we always stat the pseudo HLS streaming
// session identified by hls_ctx, which served by an SrsHlsStream object.
hxc->set_enable_stat(false);
// Serve by default HLS handler.
err = SrsHttpFileServer::serve_ts_ctx(w, r, fullpath);

View file

@ -524,10 +524,28 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
{
srs_error_t err = srs_success;
SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
SrsHttpConn* hc = dynamic_cast<SrsHttpConn*>(hr->connection());
SrsHttpxConn* hxc = dynamic_cast<SrsHttpxConn*>(hc->handler());
// Note that we should enable stat for HTTP streaming client, because each HTTP streaming connection is a real
// session that should have statistics for itself.
hxc->set_enable_stat(true);
// Correct the app and stream by path, which is created from template.
// @remark Be careful that the stream has extension now, might cause identify fail.
req->stream = srs_path_basename(r->path());
// update client ip
req->ip = hc->remote_ip();
// We must do stat the client before hooks, because hooks depends on it.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req, hc, SrsFlvPlay)) != srs_success) {
return srs_error_wrap(err, "stat on client");
}
// We must do hook after stat, because depends on it.
if ((err = http_hooks_on_play(r)) != srs_success) {
return srs_error_wrap(err, "http hook");
}
@ -591,15 +609,6 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
SrsHttpConn* hc = dynamic_cast<SrsHttpConn*>(hr->connection());
// update client ip
req->ip = hc->remote_ip();
// update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req, hc, SrsFlvPlay)) != srs_success) {
return srs_error_wrap(err, "stat on client");
}
// the memory writer.
SrsBufferWriter writer(w);
if ((err = enc->initialize(&writer, cache)) != srs_success) {
@ -616,25 +625,25 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
// Try to use fast flv encoder, remember that it maybe NULL.
SrsFlvStreamEncoder* ffe = dynamic_cast<SrsFlvStreamEncoder*>(enc);
// Note that the handler of hc now is rohc.
SrsHttpxConn* rohc = dynamic_cast<SrsHttpxConn*>(hc->handler());
srs_assert(rohc);
// Note that the handler of hc now is hxc.
SrsHttpxConn* hxc = dynamic_cast<SrsHttpxConn*>(hc->handler());
srs_assert(hxc);
// Set the socket options for transport.
bool tcp_nodelay = _srs_config->get_tcp_nodelay(req->vhost);
if (tcp_nodelay) {
if ((err = rohc->set_tcp_nodelay(tcp_nodelay)) != srs_success) {
if ((err = hxc->set_tcp_nodelay(tcp_nodelay)) != srs_success) {
return srs_error_wrap(err, "set tcp nodelay");
}
}
srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost);
if ((err = rohc->set_socket_buffer(mw_sleep)) != srs_success) {
if ((err = hxc->set_socket_buffer(mw_sleep)) != srs_success) {
return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep);
}
// Start a thread to receive all messages from client, then drop them.
SrsHttpRecvThread* trd = new SrsHttpRecvThread(rohc);
SrsHttpRecvThread* trd = new SrsHttpRecvThread(hxc);
SrsAutoFree(SrsHttpRecvThread, trd);
if ((err = trd->start()) != srs_success) {

View file

@ -130,10 +130,6 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
ruc.req_->vhost = parsed_vhost->arg0();
}
if ((err = http_hooks_on_play(ruc.req_)) != srs_success) {
return srs_error_wrap(err, "RTC: http_hooks_on_play");
}
// For client to specifies the candidate(EIP) of server.
string eip = r->query_get("eip");
if (eip.empty()) {
@ -202,11 +198,17 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
}
// TODO: FIXME: When server enabled, but vhost disabled, should report error.
// We must do stat the client before hooks, because hooks depends on it.
SrsRtcConnection* session = NULL;
if ((err = server_->create_session(&ruc, local_sdp, &session)) != srs_success) {
return srs_error_wrap(err, "create session, dtls=%u, srtp=%u, eip=%s", ruc.dtls_, ruc.srtp_, eip.c_str());
}
// We must do hook after stat, because depends on it.
if ((err = http_hooks_on_play(ruc.req_)) != srs_success) {
return srs_error_wrap(err, "RTC: http_hooks_on_play");
}
ostringstream os;
if ((err = local_sdp.encode(os)) != srs_success) {
return srs_error_wrap(err, "encode sdp");
@ -406,10 +408,6 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
ruc.req_->vhost = parsed_vhost->arg0();
}
if ((err = http_hooks_on_publish(ruc.req_)) != srs_success) {
return srs_error_wrap(err, "RTC: http_hooks_on_publish");
}
// For client to specifies the candidate(EIP) of server.
string eip = r->query_get("eip");
if (eip.empty()) {
@ -455,11 +453,17 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
}
// TODO: FIXME: When server enabled, but vhost disabled, should report error.
// We must do stat the client before hooks, because hooks depends on it.
SrsRtcConnection* session = NULL;
if ((err = server_->create_session(&ruc, local_sdp, &session)) != srs_success) {
return srs_error_wrap(err, "create session");
}
// We must do hook after stat, because depends on it.
if ((err = http_hooks_on_publish(ruc.req_)) != srs_success) {
return srs_error_wrap(err, "RTC: http_hooks_on_publish");
}
ostringstream os;
if ((err = local_sdp.encode(os)) != srs_success) {
return srs_error_wrap(err, "encode sdp");

View file

@ -471,6 +471,12 @@ srs_error_t SrsRtcPlayStream::initialize(SrsRequest* req, std::map<uint32_t, Srs
req_ = req->copy();
// We must do stat the client before hooks, because hooks depends on it.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(cid_.c_str(), req_, session_, SrsRtcConnPlay)) != srs_success) {
return srs_error_wrap(err, "rtc: stat client");
}
if ((err = _srs_rtc_sources->fetch_or_create(req_, &source_)) != srs_success) {
return srs_error_wrap(err, "rtc fetch source failed");
}
@ -589,12 +595,6 @@ srs_error_t SrsRtcPlayStream::start()
}
}
// update the statistic when client discoveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(cid_.c_str(), req_, session_, SrsRtcConnPlay)) != srs_success) {
return srs_error_wrap(err, "rtc: stat client");
}
is_started = true;
return err;
@ -1130,6 +1130,12 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
req_ = r->copy();
// We must do stat the client before hooks, because hooks depends on it.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(cid_.c_str(), req_, session_, SrsRtcConnPublish)) != srs_success) {
return srs_error_wrap(err, "rtc: stat client");
}
if (stream_desc->audio_track_desc_) {
audio_tracks_.push_back(new SrsRtcAudioRecvTrack(session_, stream_desc->audio_track_desc_));
}
@ -1237,12 +1243,6 @@ srs_error_t SrsRtcPublishStream::start()
}
}
// update the statistic when client discoveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(cid_.c_str(), req_, session_, SrsRtcConnPublish)) != srs_success) {
return srs_error_wrap(err, "rtc: stat client");
}
is_started = true;
return err;

View file

@ -524,6 +524,14 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: start play");
}
// We must do stat the client before hooks, because hooks depends on it.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) {
return srs_error_wrap(err, "rtmp: stat client");
}
// We must do hook after stat, because depends on it.
if ((err = http_hooks_on_play()) != srs_success) {
return srs_error_wrap(err, "rtmp: callback on play");
}
@ -691,12 +699,6 @@ srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* cons
srs_assert(req);
srs_assert(consumer);
// update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) {
return srs_error_wrap(err, "rtmp: stat client");
}
// initialize other components
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
SrsAutoFree(SrsPithyPrint, pprint);
@ -825,6 +827,13 @@ srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
}
}
// We must do stat the client before hooks, because hooks depends on it.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) {
return srs_error_wrap(err, "rtmp: stat client");
}
// We must do hook after stat, because depends on it.
if ((err = http_hooks_on_publish()) != srs_success) {
return srs_error_wrap(err, "rtmp: callback on publish");
}
@ -860,12 +869,6 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
SrsAutoFree(SrsPithyPrint, pprint);
// update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) {
return srs_error_wrap(err, "rtmp: stat client");
}
// start isolate recv thread.
// TODO: FIXME: Pass the callback here.
if ((err = rtrd->start()) != srs_success) {

View file

@ -303,6 +303,13 @@ srs_error_t SrsMpegtsSrtConn::publishing()
{
srs_error_t err = srs_success;
// We must do stat the client before hooks, because hooks depends on it.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPublish)) != srs_success) {
return srs_error_wrap(err, "srt: stat client");
}
// We must do hook after stat, because depends on it.
if ((err = http_hooks_on_publish()) != srs_success) {
return srs_error_wrap(err, "srt: callback on publish");
}
@ -321,15 +328,17 @@ srs_error_t SrsMpegtsSrtConn::playing()
{
srs_error_t err = srs_success;
if ((err = http_hooks_on_play()) != srs_success) {
return srs_error_wrap(err, "rtmp: callback on play");
}
// We must do stat the client before hooks, because hooks depends on it.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPlay)) != srs_success) {
return srs_error_wrap(err, "rtmp: stat client");
}
// We must do hook after stat, because depends on it.
if ((err = http_hooks_on_play()) != srs_success) {
return srs_error_wrap(err, "rtmp: callback on play");
}
err = do_playing();
http_hooks_on_stop();
@ -385,11 +394,6 @@ srs_error_t SrsMpegtsSrtConn::do_publishing()
SrsPithyPrint* pprint = SrsPithyPrint::create_srt_publish();
SrsAutoFree(SrsPithyPrint, pprint);
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(_srs_context->get_id().c_str(), req_, this, SrsSrtConnPublish)) != srs_success) {
return srs_error_wrap(err, "srt: stat client");
}
int nb_packets = 0;
// Max udp packet size equal to 1500.

View file

@ -114,6 +114,8 @@ srs_error_t SrsStatisticStream::dumps(SrsJsonObject* obj)
obj->set("name", SrsJsonAny::str(stream.c_str()));
obj->set("vhost", SrsJsonAny::str(vhost->id.c_str()));
obj->set("app", SrsJsonAny::str(app.c_str()));
obj->set("tcUrl", SrsJsonAny::str(tcUrl.c_str()));
obj->set("url", SrsJsonAny::str(url.c_str()));
obj->set("live_ms", SrsJsonAny::integer(srsu2ms(srs_get_system_time())));
obj->set("clients", SrsJsonAny::integer(nb_clients));
obj->set("frames", SrsJsonAny::integer(frames->sugar));
@ -218,6 +220,7 @@ srs_error_t SrsStatisticClient::dumps(SrsJsonObject* obj)
obj->set("swfUrl", SrsJsonAny::str(req->swfUrl.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("url", SrsJsonAny::str(req->get_stream_url().c_str()));
obj->set("name", SrsJsonAny::str(req->stream.c_str()));
obj->set("type", SrsJsonAny::str(srs_client_type_string(type).c_str()));
obj->set("publish", SrsJsonAny::boolean(srs_client_type_is_publish(type)));
obj->set("alive", SrsJsonAny::number(srsu2ms(srs_get_system_time() - create) / 1000.0));
@ -311,6 +314,15 @@ SrsStatisticStream* SrsStatistic::find_stream(string sid)
return NULL;
}
SrsStatisticStream* SrsStatistic::find_stream_by_url(string url)
{
std::map<std::string, SrsStatisticStream*>::iterator it;
if ((it = rstreams.find(url)) != rstreams.end()) {
return it->second;
}
return NULL;
}
SrsStatisticClient* SrsStatistic::find_client(string client_id)
{
std::map<std::string, SrsStatisticClient*>::iterator it;
@ -413,10 +425,9 @@ srs_error_t SrsStatistic::on_client(std::string id, SrsRequest* req, ISrsExpire*
return err;
}
void SrsStatistic::on_disconnect(std::string id, bool* exists)
void SrsStatistic::on_disconnect(std::string id)
{
std::map<std::string, SrsStatisticClient*>::iterator it = clients.find(id);
if (exists) *exists = (it != clients.end());
if (it == clients.end()) return;
SrsStatisticClient* client = it->second;
@ -683,7 +694,12 @@ SrsStatisticVhost* SrsStatistic::create_vhost(SrsRequest* req)
SrsStatisticStream* SrsStatistic::create_stream(SrsStatisticVhost* vhost, SrsRequest* req)
{
std::string url = req->get_stream_url();
// To identify a stream, use url without extension, for example, the bellow are the same stream:
// ossrs.io/live/livestream
// ossrs.io/live/livestream.flv
// ossrs.io/live/livestream.m3u8
// Note that we also don't use schema, and vhost is optional.
string url = req->get_stream_url();
SrsStatisticStream* stream = NULL;
@ -694,6 +710,7 @@ SrsStatisticStream* SrsStatistic::create_stream(SrsStatisticVhost* vhost, SrsReq
stream->stream = req->stream;
stream->app = req->app;
stream->url = url;
stream->tcUrl = req->tcUrl;
rstreams[url] = stream;
streams[stream->id] = stream;
return stream;

View file

@ -53,6 +53,7 @@ public:
std::string app;
std::string stream;
std::string url;
std::string tcUrl;
bool active;
// The publisher connection id.
std::string publisher_id;
@ -147,6 +148,7 @@ public:
virtual SrsStatisticVhost* find_vhost_by_id(std::string vid);
virtual SrsStatisticVhost* find_vhost_by_name(std::string name);
virtual SrsStatisticStream* find_stream(std::string sid);
virtual SrsStatisticStream* find_stream_by_url(std::string url);
virtual SrsStatisticClient* find_client(std::string client_id);
public:
// When got video info for stream.
@ -175,7 +177,7 @@ public:
// @remark the on_disconnect always call, while the on_client is call when
// only got the request object, so the client specified by id maybe not
// exists in stat.
virtual void on_disconnect(std::string id, bool* exists = NULL);
virtual void on_disconnect(std::string id);
private:
// Cleanup the stream if stream is not active and for the last client.
void cleanup_stream(SrsStatisticStream* stream);

View file

@ -9,6 +9,6 @@
#define VERSION_MAJOR 5
#define VERSION_MINOR 0
#define VERSION_REVISION 54
#define VERSION_REVISION 55
#endif