1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

support http hooks: on_connect/close/publish/unpublish/play/stop.

This commit is contained in:
winlin 2013-12-08 12:45:12 +08:00
parent 3d4474f620
commit 889ad9238b
11 changed files with 971 additions and 51 deletions

View file

@ -190,6 +190,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw
* nginx v1.5.0: 139524 lines <br/> * nginx v1.5.0: 139524 lines <br/>
### History ### History
* v0.8, 2013-12-08, support http hooks: on_connect/close/publish/unpublish/play/stop.
* v0.8, 2013-12-08, support multiple http hooks for a event. * v0.8, 2013-12-08, support multiple http hooks for a event.
* v0.8, 2013-12-07, support http callback hooks, on_connect. * v0.8, 2013-12-07, support http callback hooks, on_connect.
* v0.8, 2013-12-07, support network based cli and json result, add CherryPy 3.2.4. * v0.8, 2013-12-07, support network based cli and json result, add CherryPy 3.2.4.

View file

@ -139,6 +139,8 @@ vhost hooks.callback.vhost.com {
# when client connect to vhost/app, call the hook, # when client connect to vhost/app, call the hook,
# the request in the POST data string is a object encode by json: # the request in the POST data string is a object encode by json:
# { # {
# "action": "on_connect",
# "client_id": 1985,
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", # "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "pageUrl": "http://www.test.com/live.html" # "pageUrl": "http://www.test.com/live.html"
# } # }
@ -148,6 +150,75 @@ vhost hooks.callback.vhost.com {
# support multiple api hooks, format: # support multiple api hooks, format:
# on_connect http://xxx/api0 http://xxx/api1 http://xxx/apiN # on_connect http://xxx/api0 http://xxx/api1 http://xxx/apiN
on_connect http://127.0.0.1:8085/api/v1/clients http://localhost:8085/api/v1/clients; on_connect http://127.0.0.1:8085/api/v1/clients http://localhost:8085/api/v1/clients;
# when client close/disconnect to vhost/app/stream, call the hook,
# the request in the POST data string is a object encode by json:
# {
# "action": "on_close",
# "client_id": 1985,
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live"
# }
# if valid, the hook must return HTTP code 200(Stauts OK) and response
# an int value specifies the error code(0 corresponding to success):
# 0
# support multiple api hooks, format:
# on_close http://xxx/api0 http://xxx/api1 http://xxx/apiN
on_close http://127.0.0.1:8085/api/v1/clients http://localhost:8085/api/v1/clients;
# when client(encoder) publish to vhost/app/stream, call the hook,
# the request in the POST data string is a object encode by json:
# {
# "action": "on_publish",
# "client_id": 1985,
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream"
# }
# if valid, the hook must return HTTP code 200(Stauts OK) and response
# an int value specifies the error code(0 corresponding to success):
# 0
# support multiple api hooks, format:
# on_publish http://xxx/api0 http://xxx/api1 http://xxx/apiN
on_publish http://127.0.0.1:8085/api/v1/streams http://localhost:8085/api/v1/streams;
# when client(encoder) stop publish to vhost/app/stream, call the hook,
# the request in the POST data string is a object encode by json:
# {
# "action": "on_unpublish",
# "client_id": 1985,
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream"
# }
# if valid, the hook must return HTTP code 200(Stauts OK) and response
# an int value specifies the error code(0 corresponding to success):
# 0
# support multiple api hooks, format:
# on_unpublish http://xxx/api0 http://xxx/api1 http://xxx/apiN
on_unpublish http://127.0.0.1:8085/api/v1/streams http://localhost:8085/api/v1/streams;
# when client start to play vhost/app/stream, call the hook,
# the request in the POST data string is a object encode by json:
# {
# "action": "on_play",
# "client_id": 1985,
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream"
# }
# if valid, the hook must return HTTP code 200(Stauts OK) and response
# an int value specifies the error code(0 corresponding to success):
# 0
# support multiple api hooks, format:
# on_play http://xxx/api0 http://xxx/api1 http://xxx/apiN
on_play http://127.0.0.1:8085/api/v1/sessions http://localhost:8085/api/v1/sessions;
# when client stop to play vhost/app/stream, call the hook,
# the request in the POST data string is a object encode by json:
# {
# "action": "on_stop",
# "client_id": 1985,
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream"
# }
# if valid, the hook must return HTTP code 200(Stauts OK) and response
# an int value specifies the error code(0 corresponding to success):
# 0
# support multiple api hooks, format:
# on_stop http://xxx/api0 http://xxx/api1 http://xxx/apiN
on_stop http://127.0.0.1:8085/api/v1/sessions http://localhost:8085/api/v1/sessions;
} }
} }
# the mirror filter of ffmpeg, @see: http://ffmpeg.org/ffmpeg-filters.html#Filtering-Introduction # the mirror filter of ffmpeg, @see: http://ffmpeg.org/ffmpeg-filters.html#Filtering-Introduction

View file

@ -57,10 +57,11 @@ class Error:
success = 0 success = 0
# error when parse json # error when parse json
system_parse_json = 100 system_parse_json = 100
# request action invalid
request_invalid_action = 200
''' '''
handle the clients requests: handle the clients requests: connect/disconnect vhost/app.
POST: create new client, handle the SRS on_connect callback.
''' '''
class RESTClients(object): class RESTClients(object):
exposed = True exposed = True
@ -72,13 +73,24 @@ class RESTClients(object):
return json.dumps(clients) return json.dumps(clients)
''' '''
for SRS hook: on_connect for SRS hook: on_connect/on_close
on_connect:
when client connect to vhost/app, call the hook, when client connect to vhost/app, call the hook,
the request in the POST data string is a object encode by json: the request in the POST data string is a object encode by json:
{ {
"action": "on_connect",
"client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"pageUrl": "http://www.test.com/live.html" "pageUrl": "http://www.test.com/live.html"
} }
on_close:
when client close/disconnect to vhost/app/stream, call the hook,
the request in the POST data string is a object encode by json:
{
"action": "on_close",
"client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live"
}
if valid, the hook must return HTTP code 200(Stauts OK) and response if valid, the hook must return HTTP code 200(Stauts OK) and response
an int value specifies the error code(0 corresponding to success): an int value specifies the error code(0 corresponding to success):
0 0
@ -87,29 +99,227 @@ class RESTClients(object):
enable_crossdomain() enable_crossdomain()
# return the error code in str # return the error code in str
ret = Error.success code = Error.success
req = cherrypy.request.body.read() req = cherrypy.request.body.read()
trace("post to clients, req=%s"%(req)) trace("post to clients, req=%s"%(req))
try: try:
json_req = json.loads(req) json_req = json.loads(req)
except Exception, ex: except Exception, ex:
ret = Error.system_parse_json code = Error.system_parse_json
trace("parse the request to json failed, req=%s, ex=%s, ret=%s"%(req, ex, ret)) trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
return str(ret) return str(code)
trace("srs on_connect: client ip=%s, vhost=%s, app=%s, pageUrl=%s"%( action = json_req["action"]
json_req["ip"], json_req["vhost"], json_req["app"], json_req["pageUrl"] if action == "on_connect":
)) code = self.__on_connect(json_req)
elif action == "on_close":
code = self.__on_close(json_req)
else:
trace("invalid request action: %s"%(json_req["action"]))
code = Error.request_invalid_action
# TODO: valid the client. return str(code)
trace("valid clients post request success.")
return str(ret)
def OPTIONS(self): def OPTIONS(self):
enable_crossdomain() enable_crossdomain()
def __on_connect(self, req):
code = Error.success
trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, pageUrl=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["pageUrl"]
))
# TODO: process the on_connect event
return code
def __on_close(self, req):
code = Error.success
trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"]
))
# TODO: process the on_close event
return code
'''
handle the streams requests: publish/unpublish stream.
'''
class RESTStreams(object):
exposed = True
def GET(self):
enable_crossdomain()
streams = {}
return json.dumps(streams)
'''
for SRS hook: on_publish/on_unpublish
on_publish:
when client(encoder) publish to vhost/app/stream, call the hook,
the request in the POST data string is a object encode by json:
{
"action": "on_publish",
"client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"stream": "livestream"
}
on_unpublish:
when client(encoder) stop publish to vhost/app/stream, call the hook,
the request in the POST data string is a object encode by json:
{
"action": "on_unpublish",
"client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"stream": "livestream"
}
if valid, the hook must return HTTP code 200(Stauts OK) and response
an int value specifies the error code(0 corresponding to success):
0
'''
def POST(self):
enable_crossdomain()
# return the error code in str
code = Error.success
req = cherrypy.request.body.read()
trace("post to streams, req=%s"%(req))
try:
json_req = json.loads(req)
except Exception, ex:
code = Error.system_parse_json
trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
return str(code)
action = json_req["action"]
if action == "on_publish":
code = self.__on_publish(json_req)
elif action == "on_unpublish":
code = self.__on_unpublish(json_req)
else:
trace("invalid request action: %s"%(json_req["action"]))
code = Error.request_invalid_action
return str(code)
def OPTIONS(self):
enable_crossdomain()
def __on_publish(self, req):
code = Error.success
trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"]
))
# TODO: process the on_publish event
return code
def __on_unpublish(self, req):
code = Error.success
trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"]
))
# TODO: process the on_unpublish event
return code
'''
handle the sessions requests: client play/stop stream
'''
class RESTSessions(object):
exposed = True
def GET(self):
enable_crossdomain()
sessions = {}
return json.dumps(sessions)
'''
for SRS hook: on_play/on_stop
on_play:
when client(encoder) publish to vhost/app/stream, call the hook,
the request in the POST data string is a object encode by json:
{
"action": "on_play",
"client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"stream": "livestream"
}
on_stop:
when client(encoder) stop publish to vhost/app/stream, call the hook,
the request in the POST data string is a object encode by json:
{
"action": "on_stop",
"client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"stream": "livestream"
}
if valid, the hook must return HTTP code 200(Stauts OK) and response
an int value specifies the error code(0 corresponding to success):
0
'''
def POST(self):
enable_crossdomain()
# return the error code in str
code = Error.success
req = cherrypy.request.body.read()
trace("post to sessions, req=%s"%(req))
try:
json_req = json.loads(req)
except Exception, ex:
code = Error.system_parse_json
trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code))
return str(code)
action = json_req["action"]
if action == "on_play":
code = self.__on_play(json_req)
elif action == "on_stop":
code = self.__on_stop(json_req)
else:
trace("invalid request action: %s"%(json_req["action"]))
code = Error.request_invalid_action
return str(code)
def OPTIONS(self):
enable_crossdomain()
def __on_play(self, req):
code = Error.success
trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"]
))
# TODO: process the on_play event
return code
def __on_stop(self, req):
code = Error.success
trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"]
))
# TODO: process the on_stop event
return code
# HTTP RESTful path. # HTTP RESTful path.
class Root(object): class Root(object):
def __init__(self): def __init__(self):
@ -123,6 +333,8 @@ class Api(object):
class V1(object): class V1(object):
def __init__(self): def __init__(self):
self.clients = RESTClients() self.clients = RESTClients()
self.streams = RESTStreams()
self.sessions = RESTSessions()
''' '''
main code start. main code start.

View file

@ -108,11 +108,15 @@ int SrsClient::do_cycle()
req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
req->app.c_str()); req->app.c_str());
if ((ret = refer->check(req->pageUrl, config->get_refer(req->vhost))) != ERROR_SUCCESS) { ret = service_cycle();
srs_error("check refer failed. ret=%d", ret); on_close();
return ret; return ret;
} }
srs_verbose("check refer success.");
int SrsClient::service_cycle()
{
int ret = ERROR_SUCCESS;
if ((ret = rtmp->set_window_ack_size(2.5 * 1000 * 1000)) != ERROR_SUCCESS) { if ((ret = rtmp->set_window_ack_size(2.5 * 1000 * 1000)) != ERROR_SUCCESS) {
srs_error("set window acknowledgement size failed. ret=%d", ret); srs_error("set window acknowledgement size failed. ret=%d", ret);
@ -188,8 +192,14 @@ int SrsClient::do_cycle()
srs_error("start to play stream failed. ret=%d", ret); srs_error("start to play stream failed. ret=%d", ret);
return ret; return ret;
} }
if ((ret = on_play()) != ERROR_SUCCESS) {
srs_error("http hook on_play failed. ret=%d", ret);
return ret;
}
srs_info("start to play stream %s success", req->stream.c_str()); srs_info("start to play stream %s success", req->stream.c_str());
return playing(source); ret = playing(source);
on_stop();
return ret;
} }
case SrsClientFMLEPublish: { case SrsClientFMLEPublish: {
srs_verbose("FMLE start to publish stream %s.", req->stream.c_str()); srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
@ -198,9 +208,14 @@ int SrsClient::do_cycle()
srs_error("start to publish stream failed. ret=%d", ret); srs_error("start to publish stream failed. ret=%d", ret);
return ret; return ret;
} }
if ((ret = on_publish()) != ERROR_SUCCESS) {
srs_error("http hook on_publish failed. ret=%d", ret);
return ret;
}
srs_info("start to publish stream %s success", req->stream.c_str()); srs_info("start to publish stream %s success", req->stream.c_str());
ret = publish(source, true); ret = publish(source, true);
source->on_unpublish(); source->on_unpublish();
on_unpublish();
return ret; return ret;
} }
case SrsClientFlashPublish: { case SrsClientFlashPublish: {
@ -210,9 +225,14 @@ int SrsClient::do_cycle()
srs_error("flash start to publish stream failed. ret=%d", ret); srs_error("flash start to publish stream failed. ret=%d", ret);
return ret; return ret;
} }
if ((ret = on_publish()) != ERROR_SUCCESS) {
srs_error("http hook on_publish failed. ret=%d", ret);
return ret;
}
srs_info("flash start to publish stream %s success", req->stream.c_str()); srs_info("flash start to publish stream %s success", req->stream.c_str());
ret = publish(source, false); ret = publish(source, false);
source->on_unpublish(); source->on_unpublish();
on_unpublish();
return ret; return ret;
} }
default: { default: {
@ -249,22 +269,15 @@ int SrsClient::check_vhost()
req->vhost = vhost->arg0(); req->vhost = vhost->arg0();
} }
#ifdef SRS_HTTP if ((ret = refer->check(req->pageUrl, config->get_refer(req->vhost))) != ERROR_SUCCESS) {
// HTTP: on_connect srs_error("check refer failed. ret=%d", ret);
SrsConfDirective* on_connect = config->get_vhost_on_connect(req->vhost);
if (!on_connect) {
srs_info("ignore the empty http callback: on_connect");
return ret; return ret;
} }
srs_verbose("check refer success.");
for (int i = 0; i < (int)on_connect->args.size(); i++) { if ((ret = on_connect()) != ERROR_SUCCESS) {
std::string url = on_connect->args.at(i);
if ((ret = http_hooks->on_connect(url, ip, req)) != ERROR_SUCCESS) {
srs_error("hook client failed. url=%s, ret=%d", url.c_str(), ret);
return ret; return ret;
} }
}
#endif
return ret; return ret;
} }
@ -545,3 +558,129 @@ int SrsClient::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage*
return ret; return ret;
} }
int SrsClient::on_connect()
{
int ret = ERROR_SUCCESS;
#ifdef SRS_HTTP
// HTTP: on_connect
SrsConfDirective* on_connect = config->get_vhost_on_connect(req->vhost);
if (!on_connect) {
srs_info("ignore the empty http callback: on_connect");
return ret;
}
for (int i = 0; i < (int)on_connect->args.size(); i++) {
std::string url = on_connect->args.at(i);
if ((ret = http_hooks->on_connect(url, connection_id, ip, req)) != ERROR_SUCCESS) {
srs_error("hook client on_connect failed. url=%s, ret=%d", url.c_str(), ret);
return ret;
}
}
#endif
return ret;
}
void SrsClient::on_close()
{
#ifdef SRS_HTTP
// whatever the ret code, notify the api hooks.
// HTTP: on_close
SrsConfDirective* on_close = config->get_vhost_on_close(req->vhost);
if (!on_close) {
srs_info("ignore the empty http callback: on_close");
return;
}
for (int i = 0; i < (int)on_close->args.size(); i++) {
std::string url = on_close->args.at(i);
http_hooks->on_close(url, connection_id, ip, req);
}
#endif
}
int SrsClient::on_publish()
{
int ret = ERROR_SUCCESS;
#ifdef SRS_HTTP
// HTTP: on_publish
SrsConfDirective* on_publish = config->get_vhost_on_publish(req->vhost);
if (!on_publish) {
srs_info("ignore the empty http callback: on_publish");
return ret;
}
for (int i = 0; i < (int)on_publish->args.size(); i++) {
std::string url = on_publish->args.at(i);
if ((ret = http_hooks->on_publish(url, connection_id, ip, req)) != ERROR_SUCCESS) {
srs_error("hook client on_publish failed. url=%s, ret=%d", url.c_str(), ret);
return ret;
}
}
#endif
return ret;
}
void SrsClient::on_unpublish()
{
#ifdef SRS_HTTP
// whatever the ret code, notify the api hooks.
// HTTP: on_unpublish
SrsConfDirective* on_unpublish = config->get_vhost_on_unpublish(req->vhost);
if (!on_unpublish) {
srs_info("ignore the empty http callback: on_unpublish");
return;
}
for (int i = 0; i < (int)on_unpublish->args.size(); i++) {
std::string url = on_unpublish->args.at(i);
http_hooks->on_unpublish(url, connection_id, ip, req);
}
#endif
}
int SrsClient::on_play()
{
int ret = ERROR_SUCCESS;
#ifdef SRS_HTTP
// HTTP: on_play
SrsConfDirective* on_play = config->get_vhost_on_play(req->vhost);
if (!on_play) {
srs_info("ignore the empty http callback: on_play");
return ret;
}
for (int i = 0; i < (int)on_play->args.size(); i++) {
std::string url = on_play->args.at(i);
if ((ret = http_hooks->on_play(url, connection_id, ip, req)) != ERROR_SUCCESS) {
srs_error("hook client on_play failed. url=%s, ret=%d", url.c_str(), ret);
return ret;
}
}
#endif
return ret;
}
void SrsClient::on_stop()
{
#ifdef SRS_HTTP
// whatever the ret code, notify the api hooks.
// HTTP: on_stop
SrsConfDirective* on_stop = config->get_vhost_on_stop(req->vhost);
if (!on_stop) {
srs_info("ignore the empty http callback: on_stop");
return;
}
for (int i = 0; i < (int)on_stop->args.size(); i++) {
std::string url = on_stop->args.at(i);
http_hooks->on_stop(url, connection_id, ip, req);
}
#endif
}

View file

@ -63,12 +63,21 @@ public:
protected: protected:
virtual int do_cycle(); virtual int do_cycle();
private: private:
// when valid and connected to vhost/app, service the client.
virtual int service_cycle();
virtual int check_vhost(); virtual int check_vhost();
virtual int playing(SrsSource* source); virtual int playing(SrsSource* source);
virtual int publish(SrsSource* source, bool is_fmle); virtual int publish(SrsSource* source, bool is_fmle);
virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle); virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle);
virtual int get_peer_ip(); virtual int get_peer_ip();
virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg);
private:
virtual int on_connect();
virtual void on_close();
virtual int on_publish();
virtual void on_unpublish();
virtual int on_play();
virtual void on_stop();
}; };
#endif #endif

View file

@ -578,6 +578,86 @@ SrsConfDirective* SrsConfig::get_vhost_on_connect(std::string vhost)
return conf->get("on_connect"); return conf->get("on_connect");
} }
SrsConfDirective* SrsConfig::get_vhost_on_close(std::string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return NULL;
}
conf = conf->get("http_hooks");
if (!conf) {
return NULL;
}
return conf->get("on_close");
}
SrsConfDirective* SrsConfig::get_vhost_on_publish(std::string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return NULL;
}
conf = conf->get("http_hooks");
if (!conf) {
return NULL;
}
return conf->get("on_publish");
}
SrsConfDirective* SrsConfig::get_vhost_on_unpublish(std::string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return NULL;
}
conf = conf->get("http_hooks");
if (!conf) {
return NULL;
}
return conf->get("on_unpublish");
}
SrsConfDirective* SrsConfig::get_vhost_on_play(std::string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return NULL;
}
conf = conf->get("http_hooks");
if (!conf) {
return NULL;
}
return conf->get("on_play");
}
SrsConfDirective* SrsConfig::get_vhost_on_stop(std::string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return NULL;
}
conf = conf->get("http_hooks");
if (!conf) {
return NULL;
}
return conf->get("on_stop");
}
bool SrsConfig::get_vhost_enabled(std::string vhost) bool SrsConfig::get_vhost_enabled(std::string vhost)
{ {
SrsConfDirective* vhost_conf = get_vhost(vhost); SrsConfDirective* vhost_conf = get_vhost(vhost);

View file

@ -120,6 +120,11 @@ public:
virtual SrsConfDirective* get_vhost(std::string vhost); virtual SrsConfDirective* get_vhost(std::string vhost);
virtual bool get_vhost_enabled(std::string vhost); virtual bool get_vhost_enabled(std::string vhost);
virtual SrsConfDirective* get_vhost_on_connect(std::string vhost); virtual SrsConfDirective* get_vhost_on_connect(std::string vhost);
virtual SrsConfDirective* get_vhost_on_close(std::string vhost);
virtual SrsConfDirective* get_vhost_on_publish(std::string vhost);
virtual SrsConfDirective* get_vhost_on_unpublish(std::string vhost);
virtual SrsConfDirective* get_vhost_on_play(std::string vhost);
virtual SrsConfDirective* get_vhost_on_stop(std::string vhost);
virtual SrsConfDirective* get_transcode(std::string vhost, std::string scope); virtual SrsConfDirective* get_transcode(std::string vhost, std::string scope);
virtual bool get_transcode_enabled(SrsConfDirective* transcode); virtual bool get_transcode_enabled(SrsConfDirective* transcode);
virtual std::string get_transcode_ffmpeg(SrsConfDirective* transcode); virtual std::string get_transcode_ffmpeg(SrsConfDirective* transcode);

View file

@ -31,6 +31,7 @@ SrsConnection::SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd)
{ {
server = srs_server; server = srs_server;
stfd = client_stfd; stfd = client_stfd;
connection_id = 0;
} }
SrsConnection::~SrsConnection() SrsConnection::~SrsConnection()
@ -65,6 +66,8 @@ void SrsConnection::cycle()
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
log_context->generate_id(); log_context->generate_id();
connection_id = log_context->get_id();
ret = do_cycle(); ret = do_cycle();
// if socket io error, set to closed. // if socket io error, set to closed.

View file

@ -38,6 +38,7 @@ class SrsConnection
protected: protected:
SrsServer* server; SrsServer* server;
st_netfd_t stfd; st_netfd_t stfd;
int connection_id;
public: public:
SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd); SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd);
virtual ~SrsConnection(); virtual ~SrsConnection();

View file

@ -400,25 +400,35 @@ SrsHttpHooks::~SrsHttpHooks()
{ {
} }
int SrsHttpHooks::on_connect(std::string url, std::string ip, SrsRequest* req) int SrsHttpHooks::on_connect(std::string url, int client_id, std::string ip, SrsRequest* req)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
SrsHttpUri uri; SrsHttpUri uri;
if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { if ((ret = uri.initialize(url)) != ERROR_SUCCESS) {
srs_error("http uri parse url failed. " srs_error("http uri parse on_connect url failed. "
"url=%s, ret=%d", url.c_str(), ret); "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret);
return ret; return ret;
} }
/** /**
{ {
"action": "on_connect",
"client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"pageUrl": "http://www.test.com/live.html" "pageUrl": "http://www.test.com/live.html"
} }
*/ */
std::stringstream ss; std::stringstream ss;
ss << "{" ss << "{"
// action
<< '"' << "action" << '"' << ':'
<< '"' << "on_connect" << '"'
<< ','
// client_id
<< '"' << "client_id" << '"' << ':'
<< std::dec << client_id
<< ','
// ip // ip
<< '"' << "ip" << '"' << ':' << '"' << "ip" << '"' << ':'
<< '"' << ip << '"' << '"' << ip << '"'
@ -441,24 +451,375 @@ int SrsHttpHooks::on_connect(std::string url, std::string ip, SrsRequest* req)
SrsHttpClient http; SrsHttpClient http;
if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) {
srs_error("http post uri failed. " srs_error("http post on_connect uri failed. "
"url=%s, request=%s, response=%s, ret=%d", "client_id=%d, url=%s, request=%s, response=%s, ret=%d",
url.c_str(), data.c_str(), res.c_str(), ret); client_id, url.c_str(), data.c_str(), res.c_str(), ret);
return ret; return ret;
} }
if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { if (res.empty() || res != SRS_HTTP_RESPONSE_OK) {
ret = ERROR_HTTP_DATA_INVLIAD; ret = ERROR_HTTP_DATA_INVLIAD;
srs_error("http hook validate failed. " srs_error("http hook on_connect validate failed. "
"res=%s, ret=%d", res.c_str(), ret); "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret);
return ret; return ret;
} }
srs_trace("http hook on_connect success. " srs_trace("http hook on_connect success. "
"url=%s, request=%s, response=%s, ret=%d", "client_id=%d, url=%s, request=%s, response=%s, ret=%d",
url.c_str(), data.c_str(), res.c_str(), ret); client_id, url.c_str(), data.c_str(), res.c_str(), ret);
return ret; return ret;
} }
void SrsHttpHooks::on_close(std::string url, int client_id, std::string ip, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
SrsHttpUri uri;
if ((ret = uri.initialize(url)) != ERROR_SUCCESS) {
srs_warn("http uri parse on_close url failed, ignored. "
"client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret);
return;
}
/**
{
"action": "on_close",
"client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"stream": "livestream"
}
*/
std::stringstream ss;
ss << "{"
// action
<< '"' << "action" << '"' << ':'
<< '"' << "on_close" << '"'
<< ','
// client_id
<< '"' << "client_id" << '"' << ':'
<< std::dec << client_id
<< ','
// ip
<< '"' << "ip" << '"' << ':'
<< '"' << ip << '"'
<< ','
// vhost
<< '"' << "vhost" << '"' << ':'
<< '"' << req->vhost << '"'
<< ','
// app
<< '"' << "app" << '"' << ':'
<< '"' << req->app << '"'
//<< ','
<< "}";
std::string data = ss.str();
std::string res;
SrsHttpClient http;
if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) {
srs_warn("http post on_close uri failed, ignored. "
"client_id=%d, url=%s, request=%s, response=%s, ret=%d",
client_id, url.c_str(), data.c_str(), res.c_str(), ret);
return;
}
if (res.empty() || res != SRS_HTTP_RESPONSE_OK) {
ret = ERROR_HTTP_DATA_INVLIAD;
srs_warn("http hook on_close validate failed, ignored. "
"client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret);
return;
}
srs_trace("http hook on_close success. "
"client_id=%d, url=%s, request=%s, response=%s, ret=%d",
client_id, url.c_str(), data.c_str(), res.c_str(), ret);
return;
}
int SrsHttpHooks::on_publish(std::string url, int client_id, std::string ip, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
SrsHttpUri uri;
if ((ret = uri.initialize(url)) != ERROR_SUCCESS) {
srs_error("http uri parse on_publish url failed. "
"client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret);
return ret;
}
/**
{
"action": "on_publish",
"client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"stream": "livestream"
}
*/
std::stringstream ss;
ss << "{"
// action
<< '"' << "action" << '"' << ':'
<< '"' << "on_publish" << '"'
<< ','
// client_id
<< '"' << "client_id" << '"' << ':'
<< std::dec << client_id
<< ','
// ip
<< '"' << "ip" << '"' << ':'
<< '"' << ip << '"'
<< ','
// vhost
<< '"' << "vhost" << '"' << ':'
<< '"' << req->vhost << '"'
<< ','
// app
<< '"' << "app" << '"' << ':'
<< '"' << req->app << '"'
<< ','
// stream
<< '"' << "stream" << '"' << ':'
<< '"' << req->stream << '"'
//<< ','
<< "}";
std::string data = ss.str();
std::string res;
SrsHttpClient http;
if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) {
srs_error("http post on_publish uri failed. "
"client_id=%d, url=%s, request=%s, response=%s, ret=%d",
client_id, url.c_str(), data.c_str(), res.c_str(), ret);
return ret;
}
if (res.empty() || res != SRS_HTTP_RESPONSE_OK) {
ret = ERROR_HTTP_DATA_INVLIAD;
srs_error("http hook on_publish validate failed. "
"client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret);
return ret;
}
srs_trace("http hook on_publish success. "
"client_id=%d, url=%s, request=%s, response=%s, ret=%d",
client_id, url.c_str(), data.c_str(), res.c_str(), ret);
return ret;
}
void SrsHttpHooks::on_unpublish(std::string url, int client_id, std::string ip, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
SrsHttpUri uri;
if ((ret = uri.initialize(url)) != ERROR_SUCCESS) {
srs_warn("http uri parse on_unpublish url failed, ignored. "
"client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret);
return;
}
/**
{
"action": "on_unpublish",
"client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"stream": "livestream"
}
*/
std::stringstream ss;
ss << "{"
// action
<< '"' << "action" << '"' << ':'
<< '"' << "on_unpublish" << '"'
<< ','
// client_id
<< '"' << "client_id" << '"' << ':'
<< std::dec << client_id
<< ','
// ip
<< '"' << "ip" << '"' << ':'
<< '"' << ip << '"'
<< ','
// vhost
<< '"' << "vhost" << '"' << ':'
<< '"' << req->vhost << '"'
<< ','
// app
<< '"' << "app" << '"' << ':'
<< '"' << req->app << '"'
<< ','
// stream
<< '"' << "stream" << '"' << ':'
<< '"' << req->stream << '"'
//<< ','
<< "}";
std::string data = ss.str();
std::string res;
SrsHttpClient http;
if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) {
srs_warn("http post on_unpublish uri failed, ignored. "
"client_id=%d, url=%s, request=%s, response=%s, ret=%d",
client_id, url.c_str(), data.c_str(), res.c_str(), ret);
return;
}
if (res.empty() || res != SRS_HTTP_RESPONSE_OK) {
ret = ERROR_HTTP_DATA_INVLIAD;
srs_warn("http hook on_unpublish validate failed, ignored. "
"client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret);
return;
}
srs_trace("http hook on_unpublish success. "
"client_id=%d, url=%s, request=%s, response=%s, ret=%d",
client_id, url.c_str(), data.c_str(), res.c_str(), ret);
return;
}
int SrsHttpHooks::on_play(std::string url, int client_id, std::string ip, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
SrsHttpUri uri;
if ((ret = uri.initialize(url)) != ERROR_SUCCESS) {
srs_error("http uri parse on_play url failed. "
"client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret);
return ret;
}
/**
{
"action": "on_play",
"client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"stream": "livestream"
}
*/
std::stringstream ss;
ss << "{"
// action
<< '"' << "action" << '"' << ':'
<< '"' << "on_play" << '"'
<< ','
// client_id
<< '"' << "client_id" << '"' << ':'
<< std::dec << client_id
<< ','
// ip
<< '"' << "ip" << '"' << ':'
<< '"' << ip << '"'
<< ','
// vhost
<< '"' << "vhost" << '"' << ':'
<< '"' << req->vhost << '"'
<< ','
// app
<< '"' << "app" << '"' << ':'
<< '"' << req->app << '"'
<< ','
// stream
<< '"' << "stream" << '"' << ':'
<< '"' << req->stream << '"'
//<< ','
<< "}";
std::string data = ss.str();
std::string res;
SrsHttpClient http;
if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) {
srs_error("http post on_play uri failed. "
"client_id=%d, url=%s, request=%s, response=%s, ret=%d",
client_id, url.c_str(), data.c_str(), res.c_str(), ret);
return ret;
}
if (res.empty() || res != SRS_HTTP_RESPONSE_OK) {
ret = ERROR_HTTP_DATA_INVLIAD;
srs_error("http hook on_play validate failed. "
"client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret);
return ret;
}
srs_trace("http hook on_play success. "
"client_id=%d, url=%s, request=%s, response=%s, ret=%d",
client_id, url.c_str(), data.c_str(), res.c_str(), ret);
return ret;
}
void SrsHttpHooks::on_stop(std::string url, int client_id, std::string ip, SrsRequest* req)
{
int ret = ERROR_SUCCESS;
SrsHttpUri uri;
if ((ret = uri.initialize(url)) != ERROR_SUCCESS) {
srs_warn("http uri parse on_stop url failed, ignored. "
"client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret);
return;
}
/**
{
"action": "on_stop",
"client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"stream": "livestream"
}
*/
std::stringstream ss;
ss << "{"
// action
<< '"' << "action" << '"' << ':'
<< '"' << "on_stop" << '"'
<< ','
// client_id
<< '"' << "client_id" << '"' << ':'
<< std::dec << client_id
<< ','
// ip
<< '"' << "ip" << '"' << ':'
<< '"' << ip << '"'
<< ','
// vhost
<< '"' << "vhost" << '"' << ':'
<< '"' << req->vhost << '"'
<< ','
// app
<< '"' << "app" << '"' << ':'
<< '"' << req->app << '"'
<< ','
// stream
<< '"' << "stream" << '"' << ':'
<< '"' << req->stream << '"'
//<< ','
<< "}";
std::string data = ss.str();
std::string res;
SrsHttpClient http;
if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) {
srs_warn("http post on_stop uri failed, ignored. "
"client_id=%d, url=%s, request=%s, response=%s, ret=%d",
client_id, url.c_str(), data.c_str(), res.c_str(), ret);
return;
}
if (res.empty() || res != SRS_HTTP_RESPONSE_OK) {
ret = ERROR_HTTP_DATA_INVLIAD;
srs_warn("http hook on_stop validate failed, ignored. "
"client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret);
return;
}
srs_trace("http hook on_stop success. "
"client_id=%d, url=%s, request=%s, response=%s, ret=%d",
client_id, url.c_str(), data.c_str(), res.c_str(), ret);
return;
}
#endif #endif

View file

@ -120,12 +120,50 @@ public:
virtual ~SrsHttpHooks(); virtual ~SrsHttpHooks();
public: public:
/** /**
* on_connect hook, * on_connect hook, when client connect to srs.
* @param client_id the id of client on server.
* @param url the api server url, to valid the client. * @param url the api server url, to valid the client.
* ignore if empty. * ignore if empty.
* @return valid failed or connect to the url failed. * @return valid failed or connect to the url failed.
*/ */
virtual int on_connect(std::string url, std::string ip, SrsRequest* req); virtual int on_connect(std::string url, int client_id, std::string ip, SrsRequest* req);
/**
* on_close hook, when client disconnect to srs, where client is valid by on_connect.
* @param client_id the id of client on server.
* @param url the api server url, to process the event.
* ignore if empty.
*/
virtual void on_close(std::string url, int client_id, std::string ip, SrsRequest* req);
/**
* on_publish hook, when client(encoder) start to publish stream
* @param client_id the id of client on server.
* @param url the api server url, to valid the client.
* ignore if empty.
* @return valid failed or connect to the url failed.
*/
virtual int on_publish(std::string url, int client_id, std::string ip, SrsRequest* req);
/**
* on_unpublish hook, when client(encoder) stop publish stream.
* @param client_id the id of client on server.
* @param url the api server url, to process the event.
* ignore if empty.
*/
virtual void on_unpublish(std::string url, int client_id, std::string ip, SrsRequest* req);
/**
* on_play hook, when client start to play stream.
* @param client_id the id of client on server.
* @param url the api server url, to valid the client.
* ignore if empty.
* @return valid failed or connect to the url failed.
*/
virtual int on_play(std::string url, int client_id, std::string ip, SrsRequest* req);
/**
* on_stop hook, when client stop to play the stream.
* @param client_id the id of client on server.
* @param url the api server url, to process the event.
* ignore if empty.
*/
virtual void on_stop(std::string url, int client_id, std::string ip, SrsRequest* req);
}; };
#endif #endif