diff --git a/README.md b/README.md index 79bae01d4..9d13745bd 100755 --- a/README.md +++ b/README.md @@ -190,6 +190,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw * nginx v1.5.0: 139524 lines
### History +* v0.8, 2013-12-08, support http hooks: on_connect/close/publish/unpublish/play/stop. * v0.8, 2013-12-08, support multiple http hooks for a event. * v0.8, 2013-12-07, support http callback hooks, on_connect. * v0.8, 2013-12-07, support network based cli and json result, add CherryPy 3.2.4. diff --git a/trunk/conf/srs.conf b/trunk/conf/srs.conf index 6469d1e6a..2eb077665 100755 --- a/trunk/conf/srs.conf +++ b/trunk/conf/srs.conf @@ -139,6 +139,8 @@ vhost hooks.callback.vhost.com { # when client connect to vhost/app, call the hook, # the request in the POST data string is a object encode by json: # { + # "action": "on_connect", + # "client_id": 1985, # "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", # "pageUrl": "http://www.test.com/live.html" # } @@ -148,6 +150,75 @@ vhost hooks.callback.vhost.com { # support multiple api hooks, format: # on_connect http://xxx/api0 http://xxx/api1 http://xxx/apiN on_connect http://127.0.0.1:8085/api/v1/clients http://localhost:8085/api/v1/clients; + # when client close/disconnect to vhost/app/stream, call the hook, + # the request in the POST data string is a object encode by json: + # { + # "action": "on_close", + # "client_id": 1985, + # "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live" + # } + # if valid, the hook must return HTTP code 200(Stauts OK) and response + # an int value specifies the error code(0 corresponding to success): + # 0 + # support multiple api hooks, format: + # on_close http://xxx/api0 http://xxx/api1 http://xxx/apiN + on_close http://127.0.0.1:8085/api/v1/clients http://localhost:8085/api/v1/clients; + # when client(encoder) publish to vhost/app/stream, call the hook, + # the request in the POST data string is a object encode by json: + # { + # "action": "on_publish", + # "client_id": 1985, + # "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + # "stream": "livestream" + # } + # if valid, the hook must return HTTP code 200(Stauts OK) and response + # an int value specifies the error code(0 corresponding to success): + # 0 + # support multiple api hooks, format: + # on_publish http://xxx/api0 http://xxx/api1 http://xxx/apiN + on_publish http://127.0.0.1:8085/api/v1/streams http://localhost:8085/api/v1/streams; + # when client(encoder) stop publish to vhost/app/stream, call the hook, + # the request in the POST data string is a object encode by json: + # { + # "action": "on_unpublish", + # "client_id": 1985, + # "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + # "stream": "livestream" + # } + # if valid, the hook must return HTTP code 200(Stauts OK) and response + # an int value specifies the error code(0 corresponding to success): + # 0 + # support multiple api hooks, format: + # on_unpublish http://xxx/api0 http://xxx/api1 http://xxx/apiN + on_unpublish http://127.0.0.1:8085/api/v1/streams http://localhost:8085/api/v1/streams; + # when client start to play vhost/app/stream, call the hook, + # the request in the POST data string is a object encode by json: + # { + # "action": "on_play", + # "client_id": 1985, + # "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + # "stream": "livestream" + # } + # if valid, the hook must return HTTP code 200(Stauts OK) and response + # an int value specifies the error code(0 corresponding to success): + # 0 + # support multiple api hooks, format: + # on_play http://xxx/api0 http://xxx/api1 http://xxx/apiN + on_play http://127.0.0.1:8085/api/v1/sessions http://localhost:8085/api/v1/sessions; + # when client stop to play vhost/app/stream, call the hook, + # the request in the POST data string is a object encode by json: + # { + # "action": "on_stop", + # "client_id": 1985, + # "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + # "stream": "livestream" + # } + # if valid, the hook must return HTTP code 200(Stauts OK) and response + # an int value specifies the error code(0 corresponding to success): + # 0 + # support multiple api hooks, format: + # on_stop http://xxx/api0 http://xxx/api1 http://xxx/apiN + on_stop http://127.0.0.1:8085/api/v1/sessions http://localhost:8085/api/v1/sessions; } } # the mirror filter of ffmpeg, @see: http://ffmpeg.org/ffmpeg-filters.html#Filtering-Introduction diff --git a/trunk/research/api-server/server.py b/trunk/research/api-server/server.py index e5405011a..617668c45 100755 --- a/trunk/research/api-server/server.py +++ b/trunk/research/api-server/server.py @@ -57,10 +57,11 @@ class Error: success = 0 # error when parse json system_parse_json = 100 + # request action invalid + request_invalid_action = 200 ''' -handle the clients requests: -POST: create new client, handle the SRS on_connect callback. +handle the clients requests: connect/disconnect vhost/app. ''' class RESTClients(object): exposed = True @@ -72,13 +73,24 @@ class RESTClients(object): return json.dumps(clients) ''' - for SRS hook: on_connect - when client connect to vhost/app, call the hook, - the request in the POST data string is a object encode by json: - { - "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", - "pageUrl": "http://www.test.com/live.html" - } + for SRS hook: on_connect/on_close + on_connect: + when client connect to vhost/app, call the hook, + the request in the POST data string is a object encode by json: + { + "action": "on_connect", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "pageUrl": "http://www.test.com/live.html" + } + on_close: + when client close/disconnect to vhost/app/stream, call the hook, + the request in the POST data string is a object encode by json: + { + "action": "on_close", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live" + } if valid, the hook must return HTTP code 200(Stauts OK) and response an int value specifies the error code(0 corresponding to success): 0 @@ -87,29 +99,227 @@ class RESTClients(object): enable_crossdomain() # return the error code in str - ret = Error.success + code = Error.success req = cherrypy.request.body.read() trace("post to clients, req=%s"%(req)) try: json_req = json.loads(req) except Exception, ex: - ret = Error.system_parse_json - trace("parse the request to json failed, req=%s, ex=%s, ret=%s"%(req, ex, ret)) - return str(ret) + code = Error.system_parse_json + trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code)) + return str(code) - trace("srs on_connect: client ip=%s, vhost=%s, app=%s, pageUrl=%s"%( - json_req["ip"], json_req["vhost"], json_req["app"], json_req["pageUrl"] - )) + action = json_req["action"] + if action == "on_connect": + code = self.__on_connect(json_req) + elif action == "on_close": + code = self.__on_close(json_req) + else: + trace("invalid request action: %s"%(json_req["action"])) + code = Error.request_invalid_action - # TODO: valid the client. - - trace("valid clients post request success.") - return str(ret) + return str(code) def OPTIONS(self): enable_crossdomain() + def __on_connect(self, req): + code = Error.success + + trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, pageUrl=%s"%( + req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["pageUrl"] + )) + + # TODO: process the on_connect event + + return code + + def __on_close(self, req): + code = Error.success + + trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s"%( + req["action"], req["client_id"], req["ip"], req["vhost"], req["app"] + )) + + # TODO: process the on_close event + + return code + +''' +handle the streams requests: publish/unpublish stream. +''' +class RESTStreams(object): + exposed = True + + def GET(self): + enable_crossdomain() + + streams = {} + return json.dumps(streams) + + ''' + for SRS hook: on_publish/on_unpublish + on_publish: + when client(encoder) publish to vhost/app/stream, call the hook, + the request in the POST data string is a object encode by json: + { + "action": "on_publish", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream" + } + on_unpublish: + when client(encoder) stop publish to vhost/app/stream, call the hook, + the request in the POST data string is a object encode by json: + { + "action": "on_unpublish", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream" + } + if valid, the hook must return HTTP code 200(Stauts OK) and response + an int value specifies the error code(0 corresponding to success): + 0 + ''' + def POST(self): + enable_crossdomain() + + # return the error code in str + code = Error.success + + req = cherrypy.request.body.read() + trace("post to streams, req=%s"%(req)) + try: + json_req = json.loads(req) + except Exception, ex: + code = Error.system_parse_json + trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code)) + return str(code) + + action = json_req["action"] + if action == "on_publish": + code = self.__on_publish(json_req) + elif action == "on_unpublish": + code = self.__on_unpublish(json_req) + else: + trace("invalid request action: %s"%(json_req["action"])) + code = Error.request_invalid_action + + return str(code) + + def OPTIONS(self): + enable_crossdomain() + + def __on_publish(self, req): + code = Error.success + + trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s"%( + req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"] + )) + + # TODO: process the on_publish event + + return code + + def __on_unpublish(self, req): + code = Error.success + + trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s"%( + req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"] + )) + + # TODO: process the on_unpublish event + + return code + +''' +handle the sessions requests: client play/stop stream +''' +class RESTSessions(object): + exposed = True + + def GET(self): + enable_crossdomain() + + sessions = {} + return json.dumps(sessions) + + ''' + for SRS hook: on_play/on_stop + on_play: + when client(encoder) publish to vhost/app/stream, call the hook, + the request in the POST data string is a object encode by json: + { + "action": "on_play", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream" + } + on_stop: + when client(encoder) stop publish to vhost/app/stream, call the hook, + the request in the POST data string is a object encode by json: + { + "action": "on_stop", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream" + } + if valid, the hook must return HTTP code 200(Stauts OK) and response + an int value specifies the error code(0 corresponding to success): + 0 + ''' + def POST(self): + enable_crossdomain() + + # return the error code in str + code = Error.success + + req = cherrypy.request.body.read() + trace("post to sessions, req=%s"%(req)) + try: + json_req = json.loads(req) + except Exception, ex: + code = Error.system_parse_json + trace("parse the request to json failed, req=%s, ex=%s, code=%s"%(req, ex, code)) + return str(code) + + action = json_req["action"] + if action == "on_play": + code = self.__on_play(json_req) + elif action == "on_stop": + code = self.__on_stop(json_req) + else: + trace("invalid request action: %s"%(json_req["action"])) + code = Error.request_invalid_action + + return str(code) + + def OPTIONS(self): + enable_crossdomain() + + def __on_play(self, req): + code = Error.success + + trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s"%( + req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"] + )) + + # TODO: process the on_play event + + return code + + def __on_stop(self, req): + code = Error.success + + trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s"%( + req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"] + )) + + # TODO: process the on_stop event + + return code + # HTTP RESTful path. class Root(object): def __init__(self): @@ -123,6 +333,8 @@ class Api(object): class V1(object): def __init__(self): self.clients = RESTClients() + self.streams = RESTStreams() + self.sessions = RESTSessions() ''' main code start. diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index aa114165c..1b84b3287 100644 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -108,12 +108,16 @@ int SrsClient::do_cycle() req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str()); - if ((ret = refer->check(req->pageUrl, config->get_refer(req->vhost))) != ERROR_SUCCESS) { - srs_error("check refer failed. ret=%d", ret); - return ret; - } - srs_verbose("check refer success."); - + ret = service_cycle(); + on_close(); + + return ret; +} + +int SrsClient::service_cycle() +{ + int ret = ERROR_SUCCESS; + if ((ret = rtmp->set_window_ack_size(2.5 * 1000 * 1000)) != ERROR_SUCCESS) { srs_error("set window acknowledgement size failed. ret=%d", ret); return ret; @@ -188,8 +192,14 @@ int SrsClient::do_cycle() srs_error("start to play stream failed. ret=%d", ret); return ret; } + if ((ret = on_play()) != ERROR_SUCCESS) { + srs_error("http hook on_play failed. ret=%d", ret); + return ret; + } srs_info("start to play stream %s success", req->stream.c_str()); - return playing(source); + ret = playing(source); + on_stop(); + return ret; } case SrsClientFMLEPublish: { srs_verbose("FMLE start to publish stream %s.", req->stream.c_str()); @@ -198,9 +208,14 @@ int SrsClient::do_cycle() srs_error("start to publish stream failed. ret=%d", ret); return ret; } + if ((ret = on_publish()) != ERROR_SUCCESS) { + srs_error("http hook on_publish failed. ret=%d", ret); + return ret; + } srs_info("start to publish stream %s success", req->stream.c_str()); ret = publish(source, true); source->on_unpublish(); + on_unpublish(); return ret; } case SrsClientFlashPublish: { @@ -210,9 +225,14 @@ int SrsClient::do_cycle() srs_error("flash start to publish stream failed. ret=%d", ret); return ret; } + if ((ret = on_publish()) != ERROR_SUCCESS) { + srs_error("http hook on_publish failed. ret=%d", ret); + return ret; + } srs_info("flash start to publish stream %s success", req->stream.c_str()); ret = publish(source, false); source->on_unpublish(); + on_unpublish(); return ret; } default: { @@ -249,22 +269,15 @@ int SrsClient::check_vhost() req->vhost = vhost->arg0(); } -#ifdef SRS_HTTP - // HTTP: on_connect - SrsConfDirective* on_connect = config->get_vhost_on_connect(req->vhost); - if (!on_connect) { - srs_info("ignore the empty http callback: on_connect"); + if ((ret = refer->check(req->pageUrl, config->get_refer(req->vhost))) != ERROR_SUCCESS) { + srs_error("check refer failed. ret=%d", ret); return ret; } + srs_verbose("check refer success."); - for (int i = 0; i < (int)on_connect->args.size(); i++) { - std::string url = on_connect->args.at(i); - if ((ret = http_hooks->on_connect(url, ip, req)) != ERROR_SUCCESS) { - srs_error("hook client failed. url=%s, ret=%d", url.c_str(), ret); - return ret; - } + if ((ret = on_connect()) != ERROR_SUCCESS) { + return ret; } -#endif return ret; } @@ -545,3 +558,129 @@ int SrsClient::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* return ret; } +int SrsClient::on_connect() +{ + int ret = ERROR_SUCCESS; + +#ifdef SRS_HTTP + // HTTP: on_connect + SrsConfDirective* on_connect = config->get_vhost_on_connect(req->vhost); + if (!on_connect) { + srs_info("ignore the empty http callback: on_connect"); + return ret; + } + + for (int i = 0; i < (int)on_connect->args.size(); i++) { + std::string url = on_connect->args.at(i); + if ((ret = http_hooks->on_connect(url, connection_id, ip, req)) != ERROR_SUCCESS) { + srs_error("hook client on_connect failed. url=%s, ret=%d", url.c_str(), ret); + return ret; + } + } +#endif + + return ret; +} + +void SrsClient::on_close() +{ +#ifdef SRS_HTTP + // whatever the ret code, notify the api hooks. + // HTTP: on_close + SrsConfDirective* on_close = config->get_vhost_on_close(req->vhost); + if (!on_close) { + srs_info("ignore the empty http callback: on_close"); + return; + } + + for (int i = 0; i < (int)on_close->args.size(); i++) { + std::string url = on_close->args.at(i); + http_hooks->on_close(url, connection_id, ip, req); + } +#endif +} + +int SrsClient::on_publish() +{ + int ret = ERROR_SUCCESS; + +#ifdef SRS_HTTP + // HTTP: on_publish + SrsConfDirective* on_publish = config->get_vhost_on_publish(req->vhost); + if (!on_publish) { + srs_info("ignore the empty http callback: on_publish"); + return ret; + } + + for (int i = 0; i < (int)on_publish->args.size(); i++) { + std::string url = on_publish->args.at(i); + if ((ret = http_hooks->on_publish(url, connection_id, ip, req)) != ERROR_SUCCESS) { + srs_error("hook client on_publish failed. url=%s, ret=%d", url.c_str(), ret); + return ret; + } + } +#endif + + return ret; +} + +void SrsClient::on_unpublish() +{ +#ifdef SRS_HTTP + // whatever the ret code, notify the api hooks. + // HTTP: on_unpublish + SrsConfDirective* on_unpublish = config->get_vhost_on_unpublish(req->vhost); + if (!on_unpublish) { + srs_info("ignore the empty http callback: on_unpublish"); + return; + } + + for (int i = 0; i < (int)on_unpublish->args.size(); i++) { + std::string url = on_unpublish->args.at(i); + http_hooks->on_unpublish(url, connection_id, ip, req); + } +#endif +} + +int SrsClient::on_play() +{ + int ret = ERROR_SUCCESS; + +#ifdef SRS_HTTP + // HTTP: on_play + SrsConfDirective* on_play = config->get_vhost_on_play(req->vhost); + if (!on_play) { + srs_info("ignore the empty http callback: on_play"); + return ret; + } + + for (int i = 0; i < (int)on_play->args.size(); i++) { + std::string url = on_play->args.at(i); + if ((ret = http_hooks->on_play(url, connection_id, ip, req)) != ERROR_SUCCESS) { + srs_error("hook client on_play failed. url=%s, ret=%d", url.c_str(), ret); + return ret; + } + } +#endif + + return ret; +} + +void SrsClient::on_stop() +{ +#ifdef SRS_HTTP + // whatever the ret code, notify the api hooks. + // HTTP: on_stop + SrsConfDirective* on_stop = config->get_vhost_on_stop(req->vhost); + if (!on_stop) { + srs_info("ignore the empty http callback: on_stop"); + return; + } + + for (int i = 0; i < (int)on_stop->args.size(); i++) { + std::string url = on_stop->args.at(i); + http_hooks->on_stop(url, connection_id, ip, req); + } +#endif +} + diff --git a/trunk/src/core/srs_core_client.hpp b/trunk/src/core/srs_core_client.hpp index d951fbd88..0d84b527d 100644 --- a/trunk/src/core/srs_core_client.hpp +++ b/trunk/src/core/srs_core_client.hpp @@ -63,12 +63,21 @@ public: protected: virtual int do_cycle(); private: + // when valid and connected to vhost/app, service the client. + virtual int service_cycle(); virtual int check_vhost(); virtual int playing(SrsSource* source); virtual int publish(SrsSource* source, bool is_fmle); virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle); virtual int get_peer_ip(); virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); +private: + virtual int on_connect(); + virtual void on_close(); + virtual int on_publish(); + virtual void on_unpublish(); + virtual int on_play(); + virtual void on_stop(); }; #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_config.cpp b/trunk/src/core/srs_core_config.cpp index 3ea183387..242e15438 100644 --- a/trunk/src/core/srs_core_config.cpp +++ b/trunk/src/core/srs_core_config.cpp @@ -578,6 +578,86 @@ SrsConfDirective* SrsConfig::get_vhost_on_connect(std::string vhost) return conf->get("on_connect"); } +SrsConfDirective* SrsConfig::get_vhost_on_close(std::string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return NULL; + } + + conf = conf->get("http_hooks"); + if (!conf) { + return NULL; + } + + return conf->get("on_close"); +} + +SrsConfDirective* SrsConfig::get_vhost_on_publish(std::string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return NULL; + } + + conf = conf->get("http_hooks"); + if (!conf) { + return NULL; + } + + return conf->get("on_publish"); +} + +SrsConfDirective* SrsConfig::get_vhost_on_unpublish(std::string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return NULL; + } + + conf = conf->get("http_hooks"); + if (!conf) { + return NULL; + } + + return conf->get("on_unpublish"); +} + +SrsConfDirective* SrsConfig::get_vhost_on_play(std::string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return NULL; + } + + conf = conf->get("http_hooks"); + if (!conf) { + return NULL; + } + + return conf->get("on_play"); +} + +SrsConfDirective* SrsConfig::get_vhost_on_stop(std::string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return NULL; + } + + conf = conf->get("http_hooks"); + if (!conf) { + return NULL; + } + + return conf->get("on_stop"); +} + bool SrsConfig::get_vhost_enabled(std::string vhost) { SrsConfDirective* vhost_conf = get_vhost(vhost); diff --git a/trunk/src/core/srs_core_config.hpp b/trunk/src/core/srs_core_config.hpp index 5e3dac4e4..50538cb53 100644 --- a/trunk/src/core/srs_core_config.hpp +++ b/trunk/src/core/srs_core_config.hpp @@ -120,6 +120,11 @@ public: virtual SrsConfDirective* get_vhost(std::string vhost); virtual bool get_vhost_enabled(std::string vhost); virtual SrsConfDirective* get_vhost_on_connect(std::string vhost); + virtual SrsConfDirective* get_vhost_on_close(std::string vhost); + virtual SrsConfDirective* get_vhost_on_publish(std::string vhost); + virtual SrsConfDirective* get_vhost_on_unpublish(std::string vhost); + virtual SrsConfDirective* get_vhost_on_play(std::string vhost); + virtual SrsConfDirective* get_vhost_on_stop(std::string vhost); virtual SrsConfDirective* get_transcode(std::string vhost, std::string scope); virtual bool get_transcode_enabled(SrsConfDirective* transcode); virtual std::string get_transcode_ffmpeg(SrsConfDirective* transcode); diff --git a/trunk/src/core/srs_core_conn.cpp b/trunk/src/core/srs_core_conn.cpp index bcb6e4c5e..b11e8ea18 100644 --- a/trunk/src/core/srs_core_conn.cpp +++ b/trunk/src/core/srs_core_conn.cpp @@ -31,6 +31,7 @@ SrsConnection::SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd) { server = srs_server; stfd = client_stfd; + connection_id = 0; } SrsConnection::~SrsConnection() @@ -65,6 +66,8 @@ void SrsConnection::cycle() int ret = ERROR_SUCCESS; log_context->generate_id(); + connection_id = log_context->get_id(); + ret = do_cycle(); // if socket io error, set to closed. diff --git a/trunk/src/core/srs_core_conn.hpp b/trunk/src/core/srs_core_conn.hpp index 502457e0f..8f8519c95 100644 --- a/trunk/src/core/srs_core_conn.hpp +++ b/trunk/src/core/srs_core_conn.hpp @@ -38,6 +38,7 @@ class SrsConnection protected: SrsServer* server; st_netfd_t stfd; + int connection_id; public: SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd); virtual ~SrsConnection(); diff --git a/trunk/src/core/srs_core_http.cpp b/trunk/src/core/srs_core_http.cpp index 7bb8ac006..461d5cd2d 100644 --- a/trunk/src/core/srs_core_http.cpp +++ b/trunk/src/core/srs_core_http.cpp @@ -400,25 +400,35 @@ SrsHttpHooks::~SrsHttpHooks() { } -int SrsHttpHooks::on_connect(std::string url, std::string ip, SrsRequest* req) +int SrsHttpHooks::on_connect(std::string url, int client_id, std::string ip, SrsRequest* req) { int ret = ERROR_SUCCESS; SrsHttpUri uri; if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { - srs_error("http uri parse url failed. " - "url=%s, ret=%d", url.c_str(), ret); + srs_error("http uri parse on_connect url failed. " + "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); return ret; } /** { + "action": "on_connect", + "client_id": 1985, "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", "pageUrl": "http://www.test.com/live.html" } */ std::stringstream ss; ss << "{" + // action + << '"' << "action" << '"' << ':' + << '"' << "on_connect" << '"' + << ',' + // client_id + << '"' << "client_id" << '"' << ':' + << std::dec << client_id + << ',' // ip << '"' << "ip" << '"' << ':' << '"' << ip << '"' @@ -441,24 +451,375 @@ int SrsHttpHooks::on_connect(std::string url, std::string ip, SrsRequest* req) SrsHttpClient http; if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { - srs_error("http post uri failed. " - "url=%s, request=%s, response=%s, ret=%d", - url.c_str(), data.c_str(), res.c_str(), ret); + srs_error("http post on_connect uri failed. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); return ret; } if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { ret = ERROR_HTTP_DATA_INVLIAD; - srs_error("http hook validate failed. " - "res=%s, ret=%d", res.c_str(), ret); + srs_error("http hook on_connect validate failed. " + "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); return ret; } srs_trace("http hook on_connect success. " - "url=%s, request=%s, response=%s, ret=%d", - url.c_str(), data.c_str(), res.c_str(), ret); + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); return ret; } +void SrsHttpHooks::on_close(std::string url, int client_id, std::string ip, SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + SrsHttpUri uri; + if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { + srs_warn("http uri parse on_close url failed, ignored. " + "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); + return; + } + + /** + { + "action": "on_close", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream" + } + */ + std::stringstream ss; + ss << "{" + // action + << '"' << "action" << '"' << ':' + << '"' << "on_close" << '"' + << ',' + // client_id + << '"' << "client_id" << '"' << ':' + << std::dec << client_id + << ',' + // ip + << '"' << "ip" << '"' << ':' + << '"' << ip << '"' + << ',' + // vhost + << '"' << "vhost" << '"' << ':' + << '"' << req->vhost << '"' + << ',' + // app + << '"' << "app" << '"' << ':' + << '"' << req->app << '"' + //<< ',' + << "}"; + std::string data = ss.str(); + std::string res; + + SrsHttpClient http; + if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { + srs_warn("http post on_close uri failed, ignored. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + return; + } + + if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { + ret = ERROR_HTTP_DATA_INVLIAD; + srs_warn("http hook on_close validate failed, ignored. " + "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); + return; + } + + srs_trace("http hook on_close success. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + + return; +} + +int SrsHttpHooks::on_publish(std::string url, int client_id, std::string ip, SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + SrsHttpUri uri; + if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { + srs_error("http uri parse on_publish url failed. " + "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); + return ret; + } + + /** + { + "action": "on_publish", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream" + } + */ + std::stringstream ss; + ss << "{" + // action + << '"' << "action" << '"' << ':' + << '"' << "on_publish" << '"' + << ',' + // client_id + << '"' << "client_id" << '"' << ':' + << std::dec << client_id + << ',' + // ip + << '"' << "ip" << '"' << ':' + << '"' << ip << '"' + << ',' + // vhost + << '"' << "vhost" << '"' << ':' + << '"' << req->vhost << '"' + << ',' + // app + << '"' << "app" << '"' << ':' + << '"' << req->app << '"' + << ',' + // stream + << '"' << "stream" << '"' << ':' + << '"' << req->stream << '"' + //<< ',' + << "}"; + std::string data = ss.str(); + std::string res; + + SrsHttpClient http; + if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { + srs_error("http post on_publish uri failed. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + return ret; + } + + if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { + ret = ERROR_HTTP_DATA_INVLIAD; + srs_error("http hook on_publish validate failed. " + "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); + return ret; + } + + srs_trace("http hook on_publish success. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + + return ret; +} + +void SrsHttpHooks::on_unpublish(std::string url, int client_id, std::string ip, SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + SrsHttpUri uri; + if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { + srs_warn("http uri parse on_unpublish url failed, ignored. " + "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); + return; + } + + /** + { + "action": "on_unpublish", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream" + } + */ + std::stringstream ss; + ss << "{" + // action + << '"' << "action" << '"' << ':' + << '"' << "on_unpublish" << '"' + << ',' + // client_id + << '"' << "client_id" << '"' << ':' + << std::dec << client_id + << ',' + // ip + << '"' << "ip" << '"' << ':' + << '"' << ip << '"' + << ',' + // vhost + << '"' << "vhost" << '"' << ':' + << '"' << req->vhost << '"' + << ',' + // app + << '"' << "app" << '"' << ':' + << '"' << req->app << '"' + << ',' + // stream + << '"' << "stream" << '"' << ':' + << '"' << req->stream << '"' + //<< ',' + << "}"; + std::string data = ss.str(); + std::string res; + + SrsHttpClient http; + if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { + srs_warn("http post on_unpublish uri failed, ignored. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + return; + } + + if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { + ret = ERROR_HTTP_DATA_INVLIAD; + srs_warn("http hook on_unpublish validate failed, ignored. " + "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); + return; + } + + srs_trace("http hook on_unpublish success. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + + return; +} + +int SrsHttpHooks::on_play(std::string url, int client_id, std::string ip, SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + SrsHttpUri uri; + if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { + srs_error("http uri parse on_play url failed. " + "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); + return ret; + } + + /** + { + "action": "on_play", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream" + } + */ + std::stringstream ss; + ss << "{" + // action + << '"' << "action" << '"' << ':' + << '"' << "on_play" << '"' + << ',' + // client_id + << '"' << "client_id" << '"' << ':' + << std::dec << client_id + << ',' + // ip + << '"' << "ip" << '"' << ':' + << '"' << ip << '"' + << ',' + // vhost + << '"' << "vhost" << '"' << ':' + << '"' << req->vhost << '"' + << ',' + // app + << '"' << "app" << '"' << ':' + << '"' << req->app << '"' + << ',' + // stream + << '"' << "stream" << '"' << ':' + << '"' << req->stream << '"' + //<< ',' + << "}"; + std::string data = ss.str(); + std::string res; + + SrsHttpClient http; + if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { + srs_error("http post on_play uri failed. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + return ret; + } + + if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { + ret = ERROR_HTTP_DATA_INVLIAD; + srs_error("http hook on_play validate failed. " + "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); + return ret; + } + + srs_trace("http hook on_play success. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + + return ret; +} + +void SrsHttpHooks::on_stop(std::string url, int client_id, std::string ip, SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + SrsHttpUri uri; + if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { + srs_warn("http uri parse on_stop url failed, ignored. " + "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); + return; + } + + /** + { + "action": "on_stop", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream" + } + */ + std::stringstream ss; + ss << "{" + // action + << '"' << "action" << '"' << ':' + << '"' << "on_stop" << '"' + << ',' + // client_id + << '"' << "client_id" << '"' << ':' + << std::dec << client_id + << ',' + // ip + << '"' << "ip" << '"' << ':' + << '"' << ip << '"' + << ',' + // vhost + << '"' << "vhost" << '"' << ':' + << '"' << req->vhost << '"' + << ',' + // app + << '"' << "app" << '"' << ':' + << '"' << req->app << '"' + << ',' + // stream + << '"' << "stream" << '"' << ':' + << '"' << req->stream << '"' + //<< ',' + << "}"; + std::string data = ss.str(); + std::string res; + + SrsHttpClient http; + if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { + srs_warn("http post on_stop uri failed, ignored. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + return; + } + + if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { + ret = ERROR_HTTP_DATA_INVLIAD; + srs_warn("http hook on_stop validate failed, ignored. " + "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); + return; + } + + srs_trace("http hook on_stop success. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + + return; +} + #endif diff --git a/trunk/src/core/srs_core_http.hpp b/trunk/src/core/srs_core_http.hpp index 426760c1f..0848dd66b 100644 --- a/trunk/src/core/srs_core_http.hpp +++ b/trunk/src/core/srs_core_http.hpp @@ -120,12 +120,50 @@ public: virtual ~SrsHttpHooks(); public: /** - * on_connect hook, + * on_connect hook, when client connect to srs. + * @param client_id the id of client on server. * @param url the api server url, to valid the client. * ignore if empty. * @return valid failed or connect to the url failed. */ - virtual int on_connect(std::string url, std::string ip, SrsRequest* req); + virtual int on_connect(std::string url, int client_id, std::string ip, SrsRequest* req); + /** + * on_close hook, when client disconnect to srs, where client is valid by on_connect. + * @param client_id the id of client on server. + * @param url the api server url, to process the event. + * ignore if empty. + */ + virtual void on_close(std::string url, int client_id, std::string ip, SrsRequest* req); + /** + * on_publish hook, when client(encoder) start to publish stream + * @param client_id the id of client on server. + * @param url the api server url, to valid the client. + * ignore if empty. + * @return valid failed or connect to the url failed. + */ + virtual int on_publish(std::string url, int client_id, std::string ip, SrsRequest* req); + /** + * on_unpublish hook, when client(encoder) stop publish stream. + * @param client_id the id of client on server. + * @param url the api server url, to process the event. + * ignore if empty. + */ + virtual void on_unpublish(std::string url, int client_id, std::string ip, SrsRequest* req); + /** + * on_play hook, when client start to play stream. + * @param client_id the id of client on server. + * @param url the api server url, to valid the client. + * ignore if empty. + * @return valid failed or connect to the url failed. + */ + virtual int on_play(std::string url, int client_id, std::string ip, SrsRequest* req); + /** + * on_stop hook, when client stop to play the stream. + * @param client_id the id of client on server. + * @param url the api server url, to process the event. + * ignore if empty. + */ + virtual void on_stop(std::string url, int client_id, std::string ip, SrsRequest* req); }; #endif