From e70609cea367138785cc507188c364cac937366b Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 1 Apr 2014 18:40:24 +0800 Subject: [PATCH] refine code, extract http hooks. --- trunk/configure | 2 +- trunk/src/app/srs_app_http.cpp | 730 ++------------------------ trunk/src/app/srs_app_http.hpp | 143 ++---- trunk/src/app/srs_app_http_conn.cpp | 25 +- trunk/src/app/srs_app_http_conn.hpp | 25 +- trunk/src/app/srs_app_http_hooks.cpp | 732 +++++++++++++++++++++++++++ trunk/src/app/srs_app_http_hooks.hpp | 135 +++++ trunk/src/app/srs_app_rtmp_conn.cpp | 1 + trunk/src/srs/srs.upp | 2 + 9 files changed, 948 insertions(+), 847 deletions(-) create mode 100644 trunk/src/app/srs_app_http_hooks.cpp create mode 100644 trunk/src/app/srs_app_http_hooks.hpp diff --git a/trunk/configure b/trunk/configure index 84e9f1180..270ca9c19 100755 --- a/trunk/configure +++ b/trunk/configure @@ -427,7 +427,7 @@ MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_socke "srs_app_codec" "srs_app_refer" "srs_app_hls" "srs_app_forward" "srs_app_encoder" "srs_app_http" "srs_app_thread" "srs_app_bandwidth" "srs_app_st" "srs_app_log" "srs_app_config" "srs_app_pithy_print" "srs_app_reload" "srs_app_http_api" - "srs_app_http_conn") + "srs_app_http_conn" "srs_app_http_hooks") APP_INCS="src/app"; MODULE_DIR=${APP_INCS} . auto/modules.sh APP_OBJS="${MODULE_OBJS[@]}" # diff --git a/trunk/src/app/srs_app_http.cpp b/trunk/src/app/srs_app_http.cpp index d990460b5..079f84e12 100644 --- a/trunk/src/app/srs_app_http.cpp +++ b/trunk/src/app/srs_app_http.cpp @@ -23,26 +23,46 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include -#ifdef SRS_HTTP_CALLBACK - -#include -using namespace std; +#ifdef SRS_HTTP_PARSER #include -#include -#include -#include #include -#include #include #include #define SRS_DEFAULT_HTTP_PORT 80 -#define SRS_HTTP_RESPONSE_OK "0" -#define SRS_HTTP_HEADER_BUFFER 1024 -#define SRS_HTTP_BODY_BUFFER 32 * 1024 +SrsHttpMessage::SrsHttpMessage() +{ + body = new SrsBuffer(); + state = SrsHttpParseStateInit; +} + +SrsHttpMessage::~SrsHttpMessage() +{ + srs_freep(body); +} + +void SrsHttpMessage::reset() +{ + state = SrsHttpParseStateInit; + body->clear(); + url = ""; +} + +bool SrsHttpMessage::is_complete() +{ + return state == SrsHttpParseStateComplete; +} + +SrsHttpParser::SrsHttpParser() +{ +} + +SrsHttpParser::~SrsHttpParser() +{ +} SrsHttpUri::SrsHttpUri() { @@ -130,692 +150,4 @@ std::string SrsHttpUri::get_uri_field(std::string uri, http_parser_url* hp_u, ht return uri.substr(offset, len); } -SrsHttpClient::SrsHttpClient() -{ - connected = false; - stfd = NULL; -} - -SrsHttpClient::~SrsHttpClient() -{ - disconnect(); -} - -int SrsHttpClient::post(SrsHttpUri* uri, std::string req, std::string& res) -{ - int ret = ERROR_SUCCESS; - - if ((ret = connect(uri)) != ERROR_SUCCESS) { - srs_error("http connect server failed. ret=%d", ret); - return ret; - } - - // send POST request to uri - // POST %s HTTP/1.1\r\nHost: %s\r\nContent-Length: %d\r\n\r\n%s - std::stringstream ss; - ss << "POST " << uri->get_path() << " " - << "HTTP/1.1" << __CRLF - << "Host: " << uri->get_host() << __CRLF - << "Connection: Keep-Alive" << __CRLF - << "Content-Length: " << std::dec << req.length() << __CRLF - << "User-Agent: " << RTMP_SIG_SRS_NAME << RTMP_SIG_SRS_VERSION << __CRLF - << "Content-Type: text/html" << __CRLF - << __CRLF - << req; - - SrsSocket skt(stfd); - - std::string data = ss.str(); - ssize_t nwrite; - if ((ret = skt.write(data.c_str(), data.length(), &nwrite)) != ERROR_SUCCESS) { - // disconnect when error. - disconnect(); - - srs_error("write http post failed. ret=%d", ret); - return ret; - } - - if ((ret = parse_response(uri, &skt, &res)) != ERROR_SUCCESS) { - srs_error("parse http post response failed. ret=%d", ret); - return ret; - } - srs_info("parse http post response success."); - - return ret; -} - -void SrsHttpClient::disconnect() -{ - connected = false; - - srs_close_stfd(stfd); -} - -int SrsHttpClient::connect(SrsHttpUri* uri) -{ - int ret = ERROR_SUCCESS; - - if (connected) { - return ret; - } - - disconnect(); - - std::string ip = srs_dns_resolve(uri->get_host()); - if (ip.empty()) { - ret = ERROR_SYSTEM_IP_INVALID; - srs_error("dns resolve server error, ip empty. ret=%d", ret); - return ret; - } - - int sock = socket(AF_INET, SOCK_STREAM, 0); - if(sock == -1){ - ret = ERROR_SOCKET_CREATE; - srs_error("create socket error. ret=%d", ret); - return ret; - } - - stfd = st_netfd_open_socket(sock); - if(stfd == NULL){ - ret = ERROR_ST_OPEN_SOCKET; - srs_error("st_netfd_open_socket failed. ret=%d", ret); - return ret; - } - - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(uri->get_port()); - addr.sin_addr.s_addr = inet_addr(ip.c_str()); - - if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){ - ret = ERROR_ST_CONNECT; - srs_error("connect to server error. " - "ip=%s, port=%d, ret=%d", ip.c_str(), uri->get_port(), ret); - return ret; - } - srs_info("connect to server success. " - "http url=%s, server=%s, ip=%s, port=%d", - uri->get_url(), uri->get_host(), ip.c_str(), uri->get_port()); - - connected = true; - - return ret; -} - -int SrsHttpClient::parse_response(SrsHttpUri* uri, SrsSocket* skt, std::string* response) -{ - int ret = ERROR_SUCCESS; - - int body_received = 0; - if ((ret = parse_response_header(skt, response, body_received)) != ERROR_SUCCESS) { - srs_error("parse response header failed. ret=%d", ret); - return ret; - } - - if ((ret = parse_response_body(uri, skt, response, body_received)) != ERROR_SUCCESS) { - srs_error("parse response body failed. ret=%d", ret); - return ret; - } - - srs_info("url %s download, body size=%"PRId64, uri->get_url(), http_header.content_length); - - return ret; -} - -int SrsHttpClient::parse_response_header(SrsSocket* skt, std::string* response, int& body_received) -{ - int ret = ERROR_SUCCESS; - - http_parser_settings settings; - - memset(&settings, 0, sizeof(settings)); - settings.on_headers_complete = on_headers_complete; - - http_parser parser; - http_parser_init(&parser, HTTP_RESPONSE); - // callback object ptr. - parser.data = (void*)this; - - // reset response header. - memset(&http_header, 0, sizeof(http_header)); - - // parser header. - char buf[SRS_HTTP_HEADER_BUFFER]; - for (;;) { - ssize_t nread; - if ((ret = skt->read(buf, (size_t)sizeof(buf), &nread)) != ERROR_SUCCESS) { - srs_error("read body from server failed. ret=%d", ret); - return ret; - } - - ssize_t nparsed = http_parser_execute(&parser, &settings, buf, nread); - srs_info("read_size=%d, nparsed=%d", (int)nread, (int)nparsed); - - // check header size. - if (http_header.nread != 0) { - body_received = nread - nparsed; - - srs_info("http header parsed, size=%d, content-length=%"PRId64", body-received=%d", - http_header.nread, http_header.content_length, body_received); - - if(response != NULL && body_received > 0){ - response->append(buf + nparsed, body_received); - } - - return ret; - } - - if (nparsed != nread) { - ret = ERROR_HTTP_PARSE_HEADER; - srs_error("parse response error, parsed(%d)!=read(%d), ret=%d", (int)nparsed, (int)nread, ret); - return ret; - } - } - - return ret; -} - -int SrsHttpClient::parse_response_body(SrsHttpUri* uri, SrsSocket* skt, std::string* response, int body_received) -{ - int ret = ERROR_SUCCESS; - - srs_assert(uri != NULL); - - uint64_t body_left = http_header.content_length - body_received; - - if (body_left <= 0) { - return ret; - } - - if (response != NULL) { - char buf[SRS_HTTP_BODY_BUFFER]; - - return parse_response_body_data( - uri, skt, response, (size_t)body_left, - (const void*)buf, (size_t)SRS_HTTP_BODY_BUFFER - ); - } else { - // if ignore response, use shared fast memory. - static char buf[SRS_HTTP_BODY_BUFFER]; - - return parse_response_body_data( - uri, skt, response, (size_t)body_left, - (const void*)buf, (size_t)SRS_HTTP_BODY_BUFFER - ); - } - - return ret; -} - -int SrsHttpClient::parse_response_body_data(SrsHttpUri* uri, SrsSocket* skt, std::string* response, size_t body_left, const void* buf, size_t size) -{ - int ret = ERROR_SUCCESS; - - srs_assert(uri != NULL); - - while (body_left > 0) { - ssize_t nread; - int size_to_read = srs_min(size, body_left); - if ((ret = skt->read(buf, size_to_read, &nread)) != ERROR_SUCCESS) { - srs_error("read header from server failed. ret=%d", ret); - return ret; - } - - if (response != NULL && nread > 0) { - response->append((char*)buf, nread); - } - - body_left -= nread; - srs_info("read url(%s) content partial %"PRId64"/%"PRId64"", - uri->get_url(), http_header.content_length - body_left, http_header.content_length); - } - - return ret; -} - -int SrsHttpClient::on_headers_complete(http_parser* parser) -{ - SrsHttpClient* obj = (SrsHttpClient*)parser->data; - obj->complete_header(parser); - - // see http_parser.c:1570, return 1 to skip body. - return 1; -} - -void SrsHttpClient::complete_header(http_parser* parser) -{ - // save the parser status when header parse completed. - memcpy(&http_header, parser, sizeof(http_header)); -} - -SrsHttpHooks::SrsHttpHooks() -{ -} - -SrsHttpHooks::~SrsHttpHooks() -{ -} - -int SrsHttpHooks::on_connect(std::string url, int client_id, std::string ip, SrsRequest* req) -{ - int ret = ERROR_SUCCESS; - - SrsHttpUri uri; - if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { - srs_error("http uri parse on_connect url failed. " - "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); - return ret; - } - - /** - { - "action": "on_connect", - "client_id": 1985, - "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", - "pageUrl": "http://www.test.com/live.html" - } - */ - std::stringstream ss; - ss << "{" - // action - << '"' << "action" << '"' << ':' - << '"' << "on_connect" << '"' - << ',' - // client_id - << '"' << "client_id" << '"' << ':' - << std::dec << client_id - << ',' - // ip - << '"' << "ip" << '"' << ':' - << '"' << ip << '"' - << ',' - // vhost - << '"' << "vhost" << '"' << ':' - << '"' << req->vhost << '"' - << ',' - // app - << '"' << "app" << '"' << ':' - << '"' << req->app << '"' - << ',' - // pageUrl - << '"' << "pageUrl" << '"' << ':' - << '"' << req->pageUrl << '"' - //<< ',' - << "}"; - std::string data = ss.str(); - std::string res; - - SrsHttpClient http; - if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { - srs_error("http post on_connect uri failed. " - "client_id=%d, url=%s, request=%s, response=%s, ret=%d", - client_id, url.c_str(), data.c_str(), res.c_str(), ret); - return ret; - } - - if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { - ret = ERROR_HTTP_DATA_INVLIAD; - srs_error("http hook on_connect validate failed. " - "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); - return ret; - } - - srs_trace("http hook on_connect success. " - "client_id=%d, url=%s, request=%s, response=%s, ret=%d", - client_id, url.c_str(), data.c_str(), res.c_str(), ret); - - return ret; -} - -void SrsHttpHooks::on_close(std::string url, int client_id, std::string ip, SrsRequest* req) -{ - int ret = ERROR_SUCCESS; - - SrsHttpUri uri; - if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { - srs_warn("http uri parse on_close url failed, ignored. " - "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); - return; - } - - /** - { - "action": "on_close", - "client_id": 1985, - "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", - "stream": "livestream" - } - */ - std::stringstream ss; - ss << "{" - // action - << '"' << "action" << '"' << ':' - << '"' << "on_close" << '"' - << ',' - // client_id - << '"' << "client_id" << '"' << ':' - << std::dec << client_id - << ',' - // ip - << '"' << "ip" << '"' << ':' - << '"' << ip << '"' - << ',' - // vhost - << '"' << "vhost" << '"' << ':' - << '"' << req->vhost << '"' - << ',' - // app - << '"' << "app" << '"' << ':' - << '"' << req->app << '"' - //<< ',' - << "}"; - std::string data = ss.str(); - std::string res; - - SrsHttpClient http; - if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { - srs_warn("http post on_close uri failed, ignored. " - "client_id=%d, url=%s, request=%s, response=%s, ret=%d", - client_id, url.c_str(), data.c_str(), res.c_str(), ret); - return; - } - - if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { - ret = ERROR_HTTP_DATA_INVLIAD; - srs_warn("http hook on_close validate failed, ignored. " - "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); - return; - } - - srs_trace("http hook on_close success. " - "client_id=%d, url=%s, request=%s, response=%s, ret=%d", - client_id, url.c_str(), data.c_str(), res.c_str(), ret); - - return; -} - -int SrsHttpHooks::on_publish(std::string url, int client_id, std::string ip, SrsRequest* req) -{ - int ret = ERROR_SUCCESS; - - SrsHttpUri uri; - if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { - srs_error("http uri parse on_publish url failed. " - "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); - return ret; - } - - /** - { - "action": "on_publish", - "client_id": 1985, - "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", - "stream": "livestream" - } - */ - std::stringstream ss; - ss << "{" - // action - << '"' << "action" << '"' << ':' - << '"' << "on_publish" << '"' - << ',' - // client_id - << '"' << "client_id" << '"' << ':' - << std::dec << client_id - << ',' - // ip - << '"' << "ip" << '"' << ':' - << '"' << ip << '"' - << ',' - // vhost - << '"' << "vhost" << '"' << ':' - << '"' << req->vhost << '"' - << ',' - // app - << '"' << "app" << '"' << ':' - << '"' << req->app << '"' - << ',' - // stream - << '"' << "stream" << '"' << ':' - << '"' << req->stream << '"' - //<< ',' - << "}"; - std::string data = ss.str(); - std::string res; - - SrsHttpClient http; - if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { - srs_error("http post on_publish uri failed. " - "client_id=%d, url=%s, request=%s, response=%s, ret=%d", - client_id, url.c_str(), data.c_str(), res.c_str(), ret); - return ret; - } - - if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { - ret = ERROR_HTTP_DATA_INVLIAD; - srs_error("http hook on_publish validate failed. " - "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); - return ret; - } - - srs_trace("http hook on_publish success. " - "client_id=%d, url=%s, request=%s, response=%s, ret=%d", - client_id, url.c_str(), data.c_str(), res.c_str(), ret); - - return ret; -} - -void SrsHttpHooks::on_unpublish(std::string url, int client_id, std::string ip, SrsRequest* req) -{ - int ret = ERROR_SUCCESS; - - SrsHttpUri uri; - if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { - srs_warn("http uri parse on_unpublish url failed, ignored. " - "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); - return; - } - - /** - { - "action": "on_unpublish", - "client_id": 1985, - "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", - "stream": "livestream" - } - */ - std::stringstream ss; - ss << "{" - // action - << '"' << "action" << '"' << ':' - << '"' << "on_unpublish" << '"' - << ',' - // client_id - << '"' << "client_id" << '"' << ':' - << std::dec << client_id - << ',' - // ip - << '"' << "ip" << '"' << ':' - << '"' << ip << '"' - << ',' - // vhost - << '"' << "vhost" << '"' << ':' - << '"' << req->vhost << '"' - << ',' - // app - << '"' << "app" << '"' << ':' - << '"' << req->app << '"' - << ',' - // stream - << '"' << "stream" << '"' << ':' - << '"' << req->stream << '"' - //<< ',' - << "}"; - std::string data = ss.str(); - std::string res; - - SrsHttpClient http; - if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { - srs_warn("http post on_unpublish uri failed, ignored. " - "client_id=%d, url=%s, request=%s, response=%s, ret=%d", - client_id, url.c_str(), data.c_str(), res.c_str(), ret); - return; - } - - if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { - ret = ERROR_HTTP_DATA_INVLIAD; - srs_warn("http hook on_unpublish validate failed, ignored. " - "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); - return; - } - - srs_trace("http hook on_unpublish success. " - "client_id=%d, url=%s, request=%s, response=%s, ret=%d", - client_id, url.c_str(), data.c_str(), res.c_str(), ret); - - return; -} - -int SrsHttpHooks::on_play(std::string url, int client_id, std::string ip, SrsRequest* req) -{ - int ret = ERROR_SUCCESS; - - SrsHttpUri uri; - if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { - srs_error("http uri parse on_play url failed. " - "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); - return ret; - } - - /** - { - "action": "on_play", - "client_id": 1985, - "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", - "stream": "livestream" - } - */ - std::stringstream ss; - ss << "{" - // action - << '"' << "action" << '"' << ':' - << '"' << "on_play" << '"' - << ',' - // client_id - << '"' << "client_id" << '"' << ':' - << std::dec << client_id - << ',' - // ip - << '"' << "ip" << '"' << ':' - << '"' << ip << '"' - << ',' - // vhost - << '"' << "vhost" << '"' << ':' - << '"' << req->vhost << '"' - << ',' - // app - << '"' << "app" << '"' << ':' - << '"' << req->app << '"' - << ',' - // stream - << '"' << "stream" << '"' << ':' - << '"' << req->stream << '"' - //<< ',' - << "}"; - std::string data = ss.str(); - std::string res; - - SrsHttpClient http; - if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { - srs_error("http post on_play uri failed. " - "client_id=%d, url=%s, request=%s, response=%s, ret=%d", - client_id, url.c_str(), data.c_str(), res.c_str(), ret); - return ret; - } - - if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { - ret = ERROR_HTTP_DATA_INVLIAD; - srs_error("http hook on_play validate failed. " - "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); - return ret; - } - - srs_trace("http hook on_play success. " - "client_id=%d, url=%s, request=%s, response=%s, ret=%d", - client_id, url.c_str(), data.c_str(), res.c_str(), ret); - - return ret; -} - -void SrsHttpHooks::on_stop(std::string url, int client_id, std::string ip, SrsRequest* req) -{ - int ret = ERROR_SUCCESS; - - SrsHttpUri uri; - if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { - srs_warn("http uri parse on_stop url failed, ignored. " - "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); - return; - } - - /** - { - "action": "on_stop", - "client_id": 1985, - "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", - "stream": "livestream" - } - */ - std::stringstream ss; - ss << "{" - // action - << '"' << "action" << '"' << ':' - << '"' << "on_stop" << '"' - << ',' - // client_id - << '"' << "client_id" << '"' << ':' - << std::dec << client_id - << ',' - // ip - << '"' << "ip" << '"' << ':' - << '"' << ip << '"' - << ',' - // vhost - << '"' << "vhost" << '"' << ':' - << '"' << req->vhost << '"' - << ',' - // app - << '"' << "app" << '"' << ':' - << '"' << req->app << '"' - << ',' - // stream - << '"' << "stream" << '"' << ':' - << '"' << req->stream << '"' - //<< ',' - << "}"; - std::string data = ss.str(); - std::string res; - - SrsHttpClient http; - if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { - srs_warn("http post on_stop uri failed, ignored. " - "client_id=%d, url=%s, request=%s, response=%s, ret=%d", - client_id, url.c_str(), data.c_str(), res.c_str(), ret); - return; - } - - if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { - ret = ERROR_HTTP_DATA_INVLIAD; - srs_warn("http hook on_stop validate failed, ignored. " - "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); - return; - } - - srs_trace("http hook on_stop success. " - "client_id=%d, url=%s, request=%s, response=%s, ret=%d", - client_id, url.c_str(), data.c_str(), res.c_str(), ret); - - return; -} - #endif diff --git a/trunk/src/app/srs_app_http.hpp b/trunk/src/app/srs_app_http.hpp index 8e0f19ffd..a4193dda5 100644 --- a/trunk/src/app/srs_app_http.hpp +++ b/trunk/src/app/srs_app_http.hpp @@ -29,9 +29,17 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #include +#ifdef SRS_HTTP_PARSER + +#include + +#include + #include -#ifdef SRS_HTTP_PARSER +class SrsBuffer; +class SrsRequest; +class SrsSocket; // http specification // CR = @@ -49,16 +57,42 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define __CRLF "\r\n" // 0x0D0A #define __CRLFCRLF "\r\n\r\n" // 0x0D0A0D0A -#endif +enum SrsHttpParseState { + SrsHttpParseStateInit = 0, + SrsHttpParseStateStart, + SrsHttpParseStateComplete +}; -#ifdef SRS_HTTP_CALLBACK +/** +* the http message, request or response. +*/ +class SrsHttpMessage +{ +public: + std::string url; + http_parser header; + SrsBuffer* body; + SrsHttpParseState state; + + SrsHttpMessage(); + virtual ~SrsHttpMessage(); + + virtual void reset(); + virtual bool is_complete(); +}; -class SrsRequest; -class SrsSocket; - -#include - -#include +/** +* wrapper for http-parser, +* provides HTTP message originted service. +*/ +class SrsHttpParser +{ +public: + SrsHttpParser(); + virtual ~SrsHttpParser(); +public: + //virtual int parse_requst(SrsHttpMessage** ppreq); +}; /** * used to resolve the http uri. @@ -93,97 +127,6 @@ private: virtual std::string get_uri_field(std::string uri, http_parser_url* hp_u, http_parser_url_fields field); }; -/** -* http client to GET/POST/PUT/DELETE uri -*/ -class SrsHttpClient -{ -private: - bool connected; - st_netfd_t stfd; -private: - http_parser http_header; -public: - SrsHttpClient(); - virtual ~SrsHttpClient(); -public: - /** - * to post data to the uri. - * @param req the data post to uri. - * @param res the response data from server. - */ - virtual int post(SrsHttpUri* uri, std::string req, std::string& res); -private: - virtual void disconnect(); - virtual int connect(SrsHttpUri* uri); -private: - virtual int parse_response(SrsHttpUri* uri, SrsSocket* skt, std::string* response); - virtual int parse_response_header(SrsSocket* skt, std::string* response, int& body_received); - virtual int parse_response_body(SrsHttpUri* uri, SrsSocket* skt, std::string* response, int body_received); - virtual int parse_response_body_data(SrsHttpUri* uri, SrsSocket* skt, std::string* response, size_t body_left, const void* buf, size_t size); -private: - static int on_headers_complete(http_parser* parser); - virtual void complete_header(http_parser* parser); -}; - -/** -* the http hooks, http callback api, -* for some event, such as on_connect, call -* a http api(hooks). -*/ -class SrsHttpHooks -{ -public: - SrsHttpHooks(); - virtual ~SrsHttpHooks(); -public: - /** - * on_connect hook, when client connect to srs. - * @param client_id the id of client on server. - * @param url the api server url, to valid the client. - * ignore if empty. - * @return valid failed or connect to the url failed. - */ - virtual int on_connect(std::string url, int client_id, std::string ip, SrsRequest* req); - /** - * on_close hook, when client disconnect to srs, where client is valid by on_connect. - * @param client_id the id of client on server. - * @param url the api server url, to process the event. - * ignore if empty. - */ - virtual void on_close(std::string url, int client_id, std::string ip, SrsRequest* req); - /** - * on_publish hook, when client(encoder) start to publish stream - * @param client_id the id of client on server. - * @param url the api server url, to valid the client. - * ignore if empty. - * @return valid failed or connect to the url failed. - */ - virtual int on_publish(std::string url, int client_id, std::string ip, SrsRequest* req); - /** - * on_unpublish hook, when client(encoder) stop publish stream. - * @param client_id the id of client on server. - * @param url the api server url, to process the event. - * ignore if empty. - */ - virtual void on_unpublish(std::string url, int client_id, std::string ip, SrsRequest* req); - /** - * on_play hook, when client start to play stream. - * @param client_id the id of client on server. - * @param url the api server url, to valid the client. - * ignore if empty. - * @return valid failed or connect to the url failed. - */ - virtual int on_play(std::string url, int client_id, std::string ip, SrsRequest* req); - /** - * on_stop hook, when client stop to play the stream. - * @param client_id the id of client on server. - * @param url the api server url, to process the event. - * ignore if empty. - */ - virtual void on_stop(std::string url, int client_id, std::string ip, SrsRequest* req); -}; - #endif #endif \ No newline at end of file diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 796286399..26b9bdcaa 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -36,33 +36,10 @@ using namespace std; #define SRS_HTTP_HEADER_BUFFER 1024 -SrsHttpRequest::SrsHttpRequest() -{ - body = new SrsBuffer(); - state = SrsHttpParseStateInit; -} - -SrsHttpRequest::~SrsHttpRequest() -{ - srs_freep(body); -} - -void SrsHttpRequest::reset() -{ - state = SrsHttpParseStateInit; - body->clear(); - url = ""; -} - -bool SrsHttpRequest::is_complete() -{ - return state == SrsHttpParseStateComplete; -} - SrsHttpConn::SrsHttpConn(SrsServer* srs_server, st_netfd_t client_stfd) : SrsConnection(srs_server, client_stfd) { - req = new SrsHttpRequest(); + req = new SrsHttpMessage(); } SrsHttpConn::~SrsHttpConn() diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 910c0b7a3..2cc05eb24 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -38,33 +38,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include class SrsSocket; -class SrsBuffer; - -enum SrsHttpParseState { - SrsHttpParseStateInit = 0, - SrsHttpParseStateStart, - SrsHttpParseStateComplete -}; - -class SrsHttpRequest -{ -public: - std::string url; - http_parser header; - SrsBuffer* body; - SrsHttpParseState state; - - SrsHttpRequest(); - virtual ~SrsHttpRequest(); - - virtual void reset(); - virtual bool is_complete(); -}; +class SrsHttpMessage; class SrsHttpConn : public SrsConnection { private: - SrsHttpRequest* req; + SrsHttpMessage* req; public: SrsHttpConn(SrsServer* srs_server, st_netfd_t client_stfd); virtual ~SrsHttpConn(); diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp new file mode 100644 index 000000000..8ad4f42bd --- /dev/null +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -0,0 +1,732 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2014 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +#ifdef SRS_HTTP_CALLBACK + +#include +using namespace std; + +#include + +#include +#include +#include +#include +#include + +#define SRS_HTTP_RESPONSE_OK "0" + +#define SRS_HTTP_HEADER_BUFFER 1024 +#define SRS_HTTP_BODY_BUFFER 32 * 1024 + +SrsHttpClient::SrsHttpClient() +{ + connected = false; + stfd = NULL; +} + +SrsHttpClient::~SrsHttpClient() +{ + disconnect(); +} + +int SrsHttpClient::post(SrsHttpUri* uri, std::string req, std::string& res) +{ + int ret = ERROR_SUCCESS; + + if ((ret = connect(uri)) != ERROR_SUCCESS) { + srs_error("http connect server failed. ret=%d", ret); + return ret; + } + + // send POST request to uri + // POST %s HTTP/1.1\r\nHost: %s\r\nContent-Length: %d\r\n\r\n%s + std::stringstream ss; + ss << "POST " << uri->get_path() << " " + << "HTTP/1.1" << __CRLF + << "Host: " << uri->get_host() << __CRLF + << "Connection: Keep-Alive" << __CRLF + << "Content-Length: " << std::dec << req.length() << __CRLF + << "User-Agent: " << RTMP_SIG_SRS_NAME << RTMP_SIG_SRS_VERSION << __CRLF + << "Content-Type: text/html" << __CRLF + << __CRLF + << req; + + SrsSocket skt(stfd); + + std::string data = ss.str(); + ssize_t nwrite; + if ((ret = skt.write(data.c_str(), data.length(), &nwrite)) != ERROR_SUCCESS) { + // disconnect when error. + disconnect(); + + srs_error("write http post failed. ret=%d", ret); + return ret; + } + + if ((ret = parse_response(uri, &skt, &res)) != ERROR_SUCCESS) { + srs_error("parse http post response failed. ret=%d", ret); + return ret; + } + srs_info("parse http post response success."); + + return ret; +} + +void SrsHttpClient::disconnect() +{ + connected = false; + + srs_close_stfd(stfd); +} + +int SrsHttpClient::connect(SrsHttpUri* uri) +{ + int ret = ERROR_SUCCESS; + + if (connected) { + return ret; + } + + disconnect(); + + std::string ip = srs_dns_resolve(uri->get_host()); + if (ip.empty()) { + ret = ERROR_SYSTEM_IP_INVALID; + srs_error("dns resolve server error, ip empty. ret=%d", ret); + return ret; + } + + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1){ + ret = ERROR_SOCKET_CREATE; + srs_error("create socket error. ret=%d", ret); + return ret; + } + + stfd = st_netfd_open_socket(sock); + if(stfd == NULL){ + ret = ERROR_ST_OPEN_SOCKET; + srs_error("st_netfd_open_socket failed. ret=%d", ret); + return ret; + } + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(uri->get_port()); + addr.sin_addr.s_addr = inet_addr(ip.c_str()); + + if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){ + ret = ERROR_ST_CONNECT; + srs_error("connect to server error. " + "ip=%s, port=%d, ret=%d", ip.c_str(), uri->get_port(), ret); + return ret; + } + srs_info("connect to server success. " + "http url=%s, server=%s, ip=%s, port=%d", + uri->get_url(), uri->get_host(), ip.c_str(), uri->get_port()); + + connected = true; + + return ret; +} + +int SrsHttpClient::parse_response(SrsHttpUri* uri, SrsSocket* skt, std::string* response) +{ + int ret = ERROR_SUCCESS; + + int body_received = 0; + if ((ret = parse_response_header(skt, response, body_received)) != ERROR_SUCCESS) { + srs_error("parse response header failed. ret=%d", ret); + return ret; + } + + if ((ret = parse_response_body(uri, skt, response, body_received)) != ERROR_SUCCESS) { + srs_error("parse response body failed. ret=%d", ret); + return ret; + } + + srs_info("url %s download, body size=%"PRId64, uri->get_url(), http_header.content_length); + + return ret; +} + +int SrsHttpClient::parse_response_header(SrsSocket* skt, std::string* response, int& body_received) +{ + int ret = ERROR_SUCCESS; + + http_parser_settings settings; + + memset(&settings, 0, sizeof(settings)); + settings.on_headers_complete = on_headers_complete; + + http_parser parser; + http_parser_init(&parser, HTTP_RESPONSE); + // callback object ptr. + parser.data = (void*)this; + + // reset response header. + memset(&http_header, 0, sizeof(http_header)); + + // parser header. + char buf[SRS_HTTP_HEADER_BUFFER]; + for (;;) { + ssize_t nread; + if ((ret = skt->read(buf, (size_t)sizeof(buf), &nread)) != ERROR_SUCCESS) { + srs_error("read body from server failed. ret=%d", ret); + return ret; + } + + ssize_t nparsed = http_parser_execute(&parser, &settings, buf, nread); + srs_info("read_size=%d, nparsed=%d", (int)nread, (int)nparsed); + + // check header size. + if (http_header.nread != 0) { + body_received = nread - nparsed; + + srs_info("http header parsed, size=%d, content-length=%"PRId64", body-received=%d", + http_header.nread, http_header.content_length, body_received); + + if(response != NULL && body_received > 0){ + response->append(buf + nparsed, body_received); + } + + return ret; + } + + if (nparsed != nread) { + ret = ERROR_HTTP_PARSE_HEADER; + srs_error("parse response error, parsed(%d)!=read(%d), ret=%d", (int)nparsed, (int)nread, ret); + return ret; + } + } + + return ret; +} + +int SrsHttpClient::parse_response_body(SrsHttpUri* uri, SrsSocket* skt, std::string* response, int body_received) +{ + int ret = ERROR_SUCCESS; + + srs_assert(uri != NULL); + + uint64_t body_left = http_header.content_length - body_received; + + if (body_left <= 0) { + return ret; + } + + if (response != NULL) { + char buf[SRS_HTTP_BODY_BUFFER]; + + return parse_response_body_data( + uri, skt, response, (size_t)body_left, + (const void*)buf, (size_t)SRS_HTTP_BODY_BUFFER + ); + } else { + // if ignore response, use shared fast memory. + static char buf[SRS_HTTP_BODY_BUFFER]; + + return parse_response_body_data( + uri, skt, response, (size_t)body_left, + (const void*)buf, (size_t)SRS_HTTP_BODY_BUFFER + ); + } + + return ret; +} + +int SrsHttpClient::parse_response_body_data(SrsHttpUri* uri, SrsSocket* skt, std::string* response, size_t body_left, const void* buf, size_t size) +{ + int ret = ERROR_SUCCESS; + + srs_assert(uri != NULL); + + while (body_left > 0) { + ssize_t nread; + int size_to_read = srs_min(size, body_left); + if ((ret = skt->read(buf, size_to_read, &nread)) != ERROR_SUCCESS) { + srs_error("read header from server failed. ret=%d", ret); + return ret; + } + + if (response != NULL && nread > 0) { + response->append((char*)buf, nread); + } + + body_left -= nread; + srs_info("read url(%s) content partial %"PRId64"/%"PRId64"", + uri->get_url(), http_header.content_length - body_left, http_header.content_length); + } + + return ret; +} + +int SrsHttpClient::on_headers_complete(http_parser* parser) +{ + SrsHttpClient* obj = (SrsHttpClient*)parser->data; + obj->complete_header(parser); + + // see http_parser.c:1570, return 1 to skip body. + return 1; +} + +void SrsHttpClient::complete_header(http_parser* parser) +{ + // save the parser status when header parse completed. + memcpy(&http_header, parser, sizeof(http_header)); +} + +SrsHttpHooks::SrsHttpHooks() +{ +} + +SrsHttpHooks::~SrsHttpHooks() +{ +} + +int SrsHttpHooks::on_connect(std::string url, int client_id, std::string ip, SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + SrsHttpUri uri; + if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { + srs_error("http uri parse on_connect url failed. " + "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); + return ret; + } + + /** + { + "action": "on_connect", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "pageUrl": "http://www.test.com/live.html" + } + */ + std::stringstream ss; + ss << "{" + // action + << '"' << "action" << '"' << ':' + << '"' << "on_connect" << '"' + << ',' + // client_id + << '"' << "client_id" << '"' << ':' + << std::dec << client_id + << ',' + // ip + << '"' << "ip" << '"' << ':' + << '"' << ip << '"' + << ',' + // vhost + << '"' << "vhost" << '"' << ':' + << '"' << req->vhost << '"' + << ',' + // app + << '"' << "app" << '"' << ':' + << '"' << req->app << '"' + << ',' + // pageUrl + << '"' << "pageUrl" << '"' << ':' + << '"' << req->pageUrl << '"' + //<< ',' + << "}"; + std::string data = ss.str(); + std::string res; + + SrsHttpClient http; + if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { + srs_error("http post on_connect uri failed. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + return ret; + } + + if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { + ret = ERROR_HTTP_DATA_INVLIAD; + srs_error("http hook on_connect validate failed. " + "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); + return ret; + } + + srs_trace("http hook on_connect success. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + + return ret; +} + +void SrsHttpHooks::on_close(std::string url, int client_id, std::string ip, SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + SrsHttpUri uri; + if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { + srs_warn("http uri parse on_close url failed, ignored. " + "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); + return; + } + + /** + { + "action": "on_close", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream" + } + */ + std::stringstream ss; + ss << "{" + // action + << '"' << "action" << '"' << ':' + << '"' << "on_close" << '"' + << ',' + // client_id + << '"' << "client_id" << '"' << ':' + << std::dec << client_id + << ',' + // ip + << '"' << "ip" << '"' << ':' + << '"' << ip << '"' + << ',' + // vhost + << '"' << "vhost" << '"' << ':' + << '"' << req->vhost << '"' + << ',' + // app + << '"' << "app" << '"' << ':' + << '"' << req->app << '"' + //<< ',' + << "}"; + std::string data = ss.str(); + std::string res; + + SrsHttpClient http; + if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { + srs_warn("http post on_close uri failed, ignored. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + return; + } + + if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { + ret = ERROR_HTTP_DATA_INVLIAD; + srs_warn("http hook on_close validate failed, ignored. " + "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); + return; + } + + srs_trace("http hook on_close success. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + + return; +} + +int SrsHttpHooks::on_publish(std::string url, int client_id, std::string ip, SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + SrsHttpUri uri; + if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { + srs_error("http uri parse on_publish url failed. " + "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); + return ret; + } + + /** + { + "action": "on_publish", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream" + } + */ + std::stringstream ss; + ss << "{" + // action + << '"' << "action" << '"' << ':' + << '"' << "on_publish" << '"' + << ',' + // client_id + << '"' << "client_id" << '"' << ':' + << std::dec << client_id + << ',' + // ip + << '"' << "ip" << '"' << ':' + << '"' << ip << '"' + << ',' + // vhost + << '"' << "vhost" << '"' << ':' + << '"' << req->vhost << '"' + << ',' + // app + << '"' << "app" << '"' << ':' + << '"' << req->app << '"' + << ',' + // stream + << '"' << "stream" << '"' << ':' + << '"' << req->stream << '"' + //<< ',' + << "}"; + std::string data = ss.str(); + std::string res; + + SrsHttpClient http; + if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { + srs_error("http post on_publish uri failed. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + return ret; + } + + if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { + ret = ERROR_HTTP_DATA_INVLIAD; + srs_error("http hook on_publish validate failed. " + "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); + return ret; + } + + srs_trace("http hook on_publish success. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + + return ret; +} + +void SrsHttpHooks::on_unpublish(std::string url, int client_id, std::string ip, SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + SrsHttpUri uri; + if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { + srs_warn("http uri parse on_unpublish url failed, ignored. " + "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); + return; + } + + /** + { + "action": "on_unpublish", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream" + } + */ + std::stringstream ss; + ss << "{" + // action + << '"' << "action" << '"' << ':' + << '"' << "on_unpublish" << '"' + << ',' + // client_id + << '"' << "client_id" << '"' << ':' + << std::dec << client_id + << ',' + // ip + << '"' << "ip" << '"' << ':' + << '"' << ip << '"' + << ',' + // vhost + << '"' << "vhost" << '"' << ':' + << '"' << req->vhost << '"' + << ',' + // app + << '"' << "app" << '"' << ':' + << '"' << req->app << '"' + << ',' + // stream + << '"' << "stream" << '"' << ':' + << '"' << req->stream << '"' + //<< ',' + << "}"; + std::string data = ss.str(); + std::string res; + + SrsHttpClient http; + if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { + srs_warn("http post on_unpublish uri failed, ignored. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + return; + } + + if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { + ret = ERROR_HTTP_DATA_INVLIAD; + srs_warn("http hook on_unpublish validate failed, ignored. " + "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); + return; + } + + srs_trace("http hook on_unpublish success. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + + return; +} + +int SrsHttpHooks::on_play(std::string url, int client_id, std::string ip, SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + SrsHttpUri uri; + if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { + srs_error("http uri parse on_play url failed. " + "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); + return ret; + } + + /** + { + "action": "on_play", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream" + } + */ + std::stringstream ss; + ss << "{" + // action + << '"' << "action" << '"' << ':' + << '"' << "on_play" << '"' + << ',' + // client_id + << '"' << "client_id" << '"' << ':' + << std::dec << client_id + << ',' + // ip + << '"' << "ip" << '"' << ':' + << '"' << ip << '"' + << ',' + // vhost + << '"' << "vhost" << '"' << ':' + << '"' << req->vhost << '"' + << ',' + // app + << '"' << "app" << '"' << ':' + << '"' << req->app << '"' + << ',' + // stream + << '"' << "stream" << '"' << ':' + << '"' << req->stream << '"' + //<< ',' + << "}"; + std::string data = ss.str(); + std::string res; + + SrsHttpClient http; + if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { + srs_error("http post on_play uri failed. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + return ret; + } + + if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { + ret = ERROR_HTTP_DATA_INVLIAD; + srs_error("http hook on_play validate failed. " + "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); + return ret; + } + + srs_trace("http hook on_play success. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + + return ret; +} + +void SrsHttpHooks::on_stop(std::string url, int client_id, std::string ip, SrsRequest* req) +{ + int ret = ERROR_SUCCESS; + + SrsHttpUri uri; + if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { + srs_warn("http uri parse on_stop url failed, ignored. " + "client_id=%d, url=%s, ret=%d", client_id, url.c_str(), ret); + return; + } + + /** + { + "action": "on_stop", + "client_id": 1985, + "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", + "stream": "livestream" + } + */ + std::stringstream ss; + ss << "{" + // action + << '"' << "action" << '"' << ':' + << '"' << "on_stop" << '"' + << ',' + // client_id + << '"' << "client_id" << '"' << ':' + << std::dec << client_id + << ',' + // ip + << '"' << "ip" << '"' << ':' + << '"' << ip << '"' + << ',' + // vhost + << '"' << "vhost" << '"' << ':' + << '"' << req->vhost << '"' + << ',' + // app + << '"' << "app" << '"' << ':' + << '"' << req->app << '"' + << ',' + // stream + << '"' << "stream" << '"' << ':' + << '"' << req->stream << '"' + //<< ',' + << "}"; + std::string data = ss.str(); + std::string res; + + SrsHttpClient http; + if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { + srs_warn("http post on_stop uri failed, ignored. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + return; + } + + if (res.empty() || res != SRS_HTTP_RESPONSE_OK) { + ret = ERROR_HTTP_DATA_INVLIAD; + srs_warn("http hook on_stop validate failed, ignored. " + "client_id=%d, res=%s, ret=%d", client_id, res.c_str(), ret); + return; + } + + srs_trace("http hook on_stop success. " + "client_id=%d, url=%s, request=%s, response=%s, ret=%d", + client_id, url.c_str(), data.c_str(), res.c_str(), ret); + + return; +} + +#endif diff --git a/trunk/src/app/srs_app_http_hooks.hpp b/trunk/src/app/srs_app_http_hooks.hpp new file mode 100644 index 000000000..1570515f5 --- /dev/null +++ b/trunk/src/app/srs_app_http_hooks.hpp @@ -0,0 +1,135 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2014 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SRS_APP_HTTP_HOOKS_HPP +#define SRS_APP_HTTP_HOOKS_HPP + +/* +#include +*/ +#include + +#ifdef SRS_HTTP_CALLBACK + +#include + +class SrsHttpUri; +class SrsSocket; +class SrsRequest; + +#include + +/** +* http client to GET/POST/PUT/DELETE uri +*/ +class SrsHttpClient +{ +private: + bool connected; + st_netfd_t stfd; +private: + http_parser http_header; +public: + SrsHttpClient(); + virtual ~SrsHttpClient(); +public: + /** + * to post data to the uri. + * @param req the data post to uri. + * @param res the response data from server. + */ + virtual int post(SrsHttpUri* uri, std::string req, std::string& res); +private: + virtual void disconnect(); + virtual int connect(SrsHttpUri* uri); +private: + virtual int parse_response(SrsHttpUri* uri, SrsSocket* skt, std::string* response); + virtual int parse_response_header(SrsSocket* skt, std::string* response, int& body_received); + virtual int parse_response_body(SrsHttpUri* uri, SrsSocket* skt, std::string* response, int body_received); + virtual int parse_response_body_data(SrsHttpUri* uri, SrsSocket* skt, std::string* response, size_t body_left, const void* buf, size_t size); +private: + static int on_headers_complete(http_parser* parser); + virtual void complete_header(http_parser* parser); +}; + +/** +* the http hooks, http callback api, +* for some event, such as on_connect, call +* a http api(hooks). +*/ +class SrsHttpHooks +{ +public: + SrsHttpHooks(); + virtual ~SrsHttpHooks(); +public: + /** + * on_connect hook, when client connect to srs. + * @param client_id the id of client on server. + * @param url the api server url, to valid the client. + * ignore if empty. + * @return valid failed or connect to the url failed. + */ + virtual int on_connect(std::string url, int client_id, std::string ip, SrsRequest* req); + /** + * on_close hook, when client disconnect to srs, where client is valid by on_connect. + * @param client_id the id of client on server. + * @param url the api server url, to process the event. + * ignore if empty. + */ + virtual void on_close(std::string url, int client_id, std::string ip, SrsRequest* req); + /** + * on_publish hook, when client(encoder) start to publish stream + * @param client_id the id of client on server. + * @param url the api server url, to valid the client. + * ignore if empty. + * @return valid failed or connect to the url failed. + */ + virtual int on_publish(std::string url, int client_id, std::string ip, SrsRequest* req); + /** + * on_unpublish hook, when client(encoder) stop publish stream. + * @param client_id the id of client on server. + * @param url the api server url, to process the event. + * ignore if empty. + */ + virtual void on_unpublish(std::string url, int client_id, std::string ip, SrsRequest* req); + /** + * on_play hook, when client start to play stream. + * @param client_id the id of client on server. + * @param url the api server url, to valid the client. + * ignore if empty. + * @return valid failed or connect to the url failed. + */ + virtual int on_play(std::string url, int client_id, std::string ip, SrsRequest* req); + /** + * on_stop hook, when client stop to play the stream. + * @param client_id the id of client on server. + * @param url the api server url, to process the event. + * ignore if empty. + */ + virtual void on_stop(std::string url, int client_id, std::string ip, SrsRequest* req); +}; + +#endif + +#endif \ No newline at end of file diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index faf3dafdd..147bf2f00 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -41,6 +41,7 @@ using namespace std; #include #include #include +#include SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd) : SrsConnection(srs_server, client_stfd) diff --git a/trunk/src/srs/srs.upp b/trunk/src/srs/srs.upp index e04462986..e96552ac0 100755 --- a/trunk/src/srs/srs.upp +++ b/trunk/src/srs/srs.upp @@ -59,6 +59,8 @@ file ..\app\srs_app_http_api.cpp, ..\app\srs_app_http_conn.hpp, ..\app\srs_app_http_conn.cpp, + ..\app\srs_app_http_hooks.hpp, + ..\app\srs_app_http_hooks.cpp, ..\app\srs_app_log.hpp, ..\app\srs_app_log.cpp, ..\app\srs_app_refer.hpp,