From 025b70733001511b858c5bbf281bab10c71d4d37 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 4 Mar 2015 18:20:15 +0800 Subject: [PATCH] refine the http request reader. --- trunk/auto/depends.sh | 14 ++ trunk/src/app/srs_app_http.cpp | 279 ++++++++++++++++---------- trunk/src/app/srs_app_http.hpp | 96 +++++++-- trunk/src/app/srs_app_http_api.cpp | 11 +- trunk/src/app/srs_app_http_client.cpp | 11 +- trunk/src/app/srs_app_http_conn.cpp | 11 +- 6 files changed, 289 insertions(+), 133 deletions(-) diff --git a/trunk/auto/depends.sh b/trunk/auto/depends.sh index becd2c311..14800a3a3 100755 --- a/trunk/auto/depends.sh +++ b/trunk/auto/depends.sh @@ -74,6 +74,13 @@ function Ubuntu_prepare() echo "install patch success" fi + unzip --help >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then + echo "install unzip" + require_sudoer "sudo apt-get install -y --force-yes unzip" + sudo apt-get install -y --force-yes unzip; ret=$?; if [[ 0 -ne $ret ]]; then return $ret; fi + echo "install unzip success" + fi + if [ $SRS_FFMPEG_TOOL = YES ]; then autoconf --help >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then echo "install autoconf" @@ -170,6 +177,13 @@ function Centos_prepare() echo "install patch success" fi + unzip --help >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then + echo "install unzip" + require_sudoer "sudo yum install -y unzip" + sudo yum install -y unzip; ret=$?; if [[ 0 -ne $ret ]]; then return $ret; fi + echo "install unzip success" + fi + if [ $SRS_FFMPEG_TOOL = YES ]; then automake --help >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then echo "install automake" diff --git a/trunk/src/app/srs_app_http.cpp b/trunk/src/app/srs_app_http.cpp index 5ce562b43..a11d090c3 100644 --- a/trunk/src/app/srs_app_http.cpp +++ b/trunk/src/app/srs_app_http.cpp @@ -43,6 +43,7 @@ using namespace std; #define SRS_DEFAULT_HTTP_PORT 80 #define SRS_HTTP_HEADER_BUFFER 1024 +#define SRS_HTTP_BODY_BUFFER 1024 // for http parser macros #define SRS_CONSTS_HTTP_OPTIONS HTTP_OPTIONS @@ -227,6 +228,22 @@ ISrsHttpResponseWriter::~ISrsHttpResponseWriter() { } +ISrsHttpResponseReader::ISrsHttpResponseReader() +{ +} + +ISrsHttpResponseReader::~ISrsHttpResponseReader() +{ +} + +ISrsHttpResponseAppender::ISrsHttpResponseAppender() +{ +} + +ISrsHttpResponseAppender::~ISrsHttpResponseAppender() +{ +} + ISrsHttpHandler::ISrsHttpHandler() { entry = NULL; @@ -833,11 +850,63 @@ int SrsHttpResponseWriter::send_header(char* data, int size) return skt->write((void*)buf.c_str(), buf.length(), NULL); } -SrsHttpMessage::SrsHttpMessage() +SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* io) +{ + skt = io; + owner = msg; + cache = new SrsSimpleBuffer(); +} + +SrsHttpResponseReader::~SrsHttpResponseReader() +{ + srs_freep(cache); +} + +int SrsHttpResponseReader::read(int max, std::string& data) +{ + int ret = ERROR_SUCCESS; + + // read from cache first. + if (cache->length() > 0) { + int nb_bytes = srs_min(cache->length(), max); + data.append(cache->bytes(), nb_bytes); + cache->erase(nb_bytes); + + return ret; + } + + // read some from underlayer. + int left = srs_max(SRS_HTTP_BODY_BUFFER, max); + + // read from io. + char* buf = new char[left]; + SrsAutoFree(char, buf); + + ssize_t nread = 0; + if ((ret = skt->read(buf, left, &nread)) != ERROR_SUCCESS) { + return ret; + } + + if (nread) { + data.append(buf, nread); + } + + return ret; +} + +int SrsHttpResponseReader::append(char* data, int size) +{ + int ret = ERROR_SUCCESS; + + cache->append(data, size); + + return ret; +} + +SrsHttpMessage::SrsHttpMessage(SrsStSocket* io) { - _body = new SrsSimpleBuffer(); - _state = SrsHttpParseStateInit; _uri = new SrsHttpUri(); + _body = new SrsHttpResponseReader(this, io); _http_ts_send_buffer = new char[__SRS_HTTP_TS_SEND_BUFFER_SIZE]; } @@ -898,18 +967,6 @@ char* SrsHttpMessage::http_ts_send_buffer() return _http_ts_send_buffer; } -void SrsHttpMessage::reset() -{ - _state = SrsHttpParseStateInit; - _body->erase(_body->length()); - _url = ""; -} - -bool SrsHttpMessage::is_complete() -{ - return _state == SrsHttpParseStateComplete; -} - u_int8_t SrsHttpMessage::method() { return (u_int8_t)_header.method; @@ -993,25 +1050,49 @@ string SrsHttpMessage::path() return _uri->get_path(); } -string SrsHttpMessage::body() +void SrsHttpMessage::set(string url, http_parser* header, string body, vector& headers) { - std::string b; + _url = url; + _header = *header; + _headers = headers; + + if (!body.empty()) { + _body->append((char*)body.data(), (int)body.length()); + } +} + +int SrsHttpMessage::body_read_all(string body) +{ + int ret = ERROR_SUCCESS; - if (_body && _body->length() > 0) { - b.append(_body->bytes(), _body->length()); + // when content length specified, read specified length. + if ((int64_t)_header.content_length > 0) { + int left = (int)(int64_t)_header.content_length; + while (left > 0) { + int nb_read = (int)body.length(); + if ((ret = _body->read(left, body)) != ERROR_SUCCESS) { + return ret; + } + + left -= (int)body.length() - nb_read; + } + return ret; } - return b; -} - -char* SrsHttpMessage::body_raw() -{ - return _body? _body->bytes() : NULL; -} - -int64_t SrsHttpMessage::body_size() -{ - return (int64_t)_body->length(); + // chunked encoding, read util got size=0 chunk. + for (;;) { + int nb_read = (int)body.length(); + if ((ret = _body->read(0, body)) != ERROR_SUCCESS) { + return ret; + } + + // eof. + if (nb_read == (int)body.length()) { + break; + } + } + + return ret; } int64_t SrsHttpMessage::content_length() @@ -1019,26 +1100,6 @@ int64_t SrsHttpMessage::content_length() return _header.content_length; } -void SrsHttpMessage::set_url(string url) -{ - _url = url; -} - -void SrsHttpMessage::set_state(SrsHttpParseState state) -{ - _state = state; -} - -void SrsHttpMessage::set_header(http_parser* header) -{ - memcpy(&_header, header, sizeof(http_parser)); -} - -void SrsHttpMessage::append_body(const char* body, int length) -{ - _body->append(body, length); -} - string SrsHttpMessage::query_get(string key) { std::string v; @@ -1052,33 +1113,28 @@ string SrsHttpMessage::query_get(string key) int SrsHttpMessage::request_header_count() { - return (int)headers.size(); + return (int)_headers.size(); } string SrsHttpMessage::request_header_key_at(int index) { srs_assert(index < request_header_count()); - SrsHttpHeaderField item = headers[index]; + SrsHttpHeaderField item = _headers[index]; return item.first; } string SrsHttpMessage::request_header_value_at(int index) { srs_assert(index < request_header_count()); - SrsHttpHeaderField item = headers[index]; + SrsHttpHeaderField item = _headers[index]; return item.second; } -void SrsHttpMessage::set_request_header(string key, string value) -{ - headers.push_back(std::make_pair(key, value)); -} - string SrsHttpMessage::get_request_header(string name) { std::vector::iterator it; - for (it = headers.begin(); it != headers.end(); ++it) { + for (it = _headers.begin(); it != _headers.end(); ++it) { SrsHttpHeaderField& elem = *it; std::string key = elem.first; std::string value = elem.second; @@ -1092,12 +1148,10 @@ string SrsHttpMessage::get_request_header(string name) SrsHttpParser::SrsHttpParser() { - msg = NULL; } SrsHttpParser::~SrsHttpParser() { - srs_freep(msg); } int SrsHttpParser::initialize(enum http_parser_type type) @@ -1125,26 +1179,31 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg) *ppmsg = NULL; int ret = ERROR_SUCCESS; - - // the msg must be always NULL - srs_assert(msg == NULL); - msg = new SrsHttpMessage(); // reset request data. filed_name = ""; - - // reset response header. - msg->reset(); + field_value = ""; + expect_filed_name = true; + state = SrsHttpParseStateInit; + header = http_parser(); + url = ""; + headers.clear(); + body = ""; // do parse if ((ret = parse_message_imp(skt)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("parse http msg failed. ret=%d", ret); } - srs_freep(msg); return ret; } + // create msg + SrsHttpMessage* msg = new SrsHttpMessage(skt); + + // dumps the header and body read. + msg->set(url, &header, body, headers); + // initalize http msg, parse url. if ((ret = msg->initialize()) != ERROR_SUCCESS) { srs_error("initialize http msg failed. ret=%d", ret); @@ -1154,7 +1213,6 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg) // parse ok, return the msg. *ppmsg = msg; - msg = NULL; return ret; } @@ -1163,13 +1221,14 @@ int SrsHttpParser::parse_message_imp(SrsStSocket* skt) { int ret = ERROR_SUCCESS; - // the msg should never be NULL - srs_assert(msg != NULL); + ssize_t nread = 0; + ssize_t nparsed = 0; + + char* buf = new char[SRS_HTTP_HEADER_BUFFER]; + SrsAutoFree(char, buf); // parser header. - char buf[SRS_HTTP_HEADER_BUFFER]; for (;;) { - ssize_t nread; if ((ret = skt->read(buf, (size_t)sizeof(buf), &nread)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("read body from server failed. ret=%d", ret); @@ -1177,20 +1236,28 @@ int SrsHttpParser::parse_message_imp(SrsStSocket* skt) return ret; } - ssize_t nparsed = http_parser_execute(&parser, &settings, buf, nread); + nparsed = http_parser_execute(&parser, &settings, buf, nread); srs_info("read_size=%d, nparsed=%d", (int)nread, (int)nparsed); - // check header size. - if (msg->is_complete()) { - return ret; + // ok atleast header completed, + // never wait for body completed, for maybe chunked. + if (state == SrsHttpParseStateHeaderComplete || state == SrsHttpParseStateMessageComplete) { + break; } + // when not complete, the parser should consume all bytes. if (nparsed != nread) { ret = ERROR_HTTP_PARSE_HEADER; srs_error("parse response error, parsed(%d)!=read(%d), ret=%d", (int)nparsed, (int)nread, ret); return ret; } } + + // when parse completed, cache the left body. + if (nread && nparsed < nread) { + body.append(buf + nparsed, nread - nparsed); + srs_info("cache %d bytes read body.", nread - nparsed); + } return ret; } @@ -1198,7 +1265,9 @@ int SrsHttpParser::parse_message_imp(SrsStSocket* skt) int SrsHttpParser::on_message_begin(http_parser* parser) { SrsHttpParser* obj = (SrsHttpParser*)parser->data; - obj->msg->set_state(SrsHttpParseStateStart); + srs_assert(obj); + + obj->state = SrsHttpParseStateStart; srs_info("***MESSAGE BEGIN***"); @@ -1208,7 +1277,11 @@ int SrsHttpParser::on_message_begin(http_parser* parser) int SrsHttpParser::on_headers_complete(http_parser* parser) { SrsHttpParser* obj = (SrsHttpParser*)parser->data; - obj->msg->set_header(parser); + srs_assert(obj); + + obj->header = *parser; + // save the parser when header parse completed. + obj->state = SrsHttpParseStateHeaderComplete; srs_info("***HEADERS COMPLETE***"); @@ -1219,8 +1292,10 @@ int SrsHttpParser::on_headers_complete(http_parser* parser) int SrsHttpParser::on_message_complete(http_parser* parser) { SrsHttpParser* obj = (SrsHttpParser*)parser->data; - // save the parser when header parse completed. - obj->msg->set_state(SrsHttpParseStateComplete); + srs_assert(obj); + + // save the parser when body parse completed. + obj->state = SrsHttpParseStateMessageComplete; srs_info("***MESSAGE COMPLETE***\n"); @@ -1230,13 +1305,10 @@ int SrsHttpParser::on_message_complete(http_parser* parser) int SrsHttpParser::on_url(http_parser* parser, const char* at, size_t length) { SrsHttpParser* obj = (SrsHttpParser*)parser->data; + srs_assert(obj); if (length > 0) { - std::string url; - - url.append(at, (int)length); - - obj->msg->set_url(url); + obj->url.append(at, (int)length); } srs_info("Method: %d, Url: %.*s", parser->method, (int)length, at); @@ -1247,44 +1319,47 @@ int SrsHttpParser::on_url(http_parser* parser, const char* at, size_t length) int SrsHttpParser::on_header_field(http_parser* parser, const char* at, size_t length) { SrsHttpParser* obj = (SrsHttpParser*)parser->data; + srs_assert(obj); + + // field value=>name, reap the field. + if (!obj->expect_filed_name) { + obj->headers.push_back(std::make_pair(obj->filed_name, obj->field_value)); + + // reset the field name when value parsed. + obj->filed_name = ""; + obj->field_value = ""; + } + obj->expect_filed_name = true; if (length > 0) { - srs_assert(obj); obj->filed_name.append(at, (int)length); } - srs_info("Header field: %.*s", (int)length, at); + srs_trace("Header field: %.*s", (int)length, at); return 0; } int SrsHttpParser::on_header_value(http_parser* parser, const char* at, size_t length) { SrsHttpParser* obj = (SrsHttpParser*)parser->data; + srs_assert(obj); if (length > 0) { - srs_assert(obj); - srs_assert(obj->msg); - - std::string field_value; - field_value.append(at, (int)length); - - obj->msg->set_request_header(obj->filed_name, field_value); - obj->filed_name = ""; + obj->field_value.append(at, (int)length); } + obj->expect_filed_name = false; - srs_info("Header value: %.*s", (int)length, at); + srs_trace("Header value: %.*s", (int)length, at); return 0; } int SrsHttpParser::on_body(http_parser* parser, const char* at, size_t length) { SrsHttpParser* obj = (SrsHttpParser*)parser->data; + srs_assert(obj); if (length > 0) { - srs_assert(obj); - srs_assert(obj->msg); - - obj->msg->append_body(at, (int)length); + obj->body.append(at, (int)length); } srs_info("Body: %.*s", (int)length, at); diff --git a/trunk/src/app/srs_app_http.hpp b/trunk/src/app/srs_app_http.hpp index c80b89424..eee995a14 100644 --- a/trunk/src/app/srs_app_http.hpp +++ b/trunk/src/app/srs_app_http.hpp @@ -76,7 +76,8 @@ extern int srs_go_http_response_json(ISrsHttpResponseWriter* w, std::string data enum SrsHttpParseState { SrsHttpParseStateInit = 0, SrsHttpParseStateStart, - SrsHttpParseStateComplete + SrsHttpParseStateHeaderComplete, + SrsHttpParseStateMessageComplete }; // A Header represents the key-value pairs in an HTTP header. @@ -154,6 +155,8 @@ public: public: // when chunked mode, // final the request to complete the chunked encoding. + // for no-chunked mode, + // final to send request, for example, content-length is 0. virtual int final_request() = 0; // Header returns the header map that will be sent by WriteHeader. @@ -178,6 +181,39 @@ public: virtual void write_header(int code) = 0; }; +/** +* the reader interface for http response. +*/ +class ISrsHttpResponseReader +{ +public: + ISrsHttpResponseReader(); + virtual ~ISrsHttpResponseReader(); +public: + /** + * read from the response body. + * @param max the max size to read. 0 to ignore. + */ + virtual int read(int max, std::string& data) = 0; +}; + +/** +* for connection response only. +*/ +class ISrsHttpResponseAppender +{ +public: + ISrsHttpResponseAppender(); + virtual ~ISrsHttpResponseAppender(); +public: + /** + * append specified size of bytes data to reader. + * when we read http message from socket, we maybe read header+body, + * so the reader should provides stream cache feature. + */ + virtual int append(char* data, int size) = 0; +}; + // Objects implementing the Handler interface can be // registered to serve a particular path or subtree // in the HTTP server. @@ -366,6 +402,27 @@ public: virtual int send_header(char* data, int size); }; +/** +* response reader use st socket. +*/ +class SrsHttpResponseReader : virtual public ISrsHttpResponseReader + , virtual public ISrsHttpResponseAppender +{ +private: + SrsStSocket* skt; + SrsHttpMessage* owner; + SrsSimpleBuffer* cache; +public: + SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* io); + virtual ~SrsHttpResponseReader(); +public: + virtual int read(int max, std::string& data); + virtual int append(char* data, int size); +}; + +// for http header. +typedef std::pair SrsHttpHeaderField; + // A Request represents an HTTP request received by a server // or to be sent by a client. // @@ -387,15 +444,10 @@ private: */ http_parser _header; /** - * body object, in bytes. + * body object, reader object. * @remark, user can get body in string by get_body(). */ - SrsSimpleBuffer* _body; - /** - * parser state - * @remark, user can use is_complete() to determine the state. - */ - SrsHttpParseState _state; + SrsHttpResponseReader* _body; /** * uri parser */ @@ -405,20 +457,17 @@ private: */ char* _http_ts_send_buffer; // http headers - typedef std::pair SrsHttpHeaderField; - std::vector headers; + std::vector _headers; // the query map std::map _query; public: - SrsHttpMessage(); + SrsHttpMessage(SrsStSocket* io); virtual ~SrsHttpMessage(); public: virtual int initialize(); public: virtual char* http_ts_send_buffer(); - virtual void reset(); public: - virtual bool is_complete(); virtual u_int8_t method(); virtual u_int16_t status_code(); virtual std::string method_str(); @@ -432,14 +481,10 @@ public: virtual std::string host(); virtual std::string path(); public: - virtual std::string body(); - virtual char* body_raw(); - virtual int64_t body_size(); + virtual void set(std::string url, http_parser* header, std::string body, std::vector& headers); +public: + virtual int body_read_all(std::string body); virtual int64_t content_length(); - virtual void set_url(std::string url); - virtual void set_state(SrsHttpParseState state); - virtual void set_header(http_parser* header); - virtual void append_body(const char* body, int length); /** * get the param in query string, * for instance, query is "start=100&end=200", @@ -449,7 +494,6 @@ public: virtual int request_header_count(); virtual std::string request_header_key_at(int index); virtual std::string request_header_value_at(int index); - virtual void set_request_header(std::string key, std::string value); virtual std::string get_request_header(std::string name); }; @@ -462,8 +506,16 @@ class SrsHttpParser private: http_parser_settings settings; http_parser parser; - SrsHttpMessage* msg; +private: + // http parse data, reset before parse message. + bool expect_filed_name; std::string filed_name; + std::string field_value; + SrsHttpParseState state; + http_parser header; + std::string url; + std::vector headers; + std::string body; public: SrsHttpParser(); virtual ~SrsHttpParser(); diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index b8a97c9c7..d1dc4c6b3 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -527,13 +527,20 @@ int SrsHttpApi::do_cycle() return ret; } - // if SUCCESS, always NOT-NULL and completed message. + // if SUCCESS, always NOT-NULL. srs_assert(req); - srs_assert(req->is_complete()); // always free it in this scope. SrsAutoFree(SrsHttpMessage, req); + // TODO: FIXME: use the post body. + std::string res; + + // get response body. + if ((ret = req->body_read_all(res)) != ERROR_SUCCESS) { + return ret; + } + // ok, handle http request. SrsHttpResponseWriter writer(&skt); if ((ret = process_request(&writer, req)) != ERROR_SUCCESS) { diff --git a/trunk/src/app/srs_app_http_client.cpp b/trunk/src/app/srs_app_http_client.cpp index aeaebdc1c..298754386 100644 --- a/trunk/src/app/srs_app_http_client.cpp +++ b/trunk/src/app/srs_app_http_client.cpp @@ -35,6 +35,7 @@ using namespace std; #include #include #include +#include // when error, http client sleep for a while and retry. #define SRS_HTTP_CLIENT_SLEEP_US (int64_t)(3*1000*1000LL) @@ -103,18 +104,18 @@ int SrsHttpClient::post(SrsHttpUri* uri, string req, int& status_code, string& r } srs_assert(msg); - srs_assert(msg->is_complete()); + + // always free it in this scope. + SrsAutoFree(SrsHttpMessage, msg); status_code = (int)msg->status_code(); // get response body. - if (msg->body_size() > 0) { - res = msg->body(); + if ((ret = msg->body_read_all(res)) != ERROR_SUCCESS) { + return ret; } srs_info("parse http post response success."); - srs_freep(msg); - return ret; } diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index be8c8259d..407355f5e 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -1194,13 +1194,20 @@ int SrsHttpConn::do_cycle() return ret; } - // if SUCCESS, always NOT-NULL and completed message. + // if SUCCESS, always NOT-NULL. srs_assert(req); - srs_assert(req->is_complete()); // always free it in this scope. SrsAutoFree(SrsHttpMessage, req); + // TODO: FIXME: use the post body. + std::string res; + + // get response body. + if ((ret = req->body_read_all(res)) != ERROR_SUCCESS) { + return ret; + } + // ok, handle http request. SrsHttpResponseWriter writer(&skt); if ((ret = process_request(&writer, req)) != ERROR_SUCCESS) {