diff --git a/trunk/src/app/srs_app_http.cpp b/trunk/src/app/srs_app_http.cpp index bb858032e..2be383c0d 100644 --- a/trunk/src/app/srs_app_http.cpp +++ b/trunk/src/app/srs_app_http.cpp @@ -53,6 +53,9 @@ using namespace std; #define SRS_CONSTS_HTTP_PUT HTTP_PUT #define SRS_CONSTS_HTTP_DELETE HTTP_DELETE +// for ead all of http body, read each time. +#define SRS_HTTP_READ_CACHE_BYTES 4096 + #define SRS_HTTP_DEFAULT_PAGE "index.html" int srs_go_http_response_json(ISrsHttpResponseWriter* w, string data) @@ -889,7 +892,8 @@ SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* i skt = io; owner = msg; is_eof = false; - nb_read = 0; + nb_total_read = 0; + nb_left_chunk = 0; buffer = NULL; } @@ -901,6 +905,8 @@ int SrsHttpResponseReader::initialize(SrsFastBuffer* body) { int ret = ERROR_SUCCESS; + nb_left_chunk = 0; + nb_total_read = 0; buffer = body; return ret; @@ -911,7 +917,7 @@ bool SrsHttpResponseReader::eof() return is_eof; } -int SrsHttpResponseReader::read(std::string& data) +int SrsHttpResponseReader::read(char* data, int nb_data, int* nb_read) { int ret = ERROR_SUCCESS; @@ -923,95 +929,115 @@ int SrsHttpResponseReader::read(std::string& data) // chunked encoding. if (owner->is_chunked()) { - return read_chunked(data); + return read_chunked(data, nb_data, nb_read); } // read by specified content-length - int max = (int)owner->content_length() - (int)nb_read; + int max = (int)owner->content_length() - (int)nb_total_read; if (max <= 0) { is_eof = true; return ret; } - return read_specified(max, data); + + // change the max to read. + nb_data = srs_min(nb_data, max); + return read_specified(data, nb_data, nb_read); } -int SrsHttpResponseReader::read_chunked(std::string& data) +int SrsHttpResponseReader::read_chunked(char* data, int nb_data, int* nb_read) { int ret = ERROR_SUCCESS; + // when no bytes left in chunk, // parse the chunk length first. - char* at = NULL; - int length = 0; - while (!at) { - // find the CRLF of chunk header end. - char* start = buffer->bytes(); - char* end = start + buffer->size(); - for (char* p = start; p < end - 1; p++) { - if (p[0] == SRS_HTTP_CR && p[1] == SRS_HTTP_LF) { - // invalid chunk, ignore. - if (p == start) { - ret = ERROR_HTTP_INVALID_CHUNK_HEADER; - srs_error("chunk header start with CRLF. ret=%d", ret); - return ret; + if (nb_left_chunk <= 0) { + char* at = NULL; + int length = 0; + while (!at) { + // find the CRLF of chunk header end. + char* start = buffer->bytes(); + char* end = start + buffer->size(); + for (char* p = start; p < end - 1; p++) { + if (p[0] == SRS_HTTP_CR && p[1] == SRS_HTTP_LF) { + // invalid chunk, ignore. + if (p == start) { + ret = ERROR_HTTP_INVALID_CHUNK_HEADER; + srs_error("chunk header start with CRLF. ret=%d", ret); + return ret; + } + length = (int)(p - start + 2); + at = buffer->read_slice(length); + break; } - length = (int)(p - start + 2); - at = buffer->read_slice(length); + } + + // got at, ok. + if (at) { break; } - } - - // got at, ok. - if (at) { - break; - } - - // when empty, only grow 1bytes, but the buffer will cache more. - if ((ret = buffer->grow(skt, buffer->size() + 1)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("read body from server failed. ret=%d", ret); + + // when empty, only grow 1bytes, but the buffer will cache more. + if ((ret = buffer->grow(skt, buffer->size() + 1)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("read body from server failed. ret=%d", ret); + } + return ret; } + } + srs_assert(length >= 3); + + // it's ok to set the pos and pos+1 to NULL. + at[length - 1] = 0; + at[length - 2] = 0; + + // size is the bytes size, excludes the chunk header and end CRLF. + int ilength = (int)::strtol(at, NULL, 16); + if (ilength < 0) { + ret = ERROR_HTTP_INVALID_CHUNK_HEADER; + srs_error("chunk header negative, length=%d. ret=%d", ilength, ret); return ret; } + + // all bytes in chunk is left now. + nb_left_chunk = ilength; } - srs_assert(length >= 3); - // it's ok to set the pos and pos+1 to NULL. - at[length - 1] = 0; - at[length - 2] = 0; + // left bytes in chunk, read some. + srs_assert(nb_left_chunk); - // size is the bytes size, excludes the chunk header and end CRLF. - int ilength = (int)::strtol(at, NULL, 16); - if (ilength < 0) { - ret = ERROR_HTTP_INVALID_CHUNK_HEADER; - srs_error("chunk header negative, length=%d. ret=%d", ilength, ret); + int nb_bytes = srs_min(nb_left_chunk, nb_data); + ret = read_specified(data, nb_bytes, &nb_bytes); + + // the nb_bytes used for output already read size of bytes. + if (nb_read) { + *nb_read = nb_bytes; + } + nb_left_chunk -= nb_bytes; + + // error or still left bytes in chunk, ignore and read in future. + if (nb_left_chunk > 0 || (ret != ERROR_SUCCESS)) { return ret; } - - // when empty, only grow 1bytes, but the buffer will cache more. - if ((ret = buffer->grow(skt, ilength + 2)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("read body from server failed. ret=%d", ret); - } - return ret; - } - srs_info("http: read %d chunk", ilength); + srs_info("http: read %d bytes of chunk", nb_bytes); // read payload when length specifies some payload. - if (ilength <= 0) { + if (nb_left_chunk <= 0) { is_eof = true; - } else { - srs_assert(ilength); - data.append(buffer->read_slice(ilength), ilength); - nb_read += ilength; } // the CRLF of chunk payload end. + if ((ret = buffer->grow(skt, 2)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("read EOF of chunk from server failed. ret=%d", ret); + } + return ret; + } buffer->read_slice(2); return ret; } -int SrsHttpResponseReader::read_specified(int max, std::string& data) +int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read) { int ret = ERROR_SUCCESS; @@ -1025,14 +1051,21 @@ int SrsHttpResponseReader::read_specified(int max, std::string& data) } } - int nb_bytes = srs_min(max, buffer->size()); + int nb_bytes = srs_min(nb_data, buffer->size()); + // read data to buffer. srs_assert(nb_bytes); - data.append(buffer->read_slice(nb_bytes), nb_bytes); - nb_read += nb_bytes; + char* p = buffer->read_slice(nb_bytes); + memcpy(data, p, nb_bytes); + if (nb_read) { + *nb_read = nb_bytes; + } + + // increase the total read to determine whether EOF. + nb_total_read += nb_bytes; // when read completed, eof. - if (nb_read >= (int)owner->content_length()) { + if (nb_total_read >= (int)owner->content_length()) { is_eof = true; } @@ -1223,11 +1256,19 @@ int SrsHttpMessage::body_read_all(string& body) { int ret = ERROR_SUCCESS; + // cache to read. + char* buf = new char[SRS_HTTP_READ_CACHE_BYTES]; + SrsAutoFree(char, buf); + // whatever, read util EOF. while (!_body->eof()) { - if ((ret = _body->read(body)) != ERROR_SUCCESS) { + int nb_read = 0; + if ((ret = _body->read(buf, SRS_HTTP_READ_CACHE_BYTES, &nb_read)) != ERROR_SUCCESS) { return ret; } + + srs_assert (nb_read > 0); + body.append(buf, nb_read); } return ret; diff --git a/trunk/src/app/srs_app_http.hpp b/trunk/src/app/srs_app_http.hpp index bc524666e..76cf602da 100644 --- a/trunk/src/app/srs_app_http.hpp +++ b/trunk/src/app/srs_app_http.hpp @@ -196,10 +196,13 @@ public: */ virtual bool eof() = 0; /** - * read from the response body. - * @remark when eof(), return error. - */ - virtual int read(std::string& data) = 0; + * read from the response body. + * @param data, the buffer to read data buffer to. + * @param nb_data, the max size of data buffer. + * @param nb_read, the actual read size of bytes. NULL to ignore. + * @remark when eof(), return error. + */ + virtual int read(char* data, int nb_data, int* nb_read) = 0; }; // Objects implementing the Handler interface can be @@ -431,7 +434,10 @@ private: SrsHttpMessage* owner; SrsFastBuffer* buffer; bool is_eof; - int64_t nb_read; + // the left bytes in chunk. + int nb_left_chunk; + // already read total bytes. + int64_t nb_total_read; public: SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* io); virtual ~SrsHttpResponseReader(); @@ -443,10 +449,10 @@ public: // interface ISrsHttpResponseReader public: virtual bool eof(); - virtual int read(std::string& data); + virtual int read(char* data, int nb_data, int* nb_read); private: - virtual int read_chunked(std::string& data); - virtual int read_specified(int max, std::string& data); + virtual int read_chunked(char* data, int nb_data, int* nb_read); + virtual int read_specified(char* data, int nb_data, int* nb_read); }; // for http header. diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index 7824a6978..c262ba07f 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -363,17 +363,16 @@ int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts } SrsAutoFree(SrsHttpMessage, msg); + int nb_read = 0; ISrsHttpResponseReader* br = msg->body_reader(); - while (!br->eof()) { - std::string data; - // for notify, only read some data. - ret = br->read(data); - break; + if (!br->eof()) { + char buf[64]; // only read a little of bytes of ts. + ret = br->read(buf, 64, &nb_read); } int spenttime = (int)(srs_update_system_time_ms() - starttime); - srs_trace("http hook on_hls_notify success. client_id=%d, url=%s, code=%d, spent=%dms, ret=%d", - client_id, url.c_str(), msg->status_code(), spenttime, ret); + srs_trace("http hook on_hls_notify success. client_id=%d, url=%s, code=%d, spent=%dms, read=%dB, ret=%d", + client_id, url.c_str(), msg->status_code(), spenttime, nb_read, ret); // ignore any error for on_hls_notify. ret = ERROR_SUCCESS;