From f516636448cb319bcac3ffb35690465e071fc560 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 29 Dec 2015 18:33:02 +0800 Subject: [PATCH] refine code for hooks and http core. --- trunk/research/librtmp/srs_detect_rtmp.c | 2 +- trunk/src/app/srs_app_config.cpp | 8 ++- trunk/src/app/srs_app_edge.cpp | 8 +-- trunk/src/app/srs_app_http_client.cpp | 84 ++++++++++++++++++------ trunk/src/app/srs_app_http_client.hpp | 38 +++++++---- trunk/src/app/srs_app_http_conn.cpp | 15 ++++- trunk/src/app/srs_app_http_conn.hpp | 1 + trunk/src/app/srs_app_http_hooks.cpp | 4 +- trunk/src/kernel/srs_kernel_consts.hpp | 2 + trunk/src/kernel/srs_kernel_error.hpp | 3 + trunk/src/kernel/srs_kernel_flv.cpp | 14 ++++ trunk/src/kernel/srs_kernel_flv.hpp | 8 +++ 12 files changed, 144 insertions(+), 43 deletions(-) mode change 100755 => 100644 trunk/src/kernel/srs_kernel_error.hpp diff --git a/trunk/research/librtmp/srs_detect_rtmp.c b/trunk/research/librtmp/srs_detect_rtmp.c index c5879f7b8..e5414eb22 100644 --- a/trunk/research/librtmp/srs_detect_rtmp.c +++ b/trunk/research/librtmp/srs_detect_rtmp.c @@ -147,7 +147,7 @@ int main(int argc, char** argv) goto rtmp_destroy; } - if ((timestamp - basetime) > duration * 1000) { + if (timestamp > basetime && (timestamp - basetime) > duration * 1000) { srs_human_trace("duration exceed, terminate."); goto rtmp_destroy; } diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 6ef413b8f..02eaf7aad 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4522,19 +4522,21 @@ bool SrsConfig::get_mix_correct(string vhost) double SrsConfig::get_queue_length(string vhost) { + static double DEFAULT = SRS_PERF_PLAY_QUEUE; + SrsConfDirective* conf = get_vhost(vhost); if (!conf) { - return SRS_PERF_PLAY_QUEUE; + return DEFAULT; } conf = conf->get("play"); if (!conf || conf->arg0().empty()) { - return SRS_PERF_GOP_CACHE; + return DEFAULT; } conf = conf->get("queue_length"); if (!conf || conf->arg0().empty()) { - return SRS_PERF_PLAY_QUEUE; + return DEFAULT; } return ::atoi(conf->arg0().c_str()); diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 77860dfb1..b58a4b869 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -48,16 +48,16 @@ using namespace std; #include // when error, edge ingester sleep for a while and retry. -#define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL) +#define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL) // when edge timeout, retry next. -#define SRS_EDGE_INGESTER_TIMEOUT_US (int64_t)(3*1000*1000LL) +#define SRS_EDGE_INGESTER_TIMEOUT_US (int64_t)(5*1000*1000LL) // when error, edge ingester sleep for a while and retry. -#define SRS_EDGE_FORWARDER_SLEEP_US (int64_t)(1*1000*1000LL) +#define SRS_EDGE_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL) // when edge timeout, retry next. -#define SRS_EDGE_FORWARDER_TIMEOUT_US (int64_t)(3*1000*1000LL) +#define SRS_EDGE_FORWARDER_TIMEOUT_US (int64_t)(5*1000*1000LL) // when edge error, wait for quit #define SRS_EDGE_FORWARDER_ERROR_US (int64_t)(50*1000LL) diff --git a/trunk/src/app/srs_app_http_client.cpp b/trunk/src/app/srs_app_http_client.cpp index 3f95a648b..3ff352361 100644 --- a/trunk/src/app/srs_app_http_client.cpp +++ b/trunk/src/app/srs_app_http_client.cpp @@ -36,10 +36,12 @@ using namespace std; #include #include #include +#include SrsHttpClient::SrsHttpClient() { transport = new SrsTcpClient(); + kbps = new SrsKbps(); parser = NULL; timeout_us = 0; port = 0; @@ -49,6 +51,7 @@ SrsHttpClient::~SrsHttpClient() { disconnect(); + srs_freep(kbps); srs_freep(transport); srs_freep(parser); } @@ -74,15 +77,37 @@ int SrsHttpClient::initialize(string h, int p, int64_t t_us) port = p; timeout_us = t_us; + // ep used for host in header. + string ep = host; + if (port > 0 && port != SRS_CONSTS_HTTP_DEFAULT_PORT) { + ep += ":" + srs_int2str(port); + } + + // set default value for headers. + headers["Host"] = ep; + headers["Connection"] = "Keep-Alive"; + headers["User-Agent"] = RTMP_SIG_SRS_SERVER; + headers["Content-Type"] = "application/json"; + return ret; } +SrsHttpClient* SrsHttpClient::set_header(string k, string v) +{ + headers[k] = v; + + return this; +} + int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg) { *ppmsg = NULL; int ret = ERROR_SUCCESS; + // always set the content length. + headers["Content-Length"] = srs_int2str(req.length()); + if ((ret = connect()) != ERROR_SUCCESS) { srs_warn("http connect server failed. ret=%d", ret); return ret; @@ -91,15 +116,13 @@ int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg) // send POST request to uri // POST %s HTTP/1.1\r\nHost: %s\r\nContent-Length: %d\r\n\r\n%s std::stringstream ss; - ss << "POST " << path << " " - << "HTTP/1.1" << SRS_HTTP_CRLF - << "Host: " << host << SRS_HTTP_CRLF - << "Connection: Keep-Alive" << SRS_HTTP_CRLF - << "Content-Length: " << std::dec << req.length() << SRS_HTTP_CRLF - << "User-Agent: " << RTMP_SIG_SRS_NAME << RTMP_SIG_SRS_VERSION << SRS_HTTP_CRLF - << "Content-Type: application/json" << SRS_HTTP_CRLF - << SRS_HTTP_CRLF - << req; + ss << "POST " << path << " " << "HTTP/1.1" << SRS_HTTP_CRLF; + for (map::iterator it = headers.begin(); it != headers.end(); ++it) { + string key = it->first; + string value = it->second; + ss << key << ": " << value << SRS_HTTP_CRLF; + } + ss << SRS_HTTP_CRLF << req; std::string data = ss.str(); if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) { @@ -123,11 +146,14 @@ int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg) return ret; } -int SrsHttpClient::get(string path, std::string req, ISrsHttpMessage** ppmsg) +int SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg) { *ppmsg = NULL; int ret = ERROR_SUCCESS; + + // always set the content length. + headers["Content-Length"] = srs_int2str(req.length()); if ((ret = connect()) != ERROR_SUCCESS) { srs_warn("http connect server failed. ret=%d", ret); @@ -137,15 +163,13 @@ int SrsHttpClient::get(string path, std::string req, ISrsHttpMessage** ppmsg) // send POST request to uri // GET %s HTTP/1.1\r\nHost: %s\r\nContent-Length: %d\r\n\r\n%s std::stringstream ss; - ss << "GET " << path << " " - << "HTTP/1.1" << SRS_HTTP_CRLF - << "Host: " << host << SRS_HTTP_CRLF - << "Connection: Keep-Alive" << SRS_HTTP_CRLF - << "Content-Length: " << std::dec << req.length() << SRS_HTTP_CRLF - << "User-Agent: " << RTMP_SIG_SRS_NAME << RTMP_SIG_SRS_VERSION << SRS_HTTP_CRLF - << "Content-Type: application/json" << SRS_HTTP_CRLF - << SRS_HTTP_CRLF - << req; + ss << "GET " << path << " " << "HTTP/1.1" << SRS_HTTP_CRLF; + for (map::iterator it = headers.begin(); it != headers.end(); ++it) { + string key = it->first; + string value = it->second; + ss << key << ": " << value << SRS_HTTP_CRLF; + } + ss << SRS_HTTP_CRLF << req; std::string data = ss.str(); if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) { @@ -169,8 +193,28 @@ int SrsHttpClient::get(string path, std::string req, ISrsHttpMessage** ppmsg) return ret; } +void SrsHttpClient::set_recv_timeout(int64_t timeout) +{ + transport->set_recv_timeout(timeout); +} + +void SrsHttpClient::kbps_sample(const char* label, int64_t age) +{ + kbps->sample(); + + int sr = kbps->get_send_kbps(); + int sr30s = kbps->get_send_kbps_30s(); + int sr5m = kbps->get_send_kbps_5m(); + int rr = kbps->get_recv_kbps(); + int rr30s = kbps->get_recv_kbps_30s(); + int rr5m = kbps->get_recv_kbps_5m(); + + srs_trace("<- %s time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", label, age, sr, sr30s, sr5m, rr, rr30s, rr5m); +} + void SrsHttpClient::disconnect() { + kbps->set_io(NULL, NULL); transport->close(); } @@ -196,6 +240,8 @@ int SrsHttpClient::connect() transport->set_recv_timeout(timeout_us); transport->set_send_timeout(timeout_us); + kbps->set_io(transport, transport); + return ret; } diff --git a/trunk/src/app/srs_app_http_client.hpp b/trunk/src/app/srs_app_http_client.hpp index 6576cbbf9..bb8b63001 100644 --- a/trunk/src/app/srs_app_http_client.hpp +++ b/trunk/src/app/srs_app_http_client.hpp @@ -30,6 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include #ifdef SRS_AUTO_HTTP_CORE @@ -39,6 +40,7 @@ class SrsHttpUri; class SrsHttpParser; class ISrsHttpMessage; class SrsStSocket; +class SrsKbps; // the default timeout for http client. #define SRS_HTTP_CLIENT_TIMEOUT_US (int64_t)(30*1000*1000LL) @@ -51,6 +53,8 @@ class SrsHttpClient private: SrsTcpClient* transport; SrsHttpParser* parser; + std::map headers; + SrsKbps* kbps; private: int64_t timeout_us; // host name or ip. @@ -61,24 +65,34 @@ public: virtual ~SrsHttpClient(); public: /** - * initialize the client, connect to host and port. - */ + * initialize the client, connect to host and port. + * @remark we will set default values in headers, which can be override by set_header. + */ virtual int initialize(std::string h, int p, int64_t t_us = SRS_HTTP_CLIENT_TIMEOUT_US); + /** + * set the header[k]=v and return the client itself. + */ + virtual SrsHttpClient* set_header(std::string k, std::string v); public: /** - * to post data to the uri. - * @param the path to request on. - * @param req the data post to uri. empty string to ignore. - * @param ppmsg output the http message to read the response. - */ + * to post data to the uri. + * @param the path to request on. + * @param req the data post to uri. empty string to ignore. + * @param ppmsg output the http message to read the response. + * @remark user must free the ppmsg if not NULL. + */ virtual int post(std::string path, std::string req, ISrsHttpMessage** ppmsg); /** - * to get data from the uri. - * @param the path to request on. - * @param req the data post to uri. empty string to ignore. - * @param ppmsg output the http message to read the response. - */ + * to get data from the uri. + * @param the path to request on. + * @param req the data post to uri. empty string to ignore. + * @param ppmsg output the http message to read the response. + * @remark user must free the ppmsg if not NULL. + */ virtual int get(std::string path, std::string req, ISrsHttpMessage** ppmsg); +public: + virtual void set_recv_timeout(int64_t timeout); + virtual void kbps_sample(const char* label, int64_t age); private: virtual void disconnect(); virtual int connect(); diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 549b29447..c659b80cc 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -344,6 +344,12 @@ int SrsHttpResponseReader::read(char* data, int nb_data, int* nb_read) return ret; } + // for some server, content-length is -1, while not chunked, directly read + // everything as body. + if (owner->content_length() == -1 && !owner->is_chunked()) { + return read_specified(data, nb_data, nb_read); + } + // chunked encoding. if (owner->is_chunked()) { return read_chunked(data, nb_data, nb_read); @@ -483,8 +489,8 @@ int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read) // increase the total read to determine whether EOF. nb_total_read += nb_bytes; - // for not chunked - if (!owner->is_chunked()) { + // for not chunked and specified content length. + if (!owner->is_chunked() && owner->content_length() != -1) { // when read completed, eof. if (nb_total_read >= (int)owner->content_length()) { is_eof = true; @@ -1207,6 +1213,11 @@ int SrsResponseOnlyHttpConn::on_got_http_message(ISrsHttpMessage* msg) ISrsHttpResponseReader* br = msg->body_reader(); + // when not specified the content length, ignore. + if (msg->content_length() == -1) { + return ret; + } + // drop all request body. while (!br->eof()) { char body[4096]; diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index f0b0aa93b..30cfc3fe8 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -327,6 +327,7 @@ public: * that is, the *ppmsg always NOT-NULL when return success. * or error and *ppmsg must be NULL. * @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete(). + * @remark user must free the ppmsg if not NULL. */ virtual int parse_message(ISrsProtocolReaderWriter* io, SrsConnection* conn, ISrsHttpMessage** ppmsg); private: diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index b867cd1ad..6fd797be1 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -334,7 +334,7 @@ int SrsHttpHooks::on_hls(int cid, string url, SrsRequest* req, string file, stri obj->set("duration", SrsJsonAny::number(duration)); obj->set("cwd", SrsJsonAny::str(cwd.c_str())); obj->set("file", SrsJsonAny::str(file.c_str())); - obj->set("url", SrsJsonAny::str(url.c_str())); + obj->set("url", SrsJsonAny::str(ts_url.c_str())); obj->set("m3u8", SrsJsonAny::str(m3u8.c_str())); obj->set("m3u8_url", SrsJsonAny::str(m3u8_url.c_str())); obj->set("seq_no", SrsJsonAny::integer(sn)); @@ -453,7 +453,7 @@ int SrsHttpHooks::do_post(SrsHttpClient* hc, std::string url, std::string req, i // ensure the http status is ok. // https://github.com/ossrs/srs/issues/158 - if (code != SRS_CONSTS_HTTP_OK) { + if (code != SRS_CONSTS_HTTP_OK && code != SRS_CONSTS_HTTP_Created) { ret = ERROR_HTTP_STATUS_INVALID; srs_error("invalid response status=%d. ret=%d", code, ret); return ret; diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp index f10db6c7d..0caf29d5e 100644 --- a/trunk/src/kernel/srs_kernel_consts.hpp +++ b/trunk/src/kernel/srs_kernel_consts.hpp @@ -213,6 +213,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. /////////////////////////////////////////////////////////// // HTTP consts values /////////////////////////////////////////////////////////// +// the default http port. +#define SRS_CONSTS_HTTP_DEFAULT_PORT 80 // linux path seprator #define SRS_CONSTS_HTTP_PATH_SEP '/' // query string seprator diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp old mode 100755 new mode 100644 index 6fd3c2818..1fcce968e --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -102,6 +102,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_SYSTEM_CONFIG_RAW_DISABLED 1061 #define ERROR_SYSTEM_CONFIG_RAW_NOT_ALLOWED 1062 #define ERROR_SYSTEM_CONFIG_RAW_PARAMS 1063 +#define ERROR_SYSTEM_FILE_NOT_EXISTS 1064 /////////////////////////////////////////////////////// // RTMP protocol error. @@ -234,6 +235,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_RESPONSE_CODE 3064 #define ERROR_RESPONSE_DATA 3065 #define ERROR_REQUEST_DATA 3066 +#define ERROR_EDGE_PORT_INVALID 3067 /////////////////////////////////////////////////////// // HTTP/StreamCaster/KAFKA protocol error. @@ -276,6 +278,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_KAFKA_CODEC_METADATA 4035 #define ERROR_KAFKA_CODEC_MESSAGE 4036 #define ERROR_KAFKA_CODEC_PRODUCER 4037 +#define ERROR_HTTP_302_INVALID 4038 /////////////////////////////////////////////////////// // HTTP API error. diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp index 787abee98..892d90a22 100644 --- a/trunk/src/kernel/srs_kernel_flv.cpp +++ b/trunk/src/kernel/srs_kernel_flv.cpp @@ -178,6 +178,20 @@ void SrsCommonMessage::create_payload(int size) #endif } +int SrsCommonMessage::create(SrsMessageHeader* pheader, char* body, int size) +{ + int ret = ERROR_SUCCESS; + + // drop previous payload. + srs_freepa(payload); + + this->header = *pheader; + this->payload = body; + this->size = size; + + return ret; +} + SrsSharedPtrMessage::SrsSharedPtrPayload::SrsSharedPtrPayload() { payload = NULL; diff --git a/trunk/src/kernel/srs_kernel_flv.hpp b/trunk/src/kernel/srs_kernel_flv.hpp index e62af7cb2..f7cafbb83 100644 --- a/trunk/src/kernel/srs_kernel_flv.hpp +++ b/trunk/src/kernel/srs_kernel_flv.hpp @@ -289,6 +289,14 @@ public: * alloc the payload to specified size of bytes. */ virtual void create_payload(int size); +public: + /** + * create common message, + * from the header and body. + * @remark user should never free the body. + * @param pheader, the header to copy to the message. NULL to ignore. + */ + virtual int create(SrsMessageHeader* pheader, char* body, int size); }; /**