diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index a4e6cc50d..25419f4b2 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -149,10 +149,14 @@ SrsDynamicHttpConn::SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, conn = new SrsHttpConn(this, fd, m, cip, cport); ip = cip; port = cport; + + _srs_config->subscribe(this); } SrsDynamicHttpConn::~SrsDynamicHttpConn() { + _srs_config->unsubscribe(this); + srs_freep(conn); srs_freep(sdk); srs_freep(pprint); @@ -249,6 +253,12 @@ srs_error_t SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecod return err; } +srs_error_t SrsDynamicHttpConn::on_reload_http_stream_crossdomain() +{ + bool v = _srs_config->get_http_stream_crossdomain(); + return conn->set_crossdomain_enabled(v); +} + srs_error_t SrsDynamicHttpConn::on_http_message(ISrsHttpMessage* msg) { return srs_success; @@ -278,6 +288,13 @@ const SrsContextId& SrsDynamicHttpConn::get_id() srs_error_t SrsDynamicHttpConn::start() { + srs_error_t err = srs_success; + + bool v = _srs_config->get_http_stream_crossdomain(); + if ((err = conn->set_crossdomain_enabled(v)) != srs_success) { + return srs_error_wrap(err, "set cors=%d", v); + } + return conn->start(); } diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp index 226b7deca..278464171 100644 --- a/trunk/src/app/srs_app_caster_flv.hpp +++ b/trunk/src/app/srs_app_caster_flv.hpp @@ -73,6 +73,7 @@ public: // The dynamic http connection, never drop the body. class SrsDynamicHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsHttpConnOwner + , virtual public ISrsReloadHandler { private: // The manager object to manage the connection. @@ -93,6 +94,9 @@ public: private: virtual srs_error_t do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec); // Extract APIs from SrsTcpConnection. +// Interface ISrsReloadHandler +public: + virtual srs_error_t on_reload_http_stream_crossdomain(); // Interface ISrsHttpConnOwner. public: virtual srs_error_t on_http_message(ISrsHttpMessage* msg); diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 85dbce4f0..e972dbcdb 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1674,22 +1674,11 @@ srs_error_t SrsGoApiTcmalloc::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess } #endif -SrsHttpApi::SrsHttpApi(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int cport) +SrsHttpApi::SrsHttpApi(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port) { - mux = m; - cors = new SrsHttpCorsMux(); - parser = new SrsHttpParser(); - - skt = new SrsTcpConnection(fd); manager = cm; - ip = cip; - port = cport; - create_time = srsu2ms(srs_get_system_time()); - clk = new SrsWallClock(); - kbps = new SrsKbps(clk); - kbps->set_io(skt, skt); - trd = new SrsSTCoroutine("api", this); - + conn = new SrsHttpConn(this, fd, m, cip, port); + _srs_config->subscribe(this); } @@ -1697,15 +1686,30 @@ SrsHttpApi::~SrsHttpApi() { _srs_config->unsubscribe(this); - trd->interrupt(); - srs_freep(trd); + srs_freep(conn); +} - srs_freep(parser); - srs_freep(cors); +srs_error_t SrsHttpApi::on_http_message(ISrsHttpMessage* req) +{ + srs_error_t err = srs_success; - srs_freep(kbps); - srs_freep(clk); - srs_freep(skt); + // read all rest bytes in request body. + char buf[SRS_HTTP_READ_CACHE_BYTES]; + ISrsHttpResponseReader* br = req->body_reader(); + while (!br->eof()) { + if ((err = br->read(buf, SRS_HTTP_READ_CACHE_BYTES, NULL)) != srs_success) { + return srs_error_wrap(err, "read response"); + } + } + + return err; +} + +void SrsHttpApi::on_conn_done() +{ + // Because we use manager to manage this object, + // not the http connection object, so we must remove it here. + manager->remove(this); } std::string SrsHttpApi::desc() @@ -1715,168 +1719,38 @@ std::string SrsHttpApi::desc() void SrsHttpApi::remark(int64_t* in, int64_t* out) { - kbps->remark(in, out); -} - -srs_error_t SrsHttpApi::do_cycle() -{ - srs_error_t err = srs_success; - - // Create context for API. - _srs_context->set_id(_srs_context->generate_id()); - - srs_trace("API server client, ip=%s:%d", ip.c_str(), port); - - // initialize parser - if ((err = parser->initialize(HTTP_REQUEST, true)) != srs_success) { - return srs_error_wrap(err, "init parser"); - } - - // set the recv timeout, for some clients never disconnect the connection. - // @see https://github.com/ossrs/srs/issues/398 - skt->set_recv_timeout(SRS_HTTP_RECV_TIMEOUT); - - // initialize the cors, which will proxy to mux. - bool crossdomain_enabled = _srs_config->get_http_api_crossdomain(); - if ((err = cors->initialize(mux, crossdomain_enabled)) != srs_success) { - return srs_error_wrap(err, "init cors"); - } - - // process http messages. - while ((err = trd->pull()) == srs_success) { - ISrsHttpMessage* req = NULL; - - // get a http message - if ((err = parser->parse_message(skt, &req)) != srs_success) { - // For HTTP timeout, we think it's ok. - if (srs_error_code(err) == ERROR_SOCKET_TIMEOUT) { - srs_freep(err); - return srs_error_wrap(srs_success, "http api timeout"); - } - return srs_error_wrap(err, "parse message"); - } - - // if SUCCESS, always NOT-NULL. - // always free it in this scope. - srs_assert(req); - SrsAutoFree(ISrsHttpMessage, req); - - // Attach owner connection to message. - SrsHttpMessage* hreq = (SrsHttpMessage*)req; - hreq->set_connection(this); - - // ok, handle http request. - SrsHttpResponseWriter writer(skt); - if ((err = process_request(&writer, req)) != srs_success) { - return srs_error_wrap(err, "process request"); - } - - // read all rest bytes in request body. - char buf[SRS_HTTP_READ_CACHE_BYTES]; - ISrsHttpResponseReader* br = req->body_reader(); - while (!br->eof()) { - if ((err = br->read(buf, SRS_HTTP_READ_CACHE_BYTES, NULL)) != srs_success) { - return srs_error_wrap(err, "read response"); - } - } - - // donot keep alive, disconnect it. - // @see https://github.com/ossrs/srs/issues/399 - if (!req->is_keep_alive()) { - break; - } - } - - return err; -} - -srs_error_t SrsHttpApi::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) -{ - srs_error_t err = srs_success; - - SrsHttpMessage* hm = dynamic_cast(r); - srs_assert(hm); - - srs_trace("HTTP API %s:%d %s %s, content-length=%" PRId64 ", chunked=%d", ip.c_str(), port, r->method_str().c_str(), - r->url().c_str(), r->content_length(), hm->is_chunked()); - - // use cors server mux to serve http request, which will proxy to mux. - if ((err = cors->serve_http(w, r)) != srs_success) { - return srs_error_wrap(err, "mux serve"); - } - - return err; + conn->remark(in, out); } srs_error_t SrsHttpApi::on_reload_http_api_crossdomain() { - srs_error_t err = srs_success; - - bool crossdomain_enabled = _srs_config->get_http_api_crossdomain(); - if ((err = cors->initialize(mux, crossdomain_enabled)) != srs_success) { - return srs_error_wrap(err, "reload"); - } - - return err; + bool v = _srs_config->get_http_api_crossdomain(); + return conn->set_crossdomain_enabled(v); } srs_error_t SrsHttpApi::start() { srs_error_t err = srs_success; - if ((err = skt->initialize()) != srs_success) { - return srs_error_wrap(err, "init socket"); + bool v = _srs_config->get_http_api_crossdomain(); + if ((err = conn->set_crossdomain_enabled(v)) != srs_success) { + return srs_error_wrap(err, "set cors=%d", v); } - if ((err = trd->start()) != srs_success) { - return srs_error_wrap(err, "coroutine"); + if ((err = conn->set_jsonp(true)) != srs_success) { + return srs_error_wrap(err, "set jsonp"); } - return err; -} - -srs_error_t SrsHttpApi::cycle() -{ - srs_error_t err = do_cycle(); - - // Notify manager to remove it. - // Note that we create this object, so we use manager to remove it. - manager->remove(this); - - // success. - if (err == srs_success) { - srs_trace("client finished."); - return err; - } - - // It maybe success with message. - if (srs_error_code(err) == ERROR_SUCCESS) { - srs_trace("client finished%s.", srs_error_summary(err).c_str()); - srs_freep(err); - return err; - } - - // client close peer. - // TODO: FIXME: Only reset the error when client closed it. - if (srs_is_client_gracefully_close(err)) { - srs_warn("client disconnect peer. ret=%d", srs_error_code(err)); - } else if (srs_is_server_gracefully_close(err)) { - srs_warn("server disconnect. ret=%d", srs_error_code(err)); - } else { - srs_error("serve error %s", srs_error_desc(err).c_str()); - } - - srs_freep(err); - return srs_success; + return conn->start(); } string SrsHttpApi::remote_ip() { - return ip; + return conn->remote_ip(); } const SrsContextId& SrsHttpApi::get_id() { - return trd->cid(); + return conn->get_id(); } diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index f4cf06670..ae5d0ac2e 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -26,7 +26,6 @@ #include -class SrsStSocket; class ISrsHttpMessage; class SrsHttpParser; class SrsHttpHandler; @@ -36,6 +35,7 @@ class SrsJsonObject; class SrsSdp; class SrsRequest; class ISrsHttpResponseWriter; +class SrsHttpConn; #include @@ -43,6 +43,7 @@ class ISrsHttpResponseWriter; #include #include #include +#include extern srs_error_t srs_api_response(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string json); extern srs_error_t srs_api_response_code(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, int code); @@ -255,45 +256,26 @@ public: #endif // Handle the HTTP API request. -class SrsHttpApi : virtual public ISrsStartableConneciton, virtual public ISrsReloadHandler - , virtual public ISrsCoroutineHandler +class SrsHttpApi : virtual public ISrsStartableConneciton, virtual public ISrsHttpConnOwner + , virtual public ISrsReloadHandler { private: - SrsHttpParser* parser; - SrsHttpCorsMux* cors; - SrsHttpServeMux* mux; -private: - SrsTcpConnection* skt; - // Each connection start a green thread, - // when thread stop, the connection will be delete by server. - SrsCoroutine* trd; // The manager object to manage the connection. ISrsResourceManager* manager; - // The ip and port of client. - std::string ip; - int port; - // The connection total kbps. - // not only the rtmp or http connection, all type of connection are - // need to statistic the kbps of io. - // The SrsStatistic will use it indirectly to statistic the bytes delta of current connection. - SrsKbps* kbps; - SrsWallClock* clk; - // The create time in milliseconds. - // for current connection to log self create time and calculate the living time. - int64_t create_time; + SrsHttpConn* conn; public: SrsHttpApi(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port); virtual ~SrsHttpApi(); +// Interface ISrsHttpConnOwner. +public: + virtual srs_error_t on_http_message(ISrsHttpMessage* msg); + virtual void on_conn_done(); // Interface ISrsResource. public: virtual std::string desc(); // Interface ISrsKbpsDelta public: virtual void remark(int64_t* in, int64_t* out); -private: - virtual srs_error_t do_cycle(); -private: - virtual srs_error_t process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); // Interface ISrsReloadHandler public: virtual srs_error_t on_reload_http_api_crossdomain(); @@ -308,12 +290,6 @@ public: // when client cycle thread stop, invoke the on_thread_stop(), which will use server // To remove the client by server->remove(this). virtual srs_error_t start(); -// Interface ISrsOneCycleThreadHandler -public: - // The thread cycle function, - // when serve connection completed, terminate the loop which will terminate the thread, - // thread will invoke the on_thread_stop() when it terminated. - virtual srs_error_t cycle(); // Interface ISrsConnection. public: virtual std::string remote_ip(); diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 594dd00d7..5672a1648 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -82,14 +82,10 @@ SrsHttpConn::SrsHttpConn(ISrsHttpConnOwner* handler, srs_netfd_t fd, ISrsHttpSer kbps = new SrsKbps(clk); kbps->set_io(skt, skt); trd = new SrsSTCoroutine("http", this); - - _srs_config->subscribe(this); } SrsHttpConn::~SrsHttpConn() { - _srs_config->unsubscribe(this); - trd->interrupt(); srs_freep(trd); @@ -115,11 +111,6 @@ srs_error_t SrsHttpConn::do_cycle() { srs_error_t err = srs_success; - // initialize parser - if ((err = parser->initialize(HTTP_REQUEST, false)) != srs_success) { - return srs_error_wrap(err, "init parser for %s", ip.c_str()); - } - // set the recv timeout, for some clients never disconnect the connection. // @see https://github.com/ossrs/srs/issues/398 skt->set_recv_timeout(SRS_HTTP_RECV_TIMEOUT); @@ -127,12 +118,6 @@ srs_error_t SrsHttpConn::do_cycle() SrsRequest* last_req = NULL; SrsAutoFree(SrsRequest, last_req); - // initialize the cors, which will proxy to mux. - bool crossdomain_enabled = _srs_config->get_http_stream_crossdomain(); - if ((err = cors->initialize(http_mux, crossdomain_enabled)) != srs_success) { - return srs_error_wrap(err, "init cors"); - } - // process http messages. for (int req_id = 0; (err = trd->pull()) == srs_success; req_id++) { // Try to receive a message from http. @@ -194,6 +179,30 @@ srs_error_t SrsHttpConn::pull() return trd->pull(); } +srs_error_t SrsHttpConn::set_crossdomain_enabled(bool v) +{ + srs_error_t err = srs_success; + + // initialize the cors, which will proxy to mux. + if ((err = cors->initialize(http_mux, v)) != srs_success) { + return srs_error_wrap(err, "init cors"); + } + + return err; +} + +srs_error_t SrsHttpConn::set_jsonp(bool v) +{ + srs_error_t err = srs_success; + + // initialize parser + if ((err = parser->initialize(HTTP_REQUEST, v)) != srs_success) { + return srs_error_wrap(err, "init parser for %s", ip.c_str()); + } + + return err; +} + srs_error_t SrsHttpConn::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { srs_error_t err = srs_success; @@ -215,19 +224,6 @@ srs_error_t SrsHttpConn::on_disconnect(SrsRequest* req) return srs_success; } -srs_error_t SrsHttpConn::on_reload_http_stream_crossdomain() -{ - srs_error_t err = srs_success; - - // initialize the cors, which will proxy to mux. - bool crossdomain_enabled = _srs_config->get_http_stream_crossdomain(); - if ((err = cors->initialize(http_mux, crossdomain_enabled)) != srs_success) { - return srs_error_wrap(err, "init mux"); - } - - return err; -} - srs_error_t SrsHttpConn::set_tcp_nodelay(bool v) { return skt->set_tcp_nodelay(v); @@ -307,10 +303,14 @@ SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_ne manager = cm; conn = new SrsHttpConn(this, fd, m, cip, port); stfd = fd; + + _srs_config->subscribe(this); } SrsResponseOnlyHttpConn::~SrsResponseOnlyHttpConn() { + _srs_config->unsubscribe(this); + srs_freep(conn); } @@ -350,6 +350,12 @@ srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq) return err; } +srs_error_t SrsResponseOnlyHttpConn::on_reload_http_stream_crossdomain() +{ + bool v = _srs_config->get_http_stream_crossdomain(); + return conn->set_crossdomain_enabled(v); +} + srs_error_t SrsResponseOnlyHttpConn::on_http_message(ISrsHttpMessage* msg) { srs_error_t err = srs_success; @@ -407,6 +413,13 @@ const SrsContextId& SrsResponseOnlyHttpConn::get_id() srs_error_t SrsResponseOnlyHttpConn::start() { + srs_error_t err = srs_success; + + bool v = _srs_config->get_http_stream_crossdomain(); + if ((err = conn->set_crossdomain_enabled(v)) != srs_success) { + return srs_error_wrap(err, "set cors=%d", v); + } + return conn->start(); } diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 497e0d817..e487772df 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -70,8 +70,8 @@ public: }; // The http connection which request the static or stream content. -class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsReloadHandler - , virtual public ISrsCoroutineHandler, virtual public ISrsExpire +class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsCoroutineHandler + , virtual public ISrsExpire { protected: SrsHttpParser* parser; @@ -112,15 +112,16 @@ public: virtual ISrsHttpConnOwner* handler(); // Whether the connection coroutine is error or terminated. virtual srs_error_t pull(); + // Whether enable the CORS(cross-domain). + virtual srs_error_t set_crossdomain_enabled(bool v); + // Whether enable the JSONP. + virtual srs_error_t set_jsonp(bool v); private: virtual srs_error_t process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); // When the connection disconnect, call this method. // e.g. log msg of connection and report to other system. // @param request: request which is converted by the last http message. virtual srs_error_t on_disconnect(SrsRequest* req); -// Interface ISrsReloadHandler -public: - virtual srs_error_t on_reload_http_stream_crossdomain(); // Extract APIs from SrsTcpConnection. public: // Set socket option TCP_NODELAY. @@ -144,6 +145,7 @@ public: // Drop body of request, only process the response. class SrsResponseOnlyHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsHttpConnOwner + , virtual public ISrsReloadHandler { private: // The manager object to manage the connection. @@ -160,6 +162,9 @@ public: // @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427 // @remark Should only used in HTTP-FLV streaming connection. virtual srs_error_t pop_message(ISrsHttpMessage** preq); +// Interface ISrsReloadHandler +public: + virtual srs_error_t on_reload_http_stream_crossdomain(); // Interface ISrsHttpConnOwner. public: virtual srs_error_t on_http_message(ISrsHttpMessage* msg);