diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index a5bf8f910..17d971698 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +using namespace std; + #include #include #include @@ -36,11 +38,12 @@ IConnectionManager::~IConnectionManager() { } -SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c) +SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c, string cip) { id = 0; manager = cm; stfd = c; + ip = cip; disposed = false; expired = false; create_time = srs_get_system_time_ms(); @@ -112,8 +115,6 @@ int SrsConnection::cycle() _srs_context->generate_id(); id = _srs_context->get_id(); - ip = srs_get_peer_ip(st_netfd_fileno(stfd)); - int oret = ret = do_cycle(); // if socket io error, set to closed. diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 52ed3ba89..e931c5f5e 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -110,7 +110,7 @@ protected: */ int64_t create_time; public: - SrsConnection(IConnectionManager* cm, st_netfd_t c); + SrsConnection(IConnectionManager* cm, st_netfd_t c, std::string cip); virtual ~SrsConnection(); // interface IKbpsDelta public: diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index fb26c1053..fd18aa907 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1309,8 +1309,8 @@ int SrsGoApiError::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) return srs_api_response_code(w, r, 100); } -SrsHttpApi::SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m) - : SrsConnection(cm, fd) +SrsHttpApi::SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, string cip) + : SrsConnection(cm, fd, cip) { mux = m; parser = new SrsHttpParser(); diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index a963e6b48..b8cd63314 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -215,7 +215,7 @@ private: bool crossdomain_required; bool crossdomain_enabled; public: - SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m); + SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, std::string cip); virtual ~SrsHttpApi(); // interface IKbpsDelta public: diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 14ebbb595..5d506f8d6 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -1063,8 +1063,8 @@ int SrsHttpParser::on_body(http_parser* parser, const char* at, size_t length) return 0; } -SrsHttpConn::SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m) - : SrsConnection(cm, fd) +SrsHttpConn::SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, string cip) + : SrsConnection(cm, fd, cip) { parser = new SrsHttpParser(); http_mux = m; @@ -1187,8 +1187,8 @@ int SrsHttpConn::on_disconnect(SrsRequest* req) return ret; } -SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m) - : SrsHttpConn(cm, fd, m) +SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, string cip) + : SrsHttpConn(cm, fd, m, cip) { } diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index f3e8f7b65..f0b0aa93b 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -350,7 +350,7 @@ private: SrsHttpParser* parser; ISrsHttpServeMux* http_mux; public: - SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m); + SrsHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, std::string cip); virtual ~SrsHttpConn(); // interface IKbpsDelta public: @@ -381,7 +381,7 @@ private: class SrsResponseOnlyHttpConn : public SrsHttpConn { public: - SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m); + SrsResponseOnlyHttpConn(IConnectionManager* cm, st_netfd_t fd, ISrsHttpServeMux* m, std::string cip); virtual ~SrsResponseOnlyHttpConn(); public: virtual int on_got_http_message(ISrsHttpMessage* msg); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index e516d2901..92f4dc13c 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -312,11 +312,11 @@ void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout) } #ifdef SRS_AUTO_KAFKA -SrsRtmpConn::SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c) +SrsRtmpConn::SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c, string cip) #else -SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c) +SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c, string cip) #endif - : SrsConnection(svr, c) + : SrsConnection(svr, c, cip) { server = svr; #ifdef SRS_AUTO_KAFKA @@ -373,7 +373,7 @@ int SrsRtmpConn::do_cycle() { int ret = ERROR_SUCCESS; - srs_trace("RTMP client ip=%s", ip.c_str()); + srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), st_netfd_fileno(stfd)); // notify kafka cluster. #ifdef SRS_AUTO_KAFKA diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 73e528104..3dc13187d 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -144,9 +144,9 @@ private: #endif public: #ifdef SRS_AUTO_KAFKA - SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c); + SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c, std::string cip); #else - SrsRtmpConn(SrsServer* svr, st_netfd_t c); + SrsRtmpConn(SrsServer* svr, st_netfd_t c, std::string cip); #endif virtual ~SrsRtmpConn(); public: diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 96a1f08c1..37e5cbdb4 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1241,68 +1241,15 @@ void SrsServer::resample_kbps() srs_update_rtmp_server((int)conns.size(), kbps); } -int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) +int SrsServer::accept_client(SrsListenerType type, st_netfd_t stfd) { int ret = ERROR_SUCCESS; - int fd = st_netfd_fileno(client_stfd); - - // check connection limitation. - int max_connections = _srs_config->get_max_connections(); - if (handler && (ret = handler->on_accept_client(max_connections, (int)conns.size()) != ERROR_SUCCESS)) { - srs_error("handle accept client failed, drop client: " - "clients=%d, max=%d, fd=%d. ret=%d", (int)conns.size(), max_connections, fd, ret); - srs_close_stfd(client_stfd); + SrsConnection* conn = fd2conn(type, stfd); + if (conn == NULL) { + srs_close_stfd(stfd); return ERROR_SUCCESS; } - if ((int)conns.size() >= max_connections) { - srs_error("exceed the max connections, drop client: " - "clients=%d, max=%d, fd=%d", (int)conns.size(), max_connections, fd); - srs_close_stfd(client_stfd); - return ERROR_SUCCESS; - } - - // avoid fd leak when fork. - // @see https://github.com/ossrs/srs/issues/518 - if (true) { - int val; - if ((val = fcntl(fd, F_GETFD, 0)) < 0) { - ret = ERROR_SYSTEM_PID_GET_FILE_INFO; - srs_error("fnctl F_GETFD error! fd=%d. ret=%#x", fd, ret); - srs_close_stfd(client_stfd); - return ret; - } - val |= FD_CLOEXEC; - if (fcntl(fd, F_SETFD, val) < 0) { - ret = ERROR_SYSTEM_PID_SET_FILE_INFO; - srs_error("fcntl F_SETFD error! fd=%d ret=%#x", fd, ret); - srs_close_stfd(client_stfd); - return ret; - } - } - - SrsConnection* conn = NULL; - if (type == SrsListenerRtmpStream) { - conn = new SrsRtmpConn(this, kafka, client_stfd); - } else if (type == SrsListenerHttpApi) { -#ifdef SRS_AUTO_HTTP_API - conn = new SrsHttpApi(this, client_stfd, http_api_mux); -#else - srs_warn("close http client for server not support http-api"); - srs_close_stfd(client_stfd); - return ret; -#endif - } else if (type == SrsListenerHttpStream) { -#ifdef SRS_AUTO_HTTP_SERVER - conn = new SrsResponseOnlyHttpConn(this, client_stfd, http_server); -#else - srs_warn("close http client for server not support http-server"); - srs_close_stfd(client_stfd); - return ret; -#endif - } else { - // TODO: FIXME: handler others - } srs_assert(conn); // directly enqueue, the cycle thread will remove the client. @@ -1314,13 +1261,80 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) if ((ret = conn->start()) != ERROR_SUCCESS) { return ret; } - srs_verbose("conn started success."); - srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret); return ret; } +SrsConnection* SrsServer::fd2conn(SrsListenerType type, st_netfd_t stfd) +{ + int ret = ERROR_SUCCESS; + + int fd = st_netfd_fileno(stfd); + string ip = srs_get_peer_ip(fd); + + // for some keep alive application, for example, the keepalived, + // will send some tcp packet which we cann't got the ip, + // we just ignore it. + if (ip.empty()) { + srs_info("ignore empty ip client, fd=%d.", fd); + return NULL; + } + + // check connection limitation. + int max_connections = _srs_config->get_max_connections(); + if (handler && (ret = handler->on_accept_client(max_connections, (int)conns.size()) != ERROR_SUCCESS)) { + srs_error("handle accept client failed, drop client: clients=%d, max=%d, fd=%d. ret=%d", (int)conns.size(), max_connections, fd, ret); + return NULL; + } + if ((int)conns.size() >= max_connections) { + srs_error("exceed the max connections, drop client: clients=%d, max=%d, fd=%d", (int)conns.size(), max_connections, fd); + return NULL; + } + + // avoid fd leak when fork. + // @see https://github.com/ossrs/srs/issues/518 + if (true) { + int val; + if ((val = fcntl(fd, F_GETFD, 0)) < 0) { + ret = ERROR_SYSTEM_PID_GET_FILE_INFO; + srs_error("fnctl F_GETFD error! fd=%d. ret=%#x", fd, ret); + return NULL; + } + val |= FD_CLOEXEC; + if (fcntl(fd, F_SETFD, val) < 0) { + ret = ERROR_SYSTEM_PID_SET_FILE_INFO; + srs_error("fcntl F_SETFD error! fd=%d ret=%#x", fd, ret); + return NULL; + } + } + + SrsConnection* conn = NULL; + + if (type == SrsListenerRtmpStream) { + conn = new SrsRtmpConn(this, kafka, stfd, ip); + } else if (type == SrsListenerHttpApi) { +#ifdef SRS_AUTO_HTTP_API + conn = new SrsHttpApi(this, stfd, http_api_mux, ip); +#else + srs_warn("close http client for server not support http-api"); + srs_close_stfd(stfd); + return ret; +#endif + } else if (type == SrsListenerHttpStream) { +#ifdef SRS_AUTO_HTTP_SERVER + conn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip); +#else + srs_warn("close http client for server not support http-server"); + return NULL; +#endif + } else { + // TODO: FIXME: handler others + } + + return conn; +} + void SrsServer::remove(SrsConnection* conn) { std::vector::iterator it = std::find(conns.begin(), conns.end(), conn); diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 3bb4ff984..00f450019 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -365,9 +365,11 @@ public: * when listener got a fd, notice server to accept it. * @param type, the client type, used to create concrete connection, * for instance RTMP connection to serve client. - * @param client_stfd, the client fd in st boxed, the underlayer fd. + * @param stfd, the client fd in st boxed, the underlayer fd. */ - virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd); + virtual int accept_client(SrsListenerType type, st_netfd_t stfd); +private: + virtual SrsConnection* fd2conn(SrsListenerType type, st_netfd_t stfd); // IConnectionManager public: /**