diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 7a1e014fa..c1467eeee 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -34,7 +34,7 @@ SrsConnection::SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd) { server = srs_server; stfd = client_stfd; - connection_id = 0; + // the client thread should reap itself, // so we never use joinable. // TODO: FIXME: maybe other thread need to stop it. @@ -57,7 +57,6 @@ int SrsConnection::cycle() int ret = ERROR_SUCCESS; _srs_context->generate_id(); - connection_id = _srs_context->get_id(); ip = srs_get_peer_ip(st_netfd_fileno(stfd)); ret = do_cycle(); diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index c0eeaf2e4..71376a7aa 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -37,26 +37,78 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include class SrsServer; + +/** +* the basic connection of SRS, +* all connections accept from listener must extends from this base class, +* server will add the connection to manager, and delete it when remove. +*/ class SrsConnection : public virtual ISrsThreadHandler, public virtual IKbpsDelta { private: + /** + * each connection start a green thread, + * when thread stop, the connection will be delete by server. + */ SrsThread* pthread; protected: + /** + * the server object to manage the connection. + */ SrsServer* server; + /** + * the underlayer st fd handler. + */ st_netfd_t stfd; - int connection_id; + /** + * the ip of client. + */ std::string ip; public: SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd); virtual ~SrsConnection(); public: + /** + * start the client green thread. + * when server get a client from listener, + * 1. server will create an concrete connection(for instance, RTMP connection), + * 2. then add connection to its connection manager, + * 3. start the client thread by invoke this start() + * when client cycle thread stop, invoke the on_thread_stop(), which will use server + * to remove the client by server->remove(this). + */ virtual int start(); + /** + * the thread cycle function, + * when serve connection completed, terminate the loop which will terminate the thread, + * thread will invoke the on_thread_stop() when it terminated. + */ virtual int cycle(); + /** + * when the thread cycle finished, thread will invoke the on_thread_stop(), + * which will remove self from server, server will remove the connection from manager + * then delete the connection. + */ virtual void on_thread_stop(); public: + /** + * when server to get the kbps of connection, + * it cannot wait the connection terminated then get the kbps, + * it must sample the kbps every some interval, for instance, 9s to sample all connections kbps, + * all connections will extends from IKbpsDelta which provides the bytes delta, + * while the delta must be update by the sample which invoke by the kbps_resample(). + */ virtual void kbps_resample() = 0; protected: + /** + * for concrete connection to do the cycle. + */ virtual int do_cycle() = 0; +private: + /** + * when delete the connection, stop the connection, + * close the underlayer socket, delete the thread. + */ virtual void stop(); }; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index acee59250..de332b8e1 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -94,8 +94,6 @@ SrsRtmpConn::~SrsRtmpConn() { _srs_config->unsubscribe(this); - stop(); - srs_freep(req); srs_freep(res); srs_freep(rtmp); @@ -993,6 +991,7 @@ int SrsRtmpConn::http_hooks_on_connect() return ret; } + int connection_id = _srs_context->get_id(); for (int i = 0; i < (int)on_connect->args.size(); i++) { std::string url = on_connect->args.at(i); if ((ret = SrsHttpHooks::on_connect(url, connection_id, ip, req)) != ERROR_SUCCESS) { @@ -1016,6 +1015,7 @@ void SrsRtmpConn::http_hooks_on_close() return; } + int connection_id = _srs_context->get_id(); for (int i = 0; i < (int)on_close->args.size(); i++) { std::string url = on_close->args.at(i); SrsHttpHooks::on_close(url, connection_id, ip, req); @@ -1035,6 +1035,7 @@ int SrsRtmpConn::http_hooks_on_publish() return ret; } + int connection_id = _srs_context->get_id(); for (int i = 0; i < (int)on_publish->args.size(); i++) { std::string url = on_publish->args.at(i); if ((ret = SrsHttpHooks::on_publish(url, connection_id, ip, req)) != ERROR_SUCCESS) { @@ -1058,6 +1059,7 @@ void SrsRtmpConn::http_hooks_on_unpublish() return; } + int connection_id = _srs_context->get_id(); for (int i = 0; i < (int)on_unpublish->args.size(); i++) { std::string url = on_unpublish->args.at(i); SrsHttpHooks::on_unpublish(url, connection_id, ip, req); @@ -1077,6 +1079,7 @@ int SrsRtmpConn::http_hooks_on_play() return ret; } + int connection_id = _srs_context->get_id(); for (int i = 0; i < (int)on_play->args.size(); i++) { std::string url = on_play->args.at(i); if ((ret = SrsHttpHooks::on_play(url, connection_id, ip, req)) != ERROR_SUCCESS) { @@ -1100,6 +1103,7 @@ void SrsRtmpConn::http_hooks_on_stop() return; } + int connection_id = _srs_context->get_id(); for (int i = 0; i < (int)on_stop->args.size(); i++) { std::string url = on_stop->args.at(i); SrsHttpHooks::on_stop(url, connection_id, ip, req); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 42b693236..bdea60094 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -857,6 +857,7 @@ void SrsServer::resample_kbps(SrsConnection* conn, bool do_resample) kbps->add_delta(conn); + // resample for server. if (do_resample) { kbps->sample(); } diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 3f7dac932..5dd930ff3 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -128,17 +128,45 @@ private: SrsIngester* ingester; #endif private: + /** + * the pid file fd, lock the file write when server is running. + * @remark the init.d script should cleanup the pid file, when stop service, + * for the server never delete the file; when system startup, the pid in pid file + * maybe valid but the process is not SRS, the init.d script will never start server. + */ int pid_fd; + /** + * all connections, connection manager + */ std::vector conns; + /** + * all listners, listener manager. + */ std::vector listeners; + /** + * signal manager which convert gignal to io message. + */ SrsSignalManager* signal_manager; + /** + * server total kbps. + */ SrsKbps* kbps; + /** + * user send the signal, convert to variable. + */ bool signal_reload; bool signal_gmc_stop; public: SrsServer(); virtual ~SrsServer(); +public: + /** + * the destroy is for gmc to analysis the memory leak, + * if not destroy global/static data, the gmc will warning memory leak. + * in service, server never destroy, directly exit when restart. + */ virtual void destroy(); +// server startup workflow, @see run_master() public: virtual int initialize(); virtual int initialize_signal(); @@ -148,18 +176,58 @@ public: virtual int register_signal(); virtual int ingest(); virtual int cycle(); +// server utility +public: + /** + * callback for connection to remove itself. + * when connection thread cycle terminated, callback this to delete connection. + * @see SrsConnection.on_thread_stop(). + */ virtual void remove(SrsConnection* conn); + /** + * callback for signal manager got a signal. + * the signal manager convert signal to io message, + * whatever, we will got the signo like the orignal signal(int signo) handler. + * @remark, direclty exit for SIGTERM. + * @remark, do reload for SIGNAL_RELOAD. + * @remark, for SIGINT and SIGUSR2: + * no gmc, directly exit. + * for gmc, set the variable signal_gmc_stop, the cycle will return and cleanup for gmc. + */ virtual void on_signal(int signo); private: + /** + * the server thread main cycle, + * update the global static data, for instance, the current time, + * the cpu/mem/network statistic. + */ virtual int do_cycle(); + /** + * listen at specified protocol. + */ virtual int listen_rtmp(); virtual int listen_http_api(); virtual int listen_http_stream(); + /** + * close the listeners for specified type, + * remove the listen object from manager. + */ virtual void close_listeners(SrsListenerType type); - // resample the server kbps. resample all when conn is NULL. + /** + * resample the server kbps. + * if conn is NULL, resample all connections delta, then calc the total kbps. + * @param conn, the connection to do resample the kbps. NULL to resample all connections. + * @param do_resample, whether resample the server kbps. always false when sample a connection. + */ virtual void resample_kbps(SrsConnection* conn, bool do_resample = true); // internal only public: + /** + * when listener got a fd, notice server to accept it. + * @param type, the client type, used to create concrete connection, + * for instance RTMP connection to serve client. + * @param client_stfd, the client fd in st boxed, the underlayer fd. + */ virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd); // interface ISrsThreadHandler. public: