1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

fix mem leak of encoder, edge and source. add destroy for gmc to detect mem leak. to 0.9.89

This commit is contained in:
winlin 2014-05-03 22:59:21 +08:00
parent 9a1c478266
commit 96a5c7b1ab
11 changed files with 145 additions and 90 deletions

View file

@ -35,26 +35,59 @@ SrsConnection::SrsConnection(SrsServer* srs_server, st_netfd_t client_stfd)
server = srs_server; server = srs_server;
stfd = client_stfd; stfd = client_stfd;
connection_id = 0; connection_id = 0;
pthread = new SrsThread(this, 0);
} }
SrsConnection::~SrsConnection() SrsConnection::~SrsConnection()
{ {
srs_freepa(ip); stop();
srs_close_stfd(stfd);
} }
int SrsConnection::start() int SrsConnection::start()
{
return pthread->start();
}
int SrsConnection::cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
if (st_thread_create(cycle_thread, this, 0, 0) == NULL) { _srs_context->generate_id();
ret = ERROR_ST_CREATE_CYCLE_THREAD; connection_id = _srs_context->get_id();
srs_error("st_thread_create conn cycle thread error. ret=%d", ret);
return ret;
}
srs_verbose("create st conn cycle thread success.");
return ret; ret = do_cycle();
// if socket io error, set to closed.
if (srs_is_client_gracefully_close(ret)) {
ret = ERROR_SOCKET_CLOSED;
}
// success.
if (ret == ERROR_SUCCESS) {
srs_trace("client process normally finished. ret=%d", ret);
}
// client close peer.
if (ret == ERROR_SOCKET_CLOSED) {
srs_warn("client disconnect peer. ret=%d", ret);
}
// set loop to stop to quit.
pthread->stop_loop();
return ERROR_SUCCESS;
}
void SrsConnection::on_thread_stop()
{
server->remove(this);
}
void SrsConnection::stop()
{
srs_close_stfd(stfd);
srs_freep(pthread);
srs_freepa(ip);
} }
int SrsConnection::get_peer_ip() int SrsConnection::get_peer_ip()
@ -92,40 +125,3 @@ int SrsConnection::get_peer_ip()
return ret; return ret;
} }
void SrsConnection::cycle()
{
int ret = ERROR_SUCCESS;
_srs_context->generate_id();
connection_id = _srs_context->get_id();
ret = do_cycle();
// if socket io error, set to closed.
if (srs_is_client_gracefully_close(ret)) {
ret = ERROR_SOCKET_CLOSED;
}
// success.
if (ret == ERROR_SUCCESS) {
srs_trace("client process normally finished. ret=%d", ret);
}
// client close peer.
if (ret == ERROR_SOCKET_CLOSED) {
srs_warn("client disconnect peer. ret=%d", ret);
}
server->remove(this);
}
void* SrsConnection::cycle_thread(void* arg)
{
SrsConnection* conn = (SrsConnection*)arg;
srs_assert(conn != NULL);
conn->cycle();
return NULL;
}

View file

@ -31,10 +31,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp> #include <srs_core.hpp>
#include <srs_app_st.hpp> #include <srs_app_st.hpp>
#include <srs_app_thread.hpp>
class SrsServer; class SrsServer;
class SrsConnection class SrsConnection : public ISrsThreadHandler
{ {
private:
SrsThread* pthread;
protected: protected:
char* ip; char* ip;
SrsServer* server; SrsServer* server;
@ -45,13 +48,13 @@ public:
virtual ~SrsConnection(); virtual ~SrsConnection();
public: public:
virtual int start(); virtual int start();
virtual int cycle();
virtual void on_thread_stop();
protected: protected:
virtual int do_cycle() = 0; virtual int do_cycle() = 0;
virtual void stop();
protected: protected:
virtual int get_peer_ip(); virtual int get_peer_ip();
private:
virtual void cycle();
static void* cycle_thread(void* arg);
}; };
#endif #endif

View file

@ -335,6 +335,9 @@ SrsEdgeForwarder::SrsEdgeForwarder()
SrsEdgeForwarder::~SrsEdgeForwarder() SrsEdgeForwarder::~SrsEdgeForwarder()
{ {
stop(); stop();
srs_freep(pthread);
srs_freep(queue);
} }
void SrsEdgeForwarder::set_queue_size(double queue_size) void SrsEdgeForwarder::set_queue_size(double queue_size)

View file

@ -53,6 +53,7 @@ SrsEncoder::~SrsEncoder()
on_unpublish(); on_unpublish();
srs_freep(pthread); srs_freep(pthread);
srs_freep(pithy_print);
} }
int SrsEncoder::on_publish(SrsRequest* req) int SrsEncoder::on_publish(SrsRequest* req)

View file

@ -79,6 +79,8 @@ SrsRtmpConn::~SrsRtmpConn()
{ {
_srs_config->unsubscribe(this); _srs_config->unsubscribe(this);
stop();
srs_freep(req); srs_freep(req);
srs_freep(res); srs_freep(res);
srs_freep(rtmp); srs_freep(rtmp);

View file

@ -44,6 +44,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#ifdef SRS_AUTO_INGEST #ifdef SRS_AUTO_INGEST
#include <srs_app_ingest.hpp> #include <srs_app_ingest.hpp>
#endif #endif
#include <srs_app_source.hpp>
#define SERVER_LISTEN_BACKLOG 512 #define SERVER_LISTEN_BACKLOG 512
@ -251,6 +252,13 @@ int SrsSignalManager::start()
sa.sa_flags = 0; sa.sa_flags = 0;
sigaction(SIGINT, &sa, NULL); sigaction(SIGINT, &sa, NULL);
sa.sa_handler = SrsSignalManager::sig_catcher;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGUSR2, &sa, NULL);
srs_trace("signal installed");
return pthread->start(); return pthread->start();
} }
@ -316,27 +324,17 @@ SrsServer::~SrsServer()
void SrsServer::destroy() void SrsServer::destroy()
{ {
_srs_config->unsubscribe(this); srs_warn("start destroy server");
if (true) { _srs_config->unsubscribe(this);
std::vector<SrsConnection*>::iterator it;
for (it = conns.begin(); it != conns.end(); ++it) {
SrsConnection* conn = *it;
srs_freep(conn);
}
conns.clear();
}
close_listeners(SrsListenerRtmpStream); close_listeners(SrsListenerRtmpStream);
close_listeners(SrsListenerHttpApi); close_listeners(SrsListenerHttpApi);
close_listeners(SrsListenerHttpStream); close_listeners(SrsListenerHttpStream);
if (pid_fd > 0) { #ifdef SRS_AUTO_INGEST
::close(pid_fd); ingester->stop();
pid_fd = -1; #endif
}
srs_freep(signal_manager);
#ifdef SRS_AUTO_HTTP_API #ifdef SRS_AUTO_HTTP_API
srs_freep(http_api_handler); srs_freep(http_api_handler);
@ -348,6 +346,27 @@ void SrsServer::destroy()
#ifdef SRS_AUTO_INGEST #ifdef SRS_AUTO_INGEST
srs_freep(ingester); srs_freep(ingester);
#endif #endif
if (pid_fd > 0) {
::close(pid_fd);
pid_fd = -1;
}
srs_freep(signal_manager);
for (std::vector<SrsConnection*>::iterator it = conns.begin(); it != conns.end();) {
SrsConnection* conn = *it;
// remove the connection, then free it,
// for the free will remove itself from server,
// when erased here, the remove of server will ignore.
it = conns.erase(it);
srs_freep(conn);
}
conns.clear();
SrsSource::destroy();
} }
int SrsServer::initialize() int SrsServer::initialize()
@ -540,12 +559,14 @@ int SrsServer::cycle()
ret = do_cycle(); ret = do_cycle();
#ifdef SRS_AUTO_INGEST
ingester->stop();
#endif
destroy(); destroy();
#ifdef SRS_AUTO_GPERF_MC
srs_warn("sleep a long time for system st-threads to cleanup.");
st_usleep(3 * 1000 * 1000);
srs_warn("system quit");
#endif
return ret; return ret;
} }
@ -553,10 +574,14 @@ void SrsServer::remove(SrsConnection* conn)
{ {
std::vector<SrsConnection*>::iterator it = std::find(conns.begin(), conns.end(), conn); std::vector<SrsConnection*>::iterator it = std::find(conns.begin(), conns.end(), conn);
if (it != conns.end()) { // removed by destroy, ignore.
conns.erase(it); if (it == conns.end()) {
srs_warn("server moved connection, ignore.");
return;
} }
conns.erase(it);
srs_info("conn removed. conns=%d", (int)conns.size()); srs_info("conn removed. conns=%d", (int)conns.size());
// all connections are created by server, // all connections are created by server,
@ -571,7 +596,7 @@ void SrsServer::on_signal(int signo)
return; return;
} }
if (signo == SIGINT) { if (signo == SIGINT || signo == SIGUSR2) {
#ifdef SRS_AUTO_GPERF_MC #ifdef SRS_AUTO_GPERF_MC
srs_trace("gmc is on, main cycle will terminate normally."); srs_trace("gmc is on, main cycle will terminate normally.");
signal_gmc_stop = true; signal_gmc_stop = true;
@ -611,6 +636,7 @@ int SrsServer::do_cycle()
// because directly exit will cause core-dump. // because directly exit will cause core-dump.
#ifdef SRS_AUTO_GPERF_MC #ifdef SRS_AUTO_GPERF_MC
if (signal_gmc_stop) { if (signal_gmc_stop) {
srs_warn("gmc got singal to stop server.");
return ret; return ret;
} }
#endif #endif

View file

@ -437,6 +437,16 @@ int SrsSource::find(SrsRequest* req, SrsSource** ppsource)
return ret; return ret;
} }
void SrsSource::destroy()
{
std::map<std::string, SrsSource*>::iterator it;
for (it = pool.begin(); it != pool.end(); ++it) {
SrsSource* source = it->second;
srs_freep(source);
}
pool.clear();
}
SrsSource::SrsSource(SrsRequest* req) SrsSource::SrsSource(SrsRequest* req)
{ {
_req = req->copy(); _req = req->copy();
@ -468,14 +478,9 @@ SrsSource::~SrsSource()
{ {
_srs_config->unsubscribe(this); _srs_config->unsubscribe(this);
if (true) { // never free the consumers,
std::vector<SrsConsumer*>::iterator it; // for all consumers are auto free.
for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsConsumer* consumer = *it;
srs_freep(consumer);
}
consumers.clear(); consumers.clear();
}
if (true) { if (true) {
std::vector<SrsForwarder*>::iterator it; std::vector<SrsForwarder*>::iterator it;

View file

@ -222,6 +222,11 @@ public:
* @remark stream_url should without port and schema. * @remark stream_url should without port and schema.
*/ */
static int find(SrsRequest* req, SrsSource** ppsource); static int find(SrsRequest* req, SrsSource** ppsource);
/**
* when system exit, destroy the sources,
* for gmc to analysis mem leaks.
*/
static void destroy();
private: private:
// deep copy of client request. // deep copy of client request.
SrsRequest* _req; SrsRequest* _req;

View file

@ -121,6 +121,11 @@ bool SrsThread::can_loop()
return loop; return loop;
} }
void SrsThread::stop_loop()
{
loop = false;
}
void SrsThread::thread_cycle() void SrsThread::thread_cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;

View file

@ -49,7 +49,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
* when thread interrupt, the socket maybe not got EINT, * when thread interrupt, the socket maybe not got EINT,
* espectially on st_usleep(), so the cycle must check the loop, * espectially on st_usleep(), so the cycle must check the loop,
* when handler->cycle() has loop itself, for example: * when handler->cycle() has loop itself, for example:
* handler->cycle() is:
* while (true): * while (true):
* st_usleep(0); * st_usleep(0);
* if (read_from_socket(skt) < 0) break; * if (read_from_socket(skt) < 0) break;
@ -57,11 +56,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
* but when thread stop interrupt the s_usleep(0), then the loop is * but when thread stop interrupt the s_usleep(0), then the loop is
* death loop. * death loop.
* in a word, the handler->cycle() must: * in a word, the handler->cycle() must:
* handler->cycle() is:
* while (pthread->can_loop()): * while (pthread->can_loop()):
* st_usleep(0); * st_usleep(0);
* if (read_from_socket(skt) < 0) break; * if (read_from_socket(skt) < 0) break;
* check the loop, then it works. * check the loop, then it works.
*
* in the thread itself, that is the cycle method,
* if itself want to terminate the thread, should never use stop(),
* but use stop_loop() to set the loop to false and terminate normally.
*/ */
class ISrsThreadHandler class ISrsThreadHandler
{ {
@ -117,12 +119,19 @@ public:
* @remark user can stop multiple times, ignore if already stopped. * @remark user can stop multiple times, ignore if already stopped.
*/ */
virtual void stop(); virtual void stop();
public:
/** /**
* whether the thread should loop, * whether the thread should loop,
* used for handler->cycle() which has a loop method, * used for handler->cycle() which has a loop method,
* to check this method, break if false. * to check this method, break if false.
*/ */
virtual bool can_loop(); virtual bool can_loop();
/**
* for the loop thread to stop the loop.
* other thread can directly use stop() to stop loop and wait for quit.
* this stop loop method only set loop to false.
*/
virtual void stop_loop();
private: private:
virtual void thread_cycle(); virtual void thread_cycle();
static void* thread_fun(void* arg); static void* thread_fun(void* arg);

View file

@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version // current release version
#define VERSION_MAJOR "0" #define VERSION_MAJOR "0"
#define VERSION_MINOR "9" #define VERSION_MINOR "9"
#define VERSION_REVISION "88" #define VERSION_REVISION "89"
#define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
// server info. // server info.
#define RTMP_SIG_SRS_KEY "srs" #define RTMP_SIG_SRS_KEY "srs"