diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index d94af3193..52236e9cd 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -275,8 +275,6 @@ SrsKafkaProducer::SrsKafkaProducer() cache = new SrsKafkaCache(); lb = new SrsLbRoundRobin(); - transport = new SrsTcpClient(); - kafka = new SrsKafkaClient(transport); } SrsKafkaProducer::~SrsKafkaProducer() @@ -284,8 +282,6 @@ SrsKafkaProducer::~SrsKafkaProducer() clear_metadata(); srs_freep(lb); - srs_freep(kafka); - srs_freep(transport); srs_freep(worker); srs_freep(pthread); @@ -443,6 +439,12 @@ int SrsKafkaProducer::request_metadata() return ret; } + SrsTcpClient* transport = new SrsTcpClient(); + SrsAutoFree(SrsTcpClient, transport); + + SrsKafkaClient* kafka = new SrsKafkaClient(transport); + SrsAutoFree(SrsKafkaClient, kafka); + std::string server; int port = SRS_CONSTS_KAFKA_DEFAULT_PORT; if (true) { @@ -460,7 +462,6 @@ int SrsKafkaProducer::request_metadata() } // reconnect to kafka server. - transport->close(); if ((ret = transport->connect(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US)) != ERROR_SUCCESS) { srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret); return ret; diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index a36942ee8..4300a9db7 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -164,8 +164,6 @@ public: private: SrsLbRoundRobin* lb; SrsAsyncCallWorker* worker; - SrsTcpClient* transport; - SrsKafkaClient* kafka; public: SrsKafkaProducer(); virtual ~SrsKafkaProducer();