diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 89da50e5f..9dd1f3def 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -34,6 +34,7 @@ using namespace std; #include #include #include +#include #ifdef SRS_AUTO_KAFKA @@ -178,6 +179,14 @@ int SrsKafkaProducer::request_metadata() srs_parse_endpoint(broker, server, port); } + std::string topic = _srs_config->get_kafka_topic(); + if (true) { + std::string senabled = srs_bool2switch(enabled); + std::string sbrokers = srs_join_vector_string(brokers->args, ","); + srs_trace("kafka request enabled:%s, brokers:%s, current:[%d]%s:%d, topic:%s", + senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str()); + } + // connect to kafka server. if ((ret = transport->connect(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US)) != ERROR_SUCCESS) { srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret); @@ -185,19 +194,12 @@ int SrsKafkaProducer::request_metadata() } // do fetch medata from broker. - std::string topic = _srs_config->get_kafka_topic(); - if ((ret = kafka->fetch_metadata(topic)) != ERROR_SUCCESS) { + SrsKafkaTopicMetadataResponse* metadata = NULL; + if ((ret = kafka->fetch_metadata(topic, &metadata)) != ERROR_SUCCESS) { srs_error("kafka fetch metadata failed. ret=%d", ret); return ret; } - - // log when completed. - if (true) { - std::string senabled = srs_bool2switch(enabled); - std::string sbrokers = srs_join_vector_string(brokers->args, ","); - srs_trace("kafka ok, enabled:%s, brokers:%s, current:[%d]%s:%d, topic:%s", - senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str()); - } + SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata); meatadata_ok = true; diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index 0ae7384e9..87dabe6a3 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -1051,8 +1051,10 @@ SrsKafkaClient::~SrsKafkaClient() srs_freep(protocol); } -int SrsKafkaClient::fetch_metadata(string topic) +int SrsKafkaClient::fetch_metadata(string topic, SrsKafkaTopicMetadataResponse** pmsg) { + *pmsg = NULL; + int ret = ERROR_SUCCESS; SrsKafkaTopicMetadataRequest* req = new SrsKafkaTopicMetadataRequest(); @@ -1064,17 +1066,21 @@ int SrsKafkaClient::fetch_metadata(string topic) return ret; } - SrsKafkaResponse* res = NULL; - if ((ret = protocol->recv_message(&res)) != ERROR_SUCCESS) { + if ((ret = protocol->expect_message(pmsg)) != ERROR_SUCCESS) { srs_error("kafka recv response failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsKafkaResponse, res); - - // TODO: FIXME: implements it. return ret; } +vector srs_kafka_array2vector(SrsKafkaArray* arr) +{ + vector strs; + for (int i = 0; i < arr->size(); i++) { + } + return strs; +} + #endif diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index 714f8220d..6e253e744 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -666,7 +666,7 @@ public: */ class SrsKafkaTopicMetadataResponse : public SrsKafkaResponse { -private: +public: SrsKafkaArray brokers; SrsKafkaArray metadatas; public: @@ -741,6 +741,36 @@ public: * @param pmsg output the received message. user must free it. */ virtual int recv_message(SrsKafkaResponse** pmsg); +public: + /** + * expect specified message. + */ + template + int expect_message(T** pmsg) + { + int ret = ERROR_SUCCESS; + + while (true) { + SrsKafkaResponse* res = NULL; + if ((ret = recv_message(&res)) != ERROR_SUCCESS) { + srs_error("recv response failed. ret=%d", ret); + return ret; + } + + // drop not matched. + T* msg = dynamic_cast(res); + if (!msg) { + srs_info("kafka drop response."); + srs_freep(res); + continue; + } + + *pmsg = msg; + break; + } + + return ret; + } }; /** @@ -757,9 +787,12 @@ public: /** * fetch the metadata from broker for topic. */ - virtual int fetch_metadata(std::string topic); + virtual int fetch_metadata(std::string topic, SrsKafkaTopicMetadataResponse** pmsg); }; +// convert kafka array[string] to vector[string] +extern std::vector srs_kafka_array2vector(SrsKafkaArray* arr); + #endif #endif diff --git a/trunk/src/protocol/srs_rtmp_stack.hpp b/trunk/src/protocol/srs_rtmp_stack.hpp index e895fc3f3..12dbb574d 100644 --- a/trunk/src/protocol/srs_rtmp_stack.hpp +++ b/trunk/src/protocol/srs_rtmp_stack.hpp @@ -997,7 +997,7 @@ public: * for example: * SrsCommonMessage* msg = NULL; * SrsConnectAppResPacket* pkt = NULL; - * if ((ret = server->expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + * if ((ret = server->expect_message(&msg, &pkt)) != ERROR_SUCCESS) { * return ret; * } * // use then free msg and pkt