From 493d2822559befdb9f63fc9a79b265fcdfd8ab44 Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 19 Oct 2015 13:55:53 +0800 Subject: [PATCH] kafka refine array, to decode and create object. --- trunk/src/kernel/srs_kernel_error.hpp | 1 + trunk/src/protocol/srs_kafka_stack.cpp | 40 +++++++++++++++++++ trunk/src/protocol/srs_kafka_stack.hpp | 53 +++++++++++++++++++------- 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index bddb811a4..354dcd3cd 100755 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -272,6 +272,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_KAFKA_CODEC_BYTES 4031 #define ERROR_KAFKA_CODEC_REQUEST 4032 #define ERROR_KAFKA_CODEC_RESPONSE 4033 +#define ERROR_KAFKA_CODEC_ARRAY 4034 /////////////////////////////////////////////////////// // HTTP API error. diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index e935b8c3f..144737913 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -30,6 +30,7 @@ using namespace std; #include #include #include +#include #ifdef SRS_AUTO_KAFKA @@ -682,10 +683,12 @@ SrsKafkaApiKey SrsKafkaCorrelationPool::get(int32_t correlation_id) SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReaderWriter* io) { skt = io; + reader = new SrsFastStream(); } SrsKafkaProtocol::~SrsKafkaProtocol() { + srs_freep(reader); } int SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg) @@ -733,6 +736,36 @@ int SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg) return ret; } +int SrsKafkaProtocol::recv_message(SrsKafkaResponse** pmsg) +{ + *pmsg = NULL; + + int ret = ERROR_SUCCESS; + + SrsKafkaResponseHeader header; + while (reader->size() < header.size()) { + if ((ret = reader->grow(skt, header.size())) != ERROR_SUCCESS) { + srs_error("kafka recv message failed. ret=%d", ret); + return ret; + } + } + + SrsBuffer buffer; + if ((ret = buffer.initialize(reader->bytes(), reader->size())) != ERROR_SUCCESS) { + return ret; + } + + SrsBuffer* buf = &buffer; + if ((ret = header.decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode response header failed. ret=%d", ret); + return ret; + } + + // TODO: FIXME: decode message. + + return ret; +} + SrsKafkaClient::SrsKafkaClient(ISrsProtocolReaderWriter* io) { protocol = new SrsKafkaProtocol(io); @@ -756,6 +789,13 @@ int SrsKafkaClient::fetch_metadata(string topic) return ret; } + SrsKafkaResponse* res = NULL; + if ((ret = protocol->recv_message(&res)) != ERROR_SUCCESS) { + srs_error("kafka recv response failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsKafkaResponse, res); + // TODO: FIXME: implements it. return ret; diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index 1ea3ae8c1..986f03b31 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -35,7 +35,9 @@ #include #include +#include +class SrsFastStream; class ISrsProtocolReaderWriter; #ifdef SRS_AUTO_KAFKA @@ -113,9 +115,9 @@ public: * array of a structure foo as [foo]. * * Usage: - * SrsKafkaArray body; + * SrsKafkaArray body; * body.append(new SrsKafkaBytes()); - * @remark the typename T must be a ISrsCodec* + * @remark array elem is the T*, which must be ISrsCodec* * * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests */ @@ -123,9 +125,9 @@ template class SrsKafkaArray : public ISrsCodec { private: - int length; - std::vector elems; - typedef typename std::vector::iterator SrsIterator; + int32_t length; + std::vector elems; + typedef typename std::vector::iterator SrsIterator; public: SrsKafkaArray() { @@ -134,13 +136,13 @@ public: virtual ~SrsKafkaArray() { for (SrsIterator it = elems.begin(); it != elems.end(); ++it) { - T elem = *it; + T* elem = *it; srs_freep(elem); } elems.clear(); } public: - virtual void append(T elem) + virtual void append(T* elem) { length++; elems.push_back(elem); @@ -149,10 +151,10 @@ public: public: virtual int size() { - int s = 0; + int s = 4; for (SrsIterator it = elems.begin(); it != elems.end(); ++it) { - T elem = *it; + T* elem = *it; s += elem->size(); } @@ -162,9 +164,17 @@ public: { int ret = ERROR_SUCCESS; + if (!buf->require(4)) { + ret = ERROR_KAFKA_CODEC_ARRAY; + srs_error("kafka encode array failed. ret=%d", ret); + return ret; + } + buf->write_4bytes(length); + for (SrsIterator it = elems.begin(); it != elems.end(); ++it) { - T elem = *it; + T* elem = *it; if ((ret = elem->encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode array elem failed. ret=%d", ret); return ret; } } @@ -175,11 +185,22 @@ public: { int ret = ERROR_SUCCESS; - for (SrsIterator it = elems.begin(); it != elems.end(); ++it) { - T elem = *it; + if (!buf->require(4)) { + ret = ERROR_KAFKA_CODEC_ARRAY; + srs_error("kafka decode array failed. ret=%d", ret); + return ret; + } + length = buf->read_2bytes(); + + for (int i = 0; i < length; i++) { + T* elem = new T(); if ((ret = elem->decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode array elem failed. ret=%d", ret); + srs_freep(elem); return ret; } + + elems.push_back(elem); } return ret; @@ -493,7 +514,7 @@ public: class SrsKafkaTopicMetadataRequest : public SrsKafkaRequest { private: - SrsKafkaArray topics; + SrsKafkaArray topics; public: SrsKafkaTopicMetadataRequest(); virtual ~SrsKafkaTopicMetadataRequest(); @@ -559,6 +580,7 @@ class SrsKafkaProtocol { private: ISrsProtocolReaderWriter* skt; + SrsFastStream* reader; public: SrsKafkaProtocol(ISrsProtocolReaderWriter* io); virtual ~SrsKafkaProtocol(); @@ -568,6 +590,11 @@ public: * @param msg the msg to send. user must not free it again. */ virtual int send_and_free_message(SrsKafkaRequest* msg); + /** + * read the message from kafka server. + * @param pmsg output the received message. user must free it. + */ + virtual int recv_message(SrsKafkaResponse** pmsg); }; /**