diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index deada64a1..1137f4add 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -150,15 +150,31 @@ void SrsKafkaRequestHeader::set_api_key(SrsKafkaApiKey key) api_key = (int16_t)key; } -SrsKafkaResponse::SrsKafkaResponse() +SrsKafkaResponseHeader::SrsKafkaResponseHeader() { + size = 0; correlation_id = 0; } -SrsKafkaResponse::~SrsKafkaResponse() +SrsKafkaResponseHeader::~SrsKafkaResponseHeader() { } +int SrsKafkaResponseHeader::header_size() +{ + return 4; +} + +int SrsKafkaResponseHeader::message_size() +{ + return size - header_size(); +} + +int SrsKafkaResponseHeader::total_size() +{ + return 4 + size; +} + SrsKafkaMessage::SrsKafkaMessage() { offset = 0; diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index dc6a9c61d..bcb0b7ffa 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -179,18 +179,24 @@ public: virtual ~SrsKafkaRequestHeader(); public: /** - * the size of header, exclude the 4bytes size. - * @remark total_size = 4 + header_size + message_size. + * the layout of request: + * +-----------+----------------------------------+ + * | 4B size | [size] bytes | + * +-----------+------------+---------------------+ + * | 4B size | header | message | + * +-----------+------------+---------------------+ + * | total size = 4 + header + message | + * +----------------------------------------------+ + * where the header is specifies this request header without the start 4B size. + * @remark size = 4 + header + message. */ virtual int header_size(); /** * the size of message, the bytes left after the header. - * @remark total_size = 4 + header_size + message_size. */ virtual int message_size(); /** - * the total size of the request, 4bytes + size of header and message. - * @remark total_size = 4 + header_size + message_size. + * the total size of the request, includes the 4B size. */ virtual int total_size(); public: @@ -210,21 +216,53 @@ public: }; /** - * the common kafka response. - * The response will always match the paired request (e.g. we will + * the header of response, include the size of response. + * The response will always match the paired request (e.g. we will * send a MetadataResponse in return to a MetadataRequest). * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Responses */ -class SrsKafkaResponse +class SrsKafkaResponseHeader { -protected: +private: /** - * The server passes back whatever integer the client supplied as the correlation in the request. + * The MessageSize field gives the size of the subsequent request or response + * message in bytes. The client can read requests by first reading this 4 byte + * size as an integer N, and then reading and parsing the subsequent N bytes + * of the request. + */ + int32_t size; +private: + /** + * This is a user-supplied integer. It will be passed back in + * the response by the server, unmodified. It is useful for matching + * request and response between the client and server. */ int32_t correlation_id; public: - SrsKafkaResponse(); - virtual ~SrsKafkaResponse(); + SrsKafkaResponseHeader(); + virtual ~SrsKafkaResponseHeader(); +public: + /** + * the layout of response: + * +-----------+----------------------------------+ + * | 4B size | [size] bytes | + * +-----------+------------+---------------------+ + * | 4B size | 4B header | message | + * +-----------+------------+---------------------+ + * | total size = 4 + 4 + message | + * +----------------------------------------------+ + * where the header is specifies this request header without the start 4B size. + * @remark size = 4 + 4 + message. + */ + virtual int header_size(); + /** + * the size of message, the bytes left after the header. + */ + virtual int message_size(); + /** + * the total size of the request, includes the 4B size. + */ + virtual int total_size(); }; /** @@ -316,6 +354,15 @@ public: virtual ~SrsKafkaTopicMetadataRequest(); }; +class SrsKafkaTopicMetadataResponse +{ +private: + SrsKafkaRequestHeader header; +public: + SrsKafkaTopicMetadataResponse(); + virtual ~SrsKafkaTopicMetadataResponse(); +}; + /** * the kafka protocol stack, use to send and recv kakfa messages. */