From 2d5cc7a3e45d16d787dba6558719c694f1c58566 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 22 Sep 2015 12:38:07 +0800 Subject: [PATCH] add structures for kafka --- trunk/src/protocol/srs_kafka_stack.cpp | 122 +++++++++++++++ trunk/src/protocol/srs_kafka_stack.hpp | 207 +++++++++++++++++++++++++ 2 files changed, 329 insertions(+) diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index fa2dcbe82..342475e4a 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -23,3 +23,125 @@ #include +using namespace std; + +SrsKafkaString::SrsKafkaString() +{ + size = -1; + data = NULL; +} + +SrsKafkaString::~SrsKafkaString() +{ + srs_freep(data); +} + +bool SrsKafkaString::null() +{ + return size == -1; +} + +bool SrsKafkaString::empty() +{ + return size <= 0; +} + +int SrsKafkaString::total_size() +{ + return 2 + (size == -1? 0 : size); +} + +SrsKafkaBytes::SrsKafkaBytes() +{ + size = -1; + data = NULL; +} + +SrsKafkaBytes::~SrsKafkaBytes() +{ + srs_freep(data); +} + +bool SrsKafkaBytes::null() +{ + return size == -1; +} + +bool SrsKafkaBytes::empty() +{ + return size <= 0; +} + +int SrsKafkaBytes::total_size() +{ + return 4 + (size == -1? 0 : size); +} + +SrsKafkaRequestHeader::SrsKafkaRequestHeader() +{ + size = 0; + api_key = api_version = 0; + correlation_id = 0; + client_id = new SrsKafkaString(); +} + +SrsKafkaRequestHeader::~SrsKafkaRequestHeader() +{ + srs_freep(client_id); +} + +int SrsKafkaRequestHeader::header_size() +{ + return 2 + 2 + 4 + client_id->total_size(); +} + +int SrsKafkaRequestHeader::message_size() +{ + return size - header_size(); +} + +int SrsKafkaRequestHeader::total_size() +{ + return 4 + size; +} + +SrsKafkaResponse::SrsKafkaResponse() +{ + correlation_id = 0; +} + +SrsKafkaResponse::~SrsKafkaResponse() +{ +} + +SrsKafkaMessage::SrsKafkaMessage() +{ + offset = 0; + message_size = 0; + + crc = 0; + magic_byte = attributes = 0; + key = new SrsKafkaBytes(); + value = new SrsKafkaBytes(); +} + +SrsKafkaMessage::~SrsKafkaMessage() +{ + srs_freep(key); + srs_freep(value); +} + +SrsKafkaMessageSet::SrsKafkaMessageSet() +{ +} + +SrsKafkaMessageSet::~SrsKafkaMessageSet() +{ + vector::iterator it; + for (it = messages.begin(); it != messages.end(); ++it) { + SrsKafkaMessage* message = *it; + srs_freep(message); + } + messages.clear(); +} + diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index 03d17f312..5b2f14b5d 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -29,5 +29,212 @@ */ #include +#include + +/** + * These types consist of a signed integer giving a length N followed by N bytes of content. + * A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32. + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProtocolPrimitiveTypes + */ +class SrsKafkaString +{ +private: + int16_t size; + char* data; +public: + SrsKafkaString(); + virtual ~SrsKafkaString(); +public: + virtual bool null(); + virtual bool empty(); + virtual int total_size(); +}; + +/** + * These types consist of a signed integer giving a length N followed by N bytes of content. + * A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32. + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProtocolPrimitiveTypes + */ +class SrsKafkaBytes +{ +private: + int32_t size; + char* data; +public: + SrsKafkaBytes(); + virtual ~SrsKafkaBytes(); +public: + virtual bool null(); + virtual bool empty(); + virtual int total_size(); +}; + +/** + * the header of request, includes the size of request. + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests + */ +class SrsKafkaRequestHeader +{ +private: + /** + * The MessageSize field gives the size of the subsequent request or response + * message in bytes. The client can read requests by first reading this 4 byte + * size as an integer N, and then reading and parsing the subsequent N bytes + * of the request. + */ + int32_t size; +private: + /** + * This is a numeric id for the API being invoked (i.e. is it + * a metadata request, a produce request, a fetch request, etc). + * @remark MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest + */ + int16_t api_key; + /** + * This is a numeric version number for this api. We version each API and + * this version number allows the server to properly interpret the request + * as the protocol evolves. Responses will always be in the format corresponding + * to the request version. Currently the supported version for all APIs is 0. + */ + int16_t api_version; + /** + * This is a user-supplied integer. It will be passed back in + * the response by the server, unmodified. It is useful for matching + * request and response between the client and server. + */ + int32_t correlation_id; + /** + * This is a user supplied identifier for the client application. + * The user can use any identifier they like and it will be used + * when logging errors, monitoring aggregates, etc. For example, + * one might want to monitor not just the requests per second overall, + * but the number coming from each client application (each of + * which could reside on multiple servers). This id acts as a + * logical grouping across all requests from a particular client. + */ + SrsKafkaString* client_id; +public: + SrsKafkaRequestHeader(); + virtual ~SrsKafkaRequestHeader(); +public: + /** + * the size of header, exclude the 4bytes size. + * @remark total_size = 4 + header_size + message_size. + */ + virtual int header_size(); + /** + * the size of message, the left bytes left after the header. + * @remark total_size = 4 + header_size + message_size. + */ + virtual int message_size(); + /** + * the total size of the request, 4bytes + size of header and message. + * @remark total_size = 4 + header_size + message_size. + */ + virtual int total_size(); +}; + +/** + * the common kafka response. + * The response will always match the paired request (e.g. we will + * send a MetadataResponse in return to a MetadataRequest). + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Responses + */ +class SrsKafkaResponse +{ +protected: + /** + * The server passes back whatever integer the client supplied as the correlation in the request. + */ + int32_t correlation_id; +public: + SrsKafkaResponse(); + virtual ~SrsKafkaResponse(); +}; + +/** + * the kafka message in message set. + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets + */ +struct SrsKafkaMessage +{ +// metadata. +public: + /** + * This is the offset used in kafka as the log sequence number. When the + * producer is sending messages it doesn't actually know the offset and + * can fill in any value here it likes. + */ + int64_t offset; + /** + * the size of this message. + */ + int32_t message_size; +// message. +public: + /** + * The CRC is the CRC32 of the remainder of the message bytes. + * This is used to check the integrity of the message on the broker and consumer. + */ + int32_t crc; + /** + * This is a version id used to allow backwards compatible evolution + * of the message binary format. The current value is 0. + */ + int8_t magic_byte; + /** + * This byte holds metadata attributes about the message. + * The lowest 2 bits contain the compression codec used + * for the message. The other bits should be set to 0. + */ + int8_t attributes; + /** + * The key is an optional message key that was used for + * partition assignment. The key can be null. + */ + SrsKafkaBytes* key; + /** + * The value is the actual message contents as an opaque byte array. + * Kafka supports recursive messages in which case this may itself + * contain a message set. The message can be null. + */ + SrsKafkaBytes* value; +public: + SrsKafkaMessage(); + virtual ~SrsKafkaMessage(); +}; + +/** + * a set of kafka message. + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets + */ +class SrsKafkaMessageSet +{ +private: + std::vector messages; +public: + SrsKafkaMessageSet(); + virtual ~SrsKafkaMessageSet(); +}; + +/** + * request the metadata from broker. + * This API answers the following questions: + * What topics exist? + * How many partitions does each topic have? + * Which broker is currently the leader for each partition? + * What is the host and port for each of these brokers? + * This is the only request that can be addressed to any broker in the cluster. + * + * Since there may be many topics the client can give an optional list of topic + * names in order to only return metadata for a subset of topics. + * + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI + */ +class SrsKafkaTopicMetadataRequest +{ +public: +}; + #endif