From 3c64e4b957aafb48ee9fb1c75661f3c82947f2c9 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 16 Oct 2015 17:18:16 +0800 Subject: [PATCH] kafka encode and send packet. --- trunk/src/kernel/srs_kernel_error.hpp | 6 +- trunk/src/protocol/srs_kafka_stack.cpp | 306 ++++++++++++++++++++++--- trunk/src/protocol/srs_kafka_stack.hpp | 62 ++++- 3 files changed, 335 insertions(+), 39 deletions(-) diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 5e1fed433..bddb811a4 100755 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -236,7 +236,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_REQUEST_DATA 3066 /////////////////////////////////////////////////////// -// HTTP/StreamCaster protocol error. +// HTTP/StreamCaster/KAFKA protocol error. /////////////////////////////////////////////////////// #define ERROR_HTTP_PATTERN_EMPTY 4000 #define ERROR_HTTP_PATTERN_DUPLICATED 4001 @@ -268,6 +268,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_AVC_NALU_UEV 4027 #define ERROR_AAC_BYTES_INVALID 4028 #define ERROR_HTTP_REQUEST_EOF 4029 +#define ERROR_KAFKA_CODEC_STRING 4030 +#define ERROR_KAFKA_CODEC_BYTES 4031 +#define ERROR_KAFKA_CODEC_REQUEST 4032 +#define ERROR_KAFKA_CODEC_RESPONSE 4033 /////////////////////////////////////////////////////// // HTTP API error. diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index 67ee28907..44e2faed7 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -29,22 +29,23 @@ using namespace std; #include #include #include +#include #ifdef SRS_AUTO_KAFKA SrsKafkaString::SrsKafkaString() { - size = -1; + _size = -1; data = NULL; } SrsKafkaString::SrsKafkaString(string v) { - size = (int16_t)v.length(); + _size = (int16_t)v.length(); - srs_assert(size > 0); - data = new char[size]; - memcpy(data, v.data(), size); + srs_assert(_size > 0); + data = new char[_size]; + memcpy(data, v.data(), _size); } SrsKafkaString::~SrsKafkaString() @@ -54,32 +55,92 @@ SrsKafkaString::~SrsKafkaString() bool SrsKafkaString::null() { - return size == -1; + return _size == -1; } bool SrsKafkaString::empty() { - return size <= 0; + return _size <= 0; } -int SrsKafkaString::total_size() +int SrsKafkaString::size() { - return 2 + (size == -1? 0 : size); + return _size == -1? 2 : 2 + _size; +} + +int SrsKafkaString::encode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(2)) { + ret = ERROR_KAFKA_CODEC_STRING; + srs_error("kafka encode string failed. ret=%d", ret); + return ret; + } + buf->write_2bytes(_size); + + if (_size <= 0) { + return ret; + } + + if (!buf->require(_size)) { + ret = ERROR_KAFKA_CODEC_STRING; + srs_error("kafka encode string data failed. ret=%d", ret); + return ret; + } + buf->write_bytes(data, _size); + + return ret; +} + +int SrsKafkaString::decode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(2)) { + ret = ERROR_KAFKA_CODEC_STRING; + srs_error("kafka decode string failed. ret=%d", ret); + return ret; + } + _size = buf->read_2bytes(); + + if (_size != -1 && _size < 0) { + ret = ERROR_KAFKA_CODEC_STRING; + srs_error("kafka string must be -1 or >=0, actual is %d. ret=%d", _size, ret); + return ret; + } + + if (_size <= 0) { + return ret; + } + + if (!buf->require(_size)) { + ret = ERROR_KAFKA_CODEC_STRING; + srs_error("kafka decode string data failed. ret=%d", ret); + return ret; + } + + srs_freep(data); + data = new char[_size]; + + buf->read_bytes(data, _size); + + return ret; } SrsKafkaBytes::SrsKafkaBytes() { - size = -1; + _size = -1; data = NULL; } SrsKafkaBytes::SrsKafkaBytes(const char* v, int nb_v) { - size = (int16_t)nb_v; + _size = (int16_t)nb_v; - srs_assert(size > 0); - data = new char[size]; - memcpy(data, v, size); + srs_assert(_size > 0); + data = new char[_size]; + memcpy(data, v, _size); } SrsKafkaBytes::~SrsKafkaBytes() @@ -89,17 +150,72 @@ SrsKafkaBytes::~SrsKafkaBytes() bool SrsKafkaBytes::null() { - return size == -1; + return _size == -1; } bool SrsKafkaBytes::empty() { - return size <= 0; + return _size <= 0; } -int SrsKafkaBytes::total_size() +int SrsKafkaBytes::size() { - return 4 + (size == -1? 0 : size); + return 4 + (_size == -1? 0 : _size); +} + +int SrsKafkaBytes::encode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(4)) { + ret = ERROR_KAFKA_CODEC_BYTES; + srs_error("kafka encode bytes failed. ret=%d", ret); + return ret; + } + buf->write_4bytes(_size); + + if (_size <= 0) { + return ret; + } + + if (!buf->require(_size)) { + ret = ERROR_KAFKA_CODEC_BYTES; + srs_error("kafka encode bytes data failed. ret=%d", ret); + return ret; + } + buf->write_bytes(data, _size); + + return ret; +} + +int SrsKafkaBytes::decode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(4)) { + ret = ERROR_KAFKA_CODEC_BYTES; + srs_error("kafka decode bytes failed. ret=%d", ret); + return ret; + } + _size = buf->read_4bytes(); + + if (_size != -1 && _size < 0) { + ret = ERROR_KAFKA_CODEC_BYTES; + srs_error("kafka bytes must be -1 or >=0, actual is %d. ret=%d", _size, ret); + return ret; + } + + if (!buf->require(_size)) { + ret = ERROR_KAFKA_CODEC_BYTES; + srs_error("kafka decode bytes data failed. ret=%d", ret); + return ret; + } + + srs_freep(data); + data = new char[_size]; + buf->read_bytes(data, _size); + + return ret; } SrsKafkaRequestHeader::SrsKafkaRequestHeader() @@ -117,7 +233,7 @@ SrsKafkaRequestHeader::~SrsKafkaRequestHeader() int SrsKafkaRequestHeader::header_size() { - return 2 + 2 + 4 + client_id->total_size(); + return 2 + 2 + 4 + client_id->size(); } int SrsKafkaRequestHeader::message_size() @@ -178,14 +294,56 @@ int SrsKafkaRequestHeader::size() int SrsKafkaRequestHeader::encode(SrsBuffer* buf) { int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. + + if (!buf->require(4 + _size)) { + ret = ERROR_KAFKA_CODEC_REQUEST; + srs_error("kafka encode request failed. ret=%d", ret); + return ret; + } + + buf->write_4bytes(_size); + buf->write_2bytes(api_key); + buf->write_2bytes(api_version); + buf->write_4bytes(correlation_id); + + if ((ret = client_id->encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode request client_id failed. ret=%d", ret); + return ret; + } + return ret; } int SrsKafkaRequestHeader::decode(SrsBuffer* buf) { int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. + + if (!buf->require(4)) { + ret = ERROR_KAFKA_CODEC_REQUEST; + srs_error("kafka decode request size failed. ret=%d", ret); + return ret; + } + _size = buf->read_4bytes(); + + if (_size <= 0) { + srs_warn("kafka got empty request"); + return ret; + } + + if (!buf->require(_size)) { + ret = ERROR_KAFKA_CODEC_REQUEST; + srs_error("kafka decode request message failed. ret=%d", ret); + return ret; + } + api_key = buf->read_2bytes(); + api_version = buf->read_2bytes(); + correlation_id = buf->read_4bytes(); + + if ((ret = client_id->decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode request client_id failed. ret=%d", ret); + return ret; + } + return ret; } @@ -222,14 +380,42 @@ int SrsKafkaResponseHeader::size() int SrsKafkaResponseHeader::encode(SrsBuffer* buf) { int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. + + if (!buf->require(4 + _size)) { + ret = ERROR_KAFKA_CODEC_RESPONSE; + srs_error("kafka encode response failed. ret=%d", ret); + return ret; + } + + buf->write_4bytes(_size); + buf->write_4bytes(correlation_id); + return ret; } int SrsKafkaResponseHeader::decode(SrsBuffer* buf) { int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. + + if (!buf->require(4)) { + ret = ERROR_KAFKA_CODEC_RESPONSE; + srs_error("kafka decode response size failed. ret=%d", ret); + return ret; + } + _size = buf->read_4bytes(); + + if (_size <= 0) { + srs_warn("kafka got empty response"); + return ret; + } + + if (!buf->require(_size)) { + ret = ERROR_KAFKA_CODEC_RESPONSE; + srs_error("kafka decode response message failed. ret=%d", ret); + return ret; + } + correlation_id = buf->read_4bytes(); + return ret; } @@ -326,22 +512,40 @@ void SrsKafkaTopicMetadataRequest::add_topic(string topic) int SrsKafkaTopicMetadataRequest::size() { - int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. - return ret; + return SrsKafkaRequest::size() + topics.size(); } int SrsKafkaTopicMetadataRequest::encode(SrsBuffer* buf) { int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. + + if ((ret = SrsKafkaRequest::encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode metadata request failed. ret=%d", ret); + return ret; + } + + if ((ret = topics.encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode metadata topics failed. ret=%d", ret); + return ret; + } + return ret; } int SrsKafkaTopicMetadataRequest::decode(SrsBuffer* buf) { int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. + + if ((ret = SrsKafkaRequest::decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode metadata request failed. ret=%d", ret); + return ret; + } + + if ((ret = topics.decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode metadata topics failed. ret=%d", ret); + return ret; + } + return ret; } @@ -355,14 +559,19 @@ SrsKafkaTopicMetadataResponse::~SrsKafkaTopicMetadataResponse() int SrsKafkaTopicMetadataResponse::size() { - int ret = ERROR_SUCCESS; // TODO: FIXME: implements it. - return ret; + return SrsKafkaResponse::size(); } int SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf) { int ret = ERROR_SUCCESS; + + if ((ret = SrsKafkaResponse::encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode metadata response failed. ret=%d", ret); + return ret; + } + // TODO: FIXME: implements it. return ret; } @@ -370,6 +579,12 @@ int SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf) int SrsKafkaTopicMetadataResponse::decode(SrsBuffer* buf) { int ret = ERROR_SUCCESS; + + if ((ret = SrsKafkaResponse::decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode metadata response failed. ret=%d", ret); + return ret; + } + // TODO: FIXME: implements it. return ret; } @@ -387,7 +602,36 @@ int SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg) { int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. + // TODO: FIXME: refine for performance issue. + SrsAutoFree(SrsKafkaRequest, msg); + + int size = msg->size(); + if (size <= 0) { + return ret; + } + + // TODO: FIXME: refine for performance issue. + char* bytes = new char[size]; + SrsAutoFree(char, bytes); + + // TODO: FIXME: refine for performance issue. + SrsBuffer* buf = new SrsBuffer(); + SrsAutoFree(SrsBuffer, buf); + + if ((ret = buf->initialize(bytes, size)) != ERROR_SUCCESS) { + srs_error("kafka create buffer failed. ret=%d", ret); + return ret; + } + + if ((ret = msg->encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode message failed. ret=%d", ret); + return ret; + } + + if ((ret = skt->write(bytes, size, NULL)) != ERROR_SUCCESS) { + srs_error("kafka send message failed. ret=%d", ret); + return ret; + } return ret; } diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index 871403aab..933d2cd51 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -33,6 +33,7 @@ #include #include +#include class ISrsProtocolReaderWriter; @@ -59,10 +60,10 @@ enum SrsKafkaApiKey * A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32. * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProtocolPrimitiveTypes */ -class SrsKafkaString +class SrsKafkaString : public ISrsCodec { private: - int16_t size; + int16_t _size; char* data; public: SrsKafkaString(); @@ -71,7 +72,11 @@ public: public: virtual bool null(); virtual bool empty(); - virtual int total_size(); +// interface ISrsCodec +public: + virtual int size(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); }; /** @@ -79,10 +84,10 @@ public: * A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32. * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProtocolPrimitiveTypes */ -class SrsKafkaBytes +class SrsKafkaBytes : public ISrsCodec { private: - int32_t size; + int32_t _size; char* data; public: SrsKafkaBytes(); @@ -91,7 +96,11 @@ public: public: virtual bool null(); virtual bool empty(); - virtual int total_size(); +// interface ISrsCodec +public: + virtual int size(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); }; /** @@ -107,7 +116,7 @@ public: * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests */ template -class SrsKafkaArray +class SrsKafkaArray : public ISrsCodec { private: int length; @@ -132,6 +141,45 @@ public: length++; elems.push_back(elem); } +// interface ISrsCodec +public: + virtual int size() + { + int s = 0; + + for (SrsIterator it = elems.begin(); it != elems.end(); ++it) { + T elem = *it; + s += elem->size(); + } + + return s; + } + virtual int encode(SrsBuffer* buf) + { + int ret = ERROR_SUCCESS; + + for (SrsIterator it = elems.begin(); it != elems.end(); ++it) { + T elem = *it; + if ((ret = elem->encode(buf)) != ERROR_SUCCESS) { + return ret; + } + } + + return ret; + } + virtual int decode(SrsBuffer* buf) + { + int ret = ERROR_SUCCESS; + + for (SrsIterator it = elems.begin(); it != elems.end(); ++it) { + T elem = *it; + if ((ret = elem->decode(buf)) != ERROR_SUCCESS) { + return ret; + } + } + + return ret; + } }; /**