From 76cd3f874972d06d10fd918dff09b3a30b0f5ff7 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 23 Oct 2015 10:16:53 +0800 Subject: [PATCH] kafka convert json to producer message. --- trunk/src/protocol/srs_kafka_stack.cpp | 109 +++++++++++++++++++++++-- trunk/src/protocol/srs_kafka_stack.hpp | 15 +++- 2 files changed, 115 insertions(+), 9 deletions(-) diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index 5cf1e7b20..6ecda777a 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -37,6 +37,8 @@ using namespace std; #ifdef SRS_AUTO_KAFKA +#define SRS_KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS 300000 + SrsKafkaString::SrsKafkaString() { _size = -1; @@ -45,11 +47,10 @@ SrsKafkaString::SrsKafkaString() SrsKafkaString::SrsKafkaString(string v) { - _size = (int16_t)v.length(); + _size = -1; + data = NULL; - srs_assert(_size > 0); - data = new char[_size]; - memcpy(data, v.data(), _size); + set_value(v); } SrsKafkaString::~SrsKafkaString() @@ -76,6 +77,19 @@ string SrsKafkaString::to_str() return ret; } +void SrsKafkaString::set_value(string v) +{ + // free previous data. + srs_freep(data); + + // copy new value to data. + _size = (int16_t)v.length(); + + srs_assert(_size > 0); + data = new char[_size]; + memcpy(data, v.data(), _size); +} + int SrsKafkaString::nb_bytes() { return _size == -1? 2 : 2 + _size; @@ -149,11 +163,10 @@ SrsKafkaBytes::SrsKafkaBytes() SrsKafkaBytes::SrsKafkaBytes(const char* v, int nb_v) { - _size = (int16_t)nb_v; + _size = -1; + data = NULL; - srs_assert(_size > 0); - data = new char[_size]; - memcpy(data, v, _size); + set_value(v, nb_v); } SrsKafkaBytes::~SrsKafkaBytes() @@ -171,6 +184,24 @@ bool SrsKafkaBytes::empty() return _size <= 0; } +void SrsKafkaBytes::set_value(string v) +{ + set_value(v.data(), (int)v.length()); +} + +void SrsKafkaBytes::set_value(const char* v, int nb_v) +{ + // free previous data. + srs_freep(data); + + // copy new value to data. + _size = (int16_t)nb_v; + + srs_assert(_size > 0); + data = new char[_size]; + memcpy(data, v, _size); +} + int SrsKafkaBytes::nb_bytes() { return 4 + (_size == -1? 0 : _size); @@ -479,6 +510,32 @@ SrsKafkaRawMessage::~SrsKafkaRawMessage() srs_freep(value); } +int SrsKafkaRawMessage::create(SrsJsonObject* obj) +{ + int ret = ERROR_SUCCESS; + + // current must be 0. + magic_byte = 0; + + // no compression codec. + attributes = 0; + + // dumps the json to string. + value->set_value(obj->dumps()); + + // TODO: FIXME: implements it. + crc = 0; + + message_size = raw_message_size(); + + return ret; +} + +int SrsKafkaRawMessage::raw_message_size() +{ + return 4 + 1 + 1 + key->nb_bytes() + value->nb_bytes(); +} + int SrsKafkaRawMessage::nb_bytes() { return 8 + 4 + 4 + 1 + 1 + key->nb_bytes() + value->nb_bytes(); @@ -1352,6 +1409,42 @@ int SrsKafkaClient::fetch_metadata(string topic, SrsKafkaTopicMetadataResponse** int SrsKafkaClient::write_messages(std::string topic, int32_t partition, vector& msgs) { int ret = ERROR_SUCCESS; + + SrsKafkaProducerRequest* req = new SrsKafkaProducerRequest(); + + // 0 the server will not send any response. + req->required_acks = 0; + // timeout of producer message. + req->timeout = SRS_KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS; + + // create the topic and partition to write message to. + SrsKafkaProducerTopicMessages* topics = new SrsKafkaProducerTopicMessages(); + SrsKafkaProducerPartitionMessages* partitions = new SrsKafkaProducerPartitionMessages(); + + topics->partitions.append(partitions); + req->topics.append(topics); + + topics->topic_name.set_value(topic); + partitions->partition = partition; + + // convert json objects to kafka raw messages. + vector::iterator it; + for (it = msgs.begin(); it != msgs.end(); ++it) { + SrsJsonObject* obj = *it; + SrsKafkaRawMessage* msg = new SrsKafkaRawMessage(); + + if ((ret = msg->create(obj)) != ERROR_SUCCESS) { + srs_freep(msg); + srs_freep(req); + srs_error("kafka write messages failed. ret=%d", ret); + return ret; + } + + partitions->messages.append(msg); + } + + partitions->message_set_size = partitions->messages.nb_bytes(); + // TODO: FIXME: implements it. return ret; } diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index 6764a6e35..f08a5450c 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -79,6 +79,7 @@ public: virtual bool null(); virtual bool empty(); virtual std::string to_str(); + virtual void set_value(std::string v); // interface ISrsCodec public: virtual int nb_bytes(); @@ -103,6 +104,8 @@ public: public: virtual bool null(); virtual bool empty(); + virtual void set_value(std::string v); + virtual void set_value(const char* v, int nb_v); // interface ISrsCodec public: virtual int nb_bytes(); @@ -531,6 +534,16 @@ public: public: SrsKafkaRawMessage(); virtual ~SrsKafkaRawMessage(); +public: + /** + * create message from json object. + */ + virtual int create(SrsJsonObject* obj); +private: + /** + * get the raw message, bytes after the message_size. + */ + virtual int raw_message_size(); // interface ISrsCodec public: virtual int nb_bytes(); @@ -768,7 +781,7 @@ public: */ class SrsKafkaProducerRequest : public SrsKafkaRequest { -private: +public: /** * This field indicates how many acknowledgements the servers should receive * before responding to the request. If it is 0 the server will not send any