diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 01e99fab8..0f75217f0 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -102,7 +102,7 @@ std::string srs_kafka_summary_partitions(const vector& parti return srs_join_vector_string(ret, ", "); } -void srs_kafka_metadata2connector(SrsKafkaTopicMetadataResponse* metadata, vector& partitions) +void srs_kafka_metadata2connector(string topic_name, SrsKafkaTopicMetadataResponse* metadata, vector& partitions) { for (int i = 0; i < metadata->metadatas.size(); i++) { SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i); @@ -111,6 +111,8 @@ void srs_kafka_metadata2connector(SrsKafkaTopicMetadataResponse* metadata, vecto SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j); SrsKafkaPartition* p = new SrsKafkaPartition(); + + p->topic = topic_name; p->id = partition->partition_id; p->broker = partition->leader; @@ -171,6 +173,11 @@ int SrsKafkaPartition::connect() return ret; } +int SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc) +{ + return kafka->write_messages(topic, id, *pc); +} + SrsKafkaMessage::SrsKafkaMessage(int k) { key = k; @@ -290,11 +297,15 @@ int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitio // connect transport. if ((ret = partition->connect()) != ERROR_SUCCESS) { - srs_error("connect to partition failed. ret=%d", ret); + srs_error("kafka connect to partition failed. ret=%d", ret); return ret; } - // TODO: FIXME: implements it. + // write the json objects. + if ((ret = partition->flush(pc)) != ERROR_SUCCESS) { + srs_error("kafka write messages failed. ret=%d", ret); + return ret; + } // free all wrote messages. for (vector::iterator it = pc->begin(); it != pc->end(); ++it) { @@ -546,7 +557,7 @@ int SrsKafkaProducer::request_metadata() srs_trace("kafka metadata: %s", summary.c_str()); // generate the partition info. - srs_kafka_metadata2connector(metadata, partitions); + srs_kafka_metadata2connector(topic, metadata, partitions); srs_trace("kafka connector: %s", srs_kafka_summary_partitions(partitions).c_str()); // update the total partition for cache. diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index 7f777e812..457470f0e 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -45,6 +45,11 @@ class SrsKafkaProducer; #ifdef SRS_AUTO_KAFKA +/** + * the partition messages cache. + */ +typedef std::vector SrsKafkaPartitionCache; + /** * the kafka partition info. */ @@ -56,6 +61,7 @@ private: SrsKafkaClient* kafka; public: int id; + std::string topic; // leader. int broker; std::string host; @@ -66,6 +72,7 @@ public: public: virtual std::string hostport(); virtual int connect(); + virtual int flush(SrsKafkaPartitionCache* pc); }; /** @@ -94,11 +101,6 @@ public: virtual std::string to_string(); }; -/** - * the partition messages cache. - */ -typedef std::vector SrsKafkaPartitionCache; - /** * a message cache for kafka. */ diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index eb5149b06..4435450f9 100755 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -274,6 +274,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_KAFKA_CODEC_RESPONSE 4033 #define ERROR_KAFKA_CODEC_ARRAY 4034 #define ERROR_KAFKA_CODEC_METADATA 4035 +#define ERROR_KAFKA_CODEC_MESSAGE 4036 +#define ERROR_KAFKA_CODEC_PRODUCER 4037 /////////////////////////////////////////////////////// // HTTP API error. diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index c0b0eaa16..5cf1e7b20 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -33,6 +33,7 @@ using namespace std; #include #include #include +#include #ifdef SRS_AUTO_KAFKA @@ -478,6 +479,67 @@ SrsKafkaRawMessage::~SrsKafkaRawMessage() srs_freep(value); } +int SrsKafkaRawMessage::nb_bytes() +{ + return 8 + 4 + 4 + 1 + 1 + key->nb_bytes() + value->nb_bytes(); +} + +int SrsKafkaRawMessage::encode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(8 + 4 + 4 + 1 + 1)) { + ret = ERROR_KAFKA_CODEC_MESSAGE; + srs_error("kafka encode message failed. ret=%d", ret); + return ret; + } + buf->write_8bytes(offset); + buf->write_4bytes(message_size); + buf->write_4bytes(crc); + buf->write_1bytes(magic_byte); + buf->write_1bytes(attributes); + + if ((ret = key->encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode message key failed. ret=%d", ret); + return ret; + } + + if ((ret = value->encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode message value failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsKafkaRawMessage::decode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(8 + 4 + 4 + 1 + 1)) { + ret = ERROR_KAFKA_CODEC_MESSAGE; + srs_error("kafka decode message failed. ret=%d", ret); + return ret; + } + offset = buf->read_8bytes(); + message_size = buf->read_4bytes(); + crc = buf->read_4bytes(); + magic_byte = buf->read_1bytes(); + attributes = buf->read_1bytes(); + + if ((ret = key->decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode message key failed. ret=%d", ret); + return ret; + } + + if ((ret = value->decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode message value failed. ret=%d", ret); + return ret; + } + + return ret; +} + SrsKafkaRawMessageSet::SrsKafkaRawMessageSet() { } @@ -492,6 +554,59 @@ SrsKafkaRawMessageSet::~SrsKafkaRawMessageSet() messages.clear(); } +void SrsKafkaRawMessageSet::append(SrsKafkaRawMessage* msg) +{ + messages.push_back(msg); +} + +int SrsKafkaRawMessageSet::nb_bytes() +{ + int s = 0; + + vector::iterator it; + for (it = messages.begin(); it != messages.end(); ++it) { + SrsKafkaRawMessage* message = *it; + s += message->nb_bytes(); + } + + return s; +} + +int SrsKafkaRawMessageSet::encode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + vector::iterator it; + for (it = messages.begin(); it != messages.end(); ++it) { + SrsKafkaRawMessage* message = *it; + if ((ret = message->encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode message set failed. ret=%d", ret); + return ret; + } + } + + return ret; +} + +int SrsKafkaRawMessageSet::decode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + while (!buf->empty()) { + SrsKafkaRawMessage* message = new SrsKafkaRawMessage(); + + if ((ret = message->decode(buf)) != ERROR_SUCCESS) { + srs_freep(message); + srs_error("kafka decode message set elem failed. ret=%d", ret); + return ret; + } + + messages.push_back(message); + } + + return ret; +} + SrsKafkaRequest::SrsKafkaRequest() { header.set_correlation_id(SrsKafkaCorrelationPool::instance()->generate_correlation_id()); @@ -863,6 +978,155 @@ int SrsKafkaTopicMetadataResponse::decode(SrsBuffer* buf) return ret; } +int SrsKafkaProducerPartitionMessages::nb_bytes() +{ + return 4 + 4 + messages.nb_bytes(); +} + +int SrsKafkaProducerPartitionMessages::encode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(4 + 4)) { + ret = ERROR_KAFKA_CODEC_PRODUCER; + srs_error("kafka encode producer failed. ret=%d", ret); + return ret; + } + buf->write_4bytes(partition); + buf->write_4bytes(message_set_size); + + if ((ret = messages.encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode producer messages failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsKafkaProducerPartitionMessages::decode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(4 + 4)) { + ret = ERROR_KAFKA_CODEC_PRODUCER; + srs_error("kafka decode producer failed. ret=%d", ret); + return ret; + } + partition = buf->read_4bytes(); + message_set_size = buf->read_4bytes(); + + // for the message set decode util empty, we must create a new buffer when + // there exists other objects after message set. + if (buf->size() - buf->pos() != message_set_size) { + SrsBuffer* tbuf = new SrsBuffer(); + SrsAutoFree(SrsBuffer, tbuf); + + if ((ret = tbuf->initialize(buf->data() + buf->pos(), message_set_size)) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = messages.decode(tbuf)) != ERROR_SUCCESS) { + srs_error("kafka decode procuder messages failed. ret=%d", ret); + return ret; + } + } else { + if ((ret = messages.decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode procuder messages failed. ret=%d", ret); + return ret; + } + } + + return ret; +} + +int SrsKafkaProducerTopicMessages::nb_bytes() +{ + return topic_name.nb_bytes() + partitions.nb_bytes(); +} + +int SrsKafkaProducerTopicMessages::encode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if ((ret = topic_name.encode(buf)) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = partitions.encode(buf)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsKafkaProducerTopicMessages::decode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if ((ret = topic_name.decode(buf)) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = partitions.decode(buf)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +SrsKafkaProducerRequest::SrsKafkaProducerRequest() +{ + required_acks = 0; + timeout = 0; +} + +SrsKafkaProducerRequest::~SrsKafkaProducerRequest() +{ +} + +int SrsKafkaProducerRequest::nb_bytes() +{ + return 2 + 4 + topics.nb_bytes(); +} + +int SrsKafkaProducerRequest::encode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(2 + 4)) { + ret = ERROR_KAFKA_CODEC_PRODUCER; + srs_error("kafka encode producer failed. ret=%d", ret); + return ret; + } + buf->write_2bytes(required_acks); + buf->write_4bytes(timeout); + + if ((ret = topics.encode(buf)) != ERROR_SUCCESS) { + srs_error("kafka encode producer topics failed. ret=%d", ret); + } + + return ret; +} + +int SrsKafkaProducerRequest::decode(SrsBuffer* buf) +{ + int ret = ERROR_SUCCESS; + + if (!buf->require(2 + 4)) { + ret = ERROR_KAFKA_CODEC_PRODUCER; + srs_error("kafka decode producer failed. ret=%d", ret); + return ret; + } + required_acks = buf->read_2bytes(); + timeout = buf->read_4bytes(); + + if ((ret = topics.decode(buf)) != ERROR_SUCCESS) { + srs_error("kafka decode producer topics failed. ret=%d", ret); + } + + return ret; +} + SrsKafkaCorrelationPool* SrsKafkaCorrelationPool::_instance = new SrsKafkaCorrelationPool(); SrsKafkaCorrelationPool* SrsKafkaCorrelationPool::instance() @@ -1085,6 +1349,13 @@ int SrsKafkaClient::fetch_metadata(string topic, SrsKafkaTopicMetadataResponse** return ret; } +int SrsKafkaClient::write_messages(std::string topic, int32_t partition, vector& msgs) +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + vector srs_kafka_array2vector(SrsKafkaArray* arr) { vector strs; diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index 4cb5c676e..6764a6e35 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -39,6 +39,7 @@ class SrsFastStream; class ISrsProtocolReaderWriter; +class SrsJsonObject; #ifdef SRS_AUTO_KAFKA @@ -484,7 +485,7 @@ public: * the kafka message in message set. * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets */ -struct SrsKafkaRawMessage +struct SrsKafkaRawMessage : public ISrsCodec { // metadata. public: @@ -530,19 +531,32 @@ public: public: SrsKafkaRawMessage(); virtual ~SrsKafkaRawMessage(); +// interface ISrsCodec +public: + virtual int nb_bytes(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); }; /** * a set of kafka message. * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets + * @remark because the message set are not preceded by int32, so we decode the buffer util empty. */ -class SrsKafkaRawMessageSet +class SrsKafkaRawMessageSet : public ISrsCodec { private: std::vector messages; public: SrsKafkaRawMessageSet(); virtual ~SrsKafkaRawMessageSet(); +public: + virtual void append(SrsKafkaRawMessage* msg); +// interface ISrsCodec +public: + virtual int nb_bytes(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); }; /** @@ -704,6 +718,94 @@ public: virtual int decode(SrsBuffer* buf); }; + +/** + * the messages for producer to send. + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest + */ +struct SrsKafkaProducerPartitionMessages : public ISrsCodec +{ +public: + /** + * The partition that data is being published to. + */ + int32_t partition; + /** + * The size, in bytes, of the message set that follows. + */ + int32_t message_set_size; + /** + * messages in set. + */ + SrsKafkaRawMessageSet messages; +// interface ISrsCodec +public: + virtual int nb_bytes(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); +}; +struct SrsKafkaProducerTopicMessages : public ISrsCodec +{ +public: + /** + * The topic that data is being published to. + */ + SrsKafkaString topic_name; + /** + * messages of partitions. + */ + SrsKafkaArray partitions; +// interface ISrsCodec +public: + virtual int nb_bytes(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); +}; + +/** + * the request for producer to send message. + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest + */ +class SrsKafkaProducerRequest : public SrsKafkaRequest +{ +private: + /** + * This field indicates how many acknowledgements the servers should receive + * before responding to the request. If it is 0 the server will not send any + * response (this is the only case where the server will not reply to a request). + * If it is 1, the server will wait the data is written to the local log + * before sending a response. If it is -1 the server will block until the + * message is committed by all in sync replicas before sending a response. + * For any number > 1 the server will block waiting for this number of + * acknowledgements to occur (but the server will never wait for more + * acknowledgements than there are in-sync replicas). + */ + int16_t required_acks; + /** + * This provides a maximum time in milliseconds the server can await the receipt + * of the number of acknowledgements in RequiredAcks. The timeout is not an exact + * limit on the request time for a few reasons: (1) it does not include network + * latency, (2) the timer begins at the beginning of the processing of this request + * so if many requests are queued due to server overload that wait time will not + * be included, (3) we will not terminate a local write so if the local write + * time exceeds this timeout it will not be respected. To get a hard timeout of + * this type the client should use the socket timeout. + */ + int32_t timeout; + /** + * messages of topics. + */ + SrsKafkaArray topics; +public: + SrsKafkaProducerRequest(); + virtual ~SrsKafkaProducerRequest(); +// interface ISrsCodec +public: + virtual int nb_bytes(); + virtual int encode(SrsBuffer* buf); + virtual int decode(SrsBuffer* buf); +}; + /** * the poll to discovery reponse. * @param CorrelationId This is a user-supplied integer. It will be passed back @@ -813,6 +915,10 @@ public: * fetch the metadata from broker for topic. */ virtual int fetch_metadata(std::string topic, SrsKafkaTopicMetadataResponse** pmsg); + /** + * write the messages to partition of topic. + */ + virtual int write_messages(std::string topic, int32_t partition, std::vector& msgs); }; // convert kafka array[string] to vector[string]