1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

add producer api messages.

This commit is contained in:
winlin 2015-10-23 09:55:29 +08:00
parent 7a0aaf5900
commit 8e344f1c26
5 changed files with 403 additions and 11 deletions

View file

@ -33,6 +33,7 @@ using namespace std;
#include <srs_protocol_stream.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_protocol_json.hpp>
#ifdef SRS_AUTO_KAFKA
@ -478,6 +479,67 @@ SrsKafkaRawMessage::~SrsKafkaRawMessage()
srs_freep(value);
}
int SrsKafkaRawMessage::nb_bytes()
{
return 8 + 4 + 4 + 1 + 1 + key->nb_bytes() + value->nb_bytes();
}
int SrsKafkaRawMessage::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
if (!buf->require(8 + 4 + 4 + 1 + 1)) {
ret = ERROR_KAFKA_CODEC_MESSAGE;
srs_error("kafka encode message failed. ret=%d", ret);
return ret;
}
buf->write_8bytes(offset);
buf->write_4bytes(message_size);
buf->write_4bytes(crc);
buf->write_1bytes(magic_byte);
buf->write_1bytes(attributes);
if ((ret = key->encode(buf)) != ERROR_SUCCESS) {
srs_error("kafka encode message key failed. ret=%d", ret);
return ret;
}
if ((ret = value->encode(buf)) != ERROR_SUCCESS) {
srs_error("kafka encode message value failed. ret=%d", ret);
return ret;
}
return ret;
}
int SrsKafkaRawMessage::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
if (!buf->require(8 + 4 + 4 + 1 + 1)) {
ret = ERROR_KAFKA_CODEC_MESSAGE;
srs_error("kafka decode message failed. ret=%d", ret);
return ret;
}
offset = buf->read_8bytes();
message_size = buf->read_4bytes();
crc = buf->read_4bytes();
magic_byte = buf->read_1bytes();
attributes = buf->read_1bytes();
if ((ret = key->decode(buf)) != ERROR_SUCCESS) {
srs_error("kafka decode message key failed. ret=%d", ret);
return ret;
}
if ((ret = value->decode(buf)) != ERROR_SUCCESS) {
srs_error("kafka decode message value failed. ret=%d", ret);
return ret;
}
return ret;
}
SrsKafkaRawMessageSet::SrsKafkaRawMessageSet()
{
}
@ -492,6 +554,59 @@ SrsKafkaRawMessageSet::~SrsKafkaRawMessageSet()
messages.clear();
}
void SrsKafkaRawMessageSet::append(SrsKafkaRawMessage* msg)
{
messages.push_back(msg);
}
int SrsKafkaRawMessageSet::nb_bytes()
{
int s = 0;
vector<SrsKafkaRawMessage*>::iterator it;
for (it = messages.begin(); it != messages.end(); ++it) {
SrsKafkaRawMessage* message = *it;
s += message->nb_bytes();
}
return s;
}
int SrsKafkaRawMessageSet::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
vector<SrsKafkaRawMessage*>::iterator it;
for (it = messages.begin(); it != messages.end(); ++it) {
SrsKafkaRawMessage* message = *it;
if ((ret = message->encode(buf)) != ERROR_SUCCESS) {
srs_error("kafka encode message set failed. ret=%d", ret);
return ret;
}
}
return ret;
}
int SrsKafkaRawMessageSet::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
while (!buf->empty()) {
SrsKafkaRawMessage* message = new SrsKafkaRawMessage();
if ((ret = message->decode(buf)) != ERROR_SUCCESS) {
srs_freep(message);
srs_error("kafka decode message set elem failed. ret=%d", ret);
return ret;
}
messages.push_back(message);
}
return ret;
}
SrsKafkaRequest::SrsKafkaRequest()
{
header.set_correlation_id(SrsKafkaCorrelationPool::instance()->generate_correlation_id());
@ -863,6 +978,155 @@ int SrsKafkaTopicMetadataResponse::decode(SrsBuffer* buf)
return ret;
}
int SrsKafkaProducerPartitionMessages::nb_bytes()
{
return 4 + 4 + messages.nb_bytes();
}
int SrsKafkaProducerPartitionMessages::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
if (!buf->require(4 + 4)) {
ret = ERROR_KAFKA_CODEC_PRODUCER;
srs_error("kafka encode producer failed. ret=%d", ret);
return ret;
}
buf->write_4bytes(partition);
buf->write_4bytes(message_set_size);
if ((ret = messages.encode(buf)) != ERROR_SUCCESS) {
srs_error("kafka encode producer messages failed. ret=%d", ret);
return ret;
}
return ret;
}
int SrsKafkaProducerPartitionMessages::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
if (!buf->require(4 + 4)) {
ret = ERROR_KAFKA_CODEC_PRODUCER;
srs_error("kafka decode producer failed. ret=%d", ret);
return ret;
}
partition = buf->read_4bytes();
message_set_size = buf->read_4bytes();
// for the message set decode util empty, we must create a new buffer when
// there exists other objects after message set.
if (buf->size() - buf->pos() != message_set_size) {
SrsBuffer* tbuf = new SrsBuffer();
SrsAutoFree(SrsBuffer, tbuf);
if ((ret = tbuf->initialize(buf->data() + buf->pos(), message_set_size)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = messages.decode(tbuf)) != ERROR_SUCCESS) {
srs_error("kafka decode procuder messages failed. ret=%d", ret);
return ret;
}
} else {
if ((ret = messages.decode(buf)) != ERROR_SUCCESS) {
srs_error("kafka decode procuder messages failed. ret=%d", ret);
return ret;
}
}
return ret;
}
int SrsKafkaProducerTopicMessages::nb_bytes()
{
return topic_name.nb_bytes() + partitions.nb_bytes();
}
int SrsKafkaProducerTopicMessages::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
if ((ret = topic_name.encode(buf)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = partitions.encode(buf)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsKafkaProducerTopicMessages::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
if ((ret = topic_name.decode(buf)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = partitions.decode(buf)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
SrsKafkaProducerRequest::SrsKafkaProducerRequest()
{
required_acks = 0;
timeout = 0;
}
SrsKafkaProducerRequest::~SrsKafkaProducerRequest()
{
}
int SrsKafkaProducerRequest::nb_bytes()
{
return 2 + 4 + topics.nb_bytes();
}
int SrsKafkaProducerRequest::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
if (!buf->require(2 + 4)) {
ret = ERROR_KAFKA_CODEC_PRODUCER;
srs_error("kafka encode producer failed. ret=%d", ret);
return ret;
}
buf->write_2bytes(required_acks);
buf->write_4bytes(timeout);
if ((ret = topics.encode(buf)) != ERROR_SUCCESS) {
srs_error("kafka encode producer topics failed. ret=%d", ret);
}
return ret;
}
int SrsKafkaProducerRequest::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
if (!buf->require(2 + 4)) {
ret = ERROR_KAFKA_CODEC_PRODUCER;
srs_error("kafka decode producer failed. ret=%d", ret);
return ret;
}
required_acks = buf->read_2bytes();
timeout = buf->read_4bytes();
if ((ret = topics.decode(buf)) != ERROR_SUCCESS) {
srs_error("kafka decode producer topics failed. ret=%d", ret);
}
return ret;
}
SrsKafkaCorrelationPool* SrsKafkaCorrelationPool::_instance = new SrsKafkaCorrelationPool();
SrsKafkaCorrelationPool* SrsKafkaCorrelationPool::instance()
@ -1085,6 +1349,13 @@ int SrsKafkaClient::fetch_metadata(string topic, SrsKafkaTopicMetadataResponse**
return ret;
}
int SrsKafkaClient::write_messages(std::string topic, int32_t partition, vector<SrsJsonObject*>& msgs)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
}
vector<string> srs_kafka_array2vector(SrsKafkaArray<SrsKafkaString>* arr)
{
vector<string> strs;

View file

@ -39,6 +39,7 @@
class SrsFastStream;
class ISrsProtocolReaderWriter;
class SrsJsonObject;
#ifdef SRS_AUTO_KAFKA
@ -484,7 +485,7 @@ public:
* the kafka message in message set.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
*/
struct SrsKafkaRawMessage
struct SrsKafkaRawMessage : public ISrsCodec
{
// metadata.
public:
@ -530,19 +531,32 @@ public:
public:
SrsKafkaRawMessage();
virtual ~SrsKafkaRawMessage();
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
};
/**
* a set of kafka message.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
* @remark because the message set are not preceded by int32, so we decode the buffer util empty.
*/
class SrsKafkaRawMessageSet
class SrsKafkaRawMessageSet : public ISrsCodec
{
private:
std::vector<SrsKafkaRawMessage*> messages;
public:
SrsKafkaRawMessageSet();
virtual ~SrsKafkaRawMessageSet();
public:
virtual void append(SrsKafkaRawMessage* msg);
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
};
/**
@ -704,6 +718,94 @@ public:
virtual int decode(SrsBuffer* buf);
};
/**
* the messages for producer to send.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest
*/
struct SrsKafkaProducerPartitionMessages : public ISrsCodec
{
public:
/**
* The partition that data is being published to.
*/
int32_t partition;
/**
* The size, in bytes, of the message set that follows.
*/
int32_t message_set_size;
/**
* messages in set.
*/
SrsKafkaRawMessageSet messages;
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
};
struct SrsKafkaProducerTopicMessages : public ISrsCodec
{
public:
/**
* The topic that data is being published to.
*/
SrsKafkaString topic_name;
/**
* messages of partitions.
*/
SrsKafkaArray<SrsKafkaProducerPartitionMessages> partitions;
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
};
/**
* the request for producer to send message.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest
*/
class SrsKafkaProducerRequest : public SrsKafkaRequest
{
private:
/**
* This field indicates how many acknowledgements the servers should receive
* before responding to the request. If it is 0 the server will not send any
* response (this is the only case where the server will not reply to a request).
* If it is 1, the server will wait the data is written to the local log
* before sending a response. If it is -1 the server will block until the
* message is committed by all in sync replicas before sending a response.
* For any number > 1 the server will block waiting for this number of
* acknowledgements to occur (but the server will never wait for more
* acknowledgements than there are in-sync replicas).
*/
int16_t required_acks;
/**
* This provides a maximum time in milliseconds the server can await the receipt
* of the number of acknowledgements in RequiredAcks. The timeout is not an exact
* limit on the request time for a few reasons: (1) it does not include network
* latency, (2) the timer begins at the beginning of the processing of this request
* so if many requests are queued due to server overload that wait time will not
* be included, (3) we will not terminate a local write so if the local write
* time exceeds this timeout it will not be respected. To get a hard timeout of
* this type the client should use the socket timeout.
*/
int32_t timeout;
/**
* messages of topics.
*/
SrsKafkaArray<SrsKafkaProducerTopicMessages> topics;
public:
SrsKafkaProducerRequest();
virtual ~SrsKafkaProducerRequest();
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
};
/**
* the poll to discovery reponse.
* @param CorrelationId This is a user-supplied integer. It will be passed back
@ -813,6 +915,10 @@ public:
* fetch the metadata from broker for topic.
*/
virtual int fetch_metadata(std::string topic, SrsKafkaTopicMetadataResponse** pmsg);
/**
* write the messages to partition of topic.
*/
virtual int write_messages(std::string topic, int32_t partition, std::vector<SrsJsonObject*>& msgs);
};
// convert kafka array[string] to vector[string]