1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

kafka encode and send packet.

This commit is contained in:
winlin 2015-10-16 17:18:16 +08:00
parent 9117e1e918
commit 3c64e4b957
3 changed files with 335 additions and 39 deletions

View file

@ -29,22 +29,23 @@ using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_core_autofree.hpp>
#include <srs_kernel_log.hpp>
#include <srs_protocol_io.hpp>
#ifdef SRS_AUTO_KAFKA
SrsKafkaString::SrsKafkaString()
{
size = -1;
_size = -1;
data = NULL;
}
SrsKafkaString::SrsKafkaString(string v)
{
size = (int16_t)v.length();
_size = (int16_t)v.length();
srs_assert(size > 0);
data = new char[size];
memcpy(data, v.data(), size);
srs_assert(_size > 0);
data = new char[_size];
memcpy(data, v.data(), _size);
}
SrsKafkaString::~SrsKafkaString()
@ -54,32 +55,92 @@ SrsKafkaString::~SrsKafkaString()
bool SrsKafkaString::null()
{
return size == -1;
return _size == -1;
}
bool SrsKafkaString::empty()
{
return size <= 0;
return _size <= 0;
}
int SrsKafkaString::total_size()
int SrsKafkaString::size()
{
return 2 + (size == -1? 0 : size);
return _size == -1? 2 : 2 + _size;
}
int SrsKafkaString::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
if (!buf->require(2)) {
ret = ERROR_KAFKA_CODEC_STRING;
srs_error("kafka encode string failed. ret=%d", ret);
return ret;
}
buf->write_2bytes(_size);
if (_size <= 0) {
return ret;
}
if (!buf->require(_size)) {
ret = ERROR_KAFKA_CODEC_STRING;
srs_error("kafka encode string data failed. ret=%d", ret);
return ret;
}
buf->write_bytes(data, _size);
return ret;
}
int SrsKafkaString::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
if (!buf->require(2)) {
ret = ERROR_KAFKA_CODEC_STRING;
srs_error("kafka decode string failed. ret=%d", ret);
return ret;
}
_size = buf->read_2bytes();
if (_size != -1 && _size < 0) {
ret = ERROR_KAFKA_CODEC_STRING;
srs_error("kafka string must be -1 or >=0, actual is %d. ret=%d", _size, ret);
return ret;
}
if (_size <= 0) {
return ret;
}
if (!buf->require(_size)) {
ret = ERROR_KAFKA_CODEC_STRING;
srs_error("kafka decode string data failed. ret=%d", ret);
return ret;
}
srs_freep(data);
data = new char[_size];
buf->read_bytes(data, _size);
return ret;
}
SrsKafkaBytes::SrsKafkaBytes()
{
size = -1;
_size = -1;
data = NULL;
}
SrsKafkaBytes::SrsKafkaBytes(const char* v, int nb_v)
{
size = (int16_t)nb_v;
_size = (int16_t)nb_v;
srs_assert(size > 0);
data = new char[size];
memcpy(data, v, size);
srs_assert(_size > 0);
data = new char[_size];
memcpy(data, v, _size);
}
SrsKafkaBytes::~SrsKafkaBytes()
@ -89,17 +150,72 @@ SrsKafkaBytes::~SrsKafkaBytes()
bool SrsKafkaBytes::null()
{
return size == -1;
return _size == -1;
}
bool SrsKafkaBytes::empty()
{
return size <= 0;
return _size <= 0;
}
int SrsKafkaBytes::total_size()
int SrsKafkaBytes::size()
{
return 4 + (size == -1? 0 : size);
return 4 + (_size == -1? 0 : _size);
}
int SrsKafkaBytes::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
if (!buf->require(4)) {
ret = ERROR_KAFKA_CODEC_BYTES;
srs_error("kafka encode bytes failed. ret=%d", ret);
return ret;
}
buf->write_4bytes(_size);
if (_size <= 0) {
return ret;
}
if (!buf->require(_size)) {
ret = ERROR_KAFKA_CODEC_BYTES;
srs_error("kafka encode bytes data failed. ret=%d", ret);
return ret;
}
buf->write_bytes(data, _size);
return ret;
}
int SrsKafkaBytes::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
if (!buf->require(4)) {
ret = ERROR_KAFKA_CODEC_BYTES;
srs_error("kafka decode bytes failed. ret=%d", ret);
return ret;
}
_size = buf->read_4bytes();
if (_size != -1 && _size < 0) {
ret = ERROR_KAFKA_CODEC_BYTES;
srs_error("kafka bytes must be -1 or >=0, actual is %d. ret=%d", _size, ret);
return ret;
}
if (!buf->require(_size)) {
ret = ERROR_KAFKA_CODEC_BYTES;
srs_error("kafka decode bytes data failed. ret=%d", ret);
return ret;
}
srs_freep(data);
data = new char[_size];
buf->read_bytes(data, _size);
return ret;
}
SrsKafkaRequestHeader::SrsKafkaRequestHeader()
@ -117,7 +233,7 @@ SrsKafkaRequestHeader::~SrsKafkaRequestHeader()
int SrsKafkaRequestHeader::header_size()
{
return 2 + 2 + 4 + client_id->total_size();
return 2 + 2 + 4 + client_id->size();
}
int SrsKafkaRequestHeader::message_size()
@ -178,14 +294,56 @@ int SrsKafkaRequestHeader::size()
int SrsKafkaRequestHeader::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
if (!buf->require(4 + _size)) {
ret = ERROR_KAFKA_CODEC_REQUEST;
srs_error("kafka encode request failed. ret=%d", ret);
return ret;
}
buf->write_4bytes(_size);
buf->write_2bytes(api_key);
buf->write_2bytes(api_version);
buf->write_4bytes(correlation_id);
if ((ret = client_id->encode(buf)) != ERROR_SUCCESS) {
srs_error("kafka encode request client_id failed. ret=%d", ret);
return ret;
}
return ret;
}
int SrsKafkaRequestHeader::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
if (!buf->require(4)) {
ret = ERROR_KAFKA_CODEC_REQUEST;
srs_error("kafka decode request size failed. ret=%d", ret);
return ret;
}
_size = buf->read_4bytes();
if (_size <= 0) {
srs_warn("kafka got empty request");
return ret;
}
if (!buf->require(_size)) {
ret = ERROR_KAFKA_CODEC_REQUEST;
srs_error("kafka decode request message failed. ret=%d", ret);
return ret;
}
api_key = buf->read_2bytes();
api_version = buf->read_2bytes();
correlation_id = buf->read_4bytes();
if ((ret = client_id->decode(buf)) != ERROR_SUCCESS) {
srs_error("kafka decode request client_id failed. ret=%d", ret);
return ret;
}
return ret;
}
@ -222,14 +380,42 @@ int SrsKafkaResponseHeader::size()
int SrsKafkaResponseHeader::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
if (!buf->require(4 + _size)) {
ret = ERROR_KAFKA_CODEC_RESPONSE;
srs_error("kafka encode response failed. ret=%d", ret);
return ret;
}
buf->write_4bytes(_size);
buf->write_4bytes(correlation_id);
return ret;
}
int SrsKafkaResponseHeader::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
if (!buf->require(4)) {
ret = ERROR_KAFKA_CODEC_RESPONSE;
srs_error("kafka decode response size failed. ret=%d", ret);
return ret;
}
_size = buf->read_4bytes();
if (_size <= 0) {
srs_warn("kafka got empty response");
return ret;
}
if (!buf->require(_size)) {
ret = ERROR_KAFKA_CODEC_RESPONSE;
srs_error("kafka decode response message failed. ret=%d", ret);
return ret;
}
correlation_id = buf->read_4bytes();
return ret;
}
@ -326,22 +512,40 @@ void SrsKafkaTopicMetadataRequest::add_topic(string topic)
int SrsKafkaTopicMetadataRequest::size()
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
return SrsKafkaRequest::size() + topics.size();
}
int SrsKafkaTopicMetadataRequest::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
if ((ret = SrsKafkaRequest::encode(buf)) != ERROR_SUCCESS) {
srs_error("kafka encode metadata request failed. ret=%d", ret);
return ret;
}
if ((ret = topics.encode(buf)) != ERROR_SUCCESS) {
srs_error("kafka encode metadata topics failed. ret=%d", ret);
return ret;
}
return ret;
}
int SrsKafkaTopicMetadataRequest::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
if ((ret = SrsKafkaRequest::decode(buf)) != ERROR_SUCCESS) {
srs_error("kafka decode metadata request failed. ret=%d", ret);
return ret;
}
if ((ret = topics.decode(buf)) != ERROR_SUCCESS) {
srs_error("kafka decode metadata topics failed. ret=%d", ret);
return ret;
}
return ret;
}
@ -355,14 +559,19 @@ SrsKafkaTopicMetadataResponse::~SrsKafkaTopicMetadataResponse()
int SrsKafkaTopicMetadataResponse::size()
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
return ret;
return SrsKafkaResponse::size();
}
int SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
if ((ret = SrsKafkaResponse::encode(buf)) != ERROR_SUCCESS) {
srs_error("kafka encode metadata response failed. ret=%d", ret);
return ret;
}
// TODO: FIXME: implements it.
return ret;
}
@ -370,6 +579,12 @@ int SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf)
int SrsKafkaTopicMetadataResponse::decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
if ((ret = SrsKafkaResponse::decode(buf)) != ERROR_SUCCESS) {
srs_error("kafka decode metadata response failed. ret=%d", ret);
return ret;
}
// TODO: FIXME: implements it.
return ret;
}
@ -387,7 +602,36 @@ int SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: implements it.
// TODO: FIXME: refine for performance issue.
SrsAutoFree(SrsKafkaRequest, msg);
int size = msg->size();
if (size <= 0) {
return ret;
}
// TODO: FIXME: refine for performance issue.
char* bytes = new char[size];
SrsAutoFree(char, bytes);
// TODO: FIXME: refine for performance issue.
SrsBuffer* buf = new SrsBuffer();
SrsAutoFree(SrsBuffer, buf);
if ((ret = buf->initialize(bytes, size)) != ERROR_SUCCESS) {
srs_error("kafka create buffer failed. ret=%d", ret);
return ret;
}
if ((ret = msg->encode(buf)) != ERROR_SUCCESS) {
srs_error("kafka encode message failed. ret=%d", ret);
return ret;
}
if ((ret = skt->write(bytes, size, NULL)) != ERROR_SUCCESS) {
srs_error("kafka send message failed. ret=%d", ret);
return ret;
}
return ret;
}