1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #913, KAFKA, RTSP support complex error.

This commit is contained in:
winlin 2018-01-01 10:30:31 +08:00
parent 70a20ffadb
commit 15aea686c3
6 changed files with 508 additions and 713 deletions

View file

@ -80,8 +80,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
@ -109,8 +109,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
@ -177,50 +177,44 @@ public:
return s;
}
virtual int encode(SrsBuffer* buf)
virtual srs_error_t encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
if (!buf->require(4)) {
ret = ERROR_KAFKA_CODEC_ARRAY;
srs_error("kafka encode array failed. ret=%d", ret);
return ret;
return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires 4 only %d bytes", buf->left());
}
buf->write_4bytes(length);
for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
T* elem = *it;
if ((ret = elem->encode(buf)) != ERROR_SUCCESS) {
srs_error("kafka encode array elem failed. ret=%d", ret);
return ret;
if ((err = elem->encode(buf)) != srs_success) {
return srs_error_wrap(err, "encode elem");
}
}
return ret;
return err;
}
virtual int decode(SrsBuffer* buf)
virtual srs_error_t decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
if (!buf->require(4)) {
ret = ERROR_KAFKA_CODEC_ARRAY;
srs_error("kafka decode array failed. ret=%d", ret);
return ret;
return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires 4 only %d bytes", buf->left());
}
length = buf->read_4bytes();
for (int i = 0; i < length; i++) {
T* elem = new T();
if ((ret = elem->decode(buf)) != ERROR_SUCCESS) {
srs_error("kafka decode array elem failed. ret=%d", ret);
if ((err = elem->decode(buf)) != srs_success) {
srs_freep(elem);
return ret;
return srs_error_wrap(err, "decode elem");
}
elems.push_back(elem);
}
return ret;
return err;
}
};
template<>
@ -263,14 +257,13 @@ public:
{
return 4 + 4 * (int)elems.size();
}
virtual int encode(SrsBuffer* buf)
virtual srs_error_t encode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
if (!buf->require(4 + sizeof(int32_t) * (int)elems.size())) {
ret = ERROR_KAFKA_CODEC_ARRAY;
srs_error("kafka encode array failed. ret=%d", ret);
return ret;
int nb_required = 4 + sizeof(int32_t) * (int)elems.size();
if (!buf->require(nb_required)) {
return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires %d only %d bytes", nb_required, buf->left());
}
buf->write_4bytes(length);
@ -279,24 +272,20 @@ public:
buf->write_4bytes(elem);
}
return ret;
return err;
}
virtual int decode(SrsBuffer* buf)
virtual srs_error_t decode(SrsBuffer* buf)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
if (!buf->require(4)) {
ret = ERROR_KAFKA_CODEC_ARRAY;
srs_error("kafka decode array failed. ret=%d", ret);
return ret;
return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires 4 only %d bytes", buf->left());
}
length = buf->read_4bytes();
for (int i = 0; i < length; i++) {
if (!buf->require(sizeof(int32_t))) {
ret = ERROR_KAFKA_CODEC_ARRAY;
srs_error("kafka decode array elem failed. ret=%d", ret);
return ret;
return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires %d only %d bytes", sizeof(int32_t), buf->left());
}
@ -304,7 +293,7 @@ public:
elems.push_back(elem);
}
return ret;
return err;
}
};
@ -414,8 +403,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
@ -480,8 +469,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
@ -538,7 +527,7 @@ public:
/**
* create message from json object.
*/
virtual int create(SrsJsonObject* obj);
virtual srs_error_t create(SrsJsonObject* obj);
private:
/**
* get the raw message, bytes after the message_size.
@ -547,8 +536,8 @@ private:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
@ -568,8 +557,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
@ -599,8 +588,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
@ -622,8 +611,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
@ -652,8 +641,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
@ -672,8 +661,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
struct SrsKafkaPartitionMetadata : public ISrsCodec
{
@ -689,8 +678,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
struct SrsKafkaTopicMetadata : public ISrsCodec
{
@ -704,8 +693,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
@ -727,8 +716,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
@ -754,8 +743,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
struct SrsKafkaProducerTopicMessages : public ISrsCodec
{
@ -771,8 +760,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
@ -815,8 +804,8 @@ public:
// interface ISrsCodec
public:
virtual int nb_bytes();
virtual int encode(SrsBuffer* buf);
virtual int decode(SrsBuffer* buf);
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
@ -875,26 +864,25 @@ public:
* write the message to kafka server.
* @param msg the msg to send. user must not free it again.
*/
virtual int send_and_free_message(SrsKafkaRequest* msg);
virtual srs_error_t send_and_free_message(SrsKafkaRequest* msg);
/**
* read the message from kafka server.
* @param pmsg output the received message. user must free it.
*/
virtual int recv_message(SrsKafkaResponse** pmsg);
virtual srs_error_t recv_message(SrsKafkaResponse** pmsg);
public:
/**
* expect specified message.
*/
template<typename T>
int expect_message(T** pmsg)
srs_error_t expect_message(T** pmsg)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
while (true) {
SrsKafkaResponse* res = NULL;
if ((ret = recv_message(&res)) != ERROR_SUCCESS) {
srs_error("recv response failed. ret=%d", ret);
return ret;
if ((err = recv_message(&res)) != srs_success) {
return srs_error_wrap(err, "recv message");
}
// drop not matched.
@ -909,7 +897,7 @@ public:
break;
}
return ret;
return err;
}
};
@ -927,11 +915,11 @@ public:
/**
* fetch the metadata from broker for topic.
*/
virtual int fetch_metadata(std::string topic, SrsKafkaTopicMetadataResponse** pmsg);
virtual srs_error_t fetch_metadata(std::string topic, SrsKafkaTopicMetadataResponse** pmsg);
/**
* write the messages to partition of topic.
*/
virtual int write_messages(std::string topic, int32_t partition, std::vector<SrsJsonObject*>& msgs);
virtual srs_error_t write_messages(std::string topic, int32_t partition, std::vector<SrsJsonObject*>& msgs);
};
// convert kafka array[string] to vector[string]