1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #913, APP support complex error.

This commit is contained in:
winlin 2018-01-01 21:20:57 +08:00
parent 6eae93258a
commit e2c1f58674
13 changed files with 423 additions and 536 deletions

View file

@ -152,32 +152,27 @@ string SrsKafkaPartition::hostport()
return ep;
}
int SrsKafkaPartition::connect()
srs_error_t SrsKafkaPartition::connect()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
if (transport) {
return ret;
return err;
}
transport = new SrsTcpClient(host, port, SRS_KAFKA_PRODUCER_TIMEOUT);
kafka = new SrsKafkaClient(transport);
if ((err = transport->connect()) != srs_success) {
disconnect();
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
srs_error("connect to %s partition=%d failed. ret=%d", hostport().c_str(), id, ret);
return ret;
return srs_error_wrap(err, "connect to %s partition=%d failed", hostport().c_str(), id);
}
srs_trace("connect at %s, partition=%d, broker=%d", hostport().c_str(), id, broker);
return ret;
return err;
}
int SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc)
srs_error_t SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc)
{
return kafka->write_messages(topic, id, *pc);
}
@ -202,12 +197,12 @@ SrsKafkaMessage::~SrsKafkaMessage()
srs_error_t SrsKafkaMessage::call()
{
int ret = producer->send(key, obj);
srs_error_t err = producer->send(key, obj);
// the obj is manged by producer now.
obj = NULL;
return srs_error_new(ret, "kafka send");
return srs_error_wrap(err, "kafka send");
}
string SrsKafkaMessage::to_string()
@ -281,9 +276,9 @@ bool SrsKafkaCache::fetch(int* pkey, SrsKafkaPartitionCache** ppc)
return false;
}
int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc)
srs_error_t SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// ensure the key exists.
srs_assert (cache.find(key) != cache.end());
@ -292,19 +287,17 @@ int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitio
// we remember the messages we have written and clear it when completed.
int nb_msgs = (int)pc->size();
if (pc->empty()) {
return ret;
return err;
}
// connect transport.
if ((ret = partition->connect()) != ERROR_SUCCESS) {
srs_error("kafka connect to partition failed. ret=%d", ret);
return ret;
if ((err = partition->connect()) != srs_success) {
return srs_error_wrap(err, "connect partition");
}
// write the json objects.
if ((ret = partition->flush(pc)) != ERROR_SUCCESS) {
srs_error("kafka write messages failed. ret=%d", ret);
return ret;
if ((err = partition->flush(pc)) != srs_success) {
return srs_error_wrap(err, "flush partition");
}
// free all wrote messages.
@ -320,7 +313,7 @@ int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitio
pc->erase(pc->begin(), pc->begin() + nb_msgs);
}
return ret;
return err;
}
ISrsKafkaCluster::ISrsKafkaCluster()
@ -432,16 +425,16 @@ void SrsKafkaProducer::stop()
worker->stop();
}
int SrsKafkaProducer::send(int key, SrsJsonObject* obj)
srs_error_t SrsKafkaProducer::send(int key, SrsJsonObject* obj)
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// cache the json object.
cache->append(key, obj);
// too few messages, ignore.
if (cache->size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) {
return ret;
return err;
}
// too many messages, warn user.
@ -454,10 +447,10 @@ int SrsKafkaProducer::send(int key, SrsJsonObject* obj)
// flush message when metadata is ok.
if (metadata_ok) {
ret = flush();
err = flush();
}
return ret;
return err;
}
srs_error_t SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip)
@ -537,7 +530,6 @@ void SrsKafkaProducer::clear_metadata()
srs_error_t SrsKafkaProducer::do_cycle()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// ignore when disabled.
@ -546,28 +538,27 @@ srs_error_t SrsKafkaProducer::do_cycle()
}
// when kafka enabled, request metadata when startup.
if ((ret = request_metadata()) != ERROR_SUCCESS) {
return srs_error_new(ret, "request metadata");
if ((err = request_metadata()) != srs_success) {
return srs_error_wrap(err, "request metadata");
}
return err;
}
int SrsKafkaProducer::request_metadata()
srs_error_t SrsKafkaProducer::request_metadata()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// ignore when disabled.
if (!enabled) {
return ret;
return err;
}
// select one broker to connect to.
SrsConfDirective* brokers = _srs_config->get_kafka_brokers();
if (!brokers) {
srs_warn("ignore for empty brokers.");
return ret;
return err;
}
std::string server;
@ -594,18 +585,13 @@ int SrsKafkaProducer::request_metadata()
// reconnect to kafka server.
if ((err = transport->connect()) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret);
return ret;
return srs_error_wrap(err, "connect %s:%d failed", server.c_str(), port);
}
// do fetch medata from broker.
SrsKafkaTopicMetadataResponse* metadata = NULL;
if ((ret = kafka->fetch_metadata(topic, &metadata)) != ERROR_SUCCESS) {
srs_error("kafka fetch metadata failed. ret=%d", ret);
return ret;
if ((err = kafka->fetch_metadata(topic, &metadata)) != srs_success) {
return srs_error_wrap(err, "fetch metadata");
}
SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata);
@ -615,7 +601,7 @@ int SrsKafkaProducer::request_metadata()
SrsKafkaTopicMetadata* topic = metadata->metadatas.at(0);
if (topic->metadatas.empty()) {
srs_warn("topic %s metadata empty, retry.", topic->name.to_str().c_str());
return ret;
return err;
}
}
@ -632,7 +618,7 @@ int SrsKafkaProducer::request_metadata()
metadata_ok = true;
return ret;
return err;
}
void SrsKafkaProducer::refresh_metadata()
@ -644,9 +630,9 @@ void SrsKafkaProducer::refresh_metadata()
srs_trace("kafka async refresh metadata in background");
}
int SrsKafkaProducer::flush()
srs_error_t SrsKafkaProducer::flush()
{
int ret = ERROR_SUCCESS;
srs_error_t err = srs_success;
// flush all available partition caches.
while (true) {
@ -661,13 +647,12 @@ int SrsKafkaProducer::flush()
// flush specified partition.
srs_assert(key >= 0 && pc);
SrsKafkaPartition* partition = partitions.at(key % partitions.size());
if ((ret = cache->flush(partition, key, pc)) != ERROR_SUCCESS) {
srs_error("flush partition failed. ret=%d", ret);
return ret;
if ((err = cache->flush(partition, key, pc)) != srs_success) {
return srs_error_wrap(err, "flush partition");
}
}
return ret;
return err;
}
#endif