
For #913, coroutine support complex error.

winlin 2017-06-11 18:44:20 +08:00
parent 9ae54850bf
commit 9db2a04c3b
38 changed files with 620 additions and 414 deletions
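
The diff below migrates SrsKafkaProducer from int/ERROR_SUCCESS return codes to the srs_error_t complex-error style: a callee returns an error object (or srs_success), and the caller adds context with srs_error_wrap() instead of logging and returning a bare code. As a rough illustration of that pattern only, here is a minimal, self-contained sketch; SrsCplxErrorSketch, the constructor arguments, and the string-based signatures are simplified stand-ins, not the real SRS declarations.

// Minimal sketch of the complex-error pattern (illustrative stand-ins only;
// the real SRS types and signatures differ, e.g. printf-style formatting).
#include <cstdio>
#include <string>

struct SrsCplxErrorSketch;                 // hypothetical stand-in for the SRS error object
typedef SrsCplxErrorSketch* srs_error_t;
#define srs_success NULL

struct SrsCplxErrorSketch {
    int code;
    std::string msg;
    srs_error_t wrapped;                   // the wrapped cause, if any
    SrsCplxErrorSketch(int c, const std::string& m, srs_error_t w) : code(c), msg(m), wrapped(w) {}
    ~SrsCplxErrorSketch() { delete wrapped; }
};

// Create a new error from an int code, as srs_error_new(ret, ...) does in the diff.
srs_error_t srs_error_new(int code, const std::string& msg) {
    return new SrsCplxErrorSketch(code, msg, NULL);
}

// Add context while keeping the original cause, as srs_error_wrap(err, ...) does in the diff.
srs_error_t srs_error_wrap(srs_error_t err, const std::string& msg) {
    return new SrsCplxErrorSketch(err ? err->code : 0, msg, err);
}

// Render the whole chain, outermost context first.
std::string srs_error_desc(srs_error_t err) {
    std::string desc;
    for (srs_error_t p = err; p; p = p->wrapped) {
        if (!desc.empty()) desc += " : ";
        desc += p->msg;
    }
    return desc;
}

// A callee fails with a concrete error; the caller only wraps it with context.
srs_error_t start_worker() {
    return srs_error_new(1008, "connect broker");
}

srs_error_t start_producer() {
    srs_error_t err = srs_success;
    if ((err = start_worker()) != srs_success) {
        return srs_error_wrap(err, "async worker");
    }
    return err;
}

int main() {
    srs_error_t err = start_producer();
    if (err != srs_success) {
        printf("KafkaProducer: Ignore error, %s\n", srs_error_desc(err).c_str());
        delete err;                        // the diff uses srs_freep(err) for this
    }
    return 0;
}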


@@ -332,7 +332,6 @@ ISrsKafkaCluster* _srs_kafka = NULL;
 srs_error_t srs_initialize_kafka()
 {
-    int ret = ERROR_SUCCESS;
     srs_error_t err = srs_success;
 
     SrsKafkaProducer* kafka = new SrsKafkaProducer();
@@ -342,8 +341,8 @@ srs_error_t srs_initialize_kafka()
         return srs_error_wrap(err, "initialize kafka producer");
     }
 
-    if ((ret = kafka->start()) != ERROR_SUCCESS) {
-        return srs_error_new(ret, "start kafka producer");
+    if ((err = kafka->start()) != srs_success) {
+        return srs_error_wrap(err, "start kafka producer");
     }
 
     return err;
@@ -396,28 +395,27 @@ srs_error_t SrsKafkaProducer::initialize()
     return srs_success;
 }
 
-int SrsKafkaProducer::start()
+srs_error_t SrsKafkaProducer::start()
 {
-    int ret = ERROR_SUCCESS;
+    srs_error_t err = srs_success;
 
     if (!enabled) {
-        return ret;
+        return err;
     }
 
-    if ((ret = worker->start()) != ERROR_SUCCESS) {
-        srs_error("start kafka worker failed. ret=%d", ret);
-        return ret;
+    if ((err = worker->start()) != srs_success) {
+        return srs_error_wrap(err, "async worker");
     }
 
     srs_freep(trd);
     trd = new SrsSTCoroutine("kafka", this, _srs_context->get_id());
-    if ((ret = trd->start()) != ERROR_SUCCESS) {
-        srs_error("start kafka thread failed. ret=%d", ret);
+    if ((err = trd->start()) != srs_success) {
+        return srs_error_wrap(err, "coroutine");
     }
 
     refresh_metadata();
 
-    return ret;
+    return err;
 }
 
 void SrsKafkaProducer::stop()
@@ -448,15 +446,13 @@ int SrsKafkaProducer::send(int key, SrsJsonObject* obj)
     }
 
     // sync with backgound metadata worker.
-    srs_mutex_lock(lock);
+    SrsLocker(lock);
 
     // flush message when metadata is ok.
     if (metadata_ok) {
         ret = flush();
     }
 
-    srs_mutex_unlock(lock);
-
     return ret;
 }
@@ -493,25 +489,11 @@ int SrsKafkaProducer::on_close(int key)
 }
 
 #define SRS_KAKFA_CIMS 3000
 
-int SrsKafkaProducer::cycle()
-{
-    int ret = ERROR_SUCCESS;
-
-    while (!trd->pull()) {
-        if ((ret = do_cycle()) != ERROR_SUCCESS) {
-            srs_warn("ignore kafka error. ret=%d", ret);
-        }
-
-        if (!trd->pull()) {
-            srs_usleep(SRS_KAKFA_CIMS * 1000);
-        }
-    }
-
-    return ret;
-}
-
-int SrsKafkaProducer::on_before_cycle()
+srs_error_t SrsKafkaProducer::cycle()
 {
+    srs_error_t err = srs_success;
+
     // wait for the metadata expired.
     // when metadata is ok, wait for it expired.
     if (metadata_ok) {
@@ -519,16 +501,22 @@ int SrsKafkaProducer::on_before_cycle()
     }
 
     // request to lock to acquire the socket.
-    srs_mutex_lock(lock);
+    SrsLocker(lock);
 
-    return ERROR_SUCCESS;
-}
-
-int SrsKafkaProducer::on_end_cycle()
-{
-    srs_mutex_unlock(lock);
+    while (true) {
+        if ((err = do_cycle()) != srs_success) {
+            srs_warn("KafkaProducer: Ignore error, %s", srs_error_desc(err).c_str());
+            srs_freep(err);
+        }
+
+        if ((err = trd->pull()) != srs_success) {
+            return srs_error_wrap(err, "kafka cycle");
+        }
 
-    return ERROR_SUCCESS;
+        srs_usleep(SRS_KAKFA_CIMS * 1000);
+    }
 
+    return err;
 }
 
 void SrsKafkaProducer::clear_metadata()
@@ -543,22 +531,22 @@ void SrsKafkaProducer::clear_metadata()
     partitions.clear();
 }
 
-int SrsKafkaProducer::do_cycle()
+srs_error_t SrsKafkaProducer::do_cycle()
 {
     int ret = ERROR_SUCCESS;
+    srs_error_t err = srs_success;
 
     // ignore when disabled.
     if (!enabled) {
-        return ret;
+        return err;
     }
 
     // when kafka enabled, request metadata when startup.
     if ((ret = request_metadata()) != ERROR_SUCCESS) {
-        srs_error("request kafka metadata failed. ret=%d", ret);
-        return ret;
+        return srs_error_new(ret, "request metadata");
     }
 
-    return ret;
+    return err;
 }
 
 int SrsKafkaProducer::request_metadata()
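
A side change visible in the send() and cycle() hunks is that the manual srs_mutex_lock()/srs_mutex_unlock() pairs, and the on_end_cycle() hook that existed only to unlock, are replaced by a single SrsLocker(lock) line, i.e. scope-based locking that releases on every return path. A minimal analogue of that idea using std::mutex (not SRS's own locker, which wraps State Threads primitives):

#include <mutex>

std::mutex lock_;            // stands in for the producer's lock
bool metadata_ok = false;

void producer_send() {
    // Acquired here; released automatically when the scope exits,
    // so no explicit unlock is needed on any return path.
    std::lock_guard<std::mutex> locker(lock_);
    if (metadata_ok) {
        // flush queued messages while holding the lock
    }
}

int main() {
    producer_send();
    return 0;
}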