1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

add load balance round robin for brokers.

This commit is contained in:
winlin 2015-09-24 17:20:04 +08:00
parent 2a4ab8a923
commit 6efd2dd27e
8 changed files with 147 additions and 7 deletions

View file

@ -23,21 +23,28 @@
#include <srs_app_kafka.hpp>
#include <vector>
using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_app_config.hpp>
#include <srs_app_async_call.hpp>
#include <srs_app_utility.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_kernel_balance.hpp>
#ifdef SRS_AUTO_KAFKA
SrsKafkaProducer::SrsKafkaProducer()
{
lb = new SrsLbRoundRobin();
worker = new SrsAsyncCallWorker();
}
SrsKafkaProducer::~SrsKafkaProducer()
{
srs_freep(lb);
srs_freep(worker);
}
@ -46,7 +53,7 @@ int SrsKafkaProducer::initialize()
int ret = ERROR_SUCCESS;
// when kafka enabled, request metadata when startup.
if (_srs_config->get_kafka_enabled() && (ret = request_metadata()) != ERROR_SUCCESS) {
if ((ret = request_metadata()) != ERROR_SUCCESS) {
srs_error("request kafka metadata failed. ret=%d", ret);
return ret;
}
@ -65,8 +72,7 @@ int SrsKafkaProducer::start()
return ret;
}
std::string enabled = srs_bool2switch(_srs_config->get_kafka_enabled());
srs_trace("kafka worker ok, enabled:%s", enabled.c_str());
srs_info("kafka worker ok");
return ret;
}
@ -80,7 +86,26 @@ int SrsKafkaProducer::request_metadata()
{
int ret = ERROR_SUCCESS;
srs_info("update kafka metadata ok");
bool enabled = _srs_config->get_kafka_enabled();
if (!enabled) {
return ret;
}
SrsConfDirective* brokers = _srs_config->get_kafka_brokers();
if (!brokers) {
srs_warn("ignore for empty brokers.");
return ret;
}
srs_assert(!brokers->args.empty());
std::string broker = lb->select<string>(brokers->args);
if (true) {
std::string senabled = srs_bool2switch(enabled);
std::string sbrokers = srs_join_vector_string(brokers->args, ",");
srs_trace("kafka ok, enabled:%s, brokers:%s, current:[%d]%s",
senabled.c_str(), sbrokers.c_str(), lb->current(), broker.c_str());
}
return ret;
}