From 2a4ab8a923d4df1805b276f217b616419308c0e5 Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 24 Sep 2015 14:53:22 +0800 Subject: [PATCH] add brokers to config --- trunk/conf/full.conf | 4 ++++ trunk/src/app/srs_app_config.cpp | 28 +++++++++++++++++++++++++++- trunk/src/app/srs_app_config.hpp | 4 ++++ trunk/src/app/srs_app_kafka.cpp | 4 +++- 4 files changed, 38 insertions(+), 2 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 0ffd986c0..e2c6ae406 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -240,6 +240,10 @@ kafka { # whether enabled kafka. # default: off enabled off; + # the broker list, broker is + # and use space to specify multple brokers. + # for exampl, 127.0.0.1:9092 127.0.0.1:9093 + brokers 127.0.0.1:9092; } ############################################################################################# diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 06a86aba4..83c63568e 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2120,6 +2120,17 @@ int SrsConfig::global_to_json(SrsJsonObject* obj) } } obj->set(dir->name, sobj); + } else if (dir->name == "kafka") { + SrsJsonObject* sobj = SrsJsonAny::object(); + for (int j = 0; j < (int)dir->directives.size(); j++) { + SrsConfDirective* sdir = dir->directives.at(j); + if (sdir->name == "enabled") { + sobj->set(sdir->name, sdir->dumps_arg0_to_boolean()); + } else if (sdir->name == "brokers") { + sobj->set(sdir->name, sdir->dumps_args()); + } + } + obj->set(dir->name, sobj); } else if (dir->name == "stream_caster") { SrsJsonObject* sobj = SrsJsonAny::object(); for (int j = 0; j < (int)dir->directives.size(); j++) { @@ -3535,7 +3546,7 @@ int SrsConfig::check_config() SrsConfDirective* conf = root->get("kafka"); for (int i = 0; conf && i < (int)conf->directives.size(); i++) { string n = conf->at(i)->name; - if (n != "enabled") { + if (n != "enabled" && n != "brokers") { ret = ERROR_SYSTEM_CONFIG_INVALID; srs_error("unsupported kafka directive %s, ret=%d", n.c_str(), ret); return ret; @@ -4272,6 +4283,21 @@ bool SrsConfig::get_kafka_enabled() return SRS_CONF_PERFER_FALSE(conf->arg0()); } +SrsConfDirective* SrsConfig::get_kafka_brokers() +{ + SrsConfDirective* conf = root->get("kafka"); + if (!conf) { + return NULL; + } + + conf->get("brokers"); + if (!conf || conf->args.empty()) { + return NULL; + } + + return conf; +} + SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost) { srs_assert(root); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 20dcb8749..48cbd59e3 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -634,6 +634,10 @@ public: * whether the kafka enabled. */ virtual bool get_kafka_enabled(); + /** + * get the broker list, each is format in . + */ + virtual SrsConfDirective* get_kafka_brokers(); // vhost specified section public: /** diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 0dbcb556a..a1f100be4 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #ifdef SRS_AUTO_KAFKA @@ -64,7 +65,8 @@ int SrsKafkaProducer::start() return ret; } - srs_trace("kafka worker ok, enabled:%d", _srs_config->get_kafka_enabled()); + std::string enabled = srs_bool2switch(_srs_config->get_kafka_enabled()); + srs_trace("kafka worker ok, enabled:%s", enabled.c_str()); return ret; }