diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 12e6525ee..477a3eca6 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2128,6 +2128,8 @@ int SrsConfig::global_to_json(SrsJsonObject* obj) sobj->set(sdir->name, sdir->dumps_arg0_to_boolean()); } else if (sdir->name == "brokers") { sobj->set(sdir->name, sdir->dumps_args()); + } else if (sdir->name == "topic") { + sobj->set(sdir->name, sdir->dumps_arg0_to_str()); } } obj->set(dir->name, sobj); @@ -3546,7 +3548,7 @@ int SrsConfig::check_config() SrsConfDirective* conf = root->get("kafka"); for (int i = 0; conf && i < (int)conf->directives.size(); i++) { string n = conf->at(i)->name; - if (n != "enabled" && n != "brokers") { + if (n != "enabled" && n != "brokers" && n != "topic") { ret = ERROR_SYSTEM_CONFIG_INVALID; srs_error("unsupported kafka directive %s, ret=%d", n.c_str(), ret); return ret; diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 9dd1f3def..3c7b01c99 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -32,6 +32,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -201,6 +202,48 @@ int SrsKafkaProducer::request_metadata() } SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata); + // show kafka metadata. + string summary; + if (true) { + vector bs; + for (int i = 0; i < metadata->brokers.size(); i++) { + SrsKafkaBroker* broker = metadata->brokers.at(i); + + string hostport = srs_int2str(broker->node_id) + "/" + broker->host.to_str(); + if (broker->port > 0) { + hostport += ":" + srs_int2str(broker->port); + } + + bs.push_back(hostport); + } + + vector ps; + for (int i = 0; i < metadata->metadatas.size(); i++) { + SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i); + + string desc = "topic=" + topic->name.to_str(); + + for (int j = 0; j < topic->metadatas.size(); j++) { + SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j); + + desc += ", partition" + srs_int2str(partition->partition_id) +"="; + desc += srs_int2str(partition->leader) + "/"; + + vector replicas = srs_kafka_array2vector(&partition->replicas); + desc += srs_join_vector_string(replicas, ","); + } + + ps.push_back(desc); + } + + std::stringstream ss; + ss << "brokers=" << srs_join_vector_string(bs, ","); + ss << ", " << srs_join_vector_string(ps, ","); + + summary = ss.str(); + } + srs_trace("kafka metadata: %s", summary.c_str()); + meatadata_ok = true; return ret; diff --git a/trunk/src/app/srs_app_utility.cpp b/trunk/src/app/srs_app_utility.cpp index 490cff7ee..f978909dc 100644 --- a/trunk/src/app/srs_app_utility.cpp +++ b/trunk/src/app/srs_app_utility.cpp @@ -1467,17 +1467,3 @@ void srs_api_dump_summaries(SrsJsonObject* obj) sys->set("conn_srs", SrsJsonAny::integer(nrs->nb_conn_srs)); } -string srs_join_vector_string(vector& vs, string separator) -{ - string str = ""; - - for (int i = 0; i < (int)vs.size(); i++) { - str += vs.at(i); - if (i != (int)vs.size() - 1) { - str += separator; - } - } - - return str; -} - diff --git a/trunk/src/app/srs_app_utility.hpp b/trunk/src/app/srs_app_utility.hpp index 2ef07d330..971bdbe99 100644 --- a/trunk/src/app/srs_app_utility.hpp +++ b/trunk/src/app/srs_app_utility.hpp @@ -677,8 +677,5 @@ extern bool srs_is_boolean(const std::string& str); // dump summaries for /api/v1/summaries. extern void srs_api_dump_summaries(SrsJsonObject* obj); -// join string in vector with indicated separator -extern std::string srs_join_vector_string(std::vector& vs, std::string separator); - #endif diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index 9b966a2dc..ed66d76f5 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -23,7 +23,7 @@ #include -#include +#include using namespace std; #include @@ -31,6 +31,8 @@ using namespace std; #include #include #include +#include +#include #ifdef SRS_AUTO_KAFKA @@ -64,6 +66,15 @@ bool SrsKafkaString::empty() return _size <= 0; } +string SrsKafkaString::to_str() +{ + string ret; + if (_size > 0) { + ret.append(data, _size); + } + return ret; +} + int SrsKafkaString::nb_bytes() { return _size == -1? 2 : 2 + _size; @@ -1077,8 +1088,24 @@ int SrsKafkaClient::fetch_metadata(string topic, SrsKafkaTopicMetadataResponse** vector srs_kafka_array2vector(SrsKafkaArray* arr) { vector strs; - for (int i = 0; i < arr->nb_bytes(); i++) { + + for (int i = 0; i < arr->size(); i++) { + SrsKafkaString* elem = arr->at(i); + strs.push_back(elem->to_str()); } + + return strs; +} + +vector srs_kafka_array2vector(SrsKafkaArray* arr) +{ + vector strs; + + for (int i = 0; i < arr->size(); i++) { + int32_t elem = arr->at(i); + strs.push_back(srs_int2str(elem)); + } + return strs; } diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index 4e25f2f9e..f181d04dc 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -77,6 +77,7 @@ public: public: virtual bool null(); virtual bool empty(); + virtual std::string to_str(); // interface ISrsCodec public: virtual int nb_bytes(); @@ -147,6 +148,14 @@ public: length++; elems.push_back(elem); } + virtual int size() + { + return length; + } + virtual T* at(int index) + { + return elems.at(index); + } // interface ISrsCodec public: virtual int nb_bytes() @@ -228,6 +237,14 @@ public: length++; elems.push_back(elem); } + virtual int size() + { + return length; + } + virtual int32_t at(int index) + { + return elems.at(index); + } // interface ISrsCodec public: virtual int nb_bytes() @@ -792,6 +809,7 @@ public: // convert kafka array[string] to vector[string] extern std::vector srs_kafka_array2vector(SrsKafkaArray* arr); +extern std::vector srs_kafka_array2vector(SrsKafkaArray* arr); #endif diff --git a/trunk/src/protocol/srs_protocol_utility.cpp b/trunk/src/protocol/srs_protocol_utility.cpp index 09cfd6b5c..be36509b2 100644 --- a/trunk/src/protocol/srs_protocol_utility.cpp +++ b/trunk/src/protocol/srs_protocol_utility.cpp @@ -312,3 +312,17 @@ int srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, s return ret; } +string srs_join_vector_string(vector& vs, string separator) +{ + string str = ""; + + for (int i = 0; i < (int)vs.size(); i++) { + str += vs.at(i); + if (i != (int)vs.size() - 1) { + str += separator; + } + } + + return str; +} + diff --git a/trunk/src/protocol/srs_protocol_utility.hpp b/trunk/src/protocol/srs_protocol_utility.hpp index d2ce19068..41893a34c 100644 --- a/trunk/src/protocol/srs_protocol_utility.hpp +++ b/trunk/src/protocol/srs_protocol_utility.hpp @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #endif #include +#include #include @@ -130,5 +131,8 @@ extern int srs_write_large_iovs( ssize_t* pnwrite = NULL ); +// join string in vector with indicated separator +extern std::string srs_join_vector_string(std::vector& vs, std::string separator); + #endif