diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp
index 52236e9cd..ac4e96041 100644
--- a/trunk/src/app/srs_app_kafka.cpp
+++ b/trunk/src/app/srs_app_kafka.cpp
@@ -41,7 +41,8 @@ using namespace std;
 #ifdef SRS_AUTO_KAFKA
 
 #define SRS_KAKFA_CYCLE_INTERVAL_MS 3000
-#define SRS_KAFKA_PRODUCER_AGGREGATE_SIZE 10
+#define SRS_KAFKA_PRODUCER_TIMEOUT 30000
+#define SRS_KAFKA_PRODUCER_AGGREGATE_SIZE 1
 
 std::string srs_kafka_metadata_summary(SrsKafkaTopicMetadataResponse* metadata)
 {
@@ -131,10 +132,15 @@ SrsKafkaPartition::SrsKafkaPartition()
 {
     id = broker = 0;
     port = SRS_CONSTS_KAFKA_DEFAULT_PORT;
+    
+    transport = new SrsTcpClient();
+    kafka = new SrsKafkaClient(transport);
 }
 
 SrsKafkaPartition::~SrsKafkaPartition()
 {
+    srs_freep(kafka);
+    srs_freep(transport);
 }
 
 string SrsKafkaPartition::hostport()
@@ -146,6 +152,25 @@ string SrsKafkaPartition::hostport()
     return ep;
 }
 
+int SrsKafkaPartition::connect()
+{
+    int ret = ERROR_SUCCESS;
+    
+    if (transport->connected()) {
+        return ret;
+    }
+    
+    int64_t timeout = SRS_KAFKA_PRODUCER_TIMEOUT * 1000;
+    if ((ret = transport->connect(host, port, timeout)) != ERROR_SUCCESS) {
+        srs_error("connect to %s partition=%d failed, timeout=%"PRId64". ret=%d", hostport().c_str(), id, timeout, ret);
+        return ret;
+    }
+    
+    srs_trace("connect at %s, partition=%d, broker=%d", hostport().c_str(), id, broker);
+    
+    return ret;
+}
+
 SrsKafkaMessage::SrsKafkaMessage(int k)
 {
     key = k;
@@ -252,7 +277,34 @@ bool SrsKafkaCache::fetch(int* pkey, SrsKafkaPartitionCache** ppc)
 
 int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc)
 {
     int ret = ERROR_SUCCESS;
+    
+    // ensure the key exists.
+    srs_assert(cache.find(key) != cache.end());
+    
+    // connect transport.
+    if ((ret = partition->connect()) != ERROR_SUCCESS) {
+        srs_error("connect to partition failed. ret=%d", ret);
+        return ret;
+    }
+    
+    // copy the messages to a temp cache.
+    SrsKafkaPartitionCache tpc(*pc);
+    // TODO: FIXME: implements it.
+    
+    // free all wrote messages.
+    for (vector<SrsJsonObject*>::iterator it = tpc.begin(); it != tpc.end(); ++it) {
+        SrsJsonObject* obj = *it;
+        srs_freep(obj);
+    }
+    
+    // remove the messages from cache.
+    if (pc->size() == tpc.size()) {
+        pc->clear();
+    } else {
+        pc->erase(pc->begin(), pc->begin() + tpc.size());
+    }
+    
     return ret;
 }
 
@@ -516,7 +568,7 @@ int SrsKafkaProducer::flush()
     
     // flush all available partition caches.
     while (true) {
-        int key = 0;
+        int key = -1;
         SrsKafkaPartitionCache* pc = NULL;
         
         // all flushed, or no kafka partition to write to.
@@ -525,7 +577,7 @@ int SrsKafkaProducer::flush()
         }
         
         // flush specified partition.
-        srs_assert(key && pc);
+        srs_assert(key >= 0 && pc);
         SrsKafkaPartition* partition = partitions.at(key % partitions.size());
         if ((ret = cache->flush(partition, key, pc)) != ERROR_SUCCESS) {
             srs_error("flush partition failed. ret=%d", ret);
diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp
index 4300a9db7..7f777e812 100644
--- a/trunk/src/app/srs_app_kafka.hpp
+++ b/trunk/src/app/srs_app_kafka.hpp
@@ -52,6 +52,8 @@ struct SrsKafkaPartition
 {
 private:
     std::string ep;
+    SrsTcpClient* transport;
+    SrsKafkaClient* kafka;
 public:
     int id;
     // leader.
@@ -63,6 +65,7 @@ public:
     virtual ~SrsKafkaPartition();
 public:
     virtual std::string hostport();
+    virtual int connect();
 };
 
 /**