diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 5c15f1604..00abb442c 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -146,7 +146,17 @@ string SrsKafkaPartition::hostport() return ep; } -SrsKafkaMessageOnClient::SrsKafkaMessageOnClient(SrsKafkaProducer* p, SrsListenerType t, string i) +SrsKafkaMessage::SrsKafkaMessage(int k) +{ + key = k; +} + +SrsKafkaMessage::~SrsKafkaMessage() +{ +} + +SrsKafkaMessageOnClient::SrsKafkaMessageOnClient(SrsKafkaProducer* p, int k, SrsListenerType t, string i) + : SrsKafkaMessage(k) { producer = p; type = t; @@ -165,7 +175,7 @@ int SrsKafkaMessageOnClient::call() obj->set("type", SrsJsonAny::integer(type)); obj->set("ip", SrsJsonAny::str(ip.c_str())); - return producer->send(obj); + return producer->send(key, obj); } string SrsKafkaMessageOnClient::to_string() @@ -173,6 +183,87 @@ string SrsKafkaMessageOnClient::to_string() return ip; } +SrsKafkaCache::SrsKafkaCache() +{ + count = 0; + nb_partitions = 0; +} + +SrsKafkaCache::~SrsKafkaCache() +{ + map::iterator it; + for (it = cache.begin(); it != cache.end(); ++it) { + SrsKafkaPartitionCache* pc = it->second; + + for (vector::iterator it2 = pc->begin(); it2 != pc->end(); ++it2) { + SrsJsonObject* obj = *it2; + srs_freep(obj); + } + pc->clear(); + + srs_freep(pc); + } + cache.clear(); +} + +void SrsKafkaCache::append(int key, SrsJsonObject* obj) +{ + count++; + + int partition = 0; + if (nb_partitions > 0) { + partition = key % nb_partitions; + } + + SrsKafkaPartitionCache* pc = NULL; + map::iterator it = cache.find(partition); + if (it == cache.end()) { + pc = new SrsKafkaPartitionCache(); + cache[partition] = pc; + } else { + pc = it->second; + } + + pc->push_back(obj); +} + +int SrsKafkaCache::size() +{ + return count; +} + +bool SrsKafkaCache::fetch(int* pkey, SrsKafkaPartitionCache** ppc) +{ + map::iterator it; + for (it = cache.begin(); it != cache.end(); ++it) { + int32_t key = it->first; + SrsKafkaPartitionCache* pc = it->second; + + if (!pc->empty()) { + *pkey = (int)key; + *ppc = pc; + return true; + } + } + + return false; +} + +int SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc) +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + +ISrsKafkaCluster::ISrsKafkaCluster() +{ +} + +ISrsKafkaCluster::~ISrsKafkaCluster() +{ +} + SrsKafkaProducer::SrsKafkaProducer() { metadata_ok = false; @@ -181,6 +272,7 @@ SrsKafkaProducer::SrsKafkaProducer() lock = st_mutex_new(); pthread = new SrsReusableThread("kafka", this, SRS_KAKFA_CYCLE_INTERVAL_MS * 1000); worker = new SrsAsyncCallWorker(); + cache = new SrsKafkaCache(); lb = new SrsLbRoundRobin(); transport = new SrsTcpClient(); @@ -189,12 +281,7 @@ SrsKafkaProducer::SrsKafkaProducer() SrsKafkaProducer::~SrsKafkaProducer() { - vector::iterator it; - for (it = partitions.begin(); it != partitions.end(); ++it) { - SrsKafkaPartition* partition = *it; - srs_freep(partition); - } - partitions.clear(); + clear_metadata(); srs_freep(lb); srs_freep(kafka); @@ -202,6 +289,7 @@ SrsKafkaProducer::~SrsKafkaProducer() srs_freep(worker); srs_freep(pthread); + srs_freep(cache); st_mutex_destroy(lock); st_cond_destroy(metadata_expired); @@ -240,26 +328,26 @@ void SrsKafkaProducer::stop() worker->stop(); } -int SrsKafkaProducer::on_client(SrsListenerType type, st_netfd_t stfd) +int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) { - return worker->execute(new SrsKafkaMessageOnClient(this, type, srs_get_peer_ip(st_netfd_fileno(stfd)))); + return worker->execute(new SrsKafkaMessageOnClient(this, key, type, ip)); } -int SrsKafkaProducer::send(SrsJsonObject* obj) +int SrsKafkaProducer::send(int key, SrsJsonObject* obj) { int ret = ERROR_SUCCESS; // cache the json object. - objects.push_back(obj); + cache->append(key, obj); // too few messages, ignore. - if (objects.size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) { + if (cache->size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) { return ret; } // too many messages, warn user. - if (objects.size() > SRS_KAFKA_PRODUCER_AGGREGATE_SIZE * 10) { - srs_warn("kafka cache too many messages: %d", objects.size()); + if (cache->size() > SRS_KAFKA_PRODUCER_AGGREGATE_SIZE * 10) { + srs_warn("kafka cache too many messages: %d", cache->size()); } // sync with backgound metadata worker. @@ -307,6 +395,18 @@ int SrsKafkaProducer::on_end_cycle() return ERROR_SUCCESS; } +void SrsKafkaProducer::clear_metadata() +{ + vector::iterator it; + + for (it = partitions.begin(); it != partitions.end(); ++it) { + SrsKafkaPartition* partition = *it; + srs_freep(partition); + } + + partitions.clear(); +} + int SrsKafkaProducer::do_cycle() { int ret = ERROR_SUCCESS; @@ -381,6 +481,9 @@ int SrsKafkaProducer::request_metadata() srs_kafka_metadata2connector(metadata, partitions); srs_trace("kafka connector: %s", srs_kafka_summary_partitions(partitions).c_str()); + // update the total partition for cache. + cache->nb_partitions = (int)partitions.size(); + metadata_ok = true; return ret; @@ -388,6 +491,8 @@ int SrsKafkaProducer::request_metadata() void SrsKafkaProducer::refresh_metadata() { + clear_metadata(); + metadata_ok = false; st_cond_signal(metadata_expired); srs_trace("kafka async refresh metadata in background"); @@ -396,7 +501,26 @@ void SrsKafkaProducer::refresh_metadata() int SrsKafkaProducer::flush() { int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. + + // flush all available partition caches. + while (true) { + int key = 0; + SrsKafkaPartitionCache* pc = NULL; + + // all flushed, or no kafka partition to write to. + if (!cache->fetch(&key, &pc) || partitions.empty()) { + break; + } + + // flush specified partition. + srs_assert(key && pc); + SrsKafkaPartition* partition = partitions.at(key % partitions.size()); + if ((ret = cache->flush(partition, key, pc)) != ERROR_SUCCESS) { + srs_error("flush partition failed. ret=%d", ret); + return ret; + } + } + return ret; } diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index f4ba4231d..a36942ee8 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -29,6 +29,7 @@ */ #include +#include #include class SrsLbRoundRobin; @@ -67,14 +68,22 @@ public: /** * the following is all types of kafka messages. */ -struct SrsKafkaMessageOnClient : public ISrsAsyncCallTask +class SrsKafkaMessage : public ISrsAsyncCallTask +{ +protected: + int key; +public: + SrsKafkaMessage(int k); + virtual ~SrsKafkaMessage(); +}; +struct SrsKafkaMessageOnClient : public SrsKafkaMessage { public: SrsKafkaProducer* producer; SrsListenerType type; std::string ip; public: - SrsKafkaMessageOnClient(SrsKafkaProducer* p, SrsListenerType t, std::string i); + SrsKafkaMessageOnClient(SrsKafkaProducer* p, int k, SrsListenerType t, std::string i); virtual ~SrsKafkaMessageOnClient(); // interface ISrsAsyncCallTask public: @@ -82,10 +91,66 @@ public: virtual std::string to_string(); }; +/** + * the partition messages cache. + */ +typedef std::vector SrsKafkaPartitionCache; + +/** + * a message cache for kafka. + */ +class SrsKafkaCache +{ +public: + // the total partitions, + // for the key to map to the parition by key%nb_partitions. + int nb_partitions; +private: + // total messages for all partitions. + int count; + // key is the partition id, value is the message set to write to this partition. + // @remark, when refresh metadata, the partition will increase, + // so maybe some message will dispatch to new partition. + std::map< int32_t, SrsKafkaPartitionCache*> cache; +public: + SrsKafkaCache(); + virtual ~SrsKafkaCache(); +public: + virtual void append(int key, SrsJsonObject* obj); + virtual int size(); + /** + * fetch out a available partition cache. + * @return true when got a key and pc; otherwise, false. + */ + virtual bool fetch(int* pkey, SrsKafkaPartitionCache** ppc); + /** + * flush the specified partition cache. + */ + virtual int flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc); +}; + +/** + * the kafka cluster interface. + */ +class ISrsKafkaCluster +{ +public: + ISrsKafkaCluster(); + virtual ~ISrsKafkaCluster(); +public: + /** + * when got any client connect to SRS, notify kafka. + * @param key the partition map key, a id or hash. + * @param type the type of client. + * @param ip the peer ip of client. + */ + virtual int on_client(int key, SrsListenerType type, std::string ip) = 0; +}; + /** * the kafka producer used to save log to kafka cluster. */ -class SrsKafkaProducer : public ISrsReusableThreadHandler +class SrsKafkaProducer : virtual public ISrsReusableThreadHandler, virtual public ISrsKafkaCluster { private: st_mutex_t lock; @@ -95,7 +160,7 @@ private: st_cond_t metadata_expired; public: std::vector partitions; - std::vector objects; + SrsKafkaCache* cache; private: SrsLbRoundRobin* lb; SrsAsyncCallWorker* worker; @@ -112,19 +177,23 @@ public: /** * when got any client connect to SRS, notify kafka. */ - virtual int on_client(SrsListenerType type, st_netfd_t stfd); + virtual int on_client(int key, SrsListenerType type, std::string ip); +// for worker to call task to send object. +public: /** * send json object to kafka cluster. * the producer will aggregate message and send in kafka message set. + * @param key the key to map to the partition, user can use cid or hash. * @param obj the json object; user must never free it again. */ - virtual int send(SrsJsonObject* obj); + virtual int send(int key, SrsJsonObject* obj); // interface ISrsReusableThreadHandler public: virtual int cycle(); virtual int on_before_cycle(); virtual int on_end_cycle(); private: + virtual void clear_metadata(); virtual int do_cycle(); virtual int request_metadata(); // set the metadata to invalid and refresh it. diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index a9229611a..0ab1b58df 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -55,6 +55,7 @@ using namespace std; #include #include #include +#include // when stream is busy, for example, streaming is already // publishing, when a new client to request to publish, @@ -310,10 +311,18 @@ void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout) transport->set_recv_timeout(timeout); } +#ifdef SRS_AUTO_KAFKA +SrsRtmpConn::SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c) +#else SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c) +#endif : SrsConnection(svr, c) { server = svr; +#ifdef SRS_AUTO_KAFKA + kafka = k; +#endif + req = new SrsRequest(); res = new SrsResponse(); skt = new SrsStSocket(c); @@ -365,6 +374,14 @@ int SrsRtmpConn::do_cycle() int ret = ERROR_SUCCESS; srs_trace("RTMP client ip=%s", ip.c_str()); + + // notify kafka cluster. +#ifdef SRS_AUTO_KAFKA + if ((ret = kafka->on_client(srs_id(), SrsListenerRtmpStream, ip)) != ERROR_SUCCESS) { + srs_error("kafka handler on_client failed. ret=%d", ret); + return ret; + } +#endif rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT_US); rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT_US); diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 5cfc0624f..29fea6ee4 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -58,6 +58,9 @@ class SrsSecurity; class ISrsWakable; class SrsCommonMessage; class SrsPacket; +#ifdef SRS_AUTO_KAFKA +class ISrsKafkaCluster; +#endif /** * the simple rtmp client stub, use SrsRtmpClient and provides high level APIs. @@ -135,8 +138,16 @@ private: int publish_normal_timeout; // whether enable the tcp_nodelay. bool tcp_nodelay; + // the kafka cluster +#ifdef SRS_AUTO_KAFKA + ISrsKafkaCluster* kafka; +#endif public: +#ifdef SRS_AUTO_KAFKA + SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c); +#else SrsRtmpConn(SrsServer* svr, st_netfd_t c); +#endif virtual ~SrsRtmpConn(); public: virtual void dispose(); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 6fc8aec49..896f8622c 100755 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1267,7 +1267,7 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) SrsConnection* conn = NULL; if (type == SrsListenerRtmpStream) { - conn = new SrsRtmpConn(this, client_stfd); + conn = new SrsRtmpConn(this, kafka, client_stfd); } else if (type == SrsListenerHttpApi) { #ifdef SRS_AUTO_HTTP_API conn = new SrsHttpApi(this, client_stfd, http_api_mux); @@ -1289,14 +1289,6 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) } srs_assert(conn); -#ifdef SRS_AUTO_KAFKA - // notify kafka cluster. - if ((ret = kafka->on_client(type, client_stfd)) != ERROR_SUCCESS) { - srs_error("kafka handler on_client failed. ret=%d", ret); - return ret; - } -#endif - // directly enqueue, the cycle thread will remove the client. conns.push_back(conn); srs_verbose("add conn to vector.");