1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-24 06:54:22 +00:00
srs/trunk/src/app/srs_app_kafka.hpp

215 lines
5.8 KiB
C++
Raw Normal View History

2017-03-25 09:21:39 +00:00
/**
* The MIT License (MIT)
*
2017-03-25 13:29:29 +00:00
* Copyright (c) 2013-2017 OSSRS(winlin)
2017-03-25 09:21:39 +00:00
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
2015-09-22 09:40:05 +00:00
*/
#ifndef SRS_APP_KAFKA_HPP
#define SRS_APP_KAFKA_HPP
#include <srs_core.hpp>
2015-10-22 08:24:10 +00:00
#include <map>
2015-10-22 06:22:10 +00:00
#include <vector>
class SrsLbRoundRobin;
2015-09-22 09:40:05 +00:00
class SrsAsyncCallWorker;
class SrsTcpClient;
class SrsKafkaClient;
2015-10-22 07:26:57 +00:00
class SrsJsonObject;
class SrsKafkaProducer;
2015-09-22 09:40:05 +00:00
#include <srs_app_thread.hpp>
2015-10-22 07:26:57 +00:00
#include <srs_app_server.hpp>
#include <srs_app_async_call.hpp>
2015-09-22 09:46:07 +00:00
#ifdef SRS_AUTO_KAFKA
2015-10-23 01:55:29 +00:00
/**
* the partition messages cache:
* a list of json objects, used as the message set of a partition.
* @remark the elements are raw pointers; who frees them is not expressed
*       by this typedef. NOTE(review): confirm the owner in the implementation.
*/
typedef std::vector<SrsJsonObject*> SrsKafkaPartitionCache;
2015-10-22 06:22:10 +00:00
/**
* the kafka partition info.
* besides the public description of the partition (id, topic and leader),
* it privately keeps the transport and the kafka client for the partition.
*/
struct SrsKafkaPartition
{
private:
// NOTE(review): presumably the cached endpoint string of the leader,
// it is only assigned in the implementation file; confirm there.
std::string ep;
// Not NULL when connected.
2015-10-22 09:25:48 +00:00
SrsTcpClient* transport;
SrsKafkaClient* kafka;
2015-10-22 06:22:10 +00:00
public:
// the id of this partition.
int id;
2015-10-23 01:55:29 +00:00
// NOTE(review): presumably the topic which this partition belongs to; confirm.
std::string topic;
2015-10-22 06:22:10 +00:00
// the leader of this partition: the broker, and its host and port.
int broker;
std::string host;
int port;
public:
SrsKafkaPartition();
virtual ~SrsKafkaPartition();
public:
// get a string for the leader address.
// NOTE(review): presumably built from host and port; the exact format is
// defined in the implementation file.
virtual std::string hostport();
2015-10-22 09:25:48 +00:00
// connect to the partition.
// NOTE(review): the meaning of the returned int is not visible in this
// header; presumably an error code as other int-returning methods; confirm.
virtual int connect();
2015-10-23 01:55:29 +00:00
// flush the messages of the partition cache.
// @param pc the partition cache (list of json objects) to flush.
// NOTE(review): whether pc is cleared or freed after flush is not visible here.
virtual int flush(SrsKafkaPartitionCache* pc);
private:
// private counterpart of connect().
virtual void disconnect();
2015-10-22 06:22:10 +00:00
};
2015-10-22 07:26:57 +00:00
/**
* the kafka message, which is a task for the async call worker
* (it implements ISrsAsyncCallTask).
* it carries the producer, the key and the json object given to the constructor.
*/
2015-10-22 08:24:10 +00:00
class SrsKafkaMessage : public ISrsAsyncCallTask
{
2015-10-23 06:30:16 +00:00
private:
// the producer passed to the constructor.
SrsKafkaProducer* producer;
2015-10-22 08:24:10 +00:00
// the key passed to the constructor.
int key;
2015-10-23 06:30:16 +00:00
// the json object passed to the constructor.
// NOTE(review): ownership of obj is not visible in this header; confirm
// in the implementation who frees it.
SrsJsonObject* obj;
2015-10-22 08:24:10 +00:00
public:
2015-10-23 06:30:16 +00:00
/**
* @param p the kafka producer.
* @param k the key of the message.
* @param j the json object of the message.
*/
SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j);
2015-10-22 08:24:10 +00:00
virtual ~SrsKafkaMessage();
2015-10-22 07:26:57 +00:00
// interface ISrsAsyncCallTask
public:
// NOTE(review): presumably hands the object to the producer to send;
// the body is in the implementation file.
virtual int call();
virtual std::string to_string();
};
2015-10-22 08:24:10 +00:00
/**
* a message cache for kafka,
* which keeps one partition cache (message set) for each partition id.
*/
class SrsKafkaCache
{
public:
// the total partitions,
// used to map the key to the partition by key%nb_partitions.
int nb_partitions;
private:
// total messages for all partitions.
int count;
// key is the partition id, value is the message set to write to this partition.
// @remark, when the metadata is refreshed, the number of partitions may increase,
// so some messages may be dispatched to a new partition.
std::map< int32_t, SrsKafkaPartitionCache*> cache;
public:
SrsKafkaCache();
virtual ~SrsKafkaCache();
public:
/**
* append the json object to the cache.
* @param key the key to map to the partition, see nb_partitions.
* @param obj the json object to cache.
*/
virtual void append(int key, SrsJsonObject* obj);
// NOTE(review): presumably the total messages (count); confirm in the implementation.
virtual int size();
/**
* fetch out an available partition cache.
* @param pkey output, the key of the fetched partition cache.
* @param ppc output, the fetched partition cache.
* @return true when got a key and pc; otherwise, false.
*/
virtual bool fetch(int* pkey, SrsKafkaPartitionCache** ppc);
/**
* flush the specified partition cache.
* @param partition the partition to flush the cache to.
* @param key the key of the partition cache.
* @param pc the partition cache to flush.
*/
virtual int flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc);
};
/**
* the kafka cluster interface,
* which is notified about the client events (connect and close).
*/
class ISrsKafkaCluster
{
public:
ISrsKafkaCluster();
virtual ~ISrsKafkaCluster();
public:
/**
* when any client connects to SRS, notify kafka.
2015-10-23 06:36:55 +00:00
* @param key the partition map key, the client id or hash(ip).
2015-10-22 08:24:10 +00:00
* @param type the type of client.
* @param ip the peer ip of client.
* NOTE(review): the meaning of the returned int is not visible in this header.
*/
virtual int on_client(int key, SrsListenerType type, std::string ip) = 0;
2015-10-23 06:36:55 +00:00
/**
* when the client is closed, or disconnected because of an error.
* @param key the partition map key, the client id or hash(ip).
*/
virtual int on_close(int key) = 0;
2015-10-22 08:24:10 +00:00
};
2016-12-08 03:44:49 +00:00
// @global the kafka event producer, accessed by the ISrsKafkaCluster interface.
extern ISrsKafkaCluster* _srs_kafka;
// the initializer and disposer for the global kafka object.
// NOTE(review): presumably they create and free _srs_kafka; confirm in the implementation.
extern int srs_initialize_kafka();
extern void srs_dispose_kafka();
2015-09-22 09:46:07 +00:00
/**
* the kafka producer used to save log to kafka cluster.
* it is both a coroutine handler (ISrsCoroutineHandler)
* and a kafka cluster (ISrsKafkaCluster).
*/
class SrsKafkaProducer : virtual public ISrsCoroutineHandler, virtual public ISrsKafkaCluster
2015-09-22 09:40:05 +00:00
{
private:
2015-10-23 06:40:14 +00:00
// TODO: FIXME: support reload.
bool enabled;
srs_mutex_t lock;
// the coroutine of the producer.
SrsCoroutine* trd;
private:
2015-10-22 06:29:37 +00:00
// whether the metadata is ok, see refresh_metadata().
bool metadata_ok;
// NOTE(review): presumably signaled when the metadata is set to invalid; confirm.
srs_cond_t metadata_expired;
2015-10-22 06:22:10 +00:00
public:
// the partitions of the kafka cluster.
std::vector<SrsKafkaPartition*> partitions;
2015-10-22 08:24:10 +00:00
// the message cache for the partitions.
SrsKafkaCache* cache;
2015-09-22 09:40:05 +00:00
private:
SrsLbRoundRobin* lb;
2015-09-22 09:40:05 +00:00
// the worker to run the async call tasks.
SrsAsyncCallWorker* worker;
public:
SrsKafkaProducer();
virtual ~SrsKafkaProducer();
public:
virtual int initialize();
virtual int start();
virtual void stop();
2015-10-23 06:37:34 +00:00
// internal: the following send() is for the worker, which calls the task to send the object.
2015-10-22 08:24:10 +00:00
public:
2015-10-22 07:26:57 +00:00
/**
* send json object to kafka cluster.
* the producer will aggregate messages and send them in a kafka message set.
2015-10-22 08:24:10 +00:00
* @param key the key to map to the partition, user can use cid or hash.
2015-10-22 07:26:57 +00:00
* @param obj the json object; user must never free it again.
*/
2015-10-22 08:24:10 +00:00
virtual int send(int key, SrsJsonObject* obj);
2015-10-23 06:37:34 +00:00
// interface ISrsKafkaCluster
public:
virtual int on_client(int key, SrsListenerType type, std::string ip);
virtual int on_close(int key);
// interface ISrsCoroutineHandler
// NOTE(review): this comment used to name ISrsReusableThreadHandler, while the
// class derives from ISrsCoroutineHandler; confirm which of the following
// methods that interface really declares.
public:
virtual int cycle();
virtual int on_before_cycle();
virtual int on_end_cycle();
2015-09-24 04:15:12 +00:00
private:
2015-10-22 08:24:10 +00:00
virtual void clear_metadata();
virtual int do_cycle();
2015-09-24 04:15:12 +00:00
virtual int request_metadata();
2015-10-22 06:29:37 +00:00
// set the metadata to invalid and refresh it.
virtual void refresh_metadata();
2015-10-22 07:26:57 +00:00
// NOTE(review): presumably flushes the cached messages to the partitions; confirm.
virtual int flush();
2015-09-22 09:40:05 +00:00
};
#endif
2015-09-22 09:46:07 +00:00
#endif