diff --git a/trunk/auto/auto_headers.sh b/trunk/auto/auto_headers.sh index d9c723014..a3dc7ce2e 100644 --- a/trunk/auto/auto_headers.sh +++ b/trunk/auto/auto_headers.sh @@ -94,6 +94,12 @@ else echo "#undef SRS_AUTO_STREAM_CASTER" >> $SRS_AUTO_HEADERS_H fi +if [ $SRS_KAFKA = YES ]; then + echo "#define SRS_AUTO_KAFKA" >> $SRS_AUTO_HEADERS_H +else + echo "#undef SRS_AUTO_KAFKA" >> $SRS_AUTO_HEADERS_H +fi + if [ $SRS_HTTP_API = YES ]; then echo "#define SRS_AUTO_HTTP_API" >> $SRS_AUTO_HEADERS_H else diff --git a/trunk/auto/options.sh b/trunk/auto/options.sh index 94ad57b48..80856a1f6 100755 --- a/trunk/auto/options.sh +++ b/trunk/auto/options.sh @@ -27,6 +27,7 @@ SRS_STAT=RESERVED SRS_HTTP_CALLBACK=RESERVED SRS_HTTP_SERVER=RESERVED SRS_STREAM_CASTER=RESERVED +SRS_KAFKA=RESERVED SRS_HTTP_API=RESERVED SRS_LIBRTMP=RESERVED SRS_RESEARCH=RESERVED @@ -121,6 +122,7 @@ Options: --with-http-callback enable http hooks, build cherrypy as demo api server. --with-http-server enable http server to delivery http stream. --with-stream-caster enable stream caster to serve other stream over other protocol. + --with-kafka enable srs kafka producer to report to kafka. --with-http-api enable http api, to manage SRS by http api. --with-ffmpeg enable transcoding tool ffmpeg. --with-transcode enable transcoding features. @@ -145,6 +147,7 @@ Options: --without-http-callback disable http, http hooks callback. --without-http-server disable http server, use external server to delivery http stream. --without-stream-caster disable stream caster, only listen and serve RTMP/HTTP. + --without-kafka disable the srs kafka producer. --without-http-api disable http api, only use console to manage SRS process. --without-ffmpeg disable the ffmpeg transcode tool feature. --without-transcode disable the transcoding feature. @@ -232,6 +235,7 @@ function parse_user_option() { --with-http-callback) SRS_HTTP_CALLBACK=YES ;; --with-http-server) SRS_HTTP_SERVER=YES ;; --with-stream-caster) SRS_STREAM_CASTER=YES ;; + --with-kafka) SRS_KAFKA=YES ;; --with-http-api) SRS_HTTP_API=YES ;; --with-librtmp) SRS_LIBRTMP=YES ;; --with-research) SRS_RESEARCH=YES ;; @@ -256,6 +260,7 @@ function parse_user_option() { --without-http-callback) SRS_HTTP_CALLBACK=NO ;; --without-http-server) SRS_HTTP_SERVER=NO ;; --without-stream-caster) SRS_STREAM_CASTER=NO ;; + --without-kafka) SRS_KAFKA=NO ;; --without-http-api) SRS_HTTP_API=NO ;; --without-librtmp) SRS_LIBRTMP=NO ;; --without-research) SRS_RESEARCH=NO ;; @@ -390,6 +395,7 @@ function apply_user_presets() { SRS_HTTP_CALLBACK=NO SRS_HTTP_SERVER=NO SRS_STREAM_CASTER=NO + SRS_KAFKA=NO SRS_HTTP_API=NO SRS_LIBRTMP=NO SRS_RESEARCH=NO @@ -417,6 +423,7 @@ function apply_user_presets() { SRS_HTTP_CALLBACK=YES SRS_HTTP_SERVER=YES SRS_STREAM_CASTER=YES + SRS_KAFKA=YES SRS_HTTP_API=YES SRS_LIBRTMP=YES SRS_RESEARCH=YES @@ -444,6 +451,7 @@ function apply_user_presets() { SRS_HTTP_CALLBACK=NO SRS_HTTP_SERVER=NO SRS_STREAM_CASTER=NO + SRS_KAFKA=NO SRS_HTTP_API=NO SRS_LIBRTMP=NO SRS_RESEARCH=NO @@ -471,6 +479,7 @@ function apply_user_presets() { SRS_HTTP_CALLBACK=NO SRS_HTTP_SERVER=NO SRS_STREAM_CASTER=NO + SRS_KAFKA=NO SRS_HTTP_API=NO SRS_LIBRTMP=NO SRS_RESEARCH=NO @@ -498,6 +507,7 @@ function apply_user_presets() { SRS_HTTP_CALLBACK=NO SRS_HTTP_SERVER=NO SRS_STREAM_CASTER=NO + SRS_KAFKA=NO SRS_HTTP_API=NO SRS_LIBRTMP=NO SRS_RESEARCH=NO @@ -525,6 +535,7 @@ function apply_user_presets() { SRS_HTTP_CALLBACK=YES SRS_HTTP_SERVER=YES SRS_STREAM_CASTER=NO + SRS_KAFKA=YES SRS_HTTP_API=YES SRS_LIBRTMP=YES SRS_RESEARCH=NO @@ -553,6 +564,7 @@ function apply_user_presets() { SRS_HTTP_CALLBACK=YES SRS_HTTP_SERVER=YES SRS_STREAM_CASTER=NO + SRS_KAFKA=YES SRS_HTTP_API=YES SRS_LIBRTMP=YES SRS_RESEARCH=NO @@ -580,6 +592,7 @@ function apply_user_presets() { SRS_HTTP_CALLBACK=YES SRS_HTTP_SERVER=YES SRS_STREAM_CASTER=NO + SRS_KAFKA=YES SRS_HTTP_API=YES SRS_LIBRTMP=YES SRS_RESEARCH=NO @@ -607,6 +620,7 @@ function apply_user_presets() { SRS_HTTP_CALLBACK=YES SRS_HTTP_SERVER=YES SRS_STREAM_CASTER=NO + SRS_KAFKA=YES SRS_HTTP_API=YES SRS_LIBRTMP=YES SRS_RESEARCH=NO @@ -634,6 +648,7 @@ function apply_user_presets() { SRS_HTTP_CALLBACK=YES SRS_HTTP_SERVER=YES SRS_STREAM_CASTER=NO + SRS_KAFKA=YES SRS_HTTP_API=YES SRS_LIBRTMP=YES SRS_RESEARCH=YES @@ -661,6 +676,7 @@ function apply_user_presets() { SRS_HTTP_CALLBACK=YES SRS_HTTP_SERVER=YES SRS_STREAM_CASTER=NO + SRS_KAFKA=YES SRS_HTTP_API=YES SRS_LIBRTMP=NO SRS_RESEARCH=NO @@ -688,6 +704,7 @@ function apply_user_presets() { SRS_HTTP_CALLBACK=YES SRS_HTTP_SERVER=YES SRS_STREAM_CASTER=NO + SRS_KAFKA=YES SRS_HTTP_API=YES SRS_LIBRTMP=YES SRS_RESEARCH=NO @@ -715,6 +732,7 @@ function apply_user_presets() { SRS_HTTP_CALLBACK=YES SRS_HTTP_SERVER=YES SRS_STREAM_CASTER=NO + SRS_KAFKA=YES SRS_HTTP_API=YES SRS_LIBRTMP=YES SRS_RESEARCH=NO @@ -742,6 +760,7 @@ function apply_user_presets() { SRS_HTTP_CALLBACK=YES SRS_HTTP_SERVER=YES SRS_STREAM_CASTER=NO + SRS_KAFKA=YES SRS_HTTP_API=YES SRS_LIBRTMP=YES SRS_RESEARCH=NO @@ -804,6 +823,7 @@ function apply_user_detail_options() { SRS_HTTP_CALLBACK=NO SRS_HTTP_SERVER=NO SRS_STREAM_CASTER=NO + SRS_KAFKA=NO SRS_HTTP_API=NO SRS_LIBRTMP=YES SRS_RESEARCH=YES @@ -835,6 +855,7 @@ SRS_AUTO_CONFIGURE="--prefix=${SRS_PREFIX}" if [ $SRS_HTTP_CALLBACK = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-http-callback"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-http-callback"; fi if [ $SRS_HTTP_SERVER = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-http-server"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-http-server"; fi if [ $SRS_STREAM_CASTER = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-stream-caster"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-stream-caster"; fi + if [ $SRS_KAFKA = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-kafka"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-kafka"; fi if [ $SRS_HTTP_API = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-http-api"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-http-api"; fi if [ $SRS_LIBRTMP = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-librtmp"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-librtmp"; fi if [ $SRS_RESEARCH = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-research"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-research"; fi @@ -912,6 +933,7 @@ function check_option_conflicts() { if [ $SRS_HTTP_CALLBACK = RESERVED ]; then echo "you must specifies the http-callback, see: ./configure --help"; __check_ok=NO; fi if [ $SRS_HTTP_SERVER = RESERVED ]; then echo "you must specifies the http-server, see: ./configure --help"; __check_ok=NO; fi if [ $SRS_STREAM_CASTER = RESERVED ]; then echo "you must specifies the stream-caster, see: ./configure --help"; __check_ok=NO; fi + if [ $SRS_KAFKA = RESERVED ]; then echo "you must specifies the kafka, see: ./configure --help"; __check_ok=NO; fi if [ $SRS_HTTP_API = RESERVED ]; then echo "you must specifies the http-api, see: ./configure --help"; __check_ok=NO; fi if [ $SRS_LIBRTMP = RESERVED ]; then echo "you must specifies the librtmp, see: ./configure --help"; __check_ok=NO; fi if [ $SRS_RESEARCH = RESERVED ]; then echo "you must specifies the research, see: ./configure --help"; __check_ok=NO; fi diff --git a/trunk/auto/summary.sh b/trunk/auto/summary.sh index b1c3c2683..82f3a2a1c 100755 --- a/trunk/auto/summary.sh +++ b/trunk/auto/summary.sh @@ -12,6 +12,7 @@ SrsHttpCallbackSummaryColor="\${YELLOW}{disabled} "; if [ $SRS_HTTP_CALLBACK = Y SrsHttpServerSummaryColor="\${YELLOW}{disabled} "; if [ $SRS_HTTP_SERVER = YES ]; then SrsHttpServerSummaryColor="\${GREEN}"; fi SrsHttpApiSummaryColor="\${YELLOW}{disabled} "; if [ $SRS_HTTP_API = YES ]; then SrsHttpApiSummaryColor="\${GREEN}"; fi SrsStreamCasterSummaryColor="\${YELLOW}{disabled} "; if [ $SRS_STREAM_CASTER = YES ]; then SrsStreamCasterSummaryColor="\${GREEN}"; fi +SrsKafkaSummaryColor="\${YELLOW}{disabled} "; if [ $SRS_KAFKA = YES ]; then SrsKafkaSummaryColor="\${GREEN}"; fi SrsLibrtmpSummaryColor="\${YELLOW}{disabled} "; if [ $SRS_LIBRTMP = YES ]; then SrsLibrtmpSummaryColor="\${GREEN}"; fi SrsLibrtmpSSLSummaryColor="\${YELLOW}{disabled} "; if [ $SRS_LIBRTMP = YES ]; then if [ $SRS_SSL = YES ]; then SrsLibrtmpSSLSummaryColor="\${GREEN}"; fi fi SrsResearchSummaryColor="\${GREEN}{disabled} "; if [ $SRS_RESEARCH = YES ]; then SrsResearchSummaryColor="\${GREEN}"; fi @@ -101,6 +102,8 @@ echo -e " | ${SrsHttpApiSummaryColor}http-api @see: https://github.com/s echo -e " | ${SrsHttpApiSummaryColor}http-api: support http api to manage server\${BLACK}" echo -e " | ${SrsStreamCasterSummaryColor}stream-caster @see: https://github.com/simple-rtmp-server/srs/wiki/v2_CN_Streamer\${BLACK}" echo -e " | ${SrsStreamCasterSummaryColor}stream-caster: start server to cast stream over other protocols.\${BLACK}" +echo -e " | ${SrsKafkaSummaryColor}kafka @see: https://github.com/simple-rtmp-server/srs/wiki/v3_CN_Kafka\${BLACK}" +echo -e " | ${SrsKafkaSummaryColor}kafka: start srs kafka producer to report to kafka.\${BLACK}" echo -e " \${BLACK}+------------------------------------------------------------------------------------\${BLACK}" echo -e "\${GREEN}binaries @see: https://github.com/simple-rtmp-server/srs/wiki/v2_CN_Build\${BLACK}" diff --git a/trunk/configure b/trunk/configure index 6ab0af748..b61f8dc09 100755 --- a/trunk/configure +++ b/trunk/configure @@ -177,7 +177,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static" "srs_app_recv_thread" "srs_app_security" "srs_app_statistic" "srs_app_hds" "srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call" - "srs_app_caster_flv" "srs_app_process" "srs_app_ng_exec") + "srs_app_caster_flv" "srs_app_process" "srs_app_ng_exec" "srs_app_kafka") DEFINES="" # add each modules for app for SRS_MODULE in ${SRS_MODULES[*]}; do @@ -514,6 +514,11 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then else echo -e "${GREEN}note: without StreamCaster support${BLACK}" fi + if [ $SRS_KAFKA = YES ]; then + echo -e "${GREEN}Kafka is enabled${BLACK}" + else + echo -e "${YELLOW}warning: without Kafka support${BLACK}" + fi if [ $SRS_HDS = YES ]; then echo -e "${YELLOW}Experiment: HDS is enabled${BLACK}" else diff --git a/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj b/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj index b8de570cf..043bafe3d 100644 --- a/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj +++ b/trunk/ide/srs_xcode/srs_xcode.xcodeproj/project.pbxproj @@ -75,6 +75,7 @@ 3C1232ED1AAEA70F00CE8F6C /* libhttp_parser.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 3C1232EC1AAEA70F00CE8F6C /* libhttp_parser.a */; }; 3C1EE6AE1AB1055800576EE9 /* srs_app_hds.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C1EE6AC1AB1055800576EE9 /* srs_app_hds.cpp */; }; 3C1EE6D71AB1367D00576EE9 /* README.md in Sources */ = {isa = PBXBuildFile; fileRef = 3C1EE6D61AB1367D00576EE9 /* README.md */; }; + 3C26E3C61BB146FF00D0F9DB /* srs_app_kafka.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C26E3C41BB146FF00D0F9DB /* srs_app_kafka.cpp */; }; 3C28EDDF1AF5C43F00A3AEAC /* srs_app_caster_flv.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C28EDDD1AF5C43F00A3AEAC /* srs_app_caster_flv.cpp */; }; 3C36DB5B1ABD1CB90066CCAF /* srs_lib_bandwidth.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C36DB551ABD1CB90066CCAF /* srs_lib_bandwidth.cpp */; }; 3C36DB5C1ABD1CB90066CCAF /* srs_lib_simple_socket.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3C36DB571ABD1CB90066CCAF /* srs_lib_simple_socket.cpp */; }; @@ -323,6 +324,8 @@ 3C1EE6D41AB1367D00576EE9 /* DONATIONS.txt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; name = DONATIONS.txt; path = ../../../DONATIONS.txt; sourceTree = ""; }; 3C1EE6D51AB1367D00576EE9 /* LICENSE */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; name = LICENSE; path = ../../../LICENSE; sourceTree = ""; }; 3C1EE6D61AB1367D00576EE9 /* README.md */ = {isa = PBXFileReference; explicitFileType = net.daringfireball.markdown; fileEncoding = 4; name = README.md; path = ../../../README.md; sourceTree = ""; wrapsLines = 0; }; + 3C26E3C41BB146FF00D0F9DB /* srs_app_kafka.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_app_kafka.cpp; path = ../../../src/app/srs_app_kafka.cpp; sourceTree = ""; }; + 3C26E3C51BB146FF00D0F9DB /* srs_app_kafka.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_app_kafka.hpp; path = ../../../src/app/srs_app_kafka.hpp; sourceTree = ""; }; 3C28EDDD1AF5C43F00A3AEAC /* srs_app_caster_flv.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_app_caster_flv.cpp; path = ../../../src/app/srs_app_caster_flv.cpp; sourceTree = ""; }; 3C28EDDE1AF5C43F00A3AEAC /* srs_app_caster_flv.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = srs_app_caster_flv.hpp; path = ../../../src/app/srs_app_caster_flv.hpp; sourceTree = ""; }; 3C36DB551ABD1CB90066CCAF /* srs_lib_bandwidth.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = srs_lib_bandwidth.cpp; path = ../../../src/libs/srs_lib_bandwidth.cpp; sourceTree = ""; }; @@ -584,6 +587,8 @@ 3C036B541B2D0AC10078E2E0 /* srs_app_http_stream.hpp */, 3C12326C1AAE81D900CE8F6C /* srs_app_ingest.cpp */, 3C12326D1AAE81D900CE8F6C /* srs_app_ingest.hpp */, + 3C26E3C41BB146FF00D0F9DB /* srs_app_kafka.cpp */, + 3C26E3C51BB146FF00D0F9DB /* srs_app_kafka.hpp */, 3C1232721AAE81D900CE8F6C /* srs_app_listener.cpp */, 3C1232731AAE81D900CE8F6C /* srs_app_listener.hpp */, 3C1232741AAE81D900CE8F6C /* srs_app_log.cpp */, @@ -887,6 +892,7 @@ 3C12329E1AAE81D900CE8F6C /* srs_app_hls.cpp in Sources */, 3CC52DD91ACE4023006FEB01 /* srs_utest_config.cpp in Sources */, 3C663F171AB0155100286D8B /* srs_ingest_rtmp.c in Sources */, + 3C26E3C61BB146FF00D0F9DB /* srs_app_kafka.cpp in Sources */, 3C663F131AB0155100286D8B /* srs_flv_injecter.c in Sources */, 3C1232971AAE81D900CE8F6C /* srs_app_dvr.cpp in Sources */, 3C1232271AAE814D00CE8F6C /* srs_kernel_log.cpp in Sources */, diff --git a/trunk/src/app/srs_app_async_call.hpp b/trunk/src/app/srs_app_async_call.hpp index 81bcdf71e..fae83ab3a 100644 --- a/trunk/src/app/srs_app_async_call.hpp +++ b/trunk/src/app/srs_app_async_call.hpp @@ -62,7 +62,7 @@ public: }; /** - * the async callback for dvr. + * the async callback for dvr, callback and other async worker. * when worker call with the task, the worker will do it in isolate thread. * that is, the task is execute/call in async mode. */ diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index cf4071fdf..06a86aba4 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1539,6 +1539,7 @@ int SrsConfig::reload_conf(SrsConfig* conf) } // TODO: FIXME: support reload stream_caster. + // TODO: FIXME: support reload kafka. // merge config: vhost if ((ret = reload_vhost(old_root)) != ERROR_SUCCESS) { diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp new file mode 100644 index 000000000..7dffec4be --- /dev/null +++ b/trunk/src/app/srs_app_kafka.cpp @@ -0,0 +1,68 @@ +/* + The MIT License (MIT) + + Copyright (c) 2013-2015 SRS(simple-rtmp-server) + + Permission is hereby granted, free of charge, to any person obtaining a copy of + this software and associated documentation files (the "Software"), to deal in + the Software without restriction, including without limitation the rights to + use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + the Software, and to permit persons to whom the Software is furnished to do so, + subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include +#include +#include + +SrsKafkaProducer::SrsKafkaProducer() +{ + worker = new SrsAsyncCallWorker(); +} + +SrsKafkaProducer::~SrsKafkaProducer() +{ + srs_freep(worker); +} + +int SrsKafkaProducer::initialize() +{ + int ret = ERROR_SUCCESS; + + srs_trace("initialize kafka producer ok."); + + return ret; +} + +int SrsKafkaProducer::start() +{ + int ret = ERROR_SUCCESS; + + if ((ret = worker->start()) != ERROR_SUCCESS) { + srs_error("start kafka failed. ret=%d", ret); + return ret; + } + + srs_trace("start kafka async worker ok."); + + return ret; +} + +void SrsKafkaProducer::stop() +{ + worker->stop(); +} + diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp new file mode 100644 index 000000000..0bbd7b570 --- /dev/null +++ b/trunk/src/app/srs_app_kafka.hpp @@ -0,0 +1,47 @@ +/* + The MIT License (MIT) + + Copyright (c) 2013-2015 SRS(simple-rtmp-server) + + Permission is hereby granted, free of charge, to any person obtaining a copy of + this software and associated documentation files (the "Software"), to deal in + the Software without restriction, including without limitation the rights to + use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + the Software, and to permit persons to whom the Software is furnished to do so, + subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_APP_KAFKA_HPP +#define SRS_APP_KAFKA_HPP + +/* +#include +*/ +#include + +class SrsAsyncCallWorker; + +class SrsKafkaProducer +{ +private: + SrsAsyncCallWorker* worker; +public: + SrsKafkaProducer(); + virtual ~SrsKafkaProducer(); +public: + virtual int initialize(); + virtual int start(); + virtual void stop(); +}; + +#endif diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 40b34109b..d6c315a24 100755 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -49,6 +49,7 @@ using namespace std; #include #include #include +#include // system interval in ms, // all resolution times should be times togother, @@ -508,6 +509,7 @@ SrsServer::SrsServer() #ifdef SRS_AUTO_INGEST ingester = NULL; #endif + kafka = new SrsKafkaProducer(); } SrsServer::~SrsServer() @@ -537,6 +539,8 @@ void SrsServer::destroy() srs_freep(ingester); #endif + srs_freep(kafka); + if (pid_fd > 0) { ::close(pid_fd); pid_fd = -1; @@ -561,6 +565,8 @@ void SrsServer::dispose() ingester->dispose(); #endif + kafka->stop(); + SrsSource::dispose_all(); while (!conns.empty()) { @@ -864,6 +870,18 @@ int SrsServer::ingest() return ret; } +int SrsServer::start_kafka() +{ + int ret = ERROR_SUCCESS; + + if ((ret = kafka->initialize()) != ERROR_SUCCESS) { + srs_error("initialize the kafka producer failed. ret=%d", ret); + return ret; + } + + return kafka->start(); +} + int SrsServer::cycle() { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 969d03841..a2f131154 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -55,6 +55,7 @@ class SrsTcpListener; #ifdef SRS_AUTO_STREAM_CASTER class SrsAppCasterFlv; #endif +class SrsKafkaProducer; // listener type for server to identify the connection, // that is, use different type to process the connection. @@ -247,6 +248,7 @@ private: #ifdef SRS_AUTO_INGEST SrsIngester* ingester; #endif + SrsKafkaProducer* kafka; private: /** * the pid file fd, lock the file write when server is running. @@ -307,6 +309,7 @@ public: virtual int register_signal(); virtual int http_handle(); virtual int ingest(); + virtual int start_kafka(); virtual int cycle(); // IConnectionManager public: diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 5193ce600..c4b33349f 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -382,6 +382,10 @@ int run_master() return ret; } + if ((ret = _srs_server->start_kafka()) != ERROR_SUCCESS) { + return ret; + } + if ((ret = _srs_server->cycle()) != ERROR_SUCCESS) { return ret; }