diff --git a/trunk/.gitignore b/trunk/.gitignore new file mode 100644 index 000000000..4319156b7 --- /dev/null +++ b/trunk/.gitignore @@ -0,0 +1,46 @@ +console.conf +doc/frozen.2Mbps.1644x1028.flv +doc/frozen.500Kbps.766x480.flv +doc/kungfupanda3-tlr1_h1080p.200kbps.flv +doc/kungfupanda3-tlr1_h1080p.300kbps.flv +doc/kungfupanda3-tlr1_h1080p.400kbps.flv +doc/kungfupanda3-tlr1_h1080p.500kbps.flv +doc/kungfupanda3-tlr1_h1080p.600kbps.flv +doc/kungfupanda3-tlr1_h1080p.700kbps.flv +doc/kungfupanda3-tlr1_h1080p.800kbps.flv +doc/kungfupanda3-tlr1_h1080p.8mbps.flv +doc/kungfupanda3-tlr1_h1080p.900kbps.flv +doc/time.300kbps.flv +edge.conf +html +ide/srs_xcode/srs_xcode.xcodeproj/project.xcworkspace/xcshareddata/ +ide/srs_xcode/srs_xcode.xcodeproj/project.xcworkspace/xcuserdata/ +ide/srs_xcode/srs_xcode.xcodeproj/xcuserdata/ +ingest.conf +origin.conf +research/aac/ +research/api-server/.idea/ +research/api-server/static-dir/mse +research/bat/ +research/big/ +research/bitch/ +research/bott/ +research/cgo/ +research/dns/ +research/empty/ +research/golang/golang +research/golang/temp.flv +research/librtmp/720p.h264.raw +research/librtmp/test.h264 +research/licenser/ +research/players/.idea/ +research/players/fls_player/ +research/players/mic/ +research/players/srs_player/.idea/ +research/proxy/ +research/redis-ocluster/ +research/rtmfp/ +research/snap/ +research/speex/ +test/ + diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 1d2775330..fc76613c0 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -321,6 +321,35 @@ ISrsKafkaCluster::~ISrsKafkaCluster() { } +// @global kafka event producer, user must use srs_initialize_kafka to initialize it. +ISrsKafkaCluster* _srs_kafka = NULL; + +int srs_initialize_kafka() +{ + int ret = ERROR_SUCCESS; + + SrsKafkaProducer* kafka = new SrsKafkaProducer(); + _srs_kafka = kafka; + + if ((ret = kafka->initialize()) != ERROR_SUCCESS) { + srs_error("initialize the kafka producer failed. ret=%d", ret); + return ret; + } + + if ((ret = kafka->start()) != ERROR_SUCCESS) { + srs_error("start kafka failed. ret=%d", ret); + return ret; + } + + return ret; +} + +void srs_dispose_kafka() +{ + SrsKafkaProducer* kafka = dynamic_cast(_srs_kafka); + kafka->stop(); +} + SrsKafkaProducer::SrsKafkaProducer() { metadata_ok = false; @@ -362,6 +391,10 @@ int SrsKafkaProducer::start() { int ret = ERROR_SUCCESS; + if (!enabled) { + return ret; + } + if ((ret = worker->start()) != ERROR_SUCCESS) { srs_error("start kafka worker failed. ret=%d", ret); return ret; @@ -378,6 +411,10 @@ int SrsKafkaProducer::start() void SrsKafkaProducer::stop() { + if (!enabled) { + return; + } + pthread->stop(); worker->stop(); } diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index b9115a79c..71d24635f 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -149,6 +149,12 @@ public: virtual int on_close(int key) = 0; }; +// @global kafka event producer. +extern ISrsKafkaCluster* _srs_kafka; +// kafka initialize and disposer for global object. +extern int srs_initialize_kafka(); +extern void srs_dispose_kafka(); + /** * the kafka producer used to save log to kafka cluster. */ diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 1b1dcc0b9..9c02bf759 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -309,17 +309,10 @@ void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout) transport->set_recv_timeout(timeout); } -#ifdef SRS_AUTO_KAFKA -SrsRtmpConn::SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c, string cip) -#else SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c, string cip) -#endif : SrsConnection(svr, c, cip) { server = svr; -#ifdef SRS_AUTO_KAFKA - kafka = k; -#endif req = new SrsRequest(); res = new SrsResponse(); @@ -375,7 +368,7 @@ int SrsRtmpConn::do_cycle() // notify kafka cluster. #ifdef SRS_AUTO_KAFKA - if ((ret = kafka->on_client(srs_id(), SrsListenerRtmpStream, ip)) != ERROR_SUCCESS) { + if ((ret = _srs_kafka->on_client(srs_id(), SrsListenerRtmpStream, ip)) != ERROR_SUCCESS) { srs_error("kafka handler on_client failed. ret=%d", ret); return ret; } @@ -1558,7 +1551,7 @@ int SrsRtmpConn::on_disconnect() http_hooks_on_close(); #ifdef SRS_AUTO_KAFKA - if ((ret = kafka->on_close(srs_id())) != ERROR_SUCCESS) { + if ((ret = _srs_kafka->on_close(srs_id())) != ERROR_SUCCESS) { srs_error("notify kafka failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 244f509b6..57ac967ff 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -138,16 +138,8 @@ private: int publish_normal_timeout; // whether enable the tcp_nodelay. bool tcp_nodelay; - // the kafka cluster -#ifdef SRS_AUTO_KAFKA - ISrsKafkaCluster* kafka; -#endif public: -#ifdef SRS_AUTO_KAFKA - SrsRtmpConn(SrsServer* svr, ISrsKafkaCluster* k, st_netfd_t c, std::string cip); -#else SrsRtmpConn(SrsServer* svr, st_netfd_t c, std::string cip); -#endif virtual ~SrsRtmpConn(); public: virtual void dispose(); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 3847c2ee4..9ef91d7a8 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -511,9 +511,6 @@ SrsServer::SrsServer() #ifdef SRS_AUTO_INGEST ingester = NULL; #endif -#ifdef SRS_AUTO_KAFKA - kafka = new SrsKafkaProducer(); -#endif } SrsServer::~SrsServer() @@ -543,10 +540,6 @@ void SrsServer::destroy() srs_freep(ingester); #endif -#ifdef SRS_AUTO_KAFKA - srs_freep(kafka); -#endif - if (pid_fd > 0) { ::close(pid_fd); pid_fd = -1; @@ -570,7 +563,7 @@ void SrsServer::dispose() // @remark don't dispose ingesters, for too slow. #ifdef SRS_AUTO_KAFKA - kafka->stop(); + srs_dispose_kafka(); #endif // dispose the source for hls and dvr. @@ -655,6 +648,14 @@ int SrsServer::initialize_st() // set current log id. _srs_context->generate_id(); + // initialize the conponents that depends on st. +#ifdef SRS_AUTO_KAFKA + if ((ret = srs_initialize_kafka()) != ERROR_SUCCESS) { + srs_error("initialize kafka failed, ret=%d", ret); + return ret; + } +#endif + // check asprocess. bool asprocess = _srs_config->get_asprocess(); if (asprocess && ppid == 1) { @@ -876,25 +877,6 @@ int SrsServer::ingest() return ret; } -int SrsServer::start_kafka() -{ - int ret = ERROR_SUCCESS; - -#ifdef SRS_AUTO_KAFKA - if ((ret = kafka->initialize()) != ERROR_SUCCESS) { - srs_error("initialize the kafka producer failed. ret=%d", ret); - return ret; - } - - if ((ret = kafka->start()) != ERROR_SUCCESS) { - srs_error("start kafka failed. ret=%d", ret); - return ret; - } -#endif - - return ret; -} - int SrsServer::cycle() { int ret = ERROR_SUCCESS; @@ -1338,7 +1320,7 @@ SrsConnection* SrsServer::fd2conn(SrsListenerType type, st_netfd_t stfd) SrsConnection* conn = NULL; if (type == SrsListenerRtmpStream) { - conn = new SrsRtmpConn(this, kafka, stfd, ip); + conn = new SrsRtmpConn(this, stfd, ip); } else if (type == SrsListenerHttpApi) { #ifdef SRS_AUTO_HTTP_API conn = new SrsHttpApi(this, stfd, http_api_mux, ip); diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 415d432e1..6883845c8 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -254,9 +254,6 @@ private: #ifdef SRS_AUTO_INGEST SrsIngester* ingester; #endif -#ifdef SRS_AUTO_KAFKA - SrsKafkaProducer* kafka; -#endif private: /** * the pid file fd, lock the file write when server is running. @@ -319,7 +316,6 @@ public: virtual int register_signal(); virtual int http_handle(); virtual int ingest(); - virtual int start_kafka(); virtual int cycle(); // server utilities. public: diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index b0a1e9dea..0a35ef2d1 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -405,10 +405,6 @@ int run_master() return ret; } - if ((ret = _srs_server->start_kafka()) != ERROR_SUCCESS) { - return ret; - } - if ((ret = _srs_server->cycle()) != ERROR_SUCCESS) { return ret; }