diff --git a/trunk/src/app/srs_app_pipe.cpp b/trunk/src/app/srs_app_pipe.cpp index 56823ffe0..d8a4e0367 100644 --- a/trunk/src/app/srs_app_pipe.cpp +++ b/trunk/src/app/srs_app_pipe.cpp @@ -66,6 +66,11 @@ int SrsPipe::initialize() return ret; } +st_netfd_t SrsPipe::rfd() +{ + return read_stfd; +} + bool SrsPipe::already_written() { return _already_written; diff --git a/trunk/src/app/srs_app_pipe.hpp b/trunk/src/app/srs_app_pipe.hpp index 088bf32fa..f7e080310 100644 --- a/trunk/src/app/srs_app_pipe.hpp +++ b/trunk/src/app/srs_app_pipe.hpp @@ -57,6 +57,10 @@ public: * initialize pipes, open fds. */ virtual int initialize(); + /** + * get the read fd to poll. + */ + virtual st_netfd_t rfd(); public: /** * for event based service, whether already writen data. diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 64fd2d0e0..7ec07353a 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -41,6 +41,7 @@ using namespace std; #include #include #include +#include #define CONST_MAX_JITTER_MS 500 #define DEFAULT_FRAME_TIME_MS 40 @@ -171,6 +172,11 @@ void SrsMessageQueue::set_queue_size(double queue_size) queue_size_ms = (int)(queue_size * 1000); } +bool SrsMessageQueue::empty() +{ + return msgs.size() == 0; +} + int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; @@ -290,6 +296,7 @@ SrsConsumer::SrsConsumer(SrsSource* _source) jitter = new SrsRtmpJitter(); queue = new SrsMessageQueue(); should_update_source_id = false; + pipe = new SrsPipe(); } SrsConsumer::~SrsConsumer() @@ -299,6 +306,23 @@ SrsConsumer::~SrsConsumer() srs_freep(queue); } +int SrsConsumer::initialize() +{ + int ret = ERROR_SUCCESS; + + if ((ret = pipe->initialize()) != ERROR_SUCCESS) { + srs_error("initialize the pipe for consumer failed. ret=%d", ret); + return ret; + } + + return ret; +} + +st_netfd_t SrsConsumer::pipe_fd() +{ + return pipe->rfd(); +} + void SrsConsumer::set_queue_size(double queue_size) { queue->set_queue_size(queue_size); @@ -329,11 +353,18 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S return ret; } + // notify the rtmp connection to resume to send packet. + if (!pipe->already_written()) { + pipe->active(); + } + return ret; } int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count) { + int ret = ERROR_SUCCESS; + srs_assert(max_count > 0); if (should_update_source_id) { @@ -346,7 +377,15 @@ int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& c return ERROR_SUCCESS; } - return queue->dump_packets(max_count, pmsgs, count); + if ((ret = queue->dump_packets(max_count, pmsgs, count)) != ERROR_SUCCESS) { + return ret; + } + + if (queue->empty()) { + return pipe->reset(); + } + + return ret; } int SrsConsumer::on_play_client_pause(bool is_pause) @@ -1454,7 +1493,13 @@ void SrsSource::on_unpublish() { int ret = ERROR_SUCCESS; - consumer = new SrsConsumer(this); + SrsConsumer* c = new SrsConsumer(this); + if ((ret = c->initialize()) != ERROR_SUCCESS) { + srs_freep(c); + return ret; + } + + consumer = c; consumers.push_back(consumer); double queue_size = _srs_config->get_queue_length(_req->vhost); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 1759ebf7e..4d4a510d6 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -58,6 +58,7 @@ class SrsDvr; class SrsEncoder; #endif class SrsStream; +class SrsPipe; /** * the time jitter algorithm: @@ -121,6 +122,10 @@ public: */ virtual void set_queue_size(double queue_size); public: + /** + * whether queue is empty. + */ + virtual bool empty(); /** * enqueue the message, the timestamp always monotonically. * @param msg, the msg to enqueue, user never free it whatever the return code. @@ -148,6 +153,7 @@ private: class SrsConsumer { private: + SrsPipe* pipe; SrsRtmpJitter* jitter; SrsSource* source; SrsMessageQueue* queue; @@ -157,6 +163,16 @@ private: public: SrsConsumer(SrsSource* _source); virtual ~SrsConsumer(); +public: + /** + * initialize the consumer. + */ + virtual int initialize(); + /** + * source can use this fd to poll with the read event, + * for performance issue, @see: https://github.com/winlinvip/simple-rtmp-server/issues/194 + */ + virtual st_netfd_t pipe_fd(); public: /** * set the size of queue.