From 73abb1a31d8922f3ddba41b668d5f0631f0a1d47 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 11 Nov 2014 17:04:56 +0800 Subject: [PATCH] Revert "for bug #194, add pipe to consumer." This reverts commit 1e601a6efc86b1ac91bbc9940513c259f478cbdc. --- trunk/src/app/srs_app_pipe.cpp | 5 ---- trunk/src/app/srs_app_pipe.hpp | 4 --- trunk/src/app/srs_app_source.cpp | 49 ++------------------------------ trunk/src/app/srs_app_source.hpp | 16 ----------- 4 files changed, 2 insertions(+), 72 deletions(-) diff --git a/trunk/src/app/srs_app_pipe.cpp b/trunk/src/app/srs_app_pipe.cpp index d8a4e0367..56823ffe0 100644 --- a/trunk/src/app/srs_app_pipe.cpp +++ b/trunk/src/app/srs_app_pipe.cpp @@ -66,11 +66,6 @@ int SrsPipe::initialize() return ret; } -st_netfd_t SrsPipe::rfd() -{ - return read_stfd; -} - bool SrsPipe::already_written() { return _already_written; diff --git a/trunk/src/app/srs_app_pipe.hpp b/trunk/src/app/srs_app_pipe.hpp index f7e080310..088bf32fa 100644 --- a/trunk/src/app/srs_app_pipe.hpp +++ b/trunk/src/app/srs_app_pipe.hpp @@ -57,10 +57,6 @@ public: * initialize pipes, open fds. */ virtual int initialize(); - /** - * get the read fd to poll. - */ - virtual st_netfd_t rfd(); public: /** * for event based service, whether already writen data. diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 7ec07353a..64fd2d0e0 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -41,7 +41,6 @@ using namespace std; #include #include #include -#include #define CONST_MAX_JITTER_MS 500 #define DEFAULT_FRAME_TIME_MS 40 @@ -172,11 +171,6 @@ void SrsMessageQueue::set_queue_size(double queue_size) queue_size_ms = (int)(queue_size * 1000); } -bool SrsMessageQueue::empty() -{ - return msgs.size() == 0; -} - int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; @@ -296,7 +290,6 @@ SrsConsumer::SrsConsumer(SrsSource* _source) jitter = new SrsRtmpJitter(); queue = new SrsMessageQueue(); should_update_source_id = false; - pipe = new SrsPipe(); } SrsConsumer::~SrsConsumer() @@ -306,23 +299,6 @@ SrsConsumer::~SrsConsumer() srs_freep(queue); } -int SrsConsumer::initialize() -{ - int ret = ERROR_SUCCESS; - - if ((ret = pipe->initialize()) != ERROR_SUCCESS) { - srs_error("initialize the pipe for consumer failed. ret=%d", ret); - return ret; - } - - return ret; -} - -st_netfd_t SrsConsumer::pipe_fd() -{ - return pipe->rfd(); -} - void SrsConsumer::set_queue_size(double queue_size) { queue->set_queue_size(queue_size); @@ -353,18 +329,11 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S return ret; } - // notify the rtmp connection to resume to send packet. - if (!pipe->already_written()) { - pipe->active(); - } - return ret; } int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count) { - int ret = ERROR_SUCCESS; - srs_assert(max_count > 0); if (should_update_source_id) { @@ -377,15 +346,7 @@ int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& c return ERROR_SUCCESS; } - if ((ret = queue->dump_packets(max_count, pmsgs, count)) != ERROR_SUCCESS) { - return ret; - } - - if (queue->empty()) { - return pipe->reset(); - } - - return ret; + return queue->dump_packets(max_count, pmsgs, count); } int SrsConsumer::on_play_client_pause(bool is_pause) @@ -1493,13 +1454,7 @@ void SrsSource::on_unpublish() { int ret = ERROR_SUCCESS; - SrsConsumer* c = new SrsConsumer(this); - if ((ret = c->initialize()) != ERROR_SUCCESS) { - srs_freep(c); - return ret; - } - - consumer = c; + consumer = new SrsConsumer(this); consumers.push_back(consumer); double queue_size = _srs_config->get_queue_length(_req->vhost); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 4d4a510d6..1759ebf7e 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -58,7 +58,6 @@ class SrsDvr; class SrsEncoder; #endif class SrsStream; -class SrsPipe; /** * the time jitter algorithm: @@ -122,10 +121,6 @@ public: */ virtual void set_queue_size(double queue_size); public: - /** - * whether queue is empty. - */ - virtual bool empty(); /** * enqueue the message, the timestamp always monotonically. * @param msg, the msg to enqueue, user never free it whatever the return code. @@ -153,7 +148,6 @@ private: class SrsConsumer { private: - SrsPipe* pipe; SrsRtmpJitter* jitter; SrsSource* source; SrsMessageQueue* queue; @@ -163,16 +157,6 @@ private: public: SrsConsumer(SrsSource* _source); virtual ~SrsConsumer(); -public: - /** - * initialize the consumer. - */ - virtual int initialize(); - /** - * source can use this fd to poll with the read event, - * for performance issue, @see: https://github.com/winlinvip/simple-rtmp-server/issues/194 - */ - virtual st_netfd_t pipe_fd(); public: /** * set the size of queue.