1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

for bug #194, add pipe to consumer.

This commit is contained in:
winlin 2014-11-11 16:27:35 +08:00
parent f9756ea14c
commit 1e601a6efc
4 changed files with 72 additions and 2 deletions

View file

@ -41,6 +41,7 @@ using namespace std;
#include <srs_app_edge.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_avc_aac.hpp>
#include <srs_app_pipe.hpp>
#define CONST_MAX_JITTER_MS 500
#define DEFAULT_FRAME_TIME_MS 40
@ -171,6 +172,11 @@ void SrsMessageQueue::set_queue_size(double queue_size)
queue_size_ms = (int)(queue_size * 1000);
}
bool SrsMessageQueue::empty()
{
return msgs.size() == 0;
}
int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
@ -290,6 +296,7 @@ SrsConsumer::SrsConsumer(SrsSource* _source)
jitter = new SrsRtmpJitter();
queue = new SrsMessageQueue();
should_update_source_id = false;
pipe = new SrsPipe();
}
SrsConsumer::~SrsConsumer()
@ -299,6 +306,23 @@ SrsConsumer::~SrsConsumer()
srs_freep(queue);
}
int SrsConsumer::initialize()
{
int ret = ERROR_SUCCESS;
if ((ret = pipe->initialize()) != ERROR_SUCCESS) {
srs_error("initialize the pipe for consumer failed. ret=%d", ret);
return ret;
}
return ret;
}
st_netfd_t SrsConsumer::pipe_fd()
{
return pipe->rfd();
}
void SrsConsumer::set_queue_size(double queue_size)
{
queue->set_queue_size(queue_size);
@ -329,11 +353,18 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S
return ret;
}
// notify the rtmp connection to resume to send packet.
if (!pipe->already_written()) {
pipe->active();
}
return ret;
}
int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count)
{
int ret = ERROR_SUCCESS;
srs_assert(max_count > 0);
if (should_update_source_id) {
@ -346,7 +377,15 @@ int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& c
return ERROR_SUCCESS;
}
return queue->dump_packets(max_count, pmsgs, count);
if ((ret = queue->dump_packets(max_count, pmsgs, count)) != ERROR_SUCCESS) {
return ret;
}
if (queue->empty()) {
return pipe->reset();
}
return ret;
}
int SrsConsumer::on_play_client_pause(bool is_pause)
@ -1454,7 +1493,13 @@ void SrsSource::on_unpublish()
{
int ret = ERROR_SUCCESS;
consumer = new SrsConsumer(this);
SrsConsumer* c = new SrsConsumer(this);
if ((ret = c->initialize()) != ERROR_SUCCESS) {
srs_freep(c);
return ret;
}
consumer = c;
consumers.push_back(consumer);
double queue_size = _srs_config->get_queue_length(_req->vhost);