diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 3c7394031..9cc00f865 100644 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -117,7 +117,7 @@ int SrsBufferCache::cycle() // the stream cache will create consumer to cache stream, // which will trigger to fetch stream from origin for edge. SrsConsumer* consumer = NULL; - if ((ret = source->create_consumer(consumer, false, false, true)) != ERROR_SUCCESS) { + if ((ret = source->create_consumer(NULL, consumer, false, false, true)) != ERROR_SUCCESS) { srs_error("http: create consumer failed. ret=%d", ret); return ret; } @@ -484,7 +484,7 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) // create consumer of souce, ignore gop cache, use the audio gop cache. SrsConsumer* consumer = NULL; - if ((ret = source->create_consumer(consumer, true, true, !enc->has_cache())) != ERROR_SUCCESS) { + if ((ret = source->create_consumer(NULL, consumer, true, true, !enc->has_cache())) != ERROR_SUCCESS) { srs_error("http: create consumer failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 94fa2f0b7..6d95f313b 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -832,7 +832,7 @@ int SrsRtmpConn::playing(SrsSource* source) // create consumer of souce. SrsConsumer* consumer = NULL; - if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) { + if ((ret = source->create_consumer(this, consumer)) != ERROR_SUCCESS) { srs_error("create consumer failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index e926af7a3..4de8a5d7d 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -419,9 +419,10 @@ ISrsWakable::~ISrsWakable() { } -SrsConsumer::SrsConsumer(SrsSource* _source) +SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c) { - source = _source; + source = s; + conn = c; paused = false; jitter = new SrsRtmpJitter(); queue = new SrsMessageQueue(); @@ -2157,11 +2158,11 @@ void SrsSource::on_unpublish() handler->on_unpublish(this, req); } -int SrsSource::create_consumer(SrsConsumer*& consumer, bool ds, bool dm, bool dg) +int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg) { int ret = ERROR_SUCCESS; - consumer = new SrsConsumer(this); + consumer = new SrsConsumer(this, conn); consumers.push_back(consumer); double queue_size = _srs_config->get_queue_length(req->vhost); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 3d2468e33..275555a95 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -52,6 +52,7 @@ class SrsRtmpServer; class SrsEdgeProxyContext; class SrsMessageArray; class SrsNgExec; +class SrsConnection; #ifdef SRS_AUTO_HLS class SrsHls; #endif @@ -225,6 +226,8 @@ private: SrsRtmpJitter* jitter; SrsSource* source; SrsMessageQueue* queue; + // the owner connection for debug, maybe NULL. + SrsConnection* conn; bool paused; // when source id changed, notice all consumers bool should_update_source_id; @@ -237,7 +240,7 @@ private: int mw_duration; #endif public: - SrsConsumer(SrsSource* _source); + SrsConsumer(SrsSource* s, SrsConnection* c); virtual ~SrsConsumer(); public: /** @@ -575,7 +578,7 @@ public: * @param dg, whether dumps the gop cache. */ virtual int create_consumer( - SrsConsumer*& consumer, + SrsConnection* conn, SrsConsumer*& consumer, bool ds = true, bool dm = true, bool dg = true ); virtual void on_consumer_destroy(SrsConsumer* consumer);