From 86a7db0adb6054efc372694aa8deffe3d29729e9 Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 28 Dec 2015 17:15:44 +0800 Subject: [PATCH] refine edge ingester, use upstream adapter. --- trunk/src/app/srs_app_edge.cpp | 163 +++++++++++++++++++++---------- trunk/src/app/srs_app_edge.hpp | 34 ++++++- trunk/src/app/srs_app_source.cpp | 4 +- 3 files changed, 145 insertions(+), 56 deletions(-) diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 770dfcc94..6158fb02f 100755 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -62,69 +62,32 @@ using namespace std; // when edge error, wait for quit #define SRS_EDGE_FORWARDER_ERROR_US (int64_t)(50*1000LL) -SrsEdgeIngester::SrsEdgeIngester() +SrsEdgeUpstream::SrsEdgeUpstream() { - source = NULL; - edge = NULL; - req = NULL; - - sdk = new SrsSimpleRtmpClient(); - lb = new SrsLbRoundRobin(); - pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US); } -SrsEdgeIngester::~SrsEdgeIngester() +SrsEdgeUpstream::~SrsEdgeUpstream() { - stop(); +} + +SrsEdgeRtmpUpstream::SrsEdgeRtmpUpstream() +{ + sdk = new SrsSimpleRtmpClient(); +} + +SrsEdgeRtmpUpstream::~SrsEdgeRtmpUpstream() +{ + close(); srs_freep(sdk); - srs_freep(lb); - srs_freep(pthread); } -int SrsEdgeIngester::initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r) +int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb) { int ret = ERROR_SUCCESS; - source = s; - edge = e; - req = r; + SrsRequest* req = r; - return ret; -} - -int SrsEdgeIngester::start() -{ - int ret = ERROR_SUCCESS; - - if ((ret = source->on_publish()) != ERROR_SUCCESS) { - srs_error("edge pull stream then publish to edge failed. ret=%d", ret); - return ret; - } - - return pthread->start(); -} - -void SrsEdgeIngester::stop() -{ - pthread->stop(); - sdk->close(); - - // notice to unpublish. - source->on_unpublish(); -} - -string SrsEdgeIngester::get_curr_origin() -{ - return lb->selected(); -} - -int SrsEdgeIngester::cycle() -{ - int ret = ERROR_SUCCESS; - - source->on_source_id_changed(_srs_context->get_id()); - std::string url; if (true) { SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost); @@ -163,6 +126,98 @@ int SrsEdgeIngester::cycle() return ret; } + return ret; +} + +int SrsEdgeRtmpUpstream::recv_message(SrsCommonMessage** pmsg) +{ + return sdk->recv_message(pmsg); +} + +int SrsEdgeRtmpUpstream::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket) +{ + return sdk->decode_message(msg, ppacket); +} + +void SrsEdgeRtmpUpstream::close() +{ + sdk->close(); +} + +void SrsEdgeRtmpUpstream::kbps_sample(const char* label, int64_t age) +{ + sdk->kbps_sample(label, age); +} + +SrsEdgeIngester::SrsEdgeIngester() +{ + source = NULL; + edge = NULL; + req = NULL; + + upstream = new SrsEdgeRtmpUpstream(); + lb = new SrsLbRoundRobin(); + pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US); +} + +SrsEdgeIngester::~SrsEdgeIngester() +{ + stop(); + + srs_freep(upstream); + srs_freep(lb); + srs_freep(pthread); +} + +int SrsEdgeIngester::initialize(SrsSource* s, SrsPlayEdge* e, SrsRequest* r) +{ + int ret = ERROR_SUCCESS; + + source = s; + edge = e; + req = r; + + return ret; +} + +int SrsEdgeIngester::start() +{ + int ret = ERROR_SUCCESS; + + if ((ret = source->on_publish()) != ERROR_SUCCESS) { + srs_error("edge pull stream then publish to edge failed. ret=%d", ret); + return ret; + } + + return pthread->start(); +} + +void SrsEdgeIngester::stop() +{ + pthread->stop(); + upstream->close(); + + // notice to unpublish. + source->on_unpublish(); +} + +string SrsEdgeIngester::get_curr_origin() +{ + return lb->selected(); +} + +int SrsEdgeIngester::cycle() +{ + int ret = ERROR_SUCCESS; + + if ((ret = source->on_source_id_changed(_srs_context->get_id())) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = upstream->connect(req, lb)) != ERROR_SUCCESS) { + return ret; + } + if ((ret = edge->on_ingest_play()) != ERROR_SUCCESS) { return ret; } @@ -188,12 +243,12 @@ int SrsEdgeIngester::ingest() // pithy print if (pprint->can_print()) { - sdk->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age()); + upstream->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age()); } // read from client. SrsCommonMessage* msg = NULL; - if ((ret = sdk->recv_message(&msg)) != ERROR_SUCCESS) { + if ((ret = upstream->recv_message(&msg)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("pull origin server message failed. ret=%d", ret); } @@ -244,7 +299,7 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) // process onMetaData if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { SrsPacket* pkt = NULL; - if ((ret = sdk->decode_message(msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = upstream->decode_message(msg, &pkt)) != ERROR_SUCCESS) { srs_error("decode onMetaData message failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index a7dd4af7e..58b7c40b9 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -49,6 +49,7 @@ class SrsKbps; class SrsLbRoundRobin; class SrsTcpClient; class SrsSimpleRtmpClient; +class SrsPacket; /** * the state of edge, auto machine @@ -75,6 +76,37 @@ enum SrsEdgeUserState SrsEdgeUserStateReloading = 100, }; +/** + * the upstream of edge, can be rtmp or http. + */ +class SrsEdgeUpstream +{ +public: + SrsEdgeUpstream(); + virtual ~SrsEdgeUpstream(); +public: + virtual int connect(SrsRequest* r, SrsLbRoundRobin* lb) = 0; + virtual int recv_message(SrsCommonMessage** pmsg) = 0; + virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket) = 0; + virtual void close() = 0; + virtual void kbps_sample(const char* label, int64_t age) = 0; +}; + +class SrsEdgeRtmpUpstream : public SrsEdgeUpstream +{ +private: + SrsSimpleRtmpClient* sdk; +public: + SrsEdgeRtmpUpstream(); + virtual ~SrsEdgeRtmpUpstream(); +public: + virtual int connect(SrsRequest* r, SrsLbRoundRobin* lb); + virtual int recv_message(SrsCommonMessage** pmsg); + virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket); + virtual void close(); + virtual void kbps_sample(const char* label, int64_t age); +}; + /** * edge used to ingest stream from origin. */ @@ -85,8 +117,8 @@ private: SrsPlayEdge* edge; SrsRequest* req; SrsReusableThread2* pthread; - SrsSimpleRtmpClient* sdk; SrsLbRoundRobin* lb; + SrsEdgeUpstream* upstream; public: SrsEdgeIngester(); virtual ~SrsEdgeIngester(); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 4de8a5d7d..d636a16a2 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -2054,7 +2054,9 @@ int SrsSource::on_publish() // whatever, the publish thread is the source or edge source, // save its id to srouce id. - on_source_id_changed(_srs_context->get_id()); + if ((ret = on_source_id_changed(_srs_context->get_id())) != ERROR_SUCCESS) { + return ret; + } // reset the mix queue. mix_queue->clear();