From 79c9c6dcb7ad99614c7b8d472f20e20723a81fc3 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 27 Apr 2014 11:11:15 +0800 Subject: [PATCH] implements the proxy for edge publish mode --- trunk/src/app/srs_app_edge.cpp | 159 ++++++++++++++------- trunk/src/app/srs_app_edge.hpp | 32 +++-- trunk/src/app/srs_app_rtmp_conn.cpp | 26 ++++ trunk/src/app/srs_app_source.cpp | 5 + trunk/src/app/srs_app_source.hpp | 5 + trunk/src/rtmp/srs_protocol_rtmp_stack.cpp | 21 ++- trunk/src/rtmp/srs_protocol_rtmp_stack.hpp | 4 + 7 files changed, 187 insertions(+), 65 deletions(-) diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 21f7cd106..f50030294 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -41,12 +41,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include // when error, edge ingester sleep for a while and retry. #define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL) // when edge timeout, retry next. -#define SRS_EDGE_TIMEOUT_US (int64_t)(3*1000*1000LL) +#define SRS_EDGE_INGESTER_TIMEOUT_US (int64_t)(3*1000*1000LL) SrsEdgeIngester::SrsEdgeIngester() { @@ -146,7 +147,7 @@ int SrsEdgeIngester::ingest() { int ret = ERROR_SUCCESS; - client->set_recv_timeout(SRS_EDGE_TIMEOUT_US); + client->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT_US); SrsPithyPrint pithy_print(SRS_STAGE_EDGE); @@ -306,6 +307,21 @@ int SrsEdgeIngester::connect_server() return ret; } +SrsEdgeProxyContext::SrsEdgeProxyContext() +{ + edge_stream_id = 0; + edge_io = NULL; + edge_rtmp = NULL; + + origin_stream_id = 0; + origin_io = NULL; + origin_rtmp = NULL; +} + +SrsEdgeProxyContext::~SrsEdgeProxyContext() +{ +} + SrsEdgeForwarder::SrsEdgeForwarder() { io = NULL; @@ -315,14 +331,11 @@ SrsEdgeForwarder::SrsEdgeForwarder() origin_index = 0; stream_id = 0; stfd = NULL; - pthread = new SrsThread(this, SRS_EDGE_INGESTER_SLEEP_US); } SrsEdgeForwarder::~SrsEdgeForwarder() { stop(); - - srs_freep(pthread); } int SrsEdgeForwarder::initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req) @@ -337,21 +350,6 @@ int SrsEdgeForwarder::initialize(SrsSource* source, SrsPublishEdge* edge, SrsReq } int SrsEdgeForwarder::start() -{ - return pthread->start(); -} - -void SrsEdgeForwarder::stop() -{ - pthread->stop(); - - close_underlayer_socket(); - - srs_freep(client); - srs_freep(io); -} - -int SrsEdgeForwarder::cycle() { int ret = ERROR_SUCCESS; @@ -378,37 +376,36 @@ int SrsEdgeForwarder::cycle() return ret; } - if ((ret = client->play(req->stream, stream_id)) != ERROR_SUCCESS) { + if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) { srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d", req->stream.c_str(), stream_id, ret); return ret; } - if ((ret = _source->on_publish()) != ERROR_SUCCESS) { - srs_error("edge ingester play stream then publish to edge failed. ret=%d", ret); - return ret; - } - - if ((ret = _edge->on_forward_publish()) != ERROR_SUCCESS) { - return ret; - } - - if ((ret = forward()) != ERROR_SUCCESS) { - return ret; - } - return ret; } -int SrsEdgeForwarder::forward() +void SrsEdgeForwarder::stop() +{ + close_underlayer_socket(); + + srs_freep(client); + srs_freep(io); +} + +int SrsEdgeForwarder::proxy(SrsEdgeProxyContext* context) { int ret = ERROR_SUCCESS; - client->set_recv_timeout(SRS_EDGE_TIMEOUT_US); + context->origin_io = io; + context->origin_rtmp = client; + context->origin_stream_id = stream_id; + + client->set_recv_timeout(SRS_PULSE_TIMEOUT_US); SrsPithyPrint pithy_print(SRS_STAGE_EDGE); - while (pthread->can_loop()) { + while (true) { // switch to other st-threads. st_usleep(0); @@ -419,17 +416,59 @@ int SrsEdgeForwarder::forward() srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); } - - // read from client. - SrsCommonMessage* msg = NULL; - if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) { - srs_error("recv origin server message failed. ret=%d", ret); + + if ((ret = proxy_message(context)) != ERROR_SUCCESS) { return ret; } - srs_verbose("edge loop recv message. ret=%d", ret); - - srs_assert(msg); - SrsAutoFree(SrsCommonMessage, msg, false); + } + + return ret; +} + +int SrsEdgeForwarder::proxy_message(SrsEdgeProxyContext* context) +{ + int ret = ERROR_SUCCESS; + + SrsCommonMessage* msg = NULL; + + // proxy origin message to client + msg = NULL; + ret = context->origin_rtmp->recv_message(&msg); + if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { + srs_error("recv origin server message failed. ret=%d", ret); + return ret; + } + + if (msg) { + if (msg->size <= 0) { + srs_freep(msg); + } else { + msg->header.stream_id = context->edge_stream_id; + if ((ret = context->edge_rtmp->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send origin message to client failed. ret=%d", ret); + return ret; + } + } + } + + // proxy client message to origin + msg = NULL; + ret = context->edge_rtmp->recv_message(&msg); + if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { + srs_error("recv client message failed. ret=%d", ret); + return ret; + } + + if (msg) { + if (msg->size <= 0) { + srs_freep(msg); + } else { + msg->header.stream_id = context->origin_stream_id; + if ((ret = context->origin_rtmp->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send client message to origin failed. ret=%d", ret); + return ret; + } + } } return ret; @@ -620,21 +659,33 @@ int SrsPublishEdge::on_client_publish() // error state. if (user_state != SrsEdgeUserStateInit) { ret = ERROR_RTMP_EDGE_PUBLISH_STATE; - srs_error("invalid state for client to play stream on edge. " + srs_error("invalid state for client to publish stream on edge. " "state=%d, user_state=%d, ret=%d", state, user_state, ret); return ret; } - // start ingest when init state. - if (state == SrsEdgeStateInit) { - state = SrsEdgeStatePublish; + // error when not init state. + if (state != SrsEdgeStateInit) { + ret = ERROR_RTMP_EDGE_PUBLISH_STATE; + srs_error("invalid state for client to publish stream on edge. " + "state=%d, user_state=%d, ret=%d", state, user_state, ret); + return ret; } - - return ret; + + SrsEdgeState pstate = state; + state = SrsEdgeStatePublish; + srs_trace("edge change from %d to state %d (forward publish).", pstate, state); + + return forwarder->start(); } -int SrsPublishEdge::on_forward_publish() +int SrsPublishEdge::on_proxy_publish(SrsEdgeProxyContext* context) { - int ret = ERROR_SUCCESS; + int ret = forwarder->proxy(context); + + SrsEdgeState pstate = state; + state = SrsEdgeStateInit; + srs_trace("edge change from %d to state %d (init).", pstate, state); + return ret; } diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index e5d274771..15663f15d 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -33,6 +33,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +class SrsSocket; +class SrsRtmpServer; class SrsSource; class SrsRequest; class SrsPlayEdge; @@ -55,8 +57,6 @@ enum SrsEdgeState // for publish edge SrsEdgeStatePublish = 200, - // publish stream to edge, forward to origin - SrsEdgeStateForwardConnected, }; /** @@ -101,10 +101,25 @@ private: virtual int process_publish_message(SrsCommonMessage* msg); }; +class SrsEdgeProxyContext +{ +public: + int edge_stream_id; + ISrsProtocolReaderWriter* edge_io; + SrsRtmpServer* edge_rtmp; +public: + int origin_stream_id; + ISrsProtocolReaderWriter* origin_io; + SrsRtmpClient* origin_rtmp; +public: + SrsEdgeProxyContext(); + virtual ~SrsEdgeProxyContext(); +}; + /** * edge used to forward stream to origin. */ -class SrsEdgeForwarder : public ISrsThreadHandler +class SrsEdgeForwarder { private: int stream_id; @@ -112,7 +127,6 @@ private: SrsSource* _source; SrsPublishEdge* _edge; SrsRequest* _req; - SrsThread* pthread; st_netfd_t stfd; ISrsProtocolReaderWriter* io; SrsRtmpClient* client; @@ -124,11 +138,10 @@ public: virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req); virtual int start(); virtual void stop(); -// interface ISrsThreadHandler public: - virtual int cycle(); + virtual int proxy(SrsEdgeProxyContext* context); private: - virtual int forward(); + virtual int proxy_message(SrsEdgeProxyContext* context); virtual void close_underlayer_socket(); virtual int connect_server(); }; @@ -182,11 +195,10 @@ public: * when client publish stream on edge. */ virtual int on_client_publish(); -public: /** - * when forwarder start to publish stream. + * proxy publish stream to edge */ - virtual int on_forward_publish(); + virtual int on_proxy_publish(SrsEdgeProxyContext* context); }; #endif diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 3dc8de6d6..98ec71d8c 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -42,6 +42,7 @@ using namespace std; #include #include #include +#include // when stream is busy, for example, streaming is already // publishing, when a new client to request to publish, @@ -332,6 +333,15 @@ int SrsRtmpConn::stream_service_cycle() srs_error("start to publish stream failed. ret=%d", ret); return ret; } + + SrsEdgeProxyContext context; + context.edge_io = skt; + context.edge_stream_id = res->stream_id; + context.edge_rtmp = rtmp; + if (vhost_is_edge) { + return source->on_edge_proxy_publish(&context); + } + if ((ret = on_publish()) != ERROR_SUCCESS) { srs_error("http hook on_publish failed. ret=%d", ret); return ret; @@ -345,10 +355,26 @@ int SrsRtmpConn::stream_service_cycle() case SrsRtmpConnFlashPublish: { srs_verbose("flash start to publish stream %s.", req->stream.c_str()); + if (vhost_is_edge) { + if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) { + srs_error("notice edge start publish stream failed. ret=%d", ret); + return ret; + } + } + if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) { srs_error("flash start to publish stream failed. ret=%d", ret); return ret; } + + SrsEdgeProxyContext context; + context.edge_io = skt; + context.edge_stream_id = res->stream_id; + context.edge_rtmp = rtmp; + if (vhost_is_edge) { + return source->on_edge_proxy_publish(&context); + } + if ((ret = on_publish()) != ERROR_SUCCESS) { srs_error("http hook on_publish failed. ret=%d", ret); return ret; diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index b29d821fd..41a1ad4c8 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1199,6 +1199,11 @@ int SrsSource::on_edge_start_publish() return publish_edge->on_client_publish(); } +int SrsSource::on_edge_proxy_publish(SrsEdgeProxyContext* context) +{ + return publish_edge->on_proxy_publish(context); +} + int SrsSource::create_forwarders() { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index a7c83217e..2f8a6a477 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -45,6 +45,9 @@ class SrsOnMetaDataPacket; class SrsSharedPtrMessage; class SrsForwarder; class SrsRequest; +class SrsSocket; +class SrsRtmpServer; +class SrsEdgeProxyContext; #ifdef SRS_AUTO_HLS class SrsHls; #endif @@ -318,6 +321,8 @@ public: virtual int on_edge_start_play(); // for edge, when publish edge stream, check the state virtual int on_edge_start_publish(); + // for edge, proxy the publish + virtual int on_edge_proxy_publish(SrsEdgeProxyContext* context); private: virtual int create_forwarders(); virtual void destroy_forwarders(); diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp index 28addb7cc..ca71f6bcb 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp @@ -652,11 +652,17 @@ int SrsProtocol::on_send_message(ISrsMessage* msg) } SrsCommonMessage* common_msg = dynamic_cast(msg); - if (!msg) { + if (!common_msg) { srs_verbose("ignore the shared ptr message."); return ret; } + // for proxy, the common msg is not decoded, ignore. + if (!common_msg->has_packet()) { + srs_verbose("ignore the proxy common message."); + return ret; + } + srs_assert(common_msg != NULL); switch (common_msg->header.message_type) { @@ -1459,6 +1465,11 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol) return ret; } +bool SrsCommonMessage::has_packet() +{ + return packet != NULL; +} + SrsPacket* SrsCommonMessage::get_packet() { if (!packet) { @@ -1501,6 +1512,14 @@ int SrsCommonMessage::encode_packet() { int ret = ERROR_SUCCESS; + // sometimes, for example, the edge proxy, + // the payload is not decoded, so directly sent out. + if (payload != NULL) { + header.payload_length = size; + return ret; + } + + // encode packet to payload and size. if (packet == NULL) { srs_warn("packet is empty, send out empty message."); return ret; diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp index 1486e014d..834ee1579 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp @@ -352,6 +352,10 @@ public: // TODO: use protocol to decode it. virtual int decode_packet(SrsProtocol* protocol); /** + * whether msg has decoded packet. + */ + virtual bool has_packet(); + /** * get the decoded packet which decoded by decode_packet(). * @remark, user never free the pkt, the message will auto free it. */