diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 028b5d218..c2882be55 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -45,51 +45,47 @@ using namespace std; #include #include #include +#include // when error, forwarder sleep for a while and retry. #define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL) -SrsForwarder::SrsForwarder(SrsSource* _source) +SrsForwarder::SrsForwarder(SrsSource* s) { - source = _source; + source = s; - _req = NULL; - client = NULL; - transport = new SrsTcpClient(); - kbps = new SrsKbps(); - stream_id = 0; + req = NULL; + sh_video = sh_audio = NULL; + sdk = new SrsSimpleRtmpClient(); pthread = new SrsReusableThread2("forward", this, SRS_FORWARDER_SLEEP_US); queue = new SrsMessageQueue(); jitter = new SrsRtmpJitter(); - - sh_video = sh_audio = NULL; } SrsForwarder::~SrsForwarder() { on_unpublish(); - srs_freep(transport); + srs_freep(sdk); srs_freep(pthread); srs_freep(queue); srs_freep(jitter); - srs_freep(kbps); srs_freep(sh_video); srs_freep(sh_audio); } -int SrsForwarder::initialize(SrsRequest* req, string ep_forward) +int SrsForwarder::initialize(SrsRequest* r, string ep) { int ret = ERROR_SUCCESS; // it's ok to use the request object, // SrsSource already copy it and never delete it. - _req = req; + req = r; // the ep(endpoint) to forward to - _ep_forward = ep_forward; + ep_forward = ep; return ret; } @@ -103,12 +99,17 @@ int SrsForwarder::on_publish() { int ret = ERROR_SUCCESS; - SrsRequest* req = _req; - // discovery the server port and tcUrl from req and ep_forward. - int port; - std::string server, tc_url; - discovery_ep(server, port, tc_url); + std::string server; + std::string tcUrl; + int port = SRS_CONSTS_RTMP_DEFAULT_PORT; + if (true) { + // parse host:port from hostport. + srs_parse_hostport(ep_forward, server, port); + + // generate tcUrl + tcUrl = srs_generate_tc_url(server, req->vhost, req->app, port, req->param); + } // dead loop check std::string source_ep = "rtmp://"; @@ -119,7 +120,7 @@ int SrsForwarder::on_publish() source_ep += req->vhost; std::string dest_ep = "rtmp://"; - if (_ep_forward == SRS_CONSTS_LOCALHOST) { + if (ep_forward == SRS_CONSTS_LOCALHOST) { dest_ep += req->host; } else { dest_ep += server; @@ -136,7 +137,7 @@ int SrsForwarder::on_publish() return ret; } srs_trace("start forward %s to %s, tcUrl=%s, stream=%s", - source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(), + source_ep.c_str(), dest_ep.c_str(), tcUrl.c_str(), req->stream.c_str()); if ((ret = pthread->start()) != ERROR_SUCCESS) { @@ -151,10 +152,7 @@ int SrsForwarder::on_publish() void SrsForwarder::on_unpublish() { pthread->stop(); - transport->close(); - - srs_freep(client); - kbps->set_io(NULL, NULL); + sdk->close(); } int SrsForwarder::on_meta_data(SrsSharedPtrMessage* shared_metadata) @@ -228,32 +226,26 @@ int SrsForwarder::cycle() { int ret = ERROR_SUCCESS; - std::string ep_server; - int ep_port; - if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) { - return ret; + std::string url; + if (true) { + std::string server; + int port = SRS_CONSTS_RTMP_DEFAULT_PORT; + + // parse host:port from hostport. + srs_parse_hostport(ep_forward, server, port); + + // generate url + url = srs_generate_rtmp_url(server, port, req->vhost, req->app, req->stream); } - srs_assert(client); - - client->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT_US); - client->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT_US); - if ((ret = client->handshake()) != ERROR_SUCCESS) { - srs_error("handshake with server failed. ret=%d", ret); - return ret; - } - if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) { - srs_error("connect with server failed. ret=%d", ret); - return ret; - } - if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { - srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret); + int64_t cto = SRS_FORWARDER_SLEEP_US; + int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US; + if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { + srs_warn("forward failed, url=%s, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); return ret; } - if ((ret = client->publish(_req->stream, stream_id)) != ERROR_SUCCESS) { - srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d", - _req->stream.c_str(), stream_id, ret); + if ((ret = sdk->publish()) != ERROR_SUCCESS) { return ret; } @@ -269,106 +261,12 @@ int SrsForwarder::cycle() return ret; } -void SrsForwarder::discovery_ep(string& server, int& port, string& tc_url) -{ - SrsRequest* req = _req; - - port = SRS_CONSTS_RTMP_DEFAULT_PORT; - srs_parse_hostport(_ep_forward, server, port); - - // generate tcUrl - tc_url = srs_generate_tc_url(server, req->vhost, req->app, port, req->param); -} - -int SrsForwarder::connect_server(string& ep_server, int& ep_port) -{ - int ret = ERROR_SUCCESS; - - // reopen - transport->close(); - - // discovery the server port and tcUrl from req and ep_forward. - string tc_url; - discovery_ep(ep_server, ep_port, tc_url); - - // open socket. - int64_t timeout = SRS_FORWARDER_SLEEP_US; - if ((ret = transport->connect(ep_server, ep_port, timeout)) != ERROR_SUCCESS) { - srs_warn("forward failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", - _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret); - return ret; - } - - srs_freep(client); - client = new SrsRtmpClient(transport); - - kbps->set_io(transport, transport); - - srs_trace("forward connected, stream=%s, tcUrl=%s to server=%s, port=%d", - _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port); - - return ret; -} - -// TODO: FIXME: refine the connect_app. -int SrsForwarder::connect_app(string ep_server, int ep_port) -{ - int ret = ERROR_SUCCESS; - - SrsRequest* req = _req; - - // args of request takes the srs info. - if (req->args == NULL) { - req->args = SrsAmf0Any::object(); - } - - // notify server the edge identity, - // @see https://github.com/simple-rtmp-server/srs/issues/147 - SrsAmf0Object* data = req->args; - data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY)); - data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER)); - data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE)); - data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE)); - data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL)); - data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION)); - data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB)); - data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL)); - data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT)); - data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY)); - data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS)); - // for edge to directly get the id of client. - data->set("srs_pid", SrsAmf0Any::number(getpid())); - data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id())); - - // local ip of edge - std::vector ips = srs_get_local_ipv4_ips(); - assert(_srs_config->get_stats_network() < (int)ips.size()); - std::string local_ip = ips[_srs_config->get_stats_network()]; - data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str())); - - // generate the tcUrl - std::string param = ""; - std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param); - - // upnode server identity will show in the connect_app of client. - // @see https://github.com/simple-rtmp-server/srs/issues/160 - // the debug_srs_upnode is config in vhost and default to true. - bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost); - if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) { - srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d", - tc_url.c_str(), debug_srs_upnode, ret); - return ret; - } - - return ret; -} - #define SYS_MAX_FORWARD_SEND_MSGS 128 int SrsForwarder::forward() { int ret = ERROR_SUCCESS; - client->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); SrsPithyPrint* pprint = SrsPithyPrint::create_forwarder(); SrsAutoFree(SrsPithyPrint, pprint); @@ -378,13 +276,13 @@ int SrsForwarder::forward() // update sequence header // TODO: FIXME: maybe need to zero the sequence header timestamp. if (sh_video) { - if ((ret = client->send_and_free_message(sh_video->copy(), stream_id)) != ERROR_SUCCESS) { + if ((ret = sdk->send_and_free_message(sh_video->copy())) != ERROR_SUCCESS) { srs_error("forwarder send sh_video to server failed. ret=%d", ret); return ret; } } if (sh_audio) { - if ((ret = client->send_and_free_message(sh_audio->copy(), stream_id)) != ERROR_SUCCESS) { + if ((ret = sdk->send_and_free_message(sh_audio->copy())) != ERROR_SUCCESS) { srs_error("forwarder send sh_audio to server failed. ret=%d", ret); return ret; } @@ -396,7 +294,7 @@ int SrsForwarder::forward() // read from client. if (true) { SrsCommonMessage* msg = NULL; - ret = client->recv_message(&msg); + ret = sdk->recv_message(&msg); srs_verbose("play loop recv message. ret=%d", ret); if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { @@ -417,12 +315,7 @@ int SrsForwarder::forward() // pithy print if (pprint->can_print()) { - kbps->sample(); - srs_trace("-> "SRS_CONSTS_LOG_FOWARDER - " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", - pprint->age(), count, - kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); + sdk->kbps_sample(SRS_CONSTS_LOG_FOWARDER, pprint->age(), count); } // ignore when no messages. @@ -432,7 +325,7 @@ int SrsForwarder::forward() } // sendout messages, all messages are freed by send_and_free_messages(). - if ((ret = client->send_and_free_messages(msgs.msgs, count, stream_id)) != ERROR_SUCCESS) { + if ((ret = sdk->send_and_free_messages(msgs.msgs, count)) != ERROR_SUCCESS) { srs_error("forwarder messages to server failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_forward.hpp b/trunk/src/app/srs_app_forward.hpp index 6d108f899..487cd2b83 100644 --- a/trunk/src/app/srs_app_forward.hpp +++ b/trunk/src/app/srs_app_forward.hpp @@ -43,6 +43,7 @@ class SrsRtmpClient; class SrsRequest; class SrsSource; class SrsKbps; +class SrsSimpleRtmpClient; /** * forward the stream to other servers. @@ -52,16 +53,13 @@ class SrsForwarder : public ISrsReusableThread2Handler { private: // the ep to forward, server[:port]. - std::string _ep_forward; - SrsRequest* _req; - int stream_id; + std::string ep_forward; + SrsRequest* req; private: SrsReusableThread2* pthread; private: SrsSource* source; - SrsTcpClient* transport; - SrsKbps* kbps; - SrsRtmpClient* client; + SrsSimpleRtmpClient* sdk; SrsRtmpJitter* jitter; SrsMessageQueue* queue; /** @@ -74,7 +72,7 @@ public: SrsForwarder(SrsSource* _source); virtual ~SrsForwarder(); public: - virtual int initialize(SrsRequest* req, std::string ep_forward); + virtual int initialize(SrsRequest* r, std::string ep); virtual void set_queue_size(double queue_size); public: virtual int on_publish(); @@ -98,9 +96,6 @@ public: public: virtual int cycle(); private: - virtual void discovery_ep(std::string& server, int& port, std::string& tc_url); - virtual int connect_server(std::string& ep_server, int& ep_port); - virtual int connect_app(std::string ep_server, int ep_port); virtual int forward(); }; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index d3efb40b7..0301c4602 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -301,6 +301,11 @@ int SrsSimpleRtmpClient::send_and_free_messages(SrsSharedPtrMessage** msgs, int return client->send_and_free_messages(msgs, nb_msgs, stream_id); } +int SrsSimpleRtmpClient::send_and_free_message(SrsSharedPtrMessage* msg) +{ + return client->send_and_free_message(msg, stream_id); +} + void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout) { transport->set_recv_timeout(timeout); diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index dab29ebfb..be345660d 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -90,6 +90,7 @@ public: virtual int recv_message(SrsCommonMessage** pmsg); virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket); virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs); + virtual int send_and_free_message(SrsSharedPtrMessage* msg); public: virtual void set_recv_timeout(int64_t timeout); };