From 24b38999725b3516e48bef63acd0f3293a8c4636 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 14 Oct 2015 10:48:08 +0800 Subject: [PATCH] use SrsTcpClient instead raw socket. --- trunk/src/app/srs_app_caster_flv.cpp | 4 ++- trunk/src/app/srs_app_edge.cpp | 50 ++++++++------------------ trunk/src/app/srs_app_edge.hpp | 9 ++--- trunk/src/app/srs_app_forward.cpp | 25 ++++--------- trunk/src/app/srs_app_forward.hpp | 4 +-- trunk/src/app/srs_app_http_client.cpp | 31 +++++++--------- trunk/src/app/srs_app_http_client.hpp | 4 +-- trunk/src/app/srs_app_http_conn.cpp | 14 ++++---- trunk/src/app/srs_app_http_conn.hpp | 10 +++--- trunk/src/app/srs_app_mpegts_udp.cpp | 23 ++++++------ trunk/src/app/srs_app_mpegts_udp.hpp | 3 +- trunk/src/app/srs_app_rtmp_conn.cpp | 25 +++++-------- trunk/src/app/srs_app_rtmp_conn.hpp | 2 +- trunk/src/app/srs_app_rtsp.cpp | 34 ++++++++++-------- trunk/src/app/srs_app_rtsp.hpp | 2 +- trunk/src/main/srs_main_ingest_hls.cpp | 20 +++++------ 16 files changed, 107 insertions(+), 153 deletions(-) diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 20dc505f7..8edb04815 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -276,7 +276,9 @@ int SrsDynamicHttpConn::connect() if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) { return ret; } - client = new SrsRtmpClient(transport); + + srs_freep(client); + client = new SrsRtmpClient(transport); client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 2ac192bb5..3d88e6c63 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -63,13 +63,12 @@ using namespace std; SrsEdgeIngester::SrsEdgeIngester() { - io = NULL; + transport = new SrsTcpClient(); kbps = new SrsKbps(); client = NULL; _edge = NULL; _req = NULL; stream_id = 0; - stfd = NULL; lb = new SrsLbRoundRobin(); pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US); } @@ -78,6 +77,7 @@ SrsEdgeIngester::~SrsEdgeIngester() { stop(); + srs_freep(transport); srs_freep(lb); srs_freep(pthread); srs_freep(kbps); @@ -109,11 +109,9 @@ int SrsEdgeIngester::start() void SrsEdgeIngester::stop() { pthread->stop(); - - close_underlayer_socket(); + transport->close(); srs_freep(client); - srs_freep(io); kbps->set_io(NULL, NULL); // notice to unpublish. @@ -336,17 +334,12 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) return ret; } -void SrsEdgeIngester::close_underlayer_socket() -{ - srs_close_stfd(stfd); -} - int SrsEdgeIngester::connect_server(string& ep_server, int& ep_port) { int ret = ERROR_SUCCESS; // reopen - close_underlayer_socket(); + transport->close(); SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost); @@ -368,20 +361,16 @@ int SrsEdgeIngester::connect_server(string& ep_server, int& ep_port) // open socket. int64_t timeout = SRS_EDGE_INGESTER_TIMEOUT_US; - if ((ret = srs_socket_connect(ep_server, ep_port, timeout, &stfd)) != ERROR_SUCCESS) { + if ((ret = transport->connect(ep_server, ep_port, timeout)) != ERROR_SUCCESS) { srs_warn("edge pull failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret); return ret; } srs_freep(client); - srs_freep(io); + client = new SrsRtmpClient(transport); - srs_assert(stfd); - io = new SrsStSocket(stfd); - client = new SrsRtmpClient(io); - - kbps->set_io(io, io); + kbps->set_io(transport, transport); srs_trace("edge pull connected, url=%s/%s, server=%s:%d", _req->tcUrl.c_str(), _req->stream.c_str(), ep_server.c_str(), ep_port); @@ -391,14 +380,13 @@ int SrsEdgeIngester::connect_server(string& ep_server, int& ep_port) SrsEdgeForwarder::SrsEdgeForwarder() { - io = NULL; + transport = new SrsTcpClient(); kbps = new SrsKbps(); client = NULL; _edge = NULL; _req = NULL; lb = new SrsLbRoundRobin(); stream_id = 0; - stfd = NULL; pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US); queue = new SrsMessageQueue(); send_error_code = ERROR_SUCCESS; @@ -408,6 +396,7 @@ SrsEdgeForwarder::~SrsEdgeForwarder() { stop(); + srs_freep(transport); srs_freep(lb); srs_freep(pthread); srs_freep(queue); @@ -473,13 +462,11 @@ int SrsEdgeForwarder::start() void SrsEdgeForwarder::stop() { pthread->stop(); - - close_underlayer_socket(); + transport->close(); queue->clear(); srs_freep(client); - srs_freep(io); kbps->set_io(NULL, NULL); } @@ -586,17 +573,12 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) return ret; } -void SrsEdgeForwarder::close_underlayer_socket() -{ - srs_close_stfd(stfd); -} - int SrsEdgeForwarder::connect_server(string& ep_server, int& ep_port) { int ret = ERROR_SUCCESS; // reopen - close_underlayer_socket(); + transport->close(); SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost); srs_assert(conf); @@ -610,20 +592,16 @@ int SrsEdgeForwarder::connect_server(string& ep_server, int& ep_port) // open socket. int64_t timeout = SRS_EDGE_FORWARDER_TIMEOUT_US; - if ((ret = srs_socket_connect(ep_server, ep_port, timeout, &stfd)) != ERROR_SUCCESS) { + if ((ret = transport->connect(ep_server, ep_port, timeout)) != ERROR_SUCCESS) { srs_warn("edge push failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret); return ret; } srs_freep(client); - srs_freep(io); + client = new SrsRtmpClient(transport); - srs_assert(stfd); - io = new SrsStSocket(stfd); - client = new SrsRtmpClient(io); - - kbps->set_io(io, io); + kbps->set_io(transport, transport); // open socket. srs_trace("edge push connected, stream=%s, tcUrl=%s to server=%s, port=%d", diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 3f05ce925..6fa22db2a 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -47,6 +47,7 @@ class SrsMessageQueue; class ISrsProtocolReaderWriter; class SrsKbps; class SrsLbRoundRobin; +class SrsTcpClient; /** * the state of edge, auto machine @@ -85,8 +86,7 @@ private: SrsPlayEdge* _edge; SrsRequest* _req; SrsReusableThread2* pthread; - st_netfd_t stfd; - ISrsProtocolReaderWriter* io; + SrsTcpClient* transport; SrsKbps* kbps; SrsRtmpClient* client; SrsLbRoundRobin* lb; @@ -103,7 +103,6 @@ public: virtual int cycle(); private: virtual int ingest(); - virtual void close_underlayer_socket(); virtual int connect_server(std::string& ep_server, int& ep_port); virtual int connect_app(std::string ep_server, int ep_port); virtual int process_publish_message(SrsCommonMessage* msg); @@ -121,8 +120,7 @@ private: SrsPublishEdge* _edge; SrsRequest* _req; SrsReusableThread2* pthread; - st_netfd_t stfd; - ISrsProtocolReaderWriter* io; + SrsTcpClient* transport; SrsKbps* kbps; SrsRtmpClient* client; SrsLbRoundRobin* lb; @@ -152,7 +150,6 @@ public: public: virtual int proxy(SrsCommonMessage* msg); private: - virtual void close_underlayer_socket(); virtual int connect_server(std::string& ep_server, int& ep_port); virtual int connect_app(std::string ep_server, int ep_port); }; diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 21383b570..478cddcda 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -54,9 +54,8 @@ SrsForwarder::SrsForwarder(SrsSource* _source) source = _source; _req = NULL; - io = NULL; client = NULL; - stfd = NULL; + transport = new SrsTcpClient(); kbps = new SrsKbps(); stream_id = 0; @@ -71,6 +70,7 @@ SrsForwarder::~SrsForwarder() { on_unpublish(); + srs_freep(transport); srs_freep(pthread); srs_freep(queue); srs_freep(jitter); @@ -151,11 +151,9 @@ int SrsForwarder::on_publish() void SrsForwarder::on_unpublish() { pthread->stop(); - - close_underlayer_socket(); + transport->close(); srs_freep(client); - srs_freep(io); kbps->set_io(NULL, NULL); } @@ -271,11 +269,6 @@ int SrsForwarder::cycle() return ret; } -void SrsForwarder::close_underlayer_socket() -{ - srs_close_stfd(stfd); -} - void SrsForwarder::discovery_ep(string& server, int& port, string& tc_url) { SrsRequest* req = _req; @@ -292,7 +285,7 @@ int SrsForwarder::connect_server(string& ep_server, int& ep_port) int ret = ERROR_SUCCESS; // reopen - close_underlayer_socket(); + transport->close(); // discovery the server port and tcUrl from req and ep_forward. string tc_url; @@ -300,20 +293,16 @@ int SrsForwarder::connect_server(string& ep_server, int& ep_port) // open socket. int64_t timeout = SRS_FORWARDER_SLEEP_US; - if ((ret = srs_socket_connect(ep_server, ep_port, timeout, &stfd)) != ERROR_SUCCESS) { + if ((ret = transport->connect(ep_server, ep_port, timeout)) != ERROR_SUCCESS) { srs_warn("forward failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret); return ret; } srs_freep(client); - srs_freep(io); + client = new SrsRtmpClient(transport); - srs_assert(stfd); - io = new SrsStSocket(stfd); - client = new SrsRtmpClient(io); - - kbps->set_io(io, io); + kbps->set_io(transport, transport); srs_trace("forward connected, stream=%s, tcUrl=%s to server=%s, port=%d", _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port); diff --git a/trunk/src/app/srs_app_forward.hpp b/trunk/src/app/srs_app_forward.hpp index 5842f0ac6..6d108f899 100644 --- a/trunk/src/app/srs_app_forward.hpp +++ b/trunk/src/app/srs_app_forward.hpp @@ -56,11 +56,10 @@ private: SrsRequest* _req; int stream_id; private: - st_netfd_t stfd; SrsReusableThread2* pthread; private: SrsSource* source; - ISrsProtocolReaderWriter* io; + SrsTcpClient* transport; SrsKbps* kbps; SrsRtmpClient* client; SrsRtmpJitter* jitter; @@ -99,7 +98,6 @@ public: public: virtual int cycle(); private: - virtual void close_underlayer_socket(); virtual void discovery_ep(std::string& server, int& port, std::string& tc_url); virtual int connect_server(std::string& ep_server, int& ep_port); virtual int connect_app(std::string ep_server, int ep_port); diff --git a/trunk/src/app/srs_app_http_client.cpp b/trunk/src/app/srs_app_http_client.cpp index 08f4b2b0c..1bf9ff185 100644 --- a/trunk/src/app/srs_app_http_client.cpp +++ b/trunk/src/app/srs_app_http_client.cpp @@ -39,9 +39,7 @@ using namespace std; SrsHttpClient::SrsHttpClient() { - connected = false; - stfd = NULL; - skt = NULL; + transport = new SrsTcpClient(); parser = NULL; timeout_us = 0; port = 0; @@ -50,6 +48,8 @@ SrsHttpClient::SrsHttpClient() SrsHttpClient::~SrsHttpClient() { disconnect(); + + srs_freep(transport); srs_freep(parser); } @@ -102,7 +102,7 @@ int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg) << req; std::string data = ss.str(); - if ((ret = skt->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) { + if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) { // disconnect when error. disconnect(); @@ -111,7 +111,7 @@ int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg) } ISrsHttpMessage* msg = NULL; - if ((ret = parser->parse_message(skt, NULL, &msg)) != ERROR_SUCCESS) { + if ((ret = parser->parse_message(transport, NULL, &msg)) != ERROR_SUCCESS) { srs_error("parse http post response failed. ret=%d", ret); return ret; } @@ -148,7 +148,7 @@ int SrsHttpClient::get(string path, std::string req, ISrsHttpMessage** ppmsg) << req; std::string data = ss.str(); - if ((ret = skt->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) { + if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) { // disconnect when error. disconnect(); @@ -157,7 +157,7 @@ int SrsHttpClient::get(string path, std::string req, ISrsHttpMessage** ppmsg) } ISrsHttpMessage* msg = NULL; - if ((ret = parser->parse_message(skt, NULL, &msg)) != ERROR_SUCCESS) { + if ((ret = parser->parse_message(transport, NULL, &msg)) != ERROR_SUCCESS) { srs_error("parse http post response failed. ret=%d", ret); return ret; } @@ -171,37 +171,30 @@ int SrsHttpClient::get(string path, std::string req, ISrsHttpMessage** ppmsg) void SrsHttpClient::disconnect() { - connected = false; - - srs_close_stfd(stfd); - srs_freep(skt); + transport->close(); } int SrsHttpClient::connect() { int ret = ERROR_SUCCESS; - if (connected) { + if (transport->connected()) { return ret; } disconnect(); // open socket. - if ((ret = srs_socket_connect(host, port, timeout_us, &stfd)) != ERROR_SUCCESS) { + if ((ret = transport->connect(host, port, timeout_us)) != ERROR_SUCCESS) { srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d", host.c_str(), port, timeout_us, ret); return ret; } srs_info("connect to server success. server=%s, port=%d", host.c_str(), port); - srs_assert(!skt); - skt = new SrsStSocket(stfd); - connected = true; - // set the recv/send timeout in us. - skt->set_recv_timeout(timeout_us); - skt->set_send_timeout(timeout_us); + transport->set_recv_timeout(timeout_us); + transport->set_send_timeout(timeout_us); return ret; } diff --git a/trunk/src/app/srs_app_http_client.hpp b/trunk/src/app/srs_app_http_client.hpp index b05def7ca..c905277ee 100644 --- a/trunk/src/app/srs_app_http_client.hpp +++ b/trunk/src/app/srs_app_http_client.hpp @@ -49,9 +49,7 @@ class SrsStSocket; class SrsHttpClient { private: - bool connected; - st_netfd_t stfd; - SrsStSocket* skt; + SrsTcpClient* transport; SrsHttpParser* parser; private: int64_t timeout_us; diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 5476ae41a..786e99dd0 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -303,7 +303,7 @@ int SrsHttpResponseWriter::send_header(char* data, int size) return skt->write((void*)buf.c_str(), buf.length(), NULL); } -SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* io) +SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, ISrsProtocolReaderWriter* io) { skt = io; owner = msg; @@ -494,7 +494,7 @@ int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read) return ret; } -SrsHttpMessage::SrsHttpMessage(SrsStSocket* io, SrsConnection* c) : ISrsHttpMessage() +SrsHttpMessage::SrsHttpMessage(ISrsProtocolReaderWriter* io, SrsConnection* c) : ISrsHttpMessage() { conn = c; chunked = false; @@ -870,7 +870,7 @@ int SrsHttpParser::initialize(enum http_parser_type type, bool allow_jsonp) return ret; } -int SrsHttpParser::parse_message(SrsStSocket* skt, SrsConnection* conn, ISrsHttpMessage** ppmsg) +int SrsHttpParser::parse_message(ISrsProtocolReaderWriter* io, SrsConnection* conn, ISrsHttpMessage** ppmsg) { *ppmsg = NULL; @@ -887,7 +887,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsConnection* conn, ISrsHttp header_parsed = 0; // do parse - if ((ret = parse_message_imp(skt)) != ERROR_SUCCESS) { + if ((ret = parse_message_imp(io)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("parse http msg failed. ret=%d", ret); } @@ -895,7 +895,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsConnection* conn, ISrsHttp } // create msg - SrsHttpMessage* msg = new SrsHttpMessage(skt, conn); + SrsHttpMessage* msg = new SrsHttpMessage(io, conn); // initalize http msg, parse url. if ((ret = msg->update(url, jsonp, &header, buffer, headers)) != ERROR_SUCCESS) { @@ -910,7 +910,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsConnection* conn, ISrsHttp return ret; } -int SrsHttpParser::parse_message_imp(SrsStSocket* skt) +int SrsHttpParser::parse_message_imp(ISrsProtocolReaderWriter* io) { int ret = ERROR_SUCCESS; @@ -944,7 +944,7 @@ int SrsHttpParser::parse_message_imp(SrsStSocket* skt) // when nothing parsed, read more to parse. if (nparsed == 0) { // when requires more, only grow 1bytes, but the buffer will cache more. - if ((ret = buffer->grow(skt, buffer->size() + 1)) != ERROR_SUCCESS) { + if ((ret = buffer->grow(io, buffer->size() + 1)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("read body from server failed. ret=%d", ret); } diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index e54c3a1f5..2fab8765d 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -120,7 +120,7 @@ public: class SrsHttpResponseReader : virtual public ISrsHttpResponseReader { private: - SrsStSocket* skt; + ISrsProtocolReaderWriter* skt; SrsHttpMessage* owner; SrsFastStream* buffer; bool is_eof; @@ -131,7 +131,7 @@ private: // already read total bytes. int64_t nb_total_read; public: - SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* io); + SrsHttpResponseReader(SrsHttpMessage* msg, ISrsProtocolReaderWriter* io); virtual ~SrsHttpResponseReader(); public: /** @@ -208,7 +208,7 @@ private: // the method in QueryString will override the HTTP method. std::string jsonp_method; public: - SrsHttpMessage(SrsStSocket* io, SrsConnection* c); + SrsHttpMessage(ISrsProtocolReaderWriter* io, SrsConnection* c); virtual ~SrsHttpMessage(); public: /** @@ -332,12 +332,12 @@ public: * or error and *ppmsg must be NULL. * @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete(). */ - virtual int parse_message(SrsStSocket* skt, SrsConnection* conn, ISrsHttpMessage** ppmsg); + virtual int parse_message(ISrsProtocolReaderWriter* io, SrsConnection* conn, ISrsHttpMessage** ppmsg); private: /** * parse the HTTP message to member field: msg. */ - virtual int parse_message_imp(SrsStSocket* skt); + virtual int parse_message_imp(ISrsProtocolReaderWriter* io); private: static int on_message_begin(http_parser* parser); static int on_headers_complete(http_parser* parser); diff --git a/trunk/src/app/srs_app_mpegts_udp.cpp b/trunk/src/app/srs_app_mpegts_udp.cpp index 71593f72c..9bb92e46f 100644 --- a/trunk/src/app/srs_app_mpegts_udp.cpp +++ b/trunk/src/app/srs_app_mpegts_udp.cpp @@ -132,9 +132,8 @@ SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c) output = _srs_config->get_stream_caster_output(c); req = NULL; - io = NULL; client = NULL; - stfd = NULL; + transport = new SrsTcpClient(); stream_id = 0; avc = new SrsRawH264Stream(); @@ -150,6 +149,7 @@ SrsMpegtsOverUdp::~SrsMpegtsOverUdp() { close(); + srs_freep(transport); srs_freep(buffer); srs_freep(stream); srs_freep(context); @@ -345,8 +345,8 @@ int SrsMpegtsOverUdp::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) } // ts tbn to flv tbn. - u_int32_t dts = msg->dts / 90; - u_int32_t pts = msg->dts / 90; + u_int32_t dts = (u_int32_t)(msg->dts / 90); + u_int32_t pts = (u_int32_t)(msg->dts / 90); // send each frame. while (!avs->empty()) { @@ -504,7 +504,7 @@ int SrsMpegtsOverUdp::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs) } // ts tbn to flv tbn. - u_int32_t dts = msg->dts / 90; + u_int32_t dts = (u_int32_t)(msg->dts / 90); // send each frame. while (!avs->empty()) { @@ -604,7 +604,7 @@ int SrsMpegtsOverUdp::connect() // when ok, ignore. // TODO: FIXME: should reconnect when disconnected. - if (io || client) { + if (transport->connected()) { return ret; } @@ -616,12 +616,13 @@ int SrsMpegtsOverUdp::connect() } // connect host. - if ((ret = srs_socket_connect(req->host, req->port, ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) { + if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) { srs_error("mpegts: connect server %s:%d failed. ret=%d", req->host.c_str(), req->port, ret); return ret; } - io = new SrsStSocket(stfd); - client = new SrsRtmpClient(io); + + srs_freep(client); + client = new SrsRtmpClient(transport); client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); @@ -704,9 +705,9 @@ int SrsMpegtsOverUdp::connect_app(string ep_server, int ep_port) void SrsMpegtsOverUdp::close() { srs_freep(client); - srs_freep(io); srs_freep(req); - srs_close_stfd(stfd); + + transport->close(); } #endif diff --git a/trunk/src/app/srs_app_mpegts_udp.hpp b/trunk/src/app/srs_app_mpegts_udp.hpp index 98b4cd3c3..f2ba65727 100644 --- a/trunk/src/app/srs_app_mpegts_udp.hpp +++ b/trunk/src/app/srs_app_mpegts_udp.hpp @@ -86,8 +86,7 @@ private: std::string output; private: SrsRequest* req; - st_netfd_t stfd; - SrsStSocket* io; + SrsTcpClient* transport; SrsRtmpClient* client; int stream_id; private: diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 5bc2d0dd2..c02dd8b75 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -1229,11 +1229,13 @@ int SrsRtmpConn::check_edge_token_traverse_auth() srs_assert(req); - st_netfd_t stsock = NULL; + SrsTcpClient* transport = new SrsTcpClient(); + SrsAutoFree(SrsTcpClient, transport); + vector args = _srs_config->get_vhost_edge_origin(req->vhost)->args; for (int i = 0; i < (int)args.size(); i++) { string hostport = args.at(i); - if ((ret = connect_server(hostport, &stsock)) == ERROR_SUCCESS) { + if ((ret = connect_server(hostport, transport)) == ERROR_SUCCESS) { break; } } @@ -1242,20 +1244,13 @@ int SrsRtmpConn::check_edge_token_traverse_auth() return ret; } - srs_assert(stsock); - SrsStSocket* io = new SrsStSocket(stsock); - SrsRtmpClient* client = new SrsRtmpClient(io); + SrsRtmpClient* client = new SrsRtmpClient(transport); + SrsAutoFree(SrsRtmpClient, client); - ret = do_token_traverse_auth(client); - - srs_freep(client); - srs_freep(io); - srs_close_stfd(stsock); - - return ret; + return do_token_traverse_auth(client); } -int SrsRtmpConn::connect_server(string hostport, st_netfd_t* pstsock) +int SrsRtmpConn::connect_server(string hostport, SrsTcpClient* transport) { int ret = ERROR_SUCCESS; @@ -1268,16 +1263,14 @@ int SrsRtmpConn::connect_server(string hostport, st_netfd_t* pstsock) srs_parse_hostport(hostport, server, port); // open socket. - st_netfd_t stsock = NULL; int64_t timeout = SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US; - if ((ret = srs_socket_connect(server, port, timeout, &stsock)) != ERROR_SUCCESS) { + if ((ret = transport->connect(server, port, timeout)) != ERROR_SUCCESS) { srs_warn("edge token traverse failed, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", req->tcUrl.c_str(), server.c_str(), port, timeout, ret); return ret; } srs_info("edge token auth connected, url=%s/%s, server=%s:%d", req->tcUrl.c_str(), req->stream.c_str(), server.c_str(), port); - *pstsock = stsock; return ret; } diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 005315aa4..5ae160537 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -134,7 +134,7 @@ private: virtual void set_sock_options(); private: virtual int check_edge_token_traverse_auth(); - virtual int connect_server(std::string hostport, st_netfd_t* pstsock); + virtual int connect_server(std::string hostport, SrsTcpClient* transport); virtual int do_token_traverse_auth(SrsRtmpClient* client); /** * when the connection disconnect, call this method. diff --git a/trunk/src/app/srs_app_rtsp.cpp b/trunk/src/app/srs_app_rtsp.cpp index b35eccb03..8ca22c1b1 100644 --- a/trunk/src/app/srs_app_rtsp.cpp +++ b/trunk/src/app/srs_app_rtsp.cpp @@ -195,8 +195,8 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o) trd = new SrsOneCycleThread("rtsp", this); req = NULL; - io = NULL; client = NULL; + transport = new SrsTcpClient(); stream_id = 0; vjitter = new SrsRtspJitter(); ajitter = new SrsRtspJitter(); @@ -219,7 +219,7 @@ SrsRtspConn::~SrsRtspConn() srs_freep(rtsp); srs_freep(client); - srs_freep(io); + srs_freep(transport); srs_freep(req); srs_freep(vjitter); @@ -254,7 +254,7 @@ int SrsRtspConn::do_cycle() srs_info("rtsp: got rtsp request"); if (req->is_options()) { - SrsRtspOptionsResponse* res = new SrsRtspOptionsResponse(req->seq); + SrsRtspOptionsResponse* res = new SrsRtspOptionsResponse((int)req->seq); res->session = session; if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { @@ -288,7 +288,7 @@ int SrsRtspConn::do_cycle() audio_sample_rate, audio_channel, rtsp_tcUrl.c_str(), rtsp_stream.c_str() ); - SrsRtspResponse* res = new SrsRtspResponse(req->seq); + SrsRtspResponse* res = new SrsRtspResponse((int)req->seq); res->session = session; if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { @@ -328,7 +328,7 @@ int SrsRtspConn::do_cycle() session = "O9EaZ4bf"; // TODO: FIXME: generate session id. } - SrsRtspSetupResponse* res = new SrsRtspSetupResponse(req->seq); + SrsRtspSetupResponse* res = new SrsRtspSetupResponse((int)req->seq); res->client_port_min = req->transport->client_port_min; res->client_port_max = req->transport->client_port_max; res->local_port_min = lpm; @@ -341,7 +341,7 @@ int SrsRtspConn::do_cycle() return ret; } } else if (req->is_record()) { - SrsRtspResponse* res = new SrsRtspResponse(req->seq); + SrsRtspResponse* res = new SrsRtspResponse((int)req->seq); res->session = session; if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { @@ -434,7 +434,11 @@ int SrsRtspConn::on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts) return ret; } - if ((ret = write_h264_ipb_frame(pkt->payload->bytes(), pkt->payload->length(), dts / 90, pts / 90)) != ERROR_SUCCESS) { + char* bytes = pkt->payload->bytes(); + int length = pkt->payload->length(); + u_int32_t fdts = (u_int32_t)(dts / 90); + u_int32_t fpts = (u_int32_t)(pts / 90); + if ((ret = write_h264_ipb_frame(bytes, length, fdts, fpts)) != ERROR_SUCCESS) { return ret; } @@ -476,7 +480,7 @@ int SrsRtspConn::kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts) int nb_frame = acache->audio_samples->sample_units[i].size; int64_t timestamp = (acache->dts + delta * i) / 90; acodec->aac_packet_type = 1; - if ((ret = write_audio_raw_frame(frame, nb_frame, acodec, timestamp)) != ERROR_SUCCESS) { + if ((ret = write_audio_raw_frame(frame, nb_frame, acodec, (u_int32_t)timestamp)) != ERROR_SUCCESS) { return ret; } } @@ -497,7 +501,7 @@ int SrsRtspConn::write_sequence_header() int64_t dts = vjitter->timestamp() / 90; // send video sps/pps - if ((ret = write_h264_sps_pps(dts, dts)) != ERROR_SUCCESS) { + if ((ret = write_h264_sps_pps((u_int32_t)dts, (u_int32_t)dts)) != ERROR_SUCCESS) { return ret; } @@ -535,7 +539,7 @@ int SrsRtspConn::write_sequence_header() break; }; - if ((ret = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), acodec, dts)) != ERROR_SUCCESS) { + if ((ret = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), acodec, (u_int32_t)dts)) != ERROR_SUCCESS) { return ret; } } @@ -642,7 +646,8 @@ int SrsRtspConn::connect() int ret = ERROR_SUCCESS; // when ok, ignore. - if (io || client) { + // TODO: FIXME: support reconnect. + if (transport->connected()) { return ret; } @@ -663,12 +668,13 @@ int SrsRtspConn::connect() } // connect host. - if ((ret = srs_socket_connect(req->host, req->port, ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) { + if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) { srs_error("rtsp: connect server %s:%d failed. ret=%d", req->host.c_str(), req->port, ret); return ret; } - io = new SrsStSocket(stfd); - client = new SrsRtmpClient(io); + + srs_freep(client); + client = new SrsRtmpClient(transport); client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); diff --git a/trunk/src/app/srs_app_rtsp.hpp b/trunk/src/app/srs_app_rtsp.hpp index 3fa245053..135839b33 100644 --- a/trunk/src/app/srs_app_rtsp.hpp +++ b/trunk/src/app/srs_app_rtsp.hpp @@ -139,7 +139,7 @@ private: SrsOneCycleThread* trd; private: SrsRequest* req; - SrsStSocket* io; + SrsTcpClient* transport; SrsRtmpClient* client; SrsRtspJitter* vjitter; SrsRtspJitter* ajitter; diff --git a/trunk/src/main/srs_main_ingest_hls.cpp b/trunk/src/main/srs_main_ingest_hls.cpp index fdf2a911a..2fdde094f 100644 --- a/trunk/src/main/srs_main_ingest_hls.cpp +++ b/trunk/src/main/srs_main_ingest_hls.cpp @@ -637,8 +637,7 @@ private: int64_t raw_aac_dts; private: SrsRequest* req; - st_netfd_t stfd; - SrsStSocket* io; + SrsTcpClient* transport; SrsRtmpClient* client; int stream_id; private: @@ -658,9 +657,8 @@ public: raw_aac_dts = srs_update_system_time_ms(); req = NULL; - io = NULL; client = NULL; - stfd = NULL; + transport = new SrsTcpClient(); stream_id = 0; avc = new SrsRawH264Stream(); @@ -672,6 +670,7 @@ public: virtual ~SrsIngestSrsOutput() { close(); + srs_freep(transport); srs_freep(avc); srs_freep(aac); @@ -1211,7 +1210,7 @@ int SrsIngestSrsOutput::connect() // when ok, ignore. // TODO: FIXME: should reconnect when disconnected. - if (io || client) { + if (transport->connected()) { return ret; } @@ -1235,12 +1234,13 @@ int SrsIngestSrsOutput::connect() } // connect host. - if ((ret = srs_socket_connect(req->host, req->port, ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) { + if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) { srs_error("mpegts: connect server %s:%d failed. ret=%d", req->host.c_str(), req->port, ret); return ret; } - io = new SrsStSocket(stfd); - client = new SrsRtmpClient(io); + + srs_freep(client); + client = new SrsRtmpClient(transport); client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); @@ -1326,9 +1326,9 @@ void SrsIngestSrsOutput::close() h264_sps_pps_sent = false; srs_freep(client); - srs_freep(io); srs_freep(req); - srs_close_stfd(stfd); + + transport->close(); } // the context for ingest hls stream.