1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

use SrsTcpClient instead raw socket.

This commit is contained in:
winlin 2015-10-14 10:48:08 +08:00
parent a9bb6061c3
commit 24b3899972
16 changed files with 107 additions and 153 deletions

View file

@ -276,6 +276,8 @@ int SrsDynamicHttpConn::connect()
if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) { if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) {
return ret; return ret;
} }
srs_freep(client);
client = new SrsRtmpClient(transport); client = new SrsRtmpClient(transport);
client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);

View file

@ -63,13 +63,12 @@ using namespace std;
SrsEdgeIngester::SrsEdgeIngester() SrsEdgeIngester::SrsEdgeIngester()
{ {
io = NULL; transport = new SrsTcpClient();
kbps = new SrsKbps(); kbps = new SrsKbps();
client = NULL; client = NULL;
_edge = NULL; _edge = NULL;
_req = NULL; _req = NULL;
stream_id = 0; stream_id = 0;
stfd = NULL;
lb = new SrsLbRoundRobin(); lb = new SrsLbRoundRobin();
pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US); pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US);
} }
@ -78,6 +77,7 @@ SrsEdgeIngester::~SrsEdgeIngester()
{ {
stop(); stop();
srs_freep(transport);
srs_freep(lb); srs_freep(lb);
srs_freep(pthread); srs_freep(pthread);
srs_freep(kbps); srs_freep(kbps);
@ -109,11 +109,9 @@ int SrsEdgeIngester::start()
void SrsEdgeIngester::stop() void SrsEdgeIngester::stop()
{ {
pthread->stop(); pthread->stop();
transport->close();
close_underlayer_socket();
srs_freep(client); srs_freep(client);
srs_freep(io);
kbps->set_io(NULL, NULL); kbps->set_io(NULL, NULL);
// notice to unpublish. // notice to unpublish.
@ -336,17 +334,12 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
return ret; return ret;
} }
void SrsEdgeIngester::close_underlayer_socket()
{
srs_close_stfd(stfd);
}
int SrsEdgeIngester::connect_server(string& ep_server, int& ep_port) int SrsEdgeIngester::connect_server(string& ep_server, int& ep_port)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
// reopen // reopen
close_underlayer_socket(); transport->close();
SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost); SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost);
@ -368,20 +361,16 @@ int SrsEdgeIngester::connect_server(string& ep_server, int& ep_port)
// open socket. // open socket.
int64_t timeout = SRS_EDGE_INGESTER_TIMEOUT_US; int64_t timeout = SRS_EDGE_INGESTER_TIMEOUT_US;
if ((ret = srs_socket_connect(ep_server, ep_port, timeout, &stfd)) != ERROR_SUCCESS) { if ((ret = transport->connect(ep_server, ep_port, timeout)) != ERROR_SUCCESS) {
srs_warn("edge pull failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", srs_warn("edge pull failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
_req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret); _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret);
return ret; return ret;
} }
srs_freep(client); srs_freep(client);
srs_freep(io); client = new SrsRtmpClient(transport);
srs_assert(stfd); kbps->set_io(transport, transport);
io = new SrsStSocket(stfd);
client = new SrsRtmpClient(io);
kbps->set_io(io, io);
srs_trace("edge pull connected, url=%s/%s, server=%s:%d", srs_trace("edge pull connected, url=%s/%s, server=%s:%d",
_req->tcUrl.c_str(), _req->stream.c_str(), ep_server.c_str(), ep_port); _req->tcUrl.c_str(), _req->stream.c_str(), ep_server.c_str(), ep_port);
@ -391,14 +380,13 @@ int SrsEdgeIngester::connect_server(string& ep_server, int& ep_port)
SrsEdgeForwarder::SrsEdgeForwarder() SrsEdgeForwarder::SrsEdgeForwarder()
{ {
io = NULL; transport = new SrsTcpClient();
kbps = new SrsKbps(); kbps = new SrsKbps();
client = NULL; client = NULL;
_edge = NULL; _edge = NULL;
_req = NULL; _req = NULL;
lb = new SrsLbRoundRobin(); lb = new SrsLbRoundRobin();
stream_id = 0; stream_id = 0;
stfd = NULL;
pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US); pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US);
queue = new SrsMessageQueue(); queue = new SrsMessageQueue();
send_error_code = ERROR_SUCCESS; send_error_code = ERROR_SUCCESS;
@ -408,6 +396,7 @@ SrsEdgeForwarder::~SrsEdgeForwarder()
{ {
stop(); stop();
srs_freep(transport);
srs_freep(lb); srs_freep(lb);
srs_freep(pthread); srs_freep(pthread);
srs_freep(queue); srs_freep(queue);
@ -473,13 +462,11 @@ int SrsEdgeForwarder::start()
void SrsEdgeForwarder::stop() void SrsEdgeForwarder::stop()
{ {
pthread->stop(); pthread->stop();
transport->close();
close_underlayer_socket();
queue->clear(); queue->clear();
srs_freep(client); srs_freep(client);
srs_freep(io);
kbps->set_io(NULL, NULL); kbps->set_io(NULL, NULL);
} }
@ -586,17 +573,12 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
return ret; return ret;
} }
void SrsEdgeForwarder::close_underlayer_socket()
{
srs_close_stfd(stfd);
}
int SrsEdgeForwarder::connect_server(string& ep_server, int& ep_port) int SrsEdgeForwarder::connect_server(string& ep_server, int& ep_port)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
// reopen // reopen
close_underlayer_socket(); transport->close();
SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost); SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost);
srs_assert(conf); srs_assert(conf);
@ -610,20 +592,16 @@ int SrsEdgeForwarder::connect_server(string& ep_server, int& ep_port)
// open socket. // open socket.
int64_t timeout = SRS_EDGE_FORWARDER_TIMEOUT_US; int64_t timeout = SRS_EDGE_FORWARDER_TIMEOUT_US;
if ((ret = srs_socket_connect(ep_server, ep_port, timeout, &stfd)) != ERROR_SUCCESS) { if ((ret = transport->connect(ep_server, ep_port, timeout)) != ERROR_SUCCESS) {
srs_warn("edge push failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", srs_warn("edge push failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
_req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret); _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret);
return ret; return ret;
} }
srs_freep(client); srs_freep(client);
srs_freep(io); client = new SrsRtmpClient(transport);
srs_assert(stfd); kbps->set_io(transport, transport);
io = new SrsStSocket(stfd);
client = new SrsRtmpClient(io);
kbps->set_io(io, io);
// open socket. // open socket.
srs_trace("edge push connected, stream=%s, tcUrl=%s to server=%s, port=%d", srs_trace("edge push connected, stream=%s, tcUrl=%s to server=%s, port=%d",

View file

@ -47,6 +47,7 @@ class SrsMessageQueue;
class ISrsProtocolReaderWriter; class ISrsProtocolReaderWriter;
class SrsKbps; class SrsKbps;
class SrsLbRoundRobin; class SrsLbRoundRobin;
class SrsTcpClient;
/** /**
* the state of edge, auto machine * the state of edge, auto machine
@ -85,8 +86,7 @@ private:
SrsPlayEdge* _edge; SrsPlayEdge* _edge;
SrsRequest* _req; SrsRequest* _req;
SrsReusableThread2* pthread; SrsReusableThread2* pthread;
st_netfd_t stfd; SrsTcpClient* transport;
ISrsProtocolReaderWriter* io;
SrsKbps* kbps; SrsKbps* kbps;
SrsRtmpClient* client; SrsRtmpClient* client;
SrsLbRoundRobin* lb; SrsLbRoundRobin* lb;
@ -103,7 +103,6 @@ public:
virtual int cycle(); virtual int cycle();
private: private:
virtual int ingest(); virtual int ingest();
virtual void close_underlayer_socket();
virtual int connect_server(std::string& ep_server, int& ep_port); virtual int connect_server(std::string& ep_server, int& ep_port);
virtual int connect_app(std::string ep_server, int ep_port); virtual int connect_app(std::string ep_server, int ep_port);
virtual int process_publish_message(SrsCommonMessage* msg); virtual int process_publish_message(SrsCommonMessage* msg);
@ -121,8 +120,7 @@ private:
SrsPublishEdge* _edge; SrsPublishEdge* _edge;
SrsRequest* _req; SrsRequest* _req;
SrsReusableThread2* pthread; SrsReusableThread2* pthread;
st_netfd_t stfd; SrsTcpClient* transport;
ISrsProtocolReaderWriter* io;
SrsKbps* kbps; SrsKbps* kbps;
SrsRtmpClient* client; SrsRtmpClient* client;
SrsLbRoundRobin* lb; SrsLbRoundRobin* lb;
@ -152,7 +150,6 @@ public:
public: public:
virtual int proxy(SrsCommonMessage* msg); virtual int proxy(SrsCommonMessage* msg);
private: private:
virtual void close_underlayer_socket();
virtual int connect_server(std::string& ep_server, int& ep_port); virtual int connect_server(std::string& ep_server, int& ep_port);
virtual int connect_app(std::string ep_server, int ep_port); virtual int connect_app(std::string ep_server, int ep_port);
}; };

View file

@ -54,9 +54,8 @@ SrsForwarder::SrsForwarder(SrsSource* _source)
source = _source; source = _source;
_req = NULL; _req = NULL;
io = NULL;
client = NULL; client = NULL;
stfd = NULL; transport = new SrsTcpClient();
kbps = new SrsKbps(); kbps = new SrsKbps();
stream_id = 0; stream_id = 0;
@ -71,6 +70,7 @@ SrsForwarder::~SrsForwarder()
{ {
on_unpublish(); on_unpublish();
srs_freep(transport);
srs_freep(pthread); srs_freep(pthread);
srs_freep(queue); srs_freep(queue);
srs_freep(jitter); srs_freep(jitter);
@ -151,11 +151,9 @@ int SrsForwarder::on_publish()
void SrsForwarder::on_unpublish() void SrsForwarder::on_unpublish()
{ {
pthread->stop(); pthread->stop();
transport->close();
close_underlayer_socket();
srs_freep(client); srs_freep(client);
srs_freep(io);
kbps->set_io(NULL, NULL); kbps->set_io(NULL, NULL);
} }
@ -271,11 +269,6 @@ int SrsForwarder::cycle()
return ret; return ret;
} }
void SrsForwarder::close_underlayer_socket()
{
srs_close_stfd(stfd);
}
void SrsForwarder::discovery_ep(string& server, int& port, string& tc_url) void SrsForwarder::discovery_ep(string& server, int& port, string& tc_url)
{ {
SrsRequest* req = _req; SrsRequest* req = _req;
@ -292,7 +285,7 @@ int SrsForwarder::connect_server(string& ep_server, int& ep_port)
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
// reopen // reopen
close_underlayer_socket(); transport->close();
// discovery the server port and tcUrl from req and ep_forward. // discovery the server port and tcUrl from req and ep_forward.
string tc_url; string tc_url;
@ -300,20 +293,16 @@ int SrsForwarder::connect_server(string& ep_server, int& ep_port)
// open socket. // open socket.
int64_t timeout = SRS_FORWARDER_SLEEP_US; int64_t timeout = SRS_FORWARDER_SLEEP_US;
if ((ret = srs_socket_connect(ep_server, ep_port, timeout, &stfd)) != ERROR_SUCCESS) { if ((ret = transport->connect(ep_server, ep_port, timeout)) != ERROR_SUCCESS) {
srs_warn("forward failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", srs_warn("forward failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
_req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret); _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret);
return ret; return ret;
} }
srs_freep(client); srs_freep(client);
srs_freep(io); client = new SrsRtmpClient(transport);
srs_assert(stfd); kbps->set_io(transport, transport);
io = new SrsStSocket(stfd);
client = new SrsRtmpClient(io);
kbps->set_io(io, io);
srs_trace("forward connected, stream=%s, tcUrl=%s to server=%s, port=%d", srs_trace("forward connected, stream=%s, tcUrl=%s to server=%s, port=%d",
_req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port); _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port);

View file

@ -56,11 +56,10 @@ private:
SrsRequest* _req; SrsRequest* _req;
int stream_id; int stream_id;
private: private:
st_netfd_t stfd;
SrsReusableThread2* pthread; SrsReusableThread2* pthread;
private: private:
SrsSource* source; SrsSource* source;
ISrsProtocolReaderWriter* io; SrsTcpClient* transport;
SrsKbps* kbps; SrsKbps* kbps;
SrsRtmpClient* client; SrsRtmpClient* client;
SrsRtmpJitter* jitter; SrsRtmpJitter* jitter;
@ -99,7 +98,6 @@ public:
public: public:
virtual int cycle(); virtual int cycle();
private: private:
virtual void close_underlayer_socket();
virtual void discovery_ep(std::string& server, int& port, std::string& tc_url); virtual void discovery_ep(std::string& server, int& port, std::string& tc_url);
virtual int connect_server(std::string& ep_server, int& ep_port); virtual int connect_server(std::string& ep_server, int& ep_port);
virtual int connect_app(std::string ep_server, int ep_port); virtual int connect_app(std::string ep_server, int ep_port);

View file

@ -39,9 +39,7 @@ using namespace std;
SrsHttpClient::SrsHttpClient() SrsHttpClient::SrsHttpClient()
{ {
connected = false; transport = new SrsTcpClient();
stfd = NULL;
skt = NULL;
parser = NULL; parser = NULL;
timeout_us = 0; timeout_us = 0;
port = 0; port = 0;
@ -50,6 +48,8 @@ SrsHttpClient::SrsHttpClient()
SrsHttpClient::~SrsHttpClient() SrsHttpClient::~SrsHttpClient()
{ {
disconnect(); disconnect();
srs_freep(transport);
srs_freep(parser); srs_freep(parser);
} }
@ -102,7 +102,7 @@ int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg)
<< req; << req;
std::string data = ss.str(); std::string data = ss.str();
if ((ret = skt->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) { if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) {
// disconnect when error. // disconnect when error.
disconnect(); disconnect();
@ -111,7 +111,7 @@ int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg)
} }
ISrsHttpMessage* msg = NULL; ISrsHttpMessage* msg = NULL;
if ((ret = parser->parse_message(skt, NULL, &msg)) != ERROR_SUCCESS) { if ((ret = parser->parse_message(transport, NULL, &msg)) != ERROR_SUCCESS) {
srs_error("parse http post response failed. ret=%d", ret); srs_error("parse http post response failed. ret=%d", ret);
return ret; return ret;
} }
@ -148,7 +148,7 @@ int SrsHttpClient::get(string path, std::string req, ISrsHttpMessage** ppmsg)
<< req; << req;
std::string data = ss.str(); std::string data = ss.str();
if ((ret = skt->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) { if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) {
// disconnect when error. // disconnect when error.
disconnect(); disconnect();
@ -157,7 +157,7 @@ int SrsHttpClient::get(string path, std::string req, ISrsHttpMessage** ppmsg)
} }
ISrsHttpMessage* msg = NULL; ISrsHttpMessage* msg = NULL;
if ((ret = parser->parse_message(skt, NULL, &msg)) != ERROR_SUCCESS) { if ((ret = parser->parse_message(transport, NULL, &msg)) != ERROR_SUCCESS) {
srs_error("parse http post response failed. ret=%d", ret); srs_error("parse http post response failed. ret=%d", ret);
return ret; return ret;
} }
@ -171,37 +171,30 @@ int SrsHttpClient::get(string path, std::string req, ISrsHttpMessage** ppmsg)
void SrsHttpClient::disconnect() void SrsHttpClient::disconnect()
{ {
connected = false; transport->close();
srs_close_stfd(stfd);
srs_freep(skt);
} }
int SrsHttpClient::connect() int SrsHttpClient::connect()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
if (connected) { if (transport->connected()) {
return ret; return ret;
} }
disconnect(); disconnect();
// open socket. // open socket.
if ((ret = srs_socket_connect(host, port, timeout_us, &stfd)) != ERROR_SUCCESS) { if ((ret = transport->connect(host, port, timeout_us)) != ERROR_SUCCESS) {
srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d", srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d",
host.c_str(), port, timeout_us, ret); host.c_str(), port, timeout_us, ret);
return ret; return ret;
} }
srs_info("connect to server success. server=%s, port=%d", host.c_str(), port); srs_info("connect to server success. server=%s, port=%d", host.c_str(), port);
srs_assert(!skt);
skt = new SrsStSocket(stfd);
connected = true;
// set the recv/send timeout in us. // set the recv/send timeout in us.
skt->set_recv_timeout(timeout_us); transport->set_recv_timeout(timeout_us);
skt->set_send_timeout(timeout_us); transport->set_send_timeout(timeout_us);
return ret; return ret;
} }

View file

@ -49,9 +49,7 @@ class SrsStSocket;
class SrsHttpClient class SrsHttpClient
{ {
private: private:
bool connected; SrsTcpClient* transport;
st_netfd_t stfd;
SrsStSocket* skt;
SrsHttpParser* parser; SrsHttpParser* parser;
private: private:
int64_t timeout_us; int64_t timeout_us;

View file

@ -303,7 +303,7 @@ int SrsHttpResponseWriter::send_header(char* data, int size)
return skt->write((void*)buf.c_str(), buf.length(), NULL); return skt->write((void*)buf.c_str(), buf.length(), NULL);
} }
SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* io) SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, ISrsProtocolReaderWriter* io)
{ {
skt = io; skt = io;
owner = msg; owner = msg;
@ -494,7 +494,7 @@ int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read)
return ret; return ret;
} }
SrsHttpMessage::SrsHttpMessage(SrsStSocket* io, SrsConnection* c) : ISrsHttpMessage() SrsHttpMessage::SrsHttpMessage(ISrsProtocolReaderWriter* io, SrsConnection* c) : ISrsHttpMessage()
{ {
conn = c; conn = c;
chunked = false; chunked = false;
@ -870,7 +870,7 @@ int SrsHttpParser::initialize(enum http_parser_type type, bool allow_jsonp)
return ret; return ret;
} }
int SrsHttpParser::parse_message(SrsStSocket* skt, SrsConnection* conn, ISrsHttpMessage** ppmsg) int SrsHttpParser::parse_message(ISrsProtocolReaderWriter* io, SrsConnection* conn, ISrsHttpMessage** ppmsg)
{ {
*ppmsg = NULL; *ppmsg = NULL;
@ -887,7 +887,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsConnection* conn, ISrsHttp
header_parsed = 0; header_parsed = 0;
// do parse // do parse
if ((ret = parse_message_imp(skt)) != ERROR_SUCCESS) { if ((ret = parse_message_imp(io)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) { if (!srs_is_client_gracefully_close(ret)) {
srs_error("parse http msg failed. ret=%d", ret); srs_error("parse http msg failed. ret=%d", ret);
} }
@ -895,7 +895,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsConnection* conn, ISrsHttp
} }
// create msg // create msg
SrsHttpMessage* msg = new SrsHttpMessage(skt, conn); SrsHttpMessage* msg = new SrsHttpMessage(io, conn);
// initalize http msg, parse url. // initalize http msg, parse url.
if ((ret = msg->update(url, jsonp, &header, buffer, headers)) != ERROR_SUCCESS) { if ((ret = msg->update(url, jsonp, &header, buffer, headers)) != ERROR_SUCCESS) {
@ -910,7 +910,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsConnection* conn, ISrsHttp
return ret; return ret;
} }
int SrsHttpParser::parse_message_imp(SrsStSocket* skt) int SrsHttpParser::parse_message_imp(ISrsProtocolReaderWriter* io)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -944,7 +944,7 @@ int SrsHttpParser::parse_message_imp(SrsStSocket* skt)
// when nothing parsed, read more to parse. // when nothing parsed, read more to parse.
if (nparsed == 0) { if (nparsed == 0) {
// when requires more, only grow 1bytes, but the buffer will cache more. // when requires more, only grow 1bytes, but the buffer will cache more.
if ((ret = buffer->grow(skt, buffer->size() + 1)) != ERROR_SUCCESS) { if ((ret = buffer->grow(io, buffer->size() + 1)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) { if (!srs_is_client_gracefully_close(ret)) {
srs_error("read body from server failed. ret=%d", ret); srs_error("read body from server failed. ret=%d", ret);
} }

View file

@ -120,7 +120,7 @@ public:
class SrsHttpResponseReader : virtual public ISrsHttpResponseReader class SrsHttpResponseReader : virtual public ISrsHttpResponseReader
{ {
private: private:
SrsStSocket* skt; ISrsProtocolReaderWriter* skt;
SrsHttpMessage* owner; SrsHttpMessage* owner;
SrsFastStream* buffer; SrsFastStream* buffer;
bool is_eof; bool is_eof;
@ -131,7 +131,7 @@ private:
// already read total bytes. // already read total bytes.
int64_t nb_total_read; int64_t nb_total_read;
public: public:
SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* io); SrsHttpResponseReader(SrsHttpMessage* msg, ISrsProtocolReaderWriter* io);
virtual ~SrsHttpResponseReader(); virtual ~SrsHttpResponseReader();
public: public:
/** /**
@ -208,7 +208,7 @@ private:
// the method in QueryString will override the HTTP method. // the method in QueryString will override the HTTP method.
std::string jsonp_method; std::string jsonp_method;
public: public:
SrsHttpMessage(SrsStSocket* io, SrsConnection* c); SrsHttpMessage(ISrsProtocolReaderWriter* io, SrsConnection* c);
virtual ~SrsHttpMessage(); virtual ~SrsHttpMessage();
public: public:
/** /**
@ -332,12 +332,12 @@ public:
* or error and *ppmsg must be NULL. * or error and *ppmsg must be NULL.
* @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete(). * @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete().
*/ */
virtual int parse_message(SrsStSocket* skt, SrsConnection* conn, ISrsHttpMessage** ppmsg); virtual int parse_message(ISrsProtocolReaderWriter* io, SrsConnection* conn, ISrsHttpMessage** ppmsg);
private: private:
/** /**
* parse the HTTP message to member field: msg. * parse the HTTP message to member field: msg.
*/ */
virtual int parse_message_imp(SrsStSocket* skt); virtual int parse_message_imp(ISrsProtocolReaderWriter* io);
private: private:
static int on_message_begin(http_parser* parser); static int on_message_begin(http_parser* parser);
static int on_headers_complete(http_parser* parser); static int on_headers_complete(http_parser* parser);

View file

@ -132,9 +132,8 @@ SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c)
output = _srs_config->get_stream_caster_output(c); output = _srs_config->get_stream_caster_output(c);
req = NULL; req = NULL;
io = NULL;
client = NULL; client = NULL;
stfd = NULL; transport = new SrsTcpClient();
stream_id = 0; stream_id = 0;
avc = new SrsRawH264Stream(); avc = new SrsRawH264Stream();
@ -150,6 +149,7 @@ SrsMpegtsOverUdp::~SrsMpegtsOverUdp()
{ {
close(); close();
srs_freep(transport);
srs_freep(buffer); srs_freep(buffer);
srs_freep(stream); srs_freep(stream);
srs_freep(context); srs_freep(context);
@ -345,8 +345,8 @@ int SrsMpegtsOverUdp::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs)
} }
// ts tbn to flv tbn. // ts tbn to flv tbn.
u_int32_t dts = msg->dts / 90; u_int32_t dts = (u_int32_t)(msg->dts / 90);
u_int32_t pts = msg->dts / 90; u_int32_t pts = (u_int32_t)(msg->dts / 90);
// send each frame. // send each frame.
while (!avs->empty()) { while (!avs->empty()) {
@ -504,7 +504,7 @@ int SrsMpegtsOverUdp::on_ts_audio(SrsTsMessage* msg, SrsBuffer* avs)
} }
// ts tbn to flv tbn. // ts tbn to flv tbn.
u_int32_t dts = msg->dts / 90; u_int32_t dts = (u_int32_t)(msg->dts / 90);
// send each frame. // send each frame.
while (!avs->empty()) { while (!avs->empty()) {
@ -604,7 +604,7 @@ int SrsMpegtsOverUdp::connect()
// when ok, ignore. // when ok, ignore.
// TODO: FIXME: should reconnect when disconnected. // TODO: FIXME: should reconnect when disconnected.
if (io || client) { if (transport->connected()) {
return ret; return ret;
} }
@ -616,12 +616,13 @@ int SrsMpegtsOverUdp::connect()
} }
// connect host. // connect host.
if ((ret = srs_socket_connect(req->host, req->port, ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) { if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) {
srs_error("mpegts: connect server %s:%d failed. ret=%d", req->host.c_str(), req->port, ret); srs_error("mpegts: connect server %s:%d failed. ret=%d", req->host.c_str(), req->port, ret);
return ret; return ret;
} }
io = new SrsStSocket(stfd);
client = new SrsRtmpClient(io); srs_freep(client);
client = new SrsRtmpClient(transport);
client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
@ -704,9 +705,9 @@ int SrsMpegtsOverUdp::connect_app(string ep_server, int ep_port)
void SrsMpegtsOverUdp::close() void SrsMpegtsOverUdp::close()
{ {
srs_freep(client); srs_freep(client);
srs_freep(io);
srs_freep(req); srs_freep(req);
srs_close_stfd(stfd);
transport->close();
} }
#endif #endif

View file

@ -86,8 +86,7 @@ private:
std::string output; std::string output;
private: private:
SrsRequest* req; SrsRequest* req;
st_netfd_t stfd; SrsTcpClient* transport;
SrsStSocket* io;
SrsRtmpClient* client; SrsRtmpClient* client;
int stream_id; int stream_id;
private: private:

View file

@ -1229,11 +1229,13 @@ int SrsRtmpConn::check_edge_token_traverse_auth()
srs_assert(req); srs_assert(req);
st_netfd_t stsock = NULL; SrsTcpClient* transport = new SrsTcpClient();
SrsAutoFree(SrsTcpClient, transport);
vector<string> args = _srs_config->get_vhost_edge_origin(req->vhost)->args; vector<string> args = _srs_config->get_vhost_edge_origin(req->vhost)->args;
for (int i = 0; i < (int)args.size(); i++) { for (int i = 0; i < (int)args.size(); i++) {
string hostport = args.at(i); string hostport = args.at(i);
if ((ret = connect_server(hostport, &stsock)) == ERROR_SUCCESS) { if ((ret = connect_server(hostport, transport)) == ERROR_SUCCESS) {
break; break;
} }
} }
@ -1242,20 +1244,13 @@ int SrsRtmpConn::check_edge_token_traverse_auth()
return ret; return ret;
} }
srs_assert(stsock); SrsRtmpClient* client = new SrsRtmpClient(transport);
SrsStSocket* io = new SrsStSocket(stsock); SrsAutoFree(SrsRtmpClient, client);
SrsRtmpClient* client = new SrsRtmpClient(io);
ret = do_token_traverse_auth(client); return do_token_traverse_auth(client);
srs_freep(client);
srs_freep(io);
srs_close_stfd(stsock);
return ret;
} }
int SrsRtmpConn::connect_server(string hostport, st_netfd_t* pstsock) int SrsRtmpConn::connect_server(string hostport, SrsTcpClient* transport)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -1268,16 +1263,14 @@ int SrsRtmpConn::connect_server(string hostport, st_netfd_t* pstsock)
srs_parse_hostport(hostport, server, port); srs_parse_hostport(hostport, server, port);
// open socket. // open socket.
st_netfd_t stsock = NULL;
int64_t timeout = SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US; int64_t timeout = SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US;
if ((ret = srs_socket_connect(server, port, timeout, &stsock)) != ERROR_SUCCESS) { if ((ret = transport->connect(server, port, timeout)) != ERROR_SUCCESS) {
srs_warn("edge token traverse failed, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", srs_warn("edge token traverse failed, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d",
req->tcUrl.c_str(), server.c_str(), port, timeout, ret); req->tcUrl.c_str(), server.c_str(), port, timeout, ret);
return ret; return ret;
} }
srs_info("edge token auth connected, url=%s/%s, server=%s:%d", req->tcUrl.c_str(), req->stream.c_str(), server.c_str(), port); srs_info("edge token auth connected, url=%s/%s, server=%s:%d", req->tcUrl.c_str(), req->stream.c_str(), server.c_str(), port);
*pstsock = stsock;
return ret; return ret;
} }

View file

@ -134,7 +134,7 @@ private:
virtual void set_sock_options(); virtual void set_sock_options();
private: private:
virtual int check_edge_token_traverse_auth(); virtual int check_edge_token_traverse_auth();
virtual int connect_server(std::string hostport, st_netfd_t* pstsock); virtual int connect_server(std::string hostport, SrsTcpClient* transport);
virtual int do_token_traverse_auth(SrsRtmpClient* client); virtual int do_token_traverse_auth(SrsRtmpClient* client);
/** /**
* when the connection disconnect, call this method. * when the connection disconnect, call this method.

View file

@ -195,8 +195,8 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, st_netfd_t fd, std::string o)
trd = new SrsOneCycleThread("rtsp", this); trd = new SrsOneCycleThread("rtsp", this);
req = NULL; req = NULL;
io = NULL;
client = NULL; client = NULL;
transport = new SrsTcpClient();
stream_id = 0; stream_id = 0;
vjitter = new SrsRtspJitter(); vjitter = new SrsRtspJitter();
ajitter = new SrsRtspJitter(); ajitter = new SrsRtspJitter();
@ -219,7 +219,7 @@ SrsRtspConn::~SrsRtspConn()
srs_freep(rtsp); srs_freep(rtsp);
srs_freep(client); srs_freep(client);
srs_freep(io); srs_freep(transport);
srs_freep(req); srs_freep(req);
srs_freep(vjitter); srs_freep(vjitter);
@ -254,7 +254,7 @@ int SrsRtspConn::do_cycle()
srs_info("rtsp: got rtsp request"); srs_info("rtsp: got rtsp request");
if (req->is_options()) { if (req->is_options()) {
SrsRtspOptionsResponse* res = new SrsRtspOptionsResponse(req->seq); SrsRtspOptionsResponse* res = new SrsRtspOptionsResponse((int)req->seq);
res->session = session; res->session = session;
if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) { if (!srs_is_client_gracefully_close(ret)) {
@ -288,7 +288,7 @@ int SrsRtspConn::do_cycle()
audio_sample_rate, audio_channel, rtsp_tcUrl.c_str(), rtsp_stream.c_str() audio_sample_rate, audio_channel, rtsp_tcUrl.c_str(), rtsp_stream.c_str()
); );
SrsRtspResponse* res = new SrsRtspResponse(req->seq); SrsRtspResponse* res = new SrsRtspResponse((int)req->seq);
res->session = session; res->session = session;
if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) { if (!srs_is_client_gracefully_close(ret)) {
@ -328,7 +328,7 @@ int SrsRtspConn::do_cycle()
session = "O9EaZ4bf"; // TODO: FIXME: generate session id. session = "O9EaZ4bf"; // TODO: FIXME: generate session id.
} }
SrsRtspSetupResponse* res = new SrsRtspSetupResponse(req->seq); SrsRtspSetupResponse* res = new SrsRtspSetupResponse((int)req->seq);
res->client_port_min = req->transport->client_port_min; res->client_port_min = req->transport->client_port_min;
res->client_port_max = req->transport->client_port_max; res->client_port_max = req->transport->client_port_max;
res->local_port_min = lpm; res->local_port_min = lpm;
@ -341,7 +341,7 @@ int SrsRtspConn::do_cycle()
return ret; return ret;
} }
} else if (req->is_record()) { } else if (req->is_record()) {
SrsRtspResponse* res = new SrsRtspResponse(req->seq); SrsRtspResponse* res = new SrsRtspResponse((int)req->seq);
res->session = session; res->session = session;
if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) { if ((ret = rtsp->send_message(res)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) { if (!srs_is_client_gracefully_close(ret)) {
@ -434,7 +434,11 @@ int SrsRtspConn::on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts)
return ret; return ret;
} }
if ((ret = write_h264_ipb_frame(pkt->payload->bytes(), pkt->payload->length(), dts / 90, pts / 90)) != ERROR_SUCCESS) { char* bytes = pkt->payload->bytes();
int length = pkt->payload->length();
u_int32_t fdts = (u_int32_t)(dts / 90);
u_int32_t fpts = (u_int32_t)(pts / 90);
if ((ret = write_h264_ipb_frame(bytes, length, fdts, fpts)) != ERROR_SUCCESS) {
return ret; return ret;
} }
@ -476,7 +480,7 @@ int SrsRtspConn::kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts)
int nb_frame = acache->audio_samples->sample_units[i].size; int nb_frame = acache->audio_samples->sample_units[i].size;
int64_t timestamp = (acache->dts + delta * i) / 90; int64_t timestamp = (acache->dts + delta * i) / 90;
acodec->aac_packet_type = 1; acodec->aac_packet_type = 1;
if ((ret = write_audio_raw_frame(frame, nb_frame, acodec, timestamp)) != ERROR_SUCCESS) { if ((ret = write_audio_raw_frame(frame, nb_frame, acodec, (u_int32_t)timestamp)) != ERROR_SUCCESS) {
return ret; return ret;
} }
} }
@ -497,7 +501,7 @@ int SrsRtspConn::write_sequence_header()
int64_t dts = vjitter->timestamp() / 90; int64_t dts = vjitter->timestamp() / 90;
// send video sps/pps // send video sps/pps
if ((ret = write_h264_sps_pps(dts, dts)) != ERROR_SUCCESS) { if ((ret = write_h264_sps_pps((u_int32_t)dts, (u_int32_t)dts)) != ERROR_SUCCESS) {
return ret; return ret;
} }
@ -535,7 +539,7 @@ int SrsRtspConn::write_sequence_header()
break; break;
}; };
if ((ret = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), acodec, dts)) != ERROR_SUCCESS) { if ((ret = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), acodec, (u_int32_t)dts)) != ERROR_SUCCESS) {
return ret; return ret;
} }
} }
@ -642,7 +646,8 @@ int SrsRtspConn::connect()
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
// when ok, ignore. // when ok, ignore.
if (io || client) { // TODO: FIXME: support reconnect.
if (transport->connected()) {
return ret; return ret;
} }
@ -663,12 +668,13 @@ int SrsRtspConn::connect()
} }
// connect host. // connect host.
if ((ret = srs_socket_connect(req->host, req->port, ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) { if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) {
srs_error("rtsp: connect server %s:%d failed. ret=%d", req->host.c_str(), req->port, ret); srs_error("rtsp: connect server %s:%d failed. ret=%d", req->host.c_str(), req->port, ret);
return ret; return ret;
} }
io = new SrsStSocket(stfd);
client = new SrsRtmpClient(io); srs_freep(client);
client = new SrsRtmpClient(transport);
client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);

View file

@ -139,7 +139,7 @@ private:
SrsOneCycleThread* trd; SrsOneCycleThread* trd;
private: private:
SrsRequest* req; SrsRequest* req;
SrsStSocket* io; SrsTcpClient* transport;
SrsRtmpClient* client; SrsRtmpClient* client;
SrsRtspJitter* vjitter; SrsRtspJitter* vjitter;
SrsRtspJitter* ajitter; SrsRtspJitter* ajitter;

View file

@ -637,8 +637,7 @@ private:
int64_t raw_aac_dts; int64_t raw_aac_dts;
private: private:
SrsRequest* req; SrsRequest* req;
st_netfd_t stfd; SrsTcpClient* transport;
SrsStSocket* io;
SrsRtmpClient* client; SrsRtmpClient* client;
int stream_id; int stream_id;
private: private:
@ -658,9 +657,8 @@ public:
raw_aac_dts = srs_update_system_time_ms(); raw_aac_dts = srs_update_system_time_ms();
req = NULL; req = NULL;
io = NULL;
client = NULL; client = NULL;
stfd = NULL; transport = new SrsTcpClient();
stream_id = 0; stream_id = 0;
avc = new SrsRawH264Stream(); avc = new SrsRawH264Stream();
@ -672,6 +670,7 @@ public:
virtual ~SrsIngestSrsOutput() { virtual ~SrsIngestSrsOutput() {
close(); close();
srs_freep(transport);
srs_freep(avc); srs_freep(avc);
srs_freep(aac); srs_freep(aac);
@ -1211,7 +1210,7 @@ int SrsIngestSrsOutput::connect()
// when ok, ignore. // when ok, ignore.
// TODO: FIXME: should reconnect when disconnected. // TODO: FIXME: should reconnect when disconnected.
if (io || client) { if (transport->connected()) {
return ret; return ret;
} }
@ -1235,12 +1234,13 @@ int SrsIngestSrsOutput::connect()
} }
// connect host. // connect host.
if ((ret = srs_socket_connect(req->host, req->port, ST_UTIME_NO_TIMEOUT, &stfd)) != ERROR_SUCCESS) { if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) {
srs_error("mpegts: connect server %s:%d failed. ret=%d", req->host.c_str(), req->port, ret); srs_error("mpegts: connect server %s:%d failed. ret=%d", req->host.c_str(), req->port, ret);
return ret; return ret;
} }
io = new SrsStSocket(stfd);
client = new SrsRtmpClient(io); srs_freep(client);
client = new SrsRtmpClient(transport);
client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
@ -1326,9 +1326,9 @@ void SrsIngestSrsOutput::close()
h264_sps_pps_sent = false; h264_sps_pps_sent = false;
srs_freep(client); srs_freep(client);
srs_freep(io);
srs_freep(req); srs_freep(req);
srs_close_stfd(stfd);
transport->close();
} }
// the context for ingest hls stream. // the context for ingest hls stream.