1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

extract protocol io interface to prepare to extract the protocol from underlayer socket.

This commit is contained in:
winlin 2014-03-01 12:43:04 +08:00
parent a73dec4c41
commit 7dfc902b87
18 changed files with 247 additions and 95 deletions

View file

@ -41,6 +41,7 @@ using namespace std;
#include <srs_core_hls.hpp>
#include <srs_core_http.hpp>
#include <srs_core_bandwidth.hpp>
#include <srs_core_socket.hpp>
SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd)
: SrsConnection(srs_server, client_stfd)
@ -48,7 +49,8 @@ SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd)
ip = NULL;
req = new SrsRequest();
res = new SrsResponse();
rtmp = new SrsRtmp(client_stfd);
skt = new SrsSocket(client_stfd);
rtmp = new SrsRtmp(skt);
refer = new SrsRefer();
#ifdef SRS_HTTP
http_hooks = new SrsHttpHooks();
@ -66,6 +68,7 @@ SrsClient::~SrsClient()
srs_freep(req);
srs_freep(res);
srs_freep(rtmp);
srs_freep(skt);
srs_freep(refer);
#ifdef SRS_HTTP
srs_freep(http_hooks);

View file

@ -41,6 +41,7 @@ class SrsSource;
class SrsRefer;
class SrsConsumer;
class SrsCommonMessage;
class SrsSocket;
#ifdef SRS_HTTP
class SrsHttpHooks;
#endif
@ -55,6 +56,7 @@ private:
char* ip;
SrsRequest* req;
SrsResponse* res;
SrsSocket* skt;
SrsRtmp* rtmp;
SrsRefer* refer;
#ifdef SRS_HTTP

View file

@ -37,11 +37,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_config.hpp>
#include <srs_core_source.hpp>
#include <srs_core_autofree.hpp>
#include <srs_core_socket.hpp>
SrsForwarder::SrsForwarder(SrsSource* _source)
{
source = _source;
io = NULL;
client = NULL;
stfd = NULL;
stream_id = 0;
@ -127,6 +129,7 @@ void SrsForwarder::on_unpublish()
close_underlayer_socket();
srs_freep(client);
srs_freep(io);
}
int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)
@ -250,9 +253,12 @@ int SrsForwarder::connect_server()
srs_error("st_netfd_open_socket failed. ret=%d", ret);
return ret;
}
srs_freep(client);
client = new SrsRtmpClient(stfd);
srs_freep(io);
io = new SrsSocket(stfd);
client = new SrsRtmpClient(io);
// connect to server.
std::string ip = srs_dns_resolve(server);

View file

@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_st.hpp>
#include <srs_core_thread.hpp>
class ISrsProtocolReaderWriter;
class SrsSharedPtrMessage;
class SrsOnMetaDataPacket;
class SrsMessageQueue;
@ -59,6 +60,7 @@ private:
SrsThread* pthread;
private:
SrsSource* source;
ISrsProtocolReaderWriter* io;
SrsRtmpClient* client;
SrsRtmpJitter* jitter;
SrsMessageQueue* queue;

View file

@ -29,7 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_core_autofree.hpp>
#include <srs_core_socket.hpp>
#include <srs_protocol_io.hpp>
void srs_random_generate(char* bytes, int size)
{
@ -1067,7 +1067,7 @@ SrsSimpleHandshake::~SrsSimpleHandshake()
{
}
int SrsSimpleHandshake::handshake_with_client(SrsSocket& skt, SrsComplexHandshake& complex_hs)
int SrsSimpleHandshake::handshake_with_client(ISrsProtocolReaderWriter* skt, SrsComplexHandshake& complex_hs)
{
int ret = ERROR_SUCCESS;
@ -1075,7 +1075,7 @@ int SrsSimpleHandshake::handshake_with_client(SrsSocket& skt, SrsComplexHandshak
char* c0c1 = new char[1537];
SrsAutoFree(char, c0c1, true);
if ((ret = skt.read_fully(c0c1, 1537, &nsize)) != ERROR_SUCCESS) {
if ((ret = skt->read_fully(c0c1, 1537, &nsize)) != ERROR_SUCCESS) {
srs_warn("read c0c1 failed. ret=%d", ret);
return ret;
}
@ -1106,7 +1106,7 @@ int SrsSimpleHandshake::handshake_with_client(SrsSocket& skt, SrsComplexHandshak
SrsAutoFree(char, s0s1s2, true);
// plain text required.
s0s1s2[0] = 0x03;
if ((ret = skt.write(s0s1s2, 3073, &nsize)) != ERROR_SUCCESS) {
if ((ret = skt->write(s0s1s2, 3073, &nsize)) != ERROR_SUCCESS) {
srs_warn("simple handshake send s0s1s2 failed. ret=%d", ret);
return ret;
}
@ -1114,7 +1114,7 @@ int SrsSimpleHandshake::handshake_with_client(SrsSocket& skt, SrsComplexHandshak
char* c2 = new char[1536];
SrsAutoFree(char, c2, true);
if ((ret = skt.read_fully(c2, 1536, &nsize)) != ERROR_SUCCESS) {
if ((ret = skt->read_fully(c2, 1536, &nsize)) != ERROR_SUCCESS) {
srs_warn("simple handshake read c2 failed. ret=%d", ret);
return ret;
}
@ -1125,7 +1125,7 @@ int SrsSimpleHandshake::handshake_with_client(SrsSocket& skt, SrsComplexHandshak
return ret;
}
int SrsSimpleHandshake::handshake_with_server(SrsSocket& skt, SrsComplexHandshake& complex_hs)
int SrsSimpleHandshake::handshake_with_server(ISrsProtocolReaderWriter* skt, SrsComplexHandshake& complex_hs)
{
int ret = ERROR_SUCCESS;
@ -1151,7 +1151,7 @@ int SrsSimpleHandshake::handshake_with_server(SrsSocket& skt, SrsComplexHandshak
// plain text required.
c0c1[0] = 0x03;
if ((ret = skt.write(c0c1, 1537, &nsize)) != ERROR_SUCCESS) {
if ((ret = skt->write(c0c1, 1537, &nsize)) != ERROR_SUCCESS) {
srs_warn("write c0c1 failed. ret=%d", ret);
return ret;
}
@ -1159,7 +1159,7 @@ int SrsSimpleHandshake::handshake_with_server(SrsSocket& skt, SrsComplexHandshak
char* s0s1s2 = new char[3073];
SrsAutoFree(char, s0s1s2, true);
if ((ret = skt.read_fully(s0s1s2, 3073, &nsize)) != ERROR_SUCCESS) {
if ((ret = skt->read_fully(s0s1s2, 3073, &nsize)) != ERROR_SUCCESS) {
srs_warn("simple handshake recv s0s1s2 failed. ret=%d", ret);
return ret;
}
@ -1175,7 +1175,7 @@ int SrsSimpleHandshake::handshake_with_server(SrsSocket& skt, SrsComplexHandshak
char* c2 = new char[1536];
SrsAutoFree(char, c2, true);
srs_random_generate(c2, 1536);
if ((ret = skt.write(c2, 1536, &nsize)) != ERROR_SUCCESS) {
if ((ret = skt->write(c2, 1536, &nsize)) != ERROR_SUCCESS) {
srs_warn("simple handshake write c2 failed. ret=%d", ret);
return ret;
}
@ -1195,12 +1195,12 @@ SrsComplexHandshake::~SrsComplexHandshake()
}
#ifndef SRS_SSL
int SrsComplexHandshake::handshake_with_client(SrsSocket& /*skt*/, char* /*_c1*/)
int SrsComplexHandshake::handshake_with_client(ISrsProtocolReaderWriter* /*skt*/, char* /*_c1*/)
{
return ERROR_RTMP_TRY_SIMPLE_HS;
}
#else
int SrsComplexHandshake::handshake_with_client(SrsSocket& skt, char* _c1)
int SrsComplexHandshake::handshake_with_client(ISrsProtocolReaderWriter* skt, char* _c1)
{
int ret = ERROR_SUCCESS;
@ -1258,7 +1258,7 @@ int SrsComplexHandshake::handshake_with_client(SrsSocket& skt, char* _c1)
s0s1s2[0] = 0x03;
s1.dump(s0s1s2 + 1);
s2.dump(s0s1s2 + 1537);
if ((ret = skt.write(s0s1s2, 3073, &nsize)) != ERROR_SUCCESS) {
if ((ret = skt->write(s0s1s2, 3073, &nsize)) != ERROR_SUCCESS) {
srs_warn("complex handshake send s0s1s2 failed. ret=%d", ret);
return ret;
}
@ -1267,7 +1267,7 @@ int SrsComplexHandshake::handshake_with_client(SrsSocket& skt, char* _c1)
// recv c2
char* c2 = new char[1536];
SrsAutoFree(char, c2, true);
if ((ret = skt.read_fully(c2, 1536, &nsize)) != ERROR_SUCCESS) {
if ((ret = skt->read_fully(c2, 1536, &nsize)) != ERROR_SUCCESS) {
srs_warn("complex handshake read c2 failed. ret=%d", ret);
return ret;
}
@ -1278,12 +1278,12 @@ int SrsComplexHandshake::handshake_with_client(SrsSocket& skt, char* _c1)
#endif
#ifndef SRS_SSL
int SrsComplexHandshake::handshake_with_server(SrsSocket& /*skt*/)
int SrsComplexHandshake::handshake_with_server(ISrsProtocolReaderWriter* /*skt*/)
{
return ERROR_RTMP_TRY_SIMPLE_HS;
}
#else
int SrsComplexHandshake::handshake_with_server(SrsSocket& /*skt*/)
int SrsComplexHandshake::handshake_with_server(ISrsProtocolReaderWriter* /*skt*/)
{
int ret = ERROR_SUCCESS;

View file

@ -30,7 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp>
class SrsSocket;
class ISrsProtocolReaderWriter;
class SrsComplexHandshake;
/**
@ -47,8 +47,8 @@ public:
* @param complex_hs, try complex handshake first,
* if failed, rollback to simple handshake.
*/
virtual int handshake_with_client(SrsSocket& skt, SrsComplexHandshake& complex_hs);
virtual int handshake_with_server(SrsSocket& skt, SrsComplexHandshake& complex_hs);
virtual int handshake_with_client(ISrsProtocolReaderWriter* io, SrsComplexHandshake& complex_hs);
virtual int handshake_with_server(ISrsProtocolReaderWriter* io, SrsComplexHandshake& complex_hs);
};
/**
@ -71,8 +71,8 @@ public:
* try simple handshake if error is ERROR_RTMP_TRY_SIMPLE_HS,
* otherwise, disconnect
*/
virtual int handshake_with_client(SrsSocket& skt, char* _c1);
virtual int handshake_with_server(SrsSocket& skt);
virtual int handshake_with_client(ISrsProtocolReaderWriter* io, char* _c1);
virtual int handshake_with_server(ISrsProtocolReaderWriter* io);
};
#endif

View file

@ -26,7 +26,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_log.hpp>
#include <srs_protocol_amf0.hpp>
#include <srs_kernel_error.hpp>
#include <srs_core_socket.hpp>
#include <srs_protocol_io.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_stream.hpp>
#include <srs_core_autofree.hpp>
@ -290,11 +290,10 @@ SrsProtocol::AckWindowSize::AckWindowSize()
ack_window_size = acked_size = 0;
}
SrsProtocol::SrsProtocol(st_netfd_t client_stfd)
SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io)
{
stfd = client_stfd;
buffer = new SrsBuffer();
skt = new SrsSocket(stfd);
skt = io;
in_chunk_size = out_chunk_size = RTMP_DEFAULT_CHUNK_SIZE;
}
@ -311,7 +310,6 @@ SrsProtocol::~SrsProtocol()
chunk_streams.clear();
srs_freep(buffer);
srs_freep(skt);
}
string SrsProtocol::get_request_name(double transcationId)
@ -720,7 +718,7 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
// when we got a chunk header, we should increase the timeout,
// or we maybe timeout and disconnect the client.
int64_t timeout_us = skt->get_recv_timeout();
if (timeout_us != (int64_t)ST_UTIME_NO_TIMEOUT) {
if (!skt->is_never_timeout(timeout_us)) {
int64_t pkt_timeout_us = srs_max(timeout_us, SRS_MIN_RECV_TIMEOUT_US);
skt->set_recv_timeout(pkt_timeout_us);
srs_verbose("change recv timeout_us "
@ -764,7 +762,7 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
}
// reset the recv timeout
if (timeout_us != (int64_t)ST_UTIME_NO_TIMEOUT) {
if (!skt->is_never_timeout(timeout_us)) {
skt->set_recv_timeout(timeout_us);
srs_verbose("reset recv timeout_us to %"PRId64"", timeout_us);
}

View file

@ -33,7 +33,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <map>
#include <string>
#include <srs_core_st.hpp>
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
@ -76,7 +75,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// when error, encoder sleep for a while and retry.
#define SRS_ENCODER_SLEEP_US (int64_t)(3*1000*1000LL)
class SrsSocket;
class ISrsProtocolReaderWriter;
class SrsBuffer;
class SrsPacket;
class SrsStream;
@ -123,8 +122,7 @@ private:
};
// peer in/out
private:
st_netfd_t stfd;
SrsSocket* skt;
ISrsProtocolReaderWriter* skt;
char* pp;
/**
* requests sent out, used to build the response.
@ -144,7 +142,11 @@ private:
char out_header_fmt3[RTMP_MAX_FMT3_HEADER_SIZE];
int32_t out_chunk_size;
public:
SrsProtocol(st_netfd_t client_stfd);
/**
* use io to create the protocol stack,
* @param io, provides io interfaces, user must free it.
*/
SrsProtocol(ISrsProtocolReaderWriter* io);
virtual ~SrsProtocol();
public:
std::string get_request_name(double transcationId);
@ -1214,6 +1216,15 @@ protected:
* @pmsg, user must free it. NULL if not success.
* @ppacket, store in the pmsg, user must never free it. NULL if not success.
* @remark, only when success, user can use and must free the pmsg/ppacket.
* for example:
SrsCommonMessage* msg = NULL;
SrsConnectAppResPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsConnectAppResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
return ret;
}
// use pkt
* user should never recv message and convert it, use this method instead.
* if need to set timeout, use set timeout of SrsProtocol.
*/
template<class T>
int srs_rtmp_expect_message(SrsProtocol* protocol, SrsCommonMessage** pmsg, T** ppacket)

View file

@ -25,7 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_core_socket.hpp>
#include <srs_protocol_io.hpp>
#include <srs_core_protocol.hpp>
#include <srs_core_autofree.hpp>
#include <srs_protocol_amf0.hpp>
@ -197,10 +197,10 @@ SrsResponse::~SrsResponse()
{
}
SrsRtmpClient::SrsRtmpClient(st_netfd_t _stfd)
SrsRtmpClient::SrsRtmpClient(ISrsProtocolReaderWriter* skt)
{
stfd = _stfd;
protocol = new SrsProtocol(stfd);
io = skt;
protocol = new SrsProtocol(skt);
}
SrsRtmpClient::~SrsRtmpClient()
@ -251,15 +251,10 @@ int SrsRtmpClient::send_message(ISrsMessage* msg)
int SrsRtmpClient::handshake()
{
int ret = ERROR_SUCCESS;
SrsSocket skt(stfd);
skt.set_recv_timeout(protocol->get_recv_timeout());
skt.set_send_timeout(protocol->get_send_timeout());
SrsComplexHandshake complex_hs;
SrsSimpleHandshake simple_hs;
if ((ret = simple_hs.handshake_with_server(skt, complex_hs)) != ERROR_SUCCESS) {
if ((ret = simple_hs.handshake_with_server(io, complex_hs)) != ERROR_SUCCESS) {
return ret;
}
@ -449,10 +444,10 @@ int SrsRtmpClient::publish(string stream, int stream_id)
return ret;
}
SrsRtmp::SrsRtmp(st_netfd_t client_stfd)
SrsRtmp::SrsRtmp(ISrsProtocolReaderWriter* skt)
{
protocol = new SrsProtocol(client_stfd);
stfd = client_stfd;
io = skt;
protocol = new SrsProtocol(skt);
}
SrsRtmp::~SrsRtmp()
@ -518,15 +513,10 @@ int SrsRtmp::send_message(ISrsMessage* msg)
int SrsRtmp::handshake()
{
int ret = ERROR_SUCCESS;
SrsSocket skt(stfd);
skt.set_recv_timeout(protocol->get_recv_timeout());
skt.set_send_timeout(protocol->get_send_timeout());
SrsComplexHandshake complex_hs;
SrsSimpleHandshake simple_hs;
if ((ret = simple_hs.handshake_with_client(skt, complex_hs)) != ERROR_SUCCESS) {
if ((ret = simple_hs.handshake_with_client(io, complex_hs)) != ERROR_SUCCESS) {
return ret;
}

View file

@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_st.hpp>
class SrsProtocol;
class ISrsProtocolReaderWriter;
class ISrsMessage;
class SrsCommonMessage;
class SrsCreateStreamPacket;
@ -115,9 +116,9 @@ class SrsRtmpClient
{
protected:
SrsProtocol* protocol;
st_netfd_t stfd;
ISrsProtocolReaderWriter* io;
public:
SrsRtmpClient(st_netfd_t _stfd);
SrsRtmpClient(ISrsProtocolReaderWriter* skt);
virtual ~SrsRtmpClient();
public:
virtual void set_recv_timeout(int64_t timeout_us);
@ -145,9 +146,9 @@ class SrsRtmp
{
private:
SrsProtocol* protocol;
st_netfd_t stfd;
ISrsProtocolReaderWriter* io;
public:
SrsRtmp(st_netfd_t client_stfd);
SrsRtmp(ISrsProtocolReaderWriter* skt);
virtual ~SrsRtmp();
public:
virtual SrsProtocol* get_protocol();

View file

@ -37,6 +37,11 @@ SrsSocket::~SrsSocket()
{
}
bool SrsSocket::is_never_timeout(int64_t timeout_us)
{
return timeout_us == (int64_t)ST_UTIME_NO_TIMEOUT;
}
void SrsSocket::set_recv_timeout(int64_t timeout_us)
{
recv_timeout = timeout_us;

View file

@ -31,13 +31,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp>
#include <srs_core_st.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_protocol_io.hpp>
/**
* the socket provides TCP socket over st,
* that is, the sync socket mechanism.
*/
class SrsSocket : public ISrsBufferReader
class SrsSocket : public ISrsProtocolReaderWriter
{
private:
int64_t recv_timeout;
@ -50,6 +50,7 @@ public:
SrsSocket(st_netfd_t client_stfd);
virtual ~SrsSocket();
public:
virtual bool is_never_timeout(int64_t timeout_us);
virtual void set_recv_timeout(int64_t timeout_us);
virtual int64_t get_recv_timeout();
virtual void set_send_timeout(int64_t timeout_us);