diff --git a/trunk/configure b/trunk/configure index 208e38226..9950c5279 100755 --- a/trunk/configure +++ b/trunk/configure @@ -142,7 +142,7 @@ KERNEL_OBJS="${MODULE_OBJS[@]}" MODULE_ID="PROTOCOL" MODULE_DEPENDS=("CORE" "KERNEL") ModuleLibIncs=(${SRS_OBJS}) -MODULE_FILES=("srs_protocol_amf0") +MODULE_FILES=("srs_protocol_amf0" "srs_protocol_io") MODULE_DIR="src/protocol" . auto/modules.sh PROTOCOL_OBJS="${MODULE_OBJS[@]}" # diff --git a/trunk/src/app/srs_core_client.cpp b/trunk/src/app/srs_core_client.cpp index bb25dd5dd..0145f71e4 100644 --- a/trunk/src/app/srs_core_client.cpp +++ b/trunk/src/app/srs_core_client.cpp @@ -41,6 +41,7 @@ using namespace std; #include #include #include +#include SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd) : SrsConnection(srs_server, client_stfd) @@ -48,7 +49,8 @@ SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd) ip = NULL; req = new SrsRequest(); res = new SrsResponse(); - rtmp = new SrsRtmp(client_stfd); + skt = new SrsSocket(client_stfd); + rtmp = new SrsRtmp(skt); refer = new SrsRefer(); #ifdef SRS_HTTP http_hooks = new SrsHttpHooks(); @@ -66,6 +68,7 @@ SrsClient::~SrsClient() srs_freep(req); srs_freep(res); srs_freep(rtmp); + srs_freep(skt); srs_freep(refer); #ifdef SRS_HTTP srs_freep(http_hooks); diff --git a/trunk/src/app/srs_core_client.hpp b/trunk/src/app/srs_core_client.hpp index f2c94edb1..d72f59f2a 100644 --- a/trunk/src/app/srs_core_client.hpp +++ b/trunk/src/app/srs_core_client.hpp @@ -41,6 +41,7 @@ class SrsSource; class SrsRefer; class SrsConsumer; class SrsCommonMessage; +class SrsSocket; #ifdef SRS_HTTP class SrsHttpHooks; #endif @@ -55,6 +56,7 @@ private: char* ip; SrsRequest* req; SrsResponse* res; + SrsSocket* skt; SrsRtmp* rtmp; SrsRefer* refer; #ifdef SRS_HTTP diff --git a/trunk/src/app/srs_core_forward.cpp b/trunk/src/app/srs_core_forward.cpp index 289173bdb..d2787e072 100644 --- a/trunk/src/app/srs_core_forward.cpp +++ b/trunk/src/app/srs_core_forward.cpp @@ -37,11 +37,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include SrsForwarder::SrsForwarder(SrsSource* _source) { source = _source; + io = NULL; client = NULL; stfd = NULL; stream_id = 0; @@ -127,6 +129,7 @@ void SrsForwarder::on_unpublish() close_underlayer_socket(); srs_freep(client); + srs_freep(io); } int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata) @@ -250,9 +253,12 @@ int SrsForwarder::connect_server() srs_error("st_netfd_open_socket failed. ret=%d", ret); return ret; } - + srs_freep(client); - client = new SrsRtmpClient(stfd); + srs_freep(io); + + io = new SrsSocket(stfd); + client = new SrsRtmpClient(io); // connect to server. std::string ip = srs_dns_resolve(server); diff --git a/trunk/src/app/srs_core_forward.hpp b/trunk/src/app/srs_core_forward.hpp index afc1217af..857e80356 100644 --- a/trunk/src/app/srs_core_forward.hpp +++ b/trunk/src/app/srs_core_forward.hpp @@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +class ISrsProtocolReaderWriter; class SrsSharedPtrMessage; class SrsOnMetaDataPacket; class SrsMessageQueue; @@ -59,6 +60,7 @@ private: SrsThread* pthread; private: SrsSource* source; + ISrsProtocolReaderWriter* io; SrsRtmpClient* client; SrsRtmpJitter* jitter; SrsMessageQueue* queue; diff --git a/trunk/src/app/srs_core_handshake.cpp b/trunk/src/app/srs_core_handshake.cpp index 9f05133df..979906657 100644 --- a/trunk/src/app/srs_core_handshake.cpp +++ b/trunk/src/app/srs_core_handshake.cpp @@ -29,7 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include -#include +#include void srs_random_generate(char* bytes, int size) { @@ -1067,7 +1067,7 @@ SrsSimpleHandshake::~SrsSimpleHandshake() { } -int SrsSimpleHandshake::handshake_with_client(SrsSocket& skt, SrsComplexHandshake& complex_hs) +int SrsSimpleHandshake::handshake_with_client(ISrsProtocolReaderWriter* skt, SrsComplexHandshake& complex_hs) { int ret = ERROR_SUCCESS; @@ -1075,7 +1075,7 @@ int SrsSimpleHandshake::handshake_with_client(SrsSocket& skt, SrsComplexHandshak char* c0c1 = new char[1537]; SrsAutoFree(char, c0c1, true); - if ((ret = skt.read_fully(c0c1, 1537, &nsize)) != ERROR_SUCCESS) { + if ((ret = skt->read_fully(c0c1, 1537, &nsize)) != ERROR_SUCCESS) { srs_warn("read c0c1 failed. ret=%d", ret); return ret; } @@ -1106,7 +1106,7 @@ int SrsSimpleHandshake::handshake_with_client(SrsSocket& skt, SrsComplexHandshak SrsAutoFree(char, s0s1s2, true); // plain text required. s0s1s2[0] = 0x03; - if ((ret = skt.write(s0s1s2, 3073, &nsize)) != ERROR_SUCCESS) { + if ((ret = skt->write(s0s1s2, 3073, &nsize)) != ERROR_SUCCESS) { srs_warn("simple handshake send s0s1s2 failed. ret=%d", ret); return ret; } @@ -1114,7 +1114,7 @@ int SrsSimpleHandshake::handshake_with_client(SrsSocket& skt, SrsComplexHandshak char* c2 = new char[1536]; SrsAutoFree(char, c2, true); - if ((ret = skt.read_fully(c2, 1536, &nsize)) != ERROR_SUCCESS) { + if ((ret = skt->read_fully(c2, 1536, &nsize)) != ERROR_SUCCESS) { srs_warn("simple handshake read c2 failed. ret=%d", ret); return ret; } @@ -1125,7 +1125,7 @@ int SrsSimpleHandshake::handshake_with_client(SrsSocket& skt, SrsComplexHandshak return ret; } -int SrsSimpleHandshake::handshake_with_server(SrsSocket& skt, SrsComplexHandshake& complex_hs) +int SrsSimpleHandshake::handshake_with_server(ISrsProtocolReaderWriter* skt, SrsComplexHandshake& complex_hs) { int ret = ERROR_SUCCESS; @@ -1151,7 +1151,7 @@ int SrsSimpleHandshake::handshake_with_server(SrsSocket& skt, SrsComplexHandshak // plain text required. c0c1[0] = 0x03; - if ((ret = skt.write(c0c1, 1537, &nsize)) != ERROR_SUCCESS) { + if ((ret = skt->write(c0c1, 1537, &nsize)) != ERROR_SUCCESS) { srs_warn("write c0c1 failed. ret=%d", ret); return ret; } @@ -1159,7 +1159,7 @@ int SrsSimpleHandshake::handshake_with_server(SrsSocket& skt, SrsComplexHandshak char* s0s1s2 = new char[3073]; SrsAutoFree(char, s0s1s2, true); - if ((ret = skt.read_fully(s0s1s2, 3073, &nsize)) != ERROR_SUCCESS) { + if ((ret = skt->read_fully(s0s1s2, 3073, &nsize)) != ERROR_SUCCESS) { srs_warn("simple handshake recv s0s1s2 failed. ret=%d", ret); return ret; } @@ -1175,7 +1175,7 @@ int SrsSimpleHandshake::handshake_with_server(SrsSocket& skt, SrsComplexHandshak char* c2 = new char[1536]; SrsAutoFree(char, c2, true); srs_random_generate(c2, 1536); - if ((ret = skt.write(c2, 1536, &nsize)) != ERROR_SUCCESS) { + if ((ret = skt->write(c2, 1536, &nsize)) != ERROR_SUCCESS) { srs_warn("simple handshake write c2 failed. ret=%d", ret); return ret; } @@ -1195,12 +1195,12 @@ SrsComplexHandshake::~SrsComplexHandshake() } #ifndef SRS_SSL -int SrsComplexHandshake::handshake_with_client(SrsSocket& /*skt*/, char* /*_c1*/) +int SrsComplexHandshake::handshake_with_client(ISrsProtocolReaderWriter* /*skt*/, char* /*_c1*/) { return ERROR_RTMP_TRY_SIMPLE_HS; } #else -int SrsComplexHandshake::handshake_with_client(SrsSocket& skt, char* _c1) +int SrsComplexHandshake::handshake_with_client(ISrsProtocolReaderWriter* skt, char* _c1) { int ret = ERROR_SUCCESS; @@ -1258,7 +1258,7 @@ int SrsComplexHandshake::handshake_with_client(SrsSocket& skt, char* _c1) s0s1s2[0] = 0x03; s1.dump(s0s1s2 + 1); s2.dump(s0s1s2 + 1537); - if ((ret = skt.write(s0s1s2, 3073, &nsize)) != ERROR_SUCCESS) { + if ((ret = skt->write(s0s1s2, 3073, &nsize)) != ERROR_SUCCESS) { srs_warn("complex handshake send s0s1s2 failed. ret=%d", ret); return ret; } @@ -1267,7 +1267,7 @@ int SrsComplexHandshake::handshake_with_client(SrsSocket& skt, char* _c1) // recv c2 char* c2 = new char[1536]; SrsAutoFree(char, c2, true); - if ((ret = skt.read_fully(c2, 1536, &nsize)) != ERROR_SUCCESS) { + if ((ret = skt->read_fully(c2, 1536, &nsize)) != ERROR_SUCCESS) { srs_warn("complex handshake read c2 failed. ret=%d", ret); return ret; } @@ -1278,12 +1278,12 @@ int SrsComplexHandshake::handshake_with_client(SrsSocket& skt, char* _c1) #endif #ifndef SRS_SSL -int SrsComplexHandshake::handshake_with_server(SrsSocket& /*skt*/) +int SrsComplexHandshake::handshake_with_server(ISrsProtocolReaderWriter* /*skt*/) { return ERROR_RTMP_TRY_SIMPLE_HS; } #else -int SrsComplexHandshake::handshake_with_server(SrsSocket& /*skt*/) +int SrsComplexHandshake::handshake_with_server(ISrsProtocolReaderWriter* /*skt*/) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_core_handshake.hpp b/trunk/src/app/srs_core_handshake.hpp index 06686fc1e..4b7484c5e 100644 --- a/trunk/src/app/srs_core_handshake.hpp +++ b/trunk/src/app/srs_core_handshake.hpp @@ -30,7 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include -class SrsSocket; +class ISrsProtocolReaderWriter; class SrsComplexHandshake; /** @@ -47,8 +47,8 @@ public: * @param complex_hs, try complex handshake first, * if failed, rollback to simple handshake. */ - virtual int handshake_with_client(SrsSocket& skt, SrsComplexHandshake& complex_hs); - virtual int handshake_with_server(SrsSocket& skt, SrsComplexHandshake& complex_hs); + virtual int handshake_with_client(ISrsProtocolReaderWriter* io, SrsComplexHandshake& complex_hs); + virtual int handshake_with_server(ISrsProtocolReaderWriter* io, SrsComplexHandshake& complex_hs); }; /** @@ -71,8 +71,8 @@ public: * try simple handshake if error is ERROR_RTMP_TRY_SIMPLE_HS, * otherwise, disconnect */ - virtual int handshake_with_client(SrsSocket& skt, char* _c1); - virtual int handshake_with_server(SrsSocket& skt); + virtual int handshake_with_client(ISrsProtocolReaderWriter* io, char* _c1); + virtual int handshake_with_server(ISrsProtocolReaderWriter* io); }; #endif \ No newline at end of file diff --git a/trunk/src/app/srs_core_protocol.cpp b/trunk/src/app/srs_core_protocol.cpp index f802c47af..9b333d212 100644 --- a/trunk/src/app/srs_core_protocol.cpp +++ b/trunk/src/app/srs_core_protocol.cpp @@ -26,7 +26,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include -#include +#include #include #include #include @@ -290,11 +290,10 @@ SrsProtocol::AckWindowSize::AckWindowSize() ack_window_size = acked_size = 0; } -SrsProtocol::SrsProtocol(st_netfd_t client_stfd) +SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io) { - stfd = client_stfd; buffer = new SrsBuffer(); - skt = new SrsSocket(stfd); + skt = io; in_chunk_size = out_chunk_size = RTMP_DEFAULT_CHUNK_SIZE; } @@ -311,7 +310,6 @@ SrsProtocol::~SrsProtocol() chunk_streams.clear(); srs_freep(buffer); - srs_freep(skt); } string SrsProtocol::get_request_name(double transcationId) @@ -720,7 +718,7 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg) // when we got a chunk header, we should increase the timeout, // or we maybe timeout and disconnect the client. int64_t timeout_us = skt->get_recv_timeout(); - if (timeout_us != (int64_t)ST_UTIME_NO_TIMEOUT) { + if (!skt->is_never_timeout(timeout_us)) { int64_t pkt_timeout_us = srs_max(timeout_us, SRS_MIN_RECV_TIMEOUT_US); skt->set_recv_timeout(pkt_timeout_us); srs_verbose("change recv timeout_us " @@ -764,7 +762,7 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg) } // reset the recv timeout - if (timeout_us != (int64_t)ST_UTIME_NO_TIMEOUT) { + if (!skt->is_never_timeout(timeout_us)) { skt->set_recv_timeout(timeout_us); srs_verbose("reset recv timeout_us to %"PRId64"", timeout_us); } diff --git a/trunk/src/app/srs_core_protocol.hpp b/trunk/src/app/srs_core_protocol.hpp index 7aaf83849..8e09925bf 100644 --- a/trunk/src/app/srs_core_protocol.hpp +++ b/trunk/src/app/srs_core_protocol.hpp @@ -33,7 +33,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -#include #include #include @@ -76,7 +75,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // when error, encoder sleep for a while and retry. #define SRS_ENCODER_SLEEP_US (int64_t)(3*1000*1000LL) -class SrsSocket; +class ISrsProtocolReaderWriter; class SrsBuffer; class SrsPacket; class SrsStream; @@ -123,8 +122,7 @@ private: }; // peer in/out private: - st_netfd_t stfd; - SrsSocket* skt; + ISrsProtocolReaderWriter* skt; char* pp; /** * requests sent out, used to build the response. @@ -144,7 +142,11 @@ private: char out_header_fmt3[RTMP_MAX_FMT3_HEADER_SIZE]; int32_t out_chunk_size; public: - SrsProtocol(st_netfd_t client_stfd); + /** + * use io to create the protocol stack, + * @param io, provides io interfaces, user must free it. + */ + SrsProtocol(ISrsProtocolReaderWriter* io); virtual ~SrsProtocol(); public: std::string get_request_name(double transcationId); @@ -1214,6 +1216,15 @@ protected: * @pmsg, user must free it. NULL if not success. * @ppacket, store in the pmsg, user must never free it. NULL if not success. * @remark, only when success, user can use and must free the pmsg/ppacket. +* for example: + SrsCommonMessage* msg = NULL; + SrsConnectAppResPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + return ret; + } + // use pkt +* user should never recv message and convert it, use this method instead. +* if need to set timeout, use set timeout of SrsProtocol. */ template int srs_rtmp_expect_message(SrsProtocol* protocol, SrsCommonMessage** pmsg, T** ppacket) diff --git a/trunk/src/app/srs_core_rtmp.cpp b/trunk/src/app/srs_core_rtmp.cpp index 5cb452582..27ad3dba4 100644 --- a/trunk/src/app/srs_core_rtmp.cpp +++ b/trunk/src/app/srs_core_rtmp.cpp @@ -25,7 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -#include +#include #include #include #include @@ -197,10 +197,10 @@ SrsResponse::~SrsResponse() { } -SrsRtmpClient::SrsRtmpClient(st_netfd_t _stfd) +SrsRtmpClient::SrsRtmpClient(ISrsProtocolReaderWriter* skt) { - stfd = _stfd; - protocol = new SrsProtocol(stfd); + io = skt; + protocol = new SrsProtocol(skt); } SrsRtmpClient::~SrsRtmpClient() @@ -251,15 +251,10 @@ int SrsRtmpClient::send_message(ISrsMessage* msg) int SrsRtmpClient::handshake() { int ret = ERROR_SUCCESS; - - SrsSocket skt(stfd); - - skt.set_recv_timeout(protocol->get_recv_timeout()); - skt.set_send_timeout(protocol->get_send_timeout()); SrsComplexHandshake complex_hs; SrsSimpleHandshake simple_hs; - if ((ret = simple_hs.handshake_with_server(skt, complex_hs)) != ERROR_SUCCESS) { + if ((ret = simple_hs.handshake_with_server(io, complex_hs)) != ERROR_SUCCESS) { return ret; } @@ -449,10 +444,10 @@ int SrsRtmpClient::publish(string stream, int stream_id) return ret; } -SrsRtmp::SrsRtmp(st_netfd_t client_stfd) +SrsRtmp::SrsRtmp(ISrsProtocolReaderWriter* skt) { - protocol = new SrsProtocol(client_stfd); - stfd = client_stfd; + io = skt; + protocol = new SrsProtocol(skt); } SrsRtmp::~SrsRtmp() @@ -518,15 +513,10 @@ int SrsRtmp::send_message(ISrsMessage* msg) int SrsRtmp::handshake() { int ret = ERROR_SUCCESS; - - SrsSocket skt(stfd); - - skt.set_recv_timeout(protocol->get_recv_timeout()); - skt.set_send_timeout(protocol->get_send_timeout()); SrsComplexHandshake complex_hs; SrsSimpleHandshake simple_hs; - if ((ret = simple_hs.handshake_with_client(skt, complex_hs)) != ERROR_SUCCESS) { + if ((ret = simple_hs.handshake_with_client(io, complex_hs)) != ERROR_SUCCESS) { return ret; } diff --git a/trunk/src/app/srs_core_rtmp.hpp b/trunk/src/app/srs_core_rtmp.hpp index 8c79661ca..bea35619a 100644 --- a/trunk/src/app/srs_core_rtmp.hpp +++ b/trunk/src/app/srs_core_rtmp.hpp @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include class SrsProtocol; +class ISrsProtocolReaderWriter; class ISrsMessage; class SrsCommonMessage; class SrsCreateStreamPacket; @@ -115,9 +116,9 @@ class SrsRtmpClient { protected: SrsProtocol* protocol; - st_netfd_t stfd; + ISrsProtocolReaderWriter* io; public: - SrsRtmpClient(st_netfd_t _stfd); + SrsRtmpClient(ISrsProtocolReaderWriter* skt); virtual ~SrsRtmpClient(); public: virtual void set_recv_timeout(int64_t timeout_us); @@ -145,9 +146,9 @@ class SrsRtmp { private: SrsProtocol* protocol; - st_netfd_t stfd; + ISrsProtocolReaderWriter* io; public: - SrsRtmp(st_netfd_t client_stfd); + SrsRtmp(ISrsProtocolReaderWriter* skt); virtual ~SrsRtmp(); public: virtual SrsProtocol* get_protocol(); diff --git a/trunk/src/app/srs_core_socket.cpp b/trunk/src/app/srs_core_socket.cpp index 90ec123db..6fab06020 100644 --- a/trunk/src/app/srs_core_socket.cpp +++ b/trunk/src/app/srs_core_socket.cpp @@ -37,6 +37,11 @@ SrsSocket::~SrsSocket() { } +bool SrsSocket::is_never_timeout(int64_t timeout_us) +{ + return timeout_us == (int64_t)ST_UTIME_NO_TIMEOUT; +} + void SrsSocket::set_recv_timeout(int64_t timeout_us) { recv_timeout = timeout_us; diff --git a/trunk/src/app/srs_core_socket.hpp b/trunk/src/app/srs_core_socket.hpp index 38f0368e5..31ee28742 100644 --- a/trunk/src/app/srs_core_socket.hpp +++ b/trunk/src/app/srs_core_socket.hpp @@ -31,13 +31,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -#include +#include /** * the socket provides TCP socket over st, * that is, the sync socket mechanism. */ -class SrsSocket : public ISrsBufferReader +class SrsSocket : public ISrsProtocolReaderWriter { private: int64_t recv_timeout; @@ -50,6 +50,7 @@ public: SrsSocket(st_netfd_t client_stfd); virtual ~SrsSocket(); public: + virtual bool is_never_timeout(int64_t timeout_us); virtual void set_recv_timeout(int64_t timeout_us); virtual int64_t get_recv_timeout(); virtual void set_send_timeout(int64_t timeout_us); diff --git a/trunk/src/kernel/srs_kernel_buffer.hpp b/trunk/src/kernel/srs_kernel_buffer.hpp index f652309f0..309f96680 100644 --- a/trunk/src/kernel/srs_kernel_buffer.hpp +++ b/trunk/src/kernel/srs_kernel_buffer.hpp @@ -40,6 +40,7 @@ class ISrsBufferReader public: ISrsBufferReader(); virtual ~ISrsBufferReader(); +// for protocol/amf0/msg-codec public: virtual int read(const void* buf, size_t size, ssize_t* nread) = 0; }; diff --git a/trunk/src/main/srs_main_bandcheck.cpp b/trunk/src/main/srs_main_bandcheck.cpp index b5a865066..4d5665219 100644 --- a/trunk/src/main/srs_main_bandcheck.cpp +++ b/trunk/src/main/srs_main_bandcheck.cpp @@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include #include @@ -57,6 +58,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SRS_BW_CHECK_PLAYING "onSrsBandCheckPlaying" #define SRS_BW_CHECK_PUBLISHING "onSrsBandCheckPublishing" +class ISrsProtocolReaderWriter; + /** * @brief class of Linux version band check client * check play and publish speed. @@ -64,7 +67,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsBandCheckClient : public SrsRtmpClient { public: - SrsBandCheckClient(st_netfd_t _stfd); + SrsBandCheckClient(ISrsProtocolReaderWriter* io); ~SrsBandCheckClient(); public: @@ -144,8 +147,9 @@ public: private: int connect_server(); - private: + st_netfd_t stfd; + ISrsProtocolReaderWriter* skt; SrsBandCheckClient* bandCheck_Client; std::string server_address; int server_port; @@ -234,8 +238,8 @@ int main(int argc ,char* argv[]) return 0; } -SrsBandCheckClient::SrsBandCheckClient(st_netfd_t _stfd) - : SrsRtmpClient(_stfd) +SrsBandCheckClient::SrsBandCheckClient(ISrsProtocolReaderWriter* io) + : SrsRtmpClient(io) { } @@ -475,29 +479,17 @@ int SrsBandCheckClient::send_pub_data() int SrsBandCheckClient::expect_stop_pub() { int ret = ERROR_SUCCESS; - - while (true) { - if ((ret = st_netfd_poll(stfd, POLLIN, 1000)) == ERROR_SUCCESS) { - SrsCommonMessage* msg = 0; - if ((ret = recv_message(&msg)) != ERROR_SUCCESS) - { - srs_error("recv message failed while expect stop pub. ret=%d", ret); - return ret; - } - - if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) { - srs_error("decode packet error while expect stop pub. ret=%d", ret); - return ret; - } - - SrsBandwidthPacket* pkt = dynamic_cast(msg->get_packet()); - if (pkt && pkt->command_name == SRS_BW_CHECK_STOP_PUBLISH) { - - return ret; - } - } else { - break; - } + + this->set_recv_timeout(1000 * 1000); + this->set_send_timeout(1000 * 1000); + + SrsCommonMessage* msg; + SrsBandwidthPacket* pkt; + if ((ret = srs_rtmp_expect_message(this->protocol, &msg, &pkt)) != ERROR_SUCCESS) { + return ret; + } + if (pkt->command_name == SRS_BW_CHECK_STOP_PUBLISH) { + return ret; } return ret; @@ -638,15 +630,17 @@ int SrsBandCheckClient::send_final() } SrsBandCheck::SrsBandCheck() - : bandCheck_Client(0) { + skt = NULL; + bandCheck_Client = NULL; + stfd = NULL; } SrsBandCheck::~SrsBandCheck() { - if (bandCheck_Client) { - srs_freep(bandCheck_Client); - } + srs_freep(bandCheck_Client); + srs_freep(skt); + srs_close_stfd(stfd); } int SrsBandCheck::check(const std::string &app, const std::string &tcUrl) @@ -698,14 +692,15 @@ int SrsBandCheck::connect_server() return ret; } - st_netfd_t stfd = st_netfd_open_socket(sock); + stfd = st_netfd_open_socket(sock); if(stfd == NULL){ ret = ERROR_ST_OPEN_SOCKET; srs_error("st_netfd_open_socket failed. ret=%d", ret); return ret; } - bandCheck_Client = new SrsBandCheckClient(stfd); + skt = new SrsSocket(stfd); + bandCheck_Client = new SrsBandCheckClient(skt); // connect to server. std::string ip = srs_dns_resolve(server_address); @@ -763,7 +758,7 @@ void print_help(char** argv) " -h, --help display this help and exit \n" "\n" "For example:\n" - " %s -i 192.168.1.248 -p 1935 -v bandcheck.srs.com -k 35c9b402c12a7246868752e2878f7e0e" + " %s -i 127.0.0.1 -p 1935 -v bandcheck.srs.com -k 35c9b402c12a7246868752e2878f7e0e" "\n\n" "Exit status:\n" "0 if OK,\n" diff --git a/trunk/src/protocol/srs_protocol_io.cpp b/trunk/src/protocol/srs_protocol_io.cpp new file mode 100644 index 000000000..5ddb8854d --- /dev/null +++ b/trunk/src/protocol/srs_protocol_io.cpp @@ -0,0 +1,48 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2014 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +ISrsProtocolReader::ISrsProtocolReader() +{ +} + +ISrsProtocolReader::~ISrsProtocolReader() +{ +} + +ISrsProtocolWriter::ISrsProtocolWriter() +{ +} + +ISrsProtocolWriter::~ISrsProtocolWriter() +{ +} + +ISrsProtocolReaderWriter::ISrsProtocolReaderWriter() +{ +} + +ISrsProtocolReaderWriter::~ISrsProtocolReaderWriter() +{ +} diff --git a/trunk/src/protocol/srs_protocol_io.hpp b/trunk/src/protocol/srs_protocol_io.hpp new file mode 100644 index 000000000..48a4e5ea4 --- /dev/null +++ b/trunk/src/protocol/srs_protocol_io.hpp @@ -0,0 +1,87 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2014 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SRS_PROTOCOL_IO_HPP +#define SRS_PROTOCOL_IO_HPP + +/* +#include +*/ + +#include + +#include + +#include + +/** +* the reader for the protocol to read from whatever channel. +*/ +class ISrsProtocolReader : public ISrsBufferReader +{ +public: + ISrsProtocolReader(); + virtual ~ISrsProtocolReader(); +// for protocol +public: + virtual void set_recv_timeout(int64_t timeout_us) = 0; + virtual int64_t get_recv_timeout() = 0; + virtual int64_t get_recv_bytes() = 0; + virtual int get_recv_kbps() = 0; +}; + +/** +* the writer for the protocol to write to whatever channel. +*/ +class ISrsProtocolWriter +{ +public: + ISrsProtocolWriter(); + virtual ~ISrsProtocolWriter(); +// for protocol +public: + virtual void set_send_timeout(int64_t timeout_us) = 0; + virtual int64_t get_send_timeout() = 0; + virtual int64_t get_send_bytes() = 0; + virtual int get_send_kbps() = 0; + virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite) = 0; +}; + +class ISrsProtocolReaderWriter : public ISrsProtocolReader, public ISrsProtocolWriter +{ +public: + ISrsProtocolReaderWriter(); + virtual ~ISrsProtocolReaderWriter(); +// for protocol +public: + /** + * whether the specified timeout_us is never timeout. + */ + virtual bool is_never_timeout(int64_t timeout_us) = 0; +// for handshake. +public: + virtual int read_fully(const void* buf, size_t size, ssize_t* nread) = 0; + virtual int write(const void* buf, size_t size, ssize_t* nwrite) = 0; +}; + +#endif diff --git a/trunk/src/srs/srs.upp b/trunk/src/srs/srs.upp index 48bbd433b..126a5a7ad 100755 --- a/trunk/src/srs/srs.upp +++ b/trunk/src/srs/srs.upp @@ -27,6 +27,8 @@ file protocol readonly separator, ..\protocol\srs_protocol_amf0.hpp, ..\protocol\srs_protocol_amf0.cpp, + ..\protocol\srs_protocol_io.hpp, + ..\protocol\srs_protocol_io.cpp, app readonly separator, ..\app\srs_core_bandwidth.hpp, ..\app\srs_core_bandwidth.cpp,