From 3584bdb7b66599ef1e2d19189cd0448eadc37027 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 2 Dec 2014 12:19:17 +0800 Subject: [PATCH] srs-librtmp support hijack io apis for st-load. 2.0.42. --- README.md | 1 + trunk/src/core/srs_core.hpp | 2 +- trunk/src/libs/srs_lib_simple_socket.cpp | 357 +++++++++++++++-------- trunk/src/libs/srs_lib_simple_socket.hpp | 8 +- trunk/src/libs/srs_librtmp.cpp | 2 +- trunk/src/libs/srs_librtmp.hpp | 87 ++++++ 6 files changed, 329 insertions(+), 128 deletions(-) diff --git a/README.md b/README.md index 25894e65e..1fcdd58cf 100755 --- a/README.md +++ b/README.md @@ -485,6 +485,7 @@ Supported operating systems and hardware: * 2013-10-17, Created.
## History +* v2.0, 2014-12-02, srs-librtmp support hijack io apis for st-load. 2.0.42. * v2.0, 2014-12-01, for [#237](https://github.com/winlinvip/simple-rtmp-server/issues/237), refine syscall for recv, supports 1.5k clients. 2.0.41. * v2.0, 2014-11-30, add qtcreate project file trunk/src/qt/srs/srs-qt.pro. 2.0.39. * v2.0, 2014-11-29, fix [#235](https://github.com/winlinvip/simple-rtmp-server/issues/235), refine handshake, replace union with template method. 2.0.38. diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index cb506c223..83f4778e5 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 41 +#define VERSION_REVISION 42 // server info. #define RTMP_SIG_SRS_KEY "SRS" #define RTMP_SIG_SRS_ROLE "origin/edge server" diff --git a/trunk/src/libs/srs_lib_simple_socket.cpp b/trunk/src/libs/srs_lib_simple_socket.cpp index 08096e714..1725b1505 100644 --- a/trunk/src/libs/srs_lib_simple_socket.cpp +++ b/trunk/src/libs/srs_lib_simple_socket.cpp @@ -43,188 +43,305 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ST_UTIME_NO_TIMEOUT -1 #endif +// when io not hijacked, use simple socket, the block sync stream. +#ifndef SRS_HIJACK_IO + struct SrsBlockSyncSocket + { + SOCKET fd; + int64_t recv_timeout; + int64_t send_timeout; + int64_t recv_bytes; + int64_t send_bytes; + + SrsBlockSyncSocket() { + send_timeout = recv_timeout = ST_UTIME_NO_TIMEOUT; + recv_bytes = send_bytes = 0; + + SOCKET_RESET(fd); + SOCKET_SETUP(); + } + + virtual ~SrsBlockSyncSocket() { + SOCKET_CLOSE(fd); + SOCKET_CLEANUP(); + } + }; + srs_hijack_io_t srs_hijack_io_create() + { + SrsBlockSyncSocket* skt = new SrsBlockSyncSocket(); + return skt; + } + void srs_hijack_io_destroy(srs_hijack_io_t ctx) + { + SrsBlockSyncSocket* skt = (SrsBlockSyncSocket*)ctx; + srs_freep(skt); + } + int srs_hijack_io_create_socket(srs_hijack_io_t ctx) + { + SrsBlockSyncSocket* skt = (SrsBlockSyncSocket*)ctx; + + skt->fd = ::socket(AF_INET, SOCK_STREAM, 0); + if (!SOCKET_VALID(skt->fd)) { + return ERROR_SOCKET_CREATE; + } + + return ERROR_SUCCESS; + } + int srs_hijack_io_connect(srs_hijack_io_t ctx, const char* server_ip, int port) + { + SrsBlockSyncSocket* skt = (SrsBlockSyncSocket*)ctx; + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = inet_addr(server_ip); + + if(::connect(skt->fd, (const struct sockaddr*)&addr, sizeof(sockaddr_in)) < 0){ + return ERROR_SOCKET_CONNECT; + } + + return ERROR_SUCCESS; + } + int srs_hijack_io_read(srs_hijack_io_t ctx, void* buf, size_t size, ssize_t* nread) + { + SrsBlockSyncSocket* skt = (SrsBlockSyncSocket*)ctx; + + int ret = ERROR_SUCCESS; + + ssize_t nb_read = ::recv(skt->fd, (char*)buf, size, 0); + + if (nread) { + *nread = nb_read; + } + + // On success a non-negative integer indicating the number of bytes actually read is returned + // (a value of 0 means the network connection is closed or end of file is reached). + if (nb_read <= 0) { + if (nb_read < 0 && SOCKET_ERRNO() == SOCKET_ETIME) { + return ERROR_SOCKET_TIMEOUT; + } + + if (nb_read == 0) { + errno = SOCKET_ECONNRESET; + } + + return ERROR_SOCKET_READ; + } + + skt->recv_bytes += nb_read; + + return ret; + } + void srs_hijack_io_set_recv_timeout(srs_hijack_io_t ctx, int64_t timeout_us) + { + SrsBlockSyncSocket* skt = (SrsBlockSyncSocket*)ctx; + skt->recv_timeout = timeout_us; + } + int64_t srs_hijack_io_get_recv_timeout(srs_hijack_io_t ctx) + { + SrsBlockSyncSocket* skt = (SrsBlockSyncSocket*)ctx; + return skt->recv_timeout; + } + int64_t srs_hijack_io_get_recv_bytes(srs_hijack_io_t ctx) + { + SrsBlockSyncSocket* skt = (SrsBlockSyncSocket*)ctx; + return skt->recv_bytes; + } + void srs_hijack_io_set_send_timeout(srs_hijack_io_t ctx, int64_t timeout_us) + { + SrsBlockSyncSocket* skt = (SrsBlockSyncSocket*)ctx; + skt->send_timeout = timeout_us; + } + int64_t srs_hijack_io_get_send_timeout(srs_hijack_io_t ctx) + { + SrsBlockSyncSocket* skt = (SrsBlockSyncSocket*)ctx; + return skt->send_timeout; + } + int64_t srs_hijack_io_get_send_bytes(srs_hijack_io_t ctx) + { + SrsBlockSyncSocket* skt = (SrsBlockSyncSocket*)ctx; + return skt->send_bytes; + } + int srs_hijack_io_writev(srs_hijack_io_t ctx, const iovec *iov, int iov_size, ssize_t* nwrite) + { + SrsBlockSyncSocket* skt = (SrsBlockSyncSocket*)ctx; + + int ret = ERROR_SUCCESS; + + ssize_t nb_write = ::writev(skt->fd, iov, iov_size); + + if (nwrite) { + *nwrite = nb_write; + } + + // On success, the readv() function returns the number of bytes read; + // the writev() function returns the number of bytes written. On error, -1 is + // returned, and errno is set appropriately. + if (nb_write <= 0) { + // @see https://github.com/winlinvip/simple-rtmp-server/issues/200 + if (nb_write < 0 && SOCKET_ERRNO() == SOCKET_ETIME) { + return ERROR_SOCKET_TIMEOUT; + } + + return ERROR_SOCKET_WRITE; + } + + skt->send_bytes += nb_write; + + return ret; + } + bool srs_hijack_io_is_never_timeout(srs_hijack_io_t ctx, int64_t timeout_us) + { + return timeout_us == (int64_t)ST_UTIME_NO_TIMEOUT; + } + int srs_hijack_io_read_fully(srs_hijack_io_t ctx, void* buf, size_t size, ssize_t* nread) + { + SrsBlockSyncSocket* skt = (SrsBlockSyncSocket*)ctx; + + int ret = ERROR_SUCCESS; + + size_t left = size; + ssize_t nb_read = 0; + + while (left > 0) { + char* this_buf = (char*)buf + nb_read; + ssize_t this_nread; + + if ((ret = srs_hijack_io_read(ctx, this_buf, left, &this_nread)) != ERROR_SUCCESS) { + return ret; + } + + nb_read += this_nread; + left -= (size_t)this_nread; + } + + if (nread) { + *nread = nb_read; + } + skt->recv_bytes += nb_read; + + return ret; + } + int srs_hijack_io_write(srs_hijack_io_t ctx, void* buf, size_t size, ssize_t* nwrite) + { + SrsBlockSyncSocket* skt = (SrsBlockSyncSocket*)ctx; + + int ret = ERROR_SUCCESS; + + ssize_t nb_write = ::send(skt->fd, (char*)buf, size, 0); + + if (nwrite) { + *nwrite = nb_write; + } + + if (nb_write <= 0) { + // @see https://github.com/winlinvip/simple-rtmp-server/issues/200 + if (nb_write < 0 && SOCKET_ERRNO() == SOCKET_ETIME) { + return ERROR_SOCKET_TIMEOUT; + } + + return ERROR_SOCKET_WRITE; + } + + skt->send_bytes += nb_write; + + return ret; + } +#endif + SimpleSocketStream::SimpleSocketStream() { - SOCKET_RESET(fd); - send_timeout = recv_timeout = ST_UTIME_NO_TIMEOUT; - recv_bytes = send_bytes = 0; - SOCKET_SETUP(); + io = srs_hijack_io_create(); } SimpleSocketStream::~SimpleSocketStream() { - SOCKET_CLOSE(fd); - SOCKET_CLEANUP(); + if (io) { + srs_hijack_io_destroy(io); + io = NULL; + } } int SimpleSocketStream::create_socket() { - fd = ::socket(AF_INET, SOCK_STREAM, 0); - if (!SOCKET_VALID(fd)) { - return ERROR_SOCKET_CREATE; - } - - return ERROR_SUCCESS; + srs_assert(io); + return srs_hijack_io_create_socket(io); } int SimpleSocketStream::connect(const char* server_ip, int port) { - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = inet_addr(server_ip); - - if(::connect(fd, (const struct sockaddr*)&addr, sizeof(sockaddr_in)) < 0){ - return ERROR_SOCKET_CONNECT; - } - - return ERROR_SUCCESS; + srs_assert(io); + return srs_hijack_io_connect(io, server_ip, port); } // ISrsBufferReader int SimpleSocketStream::read(void* buf, size_t size, ssize_t* nread) { - int ret = ERROR_SUCCESS; - - ssize_t nb_read = ::recv(fd, (char*)buf, size, 0); - - if (nread) { - *nread = nb_read; - } - - // On success a non-negative integer indicating the number of bytes actually read is returned - // (a value of 0 means the network connection is closed or end of file is reached). - if (nb_read <= 0) { - if (nb_read < 0 && SOCKET_ERRNO() == SOCKET_ETIME) { - return ERROR_SOCKET_TIMEOUT; - } - - if (nb_read == 0) { - errno = SOCKET_ECONNRESET; - } - - return ERROR_SOCKET_READ; - } - - recv_bytes += nb_read; - - return ret; + srs_assert(io); + return srs_hijack_io_read(io, buf, size, nread); } // ISrsProtocolReader void SimpleSocketStream::set_recv_timeout(int64_t timeout_us) { - recv_timeout = timeout_us; + srs_assert(io); + srs_hijack_io_set_recv_timeout(io, timeout_us); } int64_t SimpleSocketStream::get_recv_timeout() { - return recv_timeout; + srs_assert(io); + return srs_hijack_io_get_recv_timeout(io); } int64_t SimpleSocketStream::get_recv_bytes() { - return recv_bytes; + srs_assert(io); + return srs_hijack_io_get_recv_bytes(io); } // ISrsProtocolWriter void SimpleSocketStream::set_send_timeout(int64_t timeout_us) { - send_timeout = timeout_us; + srs_assert(io); + srs_hijack_io_set_send_timeout(io, timeout_us); } int64_t SimpleSocketStream::get_send_timeout() { - return send_timeout; + srs_assert(io); + return srs_hijack_io_get_send_timeout(io); } int64_t SimpleSocketStream::get_send_bytes() { - return send_bytes; + srs_assert(io); + return srs_hijack_io_get_send_bytes(io); } int SimpleSocketStream::writev(const iovec *iov, int iov_size, ssize_t* nwrite) { - int ret = ERROR_SUCCESS; - - ssize_t nb_write = ::writev(fd, iov, iov_size); - - if (nwrite) { - *nwrite = nb_write; - } - - // On success, the readv() function returns the number of bytes read; - // the writev() function returns the number of bytes written. On error, -1 is - // returned, and errno is set appropriately. - if (nb_write <= 0) { - // @see https://github.com/winlinvip/simple-rtmp-server/issues/200 - if (nb_write < 0 && SOCKET_ERRNO() == SOCKET_ETIME) { - return ERROR_SOCKET_TIMEOUT; - } - - return ERROR_SOCKET_WRITE; - } - - send_bytes += nb_write; - - return ret; + srs_assert(io); + return srs_hijack_io_writev(io, iov, iov_size, nwrite); } // ISrsProtocolReaderWriter bool SimpleSocketStream::is_never_timeout(int64_t timeout_us) { - return timeout_us == (int64_t)ST_UTIME_NO_TIMEOUT; + srs_assert(io); + return srs_hijack_io_is_never_timeout(io, timeout_us); } int SimpleSocketStream::read_fully(void* buf, size_t size, ssize_t* nread) { - int ret = ERROR_SUCCESS; - - size_t left = size; - ssize_t nb_read = 0; - - while (left > 0) { - char* this_buf = (char*)buf + nb_read; - ssize_t this_nread; - - if ((ret = this->read(this_buf, left, &this_nread)) != ERROR_SUCCESS) { - return ret; - } - - nb_read += this_nread; - left -= (size_t)this_nread; - } - - if (nread) { - *nread = nb_read; - } - recv_bytes += nb_read; - - return ret; + srs_assert(io); + return srs_hijack_io_read_fully(io, buf, size, nread); } int SimpleSocketStream::write(void* buf, size_t size, ssize_t* nwrite) { - int ret = ERROR_SUCCESS; - - ssize_t nb_write = ::send(fd, (char*)buf, size, 0); - - if (nwrite) { - *nwrite = nb_write; - } - - if (nb_write <= 0) { - // @see https://github.com/winlinvip/simple-rtmp-server/issues/200 - if (nb_write < 0 && SOCKET_ERRNO() == SOCKET_ETIME) { - return ERROR_SOCKET_TIMEOUT; - } - - return ERROR_SOCKET_WRITE; - } - - send_bytes += nb_write; - - return ret; + srs_assert(io); + return srs_hijack_io_write(io, buf, size, nwrite); } diff --git a/trunk/src/libs/srs_lib_simple_socket.hpp b/trunk/src/libs/srs_lib_simple_socket.hpp index dca0c45b2..438a27d32 100644 --- a/trunk/src/libs/srs_lib_simple_socket.hpp +++ b/trunk/src/libs/srs_lib_simple_socket.hpp @@ -32,7 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include - + /** * simple socket stream, * use tcp socket, sync block mode, for client like srs-librtmp. @@ -40,11 +40,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SimpleSocketStream : public ISrsProtocolReaderWriter { private: - int64_t recv_timeout; - int64_t send_timeout; - int64_t recv_bytes; - int64_t send_bytes; - SOCKET fd; + srs_hijack_io_t io; public: SimpleSocketStream(); virtual ~SimpleSocketStream(); diff --git a/trunk/src/libs/srs_librtmp.cpp b/trunk/src/libs/srs_librtmp.cpp index 04d307d9d..d552814f0 100644 --- a/trunk/src/libs/srs_librtmp.cpp +++ b/trunk/src/libs/srs_librtmp.cpp @@ -49,7 +49,7 @@ using namespace std; #include // if want to use your log, define the folowing macro. -#ifndef SRS_RTMP_USER_DEFINED_LOG +#ifndef SRS_HIJACK_LOG // kernel module. ISrsLog* _srs_log = new ISrsLog(); ISrsThreadContext* _srs_context = new ISrsThreadContext(); diff --git a/trunk/src/libs/srs_librtmp.hpp b/trunk/src/libs/srs_librtmp.hpp index 8cb20b5a1..8b90d816f 100644 --- a/trunk/src/libs/srs_librtmp.hpp +++ b/trunk/src/libs/srs_librtmp.hpp @@ -930,6 +930,93 @@ extern const char* srs_human_format_time(); #define srs_human_verbose(msg, ...) printf("[%s] ", srs_human_format_time());printf(msg, ##__VA_ARGS__);printf("\n") #define srs_human_raw(msg, ...) printf(msg, ##__VA_ARGS__) +/************************************************************* +************************************************************** +* IO hijack, use your specified io functions. +************************************************************** +*************************************************************/ +// the void* will convert to your handler for io hijack. +typedef void* srs_hijack_io_t; +// define the following macro and functions in your module to hijack the io. +// the example @see https://github.com/winlinvip/st-load +// which use librtmp but use its own io(use st also). +#ifdef SRS_HIJACK_IO + /** + * create hijack. + * @return NULL for error; otherwise, ok. + */ + extern srs_hijack_io_t srs_hijack_io_create(); + /** + * destroy the context, user must close the socket. + */ + extern void srs_hijack_io_destroy(srs_hijack_io_t ctx); + /** + * create socket, not connect yet. + * @return 0, success; otherswise, failed. + */ + extern int srs_hijack_io_create_socket(srs_hijack_io_t ctx); + /** + * connect socket at server_ip:port. + * @return 0, success; otherswise, failed. + */ + extern int srs_hijack_io_connect(srs_hijack_io_t ctx, const char* server_ip, int port); + /** + * read from socket. + * @return 0, success; otherswise, failed. + */ + extern int srs_hijack_io_read(srs_hijack_io_t ctx, void* buf, size_t size, ssize_t* nread); + /** + * set the socket recv timeout. + * @return 0, success; otherswise, failed. + */ + extern void srs_hijack_io_set_recv_timeout(srs_hijack_io_t ctx, int64_t timeout_us); + /** + * get the socket recv timeout. + * @return 0, success; otherswise, failed. + */ + extern int64_t srs_hijack_io_get_recv_timeout(srs_hijack_io_t ctx); + /** + * get the socket recv bytes. + * @return 0, success; otherswise, failed. + */ + extern int64_t srs_hijack_io_get_recv_bytes(srs_hijack_io_t ctx); + /** + * set the socket send timeout. + * @return 0, success; otherswise, failed. + */ + extern void srs_hijack_io_set_send_timeout(srs_hijack_io_t ctx, int64_t timeout_us); + /** + * get the socket send timeout. + * @return 0, success; otherswise, failed. + */ + extern int64_t srs_hijack_io_get_send_timeout(srs_hijack_io_t ctx); + /** + * get the socket send bytes. + * @return 0, success; otherswise, failed. + */ + extern int64_t srs_hijack_io_get_send_bytes(srs_hijack_io_t ctx); + /** + * writev of socket. + * @return 0, success; otherswise, failed. + */ + extern int srs_hijack_io_writev(srs_hijack_io_t ctx, const iovec *iov, int iov_size, ssize_t* nwrite); + /** + * whether the timeout is never timeout. + * @return 0, success; otherswise, failed. + */ + extern bool srs_hijack_io_is_never_timeout(srs_hijack_io_t ctx, int64_t timeout_us); + /** + * read fully, fill the buf exactly size bytes. + * @return 0, success; otherswise, failed. + */ + extern int srs_hijack_io_read_fully(srs_hijack_io_t ctx, void* buf, size_t size, ssize_t* nread); + /** + * write bytes to socket. + * @return 0, success; otherswise, failed. + */ + extern int srs_hijack_io_write(srs_hijack_io_t ctx, void* buf, size_t size, ssize_t* nwrite); +#endif + #ifdef __cplusplus } #endif