diff --git a/trunk/src/libs/srs_lib_simple_socket.cpp b/trunk/src/libs/srs_lib_simple_socket.cpp index df2373d6a..50715e14f 100644 --- a/trunk/src/libs/srs_lib_simple_socket.cpp +++ b/trunk/src/libs/srs_lib_simple_socket.cpp @@ -30,10 +30,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include +#include + +#ifndef ST_UTIME_NO_TIMEOUT + #define ST_UTIME_NO_TIMEOUT -1 +#endif SimpleSocketStream::SimpleSocketStream() { fd = -1; + send_timeout = recv_timeout = ST_UTIME_NO_TIMEOUT; + recv_bytes = send_bytes = 0; + + srs_update_system_time_ms(); + start_time_ms = srs_get_system_time_ms(); } SimpleSocketStream::~SimpleSocketStream() @@ -71,70 +82,149 @@ int SimpleSocketStream::connect(const char* server_ip, int port) int SimpleSocketStream::read(const void* buf, size_t size, ssize_t* nread) { int ret = ERROR_SUCCESS; + + *nread = ::recv(fd, (void*)buf, size, 0); + + // On success a non-negative integer indicating the number of bytes actually read is returned + // (a value of 0 means the network connection is closed or end of file is reached). + if (*nread <= 0) { + if (errno == ETIME) { + return ERROR_SOCKET_TIMEOUT; + } + + if (*nread == 0) { + errno = ECONNRESET; + } + + return ERROR_SOCKET_READ; + } + + recv_bytes += *nread; + return ret; } // ISrsProtocolReader void SimpleSocketStream::set_recv_timeout(int64_t timeout_us) { + recv_timeout = timeout_us; } int64_t SimpleSocketStream::get_recv_timeout() { - return -1; + return recv_timeout; } int64_t SimpleSocketStream::get_recv_bytes() { - return 0; + return recv_bytes; } int SimpleSocketStream::get_recv_kbps() { - return 0; + srs_update_system_time_ms(); + int64_t diff_ms = srs_get_system_time_ms() - start_time_ms; + + if (diff_ms <= 0) { + return 0; + } + + return recv_bytes * 8 / diff_ms; } // ISrsProtocolWriter void SimpleSocketStream::set_send_timeout(int64_t timeout_us) { + send_timeout = timeout_us; } int64_t SimpleSocketStream::get_send_timeout() { - return -1; + return send_timeout; } int64_t SimpleSocketStream::get_send_bytes() { - return 0; + return send_bytes; } int SimpleSocketStream::get_send_kbps() { - return 0; + srs_update_system_time_ms(); + int64_t diff_ms = srs_get_system_time_ms() - start_time_ms; + + if (diff_ms <= 0) { + return 0; + } + + return send_bytes * 8 / diff_ms; } int SimpleSocketStream::writev(const iovec *iov, int iov_size, ssize_t* nwrite) { int ret = ERROR_SUCCESS; + + *nwrite = ::writev(fd, iov, iov_size); + + if (*nwrite <= 0) { + if (errno == ETIME) { + return ERROR_SOCKET_TIMEOUT; + } + + return ERROR_SOCKET_WRITE; + } + + send_bytes += *nwrite; + return ret; } // ISrsProtocolReaderWriter bool SimpleSocketStream::is_never_timeout(int64_t timeout_us) { - return true; + return timeout_us == (int64_t)ST_UTIME_NO_TIMEOUT; } int SimpleSocketStream::read_fully(const void* buf, size_t size, ssize_t* nread) { int ret = ERROR_SUCCESS; + + size_t left = size; + *nread = 0; + + while (left > 0) { + char* this_buf = (char*)buf + *nread; + ssize_t this_nread; + + if ((ret = this->read(this_buf, left, &this_nread)) != ERROR_SUCCESS) { + return ret; + } + + *nread += this_nread; + left -= this_nread; + } + + recv_bytes += *nread; + return ret; } int SimpleSocketStream::write(const void* buf, size_t size, ssize_t* nwrite) { int ret = ERROR_SUCCESS; + + *nwrite = ::send(fd, (void*)buf, size, 0); + + if (*nwrite <= 0) { + if (errno == ETIME) { + return ERROR_SOCKET_TIMEOUT; + } + + return ERROR_SOCKET_WRITE; + } + + send_bytes += *nwrite; + return ret; } diff --git a/trunk/src/libs/srs_lib_simple_socket.hpp b/trunk/src/libs/srs_lib_simple_socket.hpp index 99034e49d..a2740afc0 100644 --- a/trunk/src/libs/srs_lib_simple_socket.hpp +++ b/trunk/src/libs/srs_lib_simple_socket.hpp @@ -39,6 +39,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SimpleSocketStream : public ISrsProtocolReaderWriter { private: + int64_t start_time_ms; + int64_t recv_timeout; + int64_t send_timeout; + int64_t recv_bytes; + int64_t send_bytes; int fd; public: SimpleSocketStream();