diff --git a/README.md b/README.md
index b0b884888..b11b60905 100755
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
[
](https://github.com/ossrs/srs/wiki/v1_EN_Contact#skype-or-gitter)
SRS/3.0,[OuXuli][release3],是一个简单的流媒体直播集群,简单的快乐。
-SRS, a simple joy, created on 2013.10, is a simple live streaming cluster.
+SRS is a simple live streaming cluster, a simple joy.
Download binaries from github.io: [Centos6-x86_64][centos0], [more...][more0]
Download binaries from ossrs.net: [Centos6-x86_64][centos1], [more...][more1]
diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp
index 077b0f3c9..59d436a2c 100644
--- a/trunk/src/app/srs_app_hls.cpp
+++ b/trunk/src/app/srs_app_hls.cpp
@@ -462,6 +462,9 @@ srs_error_t SrsHlsMuxer::segment_open()
if ((err = current->writer->open(tmp_file)) != srs_success) {
return srs_error_wrap(err, "open hls muxer");
}
+
+ // reset the context for a new ts start.
+ context->reset();
return err;
}
diff --git a/trunk/src/app/srs_app_hls.hpp b/trunk/src/app/srs_app_hls.hpp
index cb95ac738..5d7535f1c 100644
--- a/trunk/src/app/srs_app_hls.hpp
+++ b/trunk/src/app/srs_app_hls.hpp
@@ -157,7 +157,8 @@ private:
// The key and iv.
unsigned char key[16];
unsigned char iv[16];
- SrsFileWriter *writer;
+ // The underlayer file writer.
+ SrsFileWriter* writer;
private:
int _sequence_no;
srs_utime_t max_td;
diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp
index a88a83d5a..1468272a0 100755
--- a/trunk/src/app/srs_app_listener.cpp
+++ b/trunk/src/app/srs_app_listener.cpp
@@ -46,9 +46,6 @@ using namespace std;
// sleep in srs_utime_t for udp recv packet.
#define SrsUdpPacketRecvCycleInterval 0
-// nginx also set to 512
-#define SERVER_LISTEN_BACKLOG 512
-
ISrsUdpHandler::ISrsUdpHandler()
{
}
@@ -75,9 +72,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p)
handler = h;
ip = i;
port = p;
-
- _fd = -1;
- _stfd = NULL;
+ lfd = NULL;
nb_buf = SRS_UDP_MAX_PACKET_SIZE;
buf = new char[nb_buf];
@@ -87,60 +82,27 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p)
SrsUdpListener::~SrsUdpListener()
{
- // close the stfd to trigger thread to interrupted.
- srs_close_stfd(_stfd);
-
srs_freep(trd);
-
- // st does not close it sometimes,
- // close it manually.
- close(_fd);
-
+ srs_close_stfd(lfd);
srs_freepa(buf);
}
int SrsUdpListener::fd()
{
- return _fd;
+ return srs_netfd_fileno(lfd);
}
srs_netfd_t SrsUdpListener::stfd()
{
- return _stfd;
+ return lfd;
}
srs_error_t SrsUdpListener::listen()
{
srs_error_t err = srs_success;
-
- char sport[8];
- snprintf(sport, sizeof(sport), "%d", port);
-
- addrinfo hints;
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_DGRAM;
- hints.ai_flags = AI_NUMERICHOST;
-
- addrinfo* r = NULL;
- SrsAutoFree(addrinfo, r);
- if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {
- return srs_error_new(ERROR_SYSTEM_IP_INVALID, "get address info");
- }
-
- if ((_fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {
- return srs_error_new(ERROR_SOCKET_CREATE, "create socket. ip=%s, port=%d", ip.c_str(), port);
- }
- srs_fd_close_exec(_fd);
- srs_socket_reuse_addr(_fd);
-
- if (bind(_fd, r->ai_addr, r->ai_addrlen) == -1) {
- return srs_error_new(ERROR_SOCKET_BIND, "bind socket. ep=%s:%d", ip.c_str(), port);;
- }
-
- if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){
- return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open socket");
+ if ((err = srs_udp_listen(ip, port, &lfd)) != srs_success) {
+ return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port);
}
srs_freep(trd);
@@ -160,12 +122,11 @@ srs_error_t SrsUdpListener::cycle()
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "udp listener");
}
-
+
+ int nread = 0;
sockaddr_storage from;
int nb_from = sizeof(from);
- int nread = 0;
-
- if ((nread = srs_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) {
+ if ((nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr*)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) {
return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread);
}
@@ -186,9 +147,8 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
handler = h;
ip = i;
port = p;
-
- _fd = -1;
- _stfd = NULL;
+
+ lfd = NULL;
trd = new SrsDummyCoroutine();
}
@@ -196,60 +156,20 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
SrsTcpListener::~SrsTcpListener()
{
srs_freep(trd);
-
- srs_close_stfd(_stfd);
+ srs_close_stfd(lfd);
}
int SrsTcpListener::fd()
{
- return _fd;
+ return srs_netfd_fileno(lfd);;
}
srs_error_t SrsTcpListener::listen()
{
srs_error_t err = srs_success;
-
- char sport[8];
- snprintf(sport, sizeof(sport), "%d", port);
-
- addrinfo hints;
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags = AI_NUMERICHOST;
-
- addrinfo* r = NULL;
- SrsAutoFree(addrinfo, r);
- if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {
- return srs_error_new(ERROR_SYSTEM_IP_INVALID, "get address info");
- }
-
- if ((_fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {
- return srs_error_new(ERROR_SOCKET_CREATE, "create socket. ip=%s, port=%d", ip.c_str(), port);
- }
-
- // Detect alive for TCP connection.
- // @see https://github.com/ossrs/srs/issues/1044
-#ifdef SO_KEEPALIVE
- int tcp_keepalive = 1;
- if (setsockopt(_fd, SOL_SOCKET, SO_KEEPALIVE, &tcp_keepalive, sizeof(int)) == -1) {
- return srs_error_new(ERROR_SOCKET_SETKEEPALIVE, "setsockopt SO_KEEPALIVE[%d]error. port=%d", tcp_keepalive, port);
- }
-#endif
- srs_fd_close_exec(_fd);
- srs_socket_reuse_addr(_fd);
-
- if (bind(_fd, r->ai_addr, r->ai_addrlen) == -1) {
- return srs_error_new(ERROR_SOCKET_BIND, "bind socket. ep=%s:%d", ip.c_str(), port);;
- }
-
- if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
- return srs_error_new(ERROR_SOCKET_LISTEN, "listen socket");
- }
-
- if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){
- return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open socket");
+ if ((err = srs_tcp_listen(ip, port, &lfd)) != srs_success) {
+ return srs_error_wrap(err, "listen at %s:%d", ip.c_str(), port);
}
srs_freep(trd);
@@ -270,16 +190,17 @@ srs_error_t SrsTcpListener::cycle()
return srs_error_wrap(err, "tcp listener");
}
- srs_netfd_t cstfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
- if(cstfd == NULL){
- return srs_error_new(ERROR_SOCKET_CREATE, "accept failed");
+ srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
+ if(fd == NULL){
+ return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
}
- int cfd = srs_netfd_fileno(cstfd);
- srs_fd_close_exec(cfd);
+ if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
+ return srs_error_wrap(err, "set closeexec");
+ }
- if ((err = handler->on_tcp_client(cstfd)) != srs_success) {
- return srs_error_wrap(err, "handle fd=%d", cfd);
+ if ((err = handler->on_tcp_client(fd)) != srs_success) {
+ return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
}
}
diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp
index 538ecf1b9..99da48bda 100644
--- a/trunk/src/app/srs_app_listener.hpp
+++ b/trunk/src/app/srs_app_listener.hpp
@@ -69,8 +69,7 @@ public:
class SrsUdpListener : public ISrsCoroutineHandler
{
private:
- int _fd;
- srs_netfd_t _stfd;
+ srs_netfd_t lfd;
SrsCoroutine* trd;
private:
char* buf;
@@ -96,8 +95,7 @@ public:
class SrsTcpListener : public ISrsCoroutineHandler
{
private:
- int _fd;
- srs_netfd_t _stfd;
+ srs_netfd_t lfd;
SrsCoroutine* trd;
private:
ISrsTcpHandler* handler;
diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp
index 95ab4cd36..1875ed033 100644
--- a/trunk/src/kernel/srs_kernel_error.hpp
+++ b/trunk/src/kernel/srs_kernel_error.hpp
@@ -115,6 +115,9 @@
#define ERROR_SOCKET_NO_NODELAY 1076
#define ERROR_SOCKET_SNDBUF 1077
#define ERROR_THREAD_STARTED 1078
+#define ERROR_SOCKET_SETREUSEADDR 1079
+#define ERROR_SOCKET_SETCLOSEEXEC 1080
+#define ERROR_SOCKET_ACCEPT 1081
///////////////////////////////////////////////////////
// RTMP protocol error.
diff --git a/trunk/src/service/srs_service_st.cpp b/trunk/src/service/srs_service_st.cpp
index debd9422d..f25cf20d8 100644
--- a/trunk/src/service/srs_service_st.cpp
+++ b/trunk/src/service/srs_service_st.cpp
@@ -35,6 +35,9 @@ using namespace std;
#include
#include
+// nginx also set to 512
+#define SERVER_LISTEN_BACKLOG 512
+
#ifdef __linux__
#include
@@ -86,17 +89,37 @@ void srs_close_stfd(srs_netfd_t& stfd)
}
}
-void srs_fd_close_exec(int fd)
+srs_error_t srs_fd_closeexec(int fd)
{
int flags = fcntl(fd, F_GETFD);
flags |= FD_CLOEXEC;
- fcntl(fd, F_SETFD, flags);
+ if (fcntl(fd, F_SETFD, flags) == -1) {
+ return srs_error_new(ERROR_SOCKET_SETCLOSEEXEC, "FD_CLOEXEC fd=%v", fd);
+ }
+
+ return srs_success;
}
-void srs_socket_reuse_addr(int fd)
+srs_error_t srs_fd_reuseaddr(int fd)
{
int v = 1;
- setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &v, sizeof(int));
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &v, sizeof(int)) == -1) {
+ return srs_error_new(ERROR_SOCKET_SETREUSEADDR, "SO_REUSEADDR fd=%v", fd);
+ }
+
+ return srs_success;
+}
+
+srs_error_t srs_fd_keepalive(int fd)
+{
+#ifdef SO_KEEPALIVE
+ int v = 1;
+ if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &v, sizeof(int)) == -1) {
+ return srs_error_new(ERROR_SOCKET_SETKEEPALIVE, "SO_KEEPALIVE fd=%d", fd);
+ }
+#endif
+
+ return srs_success;
}
srs_thread_t srs_thread_self()
@@ -104,7 +127,7 @@ srs_thread_t srs_thread_self()
return (srs_thread_t)st_thread_self();
}
-srs_error_t srs_socket_connect(string server, int port, srs_utime_t tm, srs_netfd_t* pstfd)
+srs_error_t srs_tcp_connect(string server, int port, srs_utime_t tm, srs_netfd_t* pstfd)
{
st_utime_t timeout = ST_UTIME_NO_TIMEOUT;
if (tm != SRS_UTIME_NO_TIMEOUT) {
@@ -149,6 +172,116 @@ srs_error_t srs_socket_connect(string server, int port, srs_utime_t tm, srs_netf
return srs_success;
}
+srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd)
+{
+ srs_error_t err = srs_success;
+
+ char sport[8];
+ snprintf(sport, sizeof(sport), "%d", port);
+
+ addrinfo hints;
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_NUMERICHOST;
+
+ addrinfo* r = NULL;
+ SrsAutoFree(addrinfo, r);
+ if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {
+ return srs_error_new(ERROR_SYSTEM_IP_INVALID, "getaddrinfo hints=(%d,%d,%d)",
+ hints.ai_family, hints.ai_socktype, hints.ai_flags);
+ }
+
+ int fd = 0;
+ if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {
+ return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d",
+ r->ai_family, r->ai_socktype, r->ai_protocol);
+ }
+
+ // Detect alive for TCP connection.
+ // @see https://github.com/ossrs/srs/issues/1044
+ if ((err = srs_fd_keepalive(fd)) != srs_success) {
+ ::close(fd);
+ return srs_error_wrap(err, "set keepalive fd=%d", fd);
+ }
+
+ if ((err = srs_fd_closeexec(fd)) != srs_success) {
+ ::close(fd);
+ return srs_error_wrap(err, "set closeexec fd=%d", fd);
+ }
+
+ if ((err = srs_fd_reuseaddr(fd)) != srs_success) {
+ ::close(fd);
+ return srs_error_wrap(err, "set reuseaddr fd=%d", fd);
+ }
+
+ if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) {
+ ::close(fd);
+ return srs_error_new(ERROR_SOCKET_BIND, "bind fd=%d", fd);
+ }
+
+ if (::listen(fd, SERVER_LISTEN_BACKLOG) == -1) {
+ ::close(fd);
+ return srs_error_new(ERROR_SOCKET_LISTEN, "listen fd=%d", fd);
+ }
+
+ if ((*pfd = srs_netfd_open_socket(fd)) == NULL){
+ ::close(fd);
+ return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open fd=%d", fd);
+ }
+
+ return err;
+}
+
+srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd)
+{
+ srs_error_t err = srs_success;
+
+ char sport[8];
+ snprintf(sport, sizeof(sport), "%d", port);
+
+ addrinfo hints;
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_DGRAM;
+ hints.ai_flags = AI_NUMERICHOST;
+
+ addrinfo* r = NULL;
+ SrsAutoFree(addrinfo, r);
+ if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {
+ return srs_error_new(ERROR_SYSTEM_IP_INVALID, "getaddrinfo hints=(%d,%d,%d)",
+ hints.ai_family, hints.ai_socktype, hints.ai_flags);
+ }
+
+ int fd = 0;
+ if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {
+ return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d",
+ r->ai_family, r->ai_socktype, r->ai_protocol);
+ }
+
+ if ((err = srs_fd_closeexec(fd)) != srs_success) {
+ ::close(fd);
+ return srs_error_wrap(err, "set closeexec fd=%d", fd);
+ }
+
+ if ((err = srs_fd_reuseaddr(fd)) != srs_success) {
+ ::close(fd);
+ return srs_error_wrap(err, "set reuseaddr fd=%d", fd);
+ }
+
+ if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) {
+ ::close(fd);
+ return srs_error_new(ERROR_SOCKET_BIND, "bind fd=%d", fd);
+ }
+
+ if ((*pfd = srs_netfd_open_socket(fd)) == NULL){
+ ::close(fd);
+ return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open fd=%d", fd);
+ }
+
+ return err;
+}
+
srs_cond_t srs_cond_new()
{
return (srs_cond_t)st_cond_new();
@@ -439,7 +572,7 @@ srs_error_t SrsTcpClient::connect()
close();
srs_assert(stfd == NULL);
- if ((err = srs_socket_connect(host, port, timeout, &stfd)) != srs_success) {
+ if ((err = srs_tcp_connect(host, port, timeout, &stfd)) != srs_success) {
return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout));
}
diff --git a/trunk/src/service/srs_service_st.hpp b/trunk/src/service/srs_service_st.hpp
index 383b1323d..18b0837fe 100644
--- a/trunk/src/service/srs_service_st.hpp
+++ b/trunk/src/service/srs_service_st.hpp
@@ -44,17 +44,26 @@ extern srs_error_t srs_st_init();
extern void srs_close_stfd(srs_netfd_t& stfd);
// Set the FD_CLOEXEC of FD.
-extern void srs_fd_close_exec(int fd);
+extern srs_error_t srs_fd_closeexec(int fd);
-// Set the SO_REUSEADDR of socket.
-extern void srs_socket_reuse_addr(int fd);
+// Set the SO_REUSEADDR of fd.
+extern srs_error_t srs_fd_reuseaddr(int fd);
+
+// Set the SO_KEEPALIVE of fd.
+extern srs_error_t srs_fd_keepalive(int fd);
// Get current coroutine/thread.
extern srs_thread_t srs_thread_self();
-// client open socket and connect to server.
+// For client, to open socket and connect to server.
// @param tm The timeout in srs_utime_t.
-extern srs_error_t srs_socket_connect(std::string server, int port, srs_utime_t tm, srs_netfd_t* pstfd);
+extern srs_error_t srs_tcp_connect(std::string server, int port, srs_utime_t tm, srs_netfd_t* pstfd);
+
+// For server, listen at TCP endpoint.
+extern srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd);
+
+// For server, listen at UDP endpoint.
+extern srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd);
// Wrap for coroutine.
extern srs_cond_t srs_cond_new();
diff --git a/trunk/src/utest/srs_utest_app.cpp b/trunk/src/utest/srs_utest_app.cpp
index 28f9fcd2f..f3b8e759a 100644
--- a/trunk/src/utest/srs_utest_app.cpp
+++ b/trunk/src/utest/srs_utest_app.cpp
@@ -377,3 +377,9 @@ VOID TEST(AppFragmentTest, CheckDuration)
}
}
+#define MOCK_LISTEN_PORT 11935
+
+VOID TEST(TCPServerTest, PingPong)
+{
+}
+