1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

for #133, rtsp extract tcp/udp listener.

This commit is contained in:
winlin 2015-02-17 16:28:28 +08:00
parent 4807f7850d
commit 40fbfd8560
17 changed files with 876 additions and 300 deletions

View file

@ -24,8 +24,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_server.hpp>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
@ -51,15 +49,6 @@ using namespace std;
// signal defines.
#define SIGNAL_RELOAD SIGHUP
// nginx also set to 512
#define SERVER_LISTEN_BACKLOG 512
// sleep in ms for udp recv packet.
#define SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS 0
// set the max packet size.
#define SRS_UDP_MAX_PACKET_SIZE 65535
// system interval in ms,
// all resolution times should be times togother,
// for example, system-interval is x=1s(1000ms),
@ -122,26 +111,13 @@ std::string __srs_listener_type2string(SrsListenerType type)
SrsListener::SrsListener(SrsServer* server, SrsListenerType type)
{
fd = -1;
stfd = NULL;
_port = 0;
_server = server;
_type = type;
pthread = new SrsThread("listen", this, 0, true);
}
SrsListener::~SrsListener()
{
srs_close_stfd(stfd);
pthread->stop();
srs_freep(pthread);
// st does not close it sometimes,
// close it manually.
close(fd);
}
SrsListenerType SrsListener::type()
@ -149,92 +125,55 @@ SrsListenerType SrsListener::type()
return _type;
}
int SrsListener::listen(int port)
SrsStreamListener::SrsStreamListener(SrsServer* server, SrsListenerType type) : SrsListener(server, type)
{
listener = NULL;
}
SrsStreamListener::~SrsStreamListener()
{
srs_freep(listener);
}
int SrsStreamListener::listen(int port)
{
int ret = ERROR_SUCCESS;
_port = port;
if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
ret = ERROR_SOCKET_CREATE;
srs_error("create linux socket error. port=%d, ret=%d", port, ret);
srs_freep(listener);
listener = new SrsTcpListener(this, port);
if ((ret = listener->listen()) != ERROR_SUCCESS) {
srs_error("tcp listen failed. ret=%d", ret);
return ret;
}
srs_verbose("create linux socket success. port=%d, fd=%d", port, fd);
int reuse_socket = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) {
ret = ERROR_SOCKET_SETREUSE;
srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, fd);
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(_port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
ret = ERROR_SOCKET_BIND;
srs_error("bind socket error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("bind socket success. port=%d, fd=%d", port, fd);
if (::listen(fd, SERVER_LISTEN_BACKLOG) == -1) {
ret = ERROR_SOCKET_LISTEN;
srs_error("listen socket error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("listen socket success. port=%d, fd=%d", port, fd);
if ((stfd = st_netfd_open_socket(fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("st open socket success. port=%d, fd=%d", port, fd);
if ((ret = pthread->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create listen thread error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("create st listen thread success, port=%d", port);
srs_info("listen thread cid=%d, current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, port=%d",
pthread->cid(), _srs_context->get_id(), _port, _type, fd, port);
srs_trace("%s listen at tcp://%d, fd=%d", __srs_listener_type2string(_type).c_str(), _port, fd);
srs_trace("%s listen at tcp://%d, fd=%d", __srs_listener_type2string(_type).c_str(), _port, listener->fd());
return ret;
}
int SrsListener::cycle()
int SrsStreamListener::on_tcp_client(st_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);
if(client_stfd == NULL){
// ignore error.
srs_error("ignore accept thread stoppped for accept client error");
return ret;
}
srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));
if ((ret = _server->accept_client(_type, client_stfd)) != ERROR_SUCCESS) {
if ((ret = _server->accept_client(_type, stfd)) != ERROR_SUCCESS) {
srs_warn("accept client error. ret=%d", ret);
return ret;
}
return ret;
}
#ifdef SRS_AUTO_STREAM_CASTER
SrsRtspListener::SrsRtspListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c) : SrsListener(server, type)
{
_type = type;
listener = NULL;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
@ -247,34 +186,52 @@ SrsRtspListener::SrsRtspListener(SrsServer* server, SrsListenerType type, SrsCon
SrsRtspListener::~SrsRtspListener()
{
srs_freep(caster);
srs_freep(listener);
}
int SrsRtspListener::cycle()
int SrsRtspListener::listen(int port)
{
int ret = ERROR_SUCCESS;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerRtsp);
st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);
if(client_stfd == NULL){
// ignore error.
srs_error("ignore accept thread stoppped for accept client error");
return ret;
}
srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));
if ((ret = caster->serve_client(client_stfd)) != ERROR_SUCCESS) {
srs_warn("accept client error. ret=%d", ret);
_port = port;
srs_freep(listener);
listener = new SrsTcpListener(this, port);
if ((ret = listener->listen()) != ERROR_SUCCESS) {
srs_error("udp caster listen failed. ret=%d", ret);
return ret;
}
srs_info("listen thread cid=%d, current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, port=%d",
pthread->cid(), _srs_context->get_id(), _port, _type, fd, port);
srs_trace("%s listen at tcp://%d, fd=%d", __srs_listener_type2string(_type).c_str(), _port, listener->fd());
return ret;
}
SrsUdpListener::SrsUdpListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c) : SrsListener(server, type)
int SrsRtspListener::on_tcp_client(st_netfd_t stfd)
{
int ret = ERROR_SUCCESS;
if ((ret = caster->serve_client(stfd)) != ERROR_SUCCESS) {
srs_warn("accept client error. ret=%d", ret);
return ret;
}
return ret;
}
SrsUdpCasterListener::SrsUdpCasterListener(SrsServer* server, SrsListenerType type, SrsConfDirective* c) : SrsListener(server, type)
{
_type = type;
nb_buf = SRS_UDP_MAX_PACKET_SIZE;
buf = new char[nb_buf];
listener = NULL;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
@ -284,13 +241,13 @@ SrsUdpListener::SrsUdpListener(SrsServer* server, SrsListenerType type, SrsConfD
}
}
SrsUdpListener::~SrsUdpListener()
SrsUdpCasterListener::~SrsUdpCasterListener()
{
srs_freep(caster);
srs_freep(buf);
srs_freep(listener);
}
int SrsUdpListener::listen(int port)
int SrsUdpCasterListener::listen(int port)
{
int ret = ERROR_SUCCESS;
@ -299,83 +256,20 @@ int SrsUdpListener::listen(int port)
srs_assert(_type == SrsListenerMpegTsOverUdp);
_port = port;
if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
ret = ERROR_SOCKET_CREATE;
srs_error("create linux socket error. port=%d, ret=%d", port, ret);
srs_freep(listener);
listener = new SrsUdpListener(caster, port);
if ((ret = listener->listen()) != ERROR_SUCCESS) {
srs_error("udp caster listen failed. ret=%d", ret);
return ret;
}
srs_verbose("create linux socket success. port=%d, fd=%d", port, fd);
int reuse_socket = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) {
ret = ERROR_SOCKET_SETREUSE;
srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, fd);
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(_port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
ret = ERROR_SOCKET_BIND;
srs_error("bind socket error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("bind socket success. port=%d, fd=%d", port, fd);
if ((stfd = st_netfd_open_socket(fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("st open socket success. port=%d, fd=%d", port, fd);
if ((ret = pthread->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create listen thread error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("create st listen thread success, port=%d", port);
srs_info("listen thread cid=%d, current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, port=%d",
pthread->cid(), _srs_context->get_id(), _port, _type, fd, port);
srs_trace("%s listen at udp://%d, fd=%d", __srs_listener_type2string(_type).c_str(), _port, fd);
return ret;
}
int SrsUdpListener::cycle()
{
int ret = ERROR_SUCCESS;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerMpegTsOverUdp);
for (;;) {
// TODO: FIXME: support ipv6, @see man 7 ipv6
sockaddr_in from;
int nb_from = sizeof(sockaddr_in);
int nread = 0;
if ((nread = st_recvfrom(stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) {
srs_warn("ignore recv udp packet failed, nread=%d", nread);
continue;
}
if ((ret = caster->on_udp_packet(&from, buf, nread)) != ERROR_SUCCESS) {
srs_warn("handle udp packet failed. ret=%d", ret);
continue;
}
if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) {
st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000);
}
}
srs_trace("%s listen at udp://%d, fd=%d", __srs_listener_type2string(_type).c_str(), _port, listener->fd());
return ret;
}
@ -992,7 +886,7 @@ int SrsServer::listen_rtmp()
close_listeners(SrsListenerRtmpStream);
for (int i = 0; i < (int)ports.size(); i++) {
SrsListener* listener = new SrsListener(this, SrsListenerRtmpStream);
SrsListener* listener = new SrsStreamListener(this, SrsListenerRtmpStream);
listeners.push_back(listener);
int port = ::atoi(ports[i].c_str());
@ -1012,7 +906,7 @@ int SrsServer::listen_http_api()
#ifdef SRS_AUTO_HTTP_API
close_listeners(SrsListenerHttpApi);
if (_srs_config->get_http_api_enabled()) {
SrsListener* listener = new SrsListener(this, SrsListenerHttpApi);
SrsListener* listener = new SrsStreamListener(this, SrsListenerHttpApi);
listeners.push_back(listener);
int port = _srs_config->get_http_api_listen();
@ -1033,7 +927,7 @@ int SrsServer::listen_http_stream()
#ifdef SRS_AUTO_HTTP_SERVER
close_listeners(SrsListenerHttpStream);
if (_srs_config->get_http_stream_enabled()) {
SrsListener* listener = new SrsListener(this, SrsListenerHttpStream);
SrsListener* listener = new SrsStreamListener(this, SrsListenerHttpStream);
listeners.push_back(listener);
int port = _srs_config->get_http_stream_listen();
@ -1067,7 +961,7 @@ int SrsServer::listen_stream_caster()
std::string caster = _srs_config->get_stream_caster_engine(stream_caster);
if (caster == SRS_CONF_DEFAULT_STREAM_CASTER_MPEGTS_OVER_UDP) {
listener = new SrsUdpListener(this, SrsListenerMpegTsOverUdp, stream_caster);
listener = new SrsUdpCasterListener(this, SrsListenerMpegTsOverUdp, stream_caster);
} else if (caster == SRS_CONF_DEFAULT_STREAM_CASTER_RTSP) {
listener = new SrsRtspListener(this, SrsListenerRtsp, stream_caster);
} else {