1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

listen and serve stream caster mpegts over udp.

This commit is contained in:
winlin 2015-01-24 14:52:52 +08:00
parent f9d1e1111a
commit 52891b491a
8 changed files with 338 additions and 13 deletions

View file

@ -93,6 +93,22 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES
#define SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES 9
std::string __srs_listener_type2string(SrsListenerType type)
{
switch (type) {
case SrsListenerRtmpStream:
return "RTMP";
case SrsListenerHttpApi:
return "HTTP-API";
case SrsListenerHttpStream:
return "HTTP-Server";
case SrsListenerMpegTsOverUdp:
return "MPEG-TS over UDP";
default:
return "UNKONWN";
}
}
SrsListener::SrsListener(SrsServer* server, SrsListenerType type)
{
fd = -1;
@ -174,18 +190,15 @@ int SrsListener::listen(int port)
}
srs_verbose("create st listen thread success, port=%d", port);
srs_trace("listen thread cid=%d, current_cid=%d, "
srs_info("listen thread cid=%d, current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, port=%d",
pthread->cid(), _srs_context->get_id(), _port, _type, fd, port);
srs_trace("%s listen at tcp://%d, fd=%d", __srs_listener_type2string(_type).c_str(), _port, fd);
return ret;
}
void SrsListener::on_thread_start()
{
srs_trace("listen cycle start, port=%d, type=%d, fd=%d", _port, _type, fd);
}
int SrsListener::cycle()
{
int ret = ERROR_SUCCESS;
@ -207,6 +220,86 @@ int SrsListener::cycle()
return ret;
}
SrsUdpListener::SrsUdpListener(SrsServer* server, SrsListenerType type) : SrsListener(server, type)
{
}
SrsUdpListener::~SrsUdpListener()
{
}
int SrsUdpListener::listen(int port)
{
int ret = ERROR_SUCCESS;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerMpegTsOverUdp);
_port = port;
if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
ret = ERROR_SOCKET_CREATE;
srs_error("create linux socket error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("create linux socket success. port=%d, fd=%d", port, fd);
int reuse_socket = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) {
ret = ERROR_SOCKET_SETREUSE;
srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, fd);
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(_port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
ret = ERROR_SOCKET_BIND;
srs_error("bind socket error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("bind socket success. port=%d, fd=%d", port, fd);
if ((stfd = st_netfd_open_socket(fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("st open socket success. port=%d, fd=%d", port, fd);
if ((ret = pthread->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create listen thread error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("create st listen thread success, port=%d", port);
srs_info("listen thread cid=%d, current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, port=%d",
pthread->cid(), _srs_context->get_id(), _port, _type, fd, port);
srs_trace("%s listen at udp://%d, fd=%d", __srs_listener_type2string(_type).c_str(), _port, fd);
return ret;
}
int SrsUdpListener::cycle()
{
int ret = ERROR_SUCCESS;
// the caller already ensure the type is ok,
// we just assert here for unknown stream caster.
srs_assert(_type == SrsListenerMpegTsOverUdp);
// TODO: FIXME: recv udp packet.
st_sleep(1);
return ret;
}
SrsSignalManager* SrsSignalManager::instance = NULL;
SrsSignalManager::SrsSignalManager(SrsServer* server)
@ -608,6 +701,10 @@ int SrsServer::listen()
return ret;
}
if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
@ -869,6 +966,53 @@ int SrsServer::listen_http_stream()
return ret;
}
int SrsServer::listen_stream_caster()
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_STREAM_CASTER
close_listeners(SrsListenerMpegTsOverUdp);
std::vector<SrsConfDirective*>::iterator it;
std::vector<SrsConfDirective*> stream_casters = _srs_config->get_stream_casters();
for (it = stream_casters.begin(); it != stream_casters.end(); ++it) {
SrsConfDirective* stream_caster = *it;
if (!_srs_config->get_stream_caster_enabled(stream_caster)) {
continue;
}
SrsUdpListener* listener = NULL;
std::string caster = _srs_config->get_stream_caster_engine(stream_caster);
if (caster == SRS_CONF_DEFAULT_STREAM_CASTER_MPEGTS_OVER_UDP) {
listener = new SrsUdpListener(this, SrsListenerMpegTsOverUdp);
} else {
ret = ERROR_STREAM_CASTER_ENGINE;
srs_error("unsupported stream caster %s. ret=%d", caster.c_str(), ret);
return ret;
}
srs_assert(listener != NULL);
listeners.push_back(listener);
int port = _srs_config->get_stream_caster_listen(stream_caster);
if (port <= 0) {
ret = ERROR_STREAM_CASTER_PORT;
srs_error("invalid stream caster port %d. ret=%d", port, ret);
return ret;
}
if ((ret = listener->listen(port)) != ERROR_SUCCESS) {
srs_error("StreamCaster listen at port %d failed. ret=%d", port, ret);
return ret;
}
}
#endif
return ret;
}
void SrsServer::close_listeners(SrsListenerType type)
{
std::vector<SrsListener*>::iterator it;