1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

srt pull/push server

This commit is contained in:
runner365 2020-01-14 21:10:33 +08:00
parent aeee3011ef
commit b53b75ea1e
12 changed files with 982 additions and 15 deletions

2
.gitignore vendored
View file

@ -16,6 +16,8 @@
*.pyc *.pyc
*.swp *.swp
.DS_Store .DS_Store
.vscode
.vscode/*
/trunk/Makefile /trunk/Makefile
/trunk/objs /trunk/objs
/trunk/src/build-qt-Desktop-Debug /trunk/src/build-qt-Desktop-Debug

75
.vscode/settings.json vendored Normal file
View file

@ -0,0 +1,75 @@
{
"files.associations": {
"string": "cpp",
"memory": "cpp",
"__bit_reference": "cpp",
"__config": "cpp",
"__debug": "cpp",
"__errc": "cpp",
"__functional_base": "cpp",
"__hash_table": "cpp",
"__locale": "cpp",
"__mutex_base": "cpp",
"__node_handle": "cpp",
"__nullptr": "cpp",
"__split_buffer": "cpp",
"__string": "cpp",
"__threading_support": "cpp",
"__tree": "cpp",
"__tuple": "cpp",
"algorithm": "cpp",
"array": "cpp",
"atomic": "cpp",
"bit": "cpp",
"bitset": "cpp",
"cctype": "cpp",
"chrono": "cpp",
"cmath": "cpp",
"complex": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdint": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"deque": "cpp",
"exception": "cpp",
"fstream": "cpp",
"functional": "cpp",
"initializer_list": "cpp",
"iomanip": "cpp",
"ios": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"iterator": "cpp",
"limits": "cpp",
"list": "cpp",
"locale": "cpp",
"map": "cpp",
"mutex": "cpp",
"new": "cpp",
"optional": "cpp",
"ostream": "cpp",
"ratio": "cpp",
"set": "cpp",
"sstream": "cpp",
"stack": "cpp",
"stdexcept": "cpp",
"streambuf": "cpp",
"string_view": "cpp",
"system_error": "cpp",
"thread": "cpp",
"tuple": "cpp",
"type_traits": "cpp",
"typeinfo": "cpp",
"unordered_map": "cpp",
"utility": "cpp",
"vector": "cpp",
"hash_map": "cpp",
"hash_set": "cpp"
}
}

View file

@ -5,6 +5,8 @@ listen 1935;
max_connections 1000; max_connections 1000;
srs_log_tank file; srs_log_tank file;
srs_log_file ./objs/srs.log; srs_log_file ./objs/srs.log;
daemon off;
http_api { http_api {
enabled on; enabled on;
listen 1985; listen 1985;
@ -24,9 +26,12 @@ stats {
network 0; network 0;
disk sda sdb xvda xvdb; disk sda sdb xvda xvdb;
} }
vhost __defaultVhost__ { #vhost __defaultVhost__ {
forward { # forward {
enabled on; # enabled on;
destination 172.16.43.153:19350; # destination 172.16.43.153:19350;
} # }
#}
vhost sina.mobile.com.cn {
} }

11
trunk/configure vendored
View file

@ -133,8 +133,7 @@ ARFLAGS = -rs
LINK = g++ LINK = g++
CXXFLAGS = ${CXXFLAGS} CXXFLAGS = ${CXXFLAGS}
.PHONY: default srs srs_ingest_hls librtmp .PHONY: default srs librtmp
default: default:
END END
@ -160,7 +159,7 @@ if [ $SRS_GPERF_MD = YES ]; then
LibGperfFile="${SRS_OBJS_DIR}/gperf/lib/libtcmalloc_debug.a"; LibGperfFile="${SRS_OBJS_DIR}/gperf/lib/libtcmalloc_debug.a";
fi fi
# the link options, always use static link # the link options, always use static link
SrsLinkOptions="-pthread -ldl"; SrsLinkOptions="-pthread -ldl -lsrt";
if [[ $SRS_SSL == YES && $SRS_USE_SYS_SSL == YES ]]; then if [[ $SRS_SSL == YES && $SRS_USE_SYS_SSL == YES ]]; then
SrsLinkOptions="${SrsLinkOptions} -lssl -lcrypto"; SrsLinkOptions="${SrsLinkOptions} -lssl -lcrypto";
fi fi
@ -212,7 +211,7 @@ PROTOCOL_OBJS="${MODULE_OBJS[@]}"
MODULE_ID="SRT" MODULE_ID="SRT"
MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE" "APP") MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE" "APP")
ModuleLibIncs=(${SRS_OBJS_DIR}) ModuleLibIncs=(${SRS_OBJS_DIR})
MODULE_FILES=("srt_server") MODULE_FILES=("srt_server" "srt_handle" "srt_conn")
SRT_INCS="src/srt"; MODULE_DIR=${SRT_INCS} . auto/modules.sh SRT_INCS="src/srt"; MODULE_DIR=${SRT_INCS} . auto/modules.sh
SRT_OBJS="${MODULE_OBJS[@]}" SRT_OBJS="${MODULE_OBJS[@]}"
# #
@ -355,7 +354,7 @@ mv ${SRS_WORKDIR}/${SRS_MAKEFILE} ${SRS_WORKDIR}/${SRS_MAKEFILE}.bk
# generate phony header # generate phony header
cat << END > ${SRS_WORKDIR}/${SRS_MAKEFILE} cat << END > ${SRS_WORKDIR}/${SRS_MAKEFILE}
.PHONY: default _default install install-api help clean server srs_ingest_hls librtmp utest _prepare_dir $__mphonys .PHONY: default _default install install-api help clean server librtmp _prepare_dir $__mphonys
# install prefix. # install prefix.
SRS_PREFIX=${SRS_PREFIX} SRS_PREFIX=${SRS_PREFIX}
@ -370,7 +369,7 @@ END
# the server, librtmp and utest # the server, librtmp and utest
# where the bellow will check and disable some entry by only echo. # where the bellow will check and disable some entry by only echo.
cat << END >> ${SRS_WORKDIR}/${SRS_MAKEFILE} cat << END >> ${SRS_WORKDIR}/${SRS_MAKEFILE}
_default: server srs_ingest_hls librtmp utest __modules $__mdefaults _default: server librtmp __modules $__mdefaults
@bash objs/_srs_build_summary.sh @bash objs/_srs_build_summary.sh
help: help:

View file

@ -0,0 +1,82 @@
#include "srt_conn.hpp"
#include "time_help.h"
#include "stringex.hpp"
#include <vector>
srt_conn::srt_conn(SRTSOCKET conn_fd, const std::string& streamid):_conn_fd(conn_fd),
_streamid(streamid) {
get_streamid_info(streamid, _mode, _url_subpath);
_update_timestamp = now_ms();
std::vector<std::string> path_vec;
string_split(_url_subpath, "/", path_vec);
if (path_vec.size() >= 3) {
_vhost = path_vec[0];
} else {
_vhost = "__default_host__";
}
srs_trace("srt connect construct streamid:%s, mode:%d, subpath:%s, vhost:%s",
streamid.c_str(), _mode, _url_subpath.c_str(), _vhost.c_str());
}
srt_conn::~srt_conn() {
close();
}
std::string srt_conn::get_vhost() {
return _vhost;
}
void srt_conn::update_timestamp(long long now_ts) {
_update_timestamp = now_ts;
}
long long srt_conn::get_last_ts() {
return _update_timestamp;
}
void srt_conn::close() {
if (_conn_fd == SRT_INVALID_SOCK) {
return;
}
srt_close(_conn_fd);
_conn_fd = SRT_INVALID_SOCK;
}
SRTSOCKET srt_conn::get_conn() {
return _conn_fd;
}
int srt_conn::get_mode() {
return _mode;
}
std::string srt_conn::get_streamid() {
return _streamid;
}
std::string srt_conn::get_subpath() {
return _url_subpath;
}
int srt_conn::read(unsigned char* data, int len) {
int ret = 0;
ret = srt_recv(_conn_fd, (char*)data, len);
if (ret <= 0) {
srs_error("srt read error:%d, socket fd:%d", ret, _conn_fd);
return ret;
}
return ret;
}
int srt_conn::write(unsigned char* data, int len) {
int ret = 0;
ret = srt_send(_conn_fd, (char*)data, len);
if (ret <= 0) {
srs_error("srt write error:%d, socket fd:%d", ret, _conn_fd);
return ret;
}
return ret;
}

View file

@ -0,0 +1,69 @@
#ifndef SRT_CONN_H
#define SRT_CONN_H
#include <srt/srt.h>
#include <thread>
#include <memory>
#include <string>
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp>
#define PULL_SRT_MODE 0x01
#define PUSH_SRT_MODE 0x02
inline void get_streamid_info(const std::string& streamid, int& mode, std::string& url_subpash) {
std::string real_streamip;
size_t pos = streamid.find("?");
if (pos == std::string::npos) {
mode = PULL_SRT_MODE;
url_subpash = streamid;
} else {
std::string mode_str = streamid.substr(pos+1);
url_subpash = streamid.substr(0, pos);
size_t mode_pos = mode_str.find("m=");
if (mode_pos == std::string::npos) {
mode = PULL_SRT_MODE;
} else {
mode_str = mode_str.substr(mode_pos+2);
if (mode_str == "push") {
mode = PUSH_SRT_MODE;
} else {
mode = PULL_SRT_MODE;
}
}
}
}
class srt_conn {
public:
srt_conn(SRTSOCKET conn_fd, const std::string& streamid);
~srt_conn();
void close();
SRTSOCKET get_conn();
int get_mode();
std::string get_streamid();
std::string get_subpath();
std::string get_vhost();
int read(unsigned char* data, int len);
int write(unsigned char* data, int len);
void update_timestamp(long long now_ts);
long long get_last_ts();
private:
SRTSOCKET _conn_fd;
std::string _streamid;
std::string _url_subpath;
std::string _vhost;
int _mode;
long long _update_timestamp;
};
typedef std::shared_ptr<srt_conn> SRT_CONN_PTR;
#endif //SRT_CONN_H

View file

@ -0,0 +1,448 @@
#include "srt_handle.hpp"
#include "time_help.h"
#include <srt/udt.h>
#include <stdio.h>
#include <vector>
#include <sstream>
#include <iomanip>
#include <assert.h>
#include <list>
#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp>
static bool MONITOR_STATICS_ENABLE = false;
static long long MONITOR_TIMEOUT = 5000;
const unsigned int DEF_DATA_SIZE = 188*7;
const long long CHECK_ALIVE_INTERVAL = 10*1000;
const long long CHECK_ALIVE_TIMEOUT = 15*1000;
long long srt_now_ms = 0;
srt_handle::srt_handle():_last_timestamp(0)
,_last_check_alive_ts(0) {
}
srt_handle::~srt_handle() {
}
int srt_handle::start() {
_handle_pollid = srt_epoll_create();
if (_handle_pollid < -1) {
srs_error("srt handle srt_epoll_create error, _handle_pollid=%d", _handle_pollid);
return -1;
}
srs_trace("srt handle is starting...");
_work_thread_ptr = std::make_shared<std::thread>(&srt_handle::onwork, this);
return 0;
}
void srt_handle::stop() {
_work_thread_ptr->join();
return;
}
void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid) {
SRT_TRACEBSTATS mon;
srt_bstats(srtsocket, &mon, 1);
std::ostringstream output;
long long now_ul = now_ms();
if (!MONITOR_STATICS_ENABLE) {
return;
}
if (_last_timestamp == 0) {
_last_timestamp = now_ul;
return;
}
if ((now_ul - _last_timestamp) < MONITOR_TIMEOUT) {
return;
}
_last_timestamp = now_ul;
output << "======= SRT STATS: sid=" << streamid << std::endl;
output << "PACKETS SENT: " << std::setw(11) << mon.pktSent << " RECEIVED: " << std::setw(11) << mon.pktRecv << std::endl;
output << "LOST PKT SENT: " << std::setw(11) << mon.pktSndLoss << " RECEIVED: " << std::setw(11) << mon.pktRcvLoss << std::endl;
output << "REXMIT SENT: " << std::setw(11) << mon.pktRetrans << " RECEIVED: " << std::setw(11) << mon.pktRcvRetrans << std::endl;
output << "DROP PKT SENT: " << std::setw(11) << mon.pktSndDrop << " RECEIVED: " << std::setw(11) << mon.pktRcvDrop << std::endl;
output << "RATE SENDING: " << std::setw(11) << mon.mbpsSendRate << " RECEIVING: " << std::setw(11) << mon.mbpsRecvRate << std::endl;
output << "BELATED RECEIVED: " << std::setw(11) << mon.pktRcvBelated << " AVG TIME: " << std::setw(11) << mon.pktRcvAvgBelatedTime << std::endl;
output << "REORDER DISTANCE: " << std::setw(11) << mon.pktReorderDistance << std::endl;
output << "WINDOW FLOW: " << std::setw(11) << mon.pktFlowWindow << " CONGESTION: " << std::setw(11) << mon.pktCongestionWindow << " FLIGHT: " << std::setw(11) << mon.pktFlightSize << std::endl;
output << "LINK RTT: " << std::setw(9) << mon.msRTT << "ms BANDWIDTH: " << std::setw(7) << mon.mbpsBandwidth << "Mb/s " << std::endl;
output << "BUFFERLEFT: SND: " << std::setw(11) << mon.byteAvailSndBuf << " RCV: " << std::setw(11) << mon.byteAvailRcvBuf << std::endl;
srs_trace("\r\n%s", output.str().c_str());
return;
}
void srt_handle::add_new_puller(SRT_CONN_PTR conn_ptr, std::string stream_id) {
_conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
auto iter = _streamid_map.find(stream_id);
if (iter == _streamid_map.end()) {
std::unordered_map<SRTSOCKET, SRT_CONN_PTR> srtsocket_map;
srtsocket_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
_streamid_map.insert(std::make_pair(stream_id, srtsocket_map));
srs_trace("add new puller fd:%d, streamid:%s", conn_ptr->get_conn(), stream_id.c_str());
} else {
iter->second.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
srs_trace("add new puller fd:%d, streamid:%s, size:%d",
conn_ptr->get_conn(), stream_id.c_str(), iter->second.size());
}
return;
}
void srt_handle::close_pull_conn(SRTSOCKET srtsocket, std::string stream_id) {
srs_warn("close_pull_conn read_fd=%d, streamid=%s", srtsocket, stream_id.c_str());
srt_epoll_remove_usock(_handle_pollid, srtsocket);
auto streamid_iter = _streamid_map.find(stream_id);
if (streamid_iter != _streamid_map.end()) {
auto srtsocket_map = streamid_iter->second;
if (srtsocket_map.size() == 0) {
_streamid_map.erase(stream_id);
} else if (srtsocket_map.size() == 1) {
srtsocket_map.erase(srtsocket);
_streamid_map.erase(stream_id);
} else {
srtsocket_map.erase(srtsocket);
}
} else {
assert(0);
}
auto conn_iter = _conn_map.find(srtsocket);
if (conn_iter != _conn_map.end()) {
_conn_map.erase(conn_iter);
return;
} else {
assert(0);
}
return;
}
SRT_CONN_PTR srt_handle::get_srt_conn(SRTSOCKET conn_srt_socket) {
SRT_CONN_PTR ret_conn;
auto iter = _conn_map.find(conn_srt_socket);
if (iter == _conn_map.end()) {
return ret_conn;
}
ret_conn = iter->second;
return ret_conn;
}
void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) {
int val_i;
int opt_len = sizeof(int);
val_i = 1000;
srt_setsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, opt_len);
val_i = 2048;
srt_setsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i, opt_len);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, &opt_len);
srs_trace("srto SRTO_LATENCY=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_SNDBUF, &val_i, &opt_len);
srs_trace("srto SRTO_SNDBUF=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVBUF, &val_i, &opt_len);
srs_trace("srto SRTO_RCVBUF=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i, &opt_len);
srs_trace("srto SRTO_MAXBW=%d", val_i);
if (conn_ptr->get_mode() == PULL_SRT_MODE) {
add_new_puller(conn_ptr, conn_ptr->get_subpath());
} else {
if(add_new_pusher(conn_ptr) == false) {
conn_ptr->close();
return;
}
}
srs_trace("new conn added fd:%d, event:0x%08x", conn_ptr->get_conn(), events);
int ret = srt_epoll_add_usock(_handle_pollid, conn_ptr->get_conn(), &events);
if (ret < 0) {
srs_error("srt handle run add epoll error:%d", ret);
return;
}
return;
}
int srt_handle::get_srt_mode(SRTSOCKET conn_srt_socket) {
auto iter = _conn_map.find(conn_srt_socket);
if (iter == _conn_map.end()) {
return 0;
}
return iter->second->get_mode();
}
void srt_handle::insert_message_queue(request_message_t msg) {
std::unique_lock<std::mutex> lock(_queue_mutex);
_message_queue.push(msg);
}
bool srt_handle::get_message_from_queue(request_message_t& msg) {
std::unique_lock<std::mutex> lock(_queue_mutex);
bool ret = false;
if (_message_queue.empty()) {
return ret;
}
ret = true;
msg = _message_queue.front();
_message_queue.pop();
return ret;
}
void srt_handle::onwork()
{
const unsigned int SRT_FD_MAX = 1024;
SRT_SOCKSTATUS status = SRTS_INIT;
std::string streamid;
int ret;
const int64_t DEF_TIMEOUT_INTERVAL = 30;
srs_trace("srt handle epoll work is starting...");
while(true)
{
SRTSOCKET read_fds[SRT_FD_MAX];
SRTSOCKET write_fds[SRT_FD_MAX];
int rfd_num = SRT_FD_MAX;
int wfd_num = SRT_FD_MAX;
srt_now_ms = now_ms();
request_message_t msg;
if (get_message_from_queue(msg)) {
add_newconn(msg.conn_ptr, msg.events);
}
check_alive();
ret = srt_epoll_wait(_handle_pollid, read_fds, &rfd_num, write_fds, &wfd_num,
DEF_TIMEOUT_INTERVAL, 0, 0, 0, 0);
if (ret < 0) {
srs_info("srt handle epoll is timeout, ret=%d, srt_now_ms=%ld",
ret, srt_now_ms);
std::this_thread::sleep_for(std::chrono::milliseconds(30));
continue;
}
for (int index = 0; index < rfd_num; index++)
{
status = srt_getsockstate(read_fds[index]);
srs_info("srt handle read(push) rfd num:%d, status:%d, streamid:%s, read_fd",
rfd_num, status, streamid.c_str(), read_fds[index]);
handle_srt_socket(status, read_fds[index]);
}
for (int index = 0; index < wfd_num; index++)
{
status = srt_getsockstate(write_fds[index]);
streamid = UDT::getstreamid(write_fds[index]);
srs_info("srt handle write(puller) wfd num:%d, status:%d, streamid:%s, write_fd",
wfd_num, status, streamid.c_str(), write_fds[index]);
handle_srt_socket(status, write_fds[index]);
}
}
}
void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) {
SRT_CONN_PTR srt_conn_ptr;
unsigned char data[DEF_DATA_SIZE];
int ret;
srt_conn_ptr = get_srt_conn(conn_fd);
if (!srt_conn_ptr) {
srs_error("handle_push_data fd:%d fail to find srt connection.", conn_fd);
return;
}
if (status != SRTS_CONNECTED) {
srs_error("handle_push_data error status:%d fd:%d", status, conn_fd);
close_push_conn(conn_fd);
return;
}
ret = srt_conn_ptr->read(data, DEF_DATA_SIZE);
if (ret <= 0) {
srs_error("handle_push_data srt connect read error:%d, fd:%d", ret, conn_fd);
close_push_conn(conn_fd);
return;
}
srt_conn_ptr->update_timestamp(srt_now_ms);
//send data to subscriber(players)
//streamid, play map<SRTSOCKET, SRT_CONN_PTR>
auto streamid_iter = _streamid_map.find(subpath);
if (streamid_iter == _streamid_map.end()) {//no puler
srs_info("receive data size(%d) from pusher(%d) but no puller", ret, conn_fd);
return;
}
srs_info("receive data size(%d) from pusher(%d) to pullers, count:%d",
ret, conn_fd, streamid_iter->second.size());
for (auto puller_iter = streamid_iter->second.begin();
puller_iter != streamid_iter->second.end();
puller_iter++) {
auto player_conn = puller_iter->second;
if (!player_conn) {
srs_error("handle_push_data get srt connect error from fd:%d", puller_iter->first);
continue;
}
int write_ret = player_conn->write(data, ret);
srs_info("send data size(%d) to puller fd:%d", write_ret, puller_iter->first);
if (write_ret > 0) {
puller_iter->second->update_timestamp(srt_now_ms);
}
}
return;
}
void srt_handle::check_alive() {
long long diff_t;
std::list<SRT_CONN_PTR> conn_list;
if (_last_check_alive_ts == 0) {
_last_check_alive_ts = srt_now_ms;
return;
}
diff_t = srt_now_ms - _last_check_alive_ts;
if (diff_t < CHECK_ALIVE_INTERVAL) {
return;
}
for (auto conn_iter = _conn_map.begin();
conn_iter != _conn_map.end();
conn_iter++)
{
long long timeout = srt_now_ms - conn_iter->second->get_last_ts();
if (timeout > CHECK_ALIVE_TIMEOUT) {
conn_list.push_back(conn_iter->second);
}
}
for (auto del_iter = conn_list.begin();
del_iter != conn_list.end();
del_iter++)
{
SRT_CONN_PTR conn_ptr = *del_iter;
if (conn_ptr->get_mode() == PUSH_SRT_MODE) {
srs_warn("check alive close pull connection fd:%d, streamid:%s",
conn_ptr->get_conn(), conn_ptr->get_subpath().c_str());
close_push_conn(conn_ptr->get_conn());
} else if (conn_ptr->get_mode() == PULL_SRT_MODE) {
srs_warn("check alive close pull connection fd:%d, streamid:%s",
conn_ptr->get_conn(), conn_ptr->get_subpath().c_str());
close_pull_conn(conn_ptr->get_conn(), conn_ptr->get_subpath());
} else {
srs_error("check_alive get unkown srt mode:%d, fd:%d",
conn_ptr->get_mode(), conn_ptr->get_conn());
assert(0);
}
}
}
void srt_handle::close_push_conn(SRTSOCKET srtsocket) {
auto iter = _conn_map.find(srtsocket);
if (iter == _conn_map.end()) {
return;
}
srt_epoll_remove_usock(_handle_pollid, srtsocket);
_conn_map.erase(iter);
}
bool srt_handle::add_new_pusher(SRT_CONN_PTR conn_ptr) {
_conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
srs_trace("srt_handle add new pusher streamid:%s, subpath:%s",
conn_ptr->get_streamid().c_str(), conn_ptr->get_subpath().c_str());
return true;
}
void srt_handle::handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) {
srs_info("handle_pull_data status:%d, subpath:%s, fd:%d",
status, subpath.c_str(), conn_fd);
auto conn_ptr = get_srt_conn(conn_fd);
if (!conn_ptr) {
srs_error("handle_pull_data fail to find fd(%d)", conn_fd);
assert(0);
return;
}
conn_ptr->update_timestamp(srt_now_ms);
}
void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd)
{
std::string subpath;
int mode;
auto conn_ptr = get_srt_conn(conn_fd);
if (!conn_ptr) {
if (status != SRTS_CLOSED) {
srs_error("handle_srt_socket find srt connection error, fd:%d, status:%d",
conn_fd, status);
}
return;
}
get_streamid_info(conn_ptr->get_streamid(), mode, subpath);
if (mode == PUSH_SRT_MODE) {
switch (status)
{
case SRTS_CONNECTED:
{
handle_push_data(status, subpath, conn_fd);
break;
}
case SRTS_BROKEN:
{
srs_warn("srt push disconnected event fd:%d, streamid:%s",
conn_fd, conn_ptr->get_streamid().c_str());
close_push_conn(conn_fd);
break;
}
default:
srs_error("push mode unkown status:%d, fd:%d", status, conn_fd);
break;
}
} else if (mode == PULL_SRT_MODE) {
switch (status)
{
case SRTS_CONNECTED:
{
handle_pull_data(status, subpath, conn_fd);
break;
}
case SRTS_BROKEN:
{
srs_warn("srt pull disconnected fd:%d, streamid:%s",
conn_fd, conn_ptr->get_streamid().c_str());
close_pull_conn(conn_fd, subpath);
break;
}
default:
srs_error("pull mode unkown status:%d, fd:%d", status, conn_fd);
break;
}
} else {
assert(0);
}
return;
}

View file

@ -0,0 +1,76 @@
#ifndef SRT_HANDLE_H
#define SRT_HANDLE_H
#include <srt/srt.h>
#include <thread>
#include <memory>
#include <unordered_map>
#include <queue>
#include <string.h>
#include <mutex>
#include "srt_conn.hpp"
typedef struct {
SRT_CONN_PTR conn_ptr;
int events;
} request_message_t;
class srt_handle {
public:
srt_handle();
~srt_handle();
int start();//create srt epoll and create epoll thread
void stop();//close srt epoll and end epoll thread
void insert_message_queue(request_message_t msg);
bool get_message_from_queue(request_message_t& msg);
private:
//add new srt connection into epoll event
void add_newconn(SRT_CONN_PTR conn_ptr, int events);
//get srt conn object by srt socket
SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket);
//get srt connect mode: push or pull
int get_srt_mode(SRTSOCKET conn_srt_socket);
void onwork();//epoll thread loop
//handle recv/send srt socket
void handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd);
void handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd);
void handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd);
//add new puller into puller list and conn_map
void add_new_puller(SRT_CONN_PTR, std::string stream_id);
//remove pull srt from play list
void close_pull_conn(SRTSOCKET srtsocket, std::string stream_id);
//add new pusher into pusher map: <socket fd, pusher conn ptr>
bool add_new_pusher(SRT_CONN_PTR conn_ptr);
//remove push connection and remove epoll
void close_push_conn(SRTSOCKET srtsocket);
//check srt connection whether it's still alive.
void check_alive();
//debug statics
void debug_statics(SRTSOCKET srtsocket, const std::string& streamid);
private:
int _handle_pollid;
std::unordered_map<SRTSOCKET, SRT_CONN_PTR> _conn_map;
std::shared_ptr<std::thread> _work_thread_ptr;
//streamid, play map<SRTSOCKET, SRT_CONN_PTR>
std::unordered_map<std::string, std::unordered_map<SRTSOCKET, SRT_CONN_PTR>> _streamid_map;
std::mutex _queue_mutex;
std::queue<request_message_t> _message_queue;
long long _last_timestamp;
long long _last_check_alive_ts;
};
#endif //SRT_HANDLE_H

View file

@ -1,13 +1,24 @@
#include "srt_server.hpp" #include "srt_server.hpp"
#include "srt_handle.hpp"
#include <srt/udt.h>
#include <thread> #include <thread>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdexcept>
#include <srs_kernel_log.hpp> #include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp> #include <srs_kernel_error.hpp>
#include <srs_app_rtmp_conn.hpp> #include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp> #include <srs_app_config.hpp>
srt_server::srt_server(unsigned short port):listen_port(port) srt_server::srt_server(unsigned short port):listen_port(port)
,server_socket(-1)
{ {
handle_ptr = std::make_shared<srt_handle>();
} }
srt_server::~srt_server() srt_server::~srt_server()
@ -15,8 +26,65 @@ srt_server::~srt_server()
} }
int srt_server::init_srt() {
if (server_socket != -1) {
return -1;
}
server_socket = srt_create_socket();
sockaddr_in sa;
memset(&sa, 0, sizeof sa);
sa.sin_family = AF_INET;
sa.sin_port = htons(listen_port);
sockaddr* psa = (sockaddr*)&sa;
int ret = srt_bind(server_socket, psa, sizeof(sa));
if ( ret == SRT_ERROR )
{
srt_close(server_socket);
srs_error("srt bind error: %d", ret);
return -1;
}
ret = srt_listen(server_socket, 5);
if (ret == SRT_ERROR)
{
srt_close(server_socket);
srs_error("srt listen error: %d", ret);
return -2;
}
_pollid = srt_epoll_create();
if (_pollid < -1) {
srs_error("srt server srt_epoll_create error, port=%d", listen_port);
return -1;
}
int events = SRT_EPOLL_IN | SRT_EPOLL_ERR;
ret = srt_epoll_add_usock(_pollid, server_socket, &events);
if (ret < 0) {
srs_error("srt server run add epoll error:%d", ret);
return ret;
}
srs_trace("srt server listen port=%d, server_fd=%d", listen_port, server_socket);
return 0;
}
int srt_server::start() int srt_server::start()
{ {
int ret;
if ((ret = init_srt()) < 0) {
return ret;
}
ret = handle_ptr->start();
if (ret < 0) {
return ret;
}
run_flag = true; run_flag = true;
srs_trace("srt server is starting... port(%d)", listen_port); srs_trace("srt server is starting... port(%d)", listen_port);
thread_run_ptr = std::make_shared<std::thread>(&srt_server::on_work, this); thread_run_ptr = std::make_shared<std::thread>(&srt_server::on_work, this);
@ -30,14 +98,96 @@ void srt_server::stop()
return; return;
} }
thread_run_ptr->join(); thread_run_ptr->join();
handle_ptr->stop();
return; return;
} }
void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr) {
SRTSOCKET conn_fd = -1;
sockaddr_in scl;
int sclen = sizeof(scl);
int conn_event;// = SRT_EPOLL_IN |SRT_EPOLL_OUT| SRT_EPOLL_ERR;
switch(status) {
case SRTS_LISTENING:
{
conn_fd = srt_accept(input_fd, (sockaddr*)&scl, &sclen);
if (conn_fd == -1) {
return;
}
//add new srt connect into srt handle
std::string streamid = UDT::getstreamid(conn_fd);
SRT_CONN_PTR srt_conn_ptr = std::make_shared<srt_conn>(conn_fd, streamid);
std::string vhost_str = srt_conn_ptr->get_vhost();
srs_trace("new srt connection streamid:%s, fd:%d, vhost:%s",
streamid.c_str(), conn_fd, vhost_str.c_str());
SrsConfDirective* vhost_p = _srs_config->get_vhost(vhost_str, true);
if (!vhost_p) {
srs_trace("srt streamid(%s): no vhost %s, fd:%d",
streamid.c_str(), vhost_str.c_str(), conn_fd);
srt_conn_ptr->close();
return;
}
if (srt_conn_ptr->get_mode() == PULL_SRT_MODE) {
//add SRT_EPOLL_IN for information notify
conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR;//not inlucde SRT_EPOLL_OUT for save cpu
} else {
conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR;
}
request_message_t msg = {srt_conn_ptr, conn_event};
handle_ptr->insert_message_queue(msg);
break;
}
case SRTS_CONNECTED:
{
srs_trace("srt connected: socket=%d, mode:%s", input_fd, dscr.c_str());
break;
}
case SRTS_BROKEN:
{
srt_epoll_remove_usock(_pollid, input_fd);
srt_close(input_fd);
srs_warn("srt close: socket=%d", input_fd);
break;
}
default:
{
srs_error("srt server unkown status:%d", status);
}
}
}
void srt_server::on_work() void srt_server::on_work()
{ {
const unsigned int SRT_FD_MAX = 100;
srs_trace("srt server is working port(%d)", listen_port); srs_trace("srt server is working port(%d)", listen_port);
while (run_flag) while (run_flag)
{ {
std::this_thread::sleep_for(std::chrono::milliseconds(10)); SRTSOCKET read_fds[SRT_FD_MAX];
SRTSOCKET write_fds[SRT_FD_MAX];
int rfd_num = SRT_FD_MAX;
int wfd_num = SRT_FD_MAX;
int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds, &wfd_num, -1,
nullptr, nullptr, nullptr, nullptr);
if (ret < 0) {
srs_error("listen srt epoll is timeout, port=%d", listen_port);
continue;
}
srs_trace("srt server epoll get: ret=%d, rfd_num=%d, wfd_num=%d",
ret, rfd_num, wfd_num);
for (int index = 0; index < rfd_num; index++) {
SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);
srt_handle_connection(status, read_fds[index], "read fd");
}
for (int index = 0; index < wfd_num; index++) {
SRT_SOCKSTATUS status = srt_getsockstate(write_fds[index]);
srt_handle_connection(status, read_fds[index], "write fd");
}
} }
} }

View file

@ -1,23 +1,35 @@
#ifndef SRT_SERVER_H #ifndef SRT_SERVER_H
#define SRT_SERVER_H #define SRT_SERVER_H
#include <srt/srt.h>
#include <thread> #include <thread>
#include <memory> #include <memory>
class srt_handle;
class srt_server { class srt_server {
public: public:
srt_server(unsigned short port); srt_server(unsigned short port);
~srt_server(); ~srt_server();
int start(); int start();//init srt handl and create srt main thread loop
void stop(); void stop();//stop srt main thread loop
private: private:
//init srt socket and srt epoll
int init_srt();
//srt main epoll loop
void on_work(); void on_work();
//accept new srt connection
void srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr);
private: private:
unsigned short listen_port; unsigned short listen_port;
SRTSOCKET server_socket;
int _pollid;
bool run_flag; bool run_flag;
std::shared_ptr<std::thread> thread_run_ptr; std::shared_ptr<std::thread> thread_run_ptr;
std::shared_ptr<srt_handle> handle_ptr;
}; };
typedef std::shared_ptr<srt_server> SRT_SERVER_PTR; typedef std::shared_ptr<srt_server> SRT_SERVER_PTR;

View file

@ -0,0 +1,39 @@
#ifndef STRING_EX_H
#define STRING_EX_H
#include <iostream>
#include <string.h>
#include <vector>
#include <algorithm>
#include <iterator>
#include <cctype>
inline int string_split(const std::string& input_str, const std::string& split_str, std::vector<std::string>& output_vec) {
if (input_str.length() == 0) {
return 0;
}
std::string tempString(input_str);
do {
size_t pos = tempString.find(split_str);
if (pos == tempString.npos) {
output_vec.push_back(tempString);
break;
}
std::string seg_str = tempString.substr(0, pos);
tempString = tempString.substr(pos+split_str.size());
output_vec.push_back(seg_str);
} while(tempString.size() > 0);
return output_vec.size();
}
inline std::string string_lower(const std::string input_str) {
std::string output_str(input_str);
std::transform(input_str.begin(), input_str.end(), output_str.begin(), ::tolower);
return output_str;
}
#endif//STRING_EX_H

10
trunk/src/srt/time_help.h Normal file
View file

@ -0,0 +1,10 @@
#ifndef TIME_HELP_H
#define TIME_HELP_H
#include <chrono>
inline long long now_ms() {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
}
#endif //TIME_HELP_H