diff --git a/.gitignore b/.gitignore index 6216052aa..f869a3544 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,8 @@ *.pyc *.swp .DS_Store +.vscode +.vscode/* /trunk/Makefile /trunk/objs /trunk/src/build-qt-Desktop-Debug diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000..049520f83 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,75 @@ +{ + "files.associations": { + "string": "cpp", + "memory": "cpp", + "__bit_reference": "cpp", + "__config": "cpp", + "__debug": "cpp", + "__errc": "cpp", + "__functional_base": "cpp", + "__hash_table": "cpp", + "__locale": "cpp", + "__mutex_base": "cpp", + "__node_handle": "cpp", + "__nullptr": "cpp", + "__split_buffer": "cpp", + "__string": "cpp", + "__threading_support": "cpp", + "__tree": "cpp", + "__tuple": "cpp", + "algorithm": "cpp", + "array": "cpp", + "atomic": "cpp", + "bit": "cpp", + "bitset": "cpp", + "cctype": "cpp", + "chrono": "cpp", + "cmath": "cpp", + "complex": "cpp", + "cstdarg": "cpp", + "cstddef": "cpp", + "cstdint": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "cstring": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "cwctype": "cpp", + "deque": "cpp", + "exception": "cpp", + "fstream": "cpp", + "functional": "cpp", + "initializer_list": "cpp", + "iomanip": "cpp", + "ios": "cpp", + "iosfwd": "cpp", + "iostream": "cpp", + "istream": "cpp", + "iterator": "cpp", + "limits": "cpp", + "list": "cpp", + "locale": "cpp", + "map": "cpp", + "mutex": "cpp", + "new": "cpp", + "optional": "cpp", + "ostream": "cpp", + "ratio": "cpp", + "set": "cpp", + "sstream": "cpp", + "stack": "cpp", + "stdexcept": "cpp", + "streambuf": "cpp", + "string_view": "cpp", + "system_error": "cpp", + "thread": "cpp", + "tuple": "cpp", + "type_traits": "cpp", + "typeinfo": "cpp", + "unordered_map": "cpp", + "utility": "cpp", + "vector": "cpp", + "hash_map": "cpp", + "hash_set": "cpp" + } +} \ No newline at end of file diff --git a/trunk/conf/srs.conf b/trunk/conf/srs.conf index a12cddcde..297292b1d 100644 --- a/trunk/conf/srs.conf +++ b/trunk/conf/srs.conf @@ -5,6 +5,8 @@ listen 1935; max_connections 1000; srs_log_tank file; srs_log_file ./objs/srs.log; +daemon off; + http_api { enabled on; listen 1985; @@ -24,9 +26,12 @@ stats { network 0; disk sda sdb xvda xvdb; } -vhost __defaultVhost__ { - forward { - enabled on; - destination 172.16.43.153:19350; - } +#vhost __defaultVhost__ { +# forward { +# enabled on; +# destination 172.16.43.153:19350; +# } +#} + +vhost sina.mobile.com.cn { } diff --git a/trunk/configure b/trunk/configure index e3e31ef61..5cfcc9151 100755 --- a/trunk/configure +++ b/trunk/configure @@ -133,8 +133,7 @@ ARFLAGS = -rs LINK = g++ CXXFLAGS = ${CXXFLAGS} -.PHONY: default srs srs_ingest_hls librtmp - +.PHONY: default srs librtmp default: END @@ -160,7 +159,7 @@ if [ $SRS_GPERF_MD = YES ]; then LibGperfFile="${SRS_OBJS_DIR}/gperf/lib/libtcmalloc_debug.a"; fi # the link options, always use static link -SrsLinkOptions="-pthread -ldl"; +SrsLinkOptions="-pthread -ldl -lsrt"; if [[ $SRS_SSL == YES && $SRS_USE_SYS_SSL == YES ]]; then SrsLinkOptions="${SrsLinkOptions} -lssl -lcrypto"; fi @@ -212,7 +211,7 @@ PROTOCOL_OBJS="${MODULE_OBJS[@]}" MODULE_ID="SRT" MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE" "APP") ModuleLibIncs=(${SRS_OBJS_DIR}) -MODULE_FILES=("srt_server") +MODULE_FILES=("srt_server" "srt_handle" "srt_conn") SRT_INCS="src/srt"; MODULE_DIR=${SRT_INCS} . auto/modules.sh SRT_OBJS="${MODULE_OBJS[@]}" # @@ -355,7 +354,7 @@ mv ${SRS_WORKDIR}/${SRS_MAKEFILE} ${SRS_WORKDIR}/${SRS_MAKEFILE}.bk # generate phony header cat << END > ${SRS_WORKDIR}/${SRS_MAKEFILE} -.PHONY: default _default install install-api help clean server srs_ingest_hls librtmp utest _prepare_dir $__mphonys +.PHONY: default _default install install-api help clean server librtmp _prepare_dir $__mphonys # install prefix. SRS_PREFIX=${SRS_PREFIX} @@ -370,7 +369,7 @@ END # the server, librtmp and utest # where the bellow will check and disable some entry by only echo. cat << END >> ${SRS_WORKDIR}/${SRS_MAKEFILE} -_default: server srs_ingest_hls librtmp utest __modules $__mdefaults +_default: server librtmp __modules $__mdefaults @bash objs/_srs_build_summary.sh help: diff --git a/trunk/src/srt/srt_conn.cpp b/trunk/src/srt/srt_conn.cpp new file mode 100644 index 000000000..ab4d0584a --- /dev/null +++ b/trunk/src/srt/srt_conn.cpp @@ -0,0 +1,82 @@ +#include "srt_conn.hpp" +#include "time_help.h" +#include "stringex.hpp" +#include + +srt_conn::srt_conn(SRTSOCKET conn_fd, const std::string& streamid):_conn_fd(conn_fd), + _streamid(streamid) { + get_streamid_info(streamid, _mode, _url_subpath); + _update_timestamp = now_ms(); + + std::vector path_vec; + + string_split(_url_subpath, "/", path_vec); + if (path_vec.size() >= 3) { + _vhost = path_vec[0]; + } else { + _vhost = "__default_host__"; + } + srs_trace("srt connect construct streamid:%s, mode:%d, subpath:%s, vhost:%s", + streamid.c_str(), _mode, _url_subpath.c_str(), _vhost.c_str()); +} + +srt_conn::~srt_conn() { + close(); +} + +std::string srt_conn::get_vhost() { + return _vhost; +} + +void srt_conn::update_timestamp(long long now_ts) { + _update_timestamp = now_ts; +} + +long long srt_conn::get_last_ts() { + return _update_timestamp; +} + +void srt_conn::close() { + if (_conn_fd == SRT_INVALID_SOCK) { + return; + } + srt_close(_conn_fd); + _conn_fd = SRT_INVALID_SOCK; +} + +SRTSOCKET srt_conn::get_conn() { + return _conn_fd; +} +int srt_conn::get_mode() { + return _mode; +} + +std::string srt_conn::get_streamid() { + return _streamid; +} + +std::string srt_conn::get_subpath() { + return _url_subpath; +} + +int srt_conn::read(unsigned char* data, int len) { + int ret = 0; + + ret = srt_recv(_conn_fd, (char*)data, len); + if (ret <= 0) { + srs_error("srt read error:%d, socket fd:%d", ret, _conn_fd); + return ret; + } + return ret; +} + +int srt_conn::write(unsigned char* data, int len) { + int ret = 0; + + ret = srt_send(_conn_fd, (char*)data, len); + if (ret <= 0) { + srs_error("srt write error:%d, socket fd:%d", ret, _conn_fd); + return ret; + } + return ret; +} \ No newline at end of file diff --git a/trunk/src/srt/srt_conn.hpp b/trunk/src/srt/srt_conn.hpp new file mode 100644 index 000000000..91d79c06c --- /dev/null +++ b/trunk/src/srt/srt_conn.hpp @@ -0,0 +1,69 @@ +#ifndef SRT_CONN_H +#define SRT_CONN_H +#include +#include +#include +#include +#include +#include +#include +#include + +#define PULL_SRT_MODE 0x01 +#define PUSH_SRT_MODE 0x02 + +inline void get_streamid_info(const std::string& streamid, int& mode, std::string& url_subpash) { + std::string real_streamip; + + size_t pos = streamid.find("?"); + if (pos == std::string::npos) { + mode = PULL_SRT_MODE; + url_subpash = streamid; + } else { + std::string mode_str = streamid.substr(pos+1); + + url_subpash = streamid.substr(0, pos); + + size_t mode_pos = mode_str.find("m="); + if (mode_pos == std::string::npos) { + mode = PULL_SRT_MODE; + } else { + mode_str = mode_str.substr(mode_pos+2); + if (mode_str == "push") { + mode = PUSH_SRT_MODE; + } else { + mode = PULL_SRT_MODE; + } + } + } +} + +class srt_conn { +public: + srt_conn(SRTSOCKET conn_fd, const std::string& streamid); + ~srt_conn(); + + void close(); + SRTSOCKET get_conn(); + int get_mode(); + std::string get_streamid(); + std::string get_subpath(); + std::string get_vhost(); + int read(unsigned char* data, int len); + int write(unsigned char* data, int len); + + void update_timestamp(long long now_ts); + long long get_last_ts(); + +private: + SRTSOCKET _conn_fd; + std::string _streamid; + std::string _url_subpath; + std::string _vhost; + int _mode; + long long _update_timestamp; +}; + +typedef std::shared_ptr SRT_CONN_PTR; + +#endif //SRT_CONN_H \ No newline at end of file diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp new file mode 100644 index 000000000..09c28267a --- /dev/null +++ b/trunk/src/srt/srt_handle.cpp @@ -0,0 +1,448 @@ + +#include "srt_handle.hpp" +#include "time_help.h" +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +static bool MONITOR_STATICS_ENABLE = false; +static long long MONITOR_TIMEOUT = 5000; +const unsigned int DEF_DATA_SIZE = 188*7; +const long long CHECK_ALIVE_INTERVAL = 10*1000; +const long long CHECK_ALIVE_TIMEOUT = 15*1000; + +long long srt_now_ms = 0; + +srt_handle::srt_handle():_last_timestamp(0) + ,_last_check_alive_ts(0) { + +} + +srt_handle::~srt_handle() { + +} + +int srt_handle::start() { + _handle_pollid = srt_epoll_create(); + if (_handle_pollid < -1) { + srs_error("srt handle srt_epoll_create error, _handle_pollid=%d", _handle_pollid); + return -1; + } + + srs_trace("srt handle is starting..."); + _work_thread_ptr = std::make_shared(&srt_handle::onwork, this); + + return 0; +} + +void srt_handle::stop() { + _work_thread_ptr->join(); + return; +} + +void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid) { + SRT_TRACEBSTATS mon; + srt_bstats(srtsocket, &mon, 1); + std::ostringstream output; + long long now_ul = now_ms(); + + if (!MONITOR_STATICS_ENABLE) { + return; + } + if (_last_timestamp == 0) { + _last_timestamp = now_ul; + return; + } + + if ((now_ul - _last_timestamp) < MONITOR_TIMEOUT) { + return; + } + _last_timestamp = now_ul; + output << "======= SRT STATS: sid=" << streamid << std::endl; + output << "PACKETS SENT: " << std::setw(11) << mon.pktSent << " RECEIVED: " << std::setw(11) << mon.pktRecv << std::endl; + output << "LOST PKT SENT: " << std::setw(11) << mon.pktSndLoss << " RECEIVED: " << std::setw(11) << mon.pktRcvLoss << std::endl; + output << "REXMIT SENT: " << std::setw(11) << mon.pktRetrans << " RECEIVED: " << std::setw(11) << mon.pktRcvRetrans << std::endl; + output << "DROP PKT SENT: " << std::setw(11) << mon.pktSndDrop << " RECEIVED: " << std::setw(11) << mon.pktRcvDrop << std::endl; + output << "RATE SENDING: " << std::setw(11) << mon.mbpsSendRate << " RECEIVING: " << std::setw(11) << mon.mbpsRecvRate << std::endl; + output << "BELATED RECEIVED: " << std::setw(11) << mon.pktRcvBelated << " AVG TIME: " << std::setw(11) << mon.pktRcvAvgBelatedTime << std::endl; + output << "REORDER DISTANCE: " << std::setw(11) << mon.pktReorderDistance << std::endl; + output << "WINDOW FLOW: " << std::setw(11) << mon.pktFlowWindow << " CONGESTION: " << std::setw(11) << mon.pktCongestionWindow << " FLIGHT: " << std::setw(11) << mon.pktFlightSize << std::endl; + output << "LINK RTT: " << std::setw(9) << mon.msRTT << "ms BANDWIDTH: " << std::setw(7) << mon.mbpsBandwidth << "Mb/s " << std::endl; + output << "BUFFERLEFT: SND: " << std::setw(11) << mon.byteAvailSndBuf << " RCV: " << std::setw(11) << mon.byteAvailRcvBuf << std::endl; + + srs_trace("\r\n%s", output.str().c_str()); + return; +} + +void srt_handle::add_new_puller(SRT_CONN_PTR conn_ptr, std::string stream_id) { + _conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); + + auto iter = _streamid_map.find(stream_id); + if (iter == _streamid_map.end()) { + std::unordered_map srtsocket_map; + srtsocket_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); + + _streamid_map.insert(std::make_pair(stream_id, srtsocket_map)); + srs_trace("add new puller fd:%d, streamid:%s", conn_ptr->get_conn(), stream_id.c_str()); + } else { + iter->second.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); + srs_trace("add new puller fd:%d, streamid:%s, size:%d", + conn_ptr->get_conn(), stream_id.c_str(), iter->second.size()); + } + + return; +} + +void srt_handle::close_pull_conn(SRTSOCKET srtsocket, std::string stream_id) { + srs_warn("close_pull_conn read_fd=%d, streamid=%s", srtsocket, stream_id.c_str()); + srt_epoll_remove_usock(_handle_pollid, srtsocket); + + auto streamid_iter = _streamid_map.find(stream_id); + if (streamid_iter != _streamid_map.end()) { + auto srtsocket_map = streamid_iter->second; + if (srtsocket_map.size() == 0) { + _streamid_map.erase(stream_id); + } else if (srtsocket_map.size() == 1) { + srtsocket_map.erase(srtsocket); + _streamid_map.erase(stream_id); + } else { + srtsocket_map.erase(srtsocket); + } + } else { + assert(0); + } + + auto conn_iter = _conn_map.find(srtsocket); + if (conn_iter != _conn_map.end()) { + _conn_map.erase(conn_iter); + return; + } else { + assert(0); + } + + return; +} + +SRT_CONN_PTR srt_handle::get_srt_conn(SRTSOCKET conn_srt_socket) { + SRT_CONN_PTR ret_conn; + + auto iter = _conn_map.find(conn_srt_socket); + if (iter == _conn_map.end()) { + return ret_conn; + } + + ret_conn = iter->second; + + return ret_conn; +} + +void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) { + int val_i; + int opt_len = sizeof(int); + + val_i = 1000; + srt_setsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, opt_len); + val_i = 2048; + srt_setsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i, opt_len); + + srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, &opt_len); + srs_trace("srto SRTO_LATENCY=%d", val_i); + srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_SNDBUF, &val_i, &opt_len); + srs_trace("srto SRTO_SNDBUF=%d", val_i); + srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVBUF, &val_i, &opt_len); + srs_trace("srto SRTO_RCVBUF=%d", val_i); + srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i, &opt_len); + srs_trace("srto SRTO_MAXBW=%d", val_i); + + if (conn_ptr->get_mode() == PULL_SRT_MODE) { + add_new_puller(conn_ptr, conn_ptr->get_subpath()); + } else { + if(add_new_pusher(conn_ptr) == false) { + conn_ptr->close(); + return; + } + } + srs_trace("new conn added fd:%d, event:0x%08x", conn_ptr->get_conn(), events); + int ret = srt_epoll_add_usock(_handle_pollid, conn_ptr->get_conn(), &events); + if (ret < 0) { + srs_error("srt handle run add epoll error:%d", ret); + return; + } + + return; +} + +int srt_handle::get_srt_mode(SRTSOCKET conn_srt_socket) { + auto iter = _conn_map.find(conn_srt_socket); + if (iter == _conn_map.end()) { + return 0; + } + return iter->second->get_mode(); +} + +void srt_handle::insert_message_queue(request_message_t msg) { + std::unique_lock lock(_queue_mutex); + _message_queue.push(msg); +} + +bool srt_handle::get_message_from_queue(request_message_t& msg) { + std::unique_lock lock(_queue_mutex); + bool ret = false; + + if (_message_queue.empty()) { + return ret; + } + ret = true; + msg = _message_queue.front(); + _message_queue.pop(); + + return ret; +} + +void srt_handle::onwork() +{ + const unsigned int SRT_FD_MAX = 1024; + SRT_SOCKSTATUS status = SRTS_INIT; + std::string streamid; + int ret; + const int64_t DEF_TIMEOUT_INTERVAL = 30; + + srs_trace("srt handle epoll work is starting..."); + while(true) + { + SRTSOCKET read_fds[SRT_FD_MAX]; + SRTSOCKET write_fds[SRT_FD_MAX]; + int rfd_num = SRT_FD_MAX; + int wfd_num = SRT_FD_MAX; + + srt_now_ms = now_ms(); + + request_message_t msg; + + if (get_message_from_queue(msg)) { + add_newconn(msg.conn_ptr, msg.events); + } + + check_alive(); + + ret = srt_epoll_wait(_handle_pollid, read_fds, &rfd_num, write_fds, &wfd_num, + DEF_TIMEOUT_INTERVAL, 0, 0, 0, 0); + if (ret < 0) { + srs_info("srt handle epoll is timeout, ret=%d, srt_now_ms=%ld", + ret, srt_now_ms); + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + continue; + } + + for (int index = 0; index < rfd_num; index++) + { + status = srt_getsockstate(read_fds[index]); + srs_info("srt handle read(push) rfd num:%d, status:%d, streamid:%s, read_fd", + rfd_num, status, streamid.c_str(), read_fds[index]); + handle_srt_socket(status, read_fds[index]); + } + + for (int index = 0; index < wfd_num; index++) + { + status = srt_getsockstate(write_fds[index]); + streamid = UDT::getstreamid(write_fds[index]); + srs_info("srt handle write(puller) wfd num:%d, status:%d, streamid:%s, write_fd", + wfd_num, status, streamid.c_str(), write_fds[index]); + handle_srt_socket(status, write_fds[index]); + } + + } +} + +void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) { + SRT_CONN_PTR srt_conn_ptr; + unsigned char data[DEF_DATA_SIZE]; + int ret; + srt_conn_ptr = get_srt_conn(conn_fd); + + if (!srt_conn_ptr) { + srs_error("handle_push_data fd:%d fail to find srt connection.", conn_fd); + return; + } + + if (status != SRTS_CONNECTED) { + srs_error("handle_push_data error status:%d fd:%d", status, conn_fd); + close_push_conn(conn_fd); + return; + } + + ret = srt_conn_ptr->read(data, DEF_DATA_SIZE); + if (ret <= 0) { + srs_error("handle_push_data srt connect read error:%d, fd:%d", ret, conn_fd); + close_push_conn(conn_fd); + return; + } + srt_conn_ptr->update_timestamp(srt_now_ms); + + //send data to subscriber(players) + //streamid, play map + auto streamid_iter = _streamid_map.find(subpath); + if (streamid_iter == _streamid_map.end()) {//no puler + srs_info("receive data size(%d) from pusher(%d) but no puller", ret, conn_fd); + return; + } + srs_info("receive data size(%d) from pusher(%d) to pullers, count:%d", + ret, conn_fd, streamid_iter->second.size()); + + for (auto puller_iter = streamid_iter->second.begin(); + puller_iter != streamid_iter->second.end(); + puller_iter++) { + auto player_conn = puller_iter->second; + if (!player_conn) { + srs_error("handle_push_data get srt connect error from fd:%d", puller_iter->first); + continue; + } + int write_ret = player_conn->write(data, ret); + srs_info("send data size(%d) to puller fd:%d", write_ret, puller_iter->first); + if (write_ret > 0) { + puller_iter->second->update_timestamp(srt_now_ms); + } + } + + return; +} + +void srt_handle::check_alive() { + long long diff_t; + std::list conn_list; + + if (_last_check_alive_ts == 0) { + _last_check_alive_ts = srt_now_ms; + return; + } + diff_t = srt_now_ms - _last_check_alive_ts; + if (diff_t < CHECK_ALIVE_INTERVAL) { + return; + } + + for (auto conn_iter = _conn_map.begin(); + conn_iter != _conn_map.end(); + conn_iter++) + { + long long timeout = srt_now_ms - conn_iter->second->get_last_ts(); + if (timeout > CHECK_ALIVE_TIMEOUT) { + conn_list.push_back(conn_iter->second); + } + } + + for (auto del_iter = conn_list.begin(); + del_iter != conn_list.end(); + del_iter++) + { + SRT_CONN_PTR conn_ptr = *del_iter; + if (conn_ptr->get_mode() == PUSH_SRT_MODE) { + srs_warn("check alive close pull connection fd:%d, streamid:%s", + conn_ptr->get_conn(), conn_ptr->get_subpath().c_str()); + close_push_conn(conn_ptr->get_conn()); + } else if (conn_ptr->get_mode() == PULL_SRT_MODE) { + srs_warn("check alive close pull connection fd:%d, streamid:%s", + conn_ptr->get_conn(), conn_ptr->get_subpath().c_str()); + close_pull_conn(conn_ptr->get_conn(), conn_ptr->get_subpath()); + } else { + srs_error("check_alive get unkown srt mode:%d, fd:%d", + conn_ptr->get_mode(), conn_ptr->get_conn()); + assert(0); + } + } +} + +void srt_handle::close_push_conn(SRTSOCKET srtsocket) { + auto iter = _conn_map.find(srtsocket); + if (iter == _conn_map.end()) { + return; + } + srt_epoll_remove_usock(_handle_pollid, srtsocket); + _conn_map.erase(iter); +} + +bool srt_handle::add_new_pusher(SRT_CONN_PTR conn_ptr) { + _conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); + srs_trace("srt_handle add new pusher streamid:%s, subpath:%s", + conn_ptr->get_streamid().c_str(), conn_ptr->get_subpath().c_str()); + return true; +} + +void srt_handle::handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) { + srs_info("handle_pull_data status:%d, subpath:%s, fd:%d", + status, subpath.c_str(), conn_fd); + auto conn_ptr = get_srt_conn(conn_fd); + if (!conn_ptr) { + srs_error("handle_pull_data fail to find fd(%d)", conn_fd); + assert(0); + return; + } + conn_ptr->update_timestamp(srt_now_ms); +} + +void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd) +{ + std::string subpath; + int mode; + auto conn_ptr = get_srt_conn(conn_fd); + + if (!conn_ptr) { + if (status != SRTS_CLOSED) { + srs_error("handle_srt_socket find srt connection error, fd:%d, status:%d", + conn_fd, status); + } + return; + } + get_streamid_info(conn_ptr->get_streamid(), mode, subpath); + + if (mode == PUSH_SRT_MODE) { + switch (status) + { + case SRTS_CONNECTED: + { + handle_push_data(status, subpath, conn_fd); + break; + } + case SRTS_BROKEN: + { + srs_warn("srt push disconnected event fd:%d, streamid:%s", + conn_fd, conn_ptr->get_streamid().c_str()); + close_push_conn(conn_fd); + break; + } + default: + srs_error("push mode unkown status:%d, fd:%d", status, conn_fd); + break; + } + } else if (mode == PULL_SRT_MODE) { + switch (status) + { + case SRTS_CONNECTED: + { + handle_pull_data(status, subpath, conn_fd); + break; + } + case SRTS_BROKEN: + { + srs_warn("srt pull disconnected fd:%d, streamid:%s", + conn_fd, conn_ptr->get_streamid().c_str()); + close_pull_conn(conn_fd, subpath); + break; + } + default: + srs_error("pull mode unkown status:%d, fd:%d", status, conn_fd); + break; + } + } else { + assert(0); + } + return; +} \ No newline at end of file diff --git a/trunk/src/srt/srt_handle.hpp b/trunk/src/srt/srt_handle.hpp new file mode 100644 index 000000000..3185b4678 --- /dev/null +++ b/trunk/src/srt/srt_handle.hpp @@ -0,0 +1,76 @@ +#ifndef SRT_HANDLE_H +#define SRT_HANDLE_H +#include + +#include +#include +#include +#include +#include +#include + +#include "srt_conn.hpp" + +typedef struct { + SRT_CONN_PTR conn_ptr; + int events; +} request_message_t; + +class srt_handle { +public: + srt_handle(); + ~srt_handle(); + + int start();//create srt epoll and create epoll thread + void stop();//close srt epoll and end epoll thread + + void insert_message_queue(request_message_t msg); + bool get_message_from_queue(request_message_t& msg); + +private: + //add new srt connection into epoll event + void add_newconn(SRT_CONN_PTR conn_ptr, int events); + //get srt conn object by srt socket + SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket); + //get srt connect mode: push or pull + int get_srt_mode(SRTSOCKET conn_srt_socket); + + void onwork();//epoll thread loop + //handle recv/send srt socket + void handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd); + void handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd); + void handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd); + + //add new puller into puller list and conn_map + void add_new_puller(SRT_CONN_PTR, std::string stream_id); + //remove pull srt from play list + void close_pull_conn(SRTSOCKET srtsocket, std::string stream_id); + + //add new pusher into pusher map: + bool add_new_pusher(SRT_CONN_PTR conn_ptr); + //remove push connection and remove epoll + void close_push_conn(SRTSOCKET srtsocket); + + //check srt connection whether it's still alive. + void check_alive(); + + //debug statics + void debug_statics(SRTSOCKET srtsocket, const std::string& streamid); + +private: + int _handle_pollid; + + std::unordered_map _conn_map; + std::shared_ptr _work_thread_ptr; + + //streamid, play map + std::unordered_map> _streamid_map; + + std::mutex _queue_mutex; + std::queue _message_queue; + + long long _last_timestamp; + long long _last_check_alive_ts; +}; + +#endif //SRT_HANDLE_H \ No newline at end of file diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp index f4f7ee8da..d4f0214de 100644 --- a/trunk/src/srt/srt_server.cpp +++ b/trunk/src/srt/srt_server.cpp @@ -1,13 +1,24 @@ #include "srt_server.hpp" +#include "srt_handle.hpp" +#include #include +#include +#include +#include +#include +#include +#include +#include + #include #include #include #include srt_server::srt_server(unsigned short port):listen_port(port) + ,server_socket(-1) { - + handle_ptr = std::make_shared(); } srt_server::~srt_server() @@ -15,8 +26,65 @@ srt_server::~srt_server() } +int srt_server::init_srt() { + if (server_socket != -1) { + return -1; + } + + server_socket = srt_create_socket(); + sockaddr_in sa; + memset(&sa, 0, sizeof sa); + sa.sin_family = AF_INET; + sa.sin_port = htons(listen_port); + + sockaddr* psa = (sockaddr*)&sa; + + int ret = srt_bind(server_socket, psa, sizeof(sa)); + if ( ret == SRT_ERROR ) + { + srt_close(server_socket); + srs_error("srt bind error: %d", ret); + return -1; + } + + ret = srt_listen(server_socket, 5); + if (ret == SRT_ERROR) + { + srt_close(server_socket); + srs_error("srt listen error: %d", ret); + return -2; + } + + _pollid = srt_epoll_create(); + if (_pollid < -1) { + srs_error("srt server srt_epoll_create error, port=%d", listen_port); + return -1; + } + + int events = SRT_EPOLL_IN | SRT_EPOLL_ERR; + ret = srt_epoll_add_usock(_pollid, server_socket, &events); + if (ret < 0) { + srs_error("srt server run add epoll error:%d", ret); + return ret; + } + + srs_trace("srt server listen port=%d, server_fd=%d", listen_port, server_socket); + + return 0; +} + int srt_server::start() { + int ret; + + if ((ret = init_srt()) < 0) { + return ret; + } + ret = handle_ptr->start(); + if (ret < 0) { + return ret; + } + run_flag = true; srs_trace("srt server is starting... port(%d)", listen_port); thread_run_ptr = std::make_shared(&srt_server::on_work, this); @@ -30,14 +98,96 @@ void srt_server::stop() return; } thread_run_ptr->join(); + + handle_ptr->stop(); return; } +void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr) { + SRTSOCKET conn_fd = -1; + sockaddr_in scl; + int sclen = sizeof(scl); + int conn_event;// = SRT_EPOLL_IN |SRT_EPOLL_OUT| SRT_EPOLL_ERR; + + switch(status) { + case SRTS_LISTENING: + { + conn_fd = srt_accept(input_fd, (sockaddr*)&scl, &sclen); + if (conn_fd == -1) { + return; + } + //add new srt connect into srt handle + std::string streamid = UDT::getstreamid(conn_fd); + + SRT_CONN_PTR srt_conn_ptr = std::make_shared(conn_fd, streamid); + + std::string vhost_str = srt_conn_ptr->get_vhost(); + srs_trace("new srt connection streamid:%s, fd:%d, vhost:%s", + streamid.c_str(), conn_fd, vhost_str.c_str()); + SrsConfDirective* vhost_p = _srs_config->get_vhost(vhost_str, true); + if (!vhost_p) { + srs_trace("srt streamid(%s): no vhost %s, fd:%d", + streamid.c_str(), vhost_str.c_str(), conn_fd); + srt_conn_ptr->close(); + return; + } + if (srt_conn_ptr->get_mode() == PULL_SRT_MODE) { + //add SRT_EPOLL_IN for information notify + conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR;//not inlucde SRT_EPOLL_OUT for save cpu + } else { + conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR; + } + request_message_t msg = {srt_conn_ptr, conn_event}; + handle_ptr->insert_message_queue(msg); + break; + } + case SRTS_CONNECTED: + { + srs_trace("srt connected: socket=%d, mode:%s", input_fd, dscr.c_str()); + break; + } + case SRTS_BROKEN: + { + srt_epoll_remove_usock(_pollid, input_fd); + srt_close(input_fd); + srs_warn("srt close: socket=%d", input_fd); + break; + } + default: + { + srs_error("srt server unkown status:%d", status); + } + } +} + void srt_server::on_work() { + const unsigned int SRT_FD_MAX = 100; srs_trace("srt server is working port(%d)", listen_port); while (run_flag) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + SRTSOCKET read_fds[SRT_FD_MAX]; + SRTSOCKET write_fds[SRT_FD_MAX]; + int rfd_num = SRT_FD_MAX; + int wfd_num = SRT_FD_MAX; + + int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds, &wfd_num, -1, + nullptr, nullptr, nullptr, nullptr); + if (ret < 0) { + srs_error("listen srt epoll is timeout, port=%d", listen_port); + continue; + } + srs_trace("srt server epoll get: ret=%d, rfd_num=%d, wfd_num=%d", + ret, rfd_num, wfd_num); + + for (int index = 0; index < rfd_num; index++) { + SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]); + srt_handle_connection(status, read_fds[index], "read fd"); + } + + for (int index = 0; index < wfd_num; index++) { + SRT_SOCKSTATUS status = srt_getsockstate(write_fds[index]); + srt_handle_connection(status, read_fds[index], "write fd"); + } } } diff --git a/trunk/src/srt/srt_server.hpp b/trunk/src/srt/srt_server.hpp index 49229dc6d..7201b7263 100644 --- a/trunk/src/srt/srt_server.hpp +++ b/trunk/src/srt/srt_server.hpp @@ -1,23 +1,35 @@ #ifndef SRT_SERVER_H #define SRT_SERVER_H +#include + #include #include +class srt_handle; + class srt_server { public: srt_server(unsigned short port); ~srt_server(); - int start(); - void stop(); + int start();//init srt handl and create srt main thread loop + void stop();//stop srt main thread loop private: + //init srt socket and srt epoll + int init_srt(); + //srt main epoll loop void on_work(); + //accept new srt connection + void srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr); private: unsigned short listen_port; + SRTSOCKET server_socket; + int _pollid; bool run_flag; std::shared_ptr thread_run_ptr; + std::shared_ptr handle_ptr; }; typedef std::shared_ptr SRT_SERVER_PTR; diff --git a/trunk/src/srt/stringex.hpp b/trunk/src/srt/stringex.hpp new file mode 100644 index 000000000..36e8eb8b4 --- /dev/null +++ b/trunk/src/srt/stringex.hpp @@ -0,0 +1,39 @@ +#ifndef STRING_EX_H +#define STRING_EX_H +#include +#include +#include +#include +#include +#include + +inline int string_split(const std::string& input_str, const std::string& split_str, std::vector& output_vec) { + if (input_str.length() == 0) { + return 0; + } + + std::string tempString(input_str); + do { + + size_t pos = tempString.find(split_str); + if (pos == tempString.npos) { + output_vec.push_back(tempString); + break; + } + std::string seg_str = tempString.substr(0, pos); + tempString = tempString.substr(pos+split_str.size()); + output_vec.push_back(seg_str); + } while(tempString.size() > 0); + + return output_vec.size(); +} + +inline std::string string_lower(const std::string input_str) { + std::string output_str(input_str); + + std::transform(input_str.begin(), input_str.end(), output_str.begin(), ::tolower); + + return output_str; +} + +#endif//STRING_EX_H \ No newline at end of file diff --git a/trunk/src/srt/time_help.h b/trunk/src/srt/time_help.h new file mode 100644 index 000000000..301139df7 --- /dev/null +++ b/trunk/src/srt/time_help.h @@ -0,0 +1,10 @@ +#ifndef TIME_HELP_H +#define TIME_HELP_H +#include + +inline long long now_ms() { + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); +} + +#endif //TIME_HELP_H \ No newline at end of file