From c2012379afe04aec242fbd65556b828487a457f5 Mon Sep 17 00:00:00 2001 From: "Alex.CR" Date: Tue, 11 Feb 2020 20:23:39 -0600 Subject: [PATCH] change srt epoll thread to one thread (#1597) * if there isn't srt connect, it needn't epoll wait * solve repush srt bugs * change two thread to one thread * mpegts discard header is not 0x47 * add srt_epoll_clear_usocks --- trunk/src/srt/srt_handle.cpp | 108 +--------------------------------- trunk/src/srt/srt_handle.hpp | 35 +++-------- trunk/src/srt/srt_server.cpp | 60 +++++++++++-------- trunk/src/srt/srt_server.hpp | 8 ++- trunk/src/srt/srt_to_rtmp.cpp | 3 + trunk/src/srt/ts_demux.cpp | 6 +- 6 files changed, 57 insertions(+), 163 deletions(-) diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp index f2bc2ff8c..dfba60cd1 100644 --- a/trunk/src/srt/srt_handle.cpp +++ b/trunk/src/srt/srt_handle.cpp @@ -22,7 +22,7 @@ const long long CHECK_ALIVE_TIMEOUT = 5*1000; long long srt_now_ms = 0; -srt_handle::srt_handle():_run_flag(false) +srt_handle::srt_handle(int pollid):_handle_pollid(pollid) ,_last_timestamp(0) ,_last_check_alive_ts(0) { } @@ -31,26 +31,6 @@ srt_handle::~srt_handle() { } -int srt_handle::start() { - _handle_pollid = srt_epoll_create(); - if (_handle_pollid < -1) { - srs_error("srt handle srt_epoll_create error, _handle_pollid=%d", _handle_pollid); - return -1; - } - - _run_flag = true; - srs_trace("srt handle is starting..."); - _work_thread_ptr = std::make_shared(&srt_handle::onwork, this); - - return 0; -} - -void srt_handle::stop() { - _run_flag = false; - _work_thread_ptr->join(); - return; -} - void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid) { SRT_TRACEBSTATS mon; srt_bstats(srtsocket, &mon, 1); @@ -185,91 +165,6 @@ void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) { return; } -int srt_handle::get_srt_mode(SRTSOCKET conn_srt_socket) { - auto iter = _conn_map.find(conn_srt_socket); - if (iter == _conn_map.end()) { - return 0; - } - return iter->second->get_mode(); -} - -void srt_handle::insert_message_queue(request_message_t msg) { - std::unique_lock lock(_queue_mutex); - _message_queue.push(msg); -} - -bool srt_handle::get_message_from_queue(request_message_t& msg) { - std::unique_lock lock(_queue_mutex); - bool ret = false; - - if (_message_queue.empty()) { - return ret; - } - ret = true; - msg = _message_queue.front(); - _message_queue.pop(); - - return ret; -} - -void srt_handle::onwork() -{ - const unsigned int SRT_FD_MAX = 1024; - SRT_SOCKSTATUS status = SRTS_INIT; - std::string streamid; - int ret; - const int64_t DEF_TIMEOUT_INTERVAL = 30; - - srs_trace("srt handle epoll work is starting..."); - while(_run_flag) - { - SRTSOCKET read_fds[SRT_FD_MAX]; - SRTSOCKET write_fds[SRT_FD_MAX]; - int rfd_num = SRT_FD_MAX; - int wfd_num = SRT_FD_MAX; - - srt_now_ms = now_ms(); - - request_message_t msg; - - if (get_message_from_queue(msg)) { - add_newconn(msg.conn_ptr, msg.events); - } - - if (_conn_map.empty()) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - continue; - } - check_alive(); - - ret = srt_epoll_wait(_handle_pollid, read_fds, &rfd_num, write_fds, &wfd_num, - DEF_TIMEOUT_INTERVAL, 0, 0, 0, 0); - if (ret < 0) { - srs_info("srt handle epoll is timeout, ret=%d, srt_now_ms=%ld", - ret, srt_now_ms); - continue; - } - - for (int index = 0; index < rfd_num; index++) - { - status = srt_getsockstate(read_fds[index]); - srs_info("srt handle read(push) rfd num:%d, status:%d, streamid:%s, read_fd", - rfd_num, status, streamid.c_str(), read_fds[index]); - handle_srt_socket(status, read_fds[index]); - } - - for (int index = 0; index < wfd_num; index++) - { - status = srt_getsockstate(write_fds[index]); - streamid = UDT::getstreamid(write_fds[index]); - srs_info("srt handle write(puller) wfd num:%d, status:%d, streamid:%s, write_fd", - wfd_num, status, streamid.c_str(), write_fds[index]); - handle_srt_socket(status, write_fds[index]); - } - - } -} - void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) { SRT_CONN_PTR srt_conn_ptr; unsigned char data[DEF_DATA_SIZE]; @@ -293,6 +188,7 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp close_push_conn(conn_fd); return; } + srt_conn_ptr->update_timestamp(srt_now_ms); srt2rtmp::get_instance()->insert_data_message(data, ret, subpath); diff --git a/trunk/src/srt/srt_handle.hpp b/trunk/src/srt/srt_handle.hpp index da555b714..d0ae14a78 100644 --- a/trunk/src/srt/srt_handle.hpp +++ b/trunk/src/srt/srt_handle.hpp @@ -12,33 +12,22 @@ #include "srt_conn.hpp" #include "srt_to_rtmp.hpp" -typedef struct { - SRT_CONN_PTR conn_ptr; - int events; -} request_message_t; - class srt_handle { public: - srt_handle(); + srt_handle(int pollid); ~srt_handle(); - int start();//create srt epoll and create epoll thread - void stop();//close srt epoll and end epoll thread - - void insert_message_queue(request_message_t msg); - bool get_message_from_queue(request_message_t& msg); - -private: //add new srt connection into epoll event void add_newconn(SRT_CONN_PTR conn_ptr, int events); - //get srt conn object by srt socket - SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket); - //get srt connect mode: push or pull - int get_srt_mode(SRTSOCKET conn_srt_socket); - - void onwork();//epoll thread loop //handle recv/send srt socket void handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd); + //check srt connection whether it's still alive. + void check_alive(); + +private: + //get srt conn object by srt socket + SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket); + void handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd); void handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd); @@ -51,27 +40,19 @@ private: bool add_new_pusher(SRT_CONN_PTR conn_ptr); //remove push connection and remove epoll void close_push_conn(SRTSOCKET srtsocket); - - //check srt connection whether it's still alive. - void check_alive(); //debug statics void debug_statics(SRTSOCKET srtsocket, const std::string& streamid); private: - bool _run_flag; int _handle_pollid; std::unordered_map _conn_map;//save all srt connection: pull or push - std::shared_ptr _work_thread_ptr; //save push srt connection for prevent from repeat push connection std::unordered_map _push_conn_map;//key:streamid, value:SRT_CONN_PTR //streamid, play map std::unordered_map> _streamid_map; - - std::mutex _queue_mutex; - std::queue _message_queue; long long _last_timestamp; long long _last_check_alive_ts; diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp index 021f714ec..ae4918873 100644 --- a/trunk/src/srt/srt_server.cpp +++ b/trunk/src/srt/srt_server.cpp @@ -15,10 +15,9 @@ #include #include -srt_server::srt_server(unsigned short port):listen_port(port) - ,server_socket(-1) +srt_server::srt_server(unsigned short port):_listen_port(port) + ,_server_socket(-1) { - handle_ptr = std::make_shared(); } srt_server::~srt_server() @@ -27,48 +26,49 @@ srt_server::~srt_server() } int srt_server::init_srt() { - if (server_socket != -1) { + if (_server_socket != -1) { return -1; } - server_socket = srt_create_socket(); + _server_socket = srt_create_socket(); sockaddr_in sa; memset(&sa, 0, sizeof sa); sa.sin_family = AF_INET; - sa.sin_port = htons(listen_port); + sa.sin_port = htons(_listen_port); sockaddr* psa = (sockaddr*)&sa; - int ret = srt_bind(server_socket, psa, sizeof(sa)); + int ret = srt_bind(_server_socket, psa, sizeof(sa)); if ( ret == SRT_ERROR ) { - srt_close(server_socket); + srt_close(_server_socket); srs_error("srt bind error: %d", ret); return -1; } - ret = srt_listen(server_socket, 5); + ret = srt_listen(_server_socket, 5); if (ret == SRT_ERROR) { - srt_close(server_socket); + srt_close(_server_socket); srs_error("srt listen error: %d", ret); return -2; } _pollid = srt_epoll_create(); if (_pollid < -1) { - srs_error("srt server srt_epoll_create error, port=%d", listen_port); + srs_error("srt server srt_epoll_create error, port=%d", _listen_port); return -1; } + _handle_ptr = std::make_shared(_pollid); int events = SRT_EPOLL_IN | SRT_EPOLL_ERR; - ret = srt_epoll_add_usock(_pollid, server_socket, &events); + ret = srt_epoll_add_usock(_pollid, _server_socket, &events); if (ret < 0) { srs_error("srt server run add epoll error:%d", ret); return ret; } - srs_trace("srt server listen port=%d, server_fd=%d", listen_port, server_socket); + srs_trace("srt server listen port=%d, server_fd=%d", _listen_port, _server_socket); return 0; } @@ -80,13 +80,9 @@ int srt_server::start() if ((ret = init_srt()) < 0) { return ret; } - ret = handle_ptr->start(); - if (ret < 0) { - return ret; - } run_flag = true; - srs_trace("srt server is starting... port(%d)", listen_port); + srs_trace("srt server is starting... port(%d)", _listen_port); thread_run_ptr = std::make_shared(&srt_server::on_work, this); return 0; } @@ -99,7 +95,6 @@ void srt_server::stop() } thread_run_ptr->join(); - handle_ptr->stop(); return; } @@ -146,8 +141,8 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd srt_conn_ptr->close(); return; } - request_message_t msg = {srt_conn_ptr, conn_event}; - handle_ptr->insert_message_queue(msg); + + _handle_ptr->add_newconn(srt_conn_ptr, conn_event); break; } case SRTS_CONNECTED: @@ -169,10 +164,15 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd } } +void srt_server::srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr) { + _handle_ptr->handle_srt_socket(status, input_fd); + return; +} + void srt_server::on_work() { const unsigned int SRT_FD_MAX = 100; - srs_trace("srt server is working port(%d)", listen_port); + srs_trace("srt server is working port(%d)", _listen_port); while (run_flag) { SRTSOCKET read_fds[SRT_FD_MAX]; @@ -185,19 +185,27 @@ void srt_server::on_work() if (ret < 0) { continue; } - srs_trace("srt server epoll get: ret=%d, rfd_num=%d, wfd_num=%d", - ret, rfd_num, wfd_num); + _handle_ptr->check_alive(); for (int index = 0; index < rfd_num; index++) { SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]); - srt_handle_connection(status, read_fds[index], "read fd"); + if (_server_socket == read_fds[index]) { + srt_handle_connection(status, read_fds[index], "read fd"); + } else { + srt_handle_data(status, read_fds[index], "read fd"); + } } for (int index = 0; index < wfd_num; index++) { SRT_SOCKSTATUS status = srt_getsockstate(write_fds[index]); - srt_handle_connection(status, read_fds[index], "write fd"); + if (_server_socket == write_fds[index]) { + srt_handle_connection(status, write_fds[index], "write fd"); + } else { + srt_handle_data(status, write_fds[index], "write fd"); + } } } + srt_epoll_clear_usocks(_pollid); } SrtServerAdapter::SrtServerAdapter() diff --git a/trunk/src/srt/srt_server.hpp b/trunk/src/srt/srt_server.hpp index 27bb3bdfb..1234b891a 100644 --- a/trunk/src/srt/srt_server.hpp +++ b/trunk/src/srt/srt_server.hpp @@ -25,14 +25,16 @@ private: void on_work(); //accept new srt connection void srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr); + //get srt data read/write + void srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr); private: - unsigned short listen_port; - SRTSOCKET server_socket; + unsigned short _listen_port; + SRTSOCKET _server_socket; int _pollid; bool run_flag; std::shared_ptr thread_run_ptr; - std::shared_ptr handle_ptr; + std::shared_ptr _handle_ptr; }; typedef std::shared_ptr SRT_SERVER_PTR; diff --git a/trunk/src/srt/srt_to_rtmp.cpp b/trunk/src/srt/srt_to_rtmp.cpp index f8e037af3..729e3e970 100644 --- a/trunk/src/srt/srt_to_rtmp.cpp +++ b/trunk/src/srt/srt_to_rtmp.cpp @@ -220,6 +220,8 @@ rtmp_client::rtmp_client(std::string key_path):_key_path(key_path) _h264_sps_changed = false; _h264_pps_changed = false; _h264_sps_pps_sent = false; + + _last_live_ts = now_ms(); srs_trace("rtmp client construct url:%s", url_sz); } @@ -251,6 +253,7 @@ srs_error_t rtmp_client::connect() { srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; srs_utime_t sto = SRS_CONSTS_RTMP_PULSE; + _last_live_ts = now_ms(); if (_connect_flag) { return srs_success; } diff --git a/trunk/src/srt/ts_demux.cpp b/trunk/src/srt/ts_demux.cpp index 4c10f871b..13cee93ec 100644 --- a/trunk/src/srt/ts_demux.cpp +++ b/trunk/src/srt/ts_demux.cpp @@ -296,7 +296,11 @@ int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback) path = data_ptr->get_path(); for (unsigned int index = 0; index < count; index++) { - ret = decode_unit(data_ptr->get_data() + 188*index, path, callback); + unsigned char* data = data_ptr->get_data() + 188*index; + if (data[0] != 0x47) { + continue; + } + ret = decode_unit(data, path, callback); if (ret < 0) { break;