1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 20:01:56 +00:00

change srt epoll thread to one thread (#1597)

* if there isn't srt connect, it needn't epoll wait

* solve repush srt bugs

* change two thread to one thread

* mpegts discard header is not 0x47

* add srt_epoll_clear_usocks
This commit is contained in:
Alex.CR 2020-02-11 20:23:39 -06:00 committed by GitHub
parent 3847807534
commit c2012379af
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 57 additions and 163 deletions

View file

@ -22,7 +22,7 @@ const long long CHECK_ALIVE_TIMEOUT = 5*1000;
long long srt_now_ms = 0; long long srt_now_ms = 0;
srt_handle::srt_handle():_run_flag(false) srt_handle::srt_handle(int pollid):_handle_pollid(pollid)
,_last_timestamp(0) ,_last_timestamp(0)
,_last_check_alive_ts(0) { ,_last_check_alive_ts(0) {
} }
@ -31,26 +31,6 @@ srt_handle::~srt_handle() {
} }
int srt_handle::start() {
_handle_pollid = srt_epoll_create();
if (_handle_pollid < -1) {
srs_error("srt handle srt_epoll_create error, _handle_pollid=%d", _handle_pollid);
return -1;
}
_run_flag = true;
srs_trace("srt handle is starting...");
_work_thread_ptr = std::make_shared<std::thread>(&srt_handle::onwork, this);
return 0;
}
void srt_handle::stop() {
_run_flag = false;
_work_thread_ptr->join();
return;
}
void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid) { void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid) {
SRT_TRACEBSTATS mon; SRT_TRACEBSTATS mon;
srt_bstats(srtsocket, &mon, 1); srt_bstats(srtsocket, &mon, 1);
@ -185,91 +165,6 @@ void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) {
return; return;
} }
int srt_handle::get_srt_mode(SRTSOCKET conn_srt_socket) {
auto iter = _conn_map.find(conn_srt_socket);
if (iter == _conn_map.end()) {
return 0;
}
return iter->second->get_mode();
}
void srt_handle::insert_message_queue(request_message_t msg) {
std::unique_lock<std::mutex> lock(_queue_mutex);
_message_queue.push(msg);
}
bool srt_handle::get_message_from_queue(request_message_t& msg) {
std::unique_lock<std::mutex> lock(_queue_mutex);
bool ret = false;
if (_message_queue.empty()) {
return ret;
}
ret = true;
msg = _message_queue.front();
_message_queue.pop();
return ret;
}
void srt_handle::onwork()
{
const unsigned int SRT_FD_MAX = 1024;
SRT_SOCKSTATUS status = SRTS_INIT;
std::string streamid;
int ret;
const int64_t DEF_TIMEOUT_INTERVAL = 30;
srs_trace("srt handle epoll work is starting...");
while(_run_flag)
{
SRTSOCKET read_fds[SRT_FD_MAX];
SRTSOCKET write_fds[SRT_FD_MAX];
int rfd_num = SRT_FD_MAX;
int wfd_num = SRT_FD_MAX;
srt_now_ms = now_ms();
request_message_t msg;
if (get_message_from_queue(msg)) {
add_newconn(msg.conn_ptr, msg.events);
}
if (_conn_map.empty()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
check_alive();
ret = srt_epoll_wait(_handle_pollid, read_fds, &rfd_num, write_fds, &wfd_num,
DEF_TIMEOUT_INTERVAL, 0, 0, 0, 0);
if (ret < 0) {
srs_info("srt handle epoll is timeout, ret=%d, srt_now_ms=%ld",
ret, srt_now_ms);
continue;
}
for (int index = 0; index < rfd_num; index++)
{
status = srt_getsockstate(read_fds[index]);
srs_info("srt handle read(push) rfd num:%d, status:%d, streamid:%s, read_fd",
rfd_num, status, streamid.c_str(), read_fds[index]);
handle_srt_socket(status, read_fds[index]);
}
for (int index = 0; index < wfd_num; index++)
{
status = srt_getsockstate(write_fds[index]);
streamid = UDT::getstreamid(write_fds[index]);
srs_info("srt handle write(puller) wfd num:%d, status:%d, streamid:%s, write_fd",
wfd_num, status, streamid.c_str(), write_fds[index]);
handle_srt_socket(status, write_fds[index]);
}
}
}
void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) { void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) {
SRT_CONN_PTR srt_conn_ptr; SRT_CONN_PTR srt_conn_ptr;
unsigned char data[DEF_DATA_SIZE]; unsigned char data[DEF_DATA_SIZE];
@ -293,6 +188,7 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp
close_push_conn(conn_fd); close_push_conn(conn_fd);
return; return;
} }
srt_conn_ptr->update_timestamp(srt_now_ms); srt_conn_ptr->update_timestamp(srt_now_ms);
srt2rtmp::get_instance()->insert_data_message(data, ret, subpath); srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);

View file

@ -12,33 +12,22 @@
#include "srt_conn.hpp" #include "srt_conn.hpp"
#include "srt_to_rtmp.hpp" #include "srt_to_rtmp.hpp"
typedef struct {
SRT_CONN_PTR conn_ptr;
int events;
} request_message_t;
class srt_handle { class srt_handle {
public: public:
srt_handle(); srt_handle(int pollid);
~srt_handle(); ~srt_handle();
int start();//create srt epoll and create epoll thread
void stop();//close srt epoll and end epoll thread
void insert_message_queue(request_message_t msg);
bool get_message_from_queue(request_message_t& msg);
private:
//add new srt connection into epoll event //add new srt connection into epoll event
void add_newconn(SRT_CONN_PTR conn_ptr, int events); void add_newconn(SRT_CONN_PTR conn_ptr, int events);
//get srt conn object by srt socket
SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket);
//get srt connect mode: push or pull
int get_srt_mode(SRTSOCKET conn_srt_socket);
void onwork();//epoll thread loop
//handle recv/send srt socket //handle recv/send srt socket
void handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd); void handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd);
//check srt connection whether it's still alive.
void check_alive();
private:
//get srt conn object by srt socket
SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket);
void handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd); void handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd);
void handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd); void handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd);
@ -52,27 +41,19 @@ private:
//remove push connection and remove epoll //remove push connection and remove epoll
void close_push_conn(SRTSOCKET srtsocket); void close_push_conn(SRTSOCKET srtsocket);
//check srt connection whether it's still alive.
void check_alive();
//debug statics //debug statics
void debug_statics(SRTSOCKET srtsocket, const std::string& streamid); void debug_statics(SRTSOCKET srtsocket, const std::string& streamid);
private: private:
bool _run_flag;
int _handle_pollid; int _handle_pollid;
std::unordered_map<SRTSOCKET, SRT_CONN_PTR> _conn_map;//save all srt connection: pull or push std::unordered_map<SRTSOCKET, SRT_CONN_PTR> _conn_map;//save all srt connection: pull or push
std::shared_ptr<std::thread> _work_thread_ptr;
//save push srt connection for prevent from repeat push connection //save push srt connection for prevent from repeat push connection
std::unordered_map<std::string, SRT_CONN_PTR> _push_conn_map;//key:streamid, value:SRT_CONN_PTR std::unordered_map<std::string, SRT_CONN_PTR> _push_conn_map;//key:streamid, value:SRT_CONN_PTR
//streamid, play map<SRTSOCKET, SRT_CONN_PTR> //streamid, play map<SRTSOCKET, SRT_CONN_PTR>
std::unordered_map<std::string, std::unordered_map<SRTSOCKET, SRT_CONN_PTR>> _streamid_map; std::unordered_map<std::string, std::unordered_map<SRTSOCKET, SRT_CONN_PTR>> _streamid_map;
std::mutex _queue_mutex;
std::queue<request_message_t> _message_queue;
long long _last_timestamp; long long _last_timestamp;
long long _last_check_alive_ts; long long _last_check_alive_ts;
}; };

View file

@ -15,10 +15,9 @@
#include <srs_app_rtmp_conn.hpp> #include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp> #include <srs_app_config.hpp>
srt_server::srt_server(unsigned short port):listen_port(port) srt_server::srt_server(unsigned short port):_listen_port(port)
,server_socket(-1) ,_server_socket(-1)
{ {
handle_ptr = std::make_shared<srt_handle>();
} }
srt_server::~srt_server() srt_server::~srt_server()
@ -27,48 +26,49 @@ srt_server::~srt_server()
} }
int srt_server::init_srt() { int srt_server::init_srt() {
if (server_socket != -1) { if (_server_socket != -1) {
return -1; return -1;
} }
server_socket = srt_create_socket(); _server_socket = srt_create_socket();
sockaddr_in sa; sockaddr_in sa;
memset(&sa, 0, sizeof sa); memset(&sa, 0, sizeof sa);
sa.sin_family = AF_INET; sa.sin_family = AF_INET;
sa.sin_port = htons(listen_port); sa.sin_port = htons(_listen_port);
sockaddr* psa = (sockaddr*)&sa; sockaddr* psa = (sockaddr*)&sa;
int ret = srt_bind(server_socket, psa, sizeof(sa)); int ret = srt_bind(_server_socket, psa, sizeof(sa));
if ( ret == SRT_ERROR ) if ( ret == SRT_ERROR )
{ {
srt_close(server_socket); srt_close(_server_socket);
srs_error("srt bind error: %d", ret); srs_error("srt bind error: %d", ret);
return -1; return -1;
} }
ret = srt_listen(server_socket, 5); ret = srt_listen(_server_socket, 5);
if (ret == SRT_ERROR) if (ret == SRT_ERROR)
{ {
srt_close(server_socket); srt_close(_server_socket);
srs_error("srt listen error: %d", ret); srs_error("srt listen error: %d", ret);
return -2; return -2;
} }
_pollid = srt_epoll_create(); _pollid = srt_epoll_create();
if (_pollid < -1) { if (_pollid < -1) {
srs_error("srt server srt_epoll_create error, port=%d", listen_port); srs_error("srt server srt_epoll_create error, port=%d", _listen_port);
return -1; return -1;
} }
_handle_ptr = std::make_shared<srt_handle>(_pollid);
int events = SRT_EPOLL_IN | SRT_EPOLL_ERR; int events = SRT_EPOLL_IN | SRT_EPOLL_ERR;
ret = srt_epoll_add_usock(_pollid, server_socket, &events); ret = srt_epoll_add_usock(_pollid, _server_socket, &events);
if (ret < 0) { if (ret < 0) {
srs_error("srt server run add epoll error:%d", ret); srs_error("srt server run add epoll error:%d", ret);
return ret; return ret;
} }
srs_trace("srt server listen port=%d, server_fd=%d", listen_port, server_socket); srs_trace("srt server listen port=%d, server_fd=%d", _listen_port, _server_socket);
return 0; return 0;
} }
@ -80,13 +80,9 @@ int srt_server::start()
if ((ret = init_srt()) < 0) { if ((ret = init_srt()) < 0) {
return ret; return ret;
} }
ret = handle_ptr->start();
if (ret < 0) {
return ret;
}
run_flag = true; run_flag = true;
srs_trace("srt server is starting... port(%d)", listen_port); srs_trace("srt server is starting... port(%d)", _listen_port);
thread_run_ptr = std::make_shared<std::thread>(&srt_server::on_work, this); thread_run_ptr = std::make_shared<std::thread>(&srt_server::on_work, this);
return 0; return 0;
} }
@ -99,7 +95,6 @@ void srt_server::stop()
} }
thread_run_ptr->join(); thread_run_ptr->join();
handle_ptr->stop();
return; return;
} }
@ -146,8 +141,8 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd
srt_conn_ptr->close(); srt_conn_ptr->close();
return; return;
} }
request_message_t msg = {srt_conn_ptr, conn_event};
handle_ptr->insert_message_queue(msg); _handle_ptr->add_newconn(srt_conn_ptr, conn_event);
break; break;
} }
case SRTS_CONNECTED: case SRTS_CONNECTED:
@ -169,10 +164,15 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd
} }
} }
void srt_server::srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr) {
_handle_ptr->handle_srt_socket(status, input_fd);
return;
}
void srt_server::on_work() void srt_server::on_work()
{ {
const unsigned int SRT_FD_MAX = 100; const unsigned int SRT_FD_MAX = 100;
srs_trace("srt server is working port(%d)", listen_port); srs_trace("srt server is working port(%d)", _listen_port);
while (run_flag) while (run_flag)
{ {
SRTSOCKET read_fds[SRT_FD_MAX]; SRTSOCKET read_fds[SRT_FD_MAX];
@ -185,19 +185,27 @@ void srt_server::on_work()
if (ret < 0) { if (ret < 0) {
continue; continue;
} }
srs_trace("srt server epoll get: ret=%d, rfd_num=%d, wfd_num=%d", _handle_ptr->check_alive();
ret, rfd_num, wfd_num);
for (int index = 0; index < rfd_num; index++) { for (int index = 0; index < rfd_num; index++) {
SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]); SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);
if (_server_socket == read_fds[index]) {
srt_handle_connection(status, read_fds[index], "read fd"); srt_handle_connection(status, read_fds[index], "read fd");
} else {
srt_handle_data(status, read_fds[index], "read fd");
}
} }
for (int index = 0; index < wfd_num; index++) { for (int index = 0; index < wfd_num; index++) {
SRT_SOCKSTATUS status = srt_getsockstate(write_fds[index]); SRT_SOCKSTATUS status = srt_getsockstate(write_fds[index]);
srt_handle_connection(status, read_fds[index], "write fd"); if (_server_socket == write_fds[index]) {
srt_handle_connection(status, write_fds[index], "write fd");
} else {
srt_handle_data(status, write_fds[index], "write fd");
} }
} }
}
srt_epoll_clear_usocks(_pollid);
} }
SrtServerAdapter::SrtServerAdapter() SrtServerAdapter::SrtServerAdapter()

View file

@ -25,14 +25,16 @@ private:
void on_work(); void on_work();
//accept new srt connection //accept new srt connection
void srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr); void srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr);
//get srt data read/write
void srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr);
private: private:
unsigned short listen_port; unsigned short _listen_port;
SRTSOCKET server_socket; SRTSOCKET _server_socket;
int _pollid; int _pollid;
bool run_flag; bool run_flag;
std::shared_ptr<std::thread> thread_run_ptr; std::shared_ptr<std::thread> thread_run_ptr;
std::shared_ptr<srt_handle> handle_ptr; std::shared_ptr<srt_handle> _handle_ptr;
}; };
typedef std::shared_ptr<srt_server> SRT_SERVER_PTR; typedef std::shared_ptr<srt_server> SRT_SERVER_PTR;

View file

@ -220,6 +220,8 @@ rtmp_client::rtmp_client(std::string key_path):_key_path(key_path)
_h264_sps_changed = false; _h264_sps_changed = false;
_h264_pps_changed = false; _h264_pps_changed = false;
_h264_sps_pps_sent = false; _h264_sps_pps_sent = false;
_last_live_ts = now_ms();
srs_trace("rtmp client construct url:%s", url_sz); srs_trace("rtmp client construct url:%s", url_sz);
} }
@ -251,6 +253,7 @@ srs_error_t rtmp_client::connect() {
srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT;
srs_utime_t sto = SRS_CONSTS_RTMP_PULSE; srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
_last_live_ts = now_ms();
if (_connect_flag) { if (_connect_flag) {
return srs_success; return srs_success;
} }

View file

@ -296,7 +296,11 @@ int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback)
path = data_ptr->get_path(); path = data_ptr->get_path();
for (unsigned int index = 0; index < count; index++) for (unsigned int index = 0; index < count; index++)
{ {
ret = decode_unit(data_ptr->get_data() + 188*index, path, callback); unsigned char* data = data_ptr->get_data() + 188*index;
if (data[0] != 0x47) {
continue;
}
ret = decode_unit(data, path, callback);
if (ret < 0) if (ret < 0)
{ {
break; break;