mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
change two thread to one thread
This commit is contained in:
parent
1c6203bda2
commit
4fdf242e99
5 changed files with 54 additions and 161 deletions
|
@ -22,7 +22,7 @@ const long long CHECK_ALIVE_TIMEOUT = 5*1000;
|
|||
|
||||
long long srt_now_ms = 0;
|
||||
|
||||
srt_handle::srt_handle():_run_flag(false)
|
||||
srt_handle::srt_handle(int pollid):_handle_pollid(pollid)
|
||||
,_last_timestamp(0)
|
||||
,_last_check_alive_ts(0) {
|
||||
}
|
||||
|
@ -31,26 +31,6 @@ srt_handle::~srt_handle() {
|
|||
|
||||
}
|
||||
|
||||
int srt_handle::start() {
|
||||
_handle_pollid = srt_epoll_create();
|
||||
if (_handle_pollid < -1) {
|
||||
srs_error("srt handle srt_epoll_create error, _handle_pollid=%d", _handle_pollid);
|
||||
return -1;
|
||||
}
|
||||
|
||||
_run_flag = true;
|
||||
srs_trace("srt handle is starting...");
|
||||
_work_thread_ptr = std::make_shared<std::thread>(&srt_handle::onwork, this);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void srt_handle::stop() {
|
||||
_run_flag = false;
|
||||
_work_thread_ptr->join();
|
||||
return;
|
||||
}
|
||||
|
||||
void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid) {
|
||||
SRT_TRACEBSTATS mon;
|
||||
srt_bstats(srtsocket, &mon, 1);
|
||||
|
@ -185,91 +165,6 @@ void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) {
|
|||
return;
|
||||
}
|
||||
|
||||
int srt_handle::get_srt_mode(SRTSOCKET conn_srt_socket) {
|
||||
auto iter = _conn_map.find(conn_srt_socket);
|
||||
if (iter == _conn_map.end()) {
|
||||
return 0;
|
||||
}
|
||||
return iter->second->get_mode();
|
||||
}
|
||||
|
||||
void srt_handle::insert_message_queue(request_message_t msg) {
|
||||
std::unique_lock<std::mutex> lock(_queue_mutex);
|
||||
_message_queue.push(msg);
|
||||
}
|
||||
|
||||
bool srt_handle::get_message_from_queue(request_message_t& msg) {
|
||||
std::unique_lock<std::mutex> lock(_queue_mutex);
|
||||
bool ret = false;
|
||||
|
||||
if (_message_queue.empty()) {
|
||||
return ret;
|
||||
}
|
||||
ret = true;
|
||||
msg = _message_queue.front();
|
||||
_message_queue.pop();
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void srt_handle::onwork()
|
||||
{
|
||||
const unsigned int SRT_FD_MAX = 1024;
|
||||
SRT_SOCKSTATUS status = SRTS_INIT;
|
||||
std::string streamid;
|
||||
int ret;
|
||||
const int64_t DEF_TIMEOUT_INTERVAL = 30;
|
||||
|
||||
srs_trace("srt handle epoll work is starting...");
|
||||
while(_run_flag)
|
||||
{
|
||||
SRTSOCKET read_fds[SRT_FD_MAX];
|
||||
SRTSOCKET write_fds[SRT_FD_MAX];
|
||||
int rfd_num = SRT_FD_MAX;
|
||||
int wfd_num = SRT_FD_MAX;
|
||||
|
||||
srt_now_ms = now_ms();
|
||||
|
||||
request_message_t msg;
|
||||
|
||||
if (get_message_from_queue(msg)) {
|
||||
add_newconn(msg.conn_ptr, msg.events);
|
||||
}
|
||||
|
||||
if (_conn_map.empty()) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
continue;
|
||||
}
|
||||
check_alive();
|
||||
|
||||
ret = srt_epoll_wait(_handle_pollid, read_fds, &rfd_num, write_fds, &wfd_num,
|
||||
DEF_TIMEOUT_INTERVAL, 0, 0, 0, 0);
|
||||
if (ret < 0) {
|
||||
srs_info("srt handle epoll is timeout, ret=%d, srt_now_ms=%ld",
|
||||
ret, srt_now_ms);
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int index = 0; index < rfd_num; index++)
|
||||
{
|
||||
status = srt_getsockstate(read_fds[index]);
|
||||
srs_info("srt handle read(push) rfd num:%d, status:%d, streamid:%s, read_fd",
|
||||
rfd_num, status, streamid.c_str(), read_fds[index]);
|
||||
handle_srt_socket(status, read_fds[index]);
|
||||
}
|
||||
|
||||
for (int index = 0; index < wfd_num; index++)
|
||||
{
|
||||
status = srt_getsockstate(write_fds[index]);
|
||||
streamid = UDT::getstreamid(write_fds[index]);
|
||||
srs_info("srt handle write(puller) wfd num:%d, status:%d, streamid:%s, write_fd",
|
||||
wfd_num, status, streamid.c_str(), write_fds[index]);
|
||||
handle_srt_socket(status, write_fds[index]);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) {
|
||||
SRT_CONN_PTR srt_conn_ptr;
|
||||
unsigned char data[DEF_DATA_SIZE];
|
||||
|
@ -293,6 +188,7 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp
|
|||
close_push_conn(conn_fd);
|
||||
return;
|
||||
}
|
||||
srs_trace("srt read data len:%d", ret);
|
||||
srt_conn_ptr->update_timestamp(srt_now_ms);
|
||||
|
||||
srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue