mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
solve srt repush bugs (#1596)
* if there isn't srt connect, it needn't epoll wait * solve repush srt bugs
This commit is contained in:
parent
bdc7973596
commit
3847807534
5 changed files with 80 additions and 11 deletions
|
@ -1,13 +1,22 @@
|
||||||
#include "srt_data.hpp"
|
#include "srt_data.hpp"
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
SRT_DATA_MSG::SRT_DATA_MSG(unsigned int len, const std::string& path):_len(len)
|
SRT_DATA_MSG::SRT_DATA_MSG(const std::string& path, unsigned int msg_type):_msg_type(msg_type)
|
||||||
|
,_len(0)
|
||||||
|
,_data_p(nullptr)
|
||||||
|
,_key_path(path) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
SRT_DATA_MSG::SRT_DATA_MSG(unsigned int len, const std::string& path, unsigned int msg_type):_msg_type(msg_type)
|
||||||
|
,_len(len)
|
||||||
,_key_path(path) {
|
,_key_path(path) {
|
||||||
_data_p = new unsigned char[len];
|
_data_p = new unsigned char[len];
|
||||||
memset(_data_p, 0, len);
|
memset(_data_p, 0, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path):_len(len)
|
SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path, unsigned int msg_type):_msg_type(msg_type)
|
||||||
|
,_len(len)
|
||||||
,_key_path(path)
|
,_key_path(path)
|
||||||
{
|
{
|
||||||
_data_p = new unsigned char[len];
|
_data_p = new unsigned char[len];
|
||||||
|
@ -15,7 +24,13 @@ SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::s
|
||||||
}
|
}
|
||||||
|
|
||||||
SRT_DATA_MSG::~SRT_DATA_MSG() {
|
SRT_DATA_MSG::~SRT_DATA_MSG() {
|
||||||
delete _data_p;
|
if (_data_p && (_len > 0)) {
|
||||||
|
delete _data_p;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsigned int SRT_DATA_MSG::msg_type() {
|
||||||
|
return _msg_type;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string SRT_DATA_MSG::get_path() {
|
std::string SRT_DATA_MSG::get_path() {
|
||||||
|
|
|
@ -3,17 +3,23 @@
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
|
#define SRT_MSG_DATA_TYPE 0x01
|
||||||
|
#define SRT_MSG_CLOSE_TYPE 0x02
|
||||||
|
|
||||||
class SRT_DATA_MSG {
|
class SRT_DATA_MSG {
|
||||||
public:
|
public:
|
||||||
SRT_DATA_MSG(unsigned int len, const std::string& path);
|
SRT_DATA_MSG(const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
|
||||||
SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path);
|
SRT_DATA_MSG(unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
|
||||||
|
SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
|
||||||
~SRT_DATA_MSG();
|
~SRT_DATA_MSG();
|
||||||
|
|
||||||
|
unsigned int msg_type();
|
||||||
unsigned int data_len();
|
unsigned int data_len();
|
||||||
unsigned char* get_data();
|
unsigned char* get_data();
|
||||||
std::string get_path();
|
std::string get_path();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
unsigned int _msg_type;
|
||||||
unsigned int _len;
|
unsigned int _len;
|
||||||
unsigned char* _data_p;
|
unsigned char* _data_p;
|
||||||
std::string _key_path;
|
std::string _key_path;
|
||||||
|
|
|
@ -17,8 +17,8 @@
|
||||||
static bool MONITOR_STATICS_ENABLE = false;
|
static bool MONITOR_STATICS_ENABLE = false;
|
||||||
static long long MONITOR_TIMEOUT = 5000;
|
static long long MONITOR_TIMEOUT = 5000;
|
||||||
const unsigned int DEF_DATA_SIZE = 188*7;
|
const unsigned int DEF_DATA_SIZE = 188*7;
|
||||||
const long long CHECK_ALIVE_INTERVAL = 10*1000;
|
const long long CHECK_ALIVE_INTERVAL = 5*1000;
|
||||||
const long long CHECK_ALIVE_TIMEOUT = 15*1000;
|
const long long CHECK_ALIVE_TIMEOUT = 5*1000;
|
||||||
|
|
||||||
long long srt_now_ms = 0;
|
long long srt_now_ms = 0;
|
||||||
|
|
||||||
|
@ -236,6 +236,10 @@ void srt_handle::onwork()
|
||||||
add_newconn(msg.conn_ptr, msg.events);
|
add_newconn(msg.conn_ptr, msg.events);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (_conn_map.empty()) {
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
check_alive();
|
check_alive();
|
||||||
|
|
||||||
ret = srt_epoll_wait(_handle_pollid, read_fds, &rfd_num, write_fds, &wfd_num,
|
ret = srt_epoll_wait(_handle_pollid, read_fds, &rfd_num, write_fds, &wfd_num,
|
||||||
|
@ -243,7 +247,6 @@ void srt_handle::onwork()
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
srs_info("srt handle epoll is timeout, ret=%d, srt_now_ms=%ld",
|
srs_info("srt handle epoll is timeout, ret=%d, srt_now_ms=%ld",
|
||||||
ret, srt_now_ms);
|
ret, srt_now_ms);
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(30));
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -376,6 +379,7 @@ void srt_handle::close_push_conn(SRTSOCKET srtsocket) {
|
||||||
_push_conn_map.erase(push_iter);
|
_push_conn_map.erase(push_iter);
|
||||||
}
|
}
|
||||||
_conn_map.erase(iter);
|
_conn_map.erase(iter);
|
||||||
|
srt2rtmp::get_instance()->insert_ctrl_message(SRT_MSG_CLOSE_TYPE, conn_ptr->get_subpath());
|
||||||
conn_ptr->close();
|
conn_ptr->close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -61,6 +61,14 @@ void srt2rtmp::insert_data_message(unsigned char* data_p, unsigned int len, cons
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void srt2rtmp::insert_ctrl_message(unsigned int msg_type, const std::string& key_path) {
|
||||||
|
std::unique_lock<std::mutex> locker(_mutex);
|
||||||
|
|
||||||
|
SRT_DATA_MSG_PTR msg_ptr = std::make_shared<SRT_DATA_MSG>(key_path, msg_type);
|
||||||
|
_msg_queue.push(msg_ptr);
|
||||||
|
//_notify_cond.notify_one();
|
||||||
|
return;
|
||||||
|
}
|
||||||
SRT_DATA_MSG_PTR srt2rtmp::get_data_message() {
|
SRT_DATA_MSG_PTR srt2rtmp::get_data_message() {
|
||||||
std::unique_lock<std::mutex> locker(_mutex);
|
std::unique_lock<std::mutex> locker(_mutex);
|
||||||
SRT_DATA_MSG_PTR msg_ptr;
|
SRT_DATA_MSG_PTR msg_ptr;
|
||||||
|
@ -79,8 +87,8 @@ SRT_DATA_MSG_PTR srt2rtmp::get_data_message() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void srt2rtmp::check_rtmp_alive() {
|
void srt2rtmp::check_rtmp_alive() {
|
||||||
const int64_t CHECK_INTERVAL = 15*1000;
|
const int64_t CHECK_INTERVAL = 5*1000;
|
||||||
const int64_t ALIVE_TIMEOUT_MAX = 20*1000;
|
const int64_t ALIVE_TIMEOUT_MAX = 5*1000;
|
||||||
|
|
||||||
if (_lastcheck_ts == 0) {
|
if (_lastcheck_ts == 0) {
|
||||||
_lastcheck_ts = now_ms();
|
_lastcheck_ts = now_ms();
|
||||||
|
@ -108,6 +116,22 @@ void srt2rtmp::check_rtmp_alive() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void srt2rtmp::handle_close_rtmpsession(const std::string& key_path) {
|
||||||
|
RTMP_CLIENT_PTR rtmp_ptr;
|
||||||
|
auto iter = _rtmp_client_map.find(key_path);
|
||||||
|
if (iter == _rtmp_client_map.end()) {
|
||||||
|
srs_error("fail to close rtmp session fail, can't find session by key_path:%s",
|
||||||
|
key_path.c_str());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
rtmp_ptr = iter->second;
|
||||||
|
_rtmp_client_map.erase(iter);
|
||||||
|
srs_trace("close rtmp session which key_path is %s", key_path.c_str());
|
||||||
|
rtmp_ptr->close();
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
//the cycle is running in srs coroutine
|
//the cycle is running in srs coroutine
|
||||||
srs_error_t srt2rtmp::cycle() {
|
srs_error_t srt2rtmp::cycle() {
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
|
@ -119,7 +143,25 @@ srs_error_t srt2rtmp::cycle() {
|
||||||
if (!msg_ptr) {
|
if (!msg_ptr) {
|
||||||
srs_usleep((30 * SRS_UTIME_MILLISECONDS));
|
srs_usleep((30 * SRS_UTIME_MILLISECONDS));
|
||||||
} else {
|
} else {
|
||||||
handle_ts_data(msg_ptr);
|
switch (msg_ptr->msg_type()) {
|
||||||
|
case SRT_MSG_DATA_TYPE:
|
||||||
|
{
|
||||||
|
handle_ts_data(msg_ptr);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case SRT_MSG_CLOSE_TYPE:
|
||||||
|
{
|
||||||
|
handle_close_rtmpsession(msg_ptr->get_path());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
srs_error("srt to rtmp get wrong message type(%u), path:%s",
|
||||||
|
msg_ptr->msg_type(), msg_ptr->get_path().c_str());
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
check_rtmp_alive();
|
check_rtmp_alive();
|
||||||
if ((err = _trd_ptr->pull()) != srs_success) {
|
if ((err = _trd_ptr->pull()) != srs_success) {
|
||||||
|
|
|
@ -85,11 +85,13 @@ public:
|
||||||
void release();
|
void release();
|
||||||
|
|
||||||
void insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path);
|
void insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path);
|
||||||
|
void insert_ctrl_message(unsigned int msg_type, const std::string& key_path);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
SRT_DATA_MSG_PTR get_data_message();
|
SRT_DATA_MSG_PTR get_data_message();
|
||||||
virtual srs_error_t cycle();
|
virtual srs_error_t cycle();
|
||||||
void handle_ts_data(SRT_DATA_MSG_PTR data_ptr);
|
void handle_ts_data(SRT_DATA_MSG_PTR data_ptr);
|
||||||
|
void handle_close_rtmpsession(const std::string& key_path);
|
||||||
void check_rtmp_alive();
|
void check_rtmp_alive();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue