1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

solve repush srt bugs

This commit is contained in:
runner365 2020-02-09 11:09:48 +08:00
parent 1941f55475
commit 1c6203bda2
5 changed files with 76 additions and 10 deletions

View file

@ -1,13 +1,22 @@
#include "srt_data.hpp" #include "srt_data.hpp"
#include <string.h> #include <string.h>
SRT_DATA_MSG::SRT_DATA_MSG(unsigned int len, const std::string& path):_len(len) SRT_DATA_MSG::SRT_DATA_MSG(const std::string& path, unsigned int msg_type):_msg_type(msg_type)
,_len(0)
,_data_p(nullptr)
,_key_path(path) {
}
SRT_DATA_MSG::SRT_DATA_MSG(unsigned int len, const std::string& path, unsigned int msg_type):_msg_type(msg_type)
,_len(len)
,_key_path(path) { ,_key_path(path) {
_data_p = new unsigned char[len]; _data_p = new unsigned char[len];
memset(_data_p, 0, len); memset(_data_p, 0, len);
} }
SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path):_len(len) SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path, unsigned int msg_type):_msg_type(msg_type)
,_len(len)
,_key_path(path) ,_key_path(path)
{ {
_data_p = new unsigned char[len]; _data_p = new unsigned char[len];
@ -15,8 +24,14 @@ SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::s
} }
SRT_DATA_MSG::~SRT_DATA_MSG() { SRT_DATA_MSG::~SRT_DATA_MSG() {
if (_data_p && (_len > 0)) {
delete _data_p; delete _data_p;
} }
}
unsigned int SRT_DATA_MSG::msg_type() {
return _msg_type;
}
std::string SRT_DATA_MSG::get_path() { std::string SRT_DATA_MSG::get_path() {
return _key_path; return _key_path;

View file

@ -3,17 +3,23 @@
#include <string> #include <string>
#include <memory> #include <memory>
#define SRT_MSG_DATA_TYPE 0x01
#define SRT_MSG_CLOSE_TYPE 0x02
class SRT_DATA_MSG { class SRT_DATA_MSG {
public: public:
SRT_DATA_MSG(unsigned int len, const std::string& path); SRT_DATA_MSG(const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path); SRT_DATA_MSG(unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
~SRT_DATA_MSG(); ~SRT_DATA_MSG();
unsigned int msg_type();
unsigned int data_len(); unsigned int data_len();
unsigned char* get_data(); unsigned char* get_data();
std::string get_path(); std::string get_path();
private: private:
unsigned int _msg_type;
unsigned int _len; unsigned int _len;
unsigned char* _data_p; unsigned char* _data_p;
std::string _key_path; std::string _key_path;

View file

@ -17,8 +17,8 @@
static bool MONITOR_STATICS_ENABLE = false; static bool MONITOR_STATICS_ENABLE = false;
static long long MONITOR_TIMEOUT = 5000; static long long MONITOR_TIMEOUT = 5000;
const unsigned int DEF_DATA_SIZE = 188*7; const unsigned int DEF_DATA_SIZE = 188*7;
const long long CHECK_ALIVE_INTERVAL = 10*1000; const long long CHECK_ALIVE_INTERVAL = 5*1000;
const long long CHECK_ALIVE_TIMEOUT = 15*1000; const long long CHECK_ALIVE_TIMEOUT = 5*1000;
long long srt_now_ms = 0; long long srt_now_ms = 0;
@ -379,6 +379,7 @@ void srt_handle::close_push_conn(SRTSOCKET srtsocket) {
_push_conn_map.erase(push_iter); _push_conn_map.erase(push_iter);
} }
_conn_map.erase(iter); _conn_map.erase(iter);
srt2rtmp::get_instance()->insert_ctrl_message(SRT_MSG_CLOSE_TYPE, conn_ptr->get_subpath());
conn_ptr->close(); conn_ptr->close();
} }

View file

@ -61,6 +61,14 @@ void srt2rtmp::insert_data_message(unsigned char* data_p, unsigned int len, cons
return; return;
} }
void srt2rtmp::insert_ctrl_message(unsigned int msg_type, const std::string& key_path) {
std::unique_lock<std::mutex> locker(_mutex);
SRT_DATA_MSG_PTR msg_ptr = std::make_shared<SRT_DATA_MSG>(key_path, msg_type);
_msg_queue.push(msg_ptr);
//_notify_cond.notify_one();
return;
}
SRT_DATA_MSG_PTR srt2rtmp::get_data_message() { SRT_DATA_MSG_PTR srt2rtmp::get_data_message() {
std::unique_lock<std::mutex> locker(_mutex); std::unique_lock<std::mutex> locker(_mutex);
SRT_DATA_MSG_PTR msg_ptr; SRT_DATA_MSG_PTR msg_ptr;
@ -79,8 +87,8 @@ SRT_DATA_MSG_PTR srt2rtmp::get_data_message() {
} }
void srt2rtmp::check_rtmp_alive() { void srt2rtmp::check_rtmp_alive() {
const int64_t CHECK_INTERVAL = 15*1000; const int64_t CHECK_INTERVAL = 5*1000;
const int64_t ALIVE_TIMEOUT_MAX = 20*1000; const int64_t ALIVE_TIMEOUT_MAX = 5*1000;
if (_lastcheck_ts == 0) { if (_lastcheck_ts == 0) {
_lastcheck_ts = now_ms(); _lastcheck_ts = now_ms();
@ -108,6 +116,22 @@ void srt2rtmp::check_rtmp_alive() {
return; return;
} }
void srt2rtmp::handle_close_rtmpsession(const std::string& key_path) {
RTMP_CLIENT_PTR rtmp_ptr;
auto iter = _rtmp_client_map.find(key_path);
if (iter == _rtmp_client_map.end()) {
srs_error("fail to close rtmp session fail, can't find session by key_path:%s",
key_path.c_str());
return;
}
rtmp_ptr = iter->second;
_rtmp_client_map.erase(iter);
srs_trace("close rtmp session which key_path is %s", key_path.c_str());
rtmp_ptr->close();
return;
}
//the cycle is running in srs coroutine //the cycle is running in srs coroutine
srs_error_t srt2rtmp::cycle() { srs_error_t srt2rtmp::cycle() {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -119,7 +143,25 @@ srs_error_t srt2rtmp::cycle() {
if (!msg_ptr) { if (!msg_ptr) {
srs_usleep((30 * SRS_UTIME_MILLISECONDS)); srs_usleep((30 * SRS_UTIME_MILLISECONDS));
} else { } else {
switch (msg_ptr->msg_type()) {
case SRT_MSG_DATA_TYPE:
{
handle_ts_data(msg_ptr); handle_ts_data(msg_ptr);
break;
}
case SRT_MSG_CLOSE_TYPE:
{
handle_close_rtmpsession(msg_ptr->get_path());
break;
}
default:
{
srs_error("srt to rtmp get wrong message type(%u), path:%s",
msg_ptr->msg_type(), msg_ptr->get_path().c_str());
assert(0);
}
}
} }
check_rtmp_alive(); check_rtmp_alive();
if ((err = _trd_ptr->pull()) != srs_success) { if ((err = _trd_ptr->pull()) != srs_success) {

View file

@ -85,11 +85,13 @@ public:
void release(); void release();
void insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path); void insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path);
void insert_ctrl_message(unsigned int msg_type, const std::string& key_path);
private: private:
SRT_DATA_MSG_PTR get_data_message(); SRT_DATA_MSG_PTR get_data_message();
virtual srs_error_t cycle(); virtual srs_error_t cycle();
void handle_ts_data(SRT_DATA_MSG_PTR data_ptr); void handle_ts_data(SRT_DATA_MSG_PTR data_ptr);
void handle_close_rtmpsession(const std::string& key_path);
void check_rtmp_alive(); void check_rtmp_alive();
private: private: