mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
SRT: Close connection if RTMP failed. (#2917)
* SRT: using global variables to pass errors (#2897) * SRT: using global variables to pass errors (#2897)
This commit is contained in:
parent
0a848430e9
commit
e8fca60ece
5 changed files with 49 additions and 25 deletions
|
@ -206,6 +206,19 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp
|
||||||
srt_conn_ptr->update_timestamp(srt_now_ms);
|
srt_conn_ptr->update_timestamp(srt_now_ms);
|
||||||
|
|
||||||
srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);
|
srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> locker(srt2rtmp::_srt_error_mutex);
|
||||||
|
if (srt2rtmp::_srt_error_map.count(subpath) == 1) {
|
||||||
|
int err_code = srt2rtmp::_srt_error_map[subpath];
|
||||||
|
if (err_code != ERROR_SUCCESS) {
|
||||||
|
close_push_conn(conn_fd);
|
||||||
|
srt_log_error("handle_push_data srt to rtmp error:%d, fd:%d", err_code,conn_fd);
|
||||||
|
//todo: reset to next use, maybe update by srt2rtmp::cycle again
|
||||||
|
srt2rtmp::_srt_error_map[subpath] = ERROR_SUCCESS;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//send data to subscriber(players)
|
//send data to subscriber(players)
|
||||||
//streamid, play map<SRTSOCKET, SRT_CONN_PTR>
|
//streamid, play map<SRTSOCKET, SRT_CONN_PTR>
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
#include <srs_kernel_stream.hpp>
|
#include <srs_kernel_stream.hpp>
|
||||||
|
|
||||||
std::shared_ptr<srt2rtmp> srt2rtmp::s_srt2rtmp_ptr;
|
std::shared_ptr<srt2rtmp> srt2rtmp::s_srt2rtmp_ptr;
|
||||||
|
std::mutex srt2rtmp::_srt_error_mutex;
|
||||||
|
std::map<std::string, int> srt2rtmp::_srt_error_map;
|
||||||
|
|
||||||
std::shared_ptr<srt2rtmp> srt2rtmp::get_instance() {
|
std::shared_ptr<srt2rtmp> srt2rtmp::get_instance() {
|
||||||
if (!s_srt2rtmp_ptr) {
|
if (!s_srt2rtmp_ptr) {
|
||||||
|
@ -153,6 +155,7 @@ void srt2rtmp::handle_close_rtmpsession(const std::string& key_path) {
|
||||||
srs_error_t srt2rtmp::cycle() {
|
srs_error_t srt2rtmp::cycle() {
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
_lastcheck_ts = 0;
|
_lastcheck_ts = 0;
|
||||||
|
int err_code = -1;
|
||||||
|
|
||||||
while(true) {
|
while(true) {
|
||||||
SRT_DATA_MSG_PTR msg_ptr = get_data_message();
|
SRT_DATA_MSG_PTR msg_ptr = get_data_message();
|
||||||
|
@ -163,7 +166,11 @@ srs_error_t srt2rtmp::cycle() {
|
||||||
switch (msg_ptr->msg_type()) {
|
switch (msg_ptr->msg_type()) {
|
||||||
case SRT_MSG_DATA_TYPE:
|
case SRT_MSG_DATA_TYPE:
|
||||||
{
|
{
|
||||||
handle_ts_data(msg_ptr);
|
err_code = handle_ts_data(msg_ptr);
|
||||||
|
if (err_code != ERROR_SUCCESS) {
|
||||||
|
std::unique_lock<std::mutex> locker(_srt_error_mutex);
|
||||||
|
_srt_error_map[msg_ptr->get_path()] = err_code;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case SRT_MSG_CLOSE_TYPE:
|
case SRT_MSG_CLOSE_TYPE:
|
||||||
|
@ -192,7 +199,7 @@ srs_error_t srt2rtmp::cycle() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) {
|
int srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) {
|
||||||
RTMP_CLIENT_PTR rtmp_ptr;
|
RTMP_CLIENT_PTR rtmp_ptr;
|
||||||
auto iter = _rtmp_client_map.find(data_ptr->get_path());
|
auto iter = _rtmp_client_map.find(data_ptr->get_path());
|
||||||
if (iter == _rtmp_client_map.end()) {
|
if (iter == _rtmp_client_map.end()) {
|
||||||
|
@ -203,9 +210,7 @@ void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) {
|
||||||
rtmp_ptr = iter->second;
|
rtmp_ptr = iter->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
rtmp_ptr->receive_ts_data(data_ptr);
|
return rtmp_ptr->receive_ts_data(data_ptr);
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void srt2rtmp::handle_log_data(SRT_DATA_MSG_PTR data_ptr) {
|
void srt2rtmp::handle_log_data(SRT_DATA_MSG_PTR data_ptr) {
|
||||||
|
@ -343,9 +348,8 @@ srs_error_t rtmp_client::connect() {
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
void rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) {
|
int rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) {
|
||||||
_ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback
|
return _ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_error_t rtmp_client::write_h264_sps_pps(uint32_t dts, uint32_t pts) {
|
srs_error_t rtmp_client::write_h264_sps_pps(uint32_t dts, uint32_t pts) {
|
||||||
|
@ -668,13 +672,13 @@ srs_error_t rtmp_client::on_ts_audio(std::shared_ptr<SrsBuffer> avs_ptr, uint64_
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type,
|
int rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type,
|
||||||
uint64_t dts, uint64_t pts)
|
uint64_t dts, uint64_t pts)
|
||||||
{
|
{
|
||||||
srs_error_t err = srs_success;
|
srs_error_t err = srs_success;
|
||||||
if (!data_ptr || (data_ptr->get_data() == nullptr) || (data_ptr->data_len() == 0)) {
|
if (!data_ptr || (data_ptr->get_data() == nullptr) || (data_ptr->data_len() == 0)) {
|
||||||
assert(0);
|
assert(0);
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto avs_ptr = std::make_shared<SrsBuffer>((char*)data_ptr->get_data(), data_ptr->data_len());
|
auto avs_ptr = std::make_shared<SrsBuffer>((char*)data_ptr->get_data(), data_ptr->data_len());
|
||||||
|
@ -685,13 +689,16 @@ void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media
|
||||||
err = on_ts_audio(avs_ptr, dts, pts);
|
err = on_ts_audio(avs_ptr, dts, pts);
|
||||||
} else {
|
} else {
|
||||||
srs_error("mpegts demux unkown stream type:0x%02x, only support h264+aac", media_type);
|
srs_error("mpegts demux unkown stream type:0x%02x, only support h264+aac", media_type);
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (err != srs_success) {
|
if (err != srs_success) {
|
||||||
srs_error("send media data error:", srs_error_code(err));
|
srs_error("send media data error:%s", srs_error_desc(err).c_str());
|
||||||
|
int err_code = srs_error_code(err);
|
||||||
|
srs_freep(err);
|
||||||
|
return err_code;
|
||||||
}
|
}
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
rtmp_packet_queue::rtmp_packet_queue():_queue_timeout(QUEUE_DEF_TIMEOUT)
|
rtmp_packet_queue::rtmp_packet_queue():_queue_timeout(QUEUE_DEF_TIMEOUT)
|
||||||
|
|
|
@ -71,7 +71,7 @@ public:
|
||||||
rtmp_client(std::string key_path);
|
rtmp_client(std::string key_path);
|
||||||
virtual ~rtmp_client();
|
virtual ~rtmp_client();
|
||||||
|
|
||||||
void receive_ts_data(SRT_DATA_MSG_PTR data_ptr);
|
int receive_ts_data(SRT_DATA_MSG_PTR data_ptr);
|
||||||
int64_t get_last_live_ts();
|
int64_t get_last_live_ts();
|
||||||
std::string get_url();
|
std::string get_url();
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ public:
|
||||||
void close();
|
void close();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts);
|
virtual int on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
srs_error_t on_ts_video(std::shared_ptr<SrsBuffer> avs_ptr, uint64_t dts, uint64_t pts);
|
srs_error_t on_ts_video(std::shared_ptr<SrsBuffer> avs_ptr, uint64_t dts, uint64_t pts);
|
||||||
|
@ -140,7 +140,7 @@ public:
|
||||||
private:
|
private:
|
||||||
SRT_DATA_MSG_PTR get_data_message();
|
SRT_DATA_MSG_PTR get_data_message();
|
||||||
virtual srs_error_t cycle();
|
virtual srs_error_t cycle();
|
||||||
void handle_ts_data(SRT_DATA_MSG_PTR data_ptr);
|
int handle_ts_data(SRT_DATA_MSG_PTR data_ptr);
|
||||||
void handle_close_rtmpsession(const std::string& key_path);
|
void handle_close_rtmpsession(const std::string& key_path);
|
||||||
void handle_log_data(SRT_DATA_MSG_PTR data_ptr);
|
void handle_log_data(SRT_DATA_MSG_PTR data_ptr);
|
||||||
void check_rtmp_alive();
|
void check_rtmp_alive();
|
||||||
|
@ -154,6 +154,9 @@ private:
|
||||||
|
|
||||||
std::unordered_map<std::string, RTMP_CLIENT_PTR> _rtmp_client_map;
|
std::unordered_map<std::string, RTMP_CLIENT_PTR> _rtmp_client_map;
|
||||||
int64_t _lastcheck_ts;
|
int64_t _lastcheck_ts;
|
||||||
|
public:
|
||||||
|
static std::mutex _srt_error_mutex;
|
||||||
|
static std::map<std::string, int> _srt_error_map;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -276,7 +276,9 @@ int ts_demux::decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_C
|
||||||
uint64_t pts = 0;
|
uint64_t pts = 0;
|
||||||
|
|
||||||
//callback last media data in data buffer
|
//callback last media data in data buffer
|
||||||
on_callback(callback, _last_pid, key_path, _last_dts, _last_pts);
|
int err_code = on_callback(callback, _last_pid, key_path, _last_dts, _last_pts);
|
||||||
|
if (err_code != 0)
|
||||||
|
return err_code;
|
||||||
|
|
||||||
int ret = pes_parse(data_p+npos, npos, &ret_data_p, ret_size, dts, pts);
|
int ret = pes_parse(data_p+npos, npos, &ret_data_p, ret_size, dts, pts);
|
||||||
if (ret > 188) {
|
if (ret > 188) {
|
||||||
|
@ -320,7 +322,7 @@ int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
ret = decode_unit(data, path, callback);
|
ret = decode_unit(data, path, callback);
|
||||||
if (ret < 0)
|
if (ret != 0) // srs_error_code is positive
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -335,15 +337,15 @@ void ts_demux::insert_into_databuf(unsigned char* data_p, size_t data_size, std:
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path,
|
int ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path,
|
||||||
uint64_t dts, uint64_t pts) {
|
uint64_t dts, uint64_t pts) {
|
||||||
if ((_data_total <=0 ) || (_data_buffer_vec.empty())) {
|
if ((_data_total <=0 ) || (_data_buffer_vec.empty())) {
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto iter = _pmt._pid2steamtype.find(pid);
|
auto iter = _pmt._pid2steamtype.find(pid);
|
||||||
if (iter == _pmt._pid2steamtype.end()) {
|
if (iter == _pmt._pid2steamtype.end()) {
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
unsigned char stream_type = iter->second;
|
unsigned char stream_type = iter->second;
|
||||||
auto total_data_ptr = std::make_shared<SRT_DATA_MSG>(_data_total, key_path);
|
auto total_data_ptr = std::make_shared<SRT_DATA_MSG>(_data_total, key_path);
|
||||||
|
@ -358,8 +360,7 @@ void ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, st
|
||||||
_data_buffer_vec.clear();
|
_data_buffer_vec.clear();
|
||||||
_data_total = 0;
|
_data_total = 0;
|
||||||
|
|
||||||
callback->on_data_callback(total_data_ptr, stream_type, dts, pts);
|
return callback->on_data_callback(total_data_ptr, stream_type, dts, pts);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ts_demux::is_pmt(unsigned short pid) {
|
bool ts_demux::is_pmt(unsigned short pid) {
|
||||||
|
|
|
@ -74,7 +74,7 @@ Value Description
|
||||||
|
|
||||||
class ts_media_data_callback_I {
|
class ts_media_data_callback_I {
|
||||||
public:
|
public:
|
||||||
virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0;
|
virtual int on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef std::shared_ptr<ts_media_data_callback_I> TS_DATA_CALLBACK_PTR;
|
typedef std::shared_ptr<ts_media_data_callback_I> TS_DATA_CALLBACK_PTR;
|
||||||
|
@ -227,7 +227,7 @@ private:
|
||||||
int pes_parse(unsigned char* p, size_t npos, unsigned char** ret_pp, size_t& ret_size,
|
int pes_parse(unsigned char* p, size_t npos, unsigned char** ret_pp, size_t& ret_size,
|
||||||
uint64_t& dts, uint64_t& pts);
|
uint64_t& dts, uint64_t& pts);
|
||||||
void insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid);
|
void insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid);
|
||||||
void on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid,
|
int on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid,
|
||||||
std::string key_path, uint64_t dts, uint64_t pts);
|
std::string key_path, uint64_t dts, uint64_t pts);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue