From 1c6203bda27e2804cc32020ce8c841a38c5220ff Mon Sep 17 00:00:00 2001 From: runner365 Date: Sun, 9 Feb 2020 11:09:48 +0800 Subject: [PATCH] solve repush srt bugs --- trunk/src/srt/srt_data.cpp | 21 ++++++++++++--- trunk/src/srt/srt_data.hpp | 10 ++++++-- trunk/src/srt/srt_handle.cpp | 5 ++-- trunk/src/srt/srt_to_rtmp.cpp | 48 ++++++++++++++++++++++++++++++++--- trunk/src/srt/srt_to_rtmp.hpp | 2 ++ 5 files changed, 76 insertions(+), 10 deletions(-) diff --git a/trunk/src/srt/srt_data.cpp b/trunk/src/srt/srt_data.cpp index 075064b14..5fc766814 100644 --- a/trunk/src/srt/srt_data.cpp +++ b/trunk/src/srt/srt_data.cpp @@ -1,13 +1,22 @@ #include "srt_data.hpp" #include -SRT_DATA_MSG::SRT_DATA_MSG(unsigned int len, const std::string& path):_len(len) +SRT_DATA_MSG::SRT_DATA_MSG(const std::string& path, unsigned int msg_type):_msg_type(msg_type) + ,_len(0) + ,_data_p(nullptr) + ,_key_path(path) { + +} + +SRT_DATA_MSG::SRT_DATA_MSG(unsigned int len, const std::string& path, unsigned int msg_type):_msg_type(msg_type) + ,_len(len) ,_key_path(path) { _data_p = new unsigned char[len]; memset(_data_p, 0, len); } -SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path):_len(len) +SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path, unsigned int msg_type):_msg_type(msg_type) + ,_len(len) ,_key_path(path) { _data_p = new unsigned char[len]; @@ -15,7 +24,13 @@ SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::s } SRT_DATA_MSG::~SRT_DATA_MSG() { - delete _data_p; + if (_data_p && (_len > 0)) { + delete _data_p; + } +} + +unsigned int SRT_DATA_MSG::msg_type() { + return _msg_type; } std::string SRT_DATA_MSG::get_path() { diff --git a/trunk/src/srt/srt_data.hpp b/trunk/src/srt/srt_data.hpp index ab9cf81ea..cc0d9604e 100644 --- a/trunk/src/srt/srt_data.hpp +++ b/trunk/src/srt/srt_data.hpp @@ -3,17 +3,23 @@ #include #include +#define SRT_MSG_DATA_TYPE 0x01 +#define SRT_MSG_CLOSE_TYPE 0x02 + class SRT_DATA_MSG { public: - SRT_DATA_MSG(unsigned int len, const std::string& path); - SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path); + SRT_DATA_MSG(const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE); + SRT_DATA_MSG(unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE); + SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE); ~SRT_DATA_MSG(); + unsigned int msg_type(); unsigned int data_len(); unsigned char* get_data(); std::string get_path(); private: + unsigned int _msg_type; unsigned int _len; unsigned char* _data_p; std::string _key_path; diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp index 918ba4ff8..f2bc2ff8c 100644 --- a/trunk/src/srt/srt_handle.cpp +++ b/trunk/src/srt/srt_handle.cpp @@ -17,8 +17,8 @@ static bool MONITOR_STATICS_ENABLE = false; static long long MONITOR_TIMEOUT = 5000; const unsigned int DEF_DATA_SIZE = 188*7; -const long long CHECK_ALIVE_INTERVAL = 10*1000; -const long long CHECK_ALIVE_TIMEOUT = 15*1000; +const long long CHECK_ALIVE_INTERVAL = 5*1000; +const long long CHECK_ALIVE_TIMEOUT = 5*1000; long long srt_now_ms = 0; @@ -379,6 +379,7 @@ void srt_handle::close_push_conn(SRTSOCKET srtsocket) { _push_conn_map.erase(push_iter); } _conn_map.erase(iter); + srt2rtmp::get_instance()->insert_ctrl_message(SRT_MSG_CLOSE_TYPE, conn_ptr->get_subpath()); conn_ptr->close(); } diff --git a/trunk/src/srt/srt_to_rtmp.cpp b/trunk/src/srt/srt_to_rtmp.cpp index e86a3c92d..f8e037af3 100644 --- a/trunk/src/srt/srt_to_rtmp.cpp +++ b/trunk/src/srt/srt_to_rtmp.cpp @@ -61,6 +61,14 @@ void srt2rtmp::insert_data_message(unsigned char* data_p, unsigned int len, cons return; } +void srt2rtmp::insert_ctrl_message(unsigned int msg_type, const std::string& key_path) { + std::unique_lock locker(_mutex); + + SRT_DATA_MSG_PTR msg_ptr = std::make_shared(key_path, msg_type); + _msg_queue.push(msg_ptr); + //_notify_cond.notify_one(); + return; +} SRT_DATA_MSG_PTR srt2rtmp::get_data_message() { std::unique_lock locker(_mutex); SRT_DATA_MSG_PTR msg_ptr; @@ -79,8 +87,8 @@ SRT_DATA_MSG_PTR srt2rtmp::get_data_message() { } void srt2rtmp::check_rtmp_alive() { - const int64_t CHECK_INTERVAL = 15*1000; - const int64_t ALIVE_TIMEOUT_MAX = 20*1000; + const int64_t CHECK_INTERVAL = 5*1000; + const int64_t ALIVE_TIMEOUT_MAX = 5*1000; if (_lastcheck_ts == 0) { _lastcheck_ts = now_ms(); @@ -108,6 +116,22 @@ void srt2rtmp::check_rtmp_alive() { return; } +void srt2rtmp::handle_close_rtmpsession(const std::string& key_path) { + RTMP_CLIENT_PTR rtmp_ptr; + auto iter = _rtmp_client_map.find(key_path); + if (iter == _rtmp_client_map.end()) { + srs_error("fail to close rtmp session fail, can't find session by key_path:%s", + key_path.c_str()); + return; + } + rtmp_ptr = iter->second; + _rtmp_client_map.erase(iter); + srs_trace("close rtmp session which key_path is %s", key_path.c_str()); + rtmp_ptr->close(); + + return; +} + //the cycle is running in srs coroutine srs_error_t srt2rtmp::cycle() { srs_error_t err = srs_success; @@ -119,7 +143,25 @@ srs_error_t srt2rtmp::cycle() { if (!msg_ptr) { srs_usleep((30 * SRS_UTIME_MILLISECONDS)); } else { - handle_ts_data(msg_ptr); + switch (msg_ptr->msg_type()) { + case SRT_MSG_DATA_TYPE: + { + handle_ts_data(msg_ptr); + break; + } + case SRT_MSG_CLOSE_TYPE: + { + handle_close_rtmpsession(msg_ptr->get_path()); + break; + } + default: + { + srs_error("srt to rtmp get wrong message type(%u), path:%s", + msg_ptr->msg_type(), msg_ptr->get_path().c_str()); + assert(0); + } + } + } check_rtmp_alive(); if ((err = _trd_ptr->pull()) != srs_success) { diff --git a/trunk/src/srt/srt_to_rtmp.hpp b/trunk/src/srt/srt_to_rtmp.hpp index c361e7cd0..08735c09d 100644 --- a/trunk/src/srt/srt_to_rtmp.hpp +++ b/trunk/src/srt/srt_to_rtmp.hpp @@ -85,11 +85,13 @@ public: void release(); void insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path); + void insert_ctrl_message(unsigned int msg_type, const std::string& key_path); private: SRT_DATA_MSG_PTR get_data_message(); virtual srs_error_t cycle(); void handle_ts_data(SRT_DATA_MSG_PTR data_ptr); + void handle_close_rtmpsession(const std::string& key_path); void check_rtmp_alive(); private: