From dece458a90f0215e1be4108820c3f0cd56715eb9 Mon Sep 17 00:00:00 2001 From: runner365 Date: Sat, 25 Jan 2020 16:15:27 +0800 Subject: [PATCH] solve rtmp client timeout bugs in srt2rtmp --- trunk/src/srt/srt_conn.cpp | 2 +- trunk/src/srt/srt_to_rtmp.cpp | 65 +++++++++++++++++++++++++++++++---- trunk/src/srt/srt_to_rtmp.hpp | 11 ++++-- 3 files changed, 68 insertions(+), 10 deletions(-) diff --git a/trunk/src/srt/srt_conn.cpp b/trunk/src/srt/srt_conn.cpp index 2df80b011..a0fd93ad9 100644 --- a/trunk/src/srt/srt_conn.cpp +++ b/trunk/src/srt/srt_conn.cpp @@ -61,7 +61,7 @@ bool get_streamid_info(const std::string& streamid, int& mode, std::string& url_ return false; } - for (int index = 0; index < info_vec.size(); index++) { + for (size_t index = 0; index < info_vec.size(); index++) { std::string key; std::string value; diff --git a/trunk/src/srt/srt_to_rtmp.cpp b/trunk/src/srt/srt_to_rtmp.cpp index ecb348b20..e86a3c92d 100644 --- a/trunk/src/srt/srt_to_rtmp.cpp +++ b/trunk/src/srt/srt_to_rtmp.cpp @@ -1,11 +1,13 @@ #include "srt_to_rtmp.hpp" +#include "stringex.hpp" +#include "time_help.h" #include #include #include #include #include #include -#include "stringex.hpp" +#include std::shared_ptr srt2rtmp::s_srt2rtmp_ptr; @@ -16,7 +18,7 @@ std::shared_ptr srt2rtmp::get_instance() { return s_srt2rtmp_ptr; } -srt2rtmp::srt2rtmp() { +srt2rtmp::srt2rtmp():_lastcheck_ts(0) { } @@ -36,6 +38,7 @@ srs_error_t srt2rtmp::init() { if ((err = _trd_ptr->start()) != srs_success) { return srs_error_wrap(err, "start thread"); } + srs_trace("srt2rtmp start coroutine..."); return err; @@ -75,9 +78,40 @@ SRT_DATA_MSG_PTR srt2rtmp::get_data_message() { return msg_ptr; } +void srt2rtmp::check_rtmp_alive() { + const int64_t CHECK_INTERVAL = 15*1000; + const int64_t ALIVE_TIMEOUT_MAX = 20*1000; + + if (_lastcheck_ts == 0) { + _lastcheck_ts = now_ms(); + return; + } + int64_t timenow_ms = now_ms(); + + if ((timenow_ms - _lastcheck_ts) > CHECK_INTERVAL) { + _lastcheck_ts = timenow_ms; + + for (auto iter = _rtmp_client_map.begin(); + iter != _rtmp_client_map.end();) { + RTMP_CLIENT_PTR rtmp_ptr = iter->second; + + if ((timenow_ms - rtmp_ptr->get_last_live_ts()) >= ALIVE_TIMEOUT_MAX) { + srs_warn("srt2rtmp client is timeout, url:%s", + rtmp_ptr->get_url().c_str()); + _rtmp_client_map.erase(iter++); + rtmp_ptr->close(); + } else { + iter++; + } + } + } + return; +} + //the cycle is running in srs coroutine srs_error_t srt2rtmp::cycle() { srs_error_t err = srs_success; + _lastcheck_ts = 0; while(true) { SRT_DATA_MSG_PTR msg_ptr = get_data_message(); @@ -87,7 +121,7 @@ srs_error_t srt2rtmp::cycle() { } else { handle_ts_data(msg_ptr); } - + check_rtmp_alive(); if ((err = _trd_ptr->pull()) != srs_success) { return srs_error_wrap(err, "forwarder"); } @@ -112,6 +146,7 @@ void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) { rtmp_client::rtmp_client(std::string key_path):_key_path(key_path) , _connect_flag(false) { + const std::string DEF_VHOST = "DEFAULT_VHOST"; _ts_demux_ptr = std::make_shared(); _avc_ptr = std::make_shared(); _aac_ptr = std::make_shared(); @@ -124,13 +159,20 @@ rtmp_client::rtmp_client(std::string key_path):_key_path(key_path) _appname = ret_vec[1]; _streamname = ret_vec[2]; } else { - _vhost = "DEFAULT_VHOST"; + _vhost = DEF_VHOST; _appname = ret_vec[0]; _streamname = ret_vec[1]; } char url_sz[128]; - sprintf(url_sz, "rtmp://127.0.0.1/%s?vhost=%s/%s", - _appname.c_str(), _vhost.c_str(), _streamname.c_str()); + + if (_vhost == DEF_VHOST) { + sprintf(url_sz, "rtmp://127.0.0.1/%s/%s", + _appname.c_str(), _streamname.c_str()); + } else { + sprintf(url_sz, "rtmp://127.0.0.1/%s?vhost=%s/%s", + _appname.c_str(), _vhost.c_str(), _streamname.c_str()); + } + _url = url_sz; _h264_sps_changed = false; @@ -148,11 +190,20 @@ void rtmp_client::close() { if (!_rtmp_conn_ptr) { return; } + srs_trace("rtmp client close url:%s", _url.c_str()); _rtmp_conn_ptr->close(); _rtmp_conn_ptr = nullptr; } +int64_t rtmp_client::get_last_live_ts() { + return _last_live_ts; +} + +std::string rtmp_client::get_url() { + return _url; +} + srs_error_t rtmp_client::connect() { srs_error_t err = srs_success; srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; @@ -366,6 +417,7 @@ srs_error_t rtmp_client::on_ts_video(std::shared_ptr avs_ptr, uint64_ if ((err = write_h264_ipb_frame(frame, frame_size, dts, pts)) != srs_success) { return srs_error_wrap(err, "write frame"); } + _last_live_ts = now_ms(); } return err; @@ -417,6 +469,7 @@ srs_error_t rtmp_client::on_ts_audio(std::shared_ptr avs_ptr, uint64_ if ((err = write_audio_raw_frame(frame, frame_size, &codec, dts)) != srs_success) { return srs_error_wrap(err, "write audio raw frame"); } + _last_live_ts = now_ms(); } return err; diff --git a/trunk/src/srt/srt_to_rtmp.hpp b/trunk/src/srt/srt_to_rtmp.hpp index 040b832b7..c361e7cd0 100644 --- a/trunk/src/srt/srt_to_rtmp.hpp +++ b/trunk/src/srt/srt_to_rtmp.hpp @@ -30,13 +30,15 @@ public: ~rtmp_client(); void receive_ts_data(SRT_DATA_MSG_PTR data_ptr); - -private: - virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts); + int64_t get_last_live_ts(); + std::string get_url(); srs_error_t connect(); void close(); +private: + virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts); + private: srs_error_t on_ts_video(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts); srs_error_t on_ts_audio(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts); @@ -68,6 +70,7 @@ private: private: RTMP_CONN_PTR _rtmp_conn_ptr; bool _connect_flag; + int64_t _last_live_ts; }; typedef std::shared_ptr RTMP_CLIENT_PTR; @@ -87,6 +90,7 @@ private: SRT_DATA_MSG_PTR get_data_message(); virtual srs_error_t cycle(); void handle_ts_data(SRT_DATA_MSG_PTR data_ptr); + void check_rtmp_alive(); private: static std::shared_ptr s_srt2rtmp_ptr; @@ -96,6 +100,7 @@ private: std::queue _msg_queue; std::unordered_map _rtmp_client_map; + int64_t _lastcheck_ts; }; #endif \ No newline at end of file