From 1941f554756ddaca1752ab7d6927c34a2f33a3a3 Mon Sep 17 00:00:00 2001 From: runner365 Date: Wed, 5 Feb 2020 21:21:12 +0800 Subject: [PATCH 01/10] if there isn't srt connect, it needn't epoll wait --- trunk/src/srt/srt_handle.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp index ea25b2c83..918ba4ff8 100644 --- a/trunk/src/srt/srt_handle.cpp +++ b/trunk/src/srt/srt_handle.cpp @@ -236,6 +236,10 @@ void srt_handle::onwork() add_newconn(msg.conn_ptr, msg.events); } + if (_conn_map.empty()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } check_alive(); ret = srt_epoll_wait(_handle_pollid, read_fds, &rfd_num, write_fds, &wfd_num, @@ -243,7 +247,6 @@ void srt_handle::onwork() if (ret < 0) { srs_info("srt handle epoll is timeout, ret=%d, srt_now_ms=%ld", ret, srt_now_ms); - std::this_thread::sleep_for(std::chrono::milliseconds(30)); continue; } From 1c6203bda27e2804cc32020ce8c841a38c5220ff Mon Sep 17 00:00:00 2001 From: runner365 Date: Sun, 9 Feb 2020 11:09:48 +0800 Subject: [PATCH 02/10] solve repush srt bugs --- trunk/src/srt/srt_data.cpp | 21 ++++++++++++--- trunk/src/srt/srt_data.hpp | 10 ++++++-- trunk/src/srt/srt_handle.cpp | 5 ++-- trunk/src/srt/srt_to_rtmp.cpp | 48 ++++++++++++++++++++++++++++++++--- trunk/src/srt/srt_to_rtmp.hpp | 2 ++ 5 files changed, 76 insertions(+), 10 deletions(-) diff --git a/trunk/src/srt/srt_data.cpp b/trunk/src/srt/srt_data.cpp index 075064b14..5fc766814 100644 --- a/trunk/src/srt/srt_data.cpp +++ b/trunk/src/srt/srt_data.cpp @@ -1,13 +1,22 @@ #include "srt_data.hpp" #include -SRT_DATA_MSG::SRT_DATA_MSG(unsigned int len, const std::string& path):_len(len) +SRT_DATA_MSG::SRT_DATA_MSG(const std::string& path, unsigned int msg_type):_msg_type(msg_type) + ,_len(0) + ,_data_p(nullptr) + ,_key_path(path) { + +} + +SRT_DATA_MSG::SRT_DATA_MSG(unsigned int len, const std::string& path, unsigned int msg_type):_msg_type(msg_type) + ,_len(len) ,_key_path(path) { _data_p = new unsigned char[len]; memset(_data_p, 0, len); } -SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path):_len(len) +SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path, unsigned int msg_type):_msg_type(msg_type) + ,_len(len) ,_key_path(path) { _data_p = new unsigned char[len]; @@ -15,7 +24,13 @@ SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::s } SRT_DATA_MSG::~SRT_DATA_MSG() { - delete _data_p; + if (_data_p && (_len > 0)) { + delete _data_p; + } +} + +unsigned int SRT_DATA_MSG::msg_type() { + return _msg_type; } std::string SRT_DATA_MSG::get_path() { diff --git a/trunk/src/srt/srt_data.hpp b/trunk/src/srt/srt_data.hpp index ab9cf81ea..cc0d9604e 100644 --- a/trunk/src/srt/srt_data.hpp +++ b/trunk/src/srt/srt_data.hpp @@ -3,17 +3,23 @@ #include #include +#define SRT_MSG_DATA_TYPE 0x01 +#define SRT_MSG_CLOSE_TYPE 0x02 + class SRT_DATA_MSG { public: - SRT_DATA_MSG(unsigned int len, const std::string& path); - SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path); + SRT_DATA_MSG(const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE); + SRT_DATA_MSG(unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE); + SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE); ~SRT_DATA_MSG(); + unsigned int msg_type(); unsigned int data_len(); unsigned char* get_data(); std::string get_path(); private: + unsigned int _msg_type; unsigned int _len; unsigned char* _data_p; std::string _key_path; diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp index 918ba4ff8..f2bc2ff8c 100644 --- a/trunk/src/srt/srt_handle.cpp +++ b/trunk/src/srt/srt_handle.cpp @@ -17,8 +17,8 @@ static bool MONITOR_STATICS_ENABLE = false; static long long MONITOR_TIMEOUT = 5000; const unsigned int DEF_DATA_SIZE = 188*7; -const long long CHECK_ALIVE_INTERVAL = 10*1000; -const long long CHECK_ALIVE_TIMEOUT = 15*1000; +const long long CHECK_ALIVE_INTERVAL = 5*1000; +const long long CHECK_ALIVE_TIMEOUT = 5*1000; long long srt_now_ms = 0; @@ -379,6 +379,7 @@ void srt_handle::close_push_conn(SRTSOCKET srtsocket) { _push_conn_map.erase(push_iter); } _conn_map.erase(iter); + srt2rtmp::get_instance()->insert_ctrl_message(SRT_MSG_CLOSE_TYPE, conn_ptr->get_subpath()); conn_ptr->close(); } diff --git a/trunk/src/srt/srt_to_rtmp.cpp b/trunk/src/srt/srt_to_rtmp.cpp index e86a3c92d..f8e037af3 100644 --- a/trunk/src/srt/srt_to_rtmp.cpp +++ b/trunk/src/srt/srt_to_rtmp.cpp @@ -61,6 +61,14 @@ void srt2rtmp::insert_data_message(unsigned char* data_p, unsigned int len, cons return; } +void srt2rtmp::insert_ctrl_message(unsigned int msg_type, const std::string& key_path) { + std::unique_lock locker(_mutex); + + SRT_DATA_MSG_PTR msg_ptr = std::make_shared(key_path, msg_type); + _msg_queue.push(msg_ptr); + //_notify_cond.notify_one(); + return; +} SRT_DATA_MSG_PTR srt2rtmp::get_data_message() { std::unique_lock locker(_mutex); SRT_DATA_MSG_PTR msg_ptr; @@ -79,8 +87,8 @@ SRT_DATA_MSG_PTR srt2rtmp::get_data_message() { } void srt2rtmp::check_rtmp_alive() { - const int64_t CHECK_INTERVAL = 15*1000; - const int64_t ALIVE_TIMEOUT_MAX = 20*1000; + const int64_t CHECK_INTERVAL = 5*1000; + const int64_t ALIVE_TIMEOUT_MAX = 5*1000; if (_lastcheck_ts == 0) { _lastcheck_ts = now_ms(); @@ -108,6 +116,22 @@ void srt2rtmp::check_rtmp_alive() { return; } +void srt2rtmp::handle_close_rtmpsession(const std::string& key_path) { + RTMP_CLIENT_PTR rtmp_ptr; + auto iter = _rtmp_client_map.find(key_path); + if (iter == _rtmp_client_map.end()) { + srs_error("fail to close rtmp session fail, can't find session by key_path:%s", + key_path.c_str()); + return; + } + rtmp_ptr = iter->second; + _rtmp_client_map.erase(iter); + srs_trace("close rtmp session which key_path is %s", key_path.c_str()); + rtmp_ptr->close(); + + return; +} + //the cycle is running in srs coroutine srs_error_t srt2rtmp::cycle() { srs_error_t err = srs_success; @@ -119,7 +143,25 @@ srs_error_t srt2rtmp::cycle() { if (!msg_ptr) { srs_usleep((30 * SRS_UTIME_MILLISECONDS)); } else { - handle_ts_data(msg_ptr); + switch (msg_ptr->msg_type()) { + case SRT_MSG_DATA_TYPE: + { + handle_ts_data(msg_ptr); + break; + } + case SRT_MSG_CLOSE_TYPE: + { + handle_close_rtmpsession(msg_ptr->get_path()); + break; + } + default: + { + srs_error("srt to rtmp get wrong message type(%u), path:%s", + msg_ptr->msg_type(), msg_ptr->get_path().c_str()); + assert(0); + } + } + } check_rtmp_alive(); if ((err = _trd_ptr->pull()) != srs_success) { diff --git a/trunk/src/srt/srt_to_rtmp.hpp b/trunk/src/srt/srt_to_rtmp.hpp index c361e7cd0..08735c09d 100644 --- a/trunk/src/srt/srt_to_rtmp.hpp +++ b/trunk/src/srt/srt_to_rtmp.hpp @@ -85,11 +85,13 @@ public: void release(); void insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path); + void insert_ctrl_message(unsigned int msg_type, const std::string& key_path); private: SRT_DATA_MSG_PTR get_data_message(); virtual srs_error_t cycle(); void handle_ts_data(SRT_DATA_MSG_PTR data_ptr); + void handle_close_rtmpsession(const std::string& key_path); void check_rtmp_alive(); private: From 4fdf242e996d78e2416bdb89d0851525c1da348a Mon Sep 17 00:00:00 2001 From: runner365 Date: Tue, 11 Feb 2020 20:37:42 +0800 Subject: [PATCH 03/10] change two thread to one thread --- trunk/src/srt/srt_handle.cpp | 108 +--------------------------------- trunk/src/srt/srt_handle.hpp | 35 +++-------- trunk/src/srt/srt_server.cpp | 61 +++++++++++-------- trunk/src/srt/srt_server.hpp | 8 ++- trunk/src/srt/srt_to_rtmp.cpp | 3 + 5 files changed, 54 insertions(+), 161 deletions(-) diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp index f2bc2ff8c..adc6b95ec 100644 --- a/trunk/src/srt/srt_handle.cpp +++ b/trunk/src/srt/srt_handle.cpp @@ -22,7 +22,7 @@ const long long CHECK_ALIVE_TIMEOUT = 5*1000; long long srt_now_ms = 0; -srt_handle::srt_handle():_run_flag(false) +srt_handle::srt_handle(int pollid):_handle_pollid(pollid) ,_last_timestamp(0) ,_last_check_alive_ts(0) { } @@ -31,26 +31,6 @@ srt_handle::~srt_handle() { } -int srt_handle::start() { - _handle_pollid = srt_epoll_create(); - if (_handle_pollid < -1) { - srs_error("srt handle srt_epoll_create error, _handle_pollid=%d", _handle_pollid); - return -1; - } - - _run_flag = true; - srs_trace("srt handle is starting..."); - _work_thread_ptr = std::make_shared(&srt_handle::onwork, this); - - return 0; -} - -void srt_handle::stop() { - _run_flag = false; - _work_thread_ptr->join(); - return; -} - void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid) { SRT_TRACEBSTATS mon; srt_bstats(srtsocket, &mon, 1); @@ -185,91 +165,6 @@ void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) { return; } -int srt_handle::get_srt_mode(SRTSOCKET conn_srt_socket) { - auto iter = _conn_map.find(conn_srt_socket); - if (iter == _conn_map.end()) { - return 0; - } - return iter->second->get_mode(); -} - -void srt_handle::insert_message_queue(request_message_t msg) { - std::unique_lock lock(_queue_mutex); - _message_queue.push(msg); -} - -bool srt_handle::get_message_from_queue(request_message_t& msg) { - std::unique_lock lock(_queue_mutex); - bool ret = false; - - if (_message_queue.empty()) { - return ret; - } - ret = true; - msg = _message_queue.front(); - _message_queue.pop(); - - return ret; -} - -void srt_handle::onwork() -{ - const unsigned int SRT_FD_MAX = 1024; - SRT_SOCKSTATUS status = SRTS_INIT; - std::string streamid; - int ret; - const int64_t DEF_TIMEOUT_INTERVAL = 30; - - srs_trace("srt handle epoll work is starting..."); - while(_run_flag) - { - SRTSOCKET read_fds[SRT_FD_MAX]; - SRTSOCKET write_fds[SRT_FD_MAX]; - int rfd_num = SRT_FD_MAX; - int wfd_num = SRT_FD_MAX; - - srt_now_ms = now_ms(); - - request_message_t msg; - - if (get_message_from_queue(msg)) { - add_newconn(msg.conn_ptr, msg.events); - } - - if (_conn_map.empty()) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - continue; - } - check_alive(); - - ret = srt_epoll_wait(_handle_pollid, read_fds, &rfd_num, write_fds, &wfd_num, - DEF_TIMEOUT_INTERVAL, 0, 0, 0, 0); - if (ret < 0) { - srs_info("srt handle epoll is timeout, ret=%d, srt_now_ms=%ld", - ret, srt_now_ms); - continue; - } - - for (int index = 0; index < rfd_num; index++) - { - status = srt_getsockstate(read_fds[index]); - srs_info("srt handle read(push) rfd num:%d, status:%d, streamid:%s, read_fd", - rfd_num, status, streamid.c_str(), read_fds[index]); - handle_srt_socket(status, read_fds[index]); - } - - for (int index = 0; index < wfd_num; index++) - { - status = srt_getsockstate(write_fds[index]); - streamid = UDT::getstreamid(write_fds[index]); - srs_info("srt handle write(puller) wfd num:%d, status:%d, streamid:%s, write_fd", - wfd_num, status, streamid.c_str(), write_fds[index]); - handle_srt_socket(status, write_fds[index]); - } - - } -} - void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) { SRT_CONN_PTR srt_conn_ptr; unsigned char data[DEF_DATA_SIZE]; @@ -293,6 +188,7 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp close_push_conn(conn_fd); return; } + srs_trace("srt read data len:%d", ret); srt_conn_ptr->update_timestamp(srt_now_ms); srt2rtmp::get_instance()->insert_data_message(data, ret, subpath); diff --git a/trunk/src/srt/srt_handle.hpp b/trunk/src/srt/srt_handle.hpp index da555b714..d0ae14a78 100644 --- a/trunk/src/srt/srt_handle.hpp +++ b/trunk/src/srt/srt_handle.hpp @@ -12,33 +12,22 @@ #include "srt_conn.hpp" #include "srt_to_rtmp.hpp" -typedef struct { - SRT_CONN_PTR conn_ptr; - int events; -} request_message_t; - class srt_handle { public: - srt_handle(); + srt_handle(int pollid); ~srt_handle(); - int start();//create srt epoll and create epoll thread - void stop();//close srt epoll and end epoll thread - - void insert_message_queue(request_message_t msg); - bool get_message_from_queue(request_message_t& msg); - -private: //add new srt connection into epoll event void add_newconn(SRT_CONN_PTR conn_ptr, int events); - //get srt conn object by srt socket - SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket); - //get srt connect mode: push or pull - int get_srt_mode(SRTSOCKET conn_srt_socket); - - void onwork();//epoll thread loop //handle recv/send srt socket void handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd); + //check srt connection whether it's still alive. + void check_alive(); + +private: + //get srt conn object by srt socket + SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket); + void handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd); void handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd); @@ -51,27 +40,19 @@ private: bool add_new_pusher(SRT_CONN_PTR conn_ptr); //remove push connection and remove epoll void close_push_conn(SRTSOCKET srtsocket); - - //check srt connection whether it's still alive. - void check_alive(); //debug statics void debug_statics(SRTSOCKET srtsocket, const std::string& streamid); private: - bool _run_flag; int _handle_pollid; std::unordered_map _conn_map;//save all srt connection: pull or push - std::shared_ptr _work_thread_ptr; //save push srt connection for prevent from repeat push connection std::unordered_map _push_conn_map;//key:streamid, value:SRT_CONN_PTR //streamid, play map std::unordered_map> _streamid_map; - - std::mutex _queue_mutex; - std::queue _message_queue; long long _last_timestamp; long long _last_check_alive_ts; diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp index 021f714ec..2e2e44dfc 100644 --- a/trunk/src/srt/srt_server.cpp +++ b/trunk/src/srt/srt_server.cpp @@ -15,10 +15,9 @@ #include #include -srt_server::srt_server(unsigned short port):listen_port(port) - ,server_socket(-1) +srt_server::srt_server(unsigned short port):_listen_port(port) + ,_server_socket(-1) { - handle_ptr = std::make_shared(); } srt_server::~srt_server() @@ -27,48 +26,49 @@ srt_server::~srt_server() } int srt_server::init_srt() { - if (server_socket != -1) { + if (_server_socket != -1) { return -1; } - server_socket = srt_create_socket(); + _server_socket = srt_create_socket(); sockaddr_in sa; memset(&sa, 0, sizeof sa); sa.sin_family = AF_INET; - sa.sin_port = htons(listen_port); + sa.sin_port = htons(_listen_port); sockaddr* psa = (sockaddr*)&sa; - int ret = srt_bind(server_socket, psa, sizeof(sa)); + int ret = srt_bind(_server_socket, psa, sizeof(sa)); if ( ret == SRT_ERROR ) { - srt_close(server_socket); + srt_close(_server_socket); srs_error("srt bind error: %d", ret); return -1; } - ret = srt_listen(server_socket, 5); + ret = srt_listen(_server_socket, 5); if (ret == SRT_ERROR) { - srt_close(server_socket); + srt_close(_server_socket); srs_error("srt listen error: %d", ret); return -2; } _pollid = srt_epoll_create(); if (_pollid < -1) { - srs_error("srt server srt_epoll_create error, port=%d", listen_port); + srs_error("srt server srt_epoll_create error, port=%d", _listen_port); return -1; } + _handle_ptr = std::make_shared(_pollid); int events = SRT_EPOLL_IN | SRT_EPOLL_ERR; - ret = srt_epoll_add_usock(_pollid, server_socket, &events); + ret = srt_epoll_add_usock(_pollid, _server_socket, &events); if (ret < 0) { srs_error("srt server run add epoll error:%d", ret); return ret; } - srs_trace("srt server listen port=%d, server_fd=%d", listen_port, server_socket); + srs_trace("srt server listen port=%d, server_fd=%d", _listen_port, _server_socket); return 0; } @@ -80,13 +80,9 @@ int srt_server::start() if ((ret = init_srt()) < 0) { return ret; } - ret = handle_ptr->start(); - if (ret < 0) { - return ret; - } run_flag = true; - srs_trace("srt server is starting... port(%d)", listen_port); + srs_trace("srt server is starting... port(%d)", _listen_port); thread_run_ptr = std::make_shared(&srt_server::on_work, this); return 0; } @@ -99,7 +95,6 @@ void srt_server::stop() } thread_run_ptr->join(); - handle_ptr->stop(); return; } @@ -146,8 +141,8 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd srt_conn_ptr->close(); return; } - request_message_t msg = {srt_conn_ptr, conn_event}; - handle_ptr->insert_message_queue(msg); + + _handle_ptr->add_newconn(srt_conn_ptr, conn_event); break; } case SRTS_CONNECTED: @@ -169,10 +164,16 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd } } +void srt_server::srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr) { + srs_trace("status:%d, fd:%d, dscr:%s", status, input_fd, dscr.c_str()); + _handle_ptr->handle_srt_socket(status, input_fd); + return; +} + void srt_server::on_work() { const unsigned int SRT_FD_MAX = 100; - srs_trace("srt server is working port(%d)", listen_port); + srs_trace("srt server is working port(%d)", _listen_port); while (run_flag) { SRTSOCKET read_fds[SRT_FD_MAX]; @@ -185,17 +186,27 @@ void srt_server::on_work() if (ret < 0) { continue; } - srs_trace("srt server epoll get: ret=%d, rfd_num=%d, wfd_num=%d", + _handle_ptr->check_alive(); + + srs_info("srt server epoll get: ret=%d, rfd_num=%d, wfd_num=%d", ret, rfd_num, wfd_num); for (int index = 0; index < rfd_num; index++) { SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]); - srt_handle_connection(status, read_fds[index], "read fd"); + if (_server_socket == read_fds[index]) { + srt_handle_connection(status, read_fds[index], "read fd"); + } else { + srt_handle_data(status, read_fds[index], "read fd"); + } } for (int index = 0; index < wfd_num; index++) { SRT_SOCKSTATUS status = srt_getsockstate(write_fds[index]); - srt_handle_connection(status, read_fds[index], "write fd"); + if (_server_socket == write_fds[index]) { + srt_handle_connection(status, write_fds[index], "write fd"); + } else { + srt_handle_data(status, write_fds[index], "write fd"); + } } } } diff --git a/trunk/src/srt/srt_server.hpp b/trunk/src/srt/srt_server.hpp index 27bb3bdfb..1234b891a 100644 --- a/trunk/src/srt/srt_server.hpp +++ b/trunk/src/srt/srt_server.hpp @@ -25,14 +25,16 @@ private: void on_work(); //accept new srt connection void srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr); + //get srt data read/write + void srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr); private: - unsigned short listen_port; - SRTSOCKET server_socket; + unsigned short _listen_port; + SRTSOCKET _server_socket; int _pollid; bool run_flag; std::shared_ptr thread_run_ptr; - std::shared_ptr handle_ptr; + std::shared_ptr _handle_ptr; }; typedef std::shared_ptr SRT_SERVER_PTR; diff --git a/trunk/src/srt/srt_to_rtmp.cpp b/trunk/src/srt/srt_to_rtmp.cpp index f8e037af3..729e3e970 100644 --- a/trunk/src/srt/srt_to_rtmp.cpp +++ b/trunk/src/srt/srt_to_rtmp.cpp @@ -220,6 +220,8 @@ rtmp_client::rtmp_client(std::string key_path):_key_path(key_path) _h264_sps_changed = false; _h264_pps_changed = false; _h264_sps_pps_sent = false; + + _last_live_ts = now_ms(); srs_trace("rtmp client construct url:%s", url_sz); } @@ -251,6 +253,7 @@ srs_error_t rtmp_client::connect() { srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; srs_utime_t sto = SRS_CONSTS_RTMP_PULSE; + _last_live_ts = now_ms(); if (_connect_flag) { return srs_success; } From 548c918efbb2de5f9651aeb7e6500c85399b0c74 Mon Sep 17 00:00:00 2001 From: runner365 Date: Tue, 11 Feb 2020 21:36:36 +0800 Subject: [PATCH 04/10] mpegts discard header is not 0x47 --- trunk/src/srt/srt_handle.cpp | 2 +- trunk/src/srt/srt_server.cpp | 1 - trunk/src/srt/ts_demux.cpp | 6 +++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp index adc6b95ec..dfba60cd1 100644 --- a/trunk/src/srt/srt_handle.cpp +++ b/trunk/src/srt/srt_handle.cpp @@ -188,7 +188,7 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp close_push_conn(conn_fd); return; } - srs_trace("srt read data len:%d", ret); + srt_conn_ptr->update_timestamp(srt_now_ms); srt2rtmp::get_instance()->insert_data_message(data, ret, subpath); diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp index 2e2e44dfc..f34e37fed 100644 --- a/trunk/src/srt/srt_server.cpp +++ b/trunk/src/srt/srt_server.cpp @@ -165,7 +165,6 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd } void srt_server::srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr) { - srs_trace("status:%d, fd:%d, dscr:%s", status, input_fd, dscr.c_str()); _handle_ptr->handle_srt_socket(status, input_fd); return; } diff --git a/trunk/src/srt/ts_demux.cpp b/trunk/src/srt/ts_demux.cpp index 4c10f871b..13cee93ec 100644 --- a/trunk/src/srt/ts_demux.cpp +++ b/trunk/src/srt/ts_demux.cpp @@ -296,7 +296,11 @@ int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback) path = data_ptr->get_path(); for (unsigned int index = 0; index < count; index++) { - ret = decode_unit(data_ptr->get_data() + 188*index, path, callback); + unsigned char* data = data_ptr->get_data() + 188*index; + if (data[0] != 0x47) { + continue; + } + ret = decode_unit(data, path, callback); if (ret < 0) { break; From 1a1d0e9267cd563c9093bf494047724e94138bdd Mon Sep 17 00:00:00 2001 From: runner365 Date: Wed, 12 Feb 2020 09:54:07 +0800 Subject: [PATCH 05/10] add srt_epoll_clear_usocks --- trunk/src/srt/srt_server.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp index f34e37fed..ae4918873 100644 --- a/trunk/src/srt/srt_server.cpp +++ b/trunk/src/srt/srt_server.cpp @@ -187,9 +187,6 @@ void srt_server::on_work() } _handle_ptr->check_alive(); - srs_info("srt server epoll get: ret=%d, rfd_num=%d, wfd_num=%d", - ret, rfd_num, wfd_num); - for (int index = 0; index < rfd_num; index++) { SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]); if (_server_socket == read_fds[index]) { @@ -208,6 +205,7 @@ void srt_server::on_work() } } } + srt_epoll_clear_usocks(_pollid); } SrtServerAdapter::SrtServerAdapter() From c2012379afe04aec242fbd65556b828487a457f5 Mon Sep 17 00:00:00 2001 From: "Alex.CR" Date: Tue, 11 Feb 2020 20:23:39 -0600 Subject: [PATCH 06/10] change srt epoll thread to one thread (#1597) * if there isn't srt connect, it needn't epoll wait * solve repush srt bugs * change two thread to one thread * mpegts discard header is not 0x47 * add srt_epoll_clear_usocks --- trunk/src/srt/srt_handle.cpp | 108 +--------------------------------- trunk/src/srt/srt_handle.hpp | 35 +++-------- trunk/src/srt/srt_server.cpp | 60 +++++++++++-------- trunk/src/srt/srt_server.hpp | 8 ++- trunk/src/srt/srt_to_rtmp.cpp | 3 + trunk/src/srt/ts_demux.cpp | 6 +- 6 files changed, 57 insertions(+), 163 deletions(-) diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp index f2bc2ff8c..dfba60cd1 100644 --- a/trunk/src/srt/srt_handle.cpp +++ b/trunk/src/srt/srt_handle.cpp @@ -22,7 +22,7 @@ const long long CHECK_ALIVE_TIMEOUT = 5*1000; long long srt_now_ms = 0; -srt_handle::srt_handle():_run_flag(false) +srt_handle::srt_handle(int pollid):_handle_pollid(pollid) ,_last_timestamp(0) ,_last_check_alive_ts(0) { } @@ -31,26 +31,6 @@ srt_handle::~srt_handle() { } -int srt_handle::start() { - _handle_pollid = srt_epoll_create(); - if (_handle_pollid < -1) { - srs_error("srt handle srt_epoll_create error, _handle_pollid=%d", _handle_pollid); - return -1; - } - - _run_flag = true; - srs_trace("srt handle is starting..."); - _work_thread_ptr = std::make_shared(&srt_handle::onwork, this); - - return 0; -} - -void srt_handle::stop() { - _run_flag = false; - _work_thread_ptr->join(); - return; -} - void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid) { SRT_TRACEBSTATS mon; srt_bstats(srtsocket, &mon, 1); @@ -185,91 +165,6 @@ void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) { return; } -int srt_handle::get_srt_mode(SRTSOCKET conn_srt_socket) { - auto iter = _conn_map.find(conn_srt_socket); - if (iter == _conn_map.end()) { - return 0; - } - return iter->second->get_mode(); -} - -void srt_handle::insert_message_queue(request_message_t msg) { - std::unique_lock lock(_queue_mutex); - _message_queue.push(msg); -} - -bool srt_handle::get_message_from_queue(request_message_t& msg) { - std::unique_lock lock(_queue_mutex); - bool ret = false; - - if (_message_queue.empty()) { - return ret; - } - ret = true; - msg = _message_queue.front(); - _message_queue.pop(); - - return ret; -} - -void srt_handle::onwork() -{ - const unsigned int SRT_FD_MAX = 1024; - SRT_SOCKSTATUS status = SRTS_INIT; - std::string streamid; - int ret; - const int64_t DEF_TIMEOUT_INTERVAL = 30; - - srs_trace("srt handle epoll work is starting..."); - while(_run_flag) - { - SRTSOCKET read_fds[SRT_FD_MAX]; - SRTSOCKET write_fds[SRT_FD_MAX]; - int rfd_num = SRT_FD_MAX; - int wfd_num = SRT_FD_MAX; - - srt_now_ms = now_ms(); - - request_message_t msg; - - if (get_message_from_queue(msg)) { - add_newconn(msg.conn_ptr, msg.events); - } - - if (_conn_map.empty()) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - continue; - } - check_alive(); - - ret = srt_epoll_wait(_handle_pollid, read_fds, &rfd_num, write_fds, &wfd_num, - DEF_TIMEOUT_INTERVAL, 0, 0, 0, 0); - if (ret < 0) { - srs_info("srt handle epoll is timeout, ret=%d, srt_now_ms=%ld", - ret, srt_now_ms); - continue; - } - - for (int index = 0; index < rfd_num; index++) - { - status = srt_getsockstate(read_fds[index]); - srs_info("srt handle read(push) rfd num:%d, status:%d, streamid:%s, read_fd", - rfd_num, status, streamid.c_str(), read_fds[index]); - handle_srt_socket(status, read_fds[index]); - } - - for (int index = 0; index < wfd_num; index++) - { - status = srt_getsockstate(write_fds[index]); - streamid = UDT::getstreamid(write_fds[index]); - srs_info("srt handle write(puller) wfd num:%d, status:%d, streamid:%s, write_fd", - wfd_num, status, streamid.c_str(), write_fds[index]); - handle_srt_socket(status, write_fds[index]); - } - - } -} - void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) { SRT_CONN_PTR srt_conn_ptr; unsigned char data[DEF_DATA_SIZE]; @@ -293,6 +188,7 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp close_push_conn(conn_fd); return; } + srt_conn_ptr->update_timestamp(srt_now_ms); srt2rtmp::get_instance()->insert_data_message(data, ret, subpath); diff --git a/trunk/src/srt/srt_handle.hpp b/trunk/src/srt/srt_handle.hpp index da555b714..d0ae14a78 100644 --- a/trunk/src/srt/srt_handle.hpp +++ b/trunk/src/srt/srt_handle.hpp @@ -12,33 +12,22 @@ #include "srt_conn.hpp" #include "srt_to_rtmp.hpp" -typedef struct { - SRT_CONN_PTR conn_ptr; - int events; -} request_message_t; - class srt_handle { public: - srt_handle(); + srt_handle(int pollid); ~srt_handle(); - int start();//create srt epoll and create epoll thread - void stop();//close srt epoll and end epoll thread - - void insert_message_queue(request_message_t msg); - bool get_message_from_queue(request_message_t& msg); - -private: //add new srt connection into epoll event void add_newconn(SRT_CONN_PTR conn_ptr, int events); - //get srt conn object by srt socket - SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket); - //get srt connect mode: push or pull - int get_srt_mode(SRTSOCKET conn_srt_socket); - - void onwork();//epoll thread loop //handle recv/send srt socket void handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd); + //check srt connection whether it's still alive. + void check_alive(); + +private: + //get srt conn object by srt socket + SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket); + void handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd); void handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd); @@ -51,27 +40,19 @@ private: bool add_new_pusher(SRT_CONN_PTR conn_ptr); //remove push connection and remove epoll void close_push_conn(SRTSOCKET srtsocket); - - //check srt connection whether it's still alive. - void check_alive(); //debug statics void debug_statics(SRTSOCKET srtsocket, const std::string& streamid); private: - bool _run_flag; int _handle_pollid; std::unordered_map _conn_map;//save all srt connection: pull or push - std::shared_ptr _work_thread_ptr; //save push srt connection for prevent from repeat push connection std::unordered_map _push_conn_map;//key:streamid, value:SRT_CONN_PTR //streamid, play map std::unordered_map> _streamid_map; - - std::mutex _queue_mutex; - std::queue _message_queue; long long _last_timestamp; long long _last_check_alive_ts; diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp index 021f714ec..ae4918873 100644 --- a/trunk/src/srt/srt_server.cpp +++ b/trunk/src/srt/srt_server.cpp @@ -15,10 +15,9 @@ #include #include -srt_server::srt_server(unsigned short port):listen_port(port) - ,server_socket(-1) +srt_server::srt_server(unsigned short port):_listen_port(port) + ,_server_socket(-1) { - handle_ptr = std::make_shared(); } srt_server::~srt_server() @@ -27,48 +26,49 @@ srt_server::~srt_server() } int srt_server::init_srt() { - if (server_socket != -1) { + if (_server_socket != -1) { return -1; } - server_socket = srt_create_socket(); + _server_socket = srt_create_socket(); sockaddr_in sa; memset(&sa, 0, sizeof sa); sa.sin_family = AF_INET; - sa.sin_port = htons(listen_port); + sa.sin_port = htons(_listen_port); sockaddr* psa = (sockaddr*)&sa; - int ret = srt_bind(server_socket, psa, sizeof(sa)); + int ret = srt_bind(_server_socket, psa, sizeof(sa)); if ( ret == SRT_ERROR ) { - srt_close(server_socket); + srt_close(_server_socket); srs_error("srt bind error: %d", ret); return -1; } - ret = srt_listen(server_socket, 5); + ret = srt_listen(_server_socket, 5); if (ret == SRT_ERROR) { - srt_close(server_socket); + srt_close(_server_socket); srs_error("srt listen error: %d", ret); return -2; } _pollid = srt_epoll_create(); if (_pollid < -1) { - srs_error("srt server srt_epoll_create error, port=%d", listen_port); + srs_error("srt server srt_epoll_create error, port=%d", _listen_port); return -1; } + _handle_ptr = std::make_shared(_pollid); int events = SRT_EPOLL_IN | SRT_EPOLL_ERR; - ret = srt_epoll_add_usock(_pollid, server_socket, &events); + ret = srt_epoll_add_usock(_pollid, _server_socket, &events); if (ret < 0) { srs_error("srt server run add epoll error:%d", ret); return ret; } - srs_trace("srt server listen port=%d, server_fd=%d", listen_port, server_socket); + srs_trace("srt server listen port=%d, server_fd=%d", _listen_port, _server_socket); return 0; } @@ -80,13 +80,9 @@ int srt_server::start() if ((ret = init_srt()) < 0) { return ret; } - ret = handle_ptr->start(); - if (ret < 0) { - return ret; - } run_flag = true; - srs_trace("srt server is starting... port(%d)", listen_port); + srs_trace("srt server is starting... port(%d)", _listen_port); thread_run_ptr = std::make_shared(&srt_server::on_work, this); return 0; } @@ -99,7 +95,6 @@ void srt_server::stop() } thread_run_ptr->join(); - handle_ptr->stop(); return; } @@ -146,8 +141,8 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd srt_conn_ptr->close(); return; } - request_message_t msg = {srt_conn_ptr, conn_event}; - handle_ptr->insert_message_queue(msg); + + _handle_ptr->add_newconn(srt_conn_ptr, conn_event); break; } case SRTS_CONNECTED: @@ -169,10 +164,15 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd } } +void srt_server::srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr) { + _handle_ptr->handle_srt_socket(status, input_fd); + return; +} + void srt_server::on_work() { const unsigned int SRT_FD_MAX = 100; - srs_trace("srt server is working port(%d)", listen_port); + srs_trace("srt server is working port(%d)", _listen_port); while (run_flag) { SRTSOCKET read_fds[SRT_FD_MAX]; @@ -185,19 +185,27 @@ void srt_server::on_work() if (ret < 0) { continue; } - srs_trace("srt server epoll get: ret=%d, rfd_num=%d, wfd_num=%d", - ret, rfd_num, wfd_num); + _handle_ptr->check_alive(); for (int index = 0; index < rfd_num; index++) { SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]); - srt_handle_connection(status, read_fds[index], "read fd"); + if (_server_socket == read_fds[index]) { + srt_handle_connection(status, read_fds[index], "read fd"); + } else { + srt_handle_data(status, read_fds[index], "read fd"); + } } for (int index = 0; index < wfd_num; index++) { SRT_SOCKSTATUS status = srt_getsockstate(write_fds[index]); - srt_handle_connection(status, read_fds[index], "write fd"); + if (_server_socket == write_fds[index]) { + srt_handle_connection(status, write_fds[index], "write fd"); + } else { + srt_handle_data(status, write_fds[index], "write fd"); + } } } + srt_epoll_clear_usocks(_pollid); } SrtServerAdapter::SrtServerAdapter() diff --git a/trunk/src/srt/srt_server.hpp b/trunk/src/srt/srt_server.hpp index 27bb3bdfb..1234b891a 100644 --- a/trunk/src/srt/srt_server.hpp +++ b/trunk/src/srt/srt_server.hpp @@ -25,14 +25,16 @@ private: void on_work(); //accept new srt connection void srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr); + //get srt data read/write + void srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr); private: - unsigned short listen_port; - SRTSOCKET server_socket; + unsigned short _listen_port; + SRTSOCKET _server_socket; int _pollid; bool run_flag; std::shared_ptr thread_run_ptr; - std::shared_ptr handle_ptr; + std::shared_ptr _handle_ptr; }; typedef std::shared_ptr SRT_SERVER_PTR; diff --git a/trunk/src/srt/srt_to_rtmp.cpp b/trunk/src/srt/srt_to_rtmp.cpp index f8e037af3..729e3e970 100644 --- a/trunk/src/srt/srt_to_rtmp.cpp +++ b/trunk/src/srt/srt_to_rtmp.cpp @@ -220,6 +220,8 @@ rtmp_client::rtmp_client(std::string key_path):_key_path(key_path) _h264_sps_changed = false; _h264_pps_changed = false; _h264_sps_pps_sent = false; + + _last_live_ts = now_ms(); srs_trace("rtmp client construct url:%s", url_sz); } @@ -251,6 +253,7 @@ srs_error_t rtmp_client::connect() { srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; srs_utime_t sto = SRS_CONSTS_RTMP_PULSE; + _last_live_ts = now_ms(); if (_connect_flag) { return srs_success; } diff --git a/trunk/src/srt/ts_demux.cpp b/trunk/src/srt/ts_demux.cpp index 4c10f871b..13cee93ec 100644 --- a/trunk/src/srt/ts_demux.cpp +++ b/trunk/src/srt/ts_demux.cpp @@ -296,7 +296,11 @@ int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback) path = data_ptr->get_path(); for (unsigned int index = 0; index < count; index++) { - ret = decode_unit(data_ptr->get_data() + 188*index, path, callback); + unsigned char* data = data_ptr->get_data() + 188*index; + if (data[0] != 0x47) { + continue; + } + ret = decode_unit(data, path, callback); if (ret < 0) { break; From 24f286684e40a258e1370f25a3ae2904ddb3ef0f Mon Sep 17 00:00:00 2001 From: runner365 Date: Wed, 12 Feb 2020 20:02:55 +0800 Subject: [PATCH 07/10] add srt parameter configure --- trunk/conf/srt.conf | 5 ++ trunk/src/app/srs_app_config.cpp | 145 ++++++++++++++++++++++++++++++- trunk/src/app/srs_app_config.hpp | 20 +++++ trunk/src/srt/srt_handle.cpp | 11 +-- trunk/src/srt/srt_server.cpp | 50 +++++++++++ trunk/src/srt/srt_server.hpp | 2 + 6 files changed, 227 insertions(+), 6 deletions(-) diff --git a/trunk/conf/srt.conf b/trunk/conf/srt.conf index d648b16b3..b9fcc9d32 100644 --- a/trunk/conf/srt.conf +++ b/trunk/conf/srt.conf @@ -9,6 +9,7 @@ http_api { enabled on; listen 1985; } + http_server { enabled on; listen 8080; @@ -18,6 +19,10 @@ http_server { srt_server { enabled on; listen 10080; + maxbw 1000000000; + connect_timeout 4000; + peerlatency 300; + recvlatency 300; } # @doc https://github.com/ossrs/srs/issues/1147#issuecomment-577607026 diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 585e1d6f0..1cea608db 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3525,7 +3525,10 @@ srs_error_t SrsConfig::check_normal_config() SrsConfDirective* conf = root->get("srt_server"); for (int i = 0; conf && i < (int)conf->directives.size(); i++) { string n = conf->at(i)->name; - if (n != "enabled" && n != "listen") { + if (n != "enabled" && n != "listen" && n != "maxbw" + && n != "mss" && n != "latency" && n != "recvlatency" + && n != "peerlatency" && n != "tlpkdrop" && n != "connect_timeout" + && n != "sendbuf" && n != "recvbuf" && n != "payloadsize") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal srt_stream.%s", n.c_str()); } } @@ -6701,6 +6704,146 @@ unsigned short SrsConfig::get_srt_listen_port() return (unsigned short)atoi(conf->arg0().c_str()); } +int SrsConfig::get_srto_maxbw() { + static int64_t DEFAULT = -1; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("maxbw"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_srto_mss() { + static int DEFAULT = 1500; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("mms"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_srto_latency() { + static int DEFAULT = 120; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("latency"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_srto_recv_latency() { + static int DEFAULT = 120; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("recvlatency"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_srto_peer_latency() { + static int DEFAULT = 120; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("peerlatency"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +bool SrsConfig::get_srto_tlpkdrop() { + static bool DEFAULT = true; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("tlpkdrop"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return SRS_CONF_PERFER_TRUE(conf->arg0()); +} + +int SrsConfig::get_srto_conntimeout() { + static int DEFAULT = 3000; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("connect_timeout"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_srto_sendbuf() { + static int64_t DEFAULT = 8192 * (1500-28); + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("sendbuf"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_srto_recvbuf() { + static int64_t DEFAULT = 8192 * (1500-28); + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("recvbuf"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_srto_payloadsize() { + static int DEFAULT = 1316; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("payloadsize"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + bool SrsConfig::get_http_stream_enabled() { SrsConfDirective* conf = root->get("http_server"); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 197c5decb..90a57e0c3 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -595,6 +595,26 @@ public: virtual bool get_srt_enabled(); // Get the srt service listen port virtual unsigned short get_srt_listen_port(); + // Get the srt SRTO_MAXBW, max bandwith, default is -1. + virtual int get_srto_maxbw(); + // Get the srt SRTO_MSS, Maximum Segment Size, default is 1500. + virtual int get_srto_mss(); + // Get the srt SRTO_LATENCY, latency, default is 0 which means peer/recv latency is 120ms. + virtual int get_srto_latency(); + // Get the srt SRTO_RCVLATENCY, recv latency, default is 120ms. + virtual int get_srto_recv_latency(); + // Get the srt SRTO_PEERLATENCY, peer latency, default is 0.. + virtual int get_srto_peer_latency(); + // Get the srt SRTO_TLPKDROP, Too-late Packet Drop, default is true. + virtual bool get_srto_tlpkdrop(); + // Get the srt SRTO_CONNTIMEO, connection timeout, default is 3000ms. + virtual int get_srto_conntimeout(); + // Get the srt SRTO_SNDBUF, send buffer, default is 8192 × (1500-28). + virtual int get_srto_sendbuf(); + // Get the srt SRTO_RCVBUF, recv buffer, default is 8192 × (1500-28). + virtual int get_srto_recvbuf(); + // SRTO_PAYLOADSIZE + virtual int get_srto_payloadsize(); // http_hooks section private: diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp index dfba60cd1..4592785e3 100644 --- a/trunk/src/srt/srt_handle.cpp +++ b/trunk/src/srt/srt_handle.cpp @@ -131,13 +131,14 @@ void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) { int val_i; int opt_len = sizeof(int); - val_i = 1000; - srt_setsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, opt_len); - val_i = 2048; - srt_setsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i, opt_len); - srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, &opt_len); srs_trace("srto SRTO_LATENCY=%d", val_i); + + srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_PEERLATENCY, &val_i, &opt_len); + srs_trace("srto SRTO_PEERLATENCY=%d", val_i); + srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVLATENCY, &val_i, &opt_len); + srs_trace("srto SRTO_RCVLATENCY=%d", val_i); + srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_SNDBUF, &val_i, &opt_len); srs_trace("srto SRTO_SNDBUF=%d", val_i); srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVBUF, &val_i, &opt_len); diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp index ae4918873..06660135c 100644 --- a/trunk/src/srt/srt_server.cpp +++ b/trunk/src/srt/srt_server.cpp @@ -25,6 +25,54 @@ srt_server::~srt_server() } +int srt_server::init_srt_parameter() { + const int DEF_LATENCY = 188*7; + + int opt_len = sizeof(int); + + if (_server_socket == -1) { + return -1; + } + int maxbw = _srs_config->get_srto_maxbw(); + srt_setsockopt(_server_socket, 0, SRTO_MAXBW, &maxbw, opt_len); + int mss = _srs_config->get_srto_mss(); + srt_setsockopt(_server_socket, 0, SRTO_MSS, &mss, opt_len); + + bool tlpkdrop = _srs_config->get_srto_tlpkdrop(); + int tlpkdrop_i = tlpkdrop ? 1 : 0; + srt_setsockopt(_server_socket, 0, SRTO_TLPKTDROP, &tlpkdrop_i, opt_len); + + int connection_timeout = _srs_config->get_srto_conntimeout(); + srt_setsockopt(_server_socket, 0, SRTO_CONNTIMEO, &connection_timeout, opt_len); + + int send_buff = _srs_config->get_srto_sendbuf(); + srt_setsockopt(_server_socket, 0, SRTO_SNDBUF, &send_buff, opt_len); + int recv_buff = _srs_config->get_srto_recvbuf(); + srt_setsockopt(_server_socket, 0, SRTO_RCVBUF, &recv_buff, opt_len); + int payload_size = _srs_config->get_srto_payloadsize(); + srt_setsockopt(_server_socket, 0, SRTO_PAYLOADSIZE, &payload_size, opt_len); + + int latency = _srs_config->get_srto_latency(); + if (DEF_LATENCY != latency) { + srt_setsockopt(_server_socket, 0, SRTO_LATENCY, &latency, opt_len); + } + + int recv_latency = _srs_config->get_srto_recv_latency(); + if (DEF_LATENCY != recv_latency) { + srt_setsockopt(_server_socket, 0, SRTO_RCVLATENCY, &recv_latency, opt_len); + } + + int peer_latency = _srs_config->get_srto_peer_latency(); + if (DEF_LATENCY != peer_latency) { + srt_setsockopt(_server_socket, 0, SRTO_PEERLATENCY, &recv_latency, opt_len); + } + + srs_trace("init srt parameter, maxbw:%d, mss:%d, tlpkdrop:%d, connect timeout:%d, \ +send buff:%d, recv buff:%d, payload size:%d, latency:%d, recv latency:%d, peer latency:%d", + maxbw, mss, tlpkdrop, connection_timeout, send_buff, recv_buff, payload_size, + latency, recv_latency, peer_latency); + return 0; +} int srt_server::init_srt() { if (_server_socket != -1) { return -1; @@ -54,6 +102,8 @@ int srt_server::init_srt() { return -2; } + init_srt_parameter(); + _pollid = srt_epoll_create(); if (_pollid < -1) { srs_error("srt server srt_epoll_create error, port=%d", _listen_port); diff --git a/trunk/src/srt/srt_server.hpp b/trunk/src/srt/srt_server.hpp index 1234b891a..0ef244b4b 100644 --- a/trunk/src/srt/srt_server.hpp +++ b/trunk/src/srt/srt_server.hpp @@ -21,6 +21,8 @@ public: private: //init srt socket and srt epoll int init_srt(); + int init_srt_parameter(); + //srt main epoll loop void on_work(); //accept new srt connection From 5f7d23f12345a4ade34b1a6c7b1de11a6e819960 Mon Sep 17 00:00:00 2001 From: "Alex.CR" Date: Wed, 12 Feb 2020 07:03:27 -0600 Subject: [PATCH 08/10] add srt parameter configure (#1599) * if there isn't srt connect, it needn't epoll wait * solve repush srt bugs * change two thread to one thread * mpegts discard header is not 0x47 * add srt_epoll_clear_usocks * add srt parameter configure --- trunk/conf/srt.conf | 5 ++ trunk/src/app/srs_app_config.cpp | 145 ++++++++++++++++++++++++++++++- trunk/src/app/srs_app_config.hpp | 20 +++++ trunk/src/srt/srt_handle.cpp | 11 +-- trunk/src/srt/srt_server.cpp | 50 +++++++++++ trunk/src/srt/srt_server.hpp | 2 + 6 files changed, 227 insertions(+), 6 deletions(-) diff --git a/trunk/conf/srt.conf b/trunk/conf/srt.conf index d648b16b3..b9fcc9d32 100644 --- a/trunk/conf/srt.conf +++ b/trunk/conf/srt.conf @@ -9,6 +9,7 @@ http_api { enabled on; listen 1985; } + http_server { enabled on; listen 8080; @@ -18,6 +19,10 @@ http_server { srt_server { enabled on; listen 10080; + maxbw 1000000000; + connect_timeout 4000; + peerlatency 300; + recvlatency 300; } # @doc https://github.com/ossrs/srs/issues/1147#issuecomment-577607026 diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 585e1d6f0..1cea608db 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3525,7 +3525,10 @@ srs_error_t SrsConfig::check_normal_config() SrsConfDirective* conf = root->get("srt_server"); for (int i = 0; conf && i < (int)conf->directives.size(); i++) { string n = conf->at(i)->name; - if (n != "enabled" && n != "listen") { + if (n != "enabled" && n != "listen" && n != "maxbw" + && n != "mss" && n != "latency" && n != "recvlatency" + && n != "peerlatency" && n != "tlpkdrop" && n != "connect_timeout" + && n != "sendbuf" && n != "recvbuf" && n != "payloadsize") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal srt_stream.%s", n.c_str()); } } @@ -6701,6 +6704,146 @@ unsigned short SrsConfig::get_srt_listen_port() return (unsigned short)atoi(conf->arg0().c_str()); } +int SrsConfig::get_srto_maxbw() { + static int64_t DEFAULT = -1; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("maxbw"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_srto_mss() { + static int DEFAULT = 1500; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("mms"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_srto_latency() { + static int DEFAULT = 120; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("latency"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_srto_recv_latency() { + static int DEFAULT = 120; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("recvlatency"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_srto_peer_latency() { + static int DEFAULT = 120; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("peerlatency"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +bool SrsConfig::get_srto_tlpkdrop() { + static bool DEFAULT = true; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("tlpkdrop"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return SRS_CONF_PERFER_TRUE(conf->arg0()); +} + +int SrsConfig::get_srto_conntimeout() { + static int DEFAULT = 3000; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("connect_timeout"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_srto_sendbuf() { + static int64_t DEFAULT = 8192 * (1500-28); + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("sendbuf"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_srto_recvbuf() { + static int64_t DEFAULT = 8192 * (1500-28); + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("recvbuf"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_srto_payloadsize() { + static int DEFAULT = 1316; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("payloadsize"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return atoi(conf->arg0().c_str()); +} + bool SrsConfig::get_http_stream_enabled() { SrsConfDirective* conf = root->get("http_server"); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 197c5decb..90a57e0c3 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -595,6 +595,26 @@ public: virtual bool get_srt_enabled(); // Get the srt service listen port virtual unsigned short get_srt_listen_port(); + // Get the srt SRTO_MAXBW, max bandwith, default is -1. + virtual int get_srto_maxbw(); + // Get the srt SRTO_MSS, Maximum Segment Size, default is 1500. + virtual int get_srto_mss(); + // Get the srt SRTO_LATENCY, latency, default is 0 which means peer/recv latency is 120ms. + virtual int get_srto_latency(); + // Get the srt SRTO_RCVLATENCY, recv latency, default is 120ms. + virtual int get_srto_recv_latency(); + // Get the srt SRTO_PEERLATENCY, peer latency, default is 0.. + virtual int get_srto_peer_latency(); + // Get the srt SRTO_TLPKDROP, Too-late Packet Drop, default is true. + virtual bool get_srto_tlpkdrop(); + // Get the srt SRTO_CONNTIMEO, connection timeout, default is 3000ms. + virtual int get_srto_conntimeout(); + // Get the srt SRTO_SNDBUF, send buffer, default is 8192 × (1500-28). + virtual int get_srto_sendbuf(); + // Get the srt SRTO_RCVBUF, recv buffer, default is 8192 × (1500-28). + virtual int get_srto_recvbuf(); + // SRTO_PAYLOADSIZE + virtual int get_srto_payloadsize(); // http_hooks section private: diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp index dfba60cd1..4592785e3 100644 --- a/trunk/src/srt/srt_handle.cpp +++ b/trunk/src/srt/srt_handle.cpp @@ -131,13 +131,14 @@ void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) { int val_i; int opt_len = sizeof(int); - val_i = 1000; - srt_setsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, opt_len); - val_i = 2048; - srt_setsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i, opt_len); - srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, &opt_len); srs_trace("srto SRTO_LATENCY=%d", val_i); + + srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_PEERLATENCY, &val_i, &opt_len); + srs_trace("srto SRTO_PEERLATENCY=%d", val_i); + srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVLATENCY, &val_i, &opt_len); + srs_trace("srto SRTO_RCVLATENCY=%d", val_i); + srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_SNDBUF, &val_i, &opt_len); srs_trace("srto SRTO_SNDBUF=%d", val_i); srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVBUF, &val_i, &opt_len); diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp index ae4918873..06660135c 100644 --- a/trunk/src/srt/srt_server.cpp +++ b/trunk/src/srt/srt_server.cpp @@ -25,6 +25,54 @@ srt_server::~srt_server() } +int srt_server::init_srt_parameter() { + const int DEF_LATENCY = 188*7; + + int opt_len = sizeof(int); + + if (_server_socket == -1) { + return -1; + } + int maxbw = _srs_config->get_srto_maxbw(); + srt_setsockopt(_server_socket, 0, SRTO_MAXBW, &maxbw, opt_len); + int mss = _srs_config->get_srto_mss(); + srt_setsockopt(_server_socket, 0, SRTO_MSS, &mss, opt_len); + + bool tlpkdrop = _srs_config->get_srto_tlpkdrop(); + int tlpkdrop_i = tlpkdrop ? 1 : 0; + srt_setsockopt(_server_socket, 0, SRTO_TLPKTDROP, &tlpkdrop_i, opt_len); + + int connection_timeout = _srs_config->get_srto_conntimeout(); + srt_setsockopt(_server_socket, 0, SRTO_CONNTIMEO, &connection_timeout, opt_len); + + int send_buff = _srs_config->get_srto_sendbuf(); + srt_setsockopt(_server_socket, 0, SRTO_SNDBUF, &send_buff, opt_len); + int recv_buff = _srs_config->get_srto_recvbuf(); + srt_setsockopt(_server_socket, 0, SRTO_RCVBUF, &recv_buff, opt_len); + int payload_size = _srs_config->get_srto_payloadsize(); + srt_setsockopt(_server_socket, 0, SRTO_PAYLOADSIZE, &payload_size, opt_len); + + int latency = _srs_config->get_srto_latency(); + if (DEF_LATENCY != latency) { + srt_setsockopt(_server_socket, 0, SRTO_LATENCY, &latency, opt_len); + } + + int recv_latency = _srs_config->get_srto_recv_latency(); + if (DEF_LATENCY != recv_latency) { + srt_setsockopt(_server_socket, 0, SRTO_RCVLATENCY, &recv_latency, opt_len); + } + + int peer_latency = _srs_config->get_srto_peer_latency(); + if (DEF_LATENCY != peer_latency) { + srt_setsockopt(_server_socket, 0, SRTO_PEERLATENCY, &recv_latency, opt_len); + } + + srs_trace("init srt parameter, maxbw:%d, mss:%d, tlpkdrop:%d, connect timeout:%d, \ +send buff:%d, recv buff:%d, payload size:%d, latency:%d, recv latency:%d, peer latency:%d", + maxbw, mss, tlpkdrop, connection_timeout, send_buff, recv_buff, payload_size, + latency, recv_latency, peer_latency); + return 0; +} int srt_server::init_srt() { if (_server_socket != -1) { return -1; @@ -54,6 +102,8 @@ int srt_server::init_srt() { return -2; } + init_srt_parameter(); + _pollid = srt_epoll_create(); if (_pollid < -1) { srs_error("srt server srt_epoll_create error, port=%d", _listen_port); diff --git a/trunk/src/srt/srt_server.hpp b/trunk/src/srt/srt_server.hpp index 1234b891a..0ef244b4b 100644 --- a/trunk/src/srt/srt_server.hpp +++ b/trunk/src/srt/srt_server.hpp @@ -21,6 +21,8 @@ public: private: //init srt socket and srt epoll int init_srt(); + int init_srt_parameter(); + //srt main epoll loop void on_work(); //accept new srt connection From d8419bd4dd359aac3742ffd94bea0832907d65da Mon Sep 17 00:00:00 2001 From: runner365 Date: Thu, 13 Feb 2020 09:58:14 +0800 Subject: [PATCH 09/10] solve latency parameter error --- trunk/src/app/srs_app_config.cpp | 2 +- trunk/src/srt/srt_server.cpp | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 1cea608db..7a9f86a0a 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -6761,7 +6761,7 @@ int SrsConfig::get_srto_recv_latency() { } int SrsConfig::get_srto_peer_latency() { - static int DEFAULT = 120; + static int DEFAULT = 0; SrsConfDirective* conf = root->get("srt_server"); if (!conf) { return DEFAULT; diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp index 06660135c..3a17a0a8b 100644 --- a/trunk/src/srt/srt_server.cpp +++ b/trunk/src/srt/srt_server.cpp @@ -26,7 +26,8 @@ srt_server::~srt_server() } int srt_server::init_srt_parameter() { - const int DEF_LATENCY = 188*7; + const int DEF_RECV_LATENCY = 120; + const int DEF_PEER_LATENCY = 0; int opt_len = sizeof(int); @@ -53,17 +54,17 @@ int srt_server::init_srt_parameter() { srt_setsockopt(_server_socket, 0, SRTO_PAYLOADSIZE, &payload_size, opt_len); int latency = _srs_config->get_srto_latency(); - if (DEF_LATENCY != latency) { + if (DEF_RECV_LATENCY != latency) { srt_setsockopt(_server_socket, 0, SRTO_LATENCY, &latency, opt_len); } int recv_latency = _srs_config->get_srto_recv_latency(); - if (DEF_LATENCY != recv_latency) { + if (DEF_RECV_LATENCY != recv_latency) { srt_setsockopt(_server_socket, 0, SRTO_RCVLATENCY, &recv_latency, opt_len); } int peer_latency = _srs_config->get_srto_peer_latency(); - if (DEF_LATENCY != peer_latency) { + if (DEF_PEER_LATENCY != peer_latency) { srt_setsockopt(_server_socket, 0, SRTO_PEERLATENCY, &recv_latency, opt_len); } From 9dad53691ed2c7c0c01bd33108385a7198ed5279 Mon Sep 17 00:00:00 2001 From: "Alex.CR" Date: Wed, 12 Feb 2020 20:54:24 -0600 Subject: [PATCH 10/10] solve latency parameter error (#1600) * solve latency parameter error --- trunk/src/app/srs_app_config.cpp | 2 +- trunk/src/srt/srt_server.cpp | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 1cea608db..7a9f86a0a 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -6761,7 +6761,7 @@ int SrsConfig::get_srto_recv_latency() { } int SrsConfig::get_srto_peer_latency() { - static int DEFAULT = 120; + static int DEFAULT = 0; SrsConfDirective* conf = root->get("srt_server"); if (!conf) { return DEFAULT; diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp index 06660135c..3a17a0a8b 100644 --- a/trunk/src/srt/srt_server.cpp +++ b/trunk/src/srt/srt_server.cpp @@ -26,7 +26,8 @@ srt_server::~srt_server() } int srt_server::init_srt_parameter() { - const int DEF_LATENCY = 188*7; + const int DEF_RECV_LATENCY = 120; + const int DEF_PEER_LATENCY = 0; int opt_len = sizeof(int); @@ -53,17 +54,17 @@ int srt_server::init_srt_parameter() { srt_setsockopt(_server_socket, 0, SRTO_PAYLOADSIZE, &payload_size, opt_len); int latency = _srs_config->get_srto_latency(); - if (DEF_LATENCY != latency) { + if (DEF_RECV_LATENCY != latency) { srt_setsockopt(_server_socket, 0, SRTO_LATENCY, &latency, opt_len); } int recv_latency = _srs_config->get_srto_recv_latency(); - if (DEF_LATENCY != recv_latency) { + if (DEF_RECV_LATENCY != recv_latency) { srt_setsockopt(_server_socket, 0, SRTO_RCVLATENCY, &recv_latency, opt_len); } int peer_latency = _srs_config->get_srto_peer_latency(); - if (DEF_LATENCY != peer_latency) { + if (DEF_PEER_LATENCY != peer_latency) { srt_setsockopt(_server_socket, 0, SRTO_PEERLATENCY, &recv_latency, opt_len); }