mirror of
https://github.com/ossrs/srs.git
synced 2025-02-15 04:42:04 +00:00
Merge branch 'develop' into min
This commit is contained in:
commit
0ba9ea6e1c
9 changed files with 285 additions and 169 deletions
|
@ -9,6 +9,7 @@ http_api {
|
||||||
enabled on;
|
enabled on;
|
||||||
listen 1985;
|
listen 1985;
|
||||||
}
|
}
|
||||||
|
|
||||||
http_server {
|
http_server {
|
||||||
enabled on;
|
enabled on;
|
||||||
listen 8080;
|
listen 8080;
|
||||||
|
@ -18,6 +19,10 @@ http_server {
|
||||||
srt_server {
|
srt_server {
|
||||||
enabled on;
|
enabled on;
|
||||||
listen 10080;
|
listen 10080;
|
||||||
|
maxbw 1000000000;
|
||||||
|
connect_timeout 4000;
|
||||||
|
peerlatency 300;
|
||||||
|
recvlatency 300;
|
||||||
}
|
}
|
||||||
|
|
||||||
# @doc https://github.com/ossrs/srs/issues/1147#issuecomment-577607026
|
# @doc https://github.com/ossrs/srs/issues/1147#issuecomment-577607026
|
||||||
|
|
|
@ -3525,7 +3525,10 @@ srs_error_t SrsConfig::check_normal_config()
|
||||||
SrsConfDirective* conf = root->get("srt_server");
|
SrsConfDirective* conf = root->get("srt_server");
|
||||||
for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
|
for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
|
||||||
string n = conf->at(i)->name;
|
string n = conf->at(i)->name;
|
||||||
if (n != "enabled" && n != "listen") {
|
if (n != "enabled" && n != "listen" && n != "maxbw"
|
||||||
|
&& n != "mss" && n != "latency" && n != "recvlatency"
|
||||||
|
&& n != "peerlatency" && n != "tlpkdrop" && n != "connect_timeout"
|
||||||
|
&& n != "sendbuf" && n != "recvbuf" && n != "payloadsize") {
|
||||||
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal srt_stream.%s", n.c_str());
|
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal srt_stream.%s", n.c_str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -6701,6 +6704,146 @@ unsigned short SrsConfig::get_srt_listen_port()
|
||||||
return (unsigned short)atoi(conf->arg0().c_str());
|
return (unsigned short)atoi(conf->arg0().c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int SrsConfig::get_srto_maxbw() {
|
||||||
|
static int64_t DEFAULT = -1;
|
||||||
|
SrsConfDirective* conf = root->get("srt_server");
|
||||||
|
if (!conf) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
conf = conf->get("maxbw");
|
||||||
|
if (!conf || conf->arg0().empty()) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
return atoi(conf->arg0().c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
int SrsConfig::get_srto_mss() {
|
||||||
|
static int DEFAULT = 1500;
|
||||||
|
SrsConfDirective* conf = root->get("srt_server");
|
||||||
|
if (!conf) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
conf = conf->get("mms");
|
||||||
|
if (!conf || conf->arg0().empty()) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
return atoi(conf->arg0().c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
int SrsConfig::get_srto_latency() {
|
||||||
|
static int DEFAULT = 120;
|
||||||
|
SrsConfDirective* conf = root->get("srt_server");
|
||||||
|
if (!conf) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
conf = conf->get("latency");
|
||||||
|
if (!conf || conf->arg0().empty()) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
return atoi(conf->arg0().c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
int SrsConfig::get_srto_recv_latency() {
|
||||||
|
static int DEFAULT = 120;
|
||||||
|
SrsConfDirective* conf = root->get("srt_server");
|
||||||
|
if (!conf) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
conf = conf->get("recvlatency");
|
||||||
|
if (!conf || conf->arg0().empty()) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
return atoi(conf->arg0().c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
int SrsConfig::get_srto_peer_latency() {
|
||||||
|
static int DEFAULT = 0;
|
||||||
|
SrsConfDirective* conf = root->get("srt_server");
|
||||||
|
if (!conf) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
conf = conf->get("peerlatency");
|
||||||
|
if (!conf || conf->arg0().empty()) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
return atoi(conf->arg0().c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
bool SrsConfig::get_srto_tlpkdrop() {
|
||||||
|
static bool DEFAULT = true;
|
||||||
|
SrsConfDirective* conf = root->get("srt_server");
|
||||||
|
if (!conf) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
conf = conf->get("tlpkdrop");
|
||||||
|
if (!conf || conf->arg0().empty()) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
return SRS_CONF_PERFER_TRUE(conf->arg0());
|
||||||
|
}
|
||||||
|
|
||||||
|
int SrsConfig::get_srto_conntimeout() {
|
||||||
|
static int DEFAULT = 3000;
|
||||||
|
SrsConfDirective* conf = root->get("srt_server");
|
||||||
|
if (!conf) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
conf = conf->get("connect_timeout");
|
||||||
|
if (!conf || conf->arg0().empty()) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
return atoi(conf->arg0().c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
int SrsConfig::get_srto_sendbuf() {
|
||||||
|
static int64_t DEFAULT = 8192 * (1500-28);
|
||||||
|
SrsConfDirective* conf = root->get("srt_server");
|
||||||
|
if (!conf) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
conf = conf->get("sendbuf");
|
||||||
|
if (!conf || conf->arg0().empty()) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
return atoi(conf->arg0().c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
int SrsConfig::get_srto_recvbuf() {
|
||||||
|
static int64_t DEFAULT = 8192 * (1500-28);
|
||||||
|
SrsConfDirective* conf = root->get("srt_server");
|
||||||
|
if (!conf) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
conf = conf->get("recvbuf");
|
||||||
|
if (!conf || conf->arg0().empty()) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
return atoi(conf->arg0().c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
int SrsConfig::get_srto_payloadsize() {
|
||||||
|
static int DEFAULT = 1316;
|
||||||
|
SrsConfDirective* conf = root->get("srt_server");
|
||||||
|
if (!conf) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
conf = conf->get("payloadsize");
|
||||||
|
if (!conf || conf->arg0().empty()) {
|
||||||
|
return DEFAULT;
|
||||||
|
}
|
||||||
|
return atoi(conf->arg0().c_str());
|
||||||
|
}
|
||||||
|
|
||||||
bool SrsConfig::get_http_stream_enabled()
|
bool SrsConfig::get_http_stream_enabled()
|
||||||
{
|
{
|
||||||
SrsConfDirective* conf = root->get("http_server");
|
SrsConfDirective* conf = root->get("http_server");
|
||||||
|
|
|
@ -595,6 +595,26 @@ public:
|
||||||
virtual bool get_srt_enabled();
|
virtual bool get_srt_enabled();
|
||||||
// Get the srt service listen port
|
// Get the srt service listen port
|
||||||
virtual unsigned short get_srt_listen_port();
|
virtual unsigned short get_srt_listen_port();
|
||||||
|
// Get the srt SRTO_MAXBW, max bandwith, default is -1.
|
||||||
|
virtual int get_srto_maxbw();
|
||||||
|
// Get the srt SRTO_MSS, Maximum Segment Size, default is 1500.
|
||||||
|
virtual int get_srto_mss();
|
||||||
|
// Get the srt SRTO_LATENCY, latency, default is 0 which means peer/recv latency is 120ms.
|
||||||
|
virtual int get_srto_latency();
|
||||||
|
// Get the srt SRTO_RCVLATENCY, recv latency, default is 120ms.
|
||||||
|
virtual int get_srto_recv_latency();
|
||||||
|
// Get the srt SRTO_PEERLATENCY, peer latency, default is 0..
|
||||||
|
virtual int get_srto_peer_latency();
|
||||||
|
// Get the srt SRTO_TLPKDROP, Too-late Packet Drop, default is true.
|
||||||
|
virtual bool get_srto_tlpkdrop();
|
||||||
|
// Get the srt SRTO_CONNTIMEO, connection timeout, default is 3000ms.
|
||||||
|
virtual int get_srto_conntimeout();
|
||||||
|
// Get the srt SRTO_SNDBUF, send buffer, default is 8192 × (1500-28).
|
||||||
|
virtual int get_srto_sendbuf();
|
||||||
|
// Get the srt SRTO_RCVBUF, recv buffer, default is 8192 × (1500-28).
|
||||||
|
virtual int get_srto_recvbuf();
|
||||||
|
// SRTO_PAYLOADSIZE
|
||||||
|
virtual int get_srto_payloadsize();
|
||||||
|
|
||||||
// http_hooks section
|
// http_hooks section
|
||||||
private:
|
private:
|
||||||
|
|
|
@ -22,7 +22,7 @@ const long long CHECK_ALIVE_TIMEOUT = 5*1000;
|
||||||
|
|
||||||
long long srt_now_ms = 0;
|
long long srt_now_ms = 0;
|
||||||
|
|
||||||
srt_handle::srt_handle():_run_flag(false)
|
srt_handle::srt_handle(int pollid):_handle_pollid(pollid)
|
||||||
,_last_timestamp(0)
|
,_last_timestamp(0)
|
||||||
,_last_check_alive_ts(0) {
|
,_last_check_alive_ts(0) {
|
||||||
}
|
}
|
||||||
|
@ -31,26 +31,6 @@ srt_handle::~srt_handle() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int srt_handle::start() {
|
|
||||||
_handle_pollid = srt_epoll_create();
|
|
||||||
if (_handle_pollid < -1) {
|
|
||||||
srs_error("srt handle srt_epoll_create error, _handle_pollid=%d", _handle_pollid);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
_run_flag = true;
|
|
||||||
srs_trace("srt handle is starting...");
|
|
||||||
_work_thread_ptr = std::make_shared<std::thread>(&srt_handle::onwork, this);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void srt_handle::stop() {
|
|
||||||
_run_flag = false;
|
|
||||||
_work_thread_ptr->join();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid) {
|
void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid) {
|
||||||
SRT_TRACEBSTATS mon;
|
SRT_TRACEBSTATS mon;
|
||||||
srt_bstats(srtsocket, &mon, 1);
|
srt_bstats(srtsocket, &mon, 1);
|
||||||
|
@ -151,13 +131,14 @@ void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) {
|
||||||
int val_i;
|
int val_i;
|
||||||
int opt_len = sizeof(int);
|
int opt_len = sizeof(int);
|
||||||
|
|
||||||
val_i = 1000;
|
|
||||||
srt_setsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, opt_len);
|
|
||||||
val_i = 2048;
|
|
||||||
srt_setsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i, opt_len);
|
|
||||||
|
|
||||||
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, &opt_len);
|
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, &opt_len);
|
||||||
srs_trace("srto SRTO_LATENCY=%d", val_i);
|
srs_trace("srto SRTO_LATENCY=%d", val_i);
|
||||||
|
|
||||||
|
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_PEERLATENCY, &val_i, &opt_len);
|
||||||
|
srs_trace("srto SRTO_PEERLATENCY=%d", val_i);
|
||||||
|
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVLATENCY, &val_i, &opt_len);
|
||||||
|
srs_trace("srto SRTO_RCVLATENCY=%d", val_i);
|
||||||
|
|
||||||
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_SNDBUF, &val_i, &opt_len);
|
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_SNDBUF, &val_i, &opt_len);
|
||||||
srs_trace("srto SRTO_SNDBUF=%d", val_i);
|
srs_trace("srto SRTO_SNDBUF=%d", val_i);
|
||||||
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVBUF, &val_i, &opt_len);
|
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVBUF, &val_i, &opt_len);
|
||||||
|
@ -185,91 +166,6 @@ void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int srt_handle::get_srt_mode(SRTSOCKET conn_srt_socket) {
|
|
||||||
auto iter = _conn_map.find(conn_srt_socket);
|
|
||||||
if (iter == _conn_map.end()) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
return iter->second->get_mode();
|
|
||||||
}
|
|
||||||
|
|
||||||
void srt_handle::insert_message_queue(request_message_t msg) {
|
|
||||||
std::unique_lock<std::mutex> lock(_queue_mutex);
|
|
||||||
_message_queue.push(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool srt_handle::get_message_from_queue(request_message_t& msg) {
|
|
||||||
std::unique_lock<std::mutex> lock(_queue_mutex);
|
|
||||||
bool ret = false;
|
|
||||||
|
|
||||||
if (_message_queue.empty()) {
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
ret = true;
|
|
||||||
msg = _message_queue.front();
|
|
||||||
_message_queue.pop();
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
void srt_handle::onwork()
|
|
||||||
{
|
|
||||||
const unsigned int SRT_FD_MAX = 1024;
|
|
||||||
SRT_SOCKSTATUS status = SRTS_INIT;
|
|
||||||
std::string streamid;
|
|
||||||
int ret;
|
|
||||||
const int64_t DEF_TIMEOUT_INTERVAL = 30;
|
|
||||||
|
|
||||||
srs_trace("srt handle epoll work is starting...");
|
|
||||||
while(_run_flag)
|
|
||||||
{
|
|
||||||
SRTSOCKET read_fds[SRT_FD_MAX];
|
|
||||||
SRTSOCKET write_fds[SRT_FD_MAX];
|
|
||||||
int rfd_num = SRT_FD_MAX;
|
|
||||||
int wfd_num = SRT_FD_MAX;
|
|
||||||
|
|
||||||
srt_now_ms = now_ms();
|
|
||||||
|
|
||||||
request_message_t msg;
|
|
||||||
|
|
||||||
if (get_message_from_queue(msg)) {
|
|
||||||
add_newconn(msg.conn_ptr, msg.events);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_conn_map.empty()) {
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
check_alive();
|
|
||||||
|
|
||||||
ret = srt_epoll_wait(_handle_pollid, read_fds, &rfd_num, write_fds, &wfd_num,
|
|
||||||
DEF_TIMEOUT_INTERVAL, 0, 0, 0, 0);
|
|
||||||
if (ret < 0) {
|
|
||||||
srs_info("srt handle epoll is timeout, ret=%d, srt_now_ms=%ld",
|
|
||||||
ret, srt_now_ms);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int index = 0; index < rfd_num; index++)
|
|
||||||
{
|
|
||||||
status = srt_getsockstate(read_fds[index]);
|
|
||||||
srs_info("srt handle read(push) rfd num:%d, status:%d, streamid:%s, read_fd",
|
|
||||||
rfd_num, status, streamid.c_str(), read_fds[index]);
|
|
||||||
handle_srt_socket(status, read_fds[index]);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int index = 0; index < wfd_num; index++)
|
|
||||||
{
|
|
||||||
status = srt_getsockstate(write_fds[index]);
|
|
||||||
streamid = UDT::getstreamid(write_fds[index]);
|
|
||||||
srs_info("srt handle write(puller) wfd num:%d, status:%d, streamid:%s, write_fd",
|
|
||||||
wfd_num, status, streamid.c_str(), write_fds[index]);
|
|
||||||
handle_srt_socket(status, write_fds[index]);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) {
|
void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) {
|
||||||
SRT_CONN_PTR srt_conn_ptr;
|
SRT_CONN_PTR srt_conn_ptr;
|
||||||
unsigned char data[DEF_DATA_SIZE];
|
unsigned char data[DEF_DATA_SIZE];
|
||||||
|
@ -293,6 +189,7 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp
|
||||||
close_push_conn(conn_fd);
|
close_push_conn(conn_fd);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
srt_conn_ptr->update_timestamp(srt_now_ms);
|
srt_conn_ptr->update_timestamp(srt_now_ms);
|
||||||
|
|
||||||
srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);
|
srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);
|
||||||
|
|
|
@ -12,33 +12,22 @@
|
||||||
#include "srt_conn.hpp"
|
#include "srt_conn.hpp"
|
||||||
#include "srt_to_rtmp.hpp"
|
#include "srt_to_rtmp.hpp"
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SRT_CONN_PTR conn_ptr;
|
|
||||||
int events;
|
|
||||||
} request_message_t;
|
|
||||||
|
|
||||||
class srt_handle {
|
class srt_handle {
|
||||||
public:
|
public:
|
||||||
srt_handle();
|
srt_handle(int pollid);
|
||||||
~srt_handle();
|
~srt_handle();
|
||||||
|
|
||||||
int start();//create srt epoll and create epoll thread
|
|
||||||
void stop();//close srt epoll and end epoll thread
|
|
||||||
|
|
||||||
void insert_message_queue(request_message_t msg);
|
|
||||||
bool get_message_from_queue(request_message_t& msg);
|
|
||||||
|
|
||||||
private:
|
|
||||||
//add new srt connection into epoll event
|
//add new srt connection into epoll event
|
||||||
void add_newconn(SRT_CONN_PTR conn_ptr, int events);
|
void add_newconn(SRT_CONN_PTR conn_ptr, int events);
|
||||||
//get srt conn object by srt socket
|
|
||||||
SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket);
|
|
||||||
//get srt connect mode: push or pull
|
|
||||||
int get_srt_mode(SRTSOCKET conn_srt_socket);
|
|
||||||
|
|
||||||
void onwork();//epoll thread loop
|
|
||||||
//handle recv/send srt socket
|
//handle recv/send srt socket
|
||||||
void handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd);
|
void handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd);
|
||||||
|
//check srt connection whether it's still alive.
|
||||||
|
void check_alive();
|
||||||
|
|
||||||
|
private:
|
||||||
|
//get srt conn object by srt socket
|
||||||
|
SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket);
|
||||||
|
|
||||||
void handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd);
|
void handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd);
|
||||||
void handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd);
|
void handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd);
|
||||||
|
|
||||||
|
@ -51,27 +40,19 @@ private:
|
||||||
bool add_new_pusher(SRT_CONN_PTR conn_ptr);
|
bool add_new_pusher(SRT_CONN_PTR conn_ptr);
|
||||||
//remove push connection and remove epoll
|
//remove push connection and remove epoll
|
||||||
void close_push_conn(SRTSOCKET srtsocket);
|
void close_push_conn(SRTSOCKET srtsocket);
|
||||||
|
|
||||||
//check srt connection whether it's still alive.
|
|
||||||
void check_alive();
|
|
||||||
|
|
||||||
//debug statics
|
//debug statics
|
||||||
void debug_statics(SRTSOCKET srtsocket, const std::string& streamid);
|
void debug_statics(SRTSOCKET srtsocket, const std::string& streamid);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool _run_flag;
|
|
||||||
int _handle_pollid;
|
int _handle_pollid;
|
||||||
|
|
||||||
std::unordered_map<SRTSOCKET, SRT_CONN_PTR> _conn_map;//save all srt connection: pull or push
|
std::unordered_map<SRTSOCKET, SRT_CONN_PTR> _conn_map;//save all srt connection: pull or push
|
||||||
std::shared_ptr<std::thread> _work_thread_ptr;
|
|
||||||
|
|
||||||
//save push srt connection for prevent from repeat push connection
|
//save push srt connection for prevent from repeat push connection
|
||||||
std::unordered_map<std::string, SRT_CONN_PTR> _push_conn_map;//key:streamid, value:SRT_CONN_PTR
|
std::unordered_map<std::string, SRT_CONN_PTR> _push_conn_map;//key:streamid, value:SRT_CONN_PTR
|
||||||
//streamid, play map<SRTSOCKET, SRT_CONN_PTR>
|
//streamid, play map<SRTSOCKET, SRT_CONN_PTR>
|
||||||
std::unordered_map<std::string, std::unordered_map<SRTSOCKET, SRT_CONN_PTR>> _streamid_map;
|
std::unordered_map<std::string, std::unordered_map<SRTSOCKET, SRT_CONN_PTR>> _streamid_map;
|
||||||
|
|
||||||
std::mutex _queue_mutex;
|
|
||||||
std::queue<request_message_t> _message_queue;
|
|
||||||
|
|
||||||
long long _last_timestamp;
|
long long _last_timestamp;
|
||||||
long long _last_check_alive_ts;
|
long long _last_check_alive_ts;
|
||||||
|
|
|
@ -15,10 +15,9 @@
|
||||||
#include <srs_app_rtmp_conn.hpp>
|
#include <srs_app_rtmp_conn.hpp>
|
||||||
#include <srs_app_config.hpp>
|
#include <srs_app_config.hpp>
|
||||||
|
|
||||||
srt_server::srt_server(unsigned short port):listen_port(port)
|
srt_server::srt_server(unsigned short port):_listen_port(port)
|
||||||
,server_socket(-1)
|
,_server_socket(-1)
|
||||||
{
|
{
|
||||||
handle_ptr = std::make_shared<srt_handle>();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
srt_server::~srt_server()
|
srt_server::~srt_server()
|
||||||
|
@ -26,49 +25,101 @@ srt_server::~srt_server()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int srt_server::init_srt_parameter() {
|
||||||
|
const int DEF_RECV_LATENCY = 120;
|
||||||
|
const int DEF_PEER_LATENCY = 0;
|
||||||
|
|
||||||
|
int opt_len = sizeof(int);
|
||||||
|
|
||||||
|
if (_server_socket == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
int maxbw = _srs_config->get_srto_maxbw();
|
||||||
|
srt_setsockopt(_server_socket, 0, SRTO_MAXBW, &maxbw, opt_len);
|
||||||
|
int mss = _srs_config->get_srto_mss();
|
||||||
|
srt_setsockopt(_server_socket, 0, SRTO_MSS, &mss, opt_len);
|
||||||
|
|
||||||
|
bool tlpkdrop = _srs_config->get_srto_tlpkdrop();
|
||||||
|
int tlpkdrop_i = tlpkdrop ? 1 : 0;
|
||||||
|
srt_setsockopt(_server_socket, 0, SRTO_TLPKTDROP, &tlpkdrop_i, opt_len);
|
||||||
|
|
||||||
|
int connection_timeout = _srs_config->get_srto_conntimeout();
|
||||||
|
srt_setsockopt(_server_socket, 0, SRTO_CONNTIMEO, &connection_timeout, opt_len);
|
||||||
|
|
||||||
|
int send_buff = _srs_config->get_srto_sendbuf();
|
||||||
|
srt_setsockopt(_server_socket, 0, SRTO_SNDBUF, &send_buff, opt_len);
|
||||||
|
int recv_buff = _srs_config->get_srto_recvbuf();
|
||||||
|
srt_setsockopt(_server_socket, 0, SRTO_RCVBUF, &recv_buff, opt_len);
|
||||||
|
int payload_size = _srs_config->get_srto_payloadsize();
|
||||||
|
srt_setsockopt(_server_socket, 0, SRTO_PAYLOADSIZE, &payload_size, opt_len);
|
||||||
|
|
||||||
|
int latency = _srs_config->get_srto_latency();
|
||||||
|
if (DEF_RECV_LATENCY != latency) {
|
||||||
|
srt_setsockopt(_server_socket, 0, SRTO_LATENCY, &latency, opt_len);
|
||||||
|
}
|
||||||
|
|
||||||
|
int recv_latency = _srs_config->get_srto_recv_latency();
|
||||||
|
if (DEF_RECV_LATENCY != recv_latency) {
|
||||||
|
srt_setsockopt(_server_socket, 0, SRTO_RCVLATENCY, &recv_latency, opt_len);
|
||||||
|
}
|
||||||
|
|
||||||
|
int peer_latency = _srs_config->get_srto_peer_latency();
|
||||||
|
if (DEF_PEER_LATENCY != peer_latency) {
|
||||||
|
srt_setsockopt(_server_socket, 0, SRTO_PEERLATENCY, &recv_latency, opt_len);
|
||||||
|
}
|
||||||
|
|
||||||
|
srs_trace("init srt parameter, maxbw:%d, mss:%d, tlpkdrop:%d, connect timeout:%d, \
|
||||||
|
send buff:%d, recv buff:%d, payload size:%d, latency:%d, recv latency:%d, peer latency:%d",
|
||||||
|
maxbw, mss, tlpkdrop, connection_timeout, send_buff, recv_buff, payload_size,
|
||||||
|
latency, recv_latency, peer_latency);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
int srt_server::init_srt() {
|
int srt_server::init_srt() {
|
||||||
if (server_socket != -1) {
|
if (_server_socket != -1) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
server_socket = srt_create_socket();
|
_server_socket = srt_create_socket();
|
||||||
sockaddr_in sa;
|
sockaddr_in sa;
|
||||||
memset(&sa, 0, sizeof sa);
|
memset(&sa, 0, sizeof sa);
|
||||||
sa.sin_family = AF_INET;
|
sa.sin_family = AF_INET;
|
||||||
sa.sin_port = htons(listen_port);
|
sa.sin_port = htons(_listen_port);
|
||||||
|
|
||||||
sockaddr* psa = (sockaddr*)&sa;
|
sockaddr* psa = (sockaddr*)&sa;
|
||||||
|
|
||||||
int ret = srt_bind(server_socket, psa, sizeof(sa));
|
int ret = srt_bind(_server_socket, psa, sizeof(sa));
|
||||||
if ( ret == SRT_ERROR )
|
if ( ret == SRT_ERROR )
|
||||||
{
|
{
|
||||||
srt_close(server_socket);
|
srt_close(_server_socket);
|
||||||
srs_error("srt bind error: %d", ret);
|
srs_error("srt bind error: %d", ret);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = srt_listen(server_socket, 5);
|
ret = srt_listen(_server_socket, 5);
|
||||||
if (ret == SRT_ERROR)
|
if (ret == SRT_ERROR)
|
||||||
{
|
{
|
||||||
srt_close(server_socket);
|
srt_close(_server_socket);
|
||||||
srs_error("srt listen error: %d", ret);
|
srs_error("srt listen error: %d", ret);
|
||||||
return -2;
|
return -2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
init_srt_parameter();
|
||||||
|
|
||||||
_pollid = srt_epoll_create();
|
_pollid = srt_epoll_create();
|
||||||
if (_pollid < -1) {
|
if (_pollid < -1) {
|
||||||
srs_error("srt server srt_epoll_create error, port=%d", listen_port);
|
srs_error("srt server srt_epoll_create error, port=%d", _listen_port);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
_handle_ptr = std::make_shared<srt_handle>(_pollid);
|
||||||
|
|
||||||
int events = SRT_EPOLL_IN | SRT_EPOLL_ERR;
|
int events = SRT_EPOLL_IN | SRT_EPOLL_ERR;
|
||||||
ret = srt_epoll_add_usock(_pollid, server_socket, &events);
|
ret = srt_epoll_add_usock(_pollid, _server_socket, &events);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
srs_error("srt server run add epoll error:%d", ret);
|
srs_error("srt server run add epoll error:%d", ret);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
srs_trace("srt server listen port=%d, server_fd=%d", listen_port, server_socket);
|
srs_trace("srt server listen port=%d, server_fd=%d", _listen_port, _server_socket);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -80,13 +131,9 @@ int srt_server::start()
|
||||||
if ((ret = init_srt()) < 0) {
|
if ((ret = init_srt()) < 0) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
ret = handle_ptr->start();
|
|
||||||
if (ret < 0) {
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
run_flag = true;
|
run_flag = true;
|
||||||
srs_trace("srt server is starting... port(%d)", listen_port);
|
srs_trace("srt server is starting... port(%d)", _listen_port);
|
||||||
thread_run_ptr = std::make_shared<std::thread>(&srt_server::on_work, this);
|
thread_run_ptr = std::make_shared<std::thread>(&srt_server::on_work, this);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -99,7 +146,6 @@ void srt_server::stop()
|
||||||
}
|
}
|
||||||
thread_run_ptr->join();
|
thread_run_ptr->join();
|
||||||
|
|
||||||
handle_ptr->stop();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,8 +192,8 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd
|
||||||
srt_conn_ptr->close();
|
srt_conn_ptr->close();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
request_message_t msg = {srt_conn_ptr, conn_event};
|
|
||||||
handle_ptr->insert_message_queue(msg);
|
_handle_ptr->add_newconn(srt_conn_ptr, conn_event);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case SRTS_CONNECTED:
|
case SRTS_CONNECTED:
|
||||||
|
@ -169,10 +215,15 @@ void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void srt_server::srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr) {
|
||||||
|
_handle_ptr->handle_srt_socket(status, input_fd);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
void srt_server::on_work()
|
void srt_server::on_work()
|
||||||
{
|
{
|
||||||
const unsigned int SRT_FD_MAX = 100;
|
const unsigned int SRT_FD_MAX = 100;
|
||||||
srs_trace("srt server is working port(%d)", listen_port);
|
srs_trace("srt server is working port(%d)", _listen_port);
|
||||||
while (run_flag)
|
while (run_flag)
|
||||||
{
|
{
|
||||||
SRTSOCKET read_fds[SRT_FD_MAX];
|
SRTSOCKET read_fds[SRT_FD_MAX];
|
||||||
|
@ -185,19 +236,27 @@ void srt_server::on_work()
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
srs_trace("srt server epoll get: ret=%d, rfd_num=%d, wfd_num=%d",
|
_handle_ptr->check_alive();
|
||||||
ret, rfd_num, wfd_num);
|
|
||||||
|
|
||||||
for (int index = 0; index < rfd_num; index++) {
|
for (int index = 0; index < rfd_num; index++) {
|
||||||
SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);
|
SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);
|
||||||
srt_handle_connection(status, read_fds[index], "read fd");
|
if (_server_socket == read_fds[index]) {
|
||||||
|
srt_handle_connection(status, read_fds[index], "read fd");
|
||||||
|
} else {
|
||||||
|
srt_handle_data(status, read_fds[index], "read fd");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int index = 0; index < wfd_num; index++) {
|
for (int index = 0; index < wfd_num; index++) {
|
||||||
SRT_SOCKSTATUS status = srt_getsockstate(write_fds[index]);
|
SRT_SOCKSTATUS status = srt_getsockstate(write_fds[index]);
|
||||||
srt_handle_connection(status, read_fds[index], "write fd");
|
if (_server_socket == write_fds[index]) {
|
||||||
|
srt_handle_connection(status, write_fds[index], "write fd");
|
||||||
|
} else {
|
||||||
|
srt_handle_data(status, write_fds[index], "write fd");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
srt_epoll_clear_usocks(_pollid);
|
||||||
}
|
}
|
||||||
|
|
||||||
SrtServerAdapter::SrtServerAdapter()
|
SrtServerAdapter::SrtServerAdapter()
|
||||||
|
|
|
@ -21,18 +21,22 @@ public:
|
||||||
private:
|
private:
|
||||||
//init srt socket and srt epoll
|
//init srt socket and srt epoll
|
||||||
int init_srt();
|
int init_srt();
|
||||||
|
int init_srt_parameter();
|
||||||
|
|
||||||
//srt main epoll loop
|
//srt main epoll loop
|
||||||
void on_work();
|
void on_work();
|
||||||
//accept new srt connection
|
//accept new srt connection
|
||||||
void srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr);
|
void srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr);
|
||||||
|
//get srt data read/write
|
||||||
|
void srt_handle_data(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
unsigned short listen_port;
|
unsigned short _listen_port;
|
||||||
SRTSOCKET server_socket;
|
SRTSOCKET _server_socket;
|
||||||
int _pollid;
|
int _pollid;
|
||||||
bool run_flag;
|
bool run_flag;
|
||||||
std::shared_ptr<std::thread> thread_run_ptr;
|
std::shared_ptr<std::thread> thread_run_ptr;
|
||||||
std::shared_ptr<srt_handle> handle_ptr;
|
std::shared_ptr<srt_handle> _handle_ptr;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef std::shared_ptr<srt_server> SRT_SERVER_PTR;
|
typedef std::shared_ptr<srt_server> SRT_SERVER_PTR;
|
||||||
|
|
|
@ -220,6 +220,8 @@ rtmp_client::rtmp_client(std::string key_path):_key_path(key_path)
|
||||||
_h264_sps_changed = false;
|
_h264_sps_changed = false;
|
||||||
_h264_pps_changed = false;
|
_h264_pps_changed = false;
|
||||||
_h264_sps_pps_sent = false;
|
_h264_sps_pps_sent = false;
|
||||||
|
|
||||||
|
_last_live_ts = now_ms();
|
||||||
srs_trace("rtmp client construct url:%s", url_sz);
|
srs_trace("rtmp client construct url:%s", url_sz);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -251,6 +253,7 @@ srs_error_t rtmp_client::connect() {
|
||||||
srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT;
|
srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT;
|
||||||
srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
|
srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
|
||||||
|
|
||||||
|
_last_live_ts = now_ms();
|
||||||
if (_connect_flag) {
|
if (_connect_flag) {
|
||||||
return srs_success;
|
return srs_success;
|
||||||
}
|
}
|
||||||
|
|
|
@ -296,7 +296,11 @@ int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback)
|
||||||
path = data_ptr->get_path();
|
path = data_ptr->get_path();
|
||||||
for (unsigned int index = 0; index < count; index++)
|
for (unsigned int index = 0; index < count; index++)
|
||||||
{
|
{
|
||||||
ret = decode_unit(data_ptr->get_data() + 188*index, path, callback);
|
unsigned char* data = data_ptr->get_data() + 188*index;
|
||||||
|
if (data[0] != 0x47) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
ret = decode_unit(data, path, callback);
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in a new issue