diff --git a/trunk/conf/srt.conf b/trunk/conf/srt.conf index 2aa29aa36..b9fcc9d32 100644 --- a/trunk/conf/srt.conf +++ b/trunk/conf/srt.conf @@ -27,10 +27,6 @@ srt_server { # @doc https://github.com/ossrs/srs/issues/1147#issuecomment-577607026 vhost __defaultVhost__ { - play { - #atc on; - mix_correct on; - } } vhost srs.srt.com.cn { } diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 0eaabb4c9..5781d2f1b 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3530,7 +3530,7 @@ srs_error_t SrsConfig::check_normal_config() && n != "mss" && n != "latency" && n != "recvlatency" && n != "peerlatency" && n != "tlpkdrop" && n != "connect_timeout" && n != "sendbuf" && n != "recvbuf" && n != "payloadsize" - && n != "default_app") { + && n != "default_app" && n != "mix_correct") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal srt_stream.%s", n.c_str()); } } @@ -6754,6 +6754,20 @@ unsigned short SrsConfig::get_srt_listen_port() return (unsigned short)atoi(conf->arg0().c_str()); } +bool SrsConfig::get_srt_mix_correct() { + static bool DEFAULT = true; + SrsConfDirective* conf = root->get("srt_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("mix_correct"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + return SRS_CONF_PERFER_TRUE(conf->arg0()); +} + int SrsConfig::get_srto_maxbw() { static int64_t DEFAULT = -1; SrsConfDirective* conf = root->get("srt_server"); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index dde708f8e..4360c41da 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -625,6 +625,8 @@ public: virtual int get_srto_payloadsize(); // Get the default app. virtual std::string get_default_app_name(); + // Get the mix_correct + virtual bool get_srt_mix_correct(); // http_hooks section private: diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp index 4592785e3..634376275 100644 --- a/trunk/src/srt/srt_handle.cpp +++ b/trunk/src/srt/srt_handle.cpp @@ -145,6 +145,7 @@ void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) { srs_trace("srto SRTO_RCVBUF=%d", val_i); srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i, &opt_len); srs_trace("srto SRTO_MAXBW=%d", val_i); + srs_trace("srt mix_correct is %s", _srs_config->get_srt_mix_correct() ? "enable" : "disable"); if (conn_ptr->get_mode() == PULL_SRT_MODE) { add_new_puller(conn_ptr, conn_ptr->get_subpath()); diff --git a/trunk/src/srt/srt_to_rtmp.cpp b/trunk/src/srt/srt_to_rtmp.cpp index 71a903b94..ec2d7d828 100644 --- a/trunk/src/srt/srt_to_rtmp.cpp +++ b/trunk/src/srt/srt_to_rtmp.cpp @@ -322,12 +322,13 @@ srs_error_t rtmp_client::write_h264_sps_pps(uint32_t dts, uint32_t pts) { return srs_error_wrap(err, "avc to flv"); } - // the timestamp in rtmp message header is dts. - uint32_t timestamp = dts; - if ((err = rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv)) != srs_success) { - return srs_error_wrap(err, "write packet"); + if (_srs_config->get_srt_mix_correct()) { + _rtmp_queue.insert_rtmp_data((unsigned char*)flv, nb_flv, (int64_t)dts, SrsFrameTypeVideo); + rtmp_write_work(); + } else { + rtmp_write_packet(SrsFrameTypeVideo, dts, flv, nb_flv); } - + // reset sps and pps. _h264_sps_changed = false; _h264_pps_changed = false; @@ -367,10 +368,14 @@ srs_error_t rtmp_client::write_h264_ipb_frame(char* frame, int frame_size, uint3 if ((err = _avc_ptr->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { return srs_error_wrap(err, "mux avc to flv"); } - - // the timestamp in rtmp message header is dts. - uint32_t timestamp = dts; - return rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv); + if (_srs_config->get_srt_mix_correct()) { + _rtmp_queue.insert_rtmp_data((unsigned char*)flv, nb_flv, (int64_t)dts, SrsFrameTypeVideo); + rtmp_write_work(); + } else { + rtmp_write_packet(SrsFrameTypeVideo, dts, flv, nb_flv); + } + + return err; } srs_error_t rtmp_client::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts) { @@ -381,14 +386,20 @@ srs_error_t rtmp_client::write_audio_raw_frame(char* frame, int frame_size, SrsR if ((err = _aac_ptr->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != srs_success) { return srs_error_wrap(err, "mux aac to flv"); } + if (_srs_config->get_srt_mix_correct()) { + _rtmp_queue.insert_rtmp_data((unsigned char*)data, size, (int64_t)dts, SrsFrameTypeAudio); + rtmp_write_work(); + } else { + rtmp_write_packet(SrsFrameTypeAudio, dts, data, size); + } - return rtmp_write_packet(SrsFrameTypeAudio, dts, data, size); + return err; } srs_error_t rtmp_client::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) { srs_error_t err = srs_success; SrsSharedPtrMessage* msg = NULL; - + if ((err = srs_rtmp_create_msg(type, timestamp, data, size, _rtmp_conn_ptr->sid(), &msg)) != srs_success) { return srs_error_wrap(err, "create message"); } @@ -403,6 +414,19 @@ srs_error_t rtmp_client::rtmp_write_packet(char type, uint32_t timestamp, char* return err; } +void rtmp_client::rtmp_write_work() { + rtmp_packet_info_s packet_info; + bool ret = false; + + do { + ret = _rtmp_queue.get_rtmp_data(packet_info); + if (ret) { + rtmp_write_packet(packet_info._type, packet_info._dts, (char*)packet_info._data, packet_info._len); + } + } while(ret); + return; +} + srs_error_t rtmp_client::on_ts_video(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts) { srs_error_t err = srs_success; @@ -416,6 +440,7 @@ srs_error_t rtmp_client::on_ts_video(std::shared_ptr avs_ptr, uint64_ if (dts == 0) { dts = pts; } + // send each frame. while (!avs_ptr->empty()) { char* frame = NULL; @@ -510,8 +535,9 @@ int rtmp_client::get_sample_rate(char sound_rate) { srs_error_t rtmp_client::on_ts_audio(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts) { srs_error_t err = srs_success; - uint64_t last_dts; + uint64_t base_dts; uint64_t real_dts; + uint64_t first_dts; int index = 0; int sample_size = 1024; @@ -520,11 +546,11 @@ srs_error_t rtmp_client::on_ts_audio(std::shared_ptr avs_ptr, uint64_ return srs_error_wrap(err, "connect"); } - last_dts = dts/90; - if (last_dts == 0) { - last_dts = pts/90; + base_dts = dts/90; + if (base_dts == 0) { + base_dts = pts/90; } - + // send each frame. while (!avs_ptr->empty()) { char* frame = NULL; @@ -545,7 +571,10 @@ srs_error_t rtmp_client::on_ts_audio(std::shared_ptr avs_ptr, uint64_ sample_size = 1024; } - real_dts = last_dts + index * 1000.0 * sample_size / sample_rate; + real_dts = base_dts + index * 1000.0 * sample_size / sample_rate; + if (index == 0) { + first_dts = real_dts; + } index++; // generate sh. @@ -571,6 +600,12 @@ srs_error_t rtmp_client::on_ts_audio(std::shared_ptr avs_ptr, uint64_ _last_live_ts = now_ms(); } + uint64_t diff_t = real_dts - first_dts; + diff_t += 100; + if ((diff_t > 200) && (diff_t < 600)) { + srs_info("set_queue_timeout timeout:%lu", diff_t); + _rtmp_queue.set_queue_timeout(diff_t); + } return err; } @@ -594,3 +629,75 @@ void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media } return; } + +rtmp_packet_queue::rtmp_packet_queue():_queue_timeout(QUEUE_DEF_TIMEOUT) + ,_queue_maxlen(QUEUE_LEN_MAX) + ,_first_packet_t(-1) + ,_first_local_t(-1) { + +} + +rtmp_packet_queue::~rtmp_packet_queue() { + for (auto item : _send_map) { + rtmp_packet_info_s info = item.second; + if (info._data) { + delete info._data; + } + } + _send_map.clear(); +} + +void rtmp_packet_queue::set_queue_timeout(int64_t queue_timeout) { + _queue_timeout = queue_timeout; +} + +void rtmp_packet_queue::insert_rtmp_data(unsigned char* data, int len, int64_t dts, char media_type) { + rtmp_packet_info_s packet_info; + + packet_info._data = data; + packet_info._len = len; + packet_info._dts = dts; + packet_info._type = media_type; + + if (_first_packet_t == -1) { + _first_packet_t = dts; + _first_local_t = (int64_t)now_ms(); + } + + _send_map.insert(std::make_pair(dts, packet_info)); + return; +} + +bool rtmp_packet_queue::is_ready() { + if (!_srs_config->get_srt_mix_correct() && !_send_map.empty()) { + return true; + } + if (_send_map.size() < 2) { + return false; + } + + if (_send_map.size() >= (size_t)_queue_maxlen) { + return true; + } + + auto first_item = _send_map.begin(); + int64_t now_t = (int64_t)now_ms(); + + int64_t diff_t = (now_t - _first_local_t) - (first_item->first - _first_packet_t); + + if (diff_t >= _queue_timeout) { + return true; + } + return false; +} + +bool rtmp_packet_queue::get_rtmp_data(rtmp_packet_info_s& packet_info) { + if (!is_ready()) { + return false; + } + auto iter = _send_map.begin(); + packet_info = iter->second; + _send_map.erase(iter); + + return true; +} \ No newline at end of file diff --git a/trunk/src/srt/srt_to_rtmp.hpp b/trunk/src/srt/srt_to_rtmp.hpp index 3a3dea3d9..5fcaf6acf 100644 --- a/trunk/src/srt/srt_to_rtmp.hpp +++ b/trunk/src/srt/srt_to_rtmp.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -24,6 +25,37 @@ typedef std::shared_ptr AAC_PTR; #define DEFAULT_VHOST "__default_host__" +#define QUEUE_DEF_TIMEOUT 500 +#define QUEUE_LEN_MAX 100 + +typedef struct { + unsigned char* _data; + int _len; + int64_t _dts; + char _type; + char reserve[3]; +} rtmp_packet_info_s; + +class rtmp_packet_queue { +public: + rtmp_packet_queue(); + ~rtmp_packet_queue(); + + void set_queue_timeout(int64_t queue_timeout); + void insert_rtmp_data(unsigned char* data, int len, int64_t dts, char media_type); + bool get_rtmp_data(rtmp_packet_info_s& packet_info); + +private: + bool is_ready(); + +private: + int64_t _queue_timeout; + int64_t _queue_maxlen; + int64_t _first_packet_t; + int64_t _first_local_t; + std::multimap _send_map;//key:dts, value:rtmp_packet_info +}; + class rtmp_client : public ts_media_data_callback_I, public std::enable_shared_from_this { public: rtmp_client(std::string key_path); @@ -48,6 +80,8 @@ private: int get_sample_rate(char sound_rate); + void rtmp_write_work(); + private: virtual srs_error_t rtmp_write_packet(char type, uint32_t timestamp, char* data, int size); @@ -73,6 +107,9 @@ private: RTMP_CONN_PTR _rtmp_conn_ptr; bool _connect_flag; int64_t _last_live_ts; + +private: + rtmp_packet_queue _rtmp_queue; }; typedef std::shared_ptr RTMP_CLIENT_PTR; diff --git a/trunk/src/srt/ts_demux.cpp b/trunk/src/srt/ts_demux.cpp index 3eed4f74f..ead598bbe 100644 --- a/trunk/src/srt/ts_demux.cpp +++ b/trunk/src/srt/ts_demux.cpp @@ -259,11 +259,17 @@ int ts_demux::decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_C if(ts_header_info._payload_unit_start_indicator){ unsigned char* ret_data_p = nullptr; size_t ret_size = 0; + uint64_t dts = 0; + uint64_t pts = 0; //callback last media data in data buffer on_callback(callback, _last_pid, key_path, _last_dts, _last_pts); - pes_parse(data_p+npos, npos, &ret_data_p, ret_size, _last_dts, _last_pts); + pes_parse(data_p+npos, npos, &ret_data_p, ret_size, dts, pts); + + _last_pts = pts; + _last_dts = (dts == 0) ? pts : dts; + if ((ret_data_p != nullptr) && (ret_size > 0)) { insert_into_databuf(ret_data_p, ret_size, key_path, ts_header_info._PID); }