diff --git a/trunk/3rdparty/st-srs/README.md b/trunk/3rdparty/st-srs/README.md index 60fc1a651..522d3c761 100644 --- a/trunk/3rdparty/st-srs/README.md +++ b/trunk/3rdparty/st-srs/README.md @@ -22,6 +22,8 @@ The branch [srs](https://github.com/ossrs/state-threads/tree/srs) will be patche - [x] Patch [st.osx10.14.build.patch](https://github.com/ossrs/srs/blob/2.0release/trunk/3rdparty/patches/6.st.osx10.14.build.patch), for osx 10.14 build. - [x] Support macro `MD_ST_NO_ASM` to disable ASM, [#8](https://github.com/ossrs/state-threads/issues/8). - [x] Merge patch [srs#1282](https://github.com/ossrs/srs/issues/1282#issuecomment-445539513) to support aarch64, [#9](https://github.com/ossrs/state-threads/issues/9). +- [x] Support OSX for Apple Darwin, macOS, [#11](https://github.com/ossrs/state-threads/issues/11). +- [x] Support sendmmsg for UDP, [#12](https://github.com/ossrs/state-threads/issues/12). ## Docs @@ -85,4 +87,10 @@ Important cli options: 1. `--track-origins= [default: no]`, Controls whether Memcheck tracks the origin of uninitialised values. By default, it does not, which means that although it can tell you that an uninitialised value is being used in a dangerous way, it cannot tell you where the uninitialised value came from. This often makes it difficult to track down the root problem. 1. `--show-reachable= , --show-possibly-lost=`, to show the using memory. +## Analysis + +1. About setjmp and longjmp, read [setjmp](https://gitee.com/winlinvip/srs-wiki/raw/master/images/st-setjmp.jpg). +1. About the stack structure, read [stack](https://gitee.com/winlinvip/srs-wiki/raw/master/images/st-stack.jpg) +1. About asm code comments, read [#91d530e](https://github.com/ossrs/state-threads/commit/91d530e#diff-ed9428b14ff6afda0e9ab04cc91d4445R25). + Winlin 2016 diff --git a/trunk/auto/depends.sh b/trunk/auto/depends.sh index f2d3e378c..557de4bb3 100755 --- a/trunk/auto/depends.sh +++ b/trunk/auto/depends.sh @@ -84,15 +84,6 @@ function Ubuntu_prepare() echo "The unzip is installed." fi - if [[ $SRS_NASM == YES ]]; then - nasm -v >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then - echo "Installing nasm." - require_sudoer "sudo apt-get install -y --force-yes nasm" - sudo apt-get install -y --force-yes nasm; ret=$?; if [[ 0 -ne $ret ]]; then return $ret; fi - echo "The nasm is installed." - fi - fi - if [[ $SRS_VALGRIND == YES ]]; then valgrind --help >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then echo "Installing valgrind." @@ -171,13 +162,6 @@ function Centos_prepare() echo "The unzip is installed." fi - nasm -v >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then - echo "Installing nasm." - require_sudoer "sudo yum install -y nasm" - sudo yum install -y nasm; ret=$?; if [[ 0 -ne $ret ]]; then return $ret; fi - echo "The nasm is installed." - fi - if [[ $SRS_VALGRIND == YES ]]; then valgrind --help >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then echo "Installing valgrind." @@ -230,15 +214,16 @@ function OSX_prepare() fi OS_IS_OSX=YES - echo "OSX detected, install tools if needed" # requires the osx when os if [ $OS_IS_OSX = YES ]; then if [ $SRS_OSX = NO ]; then - echo "OSX detected, must specifies the --osx" + echo "OSX detected, please use: ./configure --osx" exit 1 fi fi + echo "OSX detected, install tools if needed" + brew --help >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then echo "install brew" echo "ruby -e \"$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)\"" @@ -281,6 +266,10 @@ function OSX_prepare() echo "install unzip success" fi + pkg-config --version >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then + echo "Please install pkg-config"; exit -1; + fi + echo "OSX install tools success" return 0 } @@ -614,9 +603,16 @@ fi ##################################################################################### if [[ $SRS_EXPORT_LIBRTMP_PROJECT == NO && $SRS_RTC == YES ]]; then FFMPEG_OPTIONS="" + + # If disable nasm, disable all ASMs. if [[ $SRS_NASM == NO ]]; then FFMPEG_OPTIONS="--disable-asm --disable-x86asm --disable-inline-asm" fi + # If no nasm, we disable the x86asm. + nasm -v >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then + FFMPEG_OPTIONS="--disable-x86asm" + fi + if [[ -f ${SRS_OBJS}/${SRS_PLATFORM}/ffmpeg/lib/libavcodec.a ]]; then echo "The ffmpeg-4.2-fit is ok."; else diff --git a/trunk/auto/options.sh b/trunk/auto/options.sh index 69293fce3..f2b0cb7e6 100755 --- a/trunk/auto/options.sh +++ b/trunk/auto/options.sh @@ -569,6 +569,7 @@ function apply_user_detail_options() { # Detect whether has sendmmsg. # @see http://man7.org/linux/man-pages/man2/sendmmsg.2.html + mkdir -p ${SRS_OBJS} && echo "#include " > ${SRS_OBJS}/_tmp_sendmmsg_detect.c echo "int main(int argc, char** argv) {" >> ${SRS_OBJS}/_tmp_sendmmsg_detect.c echo " struct mmsghdr hdr;" >> ${SRS_OBJS}/_tmp_sendmmsg_detect.c diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 581e4daa3..2c1089665 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -443,6 +443,21 @@ rtc_server { # @remark Linux 4.18+ only, for other OS always disabled. # default: on gso on; + # Whether pad first packet for GSO for padding bytes. + # If 0, disable padding for GSO. + # @remark The max padding size is 0x7f(127). + # default: 127 + padding 127; + # Whether enable the perf stat at http://localhost:1985/api/v1/perf + # default: on + perf_stat on; + # The queue length, in number of mmsghdr, in messages. + # For example, 30 means we will cache 30K messages at most. + # If exceed, we will drop messages. + # @remark Each reuseport use a dedicated queue, if queue is 2000, reuseport is 4, + # then system queue is 2000*4 = 8k, user can incrase reuseport to incrase the queue. + # default: 2000 + queue_length 2000; } vhost rtc.vhost.srs.com { @@ -469,12 +484,17 @@ vhost rtc.vhost.srs.com { stun_strict_check on; } # whether enable min delay mode for vhost. - # For RTC, we recommend to set to on. + # default: on, for RTC. min_latency on; play { # set the MW(merged-write) latency in ms. - # For RTC, we recommend lower value, such as 0. + # @remark For WebRTC, we enable pass-timestamp mode, so we ignore this config. + # default: 0 (For WebRTC) mw_latency 0; + # Set the MW(merged-write) min messages. + # default: 0 (For Real-Time, min_latency on) + # default: 1 (For WebRTC, min_latency off) + mw_msgs 0; } } @@ -701,10 +721,17 @@ vhost play.srs.com { # SRS always set mw on, so we just set the latency value. # the latency of stream >= mw_latency + mr_latency # the value recomment is [300, 1800] - # default: 350 (for RTMP/HTTP-FLV) - # default: 0 (for WebRTC) + # @remark For WebRTC, we enable pass-timestamp mode, so we ignore this config. + # default: 350 (For RTMP/HTTP-FLV) + # default: 0 (For WebRTC) mw_latency 350; + # Set the MW(merged-write) min messages. + # default: 0 (For Real-Time, min_latency on) + # default: 1 (For WebRTC, min_latency off) + # default: 8 (For RTMP/HTTP-FLV, min_latency off). + mw_msgs 8; + # the minimal packets send interval in ms, # used to control the ndiff of stream by srs_rtmp_dump, # for example, some device can only accept some stream which @@ -761,6 +788,7 @@ vhost mrw.srs.com { # @see play.srs.com play { mw_latency 350; + mw_msgs 8; } # @see publish.srs.com @@ -780,6 +808,7 @@ vhost min.delay.com { # @see play.srs.com play { mw_latency 100; + mw_msgs 4; gop_cache off; queue_length 10; } @@ -808,6 +837,7 @@ vhost stream.control.com { # @see play.srs.com play { mw_latency 100; + mw_msgs 4; queue_length 10; send_min_interval 10.0; reduce_sequence_header on; diff --git a/trunk/scripts/perf_gso.py b/trunk/scripts/perf_gso.py index 707e6c6b5..02590e759 100755 --- a/trunk/scripts/perf_gso.py +++ b/trunk/scripts/perf_gso.py @@ -41,8 +41,15 @@ print "Repsonse %s"%(s) obj = json.loads(s) +print "" +p = obj['data']['dropped'] +print('Frame-Dropped: %.1f%s'%(10000.0 * p['rtc_dropeed'] / p['rtc_frames'], '%%')) +p = obj['data']['bytes'] +print('Padding-Overload: %.1f%s %dMB'%(10000.0 * p['rtc_padding'] / p['rtc_bytes'], '%%', p['rtc_padding']/1024/1024)) + # 2, 3, 5, 9, 16, 32, 64, 128, 256 keys = ['lt_2', 'lt_3', 'lt_5', 'lt_9', 'lt_16', 'lt_32', 'lt_64', 'lt_128', 'lt_256', 'gt_256'] +print("\n----------- 1 2 [3,4] [5,8] [9,15] [16,31] [32,63] [64,127] [128,255] [256,+) Packets"), print "" print("AV---Frames"), diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 956bc99b0..2a8454633 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3618,7 +3618,8 @@ srs_error_t SrsConfig::check_normal_config() for (int i = 0; conf && i < (int)conf->directives.size(); i++) { string n = conf->at(i)->name; if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa" - && n != "sendmmsg" && n != "encrypt" && n != "reuseport" && n != "gso" && n != "merge_nalus") { + && n != "sendmmsg" && n != "encrypt" && n != "reuseport" && n != "gso" && n != "merge_nalus" + && n != "padding" && n != "perf_stat" && n != "queue_length") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtc_server.%s", n.c_str()); } } @@ -3784,7 +3785,8 @@ srs_error_t SrsConfig::check_normal_config() for (int j = 0; j < (int)conf->directives.size(); j++) { string m = conf->at(j)->name; if (m != "time_jitter" && m != "mix_correct" && m != "atc" && m != "atc_auto" && m != "mw_latency" - && m != "gop_cache" && m != "queue_length" && m != "send_min_interval" && m != "reduce_sequence_header") { + && m != "gop_cache" && m != "queue_length" && m != "send_min_interval" && m != "reduce_sequence_header" + && m != "mw_msgs") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.play.%s of %s", m.c_str(), vhost->arg0().c_str()); } } @@ -4840,6 +4842,57 @@ bool SrsConfig::get_rtc_server_gso2() return SRS_CONF_PERFER_TRUE(conf->arg0()); } +int SrsConfig::get_rtc_server_padding() +{ + static int DEFAULT = 127; + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("padding"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return srs_min(127, ::atoi(conf->arg0().c_str())); +} + +bool SrsConfig::get_rtc_server_perf_stat() +{ + static bool DEFAULT = true; + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("perf_stat"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_TRUE(conf->arg0()); +} + +int SrsConfig::get_rtc_server_queue_length() +{ + static int DEFAULT = 2000; + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("queue_length"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + SrsConfDirective* SrsConfig::get_rtc(string vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -5367,8 +5420,48 @@ srs_utime_t SrsConfig::get_mw_sleep(string vhost, bool is_rtc) if (!conf || conf->arg0().empty()) { return DEFAULT; } + + int v = ::atoi(conf->arg0().c_str()); + if (is_rtc && v > 0) { + srs_warn("For RTC, we ignore mw_latency"); + return 0; + } - return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_MILLISECONDS); + return (srs_utime_t)(v * SRS_UTIME_MILLISECONDS); +} + +int SrsConfig::get_mw_msgs(string vhost, bool is_realtime, bool is_rtc) +{ + int DEFAULT = SRS_PERF_MW_MIN_MSGS; + if (is_rtc) { + DEFAULT = SRS_PERF_MW_MIN_MSGS_FOR_RTC; + } + if (is_realtime) { + DEFAULT = SRS_PERF_MW_MIN_MSGS_REALTIME; + } + + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("play"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("mw_msgs"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + int v = ::atoi(conf->arg0().c_str()); + if (v > SRS_PERF_MW_MSGS) { + srs_warn("reset mw_msgs %d to max %d", v, SRS_PERF_MW_MSGS); + v = SRS_PERF_MW_MSGS; + } + + return v; } bool SrsConfig::get_realtime_enabled(string vhost, bool is_rtc) diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 96524c33a..a905949b1 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -535,6 +535,11 @@ public: virtual bool get_rtc_server_gso(); private: virtual bool get_rtc_server_gso2(); +public: + virtual int get_rtc_server_padding(); + virtual bool get_rtc_server_perf_stat(); + virtual int get_rtc_server_queue_length(); + public: SrsConfDirective* get_rtc(std::string vhost); bool get_rtc_enabled(std::string vhost); @@ -625,6 +630,10 @@ public: // @param vhost, the vhost to get the mw sleep time. // TODO: FIXME: add utest for mw config. virtual srs_utime_t get_mw_sleep(std::string vhost, bool is_rtc = false); + // Get the mw_msgs, mw wait time in packets for vhost. + // @param vhost, the vhost to get the mw sleep msgs. + // TODO: FIXME: add utest for mw config. + virtual int get_mw_msgs(std::string vhost, bool is_realtime, bool is_rtc = false); // Whether min latency mode enabled. // @param vhost, the vhost to get the min_latency. // TODO: FIXME: add utest for min_latency. diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 6a83f7a8c..61552def2 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -869,6 +869,7 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe api = prop->to_str(); } + // TODO: FIXME: Parse vhost. // Parse app and stream from streamurl. string app; string stream_name; @@ -909,6 +910,13 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe request.app = app; request.stream = stream_name; + // TODO: FIXME: Parse vhost. + // discovery vhost, resolve the vhost from config + SrsConfDirective* parsed_vhost = _srs_config->get_vhost(""); + if (parsed_vhost) { + request.vhost = parsed_vhost->arg0(); + } + // TODO: FIXME: Maybe need a better name? // TODO: FIXME: When server enabled, but vhost disabled, should report error. SrsRtcSession* rtc_session = rtc_server->create_rtc_session(request, remote_sdp, local_sdp, eip); @@ -1615,14 +1623,23 @@ srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* SrsStatistic* stat = SrsStatistic::instance(); string target = r->query_get("target"); - srs_trace("query target=%s", target.c_str()); + string reset = r->query_get("reset"); + srs_trace("query target=%s, reset=%s, rtc_stat_enabled=%d", target.c_str(), reset.c_str(), + _srs_config->get_rtc_server_perf_stat()); if (true) { SrsJsonObject* p = SrsJsonAny::object(); data->set("query", p); p->set("target", SrsJsonAny::str(target.c_str())); - p->set("help", SrsJsonAny::str("?target=avframes|rtc|rtp|gso|writev_iovs|sendmmsg")); + p->set("reset", SrsJsonAny::str(reset.c_str())); + p->set("help", SrsJsonAny::str("?target=avframes|rtc|rtp|gso|writev_iovs|sendmmsg|bytes|dropped")); + p->set("help2", SrsJsonAny::str("?reset=all")); + } + + if (!reset.empty()) { + stat->reset_perf(); + return srs_api_response(w, r, obj->dumps()); } if (target.empty() || target == "avframes") { @@ -1679,6 +1696,24 @@ srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* } } + if (target.empty() || target == "bytes") { + SrsJsonObject* p = SrsJsonAny::object(); + data->set("bytes", p); + if ((err = stat->dumps_perf_bytes(p)) != srs_success) { + int code = srs_error_code(err); srs_error_reset(err); + return srs_api_response_code(w, r, code); + } + } + + if (target.empty() || target == "dropped") { + SrsJsonObject* p = SrsJsonAny::object(); + data->set("dropped", p); + if ((err = stat->dumps_perf_dropped(p)) != srs_success) { + int code = srs_error_code(err); srs_error_reset(err); + return srs_api_response_code(w, r, code); + } + } + return srs_api_response(w, r, obj->dumps()); } diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 95c1f03fd..95a67c21a 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -660,7 +660,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess if ((err = consumer->dump_packets(&msgs, count)) != srs_success) { return srs_error_wrap(err, "consumer dump packets"); } - + + // TODO: FIXME: Support merged-write wait. if (count <= 0) { // Directly use sleep, donot use consumer wait, because we couldn't awake consumer. srs_usleep(mw_sleep); diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index 8ef384dd9..cc20cc756 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -141,6 +141,11 @@ public: virtual srs_error_t fetch(mmsghdr** pphdr) = 0; // Notify the sender to send out the msg. virtual srs_error_t sendmmsg(mmsghdr* hdr) = 0; + // Whether sender exceed the max queue, that is, overflow. + virtual bool overflow() = 0; + // Set the queue extra ratio, for example, when mw_msgs > 0, we need larger queue. + // For r, 100 means x1, 200 means x2. + virtual void set_extra_ratio(int r) = 0; }; class SrsUdpMuxSocket diff --git a/trunk/src/app/srs_app_rtc.cpp b/trunk/src/app/srs_app_rtc.cpp index 7d2d44e82..95cfea597 100644 --- a/trunk/src/app/srs_app_rtc.cpp +++ b/trunk/src/app/srs_app_rtc.cpp @@ -195,15 +195,19 @@ srs_error_t SrsRtpOpusMuxer::transcode(SrsSharedPtrMessage* shared_audio, char* return err; } + int nn_max_extra_payload = 0; SrsSample samples[nn_opus_packets]; for (int i = 0; i < nn_opus_packets; i++) { SrsSample* p = samples + i; p->size = opus_sizes[i]; p->bytes = new char[p->size]; memcpy(p->bytes, opus_payloads[i], p->size); + + nn_max_extra_payload = srs_max(nn_max_extra_payload, p->size); } shared_audio->set_extra_payloads(samples, nn_opus_packets); + shared_audio->set_max_extra_payload(nn_max_extra_payload); return err; } diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index b3cd71922..ba8b48aad 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -60,6 +60,12 @@ using namespace std; #include #include +// The RTP payload max size, reserved some paddings for SRTP as such: +// kRtpPacketSize = kRtpMaxPayloadSize + paddings +// For example, if kRtpPacketSize is 1500, recommend to set kRtpMaxPayloadSize to 1400, +// which reserves 100 bytes for SRTP or paddings. +const int kRtpMaxPayloadSize = kRtpPacketSize - 200; + static bool is_stun(const uint8_t* data, const int size) { return data != NULL && size > 0 && (data[0] == 0 || data[0] == 1); @@ -454,24 +460,86 @@ srs_error_t SrsDtlsSession::unprotect_rtcp(char* out_buf, const char* in_buf, in return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtcp unprotect failed"); } -SrsRtcPackets::SrsRtcPackets(bool gso, bool merge_nalus) +SrsRtcPackets::SrsRtcPackets(int nn_cache_max) { +#if defined(SRS_DEBUG) + debug_id = 0; +#endif + + use_gso = false; + should_merge_nalus = false; + + nn_rtp_pkts = 0; + nn_audios = nn_extras = 0; + nn_videos = nn_samples = 0; + nn_bytes = nn_rtp_bytes = 0; + nn_padding_bytes = nn_paddings = 0; + nn_dropped = 0; + + cursor = 0; + nn_cache = nn_cache_max; + // TODO: FIXME: We should allocate a smaller cache, and increase it when exhausted. + cache = new SrsRtpPacket2[nn_cache]; +} + +SrsRtcPackets::~SrsRtcPackets() +{ + srs_freepa(cache); + nn_cache = 0; +} + +void SrsRtcPackets::reset(bool gso, bool merge_nalus) +{ + for (int i = 0; i < cursor; i++) { + SrsRtpPacket2* packet = cache + i; + packet->reset(); + } + +#if defined(SRS_DEBUG) + debug_id++; +#endif + use_gso = gso; should_merge_nalus = merge_nalus; nn_rtp_pkts = 0; nn_audios = nn_extras = 0; nn_videos = nn_samples = 0; + nn_bytes = nn_rtp_bytes = 0; + nn_padding_bytes = nn_paddings = 0; + nn_dropped = 0; + + cursor = 0; } -SrsRtcPackets::~SrsRtcPackets() +SrsRtpPacket2* SrsRtcPackets::fetch() { - vector::iterator it; - for (it = packets.begin(); it != packets.end(); ++it ) { - SrsRtpPacket2* packet = *it; - srs_freep(packet); + if (cursor >= nn_cache) { + return NULL; } - packets.clear(); + return cache + (cursor++); +} + +SrsRtpPacket2* SrsRtcPackets::back() +{ + srs_assert(cursor > 0); + return cache + cursor - 1; +} + +int SrsRtcPackets::size() +{ + return cursor; +} + +int SrsRtcPackets::capacity() +{ + return nn_cache; +} + +SrsRtpPacket2* SrsRtcPackets::at(int index) +{ + srs_assert(index < cursor); + return cache + index; } SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid) @@ -482,14 +550,21 @@ SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int rtc_session = s; sendonly_ukt = u->copy_sendonly(); + sender = u->sender(); + gso = false; merge_nalus = false; + max_padding = 0; audio_timestamp = 0; audio_sequence = 0; video_sequence = 0; + mw_sleep = 0; + mw_msgs = 0; + realtime = true; + _srs_config->subscribe(this); } @@ -513,33 +588,46 @@ srs_error_t SrsRtcSenderThread::initialize(const uint32_t& vssrc, const uint32_t gso = _srs_config->get_rtc_server_gso(); merge_nalus = _srs_config->get_rtc_server_merge_nalus(); - srs_trace("RTC sender video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), package(gso=%d, merge_nalus=%d)", - video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, merge_nalus); + max_padding = _srs_config->get_rtc_server_padding(); + srs_trace("RTC sender video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), package(gso=%d, merge_nalus=%d), padding=%d", + video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, merge_nalus, max_padding); return err; } srs_error_t SrsRtcSenderThread::on_reload_rtc_server() { - if (true) { - bool v = _srs_config->get_rtc_server_gso(); - if (gso != v) { - srs_trace("Reload gso %d=>%d", gso, v); - gso = v; - } - } + gso = _srs_config->get_rtc_server_gso(); + merge_nalus = _srs_config->get_rtc_server_merge_nalus(); + max_padding = _srs_config->get_rtc_server_padding(); - if (true) { - bool v = _srs_config->get_rtc_server_merge_nalus(); - if (merge_nalus != v) { - srs_trace("Reload merge_nalus %d=>%d", merge_nalus, v); - merge_nalus = v; - } - } + srs_trace("Reload rtc_server gso=%d, merge_nalus=%d, max_padding=%d", gso, merge_nalus, max_padding); return srs_success; } +srs_error_t SrsRtcSenderThread::on_reload_vhost_play(string vhost) +{ + SrsRequest* req = &rtc_session->request; + + if (req->vhost != vhost) { + return srs_success; + } + + realtime = _srs_config->get_realtime_enabled(req->vhost, true); + mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true); + mw_sleep = _srs_config->get_mw_sleep(req->vhost, true); + + srs_trace("Reload play realtime=%d, mw_msgs=%d, mw_sleep=%d", realtime, mw_msgs, mw_sleep); + + return srs_success; +} + +srs_error_t SrsRtcSenderThread::on_reload_vhost_realtime(string vhost) +{ + return on_reload_vhost_play(vhost); +} + int SrsRtcSenderThread::cid() { return trd->cid(); @@ -576,6 +664,7 @@ void SrsRtcSenderThread::update_sendonly_socket(SrsUdpMuxSocket* skt) srs_freep(sendonly_ukt); sendonly_ukt = skt->copy_sendonly(); + sender = skt->sender(); } srs_error_t SrsRtcSenderThread::cycle() @@ -583,33 +672,49 @@ srs_error_t SrsRtcSenderThread::cycle() srs_error_t err = srs_success; SrsSource* source = NULL; + SrsRequest* req = &rtc_session->request; // TODO: FIXME: Should refactor it, directly use http server as handler. ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); - if ((err = _srs_sources->fetch_or_create(&rtc_session->request, handler, &source)) != srs_success) { + if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) { return srs_error_wrap(err, "rtc fetch source failed"); } - srs_trace("source url=%s, source_id=[%d][%d], encrypt=%d", - rtc_session->request.get_stream_url().c_str(), ::getpid(), source->source_id(), rtc_session->encrypt); - SrsConsumer* consumer = NULL; SrsAutoFree(SrsConsumer, consumer); if ((err = source->create_consumer(NULL, consumer)) != srs_success) { - return srs_error_wrap(err, "rtc create consumer, source url=%s", rtc_session->request.get_stream_url().c_str()); + return srs_error_wrap(err, "rtc create consumer, source url=%s", req->get_stream_url().c_str()); } - // TODO: FIXME: Support reload. - SrsRequest* req = &rtc_session->request; - bool realtime = _srs_config->get_realtime_enabled(req->vhost, true); - srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost, true); + // For RTC, we enable pass-timestamp mode, ignore the timestamp in queue, never depends on the duration, + // because RTC allows the audio and video has its own timebase, that is the audio timestamp and video timestamp + // maybe not monotonically increase. + // In this mode, we use mw_msgs to set the delay. We never shrink the consumer queue, instead, we dumps the + // messages and drop them if the shared sender queue is full. + consumer->enable_pass_timestamp(); + + realtime = _srs_config->get_realtime_enabled(req->vhost, true); + mw_sleep = _srs_config->get_mw_sleep(req->vhost, true); + mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime, true); + + // We merged write more messages, so we need larger queue. + if (mw_msgs > 2) { + sender->set_extra_ratio(150); + } else if (mw_msgs > 0) { + sender->set_extra_ratio(80); + } + + srs_trace("RTC source url=%s, source_id=[%d][%d], encrypt=%d, realtime=%d, mw_sleep=%dms, mw_msgs=%d", req->get_stream_url().c_str(), + ::getpid(), source->source_id(), rtc_session->encrypt, realtime, srsu2msi(mw_sleep), mw_msgs); SrsMessageArray msgs(SRS_PERF_MW_MSGS); + SrsRtcPackets pkts(SRS_PERF_RTC_RTP_PACKETS); SrsPithyPrint* pprint = SrsPithyPrint::create_rtc_play(); SrsAutoFree(SrsPithyPrint, pprint); srs_trace("rtc session=%s, start play", rtc_session->id().c_str()); + bool stat_enabled = _srs_config->get_rtc_server_perf_stat(); SrsStatistic* stat = SrsStatistic::instance(); while (true) { @@ -618,68 +723,77 @@ srs_error_t SrsRtcSenderThread::cycle() } #ifdef SRS_PERF_QUEUE_COND_WAIT - if (realtime) { - // for realtime, min required msgs is 0, send when got one+ msgs. - consumer->wait(SRS_PERF_MW_MIN_MSGS_FOR_RTC_REALTIME, mw_sleep); - } else { - // for no-realtime, got some msgs then send. - consumer->wait(SRS_PERF_MW_MIN_MSGS_FOR_RTC, mw_sleep); - } + // Wait for amount of messages or a duration. + consumer->wait(mw_msgs, mw_sleep); #endif + // Try to read some messages. int msg_count = 0; if ((err = consumer->dump_packets(&msgs, msg_count)) != srs_success) { continue; } - if (msg_count <= 0) { + if (msg_count <= 0) { #ifndef SRS_PERF_QUEUE_COND_WAIT srs_usleep(mw_sleep); #endif - // ignore when nothing got. continue; } - SrsRtcPackets pkts(gso, merge_nalus); - if ((err = send_messages(sendonly_ukt, source, msgs.msgs, msg_count, pkts)) != srs_success) { + // Transmux and send out messages. + pkts.reset(gso, merge_nalus); + + if ((err = send_messages(source, msgs.msgs, msg_count, pkts)) != srs_success) { srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err); } + // Do cleanup messages. for (int i = 0; i < msg_count; i++) { SrsSharedPtrMessage* msg = msgs.msgs[i]; srs_freep(msg); } + // Stat for performance analysis. + if (!stat_enabled) { + continue; + } + // Stat the original RAW AV frame, maybe h264+aac. stat->perf_on_msgs(msg_count); // Stat the RTC packets, RAW AV frame, maybe h.264+opus. int nn_rtc_packets = srs_max(pkts.nn_audios, pkts.nn_extras) + pkts.nn_videos; stat->perf_on_rtc_packets(nn_rtc_packets); // Stat the RAW RTP packets, which maybe group by GSO. - stat->perf_on_rtp_packets(pkts.packets.size()); + stat->perf_on_rtp_packets(pkts.size()); // Stat the RTP packets going into kernel. stat->perf_on_gso_packets(pkts.nn_rtp_pkts); + // Stat the bytes and paddings. + stat->perf_on_rtc_bytes(pkts.nn_bytes, pkts.nn_rtp_bytes, pkts.nn_padding_bytes); + // Stat the messages and dropped count. + stat->perf_on_dropped(msg_count, nn_rtc_packets, pkts.nn_dropped); + #if defined(SRS_DEBUG) - srs_trace("RTC PLAY packets, msgs %d/%d, rtp %d, gso %d, %d audios, %d extras, %d videos, %d samples, %d bytes", - msg_count, nn_rtc_packets, pkts.packets.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, - pkts.nn_samples, pkts.nn_bytes); + srs_trace("RTC PLAY perf, msgs %d/%d, rtp %d, gso %d, %d audios, %d extras, %d videos, %d samples, %d/%d/%d bytes", + msg_count, nn_rtc_packets, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, + pkts.nn_samples, pkts.nn_bytes, pkts.nn_rtp_bytes, pkts.nn_padding_bytes); #endif pprint->elapse(); if (pprint->can_print()) { // TODO: FIXME: Print stat like frame/s, packet/s, loss_packets. - srs_trace("-> RTC PLAY %d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d bytes", - msg_count, pkts.packets.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, - pkts.nn_samples, pkts.nn_bytes); + srs_trace("-> RTC PLAY %d/%d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d/%d/%d bytes, %d pad, %d/%d cache", + msg_count, pkts.nn_dropped, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, pkts.nn_samples, pkts.nn_bytes, + pkts.nn_rtp_bytes, pkts.nn_padding_bytes, pkts.nn_paddings, pkts.size(), pkts.capacity()); } } } srs_error_t SrsRtcSenderThread::send_messages( - SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets + SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets ) { srs_error_t err = srs_success; + // If DTLS is not OK, drop all messages. if (!rtc_session->dtls_session) { return err; } @@ -692,7 +806,7 @@ srs_error_t SrsRtcSenderThread::send_messages( #ifndef SRS_AUTO_OSX // If enabled GSO, send out some packets in a msghdr. if (packets.use_gso) { - if ((err = send_packets_gso(skt, packets)) != srs_success) { + if ((err = send_packets_gso(packets)) != srs_success) { return srs_error_wrap(err, "gso send"); } return err; @@ -700,7 +814,7 @@ srs_error_t SrsRtcSenderThread::send_messages( #endif // By default, we send packets by sendmmsg. - if ((err = send_packets(skt, packets)) != srs_success) { + if ((err = send_packets(packets)) != srs_success) { return srs_error_wrap(err, "raw send"); } @@ -715,6 +829,12 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( for (int i = 0; i < nb_msgs; i++) { SrsSharedPtrMessage* msg = msgs[i]; + // If overflow, drop all messages. + if (sender->overflow()) { + packets.nn_dropped += nb_msgs - i; + return err; + } + // Update stats. packets.nn_bytes += msg->size; @@ -725,16 +845,14 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( packets.nn_samples += nn_samples; // For audio, we transcoded AAC to opus in extra payloads. - SrsRtpPacket2* packet = NULL; if (msg->is_audio()) { packets.nn_audios++; for (int i = 0; i < nn_extra_payloads; i++) { SrsSample* sample = msg->extra_payloads() + i; - if ((err = packet_opus(sample, &packet)) != srs_success) { + if ((err = packet_opus(sample, packets, msg->nn_max_extra_payloads())) != srs_success) { return srs_error_wrap(err, "opus package"); } - packets.packets.push_back(packet); } continue; } @@ -744,10 +862,9 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( // Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A. if (msg->has_idr()) { - if ((err = packet_stap_a(source, msg, &packet)) != srs_success) { + if ((err = packet_stap_a(source, msg, packets)) != srs_success) { return srs_error_wrap(err, "packet stap-a"); } - packets.packets.push_back(packet); } // If merge Nalus, we pcakges all NALUs(samples) as one NALU, in a RTP or FUA packet. @@ -768,12 +885,10 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( continue; } - const int kRtpMaxPayloadSize = 1200; if (sample->size <= kRtpMaxPayloadSize) { - if ((err = packet_single_nalu(msg, sample, &packet)) != srs_success) { + if ((err = packet_single_nalu(msg, sample, packets)) != srs_success) { return srs_error_wrap(err, "packet single nalu"); } - packets.packets.push_back(packet); } else { if ((err = packet_fu_a(msg, sample, kRtpMaxPayloadSize, packets)) != srs_success) { return srs_error_wrap(err, "packet fu-a"); @@ -781,7 +896,7 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( } if (i == nn_samples - 1) { - packets.packets.back()->rtp_header.set_marker(true); + packets.back()->rtp_header.set_marker(true); } } } @@ -789,15 +904,16 @@ srs_error_t SrsRtcSenderThread::messages_to_packets( return err; } -srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets) +srs_error_t SrsRtcSenderThread::send_packets(SrsRtcPackets& packets) { srs_error_t err = srs_success; - ISrsUdpSender* sender = skt->sender(); + // Cache the encrypt flag. + bool encrypt = rtc_session->encrypt; - vector::iterator it; - for (it = packets.packets.begin(); it != packets.packets.end(); ++it) { - SrsRtpPacket2* packet = *it; + int nn_packets = packets.size(); + for (int i = 0; i < nn_packets; i++) { + SrsRtpPacket2* packet = packets.at(i); // Fetch a cached message from queue. // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. @@ -805,33 +921,43 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets if ((err = sender->fetch(&mhdr)) != srs_success) { return srs_error_wrap(err, "fetch msghdr"); } - char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base; - int length = kRtpPacketSize; - // Marshal packet to bytes. + // For this message, select the first iovec. + iovec* iov = mhdr->msg_hdr.msg_iov; + mhdr->msg_hdr.msg_iovlen = 1; + + if (!iov->iov_base) { + iov->iov_base = new char[kRtpPacketSize]; + } + iov->iov_len = kRtpPacketSize; + + // Marshal packet to bytes in iovec. if (true) { - SrsBuffer stream(buf, length); + SrsBuffer stream((char*)iov->iov_base, iov->iov_len); if ((err = packet->encode(&stream)) != srs_success) { return srs_error_wrap(err, "encode packet"); } - length = stream.pos(); + iov->iov_len = stream.pos(); } // Whether encrypt the RTP bytes. - if (rtc_session->encrypt) { - if ((err = rtc_session->dtls_session->protect_rtp2(buf, &length)) != srs_success) { + if (encrypt) { + int nn_encrypt = (int)iov->iov_len; + if ((err = rtc_session->dtls_session->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) { return srs_error_wrap(err, "srtp protect"); } + iov->iov_len = (size_t)nn_encrypt; } - sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); - socklen_t addrlen = (socklen_t)skt->peer_addrlen(); + packets.nn_rtp_bytes += (int)iov->iov_len; + + // Set the address and control information. + sockaddr_in* addr = (sockaddr_in*)sendonly_ukt->peer_addr(); + socklen_t addrlen = (socklen_t)sendonly_ukt->peer_addrlen(); mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; - mhdr->msg_hdr.msg_iov->iov_len = length; mhdr->msg_hdr.msg_controllen = 0; - mhdr->msg_len = 0; // When we send out a packet, we commit a RTP packet. packets.nn_rtp_pkts++; @@ -845,67 +971,86 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets } // TODO: FIXME: We can gather and pad audios, because they have similar size. -srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPackets& packets) +srs_error_t SrsRtcSenderThread::send_packets_gso(SrsRtcPackets& packets) { srs_error_t err = srs_success; + // Cache the encrypt flag. + bool encrypt = rtc_session->encrypt; + // Previous handler, if has the same size, we can use GSO. mmsghdr* gso_mhdr = NULL; int gso_size = 0; int gso_encrypt = 0; int gso_cursor = 0; // GSO, N packets has same length, the final one may not. - bool use_gso = false; bool gso_final = false; + bool using_gso = false; bool gso_final = false; + // The message will marshal in iovec. + iovec* iov = NULL; - ISrsUdpSender* sender = skt->sender(); - int nn_packets = (int)packets.packets.size(); + int nn_packets = packets.size(); for (int i = 0; i < nn_packets; i++) { - SrsRtpPacket2* packet = packets.packets[i]; + SrsRtpPacket2* packet = packets.at(i); + int nn_packet = packet->nb_bytes(); + int padding = 0; - // The handler to send message. - mmsghdr* mhdr = NULL; + SrsRtpPacket2* next_packet = NULL; + int nn_next_packet = 0; + if (max_padding > 0) { + if (i < nn_packets - 1) { + next_packet = (i < nn_packets - 1)? packets.at(i + 1):NULL; + nn_next_packet = next_packet? next_packet->nb_bytes() : 0; + } + + // Padding the packet to next or GSO size. + if (next_packet) { + if (!using_gso) { + // Padding to the next packet to merge with it. + if (nn_next_packet > nn_packet) { + padding = nn_next_packet - nn_packet; + } + } else { + // Padding to GSO size for next one to merge with us. + if (nn_next_packet < gso_size) { + padding = gso_size - nn_packet; + } + } + + // Reset padding if exceed max. + if (padding > max_padding) { + padding = 0; + } + + if (padding > 0) { +#if defined(SRS_DEBUG) + srs_trace("#%d, Padding %d bytes %d=>%d, packets %d, max_padding %d", packets.debug_id, + padding, nn_packet, nn_packet + padding, nn_packets, max_padding); +#endif + packet->add_padding(padding); + nn_packet += padding; + packets.nn_paddings++; + packets.nn_padding_bytes += padding; + } + } + } // Check whether we can use GSO to send it. - int nn_packet = packet->nb_bytes(); - - if ((gso_size && gso_size == nn_packet) || (use_gso && !gso_final)) { - use_gso = true; - gso_final = (gso_size && gso_size != nn_packet); - mhdr = gso_mhdr; - - // We need to increase the iov and cursor. - int nb_iovs = mhdr->msg_hdr.msg_iovlen; - if (gso_cursor >= nb_iovs - 1) { - int nn_new_iovs = nb_iovs; - mhdr->msg_hdr.msg_iovlen = nb_iovs + nn_new_iovs; - mhdr->msg_hdr.msg_iov = (iovec*)realloc(mhdr->msg_hdr.msg_iov, sizeof(iovec) * (nb_iovs + nn_new_iovs)); - memset(mhdr->msg_hdr.msg_iov + nb_iovs, 0, sizeof(iovec) * nn_new_iovs); - } - gso_cursor++; - - // Create payload cache for RTP packet. - iovec* p = mhdr->msg_hdr.msg_iov + gso_cursor; - if (!p->iov_base) { - p->iov_base = new char[kRtpPacketSize]; - p->iov_len = kRtpPacketSize; - } + if (using_gso && !gso_final) { + gso_final = (gso_size != nn_packet); } - // Change the state according to the next packet. - if (i < nn_packets - 1) { - SrsRtpPacket2* next_packet = (i < nn_packets - 1)? packets.packets[i + 1]:NULL; - int nn_next_packet = next_packet? next_packet->nb_bytes() : 0; - - // If GSO, but next is bigger than this one, we must enter the final state. - if (use_gso && !gso_final) { - gso_final = (nn_packet < nn_next_packet); - } - + if (next_packet) { // If not GSO, maybe the first fresh packet, we should see whether the next packet is smaller than this one, // if smaller, we can still enter GSO. - if (!use_gso) { - use_gso = (nn_packet >= nn_next_packet); + if (!using_gso) { + using_gso = (nn_packet >= nn_next_packet); + } + + // If GSO, but next is bigger than this one, we must enter the final state. + if (using_gso && !gso_final) { + gso_final = (nn_packet < nn_next_packet); } } - // Now, we fetch the msg from cache. + // For GSO, reuse mhdr if possible. + mmsghdr* mhdr = gso_mhdr; if (!mhdr) { // Fetch a cached message from queue. // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. @@ -913,23 +1058,26 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPac return srs_error_wrap(err, "fetch msghdr"); } - // Reset the iovec, we should never change the msg_iovlen. - for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) { - iovec* p = mhdr->msg_hdr.msg_iov + j; - p->iov_len = 0; - } - // Now, GSO will use this message and size. - if (use_gso) { - gso_mhdr = mhdr; - gso_size = nn_packet; - } + gso_mhdr = mhdr; + gso_size = nn_packet; } - // Marshal packet to bytes. - iovec* iov = mhdr->msg_hdr.msg_iov + gso_cursor; + // For this message, select a new iovec. + if (!iov) { + iov = mhdr->msg_hdr.msg_iov; + } else { + iov++; + } + gso_cursor++; + mhdr->msg_hdr.msg_iovlen = gso_cursor; + + if (gso_cursor > SRS_PERF_RTC_GSO_IOVS && !iov->iov_base) { + iov->iov_base = new char[kRtpPacketSize]; + } iov->iov_len = kRtpPacketSize; + // Marshal packet to bytes in iovec. if (true) { SrsBuffer stream((char*)iov->iov_base, iov->iov_len); if ((err = packet->encode(&stream)) != srs_success) { @@ -939,7 +1087,7 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPac } // Whether encrypt the RTP bytes. - if (rtc_session->encrypt) { + if (encrypt) { int nn_encrypt = (int)iov->iov_len; if ((err = rtc_session->dtls_session->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) { return srs_error_wrap(err, "srtp protect"); @@ -947,50 +1095,49 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPac iov->iov_len = (size_t)nn_encrypt; } + packets.nn_rtp_bytes += (int)iov->iov_len; + // If GSO, they must has same size, except the final one. - if (use_gso && !gso_final && gso_encrypt && gso_encrypt != (int)iov->iov_len) { + if (using_gso && !gso_final && gso_encrypt && gso_encrypt != (int)iov->iov_len) { return srs_error_new(ERROR_RTC_RTP_MUXER, "GSO size=%d/%d, encrypt=%d/%d", gso_size, nn_packet, gso_encrypt, iov->iov_len); } - if (use_gso && !gso_final) { + if (using_gso && !gso_final) { gso_encrypt = iov->iov_len; } // If exceed the max GSO size, set to final. - if (use_gso && gso_cursor > 64) { + if (using_gso && gso_cursor + 1 >= SRS_PERF_RTC_GSO_MAX) { gso_final = true; } // For last message, or final gso, or determined not using GSO, send it now. - bool do_send = (i == nn_packets - 1 || gso_final || !use_gso); + bool do_send = (i == nn_packets - 1 || gso_final || !using_gso); #if defined(SRS_DEBUG) bool is_video = packet->rtp_header.get_payload_type() == video_payload_type; - srs_trace("Packet %s SSRC=%d, SN=%d, %d bytes", is_video? "Video":"Audio", packet->rtp_header.get_ssrc(), - packet->rtp_header.get_sequence(), nn_packet); + srs_trace("#%d, Packet %s SSRC=%d, SN=%d, %d/%d bytes", packets.debug_id, is_video? "Video":"Audio", + packet->rtp_header.get_ssrc(), packet->rtp_header.get_sequence(), nn_packet - padding, padding); if (do_send) { for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) { iovec* iov = mhdr->msg_hdr.msg_iov + j; - if (iov->iov_len <= 0) { - break; - } - srs_trace("%s #%d/%d/%d, %d bytes, size %d/%d", (use_gso? "GSO":"RAW"), j, gso_cursor + 1, - mhdr->msg_hdr.msg_iovlen, iov->iov_len, gso_size, gso_encrypt); + srs_trace("#%d, %s #%d/%d/%d, %d/%d bytes, size %d/%d", packets.debug_id, (using_gso? "GSO":"RAW"), j, + gso_cursor + 1, mhdr->msg_hdr.msg_iovlen, iov->iov_len, padding, gso_size, gso_encrypt); } } #endif if (do_send) { - sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); - socklen_t addrlen = (socklen_t)skt->peer_addrlen(); + // Set the address and control information. + sockaddr_in* addr = (sockaddr_in*)sendonly_ukt->peer_addr(); + socklen_t addrlen = (socklen_t)sendonly_ukt->peer_addrlen(); mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; mhdr->msg_hdr.msg_controllen = 0; - mhdr->msg_len = 0; #ifndef SRS_AUTO_OSX - if (use_gso) { + if (using_gso) { mhdr->msg_hdr.msg_controllen = CMSG_SPACE(sizeof(uint16_t)); if (!mhdr->msg_hdr.msg_control) { mhdr->msg_hdr.msg_control = new char[mhdr->msg_hdr.msg_controllen]; @@ -1001,9 +1148,6 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPac cm->cmsg_type = UDP_SEGMENT; cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); *((uint16_t*)CMSG_DATA(cm)) = gso_encrypt; - - // Private message, use it to store the cursor. - mhdr->msg_len = gso_cursor + 1; } #endif @@ -1016,13 +1160,14 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPac // Reset the GSO flag. gso_mhdr = NULL; gso_size = 0; gso_encrypt = 0; gso_cursor = 0; - use_gso = gso_final = false; + using_gso = gso_final = false; iov = NULL; } } #if defined(SRS_DEBUG) - srs_trace("RTC PLAY summary, rtp %d/%d, videos %d/%d, audios %d/%d", packets.packets.size(), - packets.nn_rtp_pkts, packets.nn_videos, packets.nn_samples, packets.nn_audios, packets.nn_extras); + srs_trace("#%d, RTC PLAY summary, rtp %d/%d, videos %d/%d, audios %d/%d, pad %d/%d/%d", packets.debug_id, packets.size(), + packets.nn_rtp_pkts, packets.nn_videos, packets.nn_samples, packets.nn_audios, packets.nn_extras, packets.nn_paddings, + packets.nn_padding_bytes, packets.nn_rtp_bytes); #endif return err; @@ -1053,17 +1198,23 @@ srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPac return err; } - const int kRtpMaxPayloadSize = 1200; if (nn_bytes < kRtpMaxPayloadSize) { // Package NALUs in a single RTP packet. - SrsRtpPacket2* packet = new SrsRtpPacket2(); + SrsRtpPacket2* packet = packets.fetch(); + if (!packet) { + srs_freep(raw); + return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); + } + packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_ssrc(video_ssrc); packet->rtp_header.set_payload_type(video_payload_type); + packet->payload = raw; - packets.packets.push_back(packet); } else { + // We must free it, should never use RTP packets to free it, + // because more than one RTP packet will refer to it. SrsAutoFree(SrsRtpRawNALUs, raw); // Package NALUs in FU-A RTP packets. @@ -1078,8 +1229,11 @@ srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPac for (int i = 0; i < num_of_packet; ++i) { int packet_size = srs_min(nb_left, fu_payload_size); - SrsRtpPacket2* packet = new SrsRtpPacket2(); - packets.packets.push_back(packet); + SrsRtpPacket2* packet = packets.fetch(); + if (!packet) { + srs_freep(raw); + return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); + } packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); @@ -1102,34 +1256,46 @@ srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPac } } - if (!packets.packets.empty()) { - packets.packets.back()->rtp_header.set_marker(true); + if (packets.size() > 0) { + packets.back()->rtp_header.set_marker(true); } return err; } -srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket) +srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtcPackets& packets, int nn_max_payload) { srs_error_t err = srs_success; - SrsRtpPacket2* packet = new SrsRtpPacket2(); + SrsRtpPacket2* packet = packets.fetch(); + if (!packet) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); + } packet->rtp_header.set_marker(true); packet->rtp_header.set_timestamp(audio_timestamp); packet->rtp_header.set_sequence(audio_sequence++); packet->rtp_header.set_ssrc(audio_ssrc); packet->rtp_header.set_payload_type(audio_payload_type); - SrsRtpRawPayload* raw = new SrsRtpRawPayload(); + SrsRtpRawPayload* raw = packet->reuse_raw(); raw->payload = sample->bytes; raw->nn_payload = sample->size; - packet->payload = raw; + + if (max_padding > 0) { + if (sample->size < nn_max_payload && nn_max_payload - sample->size < max_padding) { + int padding = nn_max_payload - sample->size; + packet->set_padding(padding); + +#if defined(SRS_DEBUG) + srs_trace("#%d, Fast Padding %d bytes %d=>%d, SN=%d, max_payload %d, max_padding %d", packets.debug_id, + padding, sample->size, sample->size + padding, packet->rtp_header.get_sequence(), nn_max_payload, max_padding); +#endif + } + } // TODO: FIXME: Why 960? Need Refactoring? audio_timestamp += 960; - *ppacket = packet; - return err; } @@ -1146,26 +1312,25 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* for (int i = 0; i < num_of_packet; ++i) { int packet_size = srs_min(nb_left, fu_payload_size); - SrsRtpPacket2* packet = new SrsRtpPacket2(); - packets.packets.push_back(packet); + SrsRtpPacket2* packet = packets.fetch(); + if (!packet) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); + } packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_ssrc(video_ssrc); packet->rtp_header.set_payload_type(video_payload_type); - SrsRtpFUAPayload* fua = new SrsRtpFUAPayload(); - packet->payload = fua; + SrsRtpFUAPayload2* fua = packet->reuse_fua(); fua->nri = (SrsAvcNaluType)header; fua->nalu_type = (SrsAvcNaluType)nal_type; fua->start = bool(i == 0); fua->end = bool(i == num_of_packet - 1); - SrsSample* fragment_sample = new SrsSample(); - fragment_sample->bytes = p; - fragment_sample->size = packet_size; - fua->nalus.push_back(fragment_sample); + fua->payload = p; + fua->size = packet_size; p += packet_size; nb_left -= packet_size; @@ -1175,30 +1340,27 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* } // Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6 -srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket) +srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcPackets& packets) { srs_error_t err = srs_success; - SrsRtpPacket2* packet = new SrsRtpPacket2(); + SrsRtpPacket2* packet = packets.fetch(); + if (!packet) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); + } packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_ssrc(video_ssrc); packet->rtp_header.set_payload_type(video_payload_type); - SrsRtpRawNALUs* raw = new SrsRtpRawNALUs(); - packet->payload = raw; - - SrsSample* p = new SrsSample(); - p->bytes = sample->bytes; - p->size = sample->size; - raw->push_back(p); - - *ppacket = packet; + SrsRtpRawPayload* raw = packet->reuse_raw(); + raw->payload = sample->bytes; + raw->nn_payload = sample->size; return err; } -srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket) +srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets) { srs_error_t err = srs_success; @@ -1218,7 +1380,10 @@ srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMes return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty"); } - SrsRtpPacket2* packet = new SrsRtpPacket2(); + SrsRtpPacket2* packet = packets.fetch(); + if (!packet) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "cache empty"); + } packet->rtp_header.set_marker(false); packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); @@ -1245,8 +1410,6 @@ srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMes stap->nalus.push_back(sample); } - *ppacket = packet; - return err; } @@ -1719,6 +1882,12 @@ SrsUdpMuxSender::SrsUdpMuxSender(SrsRtcServer* s) trd = new SrsDummyCoroutine(); cache_pos = 0; + max_sendmmsg = 0; + queue_length = 0; + extra_ratio = 0; + extra_queue = 0; + gso = false; + nn_senders = 0; _srs_config->subscribe(this); } @@ -1737,7 +1906,7 @@ SrsUdpMuxSender::~SrsUdpMuxSender() cache.clear(); } -srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd) +srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd, int senders) { srs_error_t err = srs_success; @@ -1750,27 +1919,39 @@ srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd) } max_sendmmsg = _srs_config->get_rtc_server_sendmmsg(); - bool gso = _srs_config->get_rtc_server_gso(); - srs_trace("UDP sender #%d init ok, max_sendmmsg=%d, gso=%d", srs_netfd_fileno(fd), max_sendmmsg, gso); + gso = _srs_config->get_rtc_server_gso(); + queue_length = srs_max(128, _srs_config->get_rtc_server_queue_length()); + nn_senders = senders; + + // For no GSO, we need larger queue. + if (!gso) { + queue_length *= 2; + } + + srs_trace("RTC sender #%d init ok, max_sendmmsg=%d, gso=%d, queue_max=%dx%d, extra_ratio=%d/%d", srs_netfd_fileno(fd), + max_sendmmsg, gso, queue_length, nn_senders, extra_ratio, extra_queue); return err; } void SrsUdpMuxSender::free_mhdrs(std::vector& mhdrs) { - for (int i = 0; i < (int)mhdrs.size(); i++) { + int nn_mhdrs = (int)mhdrs.size(); + for (int i = 0; i < nn_mhdrs; i++) { + // @see https://linux.die.net/man/2/sendmmsg + // @see https://linux.die.net/man/2/sendmsg mmsghdr* hdr = &mhdrs[i]; // Free control for GSO. char* msg_control = (char*)hdr->msg_hdr.msg_control; - srs_freep(msg_control); + srs_freepa(msg_control); // Free iovec. - for (int j = (int)hdr->msg_hdr.msg_iovlen - 1; j >= 0 ; j--) { + for (int j = SRS_PERF_RTC_GSO_MAX - 1; j >= 0 ; j--) { iovec* iov = hdr->msg_hdr.msg_iov + j; char* data = (char*)iov->iov_base; - srs_freep(data); - srs_freep(iov); + srs_freepa(data); + srs_freepa(iov); } } mhdrs.clear(); @@ -1780,14 +1961,22 @@ srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr) { // TODO: FIXME: Maybe need to shrink? if (cache_pos >= (int)cache.size()) { + // @see https://linux.die.net/man/2/sendmmsg + // @see https://linux.die.net/man/2/sendmsg mmsghdr mhdr; - memset(&mhdr, 0, sizeof(mmsghdr)); - mhdr.msg_hdr.msg_iovlen = 1; - mhdr.msg_hdr.msg_iov = new iovec(); - mhdr.msg_hdr.msg_iov->iov_base = new char[kRtpPacketSize]; - mhdr.msg_hdr.msg_iov->iov_len = kRtpPacketSize; mhdr.msg_len = 0; + mhdr.msg_hdr.msg_flags = 0; + mhdr.msg_hdr.msg_control = NULL; + + mhdr.msg_hdr.msg_iovlen = SRS_PERF_RTC_GSO_MAX; + mhdr.msg_hdr.msg_iov = new iovec[mhdr.msg_hdr.msg_iovlen]; + memset((void*)mhdr.msg_hdr.msg_iov, 0, sizeof(iovec) * mhdr.msg_hdr.msg_iovlen); + + for (int i = 0; i < SRS_PERF_RTC_GSO_IOVS; i++) { + iovec* p = mhdr.msg_hdr.msg_iov + i; + p->iov_base = new char[kRtpPacketSize]; + } cache.push_back(mhdr); } @@ -1796,6 +1985,25 @@ srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr) return srs_success; } +bool SrsUdpMuxSender::overflow() +{ + return cache_pos > queue_length + extra_queue; +} + +void SrsUdpMuxSender::set_extra_ratio(int r) +{ + // We use the larger extra ratio, because all vhosts shares the senders. + if (extra_ratio > r) { + return; + } + + extra_ratio = r; + extra_queue = queue_length * r / 100; + + srs_trace("RTC sender #%d extra queue, max_sendmmsg=%d, gso=%d, queue_max=%dx%d, extra_ratio=%d/%d, cache=%d/%d/%d", srs_netfd_fileno(lfd), + max_sendmmsg, gso, queue_length, nn_senders, extra_ratio, extra_queue, cache_pos, (int)cache.size(), (int)hotspot.size()); +} + srs_error_t SrsUdpMuxSender::sendmmsg(mmsghdr* hdr) { if (waiting_msgs) { @@ -1811,9 +2019,12 @@ srs_error_t SrsUdpMuxSender::cycle() srs_error_t err = srs_success; uint64_t nn_msgs = 0; uint64_t nn_msgs_last = 0; int nn_msgs_max = 0; + uint64_t nn_bytes = 0; int nn_bytes_max = 0; uint64_t nn_gso_msgs = 0; uint64_t nn_gso_iovs = 0; int nn_gso_msgs_max = 0; int nn_gso_iovs_max = 0; int nn_loop = 0; int nn_wait = 0; srs_utime_t time_last = srs_get_system_time(); + + bool stat_enabled = _srs_config->get_rtc_server_perf_stat(); SrsStatistic* stat = SrsStatistic::instance(); SrsPithyPrint* pprint = SrsPithyPrint::create_rtc_send(srs_netfd_fileno(lfd)); @@ -1827,9 +2038,8 @@ srs_error_t SrsUdpMuxSender::cycle() nn_loop++; int pos = cache_pos; - int gso_pos = 0; int gso_iovs = 0; - if (pos <= 0 && gso_pos == 0) { + if (pos <= 0) { waiting_msgs = true; nn_wait++; srs_cond_wait(cond); @@ -1840,26 +2050,12 @@ srs_error_t SrsUdpMuxSender::cycle() cache.swap(hotspot); cache_pos = 0; - // Collect informations for GSO - if (pos > 0) { - // For shared GSO cache, stat the messages. - mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos; - for (p = &hotspot[0]; p < end; p++) { - if (!p->msg_len) { - continue; - } - - // Private message, use it to store the cursor. - int real_iovs = p->msg_len; - p->msg_len = 0; - - gso_pos++; nn_gso_msgs++; nn_gso_iovs += real_iovs; gso_iovs += real_iovs; - } - } - - // Send out all messages, may GSO if shared cache. + int gso_pos = 0; + int nn_writen = 0; if (pos > 0) { // Send out all messages. + // @see https://linux.die.net/man/2/sendmmsg + // @see https://linux.die.net/man/2/sendmsg mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos; for (p = &hotspot[0]; p < end; p += max_sendmmsg) { int vlen = (int)(end - p); @@ -1870,13 +2066,37 @@ srs_error_t SrsUdpMuxSender::cycle() srs_warn("sendmmsg %d msgs, %d done", vlen, r0); } - stat->perf_sendmmsg_on_packets(vlen); + if (stat_enabled) { + stat->perf_on_sendmmsg_packets(vlen); + } + } + + // Collect informations for GSO. + if (stat_enabled) { + // Stat the messages, iovs and bytes. + // @see https://linux.die.net/man/2/sendmmsg + // @see https://linux.die.net/man/2/sendmsg + for (int i = 0; i < pos; i++) { + mmsghdr* mhdr = &hotspot[i]; + + nn_writen += (int)mhdr->msg_len; + + int real_iovs = mhdr->msg_hdr.msg_iovlen; + gso_pos++; nn_gso_msgs++; nn_gso_iovs += real_iovs; + gso_iovs += real_iovs; + } } } + if (!stat_enabled) { + continue; + } + // Increase total messages. - nn_msgs += pos; + nn_msgs += pos + gso_iovs; nn_msgs_max = srs_max(pos, nn_msgs_max); + nn_bytes += nn_writen; + nn_bytes_max = srs_max(nn_bytes_max, nn_writen); nn_gso_msgs_max = srs_max(gso_pos, nn_gso_msgs_max); nn_gso_iovs_max = srs_max(gso_iovs, nn_gso_iovs_max); @@ -1900,12 +2120,20 @@ srs_error_t SrsUdpMuxSender::cycle() pps_unit = "(k)"; pps_last /= 1000; pps_average /= 1000; } - srs_trace("-> RTC SEND #%d, sessions %d, udp %d/%d/%" PRId64 ", gso %d/%d/%" PRId64 ", iovs %d/%d/%" PRId64 ", pps %d/%d%s", - srs_netfd_fileno(lfd), (int)server->nn_sessions(), pos, nn_msgs_max, nn_msgs, gso_pos, nn_gso_msgs_max, nn_gso_msgs, gso_iovs, nn_gso_iovs_max, nn_gso_iovs, - pps_average, pps_last, pps_unit.c_str()); + int nn_cache = 0; + int nn_hotspot_size = (int)hotspot.size(); + for (int i = 0; i < nn_hotspot_size; i++) { + mmsghdr* hdr = &hotspot[i]; + nn_cache += hdr->msg_hdr.msg_iovlen; + } + + srs_trace("-> RTC SEND #%d, sessions %d, udp %d/%d/%" PRId64 ", gso %d/%d/%" PRId64 ", iovs %d/%d/%" PRId64 ", pps %d/%d%s, cache %d/%d, bytes %d/%" PRId64, + srs_netfd_fileno(lfd), (int)server->nn_sessions(), pos, nn_msgs_max, nn_msgs, gso_pos, nn_gso_msgs_max, nn_gso_msgs, gso_iovs, + nn_gso_iovs_max, nn_gso_iovs, pps_average, pps_last, pps_unit.c_str(), (int)hotspot.size(), nn_cache, nn_bytes_max, nn_bytes); nn_msgs_last = nn_msgs; time_last = srs_get_system_time(); nn_loop = nn_wait = nn_msgs_max = 0; nn_gso_msgs_max = 0; nn_gso_iovs_max = 0; + nn_bytes_max = 0; } } @@ -1994,7 +2222,7 @@ srs_error_t SrsRtcServer::listen_udp() return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); } - if ((err = sender->initialize(listener->stfd())) != srs_success) { + if ((err = sender->initialize(listener->stfd(), nn_listeners)) != srs_success) { return srs_error_wrap(err, "init sender"); } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 2ffe75441..c3796c711 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -50,6 +50,7 @@ class SrsRtcSession; class SrsSharedPtrMessage; class SrsSource; class SrsRtpPacket2; +class ISrsUdpSender; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -124,8 +125,18 @@ public: bool use_gso; bool should_merge_nalus; public: - // The total bytes of RTP packets. +#if defined(SRS_DEBUG) + // Debug id. + uint32_t debug_id; +#endif +public: + // The total bytes of AVFrame packets. int nn_bytes; + // The total bytes of RTP packets. + int nn_rtp_bytes; + // The total padded bytes. + int nn_padding_bytes; +public: // The RTP packets send out by sendmmsg or sendmsg. Note that if many packets group to // one msghdr by GSO, it's only one RTP packet, because we only send once. int nn_rtp_pkts; @@ -138,11 +149,24 @@ public: int nn_audios; // The original video messages. int nn_videos; + // The number of padded packet. + int nn_paddings; + // The number of dropped messages. + int nn_dropped; +private: + int cursor; + int nn_cache; + SrsRtpPacket2* cache; public: - std::vector packets; -public: - SrsRtcPackets(bool gso, bool merge_nalus); + SrsRtcPackets(int nn_cache_max); virtual ~SrsRtcPackets(); +public: + void reset(bool gso, bool merge_nalus); + SrsRtpPacket2* fetch(); + SrsRtpPacket2* back(); + int size(); + int capacity(); + SrsRtpPacket2* at(int index); }; class SrsRtcSenderThread : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler @@ -164,8 +188,16 @@ private: uint16_t video_sequence; public: SrsUdpMuxSocket* sendonly_ukt; +private: + ISrsUdpSender* sender; +private: bool merge_nalus; bool gso; + int max_padding; +private: + srs_utime_t mw_sleep; + int mw_msgs; + bool realtime; public: SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid); virtual ~SrsRtcSenderThread(); @@ -174,6 +206,8 @@ public: // interface ISrsReloadHandler public: virtual srs_error_t on_reload_rtc_server(); + virtual srs_error_t on_reload_vhost_play(std::string vhost); + virtual srs_error_t on_reload_vhost_realtime(std::string vhost); public: virtual int cid(); public: @@ -185,17 +219,17 @@ public: public: virtual srs_error_t cycle(); private: - srs_error_t send_messages(SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); + srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); - srs_error_t send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); - srs_error_t send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); + srs_error_t send_packets(SrsRtcPackets& packets); + srs_error_t send_packets_gso(SrsRtcPackets& packets); private: - srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket); + srs_error_t packet_opus(SrsSample* sample, SrsRtcPackets& packets, int nn_max_payload); private: srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets); srs_error_t packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets); - srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket); - srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket); + srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcPackets& packets); + srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets); }; class SrsRtcSession @@ -274,6 +308,9 @@ private: private: srs_cond_t cond; bool waiting_msgs; + bool gso; + int nn_senders; +private: // Hotspot msgs, we are working on it. // @remark We will wait util all messages are ready. std::vector hotspot; @@ -282,16 +319,24 @@ private: int cache_pos; // The max number of messages for sendmmsg. If 1, we use sendmsg to send. int max_sendmmsg; + // The total queue length, for each sender. + int queue_length; + // The extra queue ratio. + int extra_ratio; + int extra_queue; public: SrsUdpMuxSender(SrsRtcServer* s); virtual ~SrsUdpMuxSender(); public: - virtual srs_error_t initialize(srs_netfd_t fd); + virtual srs_error_t initialize(srs_netfd_t fd, int senders); private: void free_mhdrs(std::vector& mhdrs); public: virtual srs_error_t fetch(mmsghdr** pphdr); virtual srs_error_t sendmmsg(mmsghdr* hdr); + virtual bool overflow(); + virtual void set_extra_ratio(int r); +public: virtual srs_error_t cycle(); // interface ISrsReloadHandler public: diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index d06d79a42..0f8b1012a 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -116,7 +116,7 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* svr, srs_netfd_t c, string cip) : SrsConnect wakable = NULL; mw_sleep = SRS_PERF_MW_SLEEP; - mw_enabled = false; + mw_msgs = 0; realtime = SRS_PERF_MIN_LATENCY_ENABLED; send_min_interval = 0; tcp_nodelay = false; @@ -264,6 +264,10 @@ srs_error_t SrsRtmpConn::on_reload_vhost_play(string vhost) send_min_interval = v; } } + + mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime); + mw_sleep = _srs_config->get_mw_sleep(req->vhost); + set_socket_buffer(mw_sleep); return err; } @@ -298,6 +302,10 @@ srs_error_t SrsRtmpConn::on_reload_vhost_realtime(string vhost) srs_trace("realtime changed %d=>%d", realtime, realtime_enabled); realtime = realtime_enabled; } + + mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime); + mw_sleep = _srs_config->get_mw_sleep(req->vhost); + set_socket_buffer(mw_sleep); return err; } @@ -689,18 +697,19 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr SrsMessageArray msgs(SRS_PERF_MW_MSGS); bool user_specified_duration_to_stop = (req->duration > 0); int64_t starttime = -1; - + // setup the realtime. realtime = _srs_config->get_realtime_enabled(req->vhost); // setup the mw config. // when mw_sleep changed, resize the socket send buffer. - mw_enabled = true; - change_mw_sleep(_srs_config->get_mw_sleep(req->vhost)); + mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime); + mw_sleep = _srs_config->get_mw_sleep(req->vhost); + set_socket_buffer(mw_sleep); // initialize the send_min_interval send_min_interval = _srs_config->get_send_min_interval(req->vhost); - srs_trace("start play smi=%dms, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d", - srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_enabled, realtime, tcp_nodelay); + srs_trace("start play smi=%dms, mw_sleep=%d, mw_msgs=%d, realtime=%d, tcp_nodelay=%d", + srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_msgs, realtime, tcp_nodelay); while (true) { // when source is set to expired, disconnect it. @@ -730,13 +739,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr // wait for message to incoming. // @see https://github.com/ossrs/srs/issues/251 // @see https://github.com/ossrs/srs/issues/257 - if (realtime) { - // for realtime, min required msgs is 0, send when got one+ msgs. - consumer->wait(SRS_PERF_MW_MIN_MSGS_REALTIME, mw_sleep); - } else { - // for no-realtime, got some msgs then send. - consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep); - } + consumer->wait(mw_msgs, mw_sleep); #endif // get messages from consumer. @@ -750,9 +753,9 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr // reportable if (pprint->can_print()) { kbps->sample(); - srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d", + srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d/%d", (int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep)); + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep), mw_msgs); } if (count <= 0) { @@ -1114,16 +1117,6 @@ srs_error_t SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsComm return err; } -void SrsRtmpConn::change_mw_sleep(srs_utime_t sleep_v) -{ - if (!mw_enabled) { - return; - } - - set_socket_buffer(sleep_v); - mw_sleep = sleep_v; -} - void SrsRtmpConn::set_sock_options() { SrsRequest* req = info->req; diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index ac4b7efb9..c58111f04 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -102,8 +102,7 @@ private: srs_utime_t duration; // The MR(merged-write) sleep time in srs_utime_t. srs_utime_t mw_sleep; - // The MR(merged-write) only enabled for play. - int mw_enabled; + int mw_msgs; // For realtime // @see https://github.com/ossrs/srs/issues/257 bool realtime; @@ -149,7 +148,6 @@ private: virtual srs_error_t handle_publish_message(SrsSource* source, SrsCommonMessage* msg); virtual srs_error_t process_publish_message(SrsSource* source, SrsCommonMessage* msg); virtual srs_error_t process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); - virtual void change_mw_sleep(srs_utime_t sleep_v); virtual void set_sock_options(); private: virtual srs_error_t check_edge_token_traverse_auth(); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index fcf3b071e..c0548708f 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -269,9 +269,17 @@ void SrsMessageQueue::set_queue_size(srs_utime_t queue_size) max_queue_size = queue_size; } -srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow) +srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow, bool pass_timestamp) { srs_error_t err = srs_success; + + msgs.push_back(msg); + + // For RTC, we never care about the timestamp and duration, so we never shrink queue here, + // but we will drop messages in each consumer coroutine. + if (pass_timestamp) { + return err; + } if (msg->is_av()) { if (av_start_time == -1) { @@ -281,8 +289,6 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow av_end_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS); } - msgs.push_back(msg); - while (av_end_time - av_start_time > max_queue_size) { // notice the caller queue already overflow and shrinked. if (is_overflow) { @@ -295,7 +301,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow return err; } -srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count) +srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count, bool pass_timestamp) { srs_error_t err = srs_success; @@ -308,13 +314,15 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p count = srs_min(max_count, nb_msgs); SrsSharedPtrMessage** omsgs = msgs.data(); - for (int i = 0; i < count; i++) { - pmsgs[i] = omsgs[i]; + memcpy(pmsgs, omsgs, count * sizeof(SrsSharedPtrMessage*)); + + // For RTC, we enable pass_timestamp mode, which never care about the timestamp and duration, + // so we do not have to update the start time here. + if (!pass_timestamp) { + SrsSharedPtrMessage* last = omsgs[count - 1]; + av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS); } - SrsSharedPtrMessage* last = omsgs[count - 1]; - av_start_time = srs_utime_t(last->timestamp * SRS_UTIME_MILLISECONDS); - if (count >= nb_msgs) { // the pmsgs is big enough and clear msgs at most time. msgs.clear(); @@ -433,6 +441,8 @@ SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c) mw_duration = 0; mw_waiting = false; #endif + + pass_timestamp = false; } SrsConsumer::~SrsConsumer() @@ -466,20 +476,35 @@ srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsR srs_error_t err = srs_success; SrsSharedPtrMessage* msg = shared_msg->copy(); - - if (!atc) { + + // For RTC, we enable pass_timestamp mode, which never correct or depends on monotonic increasing of + // timestamp. And in RTC, the audio and video timebase can be different, so we ignore time_jitter here. + if (!pass_timestamp && !atc) { if ((err = jitter->correct(msg, ag)) != srs_success) { return srs_error_wrap(err, "consume message"); } } - - if ((err = queue->enqueue(msg, NULL)) != srs_success) { + + // Put message in queue, here we may enable pass_timestamp mode. + if ((err = queue->enqueue(msg, NULL, pass_timestamp)) != srs_success) { return srs_error_wrap(err, "enqueue message"); } #ifdef SRS_PERF_QUEUE_COND_WAIT // fire the mw when msgs is enough. if (mw_waiting) { + // For RTC, we use pass_timestamp mode, we don't care about the timestamp in queue, + // so we only check the messages in queue. + if (pass_timestamp) { + if (queue->size() > mw_min_msgs) { + srs_cond_signal(mw_wait); + mw_waiting = false; + return err; + } + return err; + } + + // For RTMP, we wait for messages and duration. srs_utime_t duration = queue->duration(); bool match_min_msgs = queue->size() > mw_min_msgs; @@ -529,7 +554,7 @@ srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count) } // pump msgs from queue. - if ((err = queue->dump_packets(max, msgs->msgs, count)) != srs_success) { + if ((err = queue->dump_packets(max, msgs->msgs, count, pass_timestamp)) != srs_success) { return srs_error_wrap(err, "dump packets"); } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 6e64e9b21..fdfe9c789 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -151,12 +151,13 @@ public: // Enqueue the message, the timestamp always monotonically. // @param msg, the msg to enqueue, user never free it whatever the return code. // @param is_overflow, whether overflow and shrinked. NULL to ignore. - virtual srs_error_t enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL); + // @remark If pass_timestamp, we never shrink and never care about the timestamp or duration. + virtual srs_error_t enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL, bool pass_timestamp = false); // Get packets in consumer queue. // @pmsgs SrsSharedPtrMessage*[], used to store the msgs, user must alloc it. // @count the count in array, output param. // @max_count the max count to dequeue, must be positive. - virtual srs_error_t dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count); + virtual srs_error_t dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count, bool pass_timestamp = false); // Dumps packets to consumer, use specified args. // @remark the atc/tba/tbv/ag are same to SrsConsumer.enqueue(). virtual srs_error_t dump_packets(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag); @@ -203,10 +204,17 @@ private: int mw_min_msgs; srs_utime_t mw_duration; #endif +private: + // For RTC, we never use jitter to correct timestamp. + // But we should not change the atc or time_jitter for source or RTMP. + // @remark In this mode, we also never check the queue by timstamp, but only by count. + bool pass_timestamp; public: SrsConsumer(SrsSource* s, SrsConnection* c); virtual ~SrsConsumer(); public: + // Use pass timestamp mode. + void enable_pass_timestamp() { pass_timestamp = true; } // Set the size of queue. virtual void set_queue_size(srs_utime_t queue_size); // when source id changed, notice client to print. diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index 01abcd446..890e43746 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -271,6 +271,8 @@ SrsStatistic::SrsStatistic() perf_gso = new SrsStatisticCategory(); perf_rtp = new SrsStatisticCategory(); perf_rtc = new SrsStatisticCategory(); + perf_bytes = new SrsStatisticCategory(); + perf_dropped = new SrsStatisticCategory(); } SrsStatistic::~SrsStatistic() @@ -311,6 +313,8 @@ SrsStatistic::~SrsStatistic() srs_freep(perf_gso); srs_freep(perf_rtp); srs_freep(perf_rtc); + srs_freep(perf_bytes); + srs_freep(perf_dropped); } SrsStatistic* SrsStatistic::instance() @@ -641,7 +645,7 @@ srs_error_t SrsStatistic::dumps_perf_writev_iovs(SrsJsonObject* obj) return dumps_perf(perf_iovs, obj); } -void SrsStatistic::perf_sendmmsg_on_packets(int nb_packets) +void SrsStatistic::perf_on_sendmmsg_packets(int nb_packets) { perf_on_packets(perf_sendmmsg, nb_packets); } @@ -651,6 +655,73 @@ srs_error_t SrsStatistic::dumps_perf_sendmmsg(SrsJsonObject* obj) return dumps_perf(perf_sendmmsg, obj); } +void SrsStatistic::perf_on_rtc_bytes(int nn_bytes, int nn_rtp_bytes, int nn_padding) +{ + // a: AVFrame bytes. + // b: RTC bytes. + // c: RTC paddings. + perf_bytes->a += nn_bytes; + perf_bytes->b += nn_rtp_bytes; + perf_bytes->c += nn_padding; + + perf_bytes->nn += nn_rtp_bytes; +} + +srs_error_t SrsStatistic::dumps_perf_bytes(SrsJsonObject* obj) +{ + obj->set("avframe_bytes", SrsJsonAny::integer(perf_bytes->a)); + obj->set("rtc_bytes", SrsJsonAny::integer(perf_bytes->b)); + obj->set("rtc_padding", SrsJsonAny::integer(perf_bytes->c)); + + obj->set("nn", SrsJsonAny::integer(perf_bytes->nn)); + + return srs_success; +} + +void SrsStatistic::perf_on_dropped(int nn_msgs, int nn_rtc, int nn_dropped) +{ + // a: System AVFrames. + // b: RTC frames. + // c: Dropped frames. + perf_dropped->a += nn_msgs; + perf_dropped->b += nn_rtc; + perf_dropped->c += nn_dropped; + + perf_dropped->nn += nn_dropped; +} + +srs_error_t SrsStatistic::dumps_perf_dropped(SrsJsonObject* obj) +{ + obj->set("avframes", SrsJsonAny::integer(perf_dropped->a)); + obj->set("rtc_frames", SrsJsonAny::integer(perf_dropped->b)); + obj->set("rtc_dropeed", SrsJsonAny::integer(perf_dropped->c)); + + obj->set("nn", SrsJsonAny::integer(perf_dropped->nn)); + + return srs_success; +} + +void SrsStatistic::reset_perf() +{ + srs_freep(perf_iovs); + srs_freep(perf_msgs); + srs_freep(perf_sendmmsg); + srs_freep(perf_gso); + srs_freep(perf_rtp); + srs_freep(perf_rtc); + srs_freep(perf_bytes); + srs_freep(perf_dropped); + + perf_iovs = new SrsStatisticCategory(); + perf_msgs = new SrsStatisticCategory(); + perf_sendmmsg = new SrsStatisticCategory(); + perf_gso = new SrsStatisticCategory(); + perf_rtp = new SrsStatisticCategory(); + perf_rtc = new SrsStatisticCategory(); + perf_bytes = new SrsStatisticCategory(); + perf_dropped = new SrsStatisticCategory(); +} + void SrsStatistic::perf_on_packets(SrsStatisticCategory* p, int nb_msgs) { // The range for stat: diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index 0d9905f75..81ed9e27d 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -174,6 +174,8 @@ private: SrsStatisticCategory* perf_gso; SrsStatisticCategory* perf_rtp; SrsStatisticCategory* perf_rtc; + SrsStatisticCategory* perf_bytes; + SrsStatisticCategory* perf_dropped; private: SrsStatistic(); virtual ~SrsStatistic(); @@ -245,13 +247,11 @@ public: // Stat for packets merged written, nb_packets is the number of RTP packets. // For example, a RTC/opus packet maybe package to three RTP packets. virtual void perf_on_rtp_packets(int nb_packets); - // Dumps the perf statistic data for RTP packets, for performance analysis. virtual srs_error_t dumps_perf_rtp_packets(SrsJsonObject* obj); public: // Stat for packets UDP GSO, nb_packets is the merged RTP packets. // For example, three RTP/audio packets maybe GSO to one msghdr. virtual void perf_on_gso_packets(int nb_packets); - // Dumps the perf statistic data for UDP GSO, for performance analysis. virtual srs_error_t dumps_perf_gso(SrsJsonObject* obj); public: // Stat for TCP writev, nb_iovs is the total number of iovec. @@ -259,9 +259,19 @@ public: virtual srs_error_t dumps_perf_writev_iovs(SrsJsonObject* obj); public: // Stat for packets UDP sendmmsg, nb_packets is the vlen for sendmmsg. - virtual void perf_sendmmsg_on_packets(int nb_packets); - // Dumps the perf statistic data for UDP sendmmsg, for performance analysis. + virtual void perf_on_sendmmsg_packets(int nb_packets); virtual srs_error_t dumps_perf_sendmmsg(SrsJsonObject* obj); +public: + // Stat for bytes, nn_bytes is the size of bytes, nb_padding is padding bytes. + virtual void perf_on_rtc_bytes(int nn_bytes, int nn_rtp_bytes, int nn_padding); + virtual srs_error_t dumps_perf_bytes(SrsJsonObject* obj); +public: + // Stat for rtc messages, nn_rtc is rtc messages, nn_dropped is dropped messages. + virtual void perf_on_dropped(int nn_msgs, int nn_rtc, int nn_dropped); + virtual srs_error_t dumps_perf_dropped(SrsJsonObject* obj); +public: + // Reset all perf stat data. + virtual void reset_perf(); private: virtual void perf_on_packets(SrsStatisticCategory* p, int nb_msgs); virtual srs_error_t dumps_perf(SrsStatisticCategory* p, SrsJsonObject* obj); diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index 7cafa055e..39493646a 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -129,10 +129,10 @@ #ifdef SRS_PERF_QUEUE_COND_WAIT // For RTMP, use larger wait queue. #define SRS_PERF_MW_MIN_MSGS 8 - #define SRS_PERF_MW_MIN_MSGS_REALTIME 0 // For RTC, use smaller wait queue. - #define SRS_PERF_MW_MIN_MSGS_FOR_RTC 2 - #define SRS_PERF_MW_MIN_MSGS_FOR_RTC_REALTIME 0 + #define SRS_PERF_MW_MIN_MSGS_FOR_RTC 1 + // For Real-Time, never wait messages. + #define SRS_PERF_MW_MIN_MSGS_REALTIME 0 #endif /** * the default value of vhost for @@ -192,5 +192,27 @@ #define SRS_PERF_GLIBC_MEMORY_CHECK #undef SRS_PERF_GLIBC_MEMORY_CHECK +// For RTC, how many iovs we alloc for each mmsghdr for GSO. +// Assume that there are 3300 clients, say, 10000 msgs in queue to send, the memory is: +// 2 # We have two queue, cache and hotspot. +// * 4 # We have reuseport, each have msg cache queue. +// * (64 + 16*SRS_PERF_RTC_GSO_MAX + SRS_PERF_RTC_GSO_IOVS * 1500) # Each message size. +// * 10000 # Total messages. +// = 197MB # For SRS_PERF_RTC_GSO_IOVS = 1 +// = 311MB # For SRS_PERF_RTC_GSO_IOVS = 2 +// = 540MB # For SRS_PERF_RTC_GSO_IOVS = 4 +// = 998MB # For SRS_PERF_RTC_GSO_IOVS = 8 +#if defined(__linux__) + #define SRS_PERF_RTC_GSO_IOVS 4 +#else + #define SRS_PERF_RTC_GSO_IOVS 1 +#endif + +// For RTC, the max iovs in msghdr, the max packets sent in a msghdr. +#define SRS_PERF_RTC_GSO_MAX 64 + +// For RTC, the max count of RTP packets we process in one loop. +#define SRS_PERF_RTC_RTP_PACKETS 1024 + #endif diff --git a/trunk/src/kernel/srs_kernel_buffer.cpp b/trunk/src/kernel/srs_kernel_buffer.cpp index 73c6cafe1..be94d0bcb 100644 --- a/trunk/src/kernel/srs_kernel_buffer.cpp +++ b/trunk/src/kernel/srs_kernel_buffer.cpp @@ -67,11 +67,6 @@ SrsBuffer::~SrsBuffer() { } -char* SrsBuffer::data() -{ - return bytes; -} - int SrsBuffer::size() { return nb_bytes; diff --git a/trunk/src/kernel/srs_kernel_buffer.hpp b/trunk/src/kernel/srs_kernel_buffer.hpp index 870da9ef8..532375f8d 100644 --- a/trunk/src/kernel/srs_kernel_buffer.hpp +++ b/trunk/src/kernel/srs_kernel_buffer.hpp @@ -113,7 +113,8 @@ public: * get data of stream, set by initialize. * current bytes = data() + pos() */ - virtual char* data(); + inline char* data() { return bytes; } + inline char* head() { return p; } /** * the total stream size, set by initialize. * left bytes = size() - pos(). diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 5a6b8c6e5..c5421d640 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -422,7 +422,7 @@ public: }; // Error helpers, should use these functions to new or wrap an error. -#define srs_success SrsCplxError::success() +#define srs_success 0 // SrsCplxError::success() #define srs_error_new(ret, fmt, ...) SrsCplxError::create(__FUNCTION__, __FILE__, __LINE__, ret, fmt, ##__VA_ARGS__) #define srs_error_wrap(err, fmt, ...) SrsCplxError::wrap(__FUNCTION__, __FILE__, __LINE__, err, fmt, ##__VA_ARGS__) #define srs_error_copy(err) SrsCplxError::copy(err) diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp index 28b2d938b..eead02374 100644 --- a/trunk/src/kernel/srs_kernel_flv.cpp +++ b/trunk/src/kernel/srs_kernel_flv.cpp @@ -218,6 +218,7 @@ SrsSharedPtrMessage::SrsSharedPtrPayload::SrsSharedPtrPayload() extra_payloads = NULL; nn_extra_payloads = 0; + nn_max_extra_payloads = 0; #endif } @@ -227,9 +228,10 @@ SrsSharedPtrMessage::SrsSharedPtrPayload::~SrsSharedPtrPayload() srs_memory_unwatch(payload); #endif srs_freepa(payload); - srs_freepa(samples); #ifdef SRS_AUTO_RTC + srs_freepa(samples); + for (int i = 0; i < nn_extra_payloads; i++) { SrsSample* p = extra_payloads + i; srs_freep(p->bytes); diff --git a/trunk/src/kernel/srs_kernel_flv.hpp b/trunk/src/kernel/srs_kernel_flv.hpp index 84d7d34d9..8cfda3557 100644 --- a/trunk/src/kernel/srs_kernel_flv.hpp +++ b/trunk/src/kernel/srs_kernel_flv.hpp @@ -310,10 +310,14 @@ private: int nn_samples; // For RTC video, whether NALUs has IDR. bool has_idr; + public: // For RTC audio, we may need to transcode AAC to opus, // so there must be an extra payloads, which is transformed from payload. SrsSample* extra_payloads; int nn_extra_payloads; + // The max size payload in extras. + // @remark For GSO to fast guess the best padding. + int nn_max_extra_payloads; #endif public: SrsSharedPtrPayload(); @@ -363,6 +367,9 @@ public: void set_extra_payloads(SrsSample* payloads, int nn_payloads); int nn_extra_payloads() { return ptr->nn_extra_payloads; } SrsSample* extra_payloads() { return ptr->extra_payloads; } + // The max extra payload size. + void set_max_extra_payload(int v) { ptr->nn_max_extra_payloads = v; } + int nn_max_extra_payloads() { return ptr->nn_max_extra_payloads; } // Whether samples has idr. bool has_idr() { return ptr->has_idr; } void set_has_idr(bool v) { ptr->has_idr = v; } diff --git a/trunk/src/kernel/srs_kernel_rtp.cpp b/trunk/src/kernel/srs_kernel_rtp.cpp index ae044d4a2..5c23cbd34 100644 --- a/trunk/src/kernel/srs_kernel_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtp.cpp @@ -55,27 +55,15 @@ SrsRtpHeader::SrsRtpHeader() extension_length = 0; } -SrsRtpHeader::SrsRtpHeader(const SrsRtpHeader& rhs) +void SrsRtpHeader::reset() { - operator=(rhs); -} - -SrsRtpHeader& SrsRtpHeader::operator=(const SrsRtpHeader& rhs) -{ - padding = rhs.padding; - extension = rhs.extension; - cc = rhs.cc; - marker = rhs.marker; - payload_type = rhs.payload_type; - sequence = rhs.sequence; - timestamp = rhs.timestamp; - ssrc = rhs.ssrc; - for (size_t i = 0; i < cc; ++i) { - csrc[i] = rhs.csrc[i]; - } - extension_length = rhs.extension_length; - - return *this; + // We only reset the optional fields, the required field such as ssrc + // will always be set by user. + padding = false; + extension = false; + cc = 0; + marker = false; + extension_length = 0; } SrsRtpHeader::~SrsRtpHeader() @@ -95,32 +83,63 @@ srs_error_t SrsRtpHeader::encode(SrsBuffer* stream) { srs_error_t err = srs_success; + // Encode the RTP fix header, 12bytes. + // @see https://tools.ietf.org/html/rfc1889#section-5.1 + char* op = stream->head(); + char* p = op; + + // The version, padding, extension and cc, total 1 byte. uint8_t v = 0x80 | cc; if (padding) { - v |= 0x40; + v |= 0x20; } if (extension) { v |= 0x10; } - stream->write_1bytes(v); + *p++ = v; + // The marker and payload type, total 1 byte. v = payload_type; if (marker) { v |= kRtpMarker; } - stream->write_1bytes(v); + *p++ = v; - stream->write_2bytes(sequence); - stream->write_4bytes(timestamp); - stream->write_4bytes(ssrc); + // The sequence number, 2 bytes. + char* pp = (char*)&sequence; + *p++ = pp[1]; + *p++ = pp[0]; + + // The timestamp, 4 bytes. + pp = (char*)×tamp; + *p++ = pp[3]; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; + + // The SSRC, 4 bytes. + pp = (char*)&ssrc; + *p++ = pp[3]; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; + + // The CSRC list: 0 to 15 items, each is 4 bytes. for (size_t i = 0; i < cc; ++i) { - stream->write_4bytes(csrc[i]); + pp = (char*)&csrc[i]; + *p++ = pp[3]; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; } // TODO: Write exteinsion field. if (extension) { } + // Consume the data. + stream->skip(p - op); + return err; } @@ -129,51 +148,77 @@ size_t SrsRtpHeader::header_size() return kRtpHeaderFixedSize + cc * 4 + (extension ? (extension_length + 1) * 4 : 0); } -void SrsRtpHeader::set_marker(bool marker) -{ - this->marker = marker; -} - -void SrsRtpHeader::set_payload_type(uint8_t payload_type) -{ - this->payload_type = payload_type; -} - -void SrsRtpHeader::set_sequence(uint16_t sequence) -{ - this->sequence = sequence; -} - -void SrsRtpHeader::set_timestamp(int64_t timestamp) -{ - this->timestamp = timestamp; -} - -void SrsRtpHeader::set_ssrc(uint32_t ssrc) -{ - this->ssrc = ssrc; -} - SrsRtpPacket2::SrsRtpPacket2() { payload = NULL; padding = 0; + + cache_raw = new SrsRtpRawPayload(); + cache_fua = new SrsRtpFUAPayload2(); + cache_payload = 0; } SrsRtpPacket2::~SrsRtpPacket2() { + // We may use the cache as payload. + if (payload == cache_raw || payload == cache_fua) { + payload = NULL; + } + srs_freep(payload); + srs_freep(cache_raw); + srs_freep(cache_fua); } void SrsRtpPacket2::set_padding(int size) { rtp_header.set_padding(size > 0); + if (cache_payload) { + cache_payload += size - padding; + } padding = size; } +void SrsRtpPacket2::add_padding(int size) +{ + rtp_header.set_padding(padding + size > 0); + if (cache_payload) { + cache_payload += size; + } + padding += size; +} + +void SrsRtpPacket2::reset() +{ + rtp_header.reset(); + padding = 0; + cache_payload = 0; + + // We may use the cache as payload. + if (payload == cache_raw || payload == cache_fua) { + payload = NULL; + } + srs_freep(payload); +} + +SrsRtpRawPayload* SrsRtpPacket2::reuse_raw() +{ + payload = cache_raw; + return cache_raw; +} + +SrsRtpFUAPayload2* SrsRtpPacket2::reuse_fua() +{ + payload = cache_fua; + return cache_fua; +} + int SrsRtpPacket2::nb_bytes() { - return rtp_header.header_size() + (payload? payload->nb_bytes():0) + padding; + if (!cache_payload) { + cache_payload = rtp_header.header_size() + (payload? payload->nb_bytes():0) + padding; + } + return cache_payload; } srs_error_t SrsRtpPacket2::encode(SrsBuffer* buf) @@ -188,11 +233,11 @@ srs_error_t SrsRtpPacket2::encode(SrsBuffer* buf) return srs_error_wrap(err, "encode payload"); } - if (padding) { + if (padding > 0) { if (!buf->require(padding)) { return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", padding); } - memset(buf->data(), padding, padding); + memset(buf->data() + buf->pos(), padding, padding); buf->skip(padding); } @@ -237,12 +282,13 @@ SrsRtpRawNALUs::SrsRtpRawNALUs() SrsRtpRawNALUs::~SrsRtpRawNALUs() { - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; - srs_freep(p); + if (true) { + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + srs_freep(p); + } } - nalus.clear(); } void SrsRtpRawNALUs::push_back(SrsSample* sample) @@ -270,19 +316,19 @@ uint8_t SrsRtpRawNALUs::skip_first_byte() return uint8_t(nalus[0]->bytes[0]); } -srs_error_t SrsRtpRawNALUs::read_samples(vector& samples, int size) +srs_error_t SrsRtpRawNALUs::read_samples(vector& samples, int packet_size) { - if (cursor + size < 0 || cursor + size > nn_bytes) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "cursor=%d, max=%d, size=%d", cursor, nn_bytes, size); + if (cursor + packet_size < 0 || cursor + packet_size > nn_bytes) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "cursor=%d, max=%d, size=%d", cursor, nn_bytes, packet_size); } int pos = cursor; - cursor += size; - int left = size; + cursor += packet_size; + int left = packet_size; - vector::iterator it; - for (it = nalus.begin(); it != nalus.end() && left > 0; ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; left > 0 && i < nn_nalus; i++) { + SrsSample* p = nalus[i]; // Ignore previous consumed samples. if (pos && pos - p->size >= 0) { @@ -295,9 +341,10 @@ srs_error_t SrsRtpRawNALUs::read_samples(vector& samples, int size) srs_assert(nn > 0); SrsSample* sample = new SrsSample(); + samples.push_back(sample); + sample->bytes = p->bytes + pos; sample->size = nn; - samples.push_back(sample); left -= nn; pos = 0; @@ -310,9 +357,9 @@ int SrsRtpRawNALUs::nb_bytes() { int size = 0; - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; size += p->size; } @@ -321,9 +368,9 @@ int SrsRtpRawNALUs::nb_bytes() srs_error_t SrsRtpRawNALUs::encode(SrsBuffer* buf) { - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; if (!buf->require(p->size)) { return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", p->size); @@ -342,21 +389,20 @@ SrsRtpSTAPPayload::SrsRtpSTAPPayload() SrsRtpSTAPPayload::~SrsRtpSTAPPayload() { - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; srs_freep(p); } - nalus.clear(); } int SrsRtpSTAPPayload::nb_bytes() { int size = 1; - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; size += 2 + p->size; } @@ -376,9 +422,10 @@ srs_error_t SrsRtpSTAPPayload::encode(SrsBuffer* buf) buf->write_1bytes(v); // NALUs. - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + if (!buf->require(2 + p->size)) { return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 2 + p->size); } @@ -398,21 +445,20 @@ SrsRtpFUAPayload::SrsRtpFUAPayload() SrsRtpFUAPayload::~SrsRtpFUAPayload() { - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; srs_freep(p); } - nalus.clear(); } int SrsRtpFUAPayload::nb_bytes() { int size = 2; - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; size += p->size; } @@ -441,9 +487,10 @@ srs_error_t SrsRtpFUAPayload::encode(SrsBuffer* buf) buf->write_1bytes(fu_header); // FU payload, @see https://tools.ietf.org/html/rfc6184#section-5.8 - vector::iterator it; - for (it = nalus.begin(); it != nalus.end(); ++it) { - SrsSample* p = *it; + int nn_nalus = (int)nalus.size(); + for (int i = 0; i < nn_nalus; i++) { + SrsSample* p = nalus[i]; + if (!buf->require(p->size)) { return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", p->size); } @@ -454,6 +501,57 @@ srs_error_t SrsRtpFUAPayload::encode(SrsBuffer* buf) return srs_success; } +SrsRtpFUAPayload2::SrsRtpFUAPayload2() +{ + start = end = false; + nri = nalu_type = (SrsAvcNaluType)0; + + payload = NULL; + size = 0; +} + +SrsRtpFUAPayload2::~SrsRtpFUAPayload2() +{ +} + +int SrsRtpFUAPayload2::nb_bytes() +{ + return 2 + size; +} + +srs_error_t SrsRtpFUAPayload2::encode(SrsBuffer* buf) +{ + if (!buf->require(2 + size)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 1); + } + + // Fast encoding. + char* p = buf->head(); + + // FU indicator, @see https://tools.ietf.org/html/rfc6184#section-5.8 + uint8_t fu_indicate = kFuA; + fu_indicate |= (nri & (~kNalTypeMask)); + *p++ = fu_indicate; + + // FU header, @see https://tools.ietf.org/html/rfc6184#section-5.8 + uint8_t fu_header = nalu_type; + if (start) { + fu_header |= kStart; + } + if (end) { + fu_header |= kEnd; + } + *p++ = fu_header; + + // FU payload, @see https://tools.ietf.org/html/rfc6184#section-5.8 + memcpy(p, payload, size); + + // Consume bytes. + buf->skip(2 + size); + + return srs_success; +} + SrsRtpSharedPacket::SrsRtpSharedPacketPayload::SrsRtpSharedPacketPayload() { payload = NULL; diff --git a/trunk/src/kernel/srs_kernel_rtp.hpp b/trunk/src/kernel/srs_kernel_rtp.hpp index 7733b431f..349c388a5 100644 --- a/trunk/src/kernel/srs_kernel_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtp.hpp @@ -38,6 +38,8 @@ const uint8_t kRtpMarker = 0x80; const uint8_t kNalTypeMask = 0x1F; class SrsBuffer; +class SrsRtpRawPayload; +class SrsRtpFUAPayload2; class SrsRtpHeader { @@ -48,7 +50,7 @@ private: bool marker; uint8_t payload_type; uint16_t sequence; - int64_t timestamp; + int32_t timestamp; uint32_t ssrc; uint32_t csrc[15]; uint16_t extension_length; @@ -56,25 +58,24 @@ private: public: SrsRtpHeader(); virtual ~SrsRtpHeader(); - SrsRtpHeader(const SrsRtpHeader& rhs); - SrsRtpHeader& operator=(const SrsRtpHeader& rhs); + void reset(); public: srs_error_t decode(SrsBuffer* stream); srs_error_t encode(SrsBuffer* stream); public: size_t header_size(); public: - void set_marker(bool marker); + inline void set_marker(bool v) { marker = v; } bool get_marker() const { return marker; } - void set_payload_type(uint8_t payload_type); + inline void set_payload_type(uint8_t v) { payload_type = v; } uint8_t get_payload_type() const { return payload_type; } - void set_sequence(uint16_t sequence); + inline void set_sequence(uint16_t v) { sequence = v; } uint16_t get_sequence() const { return sequence; } - void set_timestamp(int64_t timestamp); + inline void set_timestamp(int64_t v) { timestamp = (uint32_t)v; } int64_t get_timestamp() const { return timestamp; } - void set_ssrc(uint32_t ssrc); + inline void set_ssrc(uint32_t v) { ssrc = v; } uint32_t get_ssrc() const { return ssrc; } - void set_padding(bool v) { padding = v; } + inline void set_padding(bool v) { padding = v; } }; class SrsRtpPacket2 @@ -83,12 +84,24 @@ public: SrsRtpHeader rtp_header; ISrsEncoder* payload; int padding; +private: + SrsRtpRawPayload* cache_raw; + SrsRtpFUAPayload2* cache_fua; + int cache_payload; public: SrsRtpPacket2(); virtual ~SrsRtpPacket2(); public: - // Append size of bytes as padding. - virtual void set_padding(int size); + // Set the padding of RTP packet. + void set_padding(int size); + // Increase the padding of RTP packet. + void add_padding(int size); + // Reset RTP packet. + void reset(); + // Reuse the cached raw message as payload. + SrsRtpRawPayload* reuse_raw(); + // Reuse the cached fua message as payload. + SrsRtpFUAPayload2* reuse_fua(); // interface ISrsEncoder public: virtual int nb_bytes(); @@ -99,7 +112,8 @@ public: class SrsRtpRawPayload : public ISrsEncoder { public: - // @remark We only refer to the memory, user must free it. + // The RAW payload, directly point to the shared memory. + // @remark We only refer to the memory, user must free its bytes. char* payload; int nn_payload; public: @@ -115,6 +129,7 @@ public: class SrsRtpRawNALUs : public ISrsEncoder { private: + // We will manage the samples, but the sample itself point to the shared memory. std::vector nalus; int nn_bytes; int cursor; @@ -125,7 +140,8 @@ public: void push_back(SrsSample* sample); public: uint8_t skip_first_byte(); - srs_error_t read_samples(std::vector& samples, int size); + // We will manage the returned samples, if user want to manage it, please copy it. + srs_error_t read_samples(std::vector& samples, int packet_size); // interface ISrsEncoder public: virtual int nb_bytes(); @@ -138,7 +154,7 @@ class SrsRtpSTAPPayload : public ISrsEncoder public: // The NRI in NALU type. SrsAvcNaluType nri; - // The NALU samples. + // The NALU samples, we will manage the samples. // @remark We only refer to the memory, user must free its bytes. std::vector nalus; public: @@ -151,6 +167,7 @@ public: }; // FU-A, for one NALU with multiple fragments. +// With more than one payload. class SrsRtpFUAPayload : public ISrsEncoder { public: @@ -160,7 +177,7 @@ public: bool start; bool end; SrsAvcNaluType nalu_type; - // The NALU samples. + // The NALU samples, we manage the samples. // @remark We only refer to the memory, user must free its bytes. std::vector nalus; public: @@ -172,6 +189,29 @@ public: virtual srs_error_t encode(SrsBuffer* buf); }; +// FU-A, for one NALU with multiple fragments. +// With only one payload. +class SrsRtpFUAPayload2 : public ISrsEncoder +{ +public: + // The NRI in NALU type. + SrsAvcNaluType nri; + // The FUA header. + bool start; + bool end; + SrsAvcNaluType nalu_type; + // The payload and size, + char* payload; + int size; +public: + SrsRtpFUAPayload2(); + virtual ~SrsRtpFUAPayload2(); +// interface ISrsEncoder +public: + virtual int nb_bytes(); + virtual srs_error_t encode(SrsBuffer* buf); +}; + class SrsRtpSharedPacket { private: diff --git a/trunk/src/service/srs_service_st.cpp b/trunk/src/service/srs_service_st.cpp index 4e1e1bc01..da54a5dae 100644 --- a/trunk/src/service/srs_service_st.cpp +++ b/trunk/src/service/srs_service_st.cpp @@ -436,6 +436,8 @@ int srs_sendmmsg(srs_netfd_t stfd, struct mmsghdr *msgvec, unsigned int vlen, in } msgvec->msg_len = r0; #else + msgvec->msg_len = 0; + int tolen = (int)msgvec->msg_hdr.msg_namelen; const struct sockaddr* to = (const struct sockaddr*)msgvec->msg_hdr.msg_name; for (int i = 0; i < (int)msgvec->msg_hdr.msg_iovlen; i++) {