diff --git a/README.md b/README.md index 728152999..270a7569a 100755 --- a/README.md +++ b/README.md @@ -157,6 +157,7 @@ For previous versions, please read: ## V4 changes +* v4.0, 2020-04-14, For [#307][bug #307], support sendmmsg, GSO and reuseport. 4.0.23 * v4.0, 2020-04-05, For [#307][bug #307], SRTP ASM only works with openssl-1.0, auto detect it. 4.0.22 * v4.0, 2020-04-04, Merge RTC and GB28181, with bugs fixed. 4.0.21 * v4.0, 2020-04-04, For [#307][bug #307], refine RTC latency from 600ms to 200ms. 4.0.20 diff --git a/trunk/auto/auto_headers.sh b/trunk/auto/auto_headers.sh index 84b64319b..d77325395 100755 --- a/trunk/auto/auto_headers.sh +++ b/trunk/auto/auto_headers.sh @@ -171,6 +171,12 @@ else srs_undefine_macro "SRS_AUTO_HAS_SENDMMSG" $SRS_AUTO_HEADERS_H fi +if [ $SRS_DEBUG = YES ]; then + srs_define_macro "SRS_DEBUG" $SRS_AUTO_HEADERS_H +else + srs_undefine_macro "SRS_DEBUG" $SRS_AUTO_HEADERS_H +fi + # prefix echo "" >> $SRS_AUTO_HEADERS_H echo "#define SRS_AUTO_PREFIX \"${SRS_PREFIX}\"" >> $SRS_AUTO_HEADERS_H diff --git a/trunk/auto/depends.sh b/trunk/auto/depends.sh index 457b371a5..f2d3e378c 100755 --- a/trunk/auto/depends.sh +++ b/trunk/auto/depends.sh @@ -110,7 +110,11 @@ function Ubuntu_prepare() echo "The valgrind-dev is installed." fi fi - + + pkg-config --version >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then + echo "Please install pkg-config"; exit -1; + fi + echo "Tools for Ubuntu are installed." return 0 } @@ -191,6 +195,10 @@ function Centos_prepare() echo "The valgrind-devel is installed." fi fi + + pkg-config --version --help >/dev/null 2>&1; ret=$?; if [[ 0 -ne $ret ]]; then + echo "Please install pkg-config"; exit -1; + fi echo "Tools for Centos are installed."
return 0 diff --git a/trunk/auto/options.sh b/trunk/auto/options.sh index 0e31b05fd..69293fce3 100755 --- a/trunk/auto/options.sh +++ b/trunk/auto/options.sh @@ -122,6 +122,7 @@ SRS_NASM=YES SRS_SRTP_ASM=YES SRS_SENDMMSG=YES SRS_HAS_SENDMMSG=YES +SRS_DEBUG=NO ##################################################################################### # menu @@ -162,6 +163,7 @@ Features: --prefix= The absolute installation path for srs. Default: $SRS_PREFIX --static Whether add '-static' to link options. --gcov Whether enable the GCOV compiler options. + --debug Whether enable the debug code, may hurt performance. --jobs[=N] Allow N jobs at once; infinite jobs with no arg. Used for make in the configure, for example, to make ffmpeg. --log-verbose Whether enable the log verbose level. default: no. @@ -293,6 +295,7 @@ function parse_user_option() { --log-info) SRS_LOG_INFO=YES ;; --log-trace) SRS_LOG_TRACE=YES ;; --gcov) SRS_GCOV=YES ;; + --debug) SRS_DEBUG=YES ;; --arm) SRS_CROSS_BUILD=YES ;; --mips) SRS_CROSS_BUILD=YES ;; @@ -623,6 +626,7 @@ function regenerate_options() { if [ $SRS_LOG_INFO = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --log-info"; fi if [ $SRS_LOG_TRACE = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --log-trace"; fi if [ $SRS_GCOV = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --gcov"; fi + if [ $SRS_DEBUG = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --debug"; fi if [[ $SRS_EXTRA_FLAGS != '' ]]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --extra-flags=\\\"$SRS_EXTRA_FLAGS\\\""; fi if [[ $SRS_BUILD_TAG != '' ]]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --build-tag=\\\"$SRS_BUILD_TAG\\\""; fi if [[ $SRS_TOOL_CC != '' ]]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --cc=$SRS_TOOL_CC"; fi diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 7304edd06..581e4daa3 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -435,6 +435,14 @@ rtc_server { # and net.core.rmem_default or just 
increase this to get larger UDP recv and send buffer. # default: 4 reuseport 4; + # Whether merge multiple NALUs into one. + # @see https://github.com/ossrs/srs/issues/307#issuecomment-612806318 + # default: on + merge_nalus on; + # Whether enable GSO to send out RTP packets. + # @remark Linux 4.18+ only, for other OS always disabled. + # default: on + gso on; } vhost rtc.vhost.srs.com { diff --git a/trunk/research/players/js/srs.page.js b/trunk/research/players/js/srs.page.js index 56ff81e45..3f3b1f6ca 100755 --- a/trunk/research/players/js/srs.page.js +++ b/trunk/research/players/js/srs.page.js @@ -207,6 +207,14 @@ function build_default_hls_url() { } function build_default_rtc_url(query) { + // Use target to overwrite server, vhost and eip. + console.log('?target=x.x.x.x to overwrite server, vhost and eip.'); + if (query.target) { + query.server = query.vhost = query.eip = query.target; + query.user_query.eip = query.target; + delete query.target; + } + var server = (!query.server)? window.location.hostname:query.server; var vhost = (!query.vhost)? window.location.hostname:query.vhost; var app = (!query.app)? "live":query.app; diff --git a/trunk/scripts/perf_gso.py b/trunk/scripts/perf_gso.py new file mode 100755 index 000000000..707e6c6b5 --- /dev/null +++ b/trunk/scripts/perf_gso.py @@ -0,0 +1,90 @@ +#!/usr/bin/python +''' +The MIT License (MIT) + +Copyright (c) 2013-2016 SRS(ossrs) + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +''' + +import urllib, sys, json + +url = "http://localhost:1985/api/v1/perf" +if len(sys.argv) < 2: + print "Usage: %s "%(sys.argv[0]) + print "For example:" + print " %s http://localhost:1985/api/v1/perf"%(sys.argv[0]) + sys.exit(-1) + +url = sys.argv[1] +print "Open %s"%(url) + +f = urllib.urlopen(url) +s = f.read() +f.close() +print "Response %s"%(s) + +obj = json.loads(s) + +# 2, 3, 5, 9, 16, 32, 64, 128, 256 +keys = ['lt_2', 'lt_3', 'lt_5', 'lt_9', 'lt_16', 'lt_32', 'lt_64', 'lt_128', 'lt_256', 'gt_256'] + +print "" +print("AV---Frames"), +p = obj['data']['avframes'] +for k in keys: + k2 = '%s'%(k) + if k2 in p: + print(p[k2]), + else: + print(0), +print(p['nn']), + +print "" +print("RTC--Frames"), +p = obj['data']['rtc'] +for k in keys: + k2 = '%s'%(k) + if k2 in p: + print(p[k2]), + else: + print(0), +print(p['nn']), + +print "" +print("RTP-Packets"), +p = obj['data']['rtp'] +for k in keys: + k2 = '%s'%(k) + if k2 in p: + print(p[k2]), + else: + print(0), +print(p['nn']), + +print "" +print("GSO-Packets"), +p = obj['data']['gso'] +for k in keys: + k2 = '%s'%(k) + if k2 in p: + print(p[k2]), + else: + print(0), +print(p['nn']), + diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index d3e790426..956bc99b0 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -33,6 +33,10 @@ #include #include #include +#ifdef __linux__ +#include +#include +#endif #include #include @@ -3614,7 +3618,7 @@ srs_error_t SrsConfig::check_normal_config() for
(int i = 0; conf && i < (int)conf->directives.size(); i++) { string n = conf->at(i)->name; if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa" - && n != "sendmmsg" && n != "encrypt" && n != "reuseport") { + && n != "sendmmsg" && n != "encrypt" && n != "reuseport" && n != "gso" && n != "merge_nalus") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtc_server.%s", n.c_str()); } } @@ -4747,12 +4751,20 @@ int SrsConfig::get_rtc_server_sendmmsg() int SrsConfig::get_rtc_server_reuseport() { -#if defined(SO_REUSEPORT) - static int DEFAULT = 4; -#else - static int DEFAULT = 1; + int v = get_rtc_server_reuseport2(); + +#if !defined(SO_REUSEPORT) + srs_warn("REUSEPORT not supported, reset %d to %d", v, 1); + v = 1; #endif + return v; +} + +int SrsConfig::get_rtc_server_reuseport2() +{ + static int DEFAULT = 4; + SrsConfDirective* conf = root->get("rtc_server"); if (!conf) { return DEFAULT; @@ -4763,13 +4775,69 @@ int SrsConfig::get_rtc_server_reuseport() return DEFAULT; } - int reuseport = ::atoi(conf->arg0().c_str()); -#if !defined(SO_REUSEPORT) - srs_warn("REUSEPORT not supported, reset %d to %d", reuseport, DEFAULT); - reuseport = DEFAULT + return ::atoi(conf->arg0().c_str()); +} + +bool SrsConfig::get_rtc_server_merge_nalus() +{ + static int DEFAULT = true; + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("merge_nalus"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_TRUE(conf->arg0()); +} + +bool SrsConfig::get_rtc_server_gso() +{ + bool v = get_rtc_server_gso2(); + + bool gso_disabled = false; +#if !defined(__linux__) + gso_disabled = true; + if (v) { + srs_warn("GSO is disabled, for Linux 4.18+ only"); + } +#elif LINUX_VERSION_CODE < KERNEL_VERSION(4,18,0) + if (v) { + utsname un = {0}; + int r0 = uname(&un); + if (r0 || strverscmp(un.release, "4.18.0") < 0) { + gso_disabled = true; + srs_warn("GSO is disabled,
for Linux 4.18+ only, r0=%d, kernel=%s", r0, un.release); + } + } #endif - return reuseport; + if (v && gso_disabled) { + v = false; + } + + return v; +} + +bool SrsConfig::get_rtc_server_gso2() +{ + static int DEFAULT = true; + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("gso"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_TRUE(conf->arg0()); } SrsConfDirective* SrsConfig::get_rtc(string vhost) diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index ba46984e3..96524c33a 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -528,7 +528,14 @@ public: virtual int get_rtc_server_sendmmsg(); virtual bool get_rtc_server_encrypt(); virtual int get_rtc_server_reuseport(); - +private: + virtual int get_rtc_server_reuseport2(); +public: + virtual bool get_rtc_server_merge_nalus(); + virtual bool get_rtc_server_gso(); +private: + virtual bool get_rtc_server_gso2(); +public: SrsConfDirective* get_rtc(std::string vhost); bool get_rtc_enabled(std::string vhost); bool get_rtc_bframe_discard(std::string vhost); diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 1f3749b7a..6a83f7a8c 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1622,13 +1622,40 @@ srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* data->set("query", p); p->set("target", SrsJsonAny::str(target.c_str())); - p->set("help", SrsJsonAny::str("?target=writev|sendmmsg")); + p->set("help", SrsJsonAny::str("?target=avframes|rtc|rtp|gso|writev_iovs|sendmmsg")); } - if (target.empty() || target == "writev") { + if (target.empty() || target == "avframes") { SrsJsonObject* p = SrsJsonAny::object(); - data->set("writev", p); - if ((err = stat->dumps_perf_writev(p)) != srs_success) { + data->set("avframes", p); + if ((err = 
stat->dumps_perf_msgs(p)) != srs_success) { + int code = srs_error_code(err); srs_error_reset(err); + return srs_api_response_code(w, r, code); + } + } + + if (target.empty() || target == "rtc") { + SrsJsonObject* p = SrsJsonAny::object(); + data->set("rtc", p); + if ((err = stat->dumps_perf_rtc_packets(p)) != srs_success) { + int code = srs_error_code(err); srs_error_reset(err); + return srs_api_response_code(w, r, code); + } + } + + if (target.empty() || target == "rtp") { + SrsJsonObject* p = SrsJsonAny::object(); + data->set("rtp", p); + if ((err = stat->dumps_perf_rtp_packets(p)) != srs_success) { + int code = srs_error_code(err); srs_error_reset(err); + return srs_api_response_code(w, r, code); + } + } + + if (target.empty() || target == "gso") { + SrsJsonObject* p = SrsJsonAny::object(); + data->set("gso", p); + if ((err = stat->dumps_perf_gso(p)) != srs_success) { int code = srs_error_code(err); srs_error_reset(err); return srs_api_response_code(w, r, code); } @@ -1643,6 +1670,15 @@ srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* } } + if (target.empty() || target == "writev_iovs") { + SrsJsonObject* p = SrsJsonAny::object(); + data->set("writev_iovs", p); + if ((err = stat->dumps_perf_writev_iovs(p)) != srs_success) { + int code = srs_error_code(err); srs_error_reset(err); + return srs_api_response_code(w, r, code); + } + } + return srs_api_response(w, r, obj->dumps()); } diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 6609c3852..95c1f03fd 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -680,6 +680,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess err = streaming_send_messages(enc, msgs.msgs, count); } + // TODO: FIXME: Update the stat. + // free the messages. 
for (int i = 0; i < count; i++) { SrsSharedPtrMessage* msg = msgs.msgs[i]; diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 2117983be..3294c79f5 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -499,7 +499,7 @@ srs_error_t SrsUdpMuxListener::cycle() pps_unit = "(k)"; pps_last /= 10000; pps_average /= 10000; } - srs_trace("<- RTC #%d RECV %" PRId64 ", pps %d/%d%s, schedule %" PRId64, + srs_trace("<- RTC RECV #%d, udp %" PRId64 ", pps %d/%d%s, schedule %" PRId64, srs_netfd_fileno(lfd), nn_msgs_stage, pps_average, pps_last, pps_unit.c_str(), nn_loop); nn_msgs_last = nn_msgs; time_last = srs_get_system_time(); nn_loop = 0; nn_msgs_stage = 0; diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 6e1d1a524..b3cd71922 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -33,6 +33,11 @@ using namespace std; #include #include +#include +#ifndef UDP_SEGMENT +#define UDP_SEGMENT 103 +#endif + #include #include @@ -386,7 +391,7 @@ srs_error_t SrsDtlsSession::protect_rtp(char* out_buf, const char* in_buf, int& return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect failed"); } -srs_error_t SrsDtlsSession::protect_rtp2(char* buf, int* pnn_buf, SrsRtpPacket2* pkt) +srs_error_t SrsDtlsSession::protect_rtp2(void* rtp_hdr, int* len_ptr) { srs_error_t err = srs_success; @@ -394,14 +399,7 @@ srs_error_t SrsDtlsSession::protect_rtp2(char* buf, int* pnn_buf, SrsRtpPacket2* return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect"); } - SrsBuffer stream(buf, *pnn_buf); - if ((err = pkt->encode(&stream)) != srs_success) { - return srs_error_wrap(err, "encode packet"); - } - - *pnn_buf = stream.pos(); - - if (srtp_protect(srtp_send, buf, pnn_buf) != 0) { + if (srtp_protect(srtp_send, rtp_hdr, len_ptr) != 0) { return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect"); } @@ -456,6 +454,26 @@ srs_error_t 
SrsDtlsSession::unprotect_rtcp(char* out_buf, const char* in_buf, in return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "rtcp unprotect failed"); } +SrsRtcPackets::SrsRtcPackets(bool gso, bool merge_nalus) +{ + use_gso = gso; + should_merge_nalus = merge_nalus; + + nn_rtp_pkts = nn_bytes = 0; + nn_audios = nn_extras = 0; + nn_videos = nn_samples = 0; +} + +SrsRtcPackets::~SrsRtcPackets() +{ + vector::iterator it; + for (it = packets.begin(); it != packets.end(); ++it ) { + SrsRtpPacket2* packet = *it; + srs_freep(packet); + } + packets.clear(); +} + SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid) : sendonly_ukt(NULL) { @@ -464,15 +482,21 @@ SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int rtc_session = s; sendonly_ukt = u->copy_sendonly(); + gso = false; + merge_nalus = false; audio_timestamp = 0; audio_sequence = 0; video_sequence = 0; + + _srs_config->subscribe(this); } SrsRtcSenderThread::~SrsRtcSenderThread() { + _srs_config->unsubscribe(this); + srs_freep(trd); srs_freep(sendonly_ukt); } @@ -487,9 +511,35 @@ srs_error_t SrsRtcSenderThread::initialize(const uint32_t& vssrc, const uint32_t video_payload_type = v_pt; audio_payload_type = a_pt; + gso = _srs_config->get_rtc_server_gso(); + merge_nalus = _srs_config->get_rtc_server_merge_nalus(); + srs_trace("RTC sender video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), package(gso=%d, merge_nalus=%d)", + video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, merge_nalus); + return err; } +srs_error_t SrsRtcSenderThread::on_reload_rtc_server() +{ + if (true) { + bool v = _srs_config->get_rtc_server_gso(); + if (gso != v) { + srs_trace("Reload gso %d=>%d", gso, v); + gso = v; + } + } + + if (true) { + bool v = _srs_config->get_rtc_server_merge_nalus(); + if (merge_nalus != v) { + srs_trace("Reload merge_nalus %d=>%d", merge_nalus, v); + merge_nalus = v; + } + } + + return srs_success; +} + int SrsRtcSenderThread::cid() { return trd->cid(); @@
-560,6 +610,7 @@ srs_error_t SrsRtcSenderThread::cycle() SrsAutoFree(SrsPithyPrint, pprint); srs_trace("rtc session=%s, start play", rtc_session->id().c_str()); + SrsStatistic* stat = SrsStatistic::instance(); while (true) { if ((err = trd->pull()) != srs_success) { @@ -569,7 +620,7 @@ srs_error_t SrsRtcSenderThread::cycle() #ifdef SRS_PERF_QUEUE_COND_WAIT if (realtime) { // for realtime, min required msgs is 0, send when got one+ msgs. - consumer->wait(0, mw_sleep); + consumer->wait(SRS_PERF_MW_MIN_MSGS_FOR_RTC_REALTIME, mw_sleep); } else { // for no-realtime, got some msgs then send. consumer->wait(SRS_PERF_MW_MIN_MSGS_FOR_RTC, mw_sleep); @@ -589,9 +640,8 @@ srs_error_t SrsRtcSenderThread::cycle() continue; } - int nn = 0; - int nn_rtp_pkts = 0; - if ((err = send_messages(source, msgs.msgs, msg_count, sendonly_ukt, &nn, &nn_rtp_pkts)) != srs_success) { + SrsRtcPackets pkts(gso, merge_nalus); + if ((err = send_messages(sendonly_ukt, source, msgs.msgs, msg_count, pkts)) != srs_success) { srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err); } @@ -600,17 +650,33 @@ srs_error_t SrsRtcSenderThread::cycle() srs_freep(msg); } + // Stat the original RAW AV frame, maybe h264+aac. + stat->perf_on_msgs(msg_count); + // Stat the RTC packets, RAW AV frame, maybe h.264+opus. + int nn_rtc_packets = srs_max(pkts.nn_audios, pkts.nn_extras) + pkts.nn_videos; + stat->perf_on_rtc_packets(nn_rtc_packets); + // Stat the RAW RTP packets, which maybe group by GSO. + stat->perf_on_rtp_packets(pkts.packets.size()); + // Stat the RTP packets going into kernel. 
+ stat->perf_on_gso_packets(pkts.nn_rtp_pkts); +#if defined(SRS_DEBUG) + srs_trace("RTC PLAY packets, msgs %d/%d, rtp %d, gso %d, %d audios, %d extras, %d videos, %d samples, %d bytes", + msg_count, nn_rtc_packets, pkts.packets.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, + pkts.nn_samples, pkts.nn_bytes); +#endif + pprint->elapse(); if (pprint->can_print()) { // TODO: FIXME: Print stat like frame/s, packet/s, loss_packets. - srs_trace("-> RTC PLAY %d msgs, %d packets, %d bytes", msg_count, nn_rtp_pkts, nn); + srs_trace("-> RTC PLAY %d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d bytes", + msg_count, pkts.packets.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, + pkts.nn_samples, pkts.nn_bytes); } } } srs_error_t SrsRtcSenderThread::send_messages( - SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, - SrsUdpMuxSocket* skt, int* pnn, int* pnn_rtp_pkts + SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets ) { srs_error_t err = srs_success; @@ -619,34 +685,81 @@ srs_error_t SrsRtcSenderThread::send_messages( } // Covert kernel messages to RTP packets. - vector packets; + if ((err = messages_to_packets(source, msgs, nb_msgs, packets)) != srs_success) { + return srs_error_wrap(err, "messages to packets"); + } + +#ifndef SRS_AUTO_OSX + // If enabled GSO, send out some packets in a msghdr. + if (packets.use_gso) { + if ((err = send_packets_gso(skt, packets)) != srs_success) { + return srs_error_wrap(err, "gso send"); + } + return err; + } +#endif + + // By default, we send packets by sendmmsg. 
+ if ((err = send_packets(skt, packets)) != srs_success) { + return srs_error_wrap(err, "raw send"); + } + + return err; +} + +srs_error_t SrsRtcSenderThread::messages_to_packets( + SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets +) { + srs_error_t err = srs_success; for (int i = 0; i < nb_msgs; i++) { SrsSharedPtrMessage* msg = msgs[i]; - *pnn += msg->size; + // Update stats. + packets.nn_bytes += msg->size; + + int nn_extra_payloads = msg->nn_extra_payloads(); + packets.nn_extras += nn_extra_payloads; + + int nn_samples = msg->nn_samples(); + packets.nn_samples += nn_samples; + + // For audio, we transcoded AAC to opus in extra payloads. SrsRtpPacket2* packet = NULL; if (msg->is_audio()) { - for (int i = 0; i < msg->nn_extra_payloads(); i++) { + packets.nn_audios++; + + for (int i = 0; i < nn_extra_payloads; i++) { SrsSample* sample = msg->extra_payloads() + i; if ((err = packet_opus(sample, &packet)) != srs_success) { return srs_error_wrap(err, "opus package"); } - packets.push_back(packet); + packets.packets.push_back(packet); } - continue; } + // For video, we should process all NALUs in samples. + packets.nn_videos++; + // Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A. if (msg->has_idr()) { if ((err = packet_stap_a(source, msg, &packet)) != srs_success) { return srs_error_wrap(err, "packet stap-a"); } - packets.push_back(packet); + packets.packets.push_back(packet); } - for (int i = 0; i < msg->nn_samples(); i++) { + // If merge NALUs, we package all NALUs(samples) as one NALU, in a RTP or FUA packet. + if (packets.should_merge_nalus && nn_samples > 1) { + if ((err = packet_nalus(msg, packets)) != srs_success) { + return srs_error_wrap(err, "packet nalus"); + } + continue; + } + + // By default, we package each NALU(sample) to a RTP or FUA packet.
+ for (int i = 0; i < nn_samples; i++) { SrsSample* sample = msg->samples() + i; // We always ignore bframe here, if config to discard bframe, @@ -660,76 +773,339 @@ srs_error_t SrsRtcSenderThread::send_messages( if ((err = packet_single_nalu(msg, sample, &packet)) != srs_success) { return srs_error_wrap(err, "packet single nalu"); } - - if (i == msg->nn_samples() - 1) { - packet->rtp_header.set_marker(true); - } - packets.push_back(packet); + packets.packets.push_back(packet); } else { if ((err = packet_fu_a(msg, sample, kRtpMaxPayloadSize, packets)) != srs_success) { return srs_error_wrap(err, "packet fu-a"); } + } - if (i == msg->nn_samples() - 1) { - packets.back()->rtp_header.set_marker(true); - } + if (i == nn_samples - 1) { + packets.packets.back()->rtp_header.set_marker(true); } } } - *pnn_rtp_pkts += (int)packets.size(); - - for (int j = 0; j < (int)packets.size(); j++) { - SrsRtpPacket2* packet = packets[j]; - if ((err = send_packet(packet, skt)) != srs_success) { - srs_warn("send err %s", srs_error_summary(err).c_str()); srs_error_reset(err); - } - srs_freep(packet); - } - return err; } -srs_error_t SrsRtcSenderThread::send_packet(SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt) +srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets) { srs_error_t err = srs_success; ISrsUdpSender* sender = skt->sender(); - // Fetch a cached message from queue. - // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. - mmsghdr* mhdr = NULL; - if ((err = sender->fetch(&mhdr)) != srs_success) { - return srs_error_wrap(err, "fetch msghdr"); - } - char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base; + vector::iterator it; + for (it = packets.packets.begin(); it != packets.packets.end(); ++it) { + SrsRtpPacket2* packet = *it; - // Length of iov, default size. 
- int length = kRtpPacketSize; - - if (rtc_session->encrypt) { - if ((err = rtc_session->dtls_session->protect_rtp2(buf, &length, pkt)) != srs_success) { - return srs_error_wrap(err, "srtp protect"); + // Fetch a cached message from queue. + // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. + mmsghdr* mhdr = NULL; + if ((err = sender->fetch(&mhdr)) != srs_success) { + return srs_error_wrap(err, "fetch msghdr"); } + char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base; + int length = kRtpPacketSize; + + // Marshal packet to bytes. + if (true) { + SrsBuffer stream(buf, length); + if ((err = packet->encode(&stream)) != srs_success) { + return srs_error_wrap(err, "encode packet"); + } + length = stream.pos(); + } + + // Whether encrypt the RTP bytes. + if (rtc_session->encrypt) { + if ((err = rtc_session->dtls_session->protect_rtp2(buf, &length)) != srs_success) { + return srs_error_wrap(err, "srtp protect"); + } + } + + sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); + socklen_t addrlen = (socklen_t)skt->peer_addrlen(); + + mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; + mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; + mhdr->msg_hdr.msg_iov->iov_len = length; + mhdr->msg_hdr.msg_controllen = 0; + mhdr->msg_len = 0; + + // When we send out a packet, we commit a RTP packet. + packets.nn_rtp_pkts++; + + if ((err = sender->sendmmsg(mhdr)) != srs_success) { + return srs_error_wrap(err, "send msghdr"); + } + } + + return err; +} + +// TODO: FIXME: We can gather and pad audios, because they have similar size. +srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPackets& packets) +{ + srs_error_t err = srs_success; + + // Previous handler, if has the same size, we can use GSO. + mmsghdr* gso_mhdr = NULL; int gso_size = 0; int gso_encrypt = 0; int gso_cursor = 0; + // GSO, N packets has same length, the final one may not. 
+ bool use_gso = false; bool gso_final = false; + + ISrsUdpSender* sender = skt->sender(); + int nn_packets = (int)packets.packets.size(); + for (int i = 0; i < nn_packets; i++) { + SrsRtpPacket2* packet = packets.packets[i]; + + // The handler to send message. + mmsghdr* mhdr = NULL; + + // Check whether we can use GSO to send it. + int nn_packet = packet->nb_bytes(); + + if ((gso_size && gso_size == nn_packet) || (use_gso && !gso_final)) { + use_gso = true; + gso_final = (gso_size && gso_size != nn_packet); + mhdr = gso_mhdr; + + // We need to increase the iov and cursor. + int nb_iovs = mhdr->msg_hdr.msg_iovlen; + if (gso_cursor >= nb_iovs - 1) { + int nn_new_iovs = nb_iovs; + mhdr->msg_hdr.msg_iovlen = nb_iovs + nn_new_iovs; + mhdr->msg_hdr.msg_iov = (iovec*)realloc(mhdr->msg_hdr.msg_iov, sizeof(iovec) * (nb_iovs + nn_new_iovs)); + memset(mhdr->msg_hdr.msg_iov + nb_iovs, 0, sizeof(iovec) * nn_new_iovs); + } + gso_cursor++; + + // Create payload cache for RTP packet. + iovec* p = mhdr->msg_hdr.msg_iov + gso_cursor; + if (!p->iov_base) { + p->iov_base = new char[kRtpPacketSize]; + p->iov_len = kRtpPacketSize; + } + } + + // Change the state according to the next packet. + if (i < nn_packets - 1) { + SrsRtpPacket2* next_packet = (i < nn_packets - 1)? packets.packets[i + 1]:NULL; + int nn_next_packet = next_packet? next_packet->nb_bytes() : 0; + + // If GSO, but next is bigger than this one, we must enter the final state. + if (use_gso && !gso_final) { + gso_final = (nn_packet < nn_next_packet); + } + + // If not GSO, maybe the first fresh packet, we should see whether the next packet is smaller than this one, + // if smaller, we can still enter GSO. + if (!use_gso) { + use_gso = (nn_packet >= nn_next_packet); + } + } + + // Now, we fetch the msg from cache. + if (!mhdr) { + // Fetch a cached message from queue. + // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. 
+ if ((err = sender->fetch(&mhdr)) != srs_success) { + return srs_error_wrap(err, "fetch msghdr"); + } + + // Reset the iovec, we should never change the msg_iovlen. + for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) { + iovec* p = mhdr->msg_hdr.msg_iov + j; + p->iov_len = 0; + } + + // Now, GSO will use this message and size. + if (use_gso) { + gso_mhdr = mhdr; + gso_size = nn_packet; + } + } + + // Marshal packet to bytes. + iovec* iov = mhdr->msg_hdr.msg_iov + gso_cursor; + iov->iov_len = kRtpPacketSize; + + if (true) { + SrsBuffer stream((char*)iov->iov_base, iov->iov_len); + if ((err = packet->encode(&stream)) != srs_success) { + return srs_error_wrap(err, "encode packet"); + } + iov->iov_len = stream.pos(); + } + + // Whether encrypt the RTP bytes. + if (rtc_session->encrypt) { + int nn_encrypt = (int)iov->iov_len; + if ((err = rtc_session->dtls_session->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) { + return srs_error_wrap(err, "srtp protect"); + } + iov->iov_len = (size_t)nn_encrypt; + } + + // If GSO, they must has same size, except the final one. + if (use_gso && !gso_final && gso_encrypt && gso_encrypt != (int)iov->iov_len) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "GSO size=%d/%d, encrypt=%d/%d", gso_size, nn_packet, gso_encrypt, iov->iov_len); + } + + if (use_gso && !gso_final) { + gso_encrypt = iov->iov_len; + } + + // If exceed the max GSO size, set to final. + if (use_gso && gso_cursor > 64) { + gso_final = true; + } + + // For last message, or final gso, or determined not using GSO, send it now. + bool do_send = (i == nn_packets - 1 || gso_final || !use_gso); + +#if defined(SRS_DEBUG) + bool is_video = packet->rtp_header.get_payload_type() == video_payload_type; + srs_trace("Packet %s SSRC=%d, SN=%d, %d bytes", is_video? 
"Video":"Audio", packet->rtp_header.get_ssrc(), + packet->rtp_header.get_sequence(), nn_packet); + if (do_send) { + for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) { + iovec* iov = mhdr->msg_hdr.msg_iov + j; + if (iov->iov_len <= 0) { + break; + } + srs_trace("%s #%d/%d/%d, %d bytes, size %d/%d", (use_gso? "GSO":"RAW"), j, gso_cursor + 1, + mhdr->msg_hdr.msg_iovlen, iov->iov_len, gso_size, gso_encrypt); + } + } +#endif + + if (do_send) { + sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); + socklen_t addrlen = (socklen_t)skt->peer_addrlen(); + + mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; + mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; + mhdr->msg_hdr.msg_controllen = 0; + mhdr->msg_len = 0; + +#ifndef SRS_AUTO_OSX + if (use_gso) { + mhdr->msg_hdr.msg_controllen = CMSG_SPACE(sizeof(uint16_t)); + if (!mhdr->msg_hdr.msg_control) { + mhdr->msg_hdr.msg_control = new char[mhdr->msg_hdr.msg_controllen]; + } + + cmsghdr* cm = CMSG_FIRSTHDR(&mhdr->msg_hdr); + cm->cmsg_level = SOL_UDP; + cm->cmsg_type = UDP_SEGMENT; + cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); + *((uint16_t*)CMSG_DATA(cm)) = gso_encrypt; + + // Private message, use it to store the cursor. + mhdr->msg_len = gso_cursor + 1; + } +#endif + + // When we send out a packet, we commit a RTP packet. + packets.nn_rtp_pkts++; + + if ((err = sender->sendmmsg(mhdr)) != srs_success) { + return srs_error_wrap(err, "send msghdr"); + } + + // Reset the GSO flag. 
+ gso_mhdr = NULL; gso_size = 0; gso_encrypt = 0; gso_cursor = 0; + use_gso = gso_final = false; + } + } + +#if defined(SRS_DEBUG) + srs_trace("RTC PLAY summary, rtp %d/%d, videos %d/%d, audios %d/%d", packets.packets.size(), + packets.nn_rtp_pkts, packets.nn_videos, packets.nn_samples, packets.nn_audios, packets.nn_extras); +#endif + + return err; +} + +srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets) +{ + srs_error_t err = srs_success; + + SrsRtpRawNALUs* raw = new SrsRtpRawNALUs(); + + for (int i = 0; i < msg->nn_samples(); i++) { + SrsSample* sample = msg->samples() + i; + + // We always ignore bframe here, if config to discard bframe, + // the bframe flag will not be set. + if (sample->bframe) { + continue; + } + + raw->push_back(sample->copy()); + } + + // Ignore empty. + int nn_bytes = raw->nb_bytes(); + if (nn_bytes <= 0) { + srs_freep(raw); + return err; + } + + const int kRtpMaxPayloadSize = 1200; + if (nn_bytes < kRtpMaxPayloadSize) { + // Package NALUs in a single RTP packet. + SrsRtpPacket2* packet = new SrsRtpPacket2(); + packet->rtp_header.set_timestamp(msg->timestamp * 90); + packet->rtp_header.set_sequence(video_sequence++); + packet->rtp_header.set_ssrc(video_ssrc); + packet->rtp_header.set_payload_type(video_payload_type); + packet->payload = raw; + packets.packets.push_back(packet); } else { - SrsBuffer stream(buf, length); - if ((err = pkt->encode(&stream)) != srs_success) { - return srs_error_wrap(err, "encode packet"); + SrsAutoFree(SrsRtpRawNALUs, raw); + + // Package NALUs in FU-A RTP packets. + int fu_payload_size = kRtpMaxPayloadSize; + + // The first byte is store in FU-A header. 
+ uint8_t header = raw->skip_first_byte(); + uint8_t nal_type = header & kNalTypeMask; + int nb_left = nn_bytes - 1; + + int num_of_packet = 1 + (nn_bytes - 1) / fu_payload_size; + for (int i = 0; i < num_of_packet; ++i) { + int packet_size = srs_min(nb_left, fu_payload_size); + + SrsRtpPacket2* packet = new SrsRtpPacket2(); + packets.packets.push_back(packet); + + packet->rtp_header.set_timestamp(msg->timestamp * 90); + packet->rtp_header.set_sequence(video_sequence++); + packet->rtp_header.set_ssrc(video_ssrc); + packet->rtp_header.set_payload_type(video_payload_type); + + SrsRtpFUAPayload* fua = new SrsRtpFUAPayload(); + packet->payload = fua; + + fua->nri = (SrsAvcNaluType)header; + fua->nalu_type = (SrsAvcNaluType)nal_type; + fua->start = bool(i == 0); + fua->end = bool(i == num_of_packet - 1); + + if ((err = raw->read_samples(fua->nalus, packet_size)) != srs_success) { + return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes); + } + + nb_left -= packet_size; } - length = stream.pos(); } - sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); - socklen_t addrlen = (socklen_t)skt->peer_addrlen(); - - mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; - mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; - mhdr->msg_hdr.msg_iov->iov_len = length; - mhdr->msg_len = 0; - - if ((err = sender->sendmmsg(mhdr)) != srs_success) { - return srs_error_wrap(err, "send msghdr"); + if (!packets.packets.empty()) { + packets.packets.back()->rtp_header.set_marker(true); } + return err; } @@ -757,7 +1133,7 @@ srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtpPacket2** p return err; } -srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, vector& packets) +srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets) { srs_error_t err = srs_success; @@ -771,7 +1147,7 @@ srs_error_t 
SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* int packet_size = srs_min(nb_left, fu_payload_size); SrsRtpPacket2* packet = new SrsRtpPacket2(); - packets.push_back(packet); + packets.packets.push_back(packet); packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_sequence(video_sequence++); @@ -786,10 +1162,10 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* fua->start = bool(i == 0); fua->end = bool(i == num_of_packet - 1); - SrsSample* sample = new SrsSample(); - sample->bytes = p; - sample->size = packet_size; - fua->nalus.push_back(sample); + SrsSample* fragment_sample = new SrsSample(); + fragment_sample->bytes = p; + fragment_sample->size = packet_size; + fua->nalus.push_back(fragment_sample); p += packet_size; nb_left -= packet_size; @@ -809,11 +1185,13 @@ srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* msg, Srs packet->rtp_header.set_ssrc(video_ssrc); packet->rtp_header.set_payload_type(video_payload_type); - SrsRtpRawPayload* raw = new SrsRtpRawPayload(); + SrsRtpRawNALUs* raw = new SrsRtpRawNALUs(); packet->payload = raw; - raw->payload = sample->bytes; - raw->nn_payload = sample->size; + SrsSample* p = new SrsSample(); + p->bytes = sample->bytes; + p->size = sample->size; + raw->push_back(p); *ppacket = packet; @@ -1372,7 +1750,8 @@ srs_error_t SrsUdpMuxSender::initialize(srs_netfd_t fd) } max_sendmmsg = _srs_config->get_rtc_server_sendmmsg(); - srs_trace("UDP sender #%d init ok, max_sendmmsg=%d", srs_netfd_fileno(fd), max_sendmmsg); + bool gso = _srs_config->get_rtc_server_gso(); + srs_trace("UDP sender #%d init ok, max_sendmmsg=%d, gso=%d", srs_netfd_fileno(fd), max_sendmmsg, gso); return err; } @@ -1382,6 +1761,11 @@ void SrsUdpMuxSender::free_mhdrs(std::vector& mhdrs) for (int i = 0; i < (int)mhdrs.size(); i++) { mmsghdr* hdr = &mhdrs[i]; + // Free control for GSO. 
+ char* msg_control = (char*)hdr->msg_hdr.msg_control; + srs_freep(msg_control); + + // Free iovec. for (int j = (int)hdr->msg_hdr.msg_iovlen - 1; j >= 0 ; j--) { iovec* iov = hdr->msg_hdr.msg_iov + j; char* data = (char*)iov->iov_base; @@ -1389,6 +1773,7 @@ void SrsUdpMuxSender::free_mhdrs(std::vector& mhdrs) srs_freep(iov); } } + mhdrs.clear(); } srs_error_t SrsUdpMuxSender::fetch(mmsghdr** pphdr) @@ -1425,11 +1810,9 @@ srs_error_t SrsUdpMuxSender::cycle() { srs_error_t err = srs_success; - uint64_t nn_msgs = 0; - uint64_t nn_msgs_last = 0; - int nn_msgs_max = 0; - int nn_loop = 0; - int nn_wait = 0; + uint64_t nn_msgs = 0; uint64_t nn_msgs_last = 0; int nn_msgs_max = 0; + uint64_t nn_gso_msgs = 0; uint64_t nn_gso_iovs = 0; int nn_gso_msgs_max = 0; int nn_gso_iovs_max = 0; + int nn_loop = 0; int nn_wait = 0; srs_utime_t time_last = srs_get_system_time(); SrsStatistic* stat = SrsStatistic::instance(); @@ -1444,7 +1827,9 @@ srs_error_t SrsUdpMuxSender::cycle() nn_loop++; int pos = cache_pos; - if (pos <= 0) { + int gso_pos = 0; + int gso_iovs = 0; + if (pos <= 0 && gso_pos == 0) { waiting_msgs = true; nn_wait++; srs_cond_wait(cond); @@ -1455,22 +1840,45 @@ srs_error_t SrsUdpMuxSender::cycle() cache.swap(hotspot); cache_pos = 0; - mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos; - for (; p < end; p += max_sendmmsg) { - int vlen = (int)(end - p); - vlen = srs_min(max_sendmmsg, vlen); + // Collect informations for GSO + if (pos > 0) { + // For shared GSO cache, stat the messages. + mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos; + for (p = &hotspot[0]; p < end; p++) { + if (!p->msg_len) { + continue; + } - int r0 = srs_sendmmsg(lfd, p, (unsigned int)vlen, 0, SRS_UTIME_NO_TIMEOUT); - if (r0 != vlen) { - srs_warn("sendmsg %d msgs, %d done", vlen, r0); + // Private message, use it to store the cursor. 
+ int real_iovs = p->msg_len; + p->msg_len = 0; + + gso_pos++; nn_gso_msgs++; nn_gso_iovs += real_iovs; gso_iovs += real_iovs; } + } - stat->perf_mw_on_packets(vlen); + // Send out all messages, may GSO if shared cache. + if (pos > 0) { + // Send out all messages. + mmsghdr* p = &hotspot[0]; mmsghdr* end = p + pos; + for (p = &hotspot[0]; p < end; p += max_sendmmsg) { + int vlen = (int)(end - p); + vlen = srs_min(max_sendmmsg, vlen); + + int r0 = srs_sendmmsg(lfd, p, (unsigned int)vlen, 0, SRS_UTIME_NO_TIMEOUT); + if (r0 != vlen) { + srs_warn("sendmmsg %d msgs, %d done", vlen, r0); + } + + stat->perf_sendmmsg_on_packets(vlen); + } } // Increase total messages. nn_msgs += pos; nn_msgs_max = srs_max(pos, nn_msgs_max); + nn_gso_msgs_max = srs_max(gso_pos, nn_gso_msgs_max); + nn_gso_iovs_max = srs_max(gso_iovs, nn_gso_iovs_max); pprint->elapse(); if (pprint->can_print()) { @@ -1492,11 +1900,12 @@ srs_error_t SrsUdpMuxSender::cycle() pps_unit = "(k)"; pps_last /= 1000; pps_average /= 1000; } - srs_trace("-> RTC #%d SEND %d/%d/%" PRId64 ", pps %d/%d%s, schedule %d/%d, sessions %d, cache %d/%d by sendmmsg %d", - srs_netfd_fileno(lfd), pos, nn_msgs_max, nn_msgs, pps_average, pps_last, pps_unit.c_str(), nn_loop, nn_wait, - (int)server->nn_sessions(), (int)cache.size(), (int)hotspot.size(), max_sendmmsg); + srs_trace("-> RTC SEND #%d, sessions %d, udp %d/%d/%" PRId64 ", gso %d/%d/%" PRId64 ", iovs %d/%d/%" PRId64 ", pps %d/%d%s", + srs_netfd_fileno(lfd), (int)server->nn_sessions(), pos, nn_msgs_max, nn_msgs, gso_pos, nn_gso_msgs_max, nn_gso_msgs, gso_iovs, nn_gso_iovs_max, nn_gso_iovs, + pps_average, pps_last, pps_unit.c_str()); nn_msgs_last = nn_msgs; time_last = srs_get_system_time(); nn_loop = nn_wait = nn_msgs_max = 0; + nn_gso_msgs_max = 0; nn_gso_iovs_max = 0; } } @@ -1505,10 +1914,12 @@ srs_error_t SrsUdpMuxSender::cycle() srs_error_t SrsUdpMuxSender::on_reload_rtc_server() { - int v = _srs_config->get_rtc_server_sendmmsg(); - if (max_sendmmsg != v) { - max_sendmmsg = 
v; - srs_trace("Reload max_sendmmsg=%d", max_sendmmsg); + if (true) { + int v = _srs_config->get_rtc_server_sendmmsg(); + if (max_sendmmsg != v) { + srs_trace("Reload max_sendmmsg %d=>%d", max_sendmmsg, v); + max_sendmmsg = v; + } } return srs_success; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 3fef015c6..2ffe75441 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -105,7 +105,7 @@ public: srs_error_t on_dtls_application_data(const char* data, const int len); public: srs_error_t protect_rtp(char* protected_buf, const char* ori_buf, int& nb_protected_buf); - srs_error_t protect_rtp2(char* buf, int* pnn_buf, SrsRtpPacket2* pkt); + srs_error_t protect_rtp2(void* rtp_hdr, int* len_ptr); srs_error_t unprotect_rtp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf); srs_error_t protect_rtcp(char* protected_buf, const char* ori_buf, int& nb_protected_buf); srs_error_t unprotect_rtcp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf); @@ -117,7 +117,35 @@ private: srs_error_t srtp_recv_init(); }; -class SrsRtcSenderThread : public ISrsCoroutineHandler +// A group of RTP packets. +class SrsRtcPackets +{ +public: + bool use_gso; + bool should_merge_nalus; +public: + // The total bytes of RTP packets. + int nn_bytes; + // The RTP packets send out by sendmmsg or sendmsg. Note that if many packets group to + // one msghdr by GSO, it's only one RTP packet, because we only send once. + int nn_rtp_pkts; + // For video, the samples or NALUs. + int nn_samples; + // For audio, the generated extra audio packets. + // For example, when transcoding AAC to opus, may many extra payloads for a audio. + int nn_extras; + // The original audio messages. + int nn_audios; + // The original video messages. 
+ int nn_videos; +public: + std::vector packets; +public: + SrsRtcPackets(bool gso, bool merge_nalus); + virtual ~SrsRtcPackets(); +}; + +class SrsRtcSenderThread : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler { protected: SrsCoroutine* trd; @@ -136,11 +164,16 @@ private: uint16_t video_sequence; public: SrsUdpMuxSocket* sendonly_ukt; + bool merge_nalus; + bool gso; public: SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid); virtual ~SrsRtcSenderThread(); public: srs_error_t initialize(const uint32_t& vssrc, const uint32_t& assrc, const uint16_t& v_pt, const uint16_t& a_pt); +// interface ISrsReloadHandler +public: + virtual srs_error_t on_reload_rtc_server(); public: virtual int cid(); public: @@ -152,12 +185,15 @@ public: public: virtual srs_error_t cycle(); private: - srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* skt, int* pnn, int* pnn_rtp_pkts); - srs_error_t send_packet(SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt); + srs_error_t send_messages(SrsUdpMuxSocket* skt, SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); + srs_error_t messages_to_packets(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsRtcPackets& packets); + srs_error_t send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); + srs_error_t send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); private: srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket); private: - srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector& packets); + srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets); + srs_error_t packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets); srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket); srs_error_t packet_stap_a(SrsSource* source, 
SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket); }; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index d312d4ab2..d06d79a42 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -732,7 +732,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr // @see https://github.com/ossrs/srs/issues/257 if (realtime) { // for realtime, min required msgs is 0, send when got one+ msgs. - consumer->wait(0, mw_sleep); + consumer->wait(SRS_PERF_MW_MIN_MSGS_REALTIME, mw_sleep); } else { // for no-realtime, got some msgs then send. consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep); diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index be3874ff7..01abcd446 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -236,6 +236,8 @@ srs_error_t SrsStatisticClient::dumps(SrsJsonObject* obj) SrsStatisticCategory::SrsStatisticCategory() { + nn = 0; + a = 0; b = 0; c = 0; @@ -265,8 +267,10 @@ SrsStatistic::SrsStatistic() perf_iovs = new SrsStatisticCategory(); perf_msgs = new SrsStatisticCategory(); - perf_sys = new SrsStatisticCategory(); perf_sendmmsg = new SrsStatisticCategory(); + perf_gso = new SrsStatisticCategory(); + perf_rtp = new SrsStatisticCategory(); + perf_rtc = new SrsStatisticCategory(); } SrsStatistic::~SrsStatistic() @@ -303,8 +307,10 @@ SrsStatistic::~SrsStatistic() srs_freep(perf_iovs); srs_freep(perf_msgs); - srs_freep(perf_sys); srs_freep(perf_sendmmsg); + srs_freep(perf_gso); + srs_freep(perf_rtp); + srs_freep(perf_rtc); } SrsStatistic* SrsStatistic::instance() @@ -585,225 +591,135 @@ srs_error_t SrsStatistic::dumps_clients(SrsJsonArray* arr, int start, int count) return err; } -void SrsStatistic::perf_mw_on_msgs(int nb_msgs, int bytes_msgs, int nb_iovs) +void SrsStatistic::perf_on_msgs(int nb_msgs) { - // For perf msgs, the nb_msgs stat. 
- // a: =1 - // b: <10 - // c: <100 - // d: <200 - // e: <300 - // f: <400 - // g: <500 - // h: <600 - // i: <1000 - // j: >=1000 - if (nb_msgs == 1) { - perf_msgs->a++; - } else if (nb_msgs < 10) { - perf_msgs->b++; - } else if (nb_msgs < 100) { - perf_msgs->c++; - } else if (nb_msgs < 200) { - perf_msgs->d++; - } else if (nb_msgs < 300) { - perf_msgs->e++; - } else if (nb_msgs < 400) { - perf_msgs->f++; - } else if (nb_msgs < 500) { - perf_msgs->g++; - } else if (nb_msgs < 600) { - perf_msgs->h++; - } else if (nb_msgs < 1000) { - perf_msgs->i++; - } else { - perf_msgs->j++; - } - - // For perf iovs, the nb_iovs stat. - // a: <=2 - // b: <10 - // c: <20 - // d: <200 - // e: <300 - // f: <500 - // g: <700 - // h: <900 - // i: <1024 - // j: >=1024 - if (nb_iovs <= 2) { - perf_iovs->a++; - } else if (nb_iovs < 10) { - perf_iovs->b++; - } else if (nb_iovs < 20) { - perf_iovs->c++; - } else if (nb_iovs < 200) { - perf_iovs->d++; - } else if (nb_iovs < 300) { - perf_iovs->e++; - } else if (nb_iovs < 500) { - perf_iovs->f++; - } else if (nb_iovs < 700) { - perf_iovs->g++; - } else if (nb_iovs < 900) { - perf_iovs->h++; - } else if (nb_iovs < 1024) { - perf_iovs->i++; - } else { - perf_iovs->j++; - } - - // Stat the syscalls. - // a: number of syscalls of msgs. - perf_sys->a++; + perf_on_packets(perf_msgs, nb_msgs); } -void SrsStatistic::perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs) +srs_error_t SrsStatistic::dumps_perf_msgs(SrsJsonObject* obj) { - // Stat the syscalls. - // a: number of syscalls of msgs. - // b: number of syscalls of pkts. - perf_sys->b++; + return dumps_perf(perf_msgs, obj); } -srs_error_t SrsStatistic::dumps_perf_writev(SrsJsonObject* obj) +void SrsStatistic::perf_on_rtc_packets(int nb_packets) { - srs_error_t err = srs_success; - - if (true) { - SrsJsonObject* p = SrsJsonAny::object(); - obj->set("msgs", p); - - // For perf msgs, the nb_msgs stat. 
- // a: =1 - // b: <10 - // c: <100 - // d: <200 - // e: <300 - // f: <400 - // g: <500 - // h: <600 - // i: <1000 - // j: >=1000 - p->set("lt_2", SrsJsonAny::integer(perf_msgs->a)); - p->set("lt_10", SrsJsonAny::integer(perf_msgs->b)); - p->set("lt_100", SrsJsonAny::integer(perf_msgs->c)); - p->set("lt_200", SrsJsonAny::integer(perf_msgs->d)); - p->set("lt_300", SrsJsonAny::integer(perf_msgs->e)); - p->set("lt_400", SrsJsonAny::integer(perf_msgs->f)); - p->set("lt_500", SrsJsonAny::integer(perf_msgs->g)); - p->set("lt_600", SrsJsonAny::integer(perf_msgs->h)); - p->set("lt_1000", SrsJsonAny::integer(perf_msgs->i)); - p->set("gt_1000", SrsJsonAny::integer(perf_msgs->j)); - } - - if (true) { - SrsJsonObject* p = SrsJsonAny::object(); - obj->set("iovs", p); - - // For perf iovs, the nb_iovs stat. - // a: <=2 - // b: <10 - // c: <20 - // d: <200 - // e: <300 - // f: <500 - // g: <700 - // h: <900 - // i: <1024 - // j: >=1024 - p->set("lt_3", SrsJsonAny::integer(perf_iovs->a)); - p->set("lt_10", SrsJsonAny::integer(perf_iovs->b)); - p->set("lt_20", SrsJsonAny::integer(perf_iovs->c)); - p->set("lt_200", SrsJsonAny::integer(perf_iovs->d)); - p->set("lt_300", SrsJsonAny::integer(perf_iovs->e)); - p->set("lt_500", SrsJsonAny::integer(perf_iovs->f)); - p->set("lt_700", SrsJsonAny::integer(perf_iovs->g)); - p->set("lt_900", SrsJsonAny::integer(perf_iovs->h)); - p->set("lt_1024", SrsJsonAny::integer(perf_iovs->i)); - p->set("gt_1024", SrsJsonAny::integer(perf_iovs->j)); - } - - if (true) { - SrsJsonObject* p = SrsJsonAny::object(); - obj->set("sys", p); - - // Stat the syscalls. - // a: number of syscalls of msgs. - // b: number of syscalls of pkts. - p->set("msgs", SrsJsonAny::integer(perf_sys->a)); - p->set("pkts", SrsJsonAny::integer(perf_sys->b)); - } - - return err; + perf_on_packets(perf_rtc, nb_packets); } -void SrsStatistic::perf_mw_on_packets(int nb_msgs) +srs_error_t SrsStatistic::dumps_perf_rtc_packets(SrsJsonObject* obj) { - // For perf msgs, the nb_msgs stat. 
- // a: =1 - // b: <10 - // c: <100 - // d: <200 - // e: <300 - // f: <400 - // g: <500 - // h: <600 - // i: <1000 - // j: >=1000 - if (nb_msgs == 1) { - perf_sendmmsg->a++; - } else if (nb_msgs < 10) { - perf_sendmmsg->b++; - } else if (nb_msgs < 100) { - perf_sendmmsg->c++; - } else if (nb_msgs < 200) { - perf_sendmmsg->d++; - } else if (nb_msgs < 300) { - perf_sendmmsg->e++; - } else if (nb_msgs < 400) { - perf_sendmmsg->f++; - } else if (nb_msgs < 500) { - perf_sendmmsg->g++; - } else if (nb_msgs < 600) { - perf_sendmmsg->h++; - } else if (nb_msgs < 1000) { - perf_sendmmsg->i++; - } else { - perf_sendmmsg->j++; - } + return dumps_perf(perf_rtc, obj); +} + +void SrsStatistic::perf_on_rtp_packets(int nb_packets) +{ + perf_on_packets(perf_rtp, nb_packets); +} + +srs_error_t SrsStatistic::dumps_perf_rtp_packets(SrsJsonObject* obj) +{ + return dumps_perf(perf_rtp, obj); +} + +void SrsStatistic::perf_on_gso_packets(int nb_packets) +{ + perf_on_packets(perf_gso, nb_packets); +} + +srs_error_t SrsStatistic::dumps_perf_gso(SrsJsonObject* obj) +{ + return dumps_perf(perf_gso, obj); +} + +void SrsStatistic::perf_on_writev_iovs(int nb_iovs) +{ + perf_on_packets(perf_iovs, nb_iovs); +} + +srs_error_t SrsStatistic::dumps_perf_writev_iovs(SrsJsonObject* obj) +{ + return dumps_perf(perf_iovs, obj); +} + +void SrsStatistic::perf_sendmmsg_on_packets(int nb_packets) +{ + perf_on_packets(perf_sendmmsg, nb_packets); } srs_error_t SrsStatistic::dumps_perf_sendmmsg(SrsJsonObject* obj) +{ + return dumps_perf(perf_sendmmsg, obj); +} + +void SrsStatistic::perf_on_packets(SrsStatisticCategory* p, int nb_msgs) +{ + // The range for stat: + // 2, 3, 5, 9, 16, 32, 64, 128, 256 + // that is: + // a: <2 + // b: <3 + // c: <5 + // d: <9 + // e: <16 + // f: <32 + // g: <64 + // h: <128 + // i: <256 + // j: >=256 + if (nb_msgs < 2) { + p->a++; + } else if (nb_msgs < 3) { + p->b++; + } else if (nb_msgs < 5) { + p->c++; + } else if (nb_msgs < 9) { + p->d++; + } else if (nb_msgs < 16) { + p->e++; + 
} else if (nb_msgs < 32) { + p->f++; + } else if (nb_msgs < 64) { + p->g++; + } else if (nb_msgs < 128) { + p->h++; + } else if (nb_msgs < 256) { + p->i++; + } else { + p->j++; + } + + p->nn += nb_msgs; +} + +srs_error_t SrsStatistic::dumps_perf(SrsStatisticCategory* p, SrsJsonObject* obj) { srs_error_t err = srs_success; - if (true) { - SrsJsonObject* p = SrsJsonAny::object(); - obj->set("msgs", p); + // The range for stat: + // 2, 3, 5, 9, 16, 32, 64, 128, 256 + // that is: + // a: <2 + // b: <3 + // c: <5 + // d: <9 + // e: <16 + // f: <32 + // g: <64 + // h: <128 + // i: <256 + // j: >=256 + if (p->a) obj->set("lt_2", SrsJsonAny::integer(p->a)); + if (p->b) obj->set("lt_3", SrsJsonAny::integer(p->b)); + if (p->c) obj->set("lt_5", SrsJsonAny::integer(p->c)); + if (p->d) obj->set("lt_9", SrsJsonAny::integer(p->d)); + if (p->e) obj->set("lt_16", SrsJsonAny::integer(p->e)); + if (p->f) obj->set("lt_32", SrsJsonAny::integer(p->f)); + if (p->g) obj->set("lt_64", SrsJsonAny::integer(p->g)); + if (p->h) obj->set("lt_128", SrsJsonAny::integer(p->h)); + if (p->i) obj->set("lt_256", SrsJsonAny::integer(p->i)); + if (p->j) obj->set("gt_256", SrsJsonAny::integer(p->j)); - // For perf msgs, the nb_msgs stat. 
- // a: =1 - // b: <10 - // c: <100 - // d: <200 - // e: <300 - // f: <400 - // g: <500 - // h: <600 - // i: <1000 - // j: >=1000 - p->set("lt_2", SrsJsonAny::integer(perf_sendmmsg->a)); - p->set("lt_10", SrsJsonAny::integer(perf_sendmmsg->b)); - p->set("lt_100", SrsJsonAny::integer(perf_sendmmsg->c)); - p->set("lt_200", SrsJsonAny::integer(perf_sendmmsg->d)); - p->set("lt_300", SrsJsonAny::integer(perf_sendmmsg->e)); - p->set("lt_400", SrsJsonAny::integer(perf_sendmmsg->f)); - p->set("lt_500", SrsJsonAny::integer(perf_sendmmsg->g)); - p->set("lt_600", SrsJsonAny::integer(perf_sendmmsg->h)); - p->set("lt_1000", SrsJsonAny::integer(perf_sendmmsg->i)); - p->set("gt_1000", SrsJsonAny::integer(perf_sendmmsg->j)); - } + obj->set("nn", SrsJsonAny::integer(p->nn)); return err; } diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index 339e95b26..0d9905f75 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -124,6 +124,8 @@ public: class SrsStatisticCategory { +public: + uint64_t nn; public: uint64_t a; uint64_t b; @@ -168,8 +170,10 @@ private: // The perf stat for mw(merged write). SrsStatisticCategory* perf_iovs; SrsStatisticCategory* perf_msgs; - SrsStatisticCategory* perf_sys; SrsStatisticCategory* perf_sendmmsg; + SrsStatisticCategory* perf_gso; + SrsStatisticCategory* perf_rtp; + SrsStatisticCategory* perf_rtc; private: SrsStatistic(); virtual ~SrsStatistic(); @@ -228,19 +232,39 @@ public: // @param count the max count of clients to dump. virtual srs_error_t dumps_clients(SrsJsonArray* arr, int start, int count); public: - // Stat for packets merged written, nb_msgs is the number of RTMP messages, - // bytes_msgs is the total bytes of RTMP messages, nb_iovs is the total number of iovec. 
- virtual void perf_mw_on_msgs(int nb_msgs, int bytes_msgs, int nb_iovs); - // Stat for packets merged written, nb_pkts is the number of or chunk packets, - // bytes_pkts is the total bytes of or chunk packets, nb_iovs is the total number of iovec. - virtual void perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs); - // Dumps the perf statistic data for TCP writev, for performance analysis. - virtual srs_error_t dumps_perf_writev(SrsJsonObject* obj); + // Stat for packets merged written, nb_msgs is the number of RTMP messages. + // For example, publish by FFMPEG, Audio and Video frames. + virtual void perf_on_msgs(int nb_msgs); + virtual srs_error_t dumps_perf_msgs(SrsJsonObject* obj); public: - // Stat for packets UDP sendmmsg, nb_msgs is the vlen for sendmmsg. - virtual void perf_mw_on_packets(int nb_msgs); + // Stat for packets merged written, nb_packets is the number of RTC packets. + // For example, a RTMP/AAC audio packet maybe transcoded to two RTC/opus packets. + virtual void perf_on_rtc_packets(int nb_packets); + virtual srs_error_t dumps_perf_rtc_packets(SrsJsonObject* obj); +public: + // Stat for packets merged written, nb_packets is the number of RTP packets. + // For example, a RTC/opus packet maybe package to three RTP packets. + virtual void perf_on_rtp_packets(int nb_packets); + // Dumps the perf statistic data for RTP packets, for performance analysis. + virtual srs_error_t dumps_perf_rtp_packets(SrsJsonObject* obj); +public: + // Stat for packets UDP GSO, nb_packets is the merged RTP packets. + // For example, three RTP/audio packets maybe GSO to one msghdr. + virtual void perf_on_gso_packets(int nb_packets); + // Dumps the perf statistic data for UDP GSO, for performance analysis. + virtual srs_error_t dumps_perf_gso(SrsJsonObject* obj); +public: + // Stat for TCP writev, nb_iovs is the total number of iovec. 
+ virtual void perf_on_writev_iovs(int nb_iovs); + virtual srs_error_t dumps_perf_writev_iovs(SrsJsonObject* obj); +public: + // Stat for packets UDP sendmmsg, nb_packets is the vlen for sendmmsg. + virtual void perf_sendmmsg_on_packets(int nb_packets); // Dumps the perf statistic data for UDP sendmmsg, for performance analysis. virtual srs_error_t dumps_perf_sendmmsg(SrsJsonObject* obj); +private: + virtual void perf_on_packets(SrsStatisticCategory* p, int nb_msgs); + virtual srs_error_t dumps_perf(SrsStatisticCategory* p, SrsJsonObject* obj); private: virtual SrsStatisticVhost* create_vhost(SrsRequest* req); virtual SrsStatisticStream* create_stream(SrsStatisticVhost* vhost, SrsRequest* req); diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index c846214b2..725c1bff4 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -33,6 +33,11 @@ // The macros generated by configure script. #include +// Alias for debug. +#ifdef SRS_AUTO_DEBUG +#define SRS_DEBUG +#endif + // To convert macro values to string. // @see https://gcc.gnu.org/onlinedocs/cpp/Stringification.html#Stringification #define SRS_INTERNAL_STR(v) #v diff --git a/trunk/src/core/srs_core_performance.hpp b/trunk/src/core/srs_core_performance.hpp index 07c1e1539..7cafa055e 100644 --- a/trunk/src/core/srs_core_performance.hpp +++ b/trunk/src/core/srs_core_performance.hpp @@ -127,8 +127,12 @@ */ #define SRS_PERF_QUEUE_COND_WAIT #ifdef SRS_PERF_QUEUE_COND_WAIT + // For RTMP, use larger wait queue. #define SRS_PERF_MW_MIN_MSGS 8 - #define SRS_PERF_MW_MIN_MSGS_FOR_RTC 4 + #define SRS_PERF_MW_MIN_MSGS_REALTIME 0 + // For RTC, use smaller wait queue. 
+ #define SRS_PERF_MW_MIN_MSGS_FOR_RTC 2 + #define SRS_PERF_MW_MIN_MSGS_FOR_RTC_REALTIME 0 #endif /** * the default value of vhost for diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index 34fd22900..e47dfdc68 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -24,6 +24,6 @@ #ifndef SRS_CORE_VERSION4_HPP #define SRS_CORE_VERSION4_HPP -#define SRS_VERSION4_REVISION 22 +#define SRS_VERSION4_REVISION 23 #endif diff --git a/trunk/src/kernel/srs_kernel_codec.cpp b/trunk/src/kernel/srs_kernel_codec.cpp index a351dcc52..e0789c236 100644 --- a/trunk/src/kernel/srs_kernel_codec.cpp +++ b/trunk/src/kernel/srs_kernel_codec.cpp @@ -409,6 +409,14 @@ srs_error_t SrsSample::parse_bframe() return err; } +SrsSample* SrsSample::copy() +{ + SrsSample* p = new SrsSample(); + p->bytes = bytes; + p->size = size; + return p; +} + SrsCodecConfig::SrsCodecConfig() { } diff --git a/trunk/src/kernel/srs_kernel_codec.hpp b/trunk/src/kernel/srs_kernel_codec.hpp index 49447b5de..9c74e1387 100644 --- a/trunk/src/kernel/srs_kernel_codec.hpp +++ b/trunk/src/kernel/srs_kernel_codec.hpp @@ -542,6 +542,8 @@ public: public: // If we need to know whether sample is bframe, we have to parse the NALU payload. srs_error_t parse_bframe(); + // Copy sample, share the bytes pointer. 
+ SrsSample* copy(); }; /** diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp index 00e289feb..28b2d938b 100644 --- a/trunk/src/kernel/srs_kernel_flv.cpp +++ b/trunk/src/kernel/srs_kernel_flv.cpp @@ -383,7 +383,7 @@ void SrsSharedPtrMessage::set_extra_payloads(SrsSample* payloads, int nn_payload ptr->nn_extra_payloads = nn_payloads; ptr->extra_payloads = new SrsSample[nn_payloads]; - memcpy(ptr->extra_payloads, payloads, nn_payloads * sizeof(SrsSample)); + memcpy((void*)ptr->extra_payloads, payloads, nn_payloads * sizeof(SrsSample)); } void SrsSharedPtrMessage::set_samples(SrsSample* samples, int nn_samples) @@ -394,7 +394,7 @@ void SrsSharedPtrMessage::set_samples(SrsSample* samples, int nn_samples) ptr->nn_samples = nn_samples; ptr->samples = new SrsSample[nn_samples]; - memcpy(ptr->samples, samples, nn_samples * sizeof(SrsSample)); + memcpy((void*)ptr->samples, samples, nn_samples * sizeof(SrsSample)); } #endif diff --git a/trunk/src/kernel/srs_kernel_rtp.cpp b/trunk/src/kernel/srs_kernel_rtp.cpp index d00140005..ae044d4a2 100644 --- a/trunk/src/kernel/srs_kernel_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtp.cpp @@ -229,6 +229,112 @@ srs_error_t SrsRtpRawPayload::encode(SrsBuffer* buf) return srs_success; } +SrsRtpRawNALUs::SrsRtpRawNALUs() +{ + cursor = 0; + nn_bytes = 0; +} + +SrsRtpRawNALUs::~SrsRtpRawNALUs() +{ + vector::iterator it; + for (it = nalus.begin(); it != nalus.end(); ++it) { + SrsSample* p = *it; + srs_freep(p); + } + nalus.clear(); +} + +void SrsRtpRawNALUs::push_back(SrsSample* sample) +{ + if (sample->size <= 0) { + return; + } + + if (!nalus.empty()) { + SrsSample* p = new SrsSample(); + p->bytes = (char*)"\0\0\1"; + p->size = 3; + nn_bytes += 3; + nalus.push_back(p); + } + + nn_bytes += sample->size; + nalus.push_back(sample); +} + +uint8_t SrsRtpRawNALUs::skip_first_byte() +{ + srs_assert (cursor >= 0 && nn_bytes > 0 && cursor < nn_bytes); + cursor++; + return uint8_t(nalus[0]->bytes[0]); +} + 
+srs_error_t SrsRtpRawNALUs::read_samples(vector& samples, int size) +{ + if (cursor + size < 0 || cursor + size > nn_bytes) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "cursor=%d, max=%d, size=%d", cursor, nn_bytes, size); + } + + int pos = cursor; + cursor += size; + int left = size; + + vector::iterator it; + for (it = nalus.begin(); it != nalus.end() && left > 0; ++it) { + SrsSample* p = *it; + + // Ignore previous consumed samples. + if (pos && pos - p->size >= 0) { + pos -= p->size; + continue; + } + + // Now, we are working at the sample. + int nn = srs_min(left, p->size - pos); + srs_assert(nn > 0); + + SrsSample* sample = new SrsSample(); + sample->bytes = p->bytes + pos; + sample->size = nn; + samples.push_back(sample); + + left -= nn; + pos = 0; + } + + return srs_success; +} + +int SrsRtpRawNALUs::nb_bytes() +{ + int size = 0; + + vector::iterator it; + for (it = nalus.begin(); it != nalus.end(); ++it) { + SrsSample* p = *it; + size += p->size; + } + + return size; +} + +srs_error_t SrsRtpRawNALUs::encode(SrsBuffer* buf) +{ + vector::iterator it; + for (it = nalus.begin(); it != nalus.end(); ++it) { + SrsSample* p = *it; + + if (!buf->require(p->size)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", p->size); + } + + buf->write_bytes(p->bytes, p->size); + } + + return srs_success; +} + SrsRtpSTAPPayload::SrsRtpSTAPPayload() { nri = (SrsAvcNaluType)0; @@ -339,7 +445,7 @@ srs_error_t SrsRtpFUAPayload::encode(SrsBuffer* buf) for (it = nalus.begin(); it != nalus.end(); ++it) { SrsSample* p = *it; if (!buf->require(p->size)) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 2 + p->size); + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", p->size); } buf->write_bytes(p->bytes, p->size); diff --git a/trunk/src/kernel/srs_kernel_rtp.hpp b/trunk/src/kernel/srs_kernel_rtp.hpp index 975b2b463..7733b431f 100644 --- a/trunk/src/kernel/srs_kernel_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtp.hpp @@ -95,6 +95,7 @@ 
public: virtual srs_error_t encode(SrsBuffer* buf); }; +// Single payload data. class SrsRtpRawPayload : public ISrsEncoder { public: @@ -110,6 +111,28 @@ public: virtual srs_error_t encode(SrsBuffer* buf); }; +// Multiple NALUs, automatically insert 001 between NALUs. +class SrsRtpRawNALUs : public ISrsEncoder +{ +private: + std::vector nalus; + int nn_bytes; + int cursor; +public: + SrsRtpRawNALUs(); + virtual ~SrsRtpRawNALUs(); +public: + void push_back(SrsSample* sample); +public: + uint8_t skip_first_byte(); + srs_error_t read_samples(std::vector& samples, int size); +// interface ISrsEncoder +public: + virtual int nb_bytes(); + virtual srs_error_t encode(SrsBuffer* buf); +}; + +// STAP-A, for multiple NALUs. class SrsRtpSTAPPayload : public ISrsEncoder { public: @@ -127,6 +150,7 @@ public: virtual srs_error_t encode(SrsBuffer* buf); }; +// FU-A, for one NALU with multiple fragments. class SrsRtpFUAPayload : public ISrsEncoder { public: diff --git a/trunk/src/protocol/srs_rtmp_stack.cpp b/trunk/src/protocol/srs_rtmp_stack.cpp index 6c0889d3f..4b11e5ec3 100644 --- a/trunk/src/protocol/srs_rtmp_stack.cpp +++ b/trunk/src/protocol/srs_rtmp_stack.cpp @@ -548,7 +548,8 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg // Notify about perf stat. if (perf) { - perf->perf_mw_on_msgs(nb_msgs_merged_written, bytes_msgs_merged_written, iov_index); + perf->perf_on_msgs(nb_msgs_merged_written); + perf->perf_on_writev_iovs(iov_index); nb_msgs_merged_written = 0; bytes_msgs_merged_written = 0; } @@ -576,7 +577,8 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg // Notify about perf stat. 
if (perf) { - perf->perf_mw_on_msgs(nb_msgs_merged_written, bytes_msgs_merged_written, iov_index); + perf->perf_on_msgs(nb_msgs_merged_written); + perf->perf_on_writev_iovs(iov_index); nb_msgs_merged_written = 0; bytes_msgs_merged_written = 0; } @@ -627,11 +629,6 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg if ((er = skt->writev(iovs, 2, NULL)) != srs_success) { return srs_error_wrap(err, "writev"); } - - // Notify about perf stat. - if (perf) { - perf->perf_mw_on_packets(1, payload_size, 2); - } } } diff --git a/trunk/src/protocol/srs_rtmp_stack.hpp b/trunk/src/protocol/srs_rtmp_stack.hpp index 470347cac..6f311383f 100644 --- a/trunk/src/protocol/srs_rtmp_stack.hpp +++ b/trunk/src/protocol/srs_rtmp_stack.hpp @@ -155,11 +155,9 @@ public: virtual ~ISrsProtocolPerf(); public: // Stat for packets merged written, nb_msgs is the number of RTMP messages, - // bytes_msgs is the total bytes of RTMP messages, nb_iovs is the total number of iovec. - virtual void perf_mw_on_msgs(int nb_msgs, int bytes_msgs, int nb_iovs) = 0; - // Stat for packets merged written, nb_pkts is the number of or chunk packets, - // bytes_pkts is the total bytes of or chunk packets, nb_iovs is the total number of iovec. - virtual void perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs) = 0; + virtual void perf_on_msgs(int nb_msgs) = 0; + // Stat for TCP writev, nb_iovs is the total number of iovec. + virtual void perf_on_writev_iovs(int nb_iovs) = 0; }; // The protocol provides the rtmp-message-protocol services,