1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

RTC: Remove GSO for player, no premature optimization

This commit is contained in:
winlin 2020-05-21 14:06:46 +08:00
parent 5bd2812405
commit 4f6b24ea12
9 changed files with 13 additions and 375 deletions

View file

@ -33,13 +33,6 @@ using namespace std;
#include <fcntl.h>
#include <unistd.h>
#include <netinet/udp.h>
// Define macro for UDP GSO.
// @see https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/udpgso.c
#ifndef UDP_SEGMENT
#define UDP_SEGMENT 103
#endif
#include <sstream>
#include <srs_core_autofree.hpp>
@ -483,7 +476,6 @@ SrsRtcOutgoingInfo::SrsRtcOutgoingInfo()
debug_id = 0;
#endif
use_gso = false;
nn_rtp_pkts = 0;
nn_audios = nn_extras = 0;
nn_videos = nn_samples = 0;
@ -503,7 +495,6 @@ SrsRtcPlayer::SrsRtcPlayer(SrsRtcSession* s, int parent_cid)
session_ = s;
gso = false;
max_padding = 0;
audio_timestamp = 0;
@ -543,22 +534,20 @@ srs_error_t SrsRtcPlayer::initialize(const uint32_t& vssrc, const uint32_t& assr
video_payload_type = v_pt;
audio_payload_type = a_pt;
gso = _srs_config->get_rtc_server_gso();
max_padding = _srs_config->get_rtc_server_padding();
// TODO: FIXME: Support reload.
nack_enabled_ = _srs_config->get_rtc_nack_enabled(session_->req->vhost);
srs_trace("RTC publisher video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), gso=%d, padding=%d, nack=%d",
video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, gso, max_padding, nack_enabled_);
srs_trace("RTC publisher video(ssrc=%d, pt=%d), audio(ssrc=%d, pt=%d), padding=%d, nack=%d",
video_ssrc, video_payload_type, audio_ssrc, audio_payload_type, max_padding, nack_enabled_);
return err;
}
srs_error_t SrsRtcPlayer::on_reload_rtc_server()
{
gso = _srs_config->get_rtc_server_gso();
max_padding = _srs_config->get_rtc_server_padding();
srs_trace("Reload rtc_server gso=%d, max_padding=%d", gso, max_padding);
srs_trace("Reload rtc_server max_padding=%d", max_padding);
return srs_success;
}
@ -698,8 +687,6 @@ srs_error_t SrsRtcPlayer::cycle()
stat->perf_on_rtc_packets(nn_rtc_packets);
// Stat the RAW RTP packets, which maybe group by GSO.
stat->perf_on_rtp_packets(msg_count);
// Stat the RTP packets going into kernel.
stat->perf_on_gso_packets(info.nn_rtp_pkts);
// Stat the bytes and paddings.
stat->perf_on_rtc_bytes(info.nn_bytes, info.nn_rtp_bytes, info.nn_padding_bytes);
// Stat the messages and dropped count.
@ -729,18 +716,6 @@ srs_error_t SrsRtcPlayer::send_messages(SrsRtcSource* source, const vector<SrsRt
return srs_error_wrap(err, "messages to packets");
}
#ifndef SRS_OSX
// If enabled GSO, send out some packets in a msghdr.
// @remark When NACK simulator is on, we don't use GSO.
// TODO: FIXME: Support GSO.
if (info.use_gso && !nn_simulate_nack_drop) {
if ((err = send_packets_gso(pkts, info)) != srs_success) {
return srs_error_wrap(err, "gso send");
}
return err;
}
#endif
// By default, we send packets by sendmmsg.
if ((err = send_packets(pkts, info)) != srs_success) {
return srs_error_wrap(err, "raw send");
@ -890,230 +865,6 @@ srs_error_t SrsRtcPlayer::send_packets(const std::vector<SrsRtpPacket2*>& pkts,
return err;
}
// TODO: FIXME: We can gather and pad audios, because they have similar size.
srs_error_t SrsRtcPlayer::send_packets_gso(const vector<SrsRtpPacket2*>& pkts, SrsRtcOutgoingInfo& info)
{
srs_error_t err = srs_success;
// Cache the encrypt flag and sender.
bool encrypt = session_->encrypt;
ISrsUdpSender* sender = session_->sendonly_skt->sender();
// Previous handler, if has the same size, we can use GSO.
srs_mmsghdr* gso_mhdr = NULL; int gso_size = 0; int gso_encrypt = 0; int gso_cursor = 0;
// GSO, N packets has same length, the final one may not.
bool using_gso = false; bool gso_final = false;
// The message will marshal in iovec.
iovec* iov = NULL;
int nn_packets = pkts.size();
for (int i = 0; i < nn_packets; i++) {
SrsRtpPacket2* packet = pkts.at(i);
int nn_packet = packet->nb_bytes();
int padding = 0;
SrsRtpPacket2* next_packet = NULL;
int nn_next_packet = 0;
if (max_padding > 0) {
if (i < nn_packets - 1) {
next_packet = (i < nn_packets - 1)? pkts.at(i + 1):NULL;
nn_next_packet = next_packet? next_packet->nb_bytes() : 0;
}
// Padding the packet to next or GSO size.
if (next_packet) {
if (!using_gso) {
// Padding to the next packet to merge with it.
if (nn_next_packet > nn_packet) {
padding = nn_next_packet - nn_packet;
}
} else {
// Padding to GSO size for next one to merge with us.
if (nn_next_packet < gso_size) {
padding = gso_size - nn_packet;
}
}
// Reset padding if exceed max.
if (padding > max_padding) {
padding = 0;
}
if (padding > 0) {
#if defined(SRS_DEBUG)
srs_trace("#%d, Padding %d bytes %d=>%d, packets %d, max_padding %d", info.debug_id,
padding, nn_packet, nn_packet + padding, nn_packets, max_padding);
#endif
packet->add_padding(padding);
nn_packet += padding;
info.nn_paddings++;
info.nn_padding_bytes += padding;
}
}
}
// Check whether we can use GSO to send it.
if (using_gso && !gso_final) {
gso_final = (gso_size != nn_packet);
}
if (next_packet) {
// If not GSO, maybe the first fresh packet, we should see whether the next packet is smaller than this one,
// if smaller, we can still enter GSO.
if (!using_gso) {
using_gso = (nn_packet >= nn_next_packet);
}
// If GSO, but next is bigger than this one, we must enter the final state.
if (using_gso && !gso_final) {
gso_final = (nn_packet < nn_next_packet);
}
}
// For GSO, reuse mhdr if possible.
srs_mmsghdr* mhdr = gso_mhdr;
if (!mhdr) {
// Fetch a cached message from queue.
// TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
if ((err = sender->fetch(&mhdr)) != srs_success) {
return srs_error_wrap(err, "fetch msghdr");
}
// Now, GSO will use this message and size.
gso_mhdr = mhdr;
gso_size = nn_packet;
}
// For this message, select a new iovec.
if (!iov) {
iov = mhdr->msg_hdr.msg_iov;
} else {
iov++;
}
gso_cursor++;
mhdr->msg_hdr.msg_iovlen = gso_cursor;
if (gso_cursor > SRS_PERF_RTC_GSO_IOVS && !iov->iov_base) {
iov->iov_base = new char[kRtpPacketSize];
}
iov->iov_len = kRtpPacketSize;
// Marshal packet to bytes in iovec.
if (true) {
SrsBuffer stream((char*)iov->iov_base, iov->iov_len);
if ((err = packet->encode(&stream)) != srs_success) {
return srs_error_wrap(err, "encode packet");
}
iov->iov_len = stream.pos();
}
// Whether encrypt the RTP bytes.
if (encrypt) {
int nn_encrypt = (int)iov->iov_len;
if ((err = session_->dtls_->protect_rtp2(iov->iov_base, &nn_encrypt)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
}
iov->iov_len = (size_t)nn_encrypt;
}
// Put final RTP packet to NACK/ARQ queue.
if (nack_enabled_) {
SrsRtpPacket2* nack = new SrsRtpPacket2();
nack->header = packet->header;
// TODO: FIXME: Should avoid memory copying.
SrsRtpRawPayload* payload = new SrsRtpRawPayload();
nack->payload = payload;
payload->nn_payload = (int)iov->iov_len;
payload->payload = new char[payload->nn_payload];
memcpy((void*)payload->payload, iov->iov_base, iov->iov_len);
if (nack->header.get_ssrc() == video_ssrc) {
video_queue_->set(nack->header.get_sequence(), nack);
} else {
audio_queue_->set(nack->header.get_sequence(), nack);
}
}
info.nn_rtp_bytes += (int)iov->iov_len;
// If GSO, they must has same size, except the final one.
if (using_gso && !gso_final && gso_encrypt && gso_encrypt != (int)iov->iov_len) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "GSO size=%d/%d, encrypt=%d/%d", gso_size, nn_packet, gso_encrypt, iov->iov_len);
}
if (using_gso && !gso_final) {
gso_encrypt = iov->iov_len;
}
// If exceed the max GSO size, set to final.
if (using_gso && gso_cursor + 1 >= SRS_PERF_RTC_GSO_MAX) {
gso_final = true;
}
// For last message, or final gso, or determined not using GSO, send it now.
bool do_send = (i == nn_packets - 1 || gso_final || !using_gso);
#if defined(SRS_DEBUG)
bool is_video = packet->header.get_payload_type() == video_payload_type;
srs_trace("#%d, Packet %s SSRC=%d, SN=%d, %d/%d bytes", info.debug_id, is_video? "Video":"Audio",
packet->header.get_ssrc(), packet->header.get_sequence(), nn_packet - padding, padding);
if (do_send) {
for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) {
iovec* iov = mhdr->msg_hdr.msg_iov + j;
srs_trace("#%d, %s #%d/%d/%d, %d/%d bytes, size %d/%d", info.debug_id, (using_gso? "GSO":"RAW"), j,
gso_cursor + 1, mhdr->msg_hdr.msg_iovlen, iov->iov_len, padding, gso_size, gso_encrypt);
}
}
#endif
if (do_send) {
// Set the address and control information.
sockaddr_in* addr = (sockaddr_in*)session_->sendonly_skt->peer_addr();
socklen_t addrlen = (socklen_t)session_->sendonly_skt->peer_addrlen();
mhdr->msg_hdr.msg_name = (sockaddr_in*)addr;
mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen;
mhdr->msg_hdr.msg_controllen = 0;
#ifndef SRS_OSX
if (using_gso) {
mhdr->msg_hdr.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
if (!mhdr->msg_hdr.msg_control) {
mhdr->msg_hdr.msg_control = new char[mhdr->msg_hdr.msg_controllen];
}
cmsghdr* cm = CMSG_FIRSTHDR(&mhdr->msg_hdr);
cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
*((uint16_t*)CMSG_DATA(cm)) = gso_encrypt;
}
#endif
// When we send out a packet, we commit a RTP packet.
info.nn_rtp_pkts++;
if ((err = sender->sendmmsg(mhdr)) != srs_success) {
return srs_error_wrap(err, "send msghdr");
}
// Reset the GSO flag.
gso_mhdr = NULL; gso_size = 0; gso_encrypt = 0; gso_cursor = 0;
using_gso = gso_final = false; iov = NULL;
}
}
#if defined(SRS_DEBUG)
srs_trace("#%d, RTC PLAY summary, rtp %d/%d, videos %d/%d, audios %d/%d, pad %d/%d/%d", info.debug_id, pkts.size(),
info.nn_rtp_pkts, info.nn_videos, info.nn_samples, info.nn_audios, info.nn_extras, info.nn_paddings,
info.nn_padding_bytes, info.nn_rtp_bytes);
#endif
return err;
}
void SrsRtcPlayer::nack_fetch(vector<SrsRtpPacket2*>& pkts, uint32_t ssrc, uint16_t seq)
{
SrsRtpPacket2* pkt = NULL;