1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Cache RTP packets

This commit is contained in:
winlin 2020-04-15 22:46:06 +08:00
parent 7b68f55edc
commit 1064429c7e
4 changed files with 94 additions and 53 deletions

View file

@ -469,6 +469,8 @@ SrsRtcPackets::SrsRtcPackets()
nn_audios = nn_extras = 0; nn_audios = nn_extras = 0;
nn_videos = nn_samples = 0; nn_videos = nn_samples = 0;
nn_paddings = 0; nn_paddings = 0;
cursor = 0;
} }
SrsRtcPackets::~SrsRtcPackets() SrsRtcPackets::~SrsRtcPackets()
@ -481,12 +483,38 @@ void SrsRtcPackets::reset(bool gso, bool merge_nalus)
use_gso = gso; use_gso = gso;
should_merge_nalus = merge_nalus; should_merge_nalus = merge_nalus;
vector<SrsRtpPacket2*>::iterator it; for (int i = 0; i < cursor; i++) {
for (it = packets.begin(); it != packets.end(); ++it ) { SrsRtpPacket2* packet = packets[i];
SrsRtpPacket2* packet = *it; packet->reset();
srs_freep(packet);
} }
packets.clear();
cursor = 0;
}
SrsRtpPacket2* SrsRtcPackets::fetch()
{
if (cursor >= (int)packets.size()) {
packets.push_back(new SrsRtpPacket2());
}
return packets[cursor++];
}
SrsRtpPacket2* SrsRtcPackets::back()
{
srs_assert(cursor > 0);
return packets[cursor - 1];
}
int SrsRtcPackets::size()
{
return cursor;
}
SrsRtpPacket2* SrsRtcPackets::at(int index)
{
srs_assert(index < cursor);
return packets[index];
} }
SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid) SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid)
@ -684,12 +712,12 @@ srs_error_t SrsRtcSenderThread::cycle()
int nn_rtc_packets = srs_max(pkts.nn_audios, pkts.nn_extras) + pkts.nn_videos; int nn_rtc_packets = srs_max(pkts.nn_audios, pkts.nn_extras) + pkts.nn_videos;
stat->perf_on_rtc_packets(nn_rtc_packets); stat->perf_on_rtc_packets(nn_rtc_packets);
// Stat the RAW RTP packets, which maybe group by GSO. // Stat the RAW RTP packets, which maybe group by GSO.
stat->perf_on_rtp_packets(pkts.packets.size()); stat->perf_on_rtp_packets(pkts.size());
// Stat the RTP packets going into kernel. // Stat the RTP packets going into kernel.
stat->perf_on_gso_packets(pkts.nn_rtp_pkts); stat->perf_on_gso_packets(pkts.nn_rtp_pkts);
#if defined(SRS_DEBUG) #if defined(SRS_DEBUG)
srs_trace("RTC PLAY done, msgs %d/%d, rtp %d, gso %d, %d audios, %d extras, %d videos, %d samples, %d bytes", srs_trace("RTC PLAY done, msgs %d/%d, rtp %d, gso %d, %d audios, %d extras, %d videos, %d samples, %d bytes",
msg_count, nn_rtc_packets, pkts.packets.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, msg_count, nn_rtc_packets, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos,
pkts.nn_samples, pkts.nn_bytes); pkts.nn_samples, pkts.nn_bytes);
#endif #endif
@ -697,7 +725,7 @@ srs_error_t SrsRtcSenderThread::cycle()
if (pprint->can_print()) { if (pprint->can_print()) {
// TODO: FIXME: Print stat like frame/s, packet/s, loss_packets. // TODO: FIXME: Print stat like frame/s, packet/s, loss_packets.
srs_trace("-> RTC PLAY %d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d bytes, %d pad", srs_trace("-> RTC PLAY %d msgs, %d/%d packets, %d audios, %d extras, %d videos, %d samples, %d bytes, %d pad",
msg_count, pkts.packets.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos, msg_count, pkts.size(), pkts.nn_rtp_pkts, pkts.nn_audios, pkts.nn_extras, pkts.nn_videos,
pkts.nn_samples, pkts.nn_bytes, pkts.nn_paddings); pkts.nn_samples, pkts.nn_bytes, pkts.nn_paddings);
} }
} }
@ -753,16 +781,14 @@ srs_error_t SrsRtcSenderThread::messages_to_packets(
packets.nn_samples += nn_samples; packets.nn_samples += nn_samples;
// For audio, we transcoded AAC to opus in extra payloads. // For audio, we transcoded AAC to opus in extra payloads.
SrsRtpPacket2* packet = NULL;
if (msg->is_audio()) { if (msg->is_audio()) {
packets.nn_audios++; packets.nn_audios++;
for (int i = 0; i < nn_extra_payloads; i++) { for (int i = 0; i < nn_extra_payloads; i++) {
SrsSample* sample = msg->extra_payloads() + i; SrsSample* sample = msg->extra_payloads() + i;
if ((err = packet_opus(sample, &packet)) != srs_success) { if ((err = packet_opus(sample, packets)) != srs_success) {
return srs_error_wrap(err, "opus package"); return srs_error_wrap(err, "opus package");
} }
packets.packets.push_back(packet);
} }
continue; continue;
} }
@ -772,10 +798,9 @@ srs_error_t SrsRtcSenderThread::messages_to_packets(
// Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A. // Well, for each IDR, we append a SPS/PPS before it, which is packaged in STAP-A.
if (msg->has_idr()) { if (msg->has_idr()) {
if ((err = packet_stap_a(source, msg, &packet)) != srs_success) { if ((err = packet_stap_a(source, msg, packets)) != srs_success) {
return srs_error_wrap(err, "packet stap-a"); return srs_error_wrap(err, "packet stap-a");
} }
packets.packets.push_back(packet);
} }
// If merge Nalus, we pcakges all NALUs(samples) as one NALU, in a RTP or FUA packet. // If merge Nalus, we pcakges all NALUs(samples) as one NALU, in a RTP or FUA packet.
@ -797,10 +822,9 @@ srs_error_t SrsRtcSenderThread::messages_to_packets(
} }
if (sample->size <= kRtpMaxPayloadSize) { if (sample->size <= kRtpMaxPayloadSize) {
if ((err = packet_single_nalu(msg, sample, &packet)) != srs_success) { if ((err = packet_single_nalu(msg, sample, packets)) != srs_success) {
return srs_error_wrap(err, "packet single nalu"); return srs_error_wrap(err, "packet single nalu");
} }
packets.packets.push_back(packet);
} else { } else {
if ((err = packet_fu_a(msg, sample, kRtpMaxPayloadSize, packets)) != srs_success) { if ((err = packet_fu_a(msg, sample, kRtpMaxPayloadSize, packets)) != srs_success) {
return srs_error_wrap(err, "packet fu-a"); return srs_error_wrap(err, "packet fu-a");
@ -808,7 +832,7 @@ srs_error_t SrsRtcSenderThread::messages_to_packets(
} }
if (i == nn_samples - 1) { if (i == nn_samples - 1) {
packets.packets.back()->rtp_header.set_marker(true); packets.back()->rtp_header.set_marker(true);
} }
} }
} }
@ -822,9 +846,9 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets
ISrsUdpSender* sender = skt->sender(); ISrsUdpSender* sender = skt->sender();
vector<SrsRtpPacket2*>::iterator it; int nn_packets = packets.size();
for (it = packets.packets.begin(); it != packets.packets.end(); ++it) { for (int i = 0; i < nn_packets; i++) {
SrsRtpPacket2* packet = *it; SrsRtpPacket2* packet = packets.at(i);
// Fetch a cached message from queue. // Fetch a cached message from queue.
// TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
@ -832,6 +856,17 @@ srs_error_t SrsRtcSenderThread::send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets
if ((err = sender->fetch(&mhdr)) != srs_success) { if ((err = sender->fetch(&mhdr)) != srs_success) {
return srs_error_wrap(err, "fetch msghdr"); return srs_error_wrap(err, "fetch msghdr");
} }
// Reset the iovec, we should never change the msg_iovlen.
for (int j = 0; j < (int)mhdr->msg_hdr.msg_iovlen; j++) {
iovec* p = mhdr->msg_hdr.msg_iov + j;
if (!p->iov_len) {
break;
}
p->iov_len = 0;
}
char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base; char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base;
int length = kRtpPacketSize; int length = kRtpPacketSize;
@ -882,15 +917,15 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPac
bool use_gso = false; bool gso_final = false; bool use_gso = false; bool gso_final = false;
ISrsUdpSender* sender = skt->sender(); ISrsUdpSender* sender = skt->sender();
int nn_packets = (int)packets.packets.size(); int nn_packets = packets.size();
for (int i = 0; i < nn_packets; i++) { for (int i = 0; i < nn_packets; i++) {
SrsRtpPacket2* packet = packets.packets[i]; SrsRtpPacket2* packet = packets.at(i);
int nn_packet = packet->nb_bytes(); int nn_packet = packet->nb_bytes();
SrsRtpPacket2* next_packet = NULL; SrsRtpPacket2* next_packet = NULL;
int nn_next_packet = 0; int nn_next_packet = 0;
if (i < nn_packets - 1) { if (i < nn_packets - 1) {
next_packet = (i < nn_packets - 1)? packets.packets[i + 1]:NULL; next_packet = (i < nn_packets - 1)? packets.at(i + 1):NULL;
nn_next_packet = next_packet? next_packet->nb_bytes() : 0; nn_next_packet = next_packet? next_packet->nb_bytes() : 0;
} }
@ -1075,7 +1110,7 @@ srs_error_t SrsRtcSenderThread::send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPac
} }
#if defined(SRS_DEBUG) #if defined(SRS_DEBUG)
srs_trace("RTC PLAY summary, rtp %d/%d, videos %d/%d, audios %d/%d, pad %d", packets.packets.size(), packets.nn_rtp_pkts, srs_trace("RTC PLAY summary, rtp %d/%d, videos %d/%d, audios %d/%d, pad %d", packets.size(), packets.nn_rtp_pkts,
packets.nn_videos, packets.nn_samples, packets.nn_audios, packets.nn_extras, packets.nn_paddings); packets.nn_videos, packets.nn_samples, packets.nn_audios, packets.nn_extras, packets.nn_paddings);
#endif #endif
@ -1109,13 +1144,12 @@ srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPac
if (nn_bytes < kRtpMaxPayloadSize) { if (nn_bytes < kRtpMaxPayloadSize) {
// Package NALUs in a single RTP packet. // Package NALUs in a single RTP packet.
SrsRtpPacket2* packet = new SrsRtpPacket2(); SrsRtpPacket2* packet = packets.fetch();
packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_timestamp(msg->timestamp * 90);
packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_sequence(video_sequence++);
packet->rtp_header.set_ssrc(video_ssrc); packet->rtp_header.set_ssrc(video_ssrc);
packet->rtp_header.set_payload_type(video_payload_type); packet->rtp_header.set_payload_type(video_payload_type);
packet->payload = raw; packet->payload = raw;
packets.packets.push_back(packet);
} else { } else {
SrsAutoFree(SrsRtpRawNALUs, raw); SrsAutoFree(SrsRtpRawNALUs, raw);
@ -1131,15 +1165,19 @@ srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPac
for (int i = 0; i < num_of_packet; ++i) { for (int i = 0; i < num_of_packet; ++i) {
int packet_size = srs_min(nb_left, fu_payload_size); int packet_size = srs_min(nb_left, fu_payload_size);
SrsRtpPacket2* packet = new SrsRtpPacket2(); SrsRtpFUAPayload* fua = new SrsRtpFUAPayload();
packets.packets.push_back(packet); if ((err = raw->read_samples(fua->nalus, packet_size)) != srs_success) {
srs_freep(fua);
return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes);
}
SrsRtpPacket2* packet = packets.fetch();
packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_timestamp(msg->timestamp * 90);
packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_sequence(video_sequence++);
packet->rtp_header.set_ssrc(video_ssrc); packet->rtp_header.set_ssrc(video_ssrc);
packet->rtp_header.set_payload_type(video_payload_type); packet->rtp_header.set_payload_type(video_payload_type);
SrsRtpFUAPayload* fua = new SrsRtpFUAPayload();
packet->payload = fua; packet->payload = fua;
fua->nri = (SrsAvcNaluType)header; fua->nri = (SrsAvcNaluType)header;
@ -1147,26 +1185,22 @@ srs_error_t SrsRtcSenderThread::packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPac
fua->start = bool(i == 0); fua->start = bool(i == 0);
fua->end = bool(i == num_of_packet - 1); fua->end = bool(i == num_of_packet - 1);
if ((err = raw->read_samples(fua->nalus, packet_size)) != srs_success) {
return srs_error_wrap(err, "read samples %d bytes, left %d, total %d", packet_size, nb_left, nn_bytes);
}
nb_left -= packet_size; nb_left -= packet_size;
} }
} }
if (!packets.packets.empty()) { if (packets.size() > 0) {
packets.packets.back()->rtp_header.set_marker(true); packets.back()->rtp_header.set_marker(true);
} }
return err; return err;
} }
srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket) srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtcPackets& packets)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
SrsRtpPacket2* packet = new SrsRtpPacket2(); SrsRtpPacket2* packet = packets.fetch();
packet->rtp_header.set_marker(true); packet->rtp_header.set_marker(true);
packet->rtp_header.set_timestamp(audio_timestamp); packet->rtp_header.set_timestamp(audio_timestamp);
packet->rtp_header.set_sequence(audio_sequence++); packet->rtp_header.set_sequence(audio_sequence++);
@ -1181,8 +1215,6 @@ srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtpPacket2** p
// TODO: FIXME: Why 960? Need Refactoring? // TODO: FIXME: Why 960? Need Refactoring?
audio_timestamp += 960; audio_timestamp += 960;
*ppacket = packet;
return err; return err;
} }
@ -1199,8 +1231,7 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample*
for (int i = 0; i < num_of_packet; ++i) { for (int i = 0; i < num_of_packet; ++i) {
int packet_size = srs_min(nb_left, fu_payload_size); int packet_size = srs_min(nb_left, fu_payload_size);
SrsRtpPacket2* packet = new SrsRtpPacket2(); SrsRtpPacket2* packet = packets.fetch();
packets.packets.push_back(packet);
packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_timestamp(msg->timestamp * 90);
packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_sequence(video_sequence++);
@ -1228,11 +1259,11 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample*
} }
// Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6 // Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6
srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket) srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcPackets& packets)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
SrsRtpPacket2* packet = new SrsRtpPacket2(); SrsRtpPacket2* packet = packets.fetch();
packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_timestamp(msg->timestamp * 90);
packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_sequence(video_sequence++);
packet->rtp_header.set_ssrc(video_ssrc); packet->rtp_header.set_ssrc(video_ssrc);
@ -1246,12 +1277,10 @@ srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* msg, Srs
p->size = sample->size; p->size = sample->size;
raw->push_back(p); raw->push_back(p);
*ppacket = packet;
return err; return err;
} }
srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket) srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -1271,7 +1300,7 @@ srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMes
return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty"); return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty");
} }
SrsRtpPacket2* packet = new SrsRtpPacket2(); SrsRtpPacket2* packet = packets.fetch();
packet->rtp_header.set_marker(false); packet->rtp_header.set_marker(false);
packet->rtp_header.set_timestamp(msg->timestamp * 90); packet->rtp_header.set_timestamp(msg->timestamp * 90);
packet->rtp_header.set_sequence(video_sequence++); packet->rtp_header.set_sequence(video_sequence++);
@ -1298,8 +1327,6 @@ srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMes
stap->nalus.push_back(sample); stap->nalus.push_back(sample);
} }
*ppacket = packet;
return err; return err;
} }

View file

@ -140,13 +140,18 @@ public:
int nn_videos; int nn_videos;
// The number of padded packet. // The number of padded packet.
int nn_paddings; int nn_paddings;
public: private:
int cursor;
std::vector<SrsRtpPacket2*> packets; std::vector<SrsRtpPacket2*> packets;
public: public:
SrsRtcPackets(); SrsRtcPackets();
virtual ~SrsRtcPackets(); virtual ~SrsRtcPackets();
public: public:
void reset(bool gso, bool merge_nalus); void reset(bool gso, bool merge_nalus);
SrsRtpPacket2* fetch();
SrsRtpPacket2* back();
int size();
SrsRtpPacket2* at(int index);
}; };
class SrsRtcSenderThread : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler class SrsRtcSenderThread : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
@ -195,12 +200,12 @@ private:
srs_error_t send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); srs_error_t send_packets(SrsUdpMuxSocket* skt, SrsRtcPackets& packets);
srs_error_t send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPackets& packets); srs_error_t send_packets_gso(SrsUdpMuxSocket* skt, SrsRtcPackets& packets);
private: private:
srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket); srs_error_t packet_opus(SrsSample* sample, SrsRtcPackets& packets);
private: private:
srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets); srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, SrsRtcPackets& packets);
srs_error_t packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets); srs_error_t packet_nalus(SrsSharedPtrMessage* msg, SrsRtcPackets& packets);
srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket); srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtcPackets& packets);
srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket); srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtcPackets& packets);
}; };
class SrsRtcSession class SrsRtcSession

View file

@ -171,6 +171,13 @@ void SrsRtpPacket2::set_padding(int size)
padding = size; padding = size;
} }
void SrsRtpPacket2::reset()
{
memset((void*)&rtp_header, 0, sizeof(SrsRtpHeader));
padding = 0;
srs_freep(payload);
}
int SrsRtpPacket2::nb_bytes() int SrsRtpPacket2::nb_bytes()
{ {
return rtp_header.header_size() + (payload? payload->nb_bytes():0) + padding; return rtp_header.header_size() + (payload? payload->nb_bytes():0) + padding;

View file

@ -88,7 +88,9 @@ public:
virtual ~SrsRtpPacket2(); virtual ~SrsRtpPacket2();
public: public:
// Append size of bytes as padding. // Append size of bytes as padding.
virtual void set_padding(int size); void set_padding(int size);
// Reset RTP packet.
void reset();
// interface ISrsEncoder // interface ISrsEncoder
public: public:
virtual int nb_bytes(); virtual int nb_bytes();