1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 20:01:56 +00:00

For #307, zero copy for RTP FUA packet

This commit is contained in:
winlin 2020-04-12 01:01:39 +08:00
parent aa81b47c9a
commit ca027ca5cb
5 changed files with 147 additions and 109 deletions

View file

@ -48,13 +48,6 @@ const int kRtpPacketSize = 1500;
const uint8_t kOpusPayloadType = 111;
const uint8_t kH264PayloadType = 102;
// @see: https://tools.ietf.org/html/rfc6184#section-5.2
const uint8_t kFuA = 28;
// @see: https://tools.ietf.org/html/rfc6184#section-5.8
const uint8_t kStart = 0x80; // Fu-header start bit
const uint8_t kEnd = 0x40; // Fu-header end bit
const int kChannel = 2;
const int kSamplerate = 48000;

View file

@ -641,8 +641,6 @@ srs_error_t SrsRtcSenderThread::send_messages(
*pnn_rtp_pkts += 1;
}
} else {
vector<SrsRtpSharedPacket*> rtp_packets;
for (int i = 0; i < msg->nn_samples(); i++) {
SrsSample* sample = msg->samples() + i;
@ -668,90 +666,51 @@ srs_error_t SrsRtcSenderThread::send_messages(
*pnn_rtp_pkts += 1;
}
vector<SrsRtpPacket2*> packets;
if (sample->size <= kRtpMaxPayloadSize) {
if ((err = packet_single_nalu(msg, sample, rtp_packets)) != srs_success) {
SrsRtpPacket2* packet = NULL;
if ((err = packet_single_nalu(msg, sample, &packet)) != srs_success) {
return srs_error_wrap(err, "packet single nalu");
}
packets.push_back(packet);
} else {
if ((err = packet_fu_a(msg, sample, rtp_packets)) != srs_success) {
if ((err = packet_fu_a(msg, sample, packets)) != srs_success) {
return srs_error_wrap(err, "packet fu-a");
}
}
}
if (!rtp_packets.empty()) {
// At the end of the frame, set marker bit.
// One frame may have multi nals. Set the marker bit in the last nal end, no the end of the nal.
if ((err = rtp_packets.back()->modify_rtp_header_marker(true)) != srs_success) {
return srs_error_wrap(err, "set marker");
if (i == msg->nn_samples() - 1) {
packets.back()->rtp_header.set_marker(true);
}
}
int nn_rtp_pkts = (int)rtp_packets.size();
for (int j = 0; j < nn_rtp_pkts; j++) {
SrsRtpSharedPacket* pkt = rtp_packets[j];
if ((err = send_message(msg, is_video, is_audio, pkt, skt)) != srs_success) {
return srs_error_wrap(err, "send message");
for (int j = 0; j < (int)packets.size(); j++) {
SrsRtpPacket2* packet = packets[j];
err = send_message2(msg, is_video, is_audio, packet, skt);
srs_freep(packet);
if (err != srs_success) {
return srs_error_wrap(err, "send message");
}
}
srs_freep(pkt);
*pnn_rtp_pkts += (int)packets.size();
}
*pnn_rtp_pkts += nn_rtp_pkts;
}
}
return err;
}
srs_error_t SrsRtcSenderThread::send_message(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpSharedPacket* pkt, SrsUdpMuxSocket* skt)
{
srs_error_t err = srs_success;
if (is_video) {
pkt->modify_rtp_header_payload_type(video_payload_type);
pkt->modify_rtp_header_ssrc(video_ssrc);
srs_verbose("send video, ssrc=%u, seq=%u, timestamp=%u", video_ssrc, pkt->rtp_header.get_sequence(), pkt->rtp_header.get_timestamp());
} else if (is_audio) {
pkt->modify_rtp_header_payload_type(audio_payload_type);
pkt->modify_rtp_header_ssrc(audio_ssrc);
}
int length = pkt->size;
// Fetch a cached message from queue.
// TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
mmsghdr* mhdr = rtc_session->rtc_server->fetch();
char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base;
if (rtc_session->encrypt) {
if ((err = rtc_session->dtls_session->protect_rtp(buf, pkt->payload, length)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
}
} else {
memcpy(buf, pkt->payload, length);
}
sockaddr_in* addr = (sockaddr_in*)skt->peer_addr();
socklen_t addrlen = (socklen_t)skt->peer_addrlen();
mhdr->msg_hdr.msg_name = (sockaddr_in*)addr;
mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen;
mhdr->msg_hdr.msg_iov->iov_len = length;
mhdr->msg_len = 0;
rtc_session->rtc_server->sendmmsg(skt->stfd(), mhdr);
return err;
}
srs_error_t SrsRtcSenderThread::send_message2(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt)
{
srs_error_t err = srs_success;
int length = kRtpPacketSize;
// Fetch a cached message from queue.
// TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
mmsghdr* mhdr = rtc_session->rtc_server->fetch();
char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base;
// Length of iov, default size.
int length = kRtpPacketSize;
if (rtc_session->encrypt) {
if ((err = rtc_session->dtls_session->protect_rtp2(buf, &length, pkt)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
@ -800,7 +759,7 @@ srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtpPacket2** p
return err;
}
srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* shared_frame, SrsSample* sample, vector<SrsRtpSharedPacket*>& rtp_packets)
srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, vector<SrsRtpPacket2*>& packets)
{
srs_error_t err = srs_success;
@ -809,61 +768,60 @@ srs_error_t SrsRtcSenderThread::packet_fu_a(SrsSharedPtrMessage* shared_frame, S
uint8_t header = sample->bytes[0];
uint8_t nal_type = header & kNalTypeMask;
int num_of_packet = (sample->size - 1 + kRtpMaxPayloadSize) / kRtpMaxPayloadSize;
int num_of_packet = 1 + (sample->size - 1) / kRtpMaxPayloadSize;
for (int i = 0; i < num_of_packet; ++i) {
char buf[kRtpPacketSize];
SrsBuffer* stream = new SrsBuffer(buf, kRtpPacketSize);
SrsAutoFree(SrsBuffer, stream);
int packet_size = srs_min(nb_left, kRtpMaxPayloadSize);
int packet_size = min(nb_left, kRtpMaxPayloadSize);
SrsRtpPacket2* packet = new SrsRtpPacket2();
packets.push_back(packet);
// fu-indicate
uint8_t fu_indicate = kFuA;
fu_indicate |= (header & (~kNalTypeMask));
stream->write_1bytes(fu_indicate);
packet->rtp_header.set_timestamp(msg->timestamp * 90);
packet->rtp_header.set_sequence(video_sequence++);
packet->rtp_header.set_ssrc(video_ssrc);
packet->rtp_header.set_payload_type(video_payload_type);
uint8_t fu_header = nal_type;
if (i == 0)
fu_header |= kStart;
if (i == num_of_packet - 1)
fu_header |= kEnd;
stream->write_1bytes(fu_header);
SrsRtpFUAPayload* fua = new SrsRtpFUAPayload();
packet->payload = fua;
fua->nri = (SrsAvcNaluType)header;
fua->nalu_type = (SrsAvcNaluType)nal_type;
fua->start = bool(i == 0);
fua->end = bool(i == num_of_packet - 1);
SrsSample* sample = new SrsSample();
sample->bytes = p;
sample->size = packet_size;
fua->nalus.push_back(sample);
stream->write_bytes(p, packet_size);
p += packet_size;
nb_left -= packet_size;
srs_verbose("rtp fu-a nalu, size=%u, seq=%u, timestamp=%lu", sample->size, video_sequence, (shared_frame->timestamp * 90));
SrsRtpSharedPacket* packet = new SrsRtpSharedPacket();
if ((err = packet->create((shared_frame->timestamp * 90), video_sequence++, kVideoSSRC, kH264PayloadType, stream->data(), stream->pos())) != srs_success) {
return srs_error_wrap(err, "rtp packet encode");
}
rtp_packets.push_back(packet);
}
return err;
}
// Single NAL Unit Packet @see https://tools.ietf.org/html/rfc6184#section-5.6
srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* shared_frame, SrsSample* sample, vector<SrsRtpSharedPacket*>& rtp_packets)
srs_error_t SrsRtcSenderThread::packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket)
{
srs_error_t err = srs_success;
srs_verbose("rtp single nalu, size=%u, seq=%u, timestamp=%lu", sample->size, video_sequence, (shared_frame->timestamp * 90));
SrsRtpPacket2* packet = new SrsRtpPacket2();
packet->rtp_header.set_timestamp(msg->timestamp * 90);
packet->rtp_header.set_sequence(video_sequence++);
packet->rtp_header.set_ssrc(video_ssrc);
packet->rtp_header.set_payload_type(video_payload_type);
SrsRtpSharedPacket* packet = new SrsRtpSharedPacket();
if ((err = packet->create((shared_frame->timestamp * 90), video_sequence++, kVideoSSRC, kH264PayloadType, sample->bytes, sample->size)) != srs_success) {
return srs_error_wrap(err, "rtp packet encode");
}
SrsRtpRawPayload* raw = new SrsRtpRawPayload();
raw->payload = sample->bytes;
raw->nn_payload = sample->size;
packet->payload = raw;
rtp_packets.push_back(packet);
*ppacket = packet;
return err;
}
srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMessage* shared_frame, SrsRtpPacket2** ppacket)
srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket)
{
srs_error_t err = srs_success;
@ -883,12 +841,9 @@ srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMes
return srs_error_new(ERROR_RTC_RTP_MUXER, "sps/pps empty");
}
srs_verbose("rtp stap-a nalu, size=%u, seq=%u, timestamp=%lu",
(sps.size() + pps.size()), video_sequence, (shared_frame->timestamp * 90));
SrsRtpPacket2* packet = new SrsRtpPacket2();
packet->rtp_header.set_marker(false);
packet->rtp_header.set_timestamp(shared_frame->timestamp * 90);
packet->rtp_header.set_timestamp(msg->timestamp * 90);
packet->rtp_header.set_sequence(video_sequence++);
packet->rtp_header.set_ssrc(video_ssrc);
packet->rtp_header.set_payload_type(video_payload_type);
@ -898,7 +853,7 @@ srs_error_t SrsRtcSenderThread::packet_stap_a(SrsSource* source, SrsSharedPtrMes
uint8_t header = sps[0];
uint8_t nal_type = header & kNalTypeMask;
stap->nri = (SrsAvcNaluType)nal_type;
stap->nri = (SrsAvcNaluType)header;
stap->nn_nalus = 2;
stap->nalus = new SrsSample[stap->nn_nalus];

View file

@ -153,14 +153,13 @@ public:
void update_sendonly_socket(SrsUdpMuxSocket* skt);
private:
srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* skt, int* pnn, int* pnn_rtp_pkts);
srs_error_t send_message(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpSharedPacket* pkt, SrsUdpMuxSocket* skt);
srs_error_t send_message2(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt);
private:
srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket);
private:
srs_error_t packet_fu_a(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector<SrsRtpSharedPacket*>& rtp_packets);
srs_error_t packet_single_nalu(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector<SrsRtpSharedPacket*>& rtp_packets);
srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* shared_frame, SrsRtpPacket2** ppacket);
srs_error_t packet_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, std::vector<SrsRtpPacket2*>& packets);
srs_error_t packet_single_nalu(SrsSharedPtrMessage* msg, SrsSample* sample, SrsRtpPacket2** ppacket);
srs_error_t packet_stap_a(SrsSource* source, SrsSharedPtrMessage* msg, SrsRtpPacket2** ppacket);
};
class SrsRtcSession

View file

@ -35,6 +35,13 @@ using namespace std;
// @see: https://tools.ietf.org/html/rfc6184#section-5.2
const uint8_t kStapA = 24;
// @see: https://tools.ietf.org/html/rfc6184#section-5.2
const uint8_t kFuA = 28;
// @see: https://tools.ietf.org/html/rfc6184#section-5.8
const uint8_t kStart = 0x80; // Fu-header start bit
const uint8_t kEnd = 0x40; // Fu-header end bit
SrsRtpHeader::SrsRtpHeader()
{
padding = false;
@ -252,6 +259,69 @@ srs_error_t SrsRtpSTAPPayload::encode(SrsBuffer* buf)
return srs_success;
}
SrsRtpFUAPayload::SrsRtpFUAPayload()
{
start = end = false;
nri = nalu_type = (SrsAvcNaluType)0;
}
SrsRtpFUAPayload::~SrsRtpFUAPayload()
{
vector<SrsSample*>::iterator it;
for (it = nalus.begin(); it != nalus.end(); ++it) {
SrsSample* p = *it;
srs_freep(p);
}
}
int SrsRtpFUAPayload::nb_bytes()
{
int size = 2;
vector<SrsSample*>::iterator it;
for (it = nalus.begin(); it != nalus.end(); ++it) {
SrsSample* p = *it;
size += p->size;
}
return size;
}
srs_error_t SrsRtpFUAPayload::encode(SrsBuffer* buf)
{
if (!buf->require(2)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 1);
}
// FU indicator, @see https://tools.ietf.org/html/rfc6184#section-5.8
uint8_t fu_indicate = kFuA;
fu_indicate |= (nri & (~kNalTypeMask));
buf->write_1bytes(fu_indicate);
// FU header, @see https://tools.ietf.org/html/rfc6184#section-5.8
uint8_t fu_header = nalu_type;
if (start) {
fu_header |= kStart;
}
if (end) {
fu_header |= kEnd;
}
buf->write_1bytes(fu_header);
// FU payload, @see https://tools.ietf.org/html/rfc6184#section-5.8
vector<SrsSample*>::iterator it;
for (it = nalus.begin(); it != nalus.end(); ++it) {
SrsSample* p = *it;
if (!buf->require(p->size)) {
return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", 2 + p->size);
}
buf->write_bytes(p->bytes, p->size);
}
return srs_success;
}
SrsRtpSharedPacket::SrsRtpSharedPacketPayload::SrsRtpSharedPacketPayload()
{
payload = NULL;

View file

@ -121,6 +121,27 @@ public:
virtual srs_error_t encode(SrsBuffer* buf);
};
class SrsRtpFUAPayload : public ISrsEncoder
{
public:
// The NRI in NALU type.
SrsAvcNaluType nri;
// The FUA header.
bool start;
bool end;
SrsAvcNaluType nalu_type;
// The NALU samples.
// @remark We only refer to the memory, user must free its bytes.
std::vector<SrsSample*> nalus;
public:
SrsRtpFUAPayload();
virtual ~SrsRtpFUAPayload();
// interface ISrsEncoder
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
};
class SrsRtpSharedPacket
{
private: