mirror of
				https://github.com/ossrs/srs.git
				synced 2025-03-09 15:49:59 +00:00 
			
		
		
		
	For #307, zero copy for RTP audio packet
This commit is contained in:
		
							parent
							
								
									bf62244908
								
							
						
					
					
						commit
						4b2404c203
					
				
					 4 changed files with 148 additions and 29 deletions
				
			
		| 
						 | 
				
			
			@ -386,6 +386,28 @@ srs_error_t SrsDtlsSession::protect_rtp(char* out_buf, const char* in_buf, int&
 | 
			
		|||
    return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect failed");
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
srs_error_t SrsDtlsSession::protect_rtp2(char* buf, int* pnn_buf, SrsRtpPacket2* pkt)
 | 
			
		||||
{
 | 
			
		||||
    srs_error_t err = srs_success;
 | 
			
		||||
 | 
			
		||||
    if (!srtp_send) {
 | 
			
		||||
        return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    SrsBuffer stream(buf, *pnn_buf);
 | 
			
		||||
    if ((err = pkt->encode(&stream)) != srs_success) {
 | 
			
		||||
        return srs_error_wrap(err, "encode packet");
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    *pnn_buf = stream.pos();
 | 
			
		||||
 | 
			
		||||
    if (srtp_protect(srtp_send, buf, pnn_buf) != 0) {
 | 
			
		||||
        return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return err;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
srs_error_t SrsDtlsSession::unprotect_rtp(char* out_buf, const char* in_buf, int& nb_out_buf)
 | 
			
		||||
{
 | 
			
		||||
    srs_error_t err = srs_success;
 | 
			
		||||
| 
						 | 
				
			
			@ -599,18 +621,28 @@ srs_error_t SrsRtcSenderThread::send_messages(
 | 
			
		|||
        SrsSharedPtrMessage* msg = msgs[i];
 | 
			
		||||
        bool is_video = msg->is_video();
 | 
			
		||||
        bool is_audio = msg->is_audio();
 | 
			
		||||
 | 
			
		||||
        // Package opus packets to RTP packets.
 | 
			
		||||
        vector<SrsRtpSharedPacket*> rtp_packets;
 | 
			
		||||
        *pnn += msg->size;
 | 
			
		||||
 | 
			
		||||
        if (is_audio) {
 | 
			
		||||
            for (int i = 0; i < msg->nn_extra_payloads(); i++) {
 | 
			
		||||
                SrsSample* sample = msg->extra_payloads() + i;
 | 
			
		||||
                if ((err = packet_opus(msg, sample, rtp_packets)) != srs_success) {
 | 
			
		||||
 | 
			
		||||
                SrsRtpPacket2* packet = NULL;
 | 
			
		||||
                if ((err = packet_opus(sample, &packet)) != srs_success) {
 | 
			
		||||
                    return srs_error_wrap(err, "opus package");
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                err = send_message2(msg, is_video, is_audio, packet, skt);
 | 
			
		||||
                srs_freep(packet);
 | 
			
		||||
                if (err != srs_success) {
 | 
			
		||||
                    return srs_error_wrap(err, "send message");
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                *pnn_rtp_pkts += 1;
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            vector<SrsRtpSharedPacket*> rtp_packets;
 | 
			
		||||
 | 
			
		||||
            for (int i = 0; i < msg->nn_samples(); i++) {
 | 
			
		||||
                SrsSample* sample = msg->samples() + i;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -645,7 +677,6 @@ srs_error_t SrsRtcSenderThread::send_messages(
 | 
			
		|||
                    return srs_error_wrap(err, "set marker");
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
            int nn_rtp_pkts = (int)rtp_packets.size();
 | 
			
		||||
            for (int j = 0; j < nn_rtp_pkts; j++) {
 | 
			
		||||
| 
						 | 
				
			
			@ -656,9 +687,9 @@ srs_error_t SrsRtcSenderThread::send_messages(
 | 
			
		|||
                srs_freep(pkt);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        *pnn += msg->size;
 | 
			
		||||
            *pnn_rtp_pkts += nn_rtp_pkts;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return err;
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -702,20 +733,58 @@ srs_error_t SrsRtcSenderThread::send_message(SrsSharedPtrMessage* msg, bool is_v
 | 
			
		|||
    return err;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
srs_error_t SrsRtcSenderThread::packet_opus(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector<SrsRtpSharedPacket*>& rtp_packets)
 | 
			
		||||
srs_error_t SrsRtcSenderThread::send_message2(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt)
 | 
			
		||||
{
 | 
			
		||||
    srs_error_t err = srs_success;
 | 
			
		||||
 | 
			
		||||
    SrsRtpSharedPacket* packet = new SrsRtpSharedPacket();
 | 
			
		||||
    packet->rtp_header.set_marker(true);
 | 
			
		||||
    if ((err = packet->create(audio_timestamp, audio_sequence++, kAudioSSRC, kOpusPayloadType, sample->bytes, sample->size)) != srs_success) {
 | 
			
		||||
        return srs_error_wrap(err, "rtp packet encode");
 | 
			
		||||
    int length = kRtpPacketSize;
 | 
			
		||||
    // Fetch a cached message from queue.
 | 
			
		||||
    // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready.
 | 
			
		||||
    mmsghdr* mhdr = rtc_session->rtc_server->fetch();
 | 
			
		||||
    char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base;
 | 
			
		||||
 | 
			
		||||
    if (rtc_session->encrypt) {
 | 
			
		||||
        if ((err = rtc_session->dtls_session->protect_rtp2(buf, &length, pkt)) != srs_success) {
 | 
			
		||||
            return srs_error_wrap(err, "srtp protect");
 | 
			
		||||
        }
 | 
			
		||||
    } else {
 | 
			
		||||
        SrsBuffer stream(buf, length);
 | 
			
		||||
        if ((err = pkt->encode(&stream)) != srs_success) {
 | 
			
		||||
            return srs_error_wrap(err, "encode packet");
 | 
			
		||||
        }
 | 
			
		||||
        length = stream.pos();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    sockaddr_in* addr = (sockaddr_in*)skt->peer_addr();
 | 
			
		||||
    socklen_t addrlen = (socklen_t)skt->peer_addrlen();
 | 
			
		||||
 | 
			
		||||
    mhdr->msg_hdr.msg_name = (sockaddr_in*)addr;
 | 
			
		||||
    mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen;
 | 
			
		||||
    mhdr->msg_hdr.msg_iov->iov_len = length;
 | 
			
		||||
    mhdr->msg_len = 0;
 | 
			
		||||
 | 
			
		||||
    rtc_session->rtc_server->sendmmsg(skt->stfd(), mhdr);
 | 
			
		||||
    return err;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket)
 | 
			
		||||
{
 | 
			
		||||
    srs_error_t err = srs_success;
 | 
			
		||||
 | 
			
		||||
    SrsRtpPacket2* packet = new SrsRtpPacket2();
 | 
			
		||||
    packet->rtp_header.set_marker(true);
 | 
			
		||||
    packet->rtp_header.set_timestamp(audio_timestamp);
 | 
			
		||||
    packet->rtp_header.set_sequence(audio_sequence++);
 | 
			
		||||
    packet->rtp_header.set_ssrc(audio_ssrc);
 | 
			
		||||
    packet->rtp_header.set_payload_type(audio_payload_type);
 | 
			
		||||
 | 
			
		||||
    packet->payload = sample->bytes;
 | 
			
		||||
    packet->nn_payload = sample->size;
 | 
			
		||||
 | 
			
		||||
    // TODO: FIXME: Why 960? Need Refactoring?
 | 
			
		||||
    audio_timestamp += 960;
 | 
			
		||||
 | 
			
		||||
    rtp_packets.push_back(packet);
 | 
			
		||||
    *ppacket = packet;
 | 
			
		||||
 | 
			
		||||
    return err;
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -49,6 +49,7 @@ class SrsRtcServer;
 | 
			
		|||
class SrsRtcSession;
 | 
			
		||||
class SrsSharedPtrMessage;
 | 
			
		||||
class SrsSource;
 | 
			
		||||
class SrsRtpPacket2;
 | 
			
		||||
 | 
			
		||||
const uint8_t kSR   = 200;
 | 
			
		||||
const uint8_t kRR   = 201;
 | 
			
		||||
| 
						 | 
				
			
			@ -104,6 +105,7 @@ public:
 | 
			
		|||
    srs_error_t on_dtls_application_data(const char* data, const int len);
 | 
			
		||||
public:
 | 
			
		||||
    srs_error_t protect_rtp(char* protected_buf, const char* ori_buf, int& nb_protected_buf);
 | 
			
		||||
    srs_error_t protect_rtp2(char* buf, int* pnn_buf, SrsRtpPacket2* pkt);
 | 
			
		||||
    srs_error_t unprotect_rtp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf);
 | 
			
		||||
    srs_error_t protect_rtcp(char* protected_buf, const char* ori_buf, int& nb_protected_buf);
 | 
			
		||||
    srs_error_t unprotect_rtcp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf);
 | 
			
		||||
| 
						 | 
				
			
			@ -152,8 +154,9 @@ public:
 | 
			
		|||
private:
 | 
			
		||||
    srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* skt, int* pnn, int* pnn_rtp_pkts);
 | 
			
		||||
    srs_error_t send_message(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpSharedPacket* pkt, SrsUdpMuxSocket* skt);
 | 
			
		||||
    srs_error_t send_message2(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt);
 | 
			
		||||
private:
 | 
			
		||||
    srs_error_t packet_opus(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector<SrsRtpSharedPacket*>& rtp_packets);
 | 
			
		||||
    srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket);
 | 
			
		||||
private:
 | 
			
		||||
    srs_error_t packet_fu_a(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector<SrsRtpSharedPacket*>& rtp_packets);
 | 
			
		||||
    srs_error_t packet_single_nalu(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector<SrsRtpSharedPacket*>& rtp_packets);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -77,7 +77,7 @@ srs_error_t SrsRtpHeader::decode(SrsBuffer* stream)
 | 
			
		|||
{
 | 
			
		||||
    srs_error_t err = srs_success;
 | 
			
		||||
 | 
			
		||||
    // TODO:
 | 
			
		||||
    // TODO: FIXME: Implements it.
 | 
			
		||||
 | 
			
		||||
    return err;
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -86,19 +86,21 @@ srs_error_t SrsRtpHeader::encode(SrsBuffer* stream)
 | 
			
		|||
{
 | 
			
		||||
    srs_error_t err = srs_success;
 | 
			
		||||
 | 
			
		||||
    uint8_t first = 0x80 | cc;
 | 
			
		||||
    uint8_t v = 0x80 | cc;
 | 
			
		||||
    if (padding) {
 | 
			
		||||
        first |= 0x40;
 | 
			
		||||
        v |= 0x40;
 | 
			
		||||
    }
 | 
			
		||||
    if (extension) {
 | 
			
		||||
        first |= 0x10;
 | 
			
		||||
        v |= 0x10;
 | 
			
		||||
    }
 | 
			
		||||
    stream->write_1bytes(first);
 | 
			
		||||
    uint8_t second = payload_type;
 | 
			
		||||
    stream->write_1bytes(v);
 | 
			
		||||
 | 
			
		||||
    v = payload_type;
 | 
			
		||||
    if (marker) {
 | 
			
		||||
        payload_type |= kRtpMarker;
 | 
			
		||||
        v |= kRtpMarker;
 | 
			
		||||
    }
 | 
			
		||||
    stream->write_1bytes(second);
 | 
			
		||||
    stream->write_1bytes(v);
 | 
			
		||||
 | 
			
		||||
    stream->write_2bytes(sequence);
 | 
			
		||||
    stream->write_4bytes(timestamp);
 | 
			
		||||
    stream->write_4bytes(ssrc);
 | 
			
		||||
| 
						 | 
				
			
			@ -143,6 +145,37 @@ void SrsRtpHeader::set_ssrc(uint32_t ssrc)
 | 
			
		|||
    this->ssrc = ssrc;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
SrsRtpPacket2::SrsRtpPacket2()
 | 
			
		||||
{
 | 
			
		||||
    payload = NULL;
 | 
			
		||||
    nn_payload = 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
SrsRtpPacket2::~SrsRtpPacket2()
 | 
			
		||||
{
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
srs_error_t SrsRtpPacket2::encode(SrsBuffer* stream)
 | 
			
		||||
{
 | 
			
		||||
    srs_error_t err = srs_success;
 | 
			
		||||
 | 
			
		||||
    if ((err = rtp_header.encode(stream)) != srs_success) {
 | 
			
		||||
        return srs_error_wrap(err, "rtp header");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (nn_payload <= 0) {
 | 
			
		||||
        return 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (!stream->require(nn_payload)) {
 | 
			
		||||
        return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", nn_payload);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    stream->write_bytes(payload, nn_payload);
 | 
			
		||||
 | 
			
		||||
    return err;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
SrsRtpSharedPacket::SrsRtpSharedPacketPayload::SrsRtpSharedPacketPayload()
 | 
			
		||||
{
 | 
			
		||||
    payload = NULL;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -70,6 +70,20 @@ public:
 | 
			
		|||
    uint32_t get_ssrc() const { return ssrc; }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class SrsRtpPacket2
 | 
			
		||||
{
 | 
			
		||||
public:
 | 
			
		||||
    SrsRtpHeader rtp_header;
 | 
			
		||||
    // @remark We only refer to the memory, user must free it.
 | 
			
		||||
    char* payload;
 | 
			
		||||
    int nn_payload;
 | 
			
		||||
public:
 | 
			
		||||
    SrsRtpPacket2();
 | 
			
		||||
    virtual ~SrsRtpPacket2();
 | 
			
		||||
public:
 | 
			
		||||
    virtual srs_error_t encode(SrsBuffer* stream);
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class SrsRtpSharedPacket
 | 
			
		||||
{
 | 
			
		||||
private:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue