diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index d00ca49ab..20a1e02f3 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -386,6 +386,28 @@ srs_error_t SrsDtlsSession::protect_rtp(char* out_buf, const char* in_buf, int& return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect failed"); } +srs_error_t SrsDtlsSession::protect_rtp2(char* buf, int* pnn_buf, SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + if (!srtp_send) { + return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect"); + } + + SrsBuffer stream(buf, *pnn_buf); + if ((err = pkt->encode(&stream)) != srs_success) { + return srs_error_wrap(err, "encode packet"); + } + + *pnn_buf = stream.pos(); + + if (srtp_protect(srtp_send, buf, pnn_buf) != 0) { + return srs_error_new(ERROR_RTC_SRTP_PROTECT, "rtp protect"); + } + + return err; +} + srs_error_t SrsDtlsSession::unprotect_rtp(char* out_buf, const char* in_buf, int& nb_out_buf) { srs_error_t err = srs_success; @@ -599,18 +621,28 @@ srs_error_t SrsRtcSenderThread::send_messages( SrsSharedPtrMessage* msg = msgs[i]; bool is_video = msg->is_video(); bool is_audio = msg->is_audio(); - - // Package opus packets to RTP packets. - vector rtp_packets; + *pnn += msg->size; if (is_audio) { for (int i = 0; i < msg->nn_extra_payloads(); i++) { SrsSample* sample = msg->extra_payloads() + i; - if ((err = packet_opus(msg, sample, rtp_packets)) != srs_success) { + + SrsRtpPacket2* packet = NULL; + if ((err = packet_opus(sample, &packet)) != srs_success) { return srs_error_wrap(err, "opus package"); } + + err = send_message2(msg, is_video, is_audio, packet, skt); + srs_freep(packet); + if (err != srs_success) { + return srs_error_wrap(err, "send message"); + } + + *pnn_rtp_pkts += 1; } } else { + vector rtp_packets; + for (int i = 0; i < msg->nn_samples(); i++) { SrsSample* sample = msg->samples() + i; @@ -645,19 +677,18 @@ srs_error_t SrsRtcSenderThread::send_messages( return srs_error_wrap(err, "set marker"); } } - } - int nn_rtp_pkts = (int)rtp_packets.size(); - for (int j = 0; j < nn_rtp_pkts; j++) { - SrsRtpSharedPacket* pkt = rtp_packets[j]; - if ((err = send_message(msg, is_video, is_audio, pkt, skt)) != srs_success) { - return srs_error_wrap(err, "send message"); + int nn_rtp_pkts = (int)rtp_packets.size(); + for (int j = 0; j < nn_rtp_pkts; j++) { + SrsRtpSharedPacket* pkt = rtp_packets[j]; + if ((err = send_message(msg, is_video, is_audio, pkt, skt)) != srs_success) { + return srs_error_wrap(err, "send message"); + } + srs_freep(pkt); } - srs_freep(pkt); - } - *pnn += msg->size; - *pnn_rtp_pkts += nn_rtp_pkts; + *pnn_rtp_pkts += nn_rtp_pkts; + } } return err; @@ -702,20 +733,58 @@ srs_error_t SrsRtcSenderThread::send_message(SrsSharedPtrMessage* msg, bool is_v return err; } -srs_error_t SrsRtcSenderThread::packet_opus(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector& rtp_packets) +srs_error_t SrsRtcSenderThread::send_message2(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; - SrsRtpSharedPacket* packet = new SrsRtpSharedPacket(); - packet->rtp_header.set_marker(true); - if ((err = packet->create(audio_timestamp, audio_sequence++, kAudioSSRC, kOpusPayloadType, sample->bytes, sample->size)) != srs_success) { - return srs_error_wrap(err, "rtp packet encode"); + int length = kRtpPacketSize; + // Fetch a cached message from queue. + // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. + mmsghdr* mhdr = rtc_session->rtc_server->fetch(); + char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base; + + if (rtc_session->encrypt) { + if ((err = rtc_session->dtls_session->protect_rtp2(buf, &length, pkt)) != srs_success) { + return srs_error_wrap(err, "srtp protect"); + } + } else { + SrsBuffer stream(buf, length); + if ((err = pkt->encode(&stream)) != srs_success) { + return srs_error_wrap(err, "encode packet"); + } + length = stream.pos(); } + sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); + socklen_t addrlen = (socklen_t)skt->peer_addrlen(); + + mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; + mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; + mhdr->msg_hdr.msg_iov->iov_len = length; + mhdr->msg_len = 0; + + rtc_session->rtc_server->sendmmsg(skt->stfd(), mhdr); + return err; +} + +srs_error_t SrsRtcSenderThread::packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket) +{ + srs_error_t err = srs_success; + + SrsRtpPacket2* packet = new SrsRtpPacket2(); + packet->rtp_header.set_marker(true); + packet->rtp_header.set_timestamp(audio_timestamp); + packet->rtp_header.set_sequence(audio_sequence++); + packet->rtp_header.set_ssrc(audio_ssrc); + packet->rtp_header.set_payload_type(audio_payload_type); + + packet->payload = sample->bytes; + packet->nn_payload = sample->size; + // TODO: FIXME: Why 960? Need Refactoring? audio_timestamp += 960; - rtp_packets.push_back(packet); + *ppacket = packet; return err; } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index d9fd03cfe..b2729e843 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -49,6 +49,7 @@ class SrsRtcServer; class SrsRtcSession; class SrsSharedPtrMessage; class SrsSource; +class SrsRtpPacket2; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -104,6 +105,7 @@ public: srs_error_t on_dtls_application_data(const char* data, const int len); public: srs_error_t protect_rtp(char* protected_buf, const char* ori_buf, int& nb_protected_buf); + srs_error_t protect_rtp2(char* buf, int* pnn_buf, SrsRtpPacket2* pkt); srs_error_t unprotect_rtp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf); srs_error_t protect_rtcp(char* protected_buf, const char* ori_buf, int& nb_protected_buf); srs_error_t unprotect_rtcp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf); @@ -152,8 +154,9 @@ public: private: srs_error_t send_messages(SrsSource* source, SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* skt, int* pnn, int* pnn_rtp_pkts); srs_error_t send_message(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpSharedPacket* pkt, SrsUdpMuxSocket* skt); + srs_error_t send_message2(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpPacket2* pkt, SrsUdpMuxSocket* skt); private: - srs_error_t packet_opus(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector& rtp_packets); + srs_error_t packet_opus(SrsSample* sample, SrsRtpPacket2** ppacket); private: srs_error_t packet_fu_a(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector& rtp_packets); srs_error_t packet_single_nalu(SrsSharedPtrMessage* shared_frame, SrsSample* sample, std::vector& rtp_packets); diff --git a/trunk/src/kernel/srs_kernel_rtp.cpp b/trunk/src/kernel/srs_kernel_rtp.cpp index 59ad7ca93..5cef33e9c 100644 --- a/trunk/src/kernel/srs_kernel_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtp.cpp @@ -77,7 +77,7 @@ srs_error_t SrsRtpHeader::decode(SrsBuffer* stream) { srs_error_t err = srs_success; - // TODO: + // TODO: FIXME: Implements it. return err; } @@ -86,19 +86,21 @@ srs_error_t SrsRtpHeader::encode(SrsBuffer* stream) { srs_error_t err = srs_success; - uint8_t first = 0x80 | cc; + uint8_t v = 0x80 | cc; if (padding) { - first |= 0x40; + v |= 0x40; } if (extension) { - first |= 0x10; + v |= 0x10; } - stream->write_1bytes(first); - uint8_t second = payload_type; + stream->write_1bytes(v); + + v = payload_type; if (marker) { - payload_type |= kRtpMarker; + v |= kRtpMarker; } - stream->write_1bytes(second); + stream->write_1bytes(v); + stream->write_2bytes(sequence); stream->write_4bytes(timestamp); stream->write_4bytes(ssrc); @@ -143,6 +145,37 @@ void SrsRtpHeader::set_ssrc(uint32_t ssrc) this->ssrc = ssrc; } +SrsRtpPacket2::SrsRtpPacket2() +{ + payload = NULL; + nn_payload = 0; +} + +SrsRtpPacket2::~SrsRtpPacket2() +{ +} + +srs_error_t SrsRtpPacket2::encode(SrsBuffer* stream) +{ + srs_error_t err = srs_success; + + if ((err = rtp_header.encode(stream)) != srs_success) { + return srs_error_wrap(err, "rtp header"); + } + + if (nn_payload <= 0) { + return 0; + } + + if (!stream->require(nn_payload)) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "requires %d bytes", nn_payload); + } + + stream->write_bytes(payload, nn_payload); + + return err; +} + SrsRtpSharedPacket::SrsRtpSharedPacketPayload::SrsRtpSharedPacketPayload() { payload = NULL; diff --git a/trunk/src/kernel/srs_kernel_rtp.hpp b/trunk/src/kernel/srs_kernel_rtp.hpp index 396cb2687..368dc5b0f 100644 --- a/trunk/src/kernel/srs_kernel_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtp.hpp @@ -70,6 +70,20 @@ public: uint32_t get_ssrc() const { return ssrc; } }; +class SrsRtpPacket2 +{ +public: + SrsRtpHeader rtp_header; + // @remark We only refer to the memory, user must free it. + char* payload; + int nn_payload; +public: + SrsRtpPacket2(); + virtual ~SrsRtpPacket2(); +public: + virtual srs_error_t encode(SrsBuffer* stream); +}; + class SrsRtpSharedPacket { private: