diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 9c6c3cf8f..e8bf756ed 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -574,59 +574,61 @@ void SrsRtcSenderThread::update_sendonly_socket(SrsUdpMuxSocket* ukt) void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* udp_mux_skt) { - srs_error_t err = srs_success; + if (!rtc_session->dtls_session) { + return; + } - vector mhdrs; for (int i = 0; i < nb_msgs; i++) { SrsSharedPtrMessage* msg = msgs[i]; + bool is_video = msg->is_video(); + bool is_audio = msg->is_audio(); - for (int i = 0; i < (int)msg->rtp_packets.size(); ++i) { - if (!rtc_session->dtls_session) { - continue; - } - - SrsRtpSharedPacket* pkt = msg->rtp_packets[i]; - - if (msg->is_video()) { - pkt->modify_rtp_header_payload_type(video_payload_type); - pkt->modify_rtp_header_ssrc(video_ssrc); - srs_verbose("send video, ssrc=%u, seq=%u, timestamp=%u", video_ssrc, pkt->rtp_header.get_sequence(), pkt->rtp_header.get_timestamp()); - } - - if (msg->is_audio()) { - pkt->modify_rtp_header_payload_type(audio_payload_type); - pkt->modify_rtp_header_ssrc(audio_ssrc); - } - - int length = pkt->size; - char* buf = new char[kRtpPacketSize]; - if (rtc_session->encrypt) { - if ((err = rtc_session->dtls_session->protect_rtp(buf, pkt->payload, length)) != srs_success) { - srs_warn("srtp err %s", srs_error_desc(err).c_str()); srs_freep(err); srs_freepa(buf); - continue; - } - } else { - memcpy(buf, pkt->payload, length); - } - - mmsghdr mhdr; - memset(&mhdr, 0, sizeof(mmsghdr)); - mhdr.msg_hdr.msg_name = (sockaddr_in*)udp_mux_skt->peer_addr(); - mhdr.msg_hdr.msg_namelen = udp_mux_skt->peer_addrlen(); - mhdr.msg_hdr.msg_iovlen = 1; - mhdr.msg_hdr.msg_iov = new iovec(); - mhdr.msg_hdr.msg_iov->iov_base = buf; - mhdr.msg_hdr.msg_iov->iov_len = length; - mhdrs.push_back(mhdr); + for (vector::iterator it = msg->rtp_packets.begin(); it != msg->rtp_packets.end(); ++it) { + SrsRtpSharedPacket* pkt = *it; + send_and_free_message(msg, is_video, is_audio, pkt, udp_mux_skt); } srs_freep(msg); } +} - if ((err = rtc_session->rtc_server->send_and_free_messages(udp_mux_skt->stfd(), mhdrs)) != srs_success) { - srs_warn("sendmsg %d msgs, err %s", mhdrs.size(), srs_error_summary(err).c_str()); - srs_freep(err); +void SrsRtcSenderThread::send_and_free_message(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpSharedPacket* pkt, SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + + if (is_video) { + pkt->modify_rtp_header_payload_type(video_payload_type); + pkt->modify_rtp_header_ssrc(video_ssrc); + srs_verbose("send video, ssrc=%u, seq=%u, timestamp=%u", video_ssrc, pkt->rtp_header.get_sequence(), pkt->rtp_header.get_timestamp()); + } else if (is_audio) { + pkt->modify_rtp_header_payload_type(audio_payload_type); + pkt->modify_rtp_header_ssrc(audio_ssrc); } + + int length = pkt->size; + // Fetch a cached message from queue. + // TODO: FIXME: Maybe encrypt in async, so the state of mhdr maybe not ready. + mmsghdr* mhdr = rtc_session->rtc_server->fetch(); + char* buf = (char*)mhdr->msg_hdr.msg_iov->iov_base; + + if (rtc_session->encrypt) { + if ((err = rtc_session->dtls_session->protect_rtp(buf, pkt->payload, length)) != srs_success) { + srs_warn("srtp err %s", srs_error_desc(err).c_str()); srs_freep(err); srs_freepa(buf); + return; + } + } else { + memcpy(buf, pkt->payload, length); + } + + sockaddr_in* addr = (sockaddr_in*)udp_mux_skt->peer_addr(); + socklen_t addrlen = (socklen_t)udp_mux_skt->peer_addrlen(); + + mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; + mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; + mhdr->msg_hdr.msg_iov->iov_len = length; + mhdr->msg_len = 0; + + rtc_session->rtc_server->sendmmsg(udp_mux_skt->stfd(), mhdr); } SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id) @@ -728,7 +730,7 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsS } SrsStunPacket stun_binding_response; - char buf[1460]; + char buf[kRtpPacketSize]; SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); SrsAutoFree(SrsBuffer, stream); @@ -1023,7 +1025,7 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt) return srs_error_new(ERROR_RTC_RTCP, "recv unexpect rtp packet before dtls done"); } - char unprotected_buf[1460]; + char unprotected_buf[kRtpPacketSize]; int nb_unprotected_buf = udp_mux_skt->size(); if ((err = dtls_session->unprotect_rtcp(unprotected_buf, udp_mux_skt->data(), nb_unprotected_buf)) != srs_success) { return srs_error_wrap(err, "rtcp unprotect failed"); @@ -1094,6 +1096,8 @@ SrsRtcServer::SrsRtcServer() waiting_msgs = false; cond = srs_cond_new(); trd = new SrsDummyCoroutine(); + + cache_pos = 0; } SrsRtcServer::~SrsRtcServer() @@ -1104,8 +1108,11 @@ SrsRtcServer::~SrsRtcServer() srs_freep(trd); srs_cond_destroy(cond); - free_messages(mmhdrs); - mmhdrs.clear(); + free_mhdrs(hotspot); + hotspot.clear(); + + free_mhdrs(cache); + cache.clear(); } srs_error_t SrsRtcServer::initialize() @@ -1344,27 +1351,42 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic return srs_success; } -srs_error_t SrsRtcServer::send_and_free_messages(srs_netfd_t stfd, const vector& msgs) +mmsghdr* SrsRtcServer::fetch() { - srs_error_t err = srs_success; + // TODO: FIXME: Maybe need to shrink? + if (cache_pos >= (int)cache.size()) { + mmsghdr mhdr; + memset(&mhdr, 0, sizeof(mmsghdr)); + mhdr.msg_hdr.msg_iovlen = 1; + mhdr.msg_hdr.msg_iov = new iovec(); + mhdr.msg_hdr.msg_iov->iov_base = new char[kRtpPacketSize]; + mhdr.msg_hdr.msg_iov->iov_len = kRtpPacketSize; + mhdr.msg_len = 0; + + cache.push_back(mhdr); + } + + return &cache[cache_pos++]; +} + +void SrsRtcServer::sendmmsg(srs_netfd_t stfd, mmsghdr* /*hdr*/) +{ mmstfd = stfd; - mmhdrs.insert(mmhdrs.end(), msgs.begin(), msgs.end()); if (waiting_msgs) { waiting_msgs = false; srs_cond_signal(cond); } - - return err; } -void SrsRtcServer::free_messages(vector& hdrs) +void SrsRtcServer::free_mhdrs(std::vector& mhdrs) { - for (int i = 0; i < (int)hdrs.size(); i++) { - msghdr* hdr = &hdrs[i].msg_hdr; - for (int j = (int)hdr->msg_iovlen - 1; j >= 0 ; j--) { - iovec* iov = hdr->msg_iov + j; + for (int i = 0; i < (int)mhdrs.size(); i++) { + mmsghdr* hdr = &mhdrs[i]; + + for (int j = (int)hdr->msg_hdr.msg_iovlen - 1; j >= 0 ; j--) { + iovec* iov = hdr->msg_hdr.msg_iov + j; char* data = (char*)iov->iov_base; srs_freep(data); srs_freep(iov); @@ -1389,17 +1411,19 @@ srs_error_t SrsRtcServer::cycle() return err; } - // TODO: FIXME: Use cond trigger. - if (mmhdrs.empty()) { + int pos = cache_pos; + if (pos <= 0) { waiting_msgs = true; srs_cond_wait(cond); + continue; } - vector mhdrs; - mmhdrs.swap(mhdrs); + // We are working on hotspot now. + cache.swap(hotspot); + cache_pos = 0; - mmsghdr* p = &mhdrs[0]; - for (mmsghdr* end = p + mhdrs.size(); p < end; p += max_sendmmsg) { + mmsghdr* p = &hotspot[0]; + for (mmsghdr* end = p + pos; p < end; p += max_sendmmsg) { int vlen = (int)(end - p); vlen = srs_min(max_sendmmsg, vlen); @@ -1415,10 +1439,8 @@ srs_error_t SrsRtcServer::cycle() if ((cnt++ % 100) == 0) { // TODO: FIXME: Support reload. max_sendmmsg = _srs_config->get_rtc_server_sendmmsg(); - srs_trace("-> RTC SEND %d msgs, by sendmmsg %d", mhdrs.size(), max_sendmmsg); + srs_trace("-> RTC SEND %d msgs, by sendmmsg %d", pos, max_sendmmsg); } - - free_messages(mhdrs); } return err; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index e1bb21fff..9ccc297dc 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -144,6 +144,7 @@ public: void update_sendonly_socket(SrsUdpMuxSocket* ukt); private: void send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* udp_mux_skt); + void send_and_free_message(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpSharedPacket* pkt, SrsUdpMuxSocket* udp_mux_skt); }; class SrsRtcSession @@ -224,7 +225,12 @@ private: bool waiting_msgs; // TODO: FIXME: Support multiple stfd. srs_netfd_t mmstfd; - std::vector mmhdrs; + // Hotspot msgs, we are working on it. + // @remark We will wait util all messages are ready. + std::vector hotspot; + // Cache msgs, for other coroutines to fill it. + std::vector cache; + int cache_pos; private: std::map map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) std::map map_id_session; // key: peerip(ip + ":" + port) @@ -255,8 +261,9 @@ public: virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); // Internal only. public: - srs_error_t send_and_free_messages(srs_netfd_t stfd, const std::vector& msgs); - void free_messages(std::vector& hdrs); + mmsghdr* fetch(); + void sendmmsg(srs_netfd_t stfd, mmsghdr* hdr); + void free_mhdrs(std::vector& mhdrs); virtual srs_error_t cycle(); };