diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 9c6c3cf8f..5305771e0 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -205,10 +205,15 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* udp_mux_skt) } } - if (out_bio_len) { - if ((err = udp_mux_skt->sendto(out_bio_data, out_bio_len, 0)) != srs_success) { - return srs_error_wrap(err, "send dtls packet"); - } + if (out_bio_len) { + srs_netfd_t stfd = udp_mux_skt->stfd(); + sockaddr_in* addr = udp_mux_skt->peer_addr(); + socklen_t addrlen = udp_mux_skt->peer_addrlen(); + + char* buf = new char[out_bio_len]; + memcpy(buf, out_bio_data, out_bio_len); + + rtc_session->send_and_free_messages(stfd, addr, addrlen, buf, out_bio_len); } return err; @@ -576,7 +581,10 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int { srs_error_t err = srs_success; - vector mhdrs; + srs_netfd_t stfd = udp_mux_skt->stfd(); + sockaddr_in* addr = udp_mux_skt->peer_addr(); + socklen_t addrlen = udp_mux_skt->peer_addrlen(); + for (int i = 0; i < nb_msgs; i++) { SrsSharedPtrMessage* msg = msgs[i]; @@ -609,24 +617,11 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int memcpy(buf, pkt->payload, length); } - mmsghdr mhdr; - memset(&mhdr, 0, sizeof(mmsghdr)); - mhdr.msg_hdr.msg_name = (sockaddr_in*)udp_mux_skt->peer_addr(); - mhdr.msg_hdr.msg_namelen = udp_mux_skt->peer_addrlen(); - mhdr.msg_hdr.msg_iovlen = 1; - mhdr.msg_hdr.msg_iov = new iovec(); - mhdr.msg_hdr.msg_iov->iov_base = buf; - mhdr.msg_hdr.msg_iov->iov_len = length; - mhdrs.push_back(mhdr); + rtc_session->send_and_free_messages(stfd, addr, addrlen, buf, length); } srs_freep(msg); } - - if ((err = rtc_session->rtc_server->send_and_free_messages(udp_mux_skt->stfd(), mhdrs)) != srs_success) { - srs_warn("sendmsg %d msgs, err %s", mhdrs.size(), srs_error_summary(err).c_str()); - srs_freep(err); - } } SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id) @@ -728,9 +723,6 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsS } SrsStunPacket stun_binding_response; - char buf[1460]; - SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); - SrsAutoFree(SrsBuffer, stream); stun_binding_response.set_message_type(BindingResponse); stun_binding_response.set_local_ufrag(stun_req->get_remote_ufrag()); @@ -740,13 +732,18 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsS stun_binding_response.set_mapped_address(be32toh(inet_addr(udp_mux_skt->get_peer_ip().c_str()))); stun_binding_response.set_mapped_port(udp_mux_skt->get_peer_port()); + char* buf = new char[1460]; + SrsBuffer* stream = new SrsBuffer(buf, 1460); + SrsAutoFree(SrsBuffer, stream); + if ((err = stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream)) != srs_success) { return srs_error_wrap(err, "stun binding response encode failed"); } - if ((err = udp_mux_skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) { - return srs_error_wrap(err, "stun binding response send failed"); - } + srs_netfd_t stfd = udp_mux_skt->stfd(); + sockaddr_in* addr = udp_mux_skt->peer_addr(); + socklen_t addrlen = udp_mux_skt->peer_addrlen(); + send_and_free_messages(stfd, addr, addrlen, buf, stream->pos()); if (get_session_state() == WAITING_STUN) { set_session_state(DOING_DTLS_HANDSHAKE); @@ -837,13 +834,17 @@ srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSock for (int i = 0; i < (int)resend_pkts.size(); ++i) { if (dtls_session) { - char protected_buf[kRtpPacketSize]; + char* protected_buf = new char[kRtpPacketSize]; int nb_protected_buf = resend_pkts[i]->size; srs_verbose("resend pkt sequence=%u", resend_pkts[i]->rtp_header.get_sequence()); dtls_session->protect_rtp(protected_buf, resend_pkts[i]->payload, nb_protected_buf); - udp_mux_skt->sendto(protected_buf, nb_protected_buf, 0); + + srs_netfd_t stfd = udp_mux_skt->stfd(); + sockaddr_in* addr = udp_mux_skt->peer_addr(); + socklen_t addrlen = udp_mux_skt->peer_addrlen(); + send_and_free_messages(stfd, addr, addrlen, protected_buf, nb_protected_buf); } } @@ -1085,6 +1086,11 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt) return err; } +void SrsRtcSession::send_and_free_messages(srs_netfd_t stfd, sockaddr_in* addr, socklen_t addrlen, char* buf, int length) +{ + rtc_server->send_and_free_messages(stfd, addr, addrlen, buf, length); +} + SrsRtcServer::SrsRtcServer() { listener = NULL; @@ -1344,19 +1350,25 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic return srs_success; } -srs_error_t SrsRtcServer::send_and_free_messages(srs_netfd_t stfd, const vector& msgs) +void SrsRtcServer::send_and_free_messages(srs_netfd_t stfd, sockaddr_in* addr, socklen_t addrlen, char* buf, int length) { - srs_error_t err = srs_success; - mmstfd = stfd; - mmhdrs.insert(mmhdrs.end(), msgs.begin(), msgs.end()); + + mmsghdr mhdr; + memset(&mhdr, 0, sizeof(mhdr)); + + mhdr.msg_hdr.msg_name = addr; + mhdr.msg_hdr.msg_namelen = addrlen; + mhdr.msg_hdr.msg_iovlen = 1; + mhdr.msg_hdr.msg_iov = new iovec(); + mhdr.msg_hdr.msg_iov->iov_base = buf; + mhdr.msg_hdr.msg_iov->iov_len = length; + mmhdrs.push_back(mhdr); if (waiting_msgs) { waiting_msgs = false; srs_cond_signal(cond); } - - return err; } void SrsRtcServer::free_messages(vector& hdrs) diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index e1bb21fff..08be09929 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -211,6 +211,9 @@ private: srs_error_t on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); srs_error_t on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); +// Internal only. +public: + void send_and_free_messages(srs_netfd_t stfd, sockaddr_in* addr, socklen_t addrlen, char* buf, int length); }; class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass, virtual public ISrsCoroutineHandler @@ -255,7 +258,7 @@ public: virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); // Internal only. public: - srs_error_t send_and_free_messages(srs_netfd_t stfd, const std::vector& msgs); + void send_and_free_messages(srs_netfd_t stfd, sockaddr_in* addr, socklen_t addrlen, char* buf, int length); void free_messages(std::vector& hdrs); virtual srs_error_t cycle(); };