1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

For #307, use RTC server to sendmmsg

This commit is contained in:
winlin 2020-04-09 09:56:13 +08:00
parent cf48a5594d
commit 1e83749485
2 changed files with 49 additions and 34 deletions

View file

@ -205,10 +205,15 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* udp_mux_skt)
}
}
if (out_bio_len) {
if ((err = udp_mux_skt->sendto(out_bio_data, out_bio_len, 0)) != srs_success) {
return srs_error_wrap(err, "send dtls packet");
}
if (out_bio_len) {
srs_netfd_t stfd = udp_mux_skt->stfd();
sockaddr_in* addr = udp_mux_skt->peer_addr();
socklen_t addrlen = udp_mux_skt->peer_addrlen();
char* buf = new char[out_bio_len];
memcpy(buf, out_bio_data, out_bio_len);
rtc_session->send_and_free_messages(stfd, addr, addrlen, buf, out_bio_len);
}
return err;
@ -576,7 +581,10 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int
{
srs_error_t err = srs_success;
vector<mmsghdr> mhdrs;
srs_netfd_t stfd = udp_mux_skt->stfd();
sockaddr_in* addr = udp_mux_skt->peer_addr();
socklen_t addrlen = udp_mux_skt->peer_addrlen();
for (int i = 0; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i];
@ -609,24 +617,11 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int
memcpy(buf, pkt->payload, length);
}
mmsghdr mhdr;
memset(&mhdr, 0, sizeof(mmsghdr));
mhdr.msg_hdr.msg_name = (sockaddr_in*)udp_mux_skt->peer_addr();
mhdr.msg_hdr.msg_namelen = udp_mux_skt->peer_addrlen();
mhdr.msg_hdr.msg_iovlen = 1;
mhdr.msg_hdr.msg_iov = new iovec();
mhdr.msg_hdr.msg_iov->iov_base = buf;
mhdr.msg_hdr.msg_iov->iov_len = length;
mhdrs.push_back(mhdr);
rtc_session->send_and_free_messages(stfd, addr, addrlen, buf, length);
}
srs_freep(msg);
}
if ((err = rtc_session->rtc_server->send_and_free_messages(udp_mux_skt->stfd(), mhdrs)) != srs_success) {
srs_warn("sendmsg %d msgs, err %s", mhdrs.size(), srs_error_summary(err).c_str());
srs_freep(err);
}
}
SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id)
@ -728,9 +723,6 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsS
}
SrsStunPacket stun_binding_response;
char buf[1460];
SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf));
SrsAutoFree(SrsBuffer, stream);
stun_binding_response.set_message_type(BindingResponse);
stun_binding_response.set_local_ufrag(stun_req->get_remote_ufrag());
@ -740,13 +732,18 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsS
stun_binding_response.set_mapped_address(be32toh(inet_addr(udp_mux_skt->get_peer_ip().c_str())));
stun_binding_response.set_mapped_port(udp_mux_skt->get_peer_port());
char* buf = new char[1460];
SrsBuffer* stream = new SrsBuffer(buf, 1460);
SrsAutoFree(SrsBuffer, stream);
if ((err = stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream)) != srs_success) {
return srs_error_wrap(err, "stun binding response encode failed");
}
if ((err = udp_mux_skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) {
return srs_error_wrap(err, "stun binding response send failed");
}
srs_netfd_t stfd = udp_mux_skt->stfd();
sockaddr_in* addr = udp_mux_skt->peer_addr();
socklen_t addrlen = udp_mux_skt->peer_addrlen();
send_and_free_messages(stfd, addr, addrlen, buf, stream->pos());
if (get_session_state() == WAITING_STUN) {
set_session_state(DOING_DTLS_HANDSHAKE);
@ -837,13 +834,17 @@ srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSock
for (int i = 0; i < (int)resend_pkts.size(); ++i) {
if (dtls_session) {
char protected_buf[kRtpPacketSize];
char* protected_buf = new char[kRtpPacketSize];
int nb_protected_buf = resend_pkts[i]->size;
srs_verbose("resend pkt sequence=%u", resend_pkts[i]->rtp_header.get_sequence());
dtls_session->protect_rtp(protected_buf, resend_pkts[i]->payload, nb_protected_buf);
udp_mux_skt->sendto(protected_buf, nb_protected_buf, 0);
srs_netfd_t stfd = udp_mux_skt->stfd();
sockaddr_in* addr = udp_mux_skt->peer_addr();
socklen_t addrlen = udp_mux_skt->peer_addrlen();
send_and_free_messages(stfd, addr, addrlen, protected_buf, nb_protected_buf);
}
}
@ -1085,6 +1086,11 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt)
return err;
}
void SrsRtcSession::send_and_free_messages(srs_netfd_t stfd, sockaddr_in* addr, socklen_t addrlen, char* buf, int length)
{
rtc_server->send_and_free_messages(stfd, addr, addrlen, buf, length);
}
SrsRtcServer::SrsRtcServer()
{
listener = NULL;
@ -1344,19 +1350,25 @@ srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tic
return srs_success;
}
srs_error_t SrsRtcServer::send_and_free_messages(srs_netfd_t stfd, const vector<mmsghdr>& msgs)
void SrsRtcServer::send_and_free_messages(srs_netfd_t stfd, sockaddr_in* addr, socklen_t addrlen, char* buf, int length)
{
srs_error_t err = srs_success;
mmstfd = stfd;
mmhdrs.insert(mmhdrs.end(), msgs.begin(), msgs.end());
mmsghdr mhdr;
memset(&mhdr, 0, sizeof(mhdr));
mhdr.msg_hdr.msg_name = addr;
mhdr.msg_hdr.msg_namelen = addrlen;
mhdr.msg_hdr.msg_iovlen = 1;
mhdr.msg_hdr.msg_iov = new iovec();
mhdr.msg_hdr.msg_iov->iov_base = buf;
mhdr.msg_hdr.msg_iov->iov_len = length;
mmhdrs.push_back(mhdr);
if (waiting_msgs) {
waiting_msgs = false;
srs_cond_signal(cond);
}
return err;
}
void SrsRtcServer::free_messages(vector<mmsghdr>& hdrs)