diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index a88bf1af3..9f93b2cf8 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -428,9 +428,9 @@ srs_error_t SrsUdpMuxListener::cycle() return srs_error_wrap(err, "udp listener"); } - SrsUdpMuxSocket udp_mux_skt(lfd); + SrsUdpMuxSocket skt(lfd); - int nread = udp_mux_skt.recvfrom(SRS_UTIME_NO_TIMEOUT); + int nread = skt.recvfrom(SRS_UTIME_NO_TIMEOUT); if (nread <= 0) { if (nread < 0) { srs_warn("udp recv error"); @@ -439,7 +439,7 @@ srs_error_t SrsUdpMuxListener::cycle() continue; } - if ((err = handler->on_udp_packet(&udp_mux_skt)) != srs_success) { + if ((err = handler->on_udp_packet(&skt)) != srs_success) { // remux udp never return srs_warn("udp packet handler error:%s", srs_error_desc(err).c_str()); continue; diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index 0222413c1..78db998bd 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -69,7 +69,7 @@ public: virtual ~ISrsUdpMuxHandler(); public: virtual srs_error_t on_stfd_change(srs_netfd_t fd); - virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt) = 0; + virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* skt) = 0; }; // The tcp connection handler. diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index bdcc8c705..da3a4fc2f 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -174,7 +174,7 @@ srs_error_t SrsDtlsSession::initialize(const SrsRequest& req) return err; } -srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* udp_mux_skt) +srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; @@ -186,7 +186,7 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* udp_mux_skt) int ssl_err = SSL_get_error(dtls, ret); switch(ssl_err) { case SSL_ERROR_NONE: { - if ((err = on_dtls_handshake_done(udp_mux_skt)) != srs_success) { + if ((err = on_dtls_handshake_done(skt)) != srs_success) { return srs_error_wrap(err, "dtls handshake done handle"); } break; @@ -206,7 +206,7 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* udp_mux_skt) } if (out_bio_len) { - if ((err = udp_mux_skt->sendto(out_bio_data, out_bio_len, 0)) != srs_success) { + if ((err = skt->sendto(out_bio_data, out_bio_len, 0)) != srs_success) { return srs_error_wrap(err, "send dtls packet"); } } @@ -214,7 +214,7 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* udp_mux_skt) return err; } -srs_error_t SrsDtlsSession::on_dtls(SrsUdpMuxSocket* udp_mux_skt) +srs_error_t SrsDtlsSession::on_dtls(SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; if (BIO_reset(bio_in) != 1) { @@ -224,13 +224,13 @@ srs_error_t SrsDtlsSession::on_dtls(SrsUdpMuxSocket* udp_mux_skt) return srs_error_new(ERROR_OpenSslBIOReset, "BIO_reset"); } - if (BIO_write(bio_in, udp_mux_skt->data(), udp_mux_skt->size()) <= 0) { + if (BIO_write(bio_in, skt->data(), skt->size()) <= 0) { // TODO: 0 or -1 maybe block, use BIO_should_retry to check. return srs_error_new(ERROR_OpenSslBIOWrite, "BIO_write"); } if (! handshake_done) { - err = handshake(udp_mux_skt); + err = handshake(skt); } else { while (BIO_ctrl_pending(bio_in) > 0) { char dtls_read_buf[8092]; @@ -247,7 +247,7 @@ srs_error_t SrsDtlsSession::on_dtls(SrsUdpMuxSocket* udp_mux_skt) return err; } -srs_error_t SrsDtlsSession::on_dtls_handshake_done(SrsUdpMuxSocket* udp_mux_skt) +srs_error_t SrsDtlsSession::on_dtls_handshake_done(SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; srs_trace("dtls handshake done"); @@ -257,7 +257,7 @@ srs_error_t SrsDtlsSession::on_dtls_handshake_done(SrsUdpMuxSocket* udp_mux_skt) return srs_error_wrap(err, "srtp init failed"); } - return rtc_session->on_connection_established(udp_mux_skt); + return rtc_session->on_connection_established(skt); } srs_error_t SrsDtlsSession::on_dtls_application_data(const char* buf, const int nb_buf) @@ -529,13 +529,6 @@ srs_error_t SrsRtcSenderThread::cycle() return srs_error_wrap(err, "rtc sender thread"); } - pprint->elapse(); - - if (pprint->can_print()) { - // TODO: FIXME: - // Print stat like frame/s, packet/s, loss_packets. - } - #ifdef SRS_PERF_QUEUE_COND_WAIT if (realtime) { // for realtime, min required msgs is 0, send when got one+ msgs. @@ -559,20 +552,28 @@ srs_error_t SrsRtcSenderThread::cycle() continue; } - send_and_free_messages(msgs.msgs, msg_count, sendonly_ukt); + int nn = 0; + int nn_rtp_pkts = 0; + send_and_free_messages(msgs.msgs, msg_count, sendonly_ukt, &nn, &nn_rtp_pkts); + + pprint->elapse(); + if (pprint->can_print()) { + // TODO: FIXME: Print stat like frame/s, packet/s, loss_packets. + srs_trace("-> RTC PLAY %d msgs, %d packets, %d bytes", msg_count, nn_rtp_pkts, nn); + } } } -void SrsRtcSenderThread::update_sendonly_socket(SrsUdpMuxSocket* ukt) +void SrsRtcSenderThread::update_sendonly_socket(SrsUdpMuxSocket* skt) { srs_trace("session %s address changed, update %s -> %s", - rtc_session->id().c_str(), sendonly_ukt->get_peer_id().c_str(), ukt->get_peer_id().c_str()); + rtc_session->id().c_str(), sendonly_ukt->get_peer_id().c_str(), skt->get_peer_id().c_str()); srs_freep(sendonly_ukt); - sendonly_ukt = ukt->copy_sendonly(); + sendonly_ukt = skt->copy_sendonly(); } -void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* udp_mux_skt) +void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* skt, int* pnn, int* pnn_rtp_pkts) { if (!rtc_session->dtls_session) { return; @@ -583,16 +584,20 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int bool is_video = msg->is_video(); bool is_audio = msg->is_audio(); - for (vector::iterator it = msg->rtp_packets.begin(); it != msg->rtp_packets.end(); ++it) { - SrsRtpSharedPacket* pkt = *it; - send_and_free_message(msg, is_video, is_audio, pkt, udp_mux_skt); + int nn_rtp_pkts = (int)msg->rtp_packets.size(); + for (int j = 0; j < nn_rtp_pkts; j++) { + SrsRtpSharedPacket* pkt = msg->rtp_packets[j]; + send_and_free_message(msg, is_video, is_audio, pkt, skt); } + *pnn += msg->size; + *pnn_rtp_pkts += nn_rtp_pkts; + srs_freep(msg); } } -void SrsRtcSenderThread::send_and_free_message(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpSharedPacket* pkt, SrsUdpMuxSocket* udp_mux_skt) +void SrsRtcSenderThread::send_and_free_message(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpSharedPacket* pkt, SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; @@ -620,15 +625,15 @@ void SrsRtcSenderThread::send_and_free_message(SrsSharedPtrMessage* msg, bool is memcpy(buf, pkt->payload, length); } - sockaddr_in* addr = (sockaddr_in*)udp_mux_skt->peer_addr(); - socklen_t addrlen = (socklen_t)udp_mux_skt->peer_addrlen(); + sockaddr_in* addr = (sockaddr_in*)skt->peer_addr(); + socklen_t addrlen = (socklen_t)skt->peer_addrlen(); mhdr->msg_hdr.msg_name = (sockaddr_in*)addr; mhdr->msg_hdr.msg_namelen = (socklen_t)addrlen; mhdr->msg_hdr.msg_iov->iov_len = length; mhdr->msg_len = 0; - rtc_session->rtc_server->sendmmsg(udp_mux_skt->stfd(), mhdr); + rtc_session->rtc_server->sendmmsg(skt->stfd(), mhdr); } SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id) @@ -673,12 +678,12 @@ void SrsRtcSession::switch_to_context() _srs_context->set_id(cid); } -srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req) +srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req) { srs_error_t err = srs_success; if (stun_req->is_binding_request()) { - if ((err = on_binding_request(udp_mux_skt, stun_req)) != srs_success) { + if ((err = on_binding_request(skt, stun_req)) != srs_success) { return srs_error_wrap(err, "stun binding request failed"); } @@ -687,8 +692,8 @@ srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* if (strd && strd->sendonly_ukt) { // We are running in the ice-lite(server) mode. If client have multi network interface, // we only choose one candidate pair which is determined by client. - if (stun_req->get_use_candidate() && strd->sendonly_ukt->get_peer_id() != udp_mux_skt->get_peer_id()) { - strd->update_sendonly_socket(udp_mux_skt); + if (stun_req->get_use_candidate() && strd->sendonly_ukt->get_peer_id() != skt->get_peer_id()) { + strd->update_sendonly_socket(skt); } } } @@ -718,7 +723,7 @@ srs_error_t SrsRtcSession::check_source() #define be32toh ntohl #endif -srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req) +srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req) { srs_error_t err = srs_success; @@ -739,28 +744,28 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsS stun_binding_response.set_remote_ufrag(stun_req->get_local_ufrag()); stun_binding_response.set_transcation_id(stun_req->get_transcation_id()); // FIXME: inet_addr is deprecated, IPV6 support - stun_binding_response.set_mapped_address(be32toh(inet_addr(udp_mux_skt->get_peer_ip().c_str()))); - stun_binding_response.set_mapped_port(udp_mux_skt->get_peer_port()); + stun_binding_response.set_mapped_address(be32toh(inet_addr(skt->get_peer_ip().c_str()))); + stun_binding_response.set_mapped_port(skt->get_peer_port()); if ((err = stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream)) != srs_success) { return srs_error_wrap(err, "stun binding response encode failed"); } - if ((err = udp_mux_skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) { + if ((err = skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) { return srs_error_wrap(err, "stun binding response send failed"); } if (get_session_state() == WAITING_STUN) { set_session_state(DOING_DTLS_HANDSHAKE); - peer_id = udp_mux_skt->get_peer_id(); + peer_id = skt->get_peer_id(); rtc_server->insert_into_id_sessions(peer_id, this); } return err; } -srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt) +srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; @@ -845,14 +850,14 @@ srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSock srs_verbose("resend pkt sequence=%u", resend_pkts[i]->rtp_header.get_sequence()); dtls_session->protect_rtp(protected_buf, resend_pkts[i]->payload, nb_protected_buf); - udp_mux_skt->sendto(protected_buf, nb_protected_buf, 0); + skt->sendto(protected_buf, nb_protected_buf, 0); } } return err; } -srs_error_t SrsRtcSession::on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt) +srs_error_t SrsRtcSession::on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; @@ -899,7 +904,7 @@ srs_error_t SrsRtcSession::on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxS return err; } -srs_error_t SrsRtcSession::on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt) +srs_error_t SrsRtcSession::on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; @@ -968,18 +973,18 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ return err; } -srs_error_t SrsRtcSession::on_connection_established(SrsUdpMuxSocket* udp_mux_skt) +srs_error_t SrsRtcSession::on_connection_established(SrsUdpMuxSocket* skt) { srs_trace("rtc session=%s, to=%dms connection established", id().c_str(), srsu2msi(sessionStunTimeout)); - return start_play(udp_mux_skt); + return start_play(skt); } -srs_error_t SrsRtcSession::start_play(SrsUdpMuxSocket* udp_mux_skt) +srs_error_t SrsRtcSession::start_play(SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; srs_freep(strd); - strd = new SrsRtcSenderThread(this, udp_mux_skt, _srs_context->get_id()); + strd = new SrsRtcSenderThread(this, skt, _srs_context->get_id()); uint32_t video_ssrc = 0; uint32_t audio_ssrc = 0; @@ -1012,12 +1017,12 @@ bool SrsRtcSession::is_stun_timeout() return last_stun_time + sessionStunTimeout < srs_get_system_time(); } -srs_error_t SrsRtcSession::on_dtls(SrsUdpMuxSocket* udp_mux_skt) +srs_error_t SrsRtcSession::on_dtls(SrsUdpMuxSocket* skt) { - return dtls_session->on_dtls(udp_mux_skt); + return dtls_session->on_dtls(skt); } -srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt) +srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; @@ -1026,8 +1031,8 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt) } char unprotected_buf[kRtpPacketSize]; - int nb_unprotected_buf = udp_mux_skt->size(); - if ((err = dtls_session->unprotect_rtcp(unprotected_buf, udp_mux_skt->data(), nb_unprotected_buf)) != srs_success) { + int nb_unprotected_buf = skt->size(); + if ((err = dtls_session->unprotect_rtcp(unprotected_buf, skt->data(), nb_unprotected_buf)) != srs_success) { return srs_error_wrap(err, "rtcp unprotect failed"); } @@ -1050,7 +1055,7 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt) break; } case kRR: { - err = on_rtcp_receiver_report(ph, length, udp_mux_skt); + err = on_rtcp_receiver_report(ph, length, skt); break; } case kSDES: { @@ -1063,11 +1068,11 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt) break; } case kRtpFb: { - err = on_rtcp_feedback(ph, length, udp_mux_skt); + err = on_rtcp_feedback(ph, length, skt); break; } case kPsFb: { - err = on_rtcp_ps_feedback(ph, length, udp_mux_skt); + err = on_rtcp_ps_feedback(ph, length, skt); break; } default:{ @@ -1170,14 +1175,14 @@ srs_error_t SrsRtcServer::listen_udp() return err; } -srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* udp_mux_skt) +srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) { - if (is_stun(reinterpret_cast(udp_mux_skt->data()), udp_mux_skt->size())) { - return on_stun(udp_mux_skt); - } else if (is_dtls(reinterpret_cast(udp_mux_skt->data()), udp_mux_skt->size())) { - return on_dtls(udp_mux_skt); - } else if (is_rtp_or_rtcp(reinterpret_cast(udp_mux_skt->data()), udp_mux_skt->size())) { - return on_rtp_or_rtcp(udp_mux_skt); + if (is_stun(reinterpret_cast(skt->data()), skt->size())) { + return on_stun(skt); + } else if (is_dtls(reinterpret_cast(skt->data()), skt->size())) { + return on_dtls(skt); + } else if (is_rtp_or_rtcp(reinterpret_cast(skt->data()), skt->size())) { + return on_rtp_or_rtcp(skt); } return srs_error_new(ERROR_RTC_UDP, "unknown udp packet type"); @@ -1246,17 +1251,17 @@ SrsRtcSession* SrsRtcServer::find_rtc_session_by_peer_id(const string& peer_id) return iter->second; } -srs_error_t SrsRtcServer::on_stun(SrsUdpMuxSocket* udp_mux_skt) +srs_error_t SrsRtcServer::on_stun(SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; SrsStunPacket stun_req; - if ((err = stun_req.decode(udp_mux_skt->data(), udp_mux_skt->size())) != srs_success) { + if ((err = stun_req.decode(skt->data(), skt->size())) != srs_success) { return srs_error_wrap(err, "decode stun packet failed"); } srs_verbose("recv stun packet from %s, use-candidate=%d, ice-controlled=%d, ice-controlling=%d", - udp_mux_skt->get_peer_id().c_str(), stun_req.get_use_candidate(), stun_req.get_ice_controlled(), stun_req.get_ice_controlling()); + skt->get_peer_id().c_str(), stun_req.get_use_candidate(), stun_req.get_ice_controlled(), stun_req.get_ice_controlling()); std::string username = stun_req.get_username(); SrsRtcSession* rtc_session = find_rtc_session_by_username(username); @@ -1268,44 +1273,44 @@ srs_error_t SrsRtcServer::on_stun(SrsUdpMuxSocket* udp_mux_skt) // to make all logs write to the "correct" pid+cid. rtc_session->switch_to_context(); - return rtc_session->on_stun(udp_mux_skt, &stun_req); + return rtc_session->on_stun(skt, &stun_req); } -srs_error_t SrsRtcServer::on_dtls(SrsUdpMuxSocket* udp_mux_skt) +srs_error_t SrsRtcServer::on_dtls(SrsUdpMuxSocket* skt) { - SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_mux_skt->get_peer_id()); + SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(skt->get_peer_id()); if (rtc_session == NULL) { - return srs_error_new(ERROR_RTC_DTLS, "can not find rtc session by peer_id=%s", udp_mux_skt->get_peer_id().c_str()); + return srs_error_new(ERROR_RTC_DTLS, "can not find rtc session by peer_id=%s", skt->get_peer_id().c_str()); } // Now, we got the RTC session to handle the packet, switch to its context // to make all logs write to the "correct" pid+cid. rtc_session->switch_to_context(); - return rtc_session->on_dtls(udp_mux_skt); + return rtc_session->on_dtls(skt); } -srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpMuxSocket* udp_mux_skt) +srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpMuxSocket* skt) { srs_error_t err = srs_success; - SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_mux_skt->get_peer_id()); + SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(skt->get_peer_id()); if (rtc_session == NULL) { - return srs_error_new(ERROR_RTC_RTP, "can not find rtc session by peer_id=%s", udp_mux_skt->get_peer_id().c_str()); + return srs_error_new(ERROR_RTC_RTP, "can not find rtc session by peer_id=%s", skt->get_peer_id().c_str()); } // Now, we got the RTC session to handle the packet, switch to its context // to make all logs write to the "correct" pid+cid. rtc_session->switch_to_context(); - if (is_rtcp(reinterpret_cast(udp_mux_skt->data()), udp_mux_skt->size())) { - err = rtc_session->on_rtcp(udp_mux_skt); + if (is_rtcp(reinterpret_cast(skt->data()), skt->size())) { + err = rtc_session->on_rtcp(skt); } else { // We disable it because no RTP for player. // see https://github.com/ossrs/srs/blob/018577e685a07d9de7a47354e7a9c5f77f5f4202/trunk/src/app/srs_app_rtc_conn.cpp#L1081 - // err = rtc_session->on_rtp(udp_mux_skt); + // err = rtc_session->on_rtp(skt); } return err; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index f7cb9137d..f448778b9 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -99,8 +99,8 @@ public: srs_error_t initialize(const SrsRequest& req); - srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt); - srs_error_t on_dtls_handshake_done(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_dtls(SrsUdpMuxSocket* skt); + srs_error_t on_dtls_handshake_done(SrsUdpMuxSocket* skt); srs_error_t on_dtls_application_data(const char* data, const int len); public: srs_error_t protect_rtp(char* protected_buf, const char* ori_buf, int& nb_protected_buf); @@ -108,7 +108,7 @@ public: srs_error_t protect_rtcp(char* protected_buf, const char* ori_buf, int& nb_protected_buf); srs_error_t unprotect_rtcp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf); private: - srs_error_t handshake(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t handshake(SrsUdpMuxSocket* skt); private: srs_error_t srtp_initialize(); srs_error_t srtp_send_init(); @@ -142,10 +142,10 @@ public: public: virtual srs_error_t cycle(); public: - void update_sendonly_socket(SrsUdpMuxSocket* ukt); + void update_sendonly_socket(SrsUdpMuxSocket* skt); private: - void send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* udp_mux_skt); - void send_and_free_message(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpSharedPacket* pkt, SrsUdpMuxSocket* udp_mux_skt); + void send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* skt, int* pnn, int* pnn_rtp_pkts); + void send_and_free_message(SrsSharedPtrMessage* msg, bool is_video, bool is_audio, SrsRtpSharedPacket* pkt, SrsUdpMuxSocket* skt); }; class SrsRtcSession @@ -196,23 +196,23 @@ public: void switch_to_context(); public: - srs_error_t on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req); - srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt); - srs_error_t on_rtcp(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req); + srs_error_t on_dtls(SrsUdpMuxSocket* skt); + srs_error_t on_rtcp(SrsUdpMuxSocket* skt); public: - srs_error_t send_client_hello(SrsUdpMuxSocket* udp_mux_skt); - srs_error_t on_connection_established(SrsUdpMuxSocket* udp_mux_skt); - srs_error_t start_play(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t send_client_hello(SrsUdpMuxSocket* skt); + srs_error_t on_connection_established(SrsUdpMuxSocket* skt); + srs_error_t start_play(SrsUdpMuxSocket* skt); public: bool is_stun_timeout(); private: srs_error_t check_source(); private: - srs_error_t on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req); + srs_error_t on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req); private: - srs_error_t on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); - srs_error_t on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); - srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt); + srs_error_t on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* skt); + srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt); }; class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass, @@ -247,16 +247,16 @@ public: // TODO: FIXME: Support gracefully quit. // TODO: FIXME: Support reload. virtual srs_error_t listen_udp(); - virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt); + virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* skt); public: virtual srs_error_t listen_api(); SrsRtcSession* create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip); bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session); void check_and_clean_timeout_session(); private: - srs_error_t on_stun(SrsUdpMuxSocket* udp_mux_skt); - srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt); - srs_error_t on_rtp_or_rtcp(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_stun(SrsUdpMuxSocket* skt); + srs_error_t on_dtls(SrsUdpMuxSocket* skt); + srs_error_t on_rtp_or_rtcp(SrsUdpMuxSocket* skt); private: SrsRtcSession* find_rtc_session_by_username(const std::string& ufrag); SrsRtcSession* find_rtc_session_by_peer_id(const std::string& peer_id);