From c62901a3ac767110fada3449bfd2d478a5e91404 Mon Sep 17 00:00:00 2001 From: xiaozhihong Date: Sun, 8 Mar 2020 00:30:31 +0800 Subject: [PATCH] make code easy, wrap udp remux socket --- trunk/configure | 2 +- trunk/src/app/srs_app_http_api.cpp | 1 - trunk/src/app/srs_app_listener.cpp | 64 ++++++- trunk/src/app/srs_app_listener.hpp | 31 +++- trunk/src/app/srs_app_rtc.cpp | 86 ---------- trunk/src/app/srs_app_rtc.hpp | 52 ------ trunk/src/app/srs_app_rtc_conn.cpp | 232 +++++++++++++------------- trunk/src/app/srs_app_rtc_conn.hpp | 63 +++---- trunk/src/app/srs_app_server.cpp | 5 +- trunk/src/protocol/srs_stun_stack.cpp | 1 + trunk/src/protocol/srs_stun_stack.hpp | 2 + 11 files changed, 245 insertions(+), 294 deletions(-) delete mode 100644 trunk/src/app/srs_app_rtc.cpp delete mode 100644 trunk/src/app/srs_app_rtc.hpp diff --git a/trunk/configure b/trunk/configure index 047f41883..6d3da2bff 100755 --- a/trunk/configure +++ b/trunk/configure @@ -257,7 +257,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_edge" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static" "srs_app_recv_thread" "srs_app_security" "srs_app_statistic" "srs_app_hds" - "srs_app_mpegts_udp" "srs_app_rtc" "srs_app_rtc_conn" "srs_app_dtls" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call" + "srs_app_mpegts_udp" "srs_app_rtc_conn" "srs_app_dtls" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call" "srs_app_caster_flv" "srs_app_process" "srs_app_ng_exec" "srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr" "srs_app_coworkers" "srs_app_hybrid") diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 5e1486093..bb892c57c 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -46,7 +46,6 @@ using namespace std; #include #include #include -#include #include srs_error_t srs_api_response_jsonp(ISrsHttpResponseWriter* w, string callback, string data) diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 2c1703250..c8ecec2c6 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -220,6 +221,57 @@ srs_error_t SrsTcpListener::cycle() return err; } +SrsUdpRemuxSocket::SrsUdpRemuxSocket(srs_netfd_t fd) +{ + nb_buf = SRS_UDP_MAX_PACKET_SIZE; + buf = new char[nb_buf]; + nread = 0; + + lfd = fd; + + fromlen = 0; +} + +SrsUdpRemuxSocket::~SrsUdpRemuxSocket() +{ + srs_freepa(buf); +} + +int SrsUdpRemuxSocket::recvfrom(srs_utime_t timeout) +{ + fromlen = sizeof(from); + nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr*)&from, &fromlen, timeout); + + if (nread > 0) { + char address_string[64]; + char port_string[16]; + if (getnameinfo((sockaddr*)&from, fromlen, + (char*)&address_string, sizeof(address_string), + (char*)&port_string, sizeof(port_string), + NI_NUMERICHOST|NI_NUMERICSERV)) { + return -1; + } + + peer_ip = std::string(address_string); + peer_port = atoi(port_string); + } + + return nread; +} + +int SrsUdpRemuxSocket::sendto(void* data, int size, srs_utime_t timeout) +{ + return srs_sendto(lfd, data, size, (sockaddr*)&from, fromlen, timeout); +} + +std::string SrsUdpRemuxSocket::get_peer_id() +{ + char id_buf[1024]; + int len = snprintf(id_buf, sizeof(id_buf), "%s:%d", peer_ip.c_str(), peer_port); + + return string(id_buf, len); +} + SrsUdpRemuxListener::SrsUdpRemuxListener(ISrsUdpRemuxHandler* h, std::string i, int p) { handler = h; @@ -276,19 +328,17 @@ srs_error_t SrsUdpRemuxListener::cycle() return srs_error_wrap(err, "udp listener"); } - int nread = 0; - sockaddr_storage from; - int nb_from = sizeof(from); - if ((nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr*)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) { + SrsUdpRemuxSocket udp_remux_socket(lfd); + + if (udp_remux_socket.recvfrom(SRS_UTIME_NO_TIMEOUT) <= 0) { srs_error("udp recv error"); // remux udp never return continue; } - if ((err = handler->on_udp_packet(lfd, (const sockaddr*)&from, nb_from, buf, nread)) != srs_success) { - //srs_error("udp handle packet error"); + if ((err = handler->on_udp_packet(&udp_remux_socket)) != srs_success) { // remux udp never return - srs_error("udp remux error:%s", srs_error_desc(err).c_str()); + srs_error("udp packet handler error:%s", srs_error_desc(err).c_str()); continue; } diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index 3b4d96b80..fc25551e8 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -26,6 +26,8 @@ #include +#include + #include #include @@ -33,6 +35,8 @@ struct sockaddr; +class SrsUdpRemuxSocket; + // The udp packet handler. class ISrsUdpHandler { @@ -61,7 +65,7 @@ public: virtual ~ISrsUdpRemuxHandler(); public: virtual srs_error_t on_stfd_change(srs_netfd_t fd); - virtual srs_error_t on_udp_packet(srs_netfd_t fd, const sockaddr* from, const int fromlen, char* buf, int nb_buf) = 0; + virtual srs_error_t on_udp_packet(SrsUdpRemuxSocket* udp_remux_socket) = 0; }; // The tcp connection handler. @@ -123,6 +127,31 @@ public: virtual srs_error_t cycle(); }; +class SrsUdpRemuxSocket +{ +private: + char* buf; + int nb_buf; + int nread; + srs_netfd_t lfd; + sockaddr_storage from; + int fromlen; + std::string peer_ip; + int peer_port; +public: + SrsUdpRemuxSocket(srs_netfd_t fd); + virtual ~SrsUdpRemuxSocket(); + + int recvfrom(srs_utime_t timeout); + int sendto(void* data, int size, srs_utime_t timeout); + + char* data() { return buf; } + int size() { return nread; } + std::string get_peer_ip() const { return peer_ip; } + int get_peer_port() const { return peer_port; } + std::string get_peer_id(); +}; + class SrsUdpRemuxListener : public ISrsCoroutineHandler { protected: diff --git a/trunk/src/app/srs_app_rtc.cpp b/trunk/src/app/srs_app_rtc.cpp deleted file mode 100644 index e7fbf7fc2..000000000 --- a/trunk/src/app/srs_app_rtc.cpp +++ /dev/null @@ -1,86 +0,0 @@ -/** - * The MIT License (MIT) - * - * Copyright (c) 2013-2020 Winlin - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to - * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of - * the Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER - * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -#include - -#include -#include -#include -#include -#include -using namespace std; - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -static bool is_stun(const char* data, const int size) { - return data != NULL && size > 0 && (data[0] == 0 || data[0] == 1); -} - -static bool is_rtp_or_rtcp(const char* data, const int size) { - return data != NULL && size > 0 && (data[0] >= 128 && data[0] <= 191); -} - -static bool is_dtls(const char* data, const int size) { - return data != NULL && size > 0 && (data[0] >= 20 && data[0] <= 64); -} - -SrsRtc::SrsRtc(SrsRtcServer* rtc_svr) -{ - rtc_server = rtc_svr; -} - -SrsRtc::~SrsRtc() -{ -} - -srs_error_t SrsRtc::on_udp_packet(srs_netfd_t fd, const sockaddr* from, const int fromlen, char* buf, int nb_buf) -{ - char address_string[64]; - char port_string[16]; - if(getnameinfo(from, fromlen, - (char*)&address_string, sizeof(address_string), - (char*)&port_string, sizeof(port_string), - NI_NUMERICHOST|NI_NUMERICSERV)) { - return srs_error_new(ERROR_SYSTEM_IP_INVALID, "bad address"); - } - std::string peer_ip = std::string(address_string); - int peer_port = atoi(port_string); - - return rtc_server->on_udp_packet(fd, peer_ip, peer_port, from, fromlen, buf, nb_buf); -} diff --git a/trunk/src/app/srs_app_rtc.hpp b/trunk/src/app/srs_app_rtc.hpp deleted file mode 100644 index 8c15bbf16..000000000 --- a/trunk/src/app/srs_app_rtc.hpp +++ /dev/null @@ -1,52 +0,0 @@ -/** - * The MIT License (MIT) - * - * Copyright (c) 2013-2020 Winlin - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to - * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of - * the Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER - * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -#ifndef SRS_APP_RTC_HPP -#define SRS_APP_RTC_HPP - -#include - -struct sockaddr; -#include -#include - -#include -#include -#include - -class SrsRtcServer; - -// The rtc over udp stream receiver -class SrsRtc : virtual public ISrsUdpRemuxHandler -{ -private: - SrsRtcServer* rtc_server; -public: - SrsRtc(SrsRtcServer* rtc_svr); - virtual ~SrsRtc(); -// Interface ISrsUdpHandler -public: - virtual srs_error_t on_udp_packet(srs_netfd_t fd, const sockaddr* from, const int fromlen, char* buf, int nb_buf); -}; - -#endif diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 65cd4fe10..bfc5c09f4 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -279,7 +279,7 @@ srs_error_t SrsSdp::parse_attr(const string& line) return err; } -SrsDtlsSession::SrsDtlsSession(srs_netfd_t lfd, const sockaddr* f, int fl) +SrsDtlsSession::SrsDtlsSession() { dtls = NULL; bio_in = NULL; @@ -291,10 +291,6 @@ SrsDtlsSession::SrsDtlsSession(srs_netfd_t lfd, const sockaddr* f, int fl) srtp_send = NULL; srtp_recv = NULL; - fd = lfd; - from = f; - fromlen = fl; - handshake_done = false; } @@ -302,7 +298,7 @@ SrsDtlsSession::~SrsDtlsSession() { } -srs_error_t SrsDtlsSession::handshake() +srs_error_t SrsDtlsSession::handshake(SrsUdpRemuxSocket* udp_remux_socket) { srs_error_t err = srs_success; @@ -314,11 +310,7 @@ srs_error_t SrsDtlsSession::handshake() int ssl_err = SSL_get_error(dtls, ret); switch(ssl_err) { case SSL_ERROR_NONE: { - srs_trace("dtls handshake done"); - handshake_done = true; - srtp_init(); - srtp_sender_side_init(); - srtp_receiver_side_init(); + err = on_dtls_handshake_done(); } break; @@ -337,25 +329,25 @@ srs_error_t SrsDtlsSession::handshake() if (out_bio_len) { srs_trace("send dtls handshake data"); - srs_sendto(fd, out_bio_data, out_bio_len, from, fromlen, 0); + udp_remux_socket->sendto(out_bio_data, out_bio_len, 0); } return err; } -srs_error_t SrsDtlsSession::on_dtls(const char* data, const int len) +srs_error_t SrsDtlsSession::on_dtls(SrsUdpRemuxSocket* udp_remux_socket) { srs_error_t err = srs_success; if (! handshake_done) { BIO_reset(bio_in); BIO_reset(bio_out); - BIO_write(bio_in, data, len); + BIO_write(bio_in, udp_remux_socket->data(), udp_remux_socket->size()); - handshake(); + handshake(udp_remux_socket); } else { BIO_reset(bio_in); BIO_reset(bio_out); - BIO_write(bio_in, data, len); + BIO_write(bio_in, udp_remux_socket->data(), udp_remux_socket->size()); while (BIO_ctrl_pending(bio_in) > 0) { char dtls_read_buf[8092]; @@ -370,6 +362,14 @@ srs_error_t SrsDtlsSession::on_dtls(const char* data, const int len) return err; } +srs_error_t SrsDtlsSession::on_dtls_handshake_done() +{ + srs_trace("dtls handshake done"); + + handshake_done = true; + return srtp_init(); +} + srs_error_t SrsDtlsSession::on_dtls_application_data(const char* buf, const int nb_buf) { srs_error_t err = srs_success; @@ -377,8 +377,7 @@ srs_error_t SrsDtlsSession::on_dtls_application_data(const char* buf, const int return err; } - -void SrsDtlsSession::send_client_hello() +void SrsDtlsSession::send_client_hello(SrsUdpRemuxSocket* udp_remux_socket) { if (dtls == NULL) { srs_trace("send client hello"); @@ -391,7 +390,7 @@ void SrsDtlsSession::send_client_hello() SSL_set_bio(dtls, bio_in, bio_out); - handshake(); + handshake(udp_remux_socket); } } @@ -400,8 +399,8 @@ srs_error_t SrsDtlsSession::srtp_init() srs_error_t err = srs_success; unsigned char material[SRTP_MASTER_KEY_LEN * 2] = {0}; // client(SRTP_MASTER_KEY_KEY_LEN + SRTP_MASTER_KEY_SALT_LEN) + server - char dtls_srtp_lable[] = "EXTRACTOR-dtls_srtp"; - if (! SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable, strlen(dtls_srtp_lable), NULL, 0, 0)) { + static string dtls_srtp_lable = "EXTRACTOR-dtls_srtp"; + if (! SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) { return srs_error_wrap(err, "SSL_export_keying_material failed"); } @@ -418,8 +417,15 @@ srs_error_t SrsDtlsSession::srtp_init() client_key = sClientMasterKey + sClientMasterSalt; server_key = sServerMasterKey + sServerMasterSalt; - srtp_sender_side_init(); - srtp_receiver_side_init(); + if (srtp_sender_side_init() != srs_success) { + return srs_error_wrap(err, "srtp sender size init failed"); + } + + if (srtp_receiver_side_init() != srs_success) { + return srs_error_wrap(err, "srtp receiver size init failed"); + } + + return err; } srs_error_t SrsDtlsSession::srtp_sender_side_init() @@ -435,7 +441,8 @@ srs_error_t SrsDtlsSession::srtp_sender_side_init() policy.ssrc.type = ssrc_any_outbound; policy.ssrc.value = 0; - policy.window_size = 8192; // seq 相差8192认为无效 + // TODO: adjust window_size + policy.window_size = 8192; policy.allow_repeat_tx = 1; policy.next = NULL; @@ -444,6 +451,7 @@ srs_error_t SrsDtlsSession::srtp_sender_side_init() policy.key = key; if (srtp_create(&srtp_send, &policy) != 0) { + delete [] key; return srs_error_wrap(err, "srtp_create failed"); } @@ -465,7 +473,8 @@ srs_error_t SrsDtlsSession::srtp_receiver_side_init() policy.ssrc.type = ssrc_any_inbound; policy.ssrc.value = 0; - policy.window_size = 8192; // seq 相差8192认为无效 + // TODO: adjust window_size + policy.window_size = 8192; policy.allow_repeat_tx = 1; policy.next = NULL; @@ -474,6 +483,7 @@ srs_error_t SrsDtlsSession::srtp_receiver_side_init() policy.key = key; if (srtp_create(&srtp_recv, &policy) != 0) { + delete [] key; return srs_error_wrap(err, "srtp_create failed"); } @@ -482,8 +492,9 @@ srs_error_t SrsDtlsSession::srtp_receiver_side_init() return err; } -SrsRtcSession::SrsRtcSession() +SrsRtcSession::SrsRtcSession(SrsRtcServer* svr) { + rtc_server = svr; session_state = INIT; dtls_session = NULL; } @@ -492,42 +503,73 @@ SrsRtcSession::~SrsRtcSession() { } -srs_error_t SrsRtcSession::on_binding_request(const SrsStunPacket& stun_packet, const string& peer_ip, const uint16_t peer_port, - SrsStunPacket& stun_binding_response) +srs_error_t SrsRtcSession::on_stun(SrsUdpRemuxSocket* udp_remux_socket, SrsStunPacket* stun_req) { srs_error_t err = srs_success; - stun_binding_response.set_message_type(BindingResponse); - stun_binding_response.set_local_ufrag(stun_packet.get_remote_ufrag()); - stun_binding_response.set_remote_ufrag(stun_packet.get_local_ufrag()); - stun_binding_response.set_transcation_id(stun_packet.get_transcation_id()); - stun_binding_response.set_mapped_address(be32toh(inet_addr(peer_ip.c_str()))); - stun_binding_response.set_mapped_port(peer_port); + if (stun_req->is_binding_request()) { + if (on_binding_request(udp_remux_socket, stun_req) != srs_success) { + return srs_error_wrap(err, "stun binding request failed"); + } + } return err; } -srs_error_t SrsRtcSession::send_client_hello(srs_netfd_t fd, const sockaddr* from, int fromlen) +srs_error_t SrsRtcSession::on_binding_request(SrsUdpRemuxSocket* udp_remux_socket, SrsStunPacket* stun_req) { - if (dtls_session == NULL) { - dtls_session = new SrsDtlsSession(fd, from, fromlen); + srs_error_t err = srs_success; + + SrsStunPacket stun_binding_response; + char buf[1460]; + SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); + SrsAutoFree(SrsBuffer, stream); + + stun_binding_response.set_message_type(BindingResponse); + stun_binding_response.set_local_ufrag(stun_req->get_remote_ufrag()); + stun_binding_response.set_remote_ufrag(stun_req->get_local_ufrag()); + stun_binding_response.set_transcation_id(stun_req->get_transcation_id()); + // FIXME: inet_addr is deprecated, IPV6 support + stun_binding_response.set_mapped_address(be32toh(inet_addr(udp_remux_socket->get_peer_ip().c_str()))); + stun_binding_response.set_mapped_port(udp_remux_socket->get_peer_port()); + + if (stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream) != srs_success) { + return srs_error_wrap(err, "stun binding response encode failed"); } - dtls_session->send_client_hello(); + if (udp_remux_socket->sendto(stream->data(), stream->pos(), 0) <= 0) { + return srs_error_wrap(err, "stun binding response send failed"); + } + + if (get_session_state() == WAITING_STUN) { + set_session_state(DOING_DTLS_HANDSHAKE); + send_client_hello(udp_remux_socket); + + string peer_id = udp_remux_socket->get_peer_id(); + rtc_server->insert_into_id_sessions(peer_id, this); + } + + // TODO: dtls send client retry + + return err; } -srs_error_t SrsRtcSession::on_dtls(const char* buf, const int nb_buf) +srs_error_t SrsRtcSession::send_client_hello(SrsUdpRemuxSocket* udp_remux_socket) { - dtls_session->on_dtls(buf, nb_buf); + if (dtls_session == NULL) { + dtls_session = new SrsDtlsSession(); + } + + dtls_session->send_client_hello(udp_remux_socket); } -srs_error_t SrsRtcSession::send_packet() +srs_error_t SrsRtcSession::on_dtls(SrsUdpRemuxSocket* udp_remux_socket) { + return dtls_session->on_dtls(udp_remux_socket); } -SrsRtcServer::SrsRtcServer(SrsServer* svr) +SrsRtcServer::SrsRtcServer() { - server = svr; } SrsRtcServer::~SrsRtcServer() @@ -541,32 +583,32 @@ srs_error_t SrsRtcServer::initialize() return err; } -srs_error_t SrsRtcServer::on_udp_packet(srs_netfd_t fd, const string& peer_ip, const int peer_port, - const sockaddr* from, const int fromlen, const char* data, const int size) +srs_error_t SrsRtcServer::on_udp_packet(SrsUdpRemuxSocket* udp_remux_socket) { srs_error_t err = srs_success; - if (is_stun(data, size)) { - return on_stun(fd, peer_ip, peer_port, from, fromlen, data, size); - } else if (is_dtls(data, size)) { - srs_trace("dtls"); - return on_dtls(fd, peer_ip, peer_port, from, fromlen, data, size); - } else if (is_rtp_or_rtcp(data, size)) { - return on_rtp_or_rtcp(fd, peer_ip, peer_port, from, fromlen, data, size); + if (is_stun(udp_remux_socket->data(), udp_remux_socket->size())) { + return on_stun(udp_remux_socket); + } else if (is_dtls(udp_remux_socket->data(), udp_remux_socket->size())) { + return on_dtls(udp_remux_socket); + } else if (is_rtp_or_rtcp(udp_remux_socket->data(), udp_remux_socket->size())) { + return on_rtp_or_rtcp(udp_remux_socket); } - return srs_error_wrap(err, "unknown packet type"); + return srs_error_wrap(err, "unknown udp packet type"); } SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsSdp& remote_sdp, SrsSdp& local_sdp) { - SrsRtcSession* session = new SrsRtcSession(); + SrsRtcSession* session = new SrsRtcSession(this); - std::string local_ufrag = gen_random_str(8); std::string local_pwd = gen_random_str(32); - + std::string local_ufrag = ""; while (true) { - bool ret = map_ufrag_sessions.insert(make_pair(remote_sdp.get_ice_ufrag(), session)).second; + local_ufrag = gen_random_str(8); + std::string username = local_ufrag + ":" + remote_sdp.get_ice_ufrag(); + + bool ret = map_username_session.insert(make_pair(username, session)).second; if (ret) { break; } @@ -583,105 +625,71 @@ SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsSdp& remote_sdp, SrsSdp return session; } -SrsRtcSession* SrsRtcServer::find_rtc_session_by_ip_port(const string& peer_ip, const uint16_t peer_port) +SrsRtcSession* SrsRtcServer::find_rtc_session_by_peer_id(const string& peer_id) { - ostringstream os; - os << peer_ip << ":" << peer_port; - string key = os.str(); - map::iterator iter = map_ip_port_sessions.find(key); - if (iter == map_ip_port_sessions.end()) { + map::iterator iter = map_id_session.find(peer_id); + if (iter == map_id_session.end()) { return NULL; } return iter->second; } -srs_error_t SrsRtcServer::on_stun(srs_netfd_t fd, const string& peer_ip, const int peer_port, - const sockaddr* from, const int fromlen, const char* data, const int size) +srs_error_t SrsRtcServer::on_stun(SrsUdpRemuxSocket* udp_remux_socket) { srs_error_t err = srs_success; - srs_trace("peer %s:%d stun", peer_ip.c_str(), peer_port); + srs_trace("recv stun packet from %s", udp_remux_socket->get_peer_id().c_str()); SrsStunPacket stun_req; - if (stun_req.decode(data, size) != srs_success) { - return srs_error_wrap(err, "decode stun failed"); + if (stun_req.decode(udp_remux_socket->data(), udp_remux_socket->size()) != srs_success) { + return srs_error_wrap(err, "decode stun packet failed"); } - std::string remote_ufrag = stun_req.get_remote_ufrag(); - SrsRtcSession* rtc_session = find_rtc_session_by_ufrag(remote_ufrag); + std::string username = stun_req.get_username(); + SrsRtcSession* rtc_session = find_rtc_session_by_username(username); if (rtc_session == NULL) { - return srs_error_wrap(err, "can not find rtc_session, ufrag=%s", remote_ufrag.c_str()); + return srs_error_wrap(err, "can not find rtc_session, stun username=%s", username.c_str()); } - SrsStunPacket stun_rsp; - char buf[1460]; - SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); - SrsAutoFree(SrsBuffer, stream); - - if (stun_req.is_binding_request()) { - if (rtc_session->on_binding_request(stun_req, peer_ip, peer_port, stun_rsp) != srs_success) { - return srs_error_wrap(err, "stun binding request failed"); - } - } - - if (stun_rsp.encode(rtc_session->get_local_sdp()->get_ice_pwd(), stream) != srs_success) { - return srs_error_wrap(err, "stun rsp encode failed"); - } - - srs_sendto(fd, stream->data(), stream->pos(), from, fromlen, 0); - - if (rtc_session->get_session_state() == WAITING_STUN) { - rtc_session->set_session_state(DOING_DTLS_HANDSHAKE); - rtc_session->send_client_hello(fd, from, fromlen); - - insert_into_ip_port_sessions(peer_ip, peer_port, rtc_session); - } - - return err; + return rtc_session->on_stun(udp_remux_socket, &stun_req); } -srs_error_t SrsRtcServer::on_dtls(srs_netfd_t fd, const string& peer_ip, const int peer_port, - const sockaddr* from, const int fromlen, const char* data, const int size) +srs_error_t SrsRtcServer::on_dtls(SrsUdpRemuxSocket* udp_remux_socket) { srs_error_t err = srs_success; srs_trace("on dtls"); // FIXME - SrsRtcSession* rtc_session = find_rtc_session_by_ip_port(peer_ip, peer_port); + SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_remux_socket->get_peer_id()); if (rtc_session == NULL) { - return srs_error_wrap(err, "can not find rtc session by ip=%s, port=%u", peer_ip.c_str(), peer_port); + return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_remux_socket->get_peer_id().c_str()); } - rtc_session->on_dtls(data, size); + rtc_session->on_dtls(udp_remux_socket); return err; } -srs_error_t SrsRtcServer::on_rtp_or_rtcp(srs_netfd_t fd, const string& peer_ip, const int peer_port, - const sockaddr* from, const int fromlen, const char* data, const int size) +srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpRemuxSocket* udp_remux_socket) { srs_error_t err = srs_success; srs_trace("on rtp/rtcp"); return err; } -SrsRtcSession* SrsRtcServer::find_rtc_session_by_ufrag(const std::string& ufrag) +SrsRtcSession* SrsRtcServer::find_rtc_session_by_username(const std::string& username) { - map::iterator iter = map_ufrag_sessions.find(ufrag); - if (iter == map_ufrag_sessions.end()) { + map::iterator iter = map_username_session.find(username); + if (iter == map_username_session.end()) { return NULL; } return iter->second; } -bool SrsRtcServer::insert_into_ip_port_sessions(const string& peer_ip, const uint16_t peer_port, SrsRtcSession* rtc_session) +bool SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcSession* rtc_session) { - ostringstream os; - os << peer_ip << ":" << peer_port; - string key = os.str(); - - return map_ip_port_sessions.insert(make_pair(key, rtc_session)).second; + return map_id_session.insert(make_pair(peer_id, rtc_session)).second; } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index b9d60d07e..e715e8151 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -25,6 +25,7 @@ #define SRS_APP_RTC_CONN_HPP #include +#include #include #include @@ -34,6 +35,7 @@ #include #include +class SrsUdpRemuxSocket; class SrsServer; class SrsStunPacket; @@ -103,37 +105,39 @@ private: srtp_t srtp_send; srtp_t srtp_recv; - srs_netfd_t fd; - const sockaddr* from; - int fromlen; - bool handshake_done; public: - SrsDtlsSession(srs_netfd_t lfd, const sockaddr* f, int fl); + SrsDtlsSession(); virtual ~SrsDtlsSession(); - srs_error_t on_dtls(const char* data, const int len); + srs_error_t on_dtls(SrsUdpRemuxSocket* udp_remux_socket); + srs_error_t on_dtls_handshake_done(); srs_error_t on_dtls_application_data(const char* data, const int len); - void send_client_hello(); - srs_error_t handshake(); + void send_client_hello(SrsUdpRemuxSocket* udp_remux_socket); + srs_error_t handshake(SrsUdpRemuxSocket* udp_remux_socket); + +private: srs_error_t srtp_init(); srs_error_t srtp_sender_side_init(); srs_error_t srtp_receiver_side_init(); }; +class SrsRtcServer; + class SrsRtcSession { private: + SrsRtcServer* rtc_server; SrsSdp remote_sdp; SrsSdp local_sdp; SrsRtcSessionStateType session_state; SrsDtlsSession* dtls_session; public: - SrsRtcSession(); + SrsRtcSession(SrsRtcServer* svr); virtual ~SrsRtcSession(); - +public: SrsSdp* get_local_sdp() { return &local_sdp; } SrsSdp* get_remote_sdp() { return &remote_sdp; } SrsRtcSessionStateType get_session_state() { return session_state; } @@ -141,40 +145,37 @@ public: void set_local_sdp(const SrsSdp& sdp) { local_sdp = sdp; } void set_remote_sdp(const SrsSdp& sdp) { remote_sdp = sdp; } void set_session_state(SrsRtcSessionStateType state) { session_state = state; } - - srs_error_t on_udp_packet(const std::string& peer_ip, const int peer_port, const char* data, const int size); - srs_error_t on_binding_request(const SrsStunPacket& stun_packet, const std::string& peer_ip, const uint16_t peer_port, - SrsStunPacket& stun_binding_response); - srs_error_t on_dtls(const char* buf, const int nb_buf); - srs_error_t send_client_hello(srs_netfd_t fd, const sockaddr* from, int fromlen); - srs_error_t send_packet(); +public: + srs_error_t on_stun(SrsUdpRemuxSocket* udp_remux_socket, SrsStunPacket* stun_req); + srs_error_t on_dtls(SrsUdpRemuxSocket* udp_remux_socket); +public: + srs_error_t send_client_hello(SrsUdpRemuxSocket* udp_remux_socket); +private: + srs_error_t on_binding_request(SrsUdpRemuxSocket* udp_remux_socket, SrsStunPacket* stun_req); }; -class SrsRtcServer +class SrsRtcServer : public ISrsUdpRemuxHandler { private: - SrsServer* server; - std::map map_ufrag_sessions; - std::map map_ip_port_sessions; + std::map map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) + std::map map_id_session; // key: peerip(ip + ":" + port) public: - SrsRtcServer(SrsServer* svr); + SrsRtcServer(); virtual ~SrsRtcServer(); public: virtual srs_error_t initialize(); - virtual srs_error_t on_udp_packet(srs_netfd_t fd, const std::string& peer_ip, const int peer_port, - const sockaddr* from, const int fromlen, const char* data, const int size); + virtual srs_error_t on_udp_packet(SrsUdpRemuxSocket* udp_remux_socket); SrsRtcSession* create_rtc_session(const SrsSdp& remote_sdp, SrsSdp& local_sdp); - bool insert_into_ip_port_sessions(const std::string& peer_ip, const uint16_t peer_port, SrsRtcSession* rtc_session); + bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session); private: - srs_error_t on_stun(srs_netfd_t fd, const std::string& peer_ip, const int peer_port, const sockaddr* from, const int fromlen, const char* data, const int size); - srs_error_t on_dtls(srs_netfd_t fd, const std::string& peer_ip, const int peer_port, const sockaddr* from, const int fromlen, const char* data, const int size); - srs_error_t on_rtp_or_rtcp(srs_netfd_t fd, const std::string& peer_ip, const int peer_port, - const sockaddr* from, const int fromlen, const char* data, const int size); + srs_error_t on_stun(SrsUdpRemuxSocket* udp_remux_socket); + srs_error_t on_dtls(SrsUdpRemuxSocket* udp_remux_socket); + srs_error_t on_rtp_or_rtcp(SrsUdpRemuxSocket* udp_remux_socket); private: - SrsRtcSession* find_rtc_session_by_ufrag(const std::string& ufrag); - SrsRtcSession* find_rtc_session_by_ip_port(const std::string& peer_ip, const uint16_t peer_port); + SrsRtcSession* find_rtc_session_by_username(const std::string& ufrag); + SrsRtcSession* find_rtc_session_by_peer_id(const std::string& peer_id); }; #endif diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index cdd939408..d630ca35d 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -44,7 +44,6 @@ using namespace std; #include #include #include -#include #include #include #include @@ -342,7 +341,7 @@ SrsUdpCasterListener::~SrsUdpCasterListener() SrsRtcListener::SrsRtcListener(SrsServer* svr, SrsRtcServer* rtc_svr, SrsListenerType t) : SrsListener(svr, t) { srs_assert(type == SrsListenerRtc); - rtc = new SrsRtc(rtc_svr); + rtc = rtc_svr; } SrsRtcListener::~SrsRtcListener() @@ -528,7 +527,7 @@ SrsServer::SrsServer() // new these objects in initialize instead. http_api_mux = new SrsHttpServeMux(); http_server = new SrsHttpServer(this); - rtc_server = new SrsRtcServer(this); + rtc_server = new SrsRtcServer(); http_heartbeat = new SrsHttpHeartbeat(); ingester = new SrsIngester(); } diff --git a/trunk/src/protocol/srs_stun_stack.cpp b/trunk/src/protocol/srs_stun_stack.cpp index 50284c785..a057715d6 100644 --- a/trunk/src/protocol/srs_stun_stack.cpp +++ b/trunk/src/protocol/srs_stun_stack.cpp @@ -129,6 +129,7 @@ srs_error_t SrsStunPacket::decode(const char* buf, const int nb_buf) switch (type) { // FIXME: enum case 6: { + username = val; size_t p = val.find(":"); if (p != string::npos) { local_ufrag = val.substr(0, p); diff --git a/trunk/src/protocol/srs_stun_stack.hpp b/trunk/src/protocol/srs_stun_stack.hpp index 80bba8a15..aa2bc2192 100644 --- a/trunk/src/protocol/srs_stun_stack.hpp +++ b/trunk/src/protocol/srs_stun_stack.hpp @@ -70,6 +70,7 @@ class SrsStunPacket { private: uint16_t message_type; + std::string username; std::string local_ufrag; std::string remote_ufrag; std::string transcation_id; @@ -83,6 +84,7 @@ public: bool is_binding_response() const { return message_type == BindingResponse; } uint16_t get_message_type() const { return message_type; } + std::string get_username() const { return username; } std::string get_local_ufrag() const { return local_ufrag; } std::string get_remote_ufrag() const { return remote_ufrag; } std::string get_transcation_id() const { return transcation_id; }