From efa0851476b74be38a2e177977d29dcfe84e5dc7 Mon Sep 17 00:00:00 2001 From: Li Peng Date: Sun, 4 Sep 2022 21:24:51 +0800 Subject: [PATCH] WebRTC: Refine code and destroy session when tcp close. --- trunk/src/app/srs_app_conn.cpp | 28 +-- trunk/src/app/srs_app_conn.hpp | 6 +- trunk/src/app/srs_app_rtc_conn.cpp | 6 +- trunk/src/app/srs_app_rtc_conn.hpp | 2 +- trunk/src/app/srs_app_rtc_network.cpp | 262 +++++++++++++------------ trunk/src/app/srs_app_rtc_network.hpp | 26 +-- trunk/src/app/srs_app_rtc_server.cpp | 3 + trunk/src/app/srs_app_server.cpp | 2 +- trunk/src/kernel/srs_kernel_error.hpp | 2 + trunk/src/protocol/srs_protocol_st.cpp | 1 + 10 files changed, 181 insertions(+), 157 deletions(-) diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 1f30ca86c..8741fa22f 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -559,18 +559,18 @@ srs_error_t SrsTcpConnection::writev(const iovec *iov, int iov_size, ssize_t* nw return skt->writev(iov, iov_size, nwrite); } -SrsBufferedReader::SrsBufferedReader(ISrsProtocolReadWriter* io) +SrsBufferedReadWriter::SrsBufferedReadWriter(ISrsProtocolReadWriter* io) { io_ = io; buf_ = NULL; } -SrsBufferedReader::~SrsBufferedReader() +SrsBufferedReadWriter::~SrsBufferedReadWriter() { srs_freep(buf_); } -srs_error_t SrsBufferedReader::peek(char* buf, int* size) +srs_error_t SrsBufferedReadWriter::peek(char* buf, int* size) { srs_error_t err = srs_success; @@ -588,7 +588,7 @@ srs_error_t SrsBufferedReader::peek(char* buf, int* size) return err; } -srs_error_t SrsBufferedReader::reload_buffer() +srs_error_t SrsBufferedReadWriter::reload_buffer() { srs_error_t err = srs_success; @@ -608,7 +608,7 @@ srs_error_t SrsBufferedReader::reload_buffer() return err; } -srs_error_t SrsBufferedReader::read(void* buf, size_t size, ssize_t* nread) +srs_error_t SrsBufferedReadWriter::read(void* buf, size_t size, ssize_t* nread) { if (!buf_ || buf_->empty()) { return io_->read(buf, size, nread); @@ -623,7 +623,7 @@ srs_error_t SrsBufferedReader::read(void* buf, size_t size, ssize_t* nread) return srs_success; } -srs_error_t SrsBufferedReader::read_fully(void* buf, size_t size, ssize_t* nread) +srs_error_t SrsBufferedReadWriter::read_fully(void* buf, size_t size, ssize_t* nread) { if (!buf_ || buf_->empty()) { return io_->read_fully(buf, size, nread); @@ -643,42 +643,42 @@ srs_error_t SrsBufferedReader::read_fully(void* buf, size_t size, ssize_t* nread return srs_success; } -void SrsBufferedReader::set_recv_timeout(srs_utime_t tm) +void SrsBufferedReadWriter::set_recv_timeout(srs_utime_t tm) { return io_->set_recv_timeout(tm); } -srs_utime_t SrsBufferedReader::get_recv_timeout() +srs_utime_t SrsBufferedReadWriter::get_recv_timeout() { return io_->get_recv_timeout(); } -int64_t SrsBufferedReader::get_recv_bytes() +int64_t SrsBufferedReadWriter::get_recv_bytes() { return io_->get_recv_bytes(); } -int64_t SrsBufferedReader::get_send_bytes() +int64_t SrsBufferedReadWriter::get_send_bytes() { return io_->get_send_bytes(); } -void SrsBufferedReader::set_send_timeout(srs_utime_t tm) +void SrsBufferedReadWriter::set_send_timeout(srs_utime_t tm) { return io_->set_send_timeout(tm); } -srs_utime_t SrsBufferedReader::get_send_timeout() +srs_utime_t SrsBufferedReadWriter::get_send_timeout() { return io_->get_send_timeout(); } -srs_error_t SrsBufferedReader::write(void* buf, size_t size, ssize_t* nwrite) +srs_error_t SrsBufferedReadWriter::write(void* buf, size_t size, ssize_t* nwrite) { return io_->write(buf, size, nwrite); } -srs_error_t SrsBufferedReader::writev(const iovec *iov, int iov_size, ssize_t* nwrite) +srs_error_t SrsBufferedReadWriter::writev(const iovec *iov, int iov_size, ssize_t* nwrite) { return io_->writev(iov, iov_size, nwrite); } diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 99255cc98..d2e3b4433 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -170,7 +170,7 @@ public: // With a small fast read buffer, to support peek for protocol detecting. Note that directly write to io without any // cache or buffer. -class SrsBufferedReader : public ISrsProtocolReadWriter +class SrsBufferedReadWriter : public ISrsProtocolReadWriter { private: // The under-layer transport. @@ -181,8 +181,8 @@ private: // Current reading position. SrsBuffer* buf_; public: - SrsBufferedReader(ISrsProtocolReadWriter* io); - virtual ~SrsBufferedReader(); + SrsBufferedReadWriter(ISrsProtocolReadWriter* io); + virtual ~SrsBufferedReadWriter(); public: // Peek the head of cache to buf in size of bytes. srs_error_t peek(char* buf, int* size); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 45305bfb6..b85dfabe7 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -149,7 +149,7 @@ srs_error_t SrsSecurityTransport::on_dtls_handshake_done() return srs_error_wrap(err, "srtp init"); } - return network_->on_connection_established(); + return network_->on_dtls_handshake_done(); } srs_error_t SrsSecurityTransport::on_dtls_application_data(const char* buf, const int nb_buf) @@ -249,7 +249,7 @@ srs_error_t SrsPlaintextTransport::on_dtls_alert(std::string type, std::string d srs_error_t SrsPlaintextTransport::on_dtls_handshake_done() { srs_trace("RTC: DTLS handshake done."); - return network_->on_connection_established(); + return network_->on_dtls_handshake_done(); } srs_error_t SrsPlaintextTransport::on_dtls_application_data(const char* data, const int len) @@ -2166,7 +2166,7 @@ srs_error_t SrsRtcConnection::find_publisher(char* buf, int size, SrsRtcPublishS return err; } -srs_error_t SrsRtcConnection::on_connection_established() +srs_error_t SrsRtcConnection::on_dtls_handshake_done() { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 18127f24f..28021e648 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -513,7 +513,7 @@ public: srs_error_t on_rtcp_feedback_twcc(char* buf, int nb_buf); srs_error_t on_rtcp_feedback_remb(SrsRtcpPsfbCommon *rtcp); public: - srs_error_t on_connection_established(); + srs_error_t on_dtls_handshake_done(); srs_error_t on_dtls_alert(std::string type, std::string desc); bool is_alive(); void alive(); diff --git a/trunk/src/app/srs_app_rtc_network.cpp b/trunk/src/app/srs_app_rtc_network.cpp index 5babfbd52..41fc106f4 100644 --- a/trunk/src/app/srs_app_rtc_network.cpp +++ b/trunk/src/app/srs_app_rtc_network.cpp @@ -103,28 +103,26 @@ ISrsKbpsDelta* SrsRtcNetworks::delta() ISrsRtcNetwork::ISrsRtcNetwork() { - establelished_ = false; } ISrsRtcNetwork::~ISrsRtcNetwork() { } -bool ISrsRtcNetwork::is_establelished() -{ - return establelished_; -} - SrsRtcDummyNetwork::SrsRtcDummyNetwork() { - establelished_ = true; } SrsRtcDummyNetwork::~SrsRtcDummyNetwork() { } -srs_error_t SrsRtcDummyNetwork::on_connection_established() +bool SrsRtcDummyNetwork::is_establelished() +{ + return true; +} + +srs_error_t SrsRtcDummyNetwork::on_dtls_handshake_done() { return srs_success; } @@ -195,14 +193,8 @@ srs_error_t SrsRtcUdpNetwork::initialize(SrsSessionConfig* cfg, bool dtls, bool return err; } -srs_error_t SrsRtcUdpNetwork::start_active_handshake() -{ - return transport_->start_active_handshake(); -} - srs_error_t SrsRtcUdpNetwork::on_dtls(char* data, int nb_data) { - establelished_ = true; // Update stat when we received data. delta_->add_delta(nb_data, 0); @@ -214,20 +206,20 @@ srs_error_t SrsRtcUdpNetwork::on_dtls_alert(std::string type, std::string desc) return conn_->on_dtls_alert(type, desc); } -srs_error_t SrsRtcUdpNetwork::on_connection_established() +srs_error_t SrsRtcUdpNetwork::on_dtls_handshake_done() { srs_error_t err = srs_success; // If DTLS done packet received many times, such as ARQ, ignore. - if(SrsRtcNetworkStateClosed == state_) { + if(SrsRtcNetworkStateEstablished == state_) { return err; } - if ((err = conn_->on_connection_established()) != srs_success) { + if ((err = conn_->on_dtls_handshake_done()) != srs_success) { return srs_error_wrap(err, "udp"); } - state_ = SrsRtcNetworkStateClosed; + state_ = SrsRtcNetworkStateEstablished; return err; } @@ -303,6 +295,11 @@ void SrsRtcUdpNetwork::set_state(SrsRtcNetworkState state) state_ = state; } +bool SrsRtcUdpNetwork::is_establelished() +{ + return state_ == SrsRtcNetworkStateEstablished; +} + string SrsRtcUdpNetwork::get_peer_ip() { srs_assert(sendonly_skt_); @@ -418,7 +415,7 @@ srs_error_t SrsRtcUdpNetwork::on_binding_request(SrsStunPacket* r, string ice_pw // TODO: FIXME: Add cost. srs_trace("RTC: session STUN done, waiting DTLS handshake."); - if((err = start_active_handshake()) != srs_success) { + if((err = transport_->start_active_handshake()) != srs_success) { return srs_error_wrap(err, "fail to dtls handshake"); } } @@ -459,20 +456,20 @@ void SrsRtcTcpNetwork::update_sendonly_socket(ISrsProtocolReadWriter* skt) sendonly_skt_ = skt; } -srs_error_t SrsRtcTcpNetwork::on_connection_established() +srs_error_t SrsRtcTcpNetwork::on_dtls_handshake_done() { srs_error_t err = srs_success; // If DTLS done packet received many times, such as ARQ, ignore. - if(SrsRtcNetworkStateClosed == state_) { + if(SrsRtcNetworkStateEstablished == state_) { return err; } - if ((err = conn_->on_connection_established()) != srs_success) { + if ((err = conn_->on_dtls_handshake_done()) != srs_success) { return srs_error_wrap(err, "udp"); } - state_ = SrsRtcNetworkStateClosed; + state_ = SrsRtcNetworkStateEstablished; return err; } @@ -546,7 +543,7 @@ srs_error_t SrsRtcTcpNetwork::on_binding_request(SrsStunPacket* r, std::string i // TODO: FIXME: Add cost. srs_trace("RTC: session STUN done, waiting DTLS handshake."); - if((err = start_active_handshake()) != srs_success) { + if((err = transport_->start_active_handshake()) != srs_success) { return srs_error_wrap(err, "fail to dtls handshake"); } } @@ -578,14 +575,8 @@ srs_error_t SrsRtcTcpNetwork::initialize(SrsSessionConfig* cfg, bool dtls, bool return err; } -srs_error_t SrsRtcTcpNetwork::start_active_handshake() -{ - return transport_->start_active_handshake(); -} - srs_error_t SrsRtcTcpNetwork::on_dtls(char* data, int nb_data) { - establelished_ = true; // Update stat when we received data. delta_->add_delta(nb_data, 0); @@ -654,6 +645,11 @@ void SrsRtcTcpNetwork::set_state(SrsRtcNetworkState state) state_ = state; } +bool SrsRtcTcpNetwork::is_establelished() +{ + return state_ == SrsRtcNetworkStateEstablished; +} + std::string SrsRtcTcpNetwork::get_peer_ip() { return peer_ip_; @@ -666,32 +662,21 @@ int SrsRtcTcpNetwork::get_peer_port() srs_error_t SrsRtcTcpNetwork::write(void* buf, size_t size, ssize_t* nwrite) { - srs_assert(size <= 65535); srs_error_t err = srs_success; - - char len_str[2]; - SrsBuffer buf_len(len_str, sizeof(len_str)); - buf_len.write_2bytes(size); - ssize_t n = 0; + // Encode and send 2 bytes size, in network order. + srs_assert(size <= 65535); + uint8_t b[2] = {uint8_t(size>>8), uint8_t(size)}; - if((err = sendonly_skt_->write(buf_len.data(), sizeof(len_str), &n)) != srs_success) { + if((err = sendonly_skt_->write((char*)b, sizeof(b), NULL)) != srs_success) { return srs_error_wrap(err, "rtc tcp write len(%d)", size); } - if(nwrite) { - *nwrite = n; - } - - // TODO: FIXME: maybe need to send by a few times - if((err = sendonly_skt_->write(buf, size, &n)) != srs_success) { + // Send the data in size of bytes. + if((err = sendonly_skt_->write(buf, size, nwrite)) != srs_success) { return srs_error_wrap(err, "rtc tcp write body"); } - if(nwrite) { - *nwrite += n; - } - return err; } @@ -701,6 +686,13 @@ void SrsRtcTcpNetwork::set_peer_id(const std::string& ip, int port) peer_port_ = port; } +void SrsRtcTcpNetwork::dispose() +{ + state_ = SrsRtcNetworkStateClosed; +} + +#define SRS_RTC_TCP_PACKET_MAX 1500 + SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm) { manager_ = cm; @@ -711,7 +703,8 @@ SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int p delta_->set_io(skt_, skt_); trd_ = new SrsSTCoroutine("tcp", this, _srs_context->get_id()); session_ = NULL; - disposing_ = false; + pkt_ = new char[SRS_RTC_TCP_PACKET_MAX]; + _srs_rtc_manager->subscribe(this); } SrsRtcTcpConn::~SrsRtcTcpConn() @@ -720,6 +713,7 @@ SrsRtcTcpConn::~SrsRtcTcpConn() trd_->interrupt(); srs_freep(trd_); + srs_freepa(pkt_); srs_freep(delta_); srs_freep(skt_); } @@ -746,7 +740,6 @@ std::string SrsRtcTcpConn::remote_ip() srs_error_t SrsRtcTcpConn::start() { - _srs_rtc_manager->subscribe(this); return trd_->start(); } @@ -758,10 +751,15 @@ srs_error_t SrsRtcTcpConn::cycle() SrsStatistic::instance()->on_disconnect(get_id().c_str()); SrsStatistic::instance()->kbps_add_delta(get_id().c_str(), delta_); - // Because we use manager to manage this object, - // not the http connection object, so we must remove it here. + // Because we use manager to manage this object, not the http connection object, so we must remove it here. manager_->remove(this); + // Only remove session when network is established, because client might use other UDP network. + if(session_ && session_->tcp()->is_establelished()) { + session_->tcp()->set_state(SrsRtcNetworkStateClosed); + _srs_rtc_manager->remove(session_); + } + // For HTTP-API timeout, we think it's done successfully, // because there may be no request or response for HTTP-API. if (srs_error_code(err) == ERROR_SOCKET_TIMEOUT) { @@ -776,37 +774,22 @@ srs_error_t SrsRtcTcpConn::do_cycle() { srs_error_t err = srs_success; - char* pkt = new char[1500]; - SrsAutoFreeA(char, pkt); + if((err = handshake()) != srs_success) { + return srs_error_wrap(err, "process rtc tcp pkt"); + } // TODO: FIXME: Handle all bytes of TCP Connection. - while(!disposing_) { + while(true) { if((err = trd_->pull()) != srs_success) { return srs_error_wrap(err, "rtc tcp conn"); } - // Read length in 2 bytes @doc: https://www.rfc-editor.org/rfc/rfc4571#section-2 - ssize_t nread = 0; uint8_t b[2]; - if((err = skt_->read((char*)b, sizeof(b), &nread)) != srs_success) { - return srs_error_wrap(err, "rtc tcp conn read len"); + int npkt = SRS_RTC_TCP_PACKET_MAX; + if((err = read_packet(pkt_, &npkt)) != srs_success) { + return srs_error_wrap(err, "process rtc tcp pkt"); } - uint16_t npkt = uint16_t(b[0])<<8 | uint16_t(b[1]); - if (npkt > 1500) { - return srs_error_new(ERROR_RTC_TCP_SIZE, "invalid size=%u", npkt); - } - - // Read a RTC pkt such as STUN, DTLS or RTP/RTCP - if((err = skt_->read_fully(pkt, npkt, &nread)) != srs_success) { - return srs_error_wrap(err, "rtc tcp conn read body"); - } - - // Ready to be destroyed, not need to process new packet - if(disposing_) { - return err; - } - - if((err = on_tcp_pkt(pkt, npkt)) != srs_success) { + if((err = on_tcp_pkt(pkt_, npkt)) != srs_success) { return srs_error_wrap(err, "process rtc tcp pkt"); } } @@ -814,89 +797,122 @@ srs_error_t SrsRtcTcpConn::do_cycle() return err; } +srs_error_t SrsRtcTcpConn::handshake() +{ + srs_error_t err = srs_success; + + int npkt = SRS_RTC_TCP_PACKET_MAX; + if((err = read_packet(pkt_, &npkt)) != srs_success) { + return srs_error_wrap(err, "process rtc tcp pkt"); + } + + bool is_stun = srs_is_stun((uint8_t*)pkt_, npkt); + bool is_rtp_or_rtcp = srs_is_rtp_or_rtcp((uint8_t*)pkt_, npkt); + if (!is_stun) { + return srs_error_new(ERROR_RTC_TCP_PACKET, "invalid packet stun=%d, rtp/rtcp=%d, pkt=%s", + is_stun, is_rtp_or_rtcp, srs_string_dumps_hex(pkt_, npkt, 8).c_str()); + } + + // Find session by ping(BindingRequest). + SrsStunPacket ping; + if ((err = ping.decode(pkt_, npkt)) != srs_success) { + return srs_error_wrap(err, "decode stun packet failed"); + } + if (!session_) { + session_ = dynamic_cast(_srs_rtc_manager->find_by_name(ping.get_username())); + } + if (session_) { + session_->switch_to_context(); + } + + srs_trace("recv stun packet from %s:%d, use-candidate=%d, ice-controlled=%d, ice-controlling=%d", + ip_.c_str(), port_, ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling()); + + // TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it. + if (!session_) { + return srs_error_new(ERROR_RTC_TCP_STUN, "no session, stun username=%s", ping.get_username().c_str()); + } + + // For each binding request, update the TCP socket. + if (ping.is_binding_request()) { + session_->tcp()->update_sendonly_socket(skt_); + session_->tcp()->set_peer_id(ip_, port_); + } + return session_->tcp()->on_stun(&ping, pkt_, npkt); +} + +srs_error_t SrsRtcTcpConn::read_packet(char* pkt, int* nb_pkt) +{ + srs_error_t err = srs_success; + + // Read length in 2 bytes @doc: https://www.rfc-editor.org/rfc/rfc4571#section-2 + ssize_t nread = 0; uint8_t b[2]; + if((err = skt_->read((char*)b, sizeof(b), &nread)) != srs_success) { + return srs_error_wrap(err, "rtc tcp conn read len"); + } + + uint16_t npkt = uint16_t(b[0])<<8 | uint16_t(b[1]); + if (npkt > *nb_pkt) { + return srs_error_new(ERROR_RTC_TCP_SIZE, "invalid size=%u exceed %d", npkt, *nb_pkt); + } + + // Read a RTC pkt such as STUN, DTLS or RTP/RTCP + if((err = skt_->read_fully(pkt, npkt, &nread)) != srs_success) { + return srs_error_wrap(err, "rtc tcp conn read body"); + } + + *nb_pkt = npkt; + + return err; +} + srs_error_t SrsRtcTcpConn::on_tcp_pkt(char* pkt, int nb_pkt) { srs_error_t err = srs_success; + // Session is destroyed, ignore TCP packet. + if (!session_) return err; + bool is_stun = srs_is_stun((uint8_t*)pkt, nb_pkt); bool is_rtp_or_rtcp = srs_is_rtp_or_rtcp((uint8_t*)pkt, nb_pkt); bool is_rtcp = srs_is_rtcp((uint8_t*)pkt, nb_pkt); - if(!is_stun && !session_) { - srs_warn("rtc tcp received a mess pkt. %d[%s]", nb_pkt, srs_string_dumps_hex(pkt, nb_pkt, 8).c_str()); - return err; - } + // When got any packet, the session is alive now. + session_->alive(); - if (session_) { - // When got any packet, the session is alive now. - session_->alive(); - } - - if(is_stun) { + if (is_stun) { SrsStunPacket ping; if ((err = ping.decode(pkt, nb_pkt)) != srs_success) { return srs_error_wrap(err, "decode stun packet failed"); } - if (!session_) { - session_ = dynamic_cast(_srs_rtc_manager->find_by_name(ping.get_username())); - } - if (session_) { - session_->switch_to_context(); - } - - srs_trace("recv stun packet from %s:%d, use-candidate=%d, ice-controlled=%d, ice-controlling=%d", - ip_.c_str(), port_, ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling()); - - // TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it. - if (!session_) { - return srs_error_new(ERROR_RTC_STUN, "no session, stun username=%s", - ping.get_username().c_str()); - } - - // For each binding request, update the TCP socket. - if (ping.is_binding_request()) { - session_->tcp()->update_sendonly_socket(skt_); - session_->tcp()->set_peer_id(ip_, port_); - } return session_->tcp()->on_stun(&ping, pkt, nb_pkt); } - // For DTLS, RTCP or RTP, which does not support peer address changing. - if (!session_) { - return srs_error_new(ERROR_RTC_STUN, "no session peer=%s:%d", ip_.c_str(), port_); - } - - // Note that we don't(except error) switch to the context of session, for performance issue. if (is_rtp_or_rtcp && !is_rtcp) { - err = session_->tcp()->on_rtp(pkt, nb_pkt); - if (err != srs_success) { - return srs_error_wrap(err, "rtc tcp rtp"); - } - return err; + return session_->tcp()->on_rtp(pkt, nb_pkt); } if (is_rtp_or_rtcp && is_rtcp) { return session_->tcp()->on_rtcp(pkt, nb_pkt); } + if (srs_is_dtls((uint8_t*)pkt, nb_pkt)) { - srs_trace("receive a dtls pkt"); return session_->tcp()->on_dtls(pkt, nb_pkt); } + return srs_error_new(ERROR_RTC_UDP, "unknown packet"); } void SrsRtcTcpConn::on_before_dispose(ISrsResource* c) { - if(!session_ || disposing_) { - return; - } + if (!session_) return; - SrsRtcConnection *conn = dynamic_cast(c); + SrsRtcConnection* conn = dynamic_cast(c); if(conn == session_) { + session_ = NULL; // the related rtc connection will be disposed srs_trace("RTC: tcp conn diposing, because of rtc connection"); - session_ = NULL; - disposing_ = true; + trd_->interrupt(); } } diff --git a/trunk/src/app/srs_app_rtc_network.hpp b/trunk/src/app/srs_app_rtc_network.hpp index 9e786cd87..d837a2784 100644 --- a/trunk/src/app/srs_app_rtc_network.hpp +++ b/trunk/src/app/srs_app_rtc_network.hpp @@ -80,15 +80,12 @@ public: // For DTLS or Session to call network service. class ISrsRtcNetwork : public ISrsStreamWriter { -protected: - bool establelished_; - public: ISrsRtcNetwork(); virtual ~ISrsRtcNetwork(); public: // Callback when DTLS connected. - virtual srs_error_t on_connection_established() = 0; + virtual srs_error_t on_dtls_handshake_done() = 0; // Callback when DTLS disconnected. virtual srs_error_t on_dtls_alert(std::string type, std::string desc) = 0; public: @@ -97,7 +94,7 @@ public: // Protect RTCP packet by SRTP context. virtual srs_error_t protect_rtcp(void* packet, int* nb_cipher) = 0; public: - bool is_establelished(); + virtual bool is_establelished() = 0; }; // Dummy networks @@ -109,11 +106,12 @@ public: // The interface of ISrsRtcNetwork public: - virtual srs_error_t on_connection_established(); + virtual srs_error_t on_dtls_handshake_done(); virtual srs_error_t on_dtls_alert(std::string type, std::string desc); public: virtual srs_error_t protect_rtp(void* packet, int* nb_cipher); virtual srs_error_t protect_rtcp(void* packet, int* nb_cipher); + virtual bool is_establelished(); // Interface ISrsStreamWriter. public: virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite); @@ -150,10 +148,9 @@ private: // DTLS transport functions. public: srs_error_t initialize(SrsSessionConfig* cfg, bool dtls, bool srtp); - virtual srs_error_t start_active_handshake(); virtual srs_error_t on_dtls(char* data, int nb_data); virtual srs_error_t on_dtls_alert(std::string type, std::string desc); - srs_error_t on_connection_established(); + srs_error_t on_dtls_handshake_done(); srs_error_t protect_rtp(void* packet, int* nb_cipher); srs_error_t protect_rtcp(void* packet, int* nb_cipher); // When got data from socket. @@ -164,6 +161,7 @@ public: public: // Connection level state machine, for ARQ of UDP packets. void set_state(SrsRtcNetworkState state); + virtual bool is_establelished(); // ICE reflexive address functions. std::string get_peer_ip(); int get_peer_port(); @@ -193,7 +191,7 @@ public: //ISrsRtcNetwork public: // Callback when DTLS connected. - virtual srs_error_t on_connection_established(); + virtual srs_error_t on_dtls_handshake_done(); // Callback when DTLS disconnected. virtual srs_error_t on_dtls_alert(std::string type, std::string desc); // Protect RTP packet by SRTP context. @@ -208,7 +206,6 @@ private: // DTLS transport functions. public: srs_error_t initialize(SrsSessionConfig* cfg, bool dtls, bool srtp); - virtual srs_error_t start_active_handshake(); virtual srs_error_t on_dtls(char* data, int nb_data); // When got data from socket. public: @@ -218,6 +215,7 @@ public: public: // Connection level state machine, for ARQ of UDP packets. void set_state(SrsRtcNetworkState state); + virtual bool is_establelished(); // ICE reflexive address functions. std::string get_peer_ip(); int get_peer_port(); @@ -226,6 +224,7 @@ public: virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite); public: void set_peer_id(const std::string& ip, int port); + void dispose(); }; // For WebRTC over TCP. @@ -241,11 +240,11 @@ private: int port_; // The delta for statistic. SrsNetworkDelta* delta_; - // WebRTC session object. SrsRtcConnection* session_; - bool disposing_; ISrsProtocolReadWriter* skt_; + // Packet cache. + char* pkt_; public: SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm); virtual ~SrsRtcTcpConn(); @@ -266,6 +265,9 @@ public: virtual srs_error_t cycle(); private: virtual srs_error_t do_cycle(); + srs_error_t handshake(); + srs_error_t read_packet(char* pkt, int* nb_pkt); + srs_error_t on_stun(char* pkt, int nb_pkt); srs_error_t on_tcp_pkt(char* pkt, int nb_pkt); // Interface of ISrsDisposingHandler public: diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 3e38771b3..d39d46ef0 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -394,6 +394,7 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) ++_srs_pps_rstuns->sugar; string peer_id = skt->peer_id(); + // TODO: FIXME: Should support ICE renomination, to switch network between candidates. SrsStunPacket ping; if ((err = ping.decode(data, size)) != srs_success) { return srs_error_wrap(err, "decode stun packet failed"); @@ -549,6 +550,8 @@ srs_error_t SrsRtcServer::do_create_session(SrsRtcUserConfig* ruc, SrsSdp& local int udp_port = _srs_config->get_rtc_server_listen(); int tcp_port = _srs_config->get_rtc_server_tcp_listen(); string protocol = _srs_config->get_rtc_server_protocol(); + + // TODO: FIXME: Should support only one TCP candidate. set candidates = discover_candidates(ruc); for (set::iterator it = candidates.begin(); it != candidates.end(); ++it) { string hostname; diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index d041e8620..11f9a1696 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1505,7 +1505,7 @@ srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t& stfd, I // If reuse HTTP server with WebRTC TCP, peek to detect the client. if (reuse_rtc_over_server_ && (type == SrsListenerHttpStream || type == SrsListenerHttpsStream)) { SrsTcpConnection* skt = new SrsTcpConnection(fd2); - SrsBufferedReader* io = new SrsBufferedReader(skt); + SrsBufferedReadWriter* io = new SrsBufferedReadWriter(skt); uint8_t b[10]; int nn = sizeof(b); if ((err = io->peek((char*)b, &nn)) != srs_success) { diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 1eb381809..71362fc0c 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -353,6 +353,8 @@ #define ERROR_RTC_NO_TRACK 5030 #define ERROR_RTC_RTCP_EMPTY_RR 5031 #define ERROR_RTC_TCP_SIZE 5032 +#define ERROR_RTC_TCP_PACKET 5033 +#define ERROR_RTC_TCP_STUN 5034 /////////////////////////////////////////////////////// // SRT protocol error. diff --git a/trunk/src/protocol/srs_protocol_st.cpp b/trunk/src/protocol/srs_protocol_st.cpp index 32d320f6f..946e6bb27 100644 --- a/trunk/src/protocol/srs_protocol_st.cpp +++ b/trunk/src/protocol/srs_protocol_st.cpp @@ -681,6 +681,7 @@ srs_error_t SrsTcpClient::connect() return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout)); } + // TODO: FIMXE: The timeout set on io need to be set to new object. srs_freep(io); io = new SrsStSocket(stfd);