1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

WebRTC: Refine code and destroy session when tcp close.

This commit is contained in:
Li Peng 2022-09-04 21:24:51 +08:00 committed by winlin
parent 07339e1417
commit efa0851476
10 changed files with 181 additions and 157 deletions

View file

@ -103,28 +103,26 @@ ISrsKbpsDelta* SrsRtcNetworks::delta()
ISrsRtcNetwork::ISrsRtcNetwork()
{
establelished_ = false;
}
ISrsRtcNetwork::~ISrsRtcNetwork()
{
}
bool ISrsRtcNetwork::is_establelished()
{
return establelished_;
}
SrsRtcDummyNetwork::SrsRtcDummyNetwork()
{
establelished_ = true;
}
SrsRtcDummyNetwork::~SrsRtcDummyNetwork()
{
}
srs_error_t SrsRtcDummyNetwork::on_connection_established()
bool SrsRtcDummyNetwork::is_establelished()
{
return true;
}
srs_error_t SrsRtcDummyNetwork::on_dtls_handshake_done()
{
return srs_success;
}
@ -195,14 +193,8 @@ srs_error_t SrsRtcUdpNetwork::initialize(SrsSessionConfig* cfg, bool dtls, bool
return err;
}
srs_error_t SrsRtcUdpNetwork::start_active_handshake()
{
return transport_->start_active_handshake();
}
srs_error_t SrsRtcUdpNetwork::on_dtls(char* data, int nb_data)
{
establelished_ = true;
// Update stat when we received data.
delta_->add_delta(nb_data, 0);
@ -214,20 +206,20 @@ srs_error_t SrsRtcUdpNetwork::on_dtls_alert(std::string type, std::string desc)
return conn_->on_dtls_alert(type, desc);
}
srs_error_t SrsRtcUdpNetwork::on_connection_established()
srs_error_t SrsRtcUdpNetwork::on_dtls_handshake_done()
{
srs_error_t err = srs_success;
// If DTLS done packet received many times, such as ARQ, ignore.
if(SrsRtcNetworkStateClosed == state_) {
if(SrsRtcNetworkStateEstablished == state_) {
return err;
}
if ((err = conn_->on_connection_established()) != srs_success) {
if ((err = conn_->on_dtls_handshake_done()) != srs_success) {
return srs_error_wrap(err, "udp");
}
state_ = SrsRtcNetworkStateClosed;
state_ = SrsRtcNetworkStateEstablished;
return err;
}
@ -303,6 +295,11 @@ void SrsRtcUdpNetwork::set_state(SrsRtcNetworkState state)
state_ = state;
}
bool SrsRtcUdpNetwork::is_establelished()
{
return state_ == SrsRtcNetworkStateEstablished;
}
string SrsRtcUdpNetwork::get_peer_ip()
{
srs_assert(sendonly_skt_);
@ -418,7 +415,7 @@ srs_error_t SrsRtcUdpNetwork::on_binding_request(SrsStunPacket* r, string ice_pw
// TODO: FIXME: Add cost.
srs_trace("RTC: session STUN done, waiting DTLS handshake.");
if((err = start_active_handshake()) != srs_success) {
if((err = transport_->start_active_handshake()) != srs_success) {
return srs_error_wrap(err, "fail to dtls handshake");
}
}
@ -459,20 +456,20 @@ void SrsRtcTcpNetwork::update_sendonly_socket(ISrsProtocolReadWriter* skt)
sendonly_skt_ = skt;
}
srs_error_t SrsRtcTcpNetwork::on_connection_established()
srs_error_t SrsRtcTcpNetwork::on_dtls_handshake_done()
{
srs_error_t err = srs_success;
// If DTLS done packet received many times, such as ARQ, ignore.
if(SrsRtcNetworkStateClosed == state_) {
if(SrsRtcNetworkStateEstablished == state_) {
return err;
}
if ((err = conn_->on_connection_established()) != srs_success) {
if ((err = conn_->on_dtls_handshake_done()) != srs_success) {
return srs_error_wrap(err, "udp");
}
state_ = SrsRtcNetworkStateClosed;
state_ = SrsRtcNetworkStateEstablished;
return err;
}
@ -546,7 +543,7 @@ srs_error_t SrsRtcTcpNetwork::on_binding_request(SrsStunPacket* r, std::string i
// TODO: FIXME: Add cost.
srs_trace("RTC: session STUN done, waiting DTLS handshake.");
if((err = start_active_handshake()) != srs_success) {
if((err = transport_->start_active_handshake()) != srs_success) {
return srs_error_wrap(err, "fail to dtls handshake");
}
}
@ -578,14 +575,8 @@ srs_error_t SrsRtcTcpNetwork::initialize(SrsSessionConfig* cfg, bool dtls, bool
return err;
}
srs_error_t SrsRtcTcpNetwork::start_active_handshake()
{
return transport_->start_active_handshake();
}
srs_error_t SrsRtcTcpNetwork::on_dtls(char* data, int nb_data)
{
establelished_ = true;
// Update stat when we received data.
delta_->add_delta(nb_data, 0);
@ -654,6 +645,11 @@ void SrsRtcTcpNetwork::set_state(SrsRtcNetworkState state)
state_ = state;
}
bool SrsRtcTcpNetwork::is_establelished()
{
return state_ == SrsRtcNetworkStateEstablished;
}
std::string SrsRtcTcpNetwork::get_peer_ip()
{
return peer_ip_;
@ -666,32 +662,21 @@ int SrsRtcTcpNetwork::get_peer_port()
srs_error_t SrsRtcTcpNetwork::write(void* buf, size_t size, ssize_t* nwrite)
{
srs_assert(size <= 65535);
srs_error_t err = srs_success;
char len_str[2];
SrsBuffer buf_len(len_str, sizeof(len_str));
buf_len.write_2bytes(size);
ssize_t n = 0;
// Encode and send 2 bytes size, in network order.
srs_assert(size <= 65535);
uint8_t b[2] = {uint8_t(size>>8), uint8_t(size)};
if((err = sendonly_skt_->write(buf_len.data(), sizeof(len_str), &n)) != srs_success) {
if((err = sendonly_skt_->write((char*)b, sizeof(b), NULL)) != srs_success) {
return srs_error_wrap(err, "rtc tcp write len(%d)", size);
}
if(nwrite) {
*nwrite = n;
}
// TODO: FIXME: maybe need to send by a few times
if((err = sendonly_skt_->write(buf, size, &n)) != srs_success) {
// Send the data in size of bytes.
if((err = sendonly_skt_->write(buf, size, nwrite)) != srs_success) {
return srs_error_wrap(err, "rtc tcp write body");
}
if(nwrite) {
*nwrite += n;
}
return err;
}
@ -701,6 +686,13 @@ void SrsRtcTcpNetwork::set_peer_id(const std::string& ip, int port)
peer_port_ = port;
}
void SrsRtcTcpNetwork::dispose()
{
state_ = SrsRtcNetworkStateClosed;
}
#define SRS_RTC_TCP_PACKET_MAX 1500
SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm)
{
manager_ = cm;
@ -711,7 +703,8 @@ SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int p
delta_->set_io(skt_, skt_);
trd_ = new SrsSTCoroutine("tcp", this, _srs_context->get_id());
session_ = NULL;
disposing_ = false;
pkt_ = new char[SRS_RTC_TCP_PACKET_MAX];
_srs_rtc_manager->subscribe(this);
}
SrsRtcTcpConn::~SrsRtcTcpConn()
@ -720,6 +713,7 @@ SrsRtcTcpConn::~SrsRtcTcpConn()
trd_->interrupt();
srs_freep(trd_);
srs_freepa(pkt_);
srs_freep(delta_);
srs_freep(skt_);
}
@ -746,7 +740,6 @@ std::string SrsRtcTcpConn::remote_ip()
srs_error_t SrsRtcTcpConn::start()
{
_srs_rtc_manager->subscribe(this);
return trd_->start();
}
@ -758,10 +751,15 @@ srs_error_t SrsRtcTcpConn::cycle()
SrsStatistic::instance()->on_disconnect(get_id().c_str());
SrsStatistic::instance()->kbps_add_delta(get_id().c_str(), delta_);
// Because we use manager to manage this object,
// not the http connection object, so we must remove it here.
// Because we use manager to manage this object, not the http connection object, so we must remove it here.
manager_->remove(this);
// Only remove session when network is established, because client might use other UDP network.
if(session_ && session_->tcp()->is_establelished()) {
session_->tcp()->set_state(SrsRtcNetworkStateClosed);
_srs_rtc_manager->remove(session_);
}
// For HTTP-API timeout, we think it's done successfully,
// because there may be no request or response for HTTP-API.
if (srs_error_code(err) == ERROR_SOCKET_TIMEOUT) {
@ -776,37 +774,22 @@ srs_error_t SrsRtcTcpConn::do_cycle()
{
srs_error_t err = srs_success;
char* pkt = new char[1500];
SrsAutoFreeA(char, pkt);
if((err = handshake()) != srs_success) {
return srs_error_wrap(err, "process rtc tcp pkt");
}
// TODO: FIXME: Handle all bytes of TCP Connection.
while(!disposing_) {
while(true) {
if((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "rtc tcp conn");
}
// Read length in 2 bytes @doc: https://www.rfc-editor.org/rfc/rfc4571#section-2
ssize_t nread = 0; uint8_t b[2];
if((err = skt_->read((char*)b, sizeof(b), &nread)) != srs_success) {
return srs_error_wrap(err, "rtc tcp conn read len");
int npkt = SRS_RTC_TCP_PACKET_MAX;
if((err = read_packet(pkt_, &npkt)) != srs_success) {
return srs_error_wrap(err, "process rtc tcp pkt");
}
uint16_t npkt = uint16_t(b[0])<<8 | uint16_t(b[1]);
if (npkt > 1500) {
return srs_error_new(ERROR_RTC_TCP_SIZE, "invalid size=%u", npkt);
}
// Read a RTC pkt such as STUN, DTLS or RTP/RTCP
if((err = skt_->read_fully(pkt, npkt, &nread)) != srs_success) {
return srs_error_wrap(err, "rtc tcp conn read body");
}
// Ready to be destroyed, not need to process new packet
if(disposing_) {
return err;
}
if((err = on_tcp_pkt(pkt, npkt)) != srs_success) {
if((err = on_tcp_pkt(pkt_, npkt)) != srs_success) {
return srs_error_wrap(err, "process rtc tcp pkt");
}
}
@ -814,89 +797,122 @@ srs_error_t SrsRtcTcpConn::do_cycle()
return err;
}
srs_error_t SrsRtcTcpConn::handshake()
{
srs_error_t err = srs_success;
int npkt = SRS_RTC_TCP_PACKET_MAX;
if((err = read_packet(pkt_, &npkt)) != srs_success) {
return srs_error_wrap(err, "process rtc tcp pkt");
}
bool is_stun = srs_is_stun((uint8_t*)pkt_, npkt);
bool is_rtp_or_rtcp = srs_is_rtp_or_rtcp((uint8_t*)pkt_, npkt);
if (!is_stun) {
return srs_error_new(ERROR_RTC_TCP_PACKET, "invalid packet stun=%d, rtp/rtcp=%d, pkt=%s",
is_stun, is_rtp_or_rtcp, srs_string_dumps_hex(pkt_, npkt, 8).c_str());
}
// Find session by ping(BindingRequest).
SrsStunPacket ping;
if ((err = ping.decode(pkt_, npkt)) != srs_success) {
return srs_error_wrap(err, "decode stun packet failed");
}
if (!session_) {
session_ = dynamic_cast<SrsRtcConnection*>(_srs_rtc_manager->find_by_name(ping.get_username()));
}
if (session_) {
session_->switch_to_context();
}
srs_trace("recv stun packet from %s:%d, use-candidate=%d, ice-controlled=%d, ice-controlling=%d",
ip_.c_str(), port_, ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling());
// TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it.
if (!session_) {
return srs_error_new(ERROR_RTC_TCP_STUN, "no session, stun username=%s", ping.get_username().c_str());
}
// For each binding request, update the TCP socket.
if (ping.is_binding_request()) {
session_->tcp()->update_sendonly_socket(skt_);
session_->tcp()->set_peer_id(ip_, port_);
}
return session_->tcp()->on_stun(&ping, pkt_, npkt);
}
srs_error_t SrsRtcTcpConn::read_packet(char* pkt, int* nb_pkt)
{
srs_error_t err = srs_success;
// Read length in 2 bytes @doc: https://www.rfc-editor.org/rfc/rfc4571#section-2
ssize_t nread = 0; uint8_t b[2];
if((err = skt_->read((char*)b, sizeof(b), &nread)) != srs_success) {
return srs_error_wrap(err, "rtc tcp conn read len");
}
uint16_t npkt = uint16_t(b[0])<<8 | uint16_t(b[1]);
if (npkt > *nb_pkt) {
return srs_error_new(ERROR_RTC_TCP_SIZE, "invalid size=%u exceed %d", npkt, *nb_pkt);
}
// Read a RTC pkt such as STUN, DTLS or RTP/RTCP
if((err = skt_->read_fully(pkt, npkt, &nread)) != srs_success) {
return srs_error_wrap(err, "rtc tcp conn read body");
}
*nb_pkt = npkt;
return err;
}
srs_error_t SrsRtcTcpConn::on_tcp_pkt(char* pkt, int nb_pkt)
{
srs_error_t err = srs_success;
// Session is destroyed, ignore TCP packet.
if (!session_) return err;
bool is_stun = srs_is_stun((uint8_t*)pkt, nb_pkt);
bool is_rtp_or_rtcp = srs_is_rtp_or_rtcp((uint8_t*)pkt, nb_pkt);
bool is_rtcp = srs_is_rtcp((uint8_t*)pkt, nb_pkt);
if(!is_stun && !session_) {
srs_warn("rtc tcp received a mess pkt. %d[%s]", nb_pkt, srs_string_dumps_hex(pkt, nb_pkt, 8).c_str());
return err;
}
// When got any packet, the session is alive now.
session_->alive();
if (session_) {
// When got any packet, the session is alive now.
session_->alive();
}
if(is_stun) {
if (is_stun) {
SrsStunPacket ping;
if ((err = ping.decode(pkt, nb_pkt)) != srs_success) {
return srs_error_wrap(err, "decode stun packet failed");
}
if (!session_) {
session_ = dynamic_cast<SrsRtcConnection*>(_srs_rtc_manager->find_by_name(ping.get_username()));
}
if (session_) {
session_->switch_to_context();
}
srs_trace("recv stun packet from %s:%d, use-candidate=%d, ice-controlled=%d, ice-controlling=%d",
ip_.c_str(), port_, ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling());
// TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it.
if (!session_) {
return srs_error_new(ERROR_RTC_STUN, "no session, stun username=%s",
ping.get_username().c_str());
}
// For each binding request, update the TCP socket.
if (ping.is_binding_request()) {
session_->tcp()->update_sendonly_socket(skt_);
session_->tcp()->set_peer_id(ip_, port_);
}
return session_->tcp()->on_stun(&ping, pkt, nb_pkt);
}
// For DTLS, RTCP or RTP, which does not support peer address changing.
if (!session_) {
return srs_error_new(ERROR_RTC_STUN, "no session peer=%s:%d", ip_.c_str(), port_);
}
// Note that we don't(except error) switch to the context of session, for performance issue.
if (is_rtp_or_rtcp && !is_rtcp) {
err = session_->tcp()->on_rtp(pkt, nb_pkt);
if (err != srs_success) {
return srs_error_wrap(err, "rtc tcp rtp");
}
return err;
return session_->tcp()->on_rtp(pkt, nb_pkt);
}
if (is_rtp_or_rtcp && is_rtcp) {
return session_->tcp()->on_rtcp(pkt, nb_pkt);
}
if (srs_is_dtls((uint8_t*)pkt, nb_pkt)) {
srs_trace("receive a dtls pkt");
return session_->tcp()->on_dtls(pkt, nb_pkt);
}
return srs_error_new(ERROR_RTC_UDP, "unknown packet");
}
void SrsRtcTcpConn::on_before_dispose(ISrsResource* c)
{
if(!session_ || disposing_) {
return;
}
if (!session_) return;
SrsRtcConnection *conn = dynamic_cast<SrsRtcConnection*>(c);
SrsRtcConnection* conn = dynamic_cast<SrsRtcConnection*>(c);
if(conn == session_) {
session_ = NULL;
// the related rtc connection will be disposed
srs_trace("RTC: tcp conn diposing, because of rtc connection");
session_ = NULL;
disposing_ = true;
trd_->interrupt();
}
}