1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

SmartPtr: Use shared ptr in RTC TCP connection. v6.0.127 (#4083)

Fix issue https://github.com/ossrs/srs/issues/3784

---

Co-authored-by: Jacob Su <suzp1984@gmail.com>
This commit is contained in:
Winlin 2024-06-13 16:04:31 +08:00 committed by GitHub
parent 7b9c52b283
commit 242152bd6b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 95 additions and 76 deletions

View file

@ -436,7 +436,7 @@ srs_error_t SrsRtcUdpNetwork::write(void* buf, size_t size, ssize_t* nwrite)
return sendonly_skt_->sendto(buf, size, SRS_UTIME_NO_TIMEOUT);
}
SrsRtcTcpNetwork::SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta)
SrsRtcTcpNetwork::SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta) : owner_(new SrsRtcTcpConn())
{
conn_ = conn;
delta_ = delta;
@ -444,11 +444,11 @@ SrsRtcTcpNetwork::SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* de
transport_ = new SrsSecurityTransport(this);
peer_port_ = 0;
state_ = SrsRtcNetworkStateInit;
owner_ = NULL;
}
SrsRtcTcpNetwork::~SrsRtcTcpNetwork()
{
owner_->interrupt();
srs_freep(transport_);
}
@ -694,36 +694,54 @@ void SrsRtcTcpNetwork::dispose()
#define SRS_RTC_TCP_PACKET_MAX 1500
SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm)
SrsRtcTcpConn::SrsRtcTcpConn()
{
wrapper_ = NULL;
owner_coroutine_ = NULL;
owner_cid_ = NULL;
cid_ = _srs_context->get_id();
pkt_ = NULL;
delta_ = NULL;
skt_ = NULL;
}
SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port) : SrsRtcTcpConn()
{
manager_ = cm;
ip_ = cip;
port_ = port;
skt_ = skt;
delta_ = new SrsNetworkDelta();
delta_->set_io(skt_, skt_);
trd_ = new SrsSTCoroutine("tcp", this, _srs_context->get_id());
session_ = NULL;
pkt_ = new char[SRS_RTC_TCP_PACKET_MAX];
_srs_rtc_manager->subscribe(this);
}
SrsRtcTcpConn::~SrsRtcTcpConn()
{
_srs_rtc_manager->unsubscribe(this);
trd_->interrupt();
srs_freep(trd_);
srs_freepa(pkt_);
srs_freep(delta_);
srs_freep(skt_);
}
void SrsRtcTcpConn::setup_owner(SrsSharedResource<SrsRtcTcpConn>* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid)
{
wrapper_ = wrapper;
owner_coroutine_ = owner_coroutine;
owner_cid_ = owner_cid;
}
ISrsKbpsDelta* SrsRtcTcpConn::delta()
{
return delta_;
}
void SrsRtcTcpConn::interrupt()
{
session_ = NULL;
if (owner_coroutine_) owner_coroutine_->interrupt();
}
std::string SrsRtcTcpConn::desc()
{
return "Tcp";
@ -731,7 +749,7 @@ std::string SrsRtcTcpConn::desc()
const SrsContextId& SrsRtcTcpConn::get_id()
{
return trd_->cid();
return cid_;
}
std::string SrsRtcTcpConn::remote_ip()
@ -739,9 +757,9 @@ std::string SrsRtcTcpConn::remote_ip()
return ip_;
}
srs_error_t SrsRtcTcpConn::start()
void SrsRtcTcpConn::on_executor_done(ISrsInterruptable* executor)
{
return trd_->start();
owner_coroutine_ = NULL;
}
srs_error_t SrsRtcTcpConn::cycle()
@ -752,15 +770,10 @@ srs_error_t SrsRtcTcpConn::cycle()
SrsStatistic::instance()->on_disconnect(get_id().c_str(), err);
SrsStatistic::instance()->kbps_add_delta(get_id().c_str(), delta_);
// TODO: FIXME: Should manage RTC TCP connection by _srs_rtc_manager.
// Because we use manager to manage this object, not the http connection object, so we must remove it here.
manager_->remove(this);
// TODO: FIXME: When TCP connection(transport) closed, should notify session to dispose, should not free them simultaneously.
// Only remove session when network is established, because client might use other UDP network.
if(session_ && session_->tcp()->is_establelished()) {
session_->tcp()->set_state(SrsRtcNetworkStateClosed);
_srs_rtc_manager->remove(session_);
session_->expire();
}
// For HTTP-API timeout, we think it's done successfully,
@ -801,13 +814,18 @@ srs_error_t SrsRtcTcpConn::do_cycle()
{
srs_error_t err = srs_success;
// Update all context id to cid of session.
_srs_context->set_id(cid_);
owner_cid_->set_cid(cid_);
if((err = handshake()) != srs_success) {
return srs_error_wrap(err, "process rtc tcp pkt");
}
// TODO: FIXME: Handle all bytes of TCP Connection.
while(true) {
if((err = trd_->pull()) != srs_success) {
if (!owner_coroutine_) return err;
if ((err = owner_coroutine_->pull()) != srs_success) {
return srs_error_wrap(err, "rtc tcp conn");
}
@ -859,11 +877,11 @@ srs_error_t SrsRtcTcpConn::handshake()
// Should support only one TCP candidate.
SrsRtcTcpNetwork* network = dynamic_cast<SrsRtcTcpNetwork*>(session->tcp());
if (!network->owner()) {
network->set_owner(this);
if (network->owner().get() != this) {
network->set_owner(*wrapper_);
session_ = session;
}
if (network->owner() != this) {
if (network->owner().get() != this) {
return srs_error_new(ERROR_RTC_TCP_UNIQUE, "only support one network");
}
@ -939,21 +957,3 @@ srs_error_t SrsRtcTcpConn::on_tcp_pkt(char* pkt, int nb_pkt)
return srs_error_new(ERROR_RTC_UDP, "unknown packet");
}
void SrsRtcTcpConn::on_before_dispose(ISrsResource* c)
{
if (!session_) return;
SrsRtcConnection* conn = dynamic_cast<SrsRtcConnection*>(c);
if(conn == session_) {
session_ = NULL;
// the related rtc connection will be disposed
srs_trace("RTC: tcp conn diposing, because of rtc connection");
trd_->interrupt();
}
}
void SrsRtcTcpConn::on_disposing(ISrsResource* c)
{
return;
}