1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

WebRTC: Support config, listener and SDP for TCP transport.

This commit is contained in:
winlin 2022-09-02 22:39:19 +08:00
parent 424713a31d
commit 770d959148
14 changed files with 767 additions and 386 deletions

View file

@ -47,6 +47,7 @@ using namespace std;
#include <srs_app_http_hooks.hpp>
#include <srs_protocol_kbps.hpp>
#include <srs_kernel_kbps.hpp>
#include <srs_app_rtc_network.hpp>
SrsPps* _srs_pps_sstuns = NULL;
SrsPps* _srs_pps_srtcps = NULL;
@ -75,9 +76,9 @@ ISrsRtcTransport::~ISrsRtcTransport()
{
}
SrsSecurityTransport::SrsSecurityTransport(SrsRtcConnection* s)
SrsSecurityTransport::SrsSecurityTransport(ISrsRtcNetwork* s)
{
session_ = s;
network_ = s;
dtls_ = new SrsDtls((ISrsDtlsCallback*)this);
srtp_ = new SrsSRTP();
@ -111,7 +112,7 @@ srs_error_t SrsSecurityTransport::write_dtls_data(void* data, int size)
++_srs_pps_sstuns->sugar;
if ((err = session_->sendonly_skt->sendto(data, size, 0)) != srs_success) {
if ((err = network_->write(data, size, NULL)) != srs_success) {
return srs_error_wrap(err, "send dtls packet");
}
@ -129,7 +130,7 @@ srs_error_t SrsSecurityTransport::on_dtls(char* data, int nb_data)
srs_error_t SrsSecurityTransport::on_dtls_alert(std::string type, std::string desc)
{
return session_->on_dtls_alert(type, desc);
return network_->on_dtls_alert(type, desc);
}
srs_error_t SrsSecurityTransport::on_dtls_handshake_done()
@ -148,7 +149,7 @@ srs_error_t SrsSecurityTransport::on_dtls_handshake_done()
return srs_error_wrap(err, "srtp init");
}
return session_->on_connection_established();
return network_->on_connection_established();
}
srs_error_t SrsSecurityTransport::on_dtls_application_data(const char* buf, const int nb_buf)
@ -198,7 +199,7 @@ srs_error_t SrsSecurityTransport::unprotect_rtcp(void* packet, int* nb_plaintext
return srtp_->unprotect_rtcp(packet, nb_plaintext);
}
SrsSemiSecurityTransport::SrsSemiSecurityTransport(SrsRtcConnection* s) : SrsSecurityTransport(s)
SrsSemiSecurityTransport::SrsSemiSecurityTransport(ISrsRtcNetwork* s) : SrsSecurityTransport(s)
{
}
@ -216,9 +217,9 @@ srs_error_t SrsSemiSecurityTransport::protect_rtcp(void* packet, int* nb_cipher)
return srs_success;
}
SrsPlaintextTransport::SrsPlaintextTransport(SrsRtcConnection* s)
SrsPlaintextTransport::SrsPlaintextTransport(ISrsRtcNetwork* s)
{
session_ = s;
network_ = s;
}
SrsPlaintextTransport::~SrsPlaintextTransport()
@ -248,7 +249,7 @@ srs_error_t SrsPlaintextTransport::on_dtls_alert(std::string type, std::string d
srs_error_t SrsPlaintextTransport::on_dtls_handshake_done()
{
srs_trace("RTC: DTLS handshake done.");
return session_->on_connection_established();
return network_->on_connection_established();
}
srs_error_t SrsPlaintextTransport::on_dtls_application_data(const char* data, const int len)
@ -434,11 +435,6 @@ SrsRtcPlayStream::~SrsRtcPlayStream()
session_->server_->exec_async_work(new SrsRtcAsyncCallOnStop(cid_, req_));
}
// TODO: FIXME: Should not do callback in de-constructor?
if (_srs_rtc_hijacker) {
_srs_rtc_hijacker->on_stop_play(session_, this, req_);
}
_srs_config->unsubscribe(this);
srs_freep(nack_epp);
@ -600,12 +596,6 @@ srs_error_t SrsRtcPlayStream::start()
return srs_error_wrap(err, "start pli worker");
}
if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_start_play(session_, this, req_)) != srs_success) {
return srs_error_wrap(err, "on start play");
}
}
is_started = true;
return err;
@ -649,12 +639,6 @@ srs_error_t SrsRtcPlayStream::cycle()
SrsErrorPithyPrint* epp = new SrsErrorPithyPrint();
SrsAutoFree(SrsErrorPithyPrint, epp);
if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_start_consume(session_, this, req_, consumer)) != srs_success) {
return srs_error_wrap(err, "on start consuming");
}
}
while (true) {
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "rtc sender thread");
@ -1105,14 +1089,6 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
source->on_unpublish();
}
// TODO: FIXME: Should not do callback in de-constructor?
// NOTE: on_stop_publish lead to switch io,
// it must be called after source stream unpublish (set source stream is_created=false).
// if not, it lead to republish failed.
if (_srs_rtc_hijacker) {
_srs_rtc_hijacker->on_stop_publish(session_, this, req_);
}
for (int i = 0; i < (int)video_tracks_.size(); ++i) {
SrsRtcVideoRecvTrack* track = video_tracks_.at(i);
srs_freep(track);
@ -1248,12 +1224,6 @@ srs_error_t SrsRtcPublishStream::start()
return srs_error_wrap(err, "start pli worker");
}
if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req_)) != srs_success) {
return srs_error_wrap(err, "on start publish");
}
}
is_started = true;
return err;
@ -1383,7 +1353,7 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data)
// Decrypt the cipher to plaintext RTP data.
char* plaintext = data;
int nb_plaintext = nb_data;
if ((err = session_->transport_->unprotect_rtp(plaintext, &nb_plaintext)) != srs_success) {
if ((err = session_->network_->unprotect_rtp(plaintext, &nb_plaintext)) != srs_success) {
// We try to decode the RTP header for more detail error informations.
SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true);
srs_error_t r0 = h.decode(&b); srs_freep(r0); // Ignore any error for header decoding.
@ -1467,12 +1437,6 @@ srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket*& pkt, SrsBuff
return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc);
}
if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_rtp_packet(session_, this, req_, pkt)) != srs_success) {
return srs_error_wrap(err, "on rtp packet");
}
}
// If circuit-breaker is enabled, disable nack.
if (_srs_circuit_breaker->hybrid_critical_water_level()) {
++_srs_pps_snack4->sugar;
@ -1760,14 +1724,6 @@ void SrsRtcPublishStream::update_send_report_time(uint32_t ssrc, const SrsNtp& n
}
}
ISrsRtcConnectionHijacker::ISrsRtcConnectionHijacker()
{
}
ISrsRtcConnectionHijacker::~ISrsRtcConnectionHijacker()
{
}
SrsRtcConnectionNackTimer::SrsRtcConnectionNackTimer(SrsRtcConnection* p) : p_(p)
{
_srs_hybrid->timer20ms()->subscribe(this);
@ -1811,11 +1767,9 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid)
{
req_ = NULL;
cid_ = cid;
hijacker_ = NULL;
sendonly_skt = NULL;
server_ = s;
transport_ = new SrsSecurityTransport(this);
network_ = new SrsRtcNetwork(this);
cache_iov_ = new iovec();
cache_iov_->iov_base = new char[kRtpPacketSize];
@ -1829,9 +1783,7 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid)
twcc_id_ = 0;
nn_simulate_player_nack_drop = 0;
pp_address_change = new SrsErrorPithyPrint();
pli_epp = new SrsErrorPithyPrint();
delta_ = new SrsEphemeralDelta();
nack_enabled_ = false;
timer_nack_ = new SrsRtcConnectionNackTimer(this);
@ -1861,13 +1813,8 @@ SrsRtcConnection::~SrsRtcConnection()
players_.clear();
players_ssrc_map_.clear();
// Note that we should never delete the sendonly_skt,
// it's just point to the object in peer_addresses_.
map<string, SrsUdpMuxSocket*>::iterator it;
for (it = peer_addresses_.begin(); it != peer_addresses_.end(); ++it) {
SrsUdpMuxSocket* addr = it->second;
srs_freep(addr);
}
// Free network over UDP or TCP.
srs_freep(network_);
if (true) {
char* iov_base = (char*)cache_iov_->iov_base;
@ -1876,11 +1823,8 @@ SrsRtcConnection::~SrsRtcConnection()
}
srs_freep(cache_buffer_);
srs_freep(transport_);
srs_freep(req_);
srs_freep(pp_address_change);
srs_freep(pli_epp);
srs_freep(delta_);
}
void SrsRtcConnection::on_before_dispose(ISrsResource* c)
@ -1943,22 +1887,9 @@ string SrsRtcConnection::username()
return username_;
}
vector<SrsUdpMuxSocket*> SrsRtcConnection::peer_addresses()
{
vector<SrsUdpMuxSocket*> addresses;
map<string, SrsUdpMuxSocket*>::iterator it;
for (it = peer_addresses_.begin(); it != peer_addresses_.end(); ++it) {
SrsUdpMuxSocket* addr = it->second;
addresses.push_back(addr);
}
return addresses;
}
ISrsKbpsDelta* SrsRtcConnection::delta()
{
return delta_;
return network_->delta();
}
const SrsContextId& SrsRtcConnection::get_id()
@ -2034,12 +1965,6 @@ srs_error_t SrsRtcConnection::add_player(SrsRtcUserConfig* ruc, SrsSdp& local_sd
SrsRequest* req = ruc->req_;
if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_before_play(this, req)) != srs_success) {
return srs_error_wrap(err, "before play");
}
}
std::map<uint32_t, SrsRtcTrackDescription*> play_sub_relations;
if ((err = negotiate_play_capability(ruc, play_sub_relations)) != srs_success) {
return srs_error_wrap(err, "play negotiate");
@ -2084,17 +2009,8 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st
username_ = username;
req_ = r->copy();
if (!srtp) {
srs_freep(transport_);
if (dtls) {
transport_ = new SrsSemiSecurityTransport(this);
} else {
transport_ = new SrsPlaintextTransport(this);
}
}
SrsSessionConfig* cfg = &local_sdp.session_negotiate_;
if ((err = transport_->initialize(cfg)) != srs_success) {
if ((err = network_->initialize(cfg, dtls, srtp)) != srs_success) {
return srs_error_wrap(err, "init");
}
@ -2111,26 +2027,19 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st
return err;
}
srs_error_t SrsRtcConnection::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r)
srs_error_t SrsRtcConnection::on_stun(SrsStunPacket* r, char* data, int nb_data)
{
srs_error_t err = srs_success;
// Update stat when we received data.
delta_->add_delta(skt->size(), 0);
// Write STUN messages to blackhole.
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(data, nb_data);
}
if (!r->is_binding_request()) {
return err;
}
// We are running in the ice-lite(server) mode. If client have multi network interface,
// we only choose one candidate pair which is determined by client.
update_sendonly_socket(skt);
// Write STUN messages to blackhole.
if (_srs_blackhole->blackhole) {
_srs_blackhole->sendto(skt->data(), skt->size());
}
if ((err = on_binding_request(r)) != srs_success) {
return srs_error_wrap(err, "stun binding request failed");
}
@ -2140,21 +2049,15 @@ srs_error_t SrsRtcConnection::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r)
srs_error_t SrsRtcConnection::on_dtls(char* data, int nb_data)
{
// Update stat when we received data.
delta_->add_delta(nb_data, 0);
return transport_->on_dtls(data, nb_data);
return network_->on_dtls(data, nb_data);
}
srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data)
{
srs_error_t err = srs_success;
// Update stat when we received data.
delta_->add_delta(nb_data, 0);
int nb_unprotected_buf = nb_data;
if ((err = transport_->unprotect_rtcp(data, &nb_unprotected_buf)) != srs_success) {
if ((err = network_->unprotect_rtcp(data, &nb_unprotected_buf)) != srs_success) {
return srs_error_wrap(err, "rtcp unprotect");
}
@ -2282,18 +2185,10 @@ srs_error_t SrsRtcConnection::on_rtcp_feedback_remb(SrsRtcpPsfbCommon *rtcp)
return srs_success;
}
void SrsRtcConnection::set_hijacker(ISrsRtcConnectionHijacker* h)
{
hijacker_ = h;
}
srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data)
{
srs_error_t err = srs_success;
// Update stat when we received data.
delta_->add_delta(nb_data, 0);
SrsRtcPublishStream* publisher = NULL;
if ((err = find_publisher(data, nb_data, &publisher)) != srs_success) {
return srs_error_wrap(err, "find");
@ -2368,12 +2263,6 @@ srs_error_t SrsRtcConnection::on_connection_established()
}
}
if (hijacker_) {
if ((err = hijacker_->on_dtls_done()) != srs_success) {
return srs_error_wrap(err, "hijack on dtls done");
}
}
return err;
}
@ -2393,39 +2282,6 @@ srs_error_t SrsRtcConnection::on_dtls_alert(std::string type, std::string desc)
return err;
}
srs_error_t SrsRtcConnection::start_play(string stream_uri)
{
srs_error_t err = srs_success;
map<string, SrsRtcPlayStream*>::iterator it = players_.find(stream_uri);
if(it == players_.end()) {
return srs_error_new(ERROR_RTC_NO_PLAYER, "not subscribe %s", stream_uri.c_str());
}
SrsRtcPlayStream* player = it->second;
if ((err = player->start()) != srs_success) {
return srs_error_wrap(err, "start");
}
return err;
}
srs_error_t SrsRtcConnection::start_publish(std::string stream_uri)
{
srs_error_t err = srs_success;
map<string, SrsRtcPublishStream*>::iterator it = publishers_.find(stream_uri);
if(it == publishers_.end()) {
return srs_error_new(ERROR_RTC_NO_PUBLISHER, "no %s publisher", stream_uri.c_str());
}
if ((err = it->second->start()) != srs_success) {
return srs_error_wrap(err, "start");
}
return err;
}
bool SrsRtcConnection::is_alive()
{
return last_stun_time + session_timeout > srs_get_system_time();
@ -2436,52 +2292,9 @@ void SrsRtcConnection::alive()
last_stun_time = srs_get_system_time();
}
void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt)
SrsRtcUdpNetwork* SrsRtcConnection::udp()
{
// TODO: FIXME: Refine performance.
string prev_peer_id, peer_id = skt->peer_id();
if (sendonly_skt) {
prev_peer_id = sendonly_skt->peer_id();
}
// Ignore if same address.
if (prev_peer_id == peer_id) {
return;
}
// Find object from cache.
SrsUdpMuxSocket* addr_cache = NULL;
if (true) {
map<string, SrsUdpMuxSocket*>::iterator it = peer_addresses_.find(peer_id);
if (it != peer_addresses_.end()) {
addr_cache = it->second;
}
}
// Show address change log.
if (prev_peer_id.empty()) {
srs_trace("RTC: session address init %s", peer_id.c_str());
} else {
uint32_t nn = 0;
if (pp_address_change->can_print(skt->get_peer_port(), &nn)) {
srs_trace("RTC: session address change %s -> %s, cached=%d, nn_change=%u/%u, nn_address=%u", prev_peer_id.c_str(),
peer_id.c_str(), (addr_cache? 1:0), pp_address_change->nn_count, nn, peer_addresses_.size());
}
}
// If no cache, build cache and setup the relations in connection.
if (!addr_cache) {
peer_addresses_[peer_id] = addr_cache = skt->copy_sendonly();
_srs_rtc_manager->add_with_id(peer_id, this);
uint64_t fast_id = skt->fast_id();
if (fast_id) {
_srs_rtc_manager->add_with_fast_id(fast_id, this);
}
}
// Update the transport.
sendonly_skt = addr_cache;
return network_->udp();
}
srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data)
@ -2490,15 +2303,12 @@ srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data)
++_srs_pps_srtcps->sugar;
// Update stat when we sending data.
delta_->add_delta(0, nb_data);
int nb_buf = nb_data;
if ((err = transport_->protect_rtcp(data, &nb_buf)) != srs_success) {
if ((err = network_->protect_rtcp(data, &nb_buf)) != srs_success) {
return srs_error_wrap(err, "protect rtcp");
}
if ((err = sendonly_skt->sendto(data, nb_buf, 0)) != srs_success) {
if ((err = network_->write(data, nb_buf, NULL)) != srs_success) {
return srs_error_wrap(err, "send");
}
@ -2682,7 +2492,7 @@ srs_error_t SrsRtcConnection::do_send_packet(SrsRtpPacket* pkt)
// Cipher RTP to SRTP packet.
if (true) {
int nn_encrypt = (int)iov->iov_len;
if ((err = transport_->protect_rtp(iov->iov_base, &nn_encrypt)) != srs_success) {
if ((err = network_->protect_rtp(iov->iov_base, &nn_encrypt)) != srs_success) {
return srs_error_wrap(err, "srtp protect");
}
iov->iov_len = (size_t)nn_encrypt;
@ -2697,11 +2507,11 @@ srs_error_t SrsRtcConnection::do_send_packet(SrsRtpPacket* pkt)
++_srs_pps_srtps->sugar;
// Update stat when we sending data.
delta_->add_delta(0, iov->iov_len);
// TODO: FIXME: Handle error.
sendonly_skt->sendto(iov->iov_base, iov->iov_len, 0);
if ((err = network_->write(iov->iov_base, iov->iov_len, NULL)) != srs_success) {
srs_warn("RTC: Write %d bytes err %s", iov->iov_len, srs_error_desc(err).c_str());
srs_freep(err);
return err;
}
// Detail log, should disable it in release version.
srs_info("RTC: SEND PT=%u, SSRC=%#x, SEQ=%u, Time=%u, %u/%u bytes", pkt->header.get_payload_type(), pkt->header.get_ssrc(),
@ -2764,17 +2574,14 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r)
stun_binding_response.set_remote_ufrag(r->get_local_ufrag());
stun_binding_response.set_transcation_id(r->get_transcation_id());
// FIXME: inet_addr is deprecated, IPV6 support
stun_binding_response.set_mapped_address(be32toh(inet_addr(sendonly_skt->get_peer_ip().c_str())));
stun_binding_response.set_mapped_port(sendonly_skt->get_peer_port());
stun_binding_response.set_mapped_address(be32toh(inet_addr(network_->get_peer_ip().c_str())));
stun_binding_response.set_mapped_port(network_->get_peer_port());
if ((err = stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream)) != srs_success) {
return srs_error_wrap(err, "stun binding response encode failed");
}
// Update stat when we sending data.
delta_->add_delta(0, stream->pos());
if ((err = sendonly_skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) {
if ((err = network_->write(stream->data(), stream->pos(), NULL)) != srs_success) {
return srs_error_wrap(err, "stun binding response send failed");
}
@ -2783,7 +2590,7 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r)
// TODO: FIXME: Add cost.
srs_trace("RTC: session STUN done, waiting DTLS handshake.");
if((err = transport_->start_active_handshake()) != srs_success) {
if((err = network_->start_active_handshake()) != srs_success) {
return srs_error_wrap(err, "fail to dtls handshake");
}
}
@ -3646,12 +3453,6 @@ srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcSourceDesc
}
}
if (_srs_rtc_hijacker) {
if ((err = _srs_rtc_hijacker->on_create_publish(this, publisher, req)) != srs_success) {
return srs_error_wrap(err, "on create publish");
}
}
// If DTLS done, start the publisher. Because maybe create some publishers after DTLS done.
// For example, for single PC, we maybe start publisher when create it, because DTLS is done.
if(ESTABLISHED == state()) {
@ -3663,13 +3464,3 @@ srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcSourceDesc
return err;
}
ISrsRtcHijacker::ISrsRtcHijacker()
{
}
ISrsRtcHijacker::~ISrsRtcHijacker()
{
}
ISrsRtcHijacker* _srs_rtc_hijacker = NULL;