From 7692e589ed6f8a2c3d9722ea2d4aa248c993e753 Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 27 Apr 2020 09:35:50 +0800 Subject: [PATCH] For RTC publisher, support black-hole --- trunk/conf/full.conf | 10 +++ trunk/src/app/srs_app_config.cpp | 48 ++++++++++++++- trunk/src/app/srs_app_config.hpp | 2 + trunk/src/app/srs_app_rtc_conn.cpp | 98 +++++++++++++++++++++++++----- trunk/src/app/srs_app_rtc_conn.hpp | 5 ++ 5 files changed, 146 insertions(+), 17 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index bb852f43e..4a8ea1aa1 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -461,6 +461,16 @@ rtc_server { # then system queue is 2000*4 = 8k, user can incrase reuseport to incrase the queue. # default: 2000 queue_length 2000; + # The black-hole to copy packet to, for debugging. + # For example, when debugging Chrome publish stream, the received packets are encrypted cipher, + # we can set the publisher black-hole, SRS will copy the plaintext packets to black-hole, and + # we are able to capture the plaintext packets by wireshark. + black_hole { + # Whether enable the black-hole. + enabled off; + # The black-hole address for publisher, or SRS as receiver. + publisher 127.0.0.1:10000; + } } vhost rtc.vhost.srs.com { diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 01db3abc0..cd1e04b96 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3645,7 +3645,7 @@ srs_error_t SrsConfig::check_normal_config() string n = conf->at(i)->name; if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa" && n != "sendmmsg" && n != "encrypt" && n != "reuseport" && n != "gso" && n != "merge_nalus" - && n != "padding" && n != "perf_stat" && n != "queue_length") { + && n != "padding" && n != "perf_stat" && n != "queue_length" && n != "black_hole") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtc_server.%s", n.c_str()); } } @@ -4742,7 +4742,7 @@ std::string SrsConfig::get_rtc_server_candidates() return DEFAULT; } - return (conf->arg0().c_str()); + return conf->arg0(); } bool SrsConfig::get_rtc_server_ecdsa() @@ -4943,6 +4943,50 @@ int SrsConfig::get_rtc_server_queue_length() return ::atoi(conf->arg0().c_str()); } +bool SrsConfig::get_rtc_server_black_hole() +{ + static bool DEFAULT = false; + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("black_hole"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("enabled"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + +std::string SrsConfig::get_rtc_server_black_hole_publisher() +{ + static string DEFAULT = ""; + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("black_hole"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("publisher"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return conf->arg0(); +} + SrsConfDirective* SrsConfig::get_rtc(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 04552c3be..ab9ac5ac2 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -534,6 +534,8 @@ public: virtual int get_rtc_server_padding(); virtual bool get_rtc_server_perf_stat(); virtual int get_rtc_server_queue_length(); + virtual bool get_rtc_server_black_hole(); + virtual std::string get_rtc_server_black_hole_publisher(); private: virtual int get_rtc_server_reuseport2(); virtual bool get_rtc_server_gso2(); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 256ef3cf5..83cb0e384 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -62,6 +62,7 @@ using namespace std; #include #include #include +#include // The RTP payload max size, reserved some paddings for SRTP as such: // kRtpPacketSize = kRtpMaxPayloadSize + paddings @@ -290,12 +291,18 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* skt) } } - if (out_bio_len) { + if (out_bio_len) { if ((err = skt->sendto(out_bio_data, out_bio_len, 0)) != srs_success) { return srs_error_wrap(err, "send dtls packet"); } } + if (rtc_session->blackhole && rtc_session->blackhole_addr && rtc_session->blackhole_stfd) { + // Ignore any error for black-hole. + void* p = out_bio_data; int len = out_bio_len; SrsRtcSession* s = rtc_session; + srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + } + return err; } @@ -314,7 +321,13 @@ srs_error_t SrsDtlsSession::on_dtls(SrsUdpMuxSocket* skt) return srs_error_new(ERROR_OpenSslBIOWrite, "BIO_write"); } - if (! handshake_done) { + if (rtc_session->blackhole && rtc_session->blackhole_addr && rtc_session->blackhole_stfd) { + // Ignore any error for black-hole. + void* p = skt->data(); int len = skt->size(); SrsRtcSession* s = rtc_session; + srs_sendto(s->blackhole_stfd, p, len, (sockaddr*)s->blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + } + + if (!handshake_done) { err = handshake(skt); } else { while (BIO_ctrl_pending(bio_in) > 0) { @@ -360,7 +373,7 @@ srs_error_t SrsDtlsSession::srtp_initialize() unsigned char material[SRTP_MASTER_KEY_LEN * 2] = {0}; // client(SRTP_MASTER_KEY_KEY_LEN + SRTP_MASTER_KEY_SALT_LEN) + server static const string dtls_srtp_lable = "EXTRACTOR-dtls_srtp"; - if (! SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) { + if (!SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) { return srs_error_new(ERROR_RTC_SRTP_INIT, "SSL_export_keying_material failed"); } @@ -1962,7 +1975,7 @@ srs_error_t SrsRtcPublisher::collect_audio_frame() rtp_audio_queue->get_and_clean_collected_frames(frames); for (size_t i = 0; i < frames.size(); ++i) { - if (! frames[i].empty()) { + if (!frames[i].empty()) { srs_verbose("collect %d audio frames, seq range %u,%u", frames.size(), frames[i].front()->rtp_header.get_sequence(), frames[i].back()->rtp_header.get_sequence()); } @@ -1983,7 +1996,7 @@ srs_error_t SrsRtcPublisher::collect_video_frame() rtp_video_queue->get_and_clean_collected_frames(frames); for (size_t i = 0; i < frames.size(); ++i) { - if (! frames[i].empty()) { + if (!frames[i].empty()) { srs_verbose("collect %d video frames, seq range %u,%u", frames.size(), frames[i].front()->rtp_header.get_sequence(), frames[i].back()->rtp_header.get_sequence()); @@ -2011,13 +2024,13 @@ srs_error_t SrsRtcPublisher::collect_video_frame() if (rtp_h264_header->nalu_type != kFuA) { if ((p[0] & kNalTypeMask) == SrsAvcNaluTypeSPS) { string cur_sps = string((char*)p, rtp_h264_header->nalu_offset[j].second); - if (! cur_sps.empty() && sps != cur_sps) { + if (!cur_sps.empty() && sps != cur_sps) { video_header_change = true; sps = cur_sps; } } else if ((p[0] & kNalTypeMask) == SrsAvcNaluTypePPS) { string cur_pps = string((char*)p, rtp_h264_header->nalu_offset[j].second); - if (! cur_pps.empty() && pps != cur_pps) { + if (!cur_pps.empty() && pps != cur_pps) { video_header_change = true; pps = cur_pps; } @@ -2159,6 +2172,10 @@ SrsRtcSession::SrsRtcSession(SrsRtcServer* s, SrsRequest* r, const std::string& session_state = INIT; last_stun_time = 0; sessionStunTimeout = 0; + + blackhole = false; + blackhole_addr = NULL; + blackhole_stfd = NULL; } SrsRtcSession::~SrsRtcSession() @@ -2167,6 +2184,8 @@ SrsRtcSession::~SrsRtcSession() srs_freep(publisher); srs_freep(dtls_session); srs_freep(req); + srs_close_stfd(blackhole_stfd); + srs_freep(blackhole_addr); } void SrsRtcSession::set_local_sdp(const SrsSdp& sdp) @@ -2191,6 +2210,30 @@ srs_error_t SrsRtcSession::initialize() sessionStunTimeout = _srs_config->get_rtc_stun_timeout(req->vhost); last_stun_time = srs_get_system_time(); + blackhole = _srs_config->get_rtc_server_black_hole(); + + srs_trace("RTC init session, timeout=%dms, blackhole=%d", srsu2msi(sessionStunTimeout), blackhole); + + if (blackhole) { + string blackhole_ep = _srs_config->get_rtc_server_black_hole_publisher(); + if (!blackhole_ep.empty()) { + string host; int port; + srs_parse_hostport(blackhole_ep, host, port); + + srs_freep(blackhole_addr); + blackhole_addr = new sockaddr_in(); + blackhole_addr->sin_family = AF_INET; + blackhole_addr->sin_addr.s_addr = inet_addr(host.c_str()); + blackhole_addr->sin_port = htons(port); + + int fd = socket(AF_INET, SOCK_DGRAM, 0); + blackhole_stfd = srs_netfd_open_socket(fd); + srs_assert(blackhole_stfd); + + srs_trace("RTC blackhole %s:%d, fd=%d", host.c_str(), port, fd); + } + } + return err; } @@ -2274,6 +2317,19 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacke srs_trace("rtc session=%s, STUN done, waitting DTLS handshake.", id().c_str()); } + // Write STUN messages to blackhole. + if (blackhole && blackhole_addr && blackhole_stfd) { + // Ignore any error for black-hole. + void* p = skt->data(); int len = skt->size(); + srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + } + + if (blackhole && blackhole_addr && blackhole_stfd) { + // Ignore any error for black-hole. + void* p = stream->data(); int len = stream->pos(); + srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + } + return err; } @@ -2334,7 +2390,7 @@ srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSock uint16_t mask = 0x01; for (int i = 1; i < 16 && blp; ++i, mask <<= 1) { - if (! (blp & mask)) { + if (!(blp & mask)) { continue; } @@ -2343,7 +2399,7 @@ srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSock // TODO: FIXME: Support ARQ. (void)loss_seq; SrsRtpSharedPacket* pkt = NULL; // source->find_rtp_packet(loss_seq); - if (! pkt) { + if (!pkt) { continue; } @@ -2506,14 +2562,14 @@ srs_error_t SrsRtcSession::on_connection_established(SrsUdpMuxSocket* skt) srs_trace("rtc session=%s, to=%dms connection established", id().c_str(), srsu2msi(sessionStunTimeout)); - if (! local_sdp.media_descs_.empty() && + if (!local_sdp.media_descs_.empty() && (local_sdp.media_descs_.back().recvonly_ || local_sdp.media_descs_.back().sendrecv_)) { if ((err = start_publish(skt)) != srs_success) { return srs_error_wrap(err, "start publish"); } } - if (! local_sdp.media_descs_.empty() && + if (!local_sdp.media_descs_.empty() && (local_sdp.media_descs_.back().sendonly_ || local_sdp.media_descs_.back().sendrecv_)) { if ((err = start_play(skt)) != srs_success) { return srs_error_wrap(err, "start play"); @@ -2568,11 +2624,11 @@ srs_error_t SrsRtcSession::start_publish(SrsUdpMuxSocket* skt) for (size_t i = 0; i < remote_sdp.media_descs_.size(); ++i) { const SrsMediaDesc& media_desc = remote_sdp.media_descs_[i]; if (media_desc.is_audio()) { - if (! media_desc.ssrc_infos_.empty()) { + if (!media_desc.ssrc_infos_.empty()) { audio_ssrc = media_desc.ssrc_infos_[0].ssrc_; } } else if (media_desc.is_video()) { - if (! media_desc.ssrc_infos_.empty()) { + if (!media_desc.ssrc_infos_.empty()) { video_ssrc = media_desc.ssrc_infos_[0].ssrc_; } } @@ -2610,6 +2666,12 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* skt) return srs_error_wrap(err, "rtcp unprotect failed"); } + if (blackhole && blackhole_addr && blackhole_stfd) { + // Ignore any error for black-hole. + void* p = unprotected_buf; int len = nb_unprotected_buf; + srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + } + char* ph = unprotected_buf; int nb_left = nb_unprotected_buf; while (nb_left) { @@ -2683,12 +2745,18 @@ srs_error_t SrsRtcSession::on_rtp(SrsUdpMuxSocket* skt) return srs_error_new(ERROR_RTC_RTCP, "recv unexpect rtp packet before dtls done"); } - char* unprotected_buf = new char[1460]; + char* unprotected_buf = new char[kRtpPacketSize]; int nb_unprotected_buf = skt->size(); if ((err = dtls_session->unprotect_rtp(unprotected_buf, skt->data(), nb_unprotected_buf)) != srs_success) { return srs_error_wrap(err, "rtp unprotect failed"); } + if (blackhole && blackhole_addr && blackhole_stfd) { + // Ignore any error for black-hole. + void* p = unprotected_buf; int len = nb_unprotected_buf; + srs_sendto(blackhole_stfd, p, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT); + } + return publisher->on_rtp(skt, unprotected_buf, nb_unprotected_buf); } @@ -3110,7 +3178,7 @@ srs_error_t SrsRtcServer::create_rtc_session( local_ufrag = gen_random_str(8); username = local_ufrag + ":" + remote_sdp.get_ice_ufrag(); - if (! map_username_session.count(username)) + if (!map_username_session.count(username)) break; } diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 0797a0274..1793b49a4 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -305,6 +305,7 @@ public: class SrsRtcSession { + friend class SrsDtlsSession; friend class SrsRtcSenderThread; friend class SrsRtcPublisher; private: @@ -327,6 +328,10 @@ private: bool encrypt; // The timeout of session, keep alive by STUN ping pong. srs_utime_t sessionStunTimeout; +private: + bool blackhole; + sockaddr_in* blackhole_addr; + srs_netfd_t blackhole_stfd; public: SrsRequest* req; SrsSource* source;