// // Copyright (c) 2013-2024 The SRS Authors // // SPDX-License-Identifier: MIT // #ifndef SRS_APP_RTC_CONN_HPP #define SRS_APP_RTC_CONN_HPP #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include class SrsUdpMuxSocket; class SrsLiveConsumer; class SrsStunPacket; class SrsRtcServer; class SrsRtcConnection; class SrsSharedPtrMessage; class SrsRtcSource; class SrsRtpPacket; class ISrsCodec; class SrsRtpNackForReceiver; class SrsRtpIncommingVideoFrame; class SrsRtpRingBuffer; class SrsRtcConsumer; class SrsRtcAudioSendTrack; class SrsRtcVideoSendTrack; class SrsErrorPithyPrint; class SrsPithyPrint; class SrsStatistic; class SrsRtcUserConfig; class SrsRtcSendTrack; class SrsRtcPublishStream; class SrsEphemeralDelta; class SrsRtcNetworks; class SrsRtcUdpNetwork; class ISrsRtcNetwork; class SrsRtcTcpNetwork; const uint8_t kSR = 200; const uint8_t kRR = 201; const uint8_t kSDES = 202; const uint8_t kBye = 203; const uint8_t kApp = 204; // @see: https://tools.ietf.org/html/rfc4585#section-6.1 const uint8_t kRtpFb = 205; const uint8_t kPsFb = 206; const uint8_t kXR = 207; // The transport for RTC connection. class ISrsRtcTransport : public ISrsDtlsCallback { public: ISrsRtcTransport(); virtual ~ISrsRtcTransport(); public: virtual srs_error_t initialize(SrsSessionConfig* cfg) = 0; virtual srs_error_t start_active_handshake() = 0; virtual srs_error_t on_dtls(char* data, int nb_data) = 0; virtual srs_error_t on_dtls_alert(std::string type, std::string desc) = 0; public: // Encrypt the packet(paintext) to cipher, which is aso the packet ptr. // The nb_cipher should be initialized to the size of cipher, with some paddings. virtual srs_error_t protect_rtp(void* packet, int* nb_cipher) = 0; virtual srs_error_t protect_rtcp(void* packet, int* nb_cipher) = 0; // Decrypt the packet(cipher) to plaintext, which is also the packet ptr. // The nb_plaintext should be initialized to the size of cipher. virtual srs_error_t unprotect_rtp(void* packet, int* nb_plaintext) = 0; virtual srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext) = 0; }; // The security transport, use DTLS/SRTP to protect the data. class SrsSecurityTransport : public ISrsRtcTransport { private: ISrsRtcNetwork* network_; SrsDtls* dtls_; SrsSRTP* srtp_; bool handshake_done; public: SrsSecurityTransport(ISrsRtcNetwork* s); virtual ~SrsSecurityTransport(); srs_error_t initialize(SrsSessionConfig* cfg); // When play role of dtls client, it send handshake. srs_error_t start_active_handshake(); srs_error_t on_dtls(char* data, int nb_data); srs_error_t on_dtls_alert(std::string type, std::string desc); public: // Encrypt the packet(paintext) to cipher, which is aso the packet ptr. // The nb_cipher should be initialized to the size of cipher, with some paddings. srs_error_t protect_rtp(void* packet, int* nb_cipher); srs_error_t protect_rtcp(void* packet, int* nb_cipher); // Decrypt the packet(cipher) to plaintext, which is also the packet ptr. // The nb_plaintext should be initialized to the size of cipher. srs_error_t unprotect_rtp(void* packet, int* nb_plaintext); srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext); // implement ISrsDtlsCallback public: virtual srs_error_t on_dtls_handshake_done(); virtual srs_error_t on_dtls_application_data(const char* data, const int len); virtual srs_error_t write_dtls_data(void* data, int size); private: srs_error_t srtp_initialize(); }; // Semi security transport, setup DTLS and SRTP, with SRTP decrypt, without SRTP encrypt. class SrsSemiSecurityTransport : public SrsSecurityTransport { public: SrsSemiSecurityTransport(ISrsRtcNetwork* s); virtual ~SrsSemiSecurityTransport(); public: srs_error_t protect_rtp(void* packet, int* nb_cipher); srs_error_t protect_rtcp(void* packet, int* nb_cipher); srs_error_t unprotect_rtp(void* packet, int* nb_plaintext); srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext); }; // Plaintext transport, without DTLS or SRTP. class SrsPlaintextTransport : public ISrsRtcTransport { private: ISrsRtcNetwork* network_; public: SrsPlaintextTransport(ISrsRtcNetwork* s); virtual ~SrsPlaintextTransport(); public: virtual srs_error_t initialize(SrsSessionConfig* cfg); virtual srs_error_t start_active_handshake(); virtual srs_error_t on_dtls(char* data, int nb_data); virtual srs_error_t on_dtls_alert(std::string type, std::string desc); virtual srs_error_t on_dtls_handshake_done(); virtual srs_error_t on_dtls_application_data(const char* data, const int len); virtual srs_error_t write_dtls_data(void* data, int size); public: srs_error_t protect_rtp(void* packet, int* nb_cipher); srs_error_t protect_rtcp(void* packet, int* nb_cipher); srs_error_t unprotect_rtp(void* packet, int* nb_plaintext); srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext); }; // The handler for PLI worker coroutine. class ISrsRtcPLIWorkerHandler { public: ISrsRtcPLIWorkerHandler(); virtual ~ISrsRtcPLIWorkerHandler(); public: virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid) = 0; }; // A worker coroutine to request the PLI. class SrsRtcPLIWorker : public ISrsCoroutineHandler { private: SrsCoroutine* trd_; srs_cond_t wait_; ISrsRtcPLIWorkerHandler* handler_; private: // Key is SSRC, value is the CID of subscriber which requests PLI. std::map plis_; public: SrsRtcPLIWorker(ISrsRtcPLIWorkerHandler* h); virtual ~SrsRtcPLIWorker(); public: virtual srs_error_t start(); virtual void request_keyframe(uint32_t ssrc, SrsContextId cid); // interface ISrsCoroutineHandler public: virtual srs_error_t cycle(); }; // the rtc on_stop async call. class SrsRtcAsyncCallOnStop : public ISrsAsyncCallTask { private: SrsContextId cid; SrsRequest* req; public: SrsRtcAsyncCallOnStop(SrsContextId c, SrsRequest* r); virtual ~SrsRtcAsyncCallOnStop(); public: virtual srs_error_t call(); virtual std::string to_string(); }; // A RTC play stream, client pull and play stream from SRS. class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler , public ISrsRtcPLIWorkerHandler, public ISrsRtcSourceChangeCallback { private: SrsContextId cid_; SrsFastCoroutine* trd_; SrsRtcConnection* session_; SrsRtcPLIWorker* pli_worker_; private: SrsRequest* req_; SrsSharedPtr source_; // key: publish_ssrc, value: send track to process rtp/rtcp std::map audio_tracks_; std::map video_tracks_; // The pithy print for special stage. SrsErrorPithyPrint* nack_epp; private: // Fast cache for tracks. uint32_t cache_ssrc0_; uint32_t cache_ssrc1_; uint32_t cache_ssrc2_; SrsRtcSendTrack* cache_track0_; SrsRtcSendTrack* cache_track1_; SrsRtcSendTrack* cache_track2_; private: // For merged-write messages. int mw_msgs; bool realtime; // Whether enabled nack. bool nack_enabled_; bool nack_no_copy_; private: // Whether player started. bool is_started; public: SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid); virtual ~SrsRtcPlayStream(); public: srs_error_t initialize(SrsRequest* request, std::map sub_relations); // Interface ISrsRtcSourceChangeCallback public: void on_stream_change(SrsRtcSourceDescription* desc); // interface ISrsReloadHandler public: virtual srs_error_t on_reload_vhost_play(std::string vhost); virtual srs_error_t on_reload_vhost_realtime(std::string vhost); virtual const SrsContextId& context_id(); public: virtual srs_error_t start(); virtual void stop(); public: virtual srs_error_t cycle(); private: srs_error_t send_packet(SrsRtpPacket*& pkt); public: // Directly set the status of track, generally for init to set the default value. void set_all_tracks_status(bool status); public: srs_error_t on_rtcp(SrsRtcpCommon* rtcp); private: srs_error_t on_rtcp_xr(SrsRtcpXr* rtcp); srs_error_t on_rtcp_nack(SrsRtcpNack* rtcp); srs_error_t on_rtcp_ps_feedback(SrsRtcpFbCommon* rtcp); srs_error_t on_rtcp_rr(SrsRtcpRR* rtcp); uint32_t get_video_publish_ssrc(uint32_t play_ssrc); // Interface ISrsRtcPLIWorkerHandler public: virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid); }; // A fast timer for publish stream, for RTCP feedback. class SrsRtcPublishRtcpTimer : public ISrsFastTimer { private: SrsRtcPublishStream* p_; public: SrsRtcPublishRtcpTimer(SrsRtcPublishStream* p); virtual ~SrsRtcPublishRtcpTimer(); // interface ISrsFastTimer private: srs_error_t on_timer(srs_utime_t interval); }; // A fast timer for publish stream, for TWCC feedback. class SrsRtcPublishTwccTimer : public ISrsFastTimer { private: SrsRtcPublishStream* p_; public: SrsRtcPublishTwccTimer(SrsRtcPublishStream* p); virtual ~SrsRtcPublishTwccTimer(); // interface ISrsFastTimer private: srs_error_t on_timer(srs_utime_t interval); }; // the rtc on_unpublish async call. class SrsRtcAsyncCallOnUnpublish : public ISrsAsyncCallTask { private: SrsContextId cid; SrsRequest* req; public: SrsRtcAsyncCallOnUnpublish(SrsContextId c, SrsRequest* r); virtual ~SrsRtcAsyncCallOnUnpublish(); public: virtual srs_error_t call(); virtual std::string to_string(); }; // A RTC publish stream, client push and publish stream to SRS. class SrsRtcPublishStream : public ISrsRtspPacketDecodeHandler , public ISrsRtcPublishStream, public ISrsRtcPLIWorkerHandler { private: friend class SrsRtcPublishRtcpTimer; friend class SrsRtcPublishTwccTimer; SrsRtcPublishRtcpTimer* timer_rtcp_; SrsRtcPublishTwccTimer* timer_twcc_; private: SrsContextId cid_; uint64_t nn_audio_frames; SrsRtcPLIWorker* pli_worker_; SrsErrorPithyPrint* twcc_epp_; private: SrsRtcConnection* session_; uint16_t pt_to_drop_; // Whether enabled nack. bool nack_enabled_; bool nack_no_copy_; bool twcc_enabled_; private: bool request_keyframe_; SrsErrorPithyPrint* pli_epp; private: SrsRequest* req_; SrsSharedPtr source_; // Simulators. int nn_simulate_nack_drop; private: // track vector std::vector audio_tracks_; std::vector video_tracks_; private: int twcc_id_; uint8_t twcc_fb_count_; SrsRtcpTWCC rtcp_twcc_; SrsRtpExtensionTypes extension_types_; bool is_started; srs_utime_t last_time_send_twcc_; public: SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid); virtual ~SrsRtcPublishStream(); public: srs_error_t initialize(SrsRequest* req, SrsRtcSourceDescription* stream_desc); srs_error_t start(); // Directly set the status of track, generally for init to set the default value. void set_all_tracks_status(bool status); virtual const SrsContextId& context_id(); private: srs_error_t send_rtcp_rr(); srs_error_t send_rtcp_xr_rrtr(); public: srs_error_t on_rtp_cipher(char* buf, int nb_buf); srs_error_t on_rtp_plaintext(char* buf, int nb_buf); private: srs_error_t do_on_rtp_plaintext(SrsRtpPacket*& pkt, SrsBuffer* buf); public: srs_error_t check_send_nacks(); public: virtual void on_before_decode_payload(SrsRtpPacket* pkt, SrsBuffer* buf, ISrsRtpPayloader** ppayload, SrsRtspPacketPayloadType* ppt); private: srs_error_t send_periodic_twcc(); public: srs_error_t on_rtcp(SrsRtcpCommon* rtcp); private: srs_error_t on_rtcp_sr(SrsRtcpSR* rtcp); srs_error_t on_rtcp_xr(SrsRtcpXr* rtcp); public: void request_keyframe(uint32_t ssrc, SrsContextId cid); virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid); public: void simulate_nack_drop(int nn); private: void simulate_drop_packet(SrsRtpHeader* h, int nn_bytes); private: srs_error_t on_twcc(uint16_t sn); SrsRtcAudioRecvTrack* get_audio_track(uint32_t ssrc); SrsRtcVideoRecvTrack* get_video_track(uint32_t ssrc); void update_rtt(uint32_t ssrc, int rtt); void update_send_report_time(uint32_t ssrc, const SrsNtp& ntp, uint32_t rtp_time); }; // A fast timer for conntion, for NACK feedback. class SrsRtcConnectionNackTimer : public ISrsFastTimer { private: SrsRtcConnection* p_; public: SrsRtcConnectionNackTimer(SrsRtcConnection* p); virtual ~SrsRtcConnectionNackTimer(); // interface ISrsFastTimer private: srs_error_t on_timer(srs_utime_t interval); }; // A RTC Peer Connection, SDP level object. // // For performance, we use non-public from resource, // see https://stackoverflow.com/questions/3747066/c-cannot-convert-from-base-a-to-derived-type-b-via-virtual-base-a class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler, public ISrsExpire { friend class SrsSecurityTransport; friend class SrsRtcPlayStream; friend class SrsRtcPublishStream; private: friend class SrsRtcConnectionNackTimer; SrsRtcConnectionNackTimer* timer_nack_; public: bool disposing_; private: SrsRtcServer* server_; private: iovec* cache_iov_; SrsBuffer* cache_buffer_; private: // key: stream id std::map players_; //key: player track's ssrc std::map players_ssrc_map_; // key: stream id std::map publishers_; // key: publisher track's ssrc std::map publishers_ssrc_map_; private: // The local:remote username, such as m5x0n128:jvOm where local name is m5x0n128. std::string username_; // The random token to verify the WHIP DELETE request etc. std::string token_; // A group of networks, each has its own DTLS and SRTP context. SrsRtcNetworks* networks_; private: // TODO: FIXME: Rename it. // The timeout of session, keep alive by STUN ping pong. srs_utime_t session_timeout; // TODO: FIXME: Rename it. srs_utime_t last_stun_time; private: // For each RTC session, we use a specified cid for debugging logs. SrsContextId cid_; SrsRequest* req_; SrsSdp remote_sdp; SrsSdp local_sdp; private: // twcc handler int twcc_id_; // Simulators. int nn_simulate_player_nack_drop; // Pithy print for PLI request. SrsErrorPithyPrint* pli_epp; private: bool nack_enabled_; public: SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid); virtual ~SrsRtcConnection(); // interface ISrsDisposingHandler public: virtual void on_before_dispose(ISrsResource* c); virtual void on_disposing(ISrsResource* c); public: // TODO: FIXME: save only connection info. SrsSdp* get_local_sdp(); void set_local_sdp(const SrsSdp& sdp); SrsSdp* get_remote_sdp(); void set_remote_sdp(const SrsSdp& sdp); // Change network to waiting stun state. void set_state_as_waiting_stun(); // Get username pair for this connection, used as ID of session. std::string username(); // Get the token for verify this session, for example, when delete session by WHIP API. std::string token(); public: virtual ISrsKbpsDelta* delta(); // Interface ISrsResource. public: virtual const SrsContextId& get_id(); virtual std::string desc(); // Interface ISrsExpire. public: virtual void expire(); public: void switch_to_context(); const SrsContextId& context_id(); public: srs_error_t add_publisher(SrsRtcUserConfig* ruc, SrsSdp& local_sdp); srs_error_t add_player(SrsRtcUserConfig* ruc, SrsSdp& local_sdp); public: // Before initialize, user must set the local SDP, which is used to inititlize DTLS. srs_error_t initialize(SrsRequest* r, bool dtls, bool srtp, std::string username); srs_error_t on_rtp_cipher(char* data, int nb_data); srs_error_t on_rtp_plaintext(char* data, int nb_data); private: // Decode the RTP header from buf, find the publisher by SSRC. srs_error_t find_publisher(char* buf, int size, SrsRtcPublishStream** ppublisher); public: srs_error_t on_rtcp(char* data, int nb_data); private: srs_error_t dispatch_rtcp(SrsRtcpCommon* rtcp); public: srs_error_t on_rtcp_feedback_twcc(char* buf, int nb_buf); srs_error_t on_rtcp_feedback_remb(SrsRtcpFbCommon *rtcp); public: srs_error_t on_dtls_handshake_done(); srs_error_t on_dtls_alert(std::string type, std::string desc); bool is_alive(); void alive(); public: SrsRtcUdpNetwork* udp(); SrsRtcTcpNetwork* tcp(); public: // send rtcp srs_error_t send_rtcp(char *data, int nb_data); void check_send_nacks(SrsRtpNackForReceiver* nack, uint32_t ssrc, uint32_t& sent_nacks, uint32_t& timeout_nacks); srs_error_t send_rtcp_rr(uint32_t ssrc, SrsRtpRingBuffer* rtp_queue, const uint64_t& last_send_systime, const SrsNtp& last_send_ntp); srs_error_t send_rtcp_xr_rrtr(uint32_t ssrc); srs_error_t send_rtcp_fb_pli(uint32_t ssrc, const SrsContextId& cid_of_subscriber); public: // Simulate the NACK to drop nn packets. void simulate_nack_drop(int nn); void simulate_player_drop_packet(SrsRtpHeader* h, int nn_bytes); srs_error_t do_send_packet(SrsRtpPacket* pkt); // Directly set the status of play track, generally for init to set the default value. void set_all_tracks_status(std::string stream_uri, bool is_publish, bool status); public: // Notify by specified network. srs_error_t on_binding_request(SrsStunPacket* r, std::string& ice_pwd); private: // publish media capabilitiy negotiate srs_error_t negotiate_publish_capability(SrsRtcUserConfig* ruc, SrsRtcSourceDescription* stream_desc); srs_error_t generate_publish_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcSourceDescription* stream_desc, bool unified_plan, bool audio_before_video); srs_error_t generate_publish_local_sdp_for_audio(SrsSdp& local_sdp, SrsRtcSourceDescription* stream_desc); srs_error_t generate_publish_local_sdp_for_video(SrsSdp& local_sdp, SrsRtcSourceDescription* stream_desc, bool unified_plan); // play media capabilitiy negotiate //TODO: Use StreamDescription to negotiate and remove first negotiate_play_capability function srs_error_t negotiate_play_capability(SrsRtcUserConfig* ruc, std::map& sub_relations); srs_error_t generate_play_local_sdp(SrsRequest* req, SrsSdp& local_sdp, SrsRtcSourceDescription* stream_desc, bool unified_plan, bool audio_before_video); srs_error_t generate_play_local_sdp_for_audio(SrsSdp& local_sdp, SrsRtcSourceDescription* stream_desc, std::string cname, std::string stream_id); srs_error_t generate_play_local_sdp_for_video(SrsSdp& local_sdp, SrsRtcSourceDescription* stream_desc, bool unified_plan, std::string cname, std::string stream_id); srs_error_t create_player(SrsRequest* request, std::map sub_relations); srs_error_t create_publisher(SrsRequest* request, SrsRtcSourceDescription* stream_desc); }; #endif