mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
Refine the logs.
This commit is contained in:
parent
5b6c9df785
commit
bf4b973093
5 changed files with 155 additions and 113 deletions
|
@ -77,7 +77,7 @@ func (v *httpAPI) Run(ctx context.Context) error {
|
|||
// The WebRTC WHIP API handler.
|
||||
logger.Df(ctx, "Handle /rtc/v1/whip/ by %v", addr)
|
||||
mux.HandleFunc("/rtc/v1/whip/", func(w http.ResponseWriter, r *http.Request) {
|
||||
if err := v.rtc.HandleWHIP(ctx, w, r); err != nil {
|
||||
if err := v.rtc.HandleApiForWHIP(ctx, w, r); err != nil {
|
||||
apiError(ctx, w, r, err)
|
||||
}
|
||||
})
|
||||
|
@ -85,7 +85,7 @@ func (v *httpAPI) Run(ctx context.Context) error {
|
|||
// The WebRTC WHEP API handler.
|
||||
logger.Df(ctx, "Handle /rtc/v1/whep/ by %v", addr)
|
||||
mux.HandleFunc("/rtc/v1/whep/", func(w http.ResponseWriter, r *http.Request) {
|
||||
if err := v.rtc.HandleWHEP(ctx, w, r); err != nil {
|
||||
if err := v.rtc.HandleApiForWHEP(ctx, w, r); err != nil {
|
||||
apiError(ctx, w, r, err)
|
||||
}
|
||||
})
|
||||
|
|
|
@ -103,10 +103,10 @@ func (v *httpServer) Run(ctx context.Context) error {
|
|||
return
|
||||
}
|
||||
|
||||
stream, _ := srsLoadBalancer.LoadOrStoreHLS(ctx, streamURL, NewHLSStreaming(func(v *HLSStreaming) {
|
||||
v.SRSProxyBackendHLSID = logger.GenerateContextID()
|
||||
v.StreamURL, v.FullURL = streamURL, fullURL
|
||||
v.BuildContext(ctx)
|
||||
stream, _ := srsLoadBalancer.LoadOrStoreHLS(ctx, streamURL, NewHLSPlayStream(func(s *HLSPlayStream) {
|
||||
s.SRSProxyBackendHLSID = logger.GenerateContextID()
|
||||
s.StreamURL, s.FullURL = streamURL, fullURL
|
||||
s.Initialize(ctx)
|
||||
}))
|
||||
|
||||
stream.ServeHTTP(w, r)
|
||||
|
@ -121,14 +121,14 @@ func (v *httpServer) Run(ctx context.Context) error {
|
|||
if stream, err := srsLoadBalancer.LoadHLSBySPBHID(ctx, srsProxyBackendID); err != nil {
|
||||
http.Error(w, fmt.Sprintf("load stream by spbhid %v", srsProxyBackendID), http.StatusBadRequest)
|
||||
} else {
|
||||
stream.ServeHTTP(w, r)
|
||||
stream.Initialize(ctx).ServeHTTP(w, r)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Use HTTP pseudo streaming to proxy the request.
|
||||
NewHTTPStreaming(func(streaming *HTTPStreaming) {
|
||||
streaming.ctx = ctx
|
||||
NewHTTPFlvTsConnection(func(c *HTTPFlvTsConnection) {
|
||||
c.ctx = ctx
|
||||
}).ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
@ -155,20 +155,26 @@ func (v *httpServer) Run(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type HTTPStreaming struct {
|
||||
// HTTPFlvTsConnection is an HTTP pseudo streaming connection, such as an HTTP-FLV or HTTP-TS
|
||||
// connection. There is no state need to be sync between proxy servers.
|
||||
//
|
||||
// When we got an HTTP FLV or TS request, we will parse the stream URL from the HTTP request,
|
||||
// then proxy to the corresponding backend server. All state is in the HTTP request, so this
|
||||
// connection is stateless.
|
||||
type HTTPFlvTsConnection struct {
|
||||
// The context for HTTP streaming.
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func NewHTTPStreaming(opts ...func(streaming *HTTPStreaming)) *HTTPStreaming {
|
||||
v := &HTTPStreaming{}
|
||||
func NewHTTPFlvTsConnection(opts ...func(*HTTPFlvTsConnection)) *HTTPFlvTsConnection {
|
||||
v := &HTTPFlvTsConnection{}
|
||||
for _, opt := range opts {
|
||||
opt(v)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func (v *HTTPStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
func (v *HTTPFlvTsConnection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
defer r.Body.Close()
|
||||
ctx := logger.WithContext(v.ctx)
|
||||
|
||||
|
@ -179,7 +185,7 @@ func (v *HTTPStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func (v *HTTPStreaming) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
func (v *HTTPFlvTsConnection) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
// Always allow CORS for all requests.
|
||||
if ok := apiCORS(ctx, w, r); ok {
|
||||
return nil
|
||||
|
@ -207,7 +213,7 @@ func (v *HTTPStreaming) serve(ctx context.Context, w http.ResponseWriter, r *htt
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer) error {
|
||||
func (v *HTTPFlvTsConnection) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer) error {
|
||||
// Parse HTTP port from backend.
|
||||
if len(backend.HTTP) == 0 {
|
||||
return errors.Errorf("no http stream server")
|
||||
|
@ -255,7 +261,14 @@ func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWrite
|
|||
return nil
|
||||
}
|
||||
|
||||
type HLSStreaming struct {
|
||||
// HLSPlayStream is an HLS stream proxy, which represents the stream level object. This means multiple HLS
|
||||
// clients will share this object, and they use the same ctx among proxy servers.
|
||||
//
|
||||
// Unlike the HTTP FLV or TS connection, HLS client may request the m3u8 or ts via different HTTP connections.
|
||||
// Especially for requesting ts, we need to identify the stream URl or backend server for it. So we create
|
||||
// the spbhid which can be seen as the hash of stream URL or backend server. The spbhid enable us to convert
|
||||
// to the stream URL and then query the backend server to serve it.
|
||||
type HLSPlayStream struct {
|
||||
// The context for HLS streaming.
|
||||
ctx context.Context
|
||||
// The context ID for recovering the context.
|
||||
|
@ -269,22 +282,28 @@ type HLSStreaming struct {
|
|||
FullURL string `json:"full_url"`
|
||||
}
|
||||
|
||||
func NewHLSStreaming(opts ...func(streaming *HLSStreaming)) *HLSStreaming {
|
||||
v := &HLSStreaming{}
|
||||
func NewHLSPlayStream(opts ...func(*HLSPlayStream)) *HLSPlayStream {
|
||||
v := &HLSPlayStream{}
|
||||
for _, opt := range opts {
|
||||
opt(v)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func (v *HLSStreaming) BuildContext(ctx context.Context) {
|
||||
func (v *HLSPlayStream) Initialize(ctx context.Context) *HLSPlayStream {
|
||||
if v.ctx != nil && v.ContextID != "" {
|
||||
return v
|
||||
}
|
||||
|
||||
if v.ContextID == "" {
|
||||
v.ContextID = logger.GenerateContextID()
|
||||
}
|
||||
v.ctx = logger.WithContextID(ctx, v.ContextID)
|
||||
|
||||
return v
|
||||
}
|
||||
|
||||
func (v *HLSStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
func (v *HLSPlayStream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
defer r.Body.Close()
|
||||
|
||||
if err := v.serve(v.ctx, w, r); err != nil {
|
||||
|
@ -295,7 +314,7 @@ func (v *HLSStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func (v *HLSStreaming) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
func (v *HLSPlayStream) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
ctx, streamURL, fullURL := v.ctx, v.StreamURL, v.FullURL
|
||||
|
||||
// Always allow CORS for all requests.
|
||||
|
@ -316,7 +335,7 @@ func (v *HLSStreaming) serve(ctx context.Context, w http.ResponseWriter, r *http
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *HLSStreaming) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer) error {
|
||||
func (v *HLSPlayStream) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer) error {
|
||||
// Parse HTTP port from backend.
|
||||
if len(backend.HTTP) == 0 {
|
||||
return errors.Errorf("no rtmp server")
|
||||
|
|
141
proxy/rtc.go
141
proxy/rtc.go
|
@ -52,7 +52,7 @@ func (v *rtcServer) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *rtcServer) HandleWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
func (v *rtcServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
defer r.Body.Close()
|
||||
ctx = logger.WithContext(ctx)
|
||||
|
||||
|
@ -82,14 +82,14 @@ func (v *rtcServer) HandleWHIP(ctx context.Context, w http.ResponseWriter, r *ht
|
|||
return errors.Wrapf(err, "pick backend for %v", streamURL)
|
||||
}
|
||||
|
||||
if err = v.serveByBackend(ctx, w, r, backend, string(remoteSDPOffer), streamURL); err != nil {
|
||||
if err = v.proxyApiToBackend(ctx, w, r, backend, string(remoteSDPOffer), streamURL); err != nil {
|
||||
return errors.Wrapf(err, "serve %v with %v by backend %+v", fullURL, streamURL, backend)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *rtcServer) HandleWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
func (v *rtcServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
defer r.Body.Close()
|
||||
ctx = logger.WithContext(ctx)
|
||||
|
||||
|
@ -119,14 +119,17 @@ func (v *rtcServer) HandleWHEP(ctx context.Context, w http.ResponseWriter, r *ht
|
|||
return errors.Wrapf(err, "pick backend for %v", streamURL)
|
||||
}
|
||||
|
||||
if err = v.serveByBackend(ctx, w, r, backend, string(remoteSDPOffer), streamURL); err != nil {
|
||||
if err = v.proxyApiToBackend(ctx, w, r, backend, string(remoteSDPOffer), streamURL); err != nil {
|
||||
return errors.Wrapf(err, "serve %v with %v by backend %+v", fullURL, streamURL, backend)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *rtcServer) serveByBackend(ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer, remoteSDPOffer string, streamURL string) error {
|
||||
func (v *rtcServer) proxyApiToBackend(
|
||||
ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer,
|
||||
remoteSDPOffer string, streamURL string,
|
||||
) error {
|
||||
// Parse HTTP port from backend.
|
||||
if len(backend.API) == 0 {
|
||||
return errors.Errorf("no http api server")
|
||||
|
@ -198,9 +201,12 @@ func (v *rtcServer) serveByBackend(ctx context.Context, w http.ResponseWriter, r
|
|||
RemoteICEUfrag: remoteICEUfrag, RemoteICEPwd: remoteICEPwd,
|
||||
LocalICEUfrag: localICEUfrag, LocalICEPwd: localICEPwd,
|
||||
}
|
||||
if _, err := srsLoadBalancer.LoadOrStoreWebRTC(ctx, streamURL, icePair.Ufrag(), NewRTCStreaming(func(s *RTCConnection) {
|
||||
s.StreamURL, s.listenerUDP = streamURL, v.listener
|
||||
s.BuildContext(ctx)
|
||||
if err := srsLoadBalancer.StoreWebRTC(ctx, streamURL, NewRTCConnection(func(c *RTCConnection) {
|
||||
c.StreamURL, c.Ufrag = streamURL, icePair.Ufrag()
|
||||
c.Initialize(ctx, v.listener)
|
||||
|
||||
// Cache the connection for fast search by username.
|
||||
v.usernames.Store(c.Ufrag, c)
|
||||
})); err != nil {
|
||||
return errors.Wrapf(err, "load or store webrtc %v", streamURL)
|
||||
}
|
||||
|
@ -210,7 +216,7 @@ func (v *rtcServer) serveByBackend(ctx context.Context, w http.ResponseWriter, r
|
|||
return errors.Wrapf(err, "write local sdp answer %v", localSDPAnswer)
|
||||
}
|
||||
|
||||
logger.Df(ctx, "Response local answer %vB with ice-ufrag=%v, ice-pwd=%vB",
|
||||
logger.Df(ctx, "Create WebRTC connection with local answer %vB with ice-ufrag=%v, ice-pwd=%vB",
|
||||
len(localSDPAnswer), localICEUfrag, len(localICEPwd))
|
||||
return nil
|
||||
}
|
||||
|
@ -244,12 +250,12 @@ func (v *rtcServer) Run(ctx context.Context) error {
|
|||
n, addr, err := listener.ReadFromUDP(buf)
|
||||
if err != nil {
|
||||
// TODO: If WebRTC server closed unexpectedly, we should notice the main loop to quit.
|
||||
logger.Wf(ctx, "read from udp failed, err=%v", err)
|
||||
logger.Wf(ctx, "read from udp failed, err=%+v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := v.handleClientUDP(ctx, addr, buf[:n]); err != nil {
|
||||
logger.Wf(ctx, "handle udp %vB failed, addr=%v, err=%v", n, addr, err)
|
||||
logger.Wf(ctx, "handle udp %vB failed, addr=%v, err=%+v", n, addr, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -258,7 +264,7 @@ func (v *rtcServer) Run(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (v *rtcServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error {
|
||||
var stream *RTCConnection
|
||||
var connection *RTCConnection
|
||||
|
||||
// If STUN binding request, parse the ufrag and identify the connection.
|
||||
if err := func() error {
|
||||
|
@ -271,58 +277,69 @@ func (v *rtcServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data
|
|||
return errors.Wrapf(err, "unmarshal stun packet")
|
||||
}
|
||||
|
||||
// Search the stream in fast cache.
|
||||
// Search the connection in fast cache.
|
||||
if s, ok := v.usernames.Load(pkt.Username); ok {
|
||||
stream = s
|
||||
connection = s
|
||||
return nil
|
||||
}
|
||||
|
||||
// Load stream by username.
|
||||
// Load connection by username.
|
||||
if s, err := srsLoadBalancer.LoadWebRTCByUfrag(ctx, pkt.Username); err != nil {
|
||||
return errors.Wrapf(err, "load webrtc by ufrag %v", pkt.Username)
|
||||
} else {
|
||||
stream = s
|
||||
connection = s.Initialize(ctx, v.listener)
|
||||
logger.Df(ctx, "Create WebRTC connection by ufrag=%v, stream=%v", pkt.Username, connection.StreamURL)
|
||||
}
|
||||
|
||||
// Cache stream for fast search.
|
||||
if stream != nil {
|
||||
v.usernames.Store(pkt.Username, stream)
|
||||
// Cache connection for fast search.
|
||||
if connection != nil {
|
||||
v.usernames.Store(pkt.Username, connection)
|
||||
}
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Search the stream by addr.
|
||||
// Search the connection by addr.
|
||||
if s, ok := v.addresses.Load(addr.String()); ok {
|
||||
stream = s
|
||||
} else if stream != nil {
|
||||
connection = s
|
||||
} else if connection != nil {
|
||||
// Cache the address for fast search.
|
||||
v.addresses.Store(addr.String(), stream)
|
||||
v.addresses.Store(addr.String(), connection)
|
||||
}
|
||||
|
||||
// If stream is not found, ignore the packet.
|
||||
if stream == nil {
|
||||
// If connection is not found, ignore the packet.
|
||||
if connection == nil {
|
||||
// TODO: Should logging the dropped packet, only logging the first one for each address.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Proxy the packet to backend.
|
||||
if err := stream.Proxy(addr, data); err != nil {
|
||||
return errors.Wrapf(err, "proxy %vB for %v", len(data), stream.StreamURL)
|
||||
if err := connection.HandlePacket(addr, data); err != nil {
|
||||
return errors.Wrapf(err, "proxy %vB for %v", len(data), connection.StreamURL)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RTCConnection is a WebRTC connection proxy, for both WHIP and WHEP. It represents a WebRTC
|
||||
// connection, identify by the ufrag in sdp offer/answer and ICE binding request.
|
||||
//
|
||||
// It's not like RTMP or HTTP FLV/TS proxy connection, which are stateless and all state is
|
||||
// in the client request. The RTCConnection is stateful, and need to sync the ufrag between
|
||||
// proxy servers.
|
||||
//
|
||||
// The media transport is UDP, which is also a special thing for WebRTC. So if the client switch
|
||||
// to another UDP address, it may connect to another WebRTC proxy, then we should discover the
|
||||
// RTCConnection by the ufrag from the ICE binding request.
|
||||
type RTCConnection struct {
|
||||
// The stream context for WebRTC streaming.
|
||||
ctx context.Context
|
||||
// The context ID for recovering the context.
|
||||
ContextID string `json:"cid"`
|
||||
|
||||
// The stream URL in vhost/app/stream schema.
|
||||
StreamURL string `json:"stream_url"`
|
||||
// The ufrag for this WebRTC connection.
|
||||
Ufrag string `json:"ufrag"`
|
||||
|
||||
// The UDP connection proxy to backend.
|
||||
backendUDP *net.UDPConn
|
||||
|
@ -332,7 +349,7 @@ type RTCConnection struct {
|
|||
listenerUDP *net.UDPConn
|
||||
}
|
||||
|
||||
func NewRTCStreaming(opts ...func(*RTCConnection)) *RTCConnection {
|
||||
func NewRTCConnection(opts ...func(*RTCConnection)) *RTCConnection {
|
||||
v := &RTCConnection{}
|
||||
for _, opt := range opts {
|
||||
opt(v)
|
||||
|
@ -340,7 +357,15 @@ func NewRTCStreaming(opts ...func(*RTCConnection)) *RTCConnection {
|
|||
return v
|
||||
}
|
||||
|
||||
func (v *RTCConnection) Proxy(addr *net.UDPAddr, data []byte) error {
|
||||
func (v *RTCConnection) Initialize(ctx context.Context, listener *net.UDPConn) *RTCConnection {
|
||||
v.ctx = logger.WithContext(ctx)
|
||||
if listener != nil {
|
||||
v.listenerUDP = listener
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func (v *RTCConnection) HandlePacket(addr *net.UDPAddr, data []byte) error {
|
||||
ctx := v.ctx
|
||||
|
||||
// Update the current UDP address.
|
||||
|
@ -352,10 +377,31 @@ func (v *RTCConnection) Proxy(addr *net.UDPAddr, data []byte) error {
|
|||
}
|
||||
|
||||
// Proxy client message to backend.
|
||||
if v.backendUDP != nil {
|
||||
if _, err := v.backendUDP.Write(data); err != nil {
|
||||
return errors.Wrapf(err, "write to backend %v", v.StreamURL)
|
||||
if v.backendUDP == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Proxy all messages from backend to client.
|
||||
go func() {
|
||||
for ctx.Err() == nil {
|
||||
buf := make([]byte, 4096)
|
||||
n, _, err := v.backendUDP.ReadFromUDP(buf)
|
||||
if err != nil {
|
||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||
logger.Wf(ctx, "read from backend failed, err=%v", err)
|
||||
break
|
||||
}
|
||||
|
||||
if _, err = v.listenerUDP.WriteToUDP(buf[:n], v.clientUDP); err != nil {
|
||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||
logger.Wf(ctx, "write to client failed, err=%v", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if _, err := v.backendUDP.Write(data); err != nil {
|
||||
return errors.Wrapf(err, "write to backend %v", v.StreamURL)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -385,6 +431,7 @@ func (v *RTCConnection) connectBackend(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// Connect to backend SRS server via UDP client.
|
||||
// TODO: Support close the connection when timeout or DTLS alert.
|
||||
backendAddr := net.UDPAddr{IP: net.ParseIP(backend.IP), Port: udpPort}
|
||||
if backendUDP, err := net.DialUDP("udp", nil, &backendAddr); err != nil {
|
||||
return errors.Wrapf(err, "dial udp to %v", backendAddr)
|
||||
|
@ -392,35 +439,9 @@ func (v *RTCConnection) connectBackend(ctx context.Context) error {
|
|||
v.backendUDP = backendUDP
|
||||
}
|
||||
|
||||
// Proxy all messages from backend to client.
|
||||
go func() {
|
||||
for ctx.Err() == nil {
|
||||
buf := make([]byte, 4096)
|
||||
n, _, err := v.backendUDP.ReadFromUDP(buf)
|
||||
if err != nil {
|
||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||
logger.Wf(ctx, "read from backend failed, err=%v", err)
|
||||
break
|
||||
}
|
||||
|
||||
if _, err = v.listenerUDP.WriteToUDP(buf[:n], v.clientUDP); err != nil {
|
||||
// TODO: If backend server closed unexpectedly, we should notice the stream to quit.
|
||||
logger.Wf(ctx, "write to client failed, err=%v", err)
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *RTCConnection) BuildContext(ctx context.Context) {
|
||||
if v.ContextID == "" {
|
||||
v.ContextID = logger.GenerateContextID()
|
||||
}
|
||||
v.ctx = logger.WithContextID(ctx, v.ContextID)
|
||||
}
|
||||
|
||||
type RTCICEPair struct {
|
||||
// The remote ufrag, used for ICE username and session id.
|
||||
RemoteICEUfrag string `json:"remote_ufrag"`
|
||||
|
|
|
@ -108,6 +108,12 @@ func (v *rtmpServer) Run(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// RTMPConnection is an RTMP streaming connection. There is no state need to be sync between
|
||||
// proxy servers.
|
||||
//
|
||||
// When we got an RTMP request, we will parse the stream URL from the RTMP publish or play request,
|
||||
// then proxy to the corresponding backend server. All state is in the RTMP request, so this
|
||||
// connection is stateless.
|
||||
type RTMPConnection struct {
|
||||
// The random number generator.
|
||||
rd *rand.Rand
|
||||
|
|
58
proxy/srs.go
58
proxy/srs.go
|
@ -148,11 +148,11 @@ type SRSLoadBalancer interface {
|
|||
// Pick a backend server for the specified stream URL.
|
||||
Pick(ctx context.Context, streamURL string) (*SRSServer, error)
|
||||
// Load or store the HLS streaming for the specified stream URL.
|
||||
LoadOrStoreHLS(ctx context.Context, streamURL string, value *HLSStreaming) (*HLSStreaming, error)
|
||||
LoadOrStoreHLS(ctx context.Context, streamURL string, value *HLSPlayStream) (*HLSPlayStream, error)
|
||||
// Load the HLS streaming by SPBHID, the SRS Proxy Backend HLS ID.
|
||||
LoadHLSBySPBHID(ctx context.Context, spbhid string) (*HLSStreaming, error)
|
||||
// Load or store the WebRTC streaming for the specified stream URL.
|
||||
LoadOrStoreWebRTC(ctx context.Context, streamURL, ufrag string, value *RTCConnection) (*RTCConnection, error)
|
||||
LoadHLSBySPBHID(ctx context.Context, spbhid string) (*HLSPlayStream, error)
|
||||
// Store the WebRTC streaming for the specified stream URL.
|
||||
StoreWebRTC(ctx context.Context, streamURL string, value *RTCConnection) error
|
||||
// Load the WebRTC streaming by ufrag, the ICE username.
|
||||
LoadWebRTCByUfrag(ctx context.Context, ufrag string) (*RTCConnection, error)
|
||||
}
|
||||
|
@ -167,9 +167,9 @@ type srsMemoryLoadBalancer struct {
|
|||
// The picked server to servce client by specified stream URL, key is stream url.
|
||||
picked sync.Map[string, *SRSServer]
|
||||
// The HLS streaming, key is stream URL.
|
||||
hlsStreamURL sync.Map[string, *HLSStreaming]
|
||||
hlsStreamURL sync.Map[string, *HLSPlayStream]
|
||||
// The HLS streaming, key is SPBHID.
|
||||
hlsSPBHID sync.Map[string, *HLSStreaming]
|
||||
hlsSPBHID sync.Map[string, *HLSPlayStream]
|
||||
// The WebRTC streaming, key is stream URL.
|
||||
rtcStreamURL sync.Map[string, *RTCConnection]
|
||||
// The WebRTC streaming, key is ufrag.
|
||||
|
@ -245,7 +245,7 @@ func (v *srsMemoryLoadBalancer) Pick(ctx context.Context, streamURL string) (*SR
|
|||
return server, nil
|
||||
}
|
||||
|
||||
func (v *srsMemoryLoadBalancer) LoadHLSBySPBHID(ctx context.Context, spbhid string) (*HLSStreaming, error) {
|
||||
func (v *srsMemoryLoadBalancer) LoadHLSBySPBHID(ctx context.Context, spbhid string) (*HLSPlayStream, error) {
|
||||
// Load the HLS streaming for the SPBHID, for TS files.
|
||||
if actual, ok := v.hlsSPBHID.Load(spbhid); !ok {
|
||||
return nil, errors.Errorf("no HLS streaming for SPBHID %v", spbhid)
|
||||
|
@ -254,7 +254,7 @@ func (v *srsMemoryLoadBalancer) LoadHLSBySPBHID(ctx context.Context, spbhid stri
|
|||
}
|
||||
}
|
||||
|
||||
func (v *srsMemoryLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL string, value *HLSStreaming) (*HLSStreaming, error) {
|
||||
func (v *srsMemoryLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL string, value *HLSPlayStream) (*HLSPlayStream, error) {
|
||||
// Update the HLS streaming for the stream URL, for M3u8.
|
||||
actual, _ := v.hlsStreamURL.LoadOrStore(streamURL, value)
|
||||
if actual == nil {
|
||||
|
@ -263,19 +263,17 @@ func (v *srsMemoryLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL st
|
|||
|
||||
// Update the HLS streaming for the SPBHID, for TS files.
|
||||
v.hlsSPBHID.Store(value.SRSProxyBackendHLSID, actual)
|
||||
|
||||
return actual, nil
|
||||
}
|
||||
|
||||
func (v *srsMemoryLoadBalancer) LoadOrStoreWebRTC(ctx context.Context, streamURL, ufrag string, value *RTCConnection) (*RTCConnection, error) {
|
||||
func (v *srsMemoryLoadBalancer) StoreWebRTC(ctx context.Context, streamURL string, value *RTCConnection) error {
|
||||
// Update the WebRTC streaming for the stream URL.
|
||||
actual, _ := v.rtcStreamURL.LoadOrStore(streamURL, value)
|
||||
if actual == nil {
|
||||
return nil, errors.Errorf("load or store WebRTC streaming for %v failed", streamURL)
|
||||
}
|
||||
v.rtcStreamURL.Store(streamURL, value)
|
||||
|
||||
// Update the WebRTC streaming for the ufrag.
|
||||
v.rtcUfrag.Store(ufrag, value)
|
||||
return nil, nil
|
||||
v.rtcUfrag.Store(value.Ufrag, value)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *srsMemoryLoadBalancer) LoadWebRTCByUfrag(ctx context.Context, ufrag string) (*RTCConnection, error) {
|
||||
|
@ -446,24 +444,23 @@ func (v *srsRedisLoadBalancer) Pick(ctx context.Context, streamURL string) (*SRS
|
|||
return &server, nil
|
||||
}
|
||||
|
||||
func (v *srsRedisLoadBalancer) LoadHLSBySPBHID(ctx context.Context, spbhid string) (*HLSStreaming, error) {
|
||||
func (v *srsRedisLoadBalancer) LoadHLSBySPBHID(ctx context.Context, spbhid string) (*HLSPlayStream, error) {
|
||||
key := v.redisKeySPBHID(spbhid)
|
||||
|
||||
actual, err := v.rdb.Get(ctx, key).Bytes()
|
||||
b, err := v.rdb.Get(ctx, key).Bytes()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "get key=%v HLS", key)
|
||||
}
|
||||
|
||||
var actualHLS HLSStreaming
|
||||
if err := json.Unmarshal(actual, &actualHLS); err != nil {
|
||||
return nil, errors.Wrapf(err, "unmarshal key=%v HLS %v", key, string(actual))
|
||||
var actual HLSPlayStream
|
||||
if err := json.Unmarshal(b, &actual); err != nil {
|
||||
return nil, errors.Wrapf(err, "unmarshal key=%v HLS %v", key, string(b))
|
||||
}
|
||||
|
||||
actualHLS.BuildContext(ctx)
|
||||
return &actualHLS, nil
|
||||
return &actual, nil
|
||||
}
|
||||
|
||||
func (v *srsRedisLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL string, value *HLSStreaming) (*HLSStreaming, error) {
|
||||
func (v *srsRedisLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL string, value *HLSPlayStream) (*HLSPlayStream, error) {
|
||||
b, err := json.Marshal(value)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "marshal HLS %v", value)
|
||||
|
@ -479,22 +476,21 @@ func (v *srsRedisLoadBalancer) LoadOrStoreHLS(ctx context.Context, streamURL str
|
|||
}
|
||||
|
||||
// Query the HLS streaming from redis.
|
||||
actual, err := v.rdb.Get(ctx, key).Bytes()
|
||||
b2, err := v.rdb.Get(ctx, key).Bytes()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "get key=%v HLS", key)
|
||||
}
|
||||
|
||||
var actualHLS HLSStreaming
|
||||
if err := json.Unmarshal(actual, &actualHLS); err != nil {
|
||||
return nil, errors.Wrapf(err, "unmarshal key=%v HLS %v", key, string(actual))
|
||||
var actual HLSPlayStream
|
||||
if err := json.Unmarshal(b2, &actual); err != nil {
|
||||
return nil, errors.Wrapf(err, "unmarshal key=%v HLS %v", key, string(b2))
|
||||
}
|
||||
|
||||
actualHLS.BuildContext(ctx)
|
||||
return &actualHLS, nil
|
||||
return &actual, nil
|
||||
}
|
||||
|
||||
func (v *srsRedisLoadBalancer) LoadOrStoreWebRTC(ctx context.Context, streamURL, ufrag string, value *RTCConnection) (*RTCConnection, error) {
|
||||
return nil, nil
|
||||
func (v *srsRedisLoadBalancer) StoreWebRTC(ctx context.Context, streamURL string, value *RTCConnection) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *srsRedisLoadBalancer) LoadWebRTCByUfrag(ctx context.Context, ufrag string) (*RTCConnection, error) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue