mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
Refine HTTP streaming error.
This commit is contained in:
parent
3d5db62132
commit
d053b8101e
1 changed files with 26 additions and 133 deletions
159
proxy/http.go
159
proxy/http.go
|
@ -9,7 +9,6 @@ import (
|
|||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -159,8 +158,6 @@ func (v *httpServer) Run(ctx context.Context) error {
|
|||
type HTTPStreaming struct {
|
||||
// The context for HTTP streaming.
|
||||
ctx context.Context
|
||||
// Whether has written response to client.
|
||||
written bool
|
||||
}
|
||||
|
||||
func NewHTTPStreaming(opts ...func(streaming *HTTPStreaming)) *HTTPStreaming {
|
||||
|
@ -175,62 +172,30 @@ func (v *HTTPStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
defer r.Body.Close()
|
||||
ctx := logger.WithContext(v.ctx)
|
||||
|
||||
var backendClosedErr, clientClosedErr bool
|
||||
|
||||
handleBackendErr := func(err error) {
|
||||
if isPeerClosedError(err) {
|
||||
if !backendClosedErr {
|
||||
backendClosedErr = true
|
||||
logger.Df(ctx, "HTTP backend peer closed")
|
||||
}
|
||||
} else {
|
||||
logger.Wf(ctx, "HTTP backend err %+v", err)
|
||||
}
|
||||
}
|
||||
|
||||
handleClientErr := func(err error) {
|
||||
if isPeerClosedError(err) {
|
||||
if !clientClosedErr {
|
||||
clientClosedErr = true
|
||||
logger.Df(ctx, "HTTP client peer closed")
|
||||
}
|
||||
} else {
|
||||
logger.Wf(ctx, "HTTP client %v err %+v", r.RemoteAddr, err)
|
||||
}
|
||||
}
|
||||
|
||||
handleErr := func(err error) {
|
||||
if perr, ok := err.(*RTMPProxyError); ok {
|
||||
if perr.isBackend {
|
||||
handleBackendErr(perr.err)
|
||||
} else {
|
||||
handleClientErr(perr.err)
|
||||
}
|
||||
} else {
|
||||
handleClientErr(err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := v.serve(ctx, w, r); err != nil {
|
||||
if merr, ok := err.(*RTMPMultipleError); ok {
|
||||
// If multiple errors, handle all of them.
|
||||
for _, err := range merr.errs {
|
||||
handleErr(err)
|
||||
}
|
||||
} else {
|
||||
// If single error, directly handle it.
|
||||
handleErr(err)
|
||||
}
|
||||
|
||||
if !v.written {
|
||||
apiError(ctx, w, r, err)
|
||||
}
|
||||
apiError(ctx, w, r, err)
|
||||
} else {
|
||||
logger.Df(ctx, "HTTP client done")
|
||||
}
|
||||
}
|
||||
|
||||
func (v *HTTPStreaming) serve(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
// Always support CORS. Note that browser may send origin header for m3u8, but no origin header
|
||||
// for ts. So we always response CORS header.
|
||||
if true {
|
||||
// SRS does not need cookie or credentials, so we disable CORS credentials, and use * for CORS origin,
|
||||
// headers, expose headers and methods.
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Headers
|
||||
w.Header().Set("Access-Control-Allow-Headers", "*")
|
||||
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Methods
|
||||
w.Header().Set("Access-Control-Allow-Methods", "*")
|
||||
}
|
||||
if r.Method == http.MethodOptions {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Build the stream URL in vhost/app/stream schema.
|
||||
unifiedURL, fullURL := convertURLToStreamURL(r)
|
||||
logger.Df(ctx, "Got HTTP client from %v for %v", r.RemoteAddr, fullURL)
|
||||
|
@ -247,8 +212,7 @@ func (v *HTTPStreaming) serve(ctx context.Context, w http.ResponseWriter, r *htt
|
|||
}
|
||||
|
||||
if err = v.serveByBackend(ctx, w, r, backend, streamURL); err != nil {
|
||||
extraMsg := fmt.Sprintf("serve %v by backend %+v", fullURL, backend)
|
||||
return wrapProxyError(err, extraMsg)
|
||||
return errors.Wrapf(err, "serve %v with %v by backend %+v", fullURL, streamURL, backend)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -267,42 +231,21 @@ func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWrite
|
|||
httpPort = int(iv)
|
||||
}
|
||||
|
||||
// If any goroutine quit, cancel another one.
|
||||
parentCtx := ctx
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-r.Context().Done():
|
||||
// If client request cancelled, cancel the proxy goroutine.
|
||||
cancel()
|
||||
}
|
||||
}()
|
||||
|
||||
// Connect to backend SRS server via HTTP client.
|
||||
backendURL := fmt.Sprintf("http://%v:%v%s", backend.IP, httpPort, r.URL.Path)
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", backendURL, nil)
|
||||
if err != nil {
|
||||
return &RTMPProxyError{true, errors.Wrapf(err, "create request to %v", backendURL)}
|
||||
return errors.Wrapf(err, "create request to %v", backendURL)
|
||||
}
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
if urlErr, ok := err.(*url.Error); ok {
|
||||
if urlErr.Err == io.EOF {
|
||||
return &RTMPProxyError{true, errors.Errorf("do request to %v EOF", backendURL)}
|
||||
}
|
||||
if urlErr.Err == context.Canceled && r.Context().Err() != nil {
|
||||
return &RTMPProxyError{false, errors.Wrapf(io.EOF, "client closed")}
|
||||
}
|
||||
}
|
||||
return &RTMPProxyError{true, errors.Wrapf(err, "do request to %v", backendURL)}
|
||||
return errors.Wrapf(err, "do request to %v", backendURL)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return &RTMPProxyError{true, errors.Errorf("proxy stream to %v failed, status=%v", backendURL, resp.Status)}
|
||||
return errors.Errorf("proxy stream to %v failed, status=%v", backendURL, resp.Status)
|
||||
}
|
||||
|
||||
// Copy all headers from backend to client.
|
||||
|
@ -313,64 +256,14 @@ func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWrite
|
|||
}
|
||||
}
|
||||
|
||||
v.written = true
|
||||
logger.Df(ctx, "HTTP start streaming")
|
||||
|
||||
// For all proxy goroutines.
|
||||
var wg stdSync.WaitGroup
|
||||
defer wg.Wait()
|
||||
|
||||
// Detect the client closed.
|
||||
wg.Add(1)
|
||||
var r0 error
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer cancel()
|
||||
|
||||
r0 = func() error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-r.Context().Done():
|
||||
return &RTMPProxyError{false, errors.Wrapf(io.EOF, "client closed")}
|
||||
}
|
||||
}()
|
||||
}()
|
||||
|
||||
// Copy all data from backend to client.
|
||||
wg.Add(1)
|
||||
var r1 error
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer cancel()
|
||||
|
||||
r1 = func() error {
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, err := resp.Body.Read(buf)
|
||||
if err != nil {
|
||||
return &RTMPProxyError{true, errors.Wrapf(err, "read stream from %v", backendURL)}
|
||||
}
|
||||
|
||||
if _, err := w.Write(buf[:n]); err != nil {
|
||||
return &RTMPProxyError{false, errors.Wrapf(err, "write stream client")}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}()
|
||||
|
||||
// Wait until all goroutine quit.
|
||||
wg.Wait()
|
||||
|
||||
// Reset the error if caused by another goroutine.
|
||||
if errors.Cause(r0) == context.Canceled && parentCtx.Err() == nil {
|
||||
r0 = nil
|
||||
}
|
||||
if errors.Cause(r1) == context.Canceled && parentCtx.Err() == nil {
|
||||
r1 = nil
|
||||
// Proxy the stream from backend to client.
|
||||
if _, err := io.Copy(w, resp.Body); err != nil {
|
||||
return errors.Wrapf(err, "copy stream to client, backend=%v", backendURL)
|
||||
}
|
||||
|
||||
return NewRTMPMultipleError(r0, r1, parentCtx.Err())
|
||||
return nil
|
||||
}
|
||||
|
||||
type HLSStreaming struct {
|
||||
|
@ -490,7 +383,7 @@ func (v *HLSStreaming) serveByBackend(ctx context.Context, w http.ResponseWriter
|
|||
// For TS file, directly copy it.
|
||||
if !strings.HasSuffix(r.URL.Path, ".m3u8") {
|
||||
if _, err := io.Copy(w, resp.Body); err != nil {
|
||||
return errors.Wrapf(err, "write stream client")
|
||||
return errors.Wrapf(err, "copy stream to client, backend=%v", backendURL)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue