1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Refine RTMP streaming error.

This commit is contained in:
winlin 2024-09-03 16:46:11 +08:00
parent d053b8101e
commit 68595a587d
2 changed files with 20 additions and 129 deletions

View file

@ -86,41 +86,11 @@ func (v *rtmpServer) Run(ctx context.Context) error {
defer v.wg.Done()
defer conn.Close()
var backendClosedErr, clientClosedErr bool
handleBackendErr := func(err error) {
if isPeerClosedError(err) {
if !backendClosedErr {
backendClosedErr = true
logger.Df(ctx, "RTMP backend peer closed")
}
} else {
logger.Wf(ctx, "RTMP backend err %+v", err)
}
}
handleClientErr := func(err error) {
if isPeerClosedError(err) {
if !clientClosedErr {
clientClosedErr = true
logger.Df(ctx, "RTMP client peer closed")
}
} else {
logger.Wf(ctx, "RTMP client %v err %+v", conn.RemoteAddr(), err)
}
}
handleErr := func(err error) {
if perr, ok := err.(*RTMPProxyError); ok {
// For proxy error, maybe caused by proxy or client.
if perr.isBackend {
handleBackendErr(perr.err)
} else {
handleClientErr(perr.err)
}
if isPeerClosedError(err) {
logger.Df(ctx, "RTMP peer is closed")
} else {
// Default as client error.
handleClientErr(err)
logger.Wf(ctx, "RTMP serve err %+v", err)
}
}
@ -128,15 +98,7 @@ func (v *rtmpServer) Run(ctx context.Context) error {
client.rd = v.rd
})
if err := rc.serve(ctx, conn); err != nil {
if merr, ok := err.(*RTMPMultipleError); ok {
// If multiple errors, handle all of them.
for _, err := range merr.errs {
handleErr(err)
}
} else {
// If single error, directly handle it.
handleErr(err)
}
handleErr(err)
} else {
logger.Df(ctx, "RTMP client done")
}
@ -147,60 +109,6 @@ func (v *rtmpServer) Run(ctx context.Context) error {
return nil
}
type RTMPMultipleError struct {
// The caused errors.
errs []error
}
// NewRTMPMultipleError ignore nil errors. If no error, return nil.
func NewRTMPMultipleError(errs ...error) error {
var nerrs []error
for _, err := range errs {
if errors.Cause(err) != nil {
nerrs = append(nerrs, err)
}
}
if len(nerrs) == 0 {
return nil
}
return &RTMPMultipleError{errs: nerrs}
}
func (v *RTMPMultipleError) Error() string {
var b strings.Builder
for i, err := range v.errs {
if i > 0 {
b.WriteString(", ")
}
b.WriteString(err.Error())
}
return b.String()
}
func (v *RTMPMultipleError) Cause() error {
if len(v.errs) == 0 {
return nil
}
return v.errs[0]
}
type RTMPProxyError struct {
// Whether error is caused by backend.
isBackend bool
// The caused error.
err error
}
func (v *RTMPProxyError) Error() string {
return v.err.Error()
}
func (v *RTMPProxyError) Cause() error {
return v.err
}
type RTMPConnection struct {
// The random number generator.
rd *rand.Rand
@ -217,13 +125,15 @@ func NewRTMPConnection(opts ...func(*RTMPConnection)) *RTMPConnection {
func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
logger.Df(ctx, "Got RTMP client from %v", conn.RemoteAddr())
// Close the connection when ctx done.
// If any goroutine quit, cancel another one.
parentCtx := ctx
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var backend *RTMPClientToBackend
if true {
connDoneCtx, connDoneCancel := context.WithCancel(ctx)
defer connDoneCancel()
go func() {
<-connDoneCtx.Done()
<-ctx.Done()
conn.Close()
if backend != nil {
backend.Close()
@ -380,7 +290,7 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
defer backend.Close()
if err := backend.Connect(ctx, tcUrl, streamName); err != nil {
return &RTMPProxyError{true, errors.Wrapf(err, "connect backend, tcUrl=%v, stream=%v", tcUrl, streamName)}
return errors.Wrapf(err, "connect backend, tcUrl=%v, stream=%v", tcUrl, streamName)
}
// Start the streaming.
@ -424,10 +334,6 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
var wg sync.WaitGroup
defer wg.Wait()
// If any goroutine quit, cancel another one.
parentCtx := ctx
ctx, cancel := context.WithCancel(ctx)
// Proxy all message from backend to client.
wg.Add(1)
var r0 error
@ -439,13 +345,13 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
for {
m, err := backend.client.ReadMessage(ctx)
if err != nil {
return &RTMPProxyError{true, errors.Wrapf(err, "read message")}
return errors.Wrapf(err, "read message")
}
//logger.Df(ctx, "client<- %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload))
// TODO: Update the stream ID if not the same.
if err := client.WriteMessage(ctx, m); err != nil {
return &RTMPProxyError{false, errors.Wrapf(err, "write message")}
return errors.Wrapf(err, "write message")
}
}
}()
@ -462,13 +368,13 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
for {
m, err := client.ReadMessage(ctx)
if err != nil {
return &RTMPProxyError{false, errors.Wrapf(err, "read message")}
return errors.Wrapf(err, "read message")
}
//logger.Df(ctx, "client-> %v %v %vB", m.MessageType, m.Timestamp, len(m.Payload))
// TODO: Update the stream ID if not the same.
if err := backend.client.WriteMessage(ctx, m); err != nil {
return &RTMPProxyError{true, errors.Wrapf(err, "write message")}
return errors.Wrapf(err, "write message")
}
}
}()
@ -478,14 +384,14 @@ func (v *RTMPConnection) serve(ctx context.Context, conn *net.TCPConn) error {
wg.Wait()
// Reset the error if caused by another goroutine.
if errors.Cause(r0) == context.Canceled && parentCtx.Err() == nil {
r0 = nil
if r0 != nil {
return errors.Wrapf(r0, "proxy backend->client")
}
if errors.Cause(r1) == context.Canceled && parentCtx.Err() == nil {
r1 = nil
if r1 != nil {
return errors.Wrapf(r1, "proxy client->backend")
}
return NewRTMPMultipleError(r0, r1, parentCtx.Err())
return parentCtx.Err()
}
type RTMPClientType string

View file

@ -140,18 +140,3 @@ func convertURLToStreamURL(r *http.Request) (unifiedURL, fullURL string) {
fullURL = fmt.Sprintf("%v%v", unifiedURL, streamExt)
return
}
// wrapProxyError extract and wrap the proxy and multiple errors with extraMsg.
func wrapProxyError(err error, extraMsg string) error {
if perr, ok := err.(*RTMPProxyError); ok {
return &RTMPProxyError{perr.isBackend, errors.Wrapf(perr.err, extraMsg)}
} else if merr, ok := err.(*RTMPMultipleError); ok {
var errs []error
for _, e := range merr.errs {
errs = append(errs, errors.Wrapf(e, extraMsg))
}
return NewRTMPMultipleError(errs...)
} else {
return errors.Wrapf(err, extraMsg)
}
}