diff --git a/trunk/3rdparty/srs-bench/LICENSE b/trunk/3rdparty/srs-bench/LICENSE index 77ba5769d..1cdf14566 100644 --- a/trunk/3rdparty/srs-bench/LICENSE +++ b/trunk/3rdparty/srs-bench/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2021 srs-bench(ossrs) +Copyright (c) 2021 Winlin Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/trunk/3rdparty/srs-bench/README.md b/trunk/3rdparty/srs-bench/README.md index 415cb195f..6b903692c 100644 --- a/trunk/3rdparty/srs-bench/README.md +++ b/trunk/3rdparty/srs-bench/README.md @@ -142,6 +142,12 @@ go test ./srs -mod=vendor -v -srs-server=127.0.0.1 make && ./objs/srs_test -test.v -srs-server=127.0.0.1 ``` +可以只运行某个用例,并打印详细日志,比如: + +```bash +make && ./objs/srs_test -test.v -srs-log -test.run TestRtcBasic_PublishPlay +``` + 支持的参数如下: * `-srs-server`,RTC服务器地址。默认值:`127.0.0.1` diff --git a/trunk/3rdparty/srs-bench/auto/sync_vnet.sh b/trunk/3rdparty/srs-bench/auto/sync_vnet.sh new file mode 100755 index 000000000..55ef15f1a --- /dev/null +++ b/trunk/3rdparty/srs-bench/auto/sync_vnet.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +FILES=(udpproxy.go udpproxy_test.go) +for file in ${FILES[@]}; do + echo "cp vnet/udpproxy.go ~/git/transport/vnet/" && + cp vnet/udpproxy.go ~/git/transport/vnet/ +done + +# https://github.com/pion/webrtc/wiki/Contributing#run-all-automated-tests-and-checks-before-submitting +cd ~/git/transport/ + +echo ".github/lint-commit-message.sh" && +.github/lint-commit-message.sh && +echo ".github/assert-contributors.sh" && +.github/assert-contributors.sh && +echo ".github/lint-disallowed-functions-in-library.sh" && +.github/lint-disallowed-functions-in-library.sh && +echo ".github/lint-filename.sh" && +.github/lint-filename.sh +if [[ $? -ne 0 ]]; then echo "fail"; exit -1; fi + +# https://github.com/pion/webrtc/wiki/Contributing#run-all-automated-tests-and-checks-before-submitting +cd ~/git/transport/vnet/ + +echo "go test -race ./..." && +go test -race ./... +if [[ $? -ne 0 ]]; then echo "fail"; exit -1; fi + +echo "golangci-lint run --skip-files conn_map_test.go" && +golangci-lint run --skip-files conn_map_test.go +if [[ $? -ne 0 ]]; then echo "fail"; exit -1; fi + +echo "OK" diff --git a/trunk/3rdparty/srs-bench/srs/ingester.go b/trunk/3rdparty/srs-bench/srs/ingester.go index f38409e59..bcedebbb6 100644 --- a/trunk/3rdparty/srs-bench/srs/ingester.go +++ b/trunk/3rdparty/srs-bench/srs/ingester.go @@ -44,16 +44,20 @@ type videoIngester struct { markerInterceptor *RTPInterceptor sVideoTrack *webrtc.TrackLocalStaticSample sVideoSender *webrtc.RTPSender + ready context.Context + readyCancel context.CancelFunc } func NewVideoIngester(sourceVideo string) *videoIngester { - return &videoIngester{markerInterceptor: &RTPInterceptor{}, sourceVideo: sourceVideo} + v := &videoIngester{markerInterceptor: &RTPInterceptor{}, sourceVideo: sourceVideo} + v.ready, v.readyCancel = context.WithCancel(context.Background()) + return v } func (v *videoIngester) Close() error { + v.readyCancel() if v.sVideoSender != nil { - v.sVideoSender.Stop() - v.sVideoSender = nil + _ = v.sVideoSender.Stop() } return nil } @@ -102,6 +106,9 @@ func (v *videoIngester) Ingest(ctx context.Context) error { logger.Tf(ctx, "Video %v, tbn=%v, fps=%v, ssrc=%v, pt=%v, header=%v", codec.MimeType, codec.ClockRate, fps, enc.SSRC, codec.PayloadType, headers) + // OK, we are ready. + v.readyCancel() + clock := newWallClock() sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 / uint64(fps)) for ctx.Err() == nil { @@ -179,16 +186,21 @@ type audioIngester struct { audioLevelInterceptor *RTPInterceptor sAudioTrack *webrtc.TrackLocalStaticSample sAudioSender *webrtc.RTPSender + ready context.Context + readyCancel context.CancelFunc } func NewAudioIngester(sourceAudio string) *audioIngester { - return &audioIngester{audioLevelInterceptor: &RTPInterceptor{}, sourceAudio: sourceAudio} + v := &audioIngester{audioLevelInterceptor: &RTPInterceptor{}, sourceAudio: sourceAudio} + v.ready, v.readyCancel = context.WithCancel(context.Background()) + return v } func (v *audioIngester) Close() error { + v.readyCancel() // OK we are closed, also ready. + if v.sAudioSender != nil { - v.sAudioSender.Stop() - v.sAudioSender = nil + _ = v.sAudioSender.Stop() } return nil } @@ -240,6 +252,9 @@ func (v *audioIngester) Ingest(ctx context.Context) error { } } + // OK, we are ready. + v.readyCancel() + clock := newWallClock() var lastGranule uint64 @@ -253,7 +268,7 @@ func (v *audioIngester) Ingest(ctx context.Context) error { } // The amount of samples is the difference between the last and current timestamp - sampleCount := uint64(pageHeader.GranulePosition - lastGranule) + sampleCount := pageHeader.GranulePosition - lastGranule lastGranule = pageHeader.GranulePosition sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 * sampleCount / uint64(codec.ClockRate)) @@ -266,7 +281,7 @@ func (v *audioIngester) Ingest(ctx context.Context) error { return 0, err } - header.SetExtension(uint8(audioLevel.ID), audioLevelPayload) + _ = header.SetExtension(uint8(audioLevel.ID), audioLevelPayload) } return ri.nextRTPWriter.Write(header, payload, attributes) diff --git a/trunk/3rdparty/srs-bench/srs/rtc_test.go b/trunk/3rdparty/srs-bench/srs/rtc_test.go index 62ad26663..de5ac0ff0 100644 --- a/trunk/3rdparty/srs-bench/srs/rtc_test.go +++ b/trunk/3rdparty/srs-bench/srs/rtc_test.go @@ -22,11 +22,13 @@ package srs import ( "context" + "encoding/json" "fmt" "github.com/pion/transport/vnet" "io" "io/ioutil" "math/rand" + "net/http" "os" "sync" "testing" @@ -73,7 +75,7 @@ func TestRtcBasic_PublishPlay(t *testing.T) { var resources []io.Closer defer func() { for _, resource := range resources { - resource.Close() + _ = resource.Close() } }() @@ -93,27 +95,19 @@ func TestRtcBasic_PublishPlay(t *testing.T) { defer wg.Done() defer cancel() - doInit := func() error { - playOK, vnetClientIP := *srsPlayOKPackets, *srsVnetClientIP + doInit := func() (err error) { streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int()) // Initialize player with private api. - if play, err := NewTestPlayer(nil, func(play *TestPlayer) error { + if thePlayer, err = NewTestPlayer(CreateApiForPlayer, func(play *TestPlayer) error { play.streamSuffix = streamSuffix resources = append(resources, play) - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - resources = append(resources, api) - play.api = api - var nnPlayWriteRTCP, nnPlayReadRTCP, nnPlayWriteRTP, nnPlayReadRTP uint64 - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + return play.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnPlayReadRTP++; nnPlayReadRTP >= uint64(playOK) { + if nnPlayReadRTP++; nnPlayReadRTP >= uint64(*srsPlayOKPackets) { cancel() // Completed. } logger.Tf(ctx, "Play rtp=(recv:%v, send:%v), rtcp=(recv:%v send:%v) packets", @@ -133,32 +127,19 @@ func TestRtcBasic_PublishPlay(t *testing.T) { return nn, err } })) - }); err != nil { - return err - } - - return nil + }) }); err != nil { return err - } else { - thePlayer = play } // Initialize publisher with private api. - if pub, err := NewTestPublisher(nil, func(pub *TestPublisher) error { + if thePublisher, err = NewTestPublisher(CreateApiForPublisher, func(pub *TestPublisher) error { pub.streamSuffix = streamSuffix pub.iceReadyCancel = publishReadyCancel resources = append(resources, pub) - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - resources = append(resources, api) - pub.api = api - var nnPubWriteRTCP, nnPubReadRTCP, nnPubWriteRTP, nnPubReadRTP uint64 - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + return pub.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { nn, attr, err := i.nextRTPReader.Read(buf, attributes) @@ -185,15 +166,9 @@ func TestRtcBasic_PublishPlay(t *testing.T) { return nn, err } })) - }); err != nil { - return err - } - - return nil + }) }); err != nil { return err - } else { - thePublisher = pub } // Init done. @@ -216,14 +191,10 @@ func TestRtcBasic_PublishPlay(t *testing.T) { select { case <-ctx.Done(): - return case <-mainReady.Done(): + r2 = thePublisher.Run(logger.WithContext(ctx), cancel) + logger.Tf(ctx, "pub done") } - - if err := thePublisher.Run(logger.WithContext(ctx), cancel); err != nil { - r2 = err - } - logger.Tf(ctx, "pub done") }() // Run player. @@ -234,14 +205,159 @@ func TestRtcBasic_PublishPlay(t *testing.T) { select { case <-ctx.Done(): - return case <-publishReady.Done(): + r3 = thePlayer.Run(logger.WithContext(ctx), cancel) + logger.Tf(ctx, "play done") + } + }() +} + +// When republish a stream, the player stream SHOULD be continuous. +func TestRtcBasic_Republish(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + + var r0, r1, r2, r3, r4 error + defer func(ctx context.Context) { + if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4); err != nil { + t.Errorf("Fail for err %+v", err) + } else { + logger.Tf(ctx, "test done with err %+v", err) + } + }(ctx) + + var resources []io.Closer + defer func() { + for _, resource := range resources { + _ = resource.Close() + } + }() + + var wg sync.WaitGroup + defer wg.Wait() + + // The event notify. + var thePublisher, theRepublisher *TestPublisher + var thePlayer *TestPlayer + + mainReady, mainReadyCancel := context.WithCancel(context.Background()) + publishReady, publishReadyCancel := context.WithCancel(context.Background()) + republishReady, republishReadyCancel := context.WithCancel(context.Background()) + + // Objects init. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + doInit := func() (err error) { + streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int()) + + // Initialize player with private api. + if thePlayer, err = NewTestPlayer(CreateApiForPlayer, func(play *TestPlayer) error { + play.streamSuffix = streamSuffix + resources = append(resources, play) + + var nnPlayReadRTP uint64 + return play.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { + api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { + i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + select { + case <-republishReady.Done(): + if nnPlayReadRTP++; nnPlayReadRTP >= uint64(*srsPlayOKPackets) { + cancel() // Completed. + } + logger.Tf(ctx, "Play recv rtp %v packets", nnPlayReadRTP) + default: + logger.Tf(ctx, "Play recv rtp packet before republish") + } + return i.nextRTPReader.Read(payload, attributes) + } + })) + }) + }); err != nil { + return err + } + + // Initialize publisher with private api. + if thePublisher, err = NewTestPublisher(CreateApiForPublisher, func(pub *TestPublisher) error { + pub.streamSuffix = streamSuffix + pub.iceReadyCancel = publishReadyCancel + resources = append(resources, pub) + + var nnPubReadRTCP uint64 + return pub.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { + api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + nn, attr, err := i.nextRTCPReader.Read(buf, attributes) + if nnPubReadRTCP++; nnPubReadRTCP > 0 && pub.cancel != nil { + pub.cancel() // We only cancel the publisher itself. + } + logger.Tf(ctx, "Publish recv rtcp %v packets", nnPubReadRTCP) + return nn, attr, err + } + })) + }) + }); err != nil { + return err + } + + // Initialize re-publisher with private api. + if theRepublisher, err = NewTestPublisher(CreateApiForPublisher, func(pub *TestPublisher) error { + pub.streamSuffix = streamSuffix + pub.iceReadyCancel = republishReadyCancel + resources = append(resources, pub) + + return pub.Setup(*srsVnetClientIP) + }); err != nil { + return err + } + + // Init done. + mainReadyCancel() + + <-ctx.Done() + return nil } - if err := thePlayer.Run(logger.WithContext(ctx), cancel); err != nil { - r3 = err + if err := doInit(); err != nil { + r1 = err + } + }() + + // Run publisher. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + select { + case <-ctx.Done(): + case <-mainReady.Done(): + pubCtx, pubCancel := context.WithCancel(ctx) + r2 = thePublisher.Run(logger.WithContext(pubCtx), pubCancel) + logger.Tf(ctx, "pub done, re-publish again") + + // Dispose the stream. + _ = thePublisher.Close() + + r4 = theRepublisher.Run(logger.WithContext(ctx), cancel) + logger.Tf(ctx, "re-pub done") + } + }() + + // Run player. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + select { + case <-ctx.Done(): + case <-publishReady.Done(): + r3 = thePlayer.Run(logger.WithContext(ctx), cancel) + logger.Tf(ctx, "play done") } - logger.Tf(ctx, "play done") }() } @@ -252,18 +368,8 @@ func TestRtcBasic_PublishPlay(t *testing.T) { // No.4 srs-server: ChangeCipherSpec, Finished func TestRtcDTLS_ClientActive_Default(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -273,7 +379,8 @@ func TestRtcDTLS_ClientActive_Default(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -283,7 +390,7 @@ func TestRtcDTLS_ClientActive_Default(t *testing.T) { })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -316,18 +423,8 @@ func TestRtcDTLS_ClientActive_Default(t *testing.T) { // No.4 srs-bench: ChangeCipherSpec, Finished func TestRtcDTLS_ClientPassive_Default(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -337,7 +434,8 @@ func TestRtcDTLS_ClientPassive_Default(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -347,7 +445,7 @@ func TestRtcDTLS_ClientPassive_Default(t *testing.T) { })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -377,18 +475,8 @@ func TestRtcDTLS_ClientPassive_Default(t *testing.T) { // When srs-bench close the PC, it will send DTLS alert and might retransmit it. func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -398,7 +486,8 @@ func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -408,7 +497,7 @@ func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) { })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -445,18 +534,8 @@ func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) { // When srs-bench close the PC, it will send DTLS alert and might retransmit it. func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -466,7 +545,8 @@ func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -476,7 +556,7 @@ func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) { })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -520,18 +600,8 @@ func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) { func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T) { var r0 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -541,7 +611,8 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -551,7 +622,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -579,7 +650,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T lastClientHello = record nnClientHello++ - ok = (nnClientHello > nnMaxDrop) + ok = nnClientHello > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnClientHello, chunk, record, ok, len(c.UserData())) return }) @@ -607,18 +678,8 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T) { var r0 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -628,7 +689,8 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing. } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -638,7 +700,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing. })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -666,7 +728,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing. lastClientHello = record nnClientHello++ - ok = (nnClientHello > nnMaxDrop) + ok = nnClientHello > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnClientHello, chunk, record, ok, len(c.UserData())) return }) @@ -693,18 +755,8 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing. func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T) { var r0, r1 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -714,7 +766,8 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -724,7 +777,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -761,7 +814,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T lastServerHello = record nnServerHello++ - ok = (nnServerHello > nnMaxDrop) + ok = nnServerHello > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnServerHello, chunk, record, ok, len(c.UserData())) return }) @@ -790,18 +843,8 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T) { var r0, r1 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -811,7 +854,8 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing. } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -821,7 +865,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing. })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -858,7 +902,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing. lastServerHello = record nnServerHello++ - ok = (nnServerHello > nnMaxDrop) + ok = nnServerHello > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnServerHello, chunk, record, ok, len(c.UserData())) return }) @@ -884,18 +928,8 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing. func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T) { var r0 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -905,7 +939,8 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -915,7 +950,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -943,7 +978,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T lastCertificate = record nnCertificate++ - ok = (nnCertificate > nnMaxDrop) + ok = nnCertificate > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnCertificate, chunk, record, ok, len(c.UserData())) return }) @@ -970,18 +1005,8 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.T) { var r0 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -991,7 +1016,8 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing. } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -1001,7 +1027,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing. })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -1029,7 +1055,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing. lastCertificate = record nnCertificate++ - ok = (nnCertificate > nnMaxDrop) + ok = nnCertificate > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnCertificate, chunk, record, ok, len(c.UserData())) return }) @@ -1056,18 +1082,8 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing. func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *testing.T) { var r0, r1 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -1077,7 +1093,8 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -1087,7 +1104,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -1123,7 +1140,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test lastChangeCipherSepc = record nnCertificate++ - ok = (nnCertificate > nnMaxDrop) + ok = nnCertificate > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnCertificate, chunk, record, ok, len(c.UserData())) return }) @@ -1151,18 +1168,8 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *testing.T) { var r0, r1 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1172,7 +1179,8 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -1182,7 +1190,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -1218,7 +1226,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes lastChangeCipherSepc = record nnCertificate++ - ok = (nnCertificate > nnMaxDrop) + ok = nnCertificate > nnMaxDrop logger.Tf(ctx, "NN=%v, Chunk %v, %v, ok=%v %v bytes", nnCertificate, chunk, record, ok, len(c.UserData())) return }) @@ -1237,18 +1245,8 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes // Drop all DTLS packets when got ClientHello, to test the server ARQ thread cleanup. func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - vnetClientIP, dtlsDropPackets := *srsVnetClientIP, *srsDTLSDropPackets - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1258,7 +1256,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { nnDrop, dropAll := 0, false api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) { chunk, parsed := NewChunkMessageType(c) @@ -1275,7 +1274,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) { return true } - if nnDrop++; nnDrop >= dtlsDropPackets { + if nnDrop++; nnDrop >= *srsDTLSDropPackets { cancel() // Done, server transmit 5 Client Hello. } @@ -1299,18 +1298,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) { // Drop all DTLS packets when got ServerHello, to test the server ARQ thread cleanup. func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - vnetClientIP, dtlsDropPackets := *srsVnetClientIP, *srsDTLSDropPackets - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1320,7 +1309,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { nnDrop, dropAll := 0, false api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) { chunk, parsed := NewChunkMessageType(c) @@ -1337,7 +1327,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) { return true } - if nnDrop++; nnDrop >= dtlsDropPackets { + if nnDrop++; nnDrop >= *srsDTLSDropPackets { cancel() // Done, server transmit 5 Client Hello. } @@ -1361,18 +1351,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) { // Drop all DTLS packets when got Certificate, to test the server ARQ thread cleanup. func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - vnetClientIP, dtlsDropPackets := *srsVnetClientIP, *srsDTLSDropPackets - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1382,7 +1362,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { nnDrop, dropAll := 0, false api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) { chunk, parsed := NewChunkMessageType(c) @@ -1399,7 +1380,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) { return true } - if nnDrop++; nnDrop >= dtlsDropPackets { + if nnDrop++; nnDrop >= *srsDTLSDropPackets { cancel() // Done, server transmit 5 Client Hello. } @@ -1423,18 +1404,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) { // Drop all DTLS packets when got ChangeCipherSpec, to test the server ARQ thread cleanup. func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - vnetClientIP, dtlsDropPackets := *srsVnetClientIP, *srsDTLSDropPackets - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1444,7 +1415,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { nnDrop, dropAll := 0, false api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) { chunk, parsed := NewChunkMessageType(c) @@ -1461,7 +1433,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) { return true } - if nnDrop++; nnDrop >= dtlsDropPackets { + if nnDrop++; nnDrop >= *srsDTLSDropPackets { cancel() // Done, server transmit 5 Client Hello. } @@ -1486,18 +1458,8 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) { // which also consume about 750ms, but finally should be done successfully. func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) { if err := filterTestError(func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP, dtlsDropPackets := *srsPublishOKPackets, *srsVnetClientIP, *srsDTLSDropPackets - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1507,7 +1469,8 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -1517,7 +1480,7 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) { })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -1545,7 +1508,7 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) { } if chunk.IsCertificate() { - if nnDropCertificate >= dtlsDropPackets { + if nnDropCertificate >= *srsDTLSDropPackets { return true } nnDropCertificate++ @@ -1573,18 +1536,8 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) { func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) { var r0 error err := func() error { - ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) - publishOK, vnetClientIP := *srsPublishOKPackets, *srsVnetClientIP - - // Create top level test object. - api, err := NewTestWebRTCAPI() - if err != nil { - return err - } - defer api.Close() - streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := NewTestPublisher(api, func(p *TestPublisher) error { + p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1594,7 +1547,8 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) { } defer p.Close() - if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) { var nnRTCP, nnRTP int64 api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) { i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -1604,7 +1558,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) { })) api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) { i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { - if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) { + if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) { cancel() // Send enough packets, done. } logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP) @@ -1662,3 +1616,47 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) { t.Errorf("err %+v", err) } } + +func TestRTCServerVersion(t *testing.T) { + api := fmt.Sprintf("http://%v:1985/api/v1/versions", *srsServer) + req, err := http.NewRequest("POST", api, nil) + if err != nil { + t.Errorf("Request %v", api) + return + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + t.Errorf("Do request %v", api) + return + } + + b, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Errorf("Read body of %v", api) + return + } + + obj := struct { + Code int `json:"code"` + Server string `json:"server"` + Data struct { + Major int `json:"major"` + Minor int `json:"minor"` + Revision int `json:"revision"` + Version string `json:"version"` + } `json:"data"` + }{} + if err := json.Unmarshal(b, &obj); err != nil { + t.Errorf("Parse %v", string(b)) + return + } + if obj.Code != 0 { + t.Errorf("Server err code=%v, server=%v", obj.Code, obj.Server) + return + } + if obj.Data.Major == 0 && obj.Data.Minor == 0 { + t.Errorf("Invalid version %v", obj.Data) + return + } +} diff --git a/trunk/3rdparty/srs-bench/srs/util.go b/trunk/3rdparty/srs-bench/srs/util.go index 941b0bb76..e8cb47dad 100644 --- a/trunk/3rdparty/srs-bench/srs/util.go +++ b/trunk/3rdparty/srs-bench/srs/util.go @@ -36,7 +36,6 @@ import ( "strconv" "strings" "sync" - "testing" "time" "github.com/ossrs/go-oryx-lib/errors" @@ -207,7 +206,7 @@ func apiRtcRequest(ctx context.Context, apiPath, r, offer string) (string, error logger.Tf(ctx, "Parse response to code=%v, session=%v, sdp=%v bytes", resBody.Code, resBody.Session, len(resBody.SDP)) - return string(resBody.SDP), nil + return resBody.SDP, nil } func escapeSDP(sdp string) string { @@ -219,7 +218,7 @@ func packageAsSTAPA(frames ...*h264reader.NAL) *h264reader.NAL { buf := bytes.Buffer{} buf.WriteByte( - byte(first.RefIdc<<5)&0x60 | byte(24), // STAP-A + first.RefIdc<<5&0x60 | byte(24), // STAP-A ) for _, frame := range frames { @@ -325,6 +324,14 @@ func filterTestError(errs ...error) error { if err == nil || errors.Cause(err) == context.Canceled { continue } + + // If url error, server maybe error, do not print the detail log. + if r0 := errors.Cause(err); r0 != nil { + if r1, ok := r0.(*url.Error); ok { + err = r1 + } + } + filteredErrors = append(filteredErrors, err) } @@ -352,13 +359,13 @@ func srsIsStun(b []byte) bool { // @see https://tools.ietf.org/html/rfc2246#section-6.2.1 // @see srs_is_dtls of https://github.com/ossrs/srs func srsIsDTLS(b []byte) bool { - return (len(b) >= 13 && (b[0] > 19 && b[0] < 64)) + return len(b) >= 13 && (b[0] > 19 && b[0] < 64) } // For RTP or RTCP, the V=2 which is in the high 2bits, 0xC0 (1100 0000) // @see srs_is_rtp_or_rtcp of https://github.com/ossrs/srs func srsIsRTPOrRTCP(b []byte) bool { - return (len(b) >= 12 && (b[0]&0xC0) == 0x80) + return len(b) >= 12 && (b[0]&0xC0) == 0x80 } // For RTCP, PT is [128, 223] (or without marker [0, 95]). @@ -554,7 +561,7 @@ func (v *DTLSRecord) Unmarshal(b []byte) error { return errors.Errorf("requires 13B only %v", len(b)) } - v.ContentType = DTLSContentType(uint8(b[0])) + v.ContentType = DTLSContentType(b[0]) v.Version = uint16(b[1])<<8 | uint16(b[2]) v.Epoch = uint16(b[3])<<8 | uint16(b[4]) v.SequenceNumber = uint64(b[5])<<40 | uint64(b[6])<<32 | uint64(b[7])<<24 | uint64(b[8])<<16 | uint64(b[9])<<8 | uint64(b[10]) @@ -605,11 +612,11 @@ func NewTestWebRTCAPI(options ...TestWebRTCAPIOptionFunc) (*TestWebRTCAPI, error func (v *TestWebRTCAPI) Close() error { if v.proxy != nil { - v.proxy.Close() + _ = v.proxy.Close() } if v.router != nil { - v.router.Stop() + _ = v.router.Stop() } return nil @@ -676,14 +683,24 @@ type TestPlayerOptionFunc func(p *TestPlayer) error type TestPlayer struct { pc *webrtc.PeerConnection receivers []*webrtc.RTPReceiver - // root api object + // We should dispose it. api *TestWebRTCAPI // Optional suffix for stream url. streamSuffix string } -func NewTestPlayer(api *TestWebRTCAPI, options ...TestPlayerOptionFunc) (*TestPlayer, error) { - v := &TestPlayer{api: api} +func CreateApiForPlayer(play *TestPlayer) error { + api, err := NewTestWebRTCAPI() + if err != nil { + return err + } + + play.api = api + return nil +} + +func NewTestPlayer(options ...TestPlayerOptionFunc) (*TestPlayer, error) { + v := &TestPlayer{} for _, opt := range options { if err := opt(v); err != nil { @@ -691,19 +708,24 @@ func NewTestPlayer(api *TestWebRTCAPI, options ...TestPlayerOptionFunc) (*TestPl } } - // The api might be override by options. - api = v.api - return v, nil } +func (v *TestPlayer) Setup(vnetClientIP string, options ...TestWebRTCAPIOptionFunc) error { + return v.api.Setup(vnetClientIP, options...) +} + func (v *TestPlayer) Close() error { if v.pc != nil { - v.pc.Close() + _ = v.pc.Close() } for _, receiver := range v.receivers { - receiver.Stop() + _ = receiver.Stop() + } + + if v.api != nil { + _ = v.api.Close() } return nil @@ -723,12 +745,16 @@ func (v *TestPlayer) Run(ctx context.Context, cancel context.CancelFunc) error { } v.pc = pc - pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{ + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{ Direction: webrtc.RTPTransceiverDirectionRecvonly, - }) - pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{ + }); err != nil { + return errors.Wrapf(err, "add track") + } + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{ Direction: webrtc.RTPTransceiverDirectionRecvonly, - }) + }); err != nil { + return errors.Wrapf(err, "add track") + } offer, err := pc.CreateOffer(nil) if err != nil { @@ -818,16 +844,28 @@ type TestPublisher struct { aIngester *audioIngester vIngester *videoIngester pc *webrtc.PeerConnection - // root api object + // We should dispose it. api *TestWebRTCAPI // Optional suffix for stream url. streamSuffix string + // To cancel the publisher, pass by Run. + cancel context.CancelFunc } -func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) (*TestPublisher, error) { +func CreateApiForPublisher(pub *TestPublisher) error { + api, err := NewTestWebRTCAPI() + if err != nil { + return err + } + + pub.api = api + return nil +} + +func NewTestPublisher(options ...TestPublisherOptionFunc) (*TestPublisher, error) { sourceVideo, sourceAudio := *srsPublishVideo, *srsPublishAudio - v := &TestPublisher{api: api} + v := &TestPublisher{} for _, opt := range options { if err := opt(v); err != nil { @@ -835,9 +873,6 @@ func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) (* } } - // The api might be override by options. - api = v.api - // Create ingesters. if sourceAudio != "" { v.aIngester = NewAudioIngester(sourceAudio) @@ -847,6 +882,7 @@ func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) (* } // Setup the interceptors for packets. + api := v.api api.options = append(api.options, func(api *TestWebRTCAPI) { // Filter for RTCP packets. rtcpInterceptor := &RTCPInterceptor{} @@ -870,17 +906,25 @@ func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) (* return v, nil } +func (v *TestPublisher) Setup(vnetClientIP string, options ...TestWebRTCAPIOptionFunc) error { + return v.api.Setup(vnetClientIP, options...) +} + func (v *TestPublisher) Close() error { if v.vIngester != nil { - v.vIngester.Close() + _ = v.vIngester.Close() } if v.aIngester != nil { - v.aIngester.Close() + _ = v.aIngester.Close() } if v.pc != nil { - v.pc.Close() + _ = v.pc.Close() + } + + if v.api != nil { + _ = v.api.Close() } return nil @@ -892,6 +936,9 @@ func (v *TestPublisher) SetStreamSuffix(suffix string) *TestPublisher { } func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) error { + // Save the cancel. + v.cancel = cancel + r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream) if v.streamSuffix != "" { r = fmt.Sprintf("%v-%v", r, v.streamSuffix) @@ -1012,11 +1059,17 @@ func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro <-ctx.Done() if v.aIngester != nil && v.aIngester.sAudioSender != nil { - v.aIngester.sAudioSender.Stop() + // We MUST wait for the ingester ready(or closed), because it might crash if sender is disposed. + <-v.aIngester.ready.Done() + + _ = v.aIngester.Close() } if v.vIngester != nil && v.vIngester.sVideoSender != nil { - v.vIngester.sVideoSender.Stop() + // We MUST wait for the ingester ready(or closed), because it might crash if sender is disposed. + <-v.vIngester.ready.Done() + + _ = v.vIngester.Close() } }() @@ -1028,6 +1081,7 @@ func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro if v.aIngester == nil { return } + defer v.aIngester.readyCancel() select { case <-ctx.Done(): @@ -1072,6 +1126,7 @@ func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro if v.vIngester == nil { return } + defer v.vIngester.readyCancel() select { case <-ctx.Done(): @@ -1119,47 +1174,3 @@ func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro } return ctx.Err() } - -func TestRTCServerVersion(t *testing.T) { - api := fmt.Sprintf("http://%v:1985/api/v1/versions", *srsServer) - req, err := http.NewRequest("POST", api, nil) - if err != nil { - t.Errorf("Request %v", api) - return - } - - res, err := http.DefaultClient.Do(req) - if err != nil { - t.Errorf("Do request %v", api) - return - } - - b, err := ioutil.ReadAll(res.Body) - if err != nil { - t.Errorf("Read body of %v", api) - return - } - - obj := struct { - Code int `json:"code"` - Server string `json:"server"` - Data struct { - Major int `json:"major"` - Minor int `json:"minor"` - Revision int `json:"revision"` - Version string `json:"version"` - } `json:"data"` - }{} - if err := json.Unmarshal(b, &obj); err != nil { - t.Errorf("Parse %v", string(b)) - return - } - if obj.Code != 0 { - t.Errorf("Server err code=%v, server=%v", obj.Code, obj.Server) - return - } - if obj.Data.Major == 0 && obj.Data.Minor == 0 { - t.Errorf("Invalid version %v", obj.Data) - return - } -} diff --git a/trunk/3rdparty/srs-bench/vnet/udpproxy.go b/trunk/3rdparty/srs-bench/vnet/udpproxy.go index 60dfb87fe..caba86e0e 100644 --- a/trunk/3rdparty/srs-bench/vnet/udpproxy.go +++ b/trunk/3rdparty/srs-bench/vnet/udpproxy.go @@ -1,34 +1,13 @@ -// The MIT License (MIT) -// -// Copyright (c) 2021 Winlin -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. package vnet import ( + "context" "net" "sync" "time" - - "github.com/pion/transport/vnet" ) -// A UDP proxy between real server(net.UDPConn) and vnet.UDPConn. +// UDPProxy is a proxy between real server(net.UDPConn) and vnet.UDPConn. // // High level design: // .............................................. @@ -44,32 +23,9 @@ import ( // : | | ............................: // : +--------+ : // ............................................... -// -// The whole big picture: -// ...................................... -// : Virtual Network (vnet) : -// : : -// +-------+ * 1 +----+ +--------+ : -// | :App |------------>|:Net|--o<-----|:Router | ............................. -// +-------+ +----+ | | : UDPProxy : -// +-----------+ * 1 +----+ | | +----+ +---------+ +---------+ +--------+ -// |:STUNServer|-------->|:Net|--o<-----| |--->o--|:Net|-->o-| vnet. |-->o-| net. |--->-| :Real | -// +-----------+ +----+ | | +----+ | UDPConn | | UDPConn | | Server | -// +-----------+ * 1 +----+ | | : +---------+ +---------+ +--------+ -// |:TURNServer|-------->|:Net|--o<-----| | ............................: -// +-----------+ +----+ [1] | | : -// : 1 | | 1 <> : -// : +---<>| |<>----+ [2] : -// : | +--------+ | : -// To form | *| v 0..1 : -// a subnet tree | o [3] +-----+ : -// : | ^ |:NAT | : -// : | | +-----+ : -// : +-------+ : -// ...................................... type UDPProxy struct { // The router bind to. - router *vnet.Router + router *Router // Each vnet source, bind to a real socket to server. // key is real server addr, which is net.Addr @@ -88,19 +44,22 @@ type UDPProxy struct { // NewProxy create a proxy, the router for this proxy belongs/bind to. If need to proxy for // please create a new proxy for each router. For all addresses we proxy, we will create a // vnet.Net in this router and proxy all packets. -func NewProxy(router *vnet.Router) (*UDPProxy, error) { +func NewProxy(router *Router) (*UDPProxy, error) { v := &UDPProxy{router: router, timeout: 2 * time.Minute} return v, nil } // Close the proxy, stop all workers. func (v *UDPProxy) Close() error { - // nolint:godox // TODO: FIXME: Do cleanup. + v.workers.Range(func(key, value interface{}) bool { + _ = value.(*aUDPProxyWorker).Close() + return true + }) return nil } // Proxy starts a worker for server, ignore if already started. -func (v *UDPProxy) Proxy(client *vnet.Net, server *net.UDPAddr) error { +func (v *UDPProxy) Proxy(client *Net, server *net.UDPAddr) error { // Note that even if the worker exists, it's also ok to create a same worker, // because the router will use the last one, and the real server will see a address // change event after we switch to the next worker. @@ -113,25 +72,44 @@ func (v *UDPProxy) Proxy(client *vnet.Net, server *net.UDPAddr) error { worker := &aUDPProxyWorker{ router: v.router, mockRealServerAddr: v.mockRealServerAddr, } + + // Create context for cleanup. + var ctx context.Context + ctx, worker.ctxDisposeCancel = context.WithCancel(context.Background()) + v.workers.Store(server.String(), worker) - return worker.Proxy(client, server) + return worker.Proxy(ctx, client, server) } // A proxy worker for a specified proxy server. type aUDPProxyWorker struct { - router *vnet.Router + router *Router mockRealServerAddr *net.UDPAddr // Each vnet source, bind to a real socket to server. // key is vnet client addr, which is net.Addr // value is *net.UDPConn endpoints sync.Map + + // For cleanup. + ctxDisposeCancel context.CancelFunc + wg sync.WaitGroup } -func (v *aUDPProxyWorker) Proxy(client *vnet.Net, serverAddr *net.UDPAddr) error { // nolint:gocognit +func (v *aUDPProxyWorker) Close() error { + // Notify all goroutines to dispose. + v.ctxDisposeCancel() + + // Wait for all goroutines quit. + v.wg.Wait() + + return nil +} + +func (v *aUDPProxyWorker) Proxy(ctx context.Context, client *Net, serverAddr *net.UDPAddr) error { // nolint:gocognit // Create vnet for real server by serverAddr. - nw := vnet.NewNet(&vnet.NetConfig{ + nw := NewNet(&NetConfig{ StaticIP: serverAddr.IP.String(), }) if err := v.router.AddNet(nw); err != nil { @@ -145,10 +123,71 @@ func (v *aUDPProxyWorker) Proxy(client *vnet.Net, serverAddr *net.UDPAddr) error return err } - // Start a proxy goroutine. - var findEndpointBy func(addr net.Addr) (*net.UDPConn, error) - // nolint:godox // TODO: FIXME: Do cleanup. + // User stop proxy, we should close the socket. go func() { + <-ctx.Done() + _ = vnetSocket.Close() + }() + + // Got new vnet client, start a new endpoint. + findEndpointBy := func(addr net.Addr) (*net.UDPConn, error) { + // Exists binding. + if value, ok := v.endpoints.Load(addr.String()); ok { + // Exists endpoint, reuse it. + return value.(*net.UDPConn), nil + } + + // The real server we proxy to, for utest to mock it. + realAddr := serverAddr + if v.mockRealServerAddr != nil { + realAddr = v.mockRealServerAddr + } + + // Got new vnet client, create new endpoint. + realSocket, err := net.DialUDP("udp4", nil, realAddr) + if err != nil { + return nil, err + } + + // User stop proxy, we should close the socket. + go func() { + <-ctx.Done() + _ = realSocket.Close() + }() + + // Bind address. + v.endpoints.Store(addr.String(), realSocket) + + // Got packet from real serverAddr, we should proxy it to vnet. + v.wg.Add(1) + go func(vnetClientAddr net.Addr) { + defer v.wg.Done() + + buf := make([]byte, 1500) + for { + n, _, err := realSocket.ReadFrom(buf) + if err != nil { + return + } + + if n <= 0 { + continue // Drop packet + } + + if _, err := vnetSocket.WriteTo(buf[:n], vnetClientAddr); err != nil { + return + } + } + }(addr) + + return realSocket, nil + } + + // Start a proxy goroutine. + v.wg.Add(1) + go func() { + defer v.wg.Done() + buf := make([]byte, 1500) for { @@ -172,51 +211,5 @@ func (v *aUDPProxyWorker) Proxy(client *vnet.Net, serverAddr *net.UDPAddr) error } }() - // Got new vnet client, start a new endpoint. - findEndpointBy = func(addr net.Addr) (*net.UDPConn, error) { - // Exists binding. - if value, ok := v.endpoints.Load(addr.String()); ok { - // Exists endpoint, reuse it. - return value.(*net.UDPConn), nil - } - - // The real server we proxy to, for utest to mock it. - realAddr := serverAddr - if v.mockRealServerAddr != nil { - realAddr = v.mockRealServerAddr - } - - // Got new vnet client, create new endpoint. - realSocket, err := net.DialUDP("udp4", nil, realAddr) - if err != nil { - return nil, err - } - - // Bind address. - v.endpoints.Store(addr.String(), realSocket) - - // Got packet from real serverAddr, we should proxy it to vnet. - // nolint:godox // TODO: FIXME: Do cleanup. - go func(vnetClientAddr net.Addr) { - buf := make([]byte, 1500) - for { - n, _, err := realSocket.ReadFrom(buf) - if err != nil { - return - } - - if n <= 0 { - continue // Drop packet - } - - if _, err := vnetSocket.WriteTo(buf[:n], vnetClientAddr); err != nil { - return - } - } - }(addr) - - return realSocket, nil - } - return nil } diff --git a/trunk/3rdparty/srs-bench/vnet/udpproxy_test.go b/trunk/3rdparty/srs-bench/vnet/udpproxy_test.go index 5ab482bca..e5689bc18 100644 --- a/trunk/3rdparty/srs-bench/vnet/udpproxy_test.go +++ b/trunk/3rdparty/srs-bench/vnet/udpproxy_test.go @@ -1,23 +1,5 @@ -// The MIT License (MIT) -// -// Copyright (c) 2021 Winlin -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// +build !wasm + package vnet import ( @@ -32,7 +14,6 @@ import ( "time" "github.com/pion/logging" - "github.com/pion/transport/vnet" ) type MockUDPEchoServer struct { @@ -163,7 +144,7 @@ func TestUDPProxyOne2One(t *testing.T) { } doVnetProxy := func() error { - router, err := vnet.NewRouter(&vnet.RouterConfig{ + router, err := NewRouter(&RouterConfig{ CIDR: "0.0.0.0/0", LoggerFactory: logging.NewDefaultLoggerFactory(), }) @@ -171,7 +152,7 @@ func TestUDPProxyOne2One(t *testing.T) { return err } - clientNetwork := vnet.NewNet(&vnet.NetConfig{ + clientNetwork := NewNet(&NetConfig{ StaticIP: "10.0.0.11", }) if err = router.AddNet(clientNetwork); err != nil { @@ -309,7 +290,7 @@ func TestUDPProxyTwo2One(t *testing.T) { } doVnetProxy := func() error { - router, err := vnet.NewRouter(&vnet.RouterConfig{ + router, err := NewRouter(&RouterConfig{ CIDR: "0.0.0.0/0", LoggerFactory: logging.NewDefaultLoggerFactory(), }) @@ -317,7 +298,7 @@ func TestUDPProxyTwo2One(t *testing.T) { return err } - clientNetwork := vnet.NewNet(&vnet.NetConfig{ + clientNetwork := NewNet(&NetConfig{ StaticIP: "10.0.0.11", }) if err = router.AddNet(clientNetwork); err != nil { @@ -487,7 +468,7 @@ func TestUDPProxyProxyTwice(t *testing.T) { } doVnetProxy := func() error { - router, err := vnet.NewRouter(&vnet.RouterConfig{ + router, err := NewRouter(&RouterConfig{ CIDR: "0.0.0.0/0", LoggerFactory: logging.NewDefaultLoggerFactory(), }) @@ -495,7 +476,7 @@ func TestUDPProxyProxyTwice(t *testing.T) { return err } - clientNetwork := vnet.NewNet(&vnet.NetConfig{ + clientNetwork := NewNet(&NetConfig{ StaticIP: "10.0.0.11", }) if err = router.AddNet(clientNetwork); err != nil { diff --git a/trunk/3rdparty/srs-bench/vnet/vnet.go b/trunk/3rdparty/srs-bench/vnet/vnet.go new file mode 100644 index 000000000..ceab0847f --- /dev/null +++ b/trunk/3rdparty/srs-bench/vnet/vnet.go @@ -0,0 +1,38 @@ +// The MIT License (MIT) +// +// Copyright (c) 2021 Winlin +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +package vnet + +import ( + "github.com/pion/transport/vnet" +) + +type Router = vnet.Router +type Net = vnet.Net + +type NetConfig = vnet.NetConfig +type RouterConfig = vnet.RouterConfig + +func NewNet(config *NetConfig) *Net { + return vnet.NewNet(config) +} +func NewRouter(config *RouterConfig) (*Router, error) { + return vnet.NewRouter(config) +} diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 65fb9b421..b31eec2e3 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -429,19 +429,18 @@ srs_error_t SrsRtcPlayStream::initialize(SrsRequest* req, std::map::iterator it = sub_relations.begin(); - while (it != sub_relations.end()) { - if (it->second->type_ == "audio") { - SrsRtcAudioSendTrack* track = new SrsRtcAudioSendTrack(session_, it->second); - audio_tracks_.insert(make_pair(it->first, track)); - } + for (map::iterator it = sub_relations.begin(); it != sub_relations.end(); ++it) { + uint32_t ssrc = it->first; + SrsRtcTrackDescription* desc = it->second; - if (it->second->type_ == "video") { - SrsRtcVideoSendTrack* track = new SrsRtcVideoSendTrack(session_, it->second); - video_tracks_.insert(make_pair(it->first, track)); - } - ++it; + if (desc->type_ == "audio") { + SrsRtcAudioSendTrack* track = new SrsRtcAudioSendTrack(session_, desc); + audio_tracks_.insert(make_pair(ssrc, track)); + } + + if (desc->type_ == "video") { + SrsRtcVideoSendTrack* track = new SrsRtcVideoSendTrack(session_, desc); + video_tracks_.insert(make_pair(ssrc, track)); } } @@ -605,7 +604,7 @@ srs_error_t SrsRtcPlayStream::send_packet(SrsRtpPacket2*& pkt) // TODO: FIXME: Maybe refine for performance issue. if (!audio_tracks_.count(pkt->header.get_ssrc()) && !video_tracks_.count(pkt->header.get_ssrc())) { - srs_warn("ssrc %u not found", pkt->header.get_ssrc()); + srs_warn("RTC: Drop for ssrc %u not found", pkt->header.get_ssrc()); return err; }