mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
Test: Add RTMP to RTC regression test.
This commit is contained in:
parent
f8b4121497
commit
0b99f06eff
3 changed files with 150 additions and 29 deletions
54
trunk/3rdparty/srs-bench/srs/rtc_test.go
vendored
54
trunk/3rdparty/srs-bench/srs/rtc_test.go
vendored
|
@ -859,6 +859,9 @@ func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) {
|
||||||
//
|
//
|
||||||
// @remark The pion is active, so it can be consider a benchmark for DTLS server.
|
// @remark The pion is active, so it can be consider a benchmark for DTLS server.
|
||||||
func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T) {
|
func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T) {
|
||||||
|
ctx := logger.WithContext(context.Background())
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
err := func() error {
|
err := func() error {
|
||||||
streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
||||||
|
@ -872,7 +875,6 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T
|
||||||
}
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
|
||||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||||
var nnRTCP, nnRTP int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||||
|
@ -921,7 +923,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T
|
||||||
|
|
||||||
return p.Run(ctx, cancel)
|
return p.Run(ctx, cancel)
|
||||||
}()
|
}()
|
||||||
if err := filterTestError(err, r0); err != nil {
|
if err := filterTestError(ctx.Err(), err, r0); err != nil {
|
||||||
t.Errorf("err %+v", err)
|
t.Errorf("err %+v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -937,6 +939,9 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T
|
||||||
// openssl will create a new ClientHello with increased sequence. It's ok, but waste a lots of duplicated ClientHello
|
// openssl will create a new ClientHello with increased sequence. It's ok, but waste a lots of duplicated ClientHello
|
||||||
// packets, so we fail the test, requires the epoch+sequence never dup, even for ARQ.
|
// packets, so we fail the test, requires the epoch+sequence never dup, even for ARQ.
|
||||||
func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T) {
|
func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T) {
|
||||||
|
ctx := logger.WithContext(context.Background())
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
err := func() error {
|
err := func() error {
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
||||||
|
@ -950,7 +955,6 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.
|
||||||
}
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
|
||||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||||
var nnRTCP, nnRTP int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||||
|
@ -999,7 +1003,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.
|
||||||
|
|
||||||
return p.Run(ctx, cancel)
|
return p.Run(ctx, cancel)
|
||||||
}()
|
}()
|
||||||
if err := filterTestError(err, r0); err != nil {
|
if err := filterTestError(ctx.Err(), err, r0); err != nil {
|
||||||
t.Errorf("err %+v", err)
|
t.Errorf("err %+v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1014,6 +1018,9 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.
|
||||||
//
|
//
|
||||||
// @remark The pion is active, so it can be consider a benchmark for DTLS server.
|
// @remark The pion is active, so it can be consider a benchmark for DTLS server.
|
||||||
func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T) {
|
func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T) {
|
||||||
|
ctx := logger.WithContext(context.Background())
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
||||||
|
|
||||||
var r0, r1 error
|
var r0, r1 error
|
||||||
err := func() error {
|
err := func() error {
|
||||||
streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
||||||
|
@ -1027,7 +1034,6 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T
|
||||||
}
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
|
||||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||||
var nnRTCP, nnRTP int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||||
|
@ -1085,7 +1091,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T
|
||||||
|
|
||||||
return p.Run(ctx, cancel)
|
return p.Run(ctx, cancel)
|
||||||
}()
|
}()
|
||||||
if err := filterTestError(err, r0, r1); err != nil {
|
if err := filterTestError(ctx.Err(), err, r0, r1); err != nil {
|
||||||
t.Errorf("err %+v", err)
|
t.Errorf("err %+v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1102,6 +1108,9 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T
|
||||||
// openssl will create a new ClientHello with increased sequence. It's ok, but waste a lots of duplicated ClientHello
|
// openssl will create a new ClientHello with increased sequence. It's ok, but waste a lots of duplicated ClientHello
|
||||||
// packets, so we fail the test, requires the epoch+sequence never dup, even for ARQ.
|
// packets, so we fail the test, requires the epoch+sequence never dup, even for ARQ.
|
||||||
func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T) {
|
func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T) {
|
||||||
|
ctx := logger.WithContext(context.Background())
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
||||||
|
|
||||||
var r0, r1 error
|
var r0, r1 error
|
||||||
err := func() error {
|
err := func() error {
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
||||||
|
@ -1115,7 +1124,6 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.
|
||||||
}
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
|
||||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||||
var nnRTCP, nnRTP int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||||
|
@ -1173,7 +1181,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.
|
||||||
|
|
||||||
return p.Run(ctx, cancel)
|
return p.Run(ctx, cancel)
|
||||||
}()
|
}()
|
||||||
if err := filterTestError(err, r0, r1); err != nil {
|
if err := filterTestError(ctx.Err(), err, r0, r1); err != nil {
|
||||||
t.Errorf("err %+v", err)
|
t.Errorf("err %+v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1187,6 +1195,9 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.
|
||||||
//
|
//
|
||||||
// @remark The pion is active, so it can be consider a benchmark for DTLS server.
|
// @remark The pion is active, so it can be consider a benchmark for DTLS server.
|
||||||
func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T) {
|
func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T) {
|
||||||
|
ctx := logger.WithContext(context.Background())
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
err := func() error {
|
err := func() error {
|
||||||
streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
||||||
|
@ -1200,7 +1211,6 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T
|
||||||
}
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
|
||||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||||
var nnRTCP, nnRTP int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||||
|
@ -1249,7 +1259,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T
|
||||||
|
|
||||||
return p.Run(ctx, cancel)
|
return p.Run(ctx, cancel)
|
||||||
}()
|
}()
|
||||||
if err := filterTestError(err, r0); err != nil {
|
if err := filterTestError(ctx.Err(), err, r0); err != nil {
|
||||||
t.Errorf("err %+v", err)
|
t.Errorf("err %+v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1264,6 +1274,9 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T
|
||||||
// @remark If retransmit the Certificate, with the same epoch+sequence, peer will drop the message. It's ok right now, but
|
// @remark If retransmit the Certificate, with the same epoch+sequence, peer will drop the message. It's ok right now, but
|
||||||
// wast some packets, so we check the epoch+sequence which should never dup, even for ARQ.
|
// wast some packets, so we check the epoch+sequence which should never dup, even for ARQ.
|
||||||
func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.T) {
|
func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.T) {
|
||||||
|
ctx := logger.WithContext(context.Background())
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
err := func() error {
|
err := func() error {
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
||||||
|
@ -1277,7 +1290,6 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.
|
||||||
}
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
|
||||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||||
var nnRTCP, nnRTP int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||||
|
@ -1326,7 +1338,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.
|
||||||
|
|
||||||
return p.Run(ctx, cancel)
|
return p.Run(ctx, cancel)
|
||||||
}()
|
}()
|
||||||
if err := filterTestError(err, r0); err != nil {
|
if err := filterTestError(ctx.Err(), err, r0); err != nil {
|
||||||
t.Errorf("err %+v", err)
|
t.Errorf("err %+v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1341,6 +1353,9 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.
|
||||||
//
|
//
|
||||||
// @remark The pion is active, so it can be consider a benchmark for DTLS server.
|
// @remark The pion is active, so it can be consider a benchmark for DTLS server.
|
||||||
func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *testing.T) {
|
func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *testing.T) {
|
||||||
|
ctx := logger.WithContext(context.Background())
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
||||||
|
|
||||||
var r0, r1 error
|
var r0, r1 error
|
||||||
err := func() error {
|
err := func() error {
|
||||||
streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
||||||
|
@ -1354,7 +1369,6 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test
|
||||||
}
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
|
||||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||||
var nnRTCP, nnRTP int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||||
|
@ -1411,7 +1425,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test
|
||||||
|
|
||||||
return p.Run(ctx, cancel)
|
return p.Run(ctx, cancel)
|
||||||
}()
|
}()
|
||||||
if err := filterTestError(err, r0, r1); err != nil {
|
if err := filterTestError(ctx.Err(), err, r0, r1); err != nil {
|
||||||
t.Errorf("err %+v", err)
|
t.Errorf("err %+v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1427,6 +1441,9 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test
|
||||||
// @remark If retransmit the Certificate, with the same epoch+sequence, peer will drop the message, and never generate the
|
// @remark If retransmit the Certificate, with the same epoch+sequence, peer will drop the message, and never generate the
|
||||||
// ChangeCipherSpec, which will cause DTLS fail.
|
// ChangeCipherSpec, which will cause DTLS fail.
|
||||||
func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *testing.T) {
|
func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *testing.T) {
|
||||||
|
ctx := logger.WithContext(context.Background())
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
||||||
|
|
||||||
var r0, r1 error
|
var r0, r1 error
|
||||||
err := func() error {
|
err := func() error {
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
||||||
|
@ -1440,7 +1457,6 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes
|
||||||
}
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
|
||||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||||
var nnRTCP, nnRTP int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||||
|
@ -1497,7 +1513,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes
|
||||||
|
|
||||||
return p.Run(ctx, cancel)
|
return p.Run(ctx, cancel)
|
||||||
}()
|
}()
|
||||||
if err := filterTestError(err, r0, r1); err != nil {
|
if err := filterTestError(ctx.Err(), err, r0, r1); err != nil {
|
||||||
t.Errorf("err %+v", err)
|
t.Errorf("err %+v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1795,6 +1811,9 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) {
|
||||||
// If we retransmit 2 ClientHello packets, consumed 150ms, server might wait at 200ms.
|
// If we retransmit 2 ClientHello packets, consumed 150ms, server might wait at 200ms.
|
||||||
// Then we retransmit the Certificate, server reset the timer and retransmit it in 50ms, not 200ms.
|
// Then we retransmit the Certificate, server reset the timer and retransmit it in 50ms, not 200ms.
|
||||||
func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) {
|
func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) {
|
||||||
|
ctx := logger.WithContext(context.Background())
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
||||||
|
|
||||||
var r0 error
|
var r0 error
|
||||||
err := func() error {
|
err := func() error {
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||||
|
@ -1808,7 +1827,6 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) {
|
||||||
}
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
|
||||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||||
var nnRTCP, nnRTP int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||||
|
@ -1873,7 +1891,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) {
|
||||||
|
|
||||||
return p.Run(ctx, cancel)
|
return p.Run(ctx, cancel)
|
||||||
}()
|
}()
|
||||||
if err := filterTestError(err, r0); err != nil {
|
if err := filterTestError(ctx.Err(), err, r0); err != nil {
|
||||||
t.Errorf("err %+v", err)
|
t.Errorf("err %+v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
98
trunk/3rdparty/srs-bench/srs/rtmp_test.go
vendored
98
trunk/3rdparty/srs-bench/srs/rtmp_test.go
vendored
|
@ -25,6 +25,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/ossrs/go-oryx-lib/avc"
|
"github.com/ossrs/go-oryx-lib/avc"
|
||||||
"github.com/ossrs/go-oryx-lib/flv"
|
"github.com/ossrs/go-oryx-lib/flv"
|
||||||
|
"github.com/pion/interceptor"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -36,6 +37,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRtmpPublishPlay(t *testing.T) {
|
func TestRtmpPublishPlay(t *testing.T) {
|
||||||
|
ctx := logger.WithContext(context.Background())
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
||||||
|
|
||||||
var r0, r1 error
|
var r0, r1 error
|
||||||
err := func() error {
|
err := func() error {
|
||||||
publisher := NewRTMPPublisher()
|
publisher := NewRTMPPublisher()
|
||||||
|
@ -45,7 +49,6 @@ func TestRtmpPublishPlay(t *testing.T) {
|
||||||
defer player.Close()
|
defer player.Close()
|
||||||
|
|
||||||
// Connect to RTMP URL.
|
// Connect to RTMP URL.
|
||||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
|
||||||
streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int())
|
||||||
rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix)
|
rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix)
|
||||||
|
|
||||||
|
@ -92,12 +95,100 @@ func TestRtmpPublishPlay(t *testing.T) {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}()
|
}()
|
||||||
if err := filterTestError(err, r0, r1); err != nil {
|
if err := filterTestError(ctx.Err(), err, r0, r1); err != nil {
|
||||||
|
t.Errorf("err %+v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRtmpPublish_RtcPlay(t *testing.T) {
|
||||||
|
ctx := logger.WithContext(context.Background())
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
||||||
|
|
||||||
|
var r0, r1 error
|
||||||
|
err := func() (err error) {
|
||||||
|
streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int())
|
||||||
|
rtmpUrl := fmt.Sprintf("%v://%v%v-%v", srsSchema, *srsServer, *srsStream, streamSuffix)
|
||||||
|
|
||||||
|
// Publisher connect to a RTMP stream.
|
||||||
|
publisher := NewRTMPPublisher()
|
||||||
|
defer publisher.Close()
|
||||||
|
|
||||||
|
if err := publisher.Publish(ctx, rtmpUrl); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup the RTC player.
|
||||||
|
var thePlayer *testPlayer
|
||||||
|
if thePlayer, err = newTestPlayer(registerMiniCodecs, func(play *testPlayer) error {
|
||||||
|
play.streamSuffix = streamSuffix
|
||||||
|
var nnPlayReadRTP uint64
|
||||||
|
return play.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||||
|
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||||
|
i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
nn, attr, err := i.nextRTPReader.Read(payload, attributes)
|
||||||
|
if err == nil {
|
||||||
|
if nnPlayReadRTP++; nnPlayReadRTP >= uint64(*srsPlayOKPackets) {
|
||||||
|
cancel() // Completed.
|
||||||
|
}
|
||||||
|
logger.Tf(ctx, "Play RECV RTP #%v %vB", nnPlayReadRTP, nn)
|
||||||
|
}
|
||||||
|
return nn, attr, err
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer thePlayer.Close()
|
||||||
|
|
||||||
|
// Run publisher and players.
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
defer wg.Wait()
|
||||||
|
|
||||||
|
var playerIceReady context.Context
|
||||||
|
playerIceReady, thePlayer.iceReadyCancel = context.WithCancel(ctx)
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
if r1 = thePlayer.Run(logger.WithContext(ctx), cancel); r1 != nil {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
logger.Tf(ctx, "player done")
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
// Wait for player ready.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-playerIceReady.Done():
|
||||||
|
}
|
||||||
|
|
||||||
|
publisher.onSendPacket = func(m *rtmp.Message) error {
|
||||||
|
time.Sleep(100 * time.Microsecond)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if r0 = publisher.Ingest(ctx, *srsPublishAvatar); r0 != nil {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
logger.Tf(ctx, "publisher done")
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}()
|
||||||
|
if err := filterTestError(ctx.Err(), err, r0, r1); err != nil {
|
||||||
t.Errorf("err %+v", err)
|
t.Errorf("err %+v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRtmpPublish_MultipleSequences(t *testing.T) {
|
func TestRtmpPublish_MultipleSequences(t *testing.T) {
|
||||||
|
ctx := logger.WithContext(context.Background())
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
||||||
|
|
||||||
var r0, r1, r2 error
|
var r0, r1, r2 error
|
||||||
err := func() error {
|
err := func() error {
|
||||||
publisher := NewRTMPPublisher()
|
publisher := NewRTMPPublisher()
|
||||||
|
@ -107,7 +198,6 @@ func TestRtmpPublish_MultipleSequences(t *testing.T) {
|
||||||
defer player.Close()
|
defer player.Close()
|
||||||
|
|
||||||
// Connect to RTMP URL.
|
// Connect to RTMP URL.
|
||||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
|
||||||
streamSuffix := fmt.Sprintf("rtmp-multi-spspps-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("rtmp-multi-spspps-%v-%v", os.Getpid(), rand.Int())
|
||||||
rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix)
|
rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix)
|
||||||
|
|
||||||
|
@ -186,7 +276,7 @@ func TestRtmpPublish_MultipleSequences(t *testing.T) {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}()
|
}()
|
||||||
if err := filterTestError(err, r0, r1, r2); err != nil {
|
if err := filterTestError(ctx.Err(), err, r0, r1, r2); err != nil {
|
||||||
t.Errorf("err %+v", err)
|
t.Errorf("err %+v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
17
trunk/3rdparty/srs-bench/srs/util.go
vendored
17
trunk/3rdparty/srs-bench/srs/util.go
vendored
|
@ -751,6 +751,7 @@ type testPlayerOptionFunc func(p *testPlayer) error
|
||||||
type testPlayer struct {
|
type testPlayer struct {
|
||||||
onOffer func(s *webrtc.SessionDescription) error
|
onOffer func(s *webrtc.SessionDescription) error
|
||||||
onAnswer func(s *webrtc.SessionDescription) error
|
onAnswer func(s *webrtc.SessionDescription) error
|
||||||
|
iceReadyCancel context.CancelFunc
|
||||||
pc *webrtc.PeerConnection
|
pc *webrtc.PeerConnection
|
||||||
receivers []*webrtc.RTPReceiver
|
receivers []*webrtc.RTPReceiver
|
||||||
// We should dispose it.
|
// We should dispose it.
|
||||||
|
@ -910,8 +911,20 @@ func (v *testPlayer) Run(ctx context.Context, cancel context.CancelFunc) error {
|
||||||
})
|
})
|
||||||
|
|
||||||
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
||||||
if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed {
|
logger.Tf(ctx, "ICE state %v", state)
|
||||||
err = errors.Errorf("Close for ICE state %v", state)
|
})
|
||||||
|
|
||||||
|
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
||||||
|
logger.Tf(ctx, "PC state %v", state)
|
||||||
|
|
||||||
|
if state == webrtc.PeerConnectionStateConnected {
|
||||||
|
if v.iceReadyCancel != nil {
|
||||||
|
v.iceReadyCancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed {
|
||||||
|
err = errors.Errorf("Close for PC state %v", state)
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue