mirror of
https://github.com/ossrs/srs.git
synced 2025-02-13 20:01:56 +00:00
Test: Update srs-bench.
This commit is contained in:
parent
57f8a1afec
commit
c07b7db3c5
14 changed files with 1098 additions and 936 deletions
2
trunk/3rdparty/srs-bench/main.go
vendored
2
trunk/3rdparty/srs-bench/main.go
vendored
|
@ -1,6 +1,6 @@
|
||||||
// The MIT License (MIT)
|
// The MIT License (MIT)
|
||||||
//
|
//
|
||||||
// Copyright (c) 2021 srs-bench(ossrs)
|
// Copyright (c) 2021 Winlin
|
||||||
//
|
//
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
// this software and associated documentation files (the "Software"), to deal in
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
|
11
trunk/3rdparty/srs-bench/srs/ingester.go
vendored
11
trunk/3rdparty/srs-bench/srs/ingester.go
vendored
|
@ -1,6 +1,6 @@
|
||||||
// The MIT License (MIT)
|
// The MIT License (MIT)
|
||||||
//
|
//
|
||||||
// Copyright (c) 2021 srs-bench(ossrs)
|
// Copyright (c) 2021 Winlin
|
||||||
//
|
//
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
// this software and associated documentation files (the "Software"), to deal in
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
@ -22,6 +22,11 @@ package srs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ossrs/go-oryx-lib/errors"
|
"github.com/ossrs/go-oryx-lib/errors"
|
||||||
"github.com/ossrs/go-oryx-lib/logger"
|
"github.com/ossrs/go-oryx-lib/logger"
|
||||||
"github.com/pion/interceptor"
|
"github.com/pion/interceptor"
|
||||||
|
@ -31,10 +36,6 @@ import (
|
||||||
"github.com/pion/webrtc/v3/pkg/media"
|
"github.com/pion/webrtc/v3/pkg/media"
|
||||||
"github.com/pion/webrtc/v3/pkg/media/h264reader"
|
"github.com/pion/webrtc/v3/pkg/media/h264reader"
|
||||||
"github.com/pion/webrtc/v3/pkg/media/oggreader"
|
"github.com/pion/webrtc/v3/pkg/media/oggreader"
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type videoIngester struct {
|
type videoIngester struct {
|
||||||
|
|
23
trunk/3rdparty/srs-bench/srs/interceptor.go
vendored
23
trunk/3rdparty/srs-bench/srs/interceptor.go
vendored
|
@ -1,6 +1,6 @@
|
||||||
// The MIT License (MIT)
|
// The MIT License (MIT)
|
||||||
//
|
//
|
||||||
// Copyright (c) 2021 srs-bench(ossrs)
|
// Copyright (c) 2021 Winlin
|
||||||
//
|
//
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
// this software and associated documentation files (the "Software"), to deal in
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
@ -31,14 +31,13 @@ type RTPInterceptorOptionFunc func(i *RTPInterceptor)
|
||||||
// Common RTP packet interceptor for benchmark.
|
// Common RTP packet interceptor for benchmark.
|
||||||
// @remark Should never merge with RTCPInterceptor, because they has the same Write interface.
|
// @remark Should never merge with RTCPInterceptor, because they has the same Write interface.
|
||||||
type RTPInterceptor struct {
|
type RTPInterceptor struct {
|
||||||
localInfo *interceptor.StreamInfo
|
|
||||||
remoteInfo *interceptor.StreamInfo
|
|
||||||
// If rtpReader is nil, use the default next one to read.
|
// If rtpReader is nil, use the default next one to read.
|
||||||
rtpReader interceptor.RTPReaderFunc
|
rtpReader interceptor.RTPReaderFunc
|
||||||
nextRTPReader interceptor.RTPReader
|
nextRTPReader interceptor.RTPReader
|
||||||
// If rtpWriter is nil, use the default next one to write.
|
// If rtpWriter is nil, use the default next one to write.
|
||||||
rtpWriter interceptor.RTPWriterFunc
|
rtpWriter interceptor.RTPWriterFunc
|
||||||
nextRTPWriter interceptor.RTPWriter
|
nextRTPWriter interceptor.RTPWriter
|
||||||
|
// Other common fields.
|
||||||
BypassInterceptor
|
BypassInterceptor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,11 +50,6 @@ func NewRTPInterceptor(options ...RTPInterceptorOptionFunc) *RTPInterceptor {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *RTPInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
|
func (v *RTPInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
|
||||||
if v.localInfo != nil {
|
|
||||||
return writer // Only handle one stream.
|
|
||||||
}
|
|
||||||
|
|
||||||
v.localInfo = info
|
|
||||||
v.nextRTPWriter = writer
|
v.nextRTPWriter = writer
|
||||||
return v // Handle all RTP
|
return v // Handle all RTP
|
||||||
}
|
}
|
||||||
|
@ -68,17 +62,9 @@ func (v *RTPInterceptor) Write(header *rtp.Header, payload []byte, attributes in
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *RTPInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
|
func (v *RTPInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
|
||||||
if v.localInfo == nil || v.localInfo.ID != info.ID {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
v.localInfo = nil // Reset the interceptor.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *RTPInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
|
func (v *RTPInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
|
||||||
if v.remoteInfo != nil {
|
|
||||||
return reader // Only handle one stream.
|
|
||||||
}
|
|
||||||
|
|
||||||
v.nextRTPReader = reader
|
v.nextRTPReader = reader
|
||||||
return v // Handle all RTP
|
return v // Handle all RTP
|
||||||
}
|
}
|
||||||
|
@ -91,10 +77,6 @@ func (v *RTPInterceptor) Read(b []byte, a interceptor.Attributes) (int, intercep
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *RTPInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {
|
func (v *RTPInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {
|
||||||
if v.remoteInfo == nil || v.remoteInfo.ID != info.ID {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
v.remoteInfo = nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type RTCPInterceptorOptionFunc func(i *RTCPInterceptor)
|
type RTCPInterceptorOptionFunc func(i *RTCPInterceptor)
|
||||||
|
@ -108,6 +90,7 @@ type RTCPInterceptor struct {
|
||||||
// If rtcpWriter is nil, use the default next one to write.
|
// If rtcpWriter is nil, use the default next one to write.
|
||||||
rtcpWriter interceptor.RTCPWriterFunc
|
rtcpWriter interceptor.RTCPWriterFunc
|
||||||
nextRTCPWriter interceptor.RTCPWriter
|
nextRTCPWriter interceptor.RTCPWriter
|
||||||
|
// Other common fields.
|
||||||
BypassInterceptor
|
BypassInterceptor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
9
trunk/3rdparty/srs-bench/srs/player.go
vendored
9
trunk/3rdparty/srs-bench/srs/player.go
vendored
|
@ -1,6 +1,6 @@
|
||||||
// The MIT License (MIT)
|
// The MIT License (MIT)
|
||||||
//
|
//
|
||||||
// Copyright (c) 2021 srs-bench(ossrs)
|
// Copyright (c) 2021 Winlin
|
||||||
//
|
//
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
// this software and associated documentation files (the "Software"), to deal in
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
@ -23,6 +23,10 @@ package srs
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ossrs/go-oryx-lib/errors"
|
"github.com/ossrs/go-oryx-lib/errors"
|
||||||
"github.com/ossrs/go-oryx-lib/logger"
|
"github.com/ossrs/go-oryx-lib/logger"
|
||||||
"github.com/pion/interceptor"
|
"github.com/pion/interceptor"
|
||||||
|
@ -33,9 +37,6 @@ import (
|
||||||
"github.com/pion/webrtc/v3/pkg/media/h264writer"
|
"github.com/pion/webrtc/v3/pkg/media/h264writer"
|
||||||
"github.com/pion/webrtc/v3/pkg/media/ivfwriter"
|
"github.com/pion/webrtc/v3/pkg/media/ivfwriter"
|
||||||
"github.com/pion/webrtc/v3/pkg/media/oggwriter"
|
"github.com/pion/webrtc/v3/pkg/media/oggwriter"
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// @see https://github.com/pion/webrtc/blob/master/examples/save-to-disk/main.go
|
// @see https://github.com/pion/webrtc/blob/master/examples/save-to-disk/main.go
|
||||||
|
|
9
trunk/3rdparty/srs-bench/srs/publisher.go
vendored
9
trunk/3rdparty/srs-bench/srs/publisher.go
vendored
|
@ -1,6 +1,6 @@
|
||||||
// The MIT License (MIT)
|
// The MIT License (MIT)
|
||||||
//
|
//
|
||||||
// Copyright (c) 2021 srs-bench(ossrs)
|
// Copyright (c) 2021 Winlin
|
||||||
//
|
//
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
// this software and associated documentation files (the "Software"), to deal in
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
@ -22,14 +22,15 @@ package srs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"io"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ossrs/go-oryx-lib/errors"
|
"github.com/ossrs/go-oryx-lib/errors"
|
||||||
"github.com/ossrs/go-oryx-lib/logger"
|
"github.com/ossrs/go-oryx-lib/logger"
|
||||||
"github.com/pion/interceptor"
|
"github.com/pion/interceptor"
|
||||||
"github.com/pion/sdp/v3"
|
"github.com/pion/sdp/v3"
|
||||||
"github.com/pion/webrtc/v3"
|
"github.com/pion/webrtc/v3"
|
||||||
"io"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// @see https://github.com/pion/webrtc/blob/master/examples/play-from-disk/main.go
|
// @see https://github.com/pion/webrtc/blob/master/examples/play-from-disk/main.go
|
||||||
|
|
527
trunk/3rdparty/srs-bench/srs/rtc_test.go
vendored
527
trunk/3rdparty/srs-bench/srs/rtc_test.go
vendored
|
@ -1,6 +1,6 @@
|
||||||
// The MIT License (MIT)
|
// The MIT License (MIT)
|
||||||
//
|
//
|
||||||
// Copyright (c) 2021 srs-bench(ossrs)
|
// Copyright (c) 2021 Winlin
|
||||||
//
|
//
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
// this software and associated documentation files (the "Software"), to deal in
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
@ -23,19 +23,39 @@ package srs
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/ossrs/go-oryx-lib/errors"
|
|
||||||
"github.com/ossrs/go-oryx-lib/logger"
|
|
||||||
"github.com/pion/interceptor"
|
|
||||||
"github.com/pion/rtcp"
|
|
||||||
"github.com/pion/rtp"
|
|
||||||
"github.com/pion/transport/vnet"
|
"github.com/pion/transport/vnet"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/ossrs/go-oryx-lib/errors"
|
||||||
|
"github.com/ossrs/go-oryx-lib/logger"
|
||||||
|
"github.com/pion/interceptor"
|
||||||
|
"github.com/pion/rtcp"
|
||||||
|
"github.com/pion/rtp"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
if err := prepareTest(); err != nil {
|
||||||
|
logger.Ef(nil, "Prepare test fail, err %+v", err)
|
||||||
|
os.Exit(-1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Disable the logger during all tests.
|
||||||
|
if *srsLog == false {
|
||||||
|
olw := logger.Switch(ioutil.Discard)
|
||||||
|
defer func() {
|
||||||
|
logger.Switch(olw)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
os.Exit(m.Run())
|
||||||
|
}
|
||||||
|
|
||||||
// Basic use scenario, publish a stream, then play it.
|
// Basic use scenario, publish a stream, then play it.
|
||||||
func TestRtcBasic_PublishPlay(t *testing.T) {
|
func TestRtcBasic_PublishPlay(t *testing.T) {
|
||||||
ctx := logger.WithContext(context.Background())
|
ctx := logger.WithContext(context.Background())
|
||||||
|
@ -50,12 +70,20 @@ func TestRtcBasic_PublishPlay(t *testing.T) {
|
||||||
}
|
}
|
||||||
}(ctx)
|
}(ctx)
|
||||||
|
|
||||||
|
var resources []io.Closer
|
||||||
|
defer func() {
|
||||||
|
for _, resource := range resources {
|
||||||
|
resource.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
defer wg.Wait()
|
defer wg.Wait()
|
||||||
|
|
||||||
// The event notify.
|
// The event notify.
|
||||||
var thePublisher *TestPublisher
|
var thePublisher *TestPublisher
|
||||||
var thePlayer *TestPlayer
|
var thePlayer *TestPlayer
|
||||||
|
|
||||||
mainReady, mainReadyCancel := context.WithCancel(context.Background())
|
mainReady, mainReadyCancel := context.WithCancel(context.Background())
|
||||||
publishReady, publishReadyCancel := context.WithCancel(context.Background())
|
publishReady, publishReadyCancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
@ -66,76 +94,110 @@ func TestRtcBasic_PublishPlay(t *testing.T) {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
doInit := func() error {
|
doInit := func() error {
|
||||||
playOK := *srsPlayOKPackets
|
playOK, vnetClientIP := *srsPlayOKPackets, *srsVnetClientIP
|
||||||
vnetClientIP := *srsVnetClientIP
|
|
||||||
|
|
||||||
// Create top level test object.
|
|
||||||
api, err := NewTestWebRTCAPI()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer api.Close()
|
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int())
|
||||||
play := NewTestPlayer(api, func(play *TestPlayer) {
|
|
||||||
|
// Initialize player with private api.
|
||||||
|
if play, err := NewTestPlayer(nil, func(play *TestPlayer) error {
|
||||||
play.streamSuffix = streamSuffix
|
play.streamSuffix = streamSuffix
|
||||||
})
|
resources = append(resources, play)
|
||||||
defer play.Close()
|
|
||||||
|
|
||||||
pub := NewTestPublisher(api, func(pub *TestPublisher) {
|
api, err := NewTestWebRTCAPI()
|
||||||
pub.streamSuffix = streamSuffix
|
if err != nil {
|
||||||
pub.iceReadyCancel = publishReadyCancel
|
return err
|
||||||
})
|
}
|
||||||
defer pub.Close()
|
resources = append(resources, api)
|
||||||
|
play.api = api
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
var nnPlayWriteRTCP, nnPlayReadRTCP, nnPlayWriteRTP, nnPlayReadRTP uint64
|
||||||
var nnWriteRTP, nnReadRTP, nnWriteRTCP, nnReadRTCP int64
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
nn, attr, err := i.nextRTPReader.Read(buf, attributes)
|
if nnPlayReadRTP++; nnPlayReadRTP >= uint64(playOK) {
|
||||||
nnReadRTP++
|
cancel() // Completed.
|
||||||
return nn, attr, err
|
}
|
||||||
}
|
logger.Tf(ctx, "Play rtp=(recv:%v, send:%v), rtcp=(recv:%v send:%v) packets",
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
nnPlayReadRTP, nnPlayWriteRTP, nnPlayReadRTCP, nnPlayWriteRTCP)
|
||||||
nn, err := i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTPReader.Read(payload, attributes)
|
||||||
|
|
||||||
nnWriteRTP++
|
|
||||||
logger.Tf(ctx, "publish rtp=(read:%v write:%v), rtcp=(read:%v write:%v) packets",
|
|
||||||
nnReadRTP, nnWriteRTP, nnReadRTCP, nnWriteRTCP)
|
|
||||||
return nn, err
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
|
||||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
|
||||||
nn, attr, err := i.nextRTCPReader.Read(buf, attributes)
|
|
||||||
nnReadRTCP++
|
|
||||||
return nn, attr, err
|
|
||||||
}
|
|
||||||
i.rtcpWriter = func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
|
|
||||||
nn, err := i.nextRTCPWriter.Write(pkts, attributes)
|
|
||||||
nnWriteRTCP++
|
|
||||||
return nn, err
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
}, func(api *TestWebRTCAPI) {
|
|
||||||
var nn uint64
|
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
|
||||||
i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
|
||||||
if nn++; nn >= uint64(playOK) {
|
|
||||||
cancel() // Completed.
|
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "play got %v packets", nn)
|
}))
|
||||||
return i.nextRTPReader.Read(payload, attributes)
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
}
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
}))
|
nn, attr, err := i.nextRTCPReader.Read(buf, attributes)
|
||||||
|
nnPlayReadRTCP++
|
||||||
|
return nn, attr, err
|
||||||
|
}
|
||||||
|
i.rtcpWriter = func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
|
||||||
|
nn, err := i.nextRTCPWriter.Write(pkts, attributes)
|
||||||
|
nnPlayWriteRTCP++
|
||||||
|
return nn, err
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return err
|
return err
|
||||||
|
} else {
|
||||||
|
thePlayer = play
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the available objects.
|
// Initialize publisher with private api.
|
||||||
|
if pub, err := NewTestPublisher(nil, func(pub *TestPublisher) error {
|
||||||
|
pub.streamSuffix = streamSuffix
|
||||||
|
pub.iceReadyCancel = publishReadyCancel
|
||||||
|
resources = append(resources, pub)
|
||||||
|
|
||||||
|
api, err := NewTestWebRTCAPI()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resources = append(resources, api)
|
||||||
|
pub.api = api
|
||||||
|
|
||||||
|
var nnPubWriteRTCP, nnPubReadRTCP, nnPubWriteRTP, nnPubReadRTP uint64
|
||||||
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
|
i.rtpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
nn, attr, err := i.nextRTPReader.Read(buf, attributes)
|
||||||
|
nnPubReadRTP++
|
||||||
|
return nn, attr, err
|
||||||
|
}
|
||||||
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
|
nn, err := i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
nnPubWriteRTP++
|
||||||
|
logger.Tf(ctx, "Publish rtp=(recv:%v, send:%v), rtcp=(recv:%v send:%v) packets",
|
||||||
|
nnPubReadRTP, nnPubWriteRTP, nnPubReadRTCP, nnPubWriteRTCP)
|
||||||
|
return nn, err
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
nn, attr, err := i.nextRTCPReader.Read(buf, attributes)
|
||||||
|
nnPubReadRTCP++
|
||||||
|
return nn, attr, err
|
||||||
|
}
|
||||||
|
i.rtcpWriter = func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
|
||||||
|
nn, err := i.nextRTCPWriter.Write(pkts, attributes)
|
||||||
|
nnPubWriteRTCP++
|
||||||
|
return nn, err
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
thePublisher = pub
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init done.
|
||||||
mainReadyCancel()
|
mainReadyCancel()
|
||||||
thePublisher = pub
|
|
||||||
thePlayer = play
|
|
||||||
|
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
return nil
|
return nil
|
||||||
|
@ -158,17 +220,10 @@ func TestRtcBasic_PublishPlay(t *testing.T) {
|
||||||
case <-mainReady.Done():
|
case <-mainReady.Done():
|
||||||
}
|
}
|
||||||
|
|
||||||
doPublish := func() error {
|
if err := thePublisher.Run(logger.WithContext(ctx), cancel); err != nil {
|
||||||
if err := thePublisher.Run(logger.WithContext(ctx), cancel); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Tf(ctx, "pub done")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err := doPublish(); err != nil {
|
|
||||||
r2 = err
|
r2 = err
|
||||||
}
|
}
|
||||||
|
logger.Tf(ctx, "pub done")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Run player.
|
// Run player.
|
||||||
|
@ -177,30 +232,16 @@ func TestRtcBasic_PublishPlay(t *testing.T) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-mainReady.Done():
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-publishReady.Done():
|
case <-publishReady.Done():
|
||||||
}
|
}
|
||||||
|
|
||||||
doPlay := func() error {
|
if err := thePlayer.Run(logger.WithContext(ctx), cancel); err != nil {
|
||||||
if err := thePlayer.Run(logger.WithContext(ctx), cancel); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Tf(ctx, "play done")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err := doPlay(); err != nil {
|
|
||||||
r3 = err
|
r3 = err
|
||||||
}
|
}
|
||||||
|
logger.Tf(ctx, "play done")
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -222,21 +263,31 @@ func TestRtcDTLS_ClientActive_Default(t *testing.T) {
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupActive
|
p.onOffer = testUtilSetupActive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
var nn int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
if nn++; nn >= int64(publishOK) {
|
nnRTP++
|
||||||
|
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) {
|
||||||
cancel() // Send enough packets, done.
|
cancel() // Send enough packets, done.
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "publish write %v packets", nn)
|
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTCPReader.Read(buf, attributes)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}, func(api *TestWebRTCAPI) {
|
}, func(api *TestWebRTCAPI) {
|
||||||
|
@ -276,21 +327,31 @@ func TestRtcDTLS_ClientPassive_Default(t *testing.T) {
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupPassive
|
p.onOffer = testUtilSetupPassive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
var nn int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
if nn++; nn >= int64(publishOK) {
|
nnRTP++
|
||||||
|
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) {
|
||||||
cancel() // Send enough packets, done.
|
cancel() // Send enough packets, done.
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "publish write %v packets", nn)
|
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTCPReader.Read(buf, attributes)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}, func(api *TestWebRTCAPI) {
|
}, func(api *TestWebRTCAPI) {
|
||||||
|
@ -327,21 +388,31 @@ func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) {
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupActive
|
p.onOffer = testUtilSetupActive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
var nn int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
if nn++; nn >= int64(publishOK) {
|
nnRTP++
|
||||||
|
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) {
|
||||||
cancel() // Send enough packets, done.
|
cancel() // Send enough packets, done.
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "publish write %v packets", nn)
|
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTCPReader.Read(buf, attributes)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}, func(api *TestWebRTCAPI) {
|
}, func(api *TestWebRTCAPI) {
|
||||||
|
@ -385,21 +456,31 @@ func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) {
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupPassive
|
p.onOffer = testUtilSetupPassive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
var nn int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
if nn++; nn >= int64(publishOK) {
|
nnRTP++
|
||||||
|
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) {
|
||||||
cancel() // Send enough packets, done.
|
cancel() // Send enough packets, done.
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "publish write %v packets", nn)
|
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTCPReader.Read(buf, attributes)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}, func(api *TestWebRTCAPI) {
|
}, func(api *TestWebRTCAPI) {
|
||||||
|
@ -450,21 +531,31 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupActive
|
p.onOffer = testUtilSetupActive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
var nn int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
if nn++; nn >= int64(publishOK) {
|
nnRTP++
|
||||||
|
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) {
|
||||||
cancel() // Send enough packets, done.
|
cancel() // Send enough packets, done.
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "publish write %v packets", nn)
|
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTCPReader.Read(buf, attributes)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}, func(api *TestWebRTCAPI) {
|
}, func(api *TestWebRTCAPI) {
|
||||||
|
@ -527,21 +618,31 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupPassive
|
p.onOffer = testUtilSetupPassive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
var nn int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
if nn++; nn >= int64(publishOK) {
|
nnRTP++
|
||||||
|
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) {
|
||||||
cancel() // Send enough packets, done.
|
cancel() // Send enough packets, done.
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "publish write %v packets", nn)
|
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTCPReader.Read(buf, attributes)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}, func(api *TestWebRTCAPI) {
|
}, func(api *TestWebRTCAPI) {
|
||||||
|
@ -603,21 +704,31 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupActive
|
p.onOffer = testUtilSetupActive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
var nn int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
if nn++; nn >= int64(publishOK) {
|
nnRTP++
|
||||||
|
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) {
|
||||||
cancel() // Send enough packets, done.
|
cancel() // Send enough packets, done.
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "publish write %v packets", nn)
|
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTCPReader.Read(buf, attributes)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}, func(api *TestWebRTCAPI) {
|
}, func(api *TestWebRTCAPI) {
|
||||||
|
@ -690,21 +801,31 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupPassive
|
p.onOffer = testUtilSetupPassive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
var nn int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
if nn++; nn >= int64(publishOK) {
|
nnRTP++
|
||||||
|
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) {
|
||||||
cancel() // Send enough packets, done.
|
cancel() // Send enough packets, done.
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "publish write %v packets", nn)
|
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTCPReader.Read(buf, attributes)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}, func(api *TestWebRTCAPI) {
|
}, func(api *TestWebRTCAPI) {
|
||||||
|
@ -774,21 +895,31 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupActive
|
p.onOffer = testUtilSetupActive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
var nn int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
if nn++; nn >= int64(publishOK) {
|
nnRTP++
|
||||||
|
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) {
|
||||||
cancel() // Send enough packets, done.
|
cancel() // Send enough packets, done.
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "publish write %v packets", nn)
|
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTCPReader.Read(buf, attributes)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}, func(api *TestWebRTCAPI) {
|
}, func(api *TestWebRTCAPI) {
|
||||||
|
@ -850,21 +981,31 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupPassive
|
p.onOffer = testUtilSetupPassive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
var nn int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
if nn++; nn >= int64(publishOK) {
|
nnRTP++
|
||||||
|
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) {
|
||||||
cancel() // Send enough packets, done.
|
cancel() // Send enough packets, done.
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "publish write %v packets", nn)
|
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTCPReader.Read(buf, attributes)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}, func(api *TestWebRTCAPI) {
|
}, func(api *TestWebRTCAPI) {
|
||||||
|
@ -926,21 +1067,31 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupActive
|
p.onOffer = testUtilSetupActive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
var nn int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
if nn++; nn >= int64(publishOK) {
|
nnRTP++
|
||||||
|
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) {
|
||||||
cancel() // Send enough packets, done.
|
cancel() // Send enough packets, done.
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "publish write %v packets", nn)
|
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTCPReader.Read(buf, attributes)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}, func(api *TestWebRTCAPI) {
|
}, func(api *TestWebRTCAPI) {
|
||||||
|
@ -1011,21 +1162,31 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupPassive
|
p.onOffer = testUtilSetupPassive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
var nn int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
if nn++; nn >= int64(publishOK) {
|
nnRTP++
|
||||||
|
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) {
|
||||||
cancel() // Send enough packets, done.
|
cancel() // Send enough packets, done.
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "publish write %v packets", nn)
|
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTCPReader.Read(buf, attributes)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}, func(api *TestWebRTCAPI) {
|
}, func(api *TestWebRTCAPI) {
|
||||||
|
@ -1087,10 +1248,14 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) {
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupPassive
|
p.onOffer = testUtilSetupPassive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
|
@ -1145,10 +1310,14 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) {
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupPassive
|
p.onOffer = testUtilSetupPassive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
|
@ -1203,10 +1372,14 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) {
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupPassive
|
p.onOffer = testUtilSetupPassive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
|
@ -1261,10 +1434,14 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) {
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupPassive
|
p.onOffer = testUtilSetupPassive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
|
@ -1320,21 +1497,31 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) {
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupPassive
|
p.onOffer = testUtilSetupPassive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
var nn int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
if nn++; nn >= int64(publishOK) {
|
nnRTP++
|
||||||
|
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) {
|
||||||
cancel() // Send enough packets, done.
|
cancel() // Send enough packets, done.
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "publish write %v packets", nn)
|
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTCPReader.Read(buf, attributes)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}, func(api *TestWebRTCAPI) {
|
}, func(api *TestWebRTCAPI) {
|
||||||
|
@ -1397,21 +1584,31 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) {
|
||||||
defer api.Close()
|
defer api.Close()
|
||||||
|
|
||||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||||
p := NewTestPublisher(api, func(p *TestPublisher) {
|
p, err := NewTestPublisher(api, func(p *TestPublisher) error {
|
||||||
p.streamSuffix = streamSuffix
|
p.streamSuffix = streamSuffix
|
||||||
p.onOffer = testUtilSetupPassive
|
p.onOffer = testUtilSetupPassive
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer p.Close()
|
defer p.Close()
|
||||||
|
|
||||||
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
if err := api.Setup(vnetClientIP, func(api *TestWebRTCAPI) {
|
||||||
var nn int64
|
var nnRTCP, nnRTP int64
|
||||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||||
if nn++; nn >= int64(publishOK) {
|
nnRTP++
|
||||||
|
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||||
|
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
if nnRTCP++; nnRTCP >= int64(publishOK) && nnRTP >= int64(publishOK) {
|
||||||
cancel() // Send enough packets, done.
|
cancel() // Send enough packets, done.
|
||||||
}
|
}
|
||||||
logger.Tf(ctx, "publish write %v packets", nn)
|
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
return i.nextRTCPReader.Read(buf, attributes)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}, func(api *TestWebRTCAPI) {
|
}, func(api *TestWebRTCAPI) {
|
||||||
|
|
5
trunk/3rdparty/srs-bench/srs/stat.go
vendored
5
trunk/3rdparty/srs-bench/srs/stat.go
vendored
|
@ -1,6 +1,6 @@
|
||||||
// The MIT License (MIT)
|
// The MIT License (MIT)
|
||||||
//
|
//
|
||||||
// Copyright (c) 2021 srs-bench(ossrs)
|
// Copyright (c) 2021 Winlin
|
||||||
//
|
//
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
// this software and associated documentation files (the "Software"), to deal in
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
@ -23,9 +23,10 @@ package srs
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"github.com/ossrs/go-oryx-lib/logger"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/ossrs/go-oryx-lib/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
type statRTC struct {
|
type statRTC struct {
|
||||||
|
|
715
trunk/3rdparty/srs-bench/srs/util.go
vendored
715
trunk/3rdparty/srs-bench/srs/util.go
vendored
|
@ -1,6 +1,6 @@
|
||||||
// The MIT License (MIT)
|
// The MIT License (MIT)
|
||||||
//
|
//
|
||||||
// Copyright (c) 2021 srs-bench(ossrs)
|
// Copyright (c) 2021 Winlin
|
||||||
//
|
//
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
// this software and associated documentation files (the "Software"), to deal in
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
@ -24,21 +24,113 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/ossrs/go-oryx-lib/errors"
|
"io"
|
||||||
"github.com/ossrs/go-oryx-lib/logger"
|
|
||||||
"github.com/pion/transport/vnet"
|
|
||||||
"github.com/pion/webrtc/v3"
|
|
||||||
"github.com/pion/webrtc/v3/pkg/media/h264reader"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/ossrs/go-oryx-lib/errors"
|
||||||
|
"github.com/ossrs/go-oryx-lib/logger"
|
||||||
|
vnet_proxy "github.com/ossrs/srs-bench/vnet"
|
||||||
|
"github.com/pion/interceptor"
|
||||||
|
"github.com/pion/logging"
|
||||||
|
"github.com/pion/rtcp"
|
||||||
|
"github.com/pion/transport/vnet"
|
||||||
|
"github.com/pion/webrtc/v3"
|
||||||
|
"github.com/pion/webrtc/v3/pkg/media/h264reader"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var srsHttps *bool
|
||||||
|
var srsLog *bool
|
||||||
|
|
||||||
|
var srsTimeout *int
|
||||||
|
var srsPlayPLI *int
|
||||||
|
var srsPlayOKPackets *int
|
||||||
|
var srsPublishOKPackets *int
|
||||||
|
var srsPublishVideoFps *int
|
||||||
|
var srsDTLSDropPackets *int
|
||||||
|
|
||||||
|
var srsSchema string
|
||||||
|
var srsServer *string
|
||||||
|
var srsStream *string
|
||||||
|
var srsPublishAudio *string
|
||||||
|
var srsPublishVideo *string
|
||||||
|
var srsVnetClientIP *string
|
||||||
|
|
||||||
|
func prepareTest() error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
srsHttps = flag.Bool("srs-https", false, "Whther connect to HTTPS-API")
|
||||||
|
srsServer = flag.String("srs-server", "127.0.0.1", "The RTC server to connect to")
|
||||||
|
srsStream = flag.String("srs-stream", "/rtc/regression", "The RTC stream to play")
|
||||||
|
srsLog = flag.Bool("srs-log", false, "Whether enable the detail log")
|
||||||
|
srsTimeout = flag.Int("srs-timeout", 5000, "For each case, the timeout in ms")
|
||||||
|
srsPlayPLI = flag.Int("srs-play-pli", 5000, "The PLI interval in seconds for player.")
|
||||||
|
srsPlayOKPackets = flag.Int("srs-play-ok-packets", 10, "If recv N RTP packets, it's ok, or fail")
|
||||||
|
srsPublishOKPackets = flag.Int("srs-publish-ok-packets", 3, "If send N RTP, recv N RTCP packets, it's ok, or fail")
|
||||||
|
srsPublishAudio = flag.String("srs-publish-audio", "avatar.ogg", "The audio file for publisher.")
|
||||||
|
srsPublishVideo = flag.String("srs-publish-video", "avatar.h264", "The video file for publisher.")
|
||||||
|
srsPublishVideoFps = flag.Int("srs-publish-video-fps", 25, "The video fps for publisher.")
|
||||||
|
srsVnetClientIP = flag.String("srs-vnet-client-ip", "192.168.168.168", "The client ip in pion/vnet.")
|
||||||
|
srsDTLSDropPackets = flag.Int("srs-dtls-drop-packets", 5, "If dropped N packets, it's ok, or fail")
|
||||||
|
|
||||||
|
// Should parse it first.
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
// The stream should starts with /, for example, /rtc/regression
|
||||||
|
if !strings.HasPrefix(*srsStream, "/") {
|
||||||
|
*srsStream = "/" + *srsStream
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate srs protocol from whether use HTTPS.
|
||||||
|
srsSchema = "http"
|
||||||
|
if *srsHttps {
|
||||||
|
srsSchema = "https"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check file.
|
||||||
|
tryOpenFile := func(filename string) (string, error) {
|
||||||
|
if filename == "" {
|
||||||
|
return filename, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := os.Open(filename)
|
||||||
|
if err != nil {
|
||||||
|
nfilename := path.Join("../", filename)
|
||||||
|
f2, err := os.Open(nfilename)
|
||||||
|
if err != nil {
|
||||||
|
return filename, errors.Wrapf(err, "No video file at %v or %v", filename, nfilename)
|
||||||
|
}
|
||||||
|
defer f2.Close()
|
||||||
|
|
||||||
|
return nfilename, nil
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
return filename, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if *srsPublishVideo, err = tryOpenFile(*srsPublishVideo); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if *srsPublishAudio, err = tryOpenFile(*srsPublishAudio); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func apiRtcRequest(ctx context.Context, apiPath, r, offer string) (string, error) {
|
func apiRtcRequest(ctx context.Context, apiPath, r, offer string) (string, error) {
|
||||||
u, err := url.Parse(r)
|
u, err := url.Parse(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -367,7 +459,11 @@ type ChunkMessageType struct {
|
||||||
|
|
||||||
func (v *ChunkMessageType) String() string {
|
func (v *ChunkMessageType) String() string {
|
||||||
if v.chunk == ChunkTypeDTLS {
|
if v.chunk == ChunkTypeDTLS {
|
||||||
return fmt.Sprintf("%v-%v-%v", v.chunk, v.content, v.handshake)
|
if v.content == DTLSContentTypeHandshake {
|
||||||
|
return fmt.Sprintf("%v-%v-%v", v.chunk, v.content, v.handshake)
|
||||||
|
} else {
|
||||||
|
return fmt.Sprintf("%v-%v", v.chunk, v.content)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return fmt.Sprintf("%v", v.chunk)
|
return fmt.Sprintf("%v", v.chunk)
|
||||||
}
|
}
|
||||||
|
@ -466,3 +562,608 @@ func (v *DTLSRecord) Unmarshal(b []byte) error {
|
||||||
v.Data = b[13:]
|
v.Data = b[13:]
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TestWebRTCAPIOptionFunc func(api *TestWebRTCAPI)
|
||||||
|
|
||||||
|
type TestWebRTCAPI struct {
|
||||||
|
// The options to setup the api.
|
||||||
|
options []TestWebRTCAPIOptionFunc
|
||||||
|
// The api and settings.
|
||||||
|
api *webrtc.API
|
||||||
|
mediaEngine *webrtc.MediaEngine
|
||||||
|
registry *interceptor.Registry
|
||||||
|
settingEngine *webrtc.SettingEngine
|
||||||
|
// The vnet router, can be shared by different apis, but we do not share it.
|
||||||
|
router *vnet.Router
|
||||||
|
// The network for api.
|
||||||
|
network *vnet.Net
|
||||||
|
// The vnet UDP proxy bind to the router.
|
||||||
|
proxy *vnet_proxy.UDPProxy
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTestWebRTCAPI(options ...TestWebRTCAPIOptionFunc) (*TestWebRTCAPI, error) {
|
||||||
|
v := &TestWebRTCAPI{}
|
||||||
|
|
||||||
|
v.mediaEngine = &webrtc.MediaEngine{}
|
||||||
|
if err := v.mediaEngine.RegisterDefaultCodecs(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
v.registry = &interceptor.Registry{}
|
||||||
|
if err := webrtc.RegisterDefaultInterceptors(v.mediaEngine, v.registry); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, setup := range options {
|
||||||
|
setup(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
v.settingEngine = &webrtc.SettingEngine{}
|
||||||
|
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *TestWebRTCAPI) Close() error {
|
||||||
|
if v.proxy != nil {
|
||||||
|
v.proxy.Close()
|
||||||
|
v.proxy = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.router != nil {
|
||||||
|
v.router.Stop()
|
||||||
|
v.router = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *TestWebRTCAPI) Setup(vnetClientIP string, options ...TestWebRTCAPIOptionFunc) error {
|
||||||
|
// Setting engine for https://github.com/pion/transport/tree/master/vnet
|
||||||
|
setupVnet := func(vnetClientIP string) (err error) {
|
||||||
|
// We create a private router for a api, however, it's possible to share the
|
||||||
|
// same router between apis.
|
||||||
|
if v.router, err = vnet.NewRouter(&vnet.RouterConfig{
|
||||||
|
CIDR: "0.0.0.0/0", // Accept all ip, no sub router.
|
||||||
|
LoggerFactory: logging.NewDefaultLoggerFactory(),
|
||||||
|
}); err != nil {
|
||||||
|
return errors.Wrapf(err, "create router for api")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Each api should bind to a network, however, it's possible to share it
|
||||||
|
// for different apis.
|
||||||
|
v.network = vnet.NewNet(&vnet.NetConfig{
|
||||||
|
StaticIP: vnetClientIP,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err = v.router.AddNet(v.network); err != nil {
|
||||||
|
return errors.Wrapf(err, "create network for api")
|
||||||
|
}
|
||||||
|
|
||||||
|
v.settingEngine.SetVNet(v.network)
|
||||||
|
|
||||||
|
// Create a proxy bind to the router.
|
||||||
|
if v.proxy, err = vnet_proxy.NewProxy(v.router); err != nil {
|
||||||
|
return errors.Wrapf(err, "create proxy for router")
|
||||||
|
}
|
||||||
|
|
||||||
|
return v.router.Start()
|
||||||
|
}
|
||||||
|
if err := setupVnet(vnetClientIP); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, setup := range options {
|
||||||
|
setup(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, setup := range v.options {
|
||||||
|
setup(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
v.api = webrtc.NewAPI(
|
||||||
|
webrtc.WithMediaEngine(v.mediaEngine),
|
||||||
|
webrtc.WithInterceptorRegistry(v.registry),
|
||||||
|
webrtc.WithSettingEngine(*v.settingEngine),
|
||||||
|
)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *TestWebRTCAPI) NewPeerConnection(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) {
|
||||||
|
return v.api.NewPeerConnection(configuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
type TestPlayerOptionFunc func(p *TestPlayer) error
|
||||||
|
|
||||||
|
type TestPlayer struct {
|
||||||
|
pc *webrtc.PeerConnection
|
||||||
|
receivers []*webrtc.RTPReceiver
|
||||||
|
// root api object
|
||||||
|
api *TestWebRTCAPI
|
||||||
|
// Optional suffix for stream url.
|
||||||
|
streamSuffix string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTestPlayer(api *TestWebRTCAPI, options ...TestPlayerOptionFunc) (*TestPlayer, error) {
|
||||||
|
v := &TestPlayer{api: api}
|
||||||
|
|
||||||
|
for _, opt := range options {
|
||||||
|
if err := opt(v); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The api might be override by options.
|
||||||
|
api = v.api
|
||||||
|
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *TestPlayer) Close() error {
|
||||||
|
if v.pc != nil {
|
||||||
|
v.pc.Close()
|
||||||
|
v.pc = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, receiver := range v.receivers {
|
||||||
|
receiver.Stop()
|
||||||
|
}
|
||||||
|
v.receivers = nil
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *TestPlayer) Run(ctx context.Context, cancel context.CancelFunc) error {
|
||||||
|
r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream)
|
||||||
|
if v.streamSuffix != "" {
|
||||||
|
r = fmt.Sprintf("%v-%v", r, v.streamSuffix)
|
||||||
|
}
|
||||||
|
pli := time.Duration(*srsPlayPLI) * time.Millisecond
|
||||||
|
logger.Tf(ctx, "Start play url=%v", r)
|
||||||
|
|
||||||
|
pc, err := v.api.NewPeerConnection(webrtc.Configuration{})
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "Create PC")
|
||||||
|
}
|
||||||
|
v.pc = pc
|
||||||
|
|
||||||
|
pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{
|
||||||
|
Direction: webrtc.RTPTransceiverDirectionRecvonly,
|
||||||
|
})
|
||||||
|
pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{
|
||||||
|
Direction: webrtc.RTPTransceiverDirectionRecvonly,
|
||||||
|
})
|
||||||
|
|
||||||
|
offer, err := pc.CreateOffer(nil)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "Create Offer")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pc.SetLocalDescription(offer); err != nil {
|
||||||
|
return errors.Wrapf(err, "Set offer %v", offer)
|
||||||
|
}
|
||||||
|
|
||||||
|
answer, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a proxy for real server and vnet.
|
||||||
|
if address, err := parseAddressOfCandidate(answer); err != nil {
|
||||||
|
return errors.Wrapf(err, "parse address of %v", answer)
|
||||||
|
} else if err := v.api.proxy.Proxy(v.api.network, address); err != nil {
|
||||||
|
return errors.Wrapf(err, "proxy %v to %v", v.api.network, address)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pc.SetRemoteDescription(webrtc.SessionDescription{
|
||||||
|
Type: webrtc.SDPTypeAnswer, SDP: answer,
|
||||||
|
}); err != nil {
|
||||||
|
return errors.Wrapf(err, "Set answer %v", answer)
|
||||||
|
}
|
||||||
|
|
||||||
|
handleTrack := func(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error {
|
||||||
|
// Send a PLI on an interval so that the publisher is pushing a keyframe
|
||||||
|
go func() {
|
||||||
|
if track.Kind() == webrtc.RTPCodecTypeAudio {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(pli):
|
||||||
|
_ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
|
||||||
|
MediaSSRC: uint32(track.SSRC()),
|
||||||
|
}})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
v.receivers = append(v.receivers, receiver)
|
||||||
|
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
_, _, err := track.ReadRTP()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "Read RTP")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
||||||
|
err = handleTrack(ctx, track, receiver)
|
||||||
|
if err != nil {
|
||||||
|
codec := track.Codec()
|
||||||
|
err = errors.Wrapf(err, "Handle track %v, pt=%v", codec.MimeType, codec.PayloadType)
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
||||||
|
if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed {
|
||||||
|
err = errors.Errorf("Close for ICE state %v", state)
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
type TestPublisherOptionFunc func(p *TestPublisher) error
|
||||||
|
|
||||||
|
type TestPublisher struct {
|
||||||
|
onOffer func(s *webrtc.SessionDescription) error
|
||||||
|
onAnswer func(s *webrtc.SessionDescription) error
|
||||||
|
iceReadyCancel context.CancelFunc
|
||||||
|
// internal objects
|
||||||
|
aIngester *audioIngester
|
||||||
|
vIngester *videoIngester
|
||||||
|
pc *webrtc.PeerConnection
|
||||||
|
// root api object
|
||||||
|
api *TestWebRTCAPI
|
||||||
|
// Optional suffix for stream url.
|
||||||
|
streamSuffix string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) (*TestPublisher, error) {
|
||||||
|
sourceVideo, sourceAudio := *srsPublishVideo, *srsPublishAudio
|
||||||
|
|
||||||
|
v := &TestPublisher{api: api}
|
||||||
|
|
||||||
|
for _, opt := range options {
|
||||||
|
if err := opt(v); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The api might be override by options.
|
||||||
|
api = v.api
|
||||||
|
|
||||||
|
// Create ingesters.
|
||||||
|
if sourceAudio != "" {
|
||||||
|
v.aIngester = NewAudioIngester(sourceAudio)
|
||||||
|
}
|
||||||
|
if sourceVideo != "" {
|
||||||
|
v.vIngester = NewVideoIngester(sourceVideo)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup the interceptors for packets.
|
||||||
|
api.options = append(api.options, func(api *TestWebRTCAPI) {
|
||||||
|
// Filter for RTCP packets.
|
||||||
|
rtcpInterceptor := &RTCPInterceptor{}
|
||||||
|
rtcpInterceptor.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||||
|
return rtcpInterceptor.nextRTCPReader.Read(buf, attributes)
|
||||||
|
}
|
||||||
|
rtcpInterceptor.rtcpWriter = func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
|
||||||
|
return rtcpInterceptor.nextRTCPWriter.Write(pkts, attributes)
|
||||||
|
}
|
||||||
|
api.registry.Add(rtcpInterceptor)
|
||||||
|
|
||||||
|
// Filter for ingesters.
|
||||||
|
if sourceAudio != "" {
|
||||||
|
api.registry.Add(v.aIngester.audioLevelInterceptor)
|
||||||
|
}
|
||||||
|
if sourceVideo != "" {
|
||||||
|
api.registry.Add(v.vIngester.markerInterceptor)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *TestPublisher) Close() error {
|
||||||
|
if v.vIngester != nil {
|
||||||
|
v.vIngester.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.aIngester != nil {
|
||||||
|
v.aIngester.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.pc != nil {
|
||||||
|
v.pc.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *TestPublisher) SetStreamSuffix(suffix string) *TestPublisher {
|
||||||
|
v.streamSuffix = suffix
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) error {
|
||||||
|
r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream)
|
||||||
|
if v.streamSuffix != "" {
|
||||||
|
r = fmt.Sprintf("%v-%v", r, v.streamSuffix)
|
||||||
|
}
|
||||||
|
sourceVideo, sourceAudio, fps := *srsPublishVideo, *srsPublishAudio, *srsPublishVideoFps
|
||||||
|
|
||||||
|
logger.Tf(ctx, "Start publish url=%v, audio=%v, video=%v, fps=%v",
|
||||||
|
r, sourceAudio, sourceVideo, fps)
|
||||||
|
|
||||||
|
pc, err := v.api.NewPeerConnection(webrtc.Configuration{})
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "Create PC")
|
||||||
|
}
|
||||||
|
v.pc = pc
|
||||||
|
|
||||||
|
if v.vIngester != nil {
|
||||||
|
if err := v.vIngester.AddTrack(pc, fps); err != nil {
|
||||||
|
return errors.Wrapf(err, "Add track")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.aIngester != nil {
|
||||||
|
if err := v.aIngester.AddTrack(pc); err != nil {
|
||||||
|
return errors.Wrapf(err, "Add track")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
offer, err := pc.CreateOffer(nil)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "Create Offer")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pc.SetLocalDescription(offer); err != nil {
|
||||||
|
return errors.Wrapf(err, "Set offer %v", offer)
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.onOffer != nil {
|
||||||
|
if err := v.onOffer(&offer); err != nil {
|
||||||
|
return errors.Wrapf(err, "sdp %v %v", offer.Type, offer.SDP)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
answerSDP, err := apiRtcRequest(ctx, "/rtc/v1/publish", r, offer.SDP)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a proxy for real server and vnet.
|
||||||
|
if address, err := parseAddressOfCandidate(answerSDP); err != nil {
|
||||||
|
return errors.Wrapf(err, "parse address of %v", answerSDP)
|
||||||
|
} else if err := v.api.proxy.Proxy(v.api.network, address); err != nil {
|
||||||
|
return errors.Wrapf(err, "proxy %v to %v", v.api.network, address)
|
||||||
|
}
|
||||||
|
|
||||||
|
answer := &webrtc.SessionDescription{
|
||||||
|
Type: webrtc.SDPTypeAnswer, SDP: answerSDP,
|
||||||
|
}
|
||||||
|
if v.onAnswer != nil {
|
||||||
|
if err := v.onAnswer(answer); err != nil {
|
||||||
|
return errors.Wrapf(err, "on answerSDP")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pc.SetRemoteDescription(*answer); err != nil {
|
||||||
|
return errors.Wrapf(err, "Set answerSDP %v", answerSDP)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Tf(ctx, "State signaling=%v, ice=%v, conn=%v", pc.SignalingState(), pc.ICEConnectionState(), pc.ConnectionState())
|
||||||
|
|
||||||
|
// ICE state management.
|
||||||
|
pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) {
|
||||||
|
logger.Tf(ctx, "ICE gather state %v", state)
|
||||||
|
})
|
||||||
|
pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
|
||||||
|
logger.Tf(ctx, "ICE candidate %v %v:%v", candidate.Protocol, candidate.Address, candidate.Port)
|
||||||
|
|
||||||
|
})
|
||||||
|
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
||||||
|
logger.Tf(ctx, "ICE state %v", state)
|
||||||
|
})
|
||||||
|
|
||||||
|
pc.OnSignalingStateChange(func(state webrtc.SignalingState) {
|
||||||
|
logger.Tf(ctx, "Signaling state %v", state)
|
||||||
|
})
|
||||||
|
|
||||||
|
if v.aIngester != nil {
|
||||||
|
v.aIngester.sAudioSender.Transport().OnStateChange(func(state webrtc.DTLSTransportState) {
|
||||||
|
logger.Tf(ctx, "DTLS state %v", state)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pcDone, pcDoneCancel := context.WithCancel(context.Background())
|
||||||
|
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
||||||
|
logger.Tf(ctx, "PC state %v", state)
|
||||||
|
|
||||||
|
if state == webrtc.PeerConnectionStateConnected {
|
||||||
|
pcDoneCancel()
|
||||||
|
if v.iceReadyCancel != nil {
|
||||||
|
v.iceReadyCancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed {
|
||||||
|
err = errors.Errorf("Close for PC state %v", state)
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Wait for event from context or tracks.
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var finalErr error
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
defer logger.Tf(ctx, "ingest notify done")
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
|
||||||
|
if v.aIngester != nil && v.aIngester.sAudioSender != nil {
|
||||||
|
v.aIngester.sAudioSender.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.vIngester != nil && v.vIngester.sVideoSender != nil {
|
||||||
|
v.vIngester.sVideoSender.Stop()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if v.aIngester == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-pcDone.Done():
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
defer logger.Tf(ctx, "aingester sender read done")
|
||||||
|
|
||||||
|
buf := make([]byte, 1500)
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
if _, _, err := v.aIngester.sAudioSender.Read(buf); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
if err := v.aIngester.Ingest(ctx); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
logger.Tf(ctx, "aingester retry for %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err != context.Canceled {
|
||||||
|
finalErr = errors.Wrapf(err, "audio")
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Tf(ctx, "aingester err=%v, final=%v", err, finalErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if v.vIngester == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-pcDone.Done():
|
||||||
|
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest video %v", sourceVideo)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
defer logger.Tf(ctx, "vingester sender read done")
|
||||||
|
|
||||||
|
buf := make([]byte, 1500)
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
// The Read() might block in r.rtcpInterceptor.Read(b, a),
|
||||||
|
// so that the Stop() can not stop it.
|
||||||
|
if _, _, err := v.vIngester.sVideoSender.Read(buf); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
if err := v.vIngester.Ingest(ctx); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
logger.Tf(ctx, "vingester retry for %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err != context.Canceled {
|
||||||
|
finalErr = errors.Wrapf(err, "video")
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Tf(ctx, "vingester err=%v, final=%v", err, finalErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
logger.Tf(ctx, "ingester done ctx=%v, final=%v", ctx.Err(), finalErr)
|
||||||
|
if finalErr != nil {
|
||||||
|
return finalErr
|
||||||
|
}
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRTCServerVersion(t *testing.T) {
|
||||||
|
api := fmt.Sprintf("http://%v:1985/api/v1/versions", *srsServer)
|
||||||
|
req, err := http.NewRequest("POST", api, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Request %v", api)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Do request %v", api)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := ioutil.ReadAll(res.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Read body of %v", api)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
obj := struct {
|
||||||
|
Code int `json:"code"`
|
||||||
|
Server string `json:"server"`
|
||||||
|
Data struct {
|
||||||
|
Major int `json:"major"`
|
||||||
|
Minor int `json:"minor"`
|
||||||
|
Revision int `json:"revision"`
|
||||||
|
Version string `json:"version"`
|
||||||
|
} `json:"data"`
|
||||||
|
}{}
|
||||||
|
if err := json.Unmarshal(b, &obj); err != nil {
|
||||||
|
t.Errorf("Parse %v", string(b))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if obj.Code != 0 {
|
||||||
|
t.Errorf("Server err code=%v, server=%v", obj.Code, obj.Server)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if obj.Data.Major == 0 && obj.Data.Minor == 0 {
|
||||||
|
t.Errorf("Invalid version %v", obj.Data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
723
trunk/3rdparty/srs-bench/srs/util_test.go
vendored
723
trunk/3rdparty/srs-bench/srs/util_test.go
vendored
|
@ -1,723 +0,0 @@
|
||||||
// The MIT License (MIT)
|
|
||||||
//
|
|
||||||
// Copyright (c) 2021 srs-bench(ossrs)
|
|
||||||
//
|
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
|
||||||
// this software and associated documentation files (the "Software"), to deal in
|
|
||||||
// the Software without restriction, including without limitation the rights to
|
|
||||||
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
|
||||||
// the Software, and to permit persons to whom the Software is furnished to do so,
|
|
||||||
// subject to the following conditions:
|
|
||||||
//
|
|
||||||
// The above copyright notice and this permission notice shall be included in all
|
|
||||||
// copies or substantial portions of the Software.
|
|
||||||
//
|
|
||||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
|
||||||
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
|
||||||
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
|
||||||
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
|
||||||
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
||||||
package srs
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"github.com/ossrs/go-oryx-lib/errors"
|
|
||||||
"github.com/ossrs/go-oryx-lib/logger"
|
|
||||||
vnet_proxy "github.com/ossrs/srs-bench/vnet"
|
|
||||||
"github.com/pion/interceptor"
|
|
||||||
"github.com/pion/logging"
|
|
||||||
"github.com/pion/rtcp"
|
|
||||||
"github.com/pion/transport/vnet"
|
|
||||||
"github.com/pion/webrtc/v3"
|
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
var srsSchema = "http"
|
|
||||||
var srsHttps = flag.Bool("srs-https", false, "Whther connect to HTTPS-API")
|
|
||||||
var srsServer = flag.String("srs-server", "127.0.0.1", "The RTC server to connect to")
|
|
||||||
var srsStream = flag.String("srs-stream", "/rtc/regression", "The RTC stream to play")
|
|
||||||
var srsLog = flag.Bool("srs-log", false, "Whether enable the detail log")
|
|
||||||
var srsTimeout = flag.Int("srs-timeout", 5000, "For each case, the timeout in ms")
|
|
||||||
var srsPlayPLI = flag.Int("srs-play-pli", 5000, "The PLI interval in seconds for player.")
|
|
||||||
var srsPlayOKPackets = flag.Int("srs-play-ok-packets", 10, "If got N packets, it's ok, or fail")
|
|
||||||
var srsPublishOKPackets = flag.Int("srs-publish-ok-packets", 10, "If send N packets, it's ok, or fail")
|
|
||||||
var srsPublishAudio = flag.String("srs-publish-audio", "avatar.ogg", "The audio file for publisher.")
|
|
||||||
var srsPublishVideo = flag.String("srs-publish-video", "avatar.h264", "The video file for publisher.")
|
|
||||||
var srsPublishVideoFps = flag.Int("srs-publish-video-fps", 25, "The video fps for publisher.")
|
|
||||||
var srsVnetClientIP = flag.String("srs-vnet-client-ip", "192.168.168.168", "The client ip in pion/vnet.")
|
|
||||||
var srsDTLSDropPackets = flag.Int("srs-dtls-drop-packets", 5, "If dropped N packets, it's ok, or fail")
|
|
||||||
|
|
||||||
func prepareTest() error {
|
|
||||||
var err error
|
|
||||||
|
|
||||||
// Should parse it first.
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
// The stream should starts with /, for example, /rtc/regression
|
|
||||||
if !strings.HasPrefix(*srsStream, "/") {
|
|
||||||
*srsStream = "/" + *srsStream
|
|
||||||
}
|
|
||||||
|
|
||||||
// Generate srs protocol from whether use HTTPS.
|
|
||||||
if *srsHttps {
|
|
||||||
srsSchema = "https"
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check file.
|
|
||||||
tryOpenFile := func(filename string) (string, error) {
|
|
||||||
if filename == "" {
|
|
||||||
return filename, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
f, err := os.Open(filename)
|
|
||||||
if err != nil {
|
|
||||||
nfilename := path.Join("../", filename)
|
|
||||||
f2, err := os.Open(nfilename)
|
|
||||||
if err != nil {
|
|
||||||
return filename, errors.Wrapf(err, "No video file at %v or %v", filename, nfilename)
|
|
||||||
}
|
|
||||||
defer f2.Close()
|
|
||||||
|
|
||||||
return nfilename, nil
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
return filename, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if *srsPublishVideo, err = tryOpenFile(*srsPublishVideo); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if *srsPublishAudio, err = tryOpenFile(*srsPublishAudio); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMain(m *testing.M) {
|
|
||||||
if err := prepareTest(); err != nil {
|
|
||||||
logger.Ef(nil, "Prepare test fail, err %+v", err)
|
|
||||||
os.Exit(-1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Disable the logger during all tests.
|
|
||||||
if *srsLog == false {
|
|
||||||
olw := logger.Switch(ioutil.Discard)
|
|
||||||
defer func() {
|
|
||||||
logger.Switch(olw)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
os.Exit(m.Run())
|
|
||||||
}
|
|
||||||
|
|
||||||
type TestWebRTCAPIOptionFunc func(api *TestWebRTCAPI)
|
|
||||||
|
|
||||||
type TestWebRTCAPI struct {
|
|
||||||
// The options to setup the api.
|
|
||||||
options []TestWebRTCAPIOptionFunc
|
|
||||||
// The api and settings.
|
|
||||||
api *webrtc.API
|
|
||||||
mediaEngine *webrtc.MediaEngine
|
|
||||||
registry *interceptor.Registry
|
|
||||||
settingEngine *webrtc.SettingEngine
|
|
||||||
// The vnet router, can be shared by different apis, but we do not share it.
|
|
||||||
router *vnet.Router
|
|
||||||
// The network for api.
|
|
||||||
network *vnet.Net
|
|
||||||
// The vnet UDP proxy bind to the router.
|
|
||||||
proxy *vnet_proxy.UDPProxy
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewTestWebRTCAPI(options ...TestWebRTCAPIOptionFunc) (*TestWebRTCAPI, error) {
|
|
||||||
v := &TestWebRTCAPI{}
|
|
||||||
|
|
||||||
v.mediaEngine = &webrtc.MediaEngine{}
|
|
||||||
if err := v.mediaEngine.RegisterDefaultCodecs(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
v.registry = &interceptor.Registry{}
|
|
||||||
if err := webrtc.RegisterDefaultInterceptors(v.mediaEngine, v.registry); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, setup := range options {
|
|
||||||
setup(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
v.settingEngine = &webrtc.SettingEngine{}
|
|
||||||
|
|
||||||
return v, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *TestWebRTCAPI) Close() error {
|
|
||||||
if v.proxy != nil {
|
|
||||||
v.proxy.Close()
|
|
||||||
v.proxy = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if v.router != nil {
|
|
||||||
v.router.Stop()
|
|
||||||
v.router = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *TestWebRTCAPI) Setup(vnetClientIP string, options ...TestWebRTCAPIOptionFunc) error {
|
|
||||||
// Setting engine for https://github.com/pion/transport/tree/master/vnet
|
|
||||||
setupVnet := func(vnetClientIP string) (err error) {
|
|
||||||
// We create a private router for a api, however, it's possible to share the
|
|
||||||
// same router between apis.
|
|
||||||
if v.router, err = vnet.NewRouter(&vnet.RouterConfig{
|
|
||||||
CIDR: "0.0.0.0/0", // Accept all ip, no sub router.
|
|
||||||
LoggerFactory: logging.NewDefaultLoggerFactory(),
|
|
||||||
}); err != nil {
|
|
||||||
return errors.Wrapf(err, "create router for api")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Each api should bind to a network, however, it's possible to share it
|
|
||||||
// for different apis.
|
|
||||||
v.network = vnet.NewNet(&vnet.NetConfig{
|
|
||||||
StaticIP: vnetClientIP,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err = v.router.AddNet(v.network); err != nil {
|
|
||||||
return errors.Wrapf(err, "create network for api")
|
|
||||||
}
|
|
||||||
|
|
||||||
v.settingEngine.SetVNet(v.network)
|
|
||||||
|
|
||||||
// Create a proxy bind to the router.
|
|
||||||
if v.proxy, err = vnet_proxy.NewProxy(v.router); err != nil {
|
|
||||||
return errors.Wrapf(err, "create proxy for router")
|
|
||||||
}
|
|
||||||
|
|
||||||
return v.router.Start()
|
|
||||||
}
|
|
||||||
if err := setupVnet(vnetClientIP); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, setup := range options {
|
|
||||||
setup(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, setup := range v.options {
|
|
||||||
setup(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
v.api = webrtc.NewAPI(
|
|
||||||
webrtc.WithMediaEngine(v.mediaEngine),
|
|
||||||
webrtc.WithInterceptorRegistry(v.registry),
|
|
||||||
webrtc.WithSettingEngine(*v.settingEngine),
|
|
||||||
)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *TestWebRTCAPI) NewPeerConnection(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) {
|
|
||||||
return v.api.NewPeerConnection(configuration)
|
|
||||||
}
|
|
||||||
|
|
||||||
type TestPlayerOptionFunc func(p *TestPlayer)
|
|
||||||
|
|
||||||
type TestPlayer struct {
|
|
||||||
pc *webrtc.PeerConnection
|
|
||||||
receivers []*webrtc.RTPReceiver
|
|
||||||
// root api object
|
|
||||||
api *TestWebRTCAPI
|
|
||||||
// Optional suffix for stream url.
|
|
||||||
streamSuffix string
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewTestPlayer(api *TestWebRTCAPI, options ...TestPlayerOptionFunc) *TestPlayer {
|
|
||||||
v := &TestPlayer{api: api}
|
|
||||||
|
|
||||||
for _, opt := range options {
|
|
||||||
opt(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
return v
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *TestPlayer) Close() error {
|
|
||||||
if v.pc != nil {
|
|
||||||
v.pc.Close()
|
|
||||||
v.pc = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, receiver := range v.receivers {
|
|
||||||
receiver.Stop()
|
|
||||||
}
|
|
||||||
v.receivers = nil
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *TestPlayer) Run(ctx context.Context, cancel context.CancelFunc) error {
|
|
||||||
r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream)
|
|
||||||
if v.streamSuffix != "" {
|
|
||||||
r = fmt.Sprintf("%v-%v", r, v.streamSuffix)
|
|
||||||
}
|
|
||||||
pli := time.Duration(*srsPlayPLI) * time.Millisecond
|
|
||||||
logger.Tf(ctx, "Start play url=%v", r)
|
|
||||||
|
|
||||||
pc, err := v.api.NewPeerConnection(webrtc.Configuration{})
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "Create PC")
|
|
||||||
}
|
|
||||||
v.pc = pc
|
|
||||||
|
|
||||||
pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{
|
|
||||||
Direction: webrtc.RTPTransceiverDirectionRecvonly,
|
|
||||||
})
|
|
||||||
pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{
|
|
||||||
Direction: webrtc.RTPTransceiverDirectionRecvonly,
|
|
||||||
})
|
|
||||||
|
|
||||||
offer, err := pc.CreateOffer(nil)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "Create Offer")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := pc.SetLocalDescription(offer); err != nil {
|
|
||||||
return errors.Wrapf(err, "Set offer %v", offer)
|
|
||||||
}
|
|
||||||
|
|
||||||
answer, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start a proxy for real server and vnet.
|
|
||||||
if address, err := parseAddressOfCandidate(answer); err != nil {
|
|
||||||
return errors.Wrapf(err, "parse address of %v", answer)
|
|
||||||
} else if err := v.api.proxy.Proxy(v.api.network, address); err != nil {
|
|
||||||
return errors.Wrapf(err, "proxy %v to %v", v.api.network, address)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := pc.SetRemoteDescription(webrtc.SessionDescription{
|
|
||||||
Type: webrtc.SDPTypeAnswer, SDP: answer,
|
|
||||||
}); err != nil {
|
|
||||||
return errors.Wrapf(err, "Set answer %v", answer)
|
|
||||||
}
|
|
||||||
|
|
||||||
handleTrack := func(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error {
|
|
||||||
// Send a PLI on an interval so that the publisher is pushing a keyframe
|
|
||||||
go func() {
|
|
||||||
if track.Kind() == webrtc.RTPCodecTypeAudio {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-time.After(pli):
|
|
||||||
_ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
|
|
||||||
MediaSSRC: uint32(track.SSRC()),
|
|
||||||
}})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
v.receivers = append(v.receivers, receiver)
|
|
||||||
|
|
||||||
for ctx.Err() == nil {
|
|
||||||
_, _, err := track.ReadRTP()
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "Read RTP")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
|
||||||
err = handleTrack(ctx, track, receiver)
|
|
||||||
if err != nil {
|
|
||||||
codec := track.Codec()
|
|
||||||
err = errors.Wrapf(err, "Handle track %v, pt=%v", codec.MimeType, codec.PayloadType)
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
|
||||||
if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed {
|
|
||||||
err = errors.Errorf("Close for ICE state %v", state)
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
<-ctx.Done()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
type TestPublisherOptionFunc func(p *TestPublisher)
|
|
||||||
|
|
||||||
type TestPublisher struct {
|
|
||||||
onOffer func(s *webrtc.SessionDescription) error
|
|
||||||
onAnswer func(s *webrtc.SessionDescription) error
|
|
||||||
iceReadyCancel context.CancelFunc
|
|
||||||
// internal objects
|
|
||||||
aIngester *audioIngester
|
|
||||||
vIngester *videoIngester
|
|
||||||
pc *webrtc.PeerConnection
|
|
||||||
// root api object
|
|
||||||
api *TestWebRTCAPI
|
|
||||||
// Optional suffix for stream url.
|
|
||||||
streamSuffix string
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) *TestPublisher {
|
|
||||||
sourceVideo, sourceAudio := *srsPublishVideo, *srsPublishAudio
|
|
||||||
|
|
||||||
v := &TestPublisher{api: api}
|
|
||||||
|
|
||||||
for _, opt := range options {
|
|
||||||
opt(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create ingesters.
|
|
||||||
if sourceAudio != "" {
|
|
||||||
v.aIngester = NewAudioIngester(sourceAudio)
|
|
||||||
}
|
|
||||||
if sourceVideo != "" {
|
|
||||||
v.vIngester = NewVideoIngester(sourceVideo)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Setup the interceptors for packets.
|
|
||||||
api.options = append(api.options, func(api *TestWebRTCAPI) {
|
|
||||||
// Filter for RTCP packets.
|
|
||||||
rtcpInterceptor := &RTCPInterceptor{}
|
|
||||||
rtcpInterceptor.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
|
||||||
return rtcpInterceptor.nextRTCPReader.Read(buf, attributes)
|
|
||||||
}
|
|
||||||
rtcpInterceptor.rtcpWriter = func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
|
|
||||||
return rtcpInterceptor.nextRTCPWriter.Write(pkts, attributes)
|
|
||||||
}
|
|
||||||
api.registry.Add(rtcpInterceptor)
|
|
||||||
|
|
||||||
// Filter for ingesters.
|
|
||||||
if sourceAudio != "" {
|
|
||||||
api.registry.Add(v.aIngester.audioLevelInterceptor)
|
|
||||||
}
|
|
||||||
if sourceVideo != "" {
|
|
||||||
api.registry.Add(v.vIngester.markerInterceptor)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
return v
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *TestPublisher) Close() error {
|
|
||||||
if v.vIngester != nil {
|
|
||||||
v.vIngester.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
if v.aIngester != nil {
|
|
||||||
v.aIngester.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
if v.pc != nil {
|
|
||||||
v.pc.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *TestPublisher) SetStreamSuffix(suffix string) *TestPublisher {
|
|
||||||
v.streamSuffix = suffix
|
|
||||||
return v
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) error {
|
|
||||||
r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream)
|
|
||||||
if v.streamSuffix != "" {
|
|
||||||
r = fmt.Sprintf("%v-%v", r, v.streamSuffix)
|
|
||||||
}
|
|
||||||
sourceVideo, sourceAudio, fps := *srsPublishVideo, *srsPublishAudio, *srsPublishVideoFps
|
|
||||||
|
|
||||||
logger.Tf(ctx, "Start publish url=%v, audio=%v, video=%v, fps=%v",
|
|
||||||
r, sourceAudio, sourceVideo, fps)
|
|
||||||
|
|
||||||
pc, err := v.api.NewPeerConnection(webrtc.Configuration{})
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "Create PC")
|
|
||||||
}
|
|
||||||
v.pc = pc
|
|
||||||
|
|
||||||
if v.vIngester != nil {
|
|
||||||
if err := v.vIngester.AddTrack(pc, fps); err != nil {
|
|
||||||
return errors.Wrapf(err, "Add track")
|
|
||||||
}
|
|
||||||
defer v.vIngester.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
if v.aIngester != nil {
|
|
||||||
if err := v.aIngester.AddTrack(pc); err != nil {
|
|
||||||
return errors.Wrapf(err, "Add track")
|
|
||||||
}
|
|
||||||
defer v.aIngester.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
offer, err := pc.CreateOffer(nil)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "Create Offer")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := pc.SetLocalDescription(offer); err != nil {
|
|
||||||
return errors.Wrapf(err, "Set offer %v", offer)
|
|
||||||
}
|
|
||||||
|
|
||||||
if v.onOffer != nil {
|
|
||||||
if err := v.onOffer(&offer); err != nil {
|
|
||||||
return errors.Wrapf(err, "sdp %v %v", offer.Type, offer.SDP)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
answerSDP, err := apiRtcRequest(ctx, "/rtc/v1/publish", r, offer.SDP)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start a proxy for real server and vnet.
|
|
||||||
if address, err := parseAddressOfCandidate(answerSDP); err != nil {
|
|
||||||
return errors.Wrapf(err, "parse address of %v", answerSDP)
|
|
||||||
} else if err := v.api.proxy.Proxy(v.api.network, address); err != nil {
|
|
||||||
return errors.Wrapf(err, "proxy %v to %v", v.api.network, address)
|
|
||||||
}
|
|
||||||
|
|
||||||
answer := &webrtc.SessionDescription{
|
|
||||||
Type: webrtc.SDPTypeAnswer, SDP: answerSDP,
|
|
||||||
}
|
|
||||||
if v.onAnswer != nil {
|
|
||||||
if err := v.onAnswer(answer); err != nil {
|
|
||||||
return errors.Wrapf(err, "on answerSDP")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := pc.SetRemoteDescription(*answer); err != nil {
|
|
||||||
return errors.Wrapf(err, "Set answerSDP %v", answerSDP)
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Tf(ctx, "State signaling=%v, ice=%v, conn=%v", pc.SignalingState(), pc.ICEConnectionState(), pc.ConnectionState())
|
|
||||||
|
|
||||||
// ICE state management.
|
|
||||||
pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) {
|
|
||||||
logger.Tf(ctx, "ICE gather state %v", state)
|
|
||||||
})
|
|
||||||
pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
|
|
||||||
logger.Tf(ctx, "ICE candidate %v %v:%v", candidate.Protocol, candidate.Address, candidate.Port)
|
|
||||||
|
|
||||||
})
|
|
||||||
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
|
||||||
logger.Tf(ctx, "ICE state %v", state)
|
|
||||||
})
|
|
||||||
|
|
||||||
pc.OnSignalingStateChange(func(state webrtc.SignalingState) {
|
|
||||||
logger.Tf(ctx, "Signaling state %v", state)
|
|
||||||
})
|
|
||||||
|
|
||||||
if v.aIngester != nil {
|
|
||||||
v.aIngester.sAudioSender.Transport().OnStateChange(func(state webrtc.DTLSTransportState) {
|
|
||||||
logger.Tf(ctx, "DTLS state %v", state)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pcDone, pcDoneCancel := context.WithCancel(context.Background())
|
|
||||||
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
|
||||||
logger.Tf(ctx, "PC state %v", state)
|
|
||||||
|
|
||||||
if state == webrtc.PeerConnectionStateConnected {
|
|
||||||
pcDoneCancel()
|
|
||||||
if v.iceReadyCancel != nil {
|
|
||||||
v.iceReadyCancel()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed {
|
|
||||||
err = errors.Errorf("Close for PC state %v", state)
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// Wait for event from context or tracks.
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
var finalErr error
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
defer logger.Tf(ctx, "ingest notify done")
|
|
||||||
|
|
||||||
<-ctx.Done()
|
|
||||||
|
|
||||||
if v.aIngester != nil && v.aIngester.sAudioSender != nil {
|
|
||||||
v.aIngester.sAudioSender.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
if v.vIngester != nil && v.vIngester.sVideoSender != nil {
|
|
||||||
v.vIngester.sVideoSender.Stop()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
if v.aIngester == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-pcDone.Done():
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
defer logger.Tf(ctx, "aingester sender read done")
|
|
||||||
|
|
||||||
buf := make([]byte, 1500)
|
|
||||||
for ctx.Err() == nil {
|
|
||||||
if _, _, err := v.aIngester.sAudioSender.Read(buf); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
if err := v.aIngester.Ingest(ctx); err != nil {
|
|
||||||
if err == io.EOF {
|
|
||||||
logger.Tf(ctx, "aingester retry for %v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err != context.Canceled {
|
|
||||||
finalErr = errors.Wrapf(err, "audio")
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Tf(ctx, "aingester err=%v, final=%v", err, finalErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
if v.vIngester == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-pcDone.Done():
|
|
||||||
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest video %v", sourceVideo)
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
defer logger.Tf(ctx, "vingester sender read done")
|
|
||||||
|
|
||||||
buf := make([]byte, 1500)
|
|
||||||
for ctx.Err() == nil {
|
|
||||||
// The Read() might block in r.rtcpInterceptor.Read(b, a),
|
|
||||||
// so that the Stop() can not stop it.
|
|
||||||
if _, _, err := v.vIngester.sVideoSender.Read(buf); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
if err := v.vIngester.Ingest(ctx); err != nil {
|
|
||||||
if err == io.EOF {
|
|
||||||
logger.Tf(ctx, "vingester retry for %v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err != context.Canceled {
|
|
||||||
finalErr = errors.Wrapf(err, "video")
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Tf(ctx, "vingester err=%v, final=%v", err, finalErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
logger.Tf(ctx, "ingester done ctx=%v, final=%v", ctx.Err(), finalErr)
|
|
||||||
if finalErr != nil {
|
|
||||||
return finalErr
|
|
||||||
}
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRTCServerVersion(t *testing.T) {
|
|
||||||
api := fmt.Sprintf("http://%v:1985/api/v1/versions", *srsServer)
|
|
||||||
req, err := http.NewRequest("POST", api, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Request %v", api)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := http.DefaultClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Do request %v", api)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
b, err := ioutil.ReadAll(res.Body)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Read body of %v", api)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
obj := struct {
|
|
||||||
Code int `json:"code"`
|
|
||||||
Server string `json:"server"`
|
|
||||||
Data struct {
|
|
||||||
Major int `json:"major"`
|
|
||||||
Minor int `json:"minor"`
|
|
||||||
Revision int `json:"revision"`
|
|
||||||
Version string `json:"version"`
|
|
||||||
} `json:"data"`
|
|
||||||
}{}
|
|
||||||
if err := json.Unmarshal(b, &obj); err != nil {
|
|
||||||
t.Errorf("Parse %v", string(b))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if obj.Code != 0 {
|
|
||||||
t.Errorf("Server err code=%v, server=%v", obj.Code, obj.Server)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if obj.Data.Major == 0 && obj.Data.Minor == 0 {
|
|
||||||
t.Errorf("Invalid version %v", obj.Data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,6 +1,6 @@
|
||||||
// The MIT License (MIT)
|
// The MIT License (MIT)
|
||||||
//
|
//
|
||||||
// Copyright (c) 2021 srs-bench(ossrs)
|
// Copyright (c) 2021 Winlin
|
||||||
//
|
//
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
// this software and associated documentation files (the "Software"), to deal in
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
|
2
trunk/3rdparty/srs-bench/vnet/udpproxy.go
vendored
2
trunk/3rdparty/srs-bench/vnet/udpproxy.go
vendored
|
@ -1,6 +1,6 @@
|
||||||
// The MIT License (MIT)
|
// The MIT License (MIT)
|
||||||
//
|
//
|
||||||
// Copyright (c) 2021 srs-bench(ossrs)
|
// Copyright (c) 2021 Winlin
|
||||||
//
|
//
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
// this software and associated documentation files (the "Software"), to deal in
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
// The MIT License (MIT)
|
// The MIT License (MIT)
|
||||||
//
|
//
|
||||||
// Copyright (c) 2021 srs-bench(ossrs)
|
// Copyright (c) 2021 Winlin
|
||||||
//
|
//
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
// this software and associated documentation files (the "Software"), to deal in
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
// The MIT License (MIT)
|
// The MIT License (MIT)
|
||||||
//
|
//
|
||||||
// Copyright (c) 2021 srs-bench(ossrs)
|
// Copyright (c) 2021 Winlin
|
||||||
//
|
//
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
// this software and associated documentation files (the "Software"), to deal in
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
// The MIT License (MIT)
|
// The MIT License (MIT)
|
||||||
//
|
//
|
||||||
// Copyright (c) 2021 srs-bench(ossrs)
|
// Copyright (c) 2021 Winlin
|
||||||
//
|
//
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
// this software and associated documentation files (the "Software"), to deal in
|
// this software and associated documentation files (the "Software"), to deal in
|
||||||
|
|
Loading…
Reference in a new issue