mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
Tools: Sync 3rdparty tools
This commit is contained in:
parent
dea6136238
commit
2783ac7c92
28 changed files with 4631 additions and 616 deletions
12
trunk/3rdparty/srs-bench/srs/ingester.go
vendored
12
trunk/3rdparty/srs-bench/srs/ingester.go
vendored
|
@ -41,15 +41,15 @@ import (
|
|||
type videoIngester struct {
|
||||
sourceVideo string
|
||||
fps int
|
||||
markerInterceptor *RTPInterceptor
|
||||
markerInterceptor *rtpInterceptor
|
||||
sVideoTrack *webrtc.TrackLocalStaticSample
|
||||
sVideoSender *webrtc.RTPSender
|
||||
ready context.Context
|
||||
readyCancel context.CancelFunc
|
||||
}
|
||||
|
||||
func NewVideoIngester(sourceVideo string) *videoIngester {
|
||||
v := &videoIngester{markerInterceptor: &RTPInterceptor{}, sourceVideo: sourceVideo}
|
||||
func newVideoIngester(sourceVideo string) *videoIngester {
|
||||
v := &videoIngester{markerInterceptor: &rtpInterceptor{}, sourceVideo: sourceVideo}
|
||||
v.ready, v.readyCancel = context.WithCancel(context.Background())
|
||||
return v
|
||||
}
|
||||
|
@ -183,15 +183,15 @@ func (v *videoIngester) Ingest(ctx context.Context) error {
|
|||
|
||||
type audioIngester struct {
|
||||
sourceAudio string
|
||||
audioLevelInterceptor *RTPInterceptor
|
||||
audioLevelInterceptor *rtpInterceptor
|
||||
sAudioTrack *webrtc.TrackLocalStaticSample
|
||||
sAudioSender *webrtc.RTPSender
|
||||
ready context.Context
|
||||
readyCancel context.CancelFunc
|
||||
}
|
||||
|
||||
func NewAudioIngester(sourceAudio string) *audioIngester {
|
||||
v := &audioIngester{audioLevelInterceptor: &RTPInterceptor{}, sourceAudio: sourceAudio}
|
||||
func newAudioIngester(sourceAudio string) *audioIngester {
|
||||
v := &audioIngester{audioLevelInterceptor: &rtpInterceptor{}, sourceAudio: sourceAudio}
|
||||
v.ready, v.readyCancel = context.WithCancel(context.Background())
|
||||
return v
|
||||
}
|
||||
|
|
60
trunk/3rdparty/srs-bench/srs/interceptor.go
vendored
60
trunk/3rdparty/srs-bench/srs/interceptor.go
vendored
|
@ -26,11 +26,11 @@ import (
|
|||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
type RTPInterceptorOptionFunc func(i *RTPInterceptor)
|
||||
type rtpInterceptorOptionFunc func(i *rtpInterceptor)
|
||||
|
||||
// Common RTP packet interceptor for benchmark.
|
||||
// @remark Should never merge with RTCPInterceptor, because they has the same Write interface.
|
||||
type RTPInterceptor struct {
|
||||
// @remark Should never merge with rtcpInterceptor, because they has the same Write interface.
|
||||
type rtpInterceptor struct {
|
||||
// If rtpReader is nil, use the default next one to read.
|
||||
rtpReader interceptor.RTPReaderFunc
|
||||
nextRTPReader interceptor.RTPReader
|
||||
|
@ -38,52 +38,52 @@ type RTPInterceptor struct {
|
|||
rtpWriter interceptor.RTPWriterFunc
|
||||
nextRTPWriter interceptor.RTPWriter
|
||||
// Other common fields.
|
||||
BypassInterceptor
|
||||
bypassInterceptor
|
||||
}
|
||||
|
||||
func NewRTPInterceptor(options ...RTPInterceptorOptionFunc) *RTPInterceptor {
|
||||
v := &RTPInterceptor{}
|
||||
func newRTPInterceptor(options ...rtpInterceptorOptionFunc) *rtpInterceptor {
|
||||
v := &rtpInterceptor{}
|
||||
for _, opt := range options {
|
||||
opt(v)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func (v *RTPInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
|
||||
func (v *rtpInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
|
||||
v.nextRTPWriter = writer
|
||||
return v // Handle all RTP
|
||||
}
|
||||
|
||||
func (v *RTPInterceptor) Write(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
func (v *rtpInterceptor) Write(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
if v.rtpWriter != nil {
|
||||
return v.rtpWriter(header, payload, attributes)
|
||||
}
|
||||
return v.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
|
||||
func (v *RTPInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
|
||||
func (v *rtpInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
|
||||
}
|
||||
|
||||
func (v *RTPInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
|
||||
func (v *rtpInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
|
||||
v.nextRTPReader = reader
|
||||
return v // Handle all RTP
|
||||
}
|
||||
|
||||
func (v *RTPInterceptor) Read(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
func (v *rtpInterceptor) Read(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if v.rtpReader != nil {
|
||||
return v.rtpReader(b, a)
|
||||
}
|
||||
return v.nextRTPReader.Read(b, a)
|
||||
}
|
||||
|
||||
func (v *RTPInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {
|
||||
func (v *rtpInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {
|
||||
}
|
||||
|
||||
type RTCPInterceptorOptionFunc func(i *RTCPInterceptor)
|
||||
type rtcpInterceptorOptionFunc func(i *rtcpInterceptor)
|
||||
|
||||
// Common RTCP packet interceptor for benchmark.
|
||||
// @remark Should never merge with RTPInterceptor, because they has the same Write interface.
|
||||
type RTCPInterceptor struct {
|
||||
// @remark Should never merge with rtpInterceptor, because they has the same Write interface.
|
||||
type rtcpInterceptor struct {
|
||||
// If rtcpReader is nil, use the default next one to read.
|
||||
rtcpReader interceptor.RTCPReaderFunc
|
||||
nextRTCPReader interceptor.RTCPReader
|
||||
|
@ -91,35 +91,35 @@ type RTCPInterceptor struct {
|
|||
rtcpWriter interceptor.RTCPWriterFunc
|
||||
nextRTCPWriter interceptor.RTCPWriter
|
||||
// Other common fields.
|
||||
BypassInterceptor
|
||||
bypassInterceptor
|
||||
}
|
||||
|
||||
func NewRTCPInterceptor(options ...RTCPInterceptorOptionFunc) *RTCPInterceptor {
|
||||
v := &RTCPInterceptor{}
|
||||
func newRTCPInterceptor(options ...rtcpInterceptorOptionFunc) *rtcpInterceptor {
|
||||
v := &rtcpInterceptor{}
|
||||
for _, opt := range options {
|
||||
opt(v)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func (v *RTCPInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
|
||||
func (v *rtcpInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
|
||||
v.nextRTCPReader = reader
|
||||
return v // Handle all RTCP
|
||||
}
|
||||
|
||||
func (v *RTCPInterceptor) Read(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
func (v *rtcpInterceptor) Read(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if v.rtcpReader != nil {
|
||||
return v.rtcpReader(b, a)
|
||||
}
|
||||
return v.nextRTCPReader.Read(b, a)
|
||||
}
|
||||
|
||||
func (v *RTCPInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
|
||||
func (v *rtcpInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
|
||||
v.nextRTCPWriter = writer
|
||||
return v // Handle all RTCP
|
||||
}
|
||||
|
||||
func (v *RTCPInterceptor) Write(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
|
||||
func (v *rtcpInterceptor) Write(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
|
||||
if v.rtcpWriter != nil {
|
||||
return v.rtcpWriter(pkts, attributes)
|
||||
}
|
||||
|
@ -127,32 +127,32 @@ func (v *RTCPInterceptor) Write(pkts []rtcp.Packet, attributes interceptor.Attri
|
|||
}
|
||||
|
||||
// Do nothing.
|
||||
type BypassInterceptor struct {
|
||||
type bypassInterceptor struct {
|
||||
interceptor.Interceptor
|
||||
}
|
||||
|
||||
func (v *BypassInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
|
||||
func (v *bypassInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
|
||||
return reader
|
||||
}
|
||||
|
||||
func (v *BypassInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
|
||||
func (v *bypassInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
|
||||
return writer
|
||||
}
|
||||
|
||||
func (v *BypassInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
|
||||
func (v *bypassInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
|
||||
return writer
|
||||
}
|
||||
|
||||
func (v *BypassInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
|
||||
func (v *bypassInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
|
||||
}
|
||||
|
||||
func (v *BypassInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
|
||||
func (v *bypassInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
|
||||
return reader
|
||||
}
|
||||
|
||||
func (v *BypassInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {
|
||||
func (v *bypassInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {
|
||||
}
|
||||
|
||||
func (v *BypassInterceptor) Close() error {
|
||||
func (v *bypassInterceptor) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
|
6
trunk/3rdparty/srs-bench/srs/player.go
vendored
6
trunk/3rdparty/srs-bench/srs/player.go
vendored
|
@ -40,10 +40,10 @@ import (
|
|||
)
|
||||
|
||||
// @see https://github.com/pion/webrtc/blob/master/examples/save-to-disk/main.go
|
||||
func StartPlay(ctx context.Context, r, dumpAudio, dumpVideo string, enableAudioLevel, enableTWCC bool, pli int) error {
|
||||
func startPlay(ctx context.Context, r, dumpAudio, dumpVideo string, enableAudioLevel, enableTWCC bool, pli int) error {
|
||||
ctx = logger.WithContext(ctx)
|
||||
|
||||
logger.Tf(ctx, "Start play url=%v, audio=%v, video=%v, audio-level=%v, twcc=%v",
|
||||
logger.Tf(ctx, "Run play url=%v, audio=%v, video=%v, audio-level=%v, twcc=%v",
|
||||
r, dumpAudio, dumpVideo, enableAudioLevel, enableTWCC)
|
||||
|
||||
// For audio-level.
|
||||
|
@ -257,7 +257,7 @@ func StartPlay(ctx context.Context, r, dumpAudio, dumpVideo string, enableAudioL
|
|||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(5 * time.Second):
|
||||
StatRTC.PeerConnection = pc.GetStats()
|
||||
gStatRTC.PeerConnection = pc.GetStats()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
20
trunk/3rdparty/srs-bench/srs/publisher.go
vendored
20
trunk/3rdparty/srs-bench/srs/publisher.go
vendored
|
@ -34,10 +34,10 @@ import (
|
|||
)
|
||||
|
||||
// @see https://github.com/pion/webrtc/blob/master/examples/play-from-disk/main.go
|
||||
func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps int, enableAudioLevel, enableTWCC bool) error {
|
||||
func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps int, enableAudioLevel, enableTWCC bool) error {
|
||||
ctx = logger.WithContext(ctx)
|
||||
|
||||
logger.Tf(ctx, "Start publish url=%v, audio=%v, video=%v, fps=%v, audio-level=%v, twcc=%v",
|
||||
logger.Tf(ctx, "Run publish url=%v, audio=%v, video=%v, fps=%v, audio-level=%v, twcc=%v",
|
||||
r, sourceAudio, sourceVideo, fps, enableAudioLevel, enableTWCC)
|
||||
|
||||
// Filter for SPS/PPS marker.
|
||||
|
@ -78,11 +78,11 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
|
|||
}
|
||||
|
||||
if sourceAudio != "" {
|
||||
aIngester = NewAudioIngester(sourceAudio)
|
||||
aIngester = newAudioIngester(sourceAudio)
|
||||
registry.Add(aIngester.audioLevelInterceptor)
|
||||
}
|
||||
if sourceVideo != "" {
|
||||
vIngester = NewVideoIngester(sourceVideo)
|
||||
vIngester = newVideoIngester(sourceVideo)
|
||||
registry.Add(vIngester.markerInterceptor)
|
||||
}
|
||||
|
||||
|
@ -158,7 +158,7 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
|
|||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
pcDone, pcDoneCancel := context.WithCancel(context.Background())
|
||||
pcDoneCtx, pcDoneCancel := context.WithCancel(context.Background())
|
||||
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
||||
logger.Tf(ctx, "PC state %v", state)
|
||||
|
||||
|
@ -196,7 +196,7 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
|
|||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-pcDone.Done():
|
||||
case <-pcDoneCtx.Done():
|
||||
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start read audio packets")
|
||||
}
|
||||
|
||||
|
@ -218,7 +218,7 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
|
|||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-pcDone.Done():
|
||||
case <-pcDoneCtx.Done():
|
||||
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest audio %v", sourceAudio)
|
||||
}
|
||||
|
||||
|
@ -244,7 +244,7 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
|
|||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-pcDone.Done():
|
||||
case <-pcDoneCtx.Done():
|
||||
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start read video packets")
|
||||
}
|
||||
|
||||
|
@ -266,7 +266,7 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
|
|||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-pcDone.Done():
|
||||
case <-pcDoneCtx.Done():
|
||||
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest video %v", sourceVideo)
|
||||
}
|
||||
|
||||
|
@ -290,7 +290,7 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
|
|||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(5 * time.Second):
|
||||
StatRTC.PeerConnection = pc.GetStats()
|
||||
gStatRTC.PeerConnection = pc.GetStats()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
340
trunk/3rdparty/srs-bench/srs/rtc_test.go
vendored
340
trunk/3rdparty/srs-bench/srs/rtc_test.go
vendored
|
@ -58,6 +58,56 @@ func TestMain(m *testing.M) {
|
|||
os.Exit(m.Run())
|
||||
}
|
||||
|
||||
// Basic use scenario, publish a stream.
|
||||
func TestRtcBasic_PublishOnly(t *testing.T) {
|
||||
if err := filterTestError(func() error {
|
||||
streamSuffix := fmt.Sprintf("publish-only-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
}
|
||||
logger.Tf(ctx, "publish write %v RTP read %v RTCP packets", nnRTP, nnRTCP)
|
||||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *testWebRTCAPI) {
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed {
|
||||
return true
|
||||
}
|
||||
logger.Tf(ctx, "Chunk %v, ok=%v %v bytes", chunk, ok, len(c.UserData()))
|
||||
return true
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.Run(ctx, cancel)
|
||||
}()); err != nil {
|
||||
t.Errorf("err %+v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Basic use scenario, publish a stream, then play it.
|
||||
func TestRtcBasic_PublishPlay(t *testing.T) {
|
||||
ctx := logger.WithContext(context.Background())
|
||||
|
@ -83,8 +133,8 @@ func TestRtcBasic_PublishPlay(t *testing.T) {
|
|||
defer wg.Wait()
|
||||
|
||||
// The event notify.
|
||||
var thePublisher *TestPublisher
|
||||
var thePlayer *TestPlayer
|
||||
var thePublisher *testPublisher
|
||||
var thePlayer *testPlayer
|
||||
|
||||
mainReady, mainReadyCancel := context.WithCancel(context.Background())
|
||||
publishReady, publishReadyCancel := context.WithCancel(context.Background())
|
||||
|
@ -99,13 +149,13 @@ func TestRtcBasic_PublishPlay(t *testing.T) {
|
|||
streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int())
|
||||
|
||||
// Initialize player with private api.
|
||||
if thePlayer, err = NewTestPlayer(CreateApiForPlayer, func(play *TestPlayer) error {
|
||||
if thePlayer, err = newTestPlayer(createApiForPlayer, func(play *testPlayer) error {
|
||||
play.streamSuffix = streamSuffix
|
||||
resources = append(resources, play)
|
||||
|
||||
var nnPlayWriteRTCP, nnPlayReadRTCP, nnPlayWriteRTP, nnPlayReadRTP uint64
|
||||
return play.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
return play.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnPlayReadRTP++; nnPlayReadRTP >= uint64(*srsPlayOKPackets) {
|
||||
cancel() // Completed.
|
||||
|
@ -115,7 +165,7 @@ func TestRtcBasic_PublishPlay(t *testing.T) {
|
|||
return i.nextRTPReader.Read(payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
nn, attr, err := i.nextRTCPReader.Read(buf, attributes)
|
||||
nnPlayReadRTCP++
|
||||
|
@ -133,14 +183,14 @@ func TestRtcBasic_PublishPlay(t *testing.T) {
|
|||
}
|
||||
|
||||
// Initialize publisher with private api.
|
||||
if thePublisher, err = NewTestPublisher(CreateApiForPublisher, func(pub *TestPublisher) error {
|
||||
if thePublisher, err = newTestPublisher(createApiForPublisher, func(pub *testPublisher) error {
|
||||
pub.streamSuffix = streamSuffix
|
||||
pub.iceReadyCancel = publishReadyCancel
|
||||
resources = append(resources, pub)
|
||||
|
||||
var nnPubWriteRTCP, nnPubReadRTCP, nnPubWriteRTP, nnPubReadRTP uint64
|
||||
return pub.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
return pub.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
nn, attr, err := i.nextRTPReader.Read(buf, attributes)
|
||||
nnPubReadRTP++
|
||||
|
@ -154,7 +204,7 @@ func TestRtcBasic_PublishPlay(t *testing.T) {
|
|||
return nn, err
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
nn, attr, err := i.nextRTCPReader.Read(buf, attributes)
|
||||
nnPubReadRTCP++
|
||||
|
@ -237,8 +287,8 @@ func TestRtcBasic_Republish(t *testing.T) {
|
|||
defer wg.Wait()
|
||||
|
||||
// The event notify.
|
||||
var thePublisher, theRepublisher *TestPublisher
|
||||
var thePlayer *TestPlayer
|
||||
var thePublisher, theRepublisher *testPublisher
|
||||
var thePlayer *testPlayer
|
||||
|
||||
mainReady, mainReadyCancel := context.WithCancel(context.Background())
|
||||
publishReady, publishReadyCancel := context.WithCancel(context.Background())
|
||||
|
@ -254,13 +304,13 @@ func TestRtcBasic_Republish(t *testing.T) {
|
|||
streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int())
|
||||
|
||||
// Initialize player with private api.
|
||||
if thePlayer, err = NewTestPlayer(CreateApiForPlayer, func(play *TestPlayer) error {
|
||||
if thePlayer, err = newTestPlayer(createApiForPlayer, func(play *testPlayer) error {
|
||||
play.streamSuffix = streamSuffix
|
||||
resources = append(resources, play)
|
||||
|
||||
var nnPlayReadRTP uint64
|
||||
return play.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
return play.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
select {
|
||||
case <-republishReady.Done():
|
||||
|
@ -280,14 +330,14 @@ func TestRtcBasic_Republish(t *testing.T) {
|
|||
}
|
||||
|
||||
// Initialize publisher with private api.
|
||||
if thePublisher, err = NewTestPublisher(CreateApiForPublisher, func(pub *TestPublisher) error {
|
||||
if thePublisher, err = newTestPublisher(createApiForPublisher, func(pub *testPublisher) error {
|
||||
pub.streamSuffix = streamSuffix
|
||||
pub.iceReadyCancel = publishReadyCancel
|
||||
resources = append(resources, pub)
|
||||
|
||||
var nnPubReadRTCP uint64
|
||||
return pub.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
return pub.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
nn, attr, err := i.nextRTCPReader.Read(buf, attributes)
|
||||
if nnPubReadRTCP++; nnPubReadRTCP > 0 && pub.cancel != nil {
|
||||
|
@ -303,7 +353,7 @@ func TestRtcBasic_Republish(t *testing.T) {
|
|||
}
|
||||
|
||||
// Initialize re-publisher with private api.
|
||||
if theRepublisher, err = NewTestPublisher(CreateApiForPublisher, func(pub *TestPublisher) error {
|
||||
if theRepublisher, err = newTestPublisher(createApiForPublisher, func(pub *testPublisher) error {
|
||||
pub.streamSuffix = streamSuffix
|
||||
pub.iceReadyCancel = republishReadyCancel
|
||||
resources = append(resources, pub)
|
||||
|
@ -369,7 +419,7 @@ func TestRtcBasic_Republish(t *testing.T) {
|
|||
func TestRtcDTLS_ClientActive_Default(t *testing.T) {
|
||||
if err := filterTestError(func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupActive
|
||||
return nil
|
||||
|
@ -380,15 +430,15 @@ func TestRtcDTLS_ClientActive_Default(t *testing.T) {
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
|
@ -397,9 +447,9 @@ func TestRtcDTLS_ClientActive_Default(t *testing.T) {
|
|||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *TestWebRTCAPI) {
|
||||
}, func(api *testWebRTCAPI) {
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed {
|
||||
return true
|
||||
}
|
||||
|
@ -424,7 +474,7 @@ func TestRtcDTLS_ClientActive_Default(t *testing.T) {
|
|||
func TestRtcDTLS_ClientPassive_Default(t *testing.T) {
|
||||
if err := filterTestError(func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupPassive
|
||||
return nil
|
||||
|
@ -435,15 +485,15 @@ func TestRtcDTLS_ClientPassive_Default(t *testing.T) {
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
|
@ -452,9 +502,9 @@ func TestRtcDTLS_ClientPassive_Default(t *testing.T) {
|
|||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *TestWebRTCAPI) {
|
||||
}, func(api *testWebRTCAPI) {
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed {
|
||||
return true
|
||||
}
|
||||
|
@ -476,7 +526,7 @@ func TestRtcDTLS_ClientPassive_Default(t *testing.T) {
|
|||
func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) {
|
||||
if err := filterTestError(func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupActive
|
||||
return nil
|
||||
|
@ -487,15 +537,15 @@ func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) {
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
|
@ -504,15 +554,15 @@ func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) {
|
|||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *TestWebRTCAPI) {
|
||||
}, func(api *testWebRTCAPI) {
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != ChunkTypeDTLS {
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != chunkTypeDTLS {
|
||||
return true
|
||||
}
|
||||
|
||||
// Copy the alert to server, ignore error.
|
||||
if chunk.content == DTLSContentTypeAlert {
|
||||
if chunk.content == dtlsContentTypeAlert {
|
||||
_, _ = api.proxy.Deliver(c.SourceAddr(), c.DestinationAddr(), c.UserData())
|
||||
_, _ = api.proxy.Deliver(c.SourceAddr(), c.DestinationAddr(), c.UserData())
|
||||
}
|
||||
|
@ -535,7 +585,7 @@ func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) {
|
|||
func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) {
|
||||
if err := filterTestError(func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupPassive
|
||||
return nil
|
||||
|
@ -546,15 +596,15 @@ func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) {
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
|
@ -563,15 +613,15 @@ func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) {
|
|||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *TestWebRTCAPI) {
|
||||
}, func(api *testWebRTCAPI) {
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != ChunkTypeDTLS {
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != chunkTypeDTLS {
|
||||
return true
|
||||
}
|
||||
|
||||
// Copy the alert to server, ignore error.
|
||||
if chunk.content == DTLSContentTypeAlert {
|
||||
if chunk.content == dtlsContentTypeAlert {
|
||||
_, _ = api.proxy.Deliver(c.SourceAddr(), c.DestinationAddr(), c.UserData())
|
||||
_, _ = api.proxy.Deliver(c.SourceAddr(), c.DestinationAddr(), c.UserData())
|
||||
}
|
||||
|
@ -601,7 +651,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T
|
|||
var r0 error
|
||||
err := func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupActive
|
||||
return nil
|
||||
|
@ -612,15 +662,15 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
|
@ -629,17 +679,17 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T
|
|||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *TestWebRTCAPI) {
|
||||
}, func(api *testWebRTCAPI) {
|
||||
nnClientHello, nnMaxDrop := 0, 1
|
||||
var lastClientHello *DTLSRecord
|
||||
var lastClientHello *dtlsRecord
|
||||
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != ChunkTypeDTLS || chunk.content != DTLSContentTypeHandshake || chunk.handshake != DTLSHandshakeTypeClientHello {
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != chunkTypeDTLS || chunk.content != dtlsContentTypeHandshake || chunk.handshake != dtlsHandshakeTypeClientHello {
|
||||
return true
|
||||
}
|
||||
|
||||
record, err := NewDTLSRecord(c.UserData())
|
||||
record, err := newDTLSRecord(c.UserData())
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
@ -679,7 +729,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.
|
|||
var r0 error
|
||||
err := func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupPassive
|
||||
return nil
|
||||
|
@ -690,15 +740,15 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
|
@ -707,17 +757,17 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.
|
|||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *TestWebRTCAPI) {
|
||||
}, func(api *testWebRTCAPI) {
|
||||
nnClientHello, nnMaxDrop := 0, 1
|
||||
var lastClientHello *DTLSRecord
|
||||
var lastClientHello *dtlsRecord
|
||||
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != ChunkTypeDTLS || chunk.content != DTLSContentTypeHandshake || chunk.handshake != DTLSHandshakeTypeClientHello {
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != chunkTypeDTLS || chunk.content != dtlsContentTypeHandshake || chunk.handshake != dtlsHandshakeTypeClientHello {
|
||||
return true
|
||||
}
|
||||
|
||||
record, err := NewDTLSRecord(c.UserData())
|
||||
record, err := newDTLSRecord(c.UserData())
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
@ -756,7 +806,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T
|
|||
var r0, r1 error
|
||||
err := func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupActive
|
||||
return nil
|
||||
|
@ -767,15 +817,15 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
|
@ -784,23 +834,23 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T
|
|||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *TestWebRTCAPI) {
|
||||
}, func(api *testWebRTCAPI) {
|
||||
nnServerHello, nnMaxDrop := 0, 1
|
||||
var lastClientHello, lastServerHello *DTLSRecord
|
||||
var lastClientHello, lastServerHello *dtlsRecord
|
||||
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != ChunkTypeDTLS || chunk.content != DTLSContentTypeHandshake ||
|
||||
(chunk.handshake != DTLSHandshakeTypeClientHello && chunk.handshake != DTLSHandshakeTypeServerHello) {
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != chunkTypeDTLS || chunk.content != dtlsContentTypeHandshake ||
|
||||
(chunk.handshake != dtlsHandshakeTypeClientHello && chunk.handshake != dtlsHandshakeTypeServerHello) {
|
||||
return true
|
||||
}
|
||||
|
||||
record, err := NewDTLSRecord(c.UserData())
|
||||
record, err := newDTLSRecord(c.UserData())
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
if chunk.handshake == DTLSHandshakeTypeClientHello {
|
||||
if chunk.handshake == dtlsHandshakeTypeClientHello {
|
||||
if lastClientHello != nil && record.Equals(lastClientHello) {
|
||||
r0 = errors.Errorf("dup record %v", record)
|
||||
}
|
||||
|
@ -844,7 +894,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.
|
|||
var r0, r1 error
|
||||
err := func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupPassive
|
||||
return nil
|
||||
|
@ -855,15 +905,15 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
|
@ -872,23 +922,23 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.
|
|||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *TestWebRTCAPI) {
|
||||
}, func(api *testWebRTCAPI) {
|
||||
nnServerHello, nnMaxDrop := 0, 1
|
||||
var lastClientHello, lastServerHello *DTLSRecord
|
||||
var lastClientHello, lastServerHello *dtlsRecord
|
||||
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != ChunkTypeDTLS || chunk.content != DTLSContentTypeHandshake ||
|
||||
(chunk.handshake != DTLSHandshakeTypeClientHello && chunk.handshake != DTLSHandshakeTypeServerHello) {
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != chunkTypeDTLS || chunk.content != dtlsContentTypeHandshake ||
|
||||
(chunk.handshake != dtlsHandshakeTypeClientHello && chunk.handshake != dtlsHandshakeTypeServerHello) {
|
||||
return true
|
||||
}
|
||||
|
||||
record, err := NewDTLSRecord(c.UserData())
|
||||
record, err := newDTLSRecord(c.UserData())
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
if chunk.handshake == DTLSHandshakeTypeClientHello {
|
||||
if chunk.handshake == dtlsHandshakeTypeClientHello {
|
||||
if lastClientHello != nil && record.Equals(lastClientHello) {
|
||||
r0 = errors.Errorf("dup record %v", record)
|
||||
}
|
||||
|
@ -929,7 +979,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T
|
|||
var r0 error
|
||||
err := func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupActive
|
||||
return nil
|
||||
|
@ -940,15 +990,15 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
|
@ -957,17 +1007,17 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T
|
|||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *TestWebRTCAPI) {
|
||||
}, func(api *testWebRTCAPI) {
|
||||
nnCertificate, nnMaxDrop := 0, 1
|
||||
var lastCertificate *DTLSRecord
|
||||
var lastCertificate *dtlsRecord
|
||||
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != ChunkTypeDTLS || chunk.content != DTLSContentTypeHandshake || chunk.handshake != DTLSHandshakeTypeCertificate {
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != chunkTypeDTLS || chunk.content != dtlsContentTypeHandshake || chunk.handshake != dtlsHandshakeTypeCertificate {
|
||||
return true
|
||||
}
|
||||
|
||||
record, err := NewDTLSRecord(c.UserData())
|
||||
record, err := newDTLSRecord(c.UserData())
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
@ -1006,7 +1056,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.
|
|||
var r0 error
|
||||
err := func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupPassive
|
||||
return nil
|
||||
|
@ -1017,15 +1067,15 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
|
@ -1034,17 +1084,17 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.
|
|||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *TestWebRTCAPI) {
|
||||
}, func(api *testWebRTCAPI) {
|
||||
nnCertificate, nnMaxDrop := 0, 1
|
||||
var lastCertificate *DTLSRecord
|
||||
var lastCertificate *dtlsRecord
|
||||
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != ChunkTypeDTLS || chunk.content != DTLSContentTypeHandshake || chunk.handshake != DTLSHandshakeTypeCertificate {
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed || chunk.chunk != chunkTypeDTLS || chunk.content != dtlsContentTypeHandshake || chunk.handshake != dtlsHandshakeTypeCertificate {
|
||||
return true
|
||||
}
|
||||
|
||||
record, err := NewDTLSRecord(c.UserData())
|
||||
record, err := newDTLSRecord(c.UserData())
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
@ -1083,7 +1133,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test
|
|||
var r0, r1 error
|
||||
err := func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupActive
|
||||
return nil
|
||||
|
@ -1094,15 +1144,15 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
|
@ -1111,17 +1161,17 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test
|
|||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *TestWebRTCAPI) {
|
||||
}, func(api *testWebRTCAPI) {
|
||||
nnCertificate, nnMaxDrop := 0, 1
|
||||
var lastChangeCipherSepc, lastCertifidate *DTLSRecord
|
||||
var lastChangeCipherSepc, lastCertifidate *dtlsRecord
|
||||
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed || (!chunk.IsChangeCipherSpec() && !chunk.IsCertificate()) {
|
||||
return true
|
||||
}
|
||||
|
||||
record, err := NewDTLSRecord(c.UserData())
|
||||
record, err := newDTLSRecord(c.UserData())
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
@ -1169,7 +1219,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes
|
|||
var r0, r1 error
|
||||
err := func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupPassive
|
||||
return nil
|
||||
|
@ -1180,15 +1230,15 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
|
@ -1197,17 +1247,17 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes
|
|||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *TestWebRTCAPI) {
|
||||
}, func(api *testWebRTCAPI) {
|
||||
nnCertificate, nnMaxDrop := 0, 1
|
||||
var lastChangeCipherSepc, lastCertifidate *DTLSRecord
|
||||
var lastChangeCipherSepc, lastCertifidate *dtlsRecord
|
||||
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed || (!chunk.IsChangeCipherSpec() && !chunk.IsCertificate()) {
|
||||
return true
|
||||
}
|
||||
|
||||
record, err := NewDTLSRecord(c.UserData())
|
||||
record, err := newDTLSRecord(c.UserData())
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
@ -1246,7 +1296,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes
|
|||
func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) {
|
||||
if err := filterTestError(func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupPassive
|
||||
return nil
|
||||
|
@ -1257,10 +1307,10 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) {
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
nnDrop, dropAll := 0, false
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed {
|
||||
return true
|
||||
}
|
||||
|
@ -1299,7 +1349,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) {
|
|||
func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) {
|
||||
if err := filterTestError(func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupPassive
|
||||
return nil
|
||||
|
@ -1310,10 +1360,10 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) {
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
nnDrop, dropAll := 0, false
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed {
|
||||
return true
|
||||
}
|
||||
|
@ -1352,7 +1402,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) {
|
|||
func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) {
|
||||
if err := filterTestError(func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupPassive
|
||||
return nil
|
||||
|
@ -1363,10 +1413,10 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) {
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
nnDrop, dropAll := 0, false
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed {
|
||||
return true
|
||||
}
|
||||
|
@ -1405,7 +1455,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) {
|
|||
func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) {
|
||||
if err := filterTestError(func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupPassive
|
||||
return nil
|
||||
|
@ -1416,10 +1466,10 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) {
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
nnDrop, dropAll := 0, false
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed {
|
||||
return true
|
||||
}
|
||||
|
@ -1459,7 +1509,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) {
|
|||
func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) {
|
||||
if err := filterTestError(func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupPassive
|
||||
return nil
|
||||
|
@ -1470,15 +1520,15 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) {
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
|
@ -1487,10 +1537,10 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) {
|
|||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *TestWebRTCAPI) {
|
||||
}, func(api *testWebRTCAPI) {
|
||||
nnDropClientHello, nnDropCertificate := 0, 0
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed {
|
||||
return true
|
||||
}
|
||||
|
@ -1537,7 +1587,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) {
|
|||
var r0 error
|
||||
err := func() error {
|
||||
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
|
||||
p, err := NewTestPublisher(CreateApiForPublisher, func(p *TestPublisher) error {
|
||||
p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error {
|
||||
p.streamSuffix = streamSuffix
|
||||
p.onOffer = testUtilSetupPassive
|
||||
return nil
|
||||
|
@ -1548,15 +1598,15 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) {
|
|||
defer p.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *TestWebRTCAPI) {
|
||||
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
|
||||
var nnRTCP, nnRTP int64
|
||||
api.registry.Add(NewRTPInterceptor(func(i *RTPInterceptor) {
|
||||
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
|
||||
i.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
nnRTP++
|
||||
return i.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}))
|
||||
api.registry.Add(NewRTCPInterceptor(func(i *RTCPInterceptor) {
|
||||
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
|
||||
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if nnRTCP++; nnRTCP >= int64(*srsPublishOKPackets) && nnRTP >= int64(*srsPublishOKPackets) {
|
||||
cancel() // Send enough packets, done.
|
||||
|
@ -1565,11 +1615,11 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) {
|
|||
return i.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
}))
|
||||
}, func(api *TestWebRTCAPI) {
|
||||
}, func(api *testWebRTCAPI) {
|
||||
nnDropClientHello, nnDropCertificate := 0, 0
|
||||
var firstCertificate time.Time
|
||||
api.router.AddChunkFilter(func(c vnet.Chunk) (ok bool) {
|
||||
chunk, parsed := NewChunkMessageType(c)
|
||||
chunk, parsed := newChunkMessageType(c)
|
||||
if !parsed {
|
||||
return true
|
||||
}
|
||||
|
|
282
trunk/3rdparty/srs-bench/srs/srs.go
vendored
Normal file
282
trunk/3rdparty/srs-bench/srs/srs.go
vendored
Normal file
|
@ -0,0 +1,282 @@
|
|||
// The MIT License (MIT)
|
||||
//
|
||||
// Copyright (c) 2021 Winlin
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
// this software and associated documentation files (the "Software"), to deal in
|
||||
// the Software without restriction, including without limitation the rights to
|
||||
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
// the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
// subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in all
|
||||
// copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
package srs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/ossrs/go-oryx-lib/errors"
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var sr, dumpAudio, dumpVideo string
|
||||
var pli int
|
||||
|
||||
var pr, sourceAudio, sourceVideo string
|
||||
var fps int
|
||||
|
||||
var audioLevel, videoTWCC bool
|
||||
|
||||
var clients, streams, delay int
|
||||
|
||||
var statListen string
|
||||
|
||||
func Parse(ctx context.Context) {
|
||||
fl := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
||||
|
||||
var sfu string
|
||||
fl.StringVar(&sfu, "sfu", "srs", "The SFU server, srs or janus")
|
||||
|
||||
fl.StringVar(&sr, "sr", "", "")
|
||||
fl.StringVar(&dumpAudio, "da", "", "")
|
||||
fl.StringVar(&dumpVideo, "dv", "", "")
|
||||
fl.IntVar(&pli, "pli", 10, "")
|
||||
|
||||
fl.StringVar(&pr, "pr", "", "")
|
||||
fl.StringVar(&sourceAudio, "sa", "", "")
|
||||
fl.StringVar(&sourceVideo, "sv", "", "")
|
||||
fl.IntVar(&fps, "fps", 0, "")
|
||||
|
||||
fl.BoolVar(&audioLevel, "al", true, "")
|
||||
fl.BoolVar(&videoTWCC, "twcc", true, "")
|
||||
|
||||
fl.IntVar(&clients, "nn", 1, "")
|
||||
fl.IntVar(&streams, "sn", 1, "")
|
||||
fl.IntVar(&delay, "delay", 50, "")
|
||||
|
||||
fl.StringVar(&statListen, "stat", "", "")
|
||||
|
||||
fl.Usage = func() {
|
||||
fmt.Println(fmt.Sprintf("Usage: %v [Options]", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf("Options:"))
|
||||
fmt.Println(fmt.Sprintf(" -sfu The target SFU, srs or janus. Default: srs"))
|
||||
fmt.Println(fmt.Sprintf(" -nn The number of clients to simulate. Default: 1"))
|
||||
fmt.Println(fmt.Sprintf(" -sn The number of streams to simulate. Variable: %%d. Default: 1"))
|
||||
fmt.Println(fmt.Sprintf(" -delay The start delay in ms for each client or stream to simulate. Default: 50"))
|
||||
fmt.Println(fmt.Sprintf(" -al [Optional] Whether enable audio-level. Default: true"))
|
||||
fmt.Println(fmt.Sprintf(" -twcc [Optional] Whether enable vdieo-twcc. Default: true"))
|
||||
fmt.Println(fmt.Sprintf(" -stat [Optional] The stat server API listen port."))
|
||||
fmt.Println(fmt.Sprintf("Player or Subscriber:"))
|
||||
fmt.Println(fmt.Sprintf(" -sr The url to play/subscribe. If sn exceed 1, auto append variable %%d."))
|
||||
fmt.Println(fmt.Sprintf(" -da [Optional] The file path to dump audio, ignore if empty."))
|
||||
fmt.Println(fmt.Sprintf(" -dv [Optional] The file path to dump video, ignore if empty."))
|
||||
fmt.Println(fmt.Sprintf(" -pli [Optional] PLI request interval in seconds. Default: 10"))
|
||||
fmt.Println(fmt.Sprintf("Publisher:"))
|
||||
fmt.Println(fmt.Sprintf(" -pr The url to publish. If sn exceed 1, auto append variable %%d."))
|
||||
fmt.Println(fmt.Sprintf(" -fps The fps of .h264 source file."))
|
||||
fmt.Println(fmt.Sprintf(" -sa [Optional] The file path to read audio, ignore if empty."))
|
||||
fmt.Println(fmt.Sprintf(" -sv [Optional] The file path to read video, ignore if empty."))
|
||||
fmt.Println(fmt.Sprintf("\n例如,1个播放,1个推流:"))
|
||||
fmt.Println(fmt.Sprintf(" %v -sr webrtc://localhost/live/livestream", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf(" %v -pr webrtc://localhost/live/livestream -sa avatar.ogg -sv avatar.h264 -fps 25", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf("\n例如,1个流,3个播放,共3个客户端:"))
|
||||
fmt.Println(fmt.Sprintf(" %v -sr webrtc://localhost/live/livestream -nn 3", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf(" %v -pr webrtc://localhost/live/livestream -sa avatar.ogg -sv avatar.h264 -fps 25", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf("\n例如,2个流,每个流3个播放,共6个客户端:"))
|
||||
fmt.Println(fmt.Sprintf(" %v -sr webrtc://localhost/live/livestream_%%d -sn 2 -nn 3", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf(" %v -pr webrtc://localhost/live/livestream_%%d -sn 2 -sa avatar.ogg -sv avatar.h264 -fps 25", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf("\n例如,2个推流:"))
|
||||
fmt.Println(fmt.Sprintf(" %v -pr webrtc://localhost/live/livestream_%%d -sn 2 -sa avatar.ogg -sv avatar.h264 -fps 25", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf("\n例如,1个录制:"))
|
||||
fmt.Println(fmt.Sprintf(" %v -sr webrtc://localhost/live/livestream -da avatar.ogg -dv avatar.h264", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf("\n例如,1个明文播放:"))
|
||||
fmt.Println(fmt.Sprintf(" %v -sr webrtc://localhost/live/livestream?encrypt=false", os.Args[0]))
|
||||
fmt.Println()
|
||||
}
|
||||
_ = fl.Parse(os.Args[1:])
|
||||
|
||||
showHelp := (clients <= 0 || streams <= 0)
|
||||
if sr == "" && pr == "" {
|
||||
showHelp = true
|
||||
}
|
||||
if pr != "" && (sourceAudio == "" && sourceVideo == "") {
|
||||
showHelp = true
|
||||
}
|
||||
if showHelp {
|
||||
fl.Usage()
|
||||
os.Exit(-1)
|
||||
}
|
||||
|
||||
if statListen != "" && !strings.Contains(statListen, ":") {
|
||||
statListen = ":" + statListen
|
||||
}
|
||||
|
||||
summaryDesc := fmt.Sprintf("clients=%v, delay=%v, al=%v, twcc=%v, stat=%v", clients, delay, audioLevel, videoTWCC, statListen)
|
||||
if sr != "" {
|
||||
summaryDesc = fmt.Sprintf("%v, play(url=%v, da=%v, dv=%v, pli=%v)", summaryDesc, sr, dumpAudio, dumpVideo, pli)
|
||||
}
|
||||
if pr != "" {
|
||||
summaryDesc = fmt.Sprintf("%v, publish(url=%v, sa=%v, sv=%v, fps=%v)",
|
||||
summaryDesc, pr, sourceAudio, sourceVideo, fps)
|
||||
}
|
||||
logger.Tf(ctx, "Run benchmark with %v", summaryDesc)
|
||||
|
||||
checkFlags := func() error {
|
||||
if dumpVideo != "" && !strings.HasSuffix(dumpVideo, ".h264") && !strings.HasSuffix(dumpVideo, ".ivf") {
|
||||
return errors.Errorf("Should be .ivf or .264, actual %v", dumpVideo)
|
||||
}
|
||||
|
||||
if sourceVideo != "" && !strings.HasSuffix(sourceVideo, ".h264") {
|
||||
return errors.Errorf("Should be .264, actual %v", sourceVideo)
|
||||
}
|
||||
|
||||
if sourceVideo != "" && strings.HasSuffix(sourceVideo, ".h264") && fps <= 0 {
|
||||
return errors.Errorf("Video fps should >0, actual %v", fps)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if err := checkFlags(); err != nil {
|
||||
logger.Ef(ctx, "Check faile err %+v", err)
|
||||
os.Exit(-1)
|
||||
}
|
||||
}
|
||||
|
||||
func Run(ctx context.Context) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
// Run tasks.
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Run STAT API server.
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
if statListen == "" {
|
||||
return
|
||||
}
|
||||
|
||||
var lc net.ListenConfig
|
||||
ln, err := lc.Listen(ctx, "tcp", statListen)
|
||||
if err != nil {
|
||||
logger.Ef(ctx, "stat listen err+%v", err)
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
handleStat(ctx, mux, statListen)
|
||||
|
||||
srv := &http.Server{
|
||||
Handler: mux,
|
||||
BaseContext: func(listener net.Listener) context.Context {
|
||||
return ctx
|
||||
},
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
srv.Shutdown(ctx)
|
||||
}()
|
||||
|
||||
logger.Tf(ctx, "Stat listen at %v", statListen)
|
||||
if err := srv.Serve(ln); err != nil {
|
||||
if ctx.Err() == nil {
|
||||
logger.Ef(ctx, "stat serve err+%v", err)
|
||||
cancel()
|
||||
}
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
// Run all subscribers or players.
|
||||
for i := 0; sr != "" && i < streams && ctx.Err() == nil; i++ {
|
||||
r_auto := sr
|
||||
if streams > 1 && !strings.Contains(r_auto, "%") {
|
||||
r_auto += "%d"
|
||||
}
|
||||
|
||||
r2 := r_auto
|
||||
if strings.Contains(r2, "%") {
|
||||
r2 = fmt.Sprintf(r2, i)
|
||||
}
|
||||
|
||||
for j := 0; sr != "" && j < clients && ctx.Err() == nil; j++ {
|
||||
// Dump audio or video only for the first client.
|
||||
da, dv := dumpAudio, dumpVideo
|
||||
if i > 0 {
|
||||
da, dv = "", ""
|
||||
}
|
||||
|
||||
gStatRTC.Subscribers.Expect++
|
||||
gStatRTC.Subscribers.Alive++
|
||||
|
||||
wg.Add(1)
|
||||
go func(sr, da, dv string) {
|
||||
defer wg.Done()
|
||||
defer func() {
|
||||
gStatRTC.Subscribers.Alive--
|
||||
}()
|
||||
|
||||
if err := startPlay(ctx, sr, da, dv, audioLevel, videoTWCC, pli); err != nil {
|
||||
if errors.Cause(err) != context.Canceled {
|
||||
logger.Wf(ctx, "Run err %+v", err)
|
||||
}
|
||||
}
|
||||
}(r2, da, dv)
|
||||
|
||||
time.Sleep(time.Duration(delay) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
// Run all publishers.
|
||||
for i := 0; pr != "" && i < streams && ctx.Err() == nil; i++ {
|
||||
r_auto := pr
|
||||
if streams > 1 && !strings.Contains(r_auto, "%") {
|
||||
r_auto += "%d"
|
||||
}
|
||||
|
||||
r2 := r_auto
|
||||
if strings.Contains(r2, "%") {
|
||||
r2 = fmt.Sprintf(r2, i)
|
||||
}
|
||||
|
||||
gStatRTC.Publishers.Expect++
|
||||
gStatRTC.Publishers.Alive++
|
||||
|
||||
wg.Add(1)
|
||||
go func(pr string) {
|
||||
defer wg.Done()
|
||||
defer func() {
|
||||
gStatRTC.Publishers.Alive--
|
||||
}()
|
||||
|
||||
if err := startPublish(ctx, pr, sourceAudio, sourceVideo, fps, audioLevel, videoTWCC); err != nil {
|
||||
if errors.Cause(err) != context.Canceled {
|
||||
logger.Wf(ctx, "Run err %+v", err)
|
||||
}
|
||||
}
|
||||
}(r2)
|
||||
|
||||
time.Sleep(time.Duration(delay) * time.Millisecond)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
6
trunk/3rdparty/srs-bench/srs/stat.go
vendored
6
trunk/3rdparty/srs-bench/srs/stat.go
vendored
|
@ -41,9 +41,9 @@ type statRTC struct {
|
|||
PeerConnection interface{} `json:"random-pc"`
|
||||
}
|
||||
|
||||
var StatRTC statRTC
|
||||
var gStatRTC statRTC
|
||||
|
||||
func HandleStat(ctx context.Context, mux *http.ServeMux, l string) {
|
||||
func handleStat(ctx context.Context, mux *http.ServeMux, l string) {
|
||||
if strings.HasPrefix(l, ":") {
|
||||
l = "127.0.0.1" + l
|
||||
}
|
||||
|
@ -54,7 +54,7 @@ func HandleStat(ctx context.Context, mux *http.ServeMux, l string) {
|
|||
Code int `json:"code"`
|
||||
Data interface{} `json:"data"`
|
||||
}{
|
||||
0, &StatRTC,
|
||||
0, &gStatRTC,
|
||||
}
|
||||
|
||||
b, err := json.Marshal(res)
|
||||
|
|
214
trunk/3rdparty/srs-bench/srs/util.go
vendored
214
trunk/3rdparty/srs-bench/srs/util.go
vendored
|
@ -376,97 +376,97 @@ func srsIsRTCP(b []byte) bool {
|
|||
return (len(b) >= 12) && (b[0]&0x80) != 0 && (b[1] >= 192 && b[1] <= 223)
|
||||
}
|
||||
|
||||
type ChunkType int
|
||||
type chunkType int
|
||||
|
||||
const (
|
||||
ChunkTypeICE ChunkType = iota + 1
|
||||
ChunkTypeDTLS
|
||||
ChunkTypeRTP
|
||||
ChunkTypeRTCP
|
||||
chunkTypeICE chunkType = iota + 1
|
||||
chunkTypeDTLS
|
||||
chunkTypeRTP
|
||||
chunkTypeRTCP
|
||||
)
|
||||
|
||||
func (v ChunkType) String() string {
|
||||
func (v chunkType) String() string {
|
||||
switch v {
|
||||
case ChunkTypeICE:
|
||||
case chunkTypeICE:
|
||||
return "ICE"
|
||||
case ChunkTypeDTLS:
|
||||
case chunkTypeDTLS:
|
||||
return "DTLS"
|
||||
case ChunkTypeRTP:
|
||||
case chunkTypeRTP:
|
||||
return "RTP"
|
||||
case ChunkTypeRTCP:
|
||||
case chunkTypeRTCP:
|
||||
return "RTCP"
|
||||
default:
|
||||
return "Unknown"
|
||||
}
|
||||
}
|
||||
|
||||
type DTLSContentType int
|
||||
type dtlsContentType int
|
||||
|
||||
const (
|
||||
DTLSContentTypeHandshake DTLSContentType = 22
|
||||
DTLSContentTypeChangeCipherSpec DTLSContentType = 20
|
||||
DTLSContentTypeAlert DTLSContentType = 21
|
||||
dtlsContentTypeHandshake dtlsContentType = 22
|
||||
dtlsContentTypeChangeCipherSpec dtlsContentType = 20
|
||||
dtlsContentTypeAlert dtlsContentType = 21
|
||||
)
|
||||
|
||||
func (v DTLSContentType) String() string {
|
||||
func (v dtlsContentType) String() string {
|
||||
switch v {
|
||||
case DTLSContentTypeHandshake:
|
||||
case dtlsContentTypeHandshake:
|
||||
return "Handshake"
|
||||
case DTLSContentTypeChangeCipherSpec:
|
||||
case dtlsContentTypeChangeCipherSpec:
|
||||
return "ChangeCipherSpec"
|
||||
default:
|
||||
return "Unknown"
|
||||
}
|
||||
}
|
||||
|
||||
type DTLSHandshakeType int
|
||||
type dtlsHandshakeType int
|
||||
|
||||
const (
|
||||
DTLSHandshakeTypeClientHello DTLSHandshakeType = 1
|
||||
DTLSHandshakeTypeServerHello DTLSHandshakeType = 2
|
||||
DTLSHandshakeTypeCertificate DTLSHandshakeType = 11
|
||||
DTLSHandshakeTypeServerKeyExchange DTLSHandshakeType = 12
|
||||
DTLSHandshakeTypeCertificateRequest DTLSHandshakeType = 13
|
||||
DTLSHandshakeTypeServerDone DTLSHandshakeType = 14
|
||||
DTLSHandshakeTypeCertificateVerify DTLSHandshakeType = 15
|
||||
DTLSHandshakeTypeClientKeyExchange DTLSHandshakeType = 16
|
||||
DTLSHandshakeTypeFinished DTLSHandshakeType = 20
|
||||
dtlsHandshakeTypeClientHello dtlsHandshakeType = 1
|
||||
dtlsHandshakeTypeServerHello dtlsHandshakeType = 2
|
||||
dtlsHandshakeTypeCertificate dtlsHandshakeType = 11
|
||||
dtlsHandshakeTypeServerKeyExchange dtlsHandshakeType = 12
|
||||
dtlsHandshakeTypeCertificateRequest dtlsHandshakeType = 13
|
||||
dtlsHandshakeTypeServerDone dtlsHandshakeType = 14
|
||||
dtlsHandshakeTypeCertificateVerify dtlsHandshakeType = 15
|
||||
dtlsHandshakeTypeClientKeyExchange dtlsHandshakeType = 16
|
||||
dtlsHandshakeTypeFinished dtlsHandshakeType = 20
|
||||
)
|
||||
|
||||
func (v DTLSHandshakeType) String() string {
|
||||
func (v dtlsHandshakeType) String() string {
|
||||
switch v {
|
||||
case DTLSHandshakeTypeClientHello:
|
||||
case dtlsHandshakeTypeClientHello:
|
||||
return "ClientHello"
|
||||
case DTLSHandshakeTypeServerHello:
|
||||
case dtlsHandshakeTypeServerHello:
|
||||
return "ServerHello"
|
||||
case DTLSHandshakeTypeCertificate:
|
||||
case dtlsHandshakeTypeCertificate:
|
||||
return "Certificate"
|
||||
case DTLSHandshakeTypeServerKeyExchange:
|
||||
case dtlsHandshakeTypeServerKeyExchange:
|
||||
return "ServerKeyExchange"
|
||||
case DTLSHandshakeTypeCertificateRequest:
|
||||
case dtlsHandshakeTypeCertificateRequest:
|
||||
return "CertificateRequest"
|
||||
case DTLSHandshakeTypeServerDone:
|
||||
case dtlsHandshakeTypeServerDone:
|
||||
return "ServerDone"
|
||||
case DTLSHandshakeTypeCertificateVerify:
|
||||
case dtlsHandshakeTypeCertificateVerify:
|
||||
return "CertificateVerify"
|
||||
case DTLSHandshakeTypeClientKeyExchange:
|
||||
case dtlsHandshakeTypeClientKeyExchange:
|
||||
return "ClientKeyExchange"
|
||||
case DTLSHandshakeTypeFinished:
|
||||
case dtlsHandshakeTypeFinished:
|
||||
return "Finished"
|
||||
default:
|
||||
return "Unknown"
|
||||
}
|
||||
}
|
||||
|
||||
type ChunkMessageType struct {
|
||||
chunk ChunkType
|
||||
content DTLSContentType
|
||||
handshake DTLSHandshakeType
|
||||
type chunkMessageType struct {
|
||||
chunk chunkType
|
||||
content dtlsContentType
|
||||
handshake dtlsHandshakeType
|
||||
}
|
||||
|
||||
func (v *ChunkMessageType) String() string {
|
||||
if v.chunk == ChunkTypeDTLS {
|
||||
if v.content == DTLSContentTypeHandshake {
|
||||
func (v *chunkMessageType) String() string {
|
||||
if v.chunk == chunkTypeDTLS {
|
||||
if v.content == dtlsContentTypeHandshake {
|
||||
return fmt.Sprintf("%v-%v-%v", v.chunk, v.content, v.handshake)
|
||||
} else {
|
||||
return fmt.Sprintf("%v-%v", v.chunk, v.content)
|
||||
|
@ -475,26 +475,26 @@ func (v *ChunkMessageType) String() string {
|
|||
return fmt.Sprintf("%v", v.chunk)
|
||||
}
|
||||
|
||||
func NewChunkMessageType(c vnet.Chunk) (*ChunkMessageType, bool) {
|
||||
func newChunkMessageType(c vnet.Chunk) (*chunkMessageType, bool) {
|
||||
b := c.UserData()
|
||||
|
||||
if len(b) == 0 {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
v := &ChunkMessageType{}
|
||||
v := &chunkMessageType{}
|
||||
|
||||
if srsIsRTPOrRTCP(b) {
|
||||
if srsIsRTCP(b) {
|
||||
v.chunk = ChunkTypeRTCP
|
||||
v.chunk = chunkTypeRTCP
|
||||
} else {
|
||||
v.chunk = ChunkTypeRTP
|
||||
v.chunk = chunkTypeRTP
|
||||
}
|
||||
return v, true
|
||||
}
|
||||
|
||||
if srsIsStun(b) {
|
||||
v.chunk = ChunkTypeICE
|
||||
v.chunk = chunkTypeICE
|
||||
return v, true
|
||||
}
|
||||
|
||||
|
@ -502,40 +502,40 @@ func NewChunkMessageType(c vnet.Chunk) (*ChunkMessageType, bool) {
|
|||
return nil, false
|
||||
}
|
||||
|
||||
v.chunk, v.content = ChunkTypeDTLS, DTLSContentType(b[0])
|
||||
if v.content != DTLSContentTypeHandshake {
|
||||
v.chunk, v.content = chunkTypeDTLS, dtlsContentType(b[0])
|
||||
if v.content != dtlsContentTypeHandshake {
|
||||
return v, true
|
||||
}
|
||||
|
||||
if len(b) < 14 {
|
||||
return v, false
|
||||
}
|
||||
v.handshake = DTLSHandshakeType(b[13])
|
||||
v.handshake = dtlsHandshakeType(b[13])
|
||||
return v, true
|
||||
}
|
||||
|
||||
func (v *ChunkMessageType) IsHandshake() bool {
|
||||
return v.chunk == ChunkTypeDTLS && v.content == DTLSContentTypeHandshake
|
||||
func (v *chunkMessageType) IsHandshake() bool {
|
||||
return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeHandshake
|
||||
}
|
||||
|
||||
func (v *ChunkMessageType) IsClientHello() bool {
|
||||
return v.chunk == ChunkTypeDTLS && v.content == DTLSContentTypeHandshake && v.handshake == DTLSHandshakeTypeClientHello
|
||||
func (v *chunkMessageType) IsClientHello() bool {
|
||||
return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeHandshake && v.handshake == dtlsHandshakeTypeClientHello
|
||||
}
|
||||
|
||||
func (v *ChunkMessageType) IsServerHello() bool {
|
||||
return v.chunk == ChunkTypeDTLS && v.content == DTLSContentTypeHandshake && v.handshake == DTLSHandshakeTypeServerHello
|
||||
func (v *chunkMessageType) IsServerHello() bool {
|
||||
return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeHandshake && v.handshake == dtlsHandshakeTypeServerHello
|
||||
}
|
||||
|
||||
func (v *ChunkMessageType) IsCertificate() bool {
|
||||
return v.chunk == ChunkTypeDTLS && v.content == DTLSContentTypeHandshake && v.handshake == DTLSHandshakeTypeCertificate
|
||||
func (v *chunkMessageType) IsCertificate() bool {
|
||||
return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeHandshake && v.handshake == dtlsHandshakeTypeCertificate
|
||||
}
|
||||
|
||||
func (v *ChunkMessageType) IsChangeCipherSpec() bool {
|
||||
return v.chunk == ChunkTypeDTLS && v.content == DTLSContentTypeChangeCipherSpec
|
||||
func (v *chunkMessageType) IsChangeCipherSpec() bool {
|
||||
return v.chunk == chunkTypeDTLS && v.content == dtlsContentTypeChangeCipherSpec
|
||||
}
|
||||
|
||||
type DTLSRecord struct {
|
||||
ContentType DTLSContentType
|
||||
type dtlsRecord struct {
|
||||
ContentType dtlsContentType
|
||||
Version uint16
|
||||
Epoch uint16
|
||||
SequenceNumber uint64
|
||||
|
@ -543,25 +543,25 @@ type DTLSRecord struct {
|
|||
Data []byte
|
||||
}
|
||||
|
||||
func NewDTLSRecord(b []byte) (*DTLSRecord, error) {
|
||||
v := &DTLSRecord{}
|
||||
func newDTLSRecord(b []byte) (*dtlsRecord, error) {
|
||||
v := &dtlsRecord{}
|
||||
return v, v.Unmarshal(b)
|
||||
}
|
||||
|
||||
func (v *DTLSRecord) String() string {
|
||||
func (v *dtlsRecord) String() string {
|
||||
return fmt.Sprintf("epoch=%v, sequence=%v", v.Epoch, v.SequenceNumber)
|
||||
}
|
||||
|
||||
func (v *DTLSRecord) Equals(p *DTLSRecord) bool {
|
||||
func (v *dtlsRecord) Equals(p *dtlsRecord) bool {
|
||||
return v.Epoch == p.Epoch && v.SequenceNumber == p.SequenceNumber
|
||||
}
|
||||
|
||||
func (v *DTLSRecord) Unmarshal(b []byte) error {
|
||||
func (v *dtlsRecord) Unmarshal(b []byte) error {
|
||||
if len(b) < 13 {
|
||||
return errors.Errorf("requires 13B only %v", len(b))
|
||||
}
|
||||
|
||||
v.ContentType = DTLSContentType(b[0])
|
||||
v.ContentType = dtlsContentType(b[0])
|
||||
v.Version = uint16(b[1])<<8 | uint16(b[2])
|
||||
v.Epoch = uint16(b[3])<<8 | uint16(b[4])
|
||||
v.SequenceNumber = uint64(b[5])<<40 | uint64(b[6])<<32 | uint64(b[7])<<24 | uint64(b[8])<<16 | uint64(b[9])<<8 | uint64(b[10])
|
||||
|
@ -570,11 +570,11 @@ func (v *DTLSRecord) Unmarshal(b []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type TestWebRTCAPIOptionFunc func(api *TestWebRTCAPI)
|
||||
type testWebRTCAPIOptionFunc func(api *testWebRTCAPI)
|
||||
|
||||
type TestWebRTCAPI struct {
|
||||
type testWebRTCAPI struct {
|
||||
// The options to setup the api.
|
||||
options []TestWebRTCAPIOptionFunc
|
||||
options []testWebRTCAPIOptionFunc
|
||||
// The api and settings.
|
||||
api *webrtc.API
|
||||
mediaEngine *webrtc.MediaEngine
|
||||
|
@ -588,8 +588,8 @@ type TestWebRTCAPI struct {
|
|||
proxy *vnet_proxy.UDPProxy
|
||||
}
|
||||
|
||||
func NewTestWebRTCAPI(options ...TestWebRTCAPIOptionFunc) (*TestWebRTCAPI, error) {
|
||||
v := &TestWebRTCAPI{}
|
||||
func newTestWebRTCAPI(options ...testWebRTCAPIOptionFunc) (*testWebRTCAPI, error) {
|
||||
v := &testWebRTCAPI{}
|
||||
|
||||
v.mediaEngine = &webrtc.MediaEngine{}
|
||||
if err := v.mediaEngine.RegisterDefaultCodecs(); err != nil {
|
||||
|
@ -610,7 +610,7 @@ func NewTestWebRTCAPI(options ...TestWebRTCAPIOptionFunc) (*TestWebRTCAPI, error
|
|||
return v, nil
|
||||
}
|
||||
|
||||
func (v *TestWebRTCAPI) Close() error {
|
||||
func (v *testWebRTCAPI) Close() error {
|
||||
if v.proxy != nil {
|
||||
_ = v.proxy.Close()
|
||||
}
|
||||
|
@ -622,7 +622,7 @@ func (v *TestWebRTCAPI) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *TestWebRTCAPI) Setup(vnetClientIP string, options ...TestWebRTCAPIOptionFunc) error {
|
||||
func (v *testWebRTCAPI) Setup(vnetClientIP string, options ...testWebRTCAPIOptionFunc) error {
|
||||
// Setting engine for https://github.com/pion/transport/tree/master/vnet
|
||||
setupVnet := func(vnetClientIP string) (err error) {
|
||||
// We create a private router for a api, however, it's possible to share the
|
||||
|
@ -674,23 +674,23 @@ func (v *TestWebRTCAPI) Setup(vnetClientIP string, options ...TestWebRTCAPIOptio
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *TestWebRTCAPI) NewPeerConnection(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) {
|
||||
func (v *testWebRTCAPI) NewPeerConnection(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) {
|
||||
return v.api.NewPeerConnection(configuration)
|
||||
}
|
||||
|
||||
type TestPlayerOptionFunc func(p *TestPlayer) error
|
||||
type testPlayerOptionFunc func(p *testPlayer) error
|
||||
|
||||
type TestPlayer struct {
|
||||
type testPlayer struct {
|
||||
pc *webrtc.PeerConnection
|
||||
receivers []*webrtc.RTPReceiver
|
||||
// We should dispose it.
|
||||
api *TestWebRTCAPI
|
||||
api *testWebRTCAPI
|
||||
// Optional suffix for stream url.
|
||||
streamSuffix string
|
||||
}
|
||||
|
||||
func CreateApiForPlayer(play *TestPlayer) error {
|
||||
api, err := NewTestWebRTCAPI()
|
||||
func createApiForPlayer(play *testPlayer) error {
|
||||
api, err := newTestWebRTCAPI()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -699,8 +699,8 @@ func CreateApiForPlayer(play *TestPlayer) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func NewTestPlayer(options ...TestPlayerOptionFunc) (*TestPlayer, error) {
|
||||
v := &TestPlayer{}
|
||||
func newTestPlayer(options ...testPlayerOptionFunc) (*testPlayer, error) {
|
||||
v := &testPlayer{}
|
||||
|
||||
for _, opt := range options {
|
||||
if err := opt(v); err != nil {
|
||||
|
@ -711,11 +711,11 @@ func NewTestPlayer(options ...TestPlayerOptionFunc) (*TestPlayer, error) {
|
|||
return v, nil
|
||||
}
|
||||
|
||||
func (v *TestPlayer) Setup(vnetClientIP string, options ...TestWebRTCAPIOptionFunc) error {
|
||||
func (v *testPlayer) Setup(vnetClientIP string, options ...testWebRTCAPIOptionFunc) error {
|
||||
return v.api.Setup(vnetClientIP, options...)
|
||||
}
|
||||
|
||||
func (v *TestPlayer) Close() error {
|
||||
func (v *testPlayer) Close() error {
|
||||
if v.pc != nil {
|
||||
_ = v.pc.Close()
|
||||
}
|
||||
|
@ -731,13 +731,13 @@ func (v *TestPlayer) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *TestPlayer) Run(ctx context.Context, cancel context.CancelFunc) error {
|
||||
func (v *testPlayer) Run(ctx context.Context, cancel context.CancelFunc) error {
|
||||
r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream)
|
||||
if v.streamSuffix != "" {
|
||||
r = fmt.Sprintf("%v-%v", r, v.streamSuffix)
|
||||
}
|
||||
pli := time.Duration(*srsPlayPLI) * time.Millisecond
|
||||
logger.Tf(ctx, "Start play url=%v", r)
|
||||
logger.Tf(ctx, "Run play url=%v", r)
|
||||
|
||||
pc, err := v.api.NewPeerConnection(webrtc.Configuration{})
|
||||
if err != nil {
|
||||
|
@ -770,7 +770,7 @@ func (v *TestPlayer) Run(ctx context.Context, cancel context.CancelFunc) error {
|
|||
return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
|
||||
}
|
||||
|
||||
// Start a proxy for real server and vnet.
|
||||
// Run a proxy for real server and vnet.
|
||||
if address, err := parseAddressOfCandidate(answer); err != nil {
|
||||
return errors.Wrapf(err, "parse address of %v", answer)
|
||||
} else if err := v.api.proxy.Proxy(v.api.network, address); err != nil {
|
||||
|
@ -834,9 +834,9 @@ func (v *TestPlayer) Run(ctx context.Context, cancel context.CancelFunc) error {
|
|||
return err
|
||||
}
|
||||
|
||||
type TestPublisherOptionFunc func(p *TestPublisher) error
|
||||
type testPublisherOptionFunc func(p *testPublisher) error
|
||||
|
||||
type TestPublisher struct {
|
||||
type testPublisher struct {
|
||||
onOffer func(s *webrtc.SessionDescription) error
|
||||
onAnswer func(s *webrtc.SessionDescription) error
|
||||
iceReadyCancel context.CancelFunc
|
||||
|
@ -845,15 +845,15 @@ type TestPublisher struct {
|
|||
vIngester *videoIngester
|
||||
pc *webrtc.PeerConnection
|
||||
// We should dispose it.
|
||||
api *TestWebRTCAPI
|
||||
api *testWebRTCAPI
|
||||
// Optional suffix for stream url.
|
||||
streamSuffix string
|
||||
// To cancel the publisher, pass by Run.
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func CreateApiForPublisher(pub *TestPublisher) error {
|
||||
api, err := NewTestWebRTCAPI()
|
||||
func createApiForPublisher(pub *testPublisher) error {
|
||||
api, err := newTestWebRTCAPI()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -862,10 +862,10 @@ func CreateApiForPublisher(pub *TestPublisher) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func NewTestPublisher(options ...TestPublisherOptionFunc) (*TestPublisher, error) {
|
||||
func newTestPublisher(options ...testPublisherOptionFunc) (*testPublisher, error) {
|
||||
sourceVideo, sourceAudio := *srsPublishVideo, *srsPublishAudio
|
||||
|
||||
v := &TestPublisher{}
|
||||
v := &testPublisher{}
|
||||
|
||||
for _, opt := range options {
|
||||
if err := opt(v); err != nil {
|
||||
|
@ -875,17 +875,17 @@ func NewTestPublisher(options ...TestPublisherOptionFunc) (*TestPublisher, error
|
|||
|
||||
// Create ingesters.
|
||||
if sourceAudio != "" {
|
||||
v.aIngester = NewAudioIngester(sourceAudio)
|
||||
v.aIngester = newAudioIngester(sourceAudio)
|
||||
}
|
||||
if sourceVideo != "" {
|
||||
v.vIngester = NewVideoIngester(sourceVideo)
|
||||
v.vIngester = newVideoIngester(sourceVideo)
|
||||
}
|
||||
|
||||
// Setup the interceptors for packets.
|
||||
api := v.api
|
||||
api.options = append(api.options, func(api *TestWebRTCAPI) {
|
||||
api.options = append(api.options, func(api *testWebRTCAPI) {
|
||||
// Filter for RTCP packets.
|
||||
rtcpInterceptor := &RTCPInterceptor{}
|
||||
rtcpInterceptor := &rtcpInterceptor{}
|
||||
rtcpInterceptor.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
return rtcpInterceptor.nextRTCPReader.Read(buf, attributes)
|
||||
}
|
||||
|
@ -906,11 +906,11 @@ func NewTestPublisher(options ...TestPublisherOptionFunc) (*TestPublisher, error
|
|||
return v, nil
|
||||
}
|
||||
|
||||
func (v *TestPublisher) Setup(vnetClientIP string, options ...TestWebRTCAPIOptionFunc) error {
|
||||
func (v *testPublisher) Setup(vnetClientIP string, options ...testWebRTCAPIOptionFunc) error {
|
||||
return v.api.Setup(vnetClientIP, options...)
|
||||
}
|
||||
|
||||
func (v *TestPublisher) Close() error {
|
||||
func (v *testPublisher) Close() error {
|
||||
if v.vIngester != nil {
|
||||
_ = v.vIngester.Close()
|
||||
}
|
||||
|
@ -930,12 +930,12 @@ func (v *TestPublisher) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *TestPublisher) SetStreamSuffix(suffix string) *TestPublisher {
|
||||
func (v *testPublisher) SetStreamSuffix(suffix string) *testPublisher {
|
||||
v.streamSuffix = suffix
|
||||
return v
|
||||
}
|
||||
|
||||
func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) error {
|
||||
func (v *testPublisher) Run(ctx context.Context, cancel context.CancelFunc) error {
|
||||
// Save the cancel.
|
||||
v.cancel = cancel
|
||||
|
||||
|
@ -945,7 +945,7 @@ func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro
|
|||
}
|
||||
sourceVideo, sourceAudio, fps := *srsPublishVideo, *srsPublishAudio, *srsPublishVideoFps
|
||||
|
||||
logger.Tf(ctx, "Start publish url=%v, audio=%v, video=%v, fps=%v",
|
||||
logger.Tf(ctx, "Run publish url=%v, audio=%v, video=%v, fps=%v",
|
||||
r, sourceAudio, sourceVideo, fps)
|
||||
|
||||
pc, err := v.api.NewPeerConnection(webrtc.Configuration{})
|
||||
|
@ -986,7 +986,7 @@ func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro
|
|||
return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
|
||||
}
|
||||
|
||||
// Start a proxy for real server and vnet.
|
||||
// Run a proxy for real server and vnet.
|
||||
if address, err := parseAddressOfCandidate(answerSDP); err != nil {
|
||||
return errors.Wrapf(err, "parse address of %v", answerSDP)
|
||||
} else if err := v.api.proxy.Proxy(v.api.network, address); err != nil {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue