1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

DTLS: Update regression tests

This commit is contained in:
winlin 2021-03-09 22:26:50 +08:00
parent d4d11c2c18
commit 06f2e1462e
19 changed files with 3329 additions and 949 deletions

20
trunk/3rdparty/srs-bench/LICENSE vendored Normal file
View file

@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2021 srs-bench(ossrs)
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View file

@ -5,18 +5,18 @@ default: bench test
clean: clean:
rm -f ./objs/srs_bench ./objs/srs_test rm -f ./objs/srs_bench ./objs/srs_test
.format.txt: *.go rtc/*.go srs/*.go .format.txt: *.go srs/*.go vnet/*.go
gofmt -w . gofmt -w .
echo "done" > .format.txt echo "done" > .format.txt
bench: ./objs/srs_bench bench: ./objs/srs_bench
./objs/srs_bench: .format.txt *.go rtc/*.go srs/*.go Makefile ./objs/srs_bench: .format.txt *.go srs/*.go vnet/*.go Makefile
go build -mod=vendor -o objs/srs_bench . go build -mod=vendor -o objs/srs_bench .
test: ./objs/srs_test test: ./objs/srs_test
./objs/srs_test: .format.txt *.go rtc/*.go srs/*.go Makefile ./objs/srs_test: .format.txt *.go srs/*.go vnet/*.go Makefile
go test ./srs -mod=vendor -c -o ./objs/srs_test go test ./srs -mod=vendor -c -o ./objs/srs_test
help: help:

View file

@ -102,8 +102,7 @@ ffmpeg -re -i doc/source.200kbps.768x320.flv -c copy -f flv -y rtmp://localhost/
回归测试需要先启动[SRS](https://github.com/ossrs/srs/issues/307)支持WebRTC推拉流 回归测试需要先启动[SRS](https://github.com/ossrs/srs/issues/307)支持WebRTC推拉流
```bash ```bash
eip=$(ifconfig en0 inet| grep 'inet '|awk '{print $2}') if [[ ! -z $(ifconfig en0 inet| grep 'inet '|awk '{print $2}') ]]; then
if [[ ! -z $eip ]]; then
docker run -p 1935:1935 -p 8080:8080 -p 1985:1985 -p 8000:8000/udp \ docker run -p 1935:1935 -p 8080:8080 -p 1985:1985 -p 8000:8000/udp \
--rm --env CANDIDATE=$(ifconfig en0 inet| grep 'inet '|awk '{print $2}')\ --rm --env CANDIDATE=$(ifconfig en0 inet| grep 'inet '|awk '{print $2}')\
registry.cn-hangzhou.aliyuncs.com/ossrs/srs:v4.0.76 objs/srs -c conf/rtc.conf registry.cn-hangzhou.aliyuncs.com/ossrs/srs:v4.0.76 objs/srs -c conf/rtc.conf
@ -119,7 +118,20 @@ go test ./srs -mod=vendor -v
也可以用make编译出重复使用的二进制 也可以用make编译出重复使用的二进制
```bash ```bash
make test && ./objs/srs_test -test.v make && ./objs/srs_test -test.v
```
> Note: 注意由于pion不支持`DTLS 1.0`所以SFU必须要支持`DTLS 1.2`才行。
运行结果如下:
```bash
$ make && ./objs/srs_test -test.v
=== RUN TestRTCServerVersion
--- PASS: TestRTCServerVersion (0.00s)
=== RUN TestRTCServerPublishPlay
--- PASS: TestRTCServerPublishPlay (1.28s)
PASS
``` ```
可以给回归测试传参数,这样可以测试不同的序列,比如: 可以给回归测试传参数,这样可以测试不同的序列,比如:
@ -127,23 +139,43 @@ make test && ./objs/srs_test -test.v
```bash ```bash
go test ./srs -mod=vendor -v -srs-server=127.0.0.1 go test ./srs -mod=vendor -v -srs-server=127.0.0.1
# Or # Or
make test && ./objs/srs_test -test.v -srs-server=127.0.0.1 make && ./objs/srs_test -test.v -srs-server=127.0.0.1
``` ```
支持的参数如下: 支持的参数如下:
* `-srs-server`RTC服务器地址。默认值`127.0.0.1` * `-srs-server`RTC服务器地址。默认值`127.0.0.1`
* `-srs-stream`RTC流地址。默认值`/rtc/regression` * `-srs-stream`RTC流地址。默认值`/rtc/regression`
* `-srs-log`,是否开启详细日志。默认值:`false`
* `-srs-timeout`每个Case的超时时间毫秒。默认值`3000`即3秒。 * `-srs-timeout`每个Case的超时时间毫秒。默认值`3000`即3秒。
* `-srs-play-pli`播放时PLI的间隔毫秒。默认值`5000`即5秒。
* `-srs-play-ok-packets`,播放时,收到多少个包认为是测试通过,默认值:`10`
* `-srs-publish-audio`,推流时,使用的音频文件。默认值:`avatar.ogg` * `-srs-publish-audio`,推流时,使用的音频文件。默认值:`avatar.ogg`
* `-srs-publish-video`,推流时,使用的视频文件。默认值:`avatar.h264` * `-srs-publish-video`,推流时,使用的视频文件。默认值:`avatar.h264`
* `-srs-publish-video-fps`推流时视频文件的FPS。默认值`25` * `-srs-publish-video-fps`推流时视频文件的FPS。默认值`25`
* `-srs-vnet-client-ip`,设置[pion/vnet](https://github.com/ossrs/srs-bench/blob/feature/rtc/vnet/example_test.go)客户端的虚拟IP不能和服务器IP冲突。默认值`192.168.168.168`
其他不常用参数: 其他不常用参数:
* `-srs-log`,是否开启详细日志。默认值:`false`
* `-srs-play-ok-packets`,播放时,收到多少个包认为是测试通过,默认值:`10`
* `-srs-publish-ok-packets`,推流时,发送多少个包认为时测试通过,默认值:`10`
* `-srs-https`是否连接HTTPS-API。默认值`false`即连接HTTP-API。 * `-srs-https`是否连接HTTPS-API。默认值`false`即连接HTTP-API。
* `-srs-play-pli`播放时PLI的间隔毫秒。默认值`5000`即5秒。
* `-srs-dtls-drop-packets`DTLS丢包测试丢了多少个包算成功默认值`5`
## GCOVR
本机生成覆盖率时,我们使用工具[gcovr](https://gcovr.com/en/stable/guide.html)。
在macOS上安装gcovr
```bash
pip3 install gcovr
```
在CentOS上安装gcovr
```bash
yum install -y python2-pip &&
pip install lxml && pip install gcovr
```
2021.01, Winlin 2021.01, Winlin

View file

@ -1,3 +1,23 @@
// The MIT License (MIT)
//
// Copyright (c) 2021 srs-bench(ossrs)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package main package main
import ( import (
@ -42,7 +62,7 @@ func main() {
flag.IntVar(&delay, "delay", 50, "") flag.IntVar(&delay, "delay", 50, "")
var statListen string var statListen string
flag.StringVar(&statListen, "stat", ":18000", "") flag.StringVar(&statListen, "stat", "", "")
flag.Usage = func() { flag.Usage = func() {
fmt.Println(fmt.Sprintf("Usage: %v [Options]", os.Args[0])) fmt.Println(fmt.Sprintf("Usage: %v [Options]", os.Args[0]))
@ -52,7 +72,7 @@ func main() {
fmt.Println(fmt.Sprintf(" -delay The start delay in ms for each client or stream to simulate. Default: 50")) fmt.Println(fmt.Sprintf(" -delay The start delay in ms for each client or stream to simulate. Default: 50"))
fmt.Println(fmt.Sprintf(" -al [Optional] Whether enable audio-level. Default: true")) fmt.Println(fmt.Sprintf(" -al [Optional] Whether enable audio-level. Default: true"))
fmt.Println(fmt.Sprintf(" -twcc [Optional] Whether enable vdieo-twcc. Default: true")) fmt.Println(fmt.Sprintf(" -twcc [Optional] Whether enable vdieo-twcc. Default: true"))
fmt.Println(fmt.Sprintf(" -stat [Optional] The stat server API listen port. Default: :18000")) fmt.Println(fmt.Sprintf(" -stat [Optional] The stat server API listen port."))
fmt.Println(fmt.Sprintf("Player or Subscriber:")) fmt.Println(fmt.Sprintf("Player or Subscriber:"))
fmt.Println(fmt.Sprintf(" -sr The url to play/subscribe. If sn exceed 1, auto append variable %%d.")) fmt.Println(fmt.Sprintf(" -sr The url to play/subscribe. If sn exceed 1, auto append variable %%d."))
fmt.Println(fmt.Sprintf(" -da [Optional] The file path to dump audio, ignore if empty.")) fmt.Println(fmt.Sprintf(" -da [Optional] The file path to dump audio, ignore if empty."))

View file

@ -1,5 +0,0 @@
package rtc
const (
rtpOutboundMTU = 1200
)

View file

@ -1,27 +0,0 @@
package rtc
import (
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/pion/webrtc/v3"
"strings"
)
func payloaderForCodec(codec webrtc.RTPCodecCapability) (rtp.Payloader, error) {
switch strings.ToLower(codec.MimeType) {
case strings.ToLower(webrtc.MimeTypeH264):
return &codecs.H264Payloader{}, nil
case strings.ToLower(webrtc.MimeTypeOpus):
return &codecs.OpusPayloader{}, nil
case strings.ToLower(webrtc.MimeTypeVP8):
return &codecs.VP8Payloader{}, nil
case strings.ToLower(webrtc.MimeTypeVP9):
return &codecs.VP9Payloader{}, nil
case strings.ToLower(webrtc.MimeTypeG722):
return &codecs.G722Payloader{}, nil
case strings.ToLower(webrtc.MimeTypePCMU), strings.ToLower(webrtc.MimeTypePCMA):
return &codecs.G711Payloader{}, nil
default:
return nil, webrtc.ErrNoPayloaderForCodec
}
}

View file

@ -1,27 +0,0 @@
package rtc
import (
"github.com/pion/webrtc/v3"
"strings"
)
// Do a fuzzy find for a codec in the list of codecs
// Used for lookup up a codec in an existing list to find a match
func codecParametersFuzzySearch(needle webrtc.RTPCodecParameters, haystack []webrtc.RTPCodecParameters) (webrtc.RTPCodecParameters, error) {
// First attempt to match on MimeType + SDPFmtpLine
for _, c := range haystack {
if strings.EqualFold(c.RTPCodecCapability.MimeType, needle.RTPCodecCapability.MimeType) &&
c.RTPCodecCapability.SDPFmtpLine == needle.RTPCodecCapability.SDPFmtpLine {
return c, nil
}
}
// Fallback to just MimeType
for _, c := range haystack {
if strings.EqualFold(c.RTPCodecCapability.MimeType, needle.RTPCodecCapability.MimeType) {
return c, nil
}
}
return webrtc.RTPCodecParameters{}, webrtc.ErrCodecNotFound
}

View file

@ -1,246 +0,0 @@
package rtc
import (
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"strings"
"sync"
)
// trackBinding is a single bind for a Track
// Bind can be called multiple times, this stores the
// result for a single bind call so that it can be used when writing
type trackBinding struct {
id string
ssrc webrtc.SSRC
payloadType webrtc.PayloadType
writeStream webrtc.TrackLocalWriter
}
// TrackLocalStaticRTP is a TrackLocal that has a pre-set codec and accepts RTP Packets.
// If you wish to send a media.Sample use TrackLocalStaticSample
type TrackLocalStaticRTP struct {
mu sync.RWMutex
bindings []trackBinding
codec webrtc.RTPCodecCapability
id, streamID string
}
// NewTrackLocalStaticRTP returns a TrackLocalStaticRTP.
func NewTrackLocalStaticRTP(c webrtc.RTPCodecCapability, id, streamID string) (*TrackLocalStaticRTP, error) {
return &TrackLocalStaticRTP{
codec: c,
bindings: []trackBinding{},
id: id,
streamID: streamID,
}, nil
}
// Bind is called by the PeerConnection after negotiation is complete
// This asserts that the code requested is supported by the remote peer.
// If so it setups all the state (SSRC and PayloadType) to have a call
func (s *TrackLocalStaticRTP) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error) {
s.mu.Lock()
defer s.mu.Unlock()
parameters := webrtc.RTPCodecParameters{RTPCodecCapability: s.codec}
if codec, err := codecParametersFuzzySearch(parameters, t.CodecParameters()); err == nil {
s.bindings = append(s.bindings, trackBinding{
ssrc: t.SSRC(),
payloadType: codec.PayloadType,
writeStream: t.WriteStream(),
id: t.ID(),
})
return codec, nil
}
return webrtc.RTPCodecParameters{}, webrtc.ErrUnsupportedCodec
}
// Unbind implements the teardown logic when the track is no longer needed. This happens
// because a track has been stopped.
func (s *TrackLocalStaticRTP) Unbind(t webrtc.TrackLocalContext) error {
s.mu.Lock()
defer s.mu.Unlock()
for i := range s.bindings {
if s.bindings[i].id == t.ID() {
s.bindings[i] = s.bindings[len(s.bindings)-1]
s.bindings = s.bindings[:len(s.bindings)-1]
return nil
}
}
return webrtc.ErrUnbindFailed
}
// ID is the unique identifier for this Track. This should be unique for the
// stream, but doesn't have to globally unique. A common example would be 'audio' or 'video'
// and StreamID would be 'desktop' or 'webcam'
func (s *TrackLocalStaticRTP) ID() string { return s.id }
// StreamID is the group this track belongs too. This must be unique
func (s *TrackLocalStaticRTP) StreamID() string { return s.streamID }
// Kind controls if this TrackLocal is audio or video
func (s *TrackLocalStaticRTP) Kind() webrtc.RTPCodecType {
switch {
case strings.HasPrefix(s.codec.MimeType, "audio/"):
return webrtc.RTPCodecTypeAudio
case strings.HasPrefix(s.codec.MimeType, "video/"):
return webrtc.RTPCodecTypeVideo
default:
return webrtc.RTPCodecType(0)
}
}
// Codec gets the Codec of the track
func (s *TrackLocalStaticRTP) Codec() webrtc.RTPCodecCapability {
return s.codec
}
// WriteRTP writes a RTP Packet to the TrackLocalStaticRTP
// If one PeerConnection fails the packets will still be sent to
// all PeerConnections. The error message will contain the ID of the failed
// PeerConnections so you can remove them
func (s *TrackLocalStaticRTP) WriteRTP(p *rtp.Packet) error {
s.mu.RLock()
defer s.mu.RUnlock()
writeErrs := []error{}
outboundPacket := *p
for _, b := range s.bindings {
outboundPacket.Header.SSRC = uint32(b.ssrc)
outboundPacket.Header.PayloadType = uint8(b.payloadType)
if _, err := b.writeStream.WriteRTP(&outboundPacket.Header, outboundPacket.Payload); err != nil {
writeErrs = append(writeErrs, err)
}
}
return FlattenErrs(writeErrs)
}
// Write writes a RTP Packet as a buffer to the TrackLocalStaticRTP
// If one PeerConnection fails the packets will still be sent to
// all PeerConnections. The error message will contain the ID of the failed
// PeerConnections so you can remove them
func (s *TrackLocalStaticRTP) Write(b []byte) (n int, err error) {
packet := &rtp.Packet{}
if err = packet.Unmarshal(b); err != nil {
return 0, err
}
return len(b), s.WriteRTP(packet)
}
// TrackLocalStaticSample is a TrackLocal that has a pre-set codec and accepts Samples.
// If you wish to send a RTP Packet use TrackLocalStaticRTP
type TrackLocalStaticSample struct {
packetizer rtp.Packetizer
rtpTrack *TrackLocalStaticRTP
clockRate float64
// Set the callback before write RTP packet.
OnBeforeWritePacket func(rtp *rtp.Packet)
}
// NewTrackLocalStaticSample returns a TrackLocalStaticSample
func NewTrackLocalStaticSample(c webrtc.RTPCodecCapability, id, streamID string) (*TrackLocalStaticSample, error) {
rtpTrack, err := NewTrackLocalStaticRTP(c, id, streamID)
if err != nil {
return nil, err
}
return &TrackLocalStaticSample{
rtpTrack: rtpTrack,
}, nil
}
// ID is the unique identifier for this Track. This should be unique for the
// stream, but doesn't have to globally unique. A common example would be 'audio' or 'video'
// and StreamID would be 'desktop' or 'webcam'
func (s *TrackLocalStaticSample) ID() string { return s.rtpTrack.ID() }
// StreamID is the group this track belongs too. This must be unique
func (s *TrackLocalStaticSample) StreamID() string { return s.rtpTrack.StreamID() }
// Kind controls if this TrackLocal is audio or video
func (s *TrackLocalStaticSample) Kind() webrtc.RTPCodecType { return s.rtpTrack.Kind() }
// Codec gets the Codec of the track
func (s *TrackLocalStaticSample) Codec() webrtc.RTPCodecCapability {
return s.rtpTrack.Codec()
}
// Bind is called by the PeerConnection after negotiation is complete
// This asserts that the code requested is supported by the remote peer.
// If so it setups all the state (SSRC and PayloadType) to have a call
func (s *TrackLocalStaticSample) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error) {
codec, err := s.rtpTrack.Bind(t)
if err != nil {
return codec, err
}
s.rtpTrack.mu.Lock()
defer s.rtpTrack.mu.Unlock()
// We only need one packetizer
if s.packetizer != nil {
return codec, nil
}
payloader, err := payloaderForCodec(codec.RTPCodecCapability)
if err != nil {
return codec, err
}
s.packetizer = rtp.NewPacketizer(
rtpOutboundMTU,
0, // Value is handled when writing
0, // Value is handled when writing
payloader,
rtp.NewRandomSequencer(),
codec.ClockRate,
)
s.clockRate = float64(codec.RTPCodecCapability.ClockRate)
return codec, nil
}
// Unbind implements the teardown logic when the track is no longer needed. This happens
// because a track has been stopped.
func (s *TrackLocalStaticSample) Unbind(t webrtc.TrackLocalContext) error {
return s.rtpTrack.Unbind(t)
}
// WriteSample writes a Sample to the TrackLocalStaticSample
// If one PeerConnection fails the packets will still be sent to
// all PeerConnections. The error message will contain the ID of the failed
// PeerConnections so you can remove them
func (s *TrackLocalStaticSample) WriteSample(sample media.Sample) error {
s.rtpTrack.mu.RLock()
p := s.packetizer
clockRate := s.clockRate
s.rtpTrack.mu.RUnlock()
if p == nil {
return nil
}
samples := sample.Duration.Seconds() * clockRate
packets := p.(rtp.Packetizer).Packetize(sample.Data, uint32(samples))
writeErrs := []error{}
for _, p := range packets {
if s.OnBeforeWritePacket != nil {
s.OnBeforeWritePacket(p)
}
if err := s.rtpTrack.WriteRTP(p); err != nil {
writeErrs = append(writeErrs, err)
}
}
return FlattenErrs(writeErrs)
}

View file

@ -1,10 +0,0 @@
package rtc
import "fmt"
func FlattenErrs(errors []error) error {
if len(errors) == 0 {
return nil
}
return fmt.Errorf("%v", errors)
}

View file

@ -1,3 +1,23 @@
// The MIT License (MIT)
//
// Copyright (c) 2021 srs-bench(ossrs)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package srs package srs
import ( import (
@ -65,7 +85,14 @@ func StartPlay(ctx context.Context, r, dumpAudio, dumpVideo string, enableAudioL
if err != nil { if err != nil {
return errors.Wrapf(err, "Create PC") return errors.Wrapf(err, "Create PC")
} }
defer pc.Close()
var receivers []*webrtc.RTPReceiver
defer func() {
pc.Close()
for _, receiver := range receivers {
receiver.Stop()
}
}()
pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{ pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly, Direction: webrtc.RTPTransceiverDirectionRecvonly,
@ -132,6 +159,8 @@ func StartPlay(ctx context.Context, r, dumpAudio, dumpVideo string, enableAudioL
} }
}() }()
receivers = append(receivers, receiver)
codec := track.Codec() codec := track.Codec()
trackDesc := fmt.Sprintf("channels=%v", codec.Channels) trackDesc := fmt.Sprintf("channels=%v", codec.Channels)

View file

@ -1,20 +1,33 @@
// The MIT License (MIT)
//
// Copyright (c) 2021 srs-bench(ossrs)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package srs package srs
import ( import (
"context" "context"
"github.com/ossrs/go-oryx-lib/errors" "github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger" "github.com/ossrs/go-oryx-lib/logger"
"github.com/ossrs/srs-bench/rtc"
"github.com/pion/interceptor" "github.com/pion/interceptor"
"github.com/pion/rtp"
"github.com/pion/sdp/v3" "github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/pion/webrtc/v3/pkg/media/h264reader"
"github.com/pion/webrtc/v3/pkg/media/oggreader"
"io" "io"
"os"
"strings"
"sync" "sync"
"time" "time"
) )
@ -26,7 +39,12 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
logger.Tf(ctx, "Start publish url=%v, audio=%v, video=%v, fps=%v, audio-level=%v, twcc=%v", logger.Tf(ctx, "Start publish url=%v, audio=%v, video=%v, fps=%v, audio-level=%v, twcc=%v",
r, sourceAudio, sourceVideo, fps, enableAudioLevel, enableTWCC) r, sourceAudio, sourceVideo, fps, enableAudioLevel, enableTWCC)
// For audio-level. // Filter for SPS/PPS marker.
var aIngester *audioIngester
var vIngester *videoIngester
// For audio-level and sps/pps marker.
// TODO: FIXME: Should share with player.
webrtcNewPeerConnection := func(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) { webrtcNewPeerConnection := func(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) {
m := &webrtc.MediaEngine{} m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil { if err := m.RegisterDefaultCodecs(); err != nil {
@ -53,12 +71,21 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
} }
} }
i := &interceptor.Registry{} registry := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil { if err := webrtc.RegisterDefaultInterceptors(m, registry); err != nil {
return nil, err return nil, err
} }
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i)) if sourceAudio != "" {
aIngester = NewAudioIngester(sourceAudio)
registry.Add(aIngester.audioLevelInterceptor)
}
if sourceVideo != "" {
vIngester = NewVideoIngester(sourceVideo)
registry.Add(vIngester.markerInterceptor)
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(registry))
return api.NewPeerConnection(configuration) return api.NewPeerConnection(configuration)
} }
@ -66,46 +93,30 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
if err != nil { if err != nil {
return errors.Wrapf(err, "Create PC") return errors.Wrapf(err, "Create PC")
} }
defer pc.Close()
var sVideoTrack *rtc.TrackLocalStaticSample doClose := func() {
var sVideoSender *webrtc.RTPSender if pc != nil {
if sourceVideo != "" { pc.Close()
mimeType, trackID := "video/H264", "video" }
if strings.HasSuffix(sourceVideo, ".ivf") { if vIngester != nil {
mimeType = "video/VP8" vIngester.Close()
}
if aIngester != nil {
aIngester.Close()
}
}
defer doClose()
if vIngester != nil {
if err := vIngester.AddTrack(pc, fps); err != nil {
return errors.Wrapf(err, "Add track")
}
} }
sVideoTrack, err = rtc.NewTrackLocalStaticSample( if aIngester != nil {
webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 90000}, trackID, "pion", if err := aIngester.AddTrack(pc); err != nil {
) return errors.Wrapf(err, "Add track")
if err != nil {
return errors.Wrapf(err, "Create video track")
} }
sVideoSender, err = pc.AddTrack(sVideoTrack)
if err != nil {
return errors.Wrapf(err, "Add video track")
}
sVideoSender.Stop()
}
var sAudioTrack *rtc.TrackLocalStaticSample
var sAudioSender *webrtc.RTPSender
if sourceAudio != "" {
mimeType, trackID := "audio/opus", "audio"
sAudioTrack, err = rtc.NewTrackLocalStaticSample(
webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 48000, Channels: 2}, trackID, "pion",
)
if err != nil {
return errors.Wrapf(err, "Create audio track")
}
sAudioSender, err = pc.AddTrack(sAudioTrack)
if err != nil {
return errors.Wrapf(err, "Add audio track")
}
defer sAudioSender.Stop()
} }
offer, err := pc.CreateOffer(nil) offer, err := pc.CreateOffer(nil)
@ -139,9 +150,11 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
logger.Tf(ctx, "Signaling state %v", state) logger.Tf(ctx, "Signaling state %v", state)
}) })
sAudioSender.Transport().OnStateChange(func(state webrtc.DTLSTransportState) { if aIngester != nil {
aIngester.sAudioSender.Transport().OnStateChange(func(state webrtc.DTLSTransportState) {
logger.Tf(ctx, "DTLS state %v", state) logger.Tf(ctx, "DTLS state %v", state)
}) })
}
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
pcDone, pcDoneCancel := context.WithCancel(context.Background()) pcDone, pcDoneCancel := context.WithCancel(context.Background())
@ -168,8 +181,15 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
<-ctx.Done()
doClose() // Interrupt the RTCP read.
}()
if sAudioSender == nil { wg.Add(1)
go func() {
defer wg.Done()
if aIngester == nil {
return return
} }
@ -181,7 +201,7 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
buf := make([]byte, 1500) buf := make([]byte, 1500)
for ctx.Err() == nil { for ctx.Err() == nil {
if _, _, err := sAudioSender.Read(buf); err != nil { if _, _, err := aIngester.sAudioSender.Read(buf); err != nil {
return return
} }
} }
@ -191,7 +211,7 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
go func() { go func() {
defer wg.Done() defer wg.Done()
if sAudioTrack == nil { if aIngester == nil {
return return
} }
@ -201,8 +221,9 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest audio %v", sourceAudio) logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest audio %v", sourceAudio)
} }
// Read audio and send out.
for ctx.Err() == nil { for ctx.Err() == nil {
if err := readAudioTrackFromDisk(ctx, sourceAudio, sAudioSender, sAudioTrack); err != nil { if err := aIngester.Ingest(ctx); err != nil {
if errors.Cause(err) == io.EOF { if errors.Cause(err) == io.EOF {
logger.Tf(ctx, "EOF, restart ingest audio %v", sourceAudio) logger.Tf(ctx, "EOF, restart ingest audio %v", sourceAudio)
continue continue
@ -216,7 +237,7 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
go func() { go func() {
defer wg.Done() defer wg.Done()
if sVideoSender == nil { if vIngester == nil {
return return
} }
@ -228,7 +249,7 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
buf := make([]byte, 1500) buf := make([]byte, 1500)
for ctx.Err() == nil { for ctx.Err() == nil {
if _, _, err := sVideoSender.Read(buf); err != nil { if _, _, err := vIngester.sVideoSender.Read(buf); err != nil {
return return
} }
} }
@ -238,7 +259,7 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
go func() { go func() {
defer wg.Done() defer wg.Done()
if sVideoTrack == nil { if vIngester == nil {
return return
} }
@ -249,7 +270,7 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
} }
for ctx.Err() == nil { for ctx.Err() == nil {
if err := readVideoTrackFromDisk(ctx, sourceVideo, sVideoSender, fps, sVideoTrack); err != nil { if err := vIngester.Ingest(ctx); err != nil {
if errors.Cause(err) == io.EOF { if errors.Cause(err) == io.EOF {
logger.Tf(ctx, "EOF, restart ingest video %v", sourceVideo) logger.Tf(ctx, "EOF, restart ingest video %v", sourceVideo)
continue continue
@ -276,154 +297,3 @@ func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps i
wg.Wait() wg.Wait()
return nil return nil
} }
func readAudioTrackFromDisk(ctx context.Context, source string, sender *webrtc.RTPSender, track *rtc.TrackLocalStaticSample) error {
f, err := os.Open(source)
if err != nil {
return errors.Wrapf(err, "Open file %v", source)
}
defer f.Close()
ogg, _, err := oggreader.NewWith(f)
if err != nil {
return errors.Wrapf(err, "Open ogg %v", source)
}
enc := sender.GetParameters().Encodings[0]
codec := sender.GetParameters().Codecs[0]
headers := sender.GetParameters().HeaderExtensions
logger.Tf(ctx, "Audio %v, tbn=%v, channels=%v, ssrc=%v, pt=%v, header=%v",
codec.MimeType, codec.ClockRate, codec.Channels, enc.SSRC, codec.PayloadType, headers)
// Whether should encode the audio-level in RTP header.
var audioLevel *webrtc.RTPHeaderExtensionParameter
for _, h := range headers {
if h.URI == sdp.AudioLevelURI {
audioLevel = &h
}
}
clock := newWallClock()
var lastGranule uint64
for ctx.Err() == nil {
pageData, pageHeader, err := ogg.ParseNextPage()
if err == io.EOF {
return nil
}
if err != nil {
return errors.Wrapf(err, "Read ogg")
}
// The amount of samples is the difference between the last and current timestamp
sampleCount := uint64(pageHeader.GranulePosition - lastGranule)
lastGranule = pageHeader.GranulePosition
sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 * sampleCount / uint64(codec.ClockRate))
// For audio-level, set the extensions if negotiated.
track.OnBeforeWritePacket = func(p *rtp.Packet) {
if audioLevel != nil {
if b, err := new(rtp.AudioLevelExtension).Marshal(); err == nil {
p.SetExtension(uint8(audioLevel.ID), b)
}
}
}
if err = track.WriteSample(media.Sample{Data: pageData, Duration: sampleDuration}); err != nil {
return errors.Wrapf(err, "Write sample")
}
if d := clock.Tick(sampleDuration); d > 0 {
time.Sleep(d)
}
}
return nil
}
func readVideoTrackFromDisk(ctx context.Context, source string, sender *webrtc.RTPSender, fps int, track *rtc.TrackLocalStaticSample) error {
f, err := os.Open(source)
if err != nil {
return errors.Wrapf(err, "Open file %v", source)
}
defer f.Close()
// TODO: FIXME: Support ivf for vp8.
h264, err := h264reader.NewReader(f)
if err != nil {
return errors.Wrapf(err, "Open h264 %v", source)
}
enc := sender.GetParameters().Encodings[0]
codec := sender.GetParameters().Codecs[0]
headers := sender.GetParameters().HeaderExtensions
logger.Tf(ctx, "Video %v, tbn=%v, fps=%v, ssrc=%v, pt=%v, header=%v",
codec.MimeType, codec.ClockRate, fps, enc.SSRC, codec.PayloadType, headers)
clock := newWallClock()
sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 / uint64(fps))
for ctx.Err() == nil {
var sps, pps *h264reader.NAL
var oFrames []*h264reader.NAL
for ctx.Err() == nil {
frame, err := h264.NextNAL()
if err == io.EOF {
return nil
}
if err != nil {
return errors.Wrapf(err, "Read h264")
}
oFrames = append(oFrames, frame)
logger.If(ctx, "NALU %v PictureOrderCount=%v, ForbiddenZeroBit=%v, RefIdc=%v, %v bytes",
frame.UnitType.String(), frame.PictureOrderCount, frame.ForbiddenZeroBit, frame.RefIdc, len(frame.Data))
if frame.UnitType == h264reader.NalUnitTypeSPS {
sps = frame
} else if frame.UnitType == h264reader.NalUnitTypePPS {
pps = frame
} else {
break
}
}
var frames []*h264reader.NAL
// Package SPS/PPS to STAP-A
if sps != nil && pps != nil {
stapA := packageAsSTAPA(sps, pps)
frames = append(frames, stapA)
}
// Append other original frames.
for _, frame := range oFrames {
if frame.UnitType != h264reader.NalUnitTypeSPS && frame.UnitType != h264reader.NalUnitTypePPS {
frames = append(frames, frame)
}
}
// Covert frames to sample(buffers).
for i, frame := range frames {
sample := media.Sample{Data: frame.Data, Duration: sampleDuration}
// Use the sample timestamp for frames.
if i != len(frames)-1 {
sample.Duration = 0
}
// For STAP-A, set marker to false, to make Chrome happy.
track.OnBeforeWritePacket = func(p *rtp.Packet) {
if i < len(frames)-1 {
p.Header.Marker = false
}
}
if err = track.WriteSample(sample); err != nil {
return errors.Wrapf(err, "Write sample")
}
}
if d := clock.Tick(sampleDuration); d > 0 {
time.Sleep(d)
}
}
return nil
}

File diff suppressed because it is too large Load diff

View file

@ -1,3 +1,23 @@
// The MIT License (MIT)
//
// Copyright (c) 2021 srs-bench(ossrs)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package srs package srs
import ( import (

View file

@ -1,3 +1,23 @@
// The MIT License (MIT)
//
// Copyright (c) 2021 srs-bench(ossrs)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package srs package srs
import ( import (
@ -7,10 +27,14 @@ import (
"fmt" "fmt"
"github.com/ossrs/go-oryx-lib/errors" "github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger" "github.com/ossrs/go-oryx-lib/logger"
"github.com/pion/transport/vnet"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media/h264reader" "github.com/pion/webrtc/v3/pkg/media/h264reader"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"net/url" "net/url"
"strconv"
"strings" "strings"
"time" "time"
) )
@ -140,3 +164,305 @@ func (v *wallClock) Tick(d time.Duration) time.Duration {
} }
return 0 return 0
} }
// Set to active, as DTLS client, to start ClientHello.
func testUtilSetupActive(s *webrtc.SessionDescription) error {
if strings.Contains(s.SDP, "setup:passive") {
return errors.New("set to active")
}
s.SDP = strings.ReplaceAll(s.SDP, "setup:actpass", "setup:active")
return nil
}
// Set to passive, as DTLS client, to start ClientHello.
func testUtilSetupPassive(s *webrtc.SessionDescription) error {
if strings.Contains(s.SDP, "setup:active") {
return errors.New("set to passive")
}
s.SDP = strings.ReplaceAll(s.SDP, "setup:actpass", "setup:passive")
return nil
}
// Parse address from SDP.
// candidate:0 1 udp 2130706431 192.168.3.8 8000 typ host generation 0
func parseAddressOfCandidate(answerSDP string) (*net.UDPAddr, error) {
answer := webrtc.SessionDescription{Type: webrtc.SDPTypeAnswer, SDP: answerSDP}
answerObject, err := answer.Unmarshal()
if err != nil {
return nil, errors.Wrapf(err, "unmarshal answer %v", answerSDP)
}
if len(answerObject.MediaDescriptions) == 0 {
return nil, errors.New("no media")
}
candidate, ok := answerObject.MediaDescriptions[0].Attribute("candidate")
if !ok {
return nil, errors.New("no candidate")
}
// candidate:0 1 udp 2130706431 192.168.3.8 8000 typ host generation 0
attrs := strings.Split(candidate, " ")
if len(attrs) <= 6 {
return nil, errors.Errorf("no address in %v", candidate)
}
// Parse ip and port from answer.
ip := attrs[4]
port, err := strconv.Atoi(attrs[5])
if err != nil {
return nil, errors.Wrapf(err, "invalid port %v", candidate)
}
address := fmt.Sprintf("%v:%v", ip, port)
addr, err := net.ResolveUDPAddr("udp4", address)
if err != nil {
return nil, errors.Wrapf(err, "parse %v", address)
}
return addr, nil
}
// Filter the test error, ignore context.Canceled
func filterTestError(errs ...error) error {
var filteredErrors []error
for _, err := range errs {
if err == nil || errors.Cause(err) == context.Canceled {
continue
}
filteredErrors = append(filteredErrors, err)
}
if len(filteredErrors) == 0 {
return nil
}
if len(filteredErrors) == 1 {
return filteredErrors[0]
}
var descs []string
for i, err := range filteredErrors[1:] {
descs = append(descs, fmt.Sprintf("err #%d, %+v", i, err))
}
return errors.Wrapf(filteredErrors[0], "with %v", strings.Join(descs, ","))
}
// For STUN packet, 0x00 is binding request, 0x01 is binding success response.
// @see srs_is_stun of https://github.com/ossrs/srs
func srsIsStun(b []byte) bool {
return len(b) > 0 && (b[0] == 0 || b[0] == 1)
}
// change_cipher_spec(20), alert(21), handshake(22), application_data(23)
// @see https://tools.ietf.org/html/rfc2246#section-6.2.1
// @see srs_is_dtls of https://github.com/ossrs/srs
func srsIsDTLS(b []byte) bool {
return (len(b) >= 13 && (b[0] > 19 && b[0] < 64))
}
// For RTP or RTCP, the V=2 which is in the high 2bits, 0xC0 (1100 0000)
// @see srs_is_rtp_or_rtcp of https://github.com/ossrs/srs
func srsIsRTPOrRTCP(b []byte) bool {
return (len(b) >= 12 && (b[0]&0xC0) == 0x80)
}
// For RTCP, PT is [128, 223] (or without marker [0, 95]).
// Literally, RTCP starts from 64 not 0, so PT is [192, 223] (or without marker [64, 95]).
// @note For RTP, the PT is [96, 127], or [224, 255] with marker.
// @see srs_is_rtcp of https://github.com/ossrs/srs
func srsIsRTCP(b []byte) bool {
return (len(b) >= 12) && (b[0]&0x80) != 0 && (b[1] >= 192 && b[1] <= 223)
}
type ChunkType int
const (
ChunkTypeICE ChunkType = iota + 1
ChunkTypeDTLS
ChunkTypeRTP
ChunkTypeRTCP
)
func (v ChunkType) String() string {
switch v {
case ChunkTypeICE:
return "ICE"
case ChunkTypeDTLS:
return "DTLS"
case ChunkTypeRTP:
return "RTP"
case ChunkTypeRTCP:
return "RTCP"
default:
return "Unknown"
}
}
type DTLSContentType int
const (
DTLSContentTypeHandshake DTLSContentType = 22
DTLSContentTypeChangeCipherSpec DTLSContentType = 20
DTLSContentTypeAlert DTLSContentType = 21
)
func (v DTLSContentType) String() string {
switch v {
case DTLSContentTypeHandshake:
return "Handshake"
case DTLSContentTypeChangeCipherSpec:
return "ChangeCipherSpec"
default:
return "Unknown"
}
}
type DTLSHandshakeType int
const (
DTLSHandshakeTypeClientHello DTLSHandshakeType = 1
DTLSHandshakeTypeServerHello DTLSHandshakeType = 2
DTLSHandshakeTypeCertificate DTLSHandshakeType = 11
DTLSHandshakeTypeServerKeyExchange DTLSHandshakeType = 12
DTLSHandshakeTypeCertificateRequest DTLSHandshakeType = 13
DTLSHandshakeTypeServerDone DTLSHandshakeType = 14
DTLSHandshakeTypeCertificateVerify DTLSHandshakeType = 15
DTLSHandshakeTypeClientKeyExchange DTLSHandshakeType = 16
DTLSHandshakeTypeFinished DTLSHandshakeType = 20
)
func (v DTLSHandshakeType) String() string {
switch v {
case DTLSHandshakeTypeClientHello:
return "ClientHello"
case DTLSHandshakeTypeServerHello:
return "ServerHello"
case DTLSHandshakeTypeCertificate:
return "Certificate"
case DTLSHandshakeTypeServerKeyExchange:
return "ServerKeyExchange"
case DTLSHandshakeTypeCertificateRequest:
return "CertificateRequest"
case DTLSHandshakeTypeServerDone:
return "ServerDone"
case DTLSHandshakeTypeCertificateVerify:
return "CertificateVerify"
case DTLSHandshakeTypeClientKeyExchange:
return "ClientKeyExchange"
case DTLSHandshakeTypeFinished:
return "Finished"
default:
return "Unknown"
}
}
type ChunkMessageType struct {
chunk ChunkType
content DTLSContentType
handshake DTLSHandshakeType
}
func (v *ChunkMessageType) String() string {
if v.chunk == ChunkTypeDTLS {
return fmt.Sprintf("%v-%v-%v", v.chunk, v.content, v.handshake)
}
return fmt.Sprintf("%v", v.chunk)
}
func NewChunkMessageType(c vnet.Chunk) (*ChunkMessageType, bool) {
b := c.UserData()
if len(b) == 0 {
return nil, false
}
v := &ChunkMessageType{}
if srsIsRTPOrRTCP(b) {
if srsIsRTCP(b) {
v.chunk = ChunkTypeRTCP
} else {
v.chunk = ChunkTypeRTP
}
return v, true
}
if srsIsStun(b) {
v.chunk = ChunkTypeICE
return v, true
}
if !srsIsDTLS(b) {
return nil, false
}
v.chunk, v.content = ChunkTypeDTLS, DTLSContentType(b[0])
if v.content != DTLSContentTypeHandshake {
return v, true
}
if len(b) < 14 {
return v, false
}
v.handshake = DTLSHandshakeType(b[13])
return v, true
}
func (v *ChunkMessageType) IsHandshake() bool {
return v.chunk == ChunkTypeDTLS && v.content == DTLSContentTypeHandshake
}
func (v *ChunkMessageType) IsClientHello() bool {
return v.chunk == ChunkTypeDTLS && v.content == DTLSContentTypeHandshake && v.handshake == DTLSHandshakeTypeClientHello
}
func (v *ChunkMessageType) IsServerHello() bool {
return v.chunk == ChunkTypeDTLS && v.content == DTLSContentTypeHandshake && v.handshake == DTLSHandshakeTypeServerHello
}
func (v *ChunkMessageType) IsCertificate() bool {
return v.chunk == ChunkTypeDTLS && v.content == DTLSContentTypeHandshake && v.handshake == DTLSHandshakeTypeCertificate
}
func (v *ChunkMessageType) IsChangeCipherSpec() bool {
return v.chunk == ChunkTypeDTLS && v.content == DTLSContentTypeChangeCipherSpec
}
type DTLSRecord struct {
ContentType DTLSContentType
Version uint16
Epoch uint16
SequenceNumber uint64
Length uint16
Data []byte
}
func NewDTLSRecord(b []byte) (*DTLSRecord, error) {
v := &DTLSRecord{}
return v, v.Unmarshal(b)
}
func (v *DTLSRecord) String() string {
return fmt.Sprintf("epoch=%v, sequence=%v", v.Epoch, v.SequenceNumber)
}
func (v *DTLSRecord) Equals(p *DTLSRecord) bool {
return v.Epoch == p.Epoch && v.SequenceNumber == p.SequenceNumber
}
func (v *DTLSRecord) Unmarshal(b []byte) error {
if len(b) < 13 {
return errors.Errorf("requires 13B only %v", len(b))
}
v.ContentType = DTLSContentType(uint8(b[0]))
v.Version = uint16(b[1])<<8 | uint16(b[2])
v.Epoch = uint16(b[3])<<8 | uint16(b[4])
v.SequenceNumber = uint64(b[5])<<40 | uint64(b[6])<<32 | uint64(b[7])<<24 | uint64(b[8])<<16 | uint64(b[9])<<8 | uint64(b[10])
v.Length = uint16(b[11])<<8 | uint16(b[12])
v.Data = b[13:]
return nil
}

View file

@ -0,0 +1,278 @@
// The MIT License (MIT)
//
// Copyright (c) 2021 srs-bench(ossrs)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package vnet_test
import (
"net"
vnet_proxy "github.com/ossrs/srs-bench/vnet"
"github.com/pion/logging"
"github.com/pion/transport/vnet"
)
// Proxy many vnet endpoint to one real server endpoint.
// For example:
// vnet(10.0.0.11:5787) => proxy => 192.168.1.10:8000
// vnet(10.0.0.11:5788) => proxy => 192.168.1.10:8000
// vnet(10.0.0.11:5789) => proxy => 192.168.1.10:8000
func ExampleUDPProxyManyToOne() { // nolint:govet
var clientNetwork *vnet.Net
var serverAddr *net.UDPAddr
if addr, err := net.ResolveUDPAddr("udp4", "192.168.1.10:8000"); err != nil {
// handle error
} else {
serverAddr = addr
}
// Setup the network and proxy.
if true {
// Create vnet WAN with one endpoint, please read from
// https://github.com/pion/transport/tree/master/vnet#example-wan-with-one-endpoint-vnet
router, err := vnet.NewRouter(&vnet.RouterConfig{
CIDR: "0.0.0.0/0",
LoggerFactory: logging.NewDefaultLoggerFactory(),
})
if err != nil {
// handle error
}
// Create a network and add to router, for example, for client.
clientNetwork = vnet.NewNet(&vnet.NetConfig{
StaticIP: "10.0.0.11",
})
if err = router.AddNet(clientNetwork); err != nil {
// handle error
}
// Start the router.
if err = router.Start(); err != nil {
// handle error
}
defer router.Stop() // nolint:errcheck
// Create a proxy, bind to the router.
proxy, err := vnet_proxy.NewProxy(router)
if err != nil {
// handle error
}
defer proxy.Close() // nolint:errcheck
// Start to proxy some addresses, clientNetwork is a hit for proxy,
// that the client in vnet is from this network.
if err := proxy.Proxy(clientNetwork, serverAddr); err != nil {
// handle error
}
}
// Now, all packets from client, will be proxy to real server, vice versa.
client0, err := clientNetwork.ListenPacket("udp4", "10.0.0.11:5787")
if err != nil {
// handle error
}
_, _ = client0.WriteTo([]byte("Hello"), serverAddr)
client1, err := clientNetwork.ListenPacket("udp4", "10.0.0.11:5788")
if err != nil {
// handle error
}
_, _ = client1.WriteTo([]byte("Hello"), serverAddr)
client2, err := clientNetwork.ListenPacket("udp4", "10.0.0.11:5789")
if err != nil {
// handle error
}
_, _ = client2.WriteTo([]byte("Hello"), serverAddr)
}
// Proxy many vnet endpoint to one real server endpoint.
// For example:
// vnet(10.0.0.11:5787) => proxy => 192.168.1.10:8000
// vnet(10.0.0.11:5788) => proxy => 192.168.1.10:8000
func ExampleUDPProxyMultileTimes() { // nolint:govet
var clientNetwork *vnet.Net
var serverAddr *net.UDPAddr
if addr, err := net.ResolveUDPAddr("udp4", "192.168.1.10:8000"); err != nil {
// handle error
} else {
serverAddr = addr
}
// Setup the network and proxy.
var proxy *vnet_proxy.UDPProxy
if true {
// Create vnet WAN with one endpoint, please read from
// https://github.com/pion/transport/tree/master/vnet#example-wan-with-one-endpoint-vnet
router, err := vnet.NewRouter(&vnet.RouterConfig{
CIDR: "0.0.0.0/0",
LoggerFactory: logging.NewDefaultLoggerFactory(),
})
if err != nil {
// handle error
}
// Create a network and add to router, for example, for client.
clientNetwork = vnet.NewNet(&vnet.NetConfig{
StaticIP: "10.0.0.11",
})
if err = router.AddNet(clientNetwork); err != nil {
// handle error
}
// Start the router.
if err = router.Start(); err != nil {
// handle error
}
defer router.Stop() // nolint:errcheck
// Create a proxy, bind to the router.
proxy, err = vnet_proxy.NewProxy(router)
if err != nil {
// handle error
}
defer proxy.Close() // nolint:errcheck
}
if true {
// Start to proxy some addresses, clientNetwork is a hit for proxy,
// that the client in vnet is from this network.
if err := proxy.Proxy(clientNetwork, serverAddr); err != nil {
// handle error
}
// Now, all packets from client, will be proxy to real server, vice versa.
client0, err := clientNetwork.ListenPacket("udp4", "10.0.0.11:5787")
if err != nil {
// handle error
}
_, _ = client0.WriteTo([]byte("Hello"), serverAddr)
}
if true {
// It's ok to proxy multiple times, for example, the publisher and player
// might need to proxy when got answer.
if err := proxy.Proxy(clientNetwork, serverAddr); err != nil {
// handle error
}
client1, err := clientNetwork.ListenPacket("udp4", "10.0.0.11:5788")
if err != nil {
// handle error
}
_, _ = client1.WriteTo([]byte("Hello"), serverAddr)
}
}
// Proxy one vnet endpoint to one real server endpoint.
// For example:
// vnet(10.0.0.11:5787) => proxy0 => 192.168.1.10:8000
// vnet(10.0.0.11:5788) => proxy1 => 192.168.1.10:8001
// vnet(10.0.0.11:5789) => proxy2 => 192.168.1.10:8002
func ExampleUDPProxyOneToOne() { // nolint:govet
var clientNetwork *vnet.Net
var serverAddr0 *net.UDPAddr
if addr, err := net.ResolveUDPAddr("udp4", "192.168.1.10:8000"); err != nil {
// handle error
} else {
serverAddr0 = addr
}
var serverAddr1 *net.UDPAddr
if addr, err := net.ResolveUDPAddr("udp4", "192.168.1.10:8001"); err != nil {
// handle error
} else {
serverAddr1 = addr
}
var serverAddr2 *net.UDPAddr
if addr, err := net.ResolveUDPAddr("udp4", "192.168.1.10:8002"); err != nil {
// handle error
} else {
serverAddr2 = addr
}
// Setup the network and proxy.
if true {
// Create vnet WAN with one endpoint, please read from
// https://github.com/pion/transport/tree/master/vnet#example-wan-with-one-endpoint-vnet
router, err := vnet.NewRouter(&vnet.RouterConfig{
CIDR: "0.0.0.0/0",
LoggerFactory: logging.NewDefaultLoggerFactory(),
})
if err != nil {
// handle error
}
// Create a network and add to router, for example, for client.
clientNetwork = vnet.NewNet(&vnet.NetConfig{
StaticIP: "10.0.0.11",
})
if err = router.AddNet(clientNetwork); err != nil {
// handle error
}
// Start the router.
if err = router.Start(); err != nil {
// handle error
}
defer router.Stop() // nolint:errcheck
// Create a proxy, bind to the router.
proxy, err := vnet_proxy.NewProxy(router)
if err != nil {
// handle error
}
defer proxy.Close() // nolint:errcheck
// Start to proxy some addresses, clientNetwork is a hit for proxy,
// that the client in vnet is from this network.
if err := proxy.Proxy(clientNetwork, serverAddr0); err != nil {
// handle error
}
if err := proxy.Proxy(clientNetwork, serverAddr1); err != nil {
// handle error
}
if err := proxy.Proxy(clientNetwork, serverAddr2); err != nil {
// handle error
}
}
// Now, all packets from client, will be proxy to real server, vice versa.
client0, err := clientNetwork.ListenPacket("udp4", "10.0.0.11:5787")
if err != nil {
// handle error
}
_, _ = client0.WriteTo([]byte("Hello"), serverAddr0)
client1, err := clientNetwork.ListenPacket("udp4", "10.0.0.11:5788")
if err != nil {
// handle error
}
_, _ = client1.WriteTo([]byte("Hello"), serverAddr1)
client2, err := clientNetwork.ListenPacket("udp4", "10.0.0.11:5789")
if err != nil {
// handle error
}
_, _ = client2.WriteTo([]byte("Hello"), serverAddr2)
}

View file

@ -0,0 +1,222 @@
// The MIT License (MIT)
//
// Copyright (c) 2021 srs-bench(ossrs)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package vnet
import (
"net"
"sync"
"time"
"github.com/pion/transport/vnet"
)
// A UDP proxy between real server(net.UDPConn) and vnet.UDPConn.
//
// High level design:
// ..............................................
// : Virtual Network (vnet) :
// : :
// +-------+ * 1 +----+ +--------+ :
// | :App |------------>|:Net|--o<-----|:Router | .............................
// +-------+ +----+ | | : UDPProxy :
// : | | +----+ +---------+ +---------+ +--------+
// : | |--->o--|:Net|-->o-| vnet. |-->o-| net. |--->-| :Real |
// : | | +----+ | UDPConn | | UDPConn | | Server |
// : | | : +---------+ +---------+ +--------+
// : | | ............................:
// : +--------+ :
// ...............................................
//
// The whole big picture:
// ......................................
// : Virtual Network (vnet) :
// : :
// +-------+ * 1 +----+ +--------+ :
// | :App |------------>|:Net|--o<-----|:Router | .............................
// +-------+ +----+ | | : UDPProxy :
// +-----------+ * 1 +----+ | | +----+ +---------+ +---------+ +--------+
// |:STUNServer|-------->|:Net|--o<-----| |--->o--|:Net|-->o-| vnet. |-->o-| net. |--->-| :Real |
// +-----------+ +----+ | | +----+ | UDPConn | | UDPConn | | Server |
// +-----------+ * 1 +----+ | | : +---------+ +---------+ +--------+
// |:TURNServer|-------->|:Net|--o<-----| | ............................:
// +-----------+ +----+ [1] | | :
// : 1 | | 1 <<has>> :
// : +---<>| |<>----+ [2] :
// : | +--------+ | :
// To form | *| v 0..1 :
// a subnet tree | o [3] +-----+ :
// : | ^ |:NAT | :
// : | | +-----+ :
// : +-------+ :
// ......................................
type UDPProxy struct {
// The router bind to.
router *vnet.Router
// Each vnet source, bind to a real socket to server.
// key is real server addr, which is net.Addr
// value is *aUDPProxyWorker
workers sync.Map
// For each endpoint, we never know when to start and stop proxy,
// so we stop the endpoint when timeout.
timeout time.Duration
// For utest, to mock the target real server.
// Optional, use the address of received client packet.
mockRealServerAddr *net.UDPAddr
}
// NewProxy create a proxy, the router for this proxy belongs/bind to. If need to proxy for
// please create a new proxy for each router. For all addresses we proxy, we will create a
// vnet.Net in this router and proxy all packets.
func NewProxy(router *vnet.Router) (*UDPProxy, error) {
v := &UDPProxy{router: router, timeout: 2 * time.Minute}
return v, nil
}
// Close the proxy, stop all workers.
func (v *UDPProxy) Close() error {
// nolint:godox // TODO: FIXME: Do cleanup.
return nil
}
// Proxy starts a worker for server, ignore if already started.
func (v *UDPProxy) Proxy(client *vnet.Net, server *net.UDPAddr) error {
// Note that even if the worker exists, it's also ok to create a same worker,
// because the router will use the last one, and the real server will see a address
// change event after we switch to the next worker.
if _, ok := v.workers.Load(server.String()); ok {
// nolint:godox // TODO: Need to restart the stopped worker?
return nil
}
// Not exists, create a new one.
worker := &aUDPProxyWorker{
router: v.router, mockRealServerAddr: v.mockRealServerAddr,
}
v.workers.Store(server.String(), worker)
return worker.Proxy(client, server)
}
// A proxy worker for a specified proxy server.
type aUDPProxyWorker struct {
router *vnet.Router
mockRealServerAddr *net.UDPAddr
// Each vnet source, bind to a real socket to server.
// key is vnet client addr, which is net.Addr
// value is *net.UDPConn
endpoints sync.Map
}
func (v *aUDPProxyWorker) Proxy(client *vnet.Net, serverAddr *net.UDPAddr) error { // nolint:gocognit
// Create vnet for real server by serverAddr.
nw := vnet.NewNet(&vnet.NetConfig{
StaticIP: serverAddr.IP.String(),
})
if err := v.router.AddNet(nw); err != nil {
return err
}
// We must create a "same" vnet.UDPConn as the net.UDPConn,
// which has the same ip:port, to copy packets between them.
vnetSocket, err := nw.ListenUDP("udp4", serverAddr)
if err != nil {
return err
}
// Start a proxy goroutine.
var findEndpointBy func(addr net.Addr) (*net.UDPConn, error)
// nolint:godox // TODO: FIXME: Do cleanup.
go func() {
buf := make([]byte, 1500)
for {
n, addr, err := vnetSocket.ReadFrom(buf)
if err != nil {
return
}
if n <= 0 || addr == nil {
continue // Drop packet
}
realSocket, err := findEndpointBy(addr)
if err != nil {
continue // Drop packet.
}
if _, err := realSocket.Write(buf[:n]); err != nil {
return
}
}
}()
// Got new vnet client, start a new endpoint.
findEndpointBy = func(addr net.Addr) (*net.UDPConn, error) {
// Exists binding.
if value, ok := v.endpoints.Load(addr.String()); ok {
// Exists endpoint, reuse it.
return value.(*net.UDPConn), nil
}
// The real server we proxy to, for utest to mock it.
realAddr := serverAddr
if v.mockRealServerAddr != nil {
realAddr = v.mockRealServerAddr
}
// Got new vnet client, create new endpoint.
realSocket, err := net.DialUDP("udp4", nil, realAddr)
if err != nil {
return nil, err
}
// Bind address.
v.endpoints.Store(addr.String(), realSocket)
// Got packet from real serverAddr, we should proxy it to vnet.
// nolint:godox // TODO: FIXME: Do cleanup.
go func(vnetClientAddr net.Addr) {
buf := make([]byte, 1500)
for {
n, _, err := realSocket.ReadFrom(buf)
if err != nil {
return
}
if n <= 0 {
continue // Drop packet
}
if _, err := vnetSocket.WriteTo(buf[:n], vnetClientAddr); err != nil {
return
}
}
}(addr)
return realSocket, nil
}
return nil
}

View file

@ -0,0 +1,61 @@
// The MIT License (MIT)
//
// Copyright (c) 2021 srs-bench(ossrs)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package vnet
import (
"net"
)
func (v *UDPProxy) Deliver(sourceAddr, destAddr net.Addr, b []byte) (nn int, err error) {
v.workers.Range(func(key, value interface{}) bool {
if nn, err := value.(*aUDPProxyWorker).Deliver(sourceAddr, destAddr, b); err != nil {
return false // Fail, abort.
} else if nn == len(b) {
return false // Done.
}
return true // Deliver by next worker.
})
return
}
func (v *aUDPProxyWorker) Deliver(sourceAddr, destAddr net.Addr, b []byte) (nn int, err error) {
addr, ok := sourceAddr.(*net.UDPAddr)
if !ok {
return 0, nil
}
// TODO: Support deliver packet from real server to vnet.
// If packet is from vent, proxy to real server.
var realSocket *net.UDPConn
if value, ok := v.endpoints.Load(addr.String()); !ok {
return 0, nil
} else {
realSocket = value.(*net.UDPConn)
}
// Send to real server.
if _, err := realSocket.Write(b); err != nil {
return 0, err
}
return len(b), nil
}

View file

@ -0,0 +1,184 @@
// The MIT License (MIT)
//
// Copyright (c) 2021 srs-bench(ossrs)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package vnet
import (
"context"
"fmt"
"github.com/pion/logging"
"github.com/pion/transport/vnet"
"net"
"sync"
"testing"
"time"
)
// vnet client:
// 10.0.0.11:5787
// proxy to real server:
// 192.168.1.10:8000
func TestUDPProxyDirectDeliver(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var r0, r1, r2 error
defer func() {
if r0 != nil || r1 != nil || r2 != nil {
t.Errorf("fail for ctx=%v, r0=%v, r1=%v, r2=%v", ctx.Err(), r0, r1, r2)
}
}()
var wg sync.WaitGroup
defer wg.Wait()
// Timeout, fail
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
select {
case <-ctx.Done():
case <-time.After(time.Duration(*testTimeout) * time.Millisecond):
r2 = fmt.Errorf("timeout")
}
}()
// For utest, we always proxy vnet packets to the random port we listen to.
mockServer := NewMockUDPEchoServer()
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
if err := mockServer.doMockUDPServer(ctx); err != nil {
r0 = err
}
}()
// Create a vent and proxy.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
// When real server is ready, start the vnet test.
select {
case <-ctx.Done():
return
case <-mockServer.realServerReady.Done():
}
doVnetProxy := func() error {
router, err := vnet.NewRouter(&vnet.RouterConfig{
CIDR: "0.0.0.0/0",
LoggerFactory: logging.NewDefaultLoggerFactory(),
})
if err != nil {
return err
}
clientNetwork := vnet.NewNet(&vnet.NetConfig{
StaticIP: "10.0.0.11",
})
if err = router.AddNet(clientNetwork); err != nil {
return err
}
if err := router.Start(); err != nil {
return err
}
defer router.Stop()
proxy, err := NewProxy(router)
if err != nil {
return err
}
defer proxy.Close()
// For utest, mock the target real server.
proxy.mockRealServerAddr = mockServer.realServerAddr
// The real server address to proxy to.
// Note that for utest, we will proxy to a local address.
serverAddr, err := net.ResolveUDPAddr("udp4", "192.168.1.10:8000")
if err != nil {
return err
}
if err := proxy.Proxy(clientNetwork, serverAddr); err != nil {
return err
}
// Now, all packets from client, will be proxy to real server, vice versa.
client, err := clientNetwork.ListenPacket("udp4", "10.0.0.11:5787")
if err != nil {
return err
}
// When system quit, interrupt client.
selfKill, selfKillCancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()
selfKillCancel()
client.Close()
}()
// Write by vnet client.
if _, err := client.WriteTo([]byte("Hello"), serverAddr); err != nil {
return err
}
buf := make([]byte, 1500)
if n, addr, err := client.ReadFrom(buf); err != nil {
if selfKill.Err() == context.Canceled {
return nil
}
return err
} else if n != 5 || addr == nil {
return fmt.Errorf("n=%v, addr=%v", n, addr)
} else if string(buf[:n]) != "Hello" {
return fmt.Errorf("data %v", buf[:n])
}
// Directly write, simulate the ARQ packet.
// We should got the echo packet also.
if _, err := proxy.Deliver(client.LocalAddr(), serverAddr, []byte("Hello")); err != nil {
return err
}
if n, addr, err := client.ReadFrom(buf); err != nil {
if selfKill.Err() == context.Canceled {
return nil
}
return err
} else if n != 5 || addr == nil {
return fmt.Errorf("n=%v, addr=%v", n, addr)
} else if string(buf[:n]) != "Hello" {
return fmt.Errorf("data %v", buf[:n])
}
return err
}
if err := doVnetProxy(); err != nil {
r1 = err
}
}()
}

View file

@ -0,0 +1,615 @@
// The MIT License (MIT)
//
// Copyright (c) 2021 srs-bench(ossrs)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package vnet
import (
"context"
"errors"
"flag"
"fmt"
"net"
"os"
"sync"
"testing"
"time"
"github.com/pion/logging"
"github.com/pion/transport/vnet"
)
type MockUDPEchoServer struct {
realServerAddr *net.UDPAddr
realServerReady context.Context
realServerReadyCancel context.CancelFunc
}
func NewMockUDPEchoServer() *MockUDPEchoServer {
v := &MockUDPEchoServer{}
v.realServerReady, v.realServerReadyCancel = context.WithCancel(context.Background())
return v
}
func (v *MockUDPEchoServer) doMockUDPServer(ctx context.Context) error {
// Listen to a random port.
laddr, err := net.ResolveUDPAddr("udp4", "127.0.0.1:0")
if err != nil {
return err
}
conn, err := net.ListenUDP("udp4", laddr)
if err != nil {
return err
}
v.realServerAddr = conn.LocalAddr().(*net.UDPAddr)
v.realServerReadyCancel()
// When system quit, interrupt client.
selfKill, selfKillCancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()
selfKillCancel()
_ = conn.Close()
}()
// Note that if they has the same ID, the address should not changed.
addrs := make(map[string]net.Addr)
// Start an echo UDP server.
buf := make([]byte, 1500)
for ctx.Err() == nil {
n, addr, err := conn.ReadFrom(buf)
if err != nil {
if errors.Is(selfKill.Err(), context.Canceled) {
return nil
}
return err
} else if n == 0 || addr == nil {
return fmt.Errorf("n=%v, addr=%v", n, addr) // nolint:goerr113
} else if nn, err := conn.WriteTo(buf[:n], addr); err != nil {
return err
} else if nn != n {
return fmt.Errorf("nn=%v, n=%v", nn, n) // nolint:goerr113
}
// Check the address, shold not change, use content as ID.
clientID := string(buf[:n])
if oldAddr, ok := addrs[clientID]; ok && oldAddr.String() != addr.String() {
return fmt.Errorf("address change %v to %v", oldAddr.String(), addr.String()) // nolint:goerr113
}
addrs[clientID] = addr
}
return nil
}
var testTimeout = flag.Int("timeout", 5000, "For each case, the timeout in ms") // nolint:gochecknoglobals
func TestMain(m *testing.M) {
flag.Parse()
os.Exit(m.Run())
}
// vnet client:
// 10.0.0.11:5787
// proxy to real server:
// 192.168.1.10:8000
func TestUDPProxyOne2One(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var r0, r1, r2 error
defer func() {
if r0 != nil || r1 != nil || r2 != nil {
t.Errorf("fail for ctx=%v, r0=%v, r1=%v, r2=%v", ctx.Err(), r0, r1, r2)
}
}()
var wg sync.WaitGroup
defer wg.Wait()
// Timeout, fail
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
select {
case <-ctx.Done():
case <-time.After(time.Duration(*testTimeout) * time.Millisecond):
r2 = fmt.Errorf("timeout") // nolint:goerr113
}
}()
// For utest, we always proxy vnet packets to the random port we listen to.
mockServer := NewMockUDPEchoServer()
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
if err := mockServer.doMockUDPServer(ctx); err != nil {
r0 = err
}
}()
// Create a vent and proxy.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
// When real server is ready, start the vnet test.
select {
case <-ctx.Done():
return
case <-mockServer.realServerReady.Done():
}
doVnetProxy := func() error {
router, err := vnet.NewRouter(&vnet.RouterConfig{
CIDR: "0.0.0.0/0",
LoggerFactory: logging.NewDefaultLoggerFactory(),
})
if err != nil {
return err
}
clientNetwork := vnet.NewNet(&vnet.NetConfig{
StaticIP: "10.0.0.11",
})
if err = router.AddNet(clientNetwork); err != nil {
return err
}
if err = router.Start(); err != nil {
return err
}
defer router.Stop() // nolint:errcheck
proxy, err := NewProxy(router)
if err != nil {
return err
}
defer proxy.Close() // nolint:errcheck
// For utest, mock the target real server.
proxy.mockRealServerAddr = mockServer.realServerAddr
// The real server address to proxy to.
// Note that for utest, we will proxy to a local address.
serverAddr, err := net.ResolveUDPAddr("udp4", "192.168.1.10:8000")
if err != nil {
return err
}
if err = proxy.Proxy(clientNetwork, serverAddr); err != nil {
return err
}
// Now, all packets from client, will be proxy to real server, vice versa.
client, err := clientNetwork.ListenPacket("udp4", "10.0.0.11:5787")
if err != nil {
return err
}
// When system quit, interrupt client.
selfKill, selfKillCancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()
selfKillCancel()
_ = client.Close() // nolint:errcheck
}()
for i := 0; i < 10; i++ {
if _, err = client.WriteTo([]byte("Hello"), serverAddr); err != nil {
return err
}
var n int
var addr net.Addr
buf := make([]byte, 1500)
if n, addr, err = client.ReadFrom(buf); err != nil { // nolint:gocritic
if errors.Is(selfKill.Err(), context.Canceled) {
return nil
}
return err
} else if n != 5 || addr == nil {
return fmt.Errorf("n=%v, addr=%v", n, addr) // nolint:goerr113
} else if string(buf[:n]) != "Hello" {
return fmt.Errorf("data %v", buf[:n]) // nolint:goerr113
}
// Wait for awhile for each UDP packet, to simulate real network.
select {
case <-ctx.Done():
return nil
case <-time.After(30 * time.Millisecond):
}
}
return err
}
if err := doVnetProxy(); err != nil {
r1 = err
}
}()
}
// vnet client:
// 10.0.0.11:5787
// 10.0.0.11:5788
// proxy to real server:
// 192.168.1.10:8000
func TestUDPProxyTwo2One(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var r0, r1, r2, r3 error
defer func() {
if r0 != nil || r1 != nil || r2 != nil || r3 != nil {
t.Errorf("fail for ctx=%v, r0=%v, r1=%v, r2=%v, r3=%v", ctx.Err(), r0, r1, r2, r3)
}
}()
var wg sync.WaitGroup
defer wg.Wait()
// Timeout, fail
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
select {
case <-ctx.Done():
case <-time.After(time.Duration(*testTimeout) * time.Millisecond):
r2 = fmt.Errorf("timeout") // nolint:goerr113
}
}()
// For utest, we always proxy vnet packets to the random port we listen to.
mockServer := NewMockUDPEchoServer()
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
if err := mockServer.doMockUDPServer(ctx); err != nil {
r0 = err
}
}()
// Create a vent and proxy.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
// When real server is ready, start the vnet test.
select {
case <-ctx.Done():
return
case <-mockServer.realServerReady.Done():
}
doVnetProxy := func() error {
router, err := vnet.NewRouter(&vnet.RouterConfig{
CIDR: "0.0.0.0/0",
LoggerFactory: logging.NewDefaultLoggerFactory(),
})
if err != nil {
return err
}
clientNetwork := vnet.NewNet(&vnet.NetConfig{
StaticIP: "10.0.0.11",
})
if err = router.AddNet(clientNetwork); err != nil {
return err
}
if err = router.Start(); err != nil {
return err
}
defer router.Stop() // nolint:errcheck
proxy, err := NewProxy(router)
if err != nil {
return err
}
defer proxy.Close() // nolint:errcheck
// For utest, mock the target real server.
proxy.mockRealServerAddr = mockServer.realServerAddr
// The real server address to proxy to.
// Note that for utest, we will proxy to a local address.
serverAddr, err := net.ResolveUDPAddr("udp4", "192.168.1.10:8000")
if err != nil {
return err
}
if err = proxy.Proxy(clientNetwork, serverAddr); err != nil {
return err
}
handClient := func(address, echoData string) error {
// Now, all packets from client, will be proxy to real server, vice versa.
client, err := clientNetwork.ListenPacket("udp4", address) // nolint:govet
if err != nil {
return err
}
// When system quit, interrupt client.
selfKill, selfKillCancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()
selfKillCancel()
_ = client.Close()
}()
for i := 0; i < 10; i++ {
if _, err := client.WriteTo([]byte(echoData), serverAddr); err != nil { // nolint:govet
return err
}
var n int
var addr net.Addr
buf := make([]byte, 1400)
if n, addr, err = client.ReadFrom(buf); err != nil { // nolint:gocritic
if errors.Is(selfKill.Err(), context.Canceled) {
return nil
}
return err
} else if n != len(echoData) || addr == nil {
return fmt.Errorf("n=%v, addr=%v", n, addr) // nolint:goerr113
} else if string(buf[:n]) != echoData {
return fmt.Errorf("check data %v", buf[:n]) // nolint:goerr113
}
// Wait for awhile for each UDP packet, to simulate real network.
select {
case <-ctx.Done():
return nil
case <-time.After(30 * time.Millisecond):
}
}
return nil
}
client0, client0Cancel := context.WithCancel(context.Background())
go func() {
defer client0Cancel()
address := "10.0.0.11:5787"
if err := handClient(address, "Hello"); err != nil { // nolint:govet
r3 = fmt.Errorf("client %v err %v", address, err) // nolint:goerr113
}
}()
client1, client1Cancel := context.WithCancel(context.Background())
go func() {
defer client1Cancel()
address := "10.0.0.11:5788"
if err := handClient(address, "World"); err != nil { // nolint:govet
r3 = fmt.Errorf("client %v err %v", address, err) // nolint:goerr113
}
}()
select {
case <-ctx.Done():
case <-client0.Done():
case <-client1.Done():
}
return err
}
if err := doVnetProxy(); err != nil {
r1 = err
}
}()
}
// vnet client:
// 10.0.0.11:5787
// proxy to real server:
// 192.168.1.10:8000
//
// vnet client:
// 10.0.0.11:5788
// proxy to real server:
// 192.168.1.10:8000
func TestUDPProxyProxyTwice(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var r0, r1, r2, r3 error
defer func() {
if r0 != nil || r1 != nil || r2 != nil || r3 != nil {
t.Errorf("fail for ctx=%v, r0=%v, r1=%v, r2=%v, r3=%v", ctx.Err(), r0, r1, r2, r3)
}
}()
var wg sync.WaitGroup
defer wg.Wait()
// Timeout, fail
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
select {
case <-ctx.Done():
case <-time.After(time.Duration(*testTimeout) * time.Millisecond):
r2 = fmt.Errorf("timeout") // nolint:goerr113
}
}()
// For utest, we always proxy vnet packets to the random port we listen to.
mockServer := NewMockUDPEchoServer()
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
if err := mockServer.doMockUDPServer(ctx); err != nil {
r0 = err
}
}()
// Create a vent and proxy.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
// When real server is ready, start the vnet test.
select {
case <-ctx.Done():
return
case <-mockServer.realServerReady.Done():
}
doVnetProxy := func() error {
router, err := vnet.NewRouter(&vnet.RouterConfig{
CIDR: "0.0.0.0/0",
LoggerFactory: logging.NewDefaultLoggerFactory(),
})
if err != nil {
return err
}
clientNetwork := vnet.NewNet(&vnet.NetConfig{
StaticIP: "10.0.0.11",
})
if err = router.AddNet(clientNetwork); err != nil {
return err
}
if err = router.Start(); err != nil {
return err
}
defer router.Stop() // nolint:errcheck
proxy, err := NewProxy(router)
if err != nil {
return err
}
defer proxy.Close() // nolint:errcheck
// For utest, mock the target real server.
proxy.mockRealServerAddr = mockServer.realServerAddr
// The real server address to proxy to.
// Note that for utest, we will proxy to a local address.
serverAddr, err := net.ResolveUDPAddr("udp4", "192.168.1.10:8000")
if err != nil {
return err
}
handClient := func(address, echoData string) error {
// We proxy multiple times, for example, in publisher and player, both call
// the proxy when got answer.
if err := proxy.Proxy(clientNetwork, serverAddr); err != nil { // nolint:govet
return err
}
// Now, all packets from client, will be proxy to real server, vice versa.
client, err := clientNetwork.ListenPacket("udp4", address) // nolint:govet
if err != nil {
return err
}
// When system quit, interrupt client.
selfKill, selfKillCancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()
selfKillCancel()
_ = client.Close() // nolint:errcheck
}()
for i := 0; i < 10; i++ {
if _, err = client.WriteTo([]byte(echoData), serverAddr); err != nil {
return err
}
buf := make([]byte, 1500)
if n, addr, err := client.ReadFrom(buf); err != nil { // nolint:gocritic,govet
if errors.Is(selfKill.Err(), context.Canceled) {
return nil
}
return err
} else if n != len(echoData) || addr == nil {
return fmt.Errorf("n=%v, addr=%v", n, addr) // nolint:goerr113
} else if string(buf[:n]) != echoData {
return fmt.Errorf("verify data %v", buf[:n]) // nolint:goerr113
}
// Wait for awhile for each UDP packet, to simulate real network.
select {
case <-ctx.Done():
return nil
case <-time.After(30 * time.Millisecond):
}
}
return nil
}
client0, client0Cancel := context.WithCancel(context.Background())
go func() {
defer client0Cancel()
address := "10.0.0.11:5787"
if err = handClient(address, "Hello"); err != nil {
r3 = fmt.Errorf("client %v err %v", address, err) // nolint:goerr113
}
}()
client1, client1Cancel := context.WithCancel(context.Background())
go func() {
defer client1Cancel()
// Slower than client0, 60ms.
// To simulate the real player or publisher, might not start at the same time.
select {
case <-ctx.Done():
return
case <-time.After(150 * time.Millisecond):
}
address := "10.0.0.11:5788"
if err = handClient(address, "World"); err != nil {
r3 = fmt.Errorf("client %v err %v", address, err) // nolint:goerr113
}
}()
select {
case <-ctx.Done():
case <-client0.Done():
case <-client1.Done():
}
return err
}
if err := doVnetProxy(); err != nil {
r1 = err
}
}()
}