1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Refine the regression test tool, add missing files

This commit is contained in:
winlin 2021-03-04 14:03:01 +08:00
parent a29d0a6a24
commit bb3bd1705e
8 changed files with 1333 additions and 3 deletions

View file

@ -17,7 +17,7 @@ bench: ./objs/srs_bench
test: ./objs/srs_test
./objs/srs_test: .format.txt *.go rtc/*.go srs/*.go Makefile
go test ./srs -c -o ./objs/srs_test
go test ./srs -mod=vendor -c -o ./objs/srs_test
help:
@echo "Usage: make [bench|test]"

View file

@ -113,7 +113,7 @@ fi
然后运行回归测试用例,如果只跑一次,可以直接运行:
```bash
go test ./srs -v
go test ./srs -mod=vendor -v
```
也可以用make编译出重复使用的二进制
@ -125,7 +125,7 @@ make test && ./objs/srs_test -test.v
可以给回归测试传参数,这样可以测试不同的序列,比如:
```bash
go test ./srs -v -srs-server=127.0.0.1
go test ./srs -mod=vendor -v -srs-server=127.0.0.1
# Or
make test && ./objs/srs_test -test.v -srs-server=127.0.0.1
```

262
trunk/3rdparty/srs-bench/srs/player.go vendored Normal file
View file

@ -0,0 +1,262 @@
package srs
import (
"context"
"fmt"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/pion/webrtc/v3/pkg/media/h264writer"
"github.com/pion/webrtc/v3/pkg/media/ivfwriter"
"github.com/pion/webrtc/v3/pkg/media/oggwriter"
"strings"
"sync"
"time"
)
// @see https://github.com/pion/webrtc/blob/master/examples/save-to-disk/main.go
func StartPlay(ctx context.Context, r, dumpAudio, dumpVideo string, enableAudioLevel, enableTWCC bool, pli int) error {
ctx = logger.WithContext(ctx)
logger.Tf(ctx, "Start play url=%v, audio=%v, video=%v, audio-level=%v, twcc=%v",
r, dumpAudio, dumpVideo, enableAudioLevel, enableTWCC)
// For audio-level.
webrtcNewPeerConnection := func(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) {
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
return nil, err
}
for _, extension := range []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.TransportCCURI} {
if extension == sdp.TransportCCURI && !enableTWCC {
continue
}
if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeVideo); err != nil {
return nil, err
}
}
// https://github.com/pion/ion/issues/130
// https://github.com/pion/ion-sfu/pull/373/files#diff-6f42c5ac6f8192dd03e5a17e9d109e90cb76b1a4a7973be6ce44a89ffd1b5d18R73
for _, extension := range []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.AudioLevelURI} {
if extension == sdp.AudioLevelURI && !enableAudioLevel {
continue
}
if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeAudio); err != nil {
return nil, err
}
}
i := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
return nil, err
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i))
return api.NewPeerConnection(configuration)
}
pc, err := webrtcNewPeerConnection(webrtc.Configuration{})
if err != nil {
return errors.Wrapf(err, "Create PC")
}
defer pc.Close()
pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
})
pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
})
offer, err := pc.CreateOffer(nil)
if err != nil {
return errors.Wrapf(err, "Create Offer")
}
if err := pc.SetLocalDescription(offer); err != nil {
return errors.Wrapf(err, "Set offer %v", offer)
}
answer, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP)
if err != nil {
return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
}
if err := pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer, SDP: answer,
}); err != nil {
return errors.Wrapf(err, "Set answer %v", answer)
}
var da media.Writer
var dv_vp8 media.Writer
var dv_h264 media.Writer
defer func() {
if da != nil {
da.Close()
}
if dv_vp8 != nil {
dv_vp8.Close()
}
if dv_h264 != nil {
dv_h264.Close()
}
}()
handleTrack := func(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error {
// Send a PLI on an interval so that the publisher is pushing a keyframe
go func() {
if track.Kind() == webrtc.RTPCodecTypeAudio {
return
}
if pli <= 0 {
return
}
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(pli) * time.Second):
_ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
MediaSSRC: uint32(track.SSRC()),
}})
}
}
}()
codec := track.Codec()
trackDesc := fmt.Sprintf("channels=%v", codec.Channels)
if track.Kind() == webrtc.RTPCodecTypeVideo {
trackDesc = fmt.Sprintf("fmtp=%v", codec.SDPFmtpLine)
}
if headers := receiver.GetParameters().HeaderExtensions; len(headers) > 0 {
trackDesc = fmt.Sprintf("%v, header=%v", trackDesc, headers)
}
logger.Tf(ctx, "Got track %v, pt=%v, tbn=%v, %v",
codec.MimeType, codec.PayloadType, codec.ClockRate, trackDesc)
if codec.MimeType == "audio/opus" {
if da == nil && dumpAudio != "" {
if da, err = oggwriter.New(dumpAudio, codec.ClockRate, codec.Channels); err != nil {
return errors.Wrapf(err, "New audio dumper")
}
logger.Tf(ctx, "Open ogg writer file=%v, tbn=%v, channels=%v",
dumpAudio, codec.ClockRate, codec.Channels)
}
if err = writeTrackToDisk(ctx, da, track); err != nil {
return errors.Wrapf(err, "Write audio disk")
}
} else if codec.MimeType == "video/VP8" {
if dumpVideo != "" && !strings.HasSuffix(dumpVideo, ".ivf") {
return errors.Errorf("%v should be .ivf for VP8", dumpVideo)
}
if dv_vp8 == nil && dumpVideo != "" {
if dv_vp8, err = ivfwriter.New(dumpVideo); err != nil {
return errors.Wrapf(err, "New video dumper")
}
logger.Tf(ctx, "Open ivf writer file=%v", dumpVideo)
}
if err = writeTrackToDisk(ctx, dv_vp8, track); err != nil {
return errors.Wrapf(err, "Write video disk")
}
} else if codec.MimeType == "video/H264" {
if dumpVideo != "" && !strings.HasSuffix(dumpVideo, ".h264") {
return errors.Errorf("%v should be .h264 for H264", dumpVideo)
}
if dv_h264 == nil && dumpVideo != "" {
if dv_h264, err = h264writer.New(dumpVideo); err != nil {
return errors.Wrapf(err, "New video dumper")
}
logger.Tf(ctx, "Open h264 writer file=%v", dumpVideo)
}
if err = writeTrackToDisk(ctx, dv_h264, track); err != nil {
return errors.Wrapf(err, "Write video disk")
}
} else {
logger.Wf(ctx, "Ignore track %v pt=%v", codec.MimeType, codec.PayloadType)
}
return nil
}
ctx, cancel := context.WithCancel(ctx)
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
err = handleTrack(ctx, track, receiver)
if err != nil {
codec := track.Codec()
err = errors.Wrapf(err, "Handle track %v, pt=%v", codec.MimeType, codec.PayloadType)
cancel()
}
})
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
logger.If(ctx, "ICE state %v", state)
if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed {
if ctx.Err() != nil {
return
}
logger.Wf(ctx, "Close for ICE state %v", state)
cancel()
}
})
// Wait for event from context or tracks.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case <-time.After(5 * time.Second):
StatRTC.PeerConnection = pc.GetStats()
}
}
}()
wg.Wait()
return err
}
func writeTrackToDisk(ctx context.Context, w media.Writer, track *webrtc.TrackRemote) error {
for ctx.Err() == nil {
pkt, _, err := track.ReadRTP()
if err != nil {
if ctx.Err() != nil {
return nil
}
return errors.Wrapf(err, "Read RTP")
}
if w == nil {
continue
}
if err := w.WriteRTP(pkt); err != nil {
if len(pkt.Payload) <= 2 {
continue
}
logger.Wf(ctx, "Ignore write RTP %vB err %+v", len(pkt.Payload), err)
}
}
return ctx.Err()
}

View file

@ -0,0 +1,429 @@
package srs
import (
"context"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"github.com/ossrs/srs-bench/rtc"
"github.com/pion/interceptor"
"github.com/pion/rtp"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/pion/webrtc/v3/pkg/media/h264reader"
"github.com/pion/webrtc/v3/pkg/media/oggreader"
"io"
"os"
"strings"
"sync"
"time"
)
// @see https://github.com/pion/webrtc/blob/master/examples/play-from-disk/main.go
func StartPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps int, enableAudioLevel, enableTWCC bool) error {
ctx = logger.WithContext(ctx)
logger.Tf(ctx, "Start publish url=%v, audio=%v, video=%v, fps=%v, audio-level=%v, twcc=%v",
r, sourceAudio, sourceVideo, fps, enableAudioLevel, enableTWCC)
// For audio-level.
webrtcNewPeerConnection := func(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) {
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
return nil, err
}
for _, extension := range []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.TransportCCURI} {
if extension == sdp.TransportCCURI && !enableTWCC {
continue
}
if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeVideo); err != nil {
return nil, err
}
}
// https://github.com/pion/ion/issues/130
// https://github.com/pion/ion-sfu/pull/373/files#diff-6f42c5ac6f8192dd03e5a17e9d109e90cb76b1a4a7973be6ce44a89ffd1b5d18R73
for _, extension := range []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.AudioLevelURI} {
if extension == sdp.AudioLevelURI && !enableAudioLevel {
continue
}
if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeAudio); err != nil {
return nil, err
}
}
i := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
return nil, err
}
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i))
return api.NewPeerConnection(configuration)
}
pc, err := webrtcNewPeerConnection(webrtc.Configuration{})
if err != nil {
return errors.Wrapf(err, "Create PC")
}
defer pc.Close()
var sVideoTrack *rtc.TrackLocalStaticSample
var sVideoSender *webrtc.RTPSender
if sourceVideo != "" {
mimeType, trackID := "video/H264", "video"
if strings.HasSuffix(sourceVideo, ".ivf") {
mimeType = "video/VP8"
}
sVideoTrack, err = rtc.NewTrackLocalStaticSample(
webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 90000}, trackID, "pion",
)
if err != nil {
return errors.Wrapf(err, "Create video track")
}
sVideoSender, err = pc.AddTrack(sVideoTrack)
if err != nil {
return errors.Wrapf(err, "Add video track")
}
sVideoSender.Stop()
}
var sAudioTrack *rtc.TrackLocalStaticSample
var sAudioSender *webrtc.RTPSender
if sourceAudio != "" {
mimeType, trackID := "audio/opus", "audio"
sAudioTrack, err = rtc.NewTrackLocalStaticSample(
webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 48000, Channels: 2}, trackID, "pion",
)
if err != nil {
return errors.Wrapf(err, "Create audio track")
}
sAudioSender, err = pc.AddTrack(sAudioTrack)
if err != nil {
return errors.Wrapf(err, "Add audio track")
}
defer sAudioSender.Stop()
}
offer, err := pc.CreateOffer(nil)
if err != nil {
return errors.Wrapf(err, "Create Offer")
}
if err := pc.SetLocalDescription(offer); err != nil {
return errors.Wrapf(err, "Set offer %v", offer)
}
answer, err := apiRtcRequest(ctx, "/rtc/v1/publish", r, offer.SDP)
if err != nil {
return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
}
if err := pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer, SDP: answer,
}); err != nil {
return errors.Wrapf(err, "Set answer %v", answer)
}
logger.Tf(ctx, "State signaling=%v, ice=%v, conn=%v", pc.SignalingState(), pc.ICEConnectionState(), pc.ConnectionState())
// ICE state management.
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
logger.Tf(ctx, "ICE state %v", state)
})
pc.OnSignalingStateChange(func(state webrtc.SignalingState) {
logger.Tf(ctx, "Signaling state %v", state)
})
sAudioSender.Transport().OnStateChange(func(state webrtc.DTLSTransportState) {
logger.Tf(ctx, "DTLS state %v", state)
})
ctx, cancel := context.WithCancel(ctx)
pcDone, pcDoneCancel := context.WithCancel(context.Background())
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
logger.Tf(ctx, "PC state %v", state)
if state == webrtc.PeerConnectionStateConnected {
pcDoneCancel()
}
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed {
if ctx.Err() != nil {
return
}
logger.Wf(ctx, "Close for PC state %v", state)
cancel()
}
})
// Wait for event from context or tracks.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if sAudioSender == nil {
return
}
select {
case <-ctx.Done():
case <-pcDone.Done():
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start read audio packets")
}
buf := make([]byte, 1500)
for ctx.Err() == nil {
if _, _, err := sAudioSender.Read(buf); err != nil {
return
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
if sAudioTrack == nil {
return
}
select {
case <-ctx.Done():
case <-pcDone.Done():
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest audio %v", sourceAudio)
}
for ctx.Err() == nil {
if err := readAudioTrackFromDisk(ctx, sourceAudio, sAudioSender, sAudioTrack); err != nil {
if errors.Cause(err) == io.EOF {
logger.Tf(ctx, "EOF, restart ingest audio %v", sourceAudio)
continue
}
logger.Wf(ctx, "Ignore audio err %+v", err)
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
if sVideoSender == nil {
return
}
select {
case <-ctx.Done():
case <-pcDone.Done():
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start read video packets")
}
buf := make([]byte, 1500)
for ctx.Err() == nil {
if _, _, err := sVideoSender.Read(buf); err != nil {
return
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
if sVideoTrack == nil {
return
}
select {
case <-ctx.Done():
case <-pcDone.Done():
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest video %v", sourceVideo)
}
for ctx.Err() == nil {
if err := readVideoTrackFromDisk(ctx, sourceVideo, sVideoSender, fps, sVideoTrack); err != nil {
if errors.Cause(err) == io.EOF {
logger.Tf(ctx, "EOF, restart ingest video %v", sourceVideo)
continue
}
logger.Wf(ctx, "Ignore video err %+v", err)
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case <-time.After(5 * time.Second):
StatRTC.PeerConnection = pc.GetStats()
}
}
}()
wg.Wait()
return nil
}
func readAudioTrackFromDisk(ctx context.Context, source string, sender *webrtc.RTPSender, track *rtc.TrackLocalStaticSample) error {
f, err := os.Open(source)
if err != nil {
return errors.Wrapf(err, "Open file %v", source)
}
defer f.Close()
ogg, _, err := oggreader.NewWith(f)
if err != nil {
return errors.Wrapf(err, "Open ogg %v", source)
}
enc := sender.GetParameters().Encodings[0]
codec := sender.GetParameters().Codecs[0]
headers := sender.GetParameters().HeaderExtensions
logger.Tf(ctx, "Audio %v, tbn=%v, channels=%v, ssrc=%v, pt=%v, header=%v",
codec.MimeType, codec.ClockRate, codec.Channels, enc.SSRC, codec.PayloadType, headers)
// Whether should encode the audio-level in RTP header.
var audioLevel *webrtc.RTPHeaderExtensionParameter
for _, h := range headers {
if h.URI == sdp.AudioLevelURI {
audioLevel = &h
}
}
clock := newWallClock()
var lastGranule uint64
for ctx.Err() == nil {
pageData, pageHeader, err := ogg.ParseNextPage()
if err == io.EOF {
return nil
}
if err != nil {
return errors.Wrapf(err, "Read ogg")
}
// The amount of samples is the difference between the last and current timestamp
sampleCount := uint64(pageHeader.GranulePosition - lastGranule)
lastGranule = pageHeader.GranulePosition
sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 * sampleCount / uint64(codec.ClockRate))
// For audio-level, set the extensions if negotiated.
track.OnBeforeWritePacket = func(p *rtp.Packet) {
if audioLevel != nil {
if b, err := new(rtp.AudioLevelExtension).Marshal(); err == nil {
p.SetExtension(uint8(audioLevel.ID), b)
}
}
}
if err = track.WriteSample(media.Sample{Data: pageData, Duration: sampleDuration}); err != nil {
return errors.Wrapf(err, "Write sample")
}
if d := clock.Tick(sampleDuration); d > 0 {
time.Sleep(d)
}
}
return nil
}
func readVideoTrackFromDisk(ctx context.Context, source string, sender *webrtc.RTPSender, fps int, track *rtc.TrackLocalStaticSample) error {
f, err := os.Open(source)
if err != nil {
return errors.Wrapf(err, "Open file %v", source)
}
defer f.Close()
// TODO: FIXME: Support ivf for vp8.
h264, err := h264reader.NewReader(f)
if err != nil {
return errors.Wrapf(err, "Open h264 %v", source)
}
enc := sender.GetParameters().Encodings[0]
codec := sender.GetParameters().Codecs[0]
headers := sender.GetParameters().HeaderExtensions
logger.Tf(ctx, "Video %v, tbn=%v, fps=%v, ssrc=%v, pt=%v, header=%v",
codec.MimeType, codec.ClockRate, fps, enc.SSRC, codec.PayloadType, headers)
clock := newWallClock()
sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 / uint64(fps))
for ctx.Err() == nil {
var sps, pps *h264reader.NAL
var oFrames []*h264reader.NAL
for ctx.Err() == nil {
frame, err := h264.NextNAL()
if err == io.EOF {
return nil
}
if err != nil {
return errors.Wrapf(err, "Read h264")
}
oFrames = append(oFrames, frame)
logger.If(ctx, "NALU %v PictureOrderCount=%v, ForbiddenZeroBit=%v, RefIdc=%v, %v bytes",
frame.UnitType.String(), frame.PictureOrderCount, frame.ForbiddenZeroBit, frame.RefIdc, len(frame.Data))
if frame.UnitType == h264reader.NalUnitTypeSPS {
sps = frame
} else if frame.UnitType == h264reader.NalUnitTypePPS {
pps = frame
} else {
break
}
}
var frames []*h264reader.NAL
// Package SPS/PPS to STAP-A
if sps != nil && pps != nil {
stapA := packageAsSTAPA(sps, pps)
frames = append(frames, stapA)
}
// Append other original frames.
for _, frame := range oFrames {
if frame.UnitType != h264reader.NalUnitTypeSPS && frame.UnitType != h264reader.NalUnitTypePPS {
frames = append(frames, frame)
}
}
// Covert frames to sample(buffers).
for i, frame := range frames {
sample := media.Sample{Data: frame.Data, Duration: sampleDuration}
// Use the sample timestamp for frames.
if i != len(frames)-1 {
sample.Duration = 0
}
// For STAP-A, set marker to false, to make Chrome happy.
track.OnBeforeWritePacket = func(p *rtp.Packet) {
if i < len(frames)-1 {
p.Header.Marker = false
}
}
if err = track.WriteSample(sample); err != nil {
return errors.Wrapf(err, "Write sample")
}
}
if d := clock.Tick(sampleDuration); d > 0 {
time.Sleep(d)
}
}
return nil
}

449
trunk/3rdparty/srs-bench/srs/rtc_test.go vendored Normal file
View file

@ -0,0 +1,449 @@
package srs
import (
"context"
"encoding/json"
"flag"
"fmt"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"github.com/ossrs/srs-bench/rtc"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"io"
"io/ioutil"
"net/http"
"os"
"strings"
"sync"
"testing"
"time"
)
var srsSchema = "http"
var srsHttps = flag.Bool("srs-https", false, "Whther connect to HTTPS-API")
var srsServer = flag.String("srs-server", "127.0.0.1", "The RTC server to connect to")
var srsStream = flag.String("srs-stream", "/rtc/regression", "The RTC stream to play")
var srsLog = flag.Bool("srs-log", false, "Whether enable the detail log")
var srsTimeout = flag.Int("srs-timeout", 3000, "For each case, the timeout in ms")
var srsPlayPLI = flag.Int("srs-play-pli", 5000, "The PLI interval in seconds for player.")
var srsPlayOKPackets = flag.Int("srs-play-ok-packets", 10, "If got N packets, it's ok, or fail")
var srsPublishAudio = flag.String("srs-publish-audio", "avatar.ogg", "The audio file for publisher.")
var srsPublishVideo = flag.String("srs-publish-video", "avatar.h264", "The video file for publisher.")
var srsPublishVideoFps = flag.Int("srs-publish-video-fps", 25, "The video fps for publisher.")
func TestMain(m *testing.M) {
// Should parse it first.
flag.Parse()
// The stream should starts with /, for example, /rtc/regression
if strings.HasPrefix(*srsStream, "/") {
*srsStream = "/" + *srsStream
}
// Generate srs protocol from whether use HTTPS.
if *srsHttps {
srsSchema = "https"
}
// Disable the logger during all tests.
logger.Tf(nil, "sys log %v", *srsLog)
if *srsLog == false {
olw := logger.Switch(ioutil.Discard)
defer func() {
logger.Switch(olw)
}()
}
// Run tests.
os.Exit(m.Run())
}
func TestRTCServerVersion(t *testing.T) {
api := fmt.Sprintf("http://%v:1985/api/v1/versions", *srsServer)
req, err := http.NewRequest("POST", api, nil)
if err != nil {
t.Errorf("Request %v", api)
return
}
res, err := http.DefaultClient.Do(req)
if err != nil {
t.Errorf("Do request %v", api)
return
}
b, err := ioutil.ReadAll(res.Body)
if err != nil {
t.Errorf("Read body of %v", api)
return
}
obj := struct {
Code int `json:"code"`
Server string `json:"server"`
Data struct {
Major int `json:"major"`
Minor int `json:"minor"`
Revision int `json:"revision"`
Version string `json:"version"`
} `json:"data"`
}{}
if err := json.Unmarshal(b, &obj); err != nil {
t.Errorf("Parse %v", string(b))
return
}
if obj.Code != 0 {
t.Errorf("Server err code=%v, server=%v", obj.Code, obj.Server)
return
}
if obj.Data.Major == 0 && obj.Data.Minor == 0 {
t.Errorf("Invalid version %v", obj.Data)
return
}
}
func TestRTCServerPublishPlay(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithCancel(ctx)
r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream)
publishReady, publishReadyCancel := context.WithCancel(context.Background())
startPlay := func(ctx context.Context) error {
logger.Tf(ctx, "Start play url=%v", r)
pc, err := webrtc.NewPeerConnection(webrtc.Configuration{})
if err != nil {
return errors.Wrapf(err, "Create PC")
}
defer pc.Close()
pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
})
pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
})
offer, err := pc.CreateOffer(nil)
if err != nil {
return errors.Wrapf(err, "Create Offer")
}
if err := pc.SetLocalDescription(offer); err != nil {
return errors.Wrapf(err, "Set offer %v", offer)
}
answer, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP)
if err != nil {
return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
}
if err := pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer, SDP: answer,
}); err != nil {
return errors.Wrapf(err, "Set answer %v", answer)
}
handleTrack := func(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error {
// Send a PLI on an interval so that the publisher is pushing a keyframe
go func() {
if track.Kind() == webrtc.RTPCodecTypeAudio {
return
}
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(*srsPlayPLI) * time.Millisecond):
_ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
MediaSSRC: uint32(track.SSRC()),
}})
}
}
}()
// Try to read packets of track.
for i := 0; i < *srsPlayOKPackets && ctx.Err() == nil; i++ {
_, _, err := track.ReadRTP()
if err != nil {
return errors.Wrapf(err, "Read RTP")
}
}
// Completed.
cancel()
return nil
}
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
err = handleTrack(ctx, track, receiver)
if err != nil {
codec := track.Codec()
err = errors.Wrapf(err, "Handle track %v, pt=%v", codec.MimeType, codec.PayloadType)
cancel()
}
})
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed {
err = errors.Errorf("Close for ICE state %v", state)
cancel()
}
})
<-ctx.Done()
return err
}
startPublish := func(ctx context.Context) error {
sourceVideo := *srsPublishVideo
sourceAudio := *srsPublishAudio
fps := *srsPublishVideoFps
logger.Tf(ctx, "Start publish url=%v, audio=%v, video=%v, fps=%v",
r, sourceAudio, sourceVideo, fps)
pc, err := webrtc.NewPeerConnection(webrtc.Configuration{})
if err != nil {
return errors.Wrapf(err, "Create PC")
}
defer pc.Close()
var sVideoTrack *rtc.TrackLocalStaticSample
var sVideoSender *webrtc.RTPSender
if sourceVideo != "" {
mimeType, trackID := "video/H264", "video"
if strings.HasSuffix(sourceVideo, ".ivf") {
mimeType = "video/VP8"
}
sVideoTrack, err = rtc.NewTrackLocalStaticSample(
webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 90000}, trackID, "pion",
)
if err != nil {
return errors.Wrapf(err, "Create video track")
}
sVideoSender, err = pc.AddTrack(sVideoTrack)
if err != nil {
return errors.Wrapf(err, "Add video track")
}
sVideoSender.Stop()
}
var sAudioTrack *rtc.TrackLocalStaticSample
var sAudioSender *webrtc.RTPSender
if sourceAudio != "" {
mimeType, trackID := "audio/opus", "audio"
sAudioTrack, err = rtc.NewTrackLocalStaticSample(
webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 48000, Channels: 2}, trackID, "pion",
)
if err != nil {
return errors.Wrapf(err, "Create audio track")
}
sAudioSender, err = pc.AddTrack(sAudioTrack)
if err != nil {
return errors.Wrapf(err, "Add audio track")
}
defer sAudioSender.Stop()
}
offer, err := pc.CreateOffer(nil)
if err != nil {
return errors.Wrapf(err, "Create Offer")
}
if err := pc.SetLocalDescription(offer); err != nil {
return errors.Wrapf(err, "Set offer %v", offer)
}
answer, err := apiRtcRequest(ctx, "/rtc/v1/publish", r, offer.SDP)
if err != nil {
return errors.Wrapf(err, "Api request offer=%v", offer.SDP)
}
if err := pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer, SDP: answer,
}); err != nil {
return errors.Wrapf(err, "Set answer %v", answer)
}
logger.Tf(ctx, "State signaling=%v, ice=%v, conn=%v", pc.SignalingState(), pc.ICEConnectionState(), pc.ConnectionState())
ctx, cancel := context.WithCancel(ctx)
pcDone, pcDoneCancel := context.WithCancel(context.Background())
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
logger.Tf(ctx, "PC state %v", state)
if state == webrtc.PeerConnectionStateConnected {
pcDoneCancel()
publishReadyCancel()
}
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed {
err = errors.Errorf("Close for PC state %v", state)
cancel()
}
})
// Wait for event from context or tracks.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if sAudioSender == nil {
return
}
select {
case <-ctx.Done():
case <-pcDone.Done():
}
buf := make([]byte, 1500)
for ctx.Err() == nil {
if _, _, err := sAudioSender.Read(buf); err != nil {
return
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
if sAudioTrack == nil {
return
}
select {
case <-ctx.Done():
case <-pcDone.Done():
}
for ctx.Err() == nil {
if err := readAudioTrackFromDisk(ctx, sourceAudio, sAudioSender, sAudioTrack); err != nil {
if errors.Cause(err) == io.EOF {
logger.Tf(ctx, "EOF, restart ingest audio %v", sourceAudio)
continue
}
logger.Wf(ctx, "Ignore audio err %+v", err)
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
if sVideoSender == nil {
return
}
select {
case <-ctx.Done():
case <-pcDone.Done():
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start read video packets")
}
buf := make([]byte, 1500)
for ctx.Err() == nil {
if _, _, err := sVideoSender.Read(buf); err != nil {
return
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
if sVideoTrack == nil {
return
}
select {
case <-ctx.Done():
case <-pcDone.Done():
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest video %v", sourceVideo)
}
for ctx.Err() == nil {
if err := readVideoTrackFromDisk(ctx, sourceVideo, sVideoSender, fps, sVideoTrack); err != nil {
if errors.Cause(err) == io.EOF {
logger.Tf(ctx, "EOF, restart ingest video %v", sourceVideo)
continue
}
logger.Wf(ctx, "Ignore video err %+v", err)
}
}
}()
wg.Wait()
return err
}
var wg sync.WaitGroup
errs := make(chan error, 0)
wg.Add(1)
go func() {
defer wg.Done()
// Wait for publisher to start first.
select {
case <-ctx.Done():
return
case <-publishReady.Done():
}
errs <- startPlay(logger.WithContext(ctx))
cancel()
}()
wg.Add(1)
go func() {
defer wg.Done()
errs <- startPublish(logger.WithContext(ctx))
cancel()
}()
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-ctx.Done():
case <-time.After(time.Duration(*srsTimeout) * time.Millisecond):
errs <- errors.Errorf("timeout for %vms", *srsTimeout)
cancel()
}
}()
testDone, testDoneCancel := context.WithCancel(context.Background())
go func() {
wg.Wait()
testDoneCancel()
}()
// Handle errs, the test result.
for {
select {
case <-testDone.Done():
return
case err := <-errs:
if err != nil && err != context.Canceled && !t.Failed() {
t.Errorf("err %+v", err)
}
}
}
}

47
trunk/3rdparty/srs-bench/srs/stat.go vendored Normal file
View file

@ -0,0 +1,47 @@
package srs
import (
"context"
"encoding/json"
"github.com/ossrs/go-oryx-lib/logger"
"net/http"
"strings"
)
type statRTC struct {
Publishers struct {
Expect int `json:"expect"`
Alive int `json:"alive"`
} `json:"publishers"`
Subscribers struct {
Expect int `json:"expect"`
Alive int `json:"alive"`
} `json:"subscribers"`
PeerConnection interface{} `json:"random-pc"`
}
var StatRTC statRTC
func HandleStat(ctx context.Context, mux *http.ServeMux, l string) {
if strings.HasPrefix(l, ":") {
l = "127.0.0.1" + l
}
logger.Tf(ctx, "Handle http://%v/api/v1/sb/rtc", l)
mux.HandleFunc("/api/v1/sb/rtc", func(w http.ResponseWriter, r *http.Request) {
res := &struct {
Code int `json:"code"`
Data interface{} `json:"data"`
}{
0, &StatRTC,
}
b, err := json.Marshal(res)
if err != nil {
logger.Wf(ctx, "marshal %v err %+v", res, err)
return
}
w.Write(b)
})
}

142
trunk/3rdparty/srs-bench/srs/util.go vendored Normal file
View file

@ -0,0 +1,142 @@
package srs
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"github.com/pion/webrtc/v3/pkg/media/h264reader"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
)
func apiRtcRequest(ctx context.Context, apiPath, r, offer string) (string, error) {
u, err := url.Parse(r)
if err != nil {
return "", errors.Wrapf(err, "Parse url %v", r)
}
// Build api url.
host := u.Host
if !strings.Contains(host, ":") {
host += ":1985"
}
api := fmt.Sprintf("http://%v", host)
if !strings.HasPrefix(apiPath, "/") {
api += "/"
}
api += apiPath
if !strings.HasSuffix(apiPath, "/") {
api += "/"
}
if u.RawQuery != "" {
api += "?" + u.RawQuery
}
// Build JSON body.
reqBody := struct {
Api string `json:"api"`
ClientIP string `json:"clientip"`
SDP string `json:"sdp"`
StreamURL string `json:"streamurl"`
}{
api, "", offer, r,
}
b, err := json.Marshal(reqBody)
if err != nil {
return "", errors.Wrapf(err, "Marshal body %v", reqBody)
}
logger.If(ctx, "Request url api=%v with %v", api, string(b))
logger.Tf(ctx, "Request url api=%v with %v bytes", api, len(b))
req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
if err != nil {
return "", errors.Wrapf(err, "HTTP request %v", string(b))
}
res, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return "", errors.Wrapf(err, "Do HTTP request %v", string(b))
}
b2, err := ioutil.ReadAll(res.Body)
if err != nil {
return "", errors.Wrapf(err, "Read response for %v", string(b))
}
logger.If(ctx, "Response from %v is %v", api, string(b2))
logger.Tf(ctx, "Response from %v is %v bytes", api, len(b2))
resBody := struct {
Code int `json:"code"`
Session string `json:"sessionid"`
SDP string `json:"sdp"`
}{}
if err := json.Unmarshal(b2, &resBody); err != nil {
return "", errors.Wrapf(err, "Marshal %v", string(b2))
}
if resBody.Code != 0 {
return "", errors.Errorf("Server fail code=%v %v", resBody.Code, string(b2))
}
logger.If(ctx, "Parse response to code=%v, session=%v, sdp=%v",
resBody.Code, resBody.Session, escapeSDP(resBody.SDP))
logger.Tf(ctx, "Parse response to code=%v, session=%v, sdp=%v bytes",
resBody.Code, resBody.Session, len(resBody.SDP))
return string(resBody.SDP), nil
}
func escapeSDP(sdp string) string {
return strings.ReplaceAll(strings.ReplaceAll(sdp, "\r", "\\r"), "\n", "\\n")
}
func packageAsSTAPA(frames ...*h264reader.NAL) *h264reader.NAL {
first := frames[0]
buf := bytes.Buffer{}
buf.WriteByte(
byte(first.RefIdc<<5)&0x60 | byte(24), // STAP-A
)
for _, frame := range frames {
buf.WriteByte(byte(len(frame.Data) >> 8))
buf.WriteByte(byte(len(frame.Data)))
buf.Write(frame.Data)
}
return &h264reader.NAL{
PictureOrderCount: first.PictureOrderCount,
ForbiddenZeroBit: false,
RefIdc: first.RefIdc,
UnitType: h264reader.NalUnitType(24), // STAP-A
Data: buf.Bytes(),
}
}
type wallClock struct {
start time.Time
duration time.Duration
}
func newWallClock() *wallClock {
return &wallClock{start: time.Now()}
}
func (v *wallClock) Tick(d time.Duration) time.Duration {
v.duration += d
wc := time.Now().Sub(v.start)
re := v.duration - wc
if re > 30*time.Millisecond {
return re
}
return 0
}

View file

@ -2,6 +2,7 @@
listen 1935;
max_connections 1000;
daemon on;
disable_daemon_for_docker off;
srs_log_tank file;
http_server {