1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

SRS5: Test: Add blackbox for HLS.

PICK 30779f3b5a
This commit is contained in:
winlin 2023-01-02 22:47:51 +08:00
parent 4c2db0073a
commit 2cab98aa68
3 changed files with 231 additions and 48 deletions

View file

@ -0,0 +1,116 @@
// The MIT License (MIT)
//
// # Copyright (c) 2023 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package blackbox
import (
"context"
"fmt"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"math/rand"
"os"
"path"
"sync"
"testing"
"time"
)
func TestRtmpPublish_HlsPlay_Basic(t *testing.T) {
// This case is run in parallel.
t.Parallel()
// Setup the max timeout for this case.
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
defer cancel()
// Check a set of errors.
var r0, r1, r2, r3, r4 error
defer func(ctx context.Context) {
if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4); err != nil {
t.Errorf("Fail for err %+v", err)
} else {
logger.Tf(ctx, "test done with err %+v", err)
}
}(ctx)
var wg sync.WaitGroup
defer wg.Wait()
// Start SRS server and wait for it to be ready.
svr := NewSRSServer(func(v *srsServer) {
v.envs = []string{
"SRS_HTTP_SERVER_ENABLED=on",
"SRS_VHOST_HLS_ENABLED=on",
}
})
wg.Add(1)
go func() {
defer wg.Done()
r0 = svr.Run(ctx, cancel)
}()
// Start FFmpeg to publish stream.
streamID := fmt.Sprintf("stream-%v-%v", os.Getpid(), rand.Int())
streamURL := fmt.Sprintf("rtmp://localhost:%v/live/%v", svr.RTMPPort(), streamID)
ffmpeg := NewFFmpeg(func(v *ffmpegClient) {
v.args = []string{
"-stream_loop", "-1", "-re", "-i", *srsPublishAvatar, "-c", "copy", "-f", "flv", streamURL,
}
})
wg.Add(1)
go func() {
defer wg.Done()
<-svr.ReadyCtx().Done()
r1 = ffmpeg.Run(ctx, cancel)
}()
// Start FFprobe to detect and verify stream.
duration := time.Duration(*srsFFprobeDuration) * time.Millisecond
ffprobe := NewFFprobe(func(v *ffprobeClient) {
v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.ts", streamID))
v.streamURL = fmt.Sprintf("http://localhost:%v/live/%v.m3u8", svr.HTTPPort(), streamID)
v.duration, v.timeout = duration, time.Duration(*srsFFprobeTimeout)*time.Millisecond
})
wg.Add(1)
go func() {
defer wg.Done()
<-svr.ReadyCtx().Done()
r2 = ffprobe.Run(ctx, cancel)
}()
// Fast quit for probe done.
select {
case <-ctx.Done():
case <-ffprobe.ProbeDoneCtx().Done():
defer cancel()
str, m := ffprobe.Result()
if len(m.Streams) != 2 {
r3 = errors.Errorf("invalid streams=%v, %v, %v", len(m.Streams), m.String(), str)
}
// Note that HLS score is low, so we only check duration. Note that only check half of duration, because we
// might get only some pieces of segments.
if dv := m.Duration(); dv < duration/2 {
r4 = errors.Errorf("short duration=%v < %v, %v, %v", dv, duration/2, m.String(), str)
}
}
}

View file

@ -42,9 +42,9 @@ func TestRtmpPublish_RtmpPlay_Basic(t *testing.T) {
defer cancel()
// Check a set of errors.
var r0, r1, r2, r3 error
var r0, r1, r2, r3, r4, r5 error
defer func(ctx context.Context) {
if err := filterTestError(ctx.Err(), r0, r1, r2, r3); err != nil {
if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4, r5); err != nil {
t.Errorf("Fail for err %+v", err)
} else {
logger.Tf(ctx, "test done with err %+v", err)
@ -55,11 +55,7 @@ func TestRtmpPublish_RtmpPlay_Basic(t *testing.T) {
defer wg.Wait()
// Start SRS server and wait for it to be ready.
svr := NewSRSServer(func(v *srsServer) {
v.envs = []string{
fmt.Sprintf("SRS_LISTEN=%v", v.rtmpListen),
}
})
svr := NewSRSServer()
wg.Add(1)
go func() {
defer wg.Done()
@ -84,7 +80,7 @@ func TestRtmpPublish_RtmpPlay_Basic(t *testing.T) {
// Start FFprobe to detect and verify stream.
duration := time.Duration(*srsFFprobeDuration) * time.Millisecond
ffprobe := NewFFprobe(func(v *ffprobeClient) {
v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.mp4", streamID))
v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.flv", streamID))
v.streamURL, v.duration, v.timeout = streamURL, duration, time.Duration(*srsFFprobeTimeout)*time.Millisecond
})
wg.Add(1)
@ -104,6 +100,13 @@ func TestRtmpPublish_RtmpPlay_Basic(t *testing.T) {
if len(m.Streams) != 2 {
r3 = errors.Errorf("invalid streams=%v, %v, %v", len(m.Streams), m.String(), str)
}
if ts := 90; m.Format.ProbeScore < ts {
r4 = errors.Errorf("low score=%v < %v, %v, %v", m.Format.ProbeScore, ts, m.String(), str)
}
if dv := m.Duration(); dv < duration {
r5 = errors.Errorf("short duration=%v < %v, %v, %v", dv, duration, m.String(), str)
}
}
}
@ -116,9 +119,9 @@ func TestRtmpPublish_FlvPlay_Basic(t *testing.T) {
defer cancel()
// Check a set of errors.
var r0, r1, r2, r3 error
var r0, r1, r2, r3, r4, r5 error
defer func(ctx context.Context) {
if err := filterTestError(ctx.Err(), r0, r1, r2, r3); err != nil {
if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4, r5); err != nil {
t.Errorf("Fail for err %+v", err)
} else {
logger.Tf(ctx, "test done with err %+v", err)
@ -131,9 +134,7 @@ func TestRtmpPublish_FlvPlay_Basic(t *testing.T) {
// Start SRS server and wait for it to be ready.
svr := NewSRSServer(func(v *srsServer) {
v.envs = []string{
fmt.Sprintf("SRS_LISTEN=%v", v.rtmpListen),
"SRS_HTTP_SERVER_ENABLED=on",
fmt.Sprintf("SRS_HTTP_SERVER_LISTEN=%v", v.httpListen),
"SRS_VHOST_HTTP_REMUX_ENABLED=on",
}
})
@ -161,7 +162,7 @@ func TestRtmpPublish_FlvPlay_Basic(t *testing.T) {
// Start FFprobe to detect and verify stream.
duration := time.Duration(*srsFFprobeDuration) * time.Millisecond
ffprobe := NewFFprobe(func(v *ffprobeClient) {
v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.mp4", streamID))
v.dvrFile = path.Join(svr.WorkDir(), "objs", fmt.Sprintf("srs-ffprobe-%v.flv", streamID))
v.streamURL = fmt.Sprintf("http://localhost:%v/live/%v.flv", svr.HTTPPort(), streamID)
v.duration, v.timeout = duration, time.Duration(*srsFFprobeTimeout)*time.Millisecond
})
@ -182,5 +183,12 @@ func TestRtmpPublish_FlvPlay_Basic(t *testing.T) {
if len(m.Streams) != 2 {
r3 = errors.Errorf("invalid streams=%v, %v, %v", len(m.Streams), m.String(), str)
}
if ts := 90; m.Format.ProbeScore < ts {
r4 = errors.Errorf("low score=%v < %v, %v, %v", m.Format.ProbeScore, ts, m.String(), str)
}
if dv := m.Duration(); dv < duration {
r5 = errors.Errorf("short duration=%v < %v, %v, %v", dv, duration, m.String(), str)
}
}
}

View file

@ -206,12 +206,20 @@ type backendService struct {
r0 error
// The process pid.
pid int
// Whether ignore process exit status error.
ignoreExitStatusError bool
// Hooks for owner.
// Before start the process.
onBeforeStart func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error
onAfterStart func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error
onStop func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error
onDispose func(ctx context.Context, bs *backendService) error
// After started the process.
onAfterStart func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error
// Before kill the process, when case is done.
onBeforeKill func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error
// After stopped the process. Always callback when run is called.
onStop func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error
// When dispose the process. Always callback when run is called.
onDispose func(ctx context.Context, bs *backendService) error
}
func newBackendService(opts ...func(v *backendService)) *backendService {
@ -246,7 +254,7 @@ func (v *backendService) Close() error {
v.onDispose(v.caseCtx, v)
}
logger.Tf(v.caseCtx, "Service is closed, pid=%v, r0=%v", v.pid, v.r0)
logger.Tf(v.caseCtx, "Process is closed, pid=%v, r0=%v", v.pid, v.r0)
return nil
}
@ -255,6 +263,9 @@ func (v *backendService) ReadyCtx() context.Context {
}
func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) error {
// Always dispose resource of process.
defer v.Close()
// Start SRS with -e, which only use environment variables.
cmd := exec.Command(v.name, v.args...)
@ -288,7 +299,6 @@ func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) err
if err := cmd.Start(); err != nil {
return err
}
defer v.Close()
// Now process started, query the pid.
v.pid = cmd.Process.Pid
@ -314,12 +324,15 @@ func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) err
// Notify other goroutine, SRS already done.
defer processDoneCancel()
v.r0 = cmd.Wait()
if err := cmd.Wait(); err != nil && !v.ignoreExitStatusError {
v.r0 = errors.Wrapf(err, "Process wait err, name=%v, args=%v", v.name, v.args)
}
if v.onStop != nil {
if r1 := v.onStop(ctx, v, cmd, v.r0, &v.stdout, &v.stderr); r1 != nil {
logger.Wf(ctx, "Process onStop err %v", r1)
if err := v.onStop(ctx, v, cmd, v.r0, &v.stdout, &v.stderr); err != nil {
if v.r0 == nil {
v.r0 = r1
v.r0 = errors.Wrapf(err, "Process onStop err, name=%v, args=%v", v.name, v.args)
} else {
logger.Ef(ctx, "Process onStop err %v", err)
}
}
}
@ -332,6 +345,11 @@ func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) err
select {
case <-ctx.Done():
// Notify owner that we're going to kill the process.
if v.onBeforeKill != nil {
v.onBeforeKill(ctx, v, cmd)
}
// When case terminated, also terminate the SRS process.
cmd.Process.Signal(syscall.SIGINT)
case <-processDone.Done():
@ -339,7 +357,7 @@ func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) err
return
}
// Start a goroutine to ensure SRS killed.
// Start a goroutine to ensure process killed.
go func() {
time.Sleep(3 * time.Second)
if processDone.Err() == nil { // Ignore if already done.
@ -436,15 +454,20 @@ func NewSRSServer(opts ...func(v *srsServer)) SRSServer {
allocator.Free(v.httpListen)
pidFile := path.Join(v.workDir, v.srsRelativePidFile)
if _, err := os.Stat(pidFile); !os.IsNotExist(err) {
if _, err := os.Stat(pidFile); err == nil {
os.Remove(pidFile)
}
idFile := path.Join(v.workDir, v.srsRelativeIDFile)
if _, err := os.Stat(idFile); !os.IsNotExist(err) {
if _, err := os.Stat(idFile); err == nil {
os.Remove(idFile)
}
hlsFiles := path.Join(v.workDir, "objs", "live")
if _, err := os.Stat(hlsFiles); err == nil {
os.RemoveAll(hlsFiles)
}
logger.Tf(ctx, "SRS server is closed, id=%v, pid=%v, r0=%v", v.srsID, bs.pid, bs.r0)
return nil
}
@ -500,6 +523,14 @@ func (v *srsServer) Run(ctx context.Context, cancel context.CancelFunc) error {
fmt.Sprintf("SRS_HTTP_API_LISTEN=%v", v.apiListen),
// Avoid error for macOS, which ulimit to 256.
"SRS_MAX_CONNECTIONS=100",
// Setup the default directory for HTTP server.
"SRS_HTTP_SERVER_DIR=objs",
// Setup the default directory for HLS stream.
"SRS_VHOST_HLS_HLS_PATH=objs",
// Setup the RTMP listen port.
fmt.Sprintf("SRS_LISTEN=%v", v.rtmpListen),
// Setup the HTTP sever listen port.
fmt.Sprintf("SRS_HTTP_SERVER_LISTEN=%v", v.httpListen),
}
// Rewrite envs by case.
if v.envs != nil {
@ -523,7 +554,8 @@ func (v *srsServer) Run(ctx context.Context, cancel context.CancelFunc) error {
for ctx.Err() == nil {
time.Sleep(100 * time.Millisecond)
res, err := http.Get(fmt.Sprintf("http://localhost:%v/api/v1/versions", v.apiListen))
r := fmt.Sprintf("http://localhost:%v/api/v1/versions", v.apiListen)
res, err := http.Get(r)
if err != nil {
continue
}
@ -534,7 +566,7 @@ func (v *srsServer) Run(ctx context.Context, cancel context.CancelFunc) error {
continue
}
logger.Tf(ctx, "SRS API is ready, %v", string(b))
logger.Tf(ctx, "SRS API is ready, %v %v", r, string(b))
v.readyCtxCancel()
return
}
@ -591,6 +623,9 @@ func NewFFmpeg(opts ...func(v *ffmpegClient)) FFmpegClient {
return nil
}
// We ignore any exit error, because FFmpeg might exit with error even publish ok.
v.process.ignoreExitStatusError = true
for _, opt := range opts {
opt(v)
}
@ -670,22 +705,35 @@ func (v *ffprobeClient) Result() (string, *ffprobeObject) {
}
func (v *ffprobeClient) Run(ctxCase context.Context, cancelCase context.CancelFunc) error {
ctx, cancel := context.WithTimeout(ctxCase, v.timeout)
defer cancel()
if true {
ctx, cancel := context.WithTimeout(ctxCase, v.timeout)
defer cancel()
logger.Tf(ctx, "Starting FFprobe for stream=%v, dvr=%v, duration=%v, timeout=%v",
v.streamURL, v.dvrFile, v.duration, v.timeout)
logger.Tf(ctx, "Starting FFprobe for stream=%v, dvr=%v, duration=%v, timeout=%v",
v.streamURL, v.dvrFile, v.duration, v.timeout)
// Try to start a DVR process.
for ctx.Err() == nil {
// If error, just ignore and retry, because the stream might not be ready. For example, for HLS, the DVR process
// might need to wait for a duration of segment, 10s as such.
_ = v.doDVR(ctx)
// Try to start a DVR process.
for ctx.Err() == nil {
// If error, just ignore and retry, because the stream might not be ready. For example, for HLS, the DVR process
// might need to wait for a duration of segment, 10s as such.
_ = v.doDVR(ctx)
// Check whether DVR file is ok.
if fs, err := os.Stat(v.dvrFile); err == nil && fs.Size() > 1024 {
logger.Tf(ctx, "DVR FFprobe file is ok, file=%v, size=%v", v.dvrFile, fs.Size())
break
// Check whether DVR file is ok.
if fs, err := os.Stat(v.dvrFile); err == nil && fs.Size() > 1024 {
logger.Tf(ctx, "DVR FFprobe file is ok, file=%v, size=%v", v.dvrFile, fs.Size())
break
}
// Wait for a while and retry. Use larger timeout for HLS.
retryTimeout := 1 * time.Second
if strings.Contains(v.streamURL, ".m3u8") {
retryTimeout = 3 * time.Second
}
select {
case <-ctx.Done():
case <-time.After(retryTimeout):
}
}
}
@ -695,7 +743,7 @@ func (v *ffprobeClient) Run(ctxCase context.Context, cancelCase context.CancelFu
}
// Start a probe process for the DVR file.
return v.doProbe(ctx, cancelCase)
return v.doProbe(ctxCase, cancelCase)
}
func (v *ffprobeClient) doDVR(ctx context.Context) error {
@ -763,13 +811,6 @@ func (v *ffprobeClient) doProbe(ctx context.Context, cancel context.CancelFunc)
m := v.metadata
logger.Tf(ctx, "FFprobe done pid=%v, %v", bs.pid, m.String())
if ts := 90; m.Format.ProbeScore < ts {
return errors.Errorf("low score=%v < %v, %v, %v", m.Format.ProbeScore, ts, m.String(), str)
}
if dv := m.Duration(); dv < v.duration {
return errors.Errorf("short duration=%v < %v, %v, %v", dv, v.duration, m.String(), str)
}
v.doneCancel()
return nil
}
@ -997,3 +1038,21 @@ func (v *ffprobeObject) Duration() time.Duration {
return time.Duration(dv*1000) * time.Millisecond
}
func (v *ffprobeObject) Video() *ffprobeObjectMedia {
for _, media := range v.Streams {
if media.CodecType == "video" {
return &media
}
}
return nil
}
func (v *ffprobeObject) Audio() *ffprobeObjectMedia {
for _, media := range v.Streams {
if media.CodecType == "audio" {
return &media
}
}
return nil
}