1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

DVR: Support blackbox test based on hooks. v5.0.132 (#3365)

This commit is contained in:
Winlin 2023-01-07 20:36:59 +08:00 committed by winlin
parent a27ce1d50f
commit e655948e96
31 changed files with 4704 additions and 3925 deletions

View file

@ -27,6 +27,7 @@ import (
"flag"
"fmt"
"github.com/ossrs/go-oryx-lib/errors"
ohttp "github.com/ossrs/go-oryx-lib/http"
"github.com/ossrs/go-oryx-lib/logger"
"io/ioutil"
"math/rand"
@ -200,6 +201,8 @@ type backendService struct {
name string
args []string
env []string
// If timeout, kill the process.
duration time.Duration
// The process stdout and stderr.
stdout bytes.Buffer
@ -315,6 +318,22 @@ func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) err
// The context for SRS process.
processDone, processDoneCancel := context.WithCancel(context.Background())
// If exceed timeout, kill the process.
v.wg.Add(1)
go func() {
defer v.wg.Done()
if v.duration <= 0 {
return
}
select {
case <- ctx.Done():
case <-time.After(v.duration):
logger.Tf(ctx, "Process killed duration=%v, pid=%v, name=%v, args=%v", v.duration, v.pid, v.name, v.args)
cmd.Process.Kill()
}
}()
// If SRS process terminated, notify case to stop.
v.wg.Add(1)
go func() {
@ -327,12 +346,12 @@ func (v *backendService) Run(ctx context.Context, cancel context.CancelFunc) err
defer processDoneCancel()
if err := cmd.Wait(); err != nil && !v.ignoreExitStatusError {
v.r0 = errors.Wrapf(err, "Process wait err, name=%v, args=%v", v.name, v.args)
v.r0 = errors.Wrapf(err, "Process wait err, pid=%v, name=%v, args=%v", v.pid, v.name, v.args)
}
if v.onStop != nil {
if err := v.onStop(ctx, v, cmd, v.r0, &v.stdout, &v.stderr); err != nil {
if v.r0 == nil {
v.r0 = errors.Wrapf(err, "Process onStop err, name=%v, args=%v", v.name, v.args)
v.r0 = errors.Wrapf(err, "Process onStop err, pid=%v, name=%v, args=%v", v.pid, v.name, v.args)
} else {
logger.Ef(ctx, "Process onStop err %v", err)
}
@ -435,7 +454,7 @@ type srsServer struct {
func NewSRSServer(opts ...func(v *srsServer)) SRSServer {
rid := fmt.Sprintf("%v-%v", os.Getpid(), rand.Int())
v := &srsServer{
workDir: "./",
workDir: path.Join("objs", fmt.Sprintf("%v", rand.Int())),
srsID: fmt.Sprintf("srs-id-%v", rid),
process: newBackendService(),
}
@ -443,7 +462,7 @@ func NewSRSServer(opts ...func(v *srsServer)) SRSServer {
// If we run in GoLand, the current directory is in blackbox, so we use parent directory.
if _, err := os.Stat("objs"); err != nil {
v.workDir = "../"
v.workDir = path.Join("..", "objs", fmt.Sprintf("%v", rand.Int()))
}
// Do allocate resource.
@ -461,22 +480,12 @@ func NewSRSServer(opts ...func(v *srsServer)) SRSServer {
allocator.Free(v.httpListen)
allocator.Free(v.srtListen)
pidFile := path.Join(v.workDir, v.srsRelativePidFile)
if _, err := os.Stat(pidFile); err == nil {
os.Remove(pidFile)
if _, err := os.Stat(v.workDir); err == nil {
os.RemoveAll(v.workDir)
}
idFile := path.Join(v.workDir, v.srsRelativeIDFile)
if _, err := os.Stat(idFile); err == nil {
os.Remove(idFile)
}
hlsFiles := path.Join(v.workDir, "objs", "live")
if _, err := os.Stat(hlsFiles); err == nil {
os.RemoveAll(hlsFiles)
}
logger.Tf(ctx, "SRS server is closed, id=%v, pid=%v, r0=%v", v.srsID, bs.pid, bs.r0)
logger.Tf(ctx, "SRS server is closed, id=%v, pid=%v, cleanup=%v r0=%v",
v.srsID, bs.pid, v.workDir, bs.r0)
return nil
}
@ -512,20 +521,39 @@ func (v *srsServer) Run(ctx context.Context, cancel context.CancelFunc) error {
v.workDir, *srsBinary, v.srsID, v.srsRelativePidFile, v.rtmpListen,
)
// Create directories.
if err := os.MkdirAll(path.Join(v.workDir, "./objs/nginx/html"), os.FileMode(0755) | os.ModeDir); err != nil {
return errors.Wrapf(err, "SRS create directory %v", path.Join(v.workDir, "./objs/nginx/html"))
}
// Setup the name and args of process.
v.process.name = *srsBinary
v.process.args = []string{"-e"}
// Setup the envrionment variables.
// Setup the constant values.
v.process.env = []string{
// SRS working directory.
fmt.Sprintf("SRS_WORK_DIR=%v", v.workDir),
// Run in frontend.
"SRS_DAEMON=off",
// Write logs to stdout and stderr.
"SRS_SRS_LOG_FILE=console",
// Disable warning for asan.
"MallocNanoZone=0",
// Avoid error for macOS, which ulimit to 256.
"SRS_MAX_CONNECTIONS=100",
}
// For directories.
v.process.env = append(v.process.env, []string{
// SRS working directory.
fmt.Sprintf("SRS_WORK_DIR=%v", v.workDir),
// Setup the default directory for HTTP server.
"SRS_HTTP_SERVER_DIR=./objs/nginx/html",
// Setup the default directory for HLS stream.
"SRS_VHOST_HLS_HLS_PATH=./objs/nginx/html",
"SRS_VHOST_HLS_HLS_M3U8_FILE=[app]/[stream].m3u8",
"SRS_VHOST_HLS_HLS_TS_FILE=[app]/[stream]-[seq].ts",
}...)
// For variables.
v.process.env = append(v.process.env, []string{
// SRS PID file.
fmt.Sprintf("SRS_PID=%v", v.srsRelativePidFile),
// SRS ID file.
@ -533,19 +561,13 @@ func (v *srsServer) Run(ctx context.Context, cancel context.CancelFunc) error {
// HTTP API to detect the service.
fmt.Sprintf("SRS_HTTP_API_ENABLED=on"),
fmt.Sprintf("SRS_HTTP_API_LISTEN=%v", v.apiListen),
// Avoid error for macOS, which ulimit to 256.
"SRS_MAX_CONNECTIONS=100",
// Setup the default directory for HTTP server.
"SRS_HTTP_SERVER_DIR=objs",
// Setup the default directory for HLS stream.
"SRS_VHOST_HLS_HLS_PATH=objs",
// Setup the RTMP listen port.
fmt.Sprintf("SRS_LISTEN=%v", v.rtmpListen),
// Setup the HTTP sever listen port.
fmt.Sprintf("SRS_HTTP_SERVER_LISTEN=%v", v.httpListen),
// Setup the SRT server listen port.
fmt.Sprintf("SRS_SRT_SERVER_LISTEN=%v", v.srtListen),
}
}...)
// Rewrite envs by case.
if v.envs != nil {
v.process.env = append(v.process.env, v.envs...)
@ -588,8 +610,8 @@ func (v *srsServer) Run(ctx context.Context, cancel context.CancelFunc) error {
// Hooks for process.
v.process.onBeforeStart = func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error {
logger.Tf(ctx, "SRS id=%v, env=%v, cmd is %v %v",
v.srsID, cmd.Env, bs.name, strings.Join(bs.args, " "))
logger.Tf(ctx, "SRS id=%v, env %v %v %v",
v.srsID, strings.Join(cmd.Env, " "), bs.name, strings.Join(bs.args, " "))
return nil
}
v.process.onAfterStart = func(ctx context.Context, bs *backendService, cmd *exec.Cmd) error {
@ -625,11 +647,16 @@ type ffmpegClient struct {
// FFmpeg cli args, without ffmpeg binary.
args []string
// Let the process quit, do not cancel the case.
cancelCaseWhenQuit bool
// When timeout, stop FFmpeg, sometimes the '-t' does not work.
ffmpegDuration time.Duration
}
func NewFFmpeg(opts ...func(v *ffmpegClient)) FFmpegClient {
v := &ffmpegClient{
process: newBackendService(),
cancelCaseWhenQuit: true,
}
// Do cleanup.
@ -657,6 +684,7 @@ func (v *ffmpegClient) Run(ctx context.Context, cancel context.CancelFunc) error
v.process.name = *srsFFmpeg
v.process.args = v.args
v.process.env = os.Environ()
v.process.duration = v.ffmpegDuration
v.process.onStop = func(ctx context.Context, bs *backendService, cmd *exec.Cmd, r0 error, stdout, stderr *bytes.Buffer) error {
logger.Tf(ctx, "FFmpeg process pid=%v exit, r0=%v, stdout=%v", bs.pid, r0, stdout.String())
@ -666,7 +694,20 @@ func (v *ffmpegClient) Run(ctx context.Context, cancel context.CancelFunc) error
return nil
}
return v.process.Run(ctx, cancel)
// We might not want to cancel the case, for example, when check DVR by session, we just let the FFmpeg process to
// quit and we should check the callback and DVR file.
ffCtx, ffCancel := context.WithCancel(ctx)
go func() {
select {
case <- ctx.Done():
case <-ffCtx.Done():
if v.cancelCaseWhenQuit {
cancel()
}
}
}()
return v.process.Run(ffCtx, ffCancel)
}
type FFprobeClient interface {
@ -678,16 +719,19 @@ type FFprobeClient interface {
}
type ffprobeClient struct {
// The stream to probe.
streamURL string
// The DVR file for ffprobe. We DVR stream to file, then use ffprobe to detect it.
// The DVR file for ffprobe. Stream should be DVR to file, then use ffprobe to detect it. If DVR by FFmpeg, we will
// start a FFmpeg process to do the DVR, or the DVR should be done by other tools.
dvrFile string
// The duration of video file for DVR.
duration time.Duration
// The timeout to wait for task to done.
timeout time.Duration
// Whether do DVR by FFmpeg, if using SRS DVR, please set to false.
dvrByFFmpeg bool
// The stream to DVR for probing. Ignore if not DVR by ffmpeg
streamURL string
// The duration of video file for DVR and probing.
duration time.Duration
// When probe stream metadata object.
doneCtx context.Context
doneCancel context.CancelFunc
@ -699,7 +743,8 @@ type ffprobeClient struct {
func NewFFprobe(opts ...func(v *ffprobeClient)) FFprobeClient {
v := &ffprobeClient{
metadata: &ffprobeObject{},
metadata: &ffprobeObject{},
dvrByFFmpeg: true,
}
v.doneCtx, v.doneCancel = context.WithCancel(context.Background())
@ -728,9 +773,12 @@ func (v *ffprobeClient) Run(ctxCase context.Context, cancelCase context.CancelFu
// Try to start a DVR process.
for ctx.Err() == nil {
// If error, just ignore and retry, because the stream might not be ready. For example, for HLS, the DVR process
// might need to wait for a duration of segment, 10s as such.
_ = v.doDVR(ctx)
// If not DVR by FFmpeg, we just wait the DVR file to be ready, and it should be done by SRS or other tools.
if v.dvrByFFmpeg {
// If error, just ignore and retry, because the stream might not be ready. For example, for HLS, the DVR process
// might need to wait for a duration of segment, 10s as such.
_ = v.doDVR(ctx)
}
// Check whether DVR file is ok.
if fs, err := os.Stat(v.dvrFile); err == nil && fs.Size() > 1024 {
@ -738,9 +786,14 @@ func (v *ffprobeClient) Run(ctxCase context.Context, cancelCase context.CancelFu
break
}
// If not DVR by FFmpeg, must be by other tools, only need to wait.
if !v.dvrByFFmpeg {
logger.Tf(ctx, "Waiting stream=%v to be DVR", v.streamURL)
}
// Wait for a while and retry. Use larger timeout for HLS.
retryTimeout := 1 * time.Second
if strings.Contains(v.streamURL, ".m3u8") {
if strings.Contains(v.streamURL, ".m3u8") || v.dvrFile == "" {
retryTimeout = 3 * time.Second
}
@ -763,6 +816,10 @@ func (v *ffprobeClient) Run(ctxCase context.Context, cancelCase context.CancelFu
func (v *ffprobeClient) doDVR(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
if !v.dvrByFFmpeg {
return nil
}
process := newBackendService()
process.name = *srsFFmpeg
process.args = []string{
@ -1070,3 +1127,167 @@ func (v *ffprobeObject) Audio() *ffprobeObjectMedia {
}
return nil
}
type HooksEvent interface {
HookAction() string
}
type HooksEventBase struct {
Action string `json:"action"`
}
func (v *HooksEventBase) HookAction() string {
return v.Action
}
type HooksEventOnDvr struct {
HooksEventBase
Stream string `json:"stream"`
StreamUrl string `json:"stream_url"`
StreamID string `json:"stream_id"`
CWD string `json:"cwd"`
File string `json:"file"`
TcUrl string `json:"tcUrl"`
App string `json:"app"`
Vhost string `json:"vhost"`
IP string `json:"ip"`
ClientIP string `json:"client_id"`
ServerID string `json:"server_id"`
}
type HooksService interface {
ServiceRunner
ServiceReadyQuerier
HooksAPI() int
HooksEvents() <-chan HooksEvent
}
type hooksService struct {
readyCtx context.Context
readyCancel context.CancelFunc
httpPort int
dispose func()
r0 error
hooksOnDvr chan HooksEvent
}
func NewHooksService(opts ...func(v *hooksService)) HooksService {
v := &hooksService{}
v.httpPort = allocator.Allocate()
v.dispose = func() {
allocator.Free(v.httpPort)
close(v.hooksOnDvr)
}
v.hooksOnDvr = make(chan HooksEvent, 64)
v.readyCtx, v.readyCancel = context.WithCancel(context.Background())
for _, opt := range opts {
opt(v)
}
return v
}
func (v *hooksService) ReadyCtx() context.Context {
return v.readyCtx
}
func (v *hooksService) HooksAPI() int {
return v.httpPort
}
func (v *hooksService) HooksEvents() <-chan HooksEvent {
return v.hooksOnDvr
}
func (v *hooksService) Run(ctx context.Context, cancel context.CancelFunc) error {
defer func() {
v.readyCancel()
v.dispose()
}()
handler := http.ServeMux{}
handler.HandleFunc("/api/v1/ping", func(w http.ResponseWriter, r *http.Request) {
ohttp.WriteData(ctx, w, r, "pong")
})
handler.HandleFunc("/api/v1/dvrs", func(w http.ResponseWriter, r *http.Request) {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
ohttp.WriteError(ctx, w, r, err)
return
}
evt := HooksEventOnDvr{}
if err := json.Unmarshal(b, &evt); err != nil {
ohttp.WriteError(ctx, w, r, err)
return
}
select {
case <-ctx.Done():
case v.hooksOnDvr <- &evt:
}
logger.Tf(ctx, "Callback: Got on_dvr request %v", string(b))
ohttp.WriteData(ctx, w, r, nil)
})
server := &http.Server{Addr: fmt.Sprintf(":%v", v.httpPort), Handler: &handler}
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
logger.Tf(ctx, "Callback: Start hooks server, listen=%v", v.httpPort)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Wf(ctx, "Callback: Service listen=%v, err %v", v.httpPort, err)
v.r0 = errors.Wrapf(err, "server listen=%v", v.httpPort)
cancel()
return
}
logger.Tf(ctx, "Callback: Hooks done, listen=%v", v.httpPort)
}()
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
go server.Shutdown(context.Background())
}()
wg.Add(1)
go func() {
defer wg.Done()
for ctx.Err() == nil {
time.Sleep(100 * time.Millisecond)
r := fmt.Sprintf("http://localhost:%v/api/v1/ping", v.httpPort)
res, err := http.Get(r)
if err != nil {
continue
}
defer res.Body.Close()
b, err := ioutil.ReadAll(res.Body)
if err != nil {
continue
}
logger.Tf(ctx, "Callback: API is ready, %v %v", r, string(b))
v.readyCancel()
return
}
}()
wg.Wait()
return v.r0
}