mirror of
https://github.com/ossrs/srs.git
synced 2025-03-09 15:49:59 +00:00
Test: Update srs-bench
This commit is contained in:
parent
c2229d392a
commit
543377ebf1
22 changed files with 4290 additions and 606 deletions
1077
trunk/3rdparty/srs-bench/janus/api.go
vendored
Normal file
1077
trunk/3rdparty/srs-bench/janus/api.go
vendored
Normal file
File diff suppressed because it is too large
Load diff
301
trunk/3rdparty/srs-bench/janus/ingester.go
vendored
Normal file
301
trunk/3rdparty/srs-bench/janus/ingester.go
vendored
Normal file
|
@ -0,0 +1,301 @@
|
|||
// The MIT License (MIT)
|
||||
//
|
||||
// Copyright (c) 2021 Winlin
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
// this software and associated documentation files (the "Software"), to deal in
|
||||
// the Software without restriction, including without limitation the rights to
|
||||
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
// the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
// subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in all
|
||||
// copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
package janus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ossrs/go-oryx-lib/errors"
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
"github.com/pion/interceptor"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/pion/sdp/v3"
|
||||
"github.com/pion/webrtc/v3"
|
||||
"github.com/pion/webrtc/v3/pkg/media"
|
||||
"github.com/pion/webrtc/v3/pkg/media/h264reader"
|
||||
"github.com/pion/webrtc/v3/pkg/media/oggreader"
|
||||
)
|
||||
|
||||
type videoIngester struct {
|
||||
sourceVideo string
|
||||
fps int
|
||||
markerInterceptor *rtpInterceptor
|
||||
sVideoTrack *webrtc.TrackLocalStaticSample
|
||||
sVideoSender *webrtc.RTPSender
|
||||
ready context.Context
|
||||
readyCancel context.CancelFunc
|
||||
}
|
||||
|
||||
func newVideoIngester(sourceVideo string) *videoIngester {
|
||||
v := &videoIngester{markerInterceptor: &rtpInterceptor{}, sourceVideo: sourceVideo}
|
||||
v.ready, v.readyCancel = context.WithCancel(context.Background())
|
||||
return v
|
||||
}
|
||||
|
||||
func (v *videoIngester) Close() error {
|
||||
v.readyCancel()
|
||||
if v.sVideoSender != nil {
|
||||
_ = v.sVideoSender.Stop()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *videoIngester) AddTrack(pc *webrtc.PeerConnection, fps int) error {
|
||||
v.fps = fps
|
||||
|
||||
mimeType, trackID := "video/H264", "video"
|
||||
if strings.HasSuffix(v.sourceVideo, ".ivf") {
|
||||
mimeType = "video/VP8"
|
||||
}
|
||||
|
||||
var err error
|
||||
v.sVideoTrack, err = webrtc.NewTrackLocalStaticSample(
|
||||
webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 90000}, trackID, "pion",
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Create video track")
|
||||
}
|
||||
|
||||
v.sVideoSender, err = pc.AddTrack(v.sVideoTrack)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Add video track")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (v *videoIngester) Ingest(ctx context.Context) error {
|
||||
source, sender, track, fps := v.sourceVideo, v.sVideoSender, v.sVideoTrack, v.fps
|
||||
|
||||
f, err := os.Open(source)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Open file %v", source)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// TODO: FIXME: Support ivf for vp8.
|
||||
h264, err := h264reader.NewReader(f)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Open h264 %v", source)
|
||||
}
|
||||
|
||||
enc := sender.GetParameters().Encodings[0]
|
||||
codec := sender.GetParameters().Codecs[0]
|
||||
headers := sender.GetParameters().HeaderExtensions
|
||||
logger.Tf(ctx, "Video %v, tbn=%v, fps=%v, ssrc=%v, pt=%v, header=%v",
|
||||
codec.MimeType, codec.ClockRate, fps, enc.SSRC, codec.PayloadType, headers)
|
||||
|
||||
// OK, we are ready.
|
||||
v.readyCancel()
|
||||
|
||||
clock := newWallClock()
|
||||
sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 / uint64(fps))
|
||||
for ctx.Err() == nil {
|
||||
var sps, pps *h264reader.NAL
|
||||
var oFrames []*h264reader.NAL
|
||||
for ctx.Err() == nil {
|
||||
frame, err := h264.NextNAL()
|
||||
if err == io.EOF {
|
||||
return io.EOF
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Read h264")
|
||||
}
|
||||
|
||||
oFrames = append(oFrames, frame)
|
||||
logger.If(ctx, "NALU %v PictureOrderCount=%v, ForbiddenZeroBit=%v, RefIdc=%v, %v bytes",
|
||||
frame.UnitType.String(), frame.PictureOrderCount, frame.ForbiddenZeroBit, frame.RefIdc, len(frame.Data))
|
||||
|
||||
if frame.UnitType == h264reader.NalUnitTypeSPS {
|
||||
sps = frame
|
||||
} else if frame.UnitType == h264reader.NalUnitTypePPS {
|
||||
pps = frame
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
var frames []*h264reader.NAL
|
||||
// Package SPS/PPS to STAP-A
|
||||
if sps != nil && pps != nil {
|
||||
stapA := packageAsSTAPA(sps, pps)
|
||||
frames = append(frames, stapA)
|
||||
}
|
||||
// Append other original frames.
|
||||
for _, frame := range oFrames {
|
||||
if frame.UnitType != h264reader.NalUnitTypeSPS && frame.UnitType != h264reader.NalUnitTypePPS {
|
||||
frames = append(frames, frame)
|
||||
}
|
||||
}
|
||||
|
||||
// Covert frames to sample(buffers).
|
||||
for i, frame := range frames {
|
||||
sample := media.Sample{Data: frame.Data, Duration: sampleDuration}
|
||||
// Use the sample timestamp for frames.
|
||||
if i != len(frames)-1 {
|
||||
sample.Duration = 0
|
||||
}
|
||||
|
||||
// For STAP-A, set marker to false, to make Chrome happy.
|
||||
if ri := v.markerInterceptor; ri.rtpWriter == nil {
|
||||
ri.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
// TODO: Should we decode to check whether SPS/PPS?
|
||||
if len(payload) > 0 && payload[0]&0x1f == 24 {
|
||||
header.Marker = false // 24, STAP-A
|
||||
}
|
||||
return ri.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}
|
||||
|
||||
if err = track.WriteSample(sample); err != nil {
|
||||
return errors.Wrapf(err, "Write sample")
|
||||
}
|
||||
}
|
||||
|
||||
if d := clock.Tick(sampleDuration); d > 0 {
|
||||
time.Sleep(d)
|
||||
}
|
||||
}
|
||||
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
type audioIngester struct {
|
||||
sourceAudio string
|
||||
audioLevelInterceptor *rtpInterceptor
|
||||
sAudioTrack *webrtc.TrackLocalStaticSample
|
||||
sAudioSender *webrtc.RTPSender
|
||||
ready context.Context
|
||||
readyCancel context.CancelFunc
|
||||
}
|
||||
|
||||
func newAudioIngester(sourceAudio string) *audioIngester {
|
||||
v := &audioIngester{audioLevelInterceptor: &rtpInterceptor{}, sourceAudio: sourceAudio}
|
||||
v.ready, v.readyCancel = context.WithCancel(context.Background())
|
||||
return v
|
||||
}
|
||||
|
||||
func (v *audioIngester) Close() error {
|
||||
v.readyCancel() // OK we are closed, also ready.
|
||||
|
||||
if v.sAudioSender != nil {
|
||||
_ = v.sAudioSender.Stop()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *audioIngester) AddTrack(pc *webrtc.PeerConnection) error {
|
||||
var err error
|
||||
|
||||
mimeType, trackID := "audio/opus", "audio"
|
||||
v.sAudioTrack, err = webrtc.NewTrackLocalStaticSample(
|
||||
webrtc.RTPCodecCapability{MimeType: mimeType, ClockRate: 48000, Channels: 2}, trackID, "pion",
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Create audio track")
|
||||
}
|
||||
|
||||
v.sAudioSender, err = pc.AddTrack(v.sAudioTrack)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Add audio track")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *audioIngester) Ingest(ctx context.Context) error {
|
||||
source, sender, track := v.sourceAudio, v.sAudioSender, v.sAudioTrack
|
||||
|
||||
f, err := os.Open(source)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Open file %v", source)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
ogg, _, err := oggreader.NewWith(f)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Open ogg %v", source)
|
||||
}
|
||||
|
||||
enc := sender.GetParameters().Encodings[0]
|
||||
codec := sender.GetParameters().Codecs[0]
|
||||
headers := sender.GetParameters().HeaderExtensions
|
||||
logger.Tf(ctx, "Audio %v, tbn=%v, channels=%v, ssrc=%v, pt=%v, header=%v",
|
||||
codec.MimeType, codec.ClockRate, codec.Channels, enc.SSRC, codec.PayloadType, headers)
|
||||
|
||||
// Whether should encode the audio-level in RTP header.
|
||||
var audioLevel *webrtc.RTPHeaderExtensionParameter
|
||||
for _, h := range headers {
|
||||
if h.URI == sdp.AudioLevelURI {
|
||||
audioLevel = &h
|
||||
}
|
||||
}
|
||||
|
||||
// OK, we are ready.
|
||||
v.readyCancel()
|
||||
|
||||
clock := newWallClock()
|
||||
var lastGranule uint64
|
||||
|
||||
for ctx.Err() == nil {
|
||||
pageData, pageHeader, err := ogg.ParseNextPage()
|
||||
if err == io.EOF {
|
||||
return io.EOF
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Read ogg")
|
||||
}
|
||||
|
||||
// The amount of samples is the difference between the last and current timestamp
|
||||
sampleCount := pageHeader.GranulePosition - lastGranule
|
||||
lastGranule = pageHeader.GranulePosition
|
||||
sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 * sampleCount / uint64(codec.ClockRate))
|
||||
|
||||
// For audio-level, set the extensions if negotiated.
|
||||
if ri := v.audioLevelInterceptor; ri.rtpWriter == nil {
|
||||
ri.rtpWriter = func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
if audioLevel != nil {
|
||||
audioLevelPayload, err := new(rtp.AudioLevelExtension).Marshal()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
_ = header.SetExtension(uint8(audioLevel.ID), audioLevelPayload)
|
||||
}
|
||||
|
||||
return ri.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
}
|
||||
|
||||
if err = track.WriteSample(media.Sample{Data: pageData, Duration: sampleDuration}); err != nil {
|
||||
return errors.Wrapf(err, "Write sample")
|
||||
}
|
||||
|
||||
if d := clock.Tick(sampleDuration); d > 0 {
|
||||
time.Sleep(d)
|
||||
}
|
||||
}
|
||||
|
||||
return ctx.Err()
|
||||
}
|
158
trunk/3rdparty/srs-bench/janus/interceptor.go
vendored
Normal file
158
trunk/3rdparty/srs-bench/janus/interceptor.go
vendored
Normal file
|
@ -0,0 +1,158 @@
|
|||
// The MIT License (MIT)
|
||||
//
|
||||
// Copyright (c) 2021 Winlin
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
// this software and associated documentation files (the "Software"), to deal in
|
||||
// the Software without restriction, including without limitation the rights to
|
||||
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
// the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
// subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in all
|
||||
// copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
package janus
|
||||
|
||||
import (
|
||||
"github.com/pion/interceptor"
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
type rtpInterceptorOptionFunc func(i *rtpInterceptor)
|
||||
|
||||
// Common RTP packet interceptor for benchmark.
|
||||
// @remark Should never merge with rtcpInterceptor, because they has the same Write interface.
|
||||
type rtpInterceptor struct {
|
||||
// If rtpReader is nil, use the default next one to read.
|
||||
rtpReader interceptor.RTPReaderFunc
|
||||
nextRTPReader interceptor.RTPReader
|
||||
// If rtpWriter is nil, use the default next one to write.
|
||||
rtpWriter interceptor.RTPWriterFunc
|
||||
nextRTPWriter interceptor.RTPWriter
|
||||
// Other common fields.
|
||||
bypassInterceptor
|
||||
}
|
||||
|
||||
func newRTPInterceptor(options ...rtpInterceptorOptionFunc) *rtpInterceptor {
|
||||
v := &rtpInterceptor{}
|
||||
for _, opt := range options {
|
||||
opt(v)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func (v *rtpInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
|
||||
v.nextRTPWriter = writer
|
||||
return v // Handle all RTP
|
||||
}
|
||||
|
||||
func (v *rtpInterceptor) Write(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
|
||||
if v.rtpWriter != nil {
|
||||
return v.rtpWriter(header, payload, attributes)
|
||||
}
|
||||
return v.nextRTPWriter.Write(header, payload, attributes)
|
||||
}
|
||||
|
||||
func (v *rtpInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
|
||||
}
|
||||
|
||||
func (v *rtpInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
|
||||
v.nextRTPReader = reader
|
||||
return v // Handle all RTP
|
||||
}
|
||||
|
||||
func (v *rtpInterceptor) Read(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if v.rtpReader != nil {
|
||||
return v.rtpReader(b, a)
|
||||
}
|
||||
return v.nextRTPReader.Read(b, a)
|
||||
}
|
||||
|
||||
func (v *rtpInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {
|
||||
}
|
||||
|
||||
type rtcpInterceptorOptionFunc func(i *rtcpInterceptor)
|
||||
|
||||
// Common RTCP packet interceptor for benchmark.
|
||||
// @remark Should never merge with rtpInterceptor, because they has the same Write interface.
|
||||
type rtcpInterceptor struct {
|
||||
// If rtcpReader is nil, use the default next one to read.
|
||||
rtcpReader interceptor.RTCPReaderFunc
|
||||
nextRTCPReader interceptor.RTCPReader
|
||||
// If rtcpWriter is nil, use the default next one to write.
|
||||
rtcpWriter interceptor.RTCPWriterFunc
|
||||
nextRTCPWriter interceptor.RTCPWriter
|
||||
// Other common fields.
|
||||
bypassInterceptor
|
||||
}
|
||||
|
||||
func newRTCPInterceptor(options ...rtcpInterceptorOptionFunc) *rtcpInterceptor {
|
||||
v := &rtcpInterceptor{}
|
||||
for _, opt := range options {
|
||||
opt(v)
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func (v *rtcpInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
|
||||
v.nextRTCPReader = reader
|
||||
return v // Handle all RTCP
|
||||
}
|
||||
|
||||
func (v *rtcpInterceptor) Read(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
|
||||
if v.rtcpReader != nil {
|
||||
return v.rtcpReader(b, a)
|
||||
}
|
||||
return v.nextRTCPReader.Read(b, a)
|
||||
}
|
||||
|
||||
func (v *rtcpInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
|
||||
v.nextRTCPWriter = writer
|
||||
return v // Handle all RTCP
|
||||
}
|
||||
|
||||
func (v *rtcpInterceptor) Write(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
|
||||
if v.rtcpWriter != nil {
|
||||
return v.rtcpWriter(pkts, attributes)
|
||||
}
|
||||
return v.nextRTCPWriter.Write(pkts, attributes)
|
||||
}
|
||||
|
||||
// Do nothing.
|
||||
type bypassInterceptor struct {
|
||||
interceptor.Interceptor
|
||||
}
|
||||
|
||||
func (v *bypassInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
|
||||
return reader
|
||||
}
|
||||
|
||||
func (v *bypassInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
|
||||
return writer
|
||||
}
|
||||
|
||||
func (v *bypassInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
|
||||
return writer
|
||||
}
|
||||
|
||||
func (v *bypassInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
|
||||
}
|
||||
|
||||
func (v *bypassInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
|
||||
return reader
|
||||
}
|
||||
|
||||
func (v *bypassInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {
|
||||
}
|
||||
|
||||
func (v *bypassInterceptor) Close() error {
|
||||
return nil
|
||||
}
|
198
trunk/3rdparty/srs-bench/janus/janus.go
vendored
Normal file
198
trunk/3rdparty/srs-bench/janus/janus.go
vendored
Normal file
|
@ -0,0 +1,198 @@
|
|||
// The MIT License (MIT)
|
||||
//
|
||||
// Copyright (c) 2021 Winlin
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
// this software and associated documentation files (the "Software"), to deal in
|
||||
// the Software without restriction, including without limitation the rights to
|
||||
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
// the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
// subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in all
|
||||
// copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
package janus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/ossrs/go-oryx-lib/errors"
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var sr string
|
||||
var pli int
|
||||
|
||||
var pr, sourceAudio, sourceVideo string
|
||||
var fps int
|
||||
|
||||
var audioLevel, videoTWCC bool
|
||||
|
||||
var clients, streams, delay int
|
||||
|
||||
func Parse(ctx context.Context) {
|
||||
fl := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
||||
|
||||
var sfu string
|
||||
fl.StringVar(&sfu, "sfu", "srs", "The SFU server, srs or janus")
|
||||
|
||||
fl.StringVar(&sr, "sr", "", "")
|
||||
fl.IntVar(&pli, "pli", 10, "")
|
||||
|
||||
fl.StringVar(&pr, "pr", "", "")
|
||||
fl.StringVar(&sourceAudio, "sa", "", "")
|
||||
fl.StringVar(&sourceVideo, "sv", "", "")
|
||||
fl.IntVar(&fps, "fps", 0, "")
|
||||
|
||||
fl.BoolVar(&audioLevel, "al", true, "")
|
||||
fl.BoolVar(&videoTWCC, "twcc", true, "")
|
||||
|
||||
fl.IntVar(&clients, "nn", 1, "")
|
||||
fl.IntVar(&streams, "sn", 1, "")
|
||||
fl.IntVar(&delay, "delay", 50, "")
|
||||
|
||||
fl.Usage = func() {
|
||||
fmt.Println(fmt.Sprintf("Usage: %v [Options]", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf("Options:"))
|
||||
fmt.Println(fmt.Sprintf(" -sfu The target SFU, srs or janus. Default: srs"))
|
||||
fmt.Println(fmt.Sprintf(" -nn The number of clients to simulate. Default: 1"))
|
||||
fmt.Println(fmt.Sprintf(" -sn The number of streams to simulate. Variable: %%d. Default: 1"))
|
||||
fmt.Println(fmt.Sprintf(" -delay The start delay in ms for each client or stream to simulate. Default: 50"))
|
||||
fmt.Println(fmt.Sprintf(" -al [Optional] Whether enable audio-level. Default: true"))
|
||||
fmt.Println(fmt.Sprintf(" -twcc [Optional] Whether enable vdieo-twcc. Default: true"))
|
||||
fmt.Println(fmt.Sprintf("Player or Subscriber:"))
|
||||
fmt.Println(fmt.Sprintf(" -sr The url to play/subscribe. If sn exceed 1, auto append variable %%d."))
|
||||
fmt.Println(fmt.Sprintf(" -pli [Optional] PLI request interval in seconds. Default: 10"))
|
||||
fmt.Println(fmt.Sprintf("Publisher:"))
|
||||
fmt.Println(fmt.Sprintf(" -pr The url to publish. If sn exceed 1, auto append variable %%d."))
|
||||
fmt.Println(fmt.Sprintf(" -fps The fps of .h264 source file."))
|
||||
fmt.Println(fmt.Sprintf(" -sa [Optional] The file path to read audio, ignore if empty."))
|
||||
fmt.Println(fmt.Sprintf(" -sv [Optional] The file path to read video, ignore if empty."))
|
||||
fmt.Println(fmt.Sprintf("\n例如,1个播放,1个推流:"))
|
||||
fmt.Println(fmt.Sprintf(" %v -sfu janus -sr webrtc://localhost:8080/2345/livestream", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf(" %v -sfu janus -pr webrtc://localhost:8080/2345/livestream -sa avatar.ogg -sv avatar.h264 -fps 25", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf("\n例如,1个流,3个播放,共3个客户端:"))
|
||||
fmt.Println(fmt.Sprintf(" %v -sfu janus -sr webrtc://localhost:8080/2345/livestream -nn 3", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf(" %v -sfu janus -pr webrtc://localhost:8080/2345/livestream -sa avatar.ogg -sv avatar.h264 -fps 25", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf("\n例如,2个流,每个流3个播放,共6个客户端:"))
|
||||
fmt.Println(fmt.Sprintf(" %v -sfu janus -sr webrtc://localhost:8080/2345/livestream_%%d -sn 2 -nn 3", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf(" %v -sfu janus -pr webrtc://localhost:8080/2345/livestream_%%d -sn 2 -sa avatar.ogg -sv avatar.h264 -fps 25", os.Args[0]))
|
||||
fmt.Println(fmt.Sprintf("\n例如,2个推流:"))
|
||||
fmt.Println(fmt.Sprintf(" %v -sfu janus -pr webrtc://localhost:8080/2345/livestream_%%d -sn 2 -sa avatar.ogg -sv avatar.h264 -fps 25", os.Args[0]))
|
||||
}
|
||||
if err := fl.Parse(os.Args[1:]); err == flag.ErrHelp {
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
showHelp := (clients <= 0 || streams <= 0)
|
||||
if sr == "" && pr == "" {
|
||||
showHelp = true
|
||||
}
|
||||
if pr != "" && (sourceAudio == "" && sourceVideo == "") {
|
||||
showHelp = true
|
||||
}
|
||||
if showHelp {
|
||||
fl.Usage()
|
||||
os.Exit(-1)
|
||||
}
|
||||
|
||||
summaryDesc := fmt.Sprintf("delay=%v, al=%v, twcc=%v", delay, audioLevel, videoTWCC)
|
||||
if sr != "" {
|
||||
summaryDesc = fmt.Sprintf("%v, play(url=%v, pli=%v)", summaryDesc, sr, pli)
|
||||
}
|
||||
if pr != "" {
|
||||
summaryDesc = fmt.Sprintf("%v, publish(url=%v, sa=%v, sv=%v, fps=%v)",
|
||||
summaryDesc, pr, sourceAudio, sourceVideo, fps)
|
||||
}
|
||||
logger.Tf(ctx, "Run benchmark with %v", summaryDesc)
|
||||
|
||||
checkFlags := func() error {
|
||||
if sourceVideo != "" && !strings.HasSuffix(sourceVideo, ".h264") {
|
||||
return errors.Errorf("Should be .264, actual %v", sourceVideo)
|
||||
}
|
||||
|
||||
if sourceVideo != "" && strings.HasSuffix(sourceVideo, ".h264") && fps <= 0 {
|
||||
return errors.Errorf("Video fps should >0, actual %v", fps)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if err := checkFlags(); err != nil {
|
||||
logger.Ef(ctx, "Check faile err %+v", err)
|
||||
os.Exit(-1)
|
||||
}
|
||||
}
|
||||
|
||||
func Run(ctx context.Context) error {
|
||||
// Run tasks.
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Run all subscribers or players.
|
||||
for i := 0; sr != "" && i < streams && ctx.Err() == nil; i++ {
|
||||
r_auto := sr
|
||||
if streams > 1 && !strings.Contains(r_auto, "%") {
|
||||
r_auto += "%d"
|
||||
}
|
||||
|
||||
r2 := r_auto
|
||||
if strings.Contains(r2, "%") {
|
||||
r2 = fmt.Sprintf(r2, i)
|
||||
}
|
||||
|
||||
for j := 0; sr != "" && j < clients && ctx.Err() == nil; j++ {
|
||||
wg.Add(1)
|
||||
go func(sr string) {
|
||||
defer wg.Done()
|
||||
|
||||
if err := startPlay(ctx, sr, audioLevel, videoTWCC, pli); err != nil {
|
||||
if errors.Cause(err) != context.Canceled {
|
||||
logger.Wf(ctx, "Run err %+v", err)
|
||||
}
|
||||
}
|
||||
}(r2)
|
||||
|
||||
time.Sleep(time.Duration(delay) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
// Run all publishers.
|
||||
for i := 0; pr != "" && i < streams && ctx.Err() == nil; i++ {
|
||||
r_auto := pr
|
||||
if streams > 1 && !strings.Contains(r_auto, "%") {
|
||||
r_auto += "%d"
|
||||
}
|
||||
|
||||
r2 := r_auto
|
||||
if strings.Contains(r2, "%") {
|
||||
r2 = fmt.Sprintf(r2, i)
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(pr string) {
|
||||
defer wg.Done()
|
||||
|
||||
if err := startPublish(ctx, pr, sourceAudio, sourceVideo, fps, audioLevel, videoTWCC); err != nil {
|
||||
if errors.Cause(err) != context.Canceled {
|
||||
logger.Wf(ctx, "Run err %+v", err)
|
||||
}
|
||||
}
|
||||
}(r2)
|
||||
|
||||
time.Sleep(time.Duration(delay) * time.Millisecond)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
246
trunk/3rdparty/srs-bench/janus/player.go
vendored
Normal file
246
trunk/3rdparty/srs-bench/janus/player.go
vendored
Normal file
|
@ -0,0 +1,246 @@
|
|||
// The MIT License (MIT)
|
||||
//
|
||||
// Copyright (c) 2021 Winlin
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
// this software and associated documentation files (the "Software"), to deal in
|
||||
// the Software without restriction, including without limitation the rights to
|
||||
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
// the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
// subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in all
|
||||
// copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
package janus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/ossrs/go-oryx-lib/errors"
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
"github.com/pion/interceptor"
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/sdp/v3"
|
||||
"github.com/pion/webrtc/v3"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func startPlay(ctx context.Context, r string, enableAudioLevel, enableTWCC bool, pli int) error {
|
||||
ctx = logger.WithContext(ctx)
|
||||
|
||||
u, err := url.Parse(r)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Parse url %v", r)
|
||||
}
|
||||
|
||||
var room int
|
||||
var display string
|
||||
if us := strings.SplitN(u.Path, "/", 3); len(us) >= 3 {
|
||||
if iv, err := strconv.Atoi(us[1]); err != nil {
|
||||
return errors.Wrapf(err, "parse %v", us[1])
|
||||
} else {
|
||||
room = iv
|
||||
}
|
||||
|
||||
display = strings.Join(us[2:], "-")
|
||||
}
|
||||
|
||||
logger.Tf(ctx, "Run play url=%v, room=%v, diplay=%v, audio-level=%v, twcc=%v",
|
||||
r, room, display, enableAudioLevel, enableTWCC)
|
||||
|
||||
// For audio-level.
|
||||
webrtcNewPeerConnection := func(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) {
|
||||
m := &webrtc.MediaEngine{}
|
||||
if err := m.RegisterDefaultCodecs(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, extension := range []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.TransportCCURI} {
|
||||
if extension == sdp.TransportCCURI && !enableTWCC {
|
||||
continue
|
||||
}
|
||||
if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeVideo); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// https://github.com/pion/ion/issues/130
|
||||
// https://github.com/pion/ion-sfu/pull/373/files#diff-6f42c5ac6f8192dd03e5a17e9d109e90cb76b1a4a7973be6ce44a89ffd1b5d18R73
|
||||
for _, extension := range []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.AudioLevelURI} {
|
||||
if extension == sdp.AudioLevelURI && !enableAudioLevel {
|
||||
continue
|
||||
}
|
||||
if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeAudio); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
i := &interceptor.Registry{}
|
||||
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i))
|
||||
return api.NewPeerConnection(configuration)
|
||||
}
|
||||
|
||||
pc, err := webrtcNewPeerConnection(webrtc.Configuration{
|
||||
SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback,
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Create PC")
|
||||
}
|
||||
|
||||
var receivers []*webrtc.RTPReceiver
|
||||
defer func() {
|
||||
pc.Close()
|
||||
for _, receiver := range receivers {
|
||||
receiver.Stop()
|
||||
}
|
||||
}()
|
||||
|
||||
pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{
|
||||
Direction: webrtc.RTPTransceiverDirectionRecvonly,
|
||||
})
|
||||
pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{
|
||||
Direction: webrtc.RTPTransceiverDirectionRecvonly,
|
||||
})
|
||||
|
||||
// Signaling API
|
||||
api := newJanusAPI(fmt.Sprintf("http://%v/janus", u.Host))
|
||||
|
||||
if err := api.Create(ctx); err != nil {
|
||||
return errors.Wrapf(err, "create")
|
||||
}
|
||||
defer api.Close()
|
||||
|
||||
// Discover the publisherInfo to subscribe.
|
||||
publisherInfo, err := api.DiscoverPublisher(ctx, room, display, 5*time.Second)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Tf(ctx, "Publisher found, room=%v, display=%v, %v", room, display, publisherInfo)
|
||||
|
||||
subscribeHandle, err := api.AttachPlugin(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "attach plugin")
|
||||
}
|
||||
|
||||
offer, err := api.JoinAsSubscribe(ctx, subscribeHandle, room, publisherInfo)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "subscribe")
|
||||
}
|
||||
|
||||
// Exchange offer and generate answer.
|
||||
if err := pc.SetRemoteDescription(webrtc.SessionDescription{
|
||||
Type: webrtc.SDPTypeOffer, SDP: offer,
|
||||
}); err != nil {
|
||||
return errors.Wrapf(err, "Set offer %v", offer)
|
||||
}
|
||||
|
||||
answer, err := pc.CreateAnswer(nil)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Create answer")
|
||||
}
|
||||
if err := pc.SetLocalDescription(answer); err != nil {
|
||||
return errors.Wrapf(err, "Set answer %v", answer)
|
||||
}
|
||||
|
||||
// Send answer to Janus.
|
||||
if err := api.Subscribe(ctx, subscribeHandle, room, answer.SDP); err != nil {
|
||||
return errors.Wrapf(err, "Subscribe with answer %v", answer)
|
||||
}
|
||||
|
||||
handleTrack := func(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error {
|
||||
// Send a PLI on an interval so that the publisher is pushing a keyframe
|
||||
go func() {
|
||||
if track.Kind() == webrtc.RTPCodecTypeAudio {
|
||||
return
|
||||
}
|
||||
|
||||
if pli <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(time.Duration(pli) * time.Second):
|
||||
_ = pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
|
||||
MediaSSRC: uint32(track.SSRC()),
|
||||
}})
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
receivers = append(receivers, receiver)
|
||||
|
||||
codec := track.Codec()
|
||||
|
||||
trackDesc := fmt.Sprintf("channels=%v", codec.Channels)
|
||||
if track.Kind() == webrtc.RTPCodecTypeVideo {
|
||||
trackDesc = fmt.Sprintf("fmtp=%v", codec.SDPFmtpLine)
|
||||
}
|
||||
if headers := receiver.GetParameters().HeaderExtensions; len(headers) > 0 {
|
||||
trackDesc = fmt.Sprintf("%v, header=%v", trackDesc, headers)
|
||||
}
|
||||
logger.Tf(ctx, "Got track %v, pt=%v, tbn=%v, %v",
|
||||
codec.MimeType, codec.PayloadType, codec.ClockRate, trackDesc)
|
||||
|
||||
return writeTrackToDisk(ctx, track)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
||||
err = handleTrack(ctx, track, receiver)
|
||||
if err != nil {
|
||||
codec := track.Codec()
|
||||
err = errors.Wrapf(err, "Handle track %v, pt=%v", codec.MimeType, codec.PayloadType)
|
||||
cancel()
|
||||
}
|
||||
})
|
||||
|
||||
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
||||
logger.If(ctx, "ICE state %v", state)
|
||||
|
||||
if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Wf(ctx, "Close for ICE state %v", state)
|
||||
cancel()
|
||||
}
|
||||
})
|
||||
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeTrackToDisk(ctx context.Context, track *webrtc.TrackRemote) error {
|
||||
for ctx.Err() == nil {
|
||||
pkt, _, err := track.ReadRTP()
|
||||
if err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return nil
|
||||
}
|
||||
return errors.Wrapf(err, "Read RTP")
|
||||
}
|
||||
|
||||
logger.If(ctx, "Got packet ssrc=%v, pt=%v, seq=%v %vB",
|
||||
pkt.SSRC, pkt.PayloadType, pkt.SequenceNumber, len(pkt.Payload))
|
||||
}
|
||||
|
||||
return ctx.Err()
|
||||
}
|
352
trunk/3rdparty/srs-bench/janus/publisher.go
vendored
Normal file
352
trunk/3rdparty/srs-bench/janus/publisher.go
vendored
Normal file
|
@ -0,0 +1,352 @@
|
|||
// The MIT License (MIT)
|
||||
//
|
||||
// Copyright (c) 2021 Winlin
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
// this software and associated documentation files (the "Software"), to deal in
|
||||
// the Software without restriction, including without limitation the rights to
|
||||
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
// the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
// subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in all
|
||||
// copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
package janus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/ossrs/go-oryx-lib/errors"
|
||||
"github.com/ossrs/go-oryx-lib/logger"
|
||||
"github.com/pion/interceptor"
|
||||
"github.com/pion/sdp/v3"
|
||||
"github.com/pion/webrtc/v3"
|
||||
"io"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func startPublish(ctx context.Context, r, sourceAudio, sourceVideo string, fps int, enableAudioLevel, enableTWCC bool) error {
|
||||
ctx = logger.WithContext(ctx)
|
||||
|
||||
u, err := url.Parse(r)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Parse url %v", r)
|
||||
}
|
||||
|
||||
var room int
|
||||
var display string
|
||||
if us := strings.SplitN(u.Path, "/", 3); len(us) >= 3 {
|
||||
if iv, err := strconv.Atoi(us[1]); err != nil {
|
||||
return errors.Wrapf(err, "parse %v", us[1])
|
||||
} else {
|
||||
room = iv
|
||||
}
|
||||
|
||||
display = strings.Join(us[2:], "-")
|
||||
}
|
||||
|
||||
logger.Tf(ctx, "Run publish url=%v, audio=%v, video=%v, fps=%v, audio-level=%v, twcc=%v",
|
||||
r, sourceAudio, sourceVideo, fps, enableAudioLevel, enableTWCC)
|
||||
|
||||
// Filter for SPS/PPS marker.
|
||||
var aIngester *audioIngester
|
||||
var vIngester *videoIngester
|
||||
|
||||
// For audio-level and sps/pps marker.
|
||||
// TODO: FIXME: Should share with player.
|
||||
webrtcNewPeerConnection := func(configuration webrtc.Configuration) (*webrtc.PeerConnection, error) {
|
||||
m := &webrtc.MediaEngine{}
|
||||
if err := m.RegisterDefaultCodecs(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, extension := range []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.TransportCCURI} {
|
||||
if extension == sdp.TransportCCURI && !enableTWCC {
|
||||
continue
|
||||
}
|
||||
if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeVideo); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// https://github.com/pion/ion/issues/130
|
||||
// https://github.com/pion/ion-sfu/pull/373/files#diff-6f42c5ac6f8192dd03e5a17e9d109e90cb76b1a4a7973be6ce44a89ffd1b5d18R73
|
||||
for _, extension := range []string{sdp.SDESMidURI, sdp.SDESRTPStreamIDURI, sdp.AudioLevelURI} {
|
||||
if extension == sdp.AudioLevelURI && !enableAudioLevel {
|
||||
continue
|
||||
}
|
||||
if err := m.RegisterHeaderExtension(webrtc.RTPHeaderExtensionCapability{URI: extension}, webrtc.RTPCodecTypeAudio); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
registry := &interceptor.Registry{}
|
||||
if err := webrtc.RegisterDefaultInterceptors(m, registry); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if sourceAudio != "" {
|
||||
aIngester = newAudioIngester(sourceAudio)
|
||||
registry.Add(aIngester.audioLevelInterceptor)
|
||||
}
|
||||
if sourceVideo != "" {
|
||||
vIngester = newVideoIngester(sourceVideo)
|
||||
registry.Add(vIngester.markerInterceptor)
|
||||
}
|
||||
|
||||
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(registry))
|
||||
return api.NewPeerConnection(configuration)
|
||||
}
|
||||
|
||||
pc, err := webrtcNewPeerConnection(webrtc.Configuration{})
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Create PC")
|
||||
}
|
||||
|
||||
doClose := func() {
|
||||
if pc != nil {
|
||||
pc.Close()
|
||||
}
|
||||
if vIngester != nil {
|
||||
vIngester.Close()
|
||||
}
|
||||
if aIngester != nil {
|
||||
aIngester.Close()
|
||||
}
|
||||
}
|
||||
defer doClose()
|
||||
|
||||
if vIngester != nil {
|
||||
if err := vIngester.AddTrack(pc, fps); err != nil {
|
||||
return errors.Wrapf(err, "Add track")
|
||||
}
|
||||
}
|
||||
|
||||
if aIngester != nil {
|
||||
if err := aIngester.AddTrack(pc); err != nil {
|
||||
return errors.Wrapf(err, "Add track")
|
||||
}
|
||||
}
|
||||
|
||||
offer, err := pc.CreateOffer(nil)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Create Offer")
|
||||
}
|
||||
|
||||
if err := pc.SetLocalDescription(offer); err != nil {
|
||||
return errors.Wrapf(err, "Set offer %v", offer)
|
||||
}
|
||||
|
||||
// Signaling API
|
||||
api := newJanusAPI(fmt.Sprintf("http://%v/janus", u.Host))
|
||||
|
||||
webrtcUpCtx, webrtcUpCancel := context.WithCancel(ctx)
|
||||
api.onWebrtcUp = func(sender, sessionID uint64) {
|
||||
logger.Tf(ctx, "Event webrtcup: DTLS/SRTP done, from=(sender:%v,session:%v)", sender, sessionID)
|
||||
webrtcUpCancel()
|
||||
}
|
||||
api.onMedia = func(sender, sessionID uint64, mtype string, receiving bool) {
|
||||
logger.Tf(ctx, "Event media: %v receiving=%v, from=(sender:%v,session:%v)", mtype, receiving, sender, sessionID)
|
||||
}
|
||||
api.onSlowLink = func(sender, sessionID uint64, media string, lost uint64, uplink bool) {
|
||||
logger.Tf(ctx, "Event slowlink: %v lost=%v, uplink=%v, from=(sender:%v,session:%v)", media, lost, uplink, sender, sessionID)
|
||||
}
|
||||
api.onPublisher = func(sender, sessionID uint64, publishers []publisherInfo) {
|
||||
logger.Tf(ctx, "Event publisher: %v, from=(sender:%v,session:%v)", publishers, sender, sessionID)
|
||||
}
|
||||
api.onUnPublished = func(sender, sessionID, id uint64) {
|
||||
logger.Tf(ctx, "Event unpublish: %v, from=(sender:%v,session:%v)", id, sender, sessionID)
|
||||
}
|
||||
api.onLeave = func(sender, sessionID, id uint64) {
|
||||
logger.Tf(ctx, "Event leave: %v, from=(sender:%v,session:%v)", id, sender, sessionID)
|
||||
}
|
||||
|
||||
if err := api.Create(ctx); err != nil {
|
||||
return errors.Wrapf(err, "create")
|
||||
}
|
||||
defer api.Close()
|
||||
|
||||
publishHandleID, err := api.AttachPlugin(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "attach plugin")
|
||||
}
|
||||
defer api.DetachPlugin(ctx, publishHandleID)
|
||||
|
||||
if err := api.JoinAsPublisher(ctx, publishHandleID, room, display); err != nil {
|
||||
return errors.Wrapf(err, "join as publisher")
|
||||
}
|
||||
|
||||
answer, err := api.Publish(ctx, publishHandleID, offer.SDP)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "join as publisher")
|
||||
}
|
||||
defer api.UnPublish(ctx, publishHandleID)
|
||||
|
||||
// Setup the offer-answer
|
||||
if err := pc.SetRemoteDescription(webrtc.SessionDescription{
|
||||
Type: webrtc.SDPTypeAnswer, SDP: answer,
|
||||
}); err != nil {
|
||||
return errors.Wrapf(err, "Set answer %v", answer)
|
||||
}
|
||||
|
||||
logger.Tf(ctx, "State signaling=%v, ice=%v, conn=%v", pc.SignalingState(), pc.ICEConnectionState(), pc.ConnectionState())
|
||||
|
||||
// ICE state management.
|
||||
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
||||
logger.Tf(ctx, "ICE state %v", state)
|
||||
})
|
||||
|
||||
pc.OnSignalingStateChange(func(state webrtc.SignalingState) {
|
||||
logger.Tf(ctx, "Signaling state %v", state)
|
||||
})
|
||||
|
||||
if aIngester != nil {
|
||||
aIngester.sAudioSender.Transport().OnStateChange(func(state webrtc.DTLSTransportState) {
|
||||
logger.Tf(ctx, "DTLS state %v", state)
|
||||
})
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
pcDoneCtx, pcDoneCancel := context.WithCancel(context.Background())
|
||||
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
||||
logger.Tf(ctx, "PC state %v", state)
|
||||
|
||||
if state == webrtc.PeerConnectionStateConnected {
|
||||
pcDoneCancel()
|
||||
}
|
||||
|
||||
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Wf(ctx, "Close for PC state %v", state)
|
||||
cancel()
|
||||
}
|
||||
})
|
||||
|
||||
// OK, DTLS/SRTP ok.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-webrtcUpCtx.Done():
|
||||
}
|
||||
|
||||
// Wait for event from context or tracks.
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
<-ctx.Done()
|
||||
doClose() // Interrupt the RTCP read.
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
if aIngester == nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-pcDoneCtx.Done():
|
||||
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start read audio packets")
|
||||
}
|
||||
|
||||
buf := make([]byte, 1500)
|
||||
for ctx.Err() == nil {
|
||||
if _, _, err := aIngester.sAudioSender.Read(buf); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
if aIngester == nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-pcDoneCtx.Done():
|
||||
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest audio %v", sourceAudio)
|
||||
}
|
||||
|
||||
// Read audio and send out.
|
||||
for ctx.Err() == nil {
|
||||
if err := aIngester.Ingest(ctx); err != nil {
|
||||
if errors.Cause(err) == io.EOF {
|
||||
logger.Tf(ctx, "EOF, restart ingest audio %v", sourceAudio)
|
||||
continue
|
||||
}
|
||||
logger.Wf(ctx, "Ignore audio err %+v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
if vIngester == nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-pcDoneCtx.Done():
|
||||
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start read video packets")
|
||||
}
|
||||
|
||||
buf := make([]byte, 1500)
|
||||
for ctx.Err() == nil {
|
||||
if _, _, err := vIngester.sVideoSender.Read(buf); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
if vIngester == nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-pcDoneCtx.Done():
|
||||
logger.Tf(ctx, "PC(ICE+DTLS+SRTP) done, start ingest video %v", sourceVideo)
|
||||
}
|
||||
|
||||
for ctx.Err() == nil {
|
||||
if err := vIngester.Ingest(ctx); err != nil {
|
||||
if errors.Cause(err) == io.EOF {
|
||||
logger.Tf(ctx, "EOF, restart ingest video %v", sourceVideo)
|
||||
continue
|
||||
}
|
||||
logger.Wf(ctx, "Ignore video err %+v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
1176
trunk/3rdparty/srs-bench/janus/util.go
vendored
Normal file
1176
trunk/3rdparty/srs-bench/janus/util.go
vendored
Normal file
File diff suppressed because it is too large
Load diff
48
trunk/3rdparty/srs-bench/janus/util2.go
vendored
Normal file
48
trunk/3rdparty/srs-bench/janus/util2.go
vendored
Normal file
|
@ -0,0 +1,48 @@
|
|||
// The MIT License (MIT)
|
||||
//
|
||||
// Copyright (c) 2021 Winlin
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
// this software and associated documentation files (the "Software"), to deal in
|
||||
// the Software without restriction, including without limitation the rights to
|
||||
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
// the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
// subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in all
|
||||
// copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
package janus
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"math/rand"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func newTransactionID() string {
|
||||
sb := strings.Builder{}
|
||||
for i := 0; i < 12; i++ {
|
||||
sb.WriteByte(byte('a') + byte(rand.Int()%26))
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func escapeJSON(s string) string {
|
||||
var o map[string]interface{}
|
||||
if err := json.Unmarshal([]byte(s), &o); err != nil {
|
||||
return s
|
||||
}
|
||||
|
||||
if b, err := json.Marshal(o); err != nil {
|
||||
return s
|
||||
} else {
|
||||
return string(b)
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue