mirror of
				https://github.com/ossrs/srs.git
				synced 2025-03-09 15:49:59 +00:00 
			
		
		
		
	1. Add live benchmark support in srs-bench, which only connects and disconnects without any media transport, to test source creation and disposal and verify source memory leaks. 2. SmartPtr: Support cleanup of HTTP-FLV stream. Unregister the HTTP-FLV handler for the pattern and clean up the objects and resources. 3. Support benchmarking RTMP/SRT with srs-bench by integrating the gosrt and oryx RTMP libraries. 4. Refine SRT and RTC sources by using a timer to clean up the sources, following the same strategy as the Live source. --------- Co-authored-by: Haibo Chen <495810242@qq.com> Co-authored-by: Jacob Su <suzp1984@gmail.com>
		
			
				
	
	
		
			361 lines
		
	
	
	
		
			9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			361 lines
		
	
	
	
		
			9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// The MIT License (MIT)
 | 
						|
//
 | 
						|
// # Copyright (c) 2022-2024 Winlin
 | 
						|
//
 | 
						|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
 | 
						|
// this software and associated documentation files (the "Software"), to deal in
 | 
						|
// the Software without restriction, including without limitation the rights to
 | 
						|
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
 | 
						|
// the Software, and to permit persons to whom the Software is furnished to do so,
 | 
						|
// subject to the following conditions:
 | 
						|
//
 | 
						|
// The above copyright notice and this permission notice shall be included in all
 | 
						|
// copies or substantial portions of the Software.
 | 
						|
//
 | 
						|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 | 
						|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
 | 
						|
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
 | 
						|
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
 | 
						|
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 | 
						|
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 | 
						|
package gb28181
 | 
						|
 | 
						|
import (
 | 
						|
	"bufio"
 | 
						|
	"context"
 | 
						|
	"flag"
 | 
						|
	"fmt"
 | 
						|
	"github.com/ghettovoice/gosip/sip"
 | 
						|
	"github.com/ossrs/go-oryx-lib/aac"
 | 
						|
	"github.com/ossrs/go-oryx-lib/errors"
 | 
						|
	"github.com/yapingcat/gomedia/mpeg2"
 | 
						|
	"io"
 | 
						|
	"net"
 | 
						|
	"net/url"
 | 
						|
	"os"
 | 
						|
	"path"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
var srsLog *bool
 | 
						|
 | 
						|
var srsTimeout *int
 | 
						|
var srsPublishVideoFps *int
 | 
						|
 | 
						|
var srsSipAddr *string
 | 
						|
var srsSipUser *string
 | 
						|
var srsSipRandomID *int
 | 
						|
var srsSipDomain *string
 | 
						|
var srsSipSvrID *string
 | 
						|
 | 
						|
var srsMediaTimeout *int
 | 
						|
var srsReinviteTimeout *int
 | 
						|
var srsPublishAudio *string
 | 
						|
var srsPublishVideo *string
 | 
						|
 | 
						|
func prepareTest() (err error) {
 | 
						|
	srsSipAddr = flag.String("srs-sip", "tcp://127.0.0.1:5060", "The SRS GB server to connect to")
 | 
						|
	srsSipUser = flag.String("srs-stream", "3402000000", "The GB user/stream to publish")
 | 
						|
	srsSipRandomID = flag.Int("srs-random", 10, "The GB user/stream random suffix to publish")
 | 
						|
	srsSipDomain = flag.String("srs-domain", "3402000000", "The GB SIP domain")
 | 
						|
	srsSipSvrID = flag.String("srs-server", "34020000002000000001", "The GB server ID for SIP")
 | 
						|
	srsLog = flag.Bool("srs-log", false, "Whether enable the detail log")
 | 
						|
	srsTimeout = flag.Int("srs-timeout", 11000, "For each case, the timeout in ms")
 | 
						|
	srsMediaTimeout = flag.Int("srs-media-timeout", 2100, "PS media disconnect timeout in ms")
 | 
						|
	srsReinviteTimeout = flag.Int("srs-reinvite-timeout", 1200, "When disconnect, SIP re-invite timeout in ms")
 | 
						|
	srsPublishAudio = flag.String("srs-publish-audio", "avatar.aac", "The audio file for publisher.")
 | 
						|
	srsPublishVideo = flag.String("srs-publish-video", "avatar.h264", "The video file for publisher. Note that *.h264 is for AVC, *.h265 is for HEVC.")
 | 
						|
	srsPublishVideoFps = flag.Int("srs-publish-video-fps", 25, "The video fps for publisher.")
 | 
						|
 | 
						|
	// Should parse it first.
 | 
						|
	flag.Parse()
 | 
						|
 | 
						|
	// Check file.
 | 
						|
	tryOpenFile := func(filename string) (string, error) {
 | 
						|
		if filename == "" {
 | 
						|
			return filename, nil
 | 
						|
		}
 | 
						|
 | 
						|
		f, err := os.Open(filename)
 | 
						|
		if err != nil {
 | 
						|
			nfilename := path.Join("../", filename)
 | 
						|
			f2, err := os.Open(nfilename)
 | 
						|
			if err != nil {
 | 
						|
				return filename, errors.Wrapf(err, "No video file at %v or %v", filename, nfilename)
 | 
						|
			}
 | 
						|
			defer f2.Close()
 | 
						|
 | 
						|
			return nfilename, nil
 | 
						|
		}
 | 
						|
		defer f.Close()
 | 
						|
 | 
						|
		return filename, nil
 | 
						|
	}
 | 
						|
 | 
						|
	if *srsPublishVideo, err = tryOpenFile(*srsPublishVideo); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	if *srsPublishAudio, err = tryOpenFile(*srsPublishAudio); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
type GBTestSession struct {
 | 
						|
	session *GBSession
 | 
						|
}
 | 
						|
 | 
						|
func NewGBTestSession() *GBTestSession {
 | 
						|
	sipConfig := SIPConfig{
 | 
						|
		addr:   *srsSipAddr,
 | 
						|
		domain: *srsSipDomain,
 | 
						|
		user:   *srsSipUser,
 | 
						|
		random: *srsSipRandomID,
 | 
						|
		server: *srsSipSvrID,
 | 
						|
	}
 | 
						|
	return &GBTestSession{
 | 
						|
		session: NewGBSession(&GBSessionConfig{
 | 
						|
			regTimeout:    time.Duration(*srsTimeout) * 5 * time.Minute,
 | 
						|
			inviteTimeout: time.Duration(*srsTimeout) * 5 * time.Minute,
 | 
						|
		}, &sipConfig),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (v *GBTestSession) Close() error {
 | 
						|
	v.session.Close()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (v *GBTestSession) Run(ctx context.Context) (err error) {
 | 
						|
	if err = v.session.Connect(ctx); err != nil {
 | 
						|
		return errors.Wrap(err, "connect")
 | 
						|
	}
 | 
						|
	if err = v.session.Register(ctx); err != nil {
 | 
						|
		return errors.Wrap(err, "register")
 | 
						|
	}
 | 
						|
	if err = v.session.Invite(ctx); err != nil {
 | 
						|
		return errors.Wrap(err, "invite")
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
type GBTestPublisher struct {
 | 
						|
	session  *GBSession
 | 
						|
	ingester *PSIngester
 | 
						|
}
 | 
						|
 | 
						|
func NewGBTestPublisher() *GBTestPublisher {
 | 
						|
	sipConfig := SIPConfig{
 | 
						|
		addr:   *srsSipAddr,
 | 
						|
		domain: *srsSipDomain,
 | 
						|
		user:   *srsSipUser,
 | 
						|
		random: *srsSipRandomID,
 | 
						|
		server: *srsSipSvrID,
 | 
						|
	}
 | 
						|
	psConfig := PSConfig{
 | 
						|
		video: *srsPublishVideo,
 | 
						|
		fps:   *srsPublishVideoFps,
 | 
						|
		audio: *srsPublishAudio,
 | 
						|
	}
 | 
						|
	return &GBTestPublisher{
 | 
						|
		session: NewGBSession(&GBSessionConfig{
 | 
						|
			regTimeout:    time.Duration(*srsTimeout) * 5 * time.Minute,
 | 
						|
			inviteTimeout: time.Duration(*srsTimeout) * 5 * time.Minute,
 | 
						|
		}, &sipConfig),
 | 
						|
		ingester: NewPSIngester(&IngesterConfig{
 | 
						|
			psConfig: psConfig,
 | 
						|
		}),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (v *GBTestPublisher) Close() error {
 | 
						|
	v.ingester.Close()
 | 
						|
	v.session.Close()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (v *GBTestPublisher) Run(ctx context.Context) (err error) {
 | 
						|
	if err = v.session.Connect(ctx); err != nil {
 | 
						|
		return errors.Wrap(err, "connect")
 | 
						|
	}
 | 
						|
	if err = v.session.Register(ctx); err != nil {
 | 
						|
		return errors.Wrap(err, "register")
 | 
						|
	}
 | 
						|
	if err = v.session.Invite(ctx); err != nil {
 | 
						|
		return errors.Wrap(err, "invite")
 | 
						|
	}
 | 
						|
 | 
						|
	serverAddr, err := utilBuildMediaAddr(v.session.sip.conf.addr, v.session.out.mediaPort)
 | 
						|
	if err != nil {
 | 
						|
		return errors.Wrap(err, "parse")
 | 
						|
	}
 | 
						|
	v.ingester.conf.serverAddr = serverAddr
 | 
						|
 | 
						|
	v.ingester.conf.ssrc = uint32(v.session.out.ssrc)
 | 
						|
	v.ingester.conf.clockRate = v.session.out.clockRate
 | 
						|
	v.ingester.conf.payloadType = uint8(v.session.out.payloadType)
 | 
						|
 | 
						|
	if err := v.ingester.Ingest(ctx); err != nil {
 | 
						|
		return errors.Wrap(err, "ingest")
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Filter the test error, ignore context.Canceled
 | 
						|
func filterTestError(errs ...error) error {
 | 
						|
	var filteredErrors []error
 | 
						|
 | 
						|
	for _, err := range errs {
 | 
						|
		if err == nil || errors.Cause(err) == context.Canceled {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		// If url error, server maybe error, do not print the detail log.
 | 
						|
		if r0 := errors.Cause(err); r0 != nil {
 | 
						|
			if r1, ok := r0.(*url.Error); ok {
 | 
						|
				err = r1
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		filteredErrors = append(filteredErrors, err)
 | 
						|
	}
 | 
						|
 | 
						|
	if len(filteredErrors) == 0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	if len(filteredErrors) == 1 {
 | 
						|
		return filteredErrors[0]
 | 
						|
	}
 | 
						|
 | 
						|
	var descs []string
 | 
						|
	for i, err := range filteredErrors[1:] {
 | 
						|
		descs = append(descs, fmt.Sprintf("err #%d, %+v", i, err))
 | 
						|
	}
 | 
						|
	return errors.Wrapf(filteredErrors[0], "with %v", strings.Join(descs, ","))
 | 
						|
}
 | 
						|
 | 
						|
type wallClock struct {
 | 
						|
	start    time.Time
 | 
						|
	duration time.Duration
 | 
						|
}
 | 
						|
 | 
						|
func newWallClock() *wallClock {
 | 
						|
	return &wallClock{start: time.Now()}
 | 
						|
}
 | 
						|
 | 
						|
func (v *wallClock) Tick(d time.Duration) time.Duration {
 | 
						|
	v.duration += d
 | 
						|
 | 
						|
	wc := time.Now().Sub(v.start)
 | 
						|
	re := v.duration - wc
 | 
						|
	if re > 30*time.Millisecond {
 | 
						|
		return re
 | 
						|
	}
 | 
						|
	return 0
 | 
						|
}
 | 
						|
 | 
						|
func sipGetCallID(m sip.Message) string {
 | 
						|
	if v, ok := m.CallID(); !ok {
 | 
						|
		return ""
 | 
						|
	} else {
 | 
						|
		return v.Value()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func utilBuildMediaAddr(addr string, mediaPort int64) (string, error) {
 | 
						|
	if u, err := url.Parse(addr); err != nil {
 | 
						|
		return "", errors.Wrapf(err, "parse %v", addr)
 | 
						|
	} else if addr, err := net.ResolveTCPAddr(u.Scheme, u.Host); err != nil {
 | 
						|
		return "", errors.Wrapf(err, "parse %v scheme=%v, host=%v", addr, u.Scheme, u.Host)
 | 
						|
	} else {
 | 
						|
		return fmt.Sprintf("%v://%v:%v",
 | 
						|
			u.Scheme, addr.IP.String(), mediaPort,
 | 
						|
		), nil
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// See SrsMpegPES::decode
 | 
						|
func utilUpdatePesPacketLength(pes *mpeg2.PesPacket) {
 | 
						|
	var nb_required int
 | 
						|
	if pes.PTS_DTS_flags == 0x2 {
 | 
						|
		nb_required += 5
 | 
						|
	}
 | 
						|
	if pes.PTS_DTS_flags == 0x3 {
 | 
						|
		nb_required += 10
 | 
						|
	}
 | 
						|
	if pes.ESCR_flag > 0 {
 | 
						|
		nb_required += 6
 | 
						|
	}
 | 
						|
	if pes.ES_rate_flag > 0 {
 | 
						|
		nb_required += 3
 | 
						|
	}
 | 
						|
	if pes.DSM_trick_mode_flag > 0 {
 | 
						|
		nb_required += 1
 | 
						|
	}
 | 
						|
	if pes.Additional_copy_info_flag > 0 {
 | 
						|
		nb_required += 1
 | 
						|
	}
 | 
						|
	if pes.PES_CRC_flag > 0 {
 | 
						|
		nb_required += 2
 | 
						|
	}
 | 
						|
	if pes.PES_extension_flag > 0 {
 | 
						|
		nb_required += 1
 | 
						|
	}
 | 
						|
 | 
						|
	// Size before PES_header_data_length.
 | 
						|
	const fixed = uint16(3)
 | 
						|
	// Size after PES_header_data_length.
 | 
						|
	pes.PES_header_data_length = uint8(nb_required)
 | 
						|
	// Size after PES_packet_length
 | 
						|
	pes.PES_packet_length = uint16(len(pes.Pes_payload)) + fixed + uint16(pes.PES_header_data_length)
 | 
						|
}
 | 
						|
 | 
						|
type AACReader struct {
 | 
						|
	codec aac.ADTS
 | 
						|
	r     *bufio.Reader
 | 
						|
}
 | 
						|
 | 
						|
func NewAACReader(f io.Reader) (*AACReader, error) {
 | 
						|
	v := &AACReader{}
 | 
						|
 | 
						|
	var err error
 | 
						|
	if v.codec, err = aac.NewADTS(); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	v.r = bufio.NewReaderSize(f, 4096)
 | 
						|
	b, err := v.r.Peek(7 + 1024)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	if _, _, err = v.codec.Decode(b); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return v, nil
 | 
						|
}
 | 
						|
 | 
						|
func (v *AACReader) NextADTSFrame() ([]byte, error) {
 | 
						|
	b, err := v.r.Peek(7 + 1024)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	_, left, err := v.codec.Decode(b)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	adts := b[:len(b)-len(left)]
 | 
						|
	if _, err = v.r.Discard(len(adts)); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return adts, nil
 | 
						|
}
 |