mirror of
				https://github.com/ossrs/srs.git
				synced 2025-03-09 15:49:59 +00:00 
			
		
		
		
	1. Add live benchmark support in srs-bench, which only connects and disconnects without any media transport, to test source creation and disposal and verify source memory leaks. 2. SmartPtr: Support cleanup of HTTP-FLV stream. Unregister the HTTP-FLV handler for the pattern and clean up the objects and resources. 3. Support benchmarking RTMP/SRT with srs-bench by integrating the gosrt and oryx RTMP libraries. 4. Refine SRT and RTC sources by using a timer to clean up the sources, following the same strategy as the Live source. --------- Co-authored-by: Haibo Chen <495810242@qq.com> Co-authored-by: Jacob Su <suzp1984@gmail.com>
		
			
				
	
	
		
			491 lines
		
	
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			491 lines
		
	
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// The MIT License (MIT)
 | 
						|
//
 | 
						|
// # Copyright (c) 2022-2024 Winlin
 | 
						|
//
 | 
						|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
 | 
						|
// this software and associated documentation files (the "Software"), to deal in
 | 
						|
// the Software without restriction, including without limitation the rights to
 | 
						|
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
 | 
						|
// the Software, and to permit persons to whom the Software is furnished to do so,
 | 
						|
// subject to the following conditions:
 | 
						|
//
 | 
						|
// The above copyright notice and this permission notice shall be included in all
 | 
						|
// copies or substantial portions of the Software.
 | 
						|
//
 | 
						|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 | 
						|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
 | 
						|
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
 | 
						|
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
 | 
						|
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 | 
						|
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 | 
						|
package gb28181
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"github.com/ghettovoice/gosip/sip"
 | 
						|
	"github.com/ossrs/go-oryx-lib/errors"
 | 
						|
	"github.com/ossrs/go-oryx-lib/logger"
 | 
						|
	"github.com/pion/webrtc/v3/pkg/media/h264reader"
 | 
						|
	"github.com/yapingcat/gomedia/mpeg2"
 | 
						|
	"io"
 | 
						|
	"os"
 | 
						|
	"path"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
// GBSessionConfig carries the timeouts applied to the SIP transactions of a GBSession.
type GBSessionConfig struct {
	// regTimeout bounds each REGISTER, BYE and un-REGISTER exchange.
	regTimeout time.Duration
	// inviteTimeout bounds waiting for and answering the INVITE request.
	inviteTimeout time.Duration
}
 | 
						|
 | 
						|
// GBSessionOutput holds the media parameters exposed by a GBSession.
type GBSessionOutput struct {
	// ssrc is parsed from the "y=" line of the INVITE offer.
	ssrc int64
	// mediaPort is parsed from the "m=video" line of the INVITE offer.
	mediaPort int64
	// clockRate is initialized to 90000 by NewGBSession.
	clockRate uint64
	// payloadType is initialized to 96 by NewGBSession.
	payloadType uint8
}
 | 
						|
 | 
						|
type GBSession struct {
 | 
						|
	// GB config.
 | 
						|
	conf *GBSessionConfig
 | 
						|
	// The output of session.
 | 
						|
	out *GBSessionOutput
 | 
						|
	// The SIP session object.
 | 
						|
	sip *SIPSession
 | 
						|
	// Callback when REGISTER done.
 | 
						|
	onRegisterDone func(req, res sip.Message) error
 | 
						|
	// Callback when got INVITE request.
 | 
						|
	onInviteRequest func(req sip.Message) error
 | 
						|
	// Callback when got INVITE 200 OK ACK request.
 | 
						|
	onInviteOkAck func(req, res sip.Message) error
 | 
						|
	// Callback when got MESSAGE response.
 | 
						|
	onMessageHeartbeat func(req, res sip.Message) error
 | 
						|
	// For heartbeat coroutines.
 | 
						|
	heartbeatInterval time.Duration
 | 
						|
	heartbeatCtx      context.Context
 | 
						|
	cancel            context.CancelFunc
 | 
						|
	// WaitGroup for coroutines.
 | 
						|
	wg sync.WaitGroup
 | 
						|
}
 | 
						|
 | 
						|
func NewGBSession(c *GBSessionConfig, sc *SIPConfig) *GBSession {
 | 
						|
	return &GBSession{
 | 
						|
		sip:  NewSIPSession(sc),
 | 
						|
		conf: c,
 | 
						|
		out: &GBSessionOutput{
 | 
						|
			clockRate:   uint64(90000),
 | 
						|
			payloadType: uint8(96),
 | 
						|
		},
 | 
						|
		heartbeatInterval: 1 * time.Second,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (v *GBSession) Close() error {
 | 
						|
	if v.cancel != nil {
 | 
						|
		v.cancel()
 | 
						|
	}
 | 
						|
	v.sip.Close()
 | 
						|
	v.wg.Wait()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (v *GBSession) Connect(ctx context.Context) error {
 | 
						|
	client := v.sip
 | 
						|
 | 
						|
	if err := client.Connect(ctx); err != nil {
 | 
						|
		return errors.Wrap(err, "connect")
 | 
						|
	}
 | 
						|
 | 
						|
	return ctx.Err()
 | 
						|
}
 | 
						|
 | 
						|
func (v *GBSession) Register(ctx context.Context) error {
 | 
						|
	client := v.sip
 | 
						|
 | 
						|
	for ctx.Err() == nil {
 | 
						|
		ctx, regCancel := context.WithTimeout(ctx, v.conf.regTimeout)
 | 
						|
		defer regCancel()
 | 
						|
 | 
						|
		regReq, regRes, err := client.Register(ctx)
 | 
						|
		if err != nil {
 | 
						|
			return errors.Wrap(err, "register")
 | 
						|
		}
 | 
						|
		logger.Tf(ctx, "Register id=%v, response=%v", regReq.MessageID(), regRes.MessageID())
 | 
						|
 | 
						|
		if v.onRegisterDone != nil {
 | 
						|
			if err = v.onRegisterDone(regReq, regRes); err != nil {
 | 
						|
				return errors.Wrap(err, "callback")
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		break
 | 
						|
	}
 | 
						|
 | 
						|
	return ctx.Err()
 | 
						|
}
 | 
						|
 | 
						|
func (v *GBSession) Invite(ctx context.Context) error {
 | 
						|
	client := v.sip
 | 
						|
 | 
						|
	for ctx.Err() == nil {
 | 
						|
		ctx, inviteCancel := context.WithTimeout(ctx, v.conf.inviteTimeout)
 | 
						|
		defer inviteCancel()
 | 
						|
 | 
						|
		inviteReq, err := client.Wait(ctx, sip.INVITE)
 | 
						|
		if err != nil {
 | 
						|
			return errors.Wrap(err, "wait")
 | 
						|
		}
 | 
						|
		logger.Tf(ctx, "Got INVITE request, Call-ID=%v", sipGetCallID(inviteReq))
 | 
						|
 | 
						|
		if v.onInviteRequest != nil {
 | 
						|
			if err = v.onInviteRequest(inviteReq); err != nil {
 | 
						|
				return errors.Wrap(err, "callback")
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		if err = client.Trying(ctx, inviteReq); err != nil {
 | 
						|
			return errors.Wrapf(err, "trying invite is %v", inviteReq.String())
 | 
						|
		}
 | 
						|
		time.Sleep(100 * time.Millisecond)
 | 
						|
 | 
						|
		inviteRes, err := client.InviteResponse(ctx, inviteReq)
 | 
						|
		if err != nil {
 | 
						|
			return errors.Wrapf(err, "response invite is %v", inviteReq.String())
 | 
						|
		}
 | 
						|
 | 
						|
		offer := inviteReq.Body()
 | 
						|
		ssrcStr := strings.Split(strings.Split(offer, "y=")[1], "\r\n")[0]
 | 
						|
		if v.out.ssrc, err = strconv.ParseInt(ssrcStr, 10, 64); err != nil {
 | 
						|
			return errors.Wrapf(err, "parse ssrc=%v, sdp %v", ssrcStr, offer)
 | 
						|
		}
 | 
						|
		mediaPortStr := strings.Split(strings.Split(offer, "m=video")[1], " ")[1]
 | 
						|
		if v.out.mediaPort, err = strconv.ParseInt(mediaPortStr, 10, 64); err != nil {
 | 
						|
			return errors.Wrapf(err, "parse media port=%v, sdp %v", mediaPortStr, offer)
 | 
						|
		}
 | 
						|
		logger.Tf(ctx, "Invite id=%v, response=%v, y=%v, ssrc=%v, mediaPort=%v",
 | 
						|
			inviteReq.MessageID(), inviteRes.MessageID(), ssrcStr, v.out.ssrc, v.out.mediaPort,
 | 
						|
		)
 | 
						|
 | 
						|
		if v.onInviteOkAck != nil {
 | 
						|
			if err = v.onInviteOkAck(inviteReq, inviteRes); err != nil {
 | 
						|
				return errors.Wrap(err, "callback")
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		break
 | 
						|
	}
 | 
						|
 | 
						|
	// Start goroutine for heartbeat every 1s.
 | 
						|
	v.heartbeatCtx, v.cancel = context.WithCancel(ctx)
 | 
						|
	go func(ctx context.Context) {
 | 
						|
		v.wg.Add(1)
 | 
						|
		defer v.wg.Done()
 | 
						|
 | 
						|
		for ctx.Err() == nil {
 | 
						|
			req, res, err := client.Message(ctx)
 | 
						|
			if err != nil {
 | 
						|
				v.cancel()
 | 
						|
				logger.Ef(ctx, "heartbeat err %+v", err)
 | 
						|
				return
 | 
						|
			}
 | 
						|
 | 
						|
			if v.onMessageHeartbeat != nil {
 | 
						|
				if err = v.onMessageHeartbeat(req, res); err != nil {
 | 
						|
					v.cancel()
 | 
						|
					logger.Ef(ctx, "callback err %+v", err)
 | 
						|
					return
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			select {
 | 
						|
			case <-ctx.Done():
 | 
						|
				return
 | 
						|
			case <-time.After(v.heartbeatInterval):
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}(v.heartbeatCtx)
 | 
						|
 | 
						|
	return ctx.Err()
 | 
						|
}
 | 
						|
 | 
						|
func (v *GBSession) Bye(ctx context.Context) error {
 | 
						|
	client := v.sip
 | 
						|
 | 
						|
	for ctx.Err() == nil {
 | 
						|
		ctx, regCancel := context.WithTimeout(ctx, v.conf.regTimeout)
 | 
						|
		defer regCancel()
 | 
						|
 | 
						|
		regReq, regRes, err := client.Bye(ctx)
 | 
						|
		if err != nil {
 | 
						|
			return errors.Wrap(err, "bye")
 | 
						|
		}
 | 
						|
		logger.Tf(ctx, "Bye id=%v, response=%v", regReq.MessageID(), regRes.MessageID())
 | 
						|
 | 
						|
		break
 | 
						|
	}
 | 
						|
 | 
						|
	return ctx.Err()
 | 
						|
}
 | 
						|
 | 
						|
func (v *GBSession) UnRegister(ctx context.Context) error {
 | 
						|
	client := v.sip
 | 
						|
 | 
						|
	for ctx.Err() == nil {
 | 
						|
		ctx, regCancel := context.WithTimeout(ctx, v.conf.regTimeout)
 | 
						|
		defer regCancel()
 | 
						|
 | 
						|
		regReq, regRes, err := client.UnRegister(ctx)
 | 
						|
		if err != nil {
 | 
						|
			return errors.Wrap(err, "UnRegister")
 | 
						|
		}
 | 
						|
		logger.Tf(ctx, "UnRegister id=%v, response=%v", regReq.MessageID(), regRes.MessageID())
 | 
						|
 | 
						|
		break
 | 
						|
	}
 | 
						|
 | 
						|
	return ctx.Err()
 | 
						|
}
 | 
						|
 | 
						|
type IngesterConfig struct {
 | 
						|
	psConfig    PSConfig
 | 
						|
	ssrc        uint32
 | 
						|
	serverAddr  string
 | 
						|
	clockRate   uint64
 | 
						|
	payloadType uint8
 | 
						|
}
 | 
						|
 | 
						|
type PSIngester struct {
 | 
						|
	conf         *IngesterConfig
 | 
						|
	onSendPacket func(pack *PSPackStream) error
 | 
						|
	cancel       context.CancelFunc
 | 
						|
}
 | 
						|
 | 
						|
func NewPSIngester(c *IngesterConfig) *PSIngester {
 | 
						|
	return &PSIngester{conf: c}
 | 
						|
}
 | 
						|
 | 
						|
func (v *PSIngester) Close() error {
 | 
						|
	if v.cancel != nil {
 | 
						|
		v.cancel()
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (v *PSIngester) Ingest(ctx context.Context) error {
 | 
						|
	ctx, v.cancel = context.WithCancel(ctx)
 | 
						|
 | 
						|
	ps := NewPSClient(uint32(v.conf.ssrc), v.conf.serverAddr)
 | 
						|
	if err := ps.Connect(ctx); err != nil {
 | 
						|
		return errors.Wrapf(err, "connect media=%v", v.conf.serverAddr)
 | 
						|
	}
 | 
						|
	defer ps.Close()
 | 
						|
 | 
						|
	videoFile, err := os.Open(v.conf.psConfig.video)
 | 
						|
	if err != nil {
 | 
						|
		return errors.Wrapf(err, "Open file %v", v.conf.psConfig.video)
 | 
						|
	}
 | 
						|
	defer videoFile.Close()
 | 
						|
 | 
						|
	f, err := os.Open(v.conf.psConfig.audio)
 | 
						|
	if err != nil {
 | 
						|
		return errors.Wrapf(err, "Open file %v", v.conf.psConfig.audio)
 | 
						|
	}
 | 
						|
	defer f.Close()
 | 
						|
 | 
						|
	fileSuffix := path.Ext(v.conf.psConfig.video)
 | 
						|
	var h264 *h264reader.H264Reader
 | 
						|
	var h265 *H265Reader
 | 
						|
	if fileSuffix == ".h265" {
 | 
						|
		h265, err = NewReader(videoFile)
 | 
						|
	} else {
 | 
						|
		h264, err = h264reader.NewReader(videoFile)
 | 
						|
	}
 | 
						|
	if err != nil {
 | 
						|
		return errors.Wrapf(err, "Open %v", v.conf.psConfig.video)
 | 
						|
	}
 | 
						|
 | 
						|
	audio, err := NewAACReader(f)
 | 
						|
	if err != nil {
 | 
						|
		return errors.Wrapf(err, "Open ogg %v", v.conf.psConfig.audio)
 | 
						|
	}
 | 
						|
 | 
						|
	// Scale the video samples to 1024 according to AAC, that is 1 video frame means 1024 samples.
 | 
						|
	audioSampleRate := audio.codec.ASC().SampleRate.ToHz()
 | 
						|
	videoSampleRate := 1024 * 1000 / v.conf.psConfig.fps
 | 
						|
	logger.Tf(ctx, "PS: Media stream, tbn=%v, ssrc=%v, pt=%v, Video(%v, fps=%v, rate=%v), Audio(%v, rate=%v, channels=%v)",
 | 
						|
		v.conf.clockRate, v.conf.ssrc, v.conf.payloadType, v.conf.psConfig.video, v.conf.psConfig.fps, videoSampleRate,
 | 
						|
		v.conf.psConfig.audio, audioSampleRate, audio.codec.ASC().Channels)
 | 
						|
 | 
						|
	lastPrint := time.Now()
 | 
						|
	var aacSamples, avcSamples uint64
 | 
						|
	var audioDTS, videoDTS uint64
 | 
						|
	defer func() {
 | 
						|
		logger.Tf(ctx, "Consume Video(samples=%v, dts=%v, ts=%.2f) and Audio(samples=%v, dts=%v, ts=%.2f)",
 | 
						|
			avcSamples, videoDTS, float64(videoDTS)/90.0, aacSamples, audioDTS, float64(audioDTS)/90.0,
 | 
						|
		)
 | 
						|
	}()
 | 
						|
 | 
						|
	clock := newWallClock()
 | 
						|
	var pack *PSPackStream
 | 
						|
	for ctx.Err() == nil {
 | 
						|
		if pack == nil {
 | 
						|
			pack = NewPSPackStream(v.conf.payloadType)
 | 
						|
		}
 | 
						|
 | 
						|
		// One pack should only contains one video frame.
 | 
						|
		if !pack.hasVideo {
 | 
						|
			if fileSuffix == ".h265" {
 | 
						|
				err = v.writeH265(ctx, pack, h265, videoSampleRate, &avcSamples, &videoDTS)
 | 
						|
			} else {
 | 
						|
				err = v.writeH264(ctx, pack, h264, videoSampleRate, &avcSamples, &videoDTS)
 | 
						|
			}
 | 
						|
			if err != nil {
 | 
						|
				return errors.Wrap(err, "WriteVideo")
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// Always read and consume one audio frame each time.
 | 
						|
		if true {
 | 
						|
			audioFrame, err := audio.NextADTSFrame()
 | 
						|
			if err != nil {
 | 
						|
				return errors.Wrap(err, "Read AAC")
 | 
						|
			}
 | 
						|
 | 
						|
			// Each AAC frame contains 1024 samples, DTS = total-samples / sample-rate
 | 
						|
			aacSamples += 1024
 | 
						|
			audioDTS = uint64(v.conf.clockRate*aacSamples) / uint64(audioSampleRate)
 | 
						|
			if time.Now().Sub(lastPrint) > 3*time.Second {
 | 
						|
				lastPrint = time.Now()
 | 
						|
				logger.Tf(ctx, "Consume Video(samples=%v, dts=%v, ts=%.2f) and Audio(samples=%v, dts=%v, ts=%.2f)",
 | 
						|
					avcSamples, videoDTS, float64(videoDTS)/90.0, aacSamples, audioDTS, float64(audioDTS)/90.0,
 | 
						|
				)
 | 
						|
			}
 | 
						|
 | 
						|
			if err = pack.WriteAudio(audioFrame, audioDTS); err != nil {
 | 
						|
				return errors.Wrapf(err, "write audio %v", len(audioFrame))
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// Send pack when got video and enough audio frames.
 | 
						|
		if pack.hasVideo && videoDTS < audioDTS {
 | 
						|
			if err := ps.WritePacksOverRTP(pack.packets); err != nil {
 | 
						|
				return errors.Wrap(err, "write")
 | 
						|
			}
 | 
						|
			if v.onSendPacket != nil {
 | 
						|
				if err := v.onSendPacket(pack); err != nil {
 | 
						|
					return errors.Wrap(err, "callback")
 | 
						|
				}
 | 
						|
			}
 | 
						|
			pack = nil // Reset pack.
 | 
						|
		}
 | 
						|
 | 
						|
		// One audio frame(1024 samples), the duration is 1024/audioSampleRate in seconds.
 | 
						|
		sampleDuration := time.Duration(uint64(time.Second) * 1024 / uint64(audioSampleRate))
 | 
						|
		if d := clock.Tick(sampleDuration); d > 0 {
 | 
						|
			time.Sleep(d)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (v *PSIngester) writeH264(ctx context.Context, pack *PSPackStream, h264 *h264reader.H264Reader,
 | 
						|
	videoSampleRate int, avcSamples, videoDTS *uint64) error {
 | 
						|
	var sps, pps *h264reader.NAL
 | 
						|
	var videoFrames []*h264reader.NAL
 | 
						|
	for ctx.Err() == nil {
 | 
						|
		frame, err := h264.NextNAL()
 | 
						|
		if err == io.EOF {
 | 
						|
			return io.EOF
 | 
						|
		}
 | 
						|
		if err != nil {
 | 
						|
			return errors.Wrapf(err, "Read h264")
 | 
						|
		}
 | 
						|
 | 
						|
		videoFrames = append(videoFrames, frame)
 | 
						|
		logger.If(ctx, "NALU %v PictureOrderCount=%v, ForbiddenZeroBit=%v, RefIdc=%v, %v bytes",
 | 
						|
			frame.UnitType.String(), frame.PictureOrderCount, frame.ForbiddenZeroBit, frame.RefIdc, len(frame.Data))
 | 
						|
 | 
						|
		if frame.UnitType == h264reader.NalUnitTypeSPS {
 | 
						|
			sps = frame
 | 
						|
		} else if frame.UnitType == h264reader.NalUnitTypePPS {
 | 
						|
			pps = frame
 | 
						|
		} else {
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// We convert the video sample rate to be based over 1024, that is 1024 samples means one video frame.
 | 
						|
	*avcSamples += 1024
 | 
						|
	*videoDTS = uint64(v.conf.clockRate*(*avcSamples)) / uint64(videoSampleRate)
 | 
						|
 | 
						|
	var err error
 | 
						|
	if sps != nil || pps != nil {
 | 
						|
		err = pack.WriteHeader(mpeg2.PS_STREAM_H264, *videoDTS)
 | 
						|
	} else {
 | 
						|
		err = pack.WritePackHeader(*videoDTS)
 | 
						|
	}
 | 
						|
	if err != nil {
 | 
						|
		return errors.Wrap(err, "pack header")
 | 
						|
	}
 | 
						|
 | 
						|
	for _, frame := range videoFrames {
 | 
						|
		if err = pack.WriteVideo(frame.Data, *videoDTS); err != nil {
 | 
						|
			return errors.Wrapf(err, "write video %v", len(frame.Data))
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (v *PSIngester) writeH265(ctx context.Context, pack *PSPackStream, h265 *H265Reader,
 | 
						|
	videoSampleRate int, avcSamples, videoDTS *uint64) error {
 | 
						|
	var vps, sps, pps *NAL
 | 
						|
	var videoFrames []*NAL
 | 
						|
	for ctx.Err() == nil {
 | 
						|
		frame, err := h265.NextNAL()
 | 
						|
		if err == io.EOF {
 | 
						|
			return io.EOF
 | 
						|
		}
 | 
						|
		if err != nil {
 | 
						|
			return errors.Wrapf(err, "Read h265")
 | 
						|
		}
 | 
						|
 | 
						|
		videoFrames = append(videoFrames, frame)
 | 
						|
		logger.If(ctx, "NALU %v PictureOrderCount=%v, ForbiddenZeroBit=%v, %v bytes",
 | 
						|
			frame.UnitType, frame.PictureOrderCount, frame.ForbiddenZeroBit, len(frame.Data))
 | 
						|
 | 
						|
		if frame.UnitType == NaluTypeVps {
 | 
						|
			vps = frame
 | 
						|
		} else if frame.UnitType == NaluTypeSps {
 | 
						|
			sps = frame
 | 
						|
		} else if frame.UnitType == NaluTypePps {
 | 
						|
			pps = frame
 | 
						|
		} else {
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// We convert the video sample rate to be based over 1024, that is 1024 samples means one video frame.
 | 
						|
	*avcSamples += 1024
 | 
						|
	*videoDTS = uint64(v.conf.clockRate*(*avcSamples)) / uint64(videoSampleRate)
 | 
						|
 | 
						|
	var err error
 | 
						|
	if vps != nil || sps != nil || pps != nil {
 | 
						|
		err = pack.WriteHeader(mpeg2.PS_STREAM_H265, *videoDTS)
 | 
						|
	} else {
 | 
						|
		err = pack.WritePackHeader(*videoDTS)
 | 
						|
	}
 | 
						|
	if err != nil {
 | 
						|
		return errors.Wrap(err, "pack header")
 | 
						|
	}
 | 
						|
 | 
						|
	for _, frame := range videoFrames {
 | 
						|
		if err = pack.WriteVideo(frame.Data, *videoDTS); err != nil {
 | 
						|
			return errors.Wrapf(err, "write video %v", len(frame.Data))
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 |