mirror of
https://github.com/ossrs/srs.git
synced 2025-02-15 04:42:04 +00:00
1. Add live benchmark support in srs-bench, which only connects and disconnects without any media transport, to test source creation and disposal and verify source memory leaks. 2. SmartPtr: Support cleanup of HTTP-FLV stream. Unregister the HTTP-FLV handler for the pattern and clean up the objects and resources. 3. Support benchmarking RTMP/SRT with srs-bench by integrating the gosrt and oryx RTMP libraries. 4. Refine SRT and RTC sources by using a timer to clean up the sources, following the same strategy as the Live source. --------- Co-authored-by: Haibo Chen <495810242@qq.com> Co-authored-by: Jacob Su <suzp1984@gmail.com>
525 lines
13 KiB
Go
525 lines
13 KiB
Go
// The MIT License (MIT)
|
|
//
|
|
// # Copyright (c) 2022-2024 Winlin
|
|
//
|
|
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
|
// this software and associated documentation files (the "Software"), to deal in
|
|
// the Software without restriction, including without limitation the rights to
|
|
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
|
// the Software, and to permit persons to whom the Software is furnished to do so,
|
|
// subject to the following conditions:
|
|
//
|
|
// The above copyright notice and this permission notice shall be included in all
|
|
// copies or substantial portions of the Software.
|
|
//
|
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
|
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
|
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
|
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
|
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
package gb28181
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/ghettovoice/gosip/sip"
|
|
"github.com/ossrs/go-oryx-lib/errors"
|
|
"github.com/ossrs/go-oryx-lib/logger"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestGbPublishRegularly(t *testing.T) {
|
|
ctx := logger.WithContext(context.Background())
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
|
defer cancel()
|
|
|
|
err := func() error {
|
|
t := NewGBTestPublisher()
|
|
defer t.Close()
|
|
|
|
var nnPackets int
|
|
t.ingester.onSendPacket = func(pack *PSPackStream) error {
|
|
if nnPackets += 1; nnPackets > 10 {
|
|
cancel()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
t.ingester.conf.psConfig.video = "avatar.h264"
|
|
if err := t.Run(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}()
|
|
if err := filterTestError(ctx.Err(), err); err != nil {
|
|
t.Errorf("err %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestGbPublishRegularlyH265(t *testing.T) {
|
|
ctx := logger.WithContext(context.Background())
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
|
defer cancel()
|
|
|
|
err := func() error {
|
|
t := NewGBTestPublisher()
|
|
defer t.Close()
|
|
|
|
var nnPackets int
|
|
t.ingester.onSendPacket = func(pack *PSPackStream) error {
|
|
if nnPackets += 1; nnPackets > 10 {
|
|
cancel()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
t.ingester.conf.psConfig.video = "avatar.h265"
|
|
if err := t.Run(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}()
|
|
if err := filterTestError(ctx.Err(), err); err != nil {
|
|
t.Errorf("err %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestGbSessionHandshake(t *testing.T) {
|
|
ctx := logger.WithContext(context.Background())
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
|
defer cancel()
|
|
|
|
err := func() error {
|
|
t := NewGBTestSession()
|
|
defer t.Close()
|
|
|
|
// Use fast heartbeat for utest.
|
|
t.session.heartbeatInterval = 100 * time.Millisecond
|
|
|
|
if err := t.Run(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
var nn int
|
|
t.session.onMessageHeartbeat = func(req, res sip.Message) error {
|
|
if nn++; nn >= 3 {
|
|
t.session.cancel()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
<-t.session.heartbeatCtx.Done()
|
|
return t.session.heartbeatCtx.Err()
|
|
}()
|
|
if err := filterTestError(ctx.Err(), err); err != nil {
|
|
t.Errorf("err %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestGbSessionHandshakeDropRegisterOk(t *testing.T) {
|
|
ctx := logger.WithContext(context.Background())
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
|
defer cancel()
|
|
|
|
var conf *SIPConfig
|
|
r0 := func() error {
|
|
t := NewGBTestSession()
|
|
defer t.Close()
|
|
|
|
conf = t.session.sip.conf
|
|
|
|
ctx, cancel2 := context.WithCancel(ctx)
|
|
t.session.onRegisterDone = func(req, res sip.Message) error {
|
|
cancel2()
|
|
return nil
|
|
}
|
|
|
|
return t.Run(ctx)
|
|
}()
|
|
|
|
// Use the same session for SIP.
|
|
r1 := func() error {
|
|
session := NewGBTestSession()
|
|
session.session.sip.conf = conf
|
|
defer session.Close()
|
|
return session.Run(ctx)
|
|
}()
|
|
if err := filterTestError(ctx.Err(), r0, r1); err != nil {
|
|
t.Errorf("err %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestGbSessionHandshakeDropInviteRequest(t *testing.T) {
|
|
ctx := logger.WithContext(context.Background())
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
|
defer cancel()
|
|
|
|
var conf *SIPConfig
|
|
r0 := func() error {
|
|
t := NewGBTestSession()
|
|
defer t.Close()
|
|
conf = t.session.sip.conf
|
|
|
|
// Drop the invite request, to simulate the device crash or disconnect when got this message.
|
|
ctx2, cancel2 := context.WithCancel(ctx)
|
|
t.session.onInviteRequest = func(req sip.Message) error {
|
|
cancel2()
|
|
return nil
|
|
}
|
|
|
|
return t.Run(ctx2)
|
|
}()
|
|
|
|
// When device restart session when inviting, server should re-invite when got register message.
|
|
r1 := func() error {
|
|
t := NewGBTestSession()
|
|
t.session.sip.conf = conf
|
|
defer t.Close()
|
|
return t.Run(ctx)
|
|
}()
|
|
if err := filterTestError(ctx.Err(), r0, r1); err != nil {
|
|
t.Errorf("err %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestGbSessionHandshakeDropInvite200Ack(t *testing.T) {
|
|
ctx := logger.WithContext(context.Background())
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
|
defer cancel()
|
|
|
|
var conf *SIPConfig
|
|
r0 := func() error {
|
|
t := NewGBTestSession()
|
|
defer t.Close()
|
|
conf = t.session.sip.conf
|
|
|
|
// Drop the invite ok ACK, to simulate the device crash or disconnect when got this message.
|
|
ctx2, cancel2 := context.WithCancel(ctx)
|
|
t.session.onInviteOkAck = func(req, res sip.Message) error {
|
|
cancel2()
|
|
return nil
|
|
}
|
|
|
|
return t.Run(ctx2)
|
|
}()
|
|
|
|
// When device restart session when 200 ack of invite, server should be stable state and waiting for media, then
|
|
//there should be a media timeout and re-invite.
|
|
r1 := func() error {
|
|
t := NewGBTestSession()
|
|
t.session.sip.conf = conf
|
|
defer t.Close()
|
|
return t.Run(ctx)
|
|
}()
|
|
if err := filterTestError(ctx.Err(), r0, r1); err != nil {
|
|
t.Errorf("err %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestGbPublishMediaDisconnect(t *testing.T) {
|
|
ctx := logger.WithContext(context.Background())
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
|
defer cancel()
|
|
|
|
var conf *SIPConfig
|
|
r0 := func() error {
|
|
t := NewGBTestPublisher()
|
|
defer t.Close()
|
|
conf = t.session.sip.conf
|
|
|
|
var nnPackets int
|
|
ctx2, cancel2 := context.WithCancel(ctx)
|
|
t.ingester.onSendPacket = func(pack *PSPackStream) error {
|
|
if nnPackets += 1; nnPackets > 200 {
|
|
cancel2()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if err := t.Run(ctx2); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}()
|
|
|
|
r1 := func() error {
|
|
t := NewGBTestSession()
|
|
t.session.sip.conf = conf
|
|
defer t.Close()
|
|
return t.Run(ctx)
|
|
}()
|
|
|
|
if err := filterTestError(ctx.Err(), r0, r1); err != nil {
|
|
t.Errorf("err %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestGbSessionBye(t *testing.T) {
|
|
ctx := logger.WithContext(context.Background())
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
|
defer cancel()
|
|
|
|
err := func() error {
|
|
t := NewGBTestSession()
|
|
defer t.Close()
|
|
|
|
// Use fast heartbeat for utest.
|
|
t.session.heartbeatInterval = 100 * time.Millisecond
|
|
|
|
if err := t.Run(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
var nn int
|
|
t.session.onMessageHeartbeat = func(req, res sip.Message) error {
|
|
if nn++; nn == 3 {
|
|
return t.session.Bye(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
reconnectTimeout := time.Duration(*srsMediaTimeout+*srsReinviteTimeout+1000) * time.Millisecond
|
|
ctx2, cancel2 := context.WithTimeout(ctx, reconnectTimeout)
|
|
defer cancel2()
|
|
|
|
req, err := t.session.sip.Wait(ctx2, sip.INVITE)
|
|
if req != nil {
|
|
return fmt.Errorf("should not invite after bye")
|
|
}
|
|
if errors.Cause(err) == context.DeadlineExceeded {
|
|
return nil
|
|
}
|
|
|
|
return err
|
|
}()
|
|
if err := filterTestError(ctx.Err(), err); err != nil {
|
|
t.Errorf("err %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestGbSessionUnregister(t *testing.T) {
|
|
ctx := logger.WithContext(context.Background())
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
|
defer cancel()
|
|
|
|
err := func() error {
|
|
t := NewGBTestSession()
|
|
defer t.Close()
|
|
|
|
// Use fast heartbeat for utest.
|
|
t.session.heartbeatInterval = 100 * time.Millisecond
|
|
|
|
if err := t.Run(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
var nn int
|
|
t.session.onMessageHeartbeat = func(req, res sip.Message) error {
|
|
if nn++; nn == 3 {
|
|
return t.session.UnRegister(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
reconnectTimeout := time.Duration(*srsMediaTimeout+*srsReinviteTimeout+1000) * time.Millisecond
|
|
ctx2, cancel2 := context.WithTimeout(ctx, reconnectTimeout)
|
|
defer cancel2()
|
|
|
|
req, err := t.session.sip.Wait(ctx2, sip.INVITE)
|
|
if req != nil {
|
|
return fmt.Errorf("should not invite after bye")
|
|
}
|
|
if errors.Cause(err) == context.DeadlineExceeded {
|
|
return nil
|
|
}
|
|
|
|
return err
|
|
}()
|
|
if err := filterTestError(ctx.Err(), err); err != nil {
|
|
t.Errorf("err %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestGbPublishReinvite(t *testing.T) {
|
|
ctx := logger.WithContext(context.Background())
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
|
defer cancel()
|
|
|
|
var conf *SIPConfig
|
|
err := func() error {
|
|
t := NewGBTestPublisher()
|
|
defer t.Close()
|
|
conf = t.session.sip.conf
|
|
|
|
var nnPackets int
|
|
ctx2, cancel2 := context.WithCancel(ctx)
|
|
t.ingester.onSendPacket = func(pack *PSPackStream) error {
|
|
if nnPackets += 1; nnPackets == 3 {
|
|
cancel2()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if err := t.Run(ctx2); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}()
|
|
|
|
r1 := func() error {
|
|
t := NewGBTestSession()
|
|
defer t.Close()
|
|
t.session.sip.conf = conf
|
|
|
|
// Only register the device, bind to session.
|
|
if err := t.session.Connect(ctx); err != nil {
|
|
return err
|
|
}
|
|
if err := t.session.Register(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
// We should get reinvite when reconnect to SRS.
|
|
req, err := t.session.sip.Wait(ctx, sip.INVITE)
|
|
if req == nil {
|
|
return fmt.Errorf("should reinvite after disconnect")
|
|
}
|
|
|
|
return err
|
|
}()
|
|
if err := filterTestError(ctx.Err(), err, r1); err != nil {
|
|
t.Errorf("err %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestGbPublishBye(t *testing.T) {
|
|
ctx := logger.WithContext(context.Background())
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
|
defer cancel()
|
|
|
|
var conf *SIPConfig
|
|
err := func() error {
|
|
t := NewGBTestPublisher()
|
|
defer t.Close()
|
|
conf = t.session.sip.conf
|
|
|
|
var nnPackets int
|
|
ctx2, cancel2 := context.WithCancel(ctx)
|
|
t.ingester.onSendPacket = func(pack *PSPackStream) error {
|
|
if nnPackets += 1; nnPackets == 10 {
|
|
if err := t.session.Bye(ctx2); err != nil {
|
|
return err
|
|
}
|
|
cancel2()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if err := t.Run(ctx2); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}()
|
|
|
|
r1 := func() error {
|
|
t := NewGBTestSession()
|
|
defer t.Close()
|
|
t.session.sip.conf = conf
|
|
|
|
// Only register the device, bind to session.
|
|
if err := t.session.Connect(ctx); err != nil {
|
|
return err
|
|
}
|
|
if err := t.session.Register(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
// We should not get reinvite when reconnect to SRS.
|
|
reconnectTimeout := time.Duration(*srsMediaTimeout+*srsReinviteTimeout+1000) * time.Millisecond
|
|
ctx2, cancel2 := context.WithTimeout(ctx, reconnectTimeout)
|
|
defer cancel2()
|
|
|
|
req, err := t.session.sip.Wait(ctx2, sip.INVITE)
|
|
if req != nil {
|
|
return fmt.Errorf("should not invite after bye")
|
|
}
|
|
if errors.Cause(err) == context.DeadlineExceeded {
|
|
return nil
|
|
}
|
|
|
|
return err
|
|
}()
|
|
if err := filterTestError(ctx.Err(), err, r1); err != nil {
|
|
t.Errorf("err %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestGbPublishUnregister(t *testing.T) {
|
|
ctx := logger.WithContext(context.Background())
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
|
|
defer cancel()
|
|
|
|
var conf *SIPConfig
|
|
err := func() error {
|
|
t := NewGBTestPublisher()
|
|
defer t.Close()
|
|
conf = t.session.sip.conf
|
|
|
|
var nnPackets int
|
|
ctx2, cancel2 := context.WithCancel(ctx)
|
|
t.ingester.onSendPacket = func(pack *PSPackStream) error {
|
|
if nnPackets += 1; nnPackets == 10 {
|
|
if err := t.session.UnRegister(ctx2); err != nil {
|
|
return err
|
|
}
|
|
cancel2()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if err := t.Run(ctx2); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}()
|
|
|
|
r1 := func() error {
|
|
t := NewGBTestSession()
|
|
defer t.Close()
|
|
t.session.sip.conf = conf
|
|
|
|
// Only register the device, bind to session.
|
|
if err := t.session.Connect(ctx); err != nil {
|
|
return err
|
|
}
|
|
if err := t.session.Register(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
// We should not get reinvite when reconnect to SRS.
|
|
reconnectTimeout := time.Duration(*srsMediaTimeout+*srsReinviteTimeout+1000) * time.Millisecond
|
|
ctx2, cancel2 := context.WithTimeout(ctx, reconnectTimeout)
|
|
defer cancel2()
|
|
|
|
req, err := t.session.sip.Wait(ctx2, sip.INVITE)
|
|
if req != nil {
|
|
return fmt.Errorf("should not invite after bye")
|
|
}
|
|
if errors.Cause(err) == context.DeadlineExceeded {
|
|
return nil
|
|
}
|
|
|
|
return err
|
|
}()
|
|
if err := filterTestError(ctx.Err(), err, r1); err != nil {
|
|
t.Errorf("err %+v", err)
|
|
}
|
|
}
|