1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00

Squash: Merge SRS 4.0, regression test for RTMP.

This commit is contained in:
winlin 2021-10-12 08:36:24 +08:00
parent a81aa2edc5
commit b874d9c9ba
32 changed files with 9977 additions and 131 deletions

View file

@ -45,23 +45,6 @@ import (
"github.com/pion/rtp"
)
func TestMain(m *testing.M) {
if err := prepareTest(); err != nil {
logger.Ef(nil, "Prepare test fail, err %+v", err)
os.Exit(-1)
}
// Disable the logger during all tests.
if *srsLog == false {
olw := logger.Switch(ioutil.Discard)
defer func() {
logger.Switch(olw)
}()
}
os.Exit(m.Run())
}
// Test for https://github.com/ossrs/srs/pull/2483
func TestPR2483_RtcStatApi_PublisherOnly(t *testing.T) {
if err := filterTestError(func() error {
@ -876,6 +859,9 @@ func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) {
//
// @remark The pion is active, so it can be consider a benchmark for DTLS server.
func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
@ -889,7 +875,6 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T
}
defer p.Close()
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
var nnRTCP, nnRTP int64
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
@ -938,7 +923,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T
return p.Run(ctx, cancel)
}()
if err := filterTestError(err, r0); err != nil {
if err := filterTestError(ctx.Err(), err, r0); err != nil {
t.Errorf("err %+v", err)
}
}
@ -954,6 +939,9 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T
// openssl will create a new ClientHello with increased sequence. It's ok, but waste a lots of duplicated ClientHello
// packets, so we fail the test, requires the epoch+sequence never dup, even for ARQ.
func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
@ -967,7 +955,6 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.
}
defer p.Close()
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
var nnRTCP, nnRTP int64
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
@ -1016,7 +1003,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.
return p.Run(ctx, cancel)
}()
if err := filterTestError(err, r0); err != nil {
if err := filterTestError(ctx.Err(), err, r0); err != nil {
t.Errorf("err %+v", err)
}
}
@ -1031,6 +1018,9 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.
//
// @remark The pion is active, so it can be consider a benchmark for DTLS server.
func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0, r1 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
@ -1044,7 +1034,6 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T
}
defer p.Close()
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
var nnRTCP, nnRTP int64
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
@ -1102,7 +1091,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T
return p.Run(ctx, cancel)
}()
if err := filterTestError(err, r0, r1); err != nil {
if err := filterTestError(ctx.Err(), err, r0, r1); err != nil {
t.Errorf("err %+v", err)
}
}
@ -1119,6 +1108,9 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T
// openssl will create a new ClientHello with increased sequence. It's ok, but waste a lots of duplicated ClientHello
// packets, so we fail the test, requires the epoch+sequence never dup, even for ARQ.
func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0, r1 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int())
@ -1132,7 +1124,6 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.
}
defer p.Close()
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
var nnRTCP, nnRTP int64
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
@ -1190,7 +1181,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.
return p.Run(ctx, cancel)
}()
if err := filterTestError(err, r0, r1); err != nil {
if err := filterTestError(ctx.Err(), err, r0, r1); err != nil {
t.Errorf("err %+v", err)
}
}
@ -1204,6 +1195,9 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.
//
// @remark The pion is active, so it can be consider a benchmark for DTLS server.
func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int())
@ -1217,7 +1211,6 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T
}
defer p.Close()
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
var nnRTCP, nnRTP int64
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
@ -1266,7 +1259,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T
return p.Run(ctx, cancel)
}()
if err := filterTestError(err, r0); err != nil {
if err := filterTestError(ctx.Err(), err, r0); err != nil {
t.Errorf("err %+v", err)
}
}
@ -1281,6 +1274,9 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T
// @remark If retransmit the Certificate, with the same epoch+sequence, peer will drop the message. It's ok right now, but
// wast some packets, so we check the epoch+sequence which should never dup, even for ARQ.
func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int())
@ -1294,7 +1290,6 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.
}
defer p.Close()
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
var nnRTCP, nnRTP int64
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
@ -1343,7 +1338,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.
return p.Run(ctx, cancel)
}()
if err := filterTestError(err, r0); err != nil {
if err := filterTestError(ctx.Err(), err, r0); err != nil {
t.Errorf("err %+v", err)
}
}
@ -1358,6 +1353,9 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing.
//
// @remark The pion is active, so it can be consider a benchmark for DTLS server.
func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0, r1 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int())
@ -1371,7 +1369,6 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test
}
defer p.Close()
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
var nnRTCP, nnRTP int64
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
@ -1428,7 +1425,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test
return p.Run(ctx, cancel)
}()
if err := filterTestError(err, r0, r1); err != nil {
if err := filterTestError(ctx.Err(), err, r0, r1); err != nil {
t.Errorf("err %+v", err)
}
}
@ -1444,6 +1441,9 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test
// @remark If retransmit the Certificate, with the same epoch+sequence, peer will drop the message, and never generate the
// ChangeCipherSpec, which will cause DTLS fail.
func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0, r1 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int())
@ -1457,7 +1457,6 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes
}
defer p.Close()
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
var nnRTCP, nnRTP int64
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
@ -1514,7 +1513,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes
return p.Run(ctx, cancel)
}()
if err := filterTestError(err, r0, r1); err != nil {
if err := filterTestError(ctx.Err(), err, r0, r1); err != nil {
t.Errorf("err %+v", err)
}
}
@ -1812,6 +1811,9 @@ func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) {
// If we retransmit 2 ClientHello packets, consumed 150ms, server might wait at 200ms.
// Then we retransmit the Certificate, server reset the timer and retransmit it in 50ms, not 200ms.
func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0 error
err := func() error {
streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int())
@ -1825,7 +1827,6 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) {
}
defer p.Close()
ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond)
if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
var nnRTCP, nnRTP int64
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
@ -1890,7 +1891,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) {
return p.Run(ctx, cancel)
}()
if err := filterTestError(err, r0); err != nil {
if err := filterTestError(ctx.Err(), err, r0); err != nil {
t.Errorf("err %+v", err)
}
}
@ -1939,7 +1940,7 @@ func TestRTCServerVersion(t *testing.T) {
}
}
func TestRtcPublishFlvPlay(t *testing.T) {
func TestRtcPublish_FlvPlay(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)

View file

@ -0,0 +1,395 @@
// The MIT License (MIT)
//
// Copyright (c) 2021 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package srs
import (
"bytes"
"context"
"fmt"
"math/rand"
"os"
"sync"
"testing"
"time"
"github.com/ossrs/go-oryx-lib/avc"
"github.com/ossrs/go-oryx-lib/flv"
"github.com/ossrs/go-oryx-lib/logger"
"github.com/ossrs/go-oryx-lib/rtmp"
"github.com/pion/interceptor"
)
func TestRtmpPublishPlay(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0, r1 error
err := func() error {
publisher := NewRTMPPublisher()
defer publisher.Close()
player := NewRTMPPlayer()
defer player.Close()
// Connect to RTMP URL.
streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int())
rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix)
if err := publisher.Publish(ctx, rtmpUrl); err != nil {
return err
}
if err := player.Play(ctx, rtmpUrl); err != nil {
return err
}
// Check packets.
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
var nnPackets int
player.onRecvPacket = func(m *rtmp.Message, a *flv.AudioFrame, v *flv.VideoFrame) error {
logger.Tf(ctx, "got %v packet, %v %vms %vB",
nnPackets, m.MessageType, m.Timestamp, len(m.Payload))
if nnPackets += 1; nnPackets > 50 {
cancel()
}
return nil
}
if r1 = player.Consume(ctx); r1 != nil {
cancel()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
publisher.onSendPacket = func(m *rtmp.Message) error {
time.Sleep(1 * time.Millisecond)
return nil
}
if r0 = publisher.Ingest(ctx, *srsPublishAvatar); r0 != nil {
cancel()
}
}()
return nil
}()
if err := filterTestError(ctx.Err(), err, r0, r1); err != nil {
t.Errorf("err %+v", err)
}
}
func TestRtmpPublish_RtcPlay(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0, r1 error
err := func() (err error) {
streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int())
rtmpUrl := fmt.Sprintf("%v://%v%v-%v", srsSchema, *srsServer, *srsStream, streamSuffix)
// Publisher connect to a RTMP stream.
publisher := NewRTMPPublisher()
defer publisher.Close()
if err := publisher.Publish(ctx, rtmpUrl); err != nil {
return err
}
// Setup the RTC player.
var thePlayer *testPlayer
if thePlayer, err = newTestPlayer(registerMiniCodecs, func(play *testPlayer) error {
play.streamSuffix = streamSuffix
var nnPlayReadRTP uint64
return play.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
nn, attr, err := i.nextRTPReader.Read(payload, attributes)
if err != nil {
return nn, attr, err
}
if nnPlayReadRTP++; nnPlayReadRTP >= uint64(*srsPlayOKPackets) {
cancel() // Completed.
}
logger.Tf(ctx, "Play RECV RTP #%v %vB", nnPlayReadRTP, nn)
return nn, attr, err
}
}))
})
}); err != nil {
return err
}
defer thePlayer.Close()
// Run publisher and players.
var wg sync.WaitGroup
defer wg.Wait()
var playerIceReady context.Context
playerIceReady, thePlayer.iceReadyCancel = context.WithCancel(ctx)
wg.Add(1)
go func() {
defer wg.Done()
if r1 = thePlayer.Run(logger.WithContext(ctx), cancel); r1 != nil {
cancel()
}
logger.Tf(ctx, "player done")
}()
wg.Add(1)
go func() {
defer wg.Done()
// Wait for player ready.
select {
case <-ctx.Done():
return
case <-playerIceReady.Done():
}
publisher.onSendPacket = func(m *rtmp.Message) error {
time.Sleep(100 * time.Microsecond)
return nil
}
if r0 = publisher.Ingest(ctx, *srsPublishAvatar); r0 != nil {
cancel()
}
logger.Tf(ctx, "publisher done")
}()
return nil
}()
if err := filterTestError(ctx.Err(), err, r0, r1); err != nil {
t.Errorf("err %+v", err)
}
}
func TestRtmpPublish_MultipleSequences(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0, r1, r2 error
err := func() error {
publisher := NewRTMPPublisher()
defer publisher.Close()
player := NewRTMPPlayer()
defer player.Close()
// Connect to RTMP URL.
streamSuffix := fmt.Sprintf("rtmp-multi-spspps-%v-%v", os.Getpid(), rand.Int())
rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix)
if err := publisher.Publish(ctx, rtmpUrl); err != nil {
return err
}
if err := player.Play(ctx, rtmpUrl); err != nil {
return err
}
// Check packets.
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
var nnPackets int
var previousAvccr *avc.AVCDecoderConfigurationRecord
player.onRecvPacket = func(m *rtmp.Message, a *flv.AudioFrame, v *flv.VideoFrame) error {
if m.MessageType == rtmp.MessageTypeAudio || v.FrameType != flv.VideoFrameTypeKeyframe ||
v.Trait != flv.VideoFrameTraitSequenceHeader {
return nil
}
avccr := avc.NewAVCDecoderConfigurationRecord()
if err := avccr.UnmarshalBinary(v.Raw); err != nil {
return err
}
// Ingore the duplicated sps/pps.
if IsAvccrEquals(previousAvccr, avccr) {
return nil
}
previousAvccr = avccr
logger.Tf(ctx, "got %v sps/pps, %v %vms %vB, sps=%v, pps=%v, %v, %v",
nnPackets, m.MessageType, m.Timestamp, len(m.Payload), len(avccr.SequenceParameterSetNALUnits),
len(avccr.PictureParameterSetNALUnits), avccr.AVCProfileIndication, avccr.AVCLevelIndication)
if nnPackets++; nnPackets >= 2 {
cancel()
}
return nil
}
if r1 = player.Consume(ctx); r1 != nil {
cancel()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
var nnPackets int
ctxAvatar, cancelAvatar := context.WithCancel(ctx)
publisher.onSendPacket = func(m *rtmp.Message) error {
if m.MessageType == rtmp.MessageTypeVideo {
nnPackets++
}
if nnPackets > 10 {
cancelAvatar()
}
return nil
}
publisher.closeTransportWhenIngestDone = false
if r0 = publisher.Ingest(ctxAvatar, *srsPublishBBB); r0 != nil {
cancel()
}
publisher.closeTransportWhenIngestDone = true
if r2 = publisher.Ingest(ctx, *srsPublishAvatar); r2 != nil {
cancel()
}
}()
return nil
}()
if err := filterTestError(ctx.Err(), err, r0, r1, r2); err != nil {
t.Errorf("err %+v", err)
}
}
func TestRtmpPublish_MultipleSequences_RtcPlay(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0, r1, r2 error
err := func() (err error) {
streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int())
rtmpUrl := fmt.Sprintf("%v://%v%v-%v", srsSchema, *srsServer, *srsStream, streamSuffix)
// Publisher connect to a RTMP stream.
publisher := NewRTMPPublisher()
defer publisher.Close()
if err := publisher.Publish(ctx, rtmpUrl); err != nil {
return err
}
// Setup the RTC player.
var thePlayer *testPlayer
if thePlayer, err = newTestPlayer(registerMiniCodecs, func(play *testPlayer) error {
play.streamSuffix = streamSuffix
var nnSpsPps uint64
var previousSpsPps []byte
return play.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
nn, attr, err := i.nextRTPReader.Read(payload, attributes)
if err != nil {
return nn, attr, err
}
annexb, nalus, err := DemuxRtpSpsPps(payload[:nn])
if err != nil || len(nalus) == 0 ||
(nalus[0].NALUType != avc.NALUTypeSPS && nalus[0].NALUType != avc.NALUTypePPS) ||
bytes.Equal(annexb, previousSpsPps) {
return nn, attr, err
}
previousSpsPps = annexb
if nnSpsPps++; nnSpsPps >= 2 {
cancel() // Completed.
}
logger.Tf(ctx, "Play RECV SPS/PPS #%v %vB %v", nnSpsPps, nn, nalus[0].NALUType)
return nn, attr, err
}
}))
})
}); err != nil {
return err
}
defer thePlayer.Close()
// Run publisher and players.
var wg sync.WaitGroup
defer wg.Wait()
var playerIceReady context.Context
playerIceReady, thePlayer.iceReadyCancel = context.WithCancel(ctx)
wg.Add(1)
go func() {
defer wg.Done()
if r1 = thePlayer.Run(logger.WithContext(ctx), cancel); r1 != nil {
cancel()
}
logger.Tf(ctx, "player done")
}()
wg.Add(1)
go func() {
defer wg.Done()
// Wait for player ready.
select {
case <-ctx.Done():
return
case <-playerIceReady.Done():
}
var nnPackets int
ctxAvatar, cancelAvatar := context.WithCancel(ctx)
publisher.onSendPacket = func(m *rtmp.Message) error {
if m.MessageType == rtmp.MessageTypeVideo {
nnPackets++
}
if nnPackets > 10 {
cancelAvatar()
}
return nil
}
publisher.closeTransportWhenIngestDone = false
if r0 = publisher.Ingest(ctxAvatar, *srsPublishBBB); r0 != nil {
cancel()
}
publisher.closeTransportWhenIngestDone = true
if r2 = publisher.Ingest(ctx, *srsPublishAvatar); r2 != nil {
cancel()
}
logger.Tf(ctx, "publisher done")
}()
return nil
}()
if err := filterTestError(ctx.Err(), err, r0, r1, r2); err != nil {
t.Errorf("err %+v", err)
}
}

View file

@ -0,0 +1,45 @@
// The MIT License (MIT)
//
// Copyright (c) 2021 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package srs
import (
"github.com/ossrs/go-oryx-lib/logger"
"io/ioutil"
"os"
"testing"
)
func TestMain(m *testing.M) {
if err := prepareTest(); err != nil {
logger.Ef(nil, "Prepare test fail, err %+v", err)
os.Exit(-1)
}
// Disable the logger during all tests.
if *srsLog == false {
olw := logger.Switch(ioutil.Discard)
defer func() {
logger.Switch(olw)
}()
}
os.Exit(m.Run())
}

View file

@ -25,7 +25,14 @@ import (
"context"
"flag"
"fmt"
"github.com/ossrs/go-oryx-lib/amf0"
"github.com/ossrs/go-oryx-lib/avc"
"github.com/ossrs/go-oryx-lib/flv"
"github.com/ossrs/go-oryx-lib/rtmp"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"io"
"math/rand"
"net"
"net/url"
"os"
@ -62,6 +69,8 @@ var srsStream *string
var srsLiveStream *string
var srsPublishAudio *string
var srsPublishVideo *string
var srsPublishAvatar *string
var srsPublishBBB *string
var srsVnetClientIP *string
func prepareTest() error {
@ -78,6 +87,8 @@ func prepareTest() error {
srsPublishOKPackets = flag.Int("srs-publish-ok-packets", 3, "If send N RTP, recv N RTCP packets, it's ok, or fail")
srsPublishAudio = flag.String("srs-publish-audio", "avatar.ogg", "The audio file for publisher.")
srsPublishVideo = flag.String("srs-publish-video", "avatar.h264", "The video file for publisher.")
srsPublishAvatar = flag.String("srs-publish-avatar", "avatar.flv", "The avatar file for publisher.")
srsPublishBBB = flag.String("srs-publish-bbb", "bbb.flv", "The bbb file for publisher.")
srsPublishVideoFps = flag.Int("srs-publish-video-fps", 25, "The video fps for publisher.")
srsVnetClientIP = flag.String("srs-vnet-client-ip", "192.168.168.168", "The client ip in pion/vnet.")
srsDTLSDropPackets = flag.Int("srs-dtls-drop-packets", 5, "If dropped N packets, it's ok, or fail")
@ -122,6 +133,14 @@ func prepareTest() error {
return err
}
if *srsPublishAvatar, err = tryOpenFile(*srsPublishAvatar); err != nil {
return err
}
if *srsPublishBBB, err = tryOpenFile(*srsPublishBBB); err != nil {
return err
}
if *srsPublishAudio, err = tryOpenFile(*srsPublishAudio); err != nil {
return err
}
@ -732,10 +751,11 @@ func (v *testWebRTCAPI) NewPeerConnection(configuration webrtc.Configuration) (*
type testPlayerOptionFunc func(p *testPlayer) error
type testPlayer struct {
onOffer func(s *webrtc.SessionDescription) error
onAnswer func(s *webrtc.SessionDescription) error
pc *webrtc.PeerConnection
receivers []*webrtc.RTPReceiver
onOffer func(s *webrtc.SessionDescription) error
onAnswer func(s *webrtc.SessionDescription) error
iceReadyCancel context.CancelFunc
pc *webrtc.PeerConnection
receivers []*webrtc.RTPReceiver
// We should dispose it.
api *testWebRTCAPI
// Optional suffix for stream url.
@ -893,8 +913,20 @@ func (v *testPlayer) Run(ctx context.Context, cancel context.CancelFunc) error {
})
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed {
err = errors.Errorf("Close for ICE state %v", state)
logger.Tf(ctx, "ICE state %v", state)
})
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
logger.Tf(ctx, "PC state %v", state)
if state == webrtc.PeerConnectionStateConnected {
if v.iceReadyCancel != nil {
v.iceReadyCancel()
}
}
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateClosed {
err = errors.Errorf("Close for PC state %v", state)
cancel()
}
})
@ -1240,3 +1272,429 @@ func (v *testPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro
}
return ctx.Err()
}
type RTMPClient struct {
rtmpUrl string
rtmpTcUrl string
rtmpStream string
rtmpUrlObject *url.URL
streamID int
conn *net.TCPConn
proto *rtmp.Protocol
}
func (v *RTMPClient) Close() error {
if v.conn != nil {
v.conn.Close()
}
return nil
}
func (v *RTMPClient) connect(rtmpUrl string) error {
v.rtmpUrl = rtmpUrl
if index := strings.LastIndex(rtmpUrl, "/"); index <= 0 {
return fmt.Errorf("invalid url %v, index=%v", rtmpUrl, index)
} else {
v.rtmpTcUrl = rtmpUrl[0:index]
v.rtmpStream = rtmpUrl[index+1:]
}
// Parse RTMP url.
rtmpUrlObject, err := url.Parse(rtmpUrl)
if err != nil {
return err
}
v.rtmpUrlObject = rtmpUrlObject
port := rtmpUrlObject.Port()
if port == "" {
port = "1935"
}
// Connect to TCP server.
rtmpAddr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%v:%v", rtmpUrlObject.Hostname(), port))
if err != nil {
return err
}
c, err := net.DialTCP("tcp4", nil, rtmpAddr)
if err != nil {
return err
}
v.conn = c
// RTMP Handshake with server.
hs := rtmp.NewHandshake(rand.New(rand.NewSource(time.Now().UnixNano())))
if err := hs.WriteC0S0(c); err != nil {
return err
}
if err := hs.WriteC1S1(c); err != nil {
return err
}
if _, err := hs.ReadC0S0(c); err != nil {
return err
}
s1, err := hs.ReadC1S1(c)
if err != nil {
return err
}
if _, err := hs.ReadC2S2(c); err != nil {
return err
}
if err := hs.WriteC2S2(c, s1); err != nil {
return err
}
// Connect to RTMP tcUrl.
p := rtmp.NewProtocol(v.conn)
pkt := rtmp.NewConnectAppPacket()
pkt.CommandObject.Set("tcUrl", amf0.NewString(v.rtmpTcUrl))
if err = p.WritePacket(pkt, 0); err != nil {
return err
}
res := rtmp.NewConnectAppResPacket(pkt.TransactionID)
if _, err := p.ExpectPacket(&res); err != nil {
return err
}
v.proto = p
return nil
}
func (v *RTMPClient) Publish(ctx context.Context, rtmpUrl string) error {
if err := v.connect(rtmpUrl); err != nil {
return err
}
p := v.proto
// Create RTMP stream.
if true {
pkt := rtmp.NewCreateStreamPacket()
if err := p.WritePacket(pkt, 0); err != nil {
return err
}
res := rtmp.NewCreateStreamResPacket(pkt.TransactionID)
if _, err := p.ExpectPacket(&res); err != nil {
return err
}
v.streamID = int(res.StreamID)
}
// Publish RTMP stream.
if true {
pkt := rtmp.NewPublishPacket()
pkt.StreamName = *amf0.NewString(v.rtmpStream)
if err := p.WritePacket(pkt, v.streamID); err != nil {
return err
}
res := rtmp.NewCallPacket()
if _, err := p.ExpectPacket(&res); err != nil {
return err
}
}
return nil
}
func (v *RTMPClient) Play(ctx context.Context, rtmpUrl string) error {
if err := v.connect(rtmpUrl); err != nil {
return err
}
p := v.proto
// Create RTMP stream.
if true {
pkt := rtmp.NewCreateStreamPacket()
if err := p.WritePacket(pkt, 0); err != nil {
return err
}
res := rtmp.NewCreateStreamResPacket(pkt.TransactionID)
if _, err := p.ExpectPacket(&res); err != nil {
return err
}
v.streamID = int(res.StreamID)
}
// Play RTMP stream.
if true {
pkt := rtmp.NewPlayPacket()
pkt.StreamName = *amf0.NewString(v.rtmpStream)
if err := p.WritePacket(pkt, v.streamID); err != nil {
return err
}
res := rtmp.NewCallPacket()
if _, err := p.ExpectPacket(&res); err != nil {
return err
}
}
return nil
}
type RTMPPublisher struct {
client *RTMPClient
// Whether auto close transport when ingest done.
closeTransportWhenIngestDone bool
onSendPacket func(m *rtmp.Message) error
}
func NewRTMPPublisher() *RTMPPublisher {
v := &RTMPPublisher{
client: &RTMPClient{},
}
// By default, set to on.
v.closeTransportWhenIngestDone = true
return v
}
func (v *RTMPPublisher) Close() error {
return v.client.Close()
}
func (v *RTMPPublisher) Publish(ctx context.Context, rtmpUrl string) error {
return v.client.Publish(ctx, rtmpUrl)
}
func (v *RTMPPublisher) Ingest(ctx context.Context, flvInput string) error {
// If ctx is cancelled, close the RTMP transport.
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
if v.closeTransportWhenIngestDone {
v.Close()
}
}()
// Consume all packets.
err := v.ingest(flvInput)
if err == io.EOF {
return nil
}
if ctx.Err() == context.Canceled {
return nil
}
return err
}
func (v *RTMPPublisher) ingest(flvInput string) error {
p := v.client
fs, err := os.Open(flvInput)
if err != nil {
return err
}
defer fs.Close()
demuxer, err := flv.NewDemuxer(fs)
if err != nil {
return err
}
if _, _, _, err = demuxer.ReadHeader(); err != nil {
return err
}
for {
tagType, tagSize, timestamp, err := demuxer.ReadTagHeader()
if err != nil {
return err
}
tag, err := demuxer.ReadTag(tagSize)
if err != nil {
return err
}
if tagType != flv.TagTypeVideo && tagType != flv.TagTypeAudio {
continue
}
m := rtmp.NewStreamMessage(p.streamID)
m.MessageType = rtmp.MessageType(tagType)
m.Timestamp = uint64(timestamp)
m.Payload = tag
if err = p.proto.WriteMessage(m); err != nil {
return err
}
if v.onSendPacket != nil {
if err = v.onSendPacket(m); err != nil {
return err
}
}
}
return nil
}
type RTMPPlayer struct {
// Transport.
client *RTMPClient
// FLV packager.
videoPackager flv.VideoPackager
onRecvPacket func(m *rtmp.Message, a *flv.AudioFrame, v *flv.VideoFrame) error
}
func NewRTMPPlayer() *RTMPPlayer {
return &RTMPPlayer{
client: &RTMPClient{},
}
}
func (v *RTMPPlayer) Close() error {
return v.client.Close()
}
func (v *RTMPPlayer) Play(ctx context.Context, rtmpUrl string) error {
var err error
if v.videoPackager, err = flv.NewVideoPackager(); err != nil {
return err
}
return v.client.Play(ctx, rtmpUrl)
}
func (v *RTMPPlayer) Consume(ctx context.Context) error {
// If ctx is cancelled, close the RTMP transport.
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
v.Close()
}()
// Consume all packets.
err := v.consume()
if err == io.EOF {
return nil
}
if ctx.Err() == context.Canceled {
return nil
}
return err
}
func (v *RTMPPlayer) consume() error {
for {
res, err := v.client.proto.ExpectMessage(rtmp.MessageTypeVideo, rtmp.MessageTypeAudio)
if err != nil {
return err
}
if v.onRecvPacket != nil {
var audioFrame *flv.AudioFrame
var videoFrame *flv.VideoFrame
if res.MessageType == rtmp.MessageTypeVideo {
if videoFrame, err = v.videoPackager.Decode(res.Payload); err != nil {
return err
}
}
if err := v.onRecvPacket(res, audioFrame, videoFrame); err != nil {
return err
}
}
}
}
func IsAvccrEquals(a, b *avc.AVCDecoderConfigurationRecord) bool {
if a == nil || b == nil {
return false
}
if a.AVCLevelIndication != b.AVCLevelIndication ||
a.AVCProfileIndication != b.AVCProfileIndication ||
a.LengthSizeMinusOne != b.LengthSizeMinusOne ||
len(a.SequenceParameterSetNALUnits) != len(b.SequenceParameterSetNALUnits) ||
len(a.PictureParameterSetNALUnits) != len(b.PictureParameterSetNALUnits) {
return false
}
for i := 0; i < len(a.SequenceParameterSetNALUnits); i++ {
if !IsNALUEquals(a.SequenceParameterSetNALUnits[i], b.SequenceParameterSetNALUnits[i]) {
return false
}
}
for i := 0; i < len(a.PictureParameterSetNALUnits); i++ {
if !IsNALUEquals(a.PictureParameterSetNALUnits[i], b.PictureParameterSetNALUnits[i]) {
return false
}
}
return true
}
func IsNALUEquals(a, b *avc.NALU) bool {
if a == nil || b == nil {
return false
}
if a.NALUType != b.NALUType || a.NALRefIDC != b.NALRefIDC {
return false
}
return bytes.Equal(a.Data, b.Data)
}
func DemuxRtpSpsPps(payload []byte) ([]byte, []*avc.NALU, error) {
// Parse RTP packet.
pkt := rtp.Packet{}
if err := pkt.Unmarshal(payload); err != nil {
return nil, nil, err
}
// Decode H264 packet.
h264Packet := codecs.H264Packet{}
annexb, err := h264Packet.Unmarshal(pkt.Payload)
if err != nil {
return annexb, nil, err
}
// Ignore if not STAP-A
if !bytes.HasPrefix(annexb, []byte{0x00, 0x00, 0x00, 0x01}) {
return annexb, nil, err
}
// Parse to NALUs
rawNalus := bytes.Split(annexb, []byte{0x00, 0x00, 0x00, 0x01})
nalus := []*avc.NALU{}
for _, rawNalu := range rawNalus {
if len(rawNalu) == 0 {
continue
}
nalu := avc.NewNALU()
if err := nalu.UnmarshalBinary(rawNalu); err != nil {
return annexb, nil, err
}
nalus = append(nalus, nalu)
}
return annexb, nalus, nil
}