1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-15 04:42:04 +00:00

Test: Add republish regression test, should fail

This commit is contained in:
winlin 2021-03-23 19:32:59 +08:00
parent bb37a5550c
commit 3fea5c0ec3
4 changed files with 397 additions and 373 deletions

View file

@ -44,16 +44,20 @@ type videoIngester struct {
markerInterceptor *RTPInterceptor markerInterceptor *RTPInterceptor
sVideoTrack *webrtc.TrackLocalStaticSample sVideoTrack *webrtc.TrackLocalStaticSample
sVideoSender *webrtc.RTPSender sVideoSender *webrtc.RTPSender
ready context.Context
readyCancel context.CancelFunc
} }
func NewVideoIngester(sourceVideo string) *videoIngester { func NewVideoIngester(sourceVideo string) *videoIngester {
return &videoIngester{markerInterceptor: &RTPInterceptor{}, sourceVideo: sourceVideo} v := &videoIngester{markerInterceptor: &RTPInterceptor{}, sourceVideo: sourceVideo}
v.ready, v.readyCancel = context.WithCancel(context.Background())
return v
} }
func (v *videoIngester) Close() error { func (v *videoIngester) Close() error {
v.readyCancel()
if v.sVideoSender != nil { if v.sVideoSender != nil {
v.sVideoSender.Stop() _ = v.sVideoSender.Stop()
v.sVideoSender = nil
} }
return nil return nil
} }
@ -102,6 +106,9 @@ func (v *videoIngester) Ingest(ctx context.Context) error {
logger.Tf(ctx, "Video %v, tbn=%v, fps=%v, ssrc=%v, pt=%v, header=%v", logger.Tf(ctx, "Video %v, tbn=%v, fps=%v, ssrc=%v, pt=%v, header=%v",
codec.MimeType, codec.ClockRate, fps, enc.SSRC, codec.PayloadType, headers) codec.MimeType, codec.ClockRate, fps, enc.SSRC, codec.PayloadType, headers)
// OK, we are ready.
v.readyCancel()
clock := newWallClock() clock := newWallClock()
sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 / uint64(fps)) sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 / uint64(fps))
for ctx.Err() == nil { for ctx.Err() == nil {
@ -179,16 +186,21 @@ type audioIngester struct {
audioLevelInterceptor *RTPInterceptor audioLevelInterceptor *RTPInterceptor
sAudioTrack *webrtc.TrackLocalStaticSample sAudioTrack *webrtc.TrackLocalStaticSample
sAudioSender *webrtc.RTPSender sAudioSender *webrtc.RTPSender
ready context.Context
readyCancel context.CancelFunc
} }
func NewAudioIngester(sourceAudio string) *audioIngester { func NewAudioIngester(sourceAudio string) *audioIngester {
return &audioIngester{audioLevelInterceptor: &RTPInterceptor{}, sourceAudio: sourceAudio} v := &audioIngester{audioLevelInterceptor: &RTPInterceptor{}, sourceAudio: sourceAudio}
v.ready, v.readyCancel = context.WithCancel(context.Background())
return v
} }
func (v *audioIngester) Close() error { func (v *audioIngester) Close() error {
v.readyCancel() // OK we are closed, also ready.
if v.sAudioSender != nil { if v.sAudioSender != nil {
v.sAudioSender.Stop() _ = v.sAudioSender.Stop()
v.sAudioSender = nil
} }
return nil return nil
} }
@ -240,6 +252,9 @@ func (v *audioIngester) Ingest(ctx context.Context) error {
} }
} }
// OK, we are ready.
v.readyCancel()
clock := newWallClock() clock := newWallClock()
var lastGranule uint64 var lastGranule uint64
@ -253,7 +268,7 @@ func (v *audioIngester) Ingest(ctx context.Context) error {
} }
// The amount of samples is the difference between the last and current timestamp // The amount of samples is the difference between the last and current timestamp
sampleCount := uint64(pageHeader.GranulePosition - lastGranule) sampleCount := pageHeader.GranulePosition - lastGranule
lastGranule = pageHeader.GranulePosition lastGranule = pageHeader.GranulePosition
sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 * sampleCount / uint64(codec.ClockRate)) sampleDuration := time.Duration(uint64(time.Millisecond) * 1000 * sampleCount / uint64(codec.ClockRate))
@ -266,7 +281,7 @@ func (v *audioIngester) Ingest(ctx context.Context) error {
return 0, err return 0, err
} }
header.SetExtension(uint8(audioLevel.ID), audioLevelPayload) _ = header.SetExtension(uint8(audioLevel.ID), audioLevelPayload)
} }
return ri.nextRTPWriter.Write(header, payload, attributes) return ri.nextRTPWriter.Write(header, payload, attributes)

File diff suppressed because it is too large Load diff

View file

@ -36,7 +36,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"testing"
"time" "time"
"github.com/ossrs/go-oryx-lib/errors" "github.com/ossrs/go-oryx-lib/errors"
@ -207,7 +206,7 @@ func apiRtcRequest(ctx context.Context, apiPath, r, offer string) (string, error
logger.Tf(ctx, "Parse response to code=%v, session=%v, sdp=%v bytes", logger.Tf(ctx, "Parse response to code=%v, session=%v, sdp=%v bytes",
resBody.Code, resBody.Session, len(resBody.SDP)) resBody.Code, resBody.Session, len(resBody.SDP))
return string(resBody.SDP), nil return resBody.SDP, nil
} }
func escapeSDP(sdp string) string { func escapeSDP(sdp string) string {
@ -219,7 +218,7 @@ func packageAsSTAPA(frames ...*h264reader.NAL) *h264reader.NAL {
buf := bytes.Buffer{} buf := bytes.Buffer{}
buf.WriteByte( buf.WriteByte(
byte(first.RefIdc<<5)&0x60 | byte(24), // STAP-A first.RefIdc<<5&0x60 | byte(24), // STAP-A
) )
for _, frame := range frames { for _, frame := range frames {
@ -325,6 +324,14 @@ func filterTestError(errs ...error) error {
if err == nil || errors.Cause(err) == context.Canceled { if err == nil || errors.Cause(err) == context.Canceled {
continue continue
} }
// If url error, server maybe error, do not print the detail log.
if r0 := errors.Cause(err); r0 != nil {
if r1, ok := r0.(*url.Error); ok {
err = r1
}
}
filteredErrors = append(filteredErrors, err) filteredErrors = append(filteredErrors, err)
} }
@ -352,13 +359,13 @@ func srsIsStun(b []byte) bool {
// @see https://tools.ietf.org/html/rfc2246#section-6.2.1 // @see https://tools.ietf.org/html/rfc2246#section-6.2.1
// @see srs_is_dtls of https://github.com/ossrs/srs // @see srs_is_dtls of https://github.com/ossrs/srs
func srsIsDTLS(b []byte) bool { func srsIsDTLS(b []byte) bool {
return (len(b) >= 13 && (b[0] > 19 && b[0] < 64)) return len(b) >= 13 && (b[0] > 19 && b[0] < 64)
} }
// For RTP or RTCP, the V=2 which is in the high 2bits, 0xC0 (1100 0000) // For RTP or RTCP, the V=2 which is in the high 2bits, 0xC0 (1100 0000)
// @see srs_is_rtp_or_rtcp of https://github.com/ossrs/srs // @see srs_is_rtp_or_rtcp of https://github.com/ossrs/srs
func srsIsRTPOrRTCP(b []byte) bool { func srsIsRTPOrRTCP(b []byte) bool {
return (len(b) >= 12 && (b[0]&0xC0) == 0x80) return len(b) >= 12 && (b[0]&0xC0) == 0x80
} }
// For RTCP, PT is [128, 223] (or without marker [0, 95]). // For RTCP, PT is [128, 223] (or without marker [0, 95]).
@ -554,7 +561,7 @@ func (v *DTLSRecord) Unmarshal(b []byte) error {
return errors.Errorf("requires 13B only %v", len(b)) return errors.Errorf("requires 13B only %v", len(b))
} }
v.ContentType = DTLSContentType(uint8(b[0])) v.ContentType = DTLSContentType(b[0])
v.Version = uint16(b[1])<<8 | uint16(b[2]) v.Version = uint16(b[1])<<8 | uint16(b[2])
v.Epoch = uint16(b[3])<<8 | uint16(b[4]) v.Epoch = uint16(b[3])<<8 | uint16(b[4])
v.SequenceNumber = uint64(b[5])<<40 | uint64(b[6])<<32 | uint64(b[7])<<24 | uint64(b[8])<<16 | uint64(b[9])<<8 | uint64(b[10]) v.SequenceNumber = uint64(b[5])<<40 | uint64(b[6])<<32 | uint64(b[7])<<24 | uint64(b[8])<<16 | uint64(b[9])<<8 | uint64(b[10])
@ -605,11 +612,11 @@ func NewTestWebRTCAPI(options ...TestWebRTCAPIOptionFunc) (*TestWebRTCAPI, error
func (v *TestWebRTCAPI) Close() error { func (v *TestWebRTCAPI) Close() error {
if v.proxy != nil { if v.proxy != nil {
v.proxy.Close() _ = v.proxy.Close()
} }
if v.router != nil { if v.router != nil {
v.router.Stop() _ = v.router.Stop()
} }
return nil return nil
@ -676,14 +683,24 @@ type TestPlayerOptionFunc func(p *TestPlayer) error
type TestPlayer struct { type TestPlayer struct {
pc *webrtc.PeerConnection pc *webrtc.PeerConnection
receivers []*webrtc.RTPReceiver receivers []*webrtc.RTPReceiver
// root api object // We should dispose it.
api *TestWebRTCAPI api *TestWebRTCAPI
// Optional suffix for stream url. // Optional suffix for stream url.
streamSuffix string streamSuffix string
} }
func NewTestPlayer(api *TestWebRTCAPI, options ...TestPlayerOptionFunc) (*TestPlayer, error) { func CreateApiForPlayer(play *TestPlayer) error {
v := &TestPlayer{api: api} api, err := NewTestWebRTCAPI()
if err != nil {
return err
}
play.api = api
return nil
}
func NewTestPlayer(options ...TestPlayerOptionFunc) (*TestPlayer, error) {
v := &TestPlayer{}
for _, opt := range options { for _, opt := range options {
if err := opt(v); err != nil { if err := opt(v); err != nil {
@ -691,19 +708,24 @@ func NewTestPlayer(api *TestWebRTCAPI, options ...TestPlayerOptionFunc) (*TestPl
} }
} }
// The api might be override by options.
api = v.api
return v, nil return v, nil
} }
func (v *TestPlayer) Setup(vnetClientIP string, options ...TestWebRTCAPIOptionFunc) error {
return v.api.Setup(vnetClientIP, options...)
}
func (v *TestPlayer) Close() error { func (v *TestPlayer) Close() error {
if v.pc != nil { if v.pc != nil {
v.pc.Close() _ = v.pc.Close()
} }
for _, receiver := range v.receivers { for _, receiver := range v.receivers {
receiver.Stop() _ = receiver.Stop()
}
if v.api != nil {
_ = v.api.Close()
} }
return nil return nil
@ -723,12 +745,16 @@ func (v *TestPlayer) Run(ctx context.Context, cancel context.CancelFunc) error {
} }
v.pc = pc v.pc = pc
pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{ if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly, Direction: webrtc.RTPTransceiverDirectionRecvonly,
}) }); err != nil {
pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{ return errors.Wrapf(err, "add track")
}
if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly, Direction: webrtc.RTPTransceiverDirectionRecvonly,
}) }); err != nil {
return errors.Wrapf(err, "add track")
}
offer, err := pc.CreateOffer(nil) offer, err := pc.CreateOffer(nil)
if err != nil { if err != nil {
@ -818,16 +844,28 @@ type TestPublisher struct {
aIngester *audioIngester aIngester *audioIngester
vIngester *videoIngester vIngester *videoIngester
pc *webrtc.PeerConnection pc *webrtc.PeerConnection
// root api object // We should dispose it.
api *TestWebRTCAPI api *TestWebRTCAPI
// Optional suffix for stream url. // Optional suffix for stream url.
streamSuffix string streamSuffix string
// To cancel the publisher, pass by Run.
cancel context.CancelFunc
} }
func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) (*TestPublisher, error) { func CreateApiForPublisher(pub *TestPublisher) error {
api, err := NewTestWebRTCAPI()
if err != nil {
return err
}
pub.api = api
return nil
}
func NewTestPublisher(options ...TestPublisherOptionFunc) (*TestPublisher, error) {
sourceVideo, sourceAudio := *srsPublishVideo, *srsPublishAudio sourceVideo, sourceAudio := *srsPublishVideo, *srsPublishAudio
v := &TestPublisher{api: api} v := &TestPublisher{}
for _, opt := range options { for _, opt := range options {
if err := opt(v); err != nil { if err := opt(v); err != nil {
@ -835,9 +873,6 @@ func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) (*
} }
} }
// The api might be override by options.
api = v.api
// Create ingesters. // Create ingesters.
if sourceAudio != "" { if sourceAudio != "" {
v.aIngester = NewAudioIngester(sourceAudio) v.aIngester = NewAudioIngester(sourceAudio)
@ -847,6 +882,7 @@ func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) (*
} }
// Setup the interceptors for packets. // Setup the interceptors for packets.
api := v.api
api.options = append(api.options, func(api *TestWebRTCAPI) { api.options = append(api.options, func(api *TestWebRTCAPI) {
// Filter for RTCP packets. // Filter for RTCP packets.
rtcpInterceptor := &RTCPInterceptor{} rtcpInterceptor := &RTCPInterceptor{}
@ -870,17 +906,25 @@ func NewTestPublisher(api *TestWebRTCAPI, options ...TestPublisherOptionFunc) (*
return v, nil return v, nil
} }
func (v *TestPublisher) Setup(vnetClientIP string, options ...TestWebRTCAPIOptionFunc) error {
return v.api.Setup(vnetClientIP, options...)
}
func (v *TestPublisher) Close() error { func (v *TestPublisher) Close() error {
if v.vIngester != nil { if v.vIngester != nil {
v.vIngester.Close() _ = v.vIngester.Close()
} }
if v.aIngester != nil { if v.aIngester != nil {
v.aIngester.Close() _ = v.aIngester.Close()
} }
if v.pc != nil { if v.pc != nil {
v.pc.Close() _ = v.pc.Close()
}
if v.api != nil {
_ = v.api.Close()
} }
return nil return nil
@ -892,6 +936,9 @@ func (v *TestPublisher) SetStreamSuffix(suffix string) *TestPublisher {
} }
func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) error { func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) error {
// Save the cancel.
v.cancel = cancel
r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream) r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream)
if v.streamSuffix != "" { if v.streamSuffix != "" {
r = fmt.Sprintf("%v-%v", r, v.streamSuffix) r = fmt.Sprintf("%v-%v", r, v.streamSuffix)
@ -1012,11 +1059,17 @@ func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro
<-ctx.Done() <-ctx.Done()
if v.aIngester != nil && v.aIngester.sAudioSender != nil { if v.aIngester != nil && v.aIngester.sAudioSender != nil {
v.aIngester.sAudioSender.Stop() // We MUST wait for the ingester ready(or closed), because it might crash if sender is disposed.
<-v.aIngester.ready.Done()
_ = v.aIngester.Close()
} }
if v.vIngester != nil && v.vIngester.sVideoSender != nil { if v.vIngester != nil && v.vIngester.sVideoSender != nil {
v.vIngester.sVideoSender.Stop() // We MUST wait for the ingester ready(or closed), because it might crash if sender is disposed.
<-v.vIngester.ready.Done()
_ = v.vIngester.Close()
} }
}() }()
@ -1028,6 +1081,7 @@ func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro
if v.aIngester == nil { if v.aIngester == nil {
return return
} }
defer v.aIngester.readyCancel()
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -1072,6 +1126,7 @@ func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro
if v.vIngester == nil { if v.vIngester == nil {
return return
} }
defer v.vIngester.readyCancel()
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -1119,47 +1174,3 @@ func (v *TestPublisher) Run(ctx context.Context, cancel context.CancelFunc) erro
} }
return ctx.Err() return ctx.Err()
} }
func TestRTCServerVersion(t *testing.T) {
api := fmt.Sprintf("http://%v:1985/api/v1/versions", *srsServer)
req, err := http.NewRequest("POST", api, nil)
if err != nil {
t.Errorf("Request %v", api)
return
}
res, err := http.DefaultClient.Do(req)
if err != nil {
t.Errorf("Do request %v", api)
return
}
b, err := ioutil.ReadAll(res.Body)
if err != nil {
t.Errorf("Read body of %v", api)
return
}
obj := struct {
Code int `json:"code"`
Server string `json:"server"`
Data struct {
Major int `json:"major"`
Minor int `json:"minor"`
Revision int `json:"revision"`
Version string `json:"version"`
} `json:"data"`
}{}
if err := json.Unmarshal(b, &obj); err != nil {
t.Errorf("Parse %v", string(b))
return
}
if obj.Code != 0 {
t.Errorf("Server err code=%v, server=%v", obj.Code, obj.Server)
return
}
if obj.Data.Major == 0 && obj.Data.Minor == 0 {
t.Errorf("Invalid version %v", obj.Data)
return
}
}

View file

@ -605,7 +605,7 @@ srs_error_t SrsRtcPlayStream::send_packet(SrsRtpPacket2*& pkt)
// TODO: FIXME: Maybe refine for performance issue. // TODO: FIXME: Maybe refine for performance issue.
if (!audio_tracks_.count(pkt->header.get_ssrc()) && !video_tracks_.count(pkt->header.get_ssrc())) { if (!audio_tracks_.count(pkt->header.get_ssrc()) && !video_tracks_.count(pkt->header.get_ssrc())) {
srs_warn("ssrc %u not found", pkt->header.get_ssrc()); srs_warn("RTC: Drop for ssrc %u not found", pkt->header.get_ssrc());
return err; return err;
} }