1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-03-09 15:49:59 +00:00
This commit is contained in:
winlin 2021-08-15 21:51:05 +08:00
commit 1c825ce831
11 changed files with 243 additions and 186 deletions

View file

@ -637,155 +637,6 @@ func TestRtcBasic_PublishPlay(t *testing.T) {
}()
}
// When republish a stream, the player stream SHOULD be continuous.
func TestRtcBasic_Republish(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0, r1, r2, r3, r4 error
defer func(ctx context.Context) {
if err := filterTestError(ctx.Err(), r0, r1, r2, r3, r4); err != nil {
t.Errorf("Fail for err %+v", err)
} else {
logger.Tf(ctx, "test done with err %+v", err)
}
}(ctx)
var resources []io.Closer
defer func() {
for _, resource := range resources {
_ = resource.Close()
}
}()
var wg sync.WaitGroup
defer wg.Wait()
// The event notify.
var thePublisher, theRepublisher *testPublisher
var thePlayer *testPlayer
mainReady, mainReadyCancel := context.WithCancel(context.Background())
publishReady, publishReadyCancel := context.WithCancel(context.Background())
republishReady, republishReadyCancel := context.WithCancel(context.Background())
// Objects init.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
doInit := func() (err error) {
streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int())
// Initialize player with private api.
if thePlayer, err = newTestPlayer(registerDefaultCodecs, func(play *testPlayer) error {
play.streamSuffix = streamSuffix
resources = append(resources, play)
var nnPlayReadRTP uint64
return play.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
select {
case <-republishReady.Done():
if nnPlayReadRTP++; nnPlayReadRTP >= uint64(*srsPlayOKPackets) {
cancel() // Completed.
}
logger.Tf(ctx, "Play recv rtp %v packets", nnPlayReadRTP)
default:
logger.Tf(ctx, "Play recv rtp packet before republish")
}
return i.nextRTPReader.Read(payload, attributes)
}
}))
})
}); err != nil {
return err
}
// Initialize publisher with private api.
if thePublisher, err = newTestPublisher(registerDefaultCodecs, func(pub *testPublisher) error {
pub.streamSuffix = streamSuffix
pub.iceReadyCancel = publishReadyCancel
resources = append(resources, pub)
var nnPubReadRTCP uint64
return pub.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) {
i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
nn, attr, err := i.nextRTCPReader.Read(buf, attributes)
if nnPubReadRTCP++; nnPubReadRTCP > 0 && pub.cancel != nil {
pub.cancel() // We only cancel the publisher itself.
}
logger.Tf(ctx, "Publish recv rtcp %v packets", nnPubReadRTCP)
return nn, attr, err
}
}))
})
}); err != nil {
return err
}
// Initialize re-publisher with private api.
if theRepublisher, err = newTestPublisher(registerDefaultCodecs, func(pub *testPublisher) error {
pub.streamSuffix = streamSuffix
pub.iceReadyCancel = republishReadyCancel
resources = append(resources, pub)
return pub.Setup(*srsVnetClientIP)
}); err != nil {
return err
}
// Init done.
mainReadyCancel()
<-ctx.Done()
return nil
}
if err := doInit(); err != nil {
r1 = err
}
}()
// Run publisher.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
select {
case <-ctx.Done():
case <-mainReady.Done():
pubCtx, pubCancel := context.WithCancel(ctx)
r2 = thePublisher.Run(logger.WithContext(pubCtx), pubCancel)
logger.Tf(ctx, "pub done, re-publish again")
// Dispose the stream.
_ = thePublisher.Close()
r4 = theRepublisher.Run(logger.WithContext(ctx), cancel)
logger.Tf(ctx, "re-pub done")
}
}()
// Run player.
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
select {
case <-ctx.Done():
case <-publishReady.Done():
r3 = thePlayer.Run(logger.WithContext(ctx), cancel)
logger.Tf(ctx, "play done")
}
}()
}
// The srs-server is DTLS server(passive), srs-bench is DTLS client which is active mode.
// No.1 srs-bench: ClientHello
// No.2 srs-server: ServerHello, Certificate, ServerKeyExchange, CertificateRequest, ServerHelloDone