1
0
Fork 0
mirror of https://github.com/ossrs/srs.git synced 2025-02-13 03:41:55 +00:00

Fix #1641, HLS/RTC picture corrupt for SPS/PPS lost. v4.0.175

This commit is contained in:
winlin 2021-10-11 23:16:05 +08:00
parent 71ed6e5dc5
commit 29cf13afa0
5 changed files with 164 additions and 17 deletions

View file

@ -21,19 +21,20 @@
package srs
import (
"bytes"
"context"
"fmt"
"github.com/ossrs/go-oryx-lib/avc"
"github.com/ossrs/go-oryx-lib/flv"
"github.com/pion/interceptor"
"math/rand"
"os"
"sync"
"testing"
"time"
"github.com/ossrs/go-oryx-lib/avc"
"github.com/ossrs/go-oryx-lib/flv"
"github.com/ossrs/go-oryx-lib/logger"
"github.com/ossrs/go-oryx-lib/rtmp"
"github.com/pion/interceptor"
)
func TestRtmpPublishPlay(t *testing.T) {
@ -126,12 +127,14 @@ func TestRtmpPublish_RtcPlay(t *testing.T) {
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
nn, attr, err := i.nextRTPReader.Read(payload, attributes)
if err == nil {
if nnPlayReadRTP++; nnPlayReadRTP >= uint64(*srsPlayOKPackets) {
cancel() // Completed.
}
logger.Tf(ctx, "Play RECV RTP #%v %vB", nnPlayReadRTP, nn)
if err != nil {
return nn, attr, err
}
if nnPlayReadRTP++; nnPlayReadRTP >= uint64(*srsPlayOKPackets) {
cancel() // Completed.
}
logger.Tf(ctx, "Play RECV RTP #%v %vB", nnPlayReadRTP, nn)
return nn, attr, err
}
}))
@ -280,3 +283,113 @@ func TestRtmpPublish_MultipleSequences(t *testing.T) {
t.Errorf("err %+v", err)
}
}
func TestRtmpPublish_MultipleSequences_RtcPlay(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
var r0, r1, r2 error
err := func() (err error) {
streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int())
rtmpUrl := fmt.Sprintf("%v://%v%v-%v", srsSchema, *srsServer, *srsStream, streamSuffix)
// Publisher connect to a RTMP stream.
publisher := NewRTMPPublisher()
defer publisher.Close()
if err := publisher.Publish(ctx, rtmpUrl); err != nil {
return err
}
// Setup the RTC player.
var thePlayer *testPlayer
if thePlayer, err = newTestPlayer(registerMiniCodecs, func(play *testPlayer) error {
play.streamSuffix = streamSuffix
var nnSpsPps uint64
var previousSpsPps []byte
return play.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) {
api.registry.Add(newRTPInterceptor(func(i *rtpInterceptor) {
i.rtpReader = func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
nn, attr, err := i.nextRTPReader.Read(payload, attributes)
if err != nil {
return nn, attr, err
}
annexb, nalus, err := DemuxRtpSpsPps(payload[:nn])
if err != nil || len(nalus) == 0 ||
(nalus[0].NALUType != avc.NALUTypeSPS && nalus[0].NALUType != avc.NALUTypePPS) ||
bytes.Equal(annexb, previousSpsPps) {
return nn, attr, err
}
previousSpsPps = annexb
if nnSpsPps++; nnSpsPps >= 2 {
cancel() // Completed.
}
logger.Tf(ctx, "Play RECV SPS/PPS #%v %vB %v", nnSpsPps, nn, nalus[0].NALUType)
return nn, attr, err
}
}))
})
}); err != nil {
return err
}
defer thePlayer.Close()
// Run publisher and players.
var wg sync.WaitGroup
defer wg.Wait()
var playerIceReady context.Context
playerIceReady, thePlayer.iceReadyCancel = context.WithCancel(ctx)
wg.Add(1)
go func() {
defer wg.Done()
if r1 = thePlayer.Run(logger.WithContext(ctx), cancel); r1 != nil {
cancel()
}
logger.Tf(ctx, "player done")
}()
wg.Add(1)
go func() {
defer wg.Done()
// Wait for player ready.
select {
case <-ctx.Done():
return
case <-playerIceReady.Done():
}
var nnPackets int
ctxAvatar, cancelAvatar := context.WithCancel(ctx)
publisher.onSendPacket = func(m *rtmp.Message) error {
if m.MessageType == rtmp.MessageTypeVideo {
nnPackets++
}
if nnPackets > 10 {
cancelAvatar()
}
return nil
}
publisher.closeTransportWhenIngestDone = false
if r0 = publisher.Ingest(ctxAvatar, *srsPublishBBB); r0 != nil {
cancel()
}
publisher.closeTransportWhenIngestDone = true
if r2 = publisher.Ingest(ctx, *srsPublishAvatar); r2 != nil {
cancel()
}
logger.Tf(ctx, "publisher done")
}()
return nil
}()
if err := filterTestError(ctx.Err(), err, r0, r1, r2); err != nil {
t.Errorf("err %+v", err)
}
}

View file

@ -29,6 +29,8 @@ import (
"github.com/ossrs/go-oryx-lib/avc"
"github.com/ossrs/go-oryx-lib/flv"
"github.com/ossrs/go-oryx-lib/rtmp"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"io"
"math/rand"
"net"
@ -1657,3 +1659,42 @@ func IsNALUEquals(a, b *avc.NALU) bool {
return bytes.Equal(a.Data, b.Data)
}
func DemuxRtpSpsPps(payload []byte) ([]byte, []*avc.NALU, error) {
// Parse RTP packet.
pkt := rtp.Packet{}
if err := pkt.Unmarshal(payload); err != nil {
return nil, nil, err
}
// Decode H264 packet.
h264Packet := codecs.H264Packet{}
annexb, err := h264Packet.Unmarshal(pkt.Payload)
if err != nil {
return annexb, nil, err
}
// Ignore if not STAP-A
if !bytes.HasPrefix(annexb, []byte{0x00, 0x00, 0x00, 0x01}) {
return annexb, nil, err
}
// Parse to NALUs
rawNalus := bytes.Split(annexb, []byte{0x00, 0x00, 0x00, 0x01})
nalus := []*avc.NALU{}
for _, rawNalu := range rawNalus {
if len(rawNalu) == 0 {
continue
}
nalu := avc.NewNALU()
if err := nalu.UnmarshalBinary(rawNalu); err != nil {
return annexb, nil, err
}
nalus = append(nalus, nalu)
}
return annexb, nalus, nil
}

View file

@ -8,6 +8,7 @@ The changelog for SRS.
## SRS 4.0 Changelog
* v4.0, 2021-10-11, Fix [#1641](https://github.com/ossrs/srs/issues/1641), HLS/RTC picture corrupt for SPS/PPS lost. v4.0.175
* v4.0, 2021-10-11, RTC: Refine config, aac to rtmp_to_rtc, bframe to keep_bframe. v4.0.174
* v4.0, 2021-10-10, For [#1641](https://github.com/ossrs/srs/issues/1641), Support RTMP publish and play regression test. v4.0.173
* v4.0, 2021-10-10, RTC: Change rtc.aac to discard by default. v4.0.172

View file

@ -9,6 +9,6 @@
#define VERSION_MAJOR 4
#define VERSION_MINOR 0
#define VERSION_REVISION 174
#define VERSION_REVISION 175
#endif

View file

@ -818,10 +818,6 @@ srs_error_t SrsFormat::avc_demux_sps_pps(SrsBuffer* stream)
if (!stream->require(sequenceParameterSetLength)) {
return srs_error_new(ERROR_HLS_DECODE_ERROR, "decode SPS data");
}
if (vcodec->sequenceParameterSetNALUnit.size() > 0) {
stream->skip(sequenceParameterSetLength);
continue;
}
if (sequenceParameterSetLength > 0) {
vcodec->sequenceParameterSetNALUnit.resize(sequenceParameterSetLength);
stream->read_bytes(&vcodec->sequenceParameterSetNALUnit[0], sequenceParameterSetLength);
@ -846,10 +842,6 @@ srs_error_t SrsFormat::avc_demux_sps_pps(SrsBuffer* stream)
if (!stream->require(pictureParameterSetLength)) {
return srs_error_new(ERROR_HLS_DECODE_ERROR, "decode PPS data");
}
if (vcodec->pictureParameterSetNALUnit.size() > 0) {
stream->skip(pictureParameterSetLength);
continue;
}
if (pictureParameterSetLength > 0) {
vcodec->pictureParameterSetNALUnit.resize(pictureParameterSetLength);
stream->read_bytes(&vcodec->pictureParameterSetNALUnit[0], pictureParameterSetLength);