diff --git a/trunk/3rdparty/srs-bench/srs/rtc_test.go b/trunk/3rdparty/srs-bench/srs/rtc_test.go index a387a68ed..c7121e7f3 100644 --- a/trunk/3rdparty/srs-bench/srs/rtc_test.go +++ b/trunk/3rdparty/srs-bench/srs/rtc_test.go @@ -2022,71 +2022,44 @@ func TestRtcPublish_FlvPlay(t *testing.T) { select { case <-ctx.Done(): + return case <-publishReady.Done(): - var url string = "http://127.0.0.1:8080" + *srsStream + "-" + streamSuffix + ".flv" - logger.Tf(ctx, "Run play flv url=%v", url) + } - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - logger.Tf(ctx, "New request for flv %v failed, err=%v", url, err) - return - } + player := NewFLVPlayer() + defer player.Close() - client := http.Client{} - resp, err := client.Do(req) - if err != nil { - logger.Tf(ctx, "Http get flv %v failed, err=%v", url, err) - return - } - - var f flv.Demuxer - if f, err = flv.NewDemuxer(resp.Body); err != nil { - logger.Tf(ctx, "Create flv demuxer for %v failed, err=%v", url, err) - return - } - defer f.Close() - - var hasVideo, hasAudio bool - if _, hasVideo, hasAudio, err = f.ReadHeader(); err != nil { - logger.Tf(ctx, "Flv demuxer read header failed, err=%v", err) - return + r3 = func() error { + flvUrl := fmt.Sprintf("http://%v%v-%v.flv", *srsHttpServer, *srsStream, streamSuffix) + if err := player.Play(ctx, flvUrl); err != nil { + return err } var nnVideo, nnAudio int - var prevVideoTimestamp, prevAudioTimestamp int64 - - for { - var tagType flv.TagType - var tagSize, timestamp uint32 - if tagType, tagSize, timestamp, err = f.ReadTagHeader(); err != nil { - logger.Tf(ctx, "Flv demuxer read tag header failed, err=%v", err) - return - } - - var tag []byte - if tag, err = f.ReadTag(tagSize); err != nil { - logger.Tf(ctx, "Flv demuxer read tag failed, err=%v", err) - return - } - + var hasVideo, hasAudio bool + player.onRecvHeader = func(ha, hv bool) error { + hasAudio, hasVideo = ha, hv + return nil + } + player.onRecvTag = func(tagType flv.TagType, size, timestamp uint32, tag []byte) error { if tagType == flv.TagTypeAudio { nnAudio++ - prevAudioTimestamp = (int64)(timestamp) } else if tagType == flv.TagTypeVideo { nnVideo++ - prevVideoTimestamp = (int64)(timestamp) } + logger.Tf(ctx, "got %v tag, %v %vms %vB", nnVideo+nnAudio, tagType, timestamp, len(tag)) - audioPacketsOK, videoPacketsOK := !hasAudio || nnAudio >= 10, !hasVideo || nnVideo >= 10 - if audioPacketsOK && videoPacketsOK { - avDiff := prevVideoTimestamp - prevAudioTimestamp - logger.Tf(ctx, "Flv recv %v/%v audio, %v/%v video, avDiff=%v", hasAudio, nnAudio, hasVideo, nnVideo, avDiff) + if audioPacketsOK, videoPacketsOK := !hasAudio || nnAudio >= 10, !hasVideo || nnVideo >= 10; audioPacketsOK && videoPacketsOK { + logger.Tf(ctx, "Flv recv %v/%v audio, %v/%v video", hasAudio, nnAudio, hasVideo, nnVideo) cancel() - break } - - _ = tag + return nil } - } + if err := player.Consume(ctx); err != nil { + return err + } + + return nil + }() }() } diff --git a/trunk/3rdparty/srs-bench/srs/rtmp_test.go b/trunk/3rdparty/srs-bench/srs/rtmp_test.go index eedfaaeba..e1fe6864c 100644 --- a/trunk/3rdparty/srs-bench/srs/rtmp_test.go +++ b/trunk/3rdparty/srs-bench/srs/rtmp_test.go @@ -24,6 +24,7 @@ import ( "bytes" "context" "fmt" + "github.com/pkg/errors" "math/rand" "os" "sync" @@ -393,3 +394,246 @@ func TestRtmpPublish_MultipleSequences_RtcPlay(t *testing.T) { t.Errorf("err %+v", err) } } + +func TestRtmpPublish_FlvPlay(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + + var r0, r1 error + err := func() error { + publisher := NewRTMPPublisher() + defer publisher.Close() + + player := NewFLVPlayer() + defer player.Close() + + // Connect to RTMP URL. + streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int()) + rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix) + flvUrl := fmt.Sprintf("http://%v/live/%v.flv", *srsHttpServer, streamSuffix) + + if err := publisher.Publish(ctx, rtmpUrl); err != nil { + return err + } + + if err := player.Play(ctx, flvUrl); err != nil { + return err + } + + // Check packets. + var wg sync.WaitGroup + defer wg.Wait() + + publisherReady, publisherReadyCancel := context.WithCancel(context.Background()) + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(30 * time.Millisecond) // Wait for publisher to push sequence header. + publisherReadyCancel() + }() + + wg.Add(1) + go func() { + defer wg.Done() + <-publisherReady.Done() + + var nnPackets int + player.onRecvHeader = func(hasAudio, hasVideo bool) error { + return nil + } + player.onRecvTag = func(tp flv.TagType, size, ts uint32, tag []byte) error { + logger.Tf(ctx, "got %v tag, %v %vms %vB", nnPackets, tp, ts, len(tag)) + if nnPackets += 1; nnPackets > 50 { + cancel() + } + return nil + } + if r1 = player.Consume(ctx); r1 != nil { + cancel() + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + publisher.onSendPacket = func(m *rtmp.Message) error { + time.Sleep(1 * time.Millisecond) + return nil + } + if r0 = publisher.Ingest(ctx, *srsPublishAvatar); r0 != nil { + cancel() + } + }() + + return nil + }() + if err := filterTestError(ctx.Err(), err, r0, r1); err != nil { + t.Errorf("err %+v", err) + } +} + +func TestRtmpPublish_FlvPlayNoAudio(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + + var r0, r1 error + err := func() error { + publisher := NewRTMPPublisher() + defer publisher.Close() + + // Set publisher to drop audio. + publisher.hasAudio = false + + player := NewFLVPlayer() + defer player.Close() + + // Connect to RTMP URL. + streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int()) + rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix) + flvUrl := fmt.Sprintf("http://%v/live/%v.flv", *srsHttpServer, streamSuffix) + + if err := publisher.Publish(ctx, rtmpUrl); err != nil { + return err + } + + if err := player.Play(ctx, flvUrl); err != nil { + return err + } + + // Check packets. + var wg sync.WaitGroup + defer wg.Wait() + + publisherReady, publisherReadyCancel := context.WithCancel(context.Background()) + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(30 * time.Millisecond) // Wait for publisher to push sequence header. + publisherReadyCancel() + }() + + wg.Add(1) + go func() { + defer wg.Done() + <-publisherReady.Done() + + var nnPackets int + player.onRecvHeader = func(hasAudio, hasVideo bool) error { + return nil + } + player.onRecvTag = func(tp flv.TagType, size, ts uint32, tag []byte) error { + if tp == flv.TagTypeAudio { + return errors.New("should no audio") + } + logger.Tf(ctx, "got %v tag, %v %vms %vB", nnPackets, tp, ts, len(tag)) + if nnPackets += 1; nnPackets > 50 { + cancel() + } + return nil + } + if r1 = player.Consume(ctx); r1 != nil { + cancel() + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + publisher.onSendPacket = func(m *rtmp.Message) error { + time.Sleep(1 * time.Millisecond) + return nil + } + if r0 = publisher.Ingest(ctx, *srsPublishAvatar); r0 != nil { + cancel() + } + }() + + return nil + }() + if err := filterTestError(ctx.Err(), err, r0, r1); err != nil { + t.Errorf("err %+v", err) + } +} + +func TestRtmpPublish_FlvPlayNoVideo(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + + var r0, r1 error + err := func() error { + publisher := NewRTMPPublisher() + defer publisher.Close() + + // Set publisher to drop video. + publisher.hasVideo = false + + player := NewFLVPlayer() + defer player.Close() + + // Connect to RTMP URL. + streamSuffix := fmt.Sprintf("rtmp-regression-%v-%v", os.Getpid(), rand.Int()) + rtmpUrl := fmt.Sprintf("rtmp://%v/live/%v", *srsServer, streamSuffix) + flvUrl := fmt.Sprintf("http://%v/live/%v.flv", *srsHttpServer, streamSuffix) + + if err := publisher.Publish(ctx, rtmpUrl); err != nil { + return err + } + + if err := player.Play(ctx, flvUrl); err != nil { + return err + } + + // Check packets. + var wg sync.WaitGroup + defer wg.Wait() + + publisherReady, publisherReadyCancel := context.WithCancel(context.Background()) + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(30 * time.Millisecond) // Wait for publisher to push sequence header. + publisherReadyCancel() + }() + + wg.Add(1) + go func() { + defer wg.Done() + <-publisherReady.Done() + + var nnPackets int + player.onRecvHeader = func(hasAudio, hasVideo bool) error { + return nil + } + player.onRecvTag = func(tp flv.TagType, size, ts uint32, tag []byte) error { + if tp == flv.TagTypeVideo { + return errors.New("should no video") + } + logger.Tf(ctx, "got %v tag, %v %vms %vB", nnPackets, tp, ts, len(tag)) + if nnPackets += 1; nnPackets > 50 { + cancel() + } + return nil + } + if r1 = player.Consume(ctx); r1 != nil { + cancel() + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + publisher.onSendPacket = func(m *rtmp.Message) error { + time.Sleep(1 * time.Millisecond) + return nil + } + if r0 = publisher.Ingest(ctx, *srsPublishAvatar); r0 != nil { + cancel() + } + }() + + return nil + }() + if err := filterTestError(ctx.Err(), err, r0, r1); err != nil { + t.Errorf("err %+v", err) + } +} diff --git a/trunk/3rdparty/srs-bench/srs/util.go b/trunk/3rdparty/srs-bench/srs/util.go index cf3d5a6e5..e36d41a58 100644 --- a/trunk/3rdparty/srs-bench/srs/util.go +++ b/trunk/3rdparty/srs-bench/srs/util.go @@ -34,6 +34,7 @@ import ( "io" "math/rand" "net" + "net/http" "net/url" "os" "path" @@ -65,6 +66,7 @@ var srsDTLSDropPackets *int var srsSchema string var srsServer *string +var srsHttpServer *string var srsStream *string var srsLiveStream *string var srsPublishAudio *string @@ -75,7 +77,8 @@ var srsVnetClientIP *string func prepareTest() (err error) { srsHttps = flag.Bool("srs-https", false, "Whther connect to HTTPS-API") - srsServer = flag.String("srs-server", "127.0.0.1", "The RTC server to connect to") + srsServer = flag.String("srs-server", "127.0.0.1", "The RTMP/RTC server to connect to") + srsHttpServer = flag.String("srs-http-server", "127.0.0.1:8080", "The HTTP server to connect to") srsStream = flag.String("srs-stream", "/rtc/regression", "The RTC app/stream to play") srsLiveStream = flag.String("srs-live-stream", "/live/livestream", "The LIVE app/stream to play") srsLog = flag.Bool("srs-log", false, "Whether enable the detail log") @@ -1445,6 +1448,10 @@ type RTMPPublisher struct { client *RTMPClient // Whether auto close transport when ingest done. closeTransportWhenIngestDone bool + // Whether drop audio, set the hasAudio to false. + hasAudio bool + // Whether drop video, set the hasVideo to false. + hasVideo bool onSendPacket func(m *rtmp.Message) error } @@ -1456,6 +1463,7 @@ func NewRTMPPublisher() *RTMPPublisher { // By default, set to on. v.closeTransportWhenIngestDone = true + v.hasAudio, v.hasVideo = true, true return v } @@ -1465,6 +1473,7 @@ func (v *RTMPPublisher) Close() error { } func (v *RTMPPublisher) Publish(ctx context.Context, rtmpUrl string) error { + logger.Tf(ctx, "Publish %v", rtmpUrl) return v.client.Publish(ctx, rtmpUrl) } @@ -1483,7 +1492,8 @@ func (v *RTMPPublisher) Ingest(ctx context.Context, flvInput string) error { }() // Consume all packets. - err := v.ingest(flvInput) + logger.Tf(ctx, "Start to ingest %v", flvInput) + err := v.ingest(ctx, flvInput) if err == io.EOF { return nil } @@ -1493,7 +1503,7 @@ func (v *RTMPPublisher) Ingest(ctx context.Context, flvInput string) error { return err } -func (v *RTMPPublisher) ingest(flvInput string) error { +func (v *RTMPPublisher) ingest(ctx context.Context, flvInput string) error { p := v.client fs, err := os.Open(flvInput) @@ -1501,6 +1511,7 @@ func (v *RTMPPublisher) ingest(flvInput string) error { return err } defer fs.Close() + logger.Tf(ctx, "Open input %v", flvInput) demuxer, err := flv.NewDemuxer(fs) if err != nil { @@ -1525,6 +1536,12 @@ func (v *RTMPPublisher) ingest(flvInput string) error { if tagType != flv.TagTypeVideo && tagType != flv.TagTypeAudio { continue } + if !v.hasAudio && tagType == flv.TagTypeAudio { + continue + } + if !v.hasVideo && tagType == flv.TagTypeVideo { + continue + } m := rtmp.NewStreamMessage(p.streamID) m.MessageType = rtmp.MessageType(tagType) @@ -1577,6 +1594,9 @@ func (v *RTMPPlayer) Consume(ctx context.Context) error { var wg sync.WaitGroup defer wg.Wait() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + wg.Add(1) go func() { defer wg.Done() @@ -1618,6 +1638,133 @@ func (v *RTMPPlayer) consume() error { } } +type FLVPlayer struct { + flvUrl string + client *http.Client + resp *http.Response + f flv.Demuxer + + onRecvHeader func(hasAudio, hasVideo bool) error + onRecvTag func(tp flv.TagType, size, ts uint32, tag []byte) error +} + +func NewFLVPlayer() *FLVPlayer { + return &FLVPlayer{ + client: &http.Client{}, resp: nil, f: nil, onRecvHeader: nil, onRecvTag: nil, + } +} + +func (v *FLVPlayer) Close() error { + if v.f != nil { + v.f.Close() + } + if v.resp != nil { + v.resp.Body.Close() + } + return nil +} + +func (v *FLVPlayer) Play(ctx context.Context, flvUrl string) error { + v.flvUrl = flvUrl + return nil +} + +func (v *FLVPlayer) Consume(ctx context.Context) error { + // If ctx is cancelled, close the RTMP transport. + var wg sync.WaitGroup + defer wg.Wait() + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + wg.Add(1) + go func() { + defer wg.Done() + <-ctx.Done() + v.Close() + }() + + // Start to play. + if err := v.play(ctx, v.flvUrl); err != nil { + return err + } + + // Consume all packets. + err := v.consume(ctx) + if err == io.EOF { + return nil + } + if ctx.Err() == context.Canceled { + return nil + } + return err +} + +func (v *FLVPlayer) play(ctx context.Context, flvUrl string) error { + logger.Tf(ctx, "Run play flv url=%v", flvUrl) + + req, err := http.NewRequestWithContext(ctx, "GET", flvUrl, nil) + if err != nil { + return errors.Wrapf(err, "New request for flv %v failed, err=%v", flvUrl, err) + } + + resp, err := v.client.Do(req) + if err != nil { + return errors.Wrapf(err, "Http get flv %v failed, err=%v", flvUrl, err) + } + logger.Tf(ctx, "Connected to %v", flvUrl) + + if v.resp != nil { + v.resp.Body.Close() + } + v.resp = resp + + f, err := flv.NewDemuxer(resp.Body) + if err != nil { + return errors.Wrapf(err, "Create flv demuxer for %v failed, err=%v", flvUrl, err) + } + + if v.f != nil { + v.f.Close() + } + v.f = f + + return nil +} + +func (v *FLVPlayer) consume(ctx context.Context) (err error) { + var hasVideo, hasAudio bool + if _, hasVideo, hasAudio, err = v.f.ReadHeader(); err != nil { + return errors.Wrapf(err, "Flv demuxer read header failed, err=%v", err) + } + logger.Tf(ctx, "Got audio=%v, video=%v", hasAudio, hasVideo) + + if v.onRecvHeader != nil { + if err := v.onRecvHeader(hasAudio, hasVideo); err != nil { + return errors.Wrapf(err, "Callback FLV header audio=%v, video=%v", hasAudio, hasVideo) + } + } + + for { + var tagType flv.TagType + var tagSize, timestamp uint32 + if tagType, tagSize, timestamp, err = v.f.ReadTagHeader(); err != nil { + return errors.Wrapf(err, "Flv demuxer read tag header failed, err=%v", err) + } + + var tag []byte + if tag, err = v.f.ReadTag(tagSize); err != nil { + return errors.Wrapf(err, "Flv demuxer read tag failed, err=%v", err) + } + + if v.onRecvTag != nil { + if err := v.onRecvTag(tagType, tagSize, timestamp, tag); err != nil { + return errors.Wrapf(err, "Callback tag type=%v, size=%v, ts=%v, tag=%vB", tagType, tagSize, timestamp, len(tag)) + } + } + } +} + func IsAvccrEquals(a, b *avc.AVCDecoderConfigurationRecord) bool { if a == nil || b == nil { return false diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 96f411209..7f620fa96 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -1453,9 +1453,33 @@ vhost http.remux.srs.com { # Whether drop packet if not match header. For example, there is has_audio and has video flag in FLV header, if # this is set to on and has_audio is false, then SRS will drop audio packets when got audio packets. Generally # it should work, but sometimes you might need SRS to keep packets even when FLV header is set to false. + # See https://github.com/ossrs/srs/issues/939#issuecomment-1348740526 + # TODO: Only support HTTP-FLV stream right now. # Overwrite by env SRS_VHOST_HTTP_REMUX_DROP_IF_NOT_MATCH for all vhosts. - # default: on + # Default: on drop_if_not_match on; + # Whether stream has audio track, used as default value for stream metadata, for example, FLV header contains + # this flag. Sometimes you might want to force the metadata by disable guess_has_av. + # See https://github.com/ossrs/srs/issues/939#issuecomment-1351385460 + # TODO: Only support HTTP-FLV stream right now. + # Overwrite by env SRS_VHOST_HTTP_REMUX_HAS_AUDIO for all vhosts. + # Default: on + has_audio on; + # Whether stream has video track, used as default value for stream metadata, for example, FLV header contains + # this flag. Sometimes you might want to force the metadata by disable guess_has_av. + # See https://github.com/ossrs/srs/issues/939#issuecomment-1351385460 + # TODO: Only support HTTP-FLV stream right now. + # Overwrite by env SRS_VHOST_HTTP_REMUX_HAS_VIDEO for all vhosts. + # Default: on + has_video on; + # Whether guessing stream about audio or video track, used to generate the flags in, such as FLV header. If + # guessing, depends on sequence header and frames in gop cache, so it might be incorrect especially your stream + # is not regular. If not guessing, use the configured default value has_audio and has_video. + # See https://github.com/ossrs/srs/issues/939#issuecomment-1351385460 + # TODO: Only support HTTP-FLV stream right now. + # Overwrite by env SRS_VHOST_HTTP_REMUX_GUESS_HAS_AV for all vhosts. + # Default: on + guess_has_av on; # the stream mount for rtmp to remux to live streaming. # typical mount to [vhost]/[app]/[stream].flv # the variables: diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 72e1a1a56..7326a9261 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -8,6 +8,7 @@ The changelog for SRS. ## SRS 5.0 Changelog +* v5.0, 2022-12-14, For [#939](https://github.com/ossrs/srs/issues/939): FLV: Support set default has_av and disable guessing. v5.0.110 * v5.0, 2022-12-13, For [#939](https://github.com/ossrs/srs/issues/939): FLV: Drop packet if header flag is not matched. v5.0.109 * v5.0, 2022-12-13, For [#939](https://github.com/ossrs/srs/issues/939): FLV: Reset has_audio or has_video if only sequence header. * v5.0, 2022-12-12, Merge [#3301](https://github.com/ossrs/srs/pull/3301): DASH: Fix dash crash bug when writing file. v5.0.108 diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 461119b3c..539b0ee64 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2600,7 +2600,8 @@ srs_error_t SrsConfig::check_normal_config() } else if (n == "http_remux") { for (int j = 0; j < (int)conf->directives.size(); j++) { string m = conf->at(j)->name; - if (m != "enabled" && m != "mount" && m != "fast_cache" && m != "drop_if_not_match") { + if (m != "enabled" && m != "mount" && m != "fast_cache" && m != "drop_if_not_match" + && m != "has_audio" && m != "has_video" && m != "guess_has_av") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.http_remux.%s of %s", m.c_str(), vhost->arg0().c_str()); } } @@ -8265,6 +8266,78 @@ bool SrsConfig::get_vhost_http_remux_drop_if_not_match(string vhost) return SRS_CONF_PERFER_TRUE(conf->arg0()); } +bool SrsConfig::get_vhost_http_remux_has_audio(string vhost) +{ + SRS_OVERWRITE_BY_ENV_BOOL2("srs.vhost.http_remux.has_audio"); // SRS_VHOST_HTTP_REMUX_HAS_AUDIO + + static bool DEFAULT = true; + + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("http_remux"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("has_audio"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_TRUE(conf->arg0()); +} + +bool SrsConfig::get_vhost_http_remux_has_video(string vhost) +{ + SRS_OVERWRITE_BY_ENV_BOOL2("srs.vhost.http_remux.has_video"); // SRS_VHOST_HTTP_REMUX_HAS_VIDEO + + static bool DEFAULT = true; + + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("http_remux"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("has_video"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_TRUE(conf->arg0()); +} + +bool SrsConfig::get_vhost_http_remux_guess_has_av(string vhost) +{ + SRS_OVERWRITE_BY_ENV_BOOL2("srs.vhost.http_remux.guess_has_av"); // SRS_VHOST_HTTP_REMUX_GUESS_HAS_AV + + static bool DEFAULT = true; + + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("http_remux"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("guess_has_av"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_TRUE(conf->arg0()); +} + string SrsConfig::get_vhost_http_remux_mount(string vhost) { SRS_OVERWRITE_BY_ENV_STRING("srs.vhost.http_remux.mount"); // SRS_VHOST_HTTP_REMUX_MOUNT diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 7597069cb..5fb939af8 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -1066,6 +1066,12 @@ public: virtual srs_utime_t get_vhost_http_remux_fast_cache(std::string vhost); // Whether drop packet if not match header. bool get_vhost_http_remux_drop_if_not_match(std::string vhost); + // Whether stream has audio track. + bool get_vhost_http_remux_has_audio(std::string vhost); + // Whether stream has video track. + bool get_vhost_http_remux_has_video(std::string vhost); + // Whether guessing stream about audio or video track + bool get_vhost_http_remux_guess_has_av(std::string vhost); // Get the http flv live stream mount point for vhost. // used to generate the flv stream mount path. virtual std::string get_vhost_http_remux_mount(std::string vhost); diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index b04624f9c..1c920ae67 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -238,6 +238,9 @@ SrsFlvStreamEncoder::SrsFlvStreamEncoder() { header_written = false; enc = new SrsFlvTransmuxer(); + has_audio_ = true; + has_video_ = true; + guess_has_av_ = true; } SrsFlvStreamEncoder::~SrsFlvStreamEncoder() @@ -260,7 +263,7 @@ srs_error_t SrsFlvStreamEncoder::write_audio(int64_t timestamp, char* data, int { srs_error_t err = srs_success; - if ((err = write_header()) != srs_success) { + if ((err = write_header(has_video_, has_audio_)) != srs_success) { return srs_error_wrap(err, "write header"); } @@ -271,7 +274,7 @@ srs_error_t SrsFlvStreamEncoder::write_video(int64_t timestamp, char* data, int { srs_error_t err = srs_success; - if ((err = write_header()) != srs_success) { + if ((err = write_header(has_video_, has_audio_)) != srs_success) { return srs_error_wrap(err, "write header"); } @@ -282,7 +285,7 @@ srs_error_t SrsFlvStreamEncoder::write_metadata(int64_t timestamp, char* data, i { srs_error_t err = srs_success; - if ((err = write_header()) != srs_success) { + if ((err = write_header(has_video_, has_audio_)) != srs_success) { return srs_error_wrap(err, "write header"); } @@ -294,6 +297,21 @@ void SrsFlvStreamEncoder::set_drop_if_not_match(bool v) enc->set_drop_if_not_match(v); } +void SrsFlvStreamEncoder::set_has_audio(bool v) +{ + has_audio_ = v; +} + +void SrsFlvStreamEncoder::set_has_video(bool v) +{ + has_video_ = v; +} + +void SrsFlvStreamEncoder::set_guess_has_av(bool v) +{ + guess_has_av_ = v; +} + bool SrsFlvStreamEncoder::has_cache() { // for flv stream, use gop cache of SrsLiveSource is ok. @@ -310,31 +328,39 @@ srs_error_t SrsFlvStreamEncoder::write_tags(SrsSharedPtrMessage** msgs, int coun { srs_error_t err = srs_success; + // Ignore if no messages. + if (count <= 0) return err; + // For https://github.com/ossrs/srs/issues/939 if (!header_written) { - bool has_video = false; bool has_audio = false; - int nn_video_frames = 0; int nn_audio_frames = 0; + bool has_video = has_audio_; bool has_audio = has_video_; - // Note that we must iterate all messages to count the audio and video frames. - for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs[i]; - if (msg->is_video()) { - if (!SrsFlvVideo::sh(msg->payload, msg->size)) nn_video_frames++; - has_video = true; - } else if (msg->is_audio()) { - if (!SrsFlvAudio::sh(msg->payload, msg->size)) nn_audio_frames++; - has_audio = true; + // See https://github.com/ossrs/srs/issues/939#issuecomment-1351385460 + if (guess_has_av_) { + int nn_video_frames = 0; int nn_audio_frames = 0; + has_audio = has_video = false; + + // Note that we must iterate all messages to count the audio and video frames. + for (int i = 0; i < count; i++) { + SrsSharedPtrMessage* msg = msgs[i]; + if (msg->is_video()) { + if (!SrsFlvVideo::sh(msg->payload, msg->size)) nn_video_frames++; + has_video = true; + } else if (msg->is_audio()) { + if (!SrsFlvAudio::sh(msg->payload, msg->size)) nn_audio_frames++; + has_audio = true; + } } - } - // See https://github.com/ossrs/srs/issues/939#issuecomment-1348541733 - if (nn_video_frames > 0 && nn_audio_frames == 0) { - if (has_audio) srs_trace("FLV: Reset has_audio for videos=%d and audios=%d", nn_video_frames, nn_audio_frames); - has_audio = false; - } - if (nn_audio_frames > 0 && nn_video_frames == 0) { - if (has_video) srs_trace("FLV: Reset has_video for videos=%d and audios=%d", nn_video_frames, nn_audio_frames); - has_video = false; + // See https://github.com/ossrs/srs/issues/939#issuecomment-1348541733 + if (nn_video_frames > 0 && nn_audio_frames == 0) { + if (has_audio) srs_trace("FLV: Reset has_audio for videos=%d and audios=%d", nn_video_frames, nn_audio_frames); + has_audio = false; + } + if (nn_audio_frames > 0 && nn_video_frames == 0) { + if (has_video) srs_trace("FLV: Reset has_video for videos=%d and audios=%d", nn_video_frames, nn_audio_frames); + has_video = false; + } } // Drop data if no A+V. @@ -347,6 +373,7 @@ srs_error_t SrsFlvStreamEncoder::write_tags(SrsSharedPtrMessage** msgs, int coun } } + // Write tags after header is done. return enc->write_tags(msgs, count); } @@ -361,7 +388,8 @@ srs_error_t SrsFlvStreamEncoder::write_header(bool has_video, bool has_audio) return srs_error_wrap(err, "write header"); } - srs_trace("FLV: write header audio=%d, video=%d, dinm=%d", has_audio, has_video, enc->drop_if_not_match()); + srs_trace("FLV: write header audio=%d, video=%d, dinm=%d, config=%d/%d/%d", has_audio, has_video, + enc->drop_if_not_match(), has_audio_, has_video_, guess_has_av_); } return err; @@ -584,12 +612,18 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess srs_assert(entry); bool drop_if_not_match = _srs_config->get_vhost_http_remux_drop_if_not_match(req->vhost); + bool has_audio = _srs_config->get_vhost_http_remux_has_audio(req->vhost); + bool has_video = _srs_config->get_vhost_http_remux_has_video(req->vhost); + bool guess_has_av = _srs_config->get_vhost_http_remux_guess_has_av(req->vhost); if (srs_string_ends_with(entry->pattern, ".flv")) { w->header()->set_content_type("video/x-flv"); enc_desc = "FLV"; enc = new SrsFlvStreamEncoder(); ((SrsFlvStreamEncoder*)enc)->set_drop_if_not_match(drop_if_not_match); + ((SrsFlvStreamEncoder*)enc)->set_has_audio(has_audio); + ((SrsFlvStreamEncoder*)enc)->set_has_video(has_video); + ((SrsFlvStreamEncoder*)enc)->set_guess_has_av(guess_has_av); } else if (srs_string_ends_with(entry->pattern, ".aac")) { w->header()->set_content_type("audio/x-aac"); enc_desc = "AAC"; @@ -659,8 +693,9 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess } srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost); - srs_trace("FLV %s, encoder=%s, mw_sleep=%dms, cache=%d, msgs=%d, dinm=%d", entry->pattern.c_str(), enc_desc.c_str(), - srsu2msi(mw_sleep), enc->has_cache(), msgs.max, drop_if_not_match); + srs_trace("FLV %s, encoder=%s, mw_sleep=%dms, cache=%d, msgs=%d, dinm=%d, guess_av=%d/%d/%d", + entry->pattern.c_str(), enc_desc.c_str(), srsu2msi(mw_sleep), enc->has_cache(), msgs.max, drop_if_not_match, + has_audio, has_video, guess_has_av); // TODO: free and erase the disabled entry after all related connections is closed. // TODO: FXIME: Support timeout for player, quit infinite-loop. diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index e24004ad7..d01bbe3b1 100755 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -68,6 +68,9 @@ class SrsFlvStreamEncoder : public ISrsBufferEncoder private: SrsFlvTransmuxer* enc; bool header_written; + bool has_audio_; + bool has_video_; + bool guess_has_av_; public: SrsFlvStreamEncoder(); virtual ~SrsFlvStreamEncoder(); @@ -78,13 +81,17 @@ public: virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size); public: void set_drop_if_not_match(bool v); + void set_has_audio(bool v); + void set_has_video(bool v); + void set_guess_has_av(bool v); +public: virtual bool has_cache(); virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter); public: // Write the tags in a time. virtual srs_error_t write_tags(SrsSharedPtrMessage** msgs, int count); private: - virtual srs_error_t write_header(bool has_video = true, bool has_audio = true); + virtual srs_error_t write_header(bool has_video, bool has_audio); }; // Transmux RTMP to HTTP TS Streaming. diff --git a/trunk/src/core/srs_core_version5.hpp b/trunk/src/core/srs_core_version5.hpp index 1b01763af..8897dd66c 100644 --- a/trunk/src/core/srs_core_version5.hpp +++ b/trunk/src/core/srs_core_version5.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 5 #define VERSION_MINOR 0 -#define VERSION_REVISION 109 +#define VERSION_REVISION 110 #endif diff --git a/trunk/src/utest/srs_utest_config.cpp b/trunk/src/utest/srs_utest_config.cpp index 8afbc934d..35219d3da 100644 --- a/trunk/src/utest/srs_utest_config.cpp +++ b/trunk/src/utest/srs_utest_config.cpp @@ -4688,6 +4688,36 @@ VOID TEST(ConfigEnvTest, CheckEnvValuesHttpRemux) SrsSetEnvConfig(drop_if_not_match2, "SRS_VHOST_HTTP_REMUX_DROP_IF_NOT_MATCH", "on"); EXPECT_TRUE(conf.get_vhost_http_remux_drop_if_not_match("__defaultVhost__")); } + + if (true) { + EXPECT_TRUE(conf.get_vhost_http_remux_has_audio("__defaultVhost__")); + + SrsSetEnvConfig(has_audio, "SRS_VHOST_HTTP_REMUX_HAS_AUDIO", "off"); + EXPECT_FALSE(conf.get_vhost_http_remux_has_audio("__defaultVhost__")); + + SrsSetEnvConfig(has_audio2, "SRS_VHOST_HTTP_REMUX_HAS_AUDIO", "on"); + EXPECT_TRUE(conf.get_vhost_http_remux_has_audio("__defaultVhost__")); + } + + if (true) { + EXPECT_TRUE(conf.get_vhost_http_remux_has_video("__defaultVhost__")); + + SrsSetEnvConfig(has_video, "SRS_VHOST_HTTP_REMUX_HAS_VIDEO", "off"); + EXPECT_FALSE(conf.get_vhost_http_remux_has_video("__defaultVhost__")); + + SrsSetEnvConfig(has_video2, "SRS_VHOST_HTTP_REMUX_HAS_VIDEO", "on"); + EXPECT_TRUE(conf.get_vhost_http_remux_has_video("__defaultVhost__")); + } + + if (true) { + EXPECT_TRUE(conf.get_vhost_http_remux_guess_has_av("__defaultVhost__")); + + SrsSetEnvConfig(guess_has_av, "SRS_VHOST_HTTP_REMUX_GUESS_HAS_AV", "off"); + EXPECT_FALSE(conf.get_vhost_http_remux_guess_has_av("__defaultVhost__")); + + SrsSetEnvConfig(guess_has_av2, "SRS_VHOST_HTTP_REMUX_GUESS_HAS_AV", "on"); + EXPECT_TRUE(conf.get_vhost_http_remux_guess_has_av("__defaultVhost__")); + } } VOID TEST(ConfigEnvTest, CheckEnvValuesDash)