From 301a8ceadbe528ecce1cf6b7c6598bedb602b96b Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 24 Jul 2021 21:10:35 +0800 Subject: [PATCH] Squash: Merge SRS 4.0 --- .run/srs-ingest.run.xml | 7 + CHANGELOG.md | 1 + trunk/3rdparty/srs-bench/srs/api.go | 154 +++++++++ trunk/3rdparty/srs-bench/srs/rtc_test.go | 423 +++++++++++++++++++++-- trunk/3rdparty/srs-bench/srs/util.go | 214 ++++++++---- trunk/conf/clion-ingest.conf | 53 +++ trunk/src/app/srs_app_rtc_conn.cpp | 2 +- trunk/src/core/srs_core_version4.hpp | 2 +- 8 files changed, 756 insertions(+), 100 deletions(-) create mode 100644 .run/srs-ingest.run.xml create mode 100644 trunk/3rdparty/srs-bench/srs/api.go create mode 100644 trunk/conf/clion-ingest.conf diff --git a/.run/srs-ingest.run.xml b/.run/srs-ingest.run.xml new file mode 100644 index 000000000..1ef1dba30 --- /dev/null +++ b/.run/srs-ingest.run.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index c5cf73851..cc794932d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The changelog for SRS. ## SRS 4.0 Changelog +* v4.0, 2021-07-24, Merge [#2373](https://github.com/ossrs/srs/pull/2373), RTC: Fix NACK negotiation bug for Firefox. 4.0.145 * v4.0, 2021-07-24, Merge [#2483](https://github.com/ossrs/srs/pull/2483), RTC: Support statistic for HTTP-API, HTTP-Callback and Security. 4.0.144 * v4.0, 2021-07-21, Merge [#2474](https://github.com/ossrs/srs/pull/2474), SRT: Use thread-safe log for multiple-threading SRT module. 4.0.143 * v4.0, 2021-07-17, Fix bugs and enhance code. 4.0.142 diff --git a/trunk/3rdparty/srs-bench/srs/api.go b/trunk/3rdparty/srs-bench/srs/api.go new file mode 100644 index 000000000..db24bfd32 --- /dev/null +++ b/trunk/3rdparty/srs-bench/srs/api.go @@ -0,0 +1,154 @@ +// The MIT License (MIT) +// +// Copyright (c) 2021 Winlin +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +package srs + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + + "github.com/ossrs/go-oryx-lib/errors" + "github.com/ossrs/go-oryx-lib/logger" +) + +// Request SRS API and got response, both in JSON. +// The r is HTTP API to request, like "http://localhost:1985/rtc/v1/play". +// The req is the HTTP request body, will be marshal to JSON object. nil is no body +// The res is the HTTP response body, already unmarshal to JSON object. +func apiRequest(ctx context.Context, r string, req interface{}, res interface{}) error { + var b []byte + if req != nil { + if b0, err := json.Marshal(req); err != nil { + return errors.Wrapf(err, "Marshal body %v", req) + } else { + b = b0 + } + } + logger.If(ctx, "Request url api=%v with %v", r, string(b)) + logger.Tf(ctx, "Request url api=%v with %v bytes", r, len(b)) + + method := "POST" + if req == nil { + method = "GET" + } + reqObj, err := http.NewRequest(method, r, strings.NewReader(string(b))) + if err != nil { + return errors.Wrapf(err, "HTTP request %v", string(b)) + } + + resObj, err := http.DefaultClient.Do(reqObj.WithContext(ctx)) + if err != nil { + return errors.Wrapf(err, "Do HTTP request %v", string(b)) + } + + b2, err := ioutil.ReadAll(resObj.Body) + if err != nil { + return errors.Wrapf(err, "Read response for %v", string(b)) + } + logger.If(ctx, "Response from %v is %v", r, string(b2)) + logger.Tf(ctx, "Response from %v is %v bytes", r, len(b2)) + + errorCode := struct { + Code int `json:"code"` + }{} + if err := json.Unmarshal(b2, &errorCode); err != nil { + return errors.Wrapf(err, "Unmarshal %v", string(b2)) + } + if errorCode.Code != 0 { + return errors.Errorf("Server fail code=%v %v", errorCode.Code, string(b2)) + } + + if err := json.Unmarshal(b2, res); err != nil { + return errors.Wrapf(err, "Unmarshal %v", string(b2)) + } + logger.Tf(ctx, "Parse response to code=%v ok, %v", errorCode.Code, res) + + return nil +} + +// The SRS HTTP statistic API. +type statAPI struct { + ctx context.Context + streams []*statStream + stream *statStream +} + +func newStatAPI(ctx context.Context) *statAPI { + return &statAPI{ctx: ctx} +} + +type statGeneral struct { + Code int `json:"code"` + Server string `json:"server"` +} + +type statPublishInStream struct { + Cid string `json:"cid"` + Active bool `json:"active"` +} + +func (v statPublishInStream) String() string { + return fmt.Sprintf("id=%v, active=%v", v.Cid, v.Active) +} + +type statStream struct { + ID string `json:"id"` + Vhost string `json:"vhost"` + App string `json:"app"` + Name string `json:"name"` + Clients int `json:"clients"` + Publish statPublishInStream `json:"publish"` +} + +func (v statStream) String() string { + return fmt.Sprintf("id=%v, name=%v, pub=%v", v.ID, v.Name, v.Publish) +} + +// Output to v.streams +func (v *statAPI) Streams() *statAPI { + res := struct { + statGeneral + Streams []*statStream `json:"streams"` + }{} + + ctx := v.ctx + if err := apiRequest(ctx, "http://localhost:1985/api/v1/streams/", nil, &res); err != nil { + logger.Tf(ctx, "query streams err %+v", err) + return v + } + + v.streams = res.Streams + return v +} + +// Output to v.stream +func (v *statAPI) FilterByStreamSuffix(suffix string) *statAPI { + for _, stream := range v.streams { + if strings.HasSuffix(stream.Name, suffix) { + v.stream = stream + break + } + } + return v +} diff --git a/trunk/3rdparty/srs-bench/srs/rtc_test.go b/trunk/3rdparty/srs-bench/srs/rtc_test.go index 686e54819..4d2360120 100644 --- a/trunk/3rdparty/srs-bench/srs/rtc_test.go +++ b/trunk/3rdparty/srs-bench/srs/rtc_test.go @@ -25,11 +25,13 @@ import ( "encoding/json" "fmt" "github.com/pion/transport/vnet" + "github.com/pion/webrtc/v3" "io" "io/ioutil" "math/rand" "net/http" "os" + "strings" "sync" "testing" "time" @@ -58,11 +60,384 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } +// Test for https://github.com/ossrs/srs/pull/2483 +func TestPR2483_RtcStatApi_PublisherOnly(t *testing.T) { + if err := filterTestError(func() error { + streamSuffix := fmt.Sprintf("publish-only-%v-%v", os.Getpid(), rand.Int()) + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { + p.streamSuffix = streamSuffix + return nil + }) + if err != nil { + return err + } + defer p.Close() + + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) { + var once sync.Once + api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + once.Do(func() { + stat := newStatAPI(ctx).Streams().FilterByStreamSuffix(p.streamSuffix) + logger.Tf(ctx, "Check publishing, streams=%v, stream=%v", len(stat.streams), stat.stream) + if stat.stream != nil { + cancel() // done + } + }) + return i.nextRTCPReader.Read(buf, attributes) + } + })) + }); err != nil { + return err + } + + return p.Run(ctx, cancel) + }()); err != nil { + t.Errorf("err %+v", err) + } +} + +// Veirfy https://github.com/ossrs/srs/issues/2371 +func TestBugfix2371_PublishWithNack(t *testing.T) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := filterTestError(func() error { + streamSuffix := fmt.Sprintf("bugfix-2371-%v-%v", os.Getpid(), rand.Int()) + p, err := newTestPublisher(registerMiniCodecs, func(p *testPublisher) error { + p.streamSuffix = streamSuffix + p.onOffer = func(s *webrtc.SessionDescription) error { + if n := strings.Count(s.SDP, "nack"); n != 2 { + return errors.Errorf("invalid %v nack", n) + } + return nil + } + p.onAnswer = func(s *webrtc.SessionDescription) error { + if n := strings.Count(s.SDP, "nack"); n != 2 { + return errors.Errorf("invalid %v nack", n) + } + cancel() + return nil + } + return nil + }) + if err != nil { + return err + } + defer p.Close() + + if err := p.Setup(*srsVnetClientIP); err != nil { + return err + } + + return p.Run(ctx, cancel) + }()); err != nil { + t.Errorf("err %+v", err) + } +} + +// Veirfy https://github.com/ossrs/srs/issues/2371 +func TestBugfix2371_PublishWithoutNack(t *testing.T) { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := filterTestError(func() error { + streamSuffix := fmt.Sprintf("bugfix-2371-%v-%v", os.Getpid(), rand.Int()) + p, err := newTestPublisher(registerMiniCodecsWithoutNack, func(p *testPublisher) error { + p.streamSuffix = streamSuffix + p.onOffer = func(s *webrtc.SessionDescription) error { + if n := strings.Count(s.SDP, "nack"); n != 0 { + return errors.Errorf("invalid %v nack", n) + } + return nil + } + p.onAnswer = func(s *webrtc.SessionDescription) error { + if n := strings.Count(s.SDP, "nack"); n != 0 { + return errors.Errorf("invalid %v nack", n) + } + cancel() + return nil + } + return nil + }) + if err != nil { + return err + } + defer p.Close() + + if err := p.Setup(*srsVnetClientIP); err != nil { + return err + } + + return p.Run(ctx, cancel) + }()); err != nil { + t.Errorf("err %+v", err) + } +} + +// Veirfy https://github.com/ossrs/srs/issues/2371 +func TestBugfix2371_PlayWithNack(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + + var r0, r1, r2, r3 error + defer func(ctx context.Context) { + if err := filterTestError(ctx.Err(), r0, r1, r2, r3); err != nil { + t.Errorf("Fail for err %+v", err) + } else { + logger.Tf(ctx, "test done with err %+v", err) + } + }(ctx) + + var resources []io.Closer + defer func() { + for _, resource := range resources { + _ = resource.Close() + } + }() + + var wg sync.WaitGroup + defer wg.Wait() + + // The event notify. + var thePublisher *testPublisher + var thePlayer *testPlayer + + mainReady, mainReadyCancel := context.WithCancel(context.Background()) + publishReady, publishReadyCancel := context.WithCancel(context.Background()) + + // Objects init. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + doInit := func() (err error) { + streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int()) + + // Initialize player with private api. + if thePlayer, err = newTestPlayer(registerMiniCodecs, func(play *testPlayer) error { + play.streamSuffix = streamSuffix + play.onOffer = func(s *webrtc.SessionDescription) error { + if n := strings.Count(s.SDP, "nack"); n != 2 { + return errors.Errorf("invalid %v nack", n) + } + return nil + } + play.onAnswer = func(s *webrtc.SessionDescription) error { + if n := strings.Count(s.SDP, "nack"); n != 2 { + return errors.Errorf("invalid %v nack", n) + } + cancel() + return nil + } + resources = append(resources, play) + return play.Setup(*srsVnetClientIP) + }); err != nil { + return err + } + + // Initialize publisher with private api. + if thePublisher, err = newTestPublisher(registerMiniCodecs, func(pub *testPublisher) error { + pub.streamSuffix = streamSuffix + pub.iceReadyCancel = publishReadyCancel + resources = append(resources, pub) + return pub.Setup(*srsVnetClientIP) + }); err != nil { + return err + } + + // Init done. + mainReadyCancel() + + <-ctx.Done() + return nil + } + + if err := doInit(); err != nil { + r1 = err + } + }() + + // Run publisher. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + select { + case <-ctx.Done(): + case <-mainReady.Done(): + r2 = thePublisher.Run(logger.WithContext(ctx), cancel) + logger.Tf(ctx, "pub done") + } + }() + + // Run player. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + select { + case <-ctx.Done(): + case <-publishReady.Done(): + r3 = thePlayer.Run(logger.WithContext(ctx), cancel) + logger.Tf(ctx, "play done") + } + }() +} + +// Veirfy https://github.com/ossrs/srs/issues/2371 +func TestBugfix2371_PlayWithoutNack(t *testing.T) { + ctx := logger.WithContext(context.Background()) + ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond) + + var r0, r1, r2, r3 error + defer func(ctx context.Context) { + if err := filterTestError(ctx.Err(), r0, r1, r2, r3); err != nil { + t.Errorf("Fail for err %+v", err) + } else { + logger.Tf(ctx, "test done with err %+v", err) + } + }(ctx) + + var resources []io.Closer + defer func() { + for _, resource := range resources { + _ = resource.Close() + } + }() + + var wg sync.WaitGroup + defer wg.Wait() + + // The event notify. + var thePublisher *testPublisher + var thePlayer *testPlayer + + mainReady, mainReadyCancel := context.WithCancel(context.Background()) + publishReady, publishReadyCancel := context.WithCancel(context.Background()) + + // Objects init. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + doInit := func() (err error) { + streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int()) + + // Initialize player with private api. + if thePlayer, err = newTestPlayer(registerMiniCodecsWithoutNack, func(play *testPlayer) error { + play.streamSuffix = streamSuffix + play.onOffer = func(s *webrtc.SessionDescription) error { + if n := strings.Count(s.SDP, "nack"); n != 0 { + return errors.Errorf("invalid %v nack", n) + } + return nil + } + play.onAnswer = func(s *webrtc.SessionDescription) error { + if n := strings.Count(s.SDP, "nack"); n != 0 { + return errors.Errorf("invalid %v nack", n) + } + cancel() + return nil + } + resources = append(resources, play) + return play.Setup(*srsVnetClientIP) + }); err != nil { + return err + } + + // Initialize publisher with private api. + if thePublisher, err = newTestPublisher(registerMiniCodecs, func(pub *testPublisher) error { + pub.streamSuffix = streamSuffix + pub.iceReadyCancel = publishReadyCancel + resources = append(resources, pub) + return pub.Setup(*srsVnetClientIP) + }); err != nil { + return err + } + + // Init done. + mainReadyCancel() + + <-ctx.Done() + return nil + } + + if err := doInit(); err != nil { + r1 = err + } + }() + + // Run publisher. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + select { + case <-ctx.Done(): + case <-mainReady.Done(): + r2 = thePublisher.Run(logger.WithContext(ctx), cancel) + logger.Tf(ctx, "pub done") + } + }() + + // Run player. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + select { + case <-ctx.Done(): + case <-publishReady.Done(): + r3 = thePlayer.Run(logger.WithContext(ctx), cancel) + logger.Tf(ctx, "play done") + } + }() +} + +// Veirfy https://github.com/ossrs/srs/issues/2371 +func TestBugfix2371_RTMP2RTC_PlayWithNack(t *testing.T) { + if err := filterTestError(func() error { + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + p, err := newTestPlayer(registerMiniCodecs, func(play *testPlayer) error { + play.onOffer = func(s *webrtc.SessionDescription) error { + if n := strings.Count(s.SDP, "nack"); n != 2 { + return errors.Errorf("invalid %v nack", n) + } + return nil + } + play.onAnswer = func(s *webrtc.SessionDescription) error { + if n := strings.Count(s.SDP, "nack"); n != 2 { + return errors.Errorf("invalid %v nack", n) + } + cancel() + return nil + } + return nil + }) + if err != nil { + return err + } + defer p.Close() + + if err := p.Setup(*srsVnetClientIP); err != nil { + return err + } + + return p.Run(ctx, cancel) + }()); err != nil { + t.Errorf("err %+v", err) + } +} + // Basic use scenario, publish a stream. func TestRtcBasic_PublishOnly(t *testing.T) { if err := filterTestError(func() error { streamSuffix := fmt.Sprintf("publish-only-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix return nil }) @@ -149,7 +524,7 @@ func TestRtcBasic_PublishPlay(t *testing.T) { streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int()) // Initialize player with private api. - if thePlayer, err = newTestPlayer(createApiForPlayer, func(play *testPlayer) error { + if thePlayer, err = newTestPlayer(registerDefaultCodecs, func(play *testPlayer) error { play.streamSuffix = streamSuffix resources = append(resources, play) @@ -183,7 +558,7 @@ func TestRtcBasic_PublishPlay(t *testing.T) { } // Initialize publisher with private api. - if thePublisher, err = newTestPublisher(createApiForPublisher, func(pub *testPublisher) error { + if thePublisher, err = newTestPublisher(registerDefaultCodecs, func(pub *testPublisher) error { pub.streamSuffix = streamSuffix pub.iceReadyCancel = publishReadyCancel resources = append(resources, pub) @@ -304,7 +679,7 @@ func TestRtcBasic_Republish(t *testing.T) { streamSuffix := fmt.Sprintf("basic-publish-play-%v-%v", os.Getpid(), rand.Int()) // Initialize player with private api. - if thePlayer, err = newTestPlayer(createApiForPlayer, func(play *testPlayer) error { + if thePlayer, err = newTestPlayer(registerDefaultCodecs, func(play *testPlayer) error { play.streamSuffix = streamSuffix resources = append(resources, play) @@ -330,7 +705,7 @@ func TestRtcBasic_Republish(t *testing.T) { } // Initialize publisher with private api. - if thePublisher, err = newTestPublisher(createApiForPublisher, func(pub *testPublisher) error { + if thePublisher, err = newTestPublisher(registerDefaultCodecs, func(pub *testPublisher) error { pub.streamSuffix = streamSuffix pub.iceReadyCancel = publishReadyCancel resources = append(resources, pub) @@ -353,7 +728,7 @@ func TestRtcBasic_Republish(t *testing.T) { } // Initialize re-publisher with private api. - if theRepublisher, err = newTestPublisher(createApiForPublisher, func(pub *testPublisher) error { + if theRepublisher, err = newTestPublisher(registerDefaultCodecs, func(pub *testPublisher) error { pub.streamSuffix = streamSuffix pub.iceReadyCancel = republishReadyCancel resources = append(resources, pub) @@ -419,7 +794,7 @@ func TestRtcBasic_Republish(t *testing.T) { func TestRtcDTLS_ClientActive_Default(t *testing.T) { if err := filterTestError(func() error { streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -474,7 +849,7 @@ func TestRtcDTLS_ClientActive_Default(t *testing.T) { func TestRtcDTLS_ClientPassive_Default(t *testing.T) { if err := filterTestError(func() error { streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -526,7 +901,7 @@ func TestRtcDTLS_ClientPassive_Default(t *testing.T) { func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) { if err := filterTestError(func() error { streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -585,7 +960,7 @@ func TestRtcDTLS_ClientActive_Duplicated_Alert(t *testing.T) { func TestRtcDTLS_ClientPassive_Duplicated_Alert(t *testing.T) { if err := filterTestError(func() error { streamSuffix := fmt.Sprintf("dtls-active-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -651,7 +1026,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ClientHello(t *testing.T var r0 error err := func() error { streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -729,7 +1104,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ClientHello(t *testing. var r0 error err := func() error { streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -806,7 +1181,7 @@ func TestRtcDTLS_ClientActive_ARQ_ClientHello_ByDropped_ServerHello(t *testing.T var r0, r1 error err := func() error { streamSuffix := fmt.Sprintf("dtls-active-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -894,7 +1269,7 @@ func TestRtcDTLS_ClientPassive_ARQ_ClientHello_ByDropped_ServerHello(t *testing. var r0, r1 error err := func() error { streamSuffix := fmt.Sprintf("dtls-passive-arq-client-hello-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -979,7 +1354,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_Certificate(t *testing.T var r0 error err := func() error { streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -1056,7 +1431,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_Certificate(t *testing. var r0 error err := func() error { streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1133,7 +1508,7 @@ func TestRtcDTLS_ClientActive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *test var r0, r1 error err := func() error { streamSuffix := fmt.Sprintf("dtls-active-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupActive return nil @@ -1219,7 +1594,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes var r0, r1 error err := func() error { streamSuffix := fmt.Sprintf("dtls-passive-arq-certificate-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1296,7 +1671,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_ByDropped_ChangeCipherSpec(t *tes func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) { if err := filterTestError(func() error { streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1349,7 +1724,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ClientHello(t *testing.T) { func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) { if err := filterTestError(func() error { streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1402,7 +1777,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ServerHello(t *testing.T) { func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) { if err := filterTestError(func() error { streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1455,7 +1830,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_Certificate(t *testing.T) { func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) { if err := filterTestError(func() error { streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1509,7 +1884,7 @@ func TestRtcDTLS_ClientPassive_ARQ_DropAllAfter_ChangeCipherSpec(t *testing.T) { func TestRtcDTLS_ClientPassive_ARQ_VeryBadNetwork(t *testing.T) { if err := filterTestError(func() error { streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil @@ -1587,7 +1962,7 @@ func TestRtcDTLS_ClientPassive_ARQ_Certificate_After_ClientHello(t *testing.T) { var r0 error err := func() error { streamSuffix := fmt.Sprintf("dtls-passive-no-arq-%v-%v", os.Getpid(), rand.Int()) - p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p, err := newTestPublisher(registerDefaultCodecs, func(p *testPublisher) error { p.streamSuffix = streamSuffix p.onOffer = testUtilSetupPassive return nil diff --git a/trunk/3rdparty/srs-bench/srs/util.go b/trunk/3rdparty/srs-bench/srs/util.go index 54516e1f0..eb88e8c95 100644 --- a/trunk/3rdparty/srs-bench/srs/util.go +++ b/trunk/3rdparty/srs-bench/srs/util.go @@ -23,13 +23,10 @@ package srs import ( "bytes" "context" - "encoding/json" "flag" "fmt" "io" - "io/ioutil" "net" - "net/http" "net/url" "os" "path" @@ -62,6 +59,7 @@ var srsDTLSDropPackets *int var srsSchema string var srsServer *string var srsStream *string +var srsLiveStream *string var srsPublishAudio *string var srsPublishVideo *string var srsVnetClientIP *string @@ -71,7 +69,8 @@ func prepareTest() error { srsHttps = flag.Bool("srs-https", false, "Whther connect to HTTPS-API") srsServer = flag.String("srs-server", "127.0.0.1", "The RTC server to connect to") - srsStream = flag.String("srs-stream", "/rtc/regression", "The RTC stream to play") + srsStream = flag.String("srs-stream", "/rtc/regression", "The RTC app/stream to play") + srsLiveStream = flag.String("srs-live-stream", "/live/livestream", "The LIVE app/stream to play") srsLog = flag.Bool("srs-log", false, "Whether enable the detail log") srsTimeout = flag.Int("srs-timeout", 5000, "For each case, the timeout in ms") srsPlayPLI = flag.Int("srs-play-pli", 5000, "The PLI interval in seconds for player.") @@ -130,6 +129,10 @@ func prepareTest() error { return nil } +// Request SRS RTC API, the apiPath like "/rtc/v1/play", the r is WebRTC url like +// "webrtc://localhost/live/livestream", and the offer is SDP in string. +// +// Return the response of answer SDP in string. func apiRtcRequest(ctx context.Context, apiPath, r, offer string) (string, error) { u, err := url.Parse(r) if err != nil { @@ -165,41 +168,18 @@ func apiRtcRequest(ctx context.Context, apiPath, r, offer string) (string, error api, "", offer, r, } - b, err := json.Marshal(reqBody) - if err != nil { - return "", errors.Wrapf(err, "Marshal body %v", reqBody) - } - logger.If(ctx, "Request url api=%v with %v", api, string(b)) - logger.Tf(ctx, "Request url api=%v with %v bytes", api, len(b)) - - req, err := http.NewRequest("POST", api, strings.NewReader(string(b))) - if err != nil { - return "", errors.Wrapf(err, "HTTP request %v", string(b)) - } - - res, err := http.DefaultClient.Do(req.WithContext(ctx)) - if err != nil { - return "", errors.Wrapf(err, "Do HTTP request %v", string(b)) - } - - b2, err := ioutil.ReadAll(res.Body) - if err != nil { - return "", errors.Wrapf(err, "Read response for %v", string(b)) - } - logger.If(ctx, "Response from %v is %v", api, string(b2)) - logger.Tf(ctx, "Response from %v is %v bytes", api, len(b2)) - resBody := struct { Code int `json:"code"` Session string `json:"sessionid"` SDP string `json:"sdp"` }{} - if err := json.Unmarshal(b2, &resBody); err != nil { - return "", errors.Wrapf(err, "Marshal %v", string(b2)) + + if err := apiRequest(ctx, api, reqBody, &resBody); err != nil { + return "", errors.Wrapf(err, "request api=%v", api) } if resBody.Code != 0 { - return "", errors.Errorf("Server fail code=%v %v", resBody.Code, string(b2)) + return "", errors.Errorf("Server fail code=%v", resBody.Code) } logger.If(ctx, "Parse response to code=%v, session=%v, sdp=%v", resBody.Code, resBody.Session, escapeSDP(resBody.SDP)) @@ -256,6 +236,11 @@ func (v *wallClock) Tick(d time.Duration) time.Duration { return 0 } +// Do nothing for SDP. +func testUtilPassBy(s *webrtc.SessionDescription) error { + return nil +} + // Set to active, as DTLS client, to start ClientHello. func testUtilSetupActive(s *webrtc.SessionDescription) error { if strings.Contains(s.SDP, "setup:passive") { @@ -570,6 +555,7 @@ func (v *dtlsRecord) Unmarshal(b []byte) error { return nil } +// The func to setup testWebRTCAPI type testWebRTCAPIOptionFunc func(api *testWebRTCAPI) type testWebRTCAPI struct { @@ -588,25 +574,88 @@ type testWebRTCAPI struct { proxy *vnet_proxy.UDPProxy } -func newTestWebRTCAPI(options ...testWebRTCAPIOptionFunc) (*testWebRTCAPI, error) { +// The func to initialize testWebRTCAPI +type testWebRTCAPIInitFunc func(api *testWebRTCAPI) error + +// Implements interface testWebRTCAPIInitFunc to init testWebRTCAPI +func registerDefaultCodecs(api *testWebRTCAPI) error { + v := api + + if err := v.mediaEngine.RegisterDefaultCodecs(); err != nil { + return err + } + + if err := webrtc.RegisterDefaultInterceptors(v.mediaEngine, v.registry); err != nil { + return err + } + + return nil +} + +// Implements interface testWebRTCAPIInitFunc to init testWebRTCAPI +func registerMiniCodecs(api *testWebRTCAPI) error { + v := api + + if err := v.mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{webrtc.MimeTypeOpus, 48000, 2, "minptime=10;useinbandfec=1", nil}, + PayloadType: 111, + }, webrtc.RTPCodecTypeAudio); err != nil { + return err + } + + videoRTCPFeedback := []webrtc.RTCPFeedback{{"goog-remb", ""}, {"ccm", "fir"}, {"nack", ""}, {"nack", "pli"}} + if err := v.mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{webrtc.MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", videoRTCPFeedback}, + PayloadType: 108, + }, webrtc.RTPCodecTypeVideo); err != nil { + return err + } + + // Interceptors for NACK??? @see webrtc.ConfigureNack(v.mediaEngine, v.registry) + return nil +} + +// Implements interface testWebRTCAPIInitFunc to init testWebRTCAPI +func registerMiniCodecsWithoutNack(api *testWebRTCAPI) error { + v := api + + if err := v.mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{webrtc.MimeTypeOpus, 48000, 2, "minptime=10;useinbandfec=1", nil}, + PayloadType: 111, + }, webrtc.RTPCodecTypeAudio); err != nil { + return err + } + + videoRTCPFeedback := []webrtc.RTCPFeedback{{"goog-remb", ""}, {"ccm", "fir"}} + if err := v.mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{webrtc.MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f", videoRTCPFeedback}, + PayloadType: 108, + }, webrtc.RTPCodecTypeVideo); err != nil { + return err + } + + // Interceptors for NACK??? @see webrtc.ConfigureNack(v.mediaEngine, v.registry) + return nil +} + +func newTestWebRTCAPI(inits ...testWebRTCAPIInitFunc) (*testWebRTCAPI, error) { v := &testWebRTCAPI{} v.mediaEngine = &webrtc.MediaEngine{} - if err := v.mediaEngine.RegisterDefaultCodecs(); err != nil { - return nil, err - } - v.registry = &interceptor.Registry{} - if err := webrtc.RegisterDefaultInterceptors(v.mediaEngine, v.registry); err != nil { - return nil, err - } - - for _, setup := range options { - setup(v) - } - v.settingEngine = &webrtc.SettingEngine{} + // Apply initialize filter, for example, register default codecs when create publisher/player. + for _, setup := range inits { + if setup == nil { + continue + } + + if err := setup(v); err != nil { + return nil, err + } + } + return v, nil } @@ -657,10 +706,12 @@ func (v *testWebRTCAPI) Setup(vnetClientIP string, options ...testWebRTCAPIOptio return err } + // Apply options from params, for example, tester to register vnet filter. for _, setup := range options { setup(v) } + // Apply options in api, for example, publisher register audio-level interceptor. for _, setup := range v.options { setup(v) } @@ -681,27 +732,29 @@ func (v *testWebRTCAPI) NewPeerConnection(configuration webrtc.Configuration) (* type testPlayerOptionFunc func(p *testPlayer) error type testPlayer struct { + onOffer func(s *webrtc.SessionDescription) error + onAnswer func(s *webrtc.SessionDescription) error pc *webrtc.PeerConnection receivers []*webrtc.RTPReceiver // We should dispose it. api *testWebRTCAPI // Optional suffix for stream url. streamSuffix string + // Optional app/stream to play, use srsStream by default. + defaultStream string } -func createApiForPlayer(play *testPlayer) error { - api, err := newTestWebRTCAPI() - if err != nil { - return err - } - - play.api = api - return nil -} - -func newTestPlayer(options ...testPlayerOptionFunc) (*testPlayer, error) { +// Create test player, the init is used to initialize api which maybe nil, +// and the options is used to setup the player itself. +func newTestPlayer(init testWebRTCAPIInitFunc, options ...testPlayerOptionFunc) (*testPlayer, error) { v := &testPlayer{} + api, err := newTestWebRTCAPI(init) + if err != nil { + return nil, err + } + v.api = api + for _, opt := range options { if err := opt(v); err != nil { return nil, err @@ -733,6 +786,9 @@ func (v *testPlayer) Close() error { func (v *testPlayer) Run(ctx context.Context, cancel context.CancelFunc) error { r := fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, *srsStream) + if v.defaultStream != "" { + r = fmt.Sprintf("%v://%v%v", srsSchema, *srsServer, v.defaultStream) + } if v.streamSuffix != "" { r = fmt.Sprintf("%v-%v", r, v.streamSuffix) } @@ -765,22 +821,35 @@ func (v *testPlayer) Run(ctx context.Context, cancel context.CancelFunc) error { return errors.Wrapf(err, "Set offer %v", offer) } - answer, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP) + if v.onOffer != nil { + if err := v.onOffer(&offer); err != nil { + return errors.Wrapf(err, "sdp %v %v", offer.Type, offer.SDP) + } + } + + answerSDP, err := apiRtcRequest(ctx, "/rtc/v1/play", r, offer.SDP) if err != nil { return errors.Wrapf(err, "Api request offer=%v", offer.SDP) } // Run a proxy for real server and vnet. - if address, err := parseAddressOfCandidate(answer); err != nil { - return errors.Wrapf(err, "parse address of %v", answer) + if address, err := parseAddressOfCandidate(answerSDP); err != nil { + return errors.Wrapf(err, "parse address of %v", answerSDP) } else if err := v.api.proxy.Proxy(v.api.network, address); err != nil { return errors.Wrapf(err, "proxy %v to %v", v.api.network, address) } - if err := pc.SetRemoteDescription(webrtc.SessionDescription{ - Type: webrtc.SDPTypeAnswer, SDP: answer, - }); err != nil { - return errors.Wrapf(err, "Set answer %v", answer) + answer := &webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, SDP: answerSDP, + } + if v.onAnswer != nil { + if err := v.onAnswer(answer); err != nil { + return errors.Wrapf(err, "on answerSDP") + } + } + + if err := pc.SetRemoteDescription(*answer); err != nil { + return errors.Wrapf(err, "Set answerSDP %v", answerSDP) } handleTrack := func(ctx context.Context, track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) error { @@ -852,21 +921,19 @@ type testPublisher struct { cancel context.CancelFunc } -func createApiForPublisher(pub *testPublisher) error { - api, err := newTestWebRTCAPI() - if err != nil { - return err - } - - pub.api = api - return nil -} - -func newTestPublisher(options ...testPublisherOptionFunc) (*testPublisher, error) { +// Create test publisher, the init is used to initialize api which maybe nil, +// and the options is used to setup the publisher itself. +func newTestPublisher(init testWebRTCAPIInitFunc, options ...testPublisherOptionFunc) (*testPublisher, error) { sourceVideo, sourceAudio := *srsPublishVideo, *srsPublishAudio v := &testPublisher{} + api, err := newTestWebRTCAPI(init) + if err != nil { + return nil, err + } + v.api = api + for _, opt := range options { if err := opt(v); err != nil { return nil, err @@ -882,7 +949,6 @@ func newTestPublisher(options ...testPublisherOptionFunc) (*testPublisher, error } // Setup the interceptors for packets. - api := v.api api.options = append(api.options, func(api *testWebRTCAPI) { // Filter for RTCP packets. rtcpInterceptor := &rtcpInterceptor{} diff --git a/trunk/conf/clion-ingest.conf b/trunk/conf/clion-ingest.conf new file mode 100644 index 000000000..76fab5f41 --- /dev/null +++ b/trunk/conf/clion-ingest.conf @@ -0,0 +1,53 @@ + +listen 1935; +max_connections 1000; +daemon off; +srs_log_tank console; + +http_server { + enabled on; + listen 8080; + dir ./objs/nginx/html; +} + +http_api { + enabled on; + listen 1985; +} +stats { + network 0; +} +rtc_server { + enabled on; + # Listen at udp://8000 + listen 8000; + # + # The $CANDIDATE means fetch from env, if not configed, use * as default. + # + # The * means retrieving server IP automatically, from all network interfaces, + # @see https://github.com/ossrs/srs/wiki/v4_CN_RTCWiki#config-candidate + candidate $CANDIDATE; +} + +vhost __defaultVhost__ { + rtc { + enabled on; + } + http_remux { + enabled on; + mount [vhost]/[app]/[stream].flv; + } + ingest livestream { + enabled on; + input { + type file; + url ./doc/source.200kbps.768x320.flv; + } + ffmpeg ./objs/ffmpeg/bin/ffmpeg; + engine { + enabled off; + output rtmp://127.0.0.1:[port]/live/livestream; + } + } +} + diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 42e2d7a14..a681c0038 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -2884,7 +2884,7 @@ srs_error_t SrsRtcConnection::negotiate_publish_capability(SrsRtcUserConfig* ruc //local_media_desc.payload_types_.back().rtcp_fb_.push_back("rrtr"); } - // TODO: FIXME: use one parse paylod from sdp. + // TODO: FIXME: use one parse payload from sdp. track_desc->create_auxiliary_payload(remote_media_desc.find_media_with_encoding_name("red")); track_desc->create_auxiliary_payload(remote_media_desc.find_media_with_encoding_name("rtx")); diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index f827cde0f..d11822368 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 4 #define VERSION_MINOR 0 -#define VERSION_REVISION 144 +#define VERSION_REVISION 145 #endif