diff --git a/trunk/3rdparty/srs-bench/srs/api.go b/trunk/3rdparty/srs-bench/srs/api.go new file mode 100644 index 000000000..db24bfd32 --- /dev/null +++ b/trunk/3rdparty/srs-bench/srs/api.go @@ -0,0 +1,154 @@ +// The MIT License (MIT) +// +// Copyright (c) 2021 Winlin +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +package srs + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + + "github.com/ossrs/go-oryx-lib/errors" + "github.com/ossrs/go-oryx-lib/logger" +) + +// Request SRS API and got response, both in JSON. +// The r is HTTP API to request, like "http://localhost:1985/rtc/v1/play". +// The req is the HTTP request body, will be marshal to JSON object. nil is no body +// The res is the HTTP response body, already unmarshal to JSON object. +func apiRequest(ctx context.Context, r string, req interface{}, res interface{}) error { + var b []byte + if req != nil { + if b0, err := json.Marshal(req); err != nil { + return errors.Wrapf(err, "Marshal body %v", req) + } else { + b = b0 + } + } + logger.If(ctx, "Request url api=%v with %v", r, string(b)) + logger.Tf(ctx, "Request url api=%v with %v bytes", r, len(b)) + + method := "POST" + if req == nil { + method = "GET" + } + reqObj, err := http.NewRequest(method, r, strings.NewReader(string(b))) + if err != nil { + return errors.Wrapf(err, "HTTP request %v", string(b)) + } + + resObj, err := http.DefaultClient.Do(reqObj.WithContext(ctx)) + if err != nil { + return errors.Wrapf(err, "Do HTTP request %v", string(b)) + } + + b2, err := ioutil.ReadAll(resObj.Body) + if err != nil { + return errors.Wrapf(err, "Read response for %v", string(b)) + } + logger.If(ctx, "Response from %v is %v", r, string(b2)) + logger.Tf(ctx, "Response from %v is %v bytes", r, len(b2)) + + errorCode := struct { + Code int `json:"code"` + }{} + if err := json.Unmarshal(b2, &errorCode); err != nil { + return errors.Wrapf(err, "Unmarshal %v", string(b2)) + } + if errorCode.Code != 0 { + return errors.Errorf("Server fail code=%v %v", errorCode.Code, string(b2)) + } + + if err := json.Unmarshal(b2, res); err != nil { + return errors.Wrapf(err, "Unmarshal %v", string(b2)) + } + logger.Tf(ctx, "Parse response to code=%v ok, %v", errorCode.Code, res) + + return nil +} + +// The SRS HTTP statistic API. +type statAPI struct { + ctx context.Context + streams []*statStream + stream *statStream +} + +func newStatAPI(ctx context.Context) *statAPI { + return &statAPI{ctx: ctx} +} + +type statGeneral struct { + Code int `json:"code"` + Server string `json:"server"` +} + +type statPublishInStream struct { + Cid string `json:"cid"` + Active bool `json:"active"` +} + +func (v statPublishInStream) String() string { + return fmt.Sprintf("id=%v, active=%v", v.Cid, v.Active) +} + +type statStream struct { + ID string `json:"id"` + Vhost string `json:"vhost"` + App string `json:"app"` + Name string `json:"name"` + Clients int `json:"clients"` + Publish statPublishInStream `json:"publish"` +} + +func (v statStream) String() string { + return fmt.Sprintf("id=%v, name=%v, pub=%v", v.ID, v.Name, v.Publish) +} + +// Output to v.streams +func (v *statAPI) Streams() *statAPI { + res := struct { + statGeneral + Streams []*statStream `json:"streams"` + }{} + + ctx := v.ctx + if err := apiRequest(ctx, "http://localhost:1985/api/v1/streams/", nil, &res); err != nil { + logger.Tf(ctx, "query streams err %+v", err) + return v + } + + v.streams = res.Streams + return v +} + +// Output to v.stream +func (v *statAPI) FilterByStreamSuffix(suffix string) *statAPI { + for _, stream := range v.streams { + if strings.HasSuffix(stream.Name, suffix) { + v.stream = stream + break + } + } + return v +} diff --git a/trunk/3rdparty/srs-bench/srs/rtc_test.go b/trunk/3rdparty/srs-bench/srs/rtc_test.go index 686e54819..941b1ccbe 100644 --- a/trunk/3rdparty/srs-bench/srs/rtc_test.go +++ b/trunk/3rdparty/srs-bench/srs/rtc_test.go @@ -58,6 +58,44 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } +// Test for https://github.com/ossrs/srs/pull/2483 +func TestPR2483_RtcStatApi_PublisherOnly(t *testing.T) { + if err := filterTestError(func() error { + streamSuffix := fmt.Sprintf("publish-only-%v-%v", os.Getpid(), rand.Int()) + p, err := newTestPublisher(createApiForPublisher, func(p *testPublisher) error { + p.streamSuffix = streamSuffix + return nil + }) + if err != nil { + return err + } + defer p.Close() + + ctx, cancel := context.WithTimeout(logger.WithContext(context.Background()), time.Duration(*srsTimeout)*time.Millisecond) + if err := p.Setup(*srsVnetClientIP, func(api *testWebRTCAPI) { + var once sync.Once + api.registry.Add(newRTCPInterceptor(func(i *rtcpInterceptor) { + i.rtcpReader = func(buf []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + once.Do(func() { + stat := newStatAPI(ctx).Streams().FilterByStreamSuffix(p.streamSuffix) + logger.Tf(ctx, "Check publishing, streams=%v, stream=%v", len(stat.streams), stat.stream) + if stat.stream != nil { + cancel() // done + } + }) + return i.nextRTCPReader.Read(buf, attributes) + } + })) + }); err != nil { + return err + } + + return p.Run(ctx, cancel) + }()); err != nil { + t.Errorf("err %+v", err) + } +} + // Basic use scenario, publish a stream. func TestRtcBasic_PublishOnly(t *testing.T) { if err := filterTestError(func() error { diff --git a/trunk/3rdparty/srs-bench/srs/util.go b/trunk/3rdparty/srs-bench/srs/util.go index 54516e1f0..ed8692523 100644 --- a/trunk/3rdparty/srs-bench/srs/util.go +++ b/trunk/3rdparty/srs-bench/srs/util.go @@ -23,13 +23,10 @@ package srs import ( "bytes" "context" - "encoding/json" "flag" "fmt" "io" - "io/ioutil" "net" - "net/http" "net/url" "os" "path" @@ -130,6 +127,10 @@ func prepareTest() error { return nil } +// Request SRS RTC API, the apiPath like "/rtc/v1/play", the r is WebRTC url like +// "webrtc://localhost/live/livestream", and the offer is SDP in string. +// +// Return the response of answer SDP in string. func apiRtcRequest(ctx context.Context, apiPath, r, offer string) (string, error) { u, err := url.Parse(r) if err != nil { @@ -165,41 +166,18 @@ func apiRtcRequest(ctx context.Context, apiPath, r, offer string) (string, error api, "", offer, r, } - b, err := json.Marshal(reqBody) - if err != nil { - return "", errors.Wrapf(err, "Marshal body %v", reqBody) - } - logger.If(ctx, "Request url api=%v with %v", api, string(b)) - logger.Tf(ctx, "Request url api=%v with %v bytes", api, len(b)) - - req, err := http.NewRequest("POST", api, strings.NewReader(string(b))) - if err != nil { - return "", errors.Wrapf(err, "HTTP request %v", string(b)) - } - - res, err := http.DefaultClient.Do(req.WithContext(ctx)) - if err != nil { - return "", errors.Wrapf(err, "Do HTTP request %v", string(b)) - } - - b2, err := ioutil.ReadAll(res.Body) - if err != nil { - return "", errors.Wrapf(err, "Read response for %v", string(b)) - } - logger.If(ctx, "Response from %v is %v", api, string(b2)) - logger.Tf(ctx, "Response from %v is %v bytes", api, len(b2)) - resBody := struct { Code int `json:"code"` Session string `json:"sessionid"` SDP string `json:"sdp"` }{} - if err := json.Unmarshal(b2, &resBody); err != nil { - return "", errors.Wrapf(err, "Marshal %v", string(b2)) + + if err := apiRequest(ctx, api, reqBody, &resBody); err != nil { + return "", errors.Wrapf(err, "request api=%v", api) } if resBody.Code != 0 { - return "", errors.Errorf("Server fail code=%v %v", resBody.Code, string(b2)) + return "", errors.Errorf("Server fail code=%v", resBody.Code) } logger.If(ctx, "Parse response to code=%v, session=%v, sdp=%v", resBody.Code, resBody.Session, escapeSDP(resBody.SDP))