mirror of
				https://github.com/ossrs/srs.git
				synced 2025-03-09 15:49:59 +00:00 
			
		
		
		
	TEST: Upgrade pion to v3.2.9. (#3567)
------ Co-authored-by: chundonglinlin <chundonglinlin@163.com>
This commit is contained in:
		
							parent
							
								
									900c4cdd97
								
							
						
					
					
						commit
						1545425e06
					
				
					 1383 changed files with 118469 additions and 41421 deletions
				
			
		
							
								
								
									
										179
									
								
								trunk/3rdparty/srs-bench/vendor/github.com/pion/sctp/stream.go
									
										
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										179
									
								
								trunk/3rdparty/srs-bench/vendor/github.com/pion/sctp/stream.go
									
										
									
										generated
									
									
										vendored
									
									
								
							| 
						 | 
				
			
			@ -1,12 +1,16 @@
 | 
			
		|||
package sctp
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
	"math"
 | 
			
		||||
	"os"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/pion/logging"
 | 
			
		||||
	"github.com/pkg/errors"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
| 
						 | 
				
			
			@ -18,6 +22,36 @@ const (
 | 
			
		|||
	ReliabilityTypeTimed byte = 2
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// StreamState is an enum for SCTP Stream state field
 | 
			
		||||
// This field identifies the state of stream.
 | 
			
		||||
type StreamState int
 | 
			
		||||
 | 
			
		||||
// StreamState enums
 | 
			
		||||
const (
 | 
			
		||||
	StreamStateOpen    StreamState = iota // Stream object starts with StreamStateOpen
 | 
			
		||||
	StreamStateClosing                    // Outgoing stream is being reset
 | 
			
		||||
	StreamStateClosed                     // Stream has been closed
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func (ss StreamState) String() string {
 | 
			
		||||
	switch ss {
 | 
			
		||||
	case StreamStateOpen:
 | 
			
		||||
		return "open"
 | 
			
		||||
	case StreamStateClosing:
 | 
			
		||||
		return "closing"
 | 
			
		||||
	case StreamStateClosed:
 | 
			
		||||
		return "closed"
 | 
			
		||||
	}
 | 
			
		||||
	return "unknown"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SCTP stream errors
 | 
			
		||||
var (
 | 
			
		||||
	ErrOutboundPacketTooLarge = errors.New("outbound packet larger than maximum message size")
 | 
			
		||||
	ErrStreamClosed           = errors.New("stream closed")
 | 
			
		||||
	ErrReadDeadlineExceeded   = fmt.Errorf("read deadline exceeded: %w", os.ErrDeadlineExceeded)
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Stream represents an SCTP stream
 | 
			
		||||
type Stream struct {
 | 
			
		||||
	association         *Association
 | 
			
		||||
| 
						 | 
				
			
			@ -28,13 +62,14 @@ type Stream struct {
 | 
			
		|||
	sequenceNumber      uint16
 | 
			
		||||
	readNotifier        *sync.Cond
 | 
			
		||||
	readErr             error
 | 
			
		||||
	writeErr            error
 | 
			
		||||
	readTimeoutCancel   chan struct{}
 | 
			
		||||
	unordered           bool
 | 
			
		||||
	reliabilityType     byte
 | 
			
		||||
	reliabilityValue    uint32
 | 
			
		||||
	bufferedAmount      uint64
 | 
			
		||||
	bufferedAmountLow   uint64
 | 
			
		||||
	onBufferedAmountLow func()
 | 
			
		||||
	state               StreamState
 | 
			
		||||
	log                 logging.LeveledLogger
 | 
			
		||||
	name                string
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -48,15 +83,7 @@ func (s *Stream) StreamIdentifier() uint16 {
 | 
			
		|||
 | 
			
		||||
// SetDefaultPayloadType sets the default payload type used by Write.
 | 
			
		||||
func (s *Stream) SetDefaultPayloadType(defaultPayloadType PayloadProtocolIdentifier) {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	s.setDefaultPayloadType(defaultPayloadType)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// setDefaultPayloadType sets the defaultPayloadType. The caller should hold the lock.
 | 
			
		||||
func (s *Stream) setDefaultPayloadType(defaultPayloadType PayloadProtocolIdentifier) {
 | 
			
		||||
	s.defaultPayloadType = defaultPayloadType
 | 
			
		||||
	atomic.StoreUint32((*uint32)(&s.defaultPayloadType), uint32(defaultPayloadType))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetReliabilityParams sets reliability parameters for this stream.
 | 
			
		||||
| 
						 | 
				
			
			@ -93,6 +120,14 @@ func (s *Stream) ReadSCTP(p []byte) (int, PayloadProtocolIdentifier, error) {
 | 
			
		|||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	defer func() {
 | 
			
		||||
		// close readTimeoutCancel if the current read timeout routine is no longer effective
 | 
			
		||||
		if s.readTimeoutCancel != nil && s.readErr != nil {
 | 
			
		||||
			close(s.readTimeoutCancel)
 | 
			
		||||
			s.readTimeoutCancel = nil
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		n, ppi, err := s.reassemblyQueue.read(p)
 | 
			
		||||
		if err == nil {
 | 
			
		||||
| 
						 | 
				
			
			@ -110,6 +145,47 @@ func (s *Stream) ReadSCTP(p []byte) (int, PayloadProtocolIdentifier, error) {
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetReadDeadline sets the read deadline in an identical way to net.Conn
 | 
			
		||||
func (s *Stream) SetReadDeadline(deadline time.Time) error {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	if s.readTimeoutCancel != nil {
 | 
			
		||||
		close(s.readTimeoutCancel)
 | 
			
		||||
		s.readTimeoutCancel = nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if s.readErr != nil {
 | 
			
		||||
		if !errors.Is(s.readErr, ErrReadDeadlineExceeded) {
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
		s.readErr = nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if !deadline.IsZero() {
 | 
			
		||||
		s.readTimeoutCancel = make(chan struct{})
 | 
			
		||||
 | 
			
		||||
		go func(readTimeoutCancel chan struct{}) {
 | 
			
		||||
			t := time.NewTimer(time.Until(deadline))
 | 
			
		||||
			select {
 | 
			
		||||
			case <-readTimeoutCancel:
 | 
			
		||||
				t.Stop()
 | 
			
		||||
				return
 | 
			
		||||
			case <-t.C:
 | 
			
		||||
				s.lock.Lock()
 | 
			
		||||
				if s.readErr == nil {
 | 
			
		||||
					s.readErr = ErrReadDeadlineExceeded
 | 
			
		||||
				}
 | 
			
		||||
				s.readTimeoutCancel = nil
 | 
			
		||||
				s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
				s.readNotifier.Signal()
 | 
			
		||||
			}
 | 
			
		||||
		}(s.readTimeoutCancel)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Stream) handleData(pd *chunkPayloadData) {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
| 
						 | 
				
			
			@ -174,26 +250,28 @@ func (s *Stream) handleForwardTSNForUnordered(newCumulativeTSN uint32) {
 | 
			
		|||
 | 
			
		||||
// Write writes len(p) bytes from p with the default Payload Protocol Identifier
 | 
			
		||||
func (s *Stream) Write(p []byte) (n int, err error) {
 | 
			
		||||
	return s.WriteSCTP(p, s.defaultPayloadType)
 | 
			
		||||
	ppi := PayloadProtocolIdentifier(atomic.LoadUint32((*uint32)(&s.defaultPayloadType)))
 | 
			
		||||
	return s.WriteSCTP(p, ppi)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WriteSCTP writes len(p) bytes from p to the DTLS connection
 | 
			
		||||
func (s *Stream) WriteSCTP(p []byte, ppi PayloadProtocolIdentifier) (n int, err error) {
 | 
			
		||||
func (s *Stream) WriteSCTP(p []byte, ppi PayloadProtocolIdentifier) (int, error) {
 | 
			
		||||
	maxMessageSize := s.association.MaxMessageSize()
 | 
			
		||||
	if len(p) > int(maxMessageSize) {
 | 
			
		||||
		return 0, errors.Errorf("Outbound packet larger than maximum message size %v", math.MaxUint16)
 | 
			
		||||
		return 0, fmt.Errorf("%w: %v", ErrOutboundPacketTooLarge, math.MaxUint16)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	s.lock.RLock()
 | 
			
		||||
	err = s.writeErr
 | 
			
		||||
	s.lock.RUnlock()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, err
 | 
			
		||||
	if s.State() != StreamStateOpen {
 | 
			
		||||
		return 0, ErrStreamClosed
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	chunks := s.packetize(p, ppi)
 | 
			
		||||
 | 
			
		||||
	return len(p), s.association.sendPayloadData(chunks)
 | 
			
		||||
	n := len(p)
 | 
			
		||||
	err := s.association.sendPayloadData(chunks)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return n, ErrStreamClosed
 | 
			
		||||
	}
 | 
			
		||||
	return n, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPayloadData {
 | 
			
		||||
| 
						 | 
				
			
			@ -257,26 +335,23 @@ func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPa
 | 
			
		|||
// Close closes the write-direction of the stream.
 | 
			
		||||
// Future calls to Write are not permitted after calling Close.
 | 
			
		||||
func (s *Stream) Close() error {
 | 
			
		||||
	if sid, isOpen := func() (uint16, bool) {
 | 
			
		||||
	if sid, resetOutbound := func() (uint16, bool) {
 | 
			
		||||
		s.lock.Lock()
 | 
			
		||||
		defer s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
		isOpen := true
 | 
			
		||||
		if s.writeErr == nil {
 | 
			
		||||
			s.writeErr = errors.New("Stream closed")
 | 
			
		||||
		} else {
 | 
			
		||||
			isOpen = false
 | 
			
		||||
		}
 | 
			
		||||
		s.log.Debugf("[%s] Close: state=%s", s.name, s.state.String())
 | 
			
		||||
 | 
			
		||||
		if s.readErr == nil {
 | 
			
		||||
			s.readErr = io.EOF
 | 
			
		||||
		} else {
 | 
			
		||||
			isOpen = false
 | 
			
		||||
		if s.state == StreamStateOpen {
 | 
			
		||||
			if s.readErr == nil {
 | 
			
		||||
				s.state = StreamStateClosing
 | 
			
		||||
			} else {
 | 
			
		||||
				s.state = StreamStateClosed
 | 
			
		||||
			}
 | 
			
		||||
			s.log.Debugf("[%s] state change: open => %s", s.name, s.state.String())
 | 
			
		||||
			return s.streamIdentifier, true
 | 
			
		||||
		}
 | 
			
		||||
		s.readNotifier.Broadcast() // broadcast regardless
 | 
			
		||||
 | 
			
		||||
		return s.streamIdentifier, isOpen
 | 
			
		||||
	}(); isOpen {
 | 
			
		||||
		return s.streamIdentifier, false
 | 
			
		||||
	}(); resetOutbound {
 | 
			
		||||
		// Reset the outgoing stream
 | 
			
		||||
		// https://tools.ietf.org/html/rfc6525
 | 
			
		||||
		return s.association.sendResetRequest(sid)
 | 
			
		||||
| 
						 | 
				
			
			@ -355,3 +430,35 @@ func (s *Stream) getNumBytesInReassemblyQueue() int {
 | 
			
		|||
	// No lock is required as it reads the size with atomic load function.
 | 
			
		||||
	return s.reassemblyQueue.getNumBytes()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Stream) onInboundStreamReset() {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, s.state.String())
 | 
			
		||||
 | 
			
		||||
	// No more inbound data to read. Unblock the read with io.EOF.
 | 
			
		||||
	// This should cause DCEP layer (datachannel package) to call Close() which
 | 
			
		||||
	// will reset outgoing stream also.
 | 
			
		||||
 | 
			
		||||
	// See RFC 8831 section 6.7:
 | 
			
		||||
	//	if one side decides to close the data channel, it resets the corresponding
 | 
			
		||||
	//	outgoing stream.  When the peer sees that an incoming stream was
 | 
			
		||||
	//	reset, it also resets its corresponding outgoing stream.  Once this
 | 
			
		||||
	//	is completed, the data channel is closed.
 | 
			
		||||
 | 
			
		||||
	s.readErr = io.EOF
 | 
			
		||||
	s.readNotifier.Broadcast()
 | 
			
		||||
 | 
			
		||||
	if s.state == StreamStateClosing {
 | 
			
		||||
		s.log.Debugf("[%s] state change: closing => closed", s.name)
 | 
			
		||||
		s.state = StreamStateClosed
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// State return the stream state.
 | 
			
		||||
func (s *Stream) State() StreamState {
 | 
			
		||||
	s.lock.RLock()
 | 
			
		||||
	defer s.lock.RUnlock()
 | 
			
		||||
	return s.state
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue