package sctp

import (
	"bytes"
	"fmt"
	"io"
	"math"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pion/logging"
	"github.com/pion/randutil"
	"github.com/pkg/errors"
)

// Use a global random generator seeded with crypto-grade randomness.
var (
	globalMathRandomGenerator = randutil.NewMathRandomGenerator() // nolint:gochecknoglobals
	errChunk                  = errors.New("Abort chunk, with following errors")
)

const (
	receiveMTU            uint32 = 8192 // MTU for inbound packet (from DTLS)
	initialMTU            uint32 = 1228 // initial MTU for outgoing packets (to DTLS)
	initialRecvBufSize    uint32 = 1024 * 1024
	commonHeaderSize      uint32 = 12
	dataChunkHeaderSize   uint32 = 16
	defaultMaxMessageSize uint32 = 65536
)

// association state enums
const (
	closed uint32 = iota
	cookieWait
	cookieEchoed
	established
	shutdownAckSent
	shutdownPending
	shutdownReceived
	shutdownSent
)

// retransmission timer IDs
const (
	timerT1Init int = iota
	timerT1Cookie
	timerT3RTX
	timerReconfig
)

// ack mode (for testing)
const (
	ackModeNormal int = iota
	ackModeNoDelay
	ackModeAlwaysDelay
)

// ack transmission state
const (
	ackStateIdle      int = iota // ack timer is off
	ackStateImmediate            // will send ack immediately
	ackStateDelay                // ack timer is on (ack is being delayed)
)

// other constants
const (
	acceptChSize = 16
)

func getAssociationStateString(a uint32) string {
	switch a {
	case closed:
		return "Closed"
	case cookieWait:
		return "CookieWait"
	case cookieEchoed:
		return "CookieEchoed"
	case established:
		return "Established"
	case shutdownPending:
		return "ShutdownPending"
	case shutdownSent:
		return "ShutdownSent"
	case shutdownReceived:
		return "ShutdownReceived"
	case shutdownAckSent:
		return "ShutdownAckSent"
	default:
		return fmt.Sprintf("Invalid association state %d", a)
	}
}

// Association represents an SCTP association
// 13.2.  Parameters Necessary per Association (i.e., the TCB)
// Peer        : Tag value to be sent in every packet and is received
// Verification: in the INIT or INIT ACK chunk.
// Tag         :
//
// My          : Tag expected in every inbound packet and sent in the
// Verification: INIT or INIT ACK chunk.
//
// Tag         :
// State       : A state variable indicating what state the association
//             : is in, i.e., COOKIE-WAIT, COOKIE-ECHOED, ESTABLISHED,
//             : SHUTDOWN-PENDING, SHUTDOWN-SENT, SHUTDOWN-RECEIVED,
//             : SHUTDOWN-ACK-SENT.
//
//               Note: No "CLOSED" state is illustrated since if an
//               association is "CLOSED" its TCB SHOULD be removed.
type Association struct {
	bytesReceived uint64
	bytesSent     uint64

	lock sync.RWMutex

	netConn net.Conn

	peerVerificationTag    uint32
	myVerificationTag      uint32
	state                  uint32
	myNextTSN              uint32 // nextTSN
	peerLastTSN            uint32 // lastRcvdTSN
	minTSN2MeasureRTT      uint32 // for RTT measurement
	willSendForwardTSN     bool
	willRetransmitFast     bool
	willRetransmitReconfig bool

	// Reconfig
	myNextRSN        uint32
	reconfigs        map[uint32]*chunkReconfig
	reconfigRequests map[uint32]*paramOutgoingResetRequest

	// Non-RFC internal data
	sourcePort              uint16
	destinationPort         uint16
	myMaxNumInboundStreams  uint16
	myMaxNumOutboundStreams uint16
	myCookie                *paramStateCookie
	payloadQueue            *payloadQueue
	inflightQueue           *payloadQueue
	pendingQueue            *pendingQueue
	controlQueue            *controlQueue
	mtu                     uint32
	maxPayloadSize          uint32 // max DATA chunk payload size
	cumulativeTSNAckPoint   uint32
	advancedPeerTSNAckPoint uint32
	useForwardTSN           bool

	// Congestion control parameters
	maxReceiveBufferSize uint32
	maxMessageSize       uint32
	cwnd                 uint32 // my congestion window size
	rwnd                 uint32 // calculated peer's receiver window size
	ssthresh             uint32 // slow start threshold
	partialBytesAcked    uint32
	inFastRecovery       bool
	fastRecoverExitPoint uint32

	// RTX & Ack timer
	rtoMgr    *rtoManager
	t1Init    *rtxTimer
	t1Cookie  *rtxTimer
	t3RTX     *rtxTimer
	tReconfig *rtxTimer
	ackTimer  *ackTimer

	// Chunks stored for retransmission
	storedInit       *chunkInit
	storedCookieEcho *chunkCookieEcho

	streams              map[uint16]*Stream
	acceptCh             chan *Stream
	readLoopCloseCh      chan struct{}
	awakeWriteLoopCh     chan struct{}
	closeWriteLoopCh     chan struct{}
	handshakeCompletedCh chan error

	closeWriteLoopOnce sync.Once

	// local error
	silentError error

	ackState int
	ackMode  int // for testing

	// stats
	stats *associationStats

	// per inbound packet context
	delayedAckTriggered   bool
	immediateAckTriggered bool

	name string
	log  logging.LeveledLogger
}

// Config collects the arguments to createAssociation into a single structure
type Config struct {
	NetConn              net.Conn
	MaxReceiveBufferSize uint32
	MaxMessageSize       uint32
	LoggerFactory        logging.LoggerFactory
}

// Server accepts an SCTP stream over a conn
func Server(config Config) (*Association, error) {
	a := createAssociation(config)
	a.init(false)

	select {
	case err := <-a.handshakeCompletedCh:
		if err != nil {
			return nil, err
		}
		return a, nil
	case <-a.readLoopCloseCh:
		return nil, errors.Errorf("association closed before connecting")
	}
}

// Client opens an SCTP stream over a conn
func Client(config Config) (*Association, error) {
	a := createAssociation(config)
	a.init(true)

	select {
	case err := <-a.handshakeCompletedCh:
		if err != nil {
			return nil, err
		}
		return a, nil
	case <-a.readLoopCloseCh:
		return nil, errors.Errorf("association closed before connecting")
	}
}
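
// Illustrative usage sketch (not part of the original file): how a caller
// might open an association and a stream on the client side. The dtlsConn
// variable and the handling of errors are placeholders, and constant names
// such as PayloadTypeWebRTCBinary are assumed from elsewhere in this package.
//
//	assoc, err := sctp.Client(sctp.Config{
//		NetConn:       dtlsConn, // any net.Conn, e.g. a DTLS connection
//		LoggerFactory: logging.NewDefaultLoggerFactory(),
//	})
//	if err != nil {
//		// handle error
//	}
//	stream, err := assoc.OpenStream(1, sctp.PayloadTypeWebRTCBinary)
//	if err != nil {
//		// handle error
//	}
//	_, _ = stream.Write([]byte("hello"))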

func createAssociation(config Config) *Association {
	var maxReceiveBufferSize uint32
	if config.MaxReceiveBufferSize == 0 {
		maxReceiveBufferSize = initialRecvBufSize
	} else {
		maxReceiveBufferSize = config.MaxReceiveBufferSize
	}

	var maxMessageSize uint32
	if config.MaxMessageSize == 0 {
		maxMessageSize = defaultMaxMessageSize
	} else {
		maxMessageSize = config.MaxMessageSize
	}

	tsn := globalMathRandomGenerator.Uint32()
	a := &Association{
		netConn:                 config.NetConn,
		maxReceiveBufferSize:    maxReceiveBufferSize,
		maxMessageSize:          maxMessageSize,
		myMaxNumOutboundStreams: math.MaxUint16,
		myMaxNumInboundStreams:  math.MaxUint16,
		payloadQueue:            newPayloadQueue(),
		inflightQueue:           newPayloadQueue(),
		pendingQueue:            newPendingQueue(),
		controlQueue:            newControlQueue(),
		mtu:                     initialMTU,
		maxPayloadSize:          initialMTU - (commonHeaderSize + dataChunkHeaderSize),
		myVerificationTag:       globalMathRandomGenerator.Uint32(),
		myNextTSN:               tsn,
		myNextRSN:               tsn,
		minTSN2MeasureRTT:       tsn,
		state:                   closed,
		rtoMgr:                  newRTOManager(),
		streams:                 map[uint16]*Stream{},
		reconfigs:               map[uint32]*chunkReconfig{},
		reconfigRequests:        map[uint32]*paramOutgoingResetRequest{},
		acceptCh:                make(chan *Stream, acceptChSize),
		readLoopCloseCh:         make(chan struct{}),
		awakeWriteLoopCh:        make(chan struct{}, 1),
		closeWriteLoopCh:        make(chan struct{}),
		handshakeCompletedCh:    make(chan error),
		cumulativeTSNAckPoint:   tsn - 1,
		advancedPeerTSNAckPoint: tsn - 1,
		silentError:             errors.Errorf("silently discard"),
		stats:                   &associationStats{},
		log:                     config.LoggerFactory.NewLogger("sctp"),
	}

	a.name = fmt.Sprintf("%p", a)

	// RFC 4960 Sec 7.2.1
	//  o  The initial cwnd before DATA transmission or after a sufficiently
	//     long idle period MUST be set to min(4*MTU, max (2*MTU, 4380
	//     bytes)).
	a.cwnd = min32(4*a.mtu, max32(2*a.mtu, 4380))
	a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (INI)",
		a.name, a.cwnd, a.ssthresh, a.inflightQueue.getNumBytes())
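	// For example, with the initial MTU of 1228 bytes this evaluates to
	// min(4912, max(2456, 4380)) = 4380 bytes.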

	a.t1Init = newRTXTimer(timerT1Init, a, maxInitRetrans)
	a.t1Cookie = newRTXTimer(timerT1Cookie, a, maxInitRetrans)
	a.t3RTX = newRTXTimer(timerT3RTX, a, noMaxRetrans)        // retransmit forever
	a.tReconfig = newRTXTimer(timerReconfig, a, noMaxRetrans) // retransmit forever
	a.ackTimer = newAckTimer(a)

	return a
}

func (a *Association) init(isClient bool) {
	a.lock.Lock()
	defer a.lock.Unlock()

	go a.readLoop()
	go a.writeLoop()

	if isClient {
		a.setState(cookieWait)
		init := &chunkInit{}
		init.initialTSN = a.myNextTSN
		init.numOutboundStreams = a.myMaxNumOutboundStreams
		init.numInboundStreams = a.myMaxNumInboundStreams
		init.initiateTag = a.myVerificationTag
		init.advertisedReceiverWindowCredit = a.maxReceiveBufferSize
		setSupportedExtensions(&init.chunkInitCommon)
		a.storedInit = init

		err := a.sendInit()
		if err != nil {
			a.log.Errorf("[%s] failed to send init: %s", a.name, err.Error())
		}

		a.t1Init.start(a.rtoMgr.getRTO())
	}
}

// caller must hold a.lock
func (a *Association) sendInit() error {
	a.log.Debugf("[%s] sending INIT", a.name)
	if a.storedInit == nil {
		return errors.Errorf("the init not stored to send")
	}

	outbound := &packet{}
	outbound.verificationTag = a.peerVerificationTag
	a.sourcePort = 5000      // Spec??
	a.destinationPort = 5000 // Spec??
	outbound.sourcePort = a.sourcePort
	outbound.destinationPort = a.destinationPort

	outbound.chunks = []chunk{a.storedInit}

	a.controlQueue.push(outbound)
	a.awakeWriteLoop()

	return nil
}

// caller must hold a.lock
func (a *Association) sendCookieEcho() error {
	if a.storedCookieEcho == nil {
		return errors.Errorf("cookieEcho not stored to send")
	}

	a.log.Debugf("[%s] sending COOKIE-ECHO", a.name)

	outbound := &packet{}
	outbound.verificationTag = a.peerVerificationTag
	outbound.sourcePort = a.sourcePort
	outbound.destinationPort = a.destinationPort
	outbound.chunks = []chunk{a.storedCookieEcho}

	a.controlQueue.push(outbound)
	a.awakeWriteLoop()

	return nil
}

// Close ends the SCTP Association and cleans up any state
func (a *Association) Close() error {
	a.log.Debugf("[%s] closing association..", a.name)

	a.setState(closed)

	err := a.netConn.Close()

	a.closeAllTimers()

	// awake writeLoop to exit
	a.closeWriteLoopOnce.Do(func() { close(a.closeWriteLoopCh) })

	// Wait for readLoop to end
	<-a.readLoopCloseCh

	a.log.Debugf("[%s] association closed", a.name)
	a.log.Debugf("[%s] stats nDATAs (in) : %d", a.name, a.stats.getNumDATAs())
	a.log.Debugf("[%s] stats nSACKs (in) : %d", a.name, a.stats.getNumSACKs())
	a.log.Debugf("[%s] stats nT3Timeouts : %d", a.name, a.stats.getNumT3Timeouts())
	a.log.Debugf("[%s] stats nAckTimeouts: %d", a.name, a.stats.getNumAckTimeouts())
	a.log.Debugf("[%s] stats nFastRetrans: %d", a.name, a.stats.getNumFastRetrans())
	return err
}

func (a *Association) closeAllTimers() {
	// Close all retransmission & ack timers
	a.t1Init.close()
	a.t1Cookie.close()
	a.t3RTX.close()
	a.tReconfig.close()
	a.ackTimer.close()
}

func (a *Association) readLoop() {
	var closeErr error
	defer func() {
		// also stop writeLoop, otherwise writeLoop can be leaked
		// if connection is lost when there is no writing packet.
		a.closeWriteLoopOnce.Do(func() { close(a.closeWriteLoopCh) })

		a.lock.Lock()
		for _, s := range a.streams {
			a.unregisterStream(s, closeErr)
		}
		a.lock.Unlock()
		close(a.acceptCh)
		close(a.readLoopCloseCh)
	}()

	a.log.Debugf("[%s] readLoop entered", a.name)
	buffer := make([]byte, receiveMTU)

	for {
		n, err := a.netConn.Read(buffer)
		if err != nil {
			closeErr = err
			break
		}
		// Make a buffer sized to what we read, then copy the data we
		// read from the underlying transport. We do this because the
		// user data is passed to the reassembly queue without
		// copying.
		inbound := make([]byte, n)
		copy(inbound, buffer[:n])
		atomic.AddUint64(&a.bytesReceived, uint64(n))
		if err = a.handleInbound(inbound); err != nil {
			closeErr = err
			break
		}
	}

	a.log.Debugf("[%s] readLoop exited %s", a.name, closeErr)
}

func (a *Association) writeLoop() {
	a.log.Debugf("[%s] writeLoop entered", a.name)

loop:
	for {
		rawPackets := a.gatherOutbound()

		for _, raw := range rawPackets {
			_, err := a.netConn.Write(raw)
			if err != nil {
				if err != io.EOF {
					a.log.Warnf("[%s] failed to write packets on netConn: %v", a.name, err)
				}
				a.log.Debugf("[%s] writeLoop ended", a.name)
				break loop
			}
			atomic.AddUint64(&a.bytesSent, uint64(len(raw)))
		}

		select {
		case <-a.awakeWriteLoopCh:
		case <-a.closeWriteLoopCh:
			break loop
		}
	}

	a.setState(closed)
	a.closeAllTimers()

	a.log.Debugf("[%s] writeLoop exited", a.name)
}

func (a *Association) awakeWriteLoop() {
	select {
	case a.awakeWriteLoopCh <- struct{}{}:
	default:
	}
}

// unregisterStream un-registers a stream from the association
// The caller should hold the association write lock.
func (a *Association) unregisterStream(s *Stream, err error) {
	s.lock.Lock()
	defer s.lock.Unlock()

	delete(a.streams, s.streamIdentifier)
	s.readErr = err
	s.readNotifier.Broadcast()
}

// handleInbound parses incoming raw packets
func (a *Association) handleInbound(raw []byte) error {
	p := &packet{}
	if err := p.unmarshal(raw); err != nil {
		a.log.Warnf("[%s] unable to parse SCTP packet %s", a.name, err)
		return nil
	}

	if err := checkPacket(p); err != nil {
		a.log.Warnf("[%s] failed validating packet %s", a.name, err)
		return nil
	}

	a.handleChunkStart()

	for _, c := range p.chunks {
		if err := a.handleChunk(p, c); err != nil {
			return err
		}
	}

	a.handleChunkEnd()

	return nil
}

// The caller should hold the lock
func (a *Association) gatherOutboundDataAndReconfigPackets(rawPackets [][]byte) [][]byte {
	for _, p := range a.getDataPacketsToRetransmit() {
		raw, err := p.marshal()
		if err != nil {
			a.log.Warnf("[%s] failed to serialize a DATA packet to be retransmitted", a.name)
			continue
		}
		rawPackets = append(rawPackets, raw)
	}

	// Pop unsent data chunks from the pending queue to send as much as
	// cwnd and rwnd allow.
	chunks, sisToReset := a.popPendingDataChunksToSend()
	if len(chunks) > 0 {
		// Start timer. (noop if already started)
		a.log.Tracef("[%s] T3-rtx timer start (pt1)", a.name)
		a.t3RTX.start(a.rtoMgr.getRTO())
		for _, p := range a.bundleDataChunksIntoPackets(chunks) {
			raw, err := p.marshal()
			if err != nil {
				a.log.Warnf("[%s] failed to serialize a DATA packet", a.name)
				continue
			}
			rawPackets = append(rawPackets, raw)
		}
	}

	if len(sisToReset) > 0 || a.willRetransmitReconfig {
		if a.willRetransmitReconfig {
			a.willRetransmitReconfig = false
			a.log.Debugf("[%s] retransmit %d RECONFIG chunk(s)", a.name, len(a.reconfigs))
			for _, c := range a.reconfigs {
				p := a.createPacket([]chunk{c})
				raw, err := p.marshal()
				if err != nil {
					a.log.Warnf("[%s] failed to serialize a RECONFIG packet to be retransmitted", a.name)
				} else {
					rawPackets = append(rawPackets, raw)
				}
			}
		}

		if len(sisToReset) > 0 {
			rsn := a.generateNextRSN()
			tsn := a.myNextTSN - 1
			c := &chunkReconfig{
				paramA: &paramOutgoingResetRequest{
					reconfigRequestSequenceNumber: rsn,
					senderLastTSN:                 tsn,
					streamIdentifiers:             sisToReset,
				},
			}
			a.reconfigs[rsn] = c // store in the map for retransmission
			a.log.Debugf("[%s] sending RECONFIG: rsn=%d tsn=%d streams=%v",
				a.name, rsn, a.myNextTSN-1, sisToReset)
			p := a.createPacket([]chunk{c})
			raw, err := p.marshal()
			if err != nil {
				a.log.Warnf("[%s] failed to serialize a RECONFIG packet to be transmitted", a.name)
			} else {
				rawPackets = append(rawPackets, raw)
			}
		}

		if len(a.reconfigs) > 0 {
			a.tReconfig.start(a.rtoMgr.getRTO())
		}
	}

	return rawPackets
}

// The caller should hold the lock
func (a *Association) gatherOutboundFastRetransmissionPackets(rawPackets [][]byte) [][]byte {
	if a.willRetransmitFast {
		a.willRetransmitFast = false

		toFastRetrans := []chunk{}
		fastRetransSize := commonHeaderSize

		for i := 0; ; i++ {
			c, ok := a.inflightQueue.get(a.cumulativeTSNAckPoint + uint32(i) + 1)
			if !ok {
				break // end of pending data
			}

			if c.acked || c.abandoned() {
				continue
			}

			if c.nSent > 1 || c.missIndicator < 3 {
				continue
			}

			// RFC 4960 Sec 7.2.4 Fast Retransmit on Gap Reports
			//  3)  Determine how many of the earliest (i.e., lowest TSN) DATA chunks
			//      marked for retransmission will fit into a single packet, subject
			//      to constraint of the path MTU of the destination transport
			//      address to which the packet is being sent.  Call this value K.
			//      Retransmit those K DATA chunks in a single packet.  When a Fast
			//      Retransmit is being performed, the sender SHOULD ignore the value
			//      of cwnd and SHOULD NOT delay retransmission for this single
			//      packet.

			dataChunkSize := dataChunkHeaderSize + uint32(len(c.userData))
			if a.mtu < fastRetransSize+dataChunkSize {
				break
			}

			fastRetransSize += dataChunkSize
			a.stats.incFastRetrans()
			c.nSent++
			a.checkPartialReliabilityStatus(c)
			toFastRetrans = append(toFastRetrans, c)
			a.log.Tracef("[%s] fast-retransmit: tsn=%d sent=%d htna=%d",
				a.name, c.tsn, c.nSent, a.fastRecoverExitPoint)
		}

		if len(toFastRetrans) > 0 {
			raw, err := a.createPacket(toFastRetrans).marshal()
			if err != nil {
				a.log.Warnf("[%s] failed to serialize a DATA packet to be fast-retransmitted", a.name)
			} else {
				rawPackets = append(rawPackets, raw)
			}
		}
	}

	return rawPackets
}
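
// As a worked example of the size budget above (illustrative numbers, using
// the constants in this file): with mtu=1228, commonHeaderSize=12 and
// dataChunkHeaderSize=16, three chunks carrying 300 bytes of user data each
// consume 12 + 3*(16+300) = 960 bytes and are bundled into one packet; a
// fourth such chunk would push the packet to 1276 bytes, so the loop stops
// and that chunk is left for a later retransmission.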

// The caller should hold the lock
func (a *Association) gatherOutboundSackPackets(rawPackets [][]byte) [][]byte {
	if a.ackState == ackStateImmediate {
		a.ackState = ackStateIdle
		sack := a.createSelectiveAckChunk()
		a.log.Debugf("[%s] sending SACK: %s", a.name, sack.String())
		raw, err := a.createPacket([]chunk{sack}).marshal()
		if err != nil {
			a.log.Warnf("[%s] failed to serialize a SACK packet", a.name)
		} else {
			rawPackets = append(rawPackets, raw)
		}
	}

	return rawPackets
}

// The caller should hold the lock
func (a *Association) gatherOutboundForwardTSNPackets(rawPackets [][]byte) [][]byte {
	if a.willSendForwardTSN {
		a.willSendForwardTSN = false
		if sna32GT(a.advancedPeerTSNAckPoint, a.cumulativeTSNAckPoint) {
			fwdtsn := a.createForwardTSN()
			raw, err := a.createPacket([]chunk{fwdtsn}).marshal()
			if err != nil {
				a.log.Warnf("[%s] failed to serialize a Forward TSN packet", a.name)
			} else {
				rawPackets = append(rawPackets, raw)
			}
		}
	}

	return rawPackets
}

// gatherOutbound gathers outgoing packets
func (a *Association) gatherOutbound() [][]byte {
	a.lock.Lock()
	defer a.lock.Unlock()

	rawPackets := [][]byte{}

	if a.controlQueue.size() > 0 {
		for _, p := range a.controlQueue.popAll() {
			raw, err := p.marshal()
			if err != nil {
				a.log.Warnf("[%s] failed to serialize a control packet", a.name)
				continue
			}
			rawPackets = append(rawPackets, raw)
		}
	}

	state := a.getState()

	if state == established {
		rawPackets = a.gatherOutboundDataAndReconfigPackets(rawPackets)
		rawPackets = a.gatherOutboundFastRetransmissionPackets(rawPackets)
		rawPackets = a.gatherOutboundSackPackets(rawPackets)
		rawPackets = a.gatherOutboundForwardTSNPackets(rawPackets)
	}

	return rawPackets
}
| 
 | |
| func checkPacket(p *packet) error {
 | |
| 	// All packets must adhere to these rules
 | |
| 
 | |
| 	// This is the SCTP sender's port number.  It can be used by the
 | |
| 	// receiver in combination with the source IP address, the SCTP
 | |
| 	// destination port, and possibly the destination IP address to
 | |
| 	// identify the association to which this packet belongs.  The port
 | |
| 	// number 0 MUST NOT be used.
 | |
| 	if p.sourcePort == 0 {
 | |
| 		return errors.Errorf("sctp packet must not have a source port of 0")
 | |
| 	}
 | |
| 
 | |
| 	// This is the SCTP port number to which this packet is destined.
 | |
| 	// The receiving host will use this port number to de-multiplex the
 | |
| 	// SCTP packet to the correct receiving endpoint/application.  The
 | |
| 	// port number 0 MUST NOT be used.
 | |
| 	if p.destinationPort == 0 {
 | |
| 		return errors.Errorf("sctp packet must not have a destination port of 0")
 | |
| 	}
 | |
| 
 | |
| 	// Check values on the packet that are specific to a particular chunk type
 | |
| 	for _, c := range p.chunks {
 | |
| 		switch c.(type) { // nolint:gocritic
 | |
| 		case *chunkInit:
 | |
| 			// An INIT or INIT ACK chunk MUST NOT be bundled with any other chunk.
 | |
| 			// They MUST be the only chunks present in the SCTP packets that carry
 | |
| 			// them.
 | |
| 			if len(p.chunks) != 1 {
 | |
| 				return errors.Errorf("init chunk must not be bundled with any other chunk")
 | |
| 			}
 | |
| 
 | |
| 			// A packet containing an INIT chunk MUST have a zero Verification
 | |
| 			// Tag.
 | |
| 			if p.verificationTag != 0 {
 | |
| 				return errors.Errorf("init chunk expects a verification tag of 0 on the packet when out-of-the-blue")
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func min16(a, b uint16) uint16 {
 | |
| 	if a < b {
 | |
| 		return a
 | |
| 	}
 | |
| 	return b
 | |
| }
 | |
| 
 | |
| func max32(a, b uint32) uint32 {
 | |
| 	if a > b {
 | |
| 		return a
 | |
| 	}
 | |
| 	return b
 | |
| }
 | |
| 
 | |
| func min32(a, b uint32) uint32 {
 | |
| 	if a < b {
 | |
| 		return a
 | |
| 	}
 | |
| 	return b
 | |
| }
 | |
| 
 | |
| // setState atomically sets the state of the Association.
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) setState(newState uint32) {
 | |
| 	oldState := atomic.SwapUint32(&a.state, newState)
 | |
| 	if newState != oldState {
 | |
| 		a.log.Debugf("[%s] state change: '%s' => '%s'",
 | |
| 			a.name,
 | |
| 			getAssociationStateString(oldState),
 | |
| 			getAssociationStateString(newState))
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // getState atomically returns the state of the Association.
 | |
| func (a *Association) getState() uint32 {
 | |
| 	return atomic.LoadUint32(&a.state)
 | |
| }
 | |
| 
 | |
| // BytesSent returns the number of bytes sent
 | |
| func (a *Association) BytesSent() uint64 {
 | |
| 	return atomic.LoadUint64(&a.bytesSent)
 | |
| }
 | |
| 
 | |
| // BytesReceived returns the number of bytes received
 | |
| func (a *Association) BytesReceived() uint64 {
 | |
| 	return atomic.LoadUint64(&a.bytesReceived)
 | |
| }
 | |
| 
 | |
| func setSupportedExtensions(init *chunkInitCommon) {
 | |
| 	// nolint:godox
 | |
| 	// TODO RFC5061 https://tools.ietf.org/html/rfc6525#section-5.2
 | |
| 	// An implementation supporting this (Supported Extensions Parameter)
 | |
| 	// extension MUST list the ASCONF, the ASCONF-ACK, and the AUTH chunks
 | |
| 	// in its INIT and INIT-ACK parameters.
 | |
| 	init.params = append(init.params, ¶mSupportedExtensions{
 | |
| 		ChunkTypes: []chunkType{ctReconfig, ctForwardTSN},
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) handleInit(p *packet, i *chunkInit) ([]*packet, error) {
 | |
| 	state := a.getState()
 | |
| 	a.log.Debugf("[%s] chunkInit received in state '%s'", a.name, getAssociationStateString(state))
 | |
| 
 | |
| 	// https://tools.ietf.org/html/rfc4960#section-5.2.1
 | |
| 	// Upon receipt of an INIT in the COOKIE-WAIT state, an endpoint MUST
 | |
| 	// respond with an INIT ACK using the same parameters it sent in its
 | |
| 	// original INIT chunk (including its Initiate Tag, unchanged).  When
 | |
| 	// responding, the endpoint MUST send the INIT ACK back to the same
 | |
| 	// address that the original INIT (sent by this endpoint) was sent.
 | |
| 
 | |
| 	if state != closed && state != cookieWait && state != cookieEchoed {
 | |
| 		// 5.2.2.  Unexpected INIT in States Other than CLOSED, COOKIE-ECHOED,
 | |
| 		//        COOKIE-WAIT, and SHUTDOWN-ACK-SENT
 | |
| 		return nil, errors.Errorf("todo: handle Init when in state %s", getAssociationStateString(state))
 | |
| 	}
 | |
| 
 | |
| 	// Should we be setting any of these permanently until we've ACKed further?
 | |
| 	a.myMaxNumInboundStreams = min16(i.numInboundStreams, a.myMaxNumInboundStreams)
 | |
| 	a.myMaxNumOutboundStreams = min16(i.numOutboundStreams, a.myMaxNumOutboundStreams)
 | |
| 	a.peerVerificationTag = i.initiateTag
 | |
| 	a.sourcePort = p.destinationPort
 | |
| 	a.destinationPort = p.sourcePort
 | |
| 
 | |
| 	// 13.2 This is the last TSN received in sequence.  This value
 | |
| 	// is set initially by taking the peer's initial TSN,
 | |
| 	// received in the INIT or INIT ACK chunk, and
 | |
| 	// subtracting one from it.
 | |
| 	a.peerLastTSN = i.initialTSN - 1
 | |
| 
 | |
| 	for _, param := range i.params {
 | |
| 		switch v := param.(type) { // nolint:gocritic
 | |
| 		case *paramSupportedExtensions:
 | |
| 			for _, t := range v.ChunkTypes {
 | |
| 				if t == ctForwardTSN {
 | |
| 					a.log.Debugf("[%s] use ForwardTSN (on init)\n", a.name)
 | |
| 					a.useForwardTSN = true
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	if !a.useForwardTSN {
 | |
| 		a.log.Warnf("[%s] not using ForwardTSN (on init)\n", a.name)
 | |
| 	}
 | |
| 
 | |
| 	outbound := &packet{}
 | |
| 	outbound.verificationTag = a.peerVerificationTag
 | |
| 	outbound.sourcePort = a.sourcePort
 | |
| 	outbound.destinationPort = a.destinationPort
 | |
| 
 | |
| 	initAck := &chunkInitAck{}
 | |
| 
 | |
| 	initAck.initialTSN = a.myNextTSN
 | |
| 	initAck.numOutboundStreams = a.myMaxNumOutboundStreams
 | |
| 	initAck.numInboundStreams = a.myMaxNumInboundStreams
 | |
| 	initAck.initiateTag = a.myVerificationTag
 | |
| 	initAck.advertisedReceiverWindowCredit = a.maxReceiveBufferSize
 | |
| 
 | |
| 	if a.myCookie == nil {
 | |
| 		var err error
 | |
| 		if a.myCookie, err = newRandomStateCookie(); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	initAck.params = []param{a.myCookie}
 | |
| 
 | |
| 	setSupportedExtensions(&initAck.chunkInitCommon)
 | |
| 
 | |
| 	outbound.chunks = []chunk{initAck}
 | |
| 
 | |
| 	return pack(outbound), nil
 | |
| }
 | |
| 
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) handleInitAck(p *packet, i *chunkInitAck) error {
 | |
| 	state := a.getState()
 | |
| 	a.log.Debugf("[%s] chunkInitAck received in state '%s'", a.name, getAssociationStateString(state))
 | |
| 	if state != cookieWait {
 | |
| 		// RFC 4960
 | |
| 		// 5.2.3.  Unexpected INIT ACK
 | |
| 		//   If an INIT ACK is received by an endpoint in any state other than the
 | |
| 		//   COOKIE-WAIT state, the endpoint should discard the INIT ACK chunk.
 | |
| 		//   An unexpected INIT ACK usually indicates the processing of an old or
 | |
| 		//   duplicated INIT chunk.
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	a.myMaxNumInboundStreams = min16(i.numInboundStreams, a.myMaxNumInboundStreams)
 | |
| 	a.myMaxNumOutboundStreams = min16(i.numOutboundStreams, a.myMaxNumOutboundStreams)
 | |
| 	a.peerVerificationTag = i.initiateTag
 | |
| 	a.peerLastTSN = i.initialTSN - 1
 | |
| 	if a.sourcePort != p.destinationPort ||
 | |
| 		a.destinationPort != p.sourcePort {
 | |
| 		a.log.Warnf("[%s] handleInitAck: port mismatch", a.name)
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	a.rwnd = i.advertisedReceiverWindowCredit
 | |
| 	a.log.Debugf("[%s] initial rwnd=%d", a.name, a.rwnd)
 | |
| 
 | |
| 	// RFC 4690 Sec 7.2.1
 | |
| 	//  o  The initial value of ssthresh MAY be arbitrarily high (for
 | |
| 	//     example, implementations MAY use the size of the receiver
 | |
| 	//     advertised window).
 | |
| 	a.ssthresh = a.rwnd
 | |
| 	a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (INI)",
 | |
| 		a.name, a.cwnd, a.ssthresh, a.inflightQueue.getNumBytes())
 | |
| 
 | |
| 	a.t1Init.stop()
 | |
| 	a.storedInit = nil
 | |
| 
 | |
| 	var cookieParam *paramStateCookie
 | |
| 	for _, param := range i.params {
 | |
| 		switch v := param.(type) {
 | |
| 		case *paramStateCookie:
 | |
| 			cookieParam = v
 | |
| 		case *paramSupportedExtensions:
 | |
| 			for _, t := range v.ChunkTypes {
 | |
| 				if t == ctForwardTSN {
 | |
| 					a.log.Debugf("[%s] use ForwardTSN (on initAck)\n", a.name)
 | |
| 					a.useForwardTSN = true
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	if !a.useForwardTSN {
 | |
| 		a.log.Warnf("[%s] not using ForwardTSN (on initAck)\n", a.name)
 | |
| 	}
 | |
| 	if cookieParam == nil {
 | |
| 		return errors.Errorf("no cookie in InitAck")
 | |
| 	}
 | |
| 
 | |
| 	a.storedCookieEcho = &chunkCookieEcho{}
 | |
| 	a.storedCookieEcho.cookie = cookieParam.cookie
 | |
| 
 | |
| 	err := a.sendCookieEcho()
 | |
| 	if err != nil {
 | |
| 		a.log.Errorf("[%s] failed to send init: %s", a.name, err.Error())
 | |
| 	}
 | |
| 
 | |
| 	a.t1Cookie.start(a.rtoMgr.getRTO())
 | |
| 	a.setState(cookieEchoed)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) handleHeartbeat(c *chunkHeartbeat) []*packet {
 | |
| 	a.log.Tracef("[%s] chunkHeartbeat", a.name)
 | |
| 	hbi, ok := c.params[0].(*paramHeartbeatInfo)
 | |
| 	if !ok {
 | |
| 		a.log.Warnf("[%s] failed to handle Heartbeat, no ParamHeartbeatInfo", a.name)
 | |
| 	}
 | |
| 
 | |
| 	return pack(&packet{
 | |
| 		verificationTag: a.peerVerificationTag,
 | |
| 		sourcePort:      a.sourcePort,
 | |
| 		destinationPort: a.destinationPort,
 | |
| 		chunks: []chunk{&chunkHeartbeatAck{
 | |
| 			params: []param{
 | |
| 				¶mHeartbeatInfo{
 | |
| 					heartbeatInformation: hbi.heartbeatInformation,
 | |
| 				},
 | |
| 			},
 | |
| 		}},
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) handleCookieEcho(c *chunkCookieEcho) []*packet {
 | |
| 	state := a.getState()
 | |
| 	a.log.Debugf("[%s] COOKIE-ECHO received in state '%s'", a.name, getAssociationStateString(state))
 | |
| 	switch state {
 | |
| 	default:
 | |
| 		return nil
 | |
| 	case established:
 | |
| 		if !bytes.Equal(a.myCookie.cookie, c.cookie) {
 | |
| 			return nil
 | |
| 		}
 | |
| 	case closed, cookieWait, cookieEchoed:
 | |
| 		if !bytes.Equal(a.myCookie.cookie, c.cookie) {
 | |
| 			return nil
 | |
| 		}
 | |
| 
 | |
| 		a.t1Init.stop()
 | |
| 		a.storedInit = nil
 | |
| 
 | |
| 		a.t1Cookie.stop()
 | |
| 		a.storedCookieEcho = nil
 | |
| 
 | |
| 		a.setState(established)
 | |
| 		a.handshakeCompletedCh <- nil
 | |
| 	}
 | |
| 
 | |
| 	p := &packet{
 | |
| 		verificationTag: a.peerVerificationTag,
 | |
| 		sourcePort:      a.sourcePort,
 | |
| 		destinationPort: a.destinationPort,
 | |
| 		chunks:          []chunk{&chunkCookieAck{}},
 | |
| 	}
 | |
| 	return pack(p)
 | |
| }
 | |
| 
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) handleCookieAck() {
 | |
| 	state := a.getState()
 | |
| 	a.log.Debugf("[%s] COOKIE-ACK received in state '%s'", a.name, getAssociationStateString(state))
 | |
| 	if state != cookieEchoed {
 | |
| 		// RFC 4960
 | |
| 		// 5.2.5.  Handle Duplicate COOKIE-ACK.
 | |
| 		//   At any state other than COOKIE-ECHOED, an endpoint should silently
 | |
| 		//   discard a received COOKIE ACK chunk.
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	a.t1Cookie.stop()
 | |
| 	a.storedCookieEcho = nil
 | |
| 
 | |
| 	a.setState(established)
 | |
| 	a.handshakeCompletedCh <- nil
 | |
| }
 | |
| 
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) handleData(d *chunkPayloadData) []*packet {
 | |
| 	a.log.Tracef("[%s] DATA: tsn=%d immediateSack=%v len=%d",
 | |
| 		a.name, d.tsn, d.immediateSack, len(d.userData))
 | |
| 	a.stats.incDATAs()
 | |
| 
 | |
| 	canPush := a.payloadQueue.canPush(d, a.peerLastTSN)
 | |
| 	if canPush {
 | |
| 		s := a.getOrCreateStream(d.streamIdentifier)
 | |
| 		if s == nil {
 | |
| 			// silentely discard the data. (sender will retry on T3-rtx timeout)
 | |
| 			// see pion/sctp#30
 | |
| 			a.log.Debugf("discard %d", d.streamSequenceNumber)
 | |
| 			return nil
 | |
| 		}
 | |
| 
 | |
| 		if a.getMyReceiverWindowCredit() > 0 {
 | |
| 			// Pass the new chunk to stream level as soon as it arrives
 | |
| 			a.payloadQueue.push(d, a.peerLastTSN)
 | |
| 			s.handleData(d)
 | |
| 		} else {
 | |
| 			// Receive buffer is full
 | |
| 			lastTSN, ok := a.payloadQueue.getLastTSNReceived()
 | |
| 			if ok && sna32LT(d.tsn, lastTSN) {
 | |
| 				a.log.Debugf("[%s] receive buffer full, but accepted as this is a missing chunk with tsn=%d ssn=%d", a.name, d.tsn, d.streamSequenceNumber)
 | |
| 				a.payloadQueue.push(d, a.peerLastTSN)
 | |
| 				s.handleData(d)
 | |
| 			} else {
 | |
| 				a.log.Debugf("[%s] receive buffer full. dropping DATA with tsn=%d ssn=%d", a.name, d.tsn, d.streamSequenceNumber)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return a.handlePeerLastTSNAndAcknowledgement(d.immediateSack)
 | |
| }
 | |
| 
 | |
| // A common routine for handleData and handleForwardTSN routines
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) handlePeerLastTSNAndAcknowledgement(sackImmediately bool) []*packet {
 | |
| 	var reply []*packet
 | |
| 
 | |
| 	// Try to advance peerLastTSN
 | |
| 
 | |
| 	// From RFC 3758 Sec 3.6:
 | |
| 	//   .. and then MUST further advance its cumulative TSN point locally
 | |
| 	//   if possible
 | |
| 	// Meaning, if peerLastTSN+1 points to a chunk that is received,
 | |
| 	// advance peerLastTSN until peerLastTSN+1 points to unreceived chunk.
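	// For example, if peerLastTSN is 10 and the payload queue holds TSNs
	// 11, 12 and 14, peerLastTSN advances to 12 and stops there because
	// TSN 13 has not been received yet.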
	for {
		if _, popOk := a.payloadQueue.pop(a.peerLastTSN + 1); !popOk {
			break
		}
		a.peerLastTSN++

		for _, rstReq := range a.reconfigRequests {
			resp := a.resetStreamsIfAny(rstReq)
			if resp != nil {
				a.log.Debugf("[%s] RESET RESPONSE: %+v", a.name, resp)
				reply = append(reply, resp)
			}
		}
	}

	hasPacketLoss := (a.payloadQueue.size() > 0)
	if hasPacketLoss {
		a.log.Tracef("[%s] packetloss: %s", a.name, a.payloadQueue.getGapAckBlocksString(a.peerLastTSN))
	}

	if (a.ackState != ackStateImmediate && !sackImmediately && !hasPacketLoss && a.ackMode == ackModeNormal) || a.ackMode == ackModeAlwaysDelay {
		if a.ackState == ackStateIdle {
			a.delayedAckTriggered = true
		} else {
			a.immediateAckTriggered = true
		}
	} else {
		a.immediateAckTriggered = true
	}

	return reply
}

// The caller should hold the lock.
func (a *Association) getMyReceiverWindowCredit() uint32 {
	var bytesQueued uint32
	for _, s := range a.streams {
		bytesQueued += uint32(s.getNumBytesInReassemblyQueue())
	}

	if bytesQueued >= a.maxReceiveBufferSize {
		return 0
	}
	return a.maxReceiveBufferSize - bytesQueued
}
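
// Illustrative numbers only: with the default maxReceiveBufferSize of
// 1024*1024 bytes, two streams holding 300 KiB and 200 KiB in their
// reassembly queues leave 1048576 - 512000 = 536576 bytes of credit.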

// OpenStream opens a stream
func (a *Association) OpenStream(streamIdentifier uint16, defaultPayloadType PayloadProtocolIdentifier) (*Stream, error) {
	a.lock.Lock()
	defer a.lock.Unlock()

	if _, ok := a.streams[streamIdentifier]; ok {
		return nil, errors.Errorf("there already exists a stream with identifier %d", streamIdentifier)
	}

	s := a.createStream(streamIdentifier, false)
	s.setDefaultPayloadType(defaultPayloadType)

	return s, nil
}

// AcceptStream accepts a stream
func (a *Association) AcceptStream() (*Stream, error) {
	s, ok := <-a.acceptCh
	if !ok {
		return nil, io.EOF // no more incoming streams
	}
	return s, nil
}
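
// Illustrative usage sketch (not part of the original file): a server-side
// accept loop. The dtlsConn variable and handleStream function are
// placeholders for application code.
//
//	assoc, err := sctp.Server(sctp.Config{
//		NetConn:       dtlsConn, // any net.Conn
//		LoggerFactory: logging.NewDefaultLoggerFactory(),
//	})
//	if err != nil {
//		// handle error
//	}
//	for {
//		stream, err := assoc.AcceptStream()
//		if err != nil {
//			break // io.EOF once the association is closed
//		}
//		go handleStream(stream)
//	}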

// createStream creates a stream. The caller should hold the lock and check no stream exists for this id.
func (a *Association) createStream(streamIdentifier uint16, accept bool) *Stream {
	s := &Stream{
		association:      a,
		streamIdentifier: streamIdentifier,
		reassemblyQueue:  newReassemblyQueue(streamIdentifier),
		log:              a.log,
		name:             fmt.Sprintf("%d:%s", streamIdentifier, a.name),
	}

	s.readNotifier = sync.NewCond(&s.lock)

	if accept {
		select {
		case a.acceptCh <- s:
			a.streams[streamIdentifier] = s
			a.log.Debugf("[%s] accepted a new stream (streamIdentifier: %d)",
				a.name, streamIdentifier)
		default:
			a.log.Debugf("[%s] dropped a new stream (acceptCh size: %d)",
				a.name, len(a.acceptCh))
			return nil
		}
	} else {
		a.streams[streamIdentifier] = s
	}

	return s
}

// getOrCreateStream gets or creates a stream. The caller should hold the lock.
func (a *Association) getOrCreateStream(streamIdentifier uint16) *Stream {
	if s, ok := a.streams[streamIdentifier]; ok {
		return s
	}

	return a.createStream(streamIdentifier, true)
}

// The caller should hold the lock.
func (a *Association) processSelectiveAck(d *chunkSelectiveAck) (map[uint16]int, uint32, error) { // nolint:gocognit
	bytesAckedPerStream := map[uint16]int{}

	// New ack point, so pop all ACKed packets from inflightQueue
	// We add 1 because the "currentAckPoint" has already been popped from the inflight queue
	// For the first SACK we take care of this by setting the ackpoint to cumAck - 1
	for i := a.cumulativeTSNAckPoint + 1; sna32LTE(i, d.cumulativeTSNAck); i++ {
		c, ok := a.inflightQueue.pop(i)
		if !ok {
			return nil, 0, errors.Errorf("tsn %v unable to be popped from inflight queue", i)
		}

		if !c.acked {
			// RFC 4960 sec 6.3.2.  Retransmission Timer Rules
			//   R3)  Whenever a SACK is received that acknowledges the DATA chunk
			//        with the earliest outstanding TSN for that address, restart the
			//        T3-rtx timer for that address with its current RTO (if there is
			//        still outstanding data on that address).
			if i == a.cumulativeTSNAckPoint+1 {
				// T3 timer needs to be reset. Stop it for now.
				a.t3RTX.stop()
			}

			nBytesAcked := len(c.userData)

			// Sum the number of bytes acknowledged per stream
			if amount, ok := bytesAckedPerStream[c.streamIdentifier]; ok {
				bytesAckedPerStream[c.streamIdentifier] = amount + nBytesAcked
			} else {
				bytesAckedPerStream[c.streamIdentifier] = nBytesAcked
			}

			// RFC 4960 sec 6.3.1.  RTO Calculation
			//   C4)  When data is in flight and when allowed by rule C5 below, a new
			//        RTT measurement MUST be made each round trip.  Furthermore, new
			//        RTT measurements SHOULD be made no more than once per round trip
			//        for a given destination transport address.
			//   C5)  Karn's algorithm: RTT measurements MUST NOT be made using
			//        packets that were retransmitted (and thus for which it is
			//        ambiguous whether the reply was for the first instance of the
			//        chunk or for a later instance)
			if c.nSent == 1 && sna32GTE(c.tsn, a.minTSN2MeasureRTT) {
				a.minTSN2MeasureRTT = a.myNextTSN
				rtt := time.Since(c.since).Seconds() * 1000.0
				srtt := a.rtoMgr.setNewRTT(rtt)
				a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f",
					a.name, rtt, srtt, a.rtoMgr.getRTO())
			}
		}

		if a.inFastRecovery && c.tsn == a.fastRecoverExitPoint {
			a.log.Debugf("[%s] exit fast-recovery", a.name)
			a.inFastRecovery = false
		}
	}

	htna := d.cumulativeTSNAck

	// Mark selectively acknowledged chunks as "acked"
	for _, g := range d.gapAckBlocks {
		for i := g.start; i <= g.end; i++ {
			tsn := d.cumulativeTSNAck + uint32(i)
			c, ok := a.inflightQueue.get(tsn)
			if !ok {
				return nil, 0, errors.Errorf("requested non-existent TSN %v", tsn)
			}

			if !c.acked {
				nBytesAcked := a.inflightQueue.markAsAcked(tsn)

				// Sum the number of bytes acknowledged per stream
				if amount, ok := bytesAckedPerStream[c.streamIdentifier]; ok {
					bytesAckedPerStream[c.streamIdentifier] = amount + nBytesAcked
				} else {
					bytesAckedPerStream[c.streamIdentifier] = nBytesAcked
				}

				a.log.Tracef("[%s] tsn=%d has been sacked", a.name, c.tsn)

				if c.nSent == 1 {
					a.minTSN2MeasureRTT = a.myNextTSN
					rtt := time.Since(c.since).Seconds() * 1000.0
					srtt := a.rtoMgr.setNewRTT(rtt)
					a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f",
						a.name, rtt, srtt, a.rtoMgr.getRTO())
				}

				if sna32LT(htna, tsn) {
					htna = tsn
				}
			}
		}
	}

	return bytesAckedPerStream, htna, nil
}
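
// A worked example of the SACK processing above (values are illustrative):
// for a SACK with cumulativeTSNAck=100 and a single gap ack block
// {start: 2, end: 3}, every in-flight TSN up to and including 100 is popped
// from the inflight queue, TSNs 102 and 103 are marked as acked via the gap
// block offsets (100+2 and 100+3), TSN 101 remains missing, and htna (the
// highest TSN newly acknowledged) becomes 103.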

// The caller should hold the lock.
func (a *Association) onCumulativeTSNAckPointAdvanced(totalBytesAcked int) {
	// RFC 4960, sec 6.3.2.  Retransmission Timer Rules
	//   R2)  Whenever all outstanding data sent to an address have been
	//        acknowledged, turn off the T3-rtx timer of that address.
	if a.inflightQueue.size() == 0 {
		a.log.Tracef("[%s] SACK: no more packet in-flight (pending=%d)", a.name, a.pendingQueue.size())
		a.t3RTX.stop()
	} else {
		a.log.Tracef("[%s] T3-rtx timer start (pt2)", a.name)
		a.t3RTX.start(a.rtoMgr.getRTO())
	}

	// Update congestion control parameters
	if a.cwnd <= a.ssthresh {
		// RFC 4960, sec 7.2.1.  Slow-Start
		//   o  When cwnd is less than or equal to ssthresh, an SCTP endpoint MUST
		//      use the slow-start algorithm to increase cwnd only if the current
		//      congestion window is being fully utilized, an incoming SACK
		//      advances the Cumulative TSN Ack Point, and the data sender is not
		//      in Fast Recovery.  Only when these three conditions are met can
		//      the cwnd be increased; otherwise, the cwnd MUST not be increased.
		//      If these conditions are met, then cwnd MUST be increased by, at
		//      most, the lesser of 1) the total size of the previously
		//      outstanding DATA chunk(s) acknowledged, and 2) the destination's
		//      path MTU.
		if !a.inFastRecovery &&
			a.pendingQueue.size() > 0 {
			a.cwnd += min32(uint32(totalBytesAcked), a.cwnd) // TCP way
			// a.cwnd += min32(uint32(totalBytesAcked), a.mtu) // SCTP way (slow)
			a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d acked=%d (SS)",
				a.name, a.cwnd, a.ssthresh, totalBytesAcked)
		} else {
			a.log.Tracef("[%s] cwnd did not grow: cwnd=%d ssthresh=%d acked=%d FR=%v pending=%d",
				a.name, a.cwnd, a.ssthresh, totalBytesAcked, a.inFastRecovery, a.pendingQueue.size())
		}
	} else {
		// RFC 4960, sec 7.2.2.  Congestion Avoidance
		//   o  Whenever cwnd is greater than ssthresh, upon each SACK arrival
		//      that advances the Cumulative TSN Ack Point, increase
		//      partial_bytes_acked by the total number of bytes of all new chunks
		//      acknowledged in that SACK including chunks acknowledged by the new
		//      Cumulative TSN Ack and by Gap Ack Blocks.
		a.partialBytesAcked += uint32(totalBytesAcked)

		//   o  When partial_bytes_acked is equal to or greater than cwnd and
		//      before the arrival of the SACK the sender had cwnd or more bytes
		//      of data outstanding (i.e., before arrival of the SACK, flight size
		//      was greater than or equal to cwnd), increase cwnd by MTU, and
		//      reset partial_bytes_acked to (partial_bytes_acked - cwnd).
		if a.partialBytesAcked >= a.cwnd && a.pendingQueue.size() > 0 {
			a.partialBytesAcked -= a.cwnd
			a.cwnd += a.mtu
			a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d acked=%d (CA)",
				a.name, a.cwnd, a.ssthresh, totalBytesAcked)
		}
	}
}
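
// Worked example of the two growth regimes above (illustrative numbers):
// starting from the initial cwnd of 4380 bytes with ssthresh set to the
// peer's a_rwnd (say 1 MiB), a SACK acknowledging 4000 bytes while data is
// still pending grows the window in slow start to cwnd = 4380 +
// min(4000, 4380) = 8380. Once cwnd exceeds ssthresh, each such SACK only
// accumulates partialBytesAcked, and cwnd grows by one MTU (1228 bytes)
// each time partialBytesAcked reaches cwnd.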
 | |
| 
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) processFastRetransmission(cumTSNAckPoint, htna uint32, cumTSNAckPointAdvanced bool) error {
 | |
| 	// HTNA algorithm - RFC 4960 Sec 7.2.4
 | |
| 	// Increment missIndicator of each chunks that the SACK reported missing
 | |
| 	// when either of the following is met:
 | |
| 	// a)  Not in fast-recovery
 | |
| 	//     miss indications are incremented only for missing TSNs prior to the
 | |
| 	//     highest TSN newly acknowledged in the SACK.
 | |
| 	// b)  In fast-recovery AND the Cumulative TSN Ack Point advanced
 | |
| 	//     the miss indications are incremented for all TSNs reported missing
 | |
| 	//     in the SACK.
 | |
| 	if !a.inFastRecovery || (a.inFastRecovery && cumTSNAckPointAdvanced) {
 | |
| 		var maxTSN uint32
 | |
| 		if !a.inFastRecovery {
 | |
| 			// a) increment only for missing TSNs prior to the HTNA
 | |
| 			maxTSN = htna
 | |
| 		} else {
 | |
| 			// b) increment for all TSNs reported missing
 | |
| 			maxTSN = cumTSNAckPoint + uint32(a.inflightQueue.size()) + 1
 | |
| 		}
 | |
| 
 | |
| 		for tsn := cumTSNAckPoint + 1; sna32LT(tsn, maxTSN); tsn++ {
 | |
| 			c, ok := a.inflightQueue.get(tsn)
 | |
| 			if !ok {
 | |
| 				return errors.Errorf("requested non-existent TSN %v", tsn)
 | |
| 			}
 | |
| 			if !c.acked && !c.abandoned() && c.missIndicator < 3 {
 | |
| 				c.missIndicator++
 | |
| 				if c.missIndicator == 3 {
 | |
| 					if !a.inFastRecovery {
 | |
| 						// 2)  If not in Fast Recovery, adjust the ssthresh and cwnd of the
 | |
| 						//     destination address(es) to which the missing DATA chunks were
 | |
| 						//     last sent, according to the formula described in Section 7.2.3.
 | |
| 						a.inFastRecovery = true
 | |
| 						a.fastRecoverExitPoint = htna
 | |
| 						a.ssthresh = max32(a.cwnd/2, 4*a.mtu)
 | |
| 						a.cwnd = a.ssthresh
 | |
| 						a.partialBytesAcked = 0
 | |
| 						a.willRetransmitFast = true
 | |
| 
 | |
| 						a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (FR)",
 | |
| 							a.name, a.cwnd, a.ssthresh, a.inflightQueue.getNumBytes())
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if a.inFastRecovery && cumTSNAckPointAdvanced {
 | |
| 		a.willRetransmitFast = true
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) handleSack(d *chunkSelectiveAck) error {
 | |
| 	a.log.Tracef("[%s] SACK: cumTSN=%d a_rwnd=%d", a.name, d.cumulativeTSNAck, d.advertisedReceiverWindowCredit)
 | |
| 	state := a.getState()
 | |
| 	if state != established {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	a.stats.incSACKs()
 | |
| 
 | |
| 	if sna32GT(a.cumulativeTSNAckPoint, d.cumulativeTSNAck) {
 | |
| 		// RFC 4960 sec 6.2.1.  Processing a Received SACK
 | |
| 		// D)
 | |
| 		//   i) If Cumulative TSN Ack is less than the Cumulative TSN Ack
 | |
| 		//      Point, then drop the SACK.  Since Cumulative TSN Ack is
 | |
| 		//      monotonically increasing, a SACK whose Cumulative TSN Ack is
 | |
| 		//      less than the Cumulative TSN Ack Point indicates an out-of-
 | |
| 		//      order SACK.
 | |
| 
 | |
| 		a.log.Debugf("[%s] SACK Cumulative ACK %v is older than ACK point %v",
 | |
| 			a.name,
 | |
| 			d.cumulativeTSNAck,
 | |
| 			a.cumulativeTSNAckPoint)
 | |
| 
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Process selective ack
 | |
| 	bytesAckedPerStream, htna, err := a.processSelectiveAck(d)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	var totalBytesAcked int
 | |
| 	for _, nBytesAcked := range bytesAckedPerStream {
 | |
| 		totalBytesAcked += nBytesAcked
 | |
| 	}
 | |
| 
 | |
| 	cumTSNAckPointAdvanced := false
 | |
| 	if sna32LT(a.cumulativeTSNAckPoint, d.cumulativeTSNAck) {
 | |
| 		a.log.Tracef("[%s] SACK: cumTSN advanced: %d -> %d",
 | |
| 			a.name,
 | |
| 			a.cumulativeTSNAckPoint,
 | |
| 			d.cumulativeTSNAck)
 | |
| 
 | |
| 		a.cumulativeTSNAckPoint = d.cumulativeTSNAck
 | |
| 		cumTSNAckPointAdvanced = true
 | |
| 		a.onCumulativeTSNAckPointAdvanced(totalBytesAcked)
 | |
| 	}
 | |
| 
 | |
| 	for si, nBytesAcked := range bytesAckedPerStream {
 | |
| 		if s, ok := a.streams[si]; ok {
 | |
| 			a.lock.Unlock()
 | |
| 			s.onBufferReleased(nBytesAcked)
 | |
| 			a.lock.Lock()
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// New rwnd value
 | |
| 	// RFC 4960 sec 6.2.1.  Processing a Received SACK
 | |
| 	// D)
 | |
| 	//   ii) Set rwnd equal to the newly received a_rwnd minus the number
 | |
| 	//       of bytes still outstanding after processing the Cumulative
 | |
| 	//       TSN Ack and the Gap Ack Blocks.
 | |
| 
 | |
| 	// bytes acked were already subtracted by markAsAcked() method
 | |
| 	bytesOutstanding := uint32(a.inflightQueue.getNumBytes())
 | |
| 	if bytesOutstanding >= d.advertisedReceiverWindowCredit {
 | |
| 		a.rwnd = 0
 | |
| 	} else {
 | |
| 		a.rwnd = d.advertisedReceiverWindowCredit - bytesOutstanding
 | |
| 	}
 | |
| 
 | |
| 	err = a.processFastRetransmission(d.cumulativeTSNAck, htna, cumTSNAckPointAdvanced)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if a.useForwardTSN {
 | |
| 		// RFC 3758 Sec 3.5 C1
 | |
| 		if sna32LT(a.advancedPeerTSNAckPoint, a.cumulativeTSNAckPoint) {
 | |
| 			a.advancedPeerTSNAckPoint = a.cumulativeTSNAckPoint
 | |
| 		}
 | |
| 
 | |
| 		// RFC 3758 Sec 3.5 C2
 | |
| 		for i := a.advancedPeerTSNAckPoint + 1; ; i++ {
 | |
| 			c, ok := a.inflightQueue.get(i)
 | |
| 			if !ok {
 | |
| 				break
 | |
| 			}
 | |
| 			if !c.abandoned() {
 | |
| 				break
 | |
| 			}
 | |
| 			a.advancedPeerTSNAckPoint = i
 | |
| 		}
 | |
| 
 | |
| 		// RFC 3758 Sec 3.5 C3
 | |
| 		if sna32GT(a.advancedPeerTSNAckPoint, a.cumulativeTSNAckPoint) {
 | |
| 			a.willSendForwardTSN = true
 | |
| 		}
 | |
| 		a.awakeWriteLoop()
 | |
| 	}
 | |
| 
 | |
| 	if a.inflightQueue.size() > 0 {
 | |
| 		// Start timer. (noop if already started)
 | |
| 		a.log.Tracef("[%s] T3-rtx timer start (pt3)", a.name)
 | |
| 		a.t3RTX.start(a.rtoMgr.getRTO())
 | |
| 	}
 | |
| 
 | |
| 	if cumTSNAckPointAdvanced {
 | |
| 		a.awakeWriteLoop()
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // createForwardTSN generates ForwardTSN chunk.
 | |
| // This method will be be called if useForwardTSN is set to false.
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) createForwardTSN() *chunkForwardTSN {
 | |
| 	// RFC 3758 Sec 3.5 C4
 | |
| 	streamMap := map[uint16]uint16{} // to report only once per SI
 | |
| 	for i := a.cumulativeTSNAckPoint + 1; sna32LTE(i, a.advancedPeerTSNAckPoint); i++ {
 | |
| 		c, ok := a.inflightQueue.get(i)
 | |
| 		if !ok {
 | |
| 			break
 | |
| 		}
 | |
| 
 | |
| 		ssn, ok := streamMap[c.streamIdentifier]
 | |
| 		if !ok {
 | |
| 			streamMap[c.streamIdentifier] = c.streamSequenceNumber
 | |
| 		} else if sna16LT(ssn, c.streamSequenceNumber) {
 | |
| 			// to report only once with greatest SSN
 | |
| 			streamMap[c.streamIdentifier] = c.streamSequenceNumber
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	fwdtsn := &chunkForwardTSN{
 | |
| 		newCumulativeTSN: a.advancedPeerTSNAckPoint,
 | |
| 		streams:          []chunkForwardTSNStream{},
 | |
| 	}
 | |
| 
 | |
| 	var streamStr string
 | |
| 	for si, ssn := range streamMap {
 | |
| 		streamStr += fmt.Sprintf("(si=%d ssn=%d)", si, ssn)
 | |
| 		fwdtsn.streams = append(fwdtsn.streams, chunkForwardTSNStream{
 | |
| 			identifier: si,
 | |
| 			sequence:   ssn,
 | |
| 		})
 | |
| 	}
 | |
| 	a.log.Tracef("[%s] building fwdtsn: newCumulativeTSN=%d cumTSN=%d - %s", a.name, fwdtsn.newCumulativeTSN, a.cumulativeTSNAckPoint, streamStr)
 | |
| 
 | |
| 	return fwdtsn
 | |
| }
 | |
| 
 | |
| // createPacket wraps chunks in a packet.
 | |
| // The caller should hold the read lock.
 | |
| func (a *Association) createPacket(cs []chunk) *packet {
 | |
| 	return &packet{
 | |
| 		verificationTag: a.peerVerificationTag,
 | |
| 		sourcePort:      a.sourcePort,
 | |
| 		destinationPort: a.destinationPort,
 | |
| 		chunks:          cs,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) handleReconfig(c *chunkReconfig) ([]*packet, error) {
 | |
| 	a.log.Tracef("[%s] handleReconfig", a.name)
 | |
| 
 | |
| 	pp := make([]*packet, 0)
 | |
| 
 | |
| 	p, err := a.handleReconfigParam(c.paramA)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if p != nil {
 | |
| 		pp = append(pp, p)
 | |
| 	}
 | |
| 
 | |
| 	if c.paramB != nil {
 | |
| 		p, err = a.handleReconfigParam(c.paramB)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		if p != nil {
 | |
| 			pp = append(pp, p)
 | |
| 		}
 | |
| 	}
 | |
| 	return pp, nil
 | |
| }
 | |
| 
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) handleForwardTSN(c *chunkForwardTSN) []*packet {
 | |
| 	a.log.Tracef("[%s] FwdTSN: %s", a.name, c.String())
 | |
| 
 | |
| 	if !a.useForwardTSN {
 | |
| 		a.log.Warnf("[%s] received FwdTSN but not enabled", a.name)
 | |
| 		// Return an error chunk
 | |
| 		cerr := &chunkError{
 | |
| 			errorCauses: []errorCause{&errorCauseUnrecognizedChunkType{}},
 | |
| 		}
 | |
| 		outbound := &packet{}
 | |
| 		outbound.verificationTag = a.peerVerificationTag
 | |
| 		outbound.sourcePort = a.sourcePort
 | |
| 		outbound.destinationPort = a.destinationPort
 | |
| 		outbound.chunks = []chunk{cerr}
 | |
| 		return []*packet{outbound}
 | |
| 	}
 | |
| 
 | |
| 	// From RFC 3758 Sec 3.6:
 | |
| 	//   Note, if the "New Cumulative TSN" value carried in the arrived
 | |
| 	//   FORWARD TSN chunk is found to be behind or at the current cumulative
 | |
| 	//   TSN point, the data receiver MUST treat this FORWARD TSN as out-of-
 | |
| 	//   date and MUST NOT update its Cumulative TSN.  The receiver SHOULD
 | |
| 	//   send a SACK to its peer (the sender of the FORWARD TSN) since such a
 | |
| 	//   duplicate may indicate the previous SACK was lost in the network.
 | |
| 
 | |
| 	a.log.Tracef("[%s] should send ack? newCumTSN=%d peerLastTSN=%d\n",
 | |
| 		a.name, c.newCumulativeTSN, a.peerLastTSN)
 | |
| 	if sna32LTE(c.newCumulativeTSN, a.peerLastTSN) {
 | |
| 		a.log.Tracef("[%s] sending ack on Forward TSN", a.name)
 | |
| 		a.ackState = ackStateImmediate
 | |
| 		a.ackTimer.stop()
 | |
| 		a.awakeWriteLoop()
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// From RFC 3758 Sec 3.6:
 | |
| 	//   the receiver MUST perform the same TSN handling, including duplicate
 | |
| 	//   detection, gap detection, SACK generation, cumulative TSN
 | |
| 	//   advancement, etc. as defined in RFC 2960 [2]---with the following
 | |
| 	//   exceptions and additions.
 | |
| 
 | |
| 	//   When a FORWARD TSN chunk arrives, the data receiver MUST first update
 | |
| 	//   its cumulative TSN point to the value carried in the FORWARD TSN
 | |
| 	//   chunk,
 | |
| 
 | |
| 	// Advance peerLastTSN
 | |
| 	for sna32LT(a.peerLastTSN, c.newCumulativeTSN) {
 | |
| 		a.payloadQueue.pop(a.peerLastTSN + 1) // may not exist
 | |
| 		a.peerLastTSN++
 | |
| 	}
 | |
| 
 | |
| 	// Report the new peerLastTSN value and the largest abandoned SSN value to
 | |
| 	// the corresponding streams so that the abandoned chunks can be removed
 | |
| 	// from the reassemblyQueue.
 | |
| 	for _, forwarded := range c.streams {
 | |
| 		if s, ok := a.streams[forwarded.identifier]; ok {
 | |
| 			s.handleForwardTSNForOrdered(forwarded.sequence)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// TSN may be forwarded for unordered chunks. The ForwardTSN chunk does not
 | |
| 	// report which stream identifier it skipped for unordered chunks.
 | |
| 	// Therefore, we need to broadcast this event to all existing streams for
 | |
| 	// unordered chunks.
 | |
| 	// See https://github.com/pion/sctp/issues/106
 | |
| 	for _, s := range a.streams {
 | |
| 		s.handleForwardTSNForUnordered(c.newCumulativeTSN)
 | |
| 	}
 | |
| 
 | |
| 	return a.handlePeerLastTSNAndAcknowledgement(false)
 | |
| }
 | |
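| 
| // NOTE: exampleSerialLT32 is a hypothetical, illustrative sketch and is not
| // part of this package's API. TSNs are 32-bit values that wrap around, so
| // comparisons such as "is newCumulativeTSN behind peerLastTSN" above use
| // serial number arithmetic (RFC 1982); this sketch shows the comparison the
| // package's sna32LT helper is assumed to perform.
| func exampleSerialLT32(a, b uint32) bool {
| 	// a < b iff b is ahead of a by less than half of the 32-bit space.
| 	return a != b && b-a < 1<<31
| }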
| 
 | |
| func (a *Association) sendResetRequest(streamIdentifier uint16) error {
 | |
| 	a.lock.Lock()
 | |
| 	defer a.lock.Unlock()
 | |
| 
 | |
| 	state := a.getState()
 | |
| 	if state != established {
 | |
| 		return errors.Errorf("sending reset packet in non-established state: state=%s",
 | |
| 			getAssociationStateString(state))
 | |
| 	}
 | |
| 
 | |
| 	// Create a DATA chunk that contains only a valid stream identifier with
 | |
| 	// nil userData, and use it as an EOS (end of stream) marker for the stream.
 | |
| 	c := &chunkPayloadData{
 | |
| 		streamIdentifier:  streamIdentifier,
 | |
| 		beginningFragment: true,
 | |
| 		endingFragment:    true,
 | |
| 		userData:          nil,
 | |
| 	}
 | |
| 
 | |
| 	a.pendingQueue.push(c)
 | |
| 	a.awakeWriteLoop()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) handleReconfigParam(raw param) (*packet, error) {
 | |
| 	switch p := raw.(type) {
 | |
| 	case *paramOutgoingResetRequest:
 | |
| 		a.reconfigRequests[p.reconfigRequestSequenceNumber] = p
 | |
| 		resp := a.resetStreamsIfAny(p)
 | |
| 		if resp != nil {
 | |
| 			return resp, nil
 | |
| 		}
 | |
| 		return nil, nil
 | |
| 
 | |
| 	case *paramReconfigResponse:
 | |
| 		delete(a.reconfigs, p.reconfigResponseSequenceNumber)
 | |
| 		if len(a.reconfigs) == 0 {
 | |
| 			a.tReconfig.stop()
 | |
| 		}
 | |
| 		return nil, nil
 | |
| 	default:
 | |
| 		return nil, errors.Errorf("unexpected parameter type %T", p)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet {
 | |
| 	result := reconfigResultSuccessPerformed
 | |
| 	if sna32LTE(p.senderLastTSN, a.peerLastTSN) {
 | |
| 		a.log.Debugf("[%s] resetStream(): senderLastTSN=%d <= peerLastTSN=%d",
 | |
| 			a.name, p.senderLastTSN, a.peerLastTSN)
 | |
| 		for _, id := range p.streamIdentifiers {
 | |
| 			s, ok := a.streams[id]
 | |
| 			if !ok {
 | |
| 				continue
 | |
| 			}
 | |
| 			a.unregisterStream(s, io.EOF)
 | |
| 		}
 | |
| 		delete(a.reconfigRequests, p.reconfigRequestSequenceNumber)
 | |
| 	} else {
 | |
| 		a.log.Debugf("[%s] resetStream(): senderLastTSN=%d > peerLastTSN=%d",
 | |
| 			a.name, p.senderLastTSN, a.peerLastTSN)
 | |
| 		result = reconfigResultInProgress
 | |
| 	}
 | |
| 
 | |
| 	return a.createPacket([]chunk{&chunkReconfig{
 | |
| 		paramA: &paramReconfigResponse{
 | |
| 			reconfigResponseSequenceNumber: p.reconfigRequestSequenceNumber,
 | |
| 			result:                         result,
 | |
| 		},
 | |
| 	}})
 | |
| }
 | |
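| 
| // NOTE: exampleResetCanBePerformed is a hypothetical, illustrative sketch
| // and is not part of this package's API. It shows the decision made by
| // resetStreamsIfAny above: an incoming outgoing-reset request is performed
| // only once every TSN up to the peer's senderLastTSN has been received;
| // otherwise the response reports that the reset is still in progress.
| func exampleResetCanBePerformed(senderLastTSN, peerLastTSN uint32) bool {
| 	// serial-number "senderLastTSN <= peerLastTSN" on wrapping 32-bit TSNs
| 	return peerLastTSN-senderLastTSN < 1<<31
| }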
| 
 | |
| // Move the chunk peeked with a.pendingQueue.peek() to the inflightQueue.
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) movePendingDataChunkToInflightQueue(c *chunkPayloadData) {
 | |
| 	if err := a.pendingQueue.pop(c); err != nil {
 | |
| 		a.log.Errorf("[%s] failed to pop from pending queue: %s", a.name, err.Error())
 | |
| 	}
 | |
| 
 | |
| 	// Mark all fragments as in-flight now
 | |
| 	if c.endingFragment {
 | |
| 		c.setAllInflight()
 | |
| 	}
 | |
| 
 | |
| 	// Assign TSN
 | |
| 	c.tsn = a.generateNextTSN()
 | |
| 
 | |
| 	c.since = time.Now() // used to calculate RTT and also for maxPacketLifeTime
 | |
| 	c.nSent = 1          // being sent for the first time
 | |
| 
 | |
| 	a.checkPartialReliabilityStatus(c)
 | |
| 
 | |
| 	a.log.Tracef("[%s] sending ppi=%d tsn=%d ssn=%d sent=%d len=%d (%v,%v)",
 | |
| 		a.name, c.payloadType, c.tsn, c.streamSequenceNumber, c.nSent, len(c.userData), c.beginningFragment, c.endingFragment)
 | |
| 
 | |
| 	// Push it into the inflightQueue
 | |
| 	a.inflightQueue.pushNoCheck(c)
 | |
| }
 | |
| 
 | |
| // popPendingDataChunksToSend pops as many chunks from the pending queue as
 | |
| // the cwnd and rwnd allow to be sent.
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint16) {
 | |
| 	chunks := []*chunkPayloadData{}
 | |
| 	var sisToReset []uint16 // stream identifiers to reset
 | |
| 
 | |
| 	if a.pendingQueue.size() > 0 {
 | |
| 		// RFC 4960 sec 6.1.  Transmission of DATA Chunks
 | |
| 		//   A) At any given time, the data sender MUST NOT transmit new data to
 | |
| 		//      any destination transport address if its peer's rwnd indicates
 | |
| 		//      that the peer has no buffer space (i.e., rwnd is 0; see Section
 | |
| 		//      6.2.1).  However, regardless of the value of rwnd (including if it
 | |
| 		//      is 0), the data sender can always have one DATA chunk in flight to
 | |
| 		//      the receiver if allowed by cwnd (see rule B, below).
 | |
| 
 | |
| 		for {
 | |
| 			c := a.pendingQueue.peek()
 | |
| 			if c == nil {
 | |
| 				break // no more pending data
 | |
| 			}
 | |
| 
 | |
| 			dataLen := uint32(len(c.userData))
 | |
| 			if dataLen == 0 {
 | |
| 				sisToReset = append(sisToReset, c.streamIdentifier)
 | |
| 				err := a.pendingQueue.pop(c)
 | |
| 				if err != nil {
 | |
| 					a.log.Errorf("failed to pop from pending queue: %s", err.Error())
 | |
| 				}
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			if uint32(a.inflightQueue.getNumBytes())+dataLen > a.cwnd {
 | |
| 				break // would exceed cwnd
 | |
| 			}
 | |
| 
 | |
| 			if dataLen > a.rwnd {
 | |
| 				break // no more rwnd
 | |
| 			}
 | |
| 
 | |
| 			a.rwnd -= dataLen
 | |
| 
 | |
| 			a.movePendingDataChunkToInflightQueue(c)
 | |
| 			chunks = append(chunks, c)
 | |
| 		}
 | |
| 
 | |
| 		// the data sender can always have one DATA chunk in flight to the receiver
 | |
| 		if len(chunks) == 0 && a.inflightQueue.size() == 0 {
 | |
| 			// Send zero window probe
 | |
| 			c := a.pendingQueue.peek()
 | |
| 			if c != nil {
 | |
| 				a.movePendingDataChunkToInflightQueue(c)
 | |
| 				chunks = append(chunks, c)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return chunks, sisToReset
 | |
| }
 | |
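| 
| // NOTE: exampleMaySendNewData is a hypothetical, illustrative sketch and is
| // not part of this package's API. It roughly mirrors the admission check in
| // popPendingDataChunksToSend above: a new DATA chunk must fit under cwnd
| // (counting bytes already in flight) and under the peer's rwnd, but when
| // nothing is in flight one chunk may still be sent as a zero window probe
| // (RFC 4960 sec 6.1, rule A).
| func exampleMaySendNewData(inflightBytes, dataLen, cwnd, rwnd uint32) bool {
| 	if inflightBytes == 0 {
| 		return true // zero window probe: always allow one chunk in flight
| 	}
| 	if inflightBytes+dataLen > cwnd {
| 		return false // would exceed the congestion window
| 	}
| 	return dataLen <= rwnd // must also fit within the peer's receive window
| }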
| 
 | |
| // bundleDataChunksIntoPackets packs DATA chunks into packets. It tries to bundle
 | |
| // DATA chunks into a packet so long as the resulting packet size does not exceed
 | |
| // the path MTU.
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) bundleDataChunksIntoPackets(chunks []*chunkPayloadData) []*packet {
 | |
| 	packets := []*packet{}
 | |
| 	chunksToSend := []chunk{}
 | |
| 	bytesInPacket := int(commonHeaderSize)
 | |
| 
 | |
| 	for _, c := range chunks {
 | |
| 		// RFC 4960 sec 6.1.  Transmission of DATA Chunks
 | |
| 		//   Multiple DATA chunks committed for transmission MAY be bundled in a
 | |
| 		//   single packet.  Furthermore, DATA chunks being retransmitted MAY be
 | |
| 		//   bundled with new DATA chunks, as long as the resulting packet size
 | |
| 		//   does not exceed the path MTU.
 | |
| 		if bytesInPacket+len(c.userData) > int(a.mtu) {
 | |
| 			packets = append(packets, a.createPacket(chunksToSend))
 | |
| 			chunksToSend = []chunk{}
 | |
| 			bytesInPacket = int(commonHeaderSize)
 | |
| 		}
 | |
| 
 | |
| 		chunksToSend = append(chunksToSend, c)
 | |
| 		bytesInPacket += int(dataChunkHeaderSize) + len(c.userData)
 | |
| 	}
 | |
| 
 | |
| 	if len(chunksToSend) > 0 {
 | |
| 		packets = append(packets, a.createPacket(chunksToSend))
 | |
| 	}
 | |
| 
 | |
| 	return packets
 | |
| }
 | |
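| 
| // NOTE: examplePacketSizes is a hypothetical, illustrative sketch and is not
| // part of this package's API. It shows the size bookkeeping used by
| // bundleDataChunksIntoPackets above: each packet starts at the 12-byte
| // common header, every bundled DATA chunk adds its 16-byte chunk header plus
| // the user data length, and a new packet is started when the next chunk
| // would push the running total past the path MTU.
| func examplePacketSizes(userDataLens []int, mtu int) []int {
| 	const (
| 		commonHdr = 12 // SCTP common header size in bytes
| 		dataHdr   = 16 // DATA chunk header size in bytes
| 	)
| 	sizes := []int{}
| 	cur := commonHdr
| 	for _, n := range userDataLens {
| 		if cur+n > mtu && cur > commonHdr {
| 			sizes = append(sizes, cur) // close the current packet
| 			cur = commonHdr
| 		}
| 		cur += dataHdr + n
| 	}
| 	if cur > commonHdr {
| 		sizes = append(sizes, cur)
| 	}
| 	return sizes
| }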
| 
 | |
| // sendPayloadData sends the data chunks.
 | |
| func (a *Association) sendPayloadData(chunks []*chunkPayloadData) error {
 | |
| 	a.lock.Lock()
 | |
| 	defer a.lock.Unlock()
 | |
| 
 | |
| 	state := a.getState()
 | |
| 	if state != established {
 | |
| 		return errors.Errorf("sending payload data in non-established state: state=%s",
 | |
| 			getAssociationStateString(state))
 | |
| 	}
 | |
| 
 | |
| 	// Push the chunks into the pending queue first.
 | |
| 	for _, c := range chunks {
 | |
| 		a.pendingQueue.push(c)
 | |
| 	}
 | |
| 
 | |
| 	a.awakeWriteLoop()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) checkPartialReliabilityStatus(c *chunkPayloadData) {
 | |
| 	if !a.useForwardTSN {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// draft-ietf-rtcweb-data-protocol-09.txt section 6
 | |
| 	//	6.  Procedures
 | |
| 	//		All Data Channel Establishment Protocol messages MUST be sent using
 | |
| 	//		ordered delivery and reliable transmission.
 | |
| 	//
 | |
| 	if c.payloadType == PayloadTypeWebRTCDCEP {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// PR-SCTP
 | |
| 	if s, ok := a.streams[c.streamIdentifier]; ok {
 | |
| 		s.lock.RLock()
 | |
| 		if s.reliabilityType == ReliabilityTypeRexmit {
 | |
| 			if c.nSent >= s.reliabilityValue {
 | |
| 				c.setAbandoned(true)
 | |
| 				a.log.Tracef("[%s] marked as abandoned: tsn=%d ppi=%d (remix: %d)", a.name, c.tsn, c.payloadType, c.nSent)
 | |
| 			}
 | |
| 		} else if s.reliabilityType == ReliabilityTypeTimed {
 | |
| 			elapsed := int64(time.Since(c.since).Seconds() * 1000)
 | |
| 			if elapsed >= int64(s.reliabilityValue) {
 | |
| 				c.setAbandoned(true)
 | |
| 				a.log.Tracef("[%s] marked as abandoned: tsn=%d ppi=%d (timed: %d)", a.name, c.tsn, c.payloadType, elapsed)
 | |
| 			}
 | |
| 		}
 | |
| 		s.lock.RUnlock()
 | |
| 	} else {
 | |
| 		a.log.Errorf("[%s] stream %d not found)", a.name, c.streamIdentifier)
 | |
| 	}
 | |
| }
 | |
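| 
| // NOTE: exampleShouldAbandon is a hypothetical, illustrative sketch and is
| // not part of this package's API. It shows the two partial-reliability
| // policies applied by checkPartialReliabilityStatus above: a "rexmit" stream
| // abandons a chunk once it has been sent the configured number of times,
| // and a "timed" stream abandons it once it has been outstanding longer than
| // the configured lifetime in milliseconds.
| func exampleShouldAbandon(useRexmitPolicy bool, nSent, maxSends uint32, elapsedMs, maxLifetimeMs int64) bool {
| 	if useRexmitPolicy {
| 		return nSent >= maxSends
| 	}
| 	return elapsedMs >= maxLifetimeMs
| }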
| 
 | |
| // getDataPacketsToRetransmit is called when the T3-rtx timer has expired and retransmits outstanding data chunks
 | |
| // that are not yet acked or abandoned.
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) getDataPacketsToRetransmit() []*packet {
 | |
| 	awnd := min32(a.cwnd, a.rwnd)
 | |
| 	chunks := []*chunkPayloadData{}
 | |
| 	var bytesToSend int
 | |
| 	var done bool
 | |
| 
 | |
| 	for i := 0; !done; i++ {
 | |
| 		c, ok := a.inflightQueue.get(a.cumulativeTSNAckPoint + uint32(i) + 1)
 | |
| 		if !ok {
 | |
| 			break // end of pending data
 | |
| 		}
 | |
| 
 | |
| 		if !c.retransmit {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if i == 0 && int(a.rwnd) < len(c.userData) {
 | |
| 			// Send it as a zero window probe
 | |
| 			done = true
 | |
| 		} else if bytesToSend+len(c.userData) > int(awnd) {
 | |
| 			break
 | |
| 		}
 | |
| 
 | |
| 		// reset the retransmit flag so this chunk is not retransmitted again
 | |
| 		// before the next T3-rtx timer fires
 | |
| 		c.retransmit = false
 | |
| 		bytesToSend += len(c.userData)
 | |
| 
 | |
| 		c.nSent++
 | |
| 
 | |
| 		a.checkPartialReliabilityStatus(c)
 | |
| 
 | |
| 		a.log.Tracef("[%s] retransmitting tsn=%d ssn=%d sent=%d", a.name, c.tsn, c.streamSequenceNumber, c.nSent)
 | |
| 
 | |
| 		chunks = append(chunks, c)
 | |
| 	}
 | |
| 
 | |
| 	return a.bundleDataChunksIntoPackets(chunks)
 | |
| }
 | |
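| 
| // NOTE: exampleRetransmitBudget is a hypothetical, illustrative sketch and
| // is not part of this package's API. It shows the per-burst retransmission
| // budget used by getDataPacketsToRetransmit above: the amount of data
| // retransmitted at once is capped by awnd = min(cwnd, rwnd), with the first
| // chunk still allowed through as a zero window probe when rwnd is too small.
| func exampleRetransmitBudget(cwnd, rwnd uint32) uint32 {
| 	if cwnd < rwnd {
| 		return cwnd
| 	}
| 	return rwnd
| }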
| 
 | |
| // generateNextTSN returns the myNextTSN and increases it.
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) generateNextTSN() uint32 {
 | |
| 	tsn := a.myNextTSN
 | |
| 	a.myNextTSN++
 | |
| 	return tsn
 | |
| }
 | |
| 
 | |
| // generateNextRSN returns the myNextRSN and increases it.
 | |
| // The caller should hold the lock.
 | |
| func (a *Association) generateNextRSN() uint32 {
 | |
| 	rsn := a.myNextRSN
 | |
| 	a.myNextRSN++
 | |
| 	return rsn
 | |
| }
 | |
| 
 | |
| func (a *Association) createSelectiveAckChunk() *chunkSelectiveAck {
 | |
| 	sack := &chunkSelectiveAck{}
 | |
| 	sack.cumulativeTSNAck = a.peerLastTSN
 | |
| 	sack.advertisedReceiverWindowCredit = a.getMyReceiverWindowCredit()
 | |
| 	sack.duplicateTSN = a.payloadQueue.popDuplicates()
 | |
| 	sack.gapAckBlocks = a.payloadQueue.getGapAckBlocks(a.peerLastTSN)
 | |
| 	return sack
 | |
| }
 | |
| 
 | |
| func pack(p *packet) []*packet {
 | |
| 	return []*packet{p}
 | |
| }
 | |
| 
 | |
| func (a *Association) handleChunkStart() {
 | |
| 	a.lock.Lock()
 | |
| 	defer a.lock.Unlock()
 | |
| 
 | |
| 	a.delayedAckTriggered = false
 | |
| 	a.immediateAckTriggered = false
 | |
| }
 | |
| 
 | |
| func (a *Association) handleChunkEnd() {
 | |
| 	a.lock.Lock()
 | |
| 	defer a.lock.Unlock()
 | |
| 
 | |
| 	if a.immediateAckTriggered {
 | |
| 		// Send SACK now!
 | |
| 		a.ackState = ackStateImmediate
 | |
| 		a.ackTimer.stop()
 | |
| 		a.awakeWriteLoop()
 | |
| 	} else if a.delayedAckTriggered {
 | |
| 		// Will send delayed ack in the next ack timeout
 | |
| 		a.ackState = ackStateDelay
 | |
| 		a.ackTimer.start()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (a *Association) handleChunk(p *packet, c chunk) error {
 | |
| 	a.lock.Lock()
 | |
| 	defer a.lock.Unlock()
 | |
| 
 | |
| 	var packets []*packet
 | |
| 	var err error
 | |
| 
 | |
| 	if _, err = c.check(); err != nil {
 | |
| 		a.log.Errorf("[ %s ] failed validating chunk: %s ", a.name, err)
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	switch c := c.(type) {
 | |
| 	case *chunkInit:
 | |
| 		packets, err = a.handleInit(p, c)
 | |
| 
 | |
| 	case *chunkInitAck:
 | |
| 		err = a.handleInitAck(p, c)
 | |
| 
 | |
| 	case *chunkAbort:
 | |
| 		var errStr string
 | |
| 		for _, e := range c.errorCauses {
 | |
| 			errStr += fmt.Sprintf("(%s)", e)
 | |
| 		}
 | |
| 		return fmt.Errorf("[%s] %w: %s", a.name, errChunk, errStr)
 | |
| 
 | |
| 	case *chunkError:
 | |
| 		var errStr string
 | |
| 		for _, e := range c.errorCauses {
 | |
| 			errStr += fmt.Sprintf("(%s)", e)
 | |
| 		}
 | |
| 		a.log.Debugf("[%s] Error chunk, with following errors: %s", a.name, errStr)
 | |
| 
 | |
| 	case *chunkHeartbeat:
 | |
| 		packets = a.handleHeartbeat(c)
 | |
| 
 | |
| 	case *chunkCookieEcho:
 | |
| 		packets = a.handleCookieEcho(c)
 | |
| 
 | |
| 	case *chunkCookieAck:
 | |
| 		a.handleCookieAck()
 | |
| 
 | |
| 	case *chunkPayloadData:
 | |
| 		packets = a.handleData(c)
 | |
| 
 | |
| 	case *chunkSelectiveAck:
 | |
| 		err = a.handleSack(c)
 | |
| 
 | |
| 	case *chunkReconfig:
 | |
| 		packets, err = a.handleReconfig(c)
 | |
| 
 | |
| 	case *chunkForwardTSN:
 | |
| 		packets = a.handleForwardTSN(c)
 | |
| 
 | |
| 	default:
 | |
| 		err = errors.Errorf("unhandled chunk type")
 | |
| 	}
 | |
| 
 | |
| 	// Log and return; the only fatal condition is an ABORT chunk
 | |
| 	if err != nil {
 | |
| 		a.log.Errorf("Failed to handle chunk: %v", err)
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	if len(packets) > 0 {
 | |
| 		a.controlQueue.pushAll(packets)
 | |
| 		a.awakeWriteLoop()
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (a *Association) onRetransmissionTimeout(id int, nRtos uint) {
 | |
| 	a.lock.Lock()
 | |
| 	defer a.lock.Unlock()
 | |
| 
 | |
| 	if id == timerT1Init {
 | |
| 		err := a.sendInit()
 | |
| 		if err != nil {
 | |
| 			a.log.Debugf("[%s] failed to retransmit init (nRtos=%d): %v", a.name, nRtos, err)
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if id == timerT1Cookie {
 | |
| 		err := a.sendCookieEcho()
 | |
| 		if err != nil {
 | |
| 			a.log.Debugf("[%s] failed to retransmit cookie-echo (nRtos=%d): %v", a.name, nRtos, err)
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if id == timerT3RTX {
 | |
| 		a.stats.incT3Timeouts()
 | |
| 
 | |
| 		// RFC 4960 sec 6.3.3
 | |
| 		//  E1)  For the destination address for which the timer expires, adjust
 | |
| 		//       its ssthresh with rules defined in Section 7.2.3 and set the
 | |
| 		//       cwnd <- MTU.
 | |
| 		// RFC 4960 sec 7.2.3
 | |
| 		//   When the T3-rtx timer expires on an address, SCTP should perform slow
 | |
| 		//   start by:
 | |
| 		//      ssthresh = max(cwnd/2, 4*MTU)
 | |
| 		//      cwnd = 1*MTU
 | |
| 
 | |
| 		a.ssthresh = max32(a.cwnd/2, 4*a.mtu)
 | |
| 		a.cwnd = a.mtu
 | |
| 		a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (RTO)",
 | |
| 			a.name, a.cwnd, a.ssthresh, a.inflightQueue.getNumBytes())
 | |
| 
 | |
| 		// RFC 3758 sec 3.5
 | |
| 		//  A5) Any time the T3-rtx timer expires, on any destination, the sender
 | |
| 		//  SHOULD try to advance the "Advanced.Peer.Ack.Point" by following
 | |
| 		//  the procedures outlined in C2 - C5.
 | |
| 		if a.useForwardTSN {
 | |
| 			// RFC 3758 Sec 3.5 C2
 | |
| 			for i := a.advancedPeerTSNAckPoint + 1; ; i++ {
 | |
| 				c, ok := a.inflightQueue.get(i)
 | |
| 				if !ok {
 | |
| 					break
 | |
| 				}
 | |
| 				if !c.abandoned() {
 | |
| 					break
 | |
| 				}
 | |
| 				a.advancedPeerTSNAckPoint = i
 | |
| 			}
 | |
| 
 | |
| 			// RFC 3758 Sec 3.5 C3
 | |
| 			if sna32GT(a.advancedPeerTSNAckPoint, a.cumulativeTSNAckPoint) {
 | |
| 				a.willSendForwardTSN = true
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		a.log.Debugf("[%s] T3-rtx timed out: nRtos=%d cwnd=%d ssthresh=%d", a.name, nRtos, a.cwnd, a.ssthresh)
 | |
| 
 | |
| 		/*
 | |
| 			a.log.Debugf("   - advancedPeerTSNAckPoint=%d", a.advancedPeerTSNAckPoint)
 | |
| 			a.log.Debugf("   - cumulativeTSNAckPoint=%d", a.cumulativeTSNAckPoint)
 | |
| 			a.inflightQueue.updateSortedKeys()
 | |
| 			for i, tsn := range a.inflightQueue.sorted {
 | |
| 				if c, ok := a.inflightQueue.get(tsn); ok {
 | |
| 					a.log.Debugf("   - [%d] tsn=%d acked=%v abandoned=%v (%v,%v) len=%d",
 | |
| 						i, c.tsn, c.acked, c.abandoned(), c.beginningFragment, c.endingFragment, len(c.userData))
 | |
| 				}
 | |
| 			}
 | |
| 		*/
 | |
| 
 | |
| 		a.inflightQueue.markAllToRetrasmit()
 | |
| 		a.awakeWriteLoop()
 | |
| 
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if id == timerReconfig {
 | |
| 		a.willRetransmitReconfig = true
 | |
| 		a.awakeWriteLoop()
 | |
| 	}
 | |
| }
 | |
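| 
| // NOTE: exampleT3Backoff is a hypothetical, illustrative sketch and is not
| // part of this package's API. It restates the congestion-control adjustment
| // made by onRetransmissionTimeout above when T3-rtx expires, per RFC 4960
| // sec 7.2.3: ssthresh = max(cwnd/2, 4*MTU) and cwnd collapses to one MTU.
| func exampleT3Backoff(cwnd, mtu uint32) (newCwnd, newSsthresh uint32) {
| 	newSsthresh = cwnd / 2
| 	if 4*mtu > newSsthresh {
| 		newSsthresh = 4 * mtu
| 	}
| 	newCwnd = mtu
| 	return newCwnd, newSsthresh
| }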
| 
 | |
| func (a *Association) onRetransmissionFailure(id int) {
 | |
| 	a.lock.Lock()
 | |
| 	defer a.lock.Unlock()
 | |
| 
 | |
| 	if id == timerT1Init {
 | |
| 		a.log.Errorf("[%s] retransmission failure: T1-init", a.name)
 | |
| 		a.handshakeCompletedCh <- errors.Errorf("handshake failed (INIT ACK)")
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if id == timerT1Cookie {
 | |
| 		a.log.Errorf("[%s] retransmission failure: T1-cookie", a.name)
 | |
| 		a.handshakeCompletedCh <- errors.Errorf("handshake failed (COOKIE ECHO)")
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if id == timerT3RTX {
 | |
| 		// T3-rtx timer will not fail by design
 | |
| 		// Justifications:
 | |
| 		//  * ICE would fail if the connectivity is lost
 | |
| 		//  * WebRTC spec is not clear how this incident should be reported to ULP
 | |
| 		a.log.Errorf("[%s] retransmission failure: T3-rtx (DATA)", a.name)
 | |
| 		return
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (a *Association) onAckTimeout() {
 | |
| 	a.lock.Lock()
 | |
| 	defer a.lock.Unlock()
 | |
| 
 | |
| 	a.log.Tracef("[%s] ack timed out (ackState: %d)", a.name, a.ackState)
 | |
| 	a.stats.incAckTimeouts()
 | |
| 
 | |
| 	a.ackState = ackStateImmediate
 | |
| 	a.awakeWriteLoop()
 | |
| }
 | |
| 
 | |
| // bufferedAmount returns total amount (in bytes) of currently buffered user data.
 | |
| // This is used only for testing.
 | |
| func (a *Association) bufferedAmount() int {
 | |
| 	a.lock.RLock()
 | |
| 	defer a.lock.RUnlock()
 | |
| 
 | |
| 	return a.pendingQueue.getNumBytes() + a.inflightQueue.getNumBytes()
 | |
| }
 | |
| 
 | |
| // MaxMessageSize returns the maximum message size you can send.
 | |
| func (a *Association) MaxMessageSize() uint32 {
 | |
| 	return atomic.LoadUint32(&a.maxMessageSize)
 | |
| }
 | |
| 
 | |
| // SetMaxMessageSize sets the maximum message size you can send.
 | |
| func (a *Association) SetMaxMessageSize(maxMsgSize uint32) {
 | |
| 	atomic.StoreUint32(&a.maxMessageSize, maxMsgSize)
 | |
| }
 |
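| 
| // NOTE: exampleCheckMessageSize is a hypothetical, illustrative sketch and
| // is not part of this package's API. It shows how a caller might use the
| // value returned by MaxMessageSize above before writing: messages larger
| // than the configured maximum should be rejected rather than handed to the
| // association.
| func exampleCheckMessageSize(msgLen int, maxMessageSize uint32) error {
| 	if msgLen < 0 || uint32(msgLen) > maxMessageSize {
| 		return fmt.Errorf("message of %d bytes exceeds the maximum message size of %d", msgLen, maxMessageSize)
| 	}
| 	return nil
| }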