// Copyright (c) 2024 Winlin // // SPDX-License-Identifier: MIT package rtmp import ( "bufio" "bytes" "context" "encoding" "encoding/binary" "fmt" "io" "math/rand" "sync" "srs-proxy/errors" ) // The handshake implements the RTMP handshake protocol. type Handshake struct { // The random number generator. r *rand.Rand // The c1s1 cache. c1s1 []byte } func NewHandshake(r *rand.Rand) *Handshake { return &Handshake{r: r} } func (v *Handshake) C1S1() []byte { return v.c1s1 } func (v *Handshake) WriteC0S0(w io.Writer) (err error) { r := bytes.NewReader([]byte{0x03}) if _, err = io.Copy(w, r); err != nil { return errors.Wrap(err, "write c0s0") } return } func (v *Handshake) ReadC0S0(r io.Reader) (c0 []byte, err error) { b := &bytes.Buffer{} if _, err = io.CopyN(b, r, 1); err != nil { return nil, errors.Wrap(err, "read c0s0") } c0 = b.Bytes() return } func (v *Handshake) WriteC1S1(w io.Writer) (err error) { p := make([]byte, 1536) for i := 8; i < len(p); i++ { p[i] = byte(v.r.Int()) } r := bytes.NewReader(p) if _, err = io.Copy(w, r); err != nil { return errors.Wrap(err, "write c0s1") } return } func (v *Handshake) ReadC1S1(r io.Reader) (c1s1 []byte, err error) { b := &bytes.Buffer{} if _, err = io.CopyN(b, r, 1536); err != nil { return nil, errors.Wrap(err, "read c1s1") } c1s1 = b.Bytes() v.c1s1 = c1s1 return } func (v *Handshake) WriteC2S2(w io.Writer, s1c1 []byte) (err error) { r := bytes.NewReader(s1c1[:]) if _, err = io.Copy(w, r); err != nil { return errors.Wrap(err, "write c2s2") } return } func (v *Handshake) ReadC2S2(r io.Reader) (c2 []byte, err error) { b := &bytes.Buffer{} if _, err = io.CopyN(b, r, 1536); err != nil { return nil, errors.Wrap(err, "read c2s2") } c2 = b.Bytes() return } // Please read @doc rtmp_specification_1.0.pdf, @page 16, @section 6.1. Chunk Format // Extended timestamp: 0 or 4 bytes // This field MUST be sent when the normal timsestamp is set to // 0xffffff, it MUST NOT be sent if the normal timestamp is set to // anything else. So for values less than 0xffffff the normal // timestamp field SHOULD be used in which case the extended timestamp // MUST NOT be present. For values greater than or equal to 0xffffff // the normal timestamp field MUST NOT be used and MUST be set to // 0xffffff and the extended timestamp MUST be sent. const extendedTimestamp = uint64(0xffffff) // The default chunk size of RTMP is 128 bytes. const defaultChunkSize = 128 // The intput or output settings for RTMP protocol. type settings struct { chunkSize uint32 } func newSettings() *settings { return &settings{ chunkSize: defaultChunkSize, } } // The chunk stream which transport a message once. type chunkStream struct { format formatType cid chunkID header messageHeader message *Message count uint64 extendedTimestamp bool } func newChunkStream() *chunkStream { return &chunkStream{} } // The protocol implements the RTMP command and chunk stack. type Protocol struct { r *bufio.Reader w *bufio.Writer input struct { opt *settings chunks map[chunkID]*chunkStream transactions map[amf0Number]amf0String ltransactions sync.Mutex } output struct { opt *settings } } func NewProtocol(rw io.ReadWriter) *Protocol { v := &Protocol{ r: bufio.NewReader(rw), w: bufio.NewWriter(rw), } v.input.opt = newSettings() v.input.chunks = map[chunkID]*chunkStream{} v.input.transactions = map[amf0Number]amf0String{} v.output.opt = newSettings() return v } func ExpectPacket[T Packet](ctx context.Context, v *Protocol, ppkt *T) (m *Message, err error) { for { if m, err = v.ReadMessage(ctx); err != nil { return nil, errors.WithMessage(err, "read message") } var pkt Packet if pkt, err = v.DecodeMessage(m); err != nil { return nil, errors.WithMessage(err, "decode message") } if p, ok := pkt.(T); ok { *ppkt = p break } } return } // Deprecated: Please use rtmp.ExpectPacket instead. func (v *Protocol) ExpectPacket(ctx context.Context, ppkt any) (m *Message, err error) { panic("Please use rtmp.ExpectPacket instead") } func (v *Protocol) ExpectMessage(ctx context.Context, types ...MessageType) (m *Message, err error) { for { if m, err = v.ReadMessage(ctx); err != nil { return nil, errors.WithMessage(err, "read message") } if len(types) == 0 { return } for _, t := range types { if m.MessageType == t { return } } } return } func (v *Protocol) parseAMFObject(p []byte) (pkt Packet, err error) { var commandName amf0String if err = commandName.UnmarshalBinary(p); err != nil { return nil, errors.WithMessage(err, "unmarshal command name") } switch commandName { case commandResult, commandError: var transactionID amf0Number if err = transactionID.UnmarshalBinary(p[commandName.Size():]); err != nil { return nil, errors.WithMessage(err, "unmarshal tid") } var requestName amf0String if err = func() error { v.input.ltransactions.Lock() defer v.input.ltransactions.Unlock() var ok bool if requestName, ok = v.input.transactions[transactionID]; !ok { return errors.Errorf("No matched request for tid=%v", transactionID) } delete(v.input.transactions, transactionID) return nil }(); err != nil { return nil, errors.WithMessage(err, "discovery request name") } switch requestName { case commandConnect: return NewConnectAppResPacket(transactionID), nil case commandCreateStream: return NewCreateStreamResPacket(transactionID), nil case commandReleaseStream, commandFCPublish, commandFCUnpublish: call := NewCallPacket() call.TransactionID = transactionID return call, nil default: return nil, errors.Errorf("No request for %v", string(requestName)) } case commandConnect: return NewConnectAppPacket(), nil case commandPublish: return NewPublishPacket(), nil case commandPlay: return NewPlayPacket(), nil default: return NewCallPacket(), nil } } func (v *Protocol) DecodeMessage(m *Message) (pkt Packet, err error) { p := m.Payload[:] if len(p) == 0 { return nil, errors.New("Empty packet") } switch m.MessageType { case MessageTypeAMF3Command, MessageTypeAMF3Data: p = p[1:] } switch m.MessageType { case MessageTypeSetChunkSize: pkt = NewSetChunkSize() case MessageTypeWindowAcknowledgementSize: pkt = NewWindowAcknowledgementSize() case MessageTypeSetPeerBandwidth: pkt = NewSetPeerBandwidth() case MessageTypeAMF0Command, MessageTypeAMF3Command, MessageTypeAMF0Data, MessageTypeAMF3Data: if pkt, err = v.parseAMFObject(p); err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("Parse AMF %v", m.MessageType)) } case MessageTypeUserControl: pkt = NewUserControl() default: return nil, errors.Errorf("Unknown message %v", m.MessageType) } if err = pkt.UnmarshalBinary(p); err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("Unmarshal %v", m.MessageType)) } return } func (v *Protocol) ReadMessage(ctx context.Context) (m *Message, err error) { for m == nil { // TODO: We should convert buffered io to async io, because we will be stuck in block io here, // TODO: but the risk is acceptable because we literally will set the underlay io timeout. if ctx.Err() != nil { return nil, ctx.Err() } var cid chunkID var format formatType if format, cid, err = v.readBasicHeader(ctx); err != nil { return nil, errors.WithMessage(err, "read basic header") } var ok bool var chunk *chunkStream if chunk, ok = v.input.chunks[cid]; !ok { chunk = newChunkStream() v.input.chunks[cid] = chunk chunk.header.betterCid = cid } if err = v.readMessageHeader(ctx, chunk, format); err != nil { return nil, errors.WithMessage(err, "read message header") } if m, err = v.readMessagePayload(ctx, chunk); err != nil { return nil, errors.WithMessage(err, "read message payload") } if err = v.onMessageArrivated(m); err != nil { return nil, errors.WithMessage(err, "on message") } } return } func (v *Protocol) readMessagePayload(ctx context.Context, chunk *chunkStream) (m *Message, err error) { // Empty payload message. if chunk.message.payloadLength == 0 { m = chunk.message chunk.message = nil return } // Calculate the chunk payload size. chunkedPayloadSize := int(chunk.message.payloadLength) - len(chunk.message.Payload) if chunkedPayloadSize > int(v.input.opt.chunkSize) { chunkedPayloadSize = int(v.input.opt.chunkSize) } b := make([]byte, chunkedPayloadSize) if _, err = io.ReadFull(v.r, b); err != nil { return nil, errors.Wrapf(err, "read chunk %vB", chunkedPayloadSize) } chunk.message.Payload = append(chunk.message.Payload, b...) // Got entire RTMP message? if int(chunk.message.payloadLength) == len(chunk.message.Payload) { m = chunk.message chunk.message = nil } return } // Please read @doc rtmp_specification_1.0.pdf, @page 18, @section 6.1.2. Chunk Message Header // There are four different formats for the chunk message header, // selected by the "fmt" field in the chunk basic header. type formatType uint8 const ( // 6.1.2.1. Type 0 // Chunks of Type 0 are 11 bytes long. This type MUST be used at the // start of a chunk stream, and whenever the stream timestamp goes // backward (e.g., because of a backward seek). formatType0 formatType = iota // 6.1.2.2. Type 1 // Chunks of Type 1 are 7 bytes long. The message stream ID is not // included; this chunk takes the same stream ID as the preceding chunk. // Streams with variable-sized messages (for example, many video // formats) SHOULD use this format for the first chunk of each new // message after the first. formatType1 // 6.1.2.3. Type 2 // Chunks of Type 2 are 3 bytes long. Neither the stream ID nor the // message length is included; this chunk has the same stream ID and // message length as the preceding chunk. Streams with constant-sized // messages (for example, some audio and data formats) SHOULD use this // format for the first chunk of each message after the first. formatType2 // 6.1.2.4. Type 3 // Chunks of Type 3 have no header. Stream ID, message length and // timestamp delta are not present; chunks of this type take values from // the preceding chunk. When a single message is split into chunks, all // chunks of a message except the first one, SHOULD use this type. Refer // to example 2 in section 6.2.2. Stream consisting of messages of // exactly the same size, stream ID and spacing in time SHOULD use this // type for all chunks after chunk of Type 2. Refer to example 1 in // section 6.2.1. If the delta between the first message and the second // message is same as the time stamp of first message, then chunk of // type 3 would immediately follow the chunk of type 0 as there is no // need for a chunk of type 2 to register the delta. If Type 3 chunk // follows a Type 0 chunk, then timestamp delta for this Type 3 chunk is // the same as the timestamp of Type 0 chunk. formatType3 ) // The message header size, index is format. var messageHeaderSizes = []int{11, 7, 3, 0} // Parse the chunk message header. // 3bytes: timestamp delta, fmt=0,1,2 // 3bytes: payload length, fmt=0,1 // 1bytes: message type, fmt=0,1 // 4bytes: stream id, fmt=0 // where: // fmt=0, 0x0X // fmt=1, 0x4X // fmt=2, 0x8X // fmt=3, 0xCX func (v *Protocol) readMessageHeader(ctx context.Context, chunk *chunkStream, format formatType) (err error) { // We should not assert anything about fmt, for the first packet. // (when first packet, the chunk.message is nil). // the fmt maybe 0/1/2/3, the FMLE will send a 0xC4 for some audio packet. // the previous packet is: // 04 // fmt=0, cid=4 // 00 00 1a // timestamp=26 // 00 00 9d // payload_length=157 // 08 // message_type=8(audio) // 01 00 00 00 // stream_id=1 // the current packet maybe: // c4 // fmt=3, cid=4 // it's ok, for the packet is audio, and timestamp delta is 26. // the current packet must be parsed as: // fmt=0, cid=4 // timestamp=26+26=52 // payload_length=157 // message_type=8(audio) // stream_id=1 // so we must update the timestamp even fmt=3 for first packet. // // The fresh packet used to update the timestamp even fmt=3 for first packet. // fresh packet always means the chunk is the first one of message. var isFirstChunkOfMsg bool if chunk.message == nil { isFirstChunkOfMsg = true } // But, we can ensure that when a chunk stream is fresh, // the fmt must be 0, a new stream. if chunk.count == 0 && format != formatType0 { // For librtmp, if ping, it will send a fresh stream with fmt=1, // 0x42 where: fmt=1, cid=2, protocol contorl user-control message // 0x00 0x00 0x00 where: timestamp=0 // 0x00 0x00 0x06 where: payload_length=6 // 0x04 where: message_type=4(protocol control user-control message) // 0x00 0x06 where: event Ping(0x06) // 0x00 0x00 0x0d 0x0f where: event data 4bytes ping timestamp. // @see: https://github.com/ossrs/srs/issues/98 if chunk.cid == chunkIDProtocolControl && format == formatType1 { // We accept cid=2, fmt=1 to make librtmp happy. } else { return errors.Errorf("For fresh chunk, fmt %v != %v(required), cid is %v", format, formatType0, chunk.cid) } } // When exists cache msg, means got an partial message, // the fmt must not be type0 which means new message. if chunk.message != nil && format == formatType0 { return errors.Errorf("For exists chunk, fmt is %v, cid is %v", format, chunk.cid) } // Create msg when new chunk stream start if chunk.message == nil { chunk.message = NewMessage() } // Read the message header. p := make([]byte, messageHeaderSizes[format]) if _, err = io.ReadFull(v.r, p); err != nil { return errors.Wrapf(err, "read %vB message header", len(p)) } // Prse the message header. // 3bytes: timestamp delta, fmt=0,1,2 // 3bytes: payload length, fmt=0,1 // 1bytes: message type, fmt=0,1 // 4bytes: stream id, fmt=0 // where: // fmt=0, 0x0X // fmt=1, 0x4X // fmt=2, 0x8X // fmt=3, 0xCX if format <= formatType2 { chunk.header.timestampDelta = uint32(p[0])<<16 | uint32(p[1])<<8 | uint32(p[2]) p = p[3:] // fmt: 0 // timestamp: 3 bytes // If the timestamp is greater than or equal to 16777215 // (hexadecimal 0x00ffffff), this value MUST be 16777215, and the // 'extended timestamp header' MUST be present. Otherwise, this value // SHOULD be the entire timestamp. // // fmt: 1 or 2 // timestamp delta: 3 bytes // If the delta is greater than or equal to 16777215 (hexadecimal // 0x00ffffff), this value MUST be 16777215, and the 'extended // timestamp header' MUST be present. Otherwise, this value SHOULD be // the entire delta. chunk.extendedTimestamp = uint64(chunk.header.timestampDelta) >= extendedTimestamp if !chunk.extendedTimestamp { // Extended timestamp: 0 or 4 bytes // This field MUST be sent when the normal timsestamp is set to // 0xffffff, it MUST NOT be sent if the normal timestamp is set to // anything else. So for values less than 0xffffff the normal // timestamp field SHOULD be used in which case the extended timestamp // MUST NOT be present. For values greater than or equal to 0xffffff // the normal timestamp field MUST NOT be used and MUST be set to // 0xffffff and the extended timestamp MUST be sent. if format == formatType0 { // 6.1.2.1. Type 0 // For a type-0 chunk, the absolute timestamp of the message is sent // here. chunk.header.Timestamp = uint64(chunk.header.timestampDelta) } else { // 6.1.2.2. Type 1 // 6.1.2.3. Type 2 // For a type-1 or type-2 chunk, the difference between the previous // chunk's timestamp and the current chunk's timestamp is sent here. chunk.header.Timestamp += uint64(chunk.header.timestampDelta) } } if format <= formatType1 { payloadLength := uint32(p[0])<<16 | uint32(p[1])<<8 | uint32(p[2]) p = p[3:] // For a message, if msg exists in cache, the size must not changed. // always use the actual msg size to compare, for the cache payload length can changed, // for the fmt type1(stream_id not changed), user can change the payload // length(it's not allowed in the continue chunks). if !isFirstChunkOfMsg && chunk.header.payloadLength != payloadLength { return errors.Errorf("Chunk message size %v != %v(required)", payloadLength, chunk.header.payloadLength) } chunk.header.payloadLength = payloadLength chunk.header.MessageType = MessageType(p[0]) p = p[1:] if format == formatType0 { chunk.header.streamID = uint32(p[0]) | uint32(p[1])<<8 | uint32(p[2])<<16 | uint32(p[3])<<24 p = p[4:] } } } else { // Update the timestamp even fmt=3 for first chunk packet if isFirstChunkOfMsg && !chunk.extendedTimestamp { chunk.header.Timestamp += uint64(chunk.header.timestampDelta) } } // Read extended-timestamp if chunk.extendedTimestamp { var timestamp uint32 if err = binary.Read(v.r, binary.BigEndian, ×tamp); err != nil { return errors.Wrapf(err, "read ext-ts, pkt-ts=%v", chunk.header.Timestamp) } // We always use 31bits timestamp, for some server may use 32bits extended timestamp. // @see https://github.com/ossrs/srs/issues/111 timestamp &= 0x7fffffff // TODO: FIXME: Support detect the extended timestamp. // @see http://blog.csdn.net/win_lin/article/details/13363699 chunk.header.Timestamp = uint64(timestamp) } // The extended-timestamp must be unsigned-int, // 24bits timestamp: 0xffffff = 16777215ms = 16777.215s = 4.66h // 32bits timestamp: 0xffffffff = 4294967295ms = 4294967.295s = 1193.046h = 49.71d // because the rtmp protocol says the 32bits timestamp is about "50 days": // 3. Byte Order, Alignment, and Time Format // Because timestamps are generally only 32 bits long, they will roll // over after fewer than 50 days. // // but, its sample says the timestamp is 31bits: // An application could assume, for example, that all // adjacent timestamps are within 2^31 milliseconds of each other, so // 10000 comes after 4000000000, while 3000000000 comes before // 4000000000. // and flv specification says timestamp is 31bits: // Extension of the Timestamp field to form a SI32 value. This // field represents the upper 8 bits, while the previous // Timestamp field represents the lower 24 bits of the time in // milliseconds. // in a word, 31bits timestamp is ok. // convert extended timestamp to 31bits. chunk.header.Timestamp &= 0x7fffffff // Copy header to msg chunk.message.messageHeader = chunk.header // Increase the msg count, the chunk stream can accept fmt=1/2/3 message now. chunk.count++ return } // Please read @doc rtmp_specification_1.0.pdf, @page 17, @section 6.1.1. Chunk Basic Header // The Chunk Basic Header encodes the chunk stream ID and the chunk // type(represented by fmt field in the figure below). Chunk type // determines the format of the encoded message header. Chunk Basic // Header field may be 1, 2, or 3 bytes, depending on the chunk stream // ID. // // The bits 0-5 (least significant) in the chunk basic header represent // the chunk stream ID. // // Chunk stream IDs 2-63 can be encoded in the 1-byte version of this // field. // 0 1 2 3 4 5 6 7 // +-+-+-+-+-+-+-+-+ // |fmt| cs id | // +-+-+-+-+-+-+-+-+ // Figure 6 Chunk basic header 1 // // Chunk stream IDs 64-319 can be encoded in the 2-byte version of this // field. ID is computed as (the second byte + 64). // 0 1 // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // |fmt| 0 | cs id - 64 | // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // Figure 7 Chunk basic header 2 // // Chunk stream IDs 64-65599 can be encoded in the 3-byte version of // this field. ID is computed as ((the third byte)*256 + the second byte // + 64). // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // |fmt| 1 | cs id - 64 | // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // Figure 8 Chunk basic header 3 // // cs id: 6 bits // fmt: 2 bits // cs id - 64: 8 or 16 bits // // Chunk stream IDs with values 64-319 could be represented by both 2- // byte version and 3-byte version of this field. func (v *Protocol) readBasicHeader(ctx context.Context) (format formatType, cid chunkID, err error) { // 2-63, 1B chunk header var t uint8 if err = binary.Read(v.r, binary.BigEndian, &t); err != nil { return format, cid, errors.Wrap(err, "read basic header") } cid = chunkID(t & 0x3f) format = formatType((t >> 6) & 0x03) if cid > 1 { return } // 64-319, 2B chunk header if err = binary.Read(v.r, binary.BigEndian, &t); err != nil { return format, cid, errors.Wrapf(err, "read basic header for cid=%v", cid) } cid = chunkID(64 + uint32(t)) // 64-65599, 3B chunk header if cid == 1 { if err = binary.Read(v.r, binary.BigEndian, &t); err != nil { return format, cid, errors.Wrapf(err, "read basic header for cid=%v", cid) } cid += chunkID(uint32(t) * 256) } return } func (v *Protocol) WritePacket(ctx context.Context, pkt Packet, streamID int) (err error) { m := NewMessage() if m.Payload, err = pkt.MarshalBinary(); err != nil { return errors.WithMessage(err, "marshal payload") } m.MessageType = pkt.Type() m.streamID = uint32(streamID) m.betterCid = pkt.BetterCid() if err = v.WriteMessage(ctx, m); err != nil { return errors.WithMessage(err, "write message") } if err = v.onPacketWriten(m, pkt); err != nil { return errors.WithMessage(err, "on write packet") } return } func (v *Protocol) onPacketWriten(m *Message, pkt Packet) (err error) { var tid amf0Number var name amf0String switch pkt := pkt.(type) { case *ConnectAppPacket: tid, name = pkt.TransactionID, pkt.CommandName case *CreateStreamPacket: tid, name = pkt.TransactionID, pkt.CommandName case *CallPacket: tid, name = pkt.TransactionID, pkt.CommandName } if tid > 0 && len(name) > 0 { v.input.ltransactions.Lock() defer v.input.ltransactions.Unlock() v.input.transactions[tid] = name } return } func (v *Protocol) onMessageArrivated(m *Message) (err error) { if m == nil { return } var pkt Packet switch m.MessageType { case MessageTypeSetChunkSize, MessageTypeUserControl, MessageTypeWindowAcknowledgementSize: if pkt, err = v.DecodeMessage(m); err != nil { return errors.Errorf("decode message %v", m.MessageType) } } switch pkt := pkt.(type) { case *SetChunkSize: v.input.opt.chunkSize = pkt.ChunkSize } return } func (v *Protocol) WriteMessage(ctx context.Context, m *Message) (err error) { m.payloadLength = uint32(len(m.Payload)) var c0h, c3h []byte if c0h, err = m.generateC0Header(); err != nil { return errors.WithMessage(err, "generate c0 header") } if c3h, err = m.generateC3Header(); err != nil { return errors.WithMessage(err, "generate c3 header") } var h []byte p := m.Payload for len(p) > 0 { // TODO: We should convert buffered io to async io, because we will be stuck in block io here, // TODO: but the risk is acceptable because we literally will set the underlay io timeout. if ctx.Err() != nil { return ctx.Err() } if h == nil { h = c0h } else { h = c3h } if _, err = io.Copy(v.w, bytes.NewReader(h)); err != nil { return errors.Wrapf(err, "write c0c3 header %x", h) } size := len(p) if size > int(v.output.opt.chunkSize) { size = int(v.output.opt.chunkSize) } if _, err = io.Copy(v.w, bytes.NewReader(p[:size])); err != nil { return errors.Wrapf(err, "write chunk payload %vB", size) } p = p[size:] } // TODO: We should convert buffered io to async io, because we will be stuck in block io here, // TODO: but the risk is acceptable because we literally will set the underlay io timeout. if ctx.Err() != nil { return ctx.Err() } // TODO: FIXME: Use writev to write for high performance. if err = v.w.Flush(); err != nil { return errors.Wrapf(err, "flush writer") } return } // Please read @doc rtmp_specification_1.0.pdf, @page 30, @section 4.1. Message Header // 1byte. One byte field to represent the message type. A range of type IDs // (1-7) are reserved for protocol control messages. type MessageType uint8 const ( // Please read @doc rtmp_specification_1.0.pdf, @page 30, @section 5. Protocol Control Messages // RTMP reserves message type IDs 1-7 for protocol control messages. // These messages contain information needed by the RTM Chunk Stream // protocol or RTMP itself. Protocol messages with IDs 1 & 2 are // reserved for usage with RTM Chunk Stream protocol. Protocol messages // with IDs 3-6 are reserved for usage of RTMP. Protocol message with ID // 7 is used between edge server and origin server. MessageTypeSetChunkSize MessageType = 0x01 MessageTypeAbort MessageType = 0x02 // 0x02 MessageTypeAcknowledgement MessageType = 0x03 // 0x03 MessageTypeUserControl MessageType = 0x04 // 0x04 MessageTypeWindowAcknowledgementSize MessageType = 0x05 // 0x05 MessageTypeSetPeerBandwidth MessageType = 0x06 // 0x06 MessageTypeEdgeAndOriginServerCommand MessageType = 0x07 // 0x07 // Please read @doc rtmp_specification_1.0.pdf, @page 38, @section 3. Types of messages // The server and the client send messages over the network to // communicate with each other. The messages can be of any type which // includes audio messages, video messages, command messages, shared // object messages, data messages, and user control messages. // // Please read @doc rtmp_specification_1.0.pdf, @page 41, @section 3.4. Audio message // The client or the server sends this message to send audio data to the // peer. The message type value of 8 is reserved for audio messages. MessageTypeAudio MessageType = 0x08 // Please read @doc rtmp_specification_1.0.pdf, @page 41, @section 3.5. Video message // The client or the server sends this message to send video data to the // peer. The message type value of 9 is reserved for video messages. // These messages are large and can delay the sending of other type of // messages. To avoid such a situation, the video message is assigned // the lowest priority. MessageTypeVideo MessageType = 0x09 // 0x09 // Please read @doc rtmp_specification_1.0.pdf, @page 38, @section 3.1. Command message // Command messages carry the AMF-encoded commands between the client // and the server. These messages have been assigned message type value // of 20 for AMF0 encoding and message type value of 17 for AMF3 // encoding. These messages are sent to perform some operations like // connect, createStream, publish, play, pause on the peer. Command // messages like onstatus, result etc. are used to inform the sender // about the status of the requested commands. A command message // consists of command name, transaction ID, and command object that // contains related parameters. A client or a server can request Remote // Procedure Calls (RPC) over streams that are communicated using the // command messages to the peer. MessageTypeAMF3Command MessageType = 17 // 0x11 MessageTypeAMF0Command MessageType = 20 // 0x14 // Please read @doc rtmp_specification_1.0.pdf, @page 38, @section 3.2. Data message // The client or the server sends this message to send Metadata or any // user data to the peer. Metadata includes details about the // data(audio, video etc.) like creation time, duration, theme and so // on. These messages have been assigned message type value of 18 for // AMF0 and message type value of 15 for AMF3. MessageTypeAMF0Data MessageType = 18 // 0x12 MessageTypeAMF3Data MessageType = 15 // 0x0f ) // The header of message. type messageHeader struct { // 3bytes. // Three-byte field that contains a timestamp delta of the message. // @remark, only used for decoding message from chunk stream. timestampDelta uint32 // 3bytes. // Three-byte field that represents the size of the payload in bytes. // It is set in big-endian format. payloadLength uint32 // 1byte. // One byte field to represent the message type. A range of type IDs // (1-7) are reserved for protocol control messages. MessageType MessageType // 4bytes. // Four-byte field that identifies the stream of the message. These // bytes are set in little-endian format. streamID uint32 // The chunk stream id over which transport. betterCid chunkID // Four-byte field that contains a timestamp of the message. // The 4 bytes are packed in the big-endian order. // @remark, we use 64bits for large time for jitter detect and for large tbn like HLS. Timestamp uint64 } // The RTMP message, transport over chunk stream in RTMP. // Please read the cs id of @doc rtmp_specification_1.0.pdf, @page 30, @section 4.1. Message Header type Message struct { messageHeader // The payload which carries the RTMP packet. Payload []byte } func NewMessage() *Message { return &Message{} } func NewStreamMessage(streamID int) *Message { v := NewMessage() v.streamID = uint32(streamID) v.betterCid = chunkIDOverStream return v } func (v *Message) generateC3Header() ([]byte, error) { var c3h []byte if v.Timestamp < extendedTimestamp { c3h = make([]byte, 1) } else { c3h = make([]byte, 1+4) } p := c3h p[0] = 0xc0 | byte(v.betterCid&0x3f) p = p[1:] // In RTMP protocol, there must not any timestamp in C3 header, // but actually all products from adobe, such as FMS/AMS and Flash player and FMLE, // always carry a extended timestamp in C3 header. // @see: http://blog.csdn.net/win_lin/article/details/13363699 if v.Timestamp >= extendedTimestamp { p[0] = byte(v.Timestamp >> 24) p[1] = byte(v.Timestamp >> 16) p[2] = byte(v.Timestamp >> 8) p[3] = byte(v.Timestamp) } return c3h, nil } func (v *Message) generateC0Header() ([]byte, error) { var c0h []byte if v.Timestamp < extendedTimestamp { c0h = make([]byte, 1+3+3+1+4) } else { c0h = make([]byte, 1+3+3+1+4+4) } p := c0h p[0] = byte(v.betterCid) & 0x3f p = p[1:] if v.Timestamp < extendedTimestamp { p[0] = byte(v.Timestamp >> 16) p[1] = byte(v.Timestamp >> 8) p[2] = byte(v.Timestamp) } else { p[0] = 0xff p[1] = 0xff p[2] = 0xff } p = p[3:] p[0] = byte(v.payloadLength >> 16) p[1] = byte(v.payloadLength >> 8) p[2] = byte(v.payloadLength) p = p[3:] p[0] = byte(v.MessageType) p = p[1:] p[0] = byte(v.streamID) p[1] = byte(v.streamID >> 8) p[2] = byte(v.streamID >> 16) p[3] = byte(v.streamID >> 24) p = p[4:] if v.Timestamp >= extendedTimestamp { p[0] = byte(v.Timestamp >> 24) p[1] = byte(v.Timestamp >> 16) p[2] = byte(v.Timestamp >> 8) p[3] = byte(v.Timestamp) } return c0h, nil } // Please read the cs id of @doc rtmp_specification_1.0.pdf, @page 17, @section 6.1.1. Chunk Basic Header type chunkID uint32 const ( chunkIDProtocolControl chunkID = 0x02 chunkIDOverConnection chunkID = 0x03 chunkIDOverConnection2 chunkID = 0x04 chunkIDOverStream chunkID = 0x05 chunkIDOverStream2 chunkID = 0x06 chunkIDVideo chunkID = 0x07 chunkIDAudio chunkID = 0x08 ) // The Command Name of message. const ( commandConnect amf0String = amf0String("connect") commandCreateStream amf0String = amf0String("createStream") commandCloseStream amf0String = amf0String("closeStream") commandPlay amf0String = amf0String("play") commandPause amf0String = amf0String("pause") commandOnBWDone amf0String = amf0String("onBWDone") commandOnStatus amf0String = amf0String("onStatus") commandResult amf0String = amf0String("_result") commandError amf0String = amf0String("_error") commandReleaseStream amf0String = amf0String("releaseStream") commandFCPublish amf0String = amf0String("FCPublish") commandFCUnpublish amf0String = amf0String("FCUnpublish") commandPublish amf0String = amf0String("publish") commandRtmpSampleAccess amf0String = amf0String("|RtmpSampleAccess") ) // The RTMP packet, transport as payload of RTMP message. type Packet interface { // Marshaler and unmarshaler Size() int encoding.BinaryUnmarshaler encoding.BinaryMarshaler // RTMP protocol fields for each packet. BetterCid() chunkID Type() MessageType } // A Call packet, both object and args are AMF0 objects. type objectCallPacket struct { CommandName amf0String TransactionID amf0Number CommandObject *amf0Object Args *amf0Object } func (v *objectCallPacket) BetterCid() chunkID { return chunkIDOverConnection } func (v *objectCallPacket) Type() MessageType { return MessageTypeAMF0Command } func (v *objectCallPacket) Size() int { size := v.CommandName.Size() + v.TransactionID.Size() + v.CommandObject.Size() if v.Args != nil { size += v.Args.Size() } return size } func (v *objectCallPacket) UnmarshalBinary(data []byte) (err error) { p := data if err = v.CommandName.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal command name") } p = p[v.CommandName.Size():] if err = v.TransactionID.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal tid") } p = p[v.TransactionID.Size():] if err = v.CommandObject.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal command") } p = p[v.CommandObject.Size():] if len(p) == 0 { return } v.Args = NewAmf0Object() if err = v.Args.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal args") } return } func (v *objectCallPacket) MarshalBinary() (data []byte, err error) { var pb []byte if pb, err = v.CommandName.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal command name") } data = append(data, pb...) if pb, err = v.TransactionID.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal tid") } data = append(data, pb...) if pb, err = v.CommandObject.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal command object") } data = append(data, pb...) if v.Args != nil { if pb, err = v.Args.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal args") } data = append(data, pb...) } return } // Please read @doc rtmp_specification_1.0.pdf, @page 45, @section 4.1.1. connect // The client sends the connect command to the server to request // connection to a server application instance. type ConnectAppPacket struct { objectCallPacket } func NewConnectAppPacket() *ConnectAppPacket { v := &ConnectAppPacket{} v.CommandName = commandConnect v.CommandObject = NewAmf0Object() v.TransactionID = amf0Number(1.0) return v } func (v *ConnectAppPacket) UnmarshalBinary(data []byte) (err error) { if err = v.objectCallPacket.UnmarshalBinary(data); err != nil { return errors.WithMessage(err, "unmarshal call") } if v.CommandName != commandConnect { return errors.Errorf("Invalid command name %v", string(v.CommandName)) } if v.TransactionID != 1.0 { return errors.Errorf("Invalid transaction ID %v", float64(v.TransactionID)) } return } func (v *ConnectAppPacket) TcUrl() string { if v.CommandObject != nil { if v, ok := v.CommandObject.Get("tcUrl").(*amf0String); ok { return string(*v) } } return "" } // The response for ConnectAppPacket. type ConnectAppResPacket struct { objectCallPacket } func NewConnectAppResPacket(tid amf0Number) *ConnectAppResPacket { v := &ConnectAppResPacket{} v.CommandName = commandResult v.CommandObject = NewAmf0Object() v.Args = NewAmf0Object() v.TransactionID = tid return v } func (v *ConnectAppResPacket) SrsID() string { if v.Args != nil { if v, ok := v.Args.Get("data").(*amf0EcmaArray); ok { if v, ok := v.Get("srs_id").(*amf0String); ok { return string(*v) } } } return "" } func (v *ConnectAppResPacket) UnmarshalBinary(data []byte) (err error) { if err = v.objectCallPacket.UnmarshalBinary(data); err != nil { return errors.WithMessage(err, "unmarshal call") } if v.CommandName != commandResult { return errors.Errorf("Invalid command name %v", string(v.CommandName)) } return } // A Call object, command object is variant. type variantCallPacket struct { CommandName amf0String TransactionID amf0Number CommandObject amf0Any // object or null } func (v *variantCallPacket) BetterCid() chunkID { return chunkIDOverConnection } func (v *variantCallPacket) Type() MessageType { return MessageTypeAMF0Command } func (v *variantCallPacket) Size() int { size := v.CommandName.Size() + v.TransactionID.Size() if v.CommandObject != nil { size += v.CommandObject.Size() } return size } func (v *variantCallPacket) UnmarshalBinary(data []byte) (err error) { p := data if err = v.CommandName.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal command name") } p = p[v.CommandName.Size():] if err = v.TransactionID.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal tid") } p = p[v.TransactionID.Size():] if len(p) > 0 { if v.CommandObject, err = Amf0Discovery(p); err != nil { return errors.WithMessage(err, "discovery command object") } if err = v.CommandObject.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal command object") } p = p[v.CommandObject.Size():] } return } func (v *variantCallPacket) MarshalBinary() (data []byte, err error) { var pb []byte if pb, err = v.CommandName.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal command name") } data = append(data, pb...) if pb, err = v.TransactionID.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal tid") } data = append(data, pb...) if v.CommandObject != nil { if pb, err = v.CommandObject.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal command object") } data = append(data, pb...) } return } // Please read @doc rtmp_specification_1.0.pdf, @page 51, @section 4.1.2. Call // The call method of the NetConnection object runs remote procedure // calls (RPC) at the receiving end. The called RPC name is passed as a // parameter to the call command. // @remark onStatus packet is a call packet. type CallPacket struct { variantCallPacket Args amf0Any // optional or object or null } func NewCallPacket() *CallPacket { return &CallPacket{} } func (v *CallPacket) ArgsCode() string { if v.Args != nil { if v, ok := v.Args.(*amf0Object); ok { if code, ok := v.Get("code").(*amf0String); ok { return string(*code) } } } return "" } func (v *CallPacket) Size() int { size := v.variantCallPacket.Size() if v.Args != nil { size += v.Args.Size() } return size } func (v *CallPacket) UnmarshalBinary(data []byte) (err error) { p := data if err = v.variantCallPacket.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal call") } p = p[v.variantCallPacket.Size():] if len(p) > 0 { if v.Args, err = Amf0Discovery(p); err != nil { return errors.WithMessage(err, "discovery args") } if err = v.Args.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal args") } } return } func (v *CallPacket) MarshalBinary() (data []byte, err error) { var pb []byte if pb, err = v.variantCallPacket.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal call") } data = append(data, pb...) if v.Args != nil { if pb, err = v.Args.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal args") } data = append(data, pb...) } return } // Please read @doc rtmp_specification_1.0.pdf, @page 52, @section 4.1.3. createStream // The client sends this command to the server to create a logical // channel for message communication The publishing of audio, video, and // metadata is carried out over stream channel created using the // createStream command. type CreateStreamPacket struct { variantCallPacket } func NewCreateStreamPacket() *CreateStreamPacket { v := &CreateStreamPacket{} v.CommandName = commandCreateStream v.TransactionID = amf0Number(2) v.CommandObject = NewAmf0Null() return v } // The response for create stream type CreateStreamResPacket struct { variantCallPacket StreamID amf0Number } func NewCreateStreamResPacket(tid amf0Number) *CreateStreamResPacket { v := &CreateStreamResPacket{} v.CommandName = commandResult v.TransactionID = tid v.CommandObject = NewAmf0Null() v.StreamID = 0 return v } func (v *CreateStreamResPacket) Size() int { return v.variantCallPacket.Size() + v.StreamID.Size() } func (v *CreateStreamResPacket) UnmarshalBinary(data []byte) (err error) { p := data if err = v.variantCallPacket.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal call") } p = p[v.variantCallPacket.Size():] if err = v.StreamID.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal sid") } return } func (v *CreateStreamResPacket) MarshalBinary() (data []byte, err error) { var pb []byte if pb, err = v.variantCallPacket.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal call") } data = append(data, pb...) if pb, err = v.StreamID.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal sid") } data = append(data, pb...) return } // Please read @doc rtmp_specification_1.0.pdf, @page 64, @section 4.2.6. Publish type PublishPacket struct { variantCallPacket StreamName amf0String StreamType amf0String } func NewPublishPacket() *PublishPacket { v := &PublishPacket{} v.CommandName = commandPublish v.CommandObject = NewAmf0Null() v.StreamType = "live" return v } func (v *PublishPacket) Size() int { return v.variantCallPacket.Size() + v.StreamName.Size() + v.StreamType.Size() } func (v *PublishPacket) UnmarshalBinary(data []byte) (err error) { p := data if err = v.variantCallPacket.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal call") } p = p[v.variantCallPacket.Size():] if err = v.StreamName.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal stream name") } p = p[v.StreamName.Size():] if err = v.StreamType.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal stream type") } return } func (v *PublishPacket) MarshalBinary() (data []byte, err error) { var pb []byte if pb, err = v.variantCallPacket.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal call") } data = append(data, pb...) if pb, err = v.StreamName.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal stream name") } data = append(data, pb...) if pb, err = v.StreamType.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal stream type") } data = append(data, pb...) return } // Please read @doc rtmp_specification_1.0.pdf, @page 54, @section 4.2.1. play type PlayPacket struct { variantCallPacket StreamName amf0String } func NewPlayPacket() *PlayPacket { v := &PlayPacket{} v.CommandName = commandPlay v.CommandObject = NewAmf0Null() return v } func (v *PlayPacket) Size() int { return v.variantCallPacket.Size() + v.StreamName.Size() } func (v *PlayPacket) UnmarshalBinary(data []byte) (err error) { p := data if err = v.variantCallPacket.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal call") } p = p[v.variantCallPacket.Size():] if err = v.StreamName.UnmarshalBinary(p); err != nil { return errors.WithMessage(err, "unmarshal stream name") } p = p[v.StreamName.Size():] return } func (v *PlayPacket) MarshalBinary() (data []byte, err error) { var pb []byte if pb, err = v.variantCallPacket.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal call") } data = append(data, pb...) if pb, err = v.StreamName.MarshalBinary(); err != nil { return nil, errors.WithMessage(err, "marshal stream name") } data = append(data, pb...) return } // Please read @doc rtmp_specification_1.0.pdf, @page 31, @section 5.1. Set Chunk Size // Protocol control message 1, Set Chunk Size, is used to notify the // peer about the new maximum chunk size. type SetChunkSize struct { ChunkSize uint32 } func NewSetChunkSize() *SetChunkSize { return &SetChunkSize{ ChunkSize: defaultChunkSize, } } func (v *SetChunkSize) BetterCid() chunkID { return chunkIDProtocolControl } func (v *SetChunkSize) Type() MessageType { return MessageTypeSetChunkSize } func (v *SetChunkSize) Size() int { return 4 } func (v *SetChunkSize) UnmarshalBinary(data []byte) (err error) { if len(data) < 4 { return errors.Errorf("requires 4 only %v bytes, %x", len(data), data) } v.ChunkSize = binary.BigEndian.Uint32(data) return } func (v *SetChunkSize) MarshalBinary() (data []byte, err error) { data = make([]byte, 4) binary.BigEndian.PutUint32(data, v.ChunkSize) return } // Please read @doc rtmp_specification_1.0.pdf, @page 33, @section 5.5. Window Acknowledgement Size (5) // The client or the server sends this message to inform the peer which // window size to use when sending acknowledgment. type WindowAcknowledgementSize struct { AckSize uint32 } func NewWindowAcknowledgementSize() *WindowAcknowledgementSize { return &WindowAcknowledgementSize{} } func (v *WindowAcknowledgementSize) BetterCid() chunkID { return chunkIDProtocolControl } func (v *WindowAcknowledgementSize) Type() MessageType { return MessageTypeWindowAcknowledgementSize } func (v *WindowAcknowledgementSize) Size() int { return 4 } func (v *WindowAcknowledgementSize) UnmarshalBinary(data []byte) (err error) { if len(data) < 4 { return errors.Errorf("requires 4 only %v bytes, %x", len(data), data) } v.AckSize = binary.BigEndian.Uint32(data) return } func (v *WindowAcknowledgementSize) MarshalBinary() (data []byte, err error) { data = make([]byte, 4) binary.BigEndian.PutUint32(data, v.AckSize) return } // Please read @doc rtmp_specification_1.0.pdf, @page 33, @section 5.6. Set Peer Bandwidth (6) // The sender can mark this message hard (0), soft (1), or dynamic (2) // using the Limit type field. type LimitType uint8 const ( LimitTypeHard LimitType = iota LimitTypeSoft LimitTypeDynamic ) // Please read @doc rtmp_specification_1.0.pdf, @page 33, @section 5.6. Set Peer Bandwidth (6) // The client or the server sends this message to update the output // bandwidth of the peer. type SetPeerBandwidth struct { Bandwidth uint32 LimitType LimitType } func NewSetPeerBandwidth() *SetPeerBandwidth { return &SetPeerBandwidth{} } func (v *SetPeerBandwidth) BetterCid() chunkID { return chunkIDProtocolControl } func (v *SetPeerBandwidth) Type() MessageType { return MessageTypeSetPeerBandwidth } func (v *SetPeerBandwidth) Size() int { return 4 + 1 } func (v *SetPeerBandwidth) UnmarshalBinary(data []byte) (err error) { if len(data) < 5 { return errors.Errorf("requires 5 only %v bytes, %x", len(data), data) } v.Bandwidth = binary.BigEndian.Uint32(data) v.LimitType = LimitType(data[4]) return } func (v *SetPeerBandwidth) MarshalBinary() (data []byte, err error) { data = make([]byte, 5) binary.BigEndian.PutUint32(data, v.Bandwidth) data[4] = byte(v.LimitType) return } type EventType uint16 const ( // Generally, 4bytes event-data // The server sends this event to notify the client // that a stream has become functional and can be // used for communication. By default, this event // is sent on ID 0 after the application connect // command is successfully received from the // client. The event data is 4-byte and represents // The stream ID of the stream that became // Functional. EventTypeStreamBegin = 0x00 // The server sends this event to notify the client // that the playback of data is over as requested // on this stream. No more data is sent without // issuing additional commands. The client discards // The messages received for the stream. The // 4 bytes of event data represent the ID of the // stream on which playback has ended. EventTypeStreamEOF = 0x01 // The server sends this event to notify the client // that there is no more data on the stream. If the // server does not detect any message for a time // period, it can notify the subscribed clients // that the stream is dry. The 4 bytes of event // data represent the stream ID of the dry stream. EventTypeStreamDry = 0x02 // The client sends this event to inform the server // of the buffer size (in milliseconds) that is // used to buffer any data coming over a stream. // This event is sent before the server starts // processing the stream. The first 4 bytes of the // event data represent the stream ID and the next // 4 bytes represent the buffer length, in // milliseconds. EventTypeSetBufferLength = 0x03 // 8bytes event-data // The server sends this event to notify the client // that the stream is a recorded stream. The // 4 bytes event data represent the stream ID of // The recorded stream. EventTypeStreamIsRecorded = 0x04 // The server sends this event to test whether the // client is reachable. Event data is a 4-byte // timestamp, representing the local server time // When the server dispatched the command. The // client responds with kMsgPingResponse on // receiving kMsgPingRequest. EventTypePingRequest = 0x06 // The client sends this event to the server in // Response to the ping request. The event data is // a 4-byte timestamp, which was received with the // kMsgPingRequest request. EventTypePingResponse = 0x07 // For PCUC size=3, for example the payload is "00 1A 01", // it's a FMS control event, where the event type is 0x001a and event data is 0x01, // please notice that the event data is only 1 byte for this event. EventTypeFmsEvent0 = 0x1a ) // Please read @doc rtmp_specification_1.0.pdf, @page 32, @5.4. User Control Message (4) // The client or the server sends this message to notify the peer about the user control events. // This message carries Event type and Event data. type UserControl struct { // Event type is followed by Event data. // @see: SrcPCUCEventType EventType EventType // The event data generally in 4bytes. // @remark for event type is 0x001a, only 1bytes. // @see SrsPCUCFmsEvent0 EventData int32 // 4bytes if event_type is SetBufferLength; otherwise 0. ExtraData int32 } func NewUserControl() *UserControl { return &UserControl{} } func (v *UserControl) BetterCid() chunkID { return chunkIDProtocolControl } func (v *UserControl) Type() MessageType { return MessageTypeUserControl } func (v *UserControl) Size() int { size := 2 if v.EventType == EventTypeFmsEvent0 { size += 1 } else { size += 4 } if v.EventType == EventTypeSetBufferLength { size += 4 } return size } func (v *UserControl) UnmarshalBinary(data []byte) (err error) { if len(data) < 3 { return errors.Errorf("requires 5 only %v bytes, %x", len(data), data) } v.EventType = EventType(binary.BigEndian.Uint16(data)) if len(data) < v.Size() { return errors.Errorf("requires %v only %v bytes, %x", v.Size(), len(data), data) } if v.EventType == EventTypeFmsEvent0 { v.EventData = int32(uint8(data[2])) } else { v.EventData = int32(binary.BigEndian.Uint32(data[2:])) } if v.EventType == EventTypeSetBufferLength { v.ExtraData = int32(binary.BigEndian.Uint32(data[6:])) } return } func (v *UserControl) MarshalBinary() (data []byte, err error) { data = make([]byte, v.Size()) binary.BigEndian.PutUint16(data, uint16(v.EventType)) if v.EventType == EventTypeFmsEvent0 { data[2] = uint8(v.EventData) } else { binary.BigEndian.PutUint32(data[2:], uint32(v.EventData)) } if v.EventType == EventTypeSetBufferLength { binary.BigEndian.PutUint32(data[6:], uint32(v.ExtraData)) } return }