| package eventstream |
| |
| import ( |
| "bytes" |
| "encoding/binary" |
| "encoding/hex" |
| "encoding/json" |
| "fmt" |
| "hash" |
| "hash/crc32" |
| "io" |
| |
| "github.com/aws/aws-sdk-go/aws" |
| ) |
| |
| // Decoder provides decoding of an Event Stream messages. |
| type Decoder struct { |
| r io.Reader |
| logger aws.Logger |
| } |
| |
| // NewDecoder initializes and returns a Decoder for decoding event |
| // stream messages from the reader provided. |
| func NewDecoder(r io.Reader) *Decoder { |
| return &Decoder{ |
| r: r, |
| } |
| } |
| |
| // Decode attempts to decode a single message from the event stream reader. |
| // Will return the event stream message, or error if Decode fails to read |
| // the message from the stream. |
| func (d *Decoder) Decode(payloadBuf []byte) (m Message, err error) { |
| reader := d.r |
| if d.logger != nil { |
| debugMsgBuf := bytes.NewBuffer(nil) |
| reader = io.TeeReader(reader, debugMsgBuf) |
| defer func() { |
| logMessageDecode(d.logger, debugMsgBuf, m, err) |
| }() |
| } |
| |
| crc := crc32.New(crc32IEEETable) |
| hashReader := io.TeeReader(reader, crc) |
| |
| prelude, err := decodePrelude(hashReader, crc) |
| if err != nil { |
| return Message{}, err |
| } |
| |
| if prelude.HeadersLen > 0 { |
| lr := io.LimitReader(hashReader, int64(prelude.HeadersLen)) |
| m.Headers, err = decodeHeaders(lr) |
| if err != nil { |
| return Message{}, err |
| } |
| } |
| |
| if payloadLen := prelude.PayloadLen(); payloadLen > 0 { |
| buf, err := decodePayload(payloadBuf, io.LimitReader(hashReader, int64(payloadLen))) |
| if err != nil { |
| return Message{}, err |
| } |
| m.Payload = buf |
| } |
| |
| msgCRC := crc.Sum32() |
| if err := validateCRC(reader, msgCRC); err != nil { |
| return Message{}, err |
| } |
| |
| return m, nil |
| } |
| |
| // UseLogger specifies the Logger that that the decoder should use to log the |
| // message decode to. |
| func (d *Decoder) UseLogger(logger aws.Logger) { |
| d.logger = logger |
| } |
| |
| func logMessageDecode(logger aws.Logger, msgBuf *bytes.Buffer, msg Message, decodeErr error) { |
| w := bytes.NewBuffer(nil) |
| defer func() { logger.Log(w.String()) }() |
| |
| fmt.Fprintf(w, "Raw message:\n%s\n", |
| hex.Dump(msgBuf.Bytes())) |
| |
| if decodeErr != nil { |
| fmt.Fprintf(w, "Decode error: %v\n", decodeErr) |
| return |
| } |
| |
| rawMsg, err := msg.rawMessage() |
| if err != nil { |
| fmt.Fprintf(w, "failed to create raw message, %v\n", err) |
| return |
| } |
| |
| decodedMsg := decodedMessage{ |
| rawMessage: rawMsg, |
| Headers: decodedHeaders(msg.Headers), |
| } |
| |
| fmt.Fprintf(w, "Decoded message:\n") |
| encoder := json.NewEncoder(w) |
| if err := encoder.Encode(decodedMsg); err != nil { |
| fmt.Fprintf(w, "failed to generate decoded message, %v\n", err) |
| } |
| } |
| |
| func decodePrelude(r io.Reader, crc hash.Hash32) (messagePrelude, error) { |
| var p messagePrelude |
| |
| var err error |
| p.Length, err = decodeUint32(r) |
| if err != nil { |
| return messagePrelude{}, err |
| } |
| |
| p.HeadersLen, err = decodeUint32(r) |
| if err != nil { |
| return messagePrelude{}, err |
| } |
| |
| if err := p.ValidateLens(); err != nil { |
| return messagePrelude{}, err |
| } |
| |
| preludeCRC := crc.Sum32() |
| if err := validateCRC(r, preludeCRC); err != nil { |
| return messagePrelude{}, err |
| } |
| |
| p.PreludeCRC = preludeCRC |
| |
| return p, nil |
| } |
| |
// decodePayload copies everything that can be read from r into a buffer
// that starts out backed by buf's storage, and returns the bytes copied.
// Any existing contents of buf are discarded. The bytes gathered so far are
// returned alongside the error when the copy fails.
func decodePayload(buf []byte, r io.Reader) ([]byte, error) {
	dst := bytes.NewBuffer(buf[:0])
	if _, err := io.Copy(dst, r); err != nil {
		return dst.Bytes(), err
	}
	return dst.Bytes(), nil
}
| |
// decodeUint8 reads one byte from r and returns it as a uint8.
//
// Readers that provide ReadByte are read through that method; all other
// readers are read with io.ReadFull into a one-byte buffer.
func decodeUint8(r io.Reader) (uint8, error) {
	if br, ok := r.(io.ByteReader); ok {
		return br.ReadByte()
	}

	var one [1]byte
	_, err := io.ReadFull(r, one[:])
	return one[0], err
}
// decodeUint16 reads exactly two bytes from r and interprets them as a
// big-endian uint16. If the bytes cannot all be read, it returns zero and
// the error from io.ReadFull.
func decodeUint16(r io.Reader) (uint16, error) {
	var raw [2]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return 0, err
	}
	return binary.BigEndian.Uint16(raw[:]), nil
}
// decodeUint32 reads exactly four bytes from r and interprets them as a
// big-endian uint32. If the bytes cannot all be read, it returns zero and
// the error from io.ReadFull.
func decodeUint32(r io.Reader) (uint32, error) {
	var raw [4]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return 0, err
	}
	return binary.BigEndian.Uint32(raw[:]), nil
}
// decodeUint64 reads exactly eight bytes from r and interprets them as a
// big-endian uint64. If the bytes cannot all be read, it returns zero and
// the error from io.ReadFull.
func decodeUint64(r io.Reader) (uint64, error) {
	var raw [8]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return 0, err
	}
	return binary.BigEndian.Uint64(raw[:]), nil
}
| |
| func validateCRC(r io.Reader, expect uint32) error { |
| msgCRC, err := decodeUint32(r) |
| if err != nil { |
| return err |
| } |
| |
| if msgCRC != expect { |
| return ChecksumError{} |
| } |
| |
| return nil |
| } |