| package eventstream |
| |
| import ( |
| "bytes" |
| "encoding/base64" |
| "encoding/json" |
| "fmt" |
| "strconv" |
| ) |
| |
| type decodedMessage struct { |
| rawMessage |
| Headers decodedHeaders `json:"headers"` |
| } |
| type jsonMessage struct { |
| Length json.Number `json:"total_length"` |
| HeadersLen json.Number `json:"headers_length"` |
| PreludeCRC json.Number `json:"prelude_crc"` |
| Headers decodedHeaders `json:"headers"` |
| Payload []byte `json:"payload"` |
| CRC json.Number `json:"message_crc"` |
| } |
| |
| func (d *decodedMessage) UnmarshalJSON(b []byte) (err error) { |
| var jsonMsg jsonMessage |
| if err = json.Unmarshal(b, &jsonMsg); err != nil { |
| return err |
| } |
| |
| d.Length, err = numAsUint32(jsonMsg.Length) |
| if err != nil { |
| return err |
| } |
| d.HeadersLen, err = numAsUint32(jsonMsg.HeadersLen) |
| if err != nil { |
| return err |
| } |
| d.PreludeCRC, err = numAsUint32(jsonMsg.PreludeCRC) |
| if err != nil { |
| return err |
| } |
| d.Headers = jsonMsg.Headers |
| d.Payload = jsonMsg.Payload |
| d.CRC, err = numAsUint32(jsonMsg.CRC) |
| if err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| func (d *decodedMessage) MarshalJSON() ([]byte, error) { |
| jsonMsg := jsonMessage{ |
| Length: json.Number(strconv.Itoa(int(d.Length))), |
| HeadersLen: json.Number(strconv.Itoa(int(d.HeadersLen))), |
| PreludeCRC: json.Number(strconv.Itoa(int(d.PreludeCRC))), |
| Headers: d.Headers, |
| Payload: d.Payload, |
| CRC: json.Number(strconv.Itoa(int(d.CRC))), |
| } |
| |
| return json.Marshal(jsonMsg) |
| } |
| |
| func numAsUint32(n json.Number) (uint32, error) { |
| v, err := n.Int64() |
| if err != nil { |
| return 0, fmt.Errorf("failed to get int64 json number, %v", err) |
| } |
| |
| return uint32(v), nil |
| } |
| |
| func (d decodedMessage) Message() Message { |
| return Message{ |
| Headers: Headers(d.Headers), |
| Payload: d.Payload, |
| } |
| } |
| |
| type decodedHeaders Headers |
| |
| func (hs *decodedHeaders) UnmarshalJSON(b []byte) error { |
| var jsonHeaders []struct { |
| Name string `json:"name"` |
| Type valueType `json:"type"` |
| Value interface{} `json:"value"` |
| } |
| |
| decoder := json.NewDecoder(bytes.NewReader(b)) |
| decoder.UseNumber() |
| if err := decoder.Decode(&jsonHeaders); err != nil { |
| return err |
| } |
| |
| var headers Headers |
| for _, h := range jsonHeaders { |
| value, err := valueFromType(h.Type, h.Value) |
| if err != nil { |
| return err |
| } |
| headers.Set(h.Name, value) |
| } |
| (*hs) = decodedHeaders(headers) |
| |
| return nil |
| } |
| |
| func valueFromType(typ valueType, val interface{}) (Value, error) { |
| switch typ { |
| case trueValueType: |
| return BoolValue(true), nil |
| case falseValueType: |
| return BoolValue(false), nil |
| case int8ValueType: |
| v, err := val.(json.Number).Int64() |
| return Int8Value(int8(v)), err |
| case int16ValueType: |
| v, err := val.(json.Number).Int64() |
| return Int16Value(int16(v)), err |
| case int32ValueType: |
| v, err := val.(json.Number).Int64() |
| return Int32Value(int32(v)), err |
| case int64ValueType: |
| v, err := val.(json.Number).Int64() |
| return Int64Value(v), err |
| case bytesValueType: |
| v, err := base64.StdEncoding.DecodeString(val.(string)) |
| return BytesValue(v), err |
| case stringValueType: |
| v, err := base64.StdEncoding.DecodeString(val.(string)) |
| return StringValue(string(v)), err |
| case timestampValueType: |
| v, err := val.(json.Number).Int64() |
| return TimestampValue(timeFromEpochMilli(v)), err |
| case uuidValueType: |
| v, err := base64.StdEncoding.DecodeString(val.(string)) |
| var tv UUIDValue |
| copy(tv[:], v) |
| return tv, err |
| default: |
| panic(fmt.Sprintf("unknown type, %s, %T", typ.String(), val)) |
| } |
| } |