blob: d8469937ea3fe851bde9e8ab046867a970ba2113 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package amqp
// #include <proton/codec.h>
import "C"
import (
"fmt"
"io"
"reflect"
"time"
"unsafe"
)
// MarshalError is returned if Go data cannot be marshaled as an AMQP type.
type MarshalError struct {
	// The Go type of the value that could not be marshaled.
	// It is nil when the value was an untyped nil.
	GoType reflect.Type
	// s is the complete, preformatted message returned by Error.
	s string
}

// Error returns the message that was formatted when the error was created.
func (e MarshalError) Error() string { return e.s }

// newMarshalError builds a *MarshalError for the value v.
// The message has the form "cannot marshal <type of v>: <s>".
func newMarshalError(v interface{}, s string) *MarshalError {
	t := reflect.TypeOf(v)
	// Use %v, not %s, for the type: reflect.TypeOf(nil) is a nil interface and
	// %s would render it as "%!s(<nil>)". For non-nil types both verbs print
	// the same type name.
	return &MarshalError{GoType: t, s: fmt.Sprintf("cannot marshal %v: %s", t, s)}
}
// dataMarshalError inspects the error state of data and, if PnError reports an
// error for it, wraps that error's text in a *MarshalError describing v.
// It returns nil when PnError reports no error.
func dataMarshalError(v interface{}, data *C.pn_data_t) error {
	pe := PnError(C.pn_data_error(data))
	if pe == nil {
		// Return a literal nil so callers never see a non-nil error
		// interface wrapping a nil pointer.
		return nil
	}
	return newMarshalError(v, pe.Error())
}
/*
Marshal encodes a Go value as AMQP data in buffer.
If buffer is nil, or is not large enough, a new buffer is created.
Returns the buffer used for encoding with len() adjusted to the actual size of data.
Go types are encoded as follows
+-------------------------------------+--------------------------------------------+
|Go type |AMQP type |
+-------------------------------------+--------------------------------------------+
|bool |bool |
+-------------------------------------+--------------------------------------------+
|int8, int16, int32, int64 (int) |byte, short, int, long (int or long) |
+-------------------------------------+--------------------------------------------+
|uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) |
+-------------------------------------+--------------------------------------------+
|float32, float64 |float, double |
+-------------------------------------+--------------------------------------------+
|string |string |
+-------------------------------------+--------------------------------------------+
|[]byte, Binary |binary |
+-------------------------------------+--------------------------------------------+
|Symbol |symbol |
+-------------------------------------+--------------------------------------------+
|Char |char |
+-------------------------------------+--------------------------------------------+
|interface{} |the contained type |
+-------------------------------------+--------------------------------------------+
|nil |null |
+-------------------------------------+--------------------------------------------+
|map[K]T |map with K and T converted as above |
+-------------------------------------+--------------------------------------------+
|Map |map, may have mixed types for keys, values |
+-------------------------------------+--------------------------------------------+
|AnyMap |map (See AnyMap) |
+-------------------------------------+--------------------------------------------+
|List, []interface{} |list, may have mixed-type values |
+-------------------------------------+--------------------------------------------+
|[]T, [N]T |array, T is mapped as per this table |
+-------------------------------------+--------------------------------------------+
|Described |described type |
+-------------------------------------+--------------------------------------------+
|time.Time |timestamp |
+-------------------------------------+--------------------------------------------+
|UUID |uuid |
+-------------------------------------+--------------------------------------------+
The following Go types cannot be marshaled: uintptr, function, channel, struct, complex64/128
AMQP types not yet supported: decimal32/64/128
*/
func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
	// Build the value in a temporary pn_data_t that is freed on return.
	pnData := C.pn_data(0)
	defer C.pn_data_free(pnData)

	if err = recoverMarshal(v, pnData); err != nil {
		return buffer, err
	}

	// Serialize pnData into a byte slice; encodeGrow retries this with a
	// larger slice for as long as it reports overflow.
	return encodeGrow(buffer, func(dst []byte) ([]byte, error) {
		size := int(C.pn_data_encode(pnData, cPtr(dst), cLen(dst)))
		if size == int(C.PN_OVERFLOW) {
			return dst, overflow
		}
		if size < 0 {
			// Any other negative result is an encoding failure.
			return dst, dataMarshalError(v, pnData)
		}
		return dst[:size], nil
	})
}
// Internal use only
func MarshalUnsafe(v interface{}, pnData unsafe.Pointer) (err error) {
return recoverMarshal(v, (*C.pn_data_t)(pnData))
}
// recoverMarshal calls marshal(v, data) and turns a panic carrying a
// *MarshalError into an ordinary error return. Any other panic value is
// re-raised unchanged.
func recoverMarshal(v interface{}, data *C.pn_data_t) (err error) {
	defer func() {
		r := recover()
		if r == nil {
			return // marshal finished normally
		}
		marshalErr, ok := r.(*MarshalError)
		if !ok {
			panic(r) // Unrecognized error, continue to panic
		}
		err = marshalErr // Convert internal panic to error
	}()
	marshal(v, data) // Panics on error
	return err
}
// minEncode is the size of the buffer allocated when the caller supplies none,
// and the smallest size a replacement buffer is grown to.
const minEncode = 256

// overflow is returned when an encoding function can't fit data in the buffer.
var overflow = fmt.Errorf("buffer too small")

// encodeFn encodes into buffer[0:len(buffer)].
// Returns buffer with length adjusted for data encoded.
// If buffer too small, returns overflow as error.
type encodeFn func(buffer []byte) ([]byte, error)

// encodeGrow calls encode() into buffer and, for as long as it returns
// overflow, retries with a freshly allocated larger buffer.
//
// A nil or empty buffer is replaced by one of minEncode bytes before the first
// attempt. Each replacement is twice the length of the slice that overflowed,
// but never smaller than minEncode, so a tiny caller-supplied buffer does not
// cause a long series of re-encodes.
//
// Returns the slice and error from the first encode() call that did not report
// overflow.
func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
	if len(buffer) == 0 { // also true for a nil slice
		buffer = make([]byte, minEncode)
	}
	for {
		out, err := encode(buffer)
		if err != overflow {
			return out, err
		}
		size := 2 * len(out)
		if size < minEncode {
			size = minEncode
		}
		buffer = make([]byte, size)
	}
}
// marshal adds the Go value i to data as AMQP data, recursing into described
// values, maps, lists and arrays.
//
// It has no error return: every failure is reported by panicking with a
// *MarshalError, which recoverMarshal converts back into an error.
func marshal(i interface{}, data *C.pn_data_t) {
	switch v := i.(type) {
	case nil:
		C.pn_data_put_null(data)
	case bool:
		C.pn_data_put_bool(data, C.bool(v))

	// Signed integers
	case int8:
		C.pn_data_put_byte(data, C.int8_t(v))
	case int16:
		C.pn_data_put_short(data, C.int16_t(v))
	case int32:
		C.pn_data_put_int(data, C.int32_t(v))
	case int64:
		C.pn_data_put_long(data, C.int64_t(v))
	case int:
		// The AMQP type for a plain int depends on intIs64.
		if intIs64 {
			C.pn_data_put_long(data, C.int64_t(v))
		} else {
			C.pn_data_put_int(data, C.int32_t(v))
		}

	// Unsigned integers
	case uint8:
		C.pn_data_put_ubyte(data, C.uint8_t(v))
	case uint16:
		C.pn_data_put_ushort(data, C.uint16_t(v))
	case uint32:
		C.pn_data_put_uint(data, C.uint32_t(v))
	case uint64:
		C.pn_data_put_ulong(data, C.uint64_t(v))
	case uint:
		// The AMQP type for a plain uint depends on intIs64.
		if intIs64 {
			C.pn_data_put_ulong(data, C.uint64_t(v))
		} else {
			C.pn_data_put_uint(data, C.uint32_t(v))
		}

	// Floating point
	case float32:
		C.pn_data_put_float(data, C.float(v))
	case float64:
		C.pn_data_put_double(data, C.double(v))

	// String-like (string, binary, symbol)
	case string:
		C.pn_data_put_string(data, pnBytes([]byte(v)))
	case []byte:
		C.pn_data_put_binary(data, pnBytes(v))
	case Binary:
		C.pn_data_put_binary(data, pnBytes([]byte(v)))
	case Symbol:
		C.pn_data_put_symbol(data, pnBytes([]byte(v)))

	// Other simple types
	case time.Time:
		C.pn_data_put_timestamp(data, pnTime(v))
	case UUID:
		C.pn_data_put_uuid(data, *(*C.pn_uuid_t)(unsafe.Pointer(&v[0])))
	case Char:
		C.pn_data_put_char(data, (C.pn_char_t)(v))

	// Described types
	case Described:
		C.pn_data_put_described(data)
		mustEnter(i, data)
		defer C.pn_data_exit(data)
		marshal(v.Descriptor, data)
		marshal(v.Value, data)

	// Restricted type annotation-key, marshals as contained value
	case AnnotationKey:
		marshal(v.Get(), data)

	// Special type to represent AMQP maps with keys that are illegal in Go
	case AnyMap:
		C.pn_data_put_map(data)
		mustEnter(i, data)
		defer C.pn_data_exit(data)
		for _, kv := range v {
			marshal(kv.Key, data)
			marshal(kv.Value, data)
		}

	default:
		// Examine complex types (Go map, slice, array) by reflected structure
		switch reflect.TypeOf(i).Kind() {

		case reflect.Map:
			m := reflect.ValueOf(v)
			C.pn_data_put_map(data)
			mustEnter(i, data)
			defer C.pn_data_exit(data)
			// Keys and values are added alternately as children of the map.
			for _, key := range m.MapKeys() {
				marshal(key.Interface(), data)
				marshal(m.MapIndex(key).Interface(), data)
			}

		case reflect.Slice, reflect.Array:
			// Note: Go array and slice are mapped the same way:
			// if the element type has an entry in arrayTypeMap, map to AMQP array (single type)
			// otherwise (e.g. interface elements) map to AMQP list (mixed type)
			s := reflect.ValueOf(v)
			if pnType, ok := arrayTypeMap[s.Type().Elem()]; ok {
				C.pn_data_put_array(data, false, pnType)
			} else {
				C.pn_data_put_list(data)
			}
			mustEnter(i, data)
			defer C.pn_data_exit(data)
			for j := 0; j < s.Len(); j++ {
				marshal(s.Index(j).Interface(), data)
			}

		default:
			panic(newMarshalError(v, "no conversion"))
		}
	}
	// The results of the put calls above are not inspected individually;
	// instead check the error state of data once the value has been added.
	if err := dataMarshalError(i, data); err != nil {
		panic(err)
	}
}

// mustEnter descends into the container node that was just put on data so that
// following values become its children. The caller is responsible for the
// matching pn_data_exit.
//
// If the descent fails it panics with a *MarshalError for i: the error recorded
// on data when there is one, otherwise a generic error. It never panics with a
// nil value, so recoverMarshal always sees a *MarshalError.
func mustEnter(i interface{}, data *C.pn_data_t) {
	if C.pn_data_enter(data) {
		return
	}
	if err := dataMarshalError(i, data); err != nil {
		panic(err)
	}
	panic(newMarshalError(i, "cannot enter container"))
}
// Mapping from Go element type to AMQP array type for types that can go in an AMQP array.
// A slice or array whose element type is not listed here is marshaled as an AMQP list.
// NOTE: this must be kept consistent with marshal() which does the actual marshalling:
// each entry names the AMQP type of the pn_data_put_* call marshal() uses for that Go type.
var arrayTypeMap = map[reflect.Type]C.pn_type_t{
	nil:                  C.PN_NULL,
	reflect.TypeOf(true): C.PN_BOOL,

	reflect.TypeOf(int8(0)):  C.PN_BYTE,
	reflect.TypeOf(int16(0)): C.PN_SHORT, // marshal() uses pn_data_put_short
	reflect.TypeOf(int32(0)): C.PN_INT,   // marshal() uses pn_data_put_int
	reflect.TypeOf(int64(0)): C.PN_LONG,

	reflect.TypeOf(uint8(0)):  C.PN_UBYTE,
	reflect.TypeOf(uint16(0)): C.PN_USHORT, // marshal() uses pn_data_put_ushort
	reflect.TypeOf(uint32(0)): C.PN_UINT,   // marshal() uses pn_data_put_uint
	reflect.TypeOf(uint64(0)): C.PN_ULONG,

	reflect.TypeOf(float32(0)): C.PN_FLOAT,
	reflect.TypeOf(float64(0)): C.PN_DOUBLE,

	reflect.TypeOf(""):                    C.PN_STRING,
	reflect.TypeOf((*Symbol)(nil)).Elem(): C.PN_SYMBOL,
	reflect.TypeOf((*Binary)(nil)).Elem(): C.PN_BINARY,
	reflect.TypeOf([]byte{}):              C.PN_BINARY,

	reflect.TypeOf((*time.Time)(nil)).Elem(): C.PN_TIMESTAMP,
	reflect.TypeOf((*UUID)(nil)).Elem():      C.PN_UUID,
	reflect.TypeOf((*Char)(nil)).Elem():      C.PN_CHAR,
}
// Compute mapping of int/uint at runtime as they depend on execution environment.
func init() {
if intIs64 {
arrayTypeMap[reflect.TypeOf(int(0))] = C.PN_LONG
arrayTypeMap[reflect.TypeOf(uint(0))] = C.PN_ULONG
} else {
arrayTypeMap[reflect.TypeOf(int(0))] = C.PN_INT
arrayTypeMap[reflect.TypeOf(uint(0))] = C.PN_UINT
}
}
// clearMarshal clears data and then marshals v into it.
// Like marshal, it panics if v cannot be marshaled.
func clearMarshal(v interface{}, data *C.pn_data_t) {
	C.pn_data_clear(data) // clear data before adding v
	marshal(v, data)
}
// Encoder encodes AMQP values to an io.Writer.
type Encoder struct {
	// writer receives the encoded bytes.
	writer io.Writer
	// buffer is the byte slice handed to Marshal on each Encode call.
	buffer []byte
}

// NewEncoder returns a new encoder that writes to w.
// The encoder starts with a buffer of minEncode bytes.
func NewEncoder(w io.Writer) *Encoder {
	enc := &Encoder{writer: w}
	enc.buffer = make([]byte, minEncode)
	return enc
}
// Encode marshals v as AMQP data and writes the encoded bytes to the encoder's
// writer. It returns the marshaling error, if any, otherwise the error from
// the write. Nothing is written when marshaling fails.
func (e *Encoder) Encode(v interface{}) (err error) {
	// Marshal returns its buffer with len() cut down to the encoded size, and
	// that slice is kept in e.buffer. Re-extend it to its full capacity before
	// reuse; otherwise the usable space would shrink to the size of the
	// previous message and a larger message would force a reallocation.
	e.buffer, err = Marshal(v, e.buffer[:cap(e.buffer)])
	if err != nil {
		return err
	}
	_, err = e.writer.Write(e.buffer)
	return err
}