blob: ac19d74f81fcd25baa1630e0193fc4498cc7516b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package fury
import (
"fmt"
"hash/fnv"
"reflect"
"regexp"
"strconv"
"strings"
"time"
)
// TypeId is the numeric identifier of a data type. It is an alias for int16.
type TypeId = int16
// Type identifiers. The first group is numbered consecutively by iota, starting
// at NA = 0 and ending with MAX_ID; DECIMAL and the FURY_* identifiers that
// follow are given explicit values.
const (
// NA A NullFlag type having no physical storage
NA TypeId = iota // NA = 0
// BOOL Boolean as 1 bit LSB bit-packed ordering
BOOL
// UINT8 Unsigned 8-bit little-endian integer
UINT8
// INT8 Signed 8-bit little-endian integer
INT8
// UINT16 Unsigned 16-bit little-endian integer
UINT16
// INT16 Signed 16-bit little-endian integer
INT16
// UINT32 Unsigned 32-bit little-endian integer
UINT32
// INT32 Signed 32-bit little-endian integer
INT32
// UINT64 Unsigned 64-bit little-endian integer
UINT64
// INT64 Signed 64-bit little-endian integer
INT64
// HALF_FLOAT 2-byte floating point value
HALF_FLOAT
// FLOAT 4-byte floating point value
FLOAT
// DOUBLE 8-byte floating point value
DOUBLE
// STRING UTF8 variable-length string as List<Char>
STRING
// BINARY Variable-length bytes (no guarantee of UTF8-ness)
BINARY
// FIXED_SIZE_BINARY Fixed-size binary. Each value occupies the same number of bytes
FIXED_SIZE_BINARY
// DATE32 int32_t days since the UNIX epoch
DATE32
// DATE64 int64_t milliseconds since the UNIX epoch
DATE64
// TIMESTAMP Exact timestamp encoded with int64 since UNIX epoch
// Default unit millisecond
TIMESTAMP
// TIME32 Time as signed 32-bit integer representing either seconds or
// milliseconds since midnight
TIME32
// TIME64 Time as signed 64-bit integer representing either microseconds or
// nanoseconds since midnight
TIME64
// INTERVAL_MONTHS YEAR_MONTH interval in SQL style
INTERVAL_MONTHS
// INTERVAL_DAY_TIME DAY_TIME interval in SQL style
INTERVAL_DAY_TIME
// DECIMAL128 Precision- and scale-based decimal type with 128 bits.
DECIMAL128
// DECIMAL256 Precision- and scale-based decimal type with 256 bits.
DECIMAL256
// LIST A list of some logical data type
LIST
// STRUCT Struct of logical types
STRUCT
// SPARSE_UNION Sparse unions of logical types
SPARSE_UNION
// DENSE_UNION Dense unions of logical types
DENSE_UNION
// DICTIONARY Dictionary-encoded type also called "categorical" or "factor"
// in other programming languages. Holds the dictionary value
// type but not the dictionary itself which is part of the
// ArrayData struct
DICTIONARY
// MAP Map a repeated struct logical type
MAP
// EXTENSION Custom data type implemented by user
EXTENSION
// FIXED_SIZE_LIST Fixed size list of some logical type
FIXED_SIZE_LIST
// DURATION Measure of elapsed time in either seconds milliseconds microseconds
// or nanoseconds.
DURATION
// LARGE_STRING Like STRING but with 64-bit offsets
LARGE_STRING
// LARGE_BINARY Like BINARY but with 64-bit offsets
LARGE_BINARY
// LARGE_LIST Like LIST but with 64-bit offsets
LARGE_LIST
// MAX_ID Leave this at the end of the iota-numbered identifiers above
MAX_ID
// DECIMAL is another name for DECIMAL128.
DECIMAL = DECIMAL128
// Fury added type for cross-language serialization.
// FURY_TYPE_TAG for type identified by the tag
FURY_TYPE_TAG = 256
FURY_SET = 257
FURY_PRIMITIVE_BOOL_ARRAY = 258
FURY_PRIMITIVE_SHORT_ARRAY = 259
FURY_PRIMITIVE_INT_ARRAY = 260
FURY_PRIMITIVE_LONG_ARRAY = 261
FURY_PRIMITIVE_FLOAT_ARRAY = 262
FURY_PRIMITIVE_DOUBLE_ARRAY = 263
FURY_STRING_ARRAY = 264
FURY_SERIALIZED_OBJECT = 265
FURY_BUFFER = 266
FURY_ARROW_RECORD_BATCH = 267
FURY_ARROW_TABLE = 268
)
const (
// NotSupportCrossLanguage is the threshold RegisterSerializer compares a
// serializer's type id against: only ids strictly greater than this value
// (and different from FURY_TYPE_TAG) are recorded in typeIdToType.
NotSupportCrossLanguage = 0
// useStringValue and useStringId are not referenced in this part of the file.
// NOTE(review): presumably they are the flag values that distinguish an inline
// meta string from a reference to an earlier one — confirm against their users.
useStringValue = 0
useStringId = 1
)
// Cached reflect.Type values for the types this file refers to by identity.
// Most are obtained as reflect.TypeOf((*T)(nil)).Elem(), i.e. the element type
// of a nil *T, which also works when T is an interface type.
var (
interfaceType = reflect.TypeOf((*interface{})(nil)).Elem()
stringType = reflect.TypeOf((*string)(nil)).Elem()
// Make compilation support tinygo
// stringPtrType is the type *string, taken directly from a nil *string
// (no Elem call) instead of the commented-out **string form below.
stringPtrType = reflect.TypeOf((*string)(nil))
//stringPtrType = reflect.TypeOf((**string)(nil)).Elem()
// Slice and map types.
stringSliceType = reflect.TypeOf((*[]string)(nil)).Elem()
byteSliceType = reflect.TypeOf((*[]byte)(nil)).Elem()
boolSliceType = reflect.TypeOf((*[]bool)(nil)).Elem()
int16SliceType = reflect.TypeOf((*[]int16)(nil)).Elem()
int32SliceType = reflect.TypeOf((*[]int32)(nil)).Elem()
int64SliceType = reflect.TypeOf((*[]int64)(nil)).Elem()
float32SliceType = reflect.TypeOf((*[]float32)(nil)).Elem()
float64SliceType = reflect.TypeOf((*[]float64)(nil)).Elem()
interfaceSliceType = reflect.TypeOf((*[]interface{})(nil)).Elem()
interfaceMapType = reflect.TypeOf((*map[interface{}]interface{})(nil)).Elem()
// Scalar types.
boolType = reflect.TypeOf((*bool)(nil)).Elem()
byteType = reflect.TypeOf((*byte)(nil)).Elem()
int8Type = reflect.TypeOf((*int8)(nil)).Elem()
int16Type = reflect.TypeOf((*int16)(nil)).Elem()
int32Type = reflect.TypeOf((*int32)(nil)).Elem()
int64Type = reflect.TypeOf((*int64)(nil)).Elem()
intType = reflect.TypeOf((*int)(nil)).Elem()
float32Type = reflect.TypeOf((*float32)(nil)).Elem()
float64Type = reflect.TypeOf((*float64)(nil)).Elem()
// Date and GenericSet are declared elsewhere in the package.
dateType = reflect.TypeOf((*Date)(nil)).Elem()
timestampType = reflect.TypeOf((*time.Time)(nil)).Elem()
genericSetType = reflect.TypeOf((*GenericSet)(nil)).Elem()
)
// typeResolver holds the lookup tables that map Go types to serializers and to
// the type strings/ids written on the wire, plus the per-message table of meta
// strings.
type typeResolver struct {
// typeTagToSerializers maps a registered type tag to its serializer
// (RegisterTypeTag stores the pointer-to-struct serializer here).
typeTagToSerializers map[string]Serializer
// typeToSerializers maps a Go type to its serializer; getSerializerByType
// also caches serializers it creates on demand here.
typeToSerializers map[reflect.Type]Serializer
// typeToTypeInfo maps a Go type to the type string written by writeType.
typeToTypeInfo map[reflect.Type]string
// typeToTypeTag is not referenced in this part of the file.
typeToTypeTag map[reflect.Type]string
// typeInfoToType is the reverse of typeToTypeInfo, used by readType/decodeType.
typeInfoToType map[string]reflect.Type
// typeIdToType maps a serializer's type id to the Go type registered for it.
typeIdToType map[int16]reflect.Type
// dynamicStringToId records the id given to each meta string by writeMetaString.
dynamicStringToId map[string]int16
// dynamicIdToString records the meta string for each id seen by readMetaString.
dynamicIdToString map[int16]string
// dynamicStringId is the next id to hand out; resetWrite/resetRead zero it.
dynamicStringId int16
}
// newTypeResolver returns a typeResolver with every lookup table allocated,
// the base types seeded into the type-info tables, and the built-in
// serializers registered through initialize.
//
// typeToTypeTag is allocated here as well: it used to be left nil, which would
// make the first write to it panic.
func newTypeResolver() *typeResolver {
	r := &typeResolver{
		typeTagToSerializers: map[string]Serializer{},
		typeToSerializers:    map[reflect.Type]Serializer{},
		typeToTypeInfo:       map[reflect.Type]string{},
		typeToTypeTag:        map[reflect.Type]string{},
		typeInfoToType:       map[string]reflect.Type{},
		typeIdToType:         map[int16]reflect.Type{},
		dynamicStringToId:    map[string]int16{},
		dynamicIdToString:    map[int16]string{},
	}
	// base type info for encode/decode types.
	// composite types info will be constructed dynamically.
	baseTypes := []reflect.Type{
		boolType,
		byteType,
		int8Type,
		int16Type,
		int32Type,
		intType,
		int64Type,
		float32Type,
		float64Type,
		stringType,
		dateType,
		timestampType,
		interfaceType,
		genericSetType, // FIXME set should be a generic type
	}
	for _, t := range baseTypes {
		info := t.String()
		r.typeInfoToType[info] = t
		r.typeToTypeInfo[t] = info
	}
	r.initialize()
	return r
}
// initialize registers the built-in serializers, in the order listed below,
// through RegisterSerializer. A registration failure here is treated as a
// programming error and panics.
func (r *typeResolver) initialize() {
	type builtin struct {
		typ reflect.Type
		ser Serializer
	}
	builtins := []builtin{
		{stringType, stringSerializer{}},
		{stringPtrType, ptrToStringSerializer{}},
		{stringSliceType, stringSliceSerializer{}},
		{byteSliceType, byteSliceSerializer{}},
		{boolSliceType, boolSliceSerializer{}},
		{int16SliceType, int16SliceSerializer{}},
		{int32SliceType, int32SliceSerializer{}},
		{int64SliceType, int64SliceSerializer{}},
		{float32SliceType, float32SliceSerializer{}},
		{float64SliceType, float64SliceSerializer{}},
		{interfaceSliceType, sliceSerializer{}},
		{interfaceMapType, mapSerializer{}},
		{boolType, boolSerializer{}},
		{byteType, byteSerializer{}},
		{int8Type, int8Serializer{}},
		{int16Type, int16Serializer{}},
		{int32Type, int32Serializer{}},
		{int64Type, int64Serializer{}},
		{intType, intSerializer{}},
		{float32Type, float32Serializer{}},
		{float64Type, float64Serializer{}},
		{dateType, dateSerializer{}},
		{timestampType, timeSerializer{}},
		{genericSetType, setSerializer{}},
	}
	for i := range builtins {
		err := r.RegisterSerializer(builtins[i].typ, builtins[i].ser)
		if err != nil {
			panic(fmt.Errorf("impossible error: %s", err))
		}
	}
}
// RegisterSerializer associates serializer s with type_.
//
// It returns an error when type_ already has a serializer, or when s reports a
// cross-language type id (greater than NotSupportCrossLanguage and not
// FURY_TYPE_TAG) that is already bound to another type. All checks run before
// any table is modified, so a failed call leaves the resolver unchanged and
// the caller can retry with a different serializer.
func (r *typeResolver) RegisterSerializer(type_ reflect.Type, s Serializer) error {
	if prev, ok := r.typeToSerializers[type_]; ok {
		return fmt.Errorf("type %s already has a serializer %s registered", type_, prev)
	}
	typeId := s.TypeId()
	// Only such ids take part in the id -> type table.
	hasCrossLanguageId := typeId != FURY_TYPE_TAG && typeId > NotSupportCrossLanguage
	if hasCrossLanguageId {
		if _, ok := r.typeIdToType[typeId]; ok {
			return fmt.Errorf("type %s with id %d has been registered", type_, typeId)
		}
	}
	r.typeToSerializers[type_] = s
	if hasCrossLanguageId {
		r.typeIdToType[typeId] = type_
	}
	return nil
}
// RegisterTypeTag registers type_ under the tag `tag`, installing a
// structSerializer for type_ and a ptrToStructSerializer for *type_, and
// recording the type strings "@tag" and "*@tag" for them.
//
// It returns an error when type_ already has a serializer or when tag is
// already registered; in both cases nothing is modified.
func (r *typeResolver) RegisterTypeTag(type_ reflect.Type, tag string) error {
	if prev, ok := r.typeToSerializers[type_]; ok {
		return fmt.Errorf("type %s already has a serializer %s registered", type_, prev)
	}
	// A tag names exactly one type on the wire. Letting a second type take it
	// over would silently redirect decoding of "@tag" to the newcomer.
	if _, ok := r.typeTagToSerializers[tag]; ok {
		return fmt.Errorf("tag %s has already been registered", tag)
	}
	serializer := &structSerializer{type_: type_, typeTag: tag}
	r.typeToSerializers[type_] = serializer
	// multiple struct with same name defined inside function will have same `type_.String()`, but they are
	// different types. so we use tag to encode type info.
	// tagged type encode as `@$tag`/`*@$tag`.
	valueInfo := "@" + tag
	r.typeToTypeInfo[type_] = valueInfo
	r.typeInfoToType[valueInfo] = type_

	ptrType := reflect.PtrTo(type_)
	ptrSerializer := &ptrToStructSerializer{structSerializer: *serializer, type_: ptrType}
	r.typeToSerializers[ptrType] = ptrSerializer
	// use `ptrToStructSerializer` as default deserializer when deserializing data from other languages.
	r.typeTagToSerializers[tag] = ptrSerializer
	ptrInfo := "*@" + tag
	r.typeToTypeInfo[ptrType] = ptrInfo
	r.typeInfoToType[ptrInfo] = ptrType
	return nil
}
// RegisterExt is not implemented yet. It reports that through its error result
// rather than panicking, so a caller that checks the error is not crashed by
// calling it.
func (r *typeResolver) RegisterExt(extId int16, type_ reflect.Type) error {
	// Registering type is necessary, otherwise we may don't have the symbols of corresponding type when deserializing.
	return fmt.Errorf("register ext id %d for type %v: not supported", extId, type_)
}
// getSerializerByType returns the serializer registered for type_. When none
// is registered it asks createSerializer for one, caches the result in
// typeToSerializers, and returns it; a creation error is returned unchanged
// and nothing is cached.
func (r *typeResolver) getSerializerByType(type_ reflect.Type) (Serializer, error) {
	if cached, found := r.typeToSerializers[type_]; found {
		return cached, nil
	}
	created, err := r.createSerializer(type_)
	if err != nil {
		return nil, err
	}
	r.typeToSerializers[type_] = created
	return created, nil
}
// getSerializerByTypeTag returns the serializer stored for typeTag in
// typeTagToSerializers, or an error when the tag has no entry.
func (r *typeResolver) getSerializerByTypeTag(typeTag string) (Serializer, error) {
	found, registered := r.typeTagToSerializers[typeTag]
	if registered {
		return found, nil
	}
	return nil, fmt.Errorf("type %s not supported", typeTag)
}
// createSerializer builds a serializer for a pointer, slice, array or map type
// that has no registered serializer, resolving element/key/value serializers
// through getSerializerByType.
//
//   - pointer: wraps the element serializer; pointers to pointers or to
//     interfaces are rejected.
//   - slice/array: the generic serializer when the element type is dynamic
//     (see isDynamicType), otherwise one bound to the element serializer.
//   - map: the generic serializer when both key and value are dynamic,
//     otherwise one bound to whichever of the key/value serializers is static
//     (the dynamic side is left nil).
//
// Any other kind yields a "not supported" error.
func (r *typeResolver) createSerializer(type_ reflect.Type) (s Serializer, err error) {
	switch type_.Kind() {
	case reflect.Ptr:
		if elemKind := type_.Elem().Kind(); elemKind == reflect.Ptr || elemKind == reflect.Interface {
			return nil, fmt.Errorf("pointer to pointer/interface are not supported but got type %s", type_)
		}
		valueSerializer, err := r.getSerializerByType(type_.Elem())
		if err != nil {
			return nil, err
		}
		return &ptrToValueSerializer{valueSerializer}, nil
	case reflect.Slice:
		elem := type_.Elem()
		if isDynamicType(elem) {
			return sliceSerializer{}, nil
		}
		elemSerializer, err := r.getSerializerByType(elem)
		if err != nil {
			return nil, err
		}
		return &sliceConcreteValueSerializer{
			type_:          type_,
			elemSerializer: elemSerializer,
			referencable:   nullable(elem),
		}, nil
	case reflect.Array:
		elem := type_.Elem()
		if isDynamicType(elem) {
			return arraySerializer{}, nil
		}
		elemSerializer, err := r.getSerializerByType(elem)
		if err != nil {
			return nil, err
		}
		return &arrayConcreteValueSerializer{
			type_:          type_,
			elemSerializer: elemSerializer,
			referencable:   nullable(elem),
		}, nil
	case reflect.Map:
		hasKeySerializer, hasValueSerializer := !isDynamicType(type_.Key()), !isDynamicType(type_.Elem())
		if !hasKeySerializer && !hasValueSerializer {
			return mapSerializer{}, nil
		}
		var keySerializer, valueSerializer Serializer
		if hasKeySerializer {
			keySerializer, err = r.getSerializerByType(type_.Key())
			if err != nil {
				return nil, err
			}
		}
		if hasValueSerializer {
			valueSerializer, err = r.getSerializerByType(type_.Elem())
			if err != nil {
				return nil, err
			}
		}
		return &mapConcreteKeyValueSerializer{
			type_:             type_,
			keySerializer:     keySerializer,
			valueSerializer:   valueSerializer,
			keyReferencable:   nullable(type_.Key()),
			valueReferencable: nullable(type_.Elem()),
		}, nil
	}
	return nil, fmt.Errorf("type %s not supported", type_.String())
}
// isDynamicType reports whether type_ is an interface type, or a pointer whose
// element is itself a pointer or an interface.
func isDynamicType(type_ reflect.Type) bool {
	switch type_.Kind() {
	case reflect.Interface:
		return true
	case reflect.Ptr:
		inner := type_.Elem().Kind()
		return inner == reflect.Ptr || inner == reflect.Interface
	default:
		return false
	}
}
// writeType writes the type string of type_ to buffer as a meta string. The
// string comes from typeToTypeInfo; on a miss it is produced by encodeType and
// stored in typeToTypeInfo before being written.
func (r *typeResolver) writeType(buffer *ByteBuffer, type_ reflect.Type) error {
	typeInfo, cached := r.typeToTypeInfo[type_]
	if !cached {
		encoded, err := r.encodeType(type_)
		if err != nil {
			return err
		}
		r.typeToTypeInfo[type_] = encoded
		typeInfo = encoded
	}
	return r.writeMetaString(buffer, typeInfo)
}
// readType reads a meta string from buffer and resolves it to a Go type. It
// consults typeInfoToType first; on a miss the string is parsed by decodeType
// and the result is stored in typeInfoToType under the string that was read.
func (r *typeResolver) readType(buffer *ByteBuffer) (reflect.Type, error) {
	typeInfo, err := r.readMetaString(buffer)
	if err != nil {
		return nil, err
	}
	if known, hit := r.typeInfoToType[typeInfo]; hit {
		return known, nil
	}
	decoded, _, err := r.decodeType(typeInfo)
	if err != nil {
		return nil, err
	}
	r.typeInfoToType[typeInfo] = decoded
	return decoded, nil
}
// encodeType returns the type string for type_. An entry in typeToTypeInfo
// wins. Otherwise pointers, arrays, slices and maps are spelled "*E", "[N]E",
// "[]E" and "map[K]E" from the recursively encoded element (and key) strings,
// and every other kind falls back to type_.String(). Nothing is cached here.
func (r *typeResolver) encodeType(type_ reflect.Type) (string, error) {
	if cached, hit := r.typeToTypeInfo[type_]; hit {
		return cached, nil
	}
	kind := type_.Kind()
	composite := kind == reflect.Ptr || kind == reflect.Array ||
		kind == reflect.Slice || kind == reflect.Map
	if !composite {
		return type_.String(), nil
	}
	// The element is encoded before the map key.
	elemInfo, err := r.encodeType(type_.Elem())
	if err != nil {
		return "", err
	}
	switch kind {
	case reflect.Ptr:
		return "*" + elemInfo, nil
	case reflect.Array:
		return fmt.Sprintf("[%d]", type_.Len()) + elemInfo, nil
	case reflect.Slice:
		return "[]" + elemInfo, nil
	default: // reflect.Map
		keyInfo, err := r.encodeType(type_.Key())
		if err != nil {
			return "", err
		}
		return fmt.Sprintf("map[%s]%s", keyInfo, elemInfo), nil
	}
}
// arrayLenPrefix matches a fixed-size array prefix such as "[16]" at the very
// start of a type string and captures the decimal length.
var arrayLenPrefix = regexp.MustCompile(`^\[([0-9]+)]`)

// decodeType parses a type string produced by encodeType back into a Go type.
//
// It returns the decoded type together with the portion of typeStr that was
// consumed. The consumed portion can be shorter than typeStr: when a map key
// is being decoded the input still carries the "]" and value that follow, and
// only the key part is reported back. Strings present in typeInfoToType are
// resolved directly; otherwise the prefixes "*", "[]", "[N]" and "map[" are
// peeled off recursively.
//
// Malformed input (no closing "]" after a map key, a map key type that cannot
// be compared, a "[" not followed by a length, an unknown name) is reported as
// an error rather than a panic, since the string normally comes off the wire.
func (r *typeResolver) decodeType(typeStr string) (reflect.Type, string, error) {
	if type_, ok := r.typeInfoToType[typeStr]; ok {
		return type_, typeStr, nil
	}
	switch {
	case strings.HasPrefix(typeStr, "*"): // ptr
		elemType, elemStr, err := r.decodeType(typeStr[len("*"):])
		if err != nil {
			return nil, "", err
		}
		return reflect.PtrTo(elemType), "*" + elemStr, nil
	case strings.HasPrefix(typeStr, "[]"): // slice
		elemType, elemStr, err := r.decodeType(typeStr[len("[]"):])
		if err != nil {
			return nil, "", err
		}
		return reflect.SliceOf(elemType), "[]" + elemStr, nil
	case strings.HasPrefix(typeStr, "["): // array
		// The pattern is anchored, so the length must directly follow the
		// leading "["; a "[N]" further along the string is not picked up.
		idx := arrayLenPrefix.FindStringSubmatchIndex(typeStr)
		if idx == nil {
			return nil, "", fmt.Errorf("unparseable type %s", typeStr)
		}
		length, err := strconv.Atoi(typeStr[idx[2]:idx[3]])
		if err != nil {
			return nil, "", err
		}
		elemType, elemStr, err := r.decodeType(typeStr[idx[1]:])
		if err != nil {
			return nil, "", err
		}
		return reflect.ArrayOf(length, elemType), typeStr[idx[0]:idx[1]] + elemStr, nil
	case strings.HasPrefix(typeStr, "map["):
		keyType, keyStr, err := r.decodeType(typeStr[len("map["):])
		if err != nil {
			return nil, "", fmt.Errorf("unparseable map type: %s : %w", typeStr, err)
		}
		// The key must be followed by "]"; without this check a truncated
		// string such as "map[int" would slice past the end of typeStr.
		keyEnd := len("map[") + len(keyStr)
		if keyEnd >= len(typeStr) || typeStr[keyEnd] != ']' {
			return nil, "", fmt.Errorf("unparseable map type: %s : missing ']' after key type %s", typeStr, keyStr)
		}
		// reflect.MapOf panics for key types that do not support ==.
		if !keyType.Comparable() {
			return nil, "", fmt.Errorf("unparseable map type: %s : key type %s is not comparable", typeStr, keyStr)
		}
		subStr := typeStr[keyEnd+len("]"):]
		valueType, valueStr, err := r.decodeType(subStr)
		if err != nil {
			return nil, "", fmt.Errorf("unparseable map value type: %s : %w", subStr, err)
		}
		return reflect.MapOf(keyType, valueType), "map[" + keyStr + "]" + valueStr, nil
	default:
		// A plain name followed by "]" is a map key seen from inside
		// "map[K]V": decode only the part in front of the bracket.
		if idx := strings.Index(typeStr, "]"); idx >= 0 {
			return r.decodeType(typeStr[:idx])
		}
		// typeInfoToType was already consulted at the top, so the name is unknown.
		return nil, "", fmt.Errorf("type %s not supported", typeStr)
	}
}
// writeTypeTag writes typeTag to buffer as a meta string and returns whatever
// error writeMetaString reports.
func (r *typeResolver) writeTypeTag(buffer *ByteBuffer, typeTag string) error {
	return r.writeMetaString(buffer, typeTag)
}
// readTypeByReadTag reads a type tag from buffer and returns the type held by
// the ptrToStructSerializer registered for that tag.
//
// An unregistered tag, or a tag whose serializer is not a
// *ptrToStructSerializer, yields an error. The tag comes from the input
// stream, so it must not be able to trigger a nil type-assertion panic.
func (r *typeResolver) readTypeByReadTag(buffer *ByteBuffer) (reflect.Type, error) {
	metaString, err := r.readMetaString(buffer)
	if err != nil {
		return nil, err
	}
	serializer, ok := r.typeTagToSerializers[metaString]
	if !ok {
		return nil, fmt.Errorf("type %s not supported", metaString)
	}
	ptrSerializer, ok := serializer.(*ptrToStructSerializer)
	if !ok {
		return nil, fmt.Errorf("serializer %T registered for tag %s is not a struct pointer serializer", serializer, metaString)
	}
	return ptrSerializer.type_, nil
}
// readTypeInfo reads one meta string from buffer and returns it as is; it does
// not resolve the string to a Go type.
func (r *typeResolver) readTypeInfo(buffer *ByteBuffer) (string, error) {
return r.readMetaString(buffer)
}
// getTypeById returns the type stored for id in typeIdToType, or an error
// listing the whole table when id has no entry.
func (r *typeResolver) getTypeById(id int16) (reflect.Type, error) {
	if found, present := r.typeIdToType[id]; present {
		return found, nil
	}
	return nil, fmt.Errorf("type of id %d not supported, supported types: %v", id, r.typeIdToType)
}
// writeMetaString writes str to buffer, sending the bytes only the first time.
//
// First occurrence: a var-int header len(str)<<1 (low bit 0), then an int64
// FNV-1a hash of str with its low byte cleared, then the raw bytes; str is
// given the next dynamic id. Later occurrences: a single var-int header
// (id+1)<<1 | 1 (low bit 1).
//
// A string longer than MaxInt16 is rejected before anything is written or
// recorded, so a failed call leaves the buffer and the id table untouched.
func (r *typeResolver) writeMetaString(buffer *ByteBuffer, str string) error {
	if id, ok := r.dynamicStringToId[str]; ok {
		// Widen before shifting: (id+1)<<1 does not fit in int16 once id
		// reaches 16383.
		buffer.WriteVarInt32((int32(id)+1)<<1 | 1)
		return nil
	}
	if len(str) > MaxInt16 {
		return fmt.Errorf("too long string: %s", str)
	}
	// TODO this hash should be unique, since we don't compare data equality for performance
	h := fnv.New64a()
	if _, err := h.Write([]byte(str)); err != nil {
		return err
	}
	hash := int64(h.Sum64() & 0xffffffffffffff00)

	id := r.dynamicStringId
	r.dynamicStringId++
	r.dynamicStringToId[str] = id

	buffer.WriteVarInt32(int32(len(str) << 1))
	buffer.WriteInt64(hash)
	buffer.WriteBinary(unsafeGetBytes(str))
	return nil
}
// readMetaString reads one meta string from buffer.
//
// The var-int header carries a flag in its low bit. Flag 0: the remaining bits
// are a byte length; an int64 hash (currently ignored) and that many bytes
// follow, and the string is recorded under the next dynamic id. Flag 1: the
// remaining bits are id+1 of a string recorded earlier.
//
// A negative header, or a reference to an id that has not been recorded, is
// reported as an error instead of yielding a negative length or an empty
// string.
func (r *typeResolver) readMetaString(buffer *ByteBuffer) (string, error) {
	header := buffer.ReadVarInt32()
	if header < 0 {
		return "", fmt.Errorf("invalid meta string header %d", header)
	}
	value := int(header >> 1)
	if header&0b1 == 0 {
		// TODO support use computed hash
		buffer.ReadInt64()
		str := string(buffer.ReadBinary(value))
		id := r.dynamicStringId
		r.dynamicStringId++
		r.dynamicIdToString[id] = str
		return str, nil
	}
	ref := value - 1
	str, ok := r.dynamicIdToString[int16(ref)]
	// The round-trip comparison rejects references that only match an entry
	// after being truncated to int16.
	if !ok || int(int16(ref)) != ref {
		return "", fmt.Errorf("unknown meta string reference %d", ref)
	}
	return str, nil
}
// resetWrite discards the meta string tables and restarts id numbering at 0.
// It does nothing unless dynamicStringId is greater than zero.
func (r *typeResolver) resetWrite() {
	if r.dynamicStringId <= 0 {
		return
	}
	r.dynamicStringId = 0
	r.dynamicStringToId = make(map[string]int16)
	r.dynamicIdToString = make(map[int16]string)
}
// resetRead discards the meta string tables and restarts id numbering at 0.
// It does nothing unless dynamicStringId is greater than zero.
func (r *typeResolver) resetRead() {
	if r.dynamicStringId <= 0 {
		return
	}
	r.dynamicStringId = 0
	r.dynamicStringToId = make(map[string]int16)
	r.dynamicIdToString = make(map[int16]string)
}
// computeStringHash folds the bytes returned by unsafeGetBytes(str) into a
// hash: starting from 17, each byte updates it as hash*31 + byte, after which
// the value is divided by 7 for as long as it is not below MaxInt32. The
// result therefore always fits in an int32.
func computeStringHash(str string) int32 {
	data := unsafeGetBytes(str)
	acc := int64(17)
	for i := 0; i < len(data); i++ {
		acc = acc*31 + int64(data[i])
		for acc >= MaxInt32 {
			acc /= 7
		}
	}
	return int32(acc)
}