blob: ac724b38725bcadd4eee523dadb3fab6bfb49374 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package schema contains utility functions for relating Go types and Beam Schemas.
//
// Not all Go types can be converted to schemas. This is Go is more expressive than
// Beam schemas. Just as not all Go types can be serialized, similarly,
// not all Beam Schemas will have a conversion to Go types, until the correct
// mechanism exists in the SDK to handle them.
//
// While efforts will be made to have conversions be reversable, this will not
// be possible in all instances. Eg. Go arrays as fields will be converted to
// Beam Arrays, but a Beam Array type will map by default to a Go slice.
package schema
import (
"fmt"
"reflect"
"strings"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
)
// FromType returns a Beam Schema of the passed in type.
// Returns an error if the type cannot be converted to a Schema.
func FromType(ot reflect.Type) (*pipepb.Schema, error) {
t := ot // keep the original type for errors.
// The top level schema for a pointer to struct and the struct is the same.
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
if t.Kind() != reflect.Struct {
return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
}
return structToSchema(t)
}
func structToSchema(t reflect.Type) (*pipepb.Schema, error) {
fields := make([]*pipepb.Field, 0, t.NumField())
for i := 0; i < t.NumField(); i++ {
f, err := structFieldToField(t.Field(i))
if err != nil {
return nil, errors.Wrapf(err, "cannot convert field %v to schema", t.Field(i).Name)
}
fields = append(fields, f)
}
return &pipepb.Schema{
Fields: fields,
}, nil
}
func structFieldToField(sf reflect.StructField) (*pipepb.Field, error) {
name := sf.Name
if tag := sf.Tag.Get("beam"); tag != "" {
name, _ = parseTag(tag)
}
ftype, err := reflectTypeToFieldType(sf.Type)
if err != nil {
return nil, err
}
return &pipepb.Field{
Name: name,
Type: ftype,
}, nil
}
func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, error) {
var isPtr bool
t := ot
if t.Kind() == reflect.Ptr {
isPtr = true
t = t.Elem()
}
switch t.Kind() {
case reflect.Map:
kt, err := reflectTypeToFieldType(t.Key())
if err != nil {
return nil, errors.Wrapf(err, "unable to convert key of %v to schema field", ot)
}
vt, err := reflectTypeToFieldType(t.Elem())
if err != nil {
return nil, errors.Wrapf(err, "unable to convert value of %v to schema field", ot)
}
return &pipepb.FieldType{
Nullable: isPtr,
TypeInfo: &pipepb.FieldType_MapType{
MapType: &pipepb.MapType{
KeyType: kt,
ValueType: vt,
},
},
}, nil
case reflect.Struct:
sch, err := structToSchema(t)
if err != nil {
return nil, errors.Wrapf(err, "unable to convert %v to schema field", ot)
}
return &pipepb.FieldType{
Nullable: isPtr,
TypeInfo: &pipepb.FieldType_RowType{
RowType: &pipepb.RowType{
Schema: sch,
},
},
}, nil
case reflect.Slice, reflect.Array:
// Special handling for []byte
if t == reflectx.ByteSlice {
return &pipepb.FieldType{
Nullable: isPtr,
TypeInfo: &pipepb.FieldType_AtomicType{
AtomicType: pipepb.AtomicType_BYTES,
},
}, nil
}
vt, err := reflectTypeToFieldType(t.Elem())
if err != nil {
return nil, errors.Wrapf(err, "unable to convert element type of %v to schema field", ot)
}
return &pipepb.FieldType{
Nullable: isPtr,
TypeInfo: &pipepb.FieldType_ArrayType{
ArrayType: &pipepb.ArrayType{
ElementType: vt,
},
},
}, nil
case reflect.Interface, reflect.Chan, reflect.UnsafePointer, reflect.Complex128, reflect.Complex64:
return nil, errors.Errorf("unable to convert unsupported type %v to schema", ot)
default: // must be an atomic type
if enum, ok := reflectTypeToAtomicTypeMap[t.Kind()]; ok {
return &pipepb.FieldType{
Nullable: isPtr,
TypeInfo: &pipepb.FieldType_AtomicType{
AtomicType: enum,
},
}, nil
}
return nil, errors.Errorf("unable to map %v to pipepb.AtomicType", t)
}
}
var reflectTypeToAtomicTypeMap = map[reflect.Kind]pipepb.AtomicType{
reflect.Uint8: pipepb.AtomicType_BYTE,
reflect.Int16: pipepb.AtomicType_INT16,
reflect.Int32: pipepb.AtomicType_INT32,
reflect.Int64: pipepb.AtomicType_INT64,
reflect.Int: pipepb.AtomicType_INT64,
reflect.Float32: pipepb.AtomicType_FLOAT,
reflect.Float64: pipepb.AtomicType_DOUBLE,
reflect.String: pipepb.AtomicType_STRING,
reflect.Bool: pipepb.AtomicType_BOOLEAN,
}
// ToType returns a Go type of the passed in Schema.
// Types returned by ToType are always of Struct kind.
// Returns an error if the Schema cannot be converted to a type.
func ToType(s *pipepb.Schema) (reflect.Type, error) {
fields := make([]reflect.StructField, 0, len(s.GetFields()))
for _, sf := range s.GetFields() {
rf, err := fieldToStructField(sf)
if err != nil {
return nil, errors.Wrapf(err, "cannot convert schema field %v to field", sf.GetName())
}
fields = append(fields, rf)
}
return reflect.StructOf(fields), nil
}
func fieldToStructField(sf *pipepb.Field) (reflect.StructField, error) {
name := sf.GetName()
rt, err := fieldTypeToReflectType(sf.GetType())
if err != nil {
return reflect.StructField{}, err
}
return reflect.StructField{
Name: strings.ToUpper(name[:1]) + name[1:], // Go field name must be capitalized for export and encoding.
Type: rt,
Tag: reflect.StructTag(fmt.Sprintf("beam:\"%s\"", name)),
}, nil
}
var atomicTypeToReflectType = map[pipepb.AtomicType]reflect.Type{
pipepb.AtomicType_BYTE: reflectx.Uint8,
pipepb.AtomicType_INT16: reflectx.Int16,
pipepb.AtomicType_INT32: reflectx.Int32,
pipepb.AtomicType_INT64: reflectx.Int64,
pipepb.AtomicType_FLOAT: reflectx.Float32,
pipepb.AtomicType_DOUBLE: reflectx.Float64,
pipepb.AtomicType_STRING: reflectx.String,
pipepb.AtomicType_BOOLEAN: reflectx.Bool,
pipepb.AtomicType_BYTES: reflectx.ByteSlice,
}
func fieldTypeToReflectType(sft *pipepb.FieldType) (reflect.Type, error) {
var t reflect.Type
switch sft.GetTypeInfo().(type) {
case *pipepb.FieldType_AtomicType:
var ok bool
if t, ok = atomicTypeToReflectType[sft.GetAtomicType()]; !ok {
return nil, errors.Errorf("unknown atomic type: %v", sft.GetAtomicType())
}
case *pipepb.FieldType_ArrayType:
rt, err := fieldTypeToReflectType(sft.GetArrayType().GetElementType())
if err != nil {
return nil, errors.Wrap(err, "unable to convert array element type")
}
t = reflect.SliceOf(rt)
case *pipepb.FieldType_MapType:
kt, err := fieldTypeToReflectType(sft.GetMapType().GetKeyType())
if err != nil {
return nil, errors.Wrap(err, "unable to convert map key type")
}
vt, err := fieldTypeToReflectType(sft.GetMapType().GetValueType())
if err != nil {
return nil, errors.Wrap(err, "unable to convert map value type")
}
t = reflect.MapOf(kt, vt) // Panics for invalid map keys (slices/iterables)
case *pipepb.FieldType_RowType:
rt, err := ToType(sft.GetRowType().GetSchema())
if err != nil {
return nil, errors.Wrapf(err, "unable to convert row type: %v", sft.GetRowType().GetSchema().GetId())
}
t = rt
// case *pipepb.FieldType_IterableType:
// TODO(BEAM-9615): handle IterableTypes.
// case *pipepb.FieldType_LogicalType:
// TODO(BEAM-9615): handle LogicalTypes types.
// Logical Types are for things that have more specialized user representation already, or
// things like Time or protocol buffers.
// They would be encoded with the schema encoding.
default:
return nil, errors.Errorf("unknown fieldtype: %T", sft.GetTypeInfo())
}
if sft.GetNullable() {
return reflect.PtrTo(t), nil
}
return t, nil
}
// parseTag splits a struct field's beam tag into its name and
// comma-separated options.
func parseTag(tag string) (string, options) {
if idx := strings.Index(tag, ","); idx != -1 {
return tag[:idx], options(tag[idx+1:])
}
return tag, options("")
}
type options string
// TODO(BEAM-9615): implement looking up specific options from the tags.