[BEAM-9615] Improve error handling on schema conv
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index 358f3d3..ac724b3 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -46,33 +46,39 @@
if t.Kind() != reflect.Struct {
return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
}
- return structToSchema(t), nil
+ return structToSchema(t)
}
-func structToSchema(t reflect.Type) *pipepb.Schema {
+func structToSchema(t reflect.Type) (*pipepb.Schema, error) {
fields := make([]*pipepb.Field, 0, t.NumField())
for i := 0; i < t.NumField(); i++ {
- fields = append(fields, structFieldToField(t.Field(i)))
+ f, err := structFieldToField(t.Field(i))
+ if err != nil {
+ return nil, errors.Wrapf(err, "cannot convert field %v to schema", t.Field(i).Name)
+ }
+ fields = append(fields, f)
}
return &pipepb.Schema{
Fields: fields,
- }
+ }, nil
}
-func structFieldToField(sf reflect.StructField) *pipepb.Field {
+func structFieldToField(sf reflect.StructField) (*pipepb.Field, error) {
name := sf.Name
if tag := sf.Tag.Get("beam"); tag != "" {
name, _ = parseTag(tag)
}
- ftype := reflectTypeToFieldType(sf.Type)
-
+ ftype, err := reflectTypeToFieldType(sf.Type)
+ if err != nil {
+ return nil, err
+ }
return &pipepb.Field{
Name: name,
Type: ftype,
- }
+ }, nil
}
-func reflectTypeToFieldType(ot reflect.Type) *pipepb.FieldType {
+func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, error) {
var isPtr bool
t := ot
if t.Kind() == reflect.Ptr {
@@ -81,8 +87,14 @@
}
switch t.Kind() {
case reflect.Map:
- kt := reflectTypeToFieldType(t.Key())
- vt := reflectTypeToFieldType(t.Elem())
+ kt, err := reflectTypeToFieldType(t.Key())
+ if err != nil {
+ return nil, errors.Wrapf(err, "unable to convert key of %v to schema field", ot)
+ }
+ vt, err := reflectTypeToFieldType(t.Elem())
+ if err != nil {
+ return nil, errors.Wrapf(err, "unable to convert value of %v to schema field", ot)
+ }
return &pipepb.FieldType{
Nullable: isPtr,
TypeInfo: &pipepb.FieldType_MapType{
@@ -91,9 +103,12 @@
ValueType: vt,
},
},
- }
+ }, nil
case reflect.Struct:
- sch := structToSchema(t)
+ sch, err := structToSchema(t)
+ if err != nil {
+ return nil, errors.Wrapf(err, "unable to convert %v to schema field", ot)
+ }
return &pipepb.FieldType{
Nullable: isPtr,
TypeInfo: &pipepb.FieldType_RowType{
@@ -101,7 +116,7 @@
Schema: sch,
},
},
- }
+ }, nil
case reflect.Slice, reflect.Array:
// Special handling for []byte
if t == reflectx.ByteSlice {
@@ -110,9 +125,12 @@
TypeInfo: &pipepb.FieldType_AtomicType{
AtomicType: pipepb.AtomicType_BYTES,
},
- }
+ }, nil
}
- vt := reflectTypeToFieldType(t.Elem())
+ vt, err := reflectTypeToFieldType(t.Elem())
+ if err != nil {
+ return nil, errors.Wrapf(err, "unable to convert element type of %v to schema field", ot)
+ }
return &pipepb.FieldType{
Nullable: isPtr,
TypeInfo: &pipepb.FieldType_ArrayType{
@@ -120,43 +138,32 @@
ElementType: vt,
},
},
- }
- case reflect.Interface, reflect.Chan, reflect.UnsafePointer, reflect.Complex128, reflect.Complex64, reflect.Int:
- panic(fmt.Sprintf("Unsupported type to convert to schema: %v", ot))
+ }, nil
+ case reflect.Interface, reflect.Chan, reflect.UnsafePointer, reflect.Complex128, reflect.Complex64:
+ return nil, errors.Errorf("unable to convert unsupported type %v to schema", ot)
default: // must be an atomic type
- enum := reflectTypeToAtomicType(t)
- return &pipepb.FieldType{
- Nullable: isPtr,
- TypeInfo: &pipepb.FieldType_AtomicType{
- AtomicType: enum,
- },
+ if enum, ok := reflectTypeToAtomicTypeMap[t.Kind()]; ok {
+ return &pipepb.FieldType{
+ Nullable: isPtr,
+ TypeInfo: &pipepb.FieldType_AtomicType{
+ AtomicType: enum,
+ },
+ }, nil
}
+ return nil, errors.Errorf("unable to map %v to pipepb.AtomicType", t)
}
}
-func reflectTypeToAtomicType(rt reflect.Type) pipepb.AtomicType {
- switch rt {
- case reflectx.Uint8:
- return pipepb.AtomicType_BYTE
- case reflectx.Int16:
- return pipepb.AtomicType_INT16
- case reflectx.Int32:
- return pipepb.AtomicType_INT32
- case reflectx.Int64, reflectx.Int:
- return pipepb.AtomicType_INT64
- case reflectx.Float32:
- return pipepb.AtomicType_FLOAT
- case reflectx.Float64:
- return pipepb.AtomicType_DOUBLE
- case reflectx.String:
- return pipepb.AtomicType_STRING
- case reflectx.Bool:
- return pipepb.AtomicType_BOOLEAN
- case reflectx.ByteSlice:
- return pipepb.AtomicType_BYTES
- default:
- panic(fmt.Sprintf("non atomic reflect type: %v", rt))
- }
+var reflectTypeToAtomicTypeMap = map[reflect.Kind]pipepb.AtomicType{
+ reflect.Uint8: pipepb.AtomicType_BYTE,
+ reflect.Int16: pipepb.AtomicType_INT16,
+ reflect.Int32: pipepb.AtomicType_INT32,
+ reflect.Int64: pipepb.AtomicType_INT64,
+ reflect.Int: pipepb.AtomicType_INT64,
+ reflect.Float32: pipepb.AtomicType_FLOAT,
+ reflect.Float64: pipepb.AtomicType_DOUBLE,
+ reflect.String: pipepb.AtomicType_STRING,
+ reflect.Bool: pipepb.AtomicType_BOOLEAN,
}
// ToType returns a Go type of the passed in Schema.
@@ -165,19 +172,26 @@
func ToType(s *pipepb.Schema) (reflect.Type, error) {
fields := make([]reflect.StructField, 0, len(s.GetFields()))
for _, sf := range s.GetFields() {
- rf := fieldToStructField(sf)
+ rf, err := fieldToStructField(sf)
+ if err != nil {
+ return nil, errors.Wrapf(err, "cannot convert schema field %v to field", sf.GetName())
+ }
fields = append(fields, rf)
}
return reflect.StructOf(fields), nil
}
-func fieldToStructField(sf *pipepb.Field) reflect.StructField {
+func fieldToStructField(sf *pipepb.Field) (reflect.StructField, error) {
name := sf.GetName()
+ rt, err := fieldTypeToReflectType(sf.GetType())
+ if err != nil {
+ return reflect.StructField{}, err
+ }
return reflect.StructField{
Name: strings.ToUpper(name[:1]) + name[1:], // Go field name must be capitalized for export and encoding.
- Type: fieldTypeToReflectType(sf.GetType()),
+ Type: rt,
Tag: reflect.StructTag(fmt.Sprintf("beam:\"%s\"", name)),
- }
+ }, nil
}
var atomicTypeToReflectType = map[pipepb.AtomicType]reflect.Type{
@@ -192,24 +206,34 @@
pipepb.AtomicType_BYTES: reflectx.ByteSlice,
}
-func fieldTypeToReflectType(sft *pipepb.FieldType) reflect.Type {
+func fieldTypeToReflectType(sft *pipepb.FieldType) (reflect.Type, error) {
var t reflect.Type
switch sft.GetTypeInfo().(type) {
case *pipepb.FieldType_AtomicType:
var ok bool
if t, ok = atomicTypeToReflectType[sft.GetAtomicType()]; !ok {
- panic(fmt.Sprintf("unknown atomic type: %v", sft.GetAtomicType()))
+ return nil, errors.Errorf("unknown atomic type: %v", sft.GetAtomicType())
}
case *pipepb.FieldType_ArrayType:
- t = reflect.SliceOf(fieldTypeToReflectType(sft.GetArrayType().GetElementType()))
+ rt, err := fieldTypeToReflectType(sft.GetArrayType().GetElementType())
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to convert array element type")
+ }
+ t = reflect.SliceOf(rt)
case *pipepb.FieldType_MapType:
- kt := fieldTypeToReflectType(sft.GetMapType().GetKeyType())
- vt := fieldTypeToReflectType(sft.GetMapType().GetValueType())
+ kt, err := fieldTypeToReflectType(sft.GetMapType().GetKeyType())
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to convert map key type")
+ }
+ vt, err := fieldTypeToReflectType(sft.GetMapType().GetValueType())
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to convert map value type")
+ }
t = reflect.MapOf(kt, vt) // Panics for invalid map keys (slices/iterables)
case *pipepb.FieldType_RowType:
rt, err := ToType(sft.GetRowType().GetSchema())
if err != nil {
- panic(err)
+ return nil, errors.Wrapf(err, "unable to convert row type: %v", sft.GetRowType().GetSchema().GetId())
}
t = rt
// case *pipepb.FieldType_IterableType:
@@ -223,12 +247,12 @@
// They would be encoded with the schema encoding.
default:
- panic(fmt.Sprintf("unknown fieldtype: %T", sft.GetTypeInfo()))
+ return nil, errors.Errorf("unknown fieldtype: %T", sft.GetTypeInfo())
}
if sft.GetNullable() {
- return reflect.PtrTo(t)
+ return reflect.PtrTo(t), nil
}
- return t
+ return t, nil
}
// parseTag splits a struct field's beam tag into its name and