[BEAM-9615] Improve error handling on schema conv
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index 358f3d3..ac724b3 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -46,33 +46,39 @@
 	if t.Kind() != reflect.Struct {
 		return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
 	}
-	return structToSchema(t), nil
+	return structToSchema(t)
 }
 
-func structToSchema(t reflect.Type) *pipepb.Schema {
+func structToSchema(t reflect.Type) (*pipepb.Schema, error) {
 	fields := make([]*pipepb.Field, 0, t.NumField())
 	for i := 0; i < t.NumField(); i++ {
-		fields = append(fields, structFieldToField(t.Field(i)))
+		f, err := structFieldToField(t.Field(i))
+		if err != nil {
+			return nil, errors.Wrapf(err, "cannot convert field %v to schema", t.Field(i).Name)
+		}
+		fields = append(fields, f)
 	}
 	return &pipepb.Schema{
 		Fields: fields,
-	}
+	}, nil
 }
 
-func structFieldToField(sf reflect.StructField) *pipepb.Field {
+func structFieldToField(sf reflect.StructField) (*pipepb.Field, error) {
 	name := sf.Name
 	if tag := sf.Tag.Get("beam"); tag != "" {
 		name, _ = parseTag(tag)
 	}
-	ftype := reflectTypeToFieldType(sf.Type)
-
+	ftype, err := reflectTypeToFieldType(sf.Type)
+	if err != nil {
+		return nil, err
+	}
 	return &pipepb.Field{
 		Name: name,
 		Type: ftype,
-	}
+	}, nil
 }
 
-func reflectTypeToFieldType(ot reflect.Type) *pipepb.FieldType {
+func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, error) {
 	var isPtr bool
 	t := ot
 	if t.Kind() == reflect.Ptr {
@@ -81,8 +87,14 @@
 	}
 	switch t.Kind() {
 	case reflect.Map:
-		kt := reflectTypeToFieldType(t.Key())
-		vt := reflectTypeToFieldType(t.Elem())
+		kt, err := reflectTypeToFieldType(t.Key())
+		if err != nil {
+			return nil, errors.Wrapf(err, "unable to convert key of %v to schema field", ot)
+		}
+		vt, err := reflectTypeToFieldType(t.Elem())
+		if err != nil {
+			return nil, errors.Wrapf(err, "unable to convert value of %v to schema field", ot)
+		}
 		return &pipepb.FieldType{
 			Nullable: isPtr,
 			TypeInfo: &pipepb.FieldType_MapType{
@@ -91,9 +103,12 @@
 					ValueType: vt,
 				},
 			},
-		}
+		}, nil
 	case reflect.Struct:
-		sch := structToSchema(t)
+		sch, err := structToSchema(t)
+		if err != nil {
+			return nil, errors.Wrapf(err, "unable to convert %v to schema field", ot)
+		}
 		return &pipepb.FieldType{
 			Nullable: isPtr,
 			TypeInfo: &pipepb.FieldType_RowType{
@@ -101,7 +116,7 @@
 					Schema: sch,
 				},
 			},
-		}
+		}, nil
 	case reflect.Slice, reflect.Array:
 		// Special handling for []byte
 		if t == reflectx.ByteSlice {
@@ -110,9 +125,12 @@
 				TypeInfo: &pipepb.FieldType_AtomicType{
 					AtomicType: pipepb.AtomicType_BYTES,
 				},
-			}
+			}, nil
 		}
-		vt := reflectTypeToFieldType(t.Elem())
+		vt, err := reflectTypeToFieldType(t.Elem())
+		if err != nil {
+			return nil, errors.Wrapf(err, "unable to convert element type of %v to schema field", ot)
+		}
 		return &pipepb.FieldType{
 			Nullable: isPtr,
 			TypeInfo: &pipepb.FieldType_ArrayType{
@@ -120,43 +138,32 @@
 					ElementType: vt,
 				},
 			},
-		}
-	case reflect.Interface, reflect.Chan, reflect.UnsafePointer, reflect.Complex128, reflect.Complex64, reflect.Int:
-		panic(fmt.Sprintf("Unsupported type to convert to schema: %v", ot))
+		}, nil
+	case reflect.Interface, reflect.Chan, reflect.UnsafePointer, reflect.Complex128, reflect.Complex64:
+		return nil, errors.Errorf("unable to convert unsupported type %v to schema", ot)
 	default: // must be an atomic type
-		enum := reflectTypeToAtomicType(t)
-		return &pipepb.FieldType{
-			Nullable: isPtr,
-			TypeInfo: &pipepb.FieldType_AtomicType{
-				AtomicType: enum,
-			},
+		if enum, ok := reflectTypeToAtomicTypeMap[t.Kind()]; ok {
+			return &pipepb.FieldType{
+				Nullable: isPtr,
+				TypeInfo: &pipepb.FieldType_AtomicType{
+					AtomicType: enum,
+				},
+			}, nil
 		}
+		return nil, errors.Errorf("unable to map %v to pipepb.AtomicType", t)
 	}
 }
 
-func reflectTypeToAtomicType(rt reflect.Type) pipepb.AtomicType {
-	switch rt {
-	case reflectx.Uint8:
-		return pipepb.AtomicType_BYTE
-	case reflectx.Int16:
-		return pipepb.AtomicType_INT16
-	case reflectx.Int32:
-		return pipepb.AtomicType_INT32
-	case reflectx.Int64, reflectx.Int:
-		return pipepb.AtomicType_INT64
-	case reflectx.Float32:
-		return pipepb.AtomicType_FLOAT
-	case reflectx.Float64:
-		return pipepb.AtomicType_DOUBLE
-	case reflectx.String:
-		return pipepb.AtomicType_STRING
-	case reflectx.Bool:
-		return pipepb.AtomicType_BOOLEAN
-	case reflectx.ByteSlice:
-		return pipepb.AtomicType_BYTES
-	default:
-		panic(fmt.Sprintf("non atomic reflect type: %v", rt))
-	}
+var reflectTypeToAtomicTypeMap = map[reflect.Kind]pipepb.AtomicType{
+	reflect.Uint8:   pipepb.AtomicType_BYTE,
+	reflect.Int16:   pipepb.AtomicType_INT16,
+	reflect.Int32:   pipepb.AtomicType_INT32,
+	reflect.Int64:   pipepb.AtomicType_INT64,
+	reflect.Int:     pipepb.AtomicType_INT64,
+	reflect.Float32: pipepb.AtomicType_FLOAT,
+	reflect.Float64: pipepb.AtomicType_DOUBLE,
+	reflect.String:  pipepb.AtomicType_STRING,
+	reflect.Bool:    pipepb.AtomicType_BOOLEAN,
 }
 
 // ToType returns a Go type of the passed in Schema.
@@ -165,19 +172,26 @@
 func ToType(s *pipepb.Schema) (reflect.Type, error) {
 	fields := make([]reflect.StructField, 0, len(s.GetFields()))
 	for _, sf := range s.GetFields() {
-		rf := fieldToStructField(sf)
+		rf, err := fieldToStructField(sf)
+		if err != nil {
+			return nil, errors.Wrapf(err, "cannot convert schema field %v to field", sf.GetName())
+		}
 		fields = append(fields, rf)
 	}
 	return reflect.StructOf(fields), nil
 }
 
-func fieldToStructField(sf *pipepb.Field) reflect.StructField {
+func fieldToStructField(sf *pipepb.Field) (reflect.StructField, error) {
 	name := sf.GetName()
+	rt, err := fieldTypeToReflectType(sf.GetType())
+	if err != nil {
+		return reflect.StructField{}, err
+	}
 	return reflect.StructField{
 		Name: strings.ToUpper(name[:1]) + name[1:], // Go field name must be capitalized for export and encoding.
-		Type: fieldTypeToReflectType(sf.GetType()),
+		Type: rt,
 		Tag:  reflect.StructTag(fmt.Sprintf("beam:\"%s\"", name)),
-	}
+	}, nil
 }
 
 var atomicTypeToReflectType = map[pipepb.AtomicType]reflect.Type{
@@ -192,24 +206,34 @@
 	pipepb.AtomicType_BYTES:   reflectx.ByteSlice,
 }
 
-func fieldTypeToReflectType(sft *pipepb.FieldType) reflect.Type {
+func fieldTypeToReflectType(sft *pipepb.FieldType) (reflect.Type, error) {
 	var t reflect.Type
 	switch sft.GetTypeInfo().(type) {
 	case *pipepb.FieldType_AtomicType:
 		var ok bool
 		if t, ok = atomicTypeToReflectType[sft.GetAtomicType()]; !ok {
-			panic(fmt.Sprintf("unknown atomic type: %v", sft.GetAtomicType()))
+			return nil, errors.Errorf("unknown atomic type: %v", sft.GetAtomicType())
 		}
 	case *pipepb.FieldType_ArrayType:
-		t = reflect.SliceOf(fieldTypeToReflectType(sft.GetArrayType().GetElementType()))
+		rt, err := fieldTypeToReflectType(sft.GetArrayType().GetElementType())
+		if err != nil {
+			return nil, errors.Wrap(err, "unable to convert array element type")
+		}
+		t = reflect.SliceOf(rt)
 	case *pipepb.FieldType_MapType:
-		kt := fieldTypeToReflectType(sft.GetMapType().GetKeyType())
-		vt := fieldTypeToReflectType(sft.GetMapType().GetValueType())
+		kt, err := fieldTypeToReflectType(sft.GetMapType().GetKeyType())
+		if err != nil {
+			return nil, errors.Wrap(err, "unable to convert map key type")
+		}
+		vt, err := fieldTypeToReflectType(sft.GetMapType().GetValueType())
+		if err != nil {
+			return nil, errors.Wrap(err, "unable to convert map value type")
+		}
 		t = reflect.MapOf(kt, vt) // Panics for invalid map keys (slices/iterables)
 	case *pipepb.FieldType_RowType:
 		rt, err := ToType(sft.GetRowType().GetSchema())
 		if err != nil {
-			panic(err)
+			return nil, errors.Wrapf(err, "unable to convert row type: %v", sft.GetRowType().GetSchema().GetId())
 		}
 		t = rt
 	// case *pipepb.FieldType_IterableType:
@@ -223,12 +247,12 @@
 	// They would be encoded with the schema encoding.
 
 	default:
-		panic(fmt.Sprintf("unknown fieldtype: %T", sft.GetTypeInfo()))
+		return nil, errors.Errorf("unknown fieldtype: %T", sft.GetTypeInfo())
 	}
 	if sft.GetNullable() {
-		return reflect.PtrTo(t)
+		return reflect.PtrTo(t), nil
 	}
-	return t
+	return t, nil
 }
 
 // parseTag splits a struct field's beam tag into its name and