// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package schema contains utility functions for relating Go types and Beam Schemas.
//
// Not all Go types can be converted to schemas. This is Go is more expressive than
// Beam schemas. Just as not all Go types can be serialized, similarly,
// not all Beam Schemas will have a conversion to Go types, until the correct
// mechanism exists in the SDK to handle them.
//
// While efforts will be made to have conversions be reversable, this will not
// be possible in all instances. Eg. Go arrays as fields will be converted to
// Beam Arrays, but a Beam Array type will map by default to a Go slice.
package schema

import (
	"bytes"
	"fmt"
	"hash/fnv"
	"reflect"
	"strings"

	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
	"github.com/apache/beam/sdks/go/pkg/beam/core/sdf"
	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
	"github.com/golang/protobuf/proto"
	"github.com/google/uuid"
)

// Initialize registered schemas. For use by the beam package at beam.Init time.
func Initialize() {
	if err := defaultRegistry.reconcileRegistrations(); err != nil {
		panic(err)
	}
}

// FromType returns a Beam Schema of the passed in type.
// Returns an error if the type cannot be converted to a Schema.
func FromType(ot reflect.Type) (*pipepb.Schema, error) {
	return defaultRegistry.FromType(ot)
}

// ToType returns a Go type of the passed in Schema.
// Types returned by ToType are always of Struct kind.
// Returns an error if the Schema cannot be converted to a type.
func ToType(s *pipepb.Schema) (reflect.Type, error) {
	return defaultRegistry.ToType(s)
}

// Registered returns whether the given type has been registered with
// the default schema registry.
func Registered(ut reflect.Type) bool {
	return defaultRegistry.Registered(ut)
}

// RegisterType converts the type to it's schema representation, and converts it back to
// a synthetic type so we can map from the synthetic type back to the user type.
// Recursively registers other named struct types in any component parts.
func RegisterType(ut reflect.Type) {
	defaultRegistry.RegisterType(ut)
}

// getUUID generates a UUID using the string form of the type name.
func getUUID(ut reflect.Type) string {
	// String produces non-empty output for pointer and slice types.
	typename := ut.String()
	hasher := fnv.New128a()
	if n, err := hasher.Write([]byte(typename)); err != nil || n != len(typename) {
		panic(fmt.Sprintf("unable to generate schema uuid for %s, wrote out %d bytes, want %d: err %v", typename, n, len(typename), err))
	}
	id, err := uuid.NewRandomFromReader(bytes.NewBuffer(hasher.Sum(nil)))
	if err != nil {
		panic(fmt.Sprintf("unable to genereate schema uuid for type %s: %v", typename, err))
	}
	return id.String()
}

// Registered returns whether the given type has been registered with
// the schema package.
func (r *Registry) Registered(ut reflect.Type) bool {
	_, ok := r.syntheticToUser[ut]
	return ok
}

var sdfRtrackerType = reflect.TypeOf((*sdf.RTracker)(nil)).Elem()

// RegisterType converts the type to it's schema representation, and converts it back to
// a synthetic type so we can map from the synthetic type back to the user type.
// Recursively registers other named struct types in any component parts.
func (r *Registry) RegisterType(ut reflect.Type) {
	r.toReconcile = append(r.toReconcile, ut)
}

// reconcileRegistrations actually finishes the registration process.
func (r *Registry) reconcileRegistrations() (deferedErr error) {
	var ut reflect.Type
	defer func() {
		if r := recover(); r != nil {
			deferedErr = errors.Errorf("panicked: %v", r)
			deferedErr = errors.WithContextf(deferedErr, "reconciling schema registration for type %v", ut)
		}
	}()
	for _, ut := range r.toReconcile {
		check := func(ut reflect.Type) bool {
			return coder.LookupCustomCoder(ut) != nil
		}
		if check(ut) || check(reflect.PtrTo(ut)) {
			continue
		}
		if err := r.registerType(ut, map[reflect.Type]struct{}{}); err != nil {
			return errors.Wrapf(err, "error reconciling type %v", ut)
		}
	}
	r.toReconcile = nil
	return nil
}

func implements(ut, ifacet reflect.Type) bool {
	if ut.Implements(ifacet) {
		return true
	}
	switch ut.Kind() {
	case reflect.Ptr:
		t := ut.Elem()
		if t.Implements(ifacet) {
			return true
		}
		return implements(t, ifacet)
	case reflect.Struct:
		for i := 0; i < ut.NumField(); i++ {
			sf := ut.Field(i)
			if sf.Anonymous {
				impls := implements(sf.Type, ifacet)
				if impls {
					return true
				}
			}
		}
	}
	return false
}

func ignoreField(t reflect.Type, sf reflect.StructField) (ignore, isAnon bool, err error) {
	isUnexported := sf.PkgPath != ""
	if sf.Anonymous {
		ft := sf.Type
		if ft.Kind() == reflect.Ptr {
			// If a struct embeds a pointer to an unexported type,
			// it is not possible to set a newly allocated value
			// since the field is unexported.
			//
			// See https://golang.org/issue/21357
			//
			// Since the values are created by the decoder reflectively,
			// fail early here.
			if isUnexported {
				return false, false, errors.Errorf("cannot make schema for type %v as it has an embedded field of a pointer to an unexported type %v. See https://golang.org/issue/21357", t, ft.Elem())
			}
			ft = ft.Elem()
		}
		if isUnexported && ft.Kind() != reflect.Struct {
			// Ignore embedded fields of unexported non-struct types.
			return true, true, nil
		}
		// Do not ignore embedded fields of unexported struct types
		// since they may have exported fields.
		return false, true, nil
	}
	if isUnexported {
		// Schemas can't handle unexported fields at all.
		return true, false, nil
	}
	if implements(sf.Type, sdfRtrackerType) {
		// ignoring sdf.Rtracker interface
		return true, false, nil
	}

	return false, false, nil
}

func (r *Registry) registerType(ut reflect.Type, seen map[reflect.Type]struct{}) error {
	// Ignore rtrackers.
	if implements(ut, sdfRtrackerType) {
		return nil
	}
	if _, ok := r.syntheticToUser[ut]; ok {
		return nil
	}
	if _, ok := seen[ut]; ok {
		return nil // already processed in this pass, don't reprocess.
	}
	seen[ut] = struct{}{}

	// Lets do some recursion to register fundamental type parts.
	t := ut
	if lID, ok := r.logicalTypeIdentifiers[t]; ok {
		lt := r.logicalTypes[lID]
		r.addToMaps(lt.StorageType(), t)
		return nil
	}

	for _, lti := range r.logicalTypeInterfaces {
		if !t.Implements(lti) {
			continue
		}
		p := r.logicalTypeProviders[lti]
		st, err := p(t)
		if err != nil {
			return errors.Wrapf(err, "unable to convert LogicalType[%v] using provider for %v", t, lti)
		}
		if st == nil {
			continue
		}
		r.RegisterLogicalType(ToLogicalType(t.String(), t, st))
		r.addToMaps(st, t)
		return nil
	}

	switch t.Kind() {
	case reflect.Map:
		if err := r.registerType(t.Key(), seen); err != nil {
			return err
		}
		fallthrough
	case reflect.Array, reflect.Slice, reflect.Ptr:
		if err := r.registerType(t.Elem(), seen); err != nil {
			return errors.Wrapf(err, "type is of kind %v", t.Kind())
		}
		return nil
	case reflect.Interface, reflect.Func, reflect.Chan, reflect.Invalid, reflect.UnsafePointer, reflect.Uintptr:
		// Ignore these, as they can't be serialized.
		return nil
	case reflect.Complex64, reflect.Complex128:
		// TODO(BEAM-9615): Support complex number types.
		return nil
	case reflect.Struct: // What we expect here.
	default:
		rt, ok := reflectKindToTypeMap[t.Kind()]
		if !ok {
			// Kind is not listed, meaning it's an unlisted somehow, which means either the map
			// is missing something, or the above switch cases are missing something.
			return errors.Errorf("unlisted kind %v for type %v reached.", t.Kind(), t)
		}
		if t != rt {
			// It's only a logical type if it's not a built in primitive type, which is returned by the map.
			r.RegisterLogicalType(ToLogicalType(t.String(), t, rt))
		}
		return nil
	}

	for i := 0; i < t.NumField(); i++ {
		sf := ut.Field(i)
		ignore, _, err := ignoreField(t, sf)
		if err != nil {
			return err
		}
		if ignore {
			continue
		}
		if err := r.registerType(sf.Type, seen); err != nil {
			return errors.Wrapf(err, "registering type for field %v in %v", sf.Name, ut)
		}
	}

	schm, err := r.fromType(ut)
	if err != nil {
		return errors.WithContextf(err, "converting %v to schema", ut)
	}
	synth, err := r.toType(schm)
	if err != nil {
		return errors.WithContextf(err, "converting %v's back to a synthetic type", ut)
	}

	r.addToMaps(synth, ut)
	return nil
}

func (r *Registry) addToMaps(synth, ut reflect.Type) {
	synth = reflectx.SkipPtr(synth)
	ut = reflectx.SkipPtr(ut)
	// empty types have no value for lookups.
	if synth != emptyStructType {
		r.syntheticToUser[synth] = ut
		r.syntheticToUser[reflect.PtrTo(synth)] = reflect.PtrTo(ut)
	}
	if ut != emptyStructType {
		r.syntheticToUser[ut] = ut
		r.syntheticToUser[reflect.PtrTo(ut)] = reflect.PtrTo(ut)
	}
}

// FromType returns a Beam Schema of the passed in type.
// Returns an error if the type cannot be converted to a Schema.
func (r *Registry) FromType(ot reflect.Type) (*pipepb.Schema, error) {
	if err := r.reconcileRegistrations(); err != nil {
		return nil, errors.Wrap(err, "reconciling for FromType")
	}
	if reflectx.SkipPtr(ot).Kind() != reflect.Struct {
		return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
	}
	return r.fromType(ot)
}

func (r *Registry) logicalTypeToFieldType(t reflect.Type) (*pipepb.FieldType, string, error) {
	// Check if a logical type was registered that matches this struct type directly
	// and if so, extract the schema from it for use.
	if lID, ok := r.logicalTypeIdentifiers[t]; ok {
		lt := r.logicalTypes[lID]
		ftype, err := r.reflectTypeToFieldType(lt.StorageType())
		if err != nil {
			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v]'s storage type %v for Go type of %v to a schema", lID, lt.StorageType(), lt.GoType())
		}
		return ftype, lID, nil
	}
	for _, lti := range r.logicalTypeInterfaces {
		if !t.Implements(lti) {
			continue
		}
		p := r.logicalTypeProviders[lti]
		st, err := p(t)
		if err != nil {
			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v] using provider for %v schema field", t, lti)
		}
		if st == nil {
			continue
		}
		ftype, err := r.reflectTypeToFieldType(st)
		if err != nil {
			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v]'s storage type %v for Go type of %v to a schema", "interface", st, t)
		}
		return ftype, t.String(), nil
	}
	return nil, "", nil
}

// fromType handles if the initial type is a pointer or not WRT lookups against
// registered types and then delegates to structToSchema for most of the conversion.
// For determinism in schema IDs, regardless of whther the original type is a pointer or not,
// both variants are cached for latter reuse.
func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) {
	if schm, ok := r.typeToSchema[ot]; ok {
		return schm, nil
	}
	ftype, lID, err := r.logicalTypeToFieldType(ot)
	if err != nil {
		return nil, err
	}
	if ftype != nil {
		schm := ftype.GetRowType().GetSchema()
		schm = proto.Clone(schm).(*pipepb.Schema)
		if ot.Kind() == reflect.Ptr {
			schm.Options = append(schm.Options, optGoNillable())
		}
		if lID != "" {
			schm.Options = append(schm.Options, logicalOption(lID))
		}
		schm.Id = getUUID(ot)
		r.typeToSchema[ot] = schm
		r.idToType[schm.GetId()] = ot
		return schm, nil
	}

	t := reflectx.SkipPtr(ot)

	schm, err := r.structToSchema(t)
	if err != nil {
		return nil, err
	}
	// Cache the pointer type here with it's own id.
	pt := reflect.PtrTo(t)
	schm = proto.Clone(schm).(*pipepb.Schema)
	schm.Id = getUUID(pt)
	schm.Options = append(schm.Options, optGoNillable())
	r.idToType[schm.GetId()] = pt
	r.typeToSchema[pt] = schm

	// Return whatever the original type was.
	return r.typeToSchema[ot], nil
}

// Schema Option urns.
const (
	// optGoNillable indicates that this top level schema should be returned as a pointer type.
	optGoNillableUrn = "beam:schema:go:nillable:v1"
	// optGoEmbedded indicates that this field is an embedded type.
	optGoEmbeddedUrn = "beam:schema:go:embedded_field:v1"
	// optGoLogical indicates that this top level schema has a logical type equivalent that need to be looked up.
	// It has a value type of String representing the URN for the logical type to look up.
	optGoLogicalUrn = "beam:schema:go:logical:v1"
)

func optGoNillable() *pipepb.Option {
	return newToggleOption(optGoNillableUrn)
}

func optGoEmbedded() *pipepb.Option {
	return newToggleOption(optGoEmbeddedUrn)
}

// newToggleOption constructs an Option whose presence is all
// that matters, rather than other configuration. The option
// is not set if the toggle isn't true, so the value is always
// true.
func newToggleOption(urn string) *pipepb.Option {
	return &pipepb.Option{
		Name: urn,
		Type: &pipepb.FieldType{
			TypeInfo: &pipepb.FieldType_AtomicType{
				AtomicType: pipepb.AtomicType_BOOLEAN,
			},
		},
		Value: &pipepb.FieldValue{
			FieldValue: &pipepb.FieldValue_AtomicValue{
				AtomicValue: &pipepb.AtomicTypeValue{
					Value: &pipepb.AtomicTypeValue_Boolean{
						Boolean: true,
					},
				},
			},
		},
	}
}

func checkOptions(opts []*pipepb.Option, urn string) *pipepb.Option {
	for _, opt := range opts {
		if opt.GetName() == urn {
			return opt
		}
	}
	return nil
}

// nillableFromOptions converts the passed in type to it's pointer version
// if the option is present. This permits go types to be pointers.
func nillableFromOptions(opts []*pipepb.Option, t reflect.Type) reflect.Type {
	if checkOptions(opts, optGoNillableUrn) != nil {
		return reflect.PtrTo(t)
	}
	return nil
}

var optGoLogicalType = &pipepb.FieldType{
	TypeInfo: &pipepb.FieldType_AtomicType{
		AtomicType: pipepb.AtomicType_STRING,
	},
}

func logicalOption(lID string) *pipepb.Option {
	return &pipepb.Option{
		Name: optGoLogicalUrn,
		Type: optGoLogicalType,
		Value: &pipepb.FieldValue{
			FieldValue: &pipepb.FieldValue_AtomicValue{
				AtomicValue: &pipepb.AtomicTypeValue{
					Value: &pipepb.AtomicTypeValue_String_{
						String_: lID,
					},
				},
			},
		},
	}
}

// fromLogicalOption returns the logical type id of this top
// level type if this schema has a logical equivalent.
func fromLogicalOption(opts []*pipepb.Option) (string, bool) {
	o := checkOptions(opts, optGoLogicalUrn)
	if o == nil {
		return "", false
	}
	lID := o.GetValue().GetAtomicValue().GetString_()
	return lID, true
}

func (r *Registry) structToSchema(t reflect.Type) (*pipepb.Schema, error) {
	if t.Kind() != reflect.Struct {
		return nil, errors.Errorf("non struct type received in structToSchema: %v is kind %v", t, t.Kind())
	}
	if schm, ok := r.typeToSchema[t]; ok {
		return schm, nil
	}

	ftype, lID, err := r.logicalTypeToFieldType(t)
	if err != nil {
		return nil, err
	}
	if ftype != nil {
		schm := ftype.GetRowType().GetSchema()
		schm = proto.Clone(schm).(*pipepb.Schema)
		schm.Options = append(schm.Options, logicalOption(lID))
		schm.Id = getUUID(t)
		r.typeToSchema[t] = schm
		r.idToType[schm.GetId()] = t
		return schm, nil
	}

	fields := make([]*pipepb.Field, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		sf := t.Field(i)
		ignore, isAnon, err := ignoreField(t, sf)
		if err != nil {
			return nil, err
		}
		if ignore {
			continue
		}
		f, err := r.structFieldToField(sf)
		if err != nil {
			return nil, errors.Wrapf(err, "cannot convert field %v to schema", t.Field(i).Name)
		}
		if isAnon {
			f = proto.Clone(f).(*pipepb.Field)
			f.Options = append(f.Options, optGoEmbedded())
		}
		fields = append(fields, f)
	}

	schm := &pipepb.Schema{
		Fields: fields,
		Id:     getUUID(t),
	}
	r.idToType[schm.GetId()] = t
	r.typeToSchema[t] = schm
	return schm, nil
}

func (r *Registry) structFieldToField(sf reflect.StructField) (*pipepb.Field, error) {
	name := sf.Name
	if tag := sf.Tag.Get("beam"); tag != "" {
		name, _ = parseTag(tag)
	}
	ftype, err := r.reflectTypeToFieldType(sf.Type)
	if err != nil {
		return nil, err
	}
	return &pipepb.Field{
		Name: name,
		Type: ftype,
	}, nil
}

func (r *Registry) reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, error) {
	ftype, lID, err := r.logicalTypeToFieldType(ot)
	if err != nil {
		return nil, err
	}
	if ftype != nil {
		return &pipepb.FieldType{
			TypeInfo: &pipepb.FieldType_LogicalType{
				LogicalType: &pipepb.LogicalType{
					Urn:            lID,
					Representation: ftype,
					// TODO(BEAM-9615): Handle type Arguments.
				},
			},
		}, nil
	}

	t := ot
	switch t.Kind() {
	case reflect.Ptr:
		vt, err := r.reflectTypeToFieldType(t.Elem())
		if err != nil {
			return nil, errors.Wrapf(err, "unable to convert key of %v to schema field", ot)
		}
		vt.Nullable = true
		return vt, nil
	case reflect.Map:
		kt, err := r.reflectTypeToFieldType(t.Key())
		if err != nil {
			return nil, errors.Wrapf(err, "unable to convert key of %v to schema field", ot)
		}
		vt, err := r.reflectTypeToFieldType(t.Elem())
		if err != nil {
			return nil, errors.Wrapf(err, "unable to convert value of %v to schema field", ot)
		}
		return &pipepb.FieldType{
			TypeInfo: &pipepb.FieldType_MapType{
				MapType: &pipepb.MapType{
					KeyType:   kt,
					ValueType: vt,
				},
			},
		}, nil
	case reflect.Struct:
		sch, err := r.structToSchema(t)
		if err != nil {
			return nil, errors.Wrapf(err, "unable to convert %v to schema field", ot)
		}
		return &pipepb.FieldType{
			TypeInfo: &pipepb.FieldType_RowType{
				RowType: &pipepb.RowType{
					Schema: sch,
				},
			},
		}, nil
	case reflect.Slice, reflect.Array:
		// Special handling for []byte
		if t == reflectx.ByteSlice {
			return &pipepb.FieldType{
				TypeInfo: &pipepb.FieldType_AtomicType{
					AtomicType: pipepb.AtomicType_BYTES,
				},
			}, nil
		}
		vt, err := r.reflectTypeToFieldType(t.Elem())
		if err != nil {
			return nil, errors.Wrapf(err, "unable to convert element type of %v to schema field", ot)
		}
		return &pipepb.FieldType{
			TypeInfo: &pipepb.FieldType_ArrayType{
				ArrayType: &pipepb.ArrayType{
					ElementType: vt,
				},
			},
		}, nil
	case reflect.Interface, reflect.Func, reflect.Chan, reflect.UnsafePointer, reflect.Complex128, reflect.Complex64, reflect.Invalid:
		return nil, errors.Errorf("unable to convert unsupported type %v to schema", ot)
	default: // must be an atomic type
		if enum, ok := reflectTypeToAtomicTypeMap[t.Kind()]; ok {
			return &pipepb.FieldType{
				TypeInfo: &pipepb.FieldType_AtomicType{
					AtomicType: enum,
				},
			}, nil
		}
		return nil, errors.Errorf("unable to map %v to pipepb.AtomicType", t)
	}
}

var reflectTypeToAtomicTypeMap = map[reflect.Kind]pipepb.AtomicType{
	reflect.Uint8:   pipepb.AtomicType_BYTE,
	reflect.Int16:   pipepb.AtomicType_INT16,
	reflect.Int32:   pipepb.AtomicType_INT32,
	reflect.Int64:   pipepb.AtomicType_INT64,
	reflect.Float32: pipepb.AtomicType_FLOAT,
	reflect.Float64: pipepb.AtomicType_DOUBLE,
	reflect.String:  pipepb.AtomicType_STRING,
	reflect.Bool:    pipepb.AtomicType_BOOLEAN,
}

var reflectKindToTypeMap = map[reflect.Kind]reflect.Type{
	reflect.Uint:    reflectx.Uint,
	reflect.Uint8:   reflectx.Uint8,
	reflect.Uint16:  reflectx.Uint16,
	reflect.Uint32:  reflectx.Uint32,
	reflect.Uint64:  reflectx.Uint64,
	reflect.Int:     reflectx.Int,
	reflect.Int8:    reflectx.Int8,
	reflect.Int16:   reflectx.Int16,
	reflect.Int32:   reflectx.Int32,
	reflect.Int64:   reflectx.Int64,
	reflect.Float32: reflectx.Float32,
	reflect.Float64: reflectx.Float64,
	reflect.String:  reflectx.String,
	reflect.Bool:    reflectx.Bool,
}

var emptyStructType = reflect.TypeOf((*struct{})(nil)).Elem()

// ToType returns a Go type of the passed in Schema.
// Types returned by ToType are always of Struct kind.
// Returns an error if the Schema cannot be converted to a type.
func (r *Registry) ToType(s *pipepb.Schema) (reflect.Type, error) {
	if err := r.reconcileRegistrations(); err != nil {
		return nil, errors.Wrap(err, "reconciling for ToType")
	}
	return r.toType(s)
}

func (r *Registry) toType(s *pipepb.Schema) (reflect.Type, error) {
	if t, ok := r.idToType[s.GetId()]; ok {
		return t, nil
	}
	if lID, ok := fromLogicalOption(s.GetOptions()); ok {
		if lt, ok := r.logicalTypes[lID]; ok {
			return lt.GoType(), nil
		}
	}

	fields := make([]reflect.StructField, 0, len(s.GetFields()))
	for _, sf := range s.GetFields() {
		rf, err := r.fieldToStructField(sf)
		if err != nil {
			return nil, errors.Wrapf(err, "cannot convert schema field %v to field", sf.GetName())
		}
		if checkOptions(sf.Options, optGoEmbeddedUrn) != nil {
			rf.Anonymous = true
		}
		fields = append(fields, rf)
	}
	ret := reflect.StructOf(fields)
	if ut, ok := r.syntheticToUser[ret]; ok {
		ret = ut
	}
	if t := nillableFromOptions(s.GetOptions(), ret); t != nil {
		return t, nil
	}
	return ret, nil
}

func (r *Registry) fieldToStructField(sf *pipepb.Field) (reflect.StructField, error) {
	name := sf.GetName()
	rt, err := r.fieldTypeToReflectType(sf.GetType(), sf.Options)
	if err != nil {
		return reflect.StructField{}, err
	}

	rsf := reflect.StructField{
		Name: strings.ToUpper(name[:1]) + name[1:], // Go field name must be capitalized for export and encoding.
		Type: rt,
	}
	// Add a name tag if they don't match.
	if name != rsf.Name {
		rsf.Tag = reflect.StructTag(fmt.Sprintf("beam:\"%s\"", name))
	}
	return rsf, nil
}

var atomicTypeToReflectType = map[pipepb.AtomicType]reflect.Type{
	pipepb.AtomicType_BYTE:    reflectx.Uint8,
	pipepb.AtomicType_INT16:   reflectx.Int16,
	pipepb.AtomicType_INT32:   reflectx.Int32,
	pipepb.AtomicType_INT64:   reflectx.Int64,
	pipepb.AtomicType_FLOAT:   reflectx.Float32,
	pipepb.AtomicType_DOUBLE:  reflectx.Float64,
	pipepb.AtomicType_STRING:  reflectx.String,
	pipepb.AtomicType_BOOLEAN: reflectx.Bool,
	pipepb.AtomicType_BYTES:   reflectx.ByteSlice,
}

func (r *Registry) fieldTypeToReflectType(sft *pipepb.FieldType, opts []*pipepb.Option) (reflect.Type, error) {
	var t reflect.Type
	switch sft.GetTypeInfo().(type) {
	case *pipepb.FieldType_AtomicType:
		var ok bool
		if t, ok = atomicTypeToReflectType[sft.GetAtomicType()]; !ok {
			return nil, errors.Errorf("unknown atomic type: %v", sft.GetAtomicType())
		}
	case *pipepb.FieldType_ArrayType:
		rt, err := r.fieldTypeToReflectType(sft.GetArrayType().GetElementType(), nil)
		if err != nil {
			return nil, errors.Wrap(err, "unable to convert array element type")
		}
		t = reflect.SliceOf(rt)
	case *pipepb.FieldType_MapType:
		kt, err := r.fieldTypeToReflectType(sft.GetMapType().GetKeyType(), nil)
		if err != nil {
			return nil, errors.Wrap(err, "unable to convert map key type")
		}
		vt, err := r.fieldTypeToReflectType(sft.GetMapType().GetValueType(), nil)
		if err != nil {
			return nil, errors.Wrap(err, "unable to convert map value type")
		}
		t = reflect.MapOf(kt, vt) // Panics for invalid map keys (slices/iterables)
	case *pipepb.FieldType_RowType:
		rt, err := r.toType(sft.GetRowType().GetSchema())
		if err != nil {
			return nil, errors.Wrapf(err, "unable to convert row type: %v", sft.GetRowType().GetSchema().GetId())
		}
		t = rt
	// case *pipepb.FieldType_IterableType:
	// TODO(BEAM-9615): handle IterableTypes (eg. CoGBK values)

	case *pipepb.FieldType_LogicalType:
		lst := sft.GetLogicalType()
		identifier := lst.GetUrn()
		lt, ok := r.logicalTypes[identifier]
		if !ok {
			return nil, errors.Errorf("unknown logical type: %v", identifier)
		}
		t = lt.GoType()

	default:
		return nil, errors.Errorf("unknown fieldtype: %T", sft.GetTypeInfo())
	}
	if sft.GetNullable() {
		return reflect.PtrTo(t), nil
	}
	return t, nil
}

// parseTag splits a struct field's beam tag into its name and
// comma-separated options.
func parseTag(tag string) (string, options) {
	if idx := strings.Index(tag, ","); idx != -1 {
		return tag[:idx], options(tag[idx+1:])
	}
	return tag, options("")
}

type options string

// TODO(BEAM-9615): implement looking up specific options from the tags.
