blob: 16fed5c9887af127f2e67c19ddebcb1e3ec44072 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package beam_test
import (
"bytes"
"fmt"
"io"
"reflect"
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/google/go-cmp/cmp"
)
// RegisterSchemaProvider must be called before beam.Init, and conventionally in a package init block.
func init() {
beam.RegisterSchemaProvider(reflect.TypeOf((*Alphabet)(nil)).Elem(), &AlphabetProvider{})
// TODO(BEAM-9615): Registerying a self encoding type causes a cycle. Needs resolving.
// beam.RegisterType(reflect.TypeOf((*Cyrillic)(nil)))
beam.RegisterType(reflect.TypeOf((*Latin)(nil)))
beam.RegisterType(reflect.TypeOf((*Ελληνικά)(nil)))
}
type Alphabet interface {
alphabet() string
}
type Cyrillic struct {
A, B int
}
func (*Cyrillic) alphabet() string {
return "Cyrillic"
}
type Latin struct {
// Unexported fields are not serializable by beam schemas by default
// so we need to handle this ourselves.
c uint64
d *float32
}
func (*Latin) alphabet() string {
return "Latin"
}
type Ελληνικά struct {
q string
G func() string
}
func (*Ελληνικά) alphabet() string {
return "Ελληνικά"
}
// AlphabetProvider provides encodings for types that implement the Alphabet interface.
type AlphabetProvider struct {
enc *coder.RowEncoderBuilder
dec *coder.RowDecoderBuilder
}
var (
typeCyrillic = reflect.TypeOf((*Cyrillic)(nil))
typeLatin = reflect.TypeOf((*Latin)(nil))
typeΕλληνικά = reflect.TypeOf((*Ελληνικά)(nil))
)
func (p *AlphabetProvider) FromLogicalType(rt reflect.Type) (reflect.Type, error) {
// FromLogicalType produces schema representative types, which match the encoders
// and decoders that this function generates for this type.
// While this example uses statically assigned schema representative types, it's
// possible to generate the returned reflect.Type dynamically instead, using the
// reflect package.
switch rt {
// The Cyrillic type is able to be encoded by default, so we simply use it directly
// as it's own representative type.
case typeCyrillic:
return typeCyrillic, nil
case typeLatin:
// The Latin type only has unexported fields, so we need to make the equivalent
// have exported fields.
return reflect.TypeOf((*struct {
C uint64
D *float32
})(nil)).Elem(), nil
case typeΕλληνικά:
return reflect.TypeOf((*struct{ Q string })(nil)).Elem(), nil
}
return nil, fmt.Errorf("Unknown Alphabet: %v", rt)
}
// BuildEncoder returns beam schema encoder functions for types with the Alphabet interface.
func (p *AlphabetProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) {
switch rt {
case typeCyrillic:
if p.enc == nil {
p.enc = &coder.RowEncoderBuilder{}
}
// Since Cyrillic is by default encodable, defer to the standard schema row decoder for the type.
return p.enc.Build(rt)
case typeLatin:
return func(iface interface{}, w io.Writer) error {
v := iface.(*Latin)
// Beam Schema Rows have a header that indicates which fields if any, are nil.
if err := coder.WriteRowHeader(2, func(i int) bool {
if i == 1 {
return v.d == nil
}
return false
}, w); err != nil {
return err
}
// Afterwards, each field is encoded using the appropriate helper.
if err := coder.EncodeVarUint64(v.c, w); err != nil {
return err
}
// Nil fields have nothing written for them other than the header.
if v.d != nil {
if err := coder.EncodeDouble(float64(*v.d), w); err != nil {
return err
}
}
return nil
}, nil
case typeΕλληνικά:
return func(iface interface{}, w io.Writer) error {
// Since the representation for Ελληνικά never has nil fields
// we can use the simple header helper.
if err := coder.WriteSimpleRowHeader(1, w); err != nil {
return err
}
v := iface.(*Ελληνικά)
if err := coder.EncodeStringUTF8(v.q, w); err != nil {
return fmt.Errorf("decoding string field A: %v", err)
}
return nil
}, nil
}
return nil, fmt.Errorf("Unknown Alphabet: %v", rt)
}
// BuildDecoder returns beam schema decoder functions for types with the Alphabet interface.
func (p *AlphabetProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) {
switch rt {
case typeCyrillic:
if p.dec == nil {
p.dec = &coder.RowDecoderBuilder{}
}
// Since Cyrillic is by default encodable, defer to the standard schema row decoder for the type.
return p.dec.Build(rt)
case typeLatin:
return func(r io.Reader) (interface{}, error) {
// Since the d field can be nil, we use the header get the nil bits.
n, nils, err := coder.ReadRowHeader(r)
if err != nil {
return nil, err
}
// Header returns the number of fields, so we check if it has what we
// expect. This allows schemas to evolve if necessary.
if n != 2 {
return nil, fmt.Errorf("expected 2 fields, but got %v", n)
}
c, err := coder.DecodeVarUint64(r)
if err != nil {
return nil, err
}
// Check if the field is nil before trying to decode a value for it.
var d *float32
if !coder.IsFieldNil(nils, 1) {
f, err := coder.DecodeDouble(r)
if err != nil {
return nil, err
}
f32 := float32(f)
d = &f32
}
return &Latin{
c: c,
d: d,
}, nil
}, nil
case typeΕλληνικά:
return func(r io.Reader) (interface{}, error) {
// Since the representation for Ελληνικά never has nil fields
// we can use the simple header helper. Returns an error if
// something unexpected occurs.
if err := coder.ReadSimpleRowHeader(1, r); err != nil {
return nil, err
}
q, err := coder.DecodeStringUTF8(r)
if err != nil {
return nil, fmt.Errorf("decoding string field A: %v", err)
}
return &Ελληνικά{
q: q,
}, nil
}, nil
}
return nil, nil
}
// Schema providers work on fields of schema encoded types.
type translation struct {
C *Cyrillic
L *Latin
E *Ελληνικά
}
func ExampleRegisterSchemaProvider() {
f := float32(42.789)
want := translation{
C: &Cyrillic{A: 123, B: 456},
L: &Latin{c: 789, d: &f},
E: &Ελληνικά{q: "testing"},
}
rt := reflect.TypeOf((*translation)(nil)).Elem()
enc, err := coder.RowEncoderForStruct(rt)
if err != nil {
panic(err)
}
dec, err := coder.RowDecoderForStruct(rt)
if err != nil {
panic(err)
}
var buf bytes.Buffer
if err := enc(want, &buf); err != nil {
panic(err)
}
got, err := dec(&buf)
if err != nil {
panic(err)
}
if d := cmp.Diff(want, got,
cmp.AllowUnexported(Latin{}, Ελληνικά{})); d != "" {
fmt.Printf("diff in schema encoding translation: (-want,+got)\n%v\n", d)
} else {
fmt.Println("No diffs!")
}
// Output: No diffs!
}