| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package beam |
| |
| import ( |
| "bytes" |
| "encoding/json" |
| "fmt" |
| "io" |
| "reflect" |
| "sync" |
| |
| "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/coderx" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/schema" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/typex" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/util/jsonx" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" |
| "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" |
| protov1 "github.com/golang/protobuf/proto" |
| protov2 "google.golang.org/protobuf/proto" |
| "google.golang.org/protobuf/reflect/protoreflect" |
| ) |
| |
| // EnableSchemas is a temporary configuration variable |
| // to use Beam Schema encoding by default instead of JSON. |
| // Before it is removed, it will be set to true by default |
| // and then eventually removed. |
| // |
| // Only users who rely on default JSON marshalling behaviour should set |
| // this explicitly, and file an issue on the BEAM JIRA so the issue may |
| // be resolved. |
| // https://issues.apache.org/jira/projects/BEAM/issues/ |
| var EnableSchemas bool = false |
| |
| func init() { |
| runtime.RegisterInit(func() { |
| if EnableSchemas { |
| schema.Initialize() |
| } |
| }) |
| } |
| |
| type jsonCoder interface { |
| json.Marshaler |
| json.Unmarshaler |
| } |
| |
| var protoMessageType = reflect.TypeOf((*protov1.Message)(nil)).Elem() |
| var protoReflectMessageType = reflect.TypeOf((*protoreflect.ProtoMessage)(nil)).Elem() |
| var jsonCoderType = reflect.TypeOf((*jsonCoder)(nil)).Elem() |
| |
| func init() { |
| coder.RegisterCoder(protoMessageType, protoEnc, protoDec) |
| coder.RegisterCoder(protoReflectMessageType, protoEnc, protoDec) |
| } |
| |
| // Coder defines how to encode and decode values of type 'A' into byte streams. |
| // Coders are attached to PCollections of the same type. For PCollections |
| // consumed by GBK, the attached coders are required to be deterministic. |
| type Coder struct { |
| coder *coder.Coder |
| } |
| |
| // IsValid returns true iff the Coder is valid. Any use of an invalid Coder |
| // will result in a panic. |
| func (c Coder) IsValid() bool { |
| return c.coder != nil |
| } |
| |
| // Type returns the full type 'A' of elements the coder can encode and decode. |
| // 'A' must be a concrete full type, such as int or KV<int,string>. |
| func (c Coder) Type() FullType { |
| if !c.IsValid() { |
| panic("Invalid Coder") |
| } |
| return c.coder.T |
| } |
| |
| func (c Coder) String() string { |
| if c.coder == nil { |
| return "$" |
| } |
| return c.coder.String() |
| } |
| |
| // NewElementEncoder returns a new encoding function for the given type. |
| func NewElementEncoder(t reflect.Type) ElementEncoder { |
| c, err := inferCoder(typex.New(t)) |
| if err != nil { |
| panic(err) |
| } |
| return &execEncoder{enc: exec.MakeElementEncoder(c)} |
| } |
| |
| // execEncoder wraps an exec.ElementEncoder to implement the ElementDecoder interface |
| // in this package. |
| type execEncoder struct { |
| enc exec.ElementEncoder |
| coder *coder.Coder |
| } |
| |
| func (e *execEncoder) Encode(element interface{}, w io.Writer) error { |
| return e.enc.Encode(&exec.FullValue{Elm: element}, w) |
| } |
| |
| func (e *execEncoder) String() string { |
| return e.coder.String() |
| } |
| |
| // NewElementDecoder returns an ElementDecoder the given type. |
| func NewElementDecoder(t reflect.Type) ElementDecoder { |
| c, err := inferCoder(typex.New(t)) |
| if err != nil { |
| panic(err) |
| } |
| return &execDecoder{dec: exec.MakeElementDecoder(c)} |
| } |
| |
| // execDecoder wraps an exec.ElementDecoder to implement the ElementDecoder interface |
| // in this package. |
| type execDecoder struct { |
| dec exec.ElementDecoder |
| coder *coder.Coder |
| } |
| |
| func (d *execDecoder) Decode(r io.Reader) (interface{}, error) { |
| fv, err := d.dec.Decode(r) |
| if err != nil { |
| return nil, err |
| } |
| return fv.Elm, nil |
| } |
| |
| func (d *execDecoder) String() string { |
| return d.coder.String() |
| } |
| |
| // NewCoder infers a Coder for any bound full type. |
| func NewCoder(t FullType) Coder { |
| c, err := inferCoder(t) |
| if err != nil { |
| panic(err) // for now |
| } |
| return Coder{c} |
| } |
| |
| func inferCoder(t FullType) (*coder.Coder, error) { |
| switch t.Class() { |
| case typex.Concrete, typex.Container: |
| switch t.Type() { |
| case reflectx.Int64: |
| // use the beam varint coder. |
| return &coder.Coder{Kind: coder.VarInt, T: t}, nil |
| case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32: |
| c, err := coderx.NewVarIntZ(t.Type()) |
| if err != nil { |
| return nil, err |
| } |
| return coder.CoderFrom(c), nil |
| case reflectx.Uint, reflectx.Uint8, reflectx.Uint16, reflectx.Uint32, reflectx.Uint64: |
| c, err := coderx.NewVarUintZ(t.Type()) |
| if err != nil { |
| return nil, err |
| } |
| return coder.CoderFrom(c), nil |
| |
| case reflectx.Float32: |
| c, err := coderx.NewFloat(t.Type()) |
| if err != nil { |
| return nil, err |
| } |
| return coder.CoderFrom(c), nil |
| |
| case reflectx.Float64: |
| return &coder.Coder{Kind: coder.Double, T: t}, nil |
| |
| case reflectx.String: |
| return &coder.Coder{Kind: coder.String, T: t}, nil |
| |
| case reflectx.ByteSlice: |
| return &coder.Coder{Kind: coder.Bytes, T: t}, nil |
| |
| case reflectx.Bool: |
| return &coder.Coder{Kind: coder.Bool, T: t}, nil |
| |
| default: |
| et := t.Type() |
| if c := coder.LookupCustomCoder(et); c != nil { |
| return coder.CoderFrom(c), nil |
| } |
| |
| if EnableSchemas { |
| switch et.Kind() { |
| case reflect.Ptr: |
| if et.Elem().Kind() != reflect.Struct { |
| break |
| } |
| fallthrough |
| case reflect.Struct: |
| return &coder.Coder{Kind: coder.Row, T: t}, nil |
| } |
| } |
| |
| // Interface types that implement JSON marshalling can be handled by the default coder. |
| // otherwise, inference needs to fail here. |
| if et.Kind() == reflect.Interface && !et.Implements(jsonCoderType) { |
| return nil, errors.Errorf("inferCoder failed: interface type %v has no coder registered", et) |
| } |
| |
| c, err := newJSONCoder(et) |
| if err != nil { |
| return nil, err |
| } |
| return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil |
| } |
| |
| case typex.Composite: |
| c, err := inferCoders(t.Components()) |
| if err != nil { |
| return nil, err |
| } |
| |
| switch t.Type() { |
| case typex.KVType: |
| return &coder.Coder{Kind: coder.KV, T: t, Components: c}, nil |
| case typex.CoGBKType: |
| return &coder.Coder{Kind: coder.CoGBK, T: t, Components: c}, nil |
| case typex.WindowedValueType: |
| // TODO(herohde) 4/15/2018: do we ever infer W types now that PCollections |
| // are non-windowed? We either need to know the windowing strategy or |
| // we should remove this case. |
| return &coder.Coder{Kind: coder.WindowedValue, T: t, Components: c, Window: coder.NewGlobalWindow()}, nil |
| |
| default: |
| panic(fmt.Sprintf("Unexpected composite type: %v", t)) |
| } |
| default: |
| panic(fmt.Sprintf("Unexpected type: %v", t)) |
| } |
| } |
| |
| func inferCoders(list []FullType) ([]*coder.Coder, error) { |
| var ret []*coder.Coder |
| for _, t := range list { |
| c, err := inferCoder(t) |
| if err != nil { |
| return nil, err |
| } |
| ret = append(ret, c) |
| } |
| return ret, nil |
| } |
| |
| // protoEnc marshals the supplied proto.Message. |
| func protoEnc(in T) ([]byte, error) { |
| var p protoreflect.ProtoMessage |
| switch it := in.(type) { |
| case protoreflect.ProtoMessage: |
| p = it |
| case protov1.Message: |
| p = protov1.MessageV2(it) |
| } |
| b, err := protov2.MarshalOptions{Deterministic: true}.Marshal(p) |
| if err != nil { |
| return nil, err |
| } |
| return b, nil |
| } |
| |
| // protoDec unmarshals the supplied bytes into an instance of the supplied |
| // proto.Message type. |
| func protoDec(t reflect.Type, in []byte) (T, error) { |
| var p protoreflect.ProtoMessage |
| switch it := reflect.New(t.Elem()).Interface().(type) { |
| case protoreflect.ProtoMessage: |
| p = it |
| case protov1.Message: |
| p = protov1.MessageV2(it) |
| } |
| err := protov2.UnmarshalOptions{}.Unmarshal(in, p) |
| if err != nil { |
| return nil, err |
| } |
| return p, nil |
| } |
| |
| // Concrete and universal custom coders both have a similar signature. |
| // Conversion is handled by reflection. |
| |
| // jsonEnc encodes the supplied value in JSON. |
| func jsonEnc(in T) ([]byte, error) { |
| return jsonx.Marshal(in) |
| } |
| |
| // jsonDec decodes the supplied JSON into an instance of the supplied type. |
| func jsonDec(t reflect.Type, in []byte) (T, error) { |
| val := reflect.New(t) |
| if err := jsonx.Unmarshal(val.Interface(), in); err != nil { |
| return nil, err |
| } |
| return val.Elem().Interface(), nil |
| } |
| |
| func newJSONCoder(t reflect.Type) (*coder.CustomCoder, error) { |
| c, err := coder.NewCustomCoder("json", t, jsonEnc, jsonDec) |
| if err != nil { |
| return nil, errors.Wrapf(err, "invalid coder") |
| } |
| return c, nil |
| } |
| |
| // These maps and mutexes are actuated per element, which can be expensive. |
| var ( |
| encMu sync.Mutex |
| schemaEncs = map[reflect.Type]func(interface{}, io.Writer) error{} |
| |
| decMu sync.Mutex |
| schemaDecs = map[reflect.Type]func(io.Reader) (interface{}, error){} |
| ) |
| |
| // schemaEnc encodes the supplied value as beam schema. |
| func schemaEnc(t reflect.Type, in T) ([]byte, error) { |
| switch t.Kind() { |
| case reflect.Slice, reflect.Array: |
| t = t.Elem() |
| } |
| encMu.Lock() |
| enc, ok := schemaEncs[t] |
| if !ok { |
| var err error |
| enc, err = coder.RowEncoderForStruct(t) |
| if err != nil { |
| encMu.Unlock() |
| return nil, err |
| } |
| schemaEncs[t] = enc |
| } |
| encMu.Unlock() |
| var buf bytes.Buffer |
| if err := enc(in, &buf); err != nil { |
| return nil, err |
| } |
| return buf.Bytes(), nil |
| } |
| |
| // schemaDec decodes the supplied beam schema into an instance of the supplied type. |
| func schemaDec(t reflect.Type, in []byte) (T, error) { |
| switch t.Kind() { |
| case reflect.Slice, reflect.Array: |
| t = t.Elem() |
| } |
| decMu.Lock() |
| dec, ok := schemaDecs[t] |
| if !ok { |
| var err error |
| dec, err = coder.RowDecoderForStruct(t) |
| if err != nil { |
| decMu.Unlock() |
| return nil, err |
| } |
| schemaDecs[t] = dec |
| } |
| decMu.Unlock() |
| buf := bytes.NewBuffer(in) |
| val, err := dec(buf) |
| if err != nil { |
| return nil, err |
| } |
| return val, nil |
| } |
| |
| func newSchemaCoder(t reflect.Type) (*coder.CustomCoder, error) { |
| c, err := coder.NewCustomCoder("schema", t, schemaEnc, schemaDec) |
| if err != nil { |
| return nil, errors.Wrapf(err, "invalid coder") |
| } |
| return c, nil |
| } |