blob: 7d98f7912da3e362c6b22b53b6d2747a97067574 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package beam
import (
"bytes"
"encoding/json"
"fmt"
"io"
"reflect"
"sync"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/coderx"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/jsonx"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
protov1 "github.com/golang/protobuf/proto"
protov2 "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
)
// EnableSchemas is a temporary configuration variable that switches the
// default encoding from JSON to Beam Schema encoding.
// Before it is removed, it will be set to true by default
// and then eventually removed.
//
// Only users who rely on default JSON marshalling behaviour should set
// this explicitly.
var EnableSchemas = false
// jsonCoder is the method set a type must provide to round-trip through
// JSON on its own: it can both decode itself from and encode itself to JSON.
type jsonCoder interface {
	json.Unmarshaler
	json.Marshaler
}
// Reflected interface types used when registering and inferring coders.
var (
	// protoMessageType is the v1 (github.com/golang/protobuf) message interface.
	protoMessageType = reflect.TypeOf((*protov1.Message)(nil)).Elem()
	// protoReflectMessageType is the protoreflect.ProtoMessage interface.
	protoReflectMessageType = reflect.TypeOf((*protoreflect.ProtoMessage)(nil)).Elem()
	// jsonCoderType is the jsonCoder interface declared in this file.
	jsonCoderType = reflect.TypeOf((*jsonCoder)(nil)).Elem()
)
func init() {
coder.RegisterCoder(protoMessageType, protoEnc, protoDec)
coder.RegisterCoder(protoReflectMessageType, protoEnc, protoDec)
}
// Coder defines how to encode and decode values of type 'A' into byte streams.
// Coders are attached to PCollections of the same type. For PCollections
// consumed by GBK, the attached coders are required to be deterministic.
//
// The zero value wraps no underlying coder and reports IsValid() == false.
type Coder struct {
// coder is the wrapped core coder; nil for an invalid Coder.
coder *coder.Coder
}
// IsValid reports whether the Coder wraps an underlying coder. Any use of
// an invalid Coder will result in a panic.
func (c Coder) IsValid() bool {
	valid := c.coder != nil
	return valid
}
// Type returns the full type 'A' of elements the coder can encode and decode.
// 'A' must be a concrete full type, such as int or KV<int,string>.
// It panics when called on an invalid Coder.
func (c Coder) Type() FullType {
	if c.IsValid() {
		return c.coder.T
	}
	panic("Invalid Coder")
}
// String returns the string form of the wrapped coder, or "$" when the
// Coder wraps nothing.
func (c Coder) String() string {
	if c.coder != nil {
		return c.coder.String()
	}
	return "$"
}
// NewElementEncoder returns a new ElementEncoder for the given type.
//
// The coder is inferred from t via inferCoder; if inference fails the
// function panics with the inference error.
func NewElementEncoder(t reflect.Type) ElementEncoder {
	c, err := inferCoder(typex.New(t))
	if err != nil {
		panic(err)
	}
	// Keep the inferred coder alongside the encoder so that String() on the
	// result describes the coder actually in use instead of a nil one.
	return &execEncoder{enc: exec.MakeElementEncoder(c), coder: c}
}
// execEncoder wraps an exec.ElementEncoder to implement the ElementEncoder interface
// in this package.
type execEncoder struct {
// enc performs the actual encoding; Encode delegates to it.
enc exec.ElementEncoder
// coder is the coder whose string form String reports.
coder *coder.Coder
}
// Encode wraps element in an exec.FullValue and writes its encoding to w
// using the underlying exec encoder.
func (e *execEncoder) Encode(element interface{}, w io.Writer) error {
	wrapped := exec.FullValue{Elm: element}
	return e.enc.Encode(&wrapped, w)
}
// String returns the string form of the coder held by this encoder.
func (e *execEncoder) String() string {
	held := e.coder
	return held.String()
}
// NewElementDecoder returns a new ElementDecoder for the given type.
//
// The coder is inferred from t via inferCoder; if inference fails the
// function panics with the inference error.
func NewElementDecoder(t reflect.Type) ElementDecoder {
	c, err := inferCoder(typex.New(t))
	if err != nil {
		panic(err)
	}
	// Keep the inferred coder alongside the decoder so that String() on the
	// result describes the coder actually in use instead of a nil one.
	return &execDecoder{dec: exec.MakeElementDecoder(c), coder: c}
}
// execDecoder wraps an exec.ElementDecoder to implement the ElementDecoder interface
// in this package.
type execDecoder struct {
// dec performs the actual decoding; Decode delegates to it.
dec exec.ElementDecoder
// coder is the coder whose string form String reports.
coder *coder.Coder
}
// Decode reads one value from r with the underlying exec decoder and
// returns the decoded element (the Elm of the decoded value). On failure it
// returns a nil element together with the decoder's error.
func (d *execDecoder) Decode(r io.Reader) (interface{}, error) {
	decoded, err := d.dec.Decode(r)
	if err == nil {
		return decoded.Elm, nil
	}
	return nil, err
}
// String returns the string form of the coder held by this decoder.
func (d *execDecoder) String() string {
	held := d.coder
	return held.String()
}
// NewCoder infers a Coder for any bound full type. It panics if no coder
// can be inferred for t.
func NewCoder(t FullType) Coder {
	inferred, err := inferCoder(t)
	if err != nil {
		panic(err) // for now
	}
	return Coder{coder: inferred}
}
// inferCoder selects a coder for the full type t, dispatching first on
// t.Class() and then on t.Type().
//
// Concrete and container types map to a built-in coder kind or a coderx
// custom coder where one is listed below; anything else falls back, in
// order, to a registered custom coder, a Row coder (only when EnableSchemas
// is set and the type is a struct or pointer to struct), and finally the
// JSON coder. Composite types (KV, CoGBK, WindowedValue) are built from the
// coders inferred for their components.
//
// It returns an error when a component or fallback coder cannot be built,
// and panics on a type class or composite type it does not recognise.
func inferCoder(t FullType) (*coder.Coder, error) {
switch t.Class() {
case typex.Concrete, typex.Container:
switch t.Type() {
case reflectx.Int64:
// use the beam varint coder.
return &coder.Coder{Kind: coder.VarInt, T: t}, nil
case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32:
// Remaining signed integer types use the coderx varint custom coder.
c, err := coderx.NewVarIntZ(t.Type())
if err != nil {
return nil, err
}
return coder.CoderFrom(c), nil
case reflectx.Uint, reflectx.Uint8, reflectx.Uint16, reflectx.Uint32, reflectx.Uint64:
// Unsigned integer types use the coderx unsigned varint custom coder.
c, err := coderx.NewVarUintZ(t.Type())
if err != nil {
return nil, err
}
return coder.CoderFrom(c), nil
case reflectx.Float32:
// float32 goes through the coderx float custom coder; float64 below
// uses the built-in Double kind instead.
c, err := coderx.NewFloat(t.Type())
if err != nil {
return nil, err
}
return coder.CoderFrom(c), nil
case reflectx.Float64:
return &coder.Coder{Kind: coder.Double, T: t}, nil
case reflectx.String:
return &coder.Coder{Kind: coder.String, T: t}, nil
case reflectx.ByteSlice:
return &coder.Coder{Kind: coder.Bytes, T: t}, nil
case reflectx.Bool:
return &coder.Coder{Kind: coder.Bool, T: t}, nil
default:
et := t.Type()
// A coder registered for this type takes precedence over every fallback.
if c := coder.LookupCustomCoder(et); c != nil {
return coder.CoderFrom(c), nil
}
if EnableSchemas {
switch et.Kind() {
case reflect.Ptr:
// Only pointers to structs are schema-encoded; for any other pointer
// the break leaves this switch and continues to the JSON fallback.
if et.Elem().Kind() != reflect.Struct {
break
}
fallthrough
case reflect.Struct:
return &coder.Coder{Kind: coder.Row, T: t}, nil
}
}
// Interface types that implement JSON marshalling can be handled by the default coder.
// otherwise, inference needs to fail here.
if et.Kind() == reflect.Interface && !et.Implements(jsonCoderType) {
return nil, errors.Errorf("inferCoder failed: interface type %v has no coder registered", et)
}
// Last resort: wrap the type in the JSON custom coder.
c, err := newJSONCoder(et)
if err != nil {
return nil, err
}
return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
}
case typex.Composite:
// Component coders are inferred first; any failure aborts the whole inference.
c, err := inferCoders(t.Components())
if err != nil {
return nil, err
}
switch t.Type() {
case typex.KVType:
return &coder.Coder{Kind: coder.KV, T: t, Components: c}, nil
case typex.CoGBKType:
return &coder.Coder{Kind: coder.CoGBK, T: t, Components: c}, nil
case typex.WindowedValueType:
// TODO(herohde) 4/15/2018: do we ever infer W types now that PCollections
// are non-windowed? We either need to know the windowing strategy or
// we should remove this case.
return &coder.Coder{Kind: coder.WindowedValue, T: t, Components: c, Window: coder.NewGlobalWindow()}, nil
default:
panic(fmt.Sprintf("Unexpected composite type: %v", t))
}
default:
panic(fmt.Sprintf("Unexpected type: %v", t))
}
}
// inferCoders infers a coder for every type in list, preserving order.
// The first inference failure is returned with a nil slice. An empty list
// yields a nil slice and no error.
func inferCoders(list []FullType) ([]*coder.Coder, error) {
	var coders []*coder.Coder
	for i := range list {
		inferred, err := inferCoder(list[i])
		if err != nil {
			return nil, err
		}
		coders = append(coders, inferred)
	}
	return coders, nil
}
// protoEnc marshals the supplied proto.Message using deterministic
// marshalling.
//
// in must implement either protoreflect.ProtoMessage or the v1
// proto.Message interface; v1 messages are adapted with protov1.MessageV2
// before marshalling. Any other value yields an error.
func protoEnc(in T) ([]byte, error) {
	var p protoreflect.ProtoMessage
	switch it := in.(type) {
	case protoreflect.ProtoMessage:
		p = it
	case protov1.Message:
		p = protov1.MessageV2(it)
	default:
		// Reject non-proto values explicitly instead of handing a nil
		// message to Marshal.
		return nil, errors.Errorf("protoEnc: %T does not implement proto.Message", in)
	}
	b, err := protov2.MarshalOptions{Deterministic: true}.Marshal(p)
	if err != nil {
		return nil, err
	}
	return b, nil
}
// protoDec unmarshals the supplied bytes into a newly allocated instance of
// the supplied proto.Message type and returns that instance.
//
// t must be a pointer type whose pointee implements, via its pointer,
// either protoreflect.ProtoMessage or the v1 proto.Message interface;
// otherwise an error is returned. Unmarshalling errors are returned
// unchanged with a nil value.
func protoDec(t reflect.Type, in []byte) (T, error) {
	// t.Elem() below is only meaningful for pointer types.
	if t.Kind() != reflect.Ptr {
		return nil, errors.Errorf("protoDec: type %v is not a pointer to a proto message", t)
	}
	msg := reflect.New(t.Elem()).Interface()

	var p protoreflect.ProtoMessage
	switch it := msg.(type) {
	case protoreflect.ProtoMessage:
		p = it
	case protov1.Message:
		// The v2 view is only used to drive unmarshalling.
		p = protov1.MessageV2(it)
	default:
		return nil, errors.Errorf("protoDec: type %v does not implement proto.Message", t)
	}
	if err := (protov2.UnmarshalOptions{}).Unmarshal(in, p); err != nil {
		return nil, err
	}
	// Return the value of type t rather than p: for a v1-only message p may
	// be an adapter of a different type (NOTE(review): relies on MessageV2
	// sharing state with the message it wraps — confirm against its docs).
	return msg, nil
}
// Concrete and universal custom coders both have a similar signature.
// Conversion is handled by reflection.
// jsonEnc encodes the supplied value as JSON via jsonx.Marshal.
func jsonEnc(in T) ([]byte, error) {
	encoded, err := jsonx.Marshal(in)
	return encoded, err
}
// jsonDec decodes the supplied JSON into a freshly allocated value of type
// t and returns that value (not a pointer to it). A decoding error is
// returned with a nil value.
func jsonDec(t reflect.Type, in []byte) (T, error) {
	ptr := reflect.New(t)
	err := jsonx.Unmarshal(ptr.Interface(), in)
	if err != nil {
		return nil, err
	}
	return reflect.Indirect(ptr).Interface(), nil
}
// newJSONCoder builds the custom coder named "json" for type t, backed by
// jsonEnc and jsonDec. A construction failure is wrapped as "invalid coder".
func newJSONCoder(t reflect.Type) (*coder.CustomCoder, error) {
	custom, err := coder.NewCustomCoder("json", t, jsonEnc, jsonDec)
	if err == nil {
		return custom, nil
	}
	return nil, errors.Wrapf(err, "invalid coder")
}
// Per-type caches of row encoder and decoder functions, each guarded by its
// own mutex. They are consulted for every element, which can be expensive.
var (
	// encMu guards schemaEncs.
	encMu sync.Mutex
	// schemaEncs caches the row encoder built for each type.
	schemaEncs = make(map[reflect.Type]func(interface{}, io.Writer) error)
	// decMu guards schemaDecs.
	decMu sync.Mutex
	// schemaDecs caches the row decoder built for each type.
	schemaDecs = make(map[reflect.Type]func(io.Reader) (interface{}, error))
)
// schemaEnc encodes the supplied value as beam schema.
//
// For slice and array types the element type is used to select the
// encoder. Encoders are built with coder.RowEncoderForStruct on first use
// and cached in schemaEncs under encMu; the lock is released before the
// value is encoded.
func schemaEnc(t reflect.Type, in T) ([]byte, error) {
	if k := t.Kind(); k == reflect.Slice || k == reflect.Array {
		t = t.Elem()
	}

	encMu.Lock()
	rowEnc, cached := schemaEncs[t]
	var buildErr error
	if !cached {
		// Only a successfully built encoder is stored in the cache.
		if rowEnc, buildErr = coder.RowEncoderForStruct(t); buildErr == nil {
			schemaEncs[t] = rowEnc
		}
	}
	encMu.Unlock()
	if buildErr != nil {
		return nil, buildErr
	}

	var out bytes.Buffer
	if err := rowEnc(in, &out); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}
// schemaDec decodes the supplied beam schema into an instance of the supplied type.
//
// For slice and array types the element type is used to select the
// decoder. Decoders are built with coder.RowDecoderForStruct on first use
// and cached in schemaDecs under decMu; the lock is released before the
// bytes are decoded. A decoding error is returned with a nil value.
func schemaDec(t reflect.Type, in []byte) (T, error) {
	if k := t.Kind(); k == reflect.Slice || k == reflect.Array {
		t = t.Elem()
	}

	decMu.Lock()
	rowDec, cached := schemaDecs[t]
	var buildErr error
	if !cached {
		// Only a successfully built decoder is stored in the cache.
		if rowDec, buildErr = coder.RowDecoderForStruct(t); buildErr == nil {
			schemaDecs[t] = rowDec
		}
	}
	decMu.Unlock()
	if buildErr != nil {
		return nil, buildErr
	}

	decoded, err := rowDec(bytes.NewBuffer(in))
	if err != nil {
		return nil, err
	}
	return decoded, nil
}
// newSchemaCoder builds the custom coder named "schema" for type t, backed
// by schemaEnc and schemaDec. A construction failure is wrapped as
// "invalid coder".
func newSchemaCoder(t reflect.Type) (*coder.CustomCoder, error) {
	custom, err := coder.NewCustomCoder("schema", t, schemaEnc, schemaDec)
	if err == nil {
		return custom, nil
	}
	return nil, errors.Wrapf(err, "invalid coder")
}