blob: dbca4364c822f46d883a5f8c38f8391827fe5f51 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package beam
import (
"encoding/json"
"fmt"
"io"
"reflect"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/coderx"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
"github.com/golang/protobuf/proto"
)
type jsonCoder interface {
json.Marshaler
json.Unmarshaler
}
var protoMessageType = reflect.TypeOf((*proto.Message)(nil)).Elem()
var jsonCoderType = reflect.TypeOf((*jsonCoder)(nil)).Elem()
func init() {
coder.RegisterCoder(protoMessageType, protoEnc, protoDec)
}
// Coder defines how to encode and decode values of type 'A' into byte streams.
// Coders are attached to PCollections of the same type. For PCollections
// consumed by GBK, the attached coders are required to be deterministic.
type Coder struct {
coder *coder.Coder
}
// IsValid returns true iff the Coder is valid. Any use of an invalid Coder
// will result in a panic.
func (c Coder) IsValid() bool {
return c.coder != nil
}
// Type returns the full type 'A' of elements the coder can encode and decode.
// 'A' must be a concrete full type, such as int or KV<int,string>.
func (c Coder) Type() FullType {
if !c.IsValid() {
panic("Invalid Coder")
}
return c.coder.T
}
func (c Coder) String() string {
if c.coder == nil {
return "$"
}
return c.coder.String()
}
// NewElementEncoder returns a new encoding function for the given type.
func NewElementEncoder(t reflect.Type) ElementEncoder {
c, err := inferCoder(typex.New(t))
if err != nil {
panic(err)
}
return &execEncoder{enc: exec.MakeElementEncoder(c)}
}
// execEncoder wraps an exec.ElementEncoder to implement the ElementDecoder interface
// in this package.
type execEncoder struct {
enc exec.ElementEncoder
coder *coder.Coder
}
func (e *execEncoder) Encode(element interface{}, w io.Writer) error {
return e.enc.Encode(&exec.FullValue{Elm: element}, w)
}
func (e *execEncoder) String() string {
return e.coder.String()
}
// NewElementDecoder returns an ElementDecoder the given type.
func NewElementDecoder(t reflect.Type) ElementDecoder {
c, err := inferCoder(typex.New(t))
if err != nil {
panic(err)
}
return &execDecoder{dec: exec.MakeElementDecoder(c)}
}
// execDecoder wraps an exec.ElementDecoder to implement the ElementDecoder interface
// in this package.
type execDecoder struct {
dec exec.ElementDecoder
coder *coder.Coder
}
func (d *execDecoder) Decode(r io.Reader) (interface{}, error) {
fv, err := d.dec.Decode(r)
if err != nil {
return nil, err
}
return fv.Elm, nil
}
func (d *execDecoder) String() string {
return d.coder.String()
}
// NewCoder infers a Coder for any bound full type.
func NewCoder(t FullType) Coder {
c, err := inferCoder(t)
if err != nil {
panic(err) // for now
}
return Coder{c}
}
func inferCoder(t FullType) (*coder.Coder, error) {
switch t.Class() {
case typex.Concrete, typex.Container:
switch t.Type() {
case reflectx.Int64:
// use the beam varint coder.
return &coder.Coder{Kind: coder.VarInt, T: t}, nil
case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32:
c, err := coderx.NewVarIntZ(t.Type())
if err != nil {
return nil, err
}
return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
case reflectx.Uint, reflectx.Uint8, reflectx.Uint16, reflectx.Uint32, reflectx.Uint64:
c, err := coderx.NewVarUintZ(t.Type())
if err != nil {
return nil, err
}
return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
case reflectx.Float32:
c, err := coderx.NewFloat(t.Type())
if err != nil {
return nil, err
}
return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
case reflectx.Float64:
return &coder.Coder{Kind: coder.Double, T: t}, nil
case reflectx.String:
c, err := coderx.NewString()
if err != nil {
return nil, err
}
return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
case reflectx.ByteSlice:
return &coder.Coder{Kind: coder.Bytes, T: t}, nil
case reflectx.Bool:
return &coder.Coder{Kind: coder.Bool, T: t}, nil
default:
et := t.Type()
if c := coder.LookupCustomCoder(et); c != nil {
return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
}
// Interface types that implement JSON marshalling can be handled by the default coder.
// otherwise, inference needs to fail here.
if et.Kind() == reflect.Interface && !et.Implements(jsonCoderType) {
return nil, errors.Errorf("inferCoder failed: interface type %v has no coder registered", et)
}
c, err := newJSONCoder(et)
if err != nil {
return nil, err
}
return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
}
case typex.Composite:
c, err := inferCoders(t.Components())
if err != nil {
return nil, err
}
switch t.Type() {
case typex.KVType:
return &coder.Coder{Kind: coder.KV, T: t, Components: c}, nil
case typex.CoGBKType:
return &coder.Coder{Kind: coder.CoGBK, T: t, Components: c}, nil
case typex.WindowedValueType:
// TODO(herohde) 4/15/2018: do we ever infer W types now that PCollections
// are non-windowed? We either need to know the windowing strategy or
// we should remove this case.
return &coder.Coder{Kind: coder.WindowedValue, T: t, Components: c, Window: coder.NewGlobalWindow()}, nil
default:
panic(fmt.Sprintf("Unexpected composite type: %v", t))
}
default:
panic(fmt.Sprintf("Unexpected type: %v", t))
}
}
func inferCoders(list []FullType) ([]*coder.Coder, error) {
var ret []*coder.Coder
for _, t := range list {
c, err := inferCoder(t)
if err != nil {
return nil, err
}
ret = append(ret, c)
}
return ret, nil
}
// protoEnc marshals the supplied proto.Message.
func protoEnc(in T) ([]byte, error) {
buf := proto.NewBuffer(nil)
buf.SetDeterministic(true)
if err := buf.Marshal(in.(proto.Message)); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// protoDec unmarshals the supplied bytes into an instance of the supplied
// proto.Message type.
func protoDec(t reflect.Type, in []byte) (T, error) {
val := reflect.New(t.Elem()).Interface().(proto.Message)
if err := proto.Unmarshal(in, val); err != nil {
return nil, err
}
return val, nil
}
// Concrete and universal custom coders both have a similar signature.
// Conversion is handled by reflection.
// jsonEnc encodes the supplied value in JSON.
func jsonEnc(in T) ([]byte, error) {
return json.Marshal(in)
}
// jsonDec decodes the supplied JSON into an instance of the supplied type.
func jsonDec(t reflect.Type, in []byte) (T, error) {
val := reflect.New(t)
if err := json.Unmarshal(in, val.Interface()); err != nil {
return nil, err
}
return val.Elem().Interface(), nil
}
func newJSONCoder(t reflect.Type) (*coder.CustomCoder, error) {
c, err := coder.NewCustomCoder("json", t, jsonEnc, jsonDec)
if err != nil {
return nil, errors.Wrapf(err, "invalid coder")
}
return c, nil
}