blob: bce12b79a24ad3e35ad6714074c2894490e8c2f3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package graphx
import (
"encoding/json"
"fmt"
"reflect"
"time"
"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)
var genFnType = reflect.TypeOf((*func(string, reflect.Type, []byte) reflectx.Func)(nil)).Elem()
// EncodeMultiEdge converts the preprocessed representation into the wire
// representation of the multiedge, capturing input and output type information.
func EncodeMultiEdge(edge *graph.MultiEdge) (*v1.MultiEdge, error) {
ret := &v1.MultiEdge{}
ret.Opcode = string(edge.Op)
if edge.DoFn != nil {
ref, err := encodeFn((*graph.Fn)(edge.DoFn))
if err != nil {
wrapped := errors.Wrap(err, "bad userfn")
return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge)
}
ret.Fn = ref
}
if edge.CombineFn != nil {
ref, err := encodeFn((*graph.Fn)(edge.CombineFn))
if err != nil {
wrapped := errors.Wrap(err, "bad combinefn")
return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge)
}
ret.Fn = ref
}
if edge.WindowFn != nil {
ret.WindowFn = encodeWindowFn(edge.WindowFn)
}
for _, in := range edge.Input {
kind := encodeInputKind(in.Kind)
t, err := encodeFullType(in.Type)
if err != nil {
wrapped := errors.Wrap(err, "bad input type")
return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge)
}
ret.Inbound = append(ret.Inbound, &v1.MultiEdge_Inbound{Kind: kind, Type: t})
}
for _, out := range edge.Output {
t, err := encodeFullType(out.Type)
if err != nil {
wrapped := errors.Wrap(err, "bad output type")
return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge)
}
ret.Outbound = append(ret.Outbound, &v1.MultiEdge_Outbound{Type: t})
}
return ret, nil
}
// DecodeMultiEdge converts the wire representation into the preprocessed
// components representing that edge. We deserialize to components to avoid
// inserting the edge into a graph or creating a detached edge.
func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, []*graph.Inbound, []*graph.Outbound, error) {
var u *graph.Fn
var wfn *window.Fn
var inbound []*graph.Inbound
var outbound []*graph.Outbound
opcode := graph.Opcode(edge.Opcode)
if edge.Fn != nil {
var err error
u, err = decodeFn(edge.Fn)
if err != nil {
wrapped := errors.Wrap(err, "bad function")
return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge)
}
}
if edge.WindowFn != nil {
wfn = decodeWindowFn(edge.WindowFn)
}
for _, in := range edge.Inbound {
kind, err := decodeInputKind(in.Kind)
if err != nil {
wrapped := errors.Wrap(err, "bad input kind")
return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge)
}
t, err := decodeFullType(in.Type)
if err != nil {
wrapped := errors.Wrap(err, "bad input type")
return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge)
}
inbound = append(inbound, &graph.Inbound{Kind: kind, Type: t})
}
for _, out := range edge.Outbound {
t, err := decodeFullType(out.Type)
if err != nil {
wrapped := errors.Wrap(err, "bad output type")
return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge)
}
outbound = append(outbound, &graph.Outbound{Type: t})
}
return opcode, u, wfn, inbound, outbound, nil
}
func encodeCustomCoder(c *coder.CustomCoder) (*v1.CustomCoder, error) {
t, err := encodeType(c.Type)
if err != nil {
return nil, errors.WithContextf(err, "encoding custom coder %v for type %v", c, c.Type)
}
enc, err := encodeUserFn(c.Enc)
if err != nil {
wrapped := errors.Wrap(err, "bad encoding function")
return nil, errors.WithContextf(wrapped, "encoding custom coder %v", c)
}
dec, err := encodeUserFn(c.Dec)
if err != nil {
wrapped := errors.Wrap(err, "bad decoding function")
return nil, errors.WithContextf(wrapped, "encoding custom coder %v", c)
}
ret := &v1.CustomCoder{
Name: c.Name,
Type: t,
Enc: enc,
Dec: dec,
}
return ret, nil
}
func decodeCustomCoder(c *v1.CustomCoder) (*coder.CustomCoder, error) {
t, err := decodeType(c.Type)
if err != nil {
return nil, errors.WithContextf(err, "decoding custom coder %v for type %v", c, c.Type)
}
enc, err := decodeUserFn(c.Enc)
if err != nil {
wrapped := errors.Wrap(err, "bad encoding function")
return nil, errors.WithContextf(wrapped, "decoding custom coder %v", c)
}
dec, err := decodeUserFn(c.Dec)
if err != nil {
wrapped := errors.Wrap(err, "bad decoding function")
return nil, errors.WithContextf(wrapped, "decoding custom coder %v", c)
}
ret, err := coder.NewCustomCoder(c.Name, t, enc, dec)
if err != nil {
return nil, errors.WithContextf(err, "decoding custom coder %v", c)
}
return ret, nil
}
func encodeWindowFn(w *window.Fn) *v1.WindowFn {
return &v1.WindowFn{
Kind: string(w.Kind),
SizeMs: duration2ms(w.Size),
PeriodMs: duration2ms(w.Period),
GapMs: duration2ms(w.Gap),
}
}
func decodeWindowFn(w *v1.WindowFn) *window.Fn {
return &window.Fn{
Kind: window.Kind(w.Kind),
Size: ms2duration(w.SizeMs),
Period: ms2duration(w.PeriodMs),
Gap: ms2duration(w.GapMs),
}
}
func duration2ms(d time.Duration) int64 {
return d.Nanoseconds() / 1e6
}
func ms2duration(d int64) time.Duration {
return time.Duration(d) * time.Millisecond
}
func encodeFn(u *graph.Fn) (*v1.Fn, error) {
switch {
case u.DynFn != nil:
gen := reflectx.FunctionName(u.DynFn.Gen)
t, err := encodeType(u.DynFn.T)
if err != nil {
wrapped := errors.Wrap(err, "bad function type")
return nil, errors.WithContextf(wrapped, "encoding dynamic DoFn %v", u)
}
return &v1.Fn{Dynfn: &v1.DynFn{
Name: u.DynFn.Name,
Type: t,
Data: u.DynFn.Data,
Gen: gen,
}}, nil
case u.Fn != nil:
fn, err := encodeUserFn(u.Fn)
if err != nil {
wrapped := errors.Wrap(err, "bad userfn")
return nil, errors.WithContextf(wrapped, "encoding DoFn %v", u)
}
return &v1.Fn{Fn: fn}, nil
case u.Recv != nil:
t := reflect.TypeOf(u.Recv)
k, ok := runtime.TypeKey(reflectx.SkipPtr(t))
if !ok {
err := errors.Errorf("failed to create TypeKey for receiver type %T", u.Recv)
return nil, errors.WithContextf(err, "encoding structural DoFn %v", u)
}
if _, ok := runtime.LookupType(k); !ok {
err := errors.Errorf("receiver type %v must be registered", t)
return nil, errors.WithContextf(err, "encoding structural DoFn %v", u)
}
typ, err := encodeType(t)
if err != nil {
wrapped := errors.Wrapf(err, "failed to encode receiver type %T", u.Recv)
panic(errors.WithContextf(wrapped, "encoding structural DoFn %v", u))
}
data, err := json.Marshal(u.Recv)
if err != nil {
wrapped := errors.Wrapf(err, "failed to marshal receiver %v", u.Recv)
return nil, errors.WithContextf(wrapped, "encoding structural DoFn %v", u)
}
return &v1.Fn{Type: typ, Opt: string(data)}, nil
default:
panic(fmt.Sprintf("Failed to encode DoFn %v, missing fn", u))
}
}
func decodeFn(u *v1.Fn) (*graph.Fn, error) {
if u.Dynfn != nil {
gen, err := runtime.ResolveFunction(u.Dynfn.Gen, genFnType)
if err != nil {
wrapped := errors.Wrapf(err, "bad symbol %v", u.Dynfn.Gen)
return nil, errors.WithContextf(wrapped, "decoding dynamic DoFn %v", u)
}
t, err := decodeType(u.Dynfn.Type)
if err != nil {
wrapped := errors.Wrap(err, "bad type")
return nil, errors.WithContextf(wrapped, "failed to decode dynamic DoFn %v", u)
}
return graph.NewFn(&graph.DynFn{
Name: u.Dynfn.Name,
T: t,
Data: u.Dynfn.Data,
Gen: gen.(func(string, reflect.Type, []byte) reflectx.Func),
})
}
if u.Fn != nil {
fn, err := decodeUserFn(u.Fn)
if err != nil {
wrapped := errors.Wrap(err, "failed to decode userfn")
return nil, errors.WithContextf(wrapped, "decoding DoFn %v", u)
}
fx, err := funcx.New(reflectx.MakeFunc(fn))
if err != nil {
wrapped := errors.Wrap(err, "failed to construct userfn")
return nil, errors.WithContextf(wrapped, "decoding DoFn %v", u)
}
return &graph.Fn{Fn: fx}, nil
}
t, err := decodeType(u.Type)
if err != nil {
wrapped := errors.Wrap(err, "bad type")
return nil, errors.WithContextf(wrapped, "decoding structural DoFn %v", u)
}
fn, err := reflectx.UnmarshalJSON(t, u.Opt)
if err != nil {
wrapped := errors.Wrap(err, "bad struct encoding")
return nil, errors.WithContextf(wrapped, "decoding structural DoFn %v", u)
}
return graph.NewFn(fn)
}
// encodeUserFn translates the preprocessed representation of a Beam user function
// into the wire representation, capturing all the inputs and outputs needed.
func encodeUserFn(u *funcx.Fn) (*v1.UserFn, error) {
// TODO(herohde) 5/23/2017: reject closures and dynamic functions. They can't
// be serialized.
symbol := u.Fn.Name()
t, err := encodeType(u.Fn.Type())
if err != nil {
wrapped := errors.Wrap(err, "bad function type")
return nil, errors.WithContextf(wrapped, "encoding userfn %v", u)
}
return &v1.UserFn{Name: symbol, Type: t}, nil
}
// decodeUserFn receives the wire representation of a Beam user function,
// extracting the preprocessed representation, expanding all inputs and outputs
// of the function.
func decodeUserFn(ref *v1.UserFn) (interface{}, error) {
t, err := decodeType(ref.GetType())
if err != nil {
return nil, err
}
return runtime.ResolveFunction(ref.Name, t)
}
func encodeFullType(t typex.FullType) (*v1.FullType, error) {
var components []*v1.FullType
if t.Class() == typex.Composite {
// Drop the Aggregate convenience component.
for _, comp := range t.Components() {
c, err := encodeFullType(comp)
if err != nil {
return nil, err
}
components = append(components, c)
}
}
prim, err := encodeType(t.Type())
if err != nil {
wrapped := errors.Wrap(err, "bad type")
return nil, errors.WithContextf(wrapped, "encoding full type %v", t)
}
return &v1.FullType{Type: prim, Components: components}, nil
}
func decodeFullType(t *v1.FullType) (typex.FullType, error) {
var components []typex.FullType
for _, comp := range t.Components {
c, err := decodeFullType(comp)
if err != nil {
return nil, err
}
components = append(components, c)
}
prim, err := decodeType(t.Type)
if err != nil {
wrapped := errors.Wrap(err, "bad type")
return nil, errors.WithContextf(wrapped, "decoding full type %v", t)
}
return typex.New(prim, components...), nil
}
func encodeType(t reflect.Type) (*v1.Type, error) {
if s, ok := tryEncodeSpecial(t); ok {
return &v1.Type{Kind: v1.Type_SPECIAL, Special: s}, nil
}
if k, ok := runtime.TypeKey(t); ok {
if _, present := runtime.LookupType(k); present {
// External type. Serialize by key and lookup in registry
// on decoding side.
return &v1.Type{Kind: v1.Type_EXTERNAL, ExternalKey: k}, nil
}
}
// The supplied type isn't special, so apply the standard encodings.
switch t.Kind() {
case reflect.Bool:
return &v1.Type{Kind: v1.Type_BOOL}, nil
case reflect.Int:
return &v1.Type{Kind: v1.Type_INT}, nil
case reflect.Int8:
return &v1.Type{Kind: v1.Type_INT8}, nil
case reflect.Int16:
return &v1.Type{Kind: v1.Type_INT16}, nil
case reflect.Int32:
return &v1.Type{Kind: v1.Type_INT32}, nil
case reflect.Int64:
return &v1.Type{Kind: v1.Type_INT64}, nil
case reflect.Uint:
return &v1.Type{Kind: v1.Type_UINT}, nil
case reflect.Uint8:
return &v1.Type{Kind: v1.Type_UINT8}, nil
case reflect.Uint16:
return &v1.Type{Kind: v1.Type_UINT16}, nil
case reflect.Uint32:
return &v1.Type{Kind: v1.Type_UINT32}, nil
case reflect.Uint64:
return &v1.Type{Kind: v1.Type_UINT64}, nil
case reflect.Float32:
return &v1.Type{Kind: v1.Type_FLOAT32}, nil
case reflect.Float64:
return &v1.Type{Kind: v1.Type_FLOAT64}, nil
case reflect.String:
return &v1.Type{Kind: v1.Type_STRING}, nil
case reflect.Slice:
elm, err := encodeType(t.Elem())
if err != nil {
wrapped := errors.Wrap(err, "bad element type")
return nil, errors.WithContextf(wrapped, "encoding slice %v", t)
}
return &v1.Type{Kind: v1.Type_SLICE, Element: elm}, nil
case reflect.Struct:
var fields []*v1.Type_StructField
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
fType, err := encodeType(f.Type)
if err != nil {
wrapped := errors.Wrap(err, "bad field type")
return nil, errors.WithContextf(wrapped, "encoding struct %v", t)
}
field := &v1.Type_StructField{
Name: f.Name,
PkgPath: f.PkgPath,
Type: fType,
Tag: string(f.Tag),
Offset: int64(f.Offset),
Index: encodeInts(f.Index),
Anonymous: f.Anonymous,
}
fields = append(fields, field)
}
return &v1.Type{Kind: v1.Type_STRUCT, Fields: fields}, nil
case reflect.Func:
var in []*v1.Type
for i := 0; i < t.NumIn(); i++ {
param, err := encodeType(t.In(i))
if err != nil {
wrapped := errors.Wrap(err, "bad parameter type")
return nil, errors.WithContextf(wrapped, "encoding function %v", t)
}
in = append(in, param)
}
var out []*v1.Type
for i := 0; i < t.NumOut(); i++ {
ret, err := encodeType(t.Out(i))
if err != nil {
wrapped := errors.Wrap(err, "bad return type")
return nil, errors.WithContextf(wrapped, "encoding function %v", t)
}
out = append(out, ret)
}
return &v1.Type{Kind: v1.Type_FUNC, ParameterTypes: in, ReturnTypes: out, IsVariadic: t.IsVariadic()}, nil
case reflect.Chan:
elm, err := encodeType(t.Elem())
if err != nil {
wrapped := errors.Wrap(err, "bad element type")
return nil, errors.WithContextf(wrapped, "encoding channel %v", t)
}
dir := encodeChanDir(t.ChanDir())
return &v1.Type{Kind: v1.Type_CHAN, Element: elm, ChanDir: dir}, nil
case reflect.Ptr:
elm, err := encodeType(t.Elem())
if err != nil {
wrapped := errors.Wrap(err, "bad base type")
return nil, errors.WithContextf(wrapped, "encoding pointer %v", t)
}
return &v1.Type{Kind: v1.Type_PTR, Element: elm}, nil
default:
return nil, errors.Errorf("unencodable type %v", t)
}
}
func tryEncodeSpecial(t reflect.Type) (v1.Type_Special, bool) {
switch t {
case reflectx.Error:
return v1.Type_ERROR, true
case reflectx.Context:
return v1.Type_CONTEXT, true
case reflectx.Type:
return v1.Type_TYPE, true
case typex.EventTimeType:
return v1.Type_EVENTTIME, true
case typex.WindowType:
return v1.Type_WINDOW, true
case typex.KVType:
return v1.Type_KV, true
case typex.CoGBKType:
return v1.Type_COGBK, true
case typex.WindowedValueType:
return v1.Type_WINDOWEDVALUE, true
case typex.TType:
return v1.Type_T, true
case typex.UType:
return v1.Type_U, true
case typex.VType:
return v1.Type_V, true
case typex.WType:
return v1.Type_W, true
case typex.XType:
return v1.Type_X, true
case typex.YType:
return v1.Type_Y, true
case typex.ZType:
return v1.Type_Z, true
default:
return v1.Type_ILLEGAL, false
}
}
func decodeType(t *v1.Type) (reflect.Type, error) {
if t == nil {
err := errors.New("empty type")
return nil, errors.WithContextf(err, "decoding type %v", t)
}
switch t.Kind {
case v1.Type_BOOL:
return reflectx.Bool, nil
case v1.Type_INT:
return reflectx.Int, nil
case v1.Type_INT8:
return reflectx.Int8, nil
case v1.Type_INT16:
return reflectx.Int16, nil
case v1.Type_INT32:
return reflectx.Int32, nil
case v1.Type_INT64:
return reflectx.Int64, nil
case v1.Type_UINT:
return reflectx.Uint, nil
case v1.Type_UINT8:
return reflectx.Uint8, nil
case v1.Type_UINT16:
return reflectx.Uint16, nil
case v1.Type_UINT32:
return reflectx.Uint32, nil
case v1.Type_UINT64:
return reflectx.Uint64, nil
case v1.Type_FLOAT32:
return reflectx.Float32, nil
case v1.Type_FLOAT64:
return reflectx.Float64, nil
case v1.Type_STRING:
return reflectx.String, nil
case v1.Type_SLICE:
elm, err := decodeType(t.GetElement())
if err != nil {
wrapped := errors.Wrap(err, "bad element")
return nil, errors.WithContextf(wrapped, "failed to decode type %v, bad element", t)
}
return reflect.SliceOf(elm), nil
case v1.Type_STRUCT:
var fields []reflect.StructField
for _, f := range t.Fields {
fType, err := decodeType(f.Type)
if err != nil {
wrapped := errors.Wrap(err, "bad field type")
return nil, errors.WithContextf(wrapped, "failed to decode type %v, bad field type", t)
}
field := reflect.StructField{
Name: f.GetName(),
PkgPath: f.GetPkgPath(),
Type: fType,
Tag: reflect.StructTag(f.GetTag()),
Offset: uintptr(f.GetOffset()),
Index: decodeInts(f.GetIndex()),
Anonymous: f.GetAnonymous(),
}
fields = append(fields, field)
}
return reflect.StructOf(fields), nil
case v1.Type_FUNC:
in, err := decodeTypes(t.GetParameterTypes())
if err != nil {
wrapped := errors.Wrap(err, "bad parameter type")
return nil, errors.WithContextf(wrapped, "decoding type %v", t)
}
out, err := decodeTypes(t.GetReturnTypes())
if err != nil {
wrapped := errors.Wrap(err, "bad return type")
return nil, errors.WithContextf(wrapped, "decoding type %v", t)
}
return reflect.FuncOf(in, out, t.GetIsVariadic()), nil
case v1.Type_CHAN:
elm, err := decodeType(t.GetElement())
if err != nil {
wrapped := errors.Wrap(err, "bad element")
return nil, errors.WithContextf(wrapped, "decoding type %v", t)
}
dir, err := decodeChanDir(t.GetChanDir())
if err != nil {
wrapped := errors.Wrap(err, "bad channel direction")
return nil, errors.WithContextf(wrapped, "decoding type %v", t)
}
return reflect.ChanOf(dir, elm), nil
case v1.Type_PTR:
elm, err := decodeType(t.GetElement())
if err != nil {
wrapped := errors.Wrap(err, "bad element")
return nil, errors.WithContextf(wrapped, "decoding type %v", t)
}
return reflect.PtrTo(elm), nil
case v1.Type_SPECIAL:
ret, err := decodeSpecial(t.Special)
if err != nil {
wrapped := errors.Wrap(err, "bad element")
return nil, errors.WithContextf(wrapped, "decoding type %v", t)
}
return ret, nil
case v1.Type_EXTERNAL:
ret, ok := runtime.LookupType(t.ExternalKey)
if !ok {
err := errors.Errorf("external key not found %v", t.ExternalKey)
return nil, errors.WithContextf(err, "decoding type %v", t)
}
return ret, nil
default:
err := errors.Errorf("unexpected type kind %v", t.Kind)
return nil, errors.WithContextf(err, "failed to decode type %v", t)
}
}
func decodeSpecial(s v1.Type_Special) (reflect.Type, error) {
switch s {
case v1.Type_ERROR:
return reflectx.Error, nil
case v1.Type_CONTEXT:
return reflectx.Context, nil
case v1.Type_TYPE:
return reflectx.Type, nil
case v1.Type_EVENTTIME:
return typex.EventTimeType, nil
case v1.Type_WINDOW:
return typex.WindowType, nil
case v1.Type_KV:
return typex.KVType, nil
case v1.Type_COGBK:
return typex.CoGBKType, nil
case v1.Type_WINDOWEDVALUE:
return typex.WindowedValueType, nil
case v1.Type_T:
return typex.TType, nil
case v1.Type_U:
return typex.UType, nil
case v1.Type_V:
return typex.VType, nil
case v1.Type_W:
return typex.WType, nil
case v1.Type_X:
return typex.XType, nil
case v1.Type_Y:
return typex.YType, nil
case v1.Type_Z:
return typex.ZType, nil
default:
return nil, errors.Errorf("failed to decode special type, unknown type %v", s)
}
}
func decodeTypes(list []*v1.Type) ([]reflect.Type, error) {
var ret []reflect.Type
for _, elm := range list {
t, err := decodeType(elm)
if err != nil {
return nil, err
}
ret = append(ret, t)
}
return ret, nil
}
func encodeInts(offsets []int) []int32 {
var ret []int32
for _, elm := range offsets {
ret = append(ret, int32(elm))
}
return ret
}
func decodeInts(offsets []int32) []int {
var ret []int
for _, elm := range offsets {
ret = append(ret, int(elm))
}
return ret
}
func encodeChanDir(dir reflect.ChanDir) v1.Type_ChanDir {
switch dir {
case reflect.RecvDir:
return v1.Type_RECV
case reflect.SendDir:
return v1.Type_SEND
case reflect.BothDir:
return v1.Type_BOTH
default:
panic(fmt.Sprintf("Failed to encode channel direction, invalid value: %v", dir))
}
}
func decodeChanDir(dir v1.Type_ChanDir) (reflect.ChanDir, error) {
switch dir {
case v1.Type_RECV:
return reflect.RecvDir, nil
case v1.Type_SEND:
return reflect.SendDir, nil
case v1.Type_BOTH:
return reflect.BothDir, nil
default:
err := errors.Errorf("invalid value: %v", dir)
return reflect.BothDir, errors.WithContext(err, "decoding channel direction")
}
}
func encodeInputKind(k graph.InputKind) v1.MultiEdge_Inbound_InputKind {
switch k {
case graph.Main:
return v1.MultiEdge_Inbound_MAIN
case graph.Singleton:
return v1.MultiEdge_Inbound_SINGLETON
case graph.Slice:
return v1.MultiEdge_Inbound_SLICE
case graph.Map:
return v1.MultiEdge_Inbound_MAP
case graph.MultiMap:
return v1.MultiEdge_Inbound_MULTIMAP
case graph.Iter:
return v1.MultiEdge_Inbound_ITER
case graph.ReIter:
return v1.MultiEdge_Inbound_REITER
default:
panic(fmt.Sprintf("Failed to encode input kind, invalid value: %v", k))
}
}
func decodeInputKind(k v1.MultiEdge_Inbound_InputKind) (graph.InputKind, error) {
switch k {
case v1.MultiEdge_Inbound_MAIN:
return graph.Main, nil
case v1.MultiEdge_Inbound_SINGLETON:
return graph.Singleton, nil
case v1.MultiEdge_Inbound_SLICE:
return graph.Slice, nil
case v1.MultiEdge_Inbound_MAP:
return graph.Map, nil
case v1.MultiEdge_Inbound_MULTIMAP:
return graph.MultiMap, nil
case v1.MultiEdge_Inbound_ITER:
return graph.Iter, nil
case v1.MultiEdge_Inbound_REITER:
return graph.ReIter, nil
default:
err := errors.Errorf("invalid value: %v", k)
return graph.Main, errors.WithContext(err, "decoding input kind")
}
}