// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package graphx

import (
	"encoding/json"
	"fmt"
	"reflect"

	"time"

	"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
	v1 "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)

var genFnType = reflect.TypeOf((*func(string, reflect.Type, []byte) reflectx.Func)(nil)).Elem()

// EncodeMultiEdge converts the preprocessed representation into the wire
// representation of the multiedge, capturing input and output type information.
func EncodeMultiEdge(edge *graph.MultiEdge) (*v1.MultiEdge, error) {
	ret := &v1.MultiEdge{}
	ret.Opcode = string(edge.Op)

	if edge.DoFn != nil {
		ref, err := encodeFn((*graph.Fn)(edge.DoFn))
		if err != nil {
			wrapped := errors.Wrap(err, "bad userfn")
			return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge)
		}
		ret.Fn = ref
	}
	if edge.CombineFn != nil {
		ref, err := encodeFn((*graph.Fn)(edge.CombineFn))
		if err != nil {
			wrapped := errors.Wrap(err, "bad combinefn")
			return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge)
		}
		ret.Fn = ref
	}
	if edge.WindowFn != nil {
		ret.WindowFn = encodeWindowFn(edge.WindowFn)
	}

	for _, in := range edge.Input {
		kind := encodeInputKind(in.Kind)
		t, err := encodeFullType(in.Type)
		if err != nil {
			wrapped := errors.Wrap(err, "bad input type")
			return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge)
		}
		ret.Inbound = append(ret.Inbound, &v1.MultiEdge_Inbound{Kind: kind, Type: t})
	}
	for _, out := range edge.Output {
		t, err := encodeFullType(out.Type)
		if err != nil {
			wrapped := errors.Wrap(err, "bad output type")
			return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge)
		}
		ret.Outbound = append(ret.Outbound, &v1.MultiEdge_Outbound{Type: t})
	}
	return ret, nil
}

// DecodeMultiEdge converts the wire representation into the preprocessed
// components representing that edge. We deserialize to components to avoid
// inserting the edge into a graph or creating a detached edge.
func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, []*graph.Inbound, []*graph.Outbound, error) {
	var u *graph.Fn
	var wfn *window.Fn
	var inbound []*graph.Inbound
	var outbound []*graph.Outbound

	opcode := graph.Opcode(edge.Opcode)

	if edge.Fn != nil {
		var err error
		u, err = decodeFn(edge.Fn)
		if err != nil {
			wrapped := errors.Wrap(err, "bad function")
			return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge)
		}
	}
	if edge.WindowFn != nil {
		wfn = decodeWindowFn(edge.WindowFn)
	}
	for _, in := range edge.Inbound {
		kind, err := decodeInputKind(in.Kind)
		if err != nil {
			wrapped := errors.Wrap(err, "bad input kind")
			return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge)
		}
		t, err := decodeFullType(in.Type)
		if err != nil {
			wrapped := errors.Wrap(err, "bad input type")
			return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge)
		}
		inbound = append(inbound, &graph.Inbound{Kind: kind, Type: t})
	}
	for _, out := range edge.Outbound {
		t, err := decodeFullType(out.Type)
		if err != nil {
			wrapped := errors.Wrap(err, "bad output type")
			return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge)
		}
		outbound = append(outbound, &graph.Outbound{Type: t})
	}

	return opcode, u, wfn, inbound, outbound, nil
}

func encodeCustomCoder(c *coder.CustomCoder) (*v1.CustomCoder, error) {
	t, err := encodeType(c.Type)
	if err != nil {
		return nil, errors.WithContextf(err, "encoding custom coder %v for type %v", c, c.Type)
	}
	enc, err := encodeUserFn(c.Enc)
	if err != nil {
		wrapped := errors.Wrap(err, "bad encoding function")
		return nil, errors.WithContextf(wrapped, "encoding custom coder %v", c)
	}
	dec, err := encodeUserFn(c.Dec)
	if err != nil {
		wrapped := errors.Wrap(err, "bad decoding function")
		return nil, errors.WithContextf(wrapped, "encoding custom coder %v", c)
	}

	ret := &v1.CustomCoder{
		Name: c.Name,
		Type: t,
		Enc:  enc,
		Dec:  dec,
	}
	return ret, nil
}

func decodeCustomCoder(c *v1.CustomCoder) (*coder.CustomCoder, error) {
	t, err := decodeType(c.Type)
	if err != nil {
		return nil, errors.WithContextf(err, "decoding custom coder %v for type %v", c, c.Type)
	}
	enc, err := decodeUserFn(c.Enc)
	if err != nil {
		wrapped := errors.Wrap(err, "bad encoding function")
		return nil, errors.WithContextf(wrapped, "decoding custom coder %v", c)
	}
	dec, err := decodeUserFn(c.Dec)
	if err != nil {
		wrapped := errors.Wrap(err, "bad decoding function")
		return nil, errors.WithContextf(wrapped, "decoding custom coder %v", c)
	}

	ret, err := coder.NewCustomCoder(c.Name, t, enc, dec)
	if err != nil {
		return nil, errors.WithContextf(err, "decoding custom coder %v", c)
	}
	return ret, nil
}

func encodeWindowFn(w *window.Fn) *v1.WindowFn {
	return &v1.WindowFn{
		Kind:     string(w.Kind),
		SizeMs:   duration2ms(w.Size),
		PeriodMs: duration2ms(w.Period),
		GapMs:    duration2ms(w.Gap),
	}
}

func decodeWindowFn(w *v1.WindowFn) *window.Fn {
	return &window.Fn{
		Kind:   window.Kind(w.Kind),
		Size:   ms2duration(w.SizeMs),
		Period: ms2duration(w.PeriodMs),
		Gap:    ms2duration(w.GapMs),
	}
}

func duration2ms(d time.Duration) int64 {
	return d.Nanoseconds() / 1e6
}

func ms2duration(d int64) time.Duration {
	return time.Duration(d) * time.Millisecond
}

func encodeFn(u *graph.Fn) (*v1.Fn, error) {
	switch {
	case u.DynFn != nil:
		gen := reflectx.FunctionName(u.DynFn.Gen)
		t, err := encodeType(u.DynFn.T)
		if err != nil {
			wrapped := errors.Wrap(err, "bad function type")
			return nil, errors.WithContextf(wrapped, "encoding dynamic DoFn %v", u)
		}
		return &v1.Fn{Dynfn: &v1.DynFn{
			Name: u.DynFn.Name,
			Type: t,
			Data: u.DynFn.Data,
			Gen:  gen,
		}}, nil

	case u.Fn != nil:
		fn, err := encodeUserFn(u.Fn)
		if err != nil {
			wrapped := errors.Wrap(err, "bad userfn")
			return nil, errors.WithContextf(wrapped, "encoding DoFn %v", u)
		}
		return &v1.Fn{Fn: fn}, nil

	case u.Recv != nil:
		t := reflect.TypeOf(u.Recv)
		k, ok := runtime.TypeKey(reflectx.SkipPtr(t))
		if !ok {
			err := errors.Errorf("failed to create TypeKey for receiver type %T", u.Recv)
			return nil, errors.WithContextf(err, "encoding structural DoFn %v", u)
		}
		if _, ok := runtime.LookupType(k); !ok {
			err := errors.Errorf("receiver type %v must be registered", t)
			return nil, errors.WithContextf(err, "encoding structural DoFn %v", u)
		}
		typ, err := encodeType(t)
		if err != nil {
			wrapped := errors.Wrapf(err, "failed to encode receiver type %T", u.Recv)
			panic(errors.WithContextf(wrapped, "encoding structural DoFn %v", u))
		}

		data, err := json.Marshal(u.Recv)
		if err != nil {
			wrapped := errors.Wrapf(err, "failed to marshal receiver %v", u.Recv)
			return nil, errors.WithContextf(wrapped, "encoding structural DoFn %v", u)
		}
		return &v1.Fn{Type: typ, Opt: string(data)}, nil

	default:
		panic(fmt.Sprintf("Failed to encode DoFn %v, missing fn", u))
	}
}

func decodeFn(u *v1.Fn) (*graph.Fn, error) {
	if u.Dynfn != nil {
		gen, err := runtime.ResolveFunction(u.Dynfn.Gen, genFnType)
		if err != nil {
			wrapped := errors.Wrapf(err, "bad symbol %v", u.Dynfn.Gen)
			return nil, errors.WithContextf(wrapped, "decoding dynamic DoFn %v", u)
		}

		t, err := decodeType(u.Dynfn.Type)
		if err != nil {
			wrapped := errors.Wrap(err, "bad type")
			return nil, errors.WithContextf(wrapped, "failed to decode dynamic DoFn %v", u)
		}
		return graph.NewFn(&graph.DynFn{
			Name: u.Dynfn.Name,
			T:    t,
			Data: u.Dynfn.Data,
			Gen:  gen.(func(string, reflect.Type, []byte) reflectx.Func),
		})
	}
	if u.Fn != nil {
		fn, err := decodeUserFn(u.Fn)
		if err != nil {
			wrapped := errors.Wrap(err, "failed to decode userfn")
			return nil, errors.WithContextf(wrapped, "decoding DoFn %v", u)
		}
		fx, err := funcx.New(reflectx.MakeFunc(fn))
		if err != nil {
			wrapped := errors.Wrap(err, "failed to construct userfn")
			return nil, errors.WithContextf(wrapped, "decoding DoFn %v", u)
		}
		return &graph.Fn{Fn: fx}, nil
	}

	t, err := decodeType(u.Type)
	if err != nil {
		wrapped := errors.Wrap(err, "bad type")
		return nil, errors.WithContextf(wrapped, "decoding structural DoFn %v", u)
	}
	fn, err := reflectx.UnmarshalJSON(t, u.Opt)
	if err != nil {
		wrapped := errors.Wrap(err, "bad struct encoding")
		return nil, errors.WithContextf(wrapped, "decoding structural DoFn %v", u)
	}
	return graph.NewFn(fn)
}

// encodeUserFn translates the preprocessed representation of a Beam user function
// into the wire representation, capturing all the inputs and outputs needed.
func encodeUserFn(u *funcx.Fn) (*v1.UserFn, error) {
	// TODO(herohde) 5/23/2017: reject closures and dynamic functions. They can't
	// be serialized.

	symbol := u.Fn.Name()
	t, err := encodeType(u.Fn.Type())
	if err != nil {
		wrapped := errors.Wrap(err, "bad function type")
		return nil, errors.WithContextf(wrapped, "encoding userfn %v", u)
	}
	return &v1.UserFn{Name: symbol, Type: t}, nil
}

// decodeUserFn receives the wire representation of a Beam user function,
// extracting the preprocessed representation, expanding all inputs and outputs
// of the function.
func decodeUserFn(ref *v1.UserFn) (interface{}, error) {
	t, err := decodeType(ref.GetType())
	if err != nil {
		return nil, err
	}

	return runtime.ResolveFunction(ref.Name, t)
}

func encodeFullType(t typex.FullType) (*v1.FullType, error) {
	var components []*v1.FullType
	if t.Class() == typex.Composite {
		// Drop the Aggregate convenience component.

		for _, comp := range t.Components() {
			c, err := encodeFullType(comp)
			if err != nil {
				return nil, err
			}
			components = append(components, c)
		}
	}

	prim, err := encodeType(t.Type())
	if err != nil {
		wrapped := errors.Wrap(err, "bad type")
		return nil, errors.WithContextf(wrapped, "encoding full type %v", t)
	}
	return &v1.FullType{Type: prim, Components: components}, nil
}

func decodeFullType(t *v1.FullType) (typex.FullType, error) {
	var components []typex.FullType
	for _, comp := range t.Components {
		c, err := decodeFullType(comp)
		if err != nil {
			return nil, err
		}
		components = append(components, c)
	}

	prim, err := decodeType(t.Type)
	if err != nil {
		wrapped := errors.Wrap(err, "bad type")
		return nil, errors.WithContextf(wrapped, "decoding full type %v", t)
	}
	return typex.New(prim, components...), nil
}

func encodeType(t reflect.Type) (*v1.Type, error) {
	if s, ok := tryEncodeSpecial(t); ok {
		return &v1.Type{Kind: v1.Type_SPECIAL, Special: s}, nil
	}
	if k, ok := runtime.TypeKey(t); ok {
		if _, present := runtime.LookupType(k); present {
			// External type. Serialize by key and lookup in registry
			// on decoding side.

			return &v1.Type{Kind: v1.Type_EXTERNAL, ExternalKey: k}, nil
		}
	}

	// The supplied type isn't special, so apply the standard encodings.
	switch t.Kind() {
	case reflect.Bool:
		return &v1.Type{Kind: v1.Type_BOOL}, nil
	case reflect.Int:
		return &v1.Type{Kind: v1.Type_INT}, nil
	case reflect.Int8:
		return &v1.Type{Kind: v1.Type_INT8}, nil
	case reflect.Int16:
		return &v1.Type{Kind: v1.Type_INT16}, nil
	case reflect.Int32:
		return &v1.Type{Kind: v1.Type_INT32}, nil
	case reflect.Int64:
		return &v1.Type{Kind: v1.Type_INT64}, nil
	case reflect.Uint:
		return &v1.Type{Kind: v1.Type_UINT}, nil
	case reflect.Uint8:
		return &v1.Type{Kind: v1.Type_UINT8}, nil
	case reflect.Uint16:
		return &v1.Type{Kind: v1.Type_UINT16}, nil
	case reflect.Uint32:
		return &v1.Type{Kind: v1.Type_UINT32}, nil
	case reflect.Uint64:
		return &v1.Type{Kind: v1.Type_UINT64}, nil
	case reflect.Float32:
		return &v1.Type{Kind: v1.Type_FLOAT32}, nil
	case reflect.Float64:
		return &v1.Type{Kind: v1.Type_FLOAT64}, nil
	case reflect.String:
		return &v1.Type{Kind: v1.Type_STRING}, nil

	case reflect.Slice:
		elm, err := encodeType(t.Elem())
		if err != nil {
			wrapped := errors.Wrap(err, "bad element type")
			return nil, errors.WithContextf(wrapped, "encoding slice %v", t)
		}
		return &v1.Type{Kind: v1.Type_SLICE, Element: elm}, nil

	case reflect.Struct:
		var fields []*v1.Type_StructField
		for i := 0; i < t.NumField(); i++ {
			f := t.Field(i)

			if f.PkgPath != "" {
				wrapped := errors.Errorf("type has unexported field: %v", f.Name)
				return nil, errors.WithContextf(wrapped, "encoding struct %v", t)
			}

			fType, err := encodeType(f.Type)
			if err != nil {
				wrapped := errors.Wrap(err, "bad field type")
				return nil, errors.WithContextf(wrapped, "encoding struct %v", t)
			}

			field := &v1.Type_StructField{
				Name:      f.Name,
				PkgPath:   f.PkgPath,
				Type:      fType,
				Tag:       string(f.Tag),
				Offset:    int64(f.Offset),
				Index:     encodeInts(f.Index),
				Anonymous: f.Anonymous,
			}
			fields = append(fields, field)
		}
		return &v1.Type{Kind: v1.Type_STRUCT, Fields: fields}, nil

	case reflect.Func:
		var in []*v1.Type
		for i := 0; i < t.NumIn(); i++ {
			param, err := encodeType(t.In(i))
			if err != nil {
				wrapped := errors.Wrap(err, "bad parameter type")
				return nil, errors.WithContextf(wrapped, "encoding function %v", t)
			}
			in = append(in, param)
		}
		var out []*v1.Type
		for i := 0; i < t.NumOut(); i++ {
			ret, err := encodeType(t.Out(i))
			if err != nil {
				wrapped := errors.Wrap(err, "bad return type")
				return nil, errors.WithContextf(wrapped, "encoding function %v", t)
			}
			out = append(out, ret)
		}
		return &v1.Type{Kind: v1.Type_FUNC, ParameterTypes: in, ReturnTypes: out, IsVariadic: t.IsVariadic()}, nil

	case reflect.Chan:
		elm, err := encodeType(t.Elem())
		if err != nil {
			wrapped := errors.Wrap(err, "bad element type")
			return nil, errors.WithContextf(wrapped, "encoding channel %v", t)
		}
		dir := encodeChanDir(t.ChanDir())
		return &v1.Type{Kind: v1.Type_CHAN, Element: elm, ChanDir: dir}, nil

	case reflect.Ptr:
		elm, err := encodeType(t.Elem())
		if err != nil {
			wrapped := errors.Wrap(err, "bad base type")
			return nil, errors.WithContextf(wrapped, "encoding pointer %v", t)
		}
		return &v1.Type{Kind: v1.Type_PTR, Element: elm}, nil

	default:
		return nil, errors.Errorf("unencodable type %v", t)
	}
}

func tryEncodeSpecial(t reflect.Type) (v1.Type_Special, bool) {
	switch t {
	case reflectx.Error:
		return v1.Type_ERROR, true
	case reflectx.Context:
		return v1.Type_CONTEXT, true
	case reflectx.Type:
		return v1.Type_TYPE, true

	case typex.EventTimeType:
		return v1.Type_EVENTTIME, true
	case typex.WindowType:
		return v1.Type_WINDOW, true
	case typex.KVType:
		return v1.Type_KV, true
	case typex.CoGBKType:
		return v1.Type_COGBK, true
	case typex.WindowedValueType:
		return v1.Type_WINDOWEDVALUE, true

	case typex.TType:
		return v1.Type_T, true
	case typex.UType:
		return v1.Type_U, true
	case typex.VType:
		return v1.Type_V, true
	case typex.WType:
		return v1.Type_W, true
	case typex.XType:
		return v1.Type_X, true
	case typex.YType:
		return v1.Type_Y, true
	case typex.ZType:
		return v1.Type_Z, true

	default:
		return v1.Type_ILLEGAL, false
	}
}

func decodeType(t *v1.Type) (reflect.Type, error) {
	if t == nil {
		err := errors.New("empty type")
		return nil, errors.WithContextf(err, "decoding type %v", t)
	}

	switch t.Kind {
	case v1.Type_BOOL:
		return reflectx.Bool, nil
	case v1.Type_INT:
		return reflectx.Int, nil
	case v1.Type_INT8:
		return reflectx.Int8, nil
	case v1.Type_INT16:
		return reflectx.Int16, nil
	case v1.Type_INT32:
		return reflectx.Int32, nil
	case v1.Type_INT64:
		return reflectx.Int64, nil
	case v1.Type_UINT:
		return reflectx.Uint, nil
	case v1.Type_UINT8:
		return reflectx.Uint8, nil
	case v1.Type_UINT16:
		return reflectx.Uint16, nil
	case v1.Type_UINT32:
		return reflectx.Uint32, nil
	case v1.Type_UINT64:
		return reflectx.Uint64, nil
	case v1.Type_FLOAT32:
		return reflectx.Float32, nil
	case v1.Type_FLOAT64:
		return reflectx.Float64, nil
	case v1.Type_STRING:
		return reflectx.String, nil

	case v1.Type_SLICE:
		elm, err := decodeType(t.GetElement())
		if err != nil {
			wrapped := errors.Wrap(err, "bad element")
			return nil, errors.WithContextf(wrapped, "failed to decode type %v, bad element", t)
		}
		return reflect.SliceOf(elm), nil

	case v1.Type_STRUCT:
		var fields []reflect.StructField
		for _, f := range t.Fields {
			fType, err := decodeType(f.Type)
			if err != nil {
				wrapped := errors.Wrap(err, "bad field type")
				return nil, errors.WithContextf(wrapped, "failed to decode type %v, bad field type", t)
			}

			field := reflect.StructField{
				Name:      f.GetName(),
				PkgPath:   f.GetPkgPath(),
				Type:      fType,
				Tag:       reflect.StructTag(f.GetTag()),
				Offset:    uintptr(f.GetOffset()),
				Index:     decodeInts(f.GetIndex()),
				Anonymous: f.GetAnonymous(),
			}
			fields = append(fields, field)
		}
		return reflect.StructOf(fields), nil

	case v1.Type_FUNC:
		in, err := decodeTypes(t.GetParameterTypes())
		if err != nil {
			wrapped := errors.Wrap(err, "bad parameter type")
			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
		}
		out, err := decodeTypes(t.GetReturnTypes())
		if err != nil {
			wrapped := errors.Wrap(err, "bad return type")
			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
		}
		return reflect.FuncOf(in, out, t.GetIsVariadic()), nil

	case v1.Type_CHAN:
		elm, err := decodeType(t.GetElement())
		if err != nil {
			wrapped := errors.Wrap(err, "bad element")
			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
		}
		dir, err := decodeChanDir(t.GetChanDir())
		if err != nil {
			wrapped := errors.Wrap(err, "bad channel direction")
			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
		}
		return reflect.ChanOf(dir, elm), nil

	case v1.Type_PTR:
		elm, err := decodeType(t.GetElement())
		if err != nil {
			wrapped := errors.Wrap(err, "bad element")
			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
		}
		return reflect.PtrTo(elm), nil

	case v1.Type_SPECIAL:
		ret, err := decodeSpecial(t.Special)
		if err != nil {
			wrapped := errors.Wrap(err, "bad element")
			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
		}
		return ret, nil

	case v1.Type_EXTERNAL:
		ret, ok := runtime.LookupType(t.ExternalKey)
		if !ok {
			err := errors.Errorf("external key not found %v", t.ExternalKey)
			return nil, errors.WithContextf(err, "decoding type %v", t)
		}
		return ret, nil

	default:
		err := errors.Errorf("unexpected type kind %v", t.Kind)
		return nil, errors.WithContextf(err, "failed to decode type %v", t)
	}
}

func decodeSpecial(s v1.Type_Special) (reflect.Type, error) {
	switch s {
	case v1.Type_ERROR:
		return reflectx.Error, nil
	case v1.Type_CONTEXT:
		return reflectx.Context, nil
	case v1.Type_TYPE:
		return reflectx.Type, nil

	case v1.Type_EVENTTIME:
		return typex.EventTimeType, nil
	case v1.Type_WINDOW:
		return typex.WindowType, nil
	case v1.Type_KV:
		return typex.KVType, nil
	case v1.Type_COGBK:
		return typex.CoGBKType, nil
	case v1.Type_WINDOWEDVALUE:
		return typex.WindowedValueType, nil

	case v1.Type_T:
		return typex.TType, nil
	case v1.Type_U:
		return typex.UType, nil
	case v1.Type_V:
		return typex.VType, nil
	case v1.Type_W:
		return typex.WType, nil
	case v1.Type_X:
		return typex.XType, nil
	case v1.Type_Y:
		return typex.YType, nil
	case v1.Type_Z:
		return typex.ZType, nil

	default:
		return nil, errors.Errorf("failed to decode special type, unknown type %v", s)
	}
}

func decodeTypes(list []*v1.Type) ([]reflect.Type, error) {
	var ret []reflect.Type
	for _, elm := range list {
		t, err := decodeType(elm)
		if err != nil {
			return nil, err
		}
		ret = append(ret, t)
	}
	return ret, nil
}

func encodeInts(offsets []int) []int32 {
	var ret []int32
	for _, elm := range offsets {
		ret = append(ret, int32(elm))
	}
	return ret
}

func decodeInts(offsets []int32) []int {
	var ret []int
	for _, elm := range offsets {
		ret = append(ret, int(elm))
	}
	return ret
}

func encodeChanDir(dir reflect.ChanDir) v1.Type_ChanDir {
	switch dir {
	case reflect.RecvDir:
		return v1.Type_RECV
	case reflect.SendDir:
		return v1.Type_SEND
	case reflect.BothDir:
		return v1.Type_BOTH
	default:
		panic(fmt.Sprintf("Failed to encode channel direction, invalid value: %v", dir))
	}
}

func decodeChanDir(dir v1.Type_ChanDir) (reflect.ChanDir, error) {
	switch dir {
	case v1.Type_RECV:
		return reflect.RecvDir, nil
	case v1.Type_SEND:
		return reflect.SendDir, nil
	case v1.Type_BOTH:
		return reflect.BothDir, nil
	default:
		err := errors.Errorf("invalid value: %v", dir)
		return reflect.BothDir, errors.WithContext(err, "decoding channel direction")
	}
}

func encodeInputKind(k graph.InputKind) v1.MultiEdge_Inbound_InputKind {
	switch k {
	case graph.Main:
		return v1.MultiEdge_Inbound_MAIN
	case graph.Singleton:
		return v1.MultiEdge_Inbound_SINGLETON
	case graph.Slice:
		return v1.MultiEdge_Inbound_SLICE
	case graph.Map:
		return v1.MultiEdge_Inbound_MAP
	case graph.MultiMap:
		return v1.MultiEdge_Inbound_MULTIMAP
	case graph.Iter:
		return v1.MultiEdge_Inbound_ITER
	case graph.ReIter:
		return v1.MultiEdge_Inbound_REITER
	default:
		panic(fmt.Sprintf("Failed to encode input kind, invalid value: %v", k))
	}
}

func decodeInputKind(k v1.MultiEdge_Inbound_InputKind) (graph.InputKind, error) {
	switch k {
	case v1.MultiEdge_Inbound_MAIN:
		return graph.Main, nil
	case v1.MultiEdge_Inbound_SINGLETON:
		return graph.Singleton, nil
	case v1.MultiEdge_Inbound_SLICE:
		return graph.Slice, nil
	case v1.MultiEdge_Inbound_MAP:
		return graph.Map, nil
	case v1.MultiEdge_Inbound_MULTIMAP:
		return graph.MultiMap, nil
	case v1.MultiEdge_Inbound_ITER:
		return graph.Iter, nil
	case v1.MultiEdge_Inbound_REITER:
		return graph.ReIter, nil
	default:
		err := errors.Errorf("invalid value: %v", k)
		return graph.Main, errors.WithContext(err, "decoding input kind")
	}
}
