| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package funcx |
| |
| import ( |
| "fmt" |
| "reflect" |
| |
| "github.com/apache/beam/sdks/go/pkg/beam/core/typex" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx" |
| "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" |
| ) |
| |
| // Note that we can't tell the difference between K, V and V, S before binding. |
| |
| // FnParamKind represents the kinds of parameters a user function may take. |
| type FnParamKind int |
| |
| const ( |
| // FnIllegal is an illegal function input parameter type. |
| FnIllegal FnParamKind = 0x0 |
| // FnContext marks a function input parameter of type context.Context. |
| FnContext FnParamKind = 0x1 |
| // FnEventTime indicates a function input parameter of type typex.EventTime. |
| FnEventTime FnParamKind = 0x2 |
| // FnValue indicates a function input parameter of an ordinary Go type. |
| FnValue FnParamKind = 0x4 |
| // FnIter indicates a function input parameter that is an iterator. |
| // Examples of iterators: |
| // "func (*int) bool" |
| // "func (*string, *T) bool" |
| // If there are 2 parameters, a KV input is implied. |
| FnIter FnParamKind = 0x08 |
| // FnReIter indicates a function input parameter that is a reiterable |
| // iterator. |
| // The function signature is a function returning a function matching |
| // the iterator signature. |
| // "func() func (*int) bool" |
| // "func() func (*string, *T) bool" |
| // are reiterable versions of the FnIter examples. |
| FnReIter FnParamKind = 0x10 |
| // FnEmit indicates a function input parameter that is an emitter. |
| // Examples of emitters: |
| // "func (int)" |
| // "func (string, T)" |
| // "func (EventTime, int)" |
| // "func (EventTime, string, T)" |
| // If there are 2 regular parameters, a KV output is implied. An optional |
| // EventTime is allowed as well. Emitters cannot fail. |
| FnEmit FnParamKind = 0x20 |
| // FnType indicates a function input parameter that is a type for a coder. It |
| // is only valid for coders. |
| FnType FnParamKind = 0x40 |
| // FnWindow indicates a function input parameter that implements typex.Window. |
| FnWindow FnParamKind = 0x80 |
| ) |
| |
| func (k FnParamKind) String() string { |
| switch k { |
| case FnContext: |
| return "Context" |
| case FnEventTime: |
| return "EventTime" |
| case FnValue: |
| return "Value" |
| case FnIter: |
| return "Iter" |
| case FnReIter: |
| return "ReIter" |
| case FnEmit: |
| return "Emit" |
| case FnType: |
| return "Type" |
| case FnWindow: |
| return "Window" |
| default: |
| return fmt.Sprintf("%v", int(k)) |
| } |
| } |
| |
| // FnParam captures the kind and type of a single user function parameter. |
| type FnParam struct { |
| Kind FnParamKind |
| T reflect.Type |
| } |
| |
| // ReturnKind represents the kinds of return values a user function may provide. |
| type ReturnKind int |
| |
| // The supported types of ReturnKind. |
| const ( |
| RetIllegal ReturnKind = 0x0 |
| RetEventTime ReturnKind = 0x1 |
| RetValue ReturnKind = 0x2 |
| RetError ReturnKind = 0x4 |
| ) |
| |
| func (k ReturnKind) String() string { |
| switch k { |
| case RetError: |
| return "Error" |
| case RetEventTime: |
| return "EventTime" |
| case RetValue: |
| return "Value" |
| default: |
| return fmt.Sprintf("%v", int(k)) |
| } |
| } |
| |
| // ReturnParam captures the kind and type of a single user function return value. |
| type ReturnParam struct { |
| Kind ReturnKind |
| T reflect.Type |
| } |
| |
| // Fn is the reflected user function or method, preprocessed. This wrapper |
| // is useful both at graph construction time as well as execution time. |
| type Fn struct { |
| Fn reflectx.Func |
| |
| Param []FnParam |
| Ret []ReturnParam |
| } |
| |
| // Context returns (index, true) iff the function expects a context.Context. |
| // The context should be the first parameter by convention. |
| func (u *Fn) Context() (pos int, exists bool) { |
| for i, p := range u.Param { |
| if p.Kind == FnContext { |
| return i, true |
| } |
| } |
| return -1, false |
| } |
| |
| // Type returns (index, true) iff the function expects a reflect.FullType. |
| func (u *Fn) Type() (pos int, exists bool) { |
| for i, p := range u.Param { |
| if p.Kind == FnType { |
| return i, true |
| } |
| } |
| return -1, false |
| } |
| |
| // EventTime returns (index, true) iff the function expects an event timestamp. |
| func (u *Fn) EventTime() (pos int, exists bool) { |
| for i, p := range u.Param { |
| if p.Kind == FnEventTime { |
| return i, true |
| } |
| } |
| return -1, false |
| } |
| |
| // Window returns (index, true) iff the function expects a window. |
| func (u *Fn) Window() (pos int, exists bool) { |
| for i, p := range u.Param { |
| if p.Kind == FnWindow { |
| return i, true |
| } |
| } |
| return -1, false |
| } |
| |
| // Error returns (index, true) iff the function returns an error. |
| func (u *Fn) Error() (pos int, exists bool) { |
| for i, p := range u.Ret { |
| if p.Kind == RetError { |
| return i, true |
| } |
| } |
| return -1, false |
| } |
| |
| // OutEventTime returns (index, true) iff the function returns an event timestamp. |
| func (u *Fn) OutEventTime() (pos int, exists bool) { |
| for i, p := range u.Ret { |
| if p.Kind == RetEventTime { |
| return i, true |
| } |
| } |
| return -1, false |
| } |
| |
| // Params returns the parameter indices that matches the given mask. |
| func (u *Fn) Params(mask FnParamKind) []int { |
| var ret []int |
| for i, p := range u.Param { |
| if (p.Kind & mask) != 0 { |
| ret = append(ret, i) |
| } |
| } |
| return ret |
| } |
| |
| // Returns returns the return indices that matches the given mask. |
| func (u *Fn) Returns(mask ReturnKind) []int { |
| var ret []int |
| for i, p := range u.Ret { |
| if (p.Kind & mask) != 0 { |
| ret = append(ret, i) |
| } |
| } |
| return ret |
| } |
| |
| func (u *Fn) String() string { |
| return fmt.Sprintf("{Fn:{Name:%v Kind:%v} Param:%+v Ret:%+v}", u.Fn.Name(), u.Fn.Type(), u.Param, u.Ret) |
| } |
| |
| // New returns a Fn from a user function, if valid. Closures and dynamically |
| // created functions are considered valid here, but will be rejected if they |
| // are attempted to be serialized. |
| func New(fn reflectx.Func) (*Fn, error) { |
| var param []FnParam |
| for i := 0; i < fn.Type().NumIn(); i++ { |
| t := fn.Type().In(i) |
| |
| kind := FnIllegal |
| switch { |
| case t == reflectx.Context: |
| kind = FnContext |
| case t == typex.EventTimeType: |
| kind = FnEventTime |
| case t.Implements(typex.WindowType): |
| kind = FnWindow |
| case t == reflectx.Type: |
| kind = FnType |
| case typex.IsContainer(t), typex.IsConcrete(t), typex.IsUniversal(t): |
| kind = FnValue |
| case IsEmit(t): |
| kind = FnEmit |
| case IsIter(t): |
| kind = FnIter |
| case IsReIter(t): |
| kind = FnReIter |
| default: |
| return nil, errors.Errorf("bad parameter type for %s: %v", fn.Name(), t) |
| } |
| |
| param = append(param, FnParam{Kind: kind, T: t}) |
| } |
| |
| var ret []ReturnParam |
| for i := 0; i < fn.Type().NumOut(); i++ { |
| t := fn.Type().Out(i) |
| |
| kind := RetIllegal |
| switch { |
| case t == reflectx.Error: |
| kind = RetError |
| case t == typex.EventTimeType: |
| kind = RetEventTime |
| case typex.IsContainer(t), typex.IsConcrete(t), typex.IsUniversal(t): |
| kind = RetValue |
| default: |
| return nil, errors.Errorf("bad return type for %s: %v", fn.Name(), t) |
| } |
| |
| ret = append(ret, ReturnParam{Kind: kind, T: t}) |
| } |
| |
| u := &Fn{Fn: fn, Param: param, Ret: ret} |
| |
| if err := validateOrder(u); err != nil { |
| return nil, err |
| } |
| return u, nil |
| } |
| |
| // SubParams returns the subsequence of the given params with the given |
| // indices. |
| func SubParams(list []FnParam, indices ...int) []FnParam { |
| var ret []FnParam |
| for _, index := range indices { |
| ret = append(ret, list[index]) |
| } |
| return ret |
| } |
| |
| // SubReturns returns the subsequence of the given return params with |
| // the given indices. |
| func SubReturns(list []ReturnParam, indices ...int) []ReturnParam { |
| var ret []ReturnParam |
| for _, index := range indices { |
| ret = append(ret, list[index]) |
| } |
| return ret |
| } |
| |
| // The order of present parameters and return values must be as follows: |
| // func(FnContext?, FnWindow?, FnEventTime?, FnType?, (FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetEventTime?, RetError?) |
| // where ? indicates 0 or 1, and * indicates any number. |
| // and a SideInput is one of FnValue or FnIter or FnReIter |
| // Note: Fns with inputs must have at least one FnValue as the main input. |
| func validateOrder(u *Fn) error { |
| paramState := psStart |
| var err error |
| // Validate the parameter ordering. |
| for i, p := range u.Param { |
| if paramState, err = nextParamState(paramState, p.Kind); err != nil { |
| return errors.WithContextf(err, "validating parameter %d for %s", i, u.Fn.Name()) |
| } |
| } |
| // Validate the return value ordering. |
| retState := rsStart |
| for i, r := range u.Ret { |
| if retState, err = nextRetState(retState, r.Kind); err != nil { |
| return errors.WithContextf(err, "validating return value %d for %s", i, u.Fn.Name()) |
| } |
| } |
| return nil |
| } |
| |
| var ( |
| errContextParam = errors.New("may only have a single context.Context parameter and it must be the first parameter") |
| errWindowParamPrecedence = errors.New("may only have a single Window parameter and it must precede the EventTime and main input parameter") |
| errEventTimeParamPrecedence = errors.New("may only have a single beam.EventTime parameter and it must precede the main input parameter") |
| errReflectTypePrecedence = errors.New("may only have a single reflect.Type parameter and it must precede the main input parameter") |
| errSideInputPrecedence = errors.New("side input parameters must follow main input parameter") |
| errInputPrecedence = errors.New("inputs parameters must precede emit function parameters") |
| ) |
| |
| type paramState int |
| |
| const ( |
| psStart paramState = iota |
| psContext |
| psWindow |
| psEventTime |
| psType |
| psInput |
| psOutput |
| ) |
| |
| func nextParamState(cur paramState, transition FnParamKind) (paramState, error) { |
| switch cur { |
| case psStart: |
| switch transition { |
| case FnContext: |
| return psContext, nil |
| case FnWindow: |
| return psWindow, nil |
| case FnEventTime: |
| return psEventTime, nil |
| case FnType: |
| return psType, nil |
| } |
| case psContext: |
| switch transition { |
| case FnWindow: |
| return psWindow, nil |
| case FnEventTime: |
| return psEventTime, nil |
| case FnType: |
| return psType, nil |
| } |
| case psWindow: |
| switch transition { |
| case FnEventTime: |
| return psEventTime, nil |
| case FnType: |
| return psType, nil |
| } |
| case psEventTime: |
| switch transition { |
| case FnType: |
| return psType, nil |
| } |
| case psType: |
| // Completely handled by the default clause |
| case psInput: |
| switch transition { |
| case FnIter, FnReIter: |
| return psInput, nil |
| } |
| case psOutput: |
| switch transition { |
| case FnValue, FnIter, FnReIter: |
| return -1, errInputPrecedence |
| } |
| } |
| // Default transition cases to reduce duplication above |
| switch transition { |
| case FnContext: |
| return -1, errContextParam |
| case FnWindow: |
| return -1, errWindowParamPrecedence |
| case FnEventTime: |
| return -1, errEventTimeParamPrecedence |
| case FnType: |
| return -1, errReflectTypePrecedence |
| case FnValue: |
| return psInput, nil |
| case FnIter, FnReIter: |
| return -1, errSideInputPrecedence |
| case FnEmit: |
| return psOutput, nil |
| default: |
| panic(fmt.Sprintf("library error, unknown ParamKind: %v", transition)) |
| } |
| } |
| |
| var ( |
| errEventTimeRetPrecedence = errors.New("beam.EventTime must be first return parameter") |
| errErrorPrecedence = errors.New("error must be the final return parameter") |
| ) |
| |
| type retState int |
| |
| const ( |
| rsStart retState = iota |
| rsEventTime |
| rsOutput |
| rsError |
| ) |
| |
| func nextRetState(cur retState, transition ReturnKind) (retState, error) { |
| switch cur { |
| case rsStart: |
| switch transition { |
| case RetEventTime: |
| return rsEventTime, nil |
| } |
| case rsEventTime, rsOutput: |
| // Identical to the default cases. |
| case rsError: |
| // This is a terminal state. No valid transitions. error must be the final return value. |
| return -1, errErrorPrecedence |
| } |
| // The default cases to avoid repetition. |
| switch transition { |
| case RetEventTime: |
| return -1, errEventTimeRetPrecedence |
| case RetValue: |
| return rsOutput, nil |
| case RetError: |
| return rsError, nil |
| default: |
| panic(fmt.Sprintf("library error, unknown ReturnKind: %v", transition)) |
| } |
| } |