blob: 1211989ed62e2b5ab565839fba2739778071d2e2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package exec
import (
"context"
"fmt"
"reflect"
"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)
//go:generate specialize --input=fn_arity.tmpl
//go:generate gofmt -w fn_arity.go
// MainInput is the main input and is unfolded in the invocation, if present.
// The invoker spreads it over consecutive function parameters: the key's Elm
// first, then its Elm2 when non-nil, then one iterator per entry in Values.
type MainInput struct {
// Key carries the element whose Elm (and Elm2, when non-nil) are converted
// and passed as the leading main-input arguments.
Key FullValue
// Values holds the streams that are each wrapped in an iterator and bound to
// a parameter of kind funcx.FnIter; any other parameter kind is an error.
Values []ReStream
}
// Invoke calls fn once with the supplied windows, event time, optional main
// input and extra arguments, building a fresh invoker for the call. The extra
// values must match the non-main side input and emitters of fn. A nil fn is a
// no-op that yields (nil, nil); otherwise the direct output of the call, if
// any, is returned.
func Invoke(ctx context.Context, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error) {
	if fn != nil {
		return newInvoker(fn).Invoke(ctx, ws, ts, opt, extra...)
	}
	// ok: nothing to Invoke
	return nil, nil
}
// InvokeWithoutEventTime runs the given function at time 0 in the global
// window, building a fresh invoker for the call. A nil fn is a no-op that
// yields (nil, nil).
func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error) {
	if fn != nil {
		return newInvoker(fn).InvokeWithoutEventTime(ctx, opt, extra...)
	}
	// ok: nothing to Invoke
	return nil, nil
}
// invoker is a container struct for hot path invocations of DoFns. It caches
// the fixed set up for a function (argument slice, parameter and return
// indexes, conversion functions) so it is not repeated per element.
type invoker struct {
	fn *funcx.Fn
	// args is the cached argument slice, with one slot per fn parameter.
	args []interface{}

	// TODO(lostluck): 2018/07/06 consider replacing with a slice of functions to run over the args slice, as an improvement.

	// Specialized input indexes. newInvoker stores -1 when fn lacks the parameter.
	ctxIdx int
	wndIdx int
	etIdx  int

	// Specialized output indexes. newInvoker stores -1 when fn lacks the return value.
	outEtIdx  int
	outErrIdx int

	// General indexes.
	in  []int
	out []int

	// ret is a cached allocation for passing to the next Unit. Units never
	// modify the passed in FullValue.
	ret FullValue

	// Cached conversion functions, which assume this invoker is always used
	// with the same parameter types.
	elmConvert  func(interface{}) interface{}
	elm2Convert func(interface{}) interface{}

	// call runs the function over the populated args and shapes its result.
	// NOTE(review): presumably installed by initCall, which is defined outside
	// this view — confirm.
	call func(ws []typex.Window, ts typex.EventTime) (*FullValue, error)
}
// newInvoker builds an invoker for fn. It sizes the cached argument slice to
// the number of fn parameters, records the general input and output indexes,
// and resolves each specialized index (context, window, event time, output
// event time, error), storing -1 for any that fn does not declare. It finishes
// by calling initCall before returning the invoker.
func newInvoker(fn *funcx.Fn) *invoker {
	// orAbsent turns an (index, found) lookup into the index, or -1 if absent.
	orAbsent := func(idx int, found bool) int {
		if found {
			return idx
		}
		return -1
	}

	inv := &invoker{
		fn:   fn,
		args: make([]interface{}, len(fn.Param)),
		in:   fn.Params(funcx.FnValue | funcx.FnIter | funcx.FnReIter | funcx.FnEmit),
		out:  fn.Returns(funcx.RetValue),
	}

	// Specialized inputs.
	inv.ctxIdx = orAbsent(fn.Context())
	inv.wndIdx = orAbsent(fn.Window())
	inv.etIdx = orAbsent(fn.EventTime())

	// Specialized outputs.
	inv.outEtIdx = orAbsent(fn.OutEventTime())
	inv.outErrIdx = orAbsent(fn.Error())

	inv.initCall()
	return inv
}
// Reset drops every reference held by the invoker's caches so the values can
// be garbage collected after the bundle ends: the cached return value is
// zeroed and each slot of the cached argument slice is set to nil.
func (n *invoker) Reset() {
	// Avoid leaking user elements after bundle termination.
	n.ret = FullValue{}

	for idx := 0; idx < len(n.args); idx++ {
		n.args[idx] = nil
	}
}
// InvokeWithoutEventTime runs the function at time 0 in the global window. It
// delegates to Invoke with window.SingleGlobalWindow and mtime.ZeroTimestamp.
func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput, extra ...interface{}) (*FullValue, error) {
	var (
		globalWindows []typex.Window  = window.SingleGlobalWindow
		zeroTime      typex.EventTime = mtime.ZeroTimestamp
	)
	return n.Invoke(ctx, globalWindows, zeroTime, opt, extra...)
}
// Invoke invokes the fn with the given values. The extra values must match the non-main
// side input and emitters. It returns the direct output, if any.
//
// Arguments are written into the invoker's cached args slice in four steps:
// the specialized context, window and event time parameters; the main input
// unfolded from opt (Elm, then Elm2 if non-nil, then one iterator per Values
// entry); the extra values in order; and finally the call itself.
//
// An error is returned if fn observes windows and ws does not hold exactly one
// window, or if a Values entry lines up with a parameter that is not of kind
// funcx.FnIter. opt may be nil, in which case no main input is populated.
func (n *invoker) Invoke(ctx context.Context, ws []typex.Window, ts typex.EventTime, opt *MainInput, extra ...interface{}) (*FullValue, error) {
	// (1) Populate contexts
	// extract these to make things easier to read.
	args := n.args
	fn := n.fn
	in := n.in

	if n.ctxIdx >= 0 {
		args[n.ctxIdx] = ctx
	}
	if n.wndIdx >= 0 {
		if len(ws) != 1 {
			// Report the windows that were actually validated. opt may be nil
			// here, so it must not be dereferenced to build the message.
			return nil, errors.Errorf("DoFns that observe windows must be invoked with single window: %v", ws)
		}
		args[n.wndIdx] = ws[0]
	}
	if n.etIdx >= 0 {
		args[n.etIdx] = ts
	}

	// (2) Main input from value, if any.
	i := 0
	if opt != nil {
		// The conversion functions are built on first use and then reused, on
		// the assumption that the element types do not change between calls.
		if n.elmConvert == nil {
			from := reflect.TypeOf(opt.Key.Elm)
			n.elmConvert = ConvertFn(from, fn.Param[in[i]].T)
		}
		args[in[i]] = n.elmConvert(opt.Key.Elm)
		i++
		if opt.Key.Elm2 != nil {
			if n.elm2Convert == nil {
				from := reflect.TypeOf(opt.Key.Elm2)
				n.elm2Convert = ConvertFn(from, fn.Param[in[i]].T)
			}
			args[in[i]] = n.elm2Convert(opt.Key.Elm2)
			i++
		}

		for _, iter := range opt.Values {
			param := fn.Param[in[i]]

			if param.Kind != funcx.FnIter {
				return nil, errors.Errorf("GBK/CoGBK result values must be iterable: %v", param)
			}

			// TODO(herohde) 12/12/2017: allow form conversion on GBK results?

			it := makeIter(param.T, iter)
			it.Init()
			args[in[i]] = it.Value()
			i++
		}
	}

	// (3) Precomputed side input and emitters (or other output).
	for _, arg := range extra {
		args[in[i]] = arg
		i++
	}

	// (4) Invoke
	return n.call(ws, ts)
}
// ret1 shapes the result of a function with exactly one return value, which is
// either an error or a single value. When fn declares an error return, r0 is
// that error (or nil) and no value is produced. Otherwise r0 becomes the
// element of the cached FullValue, stamped with ws and ts. A lone event time
// return is not a valid signature and panics.
func (n *invoker) ret1(ws []typex.Window, ts typex.EventTime, r0 interface{}) (*FullValue, error) {
	if n.outErrIdx >= 0 {
		if r0 == nil {
			return nil, nil
		}
		return nil, r0.(error)
	}
	if n.outEtIdx >= 0 {
		panic("invoker.ret1: cannot return event time without a value")
	}

	n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0}
	return &n.ret, nil
}
// ret2 shapes the result of a function with two return values. The accepted
// layouts are (value, error), (event time, value) and (key, value); the result
// is stored in the cached FullValue, which is returned by pointer. A non-nil
// error return takes precedence and yields no value.
func (n *invoker) ret2(ws []typex.Window, ts typex.EventTime, r0, r1 interface{}) (*FullValue, error) {
	hasErr := n.outErrIdx >= 0
	if hasErr && r1 != nil {
		return nil, r1.(error)
	}

	result := FullValue{Windows: ws, Timestamp: ts}
	if hasErr {
		// (value, error) with a nil error.
		result.Elm = r0
	} else if n.outEtIdx == 0 {
		// (event time, value): the function overrides the timestamp.
		result.Timestamp = r0.(typex.EventTime)
		result.Elm = r1
	} else {
		// (key, value).
		result.Elm, result.Elm2 = r0, r1
	}

	n.ret = result
	return &n.ret, nil
}
// ret3 shapes the result of a function with three return values. The accepted
// layouts are (key, value, error), (event time, value, error) and
// (event time, key, value); anything else panics. The result is stored in the
// cached FullValue, which is returned by pointer. A non-nil error return takes
// precedence and yields no value.
func (n *invoker) ret3(ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
	if n.outErrIdx < 0 {
		// Without an error return, the only valid layout leads with an event time.
		if n.outEtIdx != 0 {
			panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match permitted return values.", r0, r1, r2))
		}
		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Elm2: r2}
		return &n.ret, nil
	}

	if r2 != nil {
		return nil, r2.(error)
	}
	if n.outEtIdx >= 0 {
		// (event time, value, error) with a nil error.
		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1}
		return &n.ret, nil
	}
	// (key, value, error) with a nil error.
	n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1}
	return &n.ret, nil
}
// ret4 shapes the result of a function with four return values, laid out as
// (event time, key, value, error). A non-nil error is returned on its own;
// otherwise the cached FullValue is filled with ws, the returned event time
// and the key/value pair, and returned by pointer. The ts argument is unused
// because the function supplies its own timestamp.
func (n *invoker) ret4(ws []typex.Window, ts typex.EventTime, r0, r1, r2, r3 interface{}) (*FullValue, error) {
	if r3 != nil {
		return nil, r3.(error)
	}

	out := &n.ret
	*out = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Elm2: r2}
	return out, nil
}
// makeSideInputs builds one reusable input per side input stream. The i'th
// stream is paired with inbound i+1 (inbound 0 is skipped) and with the i'th
// of the last len(side) value/iter/re-iter parameters of fn.
//
// It returns (nil, nil) when there are no side inputs, and an error when the
// inbound count is not len(side)+1, when fn has no value/iter/re-iter
// parameter left over beyond the side inputs, or when an individual side
// input cannot be constructed.
func makeSideInputs(fn *funcx.Fn, in []*graph.Inbound, side []ReStream) ([]ReusableInput, error) {
	numSide := len(side)
	if numSide == 0 {
		return nil, nil // ok: no side input
	}
	if want := numSide + 1; len(in) != want {
		return nil, errors.Errorf("found %v inbound, want %v", len(in), want)
	}

	params := fn.Params(funcx.FnValue | funcx.FnIter | funcx.FnReIter)
	if len(params) <= numSide {
		return nil, errors.Errorf("found %v params, want >%v", len(params), numSide)
	}

	// The side inputs are the last of the above params, so the tail of the
	// index list lines up one-to-one with the side streams.
	sideParams := params[len(params)-numSide:]

	inputs := make([]ReusableInput, 0, numSide)
	for i, stream := range side {
		input, err := makeSideInput(in[i+1].Kind, fn.Param[sideParams[i]].T, stream)
		if err != nil {
			return nil, errors.WithContextf(err, "making side input %v for %v", i, fn)
		}
		inputs = append(inputs, input)
	}
	return inputs, nil
}
// makeEmitters builds one reusable emitter per emit parameter of fn, pairing
// them in order with the output nodes. When fn has a direct return value the
// first node is skipped (presumably it receives the direct output — confirm
// against the caller). It returns (nil, nil) when there are no output nodes
// and an error when the emitter count does not match the remaining nodes.
func makeEmitters(fn *funcx.Fn, nodes []Node) ([]ReusableEmitter, error) {
	if len(nodes) == 0 {
		return nil, nil // ok: no output nodes
	}

	// targets are the nodes left for emitters once any direct-return node is set aside.
	targets := nodes
	if len(fn.Returns(funcx.RetValue)) > 0 {
		targets = nodes[1:]
	}

	emitParams := fn.Params(funcx.FnEmit)
	if len(emitParams) != len(targets) {
		return nil, errors.Errorf("found %v emitters, want %v", len(emitParams), len(targets))
	}

	var emitters []ReusableEmitter
	for i, idx := range emitParams {
		emitters = append(emitters, makeEmit(fn.Param[idx].T, targets[i]))
	}
	return emitters, nil
}
// makeSideInput returns a reusable side input of the given kind and type.
//
// Iter and ReIter kinds wrap the stream without reading it here. Singleton and
// Slice kinds read the whole stream up front: a Singleton must hold exactly one
// value, which is converted to t; a Slice becomes a new slice of type t whose
// elements are each converted to t's element type. Read failures and a wrong
// Singleton count are returned as errors; an unknown kind panics.
func makeSideInput(kind graph.InputKind, t reflect.Type, values ReStream) (ReusableInput, error) {
	switch kind {
	case graph.Iter:
		return makeIter(t, values), nil

	case graph.ReIter:
		return makeReIter(t, values), nil

	case graph.Singleton, graph.Slice:
		all, err := ReadAll(values)
		if err != nil {
			return nil, err
		}

		if kind == graph.Singleton {
			if count := len(all); count != 1 {
				return nil, errors.Errorf("got %d values, want one value for %v side input of type %v", count, graph.Singleton, t)
			}
			return &fixedValue{val: Convert(all[0].Elm, t)}, nil
		}

		// Slice: materialize every element into a freshly made slice of type t.
		list := reflect.MakeSlice(t, len(all), len(all))
		for i := range all {
			converted := Convert(all[i].Elm, t.Elem())
			list.Index(i).Set(reflect.ValueOf(converted))
		}
		return &fixedValue{val: list.Interface()}, nil

	default:
		panic(fmt.Sprintf("Unexpected side input kind: %v", kind))
	}
}