blob: 846dcebd8cb536c89ab7909e2cc8ff0be1298a3f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package graph
import (
"fmt"
"reflect"
"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)
// Opcode represents a primitive Beam instruction kind.
type Opcode string
// Valid opcodes.
const (
Impulse Opcode = "Impulse"
ParDo Opcode = "ParDo"
CoGBK Opcode = "CoGBK"
External Opcode = "External"
Flatten Opcode = "Flatten"
Combine Opcode = "Combine"
WindowInto Opcode = "WindowInto"
)
// InputKind represents the role of the input and its shape.
type InputKind string
// Valid input kinds.
const (
Main InputKind = "Main"
Singleton InputKind = "Singleton"
Slice InputKind = "Slice"
Map InputKind = "Map" // TODO: allow?
MultiMap InputKind = "MultiMap" // TODO: allow?
Iter InputKind = "Iter"
ReIter InputKind = "ReIter"
)
// Inbound represents an inbound data link from a Node.
type Inbound struct {
// Kind presents the form of the data that the edge expects. Main input
// must be processed element-wise, but side input may take several
// convenient forms. For example, a DoFn that processes ints may choose
// among the following parameter types:
//
// * Main: int
// * Singleton: int
// * Slice: []int
// * Iter: func(*int) bool
// * ReIter: func() func(*int) bool
//
// If the DoFn is generic then int may be replaced by any of the type
// variables. For example,
//
// * Slice: []typex.T
// * Iter: func(*typex.X) bool
//
// If the input type is KV<int,string>, say, then the options are:
//
// * Main: int, string (as two separate parameters)
// * Map: map[int]string
// * MultiMap: map[int][]string
// * Iter: func(*int, *string) bool
// * ReIter: func() func(*int, *string) bool
//
// As above, either int, string, or both can be replaced with type
// variables. For example,
//
// * Map: map[typex.X]typex.Y
// * MultiMap: map[typex.T][]string
// * Iter: func(*typex.Z, *typex.Z) bool
//
// Note that in the last case the parameter type requires that both
// the key and value types are identical. Bind enforces such constraints.
Kind InputKind
// From is the incoming node in the graph.
From *Node
// Type is the fulltype matching the actual type used by the transform.
// Due to the loose signatures of DoFns, we can only determine the
// inbound structure when the fulltypes of the incoming links are present.
// For example,
//
// func (ctx context.Context, key int, value typex.X) error
//
// is a generic DoFn that if bound to KV<int,string> would have one
// Inbound link with type KV<int, X>.
Type typex.FullType
}
func (i *Inbound) String() string {
return fmt.Sprintf("In(%v): %v <- %v", i.Kind, i.Type, i.From)
}
// Outbound represents an outbound data link to a Node.
type Outbound struct {
// To is the outgoing node in the graph.
To *Node
// Type is the fulltype matching the actual type used by the transform.
// For DoFns, unlike inbound, the outbound types closely mimic the type
// signature. For example,
//
// func (ctx context.Context, emit func (key int, value typex.X)) error
//
// is a generic DoFn that produces one Outbound link of type KV<int,X>.
Type typex.FullType // representation type of data
}
func (o *Outbound) String() string {
return fmt.Sprintf("Out: %v -> %v", o.Type, o.To)
}
// Payload represents an external payload.
type Payload struct {
URN string
Data []byte
}
// MultiEdge represents a primitive data processing operation. Each non-user
// code operation may be implemented by either the harness or the runner.
type MultiEdge struct {
id int
parent *Scope
Op Opcode
DoFn *DoFn // ParDo
CombineFn *CombineFn // Combine
AccumCoder *coder.Coder // Combine
Value []byte // Impulse
Payload *Payload // External
WindowFn *window.Fn // WindowInto
Input []*Inbound
Output []*Outbound
}
// ID returns the graph-local identifier for the edge.
func (e *MultiEdge) ID() int {
return e.id
}
// Name returns a not-necessarily-unique name for the edge.
func (e *MultiEdge) Name() string {
if e.DoFn != nil {
return e.DoFn.Name()
}
if e.CombineFn != nil {
return e.CombineFn.Name()
}
return string(e.Op)
}
// Scope return the scope.
func (e *MultiEdge) Scope() *Scope {
return e.parent
}
func (e *MultiEdge) String() string {
return fmt.Sprintf("%v: %v %v -> %v", e.id, e.Op, e.Input, e.Output)
}
// NOTE(herohde) 4/28/2017: In general, we have no good coder guess for outgoing
// nodes, unless we add a notion of default coder for arbitrary types. We leave
// that to the beam layer.
// NewCoGBK inserts a new CoGBK edge into the graph.
func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) {
if len(ns) == 0 {
// TODO(BEAM-7086) Reduce the repetition in the context of all the errors in this file.
err := errors.New("needs at least 1 input")
return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
}
if !typex.IsKV(ns[0].Type()) {
err := errors.Errorf("input type must be KV: %v", ns[0])
return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
}
// (1) Create CoGBK result type: KV<T,U>, .., KV<T,Z> -> CoGBK<T,U,..,Z>.
c := ns[0].Coder.Components[0]
w := inputWindow(ns)
bounded := inputBounded(ns)
comp := []typex.FullType{c.T, ns[0].Type().Components()[1]}
for i := 1; i < len(ns); i++ {
n := ns[i]
if !typex.IsKV(n.Type()) {
err := errors.Errorf("input type must be KV: %v", n)
return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
}
if !n.Coder.Components[0].Equals(c) {
err := errors.Errorf("key coder for %v is %v, want %v", n, n.Coder.Components[0], c)
return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
}
if !w.Equals(n.WindowingStrategy()) {
err := errors.Errorf("mismatched CoGBK windowing strategies: %v, want %v", n.WindowingStrategy(), w)
return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
}
if bounded != n.Bounded() {
err := errors.Errorf("unmatched CoGBK boundedness: %v, want %v", n.Bounded(), bounded)
return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
}
comp = append(comp, n.Type().Components()[1])
}
t := typex.NewCoGBK(comp...)
out := g.NewNode(t, w, bounded)
// (2) Add CoGBK edge
edge := g.NewEdge(s)
edge.Op = CoGBK
for i := 0; i < len(ns); i++ {
edge.Input = append(edge.Input, &Inbound{Kind: Main, From: ns[i], Type: ns[i].Type()})
}
edge.Output = []*Outbound{{To: out, Type: t}}
return edge, nil
}
// NewFlatten inserts a new Flatten edge in the graph. Flatten output type is
// the shared input type.
func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) {
if len(in) < 2 {
err := errors.Errorf("Flatten needs at least 2 input, got %v", len(in))
return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
}
t := in[0].Type()
w := inputWindow(in)
// TODO(herohde) 4/5/2018: is it fine mixing boundedness for flatten?
// The output would be unbounded iff any input is.
bounded := true
for _, n := range in {
if !n.Bounded() {
bounded = false
break
}
}
for _, n := range in {
if !typex.IsEqual(t, n.Type()) {
err := errors.Errorf("mismatched Flatten input types: %v, want %v", n.Type(), t)
return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
}
if !w.Equals(n.WindowingStrategy()) {
err := errors.Errorf("mismatched Flatten window types: %v, want %v", n.WindowingStrategy(), w)
return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
}
}
if typex.IsCoGBK(t) {
err := errors.Errorf("Flatten input type cannot be CoGBK: %v", t)
return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
}
edge := g.NewEdge(s)
edge.Op = Flatten
for _, n := range in {
edge.Input = append(edge.Input, &Inbound{Kind: Main, From: n, Type: t})
}
edge.Output = []*Outbound{{To: g.NewNode(t, w, bounded), Type: t}}
return edge, nil
}
// NewExternal inserts an External transform. The system makes no assumptions about
// what this transform might do.
func NewExternal(g *Graph, s *Scope, payload *Payload, in []*Node, out []typex.FullType, bounded bool) *MultiEdge {
edge := g.NewEdge(s)
edge.Op = External
edge.Payload = payload
for _, n := range in {
edge.Input = append(edge.Input, &Inbound{Kind: Main, From: n, Type: n.Type()})
}
for _, t := range out {
n := g.NewNode(t, inputWindow(in), bounded)
edge.Output = append(edge.Output, &Outbound{To: n, Type: t})
}
return edge
}
// NewParDo inserts a new ParDo edge into the graph.
func NewParDo(g *Graph, s *Scope, u *DoFn, in []*Node, typedefs map[string]reflect.Type) (*MultiEdge, error) {
return newDoFnNode(ParDo, g, s, u, in, typedefs)
}
func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in []*Node, typedefs map[string]reflect.Type) (*MultiEdge, error) {
// TODO(herohde) 5/22/2017: revisit choice of ProcessElement as representative. We should
// perhaps create a synthetic method for binding purposes? The main question is how to
// tell which side input binds to which if the signatures differ, which is a downside of
// positional binding.
inbound, kinds, outbound, out, err := Bind(u.ProcessElementFn(), typedefs, NodeTypes(in)...)
if err != nil {
return nil, errors.WithContextf(err, "creating new DoFn in scope %v", s)
}
edge := g.NewEdge(s)
edge.Op = op
edge.DoFn = u
for i := 0; i < len(in); i++ {
edge.Input = append(edge.Input, &Inbound{Kind: kinds[i], From: in[i], Type: inbound[i]})
}
for i := 0; i < len(out); i++ {
n := g.NewNode(out[i], inputWindow(in), inputBounded(in))
edge.Output = append(edge.Output, &Outbound{To: n, Type: outbound[i]})
}
return edge, nil
}
// CombinePerKeyScope is the Go SDK canonical name for the combine composite
// scope. With Beam Portability, "primitive" composite transforms like
// combine have their URNs & payloads attached to a high level scope, with a
// default representation beneath. The use of this const permits the
// translation layer to confirm the SDK expects this combine to be liftable
// by a runner and should set this scope's URN and Payload accordingly.
const CombinePerKeyScope = "CombinePerKey"
// NewCombine inserts a new Combine edge into the graph. Combines cannot have side
// input.
func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) (*MultiEdge, error) {
inT := in.Type()
if !typex.IsCoGBK(inT) {
err := errors.Errorf("Combine requires CoGBK type: %v", inT)
return nil, errors.WithContextf(err, "creating new Combine in scope %v", s)
}
if len(inT.Components()) > 2 {
err := errors.Errorf("Combine cannot follow multi-input CoGBK: %v", inT)
return nil, errors.WithContextf(err, "creating new Combine in scope %v", s)
}
// Create a synthetic function for binding purposes. It takes main input
// and returns the output type -- but hides the accumulator.
//
// (1) If AddInput exists, then it lists the main inputs. If not,
// the only main input type is the accumulator type.
// (2) If ExtractOutput exists then it returns the output type. If not,
// then the accumulator is the output type.
//
// MergeAccumulators is guaranteed to exist. We do not allow the accumulator
// to be a tuple type (i.e., so one can't define a inline KV merge function).
synth := &funcx.Fn{}
if f := u.AddInputFn(); f != nil {
// drop accumulator and irrelevant parameters
synth.Param = funcx.SubParams(f.Param, f.Params(funcx.FnValue)[1:]...)
} else {
synth.Param = []funcx.FnParam{{Kind: funcx.FnValue, T: u.MergeAccumulatorsFn().Ret[0].T}}
}
if f := u.ExtractOutputFn(); f != nil {
synth.Ret = f.Ret
} else {
synth.Ret = u.MergeAccumulatorsFn().Ret
}
// The shape of the inbound type and the type of the inbound node
// are different: a node type of CoGBK<A,B> will become B
// or KV<A,B>, depending on whether the combineFn is keyed or not.
// Combines may omit the key in the signature. In such a case,
// it is ignored for the purpose of binding. The runtime will later look at
// these types to decide whether to add the key or not.
//
// However, the outbound type will be KV<A,O> (where O is the output
// type) regardless of whether the combineFn is keyed or not.
if len(synth.Param) == 1 {
inT = inT.Components()[1] // Drop implicit key for binding purposes
} else {
inT = typex.NewKV(inT.Components()...)
}
// The runtime always adds the key for the output of combiners.
key := in.Type().Components()[0]
synth.Ret = append([]funcx.ReturnParam{{Kind: funcx.RetValue, T: key.Type()}}, synth.Ret...)
inbound, kinds, outbound, out, err := Bind(synth, nil, inT)
if err != nil {
return nil, errors.WithContextf(err, "creating new Combine in scope %v", s)
}
edge := g.NewEdge(s)
edge.Op = Combine
edge.CombineFn = u
edge.AccumCoder = ac
edge.Input = []*Inbound{{Kind: kinds[0], From: in, Type: inbound[0]}}
for i := 0; i < len(out); i++ {
n := g.NewNode(out[i], in.WindowingStrategy(), in.Bounded())
edge.Output = append(edge.Output, &Outbound{To: n, Type: outbound[i]})
}
return edge, nil
}
// NewImpulse inserts a new Impulse edge into the graph. It must use the
// built-in bytes coder.
func NewImpulse(g *Graph, s *Scope, value []byte) *MultiEdge {
ft := typex.New(reflectx.ByteSlice)
n := g.NewNode(ft, window.DefaultWindowingStrategy(), true)
n.Coder = coder.NewBytes()
edge := g.NewEdge(s)
edge.Op = Impulse
edge.Value = value
edge.Output = []*Outbound{{To: n, Type: ft}}
return edge
}
// NewWindowInto inserts a new WindowInto edge into the graph.
func NewWindowInto(g *Graph, s *Scope, wfn *window.Fn, in *Node) *MultiEdge {
n := g.NewNode(in.Type(), &window.WindowingStrategy{Fn: wfn}, in.Bounded())
n.Coder = in.Coder
edge := g.NewEdge(s)
edge.Op = WindowInto
edge.WindowFn = wfn
edge.Input = []*Inbound{{Kind: Main, From: in, Type: in.Type()}}
edge.Output = []*Outbound{{To: n, Type: in.Type()}}
return edge
}
func inputWindow(in []*Node) *window.WindowingStrategy {
if len(in) == 0 {
return window.DefaultWindowingStrategy()
}
return in[0].WindowingStrategy()
}
func inputBounded(in []*Node) bool {
if len(in) == 0 {
return true
}
return in[0].Bounded()
}