blob: 55e483694e3c7ab4b4d7ef04ca4abb5d3ca7c2a5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package direct contains the direct runner for running single-bundle
// pipelines in the current process. Useful for testing.
package direct
import (
"context"
"path"
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/go/pkg/beam/log"
"github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
"github.com/apache/beam/sdks/go/pkg/beam/runners/vet"
)
func init() {
beam.RegisterRunner("direct", Execute)
}
// Execute runs the pipeline in-process.
func Execute(ctx context.Context, p *beam.Pipeline) error {
log.Info(ctx, "Executing pipeline with the direct runner.")
if !beam.Initialized() {
log.Warn(ctx, "Beam has not been initialized. Call beam.Init() before pipeline construction.")
}
log.Info(ctx, "Pipeline:")
log.Info(ctx, p)
if *jobopts.Strict {
log.Info(ctx, "Strict mode enabled, applying additional validation.")
if err := vet.Execute(ctx, p); err != nil {
return errors.Wrap(err, "strictness check failed")
}
log.Info(ctx, "Strict mode validation passed.")
}
edges, _, err := p.Build()
if err != nil {
return errors.Wrap(err, "invalid pipeline")
}
plan, err := Compile(edges)
if err != nil {
return errors.Wrap(err, "translation failed")
}
log.Info(ctx, plan)
if err = plan.Execute(ctx, "", exec.DataContext{}); err != nil {
plan.Down(ctx) // ignore any teardown errors
return err
}
if err = plan.Down(ctx); err != nil {
return err
}
metrics.DumpToLog(ctx)
return nil
}
// Compile translates a pipeline to a multi-bundle execution plan.
func Compile(edges []*graph.MultiEdge) (*exec.Plan, error) {
// (1) Preprocess graph structure to allow insertion of Multiplex,
// Flatten and Discard.
prev := make(map[int]int) // nodeID -> #incoming
succ := make(map[int][]linkID) // nodeID -> []linkID
edgeMap := make(map[int]*graph.MultiEdge)
for _, edge := range edges {
edgeMap[edge.ID()] = edge
for i, in := range edge.Input {
from := in.From.ID()
succ[from] = append(succ[from], linkID{edge.ID(), i})
}
for _, out := range edge.Output {
to := out.To.ID()
prev[to]++
}
}
// (2) Constructs the plan units recursively.
b := &builder{
prev: prev,
succ: succ,
edges: edgeMap,
nodes: make(map[int]exec.Node),
links: make(map[linkID]exec.Node),
idgen: &exec.GenID{},
}
var roots []exec.Unit
for _, edge := range edges {
switch edge.Op {
case graph.Impulse:
out, err := b.makeNode(edge.Output[0].To.ID())
if err != nil {
return nil, err
}
u := &Impulse{UID: b.idgen.New(), Value: edge.Value, Out: out}
roots = append(roots, u)
default:
// skip non-roots
}
}
return exec.NewPlan("plan", append(roots, b.units...))
}
// linkID represents an incoming data link to an Edge.
type linkID struct {
to int // graph.MultiEdge
input int // input index. If > 0, it's a side or CoGBK input.
}
// builder is the recursive builder for non-root execution nodes.
type builder struct {
prev map[int]int // nodeID -> #incoming
succ map[int][]linkID // nodeID -> []linkID
edges map[int]*graph.MultiEdge // edgeID -> Edge
nodes map[int]exec.Node // nodeID -> Node (cache)
links map[linkID]exec.Node // linkID -> Node (cache)
units []exec.Unit // result
idgen *exec.GenID
}
func (b *builder) makeNodes(out []*graph.Outbound) ([]exec.Node, error) {
var ret []exec.Node
for _, o := range out {
n, err := b.makeNode(o.To.ID())
if err != nil {
return nil, err
}
ret = append(ret, n)
}
return ret, nil
}
func (b *builder) makeNode(id int) (exec.Node, error) {
if n, ok := b.nodes[id]; ok {
return n, nil
}
list := b.succ[id]
var u exec.Node
switch len(list) {
case 0:
// Discard.
u = &exec.Discard{UID: b.idgen.New()}
case 1:
return b.makeLink(list[0])
default:
// Multiplex.
out, err := b.makeLinks(list)
if err != nil {
return nil, err
}
u = &exec.Multiplex{UID: b.idgen.New(), Out: out}
}
if count := b.prev[id]; count > 1 {
// Guard node with Flatten, if needed.
b.units = append(b.units, u)
u = &exec.Flatten{UID: b.idgen.New(), N: count, Out: u}
}
b.nodes[id] = u
b.units = append(b.units, u)
return u, nil
}
func (b *builder) makeLinks(ids []linkID) ([]exec.Node, error) {
var ret []exec.Node
for _, id := range ids {
n, err := b.makeLink(id)
if err != nil {
return nil, err
}
ret = append(ret, n)
}
return ret, nil
}
func (b *builder) makeLink(id linkID) (exec.Node, error) {
if n, ok := b.links[id]; ok {
return n, nil
}
// Process all incoming links for the edge and cache them. It thus doesn't matter
// which exact link triggers the Node generation. The link caching is only needed
// to process ParDo side inputs and CoGBK.
edge := b.edges[id.to]
out, err := b.makeNodes(edge.Output)
if err != nil {
return nil, err
}
var u exec.Node
switch edge.Op {
case graph.ParDo:
pardo := &exec.ParDo{
UID: b.idgen.New(),
Fn: edge.DoFn,
Inbound: edge.Input,
Out: out,
PID: path.Base(edge.DoFn.Name()),
}
if len(edge.Input) == 1 {
u = pardo
break
}
// ParDo w/ side input. We need to insert buffering and wait. We also need to
// ensure that we return the correct link node.
b.units = append(b.units, pardo)
w := &wait{UID: b.idgen.New(), need: len(edge.Input) - 1, next: pardo}
b.units = append(b.units, w)
b.links[linkID{edge.ID(), 0}] = w
for i := 1; i < len(edge.Input); i++ {
n := &buffer{uid: b.idgen.New(), next: w.ID(), read: pardo.ID(), notify: w.notify}
pardo.Side = append(pardo.Side, n)
b.units = append(b.units, n)
b.links[linkID{edge.ID(), i}] = n
}
return b.links[id], nil
case graph.Combine:
usesKey := typex.IsKV(edge.Input[0].Type)
u = &exec.Combine{
UID: b.idgen.New(),
Fn: edge.CombineFn,
UsesKey: usesKey,
Out: out[0],
PID: path.Base(edge.CombineFn.Name()),
}
case graph.CoGBK:
u = &CoGBK{UID: b.idgen.New(), Edge: edge, Out: out[0]}
b.units = append(b.units, u)
// CoGBK needs injection of each incoming index. If > 1 incoming,
// insert Flatten as well.
if len(edge.Input) > 1 {
u = &exec.Flatten{UID: b.idgen.New(), N: len(edge.Input), Out: u}
b.units = append(b.units, u)
}
for i := 0; i < len(edge.Input); i++ {
n := &Inject{UID: b.idgen.New(), N: i, Out: u}
b.units = append(b.units, n)
b.links[linkID{edge.ID(), i}] = n
}
return b.links[id], nil
case graph.Flatten:
u = &exec.Flatten{UID: b.idgen.New(), N: len(edge.Input), Out: out[0]}
for i := 0; i < len(edge.Input); i++ {
b.links[linkID{edge.ID(), i}] = u
}
case graph.WindowInto:
u = &exec.WindowInto{UID: b.idgen.New(), Fn: edge.WindowFn, Out: out[0]}
default:
return nil, errors.Errorf("unexpected edge: %v", edge)
}
b.links[id] = u
b.units = append(b.units, u)
return u, nil
}