| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package exec |
| |
| import ( |
| "context" |
| "fmt" |
| "path" |
| |
| "github.com/apache/beam/sdks/go/pkg/beam/core/funcx" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/graph" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/metrics" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/typex" |
| "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" |
| "github.com/apache/beam/sdks/go/pkg/beam/util/errorx" |
| ) |
| |
| // ParDo is a DoFn executor. |
| type ParDo struct { |
| UID UnitID |
| Fn *graph.DoFn |
| Inbound []*graph.Inbound |
| Side []SideInputAdapter |
| Out []Node |
| |
| PID string |
| emitters []ReusableEmitter |
| ctx context.Context |
| inv *invoker |
| |
| side StateReader |
| cache *cacheElm |
| |
| status Status |
| err errorx.GuardedError |
| } |
| |
| // GetPID returns the PTransformID for this ParDo. |
| func (n *ParDo) GetPID() string { |
| return n.PID |
| } |
| |
| // cacheElm holds per-window cached information about side input. |
| type cacheElm struct { |
| key typex.Window |
| sideinput []ReusableInput |
| extra []interface{} |
| } |
| |
| // ID returns the UnitID for this ParDo. |
| func (n *ParDo) ID() UnitID { |
| return n.UID |
| } |
| |
| // Up initializes this ParDo and does one-time DoFn setup. |
| func (n *ParDo) Up(ctx context.Context) error { |
| if n.status != Initializing { |
| return errors.Errorf("invalid status for pardo %v: %v, want Initializing", n.UID, n.status) |
| } |
| n.status = Up |
| n.inv = newInvoker(n.Fn.ProcessElementFn()) |
| |
| if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil); err != nil { |
| return n.fail(err) |
| } |
| |
| emitters, err := makeEmitters(n.Fn.ProcessElementFn(), n.Out) |
| if err != nil { |
| return n.fail(err) |
| } |
| n.emitters = emitters |
| return nil |
| } |
| |
| // StartBundle does pre-bundle processing operation for the DoFn. |
| func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) error { |
| if n.status != Up { |
| return errors.Errorf("invalid status for pardo %v: %v, want Up", n.UID, n.status) |
| } |
| n.status = Active |
| n.side = data.State |
| // Allocating contexts all the time is expensive, but we seldom re-write them, |
| // and never accept modified contexts from users, so we will cache them per-bundle |
| // per-unit, to avoid the constant allocation overhead. |
| n.ctx = metrics.SetPTransformID(ctx, n.PID) |
| |
| if err := MultiStartBundle(n.ctx, id, data, n.Out...); err != nil { |
| return n.fail(err) |
| } |
| |
| // TODO(BEAM-3303): what to set for StartBundle/FinishBundle window and emitter timestamp? |
| |
| if _, err := n.invokeDataFn(n.ctx, window.SingleGlobalWindow, mtime.ZeroTimestamp, n.Fn.StartBundleFn(), nil); err != nil { |
| return n.fail(err) |
| } |
| return nil |
| } |
| |
| // ProcessElement processes each parallel element with the DoFn. |
| func (n *ParDo) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error { |
| if n.status != Active { |
| return errors.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status) |
| } |
| // If the function observes windows, we must invoke it for each window. The expected fast path |
| // is that either there is a single window or the function doesn't observes windows. |
| |
| if !mustExplodeWindows(n.inv.fn, elm, len(n.Side) > 0) { |
| val, err := n.invokeProcessFn(n.ctx, elm.Windows, elm.Timestamp, &MainInput{Key: *elm, Values: values}) |
| if err != nil { |
| return n.fail(err) |
| } |
| |
| // Forward direct output, if any. It is always a main output. |
| if val != nil { |
| return n.Out[0].ProcessElement(n.ctx, val) |
| } |
| } else { |
| for _, w := range elm.Windows { |
| wElm := FullValue{Elm: elm.Elm, Elm2: elm.Elm2, Timestamp: elm.Timestamp, Windows: []typex.Window{w}} |
| |
| val, err := n.invokeProcessFn(n.ctx, wElm.Windows, wElm.Timestamp, &MainInput{Key: wElm, Values: values}) |
| if err != nil { |
| return n.fail(err) |
| } |
| |
| // Forward direct output, if any. It is always a main output. |
| if val != nil { |
| return n.Out[0].ProcessElement(n.ctx, val) |
| } |
| } |
| } |
| return nil |
| } |
| |
| // mustExplodeWindows returns true iif we need to call the function |
| // for each window. It is needed if the function either observes the |
| // window, either directly or indirectly via (windowed) side inputs. |
| func mustExplodeWindows(fn *funcx.Fn, elm *FullValue, usesSideInput bool) bool { |
| if len(elm.Windows) < 2 { |
| return false |
| } |
| _, explode := fn.Window() |
| return explode || usesSideInput |
| } |
| |
| // FinishBundle does post-bundle processing operations for the DoFn. |
| // Note: This is not a "FinalizeBundle" operation. Data is not yet durably |
| // persisted at this point. |
| func (n *ParDo) FinishBundle(ctx context.Context) error { |
| if n.status != Active { |
| return errors.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status) |
| } |
| n.status = Up |
| n.inv.Reset() |
| |
| if _, err := n.invokeDataFn(n.ctx, window.SingleGlobalWindow, mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil); err != nil { |
| return n.fail(err) |
| } |
| n.side = nil |
| n.cache = nil |
| |
| if err := MultiFinishBundle(n.ctx, n.Out...); err != nil { |
| return n.fail(err) |
| } |
| return nil |
| } |
| |
| // Down performs best-effort teardown of DoFn resources. (May not run.) |
| func (n *ParDo) Down(ctx context.Context) error { |
| if n.status == Down { |
| return n.err.Error() |
| } |
| n.status = Down |
| n.side = nil |
| n.cache = nil |
| |
| if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil); err != nil { |
| n.err.TrySetError(err) |
| } |
| return n.err.Error() |
| } |
| |
| func (n *ParDo) initSideInput(ctx context.Context, w typex.Window) error { |
| if n.cache == nil { |
| // First time: init single-element cache. We know that side input |
| // must come before emitters in the signature. |
| |
| sideCount := len(n.Side) |
| emitCount := len(n.emitters) |
| |
| n.cache = &cacheElm{ |
| key: w, |
| extra: make([]interface{}, sideCount+emitCount, sideCount+emitCount), |
| } |
| for i, emit := range n.emitters { |
| n.cache.extra[i+sideCount] = emit.Value() |
| } |
| } else if w.Equals(n.cache.key) { |
| // Fast path: same window. Just unwind the side inputs. |
| |
| for _, s := range n.cache.sideinput { |
| if err := s.Init(); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // Slow path: init side input for the given window |
| |
| streams := make([]ReStream, len(n.Side), len(n.Side)) |
| for i, adapter := range n.Side { |
| s, err := adapter.NewIterable(ctx, n.side, w) |
| if err != nil { |
| return err |
| } |
| streams[i] = s |
| } |
| |
| sideinput, err := makeSideInputs(n.Fn.ProcessElementFn(), n.Inbound, streams) |
| if err != nil { |
| return err |
| } |
| n.cache.sideinput = sideinput |
| for i := 0; i < len(n.Side); i++ { |
| n.cache.extra[i] = sideinput[i].Value() |
| } |
| |
| for _, s := range n.cache.sideinput { |
| if err := s.Init(); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // invokeDataFn handle non-per element invocations. |
| func (n *ParDo) invokeDataFn(ctx context.Context, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput) (*FullValue, error) { |
| if fn == nil { |
| return nil, nil |
| } |
| if err := n.preInvoke(ctx, ws, ts); err != nil { |
| return nil, err |
| } |
| val, err := Invoke(ctx, ws, ts, fn, opt, n.cache.extra...) |
| if err != nil { |
| return nil, err |
| } |
| if err := n.postInvoke(); err != nil { |
| return nil, err |
| } |
| return val, nil |
| } |
| |
| // invokeProcessFn handles the per element invocations |
| func (n *ParDo) invokeProcessFn(ctx context.Context, ws []typex.Window, ts typex.EventTime, opt *MainInput) (*FullValue, error) { |
| if err := n.preInvoke(ctx, ws, ts); err != nil { |
| return nil, err |
| } |
| val, err := n.inv.Invoke(ctx, ws, ts, opt, n.cache.extra...) |
| if err != nil { |
| return nil, err |
| } |
| if err := n.postInvoke(); err != nil { |
| return nil, err |
| } |
| return val, nil |
| } |
| |
| func (n *ParDo) preInvoke(ctx context.Context, ws []typex.Window, ts typex.EventTime) error { |
| for _, e := range n.emitters { |
| if err := e.Init(ctx, ws, ts); err != nil { |
| return err |
| } |
| } |
| return n.initSideInput(ctx, ws[0]) |
| } |
| |
| func (n *ParDo) postInvoke() error { |
| for _, s := range n.cache.sideinput { |
| if err := s.Reset(); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| func (n *ParDo) fail(err error) error { |
| n.status = Broken |
| n.err.TrySetError(err) |
| return err |
| } |
| |
| func (n *ParDo) String() string { |
| return fmt.Sprintf("ParDo[%v] Out:%v", path.Base(n.Fn.Name()), IDs(n.Out...)) |
| } |