| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package exec |
| |
| import ( |
| "context" |
| "fmt" |
| "path" |
| |
| "github.com/apache/beam/sdks/go/pkg/beam/core/graph" |
| "github.com/apache/beam/sdks/go/pkg/beam/core/sdf" |
| "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" |
| ) |
| |
| // PairWithRestriction is an executor for the expanded SDF step of the same |
| // name. This is the first step of an expanded SDF. It pairs each main input |
| // element with a restriction via the SDF's associated sdf.RestrictionProvider. |
| // This step is followed by SplitAndSizeRestrictions. |
| type PairWithRestriction struct { |
| UID UnitID |
| Fn *graph.DoFn |
| Out Node |
| |
| inv *cirInvoker |
| } |
| |
| // ID returns the UnitID for this unit. |
| func (n *PairWithRestriction) ID() UnitID { |
| return n.UID |
| } |
| |
| // Up performs one-time setup for this executor. |
| func (n *PairWithRestriction) Up(ctx context.Context) error { |
| fn := (*graph.SplittableDoFn)(n.Fn).CreateInitialRestrictionFn() |
| var err error |
| if n.inv, err = newCreateInitialRestrictionInvoker(fn); err != nil { |
| return errors.WithContextf(err, "%v", n) |
| } |
| return nil |
| } |
| |
| // StartBundle currently does nothing. |
| func (n *PairWithRestriction) StartBundle(ctx context.Context, id string, data DataContext) error { |
| return n.Out.StartBundle(ctx, id, data) |
| } |
| |
| // ProcessElement expects elm to be the main input to the ParDo. See |
| // exec.FullValue for more details on the expected input. |
| // |
| // ProcessElement creates an initial restriction representing the entire input. |
| // The output is in the structure <elem, restriction>, where elem is the main |
| // input originally passed in (i.e. the parameter elm). Windows and Timestamp |
| // are copied to the outer *FullValue. They can be left within the original |
| // element, but won't be used by later SDF steps. |
| // |
| // Output Diagram: |
| // |
| // *FullValue { |
| // Elm: *FullValue (original input) |
| // Elm2: Restriction |
| // Windows |
| // Timestamps |
| // } |
| func (n *PairWithRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error { |
| rest := n.inv.Invoke(elm) |
| output := FullValue{Elm: elm, Elm2: rest, Timestamp: elm.Timestamp, Windows: elm.Windows} |
| |
| return n.Out.ProcessElement(ctx, &output, values...) |
| } |
| |
| // FinishBundle resets the invokers. |
| func (n *PairWithRestriction) FinishBundle(ctx context.Context) error { |
| n.inv.Reset() |
| return n.Out.FinishBundle(ctx) |
| } |
| |
| // Down currently does nothing. |
| func (n *PairWithRestriction) Down(ctx context.Context) error { |
| return nil |
| } |
| |
| // String outputs a human-readable description of this transform. |
| func (n *PairWithRestriction) String() string { |
| return fmt.Sprintf("SDF.PairWithRestriction[%v] UID:%v Out:%v", path.Base(n.Fn.Name()), n.UID, IDs(n.Out)) |
| } |
| |
| // SplitAndSizeRestrictions is an executor for the expanded SDF step of the |
| // same name. It is the second step of the expanded SDF, occuring after |
| // CreateInitialRestriction. It performs initial splits on the initial restrictions |
| // and adds sizing information, producing one or more output elements per input |
| // element. This step is followed by ProcessSizedElementsAndRestrictions. |
| type SplitAndSizeRestrictions struct { |
| UID UnitID |
| Fn *graph.DoFn |
| Out Node |
| |
| splitInv *srInvoker |
| sizeInv *rsInvoker |
| } |
| |
| // ID returns the UnitID for this unit. |
| func (n *SplitAndSizeRestrictions) ID() UnitID { |
| return n.UID |
| } |
| |
| // Up performs one-time setup for this executor. |
| func (n *SplitAndSizeRestrictions) Up(ctx context.Context) error { |
| fn := (*graph.SplittableDoFn)(n.Fn).SplitRestrictionFn() |
| var err error |
| if n.splitInv, err = newSplitRestrictionInvoker(fn); err != nil { |
| return errors.WithContextf(err, "%v", n) |
| } |
| |
| fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn() |
| if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil { |
| return errors.WithContextf(err, "%v", n) |
| } |
| |
| return nil |
| } |
| |
| // StartBundle currently does nothing. |
| func (n *SplitAndSizeRestrictions) StartBundle(ctx context.Context, id string, data DataContext) error { |
| return n.Out.StartBundle(ctx, id, data) |
| } |
| |
| // ProcessElement expects elm.Elm to hold the original input while elm.Elm2 |
| // contains the restriction. |
| // |
| // Input Diagram: |
| // |
| // *FullValue { |
| // Elm: *FullValue (original input) |
| // Elm2: Restriction |
| // Windows |
| // Timestamps |
| // } |
| // |
| // ProcessElement splits the given restriction into one or more restrictions and |
| // then sizes each. The outputs are in the structure <<elem, restriction>, size> |
| // where elem is the original main input to the unexpanded SDF. Windows and |
| // Timestamps are copied to each split output. |
| // |
| // Output Diagram: |
| // |
| // *FullValue { |
| // Elm: *FullValue { |
| // Elm: *FullValue (original input) |
| // Elm2: Restriction |
| // } |
| // Elm2: float64 (size) |
| // Windows |
| // Timestamps |
| // } |
| func (n *SplitAndSizeRestrictions) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error { |
| rest := elm.Elm2 |
| mainElm := elm.Elm.(*FullValue) |
| |
| splitRests := n.splitInv.Invoke(mainElm, rest) |
| if len(splitRests) == 0 { |
| err := errors.Errorf("initial splitting returned 0 restrictions.") |
| return errors.WithContextf(err, "%v", n) |
| } |
| |
| for _, splitRest := range splitRests { |
| size := n.sizeInv.Invoke(mainElm, splitRest) |
| output := &FullValue{} |
| |
| output.Timestamp = elm.Timestamp |
| output.Windows = elm.Windows |
| output.Elm = &FullValue{Elm: mainElm, Elm2: splitRest} |
| output.Elm2 = size |
| |
| if err := n.Out.ProcessElement(ctx, output, values...); err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| // FinishBundle resets the invokers. |
| func (n *SplitAndSizeRestrictions) FinishBundle(ctx context.Context) error { |
| n.splitInv.Reset() |
| n.sizeInv.Reset() |
| return n.Out.FinishBundle(ctx) |
| } |
| |
| // Down currently does nothing. |
| func (n *SplitAndSizeRestrictions) Down(ctx context.Context) error { |
| return nil |
| } |
| |
| // String outputs a human-readable description of this transform. |
| func (n *SplitAndSizeRestrictions) String() string { |
| return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", path.Base(n.Fn.Name()), n.UID, IDs(n.Out)) |
| } |
| |
| // ProcessSizedElementsAndRestrictions is an executor for the expanded SDF step |
| // of the same name. It is the final step of the expanded SDF. It sets up and |
| // invokes the user's SDF methods, similar to exec.ParDo but with slight |
| // changes to support the SDF's method signatures and the expected structure |
| // of the FullValue being received. |
| type ProcessSizedElementsAndRestrictions struct { |
| PDo *ParDo |
| |
| inv *ctInvoker |
| |
| // Rt allows this unit to send out restriction trackers being processed. |
| // Receivers of the tracker do not own it, and must send it back through the |
| // same channel once finished with it. |
| Rt chan sdf.RTracker |
| } |
| |
| // ID calls the ParDo's ID method. |
| func (n *ProcessSizedElementsAndRestrictions) ID() UnitID { |
| return n.PDo.ID() |
| } |
| |
| // Up performs some one-time setup and then calls the ParDo's Up method. |
| func (n *ProcessSizedElementsAndRestrictions) Up(ctx context.Context) error { |
| fn := (*graph.SplittableDoFn)(n.PDo.Fn).CreateTrackerFn() |
| var err error |
| if n.inv, err = newCreateTrackerInvoker(fn); err != nil { |
| return errors.WithContextf(err, "%v", n) |
| } |
| n.Rt = make(chan sdf.RTracker, 1) |
| return n.PDo.Up(ctx) |
| } |
| |
| // StartBundle calls the ParDo's StartBundle method. |
| func (n *ProcessSizedElementsAndRestrictions) StartBundle(ctx context.Context, id string, data DataContext) error { |
| return n.PDo.StartBundle(ctx, id, data) |
| } |
| |
| // ProcessElement expects the same structure as the output of |
| // SplitAndSizeRestrictions, approximately <<elem, restriction>, size>. The |
| // only difference is that if the input was decoded in between the two steps, |
| // then single-element inputs were lifted from the *FullValue they were |
| // stored in. |
| // |
| // Input Diagram: |
| // |
| // *FullValue { |
| // Elm: *FullValue { |
| // Elm: *FullValue (KV input) or InputType (single-element input) |
| // Elm2: Restriction |
| // } |
| // Elm2: float64 (size) |
| // Windows |
| // Timestamps |
| // } |
| // |
| // ProcessElement then creates a restriction tracker from the stored restriction |
| // and processes each element using the underlying ParDo and adding the |
| // restriction tracker to the normal invocation. Sizing information is present |
| // but currently ignored. Output is forwarded to the underlying ParDo's outputs. |
| func (n *ProcessSizedElementsAndRestrictions) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error { |
| if n.PDo.status != Active { |
| err := errors.Errorf("invalid status %v, want Active", n.PDo.status) |
| return errors.WithContextf(err, "%v", n) |
| } |
| |
| rest := elm.Elm.(*FullValue).Elm2 |
| rt := n.inv.Invoke(rest) |
| n.Rt <- rt |
| mainIn := &MainInput{ |
| Values: values, |
| RTracker: rt, |
| } |
| |
| // For the key, the way we fill it out depends on whether the input element |
| // is a KV or single-element. Single-elements might have been lifted out of |
| // their FullValue if they were decoded, so we need to have a case for that. |
| // Also, we use the the top-level windows and timestamp. |
| // TODO(BEAM-9798): Optimize this so it's decided in exec/translate.go |
| // instead of checking per-element. |
| if userElm, ok := elm.Elm.(*FullValue).Elm.(*FullValue); ok { |
| mainIn.Key = FullValue{ |
| Elm: userElm.Elm, |
| Elm2: userElm.Elm2, |
| Timestamp: elm.Timestamp, |
| Windows: elm.Windows, |
| } |
| } else { |
| mainIn.Key = FullValue{ |
| Elm: elm.Elm.(*FullValue).Elm, |
| Timestamp: elm.Timestamp, |
| Windows: elm.Windows, |
| } |
| } |
| |
| err := n.PDo.processMainInput(mainIn) |
| <-n.Rt |
| return err |
| } |
| |
| // FinishBundle resets the invokers and then calls the ParDo's FinishBundle method. |
| func (n *ProcessSizedElementsAndRestrictions) FinishBundle(ctx context.Context) error { |
| n.inv.Reset() |
| return n.PDo.FinishBundle(ctx) |
| } |
| |
| // Down closes open channels and calls the ParDo's Down method. |
| func (n *ProcessSizedElementsAndRestrictions) Down(ctx context.Context) error { |
| close(n.Rt) |
| return n.PDo.Down(ctx) |
| } |
| |
| // String outputs a human-readable description of this transform. |
| func (n *ProcessSizedElementsAndRestrictions) String() string { |
| return fmt.Sprintf("SDF.ProcessSizedElementsAndRestrictions[%v] UID:%v Out:%v", path.Base(n.PDo.Fn.Name()), n.PDo.ID(), IDs(n.PDo.Out...)) |
| } |
| |
| // SdfFallback is an executor used when an SDF isn't expanded into steps by the |
| // runner, indicating that the runner doesn't support splitting. It executes all |
| // the SDF steps together in one unit. |
| type SdfFallback struct { |
| PDo *ParDo |
| |
| initRestInv *cirInvoker |
| splitInv *srInvoker |
| trackerInv *ctInvoker |
| } |
| |
| // ID calls the ParDo's ID method. |
| func (n *SdfFallback) ID() UnitID { |
| return n.PDo.UID |
| } |
| |
| // Up performs some one-time setup and then calls the ParDo's Up method. |
| func (n *SdfFallback) Up(ctx context.Context) error { |
| dfn := (*graph.SplittableDoFn)(n.PDo.Fn) |
| addContext := func(err error) error { |
| return errors.WithContextf(err, "%v", n) |
| } |
| var err error |
| if n.initRestInv, err = newCreateInitialRestrictionInvoker(dfn.CreateInitialRestrictionFn()); err != nil { |
| return addContext(err) |
| } |
| if n.splitInv, err = newSplitRestrictionInvoker(dfn.SplitRestrictionFn()); err != nil { |
| return addContext(err) |
| } |
| if n.trackerInv, err = newCreateTrackerInvoker(dfn.CreateTrackerFn()); err != nil { |
| return addContext(err) |
| } |
| return n.PDo.Up(ctx) |
| } |
| |
| // StartBundle calls the ParDo's StartBundle method. |
| func (n *SdfFallback) StartBundle(ctx context.Context, id string, data DataContext) error { |
| return n.PDo.StartBundle(ctx, id, data) |
| } |
| |
| // ProcessElement performs all the work from the steps above in one transform. |
| // This means creating initial restrictions, performing initial splits on those |
| // restrictions, and then creating restriction trackers and processing each |
| // restriction with the underlying ParDo. This executor skips the sizing step |
| // because sizing information is unnecessary for unexpanded SDFs. |
| func (n *SdfFallback) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error { |
| if n.PDo.status != Active { |
| err := errors.Errorf("invalid status %v, want Active", n.PDo.status) |
| return errors.WithContextf(err, "%v", n) |
| } |
| |
| rest := n.initRestInv.Invoke(elm) |
| splitRests := n.splitInv.Invoke(elm, rest) |
| if len(splitRests) == 0 { |
| err := errors.Errorf("initial splitting returned 0 restrictions.") |
| return errors.WithContextf(err, "%v", n) |
| } |
| |
| for _, splitRest := range splitRests { |
| rt := n.trackerInv.Invoke(splitRest) |
| mainIn := &MainInput{ |
| Key: *elm, |
| Values: values, |
| RTracker: rt, |
| } |
| if err := n.PDo.processMainInput(mainIn); err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| // FinishBundle resets the invokers and then calls the ParDo's FinishBundle method. |
| func (n *SdfFallback) FinishBundle(ctx context.Context) error { |
| n.initRestInv.Reset() |
| n.splitInv.Reset() |
| n.trackerInv.Reset() |
| return n.PDo.FinishBundle(ctx) |
| } |
| |
| // Down calls the ParDo's Down method. |
| func (n *SdfFallback) Down(ctx context.Context) error { |
| return n.PDo.Down(ctx) |
| } |
| |
| // String outputs a human-readable description of this transform. |
| func (n *SdfFallback) String() string { |
| return fmt.Sprintf("SDF.SdfFallback[%v] UID:%v Out:%v", path.Base(n.PDo.Fn.Name()), n.PDo.ID(), IDs(n.PDo.Out...)) |
| } |