blob: 29b40c818ae59d7ac0cfea051adb97dda36f040d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package exec
import (
"context"
"fmt"
"path"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)
// PairWithRestriction is an executor for the expanded SDF step of the same
// name. This is the first step of an expanded SDF. It pairs each main input
// element with a restriction via the SDF's associated sdf.RestrictionProvider.
// This step is followed by SplitAndSizeRestrictions.
//
// Up must run before ProcessElement, because Up is what creates the invoker
// that ProcessElement uses.
type PairWithRestriction struct {
// UID is the identifier reported by ID.
UID UnitID
// Fn is the DoFn being executed. Up converts it to a *graph.SplittableDoFn
// to obtain its CreateInitialRestrictionFn.
Fn *graph.DoFn
// Out is the downstream node that receives every paired element.
Out Node
// inv invokes the SDF's initial restriction function. It is set by Up.
inv *cirInvoker
}
// ID returns the UnitID for this unit, i.e. the value of the UID field.
func (n *PairWithRestriction) ID() UnitID {
return n.UID
}
// Up builds the invoker for the SDF's initial restriction function. The
// context is unused. Any error from constructing the invoker is returned with
// this unit's description attached as context.
func (n *PairWithRestriction) Up(_ context.Context) error {
	sdfn := (*graph.SplittableDoFn)(n.Fn)

	var err error
	n.inv, err = newCreateInitialRestrictionInvoker(sdfn.CreateInitialRestrictionFn())
	if err != nil {
		return errors.WithContextf(err, "%v", n)
	}
	return nil
}
// StartBundle does no work of its own; it forwards the call, with the same
// arguments, to the downstream node and returns that node's error.
func (n *PairWithRestriction) StartBundle(ctx context.Context, id string, data DataContext) error {
return n.Out.StartBundle(ctx, id, data)
}
// ProcessElement takes elm, the main input to the ParDo (see exec.FullValue
// for the expected layout), and pairs it with an initial restriction that
// represents the entire input.
//
// The element forwarded downstream has the structure <elem, restriction>,
// where elem is the elm parameter exactly as it was received. The Windows and
// Timestamp of elm are copied onto the outer *FullValue; they also remain on
// the inner element, but later SDF steps do not use them from there.
//
// Output Diagram:
//
//	*FullValue {
//	  Elm:  *FullValue (original input)
//	  Elm2: Restriction
//	  Windows
//	  Timestamps
//	}
//
// The returned error is whatever the downstream node's ProcessElement returns.
func (n *PairWithRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
	restriction := n.inv.Invoke(elm)
	paired := &FullValue{
		Elm:       elm,
		Elm2:      restriction,
		Timestamp: elm.Timestamp,
		Windows:   elm.Windows,
	}
	return n.Out.ProcessElement(ctx, paired, values...)
}
// FinishBundle resets the restriction invoker and then forwards the call to
// the downstream node, returning that node's error.
func (n *PairWithRestriction) FinishBundle(ctx context.Context) error {
n.inv.Reset()
return n.Out.FinishBundle(ctx)
}
// Down currently does nothing. It ignores its context and always returns nil.
func (n *PairWithRestriction) Down(_ context.Context) error {
return nil
}
// String renders a human-readable description of this transform: the base
// name of the DoFn, this unit's UID, and the ID of the downstream node.
func (n *PairWithRestriction) String() string {
	fnName := path.Base(n.Fn.Name())
	return fmt.Sprintf("SDF.PairWithRestriction[%v] UID:%v Out:%v", fnName, n.UID, IDs(n.Out))
}
// SplitAndSizeRestrictions is an executor for the expanded SDF step of the
// same name. It is the second step of the expanded SDF, occurring after
// PairWithRestriction. It performs initial splits on the initial restrictions
// and adds sizing information, producing one or more output elements per input
// element. This step is followed by ProcessSizedElementsAndRestrictions.
//
// Up must run before ProcessElement, because Up is what creates both invokers.
type SplitAndSizeRestrictions struct {
// UID is the identifier reported by ID.
UID UnitID
// Fn is the DoFn being executed. Up converts it to a *graph.SplittableDoFn
// to obtain its SplitRestrictionFn and RestrictionSizeFn.
Fn *graph.DoFn
// Out is the downstream node that receives every split, sized element.
Out Node
// splitInv invokes the SDF's split restriction function. It is set by Up.
splitInv *srInvoker
// sizeInv invokes the SDF's restriction size function. It is set by Up.
sizeInv *rsInvoker
}
// ID returns the UnitID for this unit, i.e. the value of the UID field.
func (n *SplitAndSizeRestrictions) ID() UnitID {
return n.UID
}
// Up builds the two invokers this unit needs: one for the SDF's split
// restriction function and one for its restriction size function. The context
// is unused. Setup stops at the first invoker that fails to construct, and
// that error is returned with this unit's description attached as context.
func (n *SplitAndSizeRestrictions) Up(_ context.Context) error {
	sdfn := (*graph.SplittableDoFn)(n.Fn)
	var err error

	n.splitInv, err = newSplitRestrictionInvoker(sdfn.SplitRestrictionFn())
	if err != nil {
		return errors.WithContextf(err, "%v", n)
	}

	n.sizeInv, err = newRestrictionSizeInvoker(sdfn.RestrictionSizeFn())
	if err != nil {
		return errors.WithContextf(err, "%v", n)
	}
	return nil
}
// StartBundle does no work of its own; it forwards the call, with the same
// arguments, to the downstream node and returns that node's error.
func (n *SplitAndSizeRestrictions) StartBundle(ctx context.Context, id string, data DataContext) error {
return n.Out.StartBundle(ctx, id, data)
}
// ProcessElement expects elm.Elm to hold the original input while elm.Elm2
// contains the restriction.
//
// Input Diagram:
//
//	*FullValue {
//	  Elm:  *FullValue (original input), or the bare input value
//	  Elm2: Restriction
//	  Windows
//	  Timestamps
//	}
//
// The original input is normally nested as a *FullValue. If it arrives as a
// bare value instead (for example a single-element input that was lifted out
// of its *FullValue), it is wrapped in a fresh *FullValue rather than causing
// a type-assertion panic. This mirrors how Split handles the same case.
//
// ProcessElement splits the given restriction into one or more restrictions and
// then sizes each. The outputs are in the structure <<elem, restriction>, size>
// where elem is the original main input to the unexpanded SDF. Windows and
// Timestamps are copied to each split output.
//
// Output Diagram:
//
//	*FullValue {
//	  Elm: *FullValue {
//	    Elm:  *FullValue (original input)
//	    Elm2: Restriction
//	  }
//	  Elm2: float64 (size)
//	  Windows
//	  Timestamps
//	}
//
// An error is returned if initial splitting yields no restrictions, or as soon
// as the downstream node fails to process one of the outputs; in the latter
// case the remaining split restrictions are not emitted.
func (n *SplitAndSizeRestrictions) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
	rest := elm.Elm2

	// Accept both the nested and the bare form of the original input so the
	// invokers below always receive a *FullValue.
	mainElm, ok := elm.Elm.(*FullValue)
	if !ok {
		mainElm = &FullValue{Elm: elm.Elm}
	}

	splitRests := n.splitInv.Invoke(mainElm, rest)
	if len(splitRests) == 0 {
		err := errors.Errorf("initial splitting returned 0 restrictions.")
		return errors.WithContextf(err, "%v", n)
	}

	for _, splitRest := range splitRests {
		size := n.sizeInv.Invoke(mainElm, splitRest)
		output := &FullValue{
			Elm:       &FullValue{Elm: mainElm, Elm2: splitRest},
			Elm2:      size,
			Timestamp: elm.Timestamp,
			Windows:   elm.Windows,
		}
		if err := n.Out.ProcessElement(ctx, output, values...); err != nil {
			return err
		}
	}
	return nil
}
// FinishBundle resets the split and size invokers and then forwards the call
// to the downstream node, returning that node's error.
func (n *SplitAndSizeRestrictions) FinishBundle(ctx context.Context) error {
n.splitInv.Reset()
n.sizeInv.Reset()
return n.Out.FinishBundle(ctx)
}
// Down currently does nothing. It ignores its context and always returns nil.
func (n *SplitAndSizeRestrictions) Down(_ context.Context) error {
return nil
}
// String renders a human-readable description of this transform: the base
// name of the DoFn, this unit's UID, and the ID of the downstream node.
func (n *SplitAndSizeRestrictions) String() string {
	fnName := path.Base(n.Fn.Name())
	return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", fnName, n.UID, IDs(n.Out))
}
// ProcessSizedElementsAndRestrictions is an executor for the expanded SDF step
// of the same name. It is the final step of the expanded SDF. It sets up and
// invokes the user's SDF methods, similar to exec.ParDo but with slight
// changes to support the SDF's method signatures and the expected structure
// of the FullValue being received.
//
// It also implements SplittableUnit, and sends itself through SU while an
// element is being processed.
type ProcessSizedElementsAndRestrictions struct {
// PDo is the underlying ParDo. The lifecycle methods delegate to it, and
// ProcessElement hands each element to its processMainInput.
PDo *ParDo
TfId string // Transform ID. Needed for splitting.
// ctInv invokes the SDF's create tracker function. It is set by Up.
ctInv *ctInvoker
// sizeInv invokes the SDF's restriction size function. It is set by Up and
// used by Split to size the primary and residual restrictions.
sizeInv *rsInvoker
// SU is a buffered channel for indicating when this unit is splittable.
// When this unit is processing an element, it sends a SplittableUnit
// interface through the channel. That interface can be received on other
// threads and used to perform splitting or other related operation.
//
// This channel should be received on in a non-blocking manner, to avoid
// hanging if no element is processing.
//
// Receiving the SplittableUnit prevents the current element from finishing
// processing, so the element does not unexpectedly change during a split.
// Therefore, receivers of the SplittableUnit must send it back through the
// channel once finished with it, or it will block indefinitely.
//
// The channel is created by Up with a buffer size of 1.
SU chan SplittableUnit
elm *FullValue // Currently processing element.
rt sdf.RTracker // Currently processing element's restriction tracker.
}
// ID calls the ParDo's ID method and returns its result, so this unit shares
// the identifier of the ParDo it wraps.
func (n *ProcessSizedElementsAndRestrictions) ID() UnitID {
return n.PDo.ID()
}
// Up prepares this unit for execution and then calls the ParDo's Up method.
//
// It builds the invokers for the SDF's create tracker and restriction size
// functions, and allocates the SU channel with a buffer of one. If either
// invoker fails to construct, that error is returned with this unit's
// description attached as context, and neither the channel nor the ParDo is
// set up.
func (n *ProcessSizedElementsAndRestrictions) Up(ctx context.Context) error {
	sdfn := (*graph.SplittableDoFn)(n.PDo.Fn)
	var err error

	n.ctInv, err = newCreateTrackerInvoker(sdfn.CreateTrackerFn())
	if err != nil {
		return errors.WithContextf(err, "%v", n)
	}

	n.sizeInv, err = newRestrictionSizeInvoker(sdfn.RestrictionSizeFn())
	if err != nil {
		return errors.WithContextf(err, "%v", n)
	}

	n.SU = make(chan SplittableUnit, 1)
	return n.PDo.Up(ctx)
}
// StartBundle calls the ParDo's StartBundle method with the same arguments and
// returns its error.
func (n *ProcessSizedElementsAndRestrictions) StartBundle(ctx context.Context, id string, data DataContext) error {
return n.PDo.StartBundle(ctx, id, data)
}
// ProcessElement expects the same structure as the output of
// SplitAndSizeRestrictions, approximately <<elem, restriction>, size>. The
// only difference is that if the input was decoded in between the two steps,
// then single-element inputs were lifted from the *FullValue they were
// stored in.
//
// Input Diagram:
//
//	*FullValue {
//	  Elm: *FullValue {
//	    Elm:  *FullValue (KV input) or InputType (single-element input)
//	    Elm2: Restriction
//	  }
//	  Elm2: float64 (size)
//	  Windows
//	  Timestamps
//	}
//
// ProcessElement creates a restriction tracker from the stored restriction and
// processes the element with the underlying ParDo, adding the restriction
// tracker to the normal invocation. Sizing information is present but
// currently ignored. Output is forwarded to the underlying ParDo's outputs.
//
// While the element is processed, this unit is available on the SU channel so
// that it can be split from another goroutine. An error is returned if the
// ParDo is not Active, or if processing the main input fails.
func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context, elm *FullValue, values ...ReStream) error {
	if status := n.PDo.status; status != Active {
		cause := errors.Errorf("invalid status %v, want Active", status)
		return errors.WithContextf(cause, "%v", n)
	}

	// elm.Elm is the <elem, restriction> pair.
	pair := elm.Elm.(*FullValue)
	rt := n.ctInv.Invoke(pair.Elm2)
	n.rt, n.elm = rt, elm

	// Publish this unit for splitting, and take it back before returning. The
	// receive in the deferred function blocks while a splitter holds the unit.
	n.SU <- n
	defer func() { <-n.SU }()

	// How the key is filled in depends on whether the input element is a KV
	// or a single element. Single elements might have been lifted out of
	// their FullValue if they were decoded, so both shapes are handled. In
	// either case the top-level windows and timestamp are used.
	// TODO(BEAM-9798): Optimize this so it's decided in exec/translate.go
	// instead of checking per-element.
	key := FullValue{
		Elm:       pair.Elm,
		Timestamp: elm.Timestamp,
		Windows:   elm.Windows,
	}
	if userElm, ok := pair.Elm.(*FullValue); ok {
		key.Elm, key.Elm2 = userElm.Elm, userElm.Elm2
	}

	return n.PDo.processMainInput(&MainInput{
		Key:      key,
		Values:   values,
		RTracker: rt,
	})
}
// FinishBundle resets the create-tracker and size invokers and then calls the
// ParDo's FinishBundle method, returning its error.
func (n *ProcessSizedElementsAndRestrictions) FinishBundle(ctx context.Context) error {
n.ctInv.Reset()
n.sizeInv.Reset()
return n.PDo.FinishBundle(ctx)
}
// Down calls the ParDo's Down method and returns its error. This unit has no
// teardown of its own.
func (n *ProcessSizedElementsAndRestrictions) Down(ctx context.Context) error {
return n.PDo.Down(ctx)
}
// String renders a human-readable description of this transform: the base
// name of the ParDo's DoFn, the ParDo's ID, and the IDs of the ParDo's outputs.
func (n *ProcessSizedElementsAndRestrictions) String() string {
	fnName := path.Base(n.PDo.Fn.Name())
	return fmt.Sprintf("SDF.ProcessSizedElementsAndRestrictions[%v] UID:%v Out:%v", fnName, n.PDo.ID(), IDs(n.PDo.Out...))
}
// SplittableUnit is an interface that defines sub-element splitting operations
// for a unit, and provides access to them on other threads.
//
// ProcessSizedElementsAndRestrictions implements this interface and sends
// itself through its SU channel while it is processing an element.
type SplittableUnit interface {
// Split performs a split on a fraction of a currently processing element
// and returns the primary and residual elements resulting from it, or an
// error if the split failed.
Split(fraction float64) (primary, residual *FullValue, err error)
// GetProgress returns the fraction of progress the current element has
// made in processing. (ex. 0.0 means no progress, and 1.0 means fully
// processed.)
GetProgress() float64
// GetTransformId returns the transform ID of the splittable unit.
GetTransformId() string
// GetInputId returns the local input ID of the input that the element being
// split was received from.
GetInputId() string
}
// Split splits the currently processing element using its restriction tracker.
// It returns a primary and a residual element, each following the input
// structure this unit expects (<<elem, restriction>, size>), with the size
// recomputed for the restriction it carries.
//
// All three results are nil when no split happened without there being a
// failure: either the tracker is already done, or the tracker returned a nil
// residual. An error is returned when there is no tracker, when the tracker
// reports an error, or when the tracker's TrySplit fails.
func (n *ProcessSizedElementsAndRestrictions) Split(f float64) (*FullValue, *FullValue, error) {
	addContext := func(err error) error {
		return errors.WithContext(err, "Attempting split in ProcessSizedElementsAndRestrictions")
	}

	// Check that the restriction tracker is in a state where it can be split.
	if n.rt == nil {
		return nil, nil, addContext(errors.New("Restriction tracker missing."))
	}
	if err := n.rt.GetError(); err != nil {
		return nil, nil, addContext(err)
	}
	if n.rt.IsDone() {
		// Not an error, but not splittable.
		return nil, nil, nil
	}

	primary, residual, err := n.rt.TrySplit(f)
	if err != nil {
		return nil, nil, addContext(err)
	}
	if residual == nil {
		// The split failed or produced an empty residual.
		return nil, nil, nil
	}

	// The size invoker takes the element as a *FullValue, so wrap it when it
	// is stored as a bare value.
	elm := n.elm.Elm.(*FullValue).Elm
	sizeElm, ok := elm.(*FullValue)
	if !ok {
		sizeElm = &FullValue{Elm: elm}
	}
	pSize := n.sizeInv.Invoke(sizeElm, primary)
	rSize := n.sizeInv.Invoke(sizeElm, residual)

	pfv := &FullValue{
		Elm:       &FullValue{Elm: elm, Elm2: primary},
		Elm2:      pSize,
		Timestamp: n.elm.Timestamp,
		Windows:   n.elm.Windows,
	}
	rfv := &FullValue{
		Elm:       &FullValue{Elm: elm, Elm2: residual},
		Elm2:      rSize,
		Timestamp: n.elm.Timestamp,
		Windows:   n.elm.Windows,
	}
	return pfv, rfv, nil
}
// GetProgress returns the current restriction tracker's progress as a
// fraction: work done divided by the sum of work done and work remaining, as
// reported by the tracker.
//
// Two degenerate cases are handled explicitly instead of panicking or
// returning NaN:
//   - If there is no restriction tracker, 0.0 (no progress) is returned.
//   - If the tracker reports a total of zero work (nothing done and nothing
//     remaining), 1.0 is returned, since there is nothing left to process.
func (n *ProcessSizedElementsAndRestrictions) GetProgress() float64 {
	if n.rt == nil {
		return 0.0
	}
	done, remaining := n.rt.GetProgress()
	total := done + remaining
	if total == 0 {
		// Avoid 0/0, which would produce NaN.
		return 1.0
	}
	return done / total
}
// GetTransformId returns this transform's transform ID, i.e. the value of the
// TfId field.
func (n *ProcessSizedElementsAndRestrictions) GetTransformId() string {
return n.TfId
}
// GetInputId returns the main input ID, since main input elements are being
// split. The ID is derived from input index 0 via indexToInputId.
func (n *ProcessSizedElementsAndRestrictions) GetInputId() string {
return indexToInputId(0)
}
// SdfFallback is an executor used when an SDF isn't expanded into steps by the
// runner, indicating that the runner doesn't support splitting. It executes all
// the SDF steps together in one unit.
//
// Up must run before ProcessElement, because Up is what creates the invokers.
type SdfFallback struct {
// PDo is the underlying ParDo. The lifecycle methods delegate to it, and
// ProcessElement hands each restriction to its processMainInput.
PDo *ParDo
// initRestInv invokes the SDF's initial restriction function. Set by Up.
initRestInv *cirInvoker
// splitInv invokes the SDF's split restriction function. Set by Up.
splitInv *srInvoker
// trackerInv invokes the SDF's create tracker function. Set by Up.
trackerInv *ctInvoker
}
// ID returns the UID field of the underlying ParDo, so this unit shares the
// identifier of the ParDo it wraps.
func (n *SdfFallback) ID() UnitID {
return n.PDo.UID
}
// Up prepares this unit for execution and then calls the ParDo's Up method.
//
// It builds, in order, the invokers for the SDF's initial restriction, split
// restriction, and create tracker functions. Setup stops at the first invoker
// that fails to construct; that error is returned with this unit's description
// attached as context, and the ParDo's Up method is not called.
func (n *SdfFallback) Up(ctx context.Context) error {
	sdfn := (*graph.SplittableDoFn)(n.PDo.Fn)
	var err error

	n.initRestInv, err = newCreateInitialRestrictionInvoker(sdfn.CreateInitialRestrictionFn())
	if err != nil {
		return errors.WithContextf(err, "%v", n)
	}

	n.splitInv, err = newSplitRestrictionInvoker(sdfn.SplitRestrictionFn())
	if err != nil {
		return errors.WithContextf(err, "%v", n)
	}

	n.trackerInv, err = newCreateTrackerInvoker(sdfn.CreateTrackerFn())
	if err != nil {
		return errors.WithContextf(err, "%v", n)
	}

	return n.PDo.Up(ctx)
}
// StartBundle calls the ParDo's StartBundle method with the same arguments and
// returns its error.
func (n *SdfFallback) StartBundle(ctx context.Context, id string, data DataContext) error {
return n.PDo.StartBundle(ctx, id, data)
}
// ProcessElement performs the work of every expanded SDF step in a single
// transform: it creates the initial restriction for elm, performs the initial
// split on it, and then, for each resulting restriction, creates a restriction
// tracker and processes elm with the underlying ParDo. The sizing step is
// skipped because sizing information is unnecessary for unexpanded SDFs.
//
// An error is returned if the ParDo is not Active, if initial splitting yields
// no restrictions, or as soon as processing one restriction fails; in the last
// case the remaining restrictions are not processed.
func (n *SdfFallback) ProcessElement(_ context.Context, elm *FullValue, values ...ReStream) error {
	if status := n.PDo.status; status != Active {
		cause := errors.Errorf("invalid status %v, want Active", status)
		return errors.WithContextf(cause, "%v", n)
	}

	initial := n.initRestInv.Invoke(elm)
	restrictions := n.splitInv.Invoke(elm, initial)
	if len(restrictions) < 1 {
		cause := errors.Errorf("initial splitting returned 0 restrictions.")
		return errors.WithContextf(cause, "%v", n)
	}

	for _, restriction := range restrictions {
		tracker := n.trackerInv.Invoke(restriction)
		input := &MainInput{
			Key:      *elm,
			Values:   values,
			RTracker: tracker,
		}
		if err := n.PDo.processMainInput(input); err != nil {
			return err
		}
	}
	return nil
}
// FinishBundle resets the initial-restriction, split, and tracker invokers and
// then calls the ParDo's FinishBundle method, returning its error.
func (n *SdfFallback) FinishBundle(ctx context.Context) error {
n.initRestInv.Reset()
n.splitInv.Reset()
n.trackerInv.Reset()
return n.PDo.FinishBundle(ctx)
}
// Down calls the ParDo's Down method and returns its error. This unit has no
// teardown of its own.
func (n *SdfFallback) Down(ctx context.Context) error {
return n.PDo.Down(ctx)
}
// String renders a human-readable description of this transform: the base
// name of the ParDo's DoFn, the ParDo's ID, and the IDs of the ParDo's outputs.
func (n *SdfFallback) String() string {
	fnName := path.Base(n.PDo.Fn.Name())
	return fmt.Sprintf("SDF.SdfFallback[%v] UID:%v Out:%v", fnName, n.PDo.ID(), IDs(n.PDo.Out...))
}