blob: ab2a8e8abf2000eb744d5ce9f89b3952832c2d8d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package beam
import (
"fmt"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)
// addParDoCtx decorates err with a note that it occurred while inserting a
// ParDo into scope s.
func addParDoCtx(err error, s Scope) error {
	const format = "inserting ParDo in scope %s"
	return errors.WithContextf(err, format, s)
}
// TryParDo attempts to insert a ParDo transform into the pipeline and returns
// one PCollection per output of the resulting edge. It may fail for multiple
// reasons, notably that the dofn is not valid or cannot be bound -- due to
// type mismatch, say -- to the incoming PCollections. Every error is returned
// wrapped with the scope via addParDoCtx.
func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCollection, error) {
	side, typedefs, err := validate(s, col, opts)
	if err != nil {
		return nil, addParDoCtx(err, s)
	}

	// Default to a single main input, then widen the option when the incoming
	// PCollection is a KV or a CoGBK result.
	mainInputOpt := graph.NumMainInputs(graph.MainSingle)
	switch {
	case typex.IsKV(col.Type()):
		mainInputOpt = graph.NumMainInputs(graph.MainKv)
	case typex.IsCoGBK(col.Type()):
		mainInputOpt = graph.CoGBKMainInput(len(col.Type().Components()))
	}

	fn, err := graph.NewDoFn(dofn, mainInputOpt)
	if err != nil {
		return nil, addParDoCtx(err, s)
	}

	// The main input node comes first, followed by the side inputs in the
	// order validate returned them.
	inputs := []*graph.Node{col.n}
	for _, sideInput := range side {
		inputs = append(inputs, sideInput.Input.n)
	}

	// Only splittable DoFns get a restriction coder; it stays nil otherwise.
	var restrictionCoder *coder.Coder
	if fn.IsSplittable() {
		restrictionT := (*graph.SplittableDoFn)(fn).RestrictionT()
		restrictionCoder, err = inferCoder(typex.New(restrictionT))
		if err != nil {
			return nil, addParDoCtx(err, s)
		}
	}

	edge, err := graph.NewParDo(s.real, s.scope, fn, inputs, restrictionCoder, typedefs)
	if err != nil {
		return nil, addParDoCtx(err, s)
	}

	// Wrap each output node in a PCollection and give it a coder built from
	// its own type.
	var outputs []PCollection
	for _, out := range edge.Output {
		pc := PCollection{out.To}
		pc.SetCoder(NewCoder(pc.Type()))
		outputs = append(outputs, pc)
	}
	return outputs, nil
}
// ParDoN inserts a ParDo with any number of outputs into the pipeline. The
// result of TryParDo, including any error, is handed to MustN.
func ParDoN(s Scope, dofn interface{}, col PCollection, opts ...Option) []PCollection {
	outputs, err := TryParDo(s, dofn, col, opts...)
	return MustN(outputs, err)
}
// ParDo0 inserts a ParDo with zero outputs into the pipeline. It panics with
// a descriptive message if the DoFn produces any output PCollection.
func ParDo0(s Scope, dofn interface{}, col PCollection, opts ...Option) {
	const want = 0
	outputs := MustN(TryParDo(s, dofn, col, opts...))
	if got := len(outputs); got != want {
		panic(formatParDoError(dofn, got, want))
	}
}
// ParDo is the core element-wise PTransform in Apache Beam, invoking a
// user-specified function on each of the elements of the input PCollection
// to produce zero or more output elements, all of which are collected into
// the output PCollection. Use one of the ParDo variants for a different
// number of output PCollections. The PCollections do not need to have the
// same types.
//
// Elements are processed independently, and possibly in parallel across
// distributed cloud resources. The ParDo processing style is similar to what
// happens inside the "Mapper" or "Reducer" class of a MapReduce-style
// algorithm.
//
// DoFns
//
// The function to use to process each element is specified by a DoFn, either as
// a single function or as a struct with methods, notably ProcessElement. The
// struct may also define Setup, StartBundle, FinishBundle and Teardown methods.
// The struct is JSON-serialized and may contain construction-time values.
//
// Conceptually, when a ParDo transform is executed, the elements of the input
// PCollection are first divided up into some number of "bundles". These are
// farmed off to distributed worker machines (or run locally, if using the
// direct runner). For each bundle of input elements processing proceeds as
// follows:
//
// * If a struct, a fresh instance of the argument DoFn is created on a
// worker from json serialization, and the Setup method is called on this
// instance, if present. A runner may reuse DoFn instances for multiple
// bundles. A DoFn that has terminated abnormally (by returning an error)
// will never be reused.
// * The DoFn's StartBundle method, if provided, is called to initialize it.
// * The DoFn's ProcessElement method is called on each of the input elements
// in the bundle.
// * The DoFn's FinishBundle method, if provided, is called to complete its
// work. After FinishBundle is called, the framework will not again invoke
// ProcessElement or FinishBundle until a new call to StartBundle has
// occurred.
// * If any of Setup, StartBundle, ProcessElement or FinishBundle methods
// return an error, the Teardown method, if provided, will be called on the
// DoFn instance.
// * If a runner will no longer use a DoFn, the Teardown method, if provided,
// will be called on the discarded instance.
//
// Each of the calls to any of the DoFn's processing methods can produce zero
// or more output elements. All of the output elements from all of the DoFn
// instances are included in an output PCollection.
//
// For example:
//
// words := beam.ParDo(s, &Foo{...}, ...)
// lengths := beam.ParDo(s, func (word string) int {
// return len(word)
// }, words)
//
//
// Each output element has the same timestamp and is in the same windows as its
// corresponding input element. The timestamp can be accessed and/or emitted by
// including an EventTime-typed parameter. The name of the function or struct is
// used as the DoFn name. Function literals do not have stable names and should
// thus not be used in production code.
//
// Side Inputs
//
// While a ParDo processes elements from a single "main input" PCollection, it
// can take additional "side input" PCollections. Each SideInput, along with
// the form of the corresponding DoFn parameter, expresses the style of access
// to a PCollection computed by earlier pipeline operations. Side inputs are
// passed in to the ParDo transform using SideInput options, and their contents
// are accessible to each of the DoFn operations. For example:
//
// words := ...
// cutoff := ... // Singleton PCollection<int>
// smallWords := beam.ParDo(s, func (word string, cutoff int, emit func(string)) {
// if len(word) < cutoff {
// emit(word)
// }
// }, words, beam.SideInput{Input: cutoff})
//
// Additional Outputs
//
// Optionally, a ParDo transform can produce zero or multiple output
// PCollections. Note the use of ParDo2 to specify 2 outputs. For example:
//
// words := ...
// cutoff := ... // Singleton PCollection<int>
// small, big := beam.ParDo2(s, func (word string, cutoff int, small, big func(string)) {
// if len(word) < cutoff {
// small(word)
// } else {
// big(word)
// }
// }, words, beam.SideInput{Input: cutoff})
//
//
// By default, the Coder for the elements of each output PCollection is
// inferred from the concrete type.
//
// No Global Shared State
//
// There are three main ways to initialize the state of a DoFn instance
// processing a bundle:
//
// * Define public instance variable state. This state will be automatically
// JSON serialized and then deserialized in the DoFn instances created for
// bundles. This method is good for state known when the original DoFn is
// created in the main program, if it's not overly large. This is not
// suitable for any state which must only be used for a single bundle, as
// DoFns may be used to process multiple bundles.
//
// * Compute the state as a singleton PCollection and pass it in as a side
// input to the DoFn. This is good if the state needs to be computed by the
// pipeline, or if the state is very large and so is best read from file(s)
// rather than sent as part of the DoFn's serialized state.
//
// * Initialize the state in each DoFn instance, in a StartBundle method.
// This is good if the initialization doesn't depend on any information
// known only by the main program or computed by earlier pipeline
// operations, but is the same for all instances of this DoFn for all
// program executions, say setting up empty caches or initializing constant
// data.
//
// ParDo operations are intended to be able to run in parallel across multiple
// worker machines. This precludes easy sharing and updating mutable state
// across those machines. There is no support in the Beam model for
// communicating and synchronizing updates to shared state across worker
// machines, so programs should not access any mutable global variable state in
// their DoFn, without understanding that the Go processes for the main program
// and workers will each have its own independent copy of such state, and there
// won't be any automatic copying of that state across those processes. All
// information should be communicated to DoFn instances via main and side
// inputs and serialized state, and all output should be communicated from a
// DoFn instance via output PCollections, in the absence of external
// communication mechanisms written by user code.
//
// Splittable DoFns (Experimental)
//
// Warning: Splittable DoFns are still experimental, largely untested, and
// likely to have bugs.
//
// Splittable DoFns are DoFns that are able to split work within an element,
// as opposed to only at element boundaries like normal DoFns. This is useful
// for DoFns that emit many outputs per input element and can distribute that
// work among multiple workers. The most common examples of this are sources.
//
// In order to split work within an element, splittable DoFns use the concept of
// restrictions, which are objects that are associated with an element and
// describe a portion of work on that element. For example, a restriction
// associated with a filename might describe what byte range within that file to
// process. In addition to restrictions, splittable DoFns also rely on
// restriction trackers to track progress and perform splits on a restriction
// currently being processed. See the `RTracker` interface in core/sdf/sdf.go
// for more details.
//
// Splitting
//
// Splitting means taking one restriction and splitting into two or more that
// cover the entire input space of the original one. In other words, processing
// all the split restrictions should produce identical output to processing
// the original one.
//
// Splitting occurs in two stages. The initial splitting occurs before any
// restrictions have started processing. This step is used to split large
// restrictions into smaller ones that can then be distributed among multiple
// workers for processing. Initial splitting is user-defined and optional.
//
// Dynamic splitting occurs during the processing of a restriction in runners
// that have implemented it. If there are available workers, runners may split
// the unprocessed portion of work from a busy worker and shard it to available
// workers in order to better distribute work. With unsplittable DoFns this can
// only occur on element boundaries, but for splittable DoFns this split
// can land within a restriction and will require splitting that restriction.
//
// * Note: The Go SDK currently does not support dynamic splitting for SDFs,
// only initial splitting. Only initially split restrictions can be
// distributed by liquid sharding. Stragglers will not be split during
// execution with dynamic splitting.
//
// Splittable DoFn Methods
//
// Making a splittable DoFn requires the following methods to be implemented on
// a DoFn in addition to the usual DoFn requirements. In the following
// method signatures `elem` represents the main input elements to the DoFn, and
// should match the types used in ProcessElement. `restriction` represents the
// user-defined restriction, and can be any type as long as it is consistent
// throughout all the splittable DoFn methods:
//
// * `CreateInitialRestriction(element) restriction`
// CreateInitialRestriction creates an initial restriction encompassing an
// entire element. The restriction created stays associated with the element
// it describes.
// * `SplitRestriction(elem, restriction) []restriction`
// SplitRestriction takes an element and its initial restriction, and
// optionally performs an initial split on it, returning a slice of all the
// split restrictions. If no splits are desired, the method returns a slice
// containing only the original restriction. This method will always be
// called on each newly created restriction before they are processed.
// * `RestrictionSize(elem, restriction) float64`
// RestrictionSize returns a cheap size estimation for a restriction. This
// size is an abstract scalar value that represents how much work a
// restriction takes compared to other restrictions in the same DoFn. For
// example, a size of 200 represents twice as much work as a size of
// 100, but the numbers do not represent anything on their own. Size is
// used by runners to estimate work for liquid sharding.
// * `CreateTracker(restriction) restrictionTracker`
// CreateTracker creates and returns a restriction tracker (a concrete type
// implementing the `sdf.RTracker` interface) given a restriction. The
// restriction tracker is used to track progress processing a restriction,
// and to allow for dynamic splits. This method is called on each
// restriction right before processing begins.
// * `ProcessElement(sdf.RTracker, element, func emit(output))`
// For splittable DoFns, ProcessElement requires a restriction tracker
// before inputs, and generally requires emits to be used for outputs, since
// restrictions will generally produce multiple outputs. For more details
// on processing restrictions in a splittable DoFn, see `sdf.RTracker`.
//
// Fault Tolerance
//
// In a distributed system, things can fail: machines can crash, machines can
// be unable to communicate across the network, etc. While individual failures
// are rare, the larger the job, the greater the chance that something,
// somewhere, will fail. Beam runners may strive to mask such failures by
// retrying failed DoFn bundles. This means that a DoFn instance might process
// a bundle partially, then crash for some reason, then be rerun (often as a
// new process) on that same bundle and on the same elements as before.
// Sometimes two or more DoFn instances will be running on the same bundle
// simultaneously, with the system taking the results of the first instance to
// complete successfully. Consequently, the code in a DoFn needs to be written
// such that these duplicate (sequential or concurrent) executions do not cause
// problems. If the outputs of a DoFn are a pure function of its inputs, then
// this requirement is satisfied. However, if a DoFn's execution has external
// side-effects, such as performing updates to external HTTP services, then
// the DoFn's code needs to take care to ensure that those updates are
// idempotent and that concurrent updates are acceptable. This property can be
// difficult to achieve, so it is advisable to strive to keep DoFns as pure
// functions as much as possible.
//
// Optimization
//
// Beam runners may choose to apply optimizations to a pipeline before it is
// executed. A key optimization, fusion, relates to ParDo operations. If one
// ParDo operation produces a PCollection that is then consumed as the main
// input of another ParDo operation, the two ParDo operations will be fused
// together into a single ParDo operation and run in a single pass; this is
// "producer-consumer fusion". Similarly, if two or more ParDo operations
// have the same PCollection main input, they will be fused into a single ParDo
// that makes just one pass over the input PCollection; this is "sibling
// fusion".
//
// If after fusion there are no more unfused references to a PCollection (e.g.,
// one between a producer ParDo and a consumer ParDo), the PCollection itself
// is "fused away" and won't ever be written to disk, saving all the I/O and
// space expense of constructing it.
//
// When Beam runners apply fusion optimization, it is essentially "free" to
// write ParDo operations in a very modular, composable style, each ParDo
// operation doing one clear task, and stringing together sequences of ParDo
// operations to get the desired overall effect. Such programs can be easier to
// understand, easier to unit-test, easier to extend and evolve, and easier to
// reuse in new programs. The predefined library of PTransforms that come with
// Beam makes heavy use of this modular, composable style, trusting to the
// runner to "flatten out" all the compositions into highly optimized stages.
//
// See https://beam.apache.org/documentation/programming-guide/#pardo
// for the web documentation for ParDo
func ParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) PCollection {
	// This variant returns exactly one PCollection; any other output count is
	// reported through a panic that names the DoFn and the matching variant.
	const want = 1
	outputs := MustN(TryParDo(s, dofn, col, opts...))
	if got := len(outputs); got != want {
		panic(formatParDoError(dofn, got, want))
	}
	return outputs[0]
}
// TODO(herohde) 6/1/2017: add windowing aspects to above documentation.
// ParDo2 inserts a ParDo with 2 outputs into the pipeline. It panics with a
// descriptive message if the DoFn does not produce exactly 2 outputs.
func ParDo2(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection) {
	const want = 2
	outputs := MustN(TryParDo(s, dofn, col, opts...))
	if got := len(outputs); got != want {
		panic(formatParDoError(dofn, got, want))
	}
	return outputs[0], outputs[1]
}
// ParDo3 inserts a ParDo with 3 outputs into the pipeline. It panics with a
// descriptive message if the DoFn does not produce exactly 3 outputs.
func ParDo3(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection) {
	const want = 3
	outputs := MustN(TryParDo(s, dofn, col, opts...))
	if got := len(outputs); got != want {
		panic(formatParDoError(dofn, got, want))
	}
	return outputs[0], outputs[1], outputs[2]
}
// ParDo4 inserts a ParDo with 4 outputs into the pipeline. It panics with a
// descriptive message if the DoFn does not produce exactly 4 outputs.
func ParDo4(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection) {
	const want = 4
	outputs := MustN(TryParDo(s, dofn, col, opts...))
	if got := len(outputs); got != want {
		panic(formatParDoError(dofn, got, want))
	}
	return outputs[0], outputs[1], outputs[2], outputs[3]
}
// ParDo5 inserts a ParDo with 5 outputs into the pipeline. It panics with a
// descriptive message if the DoFn does not produce exactly 5 outputs.
func ParDo5(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection) {
	const want = 5
	outputs := MustN(TryParDo(s, dofn, col, opts...))
	if got := len(outputs); got != want {
		panic(formatParDoError(dofn, got, want))
	}
	return outputs[0], outputs[1], outputs[2], outputs[3], outputs[4]
}
// ParDo6 inserts a ParDo with 6 outputs into the pipeline. It panics with a
// descriptive message if the DoFn does not produce exactly 6 outputs.
func ParDo6(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) {
	const want = 6
	outputs := MustN(TryParDo(s, dofn, col, opts...))
	if got := len(outputs); got != want {
		panic(formatParDoError(dofn, got, want))
	}
	return outputs[0], outputs[1], outputs[2], outputs[3], outputs[4], outputs[5]
}
// ParDo7 inserts a ParDo with 7 outputs into the pipeline. It panics with a
// descriptive message if the DoFn does not produce exactly 7 outputs.
func ParDo7(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) {
	const want = 7
	outputs := MustN(TryParDo(s, dofn, col, opts...))
	if got := len(outputs); got != want {
		panic(formatParDoError(dofn, got, want))
	}
	return outputs[0], outputs[1], outputs[2], outputs[3], outputs[4], outputs[5], outputs[6]
}
// formatParDoError builds the message shown to users when a DoFn is paired
// with a ParDo variant whose output count does not match.
//
// doFn is the user's DoFn, emitSize is the number of outputs it actually
// produced, and parDoSize is the number of outputs the called variant
// requires. The message names the DoFn, the variant that was called, and the
// variant that should be used instead.
//
// The error from graph.NewFn is deliberately discarded: every caller in this
// file reaches this helper only after MustN(TryParDo(...)) has returned for
// the same doFn, so it is presumed to already be a valid DoFn.
func formatParDoError(doFn interface{}, emitSize int, parDoSize int) string {
	const layout = "DoFn %v has %v outputs, but %v requires %v outputs, use %v instead."

	fn, _ := graph.NewFn(doFn)
	return fmt.Sprintf(
		layout,
		fn.Name(),
		emitSize,
		parDoForSize(parDoSize), // the variant that was called
		parDoSize,
		parDoForSize(emitSize), // the variant matching the DoFn's outputs
	)
}
// parDoForSize takes in a DoFn's emit dimension and recommends the correct
// ParDo to use: "ParDo" for exactly one output, "ParDo<n>" for 0 and 2
// through 7 outputs, and "ParDoN" for every other value.
func parDoForSize(emitDim int) string {
	switch {
	case emitDim == 1:
		// The single-output variant carries no numeric suffix.
		return "ParDo"
	case emitDim < 0 || emitDim > 7:
		// No fixed-arity variant exists outside 0..7.
		return "ParDoN"
	}
	return fmt.Sprintf("ParDo%d", emitDim)
}